working rate limits!

This commit is contained in:
0ceanSlim 2024-07-26 10:02:34 -04:00
parent 8956eeea63
commit f8138cb2cb
7 changed files with 55 additions and 41 deletions

View File

@ -6,12 +6,14 @@ server:
port: ":8080" # Port for the server to listen on
rate_limit:
#WS is not working as intended. It's only working for accepting Events, which is what event limit already does
#WS is not working as intended. It's only working for accepting Events, which is what event is supposed to do
ws_limit: 100 # Global rate limit for WebSocket messages (50 messages per second)
ws_burst: 200 # Global burst limit for WebSocket messages (allows a burst of 100 messages)
#event limit doesn't seem to work either
event_limit: 50 # Global rate limit for events (25 events per second)
event_burst: 100 # Global burst limit for events (allows a burst of 50 events)
req_limit: 50 # Added limit for REQ messages
req_burst: 100 # Added burst limit for REQ messages
category_limits: # Rate limits based on event categories
regular:
@ -35,5 +37,5 @@ rate_limit:
limit: 25 # Rate limit for events of kind 1 (100 events per second)
burst: 50 # Burst limit for events of kind 1 (allows a burst of 200 events)
- kind: 3
limit: 25 # Rate limit for events of kind 3 (25 events per second)
burst: 50 # Burst limit for events of kind 3 (allows a burst of 50 events)
limit: 10 # Rate limit for events of kind 3 (25 events per second)
burst: 20 # Burst limit for events of kind 3 (allows a burst of 50 events)

15
main.go
View File

@ -1,3 +1,4 @@
// main.go
package main
import (
@ -15,25 +16,24 @@ import (
)
func main() {
// Load configuration
config, err := utils.LoadConfig("config.yml")
if err != nil {
log.Fatal("Error loading config: ", err)
}
// Initialize MongoDB client
_, err = db.InitDB(config.MongoDB.URI, config.MongoDB.Database)
if err != nil {
log.Fatal("Error initializing database: ", err)
}
defer db.DisconnectDB()
// Initialize Rate Limiter
rateLimiter := utils.NewRateLimiter(
rate.Limit(config.RateLimit.WsLimit),
config.RateLimit.WsBurst,
rate.Limit(config.RateLimit.EventLimit),
config.RateLimit.EventBurst,
rate.Limit(config.RateLimit.ReqLimit),
config.RateLimit.ReqBurst,
)
for _, kindLimit := range config.RateLimit.KindLimits {
@ -46,21 +46,13 @@ func main() {
utils.SetRateLimiter(rateLimiter)
// Create a new ServeMux
mux := http.NewServeMux()
// Handle the root path
mux.HandleFunc("/", ListenAndServe)
// Serve static files
mux.Handle("/static/", http.StripPrefix("/static/", http.FileServer(http.Dir("web/static"))))
// Serve the favicon
mux.HandleFunc("/favicon.ico", func(w http.ResponseWriter, r *http.Request) {
http.ServeFile(w, r, "web/static/img/favicon.ico")
})
// Start the Relay
fmt.Printf("Server is running on http://localhost%s\n", config.Server.Port)
err = http.ListenAndServe(config.Server.Port, mux)
if err != nil {
@ -68,7 +60,6 @@ func main() {
}
}
// Listener serves both WebSocket and HTML
func ListenAndServe(w http.ResponseWriter, r *http.Request) {
if r.Header.Get("Upgrade") == "websocket" {
websocket.Handler(func(ws *websocket.Conn) {

View File

@ -6,15 +6,15 @@ GRAIN is an open-source Nostr relay implementation written in Go. This project a
## Features
- **NIP-01 Protocol Support**: GRAIN (nearly)fully supports the NIP-01 for WebSocket communication.
- **Event Processing**: Handles events of kind 0 (user metadata) and kind 1 (text note).
- **NIP-01 Protocol Support**: GRAIN (nearly)fully supports NIP-01 for WebSocket communication.
- **Event Processing**: Handles all events by category and kind.
- **MongoDB 🍃**: Utilizes MongoDB to store and manage events efficiently.
- **Scalability**: Built with Go, ensuring high performance and scalability.
- **Open Source**: Licensed under the MIT License, making it free to use and modify.
## Configuration
Configuration options can be set through environment variables or a configuration file.
Configuration options can be set through a configuration file.
There is an example config in this repo. Copy the example config to config.yml to get started
@ -22,15 +22,13 @@ There is an example config in this repo. Copy the example config to config.yml t
cp config.example.yml config.yml
```
### WebSocket Endpoints
- Connect: / - Clients can connect to this endpoint to start a WebSocket session.
- Publish Event: Send events of kind 0 (user metadata) or kind 1 (text note) to the relay.
### TODO
- Explicitely Handle more kinds
- configurable event purging for regular events
- Handle Kind 5 explicitely to delete Events from the Database
- Handle Ephemeral event
- configurable amount of time to keep ephemeral notes
- configurable event purging
- by category
- by kind
- by time since latest
- create whitelist/blacklist functionality
@ -40,9 +38,6 @@ cp config.example.yml config.yml
- npub
- kind int
- kind 1 wordlist
- Rate limit Events.
- by kind
- configurable in config.yml
### Development

View File

@ -1,3 +1,4 @@
// event.go
package handlers
import (
@ -38,7 +39,6 @@ func HandleEvent(ws *websocket.Conn, message []interface{}) {
return
}
// Call the HandleKind function
HandleKind(context.TODO(), evt, ws)
fmt.Println("Event processed:", evt.ID)
@ -77,8 +77,8 @@ func HandleKind(ctx context.Context, evt relay.Event, ws *websocket.Conn) {
category = "unknown"
}
if !rateLimiter.AllowEvent(evt.Kind, category) {
response.SendOK(ws, evt.ID, false, fmt.Sprintf("rate limit exceeded for category: %s", category))
if allowed, msg := rateLimiter.AllowEvent(evt.Kind, category); !allowed {
response.SendOK(ws, evt.ID, false, msg)
return
}
@ -99,7 +99,6 @@ func HandleKind(ctx context.Context, evt relay.Event, ws *websocket.Conn) {
case evt.Kind >= 10000 && evt.Kind < 20000:
err = kinds.HandleReplaceableKind(ctx, evt, collection, ws)
case evt.Kind >= 20000 && evt.Kind < 30000:
// Ephemeral events are not stored
fmt.Println("Ephemeral event received and ignored:", evt.ID)
case evt.Kind >= 30000 && evt.Kind < 40000:
err = kinds.HandleParameterizedReplaceableKind(ctx, evt, collection, ws)
@ -114,4 +113,3 @@ func HandleKind(ctx context.Context, evt relay.Event, ws *websocket.Conn) {
response.SendOK(ws, evt.ID, true, "")
}

View File

@ -4,6 +4,7 @@ import (
"encoding/json"
"fmt"
"grain/relay/handlers"
"grain/relay/utils"
"golang.org/x/net/websocket"
)
@ -12,6 +13,8 @@ func WebSocketHandler(ws *websocket.Conn) {
defer ws.Close()
var msg string
rateLimiter := utils.GetRateLimiter()
for {
err := websocket.Message.Receive(ws, &msg)
if err != nil {
@ -20,6 +23,11 @@ func WebSocketHandler(ws *websocket.Conn) {
}
fmt.Println("Received message:", msg)
if allowed, msg := rateLimiter.AllowWs(); !allowed {
websocket.Message.Send(ws, fmt.Sprintf(`{"error": "%s"}`, msg))
return
}
var message []interface{}
err = json.Unmarshal([]byte(msg), &message)
if err != nil {
@ -42,6 +50,10 @@ func WebSocketHandler(ws *websocket.Conn) {
case "EVENT":
handlers.HandleEvent(ws, message)
case "REQ":
if allowed, msg := rateLimiter.AllowReq(); !allowed {
websocket.Message.Send(ws, fmt.Sprintf(`{"error": "%s"}`, msg))
return
}
handlers.HandleReq(ws, message)
case "CLOSE":
handlers.HandleClose(ws, message)

View File

@ -27,6 +27,8 @@ type RateLimitConfig struct {
WsBurst int `yaml:"ws_burst"`
EventLimit float64 `yaml:"event_limit"`
EventBurst int `yaml:"event_burst"`
ReqLimit float64 `yaml:"req_limit"`
ReqBurst int `yaml:"req_burst"`
CategoryLimits map[string]KindLimitConfig `yaml:"category_limits"`
KindLimits []KindLimitConfig `yaml:"kind_limits"`
}

View File

@ -1,6 +1,8 @@
// rateLimiter.go
package utils
import (
"fmt"
"sync"
"golang.org/x/time/rate"
@ -21,6 +23,7 @@ type CategoryLimiter struct {
type RateLimiter struct {
wsLimiter *rate.Limiter
eventLimiter *rate.Limiter
reqLimiter *rate.Limiter
categoryLimiters map[string]*CategoryLimiter
kindLimiters map[int]*KindLimiter
mu sync.RWMutex
@ -39,40 +42,51 @@ func GetRateLimiter() *RateLimiter {
return rateLimiterInstance
}
func NewRateLimiter(wsLimit rate.Limit, wsBurst int, eventLimit rate.Limit, eventBurst int) *RateLimiter {
func NewRateLimiter(wsLimit rate.Limit, wsBurst int, eventLimit rate.Limit, eventBurst int, reqLimit rate.Limit, reqBurst int) *RateLimiter {
return &RateLimiter{
wsLimiter: rate.NewLimiter(wsLimit, wsBurst),
eventLimiter: rate.NewLimiter(eventLimit, eventBurst),
reqLimiter: rate.NewLimiter(reqLimit, reqBurst),
categoryLimiters: make(map[string]*CategoryLimiter),
kindLimiters: make(map[int]*KindLimiter),
}
}
func (rl *RateLimiter) AllowWs() bool {
return rl.wsLimiter.Allow()
func (rl *RateLimiter) AllowWs() (bool, string) {
if !rl.wsLimiter.Allow() {
return false, "WebSocket message rate limit exceeded"
}
return true, ""
}
func (rl *RateLimiter) AllowEvent(kind int, category string) bool {
func (rl *RateLimiter) AllowEvent(kind int, category string) (bool, string) {
rl.mu.RLock()
defer rl.mu.RUnlock()
if !rl.eventLimiter.Allow() {
return false
return false, "Global event rate limit exceeded"
}
if kindLimiter, exists := rl.kindLimiters[kind]; exists {
if !kindLimiter.Limiter.Allow() {
return false
return false, fmt.Sprintf("Rate limit exceeded for kind: %d", kind)
}
}
if categoryLimiter, exists := rl.categoryLimiters[category]; exists {
if !categoryLimiter.Limiter.Allow() {
return false
return false, fmt.Sprintf("Rate limit exceeded for category: %s", category)
}
}
return true
return true, ""
}
func (rl *RateLimiter) AllowReq() (bool, string) {
if !rl.reqLimiter.Allow() {
return false, "REQ rate limit exceeded"
}
return true, ""
}
func (rl *RateLimiter) AddCategoryLimit(category string, limit rate.Limit, burst int) {