mirror of
https://github.com/0ceanSlim/grain.git
synced 2024-11-21 16:17:13 +00:00
added configurable rate limits
This commit is contained in:
parent
a61c539c54
commit
80e80c4215
@ -3,3 +3,20 @@ mongodb:
|
||||
database: "grain"
|
||||
server:
|
||||
port: ":8080"
|
||||
# Rate Limits Integers are per second
|
||||
# burst is an override for the limit, this is to handle spikes in traffic
|
||||
rate_limit:
|
||||
event_limit: 25
|
||||
event_burst: 50
|
||||
ws_limit: 50
|
||||
ws_burst: 100
|
||||
kind_limits:
|
||||
- kind: 0
|
||||
limit: 1
|
||||
burst: 5
|
||||
- kind: 1
|
||||
limit: 100
|
||||
burst: 200
|
||||
- kind: 3
|
||||
limit: 25
|
||||
burst: 50
|
||||
|
1
go.mod
1
go.mod
@ -23,4 +23,5 @@ require (
|
||||
golang.org/x/crypto v0.25.0 // indirect
|
||||
golang.org/x/sync v0.7.0 // indirect
|
||||
golang.org/x/text v0.16.0 // indirect
|
||||
golang.org/x/time v0.5.0
|
||||
)
|
||||
|
2
go.sum
2
go.sum
@ -54,6 +54,8 @@ golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
|
||||
golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
|
||||
golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4=
|
||||
golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI=
|
||||
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
|
||||
golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
|
||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
|
||||
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
|
||||
|
20
main.go
20
main.go
@ -7,12 +7,16 @@ import (
|
||||
|
||||
"grain/relay"
|
||||
"grain/relay/db"
|
||||
"grain/relay/handlers"
|
||||
"grain/relay/utils"
|
||||
"grain/web"
|
||||
|
||||
"golang.org/x/net/websocket"
|
||||
"golang.org/x/time/rate"
|
||||
)
|
||||
|
||||
var rl *utils.RateLimiter
|
||||
|
||||
func main() {
|
||||
// Load configuration
|
||||
config, err := utils.LoadConfig("config.yml")
|
||||
@ -27,6 +31,14 @@ func main() {
|
||||
}
|
||||
defer db.DisconnectDB()
|
||||
|
||||
// Initialize rate limiter
|
||||
rl = utils.NewRateLimiter(rate.Limit(config.RateLimit.EventLimit), config.RateLimit.EventBurst, rate.Limit(config.RateLimit.WsLimit), config.RateLimit.WsBurst)
|
||||
for _, kindLimit := range config.RateLimit.KindLimits {
|
||||
rl.AddKindLimit(kindLimit.Kind, rate.Limit(kindLimit.Limit), kindLimit.Burst)
|
||||
}
|
||||
|
||||
handlers.SetRateLimiter(rl)
|
||||
|
||||
// Create a new ServeMux
|
||||
mux := http.NewServeMux()
|
||||
|
||||
@ -52,7 +64,13 @@ func main() {
|
||||
// Listener serves both WebSocket and HTML
|
||||
func ListenAndServe(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Header.Get("Upgrade") == "websocket" {
|
||||
websocket.Handler(relay.WebSocketHandler).ServeHTTP(w, r)
|
||||
websocket.Handler(func(ws *websocket.Conn) {
|
||||
if !rl.AllowWs() {
|
||||
ws.Close()
|
||||
return
|
||||
}
|
||||
relay.WebSocketHandler(ws)
|
||||
}).ServeHTTP(w, r)
|
||||
} else {
|
||||
web.RootHandler(w, r)
|
||||
}
|
||||
|
@ -13,6 +13,12 @@ import (
|
||||
"golang.org/x/net/websocket"
|
||||
)
|
||||
|
||||
var rl *utils.RateLimiter
|
||||
|
||||
func SetRateLimiter(rateLimiter *utils.RateLimiter) {
|
||||
rl = rateLimiter
|
||||
}
|
||||
|
||||
func HandleEvent(ws *websocket.Conn, message []interface{}) {
|
||||
if len(message) != 2 {
|
||||
fmt.Println("Invalid EVENT message format")
|
||||
@ -37,6 +43,12 @@ func HandleEvent(ws *websocket.Conn, message []interface{}) {
|
||||
return
|
||||
}
|
||||
|
||||
// Check rate limits
|
||||
if !rl.AllowEvent(evt.Kind) {
|
||||
kinds.SendNotice(ws, evt.ID, "rate limit exceeded")
|
||||
return
|
||||
}
|
||||
|
||||
// Call the HandleKind function
|
||||
HandleKind(context.TODO(), evt, ws)
|
||||
|
||||
@ -83,5 +95,3 @@ func HandleKind(ctx context.Context, evt relay.Event, ws *websocket.Conn) {
|
||||
|
||||
sendOK(ws, evt.ID, true, "")
|
||||
}
|
||||
|
||||
|
||||
|
@ -24,7 +24,7 @@ func HandleKind0(ctx context.Context, evt relay.Event, collection *mongo.Collect
|
||||
if err != mongo.ErrNoDocuments {
|
||||
if existingEvent.CreatedAt >= evt.CreatedAt {
|
||||
// If the existing event is newer or the same, respond with a NOTICE
|
||||
sendNotice(ws, evt.PubKey, "relay already has a newer kind 0 event for this pubkey")
|
||||
SendNotice(ws, evt.PubKey, "relay already has a newer kind 0 event for this pubkey")
|
||||
return nil
|
||||
}
|
||||
}
|
||||
@ -49,4 +49,4 @@ func HandleKind0(ctx context.Context, evt relay.Event, collection *mongo.Collect
|
||||
|
||||
fmt.Println("Upserted event kind 0 into MongoDB:", evt.ID)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
@ -6,7 +6,8 @@ import (
|
||||
|
||||
"golang.org/x/net/websocket"
|
||||
)
|
||||
|
||||
func HandleKind2Deprecated(ctx context.Context, evt relay.Event, ws *websocket.Conn) error {
|
||||
sendNotice(ws, evt.PubKey, "kind 2 is deprecated, event not accepted to the relay, please use kind 10002 as defined in NIP-65")
|
||||
SendNotice(ws, evt.PubKey, "kind 2 is deprecated, event not accepted to the relay, please use kind 10002 as defined in NIP-65")
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
@ -6,7 +6,7 @@ import (
|
||||
"golang.org/x/net/websocket"
|
||||
)
|
||||
|
||||
func sendNotice(ws *websocket.Conn, pubKey, message string) {
|
||||
func SendNotice(ws *websocket.Conn, pubKey, message string) {
|
||||
notice := []interface{}{"NOTICE", pubKey, message}
|
||||
noticeBytes, _ := json.Marshal(notice)
|
||||
websocket.Message.Send(ws, string(noticeBytes))
|
||||
|
@ -21,7 +21,7 @@ func HandleReplaceableKind(ctx context.Context, evt relay.Event, collection *mon
|
||||
|
||||
if err != mongo.ErrNoDocuments {
|
||||
if existingEvent.CreatedAt > evt.CreatedAt || (existingEvent.CreatedAt == evt.CreatedAt && existingEvent.ID < evt.ID) {
|
||||
sendNotice(ws, evt.PubKey, "relay already has a newer kind 0 event for this pubkey")
|
||||
SendNotice(ws, evt.PubKey, "relay already has a newer kind 0 event for this pubkey")
|
||||
return nil
|
||||
}
|
||||
}
|
||||
@ -37,4 +37,4 @@ func HandleReplaceableKind(ctx context.Context, evt relay.Event, collection *mon
|
||||
|
||||
fmt.Printf("Upserted event kind %d into MongoDB: %s\n", evt.Kind, evt.ID)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
@ -28,7 +28,7 @@ func HandleParameterizedReplaceableKind(ctx context.Context, evt relay.Event, co
|
||||
|
||||
if err != mongo.ErrNoDocuments {
|
||||
if existingEvent.CreatedAt > evt.CreatedAt || (existingEvent.CreatedAt == evt.CreatedAt && existingEvent.ID < evt.ID) {
|
||||
sendNotice(ws, evt.PubKey, "relay already has a newer event for this pubkey and d tag")
|
||||
SendNotice(ws, evt.PubKey, "relay already has a newer event for this pubkey and d tag")
|
||||
return nil
|
||||
}
|
||||
}
|
||||
@ -44,4 +44,4 @@ func HandleParameterizedReplaceableKind(ctx context.Context, evt relay.Event, co
|
||||
|
||||
fmt.Printf("Upserted event kind %d into MongoDB: %s\n", evt.Kind, evt.ID)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
@ -6,6 +6,20 @@ import (
|
||||
"gopkg.in/yaml.v2"
|
||||
)
|
||||
|
||||
type RateLimitConfig struct {
|
||||
EventLimit float64 `yaml:"event_limit"`
|
||||
EventBurst int `yaml:"event_burst"`
|
||||
WsLimit float64 `yaml:"ws_limit"`
|
||||
WsBurst int `yaml:"ws_burst"`
|
||||
KindLimits []KindLimitConfig `yaml:"kind_limits"`
|
||||
}
|
||||
|
||||
type KindLimitConfig struct {
|
||||
Kind int `yaml:"kind"`
|
||||
Limit float64 `yaml:"limit"`
|
||||
Burst int `yaml:"burst"`
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
MongoDB struct {
|
||||
URI string `yaml:"uri"`
|
||||
@ -14,6 +28,7 @@ type Config struct {
|
||||
Server struct {
|
||||
Port string `yaml:"port"`
|
||||
} `yaml:"server"`
|
||||
RateLimit RateLimitConfig `yaml:"rate_limit"`
|
||||
}
|
||||
|
||||
func LoadConfig(filename string) (*Config, error) {
|
||||
|
59
relay/utils/rateLimiter.go
Normal file
59
relay/utils/rateLimiter.go
Normal file
@ -0,0 +1,59 @@
|
||||
package utils
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"golang.org/x/time/rate"
|
||||
)
|
||||
|
||||
type KindLimiter struct {
|
||||
Limiter *rate.Limiter
|
||||
Limit rate.Limit
|
||||
Burst int
|
||||
}
|
||||
|
||||
type RateLimiter struct {
|
||||
eventLimiter *rate.Limiter
|
||||
wsLimiter *rate.Limiter
|
||||
kindLimiters map[int]*KindLimiter
|
||||
mu sync.RWMutex
|
||||
}
|
||||
|
||||
func NewRateLimiter(eventLimit rate.Limit, eventBurst int, wsLimit rate.Limit, wsBurst int) *RateLimiter {
|
||||
return &RateLimiter{
|
||||
eventLimiter: rate.NewLimiter(eventLimit, eventBurst),
|
||||
wsLimiter: rate.NewLimiter(wsLimit, wsBurst),
|
||||
kindLimiters: make(map[int]*KindLimiter),
|
||||
}
|
||||
}
|
||||
|
||||
func (rl *RateLimiter) AddKindLimit(kind int, limit rate.Limit, burst int) {
|
||||
rl.mu.Lock()
|
||||
defer rl.mu.Unlock()
|
||||
rl.kindLimiters[kind] = &KindLimiter{
|
||||
Limiter: rate.NewLimiter(limit, burst),
|
||||
Limit: limit,
|
||||
Burst: burst,
|
||||
}
|
||||
}
|
||||
|
||||
func (rl *RateLimiter) AllowEvent(kind int) bool {
|
||||
rl.mu.RLock()
|
||||
defer rl.mu.RUnlock()
|
||||
|
||||
if !rl.eventLimiter.Allow() {
|
||||
return false
|
||||
}
|
||||
|
||||
if kindLimiter, exists := rl.kindLimiters[kind]; exists {
|
||||
if !kindLimiter.Limiter.Allow() {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func (rl *RateLimiter) AllowWs() bool {
|
||||
return rl.wsLimiter.Allow()
|
||||
}
|
BIN
web/favicon.ico
Normal file
BIN
web/favicon.ico
Normal file
Binary file not shown.
After Width: | Height: | Size: 66 KiB |
Loading…
Reference in New Issue
Block a user