mirror of
https://github.com/0ceanSlim/grain.git
synced 2024-11-22 08:37:13 +00:00
event size limits
This commit is contained in:
parent
4181ff5f94
commit
77f3e0314d
@ -6,18 +6,25 @@ server:
|
|||||||
port: ":8080" # Port for the server to listen on
|
port: ":8080" # Port for the server to listen on
|
||||||
|
|
||||||
rate_limit:
|
rate_limit:
|
||||||
ws_limit: 100 # Global rate limit for WebSocket messages (100 messages per second)
|
ws_limit: 100 # Global rate limit for WebSocket messages (50 messages per second)
|
||||||
ws_burst: 200 # Global burst limit for WebSocket messages (allows a burst of 200 messages)
|
ws_burst: 200 # Global burst limit for WebSocket messages (allows a burst of 100 messages)
|
||||||
|
|
||||||
event_limit: 50 # Global rate limit for events (50 events per second)
|
event_limit: 50 # Global rate limit for events (25 events per second)
|
||||||
event_burst: 100 # Global burst limit for events (allows a burst of 100 events)
|
event_burst: 100 # Global burst limit for events (allows a burst of 50 events)
|
||||||
req_limit: 50 # Added limit for REQ messages
|
req_limit: 50 # Added limit for REQ messages
|
||||||
req_burst: 100 # Added burst limit for REQ messages
|
req_burst: 100 # Added burst limit for REQ messages
|
||||||
|
|
||||||
|
max_event_size: 51200 # Global maximum event size in bytes (50Kb)
|
||||||
|
kind_size_limits:
|
||||||
|
- kind: 0
|
||||||
|
max_size: 10240 # Maximum event size for kind 0 in bytes (10Kb)
|
||||||
|
- kind: 1
|
||||||
|
max_size: 25600 # Maximum event size for kind 1 in bytes (25Kb)
|
||||||
|
|
||||||
category_limits: # Rate limits based on event categories
|
category_limits: # Rate limits based on event categories
|
||||||
regular:
|
regular:
|
||||||
limit: 25 # Rate limit for regular events (25 events per second)
|
limit: 25 # Rate limit for regular events (50 events per second)
|
||||||
burst: 50 # Burst limit for regular events (allows a burst of 50 events)
|
burst: 50 # Burst limit for regular events (allows a burst of 100 events)
|
||||||
replaceable:
|
replaceable:
|
||||||
limit: 10 # Rate limit for replaceable events (10 events per second)
|
limit: 10 # Rate limit for replaceable events (10 events per second)
|
||||||
burst: 20 # Burst limit for replaceable events (allows a burst of 20 events)
|
burst: 20 # Burst limit for replaceable events (allows a burst of 20 events)
|
||||||
@ -33,8 +40,8 @@ rate_limit:
|
|||||||
limit: 1 # Rate limit for events of kind 0 (1 event per second)
|
limit: 1 # Rate limit for events of kind 0 (1 event per second)
|
||||||
burst: 5 # Burst limit for events of kind 0 (allows a burst of 5 events)
|
burst: 5 # Burst limit for events of kind 0 (allows a burst of 5 events)
|
||||||
- kind: 1
|
- kind: 1
|
||||||
limit: 25 # Rate limit for events of kind 1 (25 events per second)
|
limit: 25 # Rate limit for events of kind 1 (100 events per second)
|
||||||
burst: 50 # Burst limit for events of kind 1 (allows a burst of 50 events)
|
burst: 50 # Burst limit for events of kind 1 (allows a burst of 200 events)
|
||||||
- kind: 3
|
- kind: 3
|
||||||
limit: 25 # Rate limit for events of kind 3 (25 events per second)
|
limit: 25 # Rate limit for events of kind 3 (25 events per second)
|
||||||
burst: 50 # Burst limit for events of kind 3 (allows a burst of 50 events)
|
burst: 50 # Burst limit for events of kind 3 (allows a burst of 50 events)
|
||||||
|
7
main.go
7
main.go
@ -45,6 +45,13 @@ func main() {
|
|||||||
|
|
||||||
utils.SetRateLimiter(rateLimiter)
|
utils.SetRateLimiter(rateLimiter)
|
||||||
|
|
||||||
|
sizeLimiter := utils.NewSizeLimiter(config.RateLimit.MaxEventSize)
|
||||||
|
for _, kindSizeLimit := range config.RateLimit.KindSizeLimits {
|
||||||
|
sizeLimiter.AddKindSizeLimit(kindSizeLimit.Kind, kindSizeLimit.MaxSize)
|
||||||
|
}
|
||||||
|
|
||||||
|
utils.SetSizeLimiter(sizeLimiter)
|
||||||
|
|
||||||
mux := http.NewServeMux()
|
mux := http.NewServeMux()
|
||||||
mux.HandleFunc("/", ListenAndServe)
|
mux.HandleFunc("/", ListenAndServe)
|
||||||
mux.Handle("/static/", http.StripPrefix("/static/", http.FileServer(http.Dir("web/static"))))
|
mux.Handle("/static/", http.StripPrefix("/static/", http.FileServer(http.Dir("web/static"))))
|
||||||
|
@ -38,12 +38,12 @@ func HandleEvent(ws *websocket.Conn, message []interface{}) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
HandleKind(context.TODO(), evt, ws)
|
HandleKind(context.TODO(), evt, ws, eventBytes)
|
||||||
|
|
||||||
fmt.Println("Event processed:", evt.ID)
|
fmt.Println("Event processed:", evt.ID)
|
||||||
}
|
}
|
||||||
|
|
||||||
func HandleKind(ctx context.Context, evt relay.Event, ws *websocket.Conn) {
|
func HandleKind(ctx context.Context, evt relay.Event, ws *websocket.Conn, eventBytes []byte) {
|
||||||
if !utils.CheckSignature(evt) {
|
if !utils.CheckSignature(evt) {
|
||||||
response.SendOK(ws, evt.ID, false, "invalid: signature verification failed")
|
response.SendOK(ws, evt.ID, false, "invalid: signature verification failed")
|
||||||
return
|
return
|
||||||
@ -52,6 +52,7 @@ func HandleKind(ctx context.Context, evt relay.Event, ws *websocket.Conn) {
|
|||||||
collection := db.GetCollection(evt.Kind)
|
collection := db.GetCollection(evt.Kind)
|
||||||
|
|
||||||
rateLimiter := utils.GetRateLimiter()
|
rateLimiter := utils.GetRateLimiter()
|
||||||
|
sizeLimiter := utils.GetSizeLimiter()
|
||||||
var category string
|
var category string
|
||||||
switch {
|
switch {
|
||||||
case evt.Kind == 0:
|
case evt.Kind == 0:
|
||||||
@ -81,6 +82,12 @@ func HandleKind(ctx context.Context, evt relay.Event, ws *websocket.Conn) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
eventSize := len(eventBytes) // Calculate event size
|
||||||
|
if allowed, msg := sizeLimiter.AllowSize(evt.Kind, eventSize); !allowed {
|
||||||
|
response.SendOK(ws, evt.ID, false, msg)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
switch {
|
switch {
|
||||||
case evt.Kind == 0:
|
case evt.Kind == 0:
|
||||||
|
@ -22,6 +22,11 @@ type LimitBurst struct {
|
|||||||
Burst int `yaml:"burst"`
|
Burst int `yaml:"burst"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type KindSizeLimitConfig struct {
|
||||||
|
Kind int `yaml:"kind"`
|
||||||
|
MaxSize int `yaml:"max_size"`
|
||||||
|
}
|
||||||
|
|
||||||
type RateLimitConfig struct {
|
type RateLimitConfig struct {
|
||||||
WsLimit float64 `yaml:"ws_limit"`
|
WsLimit float64 `yaml:"ws_limit"`
|
||||||
WsBurst int `yaml:"ws_burst"`
|
WsBurst int `yaml:"ws_burst"`
|
||||||
@ -29,6 +34,8 @@ type RateLimitConfig struct {
|
|||||||
EventBurst int `yaml:"event_burst"`
|
EventBurst int `yaml:"event_burst"`
|
||||||
ReqLimit float64 `yaml:"req_limit"`
|
ReqLimit float64 `yaml:"req_limit"`
|
||||||
ReqBurst int `yaml:"req_burst"`
|
ReqBurst int `yaml:"req_burst"`
|
||||||
|
MaxEventSize int `yaml:"max_event_size"`
|
||||||
|
KindSizeLimits []KindSizeLimitConfig `yaml:"kind_size_limits"`
|
||||||
CategoryLimits map[string]KindLimitConfig `yaml:"category_limits"`
|
CategoryLimits map[string]KindLimitConfig `yaml:"category_limits"`
|
||||||
KindLimits []KindLimitConfig `yaml:"kind_limits"`
|
KindLimits []KindLimitConfig `yaml:"kind_limits"`
|
||||||
}
|
}
|
||||||
|
60
relay/utils/sizeLimiter.go
Normal file
60
relay/utils/sizeLimiter.go
Normal file
@ -0,0 +1,60 @@
|
|||||||
|
package utils
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
)
|
||||||
|
|
||||||
|
type SizeLimiter struct {
|
||||||
|
globalMaxSize int
|
||||||
|
kindSizeLimits map[int]int
|
||||||
|
mu sync.RWMutex
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewSizeLimiter(globalMaxSize int) *SizeLimiter {
|
||||||
|
return &SizeLimiter{
|
||||||
|
globalMaxSize: globalMaxSize,
|
||||||
|
kindSizeLimits: make(map[int]int),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var sizeLimiterInstance *SizeLimiter
|
||||||
|
var sizeOnce sync.Once
|
||||||
|
|
||||||
|
func GetSizeLimiter() *SizeLimiter {
|
||||||
|
return sizeLimiterInstance
|
||||||
|
}
|
||||||
|
|
||||||
|
func SetSizeLimiter(sl *SizeLimiter) {
|
||||||
|
sizeOnce.Do(func() {
|
||||||
|
sizeLimiterInstance = sl
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sl *SizeLimiter) SetGlobalMaxSize(maxSize int) {
|
||||||
|
sl.mu.Lock()
|
||||||
|
defer sl.mu.Unlock()
|
||||||
|
sl.globalMaxSize = maxSize
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sl *SizeLimiter) AddKindSizeLimit(kind int, maxSize int) {
|
||||||
|
sl.mu.Lock()
|
||||||
|
defer sl.mu.Unlock()
|
||||||
|
sl.kindSizeLimits[kind] = maxSize
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sl *SizeLimiter) AllowSize(kind int, size int) (bool, string) {
|
||||||
|
sl.mu.RLock()
|
||||||
|
defer sl.mu.RUnlock()
|
||||||
|
|
||||||
|
if size > sl.globalMaxSize {
|
||||||
|
return false, "Global event size limit exceeded"
|
||||||
|
}
|
||||||
|
|
||||||
|
if maxSize, exists := sl.kindSizeLimits[kind]; exists {
|
||||||
|
if size > maxSize {
|
||||||
|
return false, "Event size limit exceeded for kind"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return true, ""
|
||||||
|
}
|
@ -44,6 +44,9 @@ func TestConfigValidity(t *testing.T) {
|
|||||||
if config.RateLimit.ReqBurst == 0 {
|
if config.RateLimit.ReqBurst == 0 {
|
||||||
t.Error("REQ burst is required")
|
t.Error("REQ burst is required")
|
||||||
}
|
}
|
||||||
|
if config.RateLimit.MaxEventSize == 0 {
|
||||||
|
t.Error("Global maximum event size is required")
|
||||||
|
}
|
||||||
|
|
||||||
// Check Category Limits
|
// Check Category Limits
|
||||||
if len(config.RateLimit.CategoryLimits) == 0 {
|
if len(config.RateLimit.CategoryLimits) == 0 {
|
||||||
@ -74,4 +77,11 @@ func TestConfigValidity(t *testing.T) {
|
|||||||
t.Errorf("Burst is required for kind: %d", kindLimit.Kind)
|
t.Errorf("Burst is required for kind: %d", kindLimit.Kind)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Validate kind size limits
|
||||||
|
for _, kindSizeLimit := range config.RateLimit.KindSizeLimits {
|
||||||
|
if kindSizeLimit.MaxSize == 0 {
|
||||||
|
t.Errorf("Maximum size is required for kind: %d", kindSizeLimit.Kind)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
74
tests/sizeLimits_test.go
Normal file
74
tests/sizeLimits_test.go
Normal file
@ -0,0 +1,74 @@
|
|||||||
|
package tests
|
||||||
|
|
||||||
|
import (
|
||||||
|
"grain/relay/utils"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestSizeLimiterGlobalMaxSize(t *testing.T) {
|
||||||
|
sizeLimiter := utils.NewSizeLimiter(1024) // Set global max size to 1024 bytes
|
||||||
|
|
||||||
|
// Test that an event within the global max size is allowed
|
||||||
|
if allowed, _ := sizeLimiter.AllowSize(0, 512); !allowed {
|
||||||
|
t.Error("Event within global max size should be allowed")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test that an event exceeding the global max size is not allowed
|
||||||
|
if allowed, msg := sizeLimiter.AllowSize(0, 2048); allowed {
|
||||||
|
t.Error("Event exceeding global max size should not be allowed")
|
||||||
|
} else {
|
||||||
|
expectedMsg := "Global event size limit exceeded"
|
||||||
|
if msg != expectedMsg {
|
||||||
|
t.Errorf("Expected message: %s, got: %s", expectedMsg, msg)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSizeLimiterKindSpecificSize(t *testing.T) {
|
||||||
|
sizeLimiter := utils.NewSizeLimiter(1024) // Set global max size to 1024 bytes
|
||||||
|
sizeLimiter.AddKindSizeLimit(1, 512) // Set max size for kind 1 to 512 bytes
|
||||||
|
|
||||||
|
// Test that an event within the kind-specific max size is allowed
|
||||||
|
if allowed, _ := sizeLimiter.AllowSize(1, 256); !allowed {
|
||||||
|
t.Error("Event within kind-specific max size should be allowed")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test that an event exceeding the kind-specific max size is not allowed
|
||||||
|
if allowed, msg := sizeLimiter.AllowSize(1, 1024); allowed {
|
||||||
|
t.Error("Event exceeding kind-specific max size should not be allowed")
|
||||||
|
} else {
|
||||||
|
expectedMsg := "Event size limit exceeded for kind"
|
||||||
|
if msg != expectedMsg {
|
||||||
|
t.Errorf("Expected message: %s, got: %s", expectedMsg, msg)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test that an event exceeding the global max size is not allowed even if within the kind-specific max size
|
||||||
|
if allowed, msg := sizeLimiter.AllowSize(1, 2048); allowed {
|
||||||
|
t.Error("Event exceeding global max size should not be allowed even if within kind-specific max size")
|
||||||
|
} else {
|
||||||
|
expectedMsg := "Global event size limit exceeded"
|
||||||
|
if msg != expectedMsg {
|
||||||
|
t.Errorf("Expected message: %s, got: %s", expectedMsg, msg)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSizeLimiterNoKindSpecificLimit(t *testing.T) {
|
||||||
|
sizeLimiter := utils.NewSizeLimiter(1024) // Set global max size to 1024 bytes
|
||||||
|
|
||||||
|
// Test that an event for a kind without a specific limit is governed by the global limit
|
||||||
|
if allowed, _ := sizeLimiter.AllowSize(2, 512); !allowed {
|
||||||
|
t.Error("Event within global max size should be allowed for kinds without specific limit")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test that an event exceeding the global max size is not allowed for kinds without a specific limit
|
||||||
|
if allowed, msg := sizeLimiter.AllowSize(2, 2048); allowed {
|
||||||
|
t.Error("Event exceeding global max size should not be allowed for kinds without specific limit")
|
||||||
|
} else {
|
||||||
|
expectedMsg := "Global event size limit exceeded"
|
||||||
|
if msg != expectedMsg {
|
||||||
|
t.Errorf("Expected message: %s, got: %s", expectedMsg, msg)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user