From 7e5e3758c362e2ed634857568bea6fdd418b9b48 Mon Sep 17 00:00:00 2001 From: Chris kerr Date: Tue, 12 Nov 2024 21:37:43 -0500 Subject: [PATCH] event purging refactor --- config/types/eventPurging.go | 18 ++--- server/db/mongo/purgeEvents.go | 95 +++++++++++++++++--------- server/handlers/event.go | 19 +----- server/utils/determineEventCategory.go | 18 +++++ 4 files changed, 89 insertions(+), 61 deletions(-) create mode 100644 server/utils/determineEventCategory.go diff --git a/config/types/eventPurging.go b/config/types/eventPurging.go index 4434be8..c049d85 100644 --- a/config/types/eventPurging.go +++ b/config/types/eventPurging.go @@ -1,15 +1,11 @@ package config type EventPurgeConfig struct { - Enabled bool `yaml:"enabled"` - KeepDurationDays int `yaml:"keep_duration_days"` - PurgeIntervalHours int `yaml:"purge_interval_hours"` - PurgeByCategory map[string]bool `yaml:"purge_by_category"` - PurgeByKind []KindPurgeRule `yaml:"purge_by_kind"` - ExcludeWhitelisted bool `yaml:"exclude_whitelisted"` -} - -type KindPurgeRule struct { - Kind int `yaml:"kind"` - Enabled bool `yaml:"enabled"` + Enabled bool `yaml:"enabled"` + KeepIntervalHours int `yaml:"keep_interval_hours"` + PurgeIntervalMinutes int `yaml:"purge_interval_minutes"` + PurgeByCategory map[string]bool `yaml:"purge_by_category"` + PurgeByKindEnabled bool `yaml:"purge_by_kind_enabled"` + KindsToPurge []int `yaml:"kinds_to_purge"` + ExcludeWhitelisted bool `yaml:"exclude_whitelisted"` } diff --git a/server/db/mongo/purgeEvents.go b/server/db/mongo/purgeEvents.go index 8bb6db7..9a3c305 100644 --- a/server/db/mongo/purgeEvents.go +++ b/server/db/mongo/purgeEvents.go @@ -5,10 +5,13 @@ import ( "grain/config" types "grain/config/types" nostr "grain/server/types" + "grain/server/utils" "log" + "strconv" "time" "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" ) // PurgeOldEvents removes old events based on the configuration and a list of whitelisted pubkeys. @@ -18,49 +21,77 @@ func PurgeOldEvents(cfg *types.EventPurgeConfig) { } client := GetClient() - collection := client.Database("grain").Collection("events") + cutoff := time.Now().Add(-time.Duration(cfg.KeepIntervalHours) * time.Hour).Unix() + var collectionsToPurge []string - // Calculate the cutoff time - cutoff := time.Now().AddDate(0, 0, -cfg.KeepDurationDays).Unix() - - // Create the base filter for fetching old events - baseFilter := bson.M{ - "created_at": bson.M{"$lt": cutoff}, // Filter for events older than the cutoff + // Determine collections to purge + if cfg.PurgeByKindEnabled { + for _, kind := range cfg.KindsToPurge { + collectionsToPurge = append(collectionsToPurge, "event-kind"+strconv.Itoa(kind)) + } + } else { + // If `purge_by_kind_enabled` is false, add all potential event kinds or find dynamically + collectionsToPurge = getAllEventCollections(client) } - cursor, err := collection.Find(context.TODO(), baseFilter) - if err != nil { - log.Printf("Error fetching old events for purging: %v", err) - return - } - defer cursor.Close(context.TODO()) + for _, collectionName := range collectionsToPurge { + collection := client.Database("grain").Collection(collectionName) + baseFilter := bson.M{"created_at": bson.M{"$lt": cutoff}} - for cursor.Next(context.TODO()) { - var evt nostr.Event - if err := cursor.Decode(&evt); err != nil { - log.Printf("Error decoding event: %v", err) - continue - } - - // Check if the event's pubkey is whitelisted and skip purging if configured to do so - if cfg.ExcludeWhitelisted && config.IsPubKeyWhitelisted(evt.PubKey) { - log.Printf("Skipping purging for whitelisted event ID: %s, pubkey: %s", evt.ID, evt.PubKey) - continue - } - - // Proceed with deleting the event if it is not whitelisted - _, err := collection.DeleteOne(context.TODO(), bson.M{"id": evt.ID}) + cursor, err := collection.Find(context.TODO(), baseFilter) if err != nil { - log.Printf("Error purging event ID %s: %v", evt.ID, err) - } else { - log.Printf("Purged event ID: %s", evt.ID) + log.Printf("Error fetching old events for purging from %s: %v", collectionName, err) + continue + } + defer cursor.Close(context.TODO()) + + for cursor.Next(context.TODO()) { + var evt nostr.Event + if err := cursor.Decode(&evt); err != nil { + log.Printf("Error decoding event from %s: %v", collectionName, err) + continue + } + + // Skip if the pubkey is whitelisted + if cfg.ExcludeWhitelisted && config.IsPubKeyWhitelisted(evt.PubKey) { + log.Printf("Skipping purging for whitelisted event ID: %s, pubkey: %s", evt.ID, evt.PubKey) + continue + } + + // Check if purging by category is enabled and if the event matches the allowed category + category := utils.DetermineEventCategory(evt.Kind) + if purge, exists := cfg.PurgeByCategory[category]; exists && purge { + _, err := collection.DeleteOne(context.TODO(), bson.M{"id": evt.ID}) + if err != nil { + log.Printf("Error purging event ID %s from %s: %v", evt.ID, collectionName, err) + } else { + log.Printf("Purged event ID: %s from %s", evt.ID, collectionName) + } + } } } } +// getAllEventCollections returns a list of all event collections if purging all kinds. +func getAllEventCollections(client *mongo.Client) []string { + var collections []string + collectionNames, err := client.Database("grain").ListCollectionNames(context.TODO(), bson.M{}) + if err != nil { + log.Printf("Error listing collection names: %v", err) + return collections + } + + for _, name := range collectionNames { + if len(name) > 10 && name[:10] == "event-kind" { + collections = append(collections, name) + } + } + return collections +} + // ScheduleEventPurging runs the event purging at a configurable interval. func ScheduleEventPurging(cfg *types.ServerConfig) { - purgeInterval := time.Duration(cfg.EventPurge.PurgeIntervalHours) * time.Hour + purgeInterval := time.Duration(cfg.EventPurge.PurgeIntervalMinutes) * time.Minute ticker := time.NewTicker(purgeInterval) defer ticker.Stop() diff --git a/server/handlers/event.go b/server/handlers/event.go index ea0cc65..0ab8da6 100644 --- a/server/handlers/event.go +++ b/server/handlers/event.go @@ -183,7 +183,7 @@ func handleBlacklistAndWhitelist(ws *websocket.Conn, evt nostr.Event) bool { func handleRateAndSizeLimits(ws *websocket.Conn, evt nostr.Event, eventSize int) bool { rateLimiter := config.GetRateLimiter() sizeLimiter := config.GetSizeLimiter() - category := determineCategory(evt.Kind) + category := utils.DetermineEventCategory(evt.Kind) if allowed, msg := rateLimiter.AllowEvent(evt.Kind, category); !allowed { response.SendOK(ws, evt.ID, false, msg) @@ -197,20 +197,3 @@ func handleRateAndSizeLimits(ws *websocket.Conn, evt nostr.Event, eventSize int) return true } - -func determineCategory(kind int) string { - switch { - case kind == 0, kind == 3, kind >= 10000 && kind < 20000: - return "replaceable" - case kind == 1, kind >= 4 && kind < 45, kind >= 1000 && kind < 10000: - return "regular" - case kind == 2: - return "deprecated" - case kind >= 20000 && kind < 30000: - return "ephemeral" - case kind >= 30000 && kind < 40000: - return "parameterized_replaceable" - default: - return "unknown" - } -} diff --git a/server/utils/determineEventCategory.go b/server/utils/determineEventCategory.go new file mode 100644 index 0000000..812bdb2 --- /dev/null +++ b/server/utils/determineEventCategory.go @@ -0,0 +1,18 @@ +package utils + +func DetermineEventCategory(kind int) string { + switch { + case kind == 0, kind == 3, kind >= 10000 && kind < 20000: + return "replaceable" + case kind == 1, kind >= 4 && kind < 45, kind >= 1000 && kind < 10000: + return "regular" + case kind == 2: + return "deprecated" + case kind >= 20000 && kind < 30000: + return "ephemeral" + case kind >= 30000 && kind < 40000: + return "parameterized_replaceable" + default: + return "unknown" + } +}