mirror of
https://github.com/0ceanSlim/grain.git
synced 2024-11-22 00:27:14 +00:00
move kinds to db package, refactor new mongo package
This commit is contained in:
parent
a1801da490
commit
d1b3750c87
6
main.go
6
main.go
@ -7,7 +7,7 @@ import (
|
|||||||
"grain/config"
|
"grain/config"
|
||||||
configTypes "grain/config/types"
|
configTypes "grain/config/types"
|
||||||
relay "grain/server"
|
relay "grain/server"
|
||||||
"grain/server/db"
|
"grain/server/db/mongo"
|
||||||
"grain/server/utils"
|
"grain/server/utils"
|
||||||
"log"
|
"log"
|
||||||
"net/http"
|
"net/http"
|
||||||
@ -41,7 +41,7 @@ func main() {
|
|||||||
|
|
||||||
config.SetResourceLimit(&cfg.ResourceLimits) // Apply limits once before starting the server
|
config.SetResourceLimit(&cfg.ResourceLimits) // Apply limits once before starting the server
|
||||||
|
|
||||||
client, err := db.InitDB(cfg)
|
client, err := mongo.InitDB(cfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal("Error initializing database: ", err)
|
log.Fatal("Error initializing database: ", err)
|
||||||
}
|
}
|
||||||
@ -71,7 +71,7 @@ func main() {
|
|||||||
case <-signalChan:
|
case <-signalChan:
|
||||||
log.Println("Shutting down server...")
|
log.Println("Shutting down server...")
|
||||||
server.Close() // Stop the server
|
server.Close() // Stop the server
|
||||||
db.DisconnectDB(client) // Disconnect from MongoDB
|
mongo.DisconnectDB(client) // Disconnect from MongoDB
|
||||||
wg.Wait() // Wait for all goroutines to finish
|
wg.Wait() // Wait for all goroutines to finish
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
package db
|
package mongo
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
@ -12,7 +12,7 @@ import (
|
|||||||
"golang.org/x/net/websocket"
|
"golang.org/x/net/websocket"
|
||||||
)
|
)
|
||||||
|
|
||||||
func HandleKind5(ctx context.Context, evt relay.Event, dbClient *mongo.Client, ws *websocket.Conn) error {
|
func HandleDeleteKind(ctx context.Context, evt relay.Event, dbClient *mongo.Client, ws *websocket.Conn) error {
|
||||||
for _, tag := range evt.Tags {
|
for _, tag := range evt.Tags {
|
||||||
if len(tag) < 2 {
|
if len(tag) < 2 {
|
||||||
continue
|
continue
|
@ -8,7 +8,7 @@ import (
|
|||||||
"golang.org/x/net/websocket"
|
"golang.org/x/net/websocket"
|
||||||
)
|
)
|
||||||
|
|
||||||
func HandleKind2(ctx context.Context, evt relay.Event, ws *websocket.Conn) error {
|
func HandleDeprecatedKind(ctx context.Context, evt relay.Event, ws *websocket.Conn) error {
|
||||||
|
|
||||||
// Send an OK message to indicate the event was not accepted
|
// Send an OK message to indicate the event was not accepted
|
||||||
response.SendOK(ws, evt.ID, false, "invalid: kind 2 is deprecated, use kind 10002 (NIP65)")
|
response.SendOK(ws, evt.ID, false, "invalid: kind 2 is deprecated, use kind 10002 (NIP65)")
|
@ -1,4 +1,4 @@
|
|||||||
package db
|
package mongo
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
@ -1,9 +1,9 @@
|
|||||||
package db
|
package mongo
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"grain/server/handlers/kinds"
|
"grain/server/db/mongo/kinds"
|
||||||
"grain/server/handlers/response"
|
"grain/server/handlers/response"
|
||||||
nostr "grain/server/types"
|
nostr "grain/server/types"
|
||||||
|
|
||||||
@ -16,15 +16,15 @@ func StoreMongoEvent(ctx context.Context, evt nostr.Event, ws *websocket.Conn) {
|
|||||||
var err error
|
var err error
|
||||||
switch {
|
switch {
|
||||||
case evt.Kind == 0:
|
case evt.Kind == 0:
|
||||||
err = kinds.HandleKind0(ctx, evt, collection, ws)
|
err = kinds.HandleReplaceableKind(ctx, evt, collection, ws)
|
||||||
case evt.Kind == 1:
|
case evt.Kind == 1:
|
||||||
err = kinds.HandleKind1(ctx, evt, collection, ws)
|
err = kinds.HandleRegularKind(ctx, evt, collection, ws)
|
||||||
case evt.Kind == 2:
|
case evt.Kind == 2:
|
||||||
err = kinds.HandleKind2(ctx, evt, ws)
|
err = kinds.HandleDeprecatedKind(ctx, evt, ws)
|
||||||
case evt.Kind == 3:
|
case evt.Kind == 3:
|
||||||
err = kinds.HandleReplaceableKind(ctx, evt, collection, ws)
|
err = kinds.HandleReplaceableKind(ctx, evt, collection, ws)
|
||||||
case evt.Kind == 5:
|
case evt.Kind == 5:
|
||||||
err = kinds.HandleKind5(ctx, evt, GetClient(), ws)
|
err = kinds.HandleDeleteKind(ctx, evt, GetClient(), ws)
|
||||||
case evt.Kind >= 4 && evt.Kind < 45:
|
case evt.Kind >= 4 && evt.Kind < 45:
|
||||||
err = kinds.HandleRegularKind(ctx, evt, collection, ws)
|
err = kinds.HandleRegularKind(ctx, evt, collection, ws)
|
||||||
case evt.Kind >= 1000 && evt.Kind < 10000:
|
case evt.Kind >= 1000 && evt.Kind < 10000:
|
@ -5,7 +5,7 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"grain/config"
|
"grain/config"
|
||||||
"grain/server/db"
|
"grain/server/db/mongo"
|
||||||
|
|
||||||
"grain/server/handlers/response"
|
"grain/server/handlers/response"
|
||||||
"grain/server/utils"
|
"grain/server/utils"
|
||||||
@ -61,7 +61,7 @@ func HandleEvent(ws *websocket.Conn, message []interface{}) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// This is where I'll handle storage for multiple database types in the future
|
// This is where I'll handle storage for multiple database types in the future
|
||||||
db.StoreMongoEvent(context.TODO(), evt, ws)
|
mongo.StoreMongoEvent(context.TODO(), evt, ws)
|
||||||
|
|
||||||
fmt.Println("Event processed:", evt.ID)
|
fmt.Println("Event processed:", evt.ID)
|
||||||
|
|
||||||
|
@ -1,50 +0,0 @@
|
|||||||
package kinds
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"fmt"
|
|
||||||
"grain/server/handlers/response"
|
|
||||||
relay "grain/server/types"
|
|
||||||
|
|
||||||
"go.mongodb.org/mongo-driver/bson"
|
|
||||||
"go.mongodb.org/mongo-driver/mongo"
|
|
||||||
"go.mongodb.org/mongo-driver/mongo/options"
|
|
||||||
"golang.org/x/net/websocket"
|
|
||||||
)
|
|
||||||
|
|
||||||
func HandleKind0(ctx context.Context, evt relay.Event, collection *mongo.Collection, ws *websocket.Conn) error {
|
|
||||||
filter := bson.M{"pubkey": evt.PubKey}
|
|
||||||
var existingEvent relay.Event
|
|
||||||
err := collection.FindOne(ctx, filter).Decode(&existingEvent)
|
|
||||||
if err != nil && err != mongo.ErrNoDocuments {
|
|
||||||
return fmt.Errorf("error finding existing event: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err != mongo.ErrNoDocuments {
|
|
||||||
if existingEvent.CreatedAt >= evt.CreatedAt {
|
|
||||||
response.SendOK(ws, evt.ID, false, "blocked: a newer kind 0 event already exists for this pubkey")
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
update := bson.M{
|
|
||||||
"$set": bson.M{
|
|
||||||
"id": evt.ID,
|
|
||||||
"created_at": evt.CreatedAt,
|
|
||||||
"kind": evt.Kind,
|
|
||||||
"tags": evt.Tags,
|
|
||||||
"content": evt.Content,
|
|
||||||
"sig": evt.Sig,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
opts := options.Update().SetUpsert(true)
|
|
||||||
_, err = collection.UpdateOne(ctx, filter, update, opts)
|
|
||||||
if err != nil {
|
|
||||||
response.SendOK(ws, evt.ID, false, "error: could not connect to the database")
|
|
||||||
return fmt.Errorf("error updating/inserting event kind 0 into MongoDB: %v", err)
|
|
||||||
}
|
|
||||||
response.SendOK(ws, evt.ID, true, "")
|
|
||||||
fmt.Println("Upserted event kind 0 into MongoDB:", evt.ID)
|
|
||||||
return nil
|
|
||||||
}
|
|
@ -1,24 +0,0 @@
|
|||||||
// kinds/kind1.go
|
|
||||||
package kinds
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"fmt"
|
|
||||||
"grain/server/handlers/response"
|
|
||||||
relay "grain/server/types"
|
|
||||||
|
|
||||||
"go.mongodb.org/mongo-driver/mongo"
|
|
||||||
"golang.org/x/net/websocket"
|
|
||||||
)
|
|
||||||
|
|
||||||
func HandleKind1(ctx context.Context, evt relay.Event, collection *mongo.Collection, ws *websocket.Conn) error {
|
|
||||||
_, err := collection.InsertOne(ctx, evt)
|
|
||||||
if err != nil {
|
|
||||||
response.SendOK(ws, evt.ID, false, "error: could not connect to the database")
|
|
||||||
return fmt.Errorf("error inserting event into MongoDB: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
fmt.Println("Inserted event kind 1 into MongoDB:", evt.ID)
|
|
||||||
response.SendOK(ws, evt.ID, true, "")
|
|
||||||
return nil
|
|
||||||
}
|
|
@ -4,7 +4,7 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"grain/config"
|
"grain/config"
|
||||||
"grain/server/db"
|
"grain/server/db/mongo"
|
||||||
"grain/server/handlers/response"
|
"grain/server/handlers/response"
|
||||||
relay "grain/server/types"
|
relay "grain/server/types"
|
||||||
"grain/server/utils"
|
"grain/server/utils"
|
||||||
@ -113,7 +113,7 @@ func processRequest(ws *websocket.Conn, message []interface{}) {
|
|||||||
fmt.Printf("Subscription updated: %s with %d filters\n", subID, len(filters))
|
fmt.Printf("Subscription updated: %s with %d filters\n", subID, len(filters))
|
||||||
|
|
||||||
// Query the database with filters and send back the results
|
// Query the database with filters and send back the results
|
||||||
queriedEvents, err := db.QueryEvents(filters, db.GetClient(), "grain")
|
queriedEvents, err := mongo.QueryEvents(filters, mongo.GetClient(), "grain")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println("Error querying events:", err)
|
fmt.Println("Error querying events:", err)
|
||||||
response.SendClosed(ws, subID, "error: could not query events")
|
response.SendClosed(ws, subID, "error: could not query events")
|
||||||
|
Loading…
Reference in New Issue
Block a user