mirror of
https://github.com/0ceanSlim/grain.git
synced 2024-11-22 08:37:13 +00:00
dynamically making events in database
This commit is contained in:
parent
51a7fd7924
commit
0557a7e93b
@ -26,24 +26,16 @@ type Event struct {
|
|||||||
Sig string `json:"sig"`
|
Sig string `json:"sig"`
|
||||||
}
|
}
|
||||||
|
|
||||||
var collections = make(map[int]*mongo.Collection)
|
var (
|
||||||
|
client *mongo.Client
|
||||||
|
collections = make(map[int]*mongo.Collection)
|
||||||
|
)
|
||||||
|
|
||||||
func InitCollections(client *mongo.Client, kinds ...int) {
|
func SetClient(mongoClient *mongo.Client) {
|
||||||
for _, kind := range kinds {
|
client = mongoClient
|
||||||
collectionName := fmt.Sprintf("event-kind%d", kind)
|
|
||||||
collections[kind] = client.Database("grain").Collection(collectionName)
|
|
||||||
indexModel := mongo.IndexModel{
|
|
||||||
Keys: bson.D{{Key: "id", Value: 1}},
|
|
||||||
Options: options.Index().SetUnique(true),
|
|
||||||
}
|
|
||||||
_, err := collections[kind].Indexes().CreateOne(context.TODO(), indexModel)
|
|
||||||
if err != nil {
|
|
||||||
fmt.Printf("Failed to create index on %s: %v\n", collectionName, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetCollection(kind int, client *mongo.Client) *mongo.Collection {
|
func GetCollection(kind int) *mongo.Collection {
|
||||||
if collection, exists := collections[kind]; exists {
|
if collection, exists := collections[kind]; exists {
|
||||||
return collection
|
return collection
|
||||||
}
|
}
|
||||||
@ -61,13 +53,13 @@ func GetCollection(kind int, client *mongo.Client) *mongo.Collection {
|
|||||||
return collection
|
return collection
|
||||||
}
|
}
|
||||||
|
|
||||||
func HandleEvent(ctx context.Context, evt Event, client *mongo.Client, ws *websocket.Conn) {
|
func HandleEvent(ctx context.Context, evt Event, ws *websocket.Conn) {
|
||||||
if !ValidateEvent(evt) {
|
if !CheckSignature(evt) {
|
||||||
sendOKResponse(ws, evt.ID, false, "invalid: signature verification failed")
|
sendOKResponse(ws, evt.ID, false, "invalid: signature verification failed")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
collection := GetCollection(evt.Kind, client)
|
collection := GetCollection(evt.Kind)
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
switch evt.Kind {
|
switch evt.Kind {
|
||||||
@ -76,7 +68,7 @@ func HandleEvent(ctx context.Context, evt Event, client *mongo.Client, ws *webso
|
|||||||
case 1:
|
case 1:
|
||||||
err = HandleEventKind1(ctx, evt, collection)
|
err = HandleEventKind1(ctx, evt, collection)
|
||||||
default:
|
default:
|
||||||
err = HandleDefaultEvent(ctx, evt, collection)
|
err = HandleUnknownEvent(ctx, evt, collection)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -106,8 +98,7 @@ func SerializeEvent(evt Event) []byte {
|
|||||||
return serializedEvent
|
return serializedEvent
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func CheckSignature(evt Event) bool {
|
||||||
func ValidateEvent(evt Event) bool {
|
|
||||||
serializedEvent := SerializeEvent(evt)
|
serializedEvent := SerializeEvent(evt)
|
||||||
hash := sha256.Sum256(serializedEvent)
|
hash := sha256.Sum256(serializedEvent)
|
||||||
eventID := hex.EncodeToString(hash[:])
|
eventID := hex.EncodeToString(hash[:])
|
||||||
@ -154,13 +145,3 @@ func ValidateEvent(evt Event) bool {
|
|||||||
|
|
||||||
return verified
|
return verified
|
||||||
}
|
}
|
||||||
|
|
||||||
func HandleDefaultEvent(ctx context.Context, evt Event, collection *mongo.Collection) error {
|
|
||||||
_, err := collection.InsertOne(ctx, evt)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("Error inserting default event into MongoDB: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
fmt.Println("Inserted default event into MongoDB:", evt.ID)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
18
events/unhandled.go
Normal file
18
events/unhandled.go
Normal file
@ -0,0 +1,18 @@
|
|||||||
|
package events
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"go.mongodb.org/mongo-driver/mongo"
|
||||||
|
)
|
||||||
|
|
||||||
|
func HandleUnknownEvent(ctx context.Context, evt Event, collection *mongo.Collection) error {
|
||||||
|
_, err := collection.InsertOne(ctx, evt)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("Error inserting unknown event into MongoDB: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Println("Inserted unknown event into MongoDB:", evt.ID)
|
||||||
|
return nil
|
||||||
|
}
|
4
main.go
4
main.go
@ -27,8 +27,8 @@ func main() {
|
|||||||
}
|
}
|
||||||
defer db.DisconnectDB(client)
|
defer db.DisconnectDB(client)
|
||||||
|
|
||||||
// Initialize collections
|
// Initialize collections (dynamically handled in the events package)
|
||||||
events.InitCollections(client, 0, 1) // Initialize known kinds
|
events.SetClient(client)
|
||||||
|
|
||||||
server.SetClient(client)
|
server.SetClient(client)
|
||||||
|
|
||||||
|
@ -5,9 +5,8 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"grain/events"
|
"grain/events"
|
||||||
"time"
|
|
||||||
|
|
||||||
"grain/utils"
|
"grain/utils"
|
||||||
|
"time"
|
||||||
|
|
||||||
"go.mongodb.org/mongo-driver/mongo"
|
"go.mongodb.org/mongo-driver/mongo"
|
||||||
"golang.org/x/net/websocket"
|
"golang.org/x/net/websocket"
|
||||||
@ -34,6 +33,7 @@ var client *mongo.Client
|
|||||||
|
|
||||||
func SetClient(mongoClient *mongo.Client) {
|
func SetClient(mongoClient *mongo.Client) {
|
||||||
client = mongoClient
|
client = mongoClient
|
||||||
|
events.SetClient(mongoClient) // Ensure the events package has the MongoDB client
|
||||||
}
|
}
|
||||||
|
|
||||||
func Handler(ws *websocket.Conn) {
|
func Handler(ws *websocket.Conn) {
|
||||||
@ -103,7 +103,8 @@ func handleEvent(ws *websocket.Conn, message []interface{}) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
events.HandleEvent(context.TODO(), evt, client, ws)
|
// Call the HandleEvent function from the events package
|
||||||
|
events.HandleEvent(context.TODO(), evt, ws)
|
||||||
|
|
||||||
fmt.Println("Event processed:", evt.ID)
|
fmt.Println("Event processed:", evt.ID)
|
||||||
}
|
}
|
||||||
@ -144,13 +145,13 @@ func handleReq(ws *websocket.Conn, message []interface{}) {
|
|||||||
fmt.Println("Subscription added:", subID)
|
fmt.Println("Subscription added:", subID)
|
||||||
|
|
||||||
// Query the database with filters and send back the results
|
// Query the database with filters and send back the results
|
||||||
events, err := QueryEvents(filters, client, "grain", "event-kind1")
|
queriedEvents, err := QueryEvents(filters, client, "grain", "event-kind1")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println("Error querying events:", err)
|
fmt.Println("Error querying events:", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, evt := range events {
|
for _, evt := range queriedEvents {
|
||||||
msg := []interface{}{"EVENT", subID, evt}
|
msg := []interface{}{"EVENT", subID, evt}
|
||||||
msgBytes, _ := json.Marshal(msg)
|
msgBytes, _ := json.Marshal(msg)
|
||||||
err = websocket.Message.Send(ws, string(msgBytes))
|
err = websocket.Message.Send(ws, string(msgBytes))
|
||||||
|
@ -11,10 +11,6 @@ type Config struct {
|
|||||||
URI string `yaml:"uri"`
|
URI string `yaml:"uri"`
|
||||||
Database string `yaml:"database"`
|
Database string `yaml:"database"`
|
||||||
} `yaml:"mongodb"`
|
} `yaml:"mongodb"`
|
||||||
Collections struct {
|
|
||||||
EventKind0 string `yaml:"event_kind0"`
|
|
||||||
EventKind1 string `yaml:"event_kind1"`
|
|
||||||
} `yaml:"collections"`
|
|
||||||
Server struct {
|
Server struct {
|
||||||
Address string `yaml:"address"`
|
Address string `yaml:"address"`
|
||||||
} `yaml:"server"`
|
} `yaml:"server"`
|
||||||
|
Loading…
Reference in New Issue
Block a user