diff --git a/relay/handlers/close.go b/relay/handlers/close.go new file mode 100644 index 0000000..9a62ca6 --- /dev/null +++ b/relay/handlers/close.go @@ -0,0 +1,32 @@ +package handlers + +import ( + "encoding/json" + "fmt" + + "golang.org/x/net/websocket" +) + +func HandleClose(ws *websocket.Conn, message []interface{}) { + if len(message) != 2 { + fmt.Println("Invalid CLOSE message format") + return + } + + subID, ok := message[1].(string) + if !ok { + fmt.Println("Invalid subscription ID format") + return + } + + delete(subscriptions, subID) + fmt.Println("Subscription closed:", subID) + + closeMsg := []interface{}{"CLOSED", subID, "Subscription closed"} + closeBytes, _ := json.Marshal(closeMsg) + err := websocket.Message.Send(ws, string(closeBytes)) + if err != nil { + fmt.Println("Error sending CLOSE message:", err) + return + } +} diff --git a/relay/handlers/event.go b/relay/handlers/event.go new file mode 100644 index 0000000..574729a --- /dev/null +++ b/relay/handlers/event.go @@ -0,0 +1,76 @@ +package handlers + +import ( + "context" + "encoding/json" + "fmt" + "grain/relay/db" + "grain/relay/kinds" + "grain/relay/utils" + + relay "grain/relay/types" + + "golang.org/x/net/websocket" +) + +func HandleEvent(ws *websocket.Conn, message []interface{}) { + if len(message) != 2 { + fmt.Println("Invalid EVENT message format") + return + } + + eventData, ok := message[1].(map[string]interface{}) + if !ok { + fmt.Println("Invalid event data format") + return + } + eventBytes, err := json.Marshal(eventData) + if err != nil { + fmt.Println("Error marshaling event data:", err) + return + } + + var evt relay.Event + err = json.Unmarshal(eventBytes, &evt) + if err != nil { + fmt.Println("Error unmarshaling event data:", err) + return + } + + // Call the HandleKind function + HandleKind(context.TODO(), evt, ws) + + fmt.Println("Event processed:", evt.ID) +} + +func HandleKind(ctx context.Context, evt relay.Event, ws *websocket.Conn) { + if !utils.CheckSignature(evt) { + OKResponse(ws, evt.ID, false, "invalid: signature verification failed") + return + } + + collection := db.GetCollection(evt.Kind) + + var err error + switch evt.Kind { + case 0: + err = kinds.HandleKind0(ctx, evt, collection) + case 1: + err = kinds.HandleKind1(ctx, evt, collection) + default: + err = kinds.HandleUnknownKind(ctx, evt, collection) + } + + if err != nil { + OKResponse(ws, evt.ID, false, fmt.Sprintf("error: %v", err)) + return + } + + OKResponse(ws, evt.ID, true, "") +} + +func OKResponse(ws *websocket.Conn, eventID string, status bool, message string) { + response := []interface{}{"OK", eventID, status, message} + responseBytes, _ := json.Marshal(response) + websocket.Message.Send(ws, string(responseBytes)) +} diff --git a/relay/handlers/req.go b/relay/handlers/req.go new file mode 100644 index 0000000..328d88b --- /dev/null +++ b/relay/handlers/req.go @@ -0,0 +1,137 @@ +package handlers + +import ( + "context" + "encoding/json" + "fmt" + "grain/relay/db" + relay "grain/relay/types" + "grain/relay/utils" + + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + "golang.org/x/net/websocket" +) + +var subscriptions = make(map[string]relay.Subscription) + +func HandleReq(ws *websocket.Conn, message []interface{}) { + if len(message) < 3 { + fmt.Println("Invalid REQ message format") + return + } + + subID, ok := message[1].(string) + if !ok { + fmt.Println("Invalid subscription ID format") + return + } + + filters := make([]relay.Filter, len(message)-2) + for i, filter := range message[2:] { + filterData, ok := filter.(map[string]interface{}) + if !ok { + fmt.Println("Invalid filter format") + return + } + + var f relay.Filter + f.IDs = utils.ToStringArray(filterData["ids"]) + f.Authors = utils.ToStringArray(filterData["authors"]) + f.Kinds = utils.ToIntArray(filterData["kinds"]) + f.Tags = utils.ToTagsMap(filterData["tags"]) + f.Since = utils.ToTime(filterData["since"]) + f.Until = utils.ToTime(filterData["until"]) + f.Limit = utils.ToInt(filterData["limit"]) + + filters[i] = f + } + + subscriptions[subID] = relay.Subscription{ID: subID, Filters: filters} + fmt.Println("Subscription added:", subID) + + // Query the database with filters and send back the results + // TO DO why is this taking a certain kind as an argument for collection??? + queriedEvents, err := QueryEvents(filters, db.GetClient(), "grain", "event-kind1") + if err != nil { + fmt.Println("Error querying events:", err) + return + } + + for _, evt := range queriedEvents { + msg := []interface{}{"EVENT", subID, evt} + msgBytes, _ := json.Marshal(msg) + err = websocket.Message.Send(ws, string(msgBytes)) + if err != nil { + fmt.Println("Error sending event:", err) + return + } + } + + // Indicate end of stored events + eoseMsg := []interface{}{"EOSE", subID} + eoseBytes, _ := json.Marshal(eoseMsg) + err = websocket.Message.Send(ws, string(eoseBytes)) + if err != nil { + fmt.Println("Error sending EOSE:", err) + return + } +} +// QueryEvents queries events from the MongoDB collection based on filters +func QueryEvents(filters []relay.Filter, client *mongo.Client, databaseName, collectionName string) ([]relay.Event, error) { + collection := client.Database(databaseName).Collection(collectionName) + + var results []relay.Event + + for _, filter := range filters { + filterBson := bson.M{} + + if len(filter.IDs) > 0 { + filterBson["_id"] = bson.M{"$in": filter.IDs} + } + if len(filter.Authors) > 0 { + filterBson["author"] = bson.M{"$in": filter.Authors} + } + if len(filter.Kinds) > 0 { + filterBson["kind"] = bson.M{"$in": filter.Kinds} + } + if filter.Tags != nil { + for key, values := range filter.Tags { + if len(values) > 0 { + filterBson[key] = bson.M{"$in": values} + } + } + } + if filter.Since != nil { + filterBson["created_at"] = bson.M{"$gte": *filter.Since} + } + if filter.Until != nil { + filterBson["created_at"] = bson.M{"$lte": *filter.Until} + } + + opts := options.Find() + if filter.Limit != nil { + opts.SetLimit(int64(*filter.Limit)) + } + + cursor, err := collection.Find(context.TODO(), filterBson, opts) + if err != nil { + return nil, fmt.Errorf("error querying events: %v", err) + } + defer cursor.Close(context.TODO()) + + for cursor.Next(context.TODO()) { + var event relay.Event + if err := cursor.Decode(&event); err != nil { + return nil, fmt.Errorf("error decoding event: %v", err) + } + results = append(results, event) + } + if err := cursor.Err(); err != nil { + return nil, fmt.Errorf("cursor error: %v", err) + } + } + + return results, nil +} diff --git a/relay/query.go b/relay/query.go deleted file mode 100644 index 51f8ce1..0000000 --- a/relay/query.go +++ /dev/null @@ -1,70 +0,0 @@ -package relay - -import ( - "context" - "fmt" - - relay "grain/relay/types" - - "go.mongodb.org/mongo-driver/bson" - "go.mongodb.org/mongo-driver/mongo" - "go.mongodb.org/mongo-driver/mongo/options" -) - -// QueryEvents queries events from the MongoDB collection based on filters -func QueryEvents(filters []relay.Filter, client *mongo.Client, databaseName, collectionName string) ([]relay.Event, error) { - collection := client.Database(databaseName).Collection(collectionName) - - var results []relay.Event - - for _, filter := range filters { - filterBson := bson.M{} - - if len(filter.IDs) > 0 { - filterBson["_id"] = bson.M{"$in": filter.IDs} - } - if len(filter.Authors) > 0 { - filterBson["author"] = bson.M{"$in": filter.Authors} - } - if len(filter.Kinds) > 0 { - filterBson["kind"] = bson.M{"$in": filter.Kinds} - } - if filter.Tags != nil { - for key, values := range filter.Tags { - if len(values) > 0 { - filterBson[key] = bson.M{"$in": values} - } - } - } - if filter.Since != nil { - filterBson["created_at"] = bson.M{"$gte": *filter.Since} - } - if filter.Until != nil { - filterBson["created_at"] = bson.M{"$lte": *filter.Until} - } - - opts := options.Find() - if filter.Limit != nil { - opts.SetLimit(int64(*filter.Limit)) - } - - cursor, err := collection.Find(context.TODO(), filterBson, opts) - if err != nil { - return nil, fmt.Errorf("error querying events: %v", err) - } - defer cursor.Close(context.TODO()) - - for cursor.Next(context.TODO()) { - var event relay.Event - if err := cursor.Decode(&event); err != nil { - return nil, fmt.Errorf("error decoding event: %v", err) - } - results = append(results, event) - } - if err := cursor.Err(); err != nil { - return nil, fmt.Errorf("cursor error: %v", err) - } - } - - return results, nil -} diff --git a/relay/relay.go b/relay/relay.go index eae5427..86035dd 100644 --- a/relay/relay.go +++ b/relay/relay.go @@ -1,19 +1,13 @@ package relay import ( - "context" "encoding/json" "fmt" - "grain/relay/db" - "grain/relay/kinds" - relay "grain/relay/types" - "grain/relay/utils" + "grain/relay/handlers" "golang.org/x/net/websocket" ) -var subscriptions = make(map[string]relay.Subscription) - func Listener(ws *websocket.Conn) { defer ws.Close() @@ -46,162 +40,13 @@ func Listener(ws *websocket.Conn) { switch messageType { case "EVENT": - handleEvent(ws, message) + handlers.HandleEvent(ws, message) case "REQ": - handleReq(ws, message) + handlers.HandleReq(ws, message) case "CLOSE": - handleClose(ws, message) + handlers.HandleClose(ws, message) default: fmt.Println("Unknown message type:", messageType) } } } - -func handleEvent(ws *websocket.Conn, message []interface{}) { - if len(message) != 2 { - fmt.Println("Invalid EVENT message format") - return - } - - eventData, ok := message[1].(map[string]interface{}) - if !ok { - fmt.Println("Invalid event data format") - return - } - eventBytes, err := json.Marshal(eventData) - if err != nil { - fmt.Println("Error marshaling event data:", err) - return - } - - var evt relay.Event - err = json.Unmarshal(eventBytes, &evt) - if err != nil { - fmt.Println("Error unmarshaling event data:", err) - return - } - - // Call the HandleKind function - HandleKind(context.TODO(), evt, ws) - - fmt.Println("Event processed:", evt.ID) -} - -func HandleKind(ctx context.Context, evt relay.Event, ws *websocket.Conn) { - if !utils.CheckSignature(evt) { - sendOKResponse(ws, evt.ID, false, "invalid: signature verification failed") - return - } - - collection := db.GetCollection(evt.Kind) - - var err error - switch evt.Kind { - case 0: - err = kinds.HandleKind0(ctx, evt, collection) - case 1: - err = kinds.HandleKind1(ctx, evt, collection) - default: - err = kinds.HandleUnknownKind(ctx, evt, collection) - } - - if err != nil { - sendOKResponse(ws, evt.ID, false, fmt.Sprintf("error: %v", err)) - return - } - - sendOKResponse(ws, evt.ID, true, "") -} - -func sendOKResponse(ws *websocket.Conn, eventID string, status bool, message string) { - response := []interface{}{"OK", eventID, status, message} - responseBytes, _ := json.Marshal(response) - websocket.Message.Send(ws, string(responseBytes)) -} - -func handleReq(ws *websocket.Conn, message []interface{}) { - if len(message) < 3 { - fmt.Println("Invalid REQ message format") - return - } - - subID, ok := message[1].(string) - if !ok { - fmt.Println("Invalid subscription ID format") - return - } - - filters := make([]relay.Filter, len(message)-2) - for i, filter := range message[2:] { - filterData, ok := filter.(map[string]interface{}) - if !ok { - fmt.Println("Invalid filter format") - return - } - - var f relay.Filter - f.IDs = utils.ToStringArray(filterData["ids"]) - f.Authors = utils.ToStringArray(filterData["authors"]) - f.Kinds = utils.ToIntArray(filterData["kinds"]) - f.Tags = utils.ToTagsMap(filterData["tags"]) - f.Since = utils.ToTime(filterData["since"]) - f.Until = utils.ToTime(filterData["until"]) - f.Limit = utils.ToInt(filterData["limit"]) - - filters[i] = f - } - - subscriptions[subID] = relay.Subscription{ID: subID, Filters: filters} - fmt.Println("Subscription added:", subID) - - // Query the database with filters and send back the results - // TO DO why is this taking a certain kind as an argument for collection??? - queriedEvents, err := QueryEvents(filters, db.GetClient(), "grain", "event-kind1") - if err != nil { - fmt.Println("Error querying events:", err) - return - } - - for _, evt := range queriedEvents { - msg := []interface{}{"EVENT", subID, evt} - msgBytes, _ := json.Marshal(msg) - err = websocket.Message.Send(ws, string(msgBytes)) - if err != nil { - fmt.Println("Error sending event:", err) - return - } - } - - // Indicate end of stored events - eoseMsg := []interface{}{"EOSE", subID} - eoseBytes, _ := json.Marshal(eoseMsg) - err = websocket.Message.Send(ws, string(eoseBytes)) - if err != nil { - fmt.Println("Error sending EOSE:", err) - return - } -} - -func handleClose(ws *websocket.Conn, message []interface{}) { - if len(message) != 2 { - fmt.Println("Invalid CLOSE message format") - return - } - - subID, ok := message[1].(string) - if !ok { - fmt.Println("Invalid subscription ID format") - return - } - - delete(subscriptions, subID) - fmt.Println("Subscription closed:", subID) - - closeMsg := []interface{}{"CLOSED", subID, "Subscription closed"} - closeBytes, _ := json.Marshal(closeMsg) - err := websocket.Message.Send(ws, string(closeBytes)) - if err != nil { - fmt.Println("Error sending CLOSE message:", err) - return - } -}