diff --git a/relay/handlers/req.go b/relay/handlers/req.go index 75edfa1..fd4fd39 100644 --- a/relay/handlers/req.go +++ b/relay/handlers/req.go @@ -19,12 +19,14 @@ var subscriptions = make(map[string]relay.Subscription) func HandleReq(ws *websocket.Conn, message []interface{}) { if len(message) < 3 { fmt.Println("Invalid REQ message format") + SendClosed(ws, "", "invalid: invalid REQ message format") return } subID, ok := message[1].(string) if !ok { fmt.Println("Invalid subscription ID format") + SendClosed(ws, "", "invalid: invalid subscription ID format") return } @@ -33,6 +35,7 @@ func HandleReq(ws *websocket.Conn, message []interface{}) { filterData, ok := filter.(map[string]interface{}) if !ok { fmt.Println("Invalid filter format") + SendClosed(ws, subID, "invalid: invalid filter format") return } @@ -48,6 +51,12 @@ func HandleReq(ws *websocket.Conn, message []interface{}) { filters[i] = f } + // Check if subscription already exists + if _, exists := subscriptions[subID]; exists { + SendClosed(ws, subID, "duplicate: subID already opened") + return + } + subscriptions[subID] = relay.Subscription{ID: subID, Filters: filters} fmt.Println("Subscription added:", subID) @@ -55,6 +64,7 @@ func HandleReq(ws *websocket.Conn, message []interface{}) { queriedEvents, err := QueryEvents(filters, db.GetClient(), "grain") if err != nil { fmt.Println("Error querying events:", err) + SendClosed(ws, subID, "error: could not query events") return } @@ -64,6 +74,7 @@ func HandleReq(ws *websocket.Conn, message []interface{}) { err = websocket.Message.Send(ws, string(msgBytes)) if err != nil { fmt.Println("Error sending event:", err) + SendClosed(ws, subID, "error: could not send event") return } } @@ -74,6 +85,7 @@ func HandleReq(ws *websocket.Conn, message []interface{}) { err = websocket.Message.Send(ws, string(eoseBytes)) if err != nil { fmt.Println("Error sending EOSE:", err) + SendClosed(ws, subID, "error: could not send EOSE") return } } @@ -141,3 +153,9 @@ func QueryEvents(filters []relay.Filter, client *mongo.Client, databaseName stri return results, nil } + +func SendClosed(ws *websocket.Conn, subID string, message string) { + closeMsg := []interface{}{"CLOSED", subID, message} + closeBytes, _ := json.Marshal(closeMsg) + websocket.Message.Send(ws, string(closeBytes)) +}