added sendClose Func in req

This commit is contained in:
Chris kerr 2024-07-27 10:50:04 -04:00
parent ffbfe76b42
commit a7518e83a5

View File

@ -19,12 +19,14 @@ var subscriptions = make(map[string]relay.Subscription)
func HandleReq(ws *websocket.Conn, message []interface{}) { func HandleReq(ws *websocket.Conn, message []interface{}) {
if len(message) < 3 { if len(message) < 3 {
fmt.Println("Invalid REQ message format") fmt.Println("Invalid REQ message format")
SendClosed(ws, "", "invalid: invalid REQ message format")
return return
} }
subID, ok := message[1].(string) subID, ok := message[1].(string)
if !ok { if !ok {
fmt.Println("Invalid subscription ID format") fmt.Println("Invalid subscription ID format")
SendClosed(ws, "", "invalid: invalid subscription ID format")
return return
} }
@ -33,6 +35,7 @@ func HandleReq(ws *websocket.Conn, message []interface{}) {
filterData, ok := filter.(map[string]interface{}) filterData, ok := filter.(map[string]interface{})
if !ok { if !ok {
fmt.Println("Invalid filter format") fmt.Println("Invalid filter format")
SendClosed(ws, subID, "invalid: invalid filter format")
return return
} }
@ -48,6 +51,12 @@ func HandleReq(ws *websocket.Conn, message []interface{}) {
filters[i] = f filters[i] = f
} }
// Check if subscription already exists
if _, exists := subscriptions[subID]; exists {
SendClosed(ws, subID, "duplicate: subID already opened")
return
}
subscriptions[subID] = relay.Subscription{ID: subID, Filters: filters} subscriptions[subID] = relay.Subscription{ID: subID, Filters: filters}
fmt.Println("Subscription added:", subID) fmt.Println("Subscription added:", subID)
@ -55,6 +64,7 @@ func HandleReq(ws *websocket.Conn, message []interface{}) {
queriedEvents, err := QueryEvents(filters, db.GetClient(), "grain") queriedEvents, err := QueryEvents(filters, db.GetClient(), "grain")
if err != nil { if err != nil {
fmt.Println("Error querying events:", err) fmt.Println("Error querying events:", err)
SendClosed(ws, subID, "error: could not query events")
return return
} }
@ -64,6 +74,7 @@ func HandleReq(ws *websocket.Conn, message []interface{}) {
err = websocket.Message.Send(ws, string(msgBytes)) err = websocket.Message.Send(ws, string(msgBytes))
if err != nil { if err != nil {
fmt.Println("Error sending event:", err) fmt.Println("Error sending event:", err)
SendClosed(ws, subID, "error: could not send event")
return return
} }
} }
@ -74,6 +85,7 @@ func HandleReq(ws *websocket.Conn, message []interface{}) {
err = websocket.Message.Send(ws, string(eoseBytes)) err = websocket.Message.Send(ws, string(eoseBytes))
if err != nil { if err != nil {
fmt.Println("Error sending EOSE:", err) fmt.Println("Error sending EOSE:", err)
SendClosed(ws, subID, "error: could not send EOSE")
return return
} }
} }
@ -141,3 +153,9 @@ func QueryEvents(filters []relay.Filter, client *mongo.Client, databaseName stri
return results, nil return results, nil
} }
func SendClosed(ws *websocket.Conn, subID string, message string) {
closeMsg := []interface{}{"CLOSED", subID, message}
closeBytes, _ := json.Marshal(closeMsg)
websocket.Message.Send(ws, string(closeBytes))
}