mirror of
https://github.com/0ceanSlim/grain.git
synced 2024-11-22 00:27:14 +00:00
properly update subscriptions, sub ID's per connection
This commit is contained in:
parent
ae2f98aafc
commit
5ea0d36ba3
@ -17,7 +17,7 @@ import (
|
|||||||
|
|
||||||
var subscriptions = make(map[string]relay.Subscription)
|
var subscriptions = make(map[string]relay.Subscription)
|
||||||
|
|
||||||
func HandleReq(ws *websocket.Conn, message []interface{}) {
|
func HandleReq(ws *websocket.Conn, message []interface{}, subscriptions map[string][]relay.Filter) {
|
||||||
if len(message) < 3 {
|
if len(message) < 3 {
|
||||||
fmt.Println("Invalid REQ message format")
|
fmt.Println("Invalid REQ message format")
|
||||||
response.SendClosed(ws, "", "invalid: invalid REQ message format")
|
response.SendClosed(ws, "", "invalid: invalid REQ message format")
|
||||||
@ -52,14 +52,9 @@ func HandleReq(ws *websocket.Conn, message []interface{}) {
|
|||||||
filters[i] = f
|
filters[i] = f
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check if subscription already exists
|
// Update or add the subscription for the given subID
|
||||||
if _, exists := subscriptions[subID]; exists {
|
subscriptions[subID] = filters
|
||||||
response.SendClosed(ws, subID, "duplicate: subID already opened")
|
fmt.Println("Subscription updated:", subID)
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
subscriptions[subID] = relay.Subscription{ID: subID, Filters: filters}
|
|
||||||
fmt.Println("Subscription added:", subID)
|
|
||||||
|
|
||||||
// Query the database with filters and send back the results
|
// Query the database with filters and send back the results
|
||||||
queriedEvents, err := QueryEvents(filters, db.GetClient(), "grain")
|
queriedEvents, err := QueryEvents(filters, db.GetClient(), "grain")
|
||||||
|
@ -7,6 +7,8 @@ import (
|
|||||||
|
|
||||||
"grain/config"
|
"grain/config"
|
||||||
|
|
||||||
|
relay "grain/server/types"
|
||||||
|
|
||||||
"golang.org/x/net/websocket"
|
"golang.org/x/net/websocket"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -16,11 +18,12 @@ func WebSocketHandler(ws *websocket.Conn) {
|
|||||||
var msg string
|
var msg string
|
||||||
rateLimiter := config.GetRateLimiter()
|
rateLimiter := config.GetRateLimiter()
|
||||||
|
|
||||||
|
subscriptions := make(map[string][]relay.Filter) // Subscription map scoped to the connection
|
||||||
|
|
||||||
for {
|
for {
|
||||||
err := websocket.Message.Receive(ws, &msg)
|
err := websocket.Message.Receive(ws, &msg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println("Error receiving message:", err)
|
fmt.Println("Error receiving message:", err)
|
||||||
// Send a close message with an error code and reason
|
|
||||||
ws.Close()
|
ws.Close()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -59,7 +62,7 @@ func WebSocketHandler(ws *websocket.Conn) {
|
|||||||
ws.Close()
|
ws.Close()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
handlers.HandleReq(ws, message)
|
handlers.HandleReq(ws, message, subscriptions)
|
||||||
case "CLOSE":
|
case "CLOSE":
|
||||||
handlers.HandleClose(ws, message)
|
handlers.HandleClose(ws, message)
|
||||||
default:
|
default:
|
||||||
|
Loading…
Reference in New Issue
Block a user