grain/server/relay.go

115 lines
2.5 KiB
Go
Raw Normal View History

2024-07-23 17:30:29 +00:00
package relay
2024-07-20 12:41:47 +00:00
import (
"encoding/json"
"fmt"
2024-07-30 15:27:38 +00:00
"grain/config"
"grain/server/handlers"
relay "grain/server/types"
"grain/server/utils"
"log"
"sync"
2024-07-20 12:41:47 +00:00
"golang.org/x/net/websocket"
)
// Process-wide connection bookkeeping shared by all WebSocketHandler calls.
var (
	// mu is the lock WebSocketHandler takes while updating the counters below.
	mu sync.Mutex

	// currentConnections is the number of connections currently counted
	// against the configured Server.MaxConnections limit.
	currentConnections int

	// clientSubscriptions holds, per connection, the number of REQ messages
	// counted against Server.MaxSubscriptionsPerClient.
	clientSubscriptions = map[*websocket.Conn]int{}
)
2024-07-25 13:57:24 +00:00
func WebSocketHandler(ws *websocket.Conn) {
defer func() {
mu.Lock()
currentConnections--
delete(clientSubscriptions, ws)
mu.Unlock()
ws.Close()
}()
mu.Lock()
if currentConnections >= config.GetConfig().Server.MaxConnections {
websocket.Message.Send(ws, `{"error": "too many connections"}`)
mu.Unlock()
return
}
currentConnections++
mu.Unlock()
2024-08-09 13:03:14 +00:00
clientInfo := utils.ClientInfo{
IP: utils.GetClientIP(ws.Request()),
UserAgent: ws.Request().Header.Get("User-Agent"),
Origin: ws.Request().Header.Get("Origin"),
}
log.Printf("New connection from IP: %s, User-Agent: %s, Origin: %s", clientInfo.IP, clientInfo.UserAgent, clientInfo.Origin)
2024-07-20 12:41:47 +00:00
var msg string
2024-07-30 15:27:38 +00:00
rateLimiter := config.GetRateLimiter()
2024-07-26 14:02:34 +00:00
subscriptions := make(map[string][]relay.Filter) // Subscription map scoped to the connection
clientSubscriptions[ws] = 0
2024-07-20 12:41:47 +00:00
for {
err := websocket.Message.Receive(ws, &msg)
if err != nil {
fmt.Println("Error receiving message:", err)
2024-07-30 13:31:26 +00:00
ws.Close()
2024-07-20 12:41:47 +00:00
return
}
fmt.Println("Received message:", msg)
2024-07-26 14:02:34 +00:00
if allowed, msg := rateLimiter.AllowWs(); !allowed {
websocket.Message.Send(ws, fmt.Sprintf(`{"error": "%s"}`, msg))
2024-07-30 13:31:26 +00:00
ws.Close()
2024-07-26 14:02:34 +00:00
return
}
2024-07-20 12:41:47 +00:00
var message []interface{}
err = json.Unmarshal([]byte(msg), &message)
if err != nil {
fmt.Println("Error parsing message:", err)
continue
}
if len(message) < 2 {
fmt.Println("Invalid message format")
continue
}
messageType, ok := message[0].(string)
if !ok {
fmt.Println("Invalid message type")
continue
}
switch messageType {
case "EVENT":
2024-07-25 13:57:24 +00:00
handlers.HandleEvent(ws, message)
2024-07-20 12:41:47 +00:00
case "REQ":
mu.Lock()
if clientSubscriptions[ws] >= config.GetConfig().Server.MaxSubscriptionsPerClient {
websocket.Message.Send(ws, `{"error": "too many subscriptions"}`)
mu.Unlock()
continue
}
clientSubscriptions[ws]++
mu.Unlock()
2024-07-26 14:02:34 +00:00
if allowed, msg := rateLimiter.AllowReq(); !allowed {
websocket.Message.Send(ws, fmt.Sprintf(`{"error": "%s"}`, msg))
2024-07-30 13:31:26 +00:00
ws.Close()
2024-07-26 14:02:34 +00:00
return
}
handlers.HandleReq(ws, message, subscriptions)
2024-07-20 12:41:47 +00:00
case "CLOSE":
handlers.HandleClose(ws, message)
2024-07-20 12:41:47 +00:00
default:
fmt.Println("Unknown message type:", messageType)
}
}
2024-07-25 13:57:24 +00:00
}