handling import events with the same Event Handler

This commit is contained in:
0ceanSlim 2024-08-07 11:36:24 -04:00
parent 8d66e3decc
commit a2f7f0a5b4
2 changed files with 102 additions and 47 deletions

View File

@ -1,7 +1,6 @@
package api
import (
"context"
"encoding/json"
"fmt"
"html/template"
@ -9,11 +8,10 @@ import (
"log"
"net/http"
"strings"
"time"
"grain/server/db"
relay "grain/server/types"
"grain/config"
"go.mongodb.org/mongo-driver/mongo"
"golang.org/x/net/websocket"
)
@ -33,30 +31,41 @@ func ImportEvents(w http.ResponseWriter, r *http.Request) {
relayUrls := r.FormValue("relayUrls")
urls := strings.Split(relayUrls, ",")
var totalEvents int
var errorMessage string
totalEventsChan := make(chan int)
errorChan := make(chan error)
for _, url := range urls {
events, err := fetchEventsFromRelay(pubkey, url)
if err != nil {
log.Printf("Error fetching events from relay %s: %v", url, err)
errorMessage = fmt.Sprintf("Error fetching events from relay %s", url)
renderResult(w, false, errorMessage, 0)
return
go func() {
var totalEvents int
var err error
for _, url := range urls {
var events []map[string]interface{}
events, err = fetchEventsFromRelay(pubkey, url)
if err != nil {
errorChan <- fmt.Errorf("error fetching events from relay %s: %w", url, err)
return
}
err = sendEventsToRelay(events)
if err != nil {
errorChan <- fmt.Errorf("error sending events to relay: %w", err)
return
}
totalEvents += len(events)
}
err = storeEvents(events)
if err != nil {
log.Printf("Error storing events: %v", err)
errorMessage = "Error storing events"
renderResult(w, false, errorMessage, 0)
return
}
totalEventsChan <- totalEvents
}()
totalEvents += len(events)
select {
case totalEvents := <-totalEventsChan:
renderResult(w, true, "Events imported successfully", totalEvents)
case err := <-errorChan:
renderResult(w, false, err.Error(), 0)
case <-time.After(5 * time.Minute):
renderResult(w, false, "Timeout importing events", 0)
}
renderResult(w, true, "Events imported successfully", totalEvents)
}
func renderResult(w http.ResponseWriter, success bool, message string, count int) {
@ -83,7 +92,7 @@ func renderResult(w http.ResponseWriter, success bool, message string, count int
}
}
func fetchEventsFromRelay(pubkey, relayUrl string) ([]relay.Event, error) {
func fetchEventsFromRelay(pubkey, relayUrl string) ([]map[string]interface{}, error) {
log.Printf("Connecting to relay: %s", relayUrl)
conn, err := websocket.Dial(relayUrl, "", "http://localhost/")
if err != nil {
@ -100,7 +109,7 @@ func fetchEventsFromRelay(pubkey, relayUrl string) ([]relay.Event, error) {
return nil, err
}
var events []relay.Event
var events []map[string]interface{}
for {
var msg []byte
if err := websocket.Message.Receive(conn, &msg); err != nil {
@ -125,17 +134,12 @@ func fetchEventsFromRelay(pubkey, relayUrl string) ([]relay.Event, error) {
}
if response[0] == "EVENT" {
eventData, err := json.Marshal(response[2]) // Change index from 1 to 2
if err != nil {
log.Printf("Error marshaling event data from relay %s: %v", relayUrl, err)
eventData, ok := response[2].(map[string]interface{})
if !ok {
log.Printf("Invalid event data format from relay %s", relayUrl)
continue
}
var event relay.Event
if err := json.Unmarshal(eventData, &event); err != nil {
log.Printf("Error unmarshaling event data from relay %s: %v", relayUrl, err)
continue
}
events = append(events, event)
events = append(events, eventData)
}
}
@ -143,20 +147,58 @@ func fetchEventsFromRelay(pubkey, relayUrl string) ([]relay.Event, error) {
return events, nil
}
func storeEvents(events []relay.Event) error {
for _, event := range events {
collection := db.GetCollection(event.Kind)
_, err := collection.InsertOne(context.TODO(), event)
if err != nil {
if mongo.IsDuplicateKeyError(err) {
log.Printf("Duplicate event ID: %s for event kind: %d", event.ID, event.Kind)
} else {
log.Printf("Error inserting event with ID: %s for event kind: %d: %v", event.ID, event.Kind, err)
return err
}
} else {
log.Printf("Successfully inserted event with ID: %s for event kind: %d", event.ID, event.Kind)
func sendEventsToRelay(events []map[string]interface{}) error {
// Use the configuration to get the port
cfg, err := config.LoadConfig("config.yml")
if err != nil {
return fmt.Errorf("failed to load config: %w", err)
}
relayUrl := fmt.Sprintf("ws://localhost%s", cfg.Server.Port)
batchSize := 5 // Reduce the batch size to avoid connection issues
for i := 0; i < len(events); i += batchSize {
end := i + batchSize
if end > len(events) {
end = len(events)
}
batch := events[i:end]
if err := sendBatchToRelay(batch, relayUrl); err != nil {
return err
}
}
return nil
}
func sendBatchToRelay(events []map[string]interface{}, relayUrl string) error {
log.Printf("Connecting to local relay: %s", relayUrl)
conn, err := websocket.Dial(relayUrl, "", "http://localhost/")
if err != nil {
log.Printf("Error connecting to local relay: %v", err)
return err
}
defer conn.Close()
log.Printf("Connected to local relay: %s", relayUrl)
for _, event := range events {
eventMessage := []interface{}{"EVENT", event}
eventMessageBytes, err := json.Marshal(eventMessage)
if err != nil {
log.Printf("Error marshaling event message: %v", err)
return err
}
if _, err := conn.Write(eventMessageBytes); err != nil {
log.Printf("Error sending event message to local relay: %v", err)
return err
}
log.Printf("Sent event to local relay: %s", event["id"])
}
// Wait for a short period to avoid overloading the relay server
time.Sleep(100 * time.Millisecond)
return nil
}

View File

@ -49,5 +49,18 @@
>
Return to Dashboard
</button>
<script>
document
.getElementById("import-form")
.addEventListener("submit", function () {
document.getElementById("spinner").style.display = "block";
});
document
.getElementById("result")
.addEventListener("htmx:afterRequest", function () {
document.getElementById("spinner").style.display = "none";
});
</script>
</main>
{{end}}