diff --git a/api/event.go b/api/event.go index faf0ea26..992de703 100644 --- a/api/event.go +++ b/api/event.go @@ -7,77 +7,45 @@ import ( "time" "github.com/Gleipnir-Technology/nidus-sync/platform" + "github.com/google/uuid" "github.com/rs/zerolog/log" ) var connectionsSSE map[*ConnectionSSE]bool = make(map[*ConnectionSSE]bool, 0) -func streamEvents(w http.ResponseWriter, r *http.Request, u platform.User) { - // Set headers for SSE - w.Header().Set("Content-Type", "text/event-stream") - w.Header().Set("Cache-Control", "no-cache") - w.Header().Set("Connection", "keep-alive") - w.Header().Set("Access-Control-Allow-Origin", "*") - - connection := ConnectionSSE{ - chanState: make(chan MessageSSE), - id: fmt.Sprintf("%d", time.Now().UnixNano()), - } - connectionsSSE[&connection] = true - // Send an initial connected event - fmt.Fprintf(w, "event: connected\ndata: {\"status\": \"connected\", \"time\": \"%s\"}\n\n", time.Now().Format(time.RFC3339)) - w.(http.Flusher).Flush() - - // Keep the connection open with a ticker sending periodic events - ticker := time.NewTicker(5 * time.Second) - defer ticker.Stop() - - // Use a channel to detect when the client disconnects - done := r.Context().Done() - - // Keep connection open until client disconnects - var err error - for { - err = nil - select { - case <-done: - log.Info().Msg("Client closed connection") - return - case t := <-ticker.C: - // Send a heartbeat message - err = connection.SendHeartbeat(w, t) - } - if err != nil { - log.Error().Err(err).Msg("Failed to send state from webserver") - } - } -} - -type MessageHeartbeat struct { - Time time.Time `json:"time"` -} -type MessageSSE struct { - Content any `json:"content"` - Type string `json:"type"` -} type ConnectionSSE struct { - chanState chan MessageSSE - id string + chanEvent chan platform.Event + id uuid.UUID + organizationID int32 + userID int } -func (c *ConnectionSSE) SendMessage(w http.ResponseWriter, m MessageSSE) error { - return send(w, MessageSSE{ - Type: "heartbeat", - }) +func (c *ConnectionSSE) SendEvent(w http.ResponseWriter, m platform.Event) error { + return send(w, m) } func (c *ConnectionSSE) SendHeartbeat(w http.ResponseWriter, t time.Time) error { - return send(w, MessageSSE{ - Content: MessageHeartbeat{ - Time: t, - }, - Type: "heartbeat", + return send(w, platform.Event{ + Resource: "clock", + Time: t, + Type: platform.EventTypeHeartbeat, + URI: "", }) } +func SetEventChannel(chan_envelopes <-chan platform.Envelope) { + go func() { + for envelope := range chan_envelopes { + for conn, _ := range connectionsSSE { + if conn.organizationID == envelope.OrganizationID { + log.Debug().Int("type", int(envelope.Event.Type)).Int32("env-org", envelope.OrganizationID).Msg("pushed event to client") + conn.chanEvent <- envelope.Event + } else { + log.Debug().Int("type", int(envelope.Event.Type)).Int32("env-org", envelope.OrganizationID).Int32("conn-org", conn.organizationID).Msg("skipped event, bad org") + } + + } + } + }() +} func send[T any](w http.ResponseWriter, msg T) error { jsonData, err := json.Marshal(msg) if err != nil { @@ -92,3 +60,55 @@ func send[T any](w http.ResponseWriter, msg T) error { w.(http.Flusher).Flush() return nil } +func streamEvents(w http.ResponseWriter, r *http.Request, u platform.User) { + // Set headers for SSE + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + w.Header().Set("Access-Control-Allow-Origin", "*") + + uid, err := uuid.NewUUID() + if err != nil { + log.Error().Err(err).Msg("failed to create uuid") + } + connection := ConnectionSSE{ + chanEvent: make(chan platform.Event), + id: uid, + organizationID: u.Organization.ID(), + userID: u.ID, + } + connectionsSSE[&connection] = true + log.Debug().Int32("org", u.Organization.ID()).Int("user", u.ID).Str("id", uid.String()).Msg("connected SSE client") + + // Send an initial connected event + fmt.Fprintf(w, "event: connected\ndata: {\"status\": \"connected\", \"time\": \"%s\"}\n\n", time.Now().Format(time.RFC3339)) + w.(http.Flusher).Flush() + + // Keep the connection open with a ticker sending periodic events + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + + // Use a channel to detect when the client disconnects + done := r.Context().Done() + + // Keep connection open until client disconnects + for { + select { + case <-done: + log.Debug().Int32("org", u.Organization.ID()).Int("user", u.ID).Str("id", uid.String()).Msg("Client closed connection") + delete(connectionsSSE, &connection) + return + case t := <-ticker.C: + // Send a heartbeat message + err = connection.SendHeartbeat(w, t) + if err != nil { + log.Error().Err(err).Msg("Failed to send heartbeat") + } + case e := <-connection.chanEvent: + err = connection.SendEvent(w, e) + if err != nil { + log.Error().Err(err).Msg("Failed to send heartbeat") + } + } + } +} diff --git a/auth/auth.go b/auth/auth.go index 4334aa83..9d0a37c3 100644 --- a/auth/auth.go +++ b/auth/auth.go @@ -34,6 +34,7 @@ func AddUserSession(r *http.Request, user *platform.User) { id := strconv.Itoa(int(user.ID)) sessionManager.Put(r.Context(), "user_id", id) sessionManager.Put(r.Context(), "username", user.Username) + log.Debug().Str("id", id).Str("username", user.Username).Msg("added user session") } func GetAuthenticatedUser(r *http.Request) (*platform.User, error) { @@ -46,7 +47,7 @@ func GetAuthenticatedUser(r *http.Request) (*platform.User, error) { } username := sessionManager.GetString(ctx, "username") if user_id > 0 && username != "" { - return platform.UserByID(ctx, user_id) + return platform.UserByID(ctx, int32(user_id)) } } // If we can't get the user from the session try to get from auth headers @@ -114,7 +115,7 @@ func SigninUser(r *http.Request, username string, password string) (*platform.Us func SignoutUser(r *http.Request, user platform.User) { sessionManager.Put(r.Context(), "user_id", "") sessionManager.Put(r.Context(), "username", "") - log.Info().Str("username", user.Username).Int32("user_id", int32(user.ID)).Msg("Ended user session") + log.Info().Str("username", user.Username).Int("user_id", (user.ID)).Msg("Ended user session") } func SignupUser(ctx context.Context, username string, name string, password string) (*platform.User, error) { @@ -148,6 +149,9 @@ func redact(s string) string { func validatePassword(password, hash string) bool { err := bcrypt.CompareHashAndPassword([]byte(hash), []byte(password)) + if err != nil { + log.Debug().Err(err).Str("password", password).Str("hash", hash).Msg("!validate password") + } return err == nil } diff --git a/html/static/js/events.js b/html/static/js/events.js new file mode 100644 index 00000000..f14119c3 --- /dev/null +++ b/html/static/js/events.js @@ -0,0 +1,120 @@ +// sse-manager.js - Include this in your common template +window.SSEManager = (function () { + let eventSource = null; + let subscribers = new Map(); + let isConnected = false; + let connectionPromise = null; + + function subscribe(eventType, handler) { + if (!subscribers.has(eventType)) { + subscribers.set(eventType, []); + } + subscribers.get(eventType).push(handler); + + // If already connected, attach the listener immediately + if (isConnected && eventSource) { + eventSource.addEventListener(eventType, handler); + } + } + + function unsubscribe(eventType, handler) { + if (subscribers.has(eventType)) { + const handlers = subscribers.get(eventType); + const index = handlers.indexOf(handler); + if (index > -1) { + handlers.splice(index, 1); + } + } + if (eventSource) { + eventSource.removeEventListener(eventType, handler); + } + } + + function connect(url) { + if (connectionPromise) { + return connectionPromise; + } + + connectionPromise = new Promise((resolve, reject) => { + eventSource = new EventSource(url); + + eventSource.onopen = function () { + isConnected = true; + + // Attach all pre-registered handlers + subscribers.forEach((handlers, eventType) => { + handlers.forEach((handler) => { + eventSource.addEventListener("message", (message) => { + const data = JSON.parse(message.data); + handler(data); + }); + }); + }); + + console.log("SSE connected"); + resolve(eventSource); + }; + + eventSource.onerror = function (err) { + console.error("SSE error:", err); + isConnected = false; + + // Reconnect after delay + setTimeout(() => { + connectionPromise = null; + connect(url); + }, 5000); + + if (!isConnected) { + reject(err); + } + }; + }); + + return connectionPromise; + } + + function disconnect() { + if (eventSource) { + eventSource.close(); + eventSource = null; + isConnected = false; + connectionPromise = null; + } + } + + function ready(callback) { + if (connectionPromise) { + connectionPromise.then(callback); + } else { + // If connect hasn't been called yet, queue it + const checkInterval = setInterval(() => { + if (connectionPromise) { + clearInterval(checkInterval); + connectionPromise.then(callback); + } + }, 50); + } + } + + return { + connect, + disconnect, + subscribe, + unsubscribe, + ready, + }; +})(); + +// Initialize SSE for navigation notifications +document.addEventListener("DOMContentLoaded", function () { + SSEManager.connect("/api/events"); +}); + +function updateNotificationBadge(data) { + const badge = document.querySelector(".notification-badge"); + if (badge) { + badge.textContent = data.count; + badge.style.display = data.count > 0 ? "block" : "none"; + } +} diff --git a/html/template/sync/layout/authenticated.html b/html/template/sync/layout/authenticated.html index 6443b4b6..7ca41874 100644 --- a/html/template/sync/layout/authenticated.html +++ b/html/template/sync/layout/authenticated.html @@ -10,8 +10,12 @@ + {{ block "extraheader" . }}{{ end }} - {{ if not .Config.IsProductionEnvironment }} {{ end }} diff --git a/html/template/sync/sudo.html b/html/template/sync/sudo.html index 3b971d9e..1a605d83 100644 --- a/html/template/sync/sudo.html +++ b/html/template/sync/sudo.html @@ -196,6 +196,50 @@ + +