2026-03-12 23:49:16 +00:00
package api
import (
	"encoding/json"
	"fmt"
	"net/http"
	"sync"
	"time"

	"github.com/Gleipnir-Technology/nidus-sync/platform"
	"github.com/google/uuid"
	"github.com/rs/zerolog/log"
)
2026-03-13 00:03:23 +00:00
// connectionsSSE is the set of currently registered SSE clients, keyed by
// connection pointer; the stored value is always true.
//
// The map is read by the dispatcher goroutine started in SetEventChannel and
// written by streamEvents as clients connect and disconnect, so every access
// must be synchronized by those callers.
var connectionsSSE = make(map[*ConnectionSSE]bool)
2026-03-13 17:33:39 +00:00
type ConnectionSSE struct {
chanEvent chan platform . Event
id uuid . UUID
organizationID int32
2026-04-02 21:31:31 +00:00
userID int32
2026-03-13 17:33:39 +00:00
}
2026-03-13 18:21:20 +00:00
// Message is the JSON payload written in the data field of an SSE frame.
type Message struct {
	// Resource names what the message is about.
	Resource string `json:"resource"`
	// Time is the timestamp carried by the message.
	Time time.Time `json:"time"`
	// Type is the event type in its string form.
	Type string `json:"type"`
	// URI is the location associated with the message; it may be empty.
	URI string `json:"uri"`
}
2026-03-13 17:33:39 +00:00
func ( c * ConnectionSSE ) SendEvent ( w http . ResponseWriter , m platform . Event ) error {
2026-03-13 18:21:20 +00:00
return send ( w , Message {
Resource : m . Resource ,
Time : m . Time ,
Type : m . Type . String ( ) ,
URI : m . URI ,
} )
2026-03-13 17:33:39 +00:00
}
// SendHeartbeat writes a keep-alive message stamped with t to w.
//
// The heartbeat is a "clock" event of type platform.EventTypeHeartbeat with
// an empty URI. It is routed through SendEvent so that it goes through the
// same Message conversion (including Type.String()) as every other event on
// the stream; marshaling the raw platform.Event here bypassed that
// conversion. It returns whatever error SendEvent reports.
func (c *ConnectionSSE) SendHeartbeat(w http.ResponseWriter, t time.Time) error {
	return c.SendEvent(w, platform.Event{
		Resource: "clock",
		Time:     t,
		Type:     platform.EventTypeHeartbeat,
		URI:      "",
	})
}
func SetEventChannel ( chan_envelopes <- chan platform . Envelope ) {
go func ( ) {
for envelope := range chan_envelopes {
for conn , _ := range connectionsSSE {
if conn . organizationID == envelope . OrganizationID {
log . Debug ( ) . Int ( "type" , int ( envelope . Event . Type ) ) . Int32 ( "env-org" , envelope . OrganizationID ) . Msg ( "pushed event to client" )
conn . chanEvent <- envelope . Event
2026-04-02 21:31:31 +00:00
} else if conn . userID == envelope . UserID {
log . Debug ( ) . Int ( "type" , int ( envelope . Event . Type ) ) . Int32 ( "env-user" , envelope . UserID ) . Msg ( "pushed event to user" )
conn . chanEvent <- envelope . Event
2026-03-13 17:33:39 +00:00
} else {
log . Debug ( ) . Int ( "type" , int ( envelope . Event . Type ) ) . Int32 ( "env-org" , envelope . OrganizationID ) . Int32 ( "conn-org" , conn . organizationID ) . Msg ( "skipped event, bad org" )
}
}
}
} ( )
}
func send [ T any ] ( w http . ResponseWriter , msg T ) error {
jsonData , err := json . Marshal ( msg )
if err != nil {
return fmt . Errorf ( "marshaling json: %w" , err )
}
// Write in SSE format: "data: <json>\n\n"
_ , err = fmt . Fprintf ( w , "data: %s\n\n" , jsonData )
if err != nil {
return fmt . Errorf ( "writing SSE message: %w" , err )
}
w . ( http . Flusher ) . Flush ( )
return nil
}
2026-03-12 23:49:16 +00:00
func streamEvents ( w http . ResponseWriter , r * http . Request , u platform . User ) {
2026-03-13 00:03:23 +00:00
// Set headers for SSE
w . Header ( ) . Set ( "Content-Type" , "text/event-stream" )
w . Header ( ) . Set ( "Cache-Control" , "no-cache" )
w . Header ( ) . Set ( "Connection" , "keep-alive" )
w . Header ( ) . Set ( "Access-Control-Allow-Origin" , "*" )
2026-03-13 17:33:39 +00:00
uid , err := uuid . NewUUID ( )
if err != nil {
log . Error ( ) . Err ( err ) . Msg ( "failed to create uuid" )
}
2026-03-13 00:03:23 +00:00
connection := ConnectionSSE {
2026-03-13 17:33:39 +00:00
chanEvent : make ( chan platform . Event ) ,
id : uid ,
2026-03-22 01:22:44 +00:00
organizationID : u . Organization . ID ,
2026-04-02 21:31:31 +00:00
userID : int32 ( u . ID ) ,
2026-03-13 00:03:23 +00:00
}
connectionsSSE [ & connection ] = true
2026-03-22 01:22:44 +00:00
log . Debug ( ) . Int32 ( "org" , u . Organization . ID ) . Int ( "user" , u . ID ) . Str ( "id" , uid . String ( ) ) . Msg ( "connected SSE client" )
2026-03-13 17:33:39 +00:00
2026-03-13 00:03:23 +00:00
// Send an initial connected event
fmt . Fprintf ( w , "event: connected\ndata: {\"status\": \"connected\", \"time\": \"%s\"}\n\n" , time . Now ( ) . Format ( time . RFC3339 ) )
w . ( http . Flusher ) . Flush ( )
// Keep the connection open with a ticker sending periodic events
ticker := time . NewTicker ( 5 * time . Second )
defer ticker . Stop ( )
// Use a channel to detect when the client disconnects
done := r . Context ( ) . Done ( )
// Keep connection open until client disconnects
for {
select {
case <- done :
2026-03-22 01:22:44 +00:00
log . Debug ( ) . Int32 ( "org" , u . Organization . ID ) . Int ( "user" , u . ID ) . Str ( "id" , uid . String ( ) ) . Msg ( "Client closed connection" )
2026-03-13 17:33:39 +00:00
delete ( connectionsSSE , & connection )
2026-03-13 00:03:23 +00:00
return
case t := <- ticker . C :
// Send a heartbeat message
err = connection . SendHeartbeat ( w , t )
2026-03-13 17:33:39 +00:00
if err != nil {
log . Error ( ) . Err ( err ) . Msg ( "Failed to send heartbeat" )
}
case e := <- connection . chanEvent :
err = connection . SendEvent ( w , e )
if err != nil {
log . Error ( ) . Err ( err ) . Msg ( "Failed to send heartbeat" )
}
2026-03-13 00:03:23 +00:00
}
2026-03-12 23:49:16 +00:00
}
}