Initiate events connection on all authenticated pages
This commit is contained in:
parent
44c4f17f32
commit
f29047f723
2 changed files with 41 additions and 49 deletions
89
api/event.go
89
api/event.go
|
|
@ -10,7 +10,47 @@ import (
|
||||||
"github.com/rs/zerolog/log"
|
"github.com/rs/zerolog/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var connectionsSSE map[*ConnectionSSE]bool = make(map[*ConnectionSSE]bool, 0)
|
||||||
|
|
||||||
func streamEvents(w http.ResponseWriter, r *http.Request, u platform.User) {
|
func streamEvents(w http.ResponseWriter, r *http.Request, u platform.User) {
|
||||||
|
// Set headers for SSE
|
||||||
|
w.Header().Set("Content-Type", "text/event-stream")
|
||||||
|
w.Header().Set("Cache-Control", "no-cache")
|
||||||
|
w.Header().Set("Connection", "keep-alive")
|
||||||
|
w.Header().Set("Access-Control-Allow-Origin", "*")
|
||||||
|
|
||||||
|
connection := ConnectionSSE{
|
||||||
|
chanState: make(chan MessageSSE),
|
||||||
|
id: fmt.Sprintf("%d", time.Now().UnixNano()),
|
||||||
|
}
|
||||||
|
connectionsSSE[&connection] = true
|
||||||
|
// Send an initial connected event
|
||||||
|
fmt.Fprintf(w, "event: connected\ndata: {\"status\": \"connected\", \"time\": \"%s\"}\n\n", time.Now().Format(time.RFC3339))
|
||||||
|
w.(http.Flusher).Flush()
|
||||||
|
|
||||||
|
// Keep the connection open with a ticker sending periodic events
|
||||||
|
ticker := time.NewTicker(5 * time.Second)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
// Use a channel to detect when the client disconnects
|
||||||
|
done := r.Context().Done()
|
||||||
|
|
||||||
|
// Keep connection open until client disconnects
|
||||||
|
var err error
|
||||||
|
for {
|
||||||
|
err = nil
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
log.Info().Msg("Client closed connection")
|
||||||
|
return
|
||||||
|
case t := <-ticker.C:
|
||||||
|
// Send a heartbeat message
|
||||||
|
err = connection.SendHeartbeat(w, t)
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
log.Error().Err(err).Msg("Failed to send state from webserver")
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type MessageHeartbeat struct {
|
type MessageHeartbeat struct {
|
||||||
|
|
@ -52,52 +92,3 @@ func send[T any](w http.ResponseWriter, msg T) error {
|
||||||
w.(http.Flusher).Flush()
|
w.(http.Flusher).Flush()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type Webserver struct {
|
|
||||||
connections map[*ConnectionSSE]bool
|
|
||||||
}
|
|
||||||
|
|
||||||
// sseHandler handles the Server-Sent Events connection
|
|
||||||
func (web *Webserver) sseHandler(w http.ResponseWriter, r *http.Request) {
|
|
||||||
// Set headers for SSE
|
|
||||||
w.Header().Set("Content-Type", "text/event-stream")
|
|
||||||
w.Header().Set("Cache-Control", "no-cache")
|
|
||||||
w.Header().Set("Connection", "keep-alive")
|
|
||||||
w.Header().Set("Access-Control-Allow-Origin", "*")
|
|
||||||
|
|
||||||
connection := ConnectionSSE{
|
|
||||||
chanState: make(chan MessageSSE),
|
|
||||||
id: fmt.Sprintf("%d", time.Now().UnixNano()),
|
|
||||||
}
|
|
||||||
web.connections[&connection] = true
|
|
||||||
// Send an initial connected event
|
|
||||||
fmt.Fprintf(w, "event: connected\ndata: {\"status\": \"connected\", \"time\": \"%s\"}\n\n", time.Now().Format(time.RFC3339))
|
|
||||||
w.(http.Flusher).Flush()
|
|
||||||
|
|
||||||
// Keep the connection open with a ticker sending periodic events
|
|
||||||
ticker := time.NewTicker(5 * time.Second)
|
|
||||||
defer ticker.Stop()
|
|
||||||
|
|
||||||
// Use a channel to detect when the client disconnects
|
|
||||||
done := r.Context().Done()
|
|
||||||
|
|
||||||
// Keep connection open until client disconnects
|
|
||||||
var err error
|
|
||||||
for {
|
|
||||||
err = nil
|
|
||||||
select {
|
|
||||||
case <-done:
|
|
||||||
log.Info().Msg("Client closed connection")
|
|
||||||
return
|
|
||||||
case t := <-ticker.C:
|
|
||||||
// Send a heartbeat message
|
|
||||||
err = connection.SendHeartbeat(w, t)
|
|
||||||
//case state := <-connection.chanState:
|
|
||||||
//log.Debug().Msg("Sending new state to connection")
|
|
||||||
//err = connection.SendState(w, state)
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
log.Error().Err(err).Msg("Failed to send state from webserver")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
||||||
|
|
@ -60,6 +60,7 @@
|
||||||
setTooltipsForSidebar();
|
setTooltipsForSidebar();
|
||||||
});
|
});
|
||||||
</script>
|
</script>
|
||||||
|
<script src="/static/js/events.js"></script>
|
||||||
{{ if not .Config.IsProductionEnvironment }}
|
{{ if not .Config.IsProductionEnvironment }}
|
||||||
<script src="/.flogo/injector.js"></script>
|
<script src="/.flogo/injector.js"></script>
|
||||||
{{ end }}
|
{{ end }}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue