Distinguish between status messages and resource messages in SSE

This commit is contained in:
Eli Ribble 2026-04-28 17:06:21 +00:00
parent 38359e20e9
commit 52c41e29d8
No known key found for this signature in database
14 changed files with 106 additions and 41 deletions

View file

@ -33,6 +33,7 @@ type Status struct {
IsModified bool `json:"is_modified"`
Revision string `json:"revision"`
Status string `json:"status"`
Type string `json:"type"`
}
func (c *ConnectionSSE) SendEvent(w http.ResponseWriter, m platform.Event) error {
@ -113,6 +114,7 @@ func streamEvents(w http.ResponseWriter, r *http.Request, u platform.User) {
IsModified: v.IsModified,
Revision: v.Revision,
Status: "connected",
Type: "status",
}
body, err := json.Marshal(status)
if err != nil {
@ -121,7 +123,7 @@ func streamEvents(w http.ResponseWriter, r *http.Request, u platform.User) {
return
}
w.Write(body)
fmt.Fprintf(w, "data: %s\n\n", body)
w.(http.Flusher).Flush()
// Keep the connection open with a ticker sending periodic events

View file

@ -8,7 +8,7 @@ import { onMounted } from "vue";
import { apiClient } from "@/client";
import router from "@/route/config";
import { SSEManager, type SSEMessage } from "@/SSEManager";
import { SSEManager, type SSEMessageResource } from "@/SSEManager";
async function sentryInit() {
const config = await apiClient.JSONGet("/api");
@ -23,7 +23,7 @@ async function sentryInit() {
}
onMounted(() => {
SSEManager.connect("/api/events");
SSEManager.subscribe((msg: SSEMessage) => {
SSEManager.subscribe((msg: SSEMessageResource) => {
if (msg.type != "heartbeat") {
console.log("SSE", msg);
}

View file

@ -1,17 +1,27 @@
// Define types for the SSE data structure
export interface SSEMessage {
export interface SSEMessageBase {
type: string;
}
export interface SSEMessageResource extends SSEMessageBase {
resource: string;
time: string;
type: string;
uri: string;
}
type SSEHandler = (data: SSEMessage) => void;
export interface SSEMessageStatus extends SSEMessageBase {
build_time: Date;
is_modified: boolean;
revision: string;
status: string;
}
type SSEHandlerResource = (data: SSEMessageResource) => void;
type SSEHandlerStatus = (data: SSEMessageStatus) => void;
interface SSEManagerType {
connect: (url: string) => Promise<EventSource>;
disconnect: () => void;
subscribe: (handler: SSEHandler) => string;
subscribe: (handler: SSEHandlerResource) => string;
subscribeStatus: (handler: SSEHandlerStatus) => string;
unsubscribe: (uuid: string) => void;
ready: (callback: (eventSource: EventSource) => void) => void;
}
@ -26,7 +36,8 @@ declare global {
export const SSEManager: SSEManagerType = (function (): SSEManagerType {
let eventSource: EventSource | null = null;
let subscribers: Map<string, SSEHandler> = new Map();
let subscribersResource: Map<string, SSEHandlerResource> = new Map();
let subscribersStatus: Map<string, SSEHandlerStatus> = new Map();
let isConnected: boolean = false;
let connectionPromise: Promise<EventSource> | null = null;
@ -42,7 +53,7 @@ export const SSEManager: SSEManagerType = (function (): SSEManagerType {
isConnected = true;
eventSource!.addEventListener("message", (message: MessageEvent) => {
const data: SSEMessage = JSON.parse(message.data);
const data: SSEMessageBase = JSON.parse(message.data);
handleMessage(data);
});
@ -85,13 +96,18 @@ export const SSEManager: SSEManagerType = (function (): SSEManagerType {
}
}
function handleMessage(msg: SSEMessage) {
function handleMessage(msg: SSEMessageBase) {
if (msg.type == "heartbeat") {
return;
} else if (msg.type == "status") {
subscribersStatus.forEach((handler: SSEHandlerStatus, _: string) => {
handler(msg as SSEMessageStatus);
});
} else {
subscribersResource.forEach((handler: SSEHandlerResource, _: string) => {
handler(msg as SSEMessageResource);
});
}
subscribers.forEach((handler: SSEHandler, _: string) => {
handler(msg);
});
}
function ready(callback: (eventSource: EventSource) => void): void {
@ -108,15 +124,24 @@ export const SSEManager: SSEManagerType = (function (): SSEManagerType {
}
}
function subscribe(handler: SSEHandler): string {
function subscribe(handler: SSEHandlerResource): string {
const uuid = crypto.randomUUID();
subscribers.set(uuid.toString(), handler);
subscribersResource.set(uuid.toString(), handler);
return uuid;
}
function subscribeStatus(handler: SSEHandlerStatus): string {
const uuid = crypto.randomUUID();
subscribersStatus.set(uuid.toString(), handler);
return uuid;
}
function unsubscribe(uuid: string): void {
if (subscribers.has(uuid)) {
subscribers.delete(uuid);
if (subscribersResource.has(uuid)) {
subscribersResource.delete(uuid);
}
if (subscribersStatus.has(uuid)) {
subscribersStatus.delete(uuid);
}
}
@ -124,6 +149,7 @@ export const SSEManager: SSEManagerType = (function (): SSEManagerType {
connect,
disconnect,
subscribe,
subscribeStatus,
unsubscribe,
ready,
};

View file

@ -211,7 +211,7 @@
import { ref, reactive, onMounted, onBeforeUnmount, nextTick } from "vue";
import { Tooltip, Popover } from "bootstrap";
import NavigationLink from "@/components/common/NavigationLink.vue";
import { SSEManager, type SSEMessage } from "@/SSEManager";
import { SSEManager, type SSEMessageResource } from "@/SSEManager";
import { useSessionStore } from "@/store/session";
import type { Session } from "@/type/api";
@ -283,7 +283,7 @@ const setTooltipsForSidebar = () => {
// Lifecycle hooks
onMounted(async () => {
const sub = SSEManager.subscribe((msg: SSEMessage) => {
const sub = SSEManager.subscribe((msg: SSEMessageResource) => {
if (msg.resource != "sync:session") {
return;
}

View file

@ -2,7 +2,7 @@ import { defineStore } from "pinia";
import { ref } from "vue";
import { apiClient } from "@/client";
import { SSEManager, SSEMessage } from "@/SSEManager";
import { SSEManager, SSEMessageResource } from "@/SSEManager";
import { useSessionStore } from "@/store/session";
import { Communication, CommunicationDTO } from "@/type/api";
@ -13,7 +13,7 @@ export const useCommunicationStore = defineStore("communication", () => {
const error = ref(null);
// Subscription
SSEManager.subscribe((msg: SSEMessage) => {
SSEManager.subscribe((msg: SSEMessageResource) => {
if (msg.resource.startsWith("rmo:")) {
fetchAll();
}

View file

@ -1,6 +1,6 @@
import { defineStore } from "pinia";
import { ref } from "vue";
import { SSEManager, SSEMessage } from "@/SSEManager";
import { SSEManager, SSEMessageResource } from "@/SSEManager";
import { ReviewTask, ReviewTaskListResponse } from "@/type/api";
import { useSessionStore } from "@/store/session";
@ -11,7 +11,7 @@ export const useStoreReviewTask = defineStore("review-task", () => {
const error = ref<string | null>(null);
// Subscription
SSEManager.subscribe((msg: SSEMessage) => {
SSEManager.subscribe((msg: SSEMessageResource) => {
if (msg.resource.startsWith("sync:review-task")) {
fetchAll();
}

View file

@ -1,7 +1,7 @@
import { defineStore } from "pinia";
import { ref } from "vue";
import { ServiceRequest } from "@/type/api";
import { SSEManager, SSEMessage } from "@/SSEManager";
import { SSEManager, SSEMessageResource } from "@/SSEManager";
import { useSessionStore } from "@/store/session";
export const useStoreServiceRequest = defineStore("service-request", () => {
@ -11,7 +11,7 @@ export const useStoreServiceRequest = defineStore("service-request", () => {
const error = ref(null);
// Subscription
SSEManager.subscribe((msg: SSEMessage) => {
SSEManager.subscribe((msg: SSEMessageResource) => {
if (msg.resource.startsWith("sync:service-request")) {
fetchAll();
}

View file

@ -1,7 +1,7 @@
import * as axios from "axios";
import { defineStore } from "pinia";
import { ref } from "vue";
import { SSEManager, type SSEMessage } from "@/SSEManager";
import { SSEManager, type SSEMessageResource } from "@/SSEManager";
import {
Organization,
Session,
@ -38,7 +38,7 @@ export const useSessionStore = defineStore("session", () => {
const urls = ref<URLs | null>(null);
// Subscription
SSEManager.subscribe((msg: SSEMessage) => {
SSEManager.subscribe((msg: SSEMessageResource) => {
if (msg.type == "sync:session") {
fetchSession();
}

View file

@ -1,7 +1,7 @@
import { defineStore } from "pinia";
import { ref } from "vue";
import { Signal } from "@/type/api";
import { SSEManager, type SSEMessage } from "@/SSEManager";
import { SSEManager, type SSEMessageResource } from "@/SSEManager";
import { useSessionStore } from "@/store/session";
export const useSignalStore = defineStore("signal", () => {
@ -11,7 +11,7 @@ export const useSignalStore = defineStore("signal", () => {
const error = ref(null);
// Subscription
SSEManager.subscribe((msg: SSEMessage) => {
SSEManager.subscribe((msg: SSEMessageResource) => {
if (msg.resource.startsWith("sync:signal")) {
fetchAll();
}

View file

@ -1,6 +1,6 @@
import { defineStore } from "pinia";
import { ref } from "vue";
import { SSEManager, SSEMessage } from "@/SSEManager";
import { SSEManager, SSEMessageResource } from "@/SSEManager";
import { Site, SiteListResponse } from "@/type/api";
import { useSessionStore } from "@/store/session";
@ -11,7 +11,7 @@ export const useStoreSite = defineStore("site", () => {
const error = ref<string | null>(null);
// Subscription
SSEManager.subscribe((msg: SSEMessage) => {
SSEManager.subscribe((msg: SSEMessageResource) => {
if (msg.resource.startsWith("sync:site")) {
fetchAll();
}

View file

@ -1,7 +1,7 @@
import { defineStore } from "pinia";
import { ref } from "vue";
import { Sync } from "@/type/api";
import { SSEManager, SSEMessage } from "@/SSEManager";
import { SSEManager, SSEMessageResource } from "@/SSEManager";
import { useSessionStore } from "@/store/session";
export const useStoreSync = defineStore("sync", () => {
@ -11,7 +11,7 @@ export const useStoreSync = defineStore("sync", () => {
const error = ref(null);
// Subscription
SSEManager.subscribe((msg: SSEMessage) => {
SSEManager.subscribe((msg: SSEMessageResource) => {
if (msg.resource.startsWith("sync:sync")) {
fetchAll();
}

View file

@ -1,7 +1,7 @@
import { defineStore } from "pinia";
import { ref } from "vue";
import { Upload } from "@/type/api";
import { SSEManager, type SSEMessage } from "@/SSEManager";
import { SSEManager, type SSEMessageResource } from "@/SSEManager";
import { useSessionStore } from "@/store/session";
export const useUploadStore = defineStore("upload", () => {
@ -12,7 +12,7 @@ export const useUploadStore = defineStore("upload", () => {
const error = ref(null);
// Subscription
SSEManager.subscribe((msg: SSEMessage) => {
SSEManager.subscribe((msg: SSEMessageResource) => {
if (msg.resource.startsWith("sync:upload")) {
fetchAll();
}

View file

@ -1,7 +1,7 @@
import { defineStore } from "pinia";
import { ref } from "vue";
import { User } from "@/type/api";
import { SSEManager, type SSEMessage } from "@/SSEManager";
import { SSEManager, type SSEMessageResource } from "@/SSEManager";
import { useSessionStore } from "@/store/session";
export const useUserStore = defineStore("users", () => {
@ -13,7 +13,7 @@ export const useUserStore = defineStore("users", () => {
const ongoingFetch = ref<Promise<User[]> | null>(null);
// Subscription
SSEManager.subscribe((msg: SSEMessage) => {
SSEManager.subscribe((msg: SSEMessageResource) => {
if (msg.resource.startsWith("sync:user")) {
fetchAll();
}

View file

@ -1,6 +1,43 @@
package main
package version
var (
Version = "dev"
Commit = "none"
import (
"runtime/debug"
"time"
)
type VersionInfo struct {
BuildTime time.Time
IsModified bool
Revision string
}
func Get() VersionInfo {
info, ok := debug.ReadBuildInfo()
if !ok {
return VersionInfo{
BuildTime: time.Now(),
IsModified: false,
Revision: "unknown",
}
}
var version VersionInfo
for _, setting := range info.Settings {
switch setting.Key {
case "vcs.modified":
version.IsModified = setting.Value == "true"
case "vcs.revision":
if len(setting.Value) > 7 {
version.Revision = setting.Value[:7]
} else {
version.Revision = setting.Value
}
case "vcs.time":
if t, err := time.Parse(time.RFC3339, setting.Value); err == nil {
version.BuildTime = t
}
}
}
return version
}