Massive rework of platform layer user/organization

The goal of this rework is to make it so I can pass around platform.User
instead of a pair of models.Organization and models.User. This is useful
for reason I kind of forget now, but it started with working on
notifications and ballooned massively from there into refactoring a
number of things that were bugging me.

This also includes a tiny amount of work on server-side events (SSE).

 * background stuff lives inside the platform now, which I need for
   having it push updates through SSE
 * userfile now lives in the platform, under file, so other platform
   functions can safely use it
 * oauth is broken into pieces and inside platform because other stuff
   was calling it already, but badly.
 * notifications go into the platform as well
This commit is contained in:
Eli Ribble 2026-03-12 23:49:16 +00:00
parent 32dcc50c94
commit 44c4f17f32
No known key found for this signature in database
85 changed files with 1492 additions and 1384 deletions

View file

@ -1,227 +0,0 @@
package background
import (
"context"
"encoding/json"
"errors"
"fmt"
"log"
"os"
"github.com/Gleipnir-Technology/nidus-sync/config"
"github.com/Gleipnir-Technology/nidus-sync/db"
"github.com/Gleipnir-Technology/nidus-sync/db/models"
"github.com/Gleipnir-Technology/nidus-sync/label-studio"
"github.com/Gleipnir-Technology/nidus-sync/minio"
"github.com/google/uuid"
)
type jobLabelStudio struct {
UUID uuid.UUID
}
var channelJobLabelStudio chan jobLabelStudio
func enqueueLabelStudioJob(job jobLabelStudio) {
select {
case channelJobLabelStudio <- job:
log.Printf("Enqueued label job for UUID: %s", job.UUID)
default:
log.Printf("Label job channel is full, dropping job for UUID: %s", job.UUID)
}
}
func StartLabelStudioWorker(ctx context.Context) error {
// Initialize the minio client
minioBucket := os.Getenv("S3_BUCKET")
labelStudioClient, err := createLabelStudioClient()
if err != nil {
return fmt.Errorf("Failed to create label studio client: %v", err)
}
// Get the project we are going to upload to
project, err := findLabelStudioProject(labelStudioClient, "Nidus Speech-to-Text Transcriptions")
if err != nil {
return errors.New(fmt.Sprintf("Failed to find the label studio project"))
}
minioClient, err := createMinioClient()
if err != nil {
return fmt.Errorf("Failed to create minio client: %v", err)
}
buffer := 100
channelJobLabelStudio = make(chan jobLabelStudio, buffer) // Buffered channel to prevent blocking
log.Printf("Started label studio worker with buffer depth %d", buffer)
go func() {
for {
select {
case <-ctx.Done():
log.Println("Audio worker shutting down.")
return
case job := <-channelJobLabelStudio:
log.Printf("Processing label job for UUID: %s", job.UUID)
err := processLabelTask(ctx, minioClient, minioBucket, labelStudioClient, project, job)
if err != nil {
log.Printf("Error processing label job for audio file %s: %v", job.UUID, err)
}
}
}
}()
return nil
}
func createMinioClient() (*minio.Client, error) {
baseUrl := os.Getenv("S3_BASE_URL")
accessKeyID := os.Getenv("S3_ACCESS_KEY_ID")
secretAccessKey := os.Getenv("S3_SECRET_ACCESS_KEY")
client, err := minio.NewClient(baseUrl, accessKeyID, secretAccessKey)
if err != nil {
return nil, err
}
log.Println("Created minio client")
return client, err
}
func createLabelStudioClient() (*labelstudio.Client, error) {
// Initialize the client with your Label Studio base URL and API key
labelStudioApiKey := os.Getenv("LABEL_STUDIO_API_KEY")
labelStudioBaseUrl := os.Getenv("LABEL_STUDIO_BASE_URL")
labelStudioClient := labelstudio.NewClient(labelStudioBaseUrl, labelStudioApiKey)
log.Println("Created label studio client")
// Get and store the access token
err := labelStudioClient.GetAccessToken()
if err != nil {
return nil, errors.New(fmt.Sprintf("Failed to get access token: %v", err))
}
log.Println("Got label studio client access token")
return labelStudioClient, nil
}
func processLabelTask(ctx context.Context, minioClient *minio.Client, minioBucket string, labelStudioClient *labelstudio.Client, project *labelstudio.Project, job jobLabelStudio) error {
customer := os.Getenv("CUSTOMER")
if customer == "" {
return errors.New("You must specify a CUSTOMER env var")
}
note, err := db.NoteAudioGetLatest(ctx, job.UUID.String())
if err != nil {
return errors.New(fmt.Sprintf("Failed to get note %s", note.UUID))
}
if note.Version != 1 {
return errors.New(fmt.Sprintf("Got version %d of %s", note.Version, note.UUID))
}
task, err := findMatchingTask(labelStudioClient, project, customer, note)
if err != nil {
return errors.New(fmt.Sprintf("Failed to search for a task: %v", err))
}
// We already have a task, nothing to do.
if task != nil {
return nil
}
err = createTask(labelStudioClient, project, minioClient, minioBucket, customer, note)
if err != nil {
return errors.New(fmt.Sprintf("Failed to create a task: %v", err))
}
return nil
}
func createTask(client *labelstudio.Client, project *labelstudio.Project, minioClient *minio.Client, bucket string, customer string, note *models.NoteAudio) error {
audioRef := fmt.Sprintf("s3://%s/%s-normalized.m4a", bucket, note.UUID)
audioFile := fmt.Sprintf("%s/user/%s-normalized.m4a", config.FilesDirectory, note.UUID)
uploadPath := fmt.Sprintf("%s-normalized.m4a", note.UUID)
if !minioClient.ObjectExists(bucket, uploadPath) {
err := minioClient.UploadFile(bucket, audioFile, uploadPath)
if err != nil {
return fmt.Errorf("Failed to upload audio: %v", err)
}
}
var transcription string = ""
//if note.Transcription.IsValue() {
//transcription = note.Transcription.MustGet()
//}
transcription = note.Transcription.GetOr("")
simpleTasks := []map[string]interface{}{
{
"data": map[string]string{
"audio": audioRef,
"note_uuid": note.UUID.String(),
"transcription": transcription,
},
"meta": map[string]string{
"customer": customer,
"note_uuid": note.UUID.String(),
},
},
}
_, err := client.ImportTasks(project.ID, simpleTasks)
if err != nil {
log.Fatalf("Failed to import tasks: %v", err)
}
log.Printf("Created task for note audio %s", note.UUID)
return nil
}
func findLabelStudioProject(client *labelstudio.Client, title string) (*labelstudio.Project, error) {
// Attempt to get live projects
projects, err := client.Projects()
if err != nil {
log.Fatalf("Failed to get projects: %v", err)
}
fmt.Printf("Found %d projects:\n", projects.Count)
for i, p := range projects.Results {
fmt.Printf("%d. %s (ID: %d) - Tasks: %d\n",
i+1,
p.Title,
p.ID,
p.TaskNumber)
if p.Title == title {
return &p, nil
}
}
return nil, fmt.Errorf("No such project '%s'", title)
}
func findMatchingTask(client *labelstudio.Client, project *labelstudio.Project, customer string, note *models.NoteAudio) (*labelstudio.Task, error) {
/*meta := map[string]string{
"customer": customer,
"note_uuid": note.UUID,
}*/
items := []map[string]interface{}{
{"filter": "filter:tasks:data.note_uuid", "operator": "equal", "type": "string", "value": note.UUID},
}
filters := map[string]interface{}{
"conjunction": "and",
"items": items,
}
query := map[string]interface{}{
"filters": filters,
}
queryStr, err := json.Marshal(query)
if err != nil {
return nil, fmt.Errorf("Failed to marshal query JSON: %v", err)
}
// Get all tasks
options := &labelstudio.TasksListOptions{
ProjectID: project.ID,
Query: string(queryStr),
}
tasksResponse, err := client.ListTasks(options)
if err != nil {
return nil, fmt.Errorf("Failed to get tasks: %v", err)
}
if len(tasksResponse.Tasks) == 0 {
return nil, nil
} else if len(tasksResponse.Tasks) == 1 {
return &tasksResponse.Tasks[0], nil
} else {
return nil, fmt.Errorf("Got too many tasks: %d", len(tasksResponse.Tasks))
}
// Specify bucket name
//bucketNamePtr := flag.String("bucket", "label-studio", "The bucket to upload to")
//filePathPtr := flag.String("file", "example.txt", "The file to upload")
//flag.Parse()
}