Move comms work to background goroutine

This is a sort of random checkpoint of work
 * add schema for tracking messages sent to DB
 * add terms of service and privacy policy for RCS compliance
 * standardize some things about background workers
 * update some missing stuff from generated DB code
This commit is contained in:
Eli Ribble 2026-01-20 17:10:22 +00:00
parent 8f44e57c72
commit 842e6cff43
No known key found for this signature in database
47 changed files with 7361 additions and 179 deletions

View file

@ -1,121 +1,23 @@
package queue
import (
"context"
"errors"
"fmt"
"os"
"os/exec"
"github.com/Gleipnir-Technology/nidus-sync/db"
"github.com/Gleipnir-Technology/nidus-sync/userfile"
"github.com/google/uuid"
"github.com/rs/zerolog/log"
)
// AudioJob represents a job to process an audio file.
type AudioJob struct {
type JobAudio struct {
AudioUUID uuid.UUID
}
// audioJobChannel is the channel used to send audio processing jobs to the worker.
var audioJobChannel chan AudioJob
// StartAudioWorker initializes the audio job channel and starts the worker goroutine.
func StartAudioWorker(ctx context.Context) {
buffer := 100
audioJobChannel = make(chan AudioJob, buffer) // Buffered channel to prevent blocking
log.Info().Int("buffer depth", buffer).Msg("Started audio worker")
go func() {
for {
select {
case <-ctx.Done():
log.Info().Msg("Audio worker shutting down.")
return
case job := <-audioJobChannel:
log.Info().Str("uuid", job.AudioUUID.String()).Msg("Processing audio job")
err := processAudioFile(job.AudioUUID)
if err != nil {
log.Error().Err(err).Str("uuid", job.AudioUUID.String()).Msg("Error processing audio file")
}
}
}
}()
}
var ChannelJobAudio chan JobAudio
// EnqueueAudioJob sends an audio processing job to the worker.
func EnqueueAudioJob(job AudioJob) {
func EnqueueAudioJob(job JobAudio) {
select {
case audioJobChannel <- job:
case ChannelJobAudio <- job:
log.Info().Str("uuid", job.AudioUUID.String()).Msg("Enqueued audio job")
default:
log.Warn().Str("uuid", job.AudioUUID.String()).Msg("Audio job channel is full, dropping job")
}
}
func processAudioFile(audioUUID uuid.UUID) error {
// Normalize audio
err := normalizeAudio(audioUUID)
if err != nil {
return fmt.Errorf("failed to normalize audio %s: %v", audioUUID, err)
}
// Transcode to OGG
err = transcodeToOgg(audioUUID)
if err != nil {
return fmt.Errorf("failed to transcode audio %s to OGG: %v", audioUUID, err)
}
EnqueueLabelStudioJob(LabelStudioJob{
UUID: audioUUID,
})
return nil
}
func normalizeAudio(audioUUID uuid.UUID) error {
source := userfile.AudioFileContentPathRaw(audioUUID.String())
_, err := os.Stat(source)
if errors.Is(err, os.ErrNotExist) {
log.Warn().Str("source", source).Msg("file doesn't exist, skipping normalization")
return nil
}
log.Info().Str("sourcce", source).Msg("Normalizing")
destination := userfile.AudioFileContentPathNormalized(audioUUID.String())
// Use "ffmpeg" directly, assuming it's in the system PATH
cmd := exec.Command("ffmpeg", "-i", source, "-filter:a", "loudnorm", destination)
out, err := cmd.CombinedOutput()
if err != nil {
log.Printf("FFmpeg output for normalization: %s", out)
return fmt.Errorf("ffmpeg normalization failed: %v", err)
}
err = db.NoteAudioNormalized(audioUUID.String())
if err != nil {
return fmt.Errorf("failed to update database for normalized audio %s: %v", audioUUID, err)
}
log.Info().Str("destination", destination).Msg("Normalized audio")
return nil
}
func transcodeToOgg(audioUUID uuid.UUID) error {
source := userfile.AudioFileContentPathNormalized(audioUUID.String())
_, err := os.Stat(source)
if errors.Is(err, os.ErrNotExist) {
log.Warn().Str("source", source).Msg("file doesn't exist, skipping OGG transcoding")
return nil
}
log.Info().Str("source", source).Msg("Transcoding to ogg")
destination := userfile.AudioFileContentPathOgg(audioUUID.String())
// Use "ffmpeg" directly, assuming it's in the system PATH
cmd := exec.Command("ffmpeg", "-i", source, "-vn", "-acodec", "libvorbis", destination)
out, err := cmd.CombinedOutput()
if err != nil {
log.Error().Err(err).Bytes("out", out).Msg("FFmpeg output for OGG transcoding")
return fmt.Errorf("ffmpeg OGG transcoding failed: %v", err)
}
err = db.NoteAudioTranscodedToOgg(audioUUID.String())
if err != nil {
return fmt.Errorf("failed to update database for OGG transcoded audio %s: %v", audioUUID, err)
}
log.Info().Str("destination", destination).Msg("Transcoded audio")
return nil
}

38
queue/comms.go Normal file
View file

@ -0,0 +1,38 @@
package queue
import (
"github.com/Gleipnir-Technology/nidus-sync/db/enums"
"github.com/rs/zerolog/log"
)
type JobEmail struct {
Destination string
Source string
Type enums.CommsEmailmessagetype
}
type JobSMS struct {
Destination string
Source string
Type enums.CommsSmsmessagetype
}
var ChannelJobEmail chan JobEmail
var ChannelJobSMS chan JobSMS
func EnqueueJobEmail(job JobEmail) {
select {
case ChannelJobEmail <- job:
log.Info().Str("destination", job.Destination).Msg("Enqueued email job")
default:
log.Warn().Msg("email job channel is full, dropping job")
}
}
func EnqueueJobSMS(job JobSMS) {
select {
case ChannelJobSMS <- job:
log.Info().Str("destination", job.Destination).Msg("Enqueued sms job")
default:
log.Warn().Msg("sms job channel is full, dropping job")
}
}