Properly save audio and image notes when uploaded

Also fix the audio processing pipeline.
This commit is contained in:
Eli Ribble 2026-01-06 22:23:59 +00:00
parent 4d02357671
commit 39d9f6d258
11 changed files with 145 additions and 138 deletions

View file

@ -4,13 +4,13 @@ import (
"context"
"errors"
"fmt"
"log"
"os"
"os/exec"
"github.com/Gleipnir-Technology/nidus-sync/db"
"github.com/Gleipnir-Technology/nidus-sync/userfile"
"github.com/google/uuid"
"github.com/rs/zerolog/log"
)
// AudioJob represents a job to process an audio file.
@ -25,18 +25,18 @@ var audioJobChannel chan AudioJob
func StartAudioWorker(ctx context.Context) {
buffer := 100
audioJobChannel = make(chan AudioJob, buffer) // Buffered channel to prevent blocking
log.Printf("Started audio worker with buffer depth %d", buffer)
log.Info().Int("buffer depth", buffer).Msg("Started audio worker")
go func() {
for {
select {
case <-ctx.Done():
log.Println("Audio worker shutting down.")
log.Info().Msg("Audio worker shutting down.")
return
case job := <-audioJobChannel:
log.Printf("Processing audio job for UUID: %s", job.AudioUUID)
log.Info().Str("uuid", job.AudioUUID.String()).Msg("Processing audio job")
err := processAudioFile(job.AudioUUID)
if err != nil {
log.Printf("Error processing audio file %s: %v", job.AudioUUID, err)
log.Error().Err(err).Str("uuid", job.AudioUUID.String()).Msg("Error processing audio file")
}
}
}
@ -47,9 +47,9 @@ func StartAudioWorker(ctx context.Context) {
func EnqueueAudioJob(job AudioJob) {
select {
case audioJobChannel <- job:
log.Printf("Enqueued audio job for UUID: %s", job.AudioUUID)
log.Info().Str("uuid", job.AudioUUID.String()).Msg("Enqueued audio job")
default:
log.Printf("Audio job channel is full, dropping job for UUID: %s", job.AudioUUID)
log.Warn().Str("uuid", job.AudioUUID.String()).Msg("Audio job channel is full, dropping job")
}
}
@ -76,10 +76,10 @@ func normalizeAudio(audioUUID uuid.UUID) error {
source := userfile.AudioFileContentPathRaw(audioUUID.String())
_, err := os.Stat(source)
if errors.Is(err, os.ErrNotExist) {
log.Printf("%s doesn't exist, skipping normalization", source)
log.Warn().Str("source", source).Msg("file doesn't exist, skipping normalization")
return nil
}
log.Printf("Normalizing %s", source)
log.Info().Str("sourcce", source).Msg("Normalizing")
destination := userfile.AudioFileContentPathNormalized(audioUUID.String())
// Use "ffmpeg" directly, assuming it's in the system PATH
cmd := exec.Command("ffmpeg", "-i", source, "-filter:a", "loudnorm", destination)
@ -92,7 +92,7 @@ func normalizeAudio(audioUUID uuid.UUID) error {
if err != nil {
return fmt.Errorf("failed to update database for normalized audio %s: %v", audioUUID, err)
}
log.Printf("Normalized audio to %s", destination)
log.Info().Str("destination", destination).Msg("Normalized audio")
return nil
}
@ -100,22 +100,22 @@ func transcodeToOgg(audioUUID uuid.UUID) error {
source := userfile.AudioFileContentPathNormalized(audioUUID.String())
_, err := os.Stat(source)
if errors.Is(err, os.ErrNotExist) {
log.Printf("%s doesn't exist, skipping OGG transcoding", source)
log.Warn().Str("source", source).Msg("file doesn't exist, skipping OGG transcoding")
return nil
}
log.Printf("Transcoding %s to ogg", source)
log.Info().Str("source", source).Msg("Transcoding to ogg")
destination := userfile.AudioFileContentPathOgg(audioUUID.String())
// Use "ffmpeg" directly, assuming it's in the system PATH
cmd := exec.Command("ffmpeg", "-i", source, "-vn", "-acodec", "libvorbis", destination)
out, err := cmd.CombinedOutput()
if err != nil {
log.Printf("FFmpeg output for OGG transcoding: %s", out)
log.Error().Err(err).Bytes("out", out).Msg("FFmpeg output for OGG transcoding")
return fmt.Errorf("ffmpeg OGG transcoding failed: %v", err)
}
err = db.NoteAudioTranscodedToOgg(audioUUID.String())
if err != nil {
return fmt.Errorf("failed to update database for OGG transcoded audio %s: %v", audioUUID, err)
}
log.Printf("Transcoded audio to %s", destination)
log.Info().Str("destination", destination).Msg("Transcoded audio")
return nil
}

View file

@ -9,6 +9,7 @@ import (
"os"
"github.com/Gleipnir-Technology/nidus-sync/db"
"github.com/Gleipnir-Technology/nidus-sync/db/models"
"github.com/Gleipnir-Technology/nidus-sync/label-studio"
"github.com/Gleipnir-Technology/nidus-sync/minio"
"github.com/Gleipnir-Technology/nidus-sync/userfile"
@ -127,7 +128,7 @@ func processLabelTask(ctx context.Context, minioClient *minio.Client, minioBucke
return nil
}
func createTask(client *labelstudio.Client, project *labelstudio.Project, minioClient *minio.Client, bucket string, customer string, note *db.NoteAudio) error {
func createTask(client *labelstudio.Client, project *labelstudio.Project, minioClient *minio.Client, bucket string, customer string, note *models.NoteAudio) error {
audioRef := fmt.Sprintf("s3://%s/%s-normalized.m4a", bucket, note.UUID)
audioFile := fmt.Sprintf("%s/%s-normalized.m4a", userfile.UserFilesDirectory, note.UUID)
uploadPath := fmt.Sprintf("%s-normalized.m4a", note.UUID)
@ -140,9 +141,9 @@ func createTask(client *labelstudio.Client, project *labelstudio.Project, minioC
}
var transcription string = ""
//if note.Transcription.IsValue() {
//transcription = note.Transcription.MustGet()
//transcription = note.Transcription.MustGet()
//}
transcription = note.Transcription
transcription = note.Transcription.GetOr("")
simpleTasks := []map[string]interface{}{
{
"data": map[string]string{
@ -184,7 +185,7 @@ func findLabelStudioProject(client *labelstudio.Client, title string) (*labelstu
return nil, fmt.Errorf("No such project '%s'", title)
}
func findMatchingTask(client *labelstudio.Client, project *labelstudio.Project, customer string, note *db.NoteAudio) (*labelstudio.Task, error) {
func findMatchingTask(client *labelstudio.Client, project *labelstudio.Project, customer string, note *models.NoteAudio) (*labelstudio.Task, error) {
/*meta := map[string]string{
"customer": customer,
"note_uuid": note.UUID,