nidus-sync/queue/audio_processing.go

122 lines
3.6 KiB
Go
Raw Normal View History

package queue
import (
"context"
"errors"
"fmt"
"log"
"os"
"os/exec"
"github.com/Gleipnir-Technology/nidus-sync/db"
"github.com/Gleipnir-Technology/nidus-sync/userfile"
"github.com/google/uuid"
)
// AudioJob represents a job to process an audio file.
type AudioJob struct {
AudioUUID uuid.UUID
}
// audioJobChannel is the channel used to send audio processing jobs to the worker.
var audioJobChannel chan AudioJob
// StartAudioWorker initializes the audio job channel and starts the worker goroutine.
func StartAudioWorker(ctx context.Context) {
buffer := 100
audioJobChannel = make(chan AudioJob, buffer) // Buffered channel to prevent blocking
log.Printf("Started audio worker with buffer depth %d", buffer)
go func() {
for {
select {
case <-ctx.Done():
log.Println("Audio worker shutting down.")
return
case job := <-audioJobChannel:
log.Printf("Processing audio job for UUID: %s", job.AudioUUID)
err := processAudioFile(job.AudioUUID)
if err != nil {
log.Printf("Error processing audio file %s: %v", job.AudioUUID, err)
}
}
}
}()
}
// EnqueueAudioJob sends an audio processing job to the worker.
func EnqueueAudioJob(job AudioJob) {
select {
case audioJobChannel <- job:
log.Printf("Enqueued audio job for UUID: %s", job.AudioUUID)
default:
log.Printf("Audio job channel is full, dropping job for UUID: %s", job.AudioUUID)
}
}
func processAudioFile(audioUUID uuid.UUID) error {
// Normalize audio
err := normalizeAudio(audioUUID)
if err != nil {
return fmt.Errorf("failed to normalize audio %s: %v", audioUUID, err)
}
// Transcode to OGG
err = transcodeToOgg(audioUUID)
if err != nil {
return fmt.Errorf("failed to transcode audio %s to OGG: %v", audioUUID, err)
}
EnqueueLabelStudioJob(LabelStudioJob{
UUID: audioUUID,
})
return nil
}
func normalizeAudio(audioUUID uuid.UUID) error {
source := userfile.AudioFileContentPathRaw(audioUUID.String())
_, err := os.Stat(source)
if errors.Is(err, os.ErrNotExist) {
log.Printf("%s doesn't exist, skipping normalization", source)
return nil
}
log.Printf("Normalizing %s", source)
destination := userfile.AudioFileContentPathNormalized(audioUUID.String())
// Use "ffmpeg" directly, assuming it's in the system PATH
cmd := exec.Command("ffmpeg", "-i", source, "-filter:a", "loudnorm", destination)
out, err := cmd.CombinedOutput()
if err != nil {
log.Printf("FFmpeg output for normalization: %s", out)
return fmt.Errorf("ffmpeg normalization failed: %v", err)
}
err = db.NoteAudioNormalized(audioUUID.String())
if err != nil {
return fmt.Errorf("failed to update database for normalized audio %s: %v", audioUUID, err)
}
log.Printf("Normalized audio to %s", destination)
return nil
}
func transcodeToOgg(audioUUID uuid.UUID) error {
source := userfile.AudioFileContentPathNormalized(audioUUID.String())
_, err := os.Stat(source)
if errors.Is(err, os.ErrNotExist) {
log.Printf("%s doesn't exist, skipping OGG transcoding", source)
return nil
}
log.Printf("Transcoding %s to ogg", source)
destination := userfile.AudioFileContentPathOgg(audioUUID.String())
// Use "ffmpeg" directly, assuming it's in the system PATH
cmd := exec.Command("ffmpeg", "-i", source, "-vn", "-acodec", "libvorbis", destination)
out, err := cmd.CombinedOutput()
if err != nil {
log.Printf("FFmpeg output for OGG transcoding: %s", out)
return fmt.Errorf("ffmpeg OGG transcoding failed: %v", err)
}
err = db.NoteAudioTranscodedToOgg(audioUUID.String())
if err != nil {
return fmt.Errorf("failed to update database for OGG transcoded audio %s: %v", audioUUID, err)
}
log.Printf("Transcoded audio to %s", destination)
return nil
}