From d17c8b8be7843159eff2e73d05068048e766a7fb Mon Sep 17 00:00:00 2001 From: Eli Ribble Date: Sun, 8 Feb 2026 03:52:39 +0000 Subject: [PATCH] Add worker process for CSV import jobs --- background/background.go | 13 ++++++++++--- background/pool.go | 38 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 48 insertions(+), 3 deletions(-) create mode 100644 background/pool.go diff --git a/background/background.go b/background/background.go index 92705b81..247fb360 100644 --- a/background/background.go +++ b/background/background.go @@ -13,9 +13,10 @@ var waitGroup sync.WaitGroup func Start(ctx context.Context) { newOAuthTokenChannel = make(chan struct{}, 10) - channelJobAudio = make(chan jobAudio, 100) // Buffered channel to prevent blocking - channelJobEmail = make(chan email.Job, 100) // Buffered channel to prevent blocking - channelJobText = make(chan text.Job, 100) // Buffered channel to prevent blocking + channelJobAudio = make(chan jobAudio, 100) // Buffered channel to prevent blocking + channelJobImportCSVPool = make(chan jobImportCSVPool, 100) // Buffered channel to prevent blocking + channelJobEmail = make(chan email.Job, 100) // Buffered channel to prevent blocking + channelJobText = make(chan text.Job, 100) // Buffered channel to prevent blocking waitGroup.Add(1) go func() { @@ -29,6 +30,12 @@ func Start(ctx context.Context) { startWorkerAudio(ctx, channelJobAudio) }() + waitGroup.Add(1) + go func() { + defer waitGroup.Done() + startWorkerCSV(ctx, channelJobImportCSVPool) + }() + waitGroup.Add(1) go func() { defer waitGroup.Done() diff --git a/background/pool.go b/background/pool.go new file mode 100644 index 00000000..3516e2c3 --- /dev/null +++ b/background/pool.go @@ -0,0 +1,38 @@ +package background + +import ( + "context" + //"fmt" + + //"github.com/Gleipnir-Technology/nidus-sync/userfile" + //"github.com/google/uuid" + "github.com/rs/zerolog/log" +) + +// represents a job to import a pool CSV file +type jobImportCSVPool struct { + fileID int32 +} + +var channelJobImportCSVPool chan jobImportCSVPool + +func processCSVJob(file_id int32) error { + return nil +} +func startWorkerCSV(ctx context.Context, channelJobImport chan jobImportCSVPool) { + go func() { + for { + select { + case <-ctx.Done(): + log.Info().Msg("CSV worker shutting down.") + return + case job := <-channelJobImport: + log.Info().Int32("id", job.fileID).Msg("Processing CSV job") + err := processCSVJob(job.fileID) + if err != nil { + log.Error().Err(err).Int32("id", job.fileID).Msg("Error processing CSV file") + } + } + } + }() +}