From aee17d2c7ab31ad52779aa848d86b3d8ca7abec3 Mon Sep 17 00:00:00 2001 From: Eli Ribble Date: Sun, 8 Feb 2026 05:06:47 +0000 Subject: [PATCH] Add jobs to process queue when csv is uploaded --- background/pool.go | 14 ++++++++++++++ platform/pool.go | 2 ++ 2 files changed, 16 insertions(+) diff --git a/background/pool.go b/background/pool.go index 212c3f2f..e9f7f0b2 100644 --- a/background/pool.go +++ b/background/pool.go @@ -17,6 +17,20 @@ type jobImportCSVPool struct { var channelJobImportCSVPool chan jobImportCSVPool +func ProcessUpload(file_id int32) { + enqueueUploadJob(jobImportCSVPool{ + fileID: file_id, + }) +} + +func enqueueUploadJob(job jobImportCSVPool) { + select { + case channelJobImportCSVPool <- job: + log.Info().Int32("file_id", job.fileID).Msg("Enqueued csv job") + default: + log.Warn().Int32("file_id", job.fileID).Msg("csv channel is full, dropping job") + } +} func startWorkerCSV(ctx context.Context, channelJobImport chan jobImportCSVPool) { go func() { for { diff --git a/platform/pool.go b/platform/pool.go index 1c4052d5..3537b6ab 100644 --- a/platform/pool.go +++ b/platform/pool.go @@ -5,6 +5,7 @@ import ( "fmt" "time" + "github.com/Gleipnir-Technology/nidus-sync/background" "github.com/Gleipnir-Technology/nidus-sync/db" "github.com/Gleipnir-Technology/nidus-sync/db/enums" "github.com/Gleipnir-Technology/nidus-sync/db/models" @@ -45,6 +46,7 @@ func NewPoolUpload(ctx context.Context, u *models.User, upload userfile.FileUplo return PoolUpload{}, fmt.Errorf("Failed to create csv: %w", err) } txn.Commit(ctx) + background.ProcessUpload(file.ID) return PoolUpload{ ID: file.ID, }, nil