diff --git a/background/background.go b/background/background.go index 247fb360..24e6da21 100644 --- a/background/background.go +++ b/background/background.go @@ -2,10 +2,15 @@ package background import ( "context" + "fmt" "sync" "github.com/Gleipnir-Technology/nidus-sync/comms/email" + "github.com/Gleipnir-Technology/nidus-sync/db" + "github.com/Gleipnir-Technology/nidus-sync/db/enums" + "github.com/Gleipnir-Technology/nidus-sync/db/models" "github.com/Gleipnir-Technology/nidus-sync/platform/text" + "github.com/rs/zerolog/log" ) var waitGroup sync.WaitGroup @@ -47,8 +52,38 @@ func Start(ctx context.Context) { defer waitGroup.Done() startWorkerText(ctx, channelJobText) }() + + err := addWaitingJobs(ctx) + if err != nil { + log.Error().Err(err).Msg("Failed to add waiting background jobs") + } } + func WaitForExit() { waitGroup.Wait() } + +func addWaitingJobs(ctx context.Context) error { + rows, err := models.FileuploadFiles.Query( + models.SelectWhere.FileuploadFiles.Status.EQ( + enums.FileuploadFilestatustypeUploaded, + ), + ).All(ctx, db.PGInstance.BobDB) + if err != nil { + return fmt.Errorf("Failed to query file uploads: %w", err) + } + for _, row := range rows { + report_id := row.ID + job := jobImportCSVPool{ + fileID: report_id, + } + select { + case channelJobImportCSVPool <- job: + log.Info().Int32("report_id", report_id).Msg("CSV upload job queued") + default: + log.Warn().Int32("report_id", report_id).Msg("CSV upload job failed to queue, channel full") + } + } + return nil +} diff --git a/background/pool.go b/background/pool.go index 3516e2c3..220c626f 100644 --- a/background/pool.go +++ b/background/pool.go @@ -17,6 +17,7 @@ type jobImportCSVPool struct { var channelJobImportCSVPool chan jobImportCSVPool func processCSVJob(file_id int32) error { + log.Debug().Int32("file_id", file_id).Msg("Fake processing CSV job") return nil } func startWorkerCSV(ctx context.Context, channelJobImport chan jobImportCSVPool) {