Add logic to re-add background uploads that didn't complete
This commit is contained in:
parent
d17c8b8be7
commit
013ac85a70
2 changed files with 36 additions and 0 deletions
|
|
@ -2,10 +2,15 @@ package background
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/Gleipnir-Technology/nidus-sync/comms/email"
|
||||
"github.com/Gleipnir-Technology/nidus-sync/db"
|
||||
"github.com/Gleipnir-Technology/nidus-sync/db/enums"
|
||||
"github.com/Gleipnir-Technology/nidus-sync/db/models"
|
||||
"github.com/Gleipnir-Technology/nidus-sync/platform/text"
|
||||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
var waitGroup sync.WaitGroup
|
||||
|
|
@ -47,8 +52,38 @@ func Start(ctx context.Context) {
|
|||
defer waitGroup.Done()
|
||||
startWorkerText(ctx, channelJobText)
|
||||
}()
|
||||
|
||||
err := addWaitingJobs(ctx)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("Failed to add waiting background jobs")
|
||||
}
|
||||
}
|
||||
|
||||
func WaitForExit() {
|
||||
|
||||
waitGroup.Wait()
|
||||
}
|
||||
|
||||
func addWaitingJobs(ctx context.Context) error {
|
||||
rows, err := models.FileuploadFiles.Query(
|
||||
models.SelectWhere.FileuploadFiles.Status.EQ(
|
||||
enums.FileuploadFilestatustypeUploaded,
|
||||
),
|
||||
).All(ctx, db.PGInstance.BobDB)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to query file uploads: %w", err)
|
||||
}
|
||||
for _, row := range rows {
|
||||
report_id := row.ID
|
||||
job := jobImportCSVPool{
|
||||
fileID: report_id,
|
||||
}
|
||||
select {
|
||||
case channelJobImportCSVPool <- job:
|
||||
log.Info().Int32("report_id", report_id).Msg("CSV upload job queued")
|
||||
default:
|
||||
log.Warn().Int32("report_id", report_id).Msg("CSV upload job failed to queue, channel full")
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@ type jobImportCSVPool struct {
|
|||
var channelJobImportCSVPool chan jobImportCSVPool
|
||||
|
||||
func processCSVJob(file_id int32) error {
|
||||
log.Debug().Int32("file_id", file_id).Msg("Fake processing CSV job")
|
||||
return nil
|
||||
}
|
||||
func startWorkerCSV(ctx context.Context, channelJobImport chan jobImportCSVPool) {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue