diff --git a/background/background.go b/background/background.go
index 5df4db68..d43ebd8b 100644
--- a/background/background.go
+++ b/background/background.go
@@ -5,17 +5,11 @@ import (
 	"fmt"
 	"sync"
 
-	"github.com/Gleipnir-Technology/bob"
-	"github.com/Gleipnir-Technology/bob/dialect/psql"
-	"github.com/Gleipnir-Technology/bob/dialect/psql/sm"
 	commsemail "github.com/Gleipnir-Technology/nidus-sync/comms/email"
 	"github.com/Gleipnir-Technology/nidus-sync/config"
-	"github.com/Gleipnir-Technology/nidus-sync/db"
-	"github.com/Gleipnir-Technology/nidus-sync/db/enums"
 	"github.com/Gleipnir-Technology/nidus-sync/platform/email"
 	"github.com/Gleipnir-Technology/nidus-sync/platform/text"
 	"github.com/rs/zerolog/log"
-	"github.com/stephenafamo/scan"
 )
 
 var waitGroup sync.WaitGroup
@@ -76,31 +70,13 @@ func WaitForExit() {
 }
 
 func addWaitingJobs(ctx context.Context) error {
-	type Row_ struct {
-		ID   int32                   `db:"id"`
-		Type enums.FileuploadCsvtype `db:"type"`
-	}
-	rows, err := bob.All(ctx, db.PGInstance.BobDB, psql.Select(
-		sm.Columns(
-			"file.id AS id",
-			"csv.type_ AS type",
-		),
-		sm.From("fileupload.file").As("file"),
-		sm.InnerJoin("fileupload.csv").As("csv").OnEQ(psql.Raw("file.id"), psql.Raw("csv.file_id")),
-		sm.Where(
-			psql.Raw("file.status").EQ(psql.Arg(enums.FileuploadFilestatustypeUploaded)),
-		),
-	), scan.StructMapper[Row_]())
-
+	err := addWaitingJobsCommit(ctx)
 	if err != nil {
-		return fmt.Errorf("Failed to query file uploads: %w", err)
+		return fmt.Errorf("commit: %w", err)
 	}
-	for _, row := range rows {
-		report_id := row.ID
-		enqueueJobCSV(jobCSV{
-			fileID:  report_id,
-			csvType: row.Type,
-		})
+	err = addWaitingJobsImport(ctx)
+	if err != nil {
+		return fmt.Errorf("import: %w", err)
 	}
 	return nil
 }
diff --git a/background/upload.go b/background/upload.go
index 09912f16..58e31ebd 100644
--- a/background/upload.go
+++ b/background/upload.go
@@ -2,13 +2,18 @@ package background
 
 import (
 	"context"
-	//"fmt"
+	"fmt"
 
+	"github.com/Gleipnir-Technology/bob"
+	"github.com/Gleipnir-Technology/bob/dialect/psql"
+	"github.com/Gleipnir-Technology/bob/dialect/psql/sm"
+	"github.com/Gleipnir-Technology/nidus-sync/db"
 	"github.com/Gleipnir-Technology/nidus-sync/db/enums"
 	"github.com/Gleipnir-Technology/nidus-sync/platform/csv"
 	//"github.com/Gleipnir-Technology/nidus-sync/userfile"
 	//"github.com/google/uuid"
 	"github.com/rs/zerolog/log"
+	"github.com/stephenafamo/scan"
 )
 
 type jobCSVAction = int
@@ -39,6 +44,42 @@ func ProcessUpload(file_id int32, t enums.FileuploadCsvtype) {
 	})
 }
 
+func addWaitingJobsCommit(ctx context.Context) error {
+	return addWaitingJobsForType(ctx, enums.FileuploadFilestatustypeCommitting, jobCSVActionCommit)
+}
+func addWaitingJobsImport(ctx context.Context) error {
+	return addWaitingJobsForType(ctx, enums.FileuploadFilestatustypeUploaded, jobCSVActionImport)
+}
+func addWaitingJobsForType(ctx context.Context, status enums.FileuploadFilestatustype, action jobCSVAction) error {
+	type Row_ struct {
+		ID   int32                   `db:"id"`
+		Type enums.FileuploadCsvtype `db:"type"`
+	}
+	rows, err := bob.All(ctx, db.PGInstance.BobDB, psql.Select(
+		sm.Columns(
+			"file.id AS id",
+			"csv.type_ AS type",
+		),
+		sm.From("fileupload.file").As("file"),
+		sm.InnerJoin("fileupload.csv").As("csv").OnEQ(psql.Raw("file.id"), psql.Raw("csv.file_id")),
+		sm.Where(
+			psql.Raw("file.status").EQ(psql.Arg(status)),
+		),
+	), scan.StructMapper[Row_]())
+
+	if err != nil {
+		return fmt.Errorf("Failed to query file uploads: %w", err)
+	}
+	for _, row := range rows {
+		report_id := row.ID
+		enqueueJobCSV(jobCSV{
+			action:  action,
+			fileID:  report_id,
+			csvType: row.Type,
+		})
+	}
+	return nil
+}
 func enqueueJobCSV(job jobCSV) {
 	select {
 	case channelJobCSV <- job: