Add both committing and import CSV jobs to the backlog

Eli Ribble 2026-03-04 19:02:11 +00:00
parent 438c946bad
commit 96514d61e2
No known key found for this signature in database
2 changed files with 47 additions and 30 deletions
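In short, the backlog scan that feeds the CSV worker now runs two passes over fileupload.file: one for files in the committing state and one for freshly uploaded files still waiting to be imported. A minimal sketch of the resulting call flow, using only the function and enum-derived names introduced in the diffs below (the exact wrapping here is illustrative, not the committed code):

func addWaitingJobs(ctx context.Context) error {
	// Files in the committing status get a commit job enqueued.
	if err := addWaitingJobsCommit(ctx); err != nil {
		return fmt.Errorf("commit: %w", err)
	}
	// Files in the uploaded status get an import job enqueued.
	if err := addWaitingJobsImport(ctx); err != nil {
		return fmt.Errorf("import: %w", err)
	}
	return nil
}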

View file

@@ -5,17 +5,11 @@ import (
"fmt"
"sync"
"github.com/Gleipnir-Technology/bob"
"github.com/Gleipnir-Technology/bob/dialect/psql"
"github.com/Gleipnir-Technology/bob/dialect/psql/sm"
commsemail "github.com/Gleipnir-Technology/nidus-sync/comms/email"
"github.com/Gleipnir-Technology/nidus-sync/config"
"github.com/Gleipnir-Technology/nidus-sync/db"
"github.com/Gleipnir-Technology/nidus-sync/db/enums"
"github.com/Gleipnir-Technology/nidus-sync/platform/email"
"github.com/Gleipnir-Technology/nidus-sync/platform/text"
"github.com/rs/zerolog/log"
"github.com/stephenafamo/scan"
)
var waitGroup sync.WaitGroup
@@ -76,31 +70,13 @@ func WaitForExit() {
}
func addWaitingJobs(ctx context.Context) error {
	type Row_ struct {
		ID   int32                   `db:"id"`
		Type enums.FileuploadCsvtype `db:"type"`
	}
	rows, err := bob.All(ctx, db.PGInstance.BobDB, psql.Select(
		sm.Columns(
			"file.id AS id",
			"csv.type_ AS type",
		),
		sm.From("fileupload.file").As("file"),
		sm.InnerJoin("fileupload.csv").As("csv").OnEQ(psql.Raw("file.id"), psql.Raw("csv.file_id")),
		sm.Where(
			psql.Raw("file.status").EQ(psql.Arg(enums.FileuploadFilestatustypeUploaded)),
		),
	), scan.StructMapper[Row_]())
	err := addWaitingJobsCommit(ctx)
	if err != nil {
		return fmt.Errorf("Failed to query file uploads: %w", err)
		return fmt.Errorf("commit: %w", err)
	}
	for _, row := range rows {
		report_id := row.ID
		enqueueJobCSV(jobCSV{
			fileID:  report_id,
			csvType: row.Type,
		})
	err = addWaitingJobsImport(ctx)
	if err != nil {
		return fmt.Errorf("import: %w", err)
	}
	return nil
}
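Both passes share the query that now lives in addWaitingJobsForType (second file, below). As a rough guide to what that bob builder produces, the statement looks approximately like the following, with the file status bound as the single parameter; waitingBacklogSQL is only an illustrative name, and the text is an approximation rather than captured output from the psql dialect:

// Approximate SQL behind the psql.Select in addWaitingJobsForType below
// (placeholder numbering and identifier quoting may differ in practice).
const waitingBacklogSQL = `
SELECT file.id AS id, csv.type_ AS type
FROM fileupload.file AS file
INNER JOIN fileupload.csv AS csv ON file.id = csv.file_id
WHERE file.status = $1`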

View file

@@ -2,13 +2,18 @@ package background
import (
"context"
//"fmt"
"fmt"
"github.com/Gleipnir-Technology/bob"
"github.com/Gleipnir-Technology/bob/dialect/psql"
"github.com/Gleipnir-Technology/bob/dialect/psql/sm"
"github.com/Gleipnir-Technology/nidus-sync/db"
"github.com/Gleipnir-Technology/nidus-sync/db/enums"
"github.com/Gleipnir-Technology/nidus-sync/platform/csv"
//"github.com/Gleipnir-Technology/nidus-sync/userfile"
//"github.com/google/uuid"
"github.com/rs/zerolog/log"
"github.com/stephenafamo/scan"
)
type jobCSVAction = int
@@ -39,6 +44,42 @@ func ProcessUpload(file_id int32, t enums.FileuploadCsvtype) {
	})
}
func addWaitingJobsCommit(ctx context.Context) error {
	return addWaitingJobsForType(ctx, enums.FileuploadFilestatustypeCommitting, jobCSVActionCommit)
}
func addWaitingJobsImport(ctx context.Context) error {
	return addWaitingJobsForType(ctx, enums.FileuploadFilestatustypeUploaded, jobCSVActionImport)
}
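// addWaitingJobsForType queries fileupload.file joined to fileupload.csv for
// files in the given status and enqueues a jobCSV carrying the given action
// and the file's CSV type for each matching row.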
func addWaitingJobsForType(ctx context.Context, status enums.FileuploadFilestatustype, action jobCSVAction) error {
	type Row_ struct {
		ID   int32                   `db:"id"`
		Type enums.FileuploadCsvtype `db:"type"`
	}
	rows, err := bob.All(ctx, db.PGInstance.BobDB, psql.Select(
		sm.Columns(
			"file.id AS id",
			"csv.type_ AS type",
		),
		sm.From("fileupload.file").As("file"),
		sm.InnerJoin("fileupload.csv").As("csv").OnEQ(psql.Raw("file.id"), psql.Raw("csv.file_id")),
		sm.Where(
			psql.Raw("file.status").EQ(psql.Arg(status)),
		),
	), scan.StructMapper[Row_]())
	if err != nil {
		return fmt.Errorf("Failed to query file uploads: %w", err)
	}
	for _, row := range rows {
		report_id := row.ID
		enqueueJobCSV(jobCSV{
			action:  action,
			fileID:  report_id,
			csvType: row.Type,
		})
	}
	return nil
}
func enqueueJobCSV(job jobCSV) {
	select {
	case channelJobCSV <- job: