Split out ability to upload flyover data from pool uploads

Tons of changes here, all in the name of quickly getting to where I can
create test compliance letters.
Eli Ribble 2026-03-02 18:49:02 +00:00
parent 9939434cb3
commit ff2ec0ad14
38 changed files with 4204 additions and 233 deletions


@@ -5,14 +5,17 @@ import (
"fmt"
"sync"
"github.com/Gleipnir-Technology/bob"
"github.com/Gleipnir-Technology/bob/dialect/psql"
"github.com/Gleipnir-Technology/bob/dialect/psql/sm"
commsemail "github.com/Gleipnir-Technology/nidus-sync/comms/email"
"github.com/Gleipnir-Technology/nidus-sync/config"
"github.com/Gleipnir-Technology/nidus-sync/db"
"github.com/Gleipnir-Technology/nidus-sync/db/enums"
"github.com/Gleipnir-Technology/nidus-sync/db/models"
"github.com/Gleipnir-Technology/nidus-sync/platform/email"
"github.com/Gleipnir-Technology/nidus-sync/platform/text"
"github.com/rs/zerolog/log"
"github.com/stephenafamo/scan"
)
var waitGroup sync.WaitGroup
@@ -20,10 +23,10 @@ var waitGroup sync.WaitGroup
func Start(ctx context.Context) {
newOAuthTokenChannel = make(chan struct{}, 10)
channelJobAudio = make(chan jobAudio, 100) // Buffered channel to prevent blocking
channelJobImportCSVPool = make(chan jobImportCSVPool, 100) // Buffered channel to prevent blocking
channelJobEmail = make(chan email.Job, 100) // Buffered channel to prevent blocking
channelJobText = make(chan text.Job, 100) // Buffered channel to prevent blocking
channelJobAudio = make(chan jobAudio, 100) // Buffered channel to prevent blocking
channelJobImportCSV = make(chan jobImportCSV, 100) // Buffered channel to prevent blocking
channelJobEmail = make(chan email.Job, 100) // Buffered channel to prevent blocking
channelJobText = make(chan text.Job, 100) // Buffered channel to prevent blocking
waitGroup.Add(1)
go func() {
@@ -46,7 +49,7 @@ func Start(ctx context.Context) {
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
startWorkerCSV(ctx, channelJobImportCSVPool)
startWorkerCSV(ctx, channelJobImportCSV)
}()
waitGroup.Add(1)
@@ -73,21 +76,33 @@ func WaitForExit() {
}
func addWaitingJobs(ctx context.Context) error {
rows, err := models.FileuploadFiles.Query(
models.SelectWhere.FileuploadFiles.Status.EQ(
enums.FileuploadFilestatustypeUploaded,
type Row_ struct {
ID int32 `db:"id"`
Type enums.FileuploadCsvtype `db:"type"`
}
rows, err := bob.All(ctx, db.PGInstance.BobDB, psql.Select(
sm.Columns(
"file.id AS id",
"csv.type_ AS type",
),
).All(ctx, db.PGInstance.BobDB)
sm.From("fileupload.file").As("file"),
sm.InnerJoin("fileupload.csv").As("csv").OnEQ(psql.Raw("file.id"), psql.Raw("csv.file_id")),
sm.Where(
psql.Raw("file.status").EQ(psql.Arg(enums.FileuploadFilestatustypeUploaded)),
),
), scan.StructMapper[Row_]())
if err != nil {
return fmt.Errorf("Failed to query file uploads: %w", err)
}
for _, row := range rows {
report_id := row.ID
job := jobImportCSVPool{
job := jobImportCSV{
fileID: report_id,
type_: row.Type,
}
select {
case channelJobImportCSVPool <- job:
case channelJobImportCSV <- job:
log.Info().Int32("report_id", report_id).Msg("CSV upload job queued")
default:
log.Warn().Int32("report_id", report_id).Msg("CSV upload job failed to queue, channel full")

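The interesting change in this hunk is the query itself: the pool-specific generated-model query is replaced with a hand-built select that joins fileupload.file to fileupload.csv, so each queued job now carries the CSV type as well as the file id. Below is a minimal, self-contained sketch of that bob/psql pattern using the table and column names from the diff; the package name, helper name, and the plain-string Type field are illustrative assumptions (the commit maps onto enums.FileuploadCsvtype).

// Sketch of the join-and-map pattern introduced above; names flagged in the
// lead-in are assumptions, while the query-builder calls mirror the diff.
package jobs

import (
	"context"

	"github.com/Gleipnir-Technology/bob"
	"github.com/Gleipnir-Technology/bob/dialect/psql"
	"github.com/Gleipnir-Technology/bob/dialect/psql/sm"
	"github.com/stephenafamo/scan"
)

// uploadedCSV receives one row per uploaded file that has a CSV record.
type uploadedCSV struct {
	ID   int32  `db:"id"`
	Type string `db:"type"` // the commit uses enums.FileuploadCsvtype here
}

func queryUploadedCSVs(ctx context.Context, exec bob.Executor) ([]uploadedCSV, error) {
	// bob.All executes the built query and maps each row via the db struct tags.
	return bob.All(ctx, exec, psql.Select(
		sm.Columns("file.id AS id", "csv.type_ AS type"),
		sm.From("fileupload.file").As("file"),
		sm.InnerJoin("fileupload.csv").As("csv").OnEQ(psql.Raw("file.id"), psql.Raw("csv.file_id")),
		sm.Where(psql.Raw("file.status").EQ(psql.Arg("uploaded"))),
	), scan.StructMapper[uploadedCSV]())
}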

@@ -4,34 +4,36 @@ import (
"context"
//"fmt"
"github.com/Gleipnir-Technology/nidus-sync/db/enums"
"github.com/Gleipnir-Technology/nidus-sync/platform/csv"
//"github.com/Gleipnir-Technology/nidus-sync/userfile"
//"github.com/google/uuid"
"github.com/rs/zerolog/log"
)
// jobImportCSV represents a job to import an uploaded CSV file
type jobImportCSVPool struct {
type jobImportCSV struct {
fileID int32
type_ enums.FileuploadCsvtype
}
var channelJobImportCSVPool chan jobImportCSVPool
var channelJobImportCSV chan jobImportCSV
func ProcessUpload(file_id int32) {
enqueueUploadJob(jobImportCSVPool{
func ProcessUpload(file_id int32, t enums.FileuploadCsvtype) {
enqueueUploadJob(jobImportCSV{
fileID: file_id,
type_: t,
})
}
func enqueueUploadJob(job jobImportCSVPool) {
func enqueueUploadJob(job jobImportCSV) {
select {
case channelJobImportCSVPool <- job:
case channelJobImportCSV <- job:
log.Info().Int32("file_id", job.fileID).Msg("Enqueued csv job")
default:
log.Warn().Int32("file_id", job.fileID).Msg("csv channel is full, dropping job")
}
}
func startWorkerCSV(ctx context.Context, channelJobImport chan jobImportCSVPool) {
func startWorkerCSV(ctx context.Context, channelJobImport chan jobImportCSV) {
go func() {
for {
select {
@@ -40,7 +42,7 @@ func startWorkerCSV(ctx context.Context, channelJobImport chan jobImportCSVPool)
return
case job := <-channelJobImport:
log.Info().Int32("id", job.fileID).Msg("Processing CSV job")
err := csv.ProcessJob(ctx, job.fileID)
err := csv.ProcessJob(ctx, job.fileID, job.type_)
if err != nil {
log.Error().Err(err).Int32("id", job.fileID).Msg("Error processing CSV file")
}