nidus-sync/platform/background/upload.go
Eli Ribble 44c4f17f32
Massive rework of platform layer user/organization
The goal of this rework is to make it so I can pass around platform.User
instead of a pair of models.Organization and models.User. This is useful
for reasons I kind of forget now, but it started with working on
notifications and ballooned massively from there into refactoring a
number of things that were bugging me.

This also includes a tiny amount of work on server-sent events (SSE).

 * background stuff lives inside the platform now, which I need for
   having it push updates through SSE
 * userfile now lives in the platform, under file, so other platform
   functions can safely use it
 * oauth is broken into pieces and moved inside the platform because
   other stuff was calling it already, but badly
 * notifications go into the platform as well
2026-03-12 23:49:16 +00:00
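
As a rough illustration of the shape this refactor is after (field names
and the models import path here are guesses; the real definition lives in
the platform package and may differ):

	package platform

	import "github.com/Gleipnir-Technology/nidus-sync/models"

	// User bundles an account with its organization so call sites can
	// accept one value instead of a (models.Organization, models.User)
	// pair. Illustrative only, not the actual definition.
	type User struct {
		User         models.User
		Organization models.Organization
	}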


package background

import (
	"context"
	"fmt"

	"github.com/Gleipnir-Technology/bob"
	"github.com/Gleipnir-Technology/bob/dialect/psql"
	"github.com/Gleipnir-Technology/bob/dialect/psql/sm"
	"github.com/Gleipnir-Technology/nidus-sync/db"
	"github.com/Gleipnir-Technology/nidus-sync/db/enums"
	"github.com/Gleipnir-Technology/nidus-sync/platform/csv"
	"github.com/rs/zerolog/log"
	"github.com/stephenafamo/scan"
)
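
// jobCSVAction identifies what a queued CSV job should do with its file.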
type jobCSVAction int

const (
	jobCSVActionCommit jobCSVAction = iota
	jobCSVActionImport
)
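
// jobCSV is one unit of CSV work: the action to perform, the CSV type,
// and the ID of the fileupload.file row it applies to.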
type jobCSV struct {
	action  jobCSVAction
	csvType enums.FileuploadCsvtype
	fileID  int32
}
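
// channelJobCSV holds queued CSV jobs; enqueueJobCSV performs
// non-blocking sends on it.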
var channelJobCSV chan jobCSV
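
// CommitUpload queues a commit job for the given CSV file upload.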
func CommitUpload(fileID int32) {
	enqueueJobCSV(jobCSV{
		action: jobCSVActionCommit,
		fileID: fileID,
	})
}
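
// ProcessUpload queues an import job for the given CSV file upload and
// CSV type.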
func ProcessUpload(fileID int32, t enums.FileuploadCsvtype) {
	enqueueJobCSV(jobCSV{
		action:  jobCSVActionImport,
		csvType: t,
		fileID:  fileID,
	})
}
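
// addWaitingJobsCommit enqueues commit jobs for files already in the
// committing state.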
func addWaitingJobsCommit(ctx context.Context) error {
	return addWaitingJobsForType(ctx, enums.FileuploadFilestatustypeCommitting, jobCSVActionCommit)
}
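
// addWaitingJobsImport enqueues import jobs for files still in the
// uploaded state.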
func addWaitingJobsImport(ctx context.Context) error {
	return addWaitingJobsForType(ctx, enums.FileuploadFilestatustypeUploaded, jobCSVActionImport)
}
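
// addWaitingJobsForType looks up every fileupload.file row with the
// given status and enqueues a job with the given action for each one.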
func addWaitingJobsForType(ctx context.Context, status enums.FileuploadFilestatustype, action jobCSVAction) error {
	type Row_ struct {
		ID   int32                   `db:"id"`
		Type enums.FileuploadCsvtype `db:"type"`
	}
	rows, err := bob.All(ctx, db.PGInstance.BobDB, psql.Select(
		sm.Columns(
			"file.id AS id",
			"csv.type_ AS type",
		),
		sm.From("fileupload.file").As("file"),
		sm.InnerJoin("fileupload.csv").As("csv").OnEQ(psql.Raw("file.id"), psql.Raw("csv.file_id")),
		sm.Where(
			psql.Raw("file.status").EQ(psql.Arg(status)),
		),
	), scan.StructMapper[Row_]())
	if err != nil {
		return fmt.Errorf("failed to query file uploads: %w", err)
	}
	for _, row := range rows {
		enqueueJobCSV(jobCSV{
			action:  action,
			fileID:  row.ID,
			csvType: row.Type,
		})
	}
	return nil
}
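
// enqueueJobCSV performs a non-blocking send so a full queue never
// stalls the caller; when the channel cannot accept the job it is
// dropped with a warning.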
func enqueueJobCSV(job jobCSV) {
	select {
	case channelJobCSV <- job:
		log.Info().Int32("file_id", job.fileID).Msg("Enqueued csv job")
	default:
		log.Warn().Int32("file_id", job.fileID).Msg("csv channel is full, dropping job")
	}
}
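
// startWorkerCSV launches a goroutine that drains the job channel,
// dispatching each job by action, until the context is cancelled.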
func startWorkerCSV(ctx context.Context, jobs chan jobCSV) {
	go func() {
		for {
			select {
			case <-ctx.Done():
				log.Info().Msg("CSV worker shutting down.")
				return
			case job := <-jobs:
				switch job.action {
				case jobCSVActionCommit:
					log.Info().Int32("id", job.fileID).Msg("Processing CSV commit job")
					err := csv.JobCommit(ctx, job.fileID)
					if err != nil {
						log.Error().Err(err).Int32("id", job.fileID).Msg("Error processing CSV file")
						continue
					}
				case jobCSVActionImport:
					log.Info().Int32("id", job.fileID).Msg("Processing CSV import job")
					err := csv.JobImport(ctx, job.fileID, job.csvType)
					if err != nil {
						log.Error().Err(err).Int32("id", job.fileID).Msg("Error processing CSV file")
						continue
					}
				default:
					// An unrecognized action indicates a bug, so stop the
					// worker rather than spin on bad input.
					log.Error().Msg("Unrecognized job action")
					return
				}
				log.Info().Int32("id", job.fileID).Msg("Done processing CSV job")
			}
		}
	}()
}
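
For reference, the wiring this file implies (a buffered channel feeding
startWorkerCSV, plus the waiting-job scans at startup) would look roughly
like the sketch below; the actual initialization lives elsewhere in the
package, so the function name and buffer size are guesses:

	// Start wires up the CSV pipeline: create the queue, start the
	// worker, then re-enqueue any files left mid-import or mid-commit.
	// Hypothetical helper, not part of this file.
	func Start(ctx context.Context) error {
		channelJobCSV = make(chan jobCSV, 64) // buffer size is a guess
		startWorkerCSV(ctx, channelJobCSV)
		if err := addWaitingJobsImport(ctx); err != nil {
			return err
		}
		return addWaitingJobsCommit(ctx)
	}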