diff --git a/background/upload.go b/background/upload.go index 58e31ebd..7d840f3c 100644 --- a/background/upload.go +++ b/background/upload.go @@ -96,15 +96,16 @@ func startWorkerCSV(ctx context.Context, channelJobImport chan jobCSV) { log.Info().Msg("CSV worker shutting down.") return case job := <-channelJobImport: - log.Info().Int32("id", job.fileID).Msg("Processing CSV job") switch job.action { case jobCSVActionCommit: + log.Info().Int32("id", job.fileID).Msg("Processing CSV commit job") err := csv.JobCommit(ctx, job.fileID) if err != nil { log.Error().Err(err).Int32("id", job.fileID).Msg("Error processing CSV file") continue } case jobCSVActionImport: + log.Info().Int32("id", job.fileID).Msg("Processing CSV import job") err := csv.JobImport(ctx, job.fileID, job.csvType) if err != nil { log.Error().Err(err).Int32("id", job.fileID).Msg("Error processing CSV file") diff --git a/platform/csv/csv.go b/platform/csv/csv.go index 83ecf117..96e9c2bc 100644 --- a/platform/csv/csv.go +++ b/platform/csv/csv.go @@ -91,10 +91,13 @@ func JobImport(ctx context.Context, file_id int32, type_ enums.FileuploadCsvtype func importCSV[T any](ctx context.Context, file_id int32, parser csvParserFunc[T], processor csvProcessorFunc[T]) error { // Not done in the transaction so the state shows up immediately _, err := psql.Update( - um.Table("fileupload.csv"), - um.SetCol("status").ToArg("processing"), - um.Where(psql.Quote("file_id").EQ(psql.Arg(file_id))), + um.Table("fileupload.file"), + um.SetCol("status").ToArg("parsing"), + um.Where(psql.Quote("id").EQ(psql.Arg(file_id))), ).Exec(ctx, db.PGInstance.BobDB) + if err != nil { + return fmt.Errorf("Failed to set file %d to processing: %w", file_id, err) + } file, c, err := loadFileAndCSV(ctx, file_id) if err != nil {