This is kind of a wild one. Turns out that the triggers I was using actually fire before the transaction is closed and I was primarily getting lucky that the job was present on the other side of the connection rather than having things built correctly. I've fixed this by removing the trigger entirely and instead manually triggering as part of the transaction. This makes the NOTIFY call happen as soon as the transaction closes, just at the cost of making my application be in charge of ensuring the NOTIFY gets called. Seems like a win. Part of doing this is porting the existing job creation code over to use Jet. It's something I want to do anyway, so it's a win all around.
276 lines
8.3 KiB
Go
276 lines
8.3 KiB
Go
package platform
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/Gleipnir-Technology/bob"
|
|
"github.com/Gleipnir-Technology/bob/dialect/psql"
|
|
"github.com/Gleipnir-Technology/bob/dialect/psql/sm"
|
|
"github.com/Gleipnir-Technology/bob/dialect/psql/um"
|
|
"github.com/rs/zerolog/log"
|
|
"github.com/stephenafamo/scan"
|
|
"source.gleipnir.technology/Gleipnir/nidus-sync/db"
|
|
"source.gleipnir.technology/Gleipnir/nidus-sync/db/enums"
|
|
modelfileupload "source.gleipnir.technology/Gleipnir/nidus-sync/db/gen/nidus-sync/fileupload/model"
|
|
"source.gleipnir.technology/Gleipnir/nidus-sync/db/models"
|
|
queryfileupload "source.gleipnir.technology/Gleipnir/nidus-sync/db/query/fileupload"
|
|
"source.gleipnir.technology/Gleipnir/nidus-sync/lint"
|
|
"source.gleipnir.technology/Gleipnir/nidus-sync/platform/background"
|
|
"source.gleipnir.technology/Gleipnir/nidus-sync/platform/file"
|
|
"source.gleipnir.technology/Gleipnir/nidus-sync/platform/types"
|
|
)
|
|
|
|
type UploadType = int
|
|
|
|
const (
|
|
UploadTypePool UploadType = iota
|
|
)
|
|
|
|
type UploadStatus = int
|
|
|
|
const (
|
|
UploadStatusComplete UploadStatus = iota
|
|
)
|
|
|
|
type Upload struct {
|
|
Created time.Time `db:"created" json:"created"`
|
|
Error string `db:"error" json:"error"`
|
|
Filename string `db:"filename" json:"filename"`
|
|
ID int32 `db:"id" json:"id"`
|
|
RecordCount int `db:"recordcount" json:"recordcount"`
|
|
Status string `db:"status" json:"status"`
|
|
Type string `db:"type" json:"type"`
|
|
CSVPool *CSVPoolDetail `json:"csv_pool"`
|
|
}
|
|
|
|
type CSVPoolDetailCount struct {
|
|
Existing int `json:"existing"`
|
|
New int `json:"new"`
|
|
Outside int `json:"outside"`
|
|
}
|
|
type CSVPoolDetail struct {
|
|
Count CSVPoolDetailCount `json:"count"`
|
|
Errors []UploadPoolError `json:"errors"`
|
|
Pools []UploadPoolRow `json:"pools"`
|
|
}
|
|
type UploadPoolRow struct {
|
|
Address types.Address `json:"address"`
|
|
Condition string `json:"condition"`
|
|
Errors []UploadPoolError `json:"errors"`
|
|
Status string `json:"status"`
|
|
Tags map[string]string `json:"tags"`
|
|
}
|
|
|
|
func GetUploadDetail(ctx context.Context, organization_id int32, file_id int32) (*Upload, error) {
|
|
file, err := models.FindFileuploadFile(ctx, db.PGInstance.BobDB, file_id)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Failed to lookup file %d: %w", file_id, err)
|
|
}
|
|
csv, err := models.FindFileuploadCSV(ctx, db.PGInstance.BobDB, file_id)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Failed to lookup csv %d: %w", file_id, err)
|
|
}
|
|
switch csv.Type {
|
|
case enums.FileuploadCsvtypeFlyover:
|
|
return getUploadDetailPool(ctx, file)
|
|
case enums.FileuploadCsvtypePoollist:
|
|
return getUploadDetailPool(ctx, file)
|
|
}
|
|
return nil, errors.New("No idea what to do with upload type")
|
|
}
|
|
|
|
func NewUpload(ctx context.Context, u User, upload file.Upload, t modelfileupload.Csvtype) (*int32, error) {
|
|
txn, err := db.BeginTxn(ctx)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Failed to begin transaction: %w", err)
|
|
}
|
|
defer lint.LogOnErrRollback(txn.Rollback, ctx, "rollback")
|
|
|
|
file, err := queryfileupload.FileInsert(ctx, txn, modelfileupload.File{
|
|
ContentType: upload.ContentType,
|
|
Created: time.Now(),
|
|
CreatorID: int32(u.ID),
|
|
Deleted: nil,
|
|
Error: "",
|
|
Name: upload.Name,
|
|
OrganizationID: u.Organization.ID,
|
|
Status: modelfileupload.Filestatustype_Uploaded,
|
|
SizeBytes: int32(upload.SizeBytes),
|
|
FileUUID: upload.UUID,
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Failed to create file upload: %w", err)
|
|
}
|
|
_, err = queryfileupload.CSVInsert(ctx, txn, modelfileupload.Csv{
|
|
Committed: nil,
|
|
FileID: file.ID,
|
|
Rowcount: 0,
|
|
Type: t,
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Failed to create csv: %w", err)
|
|
}
|
|
log.Info().Int32("id", file.ID).Msg("Created new pool CSV upload")
|
|
err = background.NewCSVImport(ctx, txn, file.ID)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("background job create: %w", err)
|
|
}
|
|
if err := txn.Commit(ctx); err != nil {
|
|
return nil, fmt.Errorf("commit: %w", err)
|
|
}
|
|
return &file.ID, nil
|
|
}
|
|
func UploadCommit(ctx context.Context, org Organization, file_id int32, committer User) error {
|
|
txn, err := db.BeginTxn(ctx)
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to begin transaction: %w", err)
|
|
}
|
|
defer lint.LogOnErrRollback(txn.Rollback, ctx, "rollback")
|
|
|
|
err = queryfileupload.FileUpdateCommitting(ctx, txn, int64(org.ID), int64(file_id), int64(committer.ID))
|
|
if err != nil {
|
|
return fmt.Errorf("update upload: %w", err)
|
|
}
|
|
err = background.NewCSVCommit(ctx, txn, file_id)
|
|
if err != nil {
|
|
return fmt.Errorf("background csv commit: %w", err)
|
|
}
|
|
err = txn.Commit(ctx)
|
|
|
|
return err
|
|
}
|
|
func UploadDiscard(ctx context.Context, org Organization, file_id int32) error {
|
|
_, err := psql.Update(
|
|
um.Table(models.FileuploadFiles.Alias()),
|
|
um.SetCol("status").ToArg("discarded"),
|
|
um.Where(psql.Quote("id").EQ(psql.Arg(file_id))),
|
|
um.Where(psql.Quote("organization_id").EQ(psql.Arg(org.ID))),
|
|
).Exec(ctx, db.PGInstance.BobDB)
|
|
return err
|
|
}
|
|
func UploadList(ctx context.Context, org Organization) ([]Upload, error) {
|
|
results := make([]Upload, 0)
|
|
rows, err := bob.All(ctx, db.PGInstance.BobDB, psql.Select(
|
|
sm.Columns(
|
|
// fileupload.csv columns
|
|
//"csv.file_id AS file_id",
|
|
//"csv.committed",
|
|
"csv.rowcount AS recordcount",
|
|
"csv.type_ AS type",
|
|
|
|
// fileupload.file columns
|
|
//"file.content_type",
|
|
"file.created AS created",
|
|
//"file.creator_id",
|
|
//"file.deleted",
|
|
"file.error AS error",
|
|
"file.id AS id",
|
|
"file.name AS filename",
|
|
//"file.organization_id",
|
|
"file.status AS status",
|
|
//"file.size_bytes",
|
|
//"file.file_uuid",
|
|
// Aggregate data
|
|
),
|
|
sm.From("fileupload.csv").As("csv"),
|
|
sm.InnerJoin("fileupload.file").As("file").OnEQ(psql.Raw("csv.file_id"), psql.Raw("file.id")),
|
|
sm.Where(psql.Quote("file", "organization_id").EQ(psql.Arg(org.ID))),
|
|
sm.OrderBy("created").Desc(),
|
|
), scan.StructMapper[Upload]())
|
|
if err != nil {
|
|
return results, fmt.Errorf("Failed to query pool upload rows: %w", err)
|
|
}
|
|
return rows, nil
|
|
}
|
|
func getUploadDetailPool(ctx context.Context, file *models.FileuploadFile) (*Upload, error) {
|
|
file_errors, errors_by_line, err := errorsByLine(ctx, file)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("get errors by line: %w", err)
|
|
}
|
|
pool_rows, err := models.FileuploadPools.Query(
|
|
models.SelectWhere.FileuploadPools.CSVFile.EQ(file.ID),
|
|
sm.OrderBy(models.FileuploadPools.Columns.LineNumber).Asc(),
|
|
).All(ctx, db.PGInstance.BobDB)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Failed to query pools for %d: %w", file.ID, err)
|
|
}
|
|
address_ids := make([]int32, 0)
|
|
for _, r := range pool_rows {
|
|
if r.AddressID.IsValue() {
|
|
address_ids = append(address_ids, r.AddressID.MustGet())
|
|
}
|
|
}
|
|
addresses, err := types.AddressList(ctx, address_ids)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("get address list: %w", err)
|
|
}
|
|
pools := make([]UploadPoolRow, 0)
|
|
count_existing := 0
|
|
count_new := 0
|
|
count_outside := 0
|
|
status := "unknown"
|
|
for _, r := range pool_rows {
|
|
if r.IsNew {
|
|
count_new = count_new + 1
|
|
status = "new"
|
|
} else {
|
|
count_existing = count_existing + 1
|
|
status = "existing"
|
|
}
|
|
if !r.IsInDistrict {
|
|
count_outside++
|
|
status = "outside"
|
|
}
|
|
tags := db.ConvertFromPGData(r.Tags)
|
|
// add 2 here because our file lines are 1-indexed and we skip the header line, but we are ranging 0-indexed
|
|
errors, ok := errors_by_line[r.LineNumber]
|
|
if !ok {
|
|
errors = []UploadPoolError{}
|
|
}
|
|
var address *types.Address
|
|
if r.AddressID.IsValue() {
|
|
var ok bool
|
|
address, ok = addresses[r.AddressID.MustGet()]
|
|
if !ok {
|
|
log.Error().Int32("id", r.AddressID.MustGet()).Msg("address missing")
|
|
continue
|
|
}
|
|
} else {
|
|
address = &types.Address{
|
|
Country: "usa",
|
|
Locality: r.AddressLocality,
|
|
Number: r.AddressNumber,
|
|
PostalCode: r.AddressPostalCode,
|
|
Region: r.AddressRegion,
|
|
Street: r.AddressStreet,
|
|
}
|
|
}
|
|
pools = append(pools, UploadPoolRow{
|
|
Address: *address,
|
|
Condition: r.Condition.String(),
|
|
Errors: errors,
|
|
Status: status,
|
|
Tags: tags,
|
|
})
|
|
}
|
|
return &Upload{
|
|
Created: file.Created,
|
|
Error: file.Error,
|
|
Filename: file.Name,
|
|
ID: file.ID,
|
|
RecordCount: len(pool_rows),
|
|
CSVPool: &CSVPoolDetail{
|
|
Count: CSVPoolDetailCount{
|
|
Existing: count_existing,
|
|
Outside: count_outside,
|
|
New: count_new,
|
|
},
|
|
Errors: file_errors,
|
|
Pools: pools,
|
|
},
|
|
Status: file.Status.String(),
|
|
}, nil
|
|
}
|