Fix notification of job happening before transaction is closed
This is kind of a wild one. Turns out that the triggers I was using actually fire before the transaction is closed and I was primarily getting lucky that the job was present on the other side of the connection rather than having things built correctly. I've fixed this by removing the trigger entirely and instead manually triggering as part of the transaction. This makes the NOTIFY call happen as soon as the transaction closes, just at the cost of making my application be in charge of ensuring the NOTIFY gets called. Seems like a win. Part of doing this is porting the existing job creation code over to use Jet. It's something I want to do anyway, so it's a win all around.
This commit is contained in:
parent
7b04822a9b
commit
393836a86a
30 changed files with 1126 additions and 72 deletions
|
|
@ -3,63 +3,53 @@ package background
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/Gleipnir-Technology/bob"
|
||||
"github.com/aarondl/opt/omit"
|
||||
"github.com/rs/zerolog/log"
|
||||
"source.gleipnir.technology/Gleipnir/nidus-sync/db"
|
||||
"source.gleipnir.technology/Gleipnir/nidus-sync/db/enums"
|
||||
"source.gleipnir.technology/Gleipnir/nidus-sync/db/gen/nidus-sync/public/model"
|
||||
"source.gleipnir.technology/Gleipnir/nidus-sync/db/models"
|
||||
query "source.gleipnir.technology/Gleipnir/nidus-sync/db/query/public"
|
||||
//"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
func NewAudioTranscode(ctx context.Context, txn bob.Executor, audio_id int32) error {
|
||||
return newJob(ctx, txn, enums.JobtypeAudioTranscode, audio_id)
|
||||
func NewAudioTranscode(ctx context.Context, txn db.Ex, audio_id int32) error {
|
||||
return newJob(ctx, txn, model.Jobtype_AudioTranscode, audio_id)
|
||||
}
|
||||
func NewComplianceMailer(ctx context.Context, txn db.Ex, compliance_report_request_id int32) error {
|
||||
return newJob2(ctx, txn, model.Jobtype_ComplianceMailerSend, compliance_report_request_id)
|
||||
return newJob(ctx, txn, model.Jobtype_ComplianceMailerSend, compliance_report_request_id)
|
||||
}
|
||||
func NewCSVCommit(ctx context.Context, txn bob.Executor, csv_id int32) error {
|
||||
return newJob(ctx, txn, enums.JobtypeCSVCommit, csv_id)
|
||||
func NewCSVCommit(ctx context.Context, txn db.Ex, csv_id int32) error {
|
||||
return newJob(ctx, txn, model.Jobtype_CsvCommit, csv_id)
|
||||
}
|
||||
func NewCSVImport(ctx context.Context, txn bob.Executor, csv_id int32) error {
|
||||
return newJob(ctx, txn, enums.JobtypeCSVImport, csv_id)
|
||||
func NewCSVImport(ctx context.Context, txn db.Ex, csv_id int32) error {
|
||||
return newJob(ctx, txn, model.Jobtype_CsvImport, csv_id)
|
||||
}
|
||||
func NewEmailSend(ctx context.Context, txn bob.Executor, email_id int32) error {
|
||||
return newJob(ctx, txn, enums.JobtypeEmailSend, email_id)
|
||||
func NewEmailSend(ctx context.Context, txn db.Ex, email_id int32) error {
|
||||
return newJob(ctx, txn, model.Jobtype_EmailSend, email_id)
|
||||
}
|
||||
func NewLabelStudioAudioCreate(ctx context.Context, txn bob.Executor, note_audio_id int32) error {
|
||||
return newJob(ctx, txn, enums.JobtypeLabelStudioAudioCreate, note_audio_id)
|
||||
func NewLabelStudioAudioCreate(ctx context.Context, txn db.Ex, note_audio_id int32) error {
|
||||
return newJob(ctx, txn, model.Jobtype_LabelStudioAudioCreate, note_audio_id)
|
||||
}
|
||||
func NewTextRespond(ctx context.Context, txn db.Ex, text_id int32) error {
|
||||
return newJob2(ctx, txn, model.Jobtype_TextRespond, text_id)
|
||||
return newJob(ctx, txn, model.Jobtype_TextRespond, text_id)
|
||||
}
|
||||
func NewTextSend(ctx context.Context, txn db.Ex, job_id int32) error {
|
||||
return newJob2(ctx, txn, model.Jobtype_TextSend, job_id)
|
||||
return newJob(ctx, txn, model.Jobtype_TextSend, job_id)
|
||||
}
|
||||
func newJob(ctx context.Context, txn bob.Executor, t enums.Jobtype, id int32) error {
|
||||
_, err := models.Jobs.Insert(&models.JobSetter{
|
||||
Created: omit.From(time.Now()),
|
||||
// ID
|
||||
Type: omit.From(t),
|
||||
RowID: omit.From(id),
|
||||
}).One(ctx, txn)
|
||||
if err != nil {
|
||||
return fmt.Errorf("insert job: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
func newJob2(ctx context.Context, txn db.Ex, t model.Jobtype, id int32) error {
|
||||
func newJob(ctx context.Context, txn db.Ex, t model.Jobtype, id int32) error {
|
||||
job := model.Job{
|
||||
Created: time.Now(),
|
||||
Type: t,
|
||||
RowID: id,
|
||||
}
|
||||
_, err := query.JobInsert(ctx, txn, job)
|
||||
j, err := query.JobInsert(ctx, txn, job)
|
||||
if err != nil {
|
||||
return fmt.Errorf("insert job: %w", err)
|
||||
}
|
||||
err = query.JobNotify(ctx, txn, "new_job", strconv.Itoa(int(j.ID)))
|
||||
if err != nil {
|
||||
return fmt.Errorf("notify job: %w", err)
|
||||
}
|
||||
log.Debug().Int32("id", j.ID).Int32("row_id", id).Msg("created job, added notify")
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -114,7 +114,7 @@ func sendEmailBegin(ctx context.Context, source string, destination string, temp
|
|||
if err != nil {
|
||||
return fmt.Errorf("Failed to store email log: %w", err)
|
||||
}
|
||||
return background.NewEmailSend(ctx, db.PGInstance.BobDB, *e)
|
||||
return background.NewEmailSend(ctx, db.PGInstance.PGXPool, *e)
|
||||
}
|
||||
func sendEmailComplete(ctx context.Context, email_id int32) error {
|
||||
bxn := db.PGInstance.BobDB
|
||||
|
|
|
|||
|
|
@ -65,18 +65,20 @@ func HandleTextMessage(ctx context.Context, source string, destination string, c
|
|||
if err := txn.Commit(ctx); err != nil {
|
||||
return fmt.Errorf("commit: %w", err)
|
||||
}
|
||||
log.Debug().Msg("commit handle text message")
|
||||
return err
|
||||
}
|
||||
|
||||
func respondText(ctx context.Context, log_id int32) error {
|
||||
txn, err := db.BeginTxn(ctx)
|
||||
log.Debug().Msg("respond text txn begin")
|
||||
if err != nil {
|
||||
return fmt.Errorf("begin tx: %w", err)
|
||||
}
|
||||
defer lint.LogOnErrRollback(txn.Rollback, ctx, "rollback")
|
||||
l, err := querycomms.TextLogFromID(ctx, txn, int64(log_id))
|
||||
if err != nil {
|
||||
return fmt.Errorf("find comms: %w", err)
|
||||
return fmt.Errorf("find comms %d: %w", log_id, err)
|
||||
}
|
||||
src, err := ParsePhoneNumber(l.Source)
|
||||
if err != nil {
|
||||
|
|
|
|||
|
|
@ -10,13 +10,13 @@ import (
|
|||
"github.com/Gleipnir-Technology/bob/dialect/psql"
|
||||
"github.com/Gleipnir-Technology/bob/dialect/psql/sm"
|
||||
"github.com/Gleipnir-Technology/bob/dialect/psql/um"
|
||||
"github.com/aarondl/opt/omit"
|
||||
"github.com/aarondl/opt/omitnull"
|
||||
"github.com/rs/zerolog/log"
|
||||
"github.com/stephenafamo/scan"
|
||||
"source.gleipnir.technology/Gleipnir/nidus-sync/db"
|
||||
"source.gleipnir.technology/Gleipnir/nidus-sync/db/enums"
|
||||
modelfileupload "source.gleipnir.technology/Gleipnir/nidus-sync/db/gen/nidus-sync/fileupload/model"
|
||||
"source.gleipnir.technology/Gleipnir/nidus-sync/db/models"
|
||||
queryfileupload "source.gleipnir.technology/Gleipnir/nidus-sync/db/query/fileupload"
|
||||
"source.gleipnir.technology/Gleipnir/nidus-sync/lint"
|
||||
"source.gleipnir.technology/Gleipnir/nidus-sync/platform/background"
|
||||
"source.gleipnir.technology/Gleipnir/nidus-sync/platform/file"
|
||||
|
|
@ -82,34 +82,34 @@ func GetUploadDetail(ctx context.Context, organization_id int32, file_id int32)
|
|||
return nil, errors.New("No idea what to do with upload type")
|
||||
}
|
||||
|
||||
func NewUpload(ctx context.Context, u User, upload file.Upload, t enums.FileuploadCsvtype) (*int32, error) {
|
||||
txn, err := db.PGInstance.BobDB.BeginTx(ctx, nil)
|
||||
func NewUpload(ctx context.Context, u User, upload file.Upload, t modelfileupload.Csvtype) (*int32, error) {
|
||||
txn, err := db.BeginTxn(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Failed to begin transaction: %w", err)
|
||||
}
|
||||
defer lint.LogOnErrRollback(txn.Rollback, ctx, "rollback")
|
||||
|
||||
file, err := models.FileuploadFiles.Insert(&models.FileuploadFileSetter{
|
||||
ContentType: omit.From(upload.ContentType),
|
||||
Created: omit.From(time.Now()),
|
||||
CreatorID: omit.From(int32(u.ID)),
|
||||
Deleted: omitnull.FromPtr[time.Time](nil),
|
||||
Error: omit.From(""),
|
||||
Name: omit.From(upload.Name),
|
||||
OrganizationID: omit.From(u.Organization.ID),
|
||||
Status: omit.From(enums.FileuploadFilestatustypeUploaded),
|
||||
SizeBytes: omit.From(int32(upload.SizeBytes)),
|
||||
FileUUID: omit.From(upload.UUID),
|
||||
}).One(ctx, txn)
|
||||
file, err := queryfileupload.FileInsert(ctx, txn, modelfileupload.File{
|
||||
ContentType: upload.ContentType,
|
||||
Created: time.Now(),
|
||||
CreatorID: int32(u.ID),
|
||||
Deleted: nil,
|
||||
Error: "",
|
||||
Name: upload.Name,
|
||||
OrganizationID: u.Organization.ID,
|
||||
Status: modelfileupload.Filestatustype_Uploaded,
|
||||
SizeBytes: int32(upload.SizeBytes),
|
||||
FileUUID: upload.UUID,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Failed to create file upload: %w", err)
|
||||
}
|
||||
_, err = models.FileuploadCSVS.Insert(&models.FileuploadCSVSetter{
|
||||
Committed: omitnull.FromPtr[time.Time](nil),
|
||||
FileID: omit.From(file.ID),
|
||||
Rowcount: omit.From(int32(0)),
|
||||
Type: omit.From(t),
|
||||
}).One(ctx, txn)
|
||||
_, err = queryfileupload.CSVInsert(ctx, txn, modelfileupload.Csv{
|
||||
Committed: nil,
|
||||
FileID: file.ID,
|
||||
Rowcount: 0,
|
||||
Type: t,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Failed to create csv: %w", err)
|
||||
}
|
||||
|
|
@ -124,19 +124,13 @@ func NewUpload(ctx context.Context, u User, upload file.Upload, t enums.Fileuplo
|
|||
return &file.ID, nil
|
||||
}
|
||||
func UploadCommit(ctx context.Context, org Organization, file_id int32, committer User) error {
|
||||
txn, err := db.PGInstance.BobDB.BeginTx(ctx, nil)
|
||||
txn, err := db.BeginTxn(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to begin transaction: %w", err)
|
||||
}
|
||||
defer lint.LogOnErrRollback(txn.Rollback, ctx, "rollback")
|
||||
|
||||
_, err = psql.Update(
|
||||
um.Table(models.FileuploadFiles.Alias()),
|
||||
um.SetCol("status").ToArg("committing"),
|
||||
um.SetCol("committer").ToArg(committer.ID),
|
||||
um.Where(psql.Quote("id").EQ(psql.Arg(file_id))),
|
||||
um.Where(psql.Quote("organization_id").EQ(psql.Arg(org.ID))),
|
||||
).Exec(ctx, txn)
|
||||
err = queryfileupload.FileUpdateCommitting(ctx, txn, int64(org.ID), int64(file_id), int64(committer.ID))
|
||||
if err != nil {
|
||||
return fmt.Errorf("update upload: %w", err)
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue