Rework background jobs to make transactions much shorter

I ended up with minutes-long open transactions in the database in prod
which was causing outtages. This is because I thought transactions were
basically free, which is a terrible thing to think. Instead we'll just
open them when we need them.
This commit is contained in:
Eli Ribble 2026-04-17 22:53:23 +00:00
parent 55cb4ca962
commit ade629ecf5
No known key found for this signature in database
11 changed files with 118 additions and 87 deletions

View file

@ -110,12 +110,36 @@ type RequestLetterCreate struct {
UseType string
}
/*
{
"error": {
"message": "address_zip is required",
"status_code": 422,
"code": "invalid"
}
}
*/
type Error struct {
Code string `json:"code"`
Message string `json:"message"`
StatusCode int `json:"status_code"`
}
type ResponseError struct {
InnerError Error `json:"error"`
}
func (re ResponseError) Error() string {
return fmt.Sprintf("%d %s %s", re.InnerError.StatusCode, re.InnerError.Code, re.InnerError.Message)
}
func (l *Lob) AddressCreate(ctx context.Context, req RequestAddressCreate) (Address, error) {
var result Address
var error_response ResponseError
resp, err := l.client.R().
SetBody(req).
SetContext(ctx).
SetContentType("application/json").
SetError(&error_response).
SetResult(&result).
SetPathParam("urlBase", l.urlBaseApi).
Post("https://{urlBase}/v1/addresses")
@ -123,20 +147,18 @@ func (l *Lob) AddressCreate(ctx context.Context, req RequestAddressCreate) (Addr
return result, fmt.Errorf("address list post: %w", err)
}
if !resp.IsSuccess() {
content, err := io.ReadAll(resp.Body)
if err != nil {
return result, fmt.Errorf("not successful, and can't read response body")
}
return result, fmt.Errorf("not successful: %s", string(content))
return result, fmt.Errorf("not successful: %v", error_response)
}
return result, nil
}
func (l *Lob) AddressList(ctx context.Context) ([]Address, error) {
var result ResponseAddressList
var error_response ResponseError
resp, err := l.client.R().
//SetQueryParamsFromValues(query).
SetContext(ctx).
SetError(&error_response).
SetResult(&result).
SetPathParam("urlBase", l.urlBaseApi).
Get("https://{urlBase}/v1/addresses")
@ -150,12 +172,15 @@ func (l *Lob) AddressList(ctx context.Context) ([]Address, error) {
}
func (l *Lob) LetterCreate(ctx context.Context, req RequestLetterCreate) (Letter, error) {
var error_response ResponseError
var result Letter
color_str := "false"
if req.Color {
color_str = "true"
}
resp, err := l.client.R().
SetContext(ctx).
SetError(&error_response).
SetMultipartField(
"file",
"content.pdf",
@ -168,7 +193,6 @@ func (l *Lob) LetterCreate(ctx context.Context, req RequestLetterCreate) (Letter
"to": req.To,
"use_type": req.UseType,
}).
SetContext(ctx).
SetResult(&result).
SetPathParam("urlBase", l.urlBaseApi).
Post("https://{urlBase}/v1/letters")
@ -181,11 +205,13 @@ func (l *Lob) LetterCreate(ctx context.Context, req RequestLetterCreate) (Letter
return result, nil
}
func (l *Lob) LetterList(ctx context.Context) ([]Letter, error) {
var error_response ResponseError
var result ResponseLetterList
resp, err := l.client.R().
//SetQueryParamsFromValues(query).
SetContext(ctx).
SetError(&error_response).
SetResult(&result).
SetPathParam("urlBase", l.urlBaseApi).
Get("https://{urlBase}/v1/letters")

View file

@ -4,7 +4,6 @@ import (
"context"
"fmt"
"github.com/Gleipnir-Technology/bob"
"github.com/Gleipnir-Technology/nidus-sync/db"
"github.com/Gleipnir-Technology/nidus-sync/db/models"
//"github.com/Gleipnir-Technology/nidus-sync/platform/background"
@ -13,7 +12,7 @@ import (
//"github.com/rs/zerolog/log"
)
func processAudioFile(ctx context.Context, txn bob.Executor, audio_id int32) error {
func processAudioFile(ctx context.Context, audio_id int32) error {
a, err := models.NoteAudios.Query(
models.SelectWhere.NoteAudios.ID.EQ(audio_id),
).One(ctx, db.PGInstance.BobDB)

View file

@ -33,20 +33,21 @@ import (
type csvParserFunc[T any] = func(context.Context, bob.Tx, *models.FileuploadFile, *models.FileuploadCSV) ([]T, error)
type csvProcessorFunc[T any] = func(context.Context, bob.Tx, *models.FileuploadFile, *models.FileuploadCSV, []T) error
func JobCommit(ctx context.Context, txn bob.Executor, file_id int32) error {
func JobCommit(ctx context.Context, file_id int32) error {
log.Debug().Int32("file_id", file_id).Msg("begin job commit")
file, err := models.FindFileuploadFile(ctx, txn, file_id)
bxn := db.PGInstance.BobDB
file, err := models.FindFileuploadFile(ctx, bxn, file_id)
if err != nil {
return fmt.Errorf("Failed to get csv file %d from DB: %w", file_id, err)
}
org, err := models.FindOrganization(ctx, txn, file.OrganizationID)
org, err := models.FindOrganization(ctx, bxn, file.OrganizationID)
if err != nil {
return fmt.Errorf("Failed to get org %d from DB: %w", file.OrganizationID, err)
}
rows, err := models.FileuploadPools.Query(
models.SelectWhere.FileuploadPools.CSVFile.EQ(file_id),
).All(ctx, txn)
).All(ctx, bxn)
if err != nil {
return fmt.Errorf("Failed to get all rows of file %d: %w", file_id, err)
}
@ -60,6 +61,11 @@ func JobCommit(ctx context.Context, txn bob.Executor, file_id int32) error {
if err != nil {
return fmt.Errorf("get address list: %w", err)
}
txn, err := bxn.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("begin transaction: %w", err)
}
defer txn.Rollback(ctx)
for _, row := range rows {
var a *types.Address
var parcel *models.Parcel
@ -173,18 +179,20 @@ func JobCommit(ctx context.Context, txn bob.Executor, file_id int32) error {
return fmt.Errorf("update file status to committed: %w", err)
}
event.Updated(event.TypeFileCSV, file.OrganizationID, strconv.Itoa(int(file.ID)))
defer txn.Commit(ctx)
return nil
}
func JobImport(ctx context.Context, txn bob.Executor, file_id int32) error {
func JobImport(ctx context.Context, file_id int32) error {
bxn := db.PGInstance.BobDB
file, err := models.FileuploadFiles.Query(
models.SelectWhere.FileuploadFiles.ID.EQ(file_id),
).One(ctx, txn)
).One(ctx, bxn)
if err != nil {
return fmt.Errorf("find file: %w", err)
}
csv, err := models.FileuploadCSVS.Query(
models.SelectWhere.FileuploadCSVS.FileID.EQ(file_id),
).One(ctx, txn)
).One(ctx, bxn)
if err != nil {
return fmt.Errorf("find csv: %w", err)
}
@ -202,7 +210,7 @@ func JobImport(ctx context.Context, txn bob.Executor, file_id int32) error {
um.SetCol("status").ToArg("error"),
um.SetCol("error").ToArg(err.Error()),
um.Where(psql.Quote("id").EQ(psql.Arg(file_id))),
).Exec(ctx, db.PGInstance.BobDB)
).Exec(ctx, bxn)
if err != nil {
log.Error().Err(err).Msg("Failed to set upload to error status")
}

View file

@ -9,7 +9,6 @@ import (
"strings"
"time"
"github.com/Gleipnir-Technology/bob"
"github.com/Gleipnir-Technology/nidus-sync/comms/email"
"github.com/Gleipnir-Technology/nidus-sync/config"
"github.com/Gleipnir-Technology/nidus-sync/db"
@ -116,8 +115,9 @@ func sendEmailBegin(ctx context.Context, source string, destination string, temp
}
return background.NewEmailSend(ctx, db.PGInstance.BobDB, *e)
}
func sendEmailComplete(ctx context.Context, txn bob.Executor, email_id int32) error {
email_log, err := models.FindCommsEmailLog(ctx, txn, email_id)
func sendEmailComplete(ctx context.Context, email_id int32) error {
bxn := db.PGInstance.BobDB
email_log, err := models.FindCommsEmailLog(ctx, bxn, email_id)
if err != nil {
return fmt.Errorf("find email: %w", err)
}

View file

@ -2,11 +2,9 @@ package email
import (
"context"
"github.com/Gleipnir-Technology/bob"
//"github.com/rs/zerolog/log"
)
func Job(ctx context.Context, txn bob.Executor, email_id int32) error {
return sendEmailComplete(ctx, txn, email_id)
func Job(ctx context.Context, email_id int32) error {
return sendEmailComplete(ctx, email_id)
}

View file

@ -8,7 +8,6 @@ import (
"log"
"os"
"github.com/Gleipnir-Technology/bob"
"github.com/Gleipnir-Technology/nidus-sync/config"
"github.com/Gleipnir-Technology/nidus-sync/db/models"
"github.com/Gleipnir-Technology/nidus-sync/label-studio"
@ -74,7 +73,7 @@ func createLabelStudioClient() (*labelstudio.Client, error) {
func noteAudioGetLatest(ctx context.Context, uuid string) (*models.NoteAudio, error) {
return nil, nil
}
func jobLabelStudioAudioCreate(ctx context.Context, txn bob.Executor, row_id int32) error {
func jobLabelStudioAudioCreate(ctx context.Context, row_id int32) error {
return fmt.Errorf("label studio integration has been disabled")
/*
customer := os.Getenv("CUSTOMER")

View file

@ -6,8 +6,8 @@ import (
"fmt"
"time"
"github.com/Gleipnir-Technology/bob"
"github.com/Gleipnir-Technology/nidus-sync/config"
"github.com/Gleipnir-Technology/nidus-sync/db"
"github.com/Gleipnir-Technology/nidus-sync/db/models"
"github.com/Gleipnir-Technology/nidus-sync/lob"
"github.com/Gleipnir-Technology/nidus-sync/platform/file"
@ -17,8 +17,9 @@ import (
"github.com/rs/zerolog/log"
)
func ComplianceSend(ctx context.Context, txn bob.Executor, row_id int32) error {
compliance_req, err := models.FindComplianceReportRequest(ctx, txn, row_id)
func ComplianceSend(ctx context.Context, row_id int32) error {
bxn := db.PGInstance.BobDB
compliance_req, err := models.FindComplianceReportRequest(ctx, bxn, row_id)
if err != nil {
return fmt.Errorf("find compliance report: %w", err)
}
@ -28,7 +29,7 @@ func ComplianceSend(ctx context.Context, txn bob.Executor, row_id int32) error {
return fmt.Errorf("no lead for compliance req %d", compliance_req.ID)
}
lead_id := compliance_req.LeadID.MustGet()
lead, err := models.FindLead(ctx, txn, lead_id)
lead, err := models.FindLead(ctx, bxn, lead_id)
if err != nil {
return fmt.Errorf("find lead: %w", err)
}
@ -37,17 +38,17 @@ func ComplianceSend(ctx context.Context, txn bob.Executor, row_id int32) error {
return fmt.Errorf("no site for lead %d", lead.ID)
}
site_id := lead.SiteID.MustGet()
site, err := models.FindSite(ctx, txn, site_id)
site, err := models.FindSite(ctx, bxn, site_id)
if err != nil {
return fmt.Errorf("find site: %w", err)
}
address, err := models.FindAddress(ctx, txn, site.AddressID)
address, err := models.FindAddress(ctx, bxn, site.AddressID)
if err != nil {
return fmt.Errorf("find address: %w", err)
}
organization, err := models.FindOrganization(ctx, txn, site.OrganizationID)
organization, err := models.FindOrganization(ctx, bxn, site.OrganizationID)
if err != nil {
return fmt.Errorf("find address: %w", err)
}
@ -79,6 +80,11 @@ func ComplianceSend(ctx context.Context, txn bob.Executor, row_id int32) error {
if err != nil {
return fmt.Errorf("generate uuid: %w", err)
}
txn, err := db.PGInstance.BobDB.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("start txn: %w", err)
}
defer txn.Rollback(nil)
mailer, err := models.CommsMailers.Insert(&models.CommsMailerSetter{
AddressID: omit.From(address.ID),
Created: omit.From(time.Now()),
@ -100,6 +106,7 @@ func ComplianceSend(ctx context.Context, txn bob.Executor, row_id int32) error {
return fmt.Errorf("create crrm: %w", err)
}
log.Info().Int32("id", crrm.ID).Msg("Created compliance report request mailer")
txn.Commit(ctx)
return nil
}

View file

@ -7,7 +7,6 @@ import (
"sync"
"time"
"github.com/Gleipnir-Technology/bob"
//"github.com/Gleipnir-Technology/bob/dialect/psql"
//"github.com/Gleipnir-Technology/bob/dialect/psql/sm"
"github.com/Gleipnir-Technology/nidus-sync/config"
@ -21,7 +20,6 @@ import (
"github.com/Gleipnir-Technology/nidus-sync/platform/geocode"
"github.com/Gleipnir-Technology/nidus-sync/platform/mailer"
"github.com/Gleipnir-Technology/nidus-sync/platform/text"
"github.com/jackc/pgx/v5"
//"github.com/Gleipnir-Technology/nidus-sync/userfile"
//"github.com/google/uuid"
"github.com/rs/zerolog/log"
@ -87,55 +85,43 @@ func addWaitingJobs(ctx context.Context) error {
for _, job := range jobs {
sublog := log.With().Int32("job", job.ID).Int32("row_id", job.RowID).Str("type", string(job.Type)).Logger()
sublog.Info().Msg("begin restarted background job")
txn, err := db.PGInstance.BobDB.Begin(ctx)
if err != nil {
sublog.Error().Err(err).Msg("failed begin txn")
continue
}
defer txn.Rollback(ctx)
app_name := fmt.Sprintf("restarted job %d", job.ID)
txn.Exec(fmt.Sprintf("SET application_name = '%s'", app_name))
err = handleJob(ctx, txn, job)
err = handleJob(ctx, job)
if err != nil {
sublog.Error().Err(err).Msg("failed handle job")
continue
}
err = job.Delete(ctx, txn)
err = job.Delete(ctx, db.PGInstance.BobDB)
if err != nil {
sublog.Error().Err(err).Msg("failed delete job")
continue
}
sublog.Info().Msg("job complete")
txn.Commit(ctx)
}
}()
return nil
}
func handleJob(ctx context.Context, txn bob.Executor, job *models.Job) error {
func handleJob(ctx context.Context, job *models.Job) error {
switch job.Type {
case enums.JobtypeAudioTranscode:
return processAudioFile(ctx, txn, job.RowID)
return processAudioFile(ctx, job.RowID)
case enums.JobtypeComplianceMailerSend:
return mailer.ComplianceSend(ctx, txn, job.RowID)
return mailer.ComplianceSend(ctx, job.RowID)
case enums.JobtypeCSVCommit:
return csv.JobCommit(ctx, txn, job.RowID)
return csv.JobCommit(ctx, job.RowID)
case enums.JobtypeCSVImport:
return csv.JobImport(ctx, txn, job.RowID)
return csv.JobImport(ctx, job.RowID)
case enums.JobtypeLabelStudioAudioCreate:
return handleJobLabelStudioAudioCreate(ctx, txn, job.RowID)
return jobLabelStudioAudioCreate(ctx, job.RowID)
case enums.JobtypeEmailSend:
return email.Job(ctx, txn, job.RowID)
return email.Job(ctx, job.RowID)
case enums.JobtypeTextRespond:
return text.JobRespond(ctx, txn, job.RowID)
return text.JobRespond(ctx, job.RowID)
case enums.JobtypeTextSend:
return text.JobSend(ctx, txn, job.RowID)
return text.JobSend(ctx, job.RowID)
default:
return fmt.Errorf("No handler for job type %s", string(job.Type))
}
}
func handleJobLabelStudioAudioCreate(ctx context.Context, txn bob.Executor, row_id int32) error {
return jobLabelStudioAudioCreate(ctx, txn, row_id)
}
func listenForJobs(ctx context.Context) {
for {
//es.SendQueuedEmails(ctx) // send any emails queued prior to listening for notificiations
@ -189,26 +175,16 @@ func listenAndDoOneJob(ctx context.Context) error {
}
sublog := log.With().Int32("job", job.ID).Int32("row_id", job.RowID).Str("type", string(job.Type)).Logger()
tx, err := conn.BeginTx(ctx, pgx.TxOptions{})
if err != nil {
return fmt.Errorf("Failed to start transaction: %w", err)
}
defer tx.Rollback(ctx)
ctx, cancel := context.WithCancel(ctx)
txn := bobpgx.NewTx(tx, cancel)
defer txn.Rollback(ctx)
err = handleJob(ctx, txn, job)
err = handleJob(ctx, job)
if err != nil {
sublog.Error().Err(err).Msg("failed to handle job")
return nil
}
err = job.Delete(ctx, txn)
err = job.Delete(ctx, db.PGInstance.BobDB)
if err != nil {
sublog.Error().Err(err).Msg("failed to delete job")
return fmt.Errorf("delete job: %w", err)
}
txn.Commit(ctx)
sublog.Debug().Msg("job complete")
}
}

View file

@ -4,33 +4,35 @@ import (
"context"
"fmt"
"github.com/Gleipnir-Technology/bob"
"github.com/Gleipnir-Technology/nidus-sync/db"
"github.com/Gleipnir-Technology/nidus-sync/db/models"
"github.com/Gleipnir-Technology/nidus-sync/platform/types"
//"github.com/rs/zerolog/log"
)
func JobRespond(ctx context.Context, txn bob.Executor, log_id int32) error {
return respondText(ctx, txn, log_id)
func JobRespond(ctx context.Context, log_id int32) error {
return respondText(ctx, log_id)
}
func JobSend(ctx context.Context, txn bob.Executor, job_id int32) error {
job, err := models.FindCommsTextJob(ctx, txn, job_id)
func JobSend(ctx context.Context, job_id int32) error {
bxn := db.PGInstance.BobDB
job, err := models.FindCommsTextJob(ctx, bxn, job_id)
if err != nil {
return fmt.Errorf("find text: %w", err)
}
//log.Debug().Int32("job.id", job.ID).Msg("completing text job")
return sendTextComplete(ctx, txn, job)
return sendTextComplete(ctx, job)
}
func handleWaitingTextJobs(ctx context.Context, txn bob.Executor, dst types.E164) error {
func handleWaitingTextJobs(ctx context.Context, dst types.E164) error {
bxn := db.PGInstance.BobDB
jobs, err := models.CommsTextJobs.Query(
models.SelectWhere.CommsTextJobs.Destination.EQ(dst.PhoneString()),
models.SelectWhere.CommsTextJobs.Completed.IsNull(),
).All(ctx, txn)
).All(ctx, bxn)
if err != nil {
return fmt.Errorf("query jobs: %w", err)
}
for _, job := range jobs {
err = sendTextComplete(ctx, txn, job)
err = sendTextComplete(ctx, job)
if err != nil {
return fmt.Errorf("send text complete: %w", err)
}

View file

@ -8,14 +8,14 @@ import (
"github.com/Gleipnir-Technology/bob"
"github.com/Gleipnir-Technology/nidus-sync/comms/text"
"github.com/Gleipnir-Technology/nidus-sync/config"
"github.com/aarondl/opt/omit"
"github.com/aarondl/opt/omitnull"
//"github.com/Gleipnir-Technology/nidus-sync/db"
"github.com/Gleipnir-Technology/nidus-sync/db"
"github.com/Gleipnir-Technology/nidus-sync/db/enums"
"github.com/Gleipnir-Technology/nidus-sync/db/models"
"github.com/Gleipnir-Technology/nidus-sync/platform/background"
"github.com/Gleipnir-Technology/nidus-sync/platform/event"
"github.com/Gleipnir-Technology/nidus-sync/platform/types"
"github.com/aarondl/opt/omit"
"github.com/aarondl/opt/omitnull"
"github.com/rs/zerolog/log"
)
@ -100,7 +100,12 @@ func sendTextCommandResponse(ctx context.Context, txn bob.Executor, dst types.E1
_, err := sendTextDirect(ctx, txn, enums.CommsTextoriginCommandResponse, dst.PhoneString(), content, false, false)
return err
}
func sendTextComplete(ctx context.Context, txn bob.Executor, job *models.CommsTextJob) error {
func sendTextComplete(ctx context.Context, job *models.CommsTextJob) error {
txn, err := db.PGInstance.BobDB.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("begin tx: %w", err)
}
defer txn.Rollback(ctx)
dst, err := ParsePhoneNumber(job.Destination)
if err != nil {
return fmt.Errorf("parse phone: %w", err)
@ -171,6 +176,7 @@ func sendTextComplete(ctx context.Context, txn bob.Executor, job *models.CommsTe
} else {
log.Debug().Msg("no report info on text")
}
txn.Commit(ctx)
return nil
}

View file

@ -6,7 +6,6 @@ import (
"strings"
"time"
"github.com/Gleipnir-Technology/bob"
"github.com/Gleipnir-Technology/nidus-sync/config"
"github.com/Gleipnir-Technology/nidus-sync/db"
"github.com/Gleipnir-Technology/nidus-sync/db/enums"
@ -67,7 +66,12 @@ func HandleTextMessage(ctx context.Context, source string, destination string, c
return err
}
func respondText(ctx context.Context, txn bob.Executor, log_id int32) error {
func respondText(ctx context.Context, log_id int32) error {
txn, err := db.PGInstance.BobDB.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("begin tx: %w", err)
}
defer txn.Rollback(ctx)
l, err := models.FindCommsTextLog(ctx, txn, log_id)
if err != nil {
return fmt.Errorf("find comms: %w", err)
@ -96,7 +100,7 @@ func respondText(ctx context.Context, txn bob.Executor, log_id int32) error {
if err != nil {
return fmt.Errorf("send response: %w", err)
}
handleWaitingTextJobs(ctx, txn, *src)
handleWaitingTextJobs(ctx, *src)
// We don't handle 'stop' here because we allow them to say 'stop' at any time, regardless of
// phone status.
//case "stop":
@ -150,10 +154,10 @@ func respondText(ctx context.Context, txn bob.Executor, log_id int32) error {
return nil
}
// Otherwise let the LLM handle the response
return respondTextLLM(ctx, txn, *src)
return respondTextLLM(ctx, *src)
}
func respondTextLLM(ctx context.Context, txn bob.Executor, src types.E164) error {
func respondTextLLM(ctx context.Context, src types.E164) error {
previous_messages, err := loadPreviousMessagesForLLM(ctx, src)
if err != nil {
return fmt.Errorf("Failed to get previous messages: %w", err)
@ -163,10 +167,16 @@ func respondTextLLM(ctx context.Context, txn bob.Executor, src types.E164) error
if err != nil {
return fmt.Errorf("Failed to generate next message: %w", err)
}
txn, err := db.PGInstance.BobDB.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("start txn: %w", err)
}
defer txn.Rollback(ctx)
_, err = sendTextDirect(ctx, txn, enums.CommsTextoriginLLM, src.PhoneString(), next_message.Content, true, false)
if err != nil {
return fmt.Errorf("Failed to send response text: %w", err)
}
txn.Commit(ctx)
return nil
}