diff --git a/lob/lob.go b/lob/lob.go index 3ef9c862..b95250e1 100644 --- a/lob/lob.go +++ b/lob/lob.go @@ -110,12 +110,36 @@ type RequestLetterCreate struct { UseType string } +/* + { + "error": { + "message": "address_zip is required", + "status_code": 422, + "code": "invalid" + } + } +*/ +type Error struct { + Code string `json:"code"` + Message string `json:"message"` + StatusCode int `json:"status_code"` +} +type ResponseError struct { + InnerError Error `json:"error"` +} + +func (re ResponseError) Error() string { + return fmt.Sprintf("%d %s %s", re.InnerError.StatusCode, re.InnerError.Code, re.InnerError.Message) +} + func (l *Lob) AddressCreate(ctx context.Context, req RequestAddressCreate) (Address, error) { var result Address + var error_response ResponseError resp, err := l.client.R(). SetBody(req). SetContext(ctx). SetContentType("application/json"). + SetError(&error_response). SetResult(&result). SetPathParam("urlBase", l.urlBaseApi). Post("https://{urlBase}/v1/addresses") @@ -123,20 +147,18 @@ func (l *Lob) AddressCreate(ctx context.Context, req RequestAddressCreate) (Addr return result, fmt.Errorf("address list post: %w", err) } if !resp.IsSuccess() { - content, err := io.ReadAll(resp.Body) - if err != nil { - return result, fmt.Errorf("not successful, and can't read response body") - } - return result, fmt.Errorf("not successful: %s", string(content)) + return result, fmt.Errorf("not successful: %v", error_response) } return result, nil } func (l *Lob) AddressList(ctx context.Context) ([]Address, error) { var result ResponseAddressList + var error_response ResponseError resp, err := l.client.R(). //SetQueryParamsFromValues(query). SetContext(ctx). + SetError(&error_response). SetResult(&result). SetPathParam("urlBase", l.urlBaseApi). Get("https://{urlBase}/v1/addresses") @@ -150,12 +172,15 @@ func (l *Lob) AddressList(ctx context.Context) ([]Address, error) { } func (l *Lob) LetterCreate(ctx context.Context, req RequestLetterCreate) (Letter, error) { + var error_response ResponseError var result Letter color_str := "false" if req.Color { color_str = "true" } resp, err := l.client.R(). + SetContext(ctx). + SetError(&error_response). SetMultipartField( "file", "content.pdf", @@ -168,7 +193,6 @@ func (l *Lob) LetterCreate(ctx context.Context, req RequestLetterCreate) (Letter "to": req.To, "use_type": req.UseType, }). - SetContext(ctx). SetResult(&result). SetPathParam("urlBase", l.urlBaseApi). Post("https://{urlBase}/v1/letters") @@ -181,11 +205,13 @@ func (l *Lob) LetterCreate(ctx context.Context, req RequestLetterCreate) (Letter return result, nil } func (l *Lob) LetterList(ctx context.Context) ([]Letter, error) { + var error_response ResponseError var result ResponseLetterList resp, err := l.client.R(). //SetQueryParamsFromValues(query). SetContext(ctx). + SetError(&error_response). SetResult(&result). SetPathParam("urlBase", l.urlBaseApi). Get("https://{urlBase}/v1/letters") diff --git a/platform/audio.go b/platform/audio.go index 2b881344..1cfda188 100644 --- a/platform/audio.go +++ b/platform/audio.go @@ -4,7 +4,6 @@ import ( "context" "fmt" - "github.com/Gleipnir-Technology/bob" "github.com/Gleipnir-Technology/nidus-sync/db" "github.com/Gleipnir-Technology/nidus-sync/db/models" //"github.com/Gleipnir-Technology/nidus-sync/platform/background" @@ -13,7 +12,7 @@ import ( //"github.com/rs/zerolog/log" ) -func processAudioFile(ctx context.Context, txn bob.Executor, audio_id int32) error { +func processAudioFile(ctx context.Context, audio_id int32) error { a, err := models.NoteAudios.Query( models.SelectWhere.NoteAudios.ID.EQ(audio_id), ).One(ctx, db.PGInstance.BobDB) diff --git a/platform/csv/csv.go b/platform/csv/csv.go index aafa3a75..5e846467 100644 --- a/platform/csv/csv.go +++ b/platform/csv/csv.go @@ -33,20 +33,21 @@ import ( type csvParserFunc[T any] = func(context.Context, bob.Tx, *models.FileuploadFile, *models.FileuploadCSV) ([]T, error) type csvProcessorFunc[T any] = func(context.Context, bob.Tx, *models.FileuploadFile, *models.FileuploadCSV, []T) error -func JobCommit(ctx context.Context, txn bob.Executor, file_id int32) error { +func JobCommit(ctx context.Context, file_id int32) error { log.Debug().Int32("file_id", file_id).Msg("begin job commit") - file, err := models.FindFileuploadFile(ctx, txn, file_id) + bxn := db.PGInstance.BobDB + file, err := models.FindFileuploadFile(ctx, bxn, file_id) if err != nil { return fmt.Errorf("Failed to get csv file %d from DB: %w", file_id, err) } - org, err := models.FindOrganization(ctx, txn, file.OrganizationID) + org, err := models.FindOrganization(ctx, bxn, file.OrganizationID) if err != nil { return fmt.Errorf("Failed to get org %d from DB: %w", file.OrganizationID, err) } rows, err := models.FileuploadPools.Query( models.SelectWhere.FileuploadPools.CSVFile.EQ(file_id), - ).All(ctx, txn) + ).All(ctx, bxn) if err != nil { return fmt.Errorf("Failed to get all rows of file %d: %w", file_id, err) } @@ -60,6 +61,11 @@ func JobCommit(ctx context.Context, txn bob.Executor, file_id int32) error { if err != nil { return fmt.Errorf("get address list: %w", err) } + txn, err := bxn.BeginTx(ctx, nil) + if err != nil { + return fmt.Errorf("begin transaction: %w", err) + } + defer txn.Rollback(ctx) for _, row := range rows { var a *types.Address var parcel *models.Parcel @@ -173,18 +179,20 @@ func JobCommit(ctx context.Context, txn bob.Executor, file_id int32) error { return fmt.Errorf("update file status to committed: %w", err) } event.Updated(event.TypeFileCSV, file.OrganizationID, strconv.Itoa(int(file.ID))) + defer txn.Commit(ctx) return nil } -func JobImport(ctx context.Context, txn bob.Executor, file_id int32) error { +func JobImport(ctx context.Context, file_id int32) error { + bxn := db.PGInstance.BobDB file, err := models.FileuploadFiles.Query( models.SelectWhere.FileuploadFiles.ID.EQ(file_id), - ).One(ctx, txn) + ).One(ctx, bxn) if err != nil { return fmt.Errorf("find file: %w", err) } csv, err := models.FileuploadCSVS.Query( models.SelectWhere.FileuploadCSVS.FileID.EQ(file_id), - ).One(ctx, txn) + ).One(ctx, bxn) if err != nil { return fmt.Errorf("find csv: %w", err) } @@ -202,7 +210,7 @@ func JobImport(ctx context.Context, txn bob.Executor, file_id int32) error { um.SetCol("status").ToArg("error"), um.SetCol("error").ToArg(err.Error()), um.Where(psql.Quote("id").EQ(psql.Arg(file_id))), - ).Exec(ctx, db.PGInstance.BobDB) + ).Exec(ctx, bxn) if err != nil { log.Error().Err(err).Msg("Failed to set upload to error status") } diff --git a/platform/email/email.go b/platform/email/email.go index b3531f14..4f9b5e1d 100644 --- a/platform/email/email.go +++ b/platform/email/email.go @@ -9,7 +9,6 @@ import ( "strings" "time" - "github.com/Gleipnir-Technology/bob" "github.com/Gleipnir-Technology/nidus-sync/comms/email" "github.com/Gleipnir-Technology/nidus-sync/config" "github.com/Gleipnir-Technology/nidus-sync/db" @@ -116,8 +115,9 @@ func sendEmailBegin(ctx context.Context, source string, destination string, temp } return background.NewEmailSend(ctx, db.PGInstance.BobDB, *e) } -func sendEmailComplete(ctx context.Context, txn bob.Executor, email_id int32) error { - email_log, err := models.FindCommsEmailLog(ctx, txn, email_id) +func sendEmailComplete(ctx context.Context, email_id int32) error { + bxn := db.PGInstance.BobDB + email_log, err := models.FindCommsEmailLog(ctx, bxn, email_id) if err != nil { return fmt.Errorf("find email: %w", err) } diff --git a/platform/email/job.go b/platform/email/job.go index bfb93151..62cbe650 100644 --- a/platform/email/job.go +++ b/platform/email/job.go @@ -2,11 +2,9 @@ package email import ( "context" - - "github.com/Gleipnir-Technology/bob" //"github.com/rs/zerolog/log" ) -func Job(ctx context.Context, txn bob.Executor, email_id int32) error { - return sendEmailComplete(ctx, txn, email_id) +func Job(ctx context.Context, email_id int32) error { + return sendEmailComplete(ctx, email_id) } diff --git a/platform/label_studio.go b/platform/label_studio.go index 190559dd..a5af5dcf 100644 --- a/platform/label_studio.go +++ b/platform/label_studio.go @@ -8,7 +8,6 @@ import ( "log" "os" - "github.com/Gleipnir-Technology/bob" "github.com/Gleipnir-Technology/nidus-sync/config" "github.com/Gleipnir-Technology/nidus-sync/db/models" "github.com/Gleipnir-Technology/nidus-sync/label-studio" @@ -74,7 +73,7 @@ func createLabelStudioClient() (*labelstudio.Client, error) { func noteAudioGetLatest(ctx context.Context, uuid string) (*models.NoteAudio, error) { return nil, nil } -func jobLabelStudioAudioCreate(ctx context.Context, txn bob.Executor, row_id int32) error { +func jobLabelStudioAudioCreate(ctx context.Context, row_id int32) error { return fmt.Errorf("label studio integration has been disabled") /* customer := os.Getenv("CUSTOMER") diff --git a/platform/mailer/mailer.go b/platform/mailer/mailer.go index ed7be499..521d0160 100644 --- a/platform/mailer/mailer.go +++ b/platform/mailer/mailer.go @@ -6,8 +6,8 @@ import ( "fmt" "time" - "github.com/Gleipnir-Technology/bob" "github.com/Gleipnir-Technology/nidus-sync/config" + "github.com/Gleipnir-Technology/nidus-sync/db" "github.com/Gleipnir-Technology/nidus-sync/db/models" "github.com/Gleipnir-Technology/nidus-sync/lob" "github.com/Gleipnir-Technology/nidus-sync/platform/file" @@ -17,8 +17,9 @@ import ( "github.com/rs/zerolog/log" ) -func ComplianceSend(ctx context.Context, txn bob.Executor, row_id int32) error { - compliance_req, err := models.FindComplianceReportRequest(ctx, txn, row_id) +func ComplianceSend(ctx context.Context, row_id int32) error { + bxn := db.PGInstance.BobDB + compliance_req, err := models.FindComplianceReportRequest(ctx, bxn, row_id) if err != nil { return fmt.Errorf("find compliance report: %w", err) } @@ -28,7 +29,7 @@ func ComplianceSend(ctx context.Context, txn bob.Executor, row_id int32) error { return fmt.Errorf("no lead for compliance req %d", compliance_req.ID) } lead_id := compliance_req.LeadID.MustGet() - lead, err := models.FindLead(ctx, txn, lead_id) + lead, err := models.FindLead(ctx, bxn, lead_id) if err != nil { return fmt.Errorf("find lead: %w", err) } @@ -37,17 +38,17 @@ func ComplianceSend(ctx context.Context, txn bob.Executor, row_id int32) error { return fmt.Errorf("no site for lead %d", lead.ID) } site_id := lead.SiteID.MustGet() - site, err := models.FindSite(ctx, txn, site_id) + site, err := models.FindSite(ctx, bxn, site_id) if err != nil { return fmt.Errorf("find site: %w", err) } - address, err := models.FindAddress(ctx, txn, site.AddressID) + address, err := models.FindAddress(ctx, bxn, site.AddressID) if err != nil { return fmt.Errorf("find address: %w", err) } - organization, err := models.FindOrganization(ctx, txn, site.OrganizationID) + organization, err := models.FindOrganization(ctx, bxn, site.OrganizationID) if err != nil { return fmt.Errorf("find address: %w", err) } @@ -79,6 +80,11 @@ func ComplianceSend(ctx context.Context, txn bob.Executor, row_id int32) error { if err != nil { return fmt.Errorf("generate uuid: %w", err) } + txn, err := db.PGInstance.BobDB.BeginTx(ctx, nil) + if err != nil { + return fmt.Errorf("start txn: %w", err) + } + defer txn.Rollback(nil) mailer, err := models.CommsMailers.Insert(&models.CommsMailerSetter{ AddressID: omit.From(address.ID), Created: omit.From(time.Now()), @@ -100,6 +106,7 @@ func ComplianceSend(ctx context.Context, txn bob.Executor, row_id int32) error { return fmt.Errorf("create crrm: %w", err) } log.Info().Int32("id", crrm.ID).Msg("Created compliance report request mailer") + txn.Commit(ctx) return nil } diff --git a/platform/start.go b/platform/start.go index d524d7f5..3ca790a1 100644 --- a/platform/start.go +++ b/platform/start.go @@ -7,7 +7,6 @@ import ( "sync" "time" - "github.com/Gleipnir-Technology/bob" //"github.com/Gleipnir-Technology/bob/dialect/psql" //"github.com/Gleipnir-Technology/bob/dialect/psql/sm" "github.com/Gleipnir-Technology/nidus-sync/config" @@ -21,7 +20,6 @@ import ( "github.com/Gleipnir-Technology/nidus-sync/platform/geocode" "github.com/Gleipnir-Technology/nidus-sync/platform/mailer" "github.com/Gleipnir-Technology/nidus-sync/platform/text" - "github.com/jackc/pgx/v5" //"github.com/Gleipnir-Technology/nidus-sync/userfile" //"github.com/google/uuid" "github.com/rs/zerolog/log" @@ -87,55 +85,43 @@ func addWaitingJobs(ctx context.Context) error { for _, job := range jobs { sublog := log.With().Int32("job", job.ID).Int32("row_id", job.RowID).Str("type", string(job.Type)).Logger() sublog.Info().Msg("begin restarted background job") - txn, err := db.PGInstance.BobDB.Begin(ctx) - if err != nil { - sublog.Error().Err(err).Msg("failed begin txn") - continue - } - defer txn.Rollback(ctx) - app_name := fmt.Sprintf("restarted job %d", job.ID) - txn.Exec(fmt.Sprintf("SET application_name = '%s'", app_name)) - err = handleJob(ctx, txn, job) + err = handleJob(ctx, job) if err != nil { sublog.Error().Err(err).Msg("failed handle job") continue } - err = job.Delete(ctx, txn) + err = job.Delete(ctx, db.PGInstance.BobDB) if err != nil { sublog.Error().Err(err).Msg("failed delete job") continue } sublog.Info().Msg("job complete") - txn.Commit(ctx) } }() return nil } -func handleJob(ctx context.Context, txn bob.Executor, job *models.Job) error { +func handleJob(ctx context.Context, job *models.Job) error { switch job.Type { case enums.JobtypeAudioTranscode: - return processAudioFile(ctx, txn, job.RowID) + return processAudioFile(ctx, job.RowID) case enums.JobtypeComplianceMailerSend: - return mailer.ComplianceSend(ctx, txn, job.RowID) + return mailer.ComplianceSend(ctx, job.RowID) case enums.JobtypeCSVCommit: - return csv.JobCommit(ctx, txn, job.RowID) + return csv.JobCommit(ctx, job.RowID) case enums.JobtypeCSVImport: - return csv.JobImport(ctx, txn, job.RowID) + return csv.JobImport(ctx, job.RowID) case enums.JobtypeLabelStudioAudioCreate: - return handleJobLabelStudioAudioCreate(ctx, txn, job.RowID) + return jobLabelStudioAudioCreate(ctx, job.RowID) case enums.JobtypeEmailSend: - return email.Job(ctx, txn, job.RowID) + return email.Job(ctx, job.RowID) case enums.JobtypeTextRespond: - return text.JobRespond(ctx, txn, job.RowID) + return text.JobRespond(ctx, job.RowID) case enums.JobtypeTextSend: - return text.JobSend(ctx, txn, job.RowID) + return text.JobSend(ctx, job.RowID) default: return fmt.Errorf("No handler for job type %s", string(job.Type)) } } -func handleJobLabelStudioAudioCreate(ctx context.Context, txn bob.Executor, row_id int32) error { - return jobLabelStudioAudioCreate(ctx, txn, row_id) -} func listenForJobs(ctx context.Context) { for { //es.SendQueuedEmails(ctx) // send any emails queued prior to listening for notificiations @@ -189,26 +175,16 @@ func listenAndDoOneJob(ctx context.Context) error { } sublog := log.With().Int32("job", job.ID).Int32("row_id", job.RowID).Str("type", string(job.Type)).Logger() - tx, err := conn.BeginTx(ctx, pgx.TxOptions{}) - if err != nil { - return fmt.Errorf("Failed to start transaction: %w", err) - } - defer tx.Rollback(ctx) - ctx, cancel := context.WithCancel(ctx) - txn := bobpgx.NewTx(tx, cancel) - defer txn.Rollback(ctx) - - err = handleJob(ctx, txn, job) + err = handleJob(ctx, job) if err != nil { sublog.Error().Err(err).Msg("failed to handle job") return nil } - err = job.Delete(ctx, txn) + err = job.Delete(ctx, db.PGInstance.BobDB) if err != nil { sublog.Error().Err(err).Msg("failed to delete job") return fmt.Errorf("delete job: %w", err) } - txn.Commit(ctx) sublog.Debug().Msg("job complete") } } diff --git a/platform/text/job.go b/platform/text/job.go index 347028aa..476932c5 100644 --- a/platform/text/job.go +++ b/platform/text/job.go @@ -4,33 +4,35 @@ import ( "context" "fmt" - "github.com/Gleipnir-Technology/bob" + "github.com/Gleipnir-Technology/nidus-sync/db" "github.com/Gleipnir-Technology/nidus-sync/db/models" "github.com/Gleipnir-Technology/nidus-sync/platform/types" //"github.com/rs/zerolog/log" ) -func JobRespond(ctx context.Context, txn bob.Executor, log_id int32) error { - return respondText(ctx, txn, log_id) +func JobRespond(ctx context.Context, log_id int32) error { + return respondText(ctx, log_id) } -func JobSend(ctx context.Context, txn bob.Executor, job_id int32) error { - job, err := models.FindCommsTextJob(ctx, txn, job_id) +func JobSend(ctx context.Context, job_id int32) error { + bxn := db.PGInstance.BobDB + job, err := models.FindCommsTextJob(ctx, bxn, job_id) if err != nil { return fmt.Errorf("find text: %w", err) } //log.Debug().Int32("job.id", job.ID).Msg("completing text job") - return sendTextComplete(ctx, txn, job) + return sendTextComplete(ctx, job) } -func handleWaitingTextJobs(ctx context.Context, txn bob.Executor, dst types.E164) error { +func handleWaitingTextJobs(ctx context.Context, dst types.E164) error { + bxn := db.PGInstance.BobDB jobs, err := models.CommsTextJobs.Query( models.SelectWhere.CommsTextJobs.Destination.EQ(dst.PhoneString()), models.SelectWhere.CommsTextJobs.Completed.IsNull(), - ).All(ctx, txn) + ).All(ctx, bxn) if err != nil { return fmt.Errorf("query jobs: %w", err) } for _, job := range jobs { - err = sendTextComplete(ctx, txn, job) + err = sendTextComplete(ctx, job) if err != nil { return fmt.Errorf("send text complete: %w", err) } diff --git a/platform/text/send.go b/platform/text/send.go index 76dd476a..48897a73 100644 --- a/platform/text/send.go +++ b/platform/text/send.go @@ -8,14 +8,14 @@ import ( "github.com/Gleipnir-Technology/bob" "github.com/Gleipnir-Technology/nidus-sync/comms/text" "github.com/Gleipnir-Technology/nidus-sync/config" - "github.com/aarondl/opt/omit" - "github.com/aarondl/opt/omitnull" - //"github.com/Gleipnir-Technology/nidus-sync/db" + "github.com/Gleipnir-Technology/nidus-sync/db" "github.com/Gleipnir-Technology/nidus-sync/db/enums" "github.com/Gleipnir-Technology/nidus-sync/db/models" "github.com/Gleipnir-Technology/nidus-sync/platform/background" "github.com/Gleipnir-Technology/nidus-sync/platform/event" "github.com/Gleipnir-Technology/nidus-sync/platform/types" + "github.com/aarondl/opt/omit" + "github.com/aarondl/opt/omitnull" "github.com/rs/zerolog/log" ) @@ -100,7 +100,12 @@ func sendTextCommandResponse(ctx context.Context, txn bob.Executor, dst types.E1 _, err := sendTextDirect(ctx, txn, enums.CommsTextoriginCommandResponse, dst.PhoneString(), content, false, false) return err } -func sendTextComplete(ctx context.Context, txn bob.Executor, job *models.CommsTextJob) error { +func sendTextComplete(ctx context.Context, job *models.CommsTextJob) error { + txn, err := db.PGInstance.BobDB.BeginTx(ctx, nil) + if err != nil { + return fmt.Errorf("begin tx: %w", err) + } + defer txn.Rollback(ctx) dst, err := ParsePhoneNumber(job.Destination) if err != nil { return fmt.Errorf("parse phone: %w", err) @@ -171,6 +176,7 @@ func sendTextComplete(ctx context.Context, txn bob.Executor, job *models.CommsTe } else { log.Debug().Msg("no report info on text") } + txn.Commit(ctx) return nil } diff --git a/platform/text/text.go b/platform/text/text.go index dcecdd0d..97fb034b 100644 --- a/platform/text/text.go +++ b/platform/text/text.go @@ -6,7 +6,6 @@ import ( "strings" "time" - "github.com/Gleipnir-Technology/bob" "github.com/Gleipnir-Technology/nidus-sync/config" "github.com/Gleipnir-Technology/nidus-sync/db" "github.com/Gleipnir-Technology/nidus-sync/db/enums" @@ -67,7 +66,12 @@ func HandleTextMessage(ctx context.Context, source string, destination string, c return err } -func respondText(ctx context.Context, txn bob.Executor, log_id int32) error { +func respondText(ctx context.Context, log_id int32) error { + txn, err := db.PGInstance.BobDB.BeginTx(ctx, nil) + if err != nil { + return fmt.Errorf("begin tx: %w", err) + } + defer txn.Rollback(ctx) l, err := models.FindCommsTextLog(ctx, txn, log_id) if err != nil { return fmt.Errorf("find comms: %w", err) @@ -96,7 +100,7 @@ func respondText(ctx context.Context, txn bob.Executor, log_id int32) error { if err != nil { return fmt.Errorf("send response: %w", err) } - handleWaitingTextJobs(ctx, txn, *src) + handleWaitingTextJobs(ctx, *src) // We don't handle 'stop' here because we allow them to say 'stop' at any time, regardless of // phone status. //case "stop": @@ -150,10 +154,10 @@ func respondText(ctx context.Context, txn bob.Executor, log_id int32) error { return nil } // Otherwise let the LLM handle the response - return respondTextLLM(ctx, txn, *src) + return respondTextLLM(ctx, *src) } -func respondTextLLM(ctx context.Context, txn bob.Executor, src types.E164) error { +func respondTextLLM(ctx context.Context, src types.E164) error { previous_messages, err := loadPreviousMessagesForLLM(ctx, src) if err != nil { return fmt.Errorf("Failed to get previous messages: %w", err) @@ -163,10 +167,16 @@ func respondTextLLM(ctx context.Context, txn bob.Executor, src types.E164) error if err != nil { return fmt.Errorf("Failed to generate next message: %w", err) } + txn, err := db.PGInstance.BobDB.BeginTx(ctx, nil) + if err != nil { + return fmt.Errorf("start txn: %w", err) + } + defer txn.Rollback(ctx) _, err = sendTextDirect(ctx, txn, enums.CommsTextoriginLLM, src.PhoneString(), next_message.Content, true, false) if err != nil { return fmt.Errorf("Failed to send response text: %w", err) } + txn.Commit(ctx) return nil }