Add support for continuing background text jobs on subscription

This commit is contained in:
Eli Ribble 2026-01-29 22:20:03 +00:00
parent d2d5f003d8
commit 981f444609
No known key found for this signature in database
9 changed files with 327 additions and 20 deletions

View file

@ -60,6 +60,24 @@ var CommsTextJobs = Table[
Generated: false,
AutoIncr: false,
},
Source: column{
Name: "source",
DBType: "comms.textjobsource",
Default: "",
Comment: "",
Nullable: false,
Generated: false,
AutoIncr: false,
},
Completed: column{
Name: "completed",
DBType: "timestamp without time zone",
Default: "NULL",
Comment: "",
Nullable: true,
Generated: false,
AutoIncr: false,
},
},
Indexes: commsTextJobIndexes{
TextJobPkey: index{
@ -106,11 +124,13 @@ type commsTextJobColumns struct {
Destination column
ID column
Type column
Source column
Completed column
}
func (c commsTextJobColumns) AsSlice() []column {
return []column{
c.Content, c.Created, c.Destination, c.ID, c.Type,
c.Content, c.Created, c.Destination, c.ID, c.Type, c.Source, c.Completed,
}
}

View file

@ -272,6 +272,79 @@ func (e *CommsMessagetypeemail) Scan(value any) error {
return nil
}
// Enum values for CommsTextjobsource
const (
CommsTextjobsourceRmo CommsTextjobsource = "rmo"
CommsTextjobsourceNidus CommsTextjobsource = "nidus"
)
func AllCommsTextjobsource() []CommsTextjobsource {
return []CommsTextjobsource{
CommsTextjobsourceRmo,
CommsTextjobsourceNidus,
}
}
type CommsTextjobsource string
func (e CommsTextjobsource) String() string {
return string(e)
}
func (e CommsTextjobsource) Valid() bool {
switch e {
case CommsTextjobsourceRmo,
CommsTextjobsourceNidus:
return true
default:
return false
}
}
// useful when testing in other packages
func (e CommsTextjobsource) All() []CommsTextjobsource {
return AllCommsTextjobsource()
}
func (e CommsTextjobsource) MarshalText() ([]byte, error) {
return []byte(e), nil
}
func (e *CommsTextjobsource) UnmarshalText(text []byte) error {
return e.Scan(text)
}
func (e CommsTextjobsource) MarshalBinary() ([]byte, error) {
return []byte(e), nil
}
func (e *CommsTextjobsource) UnmarshalBinary(data []byte) error {
return e.Scan(data)
}
func (e CommsTextjobsource) Value() (driver.Value, error) {
return string(e), nil
}
func (e *CommsTextjobsource) Scan(value any) error {
switch x := value.(type) {
case string:
*e = CommsTextjobsource(x)
case []byte:
*e = CommsTextjobsource(x)
case nil:
return fmt.Errorf("cannot nil into CommsTextjobsource")
default:
return fmt.Errorf("cannot scan type %T: %v", value, value)
}
if !e.Valid() {
return fmt.Errorf("invalid CommsTextjobsource value: %s", *e)
}
return nil
}
// Enum values for CommsTextjobtype
const (
CommsTextjobtypeReportConfirmation CommsTextjobtype = "report-confirmation"

View file

@ -333,6 +333,8 @@ func (f *Factory) FromExistingCommsTextJob(m *models.CommsTextJob) *CommsTextJob
o.Destination = func() string { return m.Destination }
o.ID = func() int32 { return m.ID }
o.Type = func() enums.CommsTextjobtype { return m.Type }
o.Source = func() enums.CommsTextjobsource { return m.Source }
o.Completed = func() null.Val[time.Time] { return m.Completed }
ctx := context.Background()
if m.R.DestinationPhone != nil {

View file

@ -101,6 +101,16 @@ func random_enums_CommsMessagetypeemail(f *faker.Faker, limits ...string) enums.
return all[f.IntBetween(0, len(all)-1)]
}
func random_enums_CommsTextjobsource(f *faker.Faker, limits ...string) enums.CommsTextjobsource {
if f == nil {
f = &defaultFaker
}
var e enums.CommsTextjobsource
all := e.All()
return all[f.IntBetween(0, len(all)-1)]
}
func random_enums_CommsTextjobtype(f *faker.Faker, limits ...string) enums.CommsTextjobtype {
if f == nil {
f = &defaultFaker

View file

@ -11,7 +11,9 @@ import (
"github.com/Gleipnir-Technology/bob"
enums "github.com/Gleipnir-Technology/nidus-sync/db/enums"
models "github.com/Gleipnir-Technology/nidus-sync/db/models"
"github.com/aarondl/opt/null"
"github.com/aarondl/opt/omit"
"github.com/aarondl/opt/omitnull"
"github.com/jaswdr/faker/v2"
)
@ -41,6 +43,8 @@ type CommsTextJobTemplate struct {
Destination func() string
ID func() int32
Type func() enums.CommsTextjobtype
Source func() enums.CommsTextjobsource
Completed func() null.Val[time.Time]
r commsTextJobR
f *Factory
@ -99,6 +103,14 @@ func (o CommsTextJobTemplate) BuildSetter() *models.CommsTextJobSetter {
val := o.Type()
m.Type = omit.From(val)
}
if o.Source != nil {
val := o.Source()
m.Source = omit.From(val)
}
if o.Completed != nil {
val := o.Completed()
m.Completed = omitnull.FromNull(val)
}
return m
}
@ -136,6 +148,12 @@ func (o CommsTextJobTemplate) Build() *models.CommsTextJob {
if o.Type != nil {
m.Type = o.Type()
}
if o.Source != nil {
m.Source = o.Source()
}
if o.Completed != nil {
m.Completed = o.Completed()
}
o.setModelRels(m)
@ -172,6 +190,10 @@ func ensureCreatableCommsTextJob(m *models.CommsTextJobSetter) {
val := random_enums_CommsTextjobtype(nil)
m.Type = omit.From(val)
}
if !(m.Source.IsValue()) {
val := random_enums_CommsTextjobsource(nil)
m.Source = omit.From(val)
}
}
// insertOptRels creates and inserts any optional the relationships on *models.CommsTextJob
@ -296,6 +318,8 @@ func (m commsTextJobMods) RandomizeAllColumns(f *faker.Faker) CommsTextJobMod {
CommsTextJobMods.RandomDestination(f),
CommsTextJobMods.RandomID(f),
CommsTextJobMods.RandomType(f),
CommsTextJobMods.RandomSource(f),
CommsTextJobMods.RandomCompleted(f),
}
}
@ -454,6 +478,90 @@ func (m commsTextJobMods) RandomType(f *faker.Faker) CommsTextJobMod {
})
}
// Set the model columns to this value
func (m commsTextJobMods) Source(val enums.CommsTextjobsource) CommsTextJobMod {
return CommsTextJobModFunc(func(_ context.Context, o *CommsTextJobTemplate) {
o.Source = func() enums.CommsTextjobsource { return val }
})
}
// Set the Column from the function
func (m commsTextJobMods) SourceFunc(f func() enums.CommsTextjobsource) CommsTextJobMod {
return CommsTextJobModFunc(func(_ context.Context, o *CommsTextJobTemplate) {
o.Source = f
})
}
// Clear any values for the column
func (m commsTextJobMods) UnsetSource() CommsTextJobMod {
return CommsTextJobModFunc(func(_ context.Context, o *CommsTextJobTemplate) {
o.Source = nil
})
}
// Generates a random value for the column using the given faker
// if faker is nil, a default faker is used
func (m commsTextJobMods) RandomSource(f *faker.Faker) CommsTextJobMod {
return CommsTextJobModFunc(func(_ context.Context, o *CommsTextJobTemplate) {
o.Source = func() enums.CommsTextjobsource {
return random_enums_CommsTextjobsource(f)
}
})
}
// Set the model columns to this value
func (m commsTextJobMods) Completed(val null.Val[time.Time]) CommsTextJobMod {
return CommsTextJobModFunc(func(_ context.Context, o *CommsTextJobTemplate) {
o.Completed = func() null.Val[time.Time] { return val }
})
}
// Set the Column from the function
func (m commsTextJobMods) CompletedFunc(f func() null.Val[time.Time]) CommsTextJobMod {
return CommsTextJobModFunc(func(_ context.Context, o *CommsTextJobTemplate) {
o.Completed = f
})
}
// Clear any values for the column
func (m commsTextJobMods) UnsetCompleted() CommsTextJobMod {
return CommsTextJobModFunc(func(_ context.Context, o *CommsTextJobTemplate) {
o.Completed = nil
})
}
// Generates a random value for the column using the given faker
// if faker is nil, a default faker is used
// The generated value is sometimes null
func (m commsTextJobMods) RandomCompleted(f *faker.Faker) CommsTextJobMod {
return CommsTextJobModFunc(func(_ context.Context, o *CommsTextJobTemplate) {
o.Completed = func() null.Val[time.Time] {
if f == nil {
f = &defaultFaker
}
val := random_time_Time(f)
return null.From(val)
}
})
}
// Generates a random value for the column using the given faker
// if faker is nil, a default faker is used
// The generated value is never null
func (m commsTextJobMods) RandomCompletedNotNull(f *faker.Faker) CommsTextJobMod {
return CommsTextJobModFunc(func(_ context.Context, o *CommsTextJobTemplate) {
o.Completed = func() null.Val[time.Time] {
if f == nil {
f = &defaultFaker
}
val := random_time_Time(f)
return null.From(val)
}
})
}
func (m commsTextJobMods) WithParentsCascading() CommsTextJobMod {
return CommsTextJobModFunc(func(ctx context.Context, o *CommsTextJobTemplate) {
if isDone, _ := commsTextJobWithParentsCascadingCtx.Value(ctx); isDone {

View file

@ -0,0 +1,11 @@
-- +goose Up
CREATE TYPE comms.TextJobSource AS ENUM (
'rmo',
'nidus'
);
ALTER TABLE comms.text_job ADD COLUMN source comms.TextJobSource;
UPDATE comms.text_job SET source = 'rmo';
ALTER TABLE comms.text_job ALTER COLUMN source SET NOT NULL;
ALTER TABLE comms.text_job ADD COLUMN completed TIMESTAMP WITHOUT TIME ZONE;

View file

@ -20,16 +20,20 @@ import (
"github.com/Gleipnir-Technology/bob/orm"
"github.com/Gleipnir-Technology/bob/types/pgtypes"
enums "github.com/Gleipnir-Technology/nidus-sync/db/enums"
"github.com/aarondl/opt/null"
"github.com/aarondl/opt/omit"
"github.com/aarondl/opt/omitnull"
)
// CommsTextJob is an object representing the database table.
type CommsTextJob struct {
Content string `db:"content" `
Created time.Time `db:"created" `
Destination string `db:"destination" `
ID int32 `db:"id,pk" `
Type enums.CommsTextjobtype `db:"type_" `
Content string `db:"content" `
Created time.Time `db:"created" `
Destination string `db:"destination" `
ID int32 `db:"id,pk" `
Type enums.CommsTextjobtype `db:"type_" `
Source enums.CommsTextjobsource `db:"source" `
Completed null.Val[time.Time] `db:"completed" `
R commsTextJobR `db:"-" `
}
@ -52,7 +56,7 @@ type commsTextJobR struct {
func buildCommsTextJobColumns(alias string) commsTextJobColumns {
return commsTextJobColumns{
ColumnsExpr: expr.NewColumnsExpr(
"content", "created", "destination", "id", "type_",
"content", "created", "destination", "id", "type_", "source", "completed",
).WithParent("comms.text_job"),
tableAlias: alias,
Content: psql.Quote(alias, "content"),
@ -60,6 +64,8 @@ func buildCommsTextJobColumns(alias string) commsTextJobColumns {
Destination: psql.Quote(alias, "destination"),
ID: psql.Quote(alias, "id"),
Type: psql.Quote(alias, "type_"),
Source: psql.Quote(alias, "source"),
Completed: psql.Quote(alias, "completed"),
}
}
@ -71,6 +77,8 @@ type commsTextJobColumns struct {
Destination psql.Expression
ID psql.Expression
Type psql.Expression
Source psql.Expression
Completed psql.Expression
}
func (c commsTextJobColumns) Alias() string {
@ -85,15 +93,17 @@ func (commsTextJobColumns) AliasedAs(alias string) commsTextJobColumns {
// All values are optional, and do not have to be set
// Generated columns are not included
type CommsTextJobSetter struct {
Content omit.Val[string] `db:"content" `
Created omit.Val[time.Time] `db:"created" `
Destination omit.Val[string] `db:"destination" `
ID omit.Val[int32] `db:"id,pk" `
Type omit.Val[enums.CommsTextjobtype] `db:"type_" `
Content omit.Val[string] `db:"content" `
Created omit.Val[time.Time] `db:"created" `
Destination omit.Val[string] `db:"destination" `
ID omit.Val[int32] `db:"id,pk" `
Type omit.Val[enums.CommsTextjobtype] `db:"type_" `
Source omit.Val[enums.CommsTextjobsource] `db:"source" `
Completed omitnull.Val[time.Time] `db:"completed" `
}
func (s CommsTextJobSetter) SetColumns() []string {
vals := make([]string, 0, 5)
vals := make([]string, 0, 7)
if s.Content.IsValue() {
vals = append(vals, "content")
}
@ -109,6 +119,12 @@ func (s CommsTextJobSetter) SetColumns() []string {
if s.Type.IsValue() {
vals = append(vals, "type_")
}
if s.Source.IsValue() {
vals = append(vals, "source")
}
if !s.Completed.IsUnset() {
vals = append(vals, "completed")
}
return vals
}
@ -128,6 +144,12 @@ func (s CommsTextJobSetter) Overwrite(t *CommsTextJob) {
if s.Type.IsValue() {
t.Type = s.Type.MustGet()
}
if s.Source.IsValue() {
t.Source = s.Source.MustGet()
}
if !s.Completed.IsUnset() {
t.Completed = s.Completed.MustGetNull()
}
}
func (s *CommsTextJobSetter) Apply(q *dialect.InsertQuery) {
@ -136,7 +158,7 @@ func (s *CommsTextJobSetter) Apply(q *dialect.InsertQuery) {
})
q.AppendValues(bob.ExpressionFunc(func(ctx context.Context, w io.StringWriter, d bob.Dialect, start int) ([]any, error) {
vals := make([]bob.Expression, 5)
vals := make([]bob.Expression, 7)
if s.Content.IsValue() {
vals[0] = psql.Arg(s.Content.MustGet())
} else {
@ -167,6 +189,18 @@ func (s *CommsTextJobSetter) Apply(q *dialect.InsertQuery) {
vals[4] = psql.Raw("DEFAULT")
}
if s.Source.IsValue() {
vals[5] = psql.Arg(s.Source.MustGet())
} else {
vals[5] = psql.Raw("DEFAULT")
}
if !s.Completed.IsUnset() {
vals[6] = psql.Arg(s.Completed.MustGetNull())
} else {
vals[6] = psql.Raw("DEFAULT")
}
return bob.ExpressSlice(ctx, w, d, start, vals, "", ", ", "")
}))
}
@ -176,7 +210,7 @@ func (s CommsTextJobSetter) UpdateMod() bob.Mod[*dialect.UpdateQuery] {
}
func (s CommsTextJobSetter) Expressions(prefix ...string) []bob.Expression {
exprs := make([]bob.Expression, 0, 5)
exprs := make([]bob.Expression, 0, 7)
if s.Content.IsValue() {
exprs = append(exprs, expr.Join{Sep: " = ", Exprs: []bob.Expression{
@ -213,6 +247,20 @@ func (s CommsTextJobSetter) Expressions(prefix ...string) []bob.Expression {
}})
}
if s.Source.IsValue() {
exprs = append(exprs, expr.Join{Sep: " = ", Exprs: []bob.Expression{
psql.Quote(append(prefix, "source")...),
psql.Arg(s.Source),
}})
}
if !s.Completed.IsUnset() {
exprs = append(exprs, expr.Join{Sep: " = ", Exprs: []bob.Expression{
psql.Quote(append(prefix, "completed")...),
psql.Arg(s.Completed),
}})
}
return exprs
}
@ -517,6 +565,8 @@ type commsTextJobWhere[Q psql.Filterable] struct {
Destination psql.WhereMod[Q, string]
ID psql.WhereMod[Q, int32]
Type psql.WhereMod[Q, enums.CommsTextjobtype]
Source psql.WhereMod[Q, enums.CommsTextjobsource]
Completed psql.WhereNullMod[Q, time.Time]
}
func (commsTextJobWhere[Q]) AliasedAs(alias string) commsTextJobWhere[Q] {
@ -530,6 +580,8 @@ func buildCommsTextJobWhere[Q psql.Filterable](cols commsTextJobColumns) commsTe
Destination: psql.Where[Q, string](cols.Destination),
ID: psql.Where[Q, int32](cols.ID),
Type: psql.Where[Q, enums.CommsTextjobtype](cols.Type),
Source: psql.Where[Q, enums.CommsTextjobsource](cols.Source),
Completed: psql.WhereNull[Q, time.Time](cols.Completed),
}
}

View file

@ -53,7 +53,7 @@ func sendReportSubscription(ctx context.Context, job Job) error {
return fmt.Errorf("Failed to check if subscribed: %w", err)
}
if sub == nil {
err = delayMessage(ctx, j.source(), j.destination(), j.content(), enums.CommsTextjobtypeReportConfirmation)
err = delayMessage(ctx, enums.CommsTextjobsourceRmo, j.destination(), j.content(), enums.CommsTextjobtypeReportConfirmation)
if err != nil {
return fmt.Errorf("Failed to delay report subscription message: %w", err)
}

View file

@ -123,13 +123,14 @@ func UpdateMessageStatus(twilio_sid string, status string) {
return
}
}
func delayMessage(ctx context.Context, source string, destination string, content string, type_ enums.CommsTextjobtype) error {
func delayMessage(ctx context.Context, source enums.CommsTextjobsource, destination string, content string, type_ enums.CommsTextjobtype) error {
job, err := models.CommsTextJobs.Insert(&models.CommsTextJobSetter{
Content: omit.From(content),
Created: omit.From(time.Now()),
Destination: omit.From(destination),
//ID:
Type: omit.From(type_),
Source: omit.From(source),
Type: omit.From(type_),
}).One(ctx, db.PGInstance.BobDB)
if err != nil {
return fmt.Errorf("Failed to add delayed text job: %w", err)
@ -229,9 +230,39 @@ func getDst(ctx context.Context, to string) (string, error) {
}
func handleWaitingTextJobs(ctx context.Context, src string) {
log.Info().Str("src", src).Msg("Pretend handle waiting jobs")
jobs, err := models.CommsTextJobs.Query(
models.SelectWhere.CommsTextJobs.Destination.EQ(src),
models.SelectWhere.CommsTextJobs.Completed.IsNull(),
).All(ctx, db.PGInstance.BobDB)
if err != nil {
log.Error().Err(err).Msg("Failed to query for jobs")
return
}
for _, job := range jobs {
var src string
switch job.Source {
case enums.CommsTextjobsourceRmo:
src = config.PhoneNumberReportStr
//case enums.CommsTextJobsourcenidus:
//src := config.PhoneNumebrNidusStr
default:
log.Error().Str("source", job.Source.String()).Msg("Can't support background text job.")
}
err = sendText(ctx, src, job.Destination, job.Content, enums.CommsTextoriginWebsiteAction, false, true)
if err != nil {
log.Error().Err(err).Int32("id", job.ID).Msg("Failed to send delayed text job.")
continue
}
err := job.Update(ctx, db.PGInstance.BobDB, &models.CommsTextJobSetter{
Completed: omitnull.From(time.Now()),
})
if err != nil {
log.Error().Err(err).Int32("id", job.ID).Msg("Failed to update delayed text job.")
continue
}
}
}
func handleResetConversation(ctx context.Context, src string, dst string) {
err := wipeLLMMemory(ctx, src, dst)
if err != nil {