diff --git a/db/dbinfo/comms.text_job.bob.go b/db/dbinfo/comms.text_job.bob.go index cd2dfabd..5cfeb1dc 100644 --- a/db/dbinfo/comms.text_job.bob.go +++ b/db/dbinfo/comms.text_job.bob.go @@ -60,6 +60,24 @@ var CommsTextJobs = Table[ Generated: false, AutoIncr: false, }, + Source: column{ + Name: "source", + DBType: "comms.textjobsource", + Default: "", + Comment: "", + Nullable: false, + Generated: false, + AutoIncr: false, + }, + Completed: column{ + Name: "completed", + DBType: "timestamp without time zone", + Default: "NULL", + Comment: "", + Nullable: true, + Generated: false, + AutoIncr: false, + }, }, Indexes: commsTextJobIndexes{ TextJobPkey: index{ @@ -106,11 +124,13 @@ type commsTextJobColumns struct { Destination column ID column Type column + Source column + Completed column } func (c commsTextJobColumns) AsSlice() []column { return []column{ - c.Content, c.Created, c.Destination, c.ID, c.Type, + c.Content, c.Created, c.Destination, c.ID, c.Type, c.Source, c.Completed, } } diff --git a/db/enums/enums.bob.go b/db/enums/enums.bob.go index 66d39176..6e603c4f 100644 --- a/db/enums/enums.bob.go +++ b/db/enums/enums.bob.go @@ -272,6 +272,79 @@ func (e *CommsMessagetypeemail) Scan(value any) error { return nil } +// Enum values for CommsTextjobsource +const ( + CommsTextjobsourceRmo CommsTextjobsource = "rmo" + CommsTextjobsourceNidus CommsTextjobsource = "nidus" +) + +func AllCommsTextjobsource() []CommsTextjobsource { + return []CommsTextjobsource{ + CommsTextjobsourceRmo, + CommsTextjobsourceNidus, + } +} + +type CommsTextjobsource string + +func (e CommsTextjobsource) String() string { + return string(e) +} + +func (e CommsTextjobsource) Valid() bool { + switch e { + case CommsTextjobsourceRmo, + CommsTextjobsourceNidus: + return true + default: + return false + } +} + +// useful when testing in other packages +func (e CommsTextjobsource) All() []CommsTextjobsource { + return AllCommsTextjobsource() +} + +func (e CommsTextjobsource) MarshalText() ([]byte, error) { + return []byte(e), nil +} + +func (e *CommsTextjobsource) UnmarshalText(text []byte) error { + return e.Scan(text) +} + +func (e CommsTextjobsource) MarshalBinary() ([]byte, error) { + return []byte(e), nil +} + +func (e *CommsTextjobsource) UnmarshalBinary(data []byte) error { + return e.Scan(data) +} + +func (e CommsTextjobsource) Value() (driver.Value, error) { + return string(e), nil +} + +func (e *CommsTextjobsource) Scan(value any) error { + switch x := value.(type) { + case string: + *e = CommsTextjobsource(x) + case []byte: + *e = CommsTextjobsource(x) + case nil: + return fmt.Errorf("cannot nil into CommsTextjobsource") + default: + return fmt.Errorf("cannot scan type %T: %v", value, value) + } + + if !e.Valid() { + return fmt.Errorf("invalid CommsTextjobsource value: %s", *e) + } + + return nil +} + // Enum values for CommsTextjobtype const ( CommsTextjobtypeReportConfirmation CommsTextjobtype = "report-confirmation" diff --git a/db/factory/bobfactory_main.bob.go b/db/factory/bobfactory_main.bob.go index cd749e18..ace24ffd 100644 --- a/db/factory/bobfactory_main.bob.go +++ b/db/factory/bobfactory_main.bob.go @@ -333,6 +333,8 @@ func (f *Factory) FromExistingCommsTextJob(m *models.CommsTextJob) *CommsTextJob o.Destination = func() string { return m.Destination } o.ID = func() int32 { return m.ID } o.Type = func() enums.CommsTextjobtype { return m.Type } + o.Source = func() enums.CommsTextjobsource { return m.Source } + o.Completed = func() null.Val[time.Time] { return m.Completed } ctx := context.Background() if m.R.DestinationPhone != nil { diff --git a/db/factory/bobfactory_random.bob.go b/db/factory/bobfactory_random.bob.go index fe4997bf..75ff76b2 100644 --- a/db/factory/bobfactory_random.bob.go +++ b/db/factory/bobfactory_random.bob.go @@ -101,6 +101,16 @@ func random_enums_CommsMessagetypeemail(f *faker.Faker, limits ...string) enums. return all[f.IntBetween(0, len(all)-1)] } +func random_enums_CommsTextjobsource(f *faker.Faker, limits ...string) enums.CommsTextjobsource { + if f == nil { + f = &defaultFaker + } + + var e enums.CommsTextjobsource + all := e.All() + return all[f.IntBetween(0, len(all)-1)] +} + func random_enums_CommsTextjobtype(f *faker.Faker, limits ...string) enums.CommsTextjobtype { if f == nil { f = &defaultFaker diff --git a/db/factory/comms.text_job.bob.go b/db/factory/comms.text_job.bob.go index 1c62de8e..5b3a7935 100644 --- a/db/factory/comms.text_job.bob.go +++ b/db/factory/comms.text_job.bob.go @@ -11,7 +11,9 @@ import ( "github.com/Gleipnir-Technology/bob" enums "github.com/Gleipnir-Technology/nidus-sync/db/enums" models "github.com/Gleipnir-Technology/nidus-sync/db/models" + "github.com/aarondl/opt/null" "github.com/aarondl/opt/omit" + "github.com/aarondl/opt/omitnull" "github.com/jaswdr/faker/v2" ) @@ -41,6 +43,8 @@ type CommsTextJobTemplate struct { Destination func() string ID func() int32 Type func() enums.CommsTextjobtype + Source func() enums.CommsTextjobsource + Completed func() null.Val[time.Time] r commsTextJobR f *Factory @@ -99,6 +103,14 @@ func (o CommsTextJobTemplate) BuildSetter() *models.CommsTextJobSetter { val := o.Type() m.Type = omit.From(val) } + if o.Source != nil { + val := o.Source() + m.Source = omit.From(val) + } + if o.Completed != nil { + val := o.Completed() + m.Completed = omitnull.FromNull(val) + } return m } @@ -136,6 +148,12 @@ func (o CommsTextJobTemplate) Build() *models.CommsTextJob { if o.Type != nil { m.Type = o.Type() } + if o.Source != nil { + m.Source = o.Source() + } + if o.Completed != nil { + m.Completed = o.Completed() + } o.setModelRels(m) @@ -172,6 +190,10 @@ func ensureCreatableCommsTextJob(m *models.CommsTextJobSetter) { val := random_enums_CommsTextjobtype(nil) m.Type = omit.From(val) } + if !(m.Source.IsValue()) { + val := random_enums_CommsTextjobsource(nil) + m.Source = omit.From(val) + } } // insertOptRels creates and inserts any optional the relationships on *models.CommsTextJob @@ -296,6 +318,8 @@ func (m commsTextJobMods) RandomizeAllColumns(f *faker.Faker) CommsTextJobMod { CommsTextJobMods.RandomDestination(f), CommsTextJobMods.RandomID(f), CommsTextJobMods.RandomType(f), + CommsTextJobMods.RandomSource(f), + CommsTextJobMods.RandomCompleted(f), } } @@ -454,6 +478,90 @@ func (m commsTextJobMods) RandomType(f *faker.Faker) CommsTextJobMod { }) } +// Set the model columns to this value +func (m commsTextJobMods) Source(val enums.CommsTextjobsource) CommsTextJobMod { + return CommsTextJobModFunc(func(_ context.Context, o *CommsTextJobTemplate) { + o.Source = func() enums.CommsTextjobsource { return val } + }) +} + +// Set the Column from the function +func (m commsTextJobMods) SourceFunc(f func() enums.CommsTextjobsource) CommsTextJobMod { + return CommsTextJobModFunc(func(_ context.Context, o *CommsTextJobTemplate) { + o.Source = f + }) +} + +// Clear any values for the column +func (m commsTextJobMods) UnsetSource() CommsTextJobMod { + return CommsTextJobModFunc(func(_ context.Context, o *CommsTextJobTemplate) { + o.Source = nil + }) +} + +// Generates a random value for the column using the given faker +// if faker is nil, a default faker is used +func (m commsTextJobMods) RandomSource(f *faker.Faker) CommsTextJobMod { + return CommsTextJobModFunc(func(_ context.Context, o *CommsTextJobTemplate) { + o.Source = func() enums.CommsTextjobsource { + return random_enums_CommsTextjobsource(f) + } + }) +} + +// Set the model columns to this value +func (m commsTextJobMods) Completed(val null.Val[time.Time]) CommsTextJobMod { + return CommsTextJobModFunc(func(_ context.Context, o *CommsTextJobTemplate) { + o.Completed = func() null.Val[time.Time] { return val } + }) +} + +// Set the Column from the function +func (m commsTextJobMods) CompletedFunc(f func() null.Val[time.Time]) CommsTextJobMod { + return CommsTextJobModFunc(func(_ context.Context, o *CommsTextJobTemplate) { + o.Completed = f + }) +} + +// Clear any values for the column +func (m commsTextJobMods) UnsetCompleted() CommsTextJobMod { + return CommsTextJobModFunc(func(_ context.Context, o *CommsTextJobTemplate) { + o.Completed = nil + }) +} + +// Generates a random value for the column using the given faker +// if faker is nil, a default faker is used +// The generated value is sometimes null +func (m commsTextJobMods) RandomCompleted(f *faker.Faker) CommsTextJobMod { + return CommsTextJobModFunc(func(_ context.Context, o *CommsTextJobTemplate) { + o.Completed = func() null.Val[time.Time] { + if f == nil { + f = &defaultFaker + } + + val := random_time_Time(f) + return null.From(val) + } + }) +} + +// Generates a random value for the column using the given faker +// if faker is nil, a default faker is used +// The generated value is never null +func (m commsTextJobMods) RandomCompletedNotNull(f *faker.Faker) CommsTextJobMod { + return CommsTextJobModFunc(func(_ context.Context, o *CommsTextJobTemplate) { + o.Completed = func() null.Val[time.Time] { + if f == nil { + f = &defaultFaker + } + + val := random_time_Time(f) + return null.From(val) + } + }) +} + func (m commsTextJobMods) WithParentsCascading() CommsTextJobMod { return CommsTextJobModFunc(func(ctx context.Context, o *CommsTextJobTemplate) { if isDone, _ := commsTextJobWithParentsCascadingCtx.Value(ctx); isDone { diff --git a/db/migrations/00046_comms_text_job_source.sql b/db/migrations/00046_comms_text_job_source.sql new file mode 100644 index 00000000..cbdfea62 --- /dev/null +++ b/db/migrations/00046_comms_text_job_source.sql @@ -0,0 +1,11 @@ +-- +goose Up +CREATE TYPE comms.TextJobSource AS ENUM ( + 'rmo', + 'nidus' +); + +ALTER TABLE comms.text_job ADD COLUMN source comms.TextJobSource; +UPDATE comms.text_job SET source = 'rmo'; +ALTER TABLE comms.text_job ALTER COLUMN source SET NOT NULL; +ALTER TABLE comms.text_job ADD COLUMN completed TIMESTAMP WITHOUT TIME ZONE; + diff --git a/db/models/comms.text_job.bob.go b/db/models/comms.text_job.bob.go index aabb5a6a..f5532b15 100644 --- a/db/models/comms.text_job.bob.go +++ b/db/models/comms.text_job.bob.go @@ -20,16 +20,20 @@ import ( "github.com/Gleipnir-Technology/bob/orm" "github.com/Gleipnir-Technology/bob/types/pgtypes" enums "github.com/Gleipnir-Technology/nidus-sync/db/enums" + "github.com/aarondl/opt/null" "github.com/aarondl/opt/omit" + "github.com/aarondl/opt/omitnull" ) // CommsTextJob is an object representing the database table. type CommsTextJob struct { - Content string `db:"content" ` - Created time.Time `db:"created" ` - Destination string `db:"destination" ` - ID int32 `db:"id,pk" ` - Type enums.CommsTextjobtype `db:"type_" ` + Content string `db:"content" ` + Created time.Time `db:"created" ` + Destination string `db:"destination" ` + ID int32 `db:"id,pk" ` + Type enums.CommsTextjobtype `db:"type_" ` + Source enums.CommsTextjobsource `db:"source" ` + Completed null.Val[time.Time] `db:"completed" ` R commsTextJobR `db:"-" ` } @@ -52,7 +56,7 @@ type commsTextJobR struct { func buildCommsTextJobColumns(alias string) commsTextJobColumns { return commsTextJobColumns{ ColumnsExpr: expr.NewColumnsExpr( - "content", "created", "destination", "id", "type_", + "content", "created", "destination", "id", "type_", "source", "completed", ).WithParent("comms.text_job"), tableAlias: alias, Content: psql.Quote(alias, "content"), @@ -60,6 +64,8 @@ func buildCommsTextJobColumns(alias string) commsTextJobColumns { Destination: psql.Quote(alias, "destination"), ID: psql.Quote(alias, "id"), Type: psql.Quote(alias, "type_"), + Source: psql.Quote(alias, "source"), + Completed: psql.Quote(alias, "completed"), } } @@ -71,6 +77,8 @@ type commsTextJobColumns struct { Destination psql.Expression ID psql.Expression Type psql.Expression + Source psql.Expression + Completed psql.Expression } func (c commsTextJobColumns) Alias() string { @@ -85,15 +93,17 @@ func (commsTextJobColumns) AliasedAs(alias string) commsTextJobColumns { // All values are optional, and do not have to be set // Generated columns are not included type CommsTextJobSetter struct { - Content omit.Val[string] `db:"content" ` - Created omit.Val[time.Time] `db:"created" ` - Destination omit.Val[string] `db:"destination" ` - ID omit.Val[int32] `db:"id,pk" ` - Type omit.Val[enums.CommsTextjobtype] `db:"type_" ` + Content omit.Val[string] `db:"content" ` + Created omit.Val[time.Time] `db:"created" ` + Destination omit.Val[string] `db:"destination" ` + ID omit.Val[int32] `db:"id,pk" ` + Type omit.Val[enums.CommsTextjobtype] `db:"type_" ` + Source omit.Val[enums.CommsTextjobsource] `db:"source" ` + Completed omitnull.Val[time.Time] `db:"completed" ` } func (s CommsTextJobSetter) SetColumns() []string { - vals := make([]string, 0, 5) + vals := make([]string, 0, 7) if s.Content.IsValue() { vals = append(vals, "content") } @@ -109,6 +119,12 @@ func (s CommsTextJobSetter) SetColumns() []string { if s.Type.IsValue() { vals = append(vals, "type_") } + if s.Source.IsValue() { + vals = append(vals, "source") + } + if !s.Completed.IsUnset() { + vals = append(vals, "completed") + } return vals } @@ -128,6 +144,12 @@ func (s CommsTextJobSetter) Overwrite(t *CommsTextJob) { if s.Type.IsValue() { t.Type = s.Type.MustGet() } + if s.Source.IsValue() { + t.Source = s.Source.MustGet() + } + if !s.Completed.IsUnset() { + t.Completed = s.Completed.MustGetNull() + } } func (s *CommsTextJobSetter) Apply(q *dialect.InsertQuery) { @@ -136,7 +158,7 @@ func (s *CommsTextJobSetter) Apply(q *dialect.InsertQuery) { }) q.AppendValues(bob.ExpressionFunc(func(ctx context.Context, w io.StringWriter, d bob.Dialect, start int) ([]any, error) { - vals := make([]bob.Expression, 5) + vals := make([]bob.Expression, 7) if s.Content.IsValue() { vals[0] = psql.Arg(s.Content.MustGet()) } else { @@ -167,6 +189,18 @@ func (s *CommsTextJobSetter) Apply(q *dialect.InsertQuery) { vals[4] = psql.Raw("DEFAULT") } + if s.Source.IsValue() { + vals[5] = psql.Arg(s.Source.MustGet()) + } else { + vals[5] = psql.Raw("DEFAULT") + } + + if !s.Completed.IsUnset() { + vals[6] = psql.Arg(s.Completed.MustGetNull()) + } else { + vals[6] = psql.Raw("DEFAULT") + } + return bob.ExpressSlice(ctx, w, d, start, vals, "", ", ", "") })) } @@ -176,7 +210,7 @@ func (s CommsTextJobSetter) UpdateMod() bob.Mod[*dialect.UpdateQuery] { } func (s CommsTextJobSetter) Expressions(prefix ...string) []bob.Expression { - exprs := make([]bob.Expression, 0, 5) + exprs := make([]bob.Expression, 0, 7) if s.Content.IsValue() { exprs = append(exprs, expr.Join{Sep: " = ", Exprs: []bob.Expression{ @@ -213,6 +247,20 @@ func (s CommsTextJobSetter) Expressions(prefix ...string) []bob.Expression { }}) } + if s.Source.IsValue() { + exprs = append(exprs, expr.Join{Sep: " = ", Exprs: []bob.Expression{ + psql.Quote(append(prefix, "source")...), + psql.Arg(s.Source), + }}) + } + + if !s.Completed.IsUnset() { + exprs = append(exprs, expr.Join{Sep: " = ", Exprs: []bob.Expression{ + psql.Quote(append(prefix, "completed")...), + psql.Arg(s.Completed), + }}) + } + return exprs } @@ -517,6 +565,8 @@ type commsTextJobWhere[Q psql.Filterable] struct { Destination psql.WhereMod[Q, string] ID psql.WhereMod[Q, int32] Type psql.WhereMod[Q, enums.CommsTextjobtype] + Source psql.WhereMod[Q, enums.CommsTextjobsource] + Completed psql.WhereNullMod[Q, time.Time] } func (commsTextJobWhere[Q]) AliasedAs(alias string) commsTextJobWhere[Q] { @@ -530,6 +580,8 @@ func buildCommsTextJobWhere[Q psql.Filterable](cols commsTextJobColumns) commsTe Destination: psql.Where[Q, string](cols.Destination), ID: psql.Where[Q, int32](cols.ID), Type: psql.Where[Q, enums.CommsTextjobtype](cols.Type), + Source: psql.Where[Q, enums.CommsTextjobsource](cols.Source), + Completed: psql.WhereNull[Q, time.Time](cols.Completed), } } diff --git a/platform/text/report-subscription.go b/platform/text/report-subscription.go index 3fe39e84..4a32a0b2 100644 --- a/platform/text/report-subscription.go +++ b/platform/text/report-subscription.go @@ -53,7 +53,7 @@ func sendReportSubscription(ctx context.Context, job Job) error { return fmt.Errorf("Failed to check if subscribed: %w", err) } if sub == nil { - err = delayMessage(ctx, j.source(), j.destination(), j.content(), enums.CommsTextjobtypeReportConfirmation) + err = delayMessage(ctx, enums.CommsTextjobsourceRmo, j.destination(), j.content(), enums.CommsTextjobtypeReportConfirmation) if err != nil { return fmt.Errorf("Failed to delay report subscription message: %w", err) } diff --git a/platform/text/text.go b/platform/text/text.go index 64376784..9e302ceb 100644 --- a/platform/text/text.go +++ b/platform/text/text.go @@ -123,13 +123,14 @@ func UpdateMessageStatus(twilio_sid string, status string) { return } } -func delayMessage(ctx context.Context, source string, destination string, content string, type_ enums.CommsTextjobtype) error { +func delayMessage(ctx context.Context, source enums.CommsTextjobsource, destination string, content string, type_ enums.CommsTextjobtype) error { job, err := models.CommsTextJobs.Insert(&models.CommsTextJobSetter{ Content: omit.From(content), Created: omit.From(time.Now()), Destination: omit.From(destination), //ID: - Type: omit.From(type_), + Source: omit.From(source), + Type: omit.From(type_), }).One(ctx, db.PGInstance.BobDB) if err != nil { return fmt.Errorf("Failed to add delayed text job: %w", err) @@ -229,9 +230,39 @@ func getDst(ctx context.Context, to string) (string, error) { } func handleWaitingTextJobs(ctx context.Context, src string) { - log.Info().Str("src", src).Msg("Pretend handle waiting jobs") - + jobs, err := models.CommsTextJobs.Query( + models.SelectWhere.CommsTextJobs.Destination.EQ(src), + models.SelectWhere.CommsTextJobs.Completed.IsNull(), + ).All(ctx, db.PGInstance.BobDB) + if err != nil { + log.Error().Err(err).Msg("Failed to query for jobs") + return + } + for _, job := range jobs { + var src string + switch job.Source { + case enums.CommsTextjobsourceRmo: + src = config.PhoneNumberReportStr + //case enums.CommsTextJobsourcenidus: + //src := config.PhoneNumebrNidusStr + default: + log.Error().Str("source", job.Source.String()).Msg("Can't support background text job.") + } + err = sendText(ctx, src, job.Destination, job.Content, enums.CommsTextoriginWebsiteAction, false, true) + if err != nil { + log.Error().Err(err).Int32("id", job.ID).Msg("Failed to send delayed text job.") + continue + } + err := job.Update(ctx, db.PGInstance.BobDB, &models.CommsTextJobSetter{ + Completed: omitnull.From(time.Now()), + }) + if err != nil { + log.Error().Err(err).Int32("id", job.ID).Msg("Failed to update delayed text job.") + continue + } + } } + func handleResetConversation(ctx context.Context, src string, dst string) { err := wipeLLMMemory(ctx, src, dst) if err != nil {