diff --git a/api/api.go b/api/api.go index 2f6bb228..d645d8ae 100644 --- a/api/api.go +++ b/api/api.go @@ -4,7 +4,6 @@ import ( "encoding/json" "fmt" "io" - "io/ioutil" "net/http" "os" "strconv" @@ -32,7 +31,7 @@ func apiAudioPost(w http.ResponseWriter, r *http.Request, u platform.User) { } var payload NoteAudioPayload - body, err := ioutil.ReadAll(r.Body) + body, err := io.ReadAll(r.Body) if err != nil { http.Error(w, "Failed to read the payload", http.StatusBadRequest) return @@ -49,6 +48,7 @@ func apiAudioPost(w http.ResponseWriter, r *http.Request, u platform.User) { Deleted: omitnull.FromPtr(payload.Deleted), DeletorID: omitnull.FromPtr(payload.DeletorID), Duration: omit.From(payload.Duration), + OrganizationID: omit.From(u.Organization.ID()), Transcription: omitnull.FromPtr(payload.Transcription), TranscriptionUserEdited: omit.From(payload.TranscriptionUserEdited), Version: omit.From(payload.Version), @@ -61,20 +61,24 @@ func apiAudioPost(w http.ResponseWriter, r *http.Request, u platform.User) { w.WriteHeader(http.StatusAccepted) } -func apiAudioContentPost(w http.ResponseWriter, r *http.Request, u platform.User) { +func apiAudioContentPost(w http.ResponseWriter, r *http.Request, user platform.User) { u_str := chi.URLParam(r, "uuid") - audioUUID, err := uuid.Parse(u_str) + u, err := uuid.Parse(u_str) if err != nil { http.Error(w, "Failed to parse image UUID", http.StatusBadRequest) return } - err = file.FileContentWrite(r.Body, file.CollectionAudioRaw, audioUUID) + err = file.FileContentWrite(r.Body, file.CollectionAudioRaw, u) if err != nil { log.Printf("Failed to write content file: %v", err) http.Error(w, "failed to write content file", http.StatusInternalServerError) } - - background.AudioTranscode(audioUUID) + ctx := r.Context() + a, err := models.NoteAudios.Query( + models.SelectWhere.NoteAudios.UUID.EQ(u), + models.SelectWhere.NoteAudios.OrganizationID.EQ(user.Organization.ID()), + ).One(ctx, db.PGInstance.BobDB) + background.NewAudioTranscode(ctx, db.PGInstance.BobDB, a.ID) w.WriteHeader(http.StatusOK) } diff --git a/api/twilio.go b/api/twilio.go index 30d7a606..63066f7a 100644 --- a/api/twilio.go +++ b/api/twilio.go @@ -1,6 +1,7 @@ package api import ( + "context" "fmt" "net/http" "strings" @@ -139,7 +140,12 @@ func twilioTextPost(w http.ResponseWriter, r *http.Request) { return } - go text.HandleTextMessage(src, dst, body) + go func() { + err := text.HandleTextMessage(context.Background(), src, dst, body) + if err != nil { + log.Error().Err(err).Msg("failed to handle Twilio incoming text") + } + }() w.Header().Set("Content-Type", "text/xml") fmt.Fprintf(w, "%s", twiml) } diff --git a/api/voipms.go b/api/voipms.go index 17446364..c9b5218f 100644 --- a/api/voipms.go +++ b/api/voipms.go @@ -1,6 +1,7 @@ package api import ( + "context" "encoding/json" "fmt" "io/ioutil" @@ -94,6 +95,11 @@ func voipmsTextPost(w http.ResponseWriter, r *http.Request) { log.Info().Int("ID", b.Data.ID).Str("event_type", b.Data.EventType).Str("record_type", b.Data.RecordType).Str("from", b.Data.Payload.From.PhoneNumber).Str("to", to).Str("content", b.Data.Payload.Text).Msg("Text status") // Convert phone numbers from Voip.ms into E164 format for consistency - go text.HandleTextMessage(b.Data.Payload.From.PhoneNumber, to, b.Data.Payload.Text) + go func() { + err := text.HandleTextMessage(context.Background(), b.Data.Payload.From.PhoneNumber, to, b.Data.Payload.Text) + if err != nil { + log.Error().Err(err).Msg("failed to handle VoIP.ms incoming text") + } + }() fmt.Fprintf(w, "ok") } diff --git a/db/dberrors/job.bob.go b/db/dberrors/job.bob.go new file mode 100644 index 00000000..9a93849a --- /dev/null +++ b/db/dberrors/job.bob.go @@ -0,0 +1,17 @@ +// Code generated by BobGen psql v0.42.5. DO NOT EDIT. +// This file is meant to be re-generated in place and/or deleted at any time. + +package dberrors + +var JobErrors = &jobErrors{ + ErrUniqueJobPkey: &UniqueConstraintError{ + schema: "", + table: "job", + columns: []string{"id"}, + s: "job_pkey", + }, +} + +type jobErrors struct { + ErrUniqueJobPkey *UniqueConstraintError +} diff --git a/db/dberrors/note_audio.bob.go b/db/dberrors/note_audio.bob.go index da314876..bec74d5e 100644 --- a/db/dberrors/note_audio.bob.go +++ b/db/dberrors/note_audio.bob.go @@ -10,8 +10,17 @@ var NoteAudioErrors = ¬eAudioErrors{ columns: []string{"version", "uuid"}, s: "note_audio_pkey", }, + + ErrUniqueNoteAudioIdUnique: &UniqueConstraintError{ + schema: "", + table: "note_audio", + columns: []string{"id"}, + s: "note_audio_id_unique", + }, } type noteAudioErrors struct { ErrUniqueNoteAudioPkey *UniqueConstraintError + + ErrUniqueNoteAudioIdUnique *UniqueConstraintError } diff --git a/db/dbinfo/job.bob.go b/db/dbinfo/job.bob.go new file mode 100644 index 00000000..edc497cd --- /dev/null +++ b/db/dbinfo/job.bob.go @@ -0,0 +1,122 @@ +// Code generated by BobGen psql v0.42.5. DO NOT EDIT. +// This file is meant to be re-generated in place and/or deleted at any time. + +package dbinfo + +import "github.com/aarondl/opt/null" + +var Jobs = Table[ + jobColumns, + jobIndexes, + jobForeignKeys, + jobUniques, + jobChecks, +]{ + Schema: "", + Name: "job", + Columns: jobColumns{ + Created: column{ + Name: "created", + DBType: "timestamp without time zone", + Default: "", + Comment: "", + Nullable: false, + Generated: false, + AutoIncr: false, + }, + ID: column{ + Name: "id", + DBType: "integer", + Default: "nextval('job_id_seq'::regclass)", + Comment: "", + Nullable: false, + Generated: false, + AutoIncr: false, + }, + Type: column{ + Name: "type_", + DBType: "public.jobtype", + Default: "", + Comment: "", + Nullable: false, + Generated: false, + AutoIncr: false, + }, + RowID: column{ + Name: "row_id", + DBType: "integer", + Default: "", + Comment: "", + Nullable: false, + Generated: false, + AutoIncr: false, + }, + }, + Indexes: jobIndexes{ + JobPkey: index{ + Type: "btree", + Name: "job_pkey", + Columns: []indexColumn{ + { + Name: "id", + Desc: null.FromCond(false, true), + IsExpression: false, + }, + }, + Unique: true, + Comment: "", + NullsFirst: []bool{false}, + NullsDistinct: false, + Where: "", + Include: []string{}, + }, + }, + PrimaryKey: &constraint{ + Name: "job_pkey", + Columns: []string{"id"}, + Comment: "", + }, + + Comment: "A temporary holding place for jobs that are pushed to backend workers. Once work is completed the job should be deleted", +} + +type jobColumns struct { + Created column + ID column + Type column + RowID column +} + +func (c jobColumns) AsSlice() []column { + return []column{ + c.Created, c.ID, c.Type, c.RowID, + } +} + +type jobIndexes struct { + JobPkey index +} + +func (i jobIndexes) AsSlice() []index { + return []index{ + i.JobPkey, + } +} + +type jobForeignKeys struct{} + +func (f jobForeignKeys) AsSlice() []foreignKey { + return []foreignKey{} +} + +type jobUniques struct{} + +func (u jobUniques) AsSlice() []constraint { + return []constraint{} +} + +type jobChecks struct{} + +func (c jobChecks) AsSlice() []check { + return []check{} +} diff --git a/db/dbinfo/note_audio.bob.go b/db/dbinfo/note_audio.bob.go index 3046131f..88097a25 100644 --- a/db/dbinfo/note_audio.bob.go +++ b/db/dbinfo/note_audio.bob.go @@ -105,6 +105,15 @@ var NoteAudios = Table[ Generated: false, AutoIncr: false, }, + ID: column{ + Name: "id", + DBType: "integer", + Default: "IDENTITY", + Comment: "", + Nullable: false, + Generated: true, + AutoIncr: false, + }, }, Indexes: noteAudioIndexes{ NoteAudioPkey: index{ @@ -129,6 +138,23 @@ var NoteAudios = Table[ Where: "", Include: []string{}, }, + NoteAudioIDUnique: index{ + Type: "btree", + Name: "note_audio_id_unique", + Columns: []indexColumn{ + { + Name: "id", + Desc: null.FromCond(false, true), + IsExpression: false, + }, + }, + Unique: true, + Comment: "", + NullsFirst: []bool{false}, + NullsDistinct: false, + Where: "", + Include: []string{}, + }, }, PrimaryKey: &constraint{ Name: "note_audio_pkey", @@ -164,6 +190,13 @@ var NoteAudios = Table[ ForeignColumns: []string{"id"}, }, }, + Uniques: noteAudioUniques{ + NoteAudioIDUnique: constraint{ + Name: "note_audio_id_unique", + Columns: []string{"id"}, + Comment: "", + }, + }, Comment: "", } @@ -179,21 +212,23 @@ type noteAudioColumns struct { TranscriptionUserEdited column Version column UUID column + ID column } func (c noteAudioColumns) AsSlice() []column { return []column{ - c.Created, c.CreatorID, c.Deleted, c.DeletorID, c.Duration, c.OrganizationID, c.Transcription, c.TranscriptionUserEdited, c.Version, c.UUID, + c.Created, c.CreatorID, c.Deleted, c.DeletorID, c.Duration, c.OrganizationID, c.Transcription, c.TranscriptionUserEdited, c.Version, c.UUID, c.ID, } } type noteAudioIndexes struct { - NoteAudioPkey index + NoteAudioPkey index + NoteAudioIDUnique index } func (i noteAudioIndexes) AsSlice() []index { return []index{ - i.NoteAudioPkey, + i.NoteAudioPkey, i.NoteAudioIDUnique, } } @@ -209,10 +244,14 @@ func (f noteAudioForeignKeys) AsSlice() []foreignKey { } } -type noteAudioUniques struct{} +type noteAudioUniques struct { + NoteAudioIDUnique constraint +} func (u noteAudioUniques) AsSlice() []constraint { - return []constraint{} + return []constraint{ + u.NoteAudioIDUnique, + } } type noteAudioChecks struct{} diff --git a/db/enums/enums.bob.go b/db/enums/enums.bob.go index bf23bbd6..57c81120 100644 --- a/db/enums/enums.bob.go +++ b/db/enums/enums.bob.go @@ -1292,6 +1292,91 @@ func (e *Imagedatatype) Scan(value any) error { return nil } +// Enum values for Jobtype +const ( + JobtypeAudioTranscode Jobtype = "audio-transcode" + JobtypeCSVCommit Jobtype = "csv-commit" + JobtypeCSVImport Jobtype = "csv-import" + JobtypeLabelStudioAudioCreate Jobtype = "label-studio-audio-create" + JobtypeEmailSend Jobtype = "email-send" + JobtypeTextSend Jobtype = "text-send" +) + +func AllJobtype() []Jobtype { + return []Jobtype{ + JobtypeAudioTranscode, + JobtypeCSVCommit, + JobtypeCSVImport, + JobtypeLabelStudioAudioCreate, + JobtypeEmailSend, + JobtypeTextSend, + } +} + +type Jobtype string + +func (e Jobtype) String() string { + return string(e) +} + +func (e Jobtype) Valid() bool { + switch e { + case JobtypeAudioTranscode, + JobtypeCSVCommit, + JobtypeCSVImport, + JobtypeLabelStudioAudioCreate, + JobtypeEmailSend, + JobtypeTextSend: + return true + default: + return false + } +} + +// useful when testing in other packages +func (e Jobtype) All() []Jobtype { + return AllJobtype() +} + +func (e Jobtype) MarshalText() ([]byte, error) { + return []byte(e), nil +} + +func (e *Jobtype) UnmarshalText(text []byte) error { + return e.Scan(text) +} + +func (e Jobtype) MarshalBinary() ([]byte, error) { + return []byte(e), nil +} + +func (e *Jobtype) UnmarshalBinary(data []byte) error { + return e.Scan(data) +} + +func (e Jobtype) Value() (driver.Value, error) { + return string(e), nil +} + +func (e *Jobtype) Scan(value any) error { + switch x := value.(type) { + case string: + *e = Jobtype(x) + case []byte: + *e = Jobtype(x) + case nil: + return fmt.Errorf("cannot nil into Jobtype") + default: + return fmt.Errorf("cannot scan type %T: %v", value, value) + } + + if !e.Valid() { + return fmt.Errorf("invalid Jobtype value: %s", *e) + } + + return nil +} + // Enum values for Leadtype const ( LeadtypeUnknown Leadtype = "unknown" diff --git a/db/migrations/00110_job.sql b/db/migrations/00110_job.sql new file mode 100644 index 00000000..f7fd415a --- /dev/null +++ b/db/migrations/00110_job.sql @@ -0,0 +1,39 @@ +-- +goose Up +CREATE TYPE JobType AS ENUM ( + 'audio-transcode', + 'csv-commit', + 'csv-import', + 'label-studio-audio-create', + 'email-send', + 'text-send' +); + +CREATE TABLE job ( + created TIMESTAMP WITHOUT TIME ZONE NOT NULL, + id SERIAL NOT NULL, + type_ JobType NOT NULL, + row_id INTEGER NOT NULL, + PRIMARY KEY(id) +); +COMMENT ON TABLE job IS 'A temporary holding place for jobs that are pushed to backend workers. Once work is completed the job should be deleted'; + +-- +goose StatementBegin +CREATE OR REPLACE FUNCTION notify_new_job() +RETURNS TRIGGER AS $$ +BEGIN + PERFORM pg_notify('new_job', NEW.id::text); + RETURN NEW; +END; +$$ LANGUAGE plpgsql; +-- +goose StatementEnd + +CREATE TRIGGER job_insert_trigger + AFTER INSERT ON job + FOR EACH ROW + EXECUTE FUNCTION notify_new_job(); + +-- +goose Down +DROP TRIGGER job_insert_trigger ON job; +DROP TABLE job; +DROP TYPE JobType; + diff --git a/db/migrations/00111_note_audio_id.sql b/db/migrations/00111_note_audio_id.sql new file mode 100644 index 00000000..913aabbc --- /dev/null +++ b/db/migrations/00111_note_audio_id.sql @@ -0,0 +1,7 @@ +-- +goose Up +ALTER TABLE note_audio +ADD COLUMN id INTEGER GENERATED ALWAYS AS IDENTITY; +ALTER TABLE note_audio +ADD CONSTRAINT note_audio_id_unique UNIQUE (id); +-- +goose Down +ALTER TABLE note_audio DROP COLUMN id; diff --git a/db/models/bob_where.bob.go b/db/models/bob_where.bob.go index 81f5881c..e79d5d22 100644 --- a/db/models/bob_where.bob.go +++ b/db/models/bob_where.bob.go @@ -78,6 +78,7 @@ func Where[Q psql.Filterable]() struct { GeometryColumns geometryColumnWhere[Q] GooseDBVersions gooseDBVersionWhere[Q] H3Aggregations h3AggregationWhere[Q] + Jobs jobWhere[Q] Leads leadWhere[Q] NoteAudios noteAudioWhere[Q] NoteAudioBreadcrumbs noteAudioBreadcrumbWhere[Q] @@ -176,6 +177,7 @@ func Where[Q psql.Filterable]() struct { GeometryColumns geometryColumnWhere[Q] GooseDBVersions gooseDBVersionWhere[Q] H3Aggregations h3AggregationWhere[Q] + Jobs jobWhere[Q] Leads leadWhere[Q] NoteAudios noteAudioWhere[Q] NoteAudioBreadcrumbs noteAudioBreadcrumbWhere[Q] @@ -273,6 +275,7 @@ func Where[Q psql.Filterable]() struct { GeometryColumns: buildGeometryColumnWhere[Q](GeometryColumns.Columns), GooseDBVersions: buildGooseDBVersionWhere[Q](GooseDBVersions.Columns), H3Aggregations: buildH3AggregationWhere[Q](H3Aggregations.Columns), + Jobs: buildJobWhere[Q](Jobs.Columns), Leads: buildLeadWhere[Q](Leads.Columns), NoteAudios: buildNoteAudioWhere[Q](NoteAudios.Columns), NoteAudioBreadcrumbs: buildNoteAudioBreadcrumbWhere[Q](NoteAudioBreadcrumbs.Columns), diff --git a/db/models/job.bob.go b/db/models/job.bob.go new file mode 100644 index 00000000..3f766847 --- /dev/null +++ b/db/models/job.bob.go @@ -0,0 +1,425 @@ +// Code generated by BobGen psql v0.42.5. DO NOT EDIT. +// This file is meant to be re-generated in place and/or deleted at any time. + +package models + +import ( + "context" + "io" + "time" + + "github.com/Gleipnir-Technology/bob" + "github.com/Gleipnir-Technology/bob/dialect/psql" + "github.com/Gleipnir-Technology/bob/dialect/psql/dialect" + "github.com/Gleipnir-Technology/bob/dialect/psql/dm" + "github.com/Gleipnir-Technology/bob/dialect/psql/sm" + "github.com/Gleipnir-Technology/bob/dialect/psql/um" + "github.com/Gleipnir-Technology/bob/expr" + enums "github.com/Gleipnir-Technology/nidus-sync/db/enums" + "github.com/aarondl/opt/omit" +) + +// Job is an object representing the database table. +type Job struct { + Created time.Time `db:"created" ` + ID int32 `db:"id,pk" ` + Type enums.Jobtype `db:"type_" ` + RowID int32 `db:"row_id" ` +} + +// JobSlice is an alias for a slice of pointers to Job. +// This should almost always be used instead of []*Job. +type JobSlice []*Job + +// Jobs contains methods to work with the job table +var Jobs = psql.NewTablex[*Job, JobSlice, *JobSetter]("", "job", buildJobColumns("job")) + +// JobsQuery is a query on the job table +type JobsQuery = *psql.ViewQuery[*Job, JobSlice] + +func buildJobColumns(alias string) jobColumns { + return jobColumns{ + ColumnsExpr: expr.NewColumnsExpr( + "created", "id", "type_", "row_id", + ).WithParent("job"), + tableAlias: alias, + Created: psql.Quote(alias, "created"), + ID: psql.Quote(alias, "id"), + Type: psql.Quote(alias, "type_"), + RowID: psql.Quote(alias, "row_id"), + } +} + +type jobColumns struct { + expr.ColumnsExpr + tableAlias string + Created psql.Expression + ID psql.Expression + Type psql.Expression + RowID psql.Expression +} + +func (c jobColumns) Alias() string { + return c.tableAlias +} + +func (jobColumns) AliasedAs(alias string) jobColumns { + return buildJobColumns(alias) +} + +// JobSetter is used for insert/upsert/update operations +// All values are optional, and do not have to be set +// Generated columns are not included +type JobSetter struct { + Created omit.Val[time.Time] `db:"created" ` + ID omit.Val[int32] `db:"id,pk" ` + Type omit.Val[enums.Jobtype] `db:"type_" ` + RowID omit.Val[int32] `db:"row_id" ` +} + +func (s JobSetter) SetColumns() []string { + vals := make([]string, 0, 4) + if s.Created.IsValue() { + vals = append(vals, "created") + } + if s.ID.IsValue() { + vals = append(vals, "id") + } + if s.Type.IsValue() { + vals = append(vals, "type_") + } + if s.RowID.IsValue() { + vals = append(vals, "row_id") + } + return vals +} + +func (s JobSetter) Overwrite(t *Job) { + if s.Created.IsValue() { + t.Created = s.Created.MustGet() + } + if s.ID.IsValue() { + t.ID = s.ID.MustGet() + } + if s.Type.IsValue() { + t.Type = s.Type.MustGet() + } + if s.RowID.IsValue() { + t.RowID = s.RowID.MustGet() + } +} + +func (s *JobSetter) Apply(q *dialect.InsertQuery) { + q.AppendHooks(func(ctx context.Context, exec bob.Executor) (context.Context, error) { + return Jobs.BeforeInsertHooks.RunHooks(ctx, exec, s) + }) + + q.AppendValues(bob.ExpressionFunc(func(ctx context.Context, w io.StringWriter, d bob.Dialect, start int) ([]any, error) { + vals := make([]bob.Expression, 4) + if s.Created.IsValue() { + vals[0] = psql.Arg(s.Created.MustGet()) + } else { + vals[0] = psql.Raw("DEFAULT") + } + + if s.ID.IsValue() { + vals[1] = psql.Arg(s.ID.MustGet()) + } else { + vals[1] = psql.Raw("DEFAULT") + } + + if s.Type.IsValue() { + vals[2] = psql.Arg(s.Type.MustGet()) + } else { + vals[2] = psql.Raw("DEFAULT") + } + + if s.RowID.IsValue() { + vals[3] = psql.Arg(s.RowID.MustGet()) + } else { + vals[3] = psql.Raw("DEFAULT") + } + + return bob.ExpressSlice(ctx, w, d, start, vals, "", ", ", "") + })) +} + +func (s JobSetter) UpdateMod() bob.Mod[*dialect.UpdateQuery] { + return um.Set(s.Expressions()...) +} + +func (s JobSetter) Expressions(prefix ...string) []bob.Expression { + exprs := make([]bob.Expression, 0, 4) + + if s.Created.IsValue() { + exprs = append(exprs, expr.Join{Sep: " = ", Exprs: []bob.Expression{ + psql.Quote(append(prefix, "created")...), + psql.Arg(s.Created), + }}) + } + + if s.ID.IsValue() { + exprs = append(exprs, expr.Join{Sep: " = ", Exprs: []bob.Expression{ + psql.Quote(append(prefix, "id")...), + psql.Arg(s.ID), + }}) + } + + if s.Type.IsValue() { + exprs = append(exprs, expr.Join{Sep: " = ", Exprs: []bob.Expression{ + psql.Quote(append(prefix, "type_")...), + psql.Arg(s.Type), + }}) + } + + if s.RowID.IsValue() { + exprs = append(exprs, expr.Join{Sep: " = ", Exprs: []bob.Expression{ + psql.Quote(append(prefix, "row_id")...), + psql.Arg(s.RowID), + }}) + } + + return exprs +} + +// FindJob retrieves a single record by primary key +// If cols is empty Find will return all columns. +func FindJob(ctx context.Context, exec bob.Executor, IDPK int32, cols ...string) (*Job, error) { + if len(cols) == 0 { + return Jobs.Query( + sm.Where(Jobs.Columns.ID.EQ(psql.Arg(IDPK))), + ).One(ctx, exec) + } + + return Jobs.Query( + sm.Where(Jobs.Columns.ID.EQ(psql.Arg(IDPK))), + sm.Columns(Jobs.Columns.Only(cols...)), + ).One(ctx, exec) +} + +// JobExists checks the presence of a single record by primary key +func JobExists(ctx context.Context, exec bob.Executor, IDPK int32) (bool, error) { + return Jobs.Query( + sm.Where(Jobs.Columns.ID.EQ(psql.Arg(IDPK))), + ).Exists(ctx, exec) +} + +// AfterQueryHook is called after Job is retrieved from the database +func (o *Job) AfterQueryHook(ctx context.Context, exec bob.Executor, queryType bob.QueryType) error { + var err error + + switch queryType { + case bob.QueryTypeSelect: + ctx, err = Jobs.AfterSelectHooks.RunHooks(ctx, exec, JobSlice{o}) + case bob.QueryTypeInsert: + ctx, err = Jobs.AfterInsertHooks.RunHooks(ctx, exec, JobSlice{o}) + case bob.QueryTypeUpdate: + ctx, err = Jobs.AfterUpdateHooks.RunHooks(ctx, exec, JobSlice{o}) + case bob.QueryTypeDelete: + ctx, err = Jobs.AfterDeleteHooks.RunHooks(ctx, exec, JobSlice{o}) + } + + return err +} + +// primaryKeyVals returns the primary key values of the Job +func (o *Job) primaryKeyVals() bob.Expression { + return psql.Arg(o.ID) +} + +func (o *Job) pkEQ() dialect.Expression { + return psql.Quote("job", "id").EQ(bob.ExpressionFunc(func(ctx context.Context, w io.StringWriter, d bob.Dialect, start int) ([]any, error) { + return o.primaryKeyVals().WriteSQL(ctx, w, d, start) + })) +} + +// Update uses an executor to update the Job +func (o *Job) Update(ctx context.Context, exec bob.Executor, s *JobSetter) error { + v, err := Jobs.Update(s.UpdateMod(), um.Where(o.pkEQ())).One(ctx, exec) + if err != nil { + return err + } + + *o = *v + + return nil +} + +// Delete deletes a single Job record with an executor +func (o *Job) Delete(ctx context.Context, exec bob.Executor) error { + _, err := Jobs.Delete(dm.Where(o.pkEQ())).Exec(ctx, exec) + return err +} + +// Reload refreshes the Job using the executor +func (o *Job) Reload(ctx context.Context, exec bob.Executor) error { + o2, err := Jobs.Query( + sm.Where(Jobs.Columns.ID.EQ(psql.Arg(o.ID))), + ).One(ctx, exec) + if err != nil { + return err + } + + *o = *o2 + + return nil +} + +// AfterQueryHook is called after JobSlice is retrieved from the database +func (o JobSlice) AfterQueryHook(ctx context.Context, exec bob.Executor, queryType bob.QueryType) error { + var err error + + switch queryType { + case bob.QueryTypeSelect: + ctx, err = Jobs.AfterSelectHooks.RunHooks(ctx, exec, o) + case bob.QueryTypeInsert: + ctx, err = Jobs.AfterInsertHooks.RunHooks(ctx, exec, o) + case bob.QueryTypeUpdate: + ctx, err = Jobs.AfterUpdateHooks.RunHooks(ctx, exec, o) + case bob.QueryTypeDelete: + ctx, err = Jobs.AfterDeleteHooks.RunHooks(ctx, exec, o) + } + + return err +} + +func (o JobSlice) pkIN() dialect.Expression { + if len(o) == 0 { + return psql.Raw("NULL") + } + + return psql.Quote("job", "id").In(bob.ExpressionFunc(func(ctx context.Context, w io.StringWriter, d bob.Dialect, start int) ([]any, error) { + pkPairs := make([]bob.Expression, len(o)) + for i, row := range o { + pkPairs[i] = row.primaryKeyVals() + } + return bob.ExpressSlice(ctx, w, d, start, pkPairs, "", ", ", "") + })) +} + +// copyMatchingRows finds models in the given slice that have the same primary key +// then it first copies the existing relationships from the old model to the new model +// and then replaces the old model in the slice with the new model +func (o JobSlice) copyMatchingRows(from ...*Job) { + for i, old := range o { + for _, new := range from { + if new.ID != old.ID { + continue + } + + o[i] = new + break + } + } +} + +// UpdateMod modifies an update query with "WHERE primary_key IN (o...)" +func (o JobSlice) UpdateMod() bob.Mod[*dialect.UpdateQuery] { + return bob.ModFunc[*dialect.UpdateQuery](func(q *dialect.UpdateQuery) { + q.AppendHooks(func(ctx context.Context, exec bob.Executor) (context.Context, error) { + return Jobs.BeforeUpdateHooks.RunHooks(ctx, exec, o) + }) + + q.AppendLoader(bob.LoaderFunc(func(ctx context.Context, exec bob.Executor, retrieved any) error { + var err error + switch retrieved := retrieved.(type) { + case *Job: + o.copyMatchingRows(retrieved) + case []*Job: + o.copyMatchingRows(retrieved...) + case JobSlice: + o.copyMatchingRows(retrieved...) + default: + // If the retrieved value is not a Job or a slice of Job + // then run the AfterUpdateHooks on the slice + _, err = Jobs.AfterUpdateHooks.RunHooks(ctx, exec, o) + } + + return err + })) + + q.AppendWhere(o.pkIN()) + }) +} + +// DeleteMod modifies an delete query with "WHERE primary_key IN (o...)" +func (o JobSlice) DeleteMod() bob.Mod[*dialect.DeleteQuery] { + return bob.ModFunc[*dialect.DeleteQuery](func(q *dialect.DeleteQuery) { + q.AppendHooks(func(ctx context.Context, exec bob.Executor) (context.Context, error) { + return Jobs.BeforeDeleteHooks.RunHooks(ctx, exec, o) + }) + + q.AppendLoader(bob.LoaderFunc(func(ctx context.Context, exec bob.Executor, retrieved any) error { + var err error + switch retrieved := retrieved.(type) { + case *Job: + o.copyMatchingRows(retrieved) + case []*Job: + o.copyMatchingRows(retrieved...) + case JobSlice: + o.copyMatchingRows(retrieved...) + default: + // If the retrieved value is not a Job or a slice of Job + // then run the AfterDeleteHooks on the slice + _, err = Jobs.AfterDeleteHooks.RunHooks(ctx, exec, o) + } + + return err + })) + + q.AppendWhere(o.pkIN()) + }) +} + +func (o JobSlice) UpdateAll(ctx context.Context, exec bob.Executor, vals JobSetter) error { + if len(o) == 0 { + return nil + } + + _, err := Jobs.Update(vals.UpdateMod(), o.UpdateMod()).All(ctx, exec) + return err +} + +func (o JobSlice) DeleteAll(ctx context.Context, exec bob.Executor) error { + if len(o) == 0 { + return nil + } + + _, err := Jobs.Delete(o.DeleteMod()).Exec(ctx, exec) + return err +} + +func (o JobSlice) ReloadAll(ctx context.Context, exec bob.Executor) error { + if len(o) == 0 { + return nil + } + + o2, err := Jobs.Query(sm.Where(o.pkIN())).All(ctx, exec) + if err != nil { + return err + } + + o.copyMatchingRows(o2...) + + return nil +} + +type jobWhere[Q psql.Filterable] struct { + Created psql.WhereMod[Q, time.Time] + ID psql.WhereMod[Q, int32] + Type psql.WhereMod[Q, enums.Jobtype] + RowID psql.WhereMod[Q, int32] +} + +func (jobWhere[Q]) AliasedAs(alias string) jobWhere[Q] { + return buildJobWhere[Q](buildJobColumns(alias)) +} + +func buildJobWhere[Q psql.Filterable](cols jobColumns) jobWhere[Q] { + return jobWhere[Q]{ + Created: psql.Where[Q, time.Time](cols.Created), + ID: psql.Where[Q, int32](cols.ID), + Type: psql.Where[Q, enums.Jobtype](cols.Type), + RowID: psql.Where[Q, int32](cols.RowID), + } +} diff --git a/db/models/note_audio.bob.go b/db/models/note_audio.bob.go index c502d998..47e6e3bc 100644 --- a/db/models/note_audio.bob.go +++ b/db/models/note_audio.bob.go @@ -36,6 +36,7 @@ type NoteAudio struct { TranscriptionUserEdited bool `db:"transcription_user_edited" ` Version int32 `db:"version,pk" ` UUID uuid.UUID `db:"uuid,pk" ` + ID int32 `db:"id,generated" ` R noteAudioR `db:"-" ` } @@ -62,7 +63,7 @@ type noteAudioR struct { func buildNoteAudioColumns(alias string) noteAudioColumns { return noteAudioColumns{ ColumnsExpr: expr.NewColumnsExpr( - "created", "creator_id", "deleted", "deletor_id", "duration", "organization_id", "transcription", "transcription_user_edited", "version", "uuid", + "created", "creator_id", "deleted", "deletor_id", "duration", "organization_id", "transcription", "transcription_user_edited", "version", "uuid", "id", ).WithParent("note_audio"), tableAlias: alias, Created: psql.Quote(alias, "created"), @@ -75,6 +76,7 @@ func buildNoteAudioColumns(alias string) noteAudioColumns { TranscriptionUserEdited: psql.Quote(alias, "transcription_user_edited"), Version: psql.Quote(alias, "version"), UUID: psql.Quote(alias, "uuid"), + ID: psql.Quote(alias, "id"), } } @@ -91,6 +93,7 @@ type noteAudioColumns struct { TranscriptionUserEdited psql.Expression Version psql.Expression UUID psql.Expression + ID psql.Expression } func (c noteAudioColumns) Alias() string { @@ -992,6 +995,7 @@ type noteAudioWhere[Q psql.Filterable] struct { TranscriptionUserEdited psql.WhereMod[Q, bool] Version psql.WhereMod[Q, int32] UUID psql.WhereMod[Q, uuid.UUID] + ID psql.WhereMod[Q, int32] } func (noteAudioWhere[Q]) AliasedAs(alias string) noteAudioWhere[Q] { @@ -1010,6 +1014,7 @@ func buildNoteAudioWhere[Q psql.Filterable](cols noteAudioColumns) noteAudioWher TranscriptionUserEdited: psql.Where[Q, bool](cols.TranscriptionUserEdited), Version: psql.Where[Q, int32](cols.Version), UUID: psql.Where[Q, uuid.UUID](cols.UUID), + ID: psql.Where[Q, int32](cols.ID), } } diff --git a/main.go b/main.go index d509c991..71318c51 100644 --- a/main.go +++ b/main.go @@ -18,10 +18,6 @@ import ( "github.com/Gleipnir-Technology/nidus-sync/html" "github.com/Gleipnir-Technology/nidus-sync/llm" "github.com/Gleipnir-Technology/nidus-sync/platform" - "github.com/Gleipnir-Technology/nidus-sync/platform/email" - "github.com/Gleipnir-Technology/nidus-sync/platform/file" - "github.com/Gleipnir-Technology/nidus-sync/platform/geocode" - "github.com/Gleipnir-Technology/nidus-sync/platform/text" "github.com/Gleipnir-Technology/nidus-sync/rmo" nidussync "github.com/Gleipnir-Technology/nidus-sync/sync" "github.com/getsentry/sentry-go" @@ -96,25 +92,15 @@ func main() { log.Error().Err(err).Msg("Failed to load html templates") os.Exit(4) } - err = email.LoadTemplates() + // Start up background processes + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + err = platform.StartAll(ctx) if err != nil { - log.Error().Err(err).Msg("Failed to load email templates") + log.Error().Err(err).Msg("Failed at platform.StartAll") os.Exit(5) } - - err = text.StoreSources() - if err != nil { - log.Error().Err(err).Msg("Failed to store text source phone numbers") - os.Exit(6) - } - - err = file.CreateDirectories() - if err != nil { - log.Error().Err(err).Msg("Failed to create file directories") - os.Exit(7) - } - - geocode.InitializeStadia(config.StadiaMapsAPIKey) router_logger := log.With().Logger() sentryMiddleware := sentryhttp.New(sentryhttp.Options{ Repanic: true, @@ -141,11 +127,6 @@ func main() { log.Debug().Str("report url", config.DomainRMO).Str("sync url", config.DomainNidus).Msg("Serving at URLs") - // Start up background processes - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - platform.BackgroundStart(ctx) - openai_logger := log.With().Logger() err = llm.CreateOpenAIClient(ctx, &openai_logger) if err != nil { @@ -182,7 +163,7 @@ func main() { cancel() close(chan_envelope) - platform.BackgroundWaitForExit() + platform.WaitForExit() log.Info().Msg("Shutdown complete") } diff --git a/platform/background/arcgis.go b/platform/arcgis.go similarity index 91% rename from platform/background/arcgis.go rename to platform/arcgis.go index 9bf47e13..1ee0d0c0 100644 --- a/platform/background/arcgis.go +++ b/platform/arcgis.go @@ -1,4 +1,4 @@ -package background +package platform import ( "bytes" @@ -24,15 +24,18 @@ import ( "github.com/Gleipnir-Technology/arcgis-go/response" "github.com/Gleipnir-Technology/bob" "github.com/Gleipnir-Technology/bob/dialect/psql" + "github.com/Gleipnir-Technology/bob/dialect/psql/dialect" "github.com/Gleipnir-Technology/bob/dialect/psql/dm" "github.com/Gleipnir-Technology/bob/dialect/psql/im" "github.com/Gleipnir-Technology/bob/dialect/psql/sm" "github.com/Gleipnir-Technology/bob/dialect/psql/um" "github.com/Gleipnir-Technology/nidus-sync/config" "github.com/Gleipnir-Technology/nidus-sync/db" + "github.com/Gleipnir-Technology/nidus-sync/db/enums" "github.com/Gleipnir-Technology/nidus-sync/db/models" "github.com/Gleipnir-Technology/nidus-sync/db/sql" "github.com/Gleipnir-Technology/nidus-sync/debug" + "github.com/Gleipnir-Technology/nidus-sync/h3utils" "github.com/Gleipnir-Technology/nidus-sync/platform/oauth" "github.com/aarondl/opt/omit" "github.com/aarondl/opt/omitnull" @@ -40,11 +43,11 @@ import ( "github.com/jackc/pgx/v5" "github.com/rs/zerolog" "github.com/rs/zerolog/log" + "github.com/uber/h3-go/v4" ) var syncStatusByOrg map[int32]bool -var newOAuthTokenChannel chan struct{} var CodeVerifier string = "random_secure_string_min_43_chars_long_should_be_stored_in_session" func HasFieldseekerConnection(ctx context.Context, user_id int32) (bool, error) { @@ -216,7 +219,7 @@ func generateCodeVerifier() string { } // Find out what we can about this user -func UpdateArcgisUserData(ctx context.Context, user *models.User, oauth *models.ArcgisOauthToken) { +func updateArcgisUserData(ctx context.Context, user *models.User, oauth *models.ArcgisOauthToken) { client, err := arcgis.NewArcGISAuth( ctx, &arcgis.AuthenticatorOAuth{ @@ -333,7 +336,7 @@ func UpdateArcgisUserData(ctx context.Context, user *models.User, oauth *models. newOAuthTokenChannel <- struct{}{} } -func NewFieldSeeker(ctx context.Context, oa *models.ArcgisOauthToken) (*fieldseeker.FieldSeeker, error) { +func newFieldSeeker(ctx context.Context, oa *models.ArcgisOauthToken) (*fieldseeker.FieldSeeker, error) { row, err := sql.OrgByOauthId(oa.ID).One(ctx, db.PGInstance.BobDB) if err != nil { return nil, fmt.Errorf("Failed to get org ID from oauth %d: %w", oa.ID, err) @@ -562,7 +565,7 @@ func periodicallyExportFieldseeker(ctx context.Context, org *models.Organization log.Debug().Int32("org.id", org.ID).Msg("No oauth for org") continue } - fssync, err := NewFieldSeeker(ctx, oa) + fssync, err := newFieldSeeker(ctx, oa) if err != nil { if errors.Is(err, &oauth.InvalidatedTokenError{}) { log.Info().Int32("org", org.ID).Msg("oauth token for org is invalid, waiting for refresh") @@ -1642,3 +1645,149 @@ func ensureArcgisAccount(ctx context.Context, txn bob.Tx, portal *response.Porta } return account, nil } +func updateSummaryTables(ctx context.Context, org *models.Organization) { + updateSummaryMosquitoSource(ctx, org) + updateSummaryServiceRequest(ctx, org) + updateSummaryTrap(ctx, org) +} + +func aggregateAtResolution(ctx context.Context, resolution int, org_id int32, type_ enums.H3aggregationtype, cells []h3.Cell) error { + var err error + log.Debug().Int("resolution", resolution).Str("type", string(type_)).Msg("Working summary layer") + cellToCount := make(map[h3.Cell]int, 0) + for _, cell := range cells { + scaled, err := cell.Parent(resolution) + if err != nil { + log.Error().Err(err).Int("resolution", resolution).Msg("Failed to get cell's parent at resolution") + continue + } + cellToCount[scaled] = cellToCount[scaled] + 1 + } + + _, err = models.H3Aggregations.Delete( + dm.Where( + psql.And( + models.H3Aggregations.Columns.OrganizationID.EQ(psql.Arg(org_id)), + models.H3Aggregations.Columns.Resolution.EQ(psql.Arg(resolution)), + models.H3Aggregations.Columns.Type.EQ(psql.Arg(type_)), + ), + ), + ).Exec(ctx, db.PGInstance.BobDB) + if err != nil { + return fmt.Errorf("Failed to clear previous aggregation: %w", err) + } + var to_insert []bob.Mod[*dialect.InsertQuery] = make([]bob.Mod[*dialect.InsertQuery], 0) + to_insert = append(to_insert, im.Into("h3_aggregation", "cell", "resolution", "count_", "type_", "organization_id", "geometry")) + for cell, count := range cellToCount { + polygon, err := h3utils.CellToPostgisGeometry(cell) + if err != nil { + log.Error().Err(err).Msg("Failed to get PostGIS geometry") + continue + } + // log.Info().Str("polygon", polygon).Msg("Going to insert") + to_insert = append(to_insert, im.Values(psql.Arg(cell.String(), resolution, count, type_, org_id), psql.F("st_geomfromtext", psql.S(polygon), 4326))) + } + to_insert = append(to_insert, im.OnConflict("cell, organization_id, type_").DoUpdate( + im.SetCol("count_").To(psql.Raw("EXCLUDED.count_")), + )) + //log.Info().Str("sql", insertQueryToString(psql.Insert(to_insert...))).Msg("Updating...") + _, err = psql.Insert(to_insert...).Exec(ctx, db.PGInstance.BobDB) + if err != nil { + return fmt.Errorf("Failed to add h3 aggregation: %w", err) + } + return nil +} + +func updateSummaryMosquitoSource(ctx context.Context, org *models.Organization) { + point_locations, err := org.Pointlocations().All(ctx, db.PGInstance.BobDB) + if err != nil { + log.Error().Err(err).Msg("Failed to get all point locations") + return + } + if len(point_locations) == 0 { + log.Info().Int("org_id", int(org.ID)).Msg("No updates to perform") + return + } + + cells := make([]h3.Cell, 0) + for _, p := range point_locations { + if p.H3cell.IsNull() { + continue + } + cell, err := h3utils.ToCell(p.H3cell.MustGet()) + if err != nil { + log.Error().Err(err).Msg("Failed to get geometry point") + continue + } + cells = append(cells, cell) + } + + for i := range 16 { + err = aggregateAtResolution(ctx, i, org.ID, enums.H3aggregationtypeMosquitosource, cells) + if err != nil { + log.Error().Err(err).Int("resolution", i).Msg("Failed to aggregate mosquito source") + } + } +} + +func updateSummaryServiceRequest(ctx context.Context, org *models.Organization) { + service_requests, err := org.Servicerequests().All(ctx, db.PGInstance.BobDB) + if err != nil { + log.Error().Err(err).Msg("Failed to get all service requests") + return + } + if len(service_requests) == 0 { + log.Info().Int("org_id", int(org.ID)).Msg("No updates to perform") + return + } + + cells := make([]h3.Cell, 0) + for _, p := range service_requests { + if p.H3cell.IsNull() { + continue + } + cell, err := h3utils.ToCell(p.H3cell.MustGet()) + if err != nil { + log.Error().Err(err).Msg("Failed to get geometry point") + continue + } + cells = append(cells, cell) + } + for i := range 16 { + err = aggregateAtResolution(ctx, i, org.ID, enums.H3aggregationtypeServicerequest, cells) + if err != nil { + log.Error().Err(err).Int("resolution", i).Msg("Failed to aggregate service request") + } + } +} + +func updateSummaryTrap(ctx context.Context, org *models.Organization) { + traps, err := org.Traplocations().All(ctx, db.PGInstance.BobDB) + if err != nil { + log.Error().Err(err).Msg("Failed to get all trap locations") + return + } + if len(traps) == 0 { + log.Info().Int("org_id", int(org.ID)).Msg("No updates to perform") + return + } + + cells := make([]h3.Cell, 0) + for _, t := range traps { + if t.H3cell.IsNull() { + continue + } + cell, err := h3utils.ToCell(t.H3cell.MustGet()) + if err != nil { + log.Error().Err(err).Msg("Failed to get geometry point") + continue + } + cells = append(cells, cell) + } + for i := range 16 { + err = aggregateAtResolution(ctx, i, org.ID, enums.H3aggregationtypeTrap, cells) + if err != nil { + log.Error().Err(err).Int("resolution", i).Msg("Failed to aggregate trap") + } + } +} diff --git a/platform/audio.go b/platform/audio.go index 0d3b65ce..ad80b3b3 100644 --- a/platform/audio.go +++ b/platform/audio.go @@ -1 +1,38 @@ package platform + +import ( + "context" + "fmt" + + "github.com/Gleipnir-Technology/bob" + "github.com/Gleipnir-Technology/nidus-sync/db" + "github.com/Gleipnir-Technology/nidus-sync/db/models" + "github.com/Gleipnir-Technology/nidus-sync/platform/background" + "github.com/Gleipnir-Technology/nidus-sync/platform/subprocess" + //"github.com/google/uuid" + //"github.com/rs/zerolog/log" +) + +func processAudioFile(ctx context.Context, txn bob.Executor, audio_id int32) error { + a, err := models.NoteAudios.Query( + models.SelectWhere.NoteAudios.ID.EQ(audio_id), + ).One(ctx, db.PGInstance.BobDB) + + if err != nil { + return fmt.Errorf("note audio query: %w", err) + } + // Normalize audio + err = subprocess.NormalizeAudio(a.UUID) + if err != nil { + return fmt.Errorf("failed to normalize audio %s: %v", a.UUID, err) + } + + // Transcode to OGG + err = subprocess.TranscodeToOgg(a.UUID) + if err != nil { + return fmt.Errorf("failed to transcode audio %s to OGG: %v", a.UUID, err) + } + + background.NewLabelStudioAudioCreate(ctx, db.PGInstance.BobDB, audio_id) + return nil +} diff --git a/platform/background.go b/platform/background.go deleted file mode 100644 index 709a6ac7..00000000 --- a/platform/background.go +++ /dev/null @@ -1,14 +0,0 @@ -package platform - -import ( - "context" - - "github.com/Gleipnir-Technology/nidus-sync/platform/background" -) - -func BackgroundStart(ctx context.Context) { - background.Start(ctx) -} -func BackgroundWaitForExit() { - background.WaitForExit() -} diff --git a/platform/background/audio.go b/platform/background/audio.go deleted file mode 100644 index 41348c80..00000000 --- a/platform/background/audio.go +++ /dev/null @@ -1,71 +0,0 @@ -package background - -import ( - "context" - "fmt" - - "github.com/Gleipnir-Technology/nidus-sync/platform/subprocess" - "github.com/google/uuid" - "github.com/rs/zerolog/log" -) - -// AudioJob represents a job to process an audio file. -type jobAudio struct { - AudioUUID uuid.UUID -} - -var channelJobAudio chan jobAudio - -func AudioTranscode(audio_uuid uuid.UUID) { - enqueueAudioJob(jobAudio{ - AudioUUID: audio_uuid, - }) -} - -// startAudioWorker initializes the audio job channel and starts the worker goroutine. -func startWorkerAudio(ctx context.Context, audioJobChannel chan jobAudio) { - go func() { - for { - select { - case <-ctx.Done(): - log.Info().Msg("Audio worker shutting down.") - return - case job := <-audioJobChannel: - log.Info().Str("uuid", job.AudioUUID.String()).Msg("Processing audio job") - err := processAudioFile(job.AudioUUID) - if err != nil { - log.Error().Err(err).Str("uuid", job.AudioUUID.String()).Msg("Error processing audio file") - } - } - } - }() -} - -// EnqueueAudioJob sends an audio processing job to the worker. -func enqueueAudioJob(job jobAudio) { - select { - case channelJobAudio <- job: - log.Info().Str("uuid", job.AudioUUID.String()).Msg("Enqueued audio job") - default: - log.Warn().Str("uuid", job.AudioUUID.String()).Msg("Audio job channel is full, dropping job") - } -} - -func processAudioFile(audioUUID uuid.UUID) error { - // Normalize audio - err := subprocess.NormalizeAudio(audioUUID) - if err != nil { - return fmt.Errorf("failed to normalize audio %s: %v", audioUUID, err) - } - - // Transcode to OGG - err = subprocess.TranscodeToOgg(audioUUID) - if err != nil { - return fmt.Errorf("failed to transcode audio %s to OGG: %v", audioUUID, err) - } - - enqueueLabelStudioJob(jobLabelStudio{ - UUID: audioUUID, - }) - return nil -} diff --git a/platform/background/background.go b/platform/background/background.go index bb3bc8ce..90a69d7c 100644 --- a/platform/background/background.go +++ b/platform/background/background.go @@ -3,82 +3,40 @@ package background import ( "context" "fmt" - "sync" - //commsemail "github.com/Gleipnir-Technology/nidus-sync/comms/email" - //"github.com/Gleipnir-Technology/nidus-sync/config" - "github.com/Gleipnir-Technology/nidus-sync/platform/email" - "github.com/Gleipnir-Technology/nidus-sync/platform/text" - "github.com/rs/zerolog/log" + "github.com/Gleipnir-Technology/bob" + "github.com/Gleipnir-Technology/nidus-sync/db/enums" + "github.com/Gleipnir-Technology/nidus-sync/db/models" + "github.com/aarondl/opt/omit" + //"github.com/rs/zerolog/log" ) -var waitGroup sync.WaitGroup - -func Start(ctx context.Context) { - newOAuthTokenChannel = make(chan struct{}, 10) - - channelJobAudio = make(chan jobAudio, 100) // Buffered channel to prevent blocking - channelJobCSV = make(chan jobCSV, 100) // Buffered channel to prevent blocking - channelJobEmail = make(chan email.Job, 100) // Buffered channel to prevent blocking - channelJobText = make(chan text.Job, 100) // Buffered channel to prevent blocking - - /* - waitGroup.Add(1) - go func() { - defer waitGroup.Done() - commsemail.StartWebsocket(ctx, config.ForwardEmailAPIToken) - }() - */ - - waitGroup.Add(1) - go func() { - defer waitGroup.Done() - refreshFieldseekerData(ctx, newOAuthTokenChannel) - }() - - waitGroup.Add(1) - go func() { - defer waitGroup.Done() - startWorkerAudio(ctx, channelJobAudio) - }() - - waitGroup.Add(1) - go func() { - defer waitGroup.Done() - startWorkerCSV(ctx, channelJobCSV) - }() - - waitGroup.Add(1) - go func() { - defer waitGroup.Done() - startWorkerEmail(ctx, channelJobEmail) - }() - - waitGroup.Add(1) - go func() { - defer waitGroup.Done() - startWorkerText(ctx, channelJobText) - }() - - err := addWaitingJobs(ctx) - if err != nil { - log.Error().Err(err).Msg("Failed to add waiting background jobs") - } +func NewAudioTranscode(ctx context.Context, txn bob.Executor, audio_id int32) error { + return newJob(ctx, txn, enums.JobtypeCSVCommit, audio_id) } - -func WaitForExit() { - - waitGroup.Wait() +func NewCSVCommit(ctx context.Context, txn bob.Executor, csv_id int32) error { + return newJob(ctx, txn, enums.JobtypeCSVCommit, csv_id) } - -func addWaitingJobs(ctx context.Context) error { - err := addWaitingJobsCommit(ctx) +func NewCSVImport(ctx context.Context, txn bob.Executor, csv_id int32) error { + return newJob(ctx, txn, enums.JobtypeCSVImport, csv_id) +} +func NewEmailSend(ctx context.Context, txn bob.Executor, email_id int32) error { + return newJob(ctx, txn, enums.JobtypeEmailSend, email_id) +} +func NewLabelStudioAudioCreate(ctx context.Context, txn bob.Executor, note_audio_id int32) error { + return newJob(ctx, txn, enums.JobtypeLabelStudioAudioCreate, note_audio_id) +} +func NewTextSend(ctx context.Context, txn bob.Executor, text_id int32) error { + return newJob(ctx, txn, enums.JobtypeTextSend, text_id) +} +func newJob(ctx context.Context, txn bob.Executor, t enums.Jobtype, id int32) error { + _, err := models.Jobs.Insert(&models.JobSetter{ + // ID + Type: omit.From(t), + RowID: omit.From(id), + }).One(ctx, txn) if err != nil { - return fmt.Errorf("commit: %w", err) - } - err = addWaitingJobsImport(ctx) - if err != nil { - return fmt.Errorf("commit: %w", err) + return fmt.Errorf("insert job: %w", err) } return nil } diff --git a/platform/background/email.go b/platform/background/email.go deleted file mode 100644 index e9ab160b..00000000 --- a/platform/background/email.go +++ /dev/null @@ -1,44 +0,0 @@ -package background - -import ( - "context" - - "github.com/Gleipnir-Technology/nidus-sync/platform/email" - "github.com/rs/zerolog/log" -) - -var channelJobEmail chan email.Job - -func ReportSubscriptionConfirmationEmail(destination, report_id string) { - enqueueJobEmail(email.NewJobReportNotificationConfirmation( - destination, - report_id, - )) -} - -func enqueueJobEmail(job email.Job) { - select { - case channelJobEmail <- job: - return - default: - log.Warn().Msg("email job channel is full, dropping job") - } -} - -func startWorkerEmail(ctx context.Context, channel chan email.Job) { - go func() { - log.Debug().Msg("Email worker started") - for { - select { - case <-ctx.Done(): - log.Info().Msg("Email worker shutting down.") - return - case job := <-channel: - err := email.Handle(ctx, job) - if err != nil { - log.Error().Err(err).Msg("Failed to handle email message") - } - } - } - }() -} diff --git a/platform/background/summary.go b/platform/background/summary.go deleted file mode 100644 index 874c56c8..00000000 --- a/platform/background/summary.go +++ /dev/null @@ -1,165 +0,0 @@ -package background - -import ( - "context" - "fmt" - - "github.com/Gleipnir-Technology/bob" - "github.com/Gleipnir-Technology/bob/dialect/psql" - "github.com/Gleipnir-Technology/bob/dialect/psql/dialect" - "github.com/Gleipnir-Technology/bob/dialect/psql/dm" - "github.com/Gleipnir-Technology/bob/dialect/psql/im" - "github.com/Gleipnir-Technology/nidus-sync/db" - "github.com/Gleipnir-Technology/nidus-sync/db/enums" - "github.com/Gleipnir-Technology/nidus-sync/db/models" - "github.com/Gleipnir-Technology/nidus-sync/h3utils" - "github.com/rs/zerolog/log" - "github.com/uber/h3-go/v4" -) - -func updateSummaryTables(ctx context.Context, org *models.Organization) { - updateSummaryMosquitoSource(ctx, org) - updateSummaryServiceRequest(ctx, org) - updateSummaryTrap(ctx, org) -} - -func aggregateAtResolution(ctx context.Context, resolution int, org_id int32, type_ enums.H3aggregationtype, cells []h3.Cell) error { - var err error - log.Info().Int("resolution", resolution).Str("type", string(type_)).Msg("Working summary layer") - cellToCount := make(map[h3.Cell]int, 0) - for _, cell := range cells { - scaled, err := cell.Parent(resolution) - if err != nil { - log.Error().Err(err).Int("resolution", resolution).Msg("Failed to get cell's parent at resolution") - continue - } - cellToCount[scaled] = cellToCount[scaled] + 1 - } - - _, err = models.H3Aggregations.Delete( - dm.Where( - psql.And( - models.H3Aggregations.Columns.OrganizationID.EQ(psql.Arg(org_id)), - models.H3Aggregations.Columns.Resolution.EQ(psql.Arg(resolution)), - models.H3Aggregations.Columns.Type.EQ(psql.Arg(type_)), - ), - ), - ).Exec(ctx, db.PGInstance.BobDB) - if err != nil { - return fmt.Errorf("Failed to clear previous aggregation: %w", err) - } - var to_insert []bob.Mod[*dialect.InsertQuery] = make([]bob.Mod[*dialect.InsertQuery], 0) - to_insert = append(to_insert, im.Into("h3_aggregation", "cell", "resolution", "count_", "type_", "organization_id", "geometry")) - for cell, count := range cellToCount { - polygon, err := h3utils.CellToPostgisGeometry(cell) - if err != nil { - log.Error().Err(err).Msg("Failed to get PostGIS geometry") - continue - } - // log.Info().Str("polygon", polygon).Msg("Going to insert") - to_insert = append(to_insert, im.Values(psql.Arg(cell.String(), resolution, count, type_, org_id), psql.F("st_geomfromtext", psql.S(polygon), 4326))) - } - to_insert = append(to_insert, im.OnConflict("cell, organization_id, type_").DoUpdate( - im.SetCol("count_").To(psql.Raw("EXCLUDED.count_")), - )) - //log.Info().Str("sql", insertQueryToString(psql.Insert(to_insert...))).Msg("Updating...") - _, err = psql.Insert(to_insert...).Exec(ctx, db.PGInstance.BobDB) - if err != nil { - return fmt.Errorf("Failed to add h3 aggregation: %w", err) - } - return nil -} - -func updateSummaryMosquitoSource(ctx context.Context, org *models.Organization) { - point_locations, err := org.Pointlocations().All(ctx, db.PGInstance.BobDB) - if err != nil { - log.Error().Err(err).Msg("Failed to get all point locations") - return - } - if len(point_locations) == 0 { - log.Info().Int("org_id", int(org.ID)).Msg("No updates to perform") - return - } - - cells := make([]h3.Cell, 0) - for _, p := range point_locations { - if p.H3cell.IsNull() { - continue - } - cell, err := h3utils.ToCell(p.H3cell.MustGet()) - if err != nil { - log.Error().Err(err).Msg("Failed to get geometry point") - continue - } - cells = append(cells, cell) - } - - for i := range 16 { - err = aggregateAtResolution(ctx, i, org.ID, enums.H3aggregationtypeMosquitosource, cells) - if err != nil { - log.Error().Err(err).Int("resolution", i).Msg("Failed to aggregate mosquito source") - } - } -} - -func updateSummaryServiceRequest(ctx context.Context, org *models.Organization) { - service_requests, err := org.Servicerequests().All(ctx, db.PGInstance.BobDB) - if err != nil { - log.Error().Err(err).Msg("Failed to get all service requests") - return - } - if len(service_requests) == 0 { - log.Info().Int("org_id", int(org.ID)).Msg("No updates to perform") - return - } - - cells := make([]h3.Cell, 0) - for _, p := range service_requests { - if p.H3cell.IsNull() { - continue - } - cell, err := h3utils.ToCell(p.H3cell.MustGet()) - if err != nil { - log.Error().Err(err).Msg("Failed to get geometry point") - continue - } - cells = append(cells, cell) - } - for i := range 16 { - err = aggregateAtResolution(ctx, i, org.ID, enums.H3aggregationtypeServicerequest, cells) - if err != nil { - log.Error().Err(err).Int("resolution", i).Msg("Failed to aggregate service request") - } - } -} - -func updateSummaryTrap(ctx context.Context, org *models.Organization) { - traps, err := org.Traplocations().All(ctx, db.PGInstance.BobDB) - if err != nil { - log.Error().Err(err).Msg("Failed to get all trap locations") - return - } - if len(traps) == 0 { - log.Info().Int("org_id", int(org.ID)).Msg("No updates to perform") - return - } - - cells := make([]h3.Cell, 0) - for _, t := range traps { - if t.H3cell.IsNull() { - continue - } - cell, err := h3utils.ToCell(t.H3cell.MustGet()) - if err != nil { - log.Error().Err(err).Msg("Failed to get geometry point") - continue - } - cells = append(cells, cell) - } - for i := range 16 { - err = aggregateAtResolution(ctx, i, org.ID, enums.H3aggregationtypeTrap, cells) - if err != nil { - log.Error().Err(err).Int("resolution", i).Msg("Failed to aggregate trap") - } - } -} diff --git a/platform/background/text.go b/platform/background/text.go deleted file mode 100644 index 14c72a66..00000000 --- a/platform/background/text.go +++ /dev/null @@ -1,46 +0,0 @@ -package background - -import ( - "context" - - "github.com/Gleipnir-Technology/nidus-sync/config" - "github.com/Gleipnir-Technology/nidus-sync/platform/text" - "github.com/rs/zerolog/log" -) - -var channelJobText chan text.Job - -func ReportUserText(destination text.E164, report_id string, message string) { - //enqueueJobText(text.N -} -func ReportSubscriptionConfirmationText(destination text.E164, report_id string) { - enqueueJobText(text.NewJobReportSubscriptionConfirmation( - destination, - report_id, - *text.NewE164(&config.PhoneNumberReport), - )) -} - -func enqueueJobText(job text.Job) { - select { - case channelJobText <- job: - log.Info().Msg("Enqueued text job") - default: - log.Warn().Msg("sms job channel is full, dropping job") - } -} - -func startWorkerText(ctx context.Context, channel chan text.Job) { - go func() { - log.Debug().Msg("Text worker started") - for { - select { - case <-ctx.Done(): - log.Info().Msg("Text worker shutting down.") - return - case job := <-channel: - text.Handle(ctx, job) - } - } - }() -} diff --git a/platform/background/upload.go b/platform/background/upload.go deleted file mode 100644 index e5410a7b..00000000 --- a/platform/background/upload.go +++ /dev/null @@ -1,123 +0,0 @@ -package background - -import ( - "context" - "fmt" - - "github.com/Gleipnir-Technology/bob" - "github.com/Gleipnir-Technology/bob/dialect/psql" - "github.com/Gleipnir-Technology/bob/dialect/psql/sm" - "github.com/Gleipnir-Technology/nidus-sync/db" - "github.com/Gleipnir-Technology/nidus-sync/db/enums" - "github.com/Gleipnir-Technology/nidus-sync/platform/csv" - //"github.com/Gleipnir-Technology/nidus-sync/userfile" - //"github.com/google/uuid" - "github.com/rs/zerolog/log" - "github.com/stephenafamo/scan" -) - -type jobCSVAction = int - -const ( - jobCSVActionCommit jobCSVAction = iota - jobCSVActionImport -) - -type jobCSV struct { - action jobCSVAction - csvType enums.FileuploadCsvtype - fileID int32 -} - -var channelJobCSV chan jobCSV - -func CommitUpload(file_id int32) { - enqueueJobCSV(jobCSV{ - action: jobCSVActionCommit, - fileID: file_id, - }) -} -func ProcessUpload(file_id int32, t enums.FileuploadCsvtype) { - enqueueJobCSV(jobCSV{ - action: jobCSVActionImport, - csvType: t, - fileID: file_id, - }) -} - -func addWaitingJobsCommit(ctx context.Context) error { - return addWaitingJobsForType(ctx, enums.FileuploadFilestatustypeCommitting, jobCSVActionCommit) -} -func addWaitingJobsImport(ctx context.Context) error { - return addWaitingJobsForType(ctx, enums.FileuploadFilestatustypeUploaded, jobCSVActionImport) -} -func addWaitingJobsForType(ctx context.Context, status enums.FileuploadFilestatustype, action jobCSVAction) error { - type Row_ struct { - ID int32 `db:"id"` - Type enums.FileuploadCsvtype `db:"type"` - } - rows, err := bob.All(ctx, db.PGInstance.BobDB, psql.Select( - sm.Columns( - "file.id AS id", - "csv.type_ AS type", - ), - sm.From("fileupload.file").As("file"), - sm.InnerJoin("fileupload.csv").As("csv").OnEQ(psql.Raw("file.id"), psql.Raw("csv.file_id")), - sm.Where( - psql.Raw("file.status").EQ(psql.Arg(status)), - ), - ), scan.StructMapper[Row_]()) - - if err != nil { - return fmt.Errorf("Failed to query file uploads: %w", err) - } - for _, row := range rows { - report_id := row.ID - enqueueJobCSV(jobCSV{ - action: action, - fileID: report_id, - csvType: row.Type, - }) - } - return nil -} -func enqueueJobCSV(job jobCSV) { - select { - case channelJobCSV <- job: - log.Info().Int32("file_id", job.fileID).Msg("Enqueued csv job") - default: - log.Warn().Int32("file_id", job.fileID).Msg("csv channel is full, dropping job") - } -} -func startWorkerCSV(ctx context.Context, channelJobImport chan jobCSV) { - go func() { - for { - select { - case <-ctx.Done(): - log.Info().Msg("CSV worker shutting down.") - return - case job := <-channelJobImport: - switch job.action { - case jobCSVActionCommit: - log.Info().Int32("id", job.fileID).Msg("Processing CSV commit job") - err := csv.JobCommit(ctx, job.fileID) - if err != nil { - log.Error().Err(err).Int32("id", job.fileID).Msg("Error processing CSV file") - continue - } - case jobCSVActionImport: - log.Info().Int32("id", job.fileID).Msg("Processing CSV import job") - err := csv.JobImport(ctx, job.fileID, job.csvType) - if err != nil { - log.Error().Err(err).Int32("id", job.fileID).Msg("Error processing CSV file") - continue - } - default: - log.Error().Msg("Unrecognized job action") - return - } - log.Info().Int32("id", job.fileID).Msg("Done processing CSV job") - } - } - }() -} diff --git a/platform/csv/csv.go b/platform/csv/csv.go index 870fc9e4..4583fc03 100644 --- a/platform/csv/csv.go +++ b/platform/csv/csv.go @@ -32,12 +32,7 @@ import ( type csvParserFunc[T any] = func(context.Context, bob.Tx, *models.FileuploadFile, *models.FileuploadCSV) ([]T, error) type csvProcessorFunc[T any] = func(context.Context, bob.Tx, *models.FileuploadFile, *models.FileuploadCSV, []T) error -func JobCommit(ctx context.Context, file_id int32) error { - txn, err := db.PGInstance.BobDB.BeginTx(ctx, nil) - if err != nil { - return fmt.Errorf("Failed to start transaction: %w", err) - } - +func JobCommit(ctx context.Context, txn bob.Executor, file_id int32) error { file, err := models.FindFileuploadFile(ctx, txn, file_id) if err != nil { return fmt.Errorf("Failed to get csv file %d from DB: %w", file_id, err) @@ -165,12 +160,17 @@ func JobCommit(ctx context.Context, file_id int32) error { }).One(ctx, txn) */ } - txn.Commit(ctx) return nil } -func JobImport(ctx context.Context, file_id int32, type_ enums.FileuploadCsvtype) error { - var err error - switch type_ { +func JobImport(ctx context.Context, txn bob.Executor, file_id int32) error { + csv, err := models.FileuploadCSVS.Query( + models.SelectWhere.FileuploadCSVS.FileID.EQ(file_id), + ).One(ctx, txn) + if err != nil { + return fmt.Errorf("find csv: %w", err) + } + + switch csv.Type { case enums.FileuploadCsvtypePoollist: err = importCSV(ctx, file_id, parseCSVPoollist, processCSVPoollist) case enums.FileuploadCsvtypeFlyover: diff --git a/platform/csv/pool.go b/platform/csv/pool.go index fd4f3d5f..49d0148b 100644 --- a/platform/csv/pool.go +++ b/platform/csv/pool.go @@ -257,7 +257,7 @@ func parseCSVPoollist(ctx context.Context, txn bob.Tx, f *models.FileuploadFile, continue } text.EnsureInDB(ctx, txn, *phone) - setter.PropertyOwnerPhoneE164 = omitnull.From(text.PhoneString(*phone)) + setter.PropertyOwnerPhoneE164 = omitnull.From(phone.PhoneString()) case headerPoolResidentOwned: boolValue, err := parseBool(col) if err != nil { @@ -272,7 +272,7 @@ func parseCSVPoollist(ctx context.Context, txn bob.Tx, f *models.FileuploadFile, continue } text.EnsureInDB(ctx, txn, *phone) - setter.ResidentPhoneE164 = omitnull.From(text.PhoneString(*phone)) + setter.ResidentPhoneE164 = omitnull.From(phone.PhoneString()) case headerPoolTag: tags[header_names[i]] = col } diff --git a/platform/email/email.go b/platform/email/email.go index cadc4c57..b3531f14 100644 --- a/platform/email/email.go +++ b/platform/email/email.go @@ -9,9 +9,13 @@ import ( "strings" "time" + "github.com/Gleipnir-Technology/bob" + "github.com/Gleipnir-Technology/nidus-sync/comms/email" + "github.com/Gleipnir-Technology/nidus-sync/config" "github.com/Gleipnir-Technology/nidus-sync/db" "github.com/Gleipnir-Technology/nidus-sync/db/enums" "github.com/Gleipnir-Technology/nidus-sync/db/models" + "github.com/Gleipnir-Technology/nidus-sync/platform/background" "github.com/aarondl/opt/omit" "github.com/aarondl/opt/omitnull" "github.com/rs/zerolog/log" @@ -40,7 +44,7 @@ func EnsureInDB(ctx context.Context, destination string) (err error) { return nil } -func insertEmailLog(ctx context.Context, data map[string]string, destination string, public_id string, source string, subject string, template_id int32) (err error) { +func insertEmailLog(ctx context.Context, data map[string]string, destination string, public_id string, source string, subject string, template_id int32) (email_id *int32, err error) { data_for_insert := db.ConvertToPGData(data) var type_ enums.CommsMessagetypeemail switch template_id { @@ -49,9 +53,9 @@ func insertEmailLog(ctx context.Context, data map[string]string, destination str case templateInitialID: type_ = enums.CommsMessagetypeemailInitialContact default: - return fmt.Errorf("Unrecognized template ID %d", template_id) + return nil, fmt.Errorf("Unrecognized template ID %d", template_id) } - _, err = models.CommsEmailLogs.Insert(&models.CommsEmailLogSetter{ + e, err := models.CommsEmailLogs.Insert(&models.CommsEmailLogSetter{ //ID: Created: omit.From(time.Now()), DeliveryStatus: omit.From("initial"), @@ -64,10 +68,12 @@ func insertEmailLog(ctx context.Context, data map[string]string, destination str TemplateData: omit.From(data_for_insert), Type: omit.From(type_), }).One(ctx, db.PGInstance.BobDB) - - return err + if err != nil { + return nil, fmt.Errorf("insern email log: %w", err) + } + return &e.ID, nil } -func generatePublicId(t enums.CommsMessagetypeemail, m map[string]string) string { +func generatePublicId(template int32, m map[string]string) string { if m == nil || len(m) == 0 { // Return hash of empty string for empty maps emptyHash := sha256.Sum256([]byte("")) @@ -84,7 +90,7 @@ func generatePublicId(t enums.CommsMessagetypeemail, m map[string]string) string // Build a string with all key-value pairs var sb strings.Builder // Add type first - sb.WriteString(fmt.Sprintf("type:%s,", t)) + sb.WriteString(fmt.Sprintf("template:%d,", template)) for _, k := range keys { sb.WriteString(k) sb.WriteString(":") // Separator between key and value @@ -100,3 +106,36 @@ func generatePublicId(t enums.CommsMessagetypeemail, m map[string]string) string // Convert to hex string and return return hex.EncodeToString(hashBytes) } +func sendEmailBegin(ctx context.Context, source string, destination string, template int32, subject string, data map[string]string) error { + public_id := generatePublicId(template, data) + data["URLViewInBrowser"] = urlEmailInBrowser(public_id) + + e, err := insertEmailLog(ctx, data, destination, public_id, config.ForwardEmailRMOAddress, subject, template) + if err != nil { + return fmt.Errorf("Failed to store email log: %w", err) + } + return background.NewEmailSend(ctx, db.PGInstance.BobDB, *e) +} +func sendEmailComplete(ctx context.Context, txn bob.Executor, email_id int32) error { + email_log, err := models.FindCommsEmailLog(ctx, txn, email_id) + if err != nil { + return fmt.Errorf("find email: %w", err) + } + data := db.ConvertFromPGData(email_log.TemplateData) + text, html, err := renderEmailTemplates(email_log.TemplateID, data) + if err != nil { + return fmt.Errorf("Failed to render email report notification template: %w", err) + } + resp, err := email.Send(ctx, email.Request{ + From: config.ForwardEmailRMOAddress, + HTML: html, + Subject: email_log.Subject, + Text: text, + To: email_log.Destination, + }) + if err != nil { + return fmt.Errorf("Failed to send email %d: %w", email_log.ID, err) + } + log.Info().Str("response id", resp.ID).Int32("email id", email_log.ID).Msg("Sent email") + return nil +} diff --git a/platform/email/initial.go b/platform/email/initial.go index 1e480880..eaa594b7 100644 --- a/platform/email/initial.go +++ b/platform/email/initial.go @@ -4,12 +4,10 @@ import ( "context" "fmt" - "github.com/Gleipnir-Technology/nidus-sync/comms/email" "github.com/Gleipnir-Technology/nidus-sync/config" "github.com/Gleipnir-Technology/nidus-sync/db" - "github.com/Gleipnir-Technology/nidus-sync/db/enums" "github.com/Gleipnir-Technology/nidus-sync/db/models" - "github.com/rs/zerolog/log" + //"github.com/rs/zerolog/log" ) type contentEmailInitial struct { @@ -18,14 +16,6 @@ type contentEmailInitial struct { URLSubscribe string } -type jobInitial struct { - base jobEmailBase -} - -func (job jobInitial) Destination() string { - return job.base.destination -} - func maybeSendInitialEmail(ctx context.Context, destination string) error { err := EnsureInDB(ctx, destination) if err != nil { @@ -59,30 +49,10 @@ func sendEmailInitialContact(ctx context.Context, destination string) error { data["URLSubscribe"] = config.MakeURLReport("/email/confirm?email=%s", destination) data["URLUnsubscribe"] = urlUnsubscribe(destination) - public_id := generatePublicId(enums.CommsMessagetypeemailInitialContact, data) - data["URLBrowser"] = urlEmailInBrowser(public_id) - - text, html, err := renderEmailTemplates(templateInitialID, data) - if err != nil { - return fmt.Errorf("Failed to render email temlates: %w", err) - } - subject := "Welcome" - err = insertEmailLog(ctx, data, destination, public_id, source, subject, templateInitialID) + err := sendEmailBegin(ctx, source, destination, templateInitialID, subject, data) if err != nil { - return fmt.Errorf("Failed to store email log: %w", err) + return fmt.Errorf("Failed to send initial email to %s: %w", err) } - resp, err := email.Send(ctx, email.Request{ - From: source, - HTML: html, - Subject: subject, - Text: text, - To: destination, - }) - - if err != nil { - return fmt.Errorf("Failed to send email to %s: %w", err) - } - log.Info().Str("id", resp.ID).Str("to", destination).Msg("Sent initial contact email") return nil } diff --git a/platform/email/job.go b/platform/email/job.go index 73f9f313..bfb93151 100644 --- a/platform/email/job.go +++ b/platform/email/job.go @@ -2,40 +2,11 @@ package email import ( "context" - "errors" - "fmt" - "github.com/Gleipnir-Technology/nidus-sync/db/enums" - "github.com/rs/zerolog/log" + "github.com/Gleipnir-Technology/bob" + //"github.com/rs/zerolog/log" ) -type Job interface { - destination() string - messageType() enums.CommsMessagetypeemail - renderHTML() (string, error) - renderTXT() (string, error) - subject() string -} - -type jobEmailBase struct { - destination string - source string -} - -func Handle(ctx context.Context, job Job) error { - var err error - log.Debug().Str("dest", job.destination()).Str("type", string(job.messageType())).Msg("Handling email job") - switch job.messageType() { - case enums.CommsMessagetypeemailReportSubscriptionConfirmation: - return errors.New("ReportSubscription has been deprecated.") - case enums.CommsMessagetypeemailReportNotificationConfirmation: - err = sendEmailReportConfirmation(ctx, job) - default: - return errors.New("not implemented") - } - if err != nil { - log.Error().Err(err).Str("dest", job.destination()).Str("type", string(job.messageType())).Msg("Error processing email") - return fmt.Errorf("Failed to handle email: %w", err) - } - return nil +func Job(ctx context.Context, txn bob.Executor, email_id int32) error { + return sendEmailComplete(ctx, txn, email_id) } diff --git a/platform/email/report_notification_confirmation.go b/platform/email/report_notification_confirmation.go index 2d8c82bc..bc0e8a5b 100644 --- a/platform/email/report_notification_confirmation.go +++ b/platform/email/report_notification_confirmation.go @@ -4,10 +4,8 @@ import ( "context" "fmt" - "github.com/Gleipnir-Technology/nidus-sync/comms/email" "github.com/Gleipnir-Technology/nidus-sync/config" - "github.com/Gleipnir-Technology/nidus-sync/db/enums" - "github.com/rs/zerolog/log" + //"github.com/rs/zerolog/log" ) type contentEmailReportConfirmation struct { @@ -15,80 +13,25 @@ type contentEmailReportConfirmation struct { URLReportStatus string } -func NewJobReportNotificationConfirmation(destination, report_id string) Job { - return jobEmailReportNotificationConfirmation{ - dest: destination, - reportID: report_id, - } -} - -type jobEmailReportNotificationConfirmation struct { - dest string - reportID string -} - -func (job jobEmailReportNotificationConfirmation) destination() string { - return job.dest -} -func (job jobEmailReportNotificationConfirmation) messageType() enums.CommsMessagetypeemail { - return enums.CommsMessagetypeemailReportNotificationConfirmation -} -func (job jobEmailReportNotificationConfirmation) renderHTML() (string, error) { - _ = newContentEmailNotificationConfirmation(job) - return "", nil -} -func (job jobEmailReportNotificationConfirmation) renderTXT() (string, error) { - return "fake txt", nil -} -func (job jobEmailReportNotificationConfirmation) subject() string { - return "" -} - -func sendEmailReportConfirmation(ctx context.Context, job Job) error { - j, ok := job.(jobEmailReportNotificationConfirmation) - if !ok { - return fmt.Errorf("job is not for report subscription confirmation") - } - err := maybeSendInitialEmail(ctx, j.destination()) +func SendReportConfirmation(ctx context.Context, report_id, destination string) error { + err := maybeSendInitialEmail(ctx, destination) if err != nil { return fmt.Errorf("Failed to handle initial email: %w", err) } data := make(map[string]string, 0) - data["report_id"] = j.reportID - report_id_str := publicReportID(j.reportID) + data["report_id"] = report_id + report_id_str := publicReportID(report_id) data["ReportIDStr"] = report_id_str data["URLLogo"] = config.MakeURLReport("/static/img/nidus-logo-no-lettering-64.png") - data["URLReportStatus"] = config.MakeURLReport("/status/%s", j.reportID) - data["URLReportUnsubscribe"] = config.MakeURLReport("/email/unsubscribe/report/%s", j.reportID) - data["URLUnsubscribe"] = urlUnsubscribe(j.destination()) + data["URLReportStatus"] = config.MakeURLReport("/status/%s", report_id) + data["URLReportUnsubscribe"] = config.MakeURLReport("/email/unsubscribe/report/%s", report_id) + data["URLUnsubscribe"] = urlUnsubscribe(destination) - public_id := generatePublicId(enums.CommsMessagetypeemailReportNotificationConfirmation, data) - data["URLViewInBrowser"] = urlEmailInBrowser(public_id) - - text, html, err := renderEmailTemplates(templateReportNotificationConfirmationID, data) - if err != nil { - return fmt.Errorf("Failed to render email report notification template: %w", err) - } subject := fmt.Sprintf("Mosquito Report Submission - %s", report_id_str) - err = insertEmailLog(ctx, data, j.destination(), public_id, config.ForwardEmailRMOAddress, subject, templateReportNotificationConfirmationID) - if err != nil { - return fmt.Errorf("Failed to store email log: %w", err) - } - resp, err := email.Send(ctx, email.Request{ - From: config.ForwardEmailRMOAddress, - HTML: html, - Subject: subject, - Text: text, - To: j.destination(), - }) - if err != nil { - return fmt.Errorf("Failed to send email report confirmation to %s for report %s: %w", j.dest, j.reportID, err) - } - log.Info().Str("id", resp.ID).Str("dest", j.dest).Str("report_id", j.reportID).Msg("Sent report confirmation email") - return nil + return sendEmailBegin(ctx, config.ForwardEmailRMOAddress, destination, templateReportNotificationConfirmationID, subject, data) } -func newContentEmailNotificationConfirmation(job jobEmailReportNotificationConfirmation) (result contentEmailReportConfirmation) { - result.URLReportStatus = config.MakeURLReport("/status/%s", job.reportID) +func newContentEmailNotificationConfirmation(report_id string) (result contentEmailReportConfirmation) { + result.URLReportStatus = config.MakeURLReport("/status/%s", report_id) return result } diff --git a/platform/email/template/initial-contact.html b/platform/email/template/initial-contact.html index 5b53946d..7ba972a1 100644 --- a/platform/email/template/initial-contact.html +++ b/platform/email/template/initial-contact.html @@ -67,7 +67,7 @@ {{ if not .IsBrowser }}
Email not displaying correctly? - View it in your browser + View it in your browser
{{ end }} diff --git a/platform/geocode/geocode.go b/platform/geocode/geocode.go index 33da4623..904b8630 100644 --- a/platform/geocode/geocode.go +++ b/platform/geocode/geocode.go @@ -32,7 +32,7 @@ func InitializeStadia(key string) { } // Ensure the provided address exists. If it doesn't add it to the database. -func EnsureAddress(ctx context.Context, txn bob.Tx, a types.Address, l types.Location) (*models.Address, error) { +func EnsureAddress(ctx context.Context, txn bob.Executor, a types.Address, l types.Location) (*models.Address, error) { address, err := models.Addresses.Query( models.SelectWhere.Addresses.Country.EQ(a.CountryEnum()), models.SelectWhere.Addresses.Locality.EQ(a.Locality), @@ -90,7 +90,7 @@ func EnsureAddress(ctx context.Context, txn bob.Tx, a types.Address, l types.Loc // Either get an address that matches, or create a new address. Either way, return an address // This will make a call to a structured geocode service, so it's slow. -func EnsureAddressWithGeocode(ctx context.Context, txn bob.Tx, org *models.Organization, a types.Address) (*models.Address, error) { +func EnsureAddressWithGeocode(ctx context.Context, txn bob.Executor, org *models.Organization, a types.Address) (*models.Address, error) { address, err := models.Addresses.Query( models.SelectWhere.Addresses.Country.EQ(a.CountryEnum()), models.SelectWhere.Addresses.Locality.EQ(a.Locality), @@ -237,7 +237,7 @@ func toGeocodeResult(resp stadia.GeocodeResponse, address_msg string) (*GeocodeR } // Get the parcel for a given address, if one can be found -func GetParcel(ctx context.Context, txn bob.Tx, a *models.Address) (*models.Parcel, error) { +func GetParcel(ctx context.Context, txn bob.Executor, a *models.Address) (*models.Parcel, error) { result, err := models.Parcels.Query( sm.InnerJoin("address").On(psql.F("ST_Contains", psql.Raw("parcel.geometry"), psql.Raw("address.location"))), models.SelectWhere.Addresses.ID.EQ(a.ID), diff --git a/platform/background/label_studio.go b/platform/label_studio.go similarity index 65% rename from platform/background/label_studio.go rename to platform/label_studio.go index 7fc8d757..43e841b9 100644 --- a/platform/background/label_studio.go +++ b/platform/label_studio.go @@ -1,4 +1,4 @@ -package background +package platform import ( "context" @@ -8,66 +8,38 @@ import ( "log" "os" + "github.com/Gleipnir-Technology/bob" "github.com/Gleipnir-Technology/nidus-sync/config" "github.com/Gleipnir-Technology/nidus-sync/db/models" "github.com/Gleipnir-Technology/nidus-sync/label-studio" "github.com/Gleipnir-Technology/nidus-sync/minio" - "github.com/google/uuid" + //"github.com/google/uuid" ) -type jobLabelStudio struct { - UUID uuid.UUID -} +var labelStudioClient *labelstudio.Client +var labelStudioProject *labelstudio.Project +var minioClient *minio.Client -var channelJobLabelStudio chan jobLabelStudio - -func enqueueLabelStudioJob(job jobLabelStudio) { - select { - case channelJobLabelStudio <- job: - log.Printf("Enqueued label job for UUID: %s", job.UUID) - default: - log.Printf("Label job channel is full, dropping job for UUID: %s", job.UUID) - } -} - -func StartLabelStudioWorker(ctx context.Context) error { +func initializeLabelStudio() error { // Initialize the minio client - minioBucket := os.Getenv("S3_BUCKET") + //minioBucket := os.Getenv("S3_BUCKET") - labelStudioClient, err := createLabelStudioClient() + var err error + labelStudioClient, err = createLabelStudioClient() if err != nil { - return fmt.Errorf("Failed to create label studio client: %v", err) + return fmt.Errorf("Failed to create label studio client: %w", err) } // Get the project we are going to upload to - project, err := findLabelStudioProject(labelStudioClient, "Nidus Speech-to-Text Transcriptions") + labelStudioProject, err = findLabelStudioProject(labelStudioClient, "Nidus Speech-to-Text Transcriptions") if err != nil { - return errors.New(fmt.Sprintf("Failed to find the label studio project")) + return fmt.Errorf("Failed to find the label studio project: %w", err) } - minioClient, err := createMinioClient() + minioClient, err = createMinioClient() if err != nil { - return fmt.Errorf("Failed to create minio client: %v", err) + return fmt.Errorf("Failed to create minio client: %w", err) } - buffer := 100 - channelJobLabelStudio = make(chan jobLabelStudio, buffer) // Buffered channel to prevent blocking - log.Printf("Started label studio worker with buffer depth %d", buffer) - go func() { - for { - select { - case <-ctx.Done(): - log.Println("Audio worker shutting down.") - return - case job := <-channelJobLabelStudio: - log.Printf("Processing label job for UUID: %s", job.UUID) - err := processLabelTask(ctx, minioClient, minioBucket, labelStudioClient, project, job) - if err != nil { - log.Printf("Error processing label job for audio file %s: %v", job.UUID, err) - } - } - } - }() return nil } - func createMinioClient() (*minio.Client, error) { baseUrl := os.Getenv("S3_BASE_URL") accessKeyID := os.Getenv("S3_ACCESS_KEY_ID") @@ -80,7 +52,6 @@ func createMinioClient() (*minio.Client, error) { log.Println("Created minio client") return client, err } - func createLabelStudioClient() (*labelstudio.Client, error) { // Initialize the client with your Label Studio base URL and API key labelStudioApiKey := os.Getenv("LABEL_STUDIO_API_KEY") @@ -100,33 +71,36 @@ func createLabelStudioClient() (*labelstudio.Client, error) { func noteAudioGetLatest(ctx context.Context, uuid string) (*models.NoteAudio, error) { return nil, nil } -func processLabelTask(ctx context.Context, minioClient *minio.Client, minioBucket string, labelStudioClient *labelstudio.Client, project *labelstudio.Project, job jobLabelStudio) error { - customer := os.Getenv("CUSTOMER") - if customer == "" { - return errors.New("You must specify a CUSTOMER env var") - } - note, err := noteAudioGetLatest(ctx, job.UUID.String()) - if err != nil { - return errors.New(fmt.Sprintf("Failed to get note %s", note.UUID)) - } +func jobLabelStudioAudioCreate(ctx context.Context, txn bob.Executor, row_id int32) error { + return fmt.Errorf("label studio integration has been disabled") + /* + customer := os.Getenv("CUSTOMER") + if customer == "" { + return errors.New("You must specify a CUSTOMER env var") + } + note, err := noteAudioGetLatest(ctx, job.UUID.String()) + if err != nil { + return errors.New(fmt.Sprintf("Failed to get note %s", note.UUID)) + } - if note.Version != 1 { - return errors.New(fmt.Sprintf("Got version %d of %s", note.Version, note.UUID)) - } - task, err := findMatchingTask(labelStudioClient, project, customer, note) - if err != nil { - return errors.New(fmt.Sprintf("Failed to search for a task: %v", err)) - } - // We already have a task, nothing to do. - if task != nil { + if note.Version != 1 { + return errors.New(fmt.Sprintf("Got version %d of %s", note.Version, note.UUID)) + } + task, err := findMatchingTask(labelStudioClient, project, customer, note) + if err != nil { + return errors.New(fmt.Sprintf("Failed to search for a task: %v", err)) + } + // We already have a task, nothing to do. + if task != nil { + return nil + } + + err = createTask(labelStudioClient, project, minioClient, minioBucket, customer, note) + if err != nil { + return errors.New(fmt.Sprintf("Failed to create a task: %v", err)) + } return nil - } - - err = createTask(labelStudioClient, project, minioClient, minioBucket, customer, note) - if err != nil { - return errors.New(fmt.Sprintf("Failed to create a task: %v", err)) - } - return nil + */ } func createTask(client *labelstudio.Client, project *labelstudio.Project, minioClient *minio.Client, bucket string, customer string, note *models.NoteAudio) error { diff --git a/platform/note.go b/platform/note.go index c4ca3c48..906d9eb8 100644 --- a/platform/note.go +++ b/platform/note.go @@ -2,6 +2,7 @@ package platform import ( "context" + "fmt" "github.com/Gleipnir-Technology/nidus-sync/db" "github.com/Gleipnir-Technology/nidus-sync/db/models" @@ -10,16 +11,14 @@ import ( ) func NoteAudioCreate(ctx context.Context, user User, setter models.NoteAudioSetter) error { - err := user.Organization.model.InsertNoteAudios(ctx, db.PGInstance.BobDB, &setter) - if err == nil { - return nil + _, err := models.Organizations.Insert(&setter).One(ctx, db.PGInstance.BobDB) + if err != nil { + // Just ignore this failure, it means we already have this content + if err.Error() != "insertOrganizationNoteAudios0: ERROR: duplicate key value violates unique constraint \"note_audio_pkey\" (SQLSTATE 23505)" { + return fmt.Errorf("create note_audio: %w", err) + } } - // Just ignore this failure, it means we already have this content - if err.Error() == "insertOrganizationNoteAudios0: ERROR: duplicate key value violates unique constraint \"note_audio_pkey\" (SQLSTATE 23505)" { - return nil - } - log.Warn().Err(err).Msg("Unrecognized error creating note audio") - return err + return nil } func NoteAudioNormalized(uuid string) error { diff --git a/platform/oauth.go b/platform/oauth.go index c8f88b71..0f1b1b8d 100644 --- a/platform/oauth.go +++ b/platform/oauth.go @@ -10,7 +10,6 @@ import ( "github.com/Gleipnir-Technology/nidus-sync/config" "github.com/Gleipnir-Technology/nidus-sync/db" "github.com/Gleipnir-Technology/nidus-sync/db/models" - "github.com/Gleipnir-Technology/nidus-sync/platform/background" "github.com/Gleipnir-Technology/nidus-sync/platform/oauth" "github.com/aarondl/opt/omit" "github.com/aarondl/opt/omitnull" @@ -72,6 +71,6 @@ func HandleOauthAccessCode(ctx context.Context, user User, code string) error { if err != nil { return fmt.Errorf("Failed to save token to database: %w", err) } - go background.UpdateArcgisUserData(context.Background(), user.model, oauth) + go updateArcgisUserData(context.Background(), user.model, oauth) return nil } diff --git a/platform/organization.go b/platform/organization.go index 47b510f1..27921868 100644 --- a/platform/organization.go +++ b/platform/organization.go @@ -7,7 +7,6 @@ import ( "github.com/Gleipnir-Technology/bob/dialect/psql/sm" "github.com/Gleipnir-Technology/nidus-sync/db" "github.com/Gleipnir-Technology/nidus-sync/db/models" - "github.com/Gleipnir-Technology/nidus-sync/platform/background" //"github.com/google/uuid" ) @@ -54,7 +53,7 @@ func (o Organization) ID() int32 { return o.model.ID } func (o Organization) IsSyncOngoing() bool { - return background.IsSyncOngoing(o.ID()) + return IsSyncOngoing(o.ID()) } func (o Organization) FieldseekerSyncLatest(ctx context.Context) (*models.FieldseekerSync, error) { sync, err := o.model.FieldseekerSyncs(sm.OrderBy("created").Desc()).One(ctx, db.PGInstance.BobDB) diff --git a/platform/report/notification.go b/platform/report/notification.go index bec7aaa6..c3f0a127 100644 --- a/platform/report/notification.go +++ b/platform/report/notification.go @@ -10,17 +10,14 @@ import ( "time" "github.com/Gleipnir-Technology/bob" - "github.com/aarondl/opt/omit" - "github.com/aarondl/opt/omitnull" - //"github.com/Gleipnir-Technology/bob/dialect/psql" - //"github.com/Gleipnir-Technology/bob/dialect/psql/sm" - //"github.com/Gleipnir-Technology/bob/dialect/psql/um" "github.com/Gleipnir-Technology/nidus-sync/db" "github.com/Gleipnir-Technology/nidus-sync/db/models" "github.com/Gleipnir-Technology/nidus-sync/db/sql" - "github.com/Gleipnir-Technology/nidus-sync/platform/background" "github.com/Gleipnir-Technology/nidus-sync/platform/email" "github.com/Gleipnir-Technology/nidus-sync/platform/text" + "github.com/Gleipnir-Technology/nidus-sync/platform/types" + "github.com/aarondl/opt/omit" + "github.com/aarondl/opt/omitnull" "github.com/rs/zerolog/log" //"github.com/stephenafamo/scan" ) @@ -66,7 +63,7 @@ func GenerateReportID() (string, error) { return builder.String(), nil } -func RegisterNotificationEmail(ctx context.Context, txn bob.Tx, report_id string, destination string) *ErrorWithCode { +func RegisterNotificationEmail(ctx context.Context, txn bob.Executor, report_id string, destination string) *ErrorWithCode { some_report, err := findSomeReport(ctx, report_id) if err != nil { return err @@ -79,11 +76,11 @@ func RegisterNotificationEmail(ctx context.Context, txn bob.Tx, report_id string if err != nil { return err } - background.ReportSubscriptionConfirmationEmail(destination, report_id) + email.SendReportConfirmation(ctx, destination, report_id) return nil } -func RegisterNotificationPhone(ctx context.Context, txn bob.Tx, report_id string, phone text.E164) *ErrorWithCode { +func RegisterNotificationPhone(ctx context.Context, txn bob.Executor, report_id string, phone types.E164) *ErrorWithCode { some_report, err := findSomeReport(ctx, report_id) if err != nil { return err @@ -96,11 +93,11 @@ func RegisterNotificationPhone(ctx context.Context, txn bob.Tx, report_id string if err != nil { return err } - background.ReportSubscriptionConfirmationText(phone, report_id) + text.ReportSubscriptionConfirmationText(ctx, phone, report_id) return nil } -func RegisterSubscriptionEmail(ctx context.Context, txn bob.Tx, destination string) *ErrorWithCode { +func RegisterSubscriptionEmail(ctx context.Context, txn bob.Executor, destination string) *ErrorWithCode { e := email.EnsureInDB(ctx, destination) if e != nil { return newInternalError(e, "Failed to ensure email is in DB") @@ -119,7 +116,7 @@ func RegisterSubscriptionEmail(ctx context.Context, txn bob.Tx, destination stri return nil } -func RegisterSubscriptionPhone(ctx context.Context, txn bob.Tx, phone text.E164) *ErrorWithCode { +func RegisterSubscriptionPhone(ctx context.Context, txn bob.Executor, phone types.E164) *ErrorWithCode { e := text.EnsureInDB(ctx, db.PGInstance.BobDB, phone) if e != nil { return newInternalError(e, "Failed to ensure phone is in DB") @@ -128,7 +125,7 @@ func RegisterSubscriptionPhone(ctx context.Context, txn bob.Tx, phone text.E164) Created: omit.From(time.Now()), Deleted: omitnull.FromPtr[time.Time](nil), //DistrictID: omitnull.FromPtr[int32](nil), - PhoneE164: omit.From(text.PhoneString(phone)), + PhoneE164: omit.From(phone.PhoneString()), } _, err := models.PublicreportSubscribePhones.Insert(&setter).Exec(ctx, txn) if err != nil { @@ -138,7 +135,7 @@ func RegisterSubscriptionPhone(ctx context.Context, txn bob.Tx, phone text.E164) return nil } -func SaveReporter(ctx context.Context, txn bob.Tx, report_id string, name string, email string, phone *text.E164, has_consent bool) *ErrorWithCode { +func SaveReporter(ctx context.Context, txn bob.Executor, report_id string, name string, email string, phone *types.E164, has_consent bool) *ErrorWithCode { some_report, err := findSomeReport(ctx, report_id) if err != nil { return err diff --git a/platform/report/report_nuisance.go b/platform/report/report_nuisance.go index 9547efbd..e26f589c 100644 --- a/platform/report/report_nuisance.go +++ b/platform/report/report_nuisance.go @@ -19,7 +19,7 @@ import ( "github.com/Gleipnir-Technology/nidus-sync/db" "github.com/Gleipnir-Technology/nidus-sync/db/models" //"github.com/Gleipnir-Technology/nidus-sync/db/sql" - "github.com/Gleipnir-Technology/nidus-sync/platform/text" + "github.com/Gleipnir-Technology/nidus-sync/platform/types" "github.com/rs/zerolog/log" "github.com/stephenafamo/scan" ) @@ -33,7 +33,7 @@ type Nuisance struct { func (sr Nuisance) PublicReportID() string { return sr.publicReportID } -func (sr Nuisance) addNotificationEmail(ctx context.Context, txn bob.Tx, email string) *ErrorWithCode { +func (sr Nuisance) addNotificationEmail(ctx context.Context, txn bob.Executor, email string) *ErrorWithCode { setter := models.PublicreportNotifyEmailNuisanceSetter{ Created: omit.From(time.Now()), Deleted: omitnull.FromPtr[time.Time](nil), @@ -46,13 +46,13 @@ func (sr Nuisance) addNotificationEmail(ctx context.Context, txn bob.Tx, email s } return nil } -func (sr Nuisance) addNotificationPhone(ctx context.Context, txn bob.Tx, phone text.E164) *ErrorWithCode { +func (sr Nuisance) addNotificationPhone(ctx context.Context, txn bob.Executor, phone types.E164) *ErrorWithCode { var err error setter := models.PublicreportNotifyPhoneNuisanceSetter{ Created: omit.From(time.Now()), Deleted: omitnull.FromPtr[time.Time](nil), NuisanceID: omit.From(sr.id), - PhoneE164: omit.From(text.PhoneString(phone)), + PhoneE164: omit.From(phone.PhoneString()), } _, err = models.PublicreportNotifyPhoneNuisances.Insert(&setter).Exec(ctx, txn) if err != nil { @@ -78,12 +78,12 @@ func (sr Nuisance) districtID(ctx context.Context) *int32 { func (sr Nuisance) reportID() int32 { return sr.id } -func (sr Nuisance) updateReporterConsent(ctx context.Context, txn bob.Tx, has_consent bool) *ErrorWithCode { +func (sr Nuisance) updateReporterConsent(ctx context.Context, txn bob.Executor, has_consent bool) *ErrorWithCode { return sr.updateReportCol(ctx, txn, &models.PublicreportNuisanceSetter{ ReporterContactConsent: omitnull.From(has_consent), }) } -func (sr Nuisance) updateReportCol(ctx context.Context, txn bob.Tx, setter *models.PublicreportNuisanceSetter) *ErrorWithCode { +func (sr Nuisance) updateReportCol(ctx context.Context, txn bob.Executor, setter *models.PublicreportNuisanceSetter) *ErrorWithCode { err := sr.row.Update(ctx, txn, setter) if err != nil { log.Error().Err(err).Str("public_id", sr.publicReportID).Int32("report_id", sr.id).Msg("Failed to update report") @@ -91,20 +91,20 @@ func (sr Nuisance) updateReportCol(ctx context.Context, txn bob.Tx, setter *mode } return nil } -func (sr Nuisance) updateReporterEmail(ctx context.Context, txn bob.Tx, email string) *ErrorWithCode { +func (sr Nuisance) updateReporterEmail(ctx context.Context, txn bob.Executor, email string) *ErrorWithCode { return sr.updateReportCol(ctx, txn, &models.PublicreportNuisanceSetter{ ReporterEmail: omitnull.From(email), }) } -func (sr Nuisance) updateReporterName(ctx context.Context, txn bob.Tx, name string) *ErrorWithCode { +func (sr Nuisance) updateReporterName(ctx context.Context, txn bob.Executor, name string) *ErrorWithCode { return sr.updateReportCol(ctx, txn, &models.PublicreportNuisanceSetter{ ReporterName: omitnull.From(name), }) } -func (sr Nuisance) updateReporterPhone(ctx context.Context, txn bob.Tx, phone text.E164) *ErrorWithCode { +func (sr Nuisance) updateReporterPhone(ctx context.Context, txn bob.Executor, phone types.E164) *ErrorWithCode { result, err := psql.Update( um.Table("publicreport.nuisance"), - um.SetCol("reporter_phone").ToArg(text.PhoneString(phone)), + um.SetCol("reporter_phone").ToArg(phone.PhoneString()), um.Where(psql.Quote("public_id").EQ(psql.Arg(sr.publicReportID))), ).Exec(ctx, txn) if err != nil { diff --git a/platform/report/report_water.go b/platform/report/report_water.go index 63030ed0..2f2f1550 100644 --- a/platform/report/report_water.go +++ b/platform/report/report_water.go @@ -14,7 +14,7 @@ import ( "github.com/Gleipnir-Technology/bob/dialect/psql/sm" "github.com/Gleipnir-Technology/nidus-sync/db" "github.com/Gleipnir-Technology/nidus-sync/db/models" - "github.com/Gleipnir-Technology/nidus-sync/platform/text" + "github.com/Gleipnir-Technology/nidus-sync/platform/types" "github.com/aarondl/opt/omit" "github.com/aarondl/opt/omitnull" "github.com/rs/zerolog/log" @@ -30,7 +30,7 @@ type Water struct { func (sr Water) PublicReportID() string { return sr.publicReportID } -func (sr Water) addNotificationEmail(ctx context.Context, txn bob.Tx, email string) *ErrorWithCode { +func (sr Water) addNotificationEmail(ctx context.Context, txn bob.Executor, email string) *ErrorWithCode { setter := models.PublicreportNotifyEmailWaterSetter{ Created: omit.From(time.Now()), Deleted: omitnull.FromPtr[time.Time](nil), @@ -44,11 +44,11 @@ func (sr Water) addNotificationEmail(ctx context.Context, txn bob.Tx, email stri } return nil } -func (sr Water) addNotificationPhone(ctx context.Context, txn bob.Tx, phone text.E164) *ErrorWithCode { +func (sr Water) addNotificationPhone(ctx context.Context, txn bob.Executor, phone types.E164) *ErrorWithCode { setter := models.PublicreportNotifyPhoneWaterSetter{ Created: omit.From(time.Now()), Deleted: omitnull.FromPtr[time.Time](nil), - PhoneE164: omit.From(text.PhoneString(phone)), + PhoneE164: omit.From(phone.PhoneString()), WaterID: omit.From(sr.id), } _, err := models.PublicreportNotifyPhoneWaters.Insert(&setter).Exec(ctx, txn) @@ -77,22 +77,22 @@ func (sr Water) districtID(ctx context.Context) *int32 { func (sr Water) reportID() int32 { return sr.id } -func (sr Water) updateReporterConsent(ctx context.Context, txn bob.Tx, has_consent bool) *ErrorWithCode { +func (sr Water) updateReporterConsent(ctx context.Context, txn bob.Executor, has_consent bool) *ErrorWithCode { return sr.updateReportCol(ctx, txn, &models.PublicreportWaterSetter{ ReporterContactConsent: omitnull.From(has_consent), }) } -func (sr Water) updateReporterEmail(ctx context.Context, txn bob.Tx, email string) *ErrorWithCode { +func (sr Water) updateReporterEmail(ctx context.Context, txn bob.Executor, email string) *ErrorWithCode { return sr.updateReportCol(ctx, txn, &models.PublicreportWaterSetter{ ReporterEmail: omit.From(email), }) } -func (sr Water) updateReporterName(ctx context.Context, txn bob.Tx, name string) *ErrorWithCode { +func (sr Water) updateReporterName(ctx context.Context, txn bob.Executor, name string) *ErrorWithCode { return sr.updateReportCol(ctx, txn, &models.PublicreportWaterSetter{ ReporterName: omit.From(name), }) } -func (sr Water) updateReportCol(ctx context.Context, txn bob.Tx, setter *models.PublicreportWaterSetter) *ErrorWithCode { +func (sr Water) updateReportCol(ctx context.Context, txn bob.Executor, setter *models.PublicreportWaterSetter) *ErrorWithCode { err := sr.row.Update(ctx, txn, setter) if err != nil { log.Error().Err(err).Str("public_id", sr.publicReportID).Int32("report_id", sr.id).Msg("Failed to update report") @@ -100,9 +100,9 @@ func (sr Water) updateReportCol(ctx context.Context, txn bob.Tx, setter *models. } return nil } -func (sr Water) updateReporterPhone(ctx context.Context, txn bob.Tx, phone text.E164) *ErrorWithCode { +func (sr Water) updateReporterPhone(ctx context.Context, txn bob.Executor, phone types.E164) *ErrorWithCode { return sr.updateReportCol(ctx, txn, &models.PublicreportWaterSetter{ - ReporterPhone: omit.From(text.PhoneString(phone)), + ReporterPhone: omit.From(phone.PhoneString()), }) } func newWater(ctx context.Context, public_id string, report_id int32) (Water, *ErrorWithCode) { diff --git a/platform/report/some_report.go b/platform/report/some_report.go index bd46ee20..8263dea1 100644 --- a/platform/report/some_report.go +++ b/platform/report/some_report.go @@ -19,19 +19,19 @@ import ( //"github.com/Gleipnir-Technology/nidus-sync/db" //"github.com/Gleipnir-Technology/nidus-sync/db/models" //"github.com/Gleipnir-Technology/nidus-sync/db/sql" - "github.com/Gleipnir-Technology/nidus-sync/platform/text" + "github.com/Gleipnir-Technology/nidus-sync/platform/types" //"github.com/rs/zerolog/log" //"github.com/stephenafamo/scan" ) type SomeReport interface { - addNotificationEmail(context.Context, bob.Tx, string) *ErrorWithCode - addNotificationPhone(context.Context, bob.Tx, text.E164) *ErrorWithCode + addNotificationEmail(context.Context, bob.Executor, string) *ErrorWithCode + addNotificationPhone(context.Context, bob.Executor, types.E164) *ErrorWithCode districtID(context.Context) *int32 - updateReporterConsent(context.Context, bob.Tx, bool) *ErrorWithCode - updateReporterEmail(context.Context, bob.Tx, string) *ErrorWithCode - updateReporterName(context.Context, bob.Tx, string) *ErrorWithCode - updateReporterPhone(context.Context, bob.Tx, text.E164) *ErrorWithCode + updateReporterConsent(context.Context, bob.Executor, bool) *ErrorWithCode + updateReporterEmail(context.Context, bob.Executor, string) *ErrorWithCode + updateReporterName(context.Context, bob.Executor, string) *ErrorWithCode + updateReporterPhone(context.Context, bob.Executor, types.E164) *ErrorWithCode PublicReportID() string reportID() int32 } diff --git a/platform/start.go b/platform/start.go new file mode 100644 index 00000000..7dbdf158 --- /dev/null +++ b/platform/start.go @@ -0,0 +1,194 @@ +package platform + +import ( + "context" + "fmt" + "strconv" + "sync" + "time" + + "github.com/Gleipnir-Technology/bob" + bobpgx "github.com/Gleipnir-Technology/bob/drivers/pgx" + //"github.com/Gleipnir-Technology/bob/dialect/psql" + //"github.com/Gleipnir-Technology/bob/dialect/psql/sm" + "github.com/Gleipnir-Technology/nidus-sync/config" + "github.com/Gleipnir-Technology/nidus-sync/db" + "github.com/Gleipnir-Technology/nidus-sync/db/enums" + "github.com/Gleipnir-Technology/nidus-sync/db/models" + //"github.com/Gleipnir-Technology/nidus-sync/platform/background" + "github.com/Gleipnir-Technology/nidus-sync/platform/csv" + "github.com/Gleipnir-Technology/nidus-sync/platform/email" + "github.com/Gleipnir-Technology/nidus-sync/platform/file" + "github.com/Gleipnir-Technology/nidus-sync/platform/geocode" + "github.com/Gleipnir-Technology/nidus-sync/platform/text" + "github.com/jackc/pgx/v5" + //"github.com/Gleipnir-Technology/nidus-sync/userfile" + //"github.com/google/uuid" + "github.com/rs/zerolog/log" +) + +var waitGroup sync.WaitGroup +var newOAuthTokenChannel chan struct{} + +func StartAll(ctx context.Context) error { + err := email.LoadTemplates() + if err != nil { + return fmt.Errorf("Failed to load email templates: %w", err) + } + + err = text.StoreSources() + if err != nil { + return fmt.Errorf("Failed to store text source phone numbers: %w", err) + } + + err = file.CreateDirectories() + if err != nil { + return fmt.Errorf("Failed to create file directories: %w", err) + } + + err = initializeLabelStudio() + if err != nil { + return fmt.Errorf("init label studio: %w", err) + } + + geocode.InitializeStadia(config.StadiaMapsAPIKey) + + newOAuthTokenChannel = make(chan struct{}, 10) + + waitGroup.Add(1) + go func() { + defer waitGroup.Done() + refreshFieldseekerData(ctx, newOAuthTokenChannel) + }() + + err = addWaitingJobs(ctx) + if err != nil { + log.Error().Err(err).Msg("Failed to add waiting background jobs") + } + return nil +} + +func WaitForExit() { + waitGroup.Wait() +} + +func addWaitingJobs(ctx context.Context) error { + jobs, err := models.Jobs.Query().All(ctx, db.PGInstance.BobDB) + if err != nil { + return fmt.Errorf("Failed to query waiting jobs: %w", err) + } + for _, job := range jobs { + go func() { + txn, err := db.PGInstance.BobDB.Begin(ctx) + if err != nil { + log.Error().Err(err).Msg("failed begin txn") + return + } + err = handleJob(ctx, txn, job) + if err != nil { + log.Error().Err(err).Msg("failed begin txn") + return + } + err = job.Delete(ctx, txn) + if err != nil { + log.Error().Err(err).Msg("failed delete job") + return + } + txn.Commit(ctx) + }() + } + return nil +} +func handleJob(ctx context.Context, txn bob.Executor, job *models.Job) error { + switch job.Type { + case enums.JobtypeCSVCommit: + return csv.JobCommit(ctx, txn, job.RowID) + case enums.JobtypeCSVImport: + return csv.JobImport(ctx, txn, job.RowID) + case enums.JobtypeLabelStudioAudioCreate: + return handleJobLabelStudioAudioCreate(ctx, txn, job.RowID) + case enums.JobtypeEmailSend: + return email.Job(ctx, txn, job.RowID) + case enums.JobtypeTextSend: + return text.Job(ctx, txn, job.RowID) + default: + return fmt.Errorf("No handler for job type %s", string(job.Type)) + } +} +func handleJobLabelStudioAudioCreate(ctx context.Context, txn bob.Executor, row_id int32) error { + return jobLabelStudioAudioCreate(ctx, txn, row_id) +} +func listenForJobs(ctx context.Context) { + for { + //es.SendQueuedEmails(ctx) // send any emails queued prior to listening for notificiations + err := listenAndDoOneJob(ctx) + if err != nil { + log.Error().Err(err).Msg("Crashed listenAndDoOneJob") + } + + select { + case <-ctx.Done(): + return + default: + // If listenAndSendOneConn returned and ctx has not been cancelled that means there was a fatal database error. + // Wait a while to avoid busy-looping while the database is unreachable. + time.Sleep(time.Minute) + } + } +} +func listenAndDoOneJob(ctx context.Context) error { + conn, err := db.PGInstance.PGXPool.Acquire(ctx) + if err != nil { + //if !pgconn.Timeout(err) { + return fmt.Errorf("failed to acquire database connection to listen for queued emails: %w", err) + } + defer conn.Release() + + _, err = conn.Exec(ctx, "LISTEN new_job") + if err != nil { + //if !pgconn.Timeout(err) { + return fmt.Errorf("failed to listen to outbound_email_queued: %w", err) + } + + for { + notification, err := conn.Conn().WaitForNotification(ctx) + if err != nil { + //if !pgconn.Timeout(err) { + return fmt.Errorf("failed while waiting for notification of outbound_email_queued") + } + + job_id, err := strconv.Atoi(notification.Payload) + if err != nil { + return fmt.Errorf("failed to parse int from payload '%s': %w", notification.Payload, err) + } + + c := bobpgx.NewConn(conn.Conn()) + job, err := models.FindJob(ctx, c, int32(job_id)) + if err != nil { + return fmt.Errorf("Failed to find job %d: %w", job_id, err) + } + + //tx, err := c.BeginTx(ctx, pgx.TxOptions{}) + tx, err := conn.BeginTx(ctx, pgx.TxOptions{}) + if err != nil { + return fmt.Errorf("Failed to start transaction: %w", err) + } + ctx, cancel := context.WithCancel(ctx) + txn := bobpgx.NewTx(tx, cancel) + + sublog := log.With().Int32("job", job.ID).Int32("row_id", job.RowID).Logger() + err = handleJob(ctx, txn, job) + if err != nil { + sublog.Error().Err(err).Msg("failed to handle job") + txn.Rollback(ctx) + return nil + } + err = job.Delete(ctx, txn) + if err != nil { + sublog.Error().Err(err).Msg("failed to delete job") + txn.Rollback(ctx) + return fmt.Errorf("delete job: %w", err) + } + txn.Commit(ctx) + } +} diff --git a/platform/text/job.go b/platform/text/job.go index afbfc1ef..ddb65b48 100644 --- a/platform/text/job.go +++ b/platform/text/job.go @@ -2,38 +2,24 @@ package text import ( "context" + "fmt" - "github.com/rs/zerolog/log" + "github.com/Gleipnir-Technology/bob" + "github.com/Gleipnir-Technology/nidus-sync/config" + "github.com/Gleipnir-Technology/nidus-sync/db/enums" + "github.com/Gleipnir-Technology/nidus-sync/platform/types" ) -type MessageType int - -const ( - ReportSubscription MessageType = iota -) - -type Job interface { - content() string - destination() string - messageType() MessageType - messageTypeName() string - source() string +func Job(ctx context.Context, txn bob.Executor, text_id int32) error { + return sendTextComplete(ctx, txn, text_id) } -func Handle(ctx context.Context, job Job) { - var err error - switch job.messageType() { - case ReportSubscription: - err = sendReportSubscription(ctx, job) - } +func ReportSubscriptionConfirmationText(ctx context.Context, destination types.E164, report_id string) error { + content := fmt.Sprintf("Thanks for submitting mosquito report %s. Text for any questions. We'll send you updates as we get them.", report_id) + origin := enums.CommsTextoriginWebsiteAction + err := sendTextBegin(ctx, *types.NewE164(&config.PhoneNumberReport), destination, content, origin, true, true) if err != nil { - log.Error().Err(err).Str("dest", job.destination()).Str("type", string(job.messageTypeName())).Msg("Error processing text") - return + return fmt.Errorf("Failed to send initial confirmation: %w", err) } - /* - case enums.CommsMessagetypeemailReportStatusScheduled: - case enums.CommsMessagetypeemailReportStatusComplete: - - } - */ + return err } diff --git a/platform/text/report-subscription.go b/platform/text/report-subscription.go index e55ebfce..579a08fc 100644 --- a/platform/text/report-subscription.go +++ b/platform/text/report-subscription.go @@ -6,75 +6,37 @@ import ( "github.com/Gleipnir-Technology/nidus-sync/db" "github.com/Gleipnir-Technology/nidus-sync/db/enums" - "github.com/nyaruka/phonenumbers" + "github.com/Gleipnir-Technology/nidus-sync/platform/types" //"github.com/rs/zerolog/log" ) -func NewJobReportSubscriptionConfirmation( - destination E164, - report_id string, - source E164) jobReportSubscription { - return jobReportSubscription{ - dst: destination, - reportID: report_id, - src: source, - } -} - -type jobReportSubscription struct { - dst E164 - reportID string - src E164 -} - -func (j jobReportSubscription) content() string { - return fmt.Sprintf("Thanks for submitting mosquito report %s. Text for any questions. We'll send you updates as we get them.", j.reportID) -} -func (j jobReportSubscription) destination() string { - return phonenumbers.Format(j.dst.number, phonenumbers.E164) -} -func (j jobReportSubscription) messageType() MessageType { - return ReportSubscription -} -func (j jobReportSubscription) messageTypeName() string { - return "report-subscription" -} -func (j jobReportSubscription) source() string { - return phonenumbers.Format(j.src.number, phonenumbers.E164) -} - -func sendReportSubscription(ctx context.Context, job Job) error { - j, ok := job.(jobReportSubscription) - if !ok { - return fmt.Errorf("job is not for report subscription confirmation") - } - - err := ensureInDB(ctx, db.PGInstance.BobDB, job.destination()) +func sendReportSubscription(ctx context.Context, source, destination types.E164, content string) error { + err := EnsureInDB(ctx, db.PGInstance.BobDB, destination) if err != nil { return fmt.Errorf("Failed to ensure text message destination is in the DB: %w", err) } - status, err := phoneStatus(ctx, job.destination()) + status, err := phoneStatus(ctx, destination) if err != nil { return fmt.Errorf("Failed to check if subscribed: %w", err) } switch status { case enums.CommsPhonestatustypeUnconfirmed: - err = delayMessage(ctx, enums.CommsTextjobsourceRmo, j.destination(), j.content(), enums.CommsTextjobtypeReportConfirmation) + err = delayMessage(ctx, enums.CommsTextjobsourceRmo, destination, content, enums.CommsTextjobtypeReportConfirmation) if err != nil { return fmt.Errorf("Failed to delay report subscription message: %w", err) } - err := ensureInitialText(ctx, j.source(), j.destination()) + err := ensureInitialText(ctx, source, destination) if err != nil { return fmt.Errorf("Failed to ensure initial text has been sent: %w", err) } return nil case enums.CommsPhonestatustypeOkToSend: - err = sendText(ctx, j.source(), j.destination(), j.content(), enums.CommsTextoriginWebsiteAction, false, true) + err = sendTextBegin(ctx, source, destination, content, enums.CommsTextoriginWebsiteAction, false, true) if err != nil { return fmt.Errorf("Failed to send report subscription confirmation: %w", err) } case enums.CommsPhonestatustypeStopped: - resendInitialText(ctx, j.source(), j.destination()) + resendInitialText(ctx, source, destination) } return nil } diff --git a/platform/text/text.go b/platform/text/text.go index 339cbe7a..d8396a7a 100644 --- a/platform/text/text.go +++ b/platform/text/text.go @@ -17,104 +17,102 @@ import ( "github.com/Gleipnir-Technology/nidus-sync/db/models" "github.com/Gleipnir-Technology/nidus-sync/db/sql" "github.com/Gleipnir-Technology/nidus-sync/llm" + "github.com/Gleipnir-Technology/nidus-sync/platform/background" + "github.com/Gleipnir-Technology/nidus-sync/platform/types" "github.com/aarondl/opt/omit" "github.com/aarondl/opt/omitnull" "github.com/nyaruka/phonenumbers" "github.com/rs/zerolog/log" ) -type E164 struct { - number *phonenumbers.PhoneNumber +func EnsureInDB(ctx context.Context, ex bob.Executor, destination types.E164) (err error) { + _, err = psql.Insert( + im.Into("comms.phone", "e164", "is_subscribed", "status"), + im.Values( + psql.Arg(destination.PhoneString()), + psql.Arg(false), + psql.Arg("unconfirmed"), + ), + im.OnConflict("e164").DoNothing(), + ).Exec(ctx, ex) + return err } - -func NewE164(n *phonenumbers.PhoneNumber) *E164 { - return &E164{ - number: n, - } -} - -func EnsureInDB(ctx context.Context, ex bob.Executor, destination E164) (err error) { - return ensureInDB(ctx, ex, PhoneString(destination)) -} -func HandleTextMessage(src string, dst string, body string) { - ctx := context.Background() - - _, err := insertTextLog(ctx, body, dst, src, enums.CommsTextoriginCustomer, false, true) +func HandleTextMessage(ctx context.Context, source string, destination string, body string) error { + src, err := ParsePhoneNumber(source) if err != nil { - log.Error().Err(err).Str("dst", dst).Msg("Failed to add text message log") - return + return fmt.Errorf("parse source '%s': %w", source, err) } - status, err := phoneStatus(ctx, src) + dst, err := ParsePhoneNumber(destination) if err != nil { - log.Error().Err(err).Msg("Failed to get phone status") - return + return fmt.Errorf("parse destination '%s': %w", destination, err) + } + _, err = insertTextLog(ctx, *dst, *src, enums.CommsTextoriginCustomer, body, false, true) + if err != nil { + return fmt.Errorf("insert text log: %w", err) + } + status, err := phoneStatus(ctx, *src) + if err != nil { + return fmt.Errorf("Failed to get phone status") } body_l := strings.TrimSpace(strings.ToLower(body)) // We don't know if they're subscribed or not. if status == enums.CommsPhonestatustypeUnconfirmed { switch body_l { case "yes": - setPhoneStatus(ctx, src, enums.CommsPhonestatustypeOkToSend) + setPhoneStatus(ctx, *src, enums.CommsPhonestatustypeOkToSend) content := "Thanks, we've confirmed your phone number. You can text STOP at any time if you change your mind" - err := sendText(ctx, dst, src, content, enums.CommsTextoriginCommandResponse, false, false) + err := sendTextBegin(ctx, *dst, *src, content, enums.CommsTextoriginCommandResponse, false, false) if err != nil { log.Error().Err(err).Msg("Failed to send confirmation response") } - handleWaitingTextJobs(ctx, src) + handleWaitingTextJobs(ctx, *src) default: content := "I have to start with either 'YES' or 'STOP' first, Which do you want?" - err = sendText(ctx, dst, src, content, enums.CommsTextoriginReiteration, false, false) + err = sendTextBegin(ctx, *dst, *src, content, enums.CommsTextoriginReiteration, false, false) if err != nil { log.Error().Err(err).Msg("Failed to resend initial prompt.") } } - return + return nil } switch body_l { case "stop": content := "You have successfully been unsubscribed. You will not receive any more messages from this number. Reply START to resubscribe." - err = sendText(ctx, dst, src, content, enums.CommsTextoriginCommandResponse, false, false) + err = sendTextBegin(ctx, *dst, *src, content, enums.CommsTextoriginCommandResponse, false, false) if err != nil { log.Error().Err(err).Msg("Failed to send unsubscribe acknowledgement.") } - setPhoneStatus(ctx, src, enums.CommsPhonestatustypeStopped) - return + setPhoneStatus(ctx, *src, enums.CommsPhonestatustypeStopped) + return nil case "reset conversation": - handleResetConversation(ctx, src, dst) - return + handleResetConversation(ctx, *src, *dst) + return nil default: } - previous_messages, err := loadPreviousMessagesForLLM(ctx, dst, src) + previous_messages, err := loadPreviousMessagesForLLM(ctx, *dst, *src) if err != nil { - log.Error().Err(err).Str("dst", dst).Str("src", src).Msg("Failed to get previous messages") - return + return fmt.Errorf("Failed to get previous messages: %w", err) } log.Info().Int("len", len(previous_messages)).Msg("passing") - next_message, err := generateNextMessage(ctx, previous_messages, src) + sublog := log.With().Str("dst", destination).Str("src", source).Logger() + next_message, err := generateNextMessage(ctx, previous_messages, *src) if err != nil { - log.Error().Err(err).Str("dst", dst).Str("src", src).Msg("Failed to generate next message") - return + return fmt.Errorf("Failed to generate next message: %w", err) } - err = sendText(ctx, dst, src, next_message.Content, enums.CommsTextoriginLLM, false, true) + err = sendTextBegin(ctx, *dst, *src, next_message.Content, enums.CommsTextoriginLLM, false, true) if err != nil { - log.Error().Err(err).Str("src", src).Str("dst", dst).Str("content", next_message.Content).Msg("Failed to send response text") - return + return fmt.Errorf("Failed to send response text: %w", err) } - log.Info().Str("src", src).Str("dst", dst).Str("body", body).Str("reply", next_message.Content).Msg("Handled text message") + sublog.Info().Str("content", next_message.Content).Str("body", body).Str("reply", next_message.Content).Msg("Handled text message") + return nil } -func ParsePhoneNumber(input string) (*E164, error) { +func ParsePhoneNumber(input string) (*types.E164, error) { n, err := phonenumbers.Parse(input, "US") if err != nil { return nil, err } - return &E164{ - number: n, - }, nil -} - -func PhoneString(p E164) string { - return phonenumbers.Format(p.number, phonenumbers.E164) + return types.NewE164(n), nil } func StoreSources() error { @@ -159,11 +157,11 @@ func UpdateMessageStatus(twilio_sid string, status string) { return } } -func delayMessage(ctx context.Context, source enums.CommsTextjobsource, destination string, content string, type_ enums.CommsTextjobtype) error { +func delayMessage(ctx context.Context, source enums.CommsTextjobsource, destination types.E164, content string, type_ enums.CommsTextjobtype) error { job, err := models.CommsTextJobs.Insert(&models.CommsTextJobSetter{ Content: omit.From(content), Created: omit.From(time.Now()), - Destination: omit.From(destination), + Destination: omit.From(destination.PhoneString()), //ID: Source: omit.From(source), Type: omit.From(type_), @@ -175,8 +173,8 @@ func delayMessage(ctx context.Context, source enums.CommsTextjobsource, destinat return nil } -func resendInitialText(ctx context.Context, src string, dst string) error { - phone, err := models.FindCommsPhone(ctx, db.PGInstance.BobDB, dst) +func resendInitialText(ctx context.Context, src, dst types.E164) error { + phone, err := models.FindCommsPhone(ctx, db.PGInstance.BobDB, dst.PhoneString()) if err != nil { return fmt.Errorf("Failed to find phone %s: %w", dst, err) } @@ -189,33 +187,20 @@ func resendInitialText(ctx context.Context, src string, dst string) error { return nil } -func sendInitialText(ctx context.Context, src string, dst string) error { +func sendInitialText(ctx context.Context, src, dst types.E164) error { content := "Welcome to Report Mosquitoes Online. We received your request and want to confirm text updates. Reply YES to continue. Reply STOP at any time to unsubscribe" origin := enums.CommsTextoriginWebsiteAction - err := sendText(ctx, src, dst, content, origin, true, true) + err := sendTextBegin(ctx, src, dst, content, origin, true, true) if err != nil { return fmt.Errorf("Failed to send initial confirmation: %w", err) } return nil } -func ensureInDB(ctx context.Context, ex bob.Executor, destination string) (err error) { - _, err = psql.Insert( - im.Into("comms.phone", "e164", "is_subscribed", "status"), - im.Values( - psql.Arg(destination), - psql.Arg(false), - psql.Arg("unconfirmed"), - ), - im.OnConflict("e164").DoNothing(), - ).Exec(ctx, ex) - return err -} - -func ensureInitialText(ctx context.Context, src string, dst string) error { +func ensureInitialText(ctx context.Context, src, dst types.E164) error { // rows, err := models.CommsTextLogs.Query( - models.SelectWhere.CommsTextLogs.Destination.EQ(dst), + models.SelectWhere.CommsTextLogs.Destination.EQ(dst.PhoneString()), models.SelectWhere.CommsTextLogs.IsWelcome.EQ(true), ).All(ctx, db.PGInstance.BobDB) if err != nil { @@ -227,7 +212,7 @@ func ensureInitialText(ctx context.Context, src string, dst string) error { return sendInitialText(ctx, src, dst) } -func generateNextMessage(ctx context.Context, history []llm.Message, customer_phone string) (llm.Message, error) { +func generateNextMessage(ctx context.Context, history []llm.Message, customer_phone types.E164) (llm.Message, error) { _handle_report_status := func() (string, error) { return "Report: ABCD-1234-5678, District: Delta MVCD, Status: scheduled, Appointment: Wednesday 3:30pm", nil } @@ -240,9 +225,9 @@ func generateNextMessage(ctx context.Context, history []llm.Message, customer_ph return llm.GenerateNextMessage(ctx, history, _handle_report_status, _handle_contact_district, _handle_contact_supervisor) } -func handleWaitingTextJobs(ctx context.Context, src string) { +func handleWaitingTextJobs(ctx context.Context, src types.E164) { jobs, err := models.CommsTextJobs.Query( - models.SelectWhere.CommsTextJobs.Destination.EQ(src), + models.SelectWhere.CommsTextJobs.Destination.EQ(src.PhoneString()), models.SelectWhere.CommsTextJobs.Completed.IsNull(), ).All(ctx, db.PGInstance.BobDB) if err != nil { @@ -250,21 +235,34 @@ func handleWaitingTextJobs(ctx context.Context, src string) { return } for _, job := range jobs { - var src string + var source string switch job.Source { case enums.CommsTextjobsourceRmo: - src = config.PhoneNumberReportStr + source = config.PhoneNumberReportStr //case enums.CommsTextJobsourcenidus: //src := config.PhoneNumebrNidusStr default: log.Error().Str("source", job.Source.String()).Msg("Can't support background text job.") + continue } - err = sendText(ctx, src, job.Destination, job.Content, enums.CommsTextoriginWebsiteAction, false, true) + p, err := phonenumbers.Parse(job.Destination, "US") + if err != nil { + log.Error().Err(err).Str("dest", job.Destination).Int32("id", job.ID).Msg("Invalid destination in job") + continue + } + dst := types.NewE164(p) + p, err = phonenumbers.Parse(source, "US") + if err != nil { + log.Error().Err(err).Str("source", source).Int32("id", job.ID).Msg("Invalid source in job") + continue + } + src := types.NewE164(p) + err = sendTextBegin(ctx, *src, *dst, job.Content, enums.CommsTextoriginWebsiteAction, false, true) if err != nil { log.Error().Err(err).Int32("id", job.ID).Msg("Failed to send delayed text job.") continue } - err := job.Update(ctx, db.PGInstance.BobDB, &models.CommsTextJobSetter{ + err = job.Update(ctx, db.PGInstance.BobDB, &models.CommsTextJobSetter{ Completed: omitnull.From(time.Now()), }) if err != nil { @@ -274,60 +272,64 @@ func handleWaitingTextJobs(ctx context.Context, src string) { } } -func handleResetConversation(ctx context.Context, src string, dst string) { +func handleResetConversation(ctx context.Context, src types.E164, dst types.E164) { err := wipeLLMMemory(ctx, src, dst) + sublog := log.With().Str("src", src.PhoneString()).Str("dst", dst.PhoneString()).Logger() if err != nil { - log.Error().Err(err).Str("src", src).Str("dst", dst).Msg("Failed to wipe memory") + sublog.Error().Err(err).Msg("Failed to wipe memory") content := "Failed to wip memory" - err = sendText(ctx, dst, src, content, enums.CommsTextoriginCommandResponse, false, false) + err = sendTextBegin(ctx, dst, src, content, enums.CommsTextoriginCommandResponse, false, false) if err != nil { - log.Error().Err(err).Str("src", src).Str("dst", dst).Msg("Failed to indicated memory wipe failure.") + sublog.Error().Err(err).Msg("Failed to indicated memory wipe failure.") } return } content := "LLM memory wiped" - err = sendText(ctx, dst, src, content, enums.CommsTextoriginCommandResponse, false, false) + err = sendTextBegin(ctx, dst, src, content, enums.CommsTextoriginCommandResponse, false, false) if err != nil { - log.Error().Err(err).Str("src", src).Str("dst", dst).Msg("Failed to indicated memory wiped.") + sublog.Error().Err(err).Msg("Failed to indicated memory wiped.") return } - log.Info().Err(err).Str("src", src).Str("dst", dst).Msg("Wiped LLM memory") + sublog.Info().Err(err).Msg("Wiped LLM memory") } -func insertTextLog(ctx context.Context, content string, destination string, source string, origin enums.CommsTextorigin, is_welcome bool, is_visible_to_llm bool) (log *models.CommsTextLog, err error) { - log, err = models.CommsTextLogs.Insert(&models.CommsTextLogSetter{ +func insertTextLog(ctx context.Context, destination types.E164, source types.E164, origin enums.CommsTextorigin, content string, is_welcome bool, is_visible_to_llm bool) (l *models.CommsTextLog, err error) { + l, err = models.CommsTextLogs.Insert(&models.CommsTextLogSetter{ //ID: Content: omit.From(content), Created: omit.From(time.Now()), - Destination: omit.From(destination), + Destination: omit.From(destination.PhoneString()), IsVisibleToLLM: omit.From(is_visible_to_llm), IsWelcome: omit.From(is_welcome), Origin: omit.From(origin), - Source: omit.From(source), + Source: omit.From(source.PhoneString()), TwilioSid: omitnull.FromPtr[string](nil), TwilioStatus: omit.From(""), }).One(ctx, db.PGInstance.BobDB) + if err != nil { + log.Debug().Int32("id", l.ID).Bool("is_visible_to_llm", is_visible_to_llm).Str("message", content).Msg("inserted text log") + } - return log, err + return l, err } -func phoneStatus(ctx context.Context, src string) (enums.CommsPhonestatustype, error) { - phone, err := models.FindCommsPhone(ctx, db.PGInstance.BobDB, src) +func phoneStatus(ctx context.Context, src types.E164) (enums.CommsPhonestatustype, error) { + phone, err := models.FindCommsPhone(ctx, db.PGInstance.BobDB, src.PhoneString()) if err != nil { - return enums.CommsPhonestatustypeUnconfirmed, fmt.Errorf("Failed to determine if '%s' is subscribed: %w", src, err) + return enums.CommsPhonestatustypeUnconfirmed, fmt.Errorf("Failed to determine if '%s' is subscribed: %w", src.PhoneString(), err) } return phone.Status, nil } -func loadPreviousMessagesForLLM(ctx context.Context, dst, src string) ([]llm.Message, error) { - messages, err := sql.TextsBySenders(dst, src).All(ctx, db.PGInstance.BobDB) +func loadPreviousMessagesForLLM(ctx context.Context, dst, src types.E164) ([]llm.Message, error) { + messages, err := sql.TextsBySenders(dst.PhoneString(), src.PhoneString()).All(ctx, db.PGInstance.BobDB) results := make([]llm.Message, 0) if err != nil { return results, fmt.Errorf("Failed to get message history for %s and %s: %w", dst, src, err) } for _, m := range messages { if m.IsVisibleToLLM { - is_from_customer := (m.Source == src) + is_from_customer := (m.Source == src.PhoneString()) results = append(results, llm.Message{ IsFromCustomer: is_from_customer, Content: m.Content, @@ -337,45 +339,51 @@ func loadPreviousMessagesForLLM(ctx context.Context, dst, src string) ([]llm.Mes return results, nil } -func sendText(ctx context.Context, source string, destination string, message string, origin enums.CommsTextorigin, is_welcome bool, is_visible_to_llm bool) error { - err := ensureInDB(ctx, db.PGInstance.BobDB, destination) +func sendTextBegin(ctx context.Context, source types.E164, destination types.E164, message string, origin enums.CommsTextorigin, is_welcome bool, is_visible_to_llm bool) error { + err := EnsureInDB(ctx, db.PGInstance.BobDB, destination) if err != nil { return fmt.Errorf("Failed to ensure text message destination is in the DB: %w", err) } - l, err := insertTextLog(ctx, message, destination, source, origin, is_welcome, is_visible_to_llm) + l, err := insertTextLog(ctx, destination, source, origin, message, is_welcome, is_visible_to_llm) if err != nil { return fmt.Errorf("Failed to insert text message in the DB: %w", err) } - sid, err := text.SendText(ctx, source, destination, message) + return background.NewTextSend(ctx, db.PGInstance.BobDB, l.ID) +} +func sendTextComplete(ctx context.Context, txn bob.Executor, text_id int32) error { + text_log, err := models.FindCommsTextLog(ctx, txn, text_id) + if err != nil { + return fmt.Errorf("find text: %w", err) + } + sid, err := text.SendText(ctx, text_log.Source, text_log.Destination, text_log.Content) if err != nil { return fmt.Errorf("Failed to send text message: %w", err) } - err = l.Update(ctx, db.PGInstance.BobDB, &models.CommsTextLogSetter{ + err = text_log.Update(ctx, db.PGInstance.BobDB, &models.CommsTextLogSetter{ TwilioSid: omitnull.From(sid), TwilioStatus: omit.From("created"), }) if err != nil { return fmt.Errorf("Failed to update text Twilio status: %w", err) } - log.Info().Int32("id", l.ID).Bool("is_visible_to_llm", is_visible_to_llm).Str("message", message).Msg("inserted text log") return nil } -func setPhoneStatus(ctx context.Context, src string, status enums.CommsPhonestatustype) error { - phone, err := models.FindCommsPhone(ctx, db.PGInstance.BobDB, src) +func setPhoneStatus(ctx context.Context, src types.E164, status enums.CommsPhonestatustype) error { + phone, err := models.FindCommsPhone(ctx, db.PGInstance.BobDB, src.PhoneString()) if err != nil { return fmt.Errorf("Failed to determine if '%s' is subscribed: %w", src, err) } phone.Update(ctx, db.PGInstance.BobDB, &models.CommsPhoneSetter{ Status: omit.From(status), }) - log.Info().Str("src", src).Str("status", string(status)).Msg("Set number subscribed") + log.Info().Str("src", src.PhoneString()).Str("status", string(status)).Msg("Set number subscribed") return nil } -func wipeLLMMemory(ctx context.Context, src string, dst string) error { - rows, err := sql.TextsBySenders(dst, src).All(ctx, db.PGInstance.BobDB) +func wipeLLMMemory(ctx context.Context, src types.E164, dst types.E164) error { + rows, err := sql.TextsBySenders(dst.PhoneString(), src.PhoneString()).All(ctx, db.PGInstance.BobDB) if err != nil { return fmt.Errorf("Failed to query for texts: %w", err) } diff --git a/platform/tile.go b/platform/tile.go index 3ac46643..8adf2e74 100644 --- a/platform/tile.go +++ b/platform/tile.go @@ -18,7 +18,6 @@ import ( "github.com/Gleipnir-Technology/nidus-sync/config" "github.com/Gleipnir-Technology/nidus-sync/db" "github.com/Gleipnir-Technology/nidus-sync/db/models" - "github.com/Gleipnir-Technology/nidus-sync/platform/background" "github.com/Gleipnir-Technology/nidus-sync/platform/oauth" "github.com/rs/zerolog/log" ) @@ -152,7 +151,7 @@ func ImageAtTile(ctx context.Context, org *models.Organization, level, y, x uint if err != nil { return nil, fmt.Errorf("get oauth for org: %w", err) } - fssync, err := background.NewFieldSeeker( + fssync, err := newFieldSeeker( ctx, oauth, ) @@ -210,7 +209,7 @@ func getFieldseeker(ctx context.Context, org *models.Organization) (*fieldseeker if err != nil { return nil, fmt.Errorf("get oauth for org: %w", err) } - fssync, err = background.NewFieldSeeker( + fssync, err = newFieldSeeker( ctx, oauth, ) diff --git a/platform/types/e164.go b/platform/types/e164.go new file mode 100644 index 00000000..162e0e54 --- /dev/null +++ b/platform/types/e164.go @@ -0,0 +1,18 @@ +package types + +import ( + "github.com/nyaruka/phonenumbers" +) + +type E164 struct { + number *phonenumbers.PhoneNumber +} + +func NewE164(n *phonenumbers.PhoneNumber) *E164 { + return &E164{ + number: n, + } +} +func (e E164) PhoneString() string { + return phonenumbers.Format(e.number, phonenumbers.E164) +} diff --git a/platform/upload.go b/platform/upload.go index da39baa1..4a604898 100644 --- a/platform/upload.go +++ b/platform/upload.go @@ -41,10 +41,10 @@ type UploadSummary struct { Type string `db:"type"` } -func NewUpload(ctx context.Context, u User, upload file.FileUpload, t enums.FileuploadCsvtype) (Upload, error) { +func NewUpload(ctx context.Context, u User, upload file.FileUpload, t enums.FileuploadCsvtype) (*Upload, error) { txn, err := db.PGInstance.BobDB.BeginTx(ctx, nil) if err != nil { - return Upload{}, fmt.Errorf("Failed to begin transaction: %w", err) + return nil, fmt.Errorf("Failed to begin transaction: %w", err) } defer txn.Rollback(ctx) @@ -60,7 +60,7 @@ func NewUpload(ctx context.Context, u User, upload file.FileUpload, t enums.File FileUUID: omit.From(upload.UUID), }).One(ctx, txn) if err != nil { - return Upload{}, fmt.Errorf("Failed to create file upload: %w", err) + return nil, fmt.Errorf("Failed to create file upload: %w", err) } _, err = models.FileuploadCSVS.Insert(&models.FileuploadCSVSetter{ Committed: omitnull.FromPtr[time.Time](nil), @@ -69,27 +69,38 @@ func NewUpload(ctx context.Context, u User, upload file.FileUpload, t enums.File Type: omit.From(t), }).One(ctx, txn) if err != nil { - return Upload{}, fmt.Errorf("Failed to create csv: %w", err) + return nil, fmt.Errorf("Failed to create csv: %w", err) } log.Info().Int32("id", file.ID).Msg("Created new pool CSV upload") + err = background.NewCSVImport(ctx, txn, file.ID) + if err != nil { + return nil, fmt.Errorf("background job create: %w", err) + } txn.Commit(ctx) - background.ProcessUpload(file.ID, t) - return Upload{ + return &Upload{ ID: file.ID, }, nil } func UploadCommit(ctx context.Context, org Organization, file_id int32, committer User) error { - // Create addresses for each row - // Create sites for each row - // Create pools for each row - _, err := psql.Update( + txn, err := db.PGInstance.BobDB.BeginTx(ctx, nil) + if err != nil { + return fmt.Errorf("Failed to begin transaction: %w", err) + } + defer txn.Rollback(ctx) + + _, err = psql.Update( um.Table(models.FileuploadFiles.Alias()), um.SetCol("status").ToArg("committing"), um.SetCol("committer").ToArg(committer.ID), um.Where(psql.Quote("id").EQ(psql.Arg(file_id))), um.Where(psql.Quote("organization_id").EQ(psql.Arg(org.ID))), - ).Exec(ctx, db.PGInstance.BobDB) - background.CommitUpload(file_id) + ).Exec(ctx, txn) + err = background.NewCSVCommit(ctx, txn, file_id) + if err != nil { + return fmt.Errorf("update upload: %w", err) + } + err = txn.Commit(ctx) + return err } func UploadDiscard(ctx context.Context, org Organization, file_id int32) error { diff --git a/rmo/notification.go b/rmo/notification.go index 6ce5ca1d..cbbcca3f 100644 --- a/rmo/notification.go +++ b/rmo/notification.go @@ -9,6 +9,7 @@ import ( "github.com/Gleipnir-Technology/nidus-sync/platform" "github.com/Gleipnir-Technology/nidus-sync/platform/report" "github.com/Gleipnir-Technology/nidus-sync/platform/text" + "github.com/Gleipnir-Technology/nidus-sync/platform/types" "github.com/rs/zerolog/log" ) @@ -26,7 +27,7 @@ func postRegisterNotifications(w http.ResponseWriter, r *http.Request) { phone_str := r.PostFormValue("phone") report_id := r.PostFormValue("report_id") - var phone *text.E164 + var phone *types.E164 if phone_str != "" { phone, err = text.ParsePhoneNumber(phone_str) if err != nil {