Add contacts, rework comms schema
This in a pretty huge change. At a high level we're adding the concept of a 'contact' which is a person or organization that has zero or more contact methods (email, phone). This ended up cascading a number of changes, including critically to the publicreprt schema. In the end it seemed safer to get to the point where I'm confident we aren't using any of the old fields for storing reporter information (though I haven't deleted the columns yet) so I removed the code for defining those columns. At this point I think it's not possible for me to regenerate the bob schema due to the interdependencies between my various schemas, so the migration is well-and-truly happening.
This commit is contained in:
parent
085935fa66
commit
f1fe8b4d2b
46 changed files with 1127 additions and 633 deletions
|
|
@ -5,7 +5,7 @@ import (
|
|||
"fmt"
|
||||
|
||||
"github.com/Gleipnir-Technology/nidus-sync/db"
|
||||
"github.com/Gleipnir-Technology/nidus-sync/db/models"
|
||||
querycomms "github.com/Gleipnir-Technology/nidus-sync/db/query/comms"
|
||||
"github.com/Gleipnir-Technology/nidus-sync/platform/types"
|
||||
//"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
|
@ -14,8 +14,8 @@ func JobRespond(ctx context.Context, log_id int32) error {
|
|||
return respondText(ctx, log_id)
|
||||
}
|
||||
func JobSend(ctx context.Context, job_id int32) error {
|
||||
bxn := db.PGInstance.BobDB
|
||||
job, err := models.FindCommsTextJob(ctx, bxn, job_id)
|
||||
bxn := db.PGInstance.PGXPool
|
||||
job, err := querycomms.TextJobFromID(ctx, bxn, int64(job_id))
|
||||
if err != nil {
|
||||
return fmt.Errorf("find text: %w", err)
|
||||
}
|
||||
|
|
@ -23,11 +23,8 @@ func JobSend(ctx context.Context, job_id int32) error {
|
|||
return sendTextComplete(ctx, job)
|
||||
}
|
||||
func handleWaitingTextJobs(ctx context.Context, dst types.E164) error {
|
||||
bxn := db.PGInstance.BobDB
|
||||
jobs, err := models.CommsTextJobs.Query(
|
||||
models.SelectWhere.CommsTextJobs.Destination.EQ(dst.PhoneString()),
|
||||
models.SelectWhere.CommsTextJobs.Completed.IsNull(),
|
||||
).All(ctx, bxn)
|
||||
bxn := db.PGInstance.PGXPool
|
||||
jobs, err := querycomms.TextJobsWaitingFromDestination(ctx, bxn, dst.PhoneString())
|
||||
if err != nil {
|
||||
return fmt.Errorf("query jobs: %w", err)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -6,12 +6,10 @@ import (
|
|||
"fmt"
|
||||
"github.com/rs/zerolog/log"
|
||||
|
||||
"github.com/Gleipnir-Technology/bob"
|
||||
"github.com/Gleipnir-Technology/bob/dialect/psql"
|
||||
"github.com/Gleipnir-Technology/bob/dialect/psql/um"
|
||||
"github.com/Gleipnir-Technology/nidus-sync/config"
|
||||
"github.com/Gleipnir-Technology/nidus-sync/db"
|
||||
//"github.com/Gleipnir-Technology/nidus-sync/db/enums"
|
||||
"github.com/Gleipnir-Technology/nidus-sync/db/models"
|
||||
"github.com/Gleipnir-Technology/nidus-sync/db/sql"
|
||||
"github.com/Gleipnir-Technology/nidus-sync/llm"
|
||||
|
|
@ -34,7 +32,7 @@ func generateNextMessage(ctx context.Context, history []llm.Message, customer_ph
|
|||
}
|
||||
return llm.GenerateNextMessage(ctx, history, _handle_report_status, _handle_contact_district, _handle_contact_supervisor)
|
||||
}
|
||||
func handleResetConversation(ctx context.Context, txn bob.Executor, src types.E164) error {
|
||||
func handleResetConversation(ctx context.Context, txn db.Ex, src types.E164) error {
|
||||
err := wipeLLMMemory(ctx, src)
|
||||
sublog := log.With().Str("src", src.PhoneString()).Logger()
|
||||
if err != nil {
|
||||
|
|
|
|||
|
|
@ -4,31 +4,28 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/Gleipnir-Technology/bob"
|
||||
"github.com/Gleipnir-Technology/bob/dialect/psql"
|
||||
"github.com/Gleipnir-Technology/bob/dialect/psql/im"
|
||||
"github.com/Gleipnir-Technology/nidus-sync/db"
|
||||
"github.com/Gleipnir-Technology/nidus-sync/db/enums"
|
||||
modelcomms "github.com/Gleipnir-Technology/nidus-sync/db/gen/nidus-sync/comms/model"
|
||||
"github.com/Gleipnir-Technology/nidus-sync/db/models"
|
||||
querycomms "github.com/Gleipnir-Technology/nidus-sync/db/query/comms"
|
||||
"github.com/Gleipnir-Technology/nidus-sync/platform/types"
|
||||
"github.com/aarondl/opt/omit"
|
||||
"github.com/rs/zerolog/log"
|
||||
//"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
func EnsureInDB(ctx context.Context, txn bob.Executor, dst types.E164) (err error) {
|
||||
return ensureInDB(ctx, txn, dst.PhoneString())
|
||||
func EnsureInDB(ctx context.Context, txn db.Ex, contact modelcomms.Contact, dst types.E164) (err error) {
|
||||
return ensureInDB(ctx, txn, contact, dst.PhoneString())
|
||||
}
|
||||
func ensureInDB(ctx context.Context, txn bob.Executor, destination string) (err error) {
|
||||
_, err = psql.Insert(
|
||||
im.Into("comms.phone", "can_sms", "e164", "is_subscribed", "status"),
|
||||
im.Values(
|
||||
psql.Arg(true),
|
||||
psql.Arg(destination),
|
||||
psql.Arg(false),
|
||||
psql.Arg("unconfirmed"),
|
||||
),
|
||||
im.OnConflict("e164").DoNothing(),
|
||||
).Exec(ctx, txn)
|
||||
func ensureInDB(ctx context.Context, txn db.Ex, contact modelcomms.Contact, destination string) (err error) {
|
||||
contact_phone := modelcomms.ContactPhone{
|
||||
CanSms: true,
|
||||
ConfirmedMessageID: nil,
|
||||
ContactID: contact.ID,
|
||||
E164: destination,
|
||||
IsSubscribed: false,
|
||||
StopMessageID: nil,
|
||||
}
|
||||
_, err = querycomms.ContactPhoneInsert(ctx, txn, contact_phone)
|
||||
return err
|
||||
}
|
||||
func phoneStatus(ctx context.Context, src types.E164) (enums.CommsPhonestatustype, error) {
|
||||
|
|
@ -38,17 +35,3 @@ func phoneStatus(ctx context.Context, src types.E164) (enums.CommsPhonestatustyp
|
|||
}
|
||||
return phone.Status, nil
|
||||
}
|
||||
func setPhoneStatus(ctx context.Context, txn bob.Executor, src types.E164, status enums.CommsPhonestatustype) error {
|
||||
phone, err := models.FindCommsPhone(ctx, txn, src.PhoneString())
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to determine if '%s' is subscribed: %w", src, err)
|
||||
}
|
||||
err = phone.Update(ctx, txn, &models.CommsPhoneSetter{
|
||||
Status: omit.From(status),
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("update phone status: %w", err)
|
||||
}
|
||||
log.Info().Str("src", src.PhoneString()).Str("status", string(status)).Msg("Set number subscribed")
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,20 +4,15 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/Gleipnir-Technology/bob"
|
||||
"github.com/Gleipnir-Technology/bob/dialect/psql"
|
||||
"github.com/Gleipnir-Technology/bob/dialect/psql/sm"
|
||||
"github.com/Gleipnir-Technology/nidus-sync/db"
|
||||
"github.com/Gleipnir-Technology/nidus-sync/db/enums"
|
||||
//"github.com/Gleipnir-Technology/nidus-sync/db/models"
|
||||
modelcomms "github.com/Gleipnir-Technology/nidus-sync/db/gen/nidus-sync/comms/model"
|
||||
"github.com/Gleipnir-Technology/nidus-sync/platform/types"
|
||||
//"github.com/rs/zerolog/log"
|
||||
"github.com/stephenafamo/scan"
|
||||
)
|
||||
|
||||
// Send a message from a district to a public reporter within the context of the public report
|
||||
func ReportMessage(ctx context.Context, txn bob.Executor, user_id int32, report_id int32, destination types.E164, content string) (*int32, error) {
|
||||
job_id, err := sendTextBegin(ctx, txn, &user_id, &report_id, destination, content, enums.CommsTextjobtypeReportMessage)
|
||||
func ReportMessage(ctx context.Context, txn db.Ex, user_id int32, report_id int32, destination types.E164, content string) (*int32, error) {
|
||||
job_id, err := sendTextBegin(ctx, txn, &user_id, &report_id, destination, content, modelcomms.Textjobtype_ReportMessage)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Failed to send initial confirmation: %w", err)
|
||||
}
|
||||
|
|
@ -25,43 +20,11 @@ func ReportMessage(ctx context.Context, txn bob.Executor, user_id int32, report_
|
|||
}
|
||||
|
||||
// Send a message from the system to a public reporter indicating they are subscribed to updates on the report
|
||||
func ReportSubscriptionConfirmationText(ctx context.Context, txn bob.Executor, destination types.E164, report_id string) error {
|
||||
func ReportSubscriptionConfirmationText(ctx context.Context, txn db.Ex, destination types.E164, report_id string) error {
|
||||
content := fmt.Sprintf("Thanks for submitting mosquito report %s. Text for any questions. We'll send you updates as we get them.", report_id)
|
||||
_, err := sendTextBegin(ctx, txn, nil, nil, destination, content, enums.CommsTextjobtypeReportConfirmation)
|
||||
_, err := sendTextBegin(ctx, txn, nil, nil, destination, content, modelcomms.Textjobtype_ReportConfirmation)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to send initial confirmation: %w", err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
type reportIDs struct {
|
||||
ID int32 `db:"id"`
|
||||
PublicID string `db:"public_id"`
|
||||
OrganizationID int32 `db:"organization_id"`
|
||||
}
|
||||
|
||||
// Get the list of reports that are still open for a particular text message recipient
|
||||
// 'still open' is not well-defined throughout the system, but for now we'll go with
|
||||
// 'not reviewed in any way'.
|
||||
func reportsForTextRecipient(ctx context.Context, txn bob.Executor, destination types.E164) ([]reportIDs, error) {
|
||||
rows, err := bob.All(ctx, db.PGInstance.BobDB, psql.Select(
|
||||
sm.Columns(
|
||||
"r.id",
|
||||
"r.public_id",
|
||||
"r.organization_id",
|
||||
),
|
||||
sm.From("comms.text_job").As("t"),
|
||||
sm.InnerJoin("publicreport.report").As("r").OnEQ(
|
||||
psql.Quote("t", "report_id"),
|
||||
psql.Quote("r", "id"),
|
||||
),
|
||||
sm.Where(psql.Quote("t", "report_id").IsNotNull()),
|
||||
sm.Where(psql.Quote("t", "destination").EQ(psql.Arg(destination.PhoneString()))),
|
||||
sm.Where(psql.Quote("r", "status").EQ(psql.Arg(enums.PublicreportReportstatustypeReported))),
|
||||
), scan.StructMapper[reportIDs]())
|
||||
if err != nil {
|
||||
return []reportIDs{}, fmt.Errorf("query reports: %w", err)
|
||||
}
|
||||
|
||||
return rows, nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,50 +5,47 @@ import (
|
|||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/Gleipnir-Technology/bob"
|
||||
"github.com/Gleipnir-Technology/nidus-sync/comms/text"
|
||||
"github.com/Gleipnir-Technology/nidus-sync/config"
|
||||
"github.com/Gleipnir-Technology/nidus-sync/db"
|
||||
"github.com/Gleipnir-Technology/nidus-sync/db/enums"
|
||||
modelcomms "github.com/Gleipnir-Technology/nidus-sync/db/gen/nidus-sync/comms/model"
|
||||
modelpublic "github.com/Gleipnir-Technology/nidus-sync/db/gen/nidus-sync/public/model"
|
||||
modelpublicreport "github.com/Gleipnir-Technology/nidus-sync/db/gen/nidus-sync/publicreport/model"
|
||||
querycomms "github.com/Gleipnir-Technology/nidus-sync/db/query/comms"
|
||||
querypublic "github.com/Gleipnir-Technology/nidus-sync/db/query/public"
|
||||
querypublicreport "github.com/Gleipnir-Technology/nidus-sync/db/query/publicreport"
|
||||
"github.com/Gleipnir-Technology/nidus-sync/lint"
|
||||
"github.com/Gleipnir-Technology/nidus-sync/db/models"
|
||||
"github.com/Gleipnir-Technology/nidus-sync/platform/background"
|
||||
"github.com/Gleipnir-Technology/nidus-sync/platform/event"
|
||||
"github.com/Gleipnir-Technology/nidus-sync/platform/types"
|
||||
"github.com/aarondl/opt/omit"
|
||||
"github.com/aarondl/opt/omitnull"
|
||||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
func ensureInitialText(ctx context.Context, txn bob.Executor, dst types.E164) error {
|
||||
rows, err := models.CommsTextLogs.Query(
|
||||
models.SelectWhere.CommsTextLogs.Destination.EQ(dst.PhoneString()),
|
||||
models.SelectWhere.CommsTextLogs.IsWelcome.EQ(true),
|
||||
).All(ctx, txn)
|
||||
func ensureInitialText(ctx context.Context, txn db.Ex, dst types.E164) error {
|
||||
logs, err := querycomms.TextLogWelcomeFromDestination(ctx, txn, dst.PhoneString())
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to query text logs: %w", err)
|
||||
}
|
||||
if len(rows) > 0 {
|
||||
if len(logs) > 0 {
|
||||
return nil
|
||||
}
|
||||
return sendInitialText(ctx, txn, dst)
|
||||
}
|
||||
func resendInitialText(ctx context.Context, txn bob.Executor, dst types.E164) error {
|
||||
phone, err := models.FindCommsPhone(ctx, txn, dst.PhoneString())
|
||||
func resendInitialText(ctx context.Context, txn db.Ex, dst types.E164) error {
|
||||
phone, err := querycomms.ContactPhoneFromE164(ctx, txn, dst.PhoneString())
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to find phone %s: %w", dst, err)
|
||||
}
|
||||
err = phone.Update(ctx, txn, &models.CommsPhoneSetter{
|
||||
Status: omit.From(enums.CommsPhonestatustypeUnconfirmed),
|
||||
})
|
||||
err = querycomms.ContactPhoneUpdateStopMessageID(ctx, txn, phone.E164, nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to clear subscription on phone %s: %w", dst, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
func sendInitialText(ctx context.Context, txn bob.Executor, dst types.E164) error {
|
||||
func sendInitialText(ctx context.Context, txn db.Ex, dst types.E164) error {
|
||||
content := "Welcome to Report Mosquitoes Online. We received your request and want to confirm text updates. Reply YES to continue. Reply STOP at any time to unsubscribe"
|
||||
_, err := sendTextDirect(ctx, txn, enums.CommsTextoriginWebsiteAction, dst.PhoneString(), content, false, true)
|
||||
_, err := sendTextDirect(ctx, txn, modelcomms.Textorigin_WebsiteAction, dst.PhoneString(), content, false, true)
|
||||
if err != nil {
|
||||
return fmt.Errorf("send text: %w", err)
|
||||
}
|
||||
|
|
@ -57,21 +54,17 @@ func sendInitialText(ctx context.Context, txn bob.Executor, dst types.E164) erro
|
|||
|
||||
// Begin the process of sending the text message, but only get as far as adding it to
|
||||
// the database, then let the backend finish sending.
|
||||
func sendTextBegin(ctx context.Context, txn bob.Executor, user_id *int32, report_id *int32, destination types.E164, content string, type_ enums.CommsTextjobtype) (*int32, error) {
|
||||
err := EnsureInDB(ctx, txn, destination)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Failed to ensure text message destination is in the DB: %w", err)
|
||||
}
|
||||
job, err := models.CommsTextJobs.Insert(&models.CommsTextJobSetter{
|
||||
Content: omit.From(content),
|
||||
CreatorID: omitnull.FromPtr(user_id),
|
||||
Created: omit.From(time.Now()),
|
||||
Destination: omit.From(destination.PhoneString()),
|
||||
func sendTextBegin(ctx context.Context, txn db.Ex, user_id *int32, report_id *int32, destination types.E164, content string, type_ modelcomms.Textjobtype) (*int32, error) {
|
||||
job, err := querycomms.TextJobInsert(ctx, txn, modelcomms.TextJob{
|
||||
Content: content,
|
||||
CreatorID: user_id,
|
||||
Created: time.Now(),
|
||||
Destination: destination.PhoneString(),
|
||||
//ID:
|
||||
ReportID: omitnull.FromPtr(report_id),
|
||||
Source: omit.From(enums.CommsTextjobsourceRmo),
|
||||
Type: omit.From(type_),
|
||||
}).One(ctx, txn)
|
||||
ReportID: report_id,
|
||||
Source: modelcomms.Textjobsource_Rmo,
|
||||
Type: type_,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Failed to add delayed text job: %w", err)
|
||||
}
|
||||
|
|
@ -81,12 +74,12 @@ func sendTextBegin(ctx context.Context, txn bob.Executor, user_id *int32, report
|
|||
}
|
||||
return &job.ID, nil
|
||||
}
|
||||
func sendTextCommandResponse(ctx context.Context, txn bob.Executor, dst types.E164, content string) error {
|
||||
_, err := sendTextDirect(ctx, txn, enums.CommsTextoriginCommandResponse, dst.PhoneString(), content, false, false)
|
||||
func sendTextCommandResponse(ctx context.Context, txn db.Ex, dst types.E164, content string) error {
|
||||
_, err := sendTextDirect(ctx, txn, modelcomms.Textorigin_CommandResponse, dst.PhoneString(), content, false, false)
|
||||
return err
|
||||
}
|
||||
func sendTextComplete(ctx context.Context, job *models.CommsTextJob) error {
|
||||
txn, err := db.PGInstance.BobDB.BeginTx(ctx, nil)
|
||||
func sendTextComplete(ctx context.Context, job modelcomms.TextJob) error {
|
||||
txn, err := db.BeginTxn(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("begin tx: %w", err)
|
||||
}
|
||||
|
|
@ -95,12 +88,12 @@ func sendTextComplete(ctx context.Context, job *models.CommsTextJob) error {
|
|||
if err != nil {
|
||||
return fmt.Errorf("parse phone: %w", err)
|
||||
}
|
||||
var origin enums.CommsTextorigin
|
||||
var origin modelcomms.Textorigin
|
||||
switch job.Type {
|
||||
case enums.CommsTextjobtypeReportConfirmation:
|
||||
origin = enums.CommsTextoriginWebsiteAction
|
||||
case enums.CommsTextjobtypeReportMessage:
|
||||
origin = enums.CommsTextoriginDistrict
|
||||
case modelcomms.Textjobtype_ReportConfirmation:
|
||||
origin = modelcomms.Textorigin_WebsiteAction
|
||||
case modelcomms.Textjobtype_ReportMessage:
|
||||
origin = modelcomms.Textorigin_District
|
||||
default:
|
||||
return fmt.Errorf("incomplete switch: %s", string(job.Type))
|
||||
}
|
||||
|
|
@ -128,37 +121,35 @@ func sendTextComplete(ctx context.Context, job *models.CommsTextJob) error {
|
|||
if err != nil {
|
||||
return fmt.Errorf("send text direct: %w", err)
|
||||
}
|
||||
err = job.Update(ctx, txn, &models.CommsTextJobSetter{
|
||||
Completed: omitnull.From(time.Now()),
|
||||
})
|
||||
err = querycomms.TextJobComplete(ctx, txn, int64(job.ID))
|
||||
if err != nil {
|
||||
return fmt.Errorf("update job: %w", err)
|
||||
}
|
||||
if job.ReportID.IsValue() {
|
||||
creator_id := job.CreatorID.MustGet()
|
||||
report_id := job.ReportID.MustGet()
|
||||
if job.ReportID != nil {
|
||||
creator_id := *job.CreatorID
|
||||
report_id := *job.ReportID
|
||||
log.Debug().Int32("creator", creator_id).Int32("report_id", report_id).Msg("Creating report entries for text message")
|
||||
_, err := models.ReportTexts.Insert(&models.ReportTextSetter{
|
||||
CreatorID: omit.From(creator_id),
|
||||
ReportID: omit.From(report_id),
|
||||
TextLogID: omit.From(text_log.ID),
|
||||
}).One(ctx, txn)
|
||||
querypublic.ReportTextInsert(ctx, txn, modelpublic.ReportText{
|
||||
CreatorID: creator_id,
|
||||
ReportID: report_id,
|
||||
TextLogID: text_log.ID,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("insert report_text: %w", err)
|
||||
}
|
||||
_, err = models.PublicreportReportLogs.Insert(&models.PublicreportReportLogSetter{
|
||||
Created: omit.From(time.Now()),
|
||||
EmailLogID: omitnull.FromPtr[int32](nil),
|
||||
_, err = querypublicreport.ReportLogInsert(ctx, txn, modelpublicreport.ReportLog{
|
||||
Created: time.Now(),
|
||||
EmailLogID: nil,
|
||||
// ID
|
||||
ReportID: omit.From(report_id),
|
||||
TextLogID: omitnull.From(text_log.ID),
|
||||
Type: omit.From(enums.PublicreportReportlogtypeMessageText),
|
||||
UserID: omitnull.From(creator_id),
|
||||
}).One(ctx, txn)
|
||||
ReportID: report_id,
|
||||
TextLogID: &text_log.ID,
|
||||
Type: modelpublicreport.Reportlogtype_MessageText,
|
||||
UserID: &creator_id,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("insert report log: %w", err)
|
||||
}
|
||||
report, err := models.FindPublicreportReport(ctx, txn, report_id)
|
||||
report, err := querypublicreport.ReportFromID(ctx, txn, int64(report_id))
|
||||
if err != nil {
|
||||
return fmt.Errorf("find public report: %w", err)
|
||||
}
|
||||
|
|
@ -174,33 +165,28 @@ func sendTextComplete(ctx context.Context, job *models.CommsTextJob) error {
|
|||
|
||||
// Send a text message and save the appropriate database records.
|
||||
// Send immediately using the current goroutine
|
||||
func sendTextDirect(ctx context.Context, txn bob.Executor, origin enums.CommsTextorigin, destination, content string, is_visible_to_llm, is_welcome bool) (*models.CommsTextLog, error) {
|
||||
text_log, err := models.CommsTextLogs.Insert(&models.CommsTextLogSetter{
|
||||
//ID:
|
||||
Content: omit.From(content),
|
||||
Created: omit.From(time.Now()),
|
||||
Destination: omit.From(destination),
|
||||
IsVisibleToLLM: omit.From(is_visible_to_llm),
|
||||
IsWelcome: omit.From(is_welcome),
|
||||
Origin: omit.From(origin),
|
||||
Source: omit.From(config.PhoneNumberReportStr),
|
||||
TwilioSid: omitnull.FromPtr[string](nil),
|
||||
TwilioStatus: omit.From(""),
|
||||
}).One(ctx, txn)
|
||||
func sendTextDirect(ctx context.Context, txn db.Ex, origin modelcomms.Textorigin, destination, content string, is_visible_to_llm, is_welcome bool) (modelcomms.TextLog, error) {
|
||||
text_log, err := querycomms.TextLogInsert(ctx, txn, modelcomms.TextLog{
|
||||
Content: content,
|
||||
Created: time.Now(),
|
||||
Destination: destination,
|
||||
IsVisibleToLlm: is_visible_to_llm,
|
||||
IsWelcome: is_welcome,
|
||||
Origin: origin,
|
||||
Source: config.PhoneNumberReportStr,
|
||||
TwilioSid: nil,
|
||||
TwilioStatus: "",
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("insert text log: %w", err)
|
||||
return modelcomms.TextLog{}, fmt.Errorf("insert text log: %w", err)
|
||||
}
|
||||
pid, err := text.SendText(ctx, config.VoipMSNumber, destination, content)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("send text: %w", err)
|
||||
return modelcomms.TextLog{}, fmt.Errorf("send text: %w", err)
|
||||
}
|
||||
err = text_log.Update(ctx, txn, &models.CommsTextLogSetter{
|
||||
TwilioSid: omitnull.From(pid),
|
||||
TwilioStatus: omit.From("created"),
|
||||
})
|
||||
err = querycomms.TextLogUpdate(ctx, txn, int64(text_log.ID), pid, "created")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("update %w", err)
|
||||
return modelcomms.TextLog{}, fmt.Errorf("update %w", err)
|
||||
}
|
||||
|
||||
return text_log, nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -9,8 +9,12 @@ import (
|
|||
"github.com/Gleipnir-Technology/nidus-sync/config"
|
||||
"github.com/Gleipnir-Technology/nidus-sync/db"
|
||||
"github.com/Gleipnir-Technology/nidus-sync/db/enums"
|
||||
"github.com/Gleipnir-Technology/nidus-sync/lint"
|
||||
modelcomms "github.com/Gleipnir-Technology/nidus-sync/db/gen/nidus-sync/comms/model"
|
||||
modelpublicreport "github.com/Gleipnir-Technology/nidus-sync/db/gen/nidus-sync/publicreport/model"
|
||||
"github.com/Gleipnir-Technology/nidus-sync/db/models"
|
||||
querycomms "github.com/Gleipnir-Technology/nidus-sync/db/query/comms"
|
||||
querypublicreport "github.com/Gleipnir-Technology/nidus-sync/db/query/publicreport"
|
||||
"github.com/Gleipnir-Technology/nidus-sync/lint"
|
||||
"github.com/Gleipnir-Technology/nidus-sync/platform/background"
|
||||
"github.com/Gleipnir-Technology/nidus-sync/platform/event"
|
||||
"github.com/Gleipnir-Technology/nidus-sync/platform/types"
|
||||
|
|
@ -68,12 +72,12 @@ func HandleTextMessage(ctx context.Context, source string, destination string, c
|
|||
}
|
||||
|
||||
func respondText(ctx context.Context, log_id int32) error {
|
||||
txn, err := db.PGInstance.BobDB.BeginTx(ctx, nil)
|
||||
txn, err := db.BeginTxn(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("begin tx: %w", err)
|
||||
}
|
||||
defer lint.LogOnErrRollback(txn.Rollback, ctx, "rollback")
|
||||
l, err := models.FindCommsTextLog(ctx, txn, log_id)
|
||||
l, err := querycomms.TextLogFromID(ctx, txn, int64(log_id))
|
||||
if err != nil {
|
||||
return fmt.Errorf("find comms: %w", err)
|
||||
}
|
||||
|
|
@ -82,19 +86,19 @@ func respondText(ctx context.Context, log_id int32) error {
|
|||
return fmt.Errorf("parse source: %w", err)
|
||||
}
|
||||
|
||||
status, err := phoneStatus(ctx, *src)
|
||||
contact_phone, err := querycomms.ContactPhoneFromE164(ctx, txn, src.PhoneString())
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to get phone status")
|
||||
return fmt.Errorf("Failed to get contact phone")
|
||||
}
|
||||
|
||||
body_l := strings.TrimSpace(strings.ToLower(l.Content))
|
||||
// If the user isn't confirmed for sending regular texts ensure they get a reprompt
|
||||
if status == enums.CommsPhonestatustypeUnconfirmed {
|
||||
if contact_phone.ConfirmedMessageID == nil {
|
||||
switch body_l {
|
||||
case "yes":
|
||||
err = setPhoneStatus(ctx, txn, *src, enums.CommsPhonestatustypeOkToSend)
|
||||
err = querycomms.ContactPhoneUpdateConfirmedMessageID(ctx, txn, src.PhoneString(), &l.ID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("set phone status: %w", err)
|
||||
return fmt.Errorf("set phone confirmed message ID: %w", err)
|
||||
}
|
||||
content := "Thanks, we've confirmed your phone number. You can text STOP at any time if you change your mind"
|
||||
err = sendTextCommandResponse(ctx, txn, *src, content)
|
||||
|
|
@ -118,14 +122,15 @@ func respondText(ctx context.Context, log_id int32) error {
|
|||
}
|
||||
switch body_l {
|
||||
case "stop":
|
||||
err = querycomms.ContactPhoneUpdateStopMessageID(ctx, txn, src.PhoneString(), &l.ID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("set phone stop message ID: %w", err)
|
||||
}
|
||||
content := "You have successfully been unsubscribed. You will not receive any more messages from this number. Reply START to resubscribe."
|
||||
err = sendTextCommandResponse(ctx, txn, *src, content)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("Failed to send unsubscribe acknowledgement.")
|
||||
}
|
||||
lint.LogOnErrCtx(func(ctx context.Context) error {
|
||||
return setPhoneStatus(ctx, txn, *src, enums.CommsPhonestatustypeStopped)
|
||||
}, ctx, "set phone status")
|
||||
return nil
|
||||
case "reset conversation":
|
||||
err = handleResetConversation(ctx, txn, *src)
|
||||
|
|
@ -140,20 +145,23 @@ func respondText(ctx context.Context, log_id int32) error {
|
|||
return nil
|
||||
}
|
||||
// If we've got an open public report from this phone number then we'll let the district respond
|
||||
reports, err := reportsForTextRecipient(ctx, txn, *src)
|
||||
// Get the list of reports that are still open for a particular text message recipient
|
||||
// 'still open' is not well-defined throughout the system, but for now we'll go with
|
||||
// 'not reviewed in any way'.
|
||||
reports, err := querypublicreport.ReportsFromReporterPhone(ctx, txn, src.PhoneString())
|
||||
if err != nil {
|
||||
return fmt.Errorf("has open report: %w", err)
|
||||
}
|
||||
for _, report := range reports {
|
||||
_, err = models.PublicreportReportLogs.Insert(&models.PublicreportReportLogSetter{
|
||||
Created: omit.From(time.Now()),
|
||||
EmailLogID: omitnull.FromPtr[int32](nil),
|
||||
_, err = querypublicreport.ReportLogInsert(ctx, txn, modelpublicreport.ReportLog{
|
||||
Created: time.Now(),
|
||||
EmailLogID: nil,
|
||||
// ID
|
||||
ReportID: omit.From(report.ID),
|
||||
TextLogID: omitnull.From(log_id),
|
||||
Type: omit.From(enums.PublicreportReportlogtypeMessageText),
|
||||
UserID: omitnull.FromPtr[int32](nil),
|
||||
}).One(ctx, txn)
|
||||
ReportID: report.ID,
|
||||
TextLogID: &log_id,
|
||||
Type: modelpublicreport.Reportlogtype_MessageText,
|
||||
UserID: nil,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("insert report log: %w", err)
|
||||
}
|
||||
|
|
@ -164,7 +172,8 @@ func respondText(ctx context.Context, log_id int32) error {
|
|||
return nil
|
||||
}
|
||||
// Otherwise let the LLM handle the response
|
||||
return respondTextLLM(ctx, *src)
|
||||
//return respondTextLLM(ctx, *src)
|
||||
return nil
|
||||
}
|
||||
|
||||
func respondTextLLM(ctx context.Context, src types.E164) error {
|
||||
|
|
@ -177,12 +186,12 @@ func respondTextLLM(ctx context.Context, src types.E164) error {
|
|||
if err != nil {
|
||||
return fmt.Errorf("Failed to generate next message: %w", err)
|
||||
}
|
||||
txn, err := db.PGInstance.BobDB.BeginTx(ctx, nil)
|
||||
txn, err := db.BeginTxn(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("start txn: %w", err)
|
||||
}
|
||||
defer lint.LogOnErrRollback(txn.Rollback, ctx, "rollback")
|
||||
_, err = sendTextDirect(ctx, txn, enums.CommsTextoriginLLM, src.PhoneString(), next_message.Content, true, false)
|
||||
_, err = sendTextDirect(ctx, txn, modelcomms.Textorigin_Llm, src.PhoneString(), next_message.Content, true, false)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to send response text: %w", err)
|
||||
}
|
||||
|
|
@ -201,22 +210,28 @@ func ParsePhoneNumber(input string) (*types.E164, error) {
|
|||
}
|
||||
|
||||
func StoreSources() error {
|
||||
var err error
|
||||
ctx := context.TODO()
|
||||
txn := db.PGInstance.PGXPool
|
||||
// Magical id 1 is set in migration 00151
|
||||
contact, err := querycomms.ContactFromID(ctx, txn, 1)
|
||||
if err != nil {
|
||||
return fmt.Errorf("contact from ID 1: %w", err)
|
||||
}
|
||||
for _, n := range []string{config.PhoneNumberReportStr, config.PhoneNumberSupportStr, config.VoipMSNumber} {
|
||||
var err error
|
||||
// Deal with Voip.ms not expecting API calls with the prefixed +1
|
||||
if !strings.HasPrefix(n, "+1") {
|
||||
dest, err := ParsePhoneNumber("+1" + n)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to parse +1'%s' as phone number: %w", n, err)
|
||||
}
|
||||
err = EnsureInDB(ctx, db.PGInstance.BobDB, *dest)
|
||||
err = EnsureInDB(ctx, txn, contact, *dest)
|
||||
} else {
|
||||
dest, err := ParsePhoneNumber(n)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to parse '%s' as phone number: %w", n, err)
|
||||
}
|
||||
err = EnsureInDB(ctx, db.PGInstance.BobDB, *dest)
|
||||
err = EnsureInDB(ctx, txn, contact, *dest)
|
||||
}
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to add number '%s' to DB: %w", n, err)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue