From 9914274d42f00b6144807f0cb5e08c3b907c077a Mon Sep 17 00:00:00 2001 From: Eli Ribble Date: Tue, 27 Jan 2026 19:56:26 +0000 Subject: [PATCH] Wire in agent to the reporter texting system Also rework the so the platform absorbs all the business logic that was going in the wrong place. --- api/twilio.go | 6 +- auth/auth.go | 2 +- background/background.go | 2 +- background/email.go | 1 + background/text.go | 5 +- comms/text/text.go | 9 +-- llm/log.go | 4 +- main.go | 4 +- {comms => platform}/text/db.go | 0 {comms => platform}/text/job.go | 0 {comms => platform}/text/llm.go | 0 .../text/report-subscription.go | 48 +++++------ platform/{ => text}/text.go | 79 +++++++++++-------- public-report/quick.go | 2 +- 14 files changed, 86 insertions(+), 76 deletions(-) rename {comms => platform}/text/db.go (100%) rename {comms => platform}/text/job.go (100%) rename {comms => platform}/text/llm.go (100%) rename {comms => platform}/text/report-subscription.go (53%) rename platform/{ => text}/text.go (87%) diff --git a/api/twilio.go b/api/twilio.go index 91ee54ba..ef1d205c 100644 --- a/api/twilio.go +++ b/api/twilio.go @@ -4,7 +4,7 @@ import ( "fmt" "net/http" - "github.com/Gleipnir-Technology/nidus-sync/platform" + "github.com/Gleipnir-Technology/nidus-sync/platform/text" "github.com/rs/zerolog/log" "github.com/twilio/twilio-go/twiml" ) @@ -18,7 +18,7 @@ func twilioStatusPost(w http.ResponseWriter, r *http.Request) { message_sid := r.PostFormValue("MessageSid") message_status := r.PostFormValue("MessageStatus") log.Info().Str("sid", message_sid).Str("status", message_status).Msg("Updated message status") - platform.UpdateMessageStatus(message_sid, message_status) + text.UpdateMessageStatus(message_sid, message_status) fmt.Fprintf(w, "") } func twilioTextPost(w http.ResponseWriter, r *http.Request) { @@ -43,7 +43,7 @@ func twilioTextPost(w http.ResponseWriter, r *http.Request) { log.Info().Str("message_sid", message_sid).Str("account_sid", account_sid).Str("messaging_service_sid", messaging_service_sid).Str("from", from).Str("to_", to_).Str("body", body).Str("num_media", num_media).Str("num_segments", num_segments).Str("media_content_type0", media_content_type0).Str("media_url0", media_url0).Str("from_city", from_city).Str("from_state", from_state).Str("from_zip", from_zip).Str("from_country", from_country).Str("to_city", to_city).Str("to_state", to_state).Str("to_zip", to_zip).Str("to_country", to_country).Msg("got text") twiml, _ := twiml.Messages([]twiml.Element{}) - go platform.HandleTextMessage(from, to_, body) + go text.HandleTextMessage(from, to_, body) w.Header().Set("Content-Type", "text/xml") fmt.Fprintf(w, "%s", twiml) } diff --git a/auth/auth.go b/auth/auth.go index df88736b..6599d328 100644 --- a/auth/auth.go +++ b/auth/auth.go @@ -178,7 +178,7 @@ func findUser(ctx context.Context, user_id int) (*models.User, error) { return nil, err } } - log.Info().Int32("user_id", user.ID).Int32("org_id", user.OrganizationID).Msg("Found user") + //log.Info().Int32("user_id", user.ID).Int32("org_id", user.OrganizationID).Msg("Found user") return user, err } diff --git a/background/background.go b/background/background.go index a5c44584..92705b81 100644 --- a/background/background.go +++ b/background/background.go @@ -5,7 +5,7 @@ import ( "sync" "github.com/Gleipnir-Technology/nidus-sync/comms/email" - "github.com/Gleipnir-Technology/nidus-sync/comms/text" + "github.com/Gleipnir-Technology/nidus-sync/platform/text" ) var waitGroup sync.WaitGroup diff --git a/background/email.go b/background/email.go index ea05ac9f..ccdd464c 100644 --- a/background/email.go +++ b/background/email.go @@ -27,6 +27,7 @@ func enqueueJobEmail(job email.Job) { func startWorkerEmail(ctx context.Context, channel chan email.Job) { go func() { + log.Info().Msg("Email worker started") for { select { case <-ctx.Done(): diff --git a/background/text.go b/background/text.go index 91308769..8e9d920a 100644 --- a/background/text.go +++ b/background/text.go @@ -3,8 +3,8 @@ package background import ( "context" - "github.com/Gleipnir-Technology/nidus-sync/comms/text" "github.com/Gleipnir-Technology/nidus-sync/config" + "github.com/Gleipnir-Technology/nidus-sync/platform/text" "github.com/rs/zerolog/log" ) @@ -29,10 +29,11 @@ func enqueueJobText(job text.Job) { func startWorkerText(ctx context.Context, channel chan text.Job) { go func() { + log.Info().Msg("Text worker started") for { select { case <-ctx.Done(): - log.Info().Msg("Email worker shutting down.") + log.Info().Msg("Text worker shutting down.") return case job := <-channel: text.Handle(ctx, job) diff --git a/comms/text/text.go b/comms/text/text.go index c98e33fc..018cfdd8 100644 --- a/comms/text/text.go +++ b/comms/text/text.go @@ -6,18 +6,11 @@ import ( "fmt" "github.com/Gleipnir-Technology/nidus-sync/config" - "github.com/nyaruka/phonenumbers" "github.com/rs/zerolog/log" "github.com/twilio/twilio-go" twilioApi "github.com/twilio/twilio-go/rest/api/v2010" ) -type E164 = phonenumbers.PhoneNumber - -func ParsePhoneNumber(input string) (*E164, error) { - return phonenumbers.Parse(input, "US") -} - func SendText(ctx context.Context, source string, destination string, message string) (string, error) { client := twilio.NewRestClient() @@ -31,11 +24,11 @@ func SendText(ctx context.Context, source string, destination string, message st if err != nil { return "", fmt.Errorf("Failed to create message to %s: %w", destination, err) } - //log.Info().Str("dest", destination).Str("sid", *resp.Body).Msg("Text message response") if resp.Sid == nil { log.Warn().Str("src", source).Str("dst", destination).Msg("Text message sid is nil") return "", nil } + log.Info().Str("src", source).Str("dst", destination).Str("message", message).Str("sid", *resp.Sid).Msg("Created text message") return *resp.Sid, nil } diff --git a/llm/log.go b/llm/log.go index 9e305727..984bd88d 100644 --- a/llm/log.go +++ b/llm/log.go @@ -5,13 +5,13 @@ import ( "strings" "github.com/rs/zerolog" - "go.mau.fi/util/exzerolog" + //"go.mau.fi/util/exzerolog" ) type Logger = zerolog.Logger func linkLogger(logger *zerolog.Logger) { - exzerolog.SetupDefaults(logger) + //exzerolog.SetupDefaults(logger) } type ZerologWriter struct { diff --git a/main.go b/main.go index 426ea7c3..3c695f90 100644 --- a/main.go +++ b/main.go @@ -16,7 +16,7 @@ import ( "github.com/Gleipnir-Technology/nidus-sync/config" "github.com/Gleipnir-Technology/nidus-sync/db" "github.com/Gleipnir-Technology/nidus-sync/llm" - "github.com/Gleipnir-Technology/nidus-sync/platform" + "github.com/Gleipnir-Technology/nidus-sync/platform/text" "github.com/Gleipnir-Technology/nidus-sync/public-report" nidussync "github.com/Gleipnir-Technology/nidus-sync/sync" "github.com/go-chi/chi/v5" @@ -48,7 +48,7 @@ func main() { os.Exit(3) } - err = platform.TextStoreSources() + err = text.StoreSources() if err != nil { log.Error().Err(err).Msg("Failed to store text source phone numbers") os.Exit(4) diff --git a/comms/text/db.go b/platform/text/db.go similarity index 100% rename from comms/text/db.go rename to platform/text/db.go diff --git a/comms/text/job.go b/platform/text/job.go similarity index 100% rename from comms/text/job.go rename to platform/text/job.go diff --git a/comms/text/llm.go b/platform/text/llm.go similarity index 100% rename from comms/text/llm.go rename to platform/text/llm.go diff --git a/comms/text/report-subscription.go b/platform/text/report-subscription.go similarity index 53% rename from comms/text/report-subscription.go rename to platform/text/report-subscription.go index 12328338..3fe39e84 100644 --- a/comms/text/report-subscription.go +++ b/platform/text/report-subscription.go @@ -4,8 +4,7 @@ import ( "context" "fmt" - //"github.com/Gleipnir-Technology/nidus-sync/db/enums" - //"github.com/Gleipnir-Technology/nidus-sync/platform" + "github.com/Gleipnir-Technology/nidus-sync/db/enums" "github.com/nyaruka/phonenumbers" //"github.com/rs/zerolog/log" ) @@ -44,32 +43,33 @@ func (j jobReportSubscription) source() string { } func sendReportSubscription(ctx context.Context, job Job) error { - /* - j, ok := job.(jobReportSubscription) - if !ok { - return fmt.Errorf("job is not for report subscription confirmation") - } + j, ok := job.(jobReportSubscription) + if !ok { + return fmt.Errorf("job is not for report subscription confirmation") + } - sub, err := isSubscribed(ctx, job.destination()) + sub, err := isSubscribed(ctx, job.destination()) + if err != nil { + return fmt.Errorf("Failed to check if subscribed: %w", err) + } + if sub == nil { + err = delayMessage(ctx, j.source(), j.destination(), j.content(), enums.CommsTextjobtypeReportConfirmation) if err != nil { - return fmt.Errorf("Failed to check if subscribed: %w", err) + return fmt.Errorf("Failed to delay report subscription message: %w", err) } - if !sub { - err = sendText(ctx, j.source(), j.destination(), j.content(), enums.CommsTextoriginWebsiteAction, false) - if err != nil { - return fmt.Errorf("Failed to send report subscription confirmation: %w", err) - } - } else { - err = delayMessage(ctx, j.source(), j.destination(), j.content(), enums.CommsTextjobtypeReportConfirmation) - if err != nil { - return fmt.Errorf("Failed to delay report subscription message: %w", err) - } - err := ensureInitialText(ctx, j.source(), j.destination()) - if err != nil { - return fmt.Errorf("Failed to ensure initial text has been sent: %w", err) - } + err := ensureInitialText(ctx, j.source(), j.destination()) + if err != nil { + return fmt.Errorf("Failed to ensure initial text has been sent: %w", err) } return nil - */ + } + if *sub { + err = sendText(ctx, j.source(), j.destination(), j.content(), enums.CommsTextoriginWebsiteAction, false, true) + if err != nil { + return fmt.Errorf("Failed to send report subscription confirmation: %w", err) + } + } else { + resendInitialText(ctx, j.source(), j.destination()) + } return nil } diff --git a/platform/text.go b/platform/text/text.go similarity index 87% rename from platform/text.go rename to platform/text/text.go index 8d76da0f..84536653 100644 --- a/platform/text.go +++ b/platform/text/text.go @@ -1,4 +1,4 @@ -package platform +package text import ( "context" @@ -21,6 +21,8 @@ import ( "github.com/rs/zerolog/log" ) +type E164 = phonenumbers.PhoneNumber + func HandleTextMessage(from string, to string, body string) { ctx := context.Background() type_, src := splitPhoneSource(from) @@ -30,7 +32,7 @@ func HandleTextMessage(from string, to string, body string) { return } - _, err = insertTextLog(ctx, body, dst, src, enums.CommsTextoriginCustomer, false) + _, err = insertTextLog(ctx, body, dst, src, enums.CommsTextoriginCustomer, false, true) if err != nil { log.Error().Err(err).Str("dst", dst).Msg("Failed to add text message log") return @@ -51,12 +53,7 @@ func HandleTextMessage(from string, to string, body string) { handleWaitingTextJobs(ctx, src) default: content := "I have to start with either 'YES' or 'STOP' first, Which do you want?" - /*err := insertTextLog(ctx, body, src, dst, enums.CommsTextoriginReiteration, false) - if err != nil { - log.Error().Err(err).Msg("Failed to add reiteration to the text log") - return - }*/ - err = sendText(ctx, dst, src, content, enums.CommsTextoriginReiteration, false) + err = sendText(ctx, dst, src, content, enums.CommsTextoriginReiteration, false, false) if err != nil { log.Error().Err(err).Msg("Failed to resend initial prompt.") } @@ -79,14 +76,7 @@ func HandleTextMessage(from string, to string, body string) { log.Error().Err(err).Str("dst", dst).Str("src", from).Msg("Failed to generate next message") return } - /* - err = insertTextLog(ctx, next_message.Content, src, dst, enums.CommsTextoriginLLM, false) - if err != nil { - log.Error().Err(err).Str("dst", dst).Msg("Failed to insert new text message to the text log") - return - } - */ - err = sendText(ctx, src, dst, next_message.Content, enums.CommsTextoriginLLM, false) + err = sendText(ctx, dst, src, next_message.Content, enums.CommsTextoriginLLM, false, true) if err != nil { log.Error().Err(err).Str("src", src).Str("dst", dst).Str("content", next_message.Content).Msg("Failed to send response text") return @@ -94,7 +84,11 @@ func HandleTextMessage(from string, to string, body string) { log.Info().Str("from", from).Str("from-type", type_).Str("to", to).Str("src", src).Str("dst", dst).Str("body", body).Str("reply", next_message.Content).Msg("Handled text message") } -func TextStoreSources() error { +func ParsePhoneNumber(input string) (*E164, error) { + return phonenumbers.Parse(input, "US") +} + +func StoreSources() error { ctx := context.TODO() src := phonenumbers.Format(&config.PhoneNumberReport, phonenumbers.E164) return ensureInDB(ctx, src) @@ -132,9 +126,32 @@ func delayMessage(ctx context.Context, source string, destination string, conten return nil } +func resendInitialText(ctx context.Context, src string, dst string) error { + phone, err := models.FindCommsPhone(ctx, db.PGInstance.BobDB, dst) + if err != nil { + return fmt.Errorf("Failed to find phone %s: %w", dst, err) + } + err = phone.Update(ctx, db.PGInstance.BobDB, &models.CommsPhoneSetter{ + IsSubscribed: omitnull.FromPtr[bool](nil), + }) + if err != nil { + return fmt.Errorf("Failed to clear subscription on phone %s: %w", dst, err) + } + return nil +} + +func sendInitialText(ctx context.Context, src string, dst string) error { + content := "Welcome to Report Mosquitoes Online. We received your request and want to confirm text updates. Reply YES to continue. Reply STOP at any time to unsubscribe" + origin := enums.CommsTextoriginWebsiteAction + err := sendText(ctx, src, dst, content, origin, true, true) + if err != nil { + return fmt.Errorf("Failed to send initial confirmation: %w", err) + } + return nil +} + func ensureInitialText(ctx context.Context, src string, dst string) error { // - origin := enums.CommsTextoriginWebsiteAction rows, err := models.CommsTextLogs.Query( models.SelectWhere.CommsTextLogs.Destination.EQ(dst), models.SelectWhere.CommsTextLogs.IsWelcome.EQ(true), @@ -145,12 +162,7 @@ func ensureInitialText(ctx context.Context, src string, dst string) error { if len(rows) > 0 { return nil } - content := "Welcome to Report Mosquitoes Online. We received your request and want to confirm text updates. Reply YES to continue. Reply STOP at any time to unsubscribe" - err = sendText(ctx, src, dst, content, origin, true) - if err != nil { - return fmt.Errorf("Failed to send initial confirmation: %w", err) - } - return nil + return sendInitialText(ctx, src, dst) } func ensureInDB(ctx context.Context, destination string) (err error) { @@ -213,14 +225,14 @@ func handleResetConversation(ctx context.Context, src string, dst string) { if err != nil { log.Error().Err(err).Str("src", src).Str("dst", dst).Msg("Failed to wipe memory") content := "Failed to wip memory" - err = sendText(ctx, dst, src, content, enums.CommsTextoriginCommandResponse, false) + err = sendText(ctx, dst, src, content, enums.CommsTextoriginCommandResponse, false, false) if err != nil { log.Error().Err(err).Str("src", src).Str("dst", dst).Msg("Failed to indicated memory wipe failure.") } return } content := "LLM memory wiped" - err = sendText(ctx, dst, src, content, enums.CommsTextoriginCommandResponse, false) + err = sendText(ctx, dst, src, content, enums.CommsTextoriginCommandResponse, false, false) if err != nil { log.Error().Err(err).Str("src", src).Str("dst", dst).Msg("Failed to indicated memory wiped.") return @@ -228,13 +240,13 @@ func handleResetConversation(ctx context.Context, src string, dst string) { log.Info().Err(err).Str("src", src).Str("dst", dst).Msg("Wiped LLM memory") } -func insertTextLog(ctx context.Context, content string, destination string, source string, origin enums.CommsTextorigin, is_welcome bool) (log *models.CommsTextLog, err error) { +func insertTextLog(ctx context.Context, content string, destination string, source string, origin enums.CommsTextorigin, is_welcome bool, is_visible_to_llm bool) (log *models.CommsTextLog, err error) { log, err = models.CommsTextLogs.Insert(&models.CommsTextLogSetter{ //ID: Content: omit.From(content), Created: omit.From(time.Now()), Destination: omit.From(destination), - IsVisibleToLLM: omit.From(true), + IsVisibleToLLM: omit.From(is_visible_to_llm), IsWelcome: omit.From(is_welcome), Origin: omit.From(origin), Source: omit.From(source), @@ -263,7 +275,6 @@ func loadPreviousMessagesForLLM(ctx context.Context, dst, src string) ([]llm.Mes if err != nil { return results, fmt.Errorf("Failed to get message history for %s and %s: %w", dst, src, err) } - log.Info().Int("count", len(messages)).Str("src", src).Str("dst", dst).Msg("Found previous messages") for _, m := range messages { if m.IsVisibleToLLM { is_from_customer := (m.Source == src) @@ -276,12 +287,12 @@ func loadPreviousMessagesForLLM(ctx context.Context, dst, src string) ([]llm.Mes return results, nil } -func sendText(ctx context.Context, source string, destination string, message string, origin enums.CommsTextorigin, is_welcome bool) error { +func sendText(ctx context.Context, source string, destination string, message string, origin enums.CommsTextorigin, is_welcome bool, is_visible_to_llm bool) error { err := ensureInDB(ctx, destination) if err != nil { return fmt.Errorf("Failed to ensure text message destination is in the DB: %w", err) } - log, err := insertTextLog(ctx, message, destination, source, origin, is_welcome) + l, err := insertTextLog(ctx, message, destination, source, origin, is_welcome, is_visible_to_llm) if err != nil { return fmt.Errorf("Failed to insert text message in the DB: %w", err) } @@ -289,10 +300,14 @@ func sendText(ctx context.Context, source string, destination string, message st if err != nil { return fmt.Errorf("Failed to send text message: %w", err) } - err = log.Update(ctx, db.PGInstance.BobDB, &models.CommsTextLogSetter{ + err = l.Update(ctx, db.PGInstance.BobDB, &models.CommsTextLogSetter{ TwilioSid: omitnull.From(sid), TwilioStatus: omit.From("created"), }) + if err != nil { + return fmt.Errorf("Failed to update text Twilio status: %w", err) + } + log.Info().Int32("id", l.ID).Bool("is_visible_to_llm", is_visible_to_llm).Str("message", message).Msg("inserted text log") return nil } diff --git a/public-report/quick.go b/public-report/quick.go index 4feea1a5..708b5953 100644 --- a/public-report/quick.go +++ b/public-report/quick.go @@ -11,7 +11,6 @@ import ( "github.com/Gleipnir-Technology/bob/dialect/psql" "github.com/Gleipnir-Technology/bob/dialect/psql/um" "github.com/Gleipnir-Technology/nidus-sync/background" - "github.com/Gleipnir-Technology/nidus-sync/comms/text" "github.com/Gleipnir-Technology/nidus-sync/config" "github.com/Gleipnir-Technology/nidus-sync/db" "github.com/Gleipnir-Technology/nidus-sync/db/enums" @@ -19,6 +18,7 @@ import ( "github.com/Gleipnir-Technology/nidus-sync/h3utils" "github.com/Gleipnir-Technology/nidus-sync/htmlpage" "github.com/Gleipnir-Technology/nidus-sync/platform" + "github.com/Gleipnir-Technology/nidus-sync/platform/text" "github.com/aarondl/opt/omit" "github.com/aarondl/opt/omitnull" "github.com/rs/zerolog/log"