nidus-sync/platform/text/text.go

389 lines
14 KiB
Go

package text
import (
"context"
"fmt"
"strings"
"time"
"github.com/Gleipnir-Technology/bob/dialect/psql"
"github.com/Gleipnir-Technology/bob/dialect/psql/um"
"github.com/Gleipnir-Technology/nidus-sync/comms/text"
"github.com/Gleipnir-Technology/nidus-sync/config"
"github.com/Gleipnir-Technology/nidus-sync/db"
"github.com/Gleipnir-Technology/nidus-sync/db/enums"
"github.com/Gleipnir-Technology/nidus-sync/db/models"
"github.com/Gleipnir-Technology/nidus-sync/db/sql"
"github.com/Gleipnir-Technology/nidus-sync/llm"
"github.com/aarondl/opt/omit"
"github.com/aarondl/opt/omitnull"
"github.com/nyaruka/phonenumbers"
"github.com/rs/zerolog/log"
)
type E164 = phonenumbers.PhoneNumber
func EnsureInDB(ctx context.Context, destination E164) (err error) {
return ensureInDB(ctx, PhoneString(destination))
}
func HandleTextMessage(src string, dst string, body string) {
ctx := context.Background()
_, err := insertTextLog(ctx, body, dst, src, enums.CommsTextoriginCustomer, false, true)
if err != nil {
log.Error().Err(err).Str("dst", dst).Msg("Failed to add text message log")
return
}
status, err := phoneStatus(ctx, src)
if err != nil {
log.Error().Err(err).Msg("Failed to get phone status")
return
}
body_l := strings.TrimSpace(strings.ToLower(body))
// We don't know if they're subscribed or not.
if status == enums.CommsPhonestatustypeUnconfirmed {
switch body_l {
case "yes":
setPhoneStatus(ctx, src, enums.CommsPhonestatustypeOkToSend)
content := "Thanks, we've confirmed your phone number. You can text STOP at any time if you change your mind"
err := sendText(ctx, dst, src, content, enums.CommsTextoriginCommandResponse, false, false)
if err != nil {
log.Error().Err(err).Msg("Failed to send confirmation response")
}
handleWaitingTextJobs(ctx, src)
default:
content := "I have to start with either 'YES' or 'STOP' first, Which do you want?"
err = sendText(ctx, dst, src, content, enums.CommsTextoriginReiteration, false, false)
if err != nil {
log.Error().Err(err).Msg("Failed to resend initial prompt.")
}
}
return
}
switch body_l {
case "stop":
content := "You have successfully been unsubscribed. You will not receive any more messages from this number. Reply START to resubscribe."
err = sendText(ctx, dst, src, content, enums.CommsTextoriginCommandResponse, false, false)
if err != nil {
log.Error().Err(err).Msg("Failed to send unsubscribe acknowledgement.")
}
setPhoneStatus(ctx, src, enums.CommsPhonestatustypeStopped)
return
case "reset conversation":
handleResetConversation(ctx, src, dst)
return
default:
}
previous_messages, err := loadPreviousMessagesForLLM(ctx, dst, src)
if err != nil {
log.Error().Err(err).Str("dst", dst).Str("src", src).Msg("Failed to get previous messages")
return
}
log.Info().Int("len", len(previous_messages)).Msg("passing")
next_message, err := generateNextMessage(ctx, previous_messages, src)
if err != nil {
log.Error().Err(err).Str("dst", dst).Str("src", src).Msg("Failed to generate next message")
return
}
err = sendText(ctx, dst, src, next_message.Content, enums.CommsTextoriginLLM, false, true)
if err != nil {
log.Error().Err(err).Str("src", src).Str("dst", dst).Str("content", next_message.Content).Msg("Failed to send response text")
return
}
log.Info().Str("src", src).Str("dst", dst).Str("body", body).Str("reply", next_message.Content).Msg("Handled text message")
}
func ParsePhoneNumber(input string) (*E164, error) {
return phonenumbers.Parse(input, "US")
}
func PhoneString(p E164) string {
return phonenumbers.Format(&p, phonenumbers.E164)
}
func StoreSources() error {
ctx := context.TODO()
for _, n := range []string{config.PhoneNumberReportStr, config.PhoneNumberSupportStr, config.VoipMSNumber} {
var err error
// Deal with Voip.ms not expecting API calls with the prefixed +1
if !strings.HasPrefix(n, "+1") {
dest, err := ParsePhoneNumber("+1" + n)
if err != nil {
return fmt.Errorf("Failed to parse +1'%s' as phone number: %w", n, err)
}
err = EnsureInDB(ctx, *dest)
} else {
dest, err := ParsePhoneNumber(n)
if err != nil {
return fmt.Errorf("Failed to parse '%s' as phone number: %w", n, err)
}
err = EnsureInDB(ctx, *dest)
}
if err != nil {
return fmt.Errorf("Failed to add number '%s' to DB: %w", n, err)
}
}
return nil
}
func UpdateMessageStatus(twilio_sid string, status string) {
ctx := context.TODO()
l, err := models.CommsTextLogs.Query(
models.SelectWhere.CommsTextLogs.TwilioSid.EQ(twilio_sid),
).One(ctx, db.PGInstance.BobDB)
if err != nil {
log.Error().Err(err).Str("twilio_sid", twilio_sid).Str("status", status).Msg("Failed to update message status query failed")
return
}
err = l.Update(ctx, db.PGInstance.BobDB, &models.CommsTextLogSetter{
TwilioStatus: omit.From(status),
})
if err != nil {
log.Error().Err(err).Str("twilio_sid", twilio_sid).Str("status", status).Msg("Failed to update message status update failed")
return
}
}
func delayMessage(ctx context.Context, source enums.CommsTextjobsource, destination string, content string, type_ enums.CommsTextjobtype) error {
job, err := models.CommsTextJobs.Insert(&models.CommsTextJobSetter{
Content: omit.From(content),
Created: omit.From(time.Now()),
Destination: omit.From(destination),
//ID:
Source: omit.From(source),
Type: omit.From(type_),
}).One(ctx, db.PGInstance.BobDB)
if err != nil {
return fmt.Errorf("Failed to add delayed text job: %w", err)
}
log.Info().Int32("id", job.ID).Msg("Created delayed text job")
return nil
}
func resendInitialText(ctx context.Context, src string, dst string) error {
phone, err := models.FindCommsPhone(ctx, db.PGInstance.BobDB, dst)
if err != nil {
return fmt.Errorf("Failed to find phone %s: %w", dst, err)
}
err = phone.Update(ctx, db.PGInstance.BobDB, &models.CommsPhoneSetter{
Status: omit.From(enums.CommsPhonestatustypeUnconfirmed),
})
if err != nil {
return fmt.Errorf("Failed to clear subscription on phone %s: %w", dst, err)
}
return nil
}
func sendInitialText(ctx context.Context, src string, dst string) error {
content := "Welcome to Report Mosquitoes Online. We received your request and want to confirm text updates. Reply YES to continue. Reply STOP at any time to unsubscribe"
origin := enums.CommsTextoriginWebsiteAction
err := sendText(ctx, src, dst, content, origin, true, true)
if err != nil {
return fmt.Errorf("Failed to send initial confirmation: %w", err)
}
return nil
}
func ensureInDB(ctx context.Context, destination string) (err error) {
_, err = models.FindCommsPhone(ctx, db.PGInstance.BobDB, destination)
if err != nil {
// doesn't exist
if err.Error() == "sql: no rows in result set" {
_, err = models.CommsPhones.Insert(&models.CommsPhoneSetter{
E164: omit.From(destination),
IsSubscribed: omit.From(false),
Status: omit.From(enums.CommsPhonestatustypeUnconfirmed),
}).One(ctx, db.PGInstance.BobDB)
if err != nil {
return fmt.Errorf("Failed to insert new phone contact: %w", err)
}
log.Info().Str("phone", destination).Msg("Added text to the comms database")
return nil
}
return fmt.Errorf("Unexpected error searching for phone contact: %w", err)
}
return nil
}
func ensureInitialText(ctx context.Context, src string, dst string) error {
//
rows, err := models.CommsTextLogs.Query(
models.SelectWhere.CommsTextLogs.Destination.EQ(dst),
models.SelectWhere.CommsTextLogs.IsWelcome.EQ(true),
).All(ctx, db.PGInstance.BobDB)
if err != nil {
return fmt.Errorf("Failed to query text logs: %w", err)
}
if len(rows) > 0 {
return nil
}
return sendInitialText(ctx, src, dst)
}
func generateNextMessage(ctx context.Context, history []llm.Message, customer_phone string) (llm.Message, error) {
_handle_report_status := func() (string, error) {
return "Report: ABCD-1234-5678, District: Delta MVCD, Status: scheduled, Appointment: Wednesday 3:30pm", nil
}
_handle_contact_district := func(reason string) {
log.Warn().Str("reason", reason).Msg("Contacting district")
}
_handle_contact_supervisor := func(reason string) {
log.Warn().Str("reason", reason).Msg("Contacting supervisor")
}
return llm.GenerateNextMessage(ctx, history, _handle_report_status, _handle_contact_district, _handle_contact_supervisor)
}
func handleWaitingTextJobs(ctx context.Context, src string) {
jobs, err := models.CommsTextJobs.Query(
models.SelectWhere.CommsTextJobs.Destination.EQ(src),
models.SelectWhere.CommsTextJobs.Completed.IsNull(),
).All(ctx, db.PGInstance.BobDB)
if err != nil {
log.Error().Err(err).Msg("Failed to query for jobs")
return
}
for _, job := range jobs {
var src string
switch job.Source {
case enums.CommsTextjobsourceRmo:
src = config.PhoneNumberReportStr
//case enums.CommsTextJobsourcenidus:
//src := config.PhoneNumebrNidusStr
default:
log.Error().Str("source", job.Source.String()).Msg("Can't support background text job.")
}
err = sendText(ctx, src, job.Destination, job.Content, enums.CommsTextoriginWebsiteAction, false, true)
if err != nil {
log.Error().Err(err).Int32("id", job.ID).Msg("Failed to send delayed text job.")
continue
}
err := job.Update(ctx, db.PGInstance.BobDB, &models.CommsTextJobSetter{
Completed: omitnull.From(time.Now()),
})
if err != nil {
log.Error().Err(err).Int32("id", job.ID).Msg("Failed to update delayed text job.")
continue
}
}
}
func handleResetConversation(ctx context.Context, src string, dst string) {
err := wipeLLMMemory(ctx, src, dst)
if err != nil {
log.Error().Err(err).Str("src", src).Str("dst", dst).Msg("Failed to wipe memory")
content := "Failed to wip memory"
err = sendText(ctx, dst, src, content, enums.CommsTextoriginCommandResponse, false, false)
if err != nil {
log.Error().Err(err).Str("src", src).Str("dst", dst).Msg("Failed to indicated memory wipe failure.")
}
return
}
content := "LLM memory wiped"
err = sendText(ctx, dst, src, content, enums.CommsTextoriginCommandResponse, false, false)
if err != nil {
log.Error().Err(err).Str("src", src).Str("dst", dst).Msg("Failed to indicated memory wiped.")
return
}
log.Info().Err(err).Str("src", src).Str("dst", dst).Msg("Wiped LLM memory")
}
func insertTextLog(ctx context.Context, content string, destination string, source string, origin enums.CommsTextorigin, is_welcome bool, is_visible_to_llm bool) (log *models.CommsTextLog, err error) {
log, err = models.CommsTextLogs.Insert(&models.CommsTextLogSetter{
//ID:
Content: omit.From(content),
Created: omit.From(time.Now()),
Destination: omit.From(destination),
IsVisibleToLLM: omit.From(is_visible_to_llm),
IsWelcome: omit.From(is_welcome),
Origin: omit.From(origin),
Source: omit.From(source),
TwilioSid: omitnull.FromPtr[string](nil),
TwilioStatus: omit.From(""),
}).One(ctx, db.PGInstance.BobDB)
return log, err
}
func phoneStatus(ctx context.Context, src string) (enums.CommsPhonestatustype, error) {
phone, err := models.FindCommsPhone(ctx, db.PGInstance.BobDB, src)
if err != nil {
return enums.CommsPhonestatustypeUnconfirmed, fmt.Errorf("Failed to determine if '%s' is subscribed: %w", src, err)
}
return phone.Status, nil
}
func loadPreviousMessagesForLLM(ctx context.Context, dst, src string) ([]llm.Message, error) {
messages, err := sql.TextsBySenders(dst, src).All(ctx, db.PGInstance.BobDB)
results := make([]llm.Message, 0)
if err != nil {
return results, fmt.Errorf("Failed to get message history for %s and %s: %w", dst, src, err)
}
for _, m := range messages {
if m.IsVisibleToLLM {
is_from_customer := (m.Source == src)
results = append(results, llm.Message{
IsFromCustomer: is_from_customer,
Content: m.Content,
})
}
}
return results, nil
}
func sendText(ctx context.Context, source string, destination string, message string, origin enums.CommsTextorigin, is_welcome bool, is_visible_to_llm bool) error {
err := ensureInDB(ctx, destination)
if err != nil {
return fmt.Errorf("Failed to ensure text message destination is in the DB: %w", err)
}
l, err := insertTextLog(ctx, message, destination, source, origin, is_welcome, is_visible_to_llm)
if err != nil {
return fmt.Errorf("Failed to insert text message in the DB: %w", err)
}
sid, err := text.SendText(ctx, source, destination, message)
if err != nil {
return fmt.Errorf("Failed to send text message: %w", err)
}
err = l.Update(ctx, db.PGInstance.BobDB, &models.CommsTextLogSetter{
TwilioSid: omitnull.From(sid),
TwilioStatus: omit.From("created"),
})
if err != nil {
return fmt.Errorf("Failed to update text Twilio status: %w", err)
}
log.Info().Int32("id", l.ID).Bool("is_visible_to_llm", is_visible_to_llm).Str("message", message).Msg("inserted text log")
return nil
}
func setPhoneStatus(ctx context.Context, src string, status enums.CommsPhonestatustype) error {
phone, err := models.FindCommsPhone(ctx, db.PGInstance.BobDB, src)
if err != nil {
return fmt.Errorf("Failed to determine if '%s' is subscribed: %w", src, err)
}
phone.Update(ctx, db.PGInstance.BobDB, &models.CommsPhoneSetter{
Status: omit.From(status),
})
log.Info().Str("src", src).Str("status", string(status)).Msg("Set number subscribed")
return nil
}
func wipeLLMMemory(ctx context.Context, src string, dst string) error {
rows, err := sql.TextsBySenders(dst, src).All(ctx, db.PGInstance.BobDB)
if err != nil {
return fmt.Errorf("Failed to query for texts: %w", err)
}
ids := make([]int32, 0)
for _, r := range rows {
ids = append(ids, r.ID)
}
_, err = models.CommsTextLogs.Update(
um.Where(
models.CommsTextLogs.Columns.ID.EQ(psql.Any(ids)),
),
um.SetCol("is_visible_to_llm").ToArg(false),
).Exec(ctx, db.PGInstance.BobDB)
if err != nil {
return fmt.Errorf("Failed to update texts: %w", err)
}
return nil
}