diff --git a/api/twilio.go b/api/twilio.go index 7323eb69..9a252e80 100644 --- a/api/twilio.go +++ b/api/twilio.go @@ -4,6 +4,7 @@ import ( "fmt" "net/http" + "github.com/Gleipnir-Technology/nidus-sync/platform" "github.com/rs/zerolog/log" "github.com/twilio/twilio-go/twiml" ) @@ -39,11 +40,9 @@ func twilioTextPost(w http.ResponseWriter, r *http.Request) { to_zip := r.PostFormValue("ToZip") to_country := r.PostFormValue("ToCountry") log.Info().Str("message_sid", message_sid).Str("account_sid", account_sid).Str("messaging_service_sid", messaging_service_sid).Str("from", from).Str("to_", to_).Str("body", body).Str("num_media", num_media).Str("num_segments", num_segments).Str("media_content_type0", media_content_type0).Str("media_url0", media_url0).Str("from_city", from_city).Str("from_state", from_state).Str("from_zip", from_zip).Str("from_country", from_country).Str("to_city", to_city).Str("to_state", to_state).Str("to_zip", to_zip).Str("to_country", to_country).Msg("got text") - twiml, _ := twiml.Messages([]twiml.Element{ - &twiml.MessagingMessage{ - Body: "Hey there.", - }, - }) + + twiml, _ := twiml.Messages([]twiml.Element{}) + go platform.HandleTextMessage(from, to_, body) w.Header().Set("Content-Type", "text/xml") fmt.Fprintf(w, "%s", twiml) } diff --git a/comms/text/db.go b/comms/text/db.go index 1795a061..9801e1ca 100644 --- a/comms/text/db.go +++ b/comms/text/db.go @@ -68,12 +68,13 @@ func ensureInDB(ctx context.Context, destination string) (err error) { return nil } -func insertTextLog(ctx context.Context, content string, destination string, source string, origin enums.CommsTextorigin) (err error) { +func insertTextLog(ctx context.Context, content string, destination string, source string, origin enums.CommsTextorigin, is_welcome bool) (err error) { _, err = models.CommsTextLogs.Insert(&models.CommsTextLogSetter{ //ID: Content: omit.From(content), Created: omit.From(time.Now()), Destination: omit.From(destination), + IsWelcome: omit.From(is_welcome), Origin: omit.From(origin), Source: omit.From(source), }).One(ctx, db.PGInstance.BobDB) diff --git a/comms/text/initial.go b/comms/text/initial.go index d2bf35a5..0c25a62c 100644 --- a/comms/text/initial.go +++ b/comms/text/initial.go @@ -23,9 +23,15 @@ func ensureInitialText(ctx context.Context, src string, dst string) error { return nil } content := "Welcome to Report Mosquitoes Online. We received your request and want to confirm text updates. Reply YES to continue. Reply STOP at any time to unsubscribe" - err = sendText(ctx, src, dst, content, origin) + err = sendText(ctx, src, dst, content, origin, true) if err != nil { return fmt.Errorf("Failed to send initial confirmation: %w", err) } return nil } + +func SendInitialReprompt(ctx context.Context, src string, dst string) error { + content := "I have to start with either 'YES' or 'STOP' first, Which do you want?" + err := sendText(ctx, src, dst, content, enums.CommsTextoriginLLM, false) + return err +} diff --git a/comms/text/llm.go b/comms/text/llm.go new file mode 100644 index 00000000..efd664fa --- /dev/null +++ b/comms/text/llm.go @@ -0,0 +1,9 @@ +package text + +import ( + "github.com/rs/zerolog/log" +) + +func SendTextFromLLM(content string) { + log.Info().Str("content", content).Msg("Pretend I sent a message") +} diff --git a/comms/text/report-subscription.go b/comms/text/report-subscription.go index 092b9ee2..c43f6b89 100644 --- a/comms/text/report-subscription.go +++ b/comms/text/report-subscription.go @@ -53,7 +53,7 @@ func sendReportSubscription(ctx context.Context, job Job) error { return fmt.Errorf("Failed to check if subscribed: %w", err) } if !sub { - err = sendText(ctx, j.source(), j.destination(), j.content(), enums.CommsTextoriginWebsiteAction) + err = sendText(ctx, j.source(), j.destination(), j.content(), enums.CommsTextoriginWebsiteAction, false) if err != nil { return fmt.Errorf("Failed to send report subscription confirmation: %w", err) } diff --git a/comms/text/text.go b/comms/text/text.go index 415a301d..8384405f 100644 --- a/comms/text/text.go +++ b/comms/text/text.go @@ -19,12 +19,12 @@ func ParsePhoneNumber(input string) (*E164, error) { return phonenumbers.Parse(input, "US") } -func sendText(ctx context.Context, source string, destination string, message string, origin enums.CommsTextorigin) error { +func sendText(ctx context.Context, source string, destination string, message string, origin enums.CommsTextorigin, is_welcome bool) error { err := ensureInDB(ctx, destination) if err != nil { return fmt.Errorf("Failed to ensure text message destination is in the DB: %w", err) } - err = insertTextLog(ctx, message, destination, source, origin) + err = insertTextLog(ctx, message, destination, source, origin, is_welcome) if err != nil { return fmt.Errorf("Failed to insert text message in the DB: %w", err) } diff --git a/config/config.go b/config/config.go index 09b356f2..9b7461ab 100644 --- a/config/config.go +++ b/config/config.go @@ -27,9 +27,11 @@ var ( MapboxToken string PGDSN string PhoneNumberReport phonenumbers.PhoneNumber + PhoneNumberReportStr string TwilioAuthToken string TwilioAccountSID string TwilioMessagingServiceSID string + TwilioRCSSenderRMO string ) func IsProductionEnvironment() bool { @@ -127,13 +129,13 @@ func Parse() (err error) { if PGDSN == "" { return fmt.Errorf("You must specify a non-empty POSTGRES_DSN") } - rmo_phone_number := os.Getenv("RMO_PHONE_NUMBER") - if rmo_phone_number == "" { + PhoneNumberReportStr = os.Getenv("RMO_PHONE_NUMBER") + if PhoneNumberReportStr == "" { return fmt.Errorf("You must specify a non-empty RMO_PHONE_NUMBER") } - p, err := phonenumbers.Parse(rmo_phone_number, "US") + p, err := phonenumbers.Parse(PhoneNumberReportStr, "US") if err != nil { - return fmt.Errorf("Failed to parse '%s' as a valid phone number: %w", rmo_phone_number, err) + return fmt.Errorf("Failed to parse '%s' as a valid phone number: %w", PhoneNumberReportStr, err) } PhoneNumberReport = *p @@ -149,6 +151,10 @@ func Parse() (err error) { if TwilioMessagingServiceSID == "" { return fmt.Errorf("You must specify a non-empty TWILIO_MESSAGING_SERVICE_SID") } + TwilioRCSSenderRMO = os.Getenv("TWILIO_RCS_SENDER_RMO") + if TwilioRCSSenderRMO == "" { + return fmt.Errorf("You must specify a non-empty TWILIO_RCS_SENDER_RMO") + } return nil } diff --git a/db/sql/texts_by_senders.bob.go b/db/sql/texts_by_senders.bob.go new file mode 100644 index 00000000..3a8ce5bb --- /dev/null +++ b/db/sql/texts_by_senders.bob.go @@ -0,0 +1,135 @@ +// Code generated by BobGen psql v0.42.1. DO NOT EDIT. +// This file is meant to be re-generated in place and/or deleted at any time. + +package sql + +import ( + "context" + _ "embed" + "io" + "iter" + "time" + + enums "github.com/Gleipnir-Technology/nidus-sync/db/enums" + "github.com/stephenafamo/bob" + "github.com/stephenafamo/bob/dialect/psql" + "github.com/stephenafamo/bob/dialect/psql/dialect" + "github.com/stephenafamo/bob/orm" + "github.com/stephenafamo/scan" +) + +//go:embed texts_by_senders.bob.sql +var formattedQueries_texts_by_senders string + +var textsBySendersSQL = formattedQueries_texts_by_senders[152:393] + +type TextsBySendersQuery = orm.ModQuery[*dialect.SelectQuery, textsBySenders, TextsBySendersRow, []TextsBySendersRow, textsBySendersTransformer] + +func TextsBySenders(Destination string, Source string) *TextsBySendersQuery { + var expressionTypArgs textsBySenders + + expressionTypArgs.Destination = psql.Arg(Destination) + expressionTypArgs.Source = psql.Arg(Source) + + return &TextsBySendersQuery{ + Query: orm.Query[textsBySenders, TextsBySendersRow, []TextsBySendersRow, textsBySendersTransformer]{ + ExecQuery: orm.ExecQuery[textsBySenders]{ + BaseQuery: bob.BaseQuery[textsBySenders]{ + Expression: expressionTypArgs, + Dialect: dialect.Dialect, + QueryType: bob.QueryTypeSelect, + }, + }, + Scanner: func(context.Context, []string) (func(*scan.Row) (any, error), func(any) (TextsBySendersRow, error)) { + return func(row *scan.Row) (any, error) { + var t TextsBySendersRow + row.ScheduleScanByIndex(0, &t.ID) + row.ScheduleScanByIndex(1, &t.Content) + row.ScheduleScanByIndex(2, &t.Created) + row.ScheduleScanByIndex(3, &t.Source) + row.ScheduleScanByIndex(4, &t.Destination) + row.ScheduleScanByIndex(5, &t.IsWelcome) + row.ScheduleScanByIndex(6, &t.Origin) + return &t, nil + }, func(v any) (TextsBySendersRow, error) { + return *(v.(*TextsBySendersRow)), nil + } + }, + }, + Mod: bob.ModFunc[*dialect.SelectQuery](func(q *dialect.SelectQuery) { + q.AppendSelect(expressionTypArgs.subExpr(12, 97)) + q.SetTable(expressionTypArgs.subExpr(108, 122)) + q.AppendWhere(expressionTypArgs.subExpr(135, 214)) + q.CombinedOrder.AppendOrder(expressionTypArgs.subExpr(230, 241)) + }), + } +} + +type TextsBySendersRow = struct { + ID int32 `db:"id"` + Content string `db:"content"` + Created time.Time `db:"created"` + Source string `db:"source"` + Destination string `db:"destination"` + IsWelcome bool `db:"is_welcome"` + Origin enums.CommsTextorigin `db:"origin"` +} + +type textsBySendersTransformer = bob.SliceTransformer[TextsBySendersRow, []TextsBySendersRow] + +type textsBySenders struct { + Destination bob.Expression + Source bob.Expression +} + +func (o textsBySenders) args() iter.Seq[orm.ArgWithPosition] { + return func(yield func(arg orm.ArgWithPosition) bool) { + if !yield(orm.ArgWithPosition{ + Name: "destination", + Start: 144, + Stop: 146, + Expression: o.Destination, + }) { + return + } + + if !yield(orm.ArgWithPosition{ + Name: "source", + Start: 165, + Stop: 167, + Expression: o.Source, + }) { + return + } + + if !yield(orm.ArgWithPosition{ + Name: "source", + Start: 191, + Stop: 193, + Expression: o.Source, + }) { + return + } + + if !yield(orm.ArgWithPosition{ + Name: "destination", + Start: 212, + Stop: 214, + Expression: o.Destination, + }) { + return + } + } +} + +func (o textsBySenders) raw(from, to int) string { + return textsBySendersSQL[from:to] +} + +func (o textsBySenders) subExpr(from, to int) bob.Expression { + return orm.ArgsToExpression(textsBySendersSQL, from, to, o.args()) +} + +func (o textsBySenders) WriteSQL(ctx context.Context, w io.StringWriter, d bob.Dialect, start int) ([]any, error) { + return o.subExpr(0, len(textsBySendersSQL)).WriteSQL(ctx, w, d, start) +} diff --git a/db/sql/texts_by_senders.bob.sql b/db/sql/texts_by_senders.bob.sql new file mode 100644 index 00000000..fbe09f0c --- /dev/null +++ b/db/sql/texts_by_senders.bob.sql @@ -0,0 +1,20 @@ +-- Code generated by BobGen psql v0.42.1. DO NOT EDIT. +-- This file is meant to be re-generated in place and/or deleted at any time. + +-- TextsBySenders +SELECT + id, + content, + created, + source, + destination, + is_welcome, + origin +FROM + comms.text_log +WHERE + (source = $1 AND destination = $2) + OR + (source = $3 AND destination = $4) +ORDER BY + created ASC; diff --git a/db/sql/texts_by_senders.sql b/db/sql/texts_by_senders.sql new file mode 100644 index 00000000..80fabedf --- /dev/null +++ b/db/sql/texts_by_senders.sql @@ -0,0 +1,17 @@ +-- TextsBySenders +SELECT + id, + content, + created, + source, + destination, + is_welcome, + origin +FROM + comms.text_log +WHERE + (source = $1 AND destination = $2) + OR + (source = $2 AND destination = $1) +ORDER BY + created ASC; diff --git a/go.mod b/go.mod index 00c6eb34..fdc15bd7 100644 --- a/go.mod +++ b/go.mod @@ -29,23 +29,31 @@ require ( github.com/tidwall/geojson v1.4.5 github.com/twilio/twilio-go v1.29.1 github.com/uber/h3-go/v4 v4.4.0 - golang.org/x/crypto v0.42.0 + golang.org/x/crypto v0.47.0 ) require ( github.com/ajg/form v1.5.1 // indirect + github.com/andybalholm/brotli v1.2.0 // indirect + github.com/bahlo/generic-list-go v0.2.0 // indirect github.com/beevik/etree v1.1.0 // indirect + github.com/buger/jsonparser v1.1.1 // indirect github.com/dustin/go-humanize v1.0.1 // indirect github.com/go-ini/ini v1.67.0 // indirect github.com/golang-jwt/jwt/v5 v5.2.2 // indirect github.com/golang/mock v1.6.0 // indirect + github.com/invopop/jsonschema v0.13.0 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect github.com/jackc/puddle/v2 v2.2.2 // indirect - github.com/klauspost/compress v1.18.0 // indirect + github.com/klauspost/compress v1.18.2 // indirect github.com/klauspost/cpuid/v2 v2.2.11 // indirect github.com/klauspost/crc32 v1.3.0 // indirect - github.com/mattn/go-colorable v0.1.13 // indirect + github.com/mailru/easyjson v0.9.1 // indirect + github.com/maruel/genai v0.0.0-20251221000642-77279d1194c1 // indirect + github.com/maruel/httpjson v0.5.0 // indirect + github.com/maruel/roundtrippers v0.5.0 // indirect + github.com/mattn/go-colorable v0.1.14 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/mfridman/interpolate v0.0.2 // indirect github.com/minio/crc64nvme v1.1.0 // indirect @@ -62,12 +70,14 @@ require ( github.com/tidwall/rtree v1.3.1 // indirect github.com/tidwall/sjson v1.2.4 // indirect github.com/tinylib/msgp v1.3.0 // indirect + github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect + go.mau.fi/util v0.9.5 // indirect go.uber.org/multierr v1.11.0 // indirect - golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b // indirect - golang.org/x/net v0.43.0 // indirect - golang.org/x/sync v0.17.0 // indirect - golang.org/x/sys v0.36.0 // indirect - golang.org/x/text v0.29.0 // indirect + golang.org/x/exp v0.0.0-20260112195511-716be5621a96 // indirect + golang.org/x/net v0.49.0 // indirect + golang.org/x/sync v0.19.0 // indirect + golang.org/x/sys v0.40.0 // indirect + golang.org/x/text v0.33.0 // indirect google.golang.org/protobuf v1.36.11 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 18176087..f17eabfd 100644 --- a/go.sum +++ b/go.sum @@ -18,8 +18,14 @@ github.com/alexedwards/scs/v2 v2.9.0 h1:xa05mVpwTBm1iLeTMNFfAWpKUm4fXAW7CeAViqBV github.com/alexedwards/scs/v2 v2.9.0/go.mod h1:ToaROZxyKukJKT/xLcVQAChi5k6+Pn1Gvmdl7h3RRj8= github.com/alitto/pond/v2 v2.5.0 h1:vPzS5GnvSDRhWQidmj2djHllOmjFExVFbDGCw1jdqDw= github.com/alitto/pond/v2 v2.5.0/go.mod h1:xkjYEgQ05RSpWdfSd1nM3OVv7TBhLdy7rMp3+2Nq+yE= +github.com/andybalholm/brotli v1.2.0 h1:ukwgCxwYrmACq68yiUqwIWnGY0cTPox/M94sVwToPjQ= +github.com/andybalholm/brotli v1.2.0/go.mod h1:rzTDkvFWvIrjDXZHkuS16NPggd91W3kUSvPlQ1pLaKY= +github.com/bahlo/generic-list-go v0.2.0 h1:5sz/EEAK+ls5wF+NeqDpk5+iNdMDXrh3z3nPnH1Wvgk= +github.com/bahlo/generic-list-go v0.2.0/go.mod h1:2KvAjgMlE5NNynlg/5iLrrCCZ2+5xWbdbCW3pNTGyYg= github.com/beevik/etree v1.1.0 h1:T0xke/WvNtMoCqgzPhkX2r4rjY3GDZFi+FjpRZY2Jbs= github.com/beevik/etree v1.1.0/go.mod h1:r8Aw8JqVegEf0w2fDnATrX9VpkMcyFeM0FhwO62wh+A= +github.com/buger/jsonparser v1.1.1 h1:2PnMjfWD7wBILjqQbt530v576A/cAbQvEW9gGIpYMUs= +github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0= github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/containerd/errdefs v1.0.0 h1:tg5yIfIlQIrxYtu9ajqY42W3lpS19XqdxRQeEwYG8PI= @@ -79,6 +85,8 @@ github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/invopop/jsonschema v0.13.0 h1:KvpoAJWEjR3uD9Kbm2HWJmqsEaHt8lBUpd0qHcIi21E= +github.com/invopop/jsonschema v0.13.0/go.mod h1:ffZ5Km5SWWRAIN6wbDXItl95euhFz2uON45H2qjYt+0= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= @@ -94,6 +102,8 @@ github.com/jaswdr/faker/v2 v2.8.1 h1:2AcPgHDBXYQregFUH9LgVZKfFupc4SIquYhp29sf5wQ github.com/jaswdr/faker/v2 v2.8.1/go.mod h1:jZq+qzNQr8/P+5fHd9t3txe2GNPnthrTfohtnJ7B+68= github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= +github.com/klauspost/compress v1.18.2 h1:iiPHWW0YrcFgpBYhsA6D1+fqHssJscY/Tm/y2Uqnapk= +github.com/klauspost/compress v1.18.2/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4= github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.2.11 h1:0OwqZRYI2rFrjS4kvkDnqJkKHdHaRnCm68/DY4OxRzU= github.com/klauspost/cpuid/v2 v2.2.11/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0= @@ -115,8 +125,18 @@ github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I= github.com/magiconair/properties v1.8.10 h1:s31yESBquKXCV9a/ScB3ESkOjUYYv+X0rg8SYxI99mE= github.com/magiconair/properties v1.8.10/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0= +github.com/mailru/easyjson v0.9.1 h1:LbtsOm5WAswyWbvTEOqhypdPeZzHavpZx96/n553mR8= +github.com/mailru/easyjson v0.9.1/go.mod h1:1+xMtQp2MRNVL/V1bOzuP3aP8VNwRW55fQUto+XFtTU= +github.com/maruel/genai v0.0.0-20251221000642-77279d1194c1 h1:zBQI2oPYBnnho8AndgaD7ib8djNeTJZckwMOUGAqrn0= +github.com/maruel/genai v0.0.0-20251221000642-77279d1194c1/go.mod h1:5umBYxgRJHAjfc++Gto7w1Hnys5WHHHjwtkfiky5MSg= +github.com/maruel/httpjson v0.5.0 h1:fUkECNt2G2rSi9rzklMVcElsiucUj8LoKhKqaUvlaYA= +github.com/maruel/httpjson v0.5.0/go.mod h1:Rbue+VwOe1TC6doGXddW8EWg2fW4Je6RhCo7iPuNpTo= +github.com/maruel/roundtrippers v0.5.0 h1:0ot2VEWg2KbrHMh67/ysw5P9HQBhMdST4QZfR7QKFBo= +github.com/maruel/roundtrippers v0.5.0/go.mod h1:By9wgqtmfQEs7hQmz7m8N2jr2m8VDPXNIRxOtK/042U= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= +github.com/mattn/go-colorable v0.1.14 h1:9A9LHSqF/7dyVVX6g0U9cwm9pG3kP9gSzcuIPHPsaIE= +github.com/mattn/go-colorable v0.1.14/go.mod h1:6LmQG8QLFO4G5z1gPvYEzlUgJ2wF+stgPZH1UqBm1s8= github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= @@ -245,10 +265,14 @@ github.com/wasilibs/go-pgquery v0.0.0-20250409022910-10ac41983c07 h1:mJdDDPblDfP github.com/wasilibs/go-pgquery v0.0.0-20250409022910-10ac41983c07/go.mod h1:Ak17IJ037caFp4jpCw/iQQ7/W74Sqpb1YuKJU6HTKfM= github.com/wasilibs/wazero-helpers v0.0.0-20240620070341-3dff1577cd52 h1:OvLBa8SqJnZ6P+mjlzc2K7PM22rRUPE1x32G9DTPrC4= github.com/wasilibs/wazero-helpers v0.0.0-20240620070341-3dff1577cd52/go.mod h1:jMeV4Vpbi8osrE/pKUxRZkVaA0EX7NZN0A9/oRzgpgY= +github.com/wk8/go-ordered-map/v2 v2.1.8 h1:5h/BUHu93oj4gIdvHHHGsScSTMijfx5PeYkE/fJgbpc= +github.com/wk8/go-ordered-map/v2 v2.1.8/go.mod h1:5nJHM5DyteebpVlHnWMV0rPz6Zp7+xBAnxjb1X5vnTw= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0= github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= +go.mau.fi/util v0.9.5 h1:7AoWPCIZJGv4jvtFEuCe3GhAbI7uF9ckIooaXvwlIR4= +go.mau.fi/util v0.9.5/go.mod h1:g1uvZ03VQhtTt2BgaRGVytS/Zj67NV0YNIECch0sQCQ= go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 h1:jq9TW8u3so/bN+JPT166wjOI6/vQPF6Xe7nMNIltagk= @@ -267,8 +291,12 @@ golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5y golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= golang.org/x/crypto v0.42.0 h1:chiH31gIWm57EkTXpwnqf8qeuMUi0yekh6mT2AvFlqI= golang.org/x/crypto v0.42.0/go.mod h1:4+rDnOTJhQCx2q7/j6rAN5XDw8kPjeaXEUR2eL94ix8= +golang.org/x/crypto v0.47.0 h1:V6e3FRj+n4dbpw86FJ8Fv7XVOql7TEwpHapKoMJ/GO8= +golang.org/x/crypto v0.47.0/go.mod h1:ff3Y9VzzKbwSSEzWqJsJVBnWmRwRSHt/6Op5n9bQc4A= golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b h1:M2rDM6z3Fhozi9O7NWsxAkg/yqS/lQJ6PmkyIV3YP+o= golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b/go.mod h1:3//PLf8L/X+8b4vuAfHzxeRUl04Adcb341+IGKfnqS8= +golang.org/x/exp v0.0.0-20260112195511-716be5621a96 h1:Z/6YuSHTLOHfNFdb8zVZomZr7cqNgTJvA8+Qz75D8gU= +golang.org/x/exp v0.0.0-20260112195511-716be5621a96/go.mod h1:nzimsREAkjBCIEFtHiYkrJyT+2uy9YZJB7H1k68CXZU= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= @@ -281,12 +309,16 @@ golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/net v0.43.0 h1:lat02VYK2j4aLzMzecihNvTlJNQUq316m2Mr9rnM6YE= golang.org/x/net v0.43.0/go.mod h1:vhO1fvI4dGsIjh73sWfUVjj3N7CA9WkKJNQm2svM6Jg= +golang.org/x/net v0.49.0 h1:eeHFmOGUTtaaPSGNmjBKpbng9MulQsJURQUAfUwY++o= +golang.org/x/net v0.49.0/go.mod h1:/ysNB2EvaqvesRkuLAyjI1ycPZlQHM3q01F02UY/MV8= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug= golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= +golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4= +golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -303,6 +335,10 @@ golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.36.0 h1:KVRy2GtZBrk1cBYA7MKu5bEZFxQk4NIDV6RLVcC8o0k= golang.org/x/sys v0.36.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/sys v0.39.0 h1:CvCKL8MeisomCi6qNZ+wbb0DN9E5AATixKsvNtMoMFk= +golang.org/x/sys v0.39.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/sys v0.40.0 h1:DBZZqJ2Rkml6QMQsZywtnjnnGvHza6BTfYFWY9kjEWQ= +golang.org/x/sys v0.40.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= @@ -316,6 +352,8 @@ golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/text v0.29.0 h1:1neNs90w9YzJ9BocxfsQNHKuAT4pkghyXc4nhZ6sJvk= golang.org/x/text v0.29.0/go.mod h1:7MhJOA9CD2qZyOKYazxdYMF85OwPdEr9jTtBpO7ydH4= +golang.org/x/text v0.33.0 h1:B3njUFyqtHDUI5jMn1YIr5B0IE2U0qck04r6d4KPAxE= +golang.org/x/text v0.33.0/go.mod h1:LuMebE6+rBincTi9+xWTY8TztLzKHc/9C1uBCG27+q8= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= diff --git a/llm/client.go b/llm/client.go new file mode 100644 index 00000000..45ff2a24 --- /dev/null +++ b/llm/client.go @@ -0,0 +1,22 @@ +package llm + +import ( + "github.com/rs/zerolog/log" +) + +type Message struct { + Content string + IsFromCustomer bool +} + +func GenerateNextMessage(history []Message, current Message) (Message, error) { + // In general our history + for i, msg := range history { + log.Info().Int("i", i).Bool("is_customer", msg.IsFromCustomer).Msg("History") + } + + return Message{ + Content: "hey there. :)", + IsFromCustomer: false, + }, nil +} diff --git a/llm/log.go b/llm/log.go new file mode 100644 index 00000000..52c48989 --- /dev/null +++ b/llm/log.go @@ -0,0 +1,42 @@ +package llm + +import ( + "log" + "strings" + "time" + + "github.com/rs/zerolog" + "go.mau.fi/util/exzerolog" +) + +type Logger = zerolog.Logger + +func createLogger() *Logger { + l := zerolog.New(zerolog.NewConsoleWriter(func(w *zerolog.ConsoleWriter) { + //w.Out = io.Writer(buf) + w.TimeFormat = time.Stamp + })).With().Timestamp().Logger() + zerolog.TimeFieldFormat = zerolog.TimeFormatUnix + exzerolog.SetupDefaults(&l) + return &l +} + +type ZerologWriter struct { + zerologger zerolog.Logger + level zerolog.Level +} + +func (w ZerologWriter) Write(p []byte) (n int, err error) { + msg := strings.TrimSuffix(string(p), "\n") + event := w.zerologger.WithLevel(w.level) + event.Msg(msg) + return len(p), nil +} + +func LoggerShim(l zerolog.Logger) *log.Logger { + writer := &ZerologWriter{ + zerologger: l, + level: zerolog.DebugLevel, + } + return log.New(writer, "", 0) +} diff --git a/llm/openai.go b/llm/openai.go new file mode 100644 index 00000000..d936bb0d --- /dev/null +++ b/llm/openai.go @@ -0,0 +1,175 @@ +package llm + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/maruel/genai" + "github.com/maruel/genai/adapters" + "github.com/maruel/genai/providers/openaichat" + "github.com/rs/zerolog/log" +) + +type openAIClient struct { + client *openaichat.Client + conversations map[string][]genai.Message + log *Logger +} + +var client *openAIClient + +type AIRequest struct { + Displayname string + Message string + Sender string + Timestamp time.Time +} + +func CreateOpenAIClient(ctx context.Context) error { + logger := createLogger() + + opts := genai.ProviderOptions{ + Model: genai.ModelCheap, + } + c, err := openaichat.New(ctx, &opts, nil) + if err != nil { + return fmt.Errorf("Failed to create genai client: %v", err) + } + client = &openAIClient{ + client: c, + conversations: make(map[string][]genai.Message), + log: logger, + } + return nil +} + +func (c *openAIClient) continueConversation(ctx context.Context, req AIRequest) error { + msgs, ok := c.conversations["roomid"] + if !ok { + msgs = genai.Messages{ + c.startConversation(ctx, req), + } + } else { + msgs = append(msgs, genai.NewTextMessage(fmt.Sprintf("(%s) user: %s\nbot: ", req.Timestamp.String(), req.Message))) + } + + c.log.Debug().Msg("Generating response...") + opts := genai.OptionsTools{ + Tools: []genai.ToolDef{ + { + Name: "followup_timer", + Description: "This should be used to indicate that the bot should follow up with the user in the future to check on task progress.", + Callback: func(ctx2 context.Context, input *FollowupTimerInput) (string, error) { + return c.followupSchedule(ctx2, req, input) + }, + }, { + Name: "switch_task", + Description: "Any time the user indicates they change tasks this must be called to update the record of what tasks are being done.", + Callback: func(ctx2 context.Context, input *SwitchTaskInput) (string, error) { + return c.switchTask(ctx2, req, input) + }, + }, + }, + } + + res, _, err := adapters.GenSyncWithToolCallLoop(ctx, c.client, msgs, &opts) + if err != nil { + return fmt.Errorf("Failed to continue conversation: %v", err) + } + + for _, m := range res { + msgs = append(msgs, m) + // Empty responses are tool call related. + if m.String() == "" { + } else { + //c.log.Info().Str("room", req.RoomID.String()).Msg(m.String()) + var toSay string = m.String() + toSay = strings.Replace(toSay, "bot: ", "", 1) + log.Info().Str("to say", toSay).Msg("Responding") + /*c.aiResponseChannel <- AIResponse{ + Message: toSay, + RoomID: req.RoomID, + }*/ + } + } + //c.conversations[req.RoomID.String()] = msgs + + return nil +} + +type FollowupTimerInput struct { + DelayInSeconds int64 `json:"delay_in_seconds"` +} + +func (c *openAIClient) followupFire(ctx context.Context, req AIRequest, duration time.Duration) { + if err := ctx.Err(); err != nil { + //c.log.Info().Str("room", req.RoomID.String()).Msg("Context canceled") + return + } + msgs, ok := c.conversations["roomid"] + if !ok { + //c.log.Warn().Str("room", req.RoomID.String()).Str("elapsed", duration.String()).Msg("No messages for room") + return + } + msgs = append(msgs, genai.NewTextMessage(fmt.Sprintf("<%s passed>", duration.String()))) + res, err := c.client.GenSync(ctx, msgs) + if err != nil { + //c.log.Error().Str("room", req.RoomID.String()).Err(err).Msg("Failed to continue after timer") + return + } + msgs = append(msgs, res.Message) + var toSay string = res.String() + toSay = strings.Replace(toSay, "bot: ", "", 1) + log.Info().Str("to say", toSay).Msg("To say") + /*c.aiResponseChannel <- AIResponse{ + Message: toSay, + RoomID: req.RoomID, + } + c.conversations[req.RoomID.String()] = msgs + */ +} + +func (c *openAIClient) followupSchedule(ctx context.Context, req AIRequest, input *FollowupTimerInput) (string, error) { + //c.log.Info().Str("room", req.RoomID.String()).Int64("delay", input.DelayInSeconds).Msg("Followup timer scheduled.") + duration, err := time.ParseDuration(fmt.Sprintf("%ds", input.DelayInSeconds)) + if err != nil { + return "", fmt.Errorf("Failed to parse %d as a valid duration: %v", input.DelayInSeconds, err) + } + /*c.aiResponseChannel <- AIResponse{ + Message: fmt.Sprintf("⌛ followup scheduled '%s'", duration.String()), + RoomID: req.RoomID, + }*/ + time.AfterFunc(duration, func() { + c.followupFire(ctx, req, duration) + }) + return fmt.Sprintf("Followup timer set for %s in the future", duration.String()), nil +} + +type SwitchTaskInput struct { + TaskName string `json:"task_name"` +} + +func (c *openAIClient) switchTask(ctx context.Context, req AIRequest, input *SwitchTaskInput) (string, error) { + //c.log.Info().Str("room", req.RoomID.String()).Str("task", input.TaskName).Msg("Task Switched") + /*c.aiResponseChannel <- AIResponse{ + Message: fmt.Sprintf("📋 notes task '%s'", input.TaskName), + RoomID: req.RoomID, + }*/ + + return fmt.Sprintf("Recorded a switch to task %s at %s", input.TaskName, time.Now().String()), nil +} + +func (c *openAIClient) startConversation(ctx context.Context, req AIRequest) genai.Message { + return genai.NewTextMessage(fmt.Sprintf( + `This is a text chat conversation between an employee and a chatbot helping to manage timecards. + The user's name is '%[1]s'. + Messages from the user will start with '(timestamp) %[1]s:'. + Messages from the bot will start with 'bot:'. + Sometimes the user won't say anything for a long time and the chatbot needs to follow-up with them. + When time passes, there will be a prompt like '<200s passed>'. + The bot should then prompt the user to provide a bit of information about what they've been working on during that time. + The bot should be interested to know what the user's goals are at a high level and should pay attention to any difficulties or frustrations the user experiences.\n\n + (%[2]s) user: %[3]s\nbot:`, req.Displayname, req.Timestamp.String(), req.Message)) +} diff --git a/main.go b/main.go index ac76e6af..ee3b1c8c 100644 --- a/main.go +++ b/main.go @@ -16,6 +16,7 @@ import ( "github.com/Gleipnir-Technology/nidus-sync/comms/text" "github.com/Gleipnir-Technology/nidus-sync/config" "github.com/Gleipnir-Technology/nidus-sync/db" + "github.com/Gleipnir-Technology/nidus-sync/llm" "github.com/Gleipnir-Technology/nidus-sync/public-report" nidussync "github.com/Gleipnir-Technology/nidus-sync/sync" "github.com/go-chi/chi/v5" @@ -52,6 +53,7 @@ func main() { log.Error().Err(err).Msg("Failed to store text source phone numbers") os.Exit(4) } + router_logger := log.With().Logger() r := chi.NewRouter() @@ -75,6 +77,11 @@ func main() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + err = llm.CreateOpenAIClient(ctx) + if err != nil { + log.Error().Err(err).Msg("Failed to start openAI client") + os.Exit(5) + } background.Start(ctx) server := &http.Server{ Addr: config.Bind, diff --git a/platform/text.go b/platform/text.go new file mode 100644 index 00000000..776fe315 --- /dev/null +++ b/platform/text.go @@ -0,0 +1,113 @@ +package platform + +import ( + "context" + "fmt" + "strings" + + "github.com/Gleipnir-Technology/nidus-sync/comms/text" + "github.com/Gleipnir-Technology/nidus-sync/config" + "github.com/Gleipnir-Technology/nidus-sync/db" + "github.com/Gleipnir-Technology/nidus-sync/db/models" + "github.com/Gleipnir-Technology/nidus-sync/db/sql" + "github.com/Gleipnir-Technology/nidus-sync/llm" + "github.com/rs/zerolog/log" +) + +// Translate from Twilio's representation of a RCS message sender to our concept of a phone number +// From: rcs:dev_report_mosquitoes_online_dosrvwxm_agent +// To: +16235525879 +func getDst(ctx context.Context, to string) (string, error) { + + if to == config.TwilioRCSSenderRMO { + return config.PhoneNumberReportStr, nil + } + /* + phone, err := models.FindCommsPhone(ctx, db.PGInstance.BobDB, to) + if err != nil { + return "", fmt.Errorf("Failed to search for dest phone %s: %w", to, err) + } + return phone.E164, nil + */ + return "", fmt.Errorf("Cannot match phone number to '%s'", to) +} + +func loadPreviousMessages(ctx context.Context, dst, src string) ([]llm.Message, error) { + messages, err := sql.TextsBySenders(dst, src).All(ctx, db.PGInstance.BobDB) + results := make([]llm.Message, 0) + if err != nil { + return results, fmt.Errorf("Failed to get message history for %s and %s: %w", dst, src, err) + } + log.Info().Int("count", len(messages)).Str("src", src).Str("dst", dst).Msg("Found previous messages") + for _, m := range messages { + is_from_customer := (m.Source == src) + results = append(results, llm.Message{ + IsFromCustomer: is_from_customer, + Content: m.Content, + }) + } + return results, nil +} + +func splitPhoneSource(s string) (string, string) { + parts := strings.Split(s, ":") + switch len(parts) { + case 0: + return "this isn't", "possible" + case 1: + return "", s + case 2: + return parts[0], parts[1] + default: + log.Warn().Str("s", s).Msg("Got an incomprehensible number of parts of a phone number") + return parts[0], parts[1] + } + +} + +func isSubscribed(ctx context.Context, src string) (bool, error) { + phone, err := models.FindCommsPhone(ctx, db.PGInstance.BobDB, src) + if err != nil { + return false, fmt.Errorf("Failed to determine if '%s' is subscribed: %w", src, err) + } + return phone.IsSubscribed, nil +} + +func HandleTextMessage(from string, to string, body string) { + ctx := context.Background() + type_, src := splitPhoneSource(from) + dst, err := getDst(ctx, to) + if err != nil { + log.Error().Err(err).Str("to", to).Msg("Failed to get dst") + return + } + subscribed, err := isSubscribed(ctx, from) + if err != nil { + log.Error().Err(err).Msg("Failed to handle message") + return + } + if !subscribed { + err = text.SendInitialReprompt(ctx, dst, src) + if err != nil { + log.Error().Err(err).Msg("Failed to resend initial prompt.") + } + return + } + previous_messages, err := loadPreviousMessages(ctx, dst, src) + if err != nil { + log.Error().Err(err).Str("dst", dst).Str("src", from).Msg("Failed to get previous messages") + return + } + current := llm.Message{ + Content: body, + IsFromCustomer: true, + } + log.Info().Int("len", len(previous_messages)).Msg("passing") + next_message, err := llm.GenerateNextMessage(previous_messages, current) + if err != nil { + log.Error().Err(err).Str("dst", dst).Str("src", from).Msg("Failed to generate next message") + return + } + text.SendTextFromLLM(next_message.Content) + log.Info().Str("from", from).Str("from-type", type_).Str("to", to).Str("src", src).Str("dst", dst).Str("body", body).Str("reply", next_message.Content).Msg("Handling text message") +} diff --git a/tools/texts_by_senders.sql b/tools/texts_by_senders.sql new file mode 100644 index 00000000..a60a8ebe --- /dev/null +++ b/tools/texts_by_senders.sql @@ -0,0 +1,17 @@ +-- TextsBySenders +SELECT + id, + content, + created, + source, + destination, + is_welcome, + origin +FROM + comms.text_log +WHERE + (source = :src AND destination = :dst) + OR + (source = :dst AND destination = :src) +ORDER BY + created ASC;