From 5d0d75ebb1e4d82edd119e8bf1b385fb0d168fbd Mon Sep 17 00:00:00 2001 From: Eli Ribble Date: Sat, 14 Feb 2026 15:41:38 +0000 Subject: [PATCH] Add initial integration with stadia maps for bulk geocoding --- config/config.go | 5 +++ go.mod | 3 +- go.sum | 3 ++ platform/csv/pool.go | 85 ++++++++++++++++++++++++++++++++++---------- 4 files changed, 76 insertions(+), 20 deletions(-) diff --git a/config/config.go b/config/config.go index c30eff68..adfefbe2 100644 --- a/config/config.go +++ b/config/config.go @@ -30,6 +30,7 @@ var ( PhoneNumberSupport phonenumbers.PhoneNumber PhoneNumberSupportStr string SentryDSN string + StadiaMapsAPIKey string TextProvider string TwilioAuthToken string TwilioAccountSID string @@ -152,6 +153,10 @@ func Parse() (err error) { if SentryDSN == "" { return fmt.Errorf("You must specify a non-empty SENTRY_DSN") } + StadiaMapsAPIKey = os.Getenv("STADIA_MAPS_API_KEY") + if StadiaMapsAPIKey == "" { + return fmt.Errorf("You must specify a non-empty STADIA_MAPS_API_KEY") + } TextProvider = os.Getenv("TEXT_PROVIDER") switch TextProvider { case "": diff --git a/go.mod b/go.mod index 75f07ef0..5f273761 100644 --- a/go.mod +++ b/go.mod @@ -16,6 +16,7 @@ require ( github.com/go-chi/hostrouter v0.3.0 github.com/go-chi/render v1.0.3 github.com/gofrs/uuid/v5 v5.4.0 + github.com/google/go-querystring v1.2.0 github.com/google/uuid v1.6.0 github.com/jackc/pgx/v5 v5.7.6 github.com/jaswdr/faker/v2 v2.8.1 @@ -33,6 +34,7 @@ require ( github.com/twilio/twilio-go v1.29.1 github.com/uber/h3-go/v4 v4.4.0 golang.org/x/crypto v0.47.0 + resty.dev/v3 v3.0.0-beta.6 ) require ( @@ -81,7 +83,6 @@ require ( golang.org/x/text v0.33.0 // indirect google.golang.org/protobuf v1.36.11 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect - resty.dev/v3 v3.0.0-beta.6 // indirect ) // replace github.com/stephenafamo/bob v0.42.0 => ../bob diff --git a/go.sum b/go.sum index 025dd554..06942c91 100644 --- a/go.sum +++ b/go.sum @@ -87,8 +87,11 @@ github.com/golang-jwt/jwt/v5 v5.2.2 
h1:Rl4B7itRWVtYIHFrSNd7vhTiz9UpLdi6gZhZ3wEeD github.com/golang-jwt/jwt/v5 v5.2.2/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc= github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/google/go-querystring v1.2.0 h1:yhqkPbu2/OH+V9BfpCVPZkNmUXhb2gBxJArfhIxNtP0= +github.com/google/go-querystring v1.2.0/go.mod h1:8IFJqpSRITyJ8QhQ13bmbeMBDfmeEJZD5A0egEOmkqU= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/invopop/jsonschema v0.13.0 h1:KvpoAJWEjR3uD9Kbm2HWJmqsEaHt8lBUpd0qHcIi21E= diff --git a/platform/csv/pool.go b/platform/csv/pool.go index 219a6f20..74e8eb5a 100644 --- a/platform/csv/pool.go +++ b/platform/csv/pool.go @@ -10,10 +10,12 @@ import ( "time" "github.com/Gleipnir-Technology/bob" + "github.com/Gleipnir-Technology/nidus-sync/config" "github.com/Gleipnir-Technology/nidus-sync/db" "github.com/Gleipnir-Technology/nidus-sync/db/enums" "github.com/Gleipnir-Technology/nidus-sync/db/models" "github.com/Gleipnir-Technology/nidus-sync/platform/text" + "github.com/Gleipnir-Technology/nidus-sync/stadia" "github.com/Gleipnir-Technology/nidus-sync/userfile" "github.com/aarondl/opt/omit" "github.com/aarondl/opt/omitnull" @@ -64,24 +66,68 @@ func ProcessJob(ctx context.Context, file_id int32) error { if err != nil { return fmt.Errorf("Failed to get file %d from DB: %w", file_id, err) } - c, err := models.FindFileuploadCSV(ctx, db.PGInstance.BobDB, file_id) - if err != nil { - return fmt.Errorf("Failed to get file %d from DB: %w", file_id, err) - } - r, err := 
userfile.NewFileReader(userfile.CollectionCSV, file.FileUUID) - if err != nil { - return fmt.Errorf("Failed to get filereader for %d: %w", file_id, err) - } - reader := csv.NewReader(r) - h, err := reader.Read() - if err != nil { - return fmt.Errorf("Failed to read header of CSV for file %d: %w", file_id, err) - } txn, err := db.PGInstance.BobDB.BeginTx(ctx, nil) if err != nil { return fmt.Errorf("Failed to start transaction: %w", err) } defer txn.Rollback(ctx) + pools, err := parseFile(ctx, txn, *file) + if err != nil { + return fmt.Errorf("parse file: %w", err) + } + err = bulkGeocode(ctx, txn, *file, pools) + if err != nil { + return fmt.Errorf("bulk geocode: %w", err) + } + return nil +} +func bulkGeocode(ctx context.Context, txn bob.Tx, file models.FileuploadFile, pools []*models.FileuploadPool) error { + if len(pools) == 0 { + return nil + } + client := stadia.NewStadiaMaps(config.StadiaMapsAPIKey) + requests := make([]stadia.BulkGeocodeQuery, 0) + for _, pool := range pools { + requests = append(requests, stadia.StructuredGeocodeRequest{ + Address: &pool.AddressStreet, + PostalCode: &pool.AddressPostalCode, + }) + } + log.Info().Int("len pools", len(pools)).Int("len requests", len(requests)).Msg("bulk querying") + responses, err := client.BulkGeocode(requests) + if err != nil { + return fmt.Errorf("client bulk geocode: %w", err) + } + log.Info().Int("len", len(responses)).Msg("bulk query response") + //setters := make([]*models.FileuploadPoolSetter, 0) + for i, resp := range responses { + pool := pools[i] + if resp.Status != 200 { + log.Warn().Int("row", i).Int("status", resp.Status).Str("pool.address", pool.AddressStreet).Str("pool.postal", pool.AddressPostalCode).Str("msg", resp.Message).Msg("geocode failed") + } + if resp.Response != nil { + for _, feature := range resp.Response.Features { + log.Info().Int("row", i).Int("status", resp.Status).Str("pool.address", pool.AddressStreet).Str("pool.postal", pool.AddressPostalCode).Str("feature.type", feature.Type).Str("formatted",
feature.Properties.FormattedAddressLine).Float64("coordinates[0]", feature.Geometry.Coordinates[0]).Float64("coordinates[1]", feature.Geometry.Coordinates[1]).Msg("geocode") + } + } + } + return nil +} +func parseFile(ctx context.Context, txn bob.Tx, file models.FileuploadFile) ([]*models.FileuploadPool, error) { + pools := make([]*models.FileuploadPool, 0) + c, err := models.FindFileuploadCSV(ctx, db.PGInstance.BobDB, file.ID) + if err != nil { + return pools, fmt.Errorf("Failed to get file %d from DB: %w", file.ID, err) + } + r, err := userfile.NewFileReader(userfile.CollectionCSV, file.FileUUID) + if err != nil { + return pools, fmt.Errorf("Failed to get filereader for %d: %w", file.ID, err) + } + reader := csv.NewReader(r) + h, err := reader.Read() + if err != nil { + return pools, fmt.Errorf("Failed to read header of CSV for file %d: %w", file.ID, err) + } headers := parseHeaders(h) missing_headers := missingRequiredHeaders(headers) for _, mh := range missing_headers { @@ -90,7 +136,7 @@ func ProcessJob(ctx context.Context, file_id int32) error { Status: omit.From(enums.FileuploadFilestatustypeError), }) txn.Commit(ctx) - return nil + return pools, nil } row_number := 0 for { @@ -100,11 +146,11 @@ func ProcessJob(ctx context.Context, file_id int32) error { file.Update(ctx, txn, &models.FileuploadFileSetter{ Status: omit.From(enums.FileuploadFilestatustypeParsed), }) - log.Info().Int32("file_id", file_id).Msg("Set file to parsed") + log.Info().Int32("file.ID", file.ID).Msg("Set file to parsed") txn.Commit(ctx) - return nil + return pools, nil } - return fmt.Errorf("Failed to read all CSV records for file %d: %w", file_id, err) + return pools, fmt.Errorf("Failed to read all CSV records for file %d: %w", file.ID, err) } setter := models.FileuploadPoolSetter{ // required fields @@ -176,10 +222,11 @@ func ProcessJob(ctx context.Context, file_id int32) error { setter.ResidentPhoneE164 = omitnull.From(text.PhoneString(*phone)) } } - _, err = 
models.FileuploadPools.Insert(&setter).Exec(ctx, txn) + pool, err := models.FileuploadPools.Insert(&setter).One(ctx, txn) if err != nil { - return fmt.Errorf("Failed to create pool: %w", err) + return pools, fmt.Errorf("Failed to create pool: %w", err) } + pools = append(pools, pool) row_number = row_number + 1 } }