Add initial integration with stadia maps for bulk geocoding

This commit is contained in:
Eli Ribble 2026-02-14 15:41:38 +00:00
parent 4c856ab403
commit 5d0d75ebb1
No known key found for this signature in database
4 changed files with 76 additions and 20 deletions

View file

@ -10,10 +10,12 @@ import (
"time"
"github.com/Gleipnir-Technology/bob"
"github.com/Gleipnir-Technology/nidus-sync/config"
"github.com/Gleipnir-Technology/nidus-sync/db"
"github.com/Gleipnir-Technology/nidus-sync/db/enums"
"github.com/Gleipnir-Technology/nidus-sync/db/models"
"github.com/Gleipnir-Technology/nidus-sync/platform/text"
"github.com/Gleipnir-Technology/nidus-sync/stadia"
"github.com/Gleipnir-Technology/nidus-sync/userfile"
"github.com/aarondl/opt/omit"
"github.com/aarondl/opt/omitnull"
@ -64,24 +66,68 @@ func ProcessJob(ctx context.Context, file_id int32) error {
if err != nil {
return fmt.Errorf("Failed to get file %d from DB: %w", file_id, err)
}
c, err := models.FindFileuploadCSV(ctx, db.PGInstance.BobDB, file_id)
if err != nil {
return fmt.Errorf("Failed to get file %d from DB: %w", file_id, err)
}
r, err := userfile.NewFileReader(userfile.CollectionCSV, file.FileUUID)
if err != nil {
return fmt.Errorf("Failed to get filereader for %d: %w", file_id, err)
}
reader := csv.NewReader(r)
h, err := reader.Read()
if err != nil {
return fmt.Errorf("Failed to read header of CSV for file %d: %w", file_id, err)
}
txn, err := db.PGInstance.BobDB.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("Failed to start transaction: %w", err)
}
defer txn.Rollback(ctx)
pools, err := parseFile(ctx, txn, *file)
if err != nil {
return fmt.Errorf("parse file: %w", err)
}
err = bulkGeocode(ctx, txn, *file, pools)
if err != nil {
return fmt.Errorf("bulk geocode: %w", err)
}
return nil
}
func bulkGeocode(ctx context.Context, txn bob.Tx, file models.FileuploadFile, pools []*models.FileuploadPool) error {
if len(pools) == 0 {
return nil
}
client := stadia.NewStadiaMaps(config.StadiaMapsAPIKey)
requests := make([]stadia.BulkGeocodeQuery, 0)
for _, pool := range pools {
requests = append(requests, stadia.StructuredGeocodeRequest{
Address: &pool.AddressStreet,
PostalCode: &pool.AddressPostalCode,
})
}
log.Info().Int("len pools", len(pools)).Int("len requests", len(requests)).Msg("bulk querying")
responses, err := client.BulkGeocode(requests)
if err != nil {
return fmt.Errorf("client bulk geocode: %w", err)
}
log.Info().Int("len", len(responses)).Msg("bulk query response")
//setters := make([]*models.FileuploadPoolSetter, 0)
for i, resp := range responses {
pool := pools[i]
if resp.Status != 200 {
log.Warn().Int("row", i).Int("status", resp.Status).Str("pool.address", pool.AddressStreet).Str("pool.postal", pool.AddressPostalCode).Str("msg", resp.Message)
}
if resp.Response != nil {
for _, feature := range resp.Response.Features {
log.Info().Int("row", i).Int("status", resp.Status).Str("pool.address", pool.AddressStreet).Str("pool.postal", pool.AddressPostalCode).Str("feature.type", feature.Type).Str("formatted", feature.Properties.FormattedAddressLine).Float64("coordinates[0]", feature.Geometry.Coordinates[0]).Float64("coordinates[1]", feature.Geometry.Coordinates[1]).Msg("geocode")
}
}
}
return nil
}
func parseFile(ctx context.Context, txn bob.Tx, file models.FileuploadFile) ([]*models.FileuploadPool, error) {
pools := make([]*models.FileuploadPool, 0)
c, err := models.FindFileuploadCSV(ctx, db.PGInstance.BobDB, file.ID)
if err != nil {
return pools, fmt.Errorf("Failed to get file %d from DB: %w", file.ID, err)
}
r, err := userfile.NewFileReader(userfile.CollectionCSV, file.FileUUID)
if err != nil {
return pools, fmt.Errorf("Failed to get filereader for %d: %w", file.ID, err)
}
reader := csv.NewReader(r)
h, err := reader.Read()
if err != nil {
return pools, fmt.Errorf("Failed to read header of CSV for file %d: %w", file.ID, err)
}
headers := parseHeaders(h)
missing_headers := missingRequiredHeaders(headers)
for _, mh := range missing_headers {
@ -90,7 +136,7 @@ func ProcessJob(ctx context.Context, file_id int32) error {
Status: omit.From(enums.FileuploadFilestatustypeError),
})
txn.Commit(ctx)
return nil
return pools, nil
}
row_number := 0
for {
@ -100,11 +146,11 @@ func ProcessJob(ctx context.Context, file_id int32) error {
file.Update(ctx, txn, &models.FileuploadFileSetter{
Status: omit.From(enums.FileuploadFilestatustypeParsed),
})
log.Info().Int32("file_id", file_id).Msg("Set file to parsed")
log.Info().Int32("file.ID", file.ID).Msg("Set file to parsed")
txn.Commit(ctx)
return nil
return pools, nil
}
return fmt.Errorf("Failed to read all CSV records for file %d: %w", file_id, err)
return pools, fmt.Errorf("Failed to read all CSV records for file %d: %w", file.ID, err)
}
setter := models.FileuploadPoolSetter{
// required fields
@ -176,10 +222,11 @@ func ProcessJob(ctx context.Context, file_id int32) error {
setter.ResidentPhoneE164 = omitnull.From(text.PhoneString(*phone))
}
}
_, err = models.FileuploadPools.Insert(&setter).Exec(ctx, txn)
pool, err := models.FileuploadPools.Insert(&setter).One(ctx, txn)
if err != nil {
return fmt.Errorf("Failed to create pool: %w", err)
return pools, fmt.Errorf("Failed to create pool: %w", err)
}
pools = append(pools, pool)
row_number = row_number + 1
}
}