From 8feabbc489b4a9b4b165e78d56a603484212e60c Mon Sep 17 00:00:00 2001 From: Eli Ribble Date: Wed, 25 Feb 2026 14:57:28 +0000 Subject: [PATCH] Add bulk geocoding by goroutine worker This is nearly as fast, and doesn't require the corp-only license from stadia map which is 10x the cost. --- platform/csv/pool.go | 189 +++++++++++++++++--------- stadia/cmd/structured-geocode/main.go | 84 ++++++++++-- stadia/structured_geocode.go | 21 ++- 3 files changed, 211 insertions(+), 83 deletions(-) diff --git a/platform/csv/pool.go b/platform/csv/pool.go index 50dbb153..35b80b5a 100644 --- a/platform/csv/pool.go +++ b/platform/csv/pool.go @@ -7,6 +7,7 @@ import ( "io" "strconv" "strings" + "sync" "time" "github.com/Gleipnir-Technology/bob" @@ -75,7 +76,11 @@ func ProcessJob(ctx context.Context, file_id int32) error { return fmt.Errorf("Failed to start transaction: %w", err) } defer txn.Rollback(ctx) - pools, err := parseFile(ctx, txn, *file) + c, err := models.FindFileuploadCSV(ctx, txn, file.ID) + if err != nil { + return fmt.Errorf("Failed to get csv file %d from DB: %w", file.ID, err) + } + pools, err := parseFile(ctx, txn, file, c) if err != nil { return fmt.Errorf("parse file: %w", err) } @@ -87,9 +92,13 @@ func ProcessJob(ctx context.Context, file_id int32) error { if err != nil { return fmt.Errorf("update csv row: %w", err) } - err = bulkGeocode(ctx, txn, *file, pools) + org, err := models.FindOrganization(ctx, db.PGInstance.BobDB, file.OrganizationID) if err != nil { - return fmt.Errorf("bulk geocode: %w", err) + return fmt.Errorf("get org: %w", err) + } + err = bulkGeocode(ctx, txn, file, c, pools, org) + if err != nil { + log.Error().Err(err).Msg("Failure during geocoding") } file.Update(ctx, txn, &models.FileuploadFileSetter{ Status: omit.From(enums.FileuploadFilestatustypeParsed), @@ -98,69 +107,45 @@ func ProcessJob(ctx context.Context, file_id int32) error { txn.Commit(ctx) return nil } -func bulkGeocode(ctx context.Context, txn bob.Tx, file models.FileuploadFile, pools []*models.FileuploadPool) error { +func bulkGeocode(ctx context.Context, txn bob.Tx, file *models.FileuploadFile, c *models.FileuploadCSV, pools []*models.FileuploadPool, org *models.Organization) error { if len(pools) == 0 { return nil } + log.Info().Int("len pools", len(pools)).Msg("bulk geocoding") client := stadia.NewStadiaMaps(config.StadiaMapsAPIKey) - requests := make([]stadia.BulkGeocodeQuery, 0) - for _, pool := range pools { - requests = append(requests, stadia.StructuredGeocodeRequest{ - Address: &pool.AddressStreet, - PostalCode: &pool.AddressPostalCode, - }) - } - log.Info().Int("len pools", len(pools)).Int("len requests", len(requests)).Msg("bulk querying") - responses, err := client.BulkGeocode(requests) - if err != nil { - return fmt.Errorf("client bulk geocode: %w", err) - } - log.Info().Int("len", len(responses)).Msg("bulk query response") - for i, resp := range responses { - pool := pools[i] - sublog := log.With(). - Int("row", i). - Str("pool.address_postal", pool.AddressPostalCode). - Str("pool.address_street", pool.AddressStreet). - Str("pool.postal", pool.AddressPostalCode). - Logger() + jobs := make(chan *jobGeocode, len(pools)) + errors := make(chan error, len(pools)) - if resp.Status != 200 { - sublog.Info().Int("status", resp.Status).Str("msg", resp.Message).Msg("Non-200 status on geocode request") - continue - } - if resp.Response == nil { - sublog.Info().Str("msg", resp.Message).Msg("nil response on geocode") - continue - } - if len(resp.Response.Features) > 1 { - sublog.Warn().Int("len", len(resp.Response.Features)).Msg("too many features") - continue - } - feature := resp.Response.Features[0] - if feature.Geometry.Type != "Point" { - sublog.Warn().Str("type", feature.Geometry.Type).Msg("wrong type") - continue - } - longitude := feature.Geometry.Coordinates[0] - latitude := feature.Geometry.Coordinates[1] - cell, err := h3utils.GetCell(longitude, latitude, 15) - if err != nil { - sublog.Warn().Err(err).Float64("lng", longitude).Float64("lat", latitude).Msg("failed to convert h3 cell") - continue - } - geom_query := geom.PostgisPointQuery(longitude, latitude) - _, err = psql.Update( - um.Table("fileupload.pool"), - um.SetCol("h3cell").ToArg(cell), - um.SetCol("geom").To(geom_query), - um.Where(psql.Quote("id").EQ(psql.Arg(pool.ID))), - ).Exec(ctx, txn) - if err != nil { - log.Warn().Err(err).Msg("failed to update pool") - continue + var wg sync.WaitGroup + for i := 0; i < 20; i++ { + wg.Add(1) + go worker(ctx, txn, client, jobs, errors, &wg) + } + + for i, pool := range pools { + jobs <- &jobGeocode{ + csv: c, + rownumber: int32(i), + org: org, + pool: pool, } } + close(jobs) + + go func() { + wg.Wait() + close(errors) + }() + + error_count := 0 + for err := range errors { + log.Error().Err(err).Msg("failed to geocode") + error_count++ + } + if error_count > 0 { + txn.Rollback(ctx) + return fmt.Errorf("%d errors encountered in bulk geocode", error_count) + } update_query := ` UPDATE fileupload.pool p SET is_in_district = ( @@ -173,18 +158,65 @@ func bulkGeocode(ctx context.Context, txn bob.Tx, file models.FileuploadFile, po ) ) WHERE p.geom IS NOT NULL;` - _, err = txn.ExecContext(ctx, update_query) + _, err := txn.ExecContext(ctx, update_query) if err != nil { return fmt.Errorf("failed to update is_in_district: %w", err) } return nil } -func parseFile(ctx context.Context, txn bob.Tx, file models.FileuploadFile) ([]*models.FileuploadPool, error) { - pools := make([]*models.FileuploadPool, 0) - c, err := models.FindFileuploadCSV(ctx, db.PGInstance.BobDB, file.ID) - if err != nil { - return pools, fmt.Errorf("Failed to get file %d from DB: %w", file.ID, err) + +type jobGeocode struct { + csv *models.FileuploadCSV + rownumber int32 + org *models.Organization + pool *models.FileuploadPool +} + +func geocode(ctx context.Context, txn bob.Tx, client *stadia.StadiaMaps, job *jobGeocode) error { + pool := job.pool + sublog := log.With(). + Str("pool.address_postal", pool.AddressPostalCode). + Str("pool.address_street", pool.AddressStreet). + Str("pool.postal", pool.AddressPostalCode). + Logger() + req := stadia.StructuredGeocodeRequest{ + Address: &pool.AddressStreet, + Locality: &pool.AddressCity, + PostalCode: &pool.AddressPostalCode, } + maybeAddServiceArea(&req, job.org) + resp, err := client.StructuredGeocode(ctx, req) + if err != nil { + return fmt.Errorf("client structured geocode failure on %s, %s, %s: %w", pool.AddressStreet, pool.AddressCity, pool.AddressPostalCode, err) + } + if len(resp.Features) > 1 { + sublog.Warn().Int("len", len(resp.Features)).Msg("More than one feature") + addError(ctx, txn, job.csv, job.rownumber, 0, "The address provided matched more than one location") + } + feature := resp.Features[0] + if feature.Geometry.Type != "Point" { + return fmt.Errorf("wrong type %s from %s %s", feature.Geometry.Type, pool.AddressStreet, pool.AddressPostalCode) + } + longitude := feature.Geometry.Coordinates[0] + latitude := feature.Geometry.Coordinates[1] + cell, err := h3utils.GetCell(longitude, latitude, 15) + if err != nil { + return fmt.Errorf("failed to convert lat %f lng %f to h3 cell", longitude, latitude) + } + geom_query := geom.PostgisPointQuery(longitude, latitude) + _, err = psql.Update( + um.Table("fileupload.pool"), + um.SetCol("h3cell").ToArg(cell), + um.SetCol("geom").To(geom_query), + um.Where(psql.Quote("id").EQ(psql.Arg(pool.ID))), + ).Exec(ctx, txn) + if err != nil { + return fmt.Errorf("failed to update pool: %w", err) + } + return nil +} +func parseFile(ctx context.Context, txn bob.Tx, file *models.FileuploadFile, c *models.FileuploadCSV) ([]*models.FileuploadPool, error) { + pools := make([]*models.FileuploadPool, 0) r, err := userfile.NewFileReader(userfile.CollectionCSV, file.FileUUID) if err != nil { return pools, fmt.Errorf("Failed to get filereader for %d: %w", file.ID, err) @@ -346,6 +378,22 @@ func errorMissingHeader(ctx context.Context, txn bob.Tx, c *models.FileuploadCSV msg := fmt.Sprintf("The file is missing the '%s' header", h.String()) return addError(ctx, txn, c, 0, 0, msg) } +func maybeAddServiceArea(req *stadia.StructuredGeocodeRequest, org *models.Organization) { + if org.ServiceAreaXmax.IsNull() || + org.ServiceAreaYmax.IsNull() || + org.ServiceAreaXmin.IsNull() || + org.ServiceAreaYmin.IsNull() { + return + } + xmax := org.ServiceAreaXmax.MustGet() + ymax := org.ServiceAreaYmax.MustGet() + xmin := org.ServiceAreaXmin.MustGet() + ymin := org.ServiceAreaYmin.MustGet() + req.BoundaryRectMaxLon = &xmax + req.BoundaryRectMaxLat = &ymax + req.BoundaryRectMinLon = &xmin + req.BoundaryRectMinLat = &ymin +} func parseHeaders(row []string) ([]headerPoolEnum, []string) { result_enums := make([]headerPoolEnum, 0) result_names := make([]string, 0) @@ -413,3 +461,14 @@ func poolConditionValidValues() string { } return b.String() } +func worker(ctx context.Context, txn bob.Tx, client *stadia.StadiaMaps, jobs <-chan *jobGeocode, errors chan<- error, wg *sync.WaitGroup) { + defer wg.Done() + + for job := range jobs { + err := geocode(ctx, txn, client, job) + + if err != nil { + errors <- err + } + } +} diff --git a/stadia/cmd/structured-geocode/main.go b/stadia/cmd/structured-geocode/main.go index bb0f262e..f66283da 100644 --- a/stadia/cmd/structured-geocode/main.go +++ b/stadia/cmd/structured-geocode/main.go @@ -10,23 +10,83 @@ import ( ) func main() { - key := os.Getenv("STADIA_MAPS_API_KEY") - if key == "" { - log.Println("stadia maps api key is empty") + // Define command-line flags + address := flag.String("address", "", "Street address to geocode") + boundaryRectMaxLat := flag.Float64("boundary-rect-max-lat", 0, "The max lat of the boundary") + boundaryRectMinLat := flag.Float64("boundary-rect-min-lat", 0, "The min lat of the boundary") + boundaryRectMaxLon := flag.Float64("boundary-rect-max-lng", 0, "The max lon of the boundary") + boundaryRectMinLon := flag.Float64("boundary-rect-min-lng", 0, "The min lon of the boundary") + postalCode := flag.String("postal-code", "", "Postal code") + focusLat := flag.Float64("focus-lat", 0, "The latitude of the focus point") + focusLng := flag.Float64("focus-lng", 0, "The longitude of the focus point") + + // Parse the flags + flag.Parse() + + // Validate required arguments + if *address == "" { + log.Println("Error: -address is required") + flag.Usage() os.Exit(1) } + + if *postalCode == "" { + log.Println("Error: -postal-code is required") + flag.Usage() + os.Exit(1) + } + if focusLat != nil && focusLng == nil { + log.Println("Error: you must specify both focus-lat and focus-lng together, not just focus-lat") + flag.Usage() + os.Exit(1) + } + if focusLat == nil && focusLng != nil { + log.Println("Error: you must specify both focus-lat and focus-lng together, not just focus-lng") + flag.Usage() + os.Exit(1) + } + if (boundaryRectMaxLat != nil || + boundaryRectMinLat != nil || + boundaryRectMaxLon != nil || + boundaryRectMinLon != nil) && (boundaryRectMaxLat == nil || + boundaryRectMinLat == nil || + boundaryRectMaxLon == nil || + boundaryRectMinLon == nil) { + log.Println("If you specify one of boundary-rect you need to specify them all") + os.Exit(1) + } + + key := os.Getenv("STADIA_MAPS_API_KEY") + if key == "" { + log.Println("STADIA_MAPS_API_KEY is empty") + os.Exit(1) + } + client := stadia.NewStadiaMaps(key) - resp, err := client.StructuredGeocode(stadia.StructuredGeocodeRequest{ - Address: strPtr("12932 Ave 404"), - PostalCode: strPtr("93615"), - }) + ctx := context.Background() + req := stadia.StructuredGeocodeRequest{ + Address: address, + PostalCode: postalCode, + } + if focusLat != nil && focusLng != nil { + req.FocusPointLat = focusLat + req.FocusPointLng = focusLng + } + if boundaryRectMaxLat != nil { + req.BoundaryRectMaxLat = boundaryRectMaxLat + req.BoundaryRectMinLat = boundaryRectMinLat + req.BoundaryRectMaxLon = boundaryRectMaxLon + req.BoundaryRectMinLon = boundaryRectMinLon + } + resp, err := client.StructuredGeocode(ctx, req) if err != nil { log.Printf("err: %v\n", err) os.Exit(2) } - log.Printf("type: %s", resp.Type) -} - -func strPtr(s string) *string { - return &s + log.Printf("type: %s, features: %d\n", resp.Type, len(resp.Features)) + for i, feature := range resp.Features { + log.Printf("feature %d: type %s\n", i, feature.Type) + log.Printf("\tgeometry %s\n", feature.Geometry.Type) + log.Printf("\tproperties %s\n", feature.Properties.Layer) + } } diff --git a/stadia/structured_geocode.go b/stadia/structured_geocode.go index 1c831b65..622736c9 100644 --- a/stadia/structured_geocode.go +++ b/stadia/structured_geocode.go @@ -20,14 +20,23 @@ type StructuredGeocodeRequest struct { PostalCode *string `url:"postalcode,omitempty" json:"postalcode,omitempty"` Country *string `url:"country,omitempty" json:"country,omitempty"` - // Focus point - FocusPoint *FocusPoint `url:",omitempty" json:",omitempty"` + // Boundary circle parameters + BoundaryCircleLat *float64 `url:"boundary.circle.lat,omitempty"` + BoundaryCircleLon *float64 `url:"boundary.circle.lon,omitempty"` + BoundaryCircleRadius *float64 `url:"boundary.circle.radius,omitempty"` + BoundaryCountry []string `url:"boundary.country,omitempty,comma" json:"boundary.country,omitempty,comma"` + + BoundaryGid *string `url:"boundary.gid,omitempty" json:"boundary.gid,omitempty"` // Boundary parameters - BoundaryRect *BoundaryRect `url:",omitempty" json:",omitempty"` - BoundaryCircle *BoundaryCircle `url:",omitempty" json:",omitempty"` - BoundaryCountry []string `url:"boundary.country,omitempty,comma" json:"boundary.country,omitempty,comma"` - BoundaryGid *string `url:"boundary.gid,omitempty" json:"boundary.gid,omitempty"` + BoundaryRectMaxLat *float64 `url:"boundary.rect.max_lat,omitempty"` + BoundaryRectMinLat *float64 `url:"boundary.rect.min_lat,omitempty"` + BoundaryRectMaxLon *float64 `url:"boundary.rect.max_lon,omitempty"` + BoundaryRectMinLon *float64 `url:"boundary.rect.min_lon,omitempty"` + + // Focus point + FocusPointLat *float64 `url:"focus.point.lat,omitempty" json:",omitempty"` + FocusPointLng *float64 `url:"focus.point.lon,omitempty" json:",omitempty"` // Other parameters Layers []string `url:"layers,omitempty,comma" json:"layers,omitempty,comma"`