Add bulk geocoding by goroutine worker
This is nearly as fast as the bulk endpoint and doesn't require the corp-only license from Stadia Maps, which costs 10x as much. A minimal sketch of the worker-pool pattern is shown below.
This commit is contained in:
parent ca88a0eaab
commit 8feabbc489
3 changed files with 211 additions and 83 deletions
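The commit replaces a single call to the Stadia Maps bulk-geocode endpoint with a pool of goroutines that issue individual geocode requests concurrently. The sketch below is a minimal, self-contained illustration of that fan-out/fan-in pattern (buffered job and error channels, a fixed worker count, a WaitGroup that closes the error channel once all workers finish); the `job` type and `geocodeOne` function are placeholders for illustration, not the repository's actual API.

```go
package main

import (
	"context"
	"fmt"
	"log"
	"sync"
)

// job is a placeholder for the per-row geocoding work item.
type job struct {
	row     int
	address string
}

// geocodeOne stands in for a single structured-geocode request; the real
// code calls the Stadia Maps client and updates the database row.
func geocodeOne(ctx context.Context, j *job) error {
	if j.address == "" {
		return fmt.Errorf("row %d: empty address", j.row)
	}
	return nil
}

func bulkGeocode(ctx context.Context, addresses []string) error {
	jobs := make(chan *job, len(addresses))
	errs := make(chan error, len(addresses))

	// Fan out: a fixed pool of workers drains the jobs channel.
	var wg sync.WaitGroup
	for i := 0; i < 20; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := range jobs {
				if err := geocodeOne(ctx, j); err != nil {
					errs <- err
				}
			}
		}()
	}

	// Enqueue every row, then close the channel so workers exit their loops.
	for i, addr := range addresses {
		jobs <- &job{row: i, address: addr}
	}
	close(jobs)

	// Fan in: close the error channel once all workers are done.
	go func() {
		wg.Wait()
		close(errs)
	}()

	count := 0
	for err := range errs {
		log.Println(err)
		count++
	}
	if count > 0 {
		return fmt.Errorf("%d rows failed to geocode", count)
	}
	return nil
}

func main() {
	if err := bulkGeocode(context.Background(), []string{"123 Main St", ""}); err != nil {
		log.Println(err)
	}
}
```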
@@ -7,6 +7,7 @@ import (
"io"
"strconv"
"strings"
"sync"
"time"

"github.com/Gleipnir-Technology/bob"
@@ -75,7 +76,11 @@ func ProcessJob(ctx context.Context, file_id int32) error {
return fmt.Errorf("Failed to start transaction: %w", err)
}
defer txn.Rollback(ctx)
pools, err := parseFile(ctx, txn, *file)
c, err := models.FindFileuploadCSV(ctx, txn, file.ID)
if err != nil {
return fmt.Errorf("Failed to get csv file %d from DB: %w", file.ID, err)
}
pools, err := parseFile(ctx, txn, file, c)
if err != nil {
return fmt.Errorf("parse file: %w", err)
}
@@ -87,9 +92,13 @@ func ProcessJob(ctx context.Context, file_id int32) error {
if err != nil {
return fmt.Errorf("update csv row: %w", err)
}
err = bulkGeocode(ctx, txn, *file, pools)
org, err := models.FindOrganization(ctx, db.PGInstance.BobDB, file.OrganizationID)
if err != nil {
return fmt.Errorf("bulk geocode: %w", err)
return fmt.Errorf("get org: %w", err)
}
err = bulkGeocode(ctx, txn, file, c, pools, org)
if err != nil {
log.Error().Err(err).Msg("Failure during geocoding")
}
file.Update(ctx, txn, &models.FileuploadFileSetter{
Status: omit.From(enums.FileuploadFilestatustypeParsed),
@@ -98,69 +107,45 @@ func ProcessJob(ctx context.Context, file_id int32) error {
txn.Commit(ctx)
return nil
}
func bulkGeocode(ctx context.Context, txn bob.Tx, file models.FileuploadFile, pools []*models.FileuploadPool) error {
func bulkGeocode(ctx context.Context, txn bob.Tx, file *models.FileuploadFile, c *models.FileuploadCSV, pools []*models.FileuploadPool, org *models.Organization) error {
if len(pools) == 0 {
return nil
}
log.Info().Int("len pools", len(pools)).Msg("bulk geocoding")
client := stadia.NewStadiaMaps(config.StadiaMapsAPIKey)
requests := make([]stadia.BulkGeocodeQuery, 0)
for _, pool := range pools {
requests = append(requests, stadia.StructuredGeocodeRequest{
Address: &pool.AddressStreet,
PostalCode: &pool.AddressPostalCode,
})
}
log.Info().Int("len pools", len(pools)).Int("len requests", len(requests)).Msg("bulk querying")
responses, err := client.BulkGeocode(requests)
if err != nil {
return fmt.Errorf("client bulk geocode: %w", err)
}
log.Info().Int("len", len(responses)).Msg("bulk query response")
for i, resp := range responses {
pool := pools[i]
sublog := log.With().
Int("row", i).
Str("pool.address_postal", pool.AddressPostalCode).
Str("pool.address_street", pool.AddressStreet).
Str("pool.postal", pool.AddressPostalCode).
Logger()
jobs := make(chan *jobGeocode, len(pools))
errors := make(chan error, len(pools))

if resp.Status != 200 {
sublog.Info().Int("status", resp.Status).Str("msg", resp.Message).Msg("Non-200 status on geocode request")
continue
}
if resp.Response == nil {
sublog.Info().Str("msg", resp.Message).Msg("nil response on geocode")
continue
}
if len(resp.Response.Features) > 1 {
sublog.Warn().Int("len", len(resp.Response.Features)).Msg("too many features")
continue
}
feature := resp.Response.Features[0]
if feature.Geometry.Type != "Point" {
sublog.Warn().Str("type", feature.Geometry.Type).Msg("wrong type")
continue
}
longitude := feature.Geometry.Coordinates[0]
latitude := feature.Geometry.Coordinates[1]
cell, err := h3utils.GetCell(longitude, latitude, 15)
if err != nil {
sublog.Warn().Err(err).Float64("lng", longitude).Float64("lat", latitude).Msg("failed to convert h3 cell")
continue
}
geom_query := geom.PostgisPointQuery(longitude, latitude)
_, err = psql.Update(
um.Table("fileupload.pool"),
um.SetCol("h3cell").ToArg(cell),
um.SetCol("geom").To(geom_query),
um.Where(psql.Quote("id").EQ(psql.Arg(pool.ID))),
).Exec(ctx, txn)
if err != nil {
log.Warn().Err(err).Msg("failed to update pool")
continue
var wg sync.WaitGroup
for i := 0; i < 20; i++ {
wg.Add(1)
go worker(ctx, txn, client, jobs, errors, &wg)
}

for i, pool := range pools {
jobs <- &jobGeocode{
csv: c,
rownumber: int32(i),
org: org,
pool: pool,
}
}
close(jobs)

go func() {
wg.Wait()
close(errors)
}()

error_count := 0
for err := range errors {
log.Error().Err(err).Msg("failed to geocode")
error_count++
}
if error_count > 0 {
txn.Rollback(ctx)
return fmt.Errorf("%d errors encountered in bulk geocode", error_count)
}
update_query := `
UPDATE fileupload.pool p
SET is_in_district = (
@@ -173,18 +158,65 @@ func bulkGeocode(ctx context.Context, txn bob.Tx, file models.FileuploadFile, po
)
)
WHERE p.geom IS NOT NULL;`
_, err = txn.ExecContext(ctx, update_query)
_, err := txn.ExecContext(ctx, update_query)
if err != nil {
return fmt.Errorf("failed to update is_in_district: %w", err)
}
return nil
}
func parseFile(ctx context.Context, txn bob.Tx, file models.FileuploadFile) ([]*models.FileuploadPool, error) {
pools := make([]*models.FileuploadPool, 0)
c, err := models.FindFileuploadCSV(ctx, db.PGInstance.BobDB, file.ID)
if err != nil {
return pools, fmt.Errorf("Failed to get file %d from DB: %w", file.ID, err)

type jobGeocode struct {
csv *models.FileuploadCSV
rownumber int32
org *models.Organization
pool *models.FileuploadPool
}

func geocode(ctx context.Context, txn bob.Tx, client *stadia.StadiaMaps, job *jobGeocode) error {
pool := job.pool
sublog := log.With().
Str("pool.address_postal", pool.AddressPostalCode).
Str("pool.address_street", pool.AddressStreet).
Str("pool.postal", pool.AddressPostalCode).
Logger()
req := stadia.StructuredGeocodeRequest{
Address: &pool.AddressStreet,
Locality: &pool.AddressCity,
PostalCode: &pool.AddressPostalCode,
}
maybeAddServiceArea(&req, job.org)
resp, err := client.StructuredGeocode(ctx, req)
if err != nil {
return fmt.Errorf("client structured geocode failure on %s, %s, %s: %w", pool.AddressStreet, pool.AddressCity, pool.AddressPostalCode, err)
}
if len(resp.Features) > 1 {
sublog.Warn().Int("len", len(resp.Features)).Msg("More than one feature")
addError(ctx, txn, job.csv, job.rownumber, 0, "The address provided matched more than one location")
}
feature := resp.Features[0]
if feature.Geometry.Type != "Point" {
return fmt.Errorf("wrong type %s from %s %s", feature.Geometry.Type, pool.AddressStreet, pool.AddressPostalCode)
}
longitude := feature.Geometry.Coordinates[0]
latitude := feature.Geometry.Coordinates[1]
cell, err := h3utils.GetCell(longitude, latitude, 15)
if err != nil {
return fmt.Errorf("failed to convert lat %f lng %f to h3 cell", longitude, latitude)
}
geom_query := geom.PostgisPointQuery(longitude, latitude)
_, err = psql.Update(
um.Table("fileupload.pool"),
um.SetCol("h3cell").ToArg(cell),
um.SetCol("geom").To(geom_query),
um.Where(psql.Quote("id").EQ(psql.Arg(pool.ID))),
).Exec(ctx, txn)
if err != nil {
return fmt.Errorf("failed to update pool: %w", err)
}
return nil
}
func parseFile(ctx context.Context, txn bob.Tx, file *models.FileuploadFile, c *models.FileuploadCSV) ([]*models.FileuploadPool, error) {
pools := make([]*models.FileuploadPool, 0)
r, err := userfile.NewFileReader(userfile.CollectionCSV, file.FileUUID)
if err != nil {
return pools, fmt.Errorf("Failed to get filereader for %d: %w", file.ID, err)
@@ -346,6 +378,22 @@ func errorMissingHeader(ctx context.Context, txn bob.Tx, c *models.FileuploadCSV
msg := fmt.Sprintf("The file is missing the '%s' header", h.String())
return addError(ctx, txn, c, 0, 0, msg)
}
func maybeAddServiceArea(req *stadia.StructuredGeocodeRequest, org *models.Organization) {
if org.ServiceAreaXmax.IsNull() ||
org.ServiceAreaYmax.IsNull() ||
org.ServiceAreaXmin.IsNull() ||
org.ServiceAreaYmin.IsNull() {
return
}
xmax := org.ServiceAreaXmax.MustGet()
ymax := org.ServiceAreaYmax.MustGet()
xmin := org.ServiceAreaXmin.MustGet()
ymin := org.ServiceAreaYmin.MustGet()
req.BoundaryRectMaxLon = &xmax
req.BoundaryRectMaxLat = &ymax
req.BoundaryRectMinLon = &xmin
req.BoundaryRectMinLat = &ymin
}
func parseHeaders(row []string) ([]headerPoolEnum, []string) {
result_enums := make([]headerPoolEnum, 0)
result_names := make([]string, 0)
@@ -413,3 +461,14 @@ func poolConditionValidValues() string {
}
return b.String()
}
func worker(ctx context.Context, txn bob.Tx, client *stadia.StadiaMaps, jobs <-chan *jobGeocode, errors chan<- error, wg *sync.WaitGroup) {
defer wg.Done()

for job := range jobs {
err := geocode(ctx, txn, client, job)

if err != nil {
errors <- err
}
}
}