Push geocoding down a layer

This makes it possible to always save address information from our
geocoder.
This commit is contained in:
Eli Ribble 2026-03-04 18:29:52 +00:00
parent 80e14568c6
commit daa8cb1748
No known key found for this signature in database
26 changed files with 576 additions and 431 deletions

View file

@ -17,6 +17,7 @@ import (
"github.com/Gleipnir-Technology/nidus-sync/db"
"github.com/Gleipnir-Technology/nidus-sync/db/enums"
"github.com/Gleipnir-Technology/nidus-sync/db/models"
"github.com/Gleipnir-Technology/nidus-sync/platform/geocode"
//"github.com/Gleipnir-Technology/nidus-sync/h3utils"
//"github.com/Gleipnir-Technology/nidus-sync/platform/geom"
//"github.com/Gleipnir-Technology/nidus-sync/platform/text"
@ -30,18 +31,71 @@ import (
type csvParserFunc[T any] = func(context.Context, bob.Tx, *models.FileuploadFile, *models.FileuploadCSV) ([]T, error)
type csvProcessorFunc[T any] = func(context.Context, bob.Tx, *models.FileuploadFile, *models.FileuploadCSV, []T) error
func ProcessJob(ctx context.Context, file_id int32, type_ enums.FileuploadCsvtype) error {
func JobCommit(ctx context.Context, file_id int32) error {
txn, err := db.PGInstance.BobDB.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("Failed to start transaction: %w", err)
}
f, err := models.FindFileuploadFile(ctx, txn, file_id)
if err != nil {
return fmt.Errorf("Failed to get csv file %d from DB: %w", file_id, err)
}
org, err := models.FindOrganization(ctx, txn, f.OrganizationID)
if err != nil {
return fmt.Errorf("Failed to get org %d from DB: %w", f.OrganizationID, err)
}
rows, err := models.FileuploadPools.Query(
models.SelectWhere.FileuploadPools.CSVFile.EQ(file_id),
).All(ctx, txn)
if err != nil {
return fmt.Errorf("Failed to get all rows of file %d: %w", file_id, err)
}
for _, row := range rows {
a := geocode.Address{
Country: enums.CountrytypeUsa,
Locality: row.AddressLocality,
Number: row.AddressNumber,
PostalCode: row.AddressPostalCode,
Region: row.AddressRegion,
Street: row.AddressStreet,
Unit: "",
}
address, err := geocode.EnsureAddress(ctx, txn, org, a)
if err != nil {
return fmt.Errorf("ensure address: %w", err)
}
log.Info().Int32("id", address.ID).Msg("made address")
}
return nil
}
func JobImport(ctx context.Context, file_id int32, type_ enums.FileuploadCsvtype) error {
var err error
switch type_ {
case enums.FileuploadCsvtypePoollist:
err = processCSV(ctx, file_id, parseCSVPoollist, processCSVPoollist)
err = importCSV(ctx, file_id, parseCSVPoollist, processCSVPoollist)
case enums.FileuploadCsvtypeFlyover:
err = processCSV(ctx, file_id, parseCSVFlyover, processCSVFlyover)
err = importCSV(ctx, file_id, parseCSVFlyover, processCSVFlyover)
}
if err != nil {
psql.Update(
um.Table("fileupload.csv"),
um.SetCol("status").ToArg("error"),
um.Where(psql.Quote("file_id").EQ(psql.Arg(file_id))),
).Exec(ctx, db.PGInstance.BobDB)
}
return err
}
func processCSV[T any](ctx context.Context, file_id int32, parser csvParserFunc[T], processor csvProcessorFunc[T]) error {
func importCSV[T any](ctx context.Context, file_id int32, parser csvParserFunc[T], processor csvProcessorFunc[T]) error {
// Not done in the transaction so the state shows up immediately
_, err := psql.Update(
um.Table("fileupload.csv"),
um.SetCol("status").ToArg("processing"),
um.Where(psql.Quote("file_id").EQ(psql.Arg(file_id))),
).Exec(ctx, db.PGInstance.BobDB)
file, c, err := loadFileAndCSV(ctx, file_id)
if err != nil {
return fmt.Errorf("load file and csv: %w", err)

View file

@ -16,7 +16,7 @@ import (
"github.com/Gleipnir-Technology/nidus-sync/db"
"github.com/Gleipnir-Technology/nidus-sync/db/enums"
"github.com/Gleipnir-Technology/nidus-sync/db/models"
"github.com/Gleipnir-Technology/nidus-sync/h3utils"
"github.com/Gleipnir-Technology/nidus-sync/platform/geocode"
"github.com/Gleipnir-Technology/nidus-sync/platform/geom"
"github.com/Gleipnir-Technology/nidus-sync/platform/text"
"github.com/Gleipnir-Technology/nidus-sync/stadia"
@ -130,41 +130,22 @@ type jobGeocode struct {
pool *models.FileuploadPool
}
func geocode(ctx context.Context, txn bob.Tx, client *stadia.StadiaMaps, job *jobGeocode) error {
func geocodePool(ctx context.Context, txn bob.Tx, client *stadia.StadiaMaps, job *jobGeocode) error {
pool := job.pool
sublog := log.With().
Str("pool.address_postal", pool.AddressPostalCode).
Str("pool.address_street", pool.AddressStreet).
Str("pool.postal", pool.AddressPostalCode).
Logger()
req := stadia.StructuredGeocodeRequest{
Address: &pool.AddressStreet,
Locality: &pool.AddressLocality,
PostalCode: &pool.AddressPostalCode,
a := geocode.Address{
Number: pool.AddressNumber,
Locality: pool.AddressLocality,
PostalCode: pool.AddressPostalCode,
Street: pool.AddressStreet,
}
maybeAddServiceArea(&req, job.org)
resp, err := client.StructuredGeocode(ctx, req)
address, err := geocode.Geocode(ctx, job.org, a)
if err != nil {
return fmt.Errorf("client structured geocode failure on %s, %s, %s: %w", pool.AddressStreet, pool.AddressLocality, pool.AddressPostalCode, err)
addError(ctx, txn, job.csv, job.rownumber, 0, err.Error())
}
if len(resp.Features) > 1 {
sublog.Warn().Int("len", len(resp.Features)).Msg("More than one feature")
addError(ctx, txn, job.csv, job.rownumber, 0, "The address provided matched more than one location")
}
feature := resp.Features[0]
if feature.Geometry.Type != "Point" {
return fmt.Errorf("wrong type %s from %s %s", feature.Geometry.Type, pool.AddressStreet, pool.AddressPostalCode)
}
longitude := feature.Geometry.Coordinates[0]
latitude := feature.Geometry.Coordinates[1]
cell, err := h3utils.GetCell(longitude, latitude, 15)
if err != nil {
return fmt.Errorf("failed to convert lat %f lng %f to h3 cell", longitude, latitude)
}
geom_query := geom.PostgisPointQuery(longitude, latitude)
geom_query := geom.PostgisPointQuery(address.Longitude, address.Latitude)
_, err = psql.Update(
um.Table("fileupload.pool"),
um.SetCol("h3cell").ToArg(cell),
um.SetCol("h3cell").ToArg(address.Cell),
um.SetCol("geom").To(geom_query),
um.Where(psql.Quote("id").EQ(psql.Arg(pool.ID))),
).Exec(ctx, txn)
@ -318,31 +299,6 @@ func processCSVPoollist(ctx context.Context, txn bob.Tx, file *models.Fileupload
return nil
}
func maybeAddServiceArea(req *stadia.StructuredGeocodeRequest, org *models.Organization) {
/*
if org.ServiceAreaXmax.IsNull() ||
org.ServiceAreaYmax.IsNull() ||
org.ServiceAreaXmin.IsNull() ||
org.ServiceAreaYmin.IsNull() {
return
}
xmax := org.ServiceAreaXmax.MustGet()
ymax := org.ServiceAreaYmax.MustGet()
xmin := org.ServiceAreaXmin.MustGet()
ymin := org.ServiceAreaYmin.MustGet()
req.BoundaryRectMaxLon = &xmax
req.BoundaryRectMaxLat = &ymax
req.BoundaryRectMinLon = &xmin
req.BoundaryRectMinLat = &ymin
*/
if org.ServiceAreaCentroidX.IsNull() || org.ServiceAreaCentroidY.IsNull() {
return
}
centroid_x := org.ServiceAreaCentroidX.MustGet()
centroid_y := org.ServiceAreaCentroidY.MustGet()
req.FocusPointLat = &centroid_y
req.FocusPointLng = &centroid_x
}
func parseHeaders(row []string) ([]headerPoolEnum, []string) {
result_enums := make([]headerPoolEnum, 0)
result_names := make([]string, 0)
@ -414,7 +370,7 @@ func worker(ctx context.Context, txn bob.Tx, client *stadia.StadiaMaps, jobs <-c
defer wg.Done()
for job := range jobs {
err := geocode(ctx, txn, client, job)
err := geocodePool(ctx, txn, client, job)
if err != nil {
errors <- err

187
platform/geocode/geocode.go Normal file
View file

@ -0,0 +1,187 @@
package geocode
import (
"context"
"fmt"
"strings"
"time"
"github.com/Gleipnir-Technology/bob"
"github.com/Gleipnir-Technology/bob/dialect/psql"
"github.com/Gleipnir-Technology/bob/dialect/psql/im"
"github.com/Gleipnir-Technology/nidus-sync/db/enums"
"github.com/Gleipnir-Technology/nidus-sync/db/models"
"github.com/Gleipnir-Technology/nidus-sync/h3utils"
"github.com/Gleipnir-Technology/nidus-sync/stadia"
"github.com/stephenafamo/scan"
//"github.com/rs/zerolog/log"
"github.com/uber/h3-go/v4"
)
type Address struct {
Country enums.Countrytype
Locality string
Number string
PostalCode string
Region string
Street string
Unit string
}
type GeocodeResult struct {
Address Address
Cell h3.Cell
Longitude float64
Latitude float64
}
func (a Address) String() string {
return fmt.Sprintf("%s %s, %s, %s, %s, %s", a.Number, a.Street, a.Locality, a.Region, a.PostalCode, a.Country)
}
var client *stadia.StadiaMaps
// Either get an address that matches, or create a new address. Either way, return an address
// This will make a call to a structured geocode service, so it's slow.
func EnsureAddress(ctx context.Context, txn bob.Tx, org *models.Organization, a Address) (*models.Address, error) {
address, err := models.Addresses.Query(
models.SelectWhere.Addresses.Country.EQ(a.Country),
models.SelectWhere.Addresses.Locality.EQ(a.Locality),
models.SelectWhere.Addresses.Number.EQ(a.Number),
models.SelectWhere.Addresses.PostalCode.EQ(a.PostalCode),
models.SelectWhere.Addresses.Region.EQ(a.Region),
models.SelectWhere.Addresses.Street.EQ(a.Street),
models.SelectWhere.Addresses.Unit.EQ(a.Unit),
).One(ctx, txn)
if err == nil {
return address, nil
}
// Geocode
geo, err := Geocode(ctx, org, a)
if err != nil {
return nil, fmt.Errorf("geocode: %w", err)
}
type _row struct {
ID int32 `db:"id"`
}
created := time.Now()
row, err := bob.One(ctx, txn, psql.Insert(
im.Into("address", "country", "created", "geom", "h3cell", "id", "locality", "number", "postal_code", "region", "street", "unit"),
im.Values(
psql.Arg(geo.Address.Country),
psql.Arg(created),
psql.F("ST_Point", geo.Longitude, geo.Latitude, 4326),
psql.Arg(geo.Cell),
psql.Raw("DEFAULT"),
psql.Arg(geo.Address.Locality),
psql.Arg(geo.Address.Number),
psql.Arg(geo.Address.PostalCode),
psql.Arg(geo.Address.Region),
psql.Arg(geo.Address.Street),
psql.Raw("''"),
),
im.Returning("id"),
), scan.StructMapper[_row]())
if err != nil {
return nil, fmt.Errorf("insert: %w", err)
}
return &models.Address{
Country: geo.Address.Country,
Created: created,
Geom: "",
H3cell: "",
ID: row.ID,
Locality: geo.Address.Locality,
PostalCode: geo.Address.PostalCode,
Street: geo.Address.Street,
Unit: geo.Address.Unit,
Region: geo.Address.Region,
Number: geo.Address.Number,
}, nil
}
func Geocode(ctx context.Context, org *models.Organization, a Address) (GeocodeResult, error) {
street := fmt.Sprintf("%s %s", a.Number, a.Street)
country_s := a.Country.String()
/*
sublog := log.With().
Str("street", street).
Str("country", country).
Str("locality", a.Locality).
Str("postal", a.PostalCode).
Str("region", a.Region).
Logger()
*/
req := stadia.StructuredGeocodeRequest{
Address: &street,
Country: &country_s,
Locality: &a.Locality,
PostalCode: &a.PostalCode,
Region: &a.Region,
}
maybeAddServiceArea(&req, org)
resp, err := client.StructuredGeocode(ctx, req)
if err != nil {
return GeocodeResult{}, fmt.Errorf("client structured geocode failure on %s: %w", a.String(), err)
}
if len(resp.Features) > 1 {
return GeocodeResult{}, fmt.Errorf("%s matched more than one location", a.String())
}
feature := resp.Features[0]
if feature.Geometry.Type != "Point" {
return GeocodeResult{}, fmt.Errorf("wrong type %s from %s", feature.Geometry.Type, a.String())
}
longitude := feature.Geometry.Coordinates[0]
latitude := feature.Geometry.Coordinates[1]
cell, err := h3utils.GetCell(longitude, latitude, 15)
if err != nil {
return GeocodeResult{}, fmt.Errorf("failed to convert lat %f lng %f to h3 cell", longitude, latitude)
}
var country enums.Countrytype
country_s = strings.ToLower(feature.Properties.CountryA)
err = country.Scan(country_s)
if err != nil {
return GeocodeResult{}, fmt.Errorf("failed to scan country '%s': %w", country_s, err)
}
return GeocodeResult{
Address: Address{
Country: country,
Locality: feature.Properties.Locality,
Number: feature.Properties.HouseNumber,
PostalCode: feature.Properties.PostalCode,
Region: feature.Properties.Region,
Street: feature.Properties.Street,
Unit: "",
},
Cell: cell,
Longitude: feature.Geometry.Coordinates[0],
Latitude: feature.Geometry.Coordinates[1],
}, nil
}
func maybeAddServiceArea(req *stadia.StructuredGeocodeRequest, org *models.Organization) {
if org.ServiceAreaXmax.IsNull() ||
org.ServiceAreaYmax.IsNull() ||
org.ServiceAreaXmin.IsNull() ||
org.ServiceAreaYmin.IsNull() {
return
}
xmax := org.ServiceAreaXmax.MustGet()
ymax := org.ServiceAreaYmax.MustGet()
xmin := org.ServiceAreaXmin.MustGet()
ymin := org.ServiceAreaYmin.MustGet()
req.BoundaryRectMaxLon = &xmax
req.BoundaryRectMaxLat = &ymax
req.BoundaryRectMinLon = &xmin
req.BoundaryRectMinLat = &ymin
if org.ServiceAreaCentroidX.IsNull() || org.ServiceAreaCentroidY.IsNull() {
return
}
centroid_x := org.ServiceAreaCentroidX.MustGet()
centroid_y := org.ServiceAreaCentroidY.MustGet()
req.FocusPointLat = &centroid_y
req.FocusPointLng = &centroid_x
}

6
platform/organization.go Normal file
View file

@ -0,0 +1,6 @@
package platform
type Organization struct {
ID int
Name string
}

View file

@ -9,14 +9,9 @@ import (
"github.com/Gleipnir-Technology/bob"
"github.com/Gleipnir-Technology/bob/dialect/psql"
"github.com/Gleipnir-Technology/bob/dialect/psql/sm"
"github.com/Gleipnir-Technology/nidus-sync/background"
"github.com/Gleipnir-Technology/nidus-sync/db"
"github.com/Gleipnir-Technology/nidus-sync/db/enums"
"github.com/Gleipnir-Technology/nidus-sync/db/models"
"github.com/Gleipnir-Technology/nidus-sync/userfile"
"github.com/aarondl/opt/omit"
"github.com/aarondl/opt/omitnull"
"github.com/rs/zerolog/log"
"github.com/stephenafamo/scan"
)
@ -52,43 +47,6 @@ type Upload struct {
Status string `db:"status"`
}
func NewUpload(ctx context.Context, u *models.User, upload userfile.FileUpload, t enums.FileuploadCsvtype) (Upload, error) {
txn, err := db.PGInstance.BobDB.BeginTx(ctx, nil)
if err != nil {
return Upload{}, fmt.Errorf("Failed to begin transaction: %w", err)
}
defer txn.Rollback(ctx)
file, err := models.FileuploadFiles.Insert(&models.FileuploadFileSetter{
ContentType: omit.From(upload.ContentType),
Created: omit.From(time.Now()),
CreatorID: omit.From(u.ID),
Deleted: omitnull.FromPtr[time.Time](nil),
Name: omit.From(upload.Name),
OrganizationID: omit.From(u.OrganizationID),
Status: omit.From(enums.FileuploadFilestatustypeUploaded),
SizeBytes: omit.From(int32(upload.SizeBytes)),
FileUUID: omit.From(upload.UUID),
}).One(ctx, txn)
if err != nil {
return Upload{}, fmt.Errorf("Failed to create file upload: %w", err)
}
_, err = models.FileuploadCSVS.Insert(&models.FileuploadCSVSetter{
Committed: omitnull.FromPtr[time.Time](nil),
FileID: omit.From(file.ID),
Rowcount: omit.From(int32(0)),
Type: omit.From(t),
}).One(ctx, txn)
if err != nil {
return Upload{}, fmt.Errorf("Failed to create csv: %w", err)
}
log.Info().Int32("id", file.ID).Msg("Created new pool CSV upload")
txn.Commit(ctx)
background.ProcessUpload(file.ID, t)
return Upload{
ID: file.ID,
}, nil
}
func GetUploadDetail(ctx context.Context, organization_id int32, file_id int32) (UploadPoolDetail, error) {
file, err := models.FindFileuploadFile(ctx, db.PGInstance.BobDB, file_id)
if err != nil {

View file

@ -9,8 +9,14 @@ import (
"github.com/Gleipnir-Technology/bob/dialect/psql"
"github.com/Gleipnir-Technology/bob/dialect/psql/sm"
"github.com/Gleipnir-Technology/bob/dialect/psql/um"
"github.com/Gleipnir-Technology/nidus-sync/background"
"github.com/Gleipnir-Technology/nidus-sync/db"
"github.com/Gleipnir-Technology/nidus-sync/db/enums"
"github.com/Gleipnir-Technology/nidus-sync/db/models"
"github.com/Gleipnir-Technology/nidus-sync/userfile"
"github.com/aarondl/opt/omit"
"github.com/aarondl/opt/omitnull"
"github.com/rs/zerolog/log"
"github.com/stephenafamo/scan"
)
@ -35,8 +41,55 @@ type UploadSummary struct {
Type string `db:"type"`
}
func NewUpload(ctx context.Context, u *models.User, upload userfile.FileUpload, t enums.FileuploadCsvtype) (Upload, error) {
txn, err := db.PGInstance.BobDB.BeginTx(ctx, nil)
if err != nil {
return Upload{}, fmt.Errorf("Failed to begin transaction: %w", err)
}
defer txn.Rollback(ctx)
file, err := models.FileuploadFiles.Insert(&models.FileuploadFileSetter{
ContentType: omit.From(upload.ContentType),
Created: omit.From(time.Now()),
CreatorID: omit.From(u.ID),
Deleted: omitnull.FromPtr[time.Time](nil),
Name: omit.From(upload.Name),
OrganizationID: omit.From(u.OrganizationID),
Status: omit.From(enums.FileuploadFilestatustypeUploaded),
SizeBytes: omit.From(int32(upload.SizeBytes)),
FileUUID: omit.From(upload.UUID),
}).One(ctx, txn)
if err != nil {
return Upload{}, fmt.Errorf("Failed to create file upload: %w", err)
}
_, err = models.FileuploadCSVS.Insert(&models.FileuploadCSVSetter{
Committed: omitnull.FromPtr[time.Time](nil),
FileID: omit.From(file.ID),
Rowcount: omit.From(int32(0)),
Type: omit.From(t),
}).One(ctx, txn)
if err != nil {
return Upload{}, fmt.Errorf("Failed to create csv: %w", err)
}
log.Info().Int32("id", file.ID).Msg("Created new pool CSV upload")
txn.Commit(ctx)
background.ProcessUpload(file.ID, t)
return Upload{
ID: file.ID,
}, nil
}
func UploadCommit(ctx context.Context, org *models.Organization, file_id int32) error {
return nil
// Create addresses for each row
// Create sites for each row
// Create pools for each row
_, err := psql.Update(
um.Table(models.FileuploadFiles.Alias()),
um.SetCol("status").ToArg("committed"),
um.Where(psql.Quote("id").EQ(psql.Arg(file_id))),
um.Where(psql.Quote("organization_id").EQ(psql.Arg(org.ID))),
).Exec(ctx, db.PGInstance.BobDB)
background.CommitUpload(file_id)
return err
}
func UploadDiscard(ctx context.Context, org *models.Organization, file_id int32) error {
_, err := psql.Update(

14
platform/user.go Normal file
View file

@ -0,0 +1,14 @@
package platform
import (
"github.com/Gleipnir-Technology/nidus-sync/notification"
)
type User struct {
DisplayName string
Initials string
Notifications []notification.Notification
Organization Organization
Role string
Username string
}