diff --git a/platform/csv/pool.go b/platform/csv/pool.go index e1db76cb..a6114d1b 100644 --- a/platform/csv/pool.go +++ b/platform/csv/pool.go @@ -10,10 +10,14 @@ import ( "time" "github.com/Gleipnir-Technology/bob" + "github.com/Gleipnir-Technology/bob/dialect/psql" + "github.com/Gleipnir-Technology/bob/dialect/psql/um" "github.com/Gleipnir-Technology/nidus-sync/config" "github.com/Gleipnir-Technology/nidus-sync/db" "github.com/Gleipnir-Technology/nidus-sync/db/enums" "github.com/Gleipnir-Technology/nidus-sync/db/models" + "github.com/Gleipnir-Technology/nidus-sync/h3utils" + "github.com/Gleipnir-Technology/nidus-sync/platform/geom" "github.com/Gleipnir-Technology/nidus-sync/platform/text" "github.com/Gleipnir-Technology/nidus-sync/stadia" "github.com/Gleipnir-Technology/nidus-sync/userfile" @@ -79,6 +83,7 @@ func ProcessJob(ctx context.Context, file_id int32) error { if err != nil { return fmt.Errorf("bulk geocode: %w", err) } + txn.Commit(ctx) return nil } func bulkGeocode(ctx context.Context, txn bob.Tx, file models.FileuploadFile, pools []*models.FileuploadPool) error { @@ -99,17 +104,59 @@ func bulkGeocode(ctx context.Context, txn bob.Tx, file models.FileuploadFile, po return fmt.Errorf("client bulk geocode: %w", err) } log.Info().Int("len", len(responses)).Msg("bulk query response") - //setters := make([]*models.FileuploadPoolSetter, 0) for i, resp := range responses { pool := pools[i] if resp.Status != 200 { - log.Warn().Int("row", i).Int("status", resp.Status).Str("pool.address", pool.AddressStreet).Str("pool.postal", pool.AddressPostalCode).Str("msg", resp.Message) + log.Info().Int("row", i).Int("status", resp.Status).Str("pool.address", pool.AddressStreet).Str("pool.postal", pool.AddressPostalCode).Str("msg", resp.Message).Msg("Non-200 status on geocode request") + continue } - if resp.Response != nil { - for _, feature := range resp.Response.Features { - log.Info().Int("row", i).Int("status", resp.Status).Str("pool.address", pool.AddressStreet).Str("pool.postal", pool.AddressPostalCode).Str("feature.type", feature.Type).Str("formatted", feature.Properties.FormattedAddressLine).Float64("coordinates[0]", feature.Geometry.Coordinates[0]).Float64("coordinates[1]", feature.Geometry.Coordinates[1]).Msg("geocode") - } + if resp.Response == nil { + log.Info().Int("row", i).Str("pool.address", pool.AddressStreet).Str("pool.postal", pool.AddressPostalCode).Str("msg", resp.Message).Msg("nil response on geocode") + continue } + if len(resp.Response.Features) > 1 { + log.Warn().Int("row", i).Int("len", len(resp.Response.Features)).Msg("too many features") + continue + } + feature := resp.Response.Features[0] + if feature.Geometry.Type != "Point" { + log.Warn().Int("row", i).Str("type", feature.Geometry.Type).Msg("wrong type") + continue + } + longitude := feature.Geometry.Coordinates[0] + latitude := feature.Geometry.Coordinates[1] + cell, err := h3utils.GetCell(longitude, latitude, 15) + if err != nil { + log.Warn().Err(err).Int("row", i).Float64("lng", longitude).Float64("lat", latitude).Msg("failed to convert h3 cell") + continue + } + geom_query := geom.PostgisPointQuery(longitude, latitude) + _, err = psql.Update( + um.Table("fileupload.pool"), + um.SetCol("h3cell").ToArg(cell), + um.SetCol("geom").To(geom_query), + um.Where(psql.Quote("id").EQ(psql.Arg(pool.ID))), + ).Exec(ctx, txn) + if err != nil { + log.Warn().Err(err).Msg("failed to update pool") + continue + } + } + update_query := ` + UPDATE fileupload.pool p + SET is_in_district = ( + EXISTS ( + SELECT 1 + FROM import.district d + JOIN organization o ON d.gid = o.import_district_gid + WHERE o.id = p.organization_id + AND ST_Contains(d.geom, p.geom) + ) + ) + WHERE p.geom IS NOT NULL;` + _, err = txn.ExecContext(ctx, update_query) + if err != nil { + return fmt.Errorf("failed to update is_in_district: %w", err) } return nil } @@ -135,7 +182,6 @@ func parseFile(ctx context.Context, txn bob.Tx, file models.FileuploadFile) ([]* file.Update(ctx, txn, &models.FileuploadFileSetter{ Status: omit.From(enums.FileuploadFilestatustypeError), }) - txn.Commit(ctx) return pools, nil } row_number := 0 @@ -147,7 +193,6 @@ func parseFile(ctx context.Context, txn bob.Tx, file models.FileuploadFile) ([]* Status: omit.From(enums.FileuploadFilestatustypeParsed), }) log.Info().Int32("file.ID", file.ID).Msg("Set file to parsed") - txn.Commit(ctx) return pools, nil } return pools, fmt.Errorf("Failed to read all CSV records for file %d: %w", file.ID, err) diff --git a/platform/geom/geom.go b/platform/geom/geom.go new file mode 100644 index 00000000..b1011224 --- /dev/null +++ b/platform/geom/geom.go @@ -0,0 +1,9 @@ +package geom + +import ( + "fmt" +) + +func PostgisPointQuery(longitude, latitude float64) string { + return fmt.Sprintf("ST_GeometryFromText('Point(%f %f)')", longitude, latitude) +} diff --git a/stadia/stadia.go b/stadia/stadia.go index 2de48192..a919fca5 100644 --- a/stadia/stadia.go +++ b/stadia/stadia.go @@ -15,7 +15,8 @@ type StadiaMaps struct { func NewStadiaMaps(api_key string) *StadiaMaps { //logger := NewLogger(log.Logger) //r := resty.New().SetLogger(logger).SetDebug(true) - r := resty.New().SetDebug(true) + //r := resty.New().SetDebug(true) + r := resty.New() return &StadiaMaps{ APIKey: api_key, client: r,