Actually set geom and h3cell for uploaded pools
This commit is contained in:
parent
0659b8993d
commit
ebc329fc5e
3 changed files with 64 additions and 9 deletions
|
|
@ -10,10 +10,14 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/Gleipnir-Technology/bob"
|
||||
"github.com/Gleipnir-Technology/bob/dialect/psql"
|
||||
"github.com/Gleipnir-Technology/bob/dialect/psql/um"
|
||||
"github.com/Gleipnir-Technology/nidus-sync/config"
|
||||
"github.com/Gleipnir-Technology/nidus-sync/db"
|
||||
"github.com/Gleipnir-Technology/nidus-sync/db/enums"
|
||||
"github.com/Gleipnir-Technology/nidus-sync/db/models"
|
||||
"github.com/Gleipnir-Technology/nidus-sync/h3utils"
|
||||
"github.com/Gleipnir-Technology/nidus-sync/platform/geom"
|
||||
"github.com/Gleipnir-Technology/nidus-sync/platform/text"
|
||||
"github.com/Gleipnir-Technology/nidus-sync/stadia"
|
||||
"github.com/Gleipnir-Technology/nidus-sync/userfile"
|
||||
|
|
@ -79,6 +83,7 @@ func ProcessJob(ctx context.Context, file_id int32) error {
|
|||
if err != nil {
|
||||
return fmt.Errorf("bulk geocode: %w", err)
|
||||
}
|
||||
txn.Commit(ctx)
|
||||
return nil
|
||||
}
|
||||
func bulkGeocode(ctx context.Context, txn bob.Tx, file models.FileuploadFile, pools []*models.FileuploadPool) error {
|
||||
|
|
@ -99,17 +104,59 @@ func bulkGeocode(ctx context.Context, txn bob.Tx, file models.FileuploadFile, po
|
|||
return fmt.Errorf("client bulk geocode: %w", err)
|
||||
}
|
||||
log.Info().Int("len", len(responses)).Msg("bulk query response")
|
||||
//setters := make([]*models.FileuploadPoolSetter, 0)
|
||||
for i, resp := range responses {
|
||||
pool := pools[i]
|
||||
if resp.Status != 200 {
|
||||
log.Warn().Int("row", i).Int("status", resp.Status).Str("pool.address", pool.AddressStreet).Str("pool.postal", pool.AddressPostalCode).Str("msg", resp.Message)
|
||||
log.Info().Int("row", i).Int("status", resp.Status).Str("pool.address", pool.AddressStreet).Str("pool.postal", pool.AddressPostalCode).Str("msg", resp.Message).Msg("Non-200 status on geocode request")
|
||||
continue
|
||||
}
|
||||
if resp.Response != nil {
|
||||
for _, feature := range resp.Response.Features {
|
||||
log.Info().Int("row", i).Int("status", resp.Status).Str("pool.address", pool.AddressStreet).Str("pool.postal", pool.AddressPostalCode).Str("feature.type", feature.Type).Str("formatted", feature.Properties.FormattedAddressLine).Float64("coordinates[0]", feature.Geometry.Coordinates[0]).Float64("coordinates[1]", feature.Geometry.Coordinates[1]).Msg("geocode")
|
||||
}
|
||||
if resp.Response == nil {
|
||||
log.Info().Int("row", i).Str("pool.address", pool.AddressStreet).Str("pool.postal", pool.AddressPostalCode).Str("msg", resp.Message).Msg("nil response on geocode")
|
||||
continue
|
||||
}
|
||||
if len(resp.Response.Features) > 1 {
|
||||
log.Warn().Int("row", i).Int("len", len(resp.Response.Features)).Msg("too many features")
|
||||
continue
|
||||
}
|
||||
feature := resp.Response.Features[0]
|
||||
if feature.Geometry.Type != "Point" {
|
||||
log.Warn().Int("row", i).Str("type", feature.Geometry.Type).Msg("wrong type")
|
||||
continue
|
||||
}
|
||||
longitude := feature.Geometry.Coordinates[0]
|
||||
latitude := feature.Geometry.Coordinates[1]
|
||||
cell, err := h3utils.GetCell(longitude, latitude, 15)
|
||||
if err != nil {
|
||||
log.Warn().Err(err).Int("row", i).Float64("lng", longitude).Float64("lat", latitude).Msg("failed to convert h3 cell")
|
||||
continue
|
||||
}
|
||||
geom_query := geom.PostgisPointQuery(longitude, latitude)
|
||||
_, err = psql.Update(
|
||||
um.Table("fileupload.pool"),
|
||||
um.SetCol("h3cell").ToArg(cell),
|
||||
um.SetCol("geom").To(geom_query),
|
||||
um.Where(psql.Quote("id").EQ(psql.Arg(pool.ID))),
|
||||
).Exec(ctx, txn)
|
||||
if err != nil {
|
||||
log.Warn().Err(err).Msg("failed to update pool")
|
||||
continue
|
||||
}
|
||||
}
|
||||
update_query := `
|
||||
UPDATE fileupload.pool p
|
||||
SET is_in_district = (
|
||||
EXISTS (
|
||||
SELECT 1
|
||||
FROM import.district d
|
||||
JOIN organization o ON d.gid = o.import_district_gid
|
||||
WHERE o.id = p.organization_id
|
||||
AND ST_Contains(d.geom, p.geom)
|
||||
)
|
||||
)
|
||||
WHERE p.geom IS NOT NULL;`
|
||||
_, err = txn.ExecContext(ctx, update_query)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to update is_in_district: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
@ -135,7 +182,6 @@ func parseFile(ctx context.Context, txn bob.Tx, file models.FileuploadFile) ([]*
|
|||
file.Update(ctx, txn, &models.FileuploadFileSetter{
|
||||
Status: omit.From(enums.FileuploadFilestatustypeError),
|
||||
})
|
||||
txn.Commit(ctx)
|
||||
return pools, nil
|
||||
}
|
||||
row_number := 0
|
||||
|
|
@ -147,7 +193,6 @@ func parseFile(ctx context.Context, txn bob.Tx, file models.FileuploadFile) ([]*
|
|||
Status: omit.From(enums.FileuploadFilestatustypeParsed),
|
||||
})
|
||||
log.Info().Int32("file.ID", file.ID).Msg("Set file to parsed")
|
||||
txn.Commit(ctx)
|
||||
return pools, nil
|
||||
}
|
||||
return pools, fmt.Errorf("Failed to read all CSV records for file %d: %w", file.ID, err)
|
||||
|
|
|
|||
9
platform/geom/geom.go
Normal file
9
platform/geom/geom.go
Normal file
|
|
@ -0,0 +1,9 @@
|
|||
package geom
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
)
|
||||
|
||||
func PostgisPointQuery(longitude, latitude float64) string {
|
||||
return fmt.Sprintf("ST_GeometryFromText('Point(%f %f)')", longitude, latitude)
|
||||
}
|
||||
|
|
@ -15,7 +15,8 @@ type StadiaMaps struct {
|
|||
func NewStadiaMaps(api_key string) *StadiaMaps {
|
||||
//logger := NewLogger(log.Logger)
|
||||
//r := resty.New().SetLogger(logger).SetDebug(true)
|
||||
r := resty.New().SetDebug(true)
|
||||
//r := resty.New().SetDebug(true)
|
||||
r := resty.New()
|
||||
return &StadiaMaps{
|
||||
APIKey: api_key,
|
||||
client: r,
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue