nidus-sync/platform/csv/pool.go
Eli Ribble f1fe8b4d2b
Add contacts, rework comms schema
This is a pretty huge change. At a high level, we're adding the concept
of a 'contact', which is a person or organization that has zero or more
contact methods (email, phone). This ended up cascading into a number of
changes, critically including changes to the publicreport schema. In the end it
seemed safer to get to the point where I'm confident we aren't using any
of the old fields for storing reporter information (though I haven't
deleted the columns yet), so I removed the code for defining those
columns.

At this point I think it's not possible for me to regenerate the bob
schema due to the interdependencies between my various schemas, so the
migration is well-and-truly happening.
2026-05-15 16:58:28 +00:00

422 lines
13 KiB
Go

package csv
import (
"context"
"encoding/csv"
"fmt"
"io"
"strings"
"sync"
"time"
"github.com/Gleipnir-Technology/bob"
"github.com/Gleipnir-Technology/bob/dialect/psql"
"github.com/Gleipnir-Technology/bob/dialect/psql/um"
"github.com/Gleipnir-Technology/nidus-sync/config"
"github.com/Gleipnir-Technology/nidus-sync/db"
"github.com/Gleipnir-Technology/nidus-sync/db/enums"
"github.com/Gleipnir-Technology/nidus-sync/db/models"
"github.com/Gleipnir-Technology/nidus-sync/lint"
"github.com/Gleipnir-Technology/nidus-sync/platform/file"
"github.com/Gleipnir-Technology/nidus-sync/platform/geocode"
"github.com/Gleipnir-Technology/nidus-sync/platform/geom"
"github.com/Gleipnir-Technology/nidus-sync/platform/text"
"github.com/Gleipnir-Technology/nidus-sync/platform/types"
"github.com/Gleipnir-Technology/nidus-sync/stadia"
"github.com/aarondl/opt/omit"
"github.com/aarondl/opt/omitnull"
"github.com/rs/zerolog/log"
)
// headerPoolEnum identifies which pool field a CSV column maps to, as
// determined by parseHeaders from the column's header text.
type headerPoolEnum int

// The values are assigned with iota, so each constant's numeric value depends
// on its position in this list; do not reorder.
const (
	// headerPoolUnknown is the zero value; parseCSVPoollist logs an error if a
	// column is ever classified this way.
	headerPoolUnknown headerPoolEnum = iota
	headerPoolAddressLocality   // "city"
	headerPoolAddressPostalCode // "zip" / "postal code"
	headerPoolAddressRegion     // "state"
	headerPoolAddressStreet     // house number and street, e.g. "123 Main St"
	headerPoolCondition
	headerPoolNotes
	headerPoolPropertyOwnerName
	headerPoolPropertyOwnerPhone
	headerPoolResidentOwned
	headerPoolResidentPhone
	// headerPoolTag marks any unrecognized column; parseCSVPoollist stores its
	// cell values in the pool's tags keyed by the header name.
	headerPoolTag
)
// String returns the display name of a recognized pool column header.
//
// headerPoolUnknown and headerPoolTag have no display name and fall through to
// the placeholder "bad programmer".
func (e headerPoolEnum) String() string {
	name := "bad programmer"
	switch e {
	case headerPoolAddressStreet:
		name = "Street Address"
	case headerPoolAddressLocality:
		name = "City"
	case headerPoolAddressRegion:
		name = "State"
	case headerPoolAddressPostalCode:
		name = "Postal Code"
	case headerPoolCondition:
		name = "Condition"
	case headerPoolNotes:
		name = "Notes"
	case headerPoolPropertyOwnerName:
		name = "Property Owner Name"
	case headerPoolPropertyOwnerPhone:
		name = "Property Owner Phone"
	case headerPoolResidentOwned:
		name = "Resident Owned"
	case headerPoolResidentPhone:
		name = "Resident Phone"
	}
	return name
}
// bulkGeocode geocodes every pool parsed from one CSV upload, then recomputes
// is_in_district for the pools belonging to that upload.
//
// Jobs are handed to worker goroutines over a buffered channel. Only a single
// worker is started; presumably this is because every worker would share the
// one transaction txn (TODO confirm before raising the worker count).
//
// geocodePool records per-pool geocoding problems on the CSV itself and does
// not return them, so only errors that geocodePool returns (e.g. a failed
// UPDATE) are counted here. If any occur, txn is rolled back and an error with
// the count is returned.
//
// file identifies the upload; its ID limits the is_in_district recomputation
// to pools whose csv_file matches it, rather than every pool in the table.
func bulkGeocode(ctx context.Context, txn bob.Tx, file *models.FileuploadFile, c *models.FileuploadCSV, pools []*models.FileuploadPool, org *models.Organization) error {
	if len(pools) == 0 {
		return nil
	}
	log.Info().Int("len pools", len(pools)).Msg("bulk geocoding")
	client := stadia.NewStadiaMaps(config.StadiaMapsAPIKey)
	// Both channels are buffered to len(pools), so neither the producer loop
	// below nor the worker can block on a send.
	jobs := make(chan *jobGeocode, len(pools))
	errors := make(chan error, len(pools))
	var wg sync.WaitGroup
	wg.Add(1)
	go worker(ctx, txn, client, jobs, errors, &wg)
	for _, pool := range pools {
		jobs <- &jobGeocode{
			csv:  c,
			org:  org,
			pool: pool,
		}
	}
	close(jobs)
	// Close errors once the worker has exited so the range below terminates.
	go func() {
		wg.Wait()
		close(errors)
	}()
	error_count := 0
	for err := range errors {
		log.Error().Err(err).Msg("failed to geocode")
		error_count++
	}
	if error_count > 0 {
		lint.LogOnErrRollback(txn.Rollback, ctx, "rollback")
		return fmt.Errorf("%d errors encountered in bulk geocode", error_count)
	}
	// A pool is in-district if its point lies inside the owning organization's
	// service area, or the organization is a catch-all. The outer WHERE limits
	// the update to this upload's pools; without it every geocoded pool in the
	// table would be rewritten on each upload.
	update_query := `
UPDATE fileupload.pool p
SET is_in_district = (
	EXISTS (
		SELECT 1
		FROM organization o, fileupload.file f
		WHERE
			p.csv_file = f.id AND
			f.organization_id = o.id AND (
				ST_Contains(o.service_area_geometry, p.geom) OR
				o.is_catchall
			)
	)
)
WHERE p.geom IS NOT NULL AND p.csv_file = $1;`
	_, err := txn.ExecContext(ctx, update_query, file.ID)
	if err != nil {
		return fmt.Errorf("failed to update is_in_district: %w", err)
	}
	return nil
}
// jobGeocode is one unit of work that bulkGeocode sends to worker.
type jobGeocode struct {
	// csv is the upload that geocoding problems for this pool are recorded on.
	csv *models.FileuploadCSV
	// org is passed to geocode.GeocodeStructured.
	org *models.Organization
	// pool is the fileupload.pool row to geocode and update.
	pool *models.FileuploadPool
}
// geocodePool geocodes one pool's structured address and writes the result
// (h3 cell, point geometry and address id) onto its fileupload.pool row.
//
// Geocoding problems are recorded against the CSV with addError at the pool's
// line number and are NOT returned, so one bad address does not fail the batch.
// These problems are a geocoder error, or a result missing its location or
// address ID. Only a failure to update the pool row is returned as an error.
//
// client is currently unused; geocoding goes through geocode.GeocodeStructured.
func geocodePool(ctx context.Context, txn bob.Tx, client *stadia.StadiaMaps, job *jobGeocode) error {
	pool := job.pool
	a := types.Address{
		Number:     pool.AddressNumber,
		Locality:   pool.AddressLocality,
		PostalCode: pool.AddressPostalCode,
		Region:     pool.AddressRegion,
		Street:     pool.AddressStreet,
	}
	geo, err := geocode.GeocodeStructured(ctx, job.org, a)
	if err != nil {
		lint.LogOnErrCtx(func(ctx context.Context) error {
			return addError(ctx, txn, job.csv, pool.LineNumber, 0, err.Error())
		}, ctx, "add geocode error")
		return nil
	}
	if geo.Address.Location == nil {
		lint.LogOnErrCtx(func(ctx context.Context) error {
			return addError(ctx, txn, job.csv, pool.LineNumber, 0, "nil location from geocoding")
		}, ctx, "add nil location error")
		return nil
	}
	// Guard the dereference below: a nil ID would otherwise panic inside the
	// worker goroutine and take down the whole process.
	if geo.Address.ID == nil {
		lint.LogOnErrCtx(func(ctx context.Context) error {
			return addError(ctx, txn, job.csv, pool.LineNumber, 0, "nil address id from geocoding")
		}, ctx, "add nil address id error")
		return nil
	}
	geom_query := geom.PostgisPointQuery(*geo.Address.Location)
	_, err = psql.Update(
		um.Table("fileupload.pool"),
		um.SetCol("h3cell").ToArg(geo.Cell),
		um.SetCol("geom").To(geom_query),
		// Bind the ID as a query argument (like h3cell) rather than writing
		// its textual form directly into the SQL.
		um.SetCol("address_id").ToArg(*geo.Address.ID),
		um.Where(psql.Quote("id").EQ(psql.Arg(pool.ID))),
	).Exec(ctx, txn)
	if err != nil {
		return fmt.Errorf("failed to update pool: %w", err)
	}
	return nil
}
// parseCSVPoollist reads an uploaded pool-list CSV and inserts one
// fileupload.pool row per data row, returning the inserted pools.
//
// The first row must be a header, classified with parseHeaders. If any required
// address header is missing, the function does three things and parses no data
// rows. It records an error for EVERY missing header (not just the first) and
// sets the file's status to error. It then returns an empty slice with a nil
// error, unless the status update itself fails.
//
// Problems with individual cells are recorded with addError against the CSV
// line and column, and the cell is otherwise skipped; they do not stop parsing.
// These cover an unparseable street address, condition, phone number or
// true/false value. Unrecognized columns are collected into the pool's tags,
// keyed by the lower-cased header name.
//
// On error, the returned slice holds the pools inserted before the failure.
func parseCSVPoollist(ctx context.Context, txn bob.Tx, f *models.FileuploadFile, c *models.FileuploadCSV) ([]*models.FileuploadPool, error) {
	pools := make([]*models.FileuploadPool, 0)
	r, err := file.NewFileReader(file.CollectionCSV, f.FileUUID)
	if err != nil {
		return pools, fmt.Errorf("Failed to get filereader for %d: %w", f.ID, err)
	}
	reader := csv.NewReader(r)
	h, err := reader.Read()
	if err != nil {
		return pools, fmt.Errorf("Failed to read header of CSV for file %d: %w", f.ID, err)
	}
	header_types, header_names := parseHeaders(h)
	missing_headers := missingRequiredHeaders(header_types)
	if len(missing_headers) > 0 {
		// Report every missing header so the user can fix them all in one pass;
		// previously the loop returned after recording only the first one.
		for _, mh := range missing_headers {
			lint.LogOnErrCtx(func(ctx context.Context) error {
				return errorMissingHeader(ctx, txn, c, mh)
			}, ctx, "error missing header")
		}
		err = f.Update(ctx, txn, &models.FileuploadFileSetter{
			Status: omit.From(enums.FileuploadFilestatustypeError),
		})
		if err != nil {
			return pools, fmt.Errorf("update: %w", err)
		}
		return pools, nil
	}
	for i, header_name := range header_names {
		log.Debug().Int("index", i).Str("name", header_name).Send()
	}
	// Start at 2 because the header is line 1, not line 0.
	// NOTE(review): this counts records, not physical lines; a quoted field
	// containing a newline will shift the line numbers reported to the user.
	line_number := int32(2)
	for {
		// With the default FieldsPerRecord (0), csv.Reader rejects any record
		// whose field count differs from the header's, so header_types[i]
		// below is always in range.
		row, err := reader.Read()
		if err != nil {
			if err == io.EOF {
				return pools, nil
			}
			return pools, fmt.Errorf("Failed to read all CSV records for file %d: %w", f.ID, err)
		}
		tags := make(map[string]string)
		setter := models.FileuploadPoolSetter{
			AddressNumber:     omit.From(""),
			AddressLocality:   omit.From(""),
			AddressPostalCode: omit.From(""),
			AddressRegion:     omit.From(""),
			AddressStreet:     omit.From(""),
			Committed:         omit.From(false),
			Condition:         omit.From(enums.PoolconditiontypeUnknown),
			Created:           omit.From(time.Now()),
			CreatorID:         omit.From(f.CreatorID),
			CSVFile:           omit.From(f.ID),
			Deleted:           omitnull.FromPtr[time.Time](nil),
			Geom:              omitnull.FromPtr[string](nil),
			H3cell:            omitnull.FromPtr[string](nil),
			// ID - generated
			IsInDistrict:           omit.From(false),
			IsNew:                  omit.From(true),
			LineNumber:             omit.From(line_number),
			Notes:                  omit.From(""),
			PropertyOwnerName:      omit.From(""),
			PropertyOwnerPhoneE164: omitnull.FromPtr[string](nil),
			ResidentOwned:          omitnull.FromPtr[bool](nil),
			ResidentPhoneE164:      omitnull.FromPtr[string](nil),
			//Tags: convertToPGData(tags),
		}
		for i, col := range row {
			hdr_t := header_types[i]
			if col == "" {
				continue
			}
			switch hdr_t {
			case headerPoolUnknown:
				log.Error().Int("i", i).Str("col", col).Int32("line", line_number).Msg("unknown header. This should never happen.")
			case headerPoolAddressLocality:
				setter.AddressLocality = omit.From(col)
			case headerPoolAddressPostalCode:
				setter.AddressPostalCode = omit.From(col)
			case headerPoolAddressRegion:
				setter.AddressRegion = omit.From(col)
			case headerPoolAddressStreet:
				// This type of spreadsheet normally has '123 Main Str': split
				// off the house number at the first space.
				parts := strings.SplitN(col, " ", 2)
				if len(parts) != 2 {
					lint.LogOnErrCtx(func(ctx context.Context) error {
						return addError(ctx, txn, c, line_number, int32(i), fmt.Sprintf("'%s' is not a house number and street. It needs to be in the form '123 main'", col))
					}, ctx, "add address parse error")
					continue
				}
				setter.AddressNumber = omit.From(parts[0])
				setter.AddressStreet = omit.From(parts[1])
			case headerPoolCondition:
				var condition enums.Poolconditiontype
				col_l := strings.ToLower(col)
				col_translated := col_l
				// Accept "empty" as a user-facing synonym for the "dry" condition.
				switch col_l {
				case "empty":
					col_translated = "dry"
				}
				err := condition.Scan(col_translated)
				if err != nil {
					lint.LogOnErrCtx(func(ctx context.Context) error {
						return addError(ctx, txn, c, line_number, int32(i), fmt.Sprintf("'%s' is not a pool condition that we recognize. It should be one of %s", col, poolConditionValidValues()))
					}, ctx, "add pool condition error")
					setter.Condition = omit.From(enums.PoolconditiontypeUnknown)
					continue
				}
				setter.Condition = omit.From(condition)
			case headerPoolNotes:
				setter.Notes = omit.From(col)
			case headerPoolPropertyOwnerName:
				setter.PropertyOwnerName = omit.From(col)
			case headerPoolPropertyOwnerPhone:
				phone, err := text.ParsePhoneNumber(col)
				if err != nil {
					lint.LogOnErrCtx(func(ctx context.Context) error {
						return addError(ctx, txn, c, line_number, int32(i), fmt.Sprintf("'%s' is not a phone number that we recognize. Ideally it should be of the form '+12223334444'", col))
					}, ctx, "add phone number error")
					continue
				}
				setter.PropertyOwnerPhoneE164 = omitnull.From(phone.PhoneString())
			case headerPoolResidentOwned:
				boolValue, err := parseBool(col)
				if err != nil {
					lint.LogOnErrCtx(func(ctx context.Context) error {
						return addError(ctx, txn, c, line_number, int32(i), fmt.Sprintf("'%s' is not something that we recognize as a true/false value. Please use either 'true' or 'false'", col))
					}, ctx, "add bool error")
					continue
				}
				setter.ResidentOwned = omitnull.From(boolValue)
			case headerPoolResidentPhone:
				phone, err := text.ParsePhoneNumber(col)
				if err != nil {
					lint.LogOnErrCtx(func(ctx context.Context) error {
						return addError(ctx, txn, c, line_number, int32(i), fmt.Sprintf("'%s' is not a phone number that we recognize. Ideally it should be of the form '+12223334444'", col))
					}, ctx, "add phone number error")
					continue
				}
				setter.ResidentPhoneE164 = omitnull.From(phone.PhoneString())
			case headerPoolTag:
				tags[header_names[i]] = col
			}
		}
		setter.Tags = omit.From(db.ConvertToPGData(tags))
		pool, err := models.FileuploadPools.Insert(&setter).One(ctx, txn)
		if err != nil {
			return pools, fmt.Errorf("Failed to create pool: %w", err)
		}
		pools = append(pools, pool)
		line_number++
	}
}
// processCSVPoollist geocodes the pools previously parsed from an uploaded CSV.
//
// The file's organization is looked up through db.PGInstance.BobDB (not txn)
// and passed to bulkGeocode. An error is returned only when that lookup fails;
// a bulkGeocode failure is logged and the function still returns nil.
//
// NOTE(review): bulkGeocode may roll back txn before returning its error, and
// that error is swallowed here; confirm the caller copes with a transaction
// that has already been rolled back.
func processCSVPoollist(ctx context.Context, txn bob.Tx, f *models.FileuploadFile, c *models.FileuploadCSV, parsed []*models.FileuploadPool) error {
	org, lookupErr := models.FindOrganization(ctx, db.PGInstance.BobDB, f.OrganizationID)
	if lookupErr != nil {
		return fmt.Errorf("get org: %w", lookupErr)
	}
	if geoErr := bulkGeocode(ctx, txn, f, c, parsed, org); geoErr != nil {
		log.Error().Err(geoErr).Msg("Failure during geocoding")
	}
	return nil
}
// parseHeaders classifies each CSV header cell as a headerPoolEnum.
//
// Each cell is trimmed and lower-cased before matching. The function returns
// two index-aligned slices: the classified types, and the lower-cased names
// (used by the caller as tag keys for unrecognized columns). Several spellings
// map to the same column: "zip"/"postal code", "condition"/"pool condition",
// "property owner"/"property owner name", and
// "resident phone"/"resident phone number". Anything else is headerPoolTag.
func parseHeaders(row []string) ([]headerPoolEnum, []string) {
	result_enums := make([]headerPoolEnum, 0, len(row))
	result_names := make([]string, 0, len(row))
	for _, h := range row {
		hl := strings.ToLower(strings.TrimSpace(h))
		log.Debug().Str("header", hl).Msg("Saw CSV header")
		var type_ headerPoolEnum
		// Go switch cases never fall through, so aliases must share one case
		// clause. An empty case (as previously written for "zip", "condition",
		// "property owner" and "resident phone") silently left the column
		// classified as a tag.
		switch hl {
		case "city":
			type_ = headerPoolAddressLocality
		case "zip", "postal code":
			type_ = headerPoolAddressPostalCode
		case "state":
			type_ = headerPoolAddressRegion
		case "street address":
			type_ = headerPoolAddressStreet
		case "condition", "pool condition":
			type_ = headerPoolCondition
		case "notes":
			type_ = headerPoolNotes
		case "property owner", "property owner name":
			type_ = headerPoolPropertyOwnerName
		case "property owner phone":
			type_ = headerPoolPropertyOwnerPhone
		case "resident owned":
			type_ = headerPoolResidentOwned
		case "resident phone", "resident phone number":
			type_ = headerPoolResidentPhone
		default:
			type_ = headerPoolTag
		}
		result_enums = append(result_enums, type_)
		result_names = append(result_names, hl)
	}
	return result_enums, result_names
}
// missingRequiredHeaders reports which required address columns are absent
// from headers.
//
// The result lists them in a fixed order (locality, region, postal code,
// street). It is a non-nil, empty slice when every required column is present.
func missingRequiredHeaders(headers []headerPoolEnum) []headerPoolEnum {
	seen := make(map[headerPoolEnum]bool, len(headers))
	for _, h := range headers {
		seen[h] = true
	}
	required := []headerPoolEnum{
		headerPoolAddressLocality,
		headerPoolAddressRegion,
		headerPoolAddressPostalCode,
		headerPoolAddressStreet,
	}
	missing := make([]headerPoolEnum, 0)
	for _, want := range required {
		if !seen[want] {
			missing = append(missing, want)
		}
	}
	return missing
}
// poolConditionValidValues renders every known pool condition as a
// comma-separated list of single-quoted values (e.g. "'a', 'b'"), for use in
// the "not a pool condition that we recognize" error message. It returns an
// empty string if there are no conditions.
func poolConditionValidValues() string {
	all := enums.AllPoolconditiontype()
	quoted := make([]string, 0, len(all))
	for _, cond := range all {
		quoted = append(quoted, fmt.Sprintf("'%s'", cond))
	}
	return strings.Join(quoted, ", ")
}
// worker runs geocodePool on each job received from jobs and forwards any
// returned error to errors. It exits once jobs is closed and drained, and
// calls wg.Done on the way out.
func worker(ctx context.Context, txn bob.Tx, client *stadia.StadiaMaps, jobs <-chan *jobGeocode, errors chan<- error, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		job, ok := <-jobs
		if !ok {
			return
		}
		if jobErr := geocodePool(ctx, txn, client, job); jobErr != nil {
			errors <- jobErr
		}
	}
}