This refactor grew out of the dependency cycles forming between the "background" module and nearly every other module. They arose for two reasons. Every module that needed to run work in the background had to depend on the background module. At the same time, the background module was also responsible for the logic that processes those tasks. Now the "background" module is very shallow and relies entirely on Postgres NOTIFY to trigger jobs. A new table, `job`, holds just a job type and a single row ID. As a result, jobs can be added to the queue as part of the API-level or platform-level transaction, which ensures atomicity. Processing is coordinated by the platform module, which is free to depend on anything.
379 lines
11 KiB
Go
379 lines
11 KiB
Go
package csv
|
|
|
|
import (
|
|
"context"
|
|
"encoding/csv"
|
|
"fmt"
|
|
"io"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/Gleipnir-Technology/bob"
|
|
"github.com/Gleipnir-Technology/bob/dialect/psql"
|
|
"github.com/Gleipnir-Technology/bob/dialect/psql/um"
|
|
"github.com/Gleipnir-Technology/nidus-sync/config"
|
|
"github.com/Gleipnir-Technology/nidus-sync/db"
|
|
"github.com/Gleipnir-Technology/nidus-sync/db/enums"
|
|
"github.com/Gleipnir-Technology/nidus-sync/db/models"
|
|
"github.com/Gleipnir-Technology/nidus-sync/platform/file"
|
|
"github.com/Gleipnir-Technology/nidus-sync/platform/geocode"
|
|
"github.com/Gleipnir-Technology/nidus-sync/platform/geom"
|
|
"github.com/Gleipnir-Technology/nidus-sync/platform/text"
|
|
"github.com/Gleipnir-Technology/nidus-sync/platform/types"
|
|
"github.com/Gleipnir-Technology/nidus-sync/stadia"
|
|
"github.com/aarondl/opt/omit"
|
|
"github.com/aarondl/opt/omitnull"
|
|
"github.com/rs/zerolog/log"
|
|
)
|
|
|
|
// headerPoolEnum identifies what a column in an uploaded pool-list CSV holds.
type headerPoolEnum int

// Column kinds assigned by parseHeaders. Note that headerPoolAddressPostalCode
// is the zero value of headerPoolEnum.
const (
	headerPoolAddressPostalCode headerPoolEnum = iota
	headerPoolAddressRegion
	headerPoolAddressStreet
	headerPoolCondition
	headerPoolNotes
	headerPoolPropertyOwnerName
	headerPoolPropertyOwnerPhone
	headerPoolResidentOwned
	headerPoolResidentPhone
	// headerPoolTag marks a column whose header was not recognised; its
	// values are kept as free-form tags keyed by the header name.
	headerPoolTag
)

// String returns a human-readable name for the column kind. Every declared
// constant has a name; any other value yields "bad programmer".
func (e headerPoolEnum) String() string {
	switch e {
	case headerPoolAddressPostalCode:
		return "Postal Code"
	case headerPoolAddressRegion:
		return "City"
	case headerPoolAddressStreet:
		return "Street Address"
	case headerPoolCondition:
		return "Condition"
	case headerPoolNotes:
		return "Notes"
	case headerPoolPropertyOwnerName:
		return "Property Owner Name"
	case headerPoolPropertyOwnerPhone:
		return "Property Owner Phone"
	case headerPoolResidentOwned:
		return "Resident Owned"
	case headerPoolResidentPhone:
		return "Resident Phone"
	case headerPoolTag:
		// Previously missing, so a valid constant fell through to the default.
		return "Tag"
	default:
		return "bad programmer"
	}
}
|
|
func bulkGeocode(ctx context.Context, txn bob.Tx, file *models.FileuploadFile, c *models.FileuploadCSV, pools []*models.FileuploadPool, org *models.Organization) error {
|
|
if len(pools) == 0 {
|
|
return nil
|
|
}
|
|
log.Info().Int("len pools", len(pools)).Msg("bulk geocoding")
|
|
client := stadia.NewStadiaMaps(config.StadiaMapsAPIKey)
|
|
jobs := make(chan *jobGeocode, len(pools))
|
|
errors := make(chan error, len(pools))
|
|
|
|
var wg sync.WaitGroup
|
|
for i := 0; i < 20; i++ {
|
|
wg.Add(1)
|
|
go worker(ctx, txn, client, jobs, errors, &wg)
|
|
}
|
|
|
|
for i, pool := range pools {
|
|
jobs <- &jobGeocode{
|
|
csv: c,
|
|
rownumber: int32(i),
|
|
org: org,
|
|
pool: pool,
|
|
}
|
|
}
|
|
close(jobs)
|
|
|
|
go func() {
|
|
wg.Wait()
|
|
close(errors)
|
|
}()
|
|
|
|
error_count := 0
|
|
for err := range errors {
|
|
log.Error().Err(err).Msg("failed to geocode")
|
|
error_count++
|
|
}
|
|
if error_count > 0 {
|
|
txn.Rollback(ctx)
|
|
return fmt.Errorf("%d errors encountered in bulk geocode", error_count)
|
|
}
|
|
update_query := `
|
|
UPDATE fileupload.pool p
|
|
SET is_in_district = (
|
|
EXISTS (
|
|
SELECT 1
|
|
FROM import.district d
|
|
JOIN organization o ON d.gid = o.import_district_gid
|
|
WHERE o.id = p.organization_id
|
|
AND ST_Contains(d.geom_4326, p.geom)
|
|
)
|
|
)
|
|
WHERE p.geom IS NOT NULL;`
|
|
_, err := txn.ExecContext(ctx, update_query)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to update is_in_district: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
type jobGeocode struct {
|
|
csv *models.FileuploadCSV
|
|
rownumber int32
|
|
org *models.Organization
|
|
pool *models.FileuploadPool
|
|
}
|
|
|
|
func geocodePool(ctx context.Context, txn bob.Tx, client *stadia.StadiaMaps, job *jobGeocode) error {
|
|
pool := job.pool
|
|
a := types.Address{
|
|
Number: pool.AddressNumber,
|
|
Locality: pool.AddressLocality,
|
|
PostalCode: pool.AddressPostalCode,
|
|
Street: pool.AddressStreet,
|
|
}
|
|
address, err := geocode.GeocodeStructured(ctx, job.org, a)
|
|
if err != nil {
|
|
addError(ctx, txn, job.csv, job.rownumber, 0, err.Error())
|
|
}
|
|
geom_query := geom.PostgisPointQuery(address.Location)
|
|
_, err = psql.Update(
|
|
um.Table("fileupload.pool"),
|
|
um.SetCol("h3cell").ToArg(address.Cell),
|
|
um.SetCol("geom").To(geom_query),
|
|
um.Where(psql.Quote("id").EQ(psql.Arg(pool.ID))),
|
|
).Exec(ctx, txn)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to update pool: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
func parseCSVPoollist(ctx context.Context, txn bob.Tx, f *models.FileuploadFile, c *models.FileuploadCSV) ([]*models.FileuploadPool, error) {
|
|
pools := make([]*models.FileuploadPool, 0)
|
|
r, err := file.NewFileReader(file.CollectionCSV, f.FileUUID)
|
|
if err != nil {
|
|
return pools, fmt.Errorf("Failed to get filereader for %d: %w", f.ID, err)
|
|
}
|
|
reader := csv.NewReader(r)
|
|
h, err := reader.Read()
|
|
if err != nil {
|
|
return pools, fmt.Errorf("Failed to read header of CSV for file %d: %w", f.ID, err)
|
|
}
|
|
header_types, header_names := parseHeaders(h)
|
|
missing_headers := missingRequiredHeaders(header_types)
|
|
for _, mh := range missing_headers {
|
|
errorMissingHeader(ctx, txn, c, mh)
|
|
f.Update(ctx, txn, &models.FileuploadFileSetter{
|
|
Status: omit.From(enums.FileuploadFilestatustypeError),
|
|
})
|
|
return pools, nil
|
|
}
|
|
// Start at 2 because the header is line 1, not line 0
|
|
line_number := int32(2)
|
|
for {
|
|
row, err := reader.Read()
|
|
if err != nil {
|
|
if err == io.EOF {
|
|
return pools, nil
|
|
}
|
|
return pools, fmt.Errorf("Failed to read all CSV records for file %d: %w", f.ID, err)
|
|
}
|
|
tags := make(map[string]string, 0)
|
|
setter := models.FileuploadPoolSetter{
|
|
// required fields
|
|
//AddressNumber: omit.From(),
|
|
//AddressLocality: omit.From(),
|
|
//AddressPostalCode: omit.From(),
|
|
//AddressRegion: omit.From(),
|
|
//AddressStreet: omit.From(),
|
|
Committed: omit.From(false),
|
|
Condition: omit.From(enums.PoolconditiontypeUnknown),
|
|
Created: omit.From(time.Now()),
|
|
CreatorID: omit.From(f.CreatorID),
|
|
CSVFile: omit.From(f.ID),
|
|
Deleted: omitnull.FromPtr[time.Time](nil),
|
|
Geom: omitnull.FromPtr[string](nil),
|
|
H3cell: omitnull.FromPtr[string](nil),
|
|
// ID - generated
|
|
IsInDistrict: omit.From(false),
|
|
IsNew: omit.From(true),
|
|
LineNumber: omit.From(line_number),
|
|
Notes: omit.From(""),
|
|
PropertyOwnerName: omit.From(""),
|
|
PropertyOwnerPhoneE164: omitnull.FromPtr[string](nil),
|
|
ResidentOwned: omitnull.FromPtr[bool](nil),
|
|
ResidentPhoneE164: omitnull.FromPtr[string](nil),
|
|
//Tags: convertToPGData(tags),
|
|
}
|
|
for i, col := range row {
|
|
hdr_t := header_types[i]
|
|
if col == "" {
|
|
continue
|
|
}
|
|
switch hdr_t {
|
|
case headerPoolAddressRegion:
|
|
setter.AddressRegion = omit.From(col)
|
|
case headerPoolAddressPostalCode:
|
|
setter.AddressPostalCode = omit.From(col)
|
|
case headerPoolAddressStreet:
|
|
// This type of spreadsheet normally has '123 Main Str'
|
|
parts := strings.SplitN(col, " ", 2)
|
|
if len(parts) != 2 {
|
|
addError(ctx, txn, c, int32(line_number), int32(i), fmt.Sprintf("'%s' is not a house number and street. It needs to be in the form '123 main'", col))
|
|
continue
|
|
}
|
|
setter.AddressNumber = omit.From(parts[0])
|
|
setter.AddressStreet = omit.From(parts[1])
|
|
case headerPoolCondition:
|
|
var condition enums.Poolconditiontype
|
|
col_l := strings.ToLower(col)
|
|
col_translated := col_l
|
|
switch col_l {
|
|
case "empty":
|
|
col_translated = "dry"
|
|
}
|
|
err := condition.Scan(col_translated)
|
|
if err != nil {
|
|
addError(ctx, txn, c, int32(line_number), int32(i), fmt.Sprintf("'%s' is not a pool condition that we recognize. It should be one of %s", col, poolConditionValidValues()))
|
|
setter.Condition = omit.From(enums.PoolconditiontypeUnknown)
|
|
continue
|
|
}
|
|
setter.Condition = omit.From(condition)
|
|
case headerPoolNotes:
|
|
setter.Notes = omit.From(col)
|
|
case headerPoolPropertyOwnerName:
|
|
setter.PropertyOwnerName = omit.From(col)
|
|
case headerPoolPropertyOwnerPhone:
|
|
phone, err := text.ParsePhoneNumber(col)
|
|
if err != nil {
|
|
addError(ctx, txn, c, int32(line_number), int32(i), fmt.Sprintf("'%s' is not a phone number that we recognize. Ideally it should be of the form '+12223334444'", col))
|
|
continue
|
|
}
|
|
text.EnsureInDB(ctx, txn, *phone)
|
|
setter.PropertyOwnerPhoneE164 = omitnull.From(phone.PhoneString())
|
|
case headerPoolResidentOwned:
|
|
boolValue, err := parseBool(col)
|
|
if err != nil {
|
|
addError(ctx, txn, c, int32(line_number), int32(i), fmt.Sprintf("'%s' is not something that we recognize as a true/false value. Please use either 'true' or 'false'", col))
|
|
continue
|
|
}
|
|
setter.ResidentOwned = omitnull.From(boolValue)
|
|
case headerPoolResidentPhone:
|
|
phone, err := text.ParsePhoneNumber(col)
|
|
if err != nil {
|
|
addError(ctx, txn, c, int32(line_number), int32(i), fmt.Sprintf("'%s' is not a phone number that we recognize. Ideally it should be of the form '+12223334444'", col))
|
|
continue
|
|
}
|
|
text.EnsureInDB(ctx, txn, *phone)
|
|
setter.ResidentPhoneE164 = omitnull.From(phone.PhoneString())
|
|
case headerPoolTag:
|
|
tags[header_names[i]] = col
|
|
}
|
|
|
|
}
|
|
setter.Tags = omit.From(db.ConvertToPGData(tags))
|
|
pool, err := models.FileuploadPools.Insert(&setter).One(ctx, txn)
|
|
if err != nil {
|
|
return pools, fmt.Errorf("Failed to create pool: %w", err)
|
|
}
|
|
pools = append(pools, pool)
|
|
line_number = line_number + 1
|
|
}
|
|
}
|
|
func processCSVPoollist(ctx context.Context, txn bob.Tx, f *models.FileuploadFile, c *models.FileuploadCSV, parsed []*models.FileuploadPool) error {
|
|
org, err := models.FindOrganization(ctx, db.PGInstance.BobDB, f.OrganizationID)
|
|
if err != nil {
|
|
return fmt.Errorf("get org: %w", err)
|
|
}
|
|
err = bulkGeocode(ctx, txn, f, c, parsed, org)
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("Failure during geocoding")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func parseHeaders(row []string) ([]headerPoolEnum, []string) {
|
|
result_enums := make([]headerPoolEnum, 0)
|
|
result_names := make([]string, 0)
|
|
for _, h := range row {
|
|
ht := strings.TrimSpace(h)
|
|
hl := strings.ToLower(ht)
|
|
log.Debug().Str("header", hl).Msg("Saw CSV header")
|
|
var type_ headerPoolEnum
|
|
switch hl {
|
|
case "city":
|
|
type_ = headerPoolAddressRegion
|
|
case "zip":
|
|
case "postal code":
|
|
type_ = headerPoolAddressPostalCode
|
|
case "street address":
|
|
type_ = headerPoolAddressStreet
|
|
case "condition":
|
|
case "pool condition":
|
|
type_ = headerPoolCondition
|
|
case "notes":
|
|
type_ = headerPoolNotes
|
|
case "property owner":
|
|
case "property owner name":
|
|
type_ = headerPoolPropertyOwnerName
|
|
case "property owner phone":
|
|
type_ = headerPoolPropertyOwnerPhone
|
|
case "resident owned":
|
|
type_ = headerPoolResidentOwned
|
|
case "resident phone":
|
|
case "resident phone number":
|
|
type_ = headerPoolResidentPhone
|
|
default:
|
|
type_ = headerPoolTag
|
|
}
|
|
result_enums = append(result_enums, type_)
|
|
result_names = append(result_names, hl)
|
|
}
|
|
|
|
return result_enums, result_names
|
|
}
|
|
func missingRequiredHeaders(headers []headerPoolEnum) []headerPoolEnum {
|
|
results := make([]headerPoolEnum, 0)
|
|
for _, rh := range []headerPoolEnum{headerPoolAddressRegion, headerPoolAddressPostalCode, headerPoolAddressStreet} {
|
|
present := false
|
|
for _, h := range headers {
|
|
if h == rh {
|
|
present = true
|
|
break
|
|
}
|
|
}
|
|
if !present {
|
|
results = append(results, rh)
|
|
}
|
|
}
|
|
return results
|
|
}
|
|
func poolConditionValidValues() string {
|
|
var b strings.Builder
|
|
for i, cond := range enums.AllPoolconditiontype() {
|
|
if i == 0 {
|
|
fmt.Fprintf(&b, "'%s'", cond)
|
|
} else {
|
|
fmt.Fprintf(&b, ", '%s'", cond)
|
|
}
|
|
}
|
|
return b.String()
|
|
}
|
|
func worker(ctx context.Context, txn bob.Tx, client *stadia.StadiaMaps, jobs <-chan *jobGeocode, errors chan<- error, wg *sync.WaitGroup) {
|
|
defer wg.Done()
|
|
|
|
for job := range jobs {
|
|
err := geocodePool(ctx, txn, client, job)
|
|
|
|
if err != nil {
|
|
errors <- err
|
|
}
|
|
}
|
|
}
|