2026-02-08 04:36:12 +00:00
package csv
import (
2026-02-08 04:46:54 +00:00
"context"
"encoding/csv"
"fmt"
2026-02-08 05:00:14 +00:00
"io"
2026-02-09 18:25:44 +00:00
"strings"
2026-02-25 14:57:28 +00:00
"sync"
2026-02-09 18:25:44 +00:00
"time"
2026-02-08 05:00:14 +00:00
2026-02-09 18:25:44 +00:00
"github.com/Gleipnir-Technology/bob"
2026-02-14 16:49:54 +00:00
"github.com/Gleipnir-Technology/bob/dialect/psql"
"github.com/Gleipnir-Technology/bob/dialect/psql/um"
2026-02-14 15:41:38 +00:00
"github.com/Gleipnir-Technology/nidus-sync/config"
2026-02-08 04:46:54 +00:00
"github.com/Gleipnir-Technology/nidus-sync/db"
2026-02-09 18:25:44 +00:00
"github.com/Gleipnir-Technology/nidus-sync/db/enums"
2026-02-08 04:46:54 +00:00
"github.com/Gleipnir-Technology/nidus-sync/db/models"
2026-03-12 23:49:16 +00:00
"github.com/Gleipnir-Technology/nidus-sync/platform/file"
2026-03-04 18:29:52 +00:00
"github.com/Gleipnir-Technology/nidus-sync/platform/geocode"
2026-02-14 16:49:54 +00:00
"github.com/Gleipnir-Technology/nidus-sync/platform/geom"
2026-02-14 05:05:31 +00:00
"github.com/Gleipnir-Technology/nidus-sync/platform/text"
2026-03-09 18:02:22 +00:00
"github.com/Gleipnir-Technology/nidus-sync/platform/types"
2026-02-14 15:41:38 +00:00
"github.com/Gleipnir-Technology/nidus-sync/stadia"
2026-02-09 18:25:44 +00:00
"github.com/aarondl/opt/omit"
"github.com/aarondl/opt/omitnull"
2026-02-08 04:46:54 +00:00
"github.com/rs/zerolog/log"
2026-02-08 04:36:12 +00:00
)
2026-02-09 18:25:44 +00:00
// headerPoolEnum classifies a single column of an uploaded pool CSV by the
// kind of data it carries.
type headerPoolEnum int

// Column kinds recognised in a pool CSV. The values are assigned by iota, so
// the declaration order is significant and headerPoolAddressLocality is the
// zero value of headerPoolEnum.
const (
	headerPoolAddressLocality headerPoolEnum = iota
	headerPoolAddressPostalCode
	headerPoolAddressRegion
	headerPoolAddressStreet
	headerPoolCondition
	headerPoolNotes
	headerPoolPropertyOwnerName
	headerPoolPropertyOwnerPhone
	headerPoolResidentOwned
	headerPoolResidentPhone
	headerPoolTag
)

// String returns the human-readable column title for e.
//
// Any value without a dedicated title (this includes headerPoolTag as well as
// out-of-range values) yields the sentinel text "bad programmer".
func (e headerPoolEnum) String() string {
	switch e {
	case headerPoolAddressLocality:
		return "City"
	case headerPoolAddressPostalCode:
		return "Postal Code"
	case headerPoolAddressRegion:
		return "State"
	case headerPoolAddressStreet:
		return "Street Address"
	case headerPoolCondition:
		return "Condition"
	case headerPoolNotes:
		return "Notes"
	case headerPoolPropertyOwnerName:
		return "Property Owner Name"
	case headerPoolPropertyOwnerPhone:
		return "Property Owner Phone"
	case headerPoolResidentOwned:
		return "Resident Owned"
	case headerPoolResidentPhone:
		return "Resident Phone"
	}
	return "bad programmer"
}
2026-02-25 14:57:28 +00:00
func bulkGeocode ( ctx context . Context , txn bob . Tx , file * models . FileuploadFile , c * models . FileuploadCSV , pools [ ] * models . FileuploadPool , org * models . Organization ) error {
2026-02-14 15:41:38 +00:00
if len ( pools ) == 0 {
return nil
}
2026-02-25 14:57:28 +00:00
log . Info ( ) . Int ( "len pools" , len ( pools ) ) . Msg ( "bulk geocoding" )
2026-02-14 15:41:38 +00:00
client := stadia . NewStadiaMaps ( config . StadiaMapsAPIKey )
2026-02-25 14:57:28 +00:00
jobs := make ( chan * jobGeocode , len ( pools ) )
errors := make ( chan error , len ( pools ) )
var wg sync . WaitGroup
for i := 0 ; i < 20 ; i ++ {
wg . Add ( 1 )
go worker ( ctx , txn , client , jobs , errors , & wg )
2026-02-14 15:41:38 +00:00
}
2026-02-16 17:59:18 +00:00
2026-02-25 14:57:28 +00:00
for i , pool := range pools {
jobs <- & jobGeocode {
csv : c ,
rownumber : int32 ( i ) ,
org : org ,
pool : pool ,
2026-02-14 16:49:54 +00:00
}
}
2026-02-25 14:57:28 +00:00
close ( jobs )
go func ( ) {
wg . Wait ( )
close ( errors )
} ( )
error_count := 0
for err := range errors {
log . Error ( ) . Err ( err ) . Msg ( "failed to geocode" )
error_count ++
}
if error_count > 0 {
txn . Rollback ( ctx )
return fmt . Errorf ( "%d errors encountered in bulk geocode" , error_count )
}
2026-02-14 16:49:54 +00:00
update_query := `
UPDATE fileupload . pool p
SET is_in_district = (
EXISTS (
SELECT 1
2026-04-15 19:31:32 +00:00
FROM organization o , fileupload . file f
WHERE
p . csv_file = f . id AND
f . organization_id = o . id AND (
ST_Contains ( o . service_area_geometry , p . geom ) OR
o . is_catchall
)
2026-02-14 16:49:54 +00:00
)
)
WHERE p . geom IS NOT NULL ; `
2026-02-25 14:57:28 +00:00
_ , err := txn . ExecContext ( ctx , update_query )
2026-02-14 16:49:54 +00:00
if err != nil {
return fmt . Errorf ( "failed to update is_in_district: %w" , err )
2026-02-14 15:41:38 +00:00
}
return nil
}
2026-02-25 14:57:28 +00:00
// jobGeocode is one unit of work handed to a geocoding worker.
type jobGeocode struct {
	// csv is the upload that row-level errors are recorded against.
	csv *models.FileuploadCSV
	// rownumber is the row passed to addError when geocoding this pool fails.
	rownumber int32
	// org is forwarded to the geocoder together with the pool's address.
	org *models.Organization
	// pool is the record whose address is geocoded and then updated.
	pool *models.FileuploadPool
}
2026-03-04 18:29:52 +00:00
func geocodePool ( ctx context . Context , txn bob . Tx , client * stadia . StadiaMaps , job * jobGeocode ) error {
2026-02-25 14:57:28 +00:00
pool := job . pool
2026-03-09 18:02:22 +00:00
a := types . Address {
2026-03-04 18:29:52 +00:00
Number : pool . AddressNumber ,
Locality : pool . AddressLocality ,
PostalCode : pool . AddressPostalCode ,
2026-04-15 19:30:50 +00:00
Region : pool . AddressRegion ,
2026-03-04 18:29:52 +00:00
Street : pool . AddressStreet ,
2026-02-25 14:57:28 +00:00
}
2026-04-14 01:32:32 +00:00
geo , err := geocode . GeocodeStructured ( ctx , job . org , a )
2026-02-25 14:57:28 +00:00
if err != nil {
2026-03-04 18:29:52 +00:00
addError ( ctx , txn , job . csv , job . rownumber , 0 , err . Error ( ) )
2026-04-15 19:31:55 +00:00
return nil
2026-02-25 14:57:28 +00:00
}
2026-04-14 01:32:32 +00:00
if geo . Address . Location == nil {
2026-04-15 19:31:55 +00:00
addError ( ctx , txn , job . csv , job . rownumber , 0 , fmt . Sprintf ( "nil location from geocoding" ) )
return nil
2026-04-14 01:32:32 +00:00
}
geom_query := geom . PostgisPointQuery ( * geo . Address . Location )
2026-02-25 14:57:28 +00:00
_ , err = psql . Update (
um . Table ( "fileupload.pool" ) ,
2026-04-14 01:32:32 +00:00
um . SetCol ( "h3cell" ) . ToArg ( geo . Cell ) ,
2026-02-25 14:57:28 +00:00
um . SetCol ( "geom" ) . To ( geom_query ) ,
2026-04-15 20:29:42 +00:00
um . SetCol ( "address_id" ) . To ( * geo . Address . ID ) ,
2026-02-25 14:57:28 +00:00
um . Where ( psql . Quote ( "id" ) . EQ ( psql . Arg ( pool . ID ) ) ) ,
) . Exec ( ctx , txn )
if err != nil {
return fmt . Errorf ( "failed to update pool: %w" , err )
}
return nil
}
2026-03-12 23:49:16 +00:00
func parseCSVPoollist ( ctx context . Context , txn bob . Tx , f * models . FileuploadFile , c * models . FileuploadCSV ) ( [ ] * models . FileuploadPool , error ) {
2026-02-25 14:57:28 +00:00
pools := make ( [ ] * models . FileuploadPool , 0 )
2026-03-12 23:49:16 +00:00
r , err := file . NewFileReader ( file . CollectionCSV , f . FileUUID )
2026-02-08 04:46:54 +00:00
if err != nil {
2026-03-12 23:49:16 +00:00
return pools , fmt . Errorf ( "Failed to get filereader for %d: %w" , f . ID , err )
2026-02-08 04:46:54 +00:00
}
reader := csv . NewReader ( r )
2026-02-09 18:25:44 +00:00
h , err := reader . Read ( )
2026-02-08 04:46:54 +00:00
if err != nil {
2026-03-12 23:49:16 +00:00
return pools , fmt . Errorf ( "Failed to read header of CSV for file %d: %w" , f . ID , err )
2026-02-08 04:46:54 +00:00
}
2026-02-16 16:38:04 +00:00
header_types , header_names := parseHeaders ( h )
missing_headers := missingRequiredHeaders ( header_types )
2026-02-09 18:25:44 +00:00
for _ , mh := range missing_headers {
errorMissingHeader ( ctx , txn , c , mh )
2026-03-12 23:49:16 +00:00
f . Update ( ctx , txn , & models . FileuploadFileSetter {
2026-02-09 18:25:44 +00:00
Status : omit . From ( enums . FileuploadFilestatustypeError ) ,
} )
2026-02-14 15:41:38 +00:00
return pools , nil
2026-02-09 18:25:44 +00:00
}
2026-02-16 17:59:18 +00:00
// Start at 2 because the header is line 1, not line 0
line_number := int32 ( 2 )
2026-02-08 04:55:33 +00:00
for {
row , err := reader . Read ( )
if err != nil {
2026-02-08 05:00:14 +00:00
if err == io . EOF {
2026-02-14 15:41:38 +00:00
return pools , nil
2026-02-08 05:00:14 +00:00
}
2026-03-12 23:49:16 +00:00
return pools , fmt . Errorf ( "Failed to read all CSV records for file %d: %w" , f . ID , err )
2026-02-08 04:55:33 +00:00
}
2026-02-16 16:38:04 +00:00
tags := make ( map [ string ] string , 0 )
2026-02-09 19:03:27 +00:00
setter := models . FileuploadPoolSetter {
2026-02-09 18:25:44 +00:00
// required fields
2026-03-04 14:52:34 +00:00
//AddressNumber: omit.From(),
//AddressLocality: omit.From(),
2026-02-09 18:25:44 +00:00
//AddressPostalCode: omit.From(),
2026-03-04 14:52:34 +00:00
//AddressRegion: omit.From(),
2026-02-09 18:25:44 +00:00
//AddressStreet: omit.From(),
2026-02-09 19:03:27 +00:00
Committed : omit . From ( false ) ,
2026-03-05 01:22:21 +00:00
Condition : omit . From ( enums . PoolconditiontypeUnknown ) ,
2026-02-09 18:25:44 +00:00
Created : omit . From ( time . Now ( ) ) ,
2026-03-12 23:49:16 +00:00
CreatorID : omit . From ( f . CreatorID ) ,
CSVFile : omit . From ( f . ID ) ,
2026-02-09 18:25:44 +00:00
Deleted : omitnull . FromPtr [ time . Time ] ( nil ) ,
2026-02-09 19:03:27 +00:00
Geom : omitnull . FromPtr [ string ] ( nil ) ,
H3cell : omitnull . FromPtr [ string ] ( nil ) ,
2026-02-09 18:25:44 +00:00
// ID - generated
2026-02-14 05:40:27 +00:00
IsInDistrict : omit . From ( false ) ,
2026-02-16 16:49:24 +00:00
IsNew : omit . From ( true ) ,
2026-02-16 17:59:18 +00:00
LineNumber : omit . From ( line_number ) ,
2026-02-14 05:40:27 +00:00
Notes : omit . From ( "" ) ,
PropertyOwnerName : omit . From ( "" ) ,
PropertyOwnerPhoneE164 : omitnull . FromPtr [ string ] ( nil ) ,
ResidentOwned : omitnull . FromPtr [ bool ] ( nil ) ,
ResidentPhoneE164 : omitnull . FromPtr [ string ] ( nil ) ,
2026-02-16 16:38:04 +00:00
//Tags: convertToPGData(tags),
2026-02-09 18:25:44 +00:00
}
for i , col := range row {
2026-02-16 16:38:04 +00:00
hdr_t := header_types [ i ]
2026-02-14 15:42:59 +00:00
if col == "" {
continue
}
2026-02-16 16:38:04 +00:00
switch hdr_t {
2026-04-15 19:30:50 +00:00
case headerPoolAddressLocality :
setter . AddressLocality = omit . From ( col )
2026-03-04 14:52:34 +00:00
case headerPoolAddressPostalCode :
2026-02-09 18:25:44 +00:00
setter . AddressPostalCode = omit . From ( col )
2026-04-15 19:30:50 +00:00
case headerPoolAddressRegion :
setter . AddressRegion = omit . From ( col )
2026-03-04 14:52:34 +00:00
case headerPoolAddressStreet :
// This type of spreadsheet normally has '123 Main Str'
parts := strings . SplitN ( col , " " , 2 )
if len ( parts ) != 2 {
addError ( ctx , txn , c , int32 ( line_number ) , int32 ( i ) , fmt . Sprintf ( "'%s' is not a house number and street. It needs to be in the form '123 main'" , col ) )
continue
}
setter . AddressNumber = omit . From ( parts [ 0 ] )
setter . AddressStreet = omit . From ( parts [ 1 ] )
case headerPoolCondition :
2026-03-05 01:22:21 +00:00
var condition enums . Poolconditiontype
2026-02-24 20:34:46 +00:00
col_l := strings . ToLower ( col )
col_translated := col_l
switch col_l {
case "empty" :
col_translated = "dry"
}
err := condition . Scan ( col_translated )
2026-02-09 18:25:44 +00:00
if err != nil {
2026-02-16 17:59:18 +00:00
addError ( ctx , txn , c , int32 ( line_number ) , int32 ( i ) , fmt . Sprintf ( "'%s' is not a pool condition that we recognize. It should be one of %s" , col , poolConditionValidValues ( ) ) )
2026-03-05 01:22:21 +00:00
setter . Condition = omit . From ( enums . PoolconditiontypeUnknown )
2026-02-09 18:25:44 +00:00
continue
}
setter . Condition = omit . From ( condition )
2026-03-04 14:52:34 +00:00
case headerPoolNotes :
2026-02-09 18:25:44 +00:00
setter . Notes = omit . From ( col )
2026-03-04 14:52:34 +00:00
case headerPoolPropertyOwnerName :
2026-02-09 18:25:44 +00:00
setter . PropertyOwnerName = omit . From ( col )
2026-03-04 14:52:34 +00:00
case headerPoolPropertyOwnerPhone :
2026-02-14 05:05:31 +00:00
phone , err := text . ParsePhoneNumber ( col )
if err != nil {
2026-02-16 17:59:18 +00:00
addError ( ctx , txn , c , int32 ( line_number ) , int32 ( i ) , fmt . Sprintf ( "'%s' is not a phone number that we recognize. Ideally it should be of the form '+12223334444'" , col ) )
2026-02-14 05:05:31 +00:00
continue
}
text . EnsureInDB ( ctx , txn , * phone )
2026-03-16 19:52:29 +00:00
setter . PropertyOwnerPhoneE164 = omitnull . From ( phone . PhoneString ( ) )
2026-03-04 14:52:34 +00:00
case headerPoolResidentOwned :
2026-02-09 18:25:44 +00:00
boolValue , err := parseBool ( col )
if err != nil {
2026-02-16 17:59:18 +00:00
addError ( ctx , txn , c , int32 ( line_number ) , int32 ( i ) , fmt . Sprintf ( "'%s' is not something that we recognize as a true/false value. Please use either 'true' or 'false'" , col ) )
2026-02-09 18:25:44 +00:00
continue
}
setter . ResidentOwned = omitnull . From ( boolValue )
2026-03-04 14:52:34 +00:00
case headerPoolResidentPhone :
2026-02-14 05:40:27 +00:00
phone , err := text . ParsePhoneNumber ( col )
if err != nil {
2026-02-16 17:59:18 +00:00
addError ( ctx , txn , c , int32 ( line_number ) , int32 ( i ) , fmt . Sprintf ( "'%s' is not a phone number that we recognize. Ideally it should be of the form '+12223334444'" , col ) )
2026-02-14 05:40:27 +00:00
continue
}
text . EnsureInDB ( ctx , txn , * phone )
2026-03-16 19:52:29 +00:00
setter . ResidentPhoneE164 = omitnull . From ( phone . PhoneString ( ) )
2026-03-04 14:52:34 +00:00
case headerPoolTag :
2026-02-16 16:38:04 +00:00
tags [ header_names [ i ] ] = col
2026-02-09 18:25:44 +00:00
}
2026-02-16 16:38:04 +00:00
2026-02-09 18:25:44 +00:00
}
2026-02-16 16:38:04 +00:00
setter . Tags = omit . From ( db . ConvertToPGData ( tags ) )
2026-02-14 15:41:38 +00:00
pool , err := models . FileuploadPools . Insert ( & setter ) . One ( ctx , txn )
2026-02-09 18:25:44 +00:00
if err != nil {
2026-02-14 15:41:38 +00:00
return pools , fmt . Errorf ( "Failed to create pool: %w" , err )
2026-02-09 18:25:44 +00:00
}
2026-02-14 15:41:38 +00:00
pools = append ( pools , pool )
2026-02-16 17:59:18 +00:00
line_number = line_number + 1
2026-02-08 04:46:54 +00:00
}
2026-02-09 18:25:44 +00:00
}
2026-03-12 23:49:16 +00:00
func processCSVPoollist ( ctx context . Context , txn bob . Tx , f * models . FileuploadFile , c * models . FileuploadCSV , parsed [ ] * models . FileuploadPool ) error {
org , err := models . FindOrganization ( ctx , db . PGInstance . BobDB , f . OrganizationID )
2026-02-09 18:25:44 +00:00
if err != nil {
2026-03-02 18:49:02 +00:00
return fmt . Errorf ( "get org: %w" , err )
2026-02-09 18:25:44 +00:00
}
2026-03-12 23:49:16 +00:00
err = bulkGeocode ( ctx , txn , f , c , parsed , org )
2026-02-09 18:25:44 +00:00
if err != nil {
2026-03-02 18:49:02 +00:00
log . Error ( ) . Err ( err ) . Msg ( "Failure during geocoding" )
2026-02-09 18:25:44 +00:00
}
2026-03-02 18:49:02 +00:00
return nil
2026-02-09 18:25:44 +00:00
}
2026-02-16 16:38:04 +00:00
func parseHeaders ( row [ ] string ) ( [ ] headerPoolEnum , [ ] string ) {
result_enums := make ( [ ] headerPoolEnum , 0 )
result_names := make ( [ ] string , 0 )
2026-02-09 18:25:44 +00:00
for _ , h := range row {
ht := strings . TrimSpace ( h )
hl := strings . ToLower ( ht )
log . Debug ( ) . Str ( "header" , hl ) . Msg ( "Saw CSV header" )
var type_ headerPoolEnum
switch hl {
case "city" :
2026-04-15 19:30:50 +00:00
type_ = headerPoolAddressLocality
2026-02-09 18:25:44 +00:00
case "zip" :
case "postal code" :
2026-03-04 14:52:34 +00:00
type_ = headerPoolAddressPostalCode
2026-04-15 19:30:50 +00:00
case "state" :
type_ = headerPoolAddressRegion
2026-02-09 18:25:44 +00:00
case "street address" :
2026-03-04 14:52:34 +00:00
type_ = headerPoolAddressStreet
2026-02-09 18:25:44 +00:00
case "condition" :
case "pool condition" :
2026-03-04 14:52:34 +00:00
type_ = headerPoolCondition
2026-02-09 18:25:44 +00:00
case "notes" :
2026-03-04 14:52:34 +00:00
type_ = headerPoolNotes
2026-02-09 18:25:44 +00:00
case "property owner" :
case "property owner name" :
2026-03-04 14:52:34 +00:00
type_ = headerPoolPropertyOwnerName
2026-02-09 18:25:44 +00:00
case "property owner phone" :
2026-03-04 14:52:34 +00:00
type_ = headerPoolPropertyOwnerPhone
2026-02-09 18:25:44 +00:00
case "resident owned" :
2026-03-04 14:52:34 +00:00
type_ = headerPoolResidentOwned
2026-02-09 18:25:44 +00:00
case "resident phone" :
case "resident phone number" :
2026-03-04 14:52:34 +00:00
type_ = headerPoolResidentPhone
2026-02-09 18:25:44 +00:00
default :
2026-03-04 14:52:34 +00:00
type_ = headerPoolTag
2026-02-09 18:25:44 +00:00
}
2026-02-16 16:38:04 +00:00
result_enums = append ( result_enums , type_ )
result_names = append ( result_names , hl )
2026-02-09 18:25:44 +00:00
}
2026-02-16 16:38:04 +00:00
return result_enums , result_names
2026-02-09 18:25:44 +00:00
}
func missingRequiredHeaders ( headers [ ] headerPoolEnum ) [ ] headerPoolEnum {
results := make ( [ ] headerPoolEnum , 0 )
2026-04-15 19:30:50 +00:00
for _ , rh := range [ ] headerPoolEnum { headerPoolAddressLocality , headerPoolAddressRegion , headerPoolAddressPostalCode , headerPoolAddressStreet } {
2026-02-09 18:25:44 +00:00
present := false
for _ , h := range headers {
if h == rh {
present = true
break
}
}
if ! present {
results = append ( results , rh )
}
}
return results
}
func poolConditionValidValues ( ) string {
var b strings . Builder
2026-03-05 01:22:21 +00:00
for i , cond := range enums . AllPoolconditiontype ( ) {
2026-02-09 18:25:44 +00:00
if i == 0 {
fmt . Fprintf ( & b , "'%s'" , cond )
} else {
fmt . Fprintf ( & b , ", '%s'" , cond )
}
}
return b . String ( )
2026-02-08 04:55:33 +00:00
}
2026-02-25 14:57:28 +00:00
func worker ( ctx context . Context , txn bob . Tx , client * stadia . StadiaMaps , jobs <- chan * jobGeocode , errors chan <- error , wg * sync . WaitGroup ) {
defer wg . Done ( )
for job := range jobs {
2026-03-04 18:29:52 +00:00
err := geocodePool ( ctx , txn , client , job )
2026-02-25 14:57:28 +00:00
if err != nil {
errors <- err
}
}
}