2026-02-08 04:36:12 +00:00
package csv
import (
2026-02-08 04:46:54 +00:00
"context"
"encoding/csv"
"fmt"
2026-02-08 05:00:14 +00:00
"io"
2026-02-09 18:25:44 +00:00
"strconv"
"strings"
"time"
2026-02-08 05:00:14 +00:00
2026-02-09 18:25:44 +00:00
"github.com/Gleipnir-Technology/bob"
2026-02-14 16:49:54 +00:00
"github.com/Gleipnir-Technology/bob/dialect/psql"
"github.com/Gleipnir-Technology/bob/dialect/psql/um"
2026-02-14 15:41:38 +00:00
"github.com/Gleipnir-Technology/nidus-sync/config"
2026-02-08 04:46:54 +00:00
"github.com/Gleipnir-Technology/nidus-sync/db"
2026-02-09 18:25:44 +00:00
"github.com/Gleipnir-Technology/nidus-sync/db/enums"
2026-02-08 04:46:54 +00:00
"github.com/Gleipnir-Technology/nidus-sync/db/models"
2026-02-14 16:49:54 +00:00
"github.com/Gleipnir-Technology/nidus-sync/h3utils"
"github.com/Gleipnir-Technology/nidus-sync/platform/geom"
2026-02-14 05:05:31 +00:00
"github.com/Gleipnir-Technology/nidus-sync/platform/text"
2026-02-14 15:41:38 +00:00
"github.com/Gleipnir-Technology/nidus-sync/stadia"
2026-02-08 04:46:54 +00:00
"github.com/Gleipnir-Technology/nidus-sync/userfile"
2026-02-09 18:25:44 +00:00
"github.com/aarondl/opt/omit"
"github.com/aarondl/opt/omitnull"
2026-02-08 04:46:54 +00:00
"github.com/rs/zerolog/log"
2026-02-08 04:36:12 +00:00
)
2026-02-09 18:25:44 +00:00
// headerPoolEnum identifies the semantic meaning of a recognized CSV column
// header. Columns that match no known header are classified as headerTag and
// stored as free-form tags.
type headerPoolEnum int

const (
	// FIX: constants are now explicitly typed as headerPoolEnum instead of
	// untyped ints, so misuse is caught at compile time.
	headerAddressCity headerPoolEnum = iota
	headerAddressPostalCode
	headerAddressStreet
	headerCondition
	headerNotes
	headerPropertyOwnerName
	headerPropertyOwnerPhone
	headerResidentOwned
	headerResidentPhone
	headerTag
)

// String returns the human-readable header name, as shown in user-facing
// error messages (e.g. "The file is missing the 'City' header").
func (e headerPoolEnum) String() string {
	switch e {
	case headerAddressCity:
		return "City"
	case headerAddressPostalCode:
		return "Postal Code"
	case headerAddressStreet:
		return "Street Address"
	case headerCondition:
		return "Condition"
	case headerNotes:
		return "Notes"
	case headerPropertyOwnerName:
		return "Property Owner Name"
	case headerPropertyOwnerPhone:
		return "Property Owner Phone"
	case headerResidentOwned:
		return "Resident Owned"
	case headerResidentPhone:
		return "Resident Phone"
	case headerTag:
		// FIX: headerTag previously fell through to the default branch.
		return "Tag"
	default:
		return "unknown header"
	}
}
2026-02-08 04:46:54 +00:00
func ProcessJob ( ctx context . Context , file_id int32 ) error {
file , err := models . FindFileuploadFile ( ctx , db . PGInstance . BobDB , file_id )
if err != nil {
return fmt . Errorf ( "Failed to get file %d from DB: %w" , file_id , err )
}
2026-02-14 15:41:38 +00:00
txn , err := db . PGInstance . BobDB . BeginTx ( ctx , nil )
2026-02-09 18:25:44 +00:00
if err != nil {
2026-02-14 15:41:38 +00:00
return fmt . Errorf ( "Failed to start transaction: %w" , err )
}
defer txn . Rollback ( ctx )
pools , err := parseFile ( ctx , txn , * file )
if err != nil {
return fmt . Errorf ( "parse file: %w" , err )
}
2026-02-24 20:02:44 +00:00
_ , err = psql . Update (
um . Table ( "fileupload.csv" ) ,
um . SetCol ( "rowcount" ) . ToArg ( len ( pools ) ) ,
2026-02-24 20:07:39 +00:00
um . Where ( psql . Quote ( "file_id" ) . EQ ( psql . Arg ( file_id ) ) ) ,
2026-02-24 20:02:44 +00:00
) . Exec ( ctx , txn )
if err != nil {
return fmt . Errorf ( "update csv row: %w" , err )
}
2026-02-14 15:41:38 +00:00
err = bulkGeocode ( ctx , txn , * file , pools )
if err != nil {
return fmt . Errorf ( "bulk geocode: %w" , err )
}
2026-02-24 20:34:32 +00:00
file . Update ( ctx , txn , & models . FileuploadFileSetter {
Status : omit . From ( enums . FileuploadFilestatustypeParsed ) ,
} )
log . Info ( ) . Int32 ( "file.ID" , file . ID ) . Msg ( "Set file to parsed" )
2026-02-14 16:49:54 +00:00
txn . Commit ( ctx )
2026-02-14 15:41:38 +00:00
return nil
}
func bulkGeocode ( ctx context . Context , txn bob . Tx , file models . FileuploadFile , pools [ ] * models . FileuploadPool ) error {
if len ( pools ) == 0 {
return nil
}
client := stadia . NewStadiaMaps ( config . StadiaMapsAPIKey )
requests := make ( [ ] stadia . BulkGeocodeQuery , 0 )
for _ , pool := range pools {
requests = append ( requests , stadia . StructuredGeocodeRequest {
Address : & pool . AddressStreet ,
PostalCode : & pool . AddressPostalCode ,
} )
}
log . Info ( ) . Int ( "len pools" , len ( pools ) ) . Int ( "len requests" , len ( requests ) ) . Msg ( "bulk querying" )
responses , err := client . BulkGeocode ( requests )
if err != nil {
return fmt . Errorf ( "client bulk geocode: %w" , err )
}
log . Info ( ) . Int ( "len" , len ( responses ) ) . Msg ( "bulk query response" )
for i , resp := range responses {
pool := pools [ i ]
2026-02-16 17:59:18 +00:00
sublog := log . With ( ) .
Int ( "row" , i ) .
Str ( "pool.address_postal" , pool . AddressPostalCode ) .
Str ( "pool.address_street" , pool . AddressStreet ) .
Str ( "pool.postal" , pool . AddressPostalCode ) .
Logger ( )
2026-02-14 15:41:38 +00:00
if resp . Status != 200 {
2026-02-16 17:59:18 +00:00
sublog . Info ( ) . Int ( "status" , resp . Status ) . Str ( "msg" , resp . Message ) . Msg ( "Non-200 status on geocode request" )
2026-02-14 16:49:54 +00:00
continue
2026-02-14 15:41:38 +00:00
}
2026-02-14 16:49:54 +00:00
if resp . Response == nil {
2026-02-16 17:59:18 +00:00
sublog . Info ( ) . Str ( "msg" , resp . Message ) . Msg ( "nil response on geocode" )
2026-02-14 16:49:54 +00:00
continue
}
if len ( resp . Response . Features ) > 1 {
2026-02-16 17:59:18 +00:00
sublog . Warn ( ) . Int ( "len" , len ( resp . Response . Features ) ) . Msg ( "too many features" )
2026-02-14 16:49:54 +00:00
continue
}
feature := resp . Response . Features [ 0 ]
if feature . Geometry . Type != "Point" {
2026-02-16 17:59:18 +00:00
sublog . Warn ( ) . Str ( "type" , feature . Geometry . Type ) . Msg ( "wrong type" )
2026-02-14 16:49:54 +00:00
continue
}
longitude := feature . Geometry . Coordinates [ 0 ]
latitude := feature . Geometry . Coordinates [ 1 ]
cell , err := h3utils . GetCell ( longitude , latitude , 15 )
if err != nil {
2026-02-16 17:59:18 +00:00
sublog . Warn ( ) . Err ( err ) . Float64 ( "lng" , longitude ) . Float64 ( "lat" , latitude ) . Msg ( "failed to convert h3 cell" )
2026-02-14 16:49:54 +00:00
continue
2026-02-14 15:41:38 +00:00
}
2026-02-14 16:49:54 +00:00
geom_query := geom . PostgisPointQuery ( longitude , latitude )
_ , err = psql . Update (
um . Table ( "fileupload.pool" ) ,
um . SetCol ( "h3cell" ) . ToArg ( cell ) ,
um . SetCol ( "geom" ) . To ( geom_query ) ,
um . Where ( psql . Quote ( "id" ) . EQ ( psql . Arg ( pool . ID ) ) ) ,
) . Exec ( ctx , txn )
if err != nil {
log . Warn ( ) . Err ( err ) . Msg ( "failed to update pool" )
continue
}
}
update_query := `
UPDATE fileupload . pool p
SET is_in_district = (
EXISTS (
SELECT 1
FROM import . district d
JOIN organization o ON d . gid = o . import_district_gid
WHERE o . id = p . organization_id
2026-02-16 15:26:41 +00:00
AND ST_Contains ( d . geom_4326 , p . geom )
2026-02-14 16:49:54 +00:00
)
)
WHERE p . geom IS NOT NULL ; `
_ , err = txn . ExecContext ( ctx , update_query )
if err != nil {
return fmt . Errorf ( "failed to update is_in_district: %w" , err )
2026-02-14 15:41:38 +00:00
}
return nil
}
func parseFile ( ctx context . Context , txn bob . Tx , file models . FileuploadFile ) ( [ ] * models . FileuploadPool , error ) {
pools := make ( [ ] * models . FileuploadPool , 0 )
c , err := models . FindFileuploadCSV ( ctx , db . PGInstance . BobDB , file . ID )
if err != nil {
return pools , fmt . Errorf ( "Failed to get file %d from DB: %w" , file . ID , err )
2026-02-09 18:25:44 +00:00
}
2026-02-08 04:46:54 +00:00
r , err := userfile . NewFileReader ( userfile . CollectionCSV , file . FileUUID )
if err != nil {
2026-02-14 15:41:38 +00:00
return pools , fmt . Errorf ( "Failed to get filereader for %d: %w" , file . ID , err )
2026-02-08 04:46:54 +00:00
}
reader := csv . NewReader ( r )
2026-02-09 18:25:44 +00:00
h , err := reader . Read ( )
2026-02-08 04:46:54 +00:00
if err != nil {
2026-02-14 15:41:38 +00:00
return pools , fmt . Errorf ( "Failed to read header of CSV for file %d: %w" , file . ID , err )
2026-02-08 04:46:54 +00:00
}
2026-02-16 16:38:04 +00:00
header_types , header_names := parseHeaders ( h )
missing_headers := missingRequiredHeaders ( header_types )
2026-02-09 18:25:44 +00:00
for _ , mh := range missing_headers {
errorMissingHeader ( ctx , txn , c , mh )
file . Update ( ctx , txn , & models . FileuploadFileSetter {
Status : omit . From ( enums . FileuploadFilestatustypeError ) ,
} )
2026-02-14 15:41:38 +00:00
return pools , nil
2026-02-09 18:25:44 +00:00
}
2026-02-16 17:59:18 +00:00
// Start at 2 because the header is line 1, not line 0
line_number := int32 ( 2 )
2026-02-08 04:55:33 +00:00
for {
row , err := reader . Read ( )
if err != nil {
2026-02-08 05:00:14 +00:00
if err == io . EOF {
2026-02-14 15:41:38 +00:00
return pools , nil
2026-02-08 05:00:14 +00:00
}
2026-02-14 15:41:38 +00:00
return pools , fmt . Errorf ( "Failed to read all CSV records for file %d: %w" , file . ID , err )
2026-02-08 04:55:33 +00:00
}
2026-02-16 16:38:04 +00:00
tags := make ( map [ string ] string , 0 )
2026-02-09 19:03:27 +00:00
setter := models . FileuploadPoolSetter {
2026-02-09 18:25:44 +00:00
// required fields
//AddressCity: omit.From(),
//AddressPostalCode: omit.From(),
//AddressStreet: omit.From(),
2026-02-09 19:03:27 +00:00
Committed : omit . From ( false ) ,
Condition : omit . From ( enums . FileuploadPoolconditiontypeUnknown ) ,
2026-02-09 18:25:44 +00:00
Created : omit . From ( time . Now ( ) ) ,
CreatorID : omit . From ( file . CreatorID ) ,
2026-02-09 19:03:27 +00:00
CSVFile : omit . From ( file . ID ) ,
2026-02-09 18:25:44 +00:00
Deleted : omitnull . FromPtr [ time . Time ] ( nil ) ,
2026-02-09 19:03:27 +00:00
Geom : omitnull . FromPtr [ string ] ( nil ) ,
H3cell : omitnull . FromPtr [ string ] ( nil ) ,
2026-02-09 18:25:44 +00:00
// ID - generated
2026-02-14 05:40:27 +00:00
IsInDistrict : omit . From ( false ) ,
2026-02-16 16:49:24 +00:00
IsNew : omit . From ( true ) ,
2026-02-16 17:59:18 +00:00
LineNumber : omit . From ( line_number ) ,
2026-02-14 05:40:27 +00:00
Notes : omit . From ( "" ) ,
OrganizationID : omit . From ( file . OrganizationID ) ,
PropertyOwnerName : omit . From ( "" ) ,
PropertyOwnerPhoneE164 : omitnull . FromPtr [ string ] ( nil ) ,
ResidentOwned : omitnull . FromPtr [ bool ] ( nil ) ,
ResidentPhoneE164 : omitnull . FromPtr [ string ] ( nil ) ,
2026-02-16 16:38:04 +00:00
//Tags: convertToPGData(tags),
2026-02-09 18:25:44 +00:00
}
for i , col := range row {
2026-02-16 16:38:04 +00:00
hdr_t := header_types [ i ]
2026-02-14 15:42:59 +00:00
if col == "" {
continue
}
2026-02-16 16:38:04 +00:00
switch hdr_t {
2026-02-09 18:25:44 +00:00
case headerAddressCity :
setter . AddressCity = omit . From ( col )
case headerAddressPostalCode :
setter . AddressPostalCode = omit . From ( col )
case headerAddressStreet :
setter . AddressStreet = omit . From ( col )
case headerCondition :
2026-02-09 19:03:27 +00:00
var condition enums . FileuploadPoolconditiontype
2026-02-24 20:34:46 +00:00
col_l := strings . ToLower ( col )
col_translated := col_l
switch col_l {
case "empty" :
col_translated = "dry"
}
err := condition . Scan ( col_translated )
2026-02-09 18:25:44 +00:00
if err != nil {
2026-02-16 17:59:18 +00:00
addError ( ctx , txn , c , int32 ( line_number ) , int32 ( i ) , fmt . Sprintf ( "'%s' is not a pool condition that we recognize. It should be one of %s" , col , poolConditionValidValues ( ) ) )
setter . Condition = omit . From ( enums . FileuploadPoolconditiontypeUnknown )
2026-02-09 18:25:44 +00:00
continue
}
setter . Condition = omit . From ( condition )
case headerNotes :
setter . Notes = omit . From ( col )
case headerPropertyOwnerName :
setter . PropertyOwnerName = omit . From ( col )
case headerPropertyOwnerPhone :
2026-02-14 05:05:31 +00:00
phone , err := text . ParsePhoneNumber ( col )
if err != nil {
2026-02-16 17:59:18 +00:00
addError ( ctx , txn , c , int32 ( line_number ) , int32 ( i ) , fmt . Sprintf ( "'%s' is not a phone number that we recognize. Ideally it should be of the form '+12223334444'" , col ) )
2026-02-14 05:05:31 +00:00
continue
}
text . EnsureInDB ( ctx , txn , * phone )
2026-02-14 05:40:27 +00:00
setter . PropertyOwnerPhoneE164 = omitnull . From ( text . PhoneString ( * phone ) )
2026-02-09 18:25:44 +00:00
case headerResidentOwned :
boolValue , err := parseBool ( col )
if err != nil {
2026-02-16 17:59:18 +00:00
addError ( ctx , txn , c , int32 ( line_number ) , int32 ( i ) , fmt . Sprintf ( "'%s' is not something that we recognize as a true/false value. Please use either 'true' or 'false'" , col ) )
2026-02-09 18:25:44 +00:00
continue
}
setter . ResidentOwned = omitnull . From ( boolValue )
case headerResidentPhone :
2026-02-14 05:40:27 +00:00
phone , err := text . ParsePhoneNumber ( col )
if err != nil {
2026-02-16 17:59:18 +00:00
addError ( ctx , txn , c , int32 ( line_number ) , int32 ( i ) , fmt . Sprintf ( "'%s' is not a phone number that we recognize. Ideally it should be of the form '+12223334444'" , col ) )
2026-02-14 05:40:27 +00:00
continue
}
text . EnsureInDB ( ctx , txn , * phone )
setter . ResidentPhoneE164 = omitnull . From ( text . PhoneString ( * phone ) )
2026-02-16 16:38:04 +00:00
case headerTag :
tags [ header_names [ i ] ] = col
2026-02-09 18:25:44 +00:00
}
2026-02-16 16:38:04 +00:00
2026-02-09 18:25:44 +00:00
}
2026-02-16 16:38:04 +00:00
setter . Tags = omit . From ( db . ConvertToPGData ( tags ) )
2026-02-14 15:41:38 +00:00
pool , err := models . FileuploadPools . Insert ( & setter ) . One ( ctx , txn )
2026-02-09 18:25:44 +00:00
if err != nil {
2026-02-14 15:41:38 +00:00
return pools , fmt . Errorf ( "Failed to create pool: %w" , err )
2026-02-09 18:25:44 +00:00
}
2026-02-14 15:41:38 +00:00
pools = append ( pools , pool )
2026-02-16 17:59:18 +00:00
line_number = line_number + 1
2026-02-08 04:46:54 +00:00
}
2026-02-09 18:25:44 +00:00
}
func addError ( ctx context . Context , txn bob . Tx , c * models . FileuploadCSV , row_number int32 , column_number int32 , msg string ) error {
r , err := models . FileuploadErrorCSVS . Insert ( & models . FileuploadErrorCSVSetter {
Col : omit . From ( column_number ) ,
CSVFileID : omit . From ( c . FileID ) ,
// ID
Line : omit . From ( row_number ) ,
Message : omit . From ( msg ) ,
} ) . One ( ctx , txn )
if err != nil {
return fmt . Errorf ( "Failed to add error: %w" , err )
}
2026-02-14 05:40:27 +00:00
log . Info ( ) . Int32 ( "id" , r . ID ) . Int32 ( "file_id" , c . FileID ) . Str ( "msg" , msg ) . Int32 ( "row" , row_number ) . Int32 ( "col" , column_number ) . Msg ( "Created CSV file error" )
2026-02-08 04:36:12 +00:00
return nil
}
2026-02-08 04:55:33 +00:00
// addImportError is a placeholder: for now it only logs the error against the
// file instead of persisting it anywhere.
func addImportError(file *models.FileuploadFile, err error) {
	event := log.Debug().Err(err).Int32("file_id", file.ID)
	event.Msg("Fake add import error")
}
2026-02-09 18:25:44 +00:00
// parseBool interprets a CSV cell as a boolean. In addition to everything
// strconv.ParseBool accepts, it understands "yes" and "no" (case-insensitive).
func parseBool(s string) (bool, error) {
	lowered := strings.ToLower(s)
	// Handle some of the stuff that strconv doesn't handle.
	switch lowered {
	case "yes":
		return true, nil
	case "no":
		return false, nil
	}
	value, err := strconv.ParseBool(lowered)
	if err != nil {
		return false, fmt.Errorf("unrecognized '%s'", lowered)
	}
	return value, nil
}
// errorMissingHeader records a file-level error (row 0, col 0) stating that a
// required header is absent from the uploaded CSV.
func errorMissingHeader(ctx context.Context, txn bob.Tx, c *models.FileuploadCSV, h headerPoolEnum) error {
	return addError(ctx, txn, c, 0, 0, fmt.Sprintf("The file is missing the '%s' header", h.String()))
}
2026-02-16 16:38:04 +00:00
func parseHeaders ( row [ ] string ) ( [ ] headerPoolEnum , [ ] string ) {
result_enums := make ( [ ] headerPoolEnum , 0 )
result_names := make ( [ ] string , 0 )
2026-02-09 18:25:44 +00:00
for _ , h := range row {
ht := strings . TrimSpace ( h )
hl := strings . ToLower ( ht )
log . Debug ( ) . Str ( "header" , hl ) . Msg ( "Saw CSV header" )
var type_ headerPoolEnum
switch hl {
case "city" :
type_ = headerAddressCity
case "zip" :
case "postal code" :
type_ = headerAddressPostalCode
case "street address" :
type_ = headerAddressStreet
case "condition" :
case "pool condition" :
type_ = headerCondition
case "notes" :
type_ = headerNotes
case "property owner" :
case "property owner name" :
type_ = headerPropertyOwnerName
case "property owner phone" :
type_ = headerPropertyOwnerPhone
case "resident owned" :
type_ = headerResidentOwned
case "resident phone" :
case "resident phone number" :
type_ = headerResidentPhone
default :
type_ = headerTag
}
2026-02-16 16:38:04 +00:00
result_enums = append ( result_enums , type_ )
result_names = append ( result_names , hl )
2026-02-09 18:25:44 +00:00
}
2026-02-16 16:38:04 +00:00
return result_enums , result_names
2026-02-09 18:25:44 +00:00
}
// missingRequiredHeaders returns the required headers (city, postal code,
// street address) that do not appear anywhere in the parsed header list.
func missingRequiredHeaders(headers []headerPoolEnum) []headerPoolEnum {
	required := []headerPoolEnum{headerAddressCity, headerAddressPostalCode, headerAddressStreet}
	results := make([]headerPoolEnum, 0)
	for _, want := range required {
		found := false
		for _, have := range headers {
			if have == want {
				found = true
				break
			}
		}
		if !found {
			results = append(results, want)
		}
	}
	return results
}
func poolConditionValidValues ( ) string {
var b strings . Builder
2026-02-09 19:03:27 +00:00
for i , cond := range enums . AllFileuploadPoolconditiontype ( ) {
2026-02-09 18:25:44 +00:00
if i == 0 {
fmt . Fprintf ( & b , "'%s'" , cond )
} else {
fmt . Fprintf ( & b , ", '%s'" , cond )
}
}
return b . String ( )
2026-02-08 04:55:33 +00:00
}