2025-11-06 00:23:58 +00:00
package main
import (
2025-11-07 05:46:41 +00:00
"bytes"
2025-11-06 00:23:58 +00:00
"context"
"crypto/rand"
"crypto/sha256"
"encoding/base64"
"encoding/json"
2025-11-07 02:07:33 +00:00
"errors"
2025-11-06 00:23:58 +00:00
"fmt"
"io"
"net/http"
"net/url"
2025-11-07 05:46:41 +00:00
"os"
2025-11-07 08:34:32 +00:00
"sort"
2025-11-06 00:23:58 +00:00
"strconv"
"strings"
2025-11-07 05:46:41 +00:00
"sync"
2025-11-06 00:23:58 +00:00
"time"
2025-11-06 22:28:56 +00:00
"github.com/Gleipnir-Technology/arcgis-go"
2025-11-07 08:34:32 +00:00
"github.com/Gleipnir-Technology/arcgis-go/fieldseeker"
2025-11-06 00:23:58 +00:00
"github.com/Gleipnir-Technology/nidus-sync/models"
2025-11-06 22:58:18 +00:00
"github.com/Gleipnir-Technology/nidus-sync/sql"
2025-11-06 22:31:51 +00:00
"github.com/aarondl/opt/omit"
2025-11-07 02:07:33 +00:00
"github.com/aarondl/opt/omitnull"
2025-11-07 10:45:59 +00:00
"github.com/alitto/pond/v2"
2025-11-07 02:07:33 +00:00
"github.com/jackc/pgx/v5"
2025-11-13 20:34:48 +00:00
"github.com/rs/zerolog/log"
2025-11-06 00:23:58 +00:00
)
2025-11-14 23:08:26 +00:00
// When the API responds that the token is now invalidated
type InvalidatedTokenError struct { }
func ( e InvalidatedTokenError ) Error ( ) string { return "The token has been invalidated by the server" }
2025-11-13 20:53:20 +00:00
// When there is no oauth for an organization
type NoOAuthForOrg struct { }
func ( e NoOAuthForOrg ) Error ( ) string { return "No oauth available for organization" }
// NewOAuthTokenChannel signals the background refresher that a new
// oauth token has been stored so the worker set should be rebuilt.
// NOTE(review): declared nil here — it must be initialized with make()
// during startup elsewhere, or the send in updateArcgisUserData will
// block forever. Confirm initialization site.
var NewOAuthTokenChannel chan struct{}

// CodeVerifier is the PKCE code verifier.
// NOTE(review): this is a hard-coded placeholder (the PKCE parameters
// are currently commented out in buildArcGISAuthURL); a real PKCE flow
// must generate this per session — see generateCodeVerifier.
var CodeVerifier string = "random_secure_string_min_43_chars_long_should_be_stored_in_session"
type OAuthTokenResponse struct {
2025-11-07 05:46:41 +00:00
AccessToken string ` json:"access_token" `
ExpiresIn int ` json:"expires_in" `
RefreshToken string ` json:"refresh_token" `
RefreshTokenExpiresIn int ` json:"refresh_token_expires_in" `
SSL bool ` json:"ssl" `
Username string ` json:"username" `
2025-11-06 00:23:58 +00:00
}
// Build the ArcGIS authorization URL with PKCE
2025-11-13 15:15:35 +00:00
func buildArcGISAuthURL ( clientID string ) string {
2025-11-06 00:23:58 +00:00
baseURL := "https://www.arcgis.com/sharing/rest/oauth2/authorize/"
params := url . Values { }
params . Add ( "client_id" , clientID )
2025-11-13 14:31:33 +00:00
params . Add ( "redirect_uri" , redirectURL ( ) )
2025-11-06 00:23:58 +00:00
params . Add ( "response_type" , "code" )
//params.Add("code_challenge", generateCodeChallenge(codeVerifier))
//params.Add("code_challenge_method", "S256")
2025-11-13 15:15:35 +00:00
// See https://developers.arcgis.com/rest/users-groups-and-items/token/
// expiration is defined in minutes
var expiration int
if IsProductionEnvironment ( ) {
// 2 weeks is the maximum allowed
expiration = 20160
} else {
expiration = 20
}
2025-11-06 00:23:58 +00:00
params . Add ( "expiration" , strconv . Itoa ( expiration ) )
return baseURL + "?" + params . Encode ( )
}
func futureUTCTimestamp ( secondsFromNow int ) time . Time {
return time . Now ( ) . UTC ( ) . Add ( time . Duration ( secondsFromNow ) * time . Second )
}
// generateCodeChallenge derives the PKCE S256 code challenge for the
// given verifier: the unpadded base64url encoding of its SHA-256 hash.
func generateCodeChallenge(codeVerifier string) string {
	digest := sha256.Sum256([]byte(codeVerifier))
	challenge := base64.RawURLEncoding.EncodeToString(digest[:])
	return challenge
}
// Generate a random code verifier for PKCE
func generateCodeVerifier ( ) string {
bytes := make ( [ ] byte , 64 ) // 64 bytes = 512 bits
rand . Read ( bytes )
return base64 . RawURLEncoding . EncodeToString ( bytes )
}
2025-11-06 22:28:56 +00:00
// Find out what we can about this user
2025-11-07 05:46:41 +00:00
func updateArcgisUserData ( ctx context . Context , user * models . User , access_token string , access_token_expires time . Time , refresh_token string , refresh_token_expires time . Time ) {
2025-11-06 22:28:56 +00:00
client := arcgis . NewArcGIS (
arcgis . AuthenticatorOAuth {
2025-11-07 05:46:41 +00:00
AccessToken : access_token ,
AccessTokenExpires : access_token_expires ,
RefreshToken : refresh_token ,
RefreshTokenExpires : refresh_token_expires ,
2025-11-06 22:28:56 +00:00
} ,
)
2025-11-07 02:07:33 +00:00
portal , err := client . PortalsSelf ( )
2025-11-06 22:28:56 +00:00
if err != nil {
2025-11-13 20:34:48 +00:00
log . Error ( ) . Err ( err ) . Msg ( "Failed to get ArcGIS user data" )
2025-11-06 22:28:56 +00:00
return
}
2025-11-13 20:34:48 +00:00
log . Info ( ) . Str ( "Username" , portal . User . Username ) . Str ( "user_id" , portal . User . ID ) . Str ( "org_id" , portal . User . OrgID ) . Str ( "org_name" , portal . Name ) . Str ( "license_type_id" , portal . User . UserLicenseTypeID ) . Msg ( "Got portals data" )
2025-11-07 02:07:33 +00:00
_ , err = sql . UpdateOauthTokenOrg ( portal . User . ID , portal . User . UserLicenseTypeID , refresh_token ) . Exec ( ctx , PGInstance . BobDB )
if err != nil {
2025-11-13 20:34:48 +00:00
log . Error ( ) . Err ( err ) . Msg ( "Failed to update oauth token portal data" )
2025-11-07 02:07:33 +00:00
return
}
var org * models . Organization
orgs , err := models . Organizations . Query ( models . SelectWhere . Organizations . ArcgisName . EQ ( portal . Name ) ) . All ( ctx , PGInstance . BobDB )
switch len ( orgs ) {
case 0 :
setter := models . OrganizationSetter {
Name : omitnull . From ( portal . Name ) ,
ArcgisID : omitnull . From ( portal . User . OrgID ) ,
ArcgisName : omitnull . From ( portal . Name ) ,
}
org , err = models . Organizations . Insert ( & setter ) . One ( ctx , PGInstance . BobDB )
if err != nil {
2025-11-13 20:34:48 +00:00
log . Error ( ) . Err ( err ) . Msg ( "Failed to create new organization" )
2025-11-07 02:07:33 +00:00
return
}
2025-11-13 20:34:48 +00:00
log . Info ( ) . Int ( "org_id" , int ( org . ID ) ) . Msg ( "Created new organization" )
2025-11-07 02:07:33 +00:00
case 1 :
org = orgs [ 0 ]
2025-11-13 20:34:48 +00:00
log . Info ( ) . Msg ( "Organization already exists" )
2025-11-07 02:07:33 +00:00
default :
2025-11-13 20:34:48 +00:00
log . Error ( ) . Msg ( "Got too many organizations, bailing" )
2025-11-07 02:07:33 +00:00
return
}
if err != nil {
LogErrorTypeInfo ( err )
if errors . Is ( err , pgx . ErrNoRows ) {
} else {
2025-11-13 20:34:48 +00:00
log . Error ( ) . Err ( err ) . Msg ( "Failed to query for existing org" )
2025-11-07 02:07:33 +00:00
return
}
}
err = org . AttachUser ( ctx , PGInstance . BobDB , user )
if err != nil {
2025-11-13 20:34:48 +00:00
log . Error ( ) . Err ( err ) . Int ( "user_id" , int ( user . ID ) ) . Int ( "org_id" , int ( org . ID ) ) . Msg ( "Failed to attach user to organization" )
2025-11-07 02:07:33 +00:00
return
}
2025-11-06 22:28:56 +00:00
search , err := client . Search ( "Fieldseeker" )
if err != nil {
2025-11-13 20:34:48 +00:00
log . Error ( ) . Err ( err ) . Msg ( "Failed to get search FieldseekerGIS data" )
2025-11-06 22:28:56 +00:00
return
}
2025-11-13 03:15:45 +00:00
var fieldseekerClient * fieldseeker . FieldSeeker
2025-11-06 22:28:56 +00:00
for _ , result := range search . Results {
2025-11-13 20:34:48 +00:00
log . Info ( ) . Str ( "name" , result . Name ) . Msg ( "Got result" )
2025-11-07 08:34:32 +00:00
if result . Name == "FieldSeekerGIS" {
2025-11-13 20:34:48 +00:00
log . Info ( ) . Str ( "url" , result . URL ) . Msg ( "Found Fieldseeker" )
2025-11-07 08:34:32 +00:00
setter := models . OrganizationSetter {
FieldseekerURL : omitnull . From ( result . URL ) ,
}
err = org . Update ( ctx , PGInstance . BobDB , & setter )
if err != nil {
2025-11-13 20:34:48 +00:00
log . Error ( ) . Err ( err ) . Msg ( "Failed to create new organization" )
2025-11-07 08:34:32 +00:00
return
}
2025-11-13 03:15:45 +00:00
fieldseekerClient , err = fieldseeker . NewFieldSeeker (
client ,
result . URL ,
)
if err != nil {
2025-11-13 20:34:48 +00:00
log . Error ( ) . Err ( err ) . Msg ( "Failed to create fieldseeker client" )
2025-11-13 03:15:45 +00:00
return
}
2025-11-07 08:34:32 +00:00
}
2025-11-06 22:28:56 +00:00
}
2025-11-13 03:15:45 +00:00
arcgis_id , ok := org . ArcgisID . Get ( )
if ! ok {
2025-11-13 20:34:48 +00:00
log . Error ( ) . Int ( "org.id" , int ( org . ID ) ) . Msg ( "Cannot get webhooks - ArcGIS ID is null" )
2025-11-13 03:15:45 +00:00
}
client . Context = & arcgis_id
2025-11-13 16:46:30 +00:00
maybeCreateWebhook ( ctx , fieldseekerClient )
clearNotificationsOauth ( ctx , user )
2025-11-07 08:34:32 +00:00
NewOAuthTokenChannel <- struct { } { }
2025-11-06 22:28:56 +00:00
}
2025-11-13 16:46:30 +00:00
func maybeCreateWebhook ( ctx context . Context , client * fieldseeker . FieldSeeker ) {
2025-11-13 03:15:45 +00:00
webhooks , err := client . WebhookList ( )
if err != nil {
2025-11-13 20:34:48 +00:00
log . Error ( ) . Err ( err ) . Msg ( "Failed to get webhooks" )
2025-11-13 03:15:45 +00:00
}
for _ , hook := range webhooks {
if hook . Name == "Nidus Sync" {
2025-11-13 20:34:48 +00:00
log . Info ( ) . Msg ( "Found nidus sync hook" )
2025-11-13 03:15:45 +00:00
} else {
2025-11-13 20:34:48 +00:00
log . Info ( ) . Str ( "name" , hook . Name ) . Msg ( "Found webhook" )
2025-11-13 03:15:45 +00:00
}
}
}
2025-11-06 00:23:58 +00:00
func handleOauthAccessCode ( ctx context . Context , user * models . User , code string ) error {
baseURL := "https://www.arcgis.com/sharing/rest/oauth2/token/"
//params.Add("code_verifier", "S256")
form := url . Values {
"grant_type" : [ ] string { "authorization_code" } ,
"code" : [ ] string { code } ,
"client_id" : [ ] string { ClientID } ,
"redirect_uri" : [ ] string { redirectURL ( ) } ,
}
req , err := http . NewRequest ( "POST" , baseURL , strings . NewReader ( form . Encode ( ) ) )
if err != nil {
2025-11-13 20:53:20 +00:00
return fmt . Errorf ( "Failed to create request: %w" , err )
2025-11-06 00:23:58 +00:00
}
req . Header . Add ( "Content-Type" , "application/x-www-form-urlencoded" )
2025-11-07 05:46:41 +00:00
token , err := handleTokenRequest ( ctx , req )
if err != nil {
2025-11-13 20:53:20 +00:00
return fmt . Errorf ( "Failed to exchange authorization code for token: %w" , err )
2025-11-07 05:46:41 +00:00
}
accessExpires := futureUTCTimestamp ( token . ExpiresIn )
refreshExpires := futureUTCTimestamp ( token . RefreshTokenExpiresIn )
setter := models . OauthTokenSetter {
AccessToken : omit . From ( token . AccessToken ) ,
AccessTokenExpires : omit . From ( accessExpires ) ,
RefreshToken : omit . From ( token . RefreshToken ) ,
RefreshTokenExpires : omit . From ( refreshExpires ) ,
Username : omit . From ( token . Username ) ,
}
err = user . InsertUserOauthTokens ( ctx , PGInstance . BobDB , & setter )
if err != nil {
2025-11-13 20:53:20 +00:00
return fmt . Errorf ( "Failed to save token to database: %w" , err )
2025-11-07 05:46:41 +00:00
}
go updateArcgisUserData ( context . Background ( ) , user , token . AccessToken , accessExpires , token . RefreshToken , refreshExpires )
return nil
}
// hasFieldseekerConnection reports whether the user has at least one
// stored OAuth token row.
func hasFieldseekerConnection(ctx context.Context, user *models.User) (bool, error) {
	tokens, err := sql.OauthTokenByUserId(user.ID).All(ctx, PGInstance.BobDB)
	if err != nil {
		return false, err
	}
	return len(tokens) != 0, nil
}
// redirectURL returns the OAuth callback URL registered with ArcGIS,
// derived from the configured base URL.
func redirectURL() string {
	const callbackPath = "/arcgis/oauth/callback"
	return BaseURL + callbackPath
}
// This is a goroutine that is in charge of getting Fieldseeker data and keeping it fresh.
//
// Each pass spawns one token-maintenance goroutine per live (non-
// invalidated) oauth token and one export goroutine per organization,
// all sharing a cancellable workerCtx. When a new token arrives on
// newOauthCh the workers are cancelled, awaited, and rebuilt from fresh
// DB state; when ctx is done the whole refresher shuts down.
//
// NOTE(review): a DB query failure returns from the function entirely,
// permanently stopping the refresher — confirm that is intended.
// NOTE(review): the goroutine closures capture the range variables
// (oauth, org); this requires Go 1.22+ per-iteration loop variables —
// confirm the go.mod language version.
func refreshFieldseekerData(ctx context.Context, newOauthCh <-chan struct{}) {
	for {
		// Workers get their own context so they can be cancelled and
		// rebuilt without tearing down the refresher itself.
		workerCtx, cancel := context.WithCancel(context.Background())
		var wg sync.WaitGroup
		oauths, err := models.OauthTokens.Query(models.SelectWhere.OauthTokens.InvalidatedAt.IsNull()).All(ctx, PGInstance.BobDB)
		if err != nil {
			log.Error().Err(err).Msg("Failed to get oauths")
			return
		}
		for _, oauth := range oauths {
			wg.Add(1)
			go func() {
				defer wg.Done()
				err := maintainOAuth(workerCtx, oauth)
				if err != nil {
					LogErrorTypeInfo(err)
					// An invalidated token is an expected terminal state,
					// not a crash.
					if errors.Is(err, &InvalidatedTokenError{}) {
						log.Info().Int("oauth_token.id", int(oauth.ID)).Msg("The server has marked the token invalid")
						return
					}
					log.Error().Err(err).Msg("Crashed oauth maintenance goroutine")
				}
			}()
		}

		orgs, err := models.Organizations.Query().All(ctx, PGInstance.BobDB)
		if err != nil {
			log.Error().Err(err).Msg("Failed to get orgs")
			return
		}
		for _, org := range orgs {
			wg.Add(1)
			go func() {
				defer wg.Done()
				err := periodicallyExportFieldseeker(workerCtx, org)
				if err != nil {
					// An org without any usable token simply has no
					// exporter until a new token arrives.
					if errors.Is(err, &NoOAuthForOrg{}) {
						log.Info().Int("organization_id", int(org.ID)).Msg("No oauth available for organization, exiting exporter.")
						return
					}
					log.Error().Err(err).Msg("Crashed fieldseeker export goroutine")
				}
			}()
		}
		// Block until either shutdown or a new token invalidates the
		// current worker set; in both cases cancel and drain the workers.
		select {
		case <-ctx.Done():
			log.Info().Msg("Exiting refresh worker...")
			cancel()
			wg.Wait()
			return
		case <-newOauthCh:
			log.Info().Msg("Updating oauth background work")
			cancel()
			wg.Wait()
		}
	}
}
2025-11-07 10:45:59 +00:00
// SyncStats tallies the outcome of syncing a batch of FieldSeeker
// records into the local database.
type SyncStats struct {
	Inserts   int // rows newly created locally
	Updates   int // existing rows that differed and were rewritten
	Unchanged int // rows already identical to the remote feature
}
func downloadAllRecords ( ctx context . Context , fssync * fieldseeker . FieldSeeker , layer arcgis . LayerFeature , org_id int32 ) ( SyncStats , error ) {
var stats SyncStats
2025-11-07 08:34:32 +00:00
count , err := fssync . QueryCount ( layer . ID )
if err != nil {
2025-11-13 20:53:20 +00:00
return stats , fmt . Errorf ( "Failed to get counts for layer %s (%d): %w" , layer . Name , layer . ID , err )
2025-11-07 08:34:32 +00:00
}
2025-11-13 20:34:48 +00:00
log . Info ( ) . Str ( "name" , layer . Name ) . Int ( "id" , layer . ID ) . Msg ( "Starting on layer" )
2025-11-07 08:34:32 +00:00
if count . Count == 0 {
2025-11-07 10:45:59 +00:00
return stats , nil
2025-11-07 08:34:32 +00:00
}
2025-11-07 10:45:59 +00:00
pool := pond . NewResultPool [ SyncStats ] ( 20 )
group := pool . NewGroup ( )
maxRecords := fssync . MaxRecordCount ( )
for offset := 0 ; offset < count . Count ; offset += maxRecords {
group . SubmitErr ( func ( ) ( SyncStats , error ) {
query := arcgis . NewQuery ( )
query . ResultRecordCount = maxRecords
query . ResultOffset = offset
query . SpatialReference = "4326"
query . OutFields = "*"
query . Where = "1=1"
qr , err := fssync . DoQuery (
layer . ID ,
query )
if err != nil {
2025-11-13 20:53:20 +00:00
return SyncStats { } , fmt . Errorf ( "Failed to get layer %s (%d) at offset %d: %w" , layer . Name , layer . ID , offset , err )
2025-11-07 10:45:59 +00:00
}
i , u , err := saveOrUpdateDBRecords ( ctx , "FS_" + layer . Name , qr , org_id )
if err != nil {
2025-11-11 22:36:29 +00:00
filename := fmt . Sprintf ( "failure-%s-%d-%d.json" , layer . Name , layer . ID , offset )
2025-11-07 10:45:59 +00:00
saveRawQuery ( fssync , layer , query , filename )
2025-11-13 20:34:48 +00:00
log . Error ( ) . Err ( err ) . Msg ( "Faild to save DB records" )
2025-11-13 20:53:20 +00:00
return SyncStats { } , fmt . Errorf ( "Failed to save records: %w" , err )
2025-11-07 10:45:59 +00:00
}
return SyncStats {
Inserts : i ,
Updates : u ,
Unchanged : len ( qr . Features ) - u - i ,
} , nil
} )
}
results , err := group . Wait ( )
if err != nil {
2025-11-13 20:53:20 +00:00
return stats , fmt . Errorf ( "one or more tasks in the work pool failed: %w" , err )
2025-11-07 10:45:59 +00:00
}
for _ , r := range results {
stats . Inserts += r . Inserts
stats . Updates += r . Updates
stats . Unchanged += r . Unchanged
2025-11-07 08:34:32 +00:00
}
2025-11-13 20:34:48 +00:00
log . Info ( ) . Int ( "inserts" , stats . Inserts ) . Int ( "updates" , stats . Updates ) . Int ( "no change" , stats . Unchanged ) . Msg ( "Finished layer" )
2025-11-07 10:45:59 +00:00
return stats , nil
2025-11-07 08:34:32 +00:00
}
2025-11-07 09:30:31 +00:00
func getOAuthForOrg ( ctx context . Context , org * models . Organization ) ( * models . OauthToken , error ) {
users , err := org . User ( ) . All ( ctx , PGInstance . BobDB )
if err != nil {
2025-11-13 20:53:20 +00:00
return nil , fmt . Errorf ( "Failed to query all users for org: %w" , err )
2025-11-07 09:30:31 +00:00
}
for _ , user := range users {
2025-11-13 03:17:05 +00:00
oauths , err := user . UserOauthTokens ( models . SelectWhere . OauthTokens . InvalidatedAt . IsNull ( ) ) . All ( ctx , PGInstance . BobDB )
2025-11-07 09:30:31 +00:00
if err != nil {
2025-11-13 20:53:20 +00:00
return nil , fmt . Errorf ( "Failed to query all oauth tokens for org: %w" , err )
2025-11-07 09:30:31 +00:00
}
for _ , oauth := range oauths {
return oauth , nil
}
}
2025-11-13 20:53:20 +00:00
return nil , & NoOAuthForOrg { }
2025-11-07 09:30:31 +00:00
}
func periodicallyExportFieldseeker ( ctx context . Context , org * models . Organization ) error {
pollTicker := time . NewTicker ( 1 )
for {
select {
case <- ctx . Done ( ) :
return nil
case <- pollTicker . C :
oauth , err := getOAuthForOrg ( ctx , org )
if err != nil {
2025-11-13 20:53:20 +00:00
return fmt . Errorf ( "Failed to get oauth for org: %w" , err )
2025-11-07 09:30:31 +00:00
}
err = exportFieldseekerData ( ctx , org , oauth )
if err != nil {
2025-11-13 20:53:20 +00:00
return fmt . Errorf ( "Failed to export Fieldseeker data: %w" , err )
2025-11-07 09:30:31 +00:00
}
2025-11-13 20:34:48 +00:00
log . Info ( ) . Msg ( "Completed exporting data, waiting 15 minutes to go agoin." )
2025-11-07 09:30:31 +00:00
pollTicker = time . NewTicker ( 15 * time . Minute )
}
}
}
func exportFieldseekerData ( ctx context . Context , org * models . Organization , oauth * models . OauthToken ) error {
2025-11-13 20:34:48 +00:00
log . Info ( ) . Msg ( "Update Fieldseeker data" )
2025-11-07 08:34:32 +00:00
ar := arcgis . NewArcGIS (
arcgis . AuthenticatorOAuth {
AccessToken : oauth . AccessToken ,
AccessTokenExpires : oauth . AccessTokenExpires ,
RefreshToken : oauth . RefreshToken ,
RefreshTokenExpires : oauth . RefreshTokenExpires ,
} ,
)
row , err := sql . OrgByOauthId ( oauth . ID ) . One ( ctx , PGInstance . BobDB )
if err != nil {
2025-11-13 20:53:20 +00:00
return fmt . Errorf ( "Failed to get org ID: %w" , err )
2025-11-07 08:34:32 +00:00
}
fssync , err := fieldseeker . NewFieldSeeker (
ar ,
row . FieldseekerURL . MustGet ( ) ,
)
if err != nil {
2025-11-13 20:53:20 +00:00
return fmt . Errorf ( "Failed to create fssync: %w" , err )
2025-11-07 08:34:32 +00:00
}
2025-11-07 10:45:59 +00:00
var stats SyncStats
2025-11-07 08:34:32 +00:00
for _ , layer := range fssync . FeatureServerLayers ( ) {
2025-11-07 10:45:59 +00:00
ss , err := downloadAllRecords ( ctx , fssync , layer , row . OrganizationID )
2025-11-07 08:34:32 +00:00
if err != nil {
2025-11-13 20:53:20 +00:00
return fmt . Errorf ( "Failed to get layer %s: %w" , layer , err )
2025-11-07 08:34:32 +00:00
}
2025-11-07 10:45:59 +00:00
stats . Inserts += ss . Inserts
stats . Updates += ss . Updates
stats . Unchanged += ss . Unchanged
2025-11-07 08:34:32 +00:00
}
2025-11-07 09:30:31 +00:00
setter := models . FieldseekerSyncSetter {
2025-11-07 10:45:59 +00:00
RecordsCreated : omit . From ( int32 ( stats . Inserts ) ) ,
RecordsUpdated : omit . From ( int32 ( stats . Updates ) ) ,
RecordsUnchanged : omit . From ( int32 ( stats . Unchanged ) ) ,
2025-11-07 09:30:31 +00:00
}
err = org . InsertFieldseekerSyncs ( ctx , PGInstance . BobDB , & setter )
2025-11-08 00:04:44 +00:00
if err != nil {
2025-11-13 20:53:20 +00:00
return fmt . Errorf ( "Failed to insert sync: %w" , err )
2025-11-08 00:04:44 +00:00
}
2025-11-07 09:30:31 +00:00
2025-11-21 17:28:05 +00:00
updateSummaryTables ( ctx , org )
2025-11-07 08:34:32 +00:00
return nil
}
2025-11-11 22:36:29 +00:00
func maintainOAuth ( ctx context . Context , oauth * models . OauthToken ) error {
2025-11-13 14:34:50 +00:00
for {
// Refresh from the database
oauth , err := models . FindOauthToken ( ctx , PGInstance . BobDB , oauth . ID )
2025-11-07 05:46:41 +00:00
if err != nil {
2025-11-13 20:53:20 +00:00
return fmt . Errorf ( "Failed to update oauth token from database: %w" , err )
2025-11-07 05:46:41 +00:00
}
2025-11-13 15:15:35 +00:00
accessTokenDelay := time . Until ( oauth . AccessTokenExpires ) - ( 3 * time . Second )
refreshTokenDelay := time . Until ( oauth . RefreshTokenExpires ) - ( 3 * time . Second )
2025-11-13 14:34:50 +00:00
if oauth . AccessTokenExpires . Before ( time . Now ( ) ) {
accessTokenDelay = 0
2025-11-13 03:17:23 +00:00
}
2025-11-13 14:34:50 +00:00
if oauth . RefreshTokenExpires . Before ( time . Now ( ) ) {
refreshTokenDelay = 0
}
2025-11-13 20:34:48 +00:00
log . Info ( ) . Int ( "id" , int ( oauth . ID ) ) . Float64 ( "seconds" , accessTokenDelay . Seconds ( ) ) . Str ( "access_token" , oauth . AccessToken ) . Msg ( "Need to refresh access token" )
log . Info ( ) . Int ( "id" , int ( oauth . ID ) ) . Float64 ( "seconds" , refreshTokenDelay . Seconds ( ) ) . Str ( "refresh_token" , oauth . RefreshToken ) . Msg ( "Need to refresh refresh token" )
2025-11-13 14:34:50 +00:00
accessTokenTicker := time . NewTicker ( accessTokenDelay )
refreshTokenTicker := time . NewTicker ( refreshTokenDelay )
2025-11-07 05:46:41 +00:00
select {
case <- ctx . Done ( ) :
2025-11-11 22:36:29 +00:00
return nil
2025-11-13 03:17:23 +00:00
case <- accessTokenTicker . C :
err := refreshAccessToken ( ctx , oauth )
if err != nil {
markTokenFailed ( ctx , oauth )
2025-11-13 20:53:20 +00:00
return fmt . Errorf ( "Failed to refresh access token: %w" , err )
2025-11-13 03:17:23 +00:00
}
case <- refreshTokenTicker . C :
err := refreshRefreshToken ( ctx , oauth )
2025-11-07 08:34:32 +00:00
if err != nil {
2025-11-13 03:17:23 +00:00
markTokenFailed ( ctx , oauth )
2025-11-13 20:53:20 +00:00
return fmt . Errorf ( "Failed to maintain refresh token: %w" , err )
2025-11-07 08:34:32 +00:00
}
2025-11-07 05:46:41 +00:00
}
}
}
2025-11-11 20:09:11 +00:00
// Mark that a given oauth token has failed. This includes a notification to
// the user.
func markTokenFailed ( ctx context . Context , oauth * models . OauthToken ) {
oauthSetter := models . OauthTokenSetter {
InvalidatedAt : omitnull . From ( time . Now ( ) ) ,
}
err := oauth . Update ( ctx , PGInstance . BobDB , & oauthSetter )
if err != nil {
2025-11-13 20:34:48 +00:00
log . Error ( ) . Str ( "err" , err . Error ( ) ) . Msg ( "Failed to mark token failed" )
2025-11-11 20:09:11 +00:00
}
user , err := models . FindUser ( ctx , PGInstance . BobDB , oauth . UserID )
if err != nil {
2025-11-13 20:34:48 +00:00
log . Error ( ) . Str ( "err" , err . Error ( ) ) . Msg ( "Failed to get oauth user" )
2025-11-11 20:09:11 +00:00
return
}
2025-11-13 16:46:30 +00:00
notifyOauthInvalid ( ctx , user )
2025-11-13 20:34:48 +00:00
log . Info ( ) . Int ( "id" , int ( oauth . ID ) ) . Msg ( "Marked oauth token invalid" )
2025-11-11 20:09:11 +00:00
}
2025-11-13 03:17:23 +00:00
// Update the access token to keep it fresh and alive
func refreshAccessToken ( ctx context . Context , oauth * models . OauthToken ) error {
2025-11-07 05:46:41 +00:00
baseURL := "https://www.arcgis.com/sharing/rest/oauth2/token/"
form := url . Values {
"grant_type" : [ ] string { "refresh_token" } ,
"client_id" : [ ] string { ClientID } ,
"refresh_token" : [ ] string { oauth . RefreshToken } ,
}
req , err := http . NewRequest ( "POST" , baseURL , strings . NewReader ( form . Encode ( ) ) )
if err != nil {
2025-11-13 20:53:20 +00:00
return fmt . Errorf ( "Failed to create request: %w" , err )
2025-11-07 05:46:41 +00:00
}
req . Header . Add ( "Content-Type" , "application/x-www-form-urlencoded" )
token , err := handleTokenRequest ( ctx , req )
if err != nil {
2025-11-13 20:53:20 +00:00
return fmt . Errorf ( "Failed to handle request: %w" , err )
2025-11-07 05:46:41 +00:00
}
accessExpires := futureUTCTimestamp ( token . ExpiresIn )
setter := models . OauthTokenSetter {
2025-11-07 08:34:32 +00:00
AccessToken : omit . From ( token . AccessToken ) ,
AccessTokenExpires : omit . From ( accessExpires ) ,
Username : omit . From ( token . Username ) ,
2025-11-07 05:46:41 +00:00
}
err = oauth . Update ( ctx , PGInstance . BobDB , & setter )
if err != nil {
2025-11-13 20:53:20 +00:00
return fmt . Errorf ( "Failed to update oauth in database: %w" , err )
2025-11-07 05:46:41 +00:00
}
2025-11-13 20:34:48 +00:00
log . Info ( ) . Int ( "oauth token id" , int ( oauth . ID ) ) . Msg ( "Updated oauth token" )
2025-11-07 05:46:41 +00:00
return nil
}
2025-11-13 03:17:23 +00:00
// Update the refresh token to keep it fresh and alive
func refreshRefreshToken ( ctx context . Context , oauth * models . OauthToken ) error {
baseURL := "https://www.arcgis.com/sharing/rest/oauth2/token/"
form := url . Values {
"grant_type" : [ ] string { "exchange_refresh_token" } ,
"client_id" : [ ] string { ClientID } ,
"redirect_uri" : [ ] string { redirectURL ( ) } ,
"refresh_token" : [ ] string { oauth . RefreshToken } ,
}
req , err := http . NewRequest ( "POST" , baseURL , strings . NewReader ( form . Encode ( ) ) )
if err != nil {
2025-11-13 20:53:20 +00:00
return fmt . Errorf ( "Failed to create request: %w" , err )
2025-11-13 03:17:23 +00:00
}
req . Header . Add ( "Content-Type" , "application/x-www-form-urlencoded" )
token , err := handleTokenRequest ( ctx , req )
if err != nil {
2025-11-13 20:53:20 +00:00
return fmt . Errorf ( "Failed to handle request: %w" , err )
2025-11-13 03:17:23 +00:00
}
refreshExpires := futureUTCTimestamp ( token . ExpiresIn )
setter := models . OauthTokenSetter {
2025-11-13 14:33:08 +00:00
RefreshToken : omit . From ( token . RefreshToken ) ,
2025-11-13 03:17:23 +00:00
RefreshTokenExpires : omit . From ( refreshExpires ) ,
Username : omit . From ( token . Username ) ,
}
err = oauth . Update ( ctx , PGInstance . BobDB , & setter )
if err != nil {
2025-11-13 20:53:20 +00:00
return fmt . Errorf ( "Failed to update oauth in database: %w" , err )
2025-11-13 03:17:23 +00:00
}
2025-11-13 20:34:48 +00:00
log . Info ( ) . Int ( "oauth token id" , int ( oauth . ID ) ) . Msg ( "Updated oauth token" )
2025-11-13 03:17:23 +00:00
return nil
}
func newTimestampedFilename ( prefix , suffix string ) string {
timestamp := time . Now ( ) . Format ( "20060102_150405" ) // YYYYMMDD_HHMMSS format
return prefix + timestamp + suffix
}
2025-11-07 05:46:41 +00:00
func handleTokenRequest ( ctx context . Context , req * http . Request ) ( * OAuthTokenResponse , error ) {
2025-11-06 00:23:58 +00:00
client := http . Client { }
2025-11-13 20:34:48 +00:00
log . Info ( ) . Str ( "url" , req . URL . String ( ) ) . Msg ( "POST" )
2025-11-06 00:23:58 +00:00
resp , err := client . Do ( req )
if err != nil {
2025-11-13 20:53:20 +00:00
return nil , fmt . Errorf ( "Failed to do request: %w" , err )
2025-11-06 00:23:58 +00:00
}
defer resp . Body . Close ( )
bodyBytes , err := io . ReadAll ( resp . Body )
2025-11-13 20:34:48 +00:00
log . Info ( ) . Int ( "status" , resp . StatusCode ) . Msg ( "Token request" )
2025-11-13 03:17:49 +00:00
filename := newTimestampedFilename ( "token" , ".json" )
saveResponse ( bodyBytes , filename )
2025-11-06 00:23:58 +00:00
if resp . StatusCode >= http . StatusBadRequest {
if err != nil {
2025-11-13 20:53:20 +00:00
return nil , fmt . Errorf ( "Got status code %d and failed to read response body: %w" , resp . StatusCode , err )
2025-11-06 00:23:58 +00:00
}
bodyString := string ( bodyBytes )
2025-11-14 23:08:26 +00:00
var errorResp arcgis . ErrorResponse
2025-11-06 00:23:58 +00:00
if err := json . Unmarshal ( bodyBytes , & errorResp ) ; err == nil {
2025-11-14 23:08:26 +00:00
if errorResp . Error . Code == 498 && errorResp . Error . Description == "invalidated refresh_token" {
return nil , InvalidatedTokenError { }
}
2025-11-13 20:53:20 +00:00
return nil , fmt . Errorf ( "API response JSON error: %d: %w" , resp . StatusCode , errorResp )
2025-11-06 00:23:58 +00:00
}
2025-11-07 05:46:41 +00:00
return nil , fmt . Errorf ( "API returned error status %d: %s" , resp . StatusCode , bodyString )
2025-11-06 00:23:58 +00:00
}
2025-11-07 05:46:41 +00:00
//logResponseHeaders(resp)
2025-11-06 00:23:58 +00:00
var tokenResponse OAuthTokenResponse
err = json . Unmarshal ( bodyBytes , & tokenResponse )
if err != nil {
2025-11-13 20:53:20 +00:00
return nil , fmt . Errorf ( "Failed to unmarshal JSON: %w" , err )
2025-11-07 05:46:41 +00:00
}
// Just because we got a 200-level status code doesn't mean it worked. Experience has taught us that
// we can get errors without anything indicated in the headers or the status code
if tokenResponse == ( OAuthTokenResponse { } ) {
2025-11-14 23:08:26 +00:00
var errorResponse arcgis . ErrorResponse
2025-11-07 05:46:41 +00:00
err = json . Unmarshal ( bodyBytes , & errorResponse )
if err != nil {
2025-11-13 20:53:20 +00:00
return nil , fmt . Errorf ( "Failed to unmarshal error JSON: %w" , err )
2025-11-07 05:46:41 +00:00
}
if errorResponse . Error . Code > 0 {
2025-11-14 23:08:26 +00:00
return nil , errorResponse . AsError ( )
2025-11-07 05:46:41 +00:00
}
2025-11-06 00:23:58 +00:00
}
2025-11-13 20:34:48 +00:00
log . Info ( ) . Str ( "refresh token" , tokenResponse . RefreshToken ) . Str ( "access token" , tokenResponse . AccessToken ) . Int ( "access expires" , tokenResponse . ExpiresIn ) . Int ( "refresh expires" , tokenResponse . RefreshTokenExpiresIn ) . Msg ( "Oauth token acquired" )
2025-11-07 05:46:41 +00:00
return & tokenResponse , nil
}
2025-11-06 00:23:58 +00:00
2025-11-07 05:46:41 +00:00
func logResponseHeaders ( resp * http . Response ) {
if resp == nil {
2025-11-13 20:34:48 +00:00
log . Info ( ) . Msg ( "Response is nil" )
2025-11-07 05:46:41 +00:00
return
2025-11-06 00:23:58 +00:00
}
2025-11-07 05:46:41 +00:00
2025-11-13 20:34:48 +00:00
log . Info ( ) . Str ( "status" , resp . Status ) . Int ( "statusCode" , resp . StatusCode ) . Msg ( "HTTP Response headers" )
2025-11-07 05:46:41 +00:00
for name , values := range resp . Header {
2025-11-13 20:34:48 +00:00
log . Info ( ) . Str ( "name" , name ) . Strs ( "values" , values ) . Msg ( "Header" )
2025-11-06 00:23:58 +00:00
}
}
2025-11-07 05:46:41 +00:00
func saveResponse ( data [ ] byte , filename string ) {
dest , err := os . Create ( filename )
2025-11-06 22:58:18 +00:00
if err != nil {
2025-11-13 20:34:48 +00:00
log . Error ( ) . Str ( "filename" , filename ) . Str ( "err" , err . Error ( ) ) . Msg ( "Failed to create file" )
2025-11-07 05:46:41 +00:00
return
2025-11-06 22:58:18 +00:00
}
2025-11-07 05:46:41 +00:00
_ , err = io . Copy ( dest , bytes . NewReader ( data ) )
if err != nil {
2025-11-13 20:34:48 +00:00
log . Error ( ) . Str ( "filename" , filename ) . Str ( "err" , err . Error ( ) ) . Msg ( "Failed to write" )
2025-11-07 05:46:41 +00:00
return
2025-11-07 02:29:34 +00:00
}
2025-11-13 20:34:48 +00:00
log . Info ( ) . Str ( "filename" , filename ) . Msg ( "Wrote response" )
2025-11-07 02:07:33 +00:00
}
2025-11-07 08:34:32 +00:00
func saveRawQuery ( fssync * fieldseeker . FieldSeeker , layer arcgis . LayerFeature , query * arcgis . Query , filename string ) {
output , err := os . Create ( filename )
if err != nil {
2025-11-13 20:34:48 +00:00
log . Error ( ) . Str ( "filename" , filename ) . Msg ( "Failed to create file" )
2025-11-07 08:34:32 +00:00
return
}
qr , err := fssync . DoQueryRaw (
layer . ID ,
query )
if err != nil {
2025-11-13 20:34:48 +00:00
log . Error ( ) . Str ( "err" , err . Error ( ) ) . Msg ( "Failed to do query" )
2025-11-07 08:34:32 +00:00
return
}
_ , err = output . Write ( qr )
if err != nil {
2025-11-13 20:34:48 +00:00
log . Error ( ) . Str ( "err" , err . Error ( ) ) . Msg ( "Failed to write results" )
2025-11-07 08:34:32 +00:00
return
}
2025-11-13 20:34:48 +00:00
log . Info ( ) . Str ( "filename" , filename ) . Msg ( "Wrote failed query" )
2025-11-07 08:34:32 +00:00
}
2025-11-07 09:30:31 +00:00
func saveOrUpdateDBRecords ( ctx context . Context , table string , qr * arcgis . QueryResult , org_id int32 ) ( int , int , error ) {
2025-11-07 08:34:32 +00:00
inserts , updates := 0 , 0
sorted_columns := make ( [ ] string , 0 , len ( qr . Fields ) )
for _ , f := range qr . Fields {
sorted_columns = append ( sorted_columns , f . Name )
}
sort . Strings ( sorted_columns )
objectids := make ( [ ] int , 0 )
for _ , l := range qr . Features {
oid := l . Attributes [ "OBJECTID" ] . ( float64 )
objectids = append ( objectids , int ( oid ) )
}
rows_by_objectid , err := rowmapViaQuery ( ctx , table , sorted_columns , objectids )
if err != nil {
2025-11-13 20:53:20 +00:00
return inserts , updates , fmt . Errorf ( "Failed to get existing rows: %w" , err )
2025-11-07 08:34:32 +00:00
}
// log.Println("Rows from query", len(rows_by_objectid))
for _ , feature := range qr . Features {
oid := feature . Attributes [ "OBJECTID" ] . ( float64 )
row := rows_by_objectid [ int ( oid ) ]
// If we have no matching row we'll need to create it
if len ( row ) == 0 {
2025-11-07 09:30:31 +00:00
if err := insertRowFromFeature ( ctx , table , sorted_columns , & feature , org_id ) ; err != nil {
2025-11-13 20:53:20 +00:00
return inserts , updates , fmt . Errorf ( "Failed to insert row: %w" , err )
2025-11-07 08:34:32 +00:00
}
inserts += 1
} else if hasUpdates ( row , feature ) {
2025-11-07 09:30:31 +00:00
if err := updateRowFromFeature ( ctx , table , sorted_columns , & feature , org_id ) ; err != nil {
2025-11-13 20:53:20 +00:00
return inserts , updates , fmt . Errorf ( "Failed to update row: %w" , err )
2025-11-07 08:34:32 +00:00
}
updates += 1
}
}
return inserts , updates , nil
}
// Produces a map of OBJECTID to a 'row' which is in turn a map of column names to their values as strings
func rowmapViaQuery ( ctx context . Context , table string , sorted_columns [ ] string , objectids [ ] int ) ( map [ int ] map [ string ] string , error ) {
result := make ( map [ int ] map [ string ] string )
query := selectAllFromQueryResult ( table , sorted_columns )
args := pgx . NamedArgs {
"objectids" : objectids ,
}
rows , err := PGInstance . PGXPool . Query ( ctx , query , args )
if err != nil {
2025-11-13 20:53:20 +00:00
return result , fmt . Errorf ( "Failed to query rows: %w" , err )
2025-11-07 08:34:32 +00:00
}
defer rows . Close ( )
// +2 for geometry x and geometry x
columnNames := make ( [ ] string , len ( sorted_columns ) + 2 )
for i , c := range sorted_columns {
columnNames [ i ] = c
}
columnNames [ len ( sorted_columns ) ] = "geometry_x"
columnNames [ len ( sorted_columns ) + 1 ] = "geometry_y"
rowSlice , err := pgx . CollectRows ( rows , func ( row pgx . CollectableRow ) ( map [ string ] string , error ) {
fieldDescriptions := row . FieldDescriptions ( )
values := make ( [ ] interface { } , len ( fieldDescriptions ) )
valuePtrs := make ( [ ] interface { } , len ( fieldDescriptions ) )
for i := range values {
valuePtrs [ i ] = & values [ i ]
}
if err := row . Scan ( valuePtrs ... ) ; err != nil {
return nil , err
}
result := make ( map [ string ] string )
for i , fd := range fieldDescriptions {
if values [ i ] != nil {
result [ fd . Name ] = fmt . Sprintf ( "%v" , values [ i ] )
//log.Printf("col %v type %T val %v", fd.Name, values[i], values[i])
} else {
result [ fd . Name ] = ""
}
}
return result , nil
} )
if err != nil {
2025-11-13 20:53:20 +00:00
return result , fmt . Errorf ( "Failed to collect rows: %w" , err )
2025-11-07 08:34:32 +00:00
}
for _ , row := range rowSlice {
o := row [ "objectid" ]
objectid , err := strconv . Atoi ( o )
if err != nil {
2025-11-13 20:53:20 +00:00
return result , fmt . Errorf ( "Failed to parse objectid %s: %w" , o , err )
2025-11-07 08:34:32 +00:00
}
result [ objectid ] = row
}
return result , nil
}
2025-11-07 09:30:31 +00:00
func insertRowFromFeature ( ctx context . Context , table string , sorted_columns [ ] string , feature * arcgis . Feature , org_id int32 ) error {
2025-11-07 08:34:32 +00:00
var options pgx . TxOptions
transaction , err := PGInstance . PGXPool . BeginTx ( ctx , options )
if err != nil {
return fmt . Errorf ( "Unable to start transaction" )
}
2025-11-07 09:30:31 +00:00
err = insertRowFromFeatureFS ( ctx , transaction , table , sorted_columns , feature , org_id )
2025-11-07 08:34:32 +00:00
if err != nil {
transaction . Rollback ( ctx )
2025-11-13 20:53:20 +00:00
return fmt . Errorf ( "Unable to insert FS: %w" , err )
2025-11-07 08:34:32 +00:00
}
2025-11-07 09:30:31 +00:00
err = insertRowFromFeatureHistory ( ctx , transaction , table , sorted_columns , feature , org_id , 1 )
2025-11-07 08:34:32 +00:00
if err != nil {
transaction . Rollback ( ctx )
2025-11-13 20:53:20 +00:00
return fmt . Errorf ( "Failed to insert history: %w" , err )
2025-11-07 08:34:32 +00:00
}
err = transaction . Commit ( ctx )
if err != nil {
2025-11-13 20:53:20 +00:00
return fmt . Errorf ( "Failed to commit transaction: %w" , err )
2025-11-07 08:34:32 +00:00
}
return nil
}
2025-11-07 09:30:31 +00:00
func insertRowFromFeatureFS ( ctx context . Context , transaction pgx . Tx , table string , sorted_columns [ ] string , feature * arcgis . Feature , org_id int32 ) error {
2025-11-07 08:34:32 +00:00
// Create the query to produce the main row
var sb strings . Builder
sb . WriteString ( "INSERT INTO " )
sb . WriteString ( table )
sb . WriteString ( " (" )
for _ , field := range sorted_columns {
sb . WriteString ( field )
sb . WriteString ( "," )
}
// Specially add the geometry values since they aren't in the fields
2025-11-07 09:30:31 +00:00
sb . WriteString ( "geometry_x,geometry_y,organization_id,updated" )
2025-11-07 08:34:32 +00:00
sb . WriteString ( ")\nVALUES (" )
for _ , field := range sorted_columns {
sb . WriteString ( "@" )
sb . WriteString ( field )
sb . WriteString ( "," )
}
// Specially add the geometry values since they aren't in the fields
2025-11-07 09:30:31 +00:00
sb . WriteString ( "@geometry_x,@geometry_y,@organization_id,@updated)" )
2025-11-07 08:34:32 +00:00
args := pgx . NamedArgs { }
for k , v := range feature . Attributes {
args [ k ] = v
}
// specially add geometry since it isn't in the list of attributes
args [ "geometry_x" ] = feature . Geometry . X
args [ "geometry_y" ] = feature . Geometry . Y
2025-11-07 09:30:31 +00:00
args [ "organization_id" ] = org_id
2025-11-07 08:34:32 +00:00
args [ "updated" ] = time . Now ( )
_ , err := transaction . Exec ( ctx , sb . String ( ) , args )
if err != nil {
2025-11-13 20:53:20 +00:00
return fmt . Errorf ( "Failed to insert row into %s: %w" , table , err )
2025-11-07 08:34:32 +00:00
}
return nil
}
func hasUpdates ( row map [ string ] string , feature arcgis . Feature ) bool {
for key , value := range feature . Attributes {
rowdata := row [ strings . ToLower ( key ) ]
// We'll accept any 'nil' as represented by the empty string in the database
if value == nil {
if rowdata == "" {
continue
} else if len ( rowdata ) > 0 {
return true
} else {
2025-11-13 20:34:48 +00:00
log . Error ( ) . Msg ( "Looks like our original value is nil, but our row value is something non-empty with a zero length. Need a programmer to look into this." )
2025-11-07 08:34:32 +00:00
}
}
// check strings first, their simplest
if featureAsString , ok := value . ( string ) ; ok {
if featureAsString != rowdata {
return true
}
continue
} else if featureAsInt , ok := value . ( int ) ; ok {
// Previously had a nil value, now we have a real value
if rowdata == "" {
return true
}
rowAsInt , err := strconv . Atoi ( rowdata )
if err != nil {
2025-11-13 20:34:48 +00:00
log . Error ( ) . Msg ( fmt . Sprintf ( "Failed to convert '%s' to an int to compare against %v for %v" , rowdata , featureAsInt , key ) )
2025-11-07 08:34:32 +00:00
}
if rowAsInt != featureAsInt {
return true
} else {
continue
}
} else if featureAsFloat , ok := value . ( float64 ) ; ok {
// Previously had a nil value, now we have a real value
if rowdata == "" {
return true
}
rowAsFloat , err := strconv . ParseFloat ( rowdata , 64 )
if err != nil {
2025-11-13 20:34:48 +00:00
log . Error ( ) . Msg ( fmt . Sprintf ( "Failed to convert '%s' to a float64 to compare against %v for %v" , rowdata , featureAsFloat , key ) )
2025-11-07 08:34:32 +00:00
}
if rowAsFloat != featureAsFloat {
return true
} else {
continue
}
}
2025-11-13 20:53:20 +00:00
log . Info ( ) . Msg ( fmt . Sprintf ( "key: %s\tvalue: %w (type %T)\trow: %s\n" , key , value , value , rowdata ) )
2025-11-13 20:34:48 +00:00
log . Error ( ) . Msg ( "we've hit a point where we can't tell if we have an update or not, need a programmer to look at the above" )
2025-11-07 08:34:32 +00:00
}
return false
}
2025-11-07 09:30:31 +00:00
func updateRowFromFeature ( ctx context . Context , table string , sorted_columns [ ] string , feature * arcgis . Feature , org_id int32 ) error {
2025-11-07 08:34:32 +00:00
// Get the current highest version for the row in question
history_table := toHistoryTable ( table )
var sb strings . Builder
sb . WriteString ( "SELECT MAX(version) FROM " )
sb . WriteString ( history_table )
sb . WriteString ( " WHERE OBJECTID=@objectid" )
args := pgx . NamedArgs { }
o := feature . Attributes [ "OBJECTID" ] . ( float64 )
args [ "objectid" ] = int ( o )
var version int
if err := PGInstance . PGXPool . QueryRow ( ctx , sb . String ( ) , args ) . Scan ( & version ) ; err != nil {
2025-11-13 20:53:20 +00:00
return fmt . Errorf ( "Failed to query for version: %w" , err )
2025-11-07 08:34:32 +00:00
}
var options pgx . TxOptions
transaction , err := PGInstance . PGXPool . BeginTx ( ctx , options )
if err != nil {
return fmt . Errorf ( "Unable to start transaction" )
}
2025-11-07 09:30:31 +00:00
err = insertRowFromFeatureHistory ( ctx , transaction , table , sorted_columns , feature , org_id , version + 1 )
2025-11-07 08:34:32 +00:00
if err != nil {
transaction . Rollback ( ctx )
2025-11-13 20:53:20 +00:00
return fmt . Errorf ( "Failed to insert history: %w" , err )
2025-11-07 08:34:32 +00:00
}
err = updateRowFromFeatureFS ( ctx , transaction , table , sorted_columns , feature )
if err != nil {
transaction . Rollback ( ctx )
2025-11-13 20:53:20 +00:00
return fmt . Errorf ( "Failed to update row from feature: %w" , err )
2025-11-07 08:34:32 +00:00
}
err = transaction . Commit ( ctx )
if err != nil {
2025-11-13 20:53:20 +00:00
return fmt . Errorf ( "Failed to commit transaction: %w" , err )
2025-11-07 08:34:32 +00:00
}
return nil
}
2025-11-07 09:30:31 +00:00
func insertRowFromFeatureHistory ( ctx context . Context , transaction pgx . Tx , table string , sorted_columns [ ] string , feature * arcgis . Feature , org_id int32 , version int ) error {
2025-11-07 08:34:32 +00:00
history_table := toHistoryTable ( table )
var sb strings . Builder
sb . WriteString ( "INSERT INTO " )
sb . WriteString ( history_table )
sb . WriteString ( " (" )
for _ , field := range sorted_columns {
sb . WriteString ( field )
sb . WriteString ( "," )
}
// Specially add the geometry values since they aren't in the fields
2025-11-07 09:30:31 +00:00
sb . WriteString ( "created,geometry_x,geometry_y,organization_id,version" )
2025-11-07 08:34:32 +00:00
sb . WriteString ( ")\nVALUES (" )
for _ , field := range sorted_columns {
sb . WriteString ( "@" )
sb . WriteString ( field )
sb . WriteString ( "," )
}
// Specially add the geometry values since they aren't in the fields
2025-11-07 09:30:31 +00:00
sb . WriteString ( "@created,@geometry_x,@geometry_y,@organization_id,@version)" )
2025-11-07 08:34:32 +00:00
args := pgx . NamedArgs { }
for k , v := range feature . Attributes {
args [ k ] = v
}
args [ "created" ] = time . Now ( )
2025-11-07 09:30:31 +00:00
args [ "organization_id" ] = org_id
2025-11-07 08:34:32 +00:00
args [ "version" ] = version
if _ , err := transaction . Exec ( ctx , sb . String ( ) , args ) ; err != nil {
2025-11-13 20:53:20 +00:00
return fmt . Errorf ( "Failed to insert history row into %s: %w" , table , err )
2025-11-07 08:34:32 +00:00
}
return nil
}
// selectAllFromQueryResult builds the SELECT used to fetch existing rows for
// a batch of OBJECTIDs (bound later as the @objectids named argument).
// sorted_columns is accepted for signature symmetry with the other SQL
// builders but is not consulted: the query selects every column.
func selectAllFromQueryResult(table string, sorted_columns []string) string {
	parts := []string{"SELECT * FROM ", table, " WHERE OBJECTID=ANY(@objectids)"}
	return strings.Join(parts, "")
}
// toHistoryTable maps a FieldSeeker table name (e.g. "FS_Foo") to its
// companion history table name ("History_Foo") by replacing the first three
// characters — presumably the "FS_" prefix — with "History_".
func toHistoryTable(table string) string {
	const prefixLen = 3 // length of the prefix being swapped out
	return "History_" + table[prefixLen:]
}
func updateRowFromFeatureFS ( ctx context . Context , transaction pgx . Tx , table string , sorted_columns [ ] string , feature * arcgis . Feature ) error {
// Create the query to produce the main row
var sb strings . Builder
sb . WriteString ( "UPDATE " )
sb . WriteString ( table )
sb . WriteString ( " SET " )
for _ , field := range sorted_columns {
// OBJECTID is special as our primary key, so skip it
if field == "OBJECTID" {
continue
}
sb . WriteString ( field )
sb . WriteString ( "=@" )
sb . WriteString ( field )
sb . WriteString ( "," )
}
// Specially add the geometry values since they aren't in the fields
sb . WriteString ( "geometry_x=@geometry_x,geometry_y=@geometry_y,updated=@updated WHERE OBJECTID=@OBJECTID" )
args := pgx . NamedArgs { }
for k , v := range feature . Attributes {
args [ k ] = v
}
// specially add geometry since it isn't in the list of attributes
args [ "geometry_x" ] = feature . Geometry . X
args [ "geometry_y" ] = feature . Geometry . Y
args [ "updated" ] = time . Now ( )
_ , err := transaction . Exec ( ctx , sb . String ( ) , args )
if err != nil {
2025-11-13 20:53:20 +00:00
return fmt . Errorf ( "Failed to update row into %s: %w" , table , err )
2025-11-07 08:34:32 +00:00
}
return nil
}