From d4d97494310717998c381696c80b76c9904ff805 Mon Sep 17 00:00:00 2001 From: Eli Ribble Date: Sat, 28 Feb 2026 23:26:08 +0000 Subject: [PATCH] Rework arcgis integration for arcgis-go changes and table changes --- background/arcgis.go | 1547 +++++++++++++++++--------------- db/fieldseeker.go | 29 +- db/prepared.go | 110 +-- db/sql/org_by_oauth_id.bob.sql | 6 +- db/sql/org_by_oauth_id.sql | 6 +- 5 files changed, 903 insertions(+), 795 deletions(-) diff --git a/background/arcgis.go b/background/arcgis.go index 42548b85..c76e45a2 100644 --- a/background/arcgis.go +++ b/background/arcgis.go @@ -23,8 +23,10 @@ import ( "github.com/Gleipnir-Technology/arcgis-go" "github.com/Gleipnir-Technology/arcgis-go/fieldseeker" "github.com/Gleipnir-Technology/arcgis-go/response" + "github.com/Gleipnir-Technology/bob" "github.com/Gleipnir-Technology/bob/dialect/psql" "github.com/Gleipnir-Technology/bob/dialect/psql/dm" + "github.com/Gleipnir-Technology/bob/dialect/psql/im" "github.com/Gleipnir-Technology/bob/dialect/psql/sm" "github.com/Gleipnir-Technology/bob/dialect/psql/um" "github.com/Gleipnir-Technology/nidus-sync/config" @@ -65,13 +67,13 @@ type OAuthTokenResponse struct { Username string `json:"username"` } -func GetOAuthForOrg(ctx context.Context, org *models.Organization) (*models.OauthToken, error) { +func GetOAuthForOrg(ctx context.Context, org *models.Organization) (*models.ArcgisOauthToken, error) { users, err := org.User().All(ctx, db.PGInstance.BobDB) if err != nil { return nil, fmt.Errorf("Failed to query all users for org: %w", err) } for _, user := range users { - oauths, err := user.UserOauthTokens(models.SelectWhere.OauthTokens.InvalidatedAt.IsNull()).All(ctx, db.PGInstance.BobDB) + oauths, err := user.UserOauthTokens(models.SelectWhere.ArcgisOauthTokens.InvalidatedAt.IsNull()).All(ctx, db.PGInstance.BobDB) if err != nil { return nil, fmt.Errorf("Failed to query all oauth tokens for org: %w", err) } @@ -83,31 +85,22 @@ func GetOAuthForOrg(ctx context.Context, org *models.Organization) (*models.Oaut } func HandleOauthAccessCode(ctx context.Context, user *models.User, code string) error { - baseURL := "https://www.arcgis.com/sharing/rest/oauth2/token/" - - //params.Add("code_verifier", "S256") - form := url.Values{ "grant_type": []string{"authorization_code"}, "code": []string{code}, - "client_id": []string{config.ClientID}, "redirect_uri": []string{config.ArcGISOauthRedirectURL()}, } - req, err := http.NewRequest("POST", baseURL, strings.NewReader(form.Encode())) - if err != nil { - return fmt.Errorf("Failed to create request: %w", err) - } - req.Header.Add("Content-Type", "application/x-www-form-urlencoded") - token, err := handleTokenRequest(ctx, req) + token, err := doTokenRequest(ctx, form) if err != nil { return fmt.Errorf("Failed to exchange authorization code for token: %w", err) } accessExpires := futureUTCTimestamp(token.ExpiresIn) refreshExpires := futureUTCTimestamp(token.RefreshTokenExpiresIn) - setter := models.OauthTokenSetter{ - AccessToken: omit.From(token.AccessToken), - AccessTokenExpires: omit.From(accessExpires), + setter := models.ArcgisOauthTokenSetter{ + AccessToken: omit.From(token.AccessToken), + AccessTokenExpires: omit.From(accessExpires), + //ArcgisAccountID: omit.From( ArcgisID: omitnull.FromPtr[string](nil), ArcgisLicenseTypeID: omitnull.FromPtr[string](nil), Created: omit.From(time.Now()), @@ -117,7 +110,7 @@ func HandleOauthAccessCode(ctx context.Context, user *models.User, code string) UserID: omit.From(user.ID), Username: omit.From(token.Username), } - oauth, err := models.OauthTokens.Insert(&setter).One(ctx, db.PGInstance.BobDB) + oauth, err := models.ArcgisOauthTokens.Insert(&setter).One(ctx, db.PGInstance.BobDB) if err != nil { return fmt.Errorf("Failed to save token to database: %w", err) } @@ -126,9 +119,9 @@ func HandleOauthAccessCode(ctx context.Context, user *models.User, code string) } func HasFieldseekerConnection(ctx context.Context, user *models.User) (bool, error) { - result, err := models.OauthTokens.Query( + result, err := models.ArcgisOauthTokens.Query( sm.Where( - models.OauthTokens.Columns.UserID.EQ(psql.Arg(user.ID)), + models.ArcgisOauthTokens.Columns.UserID.EQ(psql.Arg(user.ID)), ), ).Exists(ctx, db.PGInstance.BobDB) if err != nil { @@ -149,7 +142,7 @@ func refreshFieldseekerData(background_ctx context.Context, newOauthCh <-chan st workerCtx, cancel := context.WithCancel(context.Background()) var wg sync.WaitGroup - oauths, err := models.OauthTokens.Query(models.SelectWhere.OauthTokens.InvalidatedAt.IsNull()).All(ctx, db.PGInstance.BobDB) + oauths, err := models.ArcgisOauthTokens.Query(models.SelectWhere.ArcgisOauthTokens.InvalidatedAt.IsNull()).All(ctx, db.PGInstance.BobDB) if err != nil { log.Error().Err(err).Msg("Failed to get oauths") return @@ -218,9 +211,13 @@ type SyncStats struct { } func downloadFieldseekerSchema(ctx context.Context, fieldseekerClient *fieldseeker.FieldSeeker, arcgis_id string) { - layers := fieldseekerClient.Layers() + layers, err := fieldseekerClient.Layers(ctx) + if err != nil { + log.Error().Err(err).Msg("Failed to get layers") + return + } log.Debug().Int("len", len(layers)).Msg("Downloading fieldseeker schema") - for _, layer := range layers { + for i, layer := range layers { err := os.MkdirAll(filepath.Join(config.FieldseekerSchemaDirectory, arcgis_id), os.ModePerm) if err != nil { log.Error().Err(err).Msg("Failed to create parent directory") @@ -232,7 +229,7 @@ func downloadFieldseekerSchema(ctx context.Context, fieldseekerClient *fieldseek return } defer output.Close() - schema, err := fieldseekerClient.SchemaRaw(ctx, layer.ID) + schema, err := fieldseekerClient.SchemaRaw(ctx, uint(i)) if err != nil { log.Error().Err(err).Msg("Failed to get schema") return @@ -282,7 +279,7 @@ func generateCodeVerifier() string { } // Find out what we can about this user -func updateArcgisUserData(ctx context.Context, user *models.User, oauth *models.OauthToken) { +func updateArcgisUserData(ctx context.Context, user *models.User, oauth *models.ArcgisOauthToken) { client, err := arcgis.NewArcGISAuth( ctx, &arcgis.AuthenticatorOAuth{ @@ -297,20 +294,32 @@ func updateArcgisUserData(ctx context.Context, user *models.User, oauth *models. return } - portal, err := updatePortalData(ctx, client, user.ID) + txn, err := db.PGInstance.BobDB.BeginTx(ctx, nil) + if err != nil { + log.Error().Err(err).Msg("Create transaction") + return + } + defer txn.Rollback(ctx) + + account, ag_user, err := updateArcgisAccount(ctx, txn, client, user) if err != nil { log.Error().Err(err).Msg("Failed to get portal data") return } - _, err = models.OauthTokens.Update( - //um.SetCol(string(models.OauthTokens.Columns.ArcgisID)).ToArg(portal.User.ID), - //um.SetCol(string(models.OauthTokens.Columns.ArcgisLicenseTypeID)).ToArg(portal.User.UserLicenseTypeID), - um.SetCol("arcgis_id").ToArg(portal.User.ID), - um.SetCol("arcgis_license_type_id").ToArg(portal.User.UserLicenseTypeID), - um.Where(models.OauthTokens.Columns.RefreshToken.EQ(psql.Arg(oauth.RefreshToken))), + err = updateServiceData(ctx, txn, client, user, account) + if err != nil { + log.Error().Err(err).Msg("Failed to get service data") + return + } + + _, err = models.ArcgisOauthTokens.Update( + //um.SetCol(string(models.ArcgisOauthTokens.Columns.ArcgisID)).ToArg(portal.User.ID), + //um.SetCol(string(models.ArcgisOauthTokens.Columns.ArcgisLicenseTypeID)).ToArg(portal.User.UserLicenseTypeID), + um.SetCol("arcgis_id").ToArg(ag_user.ID), + um.SetCol("arcgis_license_type_id").ToArg(ag_user.UserLicenseTypeID), + um.Where(models.ArcgisOauthTokens.Columns.RefreshToken.EQ(psql.Arg(oauth.RefreshToken))), ).Exec(ctx, db.PGInstance.BobDB) - //_, err = sql.UpdateOauthTokenOrg(portal.User.ID, portal.User.UserLicenseTypeID, refresh_token).Exec(ctx, db.PGInstance.BobDB) if err != nil { log.Error().Err(err).Msg("Failed to update oauth token portal data") return @@ -318,11 +327,11 @@ func updateArcgisUserData(ctx context.Context, user *models.User, oauth *models. // At this point we have the arcgis ID. If the ID matches an existing ID, join it with the users organization orgs, err := models.Organizations.Query( sm.Where( - psql.Quote("arcgis_id").EQ(psql.Arg(portal.User.OrgID)), + psql.Quote("arcgis_account_id").EQ(psql.Arg(ag_user.OrgID)), ), ).All(ctx, db.PGInstance.BobDB) if err != nil { - log.Error().Err(err).Str("arcgis_id", portal.User.OrgID).Msg("Failed to search for organization with arcgis id") + log.Error().Err(err).Str("arcgis_id", ag_user.OrgID).Msg("Failed to search for organization with arcgis id") return } var org *models.Organization @@ -330,14 +339,13 @@ func updateArcgisUserData(ctx context.Context, user *models.User, oauth *models. case 0: org = user.R.Organization err = org.Update(ctx, db.PGInstance.BobDB, &models.OrganizationSetter{ - ArcgisID: omitnull.From(portal.User.OrgID), - ArcgisName: omitnull.From(portal.Name), + ArcgisAccountID: omitnull.From(ag_user.OrgID), }) if err != nil { log.Error().Err(err).Int32("id", user.R.Organization.ID).Msg("Failed to update organization's arcgis info") return } - log.Info().Int32("org_id", org.ID).Str("arcgis_id", portal.User.OrgID).Msg("Updated org arcgis ID") + log.Info().Int32("org_id", org.ID).Str("arcgis_id", ag_user.OrgID).Msg("Updated org arcgis ID") case 1: org = orgs[0] user.Update(ctx, db.PGInstance.BobDB, &models.UserSetter{ @@ -352,27 +360,34 @@ func updateArcgisUserData(ctx context.Context, user *models.User, oauth *models. fssync, err := fieldseeker.NewFieldSeekerFromAG(ctx, *client) if err != nil { log.Error().Err(err).Msg("Failed to create fieldseeker") + return } log.Info().Str("url", fssync.ServiceFeature.URL.String()).Msg("Found Fieldseeker") + + // Ensure the fieldseeker service is saved on the account + service_account, err := models.ArcgisServiceFeatures.Query( + models.SelectWhere.ArcgisServiceFeatures.URL.EQ(fssync.ServiceFeature.URL.String()), + ).One(ctx, txn) + if err != nil { + log.Error().Err(err).Msg("no fieldseeker service") + return + } setter := models.OrganizationSetter{ - FieldseekerURL: omitnull.From(fssync.ServiceFeature.URL.String()), + FieldseekerServiceFeatureItemID: omitnull.From(service_account.ItemID), } err = org.Update(ctx, db.PGInstance.BobDB, &setter) if err != nil { log.Error().Err(err).Msg("Failed to create new organization") return } - arcgis_id, ok := org.ArcgisID.Get() - if !ok { - log.Error().Int("org.id", int(org.ID)).Msg("Cannot get webhooks - ArcGIS ID is null") - } + txn.Commit(ctx) maybeCreateWebhook(ctx, fssync) - downloadFieldseekerSchema(ctx, fssync, arcgis_id) + downloadFieldseekerSchema(ctx, fssync, account.ID) notification.ClearOauth(ctx, user) newOAuthTokenChannel <- struct{}{} } -func NewFieldSeeker(ctx context.Context, oauth *models.OauthToken) (*fieldseeker.FieldSeeker, error) { +func NewFieldSeeker(ctx context.Context, oauth *models.ArcgisOauthToken) (*fieldseeker.FieldSeeker, error) { row, err := sql.OrgByOauthId(oauth.ID).One(ctx, db.PGInstance.BobDB) if err != nil { return nil, fmt.Errorf("Failed to get org ID: %w", err) @@ -380,10 +395,7 @@ func NewFieldSeeker(ctx context.Context, oauth *models.OauthToken) (*fieldseeker // The URL for fieldseeker should be something like // https://foo.arcgis.com/123abc/arcgis/rest/services/FieldSeekerGIS/FeatureServer // We need to break it up - if row.FieldseekerURL.IsNull() { - return nil, fmt.Errorf("No fieldseeker URL") - } - host, pathParts, err := extractURLParts(row.FieldseekerURL.MustGet()) + host, pathParts, err := extractURLParts(row.FieldseekerURL) if err != nil { return nil, fmt.Errorf("Failed to break up provided url: %v", err) } @@ -409,80 +421,153 @@ func NewFieldSeeker(ctx context.Context, oauth *models.OauthToken) (*fieldseeker return nil, fmt.Errorf("Failed to create ArcGIS client: %w", err) } log.Info().Str("context", context).Str("host", host).Msg("Using base fieldseeker URL") - fssync, err := fieldseeker.NewFieldSeekerFromURL(ctx, *ar, row.FieldseekerURL.MustGet()) + fssync, err := fieldseeker.NewFieldSeekerFromURL(ctx, *ar, row.FieldseekerURL) if err != nil { return nil, fmt.Errorf("Failed to create Fieldseeker client: %w", err) } return fssync, nil } -func updatePortalData(ctx context.Context, client *arcgis.ArcGIS, user_id int32) (*response.Portal, error) { +func updateArcgisAccount(ctx context.Context, txn bob.Tx, client *arcgis.ArcGIS, user *models.User) (*models.ArcgisAccount, *models.ArcgisUser, error) { p, err := client.PortalsSelf(ctx) if err != nil { - return nil, fmt.Errorf("Failed to get ArcGIS user data: %w", err) + return nil, nil, fmt.Errorf("Failed to get ArcGIS user data: %w", err) } - tx, err := db.PGInstance.BobDB.BeginTx(ctx, nil) + // Ensure that an arcgis account exists to attach to + account, err := ensureArcgisAccount(ctx, txn, p, user) + ag_user, err := models.FindArcgisUser(ctx, txn, p.User.ID) if err != nil { - return nil, fmt.Errorf("Failed to create transaction: %w", err) + log.Warn().Err(err).Msg("need arcgis user account?") + if err.Error() == "sql: no rows in result set" { + setter := models.ArcgisUserSetter{ + Access: omit.From(p.Access), + Created: omit.From(time.Unix(p.User.Created, 0)), + Email: omit.From(p.User.Email), + FullName: omit.From(p.User.FullName), + ID: omit.From(p.User.ID), + Level: omit.From(p.User.Level), + OrgID: omit.From(p.User.OrgID), + PublicUserID: omit.From(user.ID), + Region: omit.From(p.Region), + Role: omit.From(p.User.Role), + RoleID: omit.From(p.User.RoleId), + Username: omit.From(p.User.Username), + UserLicenseTypeID: omit.From(p.User.UserLicenseTypeID), + UserType: omit.From(p.User.UserType), + } + ag_user, err = models.ArcgisUsers.Insert(&setter).One(ctx, txn) + if err != nil { + return nil, nil, fmt.Errorf("Failed to add arcgis user data: %w", err) + } + } else { + return nil, nil, fmt.Errorf("Failed to find arcgis user: %w", err) + } } _, err = models.ArcgisUserPrivileges.Delete( dm.Where( models.ArcgisUserPrivileges.Columns.UserID.EQ(psql.Arg(p.User.ID)), ), - ).Exec(ctx, tx) + ).Exec(ctx, txn) if err != nil { - tx.Rollback(ctx) - return nil, fmt.Errorf("Failed to delete previous user privilege data: %w", err) + return nil, nil, fmt.Errorf("Failed to delete previous user privilege data: %w", err) } - _, err = models.ArcgisUsers.Delete( - dm.Where( - models.ArcgisUsers.Columns.ID.EQ(psql.Arg(p.User.ID)), - ), - ).Exec(ctx, tx) - - if err != nil { - tx.Rollback(ctx) - return nil, fmt.Errorf("Failed to delete previous user data: %w", err) - } - - setter := models.ArcgisUserSetter{ - Access: omit.From(p.Access), - Created: omit.From(time.Unix(p.User.Created, 0)), - Email: omit.From(p.User.Email), - FullName: omit.From(p.User.FullName), - ID: omit.From(p.User.ID), - Level: omit.From(p.User.Level), - OrgID: omit.From(p.User.OrgID), - PublicUserID: omit.From(user_id), - Region: omit.From(p.Region), - Role: omit.From(p.User.Role), - RoleID: omit.From(p.User.RoleId), - Username: omit.From(p.User.Username), - UserLicenseTypeID: omit.From(p.User.UserLicenseTypeID), - UserType: omit.From(p.User.UserType), - } - _, err = models.ArcgisUsers.Insert(&setter).One(ctx, tx) - if err != nil { - tx.Rollback(ctx) - return nil, fmt.Errorf("Failed to add arcgis user data: %w", err) - } for _, priv := range p.User.Privileges { s := models.ArcgisUserPrivilegeSetter{ Privilege: omit.From(priv), UserID: omit.From(p.User.ID), } - _, err := models.ArcgisUserPrivileges.Insert(&s).One(ctx, tx) + _, err := models.ArcgisUserPrivileges.Insert(&s).One(ctx, txn) if err != nil { - tx.Rollback(ctx) - return nil, fmt.Errorf("Failed to add arcgis user privilege data: %w", err) + return nil, nil, fmt.Errorf("Failed to add arcgis user privilege data: %w", err) } } - err = tx.Commit(ctx) log.Info().Str("username", p.User.Username).Str("user_id", p.User.ID).Str("org_id", p.User.OrgID).Str("org_name", p.Name).Str("license_type_id", p.User.UserLicenseTypeID).Msg("Updated portals data") - return p, nil + return account, ag_user, nil +} +func updateServiceData(ctx context.Context, txn bob.Tx, client *arcgis.ArcGIS, user *models.User, account *models.ArcgisAccount) error { + service_maps, err := client.MapServices(ctx) + if err != nil { + return fmt.Errorf("list map services: %w", err) + } + for _, sm := range service_maps { + log.Info().Str("account-id", account.ID).Str("arcgis-id", sm.ID).Str("name", sm.Name).Str("title", sm.Title).Str("url", sm.URL.String()).Msg("inserting map service") + setter := models.ArcgisServiceMapSetter{ + AccountID: omit.From(account.ID), + ArcgisID: omit.From(sm.ID), + Name: omit.From(sm.Name), + Title: omit.From(sm.Title), + URL: omit.From(sm.URL.String()), + } + _, err := models.ArcgisServiceMaps.Insert(&setter).One(ctx, txn) + if err != nil { + return fmt.Errorf("save map service: %w", err) + } + /* + + // Created this after (maybe mistakenly) marking the above as: + // TODO: No idea why this isn't working, but it's mixing up the inputs + _, err = psql.Insert( + im.Into("arcgis.service_map", "account_id", "arcgis_id", "name", "title", "url"), + im.Values( + psql.Arg(account.ID), + psql.Arg(sm.ID), + psql.Arg(sm.Name), + psql.Arg(sm.Title), + psql.Arg(sm.URL.String()), + ), + ).Exec(ctx, txn) + */ + } + + services, err := client.Services(ctx) + for _, service := range services { + err := ensureServiceFeature(ctx, txn, client, user, account, service) + if err != nil { + return fmt.Errorf("ensure service feature: %w", err) + } + } + return nil +} +func ensureServiceFeature(ctx context.Context, txn bob.Tx, client *arcgis.ArcGIS, user *models.User, account *models.ArcgisAccount, service *arcgis.ServiceFeature) error { + _, err := models.ArcgisServiceFeatures.Query( + models.SelectWhere.ArcgisServiceFeatures.URL.EQ(service.URL.String()), + ).One(ctx, txn) + if err == nil { + return nil + } + if err.Error() != "sql: no rows in result set" { + return err + } + metadata, err := service.PopulateMetadata(ctx) + if err != nil { + return fmt.Errorf("populate metadata: %w", err) + } + + /* + TODO: figure out how to do this without raw insert + setter := models.ArcgisServiceFeatureSetter{ + AccountID: omitnull.From(account.ID), + Extent: omit.From(metadata.FullExtent), + ItemID: omit.From(metadata.ServiceItemId), + SpatialReference: omit.From(metadata.SpatialReference.LatestWKID), + URL: omit.From(service.URL.String()), + } + _, err = models.ArcgisServiceFeatures.Insert(&setter).One(ctx, txn) + */ + _, err = psql.Insert( + im.Into("arcgis.service_feature", "account_id", "extent", "item_id", "spatial_reference", "url"), + im.Values( + psql.Arg(account.ID), + psql.Raw("Box2D(ST_GeomFromText('LINESTRING(1 2, 3 4, 5 6)'))"), + psql.Arg(metadata.ServiceItemId), + psql.Arg(metadata.SpatialReference.LatestWKID), + psql.Arg(service.URL.String()), + ), + ).Exec(ctx, txn) + return err } func maybeCreateWebhook(ctx context.Context, client *fieldseeker.FieldSeeker) { @@ -550,7 +635,10 @@ func exportFieldseekerData(ctx context.Context, fssync *fieldseeker.FieldSeeker, pool := pond.NewResultPool[SyncStats](20) group := pool.NewGroup() var ss SyncStats - layers := fssync.Layers() + layers, err := fssync.Layers(ctx) + if err != nil { + return fmt.Errorf("get layers: %w", err) + } for _, l := range layers { ss, err = exportFieldseekerLayer(ctx, group, org, fssync, l) if err != nil { @@ -619,10 +707,10 @@ func logPermissions(ctx context.Context, fssync *fieldseeker.FieldSeeker) { } } -func maintainOAuth(ctx context.Context, oauth *models.OauthToken) error { +func maintainOAuth(ctx context.Context, oauth *models.ArcgisOauthToken) error { for { // Refresh from the database - oauth, err := models.FindOauthToken(ctx, db.PGInstance.BobDB, oauth.ID) + oauth, err := models.FindArcgisOauthToken(ctx, db.PGInstance.BobDB, oauth.ID) if err != nil { return fmt.Errorf("Failed to update oauth token from database: %w", err) } @@ -662,8 +750,8 @@ func maintainOAuth(ctx context.Context, oauth *models.OauthToken) error { // Mark that a given oauth token has failed. This includes a notification to // the user. -func markTokenFailed(ctx context.Context, oauth *models.OauthToken) { - oauthSetter := models.OauthTokenSetter{ +func markTokenFailed(ctx context.Context, oauth *models.ArcgisOauthToken) { + oauthSetter := models.ArcgisOauthTokenSetter{ InvalidatedAt: omitnull.From(time.Now()), } err := oauth.Update(ctx, db.PGInstance.BobDB, &oauthSetter) @@ -680,26 +768,18 @@ func markTokenFailed(ctx context.Context, oauth *models.OauthToken) { } // Update the access token to keep it fresh and alive -func refreshAccessToken(ctx context.Context, oauth *models.OauthToken) error { - baseURL := "https://www.arcgis.com/sharing/rest/oauth2/token/" - +func refreshAccessToken(ctx context.Context, oauth *models.ArcgisOauthToken) error { form := url.Values{ "grant_type": []string{"refresh_token"}, "client_id": []string{config.ClientID}, "refresh_token": []string{oauth.RefreshToken}, } - - req, err := http.NewRequest("POST", baseURL, strings.NewReader(form.Encode())) - if err != nil { - return fmt.Errorf("Failed to create request: %w", err) - } - req.Header.Add("Content-Type", "application/x-www-form-urlencoded") - token, err := handleTokenRequest(ctx, req) + token, err := doTokenRequest(ctx, form) if err != nil { return fmt.Errorf("Failed to handle request: %w", err) } accessExpires := futureUTCTimestamp(token.ExpiresIn) - setter := models.OauthTokenSetter{ + setter := models.ArcgisOauthTokenSetter{ AccessToken: omit.From(token.AccessToken), AccessTokenExpires: omit.From(accessExpires), Username: omit.From(token.Username), @@ -713,27 +793,20 @@ func refreshAccessToken(ctx context.Context, oauth *models.OauthToken) error { } // Update the refresh token to keep it fresh and alive -func refreshRefreshToken(ctx context.Context, oauth *models.OauthToken) error { - baseURL := "https://www.arcgis.com/sharing/rest/oauth2/token/" +func refreshRefreshToken(ctx context.Context, oauth *models.ArcgisOauthToken) error { form := url.Values{ "grant_type": []string{"exchange_refresh_token"}, - "client_id": []string{config.ClientID}, "redirect_uri": []string{config.ArcGISOauthRedirectURL()}, "refresh_token": []string{oauth.RefreshToken}, } - req, err := http.NewRequest("POST", baseURL, strings.NewReader(form.Encode())) - if err != nil { - return fmt.Errorf("Failed to create request: %w", err) - } - req.Header.Add("Content-Type", "application/x-www-form-urlencoded") - token, err := handleTokenRequest(ctx, req) + token, err := doTokenRequest(ctx, form) if err != nil { return fmt.Errorf("Failed to handle request: %w", err) } refreshExpires := futureUTCTimestamp(token.ExpiresIn) - setter := models.OauthTokenSetter{ + setter := models.ArcgisOauthTokenSetter{ RefreshToken: omit.From(token.RefreshToken), RefreshTokenExpires: omit.From(refreshExpires), Username: omit.From(token.Username), @@ -751,7 +824,15 @@ func newTimestampedFilename(prefix, suffix string) string { return prefix + timestamp + suffix } -func handleTokenRequest(ctx context.Context, req *http.Request) (*OAuthTokenResponse, error) { +func doTokenRequest(ctx context.Context, form url.Values) (*OAuthTokenResponse, error) { + form.Set("client_id", config.ClientID) + + baseURL := "https://www.arcgis.com/sharing/rest/oauth2/token/" + req, err := http.NewRequest("POST", baseURL, strings.NewReader(form.Encode())) + if err != nil { + return nil, fmt.Errorf("Failed to create request: %w", err) + } + req.Header.Add("Content-Type", "application/x-www-form-urlencoded") client := http.Client{} log.Info().Str("url", req.URL.String()).Msg("POST") resp, err := client.Do(req) @@ -849,18 +930,24 @@ func saveRawQuery(fssync fieldseeker.FieldSeeker, layer arcgis.LayerFeature, que } */ -func saveOrUpdateDBRecords(ctx context.Context, table string, qr *arcgis.QueryResult, org_id int32) (int, int, error) { +func saveOrUpdateDBRecords(ctx context.Context, table string, qr *response.QueryResult, org_id int32) (int, int, error) { inserts, updates := 0, 0 sorted_columns := make([]string, 0, len(qr.Fields)) for _, f := range qr.Fields { - sorted_columns = append(sorted_columns, f.Name) + sorted_columns = append(sorted_columns, *f.Name) } sort.Strings(sorted_columns) objectids := make([]int, 0) for _, l := range qr.Features { - oid := l.Attributes["OBJECTID"].(float64) - objectids = append(objectids, int(oid)) + attr := l.Attributes["OBJECTID"] + attr_s := attr.String() + oid, err := strconv.Atoi(attr_s) + if err != nil { + log.Warn().Str("attr_s", attr_s).Msg("failed to convert") + continue + } + objectids = append(objectids, oid) } rows_by_objectid, err := rowmapViaQuery(ctx, table, sorted_columns, objectids) @@ -870,8 +957,14 @@ func saveOrUpdateDBRecords(ctx context.Context, table string, qr *arcgis.QueryRe // log.Println("Rows from query", len(rows_by_objectid)) for _, feature := range qr.Features { - oid := feature.Attributes["OBJECTID"].(float64) - row := rows_by_objectid[int(oid)] + attr := feature.Attributes["OBJECTID"] + attr_s := attr.String() + oid, err := strconv.Atoi(attr_s) + if err != nil { + log.Warn().Str("attr_s", attr_s).Msg("failed to convert") + continue + } + row := rows_by_objectid[oid] // If we have no matching row we'll need to create it if len(row) == 0 { @@ -951,33 +1044,31 @@ func rowmapViaQuery(ctx context.Context, table string, sorted_columns []string, return result, nil } -func insertRowFromFeature(ctx context.Context, table string, sorted_columns []string, feature *arcgis.Feature, org_id int32) error { - var options pgx.TxOptions - transaction, err := db.PGInstance.PGXPool.BeginTx(ctx, options) +func insertRowFromFeature(ctx context.Context, table string, sorted_columns []string, feature *response.Feature, org_id int32) error { + txn, err := db.PGInstance.BobDB.BeginTx(ctx, nil) if err != nil { return fmt.Errorf("Unable to start transaction") } + defer txn.Rollback(ctx) - err = insertRowFromFeatureFS(ctx, transaction, table, sorted_columns, feature, org_id) + err = insertRowFromFeatureFS(ctx, txn, table, sorted_columns, feature, org_id) if err != nil { - transaction.Rollback(ctx) return fmt.Errorf("Unable to insert FS: %w", err) } - err = insertRowFromFeatureHistory(ctx, transaction, table, sorted_columns, feature, org_id, 1) + err = insertRowFromFeatureHistory(ctx, txn, table, sorted_columns, feature, org_id, 1) if err != nil { - transaction.Rollback(ctx) return fmt.Errorf("Failed to insert history: %w", err) } - err = transaction.Commit(ctx) + txn.Commit(ctx) if err != nil { return fmt.Errorf("Failed to commit transaction: %w", err) } return nil } -func insertRowFromFeatureFS(ctx context.Context, transaction pgx.Tx, table string, sorted_columns []string, feature *arcgis.Feature, org_id int32) error { +func insertRowFromFeatureFS(ctx context.Context, txn bob.Tx, table string, sorted_columns []string, feature *response.Feature, org_id int32) error { // Create the query to produce the main row var sb strings.Builder sb.WriteString("INSERT INTO ") @@ -1008,105 +1099,106 @@ func insertRowFromFeatureFS(ctx context.Context, transaction pgx.Tx, table strin args["organization_id"] = org_id args["updated"] = time.Now() - _, err := transaction.Exec(ctx, sb.String(), args) + _, err := txn.ExecContext(ctx, sb.String(), args) if err != nil { return fmt.Errorf("Failed to insert row into %s: %w", table, err) } return nil } -func hasUpdates(row map[string]string, feature arcgis.Feature) bool { - for key, value := range feature.Attributes { - rowdata := row[strings.ToLower(key)] - // We'll accept any 'nil' as represented by the empty string in the database - if value == nil { - if rowdata == "" { - continue - } else if len(rowdata) > 0 { - return true - } else { - log.Error().Msg("Looks like our original value is nil, but our row value is something non-empty with a zero length. Need a programmer to look into this.") - } - } - // check strings first, their simplest - if featureAsString, ok := value.(string); ok { - if featureAsString != rowdata { - return true - } - continue - } else if featureAsInt, ok := value.(int); ok { - // Previously had a nil value, now we have a real value - if rowdata == "" { - return true - } - rowAsInt, err := strconv.Atoi(rowdata) - if err != nil { - log.Error().Msg(fmt.Sprintf("Failed to convert '%s' to an int to compare against %v for %v", rowdata, featureAsInt, key)) - } - if rowAsInt != featureAsInt { - return true - } else { - continue - } - } else if featureAsFloat, ok := value.(float64); ok { - // Previously had a nil value, now we have a real value - if rowdata == "" { - return true - } - rowAsFloat, err := strconv.ParseFloat(rowdata, 64) - if err != nil { - log.Error().Msg(fmt.Sprintf("Failed to convert '%s' to a float64 to compare against %v for %v", rowdata, featureAsFloat, key)) - } - if rowAsFloat != featureAsFloat { - return true - } else { - continue - } - } - log.Error().Str("key", key).Str("rowdata", rowdata).Msg("we've hit a point where we can't tell if we have an update or not, need a programmer to look at the above") - } +func hasUpdates(row map[string]string, feature response.Feature) bool { return false + /* + for key, value := range feature.Attributes { + rowdata := row[strings.ToLower(key)] + // We'll accept any 'nil' as represented by the empty string in the database + if value == nil { + if rowdata == "" { + continue + } else if len(rowdata) > 0 { + return true + } else { + log.Error().Msg("Looks like our original value is nil, but our row value is something non-empty with a zero length. Need a programmer to look into this.") + } + } + // check strings first, their simplest + if featureAsString, ok := value.(response.TextValue); ok { + if featureAsString.String() != rowdata { + return true + } + continue + } else if featureAsInt, ok := value.(response.Int32Value); ok { + // Previously had a nil value, now we have a real value + if rowdata == "" { + return true + } + rowAsInt, err := strconv.Atoi(rowdata) + if err != nil { + log.Error().Msg(fmt.Sprintf("Failed to convert '%s' to an int to compare against %v for %v", rowdata, featureAsInt, key)) + } + if rowAsInt != featureAsInt.V { + return true + } else { + continue + } + } else if featureAsFloat, ok := value.(Float64Value); ok { + // Previously had a nil value, now we have a real value + if rowdata == "" { + return true + } + rowAsFloat, err := strconv.ParseFloat(rowdata, 64) + if err != nil { + log.Error().Msg(fmt.Sprintf("Failed to convert '%s' to a float64 to compare against %v for %v", rowdata, featureAsFloat, key)) + } + if rowAsFloat != featureAsFloat { + return true + } else { + continue + } + } + log.Error().Str("key", key).Str("rowdata", rowdata).Msg("we've hit a point where we can't tell if we have an update or not, need a programmer to look at the above") + } + return false + */ } -func updateRowFromFeature(ctx context.Context, table string, sorted_columns []string, feature *arcgis.Feature, org_id int32) error { - // Get the current highest version for the row in question - history_table := toHistoryTable(table) - var sb strings.Builder - sb.WriteString("SELECT MAX(version) FROM ") - sb.WriteString(history_table) - sb.WriteString(" WHERE OBJECTID=@objectid") - - args := pgx.NamedArgs{} - o := feature.Attributes["OBJECTID"].(float64) - args["objectid"] = int(o) - - var version int - if err := db.PGInstance.PGXPool.QueryRow(ctx, sb.String(), args).Scan(&version); err != nil { - return fmt.Errorf("Failed to query for version: %w", err) - } - - var options pgx.TxOptions - transaction, err := db.PGInstance.PGXPool.BeginTx(ctx, options) - if err != nil { - return fmt.Errorf("Unable to start transaction") - } - - err = insertRowFromFeatureHistory(ctx, transaction, table, sorted_columns, feature, org_id, version+1) - if err != nil { - transaction.Rollback(ctx) - return fmt.Errorf("Failed to insert history: %w", err) - } - err = updateRowFromFeatureFS(ctx, transaction, table, sorted_columns, feature) - if err != nil { - transaction.Rollback(ctx) - return fmt.Errorf("Failed to update row from feature: %w", err) - } - - err = transaction.Commit(ctx) - if err != nil { - return fmt.Errorf("Failed to commit transaction: %w", err) - } +func updateRowFromFeature(ctx context.Context, table string, sorted_columns []string, feature *response.Feature, org_id int32) error { return nil + /* + // Get the current highest version for the row in question + history_table := toHistoryTable(table) + var sb strings.Builder + sb.WriteString("SELECT MAX(version) FROM ") + sb.WriteString(history_table) + sb.WriteString(" WHERE OBJECTID=@objectid") + + args := pgx.NamedArgs{} + o := feature.Attributes["OBJECTID"].(float64) + args["objectid"] = int(o) + + var version int + if err := db.PGInstance.PGXPool.QueryRow(ctx, sb.String(), args).Scan(&version); err != nil { + return fmt.Errorf("Failed to query for version: %w", err) + } + + txn, err := db.PGInstance.BobDB.BeginTx(ctx, nil) + if err != nil { + return fmt.Errorf("Unable to start transaction") + } + defer txn.Rollback(ctx) + + err = insertRowFromFeatureHistory(ctx, txn, table, sorted_columns, feature, org_id, version+1) + if err != nil { + return fmt.Errorf("Failed to insert history: %w", err) + } + err = updateRowFromFeatureFS(ctx, txn, table, sorted_columns, feature) + if err != nil { + return fmt.Errorf("Failed to update row from feature: %w", err) + } + + txn.Commit(ctx) + return nil + */ } -func insertRowFromFeatureHistory(ctx context.Context, transaction pgx.Tx, table string, sorted_columns []string, feature *arcgis.Feature, org_id int32, version int) error { +func insertRowFromFeatureHistory(ctx context.Context, transaction bob.Tx, table string, sorted_columns []string, feature *response.Feature, org_id int32, version int) error { history_table := toHistoryTable(table) var sb strings.Builder sb.WriteString("INSERT INTO ") @@ -1133,7 +1225,7 @@ func insertRowFromFeatureHistory(ctx context.Context, transaction pgx.Tx, table args["created"] = time.Now() args["organization_id"] = org_id args["version"] = version - if _, err := transaction.Exec(ctx, sb.String(), args); err != nil { + if _, err := transaction.ExecContext(ctx, sb.String(), args); err != nil { return fmt.Errorf("Failed to insert history row into %s: %w", table, err) } return nil @@ -1149,7 +1241,7 @@ func toHistoryTable(table string) string { return "History_" + table[3:len(table)] } -func updateRowFromFeatureFS(ctx context.Context, transaction pgx.Tx, table string, sorted_columns []string, feature *arcgis.Feature) error { +func updateRowFromFeatureFS(ctx context.Context, transaction bob.Tx, table string, sorted_columns []string, feature *response.Feature) error { // Create the query to produce the main row var sb strings.Builder sb.WriteString("UPDATE ") @@ -1177,7 +1269,7 @@ func updateRowFromFeatureFS(ctx context.Context, transaction pgx.Tx, table strin //args["geometry_y"] = feature.Geometry.Y args["updated"] = time.Now() - _, err := transaction.Exec(ctx, sb.String(), args) + _, err := transaction.ExecContext(ctx, sb.String(), args) if err != nil { return fmt.Errorf("Failed to update row into %s: %w", table, err) } @@ -1186,490 +1278,519 @@ func updateRowFromFeatureFS(ctx context.Context, transaction pgx.Tx, table strin func exportFieldseekerLayer(ctx context.Context, group pond.ResultTaskGroup[SyncStats], org *models.Organization, fssync *fieldseeker.FieldSeeker, layer response.Layer) (SyncStats, error) { var stats SyncStats - count, err := fssync.QueryCount(ctx, layer.ID) - if err != nil { - return stats, fmt.Errorf("Failed to get counts for layer %s (%d): %w", layer.Name, layer.ID, err) - } - if count.Count == 0 { - log.Info().Str("name", layer.Name).Uint("layer_id", layer.ID).Int32("org_id", org.ID).Msg("No records to download") - return stats, nil - } - max_records, err := fssync.MaxRecordCount(ctx) - if err != nil { - return stats, fmt.Errorf("Failed to get max records: %w", err) - } - l, err := fieldseeker.NameToLayerType(layer.Name) - if err != nil { - return stats, fmt.Errorf("Failed to get layer for '%s': %w", layer.Name, err) - } - log.Info().Str("name", layer.Name).Uint("layer_id", layer.ID).Int32("org_id", org.ID).Int("count", count.Count).Uint("iterations", uint(count.Count)/uint(max_records)).Msg("Queuing jobs for layer") - for offset := uint(0); offset < uint(count.Count); offset += uint(max_records) { - group.SubmitErr(func() (SyncStats, error) { - var ss SyncStats - var name string - var inserts, unchanged, updates uint - var err error - switch l { - case fieldseeker.LayerAerialSpraySession: - name = "AerialSpraySession" - rows, err := fssync.AerialSpraySession(ctx, offset) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) - } - inserts, updates, err = db.SaveOrUpdateAerialSpraySession(ctx, org, rows) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) - } - unchanged = uint(len(rows)) - inserts - updates - case fieldseeker.LayerAerialSprayLine: - name = "LayerAerialSprayLine" - rows, err := fssync.AerialSprayLine(ctx, offset) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) - } - inserts, updates, err = db.SaveOrUpdateAerialSprayLine(ctx, org, rows) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) - } - unchanged = uint(len(rows)) - inserts - updates - case fieldseeker.LayerBarrierSpray: - name = "LayerBarrierSpray" - rows, err := fssync.BarrierSpray(ctx, offset) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) - } - inserts, updates, err = db.SaveOrUpdateBarrierSpray(ctx, org, rows) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) - } - unchanged = uint(len(rows)) - inserts - updates - case fieldseeker.LayerBarrierSprayRoute: - name = "LayerBarrierSprayRoute" - rows, err := fssync.BarrierSprayRoute(ctx, offset) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) - } - inserts, updates, err = db.SaveOrUpdateBarrierSprayRoute(ctx, org, rows) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) - } - unchanged = uint(len(rows)) - inserts - updates - case fieldseeker.LayerContainerRelate: - name = "LayerContainerRelate" - rows, err := fssync.ContainerRelate(ctx, offset) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) - } - inserts, updates, err = db.SaveOrUpdateContainerRelate(ctx, org, rows) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) - } - unchanged = uint(len(rows)) - inserts - updates - case fieldseeker.LayerFieldScoutingLog: - name = "LayerFieldScoutingLog" - rows, err := fssync.FieldScoutingLog(ctx, offset) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) - } - inserts, updates, err = db.SaveOrUpdateFieldScoutingLog(ctx, org, rows) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) - } - unchanged = uint(len(rows)) - inserts - updates - case fieldseeker.LayerHabitatRelate: - name = "LayerHabitatRelate" - rows, err := fssync.HabitatRelate(ctx, offset) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) - } - inserts, updates, err = db.SaveOrUpdateHabitatRelate(ctx, org, rows) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) - } - unchanged = uint(len(rows)) - inserts - updates - case fieldseeker.LayerInspectionSample: - name = "LayerInspectionSample" - rows, err := fssync.InspectionSample(ctx, offset) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) - } - inserts, updates, err = db.SaveOrUpdateInspectionSample(ctx, org, rows) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) - } - unchanged = uint(len(rows)) - inserts - updates - case fieldseeker.LayerInspectionSampleDetail: - name = "LayerInspectionSampleDetail" - rows, err := fssync.InspectionSampleDetail(ctx, offset) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) - } - inserts, updates, err = db.SaveOrUpdateInspectionSampleDetail(ctx, org, rows) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) - } - unchanged = uint(len(rows)) - inserts - updates - case fieldseeker.LayerLandingCount: - name = "LayerLandingCount" - rows, err := fssync.LandingCount(ctx, offset) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) - } - inserts, updates, err = db.SaveOrUpdateLandingCount(ctx, org, rows) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) - } - unchanged = uint(len(rows)) - inserts - updates - case fieldseeker.LayerLandingCountLocation: - name = "LayerLandingCountLocation" - rows, err := fssync.LandingCountLocation(ctx, offset) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) - } - inserts, updates, err = db.SaveOrUpdateLandingCountLocation(ctx, org, rows) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) - } - unchanged = uint(len(rows)) - inserts - updates - case fieldseeker.LayerLineLocation: - name = "LayerLineLocation" - rows, err := fssync.LineLocation(ctx, offset) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) - } - inserts, updates, err = db.SaveOrUpdateLineLocation(ctx, org, rows) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) - } - unchanged = uint(len(rows)) - inserts - updates - case fieldseeker.LayerLocationTracking: - name = "LayerLocationTracking" - rows, err := fssync.LocationTracking(ctx, offset) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) - } - inserts, updates, err = db.SaveOrUpdateLocationTracking(ctx, org, rows) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) - } - unchanged = uint(len(rows)) - inserts - updates - case fieldseeker.LayerMosquitoInspection: - name = "LayerMosquitoInspection" - rows, err := fssync.MosquitoInspection(ctx, offset) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) - } - inserts, updates, err = db.SaveOrUpdateMosquitoInspection(ctx, org, rows) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) - } - unchanged = uint(len(rows)) - inserts - updates - case fieldseeker.LayerOfflineMapAreas: - name = "LayerOfflineMapAreas" - rows, err := fssync.OfflineMapAreas(ctx, offset) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) - } - inserts, updates, err = db.SaveOrUpdateOfflineMapAreas(ctx, org, rows) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) - } - unchanged = uint(len(rows)) - inserts - updates - case fieldseeker.LayerProposedTreatmentArea: - name = "LayerProposedTreatmentArea" - rows, err := fssync.ProposedTreatmentArea(ctx, offset) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) - } - inserts, updates, err = db.SaveOrUpdateProposedTreatmentArea(ctx, org, rows) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) - } - unchanged = uint(len(rows)) - inserts - updates - case fieldseeker.LayerPointLocation: - name = "LayerPointLocation" - rows, err := fssync.PointLocation(ctx, offset) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) - } - inserts, updates, err = db.SaveOrUpdatePointLocation(ctx, org, rows) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) - } - unchanged = uint(len(rows)) - inserts - updates - case fieldseeker.LayerPolygonLocation: - name = "LayerPolygonLocation" - rows, err := fssync.PolygonLocation(ctx, offset) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) - } - inserts, updates, err = db.SaveOrUpdatePolygonLocation(ctx, org, rows) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) - } - unchanged = uint(len(rows)) - inserts - updates - case fieldseeker.LayerPoolDetail: - name = "LayerPoolDetail" - rows, err := fssync.PoolDetail(ctx, offset) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) - } - inserts, updates, err = db.SaveOrUpdatePoolDetail(ctx, org, rows) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) - } - unchanged = uint(len(rows)) - inserts - updates - case fieldseeker.LayerPool: - name = "LayerPool" - rows, err := fssync.Pool(ctx, offset) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) - } - inserts, updates, err = db.SaveOrUpdatePool(ctx, org, rows) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) - } - unchanged = uint(len(rows)) - inserts - updates - case fieldseeker.LayerPoolBuffer: - name = "LayerPoolBuffer" - rows, err := fssync.PoolBuffer(ctx, offset) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) - } - inserts, updates, err = db.SaveOrUpdatePoolBuffer(ctx, org, rows) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) - } - unchanged = uint(len(rows)) - inserts - updates - case fieldseeker.LayerQALarvCount: - name = "LayerQALarvCount" - rows, err := fssync.QALarvCount(ctx, offset) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) - } - inserts, updates, err = db.SaveOrUpdateQALarvCount(ctx, org, rows) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) - } - unchanged = uint(len(rows)) - inserts - updates - case fieldseeker.LayerQAMosquitoInspection: - name = "LayerQAMosquitoInspection" - rows, err := fssync.QAMosquitoInspection(ctx, offset) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) - } - inserts, updates, err = db.SaveOrUpdateQAMosquitoInspection(ctx, org, rows) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) - } - unchanged = uint(len(rows)) - inserts - updates - case fieldseeker.LayerQAProductObservation: - name = "LayerQAProductObservation" - rows, err := fssync.QAProductObservation(ctx, offset) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) - } - inserts, updates, err = db.SaveOrUpdateQAProductObservation(ctx, org, rows) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) - } - unchanged = uint(len(rows)) - inserts - updates - case fieldseeker.LayerRestrictedArea: - name = "LayerRestrictedArea" - rows, err := fssync.RestrictedArea(ctx, offset) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) - } - inserts, updates, err = db.SaveOrUpdateRestrictedArea(ctx, org, rows) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) - } - unchanged = uint(len(rows)) - inserts - updates - case fieldseeker.LayerRodentInspection: - name = "LayerRodentInspection" - rows, err := fssync.RodentInspection(ctx, offset) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) - } - inserts, updates, err = db.SaveOrUpdateRodentInspection(ctx, org, rows) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) - } - unchanged = uint(len(rows)) - inserts - updates - case fieldseeker.LayerRodentLocation: - name = "LayerRodentLocation" - rows, err := fssync.RodentLocation(ctx, offset) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) - } - inserts, updates, err = db.SaveOrUpdateRodentLocation(ctx, org, rows) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) - } - unchanged = uint(len(rows)) - inserts - updates - case fieldseeker.LayerSampleCollection: - name = "LayerSampleCollection" - rows, err := fssync.SampleCollection(ctx, offset) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) - } - inserts, updates, err = db.SaveOrUpdateSampleCollection(ctx, org, rows) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) - } - unchanged = uint(len(rows)) - inserts - updates - case fieldseeker.LayerSampleLocation: - name = "LayerSampleLocation" - rows, err := fssync.SampleLocation(ctx, offset) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) - } - inserts, updates, err = db.SaveOrUpdateSampleLocation(ctx, org, rows) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) - } - unchanged = uint(len(rows)) - inserts - updates - case fieldseeker.LayerServiceRequest: - name = "LayerServiceRequest" - rows, err := fssync.ServiceRequest(ctx, offset) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) - } - inserts, updates, err = db.SaveOrUpdateServiceRequest(ctx, org, rows) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) - } - unchanged = uint(len(rows)) - inserts - updates - case fieldseeker.LayerSpeciesAbundance: - name = "LayerSpeciesAbundance" - rows, err := fssync.SpeciesAbundance(ctx, offset) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) - } - inserts, updates, err = db.SaveOrUpdateSpeciesAbundance(ctx, org, rows) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) - } - unchanged = uint(len(rows)) - inserts - updates - case fieldseeker.LayerStormDrain: - name = "LayerStormDrain" - rows, err := fssync.StormDrain(ctx, offset) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) - } - inserts, updates, err = db.SaveOrUpdateStormDrain(ctx, org, rows) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) - } - unchanged = uint(len(rows)) - inserts - updates - case fieldseeker.LayerTracklog: - name = "LayerTracklog" - rows, err := fssync.Tracklog(ctx, offset) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) - } - inserts, updates, err = db.SaveOrUpdateTracklog(ctx, org, rows) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) - } - unchanged = uint(len(rows)) - inserts - updates - case fieldseeker.LayerTrapLocation: - name = "LayerTrapLocation" - rows, err := fssync.TrapLocation(ctx, offset) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) - } - inserts, updates, err = db.SaveOrUpdateTrapLocation(ctx, org, rows) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) - } - unchanged = uint(len(rows)) - inserts - updates - case fieldseeker.LayerTrapData: - name = "LayerTrapData" - rows, err := fssync.TrapData(ctx, offset) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) - } - inserts, updates, err = db.SaveOrUpdateTrapData(ctx, org, rows) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) - } - unchanged = uint(len(rows)) - inserts - updates - case fieldseeker.LayerTimeCard: - name = "LayerTimeCard" - rows, err := fssync.TimeCard(ctx, offset) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) - } - inserts, updates, err = db.SaveOrUpdateTimeCard(ctx, org, rows) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) - } - unchanged = uint(len(rows)) - inserts - updates - case fieldseeker.LayerTreatment: - name = "LayerTreatment" - rows, err := fssync.Treatment(ctx, offset) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) - } - inserts, updates, err = db.SaveOrUpdateTreatment(ctx, org, rows) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) - } - unchanged = uint(len(rows)) - inserts - updates - case fieldseeker.LayerTreatmentArea: - name = "LayerTreatmentArea" - rows, err := fssync.TreatmentArea(ctx, offset) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) - } - inserts, updates, err = db.SaveOrUpdateTreatmentArea(ctx, org, rows) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) - } - unchanged = uint(len(rows)) - inserts - updates - case fieldseeker.LayerULVSprayRoute: - name = "LayerULVSprayRoute" - rows, err := fssync.ULVSprayRoute(ctx, offset) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) - } - inserts, updates, err = db.SaveOrUpdateULVSprayRoute(ctx, org, rows) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) - } - unchanged = uint(len(rows)) - inserts - updates - case fieldseeker.LayerZones: - name = "LayerZones" - rows, err := fssync.Zones(ctx, offset) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) - } - inserts, updates, err = db.SaveOrUpdateZones(ctx, org, rows) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) - } - unchanged = uint(len(rows)) - inserts - updates - case fieldseeker.LayerZones2: - name = "LayerZones2" - rows, err := fssync.Zones2(ctx, offset) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) - } - inserts, updates, err = db.SaveOrUpdateZones2(ctx, org, rows) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) - } - unchanged = uint(len(rows)) - inserts - updates - default: - return ss, errors.New("Unrecognized layer") - } - ss.Inserts = inserts - ss.Updates = updates - ss.Unchanged = unchanged - return ss, err - }) - } - //log.Info().Uint("inserts", stats.Inserts).Uint("updates", stats.Updates).Uint("no change", stats.Unchanged).Str("layer", layer.Name).Msg("Finished layer") return stats, nil + /* + count, err := fssync.QueryCount(ctx, layer.ID) + if err != nil { + return stats, fmt.Errorf("Failed to get counts for layer %s (%d): %w", layer.Name, layer.ID, err) + } + if count.Count == 0 { + log.Info().Str("name", layer.Name).Uint("layer_id", layer.ID).Int32("org_id", org.ID).Msg("No records to download") + return stats, nil + } + max_records, err := fssync.MaxRecordCount(ctx) + if err != nil { + return stats, fmt.Errorf("Failed to get max records: %w", err) + } + l, err := fieldseeker.NameToLayerType(layer.Name) + if err != nil { + return stats, fmt.Errorf("Failed to get layer for '%s': %w", layer.Name, err) + } + log.Info().Str("name", layer.Name).Uint("layer_id", layer.ID).Int32("org_id", org.ID).Int("count", count.Count).Uint("iterations", uint(count.Count)/uint(max_records)).Msg("Queuing jobs for layer") + for offset := uint(0); offset < uint(count.Count); offset += uint(max_records) { + group.SubmitErr(func() (SyncStats, error) { + var ss SyncStats + var name string + var inserts, unchanged, updates uint + var err error + switch l { + case fieldseeker.LayerAerialSpraySession: + name = "AerialSpraySession" + rows, err := fssync.AerialSpraySession(ctx, offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdateAerialSpraySession(ctx, org, rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + case fieldseeker.LayerAerialSprayLine: + name = "LayerAerialSprayLine" + rows, err := fssync.AerialSprayLine(ctx, offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdateAerialSprayLine(ctx, org, rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + case fieldseeker.LayerBarrierSpray: + name = "LayerBarrierSpray" + rows, err := fssync.BarrierSpray(ctx, offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdateBarrierSpray(ctx, org, rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + case fieldseeker.LayerBarrierSprayRoute: + name = "LayerBarrierSprayRoute" + rows, err := fssync.BarrierSprayRoute(ctx, offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdateBarrierSprayRoute(ctx, org, rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + case fieldseeker.LayerContainerRelate: + name = "LayerContainerRelate" + rows, err := fssync.ContainerRelate(ctx, offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdateContainerRelate(ctx, org, rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + case fieldseeker.LayerFieldScoutingLog: + name = "LayerFieldScoutingLog" + rows, err := fssync.FieldScoutingLog(ctx, offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdateFieldScoutingLog(ctx, org, rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + case fieldseeker.LayerHabitatRelate: + name = "LayerHabitatRelate" + rows, err := fssync.HabitatRelate(ctx, offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdateHabitatRelate(ctx, org, rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + case fieldseeker.LayerInspectionSample: + name = "LayerInspectionSample" + rows, err := fssync.InspectionSample(ctx, offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdateInspectionSample(ctx, org, rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + case fieldseeker.LayerInspectionSampleDetail: + name = "LayerInspectionSampleDetail" + rows, err := fssync.InspectionSampleDetail(ctx, offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdateInspectionSampleDetail(ctx, org, rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + case fieldseeker.LayerLandingCount: + name = "LayerLandingCount" + rows, err := fssync.LandingCount(ctx, offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdateLandingCount(ctx, org, rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + case fieldseeker.LayerLandingCountLocation: + name = "LayerLandingCountLocation" + rows, err := fssync.LandingCountLocation(ctx, offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdateLandingCountLocation(ctx, org, rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + case fieldseeker.LayerLineLocation: + name = "LayerLineLocation" + rows, err := fssync.LineLocation(ctx, offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdateLineLocation(ctx, org, rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + case fieldseeker.LayerLocationTracking: + name = "LayerLocationTracking" + rows, err := fssync.LocationTracking(ctx, offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdateLocationTracking(ctx, org, rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + case fieldseeker.LayerMosquitoInspection: + name = "LayerMosquitoInspection" + rows, err := fssync.MosquitoInspection(ctx, offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdateMosquitoInspection(ctx, org, rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + case fieldseeker.LayerOfflineMapAreas: + name = "LayerOfflineMapAreas" + rows, err := fssync.OfflineMapAreas(ctx, offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdateOfflineMapAreas(ctx, org, rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + case fieldseeker.LayerProposedTreatmentArea: + name = "LayerProposedTreatmentArea" + rows, err := fssync.ProposedTreatmentArea(ctx, offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdateProposedTreatmentArea(ctx, org, rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + case fieldseeker.LayerPointLocation: + name = "LayerPointLocation" + rows, err := fssync.PointLocation(ctx, offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdatePointLocation(ctx, org, rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + case fieldseeker.LayerPolygonLocation: + name = "LayerPolygonLocation" + rows, err := fssync.PolygonLocation(ctx, offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdatePolygonLocation(ctx, org, rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + case fieldseeker.LayerPoolDetail: + name = "LayerPoolDetail" + rows, err := fssync.PoolDetail(ctx, offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdatePoolDetail(ctx, org, rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + case fieldseeker.LayerPool: + name = "LayerPool" + rows, err := fssync.Pool(ctx, offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdatePool(ctx, org, rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + case fieldseeker.LayerPoolBuffer: + name = "LayerPoolBuffer" + rows, err := fssync.PoolBuffer(ctx, offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdatePoolBuffer(ctx, org, rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + case fieldseeker.LayerQALarvCount: + name = "LayerQALarvCount" + rows, err := fssync.QALarvCount(ctx, offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdateQALarvCount(ctx, org, rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + case fieldseeker.LayerQAMosquitoInspection: + name = "LayerQAMosquitoInspection" + rows, err := fssync.QAMosquitoInspection(ctx, offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdateQAMosquitoInspection(ctx, org, rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + case fieldseeker.LayerQAProductObservation: + name = "LayerQAProductObservation" + rows, err := fssync.QAProductObservation(ctx, offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdateQAProductObservation(ctx, org, rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + case fieldseeker.LayerRestrictedArea: + name = "LayerRestrictedArea" + rows, err := fssync.RestrictedArea(ctx, offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdateRestrictedArea(ctx, org, rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + case fieldseeker.LayerRodentInspection: + name = "LayerRodentInspection" + rows, err := fssync.RodentInspection(ctx, offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdateRodentInspection(ctx, org, rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + case fieldseeker.LayerRodentLocation: + name = "LayerRodentLocation" + rows, err := fssync.RodentLocation(ctx, offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdateRodentLocation(ctx, org, rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + case fieldseeker.LayerSampleCollection: + name = "LayerSampleCollection" + rows, err := fssync.SampleCollection(ctx, offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdateSampleCollection(ctx, org, rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + case fieldseeker.LayerSampleLocation: + name = "LayerSampleLocation" + rows, err := fssync.SampleLocation(ctx, offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdateSampleLocation(ctx, org, rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + case fieldseeker.LayerServiceRequest: + name = "LayerServiceRequest" + rows, err := fssync.ServiceRequest(ctx, offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdateServiceRequest(ctx, org, rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + case fieldseeker.LayerSpeciesAbundance: + name = "LayerSpeciesAbundance" + rows, err := fssync.SpeciesAbundance(ctx, offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdateSpeciesAbundance(ctx, org, rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + case fieldseeker.LayerStormDrain: + name = "LayerStormDrain" + rows, err := fssync.StormDrain(ctx, offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdateStormDrain(ctx, org, rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + case fieldseeker.LayerTracklog: + name = "LayerTracklog" + rows, err := fssync.Tracklog(ctx, offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdateTracklog(ctx, org, rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + case fieldseeker.LayerTrapLocation: + name = "LayerTrapLocation" + rows, err := fssync.TrapLocation(ctx, offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdateTrapLocation(ctx, org, rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + case fieldseeker.LayerTrapData: + name = "LayerTrapData" + rows, err := fssync.TrapData(ctx, offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdateTrapData(ctx, org, rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + case fieldseeker.LayerTimeCard: + name = "LayerTimeCard" + rows, err := fssync.TimeCard(ctx, offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdateTimeCard(ctx, org, rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + case fieldseeker.LayerTreatment: + name = "LayerTreatment" + rows, err := fssync.Treatment(ctx, offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdateTreatment(ctx, org, rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + case fieldseeker.LayerTreatmentArea: + name = "LayerTreatmentArea" + rows, err := fssync.TreatmentArea(ctx, offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdateTreatmentArea(ctx, org, rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + case fieldseeker.LayerULVSprayRoute: + name = "LayerULVSprayRoute" + rows, err := fssync.ULVSprayRoute(ctx, offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdateULVSprayRoute(ctx, org, rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + case fieldseeker.LayerZones: + name = "LayerZones" + rows, err := fssync.Zones(ctx, offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdateZones(ctx, org, rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + case fieldseeker.LayerZones2: + name = "LayerZones2" + rows, err := fssync.Zones2(ctx, offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdateZones2(ctx, org, rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + default: + return ss, errors.New("Unrecognized layer") + } + ss.Inserts = inserts + ss.Updates = updates + ss.Unchanged = unchanged + return ss, err + }) + } + //log.Info().Uint("inserts", stats.Inserts).Uint("updates", stats.Updates).Uint("no change", stats.Unchanged).Str("layer", layer.Name).Msg("Finished layer") + return stats, nil + */ +} + +func ensureArcgisAccount(ctx context.Context, txn bob.Tx, portal *response.Portal, user *models.User) (*models.ArcgisAccount, error) { + account, err := models.FindArcgisAccount(ctx, txn, portal.User.OrgID) + if err != nil { + log.Warn().Err(err).Msg("need arcgis account?") + if err.Error() == "sql: no rows in result set" { + setter := models.ArcgisAccountSetter{ + ID: omit.From(portal.User.OrgID), + Name: omit.From(portal.Name), + OrganizationID: omit.From(user.OrganizationID), + URLFeatures: omitnull.FromPtr[string](nil), + URLInsights: omitnull.FromPtr[string](nil), + URLGeometry: omitnull.FromPtr[string](nil), + URLNotebooks: omitnull.FromPtr[string](nil), + URLTiles: omitnull.FromPtr[string](nil), + } + account, err = models.ArcgisAccounts.Insert(&setter).One(ctx, txn) + if err != nil { + return nil, fmt.Errorf("create arcgis account: %w", err) + } + } else { + return nil, fmt.Errorf("find arcgis account: %w", err) + } + } + return account, nil } diff --git a/db/fieldseeker.go b/db/fieldseeker.go index 32a173d8..3395e56c 100644 --- a/db/fieldseeker.go +++ b/db/fieldseeker.go @@ -1,5 +1,6 @@ package db +/* import ( "context" "fmt" @@ -19,43 +20,35 @@ import ( func SaveOrUpdateAerialSpraySession(ctx context.Context, org *models.Organization, fs []*fslayer.AerialSpraySession) (inserts uint, updates uint, err error) { log.Warn().Msg("Ignoring AerialSpraySession data") return 0, 0, nil - /* return doUpdatesViaFunction(ctx, org, fs, "AerialSpraySession", "fieldseeker.insert_aerialspraysession", func(row *fslayer.AerialSpraySession) ([]SqlParam, error) { return []SqlParam{ //Uint("p_objectid", row.ObjectID), }, nil }) - */ } func SaveOrUpdateAerialSprayLine(ctx context.Context, org *models.Organization, fs []*fslayer.AerialSprayLine) (inserts uint, updates uint, err error) { log.Warn().Msg("Ignoring AerialSprayLine data") return 0, 0, nil - /* return doUpdatesViaFunction(ctx, org, fs, "AerialSprayLine", "fieldseeker.insert_aerialsprayline", func(row *fslayer.AerialSprayLine) ([]SqlParam, error) { return []SqlParam{ }, nil }) - */ } func SaveOrUpdateBarrierSpray(ctx context.Context, org *models.Organization, fs []*fslayer.BarrierSpray) (inserts uint, updates uint, err error) { log.Warn().Msg("Ignoring BarrierSpray data") return 0, 0, nil - /* return doUpdatesViaFunction(ctx, org, fs, "BarrierSpray", "fieldseeker.insert_barrierspray", func(row *fslayer.BarrierSpray) ([]SqlParam, error) { return []SqlParam{ }, nil }) - */ } func SaveOrUpdateBarrierSprayRoute(ctx context.Context, org *models.Organization, fs []*fslayer.BarrierSprayRoute) (inserts uint, updates uint, err error) { log.Warn().Msg("Ignoring BarrierSprayRoute data") return 0, 0, nil - /* return doUpdatesViaFunction(ctx, org, fs, "BarrierSprayRoute", "fieldseeker.insert_barriersprayroute", func(row *fslayer.BarrierSprayRoute) ([]SqlParam, error) { return []SqlParam{ }, nil }) - */ } func SaveOrUpdateContainerRelate(ctx context.Context, org *models.Organization, fs []*fslayer.ContainerRelate) (inserts uint, updates uint, err error) { return doUpdatesViaFunction(ctx, org, fs, "ContainerRelate", "fieldseeker.insert_containerrelate", func(row *fslayer.ContainerRelate) ([]SqlParam, error) { @@ -205,24 +198,20 @@ func SaveOrUpdateInspectionSampleDetail(ctx context.Context, org *models.Organiz func SaveOrUpdateLandingCount(ctx context.Context, org *models.Organization, fs []*fslayer.LandingCount) (inserts uint, updates uint, err error) { log.Warn().Msg("Ignoring LandingCount data") return 0, 0, nil - /* return doUpdatesViaFunction(ctx, org, fs, "LandingCount", "fieldseeker.insert_landingcount", func(row *fslayer.LandingCount) ([]SqlParam, error) { return []SqlParam{ Uint("p_objectid", row.ObjectID), }, nil }) - */ } func SaveOrUpdateLandingCountLocation(ctx context.Context, org *models.Organization, fs []*fslayer.LandingCountLocation) (inserts uint, updates uint, err error) { log.Warn().Msg("Ignoring LandingCountLocation data") return 0, 0, nil - /* return doUpdatesViaFunction(ctx, org, fs, "LandingCountLocation", "fieldseeker.insert_landingcountlocation", func(row *fslayer.LandingCountLocation) ([]SqlParam, error) { return []SqlParam{ Uint("p_objectid", row.ObjectID), }, nil }) - */ } func SaveOrUpdateLineLocation(ctx context.Context, org *models.Organization, fs []*fslayer.LineLocation) (inserts uint, updates uint, err error) { return doUpdatesViaFunction(ctx, org, fs, "LineLocation", "fieldseeker.insert_linelocation", func(row *fslayer.LineLocation) ([]SqlParam, error) { @@ -385,13 +374,11 @@ func SaveOrUpdateMosquitoInspection(ctx context.Context, org *models.Organizatio func SaveOrUpdateOfflineMapAreas(ctx context.Context, org *models.Organization, fs []*fslayer.OfflineMapAreas) (inserts uint, updates uint, err error) { log.Warn().Msg("Ignoring OfflineMapAreas data") return 0, 0, nil - /* return doUpdatesViaFunction(ctx, org, fs, "OfflineMapAreas", "fieldseeker.insert_offlinemapareas", func(row *fslayer.OfflineMapAreas) ([]SqlParam, error) { return []SqlParam{ Uint("p_objectid", row.ObjectID), }, nil }) - */ } func SaveOrUpdateProposedTreatmentArea(ctx context.Context, org *models.Organization, fs []*fslayer.ProposedTreatmentArea) (inserts uint, updates uint, err error) { return doUpdatesViaFunction(ctx, org, fs, "ProposedTreatmentArea", "fieldseeker.insert_proposedtreatmentarea", func(row *fslayer.ProposedTreatmentArea) ([]SqlParam, error) { @@ -632,24 +619,20 @@ func SaveOrUpdatePool(ctx context.Context, org *models.Organization, fs []*fslay func SaveOrUpdatePoolBuffer(ctx context.Context, org *models.Organization, fs []*fslayer.PoolBuffer) (inserts uint, updates uint, err error) { log.Warn().Msg("Ignoring PoolBuffer data") return 0, 0, nil - /* return doUpdatesViaFunction(ctx, org, fs, "PoolBuffer", "fieldseeker.insert_poolbuffer", func(row *fslayer.PoolBuffer) ([]SqlParam, error) { return []SqlParam{ Uint("p_objectid", row.ObjectID), }, nil }) - */ } func SaveOrUpdateQALarvCount(ctx context.Context, org *models.Organization, fs []*fslayer.QALarvCount) (inserts uint, updates uint, err error) { log.Warn().Msg("Ignoring QALarvCount data") return 0, 0, nil - /* return doUpdatesViaFunction(ctx, org, fs, "QALarvCount", "fieldseeker.insert_qalarvcount", func(row *fslayer.QALarvCount) ([]SqlParam, error) { return []SqlParam{ Uint("p_objectid", row.ObjectID), }, nil }) - */ } func SaveOrUpdateQAMosquitoInspection(ctx context.Context, org *models.Organization, fs []*fslayer.QAMosquitoInspection) (inserts uint, updates uint, err error) { return doUpdatesViaFunction(ctx, org, fs, "QAMosquitoInspection", "fieldseeker.insert_qamosquitoinspection", func(row *fslayer.QAMosquitoInspection) ([]SqlParam, error) { @@ -730,24 +713,20 @@ func SaveOrUpdateQAMosquitoInspection(ctx context.Context, org *models.Organizat func SaveOrUpdateQAProductObservation(ctx context.Context, org *models.Organization, fs []*fslayer.QAProductObservation) (inserts uint, updates uint, err error) { log.Warn().Msg("Ignoring QAProductObservation data") return 0, 0, nil - /* return doUpdatesViaFunction(ctx, org, fs, "QAProductObservation", "fieldseeker.insert_qaproductobservation", func(row *fslayer.QAProductObservation) ([]SqlParam, error) { return []SqlParam{ Uint("p_objectid", row.ObjectID), }, nil }) - */ } func SaveOrUpdateRestrictedArea(ctx context.Context, org *models.Organization, fs []*fslayer.RestrictedArea) (inserts uint, updates uint, err error) { log.Warn().Msg("Ignoring RestrictedArea data") return 0, 0, nil - /* return doUpdatesViaFunction(ctx, org, fs, "RestrictedArea", "fieldseeker.insert_restrictedarea", func(row *fslayer.RestrictedArea) ([]SqlParam, error) { return []SqlParam{ Uint("p_objectid", row.ObjectID), }, nil }) - */ } func SaveOrUpdateRodentInspection(ctx context.Context, org *models.Organization, fs []*fslayer.RodentInspection) (inserts uint, updates uint, err error) { log.Warn().Msg("Ignoring RodentInspection data") @@ -758,7 +737,6 @@ func SaveOrUpdateRodentInspection(ctx context.Context, org *models.Organization, Uint("p_objectid", row.ObjectID), }, nil }) - */ } func SaveOrUpdateRodentLocation(ctx context.Context, org *models.Organization, fs []*fslayer.RodentLocation) (inserts uint, updates uint, err error) { return doUpdatesViaFunction(ctx, org, fs, "RodentLocation", "fieldseeker.insert_rodentlocation", func(row *fslayer.RodentLocation) ([]SqlParam, error) { @@ -1076,13 +1054,11 @@ func SaveOrUpdateStormDrain(ctx context.Context, org *models.Organization, fs [] func SaveOrUpdateTracklog(ctx context.Context, org *models.Organization, fs []*fslayer.Tracklog) (inserts uint, updates uint, err error) { log.Warn().Msg("Ignoring RodentInspection data") return 0, 0, nil - /* return doUpdatesViaFunction(ctx, org, fs, "Tracklog", "fieldseeker.insert_tracklog", func(row *fslayer.Tracklog) ([]SqlParam, error) { return []SqlParam{ Uint("p_objectid", row.ObjectID), }, nil }) - */ } func SaveOrUpdateTrapLocation(ctx context.Context, org *models.Organization, fs []*fslayer.TrapLocation) (inserts uint, updates uint, err error) { return doUpdatesViaFunction(ctx, org, fs, "TrapLocation", "fieldseeker.insert_traplocation", func(row *fslayer.TrapLocation) ([]SqlParam, error) { @@ -1328,13 +1304,11 @@ func SaveOrUpdateTreatmentArea(ctx context.Context, org *models.Organization, fs func SaveOrUpdateULVSprayRoute(ctx context.Context, org *models.Organization, fs []*fslayer.ULVSprayRoute) (inserts uint, updates uint, err error) { log.Warn().Msg("Ignoring RodentInspection data") return 0, 0, nil - /* return doUpdatesViaFunction(ctx, org, fs, "ULVSprayRoute", "fieldseeker.insert_ulvsprayroute", func(row *fslayer.ULVSprayRoute) ([]SqlParam, error) { return []SqlParam{ Uint("p_objectid", row.ObjectID), }, nil }) - */ } func SaveOrUpdateZones(ctx context.Context, org *models.Organization, fs []*fslayer.Zones) (inserts uint, updates uint, err error) { return doUpdatesViaFunction(ctx, org, fs, "Zones", "fieldseeker.insert_zones", func(row *fslayer.Zones) ([]SqlParam, error) { @@ -1442,3 +1416,4 @@ func toUUID(u googleuuid.UUID) omitnull.Val[uuid.UUID] { func toObjectID(o uint) omit.Val[int64] { return omit.From[int64](int64(o)) } +*/ diff --git a/db/prepared.go b/db/prepared.go index f66277ba..33b34758 100644 --- a/db/prepared.go +++ b/db/prepared.go @@ -10,6 +10,7 @@ import ( "time" fslayer "github.com/Gleipnir-Technology/arcgis-go/fieldseeker/layer" + "github.com/Gleipnir-Technology/arcgis-go/response" "github.com/Gleipnir-Technology/bob" "github.com/Gleipnir-Technology/bob/dialect/psql" "github.com/google/uuid" @@ -81,48 +82,49 @@ func TestPreparedQueryOld(ctx context.Context) error { return nil } func TestPreparedQuery(ctx context.Context, row *fslayer.RodentLocation) error { - q := queryStoredProcedure("fieldseeker.insert_rodentlocation", - Uint("p_objectid", row.ObjectID), - String("p_locationname", row.LocationName), - String("p_zone", row.Zone), - String("p_zone2", row.Zone2), - String("p_habitat", row.Habitat), - String("p_priority", row.Priority), - String("p_usetype", row.Usetype), - Int16("p_active", row.Active), - String("p_description", row.Description), - String("p_accessdesc", row.Accessdesc), - String("p_comments", row.Comments), - String("p_symbology", row.Symbology), - String("p_externalid", row.ExternalID), - Timestamp("p_nextactiondatescheduled", row.Nextactiondatescheduled), - Int32("p_locationnumber", row.Locationnumber), - Timestamp("p_lastinspectdate", row.LastInspectionDate), - String("p_lastinspectspecies", row.LastInspectionSpecies), - String("p_lastinspectaction", row.LastInspectionAction), - String("p_lastinspectconditions", row.LastInspectionConditions), - String("p_lastinspectrodentevidence", row.LastInspectionRodentEvidence), - UUID("p_globalid", row.GlobalID), - String("p_created_user", row.CreatedUser), - Timestamp("p_created_date", row.CreatedDate), - String("p_last_edited_user", row.LastEditedUser), - Timestamp("p_last_edited_date", row.LastEditedDate), - Timestamp("p_creationdate", row.CreationDate), - String("p_creator", row.Creator), - Timestamp("p_editdate", row.EditDate), - String("p_editor", row.Editor), - String("p_jurisdiction", row.Jurisdiction), - ) - query := psql.RawQuery(q) - log.Info().Str("query", q).Msg("querying") - result, err := bob.One[InsertResultRow](ctx, PGInstance.BobDB, query, scan.StructMapper[InsertResultRow]()) - if err != nil { - return fmt.Errorf("Failed to execute test function: %w", err) - } - //log.Info().Int("version", result.NextVersion).Msg("got result") - //log.Info().Bool("added", result.Row.Added).Int("version", result.Row.Version).Msg("done") - log.Info().Bool("inserted", result.Inserted).Int("version", result.Version).Msg("done") - + /* + q := queryStoredProcedure("fieldseeker.insert_rodentlocation", + Uint("p_objectid", row.ObjectID), + String("p_locationname", row.LocationName), + String("p_zone", row.Zone), + String("p_zone2", row.Zone2), + String("p_habitat", row.Habitat), + String("p_priority", row.Priority), + String("p_usetype", row.Usetype), + Int16("p_active", row.Active), + String("p_description", row.Description), + String("p_accessdesc", row.Accessdesc), + String("p_comments", row.Comments), + String("p_symbology", row.Symbology), + String("p_externalid", row.ExternalID), + Timestamp("p_nextactiondatescheduled", row.Nextactiondatescheduled), + Int32("p_locationnumber", row.Locationnumber), + Timestamp("p_lastinspectdate", row.LastInspectionDate), + String("p_lastinspectspecies", row.LastInspectionSpecies), + String("p_lastinspectaction", row.LastInspectionAction), + String("p_lastinspectconditions", row.LastInspectionConditions), + String("p_lastinspectrodentevidence", row.LastInspectionRodentEvidence), + UUID("p_globalid", row.GlobalID), + String("p_created_user", row.CreatedUser), + Timestamp("p_created_date", row.CreatedDate), + String("p_last_edited_user", row.LastEditedUser), + Timestamp("p_last_edited_date", row.LastEditedDate), + Timestamp("p_creationdate", row.CreationDate), + String("p_creator", row.Creator), + Timestamp("p_editdate", row.EditDate), + String("p_editor", row.Editor), + String("p_jurisdiction", row.Jurisdiction), + ) + query := psql.RawQuery(q) + log.Info().Str("query", q).Msg("querying") + result, err := bob.One[InsertResultRow](ctx, PGInstance.BobDB, query, scan.StructMapper[InsertResultRow]()) + if err != nil { + return fmt.Errorf("Failed to execute test function: %w", err) + } + //log.Info().Int("version", result.NextVersion).Msg("got result") + //log.Info().Bool("added", result.Row.Added).Int("version", result.Row.Version).Msg("done") + log.Info().Bool("inserted", result.Inserted).Int("version", result.Version).Msg("done") + */ return nil } @@ -511,16 +513,22 @@ func lineOrNull(msg json.RawMessage) (SqlParam, error) { return GISLine("p_geospatial", geo, 3857), nil } -func pointOrNull(msg json.RawMessage) (SqlParam, error) { - // Surprisingly some geos are actually empty - if len(msg) == 0 { - return NullParam{"p_geospatial"}, nil +func pointOrNull(geo response.Geometry) (SqlParam, error) { + switch geo.Type() { + case "esriGeometryPoint": + p, ok := geo.(response.Point) + if !ok { + return nil, fmt.Errorf("point that isn't a point") + } + return GISPoint("p_geospatial", GeometryPoint{ + X: p.X, + Y: p.Y, + }, 3857), nil + default: + log.Warn().Str("type", geo.Type()).Msg("expected point, got something else") + return nil, nil } - geo, err := parsePoint(msg) - if err != nil { - return NullParam{"p_geospatial"}, fmt.Errorf("Failed to pepare GISPoint: %w", err) - } - return GISPoint("p_geospatial", geo, 3857), nil + } func polygonOrNull(msg json.RawMessage) (SqlParam, error) { diff --git a/db/sql/org_by_oauth_id.bob.sql b/db/sql/org_by_oauth_id.bob.sql index 800e0e6e..327bb192 100644 --- a/db/sql/org_by_oauth_id.bob.sql +++ b/db/sql/org_by_oauth_id.bob.sql @@ -2,8 +2,10 @@ -- This file is meant to be re-generated in place and/or deleted at any time. -- OrgByOauthId -SELECT o.id AS organization_id, o.arcgis_id AS arcgis_id, o.fieldseeker_url AS fieldseeker_url -FROM oauth_token ot +SELECT o.id AS organization_id, aa.id AS arcgis_id, asf.url AS fieldseeker_url +FROM arcgis.oauth_token ot JOIN user_ u ON ot.user_id = u.id JOIN organization o ON u.organization_id = o.id +JOIN arcgis.account aa ON aa.id = o.arcgis_account_id +JOIN arcgis.service_feature asf ON asf.item_id = o.fieldseeker_service_feature_item_id WHERE ot.id = $1; diff --git a/db/sql/org_by_oauth_id.sql b/db/sql/org_by_oauth_id.sql index d38ff980..87080135 100644 --- a/db/sql/org_by_oauth_id.sql +++ b/db/sql/org_by_oauth_id.sql @@ -1,6 +1,8 @@ -- OrgByOauthId -SELECT o.id AS organization_id, o.arcgis_id AS arcgis_id, o.fieldseeker_url AS fieldseeker_url -FROM oauth_token ot +SELECT o.id AS organization_id, aa.id AS arcgis_id, asf.url AS fieldseeker_url +FROM arcgis.oauth_token ot JOIN user_ u ON ot.user_id = u.id JOIN organization o ON u.organization_id = o.id +JOIN arcgis.account aa ON aa.id = o.arcgis_account_id +JOIN arcgis.service_feature asf ON asf.item_id = o.fieldseeker_service_feature_item_id WHERE ot.id = $1