From 8e68230f4a9fe1d2a497329ed8e9ae07f9f53d96 Mon Sep 17 00:00:00 2001 From: Eli Ribble Date: Thu, 12 Feb 2026 21:05:51 +0000 Subject: [PATCH] Handle changes to arcgis-go --- background/arcgis.go | 126 ++++++++++++++++++++++++++----------------- main.go | 5 +- 2 files changed, 82 insertions(+), 49 deletions(-) diff --git a/background/arcgis.go b/background/arcgis.go index cab3698c..c96a4e95 100644 --- a/background/arcgis.go +++ b/background/arcgis.go @@ -157,7 +157,7 @@ func refreshFieldseekerData(ctx context.Context, newOauthCh <-chan struct{}) { err := maintainOAuth(workerCtx, oauth) if err != nil { markTokenFailed(ctx, oauth) - if errors.Is(err, arcgis.ErrorInvalidatedRefreshToken) { + if errors.Is(err, arcgis.ErrorInvalidRefreshToken) { log.Info().Int("oauth_token.id", int(oauth.ID)).Msg("Marked invalid by the server") } else { debug.LogErrorTypeInfo(err) @@ -208,7 +208,12 @@ type SyncStats struct { } func downloadFieldseekerSchema(ctx context.Context, fieldseekerClient *fieldseeker.FieldSeeker, arcgis_id string) { - for _, layer := range fieldseekerClient.FeatureServerLayers() { + layers, err := fieldseekerClient.FeatureServerLayers(ctx) + if err != nil { + log.Error().Err(err).Msg("Failed to get fieldseeker layers") + return + } + for _, layer := range layers { err := os.MkdirAll(filepath.Join(config.FieldseekerSchemaDirectory, arcgis_id), os.ModePerm) if err != nil { log.Error().Err(err).Msg("Failed to create parent directory") @@ -220,7 +225,7 @@ func downloadFieldseekerSchema(ctx context.Context, fieldseekerClient *fieldseek return } defer output.Close() - schema, err := fieldseekerClient.Schema(ctx, layer.ID) + schema, err := fieldseekerClient.SchemaRaw(ctx, layer.ID) if err != nil { log.Error().Err(err).Msg("Failed to get schema") return @@ -271,16 +276,20 @@ func generateCodeVerifier() string { // Find out what we can about this user func updateArcgisUserData(ctx context.Context, user *models.User, oauth *models.OauthToken) { - client := arcgis.NewArcGIS( + client, err := arcgis.NewArcGIS( + ctx, + nil, arcgis.AuthenticatorOAuth{ AccessToken: oauth.AccessToken, AccessTokenExpires: oauth.AccessTokenExpires, RefreshToken: oauth.RefreshToken, RefreshTokenExpires: oauth.RefreshTokenExpires, }, - nil, - nil, ) + if err != nil { + log.Error().Err(err).Msg("Failed to create ArcGIS client") + return + } portal, err := updatePortalData(ctx, client, user.ID) if err != nil { @@ -334,12 +343,12 @@ func updateArcgisUserData(ctx context.Context, user *models.User, oauth *models. return } - search, err := client.Search(ctx, "Fieldseeker") + search, err := client.Search(ctx, "FieldseekerGIS") if err != nil { log.Error().Err(err).Msg("Failed to get search FieldseekerGIS data") return } - var fieldseekerClient *fieldseeker.FieldSeeker + var fieldseeker_client *fieldseeker.FieldSeeker for _, result := range search.Results { log.Info().Str("name", result.Name).Msg("Got result") if result.Name == "FieldSeekerGIS" { @@ -352,9 +361,9 @@ func updateArcgisUserData(ctx context.Context, user *models.User, oauth *models. log.Error().Err(err).Msg("Failed to create new organization") return } - fieldseekerClient, err = newFieldSeeker(ctx, oauth) + fieldseeker_client, err = NewFieldSeeker(ctx, oauth) if err != nil { - log.Error().Err(err).Msg("Failed to create fieldseeker client") + log.Error().Err(err).Msg("Failed to create new fieldseeker") return } } @@ -363,13 +372,13 @@ func updateArcgisUserData(ctx context.Context, user *models.User, oauth *models. if !ok { log.Error().Int("org.id", int(org.ID)).Msg("Cannot get webhooks - ArcGIS ID is null") } - maybeCreateWebhook(ctx, fieldseekerClient) - downloadFieldseekerSchema(ctx, fieldseekerClient, arcgis_id) + maybeCreateWebhook(ctx, fieldseeker_client) + downloadFieldseekerSchema(ctx, fieldseeker_client, arcgis_id) notification.ClearOauth(ctx, user) newOAuthTokenChannel <- struct{}{} } -func newFieldSeeker(ctx context.Context, oauth *models.OauthToken) (*fieldseeker.FieldSeeker, error) { +func NewFieldSeeker(ctx context.Context, oauth *models.OauthToken) (*fieldseeker.FieldSeeker, error) { row, err := sql.OrgByOauthId(oauth.ID).One(ctx, db.PGInstance.BobDB) if err != nil { return nil, fmt.Errorf("Failed to get org ID: %w", err) @@ -385,24 +394,35 @@ func newFieldSeeker(ctx context.Context, oauth *models.OauthToken) (*fieldseeker return nil, errors.New("Didn't get enough path parts") } context := pathParts[0] - ar := arcgis.NewArcGIS( + ar, err := arcgis.NewArcGIS( + ctx, + &host, arcgis.AuthenticatorOAuth{ AccessToken: oauth.AccessToken, AccessTokenExpires: oauth.AccessTokenExpires, RefreshToken: oauth.RefreshToken, RefreshTokenExpires: oauth.RefreshTokenExpires, }, - &context, - &host, ) + if err != nil { + return nil, fmt.Errorf("Failed to create ArcGIS client: %w", err) + } log.Info().Str("context", context).Str("host", host).Msg("Using base fieldseeker URL") - //ar.Context = &context - //ar.Host = host - return fieldseeker.NewFieldSeeker( - ctx, - ar, - row.FieldseekerURL.MustGet(), - ) + fssync, err := fieldseeker.NewFieldSeeker(ar, row.FieldseekerURL.MustGet()) + if err != nil { + return nil, fmt.Errorf("Failed to create Fieldseeker client: %w", err) + } + // check that the authentication is valid + _, err = fssync.FeatureServerLayers(ctx) + if err != nil { + if errors.Is(err, arcgis.ErrorInvalidAuthToken) { + return nil, InvalidatedTokenError{} + } else if errors.Is(err, arcgis.ErrorInvalidRefreshToken) { + return nil, InvalidatedTokenError{} + } + return nil, fmt.Errorf("Unrecognized error checking oauth validity: %w", err) + } + return fssync, nil } func updatePortalData(ctx context.Context, client *arcgis.ArcGIS, user_id int32) (*arcgis.PortalsResponse, error) { p, err := client.PortalsSelf(ctx) @@ -484,7 +504,11 @@ func maybeCreateWebhook(ctx context.Context, client *fieldseeker.FieldSeeker) { log.Error().Err(err).Msg("Failed to get webhooks") return } - for _, hook := range webhooks { + if webhooks == nil { + log.Error().Msg("nil webhooks") + return + } + for _, hook := range *webhooks { if hook.Name == "Nidus Sync" { log.Info().Msg("Found nidus sync hook") } else { @@ -505,8 +529,16 @@ func periodicallyExportFieldseeker(ctx context.Context, org *models.Organization if err != nil { return fmt.Errorf("Failed to get oauth for org: %w", err) } - logPermissions(ctx, org, oauth) - err = exportFieldseekerData(ctx, org, oauth) + fssync, err := NewFieldSeeker( + ctx, + oauth, + ) + if err != nil { + return fmt.Errorf("Failed to create fieldseeker client: %w", err) + } + logPermissions(ctx, fssync) + syncStatusByOrg[org.ID] = true + err = exportFieldseekerData(ctx, fssync, org) syncStatusByOrg[org.ID] = false if err != nil { return fmt.Errorf("Failed to export Fieldseeker data: %w", err) @@ -515,20 +547,19 @@ func periodicallyExportFieldseeker(ctx context.Context, org *models.Organization } } } -func exportFieldseekerData(ctx context.Context, org *models.Organization, oauth *models.OauthToken) error { +func exportFieldseekerData(ctx context.Context, fssync *fieldseeker.FieldSeeker, org *models.Organization) error { log.Info().Msg("Update Fieldseeker data") - syncStatusByOrg[org.ID] = true var err error - fssync, err := newFieldSeeker(ctx, oauth) - if err != nil { - return fmt.Errorf("Failed to create fssync: %w", err) - } var stats SyncStats pool := pond.NewResultPool[SyncStats](20) group := pool.NewGroup() var ss SyncStats - for _, l := range fssync.FeatureServerLayers() { + layers, err := fssync.FeatureServerLayers(ctx) + if err != nil { + return fmt.Errorf("Failed to get layers: %w", err) + } + for _, l := range layers { ss, err = exportFieldseekerLayer(ctx, group, org, fssync, l) if err != nil { return err @@ -561,7 +592,7 @@ func exportFieldseekerData(ctx context.Context, org *models.Organization, oauth return nil } -func logPermissions(ctx context.Context, org *models.Organization, oauth *models.OauthToken) { +func logPermissions(ctx context.Context, fssync *fieldseeker.FieldSeeker) { /*row, err := sql.OrgByOauthId(oauth.ID).One(ctx, db.PGInstance.BobDB) if err != nil { log.Error().Err(err).Msg("Failed to get org in log permissions") @@ -573,15 +604,7 @@ func logPermissions(ctx context.Context, org *models.Organization, oauth *models } */ - fssync, err := newFieldSeeker( - ctx, - oauth, - ) - if err != nil { - log.Error().Err(err).Msg("Failed to create fieldseeker client in log permissions") - return - } - _, err = fssync.AdminInfo(ctx) + _, err := fssync.AdminInfo(ctx) if err != nil { if errors.Is(err, arcgis.ErrorNotPermitted) { log.Info().Msg("This oauth token is not allowed to query for admin info") @@ -595,7 +618,11 @@ func logPermissions(ctx context.Context, org *models.Organization, oauth *models log.Error().Err(err).Msg("Failed to query permissions in log permissions") return } - for _, p := range permissions { + if permissions == nil { + log.Error().Msg("nil permissions") + return + } + for _, p := range *permissions { log.Info().Str("p", p.Principal).Msg("Permission!") } } @@ -808,7 +835,7 @@ func saveResponse(data []byte, filename string) { } /* -func saveRawQuery(fssync *fieldseeker.FieldSeeker, layer arcgis.LayerFeature, query *arcgis.Query, filename string) { +func saveRawQuery(fssync fieldseeker.FieldSeeker, layer arcgis.LayerFeature, query *arcgis.Query, filename string) { output, err := os.Create(filename) if err != nil { log.Error().Str("filename", filename).Msg("Failed to create file") @@ -1175,13 +1202,16 @@ func exportFieldseekerLayer(ctx context.Context, group pond.ResultTaskGroup[Sync log.Info().Str("name", layer.Name).Uint("layer_id", layer.ID).Int32("org_id", org.ID).Msg("No records to download") return stats, nil } - maxRecords := uint(fssync.MaxRecordCount()) + max_records, err := fssync.MaxRecordCount(ctx) + if err != nil { + return stats, fmt.Errorf("Failed to get max records: %w", err) + } l, err := fieldseeker.NameToLayerType(layer.Name) if err != nil { return stats, fmt.Errorf("Failed to get layer for '%s': %w", layer.Name, err) } - log.Info().Str("name", layer.Name).Uint("layer_id", layer.ID).Int32("org_id", org.ID).Int("count", count.Count).Uint("iterations", uint(count.Count)/maxRecords).Msg("Queuing jobs for layer") - for offset := uint(0); offset < uint(count.Count); offset += maxRecords { + log.Info().Str("name", layer.Name).Uint("layer_id", layer.ID).Int32("org_id", org.ID).Int("count", count.Count).Uint("iterations", uint(count.Count)/uint(max_records)).Msg("Queuing jobs for layer") + for offset := uint(0); offset < uint(count.Count); offset += uint(max_records) { group.SubmitErr(func() (SyncStats, error) { var ss SyncStats var name string diff --git a/main.go b/main.go index 9c40d266..cf70739a 100644 --- a/main.go +++ b/main.go @@ -11,6 +11,7 @@ import ( "syscall" "time" + "github.com/Gleipnir-Technology/arcgis-go" "github.com/Gleipnir-Technology/nidus-sync/auth" "github.com/Gleipnir-Technology/nidus-sync/background" "github.com/Gleipnir-Technology/nidus-sync/config" @@ -143,7 +144,9 @@ func main() { log.Error().Err(err).Msg("Failed to start openAI client") os.Exit(8) } - background.Start(ctx) + custom_logger := log.With().Logger().Level(zerolog.InfoLevel) + background_ctx := arcgis.WithLogger(ctx, custom_logger) + background.Start(background_ctx) server := &http.Server{ Addr: config.Bind, Handler: r,