diff --git a/background/arcgis.go b/background/arcgis.go index 6a32ec53..cf6836a1 100644 --- a/background/arcgis.go +++ b/background/arcgis.go @@ -39,6 +39,8 @@ import ( "github.com/stephenafamo/bob/dialect/psql" "github.com/stephenafamo/bob/dialect/psql/dialect" "github.com/stephenafamo/bob/dialect/psql/im" + "github.com/stephenafamo/bob/dialect/psql/sm" + "github.com/stephenafamo/bob/dialect/psql/um" "github.com/uber/h3-go/v4" ) @@ -245,18 +247,49 @@ func updateArcgisUserData(ctx context.Context, user *models.User, access_token s } log.Info().Str("Username", portal.User.Username).Str("user_id", portal.User.ID).Str("org_id", portal.User.OrgID).Str("org_name", portal.Name).Str("license_type_id", portal.User.UserLicenseTypeID).Msg("Got portals data") - _, err = sql.UpdateOauthTokenOrg(portal.User.ID, portal.User.UserLicenseTypeID, refresh_token).Exec(ctx, db.PGInstance.BobDB) + _, err = models.OauthTokens.Update( + //um.SetCol(string(models.OauthTokens.Columns.ArcgisID)).ToArg(portal.User.ID), + //um.SetCol(string(models.OauthTokens.Columns.ArcgisLicenseTypeID)).ToArg(portal.User.UserLicenseTypeID), + um.SetCol("arcgis_id").ToArg(portal.User.ID), + um.SetCol("arcgis_license_type_id").ToArg(portal.User.UserLicenseTypeID), + um.Where(models.OauthTokens.Columns.RefreshToken.EQ(psql.Arg(refresh_token))), + ).Exec(ctx, db.PGInstance.BobDB) + //_, err = sql.UpdateOauthTokenOrg(portal.User.ID, portal.User.UserLicenseTypeID, refresh_token).Exec(ctx, db.PGInstance.BobDB) if err != nil { log.Error().Err(err).Msg("Failed to update oauth token portal data") return } - org := user.R.Organization - err = org.Update(ctx, db.PGInstance.BobDB, &models.OrganizationSetter{ - ArcgisID: omitnull.From(portal.User.OrgID), - ArcgisName: omitnull.From(portal.Name), - }) + // At this point we have the arcgis ID. If the ID matches an existing ID, join it with the users organization + orgs, err := models.Organizations.Query( + sm.Where( + psql.Quote("arcgis_id").EQ(psql.Arg(portal.User.OrgID)), + ), + ).All(ctx, db.PGInstance.BobDB) if err != nil { - log.Error().Err(err).Int32("id", user.R.Organization.ID).Msg("Failed to update organization's arcgis info") + log.Error().Err(err).Str("arcgis_id", portal.User.OrgID).Msg("Failed to search for organization with arcgis id") + return + } + var org *models.Organization + switch len(orgs) { + case 0: + org = user.R.Organization + err = org.Update(ctx, db.PGInstance.BobDB, &models.OrganizationSetter{ + ArcgisID: omitnull.From(portal.User.OrgID), + ArcgisName: omitnull.From(portal.Name), + }) + if err != nil { + log.Error().Err(err).Int32("id", user.R.Organization.ID).Msg("Failed to update organization's arcgis info") + return + } + log.Info().Int32("org_id", org.ID).Str("arcgis_id", portal.User.OrgID).Msg("Updated org arcgis ID") + case 1: + org = orgs[0] + user.Update(ctx, db.PGInstance.BobDB, &models.UserSetter{ + OrganizationID: omit.From(org.ID), + }) + log.Info().Int32("org_id", org.ID).Int32("user_id", user.ID).Msg("Moved user into organization") + default: + log.Warn().Int("orgs", len(orgs)).Msg("Got too many orgs, programmer error.") return } @@ -313,67 +346,6 @@ func maybeCreateWebhook(ctx context.Context, client *fieldseeker.FieldSeeker) { } } -func downloadAllRecords(ctx context.Context, fssync *fieldseeker.FieldSeeker, layer arcgis.LayerFeature, org_id int32) (SyncStats, error) { - var stats SyncStats - count, err := fssync.QueryCount(layer.ID) - if err != nil { - return stats, fmt.Errorf("Failed to get counts for layer %s (%d): %w", layer.Name, layer.ID, err) - } - log.Info().Str("name", layer.Name).Uint("id", layer.ID).Msg("Starting on layer") - if count.Count == 0 { - return stats, nil - } - pool := pond.NewResultPool[SyncStats](20) - group := pool.NewGroup() - maxRecords := uint(fssync.MaxRecordCount()) - for offset := uint(0); offset < uint(count.Count); offset += maxRecords { - group.SubmitErr(func() (SyncStats, error) { - /*query := arcgis.NewQuery() - query.ResultRecordCount = maxRecords - query.ResultOffset = offset - query.SpatialReference = "4326" - query.OutFields = "*" - query.Where = "1=1" - qr, err := fssync.DoQuery( - layer.ID, - query) - if err != nil { - return SyncStats{}, fmt.Errorf("Failed to get layer %s (%d) at offset %d: %w", layer.Name, layer.ID, offset, err) - } - - i, u, err := saveOrUpdateDBRecords(ctx, "FS_"+layer.Name, qr, org_id) - if err != nil { - filename := fmt.Sprintf("failure-%s-%d-%d.json", layer.Name, layer.ID, offset) - saveRawQuery(fssync, layer, query, filename) - log.Error().Err(err).Msg("Faild to save DB records") - return SyncStats{}, fmt.Errorf("Failed to save records: %w", err) - } - return SyncStats{ - Inserts: i, - Updates: u, - Unchanged: len(qr.Features) - u - i, - }, nil - */ - return SyncStats{ - Inserts: 0, - Updates: 0, - Unchanged: 0, - }, nil - }) - } - results, err := group.Wait() - if err != nil { - return stats, fmt.Errorf("one or more tasks in the work pool failed: %w", err) - } - for _, r := range results { - stats.Inserts += r.Inserts - stats.Updates += r.Updates - stats.Unchanged += r.Unchanged - } - log.Info().Uint("inserts", stats.Inserts).Uint("updates", stats.Updates).Uint("no change", stats.Unchanged).Msg("Finished layer") - return stats, nil -} - func getOAuthForOrg(ctx context.Context, org *models.Organization) (*models.OauthToken, error) { users, err := org.User().All(ctx, db.PGInstance.BobDB) if err != nil { @@ -402,6 +374,7 @@ func periodicallyExportFieldseeker(ctx context.Context, org *models.Organization if err != nil { return fmt.Errorf("Failed to get oauth for org: %w", err) } + logPermissions(ctx, org, oauth) err = exportFieldseekerData(ctx, org, oauth) syncStatusByOrg[org.ID] = false if err != nil { @@ -436,11 +409,12 @@ func exportFieldseekerData(ctx context.Context, org *models.Organization, oauth return fmt.Errorf("Failed to create fssync: %w", err) } var stats SyncStats - //layers := fssync.FeatureServerLayers() + pool := pond.NewResultPool[SyncStats](20) + group := pool.NewGroup() var ss SyncStats for _, l := range fssync.FeatureServerLayers() { - ss, err = exportFieldseekerLayer(ctx, org, fssync, l) + ss, err = exportFieldseekerLayer(ctx, group, org, fssync, l) if err != nil { return err } @@ -448,6 +422,15 @@ func exportFieldseekerData(ctx context.Context, org *models.Organization, oauth stats.Updates += ss.Updates stats.Unchanged += ss.Unchanged } + results, err := group.Wait() + if err != nil { + return fmt.Errorf("one or more tasks in the work pool failed: %w", err) + } + for _, r := range results { + stats.Inserts += r.Inserts + stats.Updates += r.Updates + stats.Unchanged += r.Unchanged + } setter := models.FieldseekerSyncSetter{ RecordsCreated: omit.From(int32(stats.Inserts)), @@ -463,6 +446,37 @@ func exportFieldseekerData(ctx context.Context, org *models.Organization, oauth return nil } +func logPermissions(ctx context.Context, org *models.Organization, oauth *models.OauthToken) { + ar := arcgis.NewArcGIS( + arcgis.AuthenticatorOAuth{ + AccessToken: oauth.AccessToken, + AccessTokenExpires: oauth.AccessTokenExpires, + RefreshToken: oauth.RefreshToken, + RefreshTokenExpires: oauth.RefreshTokenExpires, + }, + ) + row, err := sql.OrgByOauthId(oauth.ID).One(ctx, db.PGInstance.BobDB) + if err != nil { + log.Error().Err(err).Msg("Failed to get org in log permissions") + return + } + fssync, err := fieldseeker.NewFieldSeeker( + ar, + row.FieldseekerURL.MustGet(), + ) + if err != nil { + log.Error().Err(err).Msg("Failed to create fieldseeker client in log permissions") + return + } + permissions, err := fssync.PermissionList() + if err != nil { + log.Error().Err(err).Msg("Failed to query permissions in log permissions") + return + } + for _, p := range permissions { + log.Info().Str("p", p.Principal).Msg("Permission!") + } +} func maintainOAuth(ctx context.Context, oauth *models.OauthToken) error { for { // Refresh from the database @@ -1086,24 +1100,22 @@ func updateSummaryTables(ctx context.Context, org *models.Organization) { } } -func exportFieldseekerLayer(ctx context.Context, org *models.Organization, fssync *fieldseeker.FieldSeeker, layer arcgis.LayerFeature) (SyncStats, error) { +func exportFieldseekerLayer(ctx context.Context, group pond.ResultTaskGroup[SyncStats], org *models.Organization, fssync *fieldseeker.FieldSeeker, layer arcgis.LayerFeature) (SyncStats, error) { var stats SyncStats count, err := fssync.QueryCount(layer.ID) if err != nil { return stats, fmt.Errorf("Failed to get counts for layer %s (%d): %w", layer.Name, layer.ID, err) } if count.Count == 0 { - log.Info().Str("name", layer.Name).Uint("id", layer.ID).Msg("No records to download") + log.Info().Str("name", layer.Name).Uint("layer_id", layer.ID).Int32("org_id", org.ID).Msg("No records to download") return stats, nil } - log.Info().Str("name", layer.Name).Uint("id", layer.ID).Msg("Starting on layer") - pool := pond.NewResultPool[SyncStats](20) - group := pool.NewGroup() maxRecords := uint(fssync.MaxRecordCount()) l, err := fieldseeker.NameToLayerType(layer.Name) if err != nil { return stats, fmt.Errorf("Failed to get layer for '%s': %w", layer.Name, err) } + log.Info().Str("name", layer.Name).Uint("layer_id", layer.ID).Int32("org_id", org.ID).Int("count", count.Count).Uint("iterations", uint(count.Count)/maxRecords).Msg("Queuing jobs for layer") for offset := uint(0); offset < uint(count.Count); offset += maxRecords { group.SubmitErr(func() (SyncStats, error) { var ss SyncStats @@ -1571,15 +1583,6 @@ func exportFieldseekerLayer(ctx context.Context, org *models.Organization, fssyn return ss, err }) } - results, err := group.Wait() - if err != nil { - return stats, fmt.Errorf("one or more tasks in the work pool failed: %w", err) - } - for _, r := range results { - stats.Inserts += r.Inserts - stats.Updates += r.Updates - stats.Unchanged += r.Unchanged - } - log.Info().Uint("inserts", stats.Inserts).Uint("updates", stats.Updates).Uint("no change", stats.Unchanged).Str("layer", layer.Name).Msg("Finished layer") + //log.Info().Uint("inserts", stats.Inserts).Uint("updates", stats.Updates).Uint("no change", stats.Unchanged).Str("layer", layer.Name).Msg("Finished layer") return stats, nil }