Rework arcgis oauth flow logic
This is several changes after the demo with Ben * When a user adds their oauth and they get an arcgis ID for an organization that exits they are added to that organization. The previous org isn't removed * All layer processing is done in a single large pool. This makes it much faster in aggregate * Some queries are done more directly instead of through custom sql
This commit is contained in:
parent
92294e5b16
commit
248cffd323
1 changed files with 88 additions and 85 deletions
|
|
@ -39,6 +39,8 @@ import (
|
|||
"github.com/stephenafamo/bob/dialect/psql"
|
||||
"github.com/stephenafamo/bob/dialect/psql/dialect"
|
||||
"github.com/stephenafamo/bob/dialect/psql/im"
|
||||
"github.com/stephenafamo/bob/dialect/psql/sm"
|
||||
"github.com/stephenafamo/bob/dialect/psql/um"
|
||||
"github.com/uber/h3-go/v4"
|
||||
)
|
||||
|
||||
|
|
@ -245,18 +247,49 @@ func updateArcgisUserData(ctx context.Context, user *models.User, access_token s
|
|||
}
|
||||
log.Info().Str("Username", portal.User.Username).Str("user_id", portal.User.ID).Str("org_id", portal.User.OrgID).Str("org_name", portal.Name).Str("license_type_id", portal.User.UserLicenseTypeID).Msg("Got portals data")
|
||||
|
||||
_, err = sql.UpdateOauthTokenOrg(portal.User.ID, portal.User.UserLicenseTypeID, refresh_token).Exec(ctx, db.PGInstance.BobDB)
|
||||
_, err = models.OauthTokens.Update(
|
||||
//um.SetCol(string(models.OauthTokens.Columns.ArcgisID)).ToArg(portal.User.ID),
|
||||
//um.SetCol(string(models.OauthTokens.Columns.ArcgisLicenseTypeID)).ToArg(portal.User.UserLicenseTypeID),
|
||||
um.SetCol("arcgis_id").ToArg(portal.User.ID),
|
||||
um.SetCol("arcgis_license_type_id").ToArg(portal.User.UserLicenseTypeID),
|
||||
um.Where(models.OauthTokens.Columns.RefreshToken.EQ(psql.Arg(refresh_token))),
|
||||
).Exec(ctx, db.PGInstance.BobDB)
|
||||
//_, err = sql.UpdateOauthTokenOrg(portal.User.ID, portal.User.UserLicenseTypeID, refresh_token).Exec(ctx, db.PGInstance.BobDB)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("Failed to update oauth token portal data")
|
||||
return
|
||||
}
|
||||
org := user.R.Organization
|
||||
err = org.Update(ctx, db.PGInstance.BobDB, &models.OrganizationSetter{
|
||||
ArcgisID: omitnull.From(portal.User.OrgID),
|
||||
ArcgisName: omitnull.From(portal.Name),
|
||||
})
|
||||
// At this point we have the arcgis ID. If the ID matches an existing ID, join it with the users organization
|
||||
orgs, err := models.Organizations.Query(
|
||||
sm.Where(
|
||||
psql.Quote("arcgis_id").EQ(psql.Arg(portal.User.OrgID)),
|
||||
),
|
||||
).All(ctx, db.PGInstance.BobDB)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Int32("id", user.R.Organization.ID).Msg("Failed to update organization's arcgis info")
|
||||
log.Error().Err(err).Str("arcgis_id", portal.User.OrgID).Msg("Failed to search for organization with arcgis id")
|
||||
return
|
||||
}
|
||||
var org *models.Organization
|
||||
switch len(orgs) {
|
||||
case 0:
|
||||
org = user.R.Organization
|
||||
err = org.Update(ctx, db.PGInstance.BobDB, &models.OrganizationSetter{
|
||||
ArcgisID: omitnull.From(portal.User.OrgID),
|
||||
ArcgisName: omitnull.From(portal.Name),
|
||||
})
|
||||
if err != nil {
|
||||
log.Error().Err(err).Int32("id", user.R.Organization.ID).Msg("Failed to update organization's arcgis info")
|
||||
return
|
||||
}
|
||||
log.Info().Int32("org_id", org.ID).Str("arcgis_id", portal.User.OrgID).Msg("Updated org arcgis ID")
|
||||
case 1:
|
||||
org = orgs[0]
|
||||
user.Update(ctx, db.PGInstance.BobDB, &models.UserSetter{
|
||||
OrganizationID: omit.From(org.ID),
|
||||
})
|
||||
log.Info().Int32("org_id", org.ID).Int32("user_id", user.ID).Msg("Moved user into organization")
|
||||
default:
|
||||
log.Warn().Int("orgs", len(orgs)).Msg("Got too many orgs, programmer error.")
|
||||
return
|
||||
}
|
||||
|
||||
|
|
@ -313,67 +346,6 @@ func maybeCreateWebhook(ctx context.Context, client *fieldseeker.FieldSeeker) {
|
|||
}
|
||||
}
|
||||
|
||||
func downloadAllRecords(ctx context.Context, fssync *fieldseeker.FieldSeeker, layer arcgis.LayerFeature, org_id int32) (SyncStats, error) {
|
||||
var stats SyncStats
|
||||
count, err := fssync.QueryCount(layer.ID)
|
||||
if err != nil {
|
||||
return stats, fmt.Errorf("Failed to get counts for layer %s (%d): %w", layer.Name, layer.ID, err)
|
||||
}
|
||||
log.Info().Str("name", layer.Name).Uint("id", layer.ID).Msg("Starting on layer")
|
||||
if count.Count == 0 {
|
||||
return stats, nil
|
||||
}
|
||||
pool := pond.NewResultPool[SyncStats](20)
|
||||
group := pool.NewGroup()
|
||||
maxRecords := uint(fssync.MaxRecordCount())
|
||||
for offset := uint(0); offset < uint(count.Count); offset += maxRecords {
|
||||
group.SubmitErr(func() (SyncStats, error) {
|
||||
/*query := arcgis.NewQuery()
|
||||
query.ResultRecordCount = maxRecords
|
||||
query.ResultOffset = offset
|
||||
query.SpatialReference = "4326"
|
||||
query.OutFields = "*"
|
||||
query.Where = "1=1"
|
||||
qr, err := fssync.DoQuery(
|
||||
layer.ID,
|
||||
query)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to get layer %s (%d) at offset %d: %w", layer.Name, layer.ID, offset, err)
|
||||
}
|
||||
|
||||
i, u, err := saveOrUpdateDBRecords(ctx, "FS_"+layer.Name, qr, org_id)
|
||||
if err != nil {
|
||||
filename := fmt.Sprintf("failure-%s-%d-%d.json", layer.Name, layer.ID, offset)
|
||||
saveRawQuery(fssync, layer, query, filename)
|
||||
log.Error().Err(err).Msg("Faild to save DB records")
|
||||
return SyncStats{}, fmt.Errorf("Failed to save records: %w", err)
|
||||
}
|
||||
return SyncStats{
|
||||
Inserts: i,
|
||||
Updates: u,
|
||||
Unchanged: len(qr.Features) - u - i,
|
||||
}, nil
|
||||
*/
|
||||
return SyncStats{
|
||||
Inserts: 0,
|
||||
Updates: 0,
|
||||
Unchanged: 0,
|
||||
}, nil
|
||||
})
|
||||
}
|
||||
results, err := group.Wait()
|
||||
if err != nil {
|
||||
return stats, fmt.Errorf("one or more tasks in the work pool failed: %w", err)
|
||||
}
|
||||
for _, r := range results {
|
||||
stats.Inserts += r.Inserts
|
||||
stats.Updates += r.Updates
|
||||
stats.Unchanged += r.Unchanged
|
||||
}
|
||||
log.Info().Uint("inserts", stats.Inserts).Uint("updates", stats.Updates).Uint("no change", stats.Unchanged).Msg("Finished layer")
|
||||
return stats, nil
|
||||
}
|
||||
|
||||
func getOAuthForOrg(ctx context.Context, org *models.Organization) (*models.OauthToken, error) {
|
||||
users, err := org.User().All(ctx, db.PGInstance.BobDB)
|
||||
if err != nil {
|
||||
|
|
@ -402,6 +374,7 @@ func periodicallyExportFieldseeker(ctx context.Context, org *models.Organization
|
|||
if err != nil {
|
||||
return fmt.Errorf("Failed to get oauth for org: %w", err)
|
||||
}
|
||||
logPermissions(ctx, org, oauth)
|
||||
err = exportFieldseekerData(ctx, org, oauth)
|
||||
syncStatusByOrg[org.ID] = false
|
||||
if err != nil {
|
||||
|
|
@ -436,11 +409,12 @@ func exportFieldseekerData(ctx context.Context, org *models.Organization, oauth
|
|||
return fmt.Errorf("Failed to create fssync: %w", err)
|
||||
}
|
||||
var stats SyncStats
|
||||
//layers := fssync.FeatureServerLayers()
|
||||
|
||||
pool := pond.NewResultPool[SyncStats](20)
|
||||
group := pool.NewGroup()
|
||||
var ss SyncStats
|
||||
for _, l := range fssync.FeatureServerLayers() {
|
||||
ss, err = exportFieldseekerLayer(ctx, org, fssync, l)
|
||||
ss, err = exportFieldseekerLayer(ctx, group, org, fssync, l)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -448,6 +422,15 @@ func exportFieldseekerData(ctx context.Context, org *models.Organization, oauth
|
|||
stats.Updates += ss.Updates
|
||||
stats.Unchanged += ss.Unchanged
|
||||
}
|
||||
results, err := group.Wait()
|
||||
if err != nil {
|
||||
return fmt.Errorf("one or more tasks in the work pool failed: %w", err)
|
||||
}
|
||||
for _, r := range results {
|
||||
stats.Inserts += r.Inserts
|
||||
stats.Updates += r.Updates
|
||||
stats.Unchanged += r.Unchanged
|
||||
}
|
||||
|
||||
setter := models.FieldseekerSyncSetter{
|
||||
RecordsCreated: omit.From(int32(stats.Inserts)),
|
||||
|
|
@ -463,6 +446,37 @@ func exportFieldseekerData(ctx context.Context, org *models.Organization, oauth
|
|||
return nil
|
||||
}
|
||||
|
||||
func logPermissions(ctx context.Context, org *models.Organization, oauth *models.OauthToken) {
|
||||
ar := arcgis.NewArcGIS(
|
||||
arcgis.AuthenticatorOAuth{
|
||||
AccessToken: oauth.AccessToken,
|
||||
AccessTokenExpires: oauth.AccessTokenExpires,
|
||||
RefreshToken: oauth.RefreshToken,
|
||||
RefreshTokenExpires: oauth.RefreshTokenExpires,
|
||||
},
|
||||
)
|
||||
row, err := sql.OrgByOauthId(oauth.ID).One(ctx, db.PGInstance.BobDB)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("Failed to get org in log permissions")
|
||||
return
|
||||
}
|
||||
fssync, err := fieldseeker.NewFieldSeeker(
|
||||
ar,
|
||||
row.FieldseekerURL.MustGet(),
|
||||
)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("Failed to create fieldseeker client in log permissions")
|
||||
return
|
||||
}
|
||||
permissions, err := fssync.PermissionList()
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("Failed to query permissions in log permissions")
|
||||
return
|
||||
}
|
||||
for _, p := range permissions {
|
||||
log.Info().Str("p", p.Principal).Msg("Permission!")
|
||||
}
|
||||
}
|
||||
func maintainOAuth(ctx context.Context, oauth *models.OauthToken) error {
|
||||
for {
|
||||
// Refresh from the database
|
||||
|
|
@ -1086,24 +1100,22 @@ func updateSummaryTables(ctx context.Context, org *models.Organization) {
|
|||
}
|
||||
}
|
||||
|
||||
func exportFieldseekerLayer(ctx context.Context, org *models.Organization, fssync *fieldseeker.FieldSeeker, layer arcgis.LayerFeature) (SyncStats, error) {
|
||||
func exportFieldseekerLayer(ctx context.Context, group pond.ResultTaskGroup[SyncStats], org *models.Organization, fssync *fieldseeker.FieldSeeker, layer arcgis.LayerFeature) (SyncStats, error) {
|
||||
var stats SyncStats
|
||||
count, err := fssync.QueryCount(layer.ID)
|
||||
if err != nil {
|
||||
return stats, fmt.Errorf("Failed to get counts for layer %s (%d): %w", layer.Name, layer.ID, err)
|
||||
}
|
||||
if count.Count == 0 {
|
||||
log.Info().Str("name", layer.Name).Uint("id", layer.ID).Msg("No records to download")
|
||||
log.Info().Str("name", layer.Name).Uint("layer_id", layer.ID).Int32("org_id", org.ID).Msg("No records to download")
|
||||
return stats, nil
|
||||
}
|
||||
log.Info().Str("name", layer.Name).Uint("id", layer.ID).Msg("Starting on layer")
|
||||
pool := pond.NewResultPool[SyncStats](20)
|
||||
group := pool.NewGroup()
|
||||
maxRecords := uint(fssync.MaxRecordCount())
|
||||
l, err := fieldseeker.NameToLayerType(layer.Name)
|
||||
if err != nil {
|
||||
return stats, fmt.Errorf("Failed to get layer for '%s': %w", layer.Name, err)
|
||||
}
|
||||
log.Info().Str("name", layer.Name).Uint("layer_id", layer.ID).Int32("org_id", org.ID).Int("count", count.Count).Uint("iterations", uint(count.Count)/maxRecords).Msg("Queuing jobs for layer")
|
||||
for offset := uint(0); offset < uint(count.Count); offset += maxRecords {
|
||||
group.SubmitErr(func() (SyncStats, error) {
|
||||
var ss SyncStats
|
||||
|
|
@ -1571,15 +1583,6 @@ func exportFieldseekerLayer(ctx context.Context, org *models.Organization, fssyn
|
|||
return ss, err
|
||||
})
|
||||
}
|
||||
results, err := group.Wait()
|
||||
if err != nil {
|
||||
return stats, fmt.Errorf("one or more tasks in the work pool failed: %w", err)
|
||||
}
|
||||
for _, r := range results {
|
||||
stats.Inserts += r.Inserts
|
||||
stats.Updates += r.Updates
|
||||
stats.Unchanged += r.Unchanged
|
||||
}
|
||||
log.Info().Uint("inserts", stats.Inserts).Uint("updates", stats.Updates).Uint("no change", stats.Unchanged).Str("layer", layer.Name).Msg("Finished layer")
|
||||
//log.Info().Uint("inserts", stats.Inserts).Uint("updates", stats.Updates).Uint("no change", stats.Unchanged).Str("layer", layer.Name).Msg("Finished layer")
|
||||
return stats, nil
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue