Add statistics on the sync and save the org ID in the Fieldseeker tables

We need the org ID so that we can avoid data bleeding over between
different organizations.
This commit is contained in:
Eli Ribble 2025-11-07 09:30:31 +00:00
parent bf6e40d877
commit b0432f3243
No known key found for this signature in database
178 changed files with 9075 additions and 6223 deletions

114
arcgis.go
View file

@ -239,6 +239,19 @@ func refreshFieldseekerData(ctx context.Context, newOauthCh <-chan struct{}) {
}()
}
orgs, err := models.Organizations.Query().All(ctx, PGInstance.BobDB)
if err != nil {
slog.Error("Failed to get orgs", slog.String("err", err.Error()))
return
}
for _, org := range orgs {
wg.Add(1)
go func() {
defer wg.Done()
periodicallyExportFieldseeker(workerCtx, org)
}()
}
select {
case <-ctx.Done():
slog.Info("Exiting refresh worker...")
@ -253,17 +266,17 @@ func refreshFieldseekerData(ctx context.Context, newOauthCh <-chan struct{}) {
}
}
func downloadAllRecords(ctx context.Context, fssync *fieldseeker.FieldSeeker, layer arcgis.LayerFeature) (int, int, error) {
func downloadAllRecords(ctx context.Context, fssync *fieldseeker.FieldSeeker, layer arcgis.LayerFeature, org_id int32) (int, int, int, error) {
inserts := 0
updates := 0
offset := 0
count, err := fssync.QueryCount(layer.ID)
if err != nil {
return inserts, updates, fmt.Errorf("Failed to get counts for layer %s (%d): %v", layer.Name, layer.ID, err)
return inserts, updates, 0, fmt.Errorf("Failed to get counts for layer %s (%d): %v", layer.Name, layer.ID, err)
}
slog.Info("Starting on layer", slog.String("name", layer.Name), slog.Int("id", layer.ID))
if count.Count == 0 {
return inserts, updates, nil
return inserts, updates, 0, nil
}
for {
if offset >= count.Count {
@ -279,22 +292,58 @@ func downloadAllRecords(ctx context.Context, fssync *fieldseeker.FieldSeeker, la
layer.ID,
query)
if err != nil {
return inserts, updates, fmt.Errorf("Failed to get layer %s (%d) at offset %d: %v", layer.Name, layer.ID, offset, err)
return inserts, updates, count.Count - inserts - updates, fmt.Errorf("Failed to get layer %s (%d) at offset %d: %v", layer.Name, layer.ID, offset, err)
}
i, u, err := saveOrUpdateDBRecords(ctx, "FS_"+layer.Name, qr)
i, u, err := saveOrUpdateDBRecords(ctx, "FS_"+layer.Name, qr, org_id)
if err != nil {
saveRawQuery(fssync, layer, query, "failure.json")
return inserts, updates, fmt.Errorf("Failed to save records: %v", err)
return inserts, updates, count.Count - inserts - updates, fmt.Errorf("Failed to save records: %v", err)
}
inserts += i
updates += u
offset += len(qr.Features)
}
slog.Info("Finished layer", slog.Int("inserts", inserts), slog.Int("updates", updates), slog.Int("no change", count.Count-inserts-updates))
return inserts, updates, nil
return inserts, updates, count.Count - inserts - updates, nil
}
func exportFieldseekerData(ctx context.Context, oauth *models.OauthToken) error {
// getOAuthForOrg returns an OAuth token usable for the given organization by
// scanning the org's users and returning the first token found. It returns an
// error if any query fails or if no user in the org has a token.
func getOAuthForOrg(ctx context.Context, org *models.Organization) (*models.OauthToken, error) {
	orgUsers, err := org.User().All(ctx, PGInstance.BobDB)
	if err != nil {
		return nil, fmt.Errorf("Failed to query all users for org: %v", err)
	}
	for _, u := range orgUsers {
		tokens, err := u.UserOauthTokens().All(ctx, PGInstance.BobDB)
		if err != nil {
			return nil, fmt.Errorf("Failed to query all oauth tokens for org: %v", err)
		}
		if len(tokens) > 0 {
			// Any one token will do; take the first.
			return tokens[0], nil
		}
	}
	return nil, errors.New("No oauth tokens found")
}
// periodicallyExportFieldseeker runs a Fieldseeker export for org almost
// immediately, then repeats it every 15 minutes until ctx is cancelled.
//
// It returns nil on context cancellation. Any failure to look up the org's
// OAuth token or to run the export stops the loop and is returned as an
// error (the caller currently ignores it, so a failure ends this org's
// periodic export — NOTE(review): confirm that is intended).
func periodicallyExportFieldseeker(ctx context.Context, org *models.Organization) error {
	// A 1ns initial period makes the first tick fire essentially
	// immediately; after the first export the interval is reset to 15m.
	pollTicker := time.NewTicker(1)
	// Stop the ticker on exit so its resources are released promptly.
	defer pollTicker.Stop()
	for {
		select {
		case <-ctx.Done():
			return nil
		case <-pollTicker.C:
			oauth, err := getOAuthForOrg(ctx, org)
			if err != nil {
				return fmt.Errorf("Failed to get oauth for org: %v", err)
			}
			err = exportFieldseekerData(ctx, org, oauth)
			if err != nil {
				return fmt.Errorf("Failed to export Fieldseeker data: %v", err)
			}
			// Reset the existing ticker instead of allocating a new one
			// each pass: the previous code leaked an unstopped ticker on
			// every iteration.
			pollTicker.Reset(15 * time.Minute)
		}
	}
}
func exportFieldseekerData(ctx context.Context, org *models.Organization, oauth *models.OauthToken) error {
slog.Info("Update Fieldseeker data")
ar := arcgis.NewArcGIS(
arcgis.AuthenticatorOAuth{
@ -317,15 +366,25 @@ func exportFieldseekerData(ctx context.Context, oauth *models.OauthToken) error
}
inserts := 0
updates := 0
unchanged := 0
for _, layer := range fssync.FeatureServerLayers() {
i, u, err := downloadAllRecords(ctx, fssync, layer)
i, u, un, err := downloadAllRecords(ctx, fssync, layer, row.OrganizationID)
if err != nil {
return fmt.Errorf("Failed to get layer %s: %v", layer, err)
}
inserts += i
updates += u
unchanged += un
}
setter := models.FieldseekerSyncSetter{
RecordsCreated: omit.From(int32(inserts)),
RecordsUpdated: omit.From(int32(updates)),
RecordsUnchanged: omit.From(int32(unchanged)),
}
err = org.InsertFieldseekerSyncs(ctx, PGInstance.BobDB, &setter)
//err = user.InsertUserOauthTokens(ctx, PGInstance.BobDB, &setter)
return nil
}
@ -341,7 +400,6 @@ func maintainOAuth(ctx context.Context, oauth *models.OauthToken) {
refreshDelay = time.Until(oauth.AccessTokenExpires)
}
refreshTicker := time.NewTicker(refreshDelay)
pollTicker := time.NewTicker(1)
for {
select {
case <-ctx.Done():
@ -352,12 +410,6 @@ func maintainOAuth(ctx context.Context, oauth *models.OauthToken) {
slog.Error("Failed to refresh token", slog.String("err", err.Error()))
return
}
case <-pollTicker.C:
err := exportFieldseekerData(ctx, oauth)
if err != nil {
slog.Error("Failed to export Fieldseeker data", slog.String("err", err.Error()))
}
pollTicker = time.NewTicker(15 * time.Minute)
}
}
@ -501,7 +553,7 @@ func saveRawQuery(fssync *fieldseeker.FieldSeeker, layer arcgis.LayerFeature, qu
slog.Info("Wrote failed query", slog.String("filename", filename))
}
func saveOrUpdateDBRecords(ctx context.Context, table string, qr *arcgis.QueryResult) (int, int, error) {
func saveOrUpdateDBRecords(ctx context.Context, table string, qr *arcgis.QueryResult, org_id int32) (int, int, error) {
inserts, updates := 0, 0
sorted_columns := make([]string, 0, len(qr.Fields))
for _, f := range qr.Fields {
@ -527,12 +579,12 @@ func saveOrUpdateDBRecords(ctx context.Context, table string, qr *arcgis.QueryRe
// If we have no matching row we'll need to create it
if len(row) == 0 {
if err := insertRowFromFeature(ctx, table, sorted_columns, &feature); err != nil {
if err := insertRowFromFeature(ctx, table, sorted_columns, &feature, org_id); err != nil {
return inserts, updates, fmt.Errorf("Failed to insert row: %v", err)
}
inserts += 1
} else if hasUpdates(row, feature) {
if err := updateRowFromFeature(ctx, table, sorted_columns, &feature); err != nil {
if err := updateRowFromFeature(ctx, table, sorted_columns, &feature, org_id); err != nil {
return inserts, updates, fmt.Errorf("Failed to update row: %v", err)
}
updates += 1
@ -603,20 +655,20 @@ func rowmapViaQuery(ctx context.Context, table string, sorted_columns []string,
return result, nil
}
func insertRowFromFeature(ctx context.Context, table string, sorted_columns []string, feature *arcgis.Feature) error {
func insertRowFromFeature(ctx context.Context, table string, sorted_columns []string, feature *arcgis.Feature, org_id int32) error {
var options pgx.TxOptions
transaction, err := PGInstance.PGXPool.BeginTx(ctx, options)
if err != nil {
return fmt.Errorf("Unable to start transaction")
}
err = insertRowFromFeatureFS(ctx, transaction, table, sorted_columns, feature)
err = insertRowFromFeatureFS(ctx, transaction, table, sorted_columns, feature, org_id)
if err != nil {
transaction.Rollback(ctx)
return fmt.Errorf("Unable to insert FS: %v", err)
}
err = insertRowFromFeatureHistory(ctx, transaction, table, sorted_columns, feature, 1)
err = insertRowFromFeatureHistory(ctx, transaction, table, sorted_columns, feature, org_id, 1)
if err != nil {
transaction.Rollback(ctx)
return fmt.Errorf("Failed to insert history: %v", err)
@ -629,7 +681,7 @@ func insertRowFromFeature(ctx context.Context, table string, sorted_columns []st
return nil
}
func insertRowFromFeatureFS(ctx context.Context, transaction pgx.Tx, table string, sorted_columns []string, feature *arcgis.Feature) error {
func insertRowFromFeatureFS(ctx context.Context, transaction pgx.Tx, table string, sorted_columns []string, feature *arcgis.Feature, org_id int32) error {
// Create the query to produce the main row
var sb strings.Builder
sb.WriteString("INSERT INTO ")
@ -640,7 +692,7 @@ func insertRowFromFeatureFS(ctx context.Context, transaction pgx.Tx, table strin
sb.WriteString(",")
}
// Specially add the geometry values since they aren't in the fields
sb.WriteString("geometry_x,geometry_y,updated")
sb.WriteString("geometry_x,geometry_y,organization_id,updated")
sb.WriteString(")\nVALUES (")
for _, field := range sorted_columns {
sb.WriteString("@")
@ -648,7 +700,7 @@ func insertRowFromFeatureFS(ctx context.Context, transaction pgx.Tx, table strin
sb.WriteString(",")
}
// Specially add the geometry values since they aren't in the fields
sb.WriteString("@geometry_x,@geometry_y,@updated)")
sb.WriteString("@geometry_x,@geometry_y,@organization_id,@updated)")
args := pgx.NamedArgs{}
for k, v := range feature.Attributes {
@ -657,6 +709,7 @@ func insertRowFromFeatureFS(ctx context.Context, transaction pgx.Tx, table strin
// specially add geometry since it isn't in the list of attributes
args["geometry_x"] = feature.Geometry.X
args["geometry_y"] = feature.Geometry.Y
args["organization_id"] = org_id
args["updated"] = time.Now()
_, err := transaction.Exec(ctx, sb.String(), args)
@ -718,7 +771,7 @@ func hasUpdates(row map[string]string, feature arcgis.Feature) bool {
}
return false
}
func updateRowFromFeature(ctx context.Context, table string, sorted_columns []string, feature *arcgis.Feature) error {
func updateRowFromFeature(ctx context.Context, table string, sorted_columns []string, feature *arcgis.Feature, org_id int32) error {
// Get the current highest version for the row in question
history_table := toHistoryTable(table)
var sb strings.Builder
@ -741,7 +794,7 @@ func updateRowFromFeature(ctx context.Context, table string, sorted_columns []st
return fmt.Errorf("Unable to start transaction")
}
err = insertRowFromFeatureHistory(ctx, transaction, table, sorted_columns, feature, version+1)
err = insertRowFromFeatureHistory(ctx, transaction, table, sorted_columns, feature, org_id, version+1)
if err != nil {
transaction.Rollback(ctx)
return fmt.Errorf("Failed to insert history: %v", err)
@ -758,7 +811,7 @@ func updateRowFromFeature(ctx context.Context, table string, sorted_columns []st
}
return nil
}
func insertRowFromFeatureHistory(ctx context.Context, transaction pgx.Tx, table string, sorted_columns []string, feature *arcgis.Feature, version int) error {
func insertRowFromFeatureHistory(ctx context.Context, transaction pgx.Tx, table string, sorted_columns []string, feature *arcgis.Feature, org_id int32, version int) error {
history_table := toHistoryTable(table)
var sb strings.Builder
sb.WriteString("INSERT INTO ")
@ -769,7 +822,7 @@ func insertRowFromFeatureHistory(ctx context.Context, transaction pgx.Tx, table
sb.WriteString(",")
}
// Specially add the geometry values since they aren't in the fields
sb.WriteString("created,geometry_x,geometry_y,version")
sb.WriteString("created,geometry_x,geometry_y,organization_id,version")
sb.WriteString(")\nVALUES (")
for _, field := range sorted_columns {
sb.WriteString("@")
@ -777,12 +830,13 @@ func insertRowFromFeatureHistory(ctx context.Context, transaction pgx.Tx, table
sb.WriteString(",")
}
// Specially add the geometry values since they aren't in the fields
sb.WriteString("@created,@geometry_x,@geometry_y,@version)")
sb.WriteString("@created,@geometry_x,@geometry_y,@organization_id,@version)")
args := pgx.NamedArgs{}
for k, v := range feature.Attributes {
args[k] = v
}
args["created"] = time.Now()
args["organization_id"] = org_id
args["version"] = version
if _, err := transaction.Exec(ctx, sb.String(), args); err != nil {
return fmt.Errorf("Failed to insert history row into %s: %v", table, err)