Checkpoint on more dynamically populating tables for fieldseeker

This commit is contained in:
Eli Ribble 2025-12-04 02:52:01 +00:00
parent 4dc473bc85
commit ab5be998c8
No known key found for this signature in database
2 changed files with 485 additions and 506 deletions

View file

@ -511,16 +511,20 @@ func maintainOAuth(ctx context.Context, oauth *models.OauthToken) error {
if err != nil {
return fmt.Errorf("Failed to update oauth token from database: %w", err)
}
accessTokenDelay := time.Until(oauth.AccessTokenExpires) - (3 * time.Second)
refreshTokenDelay := time.Until(oauth.RefreshTokenExpires) - (3 * time.Second)
var accessTokenDelay time.Duration
if oauth.AccessTokenExpires.Before(time.Now()) {
accessTokenDelay = 0
accessTokenDelay = 1
} else {
accessTokenDelay = time.Until(oauth.AccessTokenExpires) - (3 * time.Second)
}
var refreshTokenDelay time.Duration
if oauth.RefreshTokenExpires.Before(time.Now()) {
refreshTokenDelay = 0
refreshTokenDelay = 1
} else {
refreshTokenDelay = time.Until(oauth.RefreshTokenExpires) - (3 * time.Second)
}
log.Info().Int("id", int(oauth.ID)).Float64("seconds", accessTokenDelay.Seconds()).Str("access_token", oauth.AccessToken).Msg("Need to refresh access token")
log.Info().Int("id", int(oauth.ID)).Float64("seconds", refreshTokenDelay.Seconds()).Str("refresh_token", oauth.RefreshToken).Msg("Need to refresh refresh token")
log.Info().Int("id", int(oauth.ID)).Float64("seconds", accessTokenDelay.Seconds()).Msg("Need to refresh access token")
log.Info().Int("id", int(oauth.ID)).Float64("seconds", refreshTokenDelay.Seconds()).Msg("Need to refresh refresh token")
accessTokenTicker := time.NewTicker(accessTokenDelay)
refreshTokenTicker := time.NewTicker(refreshTokenDelay)
select {
@ -1138,6 +1142,7 @@ func exportFieldseekerLayer(ctx context.Context, org *models.Organization, fssyn
var inserts, unchanged, updates uint
var err error
switch l {
/*
case fieldseeker.LayerAerialSpraySession:
name = "AerialSpraySession"
rows, err := fssync.AerialSpraySession(offset)
@ -1149,7 +1154,6 @@ func exportFieldseekerLayer(ctx context.Context, org *models.Organization, fssyn
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
}
unchanged = uint(len(rows)) - inserts - updates
return ss, err
case fieldseeker.LayerAerialSprayLine:
name = "LayerAerialSprayLine"
rows, err := fssync.AerialSprayLine(offset)
@ -1161,7 +1165,6 @@ func exportFieldseekerLayer(ctx context.Context, org *models.Organization, fssyn
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
}
unchanged = uint(len(rows)) - inserts - updates
return ss, err
case fieldseeker.LayerBarrierSpray:
name = "LayerBarrierSpray"
rows, err := fssync.BarrierSpray(offset)
@ -1173,7 +1176,6 @@ func exportFieldseekerLayer(ctx context.Context, org *models.Organization, fssyn
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
}
unchanged = uint(len(rows)) - inserts - updates
return ss, err
case fieldseeker.LayerBarrierSprayRoute:
name = "LayerBarrierSprayRoute"
rows, err := fssync.BarrierSprayRoute(offset)
@ -1185,7 +1187,6 @@ func exportFieldseekerLayer(ctx context.Context, org *models.Organization, fssyn
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
}
unchanged = uint(len(rows)) - inserts - updates
return ss, err
case fieldseeker.LayerContainerRelate:
name = "LayerContainerRelate"
rows, err := fssync.ContainerRelate(offset)
@ -1197,7 +1198,6 @@ func exportFieldseekerLayer(ctx context.Context, org *models.Organization, fssyn
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
}
unchanged = uint(len(rows)) - inserts - updates
return ss, err
case fieldseeker.LayerFieldScoutingLog:
name = "LayerFieldScoutingLog"
rows, err := fssync.FieldScoutingLog(offset)
@ -1209,7 +1209,6 @@ func exportFieldseekerLayer(ctx context.Context, org *models.Organization, fssyn
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
}
unchanged = uint(len(rows)) - inserts - updates
return ss, err
case fieldseeker.LayerHabitatRelate:
name = "LayerHabitatRelate"
rows, err := fssync.HabitatRelate(offset)
@ -1221,7 +1220,6 @@ func exportFieldseekerLayer(ctx context.Context, org *models.Organization, fssyn
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
}
unchanged = uint(len(rows)) - inserts - updates
return ss, err
case fieldseeker.LayerInspectionSample:
name = "LayerInspectionSample"
rows, err := fssync.InspectionSample(offset)
@ -1233,7 +1231,6 @@ func exportFieldseekerLayer(ctx context.Context, org *models.Organization, fssyn
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
}
unchanged = uint(len(rows)) - inserts - updates
return ss, err
case fieldseeker.LayerInspectionSampleDetail:
name = "LayerInspectionSampleDetail"
rows, err := fssync.InspectionSampleDetail(offset)
@ -1245,7 +1242,6 @@ func exportFieldseekerLayer(ctx context.Context, org *models.Organization, fssyn
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
}
unchanged = uint(len(rows)) - inserts - updates
return ss, err
case fieldseeker.LayerLandingCount:
name = "LayerLandingCount"
rows, err := fssync.LandingCount(offset)
@ -1257,7 +1253,6 @@ func exportFieldseekerLayer(ctx context.Context, org *models.Organization, fssyn
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
}
unchanged = uint(len(rows)) - inserts - updates
return ss, err
case fieldseeker.LayerLandingCountLocation:
name = "LayerLandingCountLocation"
rows, err := fssync.LandingCountLocation(offset)
@ -1269,7 +1264,6 @@ func exportFieldseekerLayer(ctx context.Context, org *models.Organization, fssyn
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
}
unchanged = uint(len(rows)) - inserts - updates
return ss, err
case fieldseeker.LayerLineLocation:
name = "LayerLineLocation"
rows, err := fssync.LineLocation(offset)
@ -1281,7 +1275,6 @@ func exportFieldseekerLayer(ctx context.Context, org *models.Organization, fssyn
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
}
unchanged = uint(len(rows)) - inserts - updates
return ss, err
case fieldseeker.LayerLocationTracking:
name = "LayerLocationTracking"
rows, err := fssync.LocationTracking(offset)
@ -1293,7 +1286,6 @@ func exportFieldseekerLayer(ctx context.Context, org *models.Organization, fssyn
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
}
unchanged = uint(len(rows)) - inserts - updates
return ss, err
case fieldseeker.LayerMosquitoInspection:
name = "LayerMosquitoInspection"
rows, err := fssync.MosquitoInspection(offset)
@ -1305,7 +1297,6 @@ func exportFieldseekerLayer(ctx context.Context, org *models.Organization, fssyn
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
}
unchanged = uint(len(rows)) - inserts - updates
return ss, err
case fieldseeker.LayerOfflineMapAreas:
name = "LayerOfflineMapAreas"
rows, err := fssync.OfflineMapAreas(offset)
@ -1317,7 +1308,6 @@ func exportFieldseekerLayer(ctx context.Context, org *models.Organization, fssyn
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
}
unchanged = uint(len(rows)) - inserts - updates
return ss, err
case fieldseeker.LayerProposedTreatmentArea:
name = "LayerProposedTreatmentArea"
rows, err := fssync.ProposedTreatmentArea(offset)
@ -1329,7 +1319,6 @@ func exportFieldseekerLayer(ctx context.Context, org *models.Organization, fssyn
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
}
unchanged = uint(len(rows)) - inserts - updates
return ss, err
case fieldseeker.LayerPointLocation:
name = "LayerPointLocation"
rows, err := fssync.PointLocation(offset)
@ -1341,7 +1330,6 @@ func exportFieldseekerLayer(ctx context.Context, org *models.Organization, fssyn
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
}
unchanged = uint(len(rows)) - inserts - updates
return ss, err
case fieldseeker.LayerPolygonLocation:
name = "LayerPolygonLocation"
rows, err := fssync.PolygonLocation(offset)
@ -1353,7 +1341,6 @@ func exportFieldseekerLayer(ctx context.Context, org *models.Organization, fssyn
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
}
unchanged = uint(len(rows)) - inserts - updates
return ss, err
case fieldseeker.LayerPoolDetail:
name = "LayerPoolDetail"
rows, err := fssync.PoolDetail(offset)
@ -1365,7 +1352,6 @@ func exportFieldseekerLayer(ctx context.Context, org *models.Organization, fssyn
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
}
unchanged = uint(len(rows)) - inserts - updates
return ss, err
case fieldseeker.LayerPool:
name = "LayerPool"
rows, err := fssync.Pool(offset)
@ -1377,7 +1363,6 @@ func exportFieldseekerLayer(ctx context.Context, org *models.Organization, fssyn
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
}
unchanged = uint(len(rows)) - inserts - updates
return ss, err
case fieldseeker.LayerPoolBuffer:
name = "LayerPoolBuffer"
rows, err := fssync.PoolBuffer(offset)
@ -1389,7 +1374,6 @@ func exportFieldseekerLayer(ctx context.Context, org *models.Organization, fssyn
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
}
unchanged = uint(len(rows)) - inserts - updates
return ss, err
case fieldseeker.LayerQALarvCount:
name = "LayerQALarvCount"
rows, err := fssync.QALarvCount(offset)
@ -1401,7 +1385,6 @@ func exportFieldseekerLayer(ctx context.Context, org *models.Organization, fssyn
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
}
unchanged = uint(len(rows)) - inserts - updates
return ss, err
case fieldseeker.LayerQAMosquitoInspection:
name = "LayerQAMosquitoInspection"
rows, err := fssync.QAMosquitoInspection(offset)
@ -1413,7 +1396,6 @@ func exportFieldseekerLayer(ctx context.Context, org *models.Organization, fssyn
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
}
unchanged = uint(len(rows)) - inserts - updates
return ss, err
case fieldseeker.LayerQAProductObservation:
name = "LayerQAProductObservation"
rows, err := fssync.QAProductObservation(offset)
@ -1425,7 +1407,6 @@ func exportFieldseekerLayer(ctx context.Context, org *models.Organization, fssyn
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
}
unchanged = uint(len(rows)) - inserts - updates
return ss, err
case fieldseeker.LayerRestrictedArea:
name = "LayerRestrictedArea"
rows, err := fssync.RestrictedArea(offset)
@ -1437,7 +1418,6 @@ func exportFieldseekerLayer(ctx context.Context, org *models.Organization, fssyn
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
}
unchanged = uint(len(rows)) - inserts - updates
return ss, err
case fieldseeker.LayerRodentInspection:
name = "LayerRodentInspection"
rows, err := fssync.RodentInspection(offset)
@ -1449,7 +1429,7 @@ func exportFieldseekerLayer(ctx context.Context, org *models.Organization, fssyn
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
}
unchanged = uint(len(rows)) - inserts - updates
return ss, err
*/
case fieldseeker.LayerRodentLocation:
name = "LayerRodentLocation"
rows, err := fssync.RodentLocation(offset)
@ -1461,7 +1441,7 @@ func exportFieldseekerLayer(ctx context.Context, org *models.Organization, fssyn
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
}
unchanged = uint(len(rows)) - inserts - updates
return ss, err
/*
case fieldseeker.LayerSampleCollection:
name = "LayerSampleCollection"
rows, err := fssync.SampleCollection(offset)
@ -1473,7 +1453,6 @@ func exportFieldseekerLayer(ctx context.Context, org *models.Organization, fssyn
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
}
unchanged = uint(len(rows)) - inserts - updates
return ss, err
case fieldseeker.LayerSampleLocation:
name = "LayerSampleLocation"
rows, err := fssync.SampleLocation(offset)
@ -1485,7 +1464,6 @@ func exportFieldseekerLayer(ctx context.Context, org *models.Organization, fssyn
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
}
unchanged = uint(len(rows)) - inserts - updates
return ss, err
case fieldseeker.LayerServiceRequest:
name = "LayerServiceRequest"
rows, err := fssync.ServiceRequest(offset)
@ -1497,7 +1475,6 @@ func exportFieldseekerLayer(ctx context.Context, org *models.Organization, fssyn
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
}
unchanged = uint(len(rows)) - inserts - updates
return ss, err
case fieldseeker.LayerSpeciesAbundance:
name = "LayerSpeciesAbundance"
rows, err := fssync.SpeciesAbundance(offset)
@ -1509,7 +1486,6 @@ func exportFieldseekerLayer(ctx context.Context, org *models.Organization, fssyn
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
}
unchanged = uint(len(rows)) - inserts - updates
return ss, err
case fieldseeker.LayerStormDrain:
name = "LayerStormDrain"
rows, err := fssync.StormDrain(offset)
@ -1521,7 +1497,6 @@ func exportFieldseekerLayer(ctx context.Context, org *models.Organization, fssyn
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
}
unchanged = uint(len(rows)) - inserts - updates
return ss, err
case fieldseeker.LayerTracklog:
name = "LayerTracklog"
rows, err := fssync.Tracklog(offset)
@ -1533,7 +1508,6 @@ func exportFieldseekerLayer(ctx context.Context, org *models.Organization, fssyn
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
}
unchanged = uint(len(rows)) - inserts - updates
return ss, err
case fieldseeker.LayerTrapLocation:
name = "LayerTrapLocation"
rows, err := fssync.TrapLocation(offset)
@ -1545,7 +1519,6 @@ func exportFieldseekerLayer(ctx context.Context, org *models.Organization, fssyn
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
}
unchanged = uint(len(rows)) - inserts - updates
return ss, err
case fieldseeker.LayerTrapData:
name = "LayerTrapData"
rows, err := fssync.TrapData(offset)
@ -1557,7 +1530,6 @@ func exportFieldseekerLayer(ctx context.Context, org *models.Organization, fssyn
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
}
unchanged = uint(len(rows)) - inserts - updates
return ss, err
case fieldseeker.LayerTimeCard:
name = "LayerTimeCard"
rows, err := fssync.TimeCard(offset)
@ -1569,7 +1541,6 @@ func exportFieldseekerLayer(ctx context.Context, org *models.Organization, fssyn
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
}
unchanged = uint(len(rows)) - inserts - updates
return ss, err
case fieldseeker.LayerTreatment:
name = "LayerTreatment"
rows, err := fssync.Treatment(offset)
@ -1581,7 +1552,6 @@ func exportFieldseekerLayer(ctx context.Context, org *models.Organization, fssyn
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
}
unchanged = uint(len(rows)) - inserts - updates
return ss, err
case fieldseeker.LayerTreatmentArea:
name = "LayerTreatmentArea"
rows, err := fssync.TreatmentArea(offset)
@ -1593,7 +1563,6 @@ func exportFieldseekerLayer(ctx context.Context, org *models.Organization, fssyn
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
}
unchanged = uint(len(rows)) - inserts - updates
return ss, err
case fieldseeker.LayerULVSprayRoute:
name = "LayerULVSprayRoute"
rows, err := fssync.ULVSprayRoute(offset)
@ -1605,7 +1574,6 @@ func exportFieldseekerLayer(ctx context.Context, org *models.Organization, fssyn
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
}
unchanged = uint(len(rows)) - inserts - updates
return ss, err
case fieldseeker.LayerZones:
name = "LayerZones"
rows, err := fssync.Zones(offset)
@ -1617,7 +1585,6 @@ func exportFieldseekerLayer(ctx context.Context, org *models.Organization, fssyn
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
}
unchanged = uint(len(rows)) - inserts - updates
return ss, err
case fieldseeker.LayerZones2:
name = "LayerZones2"
rows, err := fssync.Zones2(offset)
@ -1629,8 +1596,10 @@ func exportFieldseekerLayer(ctx context.Context, org *models.Organization, fssyn
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
}
unchanged = uint(len(rows)) - inserts - updates
*/
default:
return ss, errors.New("Unrecognized layer")
//return ss, errors.New("Unrecognized layer")
return ss, nil
}
ss.Inserts = inserts
ss.Updates = updates

View file

@ -138,11 +138,34 @@ type InsertResultRow struct {
Version int `db:"version_num"`
}
func SaveOrUpdateRodentLocation(ctx context.Context, org *models.Organization, fs []*fslayer.RodentLocation) (inserts uint, updates uint, err error) {
log.Info().Int("rows", len(fs)).Msg("Processing RodentLocation")
type rowConverter[T any] func(*T) []SqlParam
func doUpdatesViaFunction[T any](ctx context.Context, org *models.Organization, fs []*T, table string, procedure string, converter rowConverter[T]) (inserts uint, updates uint, err error) {
//log.Info().Int("rows", len(fs)).Msg("Processing RodentLocation")
for _, row := range fs {
procedure := "fieldseeker.insert_rodentlocation"
q := queryStoredProcedure(procedure,
params := converter(row)
q := queryStoredProcedure(procedure, params...)
query := psql.RawQuery(q)
log.Info().Str("query", q).Msg("querying")
result, err := bob.One[InsertResultRow](ctx, PGInstance.BobDB, query, scan.StructMapper[InsertResultRow]())
if err != nil {
return inserts, updates, fmt.Errorf("Failed to execute %s: %w", procedure, err)
}
if result.Inserted {
if result.Version == 1 {
inserts += 1
} else {
updates += 1
}
}
log.Info().Bool("inserted", result.Inserted).Int("version", result.Version).Msg("querying")
}
return inserts, updates, err
}
func SaveOrUpdateRodentLocation(ctx context.Context, org *models.Organization, fs []*fslayer.RodentLocation) (inserts uint, updates uint, err error) {
return doUpdatesViaFunction(ctx, org, fs, "RodentLocation", "fieldseeker.insert_rodentlocation", func(row *fslayer.RodentLocation) []SqlParam {
return []SqlParam{
Uint("p_objectid", row.ObjectID),
String("p_locationname", row.LocationName),
String("p_zone", row.Zone),
@ -173,23 +196,10 @@ func SaveOrUpdateRodentLocation(ctx context.Context, org *models.Organization, f
Timestamp("p_editdate", row.EditDate),
String("p_editor", row.Editor),
String("p_jurisdiction", row.Jurisdiction),
)
query := psql.RawQuery(q)
log.Info().Str("query", q).Msg("querying")
result, err := bob.One[InsertResultRow](ctx, PGInstance.BobDB, query, scan.StructMapper[InsertResultRow]())
if err != nil {
return inserts, updates, fmt.Errorf("Failed to execute %s: %w", procedure, err)
}
if result.Inserted {
if result.Version == 1 {
inserts += 1
} else {
updates += 1
}
}
}
return inserts, updates, err
})
}
func SaveOrUpdateSampleCollection(fs []*fslayer.SampleCollection) (inserts uint, updates uint, err error) {
//log.Warn().Int("len", len(fs)).Msg("Ignoring SampleCollection rows")
return 0, 0, nil