Switch sync stats to uints, save partial work on new export logic.
This commit is contained in:
parent
41291f07b3
commit
04ba2e5df2
1 changed files with 561 additions and 11 deletions
572
arcgis.go
572
arcgis.go
|
|
@ -351,9 +351,9 @@ func refreshFieldseekerData(ctx context.Context, newOauthCh <-chan struct{}) {
|
|||
}
|
||||
|
||||
type SyncStats struct {
|
||||
Inserts int
|
||||
Updates int
|
||||
Unchanged int
|
||||
Inserts uint
|
||||
Updates uint
|
||||
Unchanged uint
|
||||
}
|
||||
|
||||
func downloadAllRecords(ctx context.Context, fssync *fieldseeker.FieldSeeker, layer arcgis.LayerFeature, org_id int32) (SyncStats, error) {
|
||||
|
|
@ -362,16 +362,16 @@ func downloadAllRecords(ctx context.Context, fssync *fieldseeker.FieldSeeker, la
|
|||
if err != nil {
|
||||
return stats, fmt.Errorf("Failed to get counts for layer %s (%d): %w", layer.Name, layer.ID, err)
|
||||
}
|
||||
log.Info().Str("name", layer.Name).Int("id", layer.ID).Msg("Starting on layer")
|
||||
log.Info().Str("name", layer.Name).Uint("id", layer.ID).Msg("Starting on layer")
|
||||
if count.Count == 0 {
|
||||
return stats, nil
|
||||
}
|
||||
pool := pond.NewResultPool[SyncStats](20)
|
||||
group := pool.NewGroup()
|
||||
maxRecords := fssync.MaxRecordCount()
|
||||
for offset := 0; offset < count.Count; offset += maxRecords {
|
||||
maxRecords := uint(fssync.MaxRecordCount())
|
||||
for offset := uint(0); offset < uint(count.Count); offset += maxRecords {
|
||||
group.SubmitErr(func() (SyncStats, error) {
|
||||
query := arcgis.NewQuery()
|
||||
/*query := arcgis.NewQuery()
|
||||
query.ResultRecordCount = maxRecords
|
||||
query.ResultOffset = offset
|
||||
query.SpatialReference = "4326"
|
||||
|
|
@ -383,6 +383,7 @@ func downloadAllRecords(ctx context.Context, fssync *fieldseeker.FieldSeeker, la
|
|||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to get layer %s (%d) at offset %d: %w", layer.Name, layer.ID, offset, err)
|
||||
}
|
||||
|
||||
i, u, err := saveOrUpdateDBRecords(ctx, "FS_"+layer.Name, qr, org_id)
|
||||
if err != nil {
|
||||
filename := fmt.Sprintf("failure-%s-%d-%d.json", layer.Name, layer.ID, offset)
|
||||
|
|
@ -395,6 +396,12 @@ func downloadAllRecords(ctx context.Context, fssync *fieldseeker.FieldSeeker, la
|
|||
Updates: u,
|
||||
Unchanged: len(qr.Features) - u - i,
|
||||
}, nil
|
||||
*/
|
||||
return SyncStats{
|
||||
Inserts: 0,
|
||||
Updates: 0,
|
||||
Unchanged: 0,
|
||||
}, nil
|
||||
})
|
||||
}
|
||||
results, err := group.Wait()
|
||||
|
|
@ -406,7 +413,7 @@ func downloadAllRecords(ctx context.Context, fssync *fieldseeker.FieldSeeker, la
|
|||
stats.Updates += r.Updates
|
||||
stats.Unchanged += r.Unchanged
|
||||
}
|
||||
log.Info().Int("inserts", stats.Inserts).Int("updates", stats.Updates).Int("no change", stats.Unchanged).Msg("Finished layer")
|
||||
log.Info().Uint("inserts", stats.Inserts).Uint("updates", stats.Updates).Uint("no change", stats.Unchanged).Msg("Finished layer")
|
||||
return stats, nil
|
||||
}
|
||||
|
||||
|
|
@ -470,10 +477,13 @@ func exportFieldseekerData(ctx context.Context, org *models.Organization, oauth
|
|||
return fmt.Errorf("Failed to create fssync: %w", err)
|
||||
}
|
||||
var stats SyncStats
|
||||
for _, layer := range fssync.FeatureServerLayers() {
|
||||
ss, err := downloadAllRecords(ctx, fssync, layer, row.OrganizationID)
|
||||
//layers := fssync.FeatureServerLayers()
|
||||
|
||||
var ss SyncStats
|
||||
for _, l := range fssync.FeatureServerLayers() {
|
||||
ss, err = exportFieldseekerLayer(ctx, org, fssync, l)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to get layer %s: %w", layer, err)
|
||||
return err
|
||||
}
|
||||
stats.Inserts += ss.Inserts
|
||||
stats.Updates += ss.Updates
|
||||
|
|
@ -697,6 +707,7 @@ func saveResponse(data []byte, filename string) {
|
|||
log.Info().Str("filename", filename).Msg("Wrote response")
|
||||
}
|
||||
|
||||
/*
|
||||
func saveRawQuery(fssync *fieldseeker.FieldSeeker, layer arcgis.LayerFeature, query *arcgis.Query, filename string) {
|
||||
output, err := os.Create(filename)
|
||||
if err != nil {
|
||||
|
|
@ -717,6 +728,7 @@ func saveRawQuery(fssync *fieldseeker.FieldSeeker, layer arcgis.LayerFeature, qu
|
|||
}
|
||||
log.Info().Str("filename", filename).Msg("Wrote failed query")
|
||||
}
|
||||
*/
|
||||
|
||||
func saveOrUpdateDBRecords(ctx context.Context, table string, qr *arcgis.QueryResult, org_id int32) (int, int, error) {
|
||||
inserts, updates := 0, 0
|
||||
|
|
@ -1100,3 +1112,541 @@ func updateSummaryTables(ctx context.Context, org *models.Organization) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
func exportFieldseekerLayer(ctx context.Context, org *models.Organization, fssync *fieldseeker.FieldSeeker, layer arcgis.LayerFeature) (SyncStats, error) {
|
||||
var stats SyncStats
|
||||
count, err := fssync.QueryCount(layer.ID)
|
||||
if err != nil {
|
||||
return stats, fmt.Errorf("Failed to get counts for layer %s (%d): %w", layer.Name, layer.ID, err)
|
||||
}
|
||||
if count.Count == 0 {
|
||||
log.Info().Str("name", layer.Name).Uint("id", layer.ID).Msg("No records to download")
|
||||
return stats, nil
|
||||
}
|
||||
log.Info().Str("name", layer.Name).Uint("id", layer.ID).Msg("Starting on layer")
|
||||
pool := pond.NewResultPool[SyncStats](20)
|
||||
group := pool.NewGroup()
|
||||
maxRecords := uint(fssync.MaxRecordCount())
|
||||
l, err := fieldseeker.NameToLayerType(layer.Name)
|
||||
if err != nil {
|
||||
return stats, fmt.Errorf("Failed to get layer for '%s': %w", layer.Name, err)
|
||||
}
|
||||
for offset := uint(0); offset < uint(count.Count); offset += maxRecords {
|
||||
group.SubmitErr(func() (SyncStats, error) {
|
||||
var ss SyncStats
|
||||
var name string
|
||||
var inserts, unchanged, updates uint
|
||||
var err error
|
||||
switch l {
|
||||
case fieldseeker.LayerAerialSpraySession:
|
||||
name = "AerialSpraySession"
|
||||
rows, err := fssync.AerialSpraySession(offset)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
||||
}
|
||||
inserts, updates, err = db.SaveOrUpdateAerialSpraySession(rows)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
||||
}
|
||||
unchanged = uint(len(rows)) - inserts - updates
|
||||
return ss, err
|
||||
case fieldseeker.LayerAerialSprayLine:
|
||||
name = "LayerAerialSprayLine"
|
||||
rows, err := fssync.AerialSprayLine(offset)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
||||
}
|
||||
inserts, updates, err = db.SaveOrUpdateAerialSprayLine(rows)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
||||
}
|
||||
unchanged = uint(len(rows)) - inserts - updates
|
||||
return ss, err
|
||||
case fieldseeker.LayerBarrierSpray:
|
||||
name = "LayerBarrierSpray"
|
||||
rows, err := fssync.BarrierSpray(offset)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
||||
}
|
||||
inserts, updates, err = db.SaveOrUpdateBarrierSpray(rows)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
||||
}
|
||||
unchanged = uint(len(rows)) - inserts - updates
|
||||
return ss, err
|
||||
case fieldseeker.LayerBarrierSprayRoute:
|
||||
name = "LayerBarrierSprayRoute"
|
||||
rows, err := fssync.BarrierSprayRoute(offset)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
||||
}
|
||||
inserts, updates, err = db.SaveOrUpdateBarrierSprayRoute(rows)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
||||
}
|
||||
unchanged = uint(len(rows)) - inserts - updates
|
||||
return ss, err
|
||||
case fieldseeker.LayerContainerRelate:
|
||||
name = "LayerContainerRelate"
|
||||
rows, err := fssync.ContainerRelate(offset)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
||||
}
|
||||
inserts, updates, err = db.SaveOrUpdateContainerRelate(rows)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
||||
}
|
||||
unchanged = uint(len(rows)) - inserts - updates
|
||||
return ss, err
|
||||
case fieldseeker.LayerFieldScoutingLog:
|
||||
name = "LayerFieldScoutingLog"
|
||||
rows, err := fssync.FieldScoutingLog(offset)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
||||
}
|
||||
inserts, updates, err = db.SaveOrUpdateFieldScoutingLog(rows)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
||||
}
|
||||
unchanged = uint(len(rows)) - inserts - updates
|
||||
return ss, err
|
||||
case fieldseeker.LayerHabitatRelate:
|
||||
name = "LayerHabitatRelate"
|
||||
rows, err := fssync.HabitatRelate(offset)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
||||
}
|
||||
inserts, updates, err = db.SaveOrUpdateHabitatRelate(rows)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
||||
}
|
||||
unchanged = uint(len(rows)) - inserts - updates
|
||||
return ss, err
|
||||
case fieldseeker.LayerInspectionSample:
|
||||
name = "LayerInspectionSample"
|
||||
rows, err := fssync.InspectionSample(offset)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
||||
}
|
||||
inserts, updates, err = db.SaveOrUpdateInspectionSample(rows)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
||||
}
|
||||
unchanged = uint(len(rows)) - inserts - updates
|
||||
return ss, err
|
||||
case fieldseeker.LayerInspectionSampleDetail:
|
||||
name = "LayerInspectionSampleDetail"
|
||||
rows, err := fssync.InspectionSampleDetail(offset)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
||||
}
|
||||
inserts, updates, err = db.SaveOrUpdateInspectionSampleDetail(rows)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
||||
}
|
||||
unchanged = uint(len(rows)) - inserts - updates
|
||||
return ss, err
|
||||
case fieldseeker.LayerLandingCount:
|
||||
name = "LayerLandingCount"
|
||||
rows, err := fssync.LandingCount(offset)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
||||
}
|
||||
inserts, updates, err = db.SaveOrUpdateLandingCount(rows)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
||||
}
|
||||
unchanged = uint(len(rows)) - inserts - updates
|
||||
return ss, err
|
||||
case fieldseeker.LayerLandingCountLocation:
|
||||
name = "LayerLandingCountLocation"
|
||||
rows, err := fssync.LandingCountLocation(offset)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
||||
}
|
||||
inserts, updates, err = db.SaveOrUpdateLandingCountLocation(rows)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
||||
}
|
||||
unchanged = uint(len(rows)) - inserts - updates
|
||||
return ss, err
|
||||
case fieldseeker.LayerLineLocation:
|
||||
name = "LayerLineLocation"
|
||||
rows, err := fssync.LineLocation(offset)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
||||
}
|
||||
inserts, updates, err = db.SaveOrUpdateLineLocation(rows)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
||||
}
|
||||
unchanged = uint(len(rows)) - inserts - updates
|
||||
return ss, err
|
||||
case fieldseeker.LayerLocationTracking:
|
||||
name = "LayerLocationTracking"
|
||||
rows, err := fssync.LocationTracking(offset)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
||||
}
|
||||
inserts, updates, err = db.SaveOrUpdateLocationTracking(rows)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
||||
}
|
||||
unchanged = uint(len(rows)) - inserts - updates
|
||||
return ss, err
|
||||
case fieldseeker.LayerMosquitoInspection:
|
||||
name = "LayerMosquitoInspection"
|
||||
rows, err := fssync.MosquitoInspection(offset)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
||||
}
|
||||
inserts, updates, err = db.SaveOrUpdateMosquitoInspection(rows)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
||||
}
|
||||
unchanged = uint(len(rows)) - inserts - updates
|
||||
return ss, err
|
||||
case fieldseeker.LayerOfflineMapAreas:
|
||||
name = "LayerOfflineMapAreas"
|
||||
rows, err := fssync.OfflineMapAreas(offset)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
||||
}
|
||||
inserts, updates, err = db.SaveOrUpdateOfflineMapAreas(rows)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
||||
}
|
||||
unchanged = uint(len(rows)) - inserts - updates
|
||||
return ss, err
|
||||
case fieldseeker.LayerProposedTreatmentArea:
|
||||
name = "LayerProposedTreatmentArea"
|
||||
rows, err := fssync.ProposedTreatmentArea(offset)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
||||
}
|
||||
inserts, updates, err = db.SaveOrUpdateProposedTreatmentArea(rows)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
||||
}
|
||||
unchanged = uint(len(rows)) - inserts - updates
|
||||
return ss, err
|
||||
case fieldseeker.LayerPointLocation:
|
||||
name = "LayerPointLocation"
|
||||
rows, err := fssync.PointLocation(offset)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
||||
}
|
||||
inserts, updates, err = db.SaveOrUpdatePointLocation(rows)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
||||
}
|
||||
unchanged = uint(len(rows)) - inserts - updates
|
||||
return ss, err
|
||||
case fieldseeker.LayerPolygonLocation:
|
||||
name = "LayerPolygonLocation"
|
||||
rows, err := fssync.PolygonLocation(offset)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
||||
}
|
||||
inserts, updates, err = db.SaveOrUpdatePolygonLocation(rows)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
||||
}
|
||||
unchanged = uint(len(rows)) - inserts - updates
|
||||
return ss, err
|
||||
case fieldseeker.LayerPoolDetail:
|
||||
name = "LayerPoolDetail"
|
||||
rows, err := fssync.PoolDetail(offset)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
||||
}
|
||||
inserts, updates, err = db.SaveOrUpdatePoolDetail(rows)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
||||
}
|
||||
unchanged = uint(len(rows)) - inserts - updates
|
||||
return ss, err
|
||||
case fieldseeker.LayerPool:
|
||||
name = "LayerPool"
|
||||
rows, err := fssync.Pool(offset)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
||||
}
|
||||
inserts, updates, err = db.SaveOrUpdatePool(rows)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
||||
}
|
||||
unchanged = uint(len(rows)) - inserts - updates
|
||||
return ss, err
|
||||
case fieldseeker.LayerPoolBuffer:
|
||||
name = "LayerPoolBuffer"
|
||||
rows, err := fssync.PoolBuffer(offset)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
||||
}
|
||||
inserts, updates, err = db.SaveOrUpdatePoolBuffer(rows)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
||||
}
|
||||
unchanged = uint(len(rows)) - inserts - updates
|
||||
return ss, err
|
||||
case fieldseeker.LayerQALarvCount:
|
||||
name = "LayerQALarvCount"
|
||||
rows, err := fssync.QALarvCount(offset)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
||||
}
|
||||
inserts, updates, err = db.SaveOrUpdateQALarvCount(rows)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
||||
}
|
||||
unchanged = uint(len(rows)) - inserts - updates
|
||||
return ss, err
|
||||
case fieldseeker.LayerQAMosquitoInspection:
|
||||
name = "LayerQAMosquitoInspection"
|
||||
rows, err := fssync.QAMosquitoInspection(offset)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
||||
}
|
||||
inserts, updates, err = db.SaveOrUpdateQAMosquitoInspection(rows)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
||||
}
|
||||
unchanged = uint(len(rows)) - inserts - updates
|
||||
return ss, err
|
||||
case fieldseeker.LayerQAProductObservation:
|
||||
name = "LayerQAProductObservation"
|
||||
rows, err := fssync.QAProductObservation(offset)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
||||
}
|
||||
inserts, updates, err = db.SaveOrUpdateQAProductObservation(rows)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
||||
}
|
||||
unchanged = uint(len(rows)) - inserts - updates
|
||||
return ss, err
|
||||
case fieldseeker.LayerRestrictedArea:
|
||||
name = "LayerRestrictedArea"
|
||||
rows, err := fssync.RestrictedArea(offset)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
||||
}
|
||||
inserts, updates, err = db.SaveOrUpdateRestrictedArea(rows)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
||||
}
|
||||
unchanged = uint(len(rows)) - inserts - updates
|
||||
return ss, err
|
||||
case fieldseeker.LayerRodentInspection:
|
||||
name = "LayerRodentInspection"
|
||||
rows, err := fssync.RodentInspection(offset)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
||||
}
|
||||
inserts, updates, err = db.SaveOrUpdateRodentInspection(rows)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
||||
}
|
||||
unchanged = uint(len(rows)) - inserts - updates
|
||||
return ss, err
|
||||
case fieldseeker.LayerRodentLocation:
|
||||
name = "LayerRodentLocation"
|
||||
rows, err := fssync.RodentLocation(offset)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
||||
}
|
||||
inserts, updates, err = db.SaveOrUpdateRodentLocation(ctx, org, rows)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
||||
}
|
||||
unchanged = uint(len(rows)) - inserts - updates
|
||||
return ss, err
|
||||
case fieldseeker.LayerSampleCollection:
|
||||
name = "LayerSampleCollection"
|
||||
rows, err := fssync.SampleCollection(offset)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
||||
}
|
||||
inserts, updates, err = db.SaveOrUpdateSampleCollection(rows)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
||||
}
|
||||
unchanged = uint(len(rows)) - inserts - updates
|
||||
return ss, err
|
||||
case fieldseeker.LayerSampleLocation:
|
||||
name = "LayerSampleLocation"
|
||||
rows, err := fssync.SampleLocation(offset)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
||||
}
|
||||
inserts, updates, err = db.SaveOrUpdateSampleLocation(rows)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
||||
}
|
||||
unchanged = uint(len(rows)) - inserts - updates
|
||||
return ss, err
|
||||
case fieldseeker.LayerServiceRequest:
|
||||
name = "LayerServiceRequest"
|
||||
rows, err := fssync.ServiceRequest(offset)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
||||
}
|
||||
inserts, updates, err = db.SaveOrUpdateServiceRequest(rows)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
||||
}
|
||||
unchanged = uint(len(rows)) - inserts - updates
|
||||
return ss, err
|
||||
case fieldseeker.LayerSpeciesAbundance:
|
||||
name = "LayerSpeciesAbundance"
|
||||
rows, err := fssync.SpeciesAbundance(offset)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
||||
}
|
||||
inserts, updates, err = db.SaveOrUpdateSpeciesAbundance(rows)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
||||
}
|
||||
unchanged = uint(len(rows)) - inserts - updates
|
||||
return ss, err
|
||||
case fieldseeker.LayerStormDrain:
|
||||
name = "LayerStormDrain"
|
||||
rows, err := fssync.StormDrain(offset)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
||||
}
|
||||
inserts, updates, err = db.SaveOrUpdateStormDrain(rows)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
||||
}
|
||||
unchanged = uint(len(rows)) - inserts - updates
|
||||
return ss, err
|
||||
case fieldseeker.LayerTracklog:
|
||||
name = "LayerTracklog"
|
||||
rows, err := fssync.Tracklog(offset)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
||||
}
|
||||
inserts, updates, err = db.SaveOrUpdateTracklog(rows)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
||||
}
|
||||
unchanged = uint(len(rows)) - inserts - updates
|
||||
return ss, err
|
||||
case fieldseeker.LayerTrapLocation:
|
||||
name = "LayerTrapLocation"
|
||||
rows, err := fssync.TrapLocation(offset)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
||||
}
|
||||
inserts, updates, err = db.SaveOrUpdateTrapLocation(rows)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
||||
}
|
||||
unchanged = uint(len(rows)) - inserts - updates
|
||||
return ss, err
|
||||
case fieldseeker.LayerTrapData:
|
||||
name = "LayerTrapData"
|
||||
rows, err := fssync.TrapData(offset)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
||||
}
|
||||
inserts, updates, err = db.SaveOrUpdateTrapData(rows)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
||||
}
|
||||
unchanged = uint(len(rows)) - inserts - updates
|
||||
return ss, err
|
||||
case fieldseeker.LayerTimeCard:
|
||||
name = "LayerTimeCard"
|
||||
rows, err := fssync.TimeCard(offset)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
||||
}
|
||||
inserts, updates, err = db.SaveOrUpdateTimeCard(rows)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
||||
}
|
||||
unchanged = uint(len(rows)) - inserts - updates
|
||||
return ss, err
|
||||
case fieldseeker.LayerTreatment:
|
||||
name = "LayerTreatment"
|
||||
rows, err := fssync.Treatment(offset)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
||||
}
|
||||
inserts, updates, err = db.SaveOrUpdateTreatment(rows)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
||||
}
|
||||
unchanged = uint(len(rows)) - inserts - updates
|
||||
return ss, err
|
||||
case fieldseeker.LayerTreatmentArea:
|
||||
name = "LayerTreatmentArea"
|
||||
rows, err := fssync.TreatmentArea(offset)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
||||
}
|
||||
inserts, updates, err = db.SaveOrUpdateTreatmentArea(rows)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
||||
}
|
||||
unchanged = uint(len(rows)) - inserts - updates
|
||||
return ss, err
|
||||
case fieldseeker.LayerULVSprayRoute:
|
||||
name = "LayerULVSprayRoute"
|
||||
rows, err := fssync.ULVSprayRoute(offset)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
||||
}
|
||||
inserts, updates, err = db.SaveOrUpdateULVSprayRoute(rows)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
||||
}
|
||||
unchanged = uint(len(rows)) - inserts - updates
|
||||
return ss, err
|
||||
case fieldseeker.LayerZones:
|
||||
name = "LayerZones"
|
||||
rows, err := fssync.Zones(offset)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
||||
}
|
||||
inserts, updates, err = db.SaveOrUpdateZones(rows)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
||||
}
|
||||
unchanged = uint(len(rows)) - inserts - updates
|
||||
return ss, err
|
||||
case fieldseeker.LayerZones2:
|
||||
name = "LayerZones2"
|
||||
rows, err := fssync.Zones2(offset)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
||||
}
|
||||
inserts, updates, err = db.SaveOrUpdateZones2(rows)
|
||||
if err != nil {
|
||||
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
||||
}
|
||||
unchanged = uint(len(rows)) - inserts - updates
|
||||
default:
|
||||
return ss, errors.New("Unrecognized layer")
|
||||
}
|
||||
ss.Inserts = inserts
|
||||
ss.Updates = updates
|
||||
ss.Unchanged = unchanged
|
||||
return ss, err
|
||||
})
|
||||
}
|
||||
results, err := group.Wait()
|
||||
if err != nil {
|
||||
return stats, fmt.Errorf("one or more tasks in the work pool failed: %w", err)
|
||||
}
|
||||
for _, r := range results {
|
||||
stats.Inserts += r.Inserts
|
||||
stats.Updates += r.Updates
|
||||
stats.Unchanged += r.Unchanged
|
||||
}
|
||||
log.Info().Uint("inserts", stats.Inserts).Uint("updates", stats.Updates).Uint("no change", stats.Unchanged).Str("layer", layer.Name).Msg("Finished layer")
|
||||
return stats, nil
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue