diff --git a/arcgis.go b/arcgis.go index 2e609e4a..a11b3d58 100644 --- a/arcgis.go +++ b/arcgis.go @@ -351,9 +351,9 @@ func refreshFieldseekerData(ctx context.Context, newOauthCh <-chan struct{}) { } type SyncStats struct { - Inserts int - Updates int - Unchanged int + Inserts uint + Updates uint + Unchanged uint } func downloadAllRecords(ctx context.Context, fssync *fieldseeker.FieldSeeker, layer arcgis.LayerFeature, org_id int32) (SyncStats, error) { @@ -362,16 +362,16 @@ func downloadAllRecords(ctx context.Context, fssync *fieldseeker.FieldSeeker, la if err != nil { return stats, fmt.Errorf("Failed to get counts for layer %s (%d): %w", layer.Name, layer.ID, err) } - log.Info().Str("name", layer.Name).Int("id", layer.ID).Msg("Starting on layer") + log.Info().Str("name", layer.Name).Uint("id", layer.ID).Msg("Starting on layer") if count.Count == 0 { return stats, nil } pool := pond.NewResultPool[SyncStats](20) group := pool.NewGroup() - maxRecords := fssync.MaxRecordCount() - for offset := 0; offset < count.Count; offset += maxRecords { + maxRecords := uint(fssync.MaxRecordCount()) + for offset := uint(0); offset < uint(count.Count); offset += maxRecords { group.SubmitErr(func() (SyncStats, error) { - query := arcgis.NewQuery() + /*query := arcgis.NewQuery() query.ResultRecordCount = maxRecords query.ResultOffset = offset query.SpatialReference = "4326" @@ -383,6 +383,7 @@ func downloadAllRecords(ctx context.Context, fssync *fieldseeker.FieldSeeker, la if err != nil { return SyncStats{}, fmt.Errorf("Failed to get layer %s (%d) at offset %d: %w", layer.Name, layer.ID, offset, err) } + i, u, err := saveOrUpdateDBRecords(ctx, "FS_"+layer.Name, qr, org_id) if err != nil { filename := fmt.Sprintf("failure-%s-%d-%d.json", layer.Name, layer.ID, offset) @@ -395,6 +396,12 @@ func downloadAllRecords(ctx context.Context, fssync *fieldseeker.FieldSeeker, la Updates: u, Unchanged: len(qr.Features) - u - i, }, nil + */ + return SyncStats{ + Inserts: 0, + Updates: 0, + Unchanged: 0, + }, nil }) } results, err := group.Wait() @@ -406,7 +413,7 @@ func downloadAllRecords(ctx context.Context, fssync *fieldseeker.FieldSeeker, la stats.Updates += r.Updates stats.Unchanged += r.Unchanged } - log.Info().Int("inserts", stats.Inserts).Int("updates", stats.Updates).Int("no change", stats.Unchanged).Msg("Finished layer") + log.Info().Uint("inserts", stats.Inserts).Uint("updates", stats.Updates).Uint("no change", stats.Unchanged).Msg("Finished layer") return stats, nil } @@ -470,10 +477,13 @@ func exportFieldseekerData(ctx context.Context, org *models.Organization, oauth return fmt.Errorf("Failed to create fssync: %w", err) } var stats SyncStats - for _, layer := range fssync.FeatureServerLayers() { - ss, err := downloadAllRecords(ctx, fssync, layer, row.OrganizationID) + //layers := fssync.FeatureServerLayers() + + var ss SyncStats + for _, l := range fssync.FeatureServerLayers() { + ss, err = exportFieldseekerLayer(ctx, org, fssync, l) if err != nil { - return fmt.Errorf("Failed to get layer %s: %w", layer, err) + return err } stats.Inserts += ss.Inserts stats.Updates += ss.Updates @@ -697,6 +707,7 @@ func saveResponse(data []byte, filename string) { log.Info().Str("filename", filename).Msg("Wrote response") } +/* func saveRawQuery(fssync *fieldseeker.FieldSeeker, layer arcgis.LayerFeature, query *arcgis.Query, filename string) { output, err := os.Create(filename) if err != nil { @@ -717,6 +728,7 @@ func saveRawQuery(fssync *fieldseeker.FieldSeeker, layer arcgis.LayerFeature, qu } log.Info().Str("filename", filename).Msg("Wrote failed query") } +*/ func saveOrUpdateDBRecords(ctx context.Context, table string, qr *arcgis.QueryResult, org_id int32) (int, int, error) { inserts, updates := 0, 0 @@ -1100,3 +1112,541 @@ func updateSummaryTables(ctx context.Context, org *models.Organization) { } } } + +func exportFieldseekerLayer(ctx context.Context, org *models.Organization, fssync *fieldseeker.FieldSeeker, layer arcgis.LayerFeature) (SyncStats, error) { + var stats SyncStats + count, err := fssync.QueryCount(layer.ID) + if err != nil { + return stats, fmt.Errorf("Failed to get counts for layer %s (%d): %w", layer.Name, layer.ID, err) + } + if count.Count == 0 { + log.Info().Str("name", layer.Name).Uint("id", layer.ID).Msg("No records to download") + return stats, nil + } + log.Info().Str("name", layer.Name).Uint("id", layer.ID).Msg("Starting on layer") + pool := pond.NewResultPool[SyncStats](20) + group := pool.NewGroup() + maxRecords := uint(fssync.MaxRecordCount()) + l, err := fieldseeker.NameToLayerType(layer.Name) + if err != nil { + return stats, fmt.Errorf("Failed to get layer for '%s': %w", layer.Name, err) + } + for offset := uint(0); offset < uint(count.Count); offset += maxRecords { + group.SubmitErr(func() (SyncStats, error) { + var ss SyncStats + var name string + var inserts, unchanged, updates uint + var err error + switch l { + case fieldseeker.LayerAerialSpraySession: + name = "AerialSpraySession" + rows, err := fssync.AerialSpraySession(offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdateAerialSpraySession(rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + return ss, err + case fieldseeker.LayerAerialSprayLine: + name = "LayerAerialSprayLine" + rows, err := fssync.AerialSprayLine(offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdateAerialSprayLine(rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + return ss, err + case fieldseeker.LayerBarrierSpray: + name = "LayerBarrierSpray" + rows, err := fssync.BarrierSpray(offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdateBarrierSpray(rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + return ss, err + case fieldseeker.LayerBarrierSprayRoute: + name = "LayerBarrierSprayRoute" + rows, err := fssync.BarrierSprayRoute(offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdateBarrierSprayRoute(rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + return ss, err + case fieldseeker.LayerContainerRelate: + name = "LayerContainerRelate" + rows, err := fssync.ContainerRelate(offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdateContainerRelate(rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + return ss, err + case fieldseeker.LayerFieldScoutingLog: + name = "LayerFieldScoutingLog" + rows, err := fssync.FieldScoutingLog(offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdateFieldScoutingLog(rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + return ss, err + case fieldseeker.LayerHabitatRelate: + name = "LayerHabitatRelate" + rows, err := fssync.HabitatRelate(offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdateHabitatRelate(rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + return ss, err + case fieldseeker.LayerInspectionSample: + name = "LayerInspectionSample" + rows, err := fssync.InspectionSample(offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdateInspectionSample(rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + return ss, err + case fieldseeker.LayerInspectionSampleDetail: + name = "LayerInspectionSampleDetail" + rows, err := fssync.InspectionSampleDetail(offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdateInspectionSampleDetail(rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + return ss, err + case fieldseeker.LayerLandingCount: + name = "LayerLandingCount" + rows, err := fssync.LandingCount(offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdateLandingCount(rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + return ss, err + case fieldseeker.LayerLandingCountLocation: + name = "LayerLandingCountLocation" + rows, err := fssync.LandingCountLocation(offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdateLandingCountLocation(rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + return ss, err + case fieldseeker.LayerLineLocation: + name = "LayerLineLocation" + rows, err := fssync.LineLocation(offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdateLineLocation(rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + return ss, err + case fieldseeker.LayerLocationTracking: + name = "LayerLocationTracking" + rows, err := fssync.LocationTracking(offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdateLocationTracking(rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + return ss, err + case fieldseeker.LayerMosquitoInspection: + name = "LayerMosquitoInspection" + rows, err := fssync.MosquitoInspection(offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdateMosquitoInspection(rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + return ss, err + case fieldseeker.LayerOfflineMapAreas: + name = "LayerOfflineMapAreas" + rows, err := fssync.OfflineMapAreas(offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdateOfflineMapAreas(rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + return ss, err + case fieldseeker.LayerProposedTreatmentArea: + name = "LayerProposedTreatmentArea" + rows, err := fssync.ProposedTreatmentArea(offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdateProposedTreatmentArea(rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + return ss, err + case fieldseeker.LayerPointLocation: + name = "LayerPointLocation" + rows, err := fssync.PointLocation(offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdatePointLocation(rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + return ss, err + case fieldseeker.LayerPolygonLocation: + name = "LayerPolygonLocation" + rows, err := fssync.PolygonLocation(offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdatePolygonLocation(rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + return ss, err + case fieldseeker.LayerPoolDetail: + name = "LayerPoolDetail" + rows, err := fssync.PoolDetail(offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdatePoolDetail(rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + return ss, err + case fieldseeker.LayerPool: + name = "LayerPool" + rows, err := fssync.Pool(offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdatePool(rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + return ss, err + case fieldseeker.LayerPoolBuffer: + name = "LayerPoolBuffer" + rows, err := fssync.PoolBuffer(offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdatePoolBuffer(rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + return ss, err + case fieldseeker.LayerQALarvCount: + name = "LayerQALarvCount" + rows, err := fssync.QALarvCount(offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdateQALarvCount(rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + return ss, err + case fieldseeker.LayerQAMosquitoInspection: + name = "LayerQAMosquitoInspection" + rows, err := fssync.QAMosquitoInspection(offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdateQAMosquitoInspection(rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + return ss, err + case fieldseeker.LayerQAProductObservation: + name = "LayerQAProductObservation" + rows, err := fssync.QAProductObservation(offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdateQAProductObservation(rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + return ss, err + case fieldseeker.LayerRestrictedArea: + name = "LayerRestrictedArea" + rows, err := fssync.RestrictedArea(offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdateRestrictedArea(rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + return ss, err + case fieldseeker.LayerRodentInspection: + name = "LayerRodentInspection" + rows, err := fssync.RodentInspection(offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdateRodentInspection(rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + return ss, err + case fieldseeker.LayerRodentLocation: + name = "LayerRodentLocation" + rows, err := fssync.RodentLocation(offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdateRodentLocation(ctx, org, rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + return ss, err + case fieldseeker.LayerSampleCollection: + name = "LayerSampleCollection" + rows, err := fssync.SampleCollection(offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdateSampleCollection(rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + return ss, err + case fieldseeker.LayerSampleLocation: + name = "LayerSampleLocation" + rows, err := fssync.SampleLocation(offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdateSampleLocation(rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + return ss, err + case fieldseeker.LayerServiceRequest: + name = "LayerServiceRequest" + rows, err := fssync.ServiceRequest(offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdateServiceRequest(rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + return ss, err + case fieldseeker.LayerSpeciesAbundance: + name = "LayerSpeciesAbundance" + rows, err := fssync.SpeciesAbundance(offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdateSpeciesAbundance(rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + return ss, err + case fieldseeker.LayerStormDrain: + name = "LayerStormDrain" + rows, err := fssync.StormDrain(offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdateStormDrain(rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + return ss, err + case fieldseeker.LayerTracklog: + name = "LayerTracklog" + rows, err := fssync.Tracklog(offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdateTracklog(rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + return ss, err + case fieldseeker.LayerTrapLocation: + name = "LayerTrapLocation" + rows, err := fssync.TrapLocation(offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdateTrapLocation(rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + return ss, err + case fieldseeker.LayerTrapData: + name = "LayerTrapData" + rows, err := fssync.TrapData(offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdateTrapData(rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + return ss, err + case fieldseeker.LayerTimeCard: + name = "LayerTimeCard" + rows, err := fssync.TimeCard(offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdateTimeCard(rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + return ss, err + case fieldseeker.LayerTreatment: + name = "LayerTreatment" + rows, err := fssync.Treatment(offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdateTreatment(rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + return ss, err + case fieldseeker.LayerTreatmentArea: + name = "LayerTreatmentArea" + rows, err := fssync.TreatmentArea(offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdateTreatmentArea(rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + return ss, err + case fieldseeker.LayerULVSprayRoute: + name = "LayerULVSprayRoute" + rows, err := fssync.ULVSprayRoute(offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdateULVSprayRoute(rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + return ss, err + case fieldseeker.LayerZones: + name = "LayerZones" + rows, err := fssync.Zones(offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdateZones(rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + return ss, err + case fieldseeker.LayerZones2: + name = "LayerZones2" + rows, err := fssync.Zones2(offset) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err) + } + inserts, updates, err = db.SaveOrUpdateZones2(rows) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err) + } + unchanged = uint(len(rows)) - inserts - updates + default: + return ss, errors.New("Unrecognized layer") + } + ss.Inserts = inserts + ss.Updates = updates + ss.Unchanged = unchanged + return ss, err + }) + } + results, err := group.Wait() + if err != nil { + return stats, fmt.Errorf("one or more tasks in the work pool failed: %w", err) + } + for _, r := range results { + stats.Inserts += r.Inserts + stats.Updates += r.Updates + stats.Unchanged += r.Unchanged + } + log.Info().Uint("inserts", stats.Inserts).Uint("updates", stats.Updates).Uint("no change", stats.Unchanged).Str("layer", layer.Name).Msg("Finished layer") + return stats, nil +}