nidus-sync/arcgis.go
Eli Ribble 1395e3d3ac Remove old FieldSeeker tables, use v2 generated tables.
This requires a bunch of changes since the types on these tables are
much closer to the underlying types of the Fieldseeker data we are
getting back from the API.

I now need to use proper UUID types everywhere, which means I had to
modify the bob gen config to consistently use google UUID, my UUID
library of choice.

I also had to add the organization_id to all the fieldseeker tables
since we rely on them existing for some of our compound queries.

There were some changes to the API type signatures to get things to
build. I may yet regret those.
2025-12-24 17:58:08 -07:00

1624 lines
56 KiB
Go

package main
import (
"bytes"
"context"
"crypto/rand"
"crypto/sha256"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"sync"
"time"
"github.com/Gleipnir-Technology/arcgis-go"
"github.com/Gleipnir-Technology/arcgis-go/fieldseeker"
"github.com/Gleipnir-Technology/nidus-sync/db"
"github.com/Gleipnir-Technology/nidus-sync/db/enums"
"github.com/Gleipnir-Technology/nidus-sync/db/models"
"github.com/Gleipnir-Technology/nidus-sync/db/sql"
"github.com/aarondl/opt/omit"
"github.com/aarondl/opt/omitnull"
"github.com/alitto/pond/v2"
"github.com/jackc/pgx/v5"
"github.com/rs/zerolog/log"
"github.com/stephenafamo/bob"
"github.com/stephenafamo/bob/dialect/psql"
"github.com/stephenafamo/bob/dialect/psql/dialect"
"github.com/stephenafamo/bob/dialect/psql/im"
"github.com/uber/h3-go/v4"
)
// InvalidatedTokenError is returned when the ArcGIS API responds that the
// token has been invalidated server-side (see handleTokenRequest, which maps
// error code 498 / "invalidated refresh_token" to this type).
type InvalidatedTokenError struct{}

func (e InvalidatedTokenError) Error() string { return "The token has been invalidated by the server" }

// NoOAuthForOrg is returned when an organization has no user with a
// non-invalidated oauth token (see getOAuthForOrg).
type NoOAuthForOrg struct{}

func (e NoOAuthForOrg) Error() string { return "No oauth available for organization" }
// NewOAuthTokenChannel signals the background refresh worker
// (refreshFieldseekerData) that a new oauth token was stored and its
// per-token/per-org goroutines should be restarted.
var NewOAuthTokenChannel chan struct{}

// CodeVerifier is the PKCE code verifier.
// NOTE(review): this is a hard-coded placeholder; a real verifier should be
// generated per authorization (see generateCodeVerifier) and kept in the
// user's session — confirm before enabling the commented-out PKCE
// parameters in buildArcGISAuthURL.
var CodeVerifier string = "random_secure_string_min_43_chars_long_should_be_stored_in_session"

// OAuthTokenResponse models the JSON body returned by the ArcGIS oauth2
// token endpoint.
type OAuthTokenResponse struct {
	AccessToken           string `json:"access_token"`
	ExpiresIn             int    `json:"expires_in"` // seconds until AccessToken expires
	RefreshToken          string `json:"refresh_token"`
	RefreshTokenExpiresIn int    `json:"refresh_token_expires_in"` // seconds until RefreshToken expires
	SSL                   bool   `json:"ssl"`
	Username              string `json:"username"`
}
// Build the ArcGIS authorization URL with PKCE
func buildArcGISAuthURL(clientID string) string {
baseURL := "https://www.arcgis.com/sharing/rest/oauth2/authorize/"
params := url.Values{}
params.Add("client_id", clientID)
params.Add("redirect_uri", redirectURL())
params.Add("response_type", "code")
//params.Add("code_challenge", generateCodeChallenge(codeVerifier))
//params.Add("code_challenge_method", "S256")
// See https://developers.arcgis.com/rest/users-groups-and-items/token/
// expiration is defined in minutes
var expiration int
if IsProductionEnvironment() {
// 2 weeks is the maximum allowed
expiration = 20160
} else {
expiration = 20
}
params.Add("expiration", strconv.Itoa(expiration))
return baseURL + "?" + params.Encode()
}
// downloadFieldseekerSchema fetches the schema JSON for every FieldSeeker
// feature-server layer and writes each one to
// FieldseekerSchemaDirectory/<arcgis_id>/<layer name>.json.
// Errors are logged; a failed write skips to the next layer, while
// directory-creation or schema-fetch failures abort the whole download.
func downloadFieldseekerSchema(ctx context.Context, fieldseekerClient *fieldseeker.FieldSeeker, arcgis_id string) {
	// The target directory is the same for every layer; create it once
	// instead of once per loop iteration as before.
	dir := filepath.Join(FieldseekerSchemaDirectory, arcgis_id)
	if err := os.MkdirAll(dir, os.ModePerm); err != nil {
		log.Error().Err(err).Msg("Failed to create parent directory")
		return
	}
	for _, layer := range fieldseekerClient.FeatureServerLayers() {
		schema, err := fieldseekerClient.Schema(layer.ID)
		if err != nil {
			log.Error().Err(err).Msg("Failed to get schema")
			return
		}
		// os.WriteFile creates/truncates and closes the file within this
		// iteration; the original deferred every Close to function exit,
		// leaking one descriptor per layer until the loop finished.
		path := filepath.Join(dir, layer.Name+".json")
		if err := os.WriteFile(path, schema, 0o666); err != nil {
			log.Error().Err(err).Msg("Failed to write schema file")
			continue
		}
	}
}
// futureUTCTimestamp returns the UTC instant secondsFromNow seconds from now.
func futureUTCTimestamp(secondsFromNow int) time.Time {
	offset := time.Duration(secondsFromNow) * time.Second
	return time.Now().UTC().Add(offset)
}
// generateCodeChallenge derives the PKCE S256 code challenge from a
// verifier: the unpadded base64url encoding of its SHA-256 digest.
func generateCodeChallenge(codeVerifier string) string {
	digest := sha256.Sum256([]byte(codeVerifier))
	challenge := base64.RawURLEncoding.EncodeToString(digest[:])
	return challenge
}
// generateCodeVerifier returns a random PKCE code verifier: 64 bytes of
// cryptographic randomness encoded as unpadded base64url (86 characters).
func generateCodeVerifier() string {
	buf := make([]byte, 64) // 64 bytes = 512 bits
	// crypto/rand.Read is documented to always succeed on supported
	// platforms, but the original silently discarded the error; a failed
	// read would have produced a predictable (all-zero) verifier.
	if _, err := rand.Read(buf); err != nil {
		panic(fmt.Sprintf("crypto/rand failed: %v", err))
	}
	return base64.RawURLEncoding.EncodeToString(buf)
}
// updateArcgisUserData queries ArcGIS for the user's portal data, finds or
// creates the matching organization and attaches the user to it, locates the
// org's FieldSeekerGIS service, and kicks off webhook/schema setup. It runs
// as a goroutine after a successful oauth exchange; failures are logged, not
// returned. On completion it notifies the refresh worker via
// NewOAuthTokenChannel.
func updateArcgisUserData(ctx context.Context, user *models.User, access_token string, access_token_expires time.Time, refresh_token string, refresh_token_expires time.Time) {
	client := arcgis.NewArcGIS(
		arcgis.AuthenticatorOAuth{
			AccessToken:         access_token,
			AccessTokenExpires:  access_token_expires,
			RefreshToken:        refresh_token,
			RefreshTokenExpires: refresh_token_expires,
		},
	)
	portal, err := client.PortalsSelf()
	if err != nil {
		log.Error().Err(err).Msg("Failed to get ArcGIS user data")
		return
	}
	log.Info().Str("Username", portal.User.Username).Str("user_id", portal.User.ID).Str("org_id", portal.User.OrgID).Str("org_name", portal.Name).Str("license_type_id", portal.User.UserLicenseTypeID).Msg("Got portals data")
	_, err = sql.UpdateOauthTokenOrg(portal.User.ID, portal.User.UserLicenseTypeID, refresh_token).Exec(ctx, db.PGInstance.BobDB)
	if err != nil {
		log.Error().Err(err).Msg("Failed to update oauth token portal data")
		return
	}
	// Find or create the organization matching the portal name.
	// NOTE: the original only inspected the query error after already
	// branching on len(orgs), which could create a duplicate org when the
	// query failed; check the error before using the results.
	orgs, err := models.Organizations.Query(models.SelectWhere.Organizations.ArcgisName.EQ(portal.Name)).All(ctx, db.PGInstance.BobDB)
	if err != nil {
		LogErrorTypeInfo(err)
		if !errors.Is(err, pgx.ErrNoRows) {
			log.Error().Err(err).Msg("Failed to query for existing org")
			return
		}
	}
	var org *models.Organization
	switch len(orgs) {
	case 0:
		setter := models.OrganizationSetter{
			Name:       omitnull.From(portal.Name),
			ArcgisID:   omitnull.From(portal.User.OrgID),
			ArcgisName: omitnull.From(portal.Name),
		}
		org, err = models.Organizations.Insert(&setter).One(ctx, db.PGInstance.BobDB)
		if err != nil {
			log.Error().Err(err).Msg("Failed to create new organization")
			return
		}
		log.Info().Int("org_id", int(org.ID)).Msg("Created new organization")
	case 1:
		org = orgs[0]
		log.Info().Msg("Organization already exists")
	default:
		log.Error().Msg("Got too many organizations, bailing")
		return
	}
	err = org.AttachUser(ctx, db.PGInstance.BobDB, user)
	if err != nil {
		log.Error().Err(err).Int("user_id", int(user.ID)).Int("org_id", int(org.ID)).Msg("Failed to attach user to organization")
		return
	}
	search, err := client.Search("Fieldseeker")
	if err != nil {
		log.Error().Err(err).Msg("Failed to get search FieldseekerGIS data")
		return
	}
	// Record the org's FieldSeeker URL and build a client for it, if found.
	var fieldseekerClient *fieldseeker.FieldSeeker
	for _, result := range search.Results {
		log.Info().Str("name", result.Name).Msg("Got result")
		if result.Name != "FieldSeekerGIS" {
			continue
		}
		log.Info().Str("url", result.URL).Msg("Found Fieldseeker")
		setter := models.OrganizationSetter{
			FieldseekerURL: omitnull.From(result.URL),
		}
		err = org.Update(ctx, db.PGInstance.BobDB, &setter)
		if err != nil {
			log.Error().Err(err).Msg("Failed to create new organization")
			return
		}
		fieldseekerClient, err = fieldseeker.NewFieldSeeker(
			client,
			result.URL,
		)
		if err != nil {
			log.Error().Err(err).Msg("Failed to create fieldseeker client")
			return
		}
	}
	arcgis_id, ok := org.ArcgisID.Get()
	if !ok {
		// The original fell through with a zero-value ID here; skip the
		// fieldseeker setup but still notify listeners below.
		log.Error().Int("org.id", int(org.ID)).Msg("Cannot get webhooks - ArcGIS ID is null")
	} else if fieldseekerClient == nil {
		// No FieldSeekerGIS search result: the original would have
		// dereferenced a nil client here and panicked.
		log.Error().Int("org.id", int(org.ID)).Msg("No FieldSeekerGIS service found for organization")
	} else {
		client.Context = &arcgis_id
		maybeCreateWebhook(ctx, fieldseekerClient)
		downloadFieldseekerSchema(ctx, fieldseekerClient, arcgis_id)
	}
	clearNotificationsOauth(ctx, user)
	NewOAuthTokenChannel <- struct{}{}
}
// maybeCreateWebhook inspects the FieldSeeker webhook list, logging whether
// the "Nidus Sync" hook already exists. Despite the name it does not yet
// create anything. Errors are logged, not returned.
func maybeCreateWebhook(ctx context.Context, client *fieldseeker.FieldSeeker) {
	// Defensive guard: callers may not have found a FieldSeeker service.
	if client == nil {
		log.Error().Msg("No fieldseeker client available, skipping webhook check")
		return
	}
	webhooks, err := client.WebhookList()
	if err != nil {
		log.Error().Err(err).Msg("Failed to get webhooks")
		// The original fell through and iterated anyway; bail instead.
		return
	}
	for _, hook := range webhooks {
		if hook.Name == "Nidus Sync" {
			log.Info().Msg("Found nidus sync hook")
		} else {
			log.Info().Str("name", hook.Name).Msg("Found webhook")
		}
	}
}
// handleOauthAccessCode exchanges an oauth authorization code for tokens,
// persists them for the user, and asynchronously enriches the user's
// ArcGIS organization data in the background.
func handleOauthAccessCode(ctx context.Context, user *models.User, code string) error {
	const tokenEndpoint = "https://www.arcgis.com/sharing/rest/oauth2/token/"
	payload := url.Values{
		"grant_type":   []string{"authorization_code"},
		"code":         []string{code},
		"client_id":    []string{ClientID},
		"redirect_uri": []string{redirectURL()},
	}
	request, err := http.NewRequest("POST", tokenEndpoint, strings.NewReader(payload.Encode()))
	if err != nil {
		return fmt.Errorf("Failed to create request: %w", err)
	}
	request.Header.Add("Content-Type", "application/x-www-form-urlencoded")
	token, err := handleTokenRequest(ctx, request)
	if err != nil {
		return fmt.Errorf("Failed to exchange authorization code for token: %w", err)
	}
	accessExpires := futureUTCTimestamp(token.ExpiresIn)
	refreshExpires := futureUTCTimestamp(token.RefreshTokenExpiresIn)
	setter := models.OauthTokenSetter{
		AccessToken:         omit.From(token.AccessToken),
		AccessTokenExpires:  omit.From(accessExpires),
		RefreshToken:        omit.From(token.RefreshToken),
		RefreshTokenExpires: omit.From(refreshExpires),
		Username:            omit.From(token.Username),
	}
	if err = user.InsertUserOauthTokens(ctx, db.PGInstance.BobDB, &setter); err != nil {
		return fmt.Errorf("Failed to save token to database: %w", err)
	}
	// Enrichment needs several API round trips, so run it detached from the
	// request's context.
	go updateArcgisUserData(context.Background(), user, token.AccessToken, accessExpires, token.RefreshToken, refreshExpires)
	return nil
}
// hasFieldseekerConnection reports whether the user has at least one stored
// oauth token row.
func hasFieldseekerConnection(ctx context.Context, user *models.User) (bool, error) {
	tokens, err := sql.OauthTokenByUserId(user.ID).All(ctx, db.PGInstance.BobDB)
	if err != nil {
		return false, err
	}
	return len(tokens) != 0, nil
}
// redirectURL returns the oauth callback URL registered with ArcGIS for
// this deployment.
func redirectURL() string {
	const callbackPath = "/arcgis/oauth/callback"
	return BaseURL + callbackPath
}
// refreshFieldseekerData is a goroutine in charge of getting Fieldseeker
// data and keeping it fresh. Each cycle it spawns one token-maintenance
// goroutine per live oauth token and one exporter goroutine per
// organization, then blocks until either the process is shutting down
// (ctx done) or a new oauth token arrives on newOauthCh, which tears the
// workers down and starts a fresh cycle.
func refreshFieldseekerData(ctx context.Context, newOauthCh <-chan struct{}) {
	for {
		workerCtx, cancel := context.WithCancel(context.Background())
		var wg sync.WaitGroup
		oauths, err := models.OauthTokens.Query(models.SelectWhere.OauthTokens.InvalidatedAt.IsNull()).All(ctx, db.PGInstance.BobDB)
		if err != nil {
			log.Error().Err(err).Msg("Failed to get oauths")
			cancel() // the original leaked the worker context here
			return
		}
		for _, oauth := range oauths {
			oauth := oauth // per-iteration copy for the closure (pre-Go 1.22 loop semantics)
			wg.Add(1)
			go func() {
				defer wg.Done()
				err := maintainOAuth(workerCtx, oauth)
				if err != nil {
					markTokenFailed(ctx, oauth)
					if errors.Is(err, arcgis.InvalidatedRefreshTokenError) {
						log.Info().Int("oauth_token.id", int(oauth.ID)).Msg("Marked invalid by the server")
					} else {
						LogErrorTypeInfo(err)
						log.Error().Err(err).Msg("Crashed oauth maintenance goroutine")
					}
				}
			}()
		}
		orgs, err := models.Organizations.Query().All(ctx, db.PGInstance.BobDB)
		if err != nil {
			log.Error().Err(err).Msg("Failed to get orgs")
			// Stop and reap the oauth workers already started above.
			cancel()
			wg.Wait()
			return
		}
		for _, org := range orgs {
			org := org // per-iteration copy for the closure
			wg.Add(1)
			go func() {
				defer wg.Done()
				err := periodicallyExportFieldseeker(workerCtx, org)
				if err == nil {
					return
				}
				// Match by type with errors.As: the original compared two
				// distinct &NoOAuthForOrg{} pointers via errors.Is, and
				// pointer equality of zero-size allocations is unspecified.
				var noOauth *NoOAuthForOrg
				if errors.As(err, &noOauth) {
					log.Info().Int("organization_id", int(org.ID)).Msg("No oauth available for organization, exiting exporter.")
					return
				}
				log.Error().Err(err).Msg("Crashed fieldseeker export goroutine")
			}()
		}
		select {
		case <-ctx.Done():
			log.Info().Msg("Exiting refresh worker...")
			cancel()
			wg.Wait()
			return
		case <-newOauthCh:
			log.Info().Msg("Updating oauth background work")
			cancel()
			wg.Wait()
		}
	}
}
// SyncStats tallies the outcome of one Fieldseeker sync pass.
type SyncStats struct {
	Inserts   uint // rows newly created
	Updates   uint // rows that existed but had changed attributes
	Unchanged uint // rows that matched the incoming data exactly
}
// downloadAllRecords pages through every record of the given layer using a
// bounded worker pool (20 concurrent tasks, each fetching up to
// MaxRecordCount records) and aggregates per-page SyncStats.
//
// NOTE(review): the body of each task is currently commented out, so this
// always reports zero inserts/updates/unchanged — presumably awaiting the
// v2 generated-table migration described in the commit message; confirm
// before relying on its output.
func downloadAllRecords(ctx context.Context, fssync *fieldseeker.FieldSeeker, layer arcgis.LayerFeature, org_id int32) (SyncStats, error) {
	var stats SyncStats
	count, err := fssync.QueryCount(layer.ID)
	if err != nil {
		return stats, fmt.Errorf("Failed to get counts for layer %s (%d): %w", layer.Name, layer.ID, err)
	}
	log.Info().Str("name", layer.Name).Uint("id", layer.ID).Msg("Starting on layer")
	if count.Count == 0 {
		return stats, nil
	}
	// One task per page of results, bounded to 20 in flight.
	pool := pond.NewResultPool[SyncStats](20)
	group := pool.NewGroup()
	maxRecords := uint(fssync.MaxRecordCount())
	for offset := uint(0); offset < uint(count.Count); offset += maxRecords {
		group.SubmitErr(func() (SyncStats, error) {
			/*query := arcgis.NewQuery()
			query.ResultRecordCount = maxRecords
			query.ResultOffset = offset
			query.SpatialReference = "4326"
			query.OutFields = "*"
			query.Where = "1=1"
			qr, err := fssync.DoQuery(
				layer.ID,
				query)
			if err != nil {
				return SyncStats{}, fmt.Errorf("Failed to get layer %s (%d) at offset %d: %w", layer.Name, layer.ID, offset, err)
			}
			i, u, err := saveOrUpdateDBRecords(ctx, "FS_"+layer.Name, qr, org_id)
			if err != nil {
				filename := fmt.Sprintf("failure-%s-%d-%d.json", layer.Name, layer.ID, offset)
				saveRawQuery(fssync, layer, query, filename)
				log.Error().Err(err).Msg("Faild to save DB records")
				return SyncStats{}, fmt.Errorf("Failed to save records: %w", err)
			}
			return SyncStats{
				Inserts:   i,
				Updates:   u,
				Unchanged: len(qr.Features) - u - i,
			}, nil
			*/
			return SyncStats{
				Inserts:   0,
				Updates:   0,
				Unchanged: 0,
			}, nil
		})
	}
	// Wait for every page; any task error fails the whole layer.
	results, err := group.Wait()
	if err != nil {
		return stats, fmt.Errorf("one or more tasks in the work pool failed: %w", err)
	}
	for _, r := range results {
		stats.Inserts += r.Inserts
		stats.Updates += r.Updates
		stats.Unchanged += r.Unchanged
	}
	log.Info().Uint("inserts", stats.Inserts).Uint("updates", stats.Updates).Uint("no change", stats.Unchanged).Msg("Finished layer")
	return stats, nil
}
// getOAuthForOrg walks the organization's users and returns the first
// non-invalidated oauth token found, or NoOAuthForOrg when none exists.
func getOAuthForOrg(ctx context.Context, org *models.Organization) (*models.OauthToken, error) {
	members, err := org.User().All(ctx, db.PGInstance.BobDB)
	if err != nil {
		return nil, fmt.Errorf("Failed to query all users for org: %w", err)
	}
	for _, member := range members {
		tokens, err := member.UserOauthTokens(models.SelectWhere.OauthTokens.InvalidatedAt.IsNull()).All(ctx, db.PGInstance.BobDB)
		if err != nil {
			return nil, fmt.Errorf("Failed to query all oauth tokens for org: %w", err)
		}
		if len(tokens) > 0 {
			return tokens[0], nil
		}
	}
	return nil, &NoOAuthForOrg{}
}
// periodicallyExportFieldseeker exports the org's Fieldseeker data almost
// immediately on startup and then every 15 minutes after each successful
// export, until ctx is cancelled or an export fails.
func periodicallyExportFieldseeker(ctx context.Context, org *models.Organization) error {
	// A single reusable timer replaces the original pattern, which created a
	// time.NewTicker(1) — a 1ns-period ticker — and then leaked it (plus a
	// fresh 15-minute ticker per cycle) without ever calling Stop.
	timer := time.NewTimer(time.Nanosecond)
	defer timer.Stop()
	for {
		select {
		case <-ctx.Done():
			return nil
		case <-timer.C:
			oauth, err := getOAuthForOrg(ctx, org)
			if err != nil {
				return fmt.Errorf("Failed to get oauth for org: %w", err)
			}
			if err = exportFieldseekerData(ctx, org, oauth); err != nil {
				return fmt.Errorf("Failed to export Fieldseeker data: %w", err)
			}
			log.Info().Msg("Completed exporting data, waiting 15 minutes to go again.")
			timer.Reset(15 * time.Minute)
		}
	}
}
// exportFieldseekerData performs one full export for an organization: it
// builds a FieldSeeker client from the given oauth token, syncs every
// feature-server layer, records the aggregate sync stats, and refreshes the
// summary tables.
func exportFieldseekerData(ctx context.Context, org *models.Organization, oauth *models.OauthToken) error {
	log.Info().Msg("Update Fieldseeker data")
	client := arcgis.NewArcGIS(
		arcgis.AuthenticatorOAuth{
			AccessToken:         oauth.AccessToken,
			AccessTokenExpires:  oauth.AccessTokenExpires,
			RefreshToken:        oauth.RefreshToken,
			RefreshTokenExpires: oauth.RefreshTokenExpires,
		},
	)
	row, err := sql.OrgByOauthId(oauth.ID).One(ctx, db.PGInstance.BobDB)
	if err != nil {
		return fmt.Errorf("Failed to get org ID: %w", err)
	}
	fssync, err := fieldseeker.NewFieldSeeker(client, row.FieldseekerURL.MustGet())
	if err != nil {
		return fmt.Errorf("Failed to create fssync: %w", err)
	}
	// Sync every layer, accumulating totals as we go.
	var total SyncStats
	for _, layer := range fssync.FeatureServerLayers() {
		layerStats, err := exportFieldseekerLayer(ctx, org, fssync, layer)
		if err != nil {
			return err
		}
		total.Inserts += layerStats.Inserts
		total.Updates += layerStats.Updates
		total.Unchanged += layerStats.Unchanged
	}
	setter := models.FieldseekerSyncSetter{
		RecordsCreated:   omit.From(int32(total.Inserts)),
		RecordsUpdated:   omit.From(int32(total.Updates)),
		RecordsUnchanged: omit.From(int32(total.Unchanged)),
	}
	if err = org.InsertFieldseekerSyncs(ctx, db.PGInstance.BobDB, &setter); err != nil {
		return fmt.Errorf("Failed to insert sync: %w", err)
	}
	updateSummaryTables(ctx, org)
	return nil
}
// maintainOAuth keeps one oauth token alive: it re-reads the token from the
// database each cycle, then refreshes whichever of the access/refresh tokens
// expires first, a few seconds ahead of its expiry. Returns nil on context
// cancellation, or the first refresh error.
func maintainOAuth(ctx context.Context, oauth *models.OauthToken) error {
	for {
		// Refresh from the database
		oauth, err := models.FindOauthToken(ctx, db.PGInstance.BobDB, oauth.ID)
		if err != nil {
			return fmt.Errorf("Failed to update oauth token from database: %w", err)
		}
		// Aim 3 seconds before expiry; clamp to a minimum positive duration
		// because time.NewTimer panics on d <= 0 and the subtraction below
		// goes negative whenever expiry is imminent (the original computed
		// time.Until(...)-3s unguarded and could panic in NewTicker).
		accessTokenDelay := time.Until(oauth.AccessTokenExpires) - (3 * time.Second)
		if accessTokenDelay <= 0 {
			accessTokenDelay = time.Nanosecond
		}
		refreshTokenDelay := time.Until(oauth.RefreshTokenExpires) - (3 * time.Second)
		if refreshTokenDelay <= 0 {
			refreshTokenDelay = time.Nanosecond
		}
		log.Info().Int("id", int(oauth.ID)).Float64("seconds", accessTokenDelay.Seconds()).Msg("Need to refresh access token")
		log.Info().Int("id", int(oauth.ID)).Float64("seconds", refreshTokenDelay.Seconds()).Msg("Need to refresh refresh token")
		// Single-shot timers (stopped on every path) replace the original
		// tickers, which were never stopped and leaked one pair per loop.
		accessTimer := time.NewTimer(accessTokenDelay)
		refreshTimer := time.NewTimer(refreshTokenDelay)
		select {
		case <-ctx.Done():
			accessTimer.Stop()
			refreshTimer.Stop()
			return nil
		case <-accessTimer.C:
			refreshTimer.Stop()
			if err := refreshAccessToken(ctx, oauth); err != nil {
				return fmt.Errorf("Failed to refresh access token: %w", err)
			}
		case <-refreshTimer.C:
			accessTimer.Stop()
			if err := refreshRefreshToken(ctx, oauth); err != nil {
				return fmt.Errorf("Failed to maintain refresh token: %w", err)
			}
		}
	}
}
// markTokenFailed records that an oauth token is no longer usable by
// stamping InvalidatedAt, then notifies the owning user. Failures are
// logged rather than returned.
func markTokenFailed(ctx context.Context, oauth *models.OauthToken) {
	invalidated := models.OauthTokenSetter{
		InvalidatedAt: omitnull.From(time.Now()),
	}
	if err := oauth.Update(ctx, db.PGInstance.BobDB, &invalidated); err != nil {
		log.Error().Str("err", err.Error()).Msg("Failed to mark token failed")
	}
	owner, err := models.FindUser(ctx, db.PGInstance.BobDB, oauth.UserID)
	if err != nil {
		log.Error().Str("err", err.Error()).Msg("Failed to get oauth user")
		return
	}
	notifyOauthInvalid(ctx, owner)
	log.Info().Int("id", int(oauth.ID)).Msg("Marked oauth token invalid")
}
// refreshAccessToken obtains a fresh access token from ArcGIS using the
// stored refresh token and persists the new token and its expiry.
func refreshAccessToken(ctx context.Context, oauth *models.OauthToken) error {
	const tokenEndpoint = "https://www.arcgis.com/sharing/rest/oauth2/token/"
	payload := url.Values{
		"grant_type":    []string{"refresh_token"},
		"client_id":     []string{ClientID},
		"refresh_token": []string{oauth.RefreshToken},
	}
	request, err := http.NewRequest("POST", tokenEndpoint, strings.NewReader(payload.Encode()))
	if err != nil {
		return fmt.Errorf("Failed to create request: %w", err)
	}
	request.Header.Add("Content-Type", "application/x-www-form-urlencoded")
	token, err := handleTokenRequest(ctx, request)
	if err != nil {
		return fmt.Errorf("Failed to handle request: %w", err)
	}
	setter := models.OauthTokenSetter{
		AccessToken:        omit.From(token.AccessToken),
		AccessTokenExpires: omit.From(futureUTCTimestamp(token.ExpiresIn)),
		Username:           omit.From(token.Username),
	}
	if err = oauth.Update(ctx, db.PGInstance.BobDB, &setter); err != nil {
		return fmt.Errorf("Failed to update oauth in database: %w", err)
	}
	log.Info().Int("oauth token id", int(oauth.ID)).Msg("Updated oauth token")
	return nil
}
// refreshRefreshToken exchanges the current refresh token for a new one
// (grant_type exchange_refresh_token) and persists the new refresh token and
// its expiry.
func refreshRefreshToken(ctx context.Context, oauth *models.OauthToken) error {
	baseURL := "https://www.arcgis.com/sharing/rest/oauth2/token/"
	form := url.Values{
		"grant_type":    []string{"exchange_refresh_token"},
		"client_id":     []string{ClientID},
		"redirect_uri":  []string{redirectURL()},
		"refresh_token": []string{oauth.RefreshToken},
	}
	req, err := http.NewRequest("POST", baseURL, strings.NewReader(form.Encode()))
	if err != nil {
		return fmt.Errorf("Failed to create request: %w", err)
	}
	req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
	token, err := handleTokenRequest(ctx, req)
	if err != nil {
		return fmt.Errorf("Failed to handle request: %w", err)
	}
	// The response's expires_in describes the access token; the refresh
	// token's lifetime is refresh_token_expires_in. The original used
	// ExpiresIn here, which would mark the refresh token as expiring with
	// the (much shorter-lived) access token. Fall back to ExpiresIn when the
	// field is absent (zero) to preserve the previous behavior in that case.
	refreshSeconds := token.RefreshTokenExpiresIn
	if refreshSeconds == 0 {
		refreshSeconds = token.ExpiresIn
	}
	setter := models.OauthTokenSetter{
		RefreshToken:        omit.From(token.RefreshToken),
		RefreshTokenExpires: omit.From(futureUTCTimestamp(refreshSeconds)),
		Username:            omit.From(token.Username),
	}
	if err = oauth.Update(ctx, db.PGInstance.BobDB, &setter); err != nil {
		return fmt.Errorf("Failed to update oauth in database: %w", err)
	}
	log.Info().Int("oauth token id", int(oauth.ID)).Msg("Updated oauth token")
	return nil
}
// newTimestampedFilename builds prefix + "YYYYMMDD_HHMMSS" + suffix using
// the current local time.
func newTimestampedFilename(prefix, suffix string) string {
	stamp := time.Now().Format("20060102_150405")
	return fmt.Sprintf("%s%s%s", prefix, stamp, suffix)
}
// handleTokenRequest executes an oauth2 token request and decodes the
// response, surfacing ArcGIS-style JSON error payloads — which can arrive
// even with a 200 status — as Go errors. A 498 "invalidated refresh_token"
// response is mapped to InvalidatedTokenError.
//
// NOTE(review): the raw response body (which contains credentials) is
// persisted to disk via saveResponse and the tokens are logged below —
// confirm this debugging aid is acceptable before production use.
func handleTokenRequest(ctx context.Context, req *http.Request) (*OAuthTokenResponse, error) {
	client := http.Client{}
	// Honor caller cancellation/deadlines; the original ignored ctx.
	req = req.WithContext(ctx)
	log.Info().Str("url", req.URL.String()).Msg("POST")
	resp, err := client.Do(req)
	if err != nil {
		return nil, fmt.Errorf("Failed to do request: %w", err)
	}
	defer resp.Body.Close()
	bodyBytes, err := io.ReadAll(resp.Body)
	if err != nil {
		// The original only checked this error in the >=400 branch, letting
		// a partial body reach json.Unmarshal on success statuses.
		return nil, fmt.Errorf("Got status code %d and failed to read response body: %w", resp.StatusCode, err)
	}
	log.Info().Int("status", resp.StatusCode).Msg("Token request")
	saveResponse(bodyBytes, newTimestampedFilename("token", ".json"))
	if resp.StatusCode >= http.StatusBadRequest {
		var errorResp arcgis.ErrorResponse
		if err := json.Unmarshal(bodyBytes, &errorResp); err == nil {
			if errorResp.Error.Code == 498 && errorResp.Error.Description == "invalidated refresh_token" {
				return nil, InvalidatedTokenError{}
			}
			// Wrap via AsError(); the original passed errorResp itself to %w,
			// which is only valid for values implementing error.
			return nil, fmt.Errorf("API response JSON error: %d: %w", resp.StatusCode, errorResp.AsError())
		}
		return nil, fmt.Errorf("API returned error status %d: %s", resp.StatusCode, string(bodyBytes))
	}
	var tokenResponse OAuthTokenResponse
	if err = json.Unmarshal(bodyBytes, &tokenResponse); err != nil {
		return nil, fmt.Errorf("Failed to unmarshal JSON: %w", err)
	}
	// Just because we got a 200-level status code doesn't mean it worked.
	// Experience has taught us that we can get errors without anything
	// indicated in the headers or the status code.
	if tokenResponse == (OAuthTokenResponse{}) {
		var errorResponse arcgis.ErrorResponse
		if err = json.Unmarshal(bodyBytes, &errorResponse); err != nil {
			return nil, fmt.Errorf("Failed to unmarshal error JSON: %w", err)
		}
		if errorResponse.Error.Code > 0 {
			return nil, errorResponse.AsError()
		}
	}
	log.Info().Str("refresh token", tokenResponse.RefreshToken).Str("access token", tokenResponse.AccessToken).Int("access expires", tokenResponse.ExpiresIn).Int("refresh expires", tokenResponse.RefreshTokenExpiresIn).Msg("Oauth token acquired")
	return &tokenResponse, nil
}
// logResponseHeaders dumps the status line and every header of resp for
// debugging. Safe to call with a nil response.
func logResponseHeaders(resp *http.Response) {
	if resp == nil {
		log.Info().Msg("Response is nil")
		return
	}
	log.Info().Str("status", resp.Status).Int("statusCode", resp.StatusCode).Msg("HTTP Response headers")
	for key, vals := range resp.Header {
		log.Info().Str("name", key).Strs("values", vals).Msg("Header")
	}
}
// saveResponse writes data to filename for later debugging. Failures are
// logged, not returned.
// NOTE(review): callers pass token responses containing credentials —
// confirm that persisting them to disk is acceptable.
func saveResponse(data []byte, filename string) {
	// os.WriteFile creates/truncates, writes, and closes in one call; the
	// original os.Create'd a file it never closed, leaking the descriptor.
	if err := os.WriteFile(filename, data, 0o666); err != nil {
		log.Error().Str("filename", filename).Str("err", err.Error()).Msg("Failed to write")
		return
	}
	log.Info().Str("filename", filename).Msg("Wrote response")
}
/*
func saveRawQuery(fssync *fieldseeker.FieldSeeker, layer arcgis.LayerFeature, query *arcgis.Query, filename string) {
output, err := os.Create(filename)
if err != nil {
log.Error().Str("filename", filename).Msg("Failed to create file")
return
}
qr, err := fssync.DoQueryRaw(
layer.ID,
query)
if err != nil {
log.Error().Str("err", err.Error()).Msg("Failed to do query")
return
}
_, err = output.Write(qr)
if err != nil {
log.Error().Str("err", err.Error()).Msg("Failed to write results")
return
}
log.Info().Str("filename", filename).Msg("Wrote failed query")
}
*/
// saveOrUpdateDBRecords upserts every feature in qr into the given table:
// features without an existing row are inserted, features whose stored row
// differs are updated (with a history entry), and matching rows are left
// alone. Returns the number of inserts and updates performed.
func saveOrUpdateDBRecords(ctx context.Context, table string, qr *arcgis.QueryResult, org_id int32) (int, int, error) {
	inserts, updates := 0, 0
	// Sort column names so the generated SQL is deterministic across runs.
	sorted_columns := make([]string, 0, len(qr.Fields))
	for _, f := range qr.Fields {
		sorted_columns = append(sorted_columns, f.Name)
	}
	sort.Strings(sorted_columns)
	// Collect the batch's OBJECTIDs so existing rows can be fetched in one query.
	// NOTE(review): the type assertion panics if OBJECTID is missing or not
	// decoded as float64 — presumably guaranteed by the ArcGIS JSON decoding;
	// confirm.
	objectids := make([]int, 0)
	for _, l := range qr.Features {
		oid := l.Attributes["OBJECTID"].(float64)
		objectids = append(objectids, int(oid))
	}
	rows_by_objectid, err := rowmapViaQuery(ctx, table, sorted_columns, objectids)
	if err != nil {
		return inserts, updates, fmt.Errorf("Failed to get existing rows: %w", err)
	}
	// log.Println("Rows from query", len(rows_by_objectid))
	for _, feature := range qr.Features {
		oid := feature.Attributes["OBJECTID"].(float64)
		row := rows_by_objectid[int(oid)]
		// If we have no matching row we'll need to create it
		if len(row) == 0 {
			if err := insertRowFromFeature(ctx, table, sorted_columns, &feature, org_id); err != nil {
				return inserts, updates, fmt.Errorf("Failed to insert row: %w", err)
			}
			inserts += 1
		} else if hasUpdates(row, feature) {
			if err := updateRowFromFeature(ctx, table, sorted_columns, &feature, org_id); err != nil {
				return inserts, updates, fmt.Errorf("Failed to update row: %w", err)
			}
			updates += 1
		}
	}
	return inserts, updates, nil
}
// rowmapViaQuery produces a map of OBJECTID to a 'row', which is in turn a
// map of (lowercase, as returned by Postgres) column names to their values
// rendered as strings via fmt's %v. NULLs are rendered as the empty string.
// Only rows whose OBJECTID is in objectids are fetched.
func rowmapViaQuery(ctx context.Context, table string, sorted_columns []string, objectids []int) (map[int]map[string]string, error) {
	result := make(map[int]map[string]string)
	query := selectAllFromQueryResult(table, sorted_columns)
	args := pgx.NamedArgs{
		"objectids": objectids,
	}
	rows, err := db.PGInstance.PGXPool.Query(ctx, query, args)
	if err != nil {
		return result, fmt.Errorf("Failed to query rows: %w", err)
	}
	defer rows.Close()
	// +2 for geometry_x and geometry_y.
	// NOTE(review): columnNames is built but never used below — the scan
	// relies on row.FieldDescriptions() instead; looks like dead code.
	columnNames := make([]string, len(sorted_columns)+2)
	for i, c := range sorted_columns {
		columnNames[i] = c
	}
	columnNames[len(sorted_columns)] = "geometry_x"
	columnNames[len(sorted_columns)+1] = "geometry_y"
	// Scan every column of every row generically into strings keyed by the
	// column name Postgres reports.
	rowSlice, err := pgx.CollectRows(rows, func(row pgx.CollectableRow) (map[string]string, error) {
		fieldDescriptions := row.FieldDescriptions()
		values := make([]interface{}, len(fieldDescriptions))
		valuePtrs := make([]interface{}, len(fieldDescriptions))
		for i := range values {
			valuePtrs[i] = &values[i]
		}
		if err := row.Scan(valuePtrs...); err != nil {
			return nil, err
		}
		result := make(map[string]string)
		for i, fd := range fieldDescriptions {
			if values[i] != nil {
				result[fd.Name] = fmt.Sprintf("%v", values[i])
				//log.Printf("col %v type %T val %v", fd.Name, values[i], values[i])
			} else {
				result[fd.Name] = ""
			}
		}
		return result, nil
	})
	if err != nil {
		return result, fmt.Errorf("Failed to collect rows: %w", err)
	}
	// Key each collected row by its (stringified) objectid column.
	for _, row := range rowSlice {
		o := row["objectid"]
		objectid, err := strconv.Atoi(o)
		if err != nil {
			return result, fmt.Errorf("Failed to parse objectid %s: %w", o, err)
		}
		result[objectid] = row
	}
	return result, nil
}
// insertRowFromFeature writes a brand-new feature as a row in the main table
// plus a version-1 row in its history table, atomically within one
// transaction.
func insertRowFromFeature(ctx context.Context, table string, sorted_columns []string, feature *arcgis.Feature, org_id int32) error {
	var options pgx.TxOptions
	transaction, err := db.PGInstance.PGXPool.BeginTx(ctx, options)
	if err != nil {
		// Wrap the underlying error; the original discarded it.
		return fmt.Errorf("Unable to start transaction: %w", err)
	}
	if err = insertRowFromFeatureFS(ctx, transaction, table, sorted_columns, feature, org_id); err != nil {
		transaction.Rollback(ctx)
		return fmt.Errorf("Unable to insert FS: %w", err)
	}
	if err = insertRowFromFeatureHistory(ctx, transaction, table, sorted_columns, feature, org_id, 1); err != nil {
		transaction.Rollback(ctx)
		return fmt.Errorf("Failed to insert history: %w", err)
	}
	if err = transaction.Commit(ctx); err != nil {
		return fmt.Errorf("Failed to commit transaction: %w", err)
	}
	return nil
}
// insertRowFromFeatureFS inserts the feature's attributes as a new row in
// the main (FS_*) table within the given transaction, adding
// organization_id and an 'updated' timestamp.
// NOTE(review): the generated SQL references @geometry_x/@geometry_y but
// the args that would supply them are commented out below — unless the
// feature attributes happen to include those keys, the named-arg rewrite
// will fail at runtime; confirm this is intentional during the v2 migration.
func insertRowFromFeatureFS(ctx context.Context, transaction pgx.Tx, table string, sorted_columns []string, feature *arcgis.Feature, org_id int32) error {
	// Create the query to produce the main row
	var sb strings.Builder
	sb.WriteString("INSERT INTO ")
	sb.WriteString(table)
	sb.WriteString(" (")
	for _, field := range sorted_columns {
		sb.WriteString(field)
		sb.WriteString(",")
	}
	// Specially add the geometry values since they aren't in the fields
	sb.WriteString("geometry_x,geometry_y,organization_id,updated")
	sb.WriteString(")\nVALUES (")
	for _, field := range sorted_columns {
		sb.WriteString("@")
		sb.WriteString(field)
		sb.WriteString(",")
	}
	// Specially add the geometry values since they aren't in the fields
	sb.WriteString("@geometry_x,@geometry_y,@organization_id,@updated)")
	args := pgx.NamedArgs{}
	for k, v := range feature.Attributes {
		args[k] = v
	}
	// specially add geometry since it isn't in the list of attributes
	//args["geometry_x"] = feature.Geometry.X
	//args["geometry_y"] = feature.Geometry.Y
	args["organization_id"] = org_id
	args["updated"] = time.Now()
	_, err := transaction.Exec(ctx, sb.String(), args)
	if err != nil {
		return fmt.Errorf("Failed to insert row into %s: %w", table, err)
	}
	return nil
}
// hasUpdates reports whether any attribute in feature differs from the
// previously-stored row. row maps lowercase column names to values rendered
// as strings by rowmapViaQuery, with NULL represented as "". Values of
// unrecognized types are logged for a human to triage and treated as
// unchanged.
func hasUpdates(row map[string]string, feature arcgis.Feature) bool {
	for key, value := range feature.Attributes {
		rowdata := row[strings.ToLower(key)]
		// We'll accept any 'nil' as represented by the empty string in the database
		if value == nil {
			if rowdata == "" {
				continue
			}
			// Previously nil, now the database holds something.
			return true
		}
		switch feat := value.(type) {
		case string:
			if feat != rowdata {
				return true
			}
		case int:
			// Previously had a nil value, now we have a real value
			if rowdata == "" {
				return true
			}
			rowAsInt, err := strconv.Atoi(rowdata)
			if err != nil {
				log.Error().Msg(fmt.Sprintf("Failed to convert '%s' to an int to compare against %v for %v", rowdata, feat, key))
			}
			if rowAsInt != feat {
				return true
			}
		case float64:
			// Previously had a nil value, now we have a real value
			if rowdata == "" {
				return true
			}
			rowAsFloat, err := strconv.ParseFloat(rowdata, 64)
			if err != nil {
				log.Error().Msg(fmt.Sprintf("Failed to convert '%s' to a float64 to compare against %v for %v", rowdata, feat, key))
			}
			if rowAsFloat != feat {
				return true
			}
		default:
			// %v here, not %w: the original used %w in fmt.Sprintf, which is
			// only valid in fmt.Errorf and rendered as a bad verb.
			log.Info().Msg(fmt.Sprintf("key: %s\tvalue: %v (type %T)\trow: %s\n", key, value, value, rowdata))
			log.Error().Msg("we've hit a point where we can't tell if we have an update or not, need a programmer to look at the above")
		}
	}
	return false
}
// updateRowFromFeature records a new history version for the feature and
// updates its row in the main table, atomically within one transaction.
func updateRowFromFeature(ctx context.Context, table string, sorted_columns []string, feature *arcgis.Feature, org_id int32) error {
	// Get the current highest version for the row in question. COALESCE
	// guards the (unexpected) case of a main-table row with no history rows,
	// where a bare MAX(version) would be NULL and fail the int scan.
	history_table := toHistoryTable(table)
	var sb strings.Builder
	sb.WriteString("SELECT COALESCE(MAX(version), 0) FROM ")
	sb.WriteString(history_table)
	sb.WriteString(" WHERE OBJECTID=@objectid")
	args := pgx.NamedArgs{}
	o := feature.Attributes["OBJECTID"].(float64)
	args["objectid"] = int(o)
	var version int
	if err := db.PGInstance.PGXPool.QueryRow(ctx, sb.String(), args).Scan(&version); err != nil {
		return fmt.Errorf("Failed to query for version: %w", err)
	}
	var options pgx.TxOptions
	transaction, err := db.PGInstance.PGXPool.BeginTx(ctx, options)
	if err != nil {
		// Wrap the underlying error; the original discarded it.
		return fmt.Errorf("Unable to start transaction: %w", err)
	}
	if err = insertRowFromFeatureHistory(ctx, transaction, table, sorted_columns, feature, org_id, version+1); err != nil {
		transaction.Rollback(ctx)
		return fmt.Errorf("Failed to insert history: %w", err)
	}
	if err = updateRowFromFeatureFS(ctx, transaction, table, sorted_columns, feature); err != nil {
		transaction.Rollback(ctx)
		return fmt.Errorf("Failed to update row from feature: %w", err)
	}
	if err = transaction.Commit(ctx); err != nil {
		return fmt.Errorf("Failed to commit transaction: %w", err)
	}
	return nil
}
// insertRowFromFeatureHistory inserts the feature's attributes as a new row
// in the History_* companion table within the given transaction, stamping
// created, organization_id, and the supplied version number.
// NOTE(review): as in insertRowFromFeatureFS, the SQL references
// @geometry_x/@geometry_y while the args providing them are not set here —
// confirm the feature attributes supply those keys, otherwise the named-arg
// rewrite will fail at runtime.
func insertRowFromFeatureHistory(ctx context.Context, transaction pgx.Tx, table string, sorted_columns []string, feature *arcgis.Feature, org_id int32, version int) error {
	history_table := toHistoryTable(table)
	var sb strings.Builder
	sb.WriteString("INSERT INTO ")
	sb.WriteString(history_table)
	sb.WriteString(" (")
	for _, field := range sorted_columns {
		sb.WriteString(field)
		sb.WriteString(",")
	}
	// Specially add the geometry values since they aren't in the fields
	sb.WriteString("created,geometry_x,geometry_y,organization_id,version")
	sb.WriteString(")\nVALUES (")
	for _, field := range sorted_columns {
		sb.WriteString("@")
		sb.WriteString(field)
		sb.WriteString(",")
	}
	// Specially add the geometry values since they aren't in the fields
	sb.WriteString("@created,@geometry_x,@geometry_y,@organization_id,@version)")
	args := pgx.NamedArgs{}
	for k, v := range feature.Attributes {
		args[k] = v
	}
	args["created"] = time.Now()
	args["organization_id"] = org_id
	args["version"] = version
	if _, err := transaction.Exec(ctx, sb.String(), args); err != nil {
		return fmt.Errorf("Failed to insert history row into %s: %w", table, err)
	}
	return nil
}
// selectAllFromQueryResult builds the query that fetches every row of the
// given table whose OBJECTID appears in the @objectids named argument.
// The sorted_columns parameter is accepted for signature parity with the
// other query builders but is unused: the query selects all columns.
func selectAllFromQueryResult(table string, sorted_columns []string) string {
	return "SELECT * FROM " + table + " WHERE OBJECTID=ANY(@objectids)"
}
// toHistoryTable maps a FieldSeeker table name to its companion history table
// by replacing the 3-character prefix (e.g. "FS_") with "History_".
// It assumes the input has at least 3 characters; shorter names would panic.
func toHistoryTable(table string) string {
	return "History_" + table[3:]
}
// updateRowFromFeatureFS updates the live row for the feature in the given
// FieldSeeker table, setting every synced column plus the geometry columns and
// the updated timestamp. OBJECTID is the primary key and is used only in the
// WHERE clause.
func updateRowFromFeatureFS(ctx context.Context, transaction pgx.Tx, table string, sorted_columns []string, feature *arcgis.Feature) error {
	// Create the query to produce the main row
	var sb strings.Builder
	sb.WriteString("UPDATE ")
	sb.WriteString(table)
	sb.WriteString(" SET ")
	for _, field := range sorted_columns {
		// OBJECTID is special as our primary key, so skip it
		if field == "OBJECTID" {
			continue
		}
		sb.WriteString(field)
		sb.WriteString("=@")
		sb.WriteString(field)
		sb.WriteString(",")
	}
	// Specially add the geometry values since they aren't in the fields
	sb.WriteString("geometry_x=@geometry_x,geometry_y=@geometry_y,updated=@updated WHERE OBJECTID=@OBJECTID")
	args := pgx.NamedArgs{}
	for k, v := range feature.Attributes {
		args[k] = v
	}
	//args["geometry_x"] = feature.Geometry.X
	//args["geometry_y"] = feature.Geometry.Y
	args["updated"] = time.Now()
	// The query references @geometry_x/@geometry_y but the explicit
	// assignments above were disabled; default any missing values to NULL so
	// pgx does not fail with an unresolved named argument.
	// TODO(review): confirm whether these should come from feature.Geometry.
	if _, ok := args["geometry_x"]; !ok {
		args["geometry_x"] = nil
	}
	if _, ok := args["geometry_y"]; !ok {
		args["geometry_y"] = nil
	}
	if _, err := transaction.Exec(ctx, sb.String(), args); err != nil {
		return fmt.Errorf("Failed to update row into %s: %w", table, err)
	}
	return nil
}
// updateSummaryTables rebuilds the H3 aggregation summaries for an
// organization's point locations. For every H3 resolution (0-15) it counts the
// points falling into each cell and upserts one row per cell into
// h3_aggregation, keyed on (cell, organization_id, type_).
// Errors are logged rather than returned: a failure on one point or cell
// should not abort the rest of the summary refresh.
func updateSummaryTables(ctx context.Context, org *models.Organization) {
	log.Info().Int("org_id", int(org.ID)).Msg("Getting point locations")
	point_locations, err := org.Pointlocations().All(ctx, db.PGInstance.BobDB)
	if err != nil {
		log.Error().Err(err).Msg("Failed to get organization")
		return
	}
	log.Info().Int("count", len(point_locations)).Msg("Summarizing point locations")
	// H3 resolutions run from 0 (coarsest) to 15 (finest).
	for resolution := range 16 {
		log.Info().Int("resolution", resolution).Msg("Working summary layer")
		cellToCount := make(map[h3.Cell]int)
		for _, loc := range point_locations {
			point, err := getPoint(loc.Geometry)
			if err != nil {
				log.Error().Err(err).Msg("Failed to get geometry point")
				continue
			}
			cell, err := h3.LatLngToCell(point, resolution)
			if err != nil {
				log.Error().Err(err).Msg("Failed to get cell")
				continue
			}
			cellToCount[cell]++
		}
		// One mod for the target columns, one per cell, one for the upsert clause.
		to_insert := make([]bob.Mod[*dialect.InsertQuery], 0, len(cellToCount)+2)
		to_insert = append(to_insert, im.Into("h3_aggregation", "cell", "resolution", "count_", "type_", "organization_id", "geometry"))
		for cell, count := range cellToCount {
			polygon, err := cellToPostgisGeometry(cell)
			if err != nil {
				log.Error().Err(err).Msg("Failed to get PostGIS geometry")
				continue
			}
			to_insert = append(to_insert, im.Values(psql.Arg(cell.String(), resolution, count, enums.H3aggregationtypeServicerequest, org.ID), psql.F("st_geomfromtext", psql.S(polygon), 4326)))
		}
		// On re-runs only the count can change for an existing cell.
		to_insert = append(to_insert, im.OnConflict("cell, organization_id, type_").DoUpdate(
			im.SetCol("count_").To(psql.Raw("EXCLUDED.count_")),
		))
		_, err := psql.Insert(to_insert...).Exec(ctx, db.PGInstance.BobDB)
		if err != nil {
			log.Error().Err(err).Msg("Failed to add h3 aggregation")
		}
	}
}
// exportFieldseekerLayer downloads every record of a single FieldSeeker layer
// for the organization and persists it through the matching db.SaveOrUpdate*
// helper, returning aggregate insert/update/unchanged counts. Pages of up to
// MaxRecordCount records are fetched and saved concurrently in a bounded
// worker pool; the first page error aborts the wait and is returned wrapped.
func exportFieldseekerLayer(ctx context.Context, org *models.Organization, fssync *fieldseeker.FieldSeeker, layer arcgis.LayerFeature) (SyncStats, error) {
	var stats SyncStats
	count, err := fssync.QueryCount(layer.ID)
	if err != nil {
		return stats, fmt.Errorf("Failed to get counts for layer %s (%d): %w", layer.Name, layer.ID, err)
	}
	if count.Count == 0 {
		log.Info().Str("name", layer.Name).Uint("id", layer.ID).Msg("No records to download")
		return stats, nil
	}
	log.Info().Str("name", layer.Name).Uint("id", layer.ID).Msg("Starting on layer")
	// Up to 20 pages are fetched and saved in parallel.
	pool := pond.NewResultPool[SyncStats](20)
	group := pool.NewGroup()
	maxRecords := uint(fssync.MaxRecordCount())
	l, err := fieldseeker.NameToLayerType(layer.Name)
	if err != nil {
		return stats, fmt.Errorf("Failed to get layer for '%s': %w", layer.Name, err)
	}
	// Submit one task per page offset; each task handles exactly one page.
	for offset := uint(0); offset < uint(count.Count); offset += maxRecords {
		group.SubmitErr(func() (SyncStats, error) {
			var ss SyncStats
			var name string
			var inserts, unchanged, updates uint
			// NOTE(review): this err is shadowed by the ":=" declarations
			// inside every case below, so the "return ss, err" at the bottom
			// always returns nil — correct only because every failure path
			// returns early inside its case.
			var err error
			// Dispatch on the layer type. Every case follows the same
			// query -> save -> derive-unchanged pattern and differs only in
			// the concrete row type and Save helper.
			// unchanged assumes the save helpers never report more
			// inserts+updates than rows fetched (uint would underflow).
			switch l {
			case fieldseeker.LayerAerialSpraySession:
				name = "AerialSpraySession"
				rows, err := fssync.AerialSpraySession(offset)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
				}
				inserts, updates, err = db.SaveOrUpdateAerialSpraySession(ctx, org, rows)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
				}
				unchanged = uint(len(rows)) - inserts - updates
			case fieldseeker.LayerAerialSprayLine:
				name = "LayerAerialSprayLine"
				rows, err := fssync.AerialSprayLine(offset)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
				}
				inserts, updates, err = db.SaveOrUpdateAerialSprayLine(ctx, org, rows)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
				}
				unchanged = uint(len(rows)) - inserts - updates
			case fieldseeker.LayerBarrierSpray:
				name = "LayerBarrierSpray"
				rows, err := fssync.BarrierSpray(offset)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
				}
				inserts, updates, err = db.SaveOrUpdateBarrierSpray(ctx, org, rows)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
				}
				unchanged = uint(len(rows)) - inserts - updates
			case fieldseeker.LayerBarrierSprayRoute:
				name = "LayerBarrierSprayRoute"
				rows, err := fssync.BarrierSprayRoute(offset)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
				}
				inserts, updates, err = db.SaveOrUpdateBarrierSprayRoute(ctx, org, rows)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
				}
				unchanged = uint(len(rows)) - inserts - updates
			case fieldseeker.LayerContainerRelate:
				name = "LayerContainerRelate"
				rows, err := fssync.ContainerRelate(offset)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
				}
				inserts, updates, err = db.SaveOrUpdateContainerRelate(ctx, org, rows)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
				}
				unchanged = uint(len(rows)) - inserts - updates
			case fieldseeker.LayerFieldScoutingLog:
				name = "LayerFieldScoutingLog"
				rows, err := fssync.FieldScoutingLog(offset)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
				}
				inserts, updates, err = db.SaveOrUpdateFieldScoutingLog(ctx, org, rows)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
				}
				unchanged = uint(len(rows)) - inserts - updates
			case fieldseeker.LayerHabitatRelate:
				name = "LayerHabitatRelate"
				rows, err := fssync.HabitatRelate(offset)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
				}
				inserts, updates, err = db.SaveOrUpdateHabitatRelate(ctx, org, rows)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
				}
				unchanged = uint(len(rows)) - inserts - updates
			case fieldseeker.LayerInspectionSample:
				name = "LayerInspectionSample"
				rows, err := fssync.InspectionSample(offset)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
				}
				inserts, updates, err = db.SaveOrUpdateInspectionSample(ctx, org, rows)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
				}
				unchanged = uint(len(rows)) - inserts - updates
			case fieldseeker.LayerInspectionSampleDetail:
				name = "LayerInspectionSampleDetail"
				rows, err := fssync.InspectionSampleDetail(offset)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
				}
				inserts, updates, err = db.SaveOrUpdateInspectionSampleDetail(ctx, org, rows)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
				}
				unchanged = uint(len(rows)) - inserts - updates
			case fieldseeker.LayerLandingCount:
				name = "LayerLandingCount"
				rows, err := fssync.LandingCount(offset)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
				}
				inserts, updates, err = db.SaveOrUpdateLandingCount(ctx, org, rows)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
				}
				unchanged = uint(len(rows)) - inserts - updates
			case fieldseeker.LayerLandingCountLocation:
				name = "LayerLandingCountLocation"
				rows, err := fssync.LandingCountLocation(offset)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
				}
				inserts, updates, err = db.SaveOrUpdateLandingCountLocation(ctx, org, rows)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
				}
				unchanged = uint(len(rows)) - inserts - updates
			case fieldseeker.LayerLineLocation:
				name = "LayerLineLocation"
				rows, err := fssync.LineLocation(offset)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
				}
				inserts, updates, err = db.SaveOrUpdateLineLocation(ctx, org, rows)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
				}
				unchanged = uint(len(rows)) - inserts - updates
			case fieldseeker.LayerLocationTracking:
				name = "LayerLocationTracking"
				rows, err := fssync.LocationTracking(offset)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
				}
				inserts, updates, err = db.SaveOrUpdateLocationTracking(ctx, org, rows)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
				}
				unchanged = uint(len(rows)) - inserts - updates
			case fieldseeker.LayerMosquitoInspection:
				name = "LayerMosquitoInspection"
				rows, err := fssync.MosquitoInspection(offset)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
				}
				inserts, updates, err = db.SaveOrUpdateMosquitoInspection(ctx, org, rows)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
				}
				unchanged = uint(len(rows)) - inserts - updates
			case fieldseeker.LayerOfflineMapAreas:
				name = "LayerOfflineMapAreas"
				rows, err := fssync.OfflineMapAreas(offset)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
				}
				inserts, updates, err = db.SaveOrUpdateOfflineMapAreas(ctx, org, rows)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
				}
				unchanged = uint(len(rows)) - inserts - updates
			case fieldseeker.LayerProposedTreatmentArea:
				name = "LayerProposedTreatmentArea"
				rows, err := fssync.ProposedTreatmentArea(offset)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
				}
				inserts, updates, err = db.SaveOrUpdateProposedTreatmentArea(ctx, org, rows)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
				}
				unchanged = uint(len(rows)) - inserts - updates
			case fieldseeker.LayerPointLocation:
				name = "LayerPointLocation"
				rows, err := fssync.PointLocation(offset)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
				}
				inserts, updates, err = db.SaveOrUpdatePointLocation(ctx, org, rows)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
				}
				unchanged = uint(len(rows)) - inserts - updates
			case fieldseeker.LayerPolygonLocation:
				name = "LayerPolygonLocation"
				rows, err := fssync.PolygonLocation(offset)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
				}
				inserts, updates, err = db.SaveOrUpdatePolygonLocation(ctx, org, rows)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
				}
				unchanged = uint(len(rows)) - inserts - updates
			case fieldseeker.LayerPoolDetail:
				name = "LayerPoolDetail"
				rows, err := fssync.PoolDetail(offset)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
				}
				inserts, updates, err = db.SaveOrUpdatePoolDetail(ctx, org, rows)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
				}
				unchanged = uint(len(rows)) - inserts - updates
			case fieldseeker.LayerPool:
				name = "LayerPool"
				rows, err := fssync.Pool(offset)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
				}
				inserts, updates, err = db.SaveOrUpdatePool(ctx, org, rows)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
				}
				unchanged = uint(len(rows)) - inserts - updates
			case fieldseeker.LayerPoolBuffer:
				name = "LayerPoolBuffer"
				rows, err := fssync.PoolBuffer(offset)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
				}
				inserts, updates, err = db.SaveOrUpdatePoolBuffer(ctx, org, rows)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
				}
				unchanged = uint(len(rows)) - inserts - updates
			case fieldseeker.LayerQALarvCount:
				name = "LayerQALarvCount"
				rows, err := fssync.QALarvCount(offset)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
				}
				inserts, updates, err = db.SaveOrUpdateQALarvCount(ctx, org, rows)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
				}
				unchanged = uint(len(rows)) - inserts - updates
			case fieldseeker.LayerQAMosquitoInspection:
				name = "LayerQAMosquitoInspection"
				rows, err := fssync.QAMosquitoInspection(offset)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
				}
				inserts, updates, err = db.SaveOrUpdateQAMosquitoInspection(ctx, org, rows)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
				}
				unchanged = uint(len(rows)) - inserts - updates
			case fieldseeker.LayerQAProductObservation:
				name = "LayerQAProductObservation"
				rows, err := fssync.QAProductObservation(offset)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
				}
				inserts, updates, err = db.SaveOrUpdateQAProductObservation(ctx, org, rows)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
				}
				unchanged = uint(len(rows)) - inserts - updates
			case fieldseeker.LayerRestrictedArea:
				name = "LayerRestrictedArea"
				rows, err := fssync.RestrictedArea(offset)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
				}
				inserts, updates, err = db.SaveOrUpdateRestrictedArea(ctx, org, rows)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
				}
				unchanged = uint(len(rows)) - inserts - updates
			case fieldseeker.LayerRodentInspection:
				name = "LayerRodentInspection"
				rows, err := fssync.RodentInspection(offset)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
				}
				inserts, updates, err = db.SaveOrUpdateRodentInspection(ctx, org, rows)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
				}
				unchanged = uint(len(rows)) - inserts - updates
			case fieldseeker.LayerRodentLocation:
				name = "LayerRodentLocation"
				rows, err := fssync.RodentLocation(offset)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
				}
				inserts, updates, err = db.SaveOrUpdateRodentLocation(ctx, org, rows)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
				}
				unchanged = uint(len(rows)) - inserts - updates
			case fieldseeker.LayerSampleCollection:
				name = "LayerSampleCollection"
				rows, err := fssync.SampleCollection(offset)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
				}
				inserts, updates, err = db.SaveOrUpdateSampleCollection(ctx, org, rows)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
				}
				unchanged = uint(len(rows)) - inserts - updates
			case fieldseeker.LayerSampleLocation:
				name = "LayerSampleLocation"
				rows, err := fssync.SampleLocation(offset)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
				}
				inserts, updates, err = db.SaveOrUpdateSampleLocation(ctx, org, rows)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
				}
				unchanged = uint(len(rows)) - inserts - updates
			case fieldseeker.LayerServiceRequest:
				name = "LayerServiceRequest"
				rows, err := fssync.ServiceRequest(offset)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
				}
				inserts, updates, err = db.SaveOrUpdateServiceRequest(ctx, org, rows)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
				}
				unchanged = uint(len(rows)) - inserts - updates
			case fieldseeker.LayerSpeciesAbundance:
				name = "LayerSpeciesAbundance"
				rows, err := fssync.SpeciesAbundance(offset)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
				}
				inserts, updates, err = db.SaveOrUpdateSpeciesAbundance(ctx, org, rows)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
				}
				unchanged = uint(len(rows)) - inserts - updates
			case fieldseeker.LayerStormDrain:
				name = "LayerStormDrain"
				rows, err := fssync.StormDrain(offset)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
				}
				inserts, updates, err = db.SaveOrUpdateStormDrain(ctx, org, rows)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
				}
				unchanged = uint(len(rows)) - inserts - updates
			case fieldseeker.LayerTracklog:
				name = "LayerTracklog"
				rows, err := fssync.Tracklog(offset)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
				}
				inserts, updates, err = db.SaveOrUpdateTracklog(ctx, org, rows)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
				}
				unchanged = uint(len(rows)) - inserts - updates
			case fieldseeker.LayerTrapLocation:
				name = "LayerTrapLocation"
				rows, err := fssync.TrapLocation(offset)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
				}
				inserts, updates, err = db.SaveOrUpdateTrapLocation(ctx, org, rows)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
				}
				unchanged = uint(len(rows)) - inserts - updates
			case fieldseeker.LayerTrapData:
				name = "LayerTrapData"
				rows, err := fssync.TrapData(offset)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
				}
				inserts, updates, err = db.SaveOrUpdateTrapData(ctx, org, rows)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
				}
				unchanged = uint(len(rows)) - inserts - updates
			case fieldseeker.LayerTimeCard:
				name = "LayerTimeCard"
				rows, err := fssync.TimeCard(offset)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
				}
				inserts, updates, err = db.SaveOrUpdateTimeCard(ctx, org, rows)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
				}
				unchanged = uint(len(rows)) - inserts - updates
			case fieldseeker.LayerTreatment:
				name = "LayerTreatment"
				rows, err := fssync.Treatment(offset)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
				}
				inserts, updates, err = db.SaveOrUpdateTreatment(ctx, org, rows)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
				}
				unchanged = uint(len(rows)) - inserts - updates
			case fieldseeker.LayerTreatmentArea:
				name = "LayerTreatmentArea"
				rows, err := fssync.TreatmentArea(offset)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
				}
				inserts, updates, err = db.SaveOrUpdateTreatmentArea(ctx, org, rows)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
				}
				unchanged = uint(len(rows)) - inserts - updates
			case fieldseeker.LayerULVSprayRoute:
				name = "LayerULVSprayRoute"
				rows, err := fssync.ULVSprayRoute(offset)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
				}
				inserts, updates, err = db.SaveOrUpdateULVSprayRoute(ctx, org, rows)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
				}
				unchanged = uint(len(rows)) - inserts - updates
			case fieldseeker.LayerZones:
				name = "LayerZones"
				rows, err := fssync.Zones(offset)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
				}
				inserts, updates, err = db.SaveOrUpdateZones(ctx, org, rows)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
				}
				unchanged = uint(len(rows)) - inserts - updates
			case fieldseeker.LayerZones2:
				name = "LayerZones2"
				rows, err := fssync.Zones2(offset)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
				}
				inserts, updates, err = db.SaveOrUpdateZones2(ctx, org, rows)
				if err != nil {
					return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
				}
				unchanged = uint(len(rows)) - inserts - updates
			default:
				return ss, errors.New("Unrecognized layer")
			}
			ss.Inserts = inserts
			ss.Updates = updates
			ss.Unchanged = unchanged
			return ss, err
		})
	}
	// Wait for every page task; pond surfaces the first task error here.
	results, err := group.Wait()
	if err != nil {
		return stats, fmt.Errorf("one or more tasks in the work pool failed: %w", err)
	}
	// Aggregate per-page statistics into the layer total.
	for _, r := range results {
		stats.Inserts += r.Inserts
		stats.Updates += r.Updates
		stats.Unchanged += r.Unchanged
	}
	log.Info().Uint("inserts", stats.Inserts).Uint("updates", stats.Updates).Uint("no change", stats.Unchanged).Str("layer", layer.Name).Msg("Finished layer")
	return stats, nil
}