This requires a bunch of changes since the types on these tables are much closer to the underlying types of the Fieldseeker data we are getting back from the API. I now need to use proper UUID types everywhere, which means I had to modify the bob gen config to consistently use google UUID, my UUID library of choice. I also had to add the organization_id to all the fieldseeker tables since we rely on them existing for some of our compound queries. There were some changes to the API type signatures to get things to build. I may yet regret those.
1624 lines
56 KiB
Go
1624 lines
56 KiB
Go
package main
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"crypto/rand"
|
|
"crypto/sha256"
|
|
"encoding/base64"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"net/url"
|
|
"os"
|
|
"path/filepath"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/Gleipnir-Technology/arcgis-go"
|
|
"github.com/Gleipnir-Technology/arcgis-go/fieldseeker"
|
|
"github.com/Gleipnir-Technology/nidus-sync/db"
|
|
"github.com/Gleipnir-Technology/nidus-sync/db/enums"
|
|
"github.com/Gleipnir-Technology/nidus-sync/db/models"
|
|
"github.com/Gleipnir-Technology/nidus-sync/db/sql"
|
|
"github.com/aarondl/opt/omit"
|
|
"github.com/aarondl/opt/omitnull"
|
|
"github.com/alitto/pond/v2"
|
|
"github.com/jackc/pgx/v5"
|
|
"github.com/rs/zerolog/log"
|
|
"github.com/stephenafamo/bob"
|
|
"github.com/stephenafamo/bob/dialect/psql"
|
|
"github.com/stephenafamo/bob/dialect/psql/dialect"
|
|
"github.com/stephenafamo/bob/dialect/psql/im"
|
|
"github.com/uber/h3-go/v4"
|
|
)
|
|
|
|
// When the API responds that the token is now invalidated
|
|
type InvalidatedTokenError struct{}
|
|
|
|
func (e InvalidatedTokenError) Error() string { return "The token has been invalidated by the server" }
|
|
|
|
// When there is no oauth for an organization
|
|
type NoOAuthForOrg struct{}
|
|
|
|
func (e NoOAuthForOrg) Error() string { return "No oauth available for organization" }
|
|
|
|
var NewOAuthTokenChannel chan struct{}
|
|
var CodeVerifier string = "random_secure_string_min_43_chars_long_should_be_stored_in_session"
|
|
|
|
type OAuthTokenResponse struct {
|
|
AccessToken string `json:"access_token"`
|
|
ExpiresIn int `json:"expires_in"`
|
|
RefreshToken string `json:"refresh_token"`
|
|
RefreshTokenExpiresIn int `json:"refresh_token_expires_in"`
|
|
SSL bool `json:"ssl"`
|
|
Username string `json:"username"`
|
|
}
|
|
|
|
// Build the ArcGIS authorization URL with PKCE
|
|
func buildArcGISAuthURL(clientID string) string {
|
|
baseURL := "https://www.arcgis.com/sharing/rest/oauth2/authorize/"
|
|
|
|
params := url.Values{}
|
|
params.Add("client_id", clientID)
|
|
params.Add("redirect_uri", redirectURL())
|
|
params.Add("response_type", "code")
|
|
//params.Add("code_challenge", generateCodeChallenge(codeVerifier))
|
|
//params.Add("code_challenge_method", "S256")
|
|
|
|
// See https://developers.arcgis.com/rest/users-groups-and-items/token/
|
|
// expiration is defined in minutes
|
|
var expiration int
|
|
if IsProductionEnvironment() {
|
|
// 2 weeks is the maximum allowed
|
|
expiration = 20160
|
|
} else {
|
|
expiration = 20
|
|
}
|
|
params.Add("expiration", strconv.Itoa(expiration))
|
|
|
|
return baseURL + "?" + params.Encode()
|
|
}
|
|
|
|
func downloadFieldseekerSchema(ctx context.Context, fieldseekerClient *fieldseeker.FieldSeeker, arcgis_id string) {
|
|
for _, layer := range fieldseekerClient.FeatureServerLayers() {
|
|
err := os.MkdirAll(filepath.Join(FieldseekerSchemaDirectory, arcgis_id), os.ModePerm)
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("Failed to create parent directory")
|
|
return
|
|
}
|
|
output, err := os.Create(fmt.Sprintf("%s/%s/%s.json", FieldseekerSchemaDirectory, arcgis_id, layer.Name))
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("Failed to open output")
|
|
return
|
|
}
|
|
defer output.Close()
|
|
schema, err := fieldseekerClient.Schema(layer.ID)
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("Failed to get schema")
|
|
return
|
|
}
|
|
_, err = output.Write(schema)
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("Failed to write schema file")
|
|
continue
|
|
}
|
|
}
|
|
}
|
|
|
|
func futureUTCTimestamp(secondsFromNow int) time.Time {
|
|
return time.Now().UTC().Add(time.Duration(secondsFromNow) * time.Second)
|
|
}
|
|
|
|
// Helper function to generate code challenge from code verifier
|
|
func generateCodeChallenge(codeVerifier string) string {
|
|
hash := sha256.Sum256([]byte(codeVerifier))
|
|
return base64.RawURLEncoding.EncodeToString(hash[:])
|
|
}
|
|
|
|
// Generate a random code verifier for PKCE
|
|
func generateCodeVerifier() string {
|
|
bytes := make([]byte, 64) // 64 bytes = 512 bits
|
|
rand.Read(bytes)
|
|
return base64.RawURLEncoding.EncodeToString(bytes)
|
|
}
|
|
|
|
// Find out what we can about this user
|
|
func updateArcgisUserData(ctx context.Context, user *models.User, access_token string, access_token_expires time.Time, refresh_token string, refresh_token_expires time.Time) {
|
|
client := arcgis.NewArcGIS(
|
|
arcgis.AuthenticatorOAuth{
|
|
AccessToken: access_token,
|
|
AccessTokenExpires: access_token_expires,
|
|
RefreshToken: refresh_token,
|
|
RefreshTokenExpires: refresh_token_expires,
|
|
},
|
|
)
|
|
portal, err := client.PortalsSelf()
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("Failed to get ArcGIS user data")
|
|
return
|
|
}
|
|
log.Info().Str("Username", portal.User.Username).Str("user_id", portal.User.ID).Str("org_id", portal.User.OrgID).Str("org_name", portal.Name).Str("license_type_id", portal.User.UserLicenseTypeID).Msg("Got portals data")
|
|
|
|
_, err = sql.UpdateOauthTokenOrg(portal.User.ID, portal.User.UserLicenseTypeID, refresh_token).Exec(ctx, db.PGInstance.BobDB)
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("Failed to update oauth token portal data")
|
|
return
|
|
}
|
|
var org *models.Organization
|
|
orgs, err := models.Organizations.Query(models.SelectWhere.Organizations.ArcgisName.EQ(portal.Name)).All(ctx, db.PGInstance.BobDB)
|
|
switch len(orgs) {
|
|
case 0:
|
|
setter := models.OrganizationSetter{
|
|
Name: omitnull.From(portal.Name),
|
|
ArcgisID: omitnull.From(portal.User.OrgID),
|
|
ArcgisName: omitnull.From(portal.Name),
|
|
}
|
|
org, err = models.Organizations.Insert(&setter).One(ctx, db.PGInstance.BobDB)
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("Failed to create new organization")
|
|
return
|
|
}
|
|
log.Info().Int("org_id", int(org.ID)).Msg("Created new organization")
|
|
case 1:
|
|
org = orgs[0]
|
|
log.Info().Msg("Organization already exists")
|
|
default:
|
|
log.Error().Msg("Got too many organizations, bailing")
|
|
return
|
|
|
|
}
|
|
if err != nil {
|
|
LogErrorTypeInfo(err)
|
|
if errors.Is(err, pgx.ErrNoRows) {
|
|
} else {
|
|
log.Error().Err(err).Msg("Failed to query for existing org")
|
|
return
|
|
}
|
|
}
|
|
err = org.AttachUser(ctx, db.PGInstance.BobDB, user)
|
|
if err != nil {
|
|
log.Error().Err(err).Int("user_id", int(user.ID)).Int("org_id", int(org.ID)).Msg("Failed to attach user to organization")
|
|
return
|
|
}
|
|
|
|
search, err := client.Search("Fieldseeker")
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("Failed to get search FieldseekerGIS data")
|
|
return
|
|
}
|
|
var fieldseekerClient *fieldseeker.FieldSeeker
|
|
for _, result := range search.Results {
|
|
log.Info().Str("name", result.Name).Msg("Got result")
|
|
if result.Name == "FieldSeekerGIS" {
|
|
log.Info().Str("url", result.URL).Msg("Found Fieldseeker")
|
|
setter := models.OrganizationSetter{
|
|
FieldseekerURL: omitnull.From(result.URL),
|
|
}
|
|
err = org.Update(ctx, db.PGInstance.BobDB, &setter)
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("Failed to create new organization")
|
|
return
|
|
}
|
|
fieldseekerClient, err = fieldseeker.NewFieldSeeker(
|
|
client,
|
|
result.URL,
|
|
)
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("Failed to create fieldseeker client")
|
|
return
|
|
}
|
|
}
|
|
}
|
|
arcgis_id, ok := org.ArcgisID.Get()
|
|
if !ok {
|
|
log.Error().Int("org.id", int(org.ID)).Msg("Cannot get webhooks - ArcGIS ID is null")
|
|
}
|
|
client.Context = &arcgis_id
|
|
maybeCreateWebhook(ctx, fieldseekerClient)
|
|
downloadFieldseekerSchema(ctx, fieldseekerClient, arcgis_id)
|
|
clearNotificationsOauth(ctx, user)
|
|
NewOAuthTokenChannel <- struct{}{}
|
|
}
|
|
|
|
func maybeCreateWebhook(ctx context.Context, client *fieldseeker.FieldSeeker) {
|
|
webhooks, err := client.WebhookList()
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("Failed to get webhooks")
|
|
}
|
|
for _, hook := range webhooks {
|
|
if hook.Name == "Nidus Sync" {
|
|
log.Info().Msg("Found nidus sync hook")
|
|
} else {
|
|
log.Info().Str("name", hook.Name).Msg("Found webhook")
|
|
}
|
|
}
|
|
}
|
|
|
|
func handleOauthAccessCode(ctx context.Context, user *models.User, code string) error {
|
|
baseURL := "https://www.arcgis.com/sharing/rest/oauth2/token/"
|
|
|
|
//params.Add("code_verifier", "S256")
|
|
|
|
form := url.Values{
|
|
"grant_type": []string{"authorization_code"},
|
|
"code": []string{code},
|
|
"client_id": []string{ClientID},
|
|
"redirect_uri": []string{redirectURL()},
|
|
}
|
|
|
|
req, err := http.NewRequest("POST", baseURL, strings.NewReader(form.Encode()))
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to create request: %w", err)
|
|
}
|
|
req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
|
|
token, err := handleTokenRequest(ctx, req)
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to exchange authorization code for token: %w", err)
|
|
}
|
|
accessExpires := futureUTCTimestamp(token.ExpiresIn)
|
|
refreshExpires := futureUTCTimestamp(token.RefreshTokenExpiresIn)
|
|
setter := models.OauthTokenSetter{
|
|
AccessToken: omit.From(token.AccessToken),
|
|
AccessTokenExpires: omit.From(accessExpires),
|
|
RefreshToken: omit.From(token.RefreshToken),
|
|
RefreshTokenExpires: omit.From(refreshExpires),
|
|
Username: omit.From(token.Username),
|
|
}
|
|
err = user.InsertUserOauthTokens(ctx, db.PGInstance.BobDB, &setter)
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to save token to database: %w", err)
|
|
}
|
|
go updateArcgisUserData(context.Background(), user, token.AccessToken, accessExpires, token.RefreshToken, refreshExpires)
|
|
return nil
|
|
}
|
|
|
|
func hasFieldseekerConnection(ctx context.Context, user *models.User) (bool, error) {
|
|
result, err := sql.OauthTokenByUserId(user.ID).All(ctx, db.PGInstance.BobDB)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
return len(result) > 0, nil
|
|
}
|
|
func redirectURL() string {
|
|
return BaseURL + "/arcgis/oauth/callback"
|
|
}
|
|
|
|
// This is a goroutine that is in charge of getting Fieldseeker data and keeping it fresh.
|
|
func refreshFieldseekerData(ctx context.Context, newOauthCh <-chan struct{}) {
|
|
for {
|
|
workerCtx, cancel := context.WithCancel(context.Background())
|
|
var wg sync.WaitGroup
|
|
|
|
oauths, err := models.OauthTokens.Query(models.SelectWhere.OauthTokens.InvalidatedAt.IsNull()).All(ctx, db.PGInstance.BobDB)
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("Failed to get oauths")
|
|
return
|
|
}
|
|
for _, oauth := range oauths {
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
err := maintainOAuth(workerCtx, oauth)
|
|
if err != nil {
|
|
markTokenFailed(ctx, oauth)
|
|
if errors.Is(err, arcgis.InvalidatedRefreshTokenError) {
|
|
log.Info().Int("oauth_token.id", int(oauth.ID)).Msg("Marked invalid by the server")
|
|
} else {
|
|
LogErrorTypeInfo(err)
|
|
log.Error().Err(err).Msg("Crashed oauth maintenance goroutine")
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
orgs, err := models.Organizations.Query().All(ctx, db.PGInstance.BobDB)
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("Failed to get orgs")
|
|
return
|
|
}
|
|
for _, org := range orgs {
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
err := periodicallyExportFieldseeker(workerCtx, org)
|
|
if err != nil {
|
|
if errors.Is(err, &NoOAuthForOrg{}) {
|
|
log.Info().Int("organization_id", int(org.ID)).Msg("No oauth available for organization, exiting exporter.")
|
|
return
|
|
}
|
|
log.Error().Err(err).Msg("Crashed fieldseeker export goroutine")
|
|
}
|
|
}()
|
|
}
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
log.Info().Msg("Exiting refresh worker...")
|
|
cancel()
|
|
wg.Wait()
|
|
return
|
|
case <-newOauthCh:
|
|
log.Info().Msg("Updating oauth background work")
|
|
cancel()
|
|
wg.Wait()
|
|
}
|
|
}
|
|
}
|
|
|
|
type SyncStats struct {
|
|
Inserts uint
|
|
Updates uint
|
|
Unchanged uint
|
|
}
|
|
|
|
func downloadAllRecords(ctx context.Context, fssync *fieldseeker.FieldSeeker, layer arcgis.LayerFeature, org_id int32) (SyncStats, error) {
|
|
var stats SyncStats
|
|
count, err := fssync.QueryCount(layer.ID)
|
|
if err != nil {
|
|
return stats, fmt.Errorf("Failed to get counts for layer %s (%d): %w", layer.Name, layer.ID, err)
|
|
}
|
|
log.Info().Str("name", layer.Name).Uint("id", layer.ID).Msg("Starting on layer")
|
|
if count.Count == 0 {
|
|
return stats, nil
|
|
}
|
|
pool := pond.NewResultPool[SyncStats](20)
|
|
group := pool.NewGroup()
|
|
maxRecords := uint(fssync.MaxRecordCount())
|
|
for offset := uint(0); offset < uint(count.Count); offset += maxRecords {
|
|
group.SubmitErr(func() (SyncStats, error) {
|
|
/*query := arcgis.NewQuery()
|
|
query.ResultRecordCount = maxRecords
|
|
query.ResultOffset = offset
|
|
query.SpatialReference = "4326"
|
|
query.OutFields = "*"
|
|
query.Where = "1=1"
|
|
qr, err := fssync.DoQuery(
|
|
layer.ID,
|
|
query)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to get layer %s (%d) at offset %d: %w", layer.Name, layer.ID, offset, err)
|
|
}
|
|
|
|
i, u, err := saveOrUpdateDBRecords(ctx, "FS_"+layer.Name, qr, org_id)
|
|
if err != nil {
|
|
filename := fmt.Sprintf("failure-%s-%d-%d.json", layer.Name, layer.ID, offset)
|
|
saveRawQuery(fssync, layer, query, filename)
|
|
log.Error().Err(err).Msg("Faild to save DB records")
|
|
return SyncStats{}, fmt.Errorf("Failed to save records: %w", err)
|
|
}
|
|
return SyncStats{
|
|
Inserts: i,
|
|
Updates: u,
|
|
Unchanged: len(qr.Features) - u - i,
|
|
}, nil
|
|
*/
|
|
return SyncStats{
|
|
Inserts: 0,
|
|
Updates: 0,
|
|
Unchanged: 0,
|
|
}, nil
|
|
})
|
|
}
|
|
results, err := group.Wait()
|
|
if err != nil {
|
|
return stats, fmt.Errorf("one or more tasks in the work pool failed: %w", err)
|
|
}
|
|
for _, r := range results {
|
|
stats.Inserts += r.Inserts
|
|
stats.Updates += r.Updates
|
|
stats.Unchanged += r.Unchanged
|
|
}
|
|
log.Info().Uint("inserts", stats.Inserts).Uint("updates", stats.Updates).Uint("no change", stats.Unchanged).Msg("Finished layer")
|
|
return stats, nil
|
|
}
|
|
|
|
func getOAuthForOrg(ctx context.Context, org *models.Organization) (*models.OauthToken, error) {
|
|
users, err := org.User().All(ctx, db.PGInstance.BobDB)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Failed to query all users for org: %w", err)
|
|
}
|
|
for _, user := range users {
|
|
oauths, err := user.UserOauthTokens(models.SelectWhere.OauthTokens.InvalidatedAt.IsNull()).All(ctx, db.PGInstance.BobDB)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Failed to query all oauth tokens for org: %w", err)
|
|
}
|
|
for _, oauth := range oauths {
|
|
return oauth, nil
|
|
}
|
|
}
|
|
return nil, &NoOAuthForOrg{}
|
|
}
|
|
|
|
func periodicallyExportFieldseeker(ctx context.Context, org *models.Organization) error {
|
|
pollTicker := time.NewTicker(1)
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil
|
|
case <-pollTicker.C:
|
|
oauth, err := getOAuthForOrg(ctx, org)
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to get oauth for org: %w", err)
|
|
}
|
|
err = exportFieldseekerData(ctx, org, oauth)
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to export Fieldseeker data: %w", err)
|
|
}
|
|
log.Info().Msg("Completed exporting data, waiting 15 minutes to go agoin.")
|
|
pollTicker = time.NewTicker(15 * time.Minute)
|
|
}
|
|
}
|
|
}
|
|
func exportFieldseekerData(ctx context.Context, org *models.Organization, oauth *models.OauthToken) error {
|
|
log.Info().Msg("Update Fieldseeker data")
|
|
var err error
|
|
ar := arcgis.NewArcGIS(
|
|
arcgis.AuthenticatorOAuth{
|
|
AccessToken: oauth.AccessToken,
|
|
AccessTokenExpires: oauth.AccessTokenExpires,
|
|
RefreshToken: oauth.RefreshToken,
|
|
RefreshTokenExpires: oauth.RefreshTokenExpires,
|
|
},
|
|
)
|
|
row, err := sql.OrgByOauthId(oauth.ID).One(ctx, db.PGInstance.BobDB)
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to get org ID: %w", err)
|
|
}
|
|
fssync, err := fieldseeker.NewFieldSeeker(
|
|
ar,
|
|
row.FieldseekerURL.MustGet(),
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to create fssync: %w", err)
|
|
}
|
|
var stats SyncStats
|
|
//layers := fssync.FeatureServerLayers()
|
|
|
|
var ss SyncStats
|
|
for _, l := range fssync.FeatureServerLayers() {
|
|
ss, err = exportFieldseekerLayer(ctx, org, fssync, l)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
stats.Inserts += ss.Inserts
|
|
stats.Updates += ss.Updates
|
|
stats.Unchanged += ss.Unchanged
|
|
}
|
|
|
|
setter := models.FieldseekerSyncSetter{
|
|
RecordsCreated: omit.From(int32(stats.Inserts)),
|
|
RecordsUpdated: omit.From(int32(stats.Updates)),
|
|
RecordsUnchanged: omit.From(int32(stats.Unchanged)),
|
|
}
|
|
err = org.InsertFieldseekerSyncs(ctx, db.PGInstance.BobDB, &setter)
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to insert sync: %w", err)
|
|
}
|
|
|
|
updateSummaryTables(ctx, org)
|
|
return nil
|
|
}
|
|
|
|
func maintainOAuth(ctx context.Context, oauth *models.OauthToken) error {
|
|
for {
|
|
// Refresh from the database
|
|
oauth, err := models.FindOauthToken(ctx, db.PGInstance.BobDB, oauth.ID)
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to update oauth token from database: %w", err)
|
|
}
|
|
var accessTokenDelay time.Duration
|
|
if oauth.AccessTokenExpires.Before(time.Now()) {
|
|
accessTokenDelay = 1
|
|
} else {
|
|
accessTokenDelay = time.Until(oauth.AccessTokenExpires) - (3 * time.Second)
|
|
}
|
|
var refreshTokenDelay time.Duration
|
|
if oauth.RefreshTokenExpires.Before(time.Now()) {
|
|
refreshTokenDelay = 1
|
|
} else {
|
|
refreshTokenDelay = time.Until(oauth.RefreshTokenExpires) - (3 * time.Second)
|
|
}
|
|
log.Info().Int("id", int(oauth.ID)).Float64("seconds", accessTokenDelay.Seconds()).Msg("Need to refresh access token")
|
|
log.Info().Int("id", int(oauth.ID)).Float64("seconds", refreshTokenDelay.Seconds()).Msg("Need to refresh refresh token")
|
|
accessTokenTicker := time.NewTicker(accessTokenDelay)
|
|
refreshTokenTicker := time.NewTicker(refreshTokenDelay)
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil
|
|
case <-accessTokenTicker.C:
|
|
err := refreshAccessToken(ctx, oauth)
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to refresh access token: %w", err)
|
|
}
|
|
case <-refreshTokenTicker.C:
|
|
err := refreshRefreshToken(ctx, oauth)
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to maintain refresh token: %w", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
// Mark that a given oauth token has failed. This includes a notification to
|
|
// the user.
|
|
func markTokenFailed(ctx context.Context, oauth *models.OauthToken) {
|
|
oauthSetter := models.OauthTokenSetter{
|
|
InvalidatedAt: omitnull.From(time.Now()),
|
|
}
|
|
err := oauth.Update(ctx, db.PGInstance.BobDB, &oauthSetter)
|
|
if err != nil {
|
|
log.Error().Str("err", err.Error()).Msg("Failed to mark token failed")
|
|
}
|
|
user, err := models.FindUser(ctx, db.PGInstance.BobDB, oauth.UserID)
|
|
if err != nil {
|
|
log.Error().Str("err", err.Error()).Msg("Failed to get oauth user")
|
|
return
|
|
}
|
|
notifyOauthInvalid(ctx, user)
|
|
log.Info().Int("id", int(oauth.ID)).Msg("Marked oauth token invalid")
|
|
}
|
|
|
|
// Update the access token to keep it fresh and alive
|
|
func refreshAccessToken(ctx context.Context, oauth *models.OauthToken) error {
|
|
baseURL := "https://www.arcgis.com/sharing/rest/oauth2/token/"
|
|
|
|
form := url.Values{
|
|
"grant_type": []string{"refresh_token"},
|
|
"client_id": []string{ClientID},
|
|
"refresh_token": []string{oauth.RefreshToken},
|
|
}
|
|
|
|
req, err := http.NewRequest("POST", baseURL, strings.NewReader(form.Encode()))
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to create request: %w", err)
|
|
}
|
|
req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
|
|
token, err := handleTokenRequest(ctx, req)
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to handle request: %w", err)
|
|
}
|
|
accessExpires := futureUTCTimestamp(token.ExpiresIn)
|
|
setter := models.OauthTokenSetter{
|
|
AccessToken: omit.From(token.AccessToken),
|
|
AccessTokenExpires: omit.From(accessExpires),
|
|
Username: omit.From(token.Username),
|
|
}
|
|
err = oauth.Update(ctx, db.PGInstance.BobDB, &setter)
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to update oauth in database: %w", err)
|
|
}
|
|
log.Info().Int("oauth token id", int(oauth.ID)).Msg("Updated oauth token")
|
|
return nil
|
|
}
|
|
|
|
// Update the refresh token to keep it fresh and alive
|
|
func refreshRefreshToken(ctx context.Context, oauth *models.OauthToken) error {
|
|
baseURL := "https://www.arcgis.com/sharing/rest/oauth2/token/"
|
|
|
|
form := url.Values{
|
|
"grant_type": []string{"exchange_refresh_token"},
|
|
"client_id": []string{ClientID},
|
|
"redirect_uri": []string{redirectURL()},
|
|
"refresh_token": []string{oauth.RefreshToken},
|
|
}
|
|
|
|
req, err := http.NewRequest("POST", baseURL, strings.NewReader(form.Encode()))
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to create request: %w", err)
|
|
}
|
|
req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
|
|
token, err := handleTokenRequest(ctx, req)
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to handle request: %w", err)
|
|
}
|
|
refreshExpires := futureUTCTimestamp(token.ExpiresIn)
|
|
setter := models.OauthTokenSetter{
|
|
RefreshToken: omit.From(token.RefreshToken),
|
|
RefreshTokenExpires: omit.From(refreshExpires),
|
|
Username: omit.From(token.Username),
|
|
}
|
|
err = oauth.Update(ctx, db.PGInstance.BobDB, &setter)
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to update oauth in database: %w", err)
|
|
}
|
|
log.Info().Int("oauth token id", int(oauth.ID)).Msg("Updated oauth token")
|
|
return nil
|
|
}
|
|
|
|
func newTimestampedFilename(prefix, suffix string) string {
|
|
timestamp := time.Now().Format("20060102_150405") // YYYYMMDD_HHMMSS format
|
|
return prefix + timestamp + suffix
|
|
}
|
|
|
|
func handleTokenRequest(ctx context.Context, req *http.Request) (*OAuthTokenResponse, error) {
|
|
client := http.Client{}
|
|
log.Info().Str("url", req.URL.String()).Msg("POST")
|
|
resp, err := client.Do(req)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Failed to do request: %w", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
bodyBytes, err := io.ReadAll(resp.Body)
|
|
log.Info().Int("status", resp.StatusCode).Msg("Token request")
|
|
filename := newTimestampedFilename("token", ".json")
|
|
saveResponse(bodyBytes, filename)
|
|
if resp.StatusCode >= http.StatusBadRequest {
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Got status code %d and failed to read response body: %w", resp.StatusCode, err)
|
|
}
|
|
bodyString := string(bodyBytes)
|
|
var errorResp arcgis.ErrorResponse
|
|
if err := json.Unmarshal(bodyBytes, &errorResp); err == nil {
|
|
if errorResp.Error.Code == 498 && errorResp.Error.Description == "invalidated refresh_token" {
|
|
return nil, InvalidatedTokenError{}
|
|
}
|
|
return nil, fmt.Errorf("API response JSON error: %d: %w", resp.StatusCode, errorResp)
|
|
}
|
|
return nil, fmt.Errorf("API returned error status %d: %s", resp.StatusCode, bodyString)
|
|
}
|
|
//logResponseHeaders(resp)
|
|
var tokenResponse OAuthTokenResponse
|
|
err = json.Unmarshal(bodyBytes, &tokenResponse)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Failed to unmarshal JSON: %w", err)
|
|
}
|
|
// Just because we got a 200-level status code doesn't mean it worked. Experience has taught us that
|
|
// we can get errors without anything indicated in the headers or the status code
|
|
if tokenResponse == (OAuthTokenResponse{}) {
|
|
var errorResponse arcgis.ErrorResponse
|
|
err = json.Unmarshal(bodyBytes, &errorResponse)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Failed to unmarshal error JSON: %w", err)
|
|
}
|
|
if errorResponse.Error.Code > 0 {
|
|
return nil, errorResponse.AsError()
|
|
}
|
|
}
|
|
log.Info().Str("refresh token", tokenResponse.RefreshToken).Str("access token", tokenResponse.AccessToken).Int("access expires", tokenResponse.ExpiresIn).Int("refresh expires", tokenResponse.RefreshTokenExpiresIn).Msg("Oauth token acquired")
|
|
return &tokenResponse, nil
|
|
}
|
|
|
|
func logResponseHeaders(resp *http.Response) {
|
|
if resp == nil {
|
|
log.Info().Msg("Response is nil")
|
|
return
|
|
}
|
|
|
|
log.Info().Str("status", resp.Status).Int("statusCode", resp.StatusCode).Msg("HTTP Response headers")
|
|
|
|
for name, values := range resp.Header {
|
|
log.Info().Str("name", name).Strs("values", values).Msg("Header")
|
|
}
|
|
}
|
|
|
|
func saveResponse(data []byte, filename string) {
|
|
dest, err := os.Create(filename)
|
|
if err != nil {
|
|
log.Error().Str("filename", filename).Str("err", err.Error()).Msg("Failed to create file")
|
|
return
|
|
}
|
|
_, err = io.Copy(dest, bytes.NewReader(data))
|
|
if err != nil {
|
|
log.Error().Str("filename", filename).Str("err", err.Error()).Msg("Failed to write")
|
|
return
|
|
}
|
|
log.Info().Str("filename", filename).Msg("Wrote response")
|
|
}
|
|
|
|
/*
|
|
func saveRawQuery(fssync *fieldseeker.FieldSeeker, layer arcgis.LayerFeature, query *arcgis.Query, filename string) {
|
|
output, err := os.Create(filename)
|
|
if err != nil {
|
|
log.Error().Str("filename", filename).Msg("Failed to create file")
|
|
return
|
|
}
|
|
qr, err := fssync.DoQueryRaw(
|
|
layer.ID,
|
|
query)
|
|
if err != nil {
|
|
log.Error().Str("err", err.Error()).Msg("Failed to do query")
|
|
return
|
|
}
|
|
_, err = output.Write(qr)
|
|
if err != nil {
|
|
log.Error().Str("err", err.Error()).Msg("Failed to write results")
|
|
return
|
|
}
|
|
log.Info().Str("filename", filename).Msg("Wrote failed query")
|
|
}
|
|
*/
|
|
|
|
func saveOrUpdateDBRecords(ctx context.Context, table string, qr *arcgis.QueryResult, org_id int32) (int, int, error) {
|
|
inserts, updates := 0, 0
|
|
sorted_columns := make([]string, 0, len(qr.Fields))
|
|
for _, f := range qr.Fields {
|
|
sorted_columns = append(sorted_columns, f.Name)
|
|
}
|
|
sort.Strings(sorted_columns)
|
|
|
|
objectids := make([]int, 0)
|
|
for _, l := range qr.Features {
|
|
oid := l.Attributes["OBJECTID"].(float64)
|
|
objectids = append(objectids, int(oid))
|
|
}
|
|
|
|
rows_by_objectid, err := rowmapViaQuery(ctx, table, sorted_columns, objectids)
|
|
if err != nil {
|
|
return inserts, updates, fmt.Errorf("Failed to get existing rows: %w", err)
|
|
}
|
|
// log.Println("Rows from query", len(rows_by_objectid))
|
|
|
|
for _, feature := range qr.Features {
|
|
oid := feature.Attributes["OBJECTID"].(float64)
|
|
row := rows_by_objectid[int(oid)]
|
|
// If we have no matching row we'll need to create it
|
|
if len(row) == 0 {
|
|
|
|
if err := insertRowFromFeature(ctx, table, sorted_columns, &feature, org_id); err != nil {
|
|
return inserts, updates, fmt.Errorf("Failed to insert row: %w", err)
|
|
}
|
|
inserts += 1
|
|
} else if hasUpdates(row, feature) {
|
|
if err := updateRowFromFeature(ctx, table, sorted_columns, &feature, org_id); err != nil {
|
|
return inserts, updates, fmt.Errorf("Failed to update row: %w", err)
|
|
}
|
|
updates += 1
|
|
}
|
|
}
|
|
return inserts, updates, nil
|
|
}
|
|
|
|
// Produces a map of OBJECTID to a 'row' which is in turn a map of column names to their values as strings
|
|
func rowmapViaQuery(ctx context.Context, table string, sorted_columns []string, objectids []int) (map[int]map[string]string, error) {
|
|
result := make(map[int]map[string]string)
|
|
|
|
query := selectAllFromQueryResult(table, sorted_columns)
|
|
|
|
args := pgx.NamedArgs{
|
|
"objectids": objectids,
|
|
}
|
|
rows, err := db.PGInstance.PGXPool.Query(ctx, query, args)
|
|
if err != nil {
|
|
return result, fmt.Errorf("Failed to query rows: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
// +2 for geometry x and geometry x
|
|
columnNames := make([]string, len(sorted_columns)+2)
|
|
for i, c := range sorted_columns {
|
|
columnNames[i] = c
|
|
}
|
|
columnNames[len(sorted_columns)] = "geometry_x"
|
|
columnNames[len(sorted_columns)+1] = "geometry_y"
|
|
|
|
rowSlice, err := pgx.CollectRows(rows, func(row pgx.CollectableRow) (map[string]string, error) {
|
|
fieldDescriptions := row.FieldDescriptions()
|
|
values := make([]interface{}, len(fieldDescriptions))
|
|
valuePtrs := make([]interface{}, len(fieldDescriptions))
|
|
|
|
for i := range values {
|
|
valuePtrs[i] = &values[i]
|
|
}
|
|
|
|
if err := row.Scan(valuePtrs...); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
result := make(map[string]string)
|
|
for i, fd := range fieldDescriptions {
|
|
if values[i] != nil {
|
|
result[fd.Name] = fmt.Sprintf("%v", values[i])
|
|
//log.Printf("col %v type %T val %v", fd.Name, values[i], values[i])
|
|
} else {
|
|
result[fd.Name] = ""
|
|
}
|
|
}
|
|
|
|
return result, nil
|
|
})
|
|
if err != nil {
|
|
return result, fmt.Errorf("Failed to collect rows: %w", err)
|
|
}
|
|
for _, row := range rowSlice {
|
|
o := row["objectid"]
|
|
objectid, err := strconv.Atoi(o)
|
|
if err != nil {
|
|
return result, fmt.Errorf("Failed to parse objectid %s: %w", o, err)
|
|
}
|
|
result[objectid] = row
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
func insertRowFromFeature(ctx context.Context, table string, sorted_columns []string, feature *arcgis.Feature, org_id int32) error {
|
|
var options pgx.TxOptions
|
|
transaction, err := db.PGInstance.PGXPool.BeginTx(ctx, options)
|
|
if err != nil {
|
|
return fmt.Errorf("Unable to start transaction")
|
|
}
|
|
|
|
err = insertRowFromFeatureFS(ctx, transaction, table, sorted_columns, feature, org_id)
|
|
if err != nil {
|
|
transaction.Rollback(ctx)
|
|
return fmt.Errorf("Unable to insert FS: %w", err)
|
|
}
|
|
|
|
err = insertRowFromFeatureHistory(ctx, transaction, table, sorted_columns, feature, org_id, 1)
|
|
if err != nil {
|
|
transaction.Rollback(ctx)
|
|
return fmt.Errorf("Failed to insert history: %w", err)
|
|
}
|
|
|
|
err = transaction.Commit(ctx)
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to commit transaction: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func insertRowFromFeatureFS(ctx context.Context, transaction pgx.Tx, table string, sorted_columns []string, feature *arcgis.Feature, org_id int32) error {
|
|
// Create the query to produce the main row
|
|
var sb strings.Builder
|
|
sb.WriteString("INSERT INTO ")
|
|
sb.WriteString(table)
|
|
sb.WriteString(" (")
|
|
for _, field := range sorted_columns {
|
|
sb.WriteString(field)
|
|
sb.WriteString(",")
|
|
}
|
|
// Specially add the geometry values since they aren't in the fields
|
|
sb.WriteString("geometry_x,geometry_y,organization_id,updated")
|
|
sb.WriteString(")\nVALUES (")
|
|
for _, field := range sorted_columns {
|
|
sb.WriteString("@")
|
|
sb.WriteString(field)
|
|
sb.WriteString(",")
|
|
}
|
|
// Specially add the geometry values since they aren't in the fields
|
|
sb.WriteString("@geometry_x,@geometry_y,@organization_id,@updated)")
|
|
|
|
args := pgx.NamedArgs{}
|
|
for k, v := range feature.Attributes {
|
|
args[k] = v
|
|
}
|
|
// specially add geometry since it isn't in the list of attributes
|
|
//args["geometry_x"] = feature.Geometry.X
|
|
//args["geometry_y"] = feature.Geometry.Y
|
|
args["organization_id"] = org_id
|
|
args["updated"] = time.Now()
|
|
|
|
_, err := transaction.Exec(ctx, sb.String(), args)
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to insert row into %s: %w", table, err)
|
|
}
|
|
return nil
|
|
}
|
|
func hasUpdates(row map[string]string, feature arcgis.Feature) bool {
|
|
for key, value := range feature.Attributes {
|
|
rowdata := row[strings.ToLower(key)]
|
|
// We'll accept any 'nil' as represented by the empty string in the database
|
|
if value == nil {
|
|
if rowdata == "" {
|
|
continue
|
|
} else if len(rowdata) > 0 {
|
|
return true
|
|
} else {
|
|
log.Error().Msg("Looks like our original value is nil, but our row value is something non-empty with a zero length. Need a programmer to look into this.")
|
|
}
|
|
}
|
|
// check strings first, their simplest
|
|
if featureAsString, ok := value.(string); ok {
|
|
if featureAsString != rowdata {
|
|
return true
|
|
}
|
|
continue
|
|
} else if featureAsInt, ok := value.(int); ok {
|
|
// Previously had a nil value, now we have a real value
|
|
if rowdata == "" {
|
|
return true
|
|
}
|
|
rowAsInt, err := strconv.Atoi(rowdata)
|
|
if err != nil {
|
|
log.Error().Msg(fmt.Sprintf("Failed to convert '%s' to an int to compare against %v for %v", rowdata, featureAsInt, key))
|
|
}
|
|
if rowAsInt != featureAsInt {
|
|
return true
|
|
} else {
|
|
continue
|
|
}
|
|
} else if featureAsFloat, ok := value.(float64); ok {
|
|
// Previously had a nil value, now we have a real value
|
|
if rowdata == "" {
|
|
return true
|
|
}
|
|
rowAsFloat, err := strconv.ParseFloat(rowdata, 64)
|
|
if err != nil {
|
|
log.Error().Msg(fmt.Sprintf("Failed to convert '%s' to a float64 to compare against %v for %v", rowdata, featureAsFloat, key))
|
|
}
|
|
if rowAsFloat != featureAsFloat {
|
|
return true
|
|
} else {
|
|
continue
|
|
}
|
|
}
|
|
log.Info().Msg(fmt.Sprintf("key: %s\tvalue: %w (type %T)\trow: %s\n", key, value, value, rowdata))
|
|
log.Error().Msg("we've hit a point where we can't tell if we have an update or not, need a programmer to look at the above")
|
|
}
|
|
return false
|
|
}
|
|
func updateRowFromFeature(ctx context.Context, table string, sorted_columns []string, feature *arcgis.Feature, org_id int32) error {
|
|
// Get the current highest version for the row in question
|
|
history_table := toHistoryTable(table)
|
|
var sb strings.Builder
|
|
sb.WriteString("SELECT MAX(version) FROM ")
|
|
sb.WriteString(history_table)
|
|
sb.WriteString(" WHERE OBJECTID=@objectid")
|
|
|
|
args := pgx.NamedArgs{}
|
|
o := feature.Attributes["OBJECTID"].(float64)
|
|
args["objectid"] = int(o)
|
|
|
|
var version int
|
|
if err := db.PGInstance.PGXPool.QueryRow(ctx, sb.String(), args).Scan(&version); err != nil {
|
|
return fmt.Errorf("Failed to query for version: %w", err)
|
|
}
|
|
|
|
var options pgx.TxOptions
|
|
transaction, err := db.PGInstance.PGXPool.BeginTx(ctx, options)
|
|
if err != nil {
|
|
return fmt.Errorf("Unable to start transaction")
|
|
}
|
|
|
|
err = insertRowFromFeatureHistory(ctx, transaction, table, sorted_columns, feature, org_id, version+1)
|
|
if err != nil {
|
|
transaction.Rollback(ctx)
|
|
return fmt.Errorf("Failed to insert history: %w", err)
|
|
}
|
|
err = updateRowFromFeatureFS(ctx, transaction, table, sorted_columns, feature)
|
|
if err != nil {
|
|
transaction.Rollback(ctx)
|
|
return fmt.Errorf("Failed to update row from feature: %w", err)
|
|
}
|
|
|
|
err = transaction.Commit(ctx)
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to commit transaction: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
func insertRowFromFeatureHistory(ctx context.Context, transaction pgx.Tx, table string, sorted_columns []string, feature *arcgis.Feature, org_id int32, version int) error {
|
|
history_table := toHistoryTable(table)
|
|
var sb strings.Builder
|
|
sb.WriteString("INSERT INTO ")
|
|
sb.WriteString(history_table)
|
|
sb.WriteString(" (")
|
|
for _, field := range sorted_columns {
|
|
sb.WriteString(field)
|
|
sb.WriteString(",")
|
|
}
|
|
// Specially add the geometry values since they aren't in the fields
|
|
sb.WriteString("created,geometry_x,geometry_y,organization_id,version")
|
|
sb.WriteString(")\nVALUES (")
|
|
for _, field := range sorted_columns {
|
|
sb.WriteString("@")
|
|
sb.WriteString(field)
|
|
sb.WriteString(",")
|
|
}
|
|
// Specially add the geometry values since they aren't in the fields
|
|
sb.WriteString("@created,@geometry_x,@geometry_y,@organization_id,@version)")
|
|
args := pgx.NamedArgs{}
|
|
for k, v := range feature.Attributes {
|
|
args[k] = v
|
|
}
|
|
args["created"] = time.Now()
|
|
args["organization_id"] = org_id
|
|
args["version"] = version
|
|
if _, err := transaction.Exec(ctx, sb.String(), args); err != nil {
|
|
return fmt.Errorf("Failed to insert history row into %s: %w", table, err)
|
|
}
|
|
return nil
|
|
}
|
|
func selectAllFromQueryResult(table string, sorted_columns []string) string {
|
|
var sb strings.Builder
|
|
sb.WriteString("SELECT * FROM ")
|
|
sb.WriteString(table)
|
|
sb.WriteString(" WHERE OBJECTID=ANY(@objectids)")
|
|
return sb.String()
|
|
}
|
|
func toHistoryTable(table string) string {
|
|
return "History_" + table[3:len(table)]
|
|
}
|
|
|
|
func updateRowFromFeatureFS(ctx context.Context, transaction pgx.Tx, table string, sorted_columns []string, feature *arcgis.Feature) error {
|
|
// Create the query to produce the main row
|
|
var sb strings.Builder
|
|
sb.WriteString("UPDATE ")
|
|
sb.WriteString(table)
|
|
sb.WriteString(" SET ")
|
|
for _, field := range sorted_columns {
|
|
// OBJECTID is special as our primary key, so skip it
|
|
if field == "OBJECTID" {
|
|
continue
|
|
}
|
|
sb.WriteString(field)
|
|
sb.WriteString("=@")
|
|
sb.WriteString(field)
|
|
sb.WriteString(",")
|
|
}
|
|
// Specially add the geometry values since they aren't in the fields
|
|
sb.WriteString("geometry_x=@geometry_x,geometry_y=@geometry_y,updated=@updated WHERE OBJECTID=@OBJECTID")
|
|
|
|
args := pgx.NamedArgs{}
|
|
for k, v := range feature.Attributes {
|
|
args[k] = v
|
|
}
|
|
// specially add geometry since it isn't in the list of attributes
|
|
//args["geometry_x"] = feature.Geometry.X
|
|
//args["geometry_y"] = feature.Geometry.Y
|
|
args["updated"] = time.Now()
|
|
|
|
_, err := transaction.Exec(ctx, sb.String(), args)
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to update row into %s: %w", table, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func updateSummaryTables(ctx context.Context, org *models.Organization) {
|
|
/*org, err := models.FindOrganization(ctx, PGInstance.BobDB, org_id)
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("Failed to get organization")
|
|
}*/
|
|
log.Info().Int("org_id", int(org.ID)).Msg("Getting point locations")
|
|
point_locations, err := org.Pointlocations().All(ctx, db.PGInstance.BobDB)
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("Failed to get organization")
|
|
return
|
|
}
|
|
log.Info().Int("count", len(point_locations)).Msg("Summarizing point locations")
|
|
|
|
for i := range 16 {
|
|
log.Info().Int("resolution", i).Msg("Working summary layer")
|
|
cellToCount := make(map[h3.Cell]int, 0)
|
|
for _, p := range point_locations {
|
|
p, err := getPoint(p.Geometry)
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("Failed to get geometry point")
|
|
continue
|
|
}
|
|
cell, err := h3.LatLngToCell(p, i)
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("Failed to get cell")
|
|
continue
|
|
}
|
|
//log.Info().Float64("X", p.GeometryX).Float64("Y", p.GeometryY).Str("cell", cell.String()).Msg("Converted lat/lng")
|
|
cellToCount[cell] = cellToCount[cell] + 1
|
|
}
|
|
var to_insert []bob.Mod[*dialect.InsertQuery] = make([]bob.Mod[*dialect.InsertQuery], 0)
|
|
to_insert = append(to_insert, im.Into("h3_aggregation", "cell", "resolution", "count_", "type_", "organization_id", "geometry"))
|
|
for cell, count := range cellToCount {
|
|
polygon, err := cellToPostgisGeometry(cell)
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("Failed to get PostGIS geometry")
|
|
continue
|
|
}
|
|
// log.Info().Str("polygon", polygon).Msg("Going to insert")
|
|
to_insert = append(to_insert, im.Values(psql.Arg(cell.String(), i, count, enums.H3aggregationtypeServicerequest, org.ID), psql.F("st_geomfromtext", psql.S(polygon), 4326)))
|
|
}
|
|
to_insert = append(to_insert, im.OnConflict("cell, organization_id, type_").DoUpdate(
|
|
im.SetCol("count_").To(psql.Raw("EXCLUDED.count_")),
|
|
))
|
|
//log.Info().Str("sql", insertQueryToString(psql.Insert(to_insert...))).Msg("Updating...")
|
|
_, err := psql.Insert(to_insert...).Exec(ctx, db.PGInstance.BobDB)
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("Faild to add h3 aggregation")
|
|
}
|
|
}
|
|
}
|
|
|
|
func exportFieldseekerLayer(ctx context.Context, org *models.Organization, fssync *fieldseeker.FieldSeeker, layer arcgis.LayerFeature) (SyncStats, error) {
|
|
var stats SyncStats
|
|
count, err := fssync.QueryCount(layer.ID)
|
|
if err != nil {
|
|
return stats, fmt.Errorf("Failed to get counts for layer %s (%d): %w", layer.Name, layer.ID, err)
|
|
}
|
|
if count.Count == 0 {
|
|
log.Info().Str("name", layer.Name).Uint("id", layer.ID).Msg("No records to download")
|
|
return stats, nil
|
|
}
|
|
log.Info().Str("name", layer.Name).Uint("id", layer.ID).Msg("Starting on layer")
|
|
pool := pond.NewResultPool[SyncStats](20)
|
|
group := pool.NewGroup()
|
|
maxRecords := uint(fssync.MaxRecordCount())
|
|
l, err := fieldseeker.NameToLayerType(layer.Name)
|
|
if err != nil {
|
|
return stats, fmt.Errorf("Failed to get layer for '%s': %w", layer.Name, err)
|
|
}
|
|
for offset := uint(0); offset < uint(count.Count); offset += maxRecords {
|
|
group.SubmitErr(func() (SyncStats, error) {
|
|
var ss SyncStats
|
|
var name string
|
|
var inserts, unchanged, updates uint
|
|
var err error
|
|
switch l {
|
|
case fieldseeker.LayerAerialSpraySession:
|
|
name = "AerialSpraySession"
|
|
rows, err := fssync.AerialSpraySession(offset)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
|
}
|
|
inserts, updates, err = db.SaveOrUpdateAerialSpraySession(ctx, org, rows)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
|
}
|
|
unchanged = uint(len(rows)) - inserts - updates
|
|
case fieldseeker.LayerAerialSprayLine:
|
|
name = "LayerAerialSprayLine"
|
|
rows, err := fssync.AerialSprayLine(offset)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
|
}
|
|
inserts, updates, err = db.SaveOrUpdateAerialSprayLine(ctx, org, rows)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
|
}
|
|
unchanged = uint(len(rows)) - inserts - updates
|
|
case fieldseeker.LayerBarrierSpray:
|
|
name = "LayerBarrierSpray"
|
|
rows, err := fssync.BarrierSpray(offset)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
|
}
|
|
inserts, updates, err = db.SaveOrUpdateBarrierSpray(ctx, org, rows)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
|
}
|
|
unchanged = uint(len(rows)) - inserts - updates
|
|
case fieldseeker.LayerBarrierSprayRoute:
|
|
name = "LayerBarrierSprayRoute"
|
|
rows, err := fssync.BarrierSprayRoute(offset)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
|
}
|
|
inserts, updates, err = db.SaveOrUpdateBarrierSprayRoute(ctx, org, rows)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
|
}
|
|
unchanged = uint(len(rows)) - inserts - updates
|
|
case fieldseeker.LayerContainerRelate:
|
|
name = "LayerContainerRelate"
|
|
rows, err := fssync.ContainerRelate(offset)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
|
}
|
|
inserts, updates, err = db.SaveOrUpdateContainerRelate(ctx, org, rows)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
|
}
|
|
unchanged = uint(len(rows)) - inserts - updates
|
|
case fieldseeker.LayerFieldScoutingLog:
|
|
name = "LayerFieldScoutingLog"
|
|
rows, err := fssync.FieldScoutingLog(offset)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
|
}
|
|
inserts, updates, err = db.SaveOrUpdateFieldScoutingLog(ctx, org, rows)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
|
}
|
|
unchanged = uint(len(rows)) - inserts - updates
|
|
case fieldseeker.LayerHabitatRelate:
|
|
name = "LayerHabitatRelate"
|
|
rows, err := fssync.HabitatRelate(offset)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
|
}
|
|
inserts, updates, err = db.SaveOrUpdateHabitatRelate(ctx, org, rows)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
|
}
|
|
unchanged = uint(len(rows)) - inserts - updates
|
|
case fieldseeker.LayerInspectionSample:
|
|
name = "LayerInspectionSample"
|
|
rows, err := fssync.InspectionSample(offset)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
|
}
|
|
inserts, updates, err = db.SaveOrUpdateInspectionSample(ctx, org, rows)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
|
}
|
|
unchanged = uint(len(rows)) - inserts - updates
|
|
case fieldseeker.LayerInspectionSampleDetail:
|
|
name = "LayerInspectionSampleDetail"
|
|
rows, err := fssync.InspectionSampleDetail(offset)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
|
}
|
|
inserts, updates, err = db.SaveOrUpdateInspectionSampleDetail(ctx, org, rows)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
|
}
|
|
unchanged = uint(len(rows)) - inserts - updates
|
|
case fieldseeker.LayerLandingCount:
|
|
name = "LayerLandingCount"
|
|
rows, err := fssync.LandingCount(offset)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
|
}
|
|
inserts, updates, err = db.SaveOrUpdateLandingCount(ctx, org, rows)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
|
}
|
|
unchanged = uint(len(rows)) - inserts - updates
|
|
case fieldseeker.LayerLandingCountLocation:
|
|
name = "LayerLandingCountLocation"
|
|
rows, err := fssync.LandingCountLocation(offset)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
|
}
|
|
inserts, updates, err = db.SaveOrUpdateLandingCountLocation(ctx, org, rows)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
|
}
|
|
unchanged = uint(len(rows)) - inserts - updates
|
|
case fieldseeker.LayerLineLocation:
|
|
name = "LayerLineLocation"
|
|
rows, err := fssync.LineLocation(offset)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
|
}
|
|
inserts, updates, err = db.SaveOrUpdateLineLocation(ctx, org, rows)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
|
}
|
|
unchanged = uint(len(rows)) - inserts - updates
|
|
case fieldseeker.LayerLocationTracking:
|
|
name = "LayerLocationTracking"
|
|
rows, err := fssync.LocationTracking(offset)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
|
}
|
|
inserts, updates, err = db.SaveOrUpdateLocationTracking(ctx, org, rows)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
|
}
|
|
unchanged = uint(len(rows)) - inserts - updates
|
|
case fieldseeker.LayerMosquitoInspection:
|
|
name = "LayerMosquitoInspection"
|
|
rows, err := fssync.MosquitoInspection(offset)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
|
}
|
|
inserts, updates, err = db.SaveOrUpdateMosquitoInspection(ctx, org, rows)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
|
}
|
|
unchanged = uint(len(rows)) - inserts - updates
|
|
case fieldseeker.LayerOfflineMapAreas:
|
|
name = "LayerOfflineMapAreas"
|
|
rows, err := fssync.OfflineMapAreas(offset)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
|
}
|
|
inserts, updates, err = db.SaveOrUpdateOfflineMapAreas(ctx, org, rows)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
|
}
|
|
unchanged = uint(len(rows)) - inserts - updates
|
|
case fieldseeker.LayerProposedTreatmentArea:
|
|
name = "LayerProposedTreatmentArea"
|
|
rows, err := fssync.ProposedTreatmentArea(offset)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
|
}
|
|
inserts, updates, err = db.SaveOrUpdateProposedTreatmentArea(ctx, org, rows)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
|
}
|
|
unchanged = uint(len(rows)) - inserts - updates
|
|
case fieldseeker.LayerPointLocation:
|
|
name = "LayerPointLocation"
|
|
rows, err := fssync.PointLocation(offset)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
|
}
|
|
inserts, updates, err = db.SaveOrUpdatePointLocation(ctx, org, rows)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
|
}
|
|
unchanged = uint(len(rows)) - inserts - updates
|
|
case fieldseeker.LayerPolygonLocation:
|
|
name = "LayerPolygonLocation"
|
|
rows, err := fssync.PolygonLocation(offset)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
|
}
|
|
inserts, updates, err = db.SaveOrUpdatePolygonLocation(ctx, org, rows)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
|
}
|
|
unchanged = uint(len(rows)) - inserts - updates
|
|
case fieldseeker.LayerPoolDetail:
|
|
name = "LayerPoolDetail"
|
|
rows, err := fssync.PoolDetail(offset)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
|
}
|
|
inserts, updates, err = db.SaveOrUpdatePoolDetail(ctx, org, rows)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
|
}
|
|
unchanged = uint(len(rows)) - inserts - updates
|
|
case fieldseeker.LayerPool:
|
|
name = "LayerPool"
|
|
rows, err := fssync.Pool(offset)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
|
}
|
|
inserts, updates, err = db.SaveOrUpdatePool(ctx, org, rows)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
|
}
|
|
unchanged = uint(len(rows)) - inserts - updates
|
|
case fieldseeker.LayerPoolBuffer:
|
|
name = "LayerPoolBuffer"
|
|
rows, err := fssync.PoolBuffer(offset)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
|
}
|
|
inserts, updates, err = db.SaveOrUpdatePoolBuffer(ctx, org, rows)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
|
}
|
|
unchanged = uint(len(rows)) - inserts - updates
|
|
case fieldseeker.LayerQALarvCount:
|
|
name = "LayerQALarvCount"
|
|
rows, err := fssync.QALarvCount(offset)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
|
}
|
|
inserts, updates, err = db.SaveOrUpdateQALarvCount(ctx, org, rows)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
|
}
|
|
unchanged = uint(len(rows)) - inserts - updates
|
|
case fieldseeker.LayerQAMosquitoInspection:
|
|
name = "LayerQAMosquitoInspection"
|
|
rows, err := fssync.QAMosquitoInspection(offset)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
|
}
|
|
inserts, updates, err = db.SaveOrUpdateQAMosquitoInspection(ctx, org, rows)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
|
}
|
|
unchanged = uint(len(rows)) - inserts - updates
|
|
case fieldseeker.LayerQAProductObservation:
|
|
name = "LayerQAProductObservation"
|
|
rows, err := fssync.QAProductObservation(offset)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
|
}
|
|
inserts, updates, err = db.SaveOrUpdateQAProductObservation(ctx, org, rows)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
|
}
|
|
unchanged = uint(len(rows)) - inserts - updates
|
|
case fieldseeker.LayerRestrictedArea:
|
|
name = "LayerRestrictedArea"
|
|
rows, err := fssync.RestrictedArea(offset)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
|
}
|
|
inserts, updates, err = db.SaveOrUpdateRestrictedArea(ctx, org, rows)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
|
}
|
|
unchanged = uint(len(rows)) - inserts - updates
|
|
case fieldseeker.LayerRodentInspection:
|
|
name = "LayerRodentInspection"
|
|
rows, err := fssync.RodentInspection(offset)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
|
}
|
|
inserts, updates, err = db.SaveOrUpdateRodentInspection(ctx, org, rows)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
|
}
|
|
unchanged = uint(len(rows)) - inserts - updates
|
|
case fieldseeker.LayerRodentLocation:
|
|
name = "LayerRodentLocation"
|
|
rows, err := fssync.RodentLocation(offset)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
|
}
|
|
inserts, updates, err = db.SaveOrUpdateRodentLocation(ctx, org, rows)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
|
}
|
|
unchanged = uint(len(rows)) - inserts - updates
|
|
case fieldseeker.LayerSampleCollection:
|
|
name = "LayerSampleCollection"
|
|
rows, err := fssync.SampleCollection(offset)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
|
}
|
|
inserts, updates, err = db.SaveOrUpdateSampleCollection(ctx, org, rows)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
|
}
|
|
unchanged = uint(len(rows)) - inserts - updates
|
|
case fieldseeker.LayerSampleLocation:
|
|
name = "LayerSampleLocation"
|
|
rows, err := fssync.SampleLocation(offset)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
|
}
|
|
inserts, updates, err = db.SaveOrUpdateSampleLocation(ctx, org, rows)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
|
}
|
|
unchanged = uint(len(rows)) - inserts - updates
|
|
case fieldseeker.LayerServiceRequest:
|
|
name = "LayerServiceRequest"
|
|
rows, err := fssync.ServiceRequest(offset)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
|
}
|
|
inserts, updates, err = db.SaveOrUpdateServiceRequest(ctx, org, rows)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
|
}
|
|
unchanged = uint(len(rows)) - inserts - updates
|
|
case fieldseeker.LayerSpeciesAbundance:
|
|
name = "LayerSpeciesAbundance"
|
|
rows, err := fssync.SpeciesAbundance(offset)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
|
}
|
|
inserts, updates, err = db.SaveOrUpdateSpeciesAbundance(ctx, org, rows)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
|
}
|
|
unchanged = uint(len(rows)) - inserts - updates
|
|
case fieldseeker.LayerStormDrain:
|
|
name = "LayerStormDrain"
|
|
rows, err := fssync.StormDrain(offset)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
|
}
|
|
inserts, updates, err = db.SaveOrUpdateStormDrain(ctx, org, rows)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
|
}
|
|
unchanged = uint(len(rows)) - inserts - updates
|
|
case fieldseeker.LayerTracklog:
|
|
name = "LayerTracklog"
|
|
rows, err := fssync.Tracklog(offset)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
|
}
|
|
inserts, updates, err = db.SaveOrUpdateTracklog(ctx, org, rows)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
|
}
|
|
unchanged = uint(len(rows)) - inserts - updates
|
|
case fieldseeker.LayerTrapLocation:
|
|
name = "LayerTrapLocation"
|
|
rows, err := fssync.TrapLocation(offset)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
|
}
|
|
inserts, updates, err = db.SaveOrUpdateTrapLocation(ctx, org, rows)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
|
}
|
|
unchanged = uint(len(rows)) - inserts - updates
|
|
case fieldseeker.LayerTrapData:
|
|
name = "LayerTrapData"
|
|
rows, err := fssync.TrapData(offset)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
|
}
|
|
inserts, updates, err = db.SaveOrUpdateTrapData(ctx, org, rows)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
|
}
|
|
unchanged = uint(len(rows)) - inserts - updates
|
|
case fieldseeker.LayerTimeCard:
|
|
name = "LayerTimeCard"
|
|
rows, err := fssync.TimeCard(offset)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
|
}
|
|
inserts, updates, err = db.SaveOrUpdateTimeCard(ctx, org, rows)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
|
}
|
|
unchanged = uint(len(rows)) - inserts - updates
|
|
case fieldseeker.LayerTreatment:
|
|
name = "LayerTreatment"
|
|
rows, err := fssync.Treatment(offset)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
|
}
|
|
inserts, updates, err = db.SaveOrUpdateTreatment(ctx, org, rows)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
|
}
|
|
unchanged = uint(len(rows)) - inserts - updates
|
|
case fieldseeker.LayerTreatmentArea:
|
|
name = "LayerTreatmentArea"
|
|
rows, err := fssync.TreatmentArea(offset)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
|
}
|
|
inserts, updates, err = db.SaveOrUpdateTreatmentArea(ctx, org, rows)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
|
}
|
|
unchanged = uint(len(rows)) - inserts - updates
|
|
case fieldseeker.LayerULVSprayRoute:
|
|
name = "LayerULVSprayRoute"
|
|
rows, err := fssync.ULVSprayRoute(offset)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
|
}
|
|
inserts, updates, err = db.SaveOrUpdateULVSprayRoute(ctx, org, rows)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
|
}
|
|
unchanged = uint(len(rows)) - inserts - updates
|
|
case fieldseeker.LayerZones:
|
|
name = "LayerZones"
|
|
rows, err := fssync.Zones(offset)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
|
}
|
|
inserts, updates, err = db.SaveOrUpdateZones(ctx, org, rows)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
|
}
|
|
unchanged = uint(len(rows)) - inserts - updates
|
|
case fieldseeker.LayerZones2:
|
|
name = "LayerZones2"
|
|
rows, err := fssync.Zones2(offset)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to query %s: %w", name, err)
|
|
}
|
|
inserts, updates, err = db.SaveOrUpdateZones2(ctx, org, rows)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to update %s: %w", name, err)
|
|
}
|
|
unchanged = uint(len(rows)) - inserts - updates
|
|
default:
|
|
return ss, errors.New("Unrecognized layer")
|
|
}
|
|
ss.Inserts = inserts
|
|
ss.Updates = updates
|
|
ss.Unchanged = unchanged
|
|
return ss, err
|
|
})
|
|
}
|
|
results, err := group.Wait()
|
|
if err != nil {
|
|
return stats, fmt.Errorf("one or more tasks in the work pool failed: %w", err)
|
|
}
|
|
for _, r := range results {
|
|
stats.Inserts += r.Inserts
|
|
stats.Updates += r.Updates
|
|
stats.Unchanged += r.Unchanged
|
|
}
|
|
log.Info().Uint("inserts", stats.Inserts).Uint("updates", stats.Updates).Uint("no change", stats.Unchanged).Str("layer", layer.Name).Msg("Finished layer")
|
|
return stats, nil
|
|
}
|
|
|
|
|