1033 lines
33 KiB
Go
1033 lines
33 KiB
Go
package main
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"crypto/rand"
|
|
"crypto/sha256"
|
|
"encoding/base64"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"log/slog"
|
|
"net/http"
|
|
"net/url"
|
|
"os"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/Gleipnir-Technology/arcgis-go"
|
|
"github.com/Gleipnir-Technology/arcgis-go/fieldseeker"
|
|
enums "github.com/Gleipnir-Technology/nidus-sync/enums"
|
|
"github.com/Gleipnir-Technology/nidus-sync/models"
|
|
"github.com/Gleipnir-Technology/nidus-sync/sql"
|
|
"github.com/aarondl/opt/omit"
|
|
"github.com/aarondl/opt/omitnull"
|
|
"github.com/alitto/pond/v2"
|
|
"github.com/jackc/pgx/v5"
|
|
)
|
|
|
|
var NewOAuthTokenChannel chan struct{}
|
|
var CodeVerifier string = "random_secure_string_min_43_chars_long_should_be_stored_in_session"
|
|
|
|
type ErrorResponse struct {
|
|
Error ErrorResponseContent `json:"error"`
|
|
}
|
|
|
|
type ErrorResponseContent struct {
|
|
Code int `json:"code"`
|
|
Error string `json:"error"`
|
|
ErrorDescription string `json:"error_description"`
|
|
Message string `json:"message"`
|
|
Details []string `json:"details"`
|
|
}
|
|
|
|
type OAuthTokenResponse struct {
|
|
AccessToken string `json:"access_token"`
|
|
ExpiresIn int `json:"expires_in"`
|
|
RefreshToken string `json:"refresh_token"`
|
|
RefreshTokenExpiresIn int `json:"refresh_token_expires_in"`
|
|
SSL bool `json:"ssl"`
|
|
Username string `json:"username"`
|
|
}
|
|
|
|
// Build the ArcGIS authorization URL with PKCE
|
|
func buildArcGISAuthURL(clientID string, expiration int) string {
|
|
baseURL := "https://www.arcgis.com/sharing/rest/oauth2/authorize/"
|
|
|
|
params := url.Values{}
|
|
params.Add("client_id", clientID)
|
|
params.Add("redirect_uri", redirectURL())
|
|
params.Add("response_type", "code")
|
|
//params.Add("code_challenge", generateCodeChallenge(codeVerifier))
|
|
//params.Add("code_challenge_method", "S256")
|
|
params.Add("expiration", strconv.Itoa(expiration))
|
|
|
|
return baseURL + "?" + params.Encode()
|
|
}
|
|
|
|
func futureUTCTimestamp(secondsFromNow int) time.Time {
|
|
return time.Now().UTC().Add(time.Duration(secondsFromNow) * time.Second)
|
|
}
|
|
|
|
// Helper function to generate code challenge from code verifier
|
|
func generateCodeChallenge(codeVerifier string) string {
|
|
hash := sha256.Sum256([]byte(codeVerifier))
|
|
return base64.RawURLEncoding.EncodeToString(hash[:])
|
|
}
|
|
|
|
// Generate a random code verifier for PKCE
|
|
func generateCodeVerifier() string {
|
|
bytes := make([]byte, 64) // 64 bytes = 512 bits
|
|
rand.Read(bytes)
|
|
return base64.RawURLEncoding.EncodeToString(bytes)
|
|
}
|
|
|
|
// Find out what we can about this user
|
|
func updateArcgisUserData(ctx context.Context, user *models.User, access_token string, access_token_expires time.Time, refresh_token string, refresh_token_expires time.Time) {
|
|
client := arcgis.NewArcGIS(
|
|
arcgis.AuthenticatorOAuth{
|
|
AccessToken: access_token,
|
|
AccessTokenExpires: access_token_expires,
|
|
RefreshToken: refresh_token,
|
|
RefreshTokenExpires: refresh_token_expires,
|
|
},
|
|
)
|
|
portal, err := client.PortalsSelf()
|
|
if err != nil {
|
|
slog.Error("Failed to get ArcGIS user data", slog.String("err", err.Error()))
|
|
return
|
|
}
|
|
slog.Info("Got portals data",
|
|
slog.String("Username", portal.User.Username),
|
|
slog.String("user_id", portal.User.ID),
|
|
slog.String("org_id", portal.User.OrgID),
|
|
slog.String("org_name", portal.Name),
|
|
slog.String("license_type_id", portal.User.UserLicenseTypeID))
|
|
|
|
_, err = sql.UpdateOauthTokenOrg(portal.User.ID, portal.User.UserLicenseTypeID, refresh_token).Exec(ctx, PGInstance.BobDB)
|
|
if err != nil {
|
|
slog.Error("Failed to update oauth token portal data", slog.String("err", err.Error()))
|
|
return
|
|
}
|
|
var org *models.Organization
|
|
orgs, err := models.Organizations.Query(models.SelectWhere.Organizations.ArcgisName.EQ(portal.Name)).All(ctx, PGInstance.BobDB)
|
|
switch len(orgs) {
|
|
case 0:
|
|
setter := models.OrganizationSetter{
|
|
Name: omitnull.From(portal.Name),
|
|
ArcgisID: omitnull.From(portal.User.OrgID),
|
|
ArcgisName: omitnull.From(portal.Name),
|
|
}
|
|
org, err = models.Organizations.Insert(&setter).One(ctx, PGInstance.BobDB)
|
|
if err != nil {
|
|
slog.Error("Failed to create new organization", slog.String("err", err.Error()))
|
|
return
|
|
}
|
|
slog.Info("Created new organization", slog.Int("org_id", int(org.ID)))
|
|
case 1:
|
|
org = orgs[0]
|
|
slog.Info("Organization already exists")
|
|
default:
|
|
slog.Error("Got too many organizations, bailing")
|
|
return
|
|
|
|
}
|
|
if err != nil {
|
|
LogErrorTypeInfo(err)
|
|
if errors.Is(err, pgx.ErrNoRows) {
|
|
} else {
|
|
slog.Error("Failed to query for existing org", slog.String("err", err.Error()))
|
|
return
|
|
}
|
|
}
|
|
err = org.AttachUser(ctx, PGInstance.BobDB, user)
|
|
if err != nil {
|
|
slog.Error("Failed to attach user to organization", slog.String("err", err.Error()), slog.Int("user_id", int(user.ID)), slog.Int("org_id", int(org.ID)))
|
|
return
|
|
}
|
|
|
|
search, err := client.Search("Fieldseeker")
|
|
if err != nil {
|
|
slog.Error("Failed to get search FieldseekerGIS data", slog.String("err", err.Error()))
|
|
return
|
|
}
|
|
var fieldseekerClient *fieldseeker.FieldSeeker
|
|
for _, result := range search.Results {
|
|
slog.Info("Got result", slog.String("name", result.Name))
|
|
if result.Name == "FieldSeekerGIS" {
|
|
slog.Info("Found Fieldseeker", slog.String("url", result.URL))
|
|
setter := models.OrganizationSetter{
|
|
FieldseekerURL: omitnull.From(result.URL),
|
|
}
|
|
err = org.Update(ctx, PGInstance.BobDB, &setter)
|
|
if err != nil {
|
|
slog.Error("Failed to create new organization", slog.String("err", err.Error()))
|
|
return
|
|
}
|
|
fieldseekerClient, err = fieldseeker.NewFieldSeeker(
|
|
client,
|
|
result.URL,
|
|
)
|
|
if err != nil {
|
|
slog.Error("Failed to create fieldseeker client", slog.String("err", err.Error()))
|
|
return
|
|
}
|
|
}
|
|
}
|
|
arcgis_id, ok := org.ArcgisID.Get()
|
|
if !ok {
|
|
slog.Error("Cannot get webhooks - ArcGIS ID is null", slog.Int("org.id", int(org.ID)))
|
|
}
|
|
client.Context = &arcgis_id
|
|
err = maybeCreateWebhook(ctx, fieldseekerClient)
|
|
if err != nil {
|
|
slog.Error("Failed to manage webhooks", slog.String("err", err.Error()))
|
|
}
|
|
NewOAuthTokenChannel <- struct{}{}
|
|
}
|
|
|
|
func maybeCreateWebhook(ctx context.Context, client *fieldseeker.FieldSeeker) error {
|
|
webhooks, err := client.WebhookList()
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to get webhooks: %v", err)
|
|
}
|
|
for _, hook := range webhooks {
|
|
if hook.Name == "Nidus Sync" {
|
|
return nil
|
|
} else {
|
|
slog.Info("Found webhook", slog.String("name", hook.Name))
|
|
}
|
|
}
|
|
return errors.New("Not implemented")
|
|
}
|
|
|
|
func handleOauthAccessCode(ctx context.Context, user *models.User, code string) error {
|
|
baseURL := "https://www.arcgis.com/sharing/rest/oauth2/token/"
|
|
|
|
//params.Add("code_verifier", "S256")
|
|
|
|
form := url.Values{
|
|
"grant_type": []string{"authorization_code"},
|
|
"code": []string{code},
|
|
"client_id": []string{ClientID},
|
|
"redirect_uri": []string{redirectURL()},
|
|
}
|
|
|
|
req, err := http.NewRequest("POST", baseURL, strings.NewReader(form.Encode()))
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to create request: %v", err)
|
|
}
|
|
req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
|
|
token, err := handleTokenRequest(ctx, req)
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to exchange authorization code for token: %v", err)
|
|
}
|
|
accessExpires := futureUTCTimestamp(token.ExpiresIn)
|
|
refreshExpires := futureUTCTimestamp(token.RefreshTokenExpiresIn)
|
|
setter := models.OauthTokenSetter{
|
|
AccessToken: omit.From(token.AccessToken),
|
|
AccessTokenExpires: omit.From(accessExpires),
|
|
RefreshToken: omit.From(token.RefreshToken),
|
|
RefreshTokenExpires: omit.From(refreshExpires),
|
|
Username: omit.From(token.Username),
|
|
}
|
|
err = user.InsertUserOauthTokens(ctx, PGInstance.BobDB, &setter)
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to save token to database: %v", err)
|
|
}
|
|
go updateArcgisUserData(context.Background(), user, token.AccessToken, accessExpires, token.RefreshToken, refreshExpires)
|
|
return nil
|
|
}
|
|
|
|
func hasFieldseekerConnection(ctx context.Context, user *models.User) (bool, error) {
|
|
result, err := sql.OauthTokenByUserId(user.ID).All(ctx, PGInstance.BobDB)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
return len(result) > 0, nil
|
|
}
|
|
func redirectURL() string {
|
|
return BaseURL + "/arcgis/oauth/callback"
|
|
}
|
|
|
|
// This is a goroutine that is in charge of getting Fieldseeker data and keeping it fresh.
|
|
func refreshFieldseekerData(ctx context.Context, newOauthCh <-chan struct{}) {
|
|
for {
|
|
workerCtx, cancel := context.WithCancel(context.Background())
|
|
var wg sync.WaitGroup
|
|
|
|
oauths, err := models.OauthTokens.Query(models.SelectWhere.OauthTokens.InvalidatedAt.IsNull()).All(ctx, PGInstance.BobDB)
|
|
if err != nil {
|
|
slog.Error("Failed to get oauths", slog.String("err", err.Error()))
|
|
return
|
|
}
|
|
for _, oauth := range oauths {
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
err := maintainOAuth(workerCtx, oauth)
|
|
if err != nil {
|
|
slog.Error("Crashed oauth maintenance goroutine", slog.String("err", err.Error()))
|
|
}
|
|
}()
|
|
}
|
|
|
|
orgs, err := models.Organizations.Query().All(ctx, PGInstance.BobDB)
|
|
if err != nil {
|
|
slog.Error("Failed to get orgs", slog.String("err", err.Error()))
|
|
return
|
|
}
|
|
for _, org := range orgs {
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
err := periodicallyExportFieldseeker(workerCtx, org)
|
|
if err != nil {
|
|
slog.Error("Crashed fieldseeker export goroutine", slog.String("err", err.Error()))
|
|
}
|
|
}()
|
|
}
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
slog.Info("Exiting refresh worker...")
|
|
cancel()
|
|
wg.Wait()
|
|
return
|
|
case <-newOauthCh:
|
|
slog.Info("Updating oauth background work")
|
|
cancel()
|
|
wg.Wait()
|
|
}
|
|
}
|
|
}
|
|
|
|
type SyncStats struct {
|
|
Inserts int
|
|
Updates int
|
|
Unchanged int
|
|
}
|
|
|
|
func downloadAllRecords(ctx context.Context, fssync *fieldseeker.FieldSeeker, layer arcgis.LayerFeature, org_id int32) (SyncStats, error) {
|
|
var stats SyncStats
|
|
count, err := fssync.QueryCount(layer.ID)
|
|
if err != nil {
|
|
return stats, fmt.Errorf("Failed to get counts for layer %s (%d): %v", layer.Name, layer.ID, err)
|
|
}
|
|
slog.Info("Starting on layer", slog.String("name", layer.Name), slog.Int("id", layer.ID))
|
|
if count.Count == 0 {
|
|
return stats, nil
|
|
}
|
|
pool := pond.NewResultPool[SyncStats](20)
|
|
group := pool.NewGroup()
|
|
maxRecords := fssync.MaxRecordCount()
|
|
for offset := 0; offset < count.Count; offset += maxRecords {
|
|
group.SubmitErr(func() (SyncStats, error) {
|
|
query := arcgis.NewQuery()
|
|
query.ResultRecordCount = maxRecords
|
|
query.ResultOffset = offset
|
|
query.SpatialReference = "4326"
|
|
query.OutFields = "*"
|
|
query.Where = "1=1"
|
|
qr, err := fssync.DoQuery(
|
|
layer.ID,
|
|
query)
|
|
if err != nil {
|
|
return SyncStats{}, fmt.Errorf("Failed to get layer %s (%d) at offset %d: %v", layer.Name, layer.ID, offset, err)
|
|
}
|
|
i, u, err := saveOrUpdateDBRecords(ctx, "FS_"+layer.Name, qr, org_id)
|
|
if err != nil {
|
|
filename := fmt.Sprintf("failure-%s-%d-%d.json", layer.Name, layer.ID, offset)
|
|
saveRawQuery(fssync, layer, query, filename)
|
|
slog.Error("Faield to save DB records", slog.String("err", err.Error()))
|
|
return SyncStats{}, fmt.Errorf("Failed to save records: %v", err)
|
|
}
|
|
return SyncStats{
|
|
Inserts: i,
|
|
Updates: u,
|
|
Unchanged: len(qr.Features) - u - i,
|
|
}, nil
|
|
})
|
|
}
|
|
results, err := group.Wait()
|
|
if err != nil {
|
|
return stats, fmt.Errorf("one or more tasks in the work pool failed: %v", err)
|
|
}
|
|
for _, r := range results {
|
|
stats.Inserts += r.Inserts
|
|
stats.Updates += r.Updates
|
|
stats.Unchanged += r.Unchanged
|
|
}
|
|
slog.Info("Finished layer", slog.Int("inserts", stats.Inserts), slog.Int("updates", stats.Updates), slog.Int("no change", stats.Unchanged))
|
|
return stats, nil
|
|
}
|
|
|
|
func getOAuthForOrg(ctx context.Context, org *models.Organization) (*models.OauthToken, error) {
|
|
users, err := org.User().All(ctx, PGInstance.BobDB)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Failed to query all users for org: %v", err)
|
|
}
|
|
for _, user := range users {
|
|
oauths, err := user.UserOauthTokens(models.SelectWhere.OauthTokens.InvalidatedAt.IsNull()).All(ctx, PGInstance.BobDB)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Failed to query all oauth tokens for org: %v", err)
|
|
}
|
|
for _, oauth := range oauths {
|
|
return oauth, nil
|
|
}
|
|
}
|
|
return nil, errors.New("No oauth tokens found")
|
|
}
|
|
|
|
func periodicallyExportFieldseeker(ctx context.Context, org *models.Organization) error {
|
|
pollTicker := time.NewTicker(1)
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil
|
|
case <-pollTicker.C:
|
|
oauth, err := getOAuthForOrg(ctx, org)
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to get oauth for org: %v", err)
|
|
}
|
|
err = exportFieldseekerData(ctx, org, oauth)
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to export Fieldseeker data: %v", err)
|
|
}
|
|
slog.Info("Completed exporting data, waiting 15 minutes to go agoin.")
|
|
pollTicker = time.NewTicker(15 * time.Minute)
|
|
}
|
|
}
|
|
}
|
|
func exportFieldseekerData(ctx context.Context, org *models.Organization, oauth *models.OauthToken) error {
|
|
slog.Info("Update Fieldseeker data")
|
|
ar := arcgis.NewArcGIS(
|
|
arcgis.AuthenticatorOAuth{
|
|
AccessToken: oauth.AccessToken,
|
|
AccessTokenExpires: oauth.AccessTokenExpires,
|
|
RefreshToken: oauth.RefreshToken,
|
|
RefreshTokenExpires: oauth.RefreshTokenExpires,
|
|
},
|
|
)
|
|
row, err := sql.OrgByOauthId(oauth.ID).One(ctx, PGInstance.BobDB)
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to get org ID: %v", err)
|
|
}
|
|
fssync, err := fieldseeker.NewFieldSeeker(
|
|
ar,
|
|
row.FieldseekerURL.MustGet(),
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to create fssync: %v", err)
|
|
}
|
|
var stats SyncStats
|
|
for _, layer := range fssync.FeatureServerLayers() {
|
|
ss, err := downloadAllRecords(ctx, fssync, layer, row.OrganizationID)
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to get layer %s: %v", layer, err)
|
|
}
|
|
stats.Inserts += ss.Inserts
|
|
stats.Updates += ss.Updates
|
|
stats.Unchanged += ss.Unchanged
|
|
}
|
|
|
|
setter := models.FieldseekerSyncSetter{
|
|
RecordsCreated: omit.From(int32(stats.Inserts)),
|
|
RecordsUpdated: omit.From(int32(stats.Updates)),
|
|
RecordsUnchanged: omit.From(int32(stats.Unchanged)),
|
|
}
|
|
err = org.InsertFieldseekerSyncs(ctx, PGInstance.BobDB, &setter)
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to insert sync: %v", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func maintainOAuth(ctx context.Context, oauth *models.OauthToken) error {
|
|
for {
|
|
// Refresh from the database
|
|
oauth, err := models.FindOauthToken(ctx, PGInstance.BobDB, oauth.ID)
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to update oauth token from database: %v", err)
|
|
}
|
|
accessTokenDelay := time.Until(oauth.AccessTokenExpires) - (10 * time.Second)
|
|
refreshTokenDelay := time.Until(oauth.RefreshTokenExpires) - (10 * time.Second)
|
|
if oauth.AccessTokenExpires.Before(time.Now()) {
|
|
accessTokenDelay = 0
|
|
}
|
|
if oauth.RefreshTokenExpires.Before(time.Now()) {
|
|
refreshTokenDelay = 0
|
|
}
|
|
slog.Info("Need to refresh access token", slog.Int("id", int(oauth.ID)), slog.Float64("seconds", accessTokenDelay.Seconds()), slog.String("access_token", oauth.AccessToken))
|
|
slog.Info("Need to refresh refresh token", slog.Int("id", int(oauth.ID)), slog.Float64("seconds", refreshTokenDelay.Seconds()), slog.String("refresh_token", oauth.RefreshToken))
|
|
accessTokenTicker := time.NewTicker(accessTokenDelay)
|
|
refreshTokenTicker := time.NewTicker(refreshTokenDelay)
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil
|
|
case <-accessTokenTicker.C:
|
|
err := refreshAccessToken(ctx, oauth)
|
|
if err != nil {
|
|
markTokenFailed(ctx, oauth)
|
|
return fmt.Errorf("Failed to refresh access token: %v", err)
|
|
}
|
|
case <-refreshTokenTicker.C:
|
|
err := refreshRefreshToken(ctx, oauth)
|
|
if err != nil {
|
|
markTokenFailed(ctx, oauth)
|
|
return fmt.Errorf("Failed to maintain refresh token: %v", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
// Mark that a given oauth token has failed. This includes a notification to
|
|
// the user.
|
|
func markTokenFailed(ctx context.Context, oauth *models.OauthToken) {
|
|
oauthSetter := models.OauthTokenSetter{
|
|
InvalidatedAt: omitnull.From(time.Now()),
|
|
}
|
|
err := oauth.Update(ctx, PGInstance.BobDB, &oauthSetter)
|
|
if err != nil {
|
|
slog.Error("Failed to mark token failed", slog.String("err", err.Error()))
|
|
}
|
|
user, err := models.FindUser(ctx, PGInstance.BobDB, oauth.UserID)
|
|
if err != nil {
|
|
slog.Error("Failed to get oauth user", slog.String("err", err.Error()))
|
|
return
|
|
}
|
|
notificationSetter := models.NotificationSetter{
|
|
Created: omit.From(time.Now()),
|
|
Message: omit.From("Oauth token invalidated"),
|
|
Link: omit.From("/oauth/refresh"),
|
|
Type: omit.From(enums.NotificationtypeOauthTokenInvalidated),
|
|
}
|
|
err = user.InsertUserNotifications(ctx, PGInstance.BobDB, ¬ificationSetter)
|
|
if err != nil {
|
|
slog.Error("Failed to get oauth user", slog.String("err", err.Error()))
|
|
return
|
|
}
|
|
slog.Info("Marked oauth token invalid", slog.Int("id", int(oauth.ID)))
|
|
}
|
|
|
|
// Update the access token to keep it fresh and alive
|
|
func refreshAccessToken(ctx context.Context, oauth *models.OauthToken) error {
|
|
baseURL := "https://www.arcgis.com/sharing/rest/oauth2/token/"
|
|
|
|
form := url.Values{
|
|
"grant_type": []string{"refresh_token"},
|
|
"client_id": []string{ClientID},
|
|
"refresh_token": []string{oauth.RefreshToken},
|
|
}
|
|
|
|
req, err := http.NewRequest("POST", baseURL, strings.NewReader(form.Encode()))
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to create request: %v", err)
|
|
}
|
|
req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
|
|
token, err := handleTokenRequest(ctx, req)
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to handle request: %v", err)
|
|
}
|
|
accessExpires := futureUTCTimestamp(token.ExpiresIn)
|
|
setter := models.OauthTokenSetter{
|
|
AccessToken: omit.From(token.AccessToken),
|
|
AccessTokenExpires: omit.From(accessExpires),
|
|
Username: omit.From(token.Username),
|
|
}
|
|
err = oauth.Update(ctx, PGInstance.BobDB, &setter)
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to update oauth in database: %v", err)
|
|
}
|
|
slog.Info("Updated oauth token", slog.Int("oauth token id", int(oauth.ID)))
|
|
return nil
|
|
}
|
|
|
|
// Update the refresh token to keep it fresh and alive
|
|
func refreshRefreshToken(ctx context.Context, oauth *models.OauthToken) error {
|
|
baseURL := "https://www.arcgis.com/sharing/rest/oauth2/token/"
|
|
|
|
form := url.Values{
|
|
"grant_type": []string{"exchange_refresh_token"},
|
|
"client_id": []string{ClientID},
|
|
"redirect_uri": []string{redirectURL()},
|
|
"refresh_token": []string{oauth.RefreshToken},
|
|
}
|
|
|
|
req, err := http.NewRequest("POST", baseURL, strings.NewReader(form.Encode()))
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to create request: %v", err)
|
|
}
|
|
req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
|
|
token, err := handleTokenRequest(ctx, req)
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to handle request: %v", err)
|
|
}
|
|
refreshExpires := futureUTCTimestamp(token.ExpiresIn)
|
|
setter := models.OauthTokenSetter{
|
|
RefreshToken: omit.From(token.RefreshToken),
|
|
RefreshTokenExpires: omit.From(refreshExpires),
|
|
Username: omit.From(token.Username),
|
|
}
|
|
err = oauth.Update(ctx, PGInstance.BobDB, &setter)
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to update oauth in database: %v", err)
|
|
}
|
|
slog.Info("Updated oauth token", slog.Int("oauth token id", int(oauth.ID)))
|
|
return nil
|
|
}
|
|
|
|
func newTimestampedFilename(prefix, suffix string) string {
|
|
timestamp := time.Now().Format("20060102_150405") // YYYYMMDD_HHMMSS format
|
|
return prefix + timestamp + suffix
|
|
}
|
|
|
|
func handleTokenRequest(ctx context.Context, req *http.Request) (*OAuthTokenResponse, error) {
|
|
client := http.Client{}
|
|
slog.Info("POST", slog.String("url", req.URL.String()))
|
|
resp, err := client.Do(req)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Failed to do request: %v", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
bodyBytes, err := io.ReadAll(resp.Body)
|
|
slog.Info("Token request", slog.Int("status", resp.StatusCode))
|
|
filename := newTimestampedFilename("token", ".json")
|
|
saveResponse(bodyBytes, filename)
|
|
if resp.StatusCode >= http.StatusBadRequest {
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Got status code %d and failed to read response body: %v", resp.StatusCode, err)
|
|
}
|
|
bodyString := string(bodyBytes)
|
|
var errorResp map[string]interface{}
|
|
if err := json.Unmarshal(bodyBytes, &errorResp); err == nil {
|
|
return nil, fmt.Errorf("API response JSON error: %d: %v", resp.StatusCode, errorResp)
|
|
}
|
|
return nil, fmt.Errorf("API returned error status %d: %s", resp.StatusCode, bodyString)
|
|
}
|
|
//logResponseHeaders(resp)
|
|
var tokenResponse OAuthTokenResponse
|
|
err = json.Unmarshal(bodyBytes, &tokenResponse)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Failed to unmarshal JSON: %v", err)
|
|
}
|
|
// Just because we got a 200-level status code doesn't mean it worked. Experience has taught us that
|
|
// we can get errors without anything indicated in the headers or the status code
|
|
if tokenResponse == (OAuthTokenResponse{}) {
|
|
var errorResponse ErrorResponse
|
|
err = json.Unmarshal(bodyBytes, &errorResponse)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Failed to unmarshal error JSON: %v", err)
|
|
}
|
|
if errorResponse.Error.Code > 0 {
|
|
return nil, errors.New(fmt.Sprintf("API error %d: %s: %s (%s)",
|
|
errorResponse.Error.Code,
|
|
errorResponse.Error.Error,
|
|
errorResponse.Error.ErrorDescription,
|
|
errorResponse.Error.Message,
|
|
))
|
|
}
|
|
}
|
|
slog.Info("Oauth token acquired",
|
|
slog.String("refresh token", tokenResponse.RefreshToken),
|
|
slog.String("access token", tokenResponse.AccessToken),
|
|
slog.Int("access expires", tokenResponse.ExpiresIn),
|
|
slog.Int("refresh expires", tokenResponse.RefreshTokenExpiresIn),
|
|
)
|
|
return &tokenResponse, nil
|
|
}
|
|
|
|
func logResponseHeaders(resp *http.Response) {
|
|
if resp == nil {
|
|
slog.Info("Response is nil")
|
|
return
|
|
}
|
|
|
|
slog.Info("HTTP Response headers",
|
|
"status", resp.Status,
|
|
"statusCode", resp.StatusCode)
|
|
|
|
for name, values := range resp.Header {
|
|
slog.Info("Header",
|
|
"name", name,
|
|
"values", values)
|
|
}
|
|
}
|
|
|
|
func saveResponse(data []byte, filename string) {
|
|
dest, err := os.Create(filename)
|
|
if err != nil {
|
|
slog.Error("Failed to create file", slog.String("filename", filename), slog.String("err", err.Error()))
|
|
return
|
|
}
|
|
_, err = io.Copy(dest, bytes.NewReader(data))
|
|
if err != nil {
|
|
slog.Error("Failed to write", slog.String("filename", filename), slog.String("err", err.Error()))
|
|
return
|
|
}
|
|
slog.Info("Wrote response", slog.String("filename", filename))
|
|
}
|
|
|
|
func saveRawQuery(fssync *fieldseeker.FieldSeeker, layer arcgis.LayerFeature, query *arcgis.Query, filename string) {
|
|
output, err := os.Create(filename)
|
|
if err != nil {
|
|
slog.Error("Failed to create file", slog.String("filename", filename))
|
|
return
|
|
}
|
|
qr, err := fssync.DoQueryRaw(
|
|
layer.ID,
|
|
query)
|
|
if err != nil {
|
|
slog.Error("Failed to do query", slog.String("err", err.Error()))
|
|
return
|
|
}
|
|
_, err = output.Write(qr)
|
|
if err != nil {
|
|
slog.Error("Failed to write results", slog.String("err", err.Error()))
|
|
return
|
|
}
|
|
slog.Info("Wrote failed query", slog.String("filename", filename))
|
|
}
|
|
|
|
func saveOrUpdateDBRecords(ctx context.Context, table string, qr *arcgis.QueryResult, org_id int32) (int, int, error) {
|
|
inserts, updates := 0, 0
|
|
sorted_columns := make([]string, 0, len(qr.Fields))
|
|
for _, f := range qr.Fields {
|
|
sorted_columns = append(sorted_columns, f.Name)
|
|
}
|
|
sort.Strings(sorted_columns)
|
|
|
|
objectids := make([]int, 0)
|
|
for _, l := range qr.Features {
|
|
oid := l.Attributes["OBJECTID"].(float64)
|
|
objectids = append(objectids, int(oid))
|
|
}
|
|
|
|
rows_by_objectid, err := rowmapViaQuery(ctx, table, sorted_columns, objectids)
|
|
if err != nil {
|
|
return inserts, updates, fmt.Errorf("Failed to get existing rows: %v", err)
|
|
}
|
|
// log.Println("Rows from query", len(rows_by_objectid))
|
|
|
|
for _, feature := range qr.Features {
|
|
oid := feature.Attributes["OBJECTID"].(float64)
|
|
row := rows_by_objectid[int(oid)]
|
|
// If we have no matching row we'll need to create it
|
|
if len(row) == 0 {
|
|
|
|
if err := insertRowFromFeature(ctx, table, sorted_columns, &feature, org_id); err != nil {
|
|
return inserts, updates, fmt.Errorf("Failed to insert row: %v", err)
|
|
}
|
|
inserts += 1
|
|
} else if hasUpdates(row, feature) {
|
|
if err := updateRowFromFeature(ctx, table, sorted_columns, &feature, org_id); err != nil {
|
|
return inserts, updates, fmt.Errorf("Failed to update row: %v", err)
|
|
}
|
|
updates += 1
|
|
}
|
|
}
|
|
return inserts, updates, nil
|
|
}
|
|
|
|
// Produces a map of OBJECTID to a 'row' which is in turn a map of column names to their values as strings
|
|
func rowmapViaQuery(ctx context.Context, table string, sorted_columns []string, objectids []int) (map[int]map[string]string, error) {
|
|
result := make(map[int]map[string]string)
|
|
|
|
query := selectAllFromQueryResult(table, sorted_columns)
|
|
|
|
args := pgx.NamedArgs{
|
|
"objectids": objectids,
|
|
}
|
|
rows, err := PGInstance.PGXPool.Query(ctx, query, args)
|
|
if err != nil {
|
|
return result, fmt.Errorf("Failed to query rows: %v", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
// +2 for geometry x and geometry x
|
|
columnNames := make([]string, len(sorted_columns)+2)
|
|
for i, c := range sorted_columns {
|
|
columnNames[i] = c
|
|
}
|
|
columnNames[len(sorted_columns)] = "geometry_x"
|
|
columnNames[len(sorted_columns)+1] = "geometry_y"
|
|
|
|
rowSlice, err := pgx.CollectRows(rows, func(row pgx.CollectableRow) (map[string]string, error) {
|
|
fieldDescriptions := row.FieldDescriptions()
|
|
values := make([]interface{}, len(fieldDescriptions))
|
|
valuePtrs := make([]interface{}, len(fieldDescriptions))
|
|
|
|
for i := range values {
|
|
valuePtrs[i] = &values[i]
|
|
}
|
|
|
|
if err := row.Scan(valuePtrs...); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
result := make(map[string]string)
|
|
for i, fd := range fieldDescriptions {
|
|
if values[i] != nil {
|
|
result[fd.Name] = fmt.Sprintf("%v", values[i])
|
|
//log.Printf("col %v type %T val %v", fd.Name, values[i], values[i])
|
|
} else {
|
|
result[fd.Name] = ""
|
|
}
|
|
}
|
|
|
|
return result, nil
|
|
})
|
|
if err != nil {
|
|
return result, fmt.Errorf("Failed to collect rows: %v", err)
|
|
}
|
|
for _, row := range rowSlice {
|
|
o := row["objectid"]
|
|
objectid, err := strconv.Atoi(o)
|
|
if err != nil {
|
|
return result, fmt.Errorf("Failed to parse objectid %s: %v", o, err)
|
|
}
|
|
result[objectid] = row
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
func insertRowFromFeature(ctx context.Context, table string, sorted_columns []string, feature *arcgis.Feature, org_id int32) error {
|
|
var options pgx.TxOptions
|
|
transaction, err := PGInstance.PGXPool.BeginTx(ctx, options)
|
|
if err != nil {
|
|
return fmt.Errorf("Unable to start transaction")
|
|
}
|
|
|
|
err = insertRowFromFeatureFS(ctx, transaction, table, sorted_columns, feature, org_id)
|
|
if err != nil {
|
|
transaction.Rollback(ctx)
|
|
return fmt.Errorf("Unable to insert FS: %v", err)
|
|
}
|
|
|
|
err = insertRowFromFeatureHistory(ctx, transaction, table, sorted_columns, feature, org_id, 1)
|
|
if err != nil {
|
|
transaction.Rollback(ctx)
|
|
return fmt.Errorf("Failed to insert history: %v", err)
|
|
}
|
|
|
|
err = transaction.Commit(ctx)
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to commit transaction: %v", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func insertRowFromFeatureFS(ctx context.Context, transaction pgx.Tx, table string, sorted_columns []string, feature *arcgis.Feature, org_id int32) error {
|
|
// Create the query to produce the main row
|
|
var sb strings.Builder
|
|
sb.WriteString("INSERT INTO ")
|
|
sb.WriteString(table)
|
|
sb.WriteString(" (")
|
|
for _, field := range sorted_columns {
|
|
sb.WriteString(field)
|
|
sb.WriteString(",")
|
|
}
|
|
// Specially add the geometry values since they aren't in the fields
|
|
sb.WriteString("geometry_x,geometry_y,organization_id,updated")
|
|
sb.WriteString(")\nVALUES (")
|
|
for _, field := range sorted_columns {
|
|
sb.WriteString("@")
|
|
sb.WriteString(field)
|
|
sb.WriteString(",")
|
|
}
|
|
// Specially add the geometry values since they aren't in the fields
|
|
sb.WriteString("@geometry_x,@geometry_y,@organization_id,@updated)")
|
|
|
|
args := pgx.NamedArgs{}
|
|
for k, v := range feature.Attributes {
|
|
args[k] = v
|
|
}
|
|
// specially add geometry since it isn't in the list of attributes
|
|
args["geometry_x"] = feature.Geometry.X
|
|
args["geometry_y"] = feature.Geometry.Y
|
|
args["organization_id"] = org_id
|
|
args["updated"] = time.Now()
|
|
|
|
_, err := transaction.Exec(ctx, sb.String(), args)
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to insert row into %s: %v", table, err)
|
|
}
|
|
return nil
|
|
}
|
|
func hasUpdates(row map[string]string, feature arcgis.Feature) bool {
|
|
for key, value := range feature.Attributes {
|
|
rowdata := row[strings.ToLower(key)]
|
|
// We'll accept any 'nil' as represented by the empty string in the database
|
|
if value == nil {
|
|
if rowdata == "" {
|
|
continue
|
|
} else if len(rowdata) > 0 {
|
|
return true
|
|
} else {
|
|
slog.Error("Looks like our original value is nil, but our row value is something non-empty with a zero length. Need a programmer to look into this.")
|
|
}
|
|
}
|
|
// check strings first, their simplest
|
|
if featureAsString, ok := value.(string); ok {
|
|
if featureAsString != rowdata {
|
|
return true
|
|
}
|
|
continue
|
|
} else if featureAsInt, ok := value.(int); ok {
|
|
// Previously had a nil value, now we have a real value
|
|
if rowdata == "" {
|
|
return true
|
|
}
|
|
rowAsInt, err := strconv.Atoi(rowdata)
|
|
if err != nil {
|
|
slog.Error(fmt.Sprintf("Failed to convert '%s' to an int to compare against %v for %v", rowdata, featureAsInt, key))
|
|
}
|
|
if rowAsInt != featureAsInt {
|
|
return true
|
|
} else {
|
|
continue
|
|
}
|
|
} else if featureAsFloat, ok := value.(float64); ok {
|
|
// Previously had a nil value, now we have a real value
|
|
if rowdata == "" {
|
|
return true
|
|
}
|
|
rowAsFloat, err := strconv.ParseFloat(rowdata, 64)
|
|
if err != nil {
|
|
slog.Error(fmt.Sprintf("Failed to convert '%s' to a float64 to compare against %v for %v", rowdata, featureAsFloat, key))
|
|
}
|
|
if rowAsFloat != featureAsFloat {
|
|
return true
|
|
} else {
|
|
continue
|
|
}
|
|
}
|
|
slog.Info(fmt.Sprintf("key: %s\tvalue: %v (type %T)\trow: %s\n", key, value, value, rowdata))
|
|
slog.Error("we've hit a point where we can't tell if we have an update or not, need a programmer to look at the above")
|
|
}
|
|
return false
|
|
}
|
|
func updateRowFromFeature(ctx context.Context, table string, sorted_columns []string, feature *arcgis.Feature, org_id int32) error {
|
|
// Get the current highest version for the row in question
|
|
history_table := toHistoryTable(table)
|
|
var sb strings.Builder
|
|
sb.WriteString("SELECT MAX(version) FROM ")
|
|
sb.WriteString(history_table)
|
|
sb.WriteString(" WHERE OBJECTID=@objectid")
|
|
|
|
args := pgx.NamedArgs{}
|
|
o := feature.Attributes["OBJECTID"].(float64)
|
|
args["objectid"] = int(o)
|
|
|
|
var version int
|
|
if err := PGInstance.PGXPool.QueryRow(ctx, sb.String(), args).Scan(&version); err != nil {
|
|
return fmt.Errorf("Failed to query for version: %v", err)
|
|
}
|
|
|
|
var options pgx.TxOptions
|
|
transaction, err := PGInstance.PGXPool.BeginTx(ctx, options)
|
|
if err != nil {
|
|
return fmt.Errorf("Unable to start transaction")
|
|
}
|
|
|
|
err = insertRowFromFeatureHistory(ctx, transaction, table, sorted_columns, feature, org_id, version+1)
|
|
if err != nil {
|
|
transaction.Rollback(ctx)
|
|
return fmt.Errorf("Failed to insert history: %v", err)
|
|
}
|
|
err = updateRowFromFeatureFS(ctx, transaction, table, sorted_columns, feature)
|
|
if err != nil {
|
|
transaction.Rollback(ctx)
|
|
return fmt.Errorf("Failed to update row from feature: %v", err)
|
|
}
|
|
|
|
err = transaction.Commit(ctx)
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to commit transaction: %v", err)
|
|
}
|
|
return nil
|
|
}
|
|
func insertRowFromFeatureHistory(ctx context.Context, transaction pgx.Tx, table string, sorted_columns []string, feature *arcgis.Feature, org_id int32, version int) error {
|
|
history_table := toHistoryTable(table)
|
|
var sb strings.Builder
|
|
sb.WriteString("INSERT INTO ")
|
|
sb.WriteString(history_table)
|
|
sb.WriteString(" (")
|
|
for _, field := range sorted_columns {
|
|
sb.WriteString(field)
|
|
sb.WriteString(",")
|
|
}
|
|
// Specially add the geometry values since they aren't in the fields
|
|
sb.WriteString("created,geometry_x,geometry_y,organization_id,version")
|
|
sb.WriteString(")\nVALUES (")
|
|
for _, field := range sorted_columns {
|
|
sb.WriteString("@")
|
|
sb.WriteString(field)
|
|
sb.WriteString(",")
|
|
}
|
|
// Specially add the geometry values since they aren't in the fields
|
|
sb.WriteString("@created,@geometry_x,@geometry_y,@organization_id,@version)")
|
|
args := pgx.NamedArgs{}
|
|
for k, v := range feature.Attributes {
|
|
args[k] = v
|
|
}
|
|
args["created"] = time.Now()
|
|
args["organization_id"] = org_id
|
|
args["version"] = version
|
|
if _, err := transaction.Exec(ctx, sb.String(), args); err != nil {
|
|
return fmt.Errorf("Failed to insert history row into %s: %v", table, err)
|
|
}
|
|
return nil
|
|
}
|
|
func selectAllFromQueryResult(table string, sorted_columns []string) string {
|
|
var sb strings.Builder
|
|
sb.WriteString("SELECT * FROM ")
|
|
sb.WriteString(table)
|
|
sb.WriteString(" WHERE OBJECTID=ANY(@objectids)")
|
|
return sb.String()
|
|
}
|
|
func toHistoryTable(table string) string {
|
|
return "History_" + table[3:len(table)]
|
|
}
|
|
|
|
func updateRowFromFeatureFS(ctx context.Context, transaction pgx.Tx, table string, sorted_columns []string, feature *arcgis.Feature) error {
|
|
// Create the query to produce the main row
|
|
var sb strings.Builder
|
|
sb.WriteString("UPDATE ")
|
|
sb.WriteString(table)
|
|
sb.WriteString(" SET ")
|
|
for _, field := range sorted_columns {
|
|
// OBJECTID is special as our primary key, so skip it
|
|
if field == "OBJECTID" {
|
|
continue
|
|
}
|
|
sb.WriteString(field)
|
|
sb.WriteString("=@")
|
|
sb.WriteString(field)
|
|
sb.WriteString(",")
|
|
}
|
|
// Specially add the geometry values since they aren't in the fields
|
|
sb.WriteString("geometry_x=@geometry_x,geometry_y=@geometry_y,updated=@updated WHERE OBJECTID=@OBJECTID")
|
|
|
|
args := pgx.NamedArgs{}
|
|
for k, v := range feature.Attributes {
|
|
args[k] = v
|
|
}
|
|
// specially add geometry since it isn't in the list of attributes
|
|
args["geometry_x"] = feature.Geometry.X
|
|
args["geometry_y"] = feature.Geometry.Y
|
|
args["updated"] = time.Now()
|
|
|
|
_, err := transaction.Exec(ctx, sb.String(), args)
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to update row into %s: %v", table, err)
|
|
}
|
|
return nil
|
|
}
|