Handle changes to arcgis-go
This commit is contained in:
parent
bbbc06583a
commit
8e68230f4a
2 changed files with 82 additions and 49 deletions
|
|
@ -157,7 +157,7 @@ func refreshFieldseekerData(ctx context.Context, newOauthCh <-chan struct{}) {
|
||||||
err := maintainOAuth(workerCtx, oauth)
|
err := maintainOAuth(workerCtx, oauth)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
markTokenFailed(ctx, oauth)
|
markTokenFailed(ctx, oauth)
|
||||||
if errors.Is(err, arcgis.ErrorInvalidatedRefreshToken) {
|
if errors.Is(err, arcgis.ErrorInvalidRefreshToken) {
|
||||||
log.Info().Int("oauth_token.id", int(oauth.ID)).Msg("Marked invalid by the server")
|
log.Info().Int("oauth_token.id", int(oauth.ID)).Msg("Marked invalid by the server")
|
||||||
} else {
|
} else {
|
||||||
debug.LogErrorTypeInfo(err)
|
debug.LogErrorTypeInfo(err)
|
||||||
|
|
@ -208,7 +208,12 @@ type SyncStats struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func downloadFieldseekerSchema(ctx context.Context, fieldseekerClient *fieldseeker.FieldSeeker, arcgis_id string) {
|
func downloadFieldseekerSchema(ctx context.Context, fieldseekerClient *fieldseeker.FieldSeeker, arcgis_id string) {
|
||||||
for _, layer := range fieldseekerClient.FeatureServerLayers() {
|
layers, err := fieldseekerClient.FeatureServerLayers(ctx)
|
||||||
|
if err != nil {
|
||||||
|
log.Error().Err(err).Msg("Failed to get fieldseeker layers")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
for _, layer := range layers {
|
||||||
err := os.MkdirAll(filepath.Join(config.FieldseekerSchemaDirectory, arcgis_id), os.ModePerm)
|
err := os.MkdirAll(filepath.Join(config.FieldseekerSchemaDirectory, arcgis_id), os.ModePerm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error().Err(err).Msg("Failed to create parent directory")
|
log.Error().Err(err).Msg("Failed to create parent directory")
|
||||||
|
|
@ -220,7 +225,7 @@ func downloadFieldseekerSchema(ctx context.Context, fieldseekerClient *fieldseek
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
defer output.Close()
|
defer output.Close()
|
||||||
schema, err := fieldseekerClient.Schema(ctx, layer.ID)
|
schema, err := fieldseekerClient.SchemaRaw(ctx, layer.ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error().Err(err).Msg("Failed to get schema")
|
log.Error().Err(err).Msg("Failed to get schema")
|
||||||
return
|
return
|
||||||
|
|
@ -271,16 +276,20 @@ func generateCodeVerifier() string {
|
||||||
|
|
||||||
// Find out what we can about this user
|
// Find out what we can about this user
|
||||||
func updateArcgisUserData(ctx context.Context, user *models.User, oauth *models.OauthToken) {
|
func updateArcgisUserData(ctx context.Context, user *models.User, oauth *models.OauthToken) {
|
||||||
client := arcgis.NewArcGIS(
|
client, err := arcgis.NewArcGIS(
|
||||||
|
ctx,
|
||||||
|
nil,
|
||||||
arcgis.AuthenticatorOAuth{
|
arcgis.AuthenticatorOAuth{
|
||||||
AccessToken: oauth.AccessToken,
|
AccessToken: oauth.AccessToken,
|
||||||
AccessTokenExpires: oauth.AccessTokenExpires,
|
AccessTokenExpires: oauth.AccessTokenExpires,
|
||||||
RefreshToken: oauth.RefreshToken,
|
RefreshToken: oauth.RefreshToken,
|
||||||
RefreshTokenExpires: oauth.RefreshTokenExpires,
|
RefreshTokenExpires: oauth.RefreshTokenExpires,
|
||||||
},
|
},
|
||||||
nil,
|
|
||||||
nil,
|
|
||||||
)
|
)
|
||||||
|
if err != nil {
|
||||||
|
log.Error().Err(err).Msg("Failed to create ArcGIS client")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
portal, err := updatePortalData(ctx, client, user.ID)
|
portal, err := updatePortalData(ctx, client, user.ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -334,12 +343,12 @@ func updateArcgisUserData(ctx context.Context, user *models.User, oauth *models.
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
search, err := client.Search(ctx, "Fieldseeker")
|
search, err := client.Search(ctx, "FieldseekerGIS")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error().Err(err).Msg("Failed to get search FieldseekerGIS data")
|
log.Error().Err(err).Msg("Failed to get search FieldseekerGIS data")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
var fieldseekerClient *fieldseeker.FieldSeeker
|
var fieldseeker_client *fieldseeker.FieldSeeker
|
||||||
for _, result := range search.Results {
|
for _, result := range search.Results {
|
||||||
log.Info().Str("name", result.Name).Msg("Got result")
|
log.Info().Str("name", result.Name).Msg("Got result")
|
||||||
if result.Name == "FieldSeekerGIS" {
|
if result.Name == "FieldSeekerGIS" {
|
||||||
|
|
@ -352,9 +361,9 @@ func updateArcgisUserData(ctx context.Context, user *models.User, oauth *models.
|
||||||
log.Error().Err(err).Msg("Failed to create new organization")
|
log.Error().Err(err).Msg("Failed to create new organization")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
fieldseekerClient, err = newFieldSeeker(ctx, oauth)
|
fieldseeker_client, err = NewFieldSeeker(ctx, oauth)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error().Err(err).Msg("Failed to create fieldseeker client")
|
log.Error().Err(err).Msg("Failed to create new fieldseeker")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -363,13 +372,13 @@ func updateArcgisUserData(ctx context.Context, user *models.User, oauth *models.
|
||||||
if !ok {
|
if !ok {
|
||||||
log.Error().Int("org.id", int(org.ID)).Msg("Cannot get webhooks - ArcGIS ID is null")
|
log.Error().Int("org.id", int(org.ID)).Msg("Cannot get webhooks - ArcGIS ID is null")
|
||||||
}
|
}
|
||||||
maybeCreateWebhook(ctx, fieldseekerClient)
|
maybeCreateWebhook(ctx, fieldseeker_client)
|
||||||
downloadFieldseekerSchema(ctx, fieldseekerClient, arcgis_id)
|
downloadFieldseekerSchema(ctx, fieldseeker_client, arcgis_id)
|
||||||
notification.ClearOauth(ctx, user)
|
notification.ClearOauth(ctx, user)
|
||||||
newOAuthTokenChannel <- struct{}{}
|
newOAuthTokenChannel <- struct{}{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func newFieldSeeker(ctx context.Context, oauth *models.OauthToken) (*fieldseeker.FieldSeeker, error) {
|
func NewFieldSeeker(ctx context.Context, oauth *models.OauthToken) (*fieldseeker.FieldSeeker, error) {
|
||||||
row, err := sql.OrgByOauthId(oauth.ID).One(ctx, db.PGInstance.BobDB)
|
row, err := sql.OrgByOauthId(oauth.ID).One(ctx, db.PGInstance.BobDB)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("Failed to get org ID: %w", err)
|
return nil, fmt.Errorf("Failed to get org ID: %w", err)
|
||||||
|
|
@ -385,24 +394,35 @@ func newFieldSeeker(ctx context.Context, oauth *models.OauthToken) (*fieldseeker
|
||||||
return nil, errors.New("Didn't get enough path parts")
|
return nil, errors.New("Didn't get enough path parts")
|
||||||
}
|
}
|
||||||
context := pathParts[0]
|
context := pathParts[0]
|
||||||
ar := arcgis.NewArcGIS(
|
ar, err := arcgis.NewArcGIS(
|
||||||
|
ctx,
|
||||||
|
&host,
|
||||||
arcgis.AuthenticatorOAuth{
|
arcgis.AuthenticatorOAuth{
|
||||||
AccessToken: oauth.AccessToken,
|
AccessToken: oauth.AccessToken,
|
||||||
AccessTokenExpires: oauth.AccessTokenExpires,
|
AccessTokenExpires: oauth.AccessTokenExpires,
|
||||||
RefreshToken: oauth.RefreshToken,
|
RefreshToken: oauth.RefreshToken,
|
||||||
RefreshTokenExpires: oauth.RefreshTokenExpires,
|
RefreshTokenExpires: oauth.RefreshTokenExpires,
|
||||||
},
|
},
|
||||||
&context,
|
|
||||||
&host,
|
|
||||||
)
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("Failed to create ArcGIS client: %w", err)
|
||||||
|
}
|
||||||
log.Info().Str("context", context).Str("host", host).Msg("Using base fieldseeker URL")
|
log.Info().Str("context", context).Str("host", host).Msg("Using base fieldseeker URL")
|
||||||
//ar.Context = &context
|
fssync, err := fieldseeker.NewFieldSeeker(ar, row.FieldseekerURL.MustGet())
|
||||||
//ar.Host = host
|
if err != nil {
|
||||||
return fieldseeker.NewFieldSeeker(
|
return nil, fmt.Errorf("Failed to create Fieldseeker client: %w", err)
|
||||||
ctx,
|
}
|
||||||
ar,
|
// check that the authentication is valid
|
||||||
row.FieldseekerURL.MustGet(),
|
_, err = fssync.FeatureServerLayers(ctx)
|
||||||
)
|
if err != nil {
|
||||||
|
if errors.Is(err, arcgis.ErrorInvalidAuthToken) {
|
||||||
|
return nil, InvalidatedTokenError{}
|
||||||
|
} else if errors.Is(err, arcgis.ErrorInvalidRefreshToken) {
|
||||||
|
return nil, InvalidatedTokenError{}
|
||||||
|
}
|
||||||
|
return nil, fmt.Errorf("Unrecognized error checking oauth validity: %w", err)
|
||||||
|
}
|
||||||
|
return fssync, nil
|
||||||
}
|
}
|
||||||
func updatePortalData(ctx context.Context, client *arcgis.ArcGIS, user_id int32) (*arcgis.PortalsResponse, error) {
|
func updatePortalData(ctx context.Context, client *arcgis.ArcGIS, user_id int32) (*arcgis.PortalsResponse, error) {
|
||||||
p, err := client.PortalsSelf(ctx)
|
p, err := client.PortalsSelf(ctx)
|
||||||
|
|
@ -484,7 +504,11 @@ func maybeCreateWebhook(ctx context.Context, client *fieldseeker.FieldSeeker) {
|
||||||
log.Error().Err(err).Msg("Failed to get webhooks")
|
log.Error().Err(err).Msg("Failed to get webhooks")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
for _, hook := range webhooks {
|
if webhooks == nil {
|
||||||
|
log.Error().Msg("nil webhooks")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
for _, hook := range *webhooks {
|
||||||
if hook.Name == "Nidus Sync" {
|
if hook.Name == "Nidus Sync" {
|
||||||
log.Info().Msg("Found nidus sync hook")
|
log.Info().Msg("Found nidus sync hook")
|
||||||
} else {
|
} else {
|
||||||
|
|
@ -505,8 +529,16 @@ func periodicallyExportFieldseeker(ctx context.Context, org *models.Organization
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("Failed to get oauth for org: %w", err)
|
return fmt.Errorf("Failed to get oauth for org: %w", err)
|
||||||
}
|
}
|
||||||
logPermissions(ctx, org, oauth)
|
fssync, err := NewFieldSeeker(
|
||||||
err = exportFieldseekerData(ctx, org, oauth)
|
ctx,
|
||||||
|
oauth,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("Failed to create fieldseeker client: %w", err)
|
||||||
|
}
|
||||||
|
logPermissions(ctx, fssync)
|
||||||
|
syncStatusByOrg[org.ID] = true
|
||||||
|
err = exportFieldseekerData(ctx, fssync, org)
|
||||||
syncStatusByOrg[org.ID] = false
|
syncStatusByOrg[org.ID] = false
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("Failed to export Fieldseeker data: %w", err)
|
return fmt.Errorf("Failed to export Fieldseeker data: %w", err)
|
||||||
|
|
@ -515,20 +547,19 @@ func periodicallyExportFieldseeker(ctx context.Context, org *models.Organization
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
func exportFieldseekerData(ctx context.Context, org *models.Organization, oauth *models.OauthToken) error {
|
func exportFieldseekerData(ctx context.Context, fssync *fieldseeker.FieldSeeker, org *models.Organization) error {
|
||||||
log.Info().Msg("Update Fieldseeker data")
|
log.Info().Msg("Update Fieldseeker data")
|
||||||
syncStatusByOrg[org.ID] = true
|
|
||||||
var err error
|
var err error
|
||||||
fssync, err := newFieldSeeker(ctx, oauth)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("Failed to create fssync: %w", err)
|
|
||||||
}
|
|
||||||
var stats SyncStats
|
var stats SyncStats
|
||||||
|
|
||||||
pool := pond.NewResultPool[SyncStats](20)
|
pool := pond.NewResultPool[SyncStats](20)
|
||||||
group := pool.NewGroup()
|
group := pool.NewGroup()
|
||||||
var ss SyncStats
|
var ss SyncStats
|
||||||
for _, l := range fssync.FeatureServerLayers() {
|
layers, err := fssync.FeatureServerLayers(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("Failed to get layers: %w", err)
|
||||||
|
}
|
||||||
|
for _, l := range layers {
|
||||||
ss, err = exportFieldseekerLayer(ctx, group, org, fssync, l)
|
ss, err = exportFieldseekerLayer(ctx, group, org, fssync, l)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
@ -561,7 +592,7 @@ func exportFieldseekerData(ctx context.Context, org *models.Organization, oauth
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func logPermissions(ctx context.Context, org *models.Organization, oauth *models.OauthToken) {
|
func logPermissions(ctx context.Context, fssync *fieldseeker.FieldSeeker) {
|
||||||
/*row, err := sql.OrgByOauthId(oauth.ID).One(ctx, db.PGInstance.BobDB)
|
/*row, err := sql.OrgByOauthId(oauth.ID).One(ctx, db.PGInstance.BobDB)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error().Err(err).Msg("Failed to get org in log permissions")
|
log.Error().Err(err).Msg("Failed to get org in log permissions")
|
||||||
|
|
@ -573,15 +604,7 @@ func logPermissions(ctx context.Context, org *models.Organization, oauth *models
|
||||||
}
|
}
|
||||||
*/
|
*/
|
||||||
|
|
||||||
fssync, err := newFieldSeeker(
|
_, err := fssync.AdminInfo(ctx)
|
||||||
ctx,
|
|
||||||
oauth,
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
log.Error().Err(err).Msg("Failed to create fieldseeker client in log permissions")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
_, err = fssync.AdminInfo(ctx)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Is(err, arcgis.ErrorNotPermitted) {
|
if errors.Is(err, arcgis.ErrorNotPermitted) {
|
||||||
log.Info().Msg("This oauth token is not allowed to query for admin info")
|
log.Info().Msg("This oauth token is not allowed to query for admin info")
|
||||||
|
|
@ -595,7 +618,11 @@ func logPermissions(ctx context.Context, org *models.Organization, oauth *models
|
||||||
log.Error().Err(err).Msg("Failed to query permissions in log permissions")
|
log.Error().Err(err).Msg("Failed to query permissions in log permissions")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
for _, p := range permissions {
|
if permissions == nil {
|
||||||
|
log.Error().Msg("nil permissions")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
for _, p := range *permissions {
|
||||||
log.Info().Str("p", p.Principal).Msg("Permission!")
|
log.Info().Str("p", p.Principal).Msg("Permission!")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -808,7 +835,7 @@ func saveResponse(data []byte, filename string) {
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
func saveRawQuery(fssync *fieldseeker.FieldSeeker, layer arcgis.LayerFeature, query *arcgis.Query, filename string) {
|
func saveRawQuery(fssync fieldseeker.FieldSeeker, layer arcgis.LayerFeature, query *arcgis.Query, filename string) {
|
||||||
output, err := os.Create(filename)
|
output, err := os.Create(filename)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error().Str("filename", filename).Msg("Failed to create file")
|
log.Error().Str("filename", filename).Msg("Failed to create file")
|
||||||
|
|
@ -1175,13 +1202,16 @@ func exportFieldseekerLayer(ctx context.Context, group pond.ResultTaskGroup[Sync
|
||||||
log.Info().Str("name", layer.Name).Uint("layer_id", layer.ID).Int32("org_id", org.ID).Msg("No records to download")
|
log.Info().Str("name", layer.Name).Uint("layer_id", layer.ID).Int32("org_id", org.ID).Msg("No records to download")
|
||||||
return stats, nil
|
return stats, nil
|
||||||
}
|
}
|
||||||
maxRecords := uint(fssync.MaxRecordCount())
|
max_records, err := fssync.MaxRecordCount(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return stats, fmt.Errorf("Failed to get max records: %w", err)
|
||||||
|
}
|
||||||
l, err := fieldseeker.NameToLayerType(layer.Name)
|
l, err := fieldseeker.NameToLayerType(layer.Name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return stats, fmt.Errorf("Failed to get layer for '%s': %w", layer.Name, err)
|
return stats, fmt.Errorf("Failed to get layer for '%s': %w", layer.Name, err)
|
||||||
}
|
}
|
||||||
log.Info().Str("name", layer.Name).Uint("layer_id", layer.ID).Int32("org_id", org.ID).Int("count", count.Count).Uint("iterations", uint(count.Count)/maxRecords).Msg("Queuing jobs for layer")
|
log.Info().Str("name", layer.Name).Uint("layer_id", layer.ID).Int32("org_id", org.ID).Int("count", count.Count).Uint("iterations", uint(count.Count)/uint(max_records)).Msg("Queuing jobs for layer")
|
||||||
for offset := uint(0); offset < uint(count.Count); offset += maxRecords {
|
for offset := uint(0); offset < uint(count.Count); offset += uint(max_records) {
|
||||||
group.SubmitErr(func() (SyncStats, error) {
|
group.SubmitErr(func() (SyncStats, error) {
|
||||||
var ss SyncStats
|
var ss SyncStats
|
||||||
var name string
|
var name string
|
||||||
|
|
|
||||||
5
main.go
5
main.go
|
|
@ -11,6 +11,7 @@ import (
|
||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/Gleipnir-Technology/arcgis-go"
|
||||||
"github.com/Gleipnir-Technology/nidus-sync/auth"
|
"github.com/Gleipnir-Technology/nidus-sync/auth"
|
||||||
"github.com/Gleipnir-Technology/nidus-sync/background"
|
"github.com/Gleipnir-Technology/nidus-sync/background"
|
||||||
"github.com/Gleipnir-Technology/nidus-sync/config"
|
"github.com/Gleipnir-Technology/nidus-sync/config"
|
||||||
|
|
@ -143,7 +144,9 @@ func main() {
|
||||||
log.Error().Err(err).Msg("Failed to start openAI client")
|
log.Error().Err(err).Msg("Failed to start openAI client")
|
||||||
os.Exit(8)
|
os.Exit(8)
|
||||||
}
|
}
|
||||||
background.Start(ctx)
|
custom_logger := log.With().Logger().Level(zerolog.InfoLevel)
|
||||||
|
background_ctx := arcgis.WithLogger(ctx, custom_logger)
|
||||||
|
background.Start(background_ctx)
|
||||||
server := &http.Server{
|
server := &http.Server{
|
||||||
Addr: config.Bind,
|
Addr: config.Bind,
|
||||||
Handler: r,
|
Handler: r,
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue