Convert arcgis.go to use zerolog
This commit is contained in:
parent
968a934df7
commit
8e71542c3f
1 changed files with 55 additions and 69 deletions
124
arcgis.go
124
arcgis.go
|
|
@ -10,7 +10,6 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
|
|
@ -28,6 +27,7 @@ import (
|
|||
"github.com/aarondl/opt/omitnull"
|
||||
"github.com/alitto/pond/v2"
|
||||
"github.com/jackc/pgx/v5"
|
||||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
var NewOAuthTokenChannel chan struct{}
|
||||
|
|
@ -108,19 +108,14 @@ func updateArcgisUserData(ctx context.Context, user *models.User, access_token s
|
|||
)
|
||||
portal, err := client.PortalsSelf()
|
||||
if err != nil {
|
||||
slog.Error("Failed to get ArcGIS user data", slog.String("err", err.Error()))
|
||||
log.Error().Err(err).Msg("Failed to get ArcGIS user data")
|
||||
return
|
||||
}
|
||||
slog.Info("Got portals data",
|
||||
slog.String("Username", portal.User.Username),
|
||||
slog.String("user_id", portal.User.ID),
|
||||
slog.String("org_id", portal.User.OrgID),
|
||||
slog.String("org_name", portal.Name),
|
||||
slog.String("license_type_id", portal.User.UserLicenseTypeID))
|
||||
log.Info().Str("Username", portal.User.Username).Str("user_id", portal.User.ID).Str("org_id", portal.User.OrgID).Str("org_name", portal.Name).Str("license_type_id", portal.User.UserLicenseTypeID).Msg("Got portals data")
|
||||
|
||||
_, err = sql.UpdateOauthTokenOrg(portal.User.ID, portal.User.UserLicenseTypeID, refresh_token).Exec(ctx, PGInstance.BobDB)
|
||||
if err != nil {
|
||||
slog.Error("Failed to update oauth token portal data", slog.String("err", err.Error()))
|
||||
log.Error().Err(err).Msg("Failed to update oauth token portal data")
|
||||
return
|
||||
}
|
||||
var org *models.Organization
|
||||
|
|
@ -134,15 +129,15 @@ func updateArcgisUserData(ctx context.Context, user *models.User, access_token s
|
|||
}
|
||||
org, err = models.Organizations.Insert(&setter).One(ctx, PGInstance.BobDB)
|
||||
if err != nil {
|
||||
slog.Error("Failed to create new organization", slog.String("err", err.Error()))
|
||||
log.Error().Err(err).Msg("Failed to create new organization")
|
||||
return
|
||||
}
|
||||
slog.Info("Created new organization", slog.Int("org_id", int(org.ID)))
|
||||
log.Info().Int("org_id", int(org.ID)).Msg("Created new organization")
|
||||
case 1:
|
||||
org = orgs[0]
|
||||
slog.Info("Organization already exists")
|
||||
log.Info().Msg("Organization already exists")
|
||||
default:
|
||||
slog.Error("Got too many organizations, bailing")
|
||||
log.Error().Msg("Got too many organizations, bailing")
|
||||
return
|
||||
|
||||
}
|
||||
|
|
@ -150,32 +145,32 @@ func updateArcgisUserData(ctx context.Context, user *models.User, access_token s
|
|||
LogErrorTypeInfo(err)
|
||||
if errors.Is(err, pgx.ErrNoRows) {
|
||||
} else {
|
||||
slog.Error("Failed to query for existing org", slog.String("err", err.Error()))
|
||||
log.Error().Err(err).Msg("Failed to query for existing org")
|
||||
return
|
||||
}
|
||||
}
|
||||
err = org.AttachUser(ctx, PGInstance.BobDB, user)
|
||||
if err != nil {
|
||||
slog.Error("Failed to attach user to organization", slog.String("err", err.Error()), slog.Int("user_id", int(user.ID)), slog.Int("org_id", int(org.ID)))
|
||||
log.Error().Err(err).Int("user_id", int(user.ID)).Int("org_id", int(org.ID)).Msg("Failed to attach user to organization")
|
||||
return
|
||||
}
|
||||
|
||||
search, err := client.Search("Fieldseeker")
|
||||
if err != nil {
|
||||
slog.Error("Failed to get search FieldseekerGIS data", slog.String("err", err.Error()))
|
||||
log.Error().Err(err).Msg("Failed to get search FieldseekerGIS data")
|
||||
return
|
||||
}
|
||||
var fieldseekerClient *fieldseeker.FieldSeeker
|
||||
for _, result := range search.Results {
|
||||
slog.Info("Got result", slog.String("name", result.Name))
|
||||
log.Info().Str("name", result.Name).Msg("Got result")
|
||||
if result.Name == "FieldSeekerGIS" {
|
||||
slog.Info("Found Fieldseeker", slog.String("url", result.URL))
|
||||
log.Info().Str("url", result.URL).Msg("Found Fieldseeker")
|
||||
setter := models.OrganizationSetter{
|
||||
FieldseekerURL: omitnull.From(result.URL),
|
||||
}
|
||||
err = org.Update(ctx, PGInstance.BobDB, &setter)
|
||||
if err != nil {
|
||||
slog.Error("Failed to create new organization", slog.String("err", err.Error()))
|
||||
log.Error().Err(err).Msg("Failed to create new organization")
|
||||
return
|
||||
}
|
||||
fieldseekerClient, err = fieldseeker.NewFieldSeeker(
|
||||
|
|
@ -183,14 +178,14 @@ func updateArcgisUserData(ctx context.Context, user *models.User, access_token s
|
|||
result.URL,
|
||||
)
|
||||
if err != nil {
|
||||
slog.Error("Failed to create fieldseeker client", slog.String("err", err.Error()))
|
||||
log.Error().Err(err).Msg("Failed to create fieldseeker client")
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
arcgis_id, ok := org.ArcgisID.Get()
|
||||
if !ok {
|
||||
slog.Error("Cannot get webhooks - ArcGIS ID is null", slog.Int("org.id", int(org.ID)))
|
||||
log.Error().Int("org.id", int(org.ID)).Msg("Cannot get webhooks - ArcGIS ID is null")
|
||||
}
|
||||
client.Context = &arcgis_id
|
||||
maybeCreateWebhook(ctx, fieldseekerClient)
|
||||
|
|
@ -201,13 +196,13 @@ func updateArcgisUserData(ctx context.Context, user *models.User, access_token s
|
|||
func maybeCreateWebhook(ctx context.Context, client *fieldseeker.FieldSeeker) {
|
||||
webhooks, err := client.WebhookList()
|
||||
if err != nil {
|
||||
slog.Error("Failed to get webhooks", slog.String("err", err.Error()))
|
||||
log.Error().Err(err).Msg("Failed to get webhooks")
|
||||
}
|
||||
for _, hook := range webhooks {
|
||||
if hook.Name == "Nidus Sync" {
|
||||
slog.Info("Found nidus sync hook")
|
||||
log.Info().Msg("Found nidus sync hook")
|
||||
} else {
|
||||
slog.Info("Found webhook", slog.String("name", hook.Name))
|
||||
log.Info().Str("name", hook.Name).Msg("Found webhook")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -269,7 +264,7 @@ func refreshFieldseekerData(ctx context.Context, newOauthCh <-chan struct{}) {
|
|||
|
||||
oauths, err := models.OauthTokens.Query(models.SelectWhere.OauthTokens.InvalidatedAt.IsNull()).All(ctx, PGInstance.BobDB)
|
||||
if err != nil {
|
||||
slog.Error("Failed to get oauths", slog.String("err", err.Error()))
|
||||
log.Error().Err(err).Msg("Failed to get oauths")
|
||||
return
|
||||
}
|
||||
for _, oauth := range oauths {
|
||||
|
|
@ -278,14 +273,14 @@ func refreshFieldseekerData(ctx context.Context, newOauthCh <-chan struct{}) {
|
|||
defer wg.Done()
|
||||
err := maintainOAuth(workerCtx, oauth)
|
||||
if err != nil {
|
||||
slog.Error("Crashed oauth maintenance goroutine", slog.String("err", err.Error()))
|
||||
log.Error().Err(err).Msg("Crashed oauth maintenance goroutine")
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
orgs, err := models.Organizations.Query().All(ctx, PGInstance.BobDB)
|
||||
if err != nil {
|
||||
slog.Error("Failed to get orgs", slog.String("err", err.Error()))
|
||||
log.Error().Err(err).Msg("Failed to get orgs")
|
||||
return
|
||||
}
|
||||
for _, org := range orgs {
|
||||
|
|
@ -294,19 +289,19 @@ func refreshFieldseekerData(ctx context.Context, newOauthCh <-chan struct{}) {
|
|||
defer wg.Done()
|
||||
err := periodicallyExportFieldseeker(workerCtx, org)
|
||||
if err != nil {
|
||||
slog.Error("Crashed fieldseeker export goroutine", slog.String("err", err.Error()))
|
||||
log.Error().Err(err).Msg("Crashed fieldseeker export goroutine")
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
slog.Info("Exiting refresh worker...")
|
||||
log.Info().Msg("Exiting refresh worker...")
|
||||
cancel()
|
||||
wg.Wait()
|
||||
return
|
||||
case <-newOauthCh:
|
||||
slog.Info("Updating oauth background work")
|
||||
log.Info().Msg("Updating oauth background work")
|
||||
cancel()
|
||||
wg.Wait()
|
||||
}
|
||||
|
|
@ -325,7 +320,7 @@ func downloadAllRecords(ctx context.Context, fssync *fieldseeker.FieldSeeker, la
|
|||
if err != nil {
|
||||
return stats, fmt.Errorf("Failed to get counts for layer %s (%d): %v", layer.Name, layer.ID, err)
|
||||
}
|
||||
slog.Info("Starting on layer", slog.String("name", layer.Name), slog.Int("id", layer.ID))
|
||||
log.Info().Str("name", layer.Name).Int("id", layer.ID).Msg("Starting on layer")
|
||||
if count.Count == 0 {
|
||||
return stats, nil
|
||||
}
|
||||
|
|
@ -350,7 +345,7 @@ func downloadAllRecords(ctx context.Context, fssync *fieldseeker.FieldSeeker, la
|
|||
if err != nil {
|
||||
filename := fmt.Sprintf("failure-%s-%d-%d.json", layer.Name, layer.ID, offset)
|
||||
saveRawQuery(fssync, layer, query, filename)
|
||||
slog.Error("Faield to save DB records", slog.String("err", err.Error()))
|
||||
log.Error().Err(err).Msg("Faild to save DB records")
|
||||
return SyncStats{}, fmt.Errorf("Failed to save records: %v", err)
|
||||
}
|
||||
return SyncStats{
|
||||
|
|
@ -369,7 +364,7 @@ func downloadAllRecords(ctx context.Context, fssync *fieldseeker.FieldSeeker, la
|
|||
stats.Updates += r.Updates
|
||||
stats.Unchanged += r.Unchanged
|
||||
}
|
||||
slog.Info("Finished layer", slog.Int("inserts", stats.Inserts), slog.Int("updates", stats.Updates), slog.Int("no change", stats.Unchanged))
|
||||
log.Info().Int("inserts", stats.Inserts).Int("updates", stats.Updates).Int("no change", stats.Unchanged).Msg("Finished layer")
|
||||
return stats, nil
|
||||
}
|
||||
|
||||
|
|
@ -405,13 +400,13 @@ func periodicallyExportFieldseeker(ctx context.Context, org *models.Organization
|
|||
if err != nil {
|
||||
return fmt.Errorf("Failed to export Fieldseeker data: %v", err)
|
||||
}
|
||||
slog.Info("Completed exporting data, waiting 15 minutes to go agoin.")
|
||||
log.Info().Msg("Completed exporting data, waiting 15 minutes to go agoin.")
|
||||
pollTicker = time.NewTicker(15 * time.Minute)
|
||||
}
|
||||
}
|
||||
}
|
||||
func exportFieldseekerData(ctx context.Context, org *models.Organization, oauth *models.OauthToken) error {
|
||||
slog.Info("Update Fieldseeker data")
|
||||
log.Info().Msg("Update Fieldseeker data")
|
||||
ar := arcgis.NewArcGIS(
|
||||
arcgis.AuthenticatorOAuth{
|
||||
AccessToken: oauth.AccessToken,
|
||||
|
|
@ -470,8 +465,8 @@ func maintainOAuth(ctx context.Context, oauth *models.OauthToken) error {
|
|||
if oauth.RefreshTokenExpires.Before(time.Now()) {
|
||||
refreshTokenDelay = 0
|
||||
}
|
||||
slog.Info("Need to refresh access token", slog.Int("id", int(oauth.ID)), slog.Float64("seconds", accessTokenDelay.Seconds()), slog.String("access_token", oauth.AccessToken))
|
||||
slog.Info("Need to refresh refresh token", slog.Int("id", int(oauth.ID)), slog.Float64("seconds", refreshTokenDelay.Seconds()), slog.String("refresh_token", oauth.RefreshToken))
|
||||
log.Info().Int("id", int(oauth.ID)).Float64("seconds", accessTokenDelay.Seconds()).Str("access_token", oauth.AccessToken).Msg("Need to refresh access token")
|
||||
log.Info().Int("id", int(oauth.ID)).Float64("seconds", refreshTokenDelay.Seconds()).Str("refresh_token", oauth.RefreshToken).Msg("Need to refresh refresh token")
|
||||
accessTokenTicker := time.NewTicker(accessTokenDelay)
|
||||
refreshTokenTicker := time.NewTicker(refreshTokenDelay)
|
||||
select {
|
||||
|
|
@ -502,15 +497,15 @@ func markTokenFailed(ctx context.Context, oauth *models.OauthToken) {
|
|||
}
|
||||
err := oauth.Update(ctx, PGInstance.BobDB, &oauthSetter)
|
||||
if err != nil {
|
||||
slog.Error("Failed to mark token failed", slog.String("err", err.Error()))
|
||||
log.Error().Str("err", err.Error()).Msg("Failed to mark token failed")
|
||||
}
|
||||
user, err := models.FindUser(ctx, PGInstance.BobDB, oauth.UserID)
|
||||
if err != nil {
|
||||
slog.Error("Failed to get oauth user", slog.String("err", err.Error()))
|
||||
log.Error().Str("err", err.Error()).Msg("Failed to get oauth user")
|
||||
return
|
||||
}
|
||||
notifyOauthInvalid(ctx, user)
|
||||
slog.Info("Marked oauth token invalid", slog.Int("id", int(oauth.ID)))
|
||||
log.Info().Int("id", int(oauth.ID)).Msg("Marked oauth token invalid")
|
||||
}
|
||||
|
||||
// Update the access token to keep it fresh and alive
|
||||
|
|
@ -542,7 +537,7 @@ func refreshAccessToken(ctx context.Context, oauth *models.OauthToken) error {
|
|||
if err != nil {
|
||||
return fmt.Errorf("Failed to update oauth in database: %v", err)
|
||||
}
|
||||
slog.Info("Updated oauth token", slog.Int("oauth token id", int(oauth.ID)))
|
||||
log.Info().Int("oauth token id", int(oauth.ID)).Msg("Updated oauth token")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
@ -576,7 +571,7 @@ func refreshRefreshToken(ctx context.Context, oauth *models.OauthToken) error {
|
|||
if err != nil {
|
||||
return fmt.Errorf("Failed to update oauth in database: %v", err)
|
||||
}
|
||||
slog.Info("Updated oauth token", slog.Int("oauth token id", int(oauth.ID)))
|
||||
log.Info().Int("oauth token id", int(oauth.ID)).Msg("Updated oauth token")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
@ -587,14 +582,14 @@ func newTimestampedFilename(prefix, suffix string) string {
|
|||
|
||||
func handleTokenRequest(ctx context.Context, req *http.Request) (*OAuthTokenResponse, error) {
|
||||
client := http.Client{}
|
||||
slog.Info("POST", slog.String("url", req.URL.String()))
|
||||
log.Info().Str("url", req.URL.String()).Msg("POST")
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Failed to do request: %v", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
bodyBytes, err := io.ReadAll(resp.Body)
|
||||
slog.Info("Token request", slog.Int("status", resp.StatusCode))
|
||||
log.Info().Int("status", resp.StatusCode).Msg("Token request")
|
||||
filename := newTimestampedFilename("token", ".json")
|
||||
saveResponse(bodyBytes, filename)
|
||||
if resp.StatusCode >= http.StatusBadRequest {
|
||||
|
|
@ -631,65 +626,56 @@ func handleTokenRequest(ctx context.Context, req *http.Request) (*OAuthTokenResp
|
|||
))
|
||||
}
|
||||
}
|
||||
slog.Info("Oauth token acquired",
|
||||
slog.String("refresh token", tokenResponse.RefreshToken),
|
||||
slog.String("access token", tokenResponse.AccessToken),
|
||||
slog.Int("access expires", tokenResponse.ExpiresIn),
|
||||
slog.Int("refresh expires", tokenResponse.RefreshTokenExpiresIn),
|
||||
)
|
||||
log.Info().Str("refresh token", tokenResponse.RefreshToken).Str("access token", tokenResponse.AccessToken).Int("access expires", tokenResponse.ExpiresIn).Int("refresh expires", tokenResponse.RefreshTokenExpiresIn).Msg("Oauth token acquired")
|
||||
return &tokenResponse, nil
|
||||
}
|
||||
|
||||
func logResponseHeaders(resp *http.Response) {
|
||||
if resp == nil {
|
||||
slog.Info("Response is nil")
|
||||
log.Info().Msg("Response is nil")
|
||||
return
|
||||
}
|
||||
|
||||
slog.Info("HTTP Response headers",
|
||||
"status", resp.Status,
|
||||
"statusCode", resp.StatusCode)
|
||||
log.Info().Str("status", resp.Status).Int("statusCode", resp.StatusCode).Msg("HTTP Response headers")
|
||||
|
||||
for name, values := range resp.Header {
|
||||
slog.Info("Header",
|
||||
"name", name,
|
||||
"values", values)
|
||||
log.Info().Str("name", name).Strs("values", values).Msg("Header")
|
||||
}
|
||||
}
|
||||
|
||||
func saveResponse(data []byte, filename string) {
|
||||
dest, err := os.Create(filename)
|
||||
if err != nil {
|
||||
slog.Error("Failed to create file", slog.String("filename", filename), slog.String("err", err.Error()))
|
||||
log.Error().Str("filename", filename).Str("err", err.Error()).Msg("Failed to create file")
|
||||
return
|
||||
}
|
||||
_, err = io.Copy(dest, bytes.NewReader(data))
|
||||
if err != nil {
|
||||
slog.Error("Failed to write", slog.String("filename", filename), slog.String("err", err.Error()))
|
||||
log.Error().Str("filename", filename).Str("err", err.Error()).Msg("Failed to write")
|
||||
return
|
||||
}
|
||||
slog.Info("Wrote response", slog.String("filename", filename))
|
||||
log.Info().Str("filename", filename).Msg("Wrote response")
|
||||
}
|
||||
|
||||
func saveRawQuery(fssync *fieldseeker.FieldSeeker, layer arcgis.LayerFeature, query *arcgis.Query, filename string) {
|
||||
output, err := os.Create(filename)
|
||||
if err != nil {
|
||||
slog.Error("Failed to create file", slog.String("filename", filename))
|
||||
log.Error().Str("filename", filename).Msg("Failed to create file")
|
||||
return
|
||||
}
|
||||
qr, err := fssync.DoQueryRaw(
|
||||
layer.ID,
|
||||
query)
|
||||
if err != nil {
|
||||
slog.Error("Failed to do query", slog.String("err", err.Error()))
|
||||
log.Error().Str("err", err.Error()).Msg("Failed to do query")
|
||||
return
|
||||
}
|
||||
_, err = output.Write(qr)
|
||||
if err != nil {
|
||||
slog.Error("Failed to write results", slog.String("err", err.Error()))
|
||||
log.Error().Str("err", err.Error()).Msg("Failed to write results")
|
||||
return
|
||||
}
|
||||
slog.Info("Wrote failed query", slog.String("filename", filename))
|
||||
log.Info().Str("filename", filename).Msg("Wrote failed query")
|
||||
}
|
||||
|
||||
func saveOrUpdateDBRecords(ctx context.Context, table string, qr *arcgis.QueryResult, org_id int32) (int, int, error) {
|
||||
|
|
@ -867,7 +853,7 @@ func hasUpdates(row map[string]string, feature arcgis.Feature) bool {
|
|||
} else if len(rowdata) > 0 {
|
||||
return true
|
||||
} else {
|
||||
slog.Error("Looks like our original value is nil, but our row value is something non-empty with a zero length. Need a programmer to look into this.")
|
||||
log.Error().Msg("Looks like our original value is nil, but our row value is something non-empty with a zero length. Need a programmer to look into this.")
|
||||
}
|
||||
}
|
||||
// check strings first, their simplest
|
||||
|
|
@ -883,7 +869,7 @@ func hasUpdates(row map[string]string, feature arcgis.Feature) bool {
|
|||
}
|
||||
rowAsInt, err := strconv.Atoi(rowdata)
|
||||
if err != nil {
|
||||
slog.Error(fmt.Sprintf("Failed to convert '%s' to an int to compare against %v for %v", rowdata, featureAsInt, key))
|
||||
log.Error().Msg(fmt.Sprintf("Failed to convert '%s' to an int to compare against %v for %v", rowdata, featureAsInt, key))
|
||||
}
|
||||
if rowAsInt != featureAsInt {
|
||||
return true
|
||||
|
|
@ -897,7 +883,7 @@ func hasUpdates(row map[string]string, feature arcgis.Feature) bool {
|
|||
}
|
||||
rowAsFloat, err := strconv.ParseFloat(rowdata, 64)
|
||||
if err != nil {
|
||||
slog.Error(fmt.Sprintf("Failed to convert '%s' to a float64 to compare against %v for %v", rowdata, featureAsFloat, key))
|
||||
log.Error().Msg(fmt.Sprintf("Failed to convert '%s' to a float64 to compare against %v for %v", rowdata, featureAsFloat, key))
|
||||
}
|
||||
if rowAsFloat != featureAsFloat {
|
||||
return true
|
||||
|
|
@ -905,8 +891,8 @@ func hasUpdates(row map[string]string, feature arcgis.Feature) bool {
|
|||
continue
|
||||
}
|
||||
}
|
||||
slog.Info(fmt.Sprintf("key: %s\tvalue: %v (type %T)\trow: %s\n", key, value, value, rowdata))
|
||||
slog.Error("we've hit a point where we can't tell if we have an update or not, need a programmer to look at the above")
|
||||
log.Info().Msg(fmt.Sprintf("key: %s\tvalue: %v (type %T)\trow: %s\n", key, value, value, rowdata))
|
||||
log.Error().Msg("we've hit a point where we can't tell if we have an update or not, need a programmer to look at the above")
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue