Split several new packages apart to reduce inter-dependence

This will make it clear what depends on what when rendering HTML
pages.
This commit is contained in:
Eli Ribble 2026-01-07 16:07:51 +00:00
parent 4c23eba5d7
commit 572b8a9de9
11 changed files with 277 additions and 263 deletions

View file

@ -1,4 +1,4 @@
package main package background
import ( import (
"bytes" "bytes"
@ -22,11 +22,14 @@ import (
"github.com/Gleipnir-Technology/arcgis-go" "github.com/Gleipnir-Technology/arcgis-go"
"github.com/Gleipnir-Technology/arcgis-go/fieldseeker" "github.com/Gleipnir-Technology/arcgis-go/fieldseeker"
"github.com/Gleipnir-Technology/nidus-sync/config"
"github.com/Gleipnir-Technology/nidus-sync/db" "github.com/Gleipnir-Technology/nidus-sync/db"
"github.com/Gleipnir-Technology/nidus-sync/db/enums" "github.com/Gleipnir-Technology/nidus-sync/db/enums"
"github.com/Gleipnir-Technology/nidus-sync/db/models" "github.com/Gleipnir-Technology/nidus-sync/db/models"
"github.com/Gleipnir-Technology/nidus-sync/db/sql" "github.com/Gleipnir-Technology/nidus-sync/db/sql"
"github.com/Gleipnir-Technology/nidus-sync/debug" "github.com/Gleipnir-Technology/nidus-sync/debug"
"github.com/Gleipnir-Technology/nidus-sync/h3utils"
"github.com/Gleipnir-Technology/nidus-sync/notification"
"github.com/aarondl/opt/omit" "github.com/aarondl/opt/omit"
"github.com/aarondl/opt/omitnull" "github.com/aarondl/opt/omitnull"
"github.com/alitto/pond/v2" "github.com/alitto/pond/v2"
@ -63,39 +66,133 @@ type OAuthTokenResponse struct {
Username string `json:"username"` Username string `json:"username"`
} }
// Build the ArcGIS authorization URL with PKCE func HandleOauthAccessCode(ctx context.Context, user *models.User, code string) error {
func buildArcGISAuthURL(clientID string) string { baseURL := "https://www.arcgis.com/sharing/rest/oauth2/token/"
baseURL := "https://www.arcgis.com/sharing/rest/oauth2/authorize/"
params := url.Values{} //params.Add("code_verifier", "S256")
params.Add("client_id", clientID)
params.Add("redirect_uri", redirectURL())
params.Add("response_type", "code")
//params.Add("code_challenge", generateCodeChallenge(codeVerifier))
//params.Add("code_challenge_method", "S256")
// See https://developers.arcgis.com/rest/users-groups-and-items/token/ form := url.Values{
// expiration is defined in minutes "grant_type": []string{"authorization_code"},
var expiration int "code": []string{code},
if IsProductionEnvironment() { "client_id": []string{config.ClientID},
// 2 weeks is the maximum allowed "redirect_uri": []string{config.RedirectURL()},
expiration = 20160
} else {
expiration = 20
} }
params.Add("expiration", strconv.Itoa(expiration))
return baseURL + "?" + params.Encode() req, err := http.NewRequest("POST", baseURL, strings.NewReader(form.Encode()))
if err != nil {
return fmt.Errorf("Failed to create request: %w", err)
}
req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
token, err := handleTokenRequest(ctx, req)
if err != nil {
return fmt.Errorf("Failed to exchange authorization code for token: %w", err)
}
accessExpires := futureUTCTimestamp(token.ExpiresIn)
refreshExpires := futureUTCTimestamp(token.RefreshTokenExpiresIn)
setter := models.OauthTokenSetter{
AccessToken: omit.From(token.AccessToken),
AccessTokenExpires: omit.From(accessExpires),
RefreshToken: omit.From(token.RefreshToken),
RefreshTokenExpires: omit.From(refreshExpires),
Username: omit.From(token.Username),
}
err = user.InsertUserOauthTokens(ctx, db.PGInstance.BobDB, &setter)
if err != nil {
return fmt.Errorf("Failed to save token to database: %w", err)
}
go updateArcgisUserData(context.Background(), user, token.AccessToken, accessExpires, token.RefreshToken, refreshExpires)
return nil
}
// HasFieldseekerConnection reports whether the given user has at least one
// OAuth token row stored, i.e. a Fieldseeker/ArcGIS connection on record.
func HasFieldseekerConnection(ctx context.Context, user *models.User) (bool, error) {
	tokens, err := sql.OauthTokenByUserId(user.ID).All(ctx, db.PGInstance.BobDB)
	if err != nil {
		return false, err
	}
	return len(tokens) > 0, nil
}
// IsSyncOngoing reports whether a Fieldseeker sync is currently running for
// the given organization. Unknown organizations report false (map zero value).
func IsSyncOngoing(orgID int32) bool {
	ongoing, ok := syncStatusByOrg[orgID]
	if !ok {
		return false
	}
	return ongoing
}
// RefreshFieldseekerData is a long-running goroutine in charge of getting
// Fieldseeker data and keeping it fresh.
//
// Each pass spawns one OAuth-maintenance goroutine per non-invalidated token
// and one exporter goroutine per organization, all tied to a per-pass
// workerCtx. It then blocks until either ctx is cancelled (shut down and
// return) or a new OAuth token arrives on newOauthCh (cancel the current
// workers and start a fresh pass).
func RefreshFieldseekerData(ctx context.Context, newOauthCh <-chan struct{}) {
	syncStatusByOrg = make(map[int32]bool, 0)
	for {
		workerCtx, cancel := context.WithCancel(context.Background())
		var wg sync.WaitGroup
		oauths, err := models.OauthTokens.Query(models.SelectWhere.OauthTokens.InvalidatedAt.IsNull()).All(ctx, db.PGInstance.BobDB)
		if err != nil {
			log.Error().Err(err).Msg("Failed to get oauths")
			// Release the per-pass context before bailing out so it is not leaked.
			cancel()
			return
		}
		for _, oauth := range oauths {
			oauth := oauth // shadow for the closure (required before Go 1.22)
			wg.Add(1)
			go func() {
				defer wg.Done()
				err := maintainOAuth(workerCtx, oauth)
				if err != nil {
					// Use the outer ctx here: the failure must still be recorded
					// after workerCtx has been cancelled.
					markTokenFailed(ctx, oauth)
					if errors.Is(err, arcgis.InvalidatedRefreshTokenError) {
						log.Info().Int("oauth_token.id", int(oauth.ID)).Msg("Marked invalid by the server")
					} else {
						debug.LogErrorTypeInfo(err)
						log.Error().Err(err).Msg("Crashed oauth maintenance goroutine")
					}
				}
			}()
		}
		orgs, err := models.Organizations.Query().All(ctx, db.PGInstance.BobDB)
		if err != nil {
			log.Error().Err(err).Msg("Failed to get orgs")
			// Stop the oauth goroutines already spawned above and wait for them,
			// so neither the context nor the workers are leaked.
			cancel()
			wg.Wait()
			return
		}
		for _, org := range orgs {
			org := org // shadow for the closure (required before Go 1.22)
			wg.Add(1)
			go func() {
				defer wg.Done()
				err := periodicallyExportFieldseeker(workerCtx, org)
				if err != nil {
					if errors.Is(err, &NoOAuthForOrg{}) {
						log.Info().Int("organization_id", int(org.ID)).Msg("No oauth available for organization, exiting exporter.")
						return
					}
					log.Error().Err(err).Msg("Crashed fieldseeker export goroutine")
				}
			}()
		}
		select {
		case <-ctx.Done():
			log.Info().Msg("Exiting refresh worker...")
			cancel()
			wg.Wait()
			return
		case <-newOauthCh:
			log.Info().Msg("Updating oauth background work")
			cancel()
			wg.Wait()
		}
	}
}
// SyncStats accumulates record counts for a Fieldseeker sync run.
type SyncStats struct {
	// Inserts counts records that were inserted.
	Inserts uint
	// Updates counts records that were updated.
	Updates uint
	// Unchanged counts records that required no change.
	Unchanged uint
}
func downloadFieldseekerSchema(ctx context.Context, fieldseekerClient *fieldseeker.FieldSeeker, arcgis_id string) { func downloadFieldseekerSchema(ctx context.Context, fieldseekerClient *fieldseeker.FieldSeeker, arcgis_id string) {
for _, layer := range fieldseekerClient.FeatureServerLayers() { for _, layer := range fieldseekerClient.FeatureServerLayers() {
err := os.MkdirAll(filepath.Join(FieldseekerSchemaDirectory, arcgis_id), os.ModePerm) err := os.MkdirAll(filepath.Join(config.FieldseekerSchemaDirectory, arcgis_id), os.ModePerm)
if err != nil { if err != nil {
log.Error().Err(err).Msg("Failed to create parent directory") log.Error().Err(err).Msg("Failed to create parent directory")
return return
} }
output, err := os.Create(fmt.Sprintf("%s/%s/%s.json", FieldseekerSchemaDirectory, arcgis_id, layer.Name)) output, err := os.Create(fmt.Sprintf("%s/%s/%s.json", config.FieldseekerSchemaDirectory, arcgis_id, layer.Name))
if err != nil { if err != nil {
log.Error().Err(err).Msg("Failed to open output") log.Error().Err(err).Msg("Failed to open output")
return return
@ -131,10 +228,6 @@ func generateCodeVerifier() string {
return base64.RawURLEncoding.EncodeToString(bytes) return base64.RawURLEncoding.EncodeToString(bytes)
} }
func isSyncOngoing(org_id int32) bool {
return syncStatusByOrg[org_id]
}
// Find out what we can about this user // Find out what we can about this user
func updateArcgisUserData(ctx context.Context, user *models.User, access_token string, access_token_expires time.Time, refresh_token string, refresh_token_expires time.Time) { func updateArcgisUserData(ctx context.Context, user *models.User, access_token string, access_token_expires time.Time, refresh_token string, refresh_token_expires time.Time) {
client := arcgis.NewArcGIS( client := arcgis.NewArcGIS(
@ -202,7 +295,7 @@ func updateArcgisUserData(ctx context.Context, user *models.User, access_token s
client.Context = &arcgis_id client.Context = &arcgis_id
maybeCreateWebhook(ctx, fieldseekerClient) maybeCreateWebhook(ctx, fieldseekerClient)
downloadFieldseekerSchema(ctx, fieldseekerClient, arcgis_id) downloadFieldseekerSchema(ctx, fieldseekerClient, arcgis_id)
clearNotificationsOauth(ctx, user) notification.ClearOauth(ctx, user)
NewOAuthTokenChannel <- struct{}{} NewOAuthTokenChannel <- struct{}{}
} }
@ -220,124 +313,6 @@ func maybeCreateWebhook(ctx context.Context, client *fieldseeker.FieldSeeker) {
} }
} }
// handleOauthAccessCode exchanges an ArcGIS OAuth2 authorization code for an
// access/refresh token pair, persists the tokens for the user, and kicks off
// a background fetch of the user's ArcGIS data. Returns a wrapped error if
// the request, exchange, or database insert fails.
func handleOauthAccessCode(ctx context.Context, user *models.User, code string) error {
baseURL := "https://www.arcgis.com/sharing/rest/oauth2/token/"
//params.Add("code_verifier", "S256")
// Plain authorization-code grant; no PKCE verifier is sent (see the
// commented-out line above).
form := url.Values{
"grant_type": []string{"authorization_code"},
"code": []string{code},
"client_id": []string{ClientID},
"redirect_uri": []string{redirectURL()},
}
req, err := http.NewRequest("POST", baseURL, strings.NewReader(form.Encode()))
if err != nil {
return fmt.Errorf("Failed to create request: %w", err)
}
req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
token, err := handleTokenRequest(ctx, req)
if err != nil {
return fmt.Errorf("Failed to exchange authorization code for token: %w", err)
}
// Convert the relative expiry offsets from the token response into absolute
// UTC timestamps before persisting.
accessExpires := futureUTCTimestamp(token.ExpiresIn)
refreshExpires := futureUTCTimestamp(token.RefreshTokenExpiresIn)
setter := models.OauthTokenSetter{
AccessToken: omit.From(token.AccessToken),
AccessTokenExpires: omit.From(accessExpires),
RefreshToken: omit.From(token.RefreshToken),
RefreshTokenExpires: omit.From(refreshExpires),
Username: omit.From(token.Username),
}
err = user.InsertUserOauthTokens(ctx, db.PGInstance.BobDB, &setter)
if err != nil {
return fmt.Errorf("Failed to save token to database: %w", err)
}
// Fire-and-forget: a fresh background context lets the fetch outlive the
// HTTP request that triggered this call.
go updateArcgisUserData(context.Background(), user, token.AccessToken, accessExpires, token.RefreshToken, refreshExpires)
return nil
}
// hasFieldseekerConnection reports whether the user has any stored OAuth
// token rows, i.e. an existing Fieldseeker/ArcGIS connection.
func hasFieldseekerConnection(ctx context.Context, user *models.User) (bool, error) {
	tokens, err := sql.OauthTokenByUserId(user.ID).All(ctx, db.PGInstance.BobDB)
	if err != nil {
		return false, err
	}
	return len(tokens) > 0, nil
}
// redirectURL returns the absolute ArcGIS OAuth callback URL on the sync host.
func redirectURL() string {
	const callbackPath = "/arcgis/oauth/callback"
	return urlSync(callbackPath)
}
// This is a goroutine that is in charge of getting Fieldseeker data and keeping it fresh.
// Each pass of the outer loop spawns one OAuth-maintenance goroutine per
// non-invalidated token and one exporter goroutine per organization, all tied
// to a per-pass workerCtx. The select at the bottom waits for either overall
// shutdown (ctx) or a new OAuth token (newOauthCh), then cancels and restarts.
func refreshFieldseekerData(ctx context.Context, newOauthCh <-chan struct{}) {
syncStatusByOrg = make(map[int32]bool, 0)
for {
workerCtx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
oauths, err := models.OauthTokens.Query(models.SelectWhere.OauthTokens.InvalidatedAt.IsNull()).All(ctx, db.PGInstance.BobDB)
if err != nil {
log.Error().Err(err).Msg("Failed to get oauths")
// NOTE(review): this return path never calls cancel(), leaking workerCtx.
return
}
for _, oauth := range oauths {
// NOTE(review): the closure captures the loop variable oauth; before
// Go 1.22 all iterations would share one variable — confirm the go.mod
// version or shadow it.
wg.Add(1)
go func() {
defer wg.Done()
err := maintainOAuth(workerCtx, oauth)
if err != nil {
// Uses the outer ctx so the failure is recorded even after workerCtx
// has been cancelled.
markTokenFailed(ctx, oauth)
if errors.Is(err, arcgis.InvalidatedRefreshTokenError) {
log.Info().Int("oauth_token.id", int(oauth.ID)).Msg("Marked invalid by the server")
} else {
debug.LogErrorTypeInfo(err)
log.Error().Err(err).Msg("Crashed oauth maintenance goroutine")
}
}
}()
}
orgs, err := models.Organizations.Query().All(ctx, db.PGInstance.BobDB)
if err != nil {
log.Error().Err(err).Msg("Failed to get orgs")
// NOTE(review): returns without cancel()/wg.Wait(), leaking workerCtx and
// abandoning the oauth goroutines spawned above.
return
}
for _, org := range orgs {
wg.Add(1)
go func() {
defer wg.Done()
err := periodicallyExportFieldseeker(workerCtx, org)
if err != nil {
if errors.Is(err, &NoOAuthForOrg{}) {
log.Info().Int("organization_id", int(org.ID)).Msg("No oauth available for organization, exiting exporter.")
return
}
log.Error().Err(err).Msg("Crashed fieldseeker export goroutine")
}
}()
}
// Block until shutdown or a new token; either way stop the current workers.
select {
case <-ctx.Done():
log.Info().Msg("Exiting refresh worker...")
cancel()
wg.Wait()
return
case <-newOauthCh:
log.Info().Msg("Updating oauth background work")
cancel()
wg.Wait()
}
}
}
// SyncStats accumulates record counts for a Fieldseeker sync run.
type SyncStats struct {
// Count of records that were inserted.
Inserts uint
// Count of records that were updated.
Updates uint
// Count of records that required no change.
Unchanged uint
}
func downloadAllRecords(ctx context.Context, fssync *fieldseeker.FieldSeeker, layer arcgis.LayerFeature, org_id int32) (SyncStats, error) { func downloadAllRecords(ctx context.Context, fssync *fieldseeker.FieldSeeker, layer arcgis.LayerFeature, org_id int32) (SyncStats, error) {
var stats SyncStats var stats SyncStats
count, err := fssync.QueryCount(layer.ID) count, err := fssync.QueryCount(layer.ID)
@ -544,7 +519,7 @@ func markTokenFailed(ctx context.Context, oauth *models.OauthToken) {
log.Error().Str("err", err.Error()).Msg("Failed to get oauth user") log.Error().Str("err", err.Error()).Msg("Failed to get oauth user")
return return
} }
notifyOauthInvalid(ctx, user) notification.NotifyOauthInvalid(ctx, user)
log.Info().Int("id", int(oauth.ID)).Msg("Marked oauth token invalid") log.Info().Int("id", int(oauth.ID)).Msg("Marked oauth token invalid")
} }
@ -554,7 +529,7 @@ func refreshAccessToken(ctx context.Context, oauth *models.OauthToken) error {
form := url.Values{ form := url.Values{
"grant_type": []string{"refresh_token"}, "grant_type": []string{"refresh_token"},
"client_id": []string{ClientID}, "client_id": []string{config.ClientID},
"refresh_token": []string{oauth.RefreshToken}, "refresh_token": []string{oauth.RefreshToken},
} }
@ -587,8 +562,8 @@ func refreshRefreshToken(ctx context.Context, oauth *models.OauthToken) error {
form := url.Values{ form := url.Values{
"grant_type": []string{"exchange_refresh_token"}, "grant_type": []string{"exchange_refresh_token"},
"client_id": []string{ClientID}, "client_id": []string{config.ClientID},
"redirect_uri": []string{redirectURL()}, "redirect_uri": []string{config.RedirectURL()},
"refresh_token": []string{oauth.RefreshToken}, "refresh_token": []string{oauth.RefreshToken},
} }
@ -1073,7 +1048,7 @@ func updateSummaryTables(ctx context.Context, org *models.Organization) {
if p.H3cell.IsNull() { if p.H3cell.IsNull() {
continue continue
} }
cell, err := toH3Cell(p.H3cell.MustGet()) cell, err := h3utils.ToCell(p.H3cell.MustGet())
if err != nil { if err != nil {
log.Error().Err(err).Msg("Failed to get geometry point") log.Error().Err(err).Msg("Failed to get geometry point")
continue continue
@ -1088,7 +1063,7 @@ func updateSummaryTables(ctx context.Context, org *models.Organization) {
var to_insert []bob.Mod[*dialect.InsertQuery] = make([]bob.Mod[*dialect.InsertQuery], 0) var to_insert []bob.Mod[*dialect.InsertQuery] = make([]bob.Mod[*dialect.InsertQuery], 0)
to_insert = append(to_insert, im.Into("h3_aggregation", "cell", "resolution", "count_", "type_", "organization_id", "geometry")) to_insert = append(to_insert, im.Into("h3_aggregation", "cell", "resolution", "count_", "type_", "organization_id", "geometry"))
for cell, count := range cellToCount { for cell, count := range cellToCount {
polygon, err := cellToPostgisGeometry(cell) polygon, err := h3utils.CellToPostgisGeometry(cell)
if err != nil { if err != nil {
log.Error().Err(err).Msg("Failed to get PostGIS geometry") log.Error().Err(err).Msg("Failed to get PostGIS geometry")
continue continue

94
config/config.go Normal file
View file

@ -0,0 +1,94 @@
package config
import (
"fmt"
"net/url"
"os"
"strconv"
)
var Bind, ClientID, ClientSecret, Environment, FieldseekerSchemaDirectory, MapboxToken, PGDSN, URLReport, URLSync, UserFilesDirectory string
// Build the ArcGIS authorization URL with PKCE
func BuildArcGISAuthURL(clientID string) string {
baseURL := "https://www.arcgis.com/sharing/rest/oauth2/authorize/"
params := url.Values{}
params.Add("client_id", clientID)
params.Add("redirect_uri", RedirectURL())
params.Add("response_type", "code")
//params.Add("code_challenge", generateCodeChallenge(codeVerifier))
//params.Add("code_challenge_method", "S256")
// See https://developers.arcgis.com/rest/users-groups-and-items/token/
// expiration is defined in minutes
var expiration int
if IsProductionEnvironment() {
// 2 weeks is the maximum allowed
expiration = 20160
} else {
expiration = 20
}
params.Add("expiration", strconv.Itoa(expiration))
return baseURL + "?" + params.Encode()
}
// IsProductionEnvironment reports whether the configured ENVIRONMENT value
// is "PRODUCTION".
func IsProductionEnvironment() bool {
	const production = "PRODUCTION"
	return Environment == production
}
func MakeURLSync(path string) string {
return fmt.Sprintf("https://%s%s", URLSync, path)
}
// Parse loads the application configuration from environment variables into
// the package-level variables, returning an error for the first missing or
// invalid value.
//
// Required: ARCGIS_CLIENT_ID, ARCGIS_CLIENT_SECRET, URL_REPORT, URL_SYNC,
// ENVIRONMENT (DEVELOPMENT or PRODUCTION), MAPBOX_TOKEN, POSTGRES_DSN,
// FIELDSEEKER_SCHEMA_DIRECTORY, USER_FILES_DIRECTORY.
// Optional: BIND (defaults to ":9001").
func Parse() error {
	// requireEnv reads a mandatory variable, reproducing the original
	// "must specify" error message for each one.
	requireEnv := func(name string) (string, error) {
		v := os.Getenv(name)
		if v == "" {
			return "", fmt.Errorf("You must specify a non-empty %s", name)
		}
		return v, nil
	}
	var err error
	if ClientID, err = requireEnv("ARCGIS_CLIENT_ID"); err != nil {
		return err
	}
	if ClientSecret, err = requireEnv("ARCGIS_CLIENT_SECRET"); err != nil {
		return err
	}
	if URLReport, err = requireEnv("URL_REPORT"); err != nil {
		return err
	}
	if URLSync, err = requireEnv("URL_SYNC"); err != nil {
		return err
	}
	// BIND is the only optional setting; keep the historical default.
	Bind = os.Getenv("BIND")
	if Bind == "" {
		Bind = ":9001"
	}
	if Environment, err = requireEnv("ENVIRONMENT"); err != nil {
		return err
	}
	if Environment != "PRODUCTION" && Environment != "DEVELOPMENT" {
		return fmt.Errorf("ENVIRONMENT should be either DEVELOPMENT or PRODUCTION")
	}
	if MapboxToken, err = requireEnv("MAPBOX_TOKEN"); err != nil {
		return err
	}
	if PGDSN, err = requireEnv("POSTGRES_DSN"); err != nil {
		return err
	}
	if FieldseekerSchemaDirectory, err = requireEnv("FIELDSEEKER_SCHEMA_DIRECTORY"); err != nil {
		return err
	}
	if UserFilesDirectory, err = requireEnv("USER_FILES_DIRECTORY"); err != nil {
		return err
	}
	return nil
}
// RedirectURL returns the absolute ArcGIS OAuth callback URL on the sync host.
func RedirectURL() string {
	const callbackPath = "/arcgis/oauth/callback"
	return MakeURLSync(callbackPath)
}

View file

@ -76,6 +76,7 @@ func doMigrations(connection_string string) error {
} }
func InitializeDatabase(ctx context.Context, uri string) error { func InitializeDatabase(ctx context.Context, uri string) error {
log.Info().Str("dsn", uri).Msg("Connecting to database")
needs, err := needsMigrations(uri) needs, err := needsMigrations(uri)
if err != nil { if err != nil {
return fmt.Errorf("Failed to determine if migrations are needed: %w", err) return fmt.Errorf("Failed to determine if migrations are needed: %w", err)
@ -119,7 +120,6 @@ func InitializeDatabase(ctx context.Context, uri string) error {
} }
func needsMigrations(connection_string string) (*bool, error) { func needsMigrations(connection_string string) (*bool, error) {
log.Info().Str("dsn", connection_string).Msg("Connecting to database")
db, err := sql.Open("pgx", connection_string) db, err := sql.Open("pgx", connection_string)
if err != nil { if err != nil {
return nil, fmt.Errorf("Failed to open database connection: %w", err) return nil, fmt.Errorf("Failed to open database connection: %w", err)

View file

@ -10,6 +10,8 @@ import (
"strings" "strings"
"github.com/Gleipnir-Technology/nidus-sync/auth" "github.com/Gleipnir-Technology/nidus-sync/auth"
"github.com/Gleipnir-Technology/nidus-sync/background"
"github.com/Gleipnir-Technology/nidus-sync/config"
"github.com/Gleipnir-Technology/nidus-sync/db/models" "github.com/Gleipnir-Technology/nidus-sync/db/models"
"github.com/Gleipnir-Technology/nidus-sync/htmlpage" "github.com/Gleipnir-Technology/nidus-sync/htmlpage"
"github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5"
@ -19,7 +21,7 @@ import (
) )
func getArcgisOauthBegin(w http.ResponseWriter, r *http.Request) { func getArcgisOauthBegin(w http.ResponseWriter, r *http.Request) {
authURL := buildArcGISAuthURL(ClientID) authURL := config.BuildArcGISAuthURL(config.ClientID)
http.Redirect(w, r, authURL, http.StatusFound) http.Redirect(w, r, authURL, http.StatusFound)
} }
@ -35,12 +37,12 @@ func getArcgisOauthCallback(w http.ResponseWriter, r *http.Request) {
respondError(w, "You're not currently authenticated, which really shouldn't happen.", err, http.StatusUnauthorized) respondError(w, "You're not currently authenticated, which really shouldn't happen.", err, http.StatusUnauthorized)
return return
} }
err = handleOauthAccessCode(r.Context(), user, code) err = background.HandleOauthAccessCode(r.Context(), user, code)
if err != nil { if err != nil {
respondError(w, "Failed to handle access code", err, http.StatusInternalServerError) respondError(w, "Failed to handle access code", err, http.StatusInternalServerError)
return return
} }
http.Redirect(w, r, urlSync("/"), http.StatusFound) http.Redirect(w, r, config.MakeURLSync("/"), http.StatusFound)
} }
func getCellDetails(w http.ResponseWriter, r *http.Request, user *models.User) { func getCellDetails(w http.ResponseWriter, r *http.Request, user *models.User) {
@ -77,7 +79,7 @@ func getQRCodeReport(w http.ResponseWriter, r *http.Request) {
if code == "" { if code == "" {
respondError(w, "There should always be a code", nil, http.StatusBadRequest) respondError(w, "There should always be a code", nil, http.StatusBadRequest)
} }
content := urlSync("/report/" + code) content := config.MakeURLSync("/report/" + code)
// Get optional size parameter (default to 256) // Get optional size parameter (default to 256)
size := 256 size := 256
if sizeStr := r.URL.Query().Get("size"); sizeStr != "" { if sizeStr := r.URL.Query().Get("size"); sizeStr != "" {
@ -149,7 +151,7 @@ func getRoot(w http.ResponseWriter, r *http.Request) {
htmlpage.Signin(w, errorCode) htmlpage.Signin(w, errorCode)
return return
} else { } else {
has, err := hasFieldseekerConnection(r.Context(), user) has, err := background.HasFieldseekerConnection(r.Context(), user)
if err != nil { if err != nil {
respondError(w, "Failed to check for ArcGIS connection", err, http.StatusInternalServerError) respondError(w, "Failed to check for ArcGIS connection", err, http.StatusInternalServerError)
return return

View file

@ -1,4 +1,4 @@
package htmlpage package h3utils
import ( import (
"fmt" "fmt"
@ -22,14 +22,14 @@ func h3ToBoundsGeoJSON(c h3.Cell) (string, error) {
} }
*/ */
func toH3Cell(s string) (h3.Cell, error) { func ToCell(s string) (h3.Cell, error) {
c := h3.CellFromString(s) c := h3.CellFromString(s)
if !c.IsValid() { if !c.IsValid() {
return c, fmt.Errorf("Invalid cell definition '%s'", s) return c, fmt.Errorf("Invalid cell definition '%s'", s)
} }
return c, nil return c, nil
} }
func h3ToGeoJSON(indexes []h3.Cell) (interface{}, error) { func H3ToGeoJSON(indexes []h3.Cell) (interface{}, error) {
featureCollection, err := geojson2h3.ToFeatureCollection(indexes) featureCollection, err := geojson2h3.ToFeatureCollection(indexes)
if err != nil { if err != nil {
return "", fmt.Errorf("Failed to get feature collection: %w", err) return "", fmt.Errorf("Failed to get feature collection: %w", err)
@ -82,7 +82,7 @@ func getCell(x, y float64, resolution int) (h3.Cell, error) {
return h3.LatLngToCell(latLng, resolution) return h3.LatLngToCell(latLng, resolution)
} }
func cellToPostgisGeometry(c h3.Cell) (string, error) { func CellToPostgisGeometry(c h3.Cell) (string, error) {
boundary, err := h3.CellToBoundary(c) boundary, err := h3.CellToBoundary(c)
if err != nil { if err != nil {
return "", fmt.Errorf("Failed to get cell boundary: %w", err) return "", fmt.Errorf("Failed to get cell boundary: %w", err)

View file

@ -15,9 +15,13 @@ import (
"strings" "strings"
"time" "time"
"github.com/Gleipnir-Technology/nidus-sync/background"
"github.com/Gleipnir-Technology/nidus-sync/config"
"github.com/Gleipnir-Technology/nidus-sync/db" "github.com/Gleipnir-Technology/nidus-sync/db"
"github.com/Gleipnir-Technology/nidus-sync/db/models" "github.com/Gleipnir-Technology/nidus-sync/db/models"
"github.com/Gleipnir-Technology/nidus-sync/db/sql" "github.com/Gleipnir-Technology/nidus-sync/db/sql"
"github.com/Gleipnir-Technology/nidus-sync/h3utils"
"github.com/Gleipnir-Technology/nidus-sync/notification"
"github.com/aarondl/opt/null" "github.com/aarondl/opt/null"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
@ -30,8 +34,6 @@ import (
//go:embed templates/* //go:embed templates/*
var embeddedFiles embed.FS var embeddedFiles embed.FS
var MapboxToken string
// Authenticated pages // Authenticated pages
var ( var (
cell = newBuiltTemplate("cell", "authenticated") cell = newBuiltTemplate("cell", "authenticated")
@ -190,7 +192,7 @@ type ServiceRequestSummary struct {
type User struct { type User struct {
DisplayName string DisplayName string
Initials string Initials string
Notifications []Notification Notifications []notification.Notification
Username string Username string
} }
@ -228,7 +230,7 @@ func bigNumber(n int) string {
} }
func contentForUser(ctx context.Context, user *models.User) (User, error) { func contentForUser(ctx context.Context, user *models.User) (User, error) {
notifications, err := notificationsForUser(ctx, user) notifications, err := notification.ForUser(ctx, user)
if err != nil { if err != nil {
return User{}, err return User{}, err
} }
@ -279,7 +281,7 @@ func Cell(ctx context.Context, w http.ResponseWriter, user *models.User, c int64
respondError(w, "Failed to get inspections by cell", err, http.StatusInternalServerError) respondError(w, "Failed to get inspections by cell", err, http.StatusInternalServerError)
return return
} }
geojson, err := h3ToGeoJSON([]h3.Cell{h3.Cell(c)}) geojson, err := h3utils.H3ToGeoJSON([]h3.Cell{h3.Cell(c)})
if err != nil { if err != nil {
respondError(w, "Failed to get boundaries", err, http.StatusInternalServerError) respondError(w, "Failed to get boundaries", err, http.StatusInternalServerError)
return return
@ -305,7 +307,7 @@ func Cell(ctx context.Context, w http.ResponseWriter, user *models.User, c int64
Lng: center.Lng, Lng: center.Lng,
}, },
GeoJSON: geojson, GeoJSON: geojson,
MapboxToken: MapboxToken, MapboxToken: config.MapboxToken,
Zoom: resolution + 5, Zoom: resolution + 5,
}, },
Treatments: treatments, Treatments: treatments,
@ -330,7 +332,7 @@ func Dashboard(ctx context.Context, w http.ResponseWriter, user *models.User) {
} else { } else {
lastSync = &sync.Created lastSync = &sync.Created
} }
is_syncing := isSyncOngoing(org.ID) is_syncing := background.IsSyncOngoing(org.ID)
inspectionCount, err := org.Mosquitoinspections().Count(ctx, db.PGInstance.BobDB) inspectionCount, err := org.Mosquitoinspections().Count(ctx, db.PGInstance.BobDB)
if err != nil { if err != nil {
respondError(w, "Failed to get inspection count", err, http.StatusInternalServerError) respondError(w, "Failed to get inspection count", err, http.StatusInternalServerError)
@ -372,7 +374,7 @@ func Dashboard(ctx context.Context, w http.ResponseWriter, user *models.User) {
IsSyncOngoing: is_syncing, IsSyncOngoing: is_syncing,
LastSync: lastSync, LastSync: lastSync,
MapData: ComponentMap{ MapData: ComponentMap{
MapboxToken: MapboxToken, MapboxToken: config.MapboxToken,
}, },
Org: org.Name.MustGet(), Org: org.Name.MustGet(),
RecentRequests: requests, RecentRequests: requests,
@ -490,7 +492,7 @@ func Source(w http.ResponseWriter, r *http.Request, user *models.User, id uuid.U
MapData: ComponentMap{ MapData: ComponentMap{
Center: latlng, Center: latlng,
//GeoJSON: //GeoJSON:
MapboxToken: MapboxToken, MapboxToken: config.MapboxToken,
Markers: []MapMarker{ Markers: []MapMarker{
MapMarker{ MapMarker{
LatLng: latlng, LatLng: latlng,

View file

@ -7,6 +7,7 @@ import (
"github.com/Gleipnir-Technology/nidus-sync/db/models" "github.com/Gleipnir-Technology/nidus-sync/db/models"
"github.com/Gleipnir-Technology/nidus-sync/db/sql" "github.com/Gleipnir-Technology/nidus-sync/db/sql"
"github.com/Gleipnir-Technology/nidus-sync/h3utils"
"github.com/aarondl/opt/null" "github.com/aarondl/opt/null"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
@ -210,7 +211,7 @@ func toTemplateTrapData(trap_data models.FieldseekerTrapdatumSlice) ([]TrapData,
if r.H3cell.IsNull() { if r.H3cell.IsNull() {
continue continue
} }
cell, err := toH3Cell(r.H3cell.MustGet()) cell, err := h3utils.ToCell(r.H3cell.MustGet())
if err != nil { if err != nil {
log.Error().Err(err).Msg("Failed to get location for trap data") log.Error().Err(err).Msg("Failed to get location for trap data")
continue continue
@ -333,7 +334,7 @@ func toTemplateBreedingSource(source *models.FieldseekerPointlocation) *Breeding
log.Error().Msg("h3 cell is null") log.Error().Msg("h3 cell is null")
return nil return nil
} }
cell, err := toH3Cell(source.H3cell.MustGet()) cell, err := h3utils.ToCell(source.H3cell.MustGet())
if err != nil { if err != nil {
log.Error().Err(err).Msg("Failed to get h3 cell from point location") log.Error().Err(err).Msg("Failed to get h3 cell from point location")
return nil return nil

97
main.go
View file

@ -13,11 +13,11 @@ import (
"github.com/Gleipnir-Technology/nidus-sync/api" "github.com/Gleipnir-Technology/nidus-sync/api"
"github.com/Gleipnir-Technology/nidus-sync/auth" "github.com/Gleipnir-Technology/nidus-sync/auth"
"github.com/Gleipnir-Technology/nidus-sync/background"
"github.com/Gleipnir-Technology/nidus-sync/config"
"github.com/Gleipnir-Technology/nidus-sync/db" "github.com/Gleipnir-Technology/nidus-sync/db"
"github.com/Gleipnir-Technology/nidus-sync/htmlpage"
"github.com/Gleipnir-Technology/nidus-sync/queue" "github.com/Gleipnir-Technology/nidus-sync/queue"
"github.com/Gleipnir-Technology/nidus-sync/report" "github.com/Gleipnir-Technology/nidus-sync/report"
"github.com/Gleipnir-Technology/nidus-sync/userfile"
"github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware" "github.com/go-chi/chi/v5/middleware"
"github.com/go-chi/hostrouter" "github.com/go-chi/hostrouter"
@ -25,70 +25,19 @@ import (
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
) )
var ClientID, ClientSecret, Environment, FieldseekerSchemaDirectory, URLReport, URLSync string
func main() { func main() {
zerolog.TimeFieldFormat = zerolog.TimeFormatUnix zerolog.TimeFieldFormat = zerolog.TimeFormatUnix
log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr}) log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr})
ClientID = os.Getenv("ARCGIS_CLIENT_ID") err := config.Parse()
if ClientID == "" {
log.Error().Msg("You must specify a non-empty ARCGIS_CLIENT_ID")
os.Exit(1)
}
ClientSecret = os.Getenv("ARCGIS_CLIENT_SECRET")
if ClientSecret == "" {
log.Error().Msg("You must specify a non-empty ARCGIS_CLIENT_SECRET")
os.Exit(1)
}
URLReport = os.Getenv("URL_REPORT")
if URLReport == "" {
log.Error().Msg("You must specify a non-empty URL_REPORT")
os.Exit(1)
}
URLSync = os.Getenv("URL_SYNC")
if URLSync == "" {
log.Error().Msg("You must specify a non-empty URL_SYNC")
os.Exit(1)
}
bind := os.Getenv("BIND")
if bind == "" {
bind = ":9001"
}
Environment = os.Getenv("ENVIRONMENT")
if Environment == "" {
log.Error().Msg("You must specify a non-empty ENVIRONMENT")
os.Exit(1)
}
if !(Environment == "PRODUCTION" || Environment == "DEVELOPMENT") {
log.Error().Str("ENVIRONMENT", Environment).Msg("ENVIRONMENT should be either DEVELOPMENT or PRODUCTION")
os.Exit(2)
}
htmlpage.MapboxToken = os.Getenv("MAPBOX_TOKEN")
if htmlpage.MapboxToken == "" {
log.Error().Msg("You must specify a non-empty MAPBOX_TOKEN")
os.Exit(1)
}
pg_dsn := os.Getenv("POSTGRES_DSN")
if pg_dsn == "" {
log.Error().Msg("You must specify a non-empty POSTGRES_DSN")
os.Exit(1)
}
FieldseekerSchemaDirectory = os.Getenv("FIELDSEEKER_SCHEMA_DIRECTORY")
if FieldseekerSchemaDirectory == "" {
log.Error().Msg("You must specify a non-empty FIELDSEEKER_SCHEMA_DIRECTORY")
os.Exit(1)
}
userfile.UserFilesDirectory = os.Getenv("USER_FILES_DIRECTORY")
if userfile.UserFilesDirectory == "" {
log.Error().Msg("You must specify a non-empty USER_FILES_DIRECTORY")
os.Exit(1)
}
log.Info().Msg("Starting...")
err := db.InitializeDatabase(context.TODO(), pg_dsn)
if err != nil { if err != nil {
log.Error().Str("err", err.Error()).Msg("Failed to connect to database") log.Error().Err(err).Msg("Failed to parse config")
os.Exit(1)
}
log.Info().Msg("Starting...")
err = db.InitializeDatabase(context.TODO(), config.PGDSN)
if err != nil {
log.Error().Err(err).Msg("Failed to connect to database")
os.Exit(2) os.Exit(2)
} }
@ -103,25 +52,25 @@ func main() {
// Set up routing by hostname // Set up routing by hostname
sr := syncRouter() sr := syncRouter()
hr.Map("", sr) // default hr.Map("", sr) // default
hr.Map("*", sr) // default hr.Map("*", sr) // default
hr.Map(URLReport, report.Router()) // report.mosquitoes.online hr.Map(config.URLReport, report.Router()) // report.mosquitoes.online
hr.Map(URLSync, sr) hr.Map(config.URLSync, sr)
r.Mount("/", hr) r.Mount("/", hr)
log.Info().Str("report url", URLReport).Str("sync url", URLSync).Msg("Serving at URLs") log.Info().Str("report url", config.URLReport).Str("sync url", config.URLSync).Msg("Serving at URLs")
// Start up background processes // Start up background processes
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
NewOAuthTokenChannel = make(chan struct{}, 10) background.NewOAuthTokenChannel = make(chan struct{}, 10)
var waitGroup sync.WaitGroup var waitGroup sync.WaitGroup
waitGroup.Add(1) waitGroup.Add(1)
go func() { go func() {
defer waitGroup.Done() defer waitGroup.Done()
refreshFieldseekerData(ctx, NewOAuthTokenChannel) background.RefreshFieldseekerData(ctx, background.NewOAuthTokenChannel)
}() }()
waitGroup.Add(1) waitGroup.Add(1)
@ -131,11 +80,11 @@ func main() {
}() }()
server := &http.Server{ server := &http.Server{
Addr: bind, Addr: config.Bind,
Handler: r, Handler: r,
} }
go func() { go func() {
log.Info().Str("address", bind).Msg("Serving HTTP requests") log.Info().Str("address", config.Bind).Msg("Serving HTTP requests")
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Error().Str("err", err.Error()).Msg("HTTP Server Error") log.Error().Str("err", err.Error()).Msg("HTTP Server Error")
} }
@ -225,10 +174,6 @@ func syncRouter() chi.Router {
return r return r
} }
func IsProductionEnvironment() bool {
return Environment == "PRODUCTION"
}
func LoggerMiddleware(logger *zerolog.Logger) func(next http.Handler) http.Handler { func LoggerMiddleware(logger *zerolog.Logger) func(next http.Handler) http.Handler {
return func(next http.Handler) http.Handler { return func(next http.Handler) http.Handler {
fn := func(w http.ResponseWriter, r *http.Request) { fn := func(w http.ResponseWriter, r *http.Request) {
@ -280,7 +225,3 @@ func LoggerMiddleware(logger *zerolog.Logger) func(next http.Handler) http.Handl
return http.HandlerFunc(fn) return http.HandlerFunc(fn)
} }
} }
func urlSync(path string) string {
return fmt.Sprintf("https://%s%s", URLSync, path)
}

View file

@ -1,4 +1,4 @@
package htmlpage package notification
import ( import (
"context" "context"
@ -27,7 +27,7 @@ type Notification struct {
} }
// Clear all notifications for a given user with the given path // Clear all notifications for a given user with the given path
func clearNotificationsOauth(ctx context.Context, user *models.User) { func ClearOauth(ctx context.Context, user *models.User) {
setter := models.NotificationSetter{ setter := models.NotificationSetter{
ResolvedAt: omitnull.From(time.Now()), ResolvedAt: omitnull.From(time.Now()),
} }
@ -43,7 +43,7 @@ func clearNotificationsOauth(ctx context.Context, user *models.User) {
//).UpdateAll() //).UpdateAll()
} }
func notifyOauthInvalid(ctx context.Context, user *models.User) { func NotifyOauthInvalid(ctx context.Context, user *models.User) {
msg := "Oauth token invalidated" msg := "Oauth token invalidated"
notificationSetter := models.NotificationSetter{ notificationSetter := models.NotificationSetter{
Created: omit.From(time.Now()), Created: omit.From(time.Now()),
@ -63,7 +63,7 @@ func notifyOauthInvalid(ctx context.Context, user *models.User) {
} }
} }
func notificationsForUser(ctx context.Context, u *models.User) ([]Notification, error) { func ForUser(ctx context.Context, u *models.User) ([]Notification, error) {
results := make([]Notification, 0) results := make([]Notification, 0)
notifications, err := u.UserNotifications( notifications, err := u.UserNotifications(
models.SelectWhere.Notifications.ResolvedAt.IsNull(), models.SelectWhere.Notifications.ResolvedAt.IsNull(),

View file

@ -8,11 +8,11 @@ import (
"log" "log"
"os" "os"
"github.com/Gleipnir-Technology/nidus-sync/config"
"github.com/Gleipnir-Technology/nidus-sync/db" "github.com/Gleipnir-Technology/nidus-sync/db"
"github.com/Gleipnir-Technology/nidus-sync/db/models" "github.com/Gleipnir-Technology/nidus-sync/db/models"
"github.com/Gleipnir-Technology/nidus-sync/label-studio" "github.com/Gleipnir-Technology/nidus-sync/label-studio"
"github.com/Gleipnir-Technology/nidus-sync/minio" "github.com/Gleipnir-Technology/nidus-sync/minio"
"github.com/Gleipnir-Technology/nidus-sync/userfile"
"github.com/google/uuid" "github.com/google/uuid"
) )
@ -130,7 +130,7 @@ func processLabelTask(ctx context.Context, minioClient *minio.Client, minioBucke
func createTask(client *labelstudio.Client, project *labelstudio.Project, minioClient *minio.Client, bucket string, customer string, note *models.NoteAudio) error { func createTask(client *labelstudio.Client, project *labelstudio.Project, minioClient *minio.Client, bucket string, customer string, note *models.NoteAudio) error {
audioRef := fmt.Sprintf("s3://%s/%s-normalized.m4a", bucket, note.UUID) audioRef := fmt.Sprintf("s3://%s/%s-normalized.m4a", bucket, note.UUID)
audioFile := fmt.Sprintf("%s/%s-normalized.m4a", userfile.UserFilesDirectory, note.UUID) audioFile := fmt.Sprintf("%s/%s-normalized.m4a", config.UserFilesDirectory, note.UUID)
uploadPath := fmt.Sprintf("%s-normalized.m4a", note.UUID) uploadPath := fmt.Sprintf("%s-normalized.m4a", note.UUID)
if !minioClient.ObjectExists(bucket, uploadPath) { if !minioClient.ObjectExists(bucket, uploadPath) {

View file

@ -6,22 +6,21 @@ import (
"log" "log"
"os" "os"
"github.com/Gleipnir-Technology/nidus-sync/config"
"github.com/google/uuid" "github.com/google/uuid"
) )
var UserFilesDirectory string
func AudioFileContentPathRaw(audioUUID string) string { func AudioFileContentPathRaw(audioUUID string) string {
return fmt.Sprintf("%s/%s.m4a", UserFilesDirectory, audioUUID) return fmt.Sprintf("%s/%s.m4a", config.UserFilesDirectory, audioUUID)
} }
func AudioFileContentPathMp3(audioUUID string) string { func AudioFileContentPathMp3(audioUUID string) string {
return fmt.Sprintf("%s/%s.mp3", UserFilesDirectory, audioUUID) return fmt.Sprintf("%s/%s.mp3", config.UserFilesDirectory, audioUUID)
} }
func AudioFileContentPathNormalized(audioUUID string) string { func AudioFileContentPathNormalized(audioUUID string) string {
return fmt.Sprintf("%s/%s-normalized.m4a", UserFilesDirectory, audioUUID) return fmt.Sprintf("%s/%s-normalized.m4a", config.UserFilesDirectory, audioUUID)
} }
func AudioFileContentPathOgg(audioUUID string) string { func AudioFileContentPathOgg(audioUUID string) string {
return fmt.Sprintf("%s/%s.ogg", UserFilesDirectory, audioUUID) return fmt.Sprintf("%s/%s.ogg", config.UserFilesDirectory, audioUUID)
} }
func AudioFileContentWrite(audioUUID uuid.UUID, body io.Reader) error { func AudioFileContentWrite(audioUUID uuid.UUID, body io.Reader) error {
// Create file in configured directory // Create file in configured directory
@ -42,7 +41,7 @@ func AudioFileContentWrite(audioUUID uuid.UUID, body io.Reader) error {
return nil return nil
} }
func ImageFileContentPathRaw(uid string) string { func ImageFileContentPathRaw(uid string) string {
return fmt.Sprintf("%s/%s.raw", UserFilesDirectory, uid) return fmt.Sprintf("%s/%s.raw", config.UserFilesDirectory, uid)
} }
func ImageFileContentWrite(uid uuid.UUID, body io.Reader) error { func ImageFileContentWrite(uid uuid.UUID, body io.Reader) error {
filepath := ImageFileContentPathRaw(uid.String()) filepath := ImageFileContentPathRaw(uid.String())