diff --git a/arcgis.go b/arcgis.go index 0c2d0d4a..f77d1c1a 100644 --- a/arcgis.go +++ b/arcgis.go @@ -26,6 +26,7 @@ import ( "github.com/Gleipnir-Technology/nidus-sync/sql" "github.com/aarondl/opt/omit" "github.com/aarondl/opt/omitnull" + "github.com/alitto/pond/v2" "github.com/jackc/pgx/v5" ) @@ -266,45 +267,63 @@ func refreshFieldseekerData(ctx context.Context, newOauthCh <-chan struct{}) { } } -func downloadAllRecords(ctx context.Context, fssync *fieldseeker.FieldSeeker, layer arcgis.LayerFeature, org_id int32) (int, int, int, error) { - inserts := 0 - updates := 0 - offset := 0 +type SyncStats struct { + Inserts int + Updates int + Unchanged int +} + +func downloadAllRecords(ctx context.Context, fssync *fieldseeker.FieldSeeker, layer arcgis.LayerFeature, org_id int32) (SyncStats, error) { + var stats SyncStats count, err := fssync.QueryCount(layer.ID) if err != nil { - return inserts, updates, 0, fmt.Errorf("Failed to get counts for layer %s (%d): %v", layer.Name, layer.ID, err) + return stats, fmt.Errorf("Failed to get counts for layer %s (%d): %v", layer.Name, layer.ID, err) } slog.Info("Starting on layer", slog.String("name", layer.Name), slog.Int("id", layer.ID)) if count.Count == 0 { - return inserts, updates, 0, nil + return stats, nil } - for { - if offset >= count.Count { - break - } - query := arcgis.NewQuery() - query.ResultRecordCount = fssync.MaxRecordCount() - query.ResultOffset = offset - query.SpatialReference = "4326" - query.OutFields = "*" - query.Where = "1=1" - qr, err := fssync.DoQuery( - layer.ID, - query) - if err != nil { - return inserts, updates, count.Count - inserts - updates, fmt.Errorf("Failed to get layer %s (%d) at offset %d: %v", layer.Name, layer.ID, offset, err) - } - i, u, err := saveOrUpdateDBRecords(ctx, "FS_"+layer.Name, qr, org_id) - if err != nil { - saveRawQuery(fssync, layer, query, "failure.json") - return inserts, updates, count.Count - inserts - updates, fmt.Errorf("Failed to save records: %v", err) - } - inserts += i - updates += u - offset += len(qr.Features) + pool := pond.NewResultPool[SyncStats](20) + group := pool.NewGroup() + maxRecords := fssync.MaxRecordCount() + for offset := 0; offset < count.Count; offset += maxRecords { + group.SubmitErr(func() (SyncStats, error) { + query := arcgis.NewQuery() + query.ResultRecordCount = maxRecords + query.ResultOffset = offset + query.SpatialReference = "4326" + query.OutFields = "*" + query.Where = "1=1" + qr, err := fssync.DoQuery( + layer.ID, + query) + if err != nil { + return SyncStats{}, fmt.Errorf("Failed to get layer %s (%d) at offset %d: %v", layer.Name, layer.ID, offset, err) + } + i, u, err := saveOrUpdateDBRecords(ctx, "FS_"+layer.Name, qr, org_id) + if err != nil { + filename := fmt.Sprintf("failure-%s-%d.json", layer.Name, layer.ID) + saveRawQuery(fssync, layer, query, filename) + return SyncStats{}, fmt.Errorf("Failed to save records: %v", err) + } + return SyncStats{ + Inserts: i, + Updates: u, + Unchanged: len(qr.Features) - u - i, + }, nil + }) } - slog.Info("Finished layer", slog.Int("inserts", inserts), slog.Int("updates", updates), slog.Int("no change", count.Count-inserts-updates)) - return inserts, updates, count.Count - inserts - updates, nil + results, err := group.Wait() + if err != nil { + return stats, fmt.Errorf("one or more tasks in the work pool failed: %v", err) + } + for _, r := range results { + stats.Inserts += r.Inserts + stats.Updates += r.Updates + stats.Unchanged += r.Unchanged + } + slog.Info("Finished layer", slog.Int("inserts", stats.Inserts), slog.Int("updates", stats.Updates), slog.Int("no change", stats.Unchanged)) + return stats, nil } func getOAuthForOrg(ctx context.Context, org *models.Organization) (*models.OauthToken, error) { @@ -364,23 +383,21 @@ func exportFieldseekerData(ctx context.Context, org *models.Organization, oauth if err != nil { return fmt.Errorf("Failed to create fssync: %v", err) } - inserts := 0 - updates := 0 - unchanged := 0 + var stats SyncStats for _, layer := range fssync.FeatureServerLayers() { - i, u, un, err := downloadAllRecords(ctx, fssync, layer, row.OrganizationID) + ss, err := downloadAllRecords(ctx, fssync, layer, row.OrganizationID) if err != nil { return fmt.Errorf("Failed to get layer %s: %v", layer, err) } - inserts += i - updates += u - unchanged += un + stats.Inserts += ss.Inserts + stats.Updates += ss.Updates + stats.Unchanged += ss.Unchanged } setter := models.FieldseekerSyncSetter{ - RecordsCreated: omit.From(int32(inserts)), - RecordsUpdated: omit.From(int32(updates)), - RecordsUnchanged: omit.From(int32(unchanged)), + RecordsCreated: omit.From(int32(stats.Inserts)), + RecordsUpdated: omit.From(int32(stats.Updates)), + RecordsUnchanged: omit.From(int32(stats.Unchanged)), } err = org.InsertFieldseekerSyncs(ctx, PGInstance.BobDB, &setter) //err = user.InsertUserOauthTokens(ctx, PGInstance.BobDB, &setter) diff --git a/endpoint.go b/endpoint.go index 012da7ae..263bedd9 100644 --- a/endpoint.go +++ b/endpoint.go @@ -173,7 +173,7 @@ func getRoot(w http.ResponseWriter, r *http.Request) { return } if has { - err = htmlDashboard(w, user) + err = htmlDashboard(r.Context(), w, user) } else { err = htmlOauthPrompt(w, user) } diff --git a/go.mod b/go.mod index 0a0bb3d9..6589ee29 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/aarondl/opt v0.0.0-20250607033636-982744e1bd65 github.com/alexedwards/scs/pgxstore v0.0.0-20251002162104-209de6e426de github.com/alexedwards/scs/v2 v2.9.0 + github.com/alitto/pond/v2 v2.5.0 github.com/go-chi/chi/v5 v5.2.3 github.com/go-webauthn/webauthn v0.14.0 github.com/golang-jwt/jwt/v5 v5.3.0 @@ -45,4 +46,5 @@ require ( golang.org/x/text v0.29.0 // indirect google.golang.org/protobuf v1.36.5 // indirect ) + replace github.com/Gleipnir-Technology/arcgis-go => ./arcgis-go diff --git a/go.sum b/go.sum index bc620258..d2f58746 100644 --- a/go.sum +++ b/go.sum @@ -2,8 +2,6 @@ dario.cat/mergo v1.0.1 h1:Ra4+bf83h2ztPIQYNP99R6m+Y7KfnARDfID+a+vLl4s= dario.cat/mergo v1.0.1/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk= github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOElx5B5HZ4hJQsoJ/PvUvKRhJHDQXO8P8= github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E= -github.com/Gleipnir-Technology/arcgis-go v0.0.2 h1:mmIh8M22rYUTHttX0JQ29CON99izzT43RrQbf/NX4w0= -github.com/Gleipnir-Technology/arcgis-go v0.0.2/go.mod h1:INOPGfEriR2hcwD4lXIuQOuYigx3OuAnK5PmbKlj9bs= github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY= github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= github.com/aarondl/opt v0.0.0-20250607033636-982744e1bd65 h1:lbdPe4LBNmNDzeQFwNhEc88w90841qv737MI4+aXSYU= @@ -12,6 +10,8 @@ github.com/alexedwards/scs/pgxstore v0.0.0-20251002162104-209de6e426de h1:wNJVpr github.com/alexedwards/scs/pgxstore v0.0.0-20251002162104-209de6e426de/go.mod h1:hwveArYcjyOK66EViVgVU5Iqj7zyEsWjKXMQhDJrTLI= github.com/alexedwards/scs/v2 v2.9.0 h1:xa05mVpwTBm1iLeTMNFfAWpKUm4fXAW7CeAViqBVS90= github.com/alexedwards/scs/v2 v2.9.0/go.mod h1:ToaROZxyKukJKT/xLcVQAChi5k6+Pn1Gvmdl7h3RRj8= +github.com/alitto/pond/v2 v2.5.0 h1:vPzS5GnvSDRhWQidmj2djHllOmjFExVFbDGCw1jdqDw= +github.com/alitto/pond/v2 v2.5.0/go.mod h1:xkjYEgQ05RSpWdfSd1nM3OVv7TBhLdy7rMp3+2Nq+yE= github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/containerd/errdefs v1.0.0 h1:tg5yIfIlQIrxYtu9ajqY42W3lpS19XqdxRQeEwYG8PI= diff --git a/html.go b/html.go index 6efb9d3a..3fb33787 100644 --- a/html.go +++ b/html.go @@ -1,6 +1,7 @@ package main import ( + "context" "errors" "fmt" "html/template" @@ -8,9 +9,12 @@ import ( "log/slog" "os" "strings" + "time" "github.com/Gleipnir-Technology/nidus-sync/models" + "github.com/aarondl/opt/null" "github.com/riverqueue/river/rivershared/util/slogutil" + "github.com/stephenafamo/bob/dialect/psql/sm" ) var ( @@ -45,7 +49,12 @@ type ContentReportDiagnostic struct { URL string } type ContentDashboard struct { - User User + CountInspections int + CountMosquitoSources int + CountServiceRequests int + LastSync string + Org string + User User } type ContentPlaceholder struct { } @@ -89,8 +98,37 @@ func extractInitials(name string) string { return initials.String() } -func htmlDashboard(w io.Writer, user *models.User) error { +func htmlDashboard(ctx context.Context, w io.Writer, user *models.User) error { + org, err := user.Organization().One(ctx, PGInstance.BobDB) + if err != nil { + return fmt.Errorf("Failed to get org: %v", err) + } + var syncString string + sync, err := org.FieldseekerSyncs(sm.OrderBy("created")).One(ctx, PGInstance.BobDB) + if err != nil { + //return fmt.Errorf("Failed to get sync: %v", err) + syncString = "never" + } else { + syncString = sync.Created.String() + } + inspectionCount, err := org.FSMosquitoinspections().Count(ctx, PGInstance.BobDB) + if err != nil { + return fmt.Errorf("Failed to get inspection count: %v", err) + } + sourceCount, err := org.FSPointlocations().Count(ctx, PGInstance.BobDB) + if err != nil { + return fmt.Errorf("Failed to get inspection count: %v", err) + } + serviceCount, err := org.FSServicerequests().Count(ctx, PGInstance.BobDB) + if err != nil { + return fmt.Errorf("Failed to get service count: %v", err) + } data := ContentDashboard{ + CountInspections: int(inspectionCount), + CountMosquitoSources: int(sourceCount), + CountServiceRequests: int(serviceCount), + LastSync: syncString, + Org: org.Name.MustGet(), User: User{ DisplayName: user.DisplayName, Initials: extractInitials(user.DisplayName), @@ -181,7 +219,10 @@ func htmlSignup(w io.Writer, path string) error { } func makeFuncMap() template.FuncMap { - funcMap := template.FuncMap{} + funcMap := template.FuncMap{ + "timeElapsed": timeElapsed, + "timeSince": timeSince, + } return funcMap } func newBuiltTemplate(files ...string) BuiltTemplate { @@ -227,3 +268,36 @@ func parseFromDisk(files []string) (*template.Template, error) { } return templ, nil } +func timeElapsed(seconds null.Val[float32]) string { + if !seconds.IsValue() { + return "none" + } + s := int(seconds.MustGet()) + hours := s / 3600 + remainder := s - (hours * 3600) + minutes := remainder / 60 + remainder = remainder - (minutes * 60) + if hours > 0 { + return fmt.Sprintf("%02d:%02d:%02d", hours, minutes, remainder) + } else if minutes > 0 { + return fmt.Sprintf("%02d:%02d", minutes, remainder) + } else { + return fmt.Sprintf("%d seconds", remainder) + } +} + +func timeSince(t time.Time) string { + now := time.Now() + diff := now.Sub(t) + + hours := diff.Hours() + if hours < 1 { + minutes := diff.Minutes() + return fmt.Sprintf("%d minutes ago", int(minutes)) + } else if hours < 24 { + return fmt.Sprintf("%d hours ago", int(hours)) + } else { + days := hours / 24 + return fmt.Sprintf("%d days ago", int(days)) + } +} diff --git a/templates/dashboard.html b/templates/dashboard.html index b43285f6..b7250469 100644 --- a/templates/dashboard.html +++ b/templates/dashboard.html @@ -64,12 +64,12 @@ body {
-

Mosquito District Dashboard

+

{{ .Org }} Dashboard

Overview of mosquito control activities in your district

- Last updated: 3 hours ago + Last updated: {{ .LastSync }}

@@ -85,7 +85,7 @@ body {
Last Data Refresh
-

3h

+

{{ .LastSync }}

Last sync: 12:45 PM

@@ -99,7 +99,7 @@ body {
Service Requests
-

48

+

{{ .CountServiceRequests }}

12% @@ -117,7 +117,7 @@ body {

Mosquito Sources
-

124

+

{{ .CountMosquitoSources }}

8% @@ -135,7 +135,7 @@ body {

Inspections
-

76

+

{{ .CountInspections }}

15%