Do update work in a pool for speed, and show some actual data.

This commit is contained in:
Eli Ribble 2025-11-07 10:45:59 +00:00
parent b0432f3243
commit ed1b878b8d
No known key found for this signature in database
6 changed files with 147 additions and 54 deletions

101
arcgis.go
View file

@ -26,6 +26,7 @@ import (
"github.com/Gleipnir-Technology/nidus-sync/sql"
"github.com/aarondl/opt/omit"
"github.com/aarondl/opt/omitnull"
"github.com/alitto/pond/v2"
"github.com/jackc/pgx/v5"
)
@ -266,45 +267,63 @@ func refreshFieldseekerData(ctx context.Context, newOauthCh <-chan struct{}) {
}
}
func downloadAllRecords(ctx context.Context, fssync *fieldseeker.FieldSeeker, layer arcgis.LayerFeature, org_id int32) (int, int, int, error) {
inserts := 0
updates := 0
offset := 0
type SyncStats struct {
Inserts int
Updates int
Unchanged int
}
func downloadAllRecords(ctx context.Context, fssync *fieldseeker.FieldSeeker, layer arcgis.LayerFeature, org_id int32) (SyncStats, error) {
var stats SyncStats
count, err := fssync.QueryCount(layer.ID)
if err != nil {
return inserts, updates, 0, fmt.Errorf("Failed to get counts for layer %s (%d): %v", layer.Name, layer.ID, err)
return stats, fmt.Errorf("Failed to get counts for layer %s (%d): %v", layer.Name, layer.ID, err)
}
slog.Info("Starting on layer", slog.String("name", layer.Name), slog.Int("id", layer.ID))
if count.Count == 0 {
return inserts, updates, 0, nil
return stats, nil
}
for {
if offset >= count.Count {
break
}
query := arcgis.NewQuery()
query.ResultRecordCount = fssync.MaxRecordCount()
query.ResultOffset = offset
query.SpatialReference = "4326"
query.OutFields = "*"
query.Where = "1=1"
qr, err := fssync.DoQuery(
layer.ID,
query)
if err != nil {
return inserts, updates, count.Count - inserts - updates, fmt.Errorf("Failed to get layer %s (%d) at offset %d: %v", layer.Name, layer.ID, offset, err)
}
i, u, err := saveOrUpdateDBRecords(ctx, "FS_"+layer.Name, qr, org_id)
if err != nil {
saveRawQuery(fssync, layer, query, "failure.json")
return inserts, updates, count.Count - inserts - updates, fmt.Errorf("Failed to save records: %v", err)
}
inserts += i
updates += u
offset += len(qr.Features)
pool := pond.NewResultPool[SyncStats](20)
group := pool.NewGroup()
maxRecords := fssync.MaxRecordCount()
for offset := 0; offset < count.Count; offset += maxRecords {
group.SubmitErr(func() (SyncStats, error) {
query := arcgis.NewQuery()
query.ResultRecordCount = maxRecords
query.ResultOffset = offset
query.SpatialReference = "4326"
query.OutFields = "*"
query.Where = "1=1"
qr, err := fssync.DoQuery(
layer.ID,
query)
if err != nil {
return SyncStats{}, fmt.Errorf("Failed to get layer %s (%d) at offset %d: %v", layer.Name, layer.ID, offset, err)
}
i, u, err := saveOrUpdateDBRecords(ctx, "FS_"+layer.Name, qr, org_id)
if err != nil {
filename := fmt.Sprintf("failure-%s-%d.json", layer.Name, layer.ID)
saveRawQuery(fssync, layer, query, filename)
return SyncStats{}, fmt.Errorf("Failed to save records: %v", err)
}
return SyncStats{
Inserts: i,
Updates: u,
Unchanged: len(qr.Features) - u - i,
}, nil
})
}
slog.Info("Finished layer", slog.Int("inserts", inserts), slog.Int("updates", updates), slog.Int("no change", count.Count-inserts-updates))
return inserts, updates, count.Count - inserts - updates, nil
results, err := group.Wait()
if err != nil {
return stats, fmt.Errorf("one or more tasks in the work pool failed: %v", err)
}
for _, r := range results {
stats.Inserts += r.Inserts
stats.Updates += r.Updates
stats.Unchanged += r.Unchanged
}
slog.Info("Finished layer", slog.Int("inserts", stats.Inserts), slog.Int("updates", stats.Updates), slog.Int("no change", stats.Unchanged))
return stats, nil
}
func getOAuthForOrg(ctx context.Context, org *models.Organization) (*models.OauthToken, error) {
@ -364,23 +383,21 @@ func exportFieldseekerData(ctx context.Context, org *models.Organization, oauth
if err != nil {
return fmt.Errorf("Failed to create fssync: %v", err)
}
inserts := 0
updates := 0
unchanged := 0
var stats SyncStats
for _, layer := range fssync.FeatureServerLayers() {
i, u, un, err := downloadAllRecords(ctx, fssync, layer, row.OrganizationID)
ss, err := downloadAllRecords(ctx, fssync, layer, row.OrganizationID)
if err != nil {
return fmt.Errorf("Failed to get layer %s: %v", layer, err)
}
inserts += i
updates += u
unchanged += un
stats.Inserts += ss.Inserts
stats.Updates += ss.Updates
stats.Unchanged += ss.Unchanged
}
setter := models.FieldseekerSyncSetter{
RecordsCreated: omit.From(int32(inserts)),
RecordsUpdated: omit.From(int32(updates)),
RecordsUnchanged: omit.From(int32(unchanged)),
RecordsCreated: omit.From(int32(stats.Inserts)),
RecordsUpdated: omit.From(int32(stats.Updates)),
RecordsUnchanged: omit.From(int32(stats.Unchanged)),
}
err = org.InsertFieldseekerSyncs(ctx, PGInstance.BobDB, &setter)
//err = user.InsertUserOauthTokens(ctx, PGInstance.BobDB, &setter)

View file

@ -173,7 +173,7 @@ func getRoot(w http.ResponseWriter, r *http.Request) {
return
}
if has {
err = htmlDashboard(w, user)
err = htmlDashboard(r.Context(), w, user)
} else {
err = htmlOauthPrompt(w, user)
}

2
go.mod
View file

@ -7,6 +7,7 @@ require (
github.com/aarondl/opt v0.0.0-20250607033636-982744e1bd65
github.com/alexedwards/scs/pgxstore v0.0.0-20251002162104-209de6e426de
github.com/alexedwards/scs/v2 v2.9.0
github.com/alitto/pond/v2 v2.5.0
github.com/go-chi/chi/v5 v5.2.3
github.com/go-webauthn/webauthn v0.14.0
github.com/golang-jwt/jwt/v5 v5.3.0
@ -45,4 +46,5 @@ require (
golang.org/x/text v0.29.0 // indirect
google.golang.org/protobuf v1.36.5 // indirect
)
replace github.com/Gleipnir-Technology/arcgis-go => ./arcgis-go

4
go.sum
View file

@ -2,8 +2,6 @@ dario.cat/mergo v1.0.1 h1:Ra4+bf83h2ztPIQYNP99R6m+Y7KfnARDfID+a+vLl4s=
dario.cat/mergo v1.0.1/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk=
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOElx5B5HZ4hJQsoJ/PvUvKRhJHDQXO8P8=
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E=
github.com/Gleipnir-Technology/arcgis-go v0.0.2 h1:mmIh8M22rYUTHttX0JQ29CON99izzT43RrQbf/NX4w0=
github.com/Gleipnir-Technology/arcgis-go v0.0.2/go.mod h1:INOPGfEriR2hcwD4lXIuQOuYigx3OuAnK5PmbKlj9bs=
github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY=
github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU=
github.com/aarondl/opt v0.0.0-20250607033636-982744e1bd65 h1:lbdPe4LBNmNDzeQFwNhEc88w90841qv737MI4+aXSYU=
@ -12,6 +10,8 @@ github.com/alexedwards/scs/pgxstore v0.0.0-20251002162104-209de6e426de h1:wNJVpr
github.com/alexedwards/scs/pgxstore v0.0.0-20251002162104-209de6e426de/go.mod h1:hwveArYcjyOK66EViVgVU5Iqj7zyEsWjKXMQhDJrTLI=
github.com/alexedwards/scs/v2 v2.9.0 h1:xa05mVpwTBm1iLeTMNFfAWpKUm4fXAW7CeAViqBVS90=
github.com/alexedwards/scs/v2 v2.9.0/go.mod h1:ToaROZxyKukJKT/xLcVQAChi5k6+Pn1Gvmdl7h3RRj8=
github.com/alitto/pond/v2 v2.5.0 h1:vPzS5GnvSDRhWQidmj2djHllOmjFExVFbDGCw1jdqDw=
github.com/alitto/pond/v2 v2.5.0/go.mod h1:xkjYEgQ05RSpWdfSd1nM3OVv7TBhLdy7rMp3+2Nq+yE=
github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/containerd/errdefs v1.0.0 h1:tg5yIfIlQIrxYtu9ajqY42W3lpS19XqdxRQeEwYG8PI=

80
html.go
View file

@ -1,6 +1,7 @@
package main
import (
"context"
"errors"
"fmt"
"html/template"
@ -8,9 +9,12 @@ import (
"log/slog"
"os"
"strings"
"time"
"github.com/Gleipnir-Technology/nidus-sync/models"
"github.com/aarondl/opt/null"
"github.com/riverqueue/river/rivershared/util/slogutil"
"github.com/stephenafamo/bob/dialect/psql/sm"
)
var (
@ -45,7 +49,12 @@ type ContentReportDiagnostic struct {
URL string
}
type ContentDashboard struct {
User User
CountInspections int
CountMosquitoSources int
CountServiceRequests int
LastSync string
Org string
User User
}
type ContentPlaceholder struct {
}
@ -89,8 +98,37 @@ func extractInitials(name string) string {
return initials.String()
}
func htmlDashboard(w io.Writer, user *models.User) error {
func htmlDashboard(ctx context.Context, w io.Writer, user *models.User) error {
org, err := user.Organization().One(ctx, PGInstance.BobDB)
if err != nil {
return fmt.Errorf("Failed to get org: %v", err)
}
var syncString string
sync, err := org.FieldseekerSyncs(sm.OrderBy("created")).One(ctx, PGInstance.BobDB)
if err != nil {
//return fmt.Errorf("Failed to get sync: %v", err)
syncString = "never"
} else {
syncString = sync.Created.String()
}
inspectionCount, err := org.FSMosquitoinspections().Count(ctx, PGInstance.BobDB)
if err != nil {
return fmt.Errorf("Failed to get inspection count: %v", err)
}
sourceCount, err := org.FSPointlocations().Count(ctx, PGInstance.BobDB)
if err != nil {
return fmt.Errorf("Failed to get inspection count: %v", err)
}
serviceCount, err := org.FSServicerequests().Count(ctx, PGInstance.BobDB)
if err != nil {
return fmt.Errorf("Failed to get service count: %v", err)
}
data := ContentDashboard{
CountInspections: int(inspectionCount),
CountMosquitoSources: int(sourceCount),
CountServiceRequests: int(serviceCount),
LastSync: syncString,
Org: org.Name.MustGet(),
User: User{
DisplayName: user.DisplayName,
Initials: extractInitials(user.DisplayName),
@ -181,7 +219,10 @@ func htmlSignup(w io.Writer, path string) error {
}
func makeFuncMap() template.FuncMap {
funcMap := template.FuncMap{}
funcMap := template.FuncMap{
"timeElapsed": timeElapsed,
"timeSince": timeSince,
}
return funcMap
}
func newBuiltTemplate(files ...string) BuiltTemplate {
@ -227,3 +268,36 @@ func parseFromDisk(files []string) (*template.Template, error) {
}
return templ, nil
}
func timeElapsed(seconds null.Val[float32]) string {
if !seconds.IsValue() {
return "none"
}
s := int(seconds.MustGet())
hours := s / 3600
remainder := s - (hours * 3600)
minutes := remainder / 60
remainder = remainder - (minutes * 60)
if hours > 0 {
return fmt.Sprintf("%02d:%02d:%02d", hours, minutes, remainder)
} else if minutes > 0 {
return fmt.Sprintf("%02d:%02d", minutes, remainder)
} else {
return fmt.Sprintf("%d seconds", remainder)
}
}
func timeSince(t time.Time) string {
now := time.Now()
diff := now.Sub(t)
hours := diff.Hours()
if hours < 1 {
minutes := diff.Minutes()
return fmt.Sprintf("%d minutes ago", int(minutes))
} else if hours < 24 {
return fmt.Sprintf("%d hours ago", int(hours))
} else {
days := hours / 24
return fmt.Sprintf("%d days ago", int(days))
}
}

View file

@ -64,12 +64,12 @@ body {
<!-- Dashboard Header -->
<div class="row mb-4">
<div class="col-md-6">
<h1>Mosquito District Dashboard</h1>
<h1>{{ .Org }} Dashboard</h1>
<p class="text-muted">Overview of mosquito control activities in your district</p>
</div>
<div class="col-md-6 text-md-end d-flex align-items-center justify-content-md-end">
<p class="last-refreshed mb-0">
<i class="fas fa-sync-alt me-2"></i>Last updated: <span id="last-refreshed-time">3 hours ago</span>
<i class="fas fa-sync-alt me-2"></i>Last updated: <span id="last-refreshed-time">{{ .LastSync }}</span>
<button class="btn btn-sm btn-outline-primary ms-3">Refresh Data</button>
</p>
</div>
@ -85,7 +85,7 @@ body {
<i class="fas fa-clock"></i>
</div>
<h5 class="card-title">Last Data Refresh</h5>
<p class="metric-value">3h</p>
<p class="metric-value">{{ .LastSync }}</p>
<p class="card-text text-muted">Last sync: 12:45 PM</p>
</div>
</div>
@ -99,7 +99,7 @@ body {
<i class="fas fa-ticket-alt"></i>
</div>
<h5 class="card-title">Service Requests</h5>
<p class="metric-value">48</p>
<p class="metric-value">{{ .CountServiceRequests }}</p>
<p class="card-text text-muted">
<span class="text-success">
<i class="fas fa-arrow-up"></i> 12%
@ -117,7 +117,7 @@ body {
<i class="fas fa-bug"></i>
</div>
<h5 class="card-title">Mosquito Sources</h5>
<p class="metric-value">124</p>
<p class="metric-value">{{ .CountMosquitoSources }}</p>
<p class="card-text text-muted">
<span class="text-danger">
<i class="fas fa-arrow-up"></i> 8%
@ -135,7 +135,7 @@ body {
<i class="fas fa-clipboard-check"></i>
</div>
<h5 class="card-title">Inspections</h5>
<p class="metric-value">76</p>
<p class="metric-value">{{ .CountInspections }}</p>
<p class="card-text text-muted">
<span class="text-success">
<i class="fas fa-arrow-up"></i> 15%