Make aggregate layers clean up, add service request aggregate

This commit is contained in:
Eli Ribble 2026-01-15 04:10:54 +00:00
parent 248cffd323
commit d46d988b4d
No known key found for this signature in database
3 changed files with 157 additions and 66 deletions

View file

@ -24,24 +24,18 @@ import (
"github.com/Gleipnir-Technology/arcgis-go/fieldseeker" "github.com/Gleipnir-Technology/arcgis-go/fieldseeker"
"github.com/Gleipnir-Technology/nidus-sync/config" "github.com/Gleipnir-Technology/nidus-sync/config"
"github.com/Gleipnir-Technology/nidus-sync/db" "github.com/Gleipnir-Technology/nidus-sync/db"
"github.com/Gleipnir-Technology/nidus-sync/db/enums"
"github.com/Gleipnir-Technology/nidus-sync/db/models" "github.com/Gleipnir-Technology/nidus-sync/db/models"
"github.com/Gleipnir-Technology/nidus-sync/db/sql" "github.com/Gleipnir-Technology/nidus-sync/db/sql"
"github.com/Gleipnir-Technology/nidus-sync/debug" "github.com/Gleipnir-Technology/nidus-sync/debug"
"github.com/Gleipnir-Technology/nidus-sync/h3utils"
"github.com/Gleipnir-Technology/nidus-sync/notification" "github.com/Gleipnir-Technology/nidus-sync/notification"
"github.com/aarondl/opt/omit" "github.com/aarondl/opt/omit"
"github.com/aarondl/opt/omitnull" "github.com/aarondl/opt/omitnull"
"github.com/alitto/pond/v2" "github.com/alitto/pond/v2"
"github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
"github.com/stephenafamo/bob"
"github.com/stephenafamo/bob/dialect/psql" "github.com/stephenafamo/bob/dialect/psql"
"github.com/stephenafamo/bob/dialect/psql/dialect"
"github.com/stephenafamo/bob/dialect/psql/im"
"github.com/stephenafamo/bob/dialect/psql/sm" "github.com/stephenafamo/bob/dialect/psql/sm"
"github.com/stephenafamo/bob/dialect/psql/um" "github.com/stephenafamo/bob/dialect/psql/um"
"github.com/uber/h3-go/v4"
) )
var syncStatusByOrg map[int32]bool var syncStatusByOrg map[int32]bool
@ -468,6 +462,11 @@ func logPermissions(ctx context.Context, org *models.Organization, oauth *models
log.Error().Err(err).Msg("Failed to create fieldseeker client in log permissions") log.Error().Err(err).Msg("Failed to create fieldseeker client in log permissions")
return return
} }
_, err = fssync.AdminInfo()
if err != nil {
log.Error().Err(err).Msg("Failed to get admin info log permissions")
return
}
permissions, err := fssync.PermissionList() permissions, err := fssync.PermissionList()
if err != nil { if err != nil {
log.Error().Err(err).Msg("Failed to query permissions in log permissions") log.Error().Err(err).Msg("Failed to query permissions in log permissions")
@ -477,6 +476,7 @@ func logPermissions(ctx context.Context, org *models.Organization, oauth *models
log.Info().Str("p", p.Principal).Msg("Permission!") log.Info().Str("p", p.Principal).Msg("Permission!")
} }
} }
func maintainOAuth(ctx context.Context, oauth *models.OauthToken) error { func maintainOAuth(ctx context.Context, oauth *models.OauthToken) error {
for { for {
// Refresh from the database // Refresh from the database
@ -1042,64 +1042,6 @@ func updateRowFromFeatureFS(ctx context.Context, transaction pgx.Tx, table strin
return nil return nil
} }
func updateSummaryTables(ctx context.Context, org *models.Organization) {
/*org, err := models.FindOrganization(ctx, PGInstance.BobDB, org_id)
if err != nil {
log.Error().Err(err).Msg("Failed to get organization")
}*/
log.Info().Int("org_id", int(org.ID)).Msg("Getting point locations")
point_locations, err := org.Pointlocations().All(ctx, db.PGInstance.BobDB)
if err != nil {
log.Error().Err(err).Msg("Failed to get organization")
return
}
if len(point_locations) == 0 {
log.Info().Int("org_id", int(org.ID)).Msg("No updates to perform")
return
}
log.Info().Int("count", len(point_locations)).Msg("Summarizing point locations")
for i := range 16 {
log.Info().Int("resolution", i).Msg("Working summary layer")
cellToCount := make(map[h3.Cell]int, 0)
for _, p := range point_locations {
if p.H3cell.IsNull() {
continue
}
cell, err := h3utils.ToCell(p.H3cell.MustGet())
if err != nil {
log.Error().Err(err).Msg("Failed to get geometry point")
continue
}
scaled, err := cell.Parent(i)
if err != nil {
log.Error().Err(err).Int("resolution", i).Msg("Failed to get cell's parent at resolution")
continue
}
cellToCount[scaled] = cellToCount[scaled] + 1
}
var to_insert []bob.Mod[*dialect.InsertQuery] = make([]bob.Mod[*dialect.InsertQuery], 0)
to_insert = append(to_insert, im.Into("h3_aggregation", "cell", "resolution", "count_", "type_", "organization_id", "geometry"))
for cell, count := range cellToCount {
polygon, err := h3utils.CellToPostgisGeometry(cell)
if err != nil {
log.Error().Err(err).Msg("Failed to get PostGIS geometry")
continue
}
// log.Info().Str("polygon", polygon).Msg("Going to insert")
to_insert = append(to_insert, im.Values(psql.Arg(cell.String(), i, count, enums.H3aggregationtypeServicerequest, org.ID), psql.F("st_geomfromtext", psql.S(polygon), 4326)))
}
to_insert = append(to_insert, im.OnConflict("cell, organization_id, type_").DoUpdate(
im.SetCol("count_").To(psql.Raw("EXCLUDED.count_")),
))
//log.Info().Str("sql", insertQueryToString(psql.Insert(to_insert...))).Msg("Updating...")
_, err := psql.Insert(to_insert...).Exec(ctx, db.PGInstance.BobDB)
if err != nil {
log.Error().Err(err).Msg("Faild to add h3 aggregation")
}
}
}
func exportFieldseekerLayer(ctx context.Context, group pond.ResultTaskGroup[SyncStats], org *models.Organization, fssync *fieldseeker.FieldSeeker, layer arcgis.LayerFeature) (SyncStats, error) { func exportFieldseekerLayer(ctx context.Context, group pond.ResultTaskGroup[SyncStats], org *models.Organization, fssync *fieldseeker.FieldSeeker, layer arcgis.LayerFeature) (SyncStats, error) {
var stats SyncStats var stats SyncStats
count, err := fssync.QueryCount(layer.ID) count, err := fssync.QueryCount(layer.ID)

137
background/summary.go Normal file
View file

@ -0,0 +1,137 @@
package background
import (
"context"
"fmt"
"github.com/Gleipnir-Technology/nidus-sync/db"
"github.com/Gleipnir-Technology/nidus-sync/db/enums"
"github.com/Gleipnir-Technology/nidus-sync/db/models"
"github.com/Gleipnir-Technology/nidus-sync/h3utils"
"github.com/rs/zerolog/log"
"github.com/stephenafamo/bob"
"github.com/stephenafamo/bob/dialect/psql"
"github.com/stephenafamo/bob/dialect/psql/dialect"
"github.com/stephenafamo/bob/dialect/psql/dm"
"github.com/stephenafamo/bob/dialect/psql/im"
"github.com/uber/h3-go/v4"
)
func updateSummaryTables(ctx context.Context, org *models.Organization) {
updateSummaryMosquitoSource(ctx, org)
updateSummaryServiceRequest(ctx, org)
}
func aggregateAtResolution(ctx context.Context, resolution int, org_id int32, type_ enums.H3aggregationtype, cells []h3.Cell) error {
var err error
log.Info().Int("resolution", resolution).Str("type", string(type_)).Msg("Working summary layer")
cellToCount := make(map[h3.Cell]int, 0)
for _, cell := range cells {
scaled, err := cell.Parent(resolution)
if err != nil {
log.Error().Err(err).Int("resolution", resolution).Msg("Failed to get cell's parent at resolution")
continue
}
cellToCount[scaled] = cellToCount[scaled] + 1
}
_, err = models.H3Aggregations.Delete(
dm.Where(
psql.And(
models.H3Aggregations.Columns.OrganizationID.EQ(psql.Arg(org_id)),
models.H3Aggregations.Columns.Resolution.EQ(psql.Arg(resolution)),
models.H3Aggregations.Columns.Type.EQ(psql.Arg(type_)),
),
),
).Exec(ctx, db.PGInstance.BobDB)
if err != nil {
return fmt.Errorf("Failed to clear previous aggregation: %w", err)
}
var to_insert []bob.Mod[*dialect.InsertQuery] = make([]bob.Mod[*dialect.InsertQuery], 0)
to_insert = append(to_insert, im.Into("h3_aggregation", "cell", "resolution", "count_", "type_", "organization_id", "geometry"))
for cell, count := range cellToCount {
polygon, err := h3utils.CellToPostgisGeometry(cell)
if err != nil {
log.Error().Err(err).Msg("Failed to get PostGIS geometry")
continue
}
// log.Info().Str("polygon", polygon).Msg("Going to insert")
to_insert = append(to_insert, im.Values(psql.Arg(cell.String(), resolution, count, type_, org_id), psql.F("st_geomfromtext", psql.S(polygon), 4326)))
}
to_insert = append(to_insert, im.OnConflict("cell, organization_id, type_").DoUpdate(
im.SetCol("count_").To(psql.Raw("EXCLUDED.count_")),
))
//log.Info().Str("sql", insertQueryToString(psql.Insert(to_insert...))).Msg("Updating...")
_, err = psql.Insert(to_insert...).Exec(ctx, db.PGInstance.BobDB)
if err != nil {
return fmt.Errorf("Failed to add h3 aggregation: %w", err)
}
return nil
}
func updateSummaryMosquitoSource(ctx context.Context, org *models.Organization) {
log.Info().Int("org_id", int(org.ID)).Msg("Getting point locations")
point_locations, err := org.Pointlocations().All(ctx, db.PGInstance.BobDB)
if err != nil {
log.Error().Err(err).Msg("Failed to get all point locations")
return
}
if len(point_locations) == 0 {
log.Info().Int("org_id", int(org.ID)).Msg("No updates to perform")
return
}
log.Info().Int("count", len(point_locations)).Msg("Summarizing point locations")
cells := make([]h3.Cell, 0)
for _, p := range point_locations {
if p.H3cell.IsNull() {
continue
}
cell, err := h3utils.ToCell(p.H3cell.MustGet())
if err != nil {
log.Error().Err(err).Msg("Failed to get geometry point")
continue
}
cells = append(cells, cell)
}
for i := range 16 {
err = aggregateAtResolution(ctx, i, org.ID, enums.H3aggregationtypeMosquitosource, cells)
if err != nil {
log.Error().Err(err).Int("resolution", i).Msg("Failed to aggregate mosquito source")
}
}
}
func updateSummaryServiceRequest(ctx context.Context, org *models.Organization) {
log.Info().Int("org_id", int(org.ID)).Msg("Getting service requests")
service_requests, err := org.Servicerequests().All(ctx, db.PGInstance.BobDB)
if err != nil {
log.Error().Err(err).Msg("Failed to get all service requests")
return
}
if len(service_requests) == 0 {
log.Info().Int("org_id", int(org.ID)).Msg("No updates to perform")
return
}
log.Info().Int("count", len(service_requests)).Msg("Summarizing point locations")
cells := make([]h3.Cell, 0)
for _, p := range service_requests {
if p.H3cell.IsNull() {
continue
}
cell, err := h3utils.ToCell(p.H3cell.MustGet())
if err != nil {
log.Error().Err(err).Msg("Failed to get geometry point")
continue
}
cells = append(cells, cell)
}
for i := range 16 {
err = aggregateAtResolution(ctx, i, org.ID, enums.H3aggregationtypeServicerequest, cells)
if err != nil {
log.Error().Err(err).Int("resolution", i).Msg("Failed to aggregate service request")
}
}
}

View file

@ -26,17 +26,29 @@ function onLoad() {
//'maxzoom': 14 //'maxzoom': 14
}); });
map.addLayer({ map.addLayer({
'id': 'nidus', // Layer ID 'id': 'mosquito_source', // Layer ID
'type': 'fill', 'type': 'fill',
'filter': ['==', ['zoom'], ['+', 2, ['to-number', ['get', 'resolution']]]], 'filter': ['==', ['zoom'], ['+', 2, ['to-number', ['get', 'resolution']]]],
'source': 'tegola-nidus', // ID of the tile source created above 'source': 'tegola-nidus', // ID of the tile source created above
'source-layer': 'h3_aggregation', 'source-layer': 'mosquito_source',
'paint': { 'paint': {
'fill-opacity': 0.3, 'fill-opacity': 0.3,
'fill-color': 'rgb(250, 100, 100)' 'fill-color': 'rgb(250, 100, 100)'
} }
//slot: 'middle' // middle slot in Mapbox Standard style //slot: 'middle' // middle slot in Mapbox Standard style
}); });
map.addLayer({
'id': 'service_request', // Layer ID
'type': 'fill',
'filter': ['==', ['zoom'], ['+', 2, ['to-number', ['get', 'resolution']]]],
'source': 'tegola-nidus', // ID of the tile source created above
'source-layer': 'service_request',
'paint': {
'fill-opacity': 0.3,
'fill-color': 'rgb(100, 100, 250)'
}
//slot: 'middle' // middle slot in Mapbox Standard style
});
map.addInteraction("nidus-click-interaction", { map.addInteraction("nidus-click-interaction", {
type: 'click', type: 'click',
target: { layerId: 'nidus' }, target: { layerId: 'nidus' },