The goal of this rework is to make it so I can pass around platform.User instead of a pair of models.Organization and models.User. This is useful for reason I kind of forget now, but it started with working on notifications and ballooned massively from there into refactoring a number of things that were bugging me. This also includes a tiny amount of work on server-side events (SSE). * background stuff lives inside the platform now, which I need for having it push updates through SSE * userfile now lives in the platform, under file, so other platform functions can safely use it * oauth is broken into pieces and inside platform because other stuff was calling it already, but badly. * notifications go into the platform as well
165 lines
5.2 KiB
Go
165 lines
5.2 KiB
Go
package background
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
|
|
"github.com/Gleipnir-Technology/bob"
|
|
"github.com/Gleipnir-Technology/bob/dialect/psql"
|
|
"github.com/Gleipnir-Technology/bob/dialect/psql/dialect"
|
|
"github.com/Gleipnir-Technology/bob/dialect/psql/dm"
|
|
"github.com/Gleipnir-Technology/bob/dialect/psql/im"
|
|
"github.com/Gleipnir-Technology/nidus-sync/db"
|
|
"github.com/Gleipnir-Technology/nidus-sync/db/enums"
|
|
"github.com/Gleipnir-Technology/nidus-sync/db/models"
|
|
"github.com/Gleipnir-Technology/nidus-sync/h3utils"
|
|
"github.com/rs/zerolog/log"
|
|
"github.com/uber/h3-go/v4"
|
|
)
|
|
|
|
func updateSummaryTables(ctx context.Context, org *models.Organization) {
|
|
updateSummaryMosquitoSource(ctx, org)
|
|
updateSummaryServiceRequest(ctx, org)
|
|
updateSummaryTrap(ctx, org)
|
|
}
|
|
|
|
func aggregateAtResolution(ctx context.Context, resolution int, org_id int32, type_ enums.H3aggregationtype, cells []h3.Cell) error {
|
|
var err error
|
|
log.Info().Int("resolution", resolution).Str("type", string(type_)).Msg("Working summary layer")
|
|
cellToCount := make(map[h3.Cell]int, 0)
|
|
for _, cell := range cells {
|
|
scaled, err := cell.Parent(resolution)
|
|
if err != nil {
|
|
log.Error().Err(err).Int("resolution", resolution).Msg("Failed to get cell's parent at resolution")
|
|
continue
|
|
}
|
|
cellToCount[scaled] = cellToCount[scaled] + 1
|
|
}
|
|
|
|
_, err = models.H3Aggregations.Delete(
|
|
dm.Where(
|
|
psql.And(
|
|
models.H3Aggregations.Columns.OrganizationID.EQ(psql.Arg(org_id)),
|
|
models.H3Aggregations.Columns.Resolution.EQ(psql.Arg(resolution)),
|
|
models.H3Aggregations.Columns.Type.EQ(psql.Arg(type_)),
|
|
),
|
|
),
|
|
).Exec(ctx, db.PGInstance.BobDB)
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to clear previous aggregation: %w", err)
|
|
}
|
|
var to_insert []bob.Mod[*dialect.InsertQuery] = make([]bob.Mod[*dialect.InsertQuery], 0)
|
|
to_insert = append(to_insert, im.Into("h3_aggregation", "cell", "resolution", "count_", "type_", "organization_id", "geometry"))
|
|
for cell, count := range cellToCount {
|
|
polygon, err := h3utils.CellToPostgisGeometry(cell)
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("Failed to get PostGIS geometry")
|
|
continue
|
|
}
|
|
// log.Info().Str("polygon", polygon).Msg("Going to insert")
|
|
to_insert = append(to_insert, im.Values(psql.Arg(cell.String(), resolution, count, type_, org_id), psql.F("st_geomfromtext", psql.S(polygon), 4326)))
|
|
}
|
|
to_insert = append(to_insert, im.OnConflict("cell, organization_id, type_").DoUpdate(
|
|
im.SetCol("count_").To(psql.Raw("EXCLUDED.count_")),
|
|
))
|
|
//log.Info().Str("sql", insertQueryToString(psql.Insert(to_insert...))).Msg("Updating...")
|
|
_, err = psql.Insert(to_insert...).Exec(ctx, db.PGInstance.BobDB)
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to add h3 aggregation: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func updateSummaryMosquitoSource(ctx context.Context, org *models.Organization) {
|
|
point_locations, err := org.Pointlocations().All(ctx, db.PGInstance.BobDB)
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("Failed to get all point locations")
|
|
return
|
|
}
|
|
if len(point_locations) == 0 {
|
|
log.Info().Int("org_id", int(org.ID)).Msg("No updates to perform")
|
|
return
|
|
}
|
|
|
|
cells := make([]h3.Cell, 0)
|
|
for _, p := range point_locations {
|
|
if p.H3cell.IsNull() {
|
|
continue
|
|
}
|
|
cell, err := h3utils.ToCell(p.H3cell.MustGet())
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("Failed to get geometry point")
|
|
continue
|
|
}
|
|
cells = append(cells, cell)
|
|
}
|
|
|
|
for i := range 16 {
|
|
err = aggregateAtResolution(ctx, i, org.ID, enums.H3aggregationtypeMosquitosource, cells)
|
|
if err != nil {
|
|
log.Error().Err(err).Int("resolution", i).Msg("Failed to aggregate mosquito source")
|
|
}
|
|
}
|
|
}
|
|
|
|
func updateSummaryServiceRequest(ctx context.Context, org *models.Organization) {
|
|
service_requests, err := org.Servicerequests().All(ctx, db.PGInstance.BobDB)
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("Failed to get all service requests")
|
|
return
|
|
}
|
|
if len(service_requests) == 0 {
|
|
log.Info().Int("org_id", int(org.ID)).Msg("No updates to perform")
|
|
return
|
|
}
|
|
|
|
cells := make([]h3.Cell, 0)
|
|
for _, p := range service_requests {
|
|
if p.H3cell.IsNull() {
|
|
continue
|
|
}
|
|
cell, err := h3utils.ToCell(p.H3cell.MustGet())
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("Failed to get geometry point")
|
|
continue
|
|
}
|
|
cells = append(cells, cell)
|
|
}
|
|
for i := range 16 {
|
|
err = aggregateAtResolution(ctx, i, org.ID, enums.H3aggregationtypeServicerequest, cells)
|
|
if err != nil {
|
|
log.Error().Err(err).Int("resolution", i).Msg("Failed to aggregate service request")
|
|
}
|
|
}
|
|
}
|
|
|
|
func updateSummaryTrap(ctx context.Context, org *models.Organization) {
|
|
traps, err := org.Traplocations().All(ctx, db.PGInstance.BobDB)
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("Failed to get all trap locations")
|
|
return
|
|
}
|
|
if len(traps) == 0 {
|
|
log.Info().Int("org_id", int(org.ID)).Msg("No updates to perform")
|
|
return
|
|
}
|
|
|
|
cells := make([]h3.Cell, 0)
|
|
for _, t := range traps {
|
|
if t.H3cell.IsNull() {
|
|
continue
|
|
}
|
|
cell, err := h3utils.ToCell(t.H3cell.MustGet())
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("Failed to get geometry point")
|
|
continue
|
|
}
|
|
cells = append(cells, cell)
|
|
}
|
|
for i := range 16 {
|
|
err = aggregateAtResolution(ctx, i, org.ID, enums.H3aggregationtypeTrap, cells)
|
|
if err != nil {
|
|
log.Error().Err(err).Int("resolution", i).Msg("Failed to aggregate trap")
|
|
}
|
|
}
|
|
}
|