Get back to compiling, but using new jet for publicreport

This was an epically long change, and a terrible idea, but it compiles.
This was essentially a cascade that came about because I can't blend jet
and bob in the same transaction. In for a penny, I guess...
This commit is contained in:
Eli Ribble 2026-05-07 10:39:17 +00:00
parent a95e44cf42
commit fcd95f1a25
No known key found for this signature in database
65 changed files with 3129 additions and 3457 deletions

View file

@ -7,7 +7,6 @@ import (
"errors"
"fmt"
"io/fs"
"sync"
//"github.com/georgysavva/scany/v2/pgxscan"
//"github.com/jackc/pgx/v5"
@ -19,8 +18,11 @@ import (
"github.com/pressly/goose/v3"
"github.com/rs/zerolog/log"
"github.com/stephenafamo/scan"
pgxgeom "github.com/twpayne/pgx-geom"
)
var ErrNoRows = pgx.ErrNoRows
//go:embed migrations/*.sql
var embedMigrations embed.FS
@ -31,21 +33,15 @@ type pginstance struct {
var (
PGInstance *pginstance
pgOnce sync.Once
)
type Tx = pgx.Tx
func BeginTxn(ctx context.Context) (pgx.Tx, error) {
return PGInstance.PGXPool.BeginTx(ctx, pgx.TxOptions{})
}
func ExecuteNone(ctx context.Context, stmt postgres.Statement) error {
query, args := stmt.Sql()
_, err := PGInstance.PGXPool.Query(ctx, query, args...)
return err
}
func ExecuteNoneTx(ctx context.Context, txn Tx, stmt postgres.Statement) error {
func ExecuteNoneTx(ctx context.Context, txn Ex, stmt postgres.Statement) error {
query, args := stmt.Sql()
_, err := txn.Query(ctx, query, args...)
@ -57,46 +53,81 @@ func ExecuteNoneTxBob(ctx context.Context, txn bob.Tx, stmt postgres.Statement)
_, err := txn.QueryContext(ctx, query, args...)
return err
}
func ExecuteOne[T any](ctx context.Context, stmt postgres.Statement) (*T, error) {
func ExecuteOne[T any](ctx context.Context, stmt postgres.Statement) (T, error) {
query, args := stmt.Sql()
var result T
row, err := PGInstance.PGXPool.Query(ctx, query, args...)
if err != nil {
return nil, fmt.Errorf("execute query: %w", err)
return result, fmt.Errorf("execute query: %w", err)
}
return pgx.CollectOneRow(row, pgx.RowToAddrOfStructByPos[T])
var collected *T
collected, err = pgx.CollectOneRow(row, pgx.RowToAddrOfStructByPos[T])
if err != nil || collected == nil {
return result, fmt.Errorf("collect row: %w", err)
}
return *collected, nil
}
func ExecuteOneTx[T any](ctx context.Context, txn Tx, stmt postgres.Statement) (*T, error) {
func ExecuteOneTx[T any](ctx context.Context, txn Ex, stmt postgres.Statement) (T, error) {
query, args := stmt.Sql()
//result, err := scan.One(ctx, txn, scan.StructMapper[T](), query, args...)
rows, err := txn.Query(ctx, query, args...)
row, err := txn.Query(ctx, query, args...)
var result T
if err != nil {
return nil, fmt.Errorf("txn query: %w", err)
return result, fmt.Errorf("txn query: %w", err)
}
results, err := pgx.CollectRows(rows, pgx.RowToStructByName[T])
if err != nil {
return nil, fmt.Errorf("collect rows: %w", err)
var collected *T
collected, err = pgx.CollectOneRow(row, pgx.RowToAddrOfStructByPos[T])
if err != nil || collected == nil {
return result, fmt.Errorf("collect row: %w", err)
}
if len(results) < 1 {
return nil, fmt.Errorf("no results")
}
return &results[0], err
return *collected, nil
}
func ExecuteOneTxBob[T any](ctx context.Context, txn bob.Tx, stmt postgres.Statement) (*T, error) {
func ExecuteOneTxBob[T any](ctx context.Context, txn bob.Tx, stmt postgres.Statement) (T, error) {
query, args := stmt.Sql()
result, err := scan.One(ctx, txn, scan.StructMapper[T](), query, args...)
return &result, err
return scan.One(ctx, txn, scan.StructMapper[T](), query, args...)
}
func ExecuteMany[T any](ctx context.Context, stmt postgres.Statement) ([]*T, error) {
func ExecuteMany[T any](ctx context.Context, stmt postgres.Statement) ([]T, error) {
query, args := stmt.Sql()
rows, err := PGInstance.PGXPool.Query(ctx, query, args...)
if err != nil {
return nil, fmt.Errorf("execute query: %w", err)
}
return pgx.CollectRows(rows, pgx.RowToAddrOfStructByPos[T])
collected, err := pgx.CollectRows(rows, pgx.RowToAddrOfStructByPos[T])
if err != nil {
return []T{}, fmt.Errorf("collect rows: %w", err)
}
results := make([]T, len(collected))
for i, c := range collected {
if c == nil {
return results, fmt.Errorf("null collected")
}
results[i] = *c
}
return results, nil
}
func ExecuteManyTx[T any](ctx context.Context, txn Ex, stmt postgres.Statement) ([]T, error) {
query, args := stmt.Sql()
rows, err := txn.Query(ctx, query, args...)
if err != nil {
return nil, fmt.Errorf("execute query: %w", err)
}
collected, err := pgx.CollectRows(rows, pgx.RowToAddrOfStructByPos[T])
if err != nil {
return []T{}, fmt.Errorf("collect rows: %w", err)
}
results := make([]T, len(collected))
for i, c := range collected {
if c == nil {
return results, fmt.Errorf("null collected")
}
results[i] = *c
}
return results, nil
}
func doMigrations(connection_string string) error {
log.Debug().Str("dsn", connection_string).Msg("Connecting to database")
@ -166,15 +197,23 @@ func InitializeDatabase(ctx context.Context, uri string) error {
log.Debug().Msg("No database migrations necessary")
}
pgOnce.Do(func() {
db, e := pgxpool.New(ctx, uri)
bobDB := bob.NewDB(stdlib.OpenDBFromPool(db))
PGInstance = &pginstance{bobDB, db}
err = e
})
config, err := pgxpool.ParseConfig(uri)
if err != nil {
return fmt.Errorf("unable to create connection pool: %w", err)
return fmt.Errorf("parse config: %w", err)
}
config.AfterConnect = func(ctx2 context.Context, conn *pgx.Conn) error {
err2 := pgxgeom.Register(ctx, conn)
if err2 != nil {
return fmt.Errorf("pgxgeom register: %w", err2)
}
return nil
}
db, err := pgxpool.NewWithConfig(ctx, config)
if err != nil {
return fmt.Errorf("new pool: %w", err)
}
bobDB := bob.NewDB(stdlib.OpenDBFromPool(db))
PGInstance = &pginstance{bobDB, db}
var current string
query := `SELECT current_database()`