2025-11-24 18:08:24 +00:00
|
|
|
package db
|
2025-11-04 23:11:32 +00:00
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"context"
|
|
|
|
|
"database/sql"
|
|
|
|
|
"embed"
|
|
|
|
|
"errors"
|
|
|
|
|
"fmt"
|
|
|
|
|
"io/fs"
|
|
|
|
|
|
|
|
|
|
//"github.com/georgysavva/scany/v2/pgxscan"
|
|
|
|
|
//"github.com/jackc/pgx/v5"
|
2026-01-27 18:44:02 +00:00
|
|
|
"github.com/Gleipnir-Technology/bob"
|
2026-05-09 01:48:56 +00:00
|
|
|
"github.com/Gleipnir-Technology/jet/postgres"
|
2026-05-01 04:56:53 +00:00
|
|
|
"github.com/jackc/pgx/v5"
|
2025-11-04 23:11:32 +00:00
|
|
|
"github.com/jackc/pgx/v5/pgxpool"
|
|
|
|
|
"github.com/jackc/pgx/v5/stdlib"
|
|
|
|
|
"github.com/pressly/goose/v3"
|
2025-11-13 20:16:23 +00:00
|
|
|
"github.com/rs/zerolog/log"
|
2026-05-01 17:28:33 +00:00
|
|
|
"github.com/stephenafamo/scan"
|
2026-05-07 10:39:17 +00:00
|
|
|
pgxgeom "github.com/twpayne/pgx-geom"
|
2025-11-04 23:11:32 +00:00
|
|
|
)
|
|
|
|
|
|
2026-05-07 10:39:17 +00:00
|
|
|
var ErrNoRows = pgx.ErrNoRows
|
|
|
|
|
|
2025-11-04 23:11:32 +00:00
|
|
|
//go:embed migrations/*.sql
|
|
|
|
|
var embedMigrations embed.FS
|
|
|
|
|
|
2026-05-01 04:56:53 +00:00
|
|
|
type pginstance struct {
|
2025-11-06 22:31:51 +00:00
|
|
|
BobDB bob.DB
|
2025-11-05 17:15:33 +00:00
|
|
|
PGXPool *pgxpool.Pool
|
2025-11-04 23:11:32 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var (
|
2026-05-01 04:56:53 +00:00
|
|
|
PGInstance *pginstance
|
2025-11-04 23:11:32 +00:00
|
|
|
)
|
|
|
|
|
|
2026-05-01 17:28:33 +00:00
|
|
|
func ExecuteNone(ctx context.Context, stmt postgres.Statement) error {
|
2026-05-01 04:56:53 +00:00
|
|
|
query, args := stmt.Sql()
|
|
|
|
|
|
2026-05-01 17:28:33 +00:00
|
|
|
_, err := PGInstance.PGXPool.Query(ctx, query, args...)
|
|
|
|
|
return err
|
|
|
|
|
}
|
2026-05-07 10:39:17 +00:00
|
|
|
func ExecuteNoneTx(ctx context.Context, txn Ex, stmt postgres.Statement) error {
|
2026-05-04 20:57:50 +00:00
|
|
|
query, args := stmt.Sql()
|
|
|
|
|
|
2026-05-08 22:21:27 +00:00
|
|
|
r, err := txn.Query(ctx, query, args...)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("query: %w", err)
|
|
|
|
|
}
|
|
|
|
|
r.Close()
|
|
|
|
|
return nil
|
2026-05-04 20:57:50 +00:00
|
|
|
}
|
|
|
|
|
func ExecuteNoneTxBob(ctx context.Context, txn bob.Tx, stmt postgres.Statement) error {
|
2026-05-01 17:28:33 +00:00
|
|
|
query, args := stmt.Sql()
|
|
|
|
|
|
2026-05-08 22:21:27 +00:00
|
|
|
r, err := txn.QueryContext(ctx, query, args...)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("query: %w", err)
|
|
|
|
|
}
|
|
|
|
|
r.Close()
|
|
|
|
|
return nil
|
2026-05-01 17:28:33 +00:00
|
|
|
}
|
2026-05-07 10:39:17 +00:00
|
|
|
func ExecuteOne[T any](ctx context.Context, stmt postgres.Statement) (T, error) {
|
2026-05-01 17:28:33 +00:00
|
|
|
query, args := stmt.Sql()
|
2026-05-01 04:56:53 +00:00
|
|
|
|
2026-05-07 10:39:17 +00:00
|
|
|
var result T
|
2026-05-01 17:28:33 +00:00
|
|
|
row, err := PGInstance.PGXPool.Query(ctx, query, args...)
|
|
|
|
|
if err != nil {
|
2026-05-07 10:39:17 +00:00
|
|
|
return result, fmt.Errorf("execute query: %w", err)
|
2026-05-01 17:28:33 +00:00
|
|
|
}
|
2026-05-07 10:39:17 +00:00
|
|
|
var collected *T
|
|
|
|
|
collected, err = pgx.CollectOneRow(row, pgx.RowToAddrOfStructByPos[T])
|
|
|
|
|
if err != nil || collected == nil {
|
|
|
|
|
return result, fmt.Errorf("collect row: %w", err)
|
|
|
|
|
}
|
|
|
|
|
return *collected, nil
|
2026-05-01 17:28:33 +00:00
|
|
|
}
|
2026-05-07 10:39:17 +00:00
|
|
|
func ExecuteOneTx[T any](ctx context.Context, txn Ex, stmt postgres.Statement) (T, error) {
|
2026-05-04 20:57:50 +00:00
|
|
|
query, args := stmt.Sql()
|
|
|
|
|
|
|
|
|
|
//result, err := scan.One(ctx, txn, scan.StructMapper[T](), query, args...)
|
2026-05-07 10:39:17 +00:00
|
|
|
row, err := txn.Query(ctx, query, args...)
|
|
|
|
|
var result T
|
2026-05-04 20:57:50 +00:00
|
|
|
if err != nil {
|
2026-05-07 10:39:17 +00:00
|
|
|
return result, fmt.Errorf("txn query: %w", err)
|
2026-05-04 20:57:50 +00:00
|
|
|
}
|
2026-05-07 10:39:17 +00:00
|
|
|
var collected *T
|
|
|
|
|
collected, err = pgx.CollectOneRow(row, pgx.RowToAddrOfStructByPos[T])
|
|
|
|
|
if err != nil || collected == nil {
|
|
|
|
|
return result, fmt.Errorf("collect row: %w", err)
|
2026-05-04 20:57:50 +00:00
|
|
|
}
|
2026-05-07 10:39:17 +00:00
|
|
|
return *collected, nil
|
2026-05-04 20:57:50 +00:00
|
|
|
}
|
2026-05-07 10:39:17 +00:00
|
|
|
func ExecuteOneTxBob[T any](ctx context.Context, txn bob.Tx, stmt postgres.Statement) (T, error) {
|
2026-05-01 17:28:33 +00:00
|
|
|
query, args := stmt.Sql()
|
|
|
|
|
|
2026-05-07 10:39:17 +00:00
|
|
|
return scan.One(ctx, txn, scan.StructMapper[T](), query, args...)
|
2026-05-01 17:28:33 +00:00
|
|
|
}
|
2026-05-07 10:39:17 +00:00
|
|
|
func ExecuteMany[T any](ctx context.Context, stmt postgres.Statement) ([]T, error) {
|
2026-05-01 17:28:33 +00:00
|
|
|
query, args := stmt.Sql()
|
|
|
|
|
|
|
|
|
|
rows, err := PGInstance.PGXPool.Query(ctx, query, args...)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, fmt.Errorf("execute query: %w", err)
|
|
|
|
|
}
|
2026-05-07 10:39:17 +00:00
|
|
|
collected, err := pgx.CollectRows(rows, pgx.RowToAddrOfStructByPos[T])
|
|
|
|
|
if err != nil {
|
|
|
|
|
return []T{}, fmt.Errorf("collect rows: %w", err)
|
|
|
|
|
}
|
|
|
|
|
results := make([]T, len(collected))
|
|
|
|
|
for i, c := range collected {
|
|
|
|
|
if c == nil {
|
|
|
|
|
return results, fmt.Errorf("null collected")
|
|
|
|
|
}
|
|
|
|
|
results[i] = *c
|
|
|
|
|
}
|
|
|
|
|
return results, nil
|
|
|
|
|
}
|
|
|
|
|
func ExecuteManyTx[T any](ctx context.Context, txn Ex, stmt postgres.Statement) ([]T, error) {
|
|
|
|
|
query, args := stmt.Sql()
|
|
|
|
|
|
|
|
|
|
rows, err := txn.Query(ctx, query, args...)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, fmt.Errorf("execute query: %w", err)
|
|
|
|
|
}
|
|
|
|
|
collected, err := pgx.CollectRows(rows, pgx.RowToAddrOfStructByPos[T])
|
|
|
|
|
if err != nil {
|
|
|
|
|
return []T{}, fmt.Errorf("collect rows: %w", err)
|
|
|
|
|
}
|
|
|
|
|
results := make([]T, len(collected))
|
|
|
|
|
for i, c := range collected {
|
|
|
|
|
if c == nil {
|
|
|
|
|
return results, fmt.Errorf("null collected")
|
|
|
|
|
}
|
|
|
|
|
results[i] = *c
|
|
|
|
|
}
|
|
|
|
|
return results, nil
|
2026-05-01 04:56:53 +00:00
|
|
|
}
|
2025-11-04 23:11:32 +00:00
|
|
|
func doMigrations(connection_string string) error {
|
2026-02-13 21:15:09 +00:00
|
|
|
log.Debug().Str("dsn", connection_string).Msg("Connecting to database")
|
2025-11-04 23:11:32 +00:00
|
|
|
db, err := sql.Open("pgx", connection_string)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("Failed to open database connection: %w", err)
|
|
|
|
|
}
|
2026-05-02 00:37:28 +00:00
|
|
|
defer func() {
|
|
|
|
|
err := db.Close()
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Error().Err(err).Msg("failed to close database connection")
|
|
|
|
|
}
|
|
|
|
|
}()
|
2025-11-04 23:11:32 +00:00
|
|
|
row := db.QueryRowContext(context.Background(), "SELECT version()")
|
|
|
|
|
var val string
|
|
|
|
|
if err := row.Scan(&val); err != nil {
|
|
|
|
|
return fmt.Errorf("Failed to get database version query result: %w", err)
|
|
|
|
|
}
|
2025-11-13 20:16:23 +00:00
|
|
|
log.Info().Str("version", val).Msg("Connected to database")
|
2025-11-04 23:11:32 +00:00
|
|
|
|
|
|
|
|
fsys, err := fs.Sub(embedMigrations, "migrations")
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("Failed to get migrations embedded directory: %w", err)
|
|
|
|
|
}
|
|
|
|
|
provider, err := goose.NewProvider(goose.DialectPostgres, db, fsys)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("Failed to create goose provider: %w", err)
|
|
|
|
|
}
|
|
|
|
|
//goose.SetBaseFS(embedMigrations)
|
|
|
|
|
|
|
|
|
|
current, target, err := provider.GetVersions(context.Background())
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("Faield to get goose versions: %w", err)
|
|
|
|
|
}
|
2025-11-13 20:16:23 +00:00
|
|
|
log.Info().Int("current", int(current)).Int("target", int(target)).Msg("Migration status")
|
2025-11-04 23:11:32 +00:00
|
|
|
results, err := provider.Up(context.Background())
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("Failed to run migrations: %w", err)
|
|
|
|
|
}
|
|
|
|
|
if len(results) > 0 {
|
|
|
|
|
for _, r := range results {
|
2025-11-13 20:16:23 +00:00
|
|
|
log.Info().Int("version", int(r.Source.Version)).Str("direction", r.Direction).Msg("Migration done")
|
2025-11-04 23:11:32 +00:00
|
|
|
}
|
|
|
|
|
} else {
|
2025-11-13 20:16:23 +00:00
|
|
|
log.Info().Msg("No migrations necessary.")
|
2025-11-04 23:11:32 +00:00
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
2025-11-24 18:08:24 +00:00
|
|
|
func InitializeDatabase(ctx context.Context, uri string) error {
|
2026-02-13 21:15:09 +00:00
|
|
|
log.Debug().Str("dsn", uri).Msg("Initializing database")
|
2025-11-04 23:11:32 +00:00
|
|
|
needs, err := needsMigrations(uri)
|
|
|
|
|
if err != nil {
|
2025-11-13 20:53:20 +00:00
|
|
|
return fmt.Errorf("Failed to determine if migrations are needed: %w", err)
|
2025-11-04 23:11:32 +00:00
|
|
|
}
|
|
|
|
|
if needs == nil {
|
|
|
|
|
return errors.New("Can't read variable 'needs' - it's nil")
|
|
|
|
|
}
|
|
|
|
|
if *needs {
|
|
|
|
|
//return errors.New(fmt.Sprintf("Must migrate database before connecting: %t", *needs))
|
2025-11-13 20:16:23 +00:00
|
|
|
log.Info().Msg("Handling database migrations")
|
2025-11-04 23:11:32 +00:00
|
|
|
err = doMigrations(uri)
|
|
|
|
|
if err != nil {
|
2025-11-13 20:53:20 +00:00
|
|
|
return fmt.Errorf("Failed to handle migrations: %w", err)
|
2025-11-04 23:11:32 +00:00
|
|
|
}
|
|
|
|
|
} else {
|
2026-02-13 21:15:09 +00:00
|
|
|
log.Debug().Msg("No database migrations necessary")
|
2025-11-04 23:11:32 +00:00
|
|
|
}
|
|
|
|
|
|
2026-05-07 10:39:17 +00:00
|
|
|
config, err := pgxpool.ParseConfig(uri)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("parse config: %w", err)
|
|
|
|
|
}
|
|
|
|
|
config.AfterConnect = func(ctx2 context.Context, conn *pgx.Conn) error {
|
|
|
|
|
err2 := pgxgeom.Register(ctx, conn)
|
|
|
|
|
if err2 != nil {
|
|
|
|
|
return fmt.Errorf("pgxgeom register: %w", err2)
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
db, err := pgxpool.NewWithConfig(ctx, config)
|
2025-11-04 23:11:32 +00:00
|
|
|
if err != nil {
|
2026-05-07 10:39:17 +00:00
|
|
|
return fmt.Errorf("new pool: %w", err)
|
2025-11-04 23:11:32 +00:00
|
|
|
}
|
2026-05-07 10:39:17 +00:00
|
|
|
bobDB := bob.NewDB(stdlib.OpenDBFromPool(db))
|
|
|
|
|
PGInstance = &pginstance{bobDB, db}
|
2025-11-04 23:11:32 +00:00
|
|
|
|
|
|
|
|
var current string
|
|
|
|
|
query := `SELECT current_database()`
|
|
|
|
|
err = PGInstance.BobDB.QueryRow(query).Scan(¤t)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("Failed to get database current: %w", err)
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func needsMigrations(connection_string string) (*bool, error) {
|
|
|
|
|
db, err := sql.Open("pgx", connection_string)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, fmt.Errorf("Failed to open database connection: %w", err)
|
|
|
|
|
}
|
2026-05-02 00:37:28 +00:00
|
|
|
defer func() {
|
|
|
|
|
err := db.Close()
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Error().Err(err).Msg("failed to close database connection")
|
|
|
|
|
}
|
|
|
|
|
}()
|
2025-11-04 23:11:32 +00:00
|
|
|
row := db.QueryRowContext(context.Background(), "SELECT version()")
|
|
|
|
|
var val string
|
|
|
|
|
if err := row.Scan(&val); err != nil {
|
|
|
|
|
return nil, fmt.Errorf("Failed to get database version query result: %w", err)
|
|
|
|
|
}
|
2025-11-13 20:16:23 +00:00
|
|
|
log.Info().Str("dsn", val).Msg("Connected to database")
|
2025-11-04 23:11:32 +00:00
|
|
|
|
|
|
|
|
fsys, err := fs.Sub(embedMigrations, "migrations")
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, fmt.Errorf("Failed to get migrations embedded directory: %w", err)
|
|
|
|
|
}
|
|
|
|
|
provider, err := goose.NewProvider(goose.DialectPostgres, db, fsys)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, fmt.Errorf("Failed to create goose provider: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
hasPending, err := provider.HasPending(context.Background())
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
return &hasPending, nil
|
|
|
|
|
}
|