Overhaul publicreport storage layer, create unified tables

This is a huge change. I was getting really sick of the split between
nuisance/water tables when more than half of the data they store is
common. I finally bit off the big work of switching it all.

This creates a single unified table, publicreport.report and copies the
existing report data into it. It also ports existing data from the
original tables into the new table.

Along with all of this I also overhauled the system for handling
asynchronous work to use a LISTEN/NOTIFY connection from the database
and a single cache table to avoid ever losing work.
This commit is contained in:
Eli Ribble 2026-03-18 15:36:20 +00:00
parent 2538638c9d
commit 1e071d5ce5
No known key found for this signature in database
109 changed files with 22903 additions and 11713 deletions

View file

@ -60,6 +60,11 @@ func StartAll(ctx context.Context) error {
defer waitGroup.Done()
refreshFieldseekerData(ctx, newOAuthTokenChannel)
}()
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
listenForJobs(ctx)
}()
err = addWaitingJobs(ctx)
if err != nil {
@ -109,8 +114,10 @@ func handleJob(ctx context.Context, txn bob.Executor, job *models.Job) error {
return handleJobLabelStudioAudioCreate(ctx, txn, job.RowID)
case enums.JobtypeEmailSend:
return email.Job(ctx, txn, job.RowID)
case enums.JobtypeTextRespond:
return text.JobRespond(ctx, txn, job.RowID)
case enums.JobtypeTextSend:
return text.Job(ctx, txn, job.RowID)
return text.JobSend(ctx, txn, job.RowID)
default:
return fmt.Errorf("No handler for job type %s", string(job.Type))
}
@ -151,6 +158,7 @@ func listenAndDoOneJob(ctx context.Context) error {
}
for {
log.Debug().Msg("wait for notification")
notification, err := conn.Conn().WaitForNotification(ctx)
if err != nil {
//if !pgconn.Timeout(err) {
@ -161,12 +169,14 @@ func listenAndDoOneJob(ctx context.Context) error {
if err != nil {
return fmt.Errorf("failed to parse int from payload '%s': %w", notification.Payload, err)
}
log.Debug().Int("job_id", job_id).Msg("got notification for job")
c := bobpgx.NewConn(conn.Conn())
job, err := models.FindJob(ctx, c, int32(job_id))
if err != nil {
return fmt.Errorf("Failed to find job %d: %w", job_id, err)
}
sublog := log.With().Int32("job", job.ID).Int32("row_id", job.RowID).Str("type", string(job.Type)).Logger()
//tx, err := c.BeginTx(ctx, pgx.TxOptions{})
tx, err := conn.BeginTx(ctx, pgx.TxOptions{})
@ -176,7 +186,6 @@ func listenAndDoOneJob(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
txn := bobpgx.NewTx(tx, cancel)
sublog := log.With().Int32("job", job.ID).Int32("row_id", job.RowID).Logger()
err = handleJob(ctx, txn, job)
if err != nil {
sublog.Error().Err(err).Msg("failed to handle job")
@ -190,5 +199,6 @@ func listenAndDoOneJob(ctx context.Context) error {
return fmt.Errorf("delete job: %w", err)
}
txn.Commit(ctx)
sublog.Debug().Msg("job complete")
}
}