From 393836a86aecd6c4df9322e43cba50809824b710 Mon Sep 17 00:00:00 2001 From: Eli Ribble Date: Fri, 22 May 2026 23:19:31 +0000 Subject: [PATCH] Fix notification of job happening before transaction is closed This is kind of a wild one. Turns out that the triggers I was using actually fire before the transaction is closed and I was primarily getting lucky that the job was present on the other side of the connection rather than having things built correctly. I've fixed this by removing the trigger entirely and instead manually triggering as part of the transaction. This makes the NOTIFY call happen as soon as the transaction closes, just at the cost of making my application be in charge of ensuring the NOTIFY gets called. Seems like a win. Part of doing this is porting the existing job creation code over to use Jet. It's something I want to do anyway, so it's a win all around. --- api/audio.go | 2 +- db/gen/nidus-sync/fileupload/enum/csvtype.go | 18 +++ .../fileupload/enum/filestatustype.go | 28 ++++ .../fileupload/enum/poolcondition.go | 26 ++++ db/gen/nidus-sync/fileupload/model/csv.go | 19 +++ db/gen/nidus-sync/fileupload/model/csvtype.go | 49 ++++++ .../nidus-sync/fileupload/model/error_csv.go | 16 ++ .../nidus-sync/fileupload/model/error_file.go | 14 ++ db/gen/nidus-sync/fileupload/model/file.go | 28 ++++ .../fileupload/model/filestatustype.go | 69 ++++++++ db/gen/nidus-sync/fileupload/model/pool.go | 40 +++++ .../fileupload/model/poolcondition.go | 65 ++++++++ db/gen/nidus-sync/fileupload/table/csv.go | 87 +++++++++++ .../nidus-sync/fileupload/table/error_csv.go | 90 +++++++++++ .../nidus-sync/fileupload/table/error_file.go | 84 ++++++++++ db/gen/nidus-sync/fileupload/table/file.go | 111 +++++++++++++ db/gen/nidus-sync/fileupload/table/pool.go | 147 ++++++++++++++++++ .../fileupload/table/table_use_schema.go | 18 +++ db/jet/main.go | 1 + db/migrations/00155_job_trigger_deferred.sql | 19 +++ .../00156_fileupload_pool_condition.sql | 18 +++ db/query/fileupload/csv.go | 47 ++++++ db/query/fileupload/file.go | 58 +++++++ db/query/public/job.go | 14 +- platform/background/background.go | 54 +++---- platform/email/email.go | 2 +- platform/text/text.go | 4 +- platform/upload.go | 54 +++---- resource/upload.go | 6 +- ts/rmo/view/ReportSubmitted.vue | 10 +- 30 files changed, 1126 insertions(+), 72 deletions(-) create mode 100644 db/gen/nidus-sync/fileupload/enum/csvtype.go create mode 100644 db/gen/nidus-sync/fileupload/enum/filestatustype.go create mode 100644 db/gen/nidus-sync/fileupload/enum/poolcondition.go create mode 100644 db/gen/nidus-sync/fileupload/model/csv.go create mode 100644 db/gen/nidus-sync/fileupload/model/csvtype.go create mode 100644 db/gen/nidus-sync/fileupload/model/error_csv.go create mode 100644 db/gen/nidus-sync/fileupload/model/error_file.go create mode 100644 db/gen/nidus-sync/fileupload/model/file.go create mode 100644 db/gen/nidus-sync/fileupload/model/filestatustype.go create mode 100644 db/gen/nidus-sync/fileupload/model/pool.go create mode 100644 db/gen/nidus-sync/fileupload/model/poolcondition.go create mode 100644 db/gen/nidus-sync/fileupload/table/csv.go create mode 100644 db/gen/nidus-sync/fileupload/table/error_csv.go create mode 100644 db/gen/nidus-sync/fileupload/table/error_file.go create mode 100644 db/gen/nidus-sync/fileupload/table/file.go create mode 100644 db/gen/nidus-sync/fileupload/table/pool.go create mode 100644 db/gen/nidus-sync/fileupload/table/table_use_schema.go create mode 100644 db/migrations/00155_job_trigger_deferred.sql create mode 100644 db/migrations/00156_fileupload_pool_condition.sql create mode 100644 db/query/fileupload/csv.go create mode 100644 db/query/fileupload/file.go diff --git a/api/audio.go b/api/audio.go index ab6c886f..06132b1d 100644 --- a/api/audio.go +++ b/api/audio.go @@ -85,7 +85,7 @@ func apiAudioContentPost(w http.ResponseWriter, r *http.Request, user platform.U return } - err = background.NewAudioTranscode(ctx, db.PGInstance.BobDB, a.ID) + err = background.NewAudioTranscode(ctx, db.PGInstance.PGXPool, a.ID) if err != nil { log.Printf("Failed to transcode audio %s for org %d: %w", u_str, user.Organization.ID, err) http.Error(w, "failed to transcode audio", http.StatusBadRequest) diff --git a/db/gen/nidus-sync/fileupload/enum/csvtype.go b/db/gen/nidus-sync/fileupload/enum/csvtype.go new file mode 100644 index 00000000..b7702bcf --- /dev/null +++ b/db/gen/nidus-sync/fileupload/enum/csvtype.go @@ -0,0 +1,18 @@ +// +// Code generated by go-jet DO NOT EDIT. +// +// WARNING: Changes to this file may cause incorrect behavior +// and will be lost if the code is regenerated +// + +package enum + +import "github.com/Gleipnir-Technology/jet/postgres" + +var Csvtype = &struct { + PoolList postgres.StringExpression + Flyover postgres.StringExpression +}{ + PoolList: postgres.NewEnumValue("PoolList"), + Flyover: postgres.NewEnumValue("Flyover"), +} diff --git a/db/gen/nidus-sync/fileupload/enum/filestatustype.go b/db/gen/nidus-sync/fileupload/enum/filestatustype.go new file mode 100644 index 00000000..606cc3ef --- /dev/null +++ b/db/gen/nidus-sync/fileupload/enum/filestatustype.go @@ -0,0 +1,28 @@ +// +// Code generated by go-jet DO NOT EDIT. +// +// WARNING: Changes to this file may cause incorrect behavior +// and will be lost if the code is regenerated +// + +package enum + +import "github.com/Gleipnir-Technology/jet/postgres" + +var Filestatustype = &struct { + Error postgres.StringExpression + Parsed postgres.StringExpression + Uploaded postgres.StringExpression + Parsing postgres.StringExpression + Committing postgres.StringExpression + Committed postgres.StringExpression + Discarded postgres.StringExpression +}{ + Error: postgres.NewEnumValue("error"), + Parsed: postgres.NewEnumValue("parsed"), + Uploaded: postgres.NewEnumValue("uploaded"), + Parsing: postgres.NewEnumValue("parsing"), + Committing: postgres.NewEnumValue("committing"), + Committed: postgres.NewEnumValue("committed"), + Discarded: postgres.NewEnumValue("discarded"), +} diff --git a/db/gen/nidus-sync/fileupload/enum/poolcondition.go b/db/gen/nidus-sync/fileupload/enum/poolcondition.go new file mode 100644 index 00000000..c298789b --- /dev/null +++ b/db/gen/nidus-sync/fileupload/enum/poolcondition.go @@ -0,0 +1,26 @@ +// +// Code generated by go-jet DO NOT EDIT. +// +// WARNING: Changes to this file may cause incorrect behavior +// and will be lost if the code is regenerated +// + +package enum + +import "github.com/Gleipnir-Technology/jet/postgres" + +var Poolcondition = &struct { + Blue postgres.StringExpression + Dry postgres.StringExpression + FalsePool postgres.StringExpression + Unknown postgres.StringExpression + Green postgres.StringExpression + Murky postgres.StringExpression +}{ + Blue: postgres.NewEnumValue("blue"), + Dry: postgres.NewEnumValue("dry"), + FalsePool: postgres.NewEnumValue("false pool"), + Unknown: postgres.NewEnumValue("unknown"), + Green: postgres.NewEnumValue("green"), + Murky: postgres.NewEnumValue("murky"), +} diff --git a/db/gen/nidus-sync/fileupload/model/csv.go b/db/gen/nidus-sync/fileupload/model/csv.go new file mode 100644 index 00000000..31ecdcd1 --- /dev/null +++ b/db/gen/nidus-sync/fileupload/model/csv.go @@ -0,0 +1,19 @@ +// +// Code generated by go-jet DO NOT EDIT. +// +// WARNING: Changes to this file may cause incorrect behavior +// and will be lost if the code is regenerated +// + +package model + +import ( + "time" +) + +type Csv struct { + Committed *time.Time + FileID int32 `sql:"primary_key"` + Rowcount int32 + Type Csvtype +} diff --git a/db/gen/nidus-sync/fileupload/model/csvtype.go b/db/gen/nidus-sync/fileupload/model/csvtype.go new file mode 100644 index 00000000..c14e57f6 --- /dev/null +++ b/db/gen/nidus-sync/fileupload/model/csvtype.go @@ -0,0 +1,49 @@ +// +// Code generated by go-jet DO NOT EDIT. +// +// WARNING: Changes to this file may cause incorrect behavior +// and will be lost if the code is regenerated +// + +package model + +import "errors" + +type Csvtype string + +const ( + Csvtype_PoolList Csvtype = "PoolList" + Csvtype_Flyover Csvtype = "Flyover" +) + +var CsvtypeAllValues = []Csvtype{ + Csvtype_PoolList, + Csvtype_Flyover, +} + +func (e *Csvtype) Scan(value interface{}) error { + var enumValue string + switch val := value.(type) { + case string: + enumValue = val + case []byte: + enumValue = string(val) + default: + return errors.New("jet: Invalid scan value for AllTypesEnum enum. Enum value has to be of type string or []byte") + } + + switch enumValue { + case "PoolList": + *e = Csvtype_PoolList + case "Flyover": + *e = Csvtype_Flyover + default: + return errors.New("jet: Invalid scan value '" + enumValue + "' for Csvtype enum") + } + + return nil +} + +func (e Csvtype) String() string { + return string(e) +} diff --git a/db/gen/nidus-sync/fileupload/model/error_csv.go b/db/gen/nidus-sync/fileupload/model/error_csv.go new file mode 100644 index 00000000..5be3870e --- /dev/null +++ b/db/gen/nidus-sync/fileupload/model/error_csv.go @@ -0,0 +1,16 @@ +// +// Code generated by go-jet DO NOT EDIT. +// +// WARNING: Changes to this file may cause incorrect behavior +// and will be lost if the code is regenerated +// + +package model + +type ErrorCsv struct { + Col int32 + CsvFileID int32 + ID int32 `sql:"primary_key"` + Line int32 + Message string +} diff --git a/db/gen/nidus-sync/fileupload/model/error_file.go b/db/gen/nidus-sync/fileupload/model/error_file.go new file mode 100644 index 00000000..0357a492 --- /dev/null +++ b/db/gen/nidus-sync/fileupload/model/error_file.go @@ -0,0 +1,14 @@ +// +// Code generated by go-jet DO NOT EDIT. +// +// WARNING: Changes to this file may cause incorrect behavior +// and will be lost if the code is regenerated +// + +package model + +type ErrorFile struct { + FileID int32 + ID int32 `sql:"primary_key"` + Message string +} diff --git a/db/gen/nidus-sync/fileupload/model/file.go b/db/gen/nidus-sync/fileupload/model/file.go new file mode 100644 index 00000000..78b42970 --- /dev/null +++ b/db/gen/nidus-sync/fileupload/model/file.go @@ -0,0 +1,28 @@ +// +// Code generated by go-jet DO NOT EDIT. +// +// WARNING: Changes to this file may cause incorrect behavior +// and will be lost if the code is regenerated +// + +package model + +import ( + "github.com/google/uuid" + "time" +) + +type File struct { + ID int32 `sql:"primary_key"` + ContentType string + Created time.Time + CreatorID int32 + Deleted *time.Time + Name string + OrganizationID int32 + Status Filestatustype + SizeBytes int32 + FileUUID uuid.UUID + Committer *int32 + Error string +} diff --git a/db/gen/nidus-sync/fileupload/model/filestatustype.go b/db/gen/nidus-sync/fileupload/model/filestatustype.go new file mode 100644 index 00000000..0c3aa3c0 --- /dev/null +++ b/db/gen/nidus-sync/fileupload/model/filestatustype.go @@ -0,0 +1,69 @@ +// +// Code generated by go-jet DO NOT EDIT. +// +// WARNING: Changes to this file may cause incorrect behavior +// and will be lost if the code is regenerated +// + +package model + +import "errors" + +type Filestatustype string + +const ( + Filestatustype_Error Filestatustype = "error" + Filestatustype_Parsed Filestatustype = "parsed" + Filestatustype_Uploaded Filestatustype = "uploaded" + Filestatustype_Parsing Filestatustype = "parsing" + Filestatustype_Committing Filestatustype = "committing" + Filestatustype_Committed Filestatustype = "committed" + Filestatustype_Discarded Filestatustype = "discarded" +) + +var FilestatustypeAllValues = []Filestatustype{ + Filestatustype_Error, + Filestatustype_Parsed, + Filestatustype_Uploaded, + Filestatustype_Parsing, + Filestatustype_Committing, + Filestatustype_Committed, + Filestatustype_Discarded, +} + +func (e *Filestatustype) Scan(value interface{}) error { + var enumValue string + switch val := value.(type) { + case string: + enumValue = val + case []byte: + enumValue = string(val) + default: + return errors.New("jet: Invalid scan value for AllTypesEnum enum. Enum value has to be of type string or []byte") + } + + switch enumValue { + case "error": + *e = Filestatustype_Error + case "parsed": + *e = Filestatustype_Parsed + case "uploaded": + *e = Filestatustype_Uploaded + case "parsing": + *e = Filestatustype_Parsing + case "committing": + *e = Filestatustype_Committing + case "committed": + *e = Filestatustype_Committed + case "discarded": + *e = Filestatustype_Discarded + default: + return errors.New("jet: Invalid scan value '" + enumValue + "' for Filestatustype enum") + } + + return nil +} + +func (e Filestatustype) String() string { + return string(e) +} diff --git a/db/gen/nidus-sync/fileupload/model/pool.go b/db/gen/nidus-sync/fileupload/model/pool.go new file mode 100644 index 00000000..df2283c7 --- /dev/null +++ b/db/gen/nidus-sync/fileupload/model/pool.go @@ -0,0 +1,40 @@ +// +// Code generated by go-jet DO NOT EDIT. +// +// WARNING: Changes to this file may cause incorrect behavior +// and will be lost if the code is regenerated +// + +package model + +import ( + "github.com/twpayne/go-geom" + "time" +) + +type Pool struct { + AddressPostalCode string + AddressStreet string + Committed bool + Created time.Time + CreatorID int32 + CsvFile int32 + Deleted *time.Time + Geom *geom.T + H3cell *string + ID int32 `sql:"primary_key"` + IsInDistrict bool + IsNew bool + Notes string + PropertyOwnerName string + PropertyOwnerPhoneE164 *string + ResidentOwned *bool + ResidentPhoneE164 *string + LineNumber int32 + Tags string + AddressNumber string + AddressLocality string + AddressRegion string + Condition Poolcondition + AddressID *int32 +} diff --git a/db/gen/nidus-sync/fileupload/model/poolcondition.go b/db/gen/nidus-sync/fileupload/model/poolcondition.go new file mode 100644 index 00000000..c7fa5d53 --- /dev/null +++ b/db/gen/nidus-sync/fileupload/model/poolcondition.go @@ -0,0 +1,65 @@ +// +// Code generated by go-jet DO NOT EDIT. +// +// WARNING: Changes to this file may cause incorrect behavior +// and will be lost if the code is regenerated +// + +package model + +import "errors" + +type Poolcondition string + +const ( + Poolcondition_Blue Poolcondition = "blue" + Poolcondition_Dry Poolcondition = "dry" + Poolcondition_FalsePool Poolcondition = "false pool" + Poolcondition_Unknown Poolcondition = "unknown" + Poolcondition_Green Poolcondition = "green" + Poolcondition_Murky Poolcondition = "murky" +) + +var PoolconditionAllValues = []Poolcondition{ + Poolcondition_Blue, + Poolcondition_Dry, + Poolcondition_FalsePool, + Poolcondition_Unknown, + Poolcondition_Green, + Poolcondition_Murky, +} + +func (e *Poolcondition) Scan(value interface{}) error { + var enumValue string + switch val := value.(type) { + case string: + enumValue = val + case []byte: + enumValue = string(val) + default: + return errors.New("jet: Invalid scan value for AllTypesEnum enum. Enum value has to be of type string or []byte") + } + + switch enumValue { + case "blue": + *e = Poolcondition_Blue + case "dry": + *e = Poolcondition_Dry + case "false pool": + *e = Poolcondition_FalsePool + case "unknown": + *e = Poolcondition_Unknown + case "green": + *e = Poolcondition_Green + case "murky": + *e = Poolcondition_Murky + default: + return errors.New("jet: Invalid scan value '" + enumValue + "' for Poolcondition enum") + } + + return nil +} + +func (e Poolcondition) String() string { + return string(e) +} diff --git a/db/gen/nidus-sync/fileupload/table/csv.go b/db/gen/nidus-sync/fileupload/table/csv.go new file mode 100644 index 00000000..72a54365 --- /dev/null +++ b/db/gen/nidus-sync/fileupload/table/csv.go @@ -0,0 +1,87 @@ +// +// Code generated by go-jet DO NOT EDIT. +// +// WARNING: Changes to this file may cause incorrect behavior +// and will be lost if the code is regenerated +// + +package table + +import ( + "github.com/Gleipnir-Technology/jet/postgres" +) + +var Csv = newCsvTable("fileupload", "csv", "") + +type csvTable struct { + postgres.Table + + // Columns + Committed postgres.ColumnTimestamp + FileID postgres.ColumnInteger + Rowcount postgres.ColumnInteger + Type postgres.ColumnString + + AllColumns postgres.ColumnList + MutableColumns postgres.ColumnList + DefaultColumns postgres.ColumnList +} + +type CsvTable struct { + csvTable + + EXCLUDED csvTable +} + +// AS creates new CsvTable with assigned alias +func (a CsvTable) AS(alias string) *CsvTable { + return newCsvTable(a.SchemaName(), a.TableName(), alias) +} + +// Schema creates new CsvTable with assigned schema name +func (a CsvTable) FromSchema(schemaName string) *CsvTable { + return newCsvTable(schemaName, a.TableName(), a.Alias()) +} + +// WithPrefix creates new CsvTable with assigned table prefix +func (a CsvTable) WithPrefix(prefix string) *CsvTable { + return newCsvTable(a.SchemaName(), prefix+a.TableName(), a.TableName()) +} + +// WithSuffix creates new CsvTable with assigned table suffix +func (a CsvTable) WithSuffix(suffix string) *CsvTable { + return newCsvTable(a.SchemaName(), a.TableName()+suffix, a.TableName()) +} + +func newCsvTable(schemaName, tableName, alias string) *CsvTable { + return &CsvTable{ + csvTable: newCsvTableImpl(schemaName, tableName, alias), + EXCLUDED: newCsvTableImpl("", "excluded", ""), + } +} + +func newCsvTableImpl(schemaName, tableName, alias string) csvTable { + var ( + CommittedColumn = postgres.TimestampColumn("committed") + FileIDColumn = postgres.IntegerColumn("file_id") + RowcountColumn = postgres.IntegerColumn("rowcount") + TypeColumn = postgres.StringColumn("type_") + allColumns = postgres.ColumnList{CommittedColumn, FileIDColumn, RowcountColumn, TypeColumn} + mutableColumns = postgres.ColumnList{CommittedColumn, RowcountColumn, TypeColumn} + defaultColumns = postgres.ColumnList{} + ) + + return csvTable{ + Table: postgres.NewTable(schemaName, tableName, alias, allColumns...), + + //Columns + Committed: CommittedColumn, + FileID: FileIDColumn, + Rowcount: RowcountColumn, + Type: TypeColumn, + + AllColumns: allColumns, + MutableColumns: mutableColumns, + DefaultColumns: defaultColumns, + } +} diff --git a/db/gen/nidus-sync/fileupload/table/error_csv.go b/db/gen/nidus-sync/fileupload/table/error_csv.go new file mode 100644 index 00000000..99016887 --- /dev/null +++ b/db/gen/nidus-sync/fileupload/table/error_csv.go @@ -0,0 +1,90 @@ +// +// Code generated by go-jet DO NOT EDIT. +// +// WARNING: Changes to this file may cause incorrect behavior +// and will be lost if the code is regenerated +// + +package table + +import ( + "github.com/Gleipnir-Technology/jet/postgres" +) + +var ErrorCsv = newErrorCsvTable("fileupload", "error_csv", "") + +type errorCsvTable struct { + postgres.Table + + // Columns + Col postgres.ColumnInteger + CsvFileID postgres.ColumnInteger + ID postgres.ColumnInteger + Line postgres.ColumnInteger + Message postgres.ColumnString + + AllColumns postgres.ColumnList + MutableColumns postgres.ColumnList + DefaultColumns postgres.ColumnList +} + +type ErrorCsvTable struct { + errorCsvTable + + EXCLUDED errorCsvTable +} + +// AS creates new ErrorCsvTable with assigned alias +func (a ErrorCsvTable) AS(alias string) *ErrorCsvTable { + return newErrorCsvTable(a.SchemaName(), a.TableName(), alias) +} + +// Schema creates new ErrorCsvTable with assigned schema name +func (a ErrorCsvTable) FromSchema(schemaName string) *ErrorCsvTable { + return newErrorCsvTable(schemaName, a.TableName(), a.Alias()) +} + +// WithPrefix creates new ErrorCsvTable with assigned table prefix +func (a ErrorCsvTable) WithPrefix(prefix string) *ErrorCsvTable { + return newErrorCsvTable(a.SchemaName(), prefix+a.TableName(), a.TableName()) +} + +// WithSuffix creates new ErrorCsvTable with assigned table suffix +func (a ErrorCsvTable) WithSuffix(suffix string) *ErrorCsvTable { + return newErrorCsvTable(a.SchemaName(), a.TableName()+suffix, a.TableName()) +} + +func newErrorCsvTable(schemaName, tableName, alias string) *ErrorCsvTable { + return &ErrorCsvTable{ + errorCsvTable: newErrorCsvTableImpl(schemaName, tableName, alias), + EXCLUDED: newErrorCsvTableImpl("", "excluded", ""), + } +} + +func newErrorCsvTableImpl(schemaName, tableName, alias string) errorCsvTable { + var ( + ColColumn = postgres.IntegerColumn("col") + CsvFileIDColumn = postgres.IntegerColumn("csv_file_id") + IDColumn = postgres.IntegerColumn("id") + LineColumn = postgres.IntegerColumn("line") + MessageColumn = postgres.StringColumn("message") + allColumns = postgres.ColumnList{ColColumn, CsvFileIDColumn, IDColumn, LineColumn, MessageColumn} + mutableColumns = postgres.ColumnList{ColColumn, CsvFileIDColumn, LineColumn, MessageColumn} + defaultColumns = postgres.ColumnList{IDColumn} + ) + + return errorCsvTable{ + Table: postgres.NewTable(schemaName, tableName, alias, allColumns...), + + //Columns + Col: ColColumn, + CsvFileID: CsvFileIDColumn, + ID: IDColumn, + Line: LineColumn, + Message: MessageColumn, + + AllColumns: allColumns, + MutableColumns: mutableColumns, + DefaultColumns: defaultColumns, + } +} diff --git a/db/gen/nidus-sync/fileupload/table/error_file.go b/db/gen/nidus-sync/fileupload/table/error_file.go new file mode 100644 index 00000000..827bc1da --- /dev/null +++ b/db/gen/nidus-sync/fileupload/table/error_file.go @@ -0,0 +1,84 @@ +// +// Code generated by go-jet DO NOT EDIT. +// +// WARNING: Changes to this file may cause incorrect behavior +// and will be lost if the code is regenerated +// + +package table + +import ( + "github.com/Gleipnir-Technology/jet/postgres" +) + +var ErrorFile = newErrorFileTable("fileupload", "error_file", "") + +type errorFileTable struct { + postgres.Table + + // Columns + FileID postgres.ColumnInteger + ID postgres.ColumnInteger + Message postgres.ColumnString + + AllColumns postgres.ColumnList + MutableColumns postgres.ColumnList + DefaultColumns postgres.ColumnList +} + +type ErrorFileTable struct { + errorFileTable + + EXCLUDED errorFileTable +} + +// AS creates new ErrorFileTable with assigned alias +func (a ErrorFileTable) AS(alias string) *ErrorFileTable { + return newErrorFileTable(a.SchemaName(), a.TableName(), alias) +} + +// Schema creates new ErrorFileTable with assigned schema name +func (a ErrorFileTable) FromSchema(schemaName string) *ErrorFileTable { + return newErrorFileTable(schemaName, a.TableName(), a.Alias()) +} + +// WithPrefix creates new ErrorFileTable with assigned table prefix +func (a ErrorFileTable) WithPrefix(prefix string) *ErrorFileTable { + return newErrorFileTable(a.SchemaName(), prefix+a.TableName(), a.TableName()) +} + +// WithSuffix creates new ErrorFileTable with assigned table suffix +func (a ErrorFileTable) WithSuffix(suffix string) *ErrorFileTable { + return newErrorFileTable(a.SchemaName(), a.TableName()+suffix, a.TableName()) +} + +func newErrorFileTable(schemaName, tableName, alias string) *ErrorFileTable { + return &ErrorFileTable{ + errorFileTable: newErrorFileTableImpl(schemaName, tableName, alias), + EXCLUDED: newErrorFileTableImpl("", "excluded", ""), + } +} + +func newErrorFileTableImpl(schemaName, tableName, alias string) errorFileTable { + var ( + FileIDColumn = postgres.IntegerColumn("file_id") + IDColumn = postgres.IntegerColumn("id") + MessageColumn = postgres.StringColumn("message") + allColumns = postgres.ColumnList{FileIDColumn, IDColumn, MessageColumn} + mutableColumns = postgres.ColumnList{FileIDColumn, MessageColumn} + defaultColumns = postgres.ColumnList{IDColumn} + ) + + return errorFileTable{ + Table: postgres.NewTable(schemaName, tableName, alias, allColumns...), + + //Columns + FileID: FileIDColumn, + ID: IDColumn, + Message: MessageColumn, + + AllColumns: allColumns, + MutableColumns: mutableColumns, + DefaultColumns: defaultColumns, + } +} diff --git a/db/gen/nidus-sync/fileupload/table/file.go b/db/gen/nidus-sync/fileupload/table/file.go new file mode 100644 index 00000000..ffd8ed88 --- /dev/null +++ b/db/gen/nidus-sync/fileupload/table/file.go @@ -0,0 +1,111 @@ +// +// Code generated by go-jet DO NOT EDIT. +// +// WARNING: Changes to this file may cause incorrect behavior +// and will be lost if the code is regenerated +// + +package table + +import ( + "github.com/Gleipnir-Technology/jet/postgres" +) + +var File = newFileTable("fileupload", "file", "") + +type fileTable struct { + postgres.Table + + // Columns + ID postgres.ColumnInteger + ContentType postgres.ColumnString + Created postgres.ColumnTimestamp + CreatorID postgres.ColumnInteger + Deleted postgres.ColumnTimestamp + Name postgres.ColumnString + OrganizationID postgres.ColumnInteger + Status postgres.ColumnString + SizeBytes postgres.ColumnInteger + FileUUID postgres.ColumnString + Committer postgres.ColumnInteger + Error postgres.ColumnString + + AllColumns postgres.ColumnList + MutableColumns postgres.ColumnList + DefaultColumns postgres.ColumnList +} + +type FileTable struct { + fileTable + + EXCLUDED fileTable +} + +// AS creates new FileTable with assigned alias +func (a FileTable) AS(alias string) *FileTable { + return newFileTable(a.SchemaName(), a.TableName(), alias) +} + +// Schema creates new FileTable with assigned schema name +func (a FileTable) FromSchema(schemaName string) *FileTable { + return newFileTable(schemaName, a.TableName(), a.Alias()) +} + +// WithPrefix creates new FileTable with assigned table prefix +func (a FileTable) WithPrefix(prefix string) *FileTable { + return newFileTable(a.SchemaName(), prefix+a.TableName(), a.TableName()) +} + +// WithSuffix creates new FileTable with assigned table suffix +func (a FileTable) WithSuffix(suffix string) *FileTable { + return newFileTable(a.SchemaName(), a.TableName()+suffix, a.TableName()) +} + +func newFileTable(schemaName, tableName, alias string) *FileTable { + return &FileTable{ + fileTable: newFileTableImpl(schemaName, tableName, alias), + EXCLUDED: newFileTableImpl("", "excluded", ""), + } +} + +func newFileTableImpl(schemaName, tableName, alias string) fileTable { + var ( + IDColumn = postgres.IntegerColumn("id") + ContentTypeColumn = postgres.StringColumn("content_type") + CreatedColumn = postgres.TimestampColumn("created") + CreatorIDColumn = postgres.IntegerColumn("creator_id") + DeletedColumn = postgres.TimestampColumn("deleted") + NameColumn = postgres.StringColumn("name") + OrganizationIDColumn = postgres.IntegerColumn("organization_id") + StatusColumn = postgres.StringColumn("status") + SizeBytesColumn = postgres.IntegerColumn("size_bytes") + FileUUIDColumn = postgres.StringColumn("file_uuid") + CommitterColumn = postgres.IntegerColumn("committer") + ErrorColumn = postgres.StringColumn("error") + allColumns = postgres.ColumnList{IDColumn, ContentTypeColumn, CreatedColumn, CreatorIDColumn, DeletedColumn, NameColumn, OrganizationIDColumn, StatusColumn, SizeBytesColumn, FileUUIDColumn, CommitterColumn, ErrorColumn} + mutableColumns = postgres.ColumnList{ContentTypeColumn, CreatedColumn, CreatorIDColumn, DeletedColumn, NameColumn, OrganizationIDColumn, StatusColumn, SizeBytesColumn, FileUUIDColumn, CommitterColumn, ErrorColumn} + defaultColumns = postgres.ColumnList{IDColumn} + ) + + return fileTable{ + Table: postgres.NewTable(schemaName, tableName, alias, allColumns...), + + //Columns + ID: IDColumn, + ContentType: ContentTypeColumn, + Created: CreatedColumn, + CreatorID: CreatorIDColumn, + Deleted: DeletedColumn, + Name: NameColumn, + OrganizationID: OrganizationIDColumn, + Status: StatusColumn, + SizeBytes: SizeBytesColumn, + FileUUID: FileUUIDColumn, + Committer: CommitterColumn, + Error: ErrorColumn, + + AllColumns: allColumns, + MutableColumns: mutableColumns, + DefaultColumns: defaultColumns, + } +} diff --git a/db/gen/nidus-sync/fileupload/table/pool.go b/db/gen/nidus-sync/fileupload/table/pool.go new file mode 100644 index 00000000..7b4c372b --- /dev/null +++ b/db/gen/nidus-sync/fileupload/table/pool.go @@ -0,0 +1,147 @@ +// +// Code generated by go-jet DO NOT EDIT. +// +// WARNING: Changes to this file may cause incorrect behavior +// and will be lost if the code is regenerated +// + +package table + +import ( + "github.com/Gleipnir-Technology/jet/postgres" +) + +var Pool = newPoolTable("fileupload", "pool", "") + +type poolTable struct { + postgres.Table + + // Columns + AddressPostalCode postgres.ColumnString + AddressStreet postgres.ColumnString + Committed postgres.ColumnBool + Created postgres.ColumnTimestamp + CreatorID postgres.ColumnInteger + CsvFile postgres.ColumnInteger + Deleted postgres.ColumnTimestamp + Geom postgres.ColumnGeometry + H3cell postgres.ColumnString + ID postgres.ColumnInteger + IsInDistrict postgres.ColumnBool + IsNew postgres.ColumnBool + Notes postgres.ColumnString + PropertyOwnerName postgres.ColumnString + PropertyOwnerPhoneE164 postgres.ColumnString + ResidentOwned postgres.ColumnBool + ResidentPhoneE164 postgres.ColumnString + LineNumber postgres.ColumnInteger + Tags postgres.ColumnString + AddressNumber postgres.ColumnString + AddressLocality postgres.ColumnString + AddressRegion postgres.ColumnString + Condition postgres.ColumnString + AddressID postgres.ColumnInteger + + AllColumns postgres.ColumnList + MutableColumns postgres.ColumnList + DefaultColumns postgres.ColumnList +} + +type PoolTable struct { + poolTable + + EXCLUDED poolTable +} + +// AS creates new PoolTable with assigned alias +func (a PoolTable) AS(alias string) *PoolTable { + return newPoolTable(a.SchemaName(), a.TableName(), alias) +} + +// Schema creates new PoolTable with assigned schema name +func (a PoolTable) FromSchema(schemaName string) *PoolTable { + return newPoolTable(schemaName, a.TableName(), a.Alias()) +} + +// WithPrefix creates new PoolTable with assigned table prefix +func (a PoolTable) WithPrefix(prefix string) *PoolTable { + return newPoolTable(a.SchemaName(), prefix+a.TableName(), a.TableName()) +} + +// WithSuffix creates new PoolTable with assigned table suffix +func (a PoolTable) WithSuffix(suffix string) *PoolTable { + return newPoolTable(a.SchemaName(), a.TableName()+suffix, a.TableName()) +} + +func newPoolTable(schemaName, tableName, alias string) *PoolTable { + return &PoolTable{ + poolTable: newPoolTableImpl(schemaName, tableName, alias), + EXCLUDED: newPoolTableImpl("", "excluded", ""), + } +} + +func newPoolTableImpl(schemaName, tableName, alias string) poolTable { + var ( + AddressPostalCodeColumn = postgres.StringColumn("address_postal_code") + AddressStreetColumn = postgres.StringColumn("address_street") + CommittedColumn = postgres.BoolColumn("committed") + CreatedColumn = postgres.TimestampColumn("created") + CreatorIDColumn = postgres.IntegerColumn("creator_id") + CsvFileColumn = postgres.IntegerColumn("csv_file") + DeletedColumn = postgres.TimestampColumn("deleted") + GeomColumn = postgres.GeometryColumn("geom") + H3cellColumn = postgres.StringColumn("h3cell") + IDColumn = postgres.IntegerColumn("id") + IsInDistrictColumn = postgres.BoolColumn("is_in_district") + IsNewColumn = postgres.BoolColumn("is_new") + NotesColumn = postgres.StringColumn("notes") + PropertyOwnerNameColumn = postgres.StringColumn("property_owner_name") + PropertyOwnerPhoneE164Column = postgres.StringColumn("property_owner_phone_e164") + ResidentOwnedColumn = postgres.BoolColumn("resident_owned") + ResidentPhoneE164Column = postgres.StringColumn("resident_phone_e164") + LineNumberColumn = postgres.IntegerColumn("line_number") + TagsColumn = postgres.StringColumn("tags") + AddressNumberColumn = postgres.StringColumn("address_number") + AddressLocalityColumn = postgres.StringColumn("address_locality") + AddressRegionColumn = postgres.StringColumn("address_region") + ConditionColumn = postgres.StringColumn("condition") + AddressIDColumn = postgres.IntegerColumn("address_id") + allColumns = postgres.ColumnList{AddressPostalCodeColumn, AddressStreetColumn, CommittedColumn, CreatedColumn, CreatorIDColumn, CsvFileColumn, DeletedColumn, GeomColumn, H3cellColumn, IDColumn, IsInDistrictColumn, IsNewColumn, NotesColumn, PropertyOwnerNameColumn, PropertyOwnerPhoneE164Column, ResidentOwnedColumn, ResidentPhoneE164Column, LineNumberColumn, TagsColumn, AddressNumberColumn, AddressLocalityColumn, AddressRegionColumn, ConditionColumn, AddressIDColumn} + mutableColumns = postgres.ColumnList{AddressPostalCodeColumn, AddressStreetColumn, CommittedColumn, CreatedColumn, CreatorIDColumn, CsvFileColumn, DeletedColumn, GeomColumn, H3cellColumn, IsInDistrictColumn, IsNewColumn, NotesColumn, PropertyOwnerNameColumn, PropertyOwnerPhoneE164Column, ResidentOwnedColumn, ResidentPhoneE164Column, LineNumberColumn, TagsColumn, AddressNumberColumn, AddressLocalityColumn, AddressRegionColumn, ConditionColumn, AddressIDColumn} + defaultColumns = postgres.ColumnList{IDColumn} + ) + + return poolTable{ + Table: postgres.NewTable(schemaName, tableName, alias, allColumns...), + + //Columns + AddressPostalCode: AddressPostalCodeColumn, + AddressStreet: AddressStreetColumn, + Committed: CommittedColumn, + Created: CreatedColumn, + CreatorID: CreatorIDColumn, + CsvFile: CsvFileColumn, + Deleted: DeletedColumn, + Geom: GeomColumn, + H3cell: H3cellColumn, + ID: IDColumn, + IsInDistrict: IsInDistrictColumn, + IsNew: IsNewColumn, + Notes: NotesColumn, + PropertyOwnerName: PropertyOwnerNameColumn, + PropertyOwnerPhoneE164: PropertyOwnerPhoneE164Column, + ResidentOwned: ResidentOwnedColumn, + ResidentPhoneE164: ResidentPhoneE164Column, + LineNumber: LineNumberColumn, + Tags: TagsColumn, + AddressNumber: AddressNumberColumn, + AddressLocality: AddressLocalityColumn, + AddressRegion: AddressRegionColumn, + Condition: ConditionColumn, + AddressID: AddressIDColumn, + + AllColumns: allColumns, + MutableColumns: mutableColumns, + DefaultColumns: defaultColumns, + } +} diff --git a/db/gen/nidus-sync/fileupload/table/table_use_schema.go b/db/gen/nidus-sync/fileupload/table/table_use_schema.go new file mode 100644 index 00000000..632681ed --- /dev/null +++ b/db/gen/nidus-sync/fileupload/table/table_use_schema.go @@ -0,0 +1,18 @@ +// +// Code generated by go-jet DO NOT EDIT. +// +// WARNING: Changes to this file may cause incorrect behavior +// and will be lost if the code is regenerated +// + +package table + +// UseSchema sets a new schema name for all generated table SQL builder types. It is recommended to invoke +// this method only once at the beginning of the program. +func UseSchema(schema string) { + Csv = Csv.FromSchema(schema) + ErrorCsv = ErrorCsv.FromSchema(schema) + ErrorFile = ErrorFile.FromSchema(schema) + File = File.FromSchema(schema) + Pool = Pool.FromSchema(schema) +} diff --git a/db/jet/main.go b/db/jet/main.go index 2744e4a0..1502e9ed 100644 --- a/db/jet/main.go +++ b/db/jet/main.go @@ -16,6 +16,7 @@ import ( var schemas []string = []string{ "arcgis", "comms", + "fileupload", "public", "publicreport", "stadia", diff --git a/db/migrations/00155_job_trigger_deferred.sql b/db/migrations/00155_job_trigger_deferred.sql new file mode 100644 index 00000000..b0de59b4 --- /dev/null +++ b/db/migrations/00155_job_trigger_deferred.sql @@ -0,0 +1,19 @@ +-- +goose Up +DROP TRIGGER job_insert_trigger ON job; +DROP FUNCTION notify_new_job(); + +-- +goose Down +-- +goose StatementBegin +CREATE OR REPLACE FUNCTION notify_new_job() +RETURNS TRIGGER AS $$ +BEGIN + PERFORM pg_notify('new_job', NEW.id::text); + RETURN NEW; +END; +$$ LANGUAGE plpgsql; +-- +goose StatementEnd + +CREATE TRIGGER job_insert_trigger + AFTER INSERT ON job + FOR EACH ROW + EXECUTE FUNCTION notify_new_job(); diff --git a/db/migrations/00156_fileupload_pool_condition.sql b/db/migrations/00156_fileupload_pool_condition.sql new file mode 100644 index 00000000..3066d217 --- /dev/null +++ b/db/migrations/00156_fileupload_pool_condition.sql @@ -0,0 +1,18 @@ +-- +goose Up +CREATE TYPE fileupload.PoolCondition AS ENUM ( + 'blue', + 'dry', + 'false pool', + 'unknown', + 'green', + 'murky' +); +ALTER TABLE fileupload.pool + ALTER COLUMN condition TYPE fileupload.poolcondition + USING condition::text::fileupload.poolcondition; + +-- +goose Down +ALTER TABLE fileupload.pool + ALTER COLUMN condition TYPE public.poolconditiontype + USING condition::text::poolconditiontype; +DROP TYPE fileupload.PoolCondition; diff --git a/db/query/fileupload/csv.go b/db/query/fileupload/csv.go new file mode 100644 index 00000000..cbf121da --- /dev/null +++ b/db/query/fileupload/csv.go @@ -0,0 +1,47 @@ +package fileupload + +import ( + "context" + + //"github.com/Gleipnir-Technology/bob" + "source.gleipnir.technology/Gleipnir/nidus-sync/db" + //"source.gleipnir.technology/Gleipnir/nidus-sync/db/gen/nidus-sync/public/enum" + //"github.com/Gleipnir-Technology/jet/postgres" + "source.gleipnir.technology/Gleipnir/nidus-sync/db/gen/nidus-sync/fileupload/model" + "source.gleipnir.technology/Gleipnir/nidus-sync/db/gen/nidus-sync/fileupload/table" +) + +func CSVInsert(ctx context.Context, txn db.Tx, m model.Csv) (model.Csv, error) { + statement := table.Csv.INSERT(table.Csv.MutableColumns). + MODEL(m). + RETURNING(table.Csv.AllColumns) + return db.ExecuteOneTx[model.Csv](ctx, txn, statement) +} + +/* +func CommunicationSetStatus(ctx context.Context, txn db.Tx, org_id int64, comm_id int64, status model.Communicationstatus) error { + statement := table.Communication.UPDATE(). + SET( + table.Communication.Status.SET(postgres.NewEnumValue(status.String())), + ). + WHERE(table.Communication.OrganizationID.EQ(postgres.Int(org_id)).AND( + table.Communication.ID.EQ(postgres.Int(comm_id)))) + return db.ExecuteNoneTx(ctx, txn, statement) +} + +func EmailLogFromID(ctx context.Context, id int64) (model.EmailLog, error) { + statement := table.EmailLog.SELECT( + table.EmailLog.AllColumns, + ).FROM(table.EmailLog). + WHERE(table.EmailLog.ID.EQ(postgres.Int(id))) + return db.ExecuteOne[model.EmailLog](ctx, statement) +} +func EmailLogsFromAddress(ctx context.Context, address string) ([]model.EmailLog, error) { + statement := table.EmailLog.SELECT( + table.EmailLog.AllColumns, + ).FROM(table.EmailLog). + WHERE(table.EmailLog.Source.EQ(postgres.String(address)).OR( + table.EmailLog.Destination.EQ(postgres.String(address)))) + return db.ExecuteMany[model.EmailLog](ctx, statement) +} +*/ diff --git a/db/query/fileupload/file.go b/db/query/fileupload/file.go new file mode 100644 index 00000000..bfa77345 --- /dev/null +++ b/db/query/fileupload/file.go @@ -0,0 +1,58 @@ +package fileupload + +import ( + "context" + + "github.com/Gleipnir-Technology/jet/postgres" + "source.gleipnir.technology/Gleipnir/nidus-sync/db" + "source.gleipnir.technology/Gleipnir/nidus-sync/db/gen/nidus-sync/fileupload/model" + "source.gleipnir.technology/Gleipnir/nidus-sync/db/gen/nidus-sync/fileupload/table" +) + +func FileInsert(ctx context.Context, txn db.Tx, m model.File) (model.File, error) { + statement := table.File.INSERT(table.File.MutableColumns). + MODEL(m). + RETURNING(table.File.AllColumns) + return db.ExecuteOneTx[model.File](ctx, txn, statement) +} +func FileUpdateCommitting(ctx context.Context, txn db.Tx, org_id int64, file_id int64, committer_id int64) error { + statement := table.File.UPDATE( + table.File.Status, + table.File.Committer, + ).SET( + table.File.Status.SET(postgres.NewEnumValue(model.Filestatustype_Committing.String())), + table.File.Committer.SET(postgres.Int(committer_id)), + ).WHERE(postgres.AND( + table.File.OrganizationID.EQ(postgres.Int(org_id)), + table.File.ID.EQ(postgres.Int(file_id)), + )) + return db.ExecuteNoneTx(ctx, txn, statement) +} + +/* +func CommunicationSetStatus(ctx context.Context, txn db.Tx, org_id int64, comm_id int64, status model.Communicationstatus) error { + statement := table.Communication.UPDATE(). + SET( + table.Communication.Status.SET(postgres.NewEnumValue(status.String())), + ). + WHERE(table.Communication.OrganizationID.EQ(postgres.Int(org_id)).AND( + table.Communication.ID.EQ(postgres.Int(comm_id)))) + return db.ExecuteNoneTx(ctx, txn, statement) +} + +func EmailLogFromID(ctx context.Context, id int64) (model.EmailLog, error) { + statement := table.EmailLog.SELECT( + table.EmailLog.AllColumns, + ).FROM(table.EmailLog). + WHERE(table.EmailLog.ID.EQ(postgres.Int(id))) + return db.ExecuteOne[model.EmailLog](ctx, statement) +} +func EmailLogsFromAddress(ctx context.Context, address string) ([]model.EmailLog, error) { + statement := table.EmailLog.SELECT( + table.EmailLog.AllColumns, + ).FROM(table.EmailLog). + WHERE(table.EmailLog.Source.EQ(postgres.String(address)).OR( + table.EmailLog.Destination.EQ(postgres.String(address)))) + return db.ExecuteMany[model.EmailLog](ctx, statement) +} +*/ diff --git a/db/query/public/job.go b/db/query/public/job.go index fde6d98e..f2c42f5e 100644 --- a/db/query/public/job.go +++ b/db/query/public/job.go @@ -3,15 +3,25 @@ package public import ( "context" + "github.com/Gleipnir-Technology/jet/postgres" "source.gleipnir.technology/Gleipnir/nidus-sync/db" "source.gleipnir.technology/Gleipnir/nidus-sync/db/gen/nidus-sync/public/model" "source.gleipnir.technology/Gleipnir/nidus-sync/db/gen/nidus-sync/public/table" - //"github.com/Gleipnir-Technology/jet/postgres" ) func JobInsert(ctx context.Context, txn db.Ex, m model.Job) (model.Job, error) { statement := table.Job.INSERT(table.Job.MutableColumns). MODEL(m). RETURNING(table.Job.AllColumns) - return db.ExecuteOne[model.Job](ctx, statement) + return db.ExecuteOneTx[model.Job](ctx, txn, statement) +} +func JobNotify(ctx context.Context, txn db.Ex, channel string, payload string) error { + statement := postgres.RawStatement( + "SELECT pg_notify(#channel, #payload)", + postgres.RawArgs{ + "#channel": channel, + "#payload": payload, + }, + ) + return db.ExecuteNoneTx(ctx, txn, statement) } diff --git a/platform/background/background.go b/platform/background/background.go index d75a1269..9b019e3f 100644 --- a/platform/background/background.go +++ b/platform/background/background.go @@ -3,63 +3,53 @@ package background import ( "context" "fmt" + "strconv" "time" - "github.com/Gleipnir-Technology/bob" - "github.com/aarondl/opt/omit" + "github.com/rs/zerolog/log" "source.gleipnir.technology/Gleipnir/nidus-sync/db" - "source.gleipnir.technology/Gleipnir/nidus-sync/db/enums" "source.gleipnir.technology/Gleipnir/nidus-sync/db/gen/nidus-sync/public/model" - "source.gleipnir.technology/Gleipnir/nidus-sync/db/models" query "source.gleipnir.technology/Gleipnir/nidus-sync/db/query/public" - //"github.com/rs/zerolog/log" ) -func NewAudioTranscode(ctx context.Context, txn bob.Executor, audio_id int32) error { - return newJob(ctx, txn, enums.JobtypeAudioTranscode, audio_id) +func NewAudioTranscode(ctx context.Context, txn db.Ex, audio_id int32) error { + return newJob(ctx, txn, model.Jobtype_AudioTranscode, audio_id) } func NewComplianceMailer(ctx context.Context, txn db.Ex, compliance_report_request_id int32) error { - return newJob2(ctx, txn, model.Jobtype_ComplianceMailerSend, compliance_report_request_id) + return newJob(ctx, txn, model.Jobtype_ComplianceMailerSend, compliance_report_request_id) } -func NewCSVCommit(ctx context.Context, txn bob.Executor, csv_id int32) error { - return newJob(ctx, txn, enums.JobtypeCSVCommit, csv_id) +func NewCSVCommit(ctx context.Context, txn db.Ex, csv_id int32) error { + return newJob(ctx, txn, model.Jobtype_CsvCommit, csv_id) } -func NewCSVImport(ctx context.Context, txn bob.Executor, csv_id int32) error { - return newJob(ctx, txn, enums.JobtypeCSVImport, csv_id) +func NewCSVImport(ctx context.Context, txn db.Ex, csv_id int32) error { + return newJob(ctx, txn, model.Jobtype_CsvImport, csv_id) } -func NewEmailSend(ctx context.Context, txn bob.Executor, email_id int32) error { - return newJob(ctx, txn, enums.JobtypeEmailSend, email_id) +func NewEmailSend(ctx context.Context, txn db.Ex, email_id int32) error { + return newJob(ctx, txn, model.Jobtype_EmailSend, email_id) } -func NewLabelStudioAudioCreate(ctx context.Context, txn bob.Executor, note_audio_id int32) error { - return newJob(ctx, txn, enums.JobtypeLabelStudioAudioCreate, note_audio_id) +func NewLabelStudioAudioCreate(ctx context.Context, txn db.Ex, note_audio_id int32) error { + return newJob(ctx, txn, model.Jobtype_LabelStudioAudioCreate, note_audio_id) } func NewTextRespond(ctx context.Context, txn db.Ex, text_id int32) error { - return newJob2(ctx, txn, model.Jobtype_TextRespond, text_id) + return newJob(ctx, txn, model.Jobtype_TextRespond, text_id) } func NewTextSend(ctx context.Context, txn db.Ex, job_id int32) error { - return newJob2(ctx, txn, model.Jobtype_TextSend, job_id) + return newJob(ctx, txn, model.Jobtype_TextSend, job_id) } -func newJob(ctx context.Context, txn bob.Executor, t enums.Jobtype, id int32) error { - _, err := models.Jobs.Insert(&models.JobSetter{ - Created: omit.From(time.Now()), - // ID - Type: omit.From(t), - RowID: omit.From(id), - }).One(ctx, txn) - if err != nil { - return fmt.Errorf("insert job: %w", err) - } - return nil -} -func newJob2(ctx context.Context, txn db.Ex, t model.Jobtype, id int32) error { +func newJob(ctx context.Context, txn db.Ex, t model.Jobtype, id int32) error { job := model.Job{ Created: time.Now(), Type: t, RowID: id, } - _, err := query.JobInsert(ctx, txn, job) + j, err := query.JobInsert(ctx, txn, job) if err != nil { return fmt.Errorf("insert job: %w", err) } + err = query.JobNotify(ctx, txn, "new_job", strconv.Itoa(int(j.ID))) + if err != nil { + return fmt.Errorf("notify job: %w", err) + } + log.Debug().Int32("id", j.ID).Int32("row_id", id).Msg("created job, added notify") return nil } diff --git a/platform/email/email.go b/platform/email/email.go index 0dcecf97..38cebe1e 100644 --- a/platform/email/email.go +++ b/platform/email/email.go @@ -114,7 +114,7 @@ func sendEmailBegin(ctx context.Context, source string, destination string, temp if err != nil { return fmt.Errorf("Failed to store email log: %w", err) } - return background.NewEmailSend(ctx, db.PGInstance.BobDB, *e) + return background.NewEmailSend(ctx, db.PGInstance.PGXPool, *e) } func sendEmailComplete(ctx context.Context, email_id int32) error { bxn := db.PGInstance.BobDB diff --git a/platform/text/text.go b/platform/text/text.go index 18c0639e..deeaa920 100644 --- a/platform/text/text.go +++ b/platform/text/text.go @@ -65,18 +65,20 @@ func HandleTextMessage(ctx context.Context, source string, destination string, c if err := txn.Commit(ctx); err != nil { return fmt.Errorf("commit: %w", err) } + log.Debug().Msg("commit handle text message") return err } func respondText(ctx context.Context, log_id int32) error { txn, err := db.BeginTxn(ctx) + log.Debug().Msg("respond text txn begin") if err != nil { return fmt.Errorf("begin tx: %w", err) } defer lint.LogOnErrRollback(txn.Rollback, ctx, "rollback") l, err := querycomms.TextLogFromID(ctx, txn, int64(log_id)) if err != nil { - return fmt.Errorf("find comms: %w", err) + return fmt.Errorf("find comms %d: %w", log_id, err) } src, err := ParsePhoneNumber(l.Source) if err != nil { diff --git a/platform/upload.go b/platform/upload.go index 514bc9dd..66900ff9 100644 --- a/platform/upload.go +++ b/platform/upload.go @@ -10,13 +10,13 @@ import ( "github.com/Gleipnir-Technology/bob/dialect/psql" "github.com/Gleipnir-Technology/bob/dialect/psql/sm" "github.com/Gleipnir-Technology/bob/dialect/psql/um" - "github.com/aarondl/opt/omit" - "github.com/aarondl/opt/omitnull" "github.com/rs/zerolog/log" "github.com/stephenafamo/scan" "source.gleipnir.technology/Gleipnir/nidus-sync/db" "source.gleipnir.technology/Gleipnir/nidus-sync/db/enums" + modelfileupload "source.gleipnir.technology/Gleipnir/nidus-sync/db/gen/nidus-sync/fileupload/model" "source.gleipnir.technology/Gleipnir/nidus-sync/db/models" + queryfileupload "source.gleipnir.technology/Gleipnir/nidus-sync/db/query/fileupload" "source.gleipnir.technology/Gleipnir/nidus-sync/lint" "source.gleipnir.technology/Gleipnir/nidus-sync/platform/background" "source.gleipnir.technology/Gleipnir/nidus-sync/platform/file" @@ -82,34 +82,34 @@ func GetUploadDetail(ctx context.Context, organization_id int32, file_id int32) return nil, errors.New("No idea what to do with upload type") } -func NewUpload(ctx context.Context, u User, upload file.Upload, t enums.FileuploadCsvtype) (*int32, error) { - txn, err := db.PGInstance.BobDB.BeginTx(ctx, nil) +func NewUpload(ctx context.Context, u User, upload file.Upload, t modelfileupload.Csvtype) (*int32, error) { + txn, err := db.BeginTxn(ctx) if err != nil { return nil, fmt.Errorf("Failed to begin transaction: %w", err) } defer lint.LogOnErrRollback(txn.Rollback, ctx, "rollback") - file, err := models.FileuploadFiles.Insert(&models.FileuploadFileSetter{ - ContentType: omit.From(upload.ContentType), - Created: omit.From(time.Now()), - CreatorID: omit.From(int32(u.ID)), - Deleted: omitnull.FromPtr[time.Time](nil), - Error: omit.From(""), - Name: omit.From(upload.Name), - OrganizationID: omit.From(u.Organization.ID), - Status: omit.From(enums.FileuploadFilestatustypeUploaded), - SizeBytes: omit.From(int32(upload.SizeBytes)), - FileUUID: omit.From(upload.UUID), - }).One(ctx, txn) + file, err := queryfileupload.FileInsert(ctx, txn, modelfileupload.File{ + ContentType: upload.ContentType, + Created: time.Now(), + CreatorID: int32(u.ID), + Deleted: nil, + Error: "", + Name: upload.Name, + OrganizationID: u.Organization.ID, + Status: modelfileupload.Filestatustype_Uploaded, + SizeBytes: int32(upload.SizeBytes), + FileUUID: upload.UUID, + }) if err != nil { return nil, fmt.Errorf("Failed to create file upload: %w", err) } - _, err = models.FileuploadCSVS.Insert(&models.FileuploadCSVSetter{ - Committed: omitnull.FromPtr[time.Time](nil), - FileID: omit.From(file.ID), - Rowcount: omit.From(int32(0)), - Type: omit.From(t), - }).One(ctx, txn) + _, err = queryfileupload.CSVInsert(ctx, txn, modelfileupload.Csv{ + Committed: nil, + FileID: file.ID, + Rowcount: 0, + Type: t, + }) if err != nil { return nil, fmt.Errorf("Failed to create csv: %w", err) } @@ -124,19 +124,13 @@ func NewUpload(ctx context.Context, u User, upload file.Upload, t enums.Fileuplo return &file.ID, nil } func UploadCommit(ctx context.Context, org Organization, file_id int32, committer User) error { - txn, err := db.PGInstance.BobDB.BeginTx(ctx, nil) + txn, err := db.BeginTxn(ctx) if err != nil { return fmt.Errorf("Failed to begin transaction: %w", err) } defer lint.LogOnErrRollback(txn.Rollback, ctx, "rollback") - _, err = psql.Update( - um.Table(models.FileuploadFiles.Alias()), - um.SetCol("status").ToArg("committing"), - um.SetCol("committer").ToArg(committer.ID), - um.Where(psql.Quote("id").EQ(psql.Arg(file_id))), - um.Where(psql.Quote("organization_id").EQ(psql.Arg(org.ID))), - ).Exec(ctx, txn) + err = queryfileupload.FileUpdateCommitting(ctx, txn, int64(org.ID), int64(file_id), int64(committer.ID)) if err != nil { return fmt.Errorf("update upload: %w", err) } diff --git a/resource/upload.go b/resource/upload.go index ba2be6b7..43470db5 100644 --- a/resource/upload.go +++ b/resource/upload.go @@ -8,7 +8,7 @@ import ( "github.com/gorilla/mux" "github.com/rs/zerolog/log" - "source.gleipnir.technology/Gleipnir/nidus-sync/db/enums" + modelfileupload "source.gleipnir.technology/Gleipnir/nidus-sync/db/gen/nidus-sync/fileupload/model" nhttp "source.gleipnir.technology/Gleipnir/nidus-sync/http" "source.gleipnir.technology/Gleipnir/nidus-sync/platform" "source.gleipnir.technology/Gleipnir/nidus-sync/platform/file" @@ -98,7 +98,7 @@ func (res *uploadR) PoolFlyoverCreate(ctx context.Context, r *http.Request, u pl return "", nhttp.NewErrorStatus(http.StatusBadRequest, "You must only submit one file at a time") } upload := uploads[0] - saved_upload, err := platform.NewUpload(r.Context(), u, upload, enums.FileuploadCsvtypeFlyover) + saved_upload, err := platform.NewUpload(r.Context(), u, upload, modelfileupload.Csvtype_Flyover) if err != nil { return "", nhttp.NewError("Failed to create new pool: %w", err) } @@ -112,7 +112,7 @@ func (res *uploadR) PoolCustomCreate(ctx context.Context, r *http.Request, u pla return "", nhttp.NewErrorStatus(http.StatusBadRequest, "You must only submit one file at a time") } upload := uploads[0] - pool_upload, err := platform.NewUpload(r.Context(), u, upload, enums.FileuploadCsvtypePoollist) + pool_upload, err := platform.NewUpload(r.Context(), u, upload, modelfileupload.Csvtype_PoolList) if err != nil { return "", nhttp.NewError("Failed to create new pool: %w", err) } diff --git a/ts/rmo/view/ReportSubmitted.vue b/ts/rmo/view/ReportSubmitted.vue index 1750b021..52f66fc6 100644 --- a/ts/rmo/view/ReportSubmitted.vue +++ b/ts/rmo/view/ReportSubmitted.vue @@ -291,9 +291,11 @@ import { useRouter } from "vue-router"; import ErrorNotification from "@/rmo/components/ErrorNotification.vue"; import Tooltip from "@/components/Tooltip.vue"; import { formatReportID } from "@/format"; +import { log } from "@/log"; import { useRoutes } from "@/rmo/route/use"; import { useStoreDistrict } from "@/rmo/store/district"; import { useStorePublicReport } from "@/store/publicreport"; +import { useStoreResource } from "@/store/resource"; import type { District, PublicReport } from "@/type/api"; interface FormData { @@ -355,7 +357,8 @@ const handleSubmit = async () => { if (response.ok) { router.push(routes.RegisterNotificationsComplete(props.id)); } else { - errorMessage.value = "Something went wrong. Your request could not be completed. Please try again."; + errorMessage.value = + "Something went wrong. Your request could not be completed. Please try again."; } } catch (error) { errorMessage.value = @@ -365,5 +368,8 @@ const handleSubmit = async () => { } }; -onMounted(() => {}); +onMounted(async () => { + const report = await storePublicReport.byID(props.id); + log.info("report", report); +});