Create clean shutdown logic for http worker
This commit is contained in:
parent
a08cd87813
commit
cf01c8c5c6
4 changed files with 76 additions and 27 deletions
|
|
@ -16,7 +16,7 @@ tmp_dir = "tmp"
|
|||
include_dir = []
|
||||
include_ext = ["go"]
|
||||
include_file = []
|
||||
kill_delay = "0s"
|
||||
kill_delay = "5s"
|
||||
log = "build-errors.log"
|
||||
poll = false
|
||||
poll_interval = 0
|
||||
|
|
@ -24,7 +24,7 @@ tmp_dir = "tmp"
|
|||
pre_cmd = []
|
||||
rerun = false
|
||||
rerun_delay = 500
|
||||
send_interrupt = false
|
||||
send_interrupt = true
|
||||
stop_on_error = true
|
||||
|
||||
[color]
|
||||
|
|
@ -40,7 +40,7 @@ tmp_dir = "tmp"
|
|||
time = true
|
||||
|
||||
[misc]
|
||||
clean_on_exit = true
|
||||
clean_on_exit = false
|
||||
|
||||
[proxy]
|
||||
app_port = 0
|
||||
|
|
|
|||
11
arcgis.go
11
arcgis.go
|
|
@ -216,5 +216,14 @@ func redirectURL() string {
|
|||
}
|
||||
|
||||
// This is a goroutine that is in charge of getting Fieldseeker data and keeping it fresh.
|
||||
func refreshFieldseekerData(newOauthCh <-chan int, done <-chan struct{}) {
|
||||
func refreshFieldseekerData(ctx context.Context, newOauthCh <-chan int) {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
slog.Info("Exiting refresh worker")
|
||||
return
|
||||
case id := <-newOauthCh:
|
||||
slog.Info("Adding oauth to background work", slog.Int("oauth id", id))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
22
database.go
22
database.go
|
|
@ -7,7 +7,7 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
"io/fs"
|
||||
"log"
|
||||
"log/slog"
|
||||
"sync"
|
||||
|
||||
//"github.com/georgysavva/scany/v2/pgxscan"
|
||||
|
|
@ -33,7 +33,7 @@ var (
|
|||
)
|
||||
|
||||
func doMigrations(connection_string string) error {
|
||||
log.Println("Connecting to database at", connection_string)
|
||||
slog.Info("Connecting to database", slog.String("dsn", connection_string))
|
||||
db, err := sql.Open("pgx", connection_string)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to open database connection: %w", err)
|
||||
|
|
@ -44,7 +44,7 @@ func doMigrations(connection_string string) error {
|
|||
if err := row.Scan(&val); err != nil {
|
||||
return fmt.Errorf("Failed to get database version query result: %w", err)
|
||||
}
|
||||
log.Printf("Connected to: %s", val)
|
||||
slog.Info("Connected to database", slog.String("version", val))
|
||||
|
||||
fsys, err := fs.Sub(embedMigrations, "migrations")
|
||||
if err != nil {
|
||||
|
|
@ -60,17 +60,17 @@ func doMigrations(connection_string string) error {
|
|||
if err != nil {
|
||||
return fmt.Errorf("Faield to get goose versions: %w", err)
|
||||
}
|
||||
log.Printf("Current version %d, need to be at version %d", current, target)
|
||||
slog.Info("Migration status", slog.Int("current", int(current)), slog.Int("target", int(target)))
|
||||
results, err := provider.Up(context.Background())
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to run migrations: %w", err)
|
||||
}
|
||||
if len(results) > 0 {
|
||||
for _, r := range results {
|
||||
log.Printf("Migration %d %s", r.Source.Version, r.Direction)
|
||||
slog.Info("Migration done", slog.Int("version", int(r.Source.Version)), slog.String("direction", r.Direction))
|
||||
}
|
||||
} else {
|
||||
log.Println("No migrations necessary.")
|
||||
slog.Info("No migrations necessary.")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
@ -85,13 +85,13 @@ func initializeDatabase(ctx context.Context, uri string) error {
|
|||
}
|
||||
if *needs {
|
||||
//return errors.New(fmt.Sprintf("Must migrate database before connecting: %t", *needs))
|
||||
log.Println("Handling database migrations")
|
||||
slog.Info("Handling database migrations")
|
||||
err = doMigrations(uri)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to handle migrations: %v", err)
|
||||
}
|
||||
} else {
|
||||
log.Println("No database migrations necessary")
|
||||
slog.Info("No database migrations necessary")
|
||||
}
|
||||
|
||||
pgOnce.Do(func() {
|
||||
|
|
@ -110,12 +110,12 @@ func initializeDatabase(ctx context.Context, uri string) error {
|
|||
if err != nil {
|
||||
return fmt.Errorf("Failed to get database current: %w", err)
|
||||
}
|
||||
log.Println("Connected to", current)
|
||||
slog.Info("Connected to database", slog.String("database", current))
|
||||
return nil
|
||||
}
|
||||
|
||||
func needsMigrations(connection_string string) (*bool, error) {
|
||||
log.Println("Connecting to database at", connection_string)
|
||||
slog.Info("Connecting to database", slog.String("dsn", connection_string))
|
||||
db, err := sql.Open("pgx", connection_string)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Failed to open database connection: %w", err)
|
||||
|
|
@ -126,7 +126,7 @@ func needsMigrations(connection_string string) (*bool, error) {
|
|||
if err := row.Scan(&val); err != nil {
|
||||
return nil, fmt.Errorf("Failed to get database version query result: %w", err)
|
||||
}
|
||||
log.Printf("Connected to: %s", val)
|
||||
slog.Info("Connected to database", slog.String("dsn", val))
|
||||
|
||||
fsys, err := fs.Sub(embedMigrations, "migrations")
|
||||
if err != nil {
|
||||
|
|
|
|||
64
main.go
64
main.go
|
|
@ -2,9 +2,12 @@ package main
|
|||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/signal"
|
||||
"sync"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/alexedwards/scs/pgxstore"
|
||||
|
|
@ -20,17 +23,17 @@ var BaseURL, ClientID, ClientSecret string
|
|||
func main() {
|
||||
ClientID = os.Getenv("ARCGIS_CLIENT_ID")
|
||||
if ClientID == "" {
|
||||
log.Println("You must specify a non-empty ARCGIS_CLIENT_ID")
|
||||
slog.Error("You must specify a non-empty ARCGIS_CLIENT_ID")
|
||||
os.Exit(1)
|
||||
}
|
||||
ClientSecret = os.Getenv("ARCGIS_CLIENT_SECRET")
|
||||
if ClientSecret == "" {
|
||||
log.Println("You must specify a non-empty ARCGIS_CLIENT_SECRET")
|
||||
slog.Error("You must specify a non-empty ARCGIS_CLIENT_SECRET")
|
||||
os.Exit(1)
|
||||
}
|
||||
BaseURL = os.Getenv("BASE_URL")
|
||||
if BaseURL == "" {
|
||||
log.Println("You must specify a non-empty BASE_URL")
|
||||
slog.Error("You must specify a non-empty BASE_URL")
|
||||
os.Exit(1)
|
||||
}
|
||||
bind := os.Getenv("BIND")
|
||||
|
|
@ -39,14 +42,14 @@ func main() {
|
|||
}
|
||||
pg_dsn := os.Getenv("POSTGRES_DSN")
|
||||
if pg_dsn == "" {
|
||||
log.Println("You must specify a non-empty POSTGRES_DSN")
|
||||
slog.Error("You must specify a non-empty POSTGRES_DSN")
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
log.Println("Starting...")
|
||||
slog.Info("Starting...")
|
||||
err := initializeDatabase(context.TODO(), pg_dsn)
|
||||
if err != nil {
|
||||
log.Printf("Failed to connect to database: %v", err)
|
||||
slog.Error("Failed to connect to database", slog.String("err", err.Error()))
|
||||
os.Exit(2)
|
||||
}
|
||||
sessionManager = scs.New()
|
||||
|
|
@ -76,9 +79,46 @@ func main() {
|
|||
localFS := http.Dir("./static")
|
||||
FileServer(r, "/static", localFS, embeddedStaticFS, "static")
|
||||
|
||||
newTokenChannel := make(chan int)
|
||||
endChannel := make(chan struct{})
|
||||
go refreshFieldseekerData(newTokenChannel, endChannel)
|
||||
log.Printf("Serving on %s", bind)
|
||||
log.Fatal(http.ListenAndServe(bind, r))
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
newTokenChannel := make(chan int, 10)
|
||||
|
||||
var waitGroup sync.WaitGroup
|
||||
|
||||
waitGroup.Add(1)
|
||||
go func() {
|
||||
defer waitGroup.Done()
|
||||
refreshFieldseekerData(ctx, newTokenChannel)
|
||||
}()
|
||||
|
||||
server := &http.Server{
|
||||
Addr: bind,
|
||||
Handler: r,
|
||||
}
|
||||
go func() {
|
||||
slog.Info("Serving HTTP requests", slog.String("address", bind))
|
||||
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
|
||||
slog.Error("HTTP Server Error", slog.String("err", err.Error()))
|
||||
}
|
||||
}()
|
||||
|
||||
// Wait for the interrupt signal to gracefully shut down
|
||||
signalCh := make(chan os.Signal, 1)
|
||||
signal.Notify(signalCh, syscall.SIGINT, syscall.SIGTERM)
|
||||
<-signalCh
|
||||
|
||||
slog.Info("Received shutdown signal, shutting down...")
|
||||
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer shutdownCancel()
|
||||
|
||||
if err := server.Shutdown(shutdownCtx); err != nil {
|
||||
slog.Error("HTTP server shutdown error", slog.String("err", err.Error()))
|
||||
}
|
||||
|
||||
cancel()
|
||||
|
||||
waitGroup.Wait()
|
||||
|
||||
slog.Info("Shutdown complete")
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue