Get email websocket connection working

This commit is contained in:
Eli Ribble 2026-05-13 16:50:31 +00:00
parent 17c6bf8d50
commit 085935fa66
No known key found for this signature in database
2 changed files with 28 additions and 14 deletions

View file

@ -2,8 +2,10 @@ package email
import ( import (
"context" "context"
"encoding/base64"
"errors" "errors"
"fmt" "fmt"
"net/http"
"time" "time"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
@ -12,11 +14,11 @@ import (
var FORWARDEMAIL_WS_API = "wss://api.forwardemail.net/v1/ws" var FORWARDEMAIL_WS_API = "wss://api.forwardemail.net/v1/ws"
func StartWebsocket(ctx context.Context, api_token string) { func StartWebsocket(ctx context.Context, username, password string) {
var err error
var conn *websocket.Conn var conn *websocket.Conn
for { for {
err := ensureConnected(conn, api_token) conn, err = ensureConnected(conn, username, password)
if err != nil { if err != nil {
log.Error().Err(err).Msg("Bailing on email websocket") log.Error().Err(err).Msg("Bailing on email websocket")
return return
@ -40,23 +42,28 @@ func StartWebsocket(ctx context.Context, api_token string) {
} }
} }
func ensureConnected(conn *websocket.Conn, api_token string) error { func ensureConnected(conn *websocket.Conn, username, password string) (*websocket.Conn, error) {
if conn != nil { if conn != nil {
return nil return conn, nil
} }
url := FORWARDEMAIL_WS_API + "?token=" + api_token url := FORWARDEMAIL_WS_API
for { for {
new_conn, _, err := websocket.DefaultDialer.Dial(url, nil) encoded := base64.StdEncoding.EncodeToString([]byte(username + ":" + password))
h := http.Header{}
h.Add("Authorization", "Basic "+encoded)
new_conn, _, err := websocket.DefaultDialer.Dial(url, h)
if err == nil { if err == nil {
if new_conn == nil {
log.Error().Msg("new connection is nil.")
return nil, fmt.Errorf("nil new connection")
}
log.Info().Msg("Connected to mail websocket") log.Info().Msg("Connected to mail websocket")
*conn = *new_conn return new_conn, nil
return nil
} }
if errors.Is(err, websocket.ErrBadHandshake) { if errors.Is(err, websocket.ErrBadHandshake) {
return fmt.Errorf("Bad handshake connecting to email websocket, bailing.") return nil, fmt.Errorf("Bad handshake connecting to email websocket, bailing.")
} }
log.Error().Err(err).Str("url", url).Msg("Error connecting to WebSocket") log.Error().Err(err).Str("url", url).Msg("Error connecting to WebSocket")
time.Sleep(3 * time.Second) time.Sleep(3 * time.Second)
} }
} }

View file

@ -14,8 +14,9 @@ import (
"github.com/Gleipnir-Technology/nidus-sync/db/enums" "github.com/Gleipnir-Technology/nidus-sync/db/enums"
"github.com/Gleipnir-Technology/nidus-sync/db/models" "github.com/Gleipnir-Technology/nidus-sync/db/models"
//"github.com/Gleipnir-Technology/nidus-sync/platform/background" //"github.com/Gleipnir-Technology/nidus-sync/platform/background"
commsemail "github.com/Gleipnir-Technology/nidus-sync/comms/email"
"github.com/Gleipnir-Technology/nidus-sync/platform/csv" "github.com/Gleipnir-Technology/nidus-sync/platform/csv"
"github.com/Gleipnir-Technology/nidus-sync/platform/email" platformemail "github.com/Gleipnir-Technology/nidus-sync/platform/email"
"github.com/Gleipnir-Technology/nidus-sync/platform/file" "github.com/Gleipnir-Technology/nidus-sync/platform/file"
"github.com/Gleipnir-Technology/nidus-sync/platform/geocode" "github.com/Gleipnir-Technology/nidus-sync/platform/geocode"
"github.com/Gleipnir-Technology/nidus-sync/platform/mailer" "github.com/Gleipnir-Technology/nidus-sync/platform/mailer"
@ -30,7 +31,7 @@ var waitGroup sync.WaitGroup
var newOAuthTokenChannel chan struct{} var newOAuthTokenChannel chan struct{}
func StartAll(ctx context.Context) error { func StartAll(ctx context.Context) error {
err := email.LoadTemplates() err := platformemail.LoadTemplates()
if err != nil { if err != nil {
return fmt.Errorf("Failed to load email templates: %w", err) return fmt.Errorf("Failed to load email templates: %w", err)
} }
@ -66,6 +67,12 @@ func StartAll(ctx context.Context) error {
listenForJobs(ctx) listenForJobs(ctx)
log.Debug().Msg("Exiting job listener goroutine") log.Debug().Msg("Exiting job listener goroutine")
}() }()
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
commsemail.StartWebsocket(ctx, config.ForwardEmailRMOUsername, config.ForwardEmailRMOPassword)
log.Debug().Msg("Exiting email websocket")
}()
err = addWaitingJobs(ctx) err = addWaitingJobs(ctx)
if err != nil { if err != nil {
@ -115,7 +122,7 @@ func handleJob(ctx context.Context, job *models.Job) error {
case enums.JobtypeLabelStudioAudioCreate: case enums.JobtypeLabelStudioAudioCreate:
return jobLabelStudioAudioCreate(ctx, job.RowID) return jobLabelStudioAudioCreate(ctx, job.RowID)
case enums.JobtypeEmailSend: case enums.JobtypeEmailSend:
return email.Job(ctx, job.RowID) return platformemail.Job(ctx, job.RowID)
case enums.JobtypeTextRespond: case enums.JobtypeTextRespond:
return text.JobRespond(ctx, job.RowID) return text.JobRespond(ctx, job.RowID)
case enums.JobtypeTextSend: case enums.JobtypeTextSend: