diff --git a/comms/email/websocket.go b/comms/email/websocket.go index 0e85d7c6..e045b75d 100644 --- a/comms/email/websocket.go +++ b/comms/email/websocket.go @@ -2,8 +2,10 @@ package email import ( "context" + "encoding/base64" "errors" "fmt" + "net/http" "time" "github.com/gorilla/websocket" @@ -12,11 +14,11 @@ import ( var FORWARDEMAIL_WS_API = "wss://api.forwardemail.net/v1/ws" -func StartWebsocket(ctx context.Context, api_token string) { - +func StartWebsocket(ctx context.Context, username, password string) { + var err error var conn *websocket.Conn for { - err := ensureConnected(conn, api_token) + conn, err = ensureConnected(conn, username, password) if err != nil { log.Error().Err(err).Msg("Bailing on email websocket") return @@ -40,23 +42,28 @@ func StartWebsocket(ctx context.Context, api_token string) { } } -func ensureConnected(conn *websocket.Conn, api_token string) error { +func ensureConnected(conn *websocket.Conn, username, password string) (*websocket.Conn, error) { if conn != nil { - return nil + return conn, nil } - url := FORWARDEMAIL_WS_API + "?token=" + api_token + url := FORWARDEMAIL_WS_API for { - new_conn, _, err := websocket.DefaultDialer.Dial(url, nil) + encoded := base64.StdEncoding.EncodeToString([]byte(username + ":" + password)) + h := http.Header{} + h.Add("Authorization", "Basic "+encoded) + new_conn, _, err := websocket.DefaultDialer.Dial(url, h) if err == nil { + if new_conn == nil { + log.Error().Msg("new connection is nil.") + return nil, fmt.Errorf("nil new connection") + } log.Info().Msg("Connected to mail websocket") - *conn = *new_conn - return nil + return new_conn, nil } if errors.Is(err, websocket.ErrBadHandshake) { - return fmt.Errorf("Bad handshake connecting to email websocket, bailing.") + return nil, fmt.Errorf("Bad handshake connecting to email websocket, bailing.") } log.Error().Err(err).Str("url", url).Msg("Error connecting to WebSocket") time.Sleep(3 * time.Second) - } } diff --git a/platform/start.go b/platform/start.go index 3762d455..93069d31 100644 --- a/platform/start.go +++ b/platform/start.go @@ -14,8 +14,9 @@ import ( "github.com/Gleipnir-Technology/nidus-sync/db/enums" "github.com/Gleipnir-Technology/nidus-sync/db/models" //"github.com/Gleipnir-Technology/nidus-sync/platform/background" + commsemail "github.com/Gleipnir-Technology/nidus-sync/comms/email" "github.com/Gleipnir-Technology/nidus-sync/platform/csv" - "github.com/Gleipnir-Technology/nidus-sync/platform/email" + platformemail "github.com/Gleipnir-Technology/nidus-sync/platform/email" "github.com/Gleipnir-Technology/nidus-sync/platform/file" "github.com/Gleipnir-Technology/nidus-sync/platform/geocode" "github.com/Gleipnir-Technology/nidus-sync/platform/mailer" @@ -30,7 +31,7 @@ var waitGroup sync.WaitGroup var newOAuthTokenChannel chan struct{} func StartAll(ctx context.Context) error { - err := email.LoadTemplates() + err := platformemail.LoadTemplates() if err != nil { return fmt.Errorf("Failed to load email templates: %w", err) } @@ -66,6 +67,12 @@ func StartAll(ctx context.Context) error { listenForJobs(ctx) log.Debug().Msg("Exiting job listener goroutine") }() + waitGroup.Add(1) + go func() { + defer waitGroup.Done() + commsemail.StartWebsocket(ctx, config.ForwardEmailRMOUsername, config.ForwardEmailRMOPassword) + log.Debug().Msg("Exiting email websocket") + }() err = addWaitingJobs(ctx) if err != nil { @@ -115,7 +122,7 @@ func handleJob(ctx context.Context, job *models.Job) error { case enums.JobtypeLabelStudioAudioCreate: return jobLabelStudioAudioCreate(ctx, job.RowID) case enums.JobtypeEmailSend: - return email.Job(ctx, job.RowID) + return platformemail.Job(ctx, job.RowID) case enums.JobtypeTextRespond: return text.JobRespond(ctx, job.RowID) case enums.JobtypeTextSend: