Fix email websocket to receive new mail events
We aren't doing much with them yet, but we do receive them.
This commit is contained in:
parent
df359f59bb
commit
70f78e4ae5
2 changed files with 89 additions and 6 deletions
|
|
@ -3,9 +3,12 @@ package email
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/base64"
|
"encoding/base64"
|
||||||
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"net/mail"
|
||||||
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/gorilla/websocket"
|
"github.com/gorilla/websocket"
|
||||||
|
|
@ -14,6 +17,47 @@ import (
|
||||||
|
|
||||||
var FORWARDEMAIL_WS_API = "wss://api.forwardemail.net/v1/ws"
|
var FORWARDEMAIL_WS_API = "wss://api.forwardemail.net/v1/ws"
|
||||||
|
|
||||||
|
const (
|
||||||
|
EventTypeConnected = "connected"
|
||||||
|
EventTypeNewMessage = "newMessage"
|
||||||
|
EventTypePing = "ping"
|
||||||
|
)
|
||||||
|
|
||||||
|
type EventBase struct {
|
||||||
|
Event string `json:"event"`
|
||||||
|
}
|
||||||
|
type EventConnected struct {
|
||||||
|
EventBase
|
||||||
|
Alias string `json:"aliasId"`
|
||||||
|
}
|
||||||
|
type EventNewMessage struct {
|
||||||
|
EventBase
|
||||||
|
Mailbox string `json:"mailbox"`
|
||||||
|
Message Message `json:"message"`
|
||||||
|
Timestamp int64 `json:"timestamp"`
|
||||||
|
}
|
||||||
|
type Message struct {
|
||||||
|
FolderPath string `json:"folder_path"`
|
||||||
|
Flags []string `json:"flags"`
|
||||||
|
IsUnread bool `json:"is_unread"`
|
||||||
|
IsFlagged bool `json:"is_flagged"`
|
||||||
|
IsDeleted bool `json:"is_deleted"`
|
||||||
|
IsDraft bool `json:"is_draft"`
|
||||||
|
IsEncrypted bool `json:"is_encrypted"`
|
||||||
|
EML string `json:"eml"`
|
||||||
|
Object string `json:"object"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m Message) ParseEML() (mail.Message, error) {
|
||||||
|
result, err := mail.ReadMessage(strings.NewReader(m.EML))
|
||||||
|
if err != nil {
|
||||||
|
return mail.Message{}, fmt.Errorf("read message: %w", err)
|
||||||
|
}
|
||||||
|
if result == nil {
|
||||||
|
return mail.Message{}, fmt.Errorf("nil result")
|
||||||
|
}
|
||||||
|
return *result, nil
|
||||||
|
}
|
||||||
func StartWebsocket(ctx context.Context, username, password string) {
|
func StartWebsocket(ctx context.Context, username, password string) {
|
||||||
var err error
|
var err error
|
||||||
var conn *websocket.Conn
|
var conn *websocket.Conn
|
||||||
|
|
@ -30,18 +74,57 @@ func StartWebsocket(ctx context.Context, username, password string) {
|
||||||
// Read message
|
// Read message
|
||||||
message_type, message, err := conn.ReadMessage()
|
message_type, message, err := conn.ReadMessage()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if !websocket.IsCloseError(err, websocket.CloseNormalClosure) {
|
if websocket.IsCloseError(err, websocket.CloseNormalClosure) {
|
||||||
conn = nil
|
conn = nil
|
||||||
|
log.Info().Msg("email websocket closed, waiting and reconnecting")
|
||||||
|
time.Sleep(10000)
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
log.Error().Err(err).Msg("Error reading message")
|
log.Error().Err(err).Msg("Error reading message")
|
||||||
}
|
}
|
||||||
|
err = handleMessage(ctx, message_type, message)
|
||||||
// Process and log the message
|
if err != nil {
|
||||||
log.Info().Int("message_type", message_type).Bytes("message", message).Msg("Got email notification")
|
log.Error().Err(err).Int("type", message_type).Str("msg", string(message)).Msg("Failed to handle email event")
|
||||||
|
continue
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func handleMessage(ctx context.Context, message_type int, message []byte) error {
|
||||||
|
var err error
|
||||||
|
var event_base EventBase
|
||||||
|
err = json.Unmarshal(message, &event_base)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("unmarshal base message: %w", err)
|
||||||
|
}
|
||||||
|
switch event_base.Event {
|
||||||
|
case EventTypeConnected:
|
||||||
|
var event_connected EventConnected
|
||||||
|
err = json.Unmarshal(message, &event_connected)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("unmarshal connected message: %w", err)
|
||||||
|
}
|
||||||
|
log.Info().Str("alias", event_connected.Alias).Msg("Connection to email websocket established")
|
||||||
|
return nil
|
||||||
|
case EventTypeNewMessage:
|
||||||
|
var event_new_message EventNewMessage
|
||||||
|
err = json.Unmarshal(message, &event_new_message)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("unmarshal connected message: %w", err)
|
||||||
|
}
|
||||||
|
msg, err := event_new_message.Message.ParseEML()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("parse EML: %w", err)
|
||||||
|
}
|
||||||
|
log.Info().Str("from", msg.Header.Get("From")).Str("subject", msg.Header.Get("Subject")).Str("date", msg.Header.Get("Date")).Msg("new email")
|
||||||
|
return nil
|
||||||
|
case EventTypePing:
|
||||||
|
return nil
|
||||||
|
default:
|
||||||
|
return fmt.Errorf("unknown event type: '%s'", event_base.Event)
|
||||||
|
}
|
||||||
|
}
|
||||||
func ensureConnected(conn *websocket.Conn, username, password string) (*websocket.Conn, error) {
|
func ensureConnected(conn *websocket.Conn, username, password string) (*websocket.Conn, error) {
|
||||||
if conn != nil {
|
if conn != nil {
|
||||||
return conn, nil
|
return conn, nil
|
||||||
|
|
@ -57,7 +140,7 @@ func ensureConnected(conn *websocket.Conn, username, password string) (*websocke
|
||||||
log.Error().Msg("new connection is nil.")
|
log.Error().Msg("new connection is nil.")
|
||||||
return nil, fmt.Errorf("nil new connection")
|
return nil, fmt.Errorf("nil new connection")
|
||||||
}
|
}
|
||||||
log.Info().Msg("Connected to mail websocket")
|
log.Info().Msg("Connection to email websocket begun")
|
||||||
return new_conn, nil
|
return new_conn, nil
|
||||||
}
|
}
|
||||||
if errors.Is(err, websocket.ErrBadHandshake) {
|
if errors.Is(err, websocket.ErrBadHandshake) {
|
||||||
|
|
|
||||||
|
|
@ -11,7 +11,7 @@ import (
|
||||||
modelpublicreport "github.com/Gleipnir-Technology/nidus-sync/db/gen/nidus-sync/publicreport/model"
|
modelpublicreport "github.com/Gleipnir-Technology/nidus-sync/db/gen/nidus-sync/publicreport/model"
|
||||||
nhttp "github.com/Gleipnir-Technology/nidus-sync/http"
|
nhttp "github.com/Gleipnir-Technology/nidus-sync/http"
|
||||||
"github.com/Gleipnir-Technology/nidus-sync/platform"
|
"github.com/Gleipnir-Technology/nidus-sync/platform"
|
||||||
//"github.com/rs/zerolog/log":q
|
//"github.com/rs/zerolog/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
type communicationR struct {
|
type communicationR struct {
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue