diff --git a/comms/email/websocket.go b/comms/email/websocket.go index e045b75d..e2e68480 100644 --- a/comms/email/websocket.go +++ b/comms/email/websocket.go @@ -3,9 +3,12 @@ package email import ( "context" "encoding/base64" + "encoding/json" "errors" "fmt" "net/http" + "net/mail" + "strings" "time" "github.com/gorilla/websocket" @@ -14,6 +17,47 @@ import ( var FORWARDEMAIL_WS_API = "wss://api.forwardemail.net/v1/ws" +const ( + EventTypeConnected = "connected" + EventTypeNewMessage = "newMessage" + EventTypePing = "ping" +) + +type EventBase struct { + Event string `json:"event"` +} +type EventConnected struct { + EventBase + Alias string `json:"aliasId"` +} +type EventNewMessage struct { + EventBase + Mailbox string `json:"mailbox"` + Message Message `json:"message"` + Timestamp int64 `json:"timestamp"` +} +type Message struct { + FolderPath string `json:"folder_path"` + Flags []string `json:"flags"` + IsUnread bool `json:"is_unread"` + IsFlagged bool `json:"is_flagged"` + IsDeleted bool `json:"is_deleted"` + IsDraft bool `json:"is_draft"` + IsEncrypted bool `json:"is_encrypted"` + EML string `json:"eml"` + Object string `json:"object"` +} + +func (m Message) ParseEML() (mail.Message, error) { + result, err := mail.ReadMessage(strings.NewReader(m.EML)) + if err != nil { + return mail.Message{}, fmt.Errorf("read message: %w", err) + } + if result == nil { + return mail.Message{}, fmt.Errorf("nil result") + } + return *result, nil +} func StartWebsocket(ctx context.Context, username, password string) { var err error var conn *websocket.Conn @@ -30,18 +74,57 @@ func StartWebsocket(ctx context.Context, username, password string) { // Read message message_type, message, err := conn.ReadMessage() if err != nil { - if !websocket.IsCloseError(err, websocket.CloseNormalClosure) { + if websocket.IsCloseError(err, websocket.CloseNormalClosure) { conn = nil + log.Info().Msg("email websocket closed, waiting and reconnecting") + time.Sleep(10000) + continue } log.Error().Err(err).Msg("Error reading message") } - - // Process and log the message - log.Info().Int("message_type", message_type).Bytes("message", message).Msg("Got email notification") + err = handleMessage(ctx, message_type, message) + if err != nil { + log.Error().Err(err).Int("type", message_type).Str("msg", string(message)).Msg("Failed to handle email event") + continue + } } } } +func handleMessage(ctx context.Context, message_type int, message []byte) error { + var err error + var event_base EventBase + err = json.Unmarshal(message, &event_base) + if err != nil { + return fmt.Errorf("unmarshal base message: %w", err) + } + switch event_base.Event { + case EventTypeConnected: + var event_connected EventConnected + err = json.Unmarshal(message, &event_connected) + if err != nil { + return fmt.Errorf("unmarshal connected message: %w", err) + } + log.Info().Str("alias", event_connected.Alias).Msg("Connection to email websocket established") + return nil + case EventTypeNewMessage: + var event_new_message EventNewMessage + err = json.Unmarshal(message, &event_new_message) + if err != nil { + return fmt.Errorf("unmarshal connected message: %w", err) + } + msg, err := event_new_message.Message.ParseEML() + if err != nil { + return fmt.Errorf("parse EML: %w", err) + } + log.Info().Str("from", msg.Header.Get("From")).Str("subject", msg.Header.Get("Subject")).Str("date", msg.Header.Get("Date")).Msg("new email") + return nil + case EventTypePing: + return nil + default: + return fmt.Errorf("unknown event type: '%s'", event_base.Event) + } +} func ensureConnected(conn *websocket.Conn, username, password string) (*websocket.Conn, error) { if conn != nil { return conn, nil @@ -57,7 +140,7 @@ func ensureConnected(conn *websocket.Conn, username, password string) (*websocke log.Error().Msg("new connection is nil.") return nil, fmt.Errorf("nil new connection") } - log.Info().Msg("Connected to mail websocket") + log.Info().Msg("Connection to email websocket begun") return new_conn, nil } if errors.Is(err, websocket.ErrBadHandshake) { diff --git a/resource/communication.go b/resource/communication.go index 58b6edb1..f47a8f70 100644 --- a/resource/communication.go +++ b/resource/communication.go @@ -11,7 +11,7 @@ import ( modelpublicreport "github.com/Gleipnir-Technology/nidus-sync/db/gen/nidus-sync/publicreport/model" nhttp "github.com/Gleipnir-Technology/nidus-sync/http" "github.com/Gleipnir-Technology/nidus-sync/platform" - //"github.com/rs/zerolog/log":q + //"github.com/rs/zerolog/log" ) type communicationR struct {