import { getMinioClient } from './minio';
import { getIMAPPool } from './imap-pool';
import type { ParsedMail } from 'mailparser';
import { simpleParser } from 'mailparser';

// Email sync service for MinIO-based email management

/** Bucket that holds every object written or read by this module. */
const EMAIL_BUCKET = 'client-emails';

/** Matches one or more leading reply/forward markers ("Re:", "Fwd:", "Fw:"), case-insensitively. */
const SUBJECT_PREFIX_PATTERN = /^(?:(?:re|fwd?):\s*)+/i;

export interface EmailSyncMetadata {
  lastSyncTime: string;
  totalEmails: number;
  lastError?: string;
  syncStatus: 'idle' | 'syncing' | 'error';
}

export interface EmailThreadIndex {
  threads: Array<{
    id: string;
    subject: string;
    participants: string[];
    emailCount: number;
    lastActivity: string;
    hasAttachments: boolean;
  }>;
  lastUpdated: string;
}

/** Shape of the email records this module writes under `interest-<id>/emails/`. */
interface CachedEmail {
  id?: string;
  from: string;
  to: string;
  subject?: string;
  body?: string;
  html?: string | false;
  timestamp?: string;
  attachments?: Array<{ filename?: string; contentType: string; size: number }>;
}

/** True when `error` carries the `NoSuchKey` code this module treats as "object does not exist". */
function isNoSuchKey(error: unknown): boolean {
  return (
    typeof error === 'object' &&
    error !== null &&
    (error as { code?: unknown }).code === 'NoSuchKey'
  );
}

/**
 * Downloads an object from the email bucket and parses it as JSON.
 *
 * Chunks are collected as Buffers and decoded once at the end, so a multi-byte
 * UTF-8 character split across two chunks is not corrupted.
 */
async function readJsonObject(objectName: string): Promise<unknown> {
  const stream = await getMinioClient().getObject(EMAIL_BUCKET, objectName);
  const chunks: Buffer[] = [];
  await new Promise<void>((resolve, reject) => {
    stream.on('data', (chunk: Buffer | string) => {
      chunks.push(typeof chunk === 'string' ? Buffer.from(chunk) : chunk);
    });
    stream.on('end', () => resolve());
    stream.on('error', reject);
  });
  return JSON.parse(Buffer.concat(chunks).toString('utf8'));
}

/** Serializes `value` as pretty-printed JSON and uploads it to the email bucket. */
async function writeJsonObject(objectName: string, value: unknown): Promise<void> {
  const buffer = Buffer.from(JSON.stringify(value, null, 2));
  await getMinioClient().putObject(EMAIL_BUCKET, objectName, buffer, buffer.length, {
    'Content-Type': 'application/json'
  });
}

/** Lists the names of all `.json` objects stored under `interest-<id>/emails/`. */
async function listEmailObjectNames(interestId: string): Promise<string[]> {
  const stream = getMinioClient().listObjectsV2(
    EMAIL_BUCKET,
    `interest-${interestId}/emails/`,
    true
  );
  const names: string[] = [];
  await new Promise<void>((resolve, reject) => {
    stream.on('data', (obj: { name?: string }) => {
      const name = obj?.name;
      if (typeof name === 'string' && name.endsWith('.json')) {
        names.push(name);
      }
    });
    stream.on('error', reject);
    stream.on('end', () => resolve());
  });
  return names;
}

/**
 * Loads the sync metadata for an interest, creating and persisting a default
 * record (epoch `lastSyncTime`, zero emails, `idle`) when none exists yet.
 *
 * The stored JSON is trusted as-is; it is not validated against the interface.
 *
 * @throws Any storage error other than `NoSuchKey`.
 */
export async function getSyncMetadata(interestId: string): Promise<EmailSyncMetadata> {
  const objectName = `interest-${interestId}/metadata.json`;
  try {
    return (await readJsonObject(objectName)) as EmailSyncMetadata;
  } catch (error: unknown) {
    // If not found, create default metadata
    if (isNoSuchKey(error)) {
      const defaultMetadata: EmailSyncMetadata = {
        lastSyncTime: new Date(0).toISOString(), // Start from beginning
        totalEmails: 0,
        syncStatus: 'idle'
      };
      await saveSyncMetadata(interestId, defaultMetadata);
      return defaultMetadata;
    }
    throw error;
  }
}

/** Persists the sync metadata for an interest. */
export async function saveSyncMetadata(
  interestId: string,
  metadata: EmailSyncMetadata
): Promise<void> {
  await writeJsonObject(`interest-${interestId}/metadata.json`, metadata);
}

/**
 * Loads the thread index for an interest. Returns an empty index (not persisted)
 * when none exists yet.
 *
 * @throws Any storage error other than `NoSuchKey`.
 */
export async function getThreadIndex(interestId: string): Promise<EmailThreadIndex> {
  const objectName = `interest-${interestId}/threads/index.json`;
  try {
    return (await readJsonObject(objectName)) as EmailThreadIndex;
  } catch (error: unknown) {
    if (isNoSuchKey(error)) {
      return { threads: [], lastUpdated: new Date().toISOString() };
    }
    throw error;
  }
}

/** Persists the thread index for an interest. */
export async function saveThreadIndex(
  interestId: string,
  index: EmailThreadIndex
): Promise<void> {
  await writeJsonObject(`interest-${interestId}/threads/index.json`, index);
}

/**
 * Runs a sync and retries on failure with exponential backoff.
 *
 * One initial attempt is always made, followed by up to `maxRetries` retries.
 * The wait before retry k (1-based) is 2^(k-1) seconds, i.e. 1s, 2s, 4s for the
 * default of three retries.
 *
 * @throws The error from the final failed attempt.
 */
export async function syncEmailsWithRetry(
  sessionId: string,
  userEmail: string,
  clientEmail: string,
  interestId: string,
  maxRetries: number = 3
): Promise<void> {
  for (let attempt = 0; ; attempt++) {
    try {
      await syncEmails(sessionId, userEmail, clientEmail, interestId);
      return; // Success
    } catch (error: unknown) {
      if (attempt >= maxRetries) {
        throw error;
      }
      const waitTime = Math.pow(2, attempt) * 1000;
      console.log(`[EmailSync] Retry ${attempt + 1}/${maxRetries} after ${waitTime}ms`);
      await new Promise<void>(resolve => setTimeout(resolve, waitTime));
    }
  }
}

/**
 * Builds the storage key component for an email. Uses the Message-ID when
 * present so that re-fetching the same message maps to the same object; falls
 * back to a random key otherwise.
 */
function emailObjectId(email: CachedEmail): string {
  if (email.id) {
    // Drop the surrounding angle brackets and replace anything that could act as
    // a path separator or awkward key character.
    return email.id.replace(/^<|>$/g, '').replace(/[^A-Za-z0-9._@-]/g, '_');
  }
  return `${Date.now()}-${Math.random()}`;
}

/**
 * Main sync function: fetches matching emails from IMAP, stores the ones not yet
 * cached, updates the thread index and records the outcome in the metadata.
 *
 * The IMAP search is day-granular, so messages from the day of the last sync are
 * returned again on every run; already-stored messages are skipped so they are
 * neither rewritten nor counted twice.
 */
async function syncEmails(
  sessionId: string,
  userEmail: string,
  clientEmail: string,
  interestId: string
): Promise<void> {
  const metadata = await getSyncMetadata(interestId);

  // Update status to syncing
  metadata.syncStatus = 'syncing';
  await saveSyncMetadata(interestId, metadata);

  try {
    const pool = getIMAPPool();
    const imap = await pool.getConnection(sessionId);

    // Fetch emails newer than last sync
    const lastSyncDate = new Date(metadata.lastSyncTime);
    const fetched = await fetchNewEmails(imap, userEmail, clientEmail, lastSyncDate);

    const newEmails: CachedEmail[] = [];
    if (fetched.length > 0) {
      const known = new Set(await listEmailObjectNames(interestId));
      for (const email of fetched) {
        const objectName = `interest-${interestId}/emails/${emailObjectId(email)}.json`;
        if (known.has(objectName)) {
          continue; // Stored by an earlier sync (or earlier in this batch)
        }
        known.add(objectName);
        await writeJsonObject(objectName, email);
        newEmails.push(email);
      }
    }

    if (newEmails.length > 0) {
      await updateThreadIndex(interestId, newEmails);
      metadata.totalEmails += newEmails.length;
    }

    // Always leave the 'syncing' state, even when nothing new was found.
    metadata.lastSyncTime = new Date().toISOString();
    metadata.syncStatus = 'idle';
    await saveSyncMetadata(interestId, metadata);
  } catch (error: unknown) {
    // Update metadata with error
    metadata.syncStatus = 'error';
    metadata.lastError = error instanceof Error ? error.message : String(error);
    try {
      await saveSyncMetadata(interestId, metadata);
    } catch (saveError: unknown) {
      // Do not let a failed status write hide the error that caused the sync to fail.
      console.error('[EmailSync] Failed to persist error status:', saveError);
    }
    throw error;
  }
}

/** Joins the display text of a parsed address header that may be single, multiple or absent. */
function addressText(field: ParsedMail['from'] | ParsedMail['to']): string {
  if (!field) {
    return '';
  }
  return Array.isArray(field) ? field.map(addr => addr.text).join(', ') : field.text || '';
}

/** Reduces a parsed message to the record stored in the cache. */
function toCachedEmail(parsed: ParsedMail): CachedEmail {
  return {
    id: parsed.messageId,
    from: addressText(parsed.from),
    to: addressText(parsed.to),
    subject: parsed.subject,
    body: parsed.text,
    html: parsed.html,
    timestamp: parsed.date?.toISOString(),
    attachments: parsed.attachments?.map(att => ({
      filename: att.filename,
      contentType: att.contentType,
      size: att.size
    }))
  };
}

/**
 * Fetches emails from INBOX (opened read-only) that are dated on or after the
 * day of `since` and are from or to `clientEmail`.
 *
 * Parsing is asynchronous, so the result is only resolved once the fetch has
 * ended AND every message has finished parsing. Messages that fail to parse are
 * logged and skipped.
 *
 * `userEmail` is accepted for signature parity with the callers but is not used
 * by the search.
 */
function fetchNewEmails(
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- connection type comes from the IMAP pool and is not visible here
  imap: any,
  userEmail: string,
  clientEmail: string,
  since: Date
): Promise<CachedEmail[]> {
  return new Promise<CachedEmail[]>((resolve, reject) => {
    imap.openBox('INBOX', true, (openErr: unknown) => {
      if (openErr) {
        reject(openErr);
        return;
      }

      // Search for emails newer than last sync
      const searchCriteria = [
        ['SINCE', since.toISOString().split('T')[0]],
        ['OR', ['FROM', clientEmail], ['TO', clientEmail]]
      ];

      imap.search(searchCriteria, (searchErr: unknown, results: number[]) => {
        if (searchErr) {
          reject(searchErr);
          return;
        }

        if (!results || results.length === 0) {
          resolve([]);
          return;
        }

        const pending: Array<Promise<CachedEmail | null>> = [];
        const fetch = imap.fetch(results, { bodies: '', struct: true, envelope: true });

        // eslint-disable-next-line @typescript-eslint/no-explicit-any -- message/stream types come from the IMAP library
        fetch.on('message', (msg: any) => {
          // eslint-disable-next-line @typescript-eslint/no-explicit-any
          msg.on('body', (stream: any) => {
            pending.push(
              simpleParser(stream)
                .then(toCachedEmail)
                .catch((parseErr: unknown) => {
                  console.error('[EmailSync] Failed to parse message:', parseErr);
                  return null;
                })
            );
          });
        });

        fetch.once('end', () => {
          Promise.all(pending).then(
            parsed => resolve(parsed.filter((email): email is CachedEmail => email !== null)),
            reject
          );
        });

        fetch.once('error', (fetchErr: unknown) => {
          reject(fetchErr);
        });
      });
    });
  });
}

/** Strips all leading reply/forward markers from a subject; empty or missing subjects become 'No Subject'. */
function normalizeSubject(subject: string | undefined): string {
  return subject?.replace(SUBJECT_PREFIX_PATTERN, '').trim() || 'No Subject';
}

/**
 * Folds newly stored emails into the thread index and persists it.
 *
 * Threads are grouped by normalized subject (simplified threading). A thread's
 * `lastActivity` only moves forward, and emails without a timestamp do not
 * change it.
 */
async function updateThreadIndex(interestId: string, newEmails: CachedEmail[]): Promise<void> {
  const index = await getThreadIndex(interestId);

  for (const email of newEmails) {
    const normalizedSubject = normalizeSubject(email.subject);

    let thread = index.threads.find(t => normalizeSubject(t.subject) === normalizedSubject);

    if (!thread) {
      thread = {
        id: `thread-${Date.now()}-${Math.random()}`,
        subject: email.subject || 'No Subject',
        participants: [],
        emailCount: 0,
        // Fall back to "now" so the index never stores an undefined timestamp.
        lastActivity: email.timestamp ?? new Date().toISOString(),
        hasAttachments: false
      };
      index.threads.push(thread);
    }

    // Update thread
    thread.emailCount++;
    if (
      email.timestamp &&
      !(new Date(email.timestamp).getTime() <= new Date(thread.lastActivity).getTime())
    ) {
      thread.lastActivity = email.timestamp;
    }
    if ((email.attachments?.length ?? 0) > 0) {
      thread.hasAttachments = true;
    }

    // Add participants: prefer the first bare address inside angle brackets,
    // otherwise use the raw header text.
    const from = email.from?.match(/<([^<>]+)>/)?.[1] || email.from;
    if (from && !thread.participants.includes(from)) {
      thread.participants.push(from);
    }
  }

  // Sort threads by last activity, most recent first
  index.threads.sort(
    (a, b) => new Date(b.lastActivity).getTime() - new Date(a.lastActivity).getTime()
  );

  index.lastUpdated = new Date().toISOString();
  await saveThreadIndex(interestId, index);
}

/**
 * Loads every cached email for an interest from MinIO.
 *
 * Best-effort: a failure to list logs and yields an empty array; a failure to
 * load one email logs and skips that email.
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any -- stored JSON is returned unvalidated
export async function getCachedEmails(interestId: string): Promise<any[]> {
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  const emails: any[] = [];

  try {
    const files = await listEmailObjectNames(interestId);

    // Load each email
    for (const fileName of files) {
      try {
        emails.push(await readJsonObject(fileName));
      } catch (error: unknown) {
        console.error(`Failed to load email ${fileName}:`, error);
      }
    }
  } catch (error: unknown) {
    console.error('Failed to list cached emails:', error);
  }

  return emails;
}