import { getMinioClient } from './minio';
import { getIMAPPool } from './imap-pool';
import type { ParsedMail } from 'mailparser';
import { simpleParser } from 'mailparser';

// Email sync service for MinIO-based email management
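//
// Object layout in the 'client-emails' bucket, as read and written below:
//   interest-{interestId}/metadata.json           - EmailSyncMetadata (sync state)
//   interest-{interestId}/threads/index.json      - EmailThreadIndex (thread summaries)
//   interest-{interestId}/emails/{messageId}.json - one parsed email per object
//                                                   (falls back to a timestamp-based id)
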
export interface EmailSyncMetadata {
  lastSyncTime: string;
  totalEmails: number;
  lastError?: string;
  syncStatus: 'idle' | 'syncing' | 'error';
}

export interface EmailThreadIndex {
  threads: Array<{
    id: string;
    subject: string;
    participants: string[];
    emailCount: number;
    lastActivity: string;
    hasAttachments: boolean;
  }>;
  lastUpdated: string;
}

// Get or create sync metadata
export async function getSyncMetadata(interestId: string): Promise<EmailSyncMetadata> {
  const client = getMinioClient();
  const objectName = `interest-${interestId}/metadata.json`;

  try {
    const stream = await client.getObject('client-emails', objectName);
    let data = '';

    await new Promise((resolve, reject) => {
      stream.on('data', (chunk) => { data += chunk; });
      stream.on('end', resolve);
      stream.on('error', reject);
    });

    return JSON.parse(data);
  } catch (error: any) {
    // If not found, create default metadata
    if (error.code === 'NoSuchKey') {
      const defaultMetadata: EmailSyncMetadata = {
        lastSyncTime: new Date(0).toISOString(), // Start from beginning
        totalEmails: 0,
        syncStatus: 'idle'
      };

      await saveSyncMetadata(interestId, defaultMetadata);
      return defaultMetadata;
    }

    throw error;
  }
}

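/*
 * The getObject -> string boilerplate above recurs in getThreadIndex() and
 * getCachedEmails(). A shared helper could replace it; a minimal sketch
 * (not wired in, the name is illustrative):
 *
 *   async function streamToString(stream: import('stream').Readable): Promise<string> {
 *     let data = '';
 *     for await (const chunk of stream) data += chunk; // Buffer chunks coerce to utf-8 strings
 *     return data;
 *   }
 */
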
// Save sync metadata
export async function saveSyncMetadata(interestId: string, metadata: EmailSyncMetadata): Promise<void> {
  const client = getMinioClient();
  const objectName = `interest-${interestId}/metadata.json`;
  const buffer = Buffer.from(JSON.stringify(metadata, null, 2));

  await client.putObject('client-emails', objectName, buffer, buffer.length, {
    'Content-Type': 'application/json'
  });
}

// Get thread index
export async function getThreadIndex(interestId: string): Promise<EmailThreadIndex> {
  const client = getMinioClient();
  const objectName = `interest-${interestId}/threads/index.json`;

  try {
    const stream = await client.getObject('client-emails', objectName);
    let data = '';

    await new Promise((resolve, reject) => {
      stream.on('data', (chunk) => { data += chunk; });
      stream.on('end', resolve);
      stream.on('error', reject);
    });

    return JSON.parse(data);
  } catch (error: any) {
    if (error.code === 'NoSuchKey') {
      return {
        threads: [],
        lastUpdated: new Date().toISOString()
      };
    }

    throw error;
  }
}

// Save thread index
export async function saveThreadIndex(interestId: string, index: EmailThreadIndex): Promise<void> {
  const client = getMinioClient();
  const objectName = `interest-${interestId}/threads/index.json`;
  const buffer = Buffer.from(JSON.stringify(index, null, 2));

  await client.putObject('client-emails', objectName, buffer, buffer.length, {
    'Content-Type': 'application/json'
  });
}

// Sync emails with exponential backoff retry
export async function syncEmailsWithRetry(
  sessionId: string,
  userEmail: string,
  clientEmail: string,
  interestId: string,
  maxRetries: number = 3
): Promise<void> {
  let retryCount = 0;
  let lastError: Error | null = null;

  while (retryCount <= maxRetries) {
    try {
      await syncEmails(sessionId, userEmail, clientEmail, interestId);
      return; // Success
    } catch (error: any) {
      lastError = error;
      retryCount++;

      if (retryCount > maxRetries) {
        throw error;
      }

      // Exponential backoff: 1s, 2s, 4s, ... (doubles on each retry)
      const waitTime = Math.pow(2, retryCount - 1) * 1000;
      console.log(`[EmailSync] Retry ${retryCount}/${maxRetries} after ${waitTime}ms`);

      await new Promise(resolve => setTimeout(resolve, waitTime));
    }
  }

  throw lastError || new Error('Failed to sync emails after retries');
}

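/*
 * Example call site (illustrative; the session object and interest id are
 * placeholders and the surrounding API route is an assumption, not part of
 * this module):
 *
 *   await syncEmailsWithRetry(
 *     session.id,              // IMAP pool session for the logged-in user
 *     'agent@example.com',     // userEmail
 *     'client@example.com',    // clientEmail
 *     interestId               // MinIO prefix: interest-{interestId}/...
 *   );
 */
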
// Main sync function
async function syncEmails(
  sessionId: string,
  userEmail: string,
  clientEmail: string,
  interestId: string
): Promise<void> {
  const metadata = await getSyncMetadata(interestId);

  // Update status to syncing
  metadata.syncStatus = 'syncing';
  await saveSyncMetadata(interestId, metadata);

  try {
    const pool = getIMAPPool();
    const imap = await pool.getConnection(sessionId);

    // Fetch emails newer than last sync
    const lastSyncDate = new Date(metadata.lastSyncTime);
    const newEmails = await fetchNewEmails(imap, userEmail, clientEmail, lastSyncDate);

    if (newEmails.length > 0) {
      // Save new emails to MinIO
      const client = getMinioClient();
      for (const email of newEmails) {
        // fetchNewEmails() stores the parsed messageId under 'id'; fall back
        // to a timestamp-based name when it is missing
        const emailId = email.id || `${Date.now()}-${Math.random()}`;
        const objectName = `interest-${interestId}/emails/${emailId}.json`;
        const buffer = Buffer.from(JSON.stringify(email, null, 2));

        await client.putObject('client-emails', objectName, buffer, buffer.length, {
          'Content-Type': 'application/json'
        });
      }

      // Update thread index
      await updateThreadIndex(interestId, newEmails);
    }

    // Update metadata; this runs even when no new emails arrived so the
    // status does not stay stuck on 'syncing'
    metadata.lastSyncTime = new Date().toISOString();
    metadata.totalEmails += newEmails.length;
    metadata.syncStatus = 'idle';
    await saveSyncMetadata(interestId, metadata);
  } catch (error: any) {
    // Update metadata with error
    metadata.syncStatus = 'error';
    metadata.lastError = error.message;
    await saveSyncMetadata(interestId, metadata);

    throw error;
  }
}

// Fetch new emails from IMAP
async function fetchNewEmails(
  imap: any,
  userEmail: string,
  clientEmail: string,
  since: Date
): Promise<any[]> {
  return new Promise((resolve, reject) => {
    const emails: any[] = [];

    imap.openBox('INBOX', true, (err: any) => {
      if (err) {
        reject(err);
        return;
      }

      // Search for emails newer than last sync. IMAP SINCE matches on the
      // message's internal date (date only, time is ignored); OR takes two criteria.
      const searchCriteria = [
        ['SINCE', since.toISOString().split('T')[0]],
        ['OR',
          ['FROM', clientEmail],
          ['TO', clientEmail]
        ]
      ];

      imap.search(searchCriteria, (err: any, results: number[]) => {
        if (err) {
          reject(err);
          return;
        }

        if (results.length === 0) {
          resolve([]);
          return;
        }

        const fetch = imap.fetch(results, {
          bodies: '',
          struct: true,
          envelope: true
        });

        // simpleParser runs asynchronously, so track pending parses and only
        // resolve once the fetch has ended AND every body has been parsed
        let pendingParses = 0;
        let fetchEnded = false;
        const finishIfDone = () => {
          if (fetchEnded && pendingParses === 0) {
            resolve(emails);
          }
        };

        fetch.on('message', (msg: any) => {
          msg.on('body', (stream: any) => {
            pendingParses++;
            simpleParser(stream, (err: any, parsed: ParsedMail) => {
              if (!err && parsed) {
                // Handle from/to addresses which can be single or array
                const fromText = Array.isArray(parsed.from)
                  ? parsed.from.map(addr => addr.text).join(', ')
                  : parsed.from?.text || '';

                const toText = Array.isArray(parsed.to)
                  ? parsed.to.map(addr => addr.text).join(', ')
                  : parsed.to?.text || '';

                emails.push({
                  id: parsed.messageId,
                  from: fromText,
                  to: toText,
                  subject: parsed.subject,
                  body: parsed.text,
                  html: parsed.html,
                  timestamp: parsed.date?.toISOString(),
                  attachments: parsed.attachments?.map(att => ({
                    filename: att.filename,
                    contentType: att.contentType,
                    size: att.size
                  }))
                });
              }
              pendingParses--;
              finishIfDone();
            });
          });
        });

        fetch.once('end', () => {
          fetchEnded = true;
          finishIfDone();
        });

        fetch.once('error', (err: any) => {
          reject(err);
        });
      });
    });
  });
}

// Update thread index with new emails
async function updateThreadIndex(interestId: string, newEmails: any[]): Promise<void> {
  const index = await getThreadIndex(interestId);

  // Group emails by thread (simplified - by subject)
  for (const email of newEmails) {
    const normalizedSubject = email.subject
      ?.replace(/^(Re:|Fwd:|Fw:|RE:|FW:|FWD:)\s*/gi, '')
      .trim() || 'No Subject';

    let thread = index.threads.find(t =>
      t.subject.replace(/^(Re:|Fwd:|Fw:|RE:|FW:|FWD:)\s*/gi, '').trim() === normalizedSubject
    );

    if (!thread) {
      thread = {
        id: `thread-${Date.now()}-${Math.random()}`,
        subject: email.subject || 'No Subject',
        participants: [],
        emailCount: 0,
        // Fall back to "now" when the parsed email had no date
        lastActivity: email.timestamp || new Date().toISOString(),
        hasAttachments: false
      };
      index.threads.push(thread);
    }

    // Update thread
    thread.emailCount++;
    thread.lastActivity = email.timestamp || thread.lastActivity;
    if (email.attachments?.length > 0) {
      thread.hasAttachments = true;
    }

    // Add participants (prefer the bare address inside angle brackets)
    const from = email.from?.match(/<(.+)>/)?.[1] || email.from;
    if (from && !thread.participants.includes(from)) {
      thread.participants.push(from);
    }
  }

  // Sort threads by last activity
  index.threads.sort((a, b) =>
    new Date(b.lastActivity).getTime() - new Date(a.lastActivity).getTime()
  );

  index.lastUpdated = new Date().toISOString();
  await saveThreadIndex(interestId, index);
}

// Get emails from MinIO cache
export async function getCachedEmails(interestId: string): Promise<any[]> {
  const client = getMinioClient();
  const emails: any[] = [];

  try {
    const stream = client.listObjectsV2('client-emails', `interest-${interestId}/emails/`, true);
    const files: any[] = [];

    await new Promise((resolve, reject) => {
      stream.on('data', (obj) => {
        if (obj && obj.name && obj.name.endsWith('.json')) {
          files.push(obj.name);
        }
      });
      stream.on('error', reject);
      stream.on('end', resolve);
    });

    // Load each email
    for (const fileName of files) {
      try {
        const objStream = await client.getObject('client-emails', fileName);
        let data = '';

        await new Promise((resolve, reject) => {
          objStream.on('data', (chunk) => { data += chunk; });
          objStream.on('end', resolve);
          objStream.on('error', reject);
        });

        emails.push(JSON.parse(data));
      } catch (error) {
        console.error(`Failed to load email ${fileName}:`, error);
      }
    }
  } catch (error) {
    console.error('Failed to list cached emails:', error);
  }

  return emails;
}
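
/*
 * Example read path (illustrative; the sort and the calling context are
 * assumptions, not part of this module):
 *
 *   const [threads, emails] = await Promise.all([
 *     getThreadIndex(interestId),
 *     getCachedEmails(interestId)
 *   ]);
 *   // newest first; the object listing does not return emails in date order
 *   emails.sort((a, b) =>
 *     new Date(b.timestamp || 0).getTime() - new Date(a.timestamp || 0).getTime()
 *   );
 */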