email server updates
This commit is contained in:
223
server/api/email/fetch-thread-v2.ts
Normal file
223
server/api/email/fetch-thread-v2.ts
Normal file
@@ -0,0 +1,223 @@
|
||||
import { getCredentialsFromSession, decryptCredentials } from '~/server/utils/encryption';
|
||||
import { getCachedEmails, syncEmailsWithRetry, getSyncMetadata } from '~/server/utils/email-sync';
|
||||
|
||||
interface EmailMessage {
|
||||
id: string;
|
||||
from: string;
|
||||
to: string | string[];
|
||||
subject: string;
|
||||
body: string;
|
||||
html?: string;
|
||||
timestamp: string;
|
||||
direction: 'sent' | 'received';
|
||||
threadId?: string;
|
||||
attachments?: any[];
|
||||
}
|
||||
|
||||
interface EmailThread {
|
||||
id: string;
|
||||
subject: string;
|
||||
emailCount: number;
|
||||
latestTimestamp: string;
|
||||
emails: EmailMessage[];
|
||||
}
|
||||
|
||||
export default defineEventHandler(async (event) => {
|
||||
const xTagHeader = getRequestHeader(event, "x-tag");
|
||||
|
||||
if (!xTagHeader || (xTagHeader !== "094ut234" && xTagHeader !== "pjnvü1230")) {
|
||||
throw createError({ statusCode: 401, statusMessage: "unauthenticated" });
|
||||
}
|
||||
|
||||
try {
|
||||
const body = await readBody(event);
|
||||
const { clientEmail, interestId, sessionId } = body;
|
||||
|
||||
if (!clientEmail || !sessionId || !interestId) {
|
||||
throw createError({
|
||||
statusCode: 400,
|
||||
statusMessage: "Client email, interestId and sessionId are required"
|
||||
});
|
||||
}
|
||||
|
||||
// Get encrypted credentials from session
|
||||
const encryptedCredentials = getCredentialsFromSession(sessionId);
|
||||
if (!encryptedCredentials) {
|
||||
return {
|
||||
success: true,
|
||||
emails: [],
|
||||
threads: []
|
||||
};
|
||||
}
|
||||
|
||||
// Decrypt credentials
|
||||
let userEmail: string;
|
||||
let password: string;
|
||||
|
||||
try {
|
||||
const decrypted = decryptCredentials(encryptedCredentials);
|
||||
userEmail = decrypted.email;
|
||||
password = decrypted.password;
|
||||
} catch (decryptError) {
|
||||
console.error('[Email V2] Failed to decrypt credentials:', decryptError);
|
||||
return {
|
||||
success: true,
|
||||
emails: [],
|
||||
threads: []
|
||||
};
|
||||
}
|
||||
|
||||
// First, get emails from MinIO cache (instant response)
|
||||
const cachedEmails = await getCachedEmails(interestId);
|
||||
|
||||
// Get sync metadata
|
||||
const metadata = await getSyncMetadata(interestId);
|
||||
|
||||
// Trigger background sync if not currently syncing and last sync was over 5 minutes ago
|
||||
const fiveMinutesAgo = new Date(Date.now() - 5 * 60 * 1000);
|
||||
const lastSync = new Date(metadata.lastSyncTime);
|
||||
|
||||
if (metadata.syncStatus !== 'syncing' && lastSync < fiveMinutesAgo) {
|
||||
// Fire and forget - don't wait for sync to complete
|
||||
syncEmailsWithRetry(sessionId, userEmail, clientEmail, interestId).catch(err => {
|
||||
console.error('[Email V2] Background sync failed:', err);
|
||||
});
|
||||
}
|
||||
|
||||
// Process cached emails
|
||||
const emails: EmailMessage[] = cachedEmails.map(email => ({
|
||||
id: email.id || email.messageId || `${Date.now()}-${Math.random()}`,
|
||||
from: email.from || '',
|
||||
to: email.to || '',
|
||||
subject: email.subject || '',
|
||||
body: email.body || email.text || '',
|
||||
html: email.html,
|
||||
timestamp: email.timestamp || new Date().toISOString(),
|
||||
direction: email.direction || (email.from?.toLowerCase().includes(userEmail.toLowerCase()) ? 'sent' : 'received'),
|
||||
threadId: email.threadId,
|
||||
attachments: email.attachments
|
||||
}));
|
||||
|
||||
// Filter to only include emails involving the client
|
||||
const filteredEmails = emails.filter(email => {
|
||||
const fromEmail = email.from.toLowerCase();
|
||||
const toEmails = Array.isArray(email.to) ? email.to.join(' ').toLowerCase() : email.to.toLowerCase();
|
||||
|
||||
return fromEmail.includes(clientEmail.toLowerCase()) ||
|
||||
toEmails.includes(clientEmail.toLowerCase()) ||
|
||||
fromEmail.includes(userEmail.toLowerCase()) ||
|
||||
toEmails.includes(userEmail.toLowerCase());
|
||||
});
|
||||
|
||||
// Sort by timestamp (newest first)
|
||||
filteredEmails.sort((a, b) =>
|
||||
new Date(b.timestamp).getTime() - new Date(a.timestamp).getTime()
|
||||
);
|
||||
|
||||
// Group into threads
|
||||
const threads = groupIntoThreads(filteredEmails);
|
||||
|
||||
return {
|
||||
success: true,
|
||||
emails: filteredEmails,
|
||||
threads: threads,
|
||||
syncStatus: metadata.syncStatus,
|
||||
lastSync: metadata.lastSyncTime,
|
||||
totalEmails: metadata.totalEmails
|
||||
};
|
||||
} catch (error) {
|
||||
console.error('[Email V2] Failed to fetch email thread:', error);
|
||||
if (error instanceof Error) {
|
||||
throw createError({
|
||||
statusCode: 500,
|
||||
statusMessage: `Failed to fetch emails: ${error.message}`
|
||||
});
|
||||
} else {
|
||||
throw createError({
|
||||
statusCode: 500,
|
||||
statusMessage: "An unexpected error occurred",
|
||||
});
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// Group emails into threads based on subject and references
|
||||
function groupIntoThreads(emails: EmailMessage[]): EmailThread[] {
|
||||
const threads = new Map<string, EmailMessage[]>();
|
||||
const emailById = new Map<string, EmailMessage>();
|
||||
|
||||
// First pass: index all emails by ID
|
||||
emails.forEach(email => {
|
||||
emailById.set(email.id, email);
|
||||
});
|
||||
|
||||
// Second pass: group emails into threads
|
||||
emails.forEach(email => {
|
||||
// Normalize subject by removing Re:, Fwd:, etc.
|
||||
const normalizedSubject = email.subject
|
||||
.replace(/^(Re:|Fwd:|Fw:|RE:|FW:|FWD:)\s*/gi, '')
|
||||
.replace(/\s+/g, ' ')
|
||||
.trim()
|
||||
.toLowerCase();
|
||||
|
||||
// Check if this email belongs to an existing thread
|
||||
let threadFound = false;
|
||||
|
||||
// First, check if it has a threadId (in-reply-to header)
|
||||
if (email.threadId) {
|
||||
// Look for the parent email
|
||||
const parentEmail = emailById.get(email.threadId);
|
||||
if (parentEmail) {
|
||||
// Find which thread the parent belongs to
|
||||
for (const [threadId, threadEmails] of threads.entries()) {
|
||||
if (threadEmails.some(e => e.id === parentEmail.id)) {
|
||||
threadEmails.push(email);
|
||||
threadFound = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If not found by threadId, try to match by subject
|
||||
if (!threadFound) {
|
||||
for (const [threadId, threadEmails] of threads.entries()) {
|
||||
const threadSubject = threadEmails[0].subject
|
||||
.replace(/^(Re:|Fwd:|Fw:|RE:|FW:|FWD:)\s*/gi, '')
|
||||
.replace(/\s+/g, ' ')
|
||||
.trim()
|
||||
.toLowerCase();
|
||||
|
||||
if (threadSubject === normalizedSubject) {
|
||||
threadEmails.push(email);
|
||||
threadFound = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If still not found, create a new thread
|
||||
if (!threadFound) {
|
||||
threads.set(email.id, [email]);
|
||||
}
|
||||
});
|
||||
|
||||
// Convert to array format and sort emails within each thread
|
||||
return Array.from(threads.entries())
|
||||
.map(([threadId, threadEmails]) => {
|
||||
// Sort emails within thread by timestamp (oldest first for chronological order)
|
||||
threadEmails.sort((a, b) =>
|
||||
new Date(a.timestamp).getTime() - new Date(b.timestamp).getTime()
|
||||
);
|
||||
|
||||
return {
|
||||
id: threadId,
|
||||
subject: threadEmails[0].subject,
|
||||
emailCount: threadEmails.length,
|
||||
latestTimestamp: threadEmails[threadEmails.length - 1].timestamp,
|
||||
emails: threadEmails
|
||||
};
|
||||
})
|
||||
// Sort threads by latest activity (newest first)
|
||||
.sort((a, b) => new Date(b.latestTimestamp).getTime() - new Date(a.latestTimestamp).getTime());
|
||||
}
|
||||
371
server/utils/email-sync.ts
Normal file
371
server/utils/email-sync.ts
Normal file
@@ -0,0 +1,371 @@
|
||||
import { getMinioClient } from './minio';
|
||||
import { getIMAPPool } from './imap-pool';
|
||||
import type { ParsedMail } from 'mailparser';
|
||||
import { simpleParser } from 'mailparser';
|
||||
|
||||
// Email sync service for MinIO-based email management
|
||||
export interface EmailSyncMetadata {
|
||||
lastSyncTime: string;
|
||||
totalEmails: number;
|
||||
lastError?: string;
|
||||
syncStatus: 'idle' | 'syncing' | 'error';
|
||||
}
|
||||
|
||||
export interface EmailThreadIndex {
|
||||
threads: Array<{
|
||||
id: string;
|
||||
subject: string;
|
||||
participants: string[];
|
||||
emailCount: number;
|
||||
lastActivity: string;
|
||||
hasAttachments: boolean;
|
||||
}>;
|
||||
lastUpdated: string;
|
||||
}
|
||||
|
||||
// Get or create sync metadata
|
||||
export async function getSyncMetadata(interestId: string): Promise<EmailSyncMetadata> {
|
||||
const client = getMinioClient();
|
||||
const objectName = `interest-${interestId}/metadata.json`;
|
||||
|
||||
try {
|
||||
const stream = await client.getObject('client-emails', objectName);
|
||||
let data = '';
|
||||
|
||||
await new Promise((resolve, reject) => {
|
||||
stream.on('data', (chunk) => { data += chunk; });
|
||||
stream.on('end', resolve);
|
||||
stream.on('error', reject);
|
||||
});
|
||||
|
||||
return JSON.parse(data);
|
||||
} catch (error: any) {
|
||||
// If not found, create default metadata
|
||||
if (error.code === 'NoSuchKey') {
|
||||
const defaultMetadata: EmailSyncMetadata = {
|
||||
lastSyncTime: new Date(0).toISOString(), // Start from beginning
|
||||
totalEmails: 0,
|
||||
syncStatus: 'idle'
|
||||
};
|
||||
|
||||
await saveSyncMetadata(interestId, defaultMetadata);
|
||||
return defaultMetadata;
|
||||
}
|
||||
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
// Save sync metadata
|
||||
export async function saveSyncMetadata(interestId: string, metadata: EmailSyncMetadata): Promise<void> {
|
||||
const client = getMinioClient();
|
||||
const objectName = `interest-${interestId}/metadata.json`;
|
||||
const buffer = Buffer.from(JSON.stringify(metadata, null, 2));
|
||||
|
||||
await client.putObject('client-emails', objectName, buffer, buffer.length, {
|
||||
'Content-Type': 'application/json'
|
||||
});
|
||||
}
|
||||
|
||||
// Get thread index
|
||||
export async function getThreadIndex(interestId: string): Promise<EmailThreadIndex> {
|
||||
const client = getMinioClient();
|
||||
const objectName = `interest-${interestId}/threads/index.json`;
|
||||
|
||||
try {
|
||||
const stream = await client.getObject('client-emails', objectName);
|
||||
let data = '';
|
||||
|
||||
await new Promise((resolve, reject) => {
|
||||
stream.on('data', (chunk) => { data += chunk; });
|
||||
stream.on('end', resolve);
|
||||
stream.on('error', reject);
|
||||
});
|
||||
|
||||
return JSON.parse(data);
|
||||
} catch (error: any) {
|
||||
if (error.code === 'NoSuchKey') {
|
||||
return {
|
||||
threads: [],
|
||||
lastUpdated: new Date().toISOString()
|
||||
};
|
||||
}
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
// Save thread index
|
||||
export async function saveThreadIndex(interestId: string, index: EmailThreadIndex): Promise<void> {
|
||||
const client = getMinioClient();
|
||||
const objectName = `interest-${interestId}/threads/index.json`;
|
||||
const buffer = Buffer.from(JSON.stringify(index, null, 2));
|
||||
|
||||
await client.putObject('client-emails', objectName, buffer, buffer.length, {
|
||||
'Content-Type': 'application/json'
|
||||
});
|
||||
}
|
||||
|
||||
// Sync emails with exponential backoff retry
|
||||
export async function syncEmailsWithRetry(
|
||||
sessionId: string,
|
||||
userEmail: string,
|
||||
clientEmail: string,
|
||||
interestId: string,
|
||||
maxRetries: number = 3
|
||||
): Promise<void> {
|
||||
let retryCount = 0;
|
||||
let lastError: Error | null = null;
|
||||
|
||||
while (retryCount <= maxRetries) {
|
||||
try {
|
||||
await syncEmails(sessionId, userEmail, clientEmail, interestId);
|
||||
return; // Success
|
||||
} catch (error: any) {
|
||||
lastError = error;
|
||||
retryCount++;
|
||||
|
||||
if (retryCount > maxRetries) {
|
||||
throw error;
|
||||
}
|
||||
|
||||
// Exponential backoff: 1s, 2s, 4s, 8s
|
||||
const waitTime = Math.pow(2, retryCount - 1) * 1000;
|
||||
console.log(`[EmailSync] Retry ${retryCount}/${maxRetries} after ${waitTime}ms`);
|
||||
|
||||
await new Promise(resolve => setTimeout(resolve, waitTime));
|
||||
}
|
||||
}
|
||||
|
||||
throw lastError || new Error('Failed to sync emails after retries');
|
||||
}
|
||||
|
||||
// Main sync function
|
||||
async function syncEmails(
|
||||
sessionId: string,
|
||||
userEmail: string,
|
||||
clientEmail: string,
|
||||
interestId: string
|
||||
): Promise<void> {
|
||||
const metadata = await getSyncMetadata(interestId);
|
||||
|
||||
// Update status to syncing
|
||||
metadata.syncStatus = 'syncing';
|
||||
await saveSyncMetadata(interestId, metadata);
|
||||
|
||||
try {
|
||||
const pool = getIMAPPool();
|
||||
const imap = await pool.getConnection(sessionId);
|
||||
|
||||
// Fetch emails newer than last sync
|
||||
const lastSyncDate = new Date(metadata.lastSyncTime);
|
||||
const newEmails = await fetchNewEmails(imap, userEmail, clientEmail, lastSyncDate);
|
||||
|
||||
if (newEmails.length > 0) {
|
||||
// Save new emails to MinIO
|
||||
for (const email of newEmails) {
|
||||
const emailId = email.messageId || `${Date.now()}-${Math.random()}`;
|
||||
const objectName = `interest-${interestId}/emails/${emailId}.json`;
|
||||
const buffer = Buffer.from(JSON.stringify(email, null, 2));
|
||||
|
||||
const client = getMinioClient();
|
||||
await client.putObject('client-emails', objectName, buffer, buffer.length, {
|
||||
'Content-Type': 'application/json'
|
||||
});
|
||||
}
|
||||
|
||||
// Update thread index
|
||||
await updateThreadIndex(interestId, newEmails);
|
||||
|
||||
// Update metadata
|
||||
metadata.lastSyncTime = new Date().toISOString();
|
||||
metadata.totalEmails += newEmails.length;
|
||||
metadata.syncStatus = 'idle';
|
||||
await saveSyncMetadata(interestId, metadata);
|
||||
}
|
||||
} catch (error: any) {
|
||||
// Update metadata with error
|
||||
metadata.syncStatus = 'error';
|
||||
metadata.lastError = error.message;
|
||||
await saveSyncMetadata(interestId, metadata);
|
||||
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
// Fetch new emails from IMAP
|
||||
async function fetchNewEmails(
|
||||
imap: any,
|
||||
userEmail: string,
|
||||
clientEmail: string,
|
||||
since: Date
|
||||
): Promise<any[]> {
|
||||
return new Promise((resolve, reject) => {
|
||||
const emails: any[] = [];
|
||||
|
||||
imap.openBox('INBOX', true, (err: any) => {
|
||||
if (err) {
|
||||
reject(err);
|
||||
return;
|
||||
}
|
||||
|
||||
// Search for emails newer than last sync
|
||||
const searchCriteria = [
|
||||
['SINCE', since.toISOString().split('T')[0]],
|
||||
['OR',
|
||||
['FROM', clientEmail],
|
||||
['TO', clientEmail]
|
||||
]
|
||||
];
|
||||
|
||||
imap.search(searchCriteria, (err: any, results: number[]) => {
|
||||
if (err) {
|
||||
reject(err);
|
||||
return;
|
||||
}
|
||||
|
||||
if (results.length === 0) {
|
||||
resolve([]);
|
||||
return;
|
||||
}
|
||||
|
||||
const fetch = imap.fetch(results, {
|
||||
bodies: '',
|
||||
struct: true,
|
||||
envelope: true
|
||||
});
|
||||
|
||||
fetch.on('message', (msg: any) => {
|
||||
msg.on('body', (stream: any) => {
|
||||
simpleParser(stream, (err: any, parsed: ParsedMail) => {
|
||||
if (!err && parsed) {
|
||||
// Handle from/to addresses which can be single or array
|
||||
const fromText = Array.isArray(parsed.from)
|
||||
? parsed.from.map(addr => addr.text).join(', ')
|
||||
: parsed.from?.text || '';
|
||||
|
||||
const toText = Array.isArray(parsed.to)
|
||||
? parsed.to.map(addr => addr.text).join(', ')
|
||||
: parsed.to?.text || '';
|
||||
|
||||
emails.push({
|
||||
id: parsed.messageId,
|
||||
from: fromText,
|
||||
to: toText,
|
||||
subject: parsed.subject,
|
||||
body: parsed.text,
|
||||
html: parsed.html,
|
||||
timestamp: parsed.date?.toISOString(),
|
||||
attachments: parsed.attachments?.map(att => ({
|
||||
filename: att.filename,
|
||||
contentType: att.contentType,
|
||||
size: att.size
|
||||
}))
|
||||
});
|
||||
}
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
fetch.once('end', () => {
|
||||
resolve(emails);
|
||||
});
|
||||
|
||||
fetch.once('error', (err: any) => {
|
||||
reject(err);
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
// Update thread index with new emails
|
||||
async function updateThreadIndex(interestId: string, newEmails: any[]): Promise<void> {
|
||||
const index = await getThreadIndex(interestId);
|
||||
|
||||
// Group emails by thread (simplified - by subject)
|
||||
for (const email of newEmails) {
|
||||
const normalizedSubject = email.subject
|
||||
?.replace(/^(Re:|Fwd:|Fw:|RE:|FW:|FWD:)\s*/gi, '')
|
||||
.trim() || 'No Subject';
|
||||
|
||||
let thread = index.threads.find(t =>
|
||||
t.subject.replace(/^(Re:|Fwd:|Fw:|RE:|FW:|FWD:)\s*/gi, '').trim() === normalizedSubject
|
||||
);
|
||||
|
||||
if (!thread) {
|
||||
thread = {
|
||||
id: `thread-${Date.now()}-${Math.random()}`,
|
||||
subject: email.subject || 'No Subject',
|
||||
participants: [],
|
||||
emailCount: 0,
|
||||
lastActivity: email.timestamp,
|
||||
hasAttachments: false
|
||||
};
|
||||
index.threads.push(thread);
|
||||
}
|
||||
|
||||
// Update thread
|
||||
thread.emailCount++;
|
||||
thread.lastActivity = email.timestamp;
|
||||
if (email.attachments?.length > 0) {
|
||||
thread.hasAttachments = true;
|
||||
}
|
||||
|
||||
// Add participants
|
||||
const from = email.from?.match(/<(.+)>/)?.[1] || email.from;
|
||||
if (from && !thread.participants.includes(from)) {
|
||||
thread.participants.push(from);
|
||||
}
|
||||
}
|
||||
|
||||
// Sort threads by last activity
|
||||
index.threads.sort((a, b) =>
|
||||
new Date(b.lastActivity).getTime() - new Date(a.lastActivity).getTime()
|
||||
);
|
||||
|
||||
index.lastUpdated = new Date().toISOString();
|
||||
await saveThreadIndex(interestId, index);
|
||||
}
|
||||
|
||||
// Get emails from MinIO cache
|
||||
export async function getCachedEmails(interestId: string): Promise<any[]> {
|
||||
const client = getMinioClient();
|
||||
const emails: any[] = [];
|
||||
|
||||
try {
|
||||
const stream = client.listObjectsV2('client-emails', `interest-${interestId}/emails/`, true);
|
||||
const files: any[] = [];
|
||||
|
||||
await new Promise((resolve, reject) => {
|
||||
stream.on('data', (obj) => {
|
||||
if (obj && obj.name && obj.name.endsWith('.json')) {
|
||||
files.push(obj.name);
|
||||
}
|
||||
});
|
||||
stream.on('error', reject);
|
||||
stream.on('end', resolve);
|
||||
});
|
||||
|
||||
// Load each email
|
||||
for (const fileName of files) {
|
||||
try {
|
||||
const objStream = await client.getObject('client-emails', fileName);
|
||||
let data = '';
|
||||
|
||||
await new Promise((resolve, reject) => {
|
||||
objStream.on('data', (chunk) => { data += chunk; });
|
||||
objStream.on('end', resolve);
|
||||
objStream.on('error', reject);
|
||||
});
|
||||
|
||||
emails.push(JSON.parse(data));
|
||||
} catch (error) {
|
||||
console.error(`Failed to load email ${fileName}:`, error);
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
console.error('Failed to list cached emails:', error);
|
||||
}
|
||||
|
||||
return emails;
|
||||
}
|
||||
Reference in New Issue
Block a user