// port-nimara-client-portal/server/api/email/fetch-thread.ts
//
// 470 lines
// 15 KiB
// TypeScript

import Imap from 'imap';
import { simpleParser } from 'mailparser';
import { getCredentialsFromSession, decryptCredentials } from '~/server/utils/encryption';
import { listFiles, getFileStats, getMinioClient, uploadFile } from '~/server/utils/minio';
/**
 * A single email in a client conversation, as returned by this endpoint and
 * as written (JSON-serialised, with an extra `interestId` field) to the
 * `client-emails` cache bucket.
 */
interface EmailMessage {
/** Message identifier. IMAP-fetched emails use the parsed Message-ID, falling back to a generated `<Date.now()>-<seqno>` value. */
id: string;
/** Sender, taken from the parsed "From" header text for IMAP-fetched emails. */
from: string;
/** Recipient(s). IMAP-fetched emails store a single comma-separated string. */
to: string | string[];
/** Subject line (empty string when the parsed message has none). */
subject: string;
/** Plain-text body (empty string when the parsed message has none). */
body: string;
/** HTML body, when the parsed message provides one. */
html?: string;
/** ISO 8601 timestamp; IMAP-fetched emails fall back to the fetch time when the message has no date. */
timestamp: string;
/** 'sent' when the sender text contains the logged-in user's address, otherwise 'received'. */
direction: 'sent' | 'received';
/** Value of the message's In-Reply-To header, when present; used to link replies to their parent. */
threadId?: string;
}
/**
 * Returns the email conversation between the logged-in user and one client.
 *
 * Request body: `{ clientEmail, sessionId, interestId?, limit? }`
 * (`limit` defaults to 50 and only bounds the IMAP fetch; cached emails are
 * always loaded in full).
 *
 * Flow:
 *  1. Reject requests without an accepted `x-tag` header (401).
 *  2. Resolve and decrypt the mailbox credentials stored for `sessionId`;
 *     if they are missing or cannot be decrypted, respond with empty results.
 *  3. Load previously cached emails for `interestId` from MinIO.
 *  4. Fetch emails over IMAP, giving up after 30 seconds and falling back to
 *     the cached emails only.
 *  5. De-duplicate by email id, sort newest first and group into threads.
 *
 * @returns `{ success: true, emails, threads }`
 * @throws 401 when the `x-tag` header is missing or not accepted.
 * @throws 400 when `clientEmail` or `sessionId` is missing.
 * @throws 500 on any other unexpected failure.
 */
export default defineEventHandler(async (event) => {
  // NOTE(review): the accepted tag values are hard-coded in source; consider
  // moving them to runtime configuration.
  const xTagHeader = getRequestHeader(event, "x-tag");
  if (!xTagHeader || (xTagHeader !== "094ut234" && xTagHeader !== "pjnvü1230")) {
    throw createError({ statusCode: 401, statusMessage: "unauthenticated" });
  }

  try {
    // A request without a body must yield the 400 below, not a destructuring TypeError.
    const body = (await readBody(event)) ?? {};
    // Limit IMAP emails but load all cached emails
    const { clientEmail, interestId, sessionId, limit = 50 } = body;

    if (!clientEmail || !sessionId) {
      throw createError({
        statusCode: 400,
        statusMessage: "Client email and sessionId are required"
      });
    }

    // Get encrypted credentials from session
    const encryptedCredentials = getCredentialsFromSession(sessionId);
    if (!encryptedCredentials) {
      // Return empty results instead of throwing error
      return { success: true, emails: [], threads: [] };
    }

    // Decrypt credentials
    let userEmail: string;
    let password: string;
    try {
      const decrypted = decryptCredentials(encryptedCredentials);
      userEmail = decrypted.email;
      password = decrypted.password;
    } catch (decryptError) {
      console.error('Failed to decrypt credentials - session may be invalid after restart:', decryptError);
      // Return empty results for invalid session
      return { success: true, emails: [], threads: [] };
    }

    // First, get emails from MinIO cache if available
    const cachedEmails: EmailMessage[] = interestId ? await loadCachedEmails(interestId) : [];

    // Configure IMAP
    const imapConfig = {
      user: userEmail,
      password: password,
      host: process.env.NUXT_EMAIL_IMAP_HOST || 'mail.portnimara.com',
      port: parseInt(process.env.NUXT_EMAIL_IMAP_PORT || '993', 10),
      tls: true,
      tlsOptions: {
        // NOTE(review): certificate verification is disabled, which allows
        // man-in-the-middle interception of the mailbox credentials. Left
        // unchanged because the mail server's certificate setup is not
        // visible from here.
        rejectUnauthorized: false
      },
      connTimeout: 10000, // 10 seconds connection timeout
      authTimeout: 5000 // 5 seconds auth timeout
    };

    // Fetch emails from IMAP, racing against a 30 second overall timeout
    let imapEmails: EmailMessage[] = [];
    let timeoutHandle: ReturnType<typeof setTimeout> | undefined;
    const timeoutPromise = new Promise<EmailMessage[]>((_, reject) => {
      timeoutHandle = setTimeout(() => reject(new Error('IMAP connection timeout')), 30000);
    });

    try {
      imapEmails = await Promise.race([
        fetchImapEmails(imapConfig, userEmail, clientEmail, limit, interestId),
        timeoutPromise
      ]);
    } catch (imapError) {
      console.error('IMAP fetch failed:', imapError);
      // Continue with cached emails only
    } finally {
      // Do not leave the timer pending once the race has been decided.
      if (timeoutHandle !== undefined) {
        clearTimeout(timeoutHandle);
      }
    }

    // Combine cached and IMAP emails, remove duplicates (later entries win)
    const allEmails = [...cachedEmails, ...imapEmails];
    const uniqueEmails = Array.from(
      new Map(allEmails.map(email => [email.id, email])).values()
    );

    // Sort by timestamp (newest first)
    uniqueEmails.sort((a, b) =>
      new Date(b.timestamp).getTime() - new Date(a.timestamp).getTime()
    );

    // Group into threads
    const threads = groupIntoThreads(uniqueEmails);

    return {
      success: true,
      emails: uniqueEmails,
      threads: threads
    };
  } catch (error) {
    // Deliberate HTTP errors (such as the 400 above) already carry a status
    // code; pass them through instead of masking them as a 500.
    if (error && typeof error === 'object' && 'statusCode' in error) {
      throw error;
    }

    console.error('Failed to fetch email thread:', error);
    if (error instanceof Error) {
      throw createError({
        statusCode: 500,
        statusMessage: `Failed to fetch emails: ${error.message}`
      });
    } else {
      throw createError({
        statusCode: 500,
        statusMessage: "An unexpected error occurred",
      });
    }
  }
});

/**
 * Loads every cached email stored as a `.json` object under
 * `interest-<interestId>/` in the `client-emails` bucket.
 *
 * Best effort: a failure to list the bucket yields an empty array, and a
 * failure to read or parse a single object skips only that object.
 */
async function loadCachedEmails(interestId: string | number): Promise<EmailMessage[]> {
  const bucketName = 'client-emails';
  const cachedEmails: EmailMessage[] = [];

  try {
    const client = getMinioClient();
    const listing = client.listObjectsV2(bucketName, `interest-${interestId}/`, true);
    const files: { name: string; size: number }[] = [];

    await new Promise<void>((resolve, reject) => {
      listing.on('data', (obj: any) => {
        if (obj && obj.name) {
          files.push({ name: obj.name, size: obj.size || 0 });
        }
      });
      listing.on('error', reject);
      listing.on('end', () => resolve());
    });

    console.log('Found cached email files:', files.length);

    // Load ALL cached emails (no limit) - just filter JSON files
    const jsonFiles = files.filter(f => f.name.endsWith('.json'));

    for (const file of jsonFiles) {
      try {
        // Read directly on the server using the MinIO client (works with private buckets)
        const objectStream = await client.getObject(bucketName, file.name);
        // Decode as UTF-8 at the stream level so multi-byte characters that
        // straddle a chunk boundary are not corrupted.
        objectStream.setEncoding('utf8');

        let text = '';
        await new Promise<void>((resolve, reject) => {
          objectStream.on('data', (chunk: string) => {
            text += chunk;
          });
          objectStream.on('end', () => resolve());
          objectStream.on('error', reject);
        });

        cachedEmails.push(JSON.parse(text));
      } catch (err) {
        console.error('Failed to read cached email:', file.name, err);
      }
    }
  } catch (err) {
    console.error('Failed to list cached emails:', err);
  }

  return cachedEmails;
}
/**
 * Fetches emails exchanged with `clientEmail` from the user's IMAP mailbox.
 *
 * Opens, in turn, the folders INBOX, Sent, Sent Items and Sent Mail (folders
 * that cannot be opened are skipped), takes the last `limit` messages of each
 * folder, parses them, and keeps those whose From/To/Cc text contains
 * `clientEmail` (case-insensitive). When `interestId` is given, every kept
 * email is also written to the MinIO cache (fire-and-forget).
 *
 * NOTE(review): because only the last `limit` messages of a folder are
 * fetched before filtering, older messages with this client are not found in
 * a busy mailbox. A server-side address search would address that but depends
 * on server support, so it is left as is.
 *
 * @param imapConfig  Connection options passed to the `Imap` constructor.
 * @param userEmail   Address of the mailbox owner; used to derive `direction`.
 * @param clientEmail Address of the client to filter by.
 * @param limit       Maximum number of messages fetched per folder. Values
 *                    that are not a number >= 1 fall back to 50.
 * @param interestId  When set, matching emails are cached under this interest.
 * @returns The matching emails from all folders that could be searched.
 * @throws Rejects with the error reported by the IMAP connection.
 */
async function fetchImapEmails(
  imapConfig: any,
  userEmail: string,
  clientEmail: string,
  limit: number,
  interestId?: string
): Promise<EmailMessage[]> {
  // `slice(-0)` / `slice(-NaN)` would select the whole mailbox and a negative
  // limit would select the wrong end, so normalise before use.
  const requestedLimit = Number(limit);
  const maxPerFolder =
    Number.isFinite(requestedLimit) && requestedLimit >= 1 ? Math.floor(requestedLimit) : 50;

  const clientNeedle = String(clientEmail ?? '').trim().toLowerCase();
  if (clientNeedle === '') {
    // Nothing can match, so there is no point in connecting at all.
    console.log('No client email provided, skipping search');
    return [];
  }
  const userNeedle = userEmail.toLowerCase();

  // mailparser address fields are either a single object or an array of them.
  const addressText = (field: any): string =>
    Array.isArray(field)
      ? field.map((addr: any) => addr.text).join(', ')
      : field?.text || '';

  return new Promise<EmailMessage[]>((resolve, reject) => {
    const collected: EmailMessage[] = [];
    const imap = new Imap(imapConfig);
    let settled = false;

    const closeConnection = () => {
      try {
        imap.end();
      } catch (e) {
        console.error('Error closing IMAP connection:', e);
      }
    };

    // Settle exactly once, closing the connection on the way out.
    const finish = () => {
      if (settled) return;
      settled = true;
      closeConnection();
      resolve(collected);
    };
    const fail = (err: unknown) => {
      if (settled) return;
      settled = true;
      closeConnection();
      reject(err);
    };

    imap.once('ready', () => {
      // Search in both INBOX and the common names for the sent folder
      const foldersToSearch = ['INBOX', 'Sent', 'Sent Items', 'Sent Mail'];
      let nextFolderIndex = 0;

      const searchNextFolder = () => {
        if (settled) return;
        if (nextFolderIndex >= foldersToSearch.length) {
          finish();
          return;
        }

        const folderName = foldersToSearch[nextFolderIndex];
        nextFolderIndex++;

        imap.openBox(folderName, true, (openErr: any) => {
          if (openErr) {
            console.log(`Folder ${folderName} not found, trying next...`);
            searchNextFolder();
            return;
          }

          console.log(`Searching in folder: ${folderName}`);

          // No date filter, so that complete threads are retrieved; messages
          // are filtered by address after parsing.
          imap.search(['ALL'], (searchErr: any, results: number[]) => {
            if (searchErr) {
              console.error(`Search error in ${folderName}:`, searchErr);
              searchNextFolder();
              return;
            }

            if (!results || results.length === 0) {
              console.log(`No emails found in ${folderName}`);
              searchNextFolder();
              return;
            }

            console.log(`Found ${results.length} emails in ${folderName}`);

            const messagesToFetch = results.slice(-maxPerFolder);

            // Parsing is asynchronous and usually completes after the fetch
            // has emitted 'end'. The folder is finished only when the fetch
            // has ended AND every started parse has reported back, and we
            // must move on exactly once.
            let pendingParses = 0;
            let fetchEnded = false;
            let advanced = false;

            const advanceOnce = () => {
              if (advanced) return;
              advanced = true;
              searchNextFolder();
            };
            const advanceIfDrained = () => {
              if (fetchEnded && pendingParses === 0) {
                advanceOnce();
              }
            };

            const fetchRequest = imap.fetch(messagesToFetch, {
              bodies: '',
              struct: true,
              envelope: true
            });

            fetchRequest.on('message', (msg: any, seqno: number) => {
              msg.on('body', (stream: any) => {
                pendingParses++;

                simpleParser(stream as any, (parseErr: any, parsed: any) => {
                  try {
                    if (parseErr) {
                      console.error('Parse error:', parseErr);
                      return;
                    }

                    const fromEmail = addressText(parsed.from);
                    const toEmails = addressText(parsed.to);
                    const ccEmails = addressText(parsed.cc);

                    // Filter to only include emails to/from the client
                    const involvesClient = [fromEmail, toEmails, ccEmails].some(text =>
                      text.toLowerCase().includes(clientNeedle)
                    );
                    if (!involvesClient) {
                      return;
                    }

                    const email: EmailMessage = {
                      id: parsed.messageId || `${Date.now()}-${seqno}`,
                      from: fromEmail,
                      to: toEmails,
                      subject: parsed.subject || '',
                      body: parsed.text || '',
                      html: parsed.html || undefined,
                      timestamp: parsed.date?.toISOString() || new Date().toISOString(),
                      direction: fromEmail.toLowerCase().includes(userNeedle) ? 'sent' : 'received'
                    };

                    if (parsed.headers?.has('in-reply-to')) {
                      email.threadId = parsed.headers.get('in-reply-to') as string;
                    }

                    collected.push(email);

                    // Cache this email if we have an interestId
                    if (interestId) {
                      cacheEmailForInterest(email, interestId);
                    }
                  } catch (processingError) {
                    // A malformed message must not stall the whole folder.
                    console.error('Failed to process email:', processingError);
                  } finally {
                    pendingParses--;
                    advanceIfDrained();
                  }
                });
              });
            });

            fetchRequest.once('error', (fetchErr: any) => {
              console.error('Fetch error:', fetchErr);
              advanceOnce();
            });

            fetchRequest.once('end', () => {
              fetchEnded = true;
              advanceIfDrained();
            });
          });
        });
      };

      searchNextFolder();
    });

    imap.once('error', (err: any) => {
      fail(err);
    });

    imap.once('end', () => {
      // Connection closed before we finished: return what was collected so far.
      if (!settled) {
        settled = true;
        resolve(collected);
      }
    });

    imap.connect();
  });
}

/**
 * Writes one email to the `client-emails` bucket under the given interest.
 * Fire-and-forget: failures are logged and never propagate to the caller.
 *
 * NOTE(review): the object name is derived from the current time, so the same
 * email is stored again on every fetch and two emails with the same direction
 * cached within the same millisecond overwrite each other. The naming scheme
 * is kept unchanged in case other code depends on it.
 */
function cacheEmailForInterest(email: EmailMessage, interestId: string): void {
  try {
    const emailData = {
      ...email,
      interestId: interestId
    };

    const objectName = `interest-${interestId}/${Date.now()}-${email.direction}.json`;
    const buffer = Buffer.from(JSON.stringify(emailData, null, 2));

    // Upload to the client-emails bucket
    const client = getMinioClient();
    client.putObject('client-emails', objectName, buffer, buffer.length, {
      'Content-Type': 'application/json',
    }).catch((err: unknown) => {
      console.error('Failed to cache email:', err);
    });
  } catch (cacheError) {
    console.error('Failed to cache email:', cacheError);
  }
}
/**
 * Groups emails into conversation threads.
 *
 * Two emails belong to the same thread when they are connected, directly or
 * through other emails, by either of these links:
 *  - one email's `threadId` (its In-Reply-To value) equals the other's `id`;
 *  - their subjects are equal after normalisation (whitespace collapsed, any
 *    number of leading "Re:" / "Fwd:" / "Fw:" markers removed, lower-cased).
 *
 * The result does not depend on the order of `emails`, so replies are linked
 * to their parent even when the input is sorted newest first.
 *
 * @param emails Emails to group; the array itself is not modified.
 * @returns One entry per thread, sorted by latest activity (newest first):
 *   `{ id, subject, emailCount, latestTimestamp, emails }`, where `id` is the
 *   id of the thread's first email in input order, `emails` is sorted oldest
 *   first, and `subject` is the subject of the oldest email.
 */
function groupIntoThreads(emails: EmailMessage[]): any[] {
  // One or more leading reply/forward markers, in any letter case.
  const replyPrefixes = /^(?:(?:re|fwd?)\s*:\s*)+/i;

  // Cached emails come from parsed JSON, so a missing subject must not crash.
  const normalizeSubject = (subject: string | undefined): string =>
    (subject ?? '')
      .replace(/\s+/g, ' ')
      .trim()
      .replace(replyPrefixes, '')
      .trim()
      .toLowerCase();

  // Union-find over positions in `emails`; every email starts as its own thread.
  const parent: number[] = emails.map((_, index) => index);

  const findRoot = (index: number): number => {
    let current = index;
    while (parent[current] !== current) {
      // Path halving keeps the chains short.
      parent[current] = parent[parent[current]];
      current = parent[current];
    }
    return current;
  };

  const link = (a: number, b: number): void => {
    const rootA = findRoot(a);
    const rootB = findRoot(b);
    if (rootA === rootB) return;
    // Always keep the earlier input position as root, so a thread's id is
    // the id of its first email in input order.
    if (rootA < rootB) {
      parent[rootB] = rootA;
    } else {
      parent[rootA] = rootB;
    }
  };

  // Index all emails by id so replies can find their parent in O(1).
  const indexById = new Map<string, number>();
  emails.forEach((email, index) => {
    indexById.set(email.id, index);
  });

  // Link replies to parents and emails to the first email with the same subject.
  const firstIndexBySubject = new Map<string, number>();
  emails.forEach((email, index) => {
    if (email.threadId) {
      const parentIndex = indexById.get(email.threadId);
      if (parentIndex !== undefined) {
        link(index, parentIndex);
      }
    }

    const subjectKey = normalizeSubject(email.subject);
    const anchorIndex = firstIndexBySubject.get(subjectKey);
    if (anchorIndex === undefined) {
      firstIndexBySubject.set(subjectKey, index);
    } else {
      link(index, anchorIndex);
    }
  });

  // Collect the members of each thread, preserving input order.
  const membersByRoot = new Map<number, EmailMessage[]>();
  emails.forEach((email, index) => {
    const root = findRoot(index);
    const members = membersByRoot.get(root);
    if (members) {
      members.push(email);
    } else {
      membersByRoot.set(root, [email]);
    }
  });

  return Array.from(membersByRoot.entries())
    .map(([rootIndex, threadEmails]) => {
      // Oldest first, so the thread reads chronologically.
      threadEmails.sort((a, b) =>
        new Date(a.timestamp).getTime() - new Date(b.timestamp).getTime()
      );

      return {
        id: emails[rootIndex].id,
        subject: threadEmails[0].subject,
        emailCount: threadEmails.length,
        latestTimestamp: threadEmails[threadEmails.length - 1].timestamp, // Latest email
        emails: threadEmails
      };
    })
    // Sort threads by latest activity (newest first)
    .sort((a, b) => new Date(b.latestTimestamp).getTime() - new Date(a.latestTimestamp).getTime());
}