import { and, desc, eq, ilike, or, sql } from 'drizzle-orm';

import { db } from '@/lib/db';
import { emailAccounts, emailMessages, emailThreads } from '@/lib/db/schema/email';
import { clientContacts, clients } from '@/lib/db/schema/clients';
import { NotFoundError } from '@/lib/errors';
import { getDecryptedCredentials } from '@/lib/services/email-accounts.service';
import { logger } from '@/lib/logger';
import type { ListThreadsInput } from '@/lib/validators/email';

// ─── Types ────────────────────────────────────────────────────────────────────

/** Normalised view of one inbound email, as handed to {@link ingestMessage}. */
interface ParsedEmail {
  /** Value of the Message-ID header; an empty string when the header is missing. */
  messageId: string;
  /** Raw From header text, either `Name <addr>` or a bare address. */
  from: string;
  to: string[];
  cc?: string[];
  subject: string;
  text?: string;
  html?: string;
  date: Date;
  inReplyTo?: string;
  references?: string[];
}

/** Structural subset of a parsed address header that this module reads. */
interface AddressLike {
  value?: { address?: string }[];
}

// ─── Helpers ──────────────────────────────────────────────────────────────────

/** Default look-back window for an account that has never been synced. */
const INITIAL_SYNC_WINDOW_MS = 30 * 24 * 60 * 60 * 1000;

/** Escapes the LIKE metacharacters (`\`, `%`, `_`) so the value matches literally. */
function escapeLikePattern(value: string): string {
  return value.replace(/[\\%_]/g, '\\$&');
}

/** Strips any run of leading `Re:` / `Fwd:` / `Fw:` prefixes and surrounding whitespace. */
function normalizeSubject(subject: string): string {
  return subject
    .trim()
    .replace(/^(?:(?:re|fwd|fw):\s*)+/i, '')
    .trim();
}

/** Flattens one or many parsed address headers into a list of non-empty addresses. */
function collectAddresses(field: AddressLike | AddressLike[] | undefined): string[] {
  if (!field) return [];
  const headers = Array.isArray(field) ? field : [field];
  return headers.flatMap((header) =>
    (header.value ?? [])
      .map((entry) => entry.address)
      // Entries without an address would otherwise be stored as ''.
      .filter((address): address is string => Boolean(address)),
  );
}

/** Inserts an empty thread seeded from the email's subject and date; returns its id. */
async function createThread(
  portId: string,
  parsedEmail: ParsedEmail,
  clientId?: string,
): Promise<string> {
  const inserted = await db
    .insert(emailThreads)
    .values({
      portId,
      ...(clientId ? { clientId } : {}),
      subject: parsedEmail.subject,
      lastMessageAt: parsedEmail.date,
      messageCount: 0,
    })
    .returning();

  const created = inserted[0];
  if (!created) throw new Error('Failed to create email thread');
  return created.id;
}

/**
 * Picks the thread an inbound email belongs to, creating one when nothing matches.
 *
 * Strategies are tried in order and the first hit wins:
 *  1. In-Reply-To / References headers pointing at a stored message of this port.
 *  2. Sender address matching an email contact of a client in this port.
 *  3. Case-insensitive substring match of the normalised subject.
 *  4. A brand-new thread without a client.
 */
async function resolveThreadId(portId: string, parsedEmail: ParsedEmail): Promise<string> {
  // Step 1: Message-ID chain. The join keeps the lookup inside this port, so a
  // matching message that belongs to another port cannot shadow a valid match.
  const referencedIds = [
    ...new Set([
      ...(parsedEmail.inReplyTo ? [parsedEmail.inReplyTo] : []),
      ...(parsedEmail.references ?? []),
    ]),
  ];

  if (referencedIds.length > 0) {
    const referencedRows = await db
      .select({ threadId: emailThreads.id })
      .from(emailMessages)
      .innerJoin(emailThreads, eq(emailMessages.threadId, emailThreads.id))
      .where(
        and(
          eq(emailThreads.portId, portId),
          or(...referencedIds.map((id) => eq(emailMessages.messageIdHeader, id))),
        ),
      )
      .limit(1);

    const referenced = referencedRows[0];
    if (referenced) return referenced.threadId;
  }

  // Step 2: Email address match against client contacts.
  const fromAddress = parsedEmail.from
    .replace(/.*<(.+)>/, '$1')
    .trim()
    .toLowerCase();

  // An empty sender must never be compared against contact values.
  if (fromAddress) {
    const contactRows = await db
      .select({
        clientId: clientContacts.clientId,
        clientPortId: clients.portId,
      })
      .from(clientContacts)
      .innerJoin(clients, eq(clientContacts.clientId, clients.id))
      .where(
        and(
          eq(clientContacts.channel, 'email'),
          eq(sql`lower(${clientContacts.value})`, fromAddress),
          eq(clients.portId, portId),
        ),
      )
      .limit(1);

    const contact = contactRows[0];
    if (contact) {
      // Reuse the client's most recently active thread, or start one for them.
      const latestClientThread = await db.query.emailThreads.findFirst({
        where: and(
          eq(emailThreads.portId, portId),
          eq(emailThreads.clientId, contact.clientId),
        ),
        orderBy: [desc(emailThreads.lastMessageAt)],
      });

      return latestClientThread
        ? latestClientThread.id
        : createThread(portId, parsedEmail, contact.clientId);
    }
  }

  // Step 3: Subject match (the sender is NOT part of this comparison).
  const subjectKey = normalizeSubject(parsedEmail.subject);
  if (subjectKey) {
    const subjectThread = await db.query.emailThreads.findFirst({
      where: and(
        eq(emailThreads.portId, portId),
        // The subject is untrusted input: escape LIKE wildcards so that a
        // subject such as "%" cannot match every thread in the port.
        ilike(emailThreads.subject, `%${escapeLikePattern(subjectKey)}%`),
      ),
      orderBy: [desc(emailThreads.lastMessageAt)],
    });
    if (subjectThread) return subjectThread.id;
  }

  // Step 4: No thread found — create a new one.
  return createThread(portId, parsedEmail);
}

// ─── List Threads ─────────────────────────────────────────────────────────────

/**
 * Returns one page of a port's email threads, most recently active first.
 *
 * @param portId - Port whose threads are listed.
 * @param query - 1-based `page`, page size `limit`, and an optional `clientId` filter.
 * @returns The page rows (each with `clientName`, or `null` when there is no
 *   joined client) and the total number of threads matching the filter.
 */
export async function listThreads(portId: string, query: ListThreadsInput) {
  const { page, limit, clientId } = query;
  const offset = (page - 1) * limit;

  const conditions = [eq(emailThreads.portId, portId)];
  if (clientId) {
    conditions.push(eq(emailThreads.clientId, clientId));
  }
  const where = and(...conditions);

  const [rows, countResult] = await Promise.all([
    db
      .select({
        thread: emailThreads,
        clientName: clients.fullName,
      })
      .from(emailThreads)
      .leftJoin(clients, eq(emailThreads.clientId, clients.id))
      .where(where)
      .orderBy(desc(emailThreads.lastMessageAt))
      .limit(limit)
      .offset(offset),
    // Typed as string so the value can be handed to parseInt below.
    db
      .select({ count: sql<string>`count(*)` })
      .from(emailThreads)
      .where(where),
  ]);

  const total = parseInt(countResult[0]?.count ?? '0', 10);

  return {
    data: rows.map((row) => ({ ...row.thread, clientName: row.clientName ?? null })),
    total,
  };
}

// ─── Get Thread ───────────────────────────────────────────────────────────────

/**
 * Loads a thread together with its messages ordered by `sentAt`.
 *
 * @throws NotFoundError when the thread does not exist or belongs to another port.
 */
export async function getThread(threadId: string, portId: string) {
  // Scoping by port in the query makes "missing" and "foreign" indistinguishable.
  const thread = await db.query.emailThreads.findFirst({
    where: and(eq(emailThreads.id, threadId), eq(emailThreads.portId, portId)),
  });

  if (!thread) {
    throw new NotFoundError('Email thread');
  }

  const messages = await db
    .select()
    .from(emailMessages)
    .where(eq(emailMessages.threadId, threadId))
    .orderBy(emailMessages.sentAt);

  return { ...thread, messages };
}

// ─── Ingest Message ───────────────────────────────────────────────────────────

/**
 * Stores an inbound email, attaching it to an existing thread or a new one.
 *
 * If a message with the same Message-ID is already stored for this port, that
 * message is returned and nothing is written, so ingesting the same email twice
 * does not create a duplicate row or inflate the thread's message count.
 *
 * @param portId - Port the receiving account belongs to.
 * @param parsedEmail - The normalised email to store.
 * @returns The stored (or previously stored) message row and its thread id.
 * @throws Error when the thread or message insert returns no row.
 */
export async function ingestMessage(portId: string, parsedEmail: ParsedEmail) {
  // De-duplicate on Message-ID within the port.
  if (parsedEmail.messageId) {
    const duplicateRows = await db
      .select({ message: emailMessages })
      .from(emailMessages)
      .innerJoin(emailThreads, eq(emailMessages.threadId, emailThreads.id))
      .where(
        and(
          eq(emailThreads.portId, portId),
          eq(emailMessages.messageIdHeader, parsedEmail.messageId),
        ),
      )
      .limit(1);

    const duplicate = duplicateRows[0];
    if (duplicate) {
      return { message: duplicate.message, threadId: duplicate.message.threadId };
    }
  }

  const threadId = await resolveThreadId(portId, parsedEmail);

  // Insert the message
  const messageRows = await db
    .insert(emailMessages)
    .values({
      threadId,
      messageIdHeader: parsedEmail.messageId || null,
      fromAddress: parsedEmail.from,
      toAddresses: parsedEmail.to,
      ccAddresses: parsedEmail.cc ?? null,
      subject: parsedEmail.subject,
      bodyText: parsedEmail.text ?? null,
      bodyHtml: parsedEmail.html ?? null,
      direction: 'inbound',
      sentAt: parsedEmail.date,
    })
    .returning();

  const message = messageRows[0];
  if (!message) throw new Error('Failed to insert email message');

  // Update thread's lastMessageAt and messageCount.
  // NOTE(review): lastMessageAt is overwritten even when this email is older
  // than the thread's newest message — confirm whether that is intended.
  await db
    .update(emailThreads)
    .set({
      lastMessageAt: parsedEmail.date,
      messageCount: sql`${emailThreads.messageCount} + 1`,
      updatedAt: new Date(),
    })
    .where(eq(emailThreads.id, threadId));

  return { message, threadId };
}

// ─── Sync Inbox ───────────────────────────────────────────────────────────────

/**
 * Pulls INBOX messages for an email account over IMAP and ingests each one.
 *
 * Messages are searched from the account's `lastSyncAt`, or from 30 days ago
 * when the account has never been synced. A message that fails to parse or
 * ingest is logged and skipped; the remaining messages are still processed.
 * `lastSyncAt` is only updated when the search returned at least one message.
 *
 * @param accountId - Id of the email account to sync.
 * @throws NotFoundError when the account does not exist.
 */
export async function syncInbox(accountId: string): Promise<void> {
  const account = await db.query.emailAccounts.findFirst({
    where: eq(emailAccounts.id, accountId),
  });

  if (!account) {
    throw new NotFoundError('Email account');
  }

  // Dynamic imports to avoid loading heavy IMAP/mail modules at module initialisation
  const [{ ImapFlow }, { simpleParser }] = await Promise.all([
    import('imapflow'),
    import('mailparser'),
  ]);

  const creds = await getDecryptedCredentials(accountId);

  // Determine the since date: last sync or 30 days ago
  const since = account.lastSyncAt ?? new Date(Date.now() - INITIAL_SYNC_WINDOW_MS);

  const client = new ImapFlow({
    host: account.imapHost,
    port: account.imapPort,
    secure: account.imapPort === 993,
    auth: {
      user: creds.username,
      pass: creds.password,
    },
    logger: false,
  });

  try {
    await client.connect();

    const mailbox = await client.mailboxOpen('INBOX');
    logger.info({ accountId, exists: mailbox.exists }, 'IMAP INBOX opened');

    // Search by UID rather than sequence number: sequence numbers shift when
    // messages are expunged, UIDs stay valid between the search and the fetch.
    // A non-array result means nothing was found.
    const searchResult = await client.search({ since }, { uid: true });
    const uids: number[] = Array.isArray(searchResult) ? searchResult : [];

    if (uids.length === 0) {
      logger.info({ accountId }, 'No new messages to sync');
      return;
    }

    for await (const message of client.fetch(uids, { source: true }, { uid: true })) {
      try {
        if (!message.source) continue;

        const parsed = await simpleParser(message.source);

        // Normalise messageId — tolerate string | string[] | undefined
        const rawMsgId = parsed.messageId;
        const messageId =
          rawMsgId == null ? '' : Array.isArray(rawMsgId) ? (rawMsgId[0] ?? '') : rawMsgId;

        const from = parsed.from?.text ?? '';

        // Normalise to/cc — each header can be a single object or an array
        const to = collectAddresses(parsed.to);
        const cc = parsed.cc ? collectAddresses(parsed.cc) : undefined;

        // References may arrive as one whitespace-separated string or as a list
        const rawRefs = parsed.references;
        const references: string[] =
          rawRefs == null
            ? []
            : typeof rawRefs === 'string'
              ? rawRefs.split(/\s+/).filter(Boolean)
              : rawRefs;

        await ingestMessage(account.portId, {
          messageId,
          from,
          to,
          cc,
          subject: parsed.subject ?? '(no subject)',
          text: parsed.text ?? undefined,
          html: typeof parsed.html === 'string' ? parsed.html : undefined,
          date: parsed.date ?? new Date(),
          inReplyTo: parsed.inReplyTo ?? undefined,
          references,
        });
      } catch (err) {
        // Best effort: one bad message must not abort the whole sync.
        logger.error({ err, accountId, uid: message.uid }, 'Failed to ingest email message');
      }
    }

    // Update lastSyncAt on the account
    await db
      .update(emailAccounts)
      .set({ lastSyncAt: new Date(), updatedAt: new Date() })
      .where(eq(emailAccounts.id, accountId));

    logger.info({ accountId, messageCount: uids.length }, 'IMAP sync complete');
  } finally {
    // A failing logout (e.g. after connect() itself failed) must not replace
    // the error that is already propagating out of the try block.
    try {
      await client.logout();
    } catch (err) {
      logger.warn({ err, accountId }, 'IMAP logout failed');
    }
  }
}