Full CRM rebuild with Next.js 15, TypeScript, Tailwind, Drizzle ORM, PostgreSQL, Redis, BullMQ, MinIO, and Socket.io. Includes 461 source files covering clients, berths, interests/pipeline, documents/EOI, expenses/invoices, email, notifications, dashboard, admin, and client portal. CI/CD via Gitea Actions with Docker builds. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
355 lines
11 KiB
TypeScript
355 lines
11 KiB
TypeScript
import { and, desc, eq, ilike, or, sql } from 'drizzle-orm';
|
|
|
|
import { db } from '@/lib/db';
|
|
import { emailAccounts, emailMessages, emailThreads } from '@/lib/db/schema/email';
|
|
import { clientContacts, clients } from '@/lib/db/schema/clients';
|
|
import { NotFoundError } from '@/lib/errors';
|
|
import { getDecryptedCredentials } from '@/lib/services/email-accounts.service';
|
|
import { logger } from '@/lib/logger';
|
|
import type { ListThreadsInput } from '@/lib/validators/email';
|
|
|
|
// ─── Types ────────────────────────────────────────────────────────────────────
|
|
|
|
interface ParsedEmail {
|
|
messageId: string;
|
|
from: string;
|
|
to: string[];
|
|
cc?: string[];
|
|
subject: string;
|
|
text?: string;
|
|
html?: string;
|
|
date: Date;
|
|
inReplyTo?: string;
|
|
references?: string[];
|
|
}
|
|
|
|
// ─── List Threads ─────────────────────────────────────────────────────────────
|
|
|
|
export async function listThreads(portId: string, query: ListThreadsInput) {
|
|
const { page, limit, clientId } = query;
|
|
const offset = (page - 1) * limit;
|
|
|
|
const conditions = [eq(emailThreads.portId, portId)];
|
|
if (clientId) {
|
|
conditions.push(eq(emailThreads.clientId, clientId));
|
|
}
|
|
|
|
const where = and(...conditions);
|
|
|
|
const [rows, countResult] = await Promise.all([
|
|
db
|
|
.select({
|
|
thread: emailThreads,
|
|
clientName: clients.fullName,
|
|
})
|
|
.from(emailThreads)
|
|
.leftJoin(clients, eq(emailThreads.clientId, clients.id))
|
|
.where(where)
|
|
.orderBy(desc(emailThreads.lastMessageAt))
|
|
.limit(limit)
|
|
.offset(offset),
|
|
db
|
|
.select({ count: sql<string>`count(*)` })
|
|
.from(emailThreads)
|
|
.where(where),
|
|
]);
|
|
|
|
const total = parseInt(countResult[0]?.count ?? '0', 10);
|
|
|
|
return {
|
|
data: rows.map((r) => ({ ...r.thread, clientName: r.clientName ?? null })),
|
|
total,
|
|
};
|
|
}
|
|
|
|
// ─── Get Thread ───────────────────────────────────────────────────────────────
|
|
|
|
export async function getThread(threadId: string, portId: string) {
|
|
const thread = await db.query.emailThreads.findFirst({
|
|
where: eq(emailThreads.id, threadId),
|
|
});
|
|
|
|
if (!thread) {
|
|
throw new NotFoundError('Email thread');
|
|
}
|
|
|
|
if (thread.portId !== portId) {
|
|
throw new NotFoundError('Email thread');
|
|
}
|
|
|
|
const messages = await db
|
|
.select()
|
|
.from(emailMessages)
|
|
.where(eq(emailMessages.threadId, threadId))
|
|
.orderBy(emailMessages.sentAt);
|
|
|
|
return { ...thread, messages };
|
|
}
|
|
|
|
// ─── Ingest Message ───────────────────────────────────────────────────────────
|
|
|
|
export async function ingestMessage(portId: string, parsedEmail: ParsedEmail) {
|
|
let threadId: string | null = null;
|
|
|
|
// Step 1: Message-ID chain — check inReplyTo and references headers
|
|
const referencedIds = [
|
|
...(parsedEmail.inReplyTo ? [parsedEmail.inReplyTo] : []),
|
|
...(parsedEmail.references ?? []),
|
|
];
|
|
|
|
if (referencedIds.length > 0) {
|
|
const existingMessage = await db.query.emailMessages.findFirst({
|
|
where: or(...referencedIds.map((id) => eq(emailMessages.messageIdHeader, id))),
|
|
});
|
|
if (existingMessage) {
|
|
// Verify thread belongs to this port
|
|
const thread = await db.query.emailThreads.findFirst({
|
|
where: and(
|
|
eq(emailThreads.id, existingMessage.threadId),
|
|
eq(emailThreads.portId, portId),
|
|
),
|
|
});
|
|
if (thread) {
|
|
threadId = thread.id;
|
|
}
|
|
}
|
|
}
|
|
|
|
// Step 2: Email address match against client contacts
|
|
if (!threadId) {
|
|
const fromAddress = parsedEmail.from.replace(/.*<(.+)>/, '$1').trim().toLowerCase();
|
|
|
|
const contactRows = await db
|
|
.select({
|
|
clientId: clientContacts.clientId,
|
|
clientPortId: clients.portId,
|
|
})
|
|
.from(clientContacts)
|
|
.innerJoin(clients, eq(clientContacts.clientId, clients.id))
|
|
.where(
|
|
and(
|
|
eq(clientContacts.channel, 'email'),
|
|
eq(sql`lower(${clientContacts.value})`, fromAddress),
|
|
eq(clients.portId, portId),
|
|
),
|
|
)
|
|
.limit(1);
|
|
|
|
const contactRow = contactRows[0];
|
|
|
|
if (contactRow) {
|
|
const clientId = contactRow.clientId;
|
|
|
|
// Find most recent thread for this client or create one
|
|
const existingThread = await db.query.emailThreads.findFirst({
|
|
where: and(eq(emailThreads.portId, portId), eq(emailThreads.clientId, clientId)),
|
|
orderBy: [desc(emailThreads.lastMessageAt)],
|
|
});
|
|
|
|
if (existingThread) {
|
|
threadId = existingThread.id;
|
|
} else {
|
|
const newThreadRows = await db
|
|
.insert(emailThreads)
|
|
.values({
|
|
portId,
|
|
clientId,
|
|
subject: parsedEmail.subject,
|
|
lastMessageAt: parsedEmail.date,
|
|
messageCount: 0,
|
|
})
|
|
.returning();
|
|
const newThread = newThreadRows[0];
|
|
if (!newThread) throw new Error('Failed to create email thread');
|
|
threadId = newThread.id;
|
|
}
|
|
}
|
|
}
|
|
|
|
// Step 3: Subject + sender fuzzy match
|
|
if (!threadId) {
|
|
const normalizedSubject = parsedEmail.subject
|
|
.replace(/^(re|fwd|fw):\s*/i, '')
|
|
.trim();
|
|
|
|
if (normalizedSubject) {
|
|
const matchingThread = await db.query.emailThreads.findFirst({
|
|
where: and(
|
|
eq(emailThreads.portId, portId),
|
|
ilike(emailThreads.subject, `%${normalizedSubject}%`),
|
|
),
|
|
orderBy: [desc(emailThreads.lastMessageAt)],
|
|
});
|
|
|
|
if (matchingThread) {
|
|
threadId = matchingThread.id;
|
|
}
|
|
}
|
|
}
|
|
|
|
// No thread found — create a new one
|
|
if (!threadId) {
|
|
const newThreadRows = await db
|
|
.insert(emailThreads)
|
|
.values({
|
|
portId,
|
|
subject: parsedEmail.subject,
|
|
lastMessageAt: parsedEmail.date,
|
|
messageCount: 0,
|
|
})
|
|
.returning();
|
|
const newThread = newThreadRows[0];
|
|
if (!newThread) throw new Error('Failed to create email thread');
|
|
threadId = newThread.id;
|
|
}
|
|
|
|
// Insert the message
|
|
const messageRows = await db
|
|
.insert(emailMessages)
|
|
.values({
|
|
threadId,
|
|
messageIdHeader: parsedEmail.messageId || null,
|
|
fromAddress: parsedEmail.from,
|
|
toAddresses: parsedEmail.to,
|
|
ccAddresses: parsedEmail.cc ?? null,
|
|
subject: parsedEmail.subject,
|
|
bodyText: parsedEmail.text ?? null,
|
|
bodyHtml: parsedEmail.html ?? null,
|
|
direction: 'inbound',
|
|
sentAt: parsedEmail.date,
|
|
})
|
|
.returning();
|
|
|
|
const message = messageRows[0];
|
|
if (!message) throw new Error('Failed to insert email message');
|
|
|
|
// Update thread's lastMessageAt and messageCount
|
|
await db
|
|
.update(emailThreads)
|
|
.set({
|
|
lastMessageAt: parsedEmail.date,
|
|
messageCount: sql`${emailThreads.messageCount} + 1`,
|
|
updatedAt: new Date(),
|
|
})
|
|
.where(eq(emailThreads.id, threadId));
|
|
|
|
return { message, threadId };
|
|
}
|
|
|
|
// ─── Sync Inbox ───────────────────────────────────────────────────────────────
|
|
|
|
export async function syncInbox(accountId: string): Promise<void> {
|
|
// Dynamic imports to avoid loading heavy IMAP/mail modules at module initialisation
|
|
const imapflowModule = await import('imapflow');
|
|
const ImapFlow = imapflowModule.ImapFlow;
|
|
const mailparserModule = await import('mailparser');
|
|
const simpleParser = mailparserModule.simpleParser;
|
|
|
|
const account = await db.query.emailAccounts.findFirst({
|
|
where: eq(emailAccounts.id, accountId),
|
|
});
|
|
|
|
if (!account) {
|
|
throw new NotFoundError('Email account');
|
|
}
|
|
|
|
const creds = await getDecryptedCredentials(accountId);
|
|
|
|
// Determine the since date: last sync or 30 days ago
|
|
const since = account.lastSyncAt ?? new Date(Date.now() - 30 * 24 * 60 * 60 * 1000);
|
|
|
|
const client = new ImapFlow({
|
|
host: account.imapHost,
|
|
port: account.imapPort,
|
|
secure: account.imapPort === 993,
|
|
auth: {
|
|
user: creds.username,
|
|
pass: creds.password,
|
|
},
|
|
logger: false,
|
|
});
|
|
|
|
try {
|
|
await client.connect();
|
|
|
|
const mailbox = await client.mailboxOpen('INBOX');
|
|
logger.info({ accountId, exists: mailbox.exists }, 'IMAP INBOX opened');
|
|
|
|
// Search for messages since the last sync date
|
|
// client.search() returns false | number[] — false means nothing found
|
|
const searchResult = await client.search({ since });
|
|
const uids: number[] = searchResult === false ? [] : searchResult;
|
|
|
|
if (uids.length === 0) {
|
|
logger.info({ accountId }, 'No new messages to sync');
|
|
return;
|
|
}
|
|
|
|
for await (const message of client.fetch(uids, { source: true })) {
|
|
try {
|
|
if (!message.source) continue;
|
|
|
|
const parsed = await simpleParser(message.source);
|
|
|
|
// Normalise messageId — mailparser returns string | string[] | undefined
|
|
const rawMsgId = parsed.messageId;
|
|
const messageId =
|
|
rawMsgId == null
|
|
? ''
|
|
: Array.isArray(rawMsgId)
|
|
? (rawMsgId[0] ?? '')
|
|
: rawMsgId;
|
|
|
|
const from = parsed.from?.text ?? '';
|
|
|
|
// Normalise to/cc — mailparser AddressObject can be an array
|
|
const resolveAddresses = (
|
|
field: typeof parsed.to,
|
|
): string[] => {
|
|
if (!field) return [];
|
|
const arr = Array.isArray(field) ? field : [field];
|
|
return arr.flatMap((a) =>
|
|
(a.value ?? []).map((x: { address?: string }) => x.address ?? ''),
|
|
);
|
|
};
|
|
|
|
const to = resolveAddresses(parsed.to);
|
|
const cc = parsed.cc ? resolveAddresses(parsed.cc) : undefined;
|
|
|
|
const rawRefs = parsed.references;
|
|
const references: string[] =
|
|
rawRefs == null
|
|
? []
|
|
: typeof rawRefs === 'string'
|
|
? rawRefs.split(/\s+/).filter(Boolean)
|
|
: rawRefs;
|
|
|
|
await ingestMessage(account.portId, {
|
|
messageId,
|
|
from,
|
|
to,
|
|
cc,
|
|
subject: parsed.subject ?? '(no subject)',
|
|
text: parsed.text ?? undefined,
|
|
html: typeof parsed.html === 'string' ? parsed.html : undefined,
|
|
date: parsed.date ?? new Date(),
|
|
inReplyTo: parsed.inReplyTo ?? undefined,
|
|
references,
|
|
});
|
|
} catch (err) {
|
|
logger.error({ err, accountId, uid: message.uid }, 'Failed to ingest email message');
|
|
}
|
|
}
|
|
|
|
// Update lastSyncAt on the account
|
|
await db
|
|
.update(emailAccounts)
|
|
.set({ lastSyncAt: new Date(), updatedAt: new Date() })
|
|
.where(eq(emailAccounts.id, accountId));
|
|
|
|
logger.info({ accountId, messageCount: uids.length }, 'IMAP sync complete');
|
|
} finally {
|
|
await client.logout();
|
|
}
|
|
}
|