Initial commit: Port Nimara CRM (Layers 0-4)
Full CRM rebuild with Next.js 15, TypeScript, Tailwind, Drizzle ORM, PostgreSQL, Redis, BullMQ, MinIO, and Socket.io. Includes 461 source files covering clients, berths, interests/pipeline, documents/EOI, expenses/invoices, email, notifications, dashboard, admin, and client portal. CI/CD via Gitea Actions with Docker builds. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
354
src/lib/services/email-threads.service.ts
Normal file
354
src/lib/services/email-threads.service.ts
Normal file
@@ -0,0 +1,354 @@
|
||||
import { and, desc, eq, ilike, or, sql } from 'drizzle-orm';
|
||||
|
||||
import { db } from '@/lib/db';
|
||||
import { emailAccounts, emailMessages, emailThreads } from '@/lib/db/schema/email';
|
||||
import { clientContacts, clients } from '@/lib/db/schema/clients';
|
||||
import { NotFoundError } from '@/lib/errors';
|
||||
import { getDecryptedCredentials } from '@/lib/services/email-accounts.service';
|
||||
import { logger } from '@/lib/logger';
|
||||
import type { ListThreadsInput } from '@/lib/validators/email';
|
||||
|
||||
// ─── Types ────────────────────────────────────────────────────────────────────
|
||||
|
||||
interface ParsedEmail {
|
||||
messageId: string;
|
||||
from: string;
|
||||
to: string[];
|
||||
cc?: string[];
|
||||
subject: string;
|
||||
text?: string;
|
||||
html?: string;
|
||||
date: Date;
|
||||
inReplyTo?: string;
|
||||
references?: string[];
|
||||
}
|
||||
|
||||
// ─── List Threads ─────────────────────────────────────────────────────────────
|
||||
|
||||
export async function listThreads(portId: string, query: ListThreadsInput) {
|
||||
const { page, limit, clientId } = query;
|
||||
const offset = (page - 1) * limit;
|
||||
|
||||
const conditions = [eq(emailThreads.portId, portId)];
|
||||
if (clientId) {
|
||||
conditions.push(eq(emailThreads.clientId, clientId));
|
||||
}
|
||||
|
||||
const where = and(...conditions);
|
||||
|
||||
const [rows, countResult] = await Promise.all([
|
||||
db
|
||||
.select({
|
||||
thread: emailThreads,
|
||||
clientName: clients.fullName,
|
||||
})
|
||||
.from(emailThreads)
|
||||
.leftJoin(clients, eq(emailThreads.clientId, clients.id))
|
||||
.where(where)
|
||||
.orderBy(desc(emailThreads.lastMessageAt))
|
||||
.limit(limit)
|
||||
.offset(offset),
|
||||
db
|
||||
.select({ count: sql<string>`count(*)` })
|
||||
.from(emailThreads)
|
||||
.where(where),
|
||||
]);
|
||||
|
||||
const total = parseInt(countResult[0]?.count ?? '0', 10);
|
||||
|
||||
return {
|
||||
data: rows.map((r) => ({ ...r.thread, clientName: r.clientName ?? null })),
|
||||
total,
|
||||
};
|
||||
}
|
||||
|
||||
// ─── Get Thread ───────────────────────────────────────────────────────────────
|
||||
|
||||
export async function getThread(threadId: string, portId: string) {
|
||||
const thread = await db.query.emailThreads.findFirst({
|
||||
where: eq(emailThreads.id, threadId),
|
||||
});
|
||||
|
||||
if (!thread) {
|
||||
throw new NotFoundError('Email thread');
|
||||
}
|
||||
|
||||
if (thread.portId !== portId) {
|
||||
throw new NotFoundError('Email thread');
|
||||
}
|
||||
|
||||
const messages = await db
|
||||
.select()
|
||||
.from(emailMessages)
|
||||
.where(eq(emailMessages.threadId, threadId))
|
||||
.orderBy(emailMessages.sentAt);
|
||||
|
||||
return { ...thread, messages };
|
||||
}
|
||||
|
||||
// ─── Ingest Message ───────────────────────────────────────────────────────────
|
||||
|
||||
export async function ingestMessage(portId: string, parsedEmail: ParsedEmail) {
|
||||
let threadId: string | null = null;
|
||||
|
||||
// Step 1: Message-ID chain — check inReplyTo and references headers
|
||||
const referencedIds = [
|
||||
...(parsedEmail.inReplyTo ? [parsedEmail.inReplyTo] : []),
|
||||
...(parsedEmail.references ?? []),
|
||||
];
|
||||
|
||||
if (referencedIds.length > 0) {
|
||||
const existingMessage = await db.query.emailMessages.findFirst({
|
||||
where: or(...referencedIds.map((id) => eq(emailMessages.messageIdHeader, id))),
|
||||
});
|
||||
if (existingMessage) {
|
||||
// Verify thread belongs to this port
|
||||
const thread = await db.query.emailThreads.findFirst({
|
||||
where: and(
|
||||
eq(emailThreads.id, existingMessage.threadId),
|
||||
eq(emailThreads.portId, portId),
|
||||
),
|
||||
});
|
||||
if (thread) {
|
||||
threadId = thread.id;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Step 2: Email address match against client contacts
|
||||
if (!threadId) {
|
||||
const fromAddress = parsedEmail.from.replace(/.*<(.+)>/, '$1').trim().toLowerCase();
|
||||
|
||||
const contactRows = await db
|
||||
.select({
|
||||
clientId: clientContacts.clientId,
|
||||
clientPortId: clients.portId,
|
||||
})
|
||||
.from(clientContacts)
|
||||
.innerJoin(clients, eq(clientContacts.clientId, clients.id))
|
||||
.where(
|
||||
and(
|
||||
eq(clientContacts.channel, 'email'),
|
||||
eq(sql`lower(${clientContacts.value})`, fromAddress),
|
||||
eq(clients.portId, portId),
|
||||
),
|
||||
)
|
||||
.limit(1);
|
||||
|
||||
const contactRow = contactRows[0];
|
||||
|
||||
if (contactRow) {
|
||||
const clientId = contactRow.clientId;
|
||||
|
||||
// Find most recent thread for this client or create one
|
||||
const existingThread = await db.query.emailThreads.findFirst({
|
||||
where: and(eq(emailThreads.portId, portId), eq(emailThreads.clientId, clientId)),
|
||||
orderBy: [desc(emailThreads.lastMessageAt)],
|
||||
});
|
||||
|
||||
if (existingThread) {
|
||||
threadId = existingThread.id;
|
||||
} else {
|
||||
const newThreadRows = await db
|
||||
.insert(emailThreads)
|
||||
.values({
|
||||
portId,
|
||||
clientId,
|
||||
subject: parsedEmail.subject,
|
||||
lastMessageAt: parsedEmail.date,
|
||||
messageCount: 0,
|
||||
})
|
||||
.returning();
|
||||
const newThread = newThreadRows[0];
|
||||
if (!newThread) throw new Error('Failed to create email thread');
|
||||
threadId = newThread.id;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Step 3: Subject + sender fuzzy match
|
||||
if (!threadId) {
|
||||
const normalizedSubject = parsedEmail.subject
|
||||
.replace(/^(re|fwd|fw):\s*/i, '')
|
||||
.trim();
|
||||
|
||||
if (normalizedSubject) {
|
||||
const matchingThread = await db.query.emailThreads.findFirst({
|
||||
where: and(
|
||||
eq(emailThreads.portId, portId),
|
||||
ilike(emailThreads.subject, `%${normalizedSubject}%`),
|
||||
),
|
||||
orderBy: [desc(emailThreads.lastMessageAt)],
|
||||
});
|
||||
|
||||
if (matchingThread) {
|
||||
threadId = matchingThread.id;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// No thread found — create a new one
|
||||
if (!threadId) {
|
||||
const newThreadRows = await db
|
||||
.insert(emailThreads)
|
||||
.values({
|
||||
portId,
|
||||
subject: parsedEmail.subject,
|
||||
lastMessageAt: parsedEmail.date,
|
||||
messageCount: 0,
|
||||
})
|
||||
.returning();
|
||||
const newThread = newThreadRows[0];
|
||||
if (!newThread) throw new Error('Failed to create email thread');
|
||||
threadId = newThread.id;
|
||||
}
|
||||
|
||||
// Insert the message
|
||||
const messageRows = await db
|
||||
.insert(emailMessages)
|
||||
.values({
|
||||
threadId,
|
||||
messageIdHeader: parsedEmail.messageId || null,
|
||||
fromAddress: parsedEmail.from,
|
||||
toAddresses: parsedEmail.to,
|
||||
ccAddresses: parsedEmail.cc ?? null,
|
||||
subject: parsedEmail.subject,
|
||||
bodyText: parsedEmail.text ?? null,
|
||||
bodyHtml: parsedEmail.html ?? null,
|
||||
direction: 'inbound',
|
||||
sentAt: parsedEmail.date,
|
||||
})
|
||||
.returning();
|
||||
|
||||
const message = messageRows[0];
|
||||
if (!message) throw new Error('Failed to insert email message');
|
||||
|
||||
// Update thread's lastMessageAt and messageCount
|
||||
await db
|
||||
.update(emailThreads)
|
||||
.set({
|
||||
lastMessageAt: parsedEmail.date,
|
||||
messageCount: sql`${emailThreads.messageCount} + 1`,
|
||||
updatedAt: new Date(),
|
||||
})
|
||||
.where(eq(emailThreads.id, threadId));
|
||||
|
||||
return { message, threadId };
|
||||
}
|
||||
|
||||
// ─── Sync Inbox ───────────────────────────────────────────────────────────────
|
||||
|
||||
export async function syncInbox(accountId: string): Promise<void> {
|
||||
// Dynamic imports to avoid loading heavy IMAP/mail modules at module initialisation
|
||||
const imapflowModule = await import('imapflow');
|
||||
const ImapFlow = imapflowModule.ImapFlow;
|
||||
const mailparserModule = await import('mailparser');
|
||||
const simpleParser = mailparserModule.simpleParser;
|
||||
|
||||
const account = await db.query.emailAccounts.findFirst({
|
||||
where: eq(emailAccounts.id, accountId),
|
||||
});
|
||||
|
||||
if (!account) {
|
||||
throw new NotFoundError('Email account');
|
||||
}
|
||||
|
||||
const creds = await getDecryptedCredentials(accountId);
|
||||
|
||||
// Determine the since date: last sync or 30 days ago
|
||||
const since = account.lastSyncAt ?? new Date(Date.now() - 30 * 24 * 60 * 60 * 1000);
|
||||
|
||||
const client = new ImapFlow({
|
||||
host: account.imapHost,
|
||||
port: account.imapPort,
|
||||
secure: account.imapPort === 993,
|
||||
auth: {
|
||||
user: creds.username,
|
||||
pass: creds.password,
|
||||
},
|
||||
logger: false,
|
||||
});
|
||||
|
||||
try {
|
||||
await client.connect();
|
||||
|
||||
const mailbox = await client.mailboxOpen('INBOX');
|
||||
logger.info({ accountId, exists: mailbox.exists }, 'IMAP INBOX opened');
|
||||
|
||||
// Search for messages since the last sync date
|
||||
// client.search() returns false | number[] — false means nothing found
|
||||
const searchResult = await client.search({ since });
|
||||
const uids: number[] = searchResult === false ? [] : searchResult;
|
||||
|
||||
if (uids.length === 0) {
|
||||
logger.info({ accountId }, 'No new messages to sync');
|
||||
return;
|
||||
}
|
||||
|
||||
for await (const message of client.fetch(uids, { source: true })) {
|
||||
try {
|
||||
if (!message.source) continue;
|
||||
|
||||
const parsed = await simpleParser(message.source);
|
||||
|
||||
// Normalise messageId — mailparser returns string | string[] | undefined
|
||||
const rawMsgId = parsed.messageId;
|
||||
const messageId =
|
||||
rawMsgId == null
|
||||
? ''
|
||||
: Array.isArray(rawMsgId)
|
||||
? (rawMsgId[0] ?? '')
|
||||
: rawMsgId;
|
||||
|
||||
const from = parsed.from?.text ?? '';
|
||||
|
||||
// Normalise to/cc — mailparser AddressObject can be an array
|
||||
const resolveAddresses = (
|
||||
field: typeof parsed.to,
|
||||
): string[] => {
|
||||
if (!field) return [];
|
||||
const arr = Array.isArray(field) ? field : [field];
|
||||
return arr.flatMap((a) =>
|
||||
(a.value ?? []).map((x: { address?: string }) => x.address ?? ''),
|
||||
);
|
||||
};
|
||||
|
||||
const to = resolveAddresses(parsed.to);
|
||||
const cc = parsed.cc ? resolveAddresses(parsed.cc) : undefined;
|
||||
|
||||
const rawRefs = parsed.references;
|
||||
const references: string[] =
|
||||
rawRefs == null
|
||||
? []
|
||||
: typeof rawRefs === 'string'
|
||||
? rawRefs.split(/\s+/).filter(Boolean)
|
||||
: rawRefs;
|
||||
|
||||
await ingestMessage(account.portId, {
|
||||
messageId,
|
||||
from,
|
||||
to,
|
||||
cc,
|
||||
subject: parsed.subject ?? '(no subject)',
|
||||
text: parsed.text ?? undefined,
|
||||
html: typeof parsed.html === 'string' ? parsed.html : undefined,
|
||||
date: parsed.date ?? new Date(),
|
||||
inReplyTo: parsed.inReplyTo ?? undefined,
|
||||
references,
|
||||
});
|
||||
} catch (err) {
|
||||
logger.error({ err, accountId, uid: message.uid }, 'Failed to ingest email message');
|
||||
}
|
||||
}
|
||||
|
||||
// Update lastSyncAt on the account
|
||||
await db
|
||||
.update(emailAccounts)
|
||||
.set({ lastSyncAt: new Date(), updatedAt: new Date() })
|
||||
.where(eq(emailAccounts.id, accountId));
|
||||
|
||||
logger.info({ accountId, messageCount: uids.length }, 'IMAP sync complete');
|
||||
} finally {
|
||||
await client.logout();
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user