Files
pn-new-crm/src/lib/services/email-compose.service.ts
Matt a65aadc530 feat(deps): adopt p-limit for unbounded mass-op fan-outs
Cap concurrency on two services that were fanning out unbounded
requests to external systems:

1. email-compose.service.ts — attachment resolution. User attaches
   20 files → 20 simultaneous S3/MinIO GETs + 20 buffers in heap.
   Now capped at 4 concurrent reads; peak memory bounded by
   4 × max-attachment-size regardless of attachment count.

2. document-signing-emails.service.ts — sendSigningCompleted fanned
   out one SMTP send per recipient simultaneously. A Sales Contract
   with 10 recipients (client + 5 sellers + 4 witnesses) hit SMTP
   provider connection limits (Mailgun/SES/Postmark all cap concurrent
   connections in the single digits) and dropped overflow silently.
   Now capped at 3 concurrent sends.

Both use `pLimit(N)` from the Sindre Sorhus suite — well-tested at
scale, ~1kb gzip per service. Pattern is established for the
remaining audit-flagged mass-op services (brochures, backup, GDPR
export) to adopt as those files are touched.

Verified: tsc clean, vitest 1293/1293 pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-12 18:35:56 +02:00

330 lines
11 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import nodemailer from 'nodemailer';
import { and, eq, inArray, sql } from 'drizzle-orm';
import pLimit from 'p-limit';
import { db } from '@/lib/db';
import { emailAccounts, emailMessages, emailThreads } from '@/lib/db/schema/email';
import { documents, documentEvents, files } from '@/lib/db/schema/documents';
import { createAuditLog, type AuditMeta } from '@/lib/audit';
import { env } from '@/lib/env';
import { CodedError, NotFoundError, ForbiddenError } from '@/lib/errors';
import { logger } from '@/lib/logger';
import { getDecryptedCredentials } from '@/lib/services/email-accounts.service';
import { getPortEmailConfig } from '@/lib/services/port-config';
import { sendEmail as sendSystemEmail } from '@/lib/email';
import type { ComposeEmailInput } from '@/lib/validators/email';
// ─── Types ────────────────────────────────────────────────────────────────────
// ─── Helpers ──────────────────────────────────────────────────────────────────
async function assertAttachmentsForPort(
refs: { fileId: string }[] | undefined,
portId: string,
): Promise<void> {
if (!refs || refs.length === 0) return;
for (const r of refs) {
const file = await db.query.files.findFirst({ where: eq(files.id, r.fileId) });
if (!file) throw new NotFoundError('File');
if (file.portId !== portId) {
throw new ForbiddenError('File belongs to a different port');
}
}
}
// ─── Send Email ───────────────────────────────────────────────────────────────
export async function sendEmail(
userId: string,
portId: string,
data: ComposeEmailInput,
audit: AuditMeta,
) {
// System path: port-config noreply identity + system SMTP, with optional
// file attachments. Skips email_messages/email_threads writes; logs a
// documentEvents row when the body is keyed to a document via
// metadata.documentId at the API boundary.
if (data.senderType === 'system') {
return sendSystem(portId, data, audit);
}
if (!data.accountId) {
throw new ForbiddenError('accountId is required for user-path send');
}
// Personal-account sends are admin-gated per port.
const cfg = await getPortEmailConfig(portId);
if (!cfg.allowPersonalAccountSends) {
throw new ForbiddenError('Personal account sends are disabled for this port');
}
await assertAttachmentsForPort(data.attachments, portId);
// Verify the account belongs to the user
const account = await db.query.emailAccounts.findFirst({
where: and(eq(emailAccounts.id, data.accountId), eq(emailAccounts.userId, userId)),
});
if (!account) {
throw new NotFoundError('Email account');
}
if (account.portId !== portId) {
throw new ForbiddenError('Email account does not belong to this port');
}
// Decrypt credentials (INTERNAL - never logged or returned)
const creds = await getDecryptedCredentials(data.accountId);
// Build user-specific SMTP transporter. Same timeouts as the system
// transporter in src/lib/email/index.ts — without these a hung SMTP
// server holds the calling request for ~2min (Nodemailer's default
// connectionTimeout) and starves the documents/email worker slot.
const transporter = nodemailer.createTransport({
host: account.smtpHost,
port: account.smtpPort,
secure: account.smtpPort === 465,
connectionTimeout: 10_000,
greetingTimeout: 10_000,
socketTimeout: 30_000,
auth: { user: creds.username, pass: creds.password },
});
// Resolve threading headers if replying
let inReplyTo: string | undefined;
let references: string | undefined;
if (data.inReplyToMessageId) {
inReplyTo = data.inReplyToMessageId;
// Gather the full references chain from the thread
if (data.threadId) {
const existingMessages = await db
.select({ messageIdHeader: emailMessages.messageIdHeader })
.from(emailMessages)
.where(and(eq(emailMessages.threadId, data.threadId)))
.orderBy(emailMessages.sentAt);
const refIds = existingMessages.map((m) => m.messageIdHeader).filter(Boolean) as string[];
if (refIds.length > 0) {
references = refIds.join(' ');
}
}
}
// Resolve attachments for the user-path SMTP send. Cap concurrency so
// a user attaching 20 large files doesn't fan out 20 simultaneous
// S3/MinIO reads + 20 buffers in memory at once — bounded at 4 means
// peak memory tops out at ~4 × max-file-size irrespective of the
// attachment count.
const attachmentLimit = pLimit(4);
const resolvedAttachments = data.attachments
? await Promise.all(
data.attachments.map((ref) =>
attachmentLimit(async () => {
const file = await db.query.files.findFirst({
where: eq(files.id, ref.fileId),
});
if (!file) throw new NotFoundError('File');
const { getStorageBackend } = await import('@/lib/storage');
const stream = await (await getStorageBackend()).get(file.storagePath);
const chunks: Buffer[] = [];
for await (const chunk of stream) {
chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk));
}
return {
filename: ref.filename ?? file.originalName,
content: Buffer.concat(chunks),
...(file.mimeType ? { contentType: file.mimeType } : {}),
};
}),
),
)
: undefined;
// Safety net: when EMAIL_REDIRECT_TO is set, every recipient is rerouted
// to that address and the subject is prefixed so the operator can see
// who would have received the message. This service builds its OWN
// transporter (per-account SMTP) so it doesn't go through sendEmail's
// redirect - we apply the same logic here.
const requestedTo = data.to.join(', ');
const requestedCc = data.cc?.join(', ');
const effectiveTo = env.EMAIL_REDIRECT_TO ?? requestedTo;
const effectiveCc = env.EMAIL_REDIRECT_TO ? undefined : requestedCc;
const effectiveSubject = env.EMAIL_REDIRECT_TO
? `[redirected from ${requestedTo}${requestedCc ? `, cc=${requestedCc}` : ''}] ${data.subject}`
: data.subject;
if (env.EMAIL_REDIRECT_TO) {
logger.info(
{
userId,
portId,
accountId: data.accountId,
originalTo: requestedTo,
originalCc: requestedCc ?? null,
redirectedTo: env.EMAIL_REDIRECT_TO,
},
'email-compose redirected to EMAIL_REDIRECT_TO',
);
}
// Send via the user's SMTP transporter
const info = await transporter.sendMail({
from: account.emailAddress,
to: effectiveTo,
cc: effectiveCc,
subject: effectiveSubject,
html: data.bodyHtml,
inReplyTo,
references,
...(resolvedAttachments && resolvedAttachments.length > 0
? { attachments: resolvedAttachments }
: {}),
});
const sentMessageId: string =
typeof info.messageId === 'string' ? info.messageId : String(info.messageId ?? '');
// Resolve or create thread
let threadId: string;
if (data.threadId) {
// Verify thread belongs to this port
const existingThread = await db.query.emailThreads.findFirst({
where: and(eq(emailThreads.id, data.threadId), eq(emailThreads.portId, portId)),
});
if (!existingThread) {
throw new NotFoundError('Email thread');
}
threadId = existingThread.id;
} else {
const newThreadRows = await db
.insert(emailThreads)
.values({
portId,
subject: data.subject,
lastMessageAt: new Date(),
messageCount: 0,
})
.returning();
const newThread = newThreadRows[0];
if (!newThread)
throw new CodedError('INSERT_RETURNING_EMPTY', {
internalMessage: 'Failed to create email thread',
});
threadId = newThread.id;
}
const now = new Date();
// Persist the outbound message
const messageRows = await db
.insert(emailMessages)
.values({
threadId,
messageIdHeader: sentMessageId || null,
fromAddress: account.emailAddress,
toAddresses: data.to,
ccAddresses: data.cc ?? null,
subject: data.subject,
bodyHtml: data.bodyHtml,
direction: 'outbound',
sentAt: now,
attachmentFileIds:
data.attachments && data.attachments.length > 0
? data.attachments.map((a) => a.fileId)
: null,
})
.returning();
const message = messageRows[0];
if (!message)
throw new CodedError('INSERT_RETURNING_EMPTY', {
internalMessage: 'Failed to persist outbound email message',
});
// Update thread metadata
await db
.update(emailThreads)
.set({
lastMessageAt: now,
messageCount: sql`${emailThreads.messageCount} + 1`,
updatedAt: now,
})
.where(eq(emailThreads.id, threadId));
void createAuditLog({
userId: audit.userId,
portId: audit.portId,
action: 'create',
entityType: 'email_message',
entityId: message.id,
metadata: {
threadId,
to: data.to,
subject: data.subject,
accountId: data.accountId,
},
ipAddress: audit.ipAddress,
userAgent: audit.userAgent,
});
return { message, threadId };
}
/**
* System-path send. Uses port-config noreply identity + system SMTP from
* `lib/email/index.ts → sendEmail()`. Skips email_messages/email_threads
* writes (no IMAP roundtrip expected). When the email targets a document's
* signed PDF (attachments include the doc's signedFileId), logs a
* documentEvents row so the document detail timeline reflects the send.
*/
async function sendSystem(
portId: string,
data: ComposeEmailInput,
audit: AuditMeta,
): Promise<{ message: { id: 'system' }; threadId: null }> {
await assertAttachmentsForPort(data.attachments, portId);
await sendSystemEmail(
data.to,
data.subject,
data.bodyHtml,
undefined,
undefined,
portId,
data.attachments,
);
// If any attachment matches a document's signedFileId, log signed_doc_emailed.
if (data.attachments && data.attachments.length > 0) {
const fileIds = data.attachments.map((a) => a.fileId);
const matchingDocs = await db
.select({ id: documents.id, signedFileId: documents.signedFileId })
.from(documents)
.where(and(eq(documents.portId, portId), inArray(documents.signedFileId, fileIds)));
for (const doc of matchingDocs) {
await db.insert(documentEvents).values({
documentId: doc.id,
eventType: 'signed_doc_emailed',
eventData: { recipients: data.to, subject: data.subject },
});
}
}
void createAuditLog({
userId: audit.userId,
portId: audit.portId,
action: 'create',
entityType: 'email_message',
entityId: 'system',
metadata: { senderType: 'system', to: data.to, subject: data.subject },
ipAddress: audit.ipAddress,
userAgent: audit.userAgent,
});
return { message: { id: 'system' }, threadId: null };
}