Files
pn-new-crm/src/lib/services/email-compose.service.ts
Matt 221ae5784e chore(autonomous-session): consolidate uncommitted work from prior session
Bundles the prior autonomous-session output that was sitting unstaged:

- Em-dash sweep across src/ + tests/ (en-dash/em-dash to hyphen, ~2280 instances)
- country-flag-icons rollout (CountryFlag component, replaces emoji glyphs that
  never rendered on Windows; lazy-loads the 3x2 SVG index as a single chunk
  after the per-subpath dynamic-import approach silently failed in webpack)
- Admin IA Phase 1+2: 7-domain regroup, 41 to 38 pages, /admin/berths index,
  redirects (ocr to ai, reports to dashboard, invitations to users),
  docs/admin-ia-proposal.md
- Per-template email tester (registry + endpoint + UI on Email admin page)
- Cancel-document mode picker (delete-from-Documenso vs keep-for-audit)
- Dashboard PDF report: 25 widgets, SVG charts, date-range picker, 11 resolvers
- Customize-widgets per-region sortables at xl+ (charts/rails/feed); single
  flat sortable below xl when the layout stacks; per-viewport saved orders
- Audit doc updates capturing each shipped item
- Lint fixes: react-compiler immutability in DonutChart (reduce instead of
  let-reassign), set-state-in-effect disables in CountryFlag and
  UploadForSigning preview-bytes effect, unused 'confirm' destructures in
  interest contract + reservation tabs, unescaped apostrophe in test-template
  card copy
2026-05-23 00:52:59 +02:00

330 lines
11 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import nodemailer from 'nodemailer';
import { and, eq, inArray, sql } from 'drizzle-orm';
import pLimit from 'p-limit';
import { db } from '@/lib/db';
import { emailAccounts, emailMessages, emailThreads } from '@/lib/db/schema/email';
import { documents, documentEvents, files } from '@/lib/db/schema/documents';
import { createAuditLog, type AuditMeta } from '@/lib/audit';
import { env } from '@/lib/env';
import { CodedError, NotFoundError, ForbiddenError } from '@/lib/errors';
import { logger } from '@/lib/logger';
import { getDecryptedCredentials } from '@/lib/services/email-accounts.service';
import { getPortEmailConfig } from '@/lib/services/port-config';
import { sendEmail as sendSystemEmail } from '@/lib/email';
import type { ComposeEmailInput } from '@/lib/validators/email';
// ─── Types ────────────────────────────────────────────────────────────────────
// ─── Helpers ──────────────────────────────────────────────────────────────────
/**
 * Verifies that every referenced attachment exists and belongs to `portId`.
 *
 * All referenced file rows are loaded with a single `IN (...)` query rather
 * than one query per attachment. Refs are then validated in their original
 * order, so the first offending ref decides which error is raised.
 *
 * @param refs - Attachment references from the compose payload; `undefined`
 *   or an empty list is a no-op.
 * @param portId - Port the caller is acting within.
 * @throws NotFoundError when a referenced file row does not exist.
 * @throws ForbiddenError when a referenced file belongs to another port.
 */
async function assertAttachmentsForPort(
  refs: { fileId: string }[] | undefined,
  portId: string,
): Promise<void> {
  if (!refs || refs.length === 0) return;

  // De-duplicate so a file attached twice is only looked up once.
  const uniqueFileIds = [...new Set(refs.map((ref) => ref.fileId))];
  const rows = await db
    .select({ id: files.id, portId: files.portId })
    .from(files)
    .where(inArray(files.id, uniqueFileIds));
  const ownerPortByFileId = new Map(rows.map((row) => [row.id, row.portId]));

  for (const { fileId } of refs) {
    if (!ownerPortByFileId.has(fileId)) throw new NotFoundError('File');
    if (ownerPortByFileId.get(fileId) !== portId) {
      throw new ForbiddenError('File belongs to a different port');
    }
  }
}
// ─── Send Email ───────────────────────────────────────────────────────────────
/**
 * Sends a composed email and records it.
 *
 * Two paths:
 * - `senderType === 'system'`: delegates to `sendSystem` (no thread/message
 *   rows are written on that path).
 * - otherwise: sends through the user's own SMTP account, then persists the
 *   outbound message, updates (or creates) the thread, and writes an audit
 *   log entry.
 *
 * All validation - including thread ownership - happens BEFORE the SMTP
 * send, so a request that references a thread outside this port is rejected
 * without delivering mail or reading that thread's message headers.
 *
 * @param userId - Owner of the sending email account.
 * @param portId - Port the request is scoped to.
 * @param data - Validated compose payload.
 * @param audit - Request metadata recorded in the audit log.
 * @returns The persisted message row and its thread id (user path), or the
 *   `sendSystem` result (system path).
 * @throws ForbiddenError when `accountId` is missing, personal-account sends
 *   are disabled for the port, an attachment belongs to another port, or the
 *   account belongs to another port.
 * @throws NotFoundError when an attachment file, the email account, or the
 *   referenced thread cannot be found.
 * @throws CodedError (`INSERT_RETURNING_EMPTY`) when an insert returns no row.
 */
export async function sendEmail(
  userId: string,
  portId: string,
  data: ComposeEmailInput,
  audit: AuditMeta,
) {
  // System path: port-config noreply identity + system SMTP, with optional
  // file attachments. Skips email_messages/email_threads writes; logs a
  // documentEvents row when an attachment matches a document's signed file.
  if (data.senderType === 'system') {
    return sendSystem(portId, data, audit);
  }

  const accountId = data.accountId;
  if (!accountId) {
    throw new ForbiddenError('accountId is required for user-path send');
  }

  // Personal-account sends are admin-gated per port.
  const cfg = await getPortEmailConfig(portId);
  if (!cfg.allowPersonalAccountSends) {
    throw new ForbiddenError('Personal account sends are disabled for this port');
  }

  await assertAttachmentsForPort(data.attachments, portId);

  // Verify the account belongs to the user
  const account = await db.query.emailAccounts.findFirst({
    where: and(eq(emailAccounts.id, accountId), eq(emailAccounts.userId, userId)),
  });
  if (!account) {
    throw new NotFoundError('Email account');
  }
  if (account.portId !== portId) {
    throw new ForbiddenError('Email account does not belong to this port');
  }

  // Verify the thread belongs to this port BEFORE reading its messages or
  // sending anything. Checking only after the SMTP send would deliver the
  // mail and then fail the request without persisting it.
  let existingThreadId: string | undefined;
  if (data.threadId) {
    const existingThread = await db.query.emailThreads.findFirst({
      where: and(eq(emailThreads.id, data.threadId), eq(emailThreads.portId, portId)),
    });
    if (!existingThread) {
      throw new NotFoundError('Email thread');
    }
    existingThreadId = existingThread.id;
  }

  // Decrypt credentials (INTERNAL - never logged or returned)
  const creds = await getDecryptedCredentials(accountId);

  // Build user-specific SMTP transporter. Same timeouts as the system
  // transporter in src/lib/email/index.ts - without these a hung SMTP
  // server holds the calling request for ~2min (Nodemailer's default
  // connectionTimeout) and starves the documents/email worker slot.
  const transporter = nodemailer.createTransport({
    host: account.smtpHost,
    port: account.smtpPort,
    secure: account.smtpPort === 465,
    connectionTimeout: 10_000,
    greetingTimeout: 10_000,
    socketTimeout: 30_000,
    auth: { user: creds.username, pass: creds.password },
  });

  // Resolve threading headers if replying
  let inReplyTo: string | undefined;
  let references: string | undefined;
  if (data.inReplyToMessageId) {
    inReplyTo = data.inReplyToMessageId;
    // Gather the full references chain from the (already verified) thread
    if (existingThreadId) {
      const existingMessages = await db
        .select({ messageIdHeader: emailMessages.messageIdHeader })
        .from(emailMessages)
        .where(eq(emailMessages.threadId, existingThreadId))
        .orderBy(emailMessages.sentAt);
      const refIds = existingMessages
        .map((m) => m.messageIdHeader)
        .filter((id): id is string => Boolean(id));
      if (refIds.length > 0) {
        references = refIds.join(' ');
      }
    }
  }

  // Resolve attachments for the user-path SMTP send. Cap concurrency so
  // a user attaching 20 large files doesn't fan out 20 simultaneous
  // S3/MinIO reads + 20 buffers in memory at once - bounded at 4 means
  // peak memory tops out at ~4 × max-file-size irrespective of the
  // attachment count.
  const attachmentLimit = pLimit(4);
  const resolvedAttachments = data.attachments
    ? await Promise.all(
        data.attachments.map((ref) =>
          attachmentLimit(async () => {
            const file = await db.query.files.findFirst({
              where: eq(files.id, ref.fileId),
            });
            if (!file) throw new NotFoundError('File');
            const { getStorageBackend } = await import('@/lib/storage');
            const stream = await (await getStorageBackend()).get(file.storagePath);
            // Buffer the whole object; the mailer receives a single Buffer.
            const chunks: Buffer[] = [];
            for await (const chunk of stream) {
              chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk));
            }
            return {
              filename: ref.filename ?? file.originalName,
              content: Buffer.concat(chunks),
              ...(file.mimeType ? { contentType: file.mimeType } : {}),
            };
          }),
        ),
      )
    : undefined;

  // Safety net: when EMAIL_REDIRECT_TO is set, every recipient is rerouted
  // to that address and the subject is prefixed so the operator can see
  // who would have received the message. This service builds its OWN
  // transporter (per-account SMTP) so it doesn't go through sendEmail's
  // redirect - we apply the same logic here.
  const requestedTo = data.to.join(', ');
  const requestedCc = data.cc?.join(', ');
  const effectiveTo = env.EMAIL_REDIRECT_TO ?? requestedTo;
  const effectiveCc = env.EMAIL_REDIRECT_TO ? undefined : requestedCc;
  const effectiveSubject = env.EMAIL_REDIRECT_TO
    ? `[redirected from ${requestedTo}${requestedCc ? `, cc=${requestedCc}` : ''}] ${data.subject}`
    : data.subject;
  if (env.EMAIL_REDIRECT_TO) {
    logger.info(
      {
        userId,
        portId,
        accountId,
        originalTo: requestedTo,
        originalCc: requestedCc ?? null,
        redirectedTo: env.EMAIL_REDIRECT_TO,
      },
      'email-compose redirected to EMAIL_REDIRECT_TO',
    );
  }

  // Send via the user's SMTP transporter
  const info = await transporter.sendMail({
    from: account.emailAddress,
    to: effectiveTo,
    cc: effectiveCc,
    subject: effectiveSubject,
    html: data.bodyHtml,
    inReplyTo,
    references,
    ...(resolvedAttachments && resolvedAttachments.length > 0
      ? { attachments: resolvedAttachments }
      : {}),
  });
  const sentMessageId: string =
    typeof info.messageId === 'string' ? info.messageId : String(info.messageId ?? '');

  // Reuse the verified thread, or create a fresh one for a new conversation.
  let threadId: string;
  if (existingThreadId) {
    threadId = existingThreadId;
  } else {
    const newThreadRows = await db
      .insert(emailThreads)
      .values({
        portId,
        subject: data.subject,
        lastMessageAt: new Date(),
        // Starts at 0; the thread-metadata update below increments it.
        messageCount: 0,
      })
      .returning();
    const newThread = newThreadRows[0];
    if (!newThread)
      throw new CodedError('INSERT_RETURNING_EMPTY', {
        internalMessage: 'Failed to create email thread',
      });
    threadId = newThread.id;
  }

  const now = new Date();

  // Persist the outbound message. The ORIGINAL recipients/subject are
  // stored, not the EMAIL_REDIRECT_TO-rewritten ones.
  const messageRows = await db
    .insert(emailMessages)
    .values({
      threadId,
      messageIdHeader: sentMessageId || null,
      fromAddress: account.emailAddress,
      toAddresses: data.to,
      ccAddresses: data.cc ?? null,
      subject: data.subject,
      bodyHtml: data.bodyHtml,
      direction: 'outbound',
      sentAt: now,
      attachmentFileIds:
        data.attachments && data.attachments.length > 0
          ? data.attachments.map((a) => a.fileId)
          : null,
    })
    .returning();
  const message = messageRows[0];
  if (!message)
    throw new CodedError('INSERT_RETURNING_EMPTY', {
      internalMessage: 'Failed to persist outbound email message',
    });

  // Update thread metadata
  await db
    .update(emailThreads)
    .set({
      lastMessageAt: now,
      messageCount: sql`${emailThreads.messageCount} + 1`,
      updatedAt: now,
    })
    .where(eq(emailThreads.id, threadId));

  // Fire-and-forget: the audit write is intentionally not awaited.
  void createAuditLog({
    userId: audit.userId,
    portId: audit.portId,
    action: 'create',
    entityType: 'email_message',
    entityId: message.id,
    metadata: {
      threadId,
      to: data.to,
      subject: data.subject,
      accountId,
    },
    ipAddress: audit.ipAddress,
    userAgent: audit.userAgent,
  });

  return { message, threadId };
}
/**
 * System-path send. Uses port-config noreply identity + system SMTP from
 * `lib/email/index.ts → sendEmail()`. Skips email_messages/email_threads
 * writes (no IMAP roundtrip expected). When the email targets a document's
 * signed PDF (attachments include the doc's signedFileId), logs a
 * documentEvents row so the document detail timeline reflects the send.
 *
 * The `signed_doc_emailed` events for all matching documents are written
 * with a single multi-row insert rather than one insert per document.
 *
 * NOTE(review): `data.cc` is never referenced on this path - confirm whether
 * the system sender is expected to honor cc recipients.
 *
 * @param portId - Port the request is scoped to; also used to scope the
 *   document lookup.
 * @param data - Validated compose payload.
 * @param audit - Request metadata recorded in the audit log.
 * @returns A fixed placeholder result: message id `'system'`, no thread.
 * @throws NotFoundError / ForbiddenError from the attachment ownership check.
 */
async function sendSystem(
  portId: string,
  data: ComposeEmailInput,
  audit: AuditMeta,
): Promise<{ message: { id: 'system' }; threadId: null }> {
  await assertAttachmentsForPort(data.attachments, portId);

  await sendSystemEmail(
    data.to,
    data.subject,
    data.bodyHtml,
    undefined,
    undefined,
    portId,
    data.attachments,
  );

  // If any attachment matches a document's signedFileId, log signed_doc_emailed.
  if (data.attachments && data.attachments.length > 0) {
    const fileIds = data.attachments.map((a) => a.fileId);
    const matchingDocs = await db
      .select({ id: documents.id })
      .from(documents)
      .where(and(eq(documents.portId, portId), inArray(documents.signedFileId, fileIds)));

    // Guard the empty case: a multi-row insert needs at least one row.
    if (matchingDocs.length > 0) {
      await db.insert(documentEvents).values(
        matchingDocs.map((doc) => ({
          documentId: doc.id,
          eventType: 'signed_doc_emailed' as const,
          eventData: { recipients: data.to, subject: data.subject },
        })),
      );
    }
  }

  // Fire-and-forget: the audit write is intentionally not awaited.
  void createAuditLog({
    userId: audit.userId,
    portId: audit.portId,
    action: 'create',
    entityType: 'email_message',
    entityId: 'system',
    metadata: { senderType: 'system', to: data.to, subject: data.subject },
    ipAddress: audit.ipAddress,
    userAgent: audit.userAgent,
  });

  return { message: { id: 'system' }, threadId: null };
}