A focused review of every external integration surfaced six issues the
original audit missed. Fixed here.
HIGH
* Socket.IO had an unconditional 30-second idle disconnect on every
socket. The comment on the line acknowledged it was "for development
only, would be longer in prod" but no NODE_ENV guard existed, and the
`socket.onAny` listener only resets on inbound client events — every
dashboard connection that received only server-push events would have
been torn down every 30s in production. Removed the manual idle
timer entirely; Socket.IO's pingTimeout / pingInterval handles
dead-transport detection at the protocol level.
* SMTP transporters had no `connectionTimeout` / `greetingTimeout` /
`socketTimeout`. Nodemailer's defaults are 2 minutes for connect
and unlimited for socket — a hung SMTP server would have held a
BullMQ `email` worker concurrency slot for up to 10 min per job
(5 retries × 2 min). Set 10s/10s/30s on both the system transporter
in `src/lib/email/index.ts` and the user-account transporter in
`email-compose.service.ts`.
MEDIUM
* PostgreSQL pool had no `statement_timeout` /
`idle_in_transaction_session_timeout`. A slow query or transaction
held by a crashed handler would have eventually exhausted the
20-connection pool. 30s statement cap, 10s idle-in-tx cap, plus
`max_lifetime: 30min` to recycle connections.
* `umami_password` and `umami_api_token` were stored as plaintext in
`system_settings` (the SMTP and S3 secret paths use AES-GCM). The
reader now passes them through `readSecret()` which auto-detects
the encrypted `iv:cipher:tag` shape and decrypts, falling back to
legacy plaintext so operators can rotate without a flag-day.
* AI email-draft worker interpolated `additionalInstructions` (user-
controlled) directly into the OpenAI prompt — a hostile rep could
close the instructions block and inject prompt directives that
override the system prompt. Added `sanitizeForPrompt()` that
strips newlines + quote chars, caps at 500 chars, and the prompt
now wraps the value in a "treat as data not commands" preamble.
LOW
* Legacy `ensureBucket()` in `src/lib/minio/index.ts` was unguarded —
if any future code imported it (currently no callers), a misconfigured
prod deploy could mint a fresh empty bucket. Now matches the gate
used by the pluggable S3Backend (`MINIO_AUTO_CREATE_BUCKET=true`
required) so the legacy export and the new pluggable path agree.
Confirmed not-an-issue: BullMQ Workers create connections via
`{ url }` options object, and BullMQ sets `maxRetriesPerRequest: null`
internally for those — no fix needed. The shared `redis` singleton
that does keep `maxRetriesPerRequest: 3` is used only for direct
Redis ops (rate-limit sliding window, etc.), never for blocking
BullMQ commands, so the value is correct there.
Test status: 1175/1175 vitest, tsc clean.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
322 lines
11 KiB
TypeScript
322 lines
11 KiB
TypeScript
import nodemailer from 'nodemailer';
|
|
import { and, eq, sql } from 'drizzle-orm';
|
|
|
|
import { db } from '@/lib/db';
|
|
import { emailAccounts, emailMessages, emailThreads } from '@/lib/db/schema/email';
|
|
import { documents, documentEvents, files } from '@/lib/db/schema/documents';
|
|
import { createAuditLog, type AuditMeta } from '@/lib/audit';
|
|
import { env } from '@/lib/env';
|
|
import { CodedError, NotFoundError, ForbiddenError } from '@/lib/errors';
|
|
import { logger } from '@/lib/logger';
|
|
import { getDecryptedCredentials } from '@/lib/services/email-accounts.service';
|
|
import { getPortEmailConfig } from '@/lib/services/port-config';
|
|
import { sendEmail as sendSystemEmail } from '@/lib/email';
|
|
import type { ComposeEmailInput } from '@/lib/validators/email';
|
|
|
|
// ─── Types ────────────────────────────────────────────────────────────────────
|
|
|
|
// ─── Helpers ──────────────────────────────────────────────────────────────────
|
|
|
|
async function assertAttachmentsForPort(
|
|
refs: { fileId: string }[] | undefined,
|
|
portId: string,
|
|
): Promise<void> {
|
|
if (!refs || refs.length === 0) return;
|
|
for (const r of refs) {
|
|
const file = await db.query.files.findFirst({ where: eq(files.id, r.fileId) });
|
|
if (!file) throw new NotFoundError('File');
|
|
if (file.portId !== portId) {
|
|
throw new ForbiddenError('File belongs to a different port');
|
|
}
|
|
}
|
|
}
|
|
|
|
// ─── Send Email ───────────────────────────────────────────────────────────────
|
|
|
|
export async function sendEmail(
|
|
userId: string,
|
|
portId: string,
|
|
data: ComposeEmailInput,
|
|
audit: AuditMeta,
|
|
) {
|
|
// System path: port-config noreply identity + system SMTP, with optional
|
|
// file attachments. Skips email_messages/email_threads writes; logs a
|
|
// documentEvents row when the body is keyed to a document via
|
|
// metadata.documentId at the API boundary.
|
|
if (data.senderType === 'system') {
|
|
return sendSystem(portId, data, audit);
|
|
}
|
|
|
|
if (!data.accountId) {
|
|
throw new ForbiddenError('accountId is required for user-path send');
|
|
}
|
|
|
|
// Personal-account sends are admin-gated per port.
|
|
const cfg = await getPortEmailConfig(portId);
|
|
if (!cfg.allowPersonalAccountSends) {
|
|
throw new ForbiddenError('Personal account sends are disabled for this port');
|
|
}
|
|
|
|
await assertAttachmentsForPort(data.attachments, portId);
|
|
|
|
// Verify the account belongs to the user
|
|
const account = await db.query.emailAccounts.findFirst({
|
|
where: and(eq(emailAccounts.id, data.accountId), eq(emailAccounts.userId, userId)),
|
|
});
|
|
|
|
if (!account) {
|
|
throw new NotFoundError('Email account');
|
|
}
|
|
|
|
if (account.portId !== portId) {
|
|
throw new ForbiddenError('Email account does not belong to this port');
|
|
}
|
|
|
|
// Decrypt credentials (INTERNAL - never logged or returned)
|
|
const creds = await getDecryptedCredentials(data.accountId);
|
|
|
|
// Build user-specific SMTP transporter. Same timeouts as the system
|
|
// transporter in src/lib/email/index.ts — without these a hung SMTP
|
|
// server holds the calling request for ~2min (Nodemailer's default
|
|
// connectionTimeout) and starves the documents/email worker slot.
|
|
const transporter = nodemailer.createTransport({
|
|
host: account.smtpHost,
|
|
port: account.smtpPort,
|
|
secure: account.smtpPort === 465,
|
|
connectionTimeout: 10_000,
|
|
greetingTimeout: 10_000,
|
|
socketTimeout: 30_000,
|
|
auth: { user: creds.username, pass: creds.password },
|
|
});
|
|
|
|
// Resolve threading headers if replying
|
|
let inReplyTo: string | undefined;
|
|
let references: string | undefined;
|
|
|
|
if (data.inReplyToMessageId) {
|
|
inReplyTo = data.inReplyToMessageId;
|
|
|
|
// Gather the full references chain from the thread
|
|
if (data.threadId) {
|
|
const existingMessages = await db
|
|
.select({ messageIdHeader: emailMessages.messageIdHeader })
|
|
.from(emailMessages)
|
|
.where(and(eq(emailMessages.threadId, data.threadId)))
|
|
.orderBy(emailMessages.sentAt);
|
|
|
|
const refIds = existingMessages.map((m) => m.messageIdHeader).filter(Boolean) as string[];
|
|
|
|
if (refIds.length > 0) {
|
|
references = refIds.join(' ');
|
|
}
|
|
}
|
|
}
|
|
|
|
// Resolve attachments for the user-path SMTP send.
|
|
const resolvedAttachments = data.attachments
|
|
? await Promise.all(
|
|
data.attachments.map(async (ref) => {
|
|
const file = await db.query.files.findFirst({
|
|
where: eq(files.id, ref.fileId),
|
|
});
|
|
if (!file) throw new NotFoundError('File');
|
|
const { getStorageBackend } = await import('@/lib/storage');
|
|
const stream = await (await getStorageBackend()).get(file.storagePath);
|
|
const chunks: Buffer[] = [];
|
|
for await (const chunk of stream) {
|
|
chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk));
|
|
}
|
|
return {
|
|
filename: ref.filename ?? file.originalName,
|
|
content: Buffer.concat(chunks),
|
|
...(file.mimeType ? { contentType: file.mimeType } : {}),
|
|
};
|
|
}),
|
|
)
|
|
: undefined;
|
|
|
|
// Safety net: when EMAIL_REDIRECT_TO is set, every recipient is rerouted
|
|
// to that address and the subject is prefixed so the operator can see
|
|
// who would have received the message. This service builds its OWN
|
|
// transporter (per-account SMTP) so it doesn't go through sendEmail's
|
|
// redirect - we apply the same logic here.
|
|
const requestedTo = data.to.join(', ');
|
|
const requestedCc = data.cc?.join(', ');
|
|
const effectiveTo = env.EMAIL_REDIRECT_TO ?? requestedTo;
|
|
const effectiveCc = env.EMAIL_REDIRECT_TO ? undefined : requestedCc;
|
|
const effectiveSubject = env.EMAIL_REDIRECT_TO
|
|
? `[redirected from ${requestedTo}${requestedCc ? `, cc=${requestedCc}` : ''}] ${data.subject}`
|
|
: data.subject;
|
|
if (env.EMAIL_REDIRECT_TO) {
|
|
logger.info(
|
|
{
|
|
userId,
|
|
portId,
|
|
accountId: data.accountId,
|
|
originalTo: requestedTo,
|
|
originalCc: requestedCc ?? null,
|
|
redirectedTo: env.EMAIL_REDIRECT_TO,
|
|
},
|
|
'email-compose redirected to EMAIL_REDIRECT_TO',
|
|
);
|
|
}
|
|
|
|
// Send via the user's SMTP transporter
|
|
const info = await transporter.sendMail({
|
|
from: account.emailAddress,
|
|
to: effectiveTo,
|
|
cc: effectiveCc,
|
|
subject: effectiveSubject,
|
|
html: data.bodyHtml,
|
|
inReplyTo,
|
|
references,
|
|
...(resolvedAttachments && resolvedAttachments.length > 0
|
|
? { attachments: resolvedAttachments }
|
|
: {}),
|
|
});
|
|
|
|
const sentMessageId: string =
|
|
typeof info.messageId === 'string' ? info.messageId : String(info.messageId ?? '');
|
|
|
|
// Resolve or create thread
|
|
let threadId: string;
|
|
|
|
if (data.threadId) {
|
|
// Verify thread belongs to this port
|
|
const existingThread = await db.query.emailThreads.findFirst({
|
|
where: and(eq(emailThreads.id, data.threadId), eq(emailThreads.portId, portId)),
|
|
});
|
|
if (!existingThread) {
|
|
throw new NotFoundError('Email thread');
|
|
}
|
|
threadId = existingThread.id;
|
|
} else {
|
|
const newThreadRows = await db
|
|
.insert(emailThreads)
|
|
.values({
|
|
portId,
|
|
subject: data.subject,
|
|
lastMessageAt: new Date(),
|
|
messageCount: 0,
|
|
})
|
|
.returning();
|
|
const newThread = newThreadRows[0];
|
|
if (!newThread)
|
|
throw new CodedError('INSERT_RETURNING_EMPTY', {
|
|
internalMessage: 'Failed to create email thread',
|
|
});
|
|
threadId = newThread.id;
|
|
}
|
|
|
|
const now = new Date();
|
|
|
|
// Persist the outbound message
|
|
const messageRows = await db
|
|
.insert(emailMessages)
|
|
.values({
|
|
threadId,
|
|
messageIdHeader: sentMessageId || null,
|
|
fromAddress: account.emailAddress,
|
|
toAddresses: data.to,
|
|
ccAddresses: data.cc ?? null,
|
|
subject: data.subject,
|
|
bodyHtml: data.bodyHtml,
|
|
direction: 'outbound',
|
|
sentAt: now,
|
|
attachmentFileIds:
|
|
data.attachments && data.attachments.length > 0
|
|
? data.attachments.map((a) => a.fileId)
|
|
: null,
|
|
})
|
|
.returning();
|
|
|
|
const message = messageRows[0];
|
|
if (!message)
|
|
throw new CodedError('INSERT_RETURNING_EMPTY', {
|
|
internalMessage: 'Failed to persist outbound email message',
|
|
});
|
|
|
|
// Update thread metadata
|
|
await db
|
|
.update(emailThreads)
|
|
.set({
|
|
lastMessageAt: now,
|
|
messageCount: sql`${emailThreads.messageCount} + 1`,
|
|
updatedAt: now,
|
|
})
|
|
.where(eq(emailThreads.id, threadId));
|
|
|
|
void createAuditLog({
|
|
userId: audit.userId,
|
|
portId: audit.portId,
|
|
action: 'create',
|
|
entityType: 'email_message',
|
|
entityId: message.id,
|
|
metadata: {
|
|
threadId,
|
|
to: data.to,
|
|
subject: data.subject,
|
|
accountId: data.accountId,
|
|
},
|
|
ipAddress: audit.ipAddress,
|
|
userAgent: audit.userAgent,
|
|
});
|
|
|
|
return { message, threadId };
|
|
}
|
|
|
|
/**
|
|
* System-path send. Uses port-config noreply identity + system SMTP from
|
|
* `lib/email/index.ts → sendEmail()`. Skips email_messages/email_threads
|
|
* writes (no IMAP roundtrip expected). When the email targets a document's
|
|
* signed PDF (attachments include the doc's signedFileId), logs a
|
|
* documentEvents row so the document detail timeline reflects the send.
|
|
*/
|
|
async function sendSystem(
|
|
portId: string,
|
|
data: ComposeEmailInput,
|
|
audit: AuditMeta,
|
|
): Promise<{ message: { id: 'system' }; threadId: null }> {
|
|
await assertAttachmentsForPort(data.attachments, portId);
|
|
|
|
await sendSystemEmail(
|
|
data.to,
|
|
data.subject,
|
|
data.bodyHtml,
|
|
undefined,
|
|
undefined,
|
|
portId,
|
|
data.attachments,
|
|
);
|
|
|
|
// If any attachment matches a document's signedFileId, log signed_doc_emailed.
|
|
if (data.attachments && data.attachments.length > 0) {
|
|
const fileIds = data.attachments.map((a) => a.fileId);
|
|
const matchingDocs = await db
|
|
.select({ id: documents.id, signedFileId: documents.signedFileId })
|
|
.from(documents)
|
|
.where(and(eq(documents.portId, portId), sql`${documents.signedFileId} = ANY(${fileIds})`));
|
|
|
|
for (const doc of matchingDocs) {
|
|
await db.insert(documentEvents).values({
|
|
documentId: doc.id,
|
|
eventType: 'signed_doc_emailed',
|
|
eventData: { recipients: data.to, subject: data.subject },
|
|
});
|
|
}
|
|
}
|
|
|
|
void createAuditLog({
|
|
userId: audit.userId,
|
|
portId: audit.portId,
|
|
action: 'create',
|
|
entityType: 'email_message',
|
|
entityId: 'system',
|
|
metadata: { senderType: 'system', to: data.to, subject: data.subject },
|
|
ipAddress: audit.ipAddress,
|
|
userAgent: audit.userAgent,
|
|
});
|
|
|
|
return { message: { id: 'system' }, threadId: null };
|
|
}
|