feat(deps): adopt p-limit for unbounded mass-op fan-outs
Cap concurrency on two services that were fanning out unbounded requests to external systems: 1. email-compose.service.ts — attachment resolution. User attaches 20 files → 20 simultaneous S3/MinIO GETs + 20 buffers in heap. Now capped at 4 concurrent reads; peak memory bounded by 4 × max-attachment-size regardless of attachment count. 2. document-signing-emails.service.ts — sendSigningCompleted fanned out one SMTP send per recipient simultaneously. A Sales Contract with 10 recipients (client + 5 sellers + 4 witnesses) hit SMTP provider connection limits (Mailgun/SES/Postmark all cap concurrent connections in the single digits) and dropped overflow silently. Now capped at 3 concurrent sends. Both use `pLimit(N)` from the Sindre Sorhus suite — well-tested at scale, ~1kb gzip per service. Pattern is established for the remaining audit-flagged mass-op services (brochures, backup, GDPR export) to adopt as those files are touched. Verified: tsc clean, vitest 1293/1293 pass. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -81,6 +81,7 @@
|
|||||||
"next-themes": "^0.4.6",
|
"next-themes": "^0.4.6",
|
||||||
"nodemailer": "^8.0.7",
|
"nodemailer": "^8.0.7",
|
||||||
"openai": "^6.37.0",
|
"openai": "^6.37.0",
|
||||||
|
"p-limit": "^7.3.0",
|
||||||
"pdf-lib": "^1.17.1",
|
"pdf-lib": "^1.17.1",
|
||||||
"pdfkit": "^0.18.0",
|
"pdfkit": "^0.18.0",
|
||||||
"pino": "^10.3.1",
|
"pino": "^10.3.1",
|
||||||
|
|||||||
17
pnpm-lock.yaml
generated
17
pnpm-lock.yaml
generated
@@ -172,6 +172,9 @@ importers:
|
|||||||
openai:
|
openai:
|
||||||
specifier: ^6.37.0
|
specifier: ^6.37.0
|
||||||
version: 6.37.0(ws@8.18.3)(zod@4.4.3)
|
version: 6.37.0(ws@8.18.3)(zod@4.4.3)
|
||||||
|
p-limit:
|
||||||
|
specifier: ^7.3.0
|
||||||
|
version: 7.3.0
|
||||||
pdf-lib:
|
pdf-lib:
|
||||||
specifier: ^1.17.1
|
specifier: ^1.17.1
|
||||||
version: 1.17.1
|
version: 1.17.1
|
||||||
@@ -4276,6 +4279,10 @@ packages:
|
|||||||
resolution: {integrity: sha512-TYOanM3wGwNGsZN2cVTYPArw454xnXj5qmWF1bEoAc4+cU/ol7GVh7odevjp1FNHduHc3KZMcFduxU5Xc6uJRQ==}
|
resolution: {integrity: sha512-TYOanM3wGwNGsZN2cVTYPArw454xnXj5qmWF1bEoAc4+cU/ol7GVh7odevjp1FNHduHc3KZMcFduxU5Xc6uJRQ==}
|
||||||
engines: {node: '>=10'}
|
engines: {node: '>=10'}
|
||||||
|
|
||||||
|
p-limit@7.3.0:
|
||||||
|
resolution: {integrity: sha512-7cIXg/Z0M5WZRblrsOla88S4wAK+zOQQWeBYfV3qJuJXMr+LnbYjaadrFaS0JILfEDPVqHyKnZ1Z/1d6J9VVUw==}
|
||||||
|
engines: {node: '>=20'}
|
||||||
|
|
||||||
p-locate@5.0.0:
|
p-locate@5.0.0:
|
||||||
resolution: {integrity: sha512-LaNjtRWUBY++zB5nE/NwcaoMylSPk+S+ZHNB1TzdbMJMny6dynpAGt7X/tl/QYq3TIeE6nxHppbo2LGymrG5Pw==}
|
resolution: {integrity: sha512-LaNjtRWUBY++zB5nE/NwcaoMylSPk+S+ZHNB1TzdbMJMny6dynpAGt7X/tl/QYq3TIeE6nxHppbo2LGymrG5Pw==}
|
||||||
engines: {node: '>=10'}
|
engines: {node: '>=10'}
|
||||||
@@ -5447,6 +5454,10 @@ packages:
|
|||||||
resolution: {integrity: sha512-rVksvsnNCdJ/ohGc6xgPwyN8eheCxsiLM8mxuE/t/mOVqJewPuO1miLpTHQiRgTKCLexL4MeAFVagts7HmNZ2Q==}
|
resolution: {integrity: sha512-rVksvsnNCdJ/ohGc6xgPwyN8eheCxsiLM8mxuE/t/mOVqJewPuO1miLpTHQiRgTKCLexL4MeAFVagts7HmNZ2Q==}
|
||||||
engines: {node: '>=10'}
|
engines: {node: '>=10'}
|
||||||
|
|
||||||
|
yocto-queue@1.2.2:
|
||||||
|
resolution: {integrity: sha512-4LCcse/U2MHZ63HAJVE+v71o7yOdIe4cZ70Wpf8D/IyjDKYQLV5GD46B+hSTjJsvV5PztjvHoU580EftxjDZFQ==}
|
||||||
|
engines: {node: '>=12.20'}
|
||||||
|
|
||||||
yoctocolors@2.1.2:
|
yoctocolors@2.1.2:
|
||||||
resolution: {integrity: sha512-CzhO+pFNo8ajLM2d2IW/R93ipy99LWjtwblvC1RsoSUMZgyLbYFr221TnSNT7GjGdYui6P459mw9JH/g/zW2ug==}
|
resolution: {integrity: sha512-CzhO+pFNo8ajLM2d2IW/R93ipy99LWjtwblvC1RsoSUMZgyLbYFr221TnSNT7GjGdYui6P459mw9JH/g/zW2ug==}
|
||||||
engines: {node: '>=18'}
|
engines: {node: '>=18'}
|
||||||
@@ -9330,6 +9341,10 @@ snapshots:
|
|||||||
dependencies:
|
dependencies:
|
||||||
yocto-queue: 0.1.0
|
yocto-queue: 0.1.0
|
||||||
|
|
||||||
|
p-limit@7.3.0:
|
||||||
|
dependencies:
|
||||||
|
yocto-queue: 1.2.2
|
||||||
|
|
||||||
p-locate@5.0.0:
|
p-locate@5.0.0:
|
||||||
dependencies:
|
dependencies:
|
||||||
p-limit: 3.1.0
|
p-limit: 3.1.0
|
||||||
@@ -10659,6 +10674,8 @@ snapshots:
|
|||||||
|
|
||||||
yocto-queue@0.1.0: {}
|
yocto-queue@0.1.0: {}
|
||||||
|
|
||||||
|
yocto-queue@1.2.2: {}
|
||||||
|
|
||||||
yoctocolors@2.1.2: {}
|
yoctocolors@2.1.2: {}
|
||||||
|
|
||||||
zip-stream@6.0.1:
|
zip-stream@6.0.1:
|
||||||
|
|||||||
@@ -28,6 +28,8 @@
|
|||||||
* port (single-tenant deploys can keep using Documenso's hosted UI).
|
* port (single-tenant deploys can keep using Documenso's hosted UI).
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
import pLimit from 'p-limit';
|
||||||
|
|
||||||
import { sendEmail } from '@/lib/email';
|
import { sendEmail } from '@/lib/email';
|
||||||
import { getBrandingShell } from '@/lib/email/branding-resolver';
|
import { getBrandingShell } from '@/lib/email/branding-resolver';
|
||||||
import {
|
import {
|
||||||
@@ -231,33 +233,41 @@ export async function sendSigningReminder(args: SigningReminderArgs): Promise<vo
|
|||||||
export async function sendSigningCompleted(args: SigningCompletedArgs): Promise<void> {
|
export async function sendSigningCompleted(args: SigningCompletedArgs): Promise<void> {
|
||||||
const branding = await getBrandingShell(args.portId);
|
const branding = await getBrandingShell(args.portId);
|
||||||
|
|
||||||
|
// Cap concurrency at 3: a Sales Contract with 10 recipients (client +
|
||||||
|
// 5 sellers + 4 witnesses) shouldn't fan out 10 simultaneous SMTP
|
||||||
|
// sends. Most SMTP providers (Mailgun, SES, Postmark) cap concurrent
|
||||||
|
// connections in the single digits and silently drop the overflow.
|
||||||
|
const sendLimit = pLimit(3);
|
||||||
|
|
||||||
await Promise.all(
|
await Promise.all(
|
||||||
args.recipients.map(async (recipient) => {
|
args.recipients.map((recipient) =>
|
||||||
const { subject, html, text } = signingCompletedEmail(
|
sendLimit(async () => {
|
||||||
{
|
const { subject, html, text } = signingCompletedEmail(
|
||||||
recipientName: recipient.name,
|
{
|
||||||
documentLabel: args.documentLabel,
|
recipientName: recipient.name,
|
||||||
clientName: args.clientName,
|
documentLabel: args.documentLabel,
|
||||||
portName: args.portName,
|
clientName: args.clientName,
|
||||||
completedAt: args.completedAt,
|
portName: args.portName,
|
||||||
},
|
completedAt: args.completedAt,
|
||||||
{ branding },
|
},
|
||||||
);
|
{ branding },
|
||||||
try {
|
|
||||||
await sendEmail(recipient.email, subject, html, undefined, text, args.portId, [
|
|
||||||
{ fileId: args.signedPdfFileId, filename: args.signedPdfFilename },
|
|
||||||
]);
|
|
||||||
logger.info(
|
|
||||||
{ portId: args.portId, recipient: recipient.email, documentLabel: args.documentLabel },
|
|
||||||
'Signing-completed email sent',
|
|
||||||
);
|
);
|
||||||
} catch (err) {
|
try {
|
||||||
logger.error(
|
await sendEmail(recipient.email, subject, html, undefined, text, args.portId, [
|
||||||
{ err, portId: args.portId, recipient: recipient.email },
|
{ fileId: args.signedPdfFileId, filename: args.signedPdfFilename },
|
||||||
'Signing-completed email send failed',
|
]);
|
||||||
);
|
logger.info(
|
||||||
// Don't throw — sending to one recipient shouldn't block the others.
|
{ portId: args.portId, recipient: recipient.email, documentLabel: args.documentLabel },
|
||||||
}
|
'Signing-completed email sent',
|
||||||
}),
|
);
|
||||||
|
} catch (err) {
|
||||||
|
logger.error(
|
||||||
|
{ err, portId: args.portId, recipient: recipient.email },
|
||||||
|
'Signing-completed email send failed',
|
||||||
|
);
|
||||||
|
// Don't throw — sending to one recipient shouldn't block the others.
|
||||||
|
}
|
||||||
|
}),
|
||||||
|
),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,5 +1,6 @@
|
|||||||
import nodemailer from 'nodemailer';
|
import nodemailer from 'nodemailer';
|
||||||
import { and, eq, inArray, sql } from 'drizzle-orm';
|
import { and, eq, inArray, sql } from 'drizzle-orm';
|
||||||
|
import pLimit from 'p-limit';
|
||||||
|
|
||||||
import { db } from '@/lib/db';
|
import { db } from '@/lib/db';
|
||||||
import { emailAccounts, emailMessages, emailThreads } from '@/lib/db/schema/email';
|
import { emailAccounts, emailMessages, emailThreads } from '@/lib/db/schema/email';
|
||||||
@@ -112,26 +113,33 @@ export async function sendEmail(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Resolve attachments for the user-path SMTP send.
|
// Resolve attachments for the user-path SMTP send. Cap concurrency so
|
||||||
|
// a user attaching 20 large files doesn't fan out 20 simultaneous
|
||||||
|
// S3/MinIO reads + 20 buffers in memory at once — bounded at 4 means
|
||||||
|
// peak memory tops out at ~4 × max-file-size irrespective of the
|
||||||
|
// attachment count.
|
||||||
|
const attachmentLimit = pLimit(4);
|
||||||
const resolvedAttachments = data.attachments
|
const resolvedAttachments = data.attachments
|
||||||
? await Promise.all(
|
? await Promise.all(
|
||||||
data.attachments.map(async (ref) => {
|
data.attachments.map((ref) =>
|
||||||
const file = await db.query.files.findFirst({
|
attachmentLimit(async () => {
|
||||||
where: eq(files.id, ref.fileId),
|
const file = await db.query.files.findFirst({
|
||||||
});
|
where: eq(files.id, ref.fileId),
|
||||||
if (!file) throw new NotFoundError('File');
|
});
|
||||||
const { getStorageBackend } = await import('@/lib/storage');
|
if (!file) throw new NotFoundError('File');
|
||||||
const stream = await (await getStorageBackend()).get(file.storagePath);
|
const { getStorageBackend } = await import('@/lib/storage');
|
||||||
const chunks: Buffer[] = [];
|
const stream = await (await getStorageBackend()).get(file.storagePath);
|
||||||
for await (const chunk of stream) {
|
const chunks: Buffer[] = [];
|
||||||
chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk));
|
for await (const chunk of stream) {
|
||||||
}
|
chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk));
|
||||||
return {
|
}
|
||||||
filename: ref.filename ?? file.originalName,
|
return {
|
||||||
content: Buffer.concat(chunks),
|
filename: ref.filename ?? file.originalName,
|
||||||
...(file.mimeType ? { contentType: file.mimeType } : {}),
|
content: Buffer.concat(chunks),
|
||||||
};
|
...(file.mimeType ? { contentType: file.mimeType } : {}),
|
||||||
}),
|
};
|
||||||
|
}),
|
||||||
|
),
|
||||||
)
|
)
|
||||||
: undefined;
|
: undefined;
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user