perf(services): p-limit fan-outs on berth-pdf, custom-fields, notifications
Phase 6 — bounds three remaining unbounded Promise.all fan-outs that the audit flagged as potential prod-incident vectors. Same pattern proven by email-compose (4 concurrent S3 reads) and document-signing-emails (3 concurrent SMTP sends) in earlier commits. berth-pdf.service.ts:574 — presignDownload S3 round-trips bound: pLimit(8). A 20-version berth used to issue 20 simultaneous presigns. ~1× round-trip latency preserved on typical 5-15-version berths; pathological 100-version case no longer saturates the keep-alive pool. custom-fields.service.ts:327 — pg upserts on bulk field-value writes bound: pLimit(8). Port admin stacking 50+ field definitions on one client would have burst 50 concurrent upserts at the pg pool. notifications.service.ts:344 — createNotification fan-out across watchers bound: pLimit(8). Hot pipeline items can accumulate many watchers; a document event used to fan out N notification inserts + N socket emits in one burst. Audit also flagged brochures.service.ts and backup.service.ts as candidates — verified neither actually has an unbounded fan-out, just sequential queries. No change needed; speculative entries removed from BACKLOG implicitly. 1298/1298 vitest green. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -13,6 +13,7 @@
|
||||
|
||||
import { createHash } from 'node:crypto';
|
||||
|
||||
import pLimit from 'p-limit';
|
||||
import { and, desc, eq, isNull, max, sql } from 'drizzle-orm';
|
||||
|
||||
import { db } from '@/lib/db';
|
||||
@@ -568,16 +569,21 @@ export async function listBerthPdfVersions(
|
||||
.orderBy(desc(berthPdfVersions.versionNumber));
|
||||
|
||||
const backend = await getStorageBackend();
|
||||
// Presign in parallel — for an S3 backend each call is a separate HTTP
|
||||
// round-trip, so a 20-version berth used to take 20× the latency in
|
||||
// the sequential loop. Promise.all collapses to ~1× round-trip.
|
||||
// Presign with bounded concurrency — for an S3 backend each call is
|
||||
// a separate HTTP round-trip. 8 in flight at once keeps the latency
|
||||
// close to ~1× round-trip on typical 5-15-version berths while
|
||||
// preventing a 100-version pathological case from saturating the
|
||||
// S3 client's keep-alive pool.
|
||||
const presignLimit = pLimit(8);
|
||||
const presigned = await Promise.all(
|
||||
rows.map((row) =>
|
||||
backend.presignDownload(row.storageKey, {
|
||||
expirySeconds: 900,
|
||||
filename: row.fileName,
|
||||
contentType: 'application/pdf',
|
||||
}),
|
||||
presignLimit(() =>
|
||||
backend.presignDownload(row.storageKey, {
|
||||
expirySeconds: 900,
|
||||
filename: row.fileName,
|
||||
contentType: 'application/pdf',
|
||||
}),
|
||||
),
|
||||
),
|
||||
);
|
||||
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import pLimit from 'p-limit';
|
||||
import { and, eq, count } from 'drizzle-orm';
|
||||
|
||||
import { db } from '@/lib/db';
|
||||
@@ -323,27 +324,33 @@ export async function setValues(
|
||||
}
|
||||
}
|
||||
|
||||
// Upsert all values
|
||||
// Upsert all values with bounded concurrency. Custom-field sets are
|
||||
// typically 5-15 fields, but a port admin could stack 50+ definitions
|
||||
// on a single client. Cap at 8 in flight so the pg pool isn't swamped
|
||||
// on a bulk-update fan-out.
|
||||
const upsertLimit = pLimit(8);
|
||||
const results = await Promise.all(
|
||||
values.map(async ({ fieldId, value }) => {
|
||||
const [upserted] = await db
|
||||
.insert(customFieldValues)
|
||||
.values({
|
||||
fieldId,
|
||||
entityId,
|
||||
value: value as Record<string, unknown>,
|
||||
updatedAt: new Date(),
|
||||
})
|
||||
.onConflictDoUpdate({
|
||||
target: [customFieldValues.fieldId, customFieldValues.entityId],
|
||||
set: {
|
||||
values.map(({ fieldId, value }) =>
|
||||
upsertLimit(async () => {
|
||||
const [upserted] = await db
|
||||
.insert(customFieldValues)
|
||||
.values({
|
||||
fieldId,
|
||||
entityId,
|
||||
value: value as Record<string, unknown>,
|
||||
updatedAt: new Date(),
|
||||
},
|
||||
})
|
||||
.returning();
|
||||
return upserted;
|
||||
}),
|
||||
})
|
||||
.onConflictDoUpdate({
|
||||
target: [customFieldValues.fieldId, customFieldValues.entityId],
|
||||
set: {
|
||||
value: value as Record<string, unknown>,
|
||||
updatedAt: new Date(),
|
||||
},
|
||||
})
|
||||
.returning();
|
||||
return upserted;
|
||||
}),
|
||||
),
|
||||
);
|
||||
|
||||
void createAuditLog({
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import pLimit from 'p-limit';
|
||||
import { and, count, eq, gt, sql } from 'drizzle-orm';
|
||||
|
||||
import { db } from '@/lib/db';
|
||||
@@ -341,19 +342,26 @@ export async function notifyDocumentEvent(
|
||||
const title = DOCUMENT_EVENT_TITLES[eventType];
|
||||
const notifType = DOCUMENT_EVENT_NOTIF_TYPES[eventType];
|
||||
|
||||
// Cap the user fan-out — a document with many watchers (rare but
|
||||
// possible on hot pipeline items) would otherwise issue an unbounded
|
||||
// burst of notification inserts + socket emits. 8 in flight keeps
|
||||
// peak DB writes bounded and emits steady on the socket bus.
|
||||
const notifyLimit = pLimit(8);
|
||||
await Promise.all(
|
||||
Array.from(recipientIds).map((userId) =>
|
||||
createNotification({
|
||||
portId: doc.portId,
|
||||
userId,
|
||||
type: notifType,
|
||||
title,
|
||||
description: `"${doc.title}"`,
|
||||
link: `/documents/${doc.id}`,
|
||||
entityType: 'document',
|
||||
entityId: doc.id,
|
||||
dedupeKey: `document:${doc.id}:${eventType}`,
|
||||
}),
|
||||
notifyLimit(() =>
|
||||
createNotification({
|
||||
portId: doc.portId,
|
||||
userId,
|
||||
type: notifType,
|
||||
title,
|
||||
description: `"${doc.title}"`,
|
||||
link: `/documents/${doc.id}`,
|
||||
entityType: 'document',
|
||||
entityId: doc.id,
|
||||
dedupeKey: `document:${doc.id}:${eventType}`,
|
||||
}),
|
||||
),
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user