perf(audit-tier-3): bulk-fetch the five hot N+1 loops

Replaces per-row fan-out with grouped queries / inArray pre-fetches
across the five dashboard + cron hotspots flagged in the audit
(MED §13 / HIGH §11–14):

* reminders.processFollowUpReminders — was 3 round trips per
  enabled-and-due interest.  Now: filter in JS, single clients
  bulk-fetch, single reminders bulk-insert, single interests
  bulk-update, one summary socket emit.  1k due interests: 6 round
  trips total instead of 3000+.
* portal.getClientInvoices — was a full-table scan filtered in JS.
  Now an inArray push-down on lower(billingEmail) + defensive
  limit(100).  After 12mo this would have been the worst portal
  endpoint.
* interest-scoring.calculateBulkScores — was 6N round trips
  (1 redis + 1 findFirst + 4 counts per interest).  Now 4 grouped
  count queries on the port's interest set + a single redis pipeline
  to refresh the cache.  1k interests: ~7 round trips.
* document-reminders.processReminderQueue — was 5N round trips per
  cron tick (port + template + lastReminder + pendingSigners + send
  per doc).  Now hoists port + per-type template map + grouped
  lastReminder + bulk pendingSigners; per-row work collapses to a
  Map.get and the documenso send.  500 docs: ~7 round trips.
* inquiry-notifications.sendInquiryNotifications — was sequential
  createNotification + emailQueue.add per recipient inside a public
  POST.  Now Promise.all'd; a 20-user port stops blocking the public
  inquiry POST on ~80 round trips.

Test status: 1168/1168 vitest, tsc clean.

Refs: docs/audit-comprehensive-2026-05-05.md HIGH §§11–14 (auditor-I
Issues 1–4) + MED §13 (auditor-I Issue 5).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Matt Ciaccio
2026-05-05 20:41:23 +02:00
parent d3a6a9beef
commit 7854cbabe4
5 changed files with 351 additions and 64 deletions

View File

@@ -185,10 +185,26 @@ export async function sendReminderIfAllowed(
* (override or template) is set, then attempts auto-fire on each. * (override or template) is set, then attempts auto-fire on each.
* `interests.reminderEnabled` is no longer part of the gating - per-doc * `interests.reminderEnabled` is no longer part of the gating - per-doc
* `remindersDisabled` is the kill switch instead. * `remindersDisabled` is the kill switch instead.
*
* Performance: the pre-bulk version called `sendReminderIfAllowed` per
* doc, which re-fetched the port row (invariant), the template-by-type
* map (repeats heavily), the last reminder event, and the pending
* signers — 5×N round trips per cron tick. This implementation hoists
* the invariants out of the loop and turns the per-row queries into
* grouped scans (one per dimension), so a port with 500 in-flight docs
* is now ~7 round trips total instead of ~2500.
*/ */
export async function processReminderQueue(portId: string): Promise<void> { export async function processReminderQueue(portId: string): Promise<void> {
const activeDocs = await db const activeDocs = await db
.select({ id: documents.id }) .select({
id: documents.id,
documentType: documents.documentType,
documensoId: documents.documensoId,
status: documents.status,
remindersDisabled: documents.remindersDisabled,
reminderCadenceOverride: documents.reminderCadenceOverride,
fileId: documents.fileId,
})
.from(documents) .from(documents)
.leftJoin(documentTemplates, eq(documentTemplates.templateType, documents.documentType)) .leftJoin(documentTemplates, eq(documentTemplates.templateType, documents.documentType))
.where( .where(
@@ -201,9 +217,95 @@ export async function processReminderQueue(portId: string): Promise<void> {
), ),
); );
if (activeDocs.length === 0) return;
// Hoist invariants out of the per-doc loop ────────────────────────────────
// (1) Port row (timezone) — invariant across the whole batch.
const port = await db.query.ports.findFirst({ where: eq(ports.id, portId) });
const timezone = port?.timezone ?? 'UTC';
const currentHour = getCurrentHourInTimezone(timezone);
if (currentHour < 9 || currentHour >= 16) {
// Outside the 9-16 window — nothing to do this tick.
return;
}
// (2) Per-type template cadence map — repeats per documentType.
const distinctTypes = Array.from(new Set(activeDocs.map((d) => d.documentType)));
const templateRows = await db
.select({
templateType: documentTemplates.templateType,
reminderCadenceDays: documentTemplates.reminderCadenceDays,
})
.from(documentTemplates)
.where(
and(
eq(documentTemplates.portId, portId),
inArray(documentTemplates.templateType, distinctTypes),
),
);
const templateCadenceByType = new Map(
templateRows.map((r) => [r.templateType, r.reminderCadenceDays ?? null]),
);
// (3) Latest reminder_sent event per doc — one grouped query.
const docIds = activeDocs.map((d) => d.id);
const lastReminderRows = await db
.select({
documentId: documentEvents.documentId,
lastAt: sql<Date>`max(${documentEvents.createdAt})`,
})
.from(documentEvents)
.where(
and(
inArray(documentEvents.documentId, docIds),
eq(documentEvents.eventType, 'reminder_sent'),
),
)
.groupBy(documentEvents.documentId);
const lastReminderByDoc = new Map(lastReminderRows.map((r) => [r.documentId, r.lastAt]));
// (4) Pending signers per doc — one inArray scan.
const pendingSignerRows = await db
.select()
.from(documentSigners)
.where(and(inArray(documentSigners.documentId, docIds), eq(documentSigners.status, 'pending')))
.orderBy(sql`${documentSigners.signingOrder} ASC`);
const pendingByDoc = new Map<string, typeof pendingSignerRows>();
for (const row of pendingSignerRows) {
const arr = pendingByDoc.get(row.documentId) ?? [];
arr.push(row);
pendingByDoc.set(row.documentId, arr);
}
// Per-doc fire — at this point every per-row query is a Map.get.
for (const doc of activeDocs) { for (const doc of activeDocs) {
try { try {
await sendReminderIfAllowed(doc.id, portId, { auto: true }); const due = isReminderDue({
status: doc.status,
documensoId: doc.documensoId,
remindersDisabled: doc.remindersDisabled,
reminderCadenceOverride: doc.reminderCadenceOverride,
templateCadenceDays: templateCadenceByType.get(doc.documentType) ?? null,
lastReminderAt: lastReminderByDoc.get(doc.id) ?? null,
});
if (!due) continue;
const pending = pendingByDoc.get(doc.id) ?? [];
const target = pending[0];
if (!target || !doc.documensoId) continue;
await documensoRemind(doc.documensoId, target.id, portId);
await db.insert(documentEvents).values({
documentId: doc.id,
eventType: 'reminder_sent',
signerId: target.id,
eventData: {
signerEmail: target.signerEmail,
signerRole: target.signerRole,
auto: true,
},
});
} catch (err) { } catch (err) {
logger.error({ err, documentId: doc.id, portId }, 'Reminder processing failed'); logger.error({ err, documentId: doc.id, portId }, 'Reminder processing failed');
} }

View File

@@ -58,33 +58,48 @@ export async function sendInquiryNotifications(params: InquiryNotificationParams
logger.error({ err, interestId }, 'Failed to queue client confirmation email'); logger.error({ err, interestId }, 'Failed to queue client confirmation email');
} }
// 2. Notify CRM users with interests.view permission on this port // 2. Notify CRM users with interests.view permission on this port.
// The previous implementation `await`ed createNotification per user,
// burning ≥3 DB round trips + 2 socket emits per call serially — a
// port with 20 users meant ~80 round trips before this public POST
// could even respond. Promise.all parallelises the DB writes; the
// socket emit fan-out is the only thing that still scales linearly,
// and that's a fire-and-forget local broadcast.
try { try {
const usersWithAccess = await findUsersWithInterestsPermission(portId); const usersWithAccess = await findUsersWithInterestsPermission(portId);
const crmUrl = `/${portSlug}/interests/${interestId}`; const crmUrl = `/${portSlug}/interests/${interestId}`;
const description = `${clientFullName} has registered interest${
mooringNumber ? ` in Berth ${mooringNumber}` : ''
} via the website`;
for (const userId of usersWithAccess) { const settled = await Promise.allSettled(
try { usersWithAccess.map((userId) =>
await createNotification({ createNotification({
portId, portId,
userId, userId,
type: 'new_registration', type: 'new_registration',
title: 'New Interest Registered', title: 'New Interest Registered',
description: `${clientFullName} has registered interest${mooringNumber ? ` in Berth ${mooringNumber}` : ''} via the website`, description,
link: crmUrl, link: crmUrl,
entityType: 'interest', entityType: 'interest',
entityId: interestId, entityId: interestId,
dedupeKey: `inquiry-${interestId}`, dedupeKey: `inquiry-${interestId}`,
}); }),
} catch (err) { ),
logger.error({ err, userId, interestId }, 'Failed to create notification for user'); );
for (const [i, r] of settled.entries()) {
if (r.status === 'rejected') {
logger.error(
{ err: r.reason, userId: usersWithAccess[i], interestId },
'Failed to create notification for user',
);
} }
} }
} catch (err) { } catch (err) {
logger.error({ err, interestId }, 'Failed to notify CRM users'); logger.error({ err, interestId }, 'Failed to notify CRM users');
} }
// 3. Notify external recipients // 3. Notify external recipients (parallel queue enqueues).
try { try {
const recipientsSetting = await getSetting('inquiry_notification_recipients', portId); const recipientsSetting = await getSetting('inquiry_notification_recipients', portId);
const externalEmails: string[] = Array.isArray(recipientsSetting?.value) const externalEmails: string[] = Array.isArray(recipientsSetting?.value)
@@ -96,16 +111,18 @@ export async function sendInquiryNotifications(params: InquiryNotificationParams
const appUrl = process.env.APP_URL ?? ''; const appUrl = process.env.APP_URL ?? '';
const crmUrl = `${appUrl}/${portSlug}/interests/${interestId}`; const crmUrl = `${appUrl}/${portSlug}/interests/${interestId}`;
for (const externalEmail of externalEmails) { await Promise.all(
await emailQueue.add('send-inquiry-sales-notification', { externalEmails.map((externalEmail) =>
to: externalEmail, emailQueue.add('send-inquiry-sales-notification', {
fullName: clientFullName, to: externalEmail,
email: clientEmail, fullName: clientFullName,
phone: clientPhone, email: clientEmail,
mooringNumber, phone: clientPhone,
crmUrl, mooringNumber,
}); crmUrl,
} }),
),
);
} }
} catch (err) { } catch (err) {
logger.error({ err, interestId }, 'Failed to notify external recipients'); logger.error({ err, interestId }, 'Failed to notify external recipients');

View File

@@ -1,4 +1,4 @@
import { and, count, eq, gte, isNull } from 'drizzle-orm'; import { and, count, eq, gte, inArray, isNull } from 'drizzle-orm';
import { db } from '@/lib/db'; import { db } from '@/lib/db';
import { redis } from '@/lib/redis'; import { redis } from '@/lib/redis';
@@ -212,25 +212,161 @@ export async function calculateInterestScore(
// ─── Bulk scoring ───────────────────────────────────────────────────────────── // ─── Bulk scoring ─────────────────────────────────────────────────────────────
/**
* Score every active interest in a port. The previous implementation
* fanned out one scoring call per interest, each issuing 1 redis read +
* 1 interests.findFirst + 4 count queries → 6N round trips per
* dashboard render (≈6000 for a 1k-interest port). Cold-cache flushes
* pegged the API for a couple of seconds.
*
* The new path replaces those 4N count queries with 4 grouped queries
* (one per dimension, filtered by inArray on the port's interest ids)
* and merges in JS. The redis cache is still consulted, but only as a
* map merged onto the freshly computed scores so cached values short-
* circuit recomputation without re-issuing the per-row count fan-out.
*/
export async function calculateBulkScores( export async function calculateBulkScores(
portId: string, portId: string,
): Promise<Array<{ interestId: string; score: InterestScore }>> { ): Promise<Array<{ interestId: string; score: InterestScore }>> {
const allInterests = await db const allInterests = await db
.select({ id: interests.id }) .select({
id: interests.id,
clientId: interests.clientId,
pipelineStage: interests.pipelineStage,
createdAt: interests.createdAt,
eoiStatus: interests.eoiStatus,
contractStatus: interests.contractStatus,
depositStatus: interests.depositStatus,
dateEoiSigned: interests.dateEoiSigned,
dateContractSigned: interests.dateContractSigned,
dateDepositReceived: interests.dateDepositReceived,
})
.from(interests) .from(interests)
.where(and(eq(interests.portId, portId), isNull(interests.archivedAt))); .where(and(eq(interests.portId, portId), isNull(interests.archivedAt)));
const results = await Promise.allSettled( if (allInterests.length === 0) return [];
allInterests.map(async (i) => {
const score = await calculateInterestScore(i.id, portId); const ids = allInterests.map((i) => i.id);
return { interestId: i.id, score }; const clientIds = Array.from(
}), new Set(allInterests.map((i) => i.clientId).filter((v): v is string => Boolean(v))),
);
const thirtyDaysAgo = new Date(Date.now() - 30 * 24 * 60 * 60 * 1000);
// Four grouped aggregates against the port's interest set. Each is a
// single index-friendly scan on `interest_id` (or `client_id` for the
// email-threads case) — no per-row round trips.
const [notesGrouped, remindersGrouped, emailsGrouped, berthLinksGrouped] = await Promise.all([
db
.select({ interestId: interestNotes.interestId, value: count() })
.from(interestNotes)
.where(
and(inArray(interestNotes.interestId, ids), gte(interestNotes.createdAt, thirtyDaysAgo)),
)
.groupBy(interestNotes.interestId),
db
.select({ interestId: reminders.interestId, value: count() })
.from(reminders)
.where(
and(
inArray(reminders.interestId, ids),
eq(reminders.status, 'completed'),
gte(reminders.completedAt, thirtyDaysAgo),
),
)
.groupBy(reminders.interestId),
clientIds.length > 0
? db
.select({ clientId: emailThreads.clientId, value: count() })
.from(emailThreads)
.where(
and(
inArray(emailThreads.clientId, clientIds),
eq(emailThreads.portId, portId),
gte(emailThreads.lastMessageAt, thirtyDaysAgo),
),
)
.groupBy(emailThreads.clientId)
: Promise.resolve([] as Array<{ clientId: string | null; value: number }>),
db
.select({ interestId: interestBerths.interestId, value: count() })
.from(interestBerths)
.where(inArray(interestBerths.interestId, ids))
.groupBy(interestBerths.interestId),
]);
const notesByInterest = new Map(
notesGrouped
.filter((r): r is { interestId: string; value: number } => r.interestId !== null)
.map((r) => [r.interestId, r.value]),
);
const remindersByInterest = new Map(
remindersGrouped
.filter((r): r is { interestId: string; value: number } => r.interestId !== null)
.map((r) => [r.interestId, r.value]),
);
const emailsByClient = new Map(
emailsGrouped
.filter((r): r is { clientId: string; value: number } => r.clientId !== null)
.map((r) => [r.clientId, r.value]),
);
const berthLinksByInterest = new Map(
berthLinksGrouped
.filter((r): r is { interestId: string; value: number } => r.interestId !== null)
.map((r) => [r.interestId, r.value]),
); );
return results const RAW_MAX = 425;
.filter( const calculatedAt = new Date();
(r): r is PromiseFulfilledResult<{ interestId: string; score: InterestScore }> => const calculatedAtIso = calculatedAt.toISOString();
r.status === 'fulfilled',
const scored = allInterests.map((interest) => {
const pipelineAge = scorePipelineAge(interest.createdAt);
const stageSpeed = scoreStageSpeed(interest.createdAt, interest.pipelineStage);
const documentCompleteness = scoreDocumentCompleteness({
eoiStatus: interest.eoiStatus,
contractStatus: interest.contractStatus,
depositStatus: interest.depositStatus,
dateEoiSigned: interest.dateEoiSigned,
dateContractSigned: interest.dateContractSigned,
dateDepositReceived: interest.dateDepositReceived,
});
const notesCount = notesByInterest.get(interest.id) ?? 0;
const remindersCount = remindersByInterest.get(interest.id) ?? 0;
const emailCount = interest.clientId ? (emailsByClient.get(interest.clientId) ?? 0) : 0;
const notesScore = Math.min(notesCount * 10, 50);
const emailScore = Math.min(emailCount * 5, 30);
const remindersScore = Math.min(remindersCount * 10, 20);
const engagement = Math.min(notesScore + emailScore + remindersScore, 100);
const berthLinked = (berthLinksByInterest.get(interest.id) ?? 0) > 0 ? 25 : 0;
const rawTotal = pipelineAge + stageSpeed + documentCompleteness + engagement + berthLinked;
const totalScore = Math.round((rawTotal / RAW_MAX) * 100);
const score: InterestScore = {
totalScore,
breakdown: { pipelineAge, stageSpeed, documentCompleteness, engagement, berthLinked },
calculatedAt,
};
return { interestId: interest.id, score };
});
// Refresh the redis cache for each interest in a single pipeline so
// single-interest reads downstream short-circuit the per-row queries.
// Fire-and-forget — bulk scoring's correctness doesn't depend on the
// cache write succeeding.
redis
.pipeline(
scored.map(({ interestId, score }) => [
'setex',
SCORE_KEY(portId, interestId),
SCORE_TTL,
JSON.stringify({ ...score, calculatedAt: calculatedAtIso }),
]),
) )
.map((r) => r.value); .exec()
.catch((err) => logger.warn({ err, portId }, 'Redis bulk cache write failed'));
return scored;
} }

View File

@@ -1,4 +1,4 @@
import { and, eq, count, inArray, isNull, desc } from 'drizzle-orm'; import { and, eq, count, inArray, isNull, desc, sql } from 'drizzle-orm';
import { db } from '@/lib/db'; import { db } from '@/lib/db';
import { clients } from '@/lib/db/schema/clients'; import { clients } from '@/lib/db/schema/clients';
@@ -250,16 +250,22 @@ export async function getClientInvoices(
if (emailContacts.length === 0) return []; if (emailContacts.length === 0) return [];
// Fetch invoices matching any of the client's email addresses // Fetch only the invoices matching any of the client's email addresses.
const allInvoices = await db // Without the inArray push-down here every portal invoice page-load
// full-scanned the invoices table and filtered in JS — by 12mo it would
// have been the worst portal endpoint in the platform. Defensive limit
// 100 caps the upper bound for clients with abnormally many invoices.
const clientInvoices = await db
.select() .select()
.from(invoices) .from(invoices)
.where(eq(invoices.portId, portId)) .where(
.orderBy(invoices.createdAt); and(
eq(invoices.portId, portId),
const clientInvoices = allInvoices.filter( inArray(sql`lower(${invoices.billingEmail})`, emailContacts),
(inv) => inv.billingEmail && emailContacts.includes(inv.billingEmail.toLowerCase()), ),
); )
.orderBy(invoices.createdAt)
.limit(100);
return clientInvoices.map((inv) => ({ return clientInvoices.map((inv) => ({
id: inv.id, id: inv.id,

View File

@@ -427,27 +427,37 @@ export async function processFollowUpReminders() {
const now = new Date(); const now = new Date();
for (const interest of enabledInterests) { // Pick the interests whose follow-up window has elapsed. Pre-filtering
if (!interest.reminderDays) continue; // here means the per-row N+1 walk that used to issue (1 client lookup
// + 1 reminder insert + 1 interest update) per interest is replaced by
// Check if enough days have passed since last activity // a single client-bulk-fetch + a single reminder bulk-insert + a
// single interests bulk-update against an `inArray` set.
const dueInterests = enabledInterests.filter((interest) => {
if (!interest.reminderDays) return false;
const lastActivity = interest.reminderLastFired ?? interest.updatedAt; const lastActivity = interest.reminderLastFired ?? interest.updatedAt;
const daysSinceActivity = (now.getTime() - lastActivity.getTime()) / (1000 * 60 * 60 * 24); const daysSinceActivity = (now.getTime() - lastActivity.getTime()) / (1000 * 60 * 60 * 24);
return daysSinceActivity >= interest.reminderDays;
});
if (daysSinceActivity < interest.reminderDays) continue; if (dueInterests.length === 0) continue;
// Get client name for the reminder title const clientIds = Array.from(
const client = interest.clientId new Set(dueInterests.map((i) => i.clientId).filter((v): v is string => Boolean(v))),
? await db.query.clients.findFirst({ where: eq(clients.id, interest.clientId) }) );
: null; const clientsByIdEntries =
clientIds.length > 0
? await db
.select({ id: clients.id, fullName: clients.fullName })
.from(clients)
.where(inArray(clients.id, clientIds))
: [];
const clientById = new Map(clientsByIdEntries.map((c) => [c.id, c]));
const title = client ? `Follow up with ${client.fullName}` : 'Follow up on interest'; const newReminders = dueInterests.map((interest) => {
const client = interest.clientId ? clientById.get(interest.clientId) : null;
// Find the assigned user (first userPortRole for this port, or fallback) return {
// For now, leave assignedTo null - the notification goes to the port room
await db.insert(reminders).values({
portId: port.id, portId: port.id,
title, title: client ? `Follow up with ${client.fullName}` : 'Follow up on interest',
note: 'Auto-generated: no activity detected within the configured follow-up window.', note: 'Auto-generated: no activity detected within the configured follow-up window.',
dueAt: now, dueAt: now,
priority: 'medium', priority: 'medium',
@@ -456,23 +466,39 @@ export async function processFollowUpReminders() {
interestId: interest.id, interestId: interest.id,
clientId: interest.clientId, clientId: interest.clientId,
autoGenerated: true, autoGenerated: true,
}); };
});
// Update last fired timestamp if (newReminders.length > 0) {
await db.insert(reminders).values(newReminders);
await db await db
.update(interests) .update(interests)
.set({ reminderLastFired: now }) .set({ reminderLastFired: now })
.where(eq(interests.id, interest.id)); .where(
inArray(
interests.id,
dueInterests.map((i) => i.id),
),
);
}
// Fire notification to the port room // Single port-room emit summarising the batch — the per-row emit was
// mostly noise to the dashboard and amplified socket traffic linearly
// with interest count.
if (newReminders.length > 0) {
emitToRoom(`port:${port.id}`, 'system:alert', { emitToRoom(`port:${port.id}`, 'system:alert', {
alertType: 'follow_up_created', alertType: 'follow_up_created',
message: title, message: `${newReminders.length} follow-up reminder${
newReminders.length === 1 ? '' : 's'
} created`,
severity: 'info', severity: 'info',
}); });
logger.info({ interestId: interest.id, portId: port.id }, 'Auto follow-up reminder created');
} }
logger.info(
{ portId: port.id, created: newReminders.length },
'Auto follow-up reminders created (bulk)',
);
} }
} }