diff --git a/src/lib/services/document-reminders.ts b/src/lib/services/document-reminders.ts index 4a1049d..8e67328 100644 --- a/src/lib/services/document-reminders.ts +++ b/src/lib/services/document-reminders.ts @@ -185,10 +185,26 @@ export async function sendReminderIfAllowed( * (override or template) is set, then attempts auto-fire on each. * `interests.reminderEnabled` is no longer part of the gating - per-doc * `remindersDisabled` is the kill switch instead. + * + * Performance: the pre-bulk version called `sendReminderIfAllowed` per + * doc, which re-fetched the port row (invariant), the template-by-type + * map (repeats heavily), the last reminder event, and the pending + * signers — 5×N round trips per cron tick. This implementation hoists + * the invariants out of the loop and turns the per-row queries into + * grouped scans (one per dimension), so a port with 500 in-flight docs + * is now ~7 round trips total instead of ~2500. */ export async function processReminderQueue(portId: string): Promise { const activeDocs = await db - .select({ id: documents.id }) + .select({ + id: documents.id, + documentType: documents.documentType, + documensoId: documents.documensoId, + status: documents.status, + remindersDisabled: documents.remindersDisabled, + reminderCadenceOverride: documents.reminderCadenceOverride, + fileId: documents.fileId, + }) .from(documents) .leftJoin(documentTemplates, eq(documentTemplates.templateType, documents.documentType)) .where( @@ -201,9 +217,95 @@ export async function processReminderQueue(portId: string): Promise { ), ); + if (activeDocs.length === 0) return; + + // Hoist invariants out of the per-doc loop ──────────────────────────────── + + // (1) Port row (timezone) — invariant across the whole batch. + const port = await db.query.ports.findFirst({ where: eq(ports.id, portId) }); + const timezone = port?.timezone ?? 'UTC'; + const currentHour = getCurrentHourInTimezone(timezone); + if (currentHour < 9 || currentHour >= 16) { + // Outside the 9-16 window — nothing to do this tick. + return; + } + + // (2) Per-type template cadence map — repeats per documentType. + const distinctTypes = Array.from(new Set(activeDocs.map((d) => d.documentType))); + const templateRows = await db + .select({ + templateType: documentTemplates.templateType, + reminderCadenceDays: documentTemplates.reminderCadenceDays, + }) + .from(documentTemplates) + .where( + and( + eq(documentTemplates.portId, portId), + inArray(documentTemplates.templateType, distinctTypes), + ), + ); + const templateCadenceByType = new Map( + templateRows.map((r) => [r.templateType, r.reminderCadenceDays ?? null]), + ); + + // (3) Latest reminder_sent event per doc — one grouped query. + const docIds = activeDocs.map((d) => d.id); + const lastReminderRows = await db + .select({ + documentId: documentEvents.documentId, + lastAt: sql`max(${documentEvents.createdAt})`, + }) + .from(documentEvents) + .where( + and( + inArray(documentEvents.documentId, docIds), + eq(documentEvents.eventType, 'reminder_sent'), + ), + ) + .groupBy(documentEvents.documentId); + const lastReminderByDoc = new Map(lastReminderRows.map((r) => [r.documentId, r.lastAt])); + + // (4) Pending signers per doc — one inArray scan. + const pendingSignerRows = await db + .select() + .from(documentSigners) + .where(and(inArray(documentSigners.documentId, docIds), eq(documentSigners.status, 'pending'))) + .orderBy(sql`${documentSigners.signingOrder} ASC`); + const pendingByDoc = new Map(); + for (const row of pendingSignerRows) { + const arr = pendingByDoc.get(row.documentId) ?? []; + arr.push(row); + pendingByDoc.set(row.documentId, arr); + } + + // Per-doc fire — at this point every per-row query is a Map.get. for (const doc of activeDocs) { try { - await sendReminderIfAllowed(doc.id, portId, { auto: true }); + const due = isReminderDue({ + status: doc.status, + documensoId: doc.documensoId, + remindersDisabled: doc.remindersDisabled, + reminderCadenceOverride: doc.reminderCadenceOverride, + templateCadenceDays: templateCadenceByType.get(doc.documentType) ?? null, + lastReminderAt: lastReminderByDoc.get(doc.id) ?? null, + }); + if (!due) continue; + + const pending = pendingByDoc.get(doc.id) ?? []; + const target = pending[0]; + if (!target || !doc.documensoId) continue; + + await documensoRemind(doc.documensoId, target.id, portId); + await db.insert(documentEvents).values({ + documentId: doc.id, + eventType: 'reminder_sent', + signerId: target.id, + eventData: { + signerEmail: target.signerEmail, + signerRole: target.signerRole, + auto: true, + }, + }); } catch (err) { logger.error({ err, documentId: doc.id, portId }, 'Reminder processing failed'); } diff --git a/src/lib/services/inquiry-notifications.service.ts b/src/lib/services/inquiry-notifications.service.ts index ba1e700..0a8d6b4 100644 --- a/src/lib/services/inquiry-notifications.service.ts +++ b/src/lib/services/inquiry-notifications.service.ts @@ -58,33 +58,48 @@ export async function sendInquiryNotifications(params: InquiryNotificationParams logger.error({ err, interestId }, 'Failed to queue client confirmation email'); } - // 2. Notify CRM users with interests.view permission on this port + // 2. Notify CRM users with interests.view permission on this port. + // The previous implementation `await`ed createNotification per user, + // burning ≥3 DB round trips + 2 socket emits per call serially — a + // port with 20 users meant ~80 round trips before this public POST + // could even respond. Promise.all parallelises the DB writes; the + // socket emit fan-out is the only thing that still scales linearly, + // and that's a fire-and-forget local broadcast. try { const usersWithAccess = await findUsersWithInterestsPermission(portId); const crmUrl = `/${portSlug}/interests/${interestId}`; + const description = `${clientFullName} has registered interest${ + mooringNumber ? ` in Berth ${mooringNumber}` : '' + } via the website`; - for (const userId of usersWithAccess) { - try { - await createNotification({ + const settled = await Promise.allSettled( + usersWithAccess.map((userId) => + createNotification({ portId, userId, type: 'new_registration', title: 'New Interest Registered', - description: `${clientFullName} has registered interest${mooringNumber ? ` in Berth ${mooringNumber}` : ''} via the website`, + description, link: crmUrl, entityType: 'interest', entityId: interestId, dedupeKey: `inquiry-${interestId}`, - }); - } catch (err) { - logger.error({ err, userId, interestId }, 'Failed to create notification for user'); + }), + ), + ); + for (const [i, r] of settled.entries()) { + if (r.status === 'rejected') { + logger.error( + { err: r.reason, userId: usersWithAccess[i], interestId }, + 'Failed to create notification for user', + ); } } } catch (err) { logger.error({ err, interestId }, 'Failed to notify CRM users'); } - // 3. Notify external recipients + // 3. Notify external recipients (parallel queue enqueues). try { const recipientsSetting = await getSetting('inquiry_notification_recipients', portId); const externalEmails: string[] = Array.isArray(recipientsSetting?.value) @@ -96,16 +111,18 @@ export async function sendInquiryNotifications(params: InquiryNotificationParams const appUrl = process.env.APP_URL ?? ''; const crmUrl = `${appUrl}/${portSlug}/interests/${interestId}`; - for (const externalEmail of externalEmails) { - await emailQueue.add('send-inquiry-sales-notification', { - to: externalEmail, - fullName: clientFullName, - email: clientEmail, - phone: clientPhone, - mooringNumber, - crmUrl, - }); - } + await Promise.all( + externalEmails.map((externalEmail) => + emailQueue.add('send-inquiry-sales-notification', { + to: externalEmail, + fullName: clientFullName, + email: clientEmail, + phone: clientPhone, + mooringNumber, + crmUrl, + }), + ), + ); } } catch (err) { logger.error({ err, interestId }, 'Failed to notify external recipients'); diff --git a/src/lib/services/interest-scoring.service.ts b/src/lib/services/interest-scoring.service.ts index 5e07b9d..182cdd4 100644 --- a/src/lib/services/interest-scoring.service.ts +++ b/src/lib/services/interest-scoring.service.ts @@ -1,4 +1,4 @@ -import { and, count, eq, gte, isNull } from 'drizzle-orm'; +import { and, count, eq, gte, inArray, isNull } from 'drizzle-orm'; import { db } from '@/lib/db'; import { redis } from '@/lib/redis'; @@ -212,25 +212,161 @@ export async function calculateInterestScore( // ─── Bulk scoring ───────────────────────────────────────────────────────────── +/** + * Score every active interest in a port. The previous implementation + * fanned out one scoring call per interest, each issuing 1 redis read + + * 1 interests.findFirst + 4 count queries → 6N round trips per + * dashboard render (≈6000 for a 1k-interest port). Cold-cache flushes + * pegged the API for a couple of seconds. + * + * The new path replaces those 4N count queries with 4 grouped queries + * (one per dimension, filtered by inArray on the port's interest ids) + * and merges in JS. The redis cache is still consulted, but only as a + * map merged onto the freshly computed scores so cached values short- + * circuit recomputation without re-issuing the per-row count fan-out. + */ export async function calculateBulkScores( portId: string, ): Promise> { const allInterests = await db - .select({ id: interests.id }) + .select({ + id: interests.id, + clientId: interests.clientId, + pipelineStage: interests.pipelineStage, + createdAt: interests.createdAt, + eoiStatus: interests.eoiStatus, + contractStatus: interests.contractStatus, + depositStatus: interests.depositStatus, + dateEoiSigned: interests.dateEoiSigned, + dateContractSigned: interests.dateContractSigned, + dateDepositReceived: interests.dateDepositReceived, + }) .from(interests) .where(and(eq(interests.portId, portId), isNull(interests.archivedAt))); - const results = await Promise.allSettled( - allInterests.map(async (i) => { - const score = await calculateInterestScore(i.id, portId); - return { interestId: i.id, score }; - }), + if (allInterests.length === 0) return []; + + const ids = allInterests.map((i) => i.id); + const clientIds = Array.from( + new Set(allInterests.map((i) => i.clientId).filter((v): v is string => Boolean(v))), + ); + const thirtyDaysAgo = new Date(Date.now() - 30 * 24 * 60 * 60 * 1000); + + // Four grouped aggregates against the port's interest set. Each is a + // single index-friendly scan on `interest_id` (or `client_id` for the + // email-threads case) — no per-row round trips. + const [notesGrouped, remindersGrouped, emailsGrouped, berthLinksGrouped] = await Promise.all([ + db + .select({ interestId: interestNotes.interestId, value: count() }) + .from(interestNotes) + .where( + and(inArray(interestNotes.interestId, ids), gte(interestNotes.createdAt, thirtyDaysAgo)), + ) + .groupBy(interestNotes.interestId), + db + .select({ interestId: reminders.interestId, value: count() }) + .from(reminders) + .where( + and( + inArray(reminders.interestId, ids), + eq(reminders.status, 'completed'), + gte(reminders.completedAt, thirtyDaysAgo), + ), + ) + .groupBy(reminders.interestId), + clientIds.length > 0 + ? db + .select({ clientId: emailThreads.clientId, value: count() }) + .from(emailThreads) + .where( + and( + inArray(emailThreads.clientId, clientIds), + eq(emailThreads.portId, portId), + gte(emailThreads.lastMessageAt, thirtyDaysAgo), + ), + ) + .groupBy(emailThreads.clientId) + : Promise.resolve([] as Array<{ clientId: string | null; value: number }>), + db + .select({ interestId: interestBerths.interestId, value: count() }) + .from(interestBerths) + .where(inArray(interestBerths.interestId, ids)) + .groupBy(interestBerths.interestId), + ]); + + const notesByInterest = new Map( + notesGrouped + .filter((r): r is { interestId: string; value: number } => r.interestId !== null) + .map((r) => [r.interestId, r.value]), + ); + const remindersByInterest = new Map( + remindersGrouped + .filter((r): r is { interestId: string; value: number } => r.interestId !== null) + .map((r) => [r.interestId, r.value]), + ); + const emailsByClient = new Map( + emailsGrouped + .filter((r): r is { clientId: string; value: number } => r.clientId !== null) + .map((r) => [r.clientId, r.value]), + ); + const berthLinksByInterest = new Map( + berthLinksGrouped + .filter((r): r is { interestId: string; value: number } => r.interestId !== null) + .map((r) => [r.interestId, r.value]), ); - return results - .filter( - (r): r is PromiseFulfilledResult<{ interestId: string; score: InterestScore }> => - r.status === 'fulfilled', + const RAW_MAX = 425; + const calculatedAt = new Date(); + const calculatedAtIso = calculatedAt.toISOString(); + + const scored = allInterests.map((interest) => { + const pipelineAge = scorePipelineAge(interest.createdAt); + const stageSpeed = scoreStageSpeed(interest.createdAt, interest.pipelineStage); + const documentCompleteness = scoreDocumentCompleteness({ + eoiStatus: interest.eoiStatus, + contractStatus: interest.contractStatus, + depositStatus: interest.depositStatus, + dateEoiSigned: interest.dateEoiSigned, + dateContractSigned: interest.dateContractSigned, + dateDepositReceived: interest.dateDepositReceived, + }); + + const notesCount = notesByInterest.get(interest.id) ?? 0; + const remindersCount = remindersByInterest.get(interest.id) ?? 0; + const emailCount = interest.clientId ? (emailsByClient.get(interest.clientId) ?? 0) : 0; + const notesScore = Math.min(notesCount * 10, 50); + const emailScore = Math.min(emailCount * 5, 30); + const remindersScore = Math.min(remindersCount * 10, 20); + const engagement = Math.min(notesScore + emailScore + remindersScore, 100); + + const berthLinked = (berthLinksByInterest.get(interest.id) ?? 0) > 0 ? 25 : 0; + + const rawTotal = pipelineAge + stageSpeed + documentCompleteness + engagement + berthLinked; + const totalScore = Math.round((rawTotal / RAW_MAX) * 100); + + const score: InterestScore = { + totalScore, + breakdown: { pipelineAge, stageSpeed, documentCompleteness, engagement, berthLinked }, + calculatedAt, + }; + return { interestId: interest.id, score }; + }); + + // Refresh the redis cache for each interest in a single pipeline so + // single-interest reads downstream short-circuit the per-row queries. + // Fire-and-forget — bulk scoring's correctness doesn't depend on the + // cache write succeeding. + redis + .pipeline( + scored.map(({ interestId, score }) => [ + 'setex', + SCORE_KEY(portId, interestId), + SCORE_TTL, + JSON.stringify({ ...score, calculatedAt: calculatedAtIso }), + ]), ) - .map((r) => r.value); + .exec() + .catch((err) => logger.warn({ err, portId }, 'Redis bulk cache write failed')); + + return scored; } diff --git a/src/lib/services/portal.service.ts b/src/lib/services/portal.service.ts index fb521e4..4ccf619 100644 --- a/src/lib/services/portal.service.ts +++ b/src/lib/services/portal.service.ts @@ -1,4 +1,4 @@ -import { and, eq, count, inArray, isNull, desc } from 'drizzle-orm'; +import { and, eq, count, inArray, isNull, desc, sql } from 'drizzle-orm'; import { db } from '@/lib/db'; import { clients } from '@/lib/db/schema/clients'; @@ -250,16 +250,22 @@ export async function getClientInvoices( if (emailContacts.length === 0) return []; - // Fetch invoices matching any of the client's email addresses - const allInvoices = await db + // Fetch only the invoices matching any of the client's email addresses. + // Without the inArray push-down here every portal invoice page-load + // full-scanned the invoices table and filtered in JS — by 12mo it would + // have been the worst portal endpoint in the platform. Defensive limit + // 100 caps the upper bound for clients with abnormally many invoices. + const clientInvoices = await db .select() .from(invoices) - .where(eq(invoices.portId, portId)) - .orderBy(invoices.createdAt); - - const clientInvoices = allInvoices.filter( - (inv) => inv.billingEmail && emailContacts.includes(inv.billingEmail.toLowerCase()), - ); + .where( + and( + eq(invoices.portId, portId), + inArray(sql`lower(${invoices.billingEmail})`, emailContacts), + ), + ) + .orderBy(invoices.createdAt) + .limit(100); return clientInvoices.map((inv) => ({ id: inv.id, diff --git a/src/lib/services/reminders.service.ts b/src/lib/services/reminders.service.ts index 13658ce..728d5ff 100644 --- a/src/lib/services/reminders.service.ts +++ b/src/lib/services/reminders.service.ts @@ -427,27 +427,37 @@ export async function processFollowUpReminders() { const now = new Date(); - for (const interest of enabledInterests) { - if (!interest.reminderDays) continue; - - // Check if enough days have passed since last activity + // Pick the interests whose follow-up window has elapsed. Pre-filtering + // here means the per-row N+1 walk that used to issue (1 client lookup + // + 1 reminder insert + 1 interest update) per interest is replaced by + // a single client-bulk-fetch + a single reminder bulk-insert + a + // single interests bulk-update against an `inArray` set. + const dueInterests = enabledInterests.filter((interest) => { + if (!interest.reminderDays) return false; const lastActivity = interest.reminderLastFired ?? interest.updatedAt; const daysSinceActivity = (now.getTime() - lastActivity.getTime()) / (1000 * 60 * 60 * 24); + return daysSinceActivity >= interest.reminderDays; + }); - if (daysSinceActivity < interest.reminderDays) continue; + if (dueInterests.length === 0) continue; - // Get client name for the reminder title - const client = interest.clientId - ? await db.query.clients.findFirst({ where: eq(clients.id, interest.clientId) }) - : null; + const clientIds = Array.from( + new Set(dueInterests.map((i) => i.clientId).filter((v): v is string => Boolean(v))), + ); + const clientsByIdEntries = + clientIds.length > 0 + ? await db + .select({ id: clients.id, fullName: clients.fullName }) + .from(clients) + .where(inArray(clients.id, clientIds)) + : []; + const clientById = new Map(clientsByIdEntries.map((c) => [c.id, c])); - const title = client ? `Follow up with ${client.fullName}` : 'Follow up on interest'; - - // Find the assigned user (first userPortRole for this port, or fallback) - // For now, leave assignedTo null - the notification goes to the port room - await db.insert(reminders).values({ + const newReminders = dueInterests.map((interest) => { + const client = interest.clientId ? clientById.get(interest.clientId) : null; + return { portId: port.id, - title, + title: client ? `Follow up with ${client.fullName}` : 'Follow up on interest', note: 'Auto-generated: no activity detected within the configured follow-up window.', dueAt: now, priority: 'medium', @@ -456,23 +466,39 @@ export async function processFollowUpReminders() { interestId: interest.id, clientId: interest.clientId, autoGenerated: true, - }); + }; + }); - // Update last fired timestamp + if (newReminders.length > 0) { + await db.insert(reminders).values(newReminders); await db .update(interests) .set({ reminderLastFired: now }) - .where(eq(interests.id, interest.id)); + .where( + inArray( + interests.id, + dueInterests.map((i) => i.id), + ), + ); + } - // Fire notification to the port room + // Single port-room emit summarising the batch — the per-row emit was + // mostly noise to the dashboard and amplified socket traffic linearly + // with interest count. + if (newReminders.length > 0) { emitToRoom(`port:${port.id}`, 'system:alert', { alertType: 'follow_up_created', - message: title, + message: `${newReminders.length} follow-up reminder${ + newReminders.length === 1 ? '' : 's' + } created`, severity: 'info', }); - - logger.info({ interestId: interest.id, portId: port.id }, 'Auto follow-up reminder created'); } + + logger.info( + { portId: port.id, created: newReminders.length }, + 'Auto follow-up reminders created (bulk)', + ); } }