import { and, count, eq, gte, inArray, isNull } from 'drizzle-orm';

import { db } from '@/lib/db';
import { redis } from '@/lib/redis';
import { interests, interestBerths, interestNotes } from '@/lib/db/schema/interests';
import { reminders } from '@/lib/db/schema/operations';
import { emailThreads } from '@/lib/db/schema/email';
import { logger } from '@/lib/logger';
import { PIPELINE_STAGES } from '@/lib/constants';
import { NotFoundError } from '@/lib/errors';

// ─── Types ────────────────────────────────────────────────────────────────────

export interface InterestScore {
  totalScore: number; // 0-100 (normalised)
  breakdown: {
    pipelineAge: number; // 0-100
    stageSpeed: number; // 0-100
    documentCompleteness: number; // 0-100
    engagement: number; // 0-100
    berthLinked: number; // 0 or 25
  };
  calculatedAt: Date;
}

/** Shape of a score after a JSON round trip through redis (the Date becomes an ISO string). */
type CachedInterestScore = Omit<InterestScore, 'calculatedAt'> & { calculatedAt: string };

/** Document-related fields read by `scoreDocumentCompleteness`. */
interface DocumentFields {
  eoiStatus: string | null;
  contractStatus: string | null;
  depositStatus: string | null;
  dateEoiSigned: Date | null;
  dateContractSigned: Date | null;
  dateDepositReceived: Date | null;
}

/** The subset of an interest row needed to compute every non-query dimension. */
type ScorableInterest = DocumentFields & { createdAt: Date; pipelineStage: string };

/** Raw activity counts for one interest, as returned by the count queries. */
interface ActivityCounts {
  notes: number;
  reminders: number;
  emails: number;
  berthLinks: number;
}

// ─── Redis cache ──────────────────────────────────────────────────────────────

// Cache key includes portId so a foreign-port caller hitting the same
// interestId never sees a port-A cached value. (Even if interestId is
// already globally unique, baking portId into the key means a stale or
// hostile caller cannot reuse cached entries across tenants.)
const SCORE_KEY = (portId: string, interestId: string) =>
  `interest-score:${portId}:${interestId}`;
const SCORE_TTL = 3600; // 1 hour

// ─── Scoring helpers ──────────────────────────────────────────────────────────

const MS_PER_DAY = 1000 * 60 * 60 * 24;

// Engagement only counts activity inside this trailing window.
const ENGAGEMENT_WINDOW_MS = 30 * MS_PER_DAY;

// Max raw = 100 + 100 + 100 + 100 + 25 = 425 → /425 * 100
const RAW_MAX = 425;

/** Scores how recently the interest was created: newer interests score higher. */
function scorePipelineAge(createdAt: Date): number {
  const days = Math.floor((Date.now() - createdAt.getTime()) / MS_PER_DAY);
  if (days <= 30) return 100;
  if (days <= 60) return 80;
  if (days <= 90) return 60;
  if (days <= 180) return 40;
  return 20;
}

/**
 * Scores how quickly the interest has moved through the pipeline, based on
 * the average number of days spent per stage transition so far.
 */
function scoreStageSpeed(createdAt: Date, pipelineStage: string): number {
  const idx = PIPELINE_STAGES.indexOf(pipelineStage as (typeof PIPELINE_STAGES)[number]);
  // A stage that is not in PIPELINE_STAGES is treated like the first stage.
  const stageIndex = idx === -1 ? 0 : idx;

  if (stageIndex === 0) {
    // Still at the first stage - no progression
    return 0;
  }

  // Clamp to one day so a just-created interest cannot divide towards zero.
  const daysSinceCreation = Math.max(1, (Date.now() - createdAt.getTime()) / MS_PER_DAY);

  // Average days per stage transition
  const avgDaysPerStage = daysSinceCreation / stageIndex;

  // Thresholds (days/stage): <7 → 100, <14 → 75, <30 → 50, <60 → 25, otherwise 10
  if (avgDaysPerStage < 7) return 100;
  if (avgDaysPerStage < 14) return 75;
  if (avgDaysPerStage < 30) return 50;
  if (avgDaysPerStage < 60) return 25;
  return 10;
}

/**
 * Scores document progress: 30 for a signed EOI, 30 for a signed contract and
 * 40 for a received/paid deposit. Either the status or the matching date is
 * enough to earn each component.
 */
function scoreDocumentCompleteness(interest: DocumentFields): number {
  let score = 0;

  // EOI signed
  if (interest.eoiStatus === 'signed' || interest.dateEoiSigned != null) {
    score += 30;
  }

  // Contract
  if (interest.contractStatus === 'signed' || interest.dateContractSigned != null) {
    score += 30;
  }

  // Deposit
  if (
    interest.depositStatus === 'received' ||
    interest.depositStatus === 'paid' ||
    interest.dateDepositReceived != null
  ) {
    score += 40;
  }

  return Math.min(score, 100);
}

/**
 * Combines the per-dimension scores into a normalised `InterestScore`.
 * Shared by the single and bulk paths so both always apply the same weights.
 */
function buildScore(
  interest: ScorableInterest,
  counts: ActivityCounts,
  calculatedAt: Date,
): InterestScore {
  const pipelineAge = scorePipelineAge(interest.createdAt);
  const stageSpeed = scoreStageSpeed(interest.createdAt, interest.pipelineStage);
  const documentCompleteness = scoreDocumentCompleteness(interest);

  // Engagement: each activity type is capped so no single one dominates.
  const notesScore = Math.min(counts.notes * 10, 50);
  const emailScore = Math.min(counts.emails * 5, 30);
  const remindersScore = Math.min(counts.reminders * 10, 20);
  const engagement = Math.min(notesScore + emailScore + remindersScore, 100);

  // Berth linked - true when the interest has at least one junction row.
  const berthLinked = counts.berthLinks > 0 ? 25 : 0;

  const rawTotal = pipelineAge + stageSpeed + documentCompleteness + engagement + berthLinked;
  const totalScore = Math.round((rawTotal / RAW_MAX) * 100);

  return {
    totalScore,
    breakdown: { pipelineAge, stageSpeed, documentCompleteness, engagement, berthLinked },
    calculatedAt,
  };
}

// ─── Main scoring function ────────────────────────────────────────────────────

/**
 * Computes (or returns the cached) score for a single interest.
 *
 * @param interestId - Id of the interest to score.
 * @param portId - Port the caller is scoped to; the interest must belong to it.
 * @returns The score, either from the redis cache or freshly calculated.
 * @throws NotFoundError when no interest with this id exists in the given port.
 */
export async function calculateInterestScore(
  interestId: string,
  portId: string,
): Promise<InterestScore> {
  // Verify the interest belongs to the caller's port BEFORE returning a
  // cached value. The cache key now includes portId, but defense-in-depth:
  // a port-B caller passing a port-A interestId still gets NotFound
  // instead of a leaked score.
  const interest = await db.query.interests.findFirst({
    where: and(eq(interests.id, interestId), eq(interests.portId, portId)),
  });

  if (!interest) {
    throw new NotFoundError('interest');
  }

  // Try cache (port-scoped key). Any read or parse failure is logged and we
  // fall through to a fresh calculation.
  try {
    const cached = await redis.get(SCORE_KEY(portId, interestId));
    if (cached) {
      const parsed = JSON.parse(cached) as CachedInterestScore;
      return { ...parsed, calculatedAt: new Date(parsed.calculatedAt) };
    }
  } catch (err) {
    logger.warn({ err, interestId }, 'Redis cache read failed for interest score');
  }

  // Engagement - notes, emails, reminders in last 30 days
  const thirtyDaysAgo = new Date(Date.now() - ENGAGEMENT_WINDOW_MS);

  const [notesResult, remindersResult, emailResult, berthLinkResult] = await Promise.all([
    db
      .select({ value: count() })
      .from(interestNotes)
      .where(
        and(eq(interestNotes.interestId, interestId), gte(interestNotes.createdAt, thirtyDaysAgo)),
      ),
    db
      .select({ value: count() })
      .from(reminders)
      .where(
        and(
          eq(reminders.interestId, interestId),
          eq(reminders.status, 'completed'),
          gte(reminders.completedAt, thirtyDaysAgo),
        ),
      ),
    db
      .select({ value: count() })
      .from(emailThreads)
      .where(
        and(
          eq(emailThreads.clientId, interest.clientId),
          eq(emailThreads.portId, portId),
          gte(emailThreads.lastMessageAt, thirtyDaysAgo),
        ),
      ),
    // Plan §3.4: any junction row counts as "berth linked", not just the
    // primary - the score awards engagement for an interest that has *any*
    // berth association at all.
    db
      .select({ value: count() })
      .from(interestBerths)
      .where(eq(interestBerths.interestId, interestId)),
  ]);

  const result = buildScore(
    interest,
    {
      notes: notesResult[0]?.value ?? 0,
      reminders: remindersResult[0]?.value ?? 0,
      emails: emailResult[0]?.value ?? 0,
      berthLinks: berthLinkResult[0]?.value ?? 0,
    },
    new Date(),
  );

  // Write to cache (fire-and-forget)
  void redis
    .setex(SCORE_KEY(portId, interestId), SCORE_TTL, JSON.stringify(result))
    .catch((err: unknown) =>
      logger.warn({ err, interestId }, 'Redis cache write failed for interest score'),
    );

  return result;
}

// ─── Bulk scoring ─────────────────────────────────────────────────────────────

/**
 * Score every active (non-archived) interest in a port.
 *
 * The previous implementation fanned out one scoring call per interest,
 * each issuing 1 redis read + 1 interests.findFirst + 4 count queries →
 * 6N round trips per dashboard render (≈6000 for a 1k-interest port).
 * Cold-cache flushes pegged the API for a couple of seconds.
 *
 * This path replaces those 4N count queries with 4 grouped queries (one per
 * dimension, filtered by inArray on the port's interest ids) and merges the
 * results in JS. The redis cache is NOT read here: every score is recomputed
 * from the grouped counts, and the cache is then refreshed in one pipeline so
 * later single-interest reads can hit it.
 *
 * @param portId - Port whose interests should be scored.
 * @returns One `{ interestId, score }` entry per active interest; empty when
 *   the port has none.
 */
export async function calculateBulkScores(
  portId: string,
): Promise<Array<{ interestId: string; score: InterestScore }>> {
  const allInterests = await db
    .select({
      id: interests.id,
      clientId: interests.clientId,
      pipelineStage: interests.pipelineStage,
      createdAt: interests.createdAt,
      eoiStatus: interests.eoiStatus,
      contractStatus: interests.contractStatus,
      depositStatus: interests.depositStatus,
      dateEoiSigned: interests.dateEoiSigned,
      dateContractSigned: interests.dateContractSigned,
      dateDepositReceived: interests.dateDepositReceived,
    })
    .from(interests)
    .where(and(eq(interests.portId, portId), isNull(interests.archivedAt)));

  if (allInterests.length === 0) return [];

  const ids = allInterests.map((i) => i.id);
  // De-duplicated, non-empty client ids for the email-threads lookup.
  const clientIds = Array.from(
    new Set(allInterests.map((i) => i.clientId).filter((v): v is string => Boolean(v))),
  );
  const thirtyDaysAgo = new Date(Date.now() - ENGAGEMENT_WINDOW_MS);

  // Four grouped aggregates against the port's interest set, keyed on
  // `interest_id` (or `client_id` for the email-threads case) - no per-row
  // round trips.
  const [notesGrouped, remindersGrouped, emailsGrouped, berthLinksGrouped] = await Promise.all([
    db
      .select({ interestId: interestNotes.interestId, value: count() })
      .from(interestNotes)
      .where(
        and(inArray(interestNotes.interestId, ids), gte(interestNotes.createdAt, thirtyDaysAgo)),
      )
      .groupBy(interestNotes.interestId),
    db
      .select({ interestId: reminders.interestId, value: count() })
      .from(reminders)
      .where(
        and(
          inArray(reminders.interestId, ids),
          eq(reminders.status, 'completed'),
          gte(reminders.completedAt, thirtyDaysAgo),
        ),
      )
      .groupBy(reminders.interestId),
    // Skip the query entirely when no interest has a client id.
    clientIds.length > 0
      ? db
          .select({ clientId: emailThreads.clientId, value: count() })
          .from(emailThreads)
          .where(
            and(
              inArray(emailThreads.clientId, clientIds),
              eq(emailThreads.portId, portId),
              gte(emailThreads.lastMessageAt, thirtyDaysAgo),
            ),
          )
          .groupBy(emailThreads.clientId)
      : Promise.resolve([] as Array<{ clientId: string | null; value: number }>),
    db
      .select({ interestId: interestBerths.interestId, value: count() })
      .from(interestBerths)
      .where(inArray(interestBerths.interestId, ids))
      .groupBy(interestBerths.interestId),
  ]);

  // Index each grouped result by its key; rows with a null key are dropped.
  const notesByInterest = new Map<string, number>(
    notesGrouped
      .filter((r): r is { interestId: string; value: number } => r.interestId !== null)
      .map((r): [string, number] => [r.interestId, r.value]),
  );
  const remindersByInterest = new Map<string, number>(
    remindersGrouped
      .filter((r): r is { interestId: string; value: number } => r.interestId !== null)
      .map((r): [string, number] => [r.interestId, r.value]),
  );
  const emailsByClient = new Map<string, number>(
    emailsGrouped
      .filter((r): r is { clientId: string; value: number } => r.clientId !== null)
      .map((r): [string, number] => [r.clientId, r.value]),
  );
  const berthLinksByInterest = new Map<string, number>(
    berthLinksGrouped
      .filter((r): r is { interestId: string; value: number } => r.interestId !== null)
      .map((r): [string, number] => [r.interestId, r.value]),
  );

  // One timestamp for the whole batch; the ISO form is what goes into redis.
  const calculatedAt = new Date();
  const calculatedAtIso = calculatedAt.toISOString();

  const scored = allInterests.map((interest) => {
    const score = buildScore(
      interest,
      {
        notes: notesByInterest.get(interest.id) ?? 0,
        reminders: remindersByInterest.get(interest.id) ?? 0,
        // Email threads are matched by client, so an interest without a
        // client has no email engagement.
        emails: interest.clientId ? (emailsByClient.get(interest.clientId) ?? 0) : 0,
        berthLinks: berthLinksByInterest.get(interest.id) ?? 0,
      },
      calculatedAt,
    );
    return { interestId: interest.id, score };
  });

  // Refresh the redis cache for each interest in a single pipeline so
  // single-interest reads downstream short-circuit the per-row queries.
  // Fire-and-forget - bulk scoring's correctness doesn't depend on the
  // cache write succeeding.
  void redis
    .pipeline(
      scored.map(({ interestId, score }) => [
        'setex',
        SCORE_KEY(portId, interestId),
        SCORE_TTL,
        JSON.stringify({ ...score, calculatedAt: calculatedAtIso }),
      ]),
    )
    .exec()
    .catch((err: unknown) => logger.warn({ err, portId }, 'Redis bulk cache write failed'));

  return scored;
}