/** * Phase B analytics service. Reads pre-computed snapshots from * `analytics_snapshots`; recomputes on demand if older than `SNAPSHOT_TTL_MS`. * The recurring `analytics-refresh` BullMQ job (PR3) warms the table * every 15 minutes per port × per metric. */ import { and, between, eq, isNull, sql } from 'drizzle-orm'; import { db } from '@/lib/db'; import { analyticsSnapshots } from '@/lib/db/schema/insights'; import { interests } from '@/lib/db/schema/interests'; import { PIPELINE_STAGES } from '@/lib/constants'; import { ALL_RANGES, isCustomRange, rangeToBounds, type CustomDateRange, type DateRange, type PresetDateRange, } from '@/lib/analytics/range'; // Re-export the shared types for callers that already import from this // module - keeps the existing public API intact. export { ALL_RANGES, isCustomRange, rangeToBounds }; export type { DateRange, PresetDateRange, CustomDateRange }; export type MetricBase = 'pipeline_funnel' | 'occupancy_timeline' | 'lead_source_attribution'; /** * Snapshot key. Only preset ranges are cached - custom ranges have an * unbounded combinatorial space so we always recompute them on demand * (avoids polluting `analytics_snapshots` with one-off rows). */ export type MetricId = `${MetricBase}.${PresetDateRange}`; export const ALL_METRICS: readonly MetricBase[] = [ 'pipeline_funnel', 'occupancy_timeline', 'lead_source_attribution', ] as const; export const SNAPSHOT_TTL_MS = 15 * 60 * 1000; // 15 minutes // ─── Output shapes ──────────────────────────────────────────────────────────── export interface PipelineFunnelData { stages: Array<{ stage: string; count: number; conversionPct: number }>; /** Counts of terminal lost/cancelled outcomes in the range. Surfaces below * the funnel so users can see leakage without it polluting the conversion * math. Total = sum of these counts. */ lost: { count: number; byOutcome: Record }; } export interface OccupancyTimelineData { points: Array<{ date: string; occupied: number; total: number; occupancyPct: number }>; } export interface LeadSourceAttributionData { slices: Array<{ source: string; count: number }>; } export type SnapshotData = PipelineFunnelData | OccupancyTimelineData | LeadSourceAttributionData; // ─── Cache layer ────────────────────────────────────────────────────────────── export async function readSnapshot( portId: string, metricId: MetricId, ): Promise { const row = await db.query.analyticsSnapshots.findFirst({ where: and(eq(analyticsSnapshots.portId, portId), eq(analyticsSnapshots.metricId, metricId)), }); if (!row) return null; const age = Date.now() - row.computedAt.getTime(); if (age > SNAPSHOT_TTL_MS) return null; return row.data as T; } export async function writeSnapshot( portId: string, metricId: MetricId, data: SnapshotData, ): Promise { await db .insert(analyticsSnapshots) .values({ portId, metricId, data }) .onConflictDoUpdate({ target: [analyticsSnapshots.portId, analyticsSnapshots.metricId], set: { data, computedAt: new Date() }, }); } // Range helpers (rangeToBounds, rangeToDays, rangeSpanDays) moved to // @/lib/analytics/range - that file is client-safe (no DB imports) so it // can be used from React components AND this server module. // ─── Computations ───────────────────────────────────────────────────────────── export async function computePipelineFunnel( portId: string, range: DateRange, ): Promise { const { from, to } = rangeToBounds(range); // Stage counts EXCLUDE lost/cancelled outcomes - those never become // conversions, so polluting the funnel with them gives meaningless math. // Lost is reported separately in the `lost` block. const stageRows = await db .select({ stage: interests.pipelineStage, count: sql`count(*)::int` }) .from(interests) .where( and( eq(interests.portId, portId), isNull(interests.archivedAt), between(interests.createdAt, from, to), sql`(${interests.outcome} IS NULL OR ${interests.outcome} = 'won')`, ), ) .groupBy(interests.pipelineStage); const counts = new Map(stageRows.map((r) => [r.stage, r.count])); // First stage in the canonical order anchors the conversion percentage. const top = counts.get(PIPELINE_STAGES[0]) ?? 0; const stages = PIPELINE_STAGES.map((stage) => { const count = counts.get(stage) ?? 0; const conversionPct = top === 0 ? 0 : Math.round((count / top) * 1000) / 10; return { stage, count, conversionPct }; }); // Lost / cancelled summary. Same date-range filter as the funnel. const lostRows = await db .select({ outcome: interests.outcome, count: sql`count(*)::int` }) .from(interests) .where( and( eq(interests.portId, portId), isNull(interests.archivedAt), between(interests.createdAt, from, to), sql`${interests.outcome} IS NOT NULL AND ${interests.outcome} != 'won'`, ), ) .groupBy(interests.outcome); const byOutcome: Record = {}; let lostTotal = 0; for (const row of lostRows) { if (!row.outcome) continue; byOutcome[row.outcome] = row.count; lostTotal += row.count; } return { stages, lost: { count: lostTotal, byOutcome } }; } export async function computeOccupancyTimeline( portId: string, range: DateRange, ): Promise { const { from, to } = rangeToBounds(range); // Total berths per port (current count - assumes no churn). const totalRow = await db.execute<{ total: number }>( sql`SELECT count(*)::int AS total FROM berths WHERE port_id = ${portId}`, ); const total = totalRow[0]?.total ?? 0; // Occupancy = cumulative count of berths sold (i.e. won deals) on or // before each day. Per 2026-05-14 decision, the canonical occupancy // signal is "the deal closed and money changed hands" - reservations // are merely holds and don't count as occupied. Sources from // `interests.outcome='won'` + `outcome_at::date`; primary-berth link // via `interest_berths` so multi-berth deals contribute every linked // berth once. Single round-trip via generate_series cross-join with a // sold_berths CTE. const fromStr = from.toISOString().slice(0, 10); const toStr = new Date(to.getTime() - 86_400_000).toISOString().slice(0, 10); const rows = await db.execute<{ day: string; occupied: number }>( sql` WITH days AS ( SELECT generate_series(${fromStr}::date, ${toStr}::date, '1 day'::interval)::date AS day ), sold_berths AS ( SELECT DISTINCT ib.berth_id, (i.outcome_at AT TIME ZONE 'UTC')::date AS sold_on FROM interests i INNER JOIN interest_berths ib ON ib.interest_id = i.id WHERE i.port_id = ${portId} AND i.outcome = 'won' AND i.outcome_at IS NOT NULL AND i.archived_at IS NULL ) SELECT to_char(days.day, 'YYYY-MM-DD') AS day, COUNT(DISTINCT sb.berth_id)::int AS occupied FROM days LEFT JOIN sold_berths sb ON sb.sold_on <= days.day GROUP BY days.day ORDER BY days.day `, ); const points: OccupancyTimelineData['points'] = rows.map((r) => { const occupied = Number(r.occupied) || 0; const occupancyPct = total === 0 ? 0 : Math.round((occupied / total) * 1000) / 10; return { date: r.day, occupied, total, occupancyPct }; }); return { points }; } export async function computeLeadSourceAttribution( portId: string, range: DateRange, ): Promise { const { from, to } = rangeToBounds(range); const rows = await db .select({ source: interests.source, count: sql`count(*)::int` }) .from(interests) .where( and( eq(interests.portId, portId), isNull(interests.archivedAt), between(interests.createdAt, from, to), ), ) .groupBy(interests.source); return { slices: rows .map((r) => ({ source: r.source ?? 'unspecified', count: r.count, })) .sort((a, b) => b.count - a.count), }; } // ─── Public read API (cache → compute → write back) ────────────────────────── // // Custom ranges always recompute (cache key would be unbounded). Preset // ranges go cache → compute → write-back as before. export async function getPipelineFunnel( portId: string, range: DateRange, ): Promise { if (isCustomRange(range)) return computePipelineFunnel(portId, range); const metricId = `pipeline_funnel.${range}` as const; const cached = await readSnapshot(portId, metricId); if (cached) return cached; const fresh = await computePipelineFunnel(portId, range); await writeSnapshot(portId, metricId, fresh); return fresh; } export async function getOccupancyTimeline( portId: string, range: DateRange, ): Promise { if (isCustomRange(range)) return computeOccupancyTimeline(portId, range); const metricId = `occupancy_timeline.${range}` as const; const cached = await readSnapshot(portId, metricId); if (cached) return cached; const fresh = await computeOccupancyTimeline(portId, range); await writeSnapshot(portId, metricId, fresh); return fresh; } export async function getLeadSourceAttribution( portId: string, range: DateRange, ): Promise { if (isCustomRange(range)) return computeLeadSourceAttribution(portId, range); const metricId = `lead_source_attribution.${range}` as const; const cached = await readSnapshot(portId, metricId); if (cached) return cached; const fresh = await computeLeadSourceAttribution(portId, range); await writeSnapshot(portId, metricId, fresh); return fresh; } // ─── Cron entrypoint: warm every (port × metric × range) ──────────────────── export async function refreshSnapshotsForPort(portId: string): Promise { for (const range of ALL_RANGES) { const [funnel, occupancy, leadSource] = await Promise.all([ computePipelineFunnel(portId, range), computeOccupancyTimeline(portId, range), computeLeadSourceAttribution(portId, range), ]); await Promise.all([ writeSnapshot(portId, `pipeline_funnel.${range}`, funnel), writeSnapshot(portId, `occupancy_timeline.${range}`, occupancy), writeSnapshot(portId, `lead_source_attribution.${range}`, leadSource), ]); } }