/** * Phase B analytics service. Reads pre-computed snapshots from * `analytics_snapshots`; recomputes on demand if older than `SNAPSHOT_TTL_MS`. * The recurring `analytics-refresh` BullMQ job (PR3) warms the table * every 15 minutes per port × per metric. */ import { and, eq, gte, isNull, sql } from 'drizzle-orm'; import { db } from '@/lib/db'; import { analyticsSnapshots } from '@/lib/db/schema/insights'; import { interests } from '@/lib/db/schema/interests'; import { invoices } from '@/lib/db/schema/financial'; import { berthReservations } from '@/lib/db/schema/reservations'; import { PIPELINE_STAGES } from '@/lib/constants'; export type DateRange = '7d' | '30d' | '90d' | 'today'; export type MetricBase = | 'pipeline_funnel' | 'occupancy_timeline' | 'revenue_breakdown' | 'lead_source_attribution'; export type MetricId = `${MetricBase}.${DateRange}`; export const ALL_RANGES: readonly DateRange[] = ['today', '7d', '30d', '90d'] as const; export const ALL_METRICS: readonly MetricBase[] = [ 'pipeline_funnel', 'occupancy_timeline', 'revenue_breakdown', 'lead_source_attribution', ] as const; export const SNAPSHOT_TTL_MS = 15 * 60 * 1000; // 15 minutes // ─── Output shapes ──────────────────────────────────────────────────────────── export interface PipelineFunnelData { stages: Array<{ stage: string; count: number; conversionPct: number }>; /** Counts of terminal lost/cancelled outcomes in the range. Surfaces below * the funnel so users can see leakage without it polluting the conversion * math. Total = sum of these counts. */ lost: { count: number; byOutcome: Record }; } export interface OccupancyTimelineData { points: Array<{ date: string; occupied: number; total: number; occupancyPct: number }>; } export interface RevenueBreakdownData { bars: Array<{ status: string; amount: number; currency: string }>; } export interface LeadSourceAttributionData { slices: Array<{ source: string; count: number }>; } export type SnapshotData = | PipelineFunnelData | OccupancyTimelineData | RevenueBreakdownData | LeadSourceAttributionData; // ─── Cache layer ────────────────────────────────────────────────────────────── export async function readSnapshot( portId: string, metricId: MetricId, ): Promise { const row = await db.query.analyticsSnapshots.findFirst({ where: and(eq(analyticsSnapshots.portId, portId), eq(analyticsSnapshots.metricId, metricId)), }); if (!row) return null; const age = Date.now() - row.computedAt.getTime(); if (age > SNAPSHOT_TTL_MS) return null; return row.data as T; } export async function writeSnapshot( portId: string, metricId: MetricId, data: SnapshotData, ): Promise { await db .insert(analyticsSnapshots) .values({ portId, metricId, data }) .onConflictDoUpdate({ target: [analyticsSnapshots.portId, analyticsSnapshots.metricId], set: { data, computedAt: new Date() }, }); } // ─── Range helpers ──────────────────────────────────────────────────────────── function rangeToCutoff(range: DateRange): Date { const now = Date.now(); switch (range) { case 'today': return new Date(now - 1 * 86_400_000); case '7d': return new Date(now - 7 * 86_400_000); case '30d': return new Date(now - 30 * 86_400_000); case '90d': return new Date(now - 90 * 86_400_000); } } function rangeToDays(range: DateRange): number { switch (range) { case 'today': return 1; case '7d': return 7; case '30d': return 30; case '90d': return 90; } } // ─── Computations ───────────────────────────────────────────────────────────── export async function computePipelineFunnel( portId: string, range: DateRange, ): Promise { const cutoff = rangeToCutoff(range); // Stage counts EXCLUDE lost/cancelled outcomes — those never become // conversions, so polluting the funnel with them gives meaningless math. // Lost is reported separately in the `lost` block. const stageRows = await db .select({ stage: interests.pipelineStage, count: sql`count(*)::int` }) .from(interests) .where( and( eq(interests.portId, portId), isNull(interests.archivedAt), gte(interests.createdAt, cutoff), sql`(${interests.outcome} IS NULL OR ${interests.outcome} = 'won')`, ), ) .groupBy(interests.pipelineStage); const counts = new Map(stageRows.map((r) => [r.stage, r.count])); const top = counts.get('open') ?? 0; const stages = PIPELINE_STAGES.map((stage) => { const count = counts.get(stage) ?? 0; const conversionPct = top === 0 ? 0 : Math.round((count / top) * 1000) / 10; return { stage, count, conversionPct }; }); // Lost / cancelled summary. Same date-range filter as the funnel. const lostRows = await db .select({ outcome: interests.outcome, count: sql`count(*)::int` }) .from(interests) .where( and( eq(interests.portId, portId), isNull(interests.archivedAt), gte(interests.createdAt, cutoff), sql`${interests.outcome} IS NOT NULL AND ${interests.outcome} != 'won'`, ), ) .groupBy(interests.outcome); const byOutcome: Record = {}; let lostTotal = 0; for (const row of lostRows) { if (!row.outcome) continue; byOutcome[row.outcome] = row.count; lostTotal += row.count; } return { stages, lost: { count: lostTotal, byOutcome } }; } export async function computeOccupancyTimeline( portId: string, range: DateRange, ): Promise { const days = rangeToDays(range); // Total berths per port (current count — assumes no churn). const totalRow = await db.execute<{ total: number }>( sql`SELECT count(*)::int AS total FROM berths WHERE port_id = ${portId}`, ); const total = totalRow[0]?.total ?? 0; // For each day in range, count berths that have an active reservation // covering that day. A reservation is "covering" if start_date <= day // AND (end_date IS NULL OR end_date >= day). const points: OccupancyTimelineData['points'] = []; for (let i = days - 1; i >= 0; i--) { const day = new Date(Date.now() - i * 86_400_000); const dayStr = day.toISOString().slice(0, 10); const occRow = await db .select({ occupied: sql`count(distinct ${berthReservations.berthId})::int` }) .from(berthReservations) .where( and( eq(berthReservations.portId, portId), eq(berthReservations.status, 'active'), sql`${berthReservations.startDate} <= ${dayStr}::date`, sql`(${berthReservations.endDate} IS NULL OR ${berthReservations.endDate} >= ${dayStr}::date)`, ), ); const occupied = occRow[0]?.occupied ?? 0; const occupancyPct = total === 0 ? 0 : Math.round((occupied / total) * 1000) / 10; points.push({ date: dayStr, occupied, total, occupancyPct }); } return { points }; } export async function computeRevenueBreakdown( portId: string, range: DateRange, ): Promise { const cutoff = rangeToCutoff(range); const rows = await db .select({ status: invoices.status, currency: invoices.currency, amount: sql`coalesce(sum(${invoices.total}), 0)::text`, }) .from(invoices) .where( and( eq(invoices.portId, portId), isNull(invoices.archivedAt), gte(invoices.createdAt, cutoff), ), ) .groupBy(invoices.status, invoices.currency); return { bars: rows.map((r) => ({ status: r.status, currency: r.currency, amount: Number(r.amount), })), }; } export async function computeLeadSourceAttribution( portId: string, range: DateRange, ): Promise { const cutoff = rangeToCutoff(range); const rows = await db .select({ source: interests.source, count: sql`count(*)::int` }) .from(interests) .where( and( eq(interests.portId, portId), isNull(interests.archivedAt), gte(interests.createdAt, cutoff), ), ) .groupBy(interests.source); return { slices: rows .map((r) => ({ source: r.source ?? 'unspecified', count: r.count, })) .sort((a, b) => b.count - a.count), }; } // ─── Public read API (cache → compute → write back) ────────────────────────── export async function getPipelineFunnel( portId: string, range: DateRange, ): Promise { const metricId = `pipeline_funnel.${range}` as const; const cached = await readSnapshot(portId, metricId); if (cached) return cached; const fresh = await computePipelineFunnel(portId, range); await writeSnapshot(portId, metricId, fresh); return fresh; } export async function getOccupancyTimeline( portId: string, range: DateRange, ): Promise { const metricId = `occupancy_timeline.${range}` as const; const cached = await readSnapshot(portId, metricId); if (cached) return cached; const fresh = await computeOccupancyTimeline(portId, range); await writeSnapshot(portId, metricId, fresh); return fresh; } export async function getRevenueBreakdown( portId: string, range: DateRange, ): Promise { const metricId = `revenue_breakdown.${range}` as const; const cached = await readSnapshot(portId, metricId); if (cached) return cached; const fresh = await computeRevenueBreakdown(portId, range); await writeSnapshot(portId, metricId, fresh); return fresh; } export async function getLeadSourceAttribution( portId: string, range: DateRange, ): Promise { const metricId = `lead_source_attribution.${range}` as const; const cached = await readSnapshot(portId, metricId); if (cached) return cached; const fresh = await computeLeadSourceAttribution(portId, range); await writeSnapshot(portId, metricId, fresh); return fresh; } // ─── Cron entrypoint: warm every (port × metric × range) ──────────────────── export async function refreshSnapshotsForPort(portId: string): Promise { for (const range of ALL_RANGES) { const [funnel, occupancy, revenue, leadSource] = await Promise.all([ computePipelineFunnel(portId, range), computeOccupancyTimeline(portId, range), computeRevenueBreakdown(portId, range), computeLeadSourceAttribution(portId, range), ]); await Promise.all([ writeSnapshot(portId, `pipeline_funnel.${range}`, funnel), writeSnapshot(portId, `occupancy_timeline.${range}`, occupancy), writeSnapshot(portId, `revenue_breakdown.${range}`, revenue), writeSnapshot(portId, `lead_source_attribution.${range}`, leadSource), ]); } }