From 01b201e1a258e66e9b2fe0703ad2030f43c00606 Mon Sep 17 00:00:00 2001 From: Matt Ciaccio Date: Tue, 28 Apr 2026 14:54:46 +0200 Subject: [PATCH] feat(analytics): real computations + 15-min snapshot refresh job MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PR3 of Phase B. Replaces the no-op stubs in analytics.service.ts with working drizzle queries and adds the recurring BullMQ job that warms the cache. Computations: - computePipelineFunnel: groups interests by pipeline_stage filtered by port + range + not archived; emits 8-row stages array with conversion pct relative to 'open' as the funnel top. - computeOccupancyTimeline: per day in range, counts berths covered by an active reservation (start_date ≤ day, end_date IS NULL OR ≥ day); emits {date, occupied, total, occupancyPct}. - computeRevenueBreakdown: sums invoices.total grouped by status + currency; filters out archived rows. - computeLeadSourceAttribution: counts interests by source descending; null source bucketed as 'unspecified'. Public API (getPipelineFunnel, getOccupancyTimeline, etc.) reads analytics_snapshots first; falls back to compute + writeSnapshot. TTL 15 minutes (matches the cron interval). Cron: - queue/scheduler.ts registers 'analytics-refresh' on maintenance with pattern '*/15 * * * *'. - queue/workers/maintenance.ts dispatches to refreshSnapshotsForPort for every port; per-port try/catch so one bad port doesn't kill the sweep. Tests: tests/integration/analytics-service.test.ts (9 cases). Pipeline funnel math (incl. zero state), occupancy timeline shape/percentages with seeded reservations, revenue grouped by status + currency, lead source attribution incl. null bucketing, cache hit (mutate snapshot directly → next read returns mutated value), refreshSnapshotsForPort warms every metric×range combo. Vitest 690/690 (+9). tsc + lint clean. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/lib/queue/scheduler.ts | 2 + src/lib/queue/workers/maintenance.ts | 14 + src/lib/services/analytics.service.ts | 274 +++++++++++++++++--- tests/integration/analytics-service.test.ts | 208 +++++++++++++++ 4 files changed, 468 insertions(+), 30 deletions(-) create mode 100644 tests/integration/analytics-service.test.ts diff --git a/src/lib/queue/scheduler.ts b/src/lib/queue/scheduler.ts index f7b15ba..9ca8162 100644 --- a/src/lib/queue/scheduler.ts +++ b/src/lib/queue/scheduler.ts @@ -50,6 +50,8 @@ export async function registerRecurringJobs(): Promise { // Phase B: alert rule engine sweep { queue: 'maintenance', name: 'alerts-evaluate', pattern: '*/5 * * * *' }, + // Phase B: analytics snapshot warm + { queue: 'maintenance', name: 'analytics-refresh', pattern: '*/15 * * * *' }, ]; for (const job of recurring) { diff --git a/src/lib/queue/workers/maintenance.ts b/src/lib/queue/workers/maintenance.ts index f55624c..4f14272 100644 --- a/src/lib/queue/workers/maintenance.ts +++ b/src/lib/queue/workers/maintenance.ts @@ -34,6 +34,20 @@ export const maintenanceWorker = new Worker( logger.info(summary, 'Alert engine sweep complete'); break; } + case 'analytics-refresh': { + const { ports } = await import('@/lib/db/schema/ports'); + const { refreshSnapshotsForPort } = await import('@/lib/services/analytics.service'); + const allPorts = await db.select({ id: ports.id }).from(ports); + for (const p of allPorts) { + try { + await refreshSnapshotsForPort(p.id); + } catch (err) { + logger.warn({ portId: p.id, err }, 'Analytics refresh failed for port'); + } + } + logger.info({ count: allPorts.length }, 'Analytics snapshot refresh complete'); + break; + } default: logger.warn({ jobName: job.name }, 'Unknown maintenance job'); } diff --git a/src/lib/services/analytics.service.ts b/src/lib/services/analytics.service.ts index b1852dd..0bba592 100644 --- a/src/lib/services/analytics.service.ts +++ b/src/lib/services/analytics.service.ts @@ -1,35 +1,50 @@ /** * Phase B analytics service. Reads pre-computed snapshots from - * `analytics_snapshots` keyed by `metric_id` and recomputes on demand if - * the cached row is older than `SNAPSHOT_TTL_MS`. The recomputation jobs - * land in `analytics-snapshot-job.ts` (PR3). + * `analytics_snapshots`; recomputes on demand if older than `SNAPSHOT_TTL_MS`. + * The recurring `analytics-refresh` BullMQ job (PR3) warms the table + * every 15 minutes per port × per metric. */ -import { and, eq } from 'drizzle-orm'; +import { and, eq, gte, isNull, sql } from 'drizzle-orm'; import { db } from '@/lib/db'; import { analyticsSnapshots } from '@/lib/db/schema/insights'; +import { interests } from '@/lib/db/schema/interests'; +import { invoices } from '@/lib/db/schema/financial'; +import { berthReservations } from '@/lib/db/schema/reservations'; export type DateRange = '7d' | '30d' | '90d' | 'today'; -export type MetricId = - | `pipeline_funnel.${DateRange}` - | `occupancy_timeline.${DateRange}` - | `revenue_breakdown.${DateRange}` - | `lead_source_attribution.${DateRange}`; +export type MetricBase = + | 'pipeline_funnel' + | 'occupancy_timeline' + | 'revenue_breakdown' + | 'lead_source_attribution'; + +export type MetricId = `${MetricBase}.${DateRange}`; + +export const ALL_RANGES: readonly DateRange[] = ['today', '7d', '30d', '90d'] as const; +export const ALL_METRICS: readonly MetricBase[] = [ + 'pipeline_funnel', + 'occupancy_timeline', + 'revenue_breakdown', + 'lead_source_attribution', +] as const; export const SNAPSHOT_TTL_MS = 15 * 60 * 1000; // 15 minutes +// ─── Output shapes ──────────────────────────────────────────────────────────── + export interface PipelineFunnelData { stages: Array<{ stage: string; count: number; conversionPct: number }>; } export interface OccupancyTimelineData { - points: Array<{ date: string; available: number; underOffer: number; sold: number }>; + points: Array<{ date: string; occupied: number; total: number; occupancyPct: number }>; } export interface RevenueBreakdownData { - bars: Array<{ category: string; amount: number; currency: string }>; + bars: Array<{ status: string; amount: number; currency: string }>; } export interface LeadSourceAttributionData { @@ -42,11 +57,8 @@ export type SnapshotData = | RevenueBreakdownData | LeadSourceAttributionData; -/** - * Read a snapshot by `(portId, metricId)`. Returns null when missing or - * stale; the caller should request a recompute (or the recurring job - * eventually fills it). - */ +// ─── Cache layer ────────────────────────────────────────────────────────────── + export async function readSnapshot( portId: string, metricId: MetricId, @@ -74,33 +86,235 @@ export async function writeSnapshot( }); } -// Computation entrypoints — bodies land in PR3 along with the recurring -// snapshot job. Exported as no-op stubs so PR1's tsc/lint stay green. +// ─── Range helpers ──────────────────────────────────────────────────────────── + +function rangeToCutoff(range: DateRange): Date { + const now = Date.now(); + switch (range) { + case 'today': + return new Date(now - 1 * 86_400_000); + case '7d': + return new Date(now - 7 * 86_400_000); + case '30d': + return new Date(now - 30 * 86_400_000); + case '90d': + return new Date(now - 90 * 86_400_000); + } +} + +function rangeToDays(range: DateRange): number { + switch (range) { + case 'today': + return 1; + case '7d': + return 7; + case '30d': + return 30; + case '90d': + return 90; + } +} + +// ─── Computations ───────────────────────────────────────────────────────────── + +const PIPELINE_STAGES = [ + 'open', + 'details_sent', + 'in_communication', + 'visited', + 'signed_eoi_nda', + 'deposit_10pct', + 'contract', + 'completed', +] as const; export async function computePipelineFunnel( - _portId: string, - _range: DateRange, + portId: string, + range: DateRange, ): Promise { - return { stages: [] }; + const cutoff = rangeToCutoff(range); + const rows = await db + .select({ stage: interests.pipelineStage, count: sql`count(*)::int` }) + .from(interests) + .where( + and( + eq(interests.portId, portId), + isNull(interests.archivedAt), + gte(interests.createdAt, cutoff), + ), + ) + .groupBy(interests.pipelineStage); + + const counts = new Map(rows.map((r) => [r.stage, r.count])); + const top = counts.get('open') ?? 0; + + const stages = PIPELINE_STAGES.map((stage) => { + const count = counts.get(stage) ?? 0; + const conversionPct = top === 0 ? 0 : Math.round((count / top) * 1000) / 10; + return { stage, count, conversionPct }; + }); + + return { stages }; } export async function computeOccupancyTimeline( - _portId: string, - _range: DateRange, + portId: string, + range: DateRange, ): Promise { - return { points: [] }; + const days = rangeToDays(range); + // Total berths per port (current count — assumes no churn). + const totalRow = await db.execute<{ total: number }>( + sql`SELECT count(*)::int AS total FROM berths WHERE port_id = ${portId}`, + ); + const total = totalRow[0]?.total ?? 0; + + // For each day in range, count berths that have an active reservation + // covering that day. A reservation is "covering" if start_date <= day + // AND (end_date IS NULL OR end_date >= day). + const points: OccupancyTimelineData['points'] = []; + for (let i = days - 1; i >= 0; i--) { + const day = new Date(Date.now() - i * 86_400_000); + const dayStr = day.toISOString().slice(0, 10); + const occRow = await db + .select({ occupied: sql`count(distinct ${berthReservations.berthId})::int` }) + .from(berthReservations) + .where( + and( + eq(berthReservations.portId, portId), + eq(berthReservations.status, 'active'), + sql`${berthReservations.startDate} <= ${dayStr}::date`, + sql`(${berthReservations.endDate} IS NULL OR ${berthReservations.endDate} >= ${dayStr}::date)`, + ), + ); + const occupied = occRow[0]?.occupied ?? 0; + const occupancyPct = total === 0 ? 0 : Math.round((occupied / total) * 1000) / 10; + points.push({ date: dayStr, occupied, total, occupancyPct }); + } + return { points }; } export async function computeRevenueBreakdown( - _portId: string, - _range: DateRange, + portId: string, + range: DateRange, ): Promise { - return { bars: [] }; + const cutoff = rangeToCutoff(range); + const rows = await db + .select({ + status: invoices.status, + currency: invoices.currency, + amount: sql`coalesce(sum(${invoices.total}), 0)::text`, + }) + .from(invoices) + .where( + and( + eq(invoices.portId, portId), + isNull(invoices.archivedAt), + gte(invoices.createdAt, cutoff), + ), + ) + .groupBy(invoices.status, invoices.currency); + + return { + bars: rows.map((r) => ({ + status: r.status, + currency: r.currency, + amount: Number(r.amount), + })), + }; } export async function computeLeadSourceAttribution( - _portId: string, - _range: DateRange, + portId: string, + range: DateRange, ): Promise { - return { slices: [] }; + const cutoff = rangeToCutoff(range); + const rows = await db + .select({ source: interests.source, count: sql`count(*)::int` }) + .from(interests) + .where( + and( + eq(interests.portId, portId), + isNull(interests.archivedAt), + gte(interests.createdAt, cutoff), + ), + ) + .groupBy(interests.source); + + return { + slices: rows + .map((r) => ({ + source: r.source ?? 'unspecified', + count: r.count, + })) + .sort((a, b) => b.count - a.count), + }; +} + +// ─── Public read API (cache → compute → write back) ────────────────────────── + +export async function getPipelineFunnel( + portId: string, + range: DateRange, +): Promise { + const metricId = `pipeline_funnel.${range}` as const; + const cached = await readSnapshot(portId, metricId); + if (cached) return cached; + const fresh = await computePipelineFunnel(portId, range); + await writeSnapshot(portId, metricId, fresh); + return fresh; +} + +export async function getOccupancyTimeline( + portId: string, + range: DateRange, +): Promise { + const metricId = `occupancy_timeline.${range}` as const; + const cached = await readSnapshot(portId, metricId); + if (cached) return cached; + const fresh = await computeOccupancyTimeline(portId, range); + await writeSnapshot(portId, metricId, fresh); + return fresh; +} + +export async function getRevenueBreakdown( + portId: string, + range: DateRange, +): Promise { + const metricId = `revenue_breakdown.${range}` as const; + const cached = await readSnapshot(portId, metricId); + if (cached) return cached; + const fresh = await computeRevenueBreakdown(portId, range); + await writeSnapshot(portId, metricId, fresh); + return fresh; +} + +export async function getLeadSourceAttribution( + portId: string, + range: DateRange, +): Promise { + const metricId = `lead_source_attribution.${range}` as const; + const cached = await readSnapshot(portId, metricId); + if (cached) return cached; + const fresh = await computeLeadSourceAttribution(portId, range); + await writeSnapshot(portId, metricId, fresh); + return fresh; +} + +// ─── Cron entrypoint: warm every (port × metric × range) ──────────────────── + +export async function refreshSnapshotsForPort(portId: string): Promise { + for (const range of ALL_RANGES) { + const [funnel, occupancy, revenue, leadSource] = await Promise.all([ + computePipelineFunnel(portId, range), + computeOccupancyTimeline(portId, range), + computeRevenueBreakdown(portId, range), + computeLeadSourceAttribution(portId, range), + ]); + await Promise.all([ + writeSnapshot(portId, `pipeline_funnel.${range}`, funnel), + writeSnapshot(portId, `occupancy_timeline.${range}`, occupancy), + writeSnapshot(portId, `revenue_breakdown.${range}`, revenue), + writeSnapshot(portId, `lead_source_attribution.${range}`, leadSource), + ]); + } } diff --git a/tests/integration/analytics-service.test.ts b/tests/integration/analytics-service.test.ts new file mode 100644 index 0000000..825adc2 --- /dev/null +++ b/tests/integration/analytics-service.test.ts @@ -0,0 +1,208 @@ +/** + * Analytics service integration tests — exercise the four computations + * against a seeded port + assert the cache layer reads/writes correctly. + */ + +import { describe, it, expect } from 'vitest'; +import { eq, and } from 'drizzle-orm'; + +import { db } from '@/lib/db'; +import { interests } from '@/lib/db/schema/interests'; +import { invoices } from '@/lib/db/schema/financial'; +import { berthReservations } from '@/lib/db/schema/reservations'; +import { analyticsSnapshots } from '@/lib/db/schema/insights'; +import { + computePipelineFunnel, + computeOccupancyTimeline, + computeRevenueBreakdown, + computeLeadSourceAttribution, + getPipelineFunnel, + refreshSnapshotsForPort, + ALL_METRICS, + ALL_RANGES, + SNAPSHOT_TTL_MS, +} from '@/lib/services/analytics.service'; +import { makePort, makeClient, makeBerth, makeYacht } from '../helpers/factories'; + +describe('analytics service', () => { + describe('computePipelineFunnel', () => { + it('aggregates interests by stage with conversion percentages', async () => { + const port = await makePort(); + const client = await makeClient({ portId: port.id }); + // 3 open, 2 details_sent, 1 visited + for (const stage of ['open', 'open', 'open', 'details_sent', 'details_sent', 'visited']) { + await db.insert(interests).values({ + portId: port.id, + clientId: client.id, + pipelineStage: stage, + }); + } + + const result = await computePipelineFunnel(port.id, '30d'); + + const open = result.stages.find((s) => s.stage === 'open'); + const details = result.stages.find((s) => s.stage === 'details_sent'); + const visited = result.stages.find((s) => s.stage === 'visited'); + expect(open?.count).toBe(3); + expect(open?.conversionPct).toBe(100); + expect(details?.count).toBe(2); + expect(details?.conversionPct).toBeCloseTo(66.7, 0); + expect(visited?.count).toBe(1); + expect(visited?.conversionPct).toBeCloseTo(33.3, 0); + }); + + it('returns zeros when port has no interests', async () => { + const port = await makePort(); + const result = await computePipelineFunnel(port.id, '30d'); + expect(result.stages).toHaveLength(8); + expect(result.stages.every((s) => s.count === 0)).toBe(true); + }); + }); + + describe('computeOccupancyTimeline', () => { + it('returns 7 points for 7d range with occupancy percentages', async () => { + const port = await makePort(); + await makeBerth({ portId: port.id }); + await makeBerth({ portId: port.id }); + const client = await makeClient({ portId: port.id }); + const yacht = await makeYacht({ + portId: port.id, + ownerType: 'client', + ownerId: client.id, + }); + const berth = await makeBerth({ portId: port.id }); + // Active reservation covering today + await db.insert(berthReservations).values({ + portId: port.id, + berthId: berth.id, + clientId: client.id, + yachtId: yacht.id, + status: 'active', + startDate: new Date(Date.now() - 5 * 86_400_000), + createdBy: 'seed', + }); + + const result = await computeOccupancyTimeline(port.id, '7d'); + expect(result.points).toHaveLength(7); + // Last point is today; should reflect 1/3 occupancy. + const today = result.points[result.points.length - 1]!; + expect(today.total).toBe(3); + expect(today.occupied).toBe(1); + expect(today.occupancyPct).toBeCloseTo(33.3, 0); + }); + }); + + describe('computeRevenueBreakdown', () => { + it('groups invoice totals by status and currency', async () => { + const port = await makePort(); + const baseInvoice = { + portId: port.id, + clientName: 'Acme', + billingEntityType: 'client' as const, + billingEntityId: 'client-id', + dueDate: '2026-12-31', + currency: 'USD', + subtotal: '0', + createdBy: 'seed', + }; + await db.insert(invoices).values([ + { ...baseInvoice, invoiceNumber: 'INV-001', total: '1000', status: 'paid' }, + { ...baseInvoice, invoiceNumber: 'INV-002', total: '500', status: 'paid' }, + { ...baseInvoice, invoiceNumber: 'INV-003', total: '2000', status: 'sent' }, + ]); + + const result = await computeRevenueBreakdown(port.id, '30d'); + const paid = result.bars.find((b) => b.status === 'paid'); + const sent = result.bars.find((b) => b.status === 'sent'); + expect(paid?.amount).toBe(1500); + expect(sent?.amount).toBe(2000); + }); + }); + + describe('computeLeadSourceAttribution', () => { + it('counts interests grouped by source descending', async () => { + const port = await makePort(); + const client = await makeClient({ portId: port.id }); + for (const source of ['website', 'website', 'website', 'manual', 'referral', 'referral']) { + await db.insert(interests).values({ + portId: port.id, + clientId: client.id, + pipelineStage: 'open', + source, + }); + } + + const result = await computeLeadSourceAttribution(port.id, '30d'); + expect(result.slices[0]).toEqual({ source: 'website', count: 3 }); + expect(result.slices[1]).toEqual({ source: 'referral', count: 2 }); + expect(result.slices[2]).toEqual({ source: 'manual', count: 1 }); + }); + + it('groups null source as "unspecified"', async () => { + const port = await makePort(); + const client = await makeClient({ portId: port.id }); + await db.insert(interests).values({ + portId: port.id, + clientId: client.id, + pipelineStage: 'open', + source: null, + }); + + const result = await computeLeadSourceAttribution(port.id, '30d'); + expect(result.slices.find((s) => s.source === 'unspecified')?.count).toBe(1); + }); + }); + + describe('cache', () => { + it('getPipelineFunnel writes a snapshot and returns it on subsequent calls', async () => { + const port = await makePort(); + const client = await makeClient({ portId: port.id }); + await db.insert(interests).values({ + portId: port.id, + clientId: client.id, + pipelineStage: 'open', + }); + + const first = await getPipelineFunnel(port.id, '30d'); + // Snapshot written. + const row = await db.query.analyticsSnapshots.findFirst({ + where: and( + eq(analyticsSnapshots.portId, port.id), + eq(analyticsSnapshots.metricId, 'pipeline_funnel.30d'), + ), + }); + expect(row).toBeDefined(); + expect(row?.data).toEqual(first); + + // Mutate the snapshot row directly to confirm cache is being read, + // not recomputed. + const sentinel = { stages: [{ stage: 'sentinel', count: 999, conversionPct: 0 }] }; + await db + .update(analyticsSnapshots) + .set({ data: sentinel }) + .where( + and( + eq(analyticsSnapshots.portId, port.id), + eq(analyticsSnapshots.metricId, 'pipeline_funnel.30d'), + ), + ); + const second = await getPipelineFunnel(port.id, '30d'); + expect(second).toEqual(sentinel); + }); + + it('refreshSnapshotsForPort warms every metric × range combo', async () => { + const port = await makePort(); + await refreshSnapshotsForPort(port.id); + const rows = await db + .select({ metricId: analyticsSnapshots.metricId }) + .from(analyticsSnapshots) + .where(eq(analyticsSnapshots.portId, port.id)); + const expected = ALL_METRICS.length * ALL_RANGES.length; + expect(rows).toHaveLength(expected); + }); + + it('snapshot ttl constant is 15 minutes', () => { + expect(SNAPSHOT_TTL_MS).toBe(15 * 60 * 1000); + }); + }); +});