Files
pn-new-crm/tests/integration/analytics-service.test.ts
Matt Ciaccio 01b201e1a2 feat(analytics): real computations + 15-min snapshot refresh job
PR3 of Phase B. Replaces the no-op stubs in analytics.service.ts with
working drizzle queries and adds the recurring BullMQ job that warms
the cache.

Computations:
- computePipelineFunnel: groups interests by pipeline_stage filtered by
  port + range + not archived; emits 8-row stages array with conversion
  pct relative to 'open' as the funnel top.
- computeOccupancyTimeline: per day in range, counts berths covered by
  an active reservation (start_date ≤ day, end_date IS NULL OR ≥ day);
  emits {date, occupied, total, occupancyPct}.
- computeRevenueBreakdown: sums invoices.total grouped by status +
  currency; filters out archived rows.
- computeLeadSourceAttribution: counts interests by source descending;
  null source bucketed as 'unspecified'.

Public API (getPipelineFunnel, getOccupancyTimeline, etc.) reads
analytics_snapshots first; falls back to compute + writeSnapshot. TTL
15 minutes (matches the cron interval).

Cron:
- queue/scheduler.ts registers 'analytics-refresh' on maintenance with
  pattern '*/15 * * * *'.
- queue/workers/maintenance.ts dispatches to refreshSnapshotsForPort
  for every port; per-port try/catch so one bad port doesn't kill the
  sweep.

Tests: tests/integration/analytics-service.test.ts (9 cases). Pipeline
funnel math (incl. zero state), occupancy timeline shape/percentages
with seeded reservations, revenue grouped by status + currency, lead
source attribution incl. null bucketing, cache hit (mutate snapshot
directly → next read returns mutated value), refreshSnapshotsForPort
warms every metric×range combo.

Vitest 690/690 (+9). tsc + lint clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-28 14:54:46 +02:00

209 lines
7.5 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
/**
* Analytics service integration tests — exercise the four computations
* against a seeded port + assert the cache layer reads/writes correctly.
*/
import { describe, it, expect } from 'vitest';
import { eq, and } from 'drizzle-orm';
import { db } from '@/lib/db';
import { interests } from '@/lib/db/schema/interests';
import { invoices } from '@/lib/db/schema/financial';
import { berthReservations } from '@/lib/db/schema/reservations';
import { analyticsSnapshots } from '@/lib/db/schema/insights';
import {
computePipelineFunnel,
computeOccupancyTimeline,
computeRevenueBreakdown,
computeLeadSourceAttribution,
getPipelineFunnel,
refreshSnapshotsForPort,
ALL_METRICS,
ALL_RANGES,
SNAPSHOT_TTL_MS,
} from '@/lib/services/analytics.service';
import { makePort, makeClient, makeBerth, makeYacht } from '../helpers/factories';
describe('analytics service', () => {
describe('computePipelineFunnel', () => {
it('aggregates interests by stage with conversion percentages', async () => {
const port = await makePort();
const client = await makeClient({ portId: port.id });
// 3 open, 2 details_sent, 1 visited
for (const stage of ['open', 'open', 'open', 'details_sent', 'details_sent', 'visited']) {
await db.insert(interests).values({
portId: port.id,
clientId: client.id,
pipelineStage: stage,
});
}
const result = await computePipelineFunnel(port.id, '30d');
const open = result.stages.find((s) => s.stage === 'open');
const details = result.stages.find((s) => s.stage === 'details_sent');
const visited = result.stages.find((s) => s.stage === 'visited');
expect(open?.count).toBe(3);
expect(open?.conversionPct).toBe(100);
expect(details?.count).toBe(2);
expect(details?.conversionPct).toBeCloseTo(66.7, 0);
expect(visited?.count).toBe(1);
expect(visited?.conversionPct).toBeCloseTo(33.3, 0);
});
it('returns zeros when port has no interests', async () => {
const port = await makePort();
const result = await computePipelineFunnel(port.id, '30d');
expect(result.stages).toHaveLength(8);
expect(result.stages.every((s) => s.count === 0)).toBe(true);
});
});
describe('computeOccupancyTimeline', () => {
it('returns 7 points for 7d range with occupancy percentages', async () => {
const port = await makePort();
await makeBerth({ portId: port.id });
await makeBerth({ portId: port.id });
const client = await makeClient({ portId: port.id });
const yacht = await makeYacht({
portId: port.id,
ownerType: 'client',
ownerId: client.id,
});
const berth = await makeBerth({ portId: port.id });
// Active reservation covering today
await db.insert(berthReservations).values({
portId: port.id,
berthId: berth.id,
clientId: client.id,
yachtId: yacht.id,
status: 'active',
startDate: new Date(Date.now() - 5 * 86_400_000),
createdBy: 'seed',
});
const result = await computeOccupancyTimeline(port.id, '7d');
expect(result.points).toHaveLength(7);
// Last point is today; should reflect 1/3 occupancy.
const today = result.points[result.points.length - 1]!;
expect(today.total).toBe(3);
expect(today.occupied).toBe(1);
expect(today.occupancyPct).toBeCloseTo(33.3, 0);
});
});
describe('computeRevenueBreakdown', () => {
it('groups invoice totals by status and currency', async () => {
const port = await makePort();
const baseInvoice = {
portId: port.id,
clientName: 'Acme',
billingEntityType: 'client' as const,
billingEntityId: 'client-id',
dueDate: '2026-12-31',
currency: 'USD',
subtotal: '0',
createdBy: 'seed',
};
await db.insert(invoices).values([
{ ...baseInvoice, invoiceNumber: 'INV-001', total: '1000', status: 'paid' },
{ ...baseInvoice, invoiceNumber: 'INV-002', total: '500', status: 'paid' },
{ ...baseInvoice, invoiceNumber: 'INV-003', total: '2000', status: 'sent' },
]);
const result = await computeRevenueBreakdown(port.id, '30d');
const paid = result.bars.find((b) => b.status === 'paid');
const sent = result.bars.find((b) => b.status === 'sent');
expect(paid?.amount).toBe(1500);
expect(sent?.amount).toBe(2000);
});
});
describe('computeLeadSourceAttribution', () => {
it('counts interests grouped by source descending', async () => {
const port = await makePort();
const client = await makeClient({ portId: port.id });
for (const source of ['website', 'website', 'website', 'manual', 'referral', 'referral']) {
await db.insert(interests).values({
portId: port.id,
clientId: client.id,
pipelineStage: 'open',
source,
});
}
const result = await computeLeadSourceAttribution(port.id, '30d');
expect(result.slices[0]).toEqual({ source: 'website', count: 3 });
expect(result.slices[1]).toEqual({ source: 'referral', count: 2 });
expect(result.slices[2]).toEqual({ source: 'manual', count: 1 });
});
it('groups null source as "unspecified"', async () => {
const port = await makePort();
const client = await makeClient({ portId: port.id });
await db.insert(interests).values({
portId: port.id,
clientId: client.id,
pipelineStage: 'open',
source: null,
});
const result = await computeLeadSourceAttribution(port.id, '30d');
expect(result.slices.find((s) => s.source === 'unspecified')?.count).toBe(1);
});
});
describe('cache', () => {
it('getPipelineFunnel writes a snapshot and returns it on subsequent calls', async () => {
const port = await makePort();
const client = await makeClient({ portId: port.id });
await db.insert(interests).values({
portId: port.id,
clientId: client.id,
pipelineStage: 'open',
});
const first = await getPipelineFunnel(port.id, '30d');
// Snapshot written.
const row = await db.query.analyticsSnapshots.findFirst({
where: and(
eq(analyticsSnapshots.portId, port.id),
eq(analyticsSnapshots.metricId, 'pipeline_funnel.30d'),
),
});
expect(row).toBeDefined();
expect(row?.data).toEqual(first);
// Mutate the snapshot row directly to confirm cache is being read,
// not recomputed.
const sentinel = { stages: [{ stage: 'sentinel', count: 999, conversionPct: 0 }] };
await db
.update(analyticsSnapshots)
.set({ data: sentinel })
.where(
and(
eq(analyticsSnapshots.portId, port.id),
eq(analyticsSnapshots.metricId, 'pipeline_funnel.30d'),
),
);
const second = await getPipelineFunnel(port.id, '30d');
expect(second).toEqual(sentinel);
});
it('refreshSnapshotsForPort warms every metric × range combo', async () => {
const port = await makePort();
await refreshSnapshotsForPort(port.id);
const rows = await db
.select({ metricId: analyticsSnapshots.metricId })
.from(analyticsSnapshots)
.where(eq(analyticsSnapshots.portId, port.id));
const expected = ALL_METRICS.length * ALL_RANGES.length;
expect(rows).toHaveLength(expected);
});
it('snapshot ttl constant is 15 minutes', () => {
expect(SNAPSHOT_TTL_MS).toBe(15 * 60 * 1000);
});
});
});