feat(analytics): real computations + 15-min snapshot refresh job

PR3 of Phase B. Replaces the no-op stubs in analytics.service.ts with
working drizzle queries and adds the recurring BullMQ job that warms
the cache.

Computations:
- computePipelineFunnel: groups interests by pipeline_stage filtered by
  port + range + not archived; emits 8-row stages array with conversion
  pct relative to 'open' as the funnel top.
- computeOccupancyTimeline: per day in range, counts berths covered by
  an active reservation (start_date ≤ day, end_date IS NULL OR ≥ day);
  emits {date, occupied, total, occupancyPct}.
- computeRevenueBreakdown: sums invoices.total grouped by status +
  currency; filters out archived rows.
- computeLeadSourceAttribution: counts interests by source descending;
  null source bucketed as 'unspecified'.

Public API (getPipelineFunnel, getOccupancyTimeline, etc.) reads
analytics_snapshots first; falls back to compute + writeSnapshot. TTL
15 minutes (matches the cron interval).

Cron:
- queue/scheduler.ts registers 'analytics-refresh' on maintenance with
  pattern '*/15 * * * *'.
- queue/workers/maintenance.ts dispatches to refreshSnapshotsForPort
  for every port; per-port try/catch so one bad port doesn't kill the
  sweep.

Tests: tests/integration/analytics-service.test.ts (9 cases). Pipeline
funnel math (incl. zero state), occupancy timeline shape/percentages
with seeded reservations, revenue grouped by status + currency, lead
source attribution incl. null bucketing, cache hit (mutate snapshot
directly → next read returns mutated value), refreshSnapshotsForPort
warms every metric×range combo.

Vitest 690/690 (+9). tsc + lint clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Matt Ciaccio
2026-04-28 14:54:46 +02:00
parent 94f049c8b8
commit 01b201e1a2
4 changed files with 468 additions and 30 deletions

View File

@@ -1,35 +1,50 @@
/**
* Phase B analytics service. Reads pre-computed snapshots from
* `analytics_snapshots` keyed by `metric_id` and recomputes on demand if
* the cached row is older than `SNAPSHOT_TTL_MS`. The recomputation jobs
* land in `analytics-snapshot-job.ts` (PR3).
* `analytics_snapshots`; recomputes on demand if older than `SNAPSHOT_TTL_MS`.
* The recurring `analytics-refresh` BullMQ job (PR3) warms the table
* every 15 minutes per port × per metric.
*/
import { and, eq } from 'drizzle-orm';
import { and, eq, gte, isNull, sql } from 'drizzle-orm';
import { db } from '@/lib/db';
import { analyticsSnapshots } from '@/lib/db/schema/insights';
import { interests } from '@/lib/db/schema/interests';
import { invoices } from '@/lib/db/schema/financial';
import { berthReservations } from '@/lib/db/schema/reservations';
// Look-back windows a metric can be computed for.
export type DateRange = '7d' | '30d' | '90d' | 'today';
// NOTE(review): this listing appears to interleave removed and added diff lines without
// +/- markers, so `MetricId` is declared twice below. The explicit four-member union looks
// like the pre-change form and the `${MetricBase}.${DateRange}` template like its
// replacement; both describe the same set of strings. Confirm against the committed file.
export type MetricId =
| `pipeline_funnel.${DateRange}`
| `occupancy_timeline.${DateRange}`
| `revenue_breakdown.${DateRange}`
| `lead_source_attribution.${DateRange}`;
// Metric name without the range suffix.
export type MetricBase =
| 'pipeline_funnel'
| 'occupancy_timeline'
| 'revenue_breakdown'
| 'lead_source_attribution';
// Snapshot key: a metric base and a range joined by a dot, e.g. `pipeline_funnel.7d`.
export type MetricId = `${MetricBase}.${DateRange}`;
/** Every supported look-back window; iterated by the snapshot refresh to warm each range. */
export const ALL_RANGES: ReadonlyArray<DateRange> = ['today', '7d', '30d', '90d'] as const;

/** Base name of every metric, without the range suffix. */
export const ALL_METRICS: ReadonlyArray<MetricBase> = [
  'pipeline_funnel',
  'occupancy_timeline',
  'revenue_breakdown',
  'lead_source_attribution',
] as const;

/** Snapshot time-to-live in milliseconds: 15 minutes. */
export const SNAPSHOT_TTL_MS = 15 * 60_000;
// ─── Output shapes ────────────────────────────────────────────────────────────
/** Payload of the pipeline-funnel metric. */
export interface PipelineFunnelData {
  /** One row per funnel stage: the stage key, its count, and its conversion percentage. */
  stages: { stage: string; count: number; conversionPct: number }[];
}
// Payload of the occupancy-timeline metric: one point per calendar day.
export interface OccupancyTimelineData {
// NOTE(review): `points` appears twice because this listing interleaves removed and added
// diff lines. The first shape (available/underOffer/sold) looks like the pre-change one;
// the second (occupied/total/occupancyPct) matches what computeOccupancyTimeline pushes.
points: Array<{ date: string; available: number; underOffer: number; sold: number }>;
points: Array<{ date: string; occupied: number; total: number; occupancyPct: number }>;
}
// Payload of the revenue-breakdown metric: one bar per (status, currency) group.
export interface RevenueBreakdownData {
// NOTE(review): `bars` appears twice because this listing interleaves removed and added
// diff lines. The `category` shape looks like the pre-change one; the `status` shape matches
// what computeRevenueBreakdown returns.
bars: Array<{ category: string; amount: number; currency: string }>;
bars: Array<{ status: string; amount: number; currency: string }>;
}
export interface LeadSourceAttributionData {
@@ -42,11 +57,8 @@ export type SnapshotData =
| RevenueBreakdownData
| LeadSourceAttributionData;
/**
* Read a snapshot by `(portId, metricId)`. Returns null when missing or
* stale; the caller should request a recompute (or the recurring job
* eventually fills it).
*/
// ─── Cache layer ──────────────────────────────────────────────────────────────
export async function readSnapshot<T extends SnapshotData>(
portId: string,
metricId: MetricId,
@@ -74,33 +86,235 @@ export async function writeSnapshot(
});
}
// Computation entrypoints — bodies land in PR3 along with the recurring
// snapshot job. Exported as no-op stubs so PR1's tsc/lint stay green.
// ─── Range helpers ────────────────────────────────────────────────────────────
/**
 * Converts a range into the earliest timestamp it covers, measured back from the current
 * instant. `'today'` is a trailing 24-hour window, not "since midnight".
 *
 * @param range Look-back window to convert.
 * @returns The cutoff `Date`; rows at or after it fall inside the range.
 */
function rangeToCutoff(range: DateRange): Date {
  const MS_PER_DAY = 86_400_000;
  const lookbackDays: Record<DateRange, number> = {
    today: 1,
    '7d': 7,
    '30d': 30,
    '90d': 90,
  };
  return new Date(Date.now() - lookbackDays[range] * MS_PER_DAY);
}
/**
 * Number of calendar days a range spans; `'today'` counts as a single day.
 *
 * @param range Look-back window to convert.
 * @returns The day count for the range.
 */
function rangeToDays(range: DateRange): number {
  const dayCounts: Record<DateRange, number> = {
    today: 1,
    '7d': 7,
    '30d': 30,
    '90d': 90,
  };
  return dayCounts[range];
}
// ─── Computations ─────────────────────────────────────────────────────────────
// Pipeline stages in the order computePipelineFunnel emits them. The first entry, 'open',
// is the stage whose count serves as the denominator for every conversion percentage.
const PIPELINE_STAGES = [
'open',
'details_sent',
'in_communication',
'visited',
'signed_eoi_nda',
'deposit_10pct',
'contract',
'completed',
] as const;
/**
 * Builds the pipeline funnel for one port over a look-back window.
 *
 * Counts interests grouped by `pipelineStage`, restricted to rows for the given port that are
 * not archived (`archivedAt` is null) and were created at or after the range cutoff. Emits
 * one row per entry of `PIPELINE_STAGES`, in that order; stages without rows get a count of 0.
 *
 * NOTE(review): this listing appears to interleave removed and added diff lines without
 * +/- markers. The `_portId` / `_range` parameters and the early `return { stages: [] };`
 * look like the old no-op stub; confirm against the committed file.
 *
 * @param portId Port whose interests are counted.
 * @param range Look-back window applied to `createdAt`.
 * @returns The per-stage counts and conversion percentages.
 */
export async function computePipelineFunnel(
_portId: string,
_range: DateRange,
portId: string,
range: DateRange,
): Promise<PipelineFunnelData> {
return { stages: [] };
const cutoff = rangeToCutoff(range);
// One aggregate query: row count per pipeline stage, cast to int in SQL.
const rows = await db
.select({ stage: interests.pipelineStage, count: sql<number>`count(*)::int` })
.from(interests)
.where(
and(
eq(interests.portId, portId),
isNull(interests.archivedAt),
gte(interests.createdAt, cutoff),
),
)
.groupBy(interests.pipelineStage);
const counts = new Map(rows.map((r) => [r.stage, r.count]));
// Funnel top: the number of interests currently in the 'open' stage.
const top = counts.get('open') ?? 0;
const stages = PIPELINE_STAGES.map((stage) => {
const count = counts.get(stage) ?? 0;
// Percentage of the 'open' count, rounded to one decimal; 0 when there are no 'open' rows.
// NOTE(review): nothing clamps this, so a stage holding more rows than 'open' yields > 100.
const conversionPct = top === 0 ? 0 : Math.round((count / top) * 1000) / 10;
return { stage, count, conversionPct };
});
return { stages };
}
/**
 * Builds a per-day occupancy series for one port.
 *
 * Emits one point per day, oldest first and ending with the current day. Each point carries
 * the number of distinct berths with a covering 'active' reservation, the port's berth total,
 * and the resulting percentage rounded to one decimal (0 when the port has no berths).
 *
 * NOTE(review): this listing appears to interleave removed and added diff lines without
 * +/- markers. The `_portId` / `_range` parameters and the early `return { points: [] };`
 * look like the old no-op stub; confirm against the committed file.
 *
 * @param portId Port whose berths and reservations are counted.
 * @param range Look-back window; determines how many days are emitted.
 * @returns The ordered list of daily occupancy points.
 */
export async function computeOccupancyTimeline(
_portId: string,
_range: DateRange,
portId: string,
range: DateRange,
): Promise<OccupancyTimelineData> {
return { points: [] };
const days = rangeToDays(range);
// Total berths per port (current count — assumes no churn). The same total is reused for
// every day in the series.
const totalRow = await db.execute<{ total: number }>(
sql`SELECT count(*)::int AS total FROM berths WHERE port_id = ${portId}`,
);
// NOTE(review): indexing the result assumes `db.execute` resolves to an array of rows;
// confirm for the configured driver.
const total = totalRow[0]?.total ?? 0;
// For each day in range, count berths that have an active reservation
// covering that day. A reservation is "covering" if start_date <= day
// AND (end_date IS NULL OR end_date >= day).
// NOTE(review): the queries are awaited one at a time, so a 90d range issues 90 sequential
// queries; a single grouped query may be worth considering.
const points: OccupancyTimelineData['points'] = [];
for (let i = days - 1; i >= 0; i--) {
const day = new Date(Date.now() - i * 86_400_000);
// toISOString() is UTC, so day boundaries follow UTC rather than any port-local time zone.
const dayStr = day.toISOString().slice(0, 10);
// NOTE(review): the `status = 'active'` filter applies to past days as well, so a
// reservation that covered a past day but no longer has that status is not counted —
// confirm this is intended.
const occRow = await db
.select({ occupied: sql<number>`count(distinct ${berthReservations.berthId})::int` })
.from(berthReservations)
.where(
and(
eq(berthReservations.portId, portId),
eq(berthReservations.status, 'active'),
sql`${berthReservations.startDate} <= ${dayStr}::date`,
sql`(${berthReservations.endDate} IS NULL OR ${berthReservations.endDate} >= ${dayStr}::date)`,
),
);
const occupied = occRow[0]?.occupied ?? 0;
// One-decimal percentage; guarded against division by zero when the port has no berths.
const occupancyPct = total === 0 ? 0 : Math.round((occupied / total) * 1000) / 10;
points.push({ date: dayStr, occupied, total, occupancyPct });
}
return { points };
}
/**
 * Sums invoice totals for one port, grouped by invoice status and currency.
 *
 * Only invoices for the given port that are not archived (`archivedAt` is null) and were
 * created at or after the range cutoff are included. Amounts in different currencies stay in
 * separate bars; no currency conversion is performed here.
 *
 * NOTE(review): this listing appears to interleave removed and added diff lines without
 * +/- markers. The `_portId` / `_range` parameters and the early `return { bars: [] };`
 * look like the old no-op stub; confirm against the committed file.
 *
 * @param portId Port whose invoices are summed.
 * @param range Look-back window applied to `createdAt`.
 * @returns One bar per (status, currency) group.
 */
export async function computeRevenueBreakdown(
_portId: string,
_range: DateRange,
portId: string,
range: DateRange,
): Promise<RevenueBreakdownData> {
return { bars: [] };
const cutoff = rangeToCutoff(range);
const rows = await db
.select({
status: invoices.status,
currency: invoices.currency,
// The sum is cast to text in SQL and converted with Number() below.
// NOTE(review): presumably `total` is a numeric/decimal column; Number() may lose
// precision for very large sums — confirm that is acceptable.
amount: sql<string>`coalesce(sum(${invoices.total}), 0)::text`,
})
.from(invoices)
.where(
and(
eq(invoices.portId, portId),
isNull(invoices.archivedAt),
gte(invoices.createdAt, cutoff),
),
)
.groupBy(invoices.status, invoices.currency);
return {
bars: rows.map((r) => ({
status: r.status,
currency: r.currency,
amount: Number(r.amount),
})),
};
}
/**
 * Counts interests per lead source for one port, largest count first.
 *
 * Only interests for the given port that are not archived (`archivedAt` is null) and were
 * created at or after the range cutoff are counted. Rows with a null source are reported
 * under the label 'unspecified'.
 *
 * NOTE(review): this listing appears to interleave removed and added diff lines without
 * +/- markers. The `_portId` / `_range` parameters and the early `return { slices: [] };`
 * look like the old no-op stub; confirm against the committed file.
 *
 * @param portId Port whose interests are counted.
 * @param range Look-back window applied to `createdAt`.
 * @returns Slices sorted by descending count.
 */
export async function computeLeadSourceAttribution(
_portId: string,
_range: DateRange,
portId: string,
range: DateRange,
): Promise<LeadSourceAttributionData> {
return { slices: [] };
const cutoff = rangeToCutoff(range);
const rows = await db
.select({ source: interests.source, count: sql<number>`count(*)::int` })
.from(interests)
.where(
and(
eq(interests.portId, portId),
isNull(interests.archivedAt),
gte(interests.createdAt, cutoff),
),
)
.groupBy(interests.source);
return {
slices: rows
.map((r) => ({
// Null sources are labelled rather than dropped.
source: r.source ?? 'unspecified',
count: r.count,
}))
// Ordering is done in memory, on the freshly mapped array.
.sort((a, b) => b.count - a.count),
};
}
// ─── Public read API (cache → compute → write back) ──────────────────────────
/**
 * Cache-first read of the pipeline funnel for a port and range.
 *
 * Returns the snapshot when `readSnapshot` yields one; otherwise computes the metric, stores
 * it with `writeSnapshot`, and returns the fresh value.
 *
 * @param portId Port to read the metric for.
 * @param range Look-back window of the metric.
 * @returns The cached or freshly computed funnel.
 */
export async function getPipelineFunnel(
  portId: string,
  range: DateRange,
): Promise<PipelineFunnelData> {
  const snapshotKey = `pipeline_funnel.${range}` as const;
  const snapshot = await readSnapshot<PipelineFunnelData>(portId, snapshotKey);
  if (!snapshot) {
    // Cache miss: compute now and persist before returning.
    const computed = await computePipelineFunnel(portId, range);
    await writeSnapshot(portId, snapshotKey, computed);
    return computed;
  }
  return snapshot;
}
/**
 * Cache-first read of the occupancy timeline for a port and range.
 *
 * Returns the snapshot when `readSnapshot` yields one; otherwise computes the metric, stores
 * it with `writeSnapshot`, and returns the fresh value.
 *
 * @param portId Port to read the metric for.
 * @param range Look-back window of the metric.
 * @returns The cached or freshly computed timeline.
 */
export async function getOccupancyTimeline(
  portId: string,
  range: DateRange,
): Promise<OccupancyTimelineData> {
  const snapshotKey = `occupancy_timeline.${range}` as const;
  const snapshot = await readSnapshot<OccupancyTimelineData>(portId, snapshotKey);
  if (!snapshot) {
    // Cache miss: compute now and persist before returning.
    const computed = await computeOccupancyTimeline(portId, range);
    await writeSnapshot(portId, snapshotKey, computed);
    return computed;
  }
  return snapshot;
}
/**
 * Cache-first read of the revenue breakdown for a port and range.
 *
 * Returns the snapshot when `readSnapshot` yields one; otherwise computes the metric, stores
 * it with `writeSnapshot`, and returns the fresh value.
 *
 * @param portId Port to read the metric for.
 * @param range Look-back window of the metric.
 * @returns The cached or freshly computed breakdown.
 */
export async function getRevenueBreakdown(
  portId: string,
  range: DateRange,
): Promise<RevenueBreakdownData> {
  const snapshotKey = `revenue_breakdown.${range}` as const;
  const snapshot = await readSnapshot<RevenueBreakdownData>(portId, snapshotKey);
  if (!snapshot) {
    // Cache miss: compute now and persist before returning.
    const computed = await computeRevenueBreakdown(portId, range);
    await writeSnapshot(portId, snapshotKey, computed);
    return computed;
  }
  return snapshot;
}
/**
 * Cache-first read of the lead-source attribution for a port and range.
 *
 * Returns the snapshot when `readSnapshot` yields one; otherwise computes the metric, stores
 * it with `writeSnapshot`, and returns the fresh value.
 *
 * @param portId Port to read the metric for.
 * @param range Look-back window of the metric.
 * @returns The cached or freshly computed attribution.
 */
export async function getLeadSourceAttribution(
  portId: string,
  range: DateRange,
): Promise<LeadSourceAttributionData> {
  const snapshotKey = `lead_source_attribution.${range}` as const;
  const snapshot = await readSnapshot<LeadSourceAttributionData>(portId, snapshotKey);
  if (!snapshot) {
    // Cache miss: compute now and persist before returning.
    const computed = await computeLeadSourceAttribution(portId, range);
    await writeSnapshot(portId, snapshotKey, computed);
    return computed;
  }
  return snapshot;
}
// ─── Cron entrypoint: warm every (port × metric × range) ────────────────────
/**
 * Recomputes and stores every metric for every range of a single port.
 *
 * Ranges are handled one after another. Within a range the four computations run
 * concurrently, and only once all of them have resolved are the four snapshots written,
 * again concurrently. A rejection from any computation or write propagates to the caller.
 *
 * @param portId Port whose snapshots are refreshed.
 */
export async function refreshSnapshotsForPort(portId: string): Promise<void> {
  for (const span of ALL_RANGES) {
    // Start all four computations before awaiting any of them.
    const funnelJob = computePipelineFunnel(portId, span);
    const occupancyJob = computeOccupancyTimeline(portId, span);
    const revenueJob = computeRevenueBreakdown(portId, span);
    const leadSourceJob = computeLeadSourceAttribution(portId, span);
    const [funnelData, occupancyData, revenueData, leadSourceData] = await Promise.all([
      funnelJob,
      occupancyJob,
      revenueJob,
      leadSourceJob,
    ]);

    // Persist the results under their `<metric>.<range>` keys.
    await Promise.all([
      writeSnapshot(portId, `pipeline_funnel.${span}`, funnelData),
      writeSnapshot(portId, `occupancy_timeline.${span}`, occupancyData),
      writeSnapshot(portId, `revenue_breakdown.${span}`, revenueData),
      writeSnapshot(portId, `lead_source_attribution.${span}`, leadSourceData),
    ]);
  }
}