Files
pn-new-crm/src/lib/services/analytics.service.ts
Matt Ciaccio 01b201e1a2 feat(analytics): real computations + 15-min snapshot refresh job
PR3 of Phase B. Replaces the no-op stubs in analytics.service.ts with
working drizzle queries and adds the recurring BullMQ job that warms
the cache.

Computations:
- computePipelineFunnel: groups interests by pipeline_stage filtered by
  port + range + not archived; emits 8-row stages array with conversion
  pct relative to 'open' as the funnel top.
- computeOccupancyTimeline: per day in range, counts berths covered by
  an active reservation (start_date ≤ day, end_date IS NULL OR ≥ day);
  emits {date, occupied, total, occupancyPct}.
- computeRevenueBreakdown: sums invoices.total grouped by status +
  currency; filters out archived rows.
- computeLeadSourceAttribution: counts interests by source descending;
  null source bucketed as 'unspecified'.

Public API (getPipelineFunnel, getOccupancyTimeline, etc.) reads
analytics_snapshots first; falls back to compute + writeSnapshot. TTL
15 minutes (matches the cron interval).

Cron:
- queue/scheduler.ts registers 'analytics-refresh' on maintenance with
  pattern '*/15 * * * *'.
- queue/workers/maintenance.ts dispatches to refreshSnapshotsForPort
  for every port; per-port try/catch so one bad port doesn't kill the
  sweep.

Tests: tests/integration/analytics-service.test.ts (9 cases). Pipeline
funnel math (incl. zero state), occupancy timeline shape/percentages
with seeded reservations, revenue grouped by status + currency, lead
source attribution incl. null bucketing, cache hit (mutate snapshot
directly → next read returns mutated value), refreshSnapshotsForPort
warms every metric×range combo.

Vitest 690/690 (+9). tsc + lint clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-28 14:54:46 +02:00

321 lines
10 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
/**
* Phase B analytics service. Reads pre-computed snapshots from
* `analytics_snapshots`; recomputes on demand if older than `SNAPSHOT_TTL_MS`.
* The recurring `analytics-refresh` BullMQ job (PR3) warms the table
* every 15 minutes per port × per metric.
*/
import { and, eq, gte, isNull, sql } from 'drizzle-orm';
import { db } from '@/lib/db';
import { analyticsSnapshots } from '@/lib/db/schema/insights';
import { interests } from '@/lib/db/schema/interests';
import { invoices } from '@/lib/db/schema/financial';
import { berthReservations } from '@/lib/db/schema/reservations';
/** Trailing reporting windows accepted by every computation in this module. */
export type DateRange = '7d' | '30d' | '90d' | 'today';
/** Base metric names; combined with a DateRange to form a snapshot MetricId. */
export type MetricBase =
| 'pipeline_funnel'
| 'occupancy_timeline'
| 'revenue_breakdown'
| 'lead_source_attribution';
/** Snapshot key, e.g. 'pipeline_funnel.30d' — one row per (port, MetricId). */
export type MetricId = `${MetricBase}.${DateRange}`;
/** Every range the cron warms, in the order refreshSnapshotsForPort sweeps them. */
export const ALL_RANGES: readonly DateRange[] = ['today', '7d', '30d', '90d'] as const;
/** Every metric the cron warms; must stay in sync with the compute* functions. */
export const ALL_METRICS: readonly MetricBase[] = [
'pipeline_funnel',
'occupancy_timeline',
'revenue_breakdown',
'lead_source_attribution',
] as const;
// Snapshots older than this are treated as a cache miss; matches the
// '*/15 * * * *' cron interval so a healthy cron keeps reads warm.
export const SNAPSHOT_TTL_MS = 15 * 60 * 1000; // 15 minutes
// ─── Output shapes ────────────────────────────────────────────────────────────
/** Funnel rows in PIPELINE_STAGES order; conversionPct is relative to 'open'. */
export interface PipelineFunnelData {
stages: Array<{ stage: string; count: number; conversionPct: number }>;
}
/** One point per day in the range, oldest first; date is a UTC YYYY-MM-DD. */
export interface OccupancyTimelineData {
points: Array<{ date: string; occupied: number; total: number; occupancyPct: number }>;
}
/** One bar per (invoice status, currency) pair seen in the range. */
export interface RevenueBreakdownData {
bars: Array<{ status: string; amount: number; currency: string }>;
}
/** Interest counts per source, descending; null sources bucketed as 'unspecified'. */
export interface LeadSourceAttributionData {
slices: Array<{ source: string; count: number }>;
}
/** Union stored in analytics_snapshots.data (JSON column — assumed; verify schema). */
export type SnapshotData =
| PipelineFunnelData
| OccupancyTimelineData
| RevenueBreakdownData
| LeadSourceAttributionData;
// ─── Cache layer ──────────────────────────────────────────────────────────────
/**
 * Look up the cached snapshot for (port, metric).
 *
 * @returns the stored payload when a row exists and is younger than
 *   `SNAPSHOT_TTL_MS`; `null` on a miss or when the row has expired.
 *   Expired rows are left in place — `writeSnapshot` overwrites them.
 */
export async function readSnapshot<T extends SnapshotData>(
  portId: string,
  metricId: MetricId,
): Promise<T | null> {
  const snapshot = await db.query.analyticsSnapshots.findFirst({
    where: and(
      eq(analyticsSnapshots.portId, portId),
      eq(analyticsSnapshots.metricId, metricId),
    ),
  });
  if (!snapshot) return null;
  const isFresh = Date.now() - snapshot.computedAt.getTime() <= SNAPSHOT_TTL_MS;
  // The cast is unchecked: callers must pass the metricId matching T.
  return isFresh ? (snapshot.data as T) : null;
}
/**
 * Upsert the snapshot row for (port, metric), stamping `computedAt` to now
 * on conflict so the TTL clock restarts. Relies on a unique constraint over
 * (portId, metricId) — assumed from the onConflict target; verify in schema.
 */
export async function writeSnapshot(
  portId: string,
  metricId: MetricId,
  data: SnapshotData,
): Promise<void> {
  const conflictKey = [analyticsSnapshots.portId, analyticsSnapshots.metricId];
  await db
    .insert(analyticsSnapshots)
    .values({ portId, metricId, data })
    .onConflictDoUpdate({
      target: conflictKey,
      set: { data, computedAt: new Date() },
    });
}
// ─── Range helpers ────────────────────────────────────────────────────────────
/**
 * Inclusive lower bound of a reporting range.
 *
 * Fix: 'today' previously returned `now - 24h` — a rolling window that
 * silently pulled in yesterday's records. It now returns the start of the
 * current UTC calendar day, which matches the UTC day labels produced via
 * `toISOString()` in the occupancy timeline. (NOTE(review): if createdAt
 * timestamps are stored in a non-UTC zone, confirm midnight-UTC is the
 * intended day boundary.) The Nd ranges remain rolling windows ending now.
 */
function rangeToCutoff(range: DateRange): Date {
  switch (range) {
    case 'today': {
      const startOfUtcDay = new Date();
      startOfUtcDay.setUTCHours(0, 0, 0, 0);
      return startOfUtcDay;
    }
    case '7d':
      return new Date(Date.now() - 7 * 86_400_000);
    case '30d':
      return new Date(Date.now() - 30 * 86_400_000);
    case '90d':
      return new Date(Date.now() - 90 * 86_400_000);
  }
}
/** Number of daily points a range spans (used by the occupancy timeline). */
function rangeToDays(range: DateRange): number {
  const daysByRange = { today: 1, '7d': 7, '30d': 30, '90d': 90 } as const;
  return daysByRange[range];
}
// ─── Computations ─────────────────────────────────────────────────────────────
// Funnel stages in display order, top ('open') to bottom ('completed').
// This order is load-bearing: computePipelineFunnel emits one row per entry
// in this exact sequence, and conversionPct is measured against 'open'.
const PIPELINE_STAGES = [
'open',
'details_sent',
'in_communication',
'visited',
'signed_eoi_nda',
'deposit_10pct',
'contract',
'completed',
] as const;
/**
 * Count non-archived interests per pipeline stage for a port within the
 * range, then express every stage as a percentage of the 'open' count
 * (the funnel top), rounded to one decimal place.
 *
 * Stages absent from the result set report count 0; when the top itself is
 * zero, every conversionPct is 0 to avoid division by zero. Note the pcts
 * are per-stage snapshot ratios, not cumulative — a downstream stage can
 * exceed 100% of 'open'.
 */
export async function computePipelineFunnel(
  portId: string,
  range: DateRange,
): Promise<PipelineFunnelData> {
  const since = rangeToCutoff(range);
  const grouped = await db
    .select({ stage: interests.pipelineStage, count: sql<number>`count(*)::int` })
    .from(interests)
    .where(
      and(
        eq(interests.portId, portId),
        isNull(interests.archivedAt),
        gte(interests.createdAt, since),
      ),
    )
    .groupBy(interests.pipelineStage);
  const byStage = new Map<string, number>();
  for (const row of grouped) byStage.set(row.stage, row.count);
  const funnelTop = byStage.get('open') ?? 0;
  const stages = PIPELINE_STAGES.map((stage) => {
    const count = byStage.get(stage) ?? 0;
    return {
      stage,
      count,
      conversionPct: funnelTop === 0 ? 0 : Math.round((count / funnelTop) * 1000) / 10,
    };
  });
  return { stages };
}
/**
 * Daily berth occupancy for the trailing `range` window, oldest day first.
 *
 * For each UTC day, counts distinct berths with an 'active' reservation
 * covering it: start_date <= day AND (end_date IS NULL OR end_date >= day).
 * Emits {date, occupied, total, occupancyPct} with pct rounded to one
 * decimal; total is the port's CURRENT berth count (assumes no churn over
 * the range).
 *
 * Fix: the per-day counts were issued as up-to-90 sequential awaited
 * queries (classic N+1). They are independent, so they now run
 * concurrently via Promise.all — the pool queues them if it is smaller
 * than the batch.
 */
export async function computeOccupancyTimeline(
  portId: string,
  range: DateRange,
): Promise<OccupancyTimelineData> {
  const days = rangeToDays(range);
  // Total berths per port (current count — assumes no churn).
  const totalRow = await db.execute<{ total: number }>(
    sql`SELECT count(*)::int AS total FROM berths WHERE port_id = ${portId}`,
  );
  const total = totalRow[0]?.total ?? 0;
  // One UTC YYYY-MM-DD label per day in range, oldest first.
  const dayStrs = Array.from({ length: days }, (_, idx) =>
    new Date(Date.now() - (days - 1 - idx) * 86_400_000).toISOString().slice(0, 10),
  );
  const occupiedCounts = await Promise.all(
    dayStrs.map(async (dayStr) => {
      const rows = await db
        .select({ occupied: sql<number>`count(distinct ${berthReservations.berthId})::int` })
        .from(berthReservations)
        .where(
          and(
            eq(berthReservations.portId, portId),
            eq(berthReservations.status, 'active'),
            sql`${berthReservations.startDate} <= ${dayStr}::date`,
            sql`(${berthReservations.endDate} IS NULL OR ${berthReservations.endDate} >= ${dayStr}::date)`,
          ),
        );
      return rows[0]?.occupied ?? 0;
    }),
  );
  const points = dayStrs.map((date, idx) => {
    const occupied = occupiedCounts[idx] ?? 0;
    return {
      date,
      occupied,
      total,
      occupancyPct: total === 0 ? 0 : Math.round((occupied / total) * 1000) / 10,
    };
  });
  return { points };
}
/**
 * Sum invoice totals per (status, currency) for non-archived invoices
 * created within the range.
 *
 * The SQL sum is cast to text and converted with Number() here —
 * presumably to sidestep driver-specific numeric coercion. NOTE(review):
 * Number() loses precision above 2^53; confirm this is acceptable for the
 * magnitudes invoices.total can reach.
 */
export async function computeRevenueBreakdown(
  portId: string,
  range: DateRange,
): Promise<RevenueBreakdownData> {
  const since = rangeToCutoff(range);
  const grouped = await db
    .select({
      status: invoices.status,
      currency: invoices.currency,
      amount: sql<string>`coalesce(sum(${invoices.total}), 0)::text`,
    })
    .from(invoices)
    .where(
      and(
        eq(invoices.portId, portId),
        isNull(invoices.archivedAt),
        gte(invoices.createdAt, since),
      ),
    )
    .groupBy(invoices.status, invoices.currency);
  const bars = grouped.map(({ status, currency, amount }) => ({
    status,
    currency,
    amount: Number(amount),
  }));
  return { bars };
}
/**
 * Count non-archived interests per lead source within the range, most
 * common first. Rows with a NULL source are reported under 'unspecified'.
 */
export async function computeLeadSourceAttribution(
  portId: string,
  range: DateRange,
): Promise<LeadSourceAttributionData> {
  const since = rangeToCutoff(range);
  const grouped = await db
    .select({ source: interests.source, count: sql<number>`count(*)::int` })
    .from(interests)
    .where(
      and(
        eq(interests.portId, portId),
        isNull(interests.archivedAt),
        gte(interests.createdAt, since),
      ),
    )
    .groupBy(interests.source);
  const slices = grouped.map(({ source, count }) => ({
    source: source ?? 'unspecified',
    count,
  }));
  slices.sort((a, b) => b.count - a.count);
  return { slices };
}
// ─── Public read API (cache → compute → write back) ──────────────────────────
export async function getPipelineFunnel(
portId: string,
range: DateRange,
): Promise<PipelineFunnelData> {
const metricId = `pipeline_funnel.${range}` as const;
const cached = await readSnapshot<PipelineFunnelData>(portId, metricId);
if (cached) return cached;
const fresh = await computePipelineFunnel(portId, range);
await writeSnapshot(portId, metricId, fresh);
return fresh;
}
export async function getOccupancyTimeline(
portId: string,
range: DateRange,
): Promise<OccupancyTimelineData> {
const metricId = `occupancy_timeline.${range}` as const;
const cached = await readSnapshot<OccupancyTimelineData>(portId, metricId);
if (cached) return cached;
const fresh = await computeOccupancyTimeline(portId, range);
await writeSnapshot(portId, metricId, fresh);
return fresh;
}
export async function getRevenueBreakdown(
portId: string,
range: DateRange,
): Promise<RevenueBreakdownData> {
const metricId = `revenue_breakdown.${range}` as const;
const cached = await readSnapshot<RevenueBreakdownData>(portId, metricId);
if (cached) return cached;
const fresh = await computeRevenueBreakdown(portId, range);
await writeSnapshot(portId, metricId, fresh);
return fresh;
}
export async function getLeadSourceAttribution(
portId: string,
range: DateRange,
): Promise<LeadSourceAttributionData> {
const metricId = `lead_source_attribution.${range}` as const;
const cached = await readSnapshot<LeadSourceAttributionData>(portId, metricId);
if (cached) return cached;
const fresh = await computeLeadSourceAttribution(portId, range);
await writeSnapshot(portId, metricId, fresh);
return fresh;
}
// ─── Cron entrypoint: warm every (port × metric × range) ────────────────────
export async function refreshSnapshotsForPort(portId: string): Promise<void> {
for (const range of ALL_RANGES) {
const [funnel, occupancy, revenue, leadSource] = await Promise.all([
computePipelineFunnel(portId, range),
computeOccupancyTimeline(portId, range),
computeRevenueBreakdown(portId, range),
computeLeadSourceAttribution(portId, range),
]);
await Promise.all([
writeSnapshot(portId, `pipeline_funnel.${range}`, funnel),
writeSnapshot(portId, `occupancy_timeline.${range}`, occupancy),
writeSnapshot(portId, `revenue_breakdown.${range}`, revenue),
writeSnapshot(portId, `lead_source_attribution.${range}`, leadSource),
]);
}
}