Files
pn-new-crm/src/lib/services/analytics.service.ts
Matt f9980900b1 perf(analytics): collapse 30-day occupancy timeline into single GROUP BY query
The dashboard's occupancy-timeline metric was firing N separate queries
(one per day, 30 for .30d / 90 for .90d) that saturated the postgres pool
and stalled every other request in the app. Replace with a single query
using generate_series for the date range + LEFT JOIN onto active
reservations + COUNT(DISTINCT berth_id) GROUP BY day.

Same data, ~30× fewer queries on .30d, ~90× fewer on .90d. The snapshot
cache layer still applies, so cached reads are still zero-DB.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-11 15:40:44 +02:00

348 lines
12 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
/**
* Phase B analytics service. Reads pre-computed snapshots from
* `analytics_snapshots`; recomputes on demand if older than `SNAPSHOT_TTL_MS`.
* The recurring `analytics-refresh` BullMQ job (PR3) warms the table
* every 15 minutes per port × per metric.
*/
import { and, between, eq, isNull, sql } from 'drizzle-orm';
import { db } from '@/lib/db';
import { analyticsSnapshots } from '@/lib/db/schema/insights';
import { interests } from '@/lib/db/schema/interests';
import { invoices } from '@/lib/db/schema/financial';
import { PIPELINE_STAGES } from '@/lib/constants';
import {
ALL_RANGES,
isCustomRange,
rangeToBounds,
type CustomDateRange,
type DateRange,
type PresetDateRange,
} from '@/lib/analytics/range';
// Re-export the shared types for callers that already import from this
// module - keeps the existing public API intact.
export { ALL_RANGES, isCustomRange, rangeToBounds };
export type { DateRange, PresetDateRange, CustomDateRange };
/**
 * Names of the metrics this service can compute. Each name is combined with
 * a preset date range to form a snapshot key (see `MetricId`).
 */
export type MetricBase =
| 'pipeline_funnel'
| 'occupancy_timeline'
| 'revenue_breakdown'
| 'lead_source_attribution';
/**
 * Snapshot key, shaped `<metric base>.<preset range>`. Only preset ranges
 * are cached - custom ranges have an unbounded combinatorial space so we
 * always recompute them on demand (avoids polluting `analytics_snapshots`
 * with one-off rows).
 */
export type MetricId = `${MetricBase}.${PresetDateRange}`;
/** Every `MetricBase` value, listed in the same order as the union. */
export const ALL_METRICS: readonly MetricBase[] = [
'pipeline_funnel',
'occupancy_timeline',
'revenue_breakdown',
'lead_source_attribution',
] as const;
/**
 * Maximum age, in milliseconds, of a snapshot row before `readSnapshot`
 * treats it as a cache miss.
 */
export const SNAPSHOT_TTL_MS = 15 * 60 * 1000; // 15 minutes
// ─── Output shapes ────────────────────────────────────────────────────────────
/** Payload of the `pipeline_funnel` metric. */
export interface PipelineFunnelData {
/** One entry per pipeline stage. `conversionPct` is the stage count as a
 * percentage of the 'open' stage count, rounded to one decimal place
 * (0 when there are no 'open' interests). */
stages: Array<{ stage: string; count: number; conversionPct: number }>;
/** Counts of terminal lost/cancelled outcomes in the range. Surfaces below
* the funnel so users can see leakage without it polluting the conversion
* math. Total = sum of these counts. */
lost: { count: number; byOutcome: Record<string, number> };
}
/** Payload of the `occupancy_timeline` metric. */
export interface OccupancyTimelineData {
/** One point per day. `date` is formatted 'YYYY-MM-DD'; `occupied` is the
 * number of distinct berths with an active reservation covering that day;
 * `total` is the port's berth count; `occupancyPct` is occupied / total as
 * a percentage rounded to one decimal place (0 when `total` is 0). */
points: Array<{ date: string; occupied: number; total: number; occupancyPct: number }>;
}
/** Payload of the `revenue_breakdown` metric. */
export interface RevenueBreakdownData {
/** One bar per (invoice status, currency) pair; `amount` is the summed
 * invoice total for that pair. */
bars: Array<{ status: string; amount: number; currency: string }>;
}
/** Payload of the `lead_source_attribution` metric. */
export interface LeadSourceAttributionData {
/** One slice per interest source, largest count first. Interests without a
 * source are reported under 'unspecified'. */
slices: Array<{ source: string; count: number }>;
}
/** Union of every payload shape that can be stored in a snapshot row. */
export type SnapshotData =
| PipelineFunnelData
| OccupancyTimelineData
| RevenueBreakdownData
| LeadSourceAttributionData;
// ─── Cache layer ──────────────────────────────────────────────────────────────
/**
 * Fetch the cached snapshot for one (port, metric) pair.
 *
 * @param portId   Port whose snapshot is wanted.
 * @param metricId Snapshot key (`<metric>.<preset range>`).
 * @returns The stored payload, or `null` when no row exists or the row is
 *          older than `SNAPSHOT_TTL_MS`.
 */
export async function readSnapshot<T extends SnapshotData>(
  portId: string,
  metricId: MetricId,
): Promise<T | null> {
  const snapshot = await db.query.analyticsSnapshots.findFirst({
    where: and(eq(analyticsSnapshots.portId, portId), eq(analyticsSnapshots.metricId, metricId)),
  });
  if (!snapshot) {
    return null;
  }

  // A row past its TTL is reported as a miss so the caller recomputes it.
  const ageMs = Date.now() - snapshot.computedAt.getTime();
  return ageMs > SNAPSHOT_TTL_MS ? null : (snapshot.data as T);
}
/**
 * Upsert the snapshot payload for one (port, metric) pair.
 *
 * The same application-side timestamp is written on both the insert path and
 * the conflict-update path. `readSnapshot` measures freshness against
 * `Date.now()`, so stamping first-time rows from the same clock keeps the
 * TTL check consistent and avoids relying on a column default for
 * `computedAt`.
 *
 * @param portId   Port the snapshot belongs to.
 * @param metricId Snapshot key (`<metric>.<preset range>`).
 * @param data     Payload to store.
 */
export async function writeSnapshot(
  portId: string,
  metricId: MetricId,
  data: SnapshotData,
): Promise<void> {
  const computedAt = new Date();
  await db
    .insert(analyticsSnapshots)
    .values({ portId, metricId, data, computedAt })
    .onConflictDoUpdate({
      target: [analyticsSnapshots.portId, analyticsSnapshots.metricId],
      set: { data, computedAt },
    });
}
// Range helpers (rangeToBounds, rangeToDays, rangeSpanDays) moved to
// @/lib/analytics/range - that file is client-safe (no DB imports) so it
// can be used from React components AND this server module.
// ─── Computations ─────────────────────────────────────────────────────────────
/**
 * Build the pipeline funnel for a port over `range`.
 *
 * Only non-archived interests created inside the range are considered.
 * Stage counts cover interests whose outcome is unset or 'won'; every other
 * outcome is tallied separately under `lost`, so leakage is visible without
 * distorting the conversion percentages.
 *
 * @param portId Port to report on.
 * @param range  Preset or custom date range, resolved via `rangeToBounds`.
 * @returns Per-stage counts with conversion percentages, plus the lost summary.
 */
export async function computePipelineFunnel(
  portId: string,
  range: DateRange,
): Promise<PipelineFunnelData> {
  const { from, to } = rangeToBounds(range);

  // Filters shared by both queries: this port, not archived, created in range.
  const scopeFilters = () => [
    eq(interests.portId, portId),
    isNull(interests.archivedAt),
    between(interests.createdAt, from, to),
  ];

  // Funnel proper: lost/cancelled interests never convert, so they are left
  // out here and reported in the `lost` block instead.
  const perStage = await db
    .select({ stage: interests.pipelineStage, count: sql<number>`count(*)::int` })
    .from(interests)
    .where(
      and(...scopeFilters(), sql`(${interests.outcome} IS NULL OR ${interests.outcome} = 'won')`),
    )
    .groupBy(interests.pipelineStage);

  const stageCounts = new Map(perStage.map((row) => [row.stage, row.count]));
  const openCount = stageCounts.get('open') ?? 0;
  // Percentage of the 'open' count, one decimal place; 0 when nothing is open.
  const toPct = (n: number): number =>
    openCount === 0 ? 0 : Math.round((n / openCount) * 1000) / 10;
  const stages = PIPELINE_STAGES.map((stage) => {
    const count = stageCounts.get(stage) ?? 0;
    return { stage, count, conversionPct: toPct(count) };
  });

  // Lost / cancelled summary over the same scope as the funnel.
  const perOutcome = await db
    .select({ outcome: interests.outcome, count: sql<number>`count(*)::int` })
    .from(interests)
    .where(
      and(...scopeFilters(), sql`${interests.outcome} IS NOT NULL AND ${interests.outcome} != 'won'`),
    )
    .groupBy(interests.outcome);

  const byOutcome: Record<string, number> = {};
  let lostCount = 0;
  perOutcome.forEach(({ outcome, count }) => {
    if (outcome) {
      byOutcome[outcome] = count;
      lostCount += count;
    }
  });

  return { stages, lost: { count: lostCount, byOutcome } };
}
/**
 * Build the per-day berth occupancy series for a port over `range`.
 *
 * Two queries are issued: one for the port's current berth count, and one
 * that returns the occupied-berth count for every day in the range.
 *
 * @param portId Port to report on.
 * @param range  Preset or custom date range, resolved via `rangeToBounds`.
 * @returns One point per day with occupied count, total and percentage.
 */
export async function computeOccupancyTimeline(
  portId: string,
  range: DateRange,
): Promise<OccupancyTimelineData> {
  const bounds = rangeToBounds(range);

  // Denominator: the port's berth count as of now, applied to every day in
  // the series (i.e. it assumes the berth count did not change).
  const berthCountRows = await db.execute<{ total: number }>(
    sql`SELECT count(*)::int AS total FROM berths WHERE port_id = ${portId}`,
  );
  const total = berthCountRows[0]?.total ?? 0;

  // The series runs from the lower bound's date through the date one day
  // before the upper bound (both taken from the ISO / UTC representation).
  const MS_PER_DAY = 86_400_000;
  const isoDay = (d: Date): string => d.toISOString().slice(0, 10);
  const fromStr = isoDay(bounds.from);
  const toStr = isoDay(new Date(bounds.to.getTime() - MS_PER_DAY));

  // One round trip for the whole series: generate_series yields each day,
  // and the LEFT JOIN attaches active reservations whose
  // [start_date, end_date] window covers it (open-ended when end_date is
  // NULL). Days with no reservation still appear, with a count of 0.
  const dailyRows = await db.execute<{ day: string; occupied: number }>(
    sql`
WITH days AS (
SELECT generate_series(${fromStr}::date, ${toStr}::date, '1 day'::interval)::date AS day
),
active_reservations AS (
SELECT berth_id, start_date, end_date
FROM berth_reservations
WHERE port_id = ${portId} AND status = 'active'
)
SELECT
to_char(days.day, 'YYYY-MM-DD') AS day,
COUNT(DISTINCT ar.berth_id)::int AS occupied
FROM days
LEFT JOIN active_reservations ar
ON ar.start_date <= days.day
AND (ar.end_date IS NULL OR ar.end_date >= days.day)
GROUP BY days.day
ORDER BY days.day
`,
  );

  const points: OccupancyTimelineData['points'] = dailyRows.map((row) => {
    const occupied = Number(row.occupied) || 0;
    return {
      date: row.day,
      occupied,
      total,
      // One decimal place; 0 when the port has no berths.
      occupancyPct: total === 0 ? 0 : Math.round((occupied / total) * 1000) / 10,
    };
  });

  return { points };
}
/**
 * Sum invoice totals per (status, currency) for a port over `range`.
 *
 * Only non-archived invoices created inside the range are included.
 *
 * @param portId Port to report on.
 * @param range  Preset or custom date range, resolved via `rangeToBounds`.
 * @returns One bar per status/currency pair.
 */
export async function computeRevenueBreakdown(
  portId: string,
  range: DateRange,
): Promise<RevenueBreakdownData> {
  const { from, to } = rangeToBounds(range);

  const inScope = and(
    eq(invoices.portId, portId),
    isNull(invoices.archivedAt),
    between(invoices.createdAt, from, to),
  );

  // The sum is selected as text and converted to a number below.
  const totals = await db
    .select({
      status: invoices.status,
      currency: invoices.currency,
      amount: sql<string>`coalesce(sum(${invoices.total}), 0)::text`,
    })
    .from(invoices)
    .where(inScope)
    .groupBy(invoices.status, invoices.currency);

  const bars = totals.map(({ status, currency, amount }) => ({
    status,
    currency,
    amount: Number(amount),
  }));
  return { bars };
}
/**
 * Count interests per lead source for a port over `range`.
 *
 * Only non-archived interests created inside the range are included.
 * Interests with no source are grouped under 'unspecified'.
 *
 * @param portId Port to report on.
 * @param range  Preset or custom date range, resolved via `rangeToBounds`.
 * @returns Slices ordered by descending count.
 */
export async function computeLeadSourceAttribution(
  portId: string,
  range: DateRange,
): Promise<LeadSourceAttributionData> {
  const { from, to } = rangeToBounds(range);

  const inScope = and(
    eq(interests.portId, portId),
    isNull(interests.archivedAt),
    between(interests.createdAt, from, to),
  );

  const perSource = await db
    .select({ source: interests.source, count: sql<number>`count(*)::int` })
    .from(interests)
    .where(inScope)
    .groupBy(interests.source);

  const slices = perSource.map(({ source, count }) => ({
    source: source ?? 'unspecified',
    count,
  }));
  // Largest slice first.
  slices.sort((a, b) => b.count - a.count);
  return { slices };
}
// ─── Public read API (cache → compute → write back) ──────────────────────────
//
// Custom ranges always recompute (cache key would be unbounded). Preset
// ranges go cache → compute → write-back as before.
/**
 * Read-through accessor for the pipeline funnel.
 *
 * Custom ranges are always computed on demand. Preset ranges are served
 * from the snapshot table when a fresh row exists; otherwise the metric is
 * computed and the result written back before returning.
 */
export async function getPipelineFunnel(
  portId: string,
  range: DateRange,
): Promise<PipelineFunnelData> {
  if (!isCustomRange(range)) {
    const snapshotKey = `pipeline_funnel.${range}` as const;
    const hit = await readSnapshot<PipelineFunnelData>(portId, snapshotKey);
    if (hit) {
      return hit;
    }
    const computed = await computePipelineFunnel(portId, range);
    await writeSnapshot(portId, snapshotKey, computed);
    return computed;
  }
  // Custom range: no cache key exists for it, so compute directly.
  return computePipelineFunnel(portId, range);
}
/**
 * Read-through accessor for the occupancy timeline.
 *
 * Custom ranges are always computed on demand. Preset ranges are served
 * from the snapshot table when a fresh row exists; otherwise the metric is
 * computed and the result written back before returning.
 */
export async function getOccupancyTimeline(
  portId: string,
  range: DateRange,
): Promise<OccupancyTimelineData> {
  if (!isCustomRange(range)) {
    const snapshotKey = `occupancy_timeline.${range}` as const;
    const hit = await readSnapshot<OccupancyTimelineData>(portId, snapshotKey);
    if (hit) {
      return hit;
    }
    const computed = await computeOccupancyTimeline(portId, range);
    await writeSnapshot(portId, snapshotKey, computed);
    return computed;
  }
  // Custom range: no cache key exists for it, so compute directly.
  return computeOccupancyTimeline(portId, range);
}
/**
 * Read-through accessor for the revenue breakdown.
 *
 * Custom ranges are always computed on demand. Preset ranges are served
 * from the snapshot table when a fresh row exists; otherwise the metric is
 * computed and the result written back before returning.
 */
export async function getRevenueBreakdown(
  portId: string,
  range: DateRange,
): Promise<RevenueBreakdownData> {
  if (!isCustomRange(range)) {
    const snapshotKey = `revenue_breakdown.${range}` as const;
    const hit = await readSnapshot<RevenueBreakdownData>(portId, snapshotKey);
    if (hit) {
      return hit;
    }
    const computed = await computeRevenueBreakdown(portId, range);
    await writeSnapshot(portId, snapshotKey, computed);
    return computed;
  }
  // Custom range: no cache key exists for it, so compute directly.
  return computeRevenueBreakdown(portId, range);
}
/**
 * Read-through accessor for lead-source attribution.
 *
 * Custom ranges are always computed on demand. Preset ranges are served
 * from the snapshot table when a fresh row exists; otherwise the metric is
 * computed and the result written back before returning.
 */
export async function getLeadSourceAttribution(
  portId: string,
  range: DateRange,
): Promise<LeadSourceAttributionData> {
  if (!isCustomRange(range)) {
    const snapshotKey = `lead_source_attribution.${range}` as const;
    const hit = await readSnapshot<LeadSourceAttributionData>(portId, snapshotKey);
    if (hit) {
      return hit;
    }
    const computed = await computeLeadSourceAttribution(portId, range);
    await writeSnapshot(portId, snapshotKey, computed);
    return computed;
  }
  // Custom range: no cache key exists for it, so compute directly.
  return computeLeadSourceAttribution(portId, range);
}
// ─── Cron entrypoint: warm every (port × metric × range) ────────────────────
/**
 * Recompute and store every metric for every range in `ALL_RANGES` for one
 * port.
 *
 * Ranges are processed one after another. Within a range the four metrics
 * are computed concurrently, and once all four have resolved their
 * snapshots are written concurrently.
 *
 * @param portId Port whose snapshots should be refreshed.
 */
export async function refreshSnapshotsForPort(portId: string): Promise<void> {
  for (const preset of ALL_RANGES) {
    const [funnelData, occupancyData, revenueData, leadSourceData] = await Promise.all([
      computePipelineFunnel(portId, preset),
      computeOccupancyTimeline(portId, preset),
      computeRevenueBreakdown(portId, preset),
      computeLeadSourceAttribution(portId, preset),
    ]);

    const pendingWrites = [
      writeSnapshot(portId, `pipeline_funnel.${preset}`, funnelData),
      writeSnapshot(portId, `occupancy_timeline.${preset}`, occupancyData),
      writeSnapshot(portId, `revenue_breakdown.${preset}`, revenueData),
      writeSnapshot(portId, `lead_source_attribution.${preset}`, leadSourceData),
    ];
    await Promise.all(pendingWrites);
  }
}