Files
pn-new-crm/src/lib/services/analytics.service.ts
Matt 449b9497ab fix(uat): batch — timeline overshoot, name-sync, reset-password, dashboard cleanup, queue/seed hygiene + alpha UAT findings doc
UAT findings landed across the last few Playwright + React Grab passes;
single grouped commit so the index doesn't fragment into 30 one-liners.

User & auth:
- `user-settings`: name now updates the avatar + topbar menu after save
  (was reading stale session).
- `me/password-reset`: 3 bugs (token validation, error response shape,
  redirect chain).
- Admin user permission-overrides route honours the same envelope as
  the rest of the admin surface.

Dashboard:
- Removed obsolete `revenue-breakdown-chart` + `dashboard-widgets-card`
  (replaced by the customisable widget grid).
- Strip `revenue_breakdown` from analytics route + use-analytics +
  service + integration test so nothing renders an empty card.
- Activity log timeline overshoot fix (`interest-timeline` +
  `entity-activity-feed`).
- Tightened tiles: active-deals, berth-heat-widget, pipeline-value, kpi-tile.
- `dev-mode-banner`: derive dismissed state synchronously instead of
  via an effect (set-state-in-effect lint rule).

Forms & lists (assorted polish):
- client / company / yacht / interest / reminder forms — validation +
  empty-state copy + tab transitions.
- companies/yachts list tweaks; berth recommender panel; qualification
  checklist; supplemental info request button.

Infra & misc:
- Queue workers (ai / email / notifications) — log shape +
  per-job timeout consistency.
- Auth / brochures / users schema small adjustments; seeds reflect
  permissions matrix changes.
- Scan shell + scanner manifest + AI admin page small fixes.
- `next.config.transpilePackages` adds `echarts`/`zrender`/`echarts-for-react`
  (recommended config from echarts-for-react inside Next).

Docs:
- `docs/superpowers/audits/alpha-uat-master.md` — single rolling
  cross-cutting UAT findings doc (per CLAUDE.md convention).
- `docs/BACKLOG.md`: dashboard stats cards (§I) + activity-log
  normalization (§J).
- 2026-05-18 audit log updated with this batch.
- `CLAUDE.md` — small manual UAT scaffold notes.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-20 15:56:11 +02:00

295 lines
11 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
/**
* Phase B analytics service. Reads pre-computed snapshots from
* `analytics_snapshots`; recomputes on demand if older than `SNAPSHOT_TTL_MS`.
* The recurring `analytics-refresh` BullMQ job (PR3) warms the table
* every 15 minutes per port × per metric.
*/
import { and, between, eq, isNull, sql } from 'drizzle-orm';
import { db } from '@/lib/db';
import { analyticsSnapshots } from '@/lib/db/schema/insights';
import { interests } from '@/lib/db/schema/interests';
import { PIPELINE_STAGES } from '@/lib/constants';
import {
ALL_RANGES,
isCustomRange,
rangeToBounds,
type CustomDateRange,
type DateRange,
type PresetDateRange,
} from '@/lib/analytics/range';
// Re-export the shared types for callers that already import from this
// module - keeps the existing public API intact.
export { ALL_RANGES, isCustomRange, rangeToBounds };
export type { DateRange, PresetDateRange, CustomDateRange };
/** The metric families this module knows how to compute. */
export type MetricBase =
  | 'pipeline_funnel'
  | 'occupancy_timeline'
  | 'lead_source_attribution';

/**
 * Key a snapshot row is stored under: `<metric>.<preset range>`.
 * Custom ranges never get a key - their combinatorial space is unbounded,
 * so they are recomputed on demand rather than filling
 * `analytics_snapshots` with one-off rows.
 */
export type MetricId = `${MetricBase}.${PresetDateRange}`;

/** Every metric family as a runtime list. */
export const ALL_METRICS: readonly MetricBase[] = [
  'pipeline_funnel',
  'occupancy_timeline',
  'lead_source_attribution',
];

/** Maximum age of a cached snapshot before it counts as stale (15 minutes). */
export const SNAPSHOT_TTL_MS = 15 * 60_000;
// ─── Output shapes ────────────────────────────────────────────────────────────
/** Payload of the `pipeline_funnel` metric. */
export interface PipelineFunnelData {
  /** One entry per pipeline stage, with its count and conversion percentage. */
  stages: { stage: string; count: number; conversionPct: number }[];
  /**
   * Terminal lost/cancelled outcomes inside the range. Shown below the
   * funnel so leakage is visible without skewing the conversion math.
   * `count` is the sum of the `byOutcome` values.
   */
  lost: { count: number; byOutcome: { [outcome: string]: number } };
}

/** Payload of the `occupancy_timeline` metric: one point per calendar day. */
export interface OccupancyTimelineData {
  points: { date: string; occupied: number; total: number; occupancyPct: number }[];
}

/** Payload of the `lead_source_attribution` metric: one slice per source. */
export interface LeadSourceAttributionData {
  slices: { source: string; count: number }[];
}

/** Any payload that can be stored in a snapshot row. */
export type SnapshotData =
  | PipelineFunnelData
  | OccupancyTimelineData
  | LeadSourceAttributionData;
// ─── Cache layer ──────────────────────────────────────────────────────────────
/**
 * Looks up the cached snapshot for a port/metric pair.
 *
 * @param portId Port the snapshot belongs to.
 * @param metricId Snapshot key (`<metric>.<preset range>`).
 * @returns The stored payload cast to `T`, or `null` when no row exists or
 *   the row is older than `SNAPSHOT_TTL_MS`.
 */
export async function readSnapshot<T extends SnapshotData>(
  portId: string,
  metricId: MetricId,
): Promise<T | null> {
  const samePort = eq(analyticsSnapshots.portId, portId);
  const sameMetric = eq(analyticsSnapshots.metricId, metricId);
  const snapshot = await db.query.analyticsSnapshots.findFirst({
    where: and(samePort, sameMetric),
  });
  if (!snapshot) return null;

  // Freshness is judged against this process's clock at read time.
  const ageMs = Date.now() - snapshot.computedAt.getTime();
  return ageMs > SNAPSHOT_TTL_MS ? null : (snapshot.data as T);
}
/**
 * Upserts the snapshot for a port/metric pair.
 *
 * Inserts a new row, or - when a row with the same (portId, metricId)
 * already exists - overwrites its payload and stamps `computedAt` with the
 * current time.
 *
 * @param portId Port the snapshot belongs to.
 * @param metricId Snapshot key (`<metric>.<preset range>`).
 * @param data Payload to persist.
 */
export async function writeSnapshot(
  portId: string,
  metricId: MetricId,
  data: SnapshotData,
): Promise<void> {
  const snapshotRow = { portId, metricId, data };
  const upsert = db
    .insert(analyticsSnapshots)
    .values(snapshotRow)
    .onConflictDoUpdate({
      target: [analyticsSnapshots.portId, analyticsSnapshots.metricId],
      set: { data, computedAt: new Date() },
    });
  await upsert;
}
// Range helpers (rangeToBounds, rangeToDays, rangeSpanDays) moved to
// @/lib/analytics/range - that file is client-safe (no DB imports) so it
// can be used from React components AND this server module.
// ─── Computations ─────────────────────────────────────────────────────────────
/**
 * Builds the pipeline funnel for one port over `range`.
 *
 * Only non-archived interests whose `createdAt` falls between the range
 * bounds are considered. Interests with no outcome or outcome `'won'` feed
 * the per-stage counts; every other outcome is tallied separately in `lost`
 * so leakage stays visible without polluting the conversion math.
 *
 * @param portId Port whose interests are aggregated.
 * @param range Preset or custom range, resolved through `rangeToBounds`.
 * @returns One entry per `PIPELINE_STAGES` member (canonical order,
 *   zero-filled) plus the lost summary.
 */
export async function computePipelineFunnel(
  portId: string,
  range: DateRange,
): Promise<PipelineFunnelData> {
  const { from, to } = rangeToBounds(range);

  // The stage aggregate and the lost aggregate are independent reads over
  // the same port/date window, so they are issued together instead of
  // paying two sequential round-trips.
  const [stageRows, lostRows] = await Promise.all([
    // Stage counts EXCLUDE lost/cancelled outcomes - those never become
    // conversions, so including them would give meaningless math.
    db
      .select({ stage: interests.pipelineStage, count: sql<number>`count(*)::int` })
      .from(interests)
      .where(
        and(
          eq(interests.portId, portId),
          isNull(interests.archivedAt),
          between(interests.createdAt, from, to),
          sql`(${interests.outcome} IS NULL OR ${interests.outcome} = 'won')`,
        ),
      )
      .groupBy(interests.pipelineStage),
    // Lost / cancelled summary. Same date-range filter as the funnel.
    db
      .select({ outcome: interests.outcome, count: sql<number>`count(*)::int` })
      .from(interests)
      .where(
        and(
          eq(interests.portId, portId),
          isNull(interests.archivedAt),
          between(interests.createdAt, from, to),
          sql`${interests.outcome} IS NOT NULL AND ${interests.outcome} != 'won'`,
        ),
      )
      .groupBy(interests.outcome),
  ]);

  const counts = new Map(stageRows.map((r) => [r.stage, r.count]));

  // First stage in the canonical order anchors the conversion percentage.
  // NOTE(review): counts are grouped by each interest's *current* stage, so
  // a later stage can hold more rows than the first one and report more
  // than 100% - confirm this is the intended funnel definition.
  const top = counts.get(PIPELINE_STAGES[0]) ?? 0;
  const stages = PIPELINE_STAGES.map((stage) => {
    const count = counts.get(stage) ?? 0;
    // Percentage of the first stage, rounded to one decimal place.
    const conversionPct = top === 0 ? 0 : Math.round((count / top) * 1000) / 10;
    return { stage, count, conversionPct };
  });

  const byOutcome: Record<string, number> = {};
  let lostTotal = 0;
  for (const row of lostRows) {
    // The SQL filter already drops NULL outcomes; this guard narrows the type.
    if (!row.outcome) continue;
    byOutcome[row.outcome] = row.count;
    lostTotal += row.count;
  }

  return { stages, lost: { count: lostTotal, byOutcome } };
}
/**
 * Builds a per-day cumulative occupancy series for one port.
 *
 * For every day in the range the result carries the number of distinct
 * berths linked to a won, non-archived interest whose `outcome_at` date is
 * on or before that day, alongside the port's berth total and the resulting
 * percentage (one decimal place; 0 when the port has no berths).
 *
 * @param portId Port whose berths and interests are aggregated.
 * @param range Preset or custom range, resolved through `rangeToBounds`.
 * @returns One point per generated day, ordered by date.
 */
export async function computeOccupancyTimeline(
  portId: string,
  range: DateRange,
): Promise<OccupancyTimelineData> {
  const { from, to } = rangeToBounds(range);

  // Day bounds as UTC `YYYY-MM-DD` strings.
  // NOTE(review): `to` is shifted back by one day, which presumably means
  // `rangeToBounds` returns an exclusive upper bound - confirm.
  const fromStr = from.toISOString().slice(0, 10);
  const toStr = new Date(to.getTime() - 86_400_000).toISOString().slice(0, 10);

  // The berth total and the day series do not depend on each other, so both
  // queries are issued together rather than back to back.
  const [totalRow, rows] = await Promise.all([
    // Total berths per port (current count - assumes no churn).
    db.execute<{ total: number }>(
      sql`SELECT count(*)::int AS total FROM berths WHERE port_id = ${portId}`,
    ),
    // Occupancy = cumulative count of berths sold (i.e. won deals) on or
    // before each day. Per 2026-05-14 decision, the canonical occupancy
    // signal is "the deal closed and money changed hands" - reservations
    // are merely holds and don't count as occupied. Sources from
    // `interests.outcome='won'` + `outcome_at::date`; every berth linked
    // through `interest_berths` contributes, and COUNT(DISTINCT) ensures a
    // berth is counted once per day even if several deals reference it.
    // Single round-trip via generate_series joined against a sold_berths CTE.
    db.execute<{ day: string; occupied: number }>(
      sql`
WITH days AS (
SELECT generate_series(${fromStr}::date, ${toStr}::date, '1 day'::interval)::date AS day
),
sold_berths AS (
SELECT DISTINCT ib.berth_id, (i.outcome_at AT TIME ZONE 'UTC')::date AS sold_on
FROM interests i
INNER JOIN interest_berths ib ON ib.interest_id = i.id
WHERE i.port_id = ${portId}
AND i.outcome = 'won'
AND i.outcome_at IS NOT NULL
AND i.archived_at IS NULL
)
SELECT
to_char(days.day, 'YYYY-MM-DD') AS day,
COUNT(DISTINCT sb.berth_id)::int AS occupied
FROM days
LEFT JOIN sold_berths sb ON sb.sold_on <= days.day
GROUP BY days.day
ORDER BY days.day
`,
    ),
  ]);

  const total = totalRow[0]?.total ?? 0;

  const points: OccupancyTimelineData['points'] = rows.map((r) => {
    // Coerce defensively in case the driver hands the count back as a string.
    const occupied = Number(r.occupied) || 0;
    const occupancyPct = total === 0 ? 0 : Math.round((occupied / total) * 1000) / 10;
    return { date: r.day, occupied, total, occupancyPct };
  });

  return { points };
}
/**
 * Counts non-archived interests per lead source for one port over `range`.
 *
 * @param portId Port whose interests are aggregated.
 * @param range Preset or custom range, resolved through `rangeToBounds`.
 * @returns Slices ordered by descending count; interests without a source
 *   are reported under `'unspecified'`.
 */
export async function computeLeadSourceAttribution(
  portId: string,
  range: DateRange,
): Promise<LeadSourceAttributionData> {
  const { from, to } = rangeToBounds(range);

  const inWindow = and(
    eq(interests.portId, portId),
    isNull(interests.archivedAt),
    between(interests.createdAt, from, to),
  );
  const grouped = await db
    .select({ source: interests.source, count: sql<number>`count(*)::int` })
    .from(interests)
    .where(inWindow)
    .groupBy(interests.source);

  const slices = grouped.map(({ source, count }) => ({
    source: source ?? 'unspecified',
    count,
  }));
  // Largest slice first.
  slices.sort((a, b) => b.count - a.count);

  return { slices };
}
// ─── Public read API (cache → compute → write back) ──────────────────────────
//
// Custom ranges always recompute (cache key would be unbounded). Preset
// ranges go cache → compute → write-back as before.
/**
 * Returns the pipeline funnel for a port, served from the snapshot cache
 * when possible.
 *
 * Preset ranges: return the cached snapshot if `readSnapshot` yields one,
 * otherwise compute, write the result back, and return it. Custom ranges
 * bypass the cache entirely.
 *
 * @param portId Port to report on.
 * @param range Preset or custom date range.
 */
export async function getPipelineFunnel(
  portId: string,
  range: DateRange,
): Promise<PipelineFunnelData> {
  if (!isCustomRange(range)) {
    const snapshotKey = `pipeline_funnel.${range}` as const;
    const hit = await readSnapshot<PipelineFunnelData>(portId, snapshotKey);
    if (hit) return hit;

    const computed = await computePipelineFunnel(portId, range);
    await writeSnapshot(portId, snapshotKey, computed);
    return computed;
  }

  // Custom range: always computed on demand, never cached.
  return computePipelineFunnel(portId, range);
}
/**
 * Returns the occupancy timeline for a port, served from the snapshot cache
 * when possible.
 *
 * Preset ranges: return the cached snapshot if `readSnapshot` yields one,
 * otherwise compute, write the result back, and return it. Custom ranges
 * bypass the cache entirely.
 *
 * @param portId Port to report on.
 * @param range Preset or custom date range.
 */
export async function getOccupancyTimeline(
  portId: string,
  range: DateRange,
): Promise<OccupancyTimelineData> {
  if (!isCustomRange(range)) {
    const snapshotKey = `occupancy_timeline.${range}` as const;
    const hit = await readSnapshot<OccupancyTimelineData>(portId, snapshotKey);
    if (hit) return hit;

    const computed = await computeOccupancyTimeline(portId, range);
    await writeSnapshot(portId, snapshotKey, computed);
    return computed;
  }

  // Custom range: always computed on demand, never cached.
  return computeOccupancyTimeline(portId, range);
}
/**
 * Returns the lead-source attribution for a port, served from the snapshot
 * cache when possible.
 *
 * Preset ranges: return the cached snapshot if `readSnapshot` yields one,
 * otherwise compute, write the result back, and return it. Custom ranges
 * bypass the cache entirely.
 *
 * @param portId Port to report on.
 * @param range Preset or custom date range.
 */
export async function getLeadSourceAttribution(
  portId: string,
  range: DateRange,
): Promise<LeadSourceAttributionData> {
  if (!isCustomRange(range)) {
    const snapshotKey = `lead_source_attribution.${range}` as const;
    const hit = await readSnapshot<LeadSourceAttributionData>(portId, snapshotKey);
    if (hit) return hit;

    const computed = await computeLeadSourceAttribution(portId, range);
    await writeSnapshot(portId, snapshotKey, computed);
    return computed;
  }

  // Custom range: always computed on demand, never cached.
  return computeLeadSourceAttribution(portId, range);
}
// ─── Cron entrypoint: warm every (port × metric × range) ────────────────────
/**
 * Recomputes and stores every metric snapshot for one port.
 *
 * Ranges from `ALL_RANGES` are processed one after another; within a range
 * the three metrics are computed concurrently and then written concurrently.
 * A failure in any computation or write rejects the returned promise and
 * leaves later ranges unprocessed.
 *
 * @param portId Port whose snapshots are refreshed.
 */
export async function refreshSnapshotsForPort(portId: string): Promise<void> {
  for (const range of ALL_RANGES) {
    // Start all three computations before awaiting any of them.
    const funnelJob = computePipelineFunnel(portId, range);
    const occupancyJob = computeOccupancyTimeline(portId, range);
    const leadSourceJob = computeLeadSourceAttribution(portId, range);
    const [funnel, occupancy, leadSource] = await Promise.all([
      funnelJob,
      occupancyJob,
      leadSourceJob,
    ]);

    const writes = [
      writeSnapshot(portId, `pipeline_funnel.${range}`, funnel),
      writeSnapshot(portId, `occupancy_timeline.${range}`, occupancy),
      writeSnapshot(portId, `lead_source_attribution.${range}`, leadSource),
    ];
    await Promise.all(writes);
  }
}