Files
pn-new-crm/src/lib/services/analytics.service.ts

107 lines
2.9 KiB
TypeScript
Raw Normal View History

feat(insights): Phase B schema + service skeletons PR1 of Phase B per docs/superpowers/specs/2026-04-28-phase-b-insights-alerts-design.md. Lays the foundation that PRs 2-10 will fill in with behaviour. Schema (migration 0014): - alerts table with rule-engine fields (rule_id, severity, link, entity_type/id, fingerprint, fired/dismissed/acknowledged/resolved timestamps, jsonb metadata). Partial-unique fingerprint index keeps one open row per (port, rule, entity); separate indexes power severity-filtered and time-ordered queries. - analytics_snapshots (port_id, metric_id) -> jsonb cache + computedAt for the 15-min recurring refresh. - expenses: duplicate_of self-FK, dedup_scanned_at, ocr_status/raw/ confidence; partial index on (port, vendor, amount, date) where duplicate_of IS NULL drives the dedup heuristic. - audit_logs.search_text: GENERATED ALWAYS tsvector over action+entity_type+entity_id+user_id, GIN-indexed (drizzle can't model GENERATED ALWAYS in TS yet, so the migration appends manual ALTER + the GIN index). Service skeletons in src/lib/services/: - alerts.service.ts: fingerprintFor, reconcileAlertsForPort (upsert + auto-resolve), dismiss, acknowledge, listAlertsForPort. - alert-rules.ts: RULE_REGISTRY of 10 rule evaluators (currently no-op); PR2 fills in the bodies. - analytics.service.ts: readSnapshot/writeSnapshot with 15-min TTL + no-op compute* stubs for the four chart series; PR3 fills behavior. - expense-dedup.service.ts: scanForDuplicates + markBestDuplicate using the partial dedup index. PR8 wires the BullMQ trigger. - expense-ocr.service.ts: OcrResult/OcrLineItem types + ocrReceipt stub. PR9 wires Claude Vision (Haiku 4.5 + ephemeral system-prompt cache). - audit-search.service.ts: tsvector @@ plainto_tsquery + cursor pagination on (createdAt, id). PR10 wires the admin UI. tsc clean, lint clean, vitest 675/675 (one unrelated AES random-output flake passes solo). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-28 14:43:01 +02:00
/**
* Phase B analytics service. Reads pre-computed snapshots from
* `analytics_snapshots` keyed by `metric_id` and recomputes on demand if
* the cached row is older than `SNAPSHOT_TTL_MS`. The recomputation jobs
* land in `analytics-snapshot-job.ts` (PR3).
*/
import { and, eq } from 'drizzle-orm';
import { db } from '@/lib/db';
import { analyticsSnapshots } from '@/lib/db/schema/insights';
// Supported lookback windows shared by every chart series.
export type DateRange = '7d' | '30d' | '90d' | 'today';
// Cache key stored in `analytics_snapshots.metric_id`: one metric family
// per chart crossed with a date range, e.g. "pipeline_funnel.30d".
export type MetricId =
| `pipeline_funnel.${DateRange}`
| `occupancy_timeline.${DateRange}`
| `revenue_breakdown.${DateRange}`
| `lead_source_attribution.${DateRange}`;
// Rows older than this are treated as cache misses by `readSnapshot`.
export const SNAPSHOT_TTL_MS = 15 * 60 * 1000; // 15 minutes
// Payload shape for the pipeline-funnel chart, stored as jsonb in
// `analytics_snapshots.data`.
export interface PipelineFunnelData {
stages: Array<{ stage: string; count: number; conversionPct: number }>;
}
// Per-date unit counts for the occupancy timeline chart.
export interface OccupancyTimelineData {
points: Array<{ date: string; available: number; underOffer: number; sold: number }>;
}
// Revenue bars grouped by category; amounts carry their own currency code.
export interface RevenueBreakdownData {
bars: Array<{ category: string; amount: number; currency: string }>;
}
// Lead counts bucketed by acquisition source (pie/donut slices).
export interface LeadSourceAttributionData {
slices: Array<{ source: string; count: number }>;
}
// Union of every snapshot payload; `readSnapshot`'s type parameter is
// constrained to this so callers pick the member matching their metricId.
export type SnapshotData =
| PipelineFunnelData
| OccupancyTimelineData
| RevenueBreakdownData
| LeadSourceAttributionData;
/**
 * Fetch the cached snapshot for `(portId, metricId)`.
 *
 * Returns null when no row exists or when the row was computed more than
 * `SNAPSHOT_TTL_MS` ago; the caller should request a recompute (or the
 * recurring job eventually refreshes it).
 */
export async function readSnapshot<T extends SnapshotData>(
  portId: string,
  metricId: MetricId,
): Promise<T | null> {
  const snapshot = await db.query.analyticsSnapshots.findFirst({
    where: and(
      eq(analyticsSnapshots.portId, portId),
      eq(analyticsSnapshots.metricId, metricId),
    ),
  });
  if (!snapshot) return null;
  // Serve only fresh rows; anything past the TTL is reported as a miss
  // rather than handing back stale chart data.
  const isFresh = Date.now() - snapshot.computedAt.getTime() <= SNAPSHOT_TTL_MS;
  return isFresh ? (snapshot.data as T) : null;
}
/**
 * Upsert the snapshot row for `(portId, metricId)`. On conflict with an
 * existing row the payload is replaced and `computedAt` is reset to now,
 * restarting the TTL window used by `readSnapshot`.
 */
export async function writeSnapshot(
  portId: string,
  metricId: MetricId,
  data: SnapshotData,
): Promise<void> {
  const row = { portId, metricId, data };
  const onConflict = {
    target: [analyticsSnapshots.portId, analyticsSnapshots.metricId],
    set: { data, computedAt: new Date() },
  };
  await db.insert(analyticsSnapshots).values(row).onConflictDoUpdate(onConflict);
}
// Computation entrypoints — bodies land in PR3 along with the recurring
// snapshot job. Exported as no-op stubs so PR1's tsc/lint stay green.
/**
 * Pipeline-funnel series for a port/range. Stub: resolves to an empty
 * stage list for every input until PR3 lands the real query.
 */
export async function computePipelineFunnel(
  _portId: string,
  _range: DateRange,
): Promise<PipelineFunnelData> {
  const empty: PipelineFunnelData = { stages: [] };
  return empty;
}
/**
 * Occupancy-timeline series for a port/range. Stub: resolves to an empty
 * point list for every input until PR3 lands the real query.
 */
export async function computeOccupancyTimeline(
  _portId: string,
  _range: DateRange,
): Promise<OccupancyTimelineData> {
  const empty: OccupancyTimelineData = { points: [] };
  return empty;
}
/**
 * Revenue-breakdown series for a port/range. Stub: resolves to an empty
 * bar list for every input until PR3 lands the real query.
 */
export async function computeRevenueBreakdown(
  _portId: string,
  _range: DateRange,
): Promise<RevenueBreakdownData> {
  const empty: RevenueBreakdownData = { bars: [] };
  return empty;
}
/**
 * Lead-source-attribution series for a port/range. Stub: resolves to an
 * empty slice list for every input until PR3 lands the real query.
 */
export async function computeLeadSourceAttribution(
  _portId: string,
  _range: DateRange,
): Promise<LeadSourceAttributionData> {
  const empty: LeadSourceAttributionData = { slices: [] };
  return empty;
}