feat(insights): Phase B schema + service skeletons

PR1 of Phase B per docs/superpowers/specs/2026-04-28-phase-b-insights-alerts-design.md.
Lays the foundation that PRs 2-10 will fill in with behaviour.

Schema (migration 0014):
- alerts table with rule-engine fields (rule_id, severity, link,
  entity_type/id, fingerprint, fired/dismissed/acknowledged/resolved
  timestamps, jsonb metadata). Partial-unique fingerprint index keeps
  one open row per (port, rule, entity); separate indexes power
  severity-filtered and time-ordered queries.
- analytics_snapshots (port_id, metric_id) -> jsonb cache + computedAt
  for the 15-min recurring refresh.
- expenses: duplicate_of self-FK, dedup_scanned_at, ocr_status/raw/
  confidence; partial index on (port, vendor, amount, date) where
  duplicate_of IS NULL drives the dedup heuristic.
- audit_logs.search_text: GENERATED ALWAYS tsvector over
  action+entity_type+entity_id+user_id, GIN-indexed (drizzle can't
  model GENERATED ALWAYS in TS yet, so the migration appends manual
  ALTER + the GIN index).

Service skeletons in src/lib/services/:
- alerts.service.ts: fingerprintFor, reconcileAlertsForPort (upsert +
  auto-resolve), dismiss, acknowledge, listAlertsForPort.
- alert-rules.ts: RULE_REGISTRY of 10 rule evaluators (currently no-op);
  PR2 fills in the bodies.
- analytics.service.ts: readSnapshot/writeSnapshot with 15-min TTL +
  no-op compute* stubs for the four chart series; PR3 fills behavior.
- expense-dedup.service.ts: scanForDuplicates + markBestDuplicate
  using the partial dedup index. PR8 wires the BullMQ trigger.
- expense-ocr.service.ts: OcrResult/OcrLineItem types + ocrReceipt
  stub. PR9 wires Claude Vision (Haiku 4.5 + ephemeral system-prompt
  cache).
- audit-search.service.ts: tsvector @@ plainto_tsquery + cursor
  pagination on (createdAt, id). PR10 wires the admin UI.

tsc clean, lint clean, vitest 675/675 (one unrelated AES random-output
flake passes solo).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Matt Ciaccio
2026-04-28 14:43:01 +02:00
parent f1ed2a5f87
commit e77d55ac50
13 changed files with 10451 additions and 10 deletions

View File

@@ -0,0 +1,31 @@
/**
* Alert rule catalog. Each entry is a pure async function that takes a
* `portId` and returns an array of `AlertCandidate` rows the engine should
* upsert into `alerts`. Skeleton: signatures only — implementations land
* in PR2.
*/
import type { AlertCandidate } from './alerts.service';
import { ALERT_RULES, type AlertRuleId } from '@/lib/db/schema/insights';
type RuleEvaluator = (portId: string) => Promise<AlertCandidate[]>;
/** Empty implementations — every evaluator returns no candidates. PR2
* fills these in; the cron dispatcher in PR2 walks `RULE_REGISTRY`. */
export const RULE_REGISTRY: Record<AlertRuleId, RuleEvaluator> = {
'reservation.no_agreement': async () => [],
'interest.stale': async () => [],
'document.expiring_soon': async () => [],
'document.signer_overdue': async () => [],
'berth.under_offer_stalled': async () => [],
'expense.duplicate': async () => [],
'expense.unscanned': async () => [],
'interest.high_value_silent': async () => [],
'eoi.unsigned_long': async () => [],
'audit.suspicious_login': async () => [],
};
/** Sanity check: catalog matches the ALERT_RULES literal type. */
export function listRuleIds(): readonly AlertRuleId[] {
return ALERT_RULES;
}

View File

@@ -0,0 +1,115 @@
/**
* Phase B alert framework — service layer.
*
* This is the skeleton: types, function shapes, and behaviour stubs. The
* actual rule evaluators live in `alert-rules.ts` (PR2). The cron
* dispatcher will compose this service with that catalogue.
*/
import { and, eq, isNull, sql } from 'drizzle-orm';
import { createHash } from 'crypto';
import { db } from '@/lib/db';
import { alerts, type Alert, type AlertSeverity, type AlertRuleId } from '@/lib/db/schema/insights';
export interface AlertCandidate {
ruleId: AlertRuleId;
severity: AlertSeverity;
title: string;
body?: string;
link: string;
entityType?: string;
entityId?: string;
metadata?: Record<string, unknown>;
}
/**
* Stable identity hash so re-evaluations of the same condition upsert
* onto the same row (via `idx_alerts_fingerprint_open`).
*/
export function fingerprintFor(
c: Pick<AlertCandidate, 'ruleId' | 'entityType' | 'entityId'>,
): string {
return createHash('sha1')
.update(`${c.ruleId}|${c.entityType ?? ''}|${c.entityId ?? ''}`)
.digest('hex');
}
/**
* Apply a batch of rule outputs against the open-alert table:
* - upsert open alerts (rule still firing)
* - resolve any open alert in scope whose fingerprint isn't in this batch
*/
export async function reconcileAlertsForPort(
portId: string,
ruleId: AlertRuleId,
candidates: AlertCandidate[],
): Promise<void> {
// Insert new / leave existing — only one open row per fingerprint
// thanks to the partial unique index.
for (const c of candidates) {
const fingerprint = fingerprintFor(c);
await db
.insert(alerts)
.values({
portId,
ruleId: c.ruleId,
severity: c.severity,
title: c.title,
body: c.body,
link: c.link,
entityType: c.entityType,
entityId: c.entityId,
fingerprint,
metadata: c.metadata ?? {},
})
.onConflictDoNothing();
}
// Auto-resolve open alerts for this rule whose fingerprint disappeared.
const liveFingerprints = new Set(candidates.map((c) => fingerprintFor(c)));
const open = await db.query.alerts.findMany({
where: and(eq(alerts.portId, portId), eq(alerts.ruleId, ruleId), isNull(alerts.resolvedAt)),
});
const stale = open.filter((a) => !liveFingerprints.has(a.fingerprint));
for (const a of stale) {
await db
.update(alerts)
.set({ resolvedAt: sql`now()` })
.where(eq(alerts.id, a.id));
}
}
export async function dismissAlert(alertId: string, userId: string): Promise<void> {
await db
.update(alerts)
.set({ dismissedAt: sql`now()`, dismissedBy: userId })
.where(eq(alerts.id, alertId));
}
export async function acknowledgeAlert(alertId: string, userId: string): Promise<void> {
await db
.update(alerts)
.set({ acknowledgedAt: sql`now()`, acknowledgedBy: userId })
.where(eq(alerts.id, alertId));
}
export interface ListAlertsOptions {
severity?: AlertSeverity[];
includeDismissed?: boolean;
includeResolved?: boolean;
}
export async function listAlertsForPort(
portId: string,
options: ListAlertsOptions = {},
): Promise<Alert[]> {
const conditions = [eq(alerts.portId, portId)];
if (!options.includeResolved) conditions.push(isNull(alerts.resolvedAt));
if (!options.includeDismissed) conditions.push(isNull(alerts.dismissedAt));
return db.query.alerts.findMany({
where: and(...conditions),
orderBy: (a, { desc }) => [desc(a.firedAt)],
limit: 100,
});
}

View File

@@ -0,0 +1,106 @@
/**
* Phase B analytics service. Reads pre-computed snapshots from
* `analytics_snapshots` keyed by `metric_id` and recomputes on demand if
* the cached row is older than `SNAPSHOT_TTL_MS`. The recomputation jobs
* land in `analytics-snapshot-job.ts` (PR3).
*/
import { and, eq } from 'drizzle-orm';
import { db } from '@/lib/db';
import { analyticsSnapshots } from '@/lib/db/schema/insights';
export type DateRange = '7d' | '30d' | '90d' | 'today';
export type MetricId =
| `pipeline_funnel.${DateRange}`
| `occupancy_timeline.${DateRange}`
| `revenue_breakdown.${DateRange}`
| `lead_source_attribution.${DateRange}`;
export const SNAPSHOT_TTL_MS = 15 * 60 * 1000; // 15 minutes
export interface PipelineFunnelData {
stages: Array<{ stage: string; count: number; conversionPct: number }>;
}
export interface OccupancyTimelineData {
points: Array<{ date: string; available: number; underOffer: number; sold: number }>;
}
export interface RevenueBreakdownData {
bars: Array<{ category: string; amount: number; currency: string }>;
}
export interface LeadSourceAttributionData {
slices: Array<{ source: string; count: number }>;
}
export type SnapshotData =
| PipelineFunnelData
| OccupancyTimelineData
| RevenueBreakdownData
| LeadSourceAttributionData;
/**
* Read a snapshot by `(portId, metricId)`. Returns null when missing or
* stale; the caller should request a recompute (or the recurring job
* eventually fills it).
*/
export async function readSnapshot<T extends SnapshotData>(
portId: string,
metricId: MetricId,
): Promise<T | null> {
const row = await db.query.analyticsSnapshots.findFirst({
where: and(eq(analyticsSnapshots.portId, portId), eq(analyticsSnapshots.metricId, metricId)),
});
if (!row) return null;
const age = Date.now() - row.computedAt.getTime();
if (age > SNAPSHOT_TTL_MS) return null;
return row.data as T;
}
export async function writeSnapshot(
portId: string,
metricId: MetricId,
data: SnapshotData,
): Promise<void> {
await db
.insert(analyticsSnapshots)
.values({ portId, metricId, data })
.onConflictDoUpdate({
target: [analyticsSnapshots.portId, analyticsSnapshots.metricId],
set: { data, computedAt: new Date() },
});
}
// Computation entrypoints — bodies land in PR3 along with the recurring
// snapshot job. Exported as no-op stubs so PR1's tsc/lint stay green.
export async function computePipelineFunnel(
_portId: string,
_range: DateRange,
): Promise<PipelineFunnelData> {
return { stages: [] };
}
export async function computeOccupancyTimeline(
_portId: string,
_range: DateRange,
): Promise<OccupancyTimelineData> {
return { points: [] };
}
export async function computeRevenueBreakdown(
_portId: string,
_range: DateRange,
): Promise<RevenueBreakdownData> {
return { bars: [] };
}
export async function computeLeadSourceAttribution(
_portId: string,
_range: DateRange,
): Promise<LeadSourceAttributionData> {
return { slices: [] };
}

View File

@@ -0,0 +1,72 @@
/**
* Audit log search — PR1 skeleton. PR10 fills in the cursor pagination
* and per-port + super-admin scoping; v1 already has the GIN index on
* `audit_logs.search_text`.
*/
import { and, desc, eq, gte, lte, sql, type SQL } from 'drizzle-orm';
import { db } from '@/lib/db';
import { auditLogs, type AuditLog } from '@/lib/db/schema/system';
export interface AuditSearchOptions {
/** Limit results to a single port. Omit for super-admin all-ports view. */
portId?: string;
/** Free-text query — runs against the GIN-indexed search_text column. */
q?: string;
/** Filter by actor (user id). */
userId?: string;
/** Filter by action verb: 'create' | 'update' | 'delete' | ... */
action?: string;
/** Filter by entity type: 'client' | 'interest' | 'document' | ... */
entityType?: string;
/** Filter by exact entity id (e.g. paste a uuid into search). */
entityId?: string;
/** Inclusive date range. */
from?: Date;
to?: Date;
/** Pagination — cursor on (createdAt, id). */
cursor?: { createdAt: Date; id: string };
limit?: number;
}
export interface AuditSearchPage {
rows: AuditLog[];
nextCursor: { createdAt: Date; id: string } | null;
}
export async function searchAuditLogs(options: AuditSearchOptions = {}): Promise<AuditSearchPage> {
const conds: SQL[] = [];
if (options.portId) conds.push(eq(auditLogs.portId, options.portId));
if (options.userId) conds.push(eq(auditLogs.userId, options.userId));
if (options.action) conds.push(eq(auditLogs.action, options.action));
if (options.entityType) conds.push(eq(auditLogs.entityType, options.entityType));
if (options.entityId) conds.push(eq(auditLogs.entityId, options.entityId));
if (options.from) conds.push(gte(auditLogs.createdAt, options.from));
if (options.to) conds.push(lte(auditLogs.createdAt, options.to));
if (options.q) {
// tsquery match against the GENERATED tsvector column.
conds.push(sql`${auditLogs.searchText} @@ plainto_tsquery('simple', ${options.q})`);
}
if (options.cursor) {
// Strict less-than on (createdAt, id) for stable cursor pagination.
conds.push(
sql`(${auditLogs.createdAt}, ${auditLogs.id}) < (${options.cursor.createdAt}, ${options.cursor.id})`,
);
}
const limit = Math.min(options.limit ?? 50, 200);
const rows = await db.query.auditLogs.findMany({
where: conds.length > 0 ? and(...conds) : undefined,
orderBy: [desc(auditLogs.createdAt), desc(auditLogs.id)],
limit: limit + 1,
});
const hasMore = rows.length > limit;
const truncated = hasMore ? rows.slice(0, limit) : rows;
const last = truncated[truncated.length - 1];
return {
rows: truncated,
nextCursor: hasMore && last ? { createdAt: last.createdAt, id: last.id } : null,
};
}

View File

@@ -0,0 +1,71 @@
/**
* Expense duplicate detection — heuristic match on
* (port + vendor + amount + date ± 3d). PR1 ships the function shape;
* PR8 wires the BullMQ trigger and the merge service.
*/
import { and, between, eq, ne, sql } from 'drizzle-orm';
import { db } from '@/lib/db';
import { expenses } from '@/lib/db/schema/financial';
const DEDUP_WINDOW_DAYS = 3;
export interface DedupCandidate {
/** Existing expense that the new one likely duplicates. */
candidateId: string;
/** 0..1 confidence; 1.0 = exact vendor + amount + same day. */
confidence: number;
}
export async function scanForDuplicates(expenseId: string): Promise<DedupCandidate[]> {
const target = await db.query.expenses.findFirst({ where: eq(expenses.id, expenseId) });
if (!target) return [];
const { portId, establishmentName, amount, expenseDate } = target;
if (!establishmentName || !amount || !expenseDate) return [];
const lo = new Date(expenseDate);
lo.setDate(lo.getDate() - DEDUP_WINDOW_DAYS);
const hi = new Date(expenseDate);
hi.setDate(hi.getDate() + DEDUP_WINDOW_DAYS);
const matches = await db.query.expenses.findMany({
where: and(
eq(expenses.portId, portId),
sql`lower(${expenses.establishmentName}) = lower(${establishmentName})`,
eq(expenses.amount, amount),
between(expenses.expenseDate, lo, hi),
ne(expenses.id, expenseId),
),
limit: 5,
});
return matches.map((m) => ({
candidateId: m.id,
confidence: dayDiff(m.expenseDate, expenseDate) === 0 ? 1.0 : 0.85,
}));
}
function dayDiff(a: Date, b: Date): number {
const ms = Math.abs(a.getTime() - b.getTime());
return Math.round(ms / 86_400_000);
}
/** Mark an expense as a duplicate of the candidate with the highest score. */
export async function markBestDuplicate(expenseId: string): Promise<string | null> {
const candidates = await scanForDuplicates(expenseId);
if (candidates.length === 0) {
await db
.update(expenses)
.set({ dedupScannedAt: sql`now()` })
.where(eq(expenses.id, expenseId));
return null;
}
const best = candidates.reduce((a, b) => (a.confidence >= b.confidence ? a : b));
await db
.update(expenses)
.set({ duplicateOf: best.candidateId, dedupScannedAt: sql`now()` })
.where(eq(expenses.id, expenseId));
return best.candidateId;
}

View File

@@ -0,0 +1,47 @@
/**
* Claude Vision-driven OCR for expense receipts. PR1 stub: types and the
* service contract. The actual API call wires up in PR9 with prompt
* caching on the system text and Haiku 4.5 by default.
*/
export interface OcrLineItem {
description: string;
quantity?: number;
unitPrice?: number;
amount: number;
}
export interface OcrResult {
vendor: string | null;
amount: number | null;
currency: string | null;
/** ISO date YYYY-MM-DD. */
date: string | null;
lineItems: OcrLineItem[];
/** 0..1; below 0.6 surfaces "verify mode" UI. */
confidence: number;
}
export interface OcrContext {
fileId: string;
fileUrl: string;
/** Optional MIME hint; the service still detects from bytes. */
mimeType?: string;
}
/** Cost ceiling per call (Haiku 4.5 + cached system prompt). PR9 enforces. */
export const OCR_MAX_TOKENS = 1024;
export const OCR_LOW_CONFIDENCE_THRESHOLD = 0.6;
/** Stub — returns "pending" shape so callers can wire UI in PR1 without
* Anthropic credentials. */
export async function ocrReceipt(_ctx: OcrContext): Promise<OcrResult> {
return {
vendor: null,
amount: null,
currency: null,
date: null,
lineItems: [],
confidence: 0,
};
}