PR2 of Phase B. Wires the alert framework end-to-end:
- alert-rules.ts: 10 rule evaluators implemented as pure async fns over
the existing schema. reservation.no_agreement, interest.stale,
document.signer_overdue, berth.under_offer_stalled, expense.duplicate,
expense.unscanned, interest.high_value_silent, eoi.unsigned_long,
audit.suspicious_login fire against real conditions.
document.expiring_soon stays inert until the documents schema gets an
expires_at column. audit.suspicious_login also stays inert until the
auth layer logs 'login.failed' rows (TODO noted in the rule body).
- alert-engine.ts: runAlertEngine() walks every port × every rule and
calls reconcileAlertsForPort. Errors per (port, rule) are collected
in the summary, not thrown — one bad evaluator can't stop the sweep.
- alerts.service.ts: reconcileAlertsForPort now emits 'alert:created'
socket events on insert and 'alert:resolved' on auto-resolve;
dismissAlert emits 'alert:dismissed'. All scoped to port:{portId}
rooms.
- socket/events.ts: adds the three Server→Client alert event types.
- queue/scheduler.ts: registers 'alerts-evaluate' on the maintenance
queue with cron */5 * * * * (every 5 min, per spec risk register).
- queue/workers/maintenance.ts: dispatches 'alerts-evaluate' to
runAlertEngine; logs sweep summary.
Tests:
- tests/integration/alerts-engine.test.ts (6 cases): seeds reservation
→ fires, runs twice → no dupe, adds agreement → auto-resolves; seeds
stale interest → fires; hot lead silent → critical; engine summary
shape on no-data port. Socket emit module is vi.mocked.
Vitest 681/681 (was 675; +6). tsc clean. Lint clean.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
138 lines
4.1 KiB
TypeScript
138 lines
4.1 KiB
TypeScript
/**
|
|
* Phase B alert framework — service layer.
|
|
*
|
|
* This is the skeleton: types, function shapes, and behaviour stubs. The
|
|
* actual rule evaluators live in `alert-rules.ts` (PR2). The cron
|
|
* dispatcher will compose this service with that catalogue.
|
|
*/
|
|
|
|
import { and, eq, isNull, sql } from 'drizzle-orm';
|
|
import { createHash } from 'crypto';
|
|
|
|
import { db } from '@/lib/db';
|
|
import { alerts, type Alert, type AlertSeverity, type AlertRuleId } from '@/lib/db/schema/insights';
|
|
import { emitToRoom } from '@/lib/socket/server';
|
|
|
|
export interface AlertCandidate {
|
|
ruleId: AlertRuleId;
|
|
severity: AlertSeverity;
|
|
title: string;
|
|
body?: string;
|
|
link: string;
|
|
entityType?: string;
|
|
entityId?: string;
|
|
metadata?: Record<string, unknown>;
|
|
}
|
|
|
|
/**
|
|
* Stable identity hash so re-evaluations of the same condition upsert
|
|
* onto the same row (via `idx_alerts_fingerprint_open`).
|
|
*/
|
|
export function fingerprintFor(
|
|
c: Pick<AlertCandidate, 'ruleId' | 'entityType' | 'entityId'>,
|
|
): string {
|
|
return createHash('sha1')
|
|
.update(`${c.ruleId}|${c.entityType ?? ''}|${c.entityId ?? ''}`)
|
|
.digest('hex');
|
|
}
|
|
|
|
/**
|
|
* Apply a batch of rule outputs against the open-alert table:
|
|
* - upsert open alerts (rule still firing)
|
|
* - resolve any open alert in scope whose fingerprint isn't in this batch
|
|
*/
|
|
export async function reconcileAlertsForPort(
|
|
portId: string,
|
|
ruleId: AlertRuleId,
|
|
candidates: AlertCandidate[],
|
|
): Promise<void> {
|
|
// Insert new / leave existing — only one open row per fingerprint
|
|
// thanks to the partial unique index. Track newly inserted rows so we
|
|
// can emit `alert:created` to the port room.
|
|
for (const c of candidates) {
|
|
const fingerprint = fingerprintFor(c);
|
|
const inserted = await db
|
|
.insert(alerts)
|
|
.values({
|
|
portId,
|
|
ruleId: c.ruleId,
|
|
severity: c.severity,
|
|
title: c.title,
|
|
body: c.body,
|
|
link: c.link,
|
|
entityType: c.entityType,
|
|
entityId: c.entityId,
|
|
fingerprint,
|
|
metadata: c.metadata ?? {},
|
|
})
|
|
.onConflictDoNothing()
|
|
.returning({ id: alerts.id });
|
|
if (inserted[0]) {
|
|
emitToRoom(`port:${portId}`, 'alert:created', {
|
|
alertId: inserted[0].id,
|
|
portId,
|
|
ruleId: c.ruleId,
|
|
severity: c.severity,
|
|
title: c.title,
|
|
link: c.link,
|
|
});
|
|
}
|
|
}
|
|
|
|
// Auto-resolve open alerts for this rule whose fingerprint disappeared.
|
|
const liveFingerprints = new Set(candidates.map((c) => fingerprintFor(c)));
|
|
const open = await db.query.alerts.findMany({
|
|
where: and(eq(alerts.portId, portId), eq(alerts.ruleId, ruleId), isNull(alerts.resolvedAt)),
|
|
});
|
|
const stale = open.filter((a) => !liveFingerprints.has(a.fingerprint));
|
|
for (const a of stale) {
|
|
await db
|
|
.update(alerts)
|
|
.set({ resolvedAt: sql`now()` })
|
|
.where(eq(alerts.id, a.id));
|
|
emitToRoom(`port:${portId}`, 'alert:resolved', {
|
|
alertId: a.id,
|
|
portId,
|
|
ruleId,
|
|
});
|
|
}
|
|
}
|
|
|
|
export async function dismissAlert(alertId: string, userId: string): Promise<void> {
|
|
const [row] = await db
|
|
.update(alerts)
|
|
.set({ dismissedAt: sql`now()`, dismissedBy: userId })
|
|
.where(eq(alerts.id, alertId))
|
|
.returning({ id: alerts.id, portId: alerts.portId });
|
|
if (row) {
|
|
emitToRoom(`port:${row.portId}`, 'alert:dismissed', { alertId: row.id, portId: row.portId });
|
|
}
|
|
}
|
|
|
|
export async function acknowledgeAlert(alertId: string, userId: string): Promise<void> {
|
|
await db
|
|
.update(alerts)
|
|
.set({ acknowledgedAt: sql`now()`, acknowledgedBy: userId })
|
|
.where(eq(alerts.id, alertId));
|
|
}
|
|
|
|
export interface ListAlertsOptions {
|
|
severity?: AlertSeverity[];
|
|
includeDismissed?: boolean;
|
|
includeResolved?: boolean;
|
|
}
|
|
|
|
export async function listAlertsForPort(
|
|
portId: string,
|
|
options: ListAlertsOptions = {},
|
|
): Promise<Alert[]> {
|
|
const conditions = [eq(alerts.portId, portId)];
|
|
if (!options.includeResolved) conditions.push(isNull(alerts.resolvedAt));
|
|
if (!options.includeDismissed) conditions.push(isNull(alerts.dismissedAt));
|
|
return db.query.alerts.findMany({
|
|
where: and(...conditions),
|
|
orderBy: (a, { desc }) => [desc(a.firedAt)],
|
|
limit: 100,
|
|
});
|
|
}
|