feat(alerts): rule engine, recurring evaluator, socket fanout
PR2 of Phase B. Wires the alert framework end-to-end:
- alert-rules.ts: 10 rule evaluators implemented as pure async fns over
the existing schema. Eight fire against real conditions today:
reservation.no_agreement, interest.stale, document.signer_overdue,
berth.under_offer_stalled, expense.duplicate, expense.unscanned,
interest.high_value_silent, and eoi.unsigned_long. Two are implemented
but stay inert for now: document.expiring_soon until the documents
schema gets an expires_at column, and audit.suspicious_login until the
auth layer logs 'login.failed' rows (TODO noted in the rule body).
- alert-engine.ts: runAlertEngine() walks every port × every rule and
calls reconcileAlertsForPort. Errors per (port, rule) are collected
in the summary, not thrown — one bad evaluator can't stop the sweep.
- alerts.service.ts: reconcileAlertsForPort now emits 'alert:created'
socket events on insert and 'alert:resolved' on auto-resolve;
dismissAlert emits 'alert:dismissed'. All scoped to port:{portId}
rooms.
- socket/events.ts: adds the three Server→Client alert event types.
- queue/scheduler.ts: registers 'alerts-evaluate' on the maintenance
queue with cron */5 * * * * (every 5 min, per spec risk register).
- queue/workers/maintenance.ts: dispatches 'alerts-evaluate' to
runAlertEngine; logs sweep summary.
Tests:
- tests/integration/alerts-engine.test.ts (6 cases): seeds reservation
→ fires, runs twice → no dupe, adds agreement → auto-resolves; seeds
stale interest → fires; hot lead silent → critical; engine summary
shape on no-data port. Socket emit module is vi.mocked.
Vitest 681/681 (was 675; +6). tsc clean. Lint clean.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-28 14:50:55 +02:00
|
|
|
/**
|
|
|
|
|
* Engine integration test — drives `runAlertEngineForPorts` against
|
|
|
|
|
* seeded conditions and asserts: (1) correct alerts upsert, (2) running
|
|
|
|
|
* twice doesn't duplicate, (3) mutating state auto-resolves stale alerts.
|
|
|
|
|
*
|
|
|
|
|
* Socket emissions are stubbed via vi.mock so the test stays offline.
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
import { describe, it, expect, vi, beforeEach } from 'vitest';
|
|
|
|
|
import { and, eq, isNull } from 'drizzle-orm';
|
|
|
|
|
|
|
|
|
|
vi.mock('@/lib/socket/server', () => ({
|
|
|
|
|
emitToRoom: vi.fn(),
|
|
|
|
|
}));
|
|
|
|
|
|
|
|
|
|
import { db } from '@/lib/db';
|
|
|
|
|
import { alerts } from '@/lib/db/schema/insights';
|
|
|
|
|
import { interests } from '@/lib/db/schema/interests';
|
|
|
|
|
import { berthReservations } from '@/lib/db/schema/reservations';
|
|
|
|
|
import { documents } from '@/lib/db/schema/documents';
|
|
|
|
|
import { runAlertEngineForPorts } from '@/lib/services/alert-engine';
|
|
|
|
|
import { makePort, makeClient, makeBerth, makeYacht } from '../helpers/factories';
|
|
|
|
|
|
|
|
|
|
/**
 * Deletes every row in the `alerts` table whose `portId` matches the given
 * port, so a test can start its sweep with no alerts for that port.
 *
 * @param portId Identifier of the port whose alerts are removed.
 */
async function clearAlerts(portId: string) {
  const ownedByPort = eq(alerts.portId, portId);
  await db.delete(alerts).where(ownedByPort);
}
|
|
|
|
|
|
|
|
|
|
/**
 * Fetches the alerts for one port and one rule that are still open, i.e.
 * whose `resolvedAt` column is NULL.
 *
 * @param portId Identifier of the port to filter on.
 * @param ruleId Rule identifier to filter on (e.g. 'interest.stale').
 * @returns The matching alert rows.
 */
async function listOpenAlerts(portId: string, ruleId: string) {
  const stillOpenForRule = and(
    eq(alerts.portId, portId),
    eq(alerts.ruleId, ruleId),
    isNull(alerts.resolvedAt),
  );
  return db.select().from(alerts).where(stillOpenForRule);
}
|
|
|
|
|
|
|
|
|
|
describe('alert engine', () => {
  /** Milliseconds in one 24-hour day; used to back-date seeded rows. */
  const MS_PER_DAY = 86_400_000;

  /** Returns the instant `days` x 24h before the current time. */
  const daysAgo = (days: number): Date => new Date(Date.now() - days * MS_PER_DAY);

  /**
   * Seeds a fresh port with a client, a berth, a client-owned yacht and one
   * `active` reservation whose `createdAt` is back-dated by `ageDays`.
   *
   * @param ageDays   How many days in the past the reservation was created.
   * @param yachtName Optional yacht name, forwarded through the factory's `overrides`.
   * @returns The seeded port and the inserted reservation row.
   * @throws Error if the insert returns no row, so callers never handle an
   *         undefined reservation.
   */
  async function seedAgedReservation(ageDays: number, yachtName?: string) {
    const port = await makePort();
    const client = await makeClient({ portId: port.id });
    const berth = await makeBerth({ portId: port.id });
    // Only pass `overrides` when a name was requested, so the default call
    // has exactly the same shape as a plain factory call.
    const yacht =
      yachtName === undefined
        ? await makeYacht({ portId: port.id, ownerType: 'client', ownerId: client.id })
        : await makeYacht({
            portId: port.id,
            ownerType: 'client',
            ownerId: client.id,
            overrides: { name: yachtName },
          });

    const [reservation] = await db
      .insert(berthReservations)
      .values({
        portId: port.id,
        berthId: berth.id,
        clientId: client.id,
        yachtId: yacht.id,
        status: 'active',
        startDate: new Date(),
        createdBy: 'seed',
        createdAt: daysAgo(ageDays),
      })
      .returning();
    if (!reservation) {
      throw new Error('seedAgedReservation: insert returned no reservation row');
    }
    return { port, reservation };
  }

  beforeEach(() => {
    // Reset call history on the mocked socket emitter between tests.
    vi.clearAllMocks();
  });

  it('reservation.no_agreement fires for active reservation older than 3 days without agreement', async () => {
    // Four days old: past the 3-day threshold named in the test title.
    const { port, reservation } = await seedAgedReservation(4, 'M/Y Test');

    await clearAlerts(port.id);
    await runAlertEngineForPorts([port.id]);

    const open = await listOpenAlerts(port.id, 'reservation.no_agreement');
    expect(open).toHaveLength(1);
    expect(open[0]!.entityId).toBe(reservation.id);
    expect(open[0]!.severity).toBe('warning');
  });

  it('does not duplicate on a second sweep', async () => {
    const { port } = await seedAgedReservation(10);

    await clearAlerts(port.id);
    // Two consecutive sweeps over the same unchanged condition.
    await runAlertEngineForPorts([port.id]);
    await runAlertEngineForPorts([port.id]);

    const open = await listOpenAlerts(port.id, 'reservation.no_agreement');
    expect(open).toHaveLength(1);
  });

  it('auto-resolves an open alert when the underlying condition clears', async () => {
    const { port, reservation } = await seedAgedReservation(10);

    await clearAlerts(port.id);
    await runAlertEngineForPorts([port.id]);
    expect(await listOpenAlerts(port.id, 'reservation.no_agreement')).toHaveLength(1);

    // Add an agreement document — condition no longer fires.
    await db.insert(documents).values({
      portId: port.id,
      reservationId: reservation.id,
      documentType: 'reservation_agreement',
      title: 'Reservation Agreement',
      status: 'sent',
      createdBy: 'seed',
    });
    await runAlertEngineForPorts([port.id]);

    expect(await listOpenAlerts(port.id, 'reservation.no_agreement')).toHaveLength(0);

    // The alert row must still exist with `resolvedAt` set, not be deleted.
    const allRows = await db
      .select()
      .from(alerts)
      .where(and(eq(alerts.portId, port.id), eq(alerts.ruleId, 'reservation.no_agreement')));
    expect(allRows).toHaveLength(1);
    expect(allRows[0]!.resolvedAt).not.toBeNull();
  });

  it('interest.stale fires for old leads in mid-funnel stages', async () => {
    const port = await makePort();
    const client = await makeClient({ portId: port.id });
    const stale = daysAgo(30);
    const [interest] = await db
      .insert(interests)
      .values({
        portId: port.id,
        clientId: client.id,
        pipelineStage: 'in_communication',
        dateLastContact: stale,
        createdAt: stale,
        updatedAt: stale,
      })
      .returning();

    await clearAlerts(port.id);
    await runAlertEngineForPorts([port.id]);

    const open = await listOpenAlerts(port.id, 'interest.stale');
    expect(open).toHaveLength(1);
    expect(open[0]!.entityId).toBe(interest!.id);
    expect(open[0]!.severity).toBe('info');
  });

  it('interest.high_value_silent fires for hot leads silent >7d', async () => {
    const port = await makePort();
    const client = await makeClient({ portId: port.id });
    // Ten days without contact: past the 7-day window named in the test title.
    const stale = daysAgo(10);
    await db.insert(interests).values({
      portId: port.id,
      clientId: client.id,
      pipelineStage: 'in_communication',
      leadCategory: 'hot_lead',
      dateLastContact: stale,
      updatedAt: stale,
    });

    await clearAlerts(port.id);
    await runAlertEngineForPorts([port.id]);

    const open = await listOpenAlerts(port.id, 'interest.high_value_silent');
    expect(open).toHaveLength(1);
    expect(open[0]!.severity).toBe('critical');
  });

  // NOTE(review): this covers only the summary shape on an empty port. No
  // evaluator is forced to throw here, so the error-collection path of the
  // engine is not exercised by this test.
  it('engine returns an error-free summary for a port with no data', async () => {
    const port = await makePort();
    const summary = await runAlertEngineForPorts([port.id]);
    expect(summary.portsScanned).toBe(1);
    expect(summary.rulesEvaluated).toBeGreaterThan(0);
    // No conditions seeded — no rules should fail.
    expect(summary.errors).toHaveLength(0);
  });
});
|