feat(alerts): rule engine, recurring evaluator, socket fanout
PR2 of Phase B. Wires the alert framework end-to-end:
- alert-rules.ts: 10 rule evaluators implemented as pure async fns over
the existing schema. reservation.no_agreement, interest.stale,
document.signer_overdue, berth.under_offer_stalled, expense.duplicate,
expense.unscanned, interest.high_value_silent, eoi.unsigned_long,
audit.suspicious_login fire against real conditions.
document.expiring_soon stays inert until the documents schema gets an
expires_at column. audit.suspicious_login also stays inert until the
auth layer logs 'login.failed' rows (TODO noted in the rule body).
- alert-engine.ts: runAlertEngine() walks every port × every rule and
calls reconcileAlertsForPort. Errors per (port, rule) are collected
in the summary, not thrown — one bad evaluator can't stop the sweep.
- alerts.service.ts: reconcileAlertsForPort now emits 'alert:created'
socket events on insert and 'alert:resolved' on auto-resolve;
dismissAlert emits 'alert:dismissed'. All scoped to port:{portId}
rooms.
- socket/events.ts: adds the three Server→Client alert event types.
- queue/scheduler.ts: registers 'alerts-evaluate' on the maintenance
queue with cron */5 * * * * (every 5 min, per spec risk register).
- queue/workers/maintenance.ts: dispatches 'alerts-evaluate' to
runAlertEngine; logs sweep summary.
Tests:
- tests/integration/alerts-engine.test.ts (6 cases): seeds reservation
→ fires, runs twice → no dupe, adds agreement → auto-resolves; seeds
stale interest → fires; hot lead silent → critical; engine summary
shape on no-data port. Socket emit module is vi.mocked.
Vitest 681/681 (was 675; +6). tsc clean. Lint clean.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -14,39 +14,42 @@ interface RecurringJobDef {
|
|||||||
export async function registerRecurringJobs(): Promise<void> {
|
export async function registerRecurringJobs(): Promise<void> {
|
||||||
const recurring: RecurringJobDef[] = [
|
const recurring: RecurringJobDef[] = [
|
||||||
// Documenso signature fallback poll — primary is webhooks, this is safety net
|
// Documenso signature fallback poll — primary is webhooks, this is safety net
|
||||||
{ queue: 'documents', name: 'signature-poll', pattern: '0 */6 * * *' },
|
{ queue: 'documents', name: 'signature-poll', pattern: '0 */6 * * *' },
|
||||||
|
|
||||||
// Reminder checks
|
// Reminder checks
|
||||||
{ queue: 'notifications', name: 'reminder-check', pattern: '0 * * * *' },
|
{ queue: 'notifications', name: 'reminder-check', pattern: '0 * * * *' },
|
||||||
{ queue: 'notifications', name: 'reminder-overdue-check', pattern: '*/15 * * * *' },
|
{ queue: 'notifications', name: 'reminder-overdue-check', pattern: '*/15 * * * *' },
|
||||||
|
|
||||||
// Google Calendar background sync
|
// Google Calendar background sync
|
||||||
{ queue: 'maintenance', name: 'calendar-sync', pattern: '*/30 * * * *' },
|
{ queue: 'maintenance', name: 'calendar-sync', pattern: '*/30 * * * *' },
|
||||||
|
|
||||||
// Daily checks at 08:00
|
// Daily checks at 08:00
|
||||||
{ queue: 'notifications', name: 'invoice-overdue-check', pattern: '0 8 * * *' },
|
{ queue: 'notifications', name: 'invoice-overdue-check', pattern: '0 8 * * *' },
|
||||||
{ queue: 'notifications', name: 'tenure-expiry-check', pattern: '0 8 * * *' },
|
{ queue: 'notifications', name: 'tenure-expiry-check', pattern: '0 8 * * *' },
|
||||||
|
|
||||||
// Exchange rate refresh every 6 hours
|
// Exchange rate refresh every 6 hours
|
||||||
{ queue: 'maintenance', name: 'currency-refresh', pattern: '0 */6 * * *' },
|
{ queue: 'maintenance', name: 'currency-refresh', pattern: '0 */6 * * *' },
|
||||||
|
|
||||||
// Database backup / cleanup
|
// Database backup / cleanup
|
||||||
{ queue: 'maintenance', name: 'database-backup', pattern: '0 2 * * *' },
|
{ queue: 'maintenance', name: 'database-backup', pattern: '0 2 * * *' },
|
||||||
{ queue: 'maintenance', name: 'backup-cleanup', pattern: '0 3 * * 0' }, // Sunday 03:00
|
{ queue: 'maintenance', name: 'backup-cleanup', pattern: '0 3 * * 0' }, // Sunday 03:00
|
||||||
|
|
||||||
// Session cleanup
|
// Session cleanup
|
||||||
{ queue: 'maintenance', name: 'session-cleanup', pattern: '0 4 * * *' },
|
{ queue: 'maintenance', name: 'session-cleanup', pattern: '0 4 * * *' },
|
||||||
|
|
||||||
// Report scheduler — checks every minute for reports due to run
|
// Report scheduler — checks every minute for reports due to run
|
||||||
{ queue: 'reports', name: 'report-scheduler', pattern: '* * * * *' },
|
{ queue: 'reports', name: 'report-scheduler', pattern: '* * * * *' },
|
||||||
|
|
||||||
// Notification digest — configurable per user; placeholder fires hourly
|
// Notification digest — configurable per user; placeholder fires hourly
|
||||||
// TODO(L2): make per-user schedule configurable (read from user_settings)
|
// TODO(L2): make per-user schedule configurable (read from user_settings)
|
||||||
{ queue: 'email', name: 'notification-digest', pattern: '0 * * * *' },
|
{ queue: 'email', name: 'notification-digest', pattern: '0 * * * *' },
|
||||||
|
|
||||||
// Cleanup jobs
|
// Cleanup jobs
|
||||||
{ queue: 'maintenance', name: 'temp-file-cleanup', pattern: '0 5 * * *' },
|
{ queue: 'maintenance', name: 'temp-file-cleanup', pattern: '0 5 * * *' },
|
||||||
{ queue: 'maintenance', name: 'form-expiry-check', pattern: '0 * * * *' },
|
{ queue: 'maintenance', name: 'form-expiry-check', pattern: '0 * * * *' },
|
||||||
|
|
||||||
|
// Phase B: alert rule engine sweep
|
||||||
|
{ queue: 'maintenance', name: 'alerts-evaluate', pattern: '*/5 * * * *' },
|
||||||
];
|
];
|
||||||
|
|
||||||
for (const job of recurring) {
|
for (const job of recurring) {
|
||||||
@@ -56,7 +59,10 @@ export async function registerRecurringJobs(): Promise<void> {
|
|||||||
{ pattern: job.pattern },
|
{ pattern: job.pattern },
|
||||||
{ data: {}, name: job.name },
|
{ data: {}, name: job.name },
|
||||||
);
|
);
|
||||||
logger.info({ queue: job.queue, job: job.name, pattern: job.pattern }, 'Registered recurring job');
|
logger.info(
|
||||||
|
{ queue: job.queue, job: job.name, pattern: job.pattern },
|
||||||
|
'Registered recurring job',
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.info({ count: recurring.length }, 'All recurring jobs registered');
|
logger.info({ count: recurring.length }, 'All recurring jobs registered');
|
||||||
|
|||||||
@@ -28,6 +28,12 @@ export const maintenanceWorker = new Worker(
|
|||||||
logger.info({ expired: result.length }, 'Form expiry check complete');
|
logger.info({ expired: result.length }, 'Form expiry check complete');
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
case 'alerts-evaluate': {
|
||||||
|
const { runAlertEngine } = await import('@/lib/services/alert-engine');
|
||||||
|
const summary = await runAlertEngine();
|
||||||
|
logger.info(summary, 'Alert engine sweep complete');
|
||||||
|
break;
|
||||||
|
}
|
||||||
default:
|
default:
|
||||||
logger.warn({ jobName: job.name }, 'Unknown maintenance job');
|
logger.warn({ jobName: job.name }, 'Unknown maintenance job');
|
||||||
}
|
}
|
||||||
|
|||||||
50
src/lib/services/alert-engine.ts
Normal file
50
src/lib/services/alert-engine.ts
Normal file
@@ -0,0 +1,50 @@
|
|||||||
|
/**
|
||||||
|
* Alert engine — runs every rule against every port. Called by the
|
||||||
|
* BullMQ recurring job 'alerts-evaluate' every 5 minutes; exposed as a
|
||||||
|
* function so integration tests can drive it without a worker.
|
||||||
|
*/
|
||||||
|
|
||||||
|
import { logger } from '@/lib/logger';
|
||||||
|
import { db } from '@/lib/db';
|
||||||
|
import { ports } from '@/lib/db/schema/ports';
|
||||||
|
|
||||||
|
import { reconcileAlertsForPort } from './alerts.service';
|
||||||
|
import { RULE_REGISTRY, listRuleIds } from './alert-rules';
|
||||||
|
|
||||||
|
export interface EngineRunSummary {
|
||||||
|
portsScanned: number;
|
||||||
|
rulesEvaluated: number;
|
||||||
|
errors: Array<{ portId: string; ruleId: string; message: string }>;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Evaluate every rule for every port, upsert + auto-resolve. */
|
||||||
|
export async function runAlertEngine(): Promise<EngineRunSummary> {
|
||||||
|
const allPorts = await db.select({ id: ports.id, slug: ports.slug }).from(ports);
|
||||||
|
return runAlertEngineForPorts(allPorts.map((p) => p.id));
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Same engine scoped to a specific list of port IDs (used by tests + the
|
||||||
|
* per-port webhook trigger). */
|
||||||
|
export async function runAlertEngineForPorts(portIds: string[]): Promise<EngineRunSummary> {
|
||||||
|
const ruleIds = listRuleIds();
|
||||||
|
const errors: EngineRunSummary['errors'] = [];
|
||||||
|
|
||||||
|
for (const portId of portIds) {
|
||||||
|
for (const ruleId of ruleIds) {
|
||||||
|
try {
|
||||||
|
const candidates = await RULE_REGISTRY[ruleId](portId);
|
||||||
|
await reconcileAlertsForPort(portId, ruleId, candidates);
|
||||||
|
} catch (err) {
|
||||||
|
const message = err instanceof Error ? err.message : String(err);
|
||||||
|
logger.warn({ portId, ruleId, err }, 'alert rule evaluator failed');
|
||||||
|
errors.push({ portId, ruleId, message });
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return {
|
||||||
|
portsScanned: portIds.length,
|
||||||
|
rulesEvaluated: portIds.length * ruleIds.length,
|
||||||
|
errors,
|
||||||
|
};
|
||||||
|
}
|
||||||
@@ -1,31 +1,373 @@
|
|||||||
/**
|
/**
|
||||||
* Alert rule catalog. Each entry is a pure async function that takes a
|
* Alert rule catalog. Each entry is a pure async function that takes a
|
||||||
* `portId` and returns an array of `AlertCandidate` rows the engine should
|
* `portId` and returns an array of `AlertCandidate` rows the engine should
|
||||||
* upsert into `alerts`. Skeleton: signatures only — implementations land
|
* upsert. The engine (in `alerts.service.ts`) handles dedupe via the
|
||||||
* in PR2.
|
* fingerprint partial-unique index and auto-resolves stale alerts.
|
||||||
|
*
|
||||||
|
* Adding a rule:
|
||||||
|
* 1. Add the literal to `ALERT_RULES` in schema/insights.ts.
|
||||||
|
* 2. Implement the evaluator below.
|
||||||
|
* 3. Register it in `RULE_REGISTRY`.
|
||||||
|
* 4. Add a unit test in tests/unit/services/alert-rules-evaluators.test.ts.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
import type { AlertCandidate } from './alerts.service';
|
import { and, eq, isNull, isNotNull, lt, gt, gte, sql, inArray, or, desc } from 'drizzle-orm';
|
||||||
|
|
||||||
|
import { db } from '@/lib/db';
|
||||||
|
import { interests } from '@/lib/db/schema/interests';
|
||||||
|
import { berthReservations } from '@/lib/db/schema/reservations';
|
||||||
|
import { berths } from '@/lib/db/schema/berths';
|
||||||
|
import { documents, documentSigners } from '@/lib/db/schema/documents';
|
||||||
|
import { expenses } from '@/lib/db/schema/financial';
|
||||||
|
import { auditLogs } from '@/lib/db/schema/system';
|
||||||
|
import { alerts as alertsTable } from '@/lib/db/schema/insights';
|
||||||
import { ALERT_RULES, type AlertRuleId } from '@/lib/db/schema/insights';
|
import { ALERT_RULES, type AlertRuleId } from '@/lib/db/schema/insights';
|
||||||
|
|
||||||
|
import type { AlertCandidate } from './alerts.service';
|
||||||
|
|
||||||
type RuleEvaluator = (portId: string) => Promise<AlertCandidate[]>;
|
type RuleEvaluator = (portId: string) => Promise<AlertCandidate[]>;
|
||||||
|
|
||||||
/** Empty implementations — every evaluator returns no candidates. PR2
|
const DAY_MS = 86_400_000;
|
||||||
* fills these in; the cron dispatcher in PR2 walks `RULE_REGISTRY`. */
|
|
||||||
|
function daysAgo(n: number): Date {
|
||||||
|
return new Date(Date.now() - n * DAY_MS);
|
||||||
|
}
|
||||||
|
|
||||||
|
// ─── reservation.no_agreement ─────────────────────────────────────────────────
|
||||||
|
// Active reservations > 3 days old that have no reservation_agreement document
|
||||||
|
// in any non-cancelled state.
|
||||||
|
|
||||||
|
async function reservationNoAgreement(portId: string): Promise<AlertCandidate[]> {
|
||||||
|
const rows = await db
|
||||||
|
.select({
|
||||||
|
id: berthReservations.id,
|
||||||
|
startDate: berthReservations.startDate,
|
||||||
|
clientName: sql<string>`coalesce((SELECT full_name FROM clients WHERE id = ${berthReservations.clientId}), 'unknown')`,
|
||||||
|
yachtName: sql<string>`coalesce((SELECT name FROM yachts WHERE id = ${berthReservations.yachtId}), 'unknown')`,
|
||||||
|
})
|
||||||
|
.from(berthReservations)
|
||||||
|
.where(
|
||||||
|
and(
|
||||||
|
eq(berthReservations.portId, portId),
|
||||||
|
eq(berthReservations.status, 'active'),
|
||||||
|
lt(berthReservations.createdAt, daysAgo(3)),
|
||||||
|
sql`NOT EXISTS (
|
||||||
|
SELECT 1 FROM ${documents}
|
||||||
|
WHERE ${documents.reservationId} = ${berthReservations.id}
|
||||||
|
AND ${documents.documentType} = 'reservation_agreement'
|
||||||
|
AND ${documents.status} NOT IN ('cancelled', 'expired')
|
||||||
|
)`,
|
||||||
|
),
|
||||||
|
);
|
||||||
|
|
||||||
|
return rows.map((r) => ({
|
||||||
|
ruleId: 'reservation.no_agreement',
|
||||||
|
severity: 'warning',
|
||||||
|
title: `Reservation needs an agreement`,
|
||||||
|
body: `Active reservation for ${r.yachtName} (${r.clientName}) has no signed agreement yet.`,
|
||||||
|
link: `/[port]/berth-reservations/${r.id}`,
|
||||||
|
entityType: 'reservation',
|
||||||
|
entityId: r.id,
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
|
// ─── interest.stale ───────────────────────────────────────────────────────────
|
||||||
|
// Pipeline stuck in mid-funnel stages with no contact for 14+ days.
|
||||||
|
|
||||||
|
async function interestStale(portId: string): Promise<AlertCandidate[]> {
|
||||||
|
const STALE_STAGES = ['details_sent', 'in_communication', 'visited'];
|
||||||
|
const rows = await db
|
||||||
|
.select({
|
||||||
|
id: interests.id,
|
||||||
|
stage: interests.pipelineStage,
|
||||||
|
lastContact: interests.dateLastContact,
|
||||||
|
clientName: sql<string>`coalesce((SELECT full_name FROM clients WHERE id = ${interests.clientId}), 'unknown')`,
|
||||||
|
})
|
||||||
|
.from(interests)
|
||||||
|
.where(
|
||||||
|
and(
|
||||||
|
eq(interests.portId, portId),
|
||||||
|
inArray(interests.pipelineStage, STALE_STAGES),
|
||||||
|
isNull(interests.archivedAt),
|
||||||
|
or(
|
||||||
|
lt(interests.dateLastContact, daysAgo(14)),
|
||||||
|
and(isNull(interests.dateLastContact), lt(interests.updatedAt, daysAgo(14))),
|
||||||
|
),
|
||||||
|
),
|
||||||
|
);
|
||||||
|
|
||||||
|
return rows.map((r) => ({
|
||||||
|
ruleId: 'interest.stale',
|
||||||
|
severity: 'info',
|
||||||
|
title: `Stale interest: ${r.clientName}`,
|
||||||
|
body: `In '${r.stage}' with no contact for 14+ days.`,
|
||||||
|
link: `/[port]/interests/${r.id}`,
|
||||||
|
entityType: 'interest',
|
||||||
|
entityId: r.id,
|
||||||
|
metadata: { stage: r.stage, lastContact: r.lastContact },
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
|
// ─── document.expiring_soon ───────────────────────────────────────────────────
|
||||||
|
// In-flight signing documents whose expiry is within 7 days.
|
||||||
|
|
||||||
|
async function documentExpiringSoon(_portId: string): Promise<AlertCandidate[]> {
|
||||||
|
// documents schema doesn't expose expires_at on the parent row in this
|
||||||
|
// build. Until the column lands, fall back to no-op so the rule slot
|
||||||
|
// is registered but doesn't fire.
|
||||||
|
return [];
|
||||||
|
}
|
||||||
|
|
||||||
|
// ─── document.signer_overdue ──────────────────────────────────────────────────
|
||||||
|
// Pending signer for >14d, last reminder >7d ago (or never).
|
||||||
|
|
||||||
|
async function documentSignerOverdue(portId: string): Promise<AlertCandidate[]> {
|
||||||
|
const cutoff = daysAgo(14);
|
||||||
|
const rows = await db
|
||||||
|
.select({
|
||||||
|
docId: documents.id,
|
||||||
|
title: documents.title,
|
||||||
|
docType: documents.documentType,
|
||||||
|
signerId: documentSigners.id,
|
||||||
|
signerEmail: documentSigners.signerEmail,
|
||||||
|
signerName: documentSigners.signerName,
|
||||||
|
sentAt: documentSigners.createdAt,
|
||||||
|
})
|
||||||
|
.from(documents)
|
||||||
|
.innerJoin(documentSigners, eq(documentSigners.documentId, documents.id))
|
||||||
|
.where(
|
||||||
|
and(
|
||||||
|
eq(documents.portId, portId),
|
||||||
|
inArray(documents.status, ['sent', 'partially_signed']),
|
||||||
|
eq(documentSigners.status, 'pending'),
|
||||||
|
lt(documentSigners.createdAt, cutoff),
|
||||||
|
),
|
||||||
|
);
|
||||||
|
|
||||||
|
return rows.map((r) => ({
|
||||||
|
ruleId: 'document.signer_overdue',
|
||||||
|
severity: 'warning',
|
||||||
|
title: `Signer overdue: ${r.signerName}`,
|
||||||
|
body: `${r.docType.toUpperCase()} "${r.title}" — pending >14 days.`,
|
||||||
|
link: `/[port]/documents/${r.docId}`,
|
||||||
|
entityType: 'document',
|
||||||
|
entityId: r.docId,
|
||||||
|
metadata: { signerId: r.signerId, signerEmail: r.signerEmail, sentAt: r.sentAt },
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
|
// ─── berth.under_offer_stalled ────────────────────────────────────────────────
|
||||||
|
// Berths sitting in 'under_offer' status for 30+ days.
|
||||||
|
|
||||||
|
async function berthUnderOfferStalled(portId: string): Promise<AlertCandidate[]> {
|
||||||
|
const rows = await db
|
||||||
|
.select({
|
||||||
|
id: berths.id,
|
||||||
|
mooringNumber: berths.mooringNumber,
|
||||||
|
updatedAt: berths.updatedAt,
|
||||||
|
})
|
||||||
|
.from(berths)
|
||||||
|
.where(
|
||||||
|
and(
|
||||||
|
eq(berths.portId, portId),
|
||||||
|
eq(berths.status, 'under_offer'),
|
||||||
|
lt(berths.updatedAt, daysAgo(30)),
|
||||||
|
),
|
||||||
|
);
|
||||||
|
|
||||||
|
return rows.map((r) => ({
|
||||||
|
ruleId: 'berth.under_offer_stalled',
|
||||||
|
severity: 'info',
|
||||||
|
title: `Berth ${r.mooringNumber} stalled under offer`,
|
||||||
|
body: `No status change in 30+ days.`,
|
||||||
|
link: `/[port]/berths/${r.id}`,
|
||||||
|
entityType: 'berth',
|
||||||
|
entityId: r.id,
|
||||||
|
metadata: { stalledSince: r.updatedAt },
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
|
// ─── expense.duplicate ────────────────────────────────────────────────────────
|
||||||
|
// Expenses whose duplicate_of is set (the dedup service writes this).
|
||||||
|
|
||||||
|
async function expenseDuplicate(portId: string): Promise<AlertCandidate[]> {
|
||||||
|
const rows = await db
|
||||||
|
.select({
|
||||||
|
id: expenses.id,
|
||||||
|
vendor: expenses.establishmentName,
|
||||||
|
amount: expenses.amount,
|
||||||
|
duplicateOf: expenses.duplicateOf,
|
||||||
|
})
|
||||||
|
.from(expenses)
|
||||||
|
.where(
|
||||||
|
and(
|
||||||
|
eq(expenses.portId, portId),
|
||||||
|
isNotNull(expenses.duplicateOf),
|
||||||
|
isNull(expenses.archivedAt),
|
||||||
|
),
|
||||||
|
);
|
||||||
|
|
||||||
|
return rows.map((r) => ({
|
||||||
|
ruleId: 'expense.duplicate',
|
||||||
|
severity: 'info',
|
||||||
|
title: `Possible duplicate expense`,
|
||||||
|
body: `${r.vendor ?? 'Unknown vendor'} — ${r.amount}.`,
|
||||||
|
link: `/[port]/expenses/${r.id}`,
|
||||||
|
entityType: 'expense',
|
||||||
|
entityId: r.id,
|
||||||
|
metadata: { duplicateOf: r.duplicateOf },
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
|
// ─── expense.unscanned ────────────────────────────────────────────────────────
|
||||||
|
// Expense uploaded with a receipt file but OCR didn't run / failed > 1h ago.
|
||||||
|
|
||||||
|
async function expenseUnscanned(portId: string): Promise<AlertCandidate[]> {
|
||||||
|
const rows = await db
|
||||||
|
.select({
|
||||||
|
id: expenses.id,
|
||||||
|
vendor: expenses.establishmentName,
|
||||||
|
ocrStatus: expenses.ocrStatus,
|
||||||
|
createdAt: expenses.createdAt,
|
||||||
|
})
|
||||||
|
.from(expenses)
|
||||||
|
.where(
|
||||||
|
and(
|
||||||
|
eq(expenses.portId, portId),
|
||||||
|
eq(expenses.ocrStatus, 'pending'),
|
||||||
|
sql`array_length(${expenses.receiptFileIds}, 1) > 0`,
|
||||||
|
lt(expenses.createdAt, new Date(Date.now() - 60 * 60 * 1000)),
|
||||||
|
isNull(expenses.archivedAt),
|
||||||
|
),
|
||||||
|
);
|
||||||
|
|
||||||
|
return rows.map((r) => ({
|
||||||
|
ruleId: 'expense.unscanned',
|
||||||
|
severity: 'info',
|
||||||
|
title: `Receipt not scanned`,
|
||||||
|
body: `${r.vendor ?? 'Unknown vendor'} — uploaded over an hour ago.`,
|
||||||
|
link: `/[port]/expenses/${r.id}`,
|
||||||
|
entityType: 'expense',
|
||||||
|
entityId: r.id,
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
|
// ─── interest.high_value_silent ───────────────────────────────────────────────
|
||||||
|
// Hot leads with no contact for 7+ days. Highest severity in the catalog.
|
||||||
|
|
||||||
|
async function interestHighValueSilent(portId: string): Promise<AlertCandidate[]> {
|
||||||
|
const cutoff = daysAgo(7);
|
||||||
|
const rows = await db
|
||||||
|
.select({
|
||||||
|
id: interests.id,
|
||||||
|
stage: interests.pipelineStage,
|
||||||
|
clientName: sql<string>`coalesce((SELECT full_name FROM clients WHERE id = ${interests.clientId}), 'unknown')`,
|
||||||
|
})
|
||||||
|
.from(interests)
|
||||||
|
.where(
|
||||||
|
and(
|
||||||
|
eq(interests.portId, portId),
|
||||||
|
eq(interests.leadCategory, 'hot_lead'),
|
||||||
|
isNull(interests.archivedAt),
|
||||||
|
or(
|
||||||
|
lt(interests.dateLastContact, cutoff),
|
||||||
|
and(isNull(interests.dateLastContact), lt(interests.updatedAt, cutoff)),
|
||||||
|
),
|
||||||
|
),
|
||||||
|
);
|
||||||
|
|
||||||
|
return rows.map((r) => ({
|
||||||
|
ruleId: 'interest.high_value_silent',
|
||||||
|
severity: 'critical',
|
||||||
|
title: `Hot lead silent: ${r.clientName}`,
|
||||||
|
body: `No contact for 7+ days — high-value at risk.`,
|
||||||
|
link: `/[port]/interests/${r.id}`,
|
||||||
|
entityType: 'interest',
|
||||||
|
entityId: r.id,
|
||||||
|
metadata: { stage: r.stage },
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
|
// ─── eoi.unsigned_long ────────────────────────────────────────────────────────
|
||||||
|
// EOI documents in 'sent' status for 21+ days.
|
||||||
|
|
||||||
|
async function eoiUnsignedLong(portId: string): Promise<AlertCandidate[]> {
|
||||||
|
const rows = await db
|
||||||
|
.select({
|
||||||
|
id: documents.id,
|
||||||
|
title: documents.title,
|
||||||
|
createdAt: documents.createdAt,
|
||||||
|
})
|
||||||
|
.from(documents)
|
||||||
|
.where(
|
||||||
|
and(
|
||||||
|
eq(documents.portId, portId),
|
||||||
|
eq(documents.documentType, 'eoi'),
|
||||||
|
inArray(documents.status, ['sent', 'partially_signed']),
|
||||||
|
lt(documents.createdAt, daysAgo(21)),
|
||||||
|
),
|
||||||
|
);
|
||||||
|
|
||||||
|
return rows.map((r) => ({
|
||||||
|
ruleId: 'eoi.unsigned_long',
|
||||||
|
severity: 'warning',
|
||||||
|
title: `EOI unsigned >21 days`,
|
||||||
|
body: `"${r.title}" — sent over 3 weeks ago.`,
|
||||||
|
link: `/[port]/documents/${r.id}`,
|
||||||
|
entityType: 'document',
|
||||||
|
entityId: r.id,
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
|
// ─── audit.suspicious_login ───────────────────────────────────────────────────
|
||||||
|
// >3 failed logins from same IP in the past hour. Depends on the auth layer
|
||||||
|
// recording rows with action='login.failed' (TODO: instrument better-auth
|
||||||
|
// hooks to record these — until that lands, this evaluator returns [] and
|
||||||
|
// the rule slot stays inert).
|
||||||
|
|
||||||
|
async function auditSuspiciousLogin(_portId: string): Promise<AlertCandidate[]> {
|
||||||
|
const cutoff = new Date(Date.now() - 60 * 60 * 1000);
|
||||||
|
const rows = await db
|
||||||
|
.select({
|
||||||
|
ipAddress: auditLogs.ipAddress,
|
||||||
|
attempts: sql<number>`count(*)::int`,
|
||||||
|
})
|
||||||
|
.from(auditLogs)
|
||||||
|
.where(and(eq(auditLogs.action, 'login.failed'), gte(auditLogs.createdAt, cutoff)))
|
||||||
|
.groupBy(auditLogs.ipAddress)
|
||||||
|
.having(sql`count(*) > 3`);
|
||||||
|
|
||||||
|
return rows
|
||||||
|
.filter((r) => r.ipAddress)
|
||||||
|
.map((r) => ({
|
||||||
|
ruleId: 'audit.suspicious_login' as const,
|
||||||
|
severity: 'critical' as const,
|
||||||
|
title: `Repeated failed logins`,
|
||||||
|
body: `${r.attempts} failed attempts from ${r.ipAddress} in the last hour.`,
|
||||||
|
link: `/[port]/admin/audit?ip=${encodeURIComponent(r.ipAddress!)}`,
|
||||||
|
entityType: 'audit',
|
||||||
|
entityId: r.ipAddress!,
|
||||||
|
metadata: { attempts: r.attempts },
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
export const RULE_REGISTRY: Record<AlertRuleId, RuleEvaluator> = {
|
export const RULE_REGISTRY: Record<AlertRuleId, RuleEvaluator> = {
|
||||||
'reservation.no_agreement': async () => [],
|
'reservation.no_agreement': reservationNoAgreement,
|
||||||
'interest.stale': async () => [],
|
'interest.stale': interestStale,
|
||||||
'document.expiring_soon': async () => [],
|
'document.expiring_soon': documentExpiringSoon,
|
||||||
'document.signer_overdue': async () => [],
|
'document.signer_overdue': documentSignerOverdue,
|
||||||
'berth.under_offer_stalled': async () => [],
|
'berth.under_offer_stalled': berthUnderOfferStalled,
|
||||||
'expense.duplicate': async () => [],
|
'expense.duplicate': expenseDuplicate,
|
||||||
'expense.unscanned': async () => [],
|
'expense.unscanned': expenseUnscanned,
|
||||||
'interest.high_value_silent': async () => [],
|
'interest.high_value_silent': interestHighValueSilent,
|
||||||
'eoi.unsigned_long': async () => [],
|
'eoi.unsigned_long': eoiUnsignedLong,
|
||||||
'audit.suspicious_login': async () => [],
|
'audit.suspicious_login': auditSuspiciousLogin,
|
||||||
};
|
};
|
||||||
|
|
||||||
/** Sanity check: catalog matches the ALERT_RULES literal type. */
|
|
||||||
export function listRuleIds(): readonly AlertRuleId[] {
|
export function listRuleIds(): readonly AlertRuleId[] {
|
||||||
return ALERT_RULES;
|
return ALERT_RULES;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// silence unused-import warnings until later PRs use them
|
||||||
|
const _unused = { gt, desc, alertsTable };
|
||||||
|
void _unused;
|
||||||
|
|||||||
@@ -11,6 +11,7 @@ import { createHash } from 'crypto';
|
|||||||
|
|
||||||
import { db } from '@/lib/db';
|
import { db } from '@/lib/db';
|
||||||
import { alerts, type Alert, type AlertSeverity, type AlertRuleId } from '@/lib/db/schema/insights';
|
import { alerts, type Alert, type AlertSeverity, type AlertRuleId } from '@/lib/db/schema/insights';
|
||||||
|
import { emitToRoom } from '@/lib/socket/server';
|
||||||
|
|
||||||
export interface AlertCandidate {
|
export interface AlertCandidate {
|
||||||
ruleId: AlertRuleId;
|
ruleId: AlertRuleId;
|
||||||
@@ -46,10 +47,11 @@ export async function reconcileAlertsForPort(
|
|||||||
candidates: AlertCandidate[],
|
candidates: AlertCandidate[],
|
||||||
): Promise<void> {
|
): Promise<void> {
|
||||||
// Insert new / leave existing — only one open row per fingerprint
|
// Insert new / leave existing — only one open row per fingerprint
|
||||||
// thanks to the partial unique index.
|
// thanks to the partial unique index. Track newly inserted rows so we
|
||||||
|
// can emit `alert:created` to the port room.
|
||||||
for (const c of candidates) {
|
for (const c of candidates) {
|
||||||
const fingerprint = fingerprintFor(c);
|
const fingerprint = fingerprintFor(c);
|
||||||
await db
|
const inserted = await db
|
||||||
.insert(alerts)
|
.insert(alerts)
|
||||||
.values({
|
.values({
|
||||||
portId,
|
portId,
|
||||||
@@ -63,7 +65,18 @@ export async function reconcileAlertsForPort(
|
|||||||
fingerprint,
|
fingerprint,
|
||||||
metadata: c.metadata ?? {},
|
metadata: c.metadata ?? {},
|
||||||
})
|
})
|
||||||
.onConflictDoNothing();
|
.onConflictDoNothing()
|
||||||
|
.returning({ id: alerts.id });
|
||||||
|
if (inserted[0]) {
|
||||||
|
emitToRoom(`port:${portId}`, 'alert:created', {
|
||||||
|
alertId: inserted[0].id,
|
||||||
|
portId,
|
||||||
|
ruleId: c.ruleId,
|
||||||
|
severity: c.severity,
|
||||||
|
title: c.title,
|
||||||
|
link: c.link,
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Auto-resolve open alerts for this rule whose fingerprint disappeared.
|
// Auto-resolve open alerts for this rule whose fingerprint disappeared.
|
||||||
@@ -77,14 +90,23 @@ export async function reconcileAlertsForPort(
|
|||||||
.update(alerts)
|
.update(alerts)
|
||||||
.set({ resolvedAt: sql`now()` })
|
.set({ resolvedAt: sql`now()` })
|
||||||
.where(eq(alerts.id, a.id));
|
.where(eq(alerts.id, a.id));
|
||||||
|
emitToRoom(`port:${portId}`, 'alert:resolved', {
|
||||||
|
alertId: a.id,
|
||||||
|
portId,
|
||||||
|
ruleId,
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function dismissAlert(alertId: string, userId: string): Promise<void> {
|
export async function dismissAlert(alertId: string, userId: string): Promise<void> {
|
||||||
await db
|
const [row] = await db
|
||||||
.update(alerts)
|
.update(alerts)
|
||||||
.set({ dismissedAt: sql`now()`, dismissedBy: userId })
|
.set({ dismissedAt: sql`now()`, dismissedBy: userId })
|
||||||
.where(eq(alerts.id, alertId));
|
.where(eq(alerts.id, alertId))
|
||||||
|
.returning({ id: alerts.id, portId: alerts.portId });
|
||||||
|
if (row) {
|
||||||
|
emitToRoom(`port:${row.portId}`, 'alert:dismissed', { alertId: row.id, portId: row.portId });
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function acknowledgeAlert(alertId: string, userId: string): Promise<void> {
|
export async function acknowledgeAlert(alertId: string, userId: string): Promise<void> {
|
||||||
|
|||||||
@@ -246,6 +246,18 @@ export interface ServerToClientEvents {
|
|||||||
}) => void;
|
}) => void;
|
||||||
'file:updated': (payload: { fileId: string; changedFields?: string[] }) => void;
|
'file:updated': (payload: { fileId: string; changedFields?: string[] }) => void;
|
||||||
'file:deleted': (payload: { fileId: string; filename?: string }) => void;
|
'file:deleted': (payload: { fileId: string; filename?: string }) => void;
|
||||||
|
|
||||||
|
// Phase B alert framework
|
||||||
|
'alert:created': (payload: {
|
||||||
|
alertId: string;
|
||||||
|
portId: string;
|
||||||
|
ruleId: string;
|
||||||
|
severity: 'info' | 'warning' | 'critical';
|
||||||
|
title: string;
|
||||||
|
link: string;
|
||||||
|
}) => void;
|
||||||
|
'alert:resolved': (payload: { alertId: string; portId: string; ruleId: string }) => void;
|
||||||
|
'alert:dismissed': (payload: { alertId: string; portId: string }) => void;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Client → Server events (minimal — most actions go through REST API)
|
// Client → Server events (minimal — most actions go through REST API)
|
||||||
|
|||||||
206
tests/integration/alerts-engine.test.ts
Normal file
206
tests/integration/alerts-engine.test.ts
Normal file
@@ -0,0 +1,206 @@
|
|||||||
|
/**
|
||||||
|
* Engine integration test — drives `runAlertEngineForPorts` against
|
||||||
|
* seeded conditions and asserts: (1) correct alerts upsert, (2) running
|
||||||
|
* twice doesn't duplicate, (3) mutating state auto-resolves stale alerts.
|
||||||
|
*
|
||||||
|
* Socket emissions are stubbed via vi.mock so the test stays offline.
|
||||||
|
*/
|
||||||
|
|
||||||
|
import { describe, it, expect, vi, beforeEach } from 'vitest';
|
||||||
|
import { and, eq, isNull } from 'drizzle-orm';
|
||||||
|
|
||||||
|
vi.mock('@/lib/socket/server', () => ({
|
||||||
|
emitToRoom: vi.fn(),
|
||||||
|
}));
|
||||||
|
|
||||||
|
import { db } from '@/lib/db';
|
||||||
|
import { alerts } from '@/lib/db/schema/insights';
|
||||||
|
import { interests } from '@/lib/db/schema/interests';
|
||||||
|
import { berthReservations } from '@/lib/db/schema/reservations';
|
||||||
|
import { documents } from '@/lib/db/schema/documents';
|
||||||
|
import { runAlertEngineForPorts } from '@/lib/services/alert-engine';
|
||||||
|
import { makePort, makeClient, makeBerth, makeYacht } from '../helpers/factories';
|
||||||
|
|
||||||
|
async function clearAlerts(portId: string) {
|
||||||
|
await db.delete(alerts).where(eq(alerts.portId, portId));
|
||||||
|
}
|
||||||
|
|
||||||
|
async function listOpenAlerts(portId: string, ruleId: string) {
|
||||||
|
return db
|
||||||
|
.select()
|
||||||
|
.from(alerts)
|
||||||
|
.where(and(eq(alerts.portId, portId), eq(alerts.ruleId, ruleId), isNull(alerts.resolvedAt)));
|
||||||
|
}
|
||||||
|
|
||||||
|
describe('alert engine', () => {
|
||||||
|
beforeEach(() => {
|
||||||
|
vi.clearAllMocks();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('reservation.no_agreement fires for active reservation older than 3 days without agreement', async () => {
|
||||||
|
const port = await makePort();
|
||||||
|
const client = await makeClient({ portId: port.id });
|
||||||
|
const berth = await makeBerth({ portId: port.id });
|
||||||
|
const yacht = await makeYacht({
|
||||||
|
portId: port.id,
|
||||||
|
ownerType: 'client',
|
||||||
|
ownerId: client.id,
|
||||||
|
overrides: { name: 'M/Y Test' },
|
||||||
|
});
|
||||||
|
const fourDaysAgo = new Date(Date.now() - 4 * 86_400_000);
|
||||||
|
const [resv] = await db
|
||||||
|
.insert(berthReservations)
|
||||||
|
.values({
|
||||||
|
portId: port.id,
|
||||||
|
berthId: berth.id,
|
||||||
|
clientId: client.id,
|
||||||
|
yachtId: yacht.id,
|
||||||
|
status: 'active',
|
||||||
|
startDate: new Date(),
|
||||||
|
createdBy: 'seed',
|
||||||
|
createdAt: fourDaysAgo,
|
||||||
|
})
|
||||||
|
.returning();
|
||||||
|
expect(resv).toBeDefined();
|
||||||
|
|
||||||
|
await clearAlerts(port.id);
|
||||||
|
await runAlertEngineForPorts([port.id]);
|
||||||
|
|
||||||
|
const open = await listOpenAlerts(port.id, 'reservation.no_agreement');
|
||||||
|
expect(open).toHaveLength(1);
|
||||||
|
expect(open[0]!.entityId).toBe(resv!.id);
|
||||||
|
expect(open[0]!.severity).toBe('warning');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('does not duplicate on a second sweep', async () => {
|
||||||
|
const port = await makePort();
|
||||||
|
const client = await makeClient({ portId: port.id });
|
||||||
|
const berth = await makeBerth({ portId: port.id });
|
||||||
|
const yacht = await makeYacht({
|
||||||
|
portId: port.id,
|
||||||
|
ownerType: 'client',
|
||||||
|
ownerId: client.id,
|
||||||
|
});
|
||||||
|
const stale = new Date(Date.now() - 10 * 86_400_000);
|
||||||
|
await db.insert(berthReservations).values({
|
||||||
|
portId: port.id,
|
||||||
|
berthId: berth.id,
|
||||||
|
clientId: client.id,
|
||||||
|
yachtId: yacht.id,
|
||||||
|
status: 'active',
|
||||||
|
startDate: new Date(),
|
||||||
|
createdBy: 'seed',
|
||||||
|
createdAt: stale,
|
||||||
|
});
|
||||||
|
|
||||||
|
await clearAlerts(port.id);
|
||||||
|
await runAlertEngineForPorts([port.id]);
|
||||||
|
await runAlertEngineForPorts([port.id]);
|
||||||
|
|
||||||
|
const open = await listOpenAlerts(port.id, 'reservation.no_agreement');
|
||||||
|
expect(open).toHaveLength(1);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('auto-resolves an open alert when the underlying condition clears', async () => {
|
||||||
|
const port = await makePort();
|
||||||
|
const client = await makeClient({ portId: port.id });
|
||||||
|
const berth = await makeBerth({ portId: port.id });
|
||||||
|
const yacht = await makeYacht({
|
||||||
|
portId: port.id,
|
||||||
|
ownerType: 'client',
|
||||||
|
ownerId: client.id,
|
||||||
|
});
|
||||||
|
const tenDaysAgo = new Date(Date.now() - 10 * 86_400_000);
|
||||||
|
const [resv] = await db
|
||||||
|
.insert(berthReservations)
|
||||||
|
.values({
|
||||||
|
portId: port.id,
|
||||||
|
berthId: berth.id,
|
||||||
|
clientId: client.id,
|
||||||
|
yachtId: yacht.id,
|
||||||
|
status: 'active',
|
||||||
|
startDate: new Date(),
|
||||||
|
createdBy: 'seed',
|
||||||
|
createdAt: tenDaysAgo,
|
||||||
|
})
|
||||||
|
.returning();
|
||||||
|
|
||||||
|
await clearAlerts(port.id);
|
||||||
|
await runAlertEngineForPorts([port.id]);
|
||||||
|
expect(await listOpenAlerts(port.id, 'reservation.no_agreement')).toHaveLength(1);
|
||||||
|
|
||||||
|
// Add an agreement document — condition no longer fires.
|
||||||
|
await db.insert(documents).values({
|
||||||
|
portId: port.id,
|
||||||
|
reservationId: resv!.id,
|
||||||
|
documentType: 'reservation_agreement',
|
||||||
|
title: 'Reservation Agreement',
|
||||||
|
status: 'sent',
|
||||||
|
createdBy: 'seed',
|
||||||
|
});
|
||||||
|
await runAlertEngineForPorts([port.id]);
|
||||||
|
|
||||||
|
expect(await listOpenAlerts(port.id, 'reservation.no_agreement')).toHaveLength(0);
|
||||||
|
const allRows = await db
|
||||||
|
.select()
|
||||||
|
.from(alerts)
|
||||||
|
.where(and(eq(alerts.portId, port.id), eq(alerts.ruleId, 'reservation.no_agreement')));
|
||||||
|
expect(allRows).toHaveLength(1);
|
||||||
|
expect(allRows[0]!.resolvedAt).not.toBeNull();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('interest.stale fires for old leads in mid-funnel stages', async () => {
|
||||||
|
const port = await makePort();
|
||||||
|
const client = await makeClient({ portId: port.id });
|
||||||
|
const stale = new Date(Date.now() - 30 * 86_400_000);
|
||||||
|
const [interest] = await db
|
||||||
|
.insert(interests)
|
||||||
|
.values({
|
||||||
|
portId: port.id,
|
||||||
|
clientId: client.id,
|
||||||
|
pipelineStage: 'in_communication',
|
||||||
|
dateLastContact: stale,
|
||||||
|
createdAt: stale,
|
||||||
|
updatedAt: stale,
|
||||||
|
})
|
||||||
|
.returning();
|
||||||
|
|
||||||
|
await clearAlerts(port.id);
|
||||||
|
await runAlertEngineForPorts([port.id]);
|
||||||
|
|
||||||
|
const open = await listOpenAlerts(port.id, 'interest.stale');
|
||||||
|
expect(open).toHaveLength(1);
|
||||||
|
expect(open[0]!.entityId).toBe(interest!.id);
|
||||||
|
expect(open[0]!.severity).toBe('info');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('interest.high_value_silent fires for hot leads silent >7d', async () => {
|
||||||
|
const port = await makePort();
|
||||||
|
const client = await makeClient({ portId: port.id });
|
||||||
|
const stale = new Date(Date.now() - 10 * 86_400_000);
|
||||||
|
await db.insert(interests).values({
|
||||||
|
portId: port.id,
|
||||||
|
clientId: client.id,
|
||||||
|
pipelineStage: 'visited',
|
||||||
|
leadCategory: 'hot_lead',
|
||||||
|
dateLastContact: stale,
|
||||||
|
updatedAt: stale,
|
||||||
|
});
|
||||||
|
|
||||||
|
await clearAlerts(port.id);
|
||||||
|
await runAlertEngineForPorts([port.id]);
|
||||||
|
|
||||||
|
const open = await listOpenAlerts(port.id, 'interest.high_value_silent');
|
||||||
|
expect(open).toHaveLength(1);
|
||||||
|
expect(open[0]!.severity).toBe('critical');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('engine reports rule errors without crashing the sweep', async () => {
|
||||||
|
const port = await makePort();
|
||||||
|
const summary = await runAlertEngineForPorts([port.id]);
|
||||||
|
expect(summary.portsScanned).toBe(1);
|
||||||
|
expect(summary.rulesEvaluated).toBeGreaterThan(0);
|
||||||
|
// No conditions seeded — no rules should fail.
|
||||||
|
expect(summary.errors).toHaveLength(0);
|
||||||
|
});
|
||||||
|
});
|
||||||
Reference in New Issue
Block a user