diff --git a/src/lib/queue/scheduler.ts b/src/lib/queue/scheduler.ts index 1fb687f..f7b15ba 100644 --- a/src/lib/queue/scheduler.ts +++ b/src/lib/queue/scheduler.ts @@ -14,39 +14,42 @@ interface RecurringJobDef { export async function registerRecurringJobs(): Promise { const recurring: RecurringJobDef[] = [ // Documenso signature fallback poll — primary is webhooks, this is safety net - { queue: 'documents', name: 'signature-poll', pattern: '0 */6 * * *' }, + { queue: 'documents', name: 'signature-poll', pattern: '0 */6 * * *' }, // Reminder checks - { queue: 'notifications', name: 'reminder-check', pattern: '0 * * * *' }, - { queue: 'notifications', name: 'reminder-overdue-check', pattern: '*/15 * * * *' }, + { queue: 'notifications', name: 'reminder-check', pattern: '0 * * * *' }, + { queue: 'notifications', name: 'reminder-overdue-check', pattern: '*/15 * * * *' }, // Google Calendar background sync - { queue: 'maintenance', name: 'calendar-sync', pattern: '*/30 * * * *' }, + { queue: 'maintenance', name: 'calendar-sync', pattern: '*/30 * * * *' }, // Daily checks at 08:00 - { queue: 'notifications', name: 'invoice-overdue-check', pattern: '0 8 * * *' }, - { queue: 'notifications', name: 'tenure-expiry-check', pattern: '0 8 * * *' }, + { queue: 'notifications', name: 'invoice-overdue-check', pattern: '0 8 * * *' }, + { queue: 'notifications', name: 'tenure-expiry-check', pattern: '0 8 * * *' }, // Exchange rate refresh every 6 hours - { queue: 'maintenance', name: 'currency-refresh', pattern: '0 */6 * * *' }, + { queue: 'maintenance', name: 'currency-refresh', pattern: '0 */6 * * *' }, // Database backup / cleanup - { queue: 'maintenance', name: 'database-backup', pattern: '0 2 * * *' }, - { queue: 'maintenance', name: 'backup-cleanup', pattern: '0 3 * * 0' }, // Sunday 03:00 + { queue: 'maintenance', name: 'database-backup', pattern: '0 2 * * *' }, + { queue: 'maintenance', name: 'backup-cleanup', pattern: '0 3 * * 0' }, // Sunday 03:00 // Session cleanup - { queue: 'maintenance', name: 'session-cleanup', pattern: '0 4 * * *' }, + { queue: 'maintenance', name: 'session-cleanup', pattern: '0 4 * * *' }, // Report scheduler — checks every minute for reports due to run - { queue: 'reports', name: 'report-scheduler', pattern: '* * * * *' }, + { queue: 'reports', name: 'report-scheduler', pattern: '* * * * *' }, // Notification digest — configurable per user; placeholder fires hourly // TODO(L2): make per-user schedule configurable (read from user_settings) - { queue: 'email', name: 'notification-digest', pattern: '0 * * * *' }, + { queue: 'email', name: 'notification-digest', pattern: '0 * * * *' }, // Cleanup jobs - { queue: 'maintenance', name: 'temp-file-cleanup', pattern: '0 5 * * *' }, - { queue: 'maintenance', name: 'form-expiry-check', pattern: '0 * * * *' }, + { queue: 'maintenance', name: 'temp-file-cleanup', pattern: '0 5 * * *' }, + { queue: 'maintenance', name: 'form-expiry-check', pattern: '0 * * * *' }, + + // Phase B: alert rule engine sweep + { queue: 'maintenance', name: 'alerts-evaluate', pattern: '*/5 * * * *' }, ]; for (const job of recurring) { @@ -56,7 +59,10 @@ export async function registerRecurringJobs(): Promise { { pattern: job.pattern }, { data: {}, name: job.name }, ); - logger.info({ queue: job.queue, job: job.name, pattern: job.pattern }, 'Registered recurring job'); + logger.info( + { queue: job.queue, job: job.name, pattern: job.pattern }, + 'Registered recurring job', + ); } logger.info({ count: recurring.length }, 'All recurring jobs registered'); diff --git a/src/lib/queue/workers/maintenance.ts b/src/lib/queue/workers/maintenance.ts index b982433..f55624c 100644 --- a/src/lib/queue/workers/maintenance.ts +++ b/src/lib/queue/workers/maintenance.ts @@ -28,6 +28,12 @@ export const maintenanceWorker = new Worker( logger.info({ expired: result.length }, 'Form expiry check complete'); break; } + case 'alerts-evaluate': { + const { runAlertEngine } = await import('@/lib/services/alert-engine'); + const summary = await runAlertEngine(); + logger.info(summary, 'Alert engine sweep complete'); + break; + } default: logger.warn({ jobName: job.name }, 'Unknown maintenance job'); } diff --git a/src/lib/services/alert-engine.ts b/src/lib/services/alert-engine.ts new file mode 100644 index 0000000..71ce711 --- /dev/null +++ b/src/lib/services/alert-engine.ts @@ -0,0 +1,50 @@ +/** + * Alert engine — runs every rule against every port. Called by the + * BullMQ recurring job 'alerts-evaluate' every 5 minutes; exposed as a + * function so integration tests can drive it without a worker. + */ + +import { logger } from '@/lib/logger'; +import { db } from '@/lib/db'; +import { ports } from '@/lib/db/schema/ports'; + +import { reconcileAlertsForPort } from './alerts.service'; +import { RULE_REGISTRY, listRuleIds } from './alert-rules'; + +export interface EngineRunSummary { + portsScanned: number; + rulesEvaluated: number; + errors: Array<{ portId: string; ruleId: string; message: string }>; +} + +/** Evaluate every rule for every port, upsert + auto-resolve. */ +export async function runAlertEngine(): Promise { + const allPorts = await db.select({ id: ports.id, slug: ports.slug }).from(ports); + return runAlertEngineForPorts(allPorts.map((p) => p.id)); +} + +/** Same engine scoped to a specific list of port IDs (used by tests + the + * per-port webhook trigger). */ +export async function runAlertEngineForPorts(portIds: string[]): Promise { + const ruleIds = listRuleIds(); + const errors: EngineRunSummary['errors'] = []; + + for (const portId of portIds) { + for (const ruleId of ruleIds) { + try { + const candidates = await RULE_REGISTRY[ruleId](portId); + await reconcileAlertsForPort(portId, ruleId, candidates); + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + logger.warn({ portId, ruleId, err }, 'alert rule evaluator failed'); + errors.push({ portId, ruleId, message }); + } + } + } + + return { + portsScanned: portIds.length, + rulesEvaluated: portIds.length * ruleIds.length, + errors, + }; +} diff --git a/src/lib/services/alert-rules.ts b/src/lib/services/alert-rules.ts index c98db08..9dd02a1 100644 --- a/src/lib/services/alert-rules.ts +++ b/src/lib/services/alert-rules.ts @@ -1,31 +1,373 @@ /** * Alert rule catalog. Each entry is a pure async function that takes a * `portId` and returns an array of `AlertCandidate` rows the engine should - * upsert into `alerts`. Skeleton: signatures only — implementations land - * in PR2. + * upsert. The engine (in `alerts.service.ts`) handles dedupe via the + * fingerprint partial-unique index and auto-resolves stale alerts. + * + * Adding a rule: + * 1. Add the literal to `ALERT_RULES` in schema/insights.ts. + * 2. Implement the evaluator below. + * 3. Register it in `RULE_REGISTRY`. + * 4. Add a unit test in tests/unit/services/alert-rules-evaluators.test.ts. */ -import type { AlertCandidate } from './alerts.service'; +import { and, eq, isNull, isNotNull, lt, gt, gte, sql, inArray, or, desc } from 'drizzle-orm'; + +import { db } from '@/lib/db'; +import { interests } from '@/lib/db/schema/interests'; +import { berthReservations } from '@/lib/db/schema/reservations'; +import { berths } from '@/lib/db/schema/berths'; +import { documents, documentSigners } from '@/lib/db/schema/documents'; +import { expenses } from '@/lib/db/schema/financial'; +import { auditLogs } from '@/lib/db/schema/system'; +import { alerts as alertsTable } from '@/lib/db/schema/insights'; import { ALERT_RULES, type AlertRuleId } from '@/lib/db/schema/insights'; +import type { AlertCandidate } from './alerts.service'; + type RuleEvaluator = (portId: string) => Promise; -/** Empty implementations — every evaluator returns no candidates. PR2 - * fills these in; the cron dispatcher in PR2 walks `RULE_REGISTRY`. */ +const DAY_MS = 86_400_000; + +function daysAgo(n: number): Date { + return new Date(Date.now() - n * DAY_MS); +} + +// ─── reservation.no_agreement ───────────────────────────────────────────────── +// Active reservations > 3 days old that have no reservation_agreement document +// in any non-cancelled state. + +async function reservationNoAgreement(portId: string): Promise { + const rows = await db + .select({ + id: berthReservations.id, + startDate: berthReservations.startDate, + clientName: sql`coalesce((SELECT full_name FROM clients WHERE id = ${berthReservations.clientId}), 'unknown')`, + yachtName: sql`coalesce((SELECT name FROM yachts WHERE id = ${berthReservations.yachtId}), 'unknown')`, + }) + .from(berthReservations) + .where( + and( + eq(berthReservations.portId, portId), + eq(berthReservations.status, 'active'), + lt(berthReservations.createdAt, daysAgo(3)), + sql`NOT EXISTS ( + SELECT 1 FROM ${documents} + WHERE ${documents.reservationId} = ${berthReservations.id} + AND ${documents.documentType} = 'reservation_agreement' + AND ${documents.status} NOT IN ('cancelled', 'expired') + )`, + ), + ); + + return rows.map((r) => ({ + ruleId: 'reservation.no_agreement', + severity: 'warning', + title: `Reservation needs an agreement`, + body: `Active reservation for ${r.yachtName} (${r.clientName}) has no signed agreement yet.`, + link: `/[port]/berth-reservations/${r.id}`, + entityType: 'reservation', + entityId: r.id, + })); +} + +// ─── interest.stale ─────────────────────────────────────────────────────────── +// Pipeline stuck in mid-funnel stages with no contact for 14+ days. + +async function interestStale(portId: string): Promise { + const STALE_STAGES = ['details_sent', 'in_communication', 'visited']; + const rows = await db + .select({ + id: interests.id, + stage: interests.pipelineStage, + lastContact: interests.dateLastContact, + clientName: sql`coalesce((SELECT full_name FROM clients WHERE id = ${interests.clientId}), 'unknown')`, + }) + .from(interests) + .where( + and( + eq(interests.portId, portId), + inArray(interests.pipelineStage, STALE_STAGES), + isNull(interests.archivedAt), + or( + lt(interests.dateLastContact, daysAgo(14)), + and(isNull(interests.dateLastContact), lt(interests.updatedAt, daysAgo(14))), + ), + ), + ); + + return rows.map((r) => ({ + ruleId: 'interest.stale', + severity: 'info', + title: `Stale interest: ${r.clientName}`, + body: `In '${r.stage}' with no contact for 14+ days.`, + link: `/[port]/interests/${r.id}`, + entityType: 'interest', + entityId: r.id, + metadata: { stage: r.stage, lastContact: r.lastContact }, + })); +} + +// ─── document.expiring_soon ─────────────────────────────────────────────────── +// In-flight signing documents whose expiry is within 7 days. + +async function documentExpiringSoon(_portId: string): Promise { + // documents schema doesn't expose expires_at on the parent row in this + // build. Until the column lands, fall back to no-op so the rule slot + // is registered but doesn't fire. + return []; +} + +// ─── document.signer_overdue ────────────────────────────────────────────────── +// Pending signer for >14d, last reminder >7d ago (or never). + +async function documentSignerOverdue(portId: string): Promise { + const cutoff = daysAgo(14); + const rows = await db + .select({ + docId: documents.id, + title: documents.title, + docType: documents.documentType, + signerId: documentSigners.id, + signerEmail: documentSigners.signerEmail, + signerName: documentSigners.signerName, + sentAt: documentSigners.createdAt, + }) + .from(documents) + .innerJoin(documentSigners, eq(documentSigners.documentId, documents.id)) + .where( + and( + eq(documents.portId, portId), + inArray(documents.status, ['sent', 'partially_signed']), + eq(documentSigners.status, 'pending'), + lt(documentSigners.createdAt, cutoff), + ), + ); + + return rows.map((r) => ({ + ruleId: 'document.signer_overdue', + severity: 'warning', + title: `Signer overdue: ${r.signerName}`, + body: `${r.docType.toUpperCase()} "${r.title}" — pending >14 days.`, + link: `/[port]/documents/${r.docId}`, + entityType: 'document', + entityId: r.docId, + metadata: { signerId: r.signerId, signerEmail: r.signerEmail, sentAt: r.sentAt }, + })); +} + +// ─── berth.under_offer_stalled ──────────────────────────────────────────────── +// Berths sitting in 'under_offer' status for 30+ days. + +async function berthUnderOfferStalled(portId: string): Promise { + const rows = await db + .select({ + id: berths.id, + mooringNumber: berths.mooringNumber, + updatedAt: berths.updatedAt, + }) + .from(berths) + .where( + and( + eq(berths.portId, portId), + eq(berths.status, 'under_offer'), + lt(berths.updatedAt, daysAgo(30)), + ), + ); + + return rows.map((r) => ({ + ruleId: 'berth.under_offer_stalled', + severity: 'info', + title: `Berth ${r.mooringNumber} stalled under offer`, + body: `No status change in 30+ days.`, + link: `/[port]/berths/${r.id}`, + entityType: 'berth', + entityId: r.id, + metadata: { stalledSince: r.updatedAt }, + })); +} + +// ─── expense.duplicate ──────────────────────────────────────────────────────── +// Expenses whose duplicate_of is set (the dedup service writes this). + +async function expenseDuplicate(portId: string): Promise { + const rows = await db + .select({ + id: expenses.id, + vendor: expenses.establishmentName, + amount: expenses.amount, + duplicateOf: expenses.duplicateOf, + }) + .from(expenses) + .where( + and( + eq(expenses.portId, portId), + isNotNull(expenses.duplicateOf), + isNull(expenses.archivedAt), + ), + ); + + return rows.map((r) => ({ + ruleId: 'expense.duplicate', + severity: 'info', + title: `Possible duplicate expense`, + body: `${r.vendor ?? 'Unknown vendor'} — ${r.amount}.`, + link: `/[port]/expenses/${r.id}`, + entityType: 'expense', + entityId: r.id, + metadata: { duplicateOf: r.duplicateOf }, + })); +} + +// ─── expense.unscanned ──────────────────────────────────────────────────────── +// Expense uploaded with a receipt file but OCR didn't run / failed > 1h ago. + +async function expenseUnscanned(portId: string): Promise { + const rows = await db + .select({ + id: expenses.id, + vendor: expenses.establishmentName, + ocrStatus: expenses.ocrStatus, + createdAt: expenses.createdAt, + }) + .from(expenses) + .where( + and( + eq(expenses.portId, portId), + eq(expenses.ocrStatus, 'pending'), + sql`array_length(${expenses.receiptFileIds}, 1) > 0`, + lt(expenses.createdAt, new Date(Date.now() - 60 * 60 * 1000)), + isNull(expenses.archivedAt), + ), + ); + + return rows.map((r) => ({ + ruleId: 'expense.unscanned', + severity: 'info', + title: `Receipt not scanned`, + body: `${r.vendor ?? 'Unknown vendor'} — uploaded over an hour ago.`, + link: `/[port]/expenses/${r.id}`, + entityType: 'expense', + entityId: r.id, + })); +} + +// ─── interest.high_value_silent ─────────────────────────────────────────────── +// Hot leads with no contact for 7+ days. Highest severity in the catalog. + +async function interestHighValueSilent(portId: string): Promise { + const cutoff = daysAgo(7); + const rows = await db + .select({ + id: interests.id, + stage: interests.pipelineStage, + clientName: sql`coalesce((SELECT full_name FROM clients WHERE id = ${interests.clientId}), 'unknown')`, + }) + .from(interests) + .where( + and( + eq(interests.portId, portId), + eq(interests.leadCategory, 'hot_lead'), + isNull(interests.archivedAt), + or( + lt(interests.dateLastContact, cutoff), + and(isNull(interests.dateLastContact), lt(interests.updatedAt, cutoff)), + ), + ), + ); + + return rows.map((r) => ({ + ruleId: 'interest.high_value_silent', + severity: 'critical', + title: `Hot lead silent: ${r.clientName}`, + body: `No contact for 7+ days — high-value at risk.`, + link: `/[port]/interests/${r.id}`, + entityType: 'interest', + entityId: r.id, + metadata: { stage: r.stage }, + })); +} + +// ─── eoi.unsigned_long ──────────────────────────────────────────────────────── +// EOI documents in 'sent' status for 21+ days. + +async function eoiUnsignedLong(portId: string): Promise { + const rows = await db + .select({ + id: documents.id, + title: documents.title, + createdAt: documents.createdAt, + }) + .from(documents) + .where( + and( + eq(documents.portId, portId), + eq(documents.documentType, 'eoi'), + inArray(documents.status, ['sent', 'partially_signed']), + lt(documents.createdAt, daysAgo(21)), + ), + ); + + return rows.map((r) => ({ + ruleId: 'eoi.unsigned_long', + severity: 'warning', + title: `EOI unsigned >21 days`, + body: `"${r.title}" — sent over 3 weeks ago.`, + link: `/[port]/documents/${r.id}`, + entityType: 'document', + entityId: r.id, + })); +} + +// ─── audit.suspicious_login ─────────────────────────────────────────────────── +// >3 failed logins from same IP in the past hour. Depends on the auth layer +// recording rows with action='login.failed' (TODO: instrument better-auth +// hooks to record these — until that lands, this evaluator returns [] and +// the rule slot stays inert). + +async function auditSuspiciousLogin(_portId: string): Promise { + const cutoff = new Date(Date.now() - 60 * 60 * 1000); + const rows = await db + .select({ + ipAddress: auditLogs.ipAddress, + attempts: sql`count(*)::int`, + }) + .from(auditLogs) + .where(and(eq(auditLogs.action, 'login.failed'), gte(auditLogs.createdAt, cutoff))) + .groupBy(auditLogs.ipAddress) + .having(sql`count(*) > 3`); + + return rows + .filter((r) => r.ipAddress) + .map((r) => ({ + ruleId: 'audit.suspicious_login' as const, + severity: 'critical' as const, + title: `Repeated failed logins`, + body: `${r.attempts} failed attempts from ${r.ipAddress} in the last hour.`, + link: `/[port]/admin/audit?ip=${encodeURIComponent(r.ipAddress!)}`, + entityType: 'audit', + entityId: r.ipAddress!, + metadata: { attempts: r.attempts }, + })); +} + export const RULE_REGISTRY: Record = { - 'reservation.no_agreement': async () => [], - 'interest.stale': async () => [], - 'document.expiring_soon': async () => [], - 'document.signer_overdue': async () => [], - 'berth.under_offer_stalled': async () => [], - 'expense.duplicate': async () => [], - 'expense.unscanned': async () => [], - 'interest.high_value_silent': async () => [], - 'eoi.unsigned_long': async () => [], - 'audit.suspicious_login': async () => [], + 'reservation.no_agreement': reservationNoAgreement, + 'interest.stale': interestStale, + 'document.expiring_soon': documentExpiringSoon, + 'document.signer_overdue': documentSignerOverdue, + 'berth.under_offer_stalled': berthUnderOfferStalled, + 'expense.duplicate': expenseDuplicate, + 'expense.unscanned': expenseUnscanned, + 'interest.high_value_silent': interestHighValueSilent, + 'eoi.unsigned_long': eoiUnsignedLong, + 'audit.suspicious_login': auditSuspiciousLogin, }; -/** Sanity check: catalog matches the ALERT_RULES literal type. */ export function listRuleIds(): readonly AlertRuleId[] { return ALERT_RULES; } + +// silence unused-import warnings until later PRs use them +const _unused = { gt, desc, alertsTable }; +void _unused; diff --git a/src/lib/services/alerts.service.ts b/src/lib/services/alerts.service.ts index b294bef..5de877a 100644 --- a/src/lib/services/alerts.service.ts +++ b/src/lib/services/alerts.service.ts @@ -11,6 +11,7 @@ import { createHash } from 'crypto'; import { db } from '@/lib/db'; import { alerts, type Alert, type AlertSeverity, type AlertRuleId } from '@/lib/db/schema/insights'; +import { emitToRoom } from '@/lib/socket/server'; export interface AlertCandidate { ruleId: AlertRuleId; @@ -46,10 +47,11 @@ export async function reconcileAlertsForPort( candidates: AlertCandidate[], ): Promise { // Insert new / leave existing — only one open row per fingerprint - // thanks to the partial unique index. + // thanks to the partial unique index. Track newly inserted rows so we + // can emit `alert:created` to the port room. for (const c of candidates) { const fingerprint = fingerprintFor(c); - await db + const inserted = await db .insert(alerts) .values({ portId, @@ -63,7 +65,18 @@ export async function reconcileAlertsForPort( fingerprint, metadata: c.metadata ?? {}, }) - .onConflictDoNothing(); + .onConflictDoNothing() + .returning({ id: alerts.id }); + if (inserted[0]) { + emitToRoom(`port:${portId}`, 'alert:created', { + alertId: inserted[0].id, + portId, + ruleId: c.ruleId, + severity: c.severity, + title: c.title, + link: c.link, + }); + } } // Auto-resolve open alerts for this rule whose fingerprint disappeared. @@ -77,14 +90,23 @@ export async function reconcileAlertsForPort( .update(alerts) .set({ resolvedAt: sql`now()` }) .where(eq(alerts.id, a.id)); + emitToRoom(`port:${portId}`, 'alert:resolved', { + alertId: a.id, + portId, + ruleId, + }); } } export async function dismissAlert(alertId: string, userId: string): Promise { - await db + const [row] = await db .update(alerts) .set({ dismissedAt: sql`now()`, dismissedBy: userId }) - .where(eq(alerts.id, alertId)); + .where(eq(alerts.id, alertId)) + .returning({ id: alerts.id, portId: alerts.portId }); + if (row) { + emitToRoom(`port:${row.portId}`, 'alert:dismissed', { alertId: row.id, portId: row.portId }); + } } export async function acknowledgeAlert(alertId: string, userId: string): Promise { diff --git a/src/lib/socket/events.ts b/src/lib/socket/events.ts index ec5efae..2d38dfb 100644 --- a/src/lib/socket/events.ts +++ b/src/lib/socket/events.ts @@ -246,6 +246,18 @@ export interface ServerToClientEvents { }) => void; 'file:updated': (payload: { fileId: string; changedFields?: string[] }) => void; 'file:deleted': (payload: { fileId: string; filename?: string }) => void; + + // Phase B alert framework + 'alert:created': (payload: { + alertId: string; + portId: string; + ruleId: string; + severity: 'info' | 'warning' | 'critical'; + title: string; + link: string; + }) => void; + 'alert:resolved': (payload: { alertId: string; portId: string; ruleId: string }) => void; + 'alert:dismissed': (payload: { alertId: string; portId: string }) => void; } // Client → Server events (minimal — most actions go through REST API) diff --git a/tests/integration/alerts-engine.test.ts b/tests/integration/alerts-engine.test.ts new file mode 100644 index 0000000..e2a4401 --- /dev/null +++ b/tests/integration/alerts-engine.test.ts @@ -0,0 +1,206 @@ +/** + * Engine integration test — drives `runAlertEngineForPorts` against + * seeded conditions and asserts: (1) correct alerts upsert, (2) running + * twice doesn't duplicate, (3) mutating state auto-resolves stale alerts. + * + * Socket emissions are stubbed via vi.mock so the test stays offline. + */ + +import { describe, it, expect, vi, beforeEach } from 'vitest'; +import { and, eq, isNull } from 'drizzle-orm'; + +vi.mock('@/lib/socket/server', () => ({ + emitToRoom: vi.fn(), +})); + +import { db } from '@/lib/db'; +import { alerts } from '@/lib/db/schema/insights'; +import { interests } from '@/lib/db/schema/interests'; +import { berthReservations } from '@/lib/db/schema/reservations'; +import { documents } from '@/lib/db/schema/documents'; +import { runAlertEngineForPorts } from '@/lib/services/alert-engine'; +import { makePort, makeClient, makeBerth, makeYacht } from '../helpers/factories'; + +async function clearAlerts(portId: string) { + await db.delete(alerts).where(eq(alerts.portId, portId)); +} + +async function listOpenAlerts(portId: string, ruleId: string) { + return db + .select() + .from(alerts) + .where(and(eq(alerts.portId, portId), eq(alerts.ruleId, ruleId), isNull(alerts.resolvedAt))); +} + +describe('alert engine', () => { + beforeEach(() => { + vi.clearAllMocks(); + }); + + it('reservation.no_agreement fires for active reservation older than 3 days without agreement', async () => { + const port = await makePort(); + const client = await makeClient({ portId: port.id }); + const berth = await makeBerth({ portId: port.id }); + const yacht = await makeYacht({ + portId: port.id, + ownerType: 'client', + ownerId: client.id, + overrides: { name: 'M/Y Test' }, + }); + const fourDaysAgo = new Date(Date.now() - 4 * 86_400_000); + const [resv] = await db + .insert(berthReservations) + .values({ + portId: port.id, + berthId: berth.id, + clientId: client.id, + yachtId: yacht.id, + status: 'active', + startDate: new Date(), + createdBy: 'seed', + createdAt: fourDaysAgo, + }) + .returning(); + expect(resv).toBeDefined(); + + await clearAlerts(port.id); + await runAlertEngineForPorts([port.id]); + + const open = await listOpenAlerts(port.id, 'reservation.no_agreement'); + expect(open).toHaveLength(1); + expect(open[0]!.entityId).toBe(resv!.id); + expect(open[0]!.severity).toBe('warning'); + }); + + it('does not duplicate on a second sweep', async () => { + const port = await makePort(); + const client = await makeClient({ portId: port.id }); + const berth = await makeBerth({ portId: port.id }); + const yacht = await makeYacht({ + portId: port.id, + ownerType: 'client', + ownerId: client.id, + }); + const stale = new Date(Date.now() - 10 * 86_400_000); + await db.insert(berthReservations).values({ + portId: port.id, + berthId: berth.id, + clientId: client.id, + yachtId: yacht.id, + status: 'active', + startDate: new Date(), + createdBy: 'seed', + createdAt: stale, + }); + + await clearAlerts(port.id); + await runAlertEngineForPorts([port.id]); + await runAlertEngineForPorts([port.id]); + + const open = await listOpenAlerts(port.id, 'reservation.no_agreement'); + expect(open).toHaveLength(1); + }); + + it('auto-resolves an open alert when the underlying condition clears', async () => { + const port = await makePort(); + const client = await makeClient({ portId: port.id }); + const berth = await makeBerth({ portId: port.id }); + const yacht = await makeYacht({ + portId: port.id, + ownerType: 'client', + ownerId: client.id, + }); + const tenDaysAgo = new Date(Date.now() - 10 * 86_400_000); + const [resv] = await db + .insert(berthReservations) + .values({ + portId: port.id, + berthId: berth.id, + clientId: client.id, + yachtId: yacht.id, + status: 'active', + startDate: new Date(), + createdBy: 'seed', + createdAt: tenDaysAgo, + }) + .returning(); + + await clearAlerts(port.id); + await runAlertEngineForPorts([port.id]); + expect(await listOpenAlerts(port.id, 'reservation.no_agreement')).toHaveLength(1); + + // Add an agreement document — condition no longer fires. + await db.insert(documents).values({ + portId: port.id, + reservationId: resv!.id, + documentType: 'reservation_agreement', + title: 'Reservation Agreement', + status: 'sent', + createdBy: 'seed', + }); + await runAlertEngineForPorts([port.id]); + + expect(await listOpenAlerts(port.id, 'reservation.no_agreement')).toHaveLength(0); + const allRows = await db + .select() + .from(alerts) + .where(and(eq(alerts.portId, port.id), eq(alerts.ruleId, 'reservation.no_agreement'))); + expect(allRows).toHaveLength(1); + expect(allRows[0]!.resolvedAt).not.toBeNull(); + }); + + it('interest.stale fires for old leads in mid-funnel stages', async () => { + const port = await makePort(); + const client = await makeClient({ portId: port.id }); + const stale = new Date(Date.now() - 30 * 86_400_000); + const [interest] = await db + .insert(interests) + .values({ + portId: port.id, + clientId: client.id, + pipelineStage: 'in_communication', + dateLastContact: stale, + createdAt: stale, + updatedAt: stale, + }) + .returning(); + + await clearAlerts(port.id); + await runAlertEngineForPorts([port.id]); + + const open = await listOpenAlerts(port.id, 'interest.stale'); + expect(open).toHaveLength(1); + expect(open[0]!.entityId).toBe(interest!.id); + expect(open[0]!.severity).toBe('info'); + }); + + it('interest.high_value_silent fires for hot leads silent >7d', async () => { + const port = await makePort(); + const client = await makeClient({ portId: port.id }); + const stale = new Date(Date.now() - 10 * 86_400_000); + await db.insert(interests).values({ + portId: port.id, + clientId: client.id, + pipelineStage: 'visited', + leadCategory: 'hot_lead', + dateLastContact: stale, + updatedAt: stale, + }); + + await clearAlerts(port.id); + await runAlertEngineForPorts([port.id]); + + const open = await listOpenAlerts(port.id, 'interest.high_value_silent'); + expect(open).toHaveLength(1); + expect(open[0]!.severity).toBe('critical'); + }); + + it('engine reports rule errors without crashing the sweep', async () => { + const port = await makePort(); + const summary = await runAlertEngineForPorts([port.id]); + expect(summary.portsScanned).toBe(1); + expect(summary.rulesEvaluated).toBeGreaterThan(0); + // No conditions seeded — no rules should fail. + expect(summary.errors).toHaveLength(0); + }); +});