/**
 * Alert rule catalog. Each entry is a pure async function that takes a
 * `portId` and returns an array of `AlertCandidate` rows the engine should
 * upsert. The engine (in `alerts.service.ts`) handles dedupe via the
 * fingerprint partial-unique index and auto-resolves stale alerts.
 *
 * Adding a rule:
 *   1. Add the literal to `ALERT_RULES` in schema/insights.ts.
 *   2. Implement the evaluator below.
 *   3. Register it in `RULE_REGISTRY`.
 *   4. Add a unit test in tests/unit/services/alert-rules-evaluators.test.ts.
 */

import { and, eq, isNull, isNotNull, lt, gt, sql, inArray, or, desc } from 'drizzle-orm';

import { db } from '@/lib/db';
import { interests } from '@/lib/db/schema/interests';
import { berthReservations } from '@/lib/db/schema/reservations';
import { berths } from '@/lib/db/schema/berths';
import { documents, documentSigners } from '@/lib/db/schema/documents';
import { expenses } from '@/lib/db/schema/financial';
import { alerts as alertsTable } from '@/lib/db/schema/insights';
import { ALERT_RULES, type AlertRuleId } from '@/lib/db/schema/insights';
import type { AlertCandidate } from './alerts.service';

/**
 * Signature shared by every rule evaluator: given a port id, resolve to the
 * alert candidates that currently apply to that port.
 */
type RuleEvaluator = (portId: string) => Promise<AlertCandidate[]>;

/** Number of milliseconds in one 24-hour day. */
const DAY_MS = 86_400_000;

/**
 * Returns the instant `n` 24-hour periods before the current time.
 *
 * The offset is computed in fixed milliseconds from `Date.now()`, so it is
 * not calendar-aware (no DST or month-length adjustment).
 *
 * @param n - Number of days to subtract; fractional values are allowed.
 * @returns A new `Date` that is `n * DAY_MS` milliseconds in the past.
 */
function daysAgo(n: number): Date {
  return new Date(Date.now() - n * DAY_MS);
}

// ─── reservation.no_agreement ─────────────────────────────────────────────────
// Active reservations > 3 days old that have no reservation_agreement document
// in any live (non-cancelled, non-expired) state.
/**
 * Flags active reservations that still lack a live reservation agreement.
 *
 * A reservation matches when it belongs to `portId`, has status `'active'`,
 * was created more than 3 days ago, and no `reservation_agreement` document
 * linked to it exists in a status other than `'cancelled'` or `'expired'`.
 *
 * @param portId - Port whose reservations are evaluated.
 * @returns One `warning` candidate per matching reservation.
 */
async function reservationNoAgreement(portId: string): Promise<AlertCandidate[]> {
  const cutoff = daysAgo(3);

  const rows = await db
    .select({
      id: berthReservations.id,
      startDate: berthReservations.startDate,
      // Correlated sub-selects; fall back to 'unknown' when no row is found.
      clientName: sql<string>`coalesce((SELECT full_name FROM clients WHERE id = ${berthReservations.clientId}), 'unknown')`,
      yachtName: sql<string>`coalesce((SELECT name FROM yachts WHERE id = ${berthReservations.yachtId}), 'unknown')`,
    })
    .from(berthReservations)
    .where(
      and(
        eq(berthReservations.portId, portId),
        eq(berthReservations.status, 'active'),
        lt(berthReservations.createdAt, cutoff),
        // Any agreement that is not cancelled/expired suppresses the alert,
        // whether or not it has been signed yet.
        sql`NOT EXISTS (
          SELECT 1 FROM ${documents}
          WHERE ${documents.reservationId} = ${berthReservations.id}
            AND ${documents.documentType} = 'reservation_agreement'
            AND ${documents.status} NOT IN ('cancelled', 'expired')
        )`,
      ),
    );

  return rows.map((r) => ({
    ruleId: 'reservation.no_agreement',
    severity: 'warning',
    title: `Reservation needs an agreement`,
    body: `Active reservation for ${r.yachtName} (${r.clientName}) has no signed agreement yet.`,
    link: `/[port]/berth-reservations/${r.id}`,
    entityType: 'reservation',
    entityId: r.id,
  }));
}

// ─── interest.stale ───────────────────────────────────────────────────────────
// Pipeline stuck in mid-funnel stages with no contact for 14+ days.

/**
 * Flags non-archived interests sitting in a mid-funnel stage without recent
 * contact.
 *
 * An interest matches when its last-contact date is older than 14 days, or
 * when it has no last-contact date at all and was last updated more than
 * 14 days ago.
 *
 * @param portId - Port whose interests are evaluated.
 * @returns One `info` candidate per matching interest, with the stage and
 *   last-contact value attached as metadata.
 */
async function interestStale(portId: string): Promise<AlertCandidate[]> {
  // Mid-funnel stages where silence is a problem. Later deposit/contract
  // stages are left to their own dedicated alerts.
  // NOTE(review): 'eoi_sent' is included here even though EOI documents also
  // have the eoi.unsigned_long rule — confirm the overlap is intended.
  const STALE_STAGES = ['details_sent', 'in_communication', 'eoi_sent'];

  // Computed once so both branches of the OR compare against the same instant.
  const cutoff = daysAgo(14);

  const rows = await db
    .select({
      id: interests.id,
      stage: interests.pipelineStage,
      lastContact: interests.dateLastContact,
      clientName: sql<string>`coalesce((SELECT full_name FROM clients WHERE id = ${interests.clientId}), 'unknown')`,
    })
    .from(interests)
    .where(
      and(
        eq(interests.portId, portId),
        inArray(interests.pipelineStage, STALE_STAGES),
        isNull(interests.archivedAt),
        or(
          lt(interests.dateLastContact, cutoff),
          // Never contacted: fall back to the row's last update time.
          and(isNull(interests.dateLastContact), lt(interests.updatedAt, cutoff)),
        ),
      ),
    );

  return rows.map((r) => ({
    ruleId: 'interest.stale',
    severity: 'info',
    title: `Stale interest: ${r.clientName}`,
    body: `In '${r.stage}' with no contact for 14+ days.`,
    link: `/[port]/interests/${r.id}`,
    entityType: 'interest',
    entityId: r.id,
    metadata: { stage: r.stage, lastContact: r.lastContact },
  }));
}

// ─── document.signer_overdue ──────────────────────────────────────────────────
// Pending signer for >14d, last reminder >7d ago (or never).
// NOTE(review): the evaluator for this rule filters only on the 14-day signer
// age; no reminder-based condition is applied by its query.
/**
 * Flags signers who have been pending on an in-flight document for more than
 * 14 days.
 *
 * A row matches when the document belongs to `portId`, the document status is
 * `'sent'` or `'partially_signed'`, the signer status is `'pending'`, and the
 * signer row was created more than 14 days ago. The signer's `createdAt` is
 * reported as `sentAt` in the metadata.
 *
 * NOTE(review): no "last reminder" condition is applied here, and a document
 * with several pending signers yields several candidates sharing the same
 * `entityId` — confirm how the engine's fingerprint handles that.
 *
 * @param portId - Port whose documents are evaluated.
 * @returns One `warning` candidate per overdue (document, signer) pair.
 */
async function documentSignerOverdue(portId: string): Promise<AlertCandidate[]> {
  const cutoff = daysAgo(14);

  const rows = await db
    .select({
      docId: documents.id,
      title: documents.title,
      docType: documents.documentType,
      signerId: documentSigners.id,
      signerEmail: documentSigners.signerEmail,
      signerName: documentSigners.signerName,
      sentAt: documentSigners.createdAt,
    })
    .from(documents)
    .innerJoin(documentSigners, eq(documentSigners.documentId, documents.id))
    .where(
      and(
        eq(documents.portId, portId),
        inArray(documents.status, ['sent', 'partially_signed']),
        eq(documentSigners.status, 'pending'),
        lt(documentSigners.createdAt, cutoff),
      ),
    );

  return rows.map((r) => ({
    ruleId: 'document.signer_overdue',
    severity: 'warning',
    title: `Signer overdue: ${r.signerName}`,
    body: `${r.docType.toUpperCase()} "${r.title}" - pending >14 days.`,
    link: `/[port]/documents/${r.docId}`,
    entityType: 'document',
    entityId: r.docId,
    metadata: { signerId: r.signerId, signerEmail: r.signerEmail, sentAt: r.sentAt },
  }));
}

// ─── berth.under_offer_stalled ────────────────────────────────────────────────
// Berths sitting in 'under_offer' status for 30+ days.

/**
 * Flags berths that are `'under_offer'` and whose `updatedAt` is more than
 * 30 days old.
 *
 * The row's `updatedAt` is used as the stall marker and is returned as
 * `stalledSince` in the metadata.
 *
 * @param portId - Port whose berths are evaluated.
 * @returns One `info` candidate per stalled berth.
 */
async function berthUnderOfferStalled(portId: string): Promise<AlertCandidate[]> {
  const cutoff = daysAgo(30);

  const rows = await db
    .select({
      id: berths.id,
      mooringNumber: berths.mooringNumber,
      updatedAt: berths.updatedAt,
    })
    .from(berths)
    .where(
      and(
        eq(berths.portId, portId),
        eq(berths.status, 'under_offer'),
        lt(berths.updatedAt, cutoff),
      ),
    );

  return rows.map((r) => ({
    ruleId: 'berth.under_offer_stalled',
    severity: 'info',
    title: `Berth ${r.mooringNumber} stalled under offer`,
    body: `No status change in 30+ days.`,
    link: `/[port]/berths/${r.id}`,
    entityType: 'berth',
    entityId: r.id,
    metadata: { stalledSince: r.updatedAt },
  }));
}

// ─── expense.duplicate ────────────────────────────────────────────────────────
// Expenses whose duplicate_of is set (the dedup service writes this).
/**
 * Flags non-archived expenses that have a `duplicateOf` reference set.
 *
 * @param portId - Port whose expenses are evaluated.
 * @returns One `info` candidate per flagged expense, with the referenced
 *   `duplicateOf` value attached as metadata.
 */
async function expenseDuplicate(portId: string): Promise<AlertCandidate[]> {
  const rows = await db
    .select({
      id: expenses.id,
      vendor: expenses.establishmentName,
      amount: expenses.amount,
      duplicateOf: expenses.duplicateOf,
    })
    .from(expenses)
    .where(
      and(
        eq(expenses.portId, portId),
        isNotNull(expenses.duplicateOf),
        isNull(expenses.archivedAt),
      ),
    );

  return rows.map((r) => ({
    ruleId: 'expense.duplicate',
    severity: 'info',
    title: `Possible duplicate expense`,
    body: `${r.vendor ?? 'Unknown vendor'} - ${r.amount}.`,
    link: `/[port]/expenses/${r.id}`,
    entityType: 'expense',
    entityId: r.id,
    metadata: { duplicateOf: r.duplicateOf },
  }));
}

// ─── expense.unscanned ────────────────────────────────────────────────────────
// Expense uploaded with a receipt file whose OCR is still 'pending' > 1h later.

/**
 * Flags non-archived expenses that have at least one receipt file, were
 * created more than one hour ago, and still have `ocrStatus` `'pending'`.
 *
 * Only the `'pending'` status is matched; any other OCR status is ignored by
 * this rule.
 *
 * @param portId - Port whose expenses are evaluated.
 * @returns One `info` candidate per unscanned expense.
 */
async function expenseUnscanned(portId: string): Promise<AlertCandidate[]> {
  const HOUR_MS = 60 * 60 * 1000;
  const cutoff = new Date(Date.now() - HOUR_MS);

  const rows = await db
    .select({
      id: expenses.id,
      vendor: expenses.establishmentName,
      ocrStatus: expenses.ocrStatus,
      createdAt: expenses.createdAt,
    })
    .from(expenses)
    .where(
      and(
        eq(expenses.portId, portId),
        eq(expenses.ocrStatus, 'pending'),
        // array_length yields NULL for an empty array, so this also excludes
        // expenses with no receipt files.
        sql`array_length(${expenses.receiptFileIds}, 1) > 0`,
        lt(expenses.createdAt, cutoff),
        isNull(expenses.archivedAt),
      ),
    );

  return rows.map((r) => ({
    ruleId: 'expense.unscanned',
    severity: 'info',
    title: `Receipt not scanned`,
    body: `${r.vendor ?? 'Unknown vendor'} - uploaded over an hour ago.`,
    link: `/[port]/expenses/${r.id}`,
    entityType: 'expense',
    entityId: r.id,
  }));
}

// ─── interest.high_value_silent ───────────────────────────────────────────────
// Hot leads with no contact for 7+ days. Highest severity in the catalog.
/**
 * Flags non-archived interests categorised as `'hot_lead'` that have had no
 * contact for more than 7 days.
 *
 * An interest matches when its last-contact date is older than 7 days, or
 * when it has no last-contact date and was last updated more than 7 days ago.
 * No pipeline-stage filter is applied.
 *
 * @param portId - Port whose interests are evaluated.
 * @returns One `critical` candidate per silent hot lead, with the pipeline
 *   stage attached as metadata.
 */
async function interestHighValueSilent(portId: string): Promise<AlertCandidate[]> {
  const cutoff = daysAgo(7);

  const rows = await db
    .select({
      id: interests.id,
      stage: interests.pipelineStage,
      clientName: sql<string>`coalesce((SELECT full_name FROM clients WHERE id = ${interests.clientId}), 'unknown')`,
    })
    .from(interests)
    .where(
      and(
        eq(interests.portId, portId),
        eq(interests.leadCategory, 'hot_lead'),
        isNull(interests.archivedAt),
        or(
          lt(interests.dateLastContact, cutoff),
          // Never contacted: fall back to the row's last update time.
          and(isNull(interests.dateLastContact), lt(interests.updatedAt, cutoff)),
        ),
      ),
    );

  return rows.map((r) => ({
    ruleId: 'interest.high_value_silent',
    severity: 'critical',
    title: `Hot lead silent: ${r.clientName}`,
    body: `No contact for 7+ days - high-value at risk.`,
    link: `/[port]/interests/${r.id}`,
    entityType: 'interest',
    entityId: r.id,
    metadata: { stage: r.stage },
  }));
}

// ─── eoi.unsigned_long ────────────────────────────────────────────────────────
// EOI documents still 'sent' or 'partially_signed' 21+ days after creation.

/**
 * Flags EOI documents that are still awaiting signatures more than 21 days
 * after the document row was created.
 *
 * A document matches when its type is `'eoi'`, its status is `'sent'` or
 * `'partially_signed'`, and its `createdAt` is older than 21 days.
 *
 * @param portId - Port whose documents are evaluated.
 * @returns One `warning` candidate per matching EOI document.
 */
async function eoiUnsignedLong(portId: string): Promise<AlertCandidate[]> {
  const cutoff = daysAgo(21);

  const rows = await db
    .select({
      id: documents.id,
      title: documents.title,
      createdAt: documents.createdAt,
    })
    .from(documents)
    .where(
      and(
        eq(documents.portId, portId),
        eq(documents.documentType, 'eoi'),
        inArray(documents.status, ['sent', 'partially_signed']),
        lt(documents.createdAt, cutoff),
      ),
    );

  return rows.map((r) => ({
    ruleId: 'eoi.unsigned_long',
    severity: 'warning',
    title: `EOI unsigned >21 days`,
    body: `"${r.title}" - sent over 3 weeks ago.`,
    link: `/[port]/documents/${r.id}`,
    entityType: 'document',
    entityId: r.id,
  }));
}

/**
 * Maps every alert rule id to its evaluator. Typing the object as a `Record`
 * keyed by `AlertRuleId` makes the compiler reject a missing or misspelled
 * rule id.
 */
export const RULE_REGISTRY: Record<AlertRuleId, RuleEvaluator> = {
  'reservation.no_agreement': reservationNoAgreement,
  'interest.stale': interestStale,
  'document.signer_overdue': documentSignerOverdue,
  'berth.under_offer_stalled': berthUnderOfferStalled,
  'expense.duplicate': expenseDuplicate,
  'expense.unscanned': expenseUnscanned,
  'interest.high_value_silent': interestHighValueSilent,
  'eoi.unsigned_long': eoiUnsignedLong,
};

/**
 * Lists the known alert rule ids.
 *
 * @returns The `ALERT_RULES` value imported from the insights schema, exposed
 *   as a read-only array.
 */
export function listRuleIds(): readonly AlertRuleId[] {
  return ALERT_RULES;
}

// silence unused-import warnings until later PRs use them
const _unused = { gt, desc, alertsTable };
void _unused;