From 13efe177a521073a8eab9b4ab37af1ec78fedb5d Mon Sep 17 00:00:00 2001 From: Matt Date: Wed, 17 Jun 2026 17:49:22 +0200 Subject: [PATCH] feat(alerts): split interest.stale into worked-then-quiet + new-untouched (interest.no_activity) - interest.stale now fires only for interests with real in-system follow-up (contact log / note / update audit) that went quiet 14+ days. - new interest.no_activity rule covers never-touched, non-imported interests. - guard interest.high_value_silent against imported-untouched hot leads. - keys off migration_source_links ledger to identify the bulk import, so the imported backlog matches neither rule and the engine auto-resolves the flood. - test teardown: delete interest_contact_log + test migration ledger rows. Co-Authored-By: Claude Fable 5 --- src/lib/db/schema/insights.ts | 1 + src/lib/services/alert-rules.ts | 126 ++++++++++++++++++------ tests/global-setup.ts | 4 + tests/integration/alerts-engine.test.ts | 110 ++++++++++++++++++++- 4 files changed, 208 insertions(+), 33 deletions(-) diff --git a/src/lib/db/schema/insights.ts b/src/lib/db/schema/insights.ts index 995454b0..2b4bb7c6 100644 --- a/src/lib/db/schema/insights.ts +++ b/src/lib/db/schema/insights.ts @@ -99,6 +99,7 @@ export type AlertSeverity = 'info' | 'warning' | 'critical'; export const ALERT_RULES = [ 'reservation.no_agreement', 'interest.stale', + 'interest.no_activity', 'document.signer_overdue', 'berth.under_offer_stalled', 'expense.duplicate', diff --git a/src/lib/services/alert-rules.ts b/src/lib/services/alert-rules.ts index 14fb0927..5d6aa14e 100644 --- a/src/lib/services/alert-rules.ts +++ b/src/lib/services/alert-rules.ts @@ -32,6 +32,46 @@ function daysAgo(n: number): Date { return new Date(Date.now() - n * DAY_MS); } +// ─── shared interest-activity fragments ─────────────────────────────────────── +// Correlated subqueries keyed on `interests.id`, reused by the interest rules. + +/** + * True when the interest was created by the legacy→CRM bulk import. The + * migration ledger is the only reliable marker (no column on `interests`). + */ +const isImportedSql = sql`EXISTS ( + SELECT 1 FROM migration_source_links msl + WHERE msl.source_system = 'nocodb_interests' + AND msl.target_entity_type = 'interest' + AND msl.target_entity_id = ${interests.id} +)`; + +/** + * True when a real user has worked the interest in-system: a logged contact, a + * note, or an UPDATE audit by a real user. The initial create-audit is excluded + * (action='update' only) so a bare, never-touched creation does not count. + */ +const hasFollowupSql = sql`( + EXISTS (SELECT 1 FROM interest_contact_log icl WHERE icl.interest_id = ${interests.id}) + OR EXISTS (SELECT 1 FROM interest_notes inn WHERE inn.interest_id = ${interests.id}) + OR EXISTS ( + SELECT 1 FROM audit_logs al + WHERE al.entity_type = 'interest' AND al.entity_id = ${interests.id} + AND al.user_id IS NOT NULL AND al.action = 'update' + ) +)`; + +/** + * Most recent genuine in-system touch, used as the staleness clock. Coalesced to + * '-infinity' so GREATEST never returns NULL. + */ +const lastTouchAtSql = sql`GREATEST( + COALESCE(${interests.dateLastContact}, '-infinity'::timestamptz), + COALESCE((SELECT max(icl.occurred_at) FROM interest_contact_log icl WHERE icl.interest_id = ${interests.id}), '-infinity'::timestamptz), + COALESCE((SELECT max(inn.created_at) FROM interest_notes inn WHERE inn.interest_id = ${interests.id}), '-infinity'::timestamptz), + COALESCE((SELECT max(al.created_at) FROM audit_logs al WHERE al.entity_type='interest' AND al.entity_id=${interests.id} AND al.user_id IS NOT NULL AND al.action='update'), '-infinity'::timestamptz) +)`; + // ─── reservation.no_agreement ───────────────────────────────────────────────── // Active reservations > 3 days old that have no reservation_agreement document // in any non-cancelled state. @@ -70,22 +110,18 @@ async function reservationNoAgreement(portId: string): Promise })); } +// Mid-funnel stages where silence is a problem. EOI / reservation / deposit / +// contract stages have their own dedicated alerts (eoi.unsigned_long, +// reservation.no_agreement, etc.), so these rules sit before signing kicks in. +const ACTIVE_EARLY_STAGES = ['enquiry', 'qualified', 'nurturing']; + // ─── interest.stale ─────────────────────────────────────────────────────────── -// Pipeline stuck in mid-funnel stages with no contact for 14+ days. +// A lead a user actually WORKED in-system (logged a contact / note / made an +// update) that has since gone quiet for 14+ days. Interests that were merely +// imported and never touched are handled by interest.no_activity, not here — so +// the bulk-import backlog never lands in this rule. async function interestStale(portId: string): Promise { - // Mid-funnel stages where silence is a problem. EOI / reservation / - // deposit / contract stages have their own dedicated alerts - // (eoi.unsigned_long, reservation.no_agreement, deposit_overdue, etc.), - // so this alert sits before signing kicks in. - // - // 2026-05-14 pipeline-refactor sweep: the prior values - // ('details_sent', 'in_communication', 'eoi_sent') were collapsed by - // migration 0062 into the 7-stage canon (enquiry / qualified / - // nurturing / eoi / ...). Until this fix landed, this alert never - // fired because no row in the new schema carried the dead stage - // strings. - const STALE_STAGES = ['enquiry', 'qualified', 'nurturing']; const rows = await db .select({ id: interests.id, @@ -97,25 +133,10 @@ async function interestStale(portId: string): Promise { .where( and( eq(interests.portId, portId), - inArray(interests.pipelineStage, STALE_STAGES), + inArray(interests.pipelineStage, ACTIVE_EARLY_STAGES), isNull(interests.archivedAt), - // An interest can't be "stale for 14+ days" if it has only existed in - // THIS system for less than 14 days. Without this floor, a bulk import - // (which backdates dateLastContact to the legacy value) instantly flags - // every migrated interest as stale and floods the alert rail. - // - // We floor on updatedAt, NOT createdAt: the legacy→CRM migration - // backfilled created_at to each interest's real origination date (so - // analytics date-ranges work), which would make every migrated row look - // 14+ days old and re-open the flood. updated_at is left at the - // migration timestamp, so it's the reliable "entered/last-touched this - // system" clock — migrated rows stay suppressed for 14 days, then the - // contact-based OR below governs. - lt(interests.updatedAt, daysAgo(14)), - or( - lt(interests.dateLastContact, daysAgo(14)), - and(isNull(interests.dateLastContact), lt(interests.updatedAt, daysAgo(14))), - ), + sql`${hasFollowupSql}`, + sql`${lastTouchAtSql} < now() - interval '14 days'`, ), ); @@ -123,7 +144,7 @@ async function interestStale(portId: string): Promise { ruleId: 'interest.stale', severity: 'info', title: `Stale interest: ${r.clientName}`, - body: `In '${STAGE_LABELS[r.stage as PipelineStage] ?? r.stage.replace(/_/g, ' ')}' with no contact for 14+ days.`, + body: `In '${STAGE_LABELS[r.stage as PipelineStage] ?? r.stage.replace(/_/g, ' ')}' — worked but no activity for 14+ days.`, link: `/[port]/interests/${r.id}`, entityType: 'interest', entityId: r.id, @@ -131,6 +152,42 @@ async function interestStale(portId: string): Promise { })); } +// ─── interest.no_activity ───────────────────────────────────────────────────── +// A brand-new inbound interest nobody has touched in-system, 14+ days after it +// arrived. Excludes bulk-imported rows (those live in migration_source_links) +// so the historical backlog never nags. + +async function interestNoActivity(portId: string): Promise { + const rows = await db + .select({ + id: interests.id, + stage: interests.pipelineStage, + clientName: sql`coalesce((SELECT full_name FROM clients WHERE id = ${interests.clientId}), 'unknown')`, + }) + .from(interests) + .where( + and( + eq(interests.portId, portId), + inArray(interests.pipelineStage, ACTIVE_EARLY_STAGES), + isNull(interests.archivedAt), + lt(interests.createdAt, daysAgo(14)), + sql`NOT ${hasFollowupSql}`, + sql`NOT ${isImportedSql}`, + ), + ); + + return rows.map((r) => ({ + ruleId: 'interest.no_activity', + severity: 'info', + title: `New inquiry untouched: ${r.clientName}`, + body: `In '${STAGE_LABELS[r.stage as PipelineStage] ?? r.stage.replace(/_/g, ' ')}' — no activity since it arrived 14+ days ago.`, + link: `/[port]/interests/${r.id}`, + entityType: 'interest', + entityId: r.id, + metadata: { stage: r.stage }, + })); +} + // ─── document.signer_overdue ────────────────────────────────────────────────── // Pending signer for >14d, last reminder >7d ago (or never). @@ -282,6 +339,10 @@ async function interestHighValueSilent(portId: string): Promise { export const RULE_REGISTRY: Record = { 'reservation.no_agreement': reservationNoAgreement, 'interest.stale': interestStale, + 'interest.no_activity': interestNoActivity, 'document.signer_overdue': documentSignerOverdue, 'berth.under_offer_stalled': berthUnderOfferStalled, 'expense.duplicate': expenseDuplicate, diff --git a/tests/global-setup.ts b/tests/global-setup.ts index 65e94dd9..30d2ba1b 100644 --- a/tests/global-setup.ts +++ b/tests/global-setup.ts @@ -47,6 +47,7 @@ export async function teardown() { , del_files AS (DELETE FROM files WHERE port_id IN (SELECT id FROM doomed) RETURNING 1) , del_ft AS (DELETE FROM form_templates WHERE port_id IN (SELECT id FROM doomed) RETURNING 1) , del_gr AS (DELETE FROM generated_reports WHERE port_id IN (SELECT id FROM doomed) RETURNING 1) + , del_icl AS (DELETE FROM interest_contact_log WHERE port_id IN (SELECT id FROM doomed) RETURNING 1) , del_int AS (DELETE FROM interests WHERE port_id IN (SELECT id FROM doomed) RETURNING 1) , del_ib AS (DELETE FROM interest_berths WHERE berth_id IN (SELECT id FROM berths WHERE port_id IN (SELECT id FROM doomed)) RETURNING 1) , del_inv AS (DELETE FROM invoices WHERE port_id IN (SELECT id FROM doomed) RETURNING 1) @@ -69,5 +70,8 @@ export async function teardown() { , del_ports AS (DELETE FROM ports WHERE id IN (SELECT id FROM doomed) RETURNING 1) SELECT 1 `); + // migration_source_links has no port_id FK; purge test-only ledger rows by + // the marker applied_id our alert tests use. + await db.execute(sql`DELETE FROM migration_source_links WHERE applied_id = 'test-apply'`); await closeDb(); } diff --git a/tests/integration/alerts-engine.test.ts b/tests/integration/alerts-engine.test.ts index 7131c93f..e1b7e7b3 100644 --- a/tests/integration/alerts-engine.test.ts +++ b/tests/integration/alerts-engine.test.ts @@ -18,6 +18,8 @@ import { alerts } from '@/lib/db/schema/insights'; import { interests } from '@/lib/db/schema/interests'; import { berthTenancies } from '@/lib/db/schema/tenancies'; import { documents } from '@/lib/db/schema/documents'; +import { interestContactLog } from '@/lib/db/schema/operations'; +import { migrationSourceLinks } from '@/lib/db/schema/migration'; import { runAlertEngineForPorts } from '@/lib/services/alert-engine'; import { makePort, makeClient, makeBerth, makeYacht } from '../helpers/factories'; @@ -32,6 +34,30 @@ async function listOpenAlerts(portId: string, ruleId: string) { .where(and(eq(alerts.portId, portId), eq(alerts.ruleId, ruleId), isNull(alerts.resolvedAt))); } +/** Mark an interest as bulk-imported via the migration ledger. */ +async function markImported(interestId: string) { + await db.insert(migrationSourceLinks).values({ + sourceSystem: 'nocodb_interests', + sourceId: `legacy-${interestId}`, + targetEntityType: 'interest', + targetEntityId: interestId, + appliedId: 'test-apply', + }); +} + +/** A genuine in-system follow-up: a logged contact at `occurredAt`. */ +async function logContact(portId: string, interestId: string, occurredAt: Date) { + await db.insert(interestContactLog).values({ + portId, + interestId, + occurredAt, + channel: 'phone', + direction: 'outbound', + summary: 'Test follow-up', + createdBy: 'seed', + }); +} + describe('alert engine', () => { beforeEach(() => { vi.clearAllMocks(); @@ -149,7 +175,7 @@ describe('alert engine', () => { expect(allRows[0]!.resolvedAt).not.toBeNull(); }); - it('interest.stale fires for old leads in mid-funnel stages', async () => { + it('interest.stale fires for worked leads gone quiet >14d', async () => { const port = await makePort(); const client = await makeClient({ portId: port.id }); const stale = new Date(Date.now() - 30 * 86_400_000); @@ -164,6 +190,9 @@ describe('alert engine', () => { updatedAt: stale, }) .returning(); + // A real in-system follow-up 30 days ago → this is a worked-then-quiet lead, + // not an untouched import. + await logContact(port.id, interest!.id, stale); await clearAlerts(port.id); await runAlertEngineForPorts([port.id]); @@ -172,6 +201,60 @@ describe('alert engine', () => { expect(open).toHaveLength(1); expect(open[0]!.entityId).toBe(interest!.id); expect(open[0]!.severity).toBe('info'); + // A worked lead must not also fire the new-untouched rule. + expect(await listOpenAlerts(port.id, 'interest.no_activity')).toHaveLength(0); + }); + + it('interest.stale does NOT fire for imported, never-touched interests', async () => { + const port = await makePort(); + const client = await makeClient({ portId: port.id }); + const migrationTime = new Date(Date.now() - 20 * 86_400_000); + const legacyDate = new Date(Date.now() - 3 * 365 * 86_400_000); // ~3yr back-dated + const [interest] = await db + .insert(interests) + .values({ + portId: port.id, + clientId: client.id, + pipelineStage: 'qualified', + dateLastContact: legacyDate, // back-dated by the migration + createdAt: migrationTime, + updatedAt: migrationTime, + }) + .returning(); + await markImported(interest!.id); + + await clearAlerts(port.id); + await runAlertEngineForPorts([port.id]); + + // Imported + never touched in-system → neither interest rule should fire. + expect(await listOpenAlerts(port.id, 'interest.stale')).toHaveLength(0); + expect(await listOpenAlerts(port.id, 'interest.no_activity')).toHaveLength(0); + }); + + it('interest.no_activity fires for new, non-imported, untouched interests >14d old', async () => { + const port = await makePort(); + const client = await makeClient({ portId: port.id }); + const created = new Date(Date.now() - 20 * 86_400_000); + const [interest] = await db + .insert(interests) + .values({ + portId: port.id, + clientId: client.id, + pipelineStage: 'enquiry', + dateLastContact: null, + createdAt: created, + updatedAt: created, + }) + .returning(); + + await clearAlerts(port.id); + await runAlertEngineForPorts([port.id]); + + const open = await listOpenAlerts(port.id, 'interest.no_activity'); + expect(open).toHaveLength(1); + expect(open[0]!.entityId).toBe(interest!.id); + expect(open[0]!.severity).toBe('info'); + expect(await listOpenAlerts(port.id, 'interest.stale')).toHaveLength(0); }); it('interest.high_value_silent fires for hot leads silent >7d', async () => { @@ -195,6 +278,31 @@ describe('alert engine', () => { expect(open[0]!.severity).toBe('critical'); }); + it('interest.high_value_silent skips imported, never-touched hot leads', async () => { + const port = await makePort(); + const client = await makeClient({ portId: port.id }); + const migrationTime = new Date(Date.now() - 20 * 86_400_000); + const legacyDate = new Date(Date.now() - 3 * 365 * 86_400_000); + const [interest] = await db + .insert(interests) + .values({ + portId: port.id, + clientId: client.id, + pipelineStage: 'qualified', + leadCategory: 'hot_lead', + dateLastContact: legacyDate, + createdAt: migrationTime, + updatedAt: migrationTime, + }) + .returning(); + await markImported(interest!.id); + + await clearAlerts(port.id); + await runAlertEngineForPorts([port.id]); + + expect(await listOpenAlerts(port.id, 'interest.high_value_silent')).toHaveLength(0); + }); + it('engine reports rule errors without crashing the sweep', async () => { const port = await makePort(); const summary = await runAlertEngineForPorts([port.id]);