feat(alerts): split interest.stale into worked-then-quiet + new-untouched (interest.no_activity)

- interest.stale now fires only for interests with real in-system follow-up
  (contact log / note / update audit) that went quiet 14+ days.
- new interest.no_activity rule covers never-touched, non-imported interests.
- guard interest.high_value_silent against imported-untouched hot leads.
- keys off migration_source_links ledger to identify the bulk import, so the
  imported backlog matches neither rule and the engine auto-resolves the flood.
- test teardown: delete interest_contact_log + test migration ledger rows.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
2026-06-17 17:49:22 +02:00
parent 7591231c47
commit 13efe177a5
4 changed files with 208 additions and 33 deletions

View File

@@ -99,6 +99,7 @@ export type AlertSeverity = 'info' | 'warning' | 'critical';
export const ALERT_RULES = [
'reservation.no_agreement',
'interest.stale',
'interest.no_activity',
'document.signer_overdue',
'berth.under_offer_stalled',
'expense.duplicate',

View File

@@ -32,6 +32,46 @@ function daysAgo(n: number): Date {
return new Date(Date.now() - n * DAY_MS);
}
// ─── shared interest-activity fragments ───────────────────────────────────────
// Correlated subqueries keyed on `interests.id`, reused by the interest rules.
/**
* True when the interest was created by the legacy→CRM bulk import. The
* migration ledger is the only reliable marker (no column on `interests`).
*/
const isImportedSql = sql`EXISTS (
SELECT 1 FROM migration_source_links msl
WHERE msl.source_system = 'nocodb_interests'
AND msl.target_entity_type = 'interest'
AND msl.target_entity_id = ${interests.id}
)`;
/**
* True when a real user has worked the interest in-system: a logged contact, a
* note, or an UPDATE audit by a real user. The initial create-audit is excluded
* (action='update' only) so a bare, never-touched creation does not count.
*/
const hasFollowupSql = sql`(
EXISTS (SELECT 1 FROM interest_contact_log icl WHERE icl.interest_id = ${interests.id})
OR EXISTS (SELECT 1 FROM interest_notes inn WHERE inn.interest_id = ${interests.id})
OR EXISTS (
SELECT 1 FROM audit_logs al
WHERE al.entity_type = 'interest' AND al.entity_id = ${interests.id}
AND al.user_id IS NOT NULL AND al.action = 'update'
)
)`;
/**
* Most recent genuine in-system touch, used as the staleness clock. Coalesced to
* '-infinity' so GREATEST never returns NULL.
*/
const lastTouchAtSql = sql`GREATEST(
COALESCE(${interests.dateLastContact}, '-infinity'::timestamptz),
COALESCE((SELECT max(icl.occurred_at) FROM interest_contact_log icl WHERE icl.interest_id = ${interests.id}), '-infinity'::timestamptz),
COALESCE((SELECT max(inn.created_at) FROM interest_notes inn WHERE inn.interest_id = ${interests.id}), '-infinity'::timestamptz),
COALESCE((SELECT max(al.created_at) FROM audit_logs al WHERE al.entity_type='interest' AND al.entity_id=${interests.id} AND al.user_id IS NOT NULL AND al.action='update'), '-infinity'::timestamptz)
)`;
// ─── reservation.no_agreement ─────────────────────────────────────────────────
// Active reservations > 3 days old that have no reservation_agreement document
// in any non-cancelled state.
@@ -70,22 +110,18 @@ async function reservationNoAgreement(portId: string): Promise<AlertCandidate[]>
}));
}
// Mid-funnel stages where silence is a problem. EOI / reservation / deposit /
// contract stages have their own dedicated alerts (eoi.unsigned_long,
// reservation.no_agreement, etc.), so these rules sit before signing kicks in.
const ACTIVE_EARLY_STAGES = ['enquiry', 'qualified', 'nurturing'];
// ─── interest.stale ───────────────────────────────────────────────────────────
// Pipeline stuck in mid-funnel stages with no contact for 14+ days.
// A lead a user actually WORKED in-system (logged a contact / note / made an
// update) that has since gone quiet for 14+ days. Interests that were merely
// imported and never touched are handled by interest.no_activity, not here — so
// the bulk-import backlog never lands in this rule.
async function interestStale(portId: string): Promise<AlertCandidate[]> {
// Mid-funnel stages where silence is a problem. EOI / reservation /
// deposit / contract stages have their own dedicated alerts
// (eoi.unsigned_long, reservation.no_agreement, deposit_overdue, etc.),
// so this alert sits before signing kicks in.
//
// 2026-05-14 pipeline-refactor sweep: the prior values
// ('details_sent', 'in_communication', 'eoi_sent') were collapsed by
// migration 0062 into the 7-stage canon (enquiry / qualified /
// nurturing / eoi / ...). Until this fix landed, this alert never
// fired because no row in the new schema carried the dead stage
// strings.
const STALE_STAGES = ['enquiry', 'qualified', 'nurturing'];
const rows = await db
.select({
id: interests.id,
@@ -97,25 +133,10 @@ async function interestStale(portId: string): Promise<AlertCandidate[]> {
.where(
and(
eq(interests.portId, portId),
inArray(interests.pipelineStage, STALE_STAGES),
inArray(interests.pipelineStage, ACTIVE_EARLY_STAGES),
isNull(interests.archivedAt),
// An interest can't be "stale for 14+ days" if it has only existed in
// THIS system for less than 14 days. Without this floor, a bulk import
// (which backdates dateLastContact to the legacy value) instantly flags
// every migrated interest as stale and floods the alert rail.
//
// We floor on updatedAt, NOT createdAt: the legacy→CRM migration
// backfilled created_at to each interest's real origination date (so
// analytics date-ranges work), which would make every migrated row look
// 14+ days old and re-open the flood. updated_at is left at the
// migration timestamp, so it's the reliable "entered/last-touched this
// system" clock — migrated rows stay suppressed for 14 days, then the
// contact-based OR below governs.
lt(interests.updatedAt, daysAgo(14)),
or(
lt(interests.dateLastContact, daysAgo(14)),
and(isNull(interests.dateLastContact), lt(interests.updatedAt, daysAgo(14))),
),
sql`${hasFollowupSql}`,
sql`${lastTouchAtSql} < now() - interval '14 days'`,
),
);
@@ -123,7 +144,7 @@ async function interestStale(portId: string): Promise<AlertCandidate[]> {
ruleId: 'interest.stale',
severity: 'info',
title: `Stale interest: ${r.clientName}`,
body: `In '${STAGE_LABELS[r.stage as PipelineStage] ?? r.stage.replace(/_/g, ' ')}' with no contact for 14+ days.`,
body: `In '${STAGE_LABELS[r.stage as PipelineStage] ?? r.stage.replace(/_/g, ' ')}' — worked but no activity for 14+ days.`,
link: `/[port]/interests/${r.id}`,
entityType: 'interest',
entityId: r.id,
@@ -131,6 +152,42 @@ async function interestStale(portId: string): Promise<AlertCandidate[]> {
}));
}
// ─── interest.no_activity ─────────────────────────────────────────────────────
// A brand-new inbound interest nobody has touched in-system, 14+ days after it
// arrived. Excludes bulk-imported rows (those live in migration_source_links)
// so the historical backlog never nags.
async function interestNoActivity(portId: string): Promise<AlertCandidate[]> {
const rows = await db
.select({
id: interests.id,
stage: interests.pipelineStage,
clientName: sql<string>`coalesce((SELECT full_name FROM clients WHERE id = ${interests.clientId}), 'unknown')`,
})
.from(interests)
.where(
and(
eq(interests.portId, portId),
inArray(interests.pipelineStage, ACTIVE_EARLY_STAGES),
isNull(interests.archivedAt),
lt(interests.createdAt, daysAgo(14)),
sql`NOT ${hasFollowupSql}`,
sql`NOT ${isImportedSql}`,
),
);
return rows.map((r) => ({
ruleId: 'interest.no_activity',
severity: 'info',
title: `New inquiry untouched: ${r.clientName}`,
body: `In '${STAGE_LABELS[r.stage as PipelineStage] ?? r.stage.replace(/_/g, ' ')}' — no activity since it arrived 14+ days ago.`,
link: `/[port]/interests/${r.id}`,
entityType: 'interest',
entityId: r.id,
metadata: { stage: r.stage },
}));
}
// ─── document.signer_overdue ──────────────────────────────────────────────────
// Pending signer for >14d, last reminder >7d ago (or never).
@@ -282,6 +339,10 @@ async function interestHighValueSilent(portId: string): Promise<AlertCandidate[]
eq(interests.portId, portId),
eq(interests.leadCategory, 'hot_lead'),
isNull(interests.archivedAt),
// Don't flood from imported-but-never-touched hot leads (their
// dateLastContact is back-dated to a legacy date). Once a user works one
// in-system, it becomes eligible again.
sql`( NOT ${isImportedSql} OR ${hasFollowupSql} )`,
or(
lt(interests.dateLastContact, cutoff),
and(isNull(interests.dateLastContact), lt(interests.updatedAt, cutoff)),
@@ -335,6 +396,7 @@ async function eoiUnsignedLong(portId: string): Promise<AlertCandidate[]> {
export const RULE_REGISTRY: Record<AlertRuleId, RuleEvaluator> = {
'reservation.no_agreement': reservationNoAgreement,
'interest.stale': interestStale,
'interest.no_activity': interestNoActivity,
'document.signer_overdue': documentSignerOverdue,
'berth.under_offer_stalled': berthUnderOfferStalled,
'expense.duplicate': expenseDuplicate,

View File

@@ -47,6 +47,7 @@ export async function teardown() {
, del_files AS (DELETE FROM files WHERE port_id IN (SELECT id FROM doomed) RETURNING 1)
, del_ft AS (DELETE FROM form_templates WHERE port_id IN (SELECT id FROM doomed) RETURNING 1)
, del_gr AS (DELETE FROM generated_reports WHERE port_id IN (SELECT id FROM doomed) RETURNING 1)
, del_icl AS (DELETE FROM interest_contact_log WHERE port_id IN (SELECT id FROM doomed) RETURNING 1)
, del_int AS (DELETE FROM interests WHERE port_id IN (SELECT id FROM doomed) RETURNING 1)
, del_ib AS (DELETE FROM interest_berths WHERE berth_id IN (SELECT id FROM berths WHERE port_id IN (SELECT id FROM doomed)) RETURNING 1)
, del_inv AS (DELETE FROM invoices WHERE port_id IN (SELECT id FROM doomed) RETURNING 1)
@@ -69,5 +70,8 @@ export async function teardown() {
, del_ports AS (DELETE FROM ports WHERE id IN (SELECT id FROM doomed) RETURNING 1)
SELECT 1
`);
// migration_source_links has no port_id FK; purge test-only ledger rows by
// the marker applied_id our alert tests use.
await db.execute(sql`DELETE FROM migration_source_links WHERE applied_id = 'test-apply'`);
await closeDb();
}

View File

@@ -18,6 +18,8 @@ import { alerts } from '@/lib/db/schema/insights';
import { interests } from '@/lib/db/schema/interests';
import { berthTenancies } from '@/lib/db/schema/tenancies';
import { documents } from '@/lib/db/schema/documents';
import { interestContactLog } from '@/lib/db/schema/operations';
import { migrationSourceLinks } from '@/lib/db/schema/migration';
import { runAlertEngineForPorts } from '@/lib/services/alert-engine';
import { makePort, makeClient, makeBerth, makeYacht } from '../helpers/factories';
@@ -32,6 +34,30 @@ async function listOpenAlerts(portId: string, ruleId: string) {
.where(and(eq(alerts.portId, portId), eq(alerts.ruleId, ruleId), isNull(alerts.resolvedAt)));
}
/** Mark an interest as bulk-imported via the migration ledger. */
async function markImported(interestId: string) {
await db.insert(migrationSourceLinks).values({
sourceSystem: 'nocodb_interests',
sourceId: `legacy-${interestId}`,
targetEntityType: 'interest',
targetEntityId: interestId,
appliedId: 'test-apply',
});
}
/** A genuine in-system follow-up: a logged contact at `occurredAt`. */
async function logContact(portId: string, interestId: string, occurredAt: Date) {
await db.insert(interestContactLog).values({
portId,
interestId,
occurredAt,
channel: 'phone',
direction: 'outbound',
summary: 'Test follow-up',
createdBy: 'seed',
});
}
describe('alert engine', () => {
beforeEach(() => {
vi.clearAllMocks();
@@ -149,7 +175,7 @@ describe('alert engine', () => {
expect(allRows[0]!.resolvedAt).not.toBeNull();
});
it('interest.stale fires for old leads in mid-funnel stages', async () => {
it('interest.stale fires for worked leads gone quiet >14d', async () => {
const port = await makePort();
const client = await makeClient({ portId: port.id });
const stale = new Date(Date.now() - 30 * 86_400_000);
@@ -164,6 +190,9 @@ describe('alert engine', () => {
updatedAt: stale,
})
.returning();
// A real in-system follow-up 30 days ago → this is a worked-then-quiet lead,
// not an untouched import.
await logContact(port.id, interest!.id, stale);
await clearAlerts(port.id);
await runAlertEngineForPorts([port.id]);
@@ -172,6 +201,60 @@ describe('alert engine', () => {
expect(open).toHaveLength(1);
expect(open[0]!.entityId).toBe(interest!.id);
expect(open[0]!.severity).toBe('info');
// A worked lead must not also fire the new-untouched rule.
expect(await listOpenAlerts(port.id, 'interest.no_activity')).toHaveLength(0);
});
it('interest.stale does NOT fire for imported, never-touched interests', async () => {
const port = await makePort();
const client = await makeClient({ portId: port.id });
const migrationTime = new Date(Date.now() - 20 * 86_400_000);
const legacyDate = new Date(Date.now() - 3 * 365 * 86_400_000); // ~3yr back-dated
const [interest] = await db
.insert(interests)
.values({
portId: port.id,
clientId: client.id,
pipelineStage: 'qualified',
dateLastContact: legacyDate, // back-dated by the migration
createdAt: migrationTime,
updatedAt: migrationTime,
})
.returning();
await markImported(interest!.id);
await clearAlerts(port.id);
await runAlertEngineForPorts([port.id]);
// Imported + never touched in-system → neither interest rule should fire.
expect(await listOpenAlerts(port.id, 'interest.stale')).toHaveLength(0);
expect(await listOpenAlerts(port.id, 'interest.no_activity')).toHaveLength(0);
});
it('interest.no_activity fires for new, non-imported, untouched interests >14d old', async () => {
const port = await makePort();
const client = await makeClient({ portId: port.id });
const created = new Date(Date.now() - 20 * 86_400_000);
const [interest] = await db
.insert(interests)
.values({
portId: port.id,
clientId: client.id,
pipelineStage: 'enquiry',
dateLastContact: null,
createdAt: created,
updatedAt: created,
})
.returning();
await clearAlerts(port.id);
await runAlertEngineForPorts([port.id]);
const open = await listOpenAlerts(port.id, 'interest.no_activity');
expect(open).toHaveLength(1);
expect(open[0]!.entityId).toBe(interest!.id);
expect(open[0]!.severity).toBe('info');
expect(await listOpenAlerts(port.id, 'interest.stale')).toHaveLength(0);
});
it('interest.high_value_silent fires for hot leads silent >7d', async () => {
@@ -195,6 +278,31 @@ describe('alert engine', () => {
expect(open[0]!.severity).toBe('critical');
});
it('interest.high_value_silent skips imported, never-touched hot leads', async () => {
const port = await makePort();
const client = await makeClient({ portId: port.id });
const migrationTime = new Date(Date.now() - 20 * 86_400_000);
const legacyDate = new Date(Date.now() - 3 * 365 * 86_400_000);
const [interest] = await db
.insert(interests)
.values({
portId: port.id,
clientId: client.id,
pipelineStage: 'qualified',
leadCategory: 'hot_lead',
dateLastContact: legacyDate,
createdAt: migrationTime,
updatedAt: migrationTime,
})
.returning();
await markImported(interest!.id);
await clearAlerts(port.id);
await runAlertEngineForPorts([port.id]);
expect(await listOpenAlerts(port.id, 'interest.high_value_silent')).toHaveLength(0);
});
it('engine reports rule errors without crashing the sweep', async () => {
const port = await makePort();
const summary = await runAlertEngineForPorts([port.id]);