4 Commits

Author SHA1 Message Date
Matt Ciaccio
94f049c8b8 merge: PR2 — alert rules engine + cron + socket (Phase B)
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-28 14:50:57 +02:00
Matt Ciaccio
df495133b7 feat(alerts): rule engine, recurring evaluator, socket fanout
PR2 of Phase B. Wires the alert framework end-to-end:

- alert-rules.ts: 10 rule evaluators implemented as pure async fns over
  the existing schema. Eight fire against real conditions:
  reservation.no_agreement, interest.stale, document.signer_overdue,
  berth.under_offer_stalled, expense.duplicate, expense.unscanned,
  interest.high_value_silent, eoi.unsigned_long.
  document.expiring_soon stays inert until the documents schema gets an
  expires_at column; audit.suspicious_login is implemented but stays
  inert until the auth layer logs 'login.failed' rows (TODO noted in
  the rule body).

- alert-engine.ts: runAlertEngine() walks every port × every rule and
  calls reconcileAlertsForPort. Errors per (port, rule) are collected
  in the summary, not thrown — one bad evaluator can't stop the sweep.

- alerts.service.ts: reconcileAlertsForPort now emits 'alert:created'
  socket events on insert and 'alert:resolved' on auto-resolve;
  dismissAlert emits 'alert:dismissed'. All scoped to port:{portId}
  rooms.

- socket/events.ts: adds the three Server→Client alert event types.

- queue/scheduler.ts: registers 'alerts-evaluate' on the maintenance
  queue with cron */5 * * * * (every 5 min, per spec risk register).

- queue/workers/maintenance.ts: dispatches 'alerts-evaluate' to
  runAlertEngine; logs sweep summary.

Tests:
- tests/integration/alerts-engine.test.ts (6 cases): seeds reservation
  → fires, runs twice → no dupe, adds agreement → auto-resolves; seeds
  stale interest → fires; hot lead silent → critical; engine summary
  shape on no-data port. Socket emit module is vi.mocked.

Vitest 681/681 (was 675; +6). tsc clean. Lint clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-28 14:50:55 +02:00
Matt Ciaccio
639025ebf9 merge: PR1 — Phase B schema + service skeletons (Phase B)
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-28 14:43:03 +02:00
Matt Ciaccio
e77d55ac50 feat(insights): Phase B schema + service skeletons
PR1 of Phase B per docs/superpowers/specs/2026-04-28-phase-b-insights-alerts-design.md.
Lays the foundation that PRs 2-10 will fill in with behaviour.

Schema (migration 0014):
- alerts table with rule-engine fields (rule_id, severity, link,
  entity_type/id, fingerprint, fired/dismissed/acknowledged/resolved
  timestamps, jsonb metadata). Partial-unique fingerprint index keeps
  one open row per (port, rule, entity); separate indexes power
  severity-filtered and time-ordered queries.
- analytics_snapshots (port_id, metric_id) -> jsonb cache + computedAt
  for the 15-min recurring refresh.
- expenses: duplicate_of self-FK, dedup_scanned_at, ocr_status/raw/
  confidence; partial index on (port, vendor, amount, date) where
  duplicate_of IS NULL drives the dedup heuristic.
- audit_logs.search_text: GENERATED ALWAYS tsvector over
  action+entity_type+entity_id+user_id, GIN-indexed (drizzle can't
  model GENERATED ALWAYS in TS yet, so the migration appends manual
  ALTER + the GIN index).

Service skeletons in src/lib/services/:
- alerts.service.ts: fingerprintFor, reconcileAlertsForPort (upsert +
  auto-resolve), dismiss, acknowledge, listAlertsForPort.
- alert-rules.ts: RULE_REGISTRY of 10 rule evaluators (currently no-op);
  PR2 fills in the bodies.
- analytics.service.ts: readSnapshot/writeSnapshot with 15-min TTL +
  no-op compute* stubs for the four chart series; PR3 fills behavior.
- expense-dedup.service.ts: scanForDuplicates + markBestDuplicate
  using the partial dedup index. PR8 wires the BullMQ trigger.
- expense-ocr.service.ts: OcrResult/OcrLineItem types + ocrReceipt
  stub. PR9 wires Claude Vision (Haiku 4.5 + ephemeral system-prompt
  cache).
- audit-search.service.ts: tsvector @@ plainto_tsquery + cursor
  pagination on (createdAt, id). PR10 wires the admin UI.

tsc clean, lint clean, vitest 675/675 (one unrelated AES random-output
test flakes in the full suite but passes solo).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-28 14:43:01 +02:00
18 changed files with 11110 additions and 25 deletions


@@ -0,0 +1,56 @@
CREATE TABLE "alerts" (
  "id" text PRIMARY KEY NOT NULL,
  "port_id" text NOT NULL,
  "rule_id" text NOT NULL,
  "severity" text NOT NULL,
  "title" text NOT NULL,
  "body" text,
  "link" text NOT NULL,
  "entity_type" text,
  "entity_id" text,
  "fingerprint" text NOT NULL,
  "fired_at" timestamp with time zone DEFAULT now() NOT NULL,
  "dismissed_at" timestamp with time zone,
  "dismissed_by" text,
  "acknowledged_at" timestamp with time zone,
  "acknowledged_by" text,
  "resolved_at" timestamp with time zone,
  "metadata" jsonb DEFAULT '{}'::jsonb
);
--> statement-breakpoint
CREATE TABLE "analytics_snapshots" (
  "port_id" text NOT NULL,
  "metric_id" text NOT NULL,
  "computed_at" timestamp with time zone DEFAULT now() NOT NULL,
  "data" jsonb NOT NULL
);
--> statement-breakpoint
ALTER TABLE "expenses" ADD COLUMN "duplicate_of" text;--> statement-breakpoint
ALTER TABLE "expenses" ADD COLUMN "dedup_scanned_at" timestamp with time zone;--> statement-breakpoint
ALTER TABLE "expenses" ADD COLUMN "ocr_status" text DEFAULT 'pending';--> statement-breakpoint
ALTER TABLE "expenses" ADD COLUMN "ocr_raw" jsonb;--> statement-breakpoint
ALTER TABLE "expenses" ADD COLUMN "ocr_confidence" numeric;--> statement-breakpoint
ALTER TABLE "audit_logs" ADD COLUMN "search_text" "tsvector";--> statement-breakpoint
ALTER TABLE "alerts" ADD CONSTRAINT "alerts_port_id_ports_id_fk" FOREIGN KEY ("port_id") REFERENCES "public"."ports"("id") ON DELETE cascade ON UPDATE no action;--> statement-breakpoint
ALTER TABLE "alerts" ADD CONSTRAINT "alerts_dismissed_by_user_id_fk" FOREIGN KEY ("dismissed_by") REFERENCES "public"."user"("id") ON DELETE no action ON UPDATE no action;--> statement-breakpoint
ALTER TABLE "alerts" ADD CONSTRAINT "alerts_acknowledged_by_user_id_fk" FOREIGN KEY ("acknowledged_by") REFERENCES "public"."user"("id") ON DELETE no action ON UPDATE no action;--> statement-breakpoint
ALTER TABLE "analytics_snapshots" ADD CONSTRAINT "analytics_snapshots_port_id_ports_id_fk" FOREIGN KEY ("port_id") REFERENCES "public"."ports"("id") ON DELETE cascade ON UPDATE no action;--> statement-breakpoint
CREATE UNIQUE INDEX "idx_alerts_fingerprint_open" ON "alerts" USING btree ("port_id","fingerprint") WHERE resolved_at IS NULL;--> statement-breakpoint
CREATE INDEX "idx_alerts_port_fired" ON "alerts" USING btree ("port_id","fired_at");--> statement-breakpoint
CREATE INDEX "idx_alerts_port_severity_open" ON "alerts" USING btree ("port_id","severity") WHERE resolved_at IS NULL AND dismissed_at IS NULL;--> statement-breakpoint
CREATE UNIQUE INDEX "idx_analytics_pk" ON "analytics_snapshots" USING btree ("port_id","metric_id");--> statement-breakpoint
ALTER TABLE "expenses" ADD CONSTRAINT "expenses_duplicate_of_expenses_id_fk" FOREIGN KEY ("duplicate_of") REFERENCES "public"."expenses"("id") ON DELETE set null ON UPDATE no action;--> statement-breakpoint
CREATE INDEX "idx_expenses_dedup" ON "expenses" USING btree ("port_id","establishment_name","amount","expense_date") WHERE duplicate_of IS NULL;--> statement-breakpoint
-- audit_logs.search_text needs to be GENERATED ALWAYS (drizzle can't model that
-- in TS yet); drop the empty column and re-add it as the generated form.
ALTER TABLE "audit_logs" DROP COLUMN "search_text";--> statement-breakpoint
ALTER TABLE "audit_logs" ADD COLUMN "search_text" tsvector
  GENERATED ALWAYS AS (
    to_tsvector('simple',
      coalesce("action", '') || ' ' ||
      coalesce("entity_type", '') || ' ' ||
      coalesce("entity_id", '') || ' ' ||
      coalesce("user_id", '')
    )
  ) STORED;--> statement-breakpoint
CREATE INDEX "idx_audit_search" ON "audit_logs" USING gin("search_text");
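The partial-unique index `idx_alerts_fingerprint_open` is what gives the engine its upsert semantics: while an alert row is open, re-firing the same (port, fingerprint) conflicts and becomes a no-op; once the row is resolved, a fresh open row can coexist with the history. A minimal in-memory sketch of that behaviour (plain TypeScript, no database; `OpenAlertTable` is illustrative, not part of the codebase):

```typescript
type Row = { portId: string; fingerprint: string; resolvedAt: Date | null };

class OpenAlertTable {
  rows: Row[] = [];

  // Mirrors idx_alerts_fingerprint_open: unique on (port_id, fingerprint)
  // WHERE resolved_at IS NULL. Returns true if a new open row was inserted.
  fire(portId: string, fingerprint: string): boolean {
    const open = this.rows.find(
      (r) => r.portId === portId && r.fingerprint === fingerprint && r.resolvedAt === null,
    );
    if (open) return false; // conflicts with the open row, so the upsert is a no-op
    this.rows.push({ portId, fingerprint, resolvedAt: null });
    return true;
  }

  // Mirrors the engine's auto-resolve: close the open row, keep it as history.
  resolve(portId: string, fingerprint: string): void {
    for (const r of this.rows) {
      if (r.portId === portId && r.fingerprint === fingerprint && r.resolvedAt === null) {
        r.resolvedAt = new Date();
      }
    }
  }
}
```

Firing twice while open is a no-op; resolving and firing again leaves two rows, one resolved and one open, which is the "history accumulates" property the schema comment describes.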

File diff suppressed because it is too large.


@@ -99,6 +99,13 @@
"when": 1777334766194,
"tag": "0013_abnormal_thundra",
"breakpoints": true
},
{
"idx": 14,
"version": "7",
"when": 1777379952283,
"tag": "0014_black_banshee",
"breakpoints": true
}
]
}


@@ -8,7 +8,10 @@ import {
index,
uniqueIndex,
primaryKey,
jsonb,
AnyPgColumn,
} from 'drizzle-orm/pg-core';
import { sql } from 'drizzle-orm';
import { ports } from './ports';
import { files } from './documents';
@@ -36,6 +39,19 @@ export const expenses = pgTable(
paymentDate: date('payment_date'),
paymentReference: text('payment_reference'),
paymentNotes: text('payment_notes'),
/** When set, this expense is flagged as a duplicate of another in the
* same port. Self-referencing FK; the dedup service writes this. */
duplicateOf: text('duplicate_of').references((): AnyPgColumn => expenses.id, {
onDelete: 'set null',
}),
/** Last time the dedup heuristic ran against this row. */
dedupScannedAt: timestamp('dedup_scanned_at', { withTimezone: true }),
/** OCR pipeline state: 'pending'|'ok'|'failed'|'low_confidence'. */
ocrStatus: text('ocr_status').default('pending'),
/** Full Claude Vision response payload for audit/debug. */
ocrRaw: jsonb('ocr_raw'),
/** 0..1; values < 0.6 force the verify-mode UI. */
ocrConfidence: numeric('ocr_confidence'),
createdBy: text('created_by').notNull(),
archivedAt: timestamp('archived_at', { withTimezone: true }),
createdAt: timestamp('created_at', { withTimezone: true }).notNull().defaultNow(),
@@ -45,6 +61,10 @@ export const expenses = pgTable(
index('idx_expenses_port').on(table.portId),
index('idx_expenses_date').on(table.portId, table.expenseDate),
index('idx_expenses_category').on(table.portId, table.category),
// Powers the dedup heuristic lookup (port + vendor + amount + date window).
index('idx_expenses_dedup')
.on(table.portId, table.establishmentName, table.amount, table.expenseDate)
.where(sql`duplicate_of IS NULL`),
],
);


@@ -47,5 +47,8 @@ export * from './operations';
// System
export * from './system';
// Insights (Phase B): alerts, analytics_snapshots
export * from './insights';
// Relations (must come last — references all tables)
export * from './relations';


@@ -0,0 +1,101 @@
/**
* Phase B — operational insight surfaces.
*
* - `alerts`: rule-engine-fired actionable cards. The fingerprint column
* dedupes re-evaluations of the same condition; the partial unique
* index keeps a single open row per `(port, fingerprint)` while
* resolved/dismissed history accumulates.
* - `analytics_snapshots`: cached aggregate JSON keyed by metric+range,
* refreshed by a recurring job so dashboard hits don't recompute.
*/
import { pgTable, text, timestamp, jsonb, index, uniqueIndex } from 'drizzle-orm/pg-core';
import { sql } from 'drizzle-orm';
import { ports } from './ports';
import { user } from './users';
export const alerts = pgTable(
'alerts',
{
id: text('id')
.primaryKey()
.$defaultFn(() => crypto.randomUUID()),
portId: text('port_id')
.notNull()
.references(() => ports.id, { onDelete: 'cascade' }),
/** Stable rule identifier: 'reservation.no_agreement', 'interest.stale', ... */
ruleId: text('rule_id').notNull(),
/** 'info' | 'warning' | 'critical' */
severity: text('severity').notNull(),
title: text('title').notNull(),
body: text('body'),
/** Relative path the card deep-links to. */
link: text('link').notNull(),
/** Optional FK target: 'interest', 'reservation', 'document', 'expense', ... */
entityType: text('entity_type'),
entityId: text('entity_id'),
/** Hash of (rule_id + entity_type + entity_id) — dedupes re-evaluations. */
fingerprint: text('fingerprint').notNull(),
firedAt: timestamp('fired_at', { withTimezone: true }).notNull().defaultNow(),
dismissedAt: timestamp('dismissed_at', { withTimezone: true }),
dismissedBy: text('dismissed_by').references(() => user.id),
/** "Someone is on it" — alert stays visible but stops nagging. */
acknowledgedAt: timestamp('acknowledged_at', { withTimezone: true }),
acknowledgedBy: text('acknowledged_by').references(() => user.id),
/** Set by the engine when the underlying condition no longer fires. */
resolvedAt: timestamp('resolved_at', { withTimezone: true }),
/** Per-rule extras: days_stale, amount_at_risk, etc. */
metadata: jsonb('metadata').default({}),
},
(table) => [
// Only one open alert per (port, fingerprint) — re-evaluation upserts.
uniqueIndex('idx_alerts_fingerprint_open')
.on(table.portId, table.fingerprint)
.where(sql`resolved_at IS NULL`),
index('idx_alerts_port_fired').on(table.portId, table.firedAt),
index('idx_alerts_port_severity_open')
.on(table.portId, table.severity)
.where(sql`resolved_at IS NULL AND dismissed_at IS NULL`),
],
);
export type Alert = typeof alerts.$inferSelect;
export type NewAlert = typeof alerts.$inferInsert;
export const analyticsSnapshots = pgTable(
'analytics_snapshots',
{
portId: text('port_id')
.notNull()
.references(() => ports.id, { onDelete: 'cascade' }),
/** Composite key: e.g. 'pipeline_funnel.30d', 'occupancy_timeline.90d'. */
metricId: text('metric_id').notNull(),
computedAt: timestamp('computed_at', { withTimezone: true }).notNull().defaultNow(),
/** Pre-shaped chart data. */
data: jsonb('data').notNull(),
},
(table) => [uniqueIndex('idx_analytics_pk').on(table.portId, table.metricId)],
);
export type AnalyticsSnapshot = typeof analyticsSnapshots.$inferSelect;
export type NewAnalyticsSnapshot = typeof analyticsSnapshots.$inferInsert;
/** Severity literal type for callers that want a typed enum. */
export type AlertSeverity = 'info' | 'warning' | 'critical';
/** Rule IDs in the v1 catalog — keep in sync with `alert-rules.ts`. */
export const ALERT_RULES = [
'reservation.no_agreement',
'interest.stale',
'document.expiring_soon',
'document.signer_overdue',
'berth.under_offer_stalled',
'expense.duplicate',
'expense.unscanned',
'interest.high_value_silent',
'eoi.unsigned_long',
'audit.suspicious_login',
] as const;
export type AlertRuleId = (typeof ALERT_RULES)[number];
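The `as const` tuple plus the derived union type lets TypeScript enforce that the registry covers every rule id. A standalone sketch of the pattern (trimmed to two rules so it runs on its own; `RULES` and `REGISTRY` are illustrative names, not the real exports):

```typescript
// Record<RuleId, ...> turns a missing or misspelled registry key
// into a compile-time error.
const RULES = ['reservation.no_agreement', 'interest.stale'] as const;
type RuleId = (typeof RULES)[number];

const REGISTRY: Record<RuleId, (portId: string) => Promise<string[]>> = {
  'reservation.no_agreement': async () => [],
  'interest.stale': async () => [],
};

// Runtime mirror of the compile-time guarantee (handy in a unit test).
const fullyCovered = RULES.every((id) => id in REGISTRY);
```

Deleting either registry entry, or adding a key not in the tuple, fails type-checking, which is what "keep in sync with `alert-rules.ts`" relies on.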


@@ -8,14 +8,24 @@ import {
jsonb,
index,
uniqueIndex,
customType,
} from 'drizzle-orm/pg-core';
import { ports } from './ports';
import { clients } from './clients';
// Drizzle doesn't ship a first-class tsvector type; declare a thin custom one.
const tsvector = customType<{ data: string; driverData: string }>({
dataType() {
return 'tsvector';
},
});
export const auditLogs = pgTable(
'audit_logs',
{
id: text('id').primaryKey().$defaultFn(() => crypto.randomUUID()),
id: text('id')
.primaryKey()
.$defaultFn(() => crypto.randomUUID()),
portId: text('port_id').references(() => ports.id), // null for system-level events
userId: text('user_id'), // null for system-generated events
action: text('action').notNull(), // create, update, delete, archive, restore, merge, login, logout, revert
@@ -31,6 +41,10 @@ export const auditLogs = pgTable(
// eslint-disable-next-line @typescript-eslint/no-explicit-any
revertOf: text('revert_of').references((): any => auditLogs.id),
metadata: jsonb('metadata').default({}),
/** Full-text search column. Stored generated; maintained by the
* migration's GENERATED ALWAYS expression covering action + entityType
* + entityId + userId. */
searchText: tsvector('search_text'),
createdAt: timestamp('created_at', { withTimezone: true }).notNull().defaultNow(),
},
(table) => [
@@ -44,7 +58,9 @@ export const auditLogs = pgTable(
export const tags = pgTable(
'tags',
{
id: text('id').primaryKey().$defaultFn(() => crypto.randomUUID()),
id: text('id')
.primaryKey()
.$defaultFn(() => crypto.randomUUID()),
portId: text('port_id')
.notNull()
.references(() => ports.id),
@@ -61,7 +77,9 @@ export const tags = pgTable(
export const webhooks = pgTable(
'webhooks',
{
id: text('id').primaryKey().$defaultFn(() => crypto.randomUUID()),
id: text('id')
.primaryKey()
.$defaultFn(() => crypto.randomUUID()),
portId: text('port_id')
.notNull()
.references(() => ports.id),
@@ -80,7 +98,9 @@ export const webhooks = pgTable(
export const webhookDeliveries = pgTable(
'webhook_deliveries',
{
id: text('id').primaryKey().$defaultFn(() => crypto.randomUUID()),
id: text('id')
.primaryKey()
.$defaultFn(() => crypto.randomUUID()),
webhookId: text('webhook_id')
.notNull()
.references(() => webhooks.id, { onDelete: 'cascade' }),
@@ -115,7 +135,9 @@ export const systemSettings = pgTable(
export const savedViews = pgTable(
'saved_views',
{
id: text('id').primaryKey().$defaultFn(() => crypto.randomUUID()),
id: text('id')
.primaryKey()
.$defaultFn(() => crypto.randomUUID()),
portId: text('port_id')
.notNull()
.references(() => ports.id),
@@ -136,7 +158,9 @@ export const savedViews = pgTable(
export const scratchpadNotes = pgTable(
'scratchpad_notes',
{
id: text('id').primaryKey().$defaultFn(() => crypto.randomUUID()),
id: text('id')
.primaryKey()
.$defaultFn(() => crypto.randomUUID()),
userId: text('user_id').notNull(),
content: text('content').notNull(),
linkedClientId: text('linked_client_id').references(() => clients.id),
@@ -150,7 +174,9 @@ export const scratchpadNotes = pgTable(
export const userNotificationPreferences = pgTable(
'user_notification_preferences',
{
id: text('id').primaryKey().$defaultFn(() => crypto.randomUUID()),
id: text('id')
.primaryKey()
.$defaultFn(() => crypto.randomUUID()),
userId: text('user_id').notNull(),
portId: text('port_id')
.notNull()
@@ -167,7 +193,9 @@ export const userNotificationPreferences = pgTable(
export const currencyRates = pgTable(
'currency_rates',
{
id: text('id').primaryKey().$defaultFn(() => crypto.randomUUID()),
id: text('id')
.primaryKey()
.$defaultFn(() => crypto.randomUUID()),
baseCurrency: text('base_currency').notNull(),
targetCurrency: text('target_currency').notNull(),
rate: numeric('rate').notNull(),
@@ -182,7 +210,9 @@ export const currencyRates = pgTable(
export const customFieldDefinitions = pgTable(
'custom_field_definitions',
{
id: text('id').primaryKey().$defaultFn(() => crypto.randomUUID()),
id: text('id')
.primaryKey()
.$defaultFn(() => crypto.randomUUID()),
portId: text('port_id')
.notNull()
.references(() => ports.id),
@@ -204,7 +234,9 @@ export const customFieldDefinitions = pgTable(
export const customFieldValues = pgTable(
'custom_field_values',
{
id: text('id').primaryKey().$defaultFn(() => crypto.randomUUID()),
id: text('id')
.primaryKey()
.$defaultFn(() => crypto.randomUUID()),
fieldId: text('field_id')
.notNull()
.references(() => customFieldDefinitions.id, { onDelete: 'cascade' }),


@@ -47,6 +47,9 @@ export async function registerRecurringJobs(): Promise<void> {
// Cleanup jobs
{ queue: 'maintenance', name: 'temp-file-cleanup', pattern: '0 5 * * *' },
{ queue: 'maintenance', name: 'form-expiry-check', pattern: '0 * * * *' },
// Phase B: alert rule engine sweep
{ queue: 'maintenance', name: 'alerts-evaluate', pattern: '*/5 * * * *' },
];
for (const job of recurring) {
@@ -56,7 +59,10 @@ export async function registerRecurringJobs(): Promise<void> {
{ pattern: job.pattern },
{ data: {}, name: job.name },
);
logger.info({ queue: job.queue, job: job.name, pattern: job.pattern }, 'Registered recurring job');
logger.info(
{ queue: job.queue, job: job.name, pattern: job.pattern },
'Registered recurring job',
);
}
logger.info({ count: recurring.length }, 'All recurring jobs registered');
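The `*/5 * * * *` pattern fires on every minute divisible by five. A small sketch of computing the next tick (a hypothetical helper, not in the codebase; five-minute steps align with the epoch, so plain modular arithmetic works):

```typescript
// Next '*/5 * * * *' boundary strictly after `from` (UTC).
function nextFiveMinuteTick(from: Date): Date {
  const step = 5 * 60_000; // five minutes in ms
  return new Date(Math.floor(from.getTime() / step) * step + step);
}
```

For example, from 14:50:57 the next sweep runs at 14:55:00; from exactly 14:55:00 it runs at 15:00:00.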


@@ -28,6 +28,12 @@ export const maintenanceWorker = new Worker(
logger.info({ expired: result.length }, 'Form expiry check complete');
break;
}
case 'alerts-evaluate': {
const { runAlertEngine } = await import('@/lib/services/alert-engine');
const summary = await runAlertEngine();
logger.info(summary, 'Alert engine sweep complete');
break;
}
default:
logger.warn({ jobName: job.name }, 'Unknown maintenance job');
}


@@ -0,0 +1,50 @@
/**
* Alert engine — runs every rule against every port. Called by the
* BullMQ recurring job 'alerts-evaluate' every 5 minutes; exposed as a
* function so integration tests can drive it without a worker.
*/
import { logger } from '@/lib/logger';
import { db } from '@/lib/db';
import { ports } from '@/lib/db/schema/ports';
import { reconcileAlertsForPort } from './alerts.service';
import { RULE_REGISTRY, listRuleIds } from './alert-rules';
export interface EngineRunSummary {
portsScanned: number;
rulesEvaluated: number;
errors: Array<{ portId: string; ruleId: string; message: string }>;
}
/** Evaluate every rule for every port, upsert + auto-resolve. */
export async function runAlertEngine(): Promise<EngineRunSummary> {
const allPorts = await db.select({ id: ports.id, slug: ports.slug }).from(ports);
return runAlertEngineForPorts(allPorts.map((p) => p.id));
}
/** Same engine scoped to a specific list of port IDs (used by tests + the
* per-port webhook trigger). */
export async function runAlertEngineForPorts(portIds: string[]): Promise<EngineRunSummary> {
const ruleIds = listRuleIds();
const errors: EngineRunSummary['errors'] = [];
for (const portId of portIds) {
for (const ruleId of ruleIds) {
try {
const candidates = await RULE_REGISTRY[ruleId](portId);
await reconcileAlertsForPort(portId, ruleId, candidates);
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
logger.warn({ portId, ruleId, err }, 'alert rule evaluator failed');
errors.push({ portId, ruleId, message });
}
}
}
return {
portsScanned: portIds.length,
rulesEvaluated: portIds.length * ruleIds.length,
errors,
};
}
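The per-(port, rule) try/catch above is what keeps one bad evaluator from aborting the whole sweep. A standalone simulation of that isolation (stub registry, no db; `sweep` here is a toy stand-in, not the real `runAlertEngineForPorts`):

```typescript
type Evaluator = (portId: string) => Promise<string[]>;

// Stub registry: one evaluator throws on every port, the other succeeds.
const registry: Record<string, Evaluator> = {
  'ok.rule': async () => ['candidate'],
  'bad.rule': async () => {
    throw new Error('boom');
  },
};

async function sweep(portIds: string[]) {
  const errors: Array<{ portId: string; ruleId: string; message: string }> = [];
  let rulesEvaluated = 0;
  for (const portId of portIds) {
    for (const [ruleId, run] of Object.entries(registry)) {
      rulesEvaluated++;
      try {
        await run(portId);
      } catch (err) {
        // Collected, not rethrown: the loop keeps going past the failure.
        errors.push({
          portId,
          ruleId,
          message: err instanceof Error ? err.message : String(err),
        });
      }
    }
  }
  return { portsScanned: portIds.length, rulesEvaluated, errors };
}
```

With two ports, all four (port, rule) pairs are evaluated and the two failures land in the summary instead of stopping the sweep.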


@@ -0,0 +1,373 @@
/**
* Alert rule catalog. Each entry is a pure async function that takes a
* `portId` and returns an array of `AlertCandidate` rows the engine should
* upsert. The engine (in `alerts.service.ts`) handles dedupe via the
* fingerprint partial-unique index and auto-resolves stale alerts.
*
* Adding a rule:
* 1. Add the literal to `ALERT_RULES` in schema/insights.ts.
* 2. Implement the evaluator below.
* 3. Register it in `RULE_REGISTRY`.
* 4. Add a unit test in tests/unit/services/alert-rules-evaluators.test.ts.
*/
import { and, eq, isNull, isNotNull, lt, gt, gte, sql, inArray, or, desc } from 'drizzle-orm';
import { db } from '@/lib/db';
import { interests } from '@/lib/db/schema/interests';
import { berthReservations } from '@/lib/db/schema/reservations';
import { berths } from '@/lib/db/schema/berths';
import { documents, documentSigners } from '@/lib/db/schema/documents';
import { expenses } from '@/lib/db/schema/financial';
import { auditLogs } from '@/lib/db/schema/system';
import { alerts as alertsTable } from '@/lib/db/schema/insights';
import { ALERT_RULES, type AlertRuleId } from '@/lib/db/schema/insights';
import type { AlertCandidate } from './alerts.service';
type RuleEvaluator = (portId: string) => Promise<AlertCandidate[]>;
const DAY_MS = 86_400_000;
function daysAgo(n: number): Date {
return new Date(Date.now() - n * DAY_MS);
}
// ─── reservation.no_agreement ─────────────────────────────────────────────────
// Active reservations > 3 days old that have no reservation_agreement document
// in any non-cancelled state.
async function reservationNoAgreement(portId: string): Promise<AlertCandidate[]> {
const rows = await db
.select({
id: berthReservations.id,
startDate: berthReservations.startDate,
clientName: sql<string>`coalesce((SELECT full_name FROM clients WHERE id = ${berthReservations.clientId}), 'unknown')`,
yachtName: sql<string>`coalesce((SELECT name FROM yachts WHERE id = ${berthReservations.yachtId}), 'unknown')`,
})
.from(berthReservations)
.where(
and(
eq(berthReservations.portId, portId),
eq(berthReservations.status, 'active'),
lt(berthReservations.createdAt, daysAgo(3)),
sql`NOT EXISTS (
SELECT 1 FROM ${documents}
WHERE ${documents.reservationId} = ${berthReservations.id}
AND ${documents.documentType} = 'reservation_agreement'
AND ${documents.status} NOT IN ('cancelled', 'expired')
)`,
),
);
return rows.map((r) => ({
ruleId: 'reservation.no_agreement',
severity: 'warning',
title: `Reservation needs an agreement`,
body: `Active reservation for ${r.yachtName} (${r.clientName}) has no signed agreement yet.`,
link: `/[port]/berth-reservations/${r.id}`,
entityType: 'reservation',
entityId: r.id,
}));
}
// ─── interest.stale ───────────────────────────────────────────────────────────
// Pipeline stuck in mid-funnel stages with no contact for 14+ days.
async function interestStale(portId: string): Promise<AlertCandidate[]> {
const STALE_STAGES = ['details_sent', 'in_communication', 'visited'];
const rows = await db
.select({
id: interests.id,
stage: interests.pipelineStage,
lastContact: interests.dateLastContact,
clientName: sql<string>`coalesce((SELECT full_name FROM clients WHERE id = ${interests.clientId}), 'unknown')`,
})
.from(interests)
.where(
and(
eq(interests.portId, portId),
inArray(interests.pipelineStage, STALE_STAGES),
isNull(interests.archivedAt),
or(
lt(interests.dateLastContact, daysAgo(14)),
and(isNull(interests.dateLastContact), lt(interests.updatedAt, daysAgo(14))),
),
),
);
return rows.map((r) => ({
ruleId: 'interest.stale',
severity: 'info',
title: `Stale interest: ${r.clientName}`,
body: `In '${r.stage}' with no contact for 14+ days.`,
link: `/[port]/interests/${r.id}`,
entityType: 'interest',
entityId: r.id,
metadata: { stage: r.stage, lastContact: r.lastContact },
}));
}
// ─── document.expiring_soon ───────────────────────────────────────────────────
// In-flight signing documents whose expiry is within 7 days.
async function documentExpiringSoon(_portId: string): Promise<AlertCandidate[]> {
// documents schema doesn't expose expires_at on the parent row in this
// build. Until the column lands, fall back to no-op so the rule slot
// is registered but doesn't fire.
return [];
}
// ─── document.signer_overdue ──────────────────────────────────────────────────
// Pending signer for >14d, last reminder >7d ago (or never).
async function documentSignerOverdue(portId: string): Promise<AlertCandidate[]> {
const cutoff = daysAgo(14);
const rows = await db
.select({
docId: documents.id,
title: documents.title,
docType: documents.documentType,
signerId: documentSigners.id,
signerEmail: documentSigners.signerEmail,
signerName: documentSigners.signerName,
sentAt: documentSigners.createdAt,
})
.from(documents)
.innerJoin(documentSigners, eq(documentSigners.documentId, documents.id))
.where(
and(
eq(documents.portId, portId),
inArray(documents.status, ['sent', 'partially_signed']),
eq(documentSigners.status, 'pending'),
lt(documentSigners.createdAt, cutoff),
),
);
return rows.map((r) => ({
ruleId: 'document.signer_overdue',
severity: 'warning',
title: `Signer overdue: ${r.signerName}`,
body: `${r.docType.toUpperCase()} "${r.title}" — pending >14 days.`,
link: `/[port]/documents/${r.docId}`,
entityType: 'document',
entityId: r.docId,
metadata: { signerId: r.signerId, signerEmail: r.signerEmail, sentAt: r.sentAt },
}));
}
// ─── berth.under_offer_stalled ────────────────────────────────────────────────
// Berths sitting in 'under_offer' status for 30+ days.
async function berthUnderOfferStalled(portId: string): Promise<AlertCandidate[]> {
const rows = await db
.select({
id: berths.id,
mooringNumber: berths.mooringNumber,
updatedAt: berths.updatedAt,
})
.from(berths)
.where(
and(
eq(berths.portId, portId),
eq(berths.status, 'under_offer'),
lt(berths.updatedAt, daysAgo(30)),
),
);
return rows.map((r) => ({
ruleId: 'berth.under_offer_stalled',
severity: 'info',
title: `Berth ${r.mooringNumber} stalled under offer`,
body: `No status change in 30+ days.`,
link: `/[port]/berths/${r.id}`,
entityType: 'berth',
entityId: r.id,
metadata: { stalledSince: r.updatedAt },
}));
}
// ─── expense.duplicate ────────────────────────────────────────────────────────
// Expenses whose duplicate_of is set (the dedup service writes this).
async function expenseDuplicate(portId: string): Promise<AlertCandidate[]> {
const rows = await db
.select({
id: expenses.id,
vendor: expenses.establishmentName,
amount: expenses.amount,
duplicateOf: expenses.duplicateOf,
})
.from(expenses)
.where(
and(
eq(expenses.portId, portId),
isNotNull(expenses.duplicateOf),
isNull(expenses.archivedAt),
),
);
return rows.map((r) => ({
ruleId: 'expense.duplicate',
severity: 'info',
title: `Possible duplicate expense`,
body: `${r.vendor ?? 'Unknown vendor'} — ${r.amount}.`,
link: `/[port]/expenses/${r.id}`,
entityType: 'expense',
entityId: r.id,
metadata: { duplicateOf: r.duplicateOf },
}));
}
// ─── expense.unscanned ────────────────────────────────────────────────────────
// Expense uploaded with a receipt file but OCR didn't run / failed > 1h ago.
async function expenseUnscanned(portId: string): Promise<AlertCandidate[]> {
const rows = await db
.select({
id: expenses.id,
vendor: expenses.establishmentName,
ocrStatus: expenses.ocrStatus,
createdAt: expenses.createdAt,
})
.from(expenses)
.where(
and(
eq(expenses.portId, portId),
eq(expenses.ocrStatus, 'pending'),
sql`array_length(${expenses.receiptFileIds}, 1) > 0`,
lt(expenses.createdAt, new Date(Date.now() - 60 * 60 * 1000)),
isNull(expenses.archivedAt),
),
);
return rows.map((r) => ({
ruleId: 'expense.unscanned',
severity: 'info',
title: `Receipt not scanned`,
body: `${r.vendor ?? 'Unknown vendor'} — uploaded over an hour ago.`,
link: `/[port]/expenses/${r.id}`,
entityType: 'expense',
entityId: r.id,
}));
}
// ─── interest.high_value_silent ───────────────────────────────────────────────
// Hot leads with no contact for 7+ days. Highest severity in the catalog.
async function interestHighValueSilent(portId: string): Promise<AlertCandidate[]> {
const cutoff = daysAgo(7);
const rows = await db
.select({
id: interests.id,
stage: interests.pipelineStage,
clientName: sql<string>`coalesce((SELECT full_name FROM clients WHERE id = ${interests.clientId}), 'unknown')`,
})
.from(interests)
.where(
and(
eq(interests.portId, portId),
eq(interests.leadCategory, 'hot_lead'),
isNull(interests.archivedAt),
or(
lt(interests.dateLastContact, cutoff),
and(isNull(interests.dateLastContact), lt(interests.updatedAt, cutoff)),
),
),
);
return rows.map((r) => ({
ruleId: 'interest.high_value_silent',
severity: 'critical',
title: `Hot lead silent: ${r.clientName}`,
body: `No contact for 7+ days — high-value at risk.`,
link: `/[port]/interests/${r.id}`,
entityType: 'interest',
entityId: r.id,
metadata: { stage: r.stage },
}));
}
// ─── eoi.unsigned_long ────────────────────────────────────────────────────────
// EOI documents in 'sent' status for 21+ days.
async function eoiUnsignedLong(portId: string): Promise<AlertCandidate[]> {
const rows = await db
.select({
id: documents.id,
title: documents.title,
createdAt: documents.createdAt,
})
.from(documents)
.where(
and(
eq(documents.portId, portId),
eq(documents.documentType, 'eoi'),
inArray(documents.status, ['sent', 'partially_signed']),
lt(documents.createdAt, daysAgo(21)),
),
);
return rows.map((r) => ({
ruleId: 'eoi.unsigned_long',
severity: 'warning',
title: `EOI unsigned >21 days`,
body: `"${r.title}" — sent over 3 weeks ago.`,
link: `/[port]/documents/${r.id}`,
entityType: 'document',
entityId: r.id,
}));
}
// ─── audit.suspicious_login ───────────────────────────────────────────────────
// >3 failed logins from same IP in the past hour. Depends on the auth layer
// recording rows with action='login.failed' (TODO: instrument better-auth
// hooks to record these — until that lands, this evaluator returns [] and
// the rule slot stays inert).
async function auditSuspiciousLogin(_portId: string): Promise<AlertCandidate[]> {
const cutoff = new Date(Date.now() - 60 * 60 * 1000);
const rows = await db
.select({
ipAddress: auditLogs.ipAddress,
attempts: sql<number>`count(*)::int`,
})
.from(auditLogs)
.where(and(eq(auditLogs.action, 'login.failed'), gte(auditLogs.createdAt, cutoff)))
.groupBy(auditLogs.ipAddress)
.having(sql`count(*) > 3`);
return rows
.filter((r) => r.ipAddress)
.map((r) => ({
ruleId: 'audit.suspicious_login' as const,
severity: 'critical' as const,
title: `Repeated failed logins`,
body: `${r.attempts} failed attempts from ${r.ipAddress} in the last hour.`,
link: `/[port]/admin/audit?ip=${encodeURIComponent(r.ipAddress!)}`,
entityType: 'audit',
entityId: r.ipAddress!,
metadata: { attempts: r.attempts },
}));
}
export const RULE_REGISTRY: Record<AlertRuleId, RuleEvaluator> = {
'reservation.no_agreement': reservationNoAgreement,
'interest.stale': interestStale,
'document.expiring_soon': documentExpiringSoon,
'document.signer_overdue': documentSignerOverdue,
'berth.under_offer_stalled': berthUnderOfferStalled,
'expense.duplicate': expenseDuplicate,
'expense.unscanned': expenseUnscanned,
'interest.high_value_silent': interestHighValueSilent,
'eoi.unsigned_long': eoiUnsignedLong,
'audit.suspicious_login': auditSuspiciousLogin,
};
export function listRuleIds(): readonly AlertRuleId[] {
return ALERT_RULES;
}
// silence unused-import warnings until later PRs use them
const _unused = { gt, desc, alertsTable };
void _unused;
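The head commit describes the engine contract: `runAlertEngine` walks every port × every rule and collects per-(port, rule) errors in the summary rather than throwing. A minimal, self-contained sketch of that shape — the toy `sweep` and `registry` names are hypothetical; the real code lives in `alert-engine.ts`:

```typescript
// Sketch: a sweep walks ports × rules and collects per-rule errors
// instead of throwing, so one bad evaluator can't stop the pass.
type Evaluator = (portId: string) => Promise<string[]>; // returns candidate ids

const registry: Record<string, Evaluator> = {
  'ok.rule': async () => ['a1'],
  'bad.rule': async () => { throw new Error('boom'); },
};

interface SweepSummary {
  portsScanned: number;
  rulesEvaluated: number;
  errors: Array<{ portId: string; ruleId: string; message: string }>;
}

async function sweep(portIds: string[]): Promise<SweepSummary> {
  const summary: SweepSummary = { portsScanned: portIds.length, rulesEvaluated: 0, errors: [] };
  for (const portId of portIds) {
    for (const [ruleId, evaluate] of Object.entries(registry)) {
      summary.rulesEvaluated++;
      try {
        // Real engine: reconcileAlertsForPort(portId, ruleId, await evaluate(portId))
        await evaluate(portId);
      } catch (err) {
        summary.errors.push({ portId, ruleId, message: (err as Error).message });
      }
    }
  }
  return summary;
}

sweep(['port-1']).then((s) => console.log(JSON.stringify(s)));
```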


@@ -0,0 +1,137 @@
/**
* Phase B alert framework — service layer.
*
* Types, the fingerprint identity, and the reconcile / dismiss /
* acknowledge / list operations. The rule evaluators live in
* `alert-rules.ts` (PR2); the cron dispatcher composes this service
* with that catalogue.
*/
import { and, eq, isNull, sql } from 'drizzle-orm';
import { createHash } from 'crypto';
import { db } from '@/lib/db';
import { alerts, type Alert, type AlertSeverity, type AlertRuleId } from '@/lib/db/schema/insights';
import { emitToRoom } from '@/lib/socket/server';
export interface AlertCandidate {
ruleId: AlertRuleId;
severity: AlertSeverity;
title: string;
body?: string;
link: string;
entityType?: string;
entityId?: string;
metadata?: Record<string, unknown>;
}
/**
* Stable identity hash so re-evaluations of the same condition upsert
* onto the same row (via `idx_alerts_fingerprint_open`).
*/
export function fingerprintFor(
c: Pick<AlertCandidate, 'ruleId' | 'entityType' | 'entityId'>,
): string {
return createHash('sha1')
.update(`${c.ruleId}|${c.entityType ?? ''}|${c.entityId ?? ''}`)
.digest('hex');
}
/**
* Apply a batch of rule outputs against the open-alert table:
* - upsert open alerts (rule still firing)
* - resolve any open alert in scope whose fingerprint isn't in this batch
*/
export async function reconcileAlertsForPort(
portId: string,
ruleId: AlertRuleId,
candidates: AlertCandidate[],
): Promise<void> {
// Insert new / leave existing — only one open row per fingerprint
// thanks to the partial unique index. Track newly inserted rows so we
// can emit `alert:created` to the port room.
for (const c of candidates) {
const fingerprint = fingerprintFor(c);
const inserted = await db
.insert(alerts)
.values({
portId,
ruleId: c.ruleId,
severity: c.severity,
title: c.title,
body: c.body,
link: c.link,
entityType: c.entityType,
entityId: c.entityId,
fingerprint,
metadata: c.metadata ?? {},
})
.onConflictDoNothing()
.returning({ id: alerts.id });
if (inserted[0]) {
emitToRoom(`port:${portId}`, 'alert:created', {
alertId: inserted[0].id,
portId,
ruleId: c.ruleId,
severity: c.severity,
title: c.title,
link: c.link,
});
}
}
// Auto-resolve open alerts for this rule whose fingerprint disappeared.
const liveFingerprints = new Set(candidates.map((c) => fingerprintFor(c)));
const open = await db.query.alerts.findMany({
where: and(eq(alerts.portId, portId), eq(alerts.ruleId, ruleId), isNull(alerts.resolvedAt)),
});
const stale = open.filter((a) => !liveFingerprints.has(a.fingerprint));
for (const a of stale) {
await db
.update(alerts)
.set({ resolvedAt: sql`now()` })
.where(eq(alerts.id, a.id));
emitToRoom(`port:${portId}`, 'alert:resolved', {
alertId: a.id,
portId,
ruleId,
});
}
}
export async function dismissAlert(alertId: string, userId: string): Promise<void> {
const [row] = await db
.update(alerts)
.set({ dismissedAt: sql`now()`, dismissedBy: userId })
.where(eq(alerts.id, alertId))
.returning({ id: alerts.id, portId: alerts.portId });
if (row) {
emitToRoom(`port:${row.portId}`, 'alert:dismissed', { alertId: row.id, portId: row.portId });
}
}
export async function acknowledgeAlert(alertId: string, userId: string): Promise<void> {
await db
.update(alerts)
.set({ acknowledgedAt: sql`now()`, acknowledgedBy: userId })
.where(eq(alerts.id, alertId));
}
export interface ListAlertsOptions {
severity?: AlertSeverity[];
includeDismissed?: boolean;
includeResolved?: boolean;
}
export async function listAlertsForPort(
portId: string,
options: ListAlertsOptions = {},
): Promise<Alert[]> {
const conditions = [eq(alerts.portId, portId)];
if (!options.includeResolved) conditions.push(isNull(alerts.resolvedAt));
if (!options.includeDismissed) conditions.push(isNull(alerts.dismissedAt));
return db.query.alerts.findMany({
where: and(...conditions),
orderBy: (a, { desc }) => [desc(a.firedAt)],
limit: 100,
});
}
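The auto-resolve pass above is a set difference over fingerprints: candidates still firing keep their open row, and any open row whose fingerprint is absent from the batch gets resolved. A pure sketch of that partition, with shapes simplified from the service (no db, ids hypothetical):

```typescript
// Sketch of the reconcile set-difference, minus the db and socket layers.
interface OpenAlert { id: string; fingerprint: string; }

function partition(open: OpenAlert[], liveFingerprints: Set<string>) {
  const keep = open.filter((a) => liveFingerprints.has(a.fingerprint));
  const resolve = open.filter((a) => !liveFingerprints.has(a.fingerprint));
  return { keep, resolve };
}

const live = new Set(['f1', 'f3']); // fingerprints of candidates still firing
const open = [
  { id: 'a1', fingerprint: 'f1' }, // still firing → stays open
  { id: 'a2', fingerprint: 'f2' }, // condition cleared → auto-resolve
];
const { keep, resolve } = partition(open, live);
console.log(keep.map((a) => a.id), resolve.map((a) => a.id)); // [ 'a1' ] [ 'a2' ]
```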


@@ -0,0 +1,106 @@
/**
* Phase B analytics service. Reads pre-computed snapshots from
* `analytics_snapshots` keyed by `metric_id` and recomputes on demand if
* the cached row is older than `SNAPSHOT_TTL_MS`. The recomputation jobs
* land in `analytics-snapshot-job.ts` (PR3).
*/
import { and, eq } from 'drizzle-orm';
import { db } from '@/lib/db';
import { analyticsSnapshots } from '@/lib/db/schema/insights';
export type DateRange = '7d' | '30d' | '90d' | 'today';
export type MetricId =
| `pipeline_funnel.${DateRange}`
| `occupancy_timeline.${DateRange}`
| `revenue_breakdown.${DateRange}`
| `lead_source_attribution.${DateRange}`;
export const SNAPSHOT_TTL_MS = 15 * 60 * 1000; // 15 minutes
export interface PipelineFunnelData {
stages: Array<{ stage: string; count: number; conversionPct: number }>;
}
export interface OccupancyTimelineData {
points: Array<{ date: string; available: number; underOffer: number; sold: number }>;
}
export interface RevenueBreakdownData {
bars: Array<{ category: string; amount: number; currency: string }>;
}
export interface LeadSourceAttributionData {
slices: Array<{ source: string; count: number }>;
}
export type SnapshotData =
| PipelineFunnelData
| OccupancyTimelineData
| RevenueBreakdownData
| LeadSourceAttributionData;
/**
* Read a snapshot by `(portId, metricId)`. Returns null when missing or
* stale; the caller should request a recompute (or the recurring job
* eventually fills it).
*/
export async function readSnapshot<T extends SnapshotData>(
portId: string,
metricId: MetricId,
): Promise<T | null> {
const row = await db.query.analyticsSnapshots.findFirst({
where: and(eq(analyticsSnapshots.portId, portId), eq(analyticsSnapshots.metricId, metricId)),
});
if (!row) return null;
const age = Date.now() - row.computedAt.getTime();
if (age > SNAPSHOT_TTL_MS) return null;
return row.data as T;
}
export async function writeSnapshot(
portId: string,
metricId: MetricId,
data: SnapshotData,
): Promise<void> {
await db
.insert(analyticsSnapshots)
.values({ portId, metricId, data })
.onConflictDoUpdate({
target: [analyticsSnapshots.portId, analyticsSnapshots.metricId],
set: { data, computedAt: new Date() },
});
}
// Computation entrypoints — bodies land in PR3 along with the recurring
// snapshot job. Exported as no-op stubs so PR1's tsc/lint stay green.
export async function computePipelineFunnel(
_portId: string,
_range: DateRange,
): Promise<PipelineFunnelData> {
return { stages: [] };
}
export async function computeOccupancyTimeline(
_portId: string,
_range: DateRange,
): Promise<OccupancyTimelineData> {
return { points: [] };
}
export async function computeRevenueBreakdown(
_portId: string,
_range: DateRange,
): Promise<RevenueBreakdownData> {
return { bars: [] };
}
export async function computeLeadSourceAttribution(
_portId: string,
_range: DateRange,
): Promise<LeadSourceAttributionData> {
return { slices: [] };
}
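Callers are expected to pair `readSnapshot` and `writeSnapshot` as a read-through cache. A pure sketch of the TTL logic, with an in-memory `Map` standing in for `analytics_snapshots` and an injected clock (the key shape here is illustrative):

```typescript
// Sketch of the 15-minute TTL read-through, db-free.
const TTL_MS = 15 * 60 * 1000;
interface Row<T> { data: T; computedAt: number; }
const cache = new Map<string, Row<unknown>>();

function read<T>(key: string, now: number): T | null {
  const row = cache.get(key);
  if (!row) return null;
  if (now - row.computedAt > TTL_MS) return null; // stale → caller recomputes
  return row.data as T;
}

function write<T>(key: string, data: T, now: number): void {
  cache.set(key, { data, computedAt: now }); // upsert, refreshing computedAt
}

const t0 = 0;
write('port-1:pipeline_funnel.30d', { stages: [] }, t0);
console.log(read('port-1:pipeline_funnel.30d', t0) !== null);     // fresh → hit
console.log(read('port-1:pipeline_funnel.30d', t0 + TTL_MS + 1)); // stale → null
```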


@@ -0,0 +1,72 @@
/**
* Audit log search — PR1 skeleton. PR10 fills in the cursor pagination
* and per-port + super-admin scoping; v1 already has the GIN index on
* `audit_logs.search_text`.
*/
import { and, desc, eq, gte, lte, sql, type SQL } from 'drizzle-orm';
import { db } from '@/lib/db';
import { auditLogs, type AuditLog } from '@/lib/db/schema/system';
export interface AuditSearchOptions {
/** Limit results to a single port. Omit for super-admin all-ports view. */
portId?: string;
/** Free-text query — runs against the GIN-indexed search_text column. */
q?: string;
/** Filter by actor (user id). */
userId?: string;
/** Filter by action verb: 'create' | 'update' | 'delete' | ... */
action?: string;
/** Filter by entity type: 'client' | 'interest' | 'document' | ... */
entityType?: string;
/** Filter by exact entity id (e.g. paste a uuid into search). */
entityId?: string;
/** Inclusive date range. */
from?: Date;
to?: Date;
/** Pagination — cursor on (createdAt, id). */
cursor?: { createdAt: Date; id: string };
limit?: number;
}
export interface AuditSearchPage {
rows: AuditLog[];
nextCursor: { createdAt: Date; id: string } | null;
}
export async function searchAuditLogs(options: AuditSearchOptions = {}): Promise<AuditSearchPage> {
const conds: SQL[] = [];
if (options.portId) conds.push(eq(auditLogs.portId, options.portId));
if (options.userId) conds.push(eq(auditLogs.userId, options.userId));
if (options.action) conds.push(eq(auditLogs.action, options.action));
if (options.entityType) conds.push(eq(auditLogs.entityType, options.entityType));
if (options.entityId) conds.push(eq(auditLogs.entityId, options.entityId));
if (options.from) conds.push(gte(auditLogs.createdAt, options.from));
if (options.to) conds.push(lte(auditLogs.createdAt, options.to));
if (options.q) {
// tsquery match against the GENERATED tsvector column.
conds.push(sql`${auditLogs.searchText} @@ plainto_tsquery('simple', ${options.q})`);
}
if (options.cursor) {
// Strict less-than on (createdAt, id) for stable cursor pagination.
conds.push(
sql`(${auditLogs.createdAt}, ${auditLogs.id}) < (${options.cursor.createdAt}, ${options.cursor.id})`,
);
}
const limit = Math.min(options.limit ?? 50, 200);
const rows = await db.query.auditLogs.findMany({
where: conds.length > 0 ? and(...conds) : undefined,
orderBy: [desc(auditLogs.createdAt), desc(auditLogs.id)],
limit: limit + 1,
});
const hasMore = rows.length > limit;
const truncated = hasMore ? rows.slice(0, limit) : rows;
const last = truncated[truncated.length - 1];
return {
rows: truncated,
nextCursor: hasMore && last ? { createdAt: last.createdAt, id: last.id } : null,
};
}
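The `limit + 1` fetch above is what detects whether a next page exists. A pure sketch of the same keyset pattern over an in-memory array (db-free; `page` is a hypothetical helper mirroring `searchAuditLogs`):

```typescript
// Sketch: keyset pagination on (createdAt, id) descending, limit + 1 peek.
interface Row { createdAt: number; id: string; }

function page(rows: Row[], limit: number, cursor?: Row) {
  const sorted = [...rows].sort((a, b) => b.createdAt - a.createdAt || b.id.localeCompare(a.id));
  const after = cursor
    ? sorted.filter((r) => r.createdAt < cursor.createdAt
        || (r.createdAt === cursor.createdAt && r.id < cursor.id))
    : sorted;
  const fetched = after.slice(0, limit + 1); // one extra row to peek ahead
  const hasMore = fetched.length > limit;
  const out = hasMore ? fetched.slice(0, limit) : fetched;
  return { rows: out, nextCursor: hasMore ? out[out.length - 1] : null };
}

const data = [1, 2, 3, 4, 5].map((n) => ({ createdAt: n, id: `r${n}` }));
const p1 = page(data, 2);
console.log(p1.rows.map((r) => r.id), p1.nextCursor?.id); // [ 'r5', 'r4' ] 'r4'
const p2 = page(data, 2, p1.nextCursor!);
console.log(p2.rows.map((r) => r.id)); // [ 'r3', 'r2' ]
```

The row-value comparison `(createdAt, id) < (cursor.createdAt, cursor.id)` in the SQL version is the same lexicographic test spelled out in the filter here.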


@@ -0,0 +1,71 @@
/**
* Expense duplicate detection — heuristic match on
* (port + vendor + amount + date ± 3d). PR1 ships the function shape;
* PR8 wires the BullMQ trigger and the merge service.
*/
import { and, between, eq, ne, sql } from 'drizzle-orm';
import { db } from '@/lib/db';
import { expenses } from '@/lib/db/schema/financial';
const DEDUP_WINDOW_DAYS = 3;
export interface DedupCandidate {
/** Existing expense that the new one likely duplicates. */
candidateId: string;
/** 0..1 confidence; 1.0 = exact vendor + amount + same day. */
confidence: number;
}
export async function scanForDuplicates(expenseId: string): Promise<DedupCandidate[]> {
const target = await db.query.expenses.findFirst({ where: eq(expenses.id, expenseId) });
if (!target) return [];
const { portId, establishmentName, amount, expenseDate } = target;
if (!establishmentName || !amount || !expenseDate) return [];
const lo = new Date(expenseDate);
lo.setDate(lo.getDate() - DEDUP_WINDOW_DAYS);
const hi = new Date(expenseDate);
hi.setDate(hi.getDate() + DEDUP_WINDOW_DAYS);
const matches = await db.query.expenses.findMany({
where: and(
eq(expenses.portId, portId),
sql`lower(${expenses.establishmentName}) = lower(${establishmentName})`,
eq(expenses.amount, amount),
between(expenses.expenseDate, lo, hi),
ne(expenses.id, expenseId),
),
limit: 5,
});
return matches.map((m) => ({
candidateId: m.id,
confidence: dayDiff(m.expenseDate, expenseDate) === 0 ? 1.0 : 0.85,
}));
}
function dayDiff(a: Date, b: Date): number {
const ms = Math.abs(a.getTime() - b.getTime());
return Math.round(ms / 86_400_000);
}
/** Mark an expense as a duplicate of the candidate with the highest score. */
export async function markBestDuplicate(expenseId: string): Promise<string | null> {
const candidates = await scanForDuplicates(expenseId);
if (candidates.length === 0) {
await db
.update(expenses)
.set({ dedupScannedAt: sql`now()` })
.where(eq(expenses.id, expenseId));
return null;
}
const best = candidates.reduce((a, b) => (a.confidence >= b.confidence ? a : b));
await db
.update(expenses)
.set({ duplicateOf: best.candidateId, dedupScannedAt: sql`now()` })
.where(eq(expenses.id, expenseId));
return best.candidateId;
}
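A pure sketch of the heuristic (port + vendor + amount + date ± 3d) and its confidence scoring, with hypothetical rows in place of the `expenses` table:

```typescript
// Sketch of the dedup heuristic: exact same-day match scores 1.0,
// anything else inside the ±3-day window scores 0.85.
interface Exp { id: string; portId: string; vendor: string; amount: number; date: Date; }

function duplicatesOf(target: Exp, pool: Exp[], windowDays = 3) {
  const dayDiff = (a: Date, b: Date) =>
    Math.round(Math.abs(a.getTime() - b.getTime()) / 86_400_000);
  return pool
    .filter((e) =>
      e.id !== target.id &&
      e.portId === target.portId &&
      e.vendor.toLowerCase() === target.vendor.toLowerCase() && // case-insensitive vendor
      e.amount === target.amount &&
      dayDiff(e.date, target.date) <= windowDays)
    .map((e) => ({
      candidateId: e.id,
      confidence: dayDiff(e.date, target.date) === 0 ? 1.0 : 0.85,
    }));
}

const target: Exp = { id: 'e0', portId: 'p1', vendor: 'Marina Café', amount: 42.5, date: new Date('2026-04-10') };
const pool: Exp[] = [
  { id: 'e1', portId: 'p1', vendor: 'Marina Café', amount: 42.5, date: new Date('2026-04-10') },
  { id: 'e2', portId: 'p1', vendor: 'marina café', amount: 42.5, date: new Date('2026-04-12') },
  { id: 'e3', portId: 'p1', vendor: 'Marina Café', amount: 42.5, date: new Date('2026-04-20') },
];
console.log(duplicatesOf(target, pool)); // e1 at 1.0, e2 at 0.85; e3 is 10 days out
```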


@@ -0,0 +1,47 @@
/**
* Claude Vision-driven OCR for expense receipts. PR1 stub: types and the
* service contract. The actual API call wires up in PR9 with prompt
* caching on the system text and Haiku 4.5 by default.
*/
export interface OcrLineItem {
description: string;
quantity?: number;
unitPrice?: number;
amount: number;
}
export interface OcrResult {
vendor: string | null;
amount: number | null;
currency: string | null;
/** ISO date YYYY-MM-DD. */
date: string | null;
lineItems: OcrLineItem[];
/** 0..1; below 0.6 surfaces "verify mode" UI. */
confidence: number;
}
export interface OcrContext {
fileId: string;
fileUrl: string;
/** Optional MIME hint; the service still detects from bytes. */
mimeType?: string;
}
/** Output-token cap per call; bounds cost (Haiku 4.5 + cached system prompt). PR9 enforces. */
export const OCR_MAX_TOKENS = 1024;
export const OCR_LOW_CONFIDENCE_THRESHOLD = 0.6;
/** Stub — returns "pending" shape so callers can wire UI in PR1 without
* Anthropic credentials. */
export async function ocrReceipt(_ctx: OcrContext): Promise<OcrResult> {
return {
vendor: null,
amount: null,
currency: null,
date: null,
lineItems: [],
confidence: 0,
};
}
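Callers branch on `OCR_LOW_CONFIDENCE_THRESHOLD` to decide when to surface the verify-mode UI. A tiny sketch (`needsVerifyMode` is a hypothetical helper, not part of this file):

```typescript
// Sketch: below the 0.6 threshold the UI asks the user to verify the
// extracted fields instead of auto-filling them.
const OCR_LOW_CONFIDENCE_THRESHOLD = 0.6;
interface OcrResultLike { confidence: number; }

function needsVerifyMode(result: OcrResultLike): boolean {
  return result.confidence < OCR_LOW_CONFIDENCE_THRESHOLD;
}

console.log(needsVerifyMode({ confidence: 0 }));    // PR1 stub output → verify mode
console.log(needsVerifyMode({ confidence: 0.92 })); // confident parse → auto-fill
```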


@@ -246,6 +246,18 @@ export interface ServerToClientEvents {
}) => void;
'file:updated': (payload: { fileId: string; changedFields?: string[] }) => void;
'file:deleted': (payload: { fileId: string; filename?: string }) => void;
// Phase B alert framework
'alert:created': (payload: {
alertId: string;
portId: string;
ruleId: string;
severity: 'info' | 'warning' | 'critical';
title: string;
link: string;
}) => void;
'alert:resolved': (payload: { alertId: string; portId: string; ruleId: string }) => void;
'alert:dismissed': (payload: { alertId: string; portId: string }) => void;
}
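A pure sketch of the payload contract for the three new alert events, using a plain handler map in place of a socket.io client (event names and payload fields match the interface above; everything else is illustrative):

```typescript
// Sketch: a typed handler map keyed by the three alert event names.
type AlertEvents = {
  'alert:created': { alertId: string; portId: string; ruleId: string;
    severity: 'info' | 'warning' | 'critical'; title: string; link: string };
  'alert:resolved': { alertId: string; portId: string; ruleId: string };
  'alert:dismissed': { alertId: string; portId: string };
};

const seen: string[] = [];
const handlers: { [K in keyof AlertEvents]: (p: AlertEvents[K]) => void } = {
  'alert:created': (p) => seen.push(`+${p.alertId}`),   // e.g. prepend to alert list
  'alert:resolved': (p) => seen.push(`-${p.alertId}`),  // e.g. drop from alert list
  'alert:dismissed': (p) => seen.push(`x${p.alertId}`),
};

handlers['alert:created']({ alertId: 'a1', portId: 'p1', ruleId: 'interest.stale',
  severity: 'info', title: 'Stale lead', link: '/x' });
handlers['alert:resolved']({ alertId: 'a1', portId: 'p1', ruleId: 'interest.stale' });
console.log(seen); // [ '+a1', '-a1' ]
```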
// Client → Server events (minimal — most actions go through REST API)


@@ -0,0 +1,206 @@
/**
* Engine integration test — drives `runAlertEngineForPorts` against
* seeded conditions and asserts: (1) correct alerts upsert, (2) running
* twice doesn't duplicate, (3) mutating state auto-resolves stale alerts.
*
* Socket emissions are stubbed via vi.mock so the test stays offline.
*/
import { describe, it, expect, vi, beforeEach } from 'vitest';
import { and, eq, isNull } from 'drizzle-orm';
vi.mock('@/lib/socket/server', () => ({
emitToRoom: vi.fn(),
}));
import { db } from '@/lib/db';
import { alerts } from '@/lib/db/schema/insights';
import { interests } from '@/lib/db/schema/interests';
import { berthReservations } from '@/lib/db/schema/reservations';
import { documents } from '@/lib/db/schema/documents';
import { runAlertEngineForPorts } from '@/lib/services/alert-engine';
import { makePort, makeClient, makeBerth, makeYacht } from '../helpers/factories';
async function clearAlerts(portId: string) {
await db.delete(alerts).where(eq(alerts.portId, portId));
}
async function listOpenAlerts(portId: string, ruleId: string) {
return db
.select()
.from(alerts)
.where(and(eq(alerts.portId, portId), eq(alerts.ruleId, ruleId), isNull(alerts.resolvedAt)));
}
describe('alert engine', () => {
beforeEach(() => {
vi.clearAllMocks();
});
it('reservation.no_agreement fires for active reservation older than 3 days without agreement', async () => {
const port = await makePort();
const client = await makeClient({ portId: port.id });
const berth = await makeBerth({ portId: port.id });
const yacht = await makeYacht({
portId: port.id,
ownerType: 'client',
ownerId: client.id,
overrides: { name: 'M/Y Test' },
});
const fourDaysAgo = new Date(Date.now() - 4 * 86_400_000);
const [resv] = await db
.insert(berthReservations)
.values({
portId: port.id,
berthId: berth.id,
clientId: client.id,
yachtId: yacht.id,
status: 'active',
startDate: new Date(),
createdBy: 'seed',
createdAt: fourDaysAgo,
})
.returning();
expect(resv).toBeDefined();
await clearAlerts(port.id);
await runAlertEngineForPorts([port.id]);
const open = await listOpenAlerts(port.id, 'reservation.no_agreement');
expect(open).toHaveLength(1);
expect(open[0]!.entityId).toBe(resv!.id);
expect(open[0]!.severity).toBe('warning');
});
it('does not duplicate on a second sweep', async () => {
const port = await makePort();
const client = await makeClient({ portId: port.id });
const berth = await makeBerth({ portId: port.id });
const yacht = await makeYacht({
portId: port.id,
ownerType: 'client',
ownerId: client.id,
});
const stale = new Date(Date.now() - 10 * 86_400_000);
await db.insert(berthReservations).values({
portId: port.id,
berthId: berth.id,
clientId: client.id,
yachtId: yacht.id,
status: 'active',
startDate: new Date(),
createdBy: 'seed',
createdAt: stale,
});
await clearAlerts(port.id);
await runAlertEngineForPorts([port.id]);
await runAlertEngineForPorts([port.id]);
const open = await listOpenAlerts(port.id, 'reservation.no_agreement');
expect(open).toHaveLength(1);
});
it('auto-resolves an open alert when the underlying condition clears', async () => {
const port = await makePort();
const client = await makeClient({ portId: port.id });
const berth = await makeBerth({ portId: port.id });
const yacht = await makeYacht({
portId: port.id,
ownerType: 'client',
ownerId: client.id,
});
const tenDaysAgo = new Date(Date.now() - 10 * 86_400_000);
const [resv] = await db
.insert(berthReservations)
.values({
portId: port.id,
berthId: berth.id,
clientId: client.id,
yachtId: yacht.id,
status: 'active',
startDate: new Date(),
createdBy: 'seed',
createdAt: tenDaysAgo,
})
.returning();
await clearAlerts(port.id);
await runAlertEngineForPorts([port.id]);
expect(await listOpenAlerts(port.id, 'reservation.no_agreement')).toHaveLength(1);
// Add an agreement document — condition no longer fires.
await db.insert(documents).values({
portId: port.id,
reservationId: resv!.id,
documentType: 'reservation_agreement',
title: 'Reservation Agreement',
status: 'sent',
createdBy: 'seed',
});
await runAlertEngineForPorts([port.id]);
expect(await listOpenAlerts(port.id, 'reservation.no_agreement')).toHaveLength(0);
const allRows = await db
.select()
.from(alerts)
.where(and(eq(alerts.portId, port.id), eq(alerts.ruleId, 'reservation.no_agreement')));
expect(allRows).toHaveLength(1);
expect(allRows[0]!.resolvedAt).not.toBeNull();
});
it('interest.stale fires for old leads in mid-funnel stages', async () => {
const port = await makePort();
const client = await makeClient({ portId: port.id });
const stale = new Date(Date.now() - 30 * 86_400_000);
const [interest] = await db
.insert(interests)
.values({
portId: port.id,
clientId: client.id,
pipelineStage: 'in_communication',
dateLastContact: stale,
createdAt: stale,
updatedAt: stale,
})
.returning();
await clearAlerts(port.id);
await runAlertEngineForPorts([port.id]);
const open = await listOpenAlerts(port.id, 'interest.stale');
expect(open).toHaveLength(1);
expect(open[0]!.entityId).toBe(interest!.id);
expect(open[0]!.severity).toBe('info');
});
it('interest.high_value_silent fires for hot leads silent >7d', async () => {
const port = await makePort();
const client = await makeClient({ portId: port.id });
const stale = new Date(Date.now() - 10 * 86_400_000);
await db.insert(interests).values({
portId: port.id,
clientId: client.id,
pipelineStage: 'visited',
leadCategory: 'hot_lead',
dateLastContact: stale,
updatedAt: stale,
});
await clearAlerts(port.id);
await runAlertEngineForPorts([port.id]);
const open = await listOpenAlerts(port.id, 'interest.high_value_silent');
expect(open).toHaveLength(1);
expect(open[0]!.severity).toBe('critical');
});
it('engine reports rule errors without crashing the sweep', async () => {
const port = await makePort();
const summary = await runAlertEngineForPorts([port.id]);
expect(summary.portsScanned).toBe(1);
expect(summary.rulesEvaluated).toBeGreaterThan(0);
// No conditions seeded — no rules should fail.
expect(summary.errors).toHaveLength(0);
});
});