feat(phase-b): ship analytics dashboard, alerts, scanner PWA, dedup, audit view

Phase B (Insights & Alerts) PR4-11 in one drop. Builds on the schema +
service skeletons committed in PRs 1-3.

PR4  Analytics dashboard — 4 chart types (funnel/timeline/breakdown/source),
     date-range picker (today/7d/30d/90d), CSV+PNG export per card.
PR5  Alert rail UI + /alerts page — topbar bell w/ live count, dashboard
     right-rail, three-tab page (active/dismissed/resolved), socket-driven
     invalidation. Bell lazy-loads list on popover open to keep cold pages
     fast in non-dashboard routes.
PR6  EOI queue tab on documents hub — filters to in-flight EOIs, count
     surfaces in tab label.
PR7  Interests-by-berth tab on berth detail — replaces the stub.
PR8  Expense duplicate detection — BullMQ job runs scan on create, yellow
     banner on detail w/ Merge / Not-a-duplicate, transactional merge
     consolidates receipts and archives the source.
PR9  Receipt scanner PWA + multi-provider AI — port-scoped /scan route in
     its own (scanner) group with no dashboard chrome, dynamic per-port
     manifest, OpenAI + Claude provider abstraction, admin OCR settings
     page (port-level + super-admin global default w/ opt-in fallback),
     test-connection endpoint, manual-entry fallback when no key is
     configured. Verify form always shown before save — no ghost rows.
PR10 Audit log read view — swap to tsvector full-text search on the
     existing GIN index, cursor pagination, filters for entity/action/user
     /date range, batched actor-email resolution.
PR11 Real-API tests — opt-in receipt-ocr.spec (admin save+test, optional
     real-receipt parse via REALAPI_RECEIPT_FIXTURE) and alert-engine
     socket-fanout spec gated behind RUN_ALERT_ENGINE_REALAPI. Both skip
     cleanly without their gate envs so CI stays green.

Test totals: vitest 690 -> 713, smoke 130 -> 138, realapi +2 opt-in.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Matt Ciaccio
2026-04-28 17:21:55 +02:00
parent 2fa70f4582
commit f52d21df83
63 changed files with 4459 additions and 206 deletions

View File

@@ -48,6 +48,17 @@ export const maintenanceWorker = new Worker(
logger.info({ count: allPorts.length }, 'Analytics snapshot refresh complete');
break;
}
case 'expense-dedup-scan': {
const { expenseId } = job.data as { expenseId: string };
if (!expenseId) {
logger.warn({ jobId: job.id }, 'expense-dedup-scan missing expenseId');
break;
}
const { markBestDuplicate } = await import('@/lib/services/expense-dedup.service');
const matchedId = await markBestDuplicate(expenseId);
logger.info({ expenseId, matchedId: matchedId ?? null }, 'expense-dedup-scan complete');
break;
}
default:
logger.warn({ jobName: job.name }, 'Unknown maintenance job');
}

View File

@@ -50,8 +50,11 @@ export async function searchAuditLogs(options: AuditSearchOptions = {}): Promise
}
if (options.cursor) {
// Strict less-than on (createdAt, id) for stable cursor pagination.
// ISO-stringify the date so postgres-js binds it cleanly inside a tuple
// comparison; raw Date objects throw under postgres@3.x parameter binding.
const cursorAt = options.cursor.createdAt.toISOString();
conds.push(
sql`(${auditLogs.createdAt}, ${auditLogs.id}) < (${options.cursor.createdAt}, ${options.cursor.id})`,
sql`(${auditLogs.createdAt}, ${auditLogs.id}) < (${cursorAt}::timestamptz, ${options.cursor.id})`,
);
}

View File

@@ -60,6 +60,13 @@ function buildHubTabFilters(
if (!tab || tab === 'all') return filters;
switch (tab) {
case 'eoi_queue':
// EOI documents currently in-flight (drafted, sent, or partially signed).
// Used by the dedicated tab on the documents hub to triage EOI signing
// pipeline volume separate from the all-doc-types view.
filters.push(eq(documents.documentType, 'eoi'));
filters.push(inArray(documents.status, ['draft', 'sent', 'partially_signed']));
break;
case 'awaiting_them':
// "awaiting them" = pending signers other than the current user.
// Without a known caller email we cannot make that distinction, so
@@ -209,6 +216,7 @@ export async function listDocuments(
export interface HubTabCounts {
all: number;
eoi_queue: number;
awaiting_them: number;
awaiting_me: number;
completed: number;
@@ -233,15 +241,16 @@ export async function getHubTabCounts(
return row?.count ?? 0;
}
const [all, awaiting_them, awaiting_me, completed, expired] = await Promise.all([
const [all, eoi_queue, awaiting_them, awaiting_me, completed, expired] = await Promise.all([
tabCount('all'),
tabCount('eoi_queue'),
tabCount('awaiting_them'),
tabCount('awaiting_me'),
tabCount('completed'),
tabCount('expired'),
]);
return { all, awaiting_them, awaiting_me, completed, expired };
return { all, eoi_queue, awaiting_them, awaiting_me, completed, expired };
}
// ─── Get by ID ────────────────────────────────────────────────────────────────

View File

@@ -69,3 +69,59 @@ export async function markBestDuplicate(expenseId: string): Promise<string | nul
.where(eq(expenses.id, expenseId));
return best.candidateId;
}
/**
* Clear the duplicate flag — operator confirmed this is a real expense.
* Leaves `dedupScannedAt` populated so the engine doesn't re-flag it.
*/
export async function clearDuplicate(expenseId: string, portId: string): Promise<void> {
await db
.update(expenses)
.set({ duplicateOf: null, dedupScannedAt: sql`now()` })
.where(and(eq(expenses.id, expenseId), eq(expenses.portId, portId)));
}
/**
* Merge `sourceId` into `targetId`: combine receipt files, archive the
* source, and clear the duplicate-of pointer. Both rows must belong to
* the same port; runs inside a single transaction so a partial failure
* leaves both rows untouched.
*/
export async function mergeDuplicate(
sourceId: string,
targetId: string,
portId: string,
): Promise<void> {
if (sourceId === targetId) {
throw new Error('Cannot merge an expense into itself');
}
await db.transaction(async (tx) => {
const [source] = await tx
.select()
.from(expenses)
.where(and(eq(expenses.id, sourceId), eq(expenses.portId, portId)));
const [target] = await tx
.select()
.from(expenses)
.where(and(eq(expenses.id, targetId), eq(expenses.portId, portId)));
if (!source || !target) {
throw new Error('Source or target expense not found in this port');
}
const mergedReceipts = Array.from(
new Set([...(target.receiptFileIds ?? []), ...(source.receiptFileIds ?? [])]),
);
await tx
.update(expenses)
.set({ receiptFileIds: mergedReceipts })
.where(eq(expenses.id, targetId));
// Archive the source — preserves audit history, keeps any FKs alive.
await tx
.update(expenses)
.set({ archivedAt: sql`now()`, duplicateOf: null })
.where(eq(expenses.id, sourceId));
});
}

View File

@@ -11,7 +11,11 @@ import { NotFoundError, ConflictError } from '@/lib/errors';
import { emitToRoom } from '@/lib/socket/server';
import { convert } from '@/lib/services/currency';
import { logger } from '@/lib/logger';
import type { CreateExpenseInput, UpdateExpenseInput, ListExpensesInput } from '@/lib/validators/expenses';
import type {
CreateExpenseInput,
UpdateExpenseInput,
ListExpensesInput,
} from '@/lib/validators/expenses';
export type { ListExpensesInput };
@@ -59,7 +63,10 @@ export async function listExpenses(portId: string, query: ListExpensesInput) {
includeArchived: query.includeArchived,
archivedAtColumn: expenses.archivedAt,
sort: query.sort
? { column: expenses[query.sort as keyof typeof expenses] as unknown as PgColumn, direction: query.order }
? {
column: expenses[query.sort as keyof typeof expenses] as unknown as PgColumn,
direction: query.order,
}
: undefined,
});
}
@@ -87,7 +94,10 @@ export async function createExpense(
exchangeRate = String(conversion.rate);
} else {
// BR-040: if rate unavailable, save without conversion + log warning
logger.warn({ currency: data.currency }, 'Currency rate unavailable, saving expense without USD conversion');
logger.warn(
{ currency: data.currency },
'Currency rate unavailable, saving expense without USD conversion',
);
}
} else {
amountUsd = String(data.amount);
@@ -137,6 +147,15 @@ export async function createExpense(
category: expense.category ?? '',
});
// Schedule a duplicate-detection sweep. Best-effort — we don't want a
// queue-side hiccup to fail the user's create.
try {
const { getQueue } = await import('@/lib/queue');
await getQueue('maintenance').add('expense-dedup-scan', { expenseId: expense.id });
} catch (err) {
logger.warn({ err, expenseId: expense.id }, 'Failed to enqueue expense-dedup-scan');
}
return expense;
}
@@ -161,7 +180,10 @@ export async function updateExpense(
updateData.amountUsd = String(conversion.result);
updateData.exchangeRate = String(conversion.rate);
} else {
logger.warn({ currency: newCurrency }, 'Currency rate unavailable during update, clearing USD conversion');
logger.warn(
{ currency: newCurrency },
'Currency rate unavailable during update, clearing USD conversion',
);
updateData.amountUsd = null;
updateData.exchangeRate = null;
}
@@ -204,11 +226,7 @@ export async function updateExpense(
return updated;
}
export async function archiveExpense(
id: string,
portId: string,
meta: ServiceAuditMeta,
) {
export async function archiveExpense(id: string, portId: string, meta: ServiceAuditMeta) {
const existing = await getExpenseById(id, portId);
// BR-045: Check if linked to non-draft invoice
@@ -216,12 +234,7 @@ export async function archiveExpense(
.select({ invoiceId: invoiceExpenses.invoiceId })
.from(invoiceExpenses)
.innerJoin(invoices, eq(invoices.id, invoiceExpenses.invoiceId))
.where(
and(
eq(invoiceExpenses.expenseId, id),
sql`${invoices.status} != 'draft'`,
),
)
.where(and(eq(invoiceExpenses.expenseId, id), sql`${invoices.status} != 'draft'`))
.limit(1);
if (linkedInvoice.length > 0) {
@@ -244,11 +257,7 @@ export async function archiveExpense(
emitToRoom(`port:${portId}`, 'expense:archived', { expenseId: id });
}
export async function restoreExpense(
id: string,
portId: string,
meta: ServiceAuditMeta,
) {
export async function restoreExpense(id: string, portId: string, meta: ServiceAuditMeta) {
await getExpenseById(id, portId);
await restore(expenses, expenses.id, id);

View File

@@ -0,0 +1,157 @@
/**
* OCR provider config — stored in `system_settings` under the key
* `ocr.config`. Each port can either have its own row (port_id = port.id)
* or opt into the global row (port_id = null) by setting `useGlobal: true`.
*/
import { and, eq, isNull } from 'drizzle-orm';
import { db } from '@/lib/db';
import { systemSettings } from '@/lib/db/schema/system';
import { encrypt, decrypt } from '@/lib/utils/encryption';
export type OcrProvider = 'openai' | 'claude';
export const OCR_MODELS: Record<OcrProvider, string[]> = {
openai: ['gpt-4o-mini', 'gpt-4o', 'gpt-4-turbo'],
claude: ['claude-haiku-4-5', 'claude-sonnet-4-6', 'claude-opus-4-7'],
};
export const DEFAULT_MODEL: Record<OcrProvider, string> = {
openai: 'gpt-4o-mini',
claude: 'claude-haiku-4-5',
};
/** Public shape that admin UIs read — never includes the raw key. */
export interface OcrConfigPublic {
provider: OcrProvider;
model: string;
/** True when an encrypted key is present. We never echo the key itself. */
hasApiKey: boolean;
/** Port-level rows can opt into the global config. */
useGlobal: boolean;
}
/** Internal shape including the decrypted key — server-side only. */
export interface OcrConfigResolved extends OcrConfigPublic {
apiKey: string | null;
/** Source of the resolved row: 'port' | 'global' | 'none'. */
source: 'port' | 'global' | 'none';
}
interface StoredOcrConfig {
provider: OcrProvider;
model: string;
apiKeyEncrypted: string | null;
useGlobal: boolean;
}
const KEY = 'ocr.config';
async function readRow(portId: string | null): Promise<StoredOcrConfig | null> {
const where =
portId === null
? and(eq(systemSettings.key, KEY), isNull(systemSettings.portId))
: and(eq(systemSettings.key, KEY), eq(systemSettings.portId, portId));
const [row] = await db.select().from(systemSettings).where(where);
if (!row) return null;
return row.value as unknown as StoredOcrConfig;
}
async function writeRow(portId: string | null, value: StoredOcrConfig, userId: string) {
// upsert: delete + insert keeps logic simple given the (key, port_id) unique index.
await db
.delete(systemSettings)
.where(
portId === null
? and(eq(systemSettings.key, KEY), isNull(systemSettings.portId))
: and(eq(systemSettings.key, KEY), eq(systemSettings.portId, portId)),
);
await db.insert(systemSettings).values({
key: KEY,
portId,
value: value as unknown as Record<string, unknown>,
updatedBy: userId,
});
}
/**
* Resolve the active OCR config for a port: port row (unless `useGlobal`),
* falling back to the global row, falling back to a default-empty config.
*/
export async function getResolvedOcrConfig(portId: string): Promise<OcrConfigResolved> {
const portRow = await readRow(portId);
const useGlobal = portRow?.useGlobal === true || !portRow;
const sourceRow = useGlobal ? await readRow(null) : portRow;
if (!sourceRow) {
return {
provider: 'openai',
model: DEFAULT_MODEL.openai,
apiKey: null,
hasApiKey: false,
useGlobal: portRow?.useGlobal === true,
source: 'none',
};
}
return {
provider: sourceRow.provider,
model: sourceRow.model,
apiKey: sourceRow.apiKeyEncrypted ? decrypt(sourceRow.apiKeyEncrypted) : null,
hasApiKey: Boolean(sourceRow.apiKeyEncrypted),
useGlobal: portRow?.useGlobal === true,
source: useGlobal ? 'global' : 'port',
};
}
/** Public-safe view for the admin UI — same shape but never the key. */
export async function getPublicOcrConfig(portId: string | null): Promise<OcrConfigPublic> {
const row = await readRow(portId);
if (!row) {
return {
provider: 'openai',
model: DEFAULT_MODEL.openai,
hasApiKey: false,
useGlobal: false,
};
}
return {
provider: row.provider,
model: row.model,
hasApiKey: Boolean(row.apiKeyEncrypted),
useGlobal: row.useGlobal,
};
}
export interface SaveOcrConfigInput {
provider: OcrProvider;
model: string;
/** When provided, replaces any stored key. When undefined, the existing key is preserved. */
apiKey?: string;
/** When true, clears the stored key. */
clearApiKey?: boolean;
useGlobal?: boolean;
}
export async function saveOcrConfig(
portId: string | null,
input: SaveOcrConfigInput,
userId: string,
): Promise<void> {
const existing = await readRow(portId);
let apiKeyEncrypted = existing?.apiKeyEncrypted ?? null;
if (input.clearApiKey) {
apiKeyEncrypted = null;
} else if (input.apiKey !== undefined && input.apiKey.length > 0) {
apiKeyEncrypted = encrypt(input.apiKey);
}
await writeRow(
portId,
{
provider: input.provider,
model: input.model,
apiKeyEncrypted,
useGlobal: portId === null ? false : Boolean(input.useGlobal),
},
userId,
);
}

View File

@@ -0,0 +1,172 @@
/**
* Receipt OCR provider adapters. Each adapter takes raw image bytes
* and returns a normalized `ParsedReceipt` shape; callers don't care
* which provider produced it.
*/
import OpenAI from 'openai';
import { logger } from '@/lib/logger';
export interface ParsedReceiptLineItem {
description: string;
amount: number;
}
export interface ParsedReceipt {
establishment: string | null;
/** ISO YYYY-MM-DD. */
date: string | null;
amount: number | null;
currency: string | null;
lineItems: ParsedReceiptLineItem[];
/** 0..1; below 0.6 surfaces "verify mode" UI. */
confidence: number;
}
const EMPTY_RESULT: ParsedReceipt = {
establishment: null,
date: null,
amount: null,
currency: null,
lineItems: [],
confidence: 0,
};
const SYSTEM_PROMPT =
'You extract structured data from a marina-business receipt image. Return ONLY a JSON object with these keys: establishment (string), date (ISO YYYY-MM-DD), amount (number, total), currency (3-letter ISO code), lineItems (array of {description, amount}), confidence (number 0-1). If a field cannot be read, return null for that field. Set confidence near 0 if the image is unreadable, near 1 if every field was confidently extracted.';
interface RunArgs {
imageBuffer: Buffer;
mimeType: string;
apiKey: string;
model: string;
}
function safeParse(content: string): ParsedReceipt {
const cleaned = content.replace(/```json\n?|\n?```/g, '').trim();
try {
const obj = JSON.parse(cleaned) as Partial<ParsedReceipt>;
return {
establishment: obj.establishment ?? null,
date: obj.date ?? null,
amount: typeof obj.amount === 'number' ? obj.amount : null,
currency: obj.currency ?? null,
lineItems: Array.isArray(obj.lineItems) ? obj.lineItems : [],
confidence: typeof obj.confidence === 'number' ? obj.confidence : 0,
};
} catch (err) {
logger.warn({ err, contentLen: cleaned.length }, 'OCR provider returned non-JSON');
return EMPTY_RESULT;
}
}
async function runOpenAi({
imageBuffer,
mimeType,
apiKey,
model,
}: RunArgs): Promise<ParsedReceipt> {
const client = new OpenAI({ apiKey });
const base64 = imageBuffer.toString('base64');
const response = await client.chat.completions.create({
model,
messages: [
{ role: 'system', content: SYSTEM_PROMPT },
{
role: 'user',
content: [
{ type: 'text', text: 'Extract the receipt as JSON.' },
{
type: 'image_url',
image_url: { url: `data:${mimeType};base64,${base64}` },
},
],
},
],
max_tokens: 1024,
response_format: { type: 'json_object' },
});
return safeParse(response.choices[0]?.message?.content ?? '{}');
}
async function runClaude({
imageBuffer,
mimeType,
apiKey,
model,
}: RunArgs): Promise<ParsedReceipt> {
const base64 = imageBuffer.toString('base64');
const res = await fetch('https://api.anthropic.com/v1/messages', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'x-api-key': apiKey,
'anthropic-version': '2023-06-01',
},
body: JSON.stringify({
model,
max_tokens: 1024,
system: SYSTEM_PROMPT,
messages: [
{
role: 'user',
content: [
{
type: 'image',
source: { type: 'base64', media_type: mimeType, data: base64 },
},
{ type: 'text', text: 'Extract the receipt as JSON.' },
],
},
],
}),
});
if (!res.ok) {
const detail = await res.text().catch(() => '');
throw new Error(`Claude API ${res.status}: ${detail.slice(0, 200)}`);
}
const body = (await res.json()) as { content?: Array<{ type: string; text?: string }> };
const text = body.content?.find((c) => c.type === 'text')?.text ?? '{}';
return safeParse(text);
}
export async function runOcr(args: {
provider: 'openai' | 'claude';
imageBuffer: Buffer;
mimeType: string;
apiKey: string;
model: string;
}): Promise<ParsedReceipt> {
if (args.provider === 'openai') return runOpenAi(args);
return runClaude(args);
}
/**
* Tiny dummy-image probe used by the admin "Test connection" button.
* Returns the raw HTTP status so callers can render plain-English errors.
*/
export async function testProvider(
provider: 'openai' | 'claude',
apiKey: string,
model: string,
): Promise<{ ok: true } | { ok: false; reason: string }> {
// 1×1 transparent PNG.
const pixelPng = Buffer.from(
'iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAQAAAC1HAwCAAAAC0lEQVR42mNk+A8AAQUBAScY42YAAAAASUVORK5CYII=',
'base64',
);
try {
await runOcr({
provider,
imageBuffer: pixelPng,
mimeType: 'image/png',
apiKey,
model,
});
return { ok: true };
} catch (err) {
const reason = err instanceof Error ? err.message : 'Unknown error';
return { ok: false, reason };
}
}

View File

@@ -71,6 +71,7 @@ export type CreateDocumentWizardInput = z.infer<typeof createDocumentWizardSchem
export const documentsHubTabs = [
'all',
'eoi_queue',
'awaiting_them',
'awaiting_me',
'completed',