fix(audit): import cluster — M27 (commit idempotency), M25 (in-file dedup preview), M26 (undo destructive-update reporting), L33 (mapping/mooring), L35 (port-auth doc)

M25 DB unique-index backstop deferred: needs a migration (column + backfill +
insert-stamp trigger + dedup) — tracked as a follow-up. The classify in-file
dedup (preview accuracy) ships now.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-02 12:41:00 +02:00
parent 9305c030de
commit 25988dbfad
6 changed files with 269 additions and 24 deletions

View File

@@ -7,11 +7,28 @@ import { createBerth, updateBerth } from '@/lib/services/berths.service';
import type { ImportAdapter, MappedRow } from '../types';
/**
* Accepted import spellings of a mooring: letters, an optional separating
* hyphen, optional leading zeros, then 1–6 digits. The 6-digit cap (audit
* L33(b)) rejects absurd numbers that would overflow JS's safe-integer range
* during canonicalization — a real marina mooring is at most a few thousand.
* Canonicalization strips the hyphen + leading zeros and upper-cases the
* letters, so the *output* always conforms to the canonical `^[A-Z]+\d+$`.
*/
const MOORING_INPUT_RE = /^[A-Za-z]+-?0*\d{1,6}$/;
/** Canonical stored form — the post-canonicalization invariant. */
const MOORING_CANON_RE = /^[A-Z]+\d+$/;
/**
 * Canonicalize a mooring to the unified `^[A-Z]+\d+$` form ("A1", "D32"):
 * upper-case the letters and drop an optional separating hyphen plus any
 * leading zeros on the number.
 *
 * The number is normalized digit-wise (no `parseInt`), so arbitrarily long
 * digit runs come back exactly instead of being rounded or rendered in
 * exponent notation once they pass `Number.MAX_SAFE_INTEGER`.
 *
 * @param raw - Mooring as written in the input; surrounding whitespace is ignored.
 * @returns The canonical mooring, or the trimmed, upper-cased input when it
 *   is not of the shape letters + optional hyphen + digits.
 */
function canonMoo(raw: string): string {
  const trimmed = raw.trim();
  const m = /^([A-Za-z]+)-?0*(\d+)$/.exec(trimmed);
  if (!m) return trimmed.toUpperCase();
  const [, letters = '', number = ''] = m;
  // Drop leading zeros without parseInt; keep a lone "0" as "0".
  const digits = number.replace(/^0+(?=\d)/, '');
  return `${letters.toUpperCase()}${digits}`;
}
const num = (s: string | undefined): number | undefined =>
@@ -28,7 +45,12 @@ export const berthsAdapter: ImportAdapter = {
label: 'Mooring number',
required: true,
aliases: ['mooring', 'berth', 'berthnumber'],
zod: z.string().regex(/^[A-Za-z]+-?0*\d+$/, 'Use a form like A1, B12, E18'),
zod: z
.string()
.regex(MOORING_INPUT_RE, 'Use a form like A1, B12, E18 (max 6 digits)')
// Defense in depth: whatever the input spelling, the canonical form
// must conform to ^[A-Z]+\d+$ (audit L33(b)).
.refine((v) => MOORING_CANON_RE.test(canonMoo(v)), 'Invalid mooring format'),
},
{
key: 'area',

View File

@@ -42,6 +42,16 @@ export async function classifyRow(
rowNumber: number,
policy: ConflictPolicy,
ctx: ImportCtx,
/**
* Match keys already emitted by *earlier rows of the same file*. When set,
* an insert whose natural key was already seen in-file is re-routed through
* the same conflict branch a DB match would take — so the dry-run preview
* reflects what the sequential commit actually does (audit M25: previously
* two file rows sharing one brand-new email both classified `insert`, but the
* commit turns row 2 into a skip/update/error once row 1 is live). Mutated by
* this function: the row's resolved key is added on classification.
*/
inFileSeen?: Set<string>,
): Promise<ClassifiedRow> {
const mapped = applyMapping(raw, mapping);
@@ -56,26 +66,37 @@ export async function classifyRow(
resolved = fk.resolved;
}
// Dedup by natural key.
// Dedup by natural key — first against the DB, then (M25) against earlier
// rows of this same file so the preview matches the commit's sequential
// classify-then-insert ordering.
const key = adapter.matchKey(mapped);
const existing = key ? await adapter.findExisting(ctx.portId, key) : null;
const matchedInFile = !existing && key != null && (inFileSeen?.has(key) ?? false);
if (existing) {
if (existing || matchedInFile) {
// existingId is the DB row's id when one exists; for an in-file predecessor
// there is no id yet at dry-run time (it gets created earlier in the same
// commit pass), so it's left undefined.
const existingId = existing?.id;
if (policy === 'error-on-match') {
return {
rowNumber,
outcome: 'error',
existingId: existing.id,
existingId,
errors: [{ field: '*', message: 'Matches an existing record' }],
};
}
if (policy === 'update-matches' && adapter.update) {
return { rowNumber, outcome: 'update', existingId: existing.id, mapped, resolved };
return { rowNumber, outcome: 'update', existingId, mapped, resolved };
}
// skip-matches, or update requested on an insert-only adapter.
return { rowNumber, outcome: 'skip', existingId: existing.id, mapped, resolved };
return { rowNumber, outcome: 'skip', existingId, mapped, resolved };
}
// First sighting of this key in-file → record it so a later duplicate row
// classifies as a match above.
if (key != null) inFileSeen?.add(key);
return { rowNumber, outcome: 'insert', mapped, resolved };
}
@@ -99,8 +120,11 @@ export async function classifyRows(
): Promise<DryRunSummary> {
const rows: ClassifiedRow[] = [];
const summary = { total: rawRows.length, insert: 0, update: 0, skip: 0, error: 0 };
// Tracks natural keys already classified as inserts so a later in-file
// duplicate previews as a skip/update/error (M25), mirroring the commit.
const inFileSeen = new Set<string>();
for (let i = 0; i < rawRows.length; i++) {
const c = await classifyRow(adapter, rawRows[i]!, mapping, i + 1, policy, ctx);
const c = await classifyRow(adapter, rawRows[i]!, mapping, i + 1, policy, ctx, inFileSeen);
rows.push(c);
summary[c.outcome] += 1;
}

View File

@@ -8,9 +8,22 @@
*
* undoBatch hard-deletes the rows a batch *inserted* (reverse order), port-
* scoped, leaning on DB referential integrity as the guard: a row that now
* has dependents fails its delete and is reported, not force-removed.
* has dependents fails its delete and is reported (with the blocking
* constraint/table) rather than force-removed.
*
* IMPORTANT (audit M26) — `update-matches` is DESTRUCTIVE WITHOUT ROLLBACK.
* undoBatch reverses ONLY `action='inserted'` rows. An `update-matches` run
* that overwrote, say, 500 companies' taxId/billingEmail or 500 berths'
* price/dimensions CANNOT be undone here: the ledger stores only the entity id,
* not the pre-image, so there is nothing to restore to. A full update-undo
* would require capturing a JSON pre-image of every updated row, which needs a
* new column on `import_batch_rows` (e.g. `pre_image jsonb`) plus a per-row
* SELECT-before-UPDATE in commitBatch — deferred (out of the current scope's
* single-table reach). Until then, any UI/route offering "Undo" on an
* update-matches batch MUST warn the operator that updates are irreversible.
* See the explicit guard + warning emitted by undoBatch below.
*/
import { and, eq } from 'drizzle-orm';
import { and, eq, inArray } from 'drizzle-orm';
import { db } from '@/lib/db';
import { importBatches, importBatchRows } from '@/lib/db/schema/imports';
@@ -25,6 +38,49 @@ import type { ConflictPolicy, ImportAdapter, ImportCtx, RawRow } from './types';
* Extended as the FK adapters land. */
const UNDOABLE = new Set(['companies', 'clients', 'berths']);
/**
 * Turn a failed-delete error into a short, operator-readable reason.
 *
 * For a Postgres foreign-key violation (SQLSTATE `23503`) the message names
 * the dependent table and the constraint when the error carries them, and
 * appends the driver's `detail` text when present (audit M26). Fields are read
 * from the error itself first and from its `cause` second. Any other error
 * yields its own `message`, or the generic `'delete blocked'` when there is
 * none.
 *
 * @param err - Whatever the failed delete threw.
 * @returns A non-empty description of why the delete did not go through.
 */
function describeDeleteBlock(err: unknown): string {
  const FALLBACK = 'delete blocked';
  if (typeof err !== 'object' || err === null) return FALLBACK;

  type PgErrorFields = {
    code?: unknown;
    table_name?: unknown;
    table?: unknown;
    constraint_name?: unknown;
    constraint?: unknown;
    detail?: unknown;
  };
  const outer = err as PgErrorFields & { message?: unknown; cause?: PgErrorFields };
  const inner = outer.cause;

  // Anything other than an FK violation: surface the raw message if there is one.
  if ((outer.code ?? inner?.code) !== '23503') {
    if (typeof outer.message === 'string' && outer.message) return outer.message;
    return FALLBACK;
  }

  // `table_name` is the referencing (dependent) table; the constraint names
  // the relationship; `detail` is the most specific piece when available.
  const table = outer.table_name ?? outer.table ?? inner?.table_name ?? inner?.table;
  const constraint =
    outer.constraint_name ?? outer.constraint ?? inner?.constraint_name ?? inner?.constraint;
  const detail = outer.detail ?? inner?.detail;

  let text = 'blocked by ';
  text += table ? `dependent rows in "${String(table)}"` : 'dependent rows exist';
  if (constraint) text += ` (constraint ${String(constraint)})`;
  if (typeof detail === 'string' && detail) text += `: ${detail}`;
  return text;
}
/** Hard-delete one imported entity, port-scoped. Throws on FK violation
* (a dependent now exists) — the caller reports that row as blocked. */
async function deleteEntity(entityType: string, entityId: string, portId: string): Promise<void> {
@@ -73,10 +129,25 @@ export async function commitBatch(params: {
const { batchId, adapter, rawRows, mapping, policy, ctx } = params;
const counts: CommitResult = { inserted: 0, updated: 0, skipped: 0, errored: 0 };
await db
// Idempotency gate (audit M27): atomically claim the batch for commit, but
// only if it's still awaiting one. The conditional WHERE + RETURNING means a
// concurrent worker (or a re-enqueued duplicate job) that lost the race sees
// zero rows and bails *before* writing any import_batch_rows — so undo never
// sees two runs' worth of ledger rows and the header counts stay reconciled.
// Pairs with the worker's pre-flight status check (which also avoids the
// wasted file re-read); this UPDATE is the authoritative claim.
const claimed = await db
.update(importBatches)
.set({ status: 'committing', totalRows: rawRows.length })
.where(eq(importBatches.id, batchId));
.where(
and(eq(importBatches.id, batchId), inArray(importBatches.status, ['dry_run', 'uploaded'])),
)
.returning({ id: importBatches.id });
if (claimed.length === 0) {
// Already committing/completed/failed/undone (or vanished). Do NOT
// re-process — return the no-op counts so callers don't double-count.
return counts;
}
for (let i = 0; i < rawRows.length; i++) {
const rowNumber = i + 1;
@@ -125,16 +196,37 @@ export async function commitBatch(params: {
return counts;
}
/** One inserted row that undo could not delete, paired with why. */
export interface UndoBlockedRow {
/** Row number of the ledger row whose delete failed. */
rowNumber: number;
/** Human-readable reason — names the blocking dependent table/constraint
* when the delete tripped an FK (audit M26), else the raw error. */
reason: string;
}
/** Outcome of an undoBatch call. */
export interface UndoResult {
/** Number of inserted rows whose delete succeeded. */
deleted: number;
/** Number of inserted rows whose delete threw and were left in place. */
blocked: number;
/** Row numbers of the blocked rows, in the order undo processed them. */
blockedRows: number[];
/** Same blocked rows as `blockedRows`, with the blocking reason attached. */
blockedDetail: UndoBlockedRow[];
/**
* Rows this batch *updated* under `update-matches` that undo did NOT reverse
* (audit M26: there is no stored pre-image to restore, so updates are
* destructive-without-rollback). Surfaced so the caller can warn the
* operator that these mutations remain in place.
*/
irreversibleUpdates: number;
}
/**
* Hard-delete the rows a batch inserted (port-scoped). A delete blocked by a
* dependent (FK) is reported, not forced. Marks the batch `undone` when every
* inserted row was removed.
* dependent (FK) is reported (with the blocking table/constraint), not forced.
* Marks the batch `undone` when every inserted row was removed.
*
* NOTE (audit M26): this reverses ONLY inserts. Any `action='updated'` rows are
* left untouched and counted in `irreversibleUpdates` — undo cannot restore the
* overwritten values because no pre-image is captured. Callers offering an
* "Undo" affordance must warn when `irreversibleUpdates > 0`.
*/
export async function undoBatch(batchId: string, portId: string): Promise<UndoResult> {
const [batch] = await db
@@ -161,7 +253,20 @@ export async function undoBatch(batchId: string, portId: string): Promise<UndoRe
.from(importBatchRows)
.where(and(eq(importBatchRows.batchId, batchId), eq(importBatchRows.action, 'inserted')));
const result: UndoResult = { deleted: 0, blocked: 0, blockedRows: [] };
// Count the update-matches mutations we can't reverse (audit M26). These stay
// applied; the count lets the caller warn the operator.
const updatedRows = await db
.select({ id: importBatchRows.id })
.from(importBatchRows)
.where(and(eq(importBatchRows.batchId, batchId), eq(importBatchRows.action, 'updated')));
const result: UndoResult = {
deleted: 0,
blocked: 0,
blockedRows: [],
blockedDetail: [],
irreversibleUpdates: updatedRows.length,
};
for (const row of inserted) {
if (!row.entityId) continue;
try {
@@ -172,9 +277,17 @@ export async function undoBatch(batchId: string, portId: string): Promise<UndoRe
.set({ action: 'skipped', error: 'undone' })
.where(eq(importBatchRows.id, row.id));
result.deleted += 1;
} catch {
} catch (e) {
const reason = describeDeleteBlock(e);
result.blocked += 1;
result.blockedRows.push(row.rowNumber);
result.blockedDetail.push({ rowNumber: row.rowNumber, reason });
// Persist the reason on the ledger row so the error report can explain
// *why* a row couldn't be undone, not just that it couldn't.
await db
.update(importBatchRows)
.set({ error: `undo blocked: ${reason}`.slice(0, 1000) })
.where(eq(importBatchRows.id, row.id));
}
}

View File

@@ -29,19 +29,41 @@ function lev(a: string, b: string): number {
return prev[n]!;
}
/** A single auto-mapping suggestion. `exact` and `fuzzy` matches are safe to
* pre-select in the UI; `review` matches (substring overlap only) must be
* surfaced for the operator to confirm, never silently pre-applied. */
export interface MappingSuggestion {
/** Key of the import field this suggestion is for. */
fieldKey: string;
/** Header picked as the best match for that field. */
header: string;
/**
* - `exact` — normalized key/label/alias equals the header (score 0).
* - `fuzzy` — close edit distance (≤2 edits) on a whole token; pre-selectable.
* - `review` — substring overlap only (e.g. "Billing Email" ⊃ "email");
* audit L33(a): these mis-map at scale, so they are NOT pre-selected.
*/
tier: 'exact' | 'fuzzy' | 'review';
}
/**
* For each field, pick the best-matching header. Exact normalized match on
* key / label / alias wins; otherwise a substring or close edit-distance
* match. A header is claimed by at most one field (first-come by field order).
* Returns `fieldKey → header` (only confident matches; unmatched fields absent).
* For each field, find the best-matching header and classify the match tier.
* Exact normalized match on key / label / alias is `exact`; a close
* edit-distance match on a whole token is `fuzzy`; a bare substring overlap is
* `review` (audit L33: substring scoring let "Billing Email" auto-map to
* `email` and "Company Name" to `name`, so a careless confirm imported into the
* wrong column). A header is claimed by at most one field (first-come by field
* order). Returns every match with its tier; callers decide what to pre-select.
*/
export function suggestMapping(headers: string[], fields: ImportField[]): Record<string, string> {
const out: Record<string, string> = {};
export function suggestMappingDetailed(
headers: string[],
fields: ImportField[],
): MappingSuggestion[] {
const out: MappingSuggestion[] = [];
const taken = new Set<string>();
const normHeaders = headers.map((h) => ({ raw: h, n: norm(h) }));
for (const field of fields) {
const candidates = [field.key, field.label, ...(field.aliases ?? [])].map(norm);
// Lower score = better. 0 exact, 1 substring (review), 2+ edit-distance.
let best: { header: string; score: number } | null = null;
for (const h of normHeaders) {
@@ -54,6 +76,8 @@ export function suggestMapping(headers: string[], fields: ImportField[]): Record
else {
const d = lev(c, h.n);
// Accept only close matches (≤2 edits, and not longer than the token).
// Edit-distance scores land at 2..3 (1 + d); a bare substring scores
// exactly 1. Tier classification below reads those bands back out.
if (d <= 2 && d < Math.max(c.length, h.n.length)) score = Math.min(score, 1 + d);
}
}
@@ -61,13 +85,31 @@ export function suggestMapping(headers: string[], fields: ImportField[]): Record
}
if (best) {
out[field.key] = best.header;
// score 0 → exact; score exactly 1 → substring overlap (review);
// score ≥ 2 → close edit distance (fuzzy, pre-selectable).
const tier: MappingSuggestion['tier'] =
best.score === 0 ? 'exact' : best.score === 1 ? 'review' : 'fuzzy';
out.push({ fieldKey: field.key, header: best.header, tier });
taken.add(best.header);
}
}
return out;
}
/**
 * Pre-selectable auto-mapping, expressed as `fieldKey → header`.
 *
 * Keeps only the `exact` and `fuzzy` suggestions produced by
 * {@link suggestMappingDetailed}. `review` suggestions (substring overlap
 * only) are deliberately left out (audit L33); callers that want to offer
 * them as un-applied suggestions for the operator to confirm should call
 * {@link suggestMappingDetailed} directly.
 *
 * @param headers - Candidate column headers.
 * @param fields - Import fields to find headers for.
 * @returns One entry per field that has a non-`review` suggestion.
 */
export function suggestMapping(headers: string[], fields: ImportField[]): Record<string, string> {
  const preselected: Record<string, string> = {};
  suggestMappingDetailed(headers, fields)
    .filter((suggestion) => suggestion.tier !== 'review')
    .forEach(({ fieldKey, header }) => {
      preselected[fieldKey] = header;
    });
  return preselected;
}
/**
* Apply a `fieldKey → header` mapping to a raw row, producing `fieldKey → cell`.
* Empty / whitespace-only cells are dropped so downstream "required" checks and

View File

@@ -44,6 +44,23 @@ export type RawRow = Record<string, string>;
export type MappedRow = Record<string, string>;
/** Per-run context passed to the import engine (classifyRow, commitBatch). */
export interface ImportCtx {
/**
* Tenant scope for every read/write this import performs. Adapters stamp it
* onto inserts and use it to port-scope their dedup lookups.
*
* SECURITY / TRUST BOUNDARY (audit L35) — `portId` is currently sourced from
* `import_batches.portId` and trusted wholesale. That is safe ONLY because no
* API route enqueues this engine yet; the batch row is created server-side.
* Any future commit/dry-run HTTP route MUST, before enqueuing or running the
* engine:
* 1. re-derive the acting port from the authenticated session (NEVER take
* a portId from the request body), and
* 2. assert `batch.portId === session.portId` (reject cross-tenant batch
* ids — batch ids are guessable enough to warrant the check), and
* 3. gate on an `import` permission (no permission is checked anywhere in
* the engine path today).
* Do not relax this to read a client-supplied portId.
*/
portId: string;
/**
* Audit metadata for the run; the import worker fills `userId` from the
* batch's `createdBy`. NOTE(review): the full shape lives in `AuditMeta`,
* declared elsewhere — confirm the remaining fields there.
*/
meta: AuditMeta;
}

View File

@@ -48,6 +48,27 @@ export const importWorker = new Worker(
return;
}
// Idempotency guard (audit M27): only a batch still awaiting commit may be
// committed. A re-enqueue (future commit endpoint, operator re-trigger, or
// any stray duplicate job) of an already-committing/completed/failed/undone
// batch must NOT re-run — re-processing appends a second full set of
// import_batch_rows, so undo later sees both run-1 inserts and run-2 skips
// and the header counts no longer reconcile with the ledger undo trusts.
// commitBatch itself also gates the status transition with a conditional
// UPDATE (defense in depth against a TOCTOU race with another worker), but
// this early return avoids the wasted file re-read + parse in the common
// case. NOTE for the future authorization boundary (audit L35): when a
// commit/dry-run API route lands it MUST re-derive portId from the session
// and assert batch.portId === session.portId before enqueuing, and gate on
// an `import` permission — this worker trusts batch.portId wholesale.
if (batch.status !== 'dry_run' && batch.status !== 'uploaded') {
logger.warn(
{ batchId, status: batch.status },
'Import batch already past the commit gate — skipping re-run',
);
return;
}
const adapter = getAdapter(batch.entityType);
if (!adapter || !batch.storageKey || !batch.mappingJson) {
await db
@@ -69,6 +90,12 @@ export const importWorker = new Worker(
mapping: batch.mappingJson,
policy: batch.conflictPolicy as ConflictPolicy,
ctx: {
// Trust boundary (audit L35): portId is taken from the persisted
// batch and trusted. Safe only because batches are created
// server-side with no client-supplied portId today. The commit/
// dry-run API route, when it lands, MUST re-derive portId from the
// session, assert batch.portId === session.portId, and gate on an
// `import` permission before enqueuing this job. See ImportCtx.portId.
portId: batch.portId,
meta: {
userId: batch.createdBy,