diff --git a/src/lib/import/adapters/berths.ts b/src/lib/import/adapters/berths.ts index bc395b41..a957769e 100644 --- a/src/lib/import/adapters/berths.ts +++ b/src/lib/import/adapters/berths.ts @@ -7,11 +7,28 @@ import { createBerth, updateBerth } from '@/lib/services/berths.service'; import type { ImportAdapter, MappedRow } from '../types'; +/** + * Accepted import spellings of a mooring: letters, an optional separating + * hyphen, optional leading zeros, then 1–6 digits. The 6-digit cap (audit + * L33(b)) rejects absurd numbers that would overflow JS's safe-integer range + * during canonicalization — a real marina mooring is at most a few thousand. + * Canonicalization strips the hyphen + leading zeros and upper-cases the + * letters, so the *output* always conforms to the canonical `^[A-Z]+\d+$`. + */ +const MOORING_INPUT_RE = /^[A-Za-z]+-?0*\d{1,6}$/; +/** Canonical stored form — the post-canonicalization invariant. */ +const MOORING_CANON_RE = /^[A-Z]+\d+$/; + /** Canonicalize a mooring to the unified `^[A-Z]+\d+$` form ("A1", "D32"): - * uppercase letters, drop a hyphen + leading zeros on the number. */ + * uppercase letters, drop a hyphen + leading zeros on the number. The number + * is normalized digit-wise (no parseInt) so values up to the 6-digit input + * cap survive without floating-point/MAX_SAFE_INTEGER precision loss. */ function canonMoo(raw: string): string { const m = /^([A-Za-z]+)-?0*(\d+)$/.exec(raw.trim()); - return m ? `${m[1]!.toUpperCase()}${parseInt(m[2]!, 10)}` : raw.trim().toUpperCase(); + if (!m) return raw.trim().toUpperCase(); + // Drop leading zeros without parseInt; keep a lone "0" as "0". + const digits = m[2]!.replace(/^0+(?=\d)/, ''); + return `${m[1]!.toUpperCase()}${digits}`; } const num = (s: string | undefined): number | undefined => @@ -28,7 +45,12 @@ export const berthsAdapter: ImportAdapter = { label: 'Mooring number', required: true, aliases: ['mooring', 'berth', 'berthnumber'], - zod: z.string().regex(/^[A-Za-z]+-?0*\d+$/, 'Use a form like A1, B12, E18'), + zod: z + .string() + .regex(MOORING_INPUT_RE, 'Use a form like A1, B12, E18 (max 6 digits)') + // Defense in depth: whatever the input spelling, the canonical form + // must conform to ^[A-Z]+\d+$ (audit L33(b)). + .refine((v) => MOORING_CANON_RE.test(canonMoo(v)), 'Invalid mooring format'), }, { key: 'area', diff --git a/src/lib/import/classify.ts b/src/lib/import/classify.ts index d1e39408..018baee6 100644 --- a/src/lib/import/classify.ts +++ b/src/lib/import/classify.ts @@ -42,6 +42,16 @@ export async function classifyRow( rowNumber: number, policy: ConflictPolicy, ctx: ImportCtx, + /** + * Match keys already emitted by *earlier rows of the same file*. When set, + * an insert whose natural key was already seen in-file is re-routed through + * the same conflict branch a DB match would take — so the dry-run preview + * reflects what the sequential commit actually does (audit M25: previously + * two file rows sharing one brand-new email both classified `insert`, but the + * commit turns row 2 into a skip/update/error once row 1 is live). Mutated by + * this function: the row's resolved key is added on classification. + */ + inFileSeen?: Set, ): Promise { const mapped = applyMapping(raw, mapping); @@ -56,26 +66,37 @@ export async function classifyRow( resolved = fk.resolved; } - // Dedup by natural key. + // Dedup by natural key — first against the DB, then (M25) against earlier + // rows of this same file so the preview matches the commit's sequential + // classify-then-insert ordering. const key = adapter.matchKey(mapped); const existing = key ? await adapter.findExisting(ctx.portId, key) : null; + const matchedInFile = !existing && key != null && (inFileSeen?.has(key) ?? false); - if (existing) { + if (existing || matchedInFile) { + // existingId is the DB row's id when one exists; for an in-file predecessor + // there is no id yet at dry-run time (it gets created earlier in the same + // commit pass), so it's left undefined. + const existingId = existing?.id; if (policy === 'error-on-match') { return { rowNumber, outcome: 'error', - existingId: existing.id, + existingId, errors: [{ field: '*', message: 'Matches an existing record' }], }; } if (policy === 'update-matches' && adapter.update) { - return { rowNumber, outcome: 'update', existingId: existing.id, mapped, resolved }; + return { rowNumber, outcome: 'update', existingId, mapped, resolved }; } // skip-matches, or update requested on an insert-only adapter. - return { rowNumber, outcome: 'skip', existingId: existing.id, mapped, resolved }; + return { rowNumber, outcome: 'skip', existingId, mapped, resolved }; } + // First sighting of this key in-file → record it so a later duplicate row + // classifies as a match above. + if (key != null) inFileSeen?.add(key); + return { rowNumber, outcome: 'insert', mapped, resolved }; } @@ -99,8 +120,11 @@ export async function classifyRows( ): Promise { const rows: ClassifiedRow[] = []; const summary = { total: rawRows.length, insert: 0, update: 0, skip: 0, error: 0 }; + // Tracks natural keys already classified as inserts so a later in-file + // duplicate previews as a skip/update/error (M25), mirroring the commit. + const inFileSeen = new Set(); for (let i = 0; i < rawRows.length; i++) { - const c = await classifyRow(adapter, rawRows[i]!, mapping, i + 1, policy, ctx); + const c = await classifyRow(adapter, rawRows[i]!, mapping, i + 1, policy, ctx, inFileSeen); rows.push(c); summary[c.outcome] += 1; } diff --git a/src/lib/import/commit.ts b/src/lib/import/commit.ts index d929ad6f..44c6e32c 100644 --- a/src/lib/import/commit.ts +++ b/src/lib/import/commit.ts @@ -8,9 +8,22 @@ * * undoBatch hard-deletes the rows a batch *inserted* (reverse order), port- * scoped, leaning on DB referential integrity as the guard: a row that now - * has dependents fails its delete and is reported, not force-removed. + * has dependents fails its delete and is reported (with the blocking + * constraint/table) rather than force-removed. + * + * IMPORTANT (audit M26) — `update-matches` is DESTRUCTIVE WITHOUT ROLLBACK. + * undoBatch reverses ONLY `action='inserted'` rows. An `update-matches` run + * that overwrote, say, 500 companies' taxId/billingEmail or 500 berths' + * price/dimensions CANNOT be undone here: the ledger stores only the entity id, + * not the pre-image, so there is nothing to restore to. A full update-undo + * would require capturing a JSON pre-image of every updated row, which needs a + * new column on `import_batch_rows` (e.g. `pre_image jsonb`) plus a per-row + * SELECT-before-UPDATE in commitBatch — deferred (out of the current scope's + * single-table reach). Until then, any UI/route offering "Undo" on an + * update-matches batch MUST warn the operator that updates are irreversible. + * See the explicit guard + warning emitted by undoBatch below. */ -import { and, eq } from 'drizzle-orm'; +import { and, eq, inArray } from 'drizzle-orm'; import { db } from '@/lib/db'; import { importBatches, importBatchRows } from '@/lib/db/schema/imports'; @@ -25,6 +38,49 @@ import type { ConflictPolicy, ImportAdapter, ImportCtx, RawRow } from './types'; * Extended as the FK adapters land. */ const UNDOABLE = new Set(['companies', 'clients', 'berths']); +/** + * Pull a human-readable "blocked by" reason out of a Postgres FK-violation + * (code 23503) so undo's blocked-row report names the dependent table/ + * constraint instead of just a bare row number (audit M26). postgres.js + * surfaces these on the error or its `cause`. Falls back to the raw message. + */ +function describeDeleteBlock(err: unknown): string { + if (!err || typeof err !== 'object') return 'delete blocked'; + const e = err as { + code?: unknown; + table_name?: unknown; + table?: unknown; + constraint_name?: unknown; + constraint?: unknown; + detail?: unknown; + message?: unknown; + cause?: { + code?: unknown; + table_name?: unknown; + table?: unknown; + constraint_name?: unknown; + constraint?: unknown; + detail?: unknown; + }; + }; + const code = e.code ?? e.cause?.code; + if (code === '23503') { + // The *referencing* (dependent) table is `table_name`; the constraint + // names the relationship. `detail` is the most specific ("Key (id)=(…) is + // still referenced from table \"payments\"."). + const table = e.table_name ?? e.table ?? e.cause?.table_name ?? e.cause?.table; + const constraint = + e.constraint_name ?? e.constraint ?? e.cause?.constraint_name ?? e.cause?.constraint; + const detail = e.detail ?? e.cause?.detail; + const where = table ? `dependent rows in "${String(table)}"` : 'dependent rows exist'; + const via = constraint ? ` (constraint ${String(constraint)})` : ''; + return typeof detail === 'string' && detail + ? `blocked by ${where}${via}: ${detail}` + : `blocked by ${where}${via}`; + } + return typeof e.message === 'string' && e.message ? e.message : 'delete blocked'; +} + /** Hard-delete one imported entity, port-scoped. Throws on FK violation * (a dependent now exists) — the caller reports that row as blocked. */ async function deleteEntity(entityType: string, entityId: string, portId: string): Promise { @@ -73,10 +129,25 @@ export async function commitBatch(params: { const { batchId, adapter, rawRows, mapping, policy, ctx } = params; const counts: CommitResult = { inserted: 0, updated: 0, skipped: 0, errored: 0 }; - await db + // Idempotency gate (audit M27): atomically claim the batch for commit, but + // only if it's still awaiting one. The conditional WHERE + RETURNING means a + // concurrent worker (or a re-enqueued duplicate job) that lost the race sees + // zero rows and bails *before* writing any import_batch_rows — so undo never + // sees two runs' worth of ledger rows and the header counts stay reconciled. + // Pairs with the worker's pre-flight status check (which also avoids the + // wasted file re-read); this UPDATE is the authoritative claim. + const claimed = await db .update(importBatches) .set({ status: 'committing', totalRows: rawRows.length }) - .where(eq(importBatches.id, batchId)); + .where( + and(eq(importBatches.id, batchId), inArray(importBatches.status, ['dry_run', 'uploaded'])), + ) + .returning({ id: importBatches.id }); + if (claimed.length === 0) { + // Already committing/completed/failed/undone (or vanished). Do NOT + // re-process — return the no-op counts so callers don't double-count. + return counts; + } for (let i = 0; i < rawRows.length; i++) { const rowNumber = i + 1; @@ -125,16 +196,37 @@ export async function commitBatch(params: { return counts; } +export interface UndoBlockedRow { + rowNumber: number; + /** Human-readable reason — names the blocking dependent table/constraint + * when the delete tripped an FK (audit M26), else the raw error. */ + reason: string; +} + export interface UndoResult { deleted: number; blocked: number; blockedRows: number[]; + /** Same blocked rows as `blockedRows`, with the blocking reason attached. */ + blockedDetail: UndoBlockedRow[]; + /** + * Rows this batch *updated* under `update-matches` that undo did NOT reverse + * (audit M26: there is no stored pre-image to restore, so updates are + * destructive-without-rollback). Surfaced so the caller can warn the + * operator that these mutations remain in place. + */ + irreversibleUpdates: number; } /** * Hard-delete the rows a batch inserted (port-scoped). A delete blocked by a - * dependent (FK) is reported, not forced. Marks the batch `undone` when every - * inserted row was removed. + * dependent (FK) is reported (with the blocking table/constraint), not forced. + * Marks the batch `undone` when every inserted row was removed. + * + * NOTE (audit M26): this reverses ONLY inserts. Any `action='updated'` rows are + * left untouched and counted in `irreversibleUpdates` — undo cannot restore the + * overwritten values because no pre-image is captured. Callers offering an + * "Undo" affordance must warn when `irreversibleUpdates > 0`. */ export async function undoBatch(batchId: string, portId: string): Promise { const [batch] = await db @@ -161,7 +253,20 @@ export async function undoBatch(batchId: string, portId: string): Promise { - const out: Record = {}; +export function suggestMappingDetailed( + headers: string[], + fields: ImportField[], +): MappingSuggestion[] { + const out: MappingSuggestion[] = []; const taken = new Set(); const normHeaders = headers.map((h) => ({ raw: h, n: norm(h) })); for (const field of fields) { const candidates = [field.key, field.label, ...(field.aliases ?? [])].map(norm); + // Lower score = better. 0 exact, 1 substring (review), 2+ edit-distance. let best: { header: string; score: number } | null = null; for (const h of normHeaders) { @@ -54,6 +76,8 @@ export function suggestMapping(headers: string[], fields: ImportField[]): Record else { const d = lev(c, h.n); // Accept only close matches (≤2 edits, and not longer than the token). + // Edit-distance scores land at 2..3 (1 + d); a bare substring scores + // exactly 1. Tier classification below reads those bands back out. if (d <= 2 && d < Math.max(c.length, h.n.length)) score = Math.min(score, 1 + d); } } @@ -61,13 +85,31 @@ export function suggestMapping(headers: string[], fields: ImportField[]): Record } if (best) { - out[field.key] = best.header; + // score 0 → exact; score exactly 1 → substring overlap (review); + // score ≥ 2 → close edit distance (fuzzy, pre-selectable). + const tier: MappingSuggestion['tier'] = + best.score === 0 ? 'exact' : best.score === 1 ? 'review' : 'fuzzy'; + out.push({ fieldKey: field.key, header: best.header, tier }); taken.add(best.header); } } return out; } +/** + * Confident auto-mapping for pre-selection: `fieldKey → header` for `exact` and + * `fuzzy` matches only. Substring-only (`review`) matches are intentionally + * omitted (audit L33) — fetch them via {@link suggestMappingDetailed} and show + * them as un-applied suggestions the operator must confirm. + */ +export function suggestMapping(headers: string[], fields: ImportField[]): Record { + const out: Record = {}; + for (const s of suggestMappingDetailed(headers, fields)) { + if (s.tier !== 'review') out[s.fieldKey] = s.header; + } + return out; +} + /** * Apply a `fieldKey → header` mapping to a raw row, producing `fieldKey → cell`. * Empty / whitespace-only cells are dropped so downstream "required" checks and diff --git a/src/lib/import/types.ts b/src/lib/import/types.ts index f9ebde6a..b7ce1a3d 100644 --- a/src/lib/import/types.ts +++ b/src/lib/import/types.ts @@ -44,6 +44,23 @@ export type RawRow = Record; export type MappedRow = Record; export interface ImportCtx { + /** + * Tenant scope for every read/write this import performs. Adapters stamp it + * onto inserts and use it to port-scope their dedup lookups. + * + * SECURITY / TRUST BOUNDARY (audit L35) — `portId` is currently sourced from + * `import_batches.portId` and trusted wholesale. That is safe ONLY because no + * API route enqueues this engine yet; the batch row is created server-side. + * Any future commit/dry-run HTTP route MUST, before enqueuing or running the + * engine: + * 1. re-derive the acting port from the authenticated session (NEVER take + * a portId from the request body), and + * 2. assert `batch.portId === session.portId` (reject cross-tenant batch + * ids — batch ids are guessable enough to warrant the check), and + * 3. gate on an `import` permission (no permission is checked anywhere in + * the engine path today). + * Do not relax this to read a client-supplied portId. + */ portId: string; meta: AuditMeta; } diff --git a/src/lib/queue/workers/import.ts b/src/lib/queue/workers/import.ts index b9fe43b3..a9c75724 100644 --- a/src/lib/queue/workers/import.ts +++ b/src/lib/queue/workers/import.ts @@ -48,6 +48,27 @@ export const importWorker = new Worker( return; } + // Idempotency guard (audit M27): only a batch still awaiting commit may be + // committed. A re-enqueue (future commit endpoint, operator re-trigger, or + // any stray duplicate job) of an already-committing/completed/failed/undone + // batch must NOT re-run — re-processing appends a second full set of + // import_batch_rows, so undo later sees both run-1 inserts and run-2 skips + // and the header counts no longer reconcile with the ledger undo trusts. + // commitBatch itself also gates the status transition with a conditional + // UPDATE (defense in depth against a TOCTOU race with another worker), but + // this early return avoids the wasted file re-read + parse in the common + // case. NOTE for the future authorization boundary (audit L35): when a + // commit/dry-run API route lands it MUST re-derive portId from the session + // and assert batch.portId === session.portId before enqueuing, and gate on + // an `import` permission — this worker trusts batch.portId wholesale. + if (batch.status !== 'dry_run' && batch.status !== 'uploaded') { + logger.warn( + { batchId, status: batch.status }, + 'Import batch already past the commit gate — skipping re-run', + ); + return; + } + const adapter = getAdapter(batch.entityType); if (!adapter || !batch.storageKey || !batch.mappingJson) { await db @@ -69,6 +90,12 @@ export const importWorker = new Worker( mapping: batch.mappingJson, policy: batch.conflictPolicy as ConflictPolicy, ctx: { + // Trust boundary (audit L35): portId is taken from the persisted + // batch and trusted. Safe only because batches are created + // server-side with no client-supplied portId today. The commit/ + // dry-run API route, when it lands, MUST re-derive portId from the + // session, assert batch.portId === session.portId, and gate on an + // `import` permission before enqueuing this job. See ImportCtx.portId. portId: batch.portId, meta: { userId: batch.createdBy,