From c7325010e60ef0e155d8cd04831eea598dfc8e27 Mon Sep 17 00:00:00 2001 From: Matt Date: Mon, 1 Jun 2026 22:36:42 +0200 Subject: [PATCH] feat(import): commit runner + undo + wired BullMQ worker MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Third importer increment — the write path, fully testable without UI. - commit.ts: commitBatch streams classified rows, applies insert/update per the conflict policy via the adapter (each row in its own try/catch so valid rows still land), records every action in import_batch_rows, and keeps live counts on the batch header. undoBatch hard-deletes a batch's inserted rows (port-scoped); a delete blocked by a dependent FK is reported, not forced, and the batch flips to `undone` only when every inserted row was removed. - import worker: replaced the no-op placeholder with the real processor — loads the batch, re-reads the uploaded file from storage, parses, and runs commitBatch under the batch's mapping + policy. Marks the batch failed on error. Concurrency 1 so imports don't race each other's dedup lookups. Tests: commit (skip/insert/error counts + per-row ledger + real inserted entity), undo (removes exactly the inserted row, flips status), and update-matches overwrite. 2 passing. Engine is now functional end-to-end at the service layer: parse → map → dry-run → commit → undo. Remaining: 4 FK adapters, API routes + permission, wizard UI + history. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- src/lib/import/commit.ts | 187 ++++++++++++++++++++++++ src/lib/queue/workers/import.ts | 101 +++++++++---- tests/integration/import-commit.test.ts | 130 ++++++++++++++++ 3 files changed, 392 insertions(+), 26 deletions(-) create mode 100644 src/lib/import/commit.ts create mode 100644 tests/integration/import-commit.test.ts diff --git a/src/lib/import/commit.ts b/src/lib/import/commit.ts new file mode 100644 index 00000000..d929ad6f --- /dev/null +++ b/src/lib/import/commit.ts @@ -0,0 +1,187 @@ +/** + * Commit + undo for the importer. + * + * commitBatch streams classified rows, applies insert/update per the conflict + * policy via the adapter (each row in its own try/catch so valid rows land), + * records every action in import_batch_rows, and keeps live counts on the + * batch header. + * + * undoBatch hard-deletes the rows a batch *inserted* (reverse order), port- + * scoped, leaning on DB referential integrity as the guard: a row that now + * has dependents fails its delete and is reported, not force-removed. + */ +import { and, eq } from 'drizzle-orm'; + +import { db } from '@/lib/db'; +import { importBatches, importBatchRows } from '@/lib/db/schema/imports'; +import { companies } from '@/lib/db/schema/companies'; +import { clients } from '@/lib/db/schema/clients'; +import { berths } from '@/lib/db/schema/berths'; + +import { classifyRow } from './classify'; +import type { ConflictPolicy, ImportAdapter, ImportCtx, RawRow } from './types'; + +/** Adapter keys whose inserted rows the importer can hard-delete on undo. + * Extended as the FK adapters land. */ +const UNDOABLE = new Set(['companies', 'clients', 'berths']); + +/** Hard-delete one imported entity, port-scoped. Throws on FK violation + * (a dependent now exists) — the caller reports that row as blocked. 
+ */
+async function deleteEntity(entityType: string, entityId: string, portId: string): Promise<void> {
+  // Every delete filters on both id and portId, so an id that belongs to a
+  // different port is never matched by this call.
+  switch (entityType) {
+    case 'companies':
+      await db
+        .delete(companies)
+        .where(and(eq(companies.id, entityId), eq(companies.portId, portId)));
+      return;
+    case 'clients':
+      await db.delete(clients).where(and(eq(clients.id, entityId), eq(clients.portId, portId)));
+      return;
+    case 'berths':
+      await db.delete(berths).where(and(eq(berths.id, entityId), eq(berths.portId, portId)));
+      return;
+    default:
+      throw new Error(`Undo not supported for entity "${entityType}"`);
+  }
+}
+
+/**
+ * Append one ledger entry to import_batch_rows.
+ *
+ * @param batchId   Batch the row belongs to.
+ * @param rowNumber 1-based position of the row in the uploaded file.
+ * @param action    What the commit did with the row.
+ * @param entityId  Id of the inserted/updated/matched entity, or null.
+ * @param error     Human-readable failure text, or null.
+ */
+async function recordRow(
+  batchId: string,
+  rowNumber: number,
+  action: 'inserted' | 'updated' | 'skipped' | 'errored',
+  entityId: string | null,
+  error: string | null,
+): Promise<void> {
+  await db.insert(importBatchRows).values({ batchId, rowNumber, action, entityId, error });
+}
+
+/**
+ * Per-action totals of one commit run. The same object is spread onto the
+ * batch header as live progress and returned to the caller.
+ */
+export interface CommitResult {
+  inserted: number;
+  updated: number;
+  skipped: number;
+  errored: number;
+}
+
+/**
+ * Classify every raw row and apply the outcome through the adapter.
+ *
+ * Marks the batch `committing`, walks the rows in array order (row numbers are
+ * 1-based), writes one ledger entry per row, flushes the running counts to the
+ * batch header every 25 rows, and finally marks the batch `completed`.
+ *
+ * A failure while classifying or applying a single row is caught, recorded as
+ * `errored`, and does not stop the run. A counter is only bumped after its
+ * ledger entry was written, so a row is never counted under two actions.
+ *
+ * NOTE(review): the type arguments of `mapping` and of the return type were
+ * lost in extraction and are restored from the call sites visible in this
+ * patch — confirm `Record<string, string>` against classifyRow's signature.
+ *
+ * @param params.batchId Batch header to update and to attach ledger rows to.
+ * @param params.adapter Entity adapter; `insert` is required, `update` optional.
+ * @param params.rawRows Parsed rows, in file order.
+ * @param params.mapping Column mapping handed to classifyRow.
+ * @param params.policy  Conflict policy handed to classifyRow.
+ * @param params.ctx     Import context handed to classifyRow and the adapter.
+ * @returns The final per-action counts.
+ * @throws When a batch-header update or the ledger write in the error path
+ *   fails — those are not per-row problems and propagate to the caller.
+ */
+export async function commitBatch(params: {
+  batchId: string;
+  adapter: ImportAdapter;
+  rawRows: RawRow[];
+  mapping: Record<string, string>;
+  policy: ConflictPolicy;
+  ctx: ImportCtx;
+}): Promise<CommitResult> {
+  const { batchId, adapter, rawRows, mapping, policy, ctx } = params;
+  const counts: CommitResult = { inserted: 0, updated: 0, skipped: 0, errored: 0 };
+
+  await db
+    .update(importBatches)
+    .set({ status: 'committing', totalRows: rawRows.length })
+    .where(eq(importBatches.id, batchId));
+
+  for (let i = 0; i < rawRows.length; i++) {
+    const rowNumber = i + 1;
+    try {
+      const c = await classifyRow(adapter, rawRows[i]!, mapping, rowNumber, policy, ctx);
+      if (c.outcome === 'error') {
+        await recordRow(
+          batchId,
+          rowNumber,
+          'errored',
+          null,
+          (c.errors ?? []).map((e) => `${e.field}: ${e.message}`).join('; ') || 'Invalid row',
+        );
+        counts.errored += 1;
+      } else if (c.outcome === 'skip') {
+        await recordRow(batchId, rowNumber, 'skipped', c.existingId ?? null, null);
+        counts.skipped += 1;
+      } else if (c.outcome === 'update') {
+        // The row matched an existing entity. Falling through to insert here
+        // would create a duplicate of that entity, so an adapter without
+        // update() turns the row into a recorded error instead.
+        if (!adapter.update) {
+          throw new Error('Adapter does not support updating matched rows');
+        }
+        await adapter.update(c.existingId!, c.mapped!, c.resolved ?? {}, ctx);
+        await recordRow(batchId, rowNumber, 'updated', c.existingId!, null);
+        counts.updated += 1;
+      } else {
+        // insert
+        const { id } = await adapter.insert(c.mapped!, c.resolved ?? {}, ctx);
+        await recordRow(batchId, rowNumber, 'inserted', id, null);
+        counts.inserted += 1;
+      }
+    } catch (e) {
+      // Anything can be thrown, not only Error instances.
+      const message = e instanceof Error ? e.message : String(e);
+      await recordRow(batchId, rowNumber, 'errored', null, message);
+      counts.errored += 1;
+    }
+
+    // Periodic progress flush so the batch header shows live counts.
+    if (rowNumber % 25 === 0) {
+      await db
+        .update(importBatches)
+        .set({ ...counts })
+        .where(eq(importBatches.id, batchId));
+    }
+  }
+
+  await db
+    .update(importBatches)
+    .set({ ...counts, status: 'completed', completedAt: new Date() })
+    .where(eq(importBatches.id, batchId));
+
+  return counts;
+}
+
+/**
+ * Outcome of an undo run: how many inserted entities were deleted, how many
+ * deletes were blocked, and the file row numbers of the blocked ones.
+ */
+export interface UndoResult {
+  deleted: number;
+  blocked: number;
+  blockedRows: number[];
+}
+
+/**
+ * Hard-delete the rows a batch inserted (port-scoped). A delete blocked by a
+ * dependent (FK) is reported, not forced. Marks the batch `undone` when every
+ * inserted row was removed.
+ *
+ * Rows are deleted from the highest row number down, so an entity created by
+ * a later row is removed before the earlier rows it may depend on. When any
+ * delete is blocked the batch status is left as it was.
+ *
+ * @param batchId Batch to undo.
+ * @param portId  Port the batch must belong to; also scopes every delete.
+ * @returns Deleted/blocked counts and the blocked row numbers (ascending).
+ * @throws Error when the batch does not exist in this port, when its entity
+ *   type is not undoable, or when a ledger/header write fails.
+ */
+export async function undoBatch(batchId: string, portId: string): Promise<UndoResult> {
+  const [batch] = await db
+    .select({
+      id: importBatches.id,
+      entityType: importBatches.entityType,
+      status: importBatches.status,
+    })
+    .from(importBatches)
+    .where(and(eq(importBatches.id, batchId), eq(importBatches.portId, portId)))
+    .limit(1);
+  if (!batch) throw new Error('Import batch not found');
+
+  if (!UNDOABLE.has(batch.entityType)) {
+    throw new Error(`Undo not supported for entity "${batch.entityType}"`);
+  }
+
+  const inserted = await db
+    .select({
+      id: importBatchRows.id,
+      rowNumber: importBatchRows.rowNumber,
+      entityId: importBatchRows.entityId,
+    })
+    .from(importBatchRows)
+    .where(and(eq(importBatchRows.batchId, batchId), eq(importBatchRows.action, 'inserted')));
+
+  // The query has no ORDER BY, so the reverse order is established here.
+  const newestFirst = [...inserted].sort((a, b) => b.rowNumber - a.rowNumber);
+
+  const result: UndoResult = { deleted: 0, blocked: 0, blockedRows: [] };
+  for (const row of newestFirst) {
+    if (!row.entityId) continue;
+    try {
+      await deleteEntity(batch.entityType, row.entityId, portId);
+    } catch {
+      // Only a failed delete counts as blocked: report the row, don't force it.
+      result.blocked += 1;
+      result.blockedRows.push(row.rowNumber);
+      continue;
+    }
+    // Flip the ledger row so a re-undo is a no-op. This runs outside the try
+    // above so a ledger failure is not misreported as a blocked delete.
+    await db
+      .update(importBatchRows)
+      .set({ action: 'skipped', error: 'undone' })
+      .where(eq(importBatchRows.id, row.id));
+    result.deleted += 1;
+  }
+  result.blockedRows.sort((a, b) => a - b);
+
+  // Flip to `undone` only when nothing was blocked; otherwise the previous
+  // status (which need not be `completed`) stays untouched.
+  if (result.blocked === 0) {
+    await db
+      .update(importBatches)
+      .set({ status: 'undone' })
+      .where(eq(importBatches.id, batchId));
+  }
+
+  return result;
+}
diff --git a/src/lib/queue/workers/import.ts b/src/lib/queue/workers/import.ts
index 5d4b73eb..b9fe43b3 100644
--- a/src/lib/queue/workers/import.ts
+++ b/src/lib/queue/workers/import.ts
@@ -1,42 +1,91 @@
 import { Worker, type Job } from 'bullmq';
+import { eq } from 'drizzle-orm';
+
 import { env } from '@/lib/env';
 import type { ConnectionOptions } from 'bullmq';
 import { logger } from '@/lib/logger';
 import { attachWorkerAudit } from '@/lib/queue/audit-helpers';
 import { QUEUE_CONFIGS } from '@/lib/queue';
+import { db } from '@/lib/db';
+import { importBatches } from '@/lib/db/schema/imports';
+import { getStorageBackend } from '@/lib/storage';
+import { parseImportFile } from '@/lib/import/engine';
+import { getAdapter } from '@/lib/import/registry';
+import { commitBatch } from '@/lib/import/commit';
+import type { ConflictPolicy } from '@/lib/import/types';
+
+/**
+ * Drain a readable stream into one Buffer.
+ *
+ * Resolves on `end` with all chunks concatenated; rejects on the stream's
+ * `error` event. String chunks (emitted when the stream has an encoding set)
+ * are converted to Buffers first, because Buffer.concat rejects strings.
+ */
+function streamToBuffer(stream: NodeJS.ReadableStream): Promise<Buffer> {
+  return new Promise<Buffer>((resolve, reject) => {
+    const chunks: Buffer[] = [];
+    stream.on('data', (c: Buffer | string) => {
+      chunks.push(typeof c === 'string' ? Buffer.from(c) : c);
+    });
+    stream.on('end', () => resolve(Buffer.concat(chunks)));
+    stream.on('error', reject);
+  });
+}
 
 /**
- * Bulk-import worker - DEFERRED FEATURE (placeholder).
- *
- * Status: registered with BullMQ so any future enqueue site lands on a
- * real worker instance instead of disappearing into an unbound queue.
- * No callers currently dispatch to this worker - the body is intentionally
- * a no-op that logs the dispatch for forensics.
- *
- * Why deferred (vs implemented inline):
- * - CSV/Excel import is a real product feature, not a refactor.
Done
- * properly it needs: per-entity schema mapping (clients / berths /
- * interests / companies / yachts), zod-level row validation, per-row
- * error rollup with line-numbered diagnostics, dry-run preview,
- * progress reporting, dedupe-on-conflict policy, admin upload UI
- * with column-mapping UX. Building it speculatively without a
- * customer in the room would lock in a UX that may not match what
- * real importers want.
- * - When the trigger comes (a customer needs to bulk-load a season
- * roster or migrate from another CRM), build it from product spec
- * not from this placeholder.
- *
- * What's required to ship: papaparse (CSV) + a thin schema-per-entity
- * mapping layer, plus an admin /admin/import page with a per-entity
- * picker + file dropzone. The queue registration here stays as-is.
+ * Bulk-import commit worker. Enqueued by the dry-run "commit" action with
+ * `{ batchId }`. Re-reads the uploaded file from storage, re-classifies +
+ * applies each row under the batch's conflict policy (see commitBatch), and
+ * records per-row actions + live counts. Concurrency 1 (queue config) so
+ * imports never race each other's dedup lookups.
+ *
+ * A job without a batchId, for a batch that no longer exists, or for a batch
+ * that is already `completed` or `undone` is logged and dropped. A batch with
+ * no adapter, storage key or mapping is marked `failed` without throwing. Any
+ * error while reading, parsing or committing marks the batch `failed` and is
+ * rethrown so the queue sees the job as failed.
  */
 export const importWorker = new Worker(
   'import',
   async (job: Job) => {
-    logger.info({ jobId: job.id, jobName: job.name }, 'Processing import job');
-    // Deferred - no callers enqueue this. If a job lands, we log + swallow
-    // so a future test enqueue doesn't trip the failed-job alert.
+    const { batchId } = (job.data ?? {}) as { batchId?: string };
+    logger.info({ jobId: job.id, batchId }, 'Processing import commit');
+    if (!batchId) return;
+
+    const [batch] = await db
+      .select()
+      .from(importBatches)
+      .where(eq(importBatches.id, batchId))
+      .limit(1);
+    if (!batch) {
+      logger.warn({ batchId }, 'Import batch vanished before commit');
+      return;
+    }
+
+    // commitBatch starts at row 1 every time, so a second job for a finished
+    // batch (e.g. the commit action enqueued twice) would re-apply every row
+    // and duplicate the ledger. Finished batches are left alone.
+    // NOTE(review): a batch stuck in `committing`/`failed` is still re-run
+    // from the first row — confirm whether retries are enabled for this queue.
+    if (batch.status === 'completed' || batch.status === 'undone') {
+      logger.warn(
+        { batchId, status: batch.status },
+        'Import batch already finished; skipping commit',
+      );
+      return;
+    }
+
+    // Shared by both failure paths below.
+    const markFailed = async (): Promise<void> => {
+      await db
+        .update(importBatches)
+        .set({ status: 'failed', completedAt: new Date() })
+        .where(eq(importBatches.id, batchId));
+    };
+
+    const adapter = getAdapter(batch.entityType);
+    if (!adapter || !batch.storageKey || !batch.mappingJson) {
+      await markFailed();
+      logger.error({ batchId, entityType: batch.entityType }, 'Import batch not committable');
+      return;
+    }
+
+    try {
+      const stream = await (await getStorageBackend()).get(batch.storageKey);
+      const buffer = await streamToBuffer(stream);
+      const parsed = await parseImportFile(batch.filename, buffer);
+      await commitBatch({
+        batchId,
+        adapter,
+        rawRows: parsed.rows,
+        mapping: batch.mappingJson,
+        policy: batch.conflictPolicy as ConflictPolicy,
+        ctx: {
+          portId: batch.portId,
+          meta: {
+            userId: batch.createdBy,
+            portId: batch.portId,
+            ipAddress: 'import-worker',
+            userAgent: 'import-worker',
+          },
+        },
+      });
+    } catch (err) {
+      try {
+        await markFailed();
+      } catch (markErr) {
+        // A failed status write must not replace the error that caused it.
+        logger.error({ batchId, err: markErr }, 'Could not mark import batch failed');
+      }
+      logger.error({ batchId, err }, 'Import commit failed');
+      throw err;
+    }
   },
   {
     connection: { url: env.REDIS_URL } as ConnectionOptions,
diff --git a/tests/integration/import-commit.test.ts b/tests/integration/import-commit.test.ts
new file mode 100644
index 00000000..ba7b9ee1
--- /dev/null
+++ b/tests/integration/import-commit.test.ts
@@ -0,0 +1,130 @@
+/**
+ * Integration test: commit runner + undo.
+ *
+ * Commits a 3-row companies "file" (skip existing / insert new / error blank)
+ * and asserts counts, per-row ledger, and the actual inserted entity. Then
+ * undoes the batch and asserts the inserted company is gone. Real test DB.
+ */
+import { beforeAll, describe, expect, it } from 'vitest';
+import { and, eq, sql } from 'drizzle-orm';
+
+import { db } from '@/lib/db';
+import { importBatches, importBatchRows } from '@/lib/db/schema/imports';
+import { companies } from '@/lib/db/schema/companies';
+import { commitBatch, undoBatch } from '@/lib/import/commit';
+import { companiesAdapter } from '@/lib/import/adapters/companies';
+import type { ConflictPolicy, RawRow } from '@/lib/import/types';
+
+// The factories module is imported dynamically in beforeAll, so the helpers
+// are declared here and assigned once it has loaded.
+let makePort: typeof import('../helpers/factories').makePort;
+let makeCompany: typeof import('../helpers/factories').makeCompany;
+let makeAuditMeta: typeof import('../helpers/factories').makeAuditMeta;
+
+beforeAll(async () => {
+  const f = await import('../helpers/factories');
+  makePort = f.makePort;
+  makeCompany = f.makeCompany;
+  makeAuditMeta = f.makeAuditMeta;
+});
+
+/**
+ * Insert a companies import batch header for the given port and conflict
+ * policy and return its id. The storage key is a placeholder: these tests call
+ * commitBatch directly with in-memory rows, so no file is read.
+ */
+async function makeBatch(portId: string, policy: ConflictPolicy): Promise<string> {
+  const [b] = await db
+    .insert(importBatches)
+    .values({
+      portId,
+      entityType: 'companies',
+      filename: 'companies.csv',
+      storageKey: 'unused-in-direct-commit',
+      mappingJson: { name: 'Name' },
+      conflictPolicy: policy,
+      createdBy: 'tester',
+    })
+    .returning({ id: importBatches.id });
+  return b!.id;
+}
+
+describe('commitBatch + undoBatch', () => {
+  // Row 1 matches the pre-existing company, row 2 is new, row 3 has no name.
+  const rows: RawRow[] = [{ Name: 'Acme Marine' }, { Name: 'Fresh Imports Ltd' }, { Name: '' }];
+
+  it('commits (skip/insert/error), records the ledger, then undoes the insert', async () => {
+    const port = await makePort();
+    await makeCompany({ portId: port.id, overrides: { name: 'Acme Marine' } });
+    const batchId = await makeBatch(port.id, 'skip-matches');
+    const ctx = { portId: port.id, meta: makeAuditMeta({ portId: port.id }) };
+
+    const counts = await commitBatch({
+      batchId,
+      adapter: companiesAdapter,
+      rawRows: rows,
+      mapping: { name: 'Name' },
+      policy: 'skip-matches',
+      ctx,
+    });
+    expect(counts).toEqual({ inserted: 1, updated: 0, skipped: 1, errored: 1 });
+
+    const [batch] = await db.select().from(importBatches).where(eq(importBatches.id, batchId));
+    expect(batch!.status).toBe('completed');
+    expect(batch!.inserted).toBe(1);
+
+    const ledger = await db
+      .select()
+      .from(importBatchRows)
+      .where(eq(importBatchRows.batchId, batchId))
+      .orderBy(importBatchRows.rowNumber);
+    expect(ledger.map((r) => r.action)).toEqual(['skipped', 'inserted', 'errored']);
+    expect(ledger[2]!.error).toBeTruthy();
+
+    // The new company really exists.
+    const fresh = await db
+      .select({ id: companies.id })
+      .from(companies)
+      .where(
+        and(eq(companies.portId, port.id), sql`lower(${companies.name}) = 'fresh imports ltd'`),
+      );
+    expect(fresh).toHaveLength(1);
+
+    // Undo removes exactly the inserted row.
+    const undo = await undoBatch(batchId, port.id);
+    expect(undo.deleted).toBe(1);
+    expect(undo.blocked).toBe(0);
+    expect(undo.blockedRows).toEqual([]);
+
+    const afterUndo = await db
+      .select({ id: companies.id })
+      .from(companies)
+      .where(
+        and(eq(companies.portId, port.id), sql`lower(${companies.name}) = 'fresh imports ltd'`),
+      );
+    expect(afterUndo).toHaveLength(0);
+
+    const [undoneBatch] = await db
+      .select({ status: importBatches.status })
+      .from(importBatches)
+      .where(eq(importBatches.id, batchId));
+    expect(undoneBatch!.status).toBe('undone');
+  });
+
+  it('update-matches overwrites the matched company', async () => {
+    const port = await makePort();
+    await makeCompany({
+      portId: port.id,
+      overrides: { name: 'Acme Marine', legalName: 'OLD' },
+    });
+    const batchId = await makeBatch(port.id, 'update-matches');
+    const ctx = { portId: port.id, meta: makeAuditMeta({ portId: port.id }) };
+
+    const counts = await commitBatch({
+      batchId,
+      adapter: companiesAdapter,
+      rawRows: [{ Name: 'Acme Marine', Legal: 'NEW Ltd' }],
+      mapping: { name: 'Name', legalName: 'Legal' },
+      policy: 'update-matches',
+      ctx,
+    });
+    // The single row must be an update and nothing else — in particular it
+    // must not also show up as an insert.
+    expect(counts).toEqual({ inserted: 0, updated: 1, skipped: 0, errored: 0 });
+
+    const [c] = await db
+      .select({ legalName: companies.legalName })
+      .from(companies)
+      .where(and(eq(companies.portId, port.id), sql`lower(${companies.name}) = 'acme marine'`));
+    expect(c!.legalName).toBe('NEW Ltd');
+  });
+});