feat(import): commit runner + undo + wired BullMQ worker
Third importer increment — the write path, fully testable without UI. - commit.ts: commitBatch streams classified rows, applies insert/update per the conflict policy via the adapter (each row in its own try/catch so valid rows still land), records every action in import_batch_rows, and keeps live counts on the batch header. undoBatch hard-deletes a batch's inserted rows (port-scoped); a delete blocked by a dependent FK is reported, not forced, and the batch flips to `undone` only when every inserted row was removed. - import worker: replaced the no-op placeholder with the real processor — loads the batch, re-reads the uploaded file from storage, parses, and runs commitBatch under the batch's mapping + policy. Marks the batch failed on error. Concurrency 1 so imports don't race each other's dedup lookups. Tests: commit (skip/insert/error counts + per-row ledger + real inserted entity), undo (removes exactly the inserted row, flips status), and update-matches overwrite. 2 passing. Engine is now functional end-to-end at the service layer: parse → map → dry-run → commit → undo. Remaining: 4 FK adapters, API routes + permission, wizard UI + history. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
187
src/lib/import/commit.ts
Normal file
187
src/lib/import/commit.ts
Normal file
@@ -0,0 +1,187 @@
|
||||
/**
|
||||
* Commit + undo for the importer.
|
||||
*
|
||||
* commitBatch streams classified rows, applies insert/update per the conflict
|
||||
* policy via the adapter (each row in its own try/catch so valid rows land),
|
||||
* records every action in import_batch_rows, and keeps live counts on the
|
||||
* batch header.
|
||||
*
|
||||
* undoBatch hard-deletes the rows a batch *inserted* (reverse order), port-
|
||||
* scoped, leaning on DB referential integrity as the guard: a row that now
|
||||
* has dependents fails its delete and is reported, not force-removed.
|
||||
*/
|
||||
import { and, eq } from 'drizzle-orm';
|
||||
|
||||
import { db } from '@/lib/db';
|
||||
import { importBatches, importBatchRows } from '@/lib/db/schema/imports';
|
||||
import { companies } from '@/lib/db/schema/companies';
|
||||
import { clients } from '@/lib/db/schema/clients';
|
||||
import { berths } from '@/lib/db/schema/berths';
|
||||
|
||||
import { classifyRow } from './classify';
|
||||
import type { ConflictPolicy, ImportAdapter, ImportCtx, RawRow } from './types';
|
||||
|
||||
/** Adapter keys whose inserted rows the importer can hard-delete on undo.
|
||||
* Extended as the FK adapters land. */
|
||||
const UNDOABLE = new Set(['companies', 'clients', 'berths']);
|
||||
|
||||
/** Hard-delete one imported entity, port-scoped. Throws on FK violation
|
||||
* (a dependent now exists) — the caller reports that row as blocked. */
|
||||
async function deleteEntity(entityType: string, entityId: string, portId: string): Promise<void> {
|
||||
switch (entityType) {
|
||||
case 'companies':
|
||||
await db
|
||||
.delete(companies)
|
||||
.where(and(eq(companies.id, entityId), eq(companies.portId, portId)));
|
||||
return;
|
||||
case 'clients':
|
||||
await db.delete(clients).where(and(eq(clients.id, entityId), eq(clients.portId, portId)));
|
||||
return;
|
||||
case 'berths':
|
||||
await db.delete(berths).where(and(eq(berths.id, entityId), eq(berths.portId, portId)));
|
||||
return;
|
||||
default:
|
||||
throw new Error(`Undo not supported for entity "${entityType}"`);
|
||||
}
|
||||
}
|
||||
|
||||
async function recordRow(
|
||||
batchId: string,
|
||||
rowNumber: number,
|
||||
action: 'inserted' | 'updated' | 'skipped' | 'errored',
|
||||
entityId: string | null,
|
||||
error: string | null,
|
||||
): Promise<void> {
|
||||
await db.insert(importBatchRows).values({ batchId, rowNumber, action, entityId, error });
|
||||
}
|
||||
|
||||
export interface CommitResult {
|
||||
inserted: number;
|
||||
updated: number;
|
||||
skipped: number;
|
||||
errored: number;
|
||||
}
|
||||
|
||||
export async function commitBatch(params: {
|
||||
batchId: string;
|
||||
adapter: ImportAdapter;
|
||||
rawRows: RawRow[];
|
||||
mapping: Record<string, string>;
|
||||
policy: ConflictPolicy;
|
||||
ctx: ImportCtx;
|
||||
}): Promise<CommitResult> {
|
||||
const { batchId, adapter, rawRows, mapping, policy, ctx } = params;
|
||||
const counts: CommitResult = { inserted: 0, updated: 0, skipped: 0, errored: 0 };
|
||||
|
||||
await db
|
||||
.update(importBatches)
|
||||
.set({ status: 'committing', totalRows: rawRows.length })
|
||||
.where(eq(importBatches.id, batchId));
|
||||
|
||||
for (let i = 0; i < rawRows.length; i++) {
|
||||
const rowNumber = i + 1;
|
||||
try {
|
||||
const c = await classifyRow(adapter, rawRows[i]!, mapping, rowNumber, policy, ctx);
|
||||
if (c.outcome === 'error') {
|
||||
counts.errored += 1;
|
||||
await recordRow(
|
||||
batchId,
|
||||
rowNumber,
|
||||
'errored',
|
||||
null,
|
||||
(c.errors ?? []).map((e) => `${e.field}: ${e.message}`).join('; ') || 'Invalid row',
|
||||
);
|
||||
} else if (c.outcome === 'skip') {
|
||||
counts.skipped += 1;
|
||||
await recordRow(batchId, rowNumber, 'skipped', c.existingId ?? null, null);
|
||||
} else if (c.outcome === 'update' && adapter.update) {
|
||||
await adapter.update(c.existingId!, c.mapped!, c.resolved ?? {}, ctx);
|
||||
counts.updated += 1;
|
||||
await recordRow(batchId, rowNumber, 'updated', c.existingId!, null);
|
||||
} else {
|
||||
// insert
|
||||
const { id } = await adapter.insert(c.mapped!, c.resolved ?? {}, ctx);
|
||||
counts.inserted += 1;
|
||||
await recordRow(batchId, rowNumber, 'inserted', id, null);
|
||||
}
|
||||
} catch (e) {
|
||||
counts.errored += 1;
|
||||
await recordRow(batchId, rowNumber, 'errored', null, (e as Error).message);
|
||||
}
|
||||
|
||||
if (rowNumber % 25 === 0) {
|
||||
await db
|
||||
.update(importBatches)
|
||||
.set({ ...counts })
|
||||
.where(eq(importBatches.id, batchId));
|
||||
}
|
||||
}
|
||||
|
||||
await db
|
||||
.update(importBatches)
|
||||
.set({ ...counts, status: 'completed', completedAt: new Date() })
|
||||
.where(eq(importBatches.id, batchId));
|
||||
|
||||
return counts;
|
||||
}
|
||||
|
||||
export interface UndoResult {
|
||||
deleted: number;
|
||||
blocked: number;
|
||||
blockedRows: number[];
|
||||
}
|
||||
|
||||
/**
|
||||
* Hard-delete the rows a batch inserted (port-scoped). A delete blocked by a
|
||||
* dependent (FK) is reported, not forced. Marks the batch `undone` when every
|
||||
* inserted row was removed.
|
||||
*/
|
||||
export async function undoBatch(batchId: string, portId: string): Promise<UndoResult> {
|
||||
const [batch] = await db
|
||||
.select({
|
||||
id: importBatches.id,
|
||||
entityType: importBatches.entityType,
|
||||
status: importBatches.status,
|
||||
})
|
||||
.from(importBatches)
|
||||
.where(and(eq(importBatches.id, batchId), eq(importBatches.portId, portId)))
|
||||
.limit(1);
|
||||
if (!batch) throw new Error('Import batch not found');
|
||||
|
||||
if (!UNDOABLE.has(batch.entityType)) {
|
||||
throw new Error(`Undo not supported for entity "${batch.entityType}"`);
|
||||
}
|
||||
|
||||
const inserted = await db
|
||||
.select({
|
||||
id: importBatchRows.id,
|
||||
rowNumber: importBatchRows.rowNumber,
|
||||
entityId: importBatchRows.entityId,
|
||||
})
|
||||
.from(importBatchRows)
|
||||
.where(and(eq(importBatchRows.batchId, batchId), eq(importBatchRows.action, 'inserted')));
|
||||
|
||||
const result: UndoResult = { deleted: 0, blocked: 0, blockedRows: [] };
|
||||
for (const row of inserted) {
|
||||
if (!row.entityId) continue;
|
||||
try {
|
||||
await deleteEntity(batch.entityType, row.entityId, portId);
|
||||
// Flip the ledger row so a re-undo is a no-op.
|
||||
await db
|
||||
.update(importBatchRows)
|
||||
.set({ action: 'skipped', error: 'undone' })
|
||||
.where(eq(importBatchRows.id, row.id));
|
||||
result.deleted += 1;
|
||||
} catch {
|
||||
result.blocked += 1;
|
||||
result.blockedRows.push(row.rowNumber);
|
||||
}
|
||||
}
|
||||
|
||||
await db
|
||||
.update(importBatches)
|
||||
.set({ status: result.blocked === 0 ? 'undone' : 'completed' })
|
||||
.where(eq(importBatches.id, batchId));
|
||||
|
||||
return result;
|
||||
}
|
||||
@@ -1,42 +1,91 @@
|
||||
import { Worker, type Job } from 'bullmq';
|
||||
import { eq } from 'drizzle-orm';
|
||||
|
||||
import { env } from '@/lib/env';
|
||||
|
||||
import type { ConnectionOptions } from 'bullmq';
|
||||
import { logger } from '@/lib/logger';
|
||||
import { attachWorkerAudit } from '@/lib/queue/audit-helpers';
|
||||
import { QUEUE_CONFIGS } from '@/lib/queue';
|
||||
import { db } from '@/lib/db';
|
||||
import { importBatches } from '@/lib/db/schema/imports';
|
||||
import { getStorageBackend } from '@/lib/storage';
|
||||
import { parseImportFile } from '@/lib/import/engine';
|
||||
import { getAdapter } from '@/lib/import/registry';
|
||||
import { commitBatch } from '@/lib/import/commit';
|
||||
import type { ConflictPolicy } from '@/lib/import/types';
|
||||
|
||||
function streamToBuffer(stream: NodeJS.ReadableStream): Promise<Buffer> {
|
||||
return new Promise((resolve, reject) => {
|
||||
const chunks: Buffer[] = [];
|
||||
stream.on('data', (c: Buffer) => chunks.push(c));
|
||||
stream.on('end', () => resolve(Buffer.concat(chunks)));
|
||||
stream.on('error', reject);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Bulk-import worker - DEFERRED FEATURE (placeholder).
|
||||
*
|
||||
* Status: registered with BullMQ so any future enqueue site lands on a
|
||||
* real worker instance instead of disappearing into an unbound queue.
|
||||
* No callers currently dispatch to this worker - the body is intentionally
|
||||
* a no-op that logs the dispatch for forensics.
|
||||
*
|
||||
* Why deferred (vs implemented inline):
|
||||
* - CSV/Excel import is a real product feature, not a refactor. Done
|
||||
* properly it needs: per-entity schema mapping (clients / berths /
|
||||
* interests / companies / yachts), zod-level row validation, per-row
|
||||
* error rollup with line-numbered diagnostics, dry-run preview,
|
||||
* progress reporting, dedupe-on-conflict policy, admin upload UI
|
||||
* with column-mapping UX. Building it speculatively without a
|
||||
* customer in the room would lock in a UX that may not match what
|
||||
* real importers want.
|
||||
* - When the trigger comes (a customer needs to bulk-load a season
|
||||
* roster or migrate from another CRM), build it from product spec
|
||||
* not from this placeholder.
|
||||
*
|
||||
* What's required to ship: papaparse (CSV) + a thin schema-per-entity
|
||||
* mapping layer, plus an admin /admin/import page with a per-entity
|
||||
* picker + file dropzone. The queue registration here stays as-is.
|
||||
* Bulk-import commit worker. Enqueued by the dry-run "commit" action with
|
||||
* `{ batchId }`. Re-reads the uploaded file from storage, re-classifies +
|
||||
* applies each row under the batch's conflict policy (see commitBatch), and
|
||||
* records per-row actions + live counts. Concurrency 1 (queue config) so
|
||||
* imports never race each other's dedup lookups.
|
||||
*/
|
||||
export const importWorker = new Worker(
|
||||
'import',
|
||||
async (job: Job) => {
|
||||
logger.info({ jobId: job.id, jobName: job.name }, 'Processing import job');
|
||||
// Deferred - no callers enqueue this. If a job lands, we log + swallow
|
||||
// so a future test enqueue doesn't trip the failed-job alert.
|
||||
const { batchId } = (job.data ?? {}) as { batchId?: string };
|
||||
logger.info({ jobId: job.id, batchId }, 'Processing import commit');
|
||||
if (!batchId) return;
|
||||
|
||||
const [batch] = await db
|
||||
.select()
|
||||
.from(importBatches)
|
||||
.where(eq(importBatches.id, batchId))
|
||||
.limit(1);
|
||||
if (!batch) {
|
||||
logger.warn({ batchId }, 'Import batch vanished before commit');
|
||||
return;
|
||||
}
|
||||
|
||||
const adapter = getAdapter(batch.entityType);
|
||||
if (!adapter || !batch.storageKey || !batch.mappingJson) {
|
||||
await db
|
||||
.update(importBatches)
|
||||
.set({ status: 'failed', completedAt: new Date() })
|
||||
.where(eq(importBatches.id, batchId));
|
||||
logger.error({ batchId, entityType: batch.entityType }, 'Import batch not committable');
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
const stream = await (await getStorageBackend()).get(batch.storageKey);
|
||||
const buffer = await streamToBuffer(stream);
|
||||
const parsed = await parseImportFile(batch.filename, buffer);
|
||||
await commitBatch({
|
||||
batchId,
|
||||
adapter,
|
||||
rawRows: parsed.rows,
|
||||
mapping: batch.mappingJson,
|
||||
policy: batch.conflictPolicy as ConflictPolicy,
|
||||
ctx: {
|
||||
portId: batch.portId,
|
||||
meta: {
|
||||
userId: batch.createdBy,
|
||||
portId: batch.portId,
|
||||
ipAddress: 'import-worker',
|
||||
userAgent: 'import-worker',
|
||||
},
|
||||
},
|
||||
});
|
||||
} catch (err) {
|
||||
await db
|
||||
.update(importBatches)
|
||||
.set({ status: 'failed', completedAt: new Date() })
|
||||
.where(eq(importBatches.id, batchId));
|
||||
logger.error({ batchId, err }, 'Import commit failed');
|
||||
throw err;
|
||||
}
|
||||
},
|
||||
{
|
||||
connection: { url: env.REDIS_URL } as ConnectionOptions,
|
||||
|
||||
Reference in New Issue
Block a user