M25 DB unique-index backstop deferred: needs a migration (column + backfill + insert-stamp trigger + dedup) — tracked as a follow-up. The classify in-file dedup (preview accuracy) ships now. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
128 lines
5.0 KiB
TypeScript
128 lines
5.0 KiB
TypeScript
import { Worker, type Job } from 'bullmq';
|
|
import { eq } from 'drizzle-orm';
|
|
|
|
import { env } from '@/lib/env';
|
|
|
|
import type { ConnectionOptions } from 'bullmq';
|
|
import { logger } from '@/lib/logger';
|
|
import { attachWorkerAudit } from '@/lib/queue/audit-helpers';
|
|
import { QUEUE_CONFIGS } from '@/lib/queue';
|
|
import { db } from '@/lib/db';
|
|
import { importBatches } from '@/lib/db/schema/imports';
|
|
import { getStorageBackend } from '@/lib/storage';
|
|
import { parseImportFile } from '@/lib/import/engine';
|
|
import { getAdapter } from '@/lib/import/registry';
|
|
import { commitBatch } from '@/lib/import/commit';
|
|
import type { ConflictPolicy } from '@/lib/import/types';
|
|
|
|
/**
 * Drains a readable stream into a single in-memory Buffer.
 *
 * Chunks are normalised to Buffers before being collected: a stream that has
 * an encoding set (or runs in object mode) emits strings rather than Buffers,
 * and handing those to `Buffer.concat` would throw inside the `end` listener —
 * outside the promise executor — so the failure would surface as an uncaught
 * exception instead of a rejection.
 *
 * @param stream - Readable stream to consume until it ends.
 * @returns A promise resolving with the concatenated contents (an empty Buffer
 *   for an empty stream). Rejects with the stream's error if it emits `error`.
 */
function streamToBuffer(stream: NodeJS.ReadableStream): Promise<Buffer> {
  return new Promise<Buffer>((resolve, reject) => {
    const chunks: Buffer[] = [];
    stream.on('data', (chunk: Buffer | Uint8Array | string) => {
      if (typeof chunk === 'string') {
        // NOTE(review): string chunks are re-encoded as UTF-8; this assumes the
        // stream's encoding (if any) was UTF-8 — confirm against the storage backend.
        chunks.push(Buffer.from(chunk, 'utf8'));
      } else if (Buffer.isBuffer(chunk)) {
        // Already a Buffer: keep the reference, no copy needed.
        chunks.push(chunk);
      } else {
        chunks.push(Buffer.from(chunk));
      }
    });
    stream.on('end', () => resolve(Buffer.concat(chunks)));
    stream.on('error', reject);
  });
}
|
|
|
|
/**
 * Flags an import batch as failed and stamps its completion time.
 *
 * @param batchId - Primary key of the `importBatches` row to update.
 */
async function markImportBatchFailed(batchId: string): Promise<void> {
  await db
    .update(importBatches)
    .set({ status: 'failed', completedAt: new Date() })
    .where(eq(importBatches.id, batchId));
}

/**
 * Bulk-import commit worker. Enqueued by the dry-run "commit" action with
 * `{ batchId }`. Re-reads the uploaded file from storage, re-classifies +
 * applies each row under the batch's conflict policy (see commitBatch), and
 * records per-row actions + live counts. Concurrency 1 (queue config) so
 * imports never race each other's dedup lookups.
 *
 * Processor outcomes:
 * - no `batchId` in the payload, batch row missing, or batch already past the
 *   commit gate: logs and returns without touching the batch;
 * - no adapter / storage key / mapping: marks the batch `failed` and returns;
 * - any error while reading, parsing or committing: logs it, marks the batch
 *   `failed` (best effort) and rethrows the original error.
 */
export const importWorker = new Worker(
  'import',
  async (job: Job) => {
    const { batchId } = (job.data ?? {}) as { batchId?: string };
    logger.info({ jobId: job.id, batchId }, 'Processing import commit');
    if (!batchId) return;

    const [batch] = await db
      .select()
      .from(importBatches)
      .where(eq(importBatches.id, batchId))
      .limit(1);
    if (!batch) {
      logger.warn({ batchId }, 'Import batch vanished before commit');
      return;
    }

    // Idempotency guard (audit M27): only a batch still awaiting commit may be
    // committed. A re-enqueue (future commit endpoint, operator re-trigger, or
    // any stray duplicate job) of an already-committing/completed/failed/undone
    // batch must NOT re-run — re-processing appends a second full set of
    // import_batch_rows, so undo later sees both run-1 inserts and run-2 skips
    // and the header counts no longer reconcile with the ledger undo trusts.
    // commitBatch itself also gates the status transition with a conditional
    // UPDATE (defense in depth against a TOCTOU race with another worker), but
    // this early return avoids the wasted file re-read + parse in the common
    // case. NOTE for the future authorization boundary (audit L35): when a
    // commit/dry-run API route lands it MUST re-derive portId from the session
    // and assert batch.portId === session.portId before enqueuing, and gate on
    // an `import` permission — this worker trusts batch.portId wholesale.
    if (batch.status !== 'dry_run' && batch.status !== 'uploaded') {
      logger.warn(
        { batchId, status: batch.status },
        'Import batch already past the commit gate — skipping re-run',
      );
      return;
    }

    const adapter = getAdapter(batch.entityType);
    if (!adapter || !batch.storageKey || !batch.mappingJson) {
      await markImportBatchFailed(batchId);
      logger.error({ batchId, entityType: batch.entityType }, 'Import batch not committable');
      return;
    }

    try {
      const stream = await (await getStorageBackend()).get(batch.storageKey);
      const buffer = await streamToBuffer(stream);
      const parsed = await parseImportFile(batch.filename, buffer);
      await commitBatch({
        batchId,
        adapter,
        rawRows: parsed.rows,
        mapping: batch.mappingJson,
        policy: batch.conflictPolicy as ConflictPolicy,
        ctx: {
          // Trust boundary (audit L35): portId is taken from the persisted
          // batch and trusted. Safe only because batches are created
          // server-side with no client-supplied portId today. The commit/
          // dry-run API route, when it lands, MUST re-derive portId from the
          // session, assert batch.portId === session.portId, and gate on an
          // `import` permission before enqueuing this job. See ImportCtx.portId.
          portId: batch.portId,
          meta: {
            userId: batch.createdBy,
            portId: batch.portId,
            ipAddress: 'import-worker',
            userAgent: 'import-worker',
          },
        },
      });
    } catch (err) {
      // Log the root cause first so it is recorded even if the status update
      // below fails as well (e.g. the database is what broke the commit).
      logger.error({ batchId, err }, 'Import commit failed');
      try {
        await markImportBatchFailed(batchId);
      } catch (markErr) {
        // Best effort only: a failure here must not replace the original
        // error that is rethrown to the queue.
        logger.error({ batchId, err: markErr }, 'Could not mark import batch as failed');
      }
      throw err;
    }
  },
  {
    connection: { url: env.REDIS_URL } as ConnectionOptions,
    concurrency: QUEUE_CONFIGS.import.concurrency,
  },
);
|
|
|
|
// Log every failed import job with its id and name alongside the error.
importWorker.on('failed', (failedJob, error) => {
  const logContext = {
    jobId: failedJob?.id,
    jobName: failedJob?.name,
    err: error,
  };
  logger.error(logContext, 'Import job failed');
});

// Attach the shared worker audit hooks under the 'import' label.
attachWorkerAudit(importWorker, 'import');
|