import { Worker, type Job } from 'bullmq'; import { eq } from 'drizzle-orm'; import { env } from '@/lib/env'; import type { ConnectionOptions } from 'bullmq'; import { logger } from '@/lib/logger'; import { attachWorkerAudit } from '@/lib/queue/audit-helpers'; import { QUEUE_CONFIGS } from '@/lib/queue'; import { db } from '@/lib/db'; import { importBatches } from '@/lib/db/schema/imports'; import { getStorageBackend } from '@/lib/storage'; import { parseImportFile } from '@/lib/import/engine'; import { getAdapter } from '@/lib/import/registry'; import { commitBatch } from '@/lib/import/commit'; import type { ConflictPolicy } from '@/lib/import/types'; function streamToBuffer(stream: NodeJS.ReadableStream): Promise { return new Promise((resolve, reject) => { const chunks: Buffer[] = []; stream.on('data', (c: Buffer) => chunks.push(c)); stream.on('end', () => resolve(Buffer.concat(chunks))); stream.on('error', reject); }); } /** * Bulk-import commit worker. Enqueued by the dry-run "commit" action with * `{ batchId }`. Re-reads the uploaded file from storage, re-classifies + * applies each row under the batch's conflict policy (see commitBatch), and * records per-row actions + live counts. Concurrency 1 (queue config) so * imports never race each other's dedup lookups. */ export const importWorker = new Worker( 'import', async (job: Job) => { const { batchId } = (job.data ?? {}) as { batchId?: string }; logger.info({ jobId: job.id, batchId }, 'Processing import commit'); if (!batchId) return; const [batch] = await db .select() .from(importBatches) .where(eq(importBatches.id, batchId)) .limit(1); if (!batch) { logger.warn({ batchId }, 'Import batch vanished before commit'); return; } // Idempotency guard (audit M27): only a batch still awaiting commit may be // committed. A re-enqueue (future commit endpoint, operator re-trigger, or // any stray duplicate job) of an already-committing/completed/failed/undone // batch must NOT re-run — re-processing appends a second full set of // import_batch_rows, so undo later sees both run-1 inserts and run-2 skips // and the header counts no longer reconcile with the ledger undo trusts. // commitBatch itself also gates the status transition with a conditional // UPDATE (defense in depth against a TOCTOU race with another worker), but // this early return avoids the wasted file re-read + parse in the common // case. NOTE for the future authorization boundary (audit L35): when a // commit/dry-run API route lands it MUST re-derive portId from the session // and assert batch.portId === session.portId before enqueuing, and gate on // an `import` permission — this worker trusts batch.portId wholesale. if (batch.status !== 'dry_run' && batch.status !== 'uploaded') { logger.warn( { batchId, status: batch.status }, 'Import batch already past the commit gate — skipping re-run', ); return; } const adapter = getAdapter(batch.entityType); if (!adapter || !batch.storageKey || !batch.mappingJson) { await db .update(importBatches) .set({ status: 'failed', completedAt: new Date() }) .where(eq(importBatches.id, batchId)); logger.error({ batchId, entityType: batch.entityType }, 'Import batch not committable'); return; } try { const stream = await (await getStorageBackend()).get(batch.storageKey); const buffer = await streamToBuffer(stream); const parsed = await parseImportFile(batch.filename, buffer); await commitBatch({ batchId, adapter, rawRows: parsed.rows, mapping: batch.mappingJson, policy: batch.conflictPolicy as ConflictPolicy, ctx: { // Trust boundary (audit L35): portId is taken from the persisted // batch and trusted. Safe only because batches are created // server-side with no client-supplied portId today. The commit/ // dry-run API route, when it lands, MUST re-derive portId from the // session, assert batch.portId === session.portId, and gate on an // `import` permission before enqueuing this job. See ImportCtx.portId. portId: batch.portId, meta: { userId: batch.createdBy, portId: batch.portId, ipAddress: 'import-worker', userAgent: 'import-worker', }, }, }); } catch (err) { await db .update(importBatches) .set({ status: 'failed', completedAt: new Date() }) .where(eq(importBatches.id, batchId)); logger.error({ batchId, err }, 'Import commit failed'); throw err; } }, { connection: { url: env.REDIS_URL } as ConnectionOptions, concurrency: QUEUE_CONFIGS.import.concurrency, }, ); importWorker.on('failed', (job, err) => { logger.error({ jobId: job?.id, jobName: job?.name, err }, 'Import job failed'); }); attachWorkerAudit(importWorker, 'import');