import { Worker, type Job } from 'bullmq';
import { eq } from 'drizzle-orm';
import { env } from '@/lib/env';
import type { ConnectionOptions } from 'bullmq';
import { logger } from '@/lib/logger';
import { attachWorkerAudit } from '@/lib/queue/audit-helpers';
import { QUEUE_CONFIGS } from '@/lib/queue';
import { db } from '@/lib/db';
import { importBatches } from '@/lib/db/schema/imports';
import { getStorageBackend } from '@/lib/storage';
import { parseImportFile } from '@/lib/import/engine';
import { getAdapter } from '@/lib/import/registry';
import { commitBatch } from '@/lib/import/commit';
import type { ConflictPolicy } from '@/lib/import/types';

/**
 * Drains a readable stream into a single Buffer.
 *
 * Chunks delivered as strings (which happens when the stream has an encoding
 * set) are converted with `Buffer.from` (UTF-8) rather than being passed to
 * `Buffer.concat`, which only accepts Buffers/Uint8Arrays and would throw.
 *
 * @param stream - Readable stream to consume until its `end` event.
 * @returns A promise resolving to the concatenated contents; it rejects with
 *   whatever the stream emits on `error`.
 */
function streamToBuffer(stream: NodeJS.ReadableStream): Promise<Buffer> {
  return new Promise<Buffer>((resolve, reject) => {
    const chunks: Buffer[] = [];
    stream.on('data', (chunk: Buffer | string) => {
      chunks.push(typeof chunk === 'string' ? Buffer.from(chunk) : chunk);
    });
    stream.on('end', () => resolve(Buffer.concat(chunks)));
    stream.on('error', reject);
  });
}

/**
 * Flags the given import batch as failed and stamps its completion time.
 *
 * @param batchId - Primary key of the `importBatches` row to update.
 */
async function markBatchFailed(batchId: string): Promise<void> {
  await db
    .update(importBatches)
    .set({ status: 'failed', completedAt: new Date() })
    .where(eq(importBatches.id, batchId));
}

/**
 * Bulk-import commit worker. Enqueued by the dry-run "commit" action with
 * `{ batchId }`. Re-reads the uploaded file from storage, re-classifies +
 * applies each row under the batch's conflict policy (see commitBatch), and
 * records per-row actions + live counts. Concurrency 1 (queue config) so
 * imports never race each other's dedup lookups.
 *
 * Outcomes per job:
 * - no `batchId` in the payload, or no matching batch row: logged, job
 *   completes as a no-op;
 * - no adapter for the entity type, or the batch lacks a storage key or
 *   mapping: the batch is marked `failed`, job completes without throwing;
 * - any error while reading, parsing or committing: the batch is marked
 *   `failed` and the original error is rethrown so BullMQ records the job as
 *   failed.
 */
export const importWorker = new Worker(
  'import',
  async (job: Job) => {
    const { batchId } = (job.data ?? {}) as { batchId?: string };
    logger.info({ jobId: job.id, batchId }, 'Processing import commit');
    if (!batchId) {
      // Nothing to act on; surface the malformed payload instead of exiting silently.
      logger.warn({ jobId: job.id }, 'Import job has no batchId; skipping');
      return;
    }

    const [batch] = await db
      .select()
      .from(importBatches)
      .where(eq(importBatches.id, batchId))
      .limit(1);
    if (!batch) {
      logger.warn({ batchId }, 'Import batch vanished before commit');
      return;
    }

    const adapter = getAdapter(batch.entityType);
    if (!adapter || !batch.storageKey || !batch.mappingJson) {
      await markBatchFailed(batchId);
      logger.error({ batchId, entityType: batch.entityType }, 'Import batch not committable');
      return;
    }

    try {
      const storage = await getStorageBackend();
      const stream = await storage.get(batch.storageKey);
      const buffer = await streamToBuffer(stream);
      const parsed = await parseImportFile(batch.filename, buffer);
      await commitBatch({
        batchId,
        adapter,
        rawRows: parsed.rows,
        mapping: batch.mappingJson,
        policy: batch.conflictPolicy as ConflictPolicy,
        ctx: {
          portId: batch.portId,
          meta: {
            userId: batch.createdBy,
            portId: batch.portId,
            ipAddress: 'import-worker',
            userAgent: 'import-worker',
          },
        },
      });
    } catch (err) {
      // Best-effort status update: if it fails too, log that separately so the
      // original commit error is still the one logged and rethrown below.
      try {
        await markBatchFailed(batchId);
      } catch (markErr) {
        logger.error({ batchId, err: markErr }, 'Failed to mark import batch as failed');
      }
      logger.error({ batchId, err }, 'Import commit failed');
      throw err;
    }
  },
  {
    connection: { url: env.REDIS_URL } as ConnectionOptions,
    concurrency: QUEUE_CONFIGS.import.concurrency,
  },
);

// Worker-level failure log, in addition to the per-batch logging inside the processor.
importWorker.on('failed', (job, err) => {
  logger.error({ jobId: job?.id, jobName: job?.name, err }, 'Import job failed');
});

attachWorkerAudit(importWorker, 'import');