import { Worker, type Job } from 'bullmq';
import { env } from '@/lib/env';
import type { ConnectionOptions } from 'bullmq';
import { logger } from '@/lib/logger';
import { attachWorkerAudit } from '@/lib/queue/audit-helpers';
import { QUEUE_CONFIGS } from '@/lib/queue';

/** Shape the `documenso-void` handler expects to find in `job.data`. */
interface DocumensoVoidPayload {
  documentId: string;
  documensoId: string;
  portId: string;
}

/** Signature of a per-job-name handler on the documents queue. */
type DocumentsJobHandler = (job: Job) => Promise<void>;

/**
 * Handles `signature-poll` jobs by running the Documenso poll processor.
 * The processor module is imported lazily, on first use, rather than at
 * worker start-up.
 */
async function runSignaturePoll(): Promise<void> {
  const { processDocumensoPoll: pollDocumenso } = await import(
    '@/jobs/processors/documenso-poll'
  );
  await pollDocumenso();
}

/**
 * Handles `documenso-void` jobs: asynchronous cleanup of a Documenso envelope.
 *
 * Producers: smart-archive (when the operator opts to void in-flight
 * envelopes during client archive). BullMQ retries with exponential backoff
 * per QUEUE_CONFIGS; permanently-failed jobs land in the DLQ via the
 * failed-job listener.
 *
 * A job whose payload carries no `documensoId` is logged and skipped, so it
 * completes without calling Documenso.
 *
 * @param job - Job whose `data` is read as a {@link DocumensoVoidPayload}.
 */
async function runDocumensoVoid(job: Job): Promise<void> {
  const payload = job.data as DocumensoVoidPayload;
  const { documentId, documensoId, portId } = payload;

  if (!documensoId) {
    logger.warn({ documentId }, 'documenso-void: no documensoId, skipping');
    return;
  }

  const { voidDocument: voidEnvelope } = await import('@/lib/services/documenso-client');
  await voidEnvelope(documensoId, portId);
  logger.info({ documentId, documensoId, portId }, 'Documenso envelope voided');
}

/**
 * Job-name to handler lookup. A Map (rather than a plain object) keeps
 * names such as `constructor` from resolving to inherited properties.
 */
const documentsJobHandlers = new Map<string, DocumentsJobHandler>([
  ['signature-poll', () => runSignaturePoll()],
  ['documenso-void', (job) => runDocumensoVoid(job)],
]);

/**
 * Worker for the `documents` queue. Each job is logged and then routed by
 * `job.name`; unrecognised names are logged as a warning and the job still
 * completes normally.
 */
export const documentsWorker = new Worker(
  'documents',
  async (job: Job) => {
    logger.info({ jobId: job.id, jobName: job.name }, 'Processing documents job');

    const handler = documentsJobHandlers.get(job.name);
    if (handler === undefined) {
      logger.warn({ jobName: job.name }, 'Unknown documents job');
      return;
    }

    await handler(job);
  },
  {
    connection: { url: env.REDIS_URL } as ConnectionOptions,
    concurrency: QUEUE_CONFIGS.documents.concurrency,
  },
);

// Log every failed attempt; `job` may be undefined, hence the optional chaining.
documentsWorker.on('failed', (job, err) => {
  logger.error({ jobId: job?.id, jobName: job?.name, err }, 'Documents job failed');
});

attachWorkerAudit(documentsWorker, 'documents');