import { Worker, type Job } from 'bullmq';
import { env } from '@/lib/env';
import type { ConnectionOptions } from 'bullmq';
import { logger } from '@/lib/logger';
import { attachWorkerAudit } from '@/lib/queue/audit-helpers';
import { QUEUE_CONFIGS } from '@/lib/queue';

/**
 * BullMQ worker for the `documents` queue.
 *
 * Dispatches on `job.name`:
 * - `signature-poll`  → runs `processDocumensoPoll()`.
 * - `documenso-void`  → voids a Documenso envelope via `voidDocument()`.
 * - anything else     → logs a warning and completes without doing work.
 *
 * Processors are loaded with dynamic `import()` inside each case, so a
 * module is only loaded when a job of that kind is actually processed.
 */
export const documentsWorker = new Worker(
  'documents',
  async (job: Job) => {
    logger.info({ jobId: job.id, jobName: job.name }, 'Processing documents job');

    switch (job.name) {
      case 'signature-poll': {
        const { processDocumensoPoll } = await import('@/jobs/processors/documenso-poll');
        await processDocumensoPoll();
        break;
      }

      case 'documenso-void': {
        // Async cleanup of a Documenso envelope. Producers: smart-archive
        // (when the operator opts to void in-flight envelopes during
        // client archive). BullMQ retries with exponential backoff per
        // QUEUE_CONFIGS; permanently-failed jobs land in the DLQ via
        // the failed-job listener.
        const { documentId, documensoId, portId } = job.data as {
          documentId: string;
          documensoId: string;
          portId: string;
        };

        // Nothing to void without an envelope id: finish the job
        // successfully instead of throwing, so it is not retried.
        if (!documensoId) {
          logger.warn({ documentId }, 'documenso-void: no documensoId, skipping');
          return;
        }

        const { voidDocument } = await import('@/lib/services/documenso-client');
        await voidDocument(documensoId, portId);
        logger.info({ documentId, documensoId, portId }, 'Documenso envelope voided');
        break;
      }

      default:
        logger.warn({ jobName: job.name }, 'Unknown documents job');
    }
  },
  {
    connection: { url: env.REDIS_URL } as ConnectionOptions,
    concurrency: QUEUE_CONFIGS.documents.concurrency,
  },
);

documentsWorker.on('failed', async (job, err) => {
  logger.error({ jobId: job?.id, jobName: job?.name, err }, 'Documents job failed');

  // Final-attempt failure on documenso-void → notify all super admins
  // so they can void the envelope manually in Documenso. Without this
  // alert hook, a persistent 401/403 from Documenso retries until
  // BullMQ exhausts attempts and the failure disappears into the
  // audit log unnoticed.
  if (job?.name === 'documenso-void' && job.attemptsMade >= (job.opts.attempts ?? 1)) {
    try {
      const { documentId, documensoId, portId } = (job.data ?? {}) as {
        documentId?: string;
        documensoId?: string;
        portId?: string;
      };

      // createNotification requires a portId, and the message needs the
      // document ids. Validate the payload up front — before loading any
      // modules or querying the database — and leave a trace when the
      // alert has to be skipped.
      if (!documentId || !documensoId || !portId) {
        logger.warn(
          { jobId: job.id, documentId, documensoId, portId },
          'documenso-void failed permanently but job data is incomplete; super-admin alert skipped',
        );
        return;
      }

      const { db } = await import('@/lib/db');
      const { userProfiles } = await import('@/lib/db/schema/users');
      const { createNotification } = await import('@/lib/services/notifications.service');
      const { eq, and } = await import('drizzle-orm');

      const superAdmins = await db
        .select({ userId: userProfiles.userId })
        .from(userProfiles)
        .where(and(eq(userProfiles.isSuperAdmin, true), eq(userProfiles.isActive, true)));

      // Notify every admin concurrently, but wait for all outcomes. A bare
      // `void createNotification(...)` would let a rejection escape this
      // try/catch as an unhandled promise rejection. The async wrapper
      // also turns a synchronous throw into a rejection, so one bad call
      // cannot prevent the remaining admins from being notified.
      // NOTE(review): every admin shares the same dedupeKey — confirm the
      // notification service scopes dedupe per user.
      const outcomes = await Promise.allSettled(
        superAdmins.map(async (admin) => {
          await createNotification({
            portId,
            userId: admin.userId,
            type: 'system_alert',
            title: 'Documenso void failed',
            description: `Document ${documentId.slice(0, 8)}… could not be voided in Documenso after ${job.attemptsMade} attempts. Void manually in Documenso if still active.`,
            link: '/admin/documents',
            entityType: 'document',
            entityId: documentId,
            dedupeKey: `doc:void_failed:${documentId}`,
            cooldownMs: 0,
          });
        }),
      );

      outcomes.forEach((outcome, index) => {
        if (outcome.status === 'rejected') {
          logger.error(
            { notifyErr: outcome.reason, userId: superAdmins[index]?.userId, documentId },
            'Failed to alert super-admins of documenso-void DLQ',
          );
        }
      });
    } catch (notifyErr) {
      logger.error({ notifyErr }, 'Failed to alert super-admins of documenso-void DLQ');
    }
  }
});

attachWorkerAudit(documentsWorker, 'documents');