/**
 * Shared BullMQ → audit log glue.
 *
 * Each worker calls `attachWorkerAudit(worker, workerName)` after
 * defining itself. We listen on the worker's BullMQ events and emit:
 *
 *   - `job_failed` (severity error, source 'job') for every BullMQ
 *     `failed` event, regardless of attempt number. (Producers know if
 *     this was a final failure via the existing per-worker logic.)
 *   - `cron_run` (severity info, source 'cron') for every successful
 *     completion of a job whose name matches a recurring scheduler
 *     entry — gives operators a heartbeat row per cron tick.
 *
 * Audit writes are fire-and-forget and never throw: a failed write is
 * logged at warn level and otherwise ignored.
 */
import type { Job, Worker } from 'bullmq';

import { createAuditLog } from '@/lib/audit';
import { logger } from '@/lib/logger';

/**
 * Names that match recurring jobs registered in `scheduler.ts`.
 * Keep in sync — a name that is missing or misspelled here means no
 * `cron_run` heartbeat row is written for that job's completions
 * (`job_failed` rows are unaffected); no other functional impact.
 */
const RECURRING_JOB_NAMES: ReadonlySet<string> = new Set([
  'signature-poll',
  'reminder-check',
  'reminder-overdue-check',
  'calendar-sync',
  'invoice-overdue-check',
  'tenure-expiry-check',
  'currency-refresh',
  'database-backup',
  'backup-cleanup',
  'session-cleanup',
  'report-scheduler',
  'notification-digest',
  'temp-file-cleanup',
  'form-expiry-check',
  'alerts-evaluate',
  'analytics-refresh',
  'gdpr-export-cleanup',
  'ai-usage-retention',
  'error-events-retention',
  'website-submissions-retention',
]);

/**
 * Runs an audit write without letting its failure escape the caller.
 *
 * Both a synchronous throw from `write` and a rejection of the value it
 * returns are caught and reported through `logger.warn`, so an event
 * listener using this helper never throws and never leaves an unhandled
 * promise rejection behind.
 *
 * @param workerName - Label included in the warning for context.
 * @param write - Thunk that starts the audit write; its result may be a
 *   promise or a plain value.
 */
function writeAuditSafely(workerName: string, write: () => unknown): void {
  const reportFailure = (auditErr: unknown): void => {
    logger.warn({ workerName, err: auditErr }, 'Worker audit log write failed');
  };

  try {
    // Promise.resolve() normalises a non-promise return value so the
    // same rejection handler applies either way.
    void Promise.resolve(write()).catch(reportFailure);
  } catch (auditErr: unknown) {
    reportFailure(auditErr);
  }
}

/**
 * Wires a BullMQ worker's lifecycle events to the audit log.
 *
 * Registers three listeners on `worker`:
 *   - `failed`: writes a `job_failed` audit row for every failure event.
 *   - `completed`: writes a `cron_run` audit row, but only when the job
 *     name is listed in `RECURRING_JOB_NAMES`.
 *   - `error`: logs the error at warn level.
 *
 * @param worker - The BullMQ worker to observe.
 * @param workerName - Label recorded in audit metadata and log output.
 */
export function attachWorkerAudit(worker: Worker, workerName: string): void {
  worker.on('failed', (job: Job | undefined, err: Error) => {
    writeAuditSafely(workerName, () =>
      createAuditLog({
        userId: null,
        portId: null,
        action: 'job_failed',
        entityType: 'queue_job',
        // `job` can be undefined on this event; fall back to a
        // worker-scoped placeholder id.
        entityId: job?.id ?? `${workerName}:unknown`,
        metadata: {
          worker: workerName,
          jobName: job?.name ?? 'unknown',
          attemptsMade: job?.attemptsMade ?? null,
          opts: job?.opts ? { attempts: job.opts.attempts } : null,
          // Cap the stored message at 1024 characters.
          error: err.message?.slice(0, 1024) ?? null,
        },
        severity: 'error',
        source: 'job',
      }),
    );
  });

  worker.on('completed', (job: Job) => {
    // Only recurring (cron) jobs get a heartbeat row.
    if (!RECURRING_JOB_NAMES.has(job.name)) return;

    writeAuditSafely(workerName, () =>
      createAuditLog({
        userId: null,
        portId: null,
        action: 'cron_run',
        entityType: 'cron',
        entityId: job.name,
        metadata: {
          worker: workerName,
          jobId: job.id ?? null,
          // Null when either timestamp is missing.
          durationMs:
            job.processedOn && job.finishedOn ? job.finishedOn - job.processedOn : null,
        },
        severity: 'info',
        source: 'cron',
      }),
    );
  });

  // Defensive logger — surface errors emitted by the worker itself to the
  // worker log. Audit write failures are reported by `writeAuditSafely`.
  worker.on('error', (err) => {
    logger.warn({ workerName, err }, 'BullMQ worker error');
  });
}