import { Worker, type Job } from 'bullmq'; import { env } from '@/lib/env'; import type { ConnectionOptions } from 'bullmq'; import { logger } from '@/lib/logger'; import { attachWorkerAudit } from '@/lib/queue/audit-helpers'; import { QUEUE_CONFIGS } from '@/lib/queue'; export const reportsWorker = new Worker( 'reports', async (job: Job) => { logger.info({ jobId: job.id, jobName: job.name }, 'Processing reports job'); switch (job.name) { case 'report-scheduler': { // Check scheduled_reports for reports due to run. // // datetime-auditor C3: the previous version selected due rows // and enqueued the generate-report job but NEVER advanced // `next_run_at`. The minutely scheduler then re-fired every // single tick until a human zeroed the row out — for // weekly/monthly reports that's an instant flood of dupe // emails to recipients. Now we compute the next fire from // the cron expression and UPDATE the row atomically. const { db } = await import('@/lib/db'); const { scheduledReports } = await import('@/lib/db/schema/operations'); const { generatedReports } = await import('@/lib/db/schema/operations'); const { eq, and, lte } = await import('drizzle-orm'); const { CronExpressionParser } = await import('cron-parser'); const dueReports = await db .select() .from(scheduledReports) .where( and(eq(scheduledReports.isActive, true), lte(scheduledReports.nextRunAt, new Date())), ); for (const report of dueReports) { const { getQueue } = await import('@/lib/queue'); // Compute next_run_at BEFORE the enqueue so a failure in the // parse path (malformed cron) doesn't get repeat-fired. let nextRunAt: Date | null = null; try { nextRunAt = CronExpressionParser.parse(report.schedule, { currentDate: new Date(), tz: process.env.SCHEDULER_TZ ?? 'Europe/Warsaw', }) .next() .toDate(); } catch (err) { logger.error( { err, reportId: report.id, schedule: report.schedule }, 'Failed to parse cron schedule for scheduled report; pausing it', ); // Disable the row so we don't re-attempt the malformed cron // every minute. await db .update(scheduledReports) .set({ isActive: false, updatedAt: new Date() }) .where(eq(scheduledReports.id, report.id)); continue; } await db .update(scheduledReports) .set({ nextRunAt, updatedAt: new Date() }) .where(eq(scheduledReports.id, report.id)); const [genReport] = await db .insert(generatedReports) .values({ portId: report.portId, scheduledReportId: report.id, reportType: report.reportType, name: `${report.name} - ${new Date().toISOString().split('T')[0]}`, status: 'queued', parameters: (report.config as Record) ?? {}, requestedBy: report.createdBy, }) .returning(); if (genReport) { await getQueue('reports').add('generate-report', { reportJobId: genReport.id, }); } } break; } case 'generate-report': { const { reportJobId } = job.data as { reportJobId: string }; const { generateReport } = await import('@/lib/services/reports.service'); await generateReport(reportJobId); break; } default: logger.warn({ jobName: job.name }, 'Unknown reports job'); } }, { connection: { url: env.REDIS_URL } as ConnectionOptions, concurrency: QUEUE_CONFIGS.reports.concurrency, }, ); reportsWorker.on('failed', (job, err) => { logger.error({ jobId: job?.id, jobName: job?.name, err }, 'Reports job failed'); }); attachWorkerAudit(reportsWorker, 'reports');