import { Worker, type Job } from 'bullmq';
import type { ConnectionOptions } from 'bullmq';

import { logger } from '@/lib/logger';
import { QUEUE_CONFIGS } from '@/lib/queue';

/**
 * Finds every active scheduled report whose `nextRunAt` is at or before the
 * current time, records a `queued` row in `generatedReports` for each one, and
 * enqueues a `generate-report` job that references the new row.
 *
 * Modules are loaded with dynamic `import()` as in the original implementation
 * (presumably to defer loading the DB layer until a job actually runs —
 * TODO confirm).
 *
 * NOTE(review): nothing here advances `scheduledReports.nextRunAt`. Confirm
 * that `generateReport` (or another component) does so; otherwise every
 * scheduler tick will queue the same reports again.
 */
async function enqueueDueReports(): Promise<void> {
  const { db } = await import('@/lib/db');
  const { scheduledReports, generatedReports } = await import('@/lib/db/schema/operations');
  const { eq, and, lte } = await import('drizzle-orm');
  // Loop-invariant: resolve the queue accessor once instead of once per report.
  const { getQueue } = await import('@/lib/queue');

  // A single timestamp keeps the "due" check and the generated names consistent.
  const now = new Date();
  const runDate = now.toISOString().split('T')[0];

  const dueReports = await db
    .select()
    .from(scheduledReports)
    .where(and(eq(scheduledReports.isActive, true), lte(scheduledReports.nextRunAt, now)));

  for (const report of dueReports) {
    const [genReport] = await db
      .insert(generatedReports)
      .values({
        portId: report.portId,
        scheduledReportId: report.id,
        reportType: report.reportType,
        name: `${report.name} - ${runDate}`,
        status: 'queued',
        parameters: (report.config as Record<string, unknown>) ?? {},
        requestedBy: report.createdBy,
      })
      .returning();

    // `returning()` may yield no row; only enqueue when the insert produced one.
    if (genReport) {
      await getQueue('reports').add('generate-report', {
        reportJobId: genReport.id,
      });
    }
  }
}

/**
 * Runs report generation for the `generatedReports` row referenced by the job.
 *
 * @param job - A `generate-report` job whose data carries `reportJobId`.
 * @throws Error when the job payload has no `reportJobId`, so the job fails
 *   with a clear message instead of passing `undefined` downstream.
 */
async function runGenerateReport(job: Job): Promise<void> {
  const { reportJobId } = job.data as { reportJobId?: string };
  if (!reportJobId) {
    throw new Error(`generate-report job ${String(job.id)} is missing reportJobId`);
  }

  const { generateReport } = await import('@/lib/services/reports.service');
  await generateReport(reportJobId);
}

/**
 * BullMQ worker for the `reports` queue.
 *
 * Handles two job names:
 * - `report-scheduler`: queues a `generate-report` job for each due scheduled report.
 * - `generate-report`: generates the report identified by `reportJobId`.
 *
 * Any other job name is logged as a warning and completes without doing work.
 */
export const reportsWorker = new Worker(
  'reports',
  async (job: Job) => {
    logger.info({ jobId: job.id, jobName: job.name }, 'Processing reports job');

    switch (job.name) {
      case 'report-scheduler':
        await enqueueDueReports();
        break;

      case 'generate-report':
        await runGenerateReport(job);
        break;

      default:
        logger.warn({ jobName: job.name }, 'Unknown reports job');
    }
  },
  {
    connection: { url: process.env.REDIS_URL! } as ConnectionOptions,
    concurrency: QUEUE_CONFIGS.reports.concurrency,
  },
);

// Log every failed job; `job` can be undefined in this event, hence the optional chaining.
reportsWorker.on('failed', (job, err) => {
  logger.error({ jobId: job?.id, jobName: job?.name, err }, 'Reports job failed');
});