Files
pn-new-crm/src/lib/queue/workers/reports.ts
Matt b4e502fedd fix(audit-wave-11): BullMQ jobId plumbing for natural dedup
concurrency-auditor C-2: every queue.add(...) site previously enqueued
without a stable jobId, so a double-dispatch (webhook retry, double-
click on Send, scheduler tick collision) would create two queue jobs
and the downstream worker would deliver twice. BullMQ rejects a
duplicate jobId while the original is still queued or active, so a
stable per-entity key gives at-most-once semantics naturally.

Added jobIds across all 10 enqueue sites:

- email send-invoice → `send-invoice:<invoiceId>`
- notifications invoice-overdue-notify → keyed per UTC day so dupes
  collapse intra-day but tomorrow's run can re-notify if unpaid
- export gdpr-export → keyed on the exportId (unique per request)
- webhooks deliver (3 sites: dispatch, retry, test) → keyed on the
  webhook_deliveries row UUID
- maintenance expense-dedup-scan → keyed on expenseId
- notifications send-notification-email → keyed on notification id
- email send-inquiry-confirmation → keyed on interestId (1 per
  submission)
- email send-inquiry-sales-notification → keyed on interestId+email
  (1 per recipient per submission)
- reports generate-report → keyed on the generated_reports row id

Pure refactor — no UX impact. Closes the BullMQ dedup gap that was
the second half of the concurrency-auditor's CRITICAL-tier findings.

Test fixture update: gdpr-export integration test now asserts the
jobId option on the queue.add call.

Tests 1315/1315.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-13 13:02:38 +02:00

116 lines
4.2 KiB
TypeScript

import { Worker, type Job } from 'bullmq';
import { env } from '@/lib/env';
import type { ConnectionOptions } from 'bullmq';
import { logger } from '@/lib/logger';
import { attachWorkerAudit } from '@/lib/queue/audit-helpers';
import { QUEUE_CONFIGS } from '@/lib/queue';
export const reportsWorker = new Worker(
'reports',
async (job: Job) => {
logger.info({ jobId: job.id, jobName: job.name }, 'Processing reports job');
switch (job.name) {
case 'report-scheduler': {
// Check scheduled_reports for reports due to run.
//
// datetime-auditor C3: the previous version selected due rows
// and enqueued the generate-report job but NEVER advanced
// `next_run_at`. The minutely scheduler then re-fired every
// single tick until a human zeroed the row out — for
// weekly/monthly reports that's an instant flood of dupe
// emails to recipients. Now we compute the next fire from
// the cron expression and UPDATE the row atomically.
const { db } = await import('@/lib/db');
const { scheduledReports } = await import('@/lib/db/schema/operations');
const { generatedReports } = await import('@/lib/db/schema/operations');
const { eq, and, lte } = await import('drizzle-orm');
const { CronExpressionParser } = await import('cron-parser');
const dueReports = await db
.select()
.from(scheduledReports)
.where(
and(eq(scheduledReports.isActive, true), lte(scheduledReports.nextRunAt, new Date())),
);
for (const report of dueReports) {
const { getQueue } = await import('@/lib/queue');
// Compute next_run_at BEFORE the enqueue so a failure in the
// parse path (malformed cron) doesn't get repeat-fired.
let nextRunAt: Date | null = null;
try {
nextRunAt = CronExpressionParser.parse(report.schedule, {
currentDate: new Date(),
tz: process.env.SCHEDULER_TZ ?? 'Europe/Warsaw',
})
.next()
.toDate();
} catch (err) {
logger.error(
{ err, reportId: report.id, schedule: report.schedule },
'Failed to parse cron schedule for scheduled report; pausing it',
);
// Disable the row so we don't re-attempt the malformed cron
// every minute.
await db
.update(scheduledReports)
.set({ isActive: false, updatedAt: new Date() })
.where(eq(scheduledReports.id, report.id));
continue;
}
await db
.update(scheduledReports)
.set({ nextRunAt, updatedAt: new Date() })
.where(eq(scheduledReports.id, report.id));
const [genReport] = await db
.insert(generatedReports)
.values({
portId: report.portId,
scheduledReportId: report.id,
reportType: report.reportType,
name: `${report.name} - ${new Date().toISOString().split('T')[0]}`,
status: 'queued',
parameters: (report.config as Record<string, unknown>) ?? {},
requestedBy: report.createdBy,
})
.returning();
if (genReport) {
await getQueue('reports').add(
'generate-report',
{ reportJobId: genReport.id },
{ jobId: `generate-report:${genReport.id}` },
);
}
}
break;
}
case 'generate-report': {
const { reportJobId } = job.data as { reportJobId: string };
const { generateReport } = await import('@/lib/services/reports.service');
await generateReport(reportJobId);
break;
}
default:
logger.warn({ jobName: job.name }, 'Unknown reports job');
}
},
{
connection: { url: env.REDIS_URL } as ConnectionOptions,
concurrency: QUEUE_CONFIGS.reports.concurrency,
},
);
reportsWorker.on('failed', (job, err) => {
logger.error({ jobId: job?.id, jobName: job?.name, err }, 'Reports job failed');
});
attachWorkerAudit(reportsWorker, 'reports');