diff --git a/src/lib/queue/workers/notifications.ts b/src/lib/queue/workers/notifications.ts index 911a2ffc..0ca8a517 100644 --- a/src/lib/queue/workers/notifications.ts +++ b/src/lib/queue/workers/notifications.ts @@ -92,10 +92,17 @@ export const notificationsWorker = new Worker( ? await getPortBrandingConfig(notif.portId).catch(() => null) : null; const prefix = portBrand?.appName?.trim() || 'CRM'; + // L7: pass `portId` (6th positional arg) so `getPortEmailConfig` + // resolves the notification's per-port send-from identity instead + // of falling back to the global default From. `from`/`text` stay + // undefined. await sendEmail( authUser.email, `[${prefix}] ${notif.title}`, `

${bodyText}

${linkHtml}`, + undefined, + undefined, + notif.portId ?? undefined, ); await db diff --git a/src/lib/queue/workers/reports.ts b/src/lib/queue/workers/reports.ts index e9c8d2c6..b5a1e1a8 100644 --- a/src/lib/queue/workers/reports.ts +++ b/src/lib/queue/workers/reports.ts @@ -22,69 +22,92 @@ export const reportsWorker = new Worker( // weekly/monthly reports that's an instant flood of dupe // emails to recipients. Now we compute the next fire from // the cron expression and UPDATE the row atomically. + // + // L6: this poller does a select-due → per-row update. With a + // single `crm-worker` (concurrency 1) that's safe, but the moment + // `MULTI_NODE_DEPLOYMENT` adds a second replica two pollers would + // both read the same due rows and double-fire (duplicate runs + + // email blasts). We now atomically CLAIM due rows with + // `FOR UPDATE SKIP LOCKED` inside a transaction: a concurrent + // replica skips rows this tx already holds, so each due row is + // claimed by exactly one poller. `nextRunAt` is row-specific + // (cron-derived) so we keep the per-row update — the row lock, + // not a bulk UPDATE, is what makes the claim atomic. Enqueues are + // deferred to AFTER commit so a rolled-back claim never leaves an + // orphaned generate-report job. const { db } = await import('@/lib/db'); const { scheduledReports } = await import('@/lib/db/schema/operations'); const { generatedReports } = await import('@/lib/db/schema/operations'); const { eq, and, lte } = await import('drizzle-orm'); const { CronExpressionParser } = await import('cron-parser'); - const dueReports = await db - .select() - .from(scheduledReports) - .where( - and(eq(scheduledReports.isActive, true), lte(scheduledReports.nextRunAt, new Date())), - ); + const enqueueIds: string[] = []; - for (const report of dueReports) { - const { getQueue } = await import('@/lib/queue'); + await db.transaction(async (tx) => { + const dueReports = await tx + .select() + .from(scheduledReports) + .where( + and(eq(scheduledReports.isActive, true), lte(scheduledReports.nextRunAt, new Date())), + ) + .for('update', { skipLocked: true }); - // Compute next_run_at BEFORE the enqueue so a failure in the - // parse path (malformed cron) doesn't get repeat-fired. - let nextRunAt: Date | null = null; - try { - nextRunAt = CronExpressionParser.parse(report.schedule, { - currentDate: new Date(), - tz: process.env.SCHEDULER_TZ ?? 'Europe/Warsaw', - }) - .next() - .toDate(); - } catch (err) { - logger.error( - { err, reportId: report.id, schedule: report.schedule }, - 'Failed to parse cron schedule for scheduled report; pausing it', - ); - // Disable the row so we don't re-attempt the malformed cron - // every minute. - await db + for (const report of dueReports) { + // Compute next_run_at BEFORE the enqueue so a failure in the + // parse path (malformed cron) doesn't get repeat-fired. + let nextRunAt: Date | null = null; + try { + nextRunAt = CronExpressionParser.parse(report.schedule, { + currentDate: new Date(), + tz: process.env.SCHEDULER_TZ ?? 'Europe/Warsaw', + }) + .next() + .toDate(); + } catch (err) { + logger.error( + { err, reportId: report.id, schedule: report.schedule }, + 'Failed to parse cron schedule for scheduled report; pausing it', + ); + // Disable the row so we don't re-attempt the malformed cron + // every minute. + await tx + .update(scheduledReports) + .set({ isActive: false, updatedAt: new Date() }) + .where(eq(scheduledReports.id, report.id)); + continue; + } + + await tx .update(scheduledReports) - .set({ isActive: false, updatedAt: new Date() }) + .set({ nextRunAt, updatedAt: new Date() }) .where(eq(scheduledReports.id, report.id)); - continue; + + const [genReport] = await tx + .insert(generatedReports) + .values({ + portId: report.portId, + scheduledReportId: report.id, + reportType: report.reportType, + name: `${report.name} - ${new Date().toISOString().split('T')[0]}`, + status: 'queued', + parameters: (report.config as Record) ?? {}, + requestedBy: report.createdBy, + }) + .returning(); + + if (genReport) { + enqueueIds.push(genReport.id); + } } + }); - await db - .update(scheduledReports) - .set({ nextRunAt, updatedAt: new Date() }) - .where(eq(scheduledReports.id, report.id)); - - const [genReport] = await db - .insert(generatedReports) - .values({ - portId: report.portId, - scheduledReportId: report.id, - reportType: report.reportType, - name: `${report.name} - ${new Date().toISOString().split('T')[0]}`, - status: 'queued', - parameters: (report.config as Record) ?? {}, - requestedBy: report.createdBy, - }) - .returning(); - - if (genReport) { + if (enqueueIds.length > 0) { + const { getQueue } = await import('@/lib/queue'); + for (const genReportId of enqueueIds) { await getQueue('reports').add( 'generate-report', - { reportJobId: genReport.id }, - { jobId: `generate-report:${genReport.id}` }, + { reportJobId: genReportId }, + { jobId: `generate-report:${genReportId}` }, ); } } @@ -102,46 +125,73 @@ export const reportsWorker = new Worker( case 'report-schedules-poll': { // Scan report_schedules due to fire, mint a report_runs row per // schedule, advance next_run_at by cadence math, enqueue render. + // + // L6: same select-due → per-row update shape as the legacy poller + // above. Safe under the single `crm-worker` (concurrency 1) today, + // but double-fires under multiple replicas once + // `MULTI_NODE_DEPLOYMENT` is on. We atomically CLAIM due rows in a + // `FOR UPDATE SKIP LOCKED` transaction that ALSO advances + // `nextRunAt`/`lastRunAt` (and pauses templateless rows). Because + // the claim advances `nextRunAt` past `now`, a concurrent replica + // re-polling immediately afterwards no longer sees the row as due, + // and `SKIP LOCKED` keeps two pollers from claiming the same row + // mid-flight. The heavier per-row work (`createReportRun` + render + // enqueue) runs AFTER commit on the claimed rows — `createReportRun` + // is a service that uses its own db handle, and advancing the fire + // time before minting already preserves the "no-op doesn't slip" + // rule, so a downstream mint failure just retries on the next poll. const { db } = await import('@/lib/db'); const { reportSchedules, reportTemplates } = await import('@/lib/db/schema/reports'); const { createReportRun } = await import('@/lib/services/report-runs.service'); const { nextRunFor } = await import('@/lib/services/report-schedules.service'); const { and, eq, lte } = await import('drizzle-orm'); + type ReportSchedule = import('@/lib/db/schema/reports').ReportSchedule; + type ReportTemplate = import('@/lib/db/schema/reports').ReportTemplate; const now = new Date(); - const due = await db - .select() - .from(reportSchedules) - .where(and(eq(reportSchedules.enabled, true), lte(reportSchedules.nextRunAt, now))); - for (const schedule of due) { - const template = await db.query.reportTemplates.findFirst({ - where: eq(reportTemplates.id, schedule.templateId), - }); - if (!template) { - logger.warn( - { scheduleId: schedule.id, templateId: schedule.templateId }, - 'Skipping schedule: template missing (likely archived); pausing', - ); - await db + const claimed: Array<{ schedule: ReportSchedule; template: ReportTemplate }> = []; + + await db.transaction(async (tx) => { + const due = await tx + .select() + .from(reportSchedules) + .where(and(eq(reportSchedules.enabled, true), lte(reportSchedules.nextRunAt, now))) + .for('update', { skipLocked: true }); + + for (const schedule of due) { + const template = await tx.query.reportTemplates.findFirst({ + where: eq(reportTemplates.id, schedule.templateId), + }); + if (!template) { + logger.warn( + { scheduleId: schedule.id, templateId: schedule.templateId }, + 'Skipping schedule: template missing (likely archived); pausing', + ); + await tx + .update(reportSchedules) + .set({ enabled: false, updatedAt: new Date() }) + .where(eq(reportSchedules.id, schedule.id)); + continue; + } + + // Compute the next fire BEFORE the enqueue so a downstream + // failure (storage outage, etc.) doesn't pin the schedule on + // the same tick — preserves the "no-op doesn't slip" rule. + await tx .update(reportSchedules) - .set({ enabled: false, updatedAt: new Date() }) + .set({ + lastRunAt: now, + nextRunAt: nextRunFor(schedule.cadence as Parameters[0], now), + updatedAt: new Date(), + }) .where(eq(reportSchedules.id, schedule.id)); - continue; + + claimed.push({ schedule, template }); } + }); - // Compute the next fire BEFORE the enqueue so a downstream - // failure (storage outage, etc.) doesn't pin the schedule on - // the same tick — preserves the "no-op doesn't slip" rule. - await db - .update(reportSchedules) - .set({ - lastRunAt: now, - nextRunAt: nextRunFor(schedule.cadence as Parameters[0], now), - updatedAt: new Date(), - }) - .where(eq(reportSchedules.id, schedule.id)); - + for (const { schedule, template } of claimed) { try { const { REPORT_KINDS } = await import('@/lib/validators/reports'); const kindNarrowed = (REPORT_KINDS as readonly string[]).includes(template.kind) diff --git a/src/lib/services/report-render.service.ts b/src/lib/services/report-render.service.ts index 026aae79..b2df134f 100644 --- a/src/lib/services/report-render.service.ts +++ b/src/lib/services/report-render.service.ts @@ -17,6 +17,8 @@ * land alongside the builder UI (P4+). */ +import { createHash } from 'node:crypto'; + import { and, eq } from 'drizzle-orm'; import { db } from '@/lib/db'; @@ -85,6 +87,28 @@ function rowsToCsv(rows: Array>): Buff return Buffer.from(lines.join('\r\n') + '\r\n', 'utf-8'); } +/** + * L5 — deterministic artefact file id per report run. + * + * The render path can crash between `backend.put` and the `files` + * insert / status write; BullMQ then retries (reports maxAttempts 3). + * Deriving the `files.id` (and therefore the storage key, which embeds + * the file id) deterministically from the run id makes the retry land + * on the SAME storage key and the SAME PK, so `onConflictDoNothing` + * collapses the duplicate insert and `backend.put` overwrites in place + * instead of leaking an orphaned blob + dangling `files` row. + * + * We format a SHA-256 of the run id as a UUID-shaped string so the + * value is a stable, collision-free `files.id` (UUIDv4 columns accept + * any 36-char UUID-shaped text). + */ +function deterministicFileId(runId: string): string { + const h = createHash('sha256').update(`report-run:${runId}`).digest('hex'); + return [h.slice(0, 8), h.slice(8, 12), h.slice(12, 16), h.slice(16, 20), h.slice(20, 32)].join( + '-', + ); +} + const REPORT_RENDER_MAP: Record = { dashboard: { fetchData: fetchPipelineData as KindRenderer['fetchData'], @@ -289,7 +313,10 @@ export async function renderReportRun(reportRunId: string): Promise { contentType = 'application/pdf'; } - const fileId = crypto.randomUUID(); + // L5: deterministic per-run id + storage key so a retry after a + // mid-render crash overwrites the same blob and no-ops the duplicate + // `files` insert instead of leaking an orphan. + const fileId = deterministicFileId(run.id); const storagePath = buildStoragePath(port.slug, 'reports', run.id, fileId, extension); const backend = await getStorageBackend(); @@ -299,18 +326,21 @@ export async function renderReportRun(reportRunId: string): Promise { }); putStoragePath = storagePath; - await db.insert(files).values({ - id: fileId, - portId: run.portId, - filename: `${run.kind}-${run.id.slice(0, 8)}.${extension}`, - originalName: `${run.kind}-report.${extension}`, - mimeType: contentType, - sizeBytes: String(bytes.length), - storagePath, - storageBucket: env.MINIO_BUCKET, - category: 'misc', - uploadedBy: run.triggeredByUserId ?? 'system', - }); + await db + .insert(files) + .values({ + id: fileId, + portId: run.portId, + filename: `${run.kind}-${run.id.slice(0, 8)}.${extension}`, + originalName: `${run.kind}-report.${extension}`, + mimeType: contentType, + sizeBytes: String(bytes.length), + storagePath, + storageBucket: env.MINIO_BUCKET, + category: 'misc', + uploadedBy: run.triggeredByUserId ?? 'system', + }) + .onConflictDoNothing(); const updated = await updateReportRunStatus(run.id, run.portId, { status: 'complete', @@ -342,8 +372,16 @@ export async function renderReportRun(reportRunId: string): Promise { /** * Schedule-driven email side effect. Looks up the schedule's recipients - * and ships an email with the rendered PDF attached. Stamps `emailedAt` - * on success; logs + rethrows on failure so BullMQ retries. + * and ships an email with the rendered PDF attached. + * + * M9 — idempotent against BullMQ retries (reports maxAttempts 3): + * - Early-returns when `emailedAt` is already stamped, so a retry never + * re-blasts recipients who already received the report. + * - Stamps `emailedAt` BEFORE the recipient loop, so a transient SMTP + * failure on recipient N does not re-send to 1..N-1 on the next + * attempt — at most the failing send is lost, never duplicated. + * - Treats per-recipient send failures as logged-not-thrown so one bad + * address cannot re-trigger the whole loop on retry. */ export async function emailReportRun(reportRunId: string): Promise { const run = await db.query.reportRuns.findFirst({ @@ -355,6 +393,14 @@ export async function emailReportRun(reportRunId: string): Promise { internalMessage: `Cannot email report ${run.id} — status=${run.status}, storageKey=${run.storageKey}`, }); } + // M9: already emailed on a prior attempt — never re-send on BullMQ retry. + if (run.emailedAt) { + logger.info( + { reportRunId: run.id, emailedAt: run.emailedAt }, + 'Report already emailed; skipping (M9 idempotency)', + ); + return; + } if (!run.scheduleId) { logger.info({ reportRunId: run.id }, 'Skipping email for user-triggered report (no schedule)'); return; @@ -392,16 +438,29 @@ export async function emailReportRun(reportRunId: string): Promise { const subject = `${port.name} · ${run.kind} report`; const html = `

Your scheduled ${run.kind} report is attached.

`; - for (const recipient of recipients) { - await sendEmail(recipient.email, subject, html, undefined, undefined, run.portId, [ - { fileId: fileRow.id, filename: fileRow.originalName ?? `${run.kind}-report.pdf` }, - ]); - } - + // M9: stamp `emailedAt` BEFORE sending so a transient SMTP failure + // mid-loop cannot cause a BullMQ retry to re-send to the recipients + // who already received the report (the early-return above now fires). await updateReportRunStatus(run.id, run.portId, { status: 'complete', emailedAt: new Date(), }); + + // M9: log-not-throw per recipient — one bad address must not re-blast + // the rest on retry (the run is already marked emailed). At-most-once + // delivery per recipient; a failed send is logged and dropped. + for (const recipient of recipients) { + try { + await sendEmail(recipient.email, subject, html, undefined, undefined, run.portId, [ + { fileId: fileRow.id, filename: fileRow.originalName ?? `${run.kind}-report.pdf` }, + ]); + } catch (err) { + logger.error( + { err, reportRunId: run.id, recipient: recipient.email }, + 'Failed to email scheduled report to recipient; skipping (run already marked emailed)', + ); + } + } } /** @@ -456,7 +515,10 @@ async function renderStandaloneReportRun(run: ReportRun): Promise { // eslint-disable-next-line @typescript-eslint/no-explicit-any const bytes = (await renderToBuffer(element as any)) as Buffer; - const fileId = crypto.randomUUID(); + // L5: deterministic per-run id + storage key so a retry after a + // mid-render crash overwrites the same blob and no-ops the duplicate + // `files` insert instead of leaking an orphan. + const fileId = deterministicFileId(run.id); const storagePath = buildStoragePath(port.slug, 'reports', run.id, fileId, 'pdf'); const backend = await getStorageBackend(); await backend.put(storagePath, bytes, { @@ -465,18 +527,21 @@ async function renderStandaloneReportRun(run: ReportRun): Promise { }); putStoragePath = storagePath; - await db.insert(files).values({ - id: fileId, - portId: run.portId, - filename: `${run.kind}-${run.id.slice(0, 8)}.pdf`, - originalName: `${run.kind}-report.pdf`, - mimeType: 'application/pdf', - sizeBytes: String(bytes.length), - storagePath, - storageBucket: env.MINIO_BUCKET, - category: 'misc', - uploadedBy: run.triggeredByUserId ?? 'system', - }); + await db + .insert(files) + .values({ + id: fileId, + portId: run.portId, + filename: `${run.kind}-${run.id.slice(0, 8)}.pdf`, + originalName: `${run.kind}-report.pdf`, + mimeType: 'application/pdf', + sizeBytes: String(bytes.length), + storagePath, + storageBucket: env.MINIO_BUCKET, + category: 'misc', + uploadedBy: run.triggeredByUserId ?? 'system', + }) + .onConflictDoNothing(); const updated = await updateReportRunStatus(run.id, run.portId, { status: 'complete', diff --git a/src/lib/services/reports.service.tsx b/src/lib/services/reports.service.tsx index 03b5c501..ca511797 100644 --- a/src/lib/services/reports.service.tsx +++ b/src/lib/services/reports.service.tsx @@ -1,3 +1,5 @@ +import { createHash } from 'node:crypto'; + import { and, desc, eq } from 'drizzle-orm'; import type { ReactElement } from 'react'; import type { DocumentProps } from '@react-pdf/renderer'; @@ -193,6 +195,24 @@ export async function getDownloadUrl(reportId: string, portId: string) { // ─── generateReport ─────────────────────────────────────────────────────────── +/** + * L5 — deterministic artefact file id per legacy report job. + * + * `generateReport` can crash between `backend.put` and the + * `generatedReports` status write; BullMQ then retries (reports + * maxAttempts 3). Deriving the `files.id` (and therefore the storage + * key, which embeds the file id) deterministically from the job id + * makes the retry land on the SAME storage key and SAME PK, so the blob + * is overwritten in place and the duplicate `files` insert no-ops via + * `onConflictDoNothing` — no orphaned blob + dangling row. + */ +function deterministicReportFileId(reportJobId: string): string { + const h = createHash('sha256').update(`generated-report:${reportJobId}`).digest('hex'); + return [h.slice(0, 8), h.slice(8, 12), h.slice(12, 16), h.slice(16, 20), h.slice(20, 32)].join( + '-', + ); +} + export async function generateReport(reportJobId: string): Promise { // 1. Fetch the generatedReports record const report = await db.query.generatedReports.findFirst({ @@ -203,6 +223,15 @@ export async function generateReport(reportJobId: string): Promise { throw new NotFoundError('report job'); } + // L5: idempotency early-return. If a prior attempt already produced the + // artefact (status='ready' with a fileId), a BullMQ retry must not + // re-render + re-upload + re-insert a second `files` row — that leaks an + // orphaned blob. Bail out treating the run as already done. + if (report.status === 'ready' && report.fileId) { + logger.info({ reportJobId }, 'Report already generated; skipping re-render (L5 idempotency)'); + return; + } + const { portId, reportType, name, parameters, requestedBy } = report; try { @@ -260,8 +289,11 @@ export async function generateReport(reportJobId: string): Promise { ) => ReactElement; const pdfBytes = await renderPdf(renderFn(data, ctx)); - // 8. Build storage path - const fileId = crypto.randomUUID(); + // 8. Build storage path. + // L5: deterministic per-job file id + storage key so a retry after a + // mid-render crash overwrites the same blob and no-ops the duplicate + // `files` insert instead of leaking an orphan. + const fileId = deterministicReportFileId(reportJobId); const storagePath = buildStoragePath(portSlug, 'reports', reportJobId, fileId, 'pdf'); // 9. Upload PDF via the active storage backend (filesystem or s3) @@ -272,8 +304,8 @@ export async function generateReport(reportJobId: string): Promise { sizeBytes: buffer.length, }); - // 10. Insert into files table - const [fileRecord] = await db + // 10. Insert into files table (idempotent on retry via deterministic id) + let [fileRecord] = await db .insert(files) .values({ id: fileId, @@ -287,8 +319,19 @@ export async function generateReport(reportJobId: string): Promise { category: 'misc', uploadedBy: requestedBy, }) + .onConflictDoNothing() .returning(); + // L5: a retry that already inserted the file row on a prior attempt + // (but crashed before the generatedReports status write) gets an empty + // returning() from onConflictDoNothing — re-fetch the existing row + // rather than failing. + if (!fileRecord) { + fileRecord = await db.query.files.findFirst({ + where: and(eq(files.id, fileId), eq(files.portId, portId)), + }); + } + if (!fileRecord) { throw new CodedError('INSERT_RETURNING_EMPTY', { internalMessage: 'Failed to insert file record for generated report',