fix(audit): reports workers — M9 (no duplicate scheduled emails), L5 (idempotent render artefacts), L6 (atomic schedule claim), L7 (per-port notification From)

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-02 13:07:30 +02:00
parent 64c73a5d77
commit cc5c053a79
4 changed files with 280 additions and 115 deletions

View File

@@ -92,10 +92,17 @@ export const notificationsWorker = new Worker(
? await getPortBrandingConfig(notif.portId).catch(() => null)
: null;
const prefix = portBrand?.appName?.trim() || 'CRM';
// L7: pass `portId` (6th positional arg) so `getPortEmailConfig`
// resolves the notification's per-port send-from identity instead
// of falling back to the global default From. `from`/`text` stay
// undefined.
await sendEmail(
authUser.email,
`[${prefix}] ${notif.title}`,
`<p>${bodyText}</p>${linkHtml}`,
undefined,
undefined,
notif.portId ?? undefined,
);
await db

View File

@@ -22,69 +22,92 @@ export const reportsWorker = new Worker(
// weekly/monthly reports that's an instant flood of dupe
// emails to recipients. Now we compute the next fire from
// the cron expression and UPDATE the row atomically.
//
// L6: this poller does a select-due → per-row update. With a
// single `crm-worker` (concurrency 1) that's safe, but the moment
// `MULTI_NODE_DEPLOYMENT` adds a second replica two pollers would
// both read the same due rows and double-fire (duplicate runs +
// email blasts). We now atomically CLAIM due rows with
// `FOR UPDATE SKIP LOCKED` inside a transaction: a concurrent
// replica skips rows this tx already holds, so each due row is
// claimed by exactly one poller. `nextRunAt` is row-specific
// (cron-derived) so we keep the per-row update — the row lock,
// not a bulk UPDATE, is what makes the claim atomic. Enqueues are
// deferred to AFTER commit so a rolled-back claim never leaves an
// orphaned generate-report job.
const { db } = await import('@/lib/db');
const { scheduledReports } = await import('@/lib/db/schema/operations');
const { generatedReports } = await import('@/lib/db/schema/operations');
const { eq, and, lte } = await import('drizzle-orm');
const { CronExpressionParser } = await import('cron-parser');
const dueReports = await db
.select()
.from(scheduledReports)
.where(
and(eq(scheduledReports.isActive, true), lte(scheduledReports.nextRunAt, new Date())),
);
const enqueueIds: string[] = [];
for (const report of dueReports) {
const { getQueue } = await import('@/lib/queue');
await db.transaction(async (tx) => {
const dueReports = await tx
.select()
.from(scheduledReports)
.where(
and(eq(scheduledReports.isActive, true), lte(scheduledReports.nextRunAt, new Date())),
)
.for('update', { skipLocked: true });
// Compute next_run_at BEFORE the enqueue so a failure in the
// parse path (malformed cron) doesn't get repeat-fired.
let nextRunAt: Date | null = null;
try {
nextRunAt = CronExpressionParser.parse(report.schedule, {
currentDate: new Date(),
tz: process.env.SCHEDULER_TZ ?? 'Europe/Warsaw',
})
.next()
.toDate();
} catch (err) {
logger.error(
{ err, reportId: report.id, schedule: report.schedule },
'Failed to parse cron schedule for scheduled report; pausing it',
);
// Disable the row so we don't re-attempt the malformed cron
// every minute.
await db
for (const report of dueReports) {
// Compute next_run_at BEFORE the enqueue so a failure in the
// parse path (malformed cron) doesn't get repeat-fired.
let nextRunAt: Date | null = null;
try {
nextRunAt = CronExpressionParser.parse(report.schedule, {
currentDate: new Date(),
tz: process.env.SCHEDULER_TZ ?? 'Europe/Warsaw',
})
.next()
.toDate();
} catch (err) {
logger.error(
{ err, reportId: report.id, schedule: report.schedule },
'Failed to parse cron schedule for scheduled report; pausing it',
);
// Disable the row so we don't re-attempt the malformed cron
// every minute.
await tx
.update(scheduledReports)
.set({ isActive: false, updatedAt: new Date() })
.where(eq(scheduledReports.id, report.id));
continue;
}
await tx
.update(scheduledReports)
.set({ isActive: false, updatedAt: new Date() })
.set({ nextRunAt, updatedAt: new Date() })
.where(eq(scheduledReports.id, report.id));
continue;
const [genReport] = await tx
.insert(generatedReports)
.values({
portId: report.portId,
scheduledReportId: report.id,
reportType: report.reportType,
name: `${report.name} - ${new Date().toISOString().split('T')[0]}`,
status: 'queued',
parameters: (report.config as Record<string, unknown>) ?? {},
requestedBy: report.createdBy,
})
.returning();
if (genReport) {
enqueueIds.push(genReport.id);
}
}
});
await db
.update(scheduledReports)
.set({ nextRunAt, updatedAt: new Date() })
.where(eq(scheduledReports.id, report.id));
const [genReport] = await db
.insert(generatedReports)
.values({
portId: report.portId,
scheduledReportId: report.id,
reportType: report.reportType,
name: `${report.name} - ${new Date().toISOString().split('T')[0]}`,
status: 'queued',
parameters: (report.config as Record<string, unknown>) ?? {},
requestedBy: report.createdBy,
})
.returning();
if (genReport) {
if (enqueueIds.length > 0) {
const { getQueue } = await import('@/lib/queue');
for (const genReportId of enqueueIds) {
await getQueue('reports').add(
'generate-report',
{ reportJobId: genReport.id },
{ jobId: `generate-report:${genReport.id}` },
{ reportJobId: genReportId },
{ jobId: `generate-report:${genReportId}` },
);
}
}
@@ -102,46 +125,73 @@ export const reportsWorker = new Worker(
case 'report-schedules-poll': {
// Scan report_schedules due to fire, mint a report_runs row per
// schedule, advance next_run_at by cadence math, enqueue render.
//
// L6: same select-due → per-row update shape as the legacy poller
// above. Safe under the single `crm-worker` (concurrency 1) today,
// but double-fires under multiple replicas once
// `MULTI_NODE_DEPLOYMENT` is on. We atomically CLAIM due rows in a
// `FOR UPDATE SKIP LOCKED` transaction that ALSO advances
// `nextRunAt`/`lastRunAt` (and pauses templateless rows). Because
// the claim advances `nextRunAt` past `now`, a concurrent replica
// re-polling immediately afterwards no longer sees the row as due,
// and `SKIP LOCKED` keeps two pollers from claiming the same row
// mid-flight. The heavier per-row work (`createReportRun` + render
// enqueue) runs AFTER commit on the claimed rows — `createReportRun`
// is a service that uses its own db handle, and advancing the fire
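// time before minting already preserves the "no-op doesn't slip"
// rule: the claimed row is no longer due, so a downstream mint failure
// does not re-fire it on the next poll; the schedule is picked up
// again at its advanced `nextRunAt`.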
const { db } = await import('@/lib/db');
const { reportSchedules, reportTemplates } = await import('@/lib/db/schema/reports');
const { createReportRun } = await import('@/lib/services/report-runs.service');
const { nextRunFor } = await import('@/lib/services/report-schedules.service');
const { and, eq, lte } = await import('drizzle-orm');
type ReportSchedule = import('@/lib/db/schema/reports').ReportSchedule;
type ReportTemplate = import('@/lib/db/schema/reports').ReportTemplate;
const now = new Date();
const due = await db
.select()
.from(reportSchedules)
.where(and(eq(reportSchedules.enabled, true), lte(reportSchedules.nextRunAt, now)));
for (const schedule of due) {
const template = await db.query.reportTemplates.findFirst({
where: eq(reportTemplates.id, schedule.templateId),
});
if (!template) {
logger.warn(
{ scheduleId: schedule.id, templateId: schedule.templateId },
'Skipping schedule: template missing (likely archived); pausing',
);
await db
const claimed: Array<{ schedule: ReportSchedule; template: ReportTemplate }> = [];
await db.transaction(async (tx) => {
const due = await tx
.select()
.from(reportSchedules)
.where(and(eq(reportSchedules.enabled, true), lte(reportSchedules.nextRunAt, now)))
.for('update', { skipLocked: true });
for (const schedule of due) {
const template = await tx.query.reportTemplates.findFirst({
where: eq(reportTemplates.id, schedule.templateId),
});
if (!template) {
logger.warn(
{ scheduleId: schedule.id, templateId: schedule.templateId },
'Skipping schedule: template missing (likely archived); pausing',
);
await tx
.update(reportSchedules)
.set({ enabled: false, updatedAt: new Date() })
.where(eq(reportSchedules.id, schedule.id));
continue;
}
// Compute the next fire BEFORE the enqueue so a downstream
// failure (storage outage, etc.) doesn't pin the schedule on
// the same tick — preserves the "no-op doesn't slip" rule.
await tx
.update(reportSchedules)
.set({ enabled: false, updatedAt: new Date() })
.set({
lastRunAt: now,
nextRunAt: nextRunFor(schedule.cadence as Parameters<typeof nextRunFor>[0], now),
updatedAt: new Date(),
})
.where(eq(reportSchedules.id, schedule.id));
continue;
claimed.push({ schedule, template });
}
});
// Compute the next fire BEFORE the enqueue so a downstream
// failure (storage outage, etc.) doesn't pin the schedule on
// the same tick — preserves the "no-op doesn't slip" rule.
await db
.update(reportSchedules)
.set({
lastRunAt: now,
nextRunAt: nextRunFor(schedule.cadence as Parameters<typeof nextRunFor>[0], now),
updatedAt: new Date(),
})
.where(eq(reportSchedules.id, schedule.id));
for (const { schedule, template } of claimed) {
try {
const { REPORT_KINDS } = await import('@/lib/validators/reports');
const kindNarrowed = (REPORT_KINDS as readonly string[]).includes(template.kind)

View File

@@ -17,6 +17,8 @@
* land alongside the builder UI (P4+).
*/
import { createHash } from 'node:crypto';
import { and, eq } from 'drizzle-orm';
import { db } from '@/lib/db';
@@ -85,6 +87,28 @@ function rowsToCsv(rows: Array<Array<string | number | null | undefined>>): Buff
return Buffer.from(lines.join('\r\n') + '\r\n', 'utf-8');
}
/**
 * L5 — deterministic artefact file id per report run.
 *
 * The render path can crash between `backend.put` and the `files`
 * insert / status write; BullMQ then retries (reports maxAttempts 3).
 * Deriving the `files.id` (and therefore the storage key, which embeds
 * the file id) deterministically from the run id makes the retry land
 * on the SAME storage key and the SAME PK, so `onConflictDoNothing`
 * collapses the duplicate insert and `backend.put` overwrites in place
 * instead of leaking an orphaned blob + dangling `files` row.
 *
 * The id is the first 128 bits of SHA-256(`report-run:<runId>`) laid
 * out as 8-4-4-4-12 lowercase hex. The version nibble is forced to `4`
 * and the variant bits to `10xx`, so the value has the same shape as
 * the `crypto.randomUUID()` ids this helper replaces and passes strict
 * RFC 4122 / RFC 9562 UUID validation, not only lenient "36 hex-and-dash
 * characters" checks. The remaining 122 hash bits make the id
 * collision-resistant (it is a truncated hash, so not collision-free).
 *
 * @param runId - id of the report run the artefact belongs to.
 * @returns a UUID-shaped string that is stable for a given `runId`.
 */
function deterministicFileId(runId: string): string {
  const hex = createHash('sha256').update(`report-run:${runId}`).digest('hex');
  // Keep the low two bits of the hash nibble and set the high bits to
  // `10`, yielding one of 8/9/a/b — the RFC 4122 variant marker.
  const variantNibble = ((parseInt(hex.charAt(16), 16) & 0x3) | 0x8).toString(16);
  return [
    hex.slice(0, 8),
    hex.slice(8, 12),
    // Version nibble replaces hash nibble 12; the next three are kept.
    `4${hex.slice(13, 16)}`,
    `${variantNibble}${hex.slice(17, 20)}`,
    hex.slice(20, 32),
  ].join('-');
}
const REPORT_RENDER_MAP: Record<string, KindRenderer> = {
dashboard: {
fetchData: fetchPipelineData as KindRenderer['fetchData'],
@@ -289,7 +313,10 @@ export async function renderReportRun(reportRunId: string): Promise<ReportRun> {
contentType = 'application/pdf';
}
const fileId = crypto.randomUUID();
// L5: deterministic per-run id + storage key so a retry after a
// mid-render crash overwrites the same blob and no-ops the duplicate
// `files` insert instead of leaking an orphan.
const fileId = deterministicFileId(run.id);
const storagePath = buildStoragePath(port.slug, 'reports', run.id, fileId, extension);
const backend = await getStorageBackend();
@@ -299,18 +326,21 @@ export async function renderReportRun(reportRunId: string): Promise<ReportRun> {
});
putStoragePath = storagePath;
await db.insert(files).values({
id: fileId,
portId: run.portId,
filename: `${run.kind}-${run.id.slice(0, 8)}.${extension}`,
originalName: `${run.kind}-report.${extension}`,
mimeType: contentType,
sizeBytes: String(bytes.length),
storagePath,
storageBucket: env.MINIO_BUCKET,
category: 'misc',
uploadedBy: run.triggeredByUserId ?? 'system',
});
await db
.insert(files)
.values({
id: fileId,
portId: run.portId,
filename: `${run.kind}-${run.id.slice(0, 8)}.${extension}`,
originalName: `${run.kind}-report.${extension}`,
mimeType: contentType,
sizeBytes: String(bytes.length),
storagePath,
storageBucket: env.MINIO_BUCKET,
category: 'misc',
uploadedBy: run.triggeredByUserId ?? 'system',
})
.onConflictDoNothing();
const updated = await updateReportRunStatus(run.id, run.portId, {
status: 'complete',
@@ -342,8 +372,16 @@ export async function renderReportRun(reportRunId: string): Promise<ReportRun> {
/**
* Schedule-driven email side effect. Looks up the schedule's recipients
* and ships an email with the rendered PDF attached. Stamps `emailedAt`
* on success; logs + rethrows on failure so BullMQ retries.
* and ships an email with the rendered PDF attached.
*
* M9 — idempotent against BullMQ retries (reports maxAttempts 3):
* - Early-returns when `emailedAt` is already stamped, so a retry never
* re-blasts recipients who already received the report.
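 * - Stamps `emailedAt` BEFORE the recipient loop, so a BullMQ retry
 *   after a mid-loop failure or crash hits the early-return and never
 *   re-sends to recipients 1..N-1. Delivery is therefore at-most-once:
 *   a failed send, or one not yet attempted when the worker dies, is
 *   lost rather than duplicated.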
* - Treats per-recipient send failures as logged-not-thrown so one bad
* address cannot re-trigger the whole loop on retry.
*/
export async function emailReportRun(reportRunId: string): Promise<void> {
const run = await db.query.reportRuns.findFirst({
@@ -355,6 +393,14 @@ export async function emailReportRun(reportRunId: string): Promise<void> {
internalMessage: `Cannot email report ${run.id} — status=${run.status}, storageKey=${run.storageKey}`,
});
}
// M9: already emailed on a prior attempt — never re-send on BullMQ retry.
if (run.emailedAt) {
logger.info(
{ reportRunId: run.id, emailedAt: run.emailedAt },
'Report already emailed; skipping (M9 idempotency)',
);
return;
}
if (!run.scheduleId) {
logger.info({ reportRunId: run.id }, 'Skipping email for user-triggered report (no schedule)');
return;
@@ -392,16 +438,29 @@ export async function emailReportRun(reportRunId: string): Promise<void> {
const subject = `${port.name} · ${run.kind} report`;
const html = `<p>Your scheduled ${run.kind} report is attached.</p>`;
for (const recipient of recipients) {
await sendEmail(recipient.email, subject, html, undefined, undefined, run.portId, [
{ fileId: fileRow.id, filename: fileRow.originalName ?? `${run.kind}-report.pdf` },
]);
}
// M9: stamp `emailedAt` BEFORE sending so a transient SMTP failure
// mid-loop cannot cause a BullMQ retry to re-send to the recipients
// who already received the report (the early-return above now fires).
await updateReportRunStatus(run.id, run.portId, {
status: 'complete',
emailedAt: new Date(),
});
// M9: log-not-throw per recipient — one bad address must not re-blast
// the rest on retry (the run is already marked emailed). At-most-once
// delivery per recipient; a failed send is logged and dropped.
for (const recipient of recipients) {
try {
await sendEmail(recipient.email, subject, html, undefined, undefined, run.portId, [
{ fileId: fileRow.id, filename: fileRow.originalName ?? `${run.kind}-report.pdf` },
]);
} catch (err) {
logger.error(
{ err, reportRunId: run.id, recipient: recipient.email },
'Failed to email scheduled report to recipient; skipping (run already marked emailed)',
);
}
}
}
/**
@@ -456,7 +515,10 @@ async function renderStandaloneReportRun(run: ReportRun): Promise<ReportRun> {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const bytes = (await renderToBuffer(element as any)) as Buffer;
const fileId = crypto.randomUUID();
// L5: deterministic per-run id + storage key so a retry after a
// mid-render crash overwrites the same blob and no-ops the duplicate
// `files` insert instead of leaking an orphan.
const fileId = deterministicFileId(run.id);
const storagePath = buildStoragePath(port.slug, 'reports', run.id, fileId, 'pdf');
const backend = await getStorageBackend();
await backend.put(storagePath, bytes, {
@@ -465,18 +527,21 @@ async function renderStandaloneReportRun(run: ReportRun): Promise<ReportRun> {
});
putStoragePath = storagePath;
await db.insert(files).values({
id: fileId,
portId: run.portId,
filename: `${run.kind}-${run.id.slice(0, 8)}.pdf`,
originalName: `${run.kind}-report.pdf`,
mimeType: 'application/pdf',
sizeBytes: String(bytes.length),
storagePath,
storageBucket: env.MINIO_BUCKET,
category: 'misc',
uploadedBy: run.triggeredByUserId ?? 'system',
});
await db
.insert(files)
.values({
id: fileId,
portId: run.portId,
filename: `${run.kind}-${run.id.slice(0, 8)}.pdf`,
originalName: `${run.kind}-report.pdf`,
mimeType: 'application/pdf',
sizeBytes: String(bytes.length),
storagePath,
storageBucket: env.MINIO_BUCKET,
category: 'misc',
uploadedBy: run.triggeredByUserId ?? 'system',
})
.onConflictDoNothing();
const updated = await updateReportRunStatus(run.id, run.portId, {
status: 'complete',

View File

@@ -1,3 +1,5 @@
import { createHash } from 'node:crypto';
import { and, desc, eq } from 'drizzle-orm';
import type { ReactElement } from 'react';
import type { DocumentProps } from '@react-pdf/renderer';
@@ -193,6 +195,24 @@ export async function getDownloadUrl(reportId: string, portId: string) {
// ─── generateReport ───────────────────────────────────────────────────────────
/**
 * L5 — deterministic artefact file id per legacy report job.
 *
 * `generateReport` can crash between `backend.put` and the
 * `generatedReports` status write; BullMQ then retries (reports
 * maxAttempts 3). Deriving the `files.id` (and therefore the storage
 * key, which embeds the file id) deterministically from the job id
 * makes the retry land on the SAME storage key and SAME PK, so the blob
 * is overwritten in place and the duplicate `files` insert no-ops via
 * `onConflictDoNothing` — no orphaned blob + dangling row.
 *
 * The id is the first 128 bits of SHA-256(`generated-report:<jobId>`)
 * laid out as 8-4-4-4-12 lowercase hex. The version nibble is forced to
 * `4` and the variant bits to `10xx`, so the value has the same shape
 * as the `crypto.randomUUID()` ids this helper replaces and passes
 * strict RFC 4122 / RFC 9562 UUID validation. The remaining 122 hash
 * bits make the id collision-resistant (not collision-free).
 *
 * @param reportJobId - id of the `generatedReports` row being rendered.
 * @returns a UUID-shaped string that is stable for a given `reportJobId`.
 */
function deterministicReportFileId(reportJobId: string): string {
  const hex = createHash('sha256').update(`generated-report:${reportJobId}`).digest('hex');
  // Keep the low two bits of the hash nibble and set the high bits to
  // `10`, yielding one of 8/9/a/b — the RFC 4122 variant marker.
  const variantNibble = ((parseInt(hex.charAt(16), 16) & 0x3) | 0x8).toString(16);
  return [
    hex.slice(0, 8),
    hex.slice(8, 12),
    // Version nibble replaces hash nibble 12; the next three are kept.
    `4${hex.slice(13, 16)}`,
    `${variantNibble}${hex.slice(17, 20)}`,
    hex.slice(20, 32),
  ].join('-');
}
export async function generateReport(reportJobId: string): Promise<void> {
// 1. Fetch the generatedReports record
const report = await db.query.generatedReports.findFirst({
@@ -203,6 +223,15 @@ export async function generateReport(reportJobId: string): Promise<void> {
throw new NotFoundError('report job');
}
// L5: idempotency early-return. If a prior attempt already produced the
// artefact (status='ready' with a fileId), a BullMQ retry must not
// re-render + re-upload + re-insert a second `files` row — that leaks an
// orphaned blob. Bail out treating the run as already done.
if (report.status === 'ready' && report.fileId) {
logger.info({ reportJobId }, 'Report already generated; skipping re-render (L5 idempotency)');
return;
}
const { portId, reportType, name, parameters, requestedBy } = report;
try {
@@ -260,8 +289,11 @@ export async function generateReport(reportJobId: string): Promise<void> {
) => ReactElement<DocumentProps>;
const pdfBytes = await renderPdf(renderFn(data, ctx));
// 8. Build storage path
const fileId = crypto.randomUUID();
// 8. Build storage path.
// L5: deterministic per-job file id + storage key so a retry after a
// mid-render crash overwrites the same blob and no-ops the duplicate
// `files` insert instead of leaking an orphan.
const fileId = deterministicReportFileId(reportJobId);
const storagePath = buildStoragePath(portSlug, 'reports', reportJobId, fileId, 'pdf');
// 9. Upload PDF via the active storage backend (filesystem or s3)
@@ -272,8 +304,8 @@ export async function generateReport(reportJobId: string): Promise<void> {
sizeBytes: buffer.length,
});
// 10. Insert into files table
const [fileRecord] = await db
// 10. Insert into files table (idempotent on retry via deterministic id)
let [fileRecord] = await db
.insert(files)
.values({
id: fileId,
@@ -287,8 +319,19 @@ export async function generateReport(reportJobId: string): Promise<void> {
category: 'misc',
uploadedBy: requestedBy,
})
.onConflictDoNothing()
.returning();
// L5: a retry that already inserted the file row on a prior attempt
// (but crashed before the generatedReports status write) gets an empty
// returning() from onConflictDoNothing — re-fetch the existing row
// rather than failing.
if (!fileRecord) {
fileRecord = await db.query.files.findFirst({
where: and(eq(files.id, fileId), eq(files.portId, portId)),
});
}
if (!fileRecord) {
throw new CodedError('INSERT_RETURNING_EMPTY', {
internalMessage: 'Failed to insert file record for generated report',