fix(audit-wave-11): BullMQ jobId plumbing for natural dedup

concurrency-auditor C-2: every queue.add(...) site previously enqueued
without a stable jobId, so a double-dispatch (webhook retry, double-
click on Send, scheduler tick collision) would create two queue jobs
and the downstream worker would deliver twice. BullMQ rejects a
duplicate jobId while the original is still queued or active, so a
stable per-entity key gives at-most-once semantics naturally.

Added jobIds across all 10 enqueue sites:

- email send-invoice → `send-invoice:<invoiceId>`
- notifications invoice-overdue-notify → keyed per UTC day so dupes
  collapse intra-day but tomorrow's run can re-notify if unpaid
- export gdpr-export → keyed on the exportId (unique per request)
- webhooks deliver (3 sites: dispatch, retry, test) → keyed on the
  webhook_deliveries row UUID
- maintenance expense-dedup-scan → keyed on expenseId
- notifications send-notification-email → keyed on notification id
- email send-inquiry-confirmation → keyed on interestId (1 per
  submission)
- email send-inquiry-sales-notification → keyed on interestId+email
  (1 per recipient per submission)
- reports generate-report → keyed on the generated_reports row id

Pure refactor — no UX impact. Closes the BullMQ dedup gap that was
the second half of the concurrency-auditor's CRITICAL-tier findings.

Test fixture update: gdpr-export integration test now asserts the
jobId option on the queue.add call.

Tests 1315/1315.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-13 13:02:38 +02:00
parent 2496911dc4
commit b4e502fedd
9 changed files with 117 additions and 56 deletions

View File

@@ -81,9 +81,11 @@ export const reportsWorker = new Worker(
.returning(); .returning();
if (genReport) { if (genReport) {
await getQueue('reports').add('generate-report', { await getQueue('reports').add(
reportJobId: genReport.id, 'generate-report',
}); { reportJobId: genReport.id },
{ jobId: `generate-report:${genReport.id}` },
);
} }
} }
break; break;

View File

@@ -187,7 +187,11 @@ export async function createExpense(portId: string, data: CreateExpenseInput, me
// queue-side hiccup to fail the user's create. // queue-side hiccup to fail the user's create.
try { try {
const { getQueue } = await import('@/lib/queue'); const { getQueue } = await import('@/lib/queue');
await getQueue('maintenance').add('expense-dedup-scan', { expenseId: expense.id }); await getQueue('maintenance').add(
'expense-dedup-scan',
{ expenseId: expense.id },
{ jobId: `expense-dedup-scan:${expense.id}` },
);
} catch (err) { } catch (err) {
logger.warn({ err, expenseId: expense.id }, 'Failed to enqueue expense-dedup-scan'); logger.warn({ err, expenseId: expense.id }, 'Failed to enqueue expense-dedup-scan');
} }

View File

@@ -98,13 +98,20 @@ export async function requestGdprExport(input: RequestExportInput): Promise<Requ
userAgent: input.userAgent, userAgent: input.userAgent,
}); });
await getQueue('export').add('gdpr-export', { // Stable jobId: exportId is unique per request — dedup is guaranteed
exportId: row.id, // because a second enqueue with the same exportId would either be
portId: input.portId, // rejected (in-flight) or no-op (completed). concurrency-auditor C-2.
clientId: input.clientId, await getQueue('export').add(
emailToClient: input.emailToClient, 'gdpr-export',
emailOverride: input.emailOverride ?? null, {
}); exportId: row.id,
portId: input.portId,
clientId: input.clientId,
emailToClient: input.emailToClient,
emailOverride: input.emailOverride ?? null,
},
{ jobId: `gdpr-export:${row.id}` },
);
return { export: row }; return { export: row };
} }

View File

@@ -48,14 +48,18 @@ export async function sendInquiryNotifications(params: InquiryNotificationParams
: 'sales@portnimara.com'; : 'sales@portnimara.com';
const emailQueue = getQueue('email'); const emailQueue = getQueue('email');
await emailQueue.add('send-inquiry-confirmation', { await emailQueue.add(
to: clientEmail, 'send-inquiry-confirmation',
firstName, {
mooringNumber, to: clientEmail,
contactEmail, firstName,
portId, mooringNumber,
portName: 'Port Nimara', // future: resolve from getPortBrandingConfig contactEmail,
}); portId,
portName: 'Port Nimara', // future: resolve from getPortBrandingConfig
},
{ jobId: `send-inquiry-confirmation:${interestId}` },
);
} catch (err) { } catch (err) {
logger.error({ err, interestId }, 'Failed to queue client confirmation email'); logger.error({ err, interestId }, 'Failed to queue client confirmation email');
} }
@@ -115,16 +119,22 @@ export async function sendInquiryNotifications(params: InquiryNotificationParams
await Promise.all( await Promise.all(
externalEmails.map((externalEmail) => externalEmails.map((externalEmail) =>
emailQueue.add('send-inquiry-sales-notification', { emailQueue.add(
to: externalEmail, 'send-inquiry-sales-notification',
fullName: clientFullName, {
email: clientEmail, to: externalEmail,
phone: clientPhone, fullName: clientFullName,
mooringNumber, email: clientEmail,
crmUrl, phone: clientPhone,
portId, mooringNumber,
portName: 'Port Nimara', crmUrl,
}), portId,
portName: 'Port Nimara',
},
// Per-recipient per-interest jobId so a public-form retry
// doesn't fan out duplicate sales notifications.
{ jobId: `send-inquiry-sales-notification:${interestId}:${externalEmail}` },
),
), ),
); );
} }

View File

@@ -589,7 +589,16 @@ export async function sendInvoice(id: string, portId: string, meta: AuditMeta) {
// remains intact; downstream consumers can decide whether to render // remains intact; downstream consumers can decide whether to render
// an external document, link to the in-app view, or wait for the // an external document, link to the in-app view, or wait for the
// admin-uploaded AcroForm-fill feature to ship. // admin-uploaded AcroForm-fill feature to ship.
await getQueue('email').add('send-invoice', { invoiceId: id, portId }); // Stable jobId for natural dedup: a double-click on the Send button
// or a webhook retry on the upstream caller can fire this twice. BullMQ
// rejects a duplicate `jobId` while the original is still queued or
// active, so we get at-most-once email per invoice-send action.
// concurrency-auditor C-2.
await getQueue('email').add(
'send-invoice',
{ invoiceId: id, portId },
{ jobId: `send-invoice:${id}` },
);
// Update status to 'sent' // Update status to 'sent'
const [updated] = await db const [updated] = await db
@@ -718,10 +727,17 @@ export async function detectOverdue(portId: string) {
daysPastDue, daysPastDue,
}); });
await getQueue('notifications').add('invoice-overdue-notify', { // Stable jobId: detectOverdue runs daily; if it fires twice in
invoiceId: inv.id, // the same UTC day (e.g. a manual re-trigger after a worker
portId, // restart) we don't want duplicate overdue emails. Per-day key
}); // gives idempotency for the daily fire while letting tomorrow's
// run re-notify if the invoice still hasn't been paid.
const dayKey = new Date().toISOString().slice(0, 10);
await getQueue('notifications').add(
'invoice-overdue-notify',
{ invoiceId: inv.id, portId },
{ jobId: `invoice-overdue-notify:${inv.id}:${dayKey}` },
);
logger.info( logger.info(
{ invoiceId: inv.id, invoiceNumber: inv.invoiceNumber, portId }, { invoiceId: inv.id, invoiceNumber: inv.invoiceNumber, portId },

View File

@@ -163,7 +163,11 @@ export async function createNotification(
if (shouldEmail) { if (shouldEmail) {
const queue = getQueue('notifications'); const queue = getQueue('notifications');
await queue.add('send-notification-email', { notificationId: notif.id }); await queue.add(
'send-notification-email',
{ notificationId: notif.id },
{ jobId: `send-notification-email:${notif.id}` },
);
} }
return notif; return notif;

View File

@@ -56,13 +56,22 @@ export async function dispatchWebhookEvent(
}) })
.returning({ id: webhookDeliveries.id }); .returning({ id: webhookDeliveries.id });
await queue.add('deliver', { // Stable jobId off the delivery row's UUID — the row exists once
webhookId: webhook.id, // per (webhook, event-instance) so this naturally dedups a
portId, // double-dispatch of the same internal event without blocking
event: webhookEvent, // legitimate retries (those re-enqueue from the worker with the
deliveryId: delivery!.id, // attempt count instead of going through this service).
payload, await queue.add(
}); 'deliver',
{
webhookId: webhook.id,
portId,
event: webhookEvent,
deliveryId: delivery!.id,
payload,
},
{ jobId: `deliver:${delivery!.id}` },
);
} }
} catch (err) { } catch (err) {
// Never block callers - log and swallow // Never block callers - log and swallow

View File

@@ -320,13 +320,17 @@ export async function redeliverWebhookDelivery(
.returning(); .returning();
const queue = getQueue('webhooks'); const queue = getQueue('webhooks');
await queue.add('deliver', { await queue.add(
webhookId, 'deliver',
portId, {
event: source.eventType, webhookId,
deliveryId: next!.id, portId,
payload: replayPayload, event: source.eventType,
}); deliveryId: next!.id,
payload: replayPayload,
},
{ jobId: `deliver:${next!.id}` },
);
void createAuditLog({ void createAuditLog({
userId: meta.userId, userId: meta.userId,
@@ -371,13 +375,17 @@ export async function sendTestWebhook(portId: string, webhookId: string, eventTy
// Enqueue the job // Enqueue the job
const queue = getQueue('webhooks'); const queue = getQueue('webhooks');
await queue.add('deliver', { await queue.add(
webhookId, 'deliver',
portId, {
event: eventType, webhookId,
deliveryId: delivery!.id, portId,
payload: delivery!.payload, event: eventType,
}); deliveryId: delivery!.id,
payload: delivery!.payload,
},
{ jobId: `deliver:${delivery!.id}` },
);
return { deliveryId: delivery!.id, status: 'queued' }; return { deliveryId: delivery!.id, status: 'queued' };
} }

View File

@@ -166,6 +166,7 @@ describe('requestGdprExport', () => {
expect(add).toHaveBeenCalledWith( expect(add).toHaveBeenCalledWith(
'gdpr-export', 'gdpr-export',
expect.objectContaining({ exportId: row.id, emailToClient: true }), expect.objectContaining({ exportId: row.id, emailToClient: true }),
expect.objectContaining({ jobId: `gdpr-export:${row.id}` }),
); );
// Cleanup the mock so other tests don't see a stubbed queue. // Cleanup the mock so other tests don't see a stubbed queue.