diff --git a/src/lib/queue/workers/reports.ts b/src/lib/queue/workers/reports.ts index 7284a1d2..78e04cf7 100644 --- a/src/lib/queue/workers/reports.ts +++ b/src/lib/queue/workers/reports.ts @@ -81,9 +81,11 @@ export const reportsWorker = new Worker( .returning(); if (genReport) { - await getQueue('reports').add('generate-report', { - reportJobId: genReport.id, - }); + await getQueue('reports').add( + 'generate-report', + { reportJobId: genReport.id }, + { jobId: `generate-report:${genReport.id}` }, + ); } } break; diff --git a/src/lib/services/expenses.ts b/src/lib/services/expenses.ts index df3657b1..cc7ee376 100644 --- a/src/lib/services/expenses.ts +++ b/src/lib/services/expenses.ts @@ -187,7 +187,11 @@ export async function createExpense(portId: string, data: CreateExpenseInput, me // queue-side hiccup to fail the user's create. try { const { getQueue } = await import('@/lib/queue'); - await getQueue('maintenance').add('expense-dedup-scan', { expenseId: expense.id }); + await getQueue('maintenance').add( + 'expense-dedup-scan', + { expenseId: expense.id }, + { jobId: `expense-dedup-scan:${expense.id}` }, + ); } catch (err) { logger.warn({ err, expenseId: expense.id }, 'Failed to enqueue expense-dedup-scan'); } diff --git a/src/lib/services/gdpr-export.service.ts b/src/lib/services/gdpr-export.service.ts index b1033747..579ad0f1 100644 --- a/src/lib/services/gdpr-export.service.ts +++ b/src/lib/services/gdpr-export.service.ts @@ -98,13 +98,20 @@ export async function requestGdprExport(input: RequestExportInput): Promise - emailQueue.add('send-inquiry-sales-notification', { - to: externalEmail, - fullName: clientFullName, - email: clientEmail, - phone: clientPhone, - mooringNumber, - crmUrl, - portId, - portName: 'Port Nimara', - }), + emailQueue.add( + 'send-inquiry-sales-notification', + { + to: externalEmail, + fullName: clientFullName, + email: clientEmail, + phone: clientPhone, + mooringNumber, + crmUrl, + portId, + portName: 'Port Nimara', + }, + // Per-recipient per-interest jobId so a public-form retry + // doesn't fan out duplicate sales notifications. + { jobId: `send-inquiry-sales-notification:${interestId}:${externalEmail}` }, + ), ), ); } diff --git a/src/lib/services/invoices.ts b/src/lib/services/invoices.ts index 7bf2a427..93d3acee 100644 --- a/src/lib/services/invoices.ts +++ b/src/lib/services/invoices.ts @@ -589,7 +589,16 @@ export async function sendInvoice(id: string, portId: string, meta: AuditMeta) { // remains intact; downstream consumers can decide whether to render // an external document, link to the in-app view, or wait for the // admin-uploaded AcroForm-fill feature to ship. - await getQueue('email').add('send-invoice', { invoiceId: id, portId }); + // Stable jobId for natural dedup: a double-click on the Send button + // or a webhook retry on the upstream caller can fire this twice. BullMQ + // rejects a duplicate `jobId` while the original is still queued or + // active, so we get at-most-once email per invoice-send action. + // concurrency-auditor C-2. + await getQueue('email').add( + 'send-invoice', + { invoiceId: id, portId }, + { jobId: `send-invoice:${id}` }, + ); // Update status to 'sent' const [updated] = await db @@ -718,10 +727,17 @@ export async function detectOverdue(portId: string) { daysPastDue, }); - await getQueue('notifications').add('invoice-overdue-notify', { - invoiceId: inv.id, - portId, - }); + // Stable jobId: detectOverdue runs daily; if it fires twice in + // the same UTC day (e.g. a manual re-trigger after a worker + // restart) we don't want duplicate overdue emails. Per-day key + // gives idempotency for the daily fire while letting tomorrow's + // run re-notify if the invoice still hasn't been paid. + const dayKey = new Date().toISOString().slice(0, 10); + await getQueue('notifications').add( + 'invoice-overdue-notify', + { invoiceId: inv.id, portId }, + { jobId: `invoice-overdue-notify:${inv.id}:${dayKey}` }, + ); logger.info( { invoiceId: inv.id, invoiceNumber: inv.invoiceNumber, portId }, diff --git a/src/lib/services/notifications.service.ts b/src/lib/services/notifications.service.ts index 95e75172..110c40a6 100644 --- a/src/lib/services/notifications.service.ts +++ b/src/lib/services/notifications.service.ts @@ -163,7 +163,11 @@ export async function createNotification( if (shouldEmail) { const queue = getQueue('notifications'); - await queue.add('send-notification-email', { notificationId: notif.id }); + await queue.add( + 'send-notification-email', + { notificationId: notif.id }, + { jobId: `send-notification-email:${notif.id}` }, + ); } return notif; diff --git a/src/lib/services/webhook-dispatch.ts b/src/lib/services/webhook-dispatch.ts index da1d1e54..1d08a858 100644 --- a/src/lib/services/webhook-dispatch.ts +++ b/src/lib/services/webhook-dispatch.ts @@ -56,13 +56,22 @@ export async function dispatchWebhookEvent( }) .returning({ id: webhookDeliveries.id }); - await queue.add('deliver', { - webhookId: webhook.id, - portId, - event: webhookEvent, - deliveryId: delivery!.id, - payload, - }); + // Stable jobId off the delivery row's UUID — the row exists once + // per (webhook, event-instance) so this naturally dedups a + // double-dispatch of the same internal event without blocking + // legitimate retries (those re-enqueue from the worker with the + // attempt count instead of going through this service). + await queue.add( + 'deliver', + { + webhookId: webhook.id, + portId, + event: webhookEvent, + deliveryId: delivery!.id, + payload, + }, + { jobId: `deliver:${delivery!.id}` }, + ); } } catch (err) { // Never block callers - log and swallow diff --git a/src/lib/services/webhooks.service.ts b/src/lib/services/webhooks.service.ts index 57836f3c..22b6584d 100644 --- a/src/lib/services/webhooks.service.ts +++ b/src/lib/services/webhooks.service.ts @@ -320,13 +320,17 @@ export async function redeliverWebhookDelivery( .returning(); const queue = getQueue('webhooks'); - await queue.add('deliver', { - webhookId, - portId, - event: source.eventType, - deliveryId: next!.id, - payload: replayPayload, - }); + await queue.add( + 'deliver', + { + webhookId, + portId, + event: source.eventType, + deliveryId: next!.id, + payload: replayPayload, + }, + { jobId: `deliver:${next!.id}` }, + ); void createAuditLog({ userId: meta.userId, @@ -371,13 +375,17 @@ export async function sendTestWebhook(portId: string, webhookId: string, eventTy // Enqueue the job const queue = getQueue('webhooks'); - await queue.add('deliver', { - webhookId, - portId, - event: eventType, - deliveryId: delivery!.id, - payload: delivery!.payload, - }); + await queue.add( + 'deliver', + { + webhookId, + portId, + event: eventType, + deliveryId: delivery!.id, + payload: delivery!.payload, + }, + { jobId: `deliver:${delivery!.id}` }, + ); return { deliveryId: delivery!.id, status: 'queued' }; } diff --git a/tests/integration/gdpr-export.test.ts b/tests/integration/gdpr-export.test.ts index 692e6677..b303be91 100644 --- a/tests/integration/gdpr-export.test.ts +++ b/tests/integration/gdpr-export.test.ts @@ -166,6 +166,7 @@ describe('requestGdprExport', () => { expect(add).toHaveBeenCalledWith( 'gdpr-export', expect.objectContaining({ exportId: row.id, emailToClient: true }), + expect.objectContaining({ jobId: `gdpr-export:${row.id}` }), ); // Cleanup the mock so other tests don't see a stubbed queue.