Files
pn-new-crm/src/lib/services/inquiry-notifications.service.ts
Matt b4e502fedd fix(audit-wave-11): BullMQ jobId plumbing for natural dedup
concurrency-auditor C-2: every queue.add(...) site previously enqueued
without a stable jobId, so a double-dispatch (webhook retry, double-
click on Send, scheduler tick collision) would create two queue jobs
and the downstream worker would deliver twice. BullMQ rejects a
duplicate jobId while the original is still queued or active, so a
stable per-entity key gives at-most-once semantics naturally.

Added jobIds across all 10 enqueue sites:

- email send-invoice → `send-invoice:<invoiceId>`
- notifications invoice-overdue-notify → keyed per UTC day so dupes
  collapse intra-day but tomorrow's run can re-notify if unpaid
- export gdpr-export → keyed on the exportId (unique per request)
- webhooks deliver (3 sites: dispatch, retry, test) → keyed on the
  webhook_deliveries row UUID
- maintenance expense-dedup-scan → keyed on expenseId
- notifications send-notification-email → keyed on notification id
- email send-inquiry-confirmation → keyed on interestId (1 per
  submission)
- email send-inquiry-sales-notification → keyed on interestId+email
  (1 per recipient per submission)
- reports generate-report → keyed on the generated_reports row id

Pure refactor — no UX impact. Closes the BullMQ dedup gap that was
the second half of the concurrency-auditor's CRITICAL-tier findings.

Test fixture update: gdpr-export integration test now asserts the
jobId option on the queue.add call.

Tests 1315/1315.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-13 13:02:38 +02:00

169 lines
5.4 KiB
TypeScript

import { eq } from 'drizzle-orm';
import { db } from '@/lib/db';
import { userPortRoles, roles } from '@/lib/db/schema/users';
import type { RolePermissions } from '@/lib/db/schema/users';
import { createNotification } from '@/lib/services/notifications.service';
import { getSetting } from '@/lib/services/settings.service';
import { getQueue } from '@/lib/queue';
import { logger } from '@/lib/logger';
interface InquiryNotificationParams {
portId: string;
portSlug: string;
interestId: string;
clientFullName: string;
clientEmail: string;
clientPhone: string;
mooringNumber: string | null;
firstName: string;
}
/**
* Sends inquiry notifications to all relevant parties:
* 1. Confirmation email to the client
* 2. In-app + email notifications to CRM users with interests.view permission
* 3. Email to any external recipients configured in system settings
*
* All operations are fire-and-forget (errors are logged, not thrown).
*/
export async function sendInquiryNotifications(params: InquiryNotificationParams): Promise<void> {
const {
portId,
portSlug,
interestId,
clientFullName,
clientEmail,
clientPhone,
mooringNumber,
firstName,
} = params;
// 1. Queue client confirmation email
try {
const contactEmailSetting = await getSetting('inquiry_contact_email', portId);
const contactEmail =
typeof contactEmailSetting?.value === 'string'
? contactEmailSetting.value
: 'sales@portnimara.com';
const emailQueue = getQueue('email');
await emailQueue.add(
'send-inquiry-confirmation',
{
to: clientEmail,
firstName,
mooringNumber,
contactEmail,
portId,
portName: 'Port Nimara', // future: resolve from getPortBrandingConfig
},
{ jobId: `send-inquiry-confirmation:${interestId}` },
);
} catch (err) {
logger.error({ err, interestId }, 'Failed to queue client confirmation email');
}
// 2. Notify CRM users with interests.view permission on this port.
// The previous implementation `await`ed createNotification per user,
// burning ≥3 DB round trips + 2 socket emits per call serially — a
// port with 20 users meant ~80 round trips before this public POST
// could even respond. Promise.all parallelises the DB writes; the
// socket emit fan-out is the only thing that still scales linearly,
// and that's a fire-and-forget local broadcast.
try {
const usersWithAccess = await findUsersWithInterestsPermission(portId);
const crmUrl = `/${portSlug}/interests/${interestId}`;
const description = `${clientFullName} has registered interest${
mooringNumber ? ` in Berth ${mooringNumber}` : ''
} via the website`;
const settled = await Promise.allSettled(
usersWithAccess.map((userId) =>
createNotification({
portId,
userId,
type: 'new_registration',
title: 'New Interest Registered',
description,
link: crmUrl,
entityType: 'interest',
entityId: interestId,
dedupeKey: `inquiry-${interestId}`,
}),
),
);
for (const [i, r] of settled.entries()) {
if (r.status === 'rejected') {
logger.error(
{ err: r.reason, userId: usersWithAccess[i], interestId },
'Failed to create notification for user',
);
}
}
} catch (err) {
logger.error({ err, interestId }, 'Failed to notify CRM users');
}
// 3. Notify external recipients (parallel queue enqueues).
try {
const recipientsSetting = await getSetting('inquiry_notification_recipients', portId);
const externalEmails: string[] = Array.isArray(recipientsSetting?.value)
? recipientsSetting.value.filter((v): v is string => typeof v === 'string')
: [];
if (externalEmails.length > 0) {
const emailQueue = getQueue('email');
const appUrl = process.env.APP_URL ?? '';
const crmUrl = `${appUrl}/${portSlug}/interests/${interestId}`;
await Promise.all(
externalEmails.map((externalEmail) =>
emailQueue.add(
'send-inquiry-sales-notification',
{
to: externalEmail,
fullName: clientFullName,
email: clientEmail,
phone: clientPhone,
mooringNumber,
crmUrl,
portId,
portName: 'Port Nimara',
},
// Per-recipient per-interest jobId so a public-form retry
// doesn't fan out duplicate sales notifications.
{ jobId: `send-inquiry-sales-notification:${interestId}:${externalEmail}` },
),
),
);
}
} catch (err) {
logger.error({ err, interestId }, 'Failed to notify external recipients');
}
}
/**
* Finds all user IDs on a port whose role grants `interests.view` permission.
*/
async function findUsersWithInterestsPermission(portId: string): Promise<string[]> {
const assignments = await db
.select({
userId: userPortRoles.userId,
permissions: roles.permissions,
})
.from(userPortRoles)
.innerJoin(roles, eq(userPortRoles.roleId, roles.id))
.where(eq(userPortRoles.portId, portId));
const userIds = new Set<string>();
for (const row of assignments) {
const perms = row.permissions as RolePermissions | null;
if (perms?.interests?.view) {
userIds.add(row.userId);
}
}
return Array.from(userIds);
}