From 1ae5d88af4990c82e4fdef62fdbb2e3c46c17284 Mon Sep 17 00:00:00 2001 From: Matt Ciaccio Date: Wed, 6 May 2026 19:12:55 +0200 Subject: [PATCH] feat(client-archive): async Documenso voids + next-in-line sales notifications Post-archive side-effects now run with backpressure: - Documenso envelope voids enqueue to BullMQ documents queue with retry/DLQ - Released berths fan out a "next in line" notification to port users with interests.change_stage; informational only, no auto stage transitions Co-Authored-By: Claude Opus 4.7 (1M context) --- src/app/api/v1/clients/[id]/archive/route.ts | 56 ++++++++++-- src/lib/queue/workers/documents.ts | 20 +++++ .../services/next-in-line-notify.service.ts | 88 +++++++++++++++++++ 3 files changed, 159 insertions(+), 5 deletions(-) create mode 100644 src/lib/services/next-in-line-notify.service.ts diff --git a/src/app/api/v1/clients/[id]/archive/route.ts b/src/app/api/v1/clients/[id]/archive/route.ts index a5df33b..ae9dcc0 100644 --- a/src/app/api/v1/clients/[id]/archive/route.ts +++ b/src/app/api/v1/clients/[id]/archive/route.ts @@ -8,6 +8,9 @@ import { type ArchiveDecisions, } from '@/lib/services/client-archive.service'; import { getClientArchiveDossier } from '@/lib/services/client-archive-dossier.service'; +import { notifyNextInLine } from '@/lib/services/next-in-line-notify.service'; +import { getQueue } from '@/lib/queue'; +import { logger } from '@/lib/logger'; import { errorResponse, NotFoundError } from '@/lib/errors'; const decisionsSchema = z.object({ @@ -80,11 +83,54 @@ export const POST = withAuth( }, }); - // External cleanups (Documenso void) + next-in-line notifications - // are queued post-commit. v1 fires them best-effort inline; future - // iteration: enqueue to BullMQ for retry/dead-letter (see - // bulletproof-webhooks design). - // TODO(bulletproof-webhooks): move to queue. + // ─── Post-commit side-effects ──────────────────────────────────── + // 1. Documenso envelope voids → queued for retry on the documents + // queue. Permanently-failed jobs land in BullMQ's failed jobs + // list (the DLQ in admin/monitoring). + if (result.externalCleanups.length > 0) { + const queue = getQueue('documents'); + for (const c of result.externalCleanups) { + if (c.kind === 'documenso_void') { + await queue + .add('documenso-void', { + documentId: c.documentId, + documensoId: c.documensoId, + portId: ctx.portId, + }) + .catch((err) => + logger.error({ err, documentId: c.documentId }, 'Failed to enqueue Documenso void'), + ); + } + } + } + + // 2. Next-in-line notifications → fire-and-forget per released + // berth so the sales team knows who else expressed interest. + // No automatic stage transitions on the next interests; sales + // rep decides what to do. + for (const released of result.releasedBerths) { + if (released.nextInLineInterestIds.length === 0) continue; + const otherInterests = + dossier.berths + .find((b) => b.berthId === released.berthId) + ?.otherInterests.map((o) => ({ + interestId: o.interestId, + clientName: o.clientName, + pipelineStage: o.pipelineStage, + })) ?? []; + void notifyNextInLine({ + portId: ctx.portId, + berthId: released.berthId, + mooringNumber: released.mooringNumber, + archivedClientName: dossier.client.fullName, + nextInLineInterests: otherInterests, + }).catch((err) => + logger.error( + { err, berthId: released.berthId }, + 'Failed to fire next-in-line notification', + ), + ); + } return NextResponse.json({ data: result }); } catch (error) { diff --git a/src/lib/queue/workers/documents.ts b/src/lib/queue/workers/documents.ts index 62a4795..52e9a2d 100644 --- a/src/lib/queue/workers/documents.ts +++ b/src/lib/queue/workers/documents.ts @@ -15,6 +15,26 @@ export const documentsWorker = new Worker( await processDocumensoPoll(); break; } + case 'documenso-void': { + // Async cleanup of a Documenso envelope. Producers: smart-archive + // (when the operator opts to void in-flight envelopes during + // client archive). BullMQ retries with exponential backoff per + // QUEUE_CONFIGS; permanently-failed jobs land in the DLQ via + // the failed-job listener. + const { documentId, documensoId, portId } = job.data as { + documentId: string; + documensoId: string; + portId: string; + }; + if (!documensoId) { + logger.warn({ documentId }, 'documenso-void: no documensoId, skipping'); + return; + } + const { voidDocument } = await import('@/lib/services/documenso-client'); + await voidDocument(documensoId, portId); + logger.info({ documentId, documensoId, portId }, 'Documenso envelope voided'); + break; + } default: logger.warn({ jobName: job.name }, 'Unknown documents job'); } diff --git a/src/lib/services/next-in-line-notify.service.ts b/src/lib/services/next-in-line-notify.service.ts new file mode 100644 index 0000000..51ef4a5 --- /dev/null +++ b/src/lib/services/next-in-line-notify.service.ts @@ -0,0 +1,88 @@ +/** + * "Next in line" notification fan-out. + * + * After a smart-archive releases a berth back to available, the sales + * team should be told who else expressed interest in that berth so they + * can follow up. This is informational only — no automatic stage + * transitions on the next interests. + * + * Recipients = port users whose role grants `interests.change_stage` + * (the canonical "this person handles the pipeline" permission). + */ + +import { eq } from 'drizzle-orm'; + +import { db } from '@/lib/db'; +import { userPortRoles, roles, type RolePermissions } from '@/lib/db/schema/users'; +import { logger } from '@/lib/logger'; +import { createNotification } from '@/lib/services/notifications.service'; + +export interface BerthReleaseNotificationInput { + portId: string; + berthId: string; + mooringNumber: string; + archivedClientName: string; + /** ids of the next-in-line interests on this berth (with the metadata + * needed for the notification body — comes from the dossier). */ + nextInLineInterests: Array<{ + interestId: string; + clientName: string | null; + pipelineStage: string; + }>; +} + +export async function notifyNextInLine(input: BerthReleaseNotificationInput): Promise { + // 1. Resolve recipients: every port user whose role permits interests.change_stage. + const portRoleRows = await db + .select({ + userId: userPortRoles.userId, + permissions: roles.permissions, + }) + .from(userPortRoles) + .innerJoin(roles, eq(userPortRoles.roleId, roles.id)) + .where(eq(userPortRoles.portId, input.portId)); + + const salesUserIds = new Set(); + for (const r of portRoleRows) { + const perms = r.permissions as RolePermissions | null; + if (perms?.interests?.change_stage) salesUserIds.add(r.userId); + } + + if (salesUserIds.size === 0) { + logger.debug( + { portId: input.portId, berthId: input.berthId }, + 'No sales recipients for next-in-line notification', + ); + return; + } + + // 2. Build a single description listing the next interests so the + // rep can act on it without opening the berth detail page first. + const previewLines = input.nextInLineInterests.slice(0, 5).map((i) => { + const days = i.pipelineStage.replace(/_/g, ' '); + return `${i.clientName ?? '(unknown)'} — ${days}`; + }); + const more = + input.nextInLineInterests.length > 5 + ? `\n…and ${input.nextInLineInterests.length - 5} more` + : ''; + const description = input.nextInLineInterests.length + ? `${previewLines.join('\n')}${more}` + : 'No prior interests recorded — this berth is fully available again.'; + + // 3. Fire-and-forget per recipient. dedupeKey collapses duplicate + // fires within the cooldown window if multiple events queue up. + for (const userId of salesUserIds) { + void createNotification({ + portId: input.portId, + userId, + type: 'berth_released', + title: `Berth ${input.mooringNumber} released — ${input.archivedClientName} archived`, + description, + link: `/berths/${input.berthId}`, + entityType: 'berth', + entityId: input.berthId, + dedupeKey: `berth-released:${input.berthId}`, + }); + } +}