From c761b4b911278793ddd2cea2933e1dacee4a9666 Mon Sep 17 00:00:00 2001 From: Matt Date: Mon, 11 May 2026 13:56:46 +0200 Subject: [PATCH] fix(documents): idempotency, perf, contract pipeline, observability - A1: idempotency gate in handleDocumentCompleted (prevents duplicate files on Documenso retry) - A3: LEFT JOIN port_id move to outer WHERE (uses idx_docs_signed_file_id) - G-C5: contract_sent / contract_signed auto-advance triggers in sendDocument + handleDocumentCompleted - 0-byte signed PDF guard before storage.put - portId in outer catch + poll worker - Sanitize storagePath/storageBucket in aggregated files API - Audit log for handleDocumentCompleted file insert - Replace em-dashes in aggregated group labels with colons - G-I6: delete orphaned hub-counts route + getHubTabCounts service fn Co-Authored-By: Claude Opus 4.7 (1M context) --- src/app/api/v1/documents/hub-counts/route.ts | 16 --- src/jobs/processors/documenso-poll.ts | 14 +- src/lib/error-codes.ts | 6 + src/lib/services/documents.service.ts | 132 +++++++++++------- src/lib/services/files.ts | 46 ++++-- .../documents-hub-eoi-queue.test.ts | 47 +------ 6 files changed, 138 insertions(+), 123 deletions(-) delete mode 100644 src/app/api/v1/documents/hub-counts/route.ts diff --git a/src/app/api/v1/documents/hub-counts/route.ts b/src/app/api/v1/documents/hub-counts/route.ts deleted file mode 100644 index 74b19428..00000000 --- a/src/app/api/v1/documents/hub-counts/route.ts +++ /dev/null @@ -1,16 +0,0 @@ -import { NextResponse } from 'next/server'; - -import { withAuth, withPermission } from '@/lib/api/helpers'; -import { errorResponse } from '@/lib/errors'; -import { getHubTabCounts } from '@/lib/services/documents.service'; - -export const GET = withAuth( - withPermission('documents', 'view', async (_req, ctx) => { - try { - const counts = await getHubTabCounts(ctx.portId, ctx.user.email); - return NextResponse.json({ data: counts }); - } catch (error) { - return errorResponse(error); - } - }), -); diff --git a/src/jobs/processors/documenso-poll.ts b/src/jobs/processors/documenso-poll.ts index 99d0509d..5183cbdf 100644 --- a/src/jobs/processors/documenso-poll.ts +++ b/src/jobs/processors/documenso-poll.ts @@ -42,12 +42,16 @@ export async function processDocumensoPoll(): Promise { if (localSigner && localSigner.status !== 'signed') { logger.info( - { documentId: doc.id, email: remoteRecipient.email }, + { documentId: doc.id, email: remoteRecipient.email, portId: doc.portId }, 'Reconciling signed signer from poll', ); + // Thread portId from the workflow's port context so the webhook + // handlers run port-scoped lookups (resolveWebhookDocument) rather + // than the port-ambiguous fallback. await handleRecipientSigned({ documentId: doc.documensoId, recipientEmail: remoteRecipient.email, + portId: doc.portId, }); } } @@ -55,11 +59,11 @@ export async function processDocumensoPoll(): Promise { // Reconcile document status if (remoteDoc.status === 'COMPLETED' && doc.status !== 'completed') { - logger.info({ documentId: doc.id }, 'Reconciling completed document from poll'); - await handleDocumentCompleted({ documentId: doc.documensoId }); + logger.info({ documentId: doc.id, portId: doc.portId }, 'Reconciling completed document from poll'); + await handleDocumentCompleted({ documentId: doc.documensoId, portId: doc.portId }); } else if (remoteDoc.status === 'EXPIRED' && doc.status !== 'expired') { - logger.info({ documentId: doc.id }, 'Reconciling expired document from poll'); - await handleDocumentExpired({ documentId: doc.documensoId }); + logger.info({ documentId: doc.id, portId: doc.portId }, 'Reconciling expired document from poll'); + await handleDocumentExpired({ documentId: doc.documensoId, portId: doc.portId }); } } catch (err) { logger.error( diff --git a/src/lib/error-codes.ts b/src/lib/error-codes.ts index a51bc204..0f068274 100644 --- a/src/lib/error-codes.ts +++ b/src/lib/error-codes.ts @@ -172,6 +172,12 @@ export const ERROR_CODES = { status: 404, userMessage: 'That document template is missing or has been removed.', }, + DOCUMENSO_EMPTY_PDF: { + status: 502, + userMessage: + 'The signing service returned an empty PDF. Please retry, and if it keeps happening, ping an admin.', + hint: 'Documenso downloadSignedPdf returned a 0-byte buffer; do not persist as signedFileId.', + }, // ─── Send-outs / Email ────────────────────────────────────────────── EMAIL_RECIPIENT_MISSING: { diff --git a/src/lib/services/documents.service.ts b/src/lib/services/documents.service.ts index 09909a7b..e771e309 100644 --- a/src/lib/services/documents.service.ts +++ b/src/lib/services/documents.service.ts @@ -1,4 +1,4 @@ -import { and, count, desc, eq, gte, inArray, isNull, lt, lte, ne, sql, exists } from 'drizzle-orm'; +import { and, desc, eq, gte, inArray, isNull, lt, lte, ne, sql, exists } from 'drizzle-orm'; import { db } from '@/lib/db'; import { @@ -378,50 +378,6 @@ export async function listDealDocumentsForBerth( })); } -// ─── Hub tab counts ─────────────────────────────────────────────────────────── - -export interface HubTabCounts { - all: number; - in_progress: number; - eoi_queue: number; - awaiting_them: number; - awaiting_me: number; - completed: number; - expired: number; -} - -/** - * Compute hub tab counts in a single roundtrip per tab. Uses - * idx_docs_status_port for cheap aggregation. - */ -export async function getHubTabCounts( - portId: string, - currentUserEmail: string | undefined, -): Promise { - async function tabCount(tab: ListDocumentsInput['tab']): Promise { - const filters: ReturnType[] = [eq(documents.portId, portId)]; - filters.push(...buildHubTabFilters(tab, currentUserEmail)); - const [row] = await db - .select({ count: count() }) - .from(documents) - .where(and(...filters)); - return row?.count ?? 0; - } - - const [all, in_progress, eoi_queue, awaiting_them, awaiting_me, completed, expired] = - await Promise.all([ - tabCount('all'), - tabCount('in_progress'), - tabCount('eoi_queue'), - tabCount('awaiting_them'), - tabCount('awaiting_me'), - tabCount('completed'), - tabCount('expired'), - ]); - - return { all, in_progress, eoi_queue, awaiting_them, awaiting_me, completed, expired }; -} - // ─── Get by ID ──────────────────────────────────────────────────────────────── export async function getDocumentById(id: string, portId: string) { @@ -799,6 +755,19 @@ export async function sendForSigning(documentId: string, portId: string, meta: A // Advance pipeline stage to eoi_sent (no-op if already further along). void advanceStageIfBehind(interest.id, portId, 'eoi_sent', meta, 'EOI sent for signing'); + + // G-C5: reservation agreements drive the contract_sent stage. The EOI + // and contract flows share `sendForSigning`, so we differentiate by + // documentType here rather than splitting the entry point. + if (doc.documentType === 'reservation_agreement') { + void advanceStageIfBehind( + interest.id, + portId, + 'contract_sent', + meta, + 'Reservation agreement sent', + ); + } } // Create document event @@ -1116,6 +1085,14 @@ export async function handleDocumentCompleted(eventData: { documentId: string; p const doc = await resolveWebhookDocument(eventData.documentId, eventData.portId); if (!doc) return; + // A1: Idempotency gate. Documenso retries DOCUMENT_COMPLETED on receiver + // 5xx (and the poll worker also reconciles). Without this guard, a second + // delivery re-runs downloadSignedPdf + storage.put + db.insert(files) and + // then clobbers the previous signedFileId on the UPDATE — leaking the + // first file as an orphan blob with no DB pointer. Once we have a signed + // file id we are done. + if (doc.status === 'completed' && doc.signedFileId) return; + // BR-022: Download signed PDF and store in MinIO const port = await db.query.ports.findFirst({ where: eq(ports.id, doc.portId) }); if (!port) { @@ -1125,6 +1102,16 @@ export async function handleDocumentCompleted(eventData: { documentId: string; p try { const signedPdfBuffer = await downloadSignedPdf(eventData.documentId); + + // Guard: a 0-byte response from Documenso would otherwise persist a + // permanent corrupt signedFileId pointing at a blob with no content. + // Refuse and let the next retry / poll-worker pass re-fetch. + if (signedPdfBuffer.length === 0) { + throw new CodedError('DOCUMENSO_EMPTY_PDF', { + internalMessage: `Documenso returned 0-byte signed PDF for documensoId=${eventData.documentId}`, + }); + } + const fileId = crypto.randomUUID(); const storagePath = buildStoragePath(port.slug, 'eoi-signed', doc.id, fileId, 'pdf'); @@ -1194,8 +1181,55 @@ export async function handleDocumentCompleted(eventData: { documentId: string; p .set({ contractFileId: fileRecord!.id, updatedAt: new Date() }) .where(eq(berthReservations.id, doc.reservationId)); } + + // Audit log: the webhook just minted a new signed-PDF file row owned by + // 'system'. Without this entry the file appears in the aggregated view + // with no provenance trail; auditors need to see who/what wrote it. + void createAuditLog({ + userId: 'system', + portId: doc.portId, + action: 'create', + entityType: 'file', + entityId: fileRecord!.id, + newValue: { + filename: fileRecord!.filename, + mimeType: 'application/pdf', + size: signedPdfBuffer.length, + documentId: doc.id, + source: 'documenso_completion', + }, + ipAddress: '0.0.0.0', + userAgent: 'webhook', + }); + + // G-C5: reservation agreement signing-complete → contract_signed. + // Fired here (not below in the eoi-only branch) so contract pipeline + // tracks reality the same way EOIs do via the eoi_signed advance. + if (doc.documentType === 'reservation_agreement' && doc.interestId) { + const systemMeta: AuditMeta = { + userId: 'system', + portId: doc.portId, + ipAddress: '0.0.0.0', + userAgent: 'webhook', + }; + void advanceStageIfBehind( + doc.interestId, + doc.portId, + 'contract_signed', + systemMeta, + 'Reservation agreement signed', + ); + // Dynamic import mirrors the eoi_signed pattern below to avoid the + // berth-rules-engine module-cycle risk during cold-start. + void import('@/lib/services/berth-rules-engine').then(({ evaluateRule }) => + evaluateRule('contract_signed', doc.interestId!, doc.portId, systemMeta), + ); + } } catch (err) { - logger.error({ err, documentId: doc.id }, 'Failed to download/store signed PDF'); + logger.error( + { err, documentId: doc.id, portId: doc.portId }, + 'Failed to download/store signed PDF', + ); await db .update(documents) .set({ status: 'completed', updatedAt: new Date() }) @@ -1874,7 +1908,7 @@ export async function listInflightWorkflowsAggregatedByEntity( const g = await fetchWorkflowGroupRows(portId, eq(documents.companyId, id)); if (g.rows.length === 0) continue; groups.push({ - label: `FROM COMPANY — ${name.toUpperCase()}`, + label: `FROM COMPANY: ${name.toUpperCase()}`, source: 'company', workflows: g.rows, total: g.total, @@ -1885,7 +1919,7 @@ export async function listInflightWorkflowsAggregatedByEntity( const g = await fetchWorkflowGroupRows(portId, eq(documents.yachtId, id)); if (g.rows.length === 0) continue; groups.push({ - label: `FROM YACHT — ${name.toUpperCase()}`, + label: `FROM YACHT: ${name.toUpperCase()}`, source: 'yacht', workflows: g.rows, total: g.total, @@ -1896,7 +1930,7 @@ export async function listInflightWorkflowsAggregatedByEntity( const g = await fetchWorkflowGroupRows(portId, eq(documents.clientId, id)); if (g.rows.length === 0) continue; groups.push({ - label: `FROM CLIENT — ${name.toUpperCase()}`, + label: `FROM CLIENT: ${name.toUpperCase()}`, source: 'client', workflows: g.rows, total: g.total, diff --git a/src/lib/services/files.ts b/src/lib/services/files.ts index 4451f075..6df27019 100644 --- a/src/lib/services/files.ts +++ b/src/lib/services/files.ts @@ -280,10 +280,23 @@ export async function getFileById(id: string, portId: string) { // ─── Aggregated Projection ──────────────────────────────────────────────────── +/** + * Row shape returned by the aggregated projection. Note this intentionally + * omits `storagePath` and `storageBucket` — those are internal storage + * implementation details and must not leak out of the API to rep clients. + * Callers that need to download a file must use the documents/file + * download endpoint, which presigns from the bucket using the id, not the + * raw path. + */ +export type AggregatedFileRow = Omit< + typeof files.$inferSelect, + 'storagePath' | 'storageBucket' +> & { signedFromDocumentId: string | null }; + export interface AggregatedFileGroup { label: string; source: 'direct' | 'client' | 'company' | 'yacht'; - files: Array; + files: AggregatedFileRow[]; total: number; } @@ -332,7 +345,7 @@ export async function listFilesAggregatedByEntity( const g = await fetchGroupRows(portId, eq(files.companyId, id), GROUP_LIMIT); if (g.rows.length === 0) continue; groups.push({ - label: `FROM COMPANY — ${name.toUpperCase()}`, + label: `FROM COMPANY: ${name.toUpperCase()}`, source: 'company', files: g.rows, total: g.total, @@ -343,7 +356,7 @@ export async function listFilesAggregatedByEntity( const g = await fetchGroupRows(portId, eq(files.yachtId, id), GROUP_LIMIT); if (g.rows.length === 0) continue; groups.push({ - label: `FROM YACHT — ${name.toUpperCase()}`, + label: `FROM YACHT: ${name.toUpperCase()}`, source: 'yacht', files: g.rows, total: g.total, @@ -354,7 +367,7 @@ export async function listFilesAggregatedByEntity( const g = await fetchGroupRows(portId, eq(files.clientId, id), GROUP_LIMIT); if (g.rows.length === 0) continue; groups.push({ - label: `FROM CLIENT — ${name.toUpperCase()}`, + label: `FROM CLIENT: ${name.toUpperCase()}`, source: 'client', files: g.rows, total: g.total, @@ -515,9 +528,14 @@ async function fetchGroupRows( predicate: ReturnType, limit: number, ): Promise<{ - rows: Array; + rows: AggregatedFileRow[]; total: number; }> { + // A3: keep the LEFT JOIN's ON clause minimal so the planner can use the + // point-lookup index `idx_docs_signed_file_id` on the join, and apply the + // port_id residual in the WHERE (with the `OR d.id IS NULL` clause so the + // LEFT-JOIN semantics still preserve unjoined file rows). With port_id in + // the ON we used to fall back to `idx_docs_port` which is a wide-range scan. const rows = await db .select({ id: files.id, @@ -530,19 +548,25 @@ async function fetchGroupRows( originalName: files.originalName, mimeType: files.mimeType, sizeBytes: files.sizeBytes, - storagePath: files.storagePath, - storageBucket: files.storageBucket, + // storagePath + storageBucket intentionally omitted — see AggregatedFileRow doc. category: files.category, uploadedBy: files.uploadedBy, createdAt: files.createdAt, // Reverse-link: if any document row has this file as its signed_file_id, - // surface that document's id. LEFT JOIN preserves files with no workflow link. - // Defense-in-depth: portId filter on both the join condition and the outer where. + // surface that document's id. signedFromDocumentId: documents.id, }) .from(files) - .leftJoin(documents, and(eq(documents.signedFileId, files.id), eq(documents.portId, portId))) - .where(and(eq(files.portId, portId), predicate)) + .leftJoin(documents, eq(documents.signedFileId, files.id)) + .where( + and( + eq(files.portId, portId), + predicate, + // Defense-in-depth: keep the cross-port-leakage guard on the joined + // doc row but allow unjoined files (id IS NULL). + or(eq(documents.portId, portId), isNull(documents.id)), + ), + ) .orderBy(desc(files.createdAt)) .limit(limit); diff --git a/tests/integration/documents-hub-eoi-queue.test.ts b/tests/integration/documents-hub-eoi-queue.test.ts index 81afb52a..5974d4fd 100644 --- a/tests/integration/documents-hub-eoi-queue.test.ts +++ b/tests/integration/documents-hub-eoi-queue.test.ts @@ -4,16 +4,19 @@ * Verifies that: * - `listDocuments` with tab='eoi_queue' returns only EOI docs in * draft/sent/partially_signed status - * - `getHubTabCounts` reports the correct eoi_queue count * - Other doc types (NDA, contract, welcome_letter) are excluded * - Completed/expired EOIs are excluded (those belong to other tabs) + * + * (Note: `getHubTabCounts` and the /hub-counts route were removed when the + * hub rebuild dropped the count-strip KPI surface — the count assertions + * that used to live here went with them.) */ import { describe, it, expect } from 'vitest'; import { db } from '@/lib/db'; import { documents } from '@/lib/db/schema/documents'; -import { getHubTabCounts, listDocuments } from '@/lib/services/documents.service'; +import { listDocuments } from '@/lib/services/documents.service'; import { makePort, makeClient } from '../helpers/factories'; describe('documents hub — eoi_queue tab', () => { @@ -84,43 +87,6 @@ describe('documents hub — eoi_queue tab', () => { expect(docs.every((d) => ['sent', 'partially_signed'].includes(d.status))).toBe(true); }); - it('reports the correct eoi_queue count via getHubTabCounts', async () => { - const port = await makePort(); - const client = await makeClient({ portId: port.id }); - - await db.insert(documents).values([ - { - portId: port.id, - clientId: client.id, - documentType: 'eoi', - title: 'EOI A', - status: 'draft', - createdBy: 'seed', - }, - { - portId: port.id, - clientId: client.id, - documentType: 'eoi', - title: 'EOI B', - status: 'sent', - createdBy: 'seed', - }, - { - portId: port.id, - clientId: client.id, - documentType: 'contract', - title: 'Contract X', - status: 'sent', - createdBy: 'seed', - }, - ]); - - const counts = await getHubTabCounts(port.id, undefined); - expect(counts.eoi_queue).toBe(2); - // The contract should not bump eoi_queue. - expect(counts.all).toBe(3); - }); - it('returns an empty list when no in-flight EOIs exist', async () => { const port = await makePort(); const client = await makeClient({ portId: port.id }); @@ -146,8 +112,5 @@ describe('documents hub — eoi_queue tab', () => { {}, ); expect(result.data).toHaveLength(0); - - const counts = await getHubTabCounts(port.id, undefined); - expect(counts.eoi_queue).toBe(0); }); });