fix(documents): idempotency, perf, contract pipeline, observability

- A1: idempotency gate in handleDocumentCompleted (prevents duplicate files on Documenso retry)
- A3: LEFT JOIN port_id move to outer WHERE (uses idx_docs_signed_file_id)
- G-C5: contract_sent / contract_signed auto-advance triggers in sendDocument + handleDocumentCompleted
- 0-byte signed PDF guard before storage.put
- portId in outer catch + poll worker
- Sanitize storagePath/storageBucket in aggregated files API
- Audit log for handleDocumentCompleted file insert
- Replace em-dashes in aggregated group labels with colons
- G-I6: delete orphaned hub-counts route + getHubTabCounts service fn

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-11 13:56:46 +02:00
parent c0e5af8b92
commit c761b4b911
6 changed files with 138 additions and 123 deletions

View File

@@ -1,4 +1,4 @@
import { and, count, desc, eq, gte, inArray, isNull, lt, lte, ne, sql, exists } from 'drizzle-orm';
import { and, desc, eq, gte, inArray, isNull, lt, lte, ne, sql, exists } from 'drizzle-orm';
import { db } from '@/lib/db';
import {
@@ -378,50 +378,6 @@ export async function listDealDocumentsForBerth(
}));
}
// ─── Hub tab counts ───────────────────────────────────────────────────────────
export interface HubTabCounts {
all: number;
in_progress: number;
eoi_queue: number;
awaiting_them: number;
awaiting_me: number;
completed: number;
expired: number;
}
/**
* Compute hub tab counts in a single roundtrip per tab. Uses
* idx_docs_status_port for cheap aggregation.
*/
export async function getHubTabCounts(
portId: string,
currentUserEmail: string | undefined,
): Promise<HubTabCounts> {
async function tabCount(tab: ListDocumentsInput['tab']): Promise<number> {
const filters: ReturnType<typeof and>[] = [eq(documents.portId, portId)];
filters.push(...buildHubTabFilters(tab, currentUserEmail));
const [row] = await db
.select({ count: count() })
.from(documents)
.where(and(...filters));
return row?.count ?? 0;
}
const [all, in_progress, eoi_queue, awaiting_them, awaiting_me, completed, expired] =
await Promise.all([
tabCount('all'),
tabCount('in_progress'),
tabCount('eoi_queue'),
tabCount('awaiting_them'),
tabCount('awaiting_me'),
tabCount('completed'),
tabCount('expired'),
]);
return { all, in_progress, eoi_queue, awaiting_them, awaiting_me, completed, expired };
}
// ─── Get by ID ────────────────────────────────────────────────────────────────
export async function getDocumentById(id: string, portId: string) {
@@ -799,6 +755,19 @@ export async function sendForSigning(documentId: string, portId: string, meta: A
// Advance pipeline stage to eoi_sent (no-op if already further along).
void advanceStageIfBehind(interest.id, portId, 'eoi_sent', meta, 'EOI sent for signing');
// G-C5: reservation agreements drive the contract_sent stage. The EOI
// and contract flows share `sendForSigning`, so we differentiate by
// documentType here rather than splitting the entry point.
if (doc.documentType === 'reservation_agreement') {
void advanceStageIfBehind(
interest.id,
portId,
'contract_sent',
meta,
'Reservation agreement sent',
);
}
}
// Create document event
@@ -1116,6 +1085,14 @@ export async function handleDocumentCompleted(eventData: { documentId: string; p
const doc = await resolveWebhookDocument(eventData.documentId, eventData.portId);
if (!doc) return;
// A1: Idempotency gate. Documenso retries DOCUMENT_COMPLETED on receiver
// 5xx (and the poll worker also reconciles). Without this guard, a second
// delivery re-runs downloadSignedPdf + storage.put + db.insert(files) and
// then clobbers the previous signedFileId on the UPDATE — leaking the
// first file as an orphan blob with no DB pointer. Once we have a signed
// file id we are done.
if (doc.status === 'completed' && doc.signedFileId) return;
// BR-022: Download signed PDF and store in MinIO
const port = await db.query.ports.findFirst({ where: eq(ports.id, doc.portId) });
if (!port) {
@@ -1125,6 +1102,16 @@ export async function handleDocumentCompleted(eventData: { documentId: string; p
try {
const signedPdfBuffer = await downloadSignedPdf(eventData.documentId);
// Guard: a 0-byte response from Documenso would otherwise persist a
// permanent corrupt signedFileId pointing at a blob with no content.
// Refuse and let the next retry / poll-worker pass re-fetch.
if (signedPdfBuffer.length === 0) {
throw new CodedError('DOCUMENSO_EMPTY_PDF', {
internalMessage: `Documenso returned 0-byte signed PDF for documensoId=${eventData.documentId}`,
});
}
const fileId = crypto.randomUUID();
const storagePath = buildStoragePath(port.slug, 'eoi-signed', doc.id, fileId, 'pdf');
@@ -1194,8 +1181,55 @@ export async function handleDocumentCompleted(eventData: { documentId: string; p
.set({ contractFileId: fileRecord!.id, updatedAt: new Date() })
.where(eq(berthReservations.id, doc.reservationId));
}
// Audit log: the webhook just minted a new signed-PDF file row owned by
// 'system'. Without this entry the file appears in the aggregated view
// with no provenance trail; auditors need to see who/what wrote it.
void createAuditLog({
userId: 'system',
portId: doc.portId,
action: 'create',
entityType: 'file',
entityId: fileRecord!.id,
newValue: {
filename: fileRecord!.filename,
mimeType: 'application/pdf',
size: signedPdfBuffer.length,
documentId: doc.id,
source: 'documenso_completion',
},
ipAddress: '0.0.0.0',
userAgent: 'webhook',
});
// G-C5: reservation agreement signing-complete → contract_signed.
// Fired here (not below in the eoi-only branch) so contract pipeline
// tracks reality the same way EOIs do via the eoi_signed advance.
if (doc.documentType === 'reservation_agreement' && doc.interestId) {
const systemMeta: AuditMeta = {
userId: 'system',
portId: doc.portId,
ipAddress: '0.0.0.0',
userAgent: 'webhook',
};
void advanceStageIfBehind(
doc.interestId,
doc.portId,
'contract_signed',
systemMeta,
'Reservation agreement signed',
);
// Dynamic import mirrors the eoi_signed pattern below to avoid the
// berth-rules-engine module-cycle risk during cold-start.
void import('@/lib/services/berth-rules-engine').then(({ evaluateRule }) =>
evaluateRule('contract_signed', doc.interestId!, doc.portId, systemMeta),
);
}
} catch (err) {
logger.error({ err, documentId: doc.id }, 'Failed to download/store signed PDF');
logger.error(
{ err, documentId: doc.id, portId: doc.portId },
'Failed to download/store signed PDF',
);
await db
.update(documents)
.set({ status: 'completed', updatedAt: new Date() })
@@ -1874,7 +1908,7 @@ export async function listInflightWorkflowsAggregatedByEntity(
const g = await fetchWorkflowGroupRows(portId, eq(documents.companyId, id));
if (g.rows.length === 0) continue;
groups.push({
label: `FROM COMPANY ${name.toUpperCase()}`,
label: `FROM COMPANY: ${name.toUpperCase()}`,
source: 'company',
workflows: g.rows,
total: g.total,
@@ -1885,7 +1919,7 @@ export async function listInflightWorkflowsAggregatedByEntity(
const g = await fetchWorkflowGroupRows(portId, eq(documents.yachtId, id));
if (g.rows.length === 0) continue;
groups.push({
label: `FROM YACHT ${name.toUpperCase()}`,
label: `FROM YACHT: ${name.toUpperCase()}`,
source: 'yacht',
workflows: g.rows,
total: g.total,
@@ -1896,7 +1930,7 @@ export async function listInflightWorkflowsAggregatedByEntity(
const g = await fetchWorkflowGroupRows(portId, eq(documents.clientId, id));
if (g.rows.length === 0) continue;
groups.push({
label: `FROM CLIENT ${name.toUpperCase()}`,
label: `FROM CLIENT: ${name.toUpperCase()}`,
source: 'client',
workflows: g.rows,
total: g.total,