fix(audit-wave-1): orphan-blob window in handleDocumentCompleted
Closes Wave 1.3 (CRITICAL). The previous storage.put → files.insert
→ documents.update sequence had two real failure modes:
1. **Orphan blob.** If storage.put succeeded but the files.insert or
documents.update failed, the blob lived forever in MinIO with no
DB pointer. Re-runs re-uploaded a new blob without cleaning up
the previous one.
2. **Zombie completed state.** The catch block at the end ran
`documents.update({status: 'completed'})` with NO signedFileId
on any failure path. The idempotency early-return at the top
requires BOTH status='completed' AND signedFileId, so retries
*did* still re-attempt — but reps saw a "completed" document
with no signed file, hiding the failure.
Fix:
- Track `putStoragePath` outside the try. After storage.put lands,
the variable holds the path; cleared once the DB commit succeeds.
- files.insert + documents.update + reservation contract mirror all
run in a single `db.transaction(...)`. Atomic commit-or-rollback.
- Catch block: compensating `storage.delete(putStoragePath)` if the
DB commit didn't land. Logs at error level on compensating-delete
failure so a human can clean up.
- Catch block no longer sets `status='completed'`. The doc stays
in its prior state; Documenso's retry (or our poll-worker) re-
attempts the full sequence safely thanks to the unchanged
idempotency gate.
Verified: tsc clean, documents-completion-auto-deposit tests all
pass (5/5).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -1116,6 +1116,12 @@ export async function handleDocumentCompleted(eventData: { documentId: string; p
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Tracked outside the try so the catch can attempt a compensating
|
||||||
|
// storage.delete if we put a blob but failed to commit the DB rows.
|
||||||
|
// Without this, partial failures left an orphan blob in storage with
|
||||||
|
// no DB pointer (the audit's orphan-blob CRITICAL).
|
||||||
|
let putStoragePath: string | null = null;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const signedPdfBuffer = await downloadSignedPdf(eventData.documentId);
|
const signedPdfBuffer = await downloadSignedPdf(eventData.documentId);
|
||||||
|
|
||||||
@@ -1137,6 +1143,7 @@ export async function handleDocumentCompleted(eventData: { documentId: string; p
|
|||||||
contentType: 'application/pdf',
|
contentType: 'application/pdf',
|
||||||
sizeBytes: signedPdfBuffer.length,
|
sizeBytes: signedPdfBuffer.length,
|
||||||
});
|
});
|
||||||
|
putStoragePath = storagePath;
|
||||||
|
|
||||||
// Resolve owner via the Owner-wins chain. The signed PDF lands in
|
// Resolve owner via the Owner-wins chain. The signed PDF lands in
|
||||||
// this owner's auto-created entity subfolder (or at root if no owner).
|
// this owner's auto-created entity subfolder (or at root if no owner).
|
||||||
@@ -1163,40 +1170,55 @@ export async function handleDocumentCompleted(eventData: { documentId: string; p
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const [fileRecord] = await db
|
// Atomic: the files row + the documents.signedFileId pointer + the
|
||||||
.insert(files)
|
// reservation contract mirror commit together. If any throws, the
|
||||||
.values({
|
// outer catch fires storage.delete on the orphan blob.
|
||||||
portId: doc.portId,
|
const fileRecord = await db.transaction(async (tx) => {
|
||||||
clientId: owner?.entityType === 'client' ? owner.entityId : (doc.clientId ?? null),
|
const [inserted] = await tx
|
||||||
companyId: owner?.entityType === 'company' ? owner.entityId : (doc.companyId ?? null),
|
.insert(files)
|
||||||
yachtId: owner?.entityType === 'yacht' ? owner.entityId : (doc.yachtId ?? null),
|
.values({
|
||||||
folderId: entityFolderId,
|
portId: doc.portId,
|
||||||
filename: `signed-${doc.id}.pdf`,
|
clientId: owner?.entityType === 'client' ? owner.entityId : (doc.clientId ?? null),
|
||||||
originalName: `signed-${doc.id}.pdf`,
|
companyId: owner?.entityType === 'company' ? owner.entityId : (doc.companyId ?? null),
|
||||||
mimeType: 'application/pdf',
|
yachtId: owner?.entityType === 'yacht' ? owner.entityId : (doc.yachtId ?? null),
|
||||||
sizeBytes: String(signedPdfBuffer.length),
|
folderId: entityFolderId,
|
||||||
storagePath,
|
filename: `signed-${doc.id}.pdf`,
|
||||||
storageBucket: env.MINIO_BUCKET,
|
originalName: `signed-${doc.id}.pdf`,
|
||||||
category: 'eoi',
|
mimeType: 'application/pdf',
|
||||||
uploadedBy: 'system',
|
sizeBytes: String(signedPdfBuffer.length),
|
||||||
})
|
storagePath,
|
||||||
.returning();
|
storageBucket: env.MINIO_BUCKET,
|
||||||
|
category: 'eoi',
|
||||||
|
uploadedBy: 'system',
|
||||||
|
})
|
||||||
|
.returning();
|
||||||
|
|
||||||
await db
|
if (!inserted) {
|
||||||
.update(documents)
|
throw new Error('files.insert returned no row');
|
||||||
.set({ status: 'completed', signedFileId: fileRecord!.id, updatedAt: new Date() })
|
}
|
||||||
.where(eq(documents.id, doc.id));
|
|
||||||
|
|
||||||
// Reservation agreements mirror their signed PDF onto
|
await tx
|
||||||
// berth_reservations.contractFileId so the portal "My Reservations" view
|
.update(documents)
|
||||||
// can resolve the contract without joining through documents.
|
.set({ status: 'completed', signedFileId: inserted.id, updatedAt: new Date() })
|
||||||
if (doc.documentType === 'reservation_agreement' && doc.reservationId) {
|
.where(eq(documents.id, doc.id));
|
||||||
const { berthReservations } = await import('@/lib/db/schema/reservations');
|
|
||||||
await db
|
// Reservation agreements mirror their signed PDF onto
|
||||||
.update(berthReservations)
|
// berth_reservations.contractFileId so the portal "My Reservations" view
|
||||||
.set({ contractFileId: fileRecord!.id, updatedAt: new Date() })
|
// can resolve the contract without joining through documents.
|
||||||
.where(eq(berthReservations.id, doc.reservationId));
|
if (doc.documentType === 'reservation_agreement' && doc.reservationId) {
|
||||||
}
|
const { berthReservations } = await import('@/lib/db/schema/reservations');
|
||||||
|
await tx
|
||||||
|
.update(berthReservations)
|
||||||
|
.set({ contractFileId: inserted.id, updatedAt: new Date() })
|
||||||
|
.where(eq(berthReservations.id, doc.reservationId));
|
||||||
|
}
|
||||||
|
|
||||||
|
return inserted;
|
||||||
|
});
|
||||||
|
|
||||||
|
// Mark as durably committed BEFORE side effects so the catch block
|
||||||
|
// doesn't try to undo a blob whose DB pointer just landed.
|
||||||
|
putStoragePath = null;
|
||||||
|
|
||||||
// Audit log: the webhook just minted a new signed-PDF file row owned by
|
// Audit log: the webhook just minted a new signed-PDF file row owned by
|
||||||
// 'system'. Without this entry the file appears in the aggregated view
|
// 'system'. Without this entry the file appears in the aggregated view
|
||||||
@@ -1206,9 +1228,9 @@ export async function handleDocumentCompleted(eventData: { documentId: string; p
|
|||||||
portId: doc.portId,
|
portId: doc.portId,
|
||||||
action: 'create',
|
action: 'create',
|
||||||
entityType: 'file',
|
entityType: 'file',
|
||||||
entityId: fileRecord!.id,
|
entityId: fileRecord.id,
|
||||||
newValue: {
|
newValue: {
|
||||||
filename: fileRecord!.filename,
|
filename: fileRecord.filename,
|
||||||
mimeType: 'application/pdf',
|
mimeType: 'application/pdf',
|
||||||
size: signedPdfBuffer.length,
|
size: signedPdfBuffer.length,
|
||||||
documentId: doc.id,
|
documentId: doc.id,
|
||||||
@@ -1246,10 +1268,31 @@ export async function handleDocumentCompleted(eventData: { documentId: string; p
|
|||||||
{ err, documentId: doc.id, portId: doc.portId },
|
{ err, documentId: doc.id, portId: doc.portId },
|
||||||
'Failed to download/store signed PDF',
|
'Failed to download/store signed PDF',
|
||||||
);
|
);
|
||||||
await db
|
|
||||||
.update(documents)
|
// Compensating delete: storage.put landed but the DB commit didn't.
|
||||||
.set({ status: 'completed', updatedAt: new Date() })
|
// Without this the blob lives forever with no row pointing at it.
|
||||||
.where(eq(documents.id, doc.id));
|
if (putStoragePath) {
|
||||||
|
try {
|
||||||
|
await (await getStorageBackend()).delete(putStoragePath);
|
||||||
|
logger.info(
|
||||||
|
{ documentId: doc.id, storagePath: putStoragePath },
|
||||||
|
'Compensating storage.delete after failed signed-PDF commit',
|
||||||
|
);
|
||||||
|
} catch (compErr) {
|
||||||
|
// We tried — log so a human can clean up the orphan if needed.
|
||||||
|
logger.error(
|
||||||
|
{ compErr, documentId: doc.id, storagePath: putStoragePath },
|
||||||
|
'Compensating storage.delete also failed — manual cleanup required',
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Critical: do NOT set documents.status = 'completed' on failure.
|
||||||
|
// The previous catch block did — which created the "completed-with-
|
||||||
|
// no-signedFileId" zombie state the audit flagged. Let the next
|
||||||
|
// Documenso retry (or our poll-worker reconciliation) re-attempt;
|
||||||
|
// the early-return idempotency gate at the top requires BOTH
|
||||||
|
// status='completed' AND signedFileId so re-runs are safe.
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update interest if eoi type
|
// Update interest if eoi type
|
||||||
|
|||||||
Reference in New Issue
Block a user