From ecf49be18cf8f7b1ddb4d8d4e454bf98bf75765f Mon Sep 17 00:00:00 2001 From: Matt Date: Wed, 13 May 2026 12:34:23 +0200 Subject: [PATCH] fix(audit-wave-10): concurrency hardening (concurrency-auditor) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Close the CRITICAL + HIGH-tractable race conditions the concurrency-auditor flagged. The wide-impact items (BullMQ jobId plumbing — C-2; webhook outbound retry idempotency keys; etc.) span too many call sites for a single contained wave and stay deferred. **C-1 — handleDocumentCompleted concurrent-retry orphan-blob** Wave 1 fixed the compensating-delete on single-process failure but the idempotency gate at line 1110 reads `doc.status` outside any row lock. Two webhook deliveries arriving in parallel both pass the gate, both storage.put + db.insert(files), and the losing files row orphans its blob since documents.signed_file_id only points at one. Now the transaction at line 1176 SELECTs the document `FOR UPDATE` and re-checks the gate; if a concurrent worker already completed, throws a sentinel `DocumentAlreadyCompletedError` which the outer catch recognizes and runs the compensating storage.delete at info level (not error). Net effect: at-most-once signed-PDF persistence even under Documenso 5xx-then-retry storms. **H-1 — moveFolder cycle check race** Two concurrent folder moves (A → B and B → A) in READ COMMITTED can each pass the cycle check against pre-state and both commit, leaving A↔B in the tree. Add a per-port `pg_advisory_xact_lock` at the top of the move transaction so the walk-and-write is atomic per port. Lock auto-releases on tx end; no impact on cross-port folder ops. **H-3 — upsertInterestBerth 23505 → generic 500** Two concurrent `setPrimaryBerth` calls hit `idx_interest_berths_one_primary` and the loser surfaced as a generic 500. Catch the 23505 + constraint name and remap to ConflictError so the UI gets a "Another rep changed the primary berth at the same time. 
Refresh and try again." toast. **M-2 — username uniqueness 23505 → generic 500** Same TOCTOU shape: pre-check at me/route.ts:132 says "available", the UPDATE then fails at the partial unique index. Catch 23505 + `idx_user_profiles_username_unique` and remap to ConflictError. Tests 1315/1315. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/app/api/v1/me/route.ts | 29 +++++++++-- src/lib/services/document-folders.service.ts | 10 +++- src/lib/services/documents.service.ts | 52 ++++++++++++++++++-- src/lib/services/interest-berths.service.ts | 30 +++++++++-- 4 files changed, 107 insertions(+), 14 deletions(-) diff --git a/src/app/api/v1/me/route.ts b/src/app/api/v1/me/route.ts index e1994755..a5bbeaa8 100644 --- a/src/app/api/v1/me/route.ts +++ b/src/app/api/v1/me/route.ts @@ -162,11 +162,24 @@ export const PATCH = withAuth(async (req, ctx: AuthContext) => { updates.preferences = merged; } - const [updated] = await db - .update(userProfiles) - .set(updates) - .where(eq(userProfiles.userId, ctx.userId)) - .returning(); + // concurrency-auditor M-2: pre-check at line 132-139 is TOCTOU + // against `idx_user_profiles_username_unique`. Two concurrent claims + // on the same username will see "available" in their own pre-check + // and the loser's UPDATE fails with 23505 — surface that as + // ConflictError rather than letting it bubble as a generic 500. 
+ let updated; + try { + [updated] = await db + .update(userProfiles) + .set(updates) + .where(eq(userProfiles.userId, ctx.userId)) + .returning(); + } catch (err) { + if (isUsernameUniqueConflict(err)) { + throw new ConflictError('That username is already taken.'); + } + throw err; + } return NextResponse.json({ data: { @@ -184,3 +197,9 @@ export const PATCH = withAuth(async (req, ctx: AuthContext) => { return errorResponse(error); } }); + +function isUsernameUniqueConflict(err: unknown): boolean { + if (typeof err !== 'object' || err === null) return false; + const e = err as { code?: string; constraint_name?: string }; + return e.code === '23505' && e.constraint_name === 'idx_user_profiles_username_unique'; +} diff --git a/src/lib/services/document-folders.service.ts b/src/lib/services/document-folders.service.ts index 1e1bb375..fd84ac5c 100644 --- a/src/lib/services/document-folders.service.ts +++ b/src/lib/services/document-folders.service.ts @@ -1,4 +1,4 @@ -import { and, asc, eq } from 'drizzle-orm'; +import { and, asc, eq, sql } from 'drizzle-orm'; import { db } from '@/lib/db'; import { documentFolders, documents, files, type DocumentFolder } from '@/lib/db/schema/documents'; @@ -224,6 +224,14 @@ export async function moveFolder( // write is atomic per move attempt. try { return await db.transaction(async (tx) => { + // Serialize all folder-move work for this port via a per-port + // advisory lock. The cycle check walks the ancestor chain with + // multiple SELECTs, and READ COMMITTED doesn't see other in-flight + // updates without an explicit lock. Two concurrent moves (A → B + // and B → A) would otherwise each see the pre-state and both + // commit, leaving an A↔B cycle. The lock auto-releases on tx end. 
+ await tx.execute(sql`SELECT pg_advisory_xact_lock(hashtext(${portId} || ':folder-move'))`); + if (newParentId !== null) { const newParent = await tx.query.documentFolders.findFirst({ where: and(eq(documentFolders.id, newParentId), eq(documentFolders.portId, portId)), diff --git a/src/lib/services/documents.service.ts b/src/lib/services/documents.service.ts index cddc331f..08b6d058 100644 --- a/src/lib/services/documents.service.ts +++ b/src/lib/services/documents.service.ts @@ -1097,6 +1097,19 @@ async function resolveDocumentOwner( return null; } +/** + * Sentinel thrown by `handleDocumentCompleted`'s in-tx race-check when a + * concurrent webhook delivery has already committed the signed-PDF + * file. Caught by the outer try so we can run the compensating blob + * delete + log at info level instead of error. + */ +class DocumentAlreadyCompletedError extends Error { + constructor() { + super('document already marked completed by a concurrent webhook'); + this.name = 'DocumentAlreadyCompletedError'; + } +} + export async function handleDocumentCompleted(eventData: { documentId: string; portId?: string }) { const doc = await resolveWebhookDocument(eventData.documentId, eventData.portId); if (!doc) return; @@ -1173,7 +1186,28 @@ export async function handleDocumentCompleted(eventData: { documentId: string; p // Atomic: the files row + the documents.signedFileId pointer + the // reservation contract mirror commit together. If any throws, the // outer catch fires storage.delete on the orphan blob. + // + // concurrency-auditor C-1: re-check the idempotency gate INSIDE the + // tx with SELECT … FOR UPDATE so two near-simultaneous webhook + // retries can't both pass the read-outside-lock gate at line 1110 + // and both insert into `files` (the losing row would orphan its blob + // since `documents.signed_file_id` only points at one). 
The outer + // catch handles the "we put a blob but a concurrent worker won the + // race" cleanup via the existing putStoragePath finalizer. const fileRecord = await db.transaction(async (tx) => { + const [locked] = await tx + .select({ status: documents.status, signedFileId: documents.signedFileId }) + .from(documents) + .where(eq(documents.id, doc.id)) + .for('update'); + + if (locked && locked.status === 'completed' && locked.signedFileId) { + // Concurrent webhook beat us — abort so the outer catch deletes + // the duplicate blob we just put into storage. Throw a sentinel + // we recognize so we don't log it as an error. + throw new DocumentAlreadyCompletedError(); + } + const [inserted] = await tx .insert(files) .values({ @@ -1264,10 +1298,20 @@ export async function handleDocumentCompleted(eventData: { documentId: string; p ); } } catch (err) { - logger.error( - { err, documentId: doc.id, portId: doc.portId }, - 'Failed to download/store signed PDF', - ); + // Distinguish "we lost the concurrent race" from a real failure — + // the loser of the SELECT FOR UPDATE re-check should clean up its + // blob silently, not log an error. + if (err instanceof DocumentAlreadyCompletedError) { + logger.info( + { documentId: doc.id, portId: doc.portId }, + 'Webhook race lost — another worker already committed the signed PDF; deleting our duplicate blob', + ); + } else { + logger.error( + { err, documentId: doc.id, portId: doc.portId }, + 'Failed to download/store signed PDF', + ); + } // Compensating delete: storage.put landed but the DB commit didn't. // Without this the blob lives forever with no row pointing at it. 
diff --git a/src/lib/services/interest-berths.service.ts b/src/lib/services/interest-berths.service.ts index e6ec290f..b094ca8d 100644 --- a/src/lib/services/interest-berths.service.ts +++ b/src/lib/services/interest-berths.service.ts @@ -21,7 +21,7 @@ import { and, desc, eq, inArray } from 'drizzle-orm'; import { db } from '@/lib/db'; import { interestBerths, interests, type InterestBerth } from '@/lib/db/schema/interests'; import { berths } from '@/lib/db/schema/berths'; -import { CodedError, NotFoundError } from '@/lib/errors'; +import { CodedError, ConflictError, NotFoundError } from '@/lib/errors'; import type { AuditMeta } from '@/lib/audit'; type DbOrTx = typeof db | Parameters<Parameters<typeof db.transaction>[0]>[0]; @@ -183,9 +183,31 @@ export async function upsertInterestBerth( berthId: string, opts: AddOrUpdateOpts = {}, ): Promise { - return db.transaction(async (tx) => { - return upsertInterestBerthTx(tx, interestId, berthId, opts); - }); + // concurrency-auditor H-3: two concurrent setPrimaryBerth calls on + // the same interest hit `idx_interest_berths_one_primary` (partial + // unique on `is_primary=true`). The loser surfaced as a generic + // 500 because the 23505 wasn't translated. Catch and remap to a + // ConflictError so the UI gets a "another rep just changed the + // primary berth" toast instead. + try { + return await db.transaction(async (tx) => { + return upsertInterestBerthTx(tx, interestId, berthId, opts); + }); + } catch (err) { + if (isPrimaryBerthConflict(err)) { + throw new ConflictError( + 'Another rep changed the primary berth at the same time. Refresh and try again.', + ); + } + throw err; + } +} + +function isPrimaryBerthConflict(err: unknown): boolean { + if (typeof err !== 'object' || err === null) return false; + // postgres.js surfaces the constraint name in `constraint_name`. + const e = err as { code?: string; constraint_name?: string }; + return e.code === '23505' && e.constraint_name === 'idx_interest_berths_one_primary'; } /**