fix(audit-wave-10): concurrency hardening (concurrency-auditor)
Close the CRITICAL + HIGH-tractable race conditions the concurrency-auditor flagged. The wide-impact items (BullMQ jobId plumbing — C-2; webhook outbound retry idempotency keys; etc.) span too many call sites for a single contained wave and stay deferred.

**C-1 — handleDocumentCompleted concurrent-retry orphan-blob**
Wave 1 fixed the compensating-delete on single-process failure, but the idempotency gate at line 1110 reads `doc.status` outside any row lock. Two webhook deliveries arriving in parallel both pass the gate and both run storage.put + db.insert(files); the losing files row orphans its blob, since documents.signed_file_id only points at one. Now the transaction at line 1176 SELECTs the document `FOR UPDATE` and re-checks the gate; if a concurrent worker has already completed the document, it throws a sentinel `DocumentAlreadyCompletedError`, which the outer catch recognizes: it logs at info level (not error) and runs the compensating storage.delete. Net effect: at-most-once signed-PDF persistence even under Documenso 5xx-then-retry storms.

**H-1 — moveFolder cycle check race**
Two concurrent folder moves (A → B and B → A) in READ COMMITTED can each pass the cycle check against the pre-state and both commit, leaving an A↔B cycle in the tree. Add a per-port `pg_advisory_xact_lock` at the top of the move transaction so the walk-and-write is atomic per port. The lock auto-releases on tx end; no impact on cross-port folder ops.

**H-3 — upsertInterestBerth 23505 → generic 500**
Two concurrent `setPrimaryBerth` calls hit `idx_interest_berths_one_primary` and the loser surfaced as a generic 500. Catch the 23505 + constraint name and remap to ConflictError so the UI gets an "Another rep changed the primary berth at the same time. Refresh and try again." toast.

**M-2 — username uniqueness 23505 → generic 500**
Same TOCTOU shape: the pre-check at me/route.ts:132 says "available", then the UPDATE fails at the partial unique index. Catch 23505 + `idx_user_profiles_username_unique` and remap to ConflictError.
Tests 1315/1315. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -162,11 +162,24 @@ export const PATCH = withAuth(async (req, ctx: AuthContext) => {
|
|||||||
updates.preferences = merged;
|
updates.preferences = merged;
|
||||||
}
|
}
|
||||||
|
|
||||||
const [updated] = await db
|
// concurrency-auditor M-2: pre-check at line 132-139 is TOCTOU
|
||||||
.update(userProfiles)
|
// against `idx_user_profiles_username_unique`. Two concurrent claims
|
||||||
.set(updates)
|
// on the same username will see "available" in their own pre-check
|
||||||
.where(eq(userProfiles.userId, ctx.userId))
|
// and the loser's UPDATE fails with 23505 — surface that as
|
||||||
.returning();
|
// ConflictError rather than letting it bubble as a generic 500.
|
||||||
|
let updated;
|
||||||
|
try {
|
||||||
|
[updated] = await db
|
||||||
|
.update(userProfiles)
|
||||||
|
.set(updates)
|
||||||
|
.where(eq(userProfiles.userId, ctx.userId))
|
||||||
|
.returning();
|
||||||
|
} catch (err) {
|
||||||
|
if (isUsernameUniqueConflict(err)) {
|
||||||
|
throw new ConflictError('That username is already taken.');
|
||||||
|
}
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
|
||||||
return NextResponse.json({
|
return NextResponse.json({
|
||||||
data: {
|
data: {
|
||||||
@@ -184,3 +197,9 @@ export const PATCH = withAuth(async (req, ctx: AuthContext) => {
|
|||||||
return errorResponse(error);
|
return errorResponse(error);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
/**
 * Reports whether `err` represents the Postgres unique-violation
 * (SQLSTATE `23505`) raised by `idx_user_profiles_username_unique`.
 *
 * The error is matched either directly or anywhere along its `cause`
 * chain, so a query-layer wrapper that stores the driver error on `cause`
 * does not hide the conflict and let it bubble up as a generic failure.
 *
 * @param err - Whatever the failed UPDATE threw; may be any value.
 * @returns `true` only when both the SQLSTATE and the constraint name match.
 */
function isUsernameUniqueConflict(err: unknown): boolean {
  // Track visited objects so a self-referential `cause` cannot loop forever.
  const visited = new Set<unknown>();
  let current: unknown = err;

  while (typeof current === 'object' && current !== null && !visited.has(current)) {
    visited.add(current);
    const candidate = current as {
      code?: unknown;
      constraint_name?: unknown;
      cause?: unknown;
    };
    if (
      candidate.code === '23505' &&
      candidate.constraint_name === 'idx_user_profiles_username_unique'
    ) {
      return true;
    }
    current = candidate.cause;
  }

  return false;
}
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
import { and, asc, eq } from 'drizzle-orm';
|
import { and, asc, eq, sql } from 'drizzle-orm';
|
||||||
|
|
||||||
import { db } from '@/lib/db';
|
import { db } from '@/lib/db';
|
||||||
import { documentFolders, documents, files, type DocumentFolder } from '@/lib/db/schema/documents';
|
import { documentFolders, documents, files, type DocumentFolder } from '@/lib/db/schema/documents';
|
||||||
@@ -224,6 +224,14 @@ export async function moveFolder(
|
|||||||
// write is atomic per move attempt.
|
// write is atomic per move attempt.
|
||||||
try {
|
try {
|
||||||
return await db.transaction(async (tx) => {
|
return await db.transaction(async (tx) => {
|
||||||
|
// Serialize all folder-move work for this port via a per-port
|
||||||
|
// advisory lock. The cycle check walks the ancestor chain with
|
||||||
|
// multiple SELECTs, and READ COMMITTED doesn't see other in-flight
|
||||||
|
// updates without an explicit lock. Two concurrent moves (A → B
|
||||||
|
// and B → A) would otherwise each see the pre-state and both
|
||||||
|
// commit, leaving an A↔B cycle. The lock auto-releases on tx end.
|
||||||
|
await tx.execute(sql`SELECT pg_advisory_xact_lock(hashtext(${portId} || ':folder-move'))`);
|
||||||
|
|
||||||
if (newParentId !== null) {
|
if (newParentId !== null) {
|
||||||
const newParent = await tx.query.documentFolders.findFirst({
|
const newParent = await tx.query.documentFolders.findFirst({
|
||||||
where: and(eq(documentFolders.id, newParentId), eq(documentFolders.portId, portId)),
|
where: and(eq(documentFolders.id, newParentId), eq(documentFolders.portId, portId)),
|
||||||
|
|||||||
@@ -1097,6 +1097,19 @@ async function resolveDocumentOwner(
|
|||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Sentinel thrown by `handleDocumentCompleted`'s in-transaction race check
 * when a concurrent webhook delivery has already committed the signed-PDF
 * file. The outer `catch` identifies it with `instanceof`, logs at info
 * level instead of error, and still runs the compensating blob delete.
 */
class DocumentAlreadyCompletedError extends Error {
  constructor() {
    super('document already marked completed by a concurrent webhook');
    this.name = 'DocumentAlreadyCompletedError';
    // Keep `instanceof` reliable even if the build down-levels classes:
    // with ES5-style emit, `Error` subclasses otherwise lose their prototype.
    Object.setPrototypeOf(this, new.target.prototype);
  }
}
|
||||||
|
|
||||||
export async function handleDocumentCompleted(eventData: { documentId: string; portId?: string }) {
|
export async function handleDocumentCompleted(eventData: { documentId: string; portId?: string }) {
|
||||||
const doc = await resolveWebhookDocument(eventData.documentId, eventData.portId);
|
const doc = await resolveWebhookDocument(eventData.documentId, eventData.portId);
|
||||||
if (!doc) return;
|
if (!doc) return;
|
||||||
@@ -1173,7 +1186,28 @@ export async function handleDocumentCompleted(eventData: { documentId: string; p
|
|||||||
// Atomic: the files row + the documents.signedFileId pointer + the
|
// Atomic: the files row + the documents.signedFileId pointer + the
|
||||||
// reservation contract mirror commit together. If any throws, the
|
// reservation contract mirror commit together. If any throws, the
|
||||||
// outer catch fires storage.delete on the orphan blob.
|
// outer catch fires storage.delete on the orphan blob.
|
||||||
|
//
|
||||||
|
// concurrency-auditor C-1: re-check the idempotency gate INSIDE the
|
||||||
|
// tx with SELECT … FOR UPDATE so two near-simultaneous webhook
|
||||||
|
// retries can't both pass the read-outside-lock gate at line 1110
|
||||||
|
// and both insert into `files` (the losing row would orphan its blob
|
||||||
|
// since `documents.signed_file_id` only points at one). The outer
|
||||||
|
// catch handles the "we put a blob but a concurrent worker won the
|
||||||
|
// race" cleanup via the existing putStoragePath finalizer.
|
||||||
const fileRecord = await db.transaction(async (tx) => {
|
const fileRecord = await db.transaction(async (tx) => {
|
||||||
|
const [locked] = await tx
|
||||||
|
.select({ status: documents.status, signedFileId: documents.signedFileId })
|
||||||
|
.from(documents)
|
||||||
|
.where(eq(documents.id, doc.id))
|
||||||
|
.for('update');
|
||||||
|
|
||||||
|
if (locked && locked.status === 'completed' && locked.signedFileId) {
|
||||||
|
// Concurrent webhook beat us — abort so the outer catch deletes
|
||||||
|
// the duplicate blob we just put into storage. Throw a sentinel
|
||||||
|
// we recognize so we don't log it as an error.
|
||||||
|
throw new DocumentAlreadyCompletedError();
|
||||||
|
}
|
||||||
|
|
||||||
const [inserted] = await tx
|
const [inserted] = await tx
|
||||||
.insert(files)
|
.insert(files)
|
||||||
.values({
|
.values({
|
||||||
@@ -1264,10 +1298,20 @@ export async function handleDocumentCompleted(eventData: { documentId: string; p
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
logger.error(
|
// Distinguish "we lost the concurrent race" from a real failure —
|
||||||
{ err, documentId: doc.id, portId: doc.portId },
|
// the loser of the SELECT FOR UPDATE re-check should clean up its
|
||||||
'Failed to download/store signed PDF',
|
// blob silently, not log an error.
|
||||||
);
|
if (err instanceof DocumentAlreadyCompletedError) {
|
||||||
|
logger.info(
|
||||||
|
{ documentId: doc.id, portId: doc.portId },
|
||||||
|
'Webhook race lost — another worker already committed the signed PDF; deleting our duplicate blob',
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
logger.error(
|
||||||
|
{ err, documentId: doc.id, portId: doc.portId },
|
||||||
|
'Failed to download/store signed PDF',
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
// Compensating delete: storage.put landed but the DB commit didn't.
|
// Compensating delete: storage.put landed but the DB commit didn't.
|
||||||
// Without this the blob lives forever with no row pointing at it.
|
// Without this the blob lives forever with no row pointing at it.
|
||||||
|
|||||||
@@ -21,7 +21,7 @@ import { and, desc, eq, inArray } from 'drizzle-orm';
|
|||||||
import { db } from '@/lib/db';
|
import { db } from '@/lib/db';
|
||||||
import { interestBerths, interests, type InterestBerth } from '@/lib/db/schema/interests';
|
import { interestBerths, interests, type InterestBerth } from '@/lib/db/schema/interests';
|
||||||
import { berths } from '@/lib/db/schema/berths';
|
import { berths } from '@/lib/db/schema/berths';
|
||||||
import { CodedError, NotFoundError } from '@/lib/errors';
|
import { CodedError, ConflictError, NotFoundError } from '@/lib/errors';
|
||||||
import type { AuditMeta } from '@/lib/audit';
|
import type { AuditMeta } from '@/lib/audit';
|
||||||
|
|
||||||
type DbOrTx = typeof db | Parameters<Parameters<typeof db.transaction>[0]>[0];
|
type DbOrTx = typeof db | Parameters<Parameters<typeof db.transaction>[0]>[0];
|
||||||
@@ -183,9 +183,31 @@ export async function upsertInterestBerth(
|
|||||||
berthId: string,
|
berthId: string,
|
||||||
opts: AddOrUpdateOpts = {},
|
opts: AddOrUpdateOpts = {},
|
||||||
): Promise<InterestBerth> {
|
): Promise<InterestBerth> {
|
||||||
return db.transaction(async (tx) => {
|
// concurrency-auditor H-3: two concurrent setPrimaryBerth calls on
|
||||||
return upsertInterestBerthTx(tx, interestId, berthId, opts);
|
// the same interest hit `idx_interest_berths_one_primary` (partial
|
||||||
});
|
// unique on `is_primary=true`). The loser surfaced as a generic
|
||||||
|
// 500 because the 23505 wasn't translated. Catch and remap to a
|
||||||
|
// ConflictError so the UI gets a "another rep just changed the
|
||||||
|
// primary berth" toast instead.
|
||||||
|
try {
|
||||||
|
return await db.transaction(async (tx) => {
|
||||||
|
return upsertInterestBerthTx(tx, interestId, berthId, opts);
|
||||||
|
});
|
||||||
|
} catch (err) {
|
||||||
|
if (isPrimaryBerthConflict(err)) {
|
||||||
|
throw new ConflictError(
|
||||||
|
'Another rep changed the primary berth at the same time. Refresh and try again.',
|
||||||
|
);
|
||||||
|
}
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
 * Reports whether `err` represents the Postgres unique-violation
 * (SQLSTATE `23505`) raised by `idx_interest_berths_one_primary`.
 *
 * postgres.js surfaces the violated constraint in `constraint_name`. The
 * error is matched either directly or anywhere along its `cause` chain, so
 * a query-layer wrapper that stores the driver error on `cause` does not
 * hide the conflict and let it bubble up as a generic failure.
 *
 * @param err - Whatever the failed transaction threw; may be any value.
 * @returns `true` only when both the SQLSTATE and the constraint name match.
 */
function isPrimaryBerthConflict(err: unknown): boolean {
  // Track visited objects so a self-referential `cause` cannot loop forever.
  const visited = new Set<unknown>();
  let current: unknown = err;

  while (typeof current === 'object' && current !== null && !visited.has(current)) {
    visited.add(current);
    const candidate = current as {
      code?: unknown;
      constraint_name?: unknown;
      cause?: unknown;
    };
    if (
      candidate.code === '23505' &&
      candidate.constraint_name === 'idx_interest_berths_one_primary'
    ) {
      return true;
    }
    current = candidate.cause;
  }

  return false;
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
Reference in New Issue
Block a user