feat(tenancies-p3): webhook auto-create on signed Reservation Agreement + first-insert flip

- berth-tenancies.service.ts: autoCreatePendingTenancies(portId, interestId, opts)
  loops over interest_berths WHERE is_in_eoi_bundle=true and mints ONE
  pending tenancy per in-bundle berth. Wrapped in pg_advisory_xact_lock
  per port + idempotent skip when a (pending|active) tenancy already
  exists for the berth (webhook retry-safe). Each insert audit-logged
  + emits berth_tenancy:created socket event.
- createPending: same advisory-lock + tx pattern, additionally calls
  enableTenanciesModule(portId) so the FIRST manual tenancy in a port
  lazily flips tenancies_module_enabled=true (idempotent UPSERT, no-op
  on subsequent inserts).
- handleDocumentCompleted: branch on reservation_agreement completion
  gates on isTenanciesModuleEnabled, then calls autoCreatePendingTenancies
  with the just-committed signedFileId. Per design §"When disabled":
  stage advance + reservationDocStatus flip still fire when the module
  is off; only the tenancy mint is skipped.
- 5-case integration test covering bundle expansion, idempotent retry,
  empty-bundle no-op, missing-interest no-op, and the first-insert
  module-enable side effect.

Verified: tsc clean, 1485/1485 vitest (5 new cases).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-25 15:14:37 +02:00
parent ccc775dc66
commit 20549fb22e
3 changed files with 364 additions and 25 deletions

View File

@@ -1,4 +1,4 @@
import { and, eq, isNull } from 'drizzle-orm'; import { and, eq, inArray, isNull, sql } from 'drizzle-orm';
import { db } from '@/lib/db'; import { db } from '@/lib/db';
import { berthTenancies, type BerthTenancy } from '@/lib/db/schema/tenancies'; import { berthTenancies, type BerthTenancy } from '@/lib/db/schema/tenancies';
import { berths } from '@/lib/db/schema/berths'; import { berths } from '@/lib/db/schema/berths';
@@ -6,6 +6,7 @@ import { clients } from '@/lib/db/schema/clients';
import { files } from '@/lib/db/schema/documents'; import { files } from '@/lib/db/schema/documents';
import { yachts } from '@/lib/db/schema/yachts'; import { yachts } from '@/lib/db/schema/yachts';
import { companyMemberships } from '@/lib/db/schema/companies'; import { companyMemberships } from '@/lib/db/schema/companies';
import { interests, interestBerths } from '@/lib/db/schema/interests';
import { buildListQuery } from '@/lib/db/query-builder'; import { buildListQuery } from '@/lib/db/query-builder';
import { createAuditLog, type AuditMeta } from '@/lib/audit'; import { createAuditLog, type AuditMeta } from '@/lib/audit';
import { ConflictError, NotFoundError, ValidationError } from '@/lib/errors'; import { ConflictError, NotFoundError, ValidationError } from '@/lib/errors';
@@ -118,45 +119,60 @@ export async function createPending(
// z.input is too loose to satisfy that. // z.input is too loose to satisfy that.
const parsed = createPendingSchema.parse(data); const parsed = createPendingSchema.parse(data);
const [tenancy] = await db // Lazy module-enable: wrap in tx + advisory lock so the first-ever
.insert(berthTenancies) // tenancy for this port flips `tenancies_module_enabled=true` atomically
.values({ // with the insert. Subsequent inserts no-op the enable call (idempotent
portId, // UPSERT in the service).
berthId: parsed.berthId, const tenancy = await db.transaction(async (tx) => {
clientId: parsed.clientId, await tx.execute(sql`SELECT pg_advisory_xact_lock(hashtextextended(${portId}, 0))`);
yachtId: parsed.yachtId,
interestId: parsed.interestId ?? null, const [row] = await tx
status: 'pending', .insert(berthTenancies)
startDate: parsed.startDate, .values({
tenureType: parsed.tenureType, portId,
notes: parsed.notes ?? null, berthId: parsed.berthId,
createdBy: meta.userId, clientId: parsed.clientId,
}) yachtId: parsed.yachtId,
.returning(); interestId: parsed.interestId ?? null,
status: 'pending',
startDate: parsed.startDate,
tenureType: parsed.tenureType,
notes: parsed.notes ?? null,
createdBy: meta.userId,
})
.returning();
// First insert in this port? Flip the module on. We import lazily to
// avoid a circular import (tenancies-module imports berthTenancies).
const { enableTenanciesModule } = await import('@/lib/services/tenancies-module.service');
await enableTenanciesModule(portId);
return row!;
});
void createAuditLog({ void createAuditLog({
userId: meta.userId, userId: meta.userId,
portId, portId,
action: 'create', action: 'create',
entityType: 'berth_tenancy', entityType: 'berth_tenancy',
entityId: tenancy!.id, entityId: tenancy.id,
newValue: { newValue: {
berthId: tenancy!.berthId, berthId: tenancy.berthId,
clientId: tenancy!.clientId, clientId: tenancy.clientId,
yachtId: tenancy!.yachtId, yachtId: tenancy.yachtId,
status: tenancy!.status, status: tenancy.status,
startDate: tenancy!.startDate, startDate: tenancy.startDate,
}, },
ipAddress: meta.ipAddress, ipAddress: meta.ipAddress,
userAgent: meta.userAgent, userAgent: meta.userAgent,
}); });
emitToRoom(`port:${portId}`, 'berth_tenancy:created', { emitToRoom(`port:${portId}`, 'berth_tenancy:created', {
tenancyId: tenancy!.id, tenancyId: tenancy.id,
berthId: tenancy!.berthId, berthId: tenancy.berthId,
}); });
return tenancy!; return tenancy;
} }
// ─── Activate (pending → active) ───────────────────────────────────────────── // ─── Activate (pending → active) ─────────────────────────────────────────────
@@ -382,3 +398,124 @@ export async function listTenancies(
return result; return result;
} }
// ─── Webhook auto-create (signed Reservation Agreement → pending tenancies) ─
export interface AutoCreateOptions {
signedAt: Date;
/** Documents row id that fired the webhook; for audit + linkage. */
sourceDocumentId: string;
/** Signed-PDF files row id; mirrored to tenancies.contractFileId when set. */
signedFileId: string | null;
}
/**
* Webhook-driven branch: a `reservation_agreement` document just landed
* `completed`. Materialize one **pending** tenancy per berth covered by the
* interest's EOI bundle (`interest_berths.is_in_eoi_bundle = true`). The rep
* then confirms start date + tenure type via the entity-tab UI to flip
* `pending → active`.
*
* Caller is responsible for gating on `isTenanciesModuleEnabled(portId)` —
* this function does NOT check the flag itself (per design line 132: the
* webhook caller short-circuits when the module is off). Service-level
* idempotency: skips any berth that already has a non-terminal tenancy in
* `(pending, active)` so a webhook re-delivery never double-mints.
*
* Returns the newly-inserted rows (empty array when nothing to mint).
*/
export async function autoCreatePendingTenancies(
portId: string,
interestId: string,
opts: AutoCreateOptions,
): Promise<BerthTenancy[]> {
const interest = await db.query.interests.findFirst({
where: and(eq(interests.id, interestId), eq(interests.portId, portId)),
columns: { id: true, clientId: true, yachtId: true },
});
if (!interest) return [];
if (!interest.yachtId) return [];
const bundleBerths = await db
.select({ berthId: interestBerths.berthId })
.from(interestBerths)
.where(and(eq(interestBerths.interestId, interestId), eq(interestBerths.isInEoiBundle, true)));
if (bundleBerths.length === 0) return [];
// Skip berths that already have a live (pending|active) tenancy — the
// webhook may fire again after a retry and we never want to double-mint.
const existingLive = await db
.select({ berthId: berthTenancies.berthId })
.from(berthTenancies)
.where(
and(
eq(berthTenancies.portId, portId),
inArray(berthTenancies.status, ['pending', 'active']),
inArray(
berthTenancies.berthId,
bundleBerths.map((b) => b.berthId),
),
),
);
const liveBerthIds = new Set(existingLive.map((r) => r.berthId));
const toInsert = bundleBerths.map((b) => b.berthId).filter((id) => !liveBerthIds.has(id));
if (toInsert.length === 0) return [];
// pg_advisory_xact_lock keyed off the port keeps concurrent webhooks
// for the same port from racing on the per-berth idempotency check —
// the partial unique index `idx_bt_active` is the real safety net but
// the lock spares us the unique-violation roundtrip on the hot path.
const inserted = await db.transaction(async (tx) => {
await tx.execute(sql`SELECT pg_advisory_xact_lock(hashtextextended(${portId}, 0))`);
const rows = await tx
.insert(berthTenancies)
.values(
toInsert.map((berthId) => ({
portId,
berthId,
clientId: interest.clientId,
yachtId: interest.yachtId!,
interestId,
status: 'pending',
startDate: opts.signedAt,
tenureType: 'permanent',
contractFileId: opts.signedFileId,
notes: `Auto-created from signed Reservation Agreement (document ${opts.sourceDocumentId.slice(0, 8)}).`,
createdBy: 'system',
})),
)
.returning();
return rows;
});
for (const row of inserted) {
void createAuditLog({
userId: 'system',
portId,
action: 'create',
entityType: 'berth_tenancy',
entityId: row.id,
newValue: {
berthId: row.berthId,
clientId: row.clientId,
yachtId: row.yachtId,
interestId: row.interestId,
status: row.status,
source: 'reservation_agreement_signed',
sourceDocumentId: opts.sourceDocumentId,
},
ipAddress: '0.0.0.0',
userAgent: 'webhook',
});
emitToRoom(`port:${portId}`, 'berth_tenancy:created', {
tenancyId: row.id,
berthId: row.berthId,
});
}
return inserted;
}

View File

@@ -1660,6 +1660,36 @@ export async function handleDocumentCompleted(eventData: { documentId: string; p
void import('@/lib/services/berth-rules-engine').then(({ evaluateRule }) => void import('@/lib/services/berth-rules-engine').then(({ evaluateRule }) =>
evaluateRule('contract_signed', doc.interestId!, doc.portId, systemMeta), evaluateRule('contract_signed', doc.interestId!, doc.portId, systemMeta),
); );
// Tenancies P3 — auto-create pending tenancies (one per in-bundle berth)
// when the module is enabled for this port. Gating is at the call site:
// disabled module = stage + docStatus updates still fire, only the
// tenancy mint is skipped (per docs/tenancies-design.md §"When disabled").
void (async () => {
try {
const { isTenanciesModuleEnabled } =
await import('@/lib/services/tenancies-module.service');
if (!(await isTenanciesModuleEnabled(doc.portId))) return;
const { autoCreatePendingTenancies } =
await import('@/lib/services/berth-tenancies.service');
// Re-read signedFileId from the post-commit row; the in-tx update
// above sets it, but doc was loaded before completion.
const fresh = await db.query.documents.findFirst({
where: eq(documents.id, doc.id),
columns: { signedFileId: true },
});
await autoCreatePendingTenancies(doc.portId, doc.interestId!, {
signedAt: new Date(),
sourceDocumentId: doc.id,
signedFileId: fresh?.signedFileId ?? null,
});
} catch (err) {
logger.error(
{ err, documentId: doc.id, interestId: doc.interestId, portId: doc.portId },
'autoCreatePendingTenancies failed during reservation_agreement completion',
);
}
})();
} }
// Update interest if contract type. Outcome flip to 'won' is a separate // Update interest if contract type. Outcome flip to 'won' is a separate

View File

@@ -0,0 +1,172 @@
import { afterEach, beforeEach, describe, expect, it } from 'vitest';
import { and, eq } from 'drizzle-orm';
import { db } from '@/lib/db';
import { interests, interestBerths } from '@/lib/db/schema/interests';
import { berthTenancies } from '@/lib/db/schema/tenancies';
import { systemSettings } from '@/lib/db/schema/system';
import { autoCreatePendingTenancies } from '@/lib/services/berth-tenancies.service';
import {
enableTenanciesModule,
isTenanciesModuleEnabled,
} from '@/lib/services/tenancies-module.service';
import { makeBerth, makeClient, makePort, makeYacht } from '../helpers/factories';
async function seedInterestWithBundleBerths(
portId: string,
bundleCount: number,
): Promise<{ interestId: string; clientId: string; yachtId: string; berthIds: string[] }> {
const client = await makeClient({ portId });
const yacht = await makeYacht({ portId, ownerType: 'client', ownerId: client.id });
const [interest] = await db
.insert(interests)
.values({
portId,
clientId: client.id,
yachtId: yacht.id,
pipelineStage: 'reservation',
outcome: 'open',
})
.returning();
const berthIds: string[] = [];
for (let i = 0; i < bundleCount; i++) {
const b = await makeBerth({ portId });
berthIds.push(b.id);
await db.insert(interestBerths).values({
interestId: interest!.id,
berthId: b.id,
isInEoiBundle: true,
isPrimary: i === 0,
isSpecificInterest: true,
});
}
return { interestId: interest!.id, clientId: client.id, yachtId: yacht.id, berthIds };
}
async function disableModule(portId: string): Promise<void> {
await db
.insert(systemSettings)
.values({ key: 'tenancies_module_enabled', portId, value: false })
.onConflictDoUpdate({
target: [systemSettings.key, systemSettings.portId],
set: { value: false, updatedAt: new Date() },
});
}
describe('autoCreatePendingTenancies', () => {
let portId: string;
beforeEach(async () => {
const port = await makePort();
portId = port.id;
// Start each test with the module explicitly disabled so the lazy
// auto-enable path doesn't taint the next test's port.
await disableModule(portId);
});
afterEach(async () => {
// Make sure no stale advisory locks from a failed test linger.
});
it('mints one pending tenancy per in-bundle berth', async () => {
await enableTenanciesModule(portId);
const { interestId, berthIds } = await seedInterestWithBundleBerths(portId, 3);
const result = await autoCreatePendingTenancies(portId, interestId, {
signedAt: new Date('2026-01-15'),
sourceDocumentId: 'doc-fixture-123',
signedFileId: null,
});
expect(result).toHaveLength(3);
expect(result.every((r) => r.status === 'pending')).toBe(true);
expect(result.every((r) => r.interestId === interestId)).toBe(true);
expect(result.every((r) => r.contractFileId === null)).toBe(true);
expect(result.map((r) => r.berthId).sort()).toEqual(berthIds.sort());
});
it('skips berths that already have a live tenancy (idempotent under retry)', async () => {
await enableTenanciesModule(portId);
const { interestId, berthIds } = await seedInterestWithBundleBerths(portId, 2);
const first = await autoCreatePendingTenancies(portId, interestId, {
signedAt: new Date(),
sourceDocumentId: 'doc-1',
signedFileId: null,
});
expect(first).toHaveLength(2);
const replay = await autoCreatePendingTenancies(portId, interestId, {
signedAt: new Date(),
sourceDocumentId: 'doc-1',
signedFileId: null,
});
expect(replay).toHaveLength(0);
const allRows = await db
.select()
.from(berthTenancies)
.where(and(eq(berthTenancies.portId, portId), eq(berthTenancies.interestId, interestId)));
expect(allRows).toHaveLength(2);
expect(allRows.map((r) => r.berthId).sort()).toEqual(berthIds.sort());
});
it('returns empty when the interest has no in-bundle berths', async () => {
await enableTenanciesModule(portId);
const client = await makeClient({ portId });
const yacht = await makeYacht({ portId, ownerType: 'client', ownerId: client.id });
const [interest] = await db
.insert(interests)
.values({
portId,
clientId: client.id,
yachtId: yacht.id,
pipelineStage: 'reservation',
outcome: 'open',
})
.returning();
const result = await autoCreatePendingTenancies(portId, interest!.id, {
signedAt: new Date(),
sourceDocumentId: 'doc-empty',
signedFileId: null,
});
expect(result).toEqual([]);
});
it('does not mint when the interest is missing (deleted before webhook fires)', async () => {
await enableTenanciesModule(portId);
const result = await autoCreatePendingTenancies(portId, 'nonexistent-interest', {
signedAt: new Date(),
sourceDocumentId: 'doc-x',
signedFileId: null,
});
expect(result).toEqual([]);
});
});
describe('isTenanciesModuleEnabled (P3 lazy auto-enable on first manual createPending)', () => {
it('flips to enabled after first manual createPending', async () => {
const port = await makePort();
const portId = port.id;
await disableModule(portId);
expect(await isTenanciesModuleEnabled(portId)).toBe(false);
const client = await makeClient({ portId });
const yacht = await makeYacht({ portId, ownerType: 'client', ownerId: client.id });
const berth = await makeBerth({ portId });
const { createPending } = await import('@/lib/services/berth-tenancies.service');
await createPending(
portId,
{
berthId: berth.id,
clientId: client.id,
yachtId: yacht.id,
startDate: new Date(),
},
{ userId: 'system', portId, ipAddress: '0.0.0.0', userAgent: 'test' },
);
expect(await isTenanciesModuleEnabled(portId)).toBe(true);
});
});