fix(audit): berth rules/recommender — M4 (bundle-wide status), M5 (berth_unlinked target), M20/L27 (interest_berths invariant + cross-port guard), L3 (recommender stage-scale), L4 (dead branch)

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-02 12:59:12 +02:00
parent 4084029962
commit 70bf26aea1
5 changed files with 163 additions and 58 deletions

View File

@@ -212,6 +212,25 @@ const STAGE_ORDER: Record<string, number> = {
/** Stage at which a berth is "in late stage" (Tier D when active). */
const LATE_STAGE_THRESHOLD = STAGE_ORDER.deposit_paid!; // 5
/**
* SQL `CASE` that maps `i.pipeline_stage` → the {@link STAGE_ORDER} rank,
* defaulting to 0 for unknown stages.
*
* Audit L3: the recommender's SQL aggregates (`max_active_stage`,
* `fallthrough_max_stage`) previously hard-coded their OWN 1-7 ordering
* (`reservation=5, deposit_paid=6`) that diverged from this JS map
* (`reservation=4, deposit_paid=5`). `classifyTier`/`computeHeat` compare those
* SQL values against `LATE_STAGE_THRESHOLD` (derived from STAGE_ORDER), so a
* `reservation`-stage interest (SQL 5) tripped `>= 5` and got classified Tier D
* — suppressed under the default `tier_ladder_hide_late_stage`, a full stage
* early. Generating the CASE from `STAGE_ORDER` makes it the single source of
* truth so SQL and JS can never drift again.
*/
function stageRankCaseSql(column: string): ReturnType<typeof sql> {
const whens = Object.entries(STAGE_ORDER).map(([stage, rank]) => sql`WHEN ${stage} THEN ${rank}`);
return sql`CASE ${sql.raw(column)} ${sql.join(whens, sql` `)} ELSE 0 END`;
}
export type Tier = 'A' | 'B' | 'C' | 'D';
interface TierInputs {
@@ -237,9 +256,10 @@ export function classifyTier(t: TierInputs): Tier {
const normStatus = (t.status ?? '').toLowerCase();
if (normStatus === 'sold') return 'D';
if (t.activeInterestCount > 0 && t.maxActiveStage >= LATE_STAGE_THRESHOLD) return 'D';
if (normStatus === 'under offer' || normStatus === 'under_offer') {
return t.activeInterestCount > 0 ? 'C' : 'C';
}
// Audit L4: collapsed the dead `activeInterestCount > 0 ? 'C' : 'C'` ternary
// and dropped the unreachable `'under offer'` (space) literal — canonical
// status is always `under_offer`.
if (normStatus === 'under_offer') return 'C';
if (t.activeInterestCount > 0) return 'C';
if (t.lostCount > 0) return 'B';
return 'A';
@@ -554,32 +574,16 @@ export async function recommendBerths(args: RecommendBerthsArgs): Promise<Recomm
WHERE i.outcome IS NOT NULL AND (i.outcome::text LIKE 'lost%' OR i.outcome = 'cancelled')
) AS lost_count,
COALESCE(
MAX(CASE i.pipeline_stage
WHEN 'enquiry' THEN 1
WHEN 'nurturing' THEN 2
WHEN 'qualified' THEN 3
WHEN 'eoi' THEN 4
WHEN 'reservation' THEN 5
WHEN 'deposit_paid' THEN 6
WHEN 'contract' THEN 7
ELSE 0 END
) FILTER (WHERE i.archived_at IS NULL AND i.outcome IS NULL),
MAX(${stageRankCaseSql('i.pipeline_stage')})
FILTER (WHERE i.archived_at IS NULL AND i.outcome IS NULL),
0
) AS max_active_stage,
MAX(i.outcome_at) FILTER (
WHERE i.outcome IS NOT NULL AND (i.outcome::text LIKE 'lost%' OR i.outcome = 'cancelled')
) AS latest_fallthrough_at,
COALESCE(
MAX(CASE i.pipeline_stage
WHEN 'enquiry' THEN 1
WHEN 'nurturing' THEN 2
WHEN 'qualified' THEN 3
WHEN 'eoi' THEN 4
WHEN 'reservation' THEN 5
WHEN 'deposit_paid' THEN 6
WHEN 'contract' THEN 7
ELSE 0 END
) FILTER (WHERE i.outcome IS NOT NULL AND (i.outcome::text LIKE 'lost%' OR i.outcome = 'cancelled')),
MAX(${stageRankCaseSql('i.pipeline_stage')})
FILTER (WHERE i.outcome IS NOT NULL AND (i.outcome::text LIKE 'lost%' OR i.outcome = 'cancelled')),
0
) AS fallthrough_max_stage,
-- COUNT(ib.berth_id) (not COUNT(*)) so a berth with no junction

View File

@@ -1,7 +1,7 @@
import { and, eq, sql } from 'drizzle-orm';
import { db } from '@/lib/db';
import { interests } from '@/lib/db/schema/interests';
import { interestBerths, interests } from '@/lib/db/schema/interests';
import { berths } from '@/lib/db/schema/berths';
import { systemSettings } from '@/lib/db/schema/system';
import { createAuditLog, type AuditMeta } from '@/lib/audit';
@@ -55,6 +55,30 @@ const DEFAULT_RULES: Record<BerthRuleTrigger, RuleConfig> = {
berth_unlinked: { mode: 'off', targetStatus: 'available' },
};
// ─── Bundle-aware triggers (audit M4) ───────────────────────────────────────────
//
// A deal progressing (EOI sent/signed, reservation signed, deposit received,
// contract signed, won) commits the WHOLE EOI bundle, not just the primary
// berth. For these "status-advancing" triggers we flip every berth covered by
// the signature (`interest_berths.is_in_eoi_bundle = true`); otherwise a
// multi-berth bundle leaves its siblings on `available`/`under_offer` and they
// stay publicly visible + pitchable while the deal is locked up.
//
// The release/unlink triggers are deliberately NOT in this set:
// • `interest_archived` / `deal_lost` target `available` and run in
// suggest/auto-but-rarely-on modes; freeing the whole bundle is handled
// elsewhere (smart-archive decision log) and isn't this trigger's job.
// • `berth_unlinked` targets exactly one berth — the just-unlinked one
// (audit M5) — via `targetBerthIdOverride`, never the bundle.
const BUNDLE_TRIGGERS: ReadonlySet<BerthRuleTrigger> = new Set<BerthRuleTrigger>([
'eoi_sent',
'eoi_signed',
'reservation_signed',
'deposit_received',
'contract_signed',
'interest_completed',
]);
// ─── Config ───────────────────────────────────────────────────────────────────
async function getRulesConfig(portId: string): Promise<Record<BerthRuleTrigger, RuleConfig>> {
@@ -85,6 +109,14 @@ export async function evaluateRule(
interestId: string,
portId: string,
meta: AuditMeta,
/**
* Force the rule onto a specific berth instead of resolving the target from
* the interest's primary/bundle. Required by `berth_unlinked` (audit M5):
* the junction row is deleted before the rule fires, so resolving via
* `getPrimaryBerth` would target a DIFFERENT still-linked berth. The caller
* passes the just-unlinked berthId here and evaluates BEFORE the delete.
*/
targetBerthIdOverride?: string,
): Promise<BerthRuleResult> {
const interest = await db.query.interests.findFirst({
where: and(eq(interests.id, interestId), eq(interests.portId, portId)),
@@ -94,17 +126,79 @@ export async function evaluateRule(
return { action: 'none' };
}
// Rule evaluation targets the interest's primary berth (plan §3.4) -
// resolved via interest_berths rather than the legacy column.
const primaryBerth = await getPrimaryBerth(interestId);
const targetBerthId = primaryBerth?.berthId;
if (!targetBerthId) {
// Resolve which berth(s) this rule targets:
// • explicit override (berth_unlinked) → exactly that berth;
// • status-advancing bundle trigger → every berth covered by the EOI
// signature (`is_in_eoi_bundle = true`) so siblings don't go stale
// (audit M4);
// • everything else → the interest's primary berth (plan §3.4), resolved
// via interest_berths rather than the legacy column.
let targetBerthIds: string[];
if (targetBerthIdOverride) {
targetBerthIds = [targetBerthIdOverride];
} else if (BUNDLE_TRIGGERS.has(trigger)) {
const bundleRows = await db
.select({ berthId: interestBerths.berthId })
.from(interestBerths)
.where(
and(eq(interestBerths.interestId, interestId), eq(interestBerths.isInEoiBundle, true)),
);
targetBerthIds = bundleRows.map((r) => r.berthId);
if (targetBerthIds.length === 0) {
// No bundle rows (e.g. a single primary that somehow lost its bundle
// flag, or a berthless interest). Fall back to the primary so the
// common single-berth case still advances.
const primaryBerth = await getPrimaryBerth(interestId);
if (primaryBerth?.berthId) targetBerthIds = [primaryBerth.berthId];
}
} else {
const primaryBerth = await getPrimaryBerth(interestId);
targetBerthIds = primaryBerth?.berthId ? [primaryBerth.berthId] : [];
}
if (targetBerthIds.length === 0) {
return { action: 'none' };
}
const rulesConfig = await getRulesConfig(portId);
const rule = rulesConfig[trigger];
for (const targetBerthId of targetBerthIds) {
await applyRuleToBerth(trigger, rule, interestId, portId, targetBerthId, meta);
}
if (rule.mode === 'off') {
return { action: 'none' };
}
if (rule.mode === 'auto') {
// Preserve the original contract: auto mode reports 'applied' (with the
// rule's target status) regardless of whether any individual berth was a
// no-op idempotent re-fire.
return { action: 'applied', newStatus: rule.targetStatus };
}
// suggest mode - the decision-trace audit already records the suggestion.
return {
action: 'suggested',
newStatus: rule.targetStatus,
message: `Suggested status change to "${rule.targetStatus}" based on trigger "${trigger}"`,
};
}
/**
* Apply a resolved rule to a single berth under the per-berth advisory lock +
* idempotency pattern. Factored out of {@link evaluateRule} so the multi-berth
* bundle path (audit M4) reuses the exact same locking/auditing/socket emit
* for every berth it touches.
*/
async function applyRuleToBerth(
trigger: BerthRuleTrigger,
rule: RuleConfig,
interestId: string,
portId: string,
targetBerthId: string,
meta: AuditMeta,
): Promise<void> {
// Decision-trace audit: ALWAYS record what we decided to do (or not do),
// including the rule mode, so admins can debug "why didn't this fire?" /
// "why did this fire" without grepping server logs. Tagged `berth_rule_decision`
@@ -127,7 +221,7 @@ export async function evaluateRule(
});
if (rule.mode === 'off') {
return { action: 'none' };
return;
}
if (rule.mode === 'auto') {
@@ -205,13 +299,8 @@ export async function evaluateRule(
});
}
return { action: 'applied', newStatus: rule.targetStatus };
return;
}
// suggest mode - the decision-trace audit above already records the suggestion.
return {
action: 'suggested',
newStatus: rule.targetStatus,
message: `Suggested status change to "${rule.targetStatus}" based on trigger "${trigger}"`,
};
}

View File

@@ -21,7 +21,7 @@ import { and, eq, ne, sql } from 'drizzle-orm';
import { db } from '@/lib/db';
import type { Tx } from '@/lib/db/utils';
import { clients } from '@/lib/db/schema/clients';
import { interests, interestBerths } from '@/lib/db/schema/interests';
import { interests } from '@/lib/db/schema/interests';
import { berths } from '@/lib/db/schema/berths';
import { yachts } from '@/lib/db/schema/yachts';
import { portalUsers } from '@/lib/db/schema/portal';
@@ -29,6 +29,7 @@ import { documents } from '@/lib/db/schema/documents';
import { createAuditLog, type AuditMeta } from '@/lib/audit';
import { activeInterestsWhere } from '@/lib/services/active-interest';
import { transferOwnershipTx } from '@/lib/services/yachts.service';
import { upsertInterestBerthTx } from '@/lib/services/interest-berths.service';
import { ConflictError, NotFoundError } from '@/lib/errors';
import type { ArchiveMetadata } from '@/lib/services/client-archive.service';
@@ -380,18 +381,17 @@ async function applyReversal(
.limit(1);
if (!iv || iv.archivedAt) break;
// Idempotent re-insert: the unique index on (interestId, berthId)
// means a duplicate is a no-op via onConflictDoNothing.
await tx
.insert(interestBerths)
.values({
interestId,
berthId: r.refId,
isPrimary: false,
isSpecificInterest: true,
isInEoiBundle: false,
})
.onConflictDoNothing();
// Idempotent re-link via the canonical junction helper (audit L27):
// routes through `upsertInterestBerthTx` so the cross-port guard runs
// (the prior raw insert bypassed it) and the unique (interestId, berthId)
// index keeps a duplicate a benign merge. This row is a non-primary
// re-attach, so the primary↔bundle invariant doesn't force the bundle
// flag on — it stays an EOI-only/legal link as before.
await upsertInterestBerthTx(tx, interestId, r.refId, {
isPrimary: false,
isSpecificInterest: true,
isInEoiBundle: false,
});
// Flip berth status back to under_offer so the public map reflects
// the re-link. Only when berth is currently 'available' (sold
// berths are immutable; under_offer to another client is handled

View File

@@ -418,17 +418,24 @@ export async function removeInterestBerth(
if (!interestRow || !berthRow) {
throw new NotFoundError('interest or berth');
}
await db
.delete(interestBerths)
.where(and(eq(interestBerths.interestId, interestId), eq(interestBerths.berthId, berthId)));
// G-C4: fire the berth_unlinked berth-rule. Default mode is 'off' so this
// is a silent no-op unless an admin opted in via system_settings.berth_rules.
// Dynamic import avoids a static cycle: berth-rules-engine imports this file
// (getPrimaryBerth). meta is optional so older callers that haven't been
// threaded through can still call this without triggering the rule.
//
// Audit M5: evaluate BEFORE the delete and pass the just-unlinked `berthId`
// as an explicit target override. Firing after the delete would let the rule
// re-resolve its target via `getPrimaryBerth`, which — with the row already
// gone — points at a DIFFERENT still-linked berth and would corrupt that
// unrelated berth's status if an admin enabled auto/suggest mode.
if (meta) {
const { evaluateRule } = await import('@/lib/services/berth-rules-engine');
void evaluateRule('berth_unlinked', interestId, portId, meta);
await evaluateRule('berth_unlinked', interestId, portId, meta, berthId);
}
await db
.delete(interestBerths)
.where(and(eq(interestBerths.interestId, interestId), eq(interestBerths.berthId, berthId)));
}

View File

@@ -21,7 +21,8 @@ import { and, eq, isNull, sql } from 'drizzle-orm';
import { db } from '@/lib/db';
import { withTransaction } from '@/lib/db/utils';
import { interests, interestBerths } from '@/lib/db/schema/interests';
import { interests } from '@/lib/db/schema/interests';
import { upsertInterestBerthTx } from '@/lib/services/interest-berths.service';
import { clients, clientContacts, clientAddresses } from '@/lib/db/schema/clients';
import { berths } from '@/lib/db/schema/berths';
import { yachts, yachtOwnershipHistory } from '@/lib/db/schema/yachts';
@@ -235,12 +236,16 @@ export async function createPublicInterest(
.returning();
if (berthId) {
await tx.insert(interestBerths).values({
interestId: newInterest!.id,
berthId,
// Route through the canonical junction helper (audit M20 + L27) rather
// than a raw insert. The helper:
// • forces `is_in_eoi_bundle=true` for the primary berth, so the
// website-originated interest doesn't violate the primary↔bundle
// invariant that migration 0083 had to repair (audit M20);
// • applies the cross-port guard the raw insert bypassed (audit L27).
await upsertInterestBerthTx(tx, newInterest!.id, berthId, {
isPrimary: true,
isSpecificInterest: true,
isInEoiBundle: false,
addedBy: 'public-submission',
});
}