import { and, eq, sql } from 'drizzle-orm';

import { db } from '@/lib/db';
import { interests } from '@/lib/db/schema/interests';
import { berths } from '@/lib/db/schema/berths';
import { systemSettings } from '@/lib/db/schema/system';
import { createAuditLog, type AuditMeta } from '@/lib/audit';
import { emitToRoom } from '@/lib/socket/server';
import { getPrimaryBerth } from '@/lib/services/interest-berths.service';
import { logger } from '@/lib/logger';

// ─── Types ────────────────────────────────────────────────────────────────────

/** Interest lifecycle events that can drive a berth status rule. */
export type BerthRuleTrigger =
  | 'eoi_sent'
  | 'eoi_signed'
  | 'deposit_received'
  | 'contract_signed'
  | 'interest_archived'
  | 'interest_completed'
  | 'berth_unlinked';

/**
 * How a rule reacts when its trigger fires: `auto` writes the new status,
 * `suggest` only reports it back to the caller, `off` does nothing.
 */
export type BerthRuleMode = 'auto' | 'suggest' | 'off';

/** Outcome of evaluating one rule for one interest. */
export interface BerthRuleResult {
  action: 'applied' | 'suggested' | 'none';
  newStatus?: string;
  message?: string;
}

interface RuleConfig {
  mode: BerthRuleMode;
  targetStatus: string;
}

// ─── Defaults ────────────────────────────────────────────────────────────────

/** Built-in rule per trigger; used for any trigger a port has not overridden. */
const DEFAULT_RULES: Record<BerthRuleTrigger, RuleConfig> = {
  eoi_sent: { mode: 'suggest', targetStatus: 'under_offer' },
  eoi_signed: { mode: 'auto', targetStatus: 'under_offer' },
  deposit_received: { mode: 'auto', targetStatus: 'sold' },
  contract_signed: { mode: 'auto', targetStatus: 'sold' },
  interest_archived: { mode: 'suggest', targetStatus: 'available' },
  interest_completed: { mode: 'auto', targetStatus: 'sold' },
  berth_unlinked: { mode: 'off', targetStatus: 'available' },
};

const RULE_MODES: readonly BerthRuleMode[] = ['auto', 'suggest', 'off'];

// ─── Config ───────────────────────────────────────────────────────────────────

/** Runtime guard: true when `value` is one of the known rule modes. */
function isRuleMode(value: unknown): value is BerthRuleMode {
  return typeof value === 'string' && (RULE_MODES as readonly string[]).includes(value);
}

/**
 * Builds the effective rule for one trigger from its default and an
 * untrusted stored override. Each field is taken from the override only when
 * it is well-formed; otherwise the default for that field is kept. This stops
 * a partial or malformed stored rule (e.g. one without `targetStatus`) from
 * flowing into the berth status write.
 */
function resolveRule(fallback: RuleConfig, override: unknown): RuleConfig {
  if (typeof override !== 'object' || override === null) {
    return { ...fallback };
  }

  const candidate = override as Partial<Record<keyof RuleConfig, unknown>>;
  const hasTargetStatus =
    typeof candidate.targetStatus === 'string' && candidate.targetStatus.length > 0;

  return {
    mode: isRuleMode(candidate.mode) ? candidate.mode : fallback.mode,
    targetStatus: hasTargetStatus ? (candidate.targetStatus as string) : fallback.targetStatus,
  };
}

/**
 * Loads the per-port `berth_rules` system setting and merges it over
 * {@link DEFAULT_RULES}.
 *
 * @param portId - Port whose rule overrides should be read.
 * @returns A complete rule map: every trigger has a rule, taken from the
 *   stored setting where present and valid, from the defaults otherwise.
 */
async function getRulesConfig(portId: string): Promise<Record<BerthRuleTrigger, RuleConfig>> {
  const setting = await db.query.systemSettings.findFirst({
    where: and(eq(systemSettings.key, 'berth_rules'), eq(systemSettings.portId, portId)),
  });

  // The stored value is free-form; treat anything that is not an object as
  // "no overrides" instead of trusting a cast.
  const raw: unknown = setting?.value;
  const stored: Partial<Record<BerthRuleTrigger, unknown>> =
    typeof raw === 'object' && raw !== null
      ? (raw as Partial<Record<BerthRuleTrigger, unknown>>)
      : {};

  const merged = { ...DEFAULT_RULES };
  for (const trigger of Object.keys(DEFAULT_RULES) as BerthRuleTrigger[]) {
    merged[trigger] = resolveRule(DEFAULT_RULES[trigger], stored[trigger]);
  }
  return merged;
}

// ─── Audit ────────────────────────────────────────────────────────────────────

/**
 * Fire-and-forget audit write. The caller never waits on it, so a rejection
 * is caught and logged here rather than left as an unhandled rejection.
 */
function recordAudit(entry: Parameters<typeof createAuditLog>[0]): void {
  void Promise.resolve(createAuditLog(entry)).catch((err: unknown) => {
    logger.error({ err }, 'Berth-rule audit log write failed');
  });
}

// ─── Evaluate Rule ────────────────────────────────────────────────────────────

/**
 * Evaluates the berth status rule for `trigger` against the primary berth of
 * an interest.
 *
 * @param trigger - Lifecycle event that fired.
 * @param interestId - Interest the event belongs to.
 * @param portId - Port scope; the interest and the berth must both belong to it.
 * @param meta - Acting user and request details recorded on audit rows.
 * @returns `none` when the interest, its primary berth or the berth row cannot
 *   be found, or when the rule is `off`; `suggested` (with a message) in
 *   suggest mode; `applied` in auto mode, including the idempotent case where
 *   the berth already had the target status.
 */
export async function evaluateRule(
  trigger: BerthRuleTrigger,
  interestId: string,
  portId: string,
  meta: AuditMeta,
): Promise<BerthRuleResult> {
  const interest = await db.query.interests.findFirst({
    where: and(eq(interests.id, interestId), eq(interests.portId, portId)),
  });

  if (!interest) {
    return { action: 'none' };
  }

  // Rule evaluation targets the interest's primary berth (plan §3.4) -
  // resolved via interest_berths rather than the legacy column.
  const primaryBerth = await getPrimaryBerth(interestId);
  const targetBerthId = primaryBerth?.berthId;

  if (!targetBerthId) {
    return { action: 'none' };
  }

  const rulesConfig = await getRulesConfig(portId);
  const rule = rulesConfig[trigger];

  // Decision-trace audit: ALWAYS record what we decided to do (or not do),
  // including the rule mode, so admins can debug "why didn't this fire?" /
  // "why did this fire" without grepping server logs. Tagged `berth_rule_decision`
  // so it's distinct from the actual mutation audit row below.
  recordAudit({
    userId: meta.userId,
    portId,
    action: 'rule_evaluated',
    entityType: 'berth',
    entityId: targetBerthId,
    metadata: {
      type: 'berth_rule_decision',
      trigger,
      mode: rule.mode,
      targetStatus: rule.targetStatus,
      interestId,
    },
    ipAddress: meta.ipAddress,
    userAgent: meta.userAgent,
  });

  if (rule.mode === 'off') {
    return { action: 'none' };
  }

  if (rule.mode === 'auto') {
    // Concurrency hardening: wrap the read-then-write in a transaction with a
    // berth-scoped advisory lock so two concurrent webhook retries can't both
    // commit the same status flip (which produces duplicate audit rows + a
    // double socket emit). Also short-circuit when the target status is
    // already in place - re-writing 'sold'→'sold' is technically harmless
    // but pollutes the audit trail and the socket stream.
    const outcome = await db.transaction(async (tx) => {
      // pg_advisory_xact_lock takes a single bigint. We hash port+berth into
      // a stable 32-bit slot. The lock auto-releases at transaction end so
      // there's no risk of a stuck lock if the handler crashes mid-write.
      await tx.execute(
        sql`SELECT pg_advisory_xact_lock(hashtext(${`berth-rule:${portId}:${targetBerthId}`}))`,
      );

      // Re-read inside the lock so we observe the post-lock state, not the
      // pre-lock snapshot. If the prior contender already moved status to
      // our target, we're idempotent and bail.
      const [current] = await tx
        .select({ status: berths.status })
        .from(berths)
        .where(and(eq(berths.id, targetBerthId), eq(berths.portId, portId)));

      if (!current) {
        return { kind: 'missing' as const };
      }

      if (current.status === rule.targetStatus) {
        // Idempotent re-fire. We already audited the decision above; nothing
        // more to do here.
        logger.debug(
          { trigger, targetBerthId, portId, status: current.status },
          'Berth-rule auto: target status already set, skipping duplicate write',
        );
        return { kind: 'unchanged' as const };
      }

      // One timestamp for both columns so they cannot disagree.
      const now = new Date();

      await tx
        .update(berths)
        .set({
          status: rule.targetStatus,
          statusLastChangedBy: meta.userId,
          statusLastChangedReason: `Auto-applied by rule: ${trigger}`,
          statusLastModified: now,
          // #67 Phase 1: stamp the source so the reconciliation queue
          // can filter "Manual only" - rules-engine writes are never
          // candidates for catch-up because they already have a backing
          // interest driving them.
          statusOverrideMode: 'automated',
          updatedAt: now,
        })
        .where(and(eq(berths.id, targetBerthId), eq(berths.portId, portId)));

      return { kind: 'changed' as const, previousStatus: current.status };
    });

    // No berth row for this port means nothing was (or could be) applied, so
    // do not report a status change that never happened.
    if (outcome.kind === 'missing') {
      return { action: 'none' };
    }

    if (outcome.kind === 'changed') {
      recordAudit({
        userId: meta.userId,
        portId,
        action: 'update',
        entityType: 'berth',
        entityId: targetBerthId,
        oldValue: { status: outcome.previousStatus },
        newValue: { status: rule.targetStatus },
        metadata: { type: 'berth_rule_auto', trigger, interestId },
        ipAddress: meta.ipAddress,
        userAgent: meta.userAgent,
      });

      emitToRoom(`port:${portId}`, 'berth:statusChanged', {
        berthId: targetBerthId,
        newStatus: rule.targetStatus,
        triggeredBy: meta.userId,
        trigger,
      });
    }

    return { action: 'applied', newStatus: rule.targetStatus };
  }

  // suggest mode - the decision-trace audit above already records the suggestion.
  return {
    action: 'suggested',
    newStatus: rule.targetStatus,
    message: `Suggested status change to "${rule.targetStatus}" based on trigger "${trigger}"`,
  };
}