feat(pipeline): 9→7 stage refactor + v1.1 hardening wave
Replaces the legacy 9-stage pipeline with 7 canonical stages
(enquiry → qualified → eoi → reservation → deposit_paid → contract →
nurturing) plus three doc sub-status columns (eoi_doc_status,
reservation_doc_status, contract_doc_status) that track sent/signed
within a single stage instead of branching it.
Schema (migration 0062):
- interests gains assigned_to, deposit_expected_amount/currency,
three doc-status columns, two documenso-id columns, and
date_reservation_signed.
- New tables: qualification_criteria (per-port admin-configurable),
interest_qualifications (per-interest state), payments (deposit /
balance / refund records keyed to interest + client).
- Default qualification criteria seeded for every existing port.
- Dummy-data UPDATEs collapse Sent/Signed pairs and 'completed' into
the new stage + doc-status + outcome shape.
Migration 0063 adds interest_contact_log.voice_transcript and
template_used columns for v1.1-A/B (quick-template buttons + voice
transcription via Web Speech API).
v1.1 phase work bundled here:
- A/B: Quick-template buttons (Call / Visit / Email) + mic toggle on
the contact-log compose dialog (useVoiceTranscription hook).
- C: berth-rules-engine wraps state writes in pg_advisory_xact_lock
with an idempotent re-read; emits rule_evaluated audit traces.
- D: Documenso webhook: reservation/contract sub-status stamping
moved out of the PDF-download try-block so a download failure
no longer swallows the stamp. New integration test coverage.
- E: /admin/qualification-criteria CRUD page + admin component.
- F: default_new_interest_owner exposed in System Settings.
- G: recentActivityCount + active_engagement deal-pulse signal
surfaced as a chip on interests + hot-deals card.
- H: interest_assigned notification on assignedTo change (skips
self-assign, uses a dedupe key).
Plus the supporting components: AssignedToChip, DealPulseChip,
PaymentsSection, QualificationChecklist, MultiEoiChip,
SkipAheadBanner, WonStatusPanel, InterestBerthStatusBanner,
SupplementalInfoRequestButton, UserPicker.
Tests: 1370/1370 vitest pass (added deal-health unit suite +
expanded constants/validators/pipeline-transitions coverage). tsc
clean, eslint clean.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -1,4 +1,4 @@
|
||||
import { and, eq } from 'drizzle-orm';
|
||||
import { and, eq, sql } from 'drizzle-orm';
|
||||
|
||||
import { db } from '@/lib/db';
|
||||
import { interests } from '@/lib/db/schema/interests';
|
||||
@@ -7,6 +7,7 @@ import { systemSettings } from '@/lib/db/schema/system';
|
||||
import { createAuditLog, type AuditMeta } from '@/lib/audit';
|
||||
import { emitToRoom } from '@/lib/socket/server';
|
||||
import { getPrimaryBerth } from '@/lib/services/interest-berths.service';
|
||||
import { logger } from '@/lib/logger';
|
||||
|
||||
// ─── Types ────────────────────────────────────────────────────────────────────
|
||||
|
||||
@@ -94,45 +95,105 @@ export async function evaluateRule(
|
||||
const rulesConfig = await getRulesConfig(portId);
|
||||
const rule = rulesConfig[trigger];
|
||||
|
||||
// Decision-trace audit: ALWAYS record what we decided to do (or not do),
|
||||
// including the rule mode, so admins can debug "why didn't this fire?" /
|
||||
// "why did this fire" without grepping server logs. Tagged `berth_rule_decision`
|
||||
// so it's distinct from the actual mutation audit row below.
|
||||
void createAuditLog({
|
||||
userId: meta.userId,
|
||||
portId,
|
||||
action: 'rule_evaluated',
|
||||
entityType: 'berth',
|
||||
entityId: targetBerthId,
|
||||
metadata: {
|
||||
type: 'berth_rule_decision',
|
||||
trigger,
|
||||
mode: rule.mode,
|
||||
targetStatus: rule.targetStatus,
|
||||
interestId,
|
||||
},
|
||||
ipAddress: meta.ipAddress,
|
||||
userAgent: meta.userAgent,
|
||||
});
|
||||
|
||||
if (rule.mode === 'off') {
|
||||
return { action: 'none' };
|
||||
}
|
||||
|
||||
if (rule.mode === 'auto') {
|
||||
await db
|
||||
.update(berths)
|
||||
.set({
|
||||
status: rule.targetStatus,
|
||||
statusLastChangedBy: meta.userId,
|
||||
statusLastChangedReason: `Auto-applied by rule: ${trigger}`,
|
||||
statusLastModified: new Date(),
|
||||
updatedAt: new Date(),
|
||||
})
|
||||
.where(and(eq(berths.id, targetBerthId), eq(berths.portId, portId)));
|
||||
// Concurrency hardening: wrap the read-then-write in a transaction with a
|
||||
// berth-scoped advisory lock so two concurrent webhook retries can't both
|
||||
// commit the same status flip (which produces duplicate audit rows + a
|
||||
// double socket emit). Also short-circuit when the target status is
|
||||
// already in place — re-writing 'sold'→'sold' is technically harmless
|
||||
// but pollutes the audit trail and the socket stream.
|
||||
const result = await db.transaction(async (tx) => {
|
||||
// pg_advisory_xact_lock takes a single bigint. We hash port+berth into
|
||||
// a stable 32-bit slot. The lock auto-releases at transaction end so
|
||||
// there's no risk of a stuck lock if the handler crashes mid-write.
|
||||
await tx.execute(
|
||||
sql`SELECT pg_advisory_xact_lock(hashtext(${`berth-rule:${portId}:${targetBerthId}`}))`,
|
||||
);
|
||||
|
||||
void createAuditLog({
|
||||
userId: meta.userId,
|
||||
portId,
|
||||
action: 'update',
|
||||
entityType: 'berth',
|
||||
entityId: targetBerthId,
|
||||
newValue: { status: rule.targetStatus },
|
||||
metadata: { type: 'berth_rule_auto', trigger, interestId },
|
||||
ipAddress: meta.ipAddress,
|
||||
userAgent: meta.userAgent,
|
||||
// Re-read inside the lock so we observe the post-lock state, not the
|
||||
// pre-lock snapshot. If the prior contender already moved status to
|
||||
// our target, we're idempotent and bail.
|
||||
const [current] = await tx
|
||||
.select({ status: berths.status })
|
||||
.from(berths)
|
||||
.where(and(eq(berths.id, targetBerthId), eq(berths.portId, portId)));
|
||||
|
||||
if (!current) return { changed: false as const };
|
||||
if (current.status === rule.targetStatus) {
|
||||
// Idempotent re-fire. We already audited the decision above; nothing
|
||||
// more to do here.
|
||||
logger.debug(
|
||||
{ trigger, targetBerthId, portId, status: current.status },
|
||||
'Berth-rule auto: target status already set, skipping duplicate write',
|
||||
);
|
||||
return { changed: false as const };
|
||||
}
|
||||
|
||||
await tx
|
||||
.update(berths)
|
||||
.set({
|
||||
status: rule.targetStatus,
|
||||
statusLastChangedBy: meta.userId,
|
||||
statusLastChangedReason: `Auto-applied by rule: ${trigger}`,
|
||||
statusLastModified: new Date(),
|
||||
updatedAt: new Date(),
|
||||
})
|
||||
.where(and(eq(berths.id, targetBerthId), eq(berths.portId, portId)));
|
||||
|
||||
return { changed: true as const, previousStatus: current.status };
|
||||
});
|
||||
|
||||
emitToRoom(`port:${portId}`, 'berth:statusChanged', {
|
||||
berthId: targetBerthId,
|
||||
newStatus: rule.targetStatus,
|
||||
triggeredBy: meta.userId,
|
||||
trigger,
|
||||
});
|
||||
if (result.changed) {
|
||||
void createAuditLog({
|
||||
userId: meta.userId,
|
||||
portId,
|
||||
action: 'update',
|
||||
entityType: 'berth',
|
||||
entityId: targetBerthId,
|
||||
oldValue: { status: result.previousStatus },
|
||||
newValue: { status: rule.targetStatus },
|
||||
metadata: { type: 'berth_rule_auto', trigger, interestId },
|
||||
ipAddress: meta.ipAddress,
|
||||
userAgent: meta.userAgent,
|
||||
});
|
||||
|
||||
emitToRoom(`port:${portId}`, 'berth:statusChanged', {
|
||||
berthId: targetBerthId,
|
||||
newStatus: rule.targetStatus,
|
||||
triggeredBy: meta.userId,
|
||||
trigger,
|
||||
});
|
||||
}
|
||||
|
||||
return { action: 'applied', newStatus: rule.targetStatus };
|
||||
}
|
||||
|
||||
// suggest mode
|
||||
// suggest mode — the decision-trace audit above already records the suggestion.
|
||||
return {
|
||||
action: 'suggested',
|
||||
newStatus: rule.targetStatus,
|
||||
|
||||
Reference in New Issue
Block a user