Files
pn-new-crm/src/lib/services/interests.service.ts
Matt 4dc0bdd8c4
All checks were successful
Build & Push Docker Images / lint (push) Successful in 2m51s
Build & Push Docker Images / build-and-push (push) Successful in 9m16s
feat(crm): client-meeting batch — contact-pill cleanup, assignment toggle, receipt manual mode
CM-4: remove Email/Call/WhatsApp deep-link pills from the client + interest
  detail headers; relocate GDPR export into the client-header action cluster
  as a compact icon. Keeps the interest "Log contact" quick action.
CM-5: gate the interest assignment feature behind a per-port `assignment_enabled`
  setting (default OFF for single-rep ports). Hides the AssignedToChip +
  residential assigned-to row and skips tier-2/3 auto-assign on create; the
  column + data are preserved and reversible. Tests cover the auto-assign guard.
CM-6: add a per-port `manualEntry` receipt mode (skip all parsing → empty form).
  Threaded through ocr-config.service, the admin OCR form, the scan-receipt
  route, and the scanner shell (skips Tesseract + the server call). Tests cover
  the save/resolve round-trip.

Verified: tsc clean, lint 0 errors, 1631 vitest pass, prod build green.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-18 21:42:36 +02:00

1788 lines
67 KiB
TypeScript

import { and, desc, eq, exists, gte, inArray, isNull, ne, sql } from 'drizzle-orm';
import { db } from '@/lib/db';
import { interests, interestBerths, interestTags, interestNotes } from '@/lib/db/schema/interests';
import { reminders, interestContactLog } from '@/lib/db/schema/operations';
import { clients, clientAddresses, clientContacts, clientNotes } from '@/lib/db/schema/clients';
import { berths } from '@/lib/db/schema/berths';
import { documents, documentEvents } from '@/lib/db/schema/documents';
import { berthTenancies } from '@/lib/db/schema/tenancies';
import { yachts, yachtNotes } from '@/lib/db/schema/yachts';
import { companyMemberships, companyNotes } from '@/lib/db/schema/companies';
import { tags } from '@/lib/db/schema/system';
import { userProfiles, userPortRoles, roles } from '@/lib/db/schema/users';
import { createAuditLog, type AuditMeta } from '@/lib/audit';
import { activeInterestsWhere } from '@/lib/services/active-interest';
import { getPortReminderConfig } from '@/lib/services/port-config';
import { getSetting } from '@/lib/services/settings.service';
import { NotFoundError, ConflictError, ValidationError } from '@/lib/errors';
import { emitToRoom } from '@/lib/socket/server';
import { setEntityTags } from '@/lib/services/entity-tags.helper';
import { evaluateRule } from '@/lib/services/berth-rules-engine';
import { notifyNextInLine } from '@/lib/services/next-in-line-notify.service';
import { logger } from '@/lib/logger';
import {
getAllBerthMooringsForInterests,
getPrimaryBerth,
getPrimaryBerthsForInterests,
listBerthsForInterest,
removeInterestBerth,
upsertInterestBerth,
upsertInterestBerthTx,
} from '@/lib/services/interest-berths.service';
import { formatBerthRange } from '@/lib/templates/berth-range';
import { buildListQuery } from '@/lib/db/query-builder';
import { diffEntity } from '@/lib/entity-diff';
import { softDelete, restore, withTransaction } from '@/lib/db/utils';
import {
PIPELINE_STAGES,
STAGE_LABELS,
canTransitionStage,
canonicalizeStage,
type PipelineStage,
} from '@/lib/constants';
import type {
CreateInterestInput,
UpdateInterestInput,
ChangeStageInput,
ListInterestsInput,
SetOutcomeInput,
ClearOutcomeInput,
} from '@/lib/validators/interests';
// ─── Types ────────────────────────────────────────────────────────────────────
// ─── Port-scope FK validator ─────────────────────────────────────────────────
// Tenant scope: every FK referenced from an interest body - clientId, berthId,
// and yachtId - must belong to the caller's port. Without this, a body-supplied
// foreign-port id would create an interest that joins through these FKs and
// surfaces foreign-tenant data on subsequent reads (clientName, berth mooring
// number, yacht ownership). assertYachtBelongsToClient still runs separately to
// enforce the additional ownership invariant.
/**
 * Confirms that every foreign key supplied for an interest resolves to a row
 * owned by `portId`.
 *
 * Only ids that are present (truthy) are checked. The lookups are started
 * together and awaited as a group, so the call rejects as soon as any one of
 * them fails.
 *
 * @param portId - Port the referenced rows must belong to.
 * @param fks - Optional client / berth / yacht ids taken from the request body.
 * @throws ValidationError when a supplied id has no matching row in the port.
 */
async function assertInterestFksInPort(
  portId: string,
  fks: { clientId?: string | null; berthId?: string | null; yachtId?: string | null },
): Promise<void> {
  const { clientId, berthId, yachtId } = fks;
  const lookups: Array<Promise<void>> = [];

  if (clientId) {
    lookups.push(
      (async () => {
        const match = await db.query.clients.findFirst({
          where: and(eq(clients.id, clientId), eq(clients.portId, portId)),
        });
        if (!match) throw new ValidationError('clientId not found in this port');
      })(),
    );
  }

  if (berthId) {
    lookups.push(
      (async () => {
        const match = await db.query.berths.findFirst({
          where: and(eq(berths.id, berthId), eq(berths.portId, portId)),
        });
        if (!match) throw new ValidationError('berthId not found in this port');
      })(),
    );
  }

  if (yachtId) {
    lookups.push(
      (async () => {
        const match = await db.query.yachts.findFirst({
          where: and(eq(yachts.id, yachtId), eq(yachts.portId, portId)),
        });
        if (!match) throw new ValidationError('yachtId not found in this port');
      })(),
    );
  }

  await Promise.all(lookups);
}
// ─── Yacht ownership validator ───────────────────────────────────────────────
/**
 * Ensures the yacht is owned by the client, either directly or through a
 * company in which the client currently holds a membership.
 *
 * @param portId - Port the yacht must belong to.
 * @param yachtId - Yacht being linked.
 * @param clientId - Client the yacht is being linked for.
 * @throws ValidationError when the yacht is missing from the port or neither
 *   ownership path matches.
 */
async function assertYachtBelongsToClient(
  portId: string,
  yachtId: string,
  clientId: string,
): Promise<void> {
  const yachtRow = await db.query.yachts.findFirst({
    where: and(eq(yachts.id, yachtId), eq(yachts.portId, portId)),
  });
  if (!yachtRow) {
    throw new ValidationError('yacht not found');
  }

  const { currentOwnerType: ownerType, currentOwnerId: ownerId } = yachtRow;

  switch (ownerType) {
    case 'client':
      // Direct ownership: the yacht's owner id is the client itself.
      if (ownerId === clientId) return;
      break;
    case 'company': {
      // Company-represented: the client needs a membership in the owning
      // company that has no end date.
      const activeMembership = await db.query.companyMemberships.findFirst({
        where: and(
          eq(companyMemberships.companyId, ownerId),
          eq(companyMemberships.clientId, clientId),
          isNull(companyMemberships.endDate),
        ),
      });
      if (activeMembership) return;
      break;
    }
    default:
      break;
  }

  throw new ValidationError('yacht does not belong to this client');
}
// ─── BR-011: Auto-promote leadCategory ───────────────────────────────────────
/**
 * BR-011: decides the lead category to store for an interest.
 *
 * A category other than `general_interest` is returned unchanged. Otherwise,
 * when a yacht is linked and has a length recorded (feet or metres), the lead
 * is promoted to `specific_qualified`. In every other case the supplied
 * category is returned, with `null` normalised to `undefined`.
 *
 * @param clientId - Not read by this function; kept for the call signature.
 * @param leadCategory - Category supplied by the caller, if any.
 * @param yachtId - Linked yacht id, if any.
 * @returns The category to persist, or `undefined` when none applies.
 */
async function resolveLeadCategory(
  clientId: string,
  leadCategory: string | undefined | null,
  yachtId?: string | null,
): Promise<string | undefined> {
  const fallback = leadCategory ?? undefined;

  // An explicit, already-specific category always wins.
  const alreadySpecific = Boolean(leadCategory) && leadCategory !== 'general_interest';
  if (alreadySpecific) {
    return fallback;
  }

  if (!yachtId) {
    return fallback;
  }

  const linkedYacht = await db.query.yachts.findFirst({ where: eq(yachts.id, yachtId) });
  const hasLength = Boolean(linkedYacht?.lengthFt || linkedYacht?.lengthM);
  return hasLength ? 'specific_qualified' : fallback;
}
// ─── Board (kanban) ───────────────────────────────────────────────────────────
/**
 * Soft ceiling on the number of rows the kanban board query returns.
 *
 * The board needs every active interest in a single response: paginating
 * would split deals across pages and break drag-drop semantics. An unbounded
 * SELECT is still a hazard if a port accumulates tens of thousands of stale
 * interests, so the query is capped. At this ceiling the payload stays well
 * under a megabyte (≈50 bytes per minimal row), and a port approaching it
 * needs virtualization in the kanban UI anyway, so failing loud is the right
 * escalation.
 */
const BOARD_MAX_ROWS = 5_000;
/**
 * One card's worth of data on the kanban board, as produced by
 * `listInterestsForBoard`.
 */
export interface BoardInterestRow {
/** Interest id. */
id: string;
/** Full name of the linked client; `null` when the client join finds no row. */
clientName: string | null;
/** Mooring number of the interest's primary berth; `null` when none resolves. */
berthMooringNumber: string | null;
/** Every linked berth's mooring on this interest (sorted). Consumers
 * pass this through `deriveInterestBerthLabel` for the header / card
 * display so multi-berth interests render as `A1-A3, B5` rather than
 * just the primary mooring. */
berthMoorings: string[];
/** Lead category stored on the interest, or `null` when unset. */
leadCategory: string | null;
/** Pipeline stage stored on the interest. */
pipelineStage: string;
/** Last-updated timestamp of the interest; the board query orders by it, newest first. */
updatedAt: Date;
}
/**
 * Optional filters accepted by `listInterestsForBoard`. Each field that is
 * omitted (or empty) adds no condition to the query.
 */
export interface BoardFilters {
/** Free-text search against client name. */
search?: string;
/** Exact match on the interest's lead category. */
leadCategory?: string;
/** Exact match on the interest's source. */
source?: string;
/** Exact match on the interest's EOI status. */
eoiStatus?: string;
/** Tag IDs the interest must be tagged with (any-of). */
tagIds?: string[];
}
/**
 * Minimal-projection list for the kanban board.
 *
 * Skips the validator's `max(100)` page cap because the board renders the
 * entire pipeline at once, and returns only the fields PipelineCard renders -
 * no tags list, no notes count, no EOI status badges, no urgency joins.
 * Archived interests and interests with an outcome set are always excluded
 * (the kanban is for active deals; the list view has the includeArchived
 * toggle for history).
 *
 * Filters are intentionally a SUBSET of listInterests - `pipelineStage` is
 * omitted because the columns ARE the stages, and `includeArchived` is
 * omitted because the kanban shouldn't surface archived deals.
 *
 * Round-trips: an optional lookup that resolves tag ids to interest ids, one
 * query for the interests + client-name join, then the primary-berth and
 * all-moorings lookups run in parallel.
 *
 * @param portId - Port whose interests are listed.
 * @param filters - Optional narrowing filters.
 * @returns `data` capped at BOARD_MAX_ROWS rows ordered by `updatedAt`
 *   descending; `truncated` is true when more rows matched than the cap;
 *   `total` is the number of rows returned (`data.length`), not the uncapped
 *   match count.
 */
export async function listInterestsForBoard(
  portId: string,
  filters: BoardFilters = {},
): Promise<{ data: BoardInterestRow[]; truncated: boolean; total: number }> {
  // Kanban shows only active deals - terminal (outcome-set) rows have
  // their own /closed views. Pre-2026-05-14 this filter was just
  // `archivedAt IS NULL`, which worked because setOutcome moved the
  // stage to the 'completed' sentinel and the kanban only renders the
  // 7-stage canon. With the sentinel-stage cleanup, terminal rows now
  // keep their actual stage value, so we filter outcome explicitly.
  const conditions = [
    eq(interests.portId, portId),
    isNull(interests.archivedAt),
    isNull(interests.outcome),
  ];
  if (filters.leadCategory) {
    conditions.push(eq(interests.leadCategory, filters.leadCategory));
  }
  if (filters.source) {
    conditions.push(eq(interests.source, filters.source));
  }
  if (filters.eoiStatus) {
    conditions.push(eq(interests.eoiStatus, filters.eoiStatus));
  }

  // Tag-id filter resolves through the join table first so the main
  // query stays a simple WHERE id IN (…) rather than a SELECT DISTINCT
  // with LEFT JOIN - keeps Postgres' planner happy at scale.
  if (filters.tagIds && filters.tagIds.length > 0) {
    const tagMatches = await db
      .selectDistinct({ interestId: interestTags.interestId })
      .from(interestTags)
      .where(inArray(interestTags.tagId, filters.tagIds));
    const matchingIds = tagMatches.map((r) => r.interestId);
    if (matchingIds.length === 0) {
      // No interest carries any of the requested tags: skip the main query.
      return { data: [], truncated: false, total: 0 };
    }
    conditions.push(inArray(interests.id, matchingIds));
  }

  // Search hits client name via the LEFT JOIN. ILIKE is correct here -
  // the kanban list is small (≤5000 rows) so an index scan isn't
  // required, and pg_trgm would be overkill for the board surface.
  const searchTerm = filters.search?.trim() ?? '';
  if (searchTerm.length > 0) {
    // Backslash is Postgres' default LIKE/ILIKE escape character, so it has
    // to be escaped together with the `%` and `_` wildcards. Leaving it raw
    // lets a user-typed `\` swallow the next character (or turn the closing
    // `%` into a literal), which silently changes what the search matches.
    const pattern = `%${searchTerm.replace(/[\\%_]/g, '\\$&')}%`;
    conditions.push(sql`${clients.fullName} ILIKE ${pattern}`);
  }

  // Fetch one row beyond the cap so truncation can be detected without a
  // separate COUNT query.
  const rows = await db
    .select({
      id: interests.id,
      clientName: clients.fullName,
      leadCategory: interests.leadCategory,
      pipelineStage: interests.pipelineStage,
      updatedAt: interests.updatedAt,
    })
    .from(interests)
    .leftJoin(clients, eq(interests.clientId, clients.id))
    .where(and(...conditions))
    .orderBy(desc(interests.updatedAt))
    .limit(BOARD_MAX_ROWS + 1);
  const truncated = rows.length > BOARD_MAX_ROWS;
  const data = truncated ? rows.slice(0, BOARD_MAX_ROWS) : rows;

  // Primary-berth resolution stays in the junction-aware service so the
  // board sees the same "the berth for this deal" as every other surface.
  // All-berth aggregator runs in parallel; both come from the same
  // interest_berths table so the round-trips are independent.
  const interestIds = data.map((r) => r.id);
  const [primaryBerthMap, allBerthMooringsMap] = await Promise.all([
    getPrimaryBerthsForInterests(interestIds),
    getAllBerthMooringsForInterests(interestIds),
  ]);

  return {
    data: data.map((r) => ({
      id: r.id,
      clientName: r.clientName ?? null,
      berthMooringNumber: primaryBerthMap.get(r.id)?.mooringNumber ?? null,
      berthMoorings: allBerthMooringsMap.get(r.id) ?? [],
      leadCategory: r.leadCategory ?? null,
      pipelineStage: r.pipelineStage,
      updatedAt: r.updatedAt,
    })),
    truncated,
    total: data.length,
  };
}
// ─── List ─────────────────────────────────────────────────────────────────────
/**
 * Paginated interest list for a port.
 *
 * Builds the filter set from `query`, delegates paging / sorting / archive
 * handling to `buildListQuery`, then decorates each returned row with the
 * client name, primary berth (id + mooring), all linked moorings, yacht name,
 * tags and note count.
 *
 * @param portId - Port whose interests are listed.
 * @param query - Validated list parameters (paging, sort, filters).
 * @returns `data` with the decorated rows for the requested page and `total`
 *   as reported by `buildListQuery`. When `tagIds` is supplied and no interest
 *   carries any of them, returns `{ data: [], total: 0 }` without paging.
 */
export async function listInterests(portId: string, query: ListInterestsInput) {
  const {
    page,
    limit,
    sort,
    order,
    search,
    includeArchived,
    clientId,
    yachtId,
    berthId,
    pipelineStage,
    leadCategory,
    eoiStatus,
    tagIds,
  } = query;

  // ── Filter predicates ────────────────────────────────────────────────────
  const predicates = [];
  if (clientId) predicates.push(eq(interests.clientId, clientId));
  if (yachtId) predicates.push(eq(interests.yachtId, yachtId));
  if (berthId) {
    // EXISTS against the junction: matches whether or not the berth is the
    // interest's primary, i.e. "this berth is linked to this interest in any
    // role" (plan §3.4).
    const linkedToBerth = db
      .select({ one: sql`1` })
      .from(interestBerths)
      .where(and(eq(interestBerths.interestId, interests.id), eq(interestBerths.berthId, berthId)));
    predicates.push(exists(linkedToBerth));
  }
  if (pipelineStage?.length) predicates.push(inArray(interests.pipelineStage, pipelineStage));
  if (leadCategory) predicates.push(eq(interests.leadCategory, leadCategory));
  if (eoiStatus) predicates.push(eq(interests.eoiStatus, eoiStatus));
  if (tagIds?.length) {
    const taggedRows = await db
      .selectDistinct({ interestId: interestTags.interestId })
      .from(interestTags)
      .where(inArray(interestTags.tagId, tagIds));
    if (taggedRows.length === 0) {
      return { data: [], total: 0 };
    }
    predicates.push(
      inArray(
        interests.id,
        taggedRows.map((row) => row.interestId),
      ),
    );
  }

  // ── Sort column ──────────────────────────────────────────────────────────
  const pickSortColumn = () => {
    if (sort === 'pipelineStage') return interests.pipelineStage;
    if (sort === 'leadCategory') return interests.leadCategory;
    if (sort === 'createdAt') return interests.createdAt;
    // Postgres sorts NULLs last on DESC by default, which is the right
    // behaviour for triage (recently-contacted first, never-contacted at the
    // bottom).
    if (sort === 'dateLastContact') return interests.dateLastContact;
    return interests.updatedAt;
  };

  const result = await buildListQuery({
    table: interests,
    portIdColumn: interests.portId,
    portId,
    idColumn: interests.id,
    updatedAtColumn: interests.updatedAt,
    filters: predicates,
    sort: { column: pickSortColumn(), direction: order },
    page,
    pageSize: limit,
    searchColumns: [],
    searchTerm: search,
    includeArchived,
    archivedAtColumn: interests.archivedAt,
  });

  // ── Decoration lookups (client names, berths, yacht names, tags, notes) ──
  const pageRows = result.data as Array<Record<string, unknown>>;
  const interestIds = pageRows.map((row) => row.id as string);
  const clientIds = Array.from(new Set(pageRows.map((row) => row.clientId as string)));
  const yachtIds = Array.from(
    new Set(
      pageRows
        .map((row) => row.yachtId as string | null)
        .filter((value): value is string => Boolean(value)),
    ),
  );

  const clientNameById = new Map<string, string>();
  const yachtNameById = new Map<string, string>();
  const tagsByInterest = new Map<string, Array<{ id: string; name: string; color: string }>>();
  const noteCountByInterest = new Map<string, number>();

  if (clientIds.length > 0) {
    const clientRows = await db
      .select({ id: clients.id, fullName: clients.fullName })
      .from(clients)
      .where(inArray(clients.id, clientIds));
    for (const client of clientRows) {
      clientNameById.set(client.id, client.fullName);
    }
  }

  // Primary berth and the full mooring list both come from the
  // interest_berths junction (plan §3.4); they are independent, so they run
  // side by side and the list can show multi-berth labels (A1-A3, B5)
  // without a second waterfall.
  const [primaryBerthMap, allBerthMooringsMap] = await Promise.all([
    getPrimaryBerthsForInterests(interestIds),
    getAllBerthMooringsForInterests(interestIds),
  ]);

  if (yachtIds.length > 0) {
    const yachtRows = await db
      .select({ id: yachts.id, name: yachts.name })
      .from(yachts)
      .where(inArray(yachts.id, yachtIds));
    for (const yacht of yachtRows) {
      yachtNameById.set(yacht.id, yacht.name);
    }
  }

  if (interestIds.length > 0) {
    const tagRows = await db
      .select({
        interestId: interestTags.interestId,
        id: tags.id,
        name: tags.name,
        color: tags.color,
      })
      .from(interestTags)
      .innerJoin(tags, eq(interestTags.tagId, tags.id))
      .where(inArray(interestTags.interestId, interestIds));
    for (const { interestId, id, name, color } of tagRows) {
      const bucket = tagsByInterest.get(interestId);
      if (bucket) {
        bucket.push({ id, name, color });
      } else {
        tagsByInterest.set(interestId, [{ id, name, color }]);
      }
    }

    // Note counts per interest, for the comment-icon row affordance.
    const noteCountRows = await db
      .select({
        interestId: interestNotes.interestId,
        count: sql<number>`count(*)::int`,
      })
      .from(interestNotes)
      .where(inArray(interestNotes.interestId, interestIds))
      .groupBy(interestNotes.interestId);
    for (const { interestId, count } of noteCountRows) {
      noteCountByInterest.set(interestId, count);
    }
  }

  // ── Assemble the response rows ───────────────────────────────────────────
  const data = pageRows.map((row) => {
    const interestId = row.id as string;
    const primary = primaryBerthMap.get(interestId) ?? null;
    const yachtName = row.yachtId ? (yachtNameById.get(row.yachtId as string) ?? null) : null;
    return {
      ...row,
      clientName: clientNameById.get(row.clientId as string) ?? null,
      berthId: primary?.berthId ?? null,
      berthMooringNumber: primary?.mooringNumber ?? null,
      berthMoorings: allBerthMooringsMap.get(interestId) ?? [],
      yachtName,
      tags: tagsByInterest.get(interestId) ?? [],
      notesCount: noteCountByInterest.get(interestId) ?? 0,
    };
  });

  return { data, total: result.total };
}
// ─── Get by ID ────────────────────────────────────────────────────────────────
/**
 * Loads one interest together with the derived fields the detail view needs.
 *
 * On top of the interest row itself the result carries: the client's name and
 * primary email / phone contact, whether the client has a primary address,
 * the primary berth and all linked moorings, the linked-berth count, tags,
 * the latest note and note counts (direct and aggregated), the active
 * reminder count, the recent contact-log count, three risk-signal dates, the
 * linked yacht's dimensions and the assignee's display name.
 *
 * @param id - Interest id.
 * @param portId - Port the interest must belong to.
 * @returns The interest row spread together with the derived fields.
 * @throws NotFoundError when no interest has this id or it belongs to a
 *   different port.
 */
export async function getInterestById(id: string, portId: string) {
const interest = await db.query.interests.findFirst({
where: eq(interests.id, id),
});
// A row from another port is reported exactly like a missing row.
if (!interest || interest.portId !== portId) {
throw new NotFoundError('Interest');
}
const [clientRow] = await db
.select({ fullName: clients.fullName })
.from(clients)
.where(eq(clients.id, interest.clientId));
// Surface the linked client's primary email/phone (and the canonical E.164
// form + country for the phone) so consumers don't need a second fetch, and
// so the Documents tab can show the EOI prereq checklist.
// NOTE(review): this comment used to say the interest-detail header renders
// Email / Call / WhatsApp buttons from these fields - confirm the header
// still consumes them after the contact-pill cleanup.
// Ordering picks a contact flagged primary first, then the most recently
// updated one.
const [emailContact] = await db
.select({ id: clientContacts.id, value: clientContacts.value })
.from(clientContacts)
.where(and(eq(clientContacts.clientId, interest.clientId), eq(clientContacts.channel, 'email')))
.orderBy(desc(clientContacts.isPrimary), desc(clientContacts.updatedAt))
.limit(1);
// Same selection rule for the phone, considering both the 'phone' and
// 'whatsapp' channels.
const [phoneContact] = await db
.select({
id: clientContacts.id,
value: clientContacts.value,
valueE164: clientContacts.valueE164,
valueCountry: clientContacts.valueCountry,
})
.from(clientContacts)
.where(
and(
eq(clientContacts.clientId, interest.clientId),
inArray(clientContacts.channel, ['phone', 'whatsapp']),
),
)
.orderBy(desc(clientContacts.isPrimary), desc(clientContacts.updatedAt))
.limit(1);
// Only existence matters here: the row feeds the `clientHasAddress` flag.
const [addressRow] = await db
.select({ id: clientAddresses.id })
.from(clientAddresses)
.where(
and(eq(clientAddresses.clientId, interest.clientId), eq(clientAddresses.isPrimary, true)),
)
.limit(1);
// Primary berth comes from the interest_berths junction (plan §3.4).
// All linked moorings come from the same junction in one go - powers
// the multi-berth label rendered on every "interest header" surface.
const [primaryBerth, allMooringsMap] = await Promise.all([
getPrimaryBerth(interest.id),
getAllBerthMooringsForInterests([interest.id]),
]);
const berthId = primaryBerth?.berthId ?? null;
const berthMooringNumber = primaryBerth?.mooringNumber ?? null;
const berthMoorings = allMooringsMap.get(interest.id) ?? [];
// Total linked-berth count powers the "Berth Interest" milestone on
// the OverviewTab - first thing the rep needs to capture, especially
// for general_interest leads. Resolved here (not from the join above)
// so the count includes berths the rep added without marking primary.
// The `= { count: 0 }` default covers an empty result set.
const [{ count: linkedBerthCount } = { count: 0 }] = await db
.select({ count: sql<number>`count(*)::int` })
.from(interestBerths)
.where(eq(interestBerths.interestId, id));
const tagRows = await db
.select({ id: tags.id, name: tags.name, color: tags.color })
.from(interestTags)
.innerJoin(tags, eq(interestTags.tagId, tags.id))
.where(eq(interestTags.interestId, id));
// Most-recent note preview for the Overview tab (the "do you have anything
// outstanding on this lead?" peek). Selects the latest note's content plus
// author/timestamp so the UI can render a one-line teaser; the content is
// returned as stored (no truncation happens in this function).
// Left-joins userProfiles so the teaser can show the author's display name
// instead of leaking the raw user-id UUID.
const [recentNote] = await db
.select({
id: interestNotes.id,
content: interestNotes.content,
authorId: interestNotes.authorId,
authorName: userProfiles.displayName,
createdAt: interestNotes.createdAt,
})
.from(interestNotes)
.leftJoin(userProfiles, eq(userProfiles.userId, interestNotes.authorId))
.where(eq(interestNotes.interestId, id))
.orderBy(desc(interestNotes.createdAt))
.limit(1);
// Notes attached directly to this interest.
const [{ count: notesCount } = { count: 0 }] = await db
.select({ count: sql<number>`count(*)::int` })
.from(interestNotes)
.where(eq(interestNotes.interestId, id));
// Aggregated note count = direct interest notes + notes attached to
// the linked client, yacht (if any), and any companies the linked
// client is an active member of. Surfaces as a separate field so the
// existing notesCount badge stays accurate to "this interest only",
// while the Notes tab can render the broader total when reps want
// the full picture at a glance.
const clientNotesP = db
.select({ count: sql<number>`count(*)::int` })
.from(clientNotes)
.where(eq(clientNotes.clientId, interest.clientId));
// Without a linked yacht, substitute an already-resolved zero count so the
// Promise.all below keeps a uniform shape.
const yachtNotesP = interest.yachtId
? db
.select({ count: sql<number>`count(*)::int` })
.from(yachtNotes)
.where(eq(yachtNotes.yachtId, interest.yachtId))
: Promise.resolve([{ count: 0 }]);
// Memberships with no end date count as active.
const companyMembershipsP = db
.select({ companyId: companyMemberships.companyId })
.from(companyMemberships)
.where(
and(eq(companyMemberships.clientId, interest.clientId), isNull(companyMemberships.endDate)),
);
const [clientNotesRow, yachtNotesRow, memberships] = await Promise.all([
clientNotesP,
yachtNotesP,
companyMembershipsP,
]);
const companyIds = memberships.map((m) => m.companyId);
let companyNotesCount = 0;
// The company-notes count needs the membership ids first, so it runs after
// the parallel batch and only when the client has at least one membership.
if (companyIds.length > 0) {
const [row] = await db
.select({ count: sql<number>`count(*)::int` })
.from(companyNotes)
.where(inArray(companyNotes.companyId, companyIds));
companyNotesCount = row?.count ?? 0;
}
const notesCountAggregated =
notesCount +
(clientNotesRow[0]?.count ?? 0) +
(yachtNotesRow[0]?.count ?? 0) +
companyNotesCount;
// Active reminder count for the interest's bell badge. Counts reminders
// directly linked via interestId - `pending` and `snoozed` only;
// completed/dismissed don't surface.
const [{ count: activeReminderCount } = { count: 0 }] = await db
.select({ count: sql<number>`count(*)::int` })
.from(reminders)
.where(and(eq(reminders.interestId, id), inArray(reminders.status, ['pending', 'snoozed'])));
// Activity log entries in the last 7 days - surfaces "rep is engaged"
// as a separate signal in the deal-health pulse beyond the coarse
// dateLastContact bump. 86_400_000 is the number of milliseconds in a day.
const sevenDaysAgo = new Date(Date.now() - 7 * 86_400_000);
const [{ count: recentActivityCount } = { count: 0 }] = await db
.select({ count: sql<number>`count(*)::int` })
.from(interestContactLog)
.where(
and(eq(interestContactLog.interestId, id), gte(interestContactLog.occurredAt, sevenDaysAgo)),
);
// Phase 2 - risk-signal derivation. Three dates feed `computeDealHealth`
// off the existing event tables so the pulse chip surfaces document
// declines / cancelled reservations / berth-resold-to-other without
// adding bespoke timestamp columns on `interests`. Each query runs in
// parallel; all return `null` when no matching event exists.
const [declinedRow, cancelledReservationRow, berthResoldRow] = await Promise.all([
// Latest 'rejected' / 'declined' document event whose document is
// linked to this interest.
db
.select({ at: documentEvents.createdAt })
.from(documentEvents)
.innerJoin(documents, eq(documents.id, documentEvents.documentId))
.where(
and(
eq(documents.interestId, id),
inArray(documentEvents.eventType, ['rejected', 'declined']),
),
)
.orderBy(desc(documentEvents.createdAt))
.limit(1),
// Latest cancelled berth_tenancy row pointing at this interest.
// berth_tenancies has no cancelled_at column; updatedAt is set when
// the row flips to status='cancelled', so it tracks the same moment.
db
.select({ at: berthTenancies.updatedAt })
.from(berthTenancies)
.where(and(eq(berthTenancies.interestId, id), eq(berthTenancies.status, 'cancelled')))
.orderBy(desc(berthTenancies.updatedAt))
.limit(1),
// "Berth sold to another deal" - any of this interest's linked berths
// has at least one OTHER interest with a `won` outcome. Take the
// latest such outcome timestamp. archivedAt is a close proxy for the
// moment the win was finalised on the conflicting deal.
//
// The inner subquery resolves *this* interest's berth_ids; the outer
// query joins interestBerths to the won other-interest and filters
// its berth_id against that set. Using raw `sql` avoids the alias
// collision a Drizzle `exists()` would create with the same table on
// both sides of the correlation.
db.execute(
sql`SELECT MAX(other.archived_at) AS at
FROM interests other
JOIN interest_berths ob ON ob.interest_id = other.id
WHERE other.id <> ${id}
AND other.outcome = 'won'
AND ob.berth_id IN (
SELECT berth_id FROM interest_berths WHERE interest_id = ${id}
)`,
),
]);
const dateDocumentDeclined = declinedRow[0]?.at ?? null;
const dateReservationCancelled = cancelledReservationRow[0]?.at ?? null;
// db.execute(sql`...`) returns either an array (postgres-js driver) or
// a `{rows: []}` object depending on driver build - match the dual
// shape used by src/lib/storage/migrate.ts.
// NOTE(review): the cast below asserts `at` is a Date; a raw query may
// hand timestamps back in a different representation (e.g. a string)
// depending on the driver's parsers - confirm before relying on Date methods.
const berthResoldRaw = berthResoldRow as unknown as
| Array<{ at: Date | null }>
| { rows?: Array<{ at: Date | null }> };
const berthResoldRows = Array.isArray(berthResoldRaw)
? berthResoldRaw
: (berthResoldRaw.rows ?? []);
const dateBerthSoldToOther = berthResoldRows[0]?.at ?? null;
// Yacht dimensions for inheritance display in OverviewTab. When the
// interest has a linked yacht we ship the yacht's length/width/draft
// alongside the interest record so the Berth Requirements section can
// render a "from yacht" pill in place of an empty value. This is a
// display-only inheritance - the actual recommender source switch is
// still governed by `interests.useYachtDimensions`.
let yachtDimensions: {
lengthFt: string | null;
widthFt: string | null;
draftFt: string | null;
lengthM: string | null;
widthM: string | null;
draftM: string | null;
} | null = null;
if (interest.yachtId) {
const [yachtRow] = await db
.select({
lengthFt: yachts.lengthFt,
widthFt: yachts.widthFt,
draftFt: yachts.draftFt,
lengthM: yachts.lengthM,
widthM: yachts.widthM,
draftM: yachts.draftM,
})
.from(yachts)
.where(eq(yachts.id, interest.yachtId))
.limit(1);
if (yachtRow) {
// Stays null unless at least one dimension is populated, so a yacht
// with no measurements is reported the same as no yacht at all.
const anyDim =
yachtRow.lengthFt ||
yachtRow.widthFt ||
yachtRow.draftFt ||
yachtRow.lengthM ||
yachtRow.widthM ||
yachtRow.draftM;
if (anyDim) yachtDimensions = yachtRow;
}
}
// Resolve the assignee's display name for the header chip. When no profile
// row is found (or the interest is unassigned) the name is null; the raw
// user id remains available on the spread interest row as `assignedTo`.
let assignedToName: string | null = null;
if (interest.assignedTo) {
const [profile] = await db
.select({ displayName: userProfiles.displayName })
.from(userProfiles)
.where(eq(userProfiles.userId, interest.assignedTo))
.limit(1);
assignedToName = profile?.displayName ?? null;
}
return {
...interest,
clientName: clientRow?.fullName ?? null,
clientPrimaryEmail: emailContact?.value ?? null,
/** Contact-row id for the primary email - surfaces so the interest UI
 * can inline-edit through PATCH /api/v1/clients/[id]/contacts/[contactId]. */
clientPrimaryEmailContactId: emailContact?.id ?? null,
clientPrimaryPhone: phoneContact?.value ?? null,
clientPrimaryPhoneContactId: phoneContact?.id ?? null,
clientPrimaryPhoneE164: phoneContact?.valueE164 ?? null,
clientPrimaryPhoneCountry: phoneContact?.valueCountry ?? null,
clientHasAddress: !!addressRow,
berthId,
berthMooringNumber,
berthMoorings,
linkedBerthCount,
tags: tagRows,
notesCount,
notesCountAggregated,
recentNote: recentNote ?? null,
activeReminderCount,
assignedToName,
recentActivityCount,
// Phase 2 - risk-signal dates derived from event tables. Feed
// computeDealHealth so the pulse chip surfaces document declines,
// cancelled reservations, and "berth resold to another deal" without
// bespoke timestamp columns on the interest record.
dateDocumentDeclined,
dateReservationCancelled,
dateBerthSoldToOther,
yachtDimensions,
};
}
// ─── Create ───────────────────────────────────────────────────────────────────
/**
 * Creates an interest in `portId`, together with its tag links and (when a
 * berth id is supplied) its primary-berth junction row, then fires the
 * post-create side effects: audit log, socket event, webhook dispatch and an
 * analytics event.
 *
 * Steps, in order:
 *  1. Validate that client / berth / yacht ids belong to the port, and that
 *     the yacht (if any) belongs to the client.
 *  2. Resolve lead category (BR-011), reminder defaults and the assignee.
 *  3. Insert the interest, tags and primary berth link in one transaction.
 *  4. Emit side effects without awaiting them.
 *
 * @param portId - Port the interest is created in.
 * @param data - Validated create payload.
 * @param meta - Caller identity and request metadata for auditing.
 * @returns The inserted interest row.
 * @throws ValidationError when a referenced id is not in the port or the
 *   yacht does not belong to the client.
 */
export async function createInterest(portId: string, data: CreateInterestInput, meta: AuditMeta) {
await assertInterestFksInPort(portId, {
clientId: data.clientId,
berthId: data.berthId,
yachtId: data.yachtId,
});
if (data.yachtId) {
await assertYachtBelongsToClient(portId, data.yachtId, data.clientId);
}
// tagIds and berthId are not columns on the interests row: tags go to the
// interestTags table and the berth to the junction, both handled below.
const { tagIds, berthId: inputBerthId, ...interestData } = data;
// BR-011: auto-promote leadCategory
const resolvedLeadCategory = await resolveLeadCategory(
data.clientId,
data.leadCategory,
data.yachtId,
);
// Per-port reminder defaults - applied only when the caller omitted
// reminderEnabled / reminderDays. Honors the /admin/reminders page.
// When reminders end up disabled and no days were supplied, days is null.
const reminderConfig = await getPortReminderConfig(portId);
const resolvedReminderEnabled = interestData.reminderEnabled ?? reminderConfig.defaultEnabled;
const resolvedReminderDays =
interestData.reminderDays ?? (resolvedReminderEnabled ? reminderConfig.defaultDays : null);
// Resolve the deal owner. Three-tier chain:
// 1. Explicit `data.assignedTo` from the caller (rep picked an
// assignee in the create form).
// 2. Port's `default_new_interest_owner` setting (used for round-
// robin / "front desk owns all new leads" rules).
// 3. Auto-assign to the creating user when they're a regular role
// (sales rep, sales manager, etc.). Skipped for super-admins who
// often create on behalf of other reps - they'd otherwise hijack
// every new lead. Falls back to null (Unassigned) when none of
// the above resolve.
let resolvedAssignedTo = interestData.assignedTo ?? null;
// CM-5: tiers 2 & 3 (port default-owner + auto-assign-to-creator) only run
// when the per-port assignment feature is enabled. Tier 1 (an explicit
// assignedTo from the caller) is always honored. Default is OFF.
const assignmentSetting = await getSetting('assignment_enabled', portId);
// Strict `=== true`: a missing setting or any other stored value leaves the
// feature disabled.
const assignmentEnabled = assignmentSetting?.value === true;
// The `in` check means tiers 2/3 run only when the payload has no
// `assignedTo` key at all; a key that is present (even with a nullish value)
// is treated as the caller's explicit choice.
if (assignmentEnabled && resolvedAssignedTo === null && !('assignedTo' in interestData)) {
const defaultOwner = await getSetting('default_new_interest_owner', portId);
const v = defaultOwner?.value as { userId?: string } | null | undefined;
if (v?.userId) {
resolvedAssignedTo = v.userId;
} else {
// Tier 3: auto-assign to creator only when their role is a working
// sales rep - super_admin / director / residential_partner / viewer
// intentionally skip (they create on behalf of others or shouldn't
// own interests at all).
const AUTO_ASSIGN_ROLES = new Set(['sales_agent', 'sales_manager']);
const [profile] = await db
.select({ isSuperAdmin: userProfiles.isSuperAdmin })
.from(userProfiles)
.where(eq(userProfiles.userId, meta.userId))
.limit(1);
// No profile row, or a super-admin profile, leaves the interest unassigned.
if (profile && !profile.isSuperAdmin) {
// Roles are read for this port only.
const userRoles = await db
.select({ name: roles.name })
.from(userPortRoles)
.innerJoin(roles, eq(userPortRoles.roleId, roles.id))
.where(and(eq(userPortRoles.userId, meta.userId), eq(userPortRoles.portId, portId)));
if (userRoles.some((r) => AUTO_ASSIGN_ROLES.has(r.name))) {
resolvedAssignedTo = meta.userId;
}
}
}
}
const result = await withTransaction(async (tx) => {
// Resolved values are listed after the spread so they override whatever
// the payload carried for the same columns.
const [interest] = await tx
.insert(interests)
.values({
portId,
...interestData,
assignedTo: resolvedAssignedTo,
reminderEnabled: resolvedReminderEnabled,
reminderDays: resolvedReminderDays,
leadCategory: resolvedLeadCategory,
})
.returning();
if (tagIds && tagIds.length > 0) {
await tx
.insert(interestTags)
.values(tagIds.map((tagId) => ({ interestId: interest!.id, tagId })));
}
// Plan §3.4: when berthId is provided we materialise it as a junction
// row inside the same transaction so an interest is never created
// without its primary-berth link surviving rollback.
if (inputBerthId) {
await upsertInterestBerthTx(tx, interest!.id, inputBerthId, {
isPrimary: true,
isSpecificInterest: true,
isInEoiBundle: false,
addedBy: meta.userId,
});
}
return interest!;
});
// Everything below is fire-and-forget: none of it is awaited, so the
// created row is returned without waiting for these calls to finish.
void createAuditLog({
userId: meta.userId,
portId,
action: 'create',
entityType: 'interest',
entityId: result.id,
newValue: { clientId: result.clientId, pipelineStage: result.pipelineStage },
ipAddress: meta.ipAddress,
userAgent: meta.userAgent,
});
emitToRoom(`port:${portId}`, 'interest:created', {
interestId: result.id,
clientId: result.clientId,
berthId: inputBerthId ?? null,
source: result.source ?? '',
});
// NOTE(review): the two dynamic-import chains below have no `.catch()`. If
// the import or the dispatched call rejects, the rejection is unhandled at
// this call site - confirm the callees swallow their own errors.
void import('@/lib/services/webhook-dispatch').then(({ dispatchWebhookEvent }) =>
dispatchWebhookEvent(portId, 'interest:created', {
interestId: result.id,
clientId: result.clientId,
}),
);
// Phase 6 - CRM → Umami attribution. Fire an inbound-lead event so
// marketing can correlate inquiry volume with website traffic by
// source / referrer.
void import('@/lib/services/umami.service').then(({ trackEvent }) =>
trackEvent(portId, 'interest-created', {
interestId: result.id,
source: result.source ?? null,
}),
);
return result;
}
// ─── Update ───────────────────────────────────────────────────────────────────
/**
 * Applies a partial update to an interest and keeps its primary-berth link in
 * sync through the interest↔berth junction.
 *
 * `berthId` is never written to the interests row. When the key is present in
 * `data` and differs from the current primary berth, the junction is updated
 * instead: upsert-as-primary for a new berth, removal when the value is empty.
 *
 * After the writes this queues an audit log, emits `interest:updated` to the
 * port room and, when the owner changes to a user other than the actor,
 * queues an `interest_assigned` notification for the new assignee.
 *
 * @param id - Interest to update.
 * @param portId - Port the interest must belong to.
 * @param data - Fields to change.
 * @param meta - Acting user and request metadata recorded in the audit log.
 * @returns The updated interest row.
 * @throws NotFoundError when no interest with `id` exists in `portId`.
 */
export async function updateInterest(
  id: string,
  portId: string,
  data: UpdateInterestInput,
  meta: AuditMeta,
) {
  const existing = await db.query.interests.findFirst({
    where: eq(interests.id, id),
  });
  if (!existing || existing.portId !== portId) {
    throw new NotFoundError('Interest');
  }

  // berthId no longer lives on the interests row - resolve the current primary
  // via the junction so we know whether the caller is asking for a change.
  const currentPrimary = await getPrimaryBerth(id);
  const currentBerthId = currentPrimary?.berthId ?? null;

  // Only FKs that actually change are handed to the port check; unchanged
  // ones are passed as null (presumably skipped by the helper - confirm).
  await assertInterestFksInPort(portId, {
    berthId: data.berthId && data.berthId !== currentBerthId ? data.berthId : null,
    yachtId: data.yachtId && data.yachtId !== existing.yachtId ? data.yachtId : null,
  });
  if (data.yachtId && data.yachtId !== existing.yachtId) {
    await assertYachtBelongsToClient(portId, data.yachtId, existing.clientId);
  }

  // BR-011: auto-promote leadCategory if provided
  let resolvedLeadCategory = data.leadCategory;
  if ('leadCategory' in data) {
    resolvedLeadCategory = (await resolveLeadCategory(
      existing.clientId,
      data.leadCategory,
      data.yachtId ?? existing.yachtId,
    )) as typeof data.leadCategory;
  }

  // Strip berthId out of the row write - the column was removed by the
  // junction-migration. We keep the value for diff/audit purposes and
  // dispatch the junction write separately.
  const { berthId: incomingBerthId, ...rowData } = data;
  const updateData = { ...rowData, leadCategory: resolvedLeadCategory };

  // The berth the interest will point at after this call. An explicit
  // `berthId: null` means "unlink", so it must surface as null in the diff;
  // only a payload that omits the key keeps the current berth. (Using
  // `incomingBerthId ?? currentBerthId` here hid unlinks from the audit diff
  // and from the `changedFields` socket payload.)
  const nextBerthId = 'berthId' in data ? (incomingBerthId ?? null) : currentBerthId;
  const { diff } = diffEntity(
    { ...(existing as Record<string, unknown>), berthId: currentBerthId },
    { ...(updateData as Record<string, unknown>), berthId: nextBerthId },
  );

  const [updated] = await db
    .update(interests)
    .set({ ...updateData, updatedAt: new Date() })
    .where(and(eq(interests.id, id), eq(interests.portId, portId)))
    .returning();

  // Apply primary-berth change through the junction so the unique
  // partial index is respected and the previous primary is demoted.
  if ('berthId' in data && incomingBerthId !== currentBerthId) {
    if (incomingBerthId) {
      await upsertInterestBerth(id, incomingBerthId, {
        isPrimary: true,
        isSpecificInterest: true,
        addedBy: meta.userId,
      });
    } else if (currentBerthId) {
      await removeInterestBerth(id, currentBerthId, portId, meta);
    }
  }

  void createAuditLog({
    userId: meta.userId,
    portId,
    action: 'update',
    entityType: 'interest',
    entityId: id,
    oldValue: diff as Record<string, unknown>,
    newValue: updateData as Record<string, unknown>,
    ipAddress: meta.ipAddress,
    userAgent: meta.userAgent,
  });
  emitToRoom(`port:${portId}`, 'interest:updated', {
    interestId: id,
    changedFields: Object.keys(diff),
  });

  // Owner change → notify the new assignee. We skip self-reassign so a rep
  // re-claiming their own deal doesn't get a noise notification.
  if (
    'assignedTo' in data &&
    data.assignedTo &&
    data.assignedTo !== existing.assignedTo &&
    data.assignedTo !== meta.userId
  ) {
    // Captured here so the narrowed (non-empty) value is usable inside the
    // async callback without a non-null assertion.
    const newAssignee = data.assignedTo;
    const [clientRow] = await db
      .select({ fullName: clients.fullName })
      .from(clients)
      .where(eq(clients.id, existing.clientId))
      .limit(1);
    const clientLabel = clientRow?.fullName ?? 'a client';
    void import('@/lib/services/notifications.service')
      .then(({ createNotification }) =>
        createNotification({
          portId,
          userId: newAssignee,
          type: 'interest_assigned',
          title: 'New deal assigned to you',
          description: `${clientLabel} - ${existing.pipelineStage.replace(/_/g, ' ')}`,
          link: `/interests/${id}` as never,
          entityType: 'interest',
          entityId: id,
          dedupeKey: `interest_assigned:${id}:${newAssignee}`,
        }),
      )
      .catch(() => {
        // Best-effort: the update has already committed, so a notification
        // failure must not surface as an unhandled promise rejection.
      });
  }

  return updated!;
}
// ─── Change Stage ─────────────────────────────────────────────────────────────
/**
 * Sentinel returned by changeInterestStage when the requested target
 * matches the current stage (no audit row, no socket emit, no DB update).
 * The route handler translates this to a 204 No Content response.
 *
 * changeInterestStage returns either this symbol or the updated interest
 * row, so callers should compare the result by identity
 * (`result === STAGE_NOOP`) before treating it as a row.
 */
export const STAGE_NOOP = Symbol('stage-noop');
/**
 * Moves an interest to a new pipeline stage.
 *
 * Checks run in this order:
 *  1. the interest exists in `portId` (else NotFoundError);
 *  2. target equals the current stage → returns `STAGE_NOOP`, nothing written;
 *  3. a yacht must be linked before leaving `enquiry`;
 *  4. the transition table must allow the move unless `data.override` is set;
 *  5. an override needs a reason of at least 5 characters after trimming;
 *  6. targets in the priced-stage set need a non-zero primary berth price
 *     unless `data.override` is set.
 *
 * On success a single UPDATE writes the stage (plus milestone dates for
 * manual moves), then an audit log, socket emit, webhook, Umami event and a
 * notification to the acting user are dispatched fire-and-forget.
 *
 * @param id - Interest to move.
 * @param portId - Port the interest must belong to.
 * @param data - Target stage plus optional `override`, `reason`, `milestoneDate`.
 * @param meta - Acting user and request metadata for the audit log.
 * @param options - M3: `lifecycleDriven` distinguishes a manual/UI stage move
 *   (the /stage route + bulk route call this directly) from a
 *   lifecycle/signing-driven advance routed through advanceStageIfBehind. For
 *   signing-driven advances the milestone date is owned by the doc-send/sign
 *   flow, so milestone columns are only auto-populated when this is false
 *   (the default).
 * @returns `STAGE_NOOP` for a same-stage request, otherwise the updated row.
 * @throws NotFoundError when no interest with `id` exists in `portId`.
 * @throws ValidationError when any of checks 3-6 fails.
 */
export async function changeInterestStage(
  id: string,
  portId: string,
  data: ChangeStageInput,
  meta: AuditMeta,
  options?: { lifecycleDriven?: boolean },
) {
  const lifecycleDriven = options?.lifecycleDriven ?? false;
  const existing = await db.query.interests.findFirst({
    where: eq(interests.id, id),
  });
  if (!existing || existing.portId !== portId) {
    throw new NotFoundError('Interest');
  }

  // F27 / A19: same-stage write is a no-op. The service signals this via
  // the sentinel `STAGE_NOOP` so the route handler can return 204 No
  // Content instead of 200 + full body. Pre-fix every re-submit (e.g.
  // accidental double-click) wrote a "Same → Same" audit entry and
  // triggered downstream invalidations.
  if (existing.pipelineStage === data.pipelineStage) {
    return STAGE_NOOP;
  }

  // Plan: yachtId required to leave the initial enquiry stage
  if (
    existing.pipelineStage === 'enquiry' &&
    data.pipelineStage !== 'enquiry' &&
    !existing.yachtId
  ) {
    // F21: user-readable; was "yachtId is required before leaving stage=enquiry"
    throw new ValidationError('A yacht must be linked before leaving the Enquiry stage.');
  }

  // Block egregious skips. The transition table allows reasonable forward
  // jumps (e.g. enquiry → eoi) while rejecting things like contract → enquiry.
  // Override (sales-rep manual fix) bypasses the table - the route handler
  // gates this on the `interests.override_stage` permission and requires
  // a reason, recorded in the audit log below.
  if (!data.override && !canTransitionStage(existing.pipelineStage, data.pipelineStage)) {
    // F21: use the human-readable stage labels in error copy.
    throw new ValidationError(
      `Cannot move interest from "${STAGE_LABELS[existing.pipelineStage as PipelineStage] ?? existing.pipelineStage}" directly to "${STAGE_LABELS[data.pipelineStage as PipelineStage] ?? data.pipelineStage}". Use the override option if you need to skip stages - requires a reason.`,
    );
  }
  if (data.override && (!data.reason || data.reason.trim().length < 5)) {
    throw new ValidationError(
      'Override requires a reason (min 5 chars) explaining the manual stage change.',
    );
  }

  // T65 (Bucket 4 bug #2): block advancing past Qualified onto a
  // primary berth whose `price` is NULL. EOI / Reservation / Deposit /
  // Contract docs all render the price in templates / merge fields and
  // generating a $0 contract is a real production gotcha. Allow the
  // skip when `override` is true so a sales-manager fix path stays
  // open (recorded with a reason in audit log).
  const PRICED_STAGES = new Set(['eoi', 'reservation', 'deposit_paid', 'contract']);
  if (!data.override && PRICED_STAGES.has(data.pipelineStage)) {
    const primaryBerth = await db
      .select({ price: berths.price })
      .from(interestBerths)
      .innerJoin(berths, eq(interestBerths.berthId, berths.id))
      .where(and(eq(interestBerths.interestId, id), eq(interestBerths.isPrimary, true)))
      .limit(1);
    // No primary berth row at all also lands here (price is undefined).
    const price = primaryBerth[0]?.price;
    if (!price || parseFloat(String(price)) === 0) {
      throw new ValidationError(
        `Primary berth has no price set. Set the berth price before advancing past Qualified, or override with a reason.`,
      );
    }
  }

  const oldStage = existing.pipelineStage;

  // BR-133: Auto-populate milestones based on stage. The rep can override the
  // stamp via `milestoneDate` when they're back-dating a real event (e.g.
  // "deposit landed yesterday"); we still default to now when omitted.
  //
  // M3: only stamp milestone columns for manual/UI moves. Lifecycle/signing-
  // driven advances (routed through advanceStageIfBehind) get their milestone
  // dates from the doc-send/sign flow that already recorded the true event
  // timestamp; auto-stamping `now` here on top of that back-dates intervals
  // like "contract sent → signed" to ~0. Folding this into the same UPDATE as
  // the stage change also removes the previous non-transactional double-write.
  const milestoneUpdates: Record<string, unknown> = {};
  if (!lifecycleDriven) {
    const milestoneDate = data.milestoneDate ? new Date(data.milestoneDate) : new Date();
    if (data.pipelineStage === 'eoi') milestoneUpdates.dateEoiSent = milestoneDate;
    if (data.pipelineStage === 'reservation')
      milestoneUpdates.dateReservationSigned = milestoneDate;
    if (data.pipelineStage === 'deposit_paid') milestoneUpdates.dateDepositReceived = milestoneDate;
    if (data.pipelineStage === 'contract') milestoneUpdates.dateContractSent = milestoneDate;
  }

  const [updated] = await db
    .update(interests)
    .set({ pipelineStage: data.pipelineStage, ...milestoneUpdates, updatedAt: new Date() })
    .where(and(eq(interests.id, id), eq(interests.portId, portId)))
    .returning();

  void createAuditLog({
    userId: meta.userId,
    portId,
    action: 'update',
    entityType: 'interest',
    entityId: id,
    oldValue: { pipelineStage: oldStage },
    newValue: { pipelineStage: data.pipelineStage, reason: data.reason },
    metadata: { type: 'stage_change', reason: data.reason },
    ipAddress: meta.ipAddress,
    userAgent: meta.userAgent,
  });
  emitToRoom(`port:${portId}`, 'interest:stageChanged', {
    interestId: id,
    oldStage: oldStage ?? '',
    newStage: data.pipelineStage,
    clientName: '',
    berthNumber: '',
  });
  void import('@/lib/services/webhook-dispatch').then(({ dispatchWebhookEvent }) =>
    dispatchWebhookEvent(portId, 'interest:stageChanged', {
      interestId: id,
      oldStage: oldStage ?? null,
      newStage: data.pipelineStage,
    }),
  );
  // Phase 6 - CRM → Umami attribution for pipeline movement.
  void import('@/lib/services/umami.service').then(({ trackEvent }) =>
    trackEvent(portId, 'interest-stage-changed', {
      interestId: id,
      oldStage: oldStage ?? null,
      newStage: data.pipelineStage,
    }),
  );

  // Fire-and-forget notification to the acting user. Resolve a friendly
  // label (client full name → primary mooring number → "this interest") so
  // the inbox doesn't surface a raw UUID; stage names go through the
  // canonical STAGE_LABELS dictionary, falling back to the raw key with
  // underscores replaced by spaces.
  void (async () => {
    try {
      const [{ createNotification }, clientRow, allBerths] = await Promise.all([
        import('@/lib/services/notifications.service'),
        db.query.clients.findFirst({
          where: eq(clients.id, existing.clientId),
          columns: { fullName: true },
        }),
        // A berth-list failure degrades to "no berths" rather than dropping
        // the whole notification.
        listBerthsForInterest(id).catch(
          () => [] as Awaited<ReturnType<typeof listBerthsForInterest>>,
        ),
      ]);
      const primaryBerth = allBerths[0] ?? null;
      const moorings = allBerths
        .map((b) => b.mooringNumber)
        .filter((m): m is string => Boolean(m));
      const berthSuffix = moorings.length > 0 ? ` [${formatBerthRange(moorings)}]` : '';
      const subject =
        clientRow?.fullName ??
        (primaryBerth?.mooringNumber ? `Berth ${primaryBerth.mooringNumber}` : 'this interest');
      const fromLabel = oldStage
        ? (STAGE_LABELS[oldStage as PipelineStage] ?? oldStage.replace(/_/g, ' '))
        : 'unknown';
      const toLabel =
        STAGE_LABELS[data.pipelineStage as PipelineStage] ?? data.pipelineStage.replace(/_/g, ' ');
      await createNotification({
        portId,
        userId: meta.userId,
        type: 'interest_stage_changed',
        title: `${subject} moved to ${toLabel}${berthSuffix}`,
        description: `Stage changed from ${fromLabel} to ${toLabel}.`,
        link: `/interests/${id}`,
        entityType: 'interest',
        entityId: id,
        dedupeKey: `interest:${id}:stage:${data.pipelineStage}`,
        cooldownMs: 300_000,
      });
    } catch {
      // Best-effort: the stage change has already committed, so a failed
      // import / client lookup / notification write must not escape this
      // detached task as an unhandled promise rejection.
      // NOTE(review): consider logging here via the module logger.
    }
  })();

  return updated!;
}
// ─── Advance Stage If Behind ─────────────────────────────────────────────────
//
// Forward-only stage sync for lifecycle events (EOI sent, EOI signed, deposit
// recorded, contract signed). The interest is moved to `target` only when its
// current stage sits strictly before `target` in PIPELINE_STAGES, so a
// late-arriving event on a deal that has already progressed further is a
// no-op rather than a regression.
//
// Returns false without writing when: the interest is not found in the port,
// either stage is not in PIPELINE_STAGES, the interest is already at or past
// `target`, or it is still in `enquiry` with no yacht linked. Returns true
// after delegating the move to changeInterestStage.
export async function advanceStageIfBehind(
  interestId: string,
  portId: string,
  target: PipelineStage,
  meta: AuditMeta,
  reason?: string,
): Promise<boolean> {
  const interest = await db.query.interests.findFirst({
    where: and(eq(interests.id, interestId), eq(interests.portId, portId)),
  });
  if (!interest) return false;

  const fromIdx = PIPELINE_STAGES.indexOf(interest.pipelineStage as PipelineStage);
  const toIdx = PIPELINE_STAGES.indexOf(target);
  const isBehindTarget = fromIdx !== -1 && toIdx !== -1 && fromIdx < toIdx;
  if (!isBehindTarget) {
    return false;
  }

  // yachtId gate: changeInterestStage throws when an interest leaves the
  // initial enquiry stage without a yacht. Lifecycle events should not fail
  // on missing data, so bail quietly instead of letting that error surface.
  const blockedByMissingYacht = interest.pipelineStage === 'enquiry' && !interest.yachtId;
  if (blockedByMissingYacht) {
    return false;
  }

  // M3: every lifecycle/signing-driven advance funnels through here. The move
  // is flagged as lifecycle-driven so changeInterestStage skips its automatic
  // milestone-date stamping - those dates belong to the doc-send/sign flow.
  const stageChange = { pipelineStage: target, reason };
  await changeInterestStage(interestId, portId, stageChange, meta, { lifecycleDriven: true });
  return true;
}
/**
 * Admin-gated wrapper around `advanceStageIfBehind`. The per-port stage
 * advance mode for `trigger` (read via `getStageAdvanceMode`) decides what
 * happens:
 *   - 'off'  → nothing; returns false.
 *   - 'auto' → delegates to `advanceStageIfBehind` and returns its result.
 *   - any other mode ('suggest') → no stage change. If the interest is behind
 *     `target` and has an assignee, that user gets a
 *     `stage_advance_suggested` notification linking to the interest.
 *     Always returns false.
 *
 * Lifecycle handlers that want admin-controlled cadence call this; the bare
 * `advanceStageIfBehind` remains for unconditional moves.
 *
 * @returns true only when the stage was actually advanced ('auto' mode).
 */
export async function advanceStageIfBehindGated(
  interestId: string,
  portId: string,
  target: PipelineStage,
  meta: AuditMeta,
  reason: string | undefined,
  trigger:
    | 'eoi_sent'
    | 'eoi_signed'
    | 'reservation_signed'
    | 'deposit_received'
    | 'contract_signed',
): Promise<boolean> {
  const portConfig = await import('@/lib/services/port-config');
  const mode = await portConfig.getStageAdvanceMode(portId, trigger);
  switch (mode) {
    case 'off':
      return false;
    case 'auto':
      return advanceStageIfBehind(interestId, portId, target, meta, reason);
    default:
      break;
  }

  // 'suggest' - surface a notification to the rep instead of moving the deal.
  const interest = await db.query.interests.findFirst({
    where: and(eq(interests.id, interestId), eq(interests.portId, portId)),
    columns: { pipelineStage: true, assignedTo: true },
  });
  if (!interest) return false;

  const fromIdx = PIPELINE_STAGES.indexOf(interest.pipelineStage as PipelineStage);
  const toIdx = PIPELINE_STAGES.indexOf(target);
  const isBehindTarget = fromIdx !== -1 && toIdx !== -1 && fromIdx < toIdx;
  if (!isBehindTarget) return false;

  const assignee = interest.assignedTo;
  if (!assignee) return false;

  const suggestion =
    reason ?? `${trigger} fired - suggested advance from ${interest.pipelineStage} to ${target}.`;
  void import('@/lib/services/notifications.service').then(({ createNotification }) =>
    createNotification({
      portId,
      userId: assignee,
      type: 'stage_advance_suggested',
      title: `Advance to ${target}?`,
      description: suggestion,
      link: `/interests/${interestId}`,
      entityType: 'interest',
      entityId: interestId,
      dedupeKey: `interest:${interestId}:advance-suggest:${trigger}`,
    }).catch(() => {
      // A failed notification must not break the calling event handler.
    }),
  );
  return false;
}
// ─── Set Outcome (Won / Lost) ────────────────────────────────────────────────
//
// Closes a deal by recording its terminal outcome. Only the outcome columns
// (`outcome`, `outcomeReason`, `outcomeAt`) and `updatedAt` are written;
// `pipelineStage` is left untouched so reports can still answer "what stage
// was this deal at when it closed?". Prior to 2026-05-14 this method forced
// pipelineStage='completed' - a sentinel outside the 7-stage canon that broke
// type narrowing + downstream stage label lookups. Active-interest queries
// filter by `outcome IS NULL` so the rep-facing kanban still hides closed
// deals.
//
// Throws NotFoundError when the interest is not in the port and ConflictError
// when an outcome is already recorded. Resolves to `{ ok: true }`.
export async function setInterestOutcome(
  id: string,
  portId: string,
  data: SetOutcomeInput,
  meta: AuditMeta,
) {
  const interest = await db.query.interests.findFirst({
    where: and(eq(interests.id, id), eq(interests.portId, portId)),
  });
  if (!interest) {
    throw new NotFoundError('Interest');
  }

  // M1: terminal-state guard. A second call (won→lost flip, double-submit,
  // webhook retry) would repeat every side effect below - berth rule, folder
  // rename, audit row, socket emit, Umami event. As with clearInterestOutcome,
  // a recorded outcome has to be cleared before a new one can be set.
  if (interest.outcome) {
    throw new ConflictError(
      'Interest already has an outcome. Clear the current outcome before setting a new one.',
    );
  }

  const { outcome: previousOutcome, pipelineStage: stageAtOutcome } = interest;
  const closedAt = new Date();
  const outcomeColumns = {
    outcome: data.outcome,
    outcomeReason: data.reason ?? null,
    outcomeAt: closedAt,
    updatedAt: closedAt,
  };
  await db
    .update(interests)
    .set(outcomeColumns)
    .where(and(eq(interests.id, id), eq(interests.portId, portId)));

  void createAuditLog({
    userId: meta.userId,
    portId,
    // M-AU04: distinct verb so the audit filter / FTS surface it directly.
    action: 'outcome_set',
    entityType: 'interest',
    entityId: id,
    oldValue: { outcome: previousOutcome, pipelineStage: stageAtOutcome },
    newValue: { outcome: data.outcome, pipelineStage: stageAtOutcome, reason: data.reason },
    metadata: { stageAtOutcome },
    ipAddress: meta.ipAddress,
    userAgent: meta.userAgent,
  });

  emitToRoom(`port:${portId}`, 'interest:outcomeSet', {
    interestId: id,
    outcome: data.outcome,
    stageAtOutcome,
  });

  // Berth-rule on deal close (audit C2). Only a WON deal fires
  // `interest_completed`; every other outcome fires `deal_lost`. Previously
  // all outcomes reused interest_completed and flipped the berth to 'sold'.
  const berthRule = data.outcome === 'won' ? 'interest_completed' : 'deal_lost';
  void evaluateRule(berthRule, id, portId, meta);

  // Phase 2 nested-subfolders - rename the interest's document folder so the
  // outcome shows inline (e.g. "Deal A1-A3 (Won)"). Loaded dynamically to
  // dodge the circular dependency with document-folders.service.
  void import('@/lib/services/document-folders.service')
    .then((m) => m.renameInterestFolderForOutcome?.(id, portId, data.outcome) ?? null)
    .catch(() => {
      // Folder may not exist yet (first upload happens later) - silent.
    });

  // Phase 6 - CRM → Umami attribution: custom event so marketing can tie
  // website traffic to the deal outcome. Dynamic import avoids a circular
  // service dependency at module-load time.
  void import('@/lib/services/umami.service').then(({ trackEvent }) =>
    trackEvent(portId, 'interest-outcome-set', {
      interestId: id,
      outcome: data.outcome,
      stageAtOutcome,
    }),
  );

  return { ok: true as const };
}
// Reopens a closed deal: wipes `outcome`, `outcomeReason` and `outcomeAt` and
// sets the stage the deal re-enters at. For outcomes recorded by mistake or
// "lost" deals that come back to life.
//
// Throws NotFoundError when the interest is not in the port and
// ValidationError when there is no outcome to clear. Resolves to `{ ok: true }`.
export async function clearInterestOutcome(
  id: string,
  portId: string,
  data: ClearOutcomeInput,
  meta: AuditMeta,
) {
  // Same id + port predicate for the lookup and the write.
  const inPort = () => and(eq(interests.id, id), eq(interests.portId, portId));

  const interest = await db.query.interests.findFirst({ where: inPort() });
  if (!interest) {
    throw new NotFoundError('Interest');
  }
  const { outcome: previousOutcome, pipelineStage: previousStage } = interest;
  if (!previousOutcome) {
    throw new ValidationError('Interest has no outcome to clear');
  }

  // Reopen stage:
  //  - an explicit `data.reopenStage` from the caller wins (rep override);
  //  - otherwise the stage already on the row is kept, since setting an
  //    outcome no longer touches pipelineStage.
  // L1: the row's stage is passed through canonicalizeStage so any legacy
  // value left from before the 9→7 stage migration (e.g. 'completed') is
  // folded to its canonical equivalent and never re-enters the kanban as a
  // non-canonical stage.
  const reopenStage = data.reopenStage ?? canonicalizeStage(previousStage);
  const reopenedAt = new Date();

  await db
    .update(interests)
    .set({
      outcome: null,
      outcomeReason: null,
      outcomeAt: null,
      pipelineStage: reopenStage,
      updatedAt: reopenedAt,
    })
    .where(inPort());

  void createAuditLog({
    userId: meta.userId,
    portId,
    // M-AU04: distinct verb so the audit filter / FTS surface it directly.
    action: 'outcome_cleared',
    entityType: 'interest',
    entityId: id,
    oldValue: { outcome: previousOutcome, pipelineStage: previousStage },
    newValue: { outcome: null, pipelineStage: reopenStage },
    ipAddress: meta.ipAddress,
    userAgent: meta.userAgent,
  });

  emitToRoom(`port:${portId}`, 'interest:outcomeCleared', { interestId: id });

  return { ok: true as const };
}
// ─── Archive / Restore ────────────────────────────────────────────────────────
/**
 * Soft-deletes an interest and, when it had a primary berth, kicks off the
 * berth rule plus a "next in line" notification for other active interests
 * on that berth.
 *
 * @param id - Interest to archive.
 * @param portId - Port the interest must belong to.
 * @param meta - Acting user and request metadata for the audit log.
 * @throws NotFoundError when no interest with `id` exists in `portId`.
 * @throws ConflictError when an EOI is waiting for signatures or a contract
 *   is pending (BR-014).
 */
export async function archiveInterest(id: string, portId: string, meta: AuditMeta) {
  const existing = await db.query.interests.findFirst({
    where: eq(interests.id, id),
  });
  if (!existing) {
    throw new NotFoundError('Interest');
  }
  if (existing.portId !== portId) {
    throw new NotFoundError('Interest');
  }

  // BR-014: Block archive if pending EOI/contract
  const eoiAwaitingSignatures = existing.eoiStatus === 'waiting_for_signatures';
  const contractPending = existing.contractStatus === 'pending';
  if (eoiAwaitingSignatures || contractPending) {
    throw new ConflictError(
      'Cannot archive interest with pending documents. Cancel documents first.',
    );
  }

  // The primary berth is resolved before the soft-delete and reused for the
  // berth rule and next-in-line lookup further down.
  const primaryBerth = await getPrimaryBerth(id);

  await softDelete(interests, interests.id, id);

  void createAuditLog({
    userId: meta.userId,
    portId,
    action: 'archive',
    entityType: 'interest',
    entityId: id,
    ipAddress: meta.ipAddress,
    userAgent: meta.userAgent,
  });
  emitToRoom(`port:${portId}`, 'interest:archived', { interestId: id });

  // Nothing berth-related to do for an interest that never had a primary.
  if (!primaryBerth) {
    return;
  }
  const { berthId, mooringNumber } = primaryBerth;

  // G-C4: fire the berth-rule (default mode 'suggest' for interest_archived).
  void evaluateRule('interest_archived', id, portId, meta);

  // G-I2: tell sales which other active interests are linked to the berth
  // this archive touched so they can follow up. Mirrors the client-archive
  // flow, scoped to this interest's primary berth. Runs detached; failures
  // are logged, never thrown.
  const notifySalesOfNextInLine = async () => {
    try {
      // Up to 10 other active interests on the same berth, most recently
      // updated first.
      const candidates = await db
        .select({
          interestId: interests.id,
          clientId: interests.clientId,
          clientName: clients.fullName,
          pipelineStage: interests.pipelineStage,
        })
        .from(interestBerths)
        .innerJoin(interests, eq(interestBerths.interestId, interests.id))
        .leftJoin(clients, eq(interests.clientId, clients.id))
        .where(
          and(
            activeInterestsWhere(portId),
            eq(interestBerths.berthId, berthId),
            ne(interests.id, id),
          ),
        )
        .orderBy(desc(interests.updatedAt))
        .limit(10);

      let archivedClient = null;
      if (existing.clientId) {
        archivedClient = await db.query.clients.findFirst({
          where: eq(clients.id, existing.clientId),
        });
      }

      await notifyNextInLine({
        portId,
        berthId,
        mooringNumber: mooringNumber ?? '',
        archivedClientName: archivedClient?.fullName ?? '(unknown client)',
        nextInLineInterests: candidates.map((candidate) => ({
          interestId: candidate.interestId,
          clientName: candidate.clientName,
          pipelineStage: candidate.pipelineStage,
        })),
      });
    } catch (err) {
      logger.error(
        { err, interestId: id, berthId },
        'Failed to fire next-in-line notification on interest archive',
      );
    }
  };
  void notifySalesOfNextInLine();
}
/**
 * Restores a previously archived interest, then records an audit entry and
 * emits `interest:updated` (with an empty `changedFields` list) to the port
 * room.
 *
 * @param id - Interest to restore.
 * @param portId - Port the interest must belong to.
 * @param meta - Acting user and request metadata for the audit log.
 * @throws NotFoundError when no interest with `id` exists in `portId`.
 */
export async function restoreInterest(id: string, portId: string, meta: AuditMeta) {
  const target = await db.query.interests.findFirst({
    where: eq(interests.id, id),
  });
  // A row that lives in another port is reported as missing.
  const foundInPort = Boolean(target) && target?.portId === portId;
  if (!foundInPort) {
    throw new NotFoundError('Interest');
  }

  await restore(interests, interests.id, id);

  const { userId, ipAddress, userAgent } = meta;
  void createAuditLog({
    userId,
    portId,
    action: 'restore',
    entityType: 'interest',
    entityId: id,
    ipAddress,
    userAgent,
  });

  emitToRoom(`port:${portId}`, 'interest:updated', { interestId: id, changedFields: [] });
}
// ─── Set Tags ─────────────────────────────────────────────────────────────────
/**
 * Sets the tags on an interest by delegating to the shared `setEntityTags`
 * helper against the interest↔tag join table.
 *
 * @param id - Interest whose tags are being set.
 * @param portId - Port the interest must belong to.
 * @param tagIds - Tag ids handed to `setEntityTags`.
 * @param meta - Acting user and request metadata, forwarded to the helper.
 * @returns `{ interestId, tagIds }` as reported back by the helper.
 * @throws NotFoundError when no interest with `id` exists in `portId`.
 */
export async function setInterestTags(
  id: string,
  portId: string,
  tagIds: string[],
  meta: AuditMeta,
) {
  const interest = await db.query.interests.findFirst({
    where: eq(interests.id, id),
  });
  if (!interest) {
    throw new NotFoundError('Interest');
  }
  if (interest.portId !== portId) {
    throw new NotFoundError('Interest');
  }

  const { entityId, tagIds: appliedTagIds } = await setEntityTags({
    joinTable: interestTags,
    entityColumn: interestTags.interestId,
    tagColumn: interestTags.tagId,
    entityId: id,
    portId,
    tagIds,
    meta,
    entityType: 'interest',
  });

  return { interestId: entityId, tagIds: appliedTagIds };
}
// ─── Link / Unlink Berth ──────────────────────────────────────────────────────
/**
 * Makes `berthId` the primary berth of an interest via the interest↔berth
 * junction, then bumps `updatedAt`, writes an audit entry, emits
 * `interest:berthLinked` and dispatches the matching webhook.
 *
 * @param id - Interest to link the berth to.
 * @param portId - Port the interest must belong to.
 * @param berthId - Berth to set as primary; checked by `assertInterestFksInPort`.
 * @param meta - Acting user and request metadata for the audit log.
 * @returns The interest row after the `updatedAt` touch.
 * @throws NotFoundError when no interest with `id` exists in `portId`.
 */
export async function linkBerth(id: string, portId: string, berthId: string, meta: AuditMeta) {
  const interest = await db.query.interests.findFirst({
    where: eq(interests.id, id),
  });
  if (!interest) {
    throw new NotFoundError('Interest');
  }
  if (interest.portId !== portId) {
    throw new NotFoundError('Interest');
  }

  await assertInterestFksInPort(portId, { berthId });

  // Captured before the upsert so the audit entry can show old → new.
  const priorPrimary = await getPrimaryBerth(id);
  const priorBerthId = priorPrimary?.berthId ?? null;

  await upsertInterestBerth(id, berthId, {
    isPrimary: true,
    isSpecificInterest: true,
    addedBy: meta.userId,
  });

  // Touch updatedAt so list/sort surfaces still reflect the change.
  const touched = await db
    .update(interests)
    .set({ updatedAt: new Date() })
    .where(and(eq(interests.id, id), eq(interests.portId, portId)))
    .returning();

  void createAuditLog({
    userId: meta.userId,
    portId,
    action: 'update',
    entityType: 'interest',
    entityId: id,
    oldValue: { berthId: priorBerthId },
    newValue: { berthId },
    metadata: { type: 'berth_linked' },
    ipAddress: meta.ipAddress,
    userAgent: meta.userAgent,
  });

  const linkedPayload = { interestId: id, berthId };
  emitToRoom(`port:${portId}`, 'interest:berthLinked', linkedPayload);
  void import('@/lib/services/webhook-dispatch').then(({ dispatchWebhookEvent }) =>
    dispatchWebhookEvent(portId, 'interest:berthLinked', { interestId: id, berthId }),
  );

  return touched[0]!;
}
/**
 * Removes the interest's current primary berth link (if it has one), bumps
 * `updatedAt`, writes an audit entry and emits `interest:berthUnlinked`.
 *
 * The audit entry and socket event are sent even when no primary berth was
 * linked; in that case the event carries an empty-string `berthId`.
 *
 * @param id - Interest to unlink.
 * @param portId - Port the interest must belong to.
 * @param meta - Acting user and request metadata for the audit log.
 * @returns The interest row after the `updatedAt` touch.
 * @throws NotFoundError when no interest with `id` exists in `portId`.
 */
export async function unlinkBerth(id: string, portId: string, meta: AuditMeta) {
  const interest = await db.query.interests.findFirst({
    where: eq(interests.id, id),
  });
  if (!interest) {
    throw new NotFoundError('Interest');
  }
  if (interest.portId !== portId) {
    throw new NotFoundError('Interest');
  }

  const priorPrimary = await getPrimaryBerth(id);
  const priorBerthId = priorPrimary?.berthId ?? null;
  if (priorBerthId) {
    await removeInterestBerth(id, priorBerthId, portId, meta);
  }

  const touched = await db
    .update(interests)
    .set({ updatedAt: new Date() })
    .where(and(eq(interests.id, id), eq(interests.portId, portId)))
    .returning();

  void createAuditLog({
    userId: meta.userId,
    portId,
    action: 'update',
    entityType: 'interest',
    entityId: id,
    oldValue: { berthId: priorBerthId },
    newValue: { berthId: null },
    metadata: { type: 'berth_unlinked' },
    ipAddress: meta.ipAddress,
    userAgent: meta.userAgent,
  });

  emitToRoom(`port:${portId}`, 'interest:berthUnlinked', {
    interestId: id,
    berthId: priorBerthId ?? '',
  });

  return touched[0]!;
}
// ─── Stage Counts (for board) ────────────────────────────────────────────────
/**
 * Counts interests per pipeline stage for the board view.
 *
 * Rows are restricted by `activeInterestsWhere(portId)`; the board shows
 * active deals only, with terminal outcomes belonging on a separate /closed
 * surface. Stages with no matching rows are absent from the result.
 *
 * @param portId - Port whose interests are counted.
 * @returns A map of pipeline stage → number of matching interests.
 */
export async function getInterestStageCounts(portId: string) {
  const perStage = await db
    .select({ stage: interests.pipelineStage, count: sql<number>`count(*)::int` })
    .from(interests)
    .where(activeInterestsWhere(portId))
    .groupBy(interests.pipelineStage);

  const counts: { [k: string]: number } = {};
  for (const { stage, count } of perStage) {
    counts[stage] = count;
  }
  return counts;
}