refactor(interests): migrate callers to interest_berths junction + drop berth_id

Phase 2b of the berth-recommender refactor (plan §3.4). Every caller of
the legacy `interests.berth_id` column now reads / writes through the
`interest_berths` junction via the helper service introduced in Phase 2a;
the column itself is dropped in a final migration.

Service-layer changes
- interests.service: filter `?berthId=X` becomes EXISTS-against-junction;
  list enrichment uses `getPrimaryBerthsForInterests`; create/update/
  linkBerth/unlinkBerth all dispatch through the junction helpers, with
  createInterest's row insert + junction write sharing a single transaction.
- clients / dashboard / report-generators / search: leftJoin chains pivot
  through `interest_berths` filtered by `is_primary=true`.
- eoi-context / document-templates / berth-rules-engine / portal /
  record-export / queue worker: read primary via `getPrimaryBerth(...)`.
- interest-scoring: berthLinked is now derived from any junction row count.
- dedup/migration-apply + public interest route: write a primary junction
  row alongside the interest insert when a berth is provided.

API contract preserved: list/detail responses still emit `berthId` and
`berthMooringNumber`, derived from the primary junction row, so frontend
consumers (interest-form, interest-detail-header) need no changes.

Schema + migration
- Drop `interestsRelations.berth` and `idx_interests_berth`.
- Replace `berthsRelations.interests` with `interestBerths`.
- Migration 0029_puzzling_romulus drops `interests.berth_id` + the index.
- Tests that previously inserted `interests.berthId` now seed a primary
  junction row alongside the interest.

Verified: vitest 995 passing (1 unrelated pre-existing flake in
maintenance-cleanup.test.ts), tsc clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Matt Ciaccio
2026-05-05 02:41:52 +02:00
parent ff92a08620
commit 6e3d910c76
26 changed files with 11351 additions and 220 deletions

View File

@@ -6,6 +6,7 @@ import { berths } from '@/lib/db/schema/berths';
import { systemSettings } from '@/lib/db/schema/system';
import { createAuditLog, type AuditMeta } from '@/lib/audit';
import { emitToRoom } from '@/lib/socket/server';
import { getPrimaryBerth } from '@/lib/services/interest-berths.service';
// ─── Types ────────────────────────────────────────────────────────────────────
@@ -78,7 +79,15 @@ export async function evaluateRule(
where: and(eq(interests.id, interestId), eq(interests.portId, portId)),
});
if (!interest?.berthId) {
if (!interest) {
return { action: 'none' };
}
// Rule evaluation targets the interest's primary berth (plan §3.4) -
// resolved via interest_berths rather than the legacy column.
const primaryBerth = await getPrimaryBerth(interestId);
const targetBerthId = primaryBerth?.berthId;
if (!targetBerthId) {
return { action: 'none' };
}
@@ -99,14 +108,14 @@ export async function evaluateRule(
statusLastModified: new Date(),
updatedAt: new Date(),
})
.where(and(eq(berths.id, interest.berthId), eq(berths.portId, portId)));
.where(and(eq(berths.id, targetBerthId), eq(berths.portId, portId)));
void createAuditLog({
userId: meta.userId,
portId,
action: 'update',
entityType: 'berth',
entityId: interest.berthId,
entityId: targetBerthId,
newValue: { status: rule.targetStatus },
metadata: { type: 'berth_rule_auto', trigger, interestId },
ipAddress: meta.ipAddress,
@@ -114,7 +123,7 @@ export async function evaluateRule(
});
emitToRoom(`port:${portId}`, 'berth:statusChanged', {
berthId: interest.berthId,
berthId: targetBerthId,
newStatus: rule.targetStatus,
triggeredBy: meta.userId,
trigger,

View File

@@ -12,7 +12,7 @@ import {
import { companies, companyMemberships } from '@/lib/db/schema/companies';
import { yachts } from '@/lib/db/schema/yachts';
import { berthReservations } from '@/lib/db/schema/reservations';
import { interests } from '@/lib/db/schema/interests';
import { interests, interestBerths } from '@/lib/db/schema/interests';
import { berths } from '@/lib/db/schema/berths';
import { tags } from '@/lib/db/schema/system';
import { createAuditLog, type AuditMeta } from '@/lib/audit';
@@ -103,6 +103,10 @@ export async function listClients(portId: string, query: ListClientsInput) {
.from(companyMemberships)
.where(and(inArray(companyMemberships.clientId, ids), isNull(companyMemberships.endDate)))
.groupBy(companyMemberships.clientId),
// Latest interest per client + its primary-berth mooring (resolved via
// interest_berths join, plan §3.4). The is_primary filter narrows the
// join to ≤1 berth row per interest; non-primary links never surface
// through this list-page derivation.
db
.select({
clientId: interests.clientId,
@@ -111,7 +115,11 @@ export async function listClients(portId: string, query: ListClientsInput) {
mooringNumber: berths.mooringNumber,
})
.from(interests)
.leftJoin(berths, eq(berths.id, interests.berthId))
.leftJoin(
interestBerths,
and(eq(interestBerths.interestId, interests.id), eq(interestBerths.isPrimary, true)),
)
.leftJoin(berths, eq(berths.id, interestBerths.berthId))
.where(
and(
eq(interests.portId, portId),

View File

@@ -2,7 +2,7 @@ import { and, count, desc, eq, isNull, sql } from 'drizzle-orm';
import { db } from '@/lib/db';
import { clients } from '@/lib/db/schema/clients';
import { interests } from '@/lib/db/schema/interests';
import { interests, interestBerths } from '@/lib/db/schema/interests';
import { berths } from '@/lib/db/schema/berths';
import { systemSettings, auditLogs } from '@/lib/db/schema/system';
import { PIPELINE_STAGES, STAGE_WEIGHTS } from '@/lib/constants';
@@ -29,19 +29,17 @@ export async function getKpis(portId: string) {
// Pipeline value: SUM each berth's price ONCE regardless of how many active
// interests reference it. A berth with multiple interests would otherwise be
// counted multiple times, inflating the total.
// counted multiple times, inflating the total. Reads the primary-berth link
// via interest_berths (plan §3.4).
const pipelineRows = await db
.selectDistinct({ berthId: interests.berthId, price: berths.price })
.selectDistinct({ berthId: interestBerths.berthId, price: berths.price })
.from(interests)
.innerJoin(berths, eq(interests.berthId, berths.id))
.where(
and(
eq(interests.portId, portId),
isNull(interests.archivedAt),
isActiveInterest,
sql`${interests.berthId} IS NOT NULL`,
),
);
.innerJoin(
interestBerths,
and(eq(interestBerths.interestId, interests.id), eq(interestBerths.isPrimary, true)),
)
.innerJoin(berths, eq(interestBerths.berthId, berths.id))
.where(and(eq(interests.portId, portId), isNull(interests.archivedAt), isActiveInterest));
const pipelineValueUsd = pipelineRows.reduce((acc, row) => {
return acc + (row.price ? parseFloat(String(row.price)) : 0);
@@ -111,7 +109,8 @@ export async function getRevenueForecast(portId: string) {
}
// Forecast excludes lost/cancelled - only currently-active or won-out
// interests should affect the weighted pipeline value.
// interests should affect the weighted pipeline value. Reads the
// primary-berth link via interest_berths (plan §3.4).
const interestRows = await db
.select({
id: interests.id,
@@ -119,15 +118,12 @@ export async function getRevenueForecast(portId: string) {
berthPrice: berths.price,
})
.from(interests)
.innerJoin(berths, eq(interests.berthId, berths.id))
.where(
and(
eq(interests.portId, portId),
isNull(interests.archivedAt),
isActiveInterest,
sql`${interests.berthId} IS NOT NULL`,
),
);
.innerJoin(
interestBerths,
and(eq(interestBerths.interestId, interests.id), eq(interestBerths.isPrimary, true)),
)
.innerJoin(berths, eq(interestBerths.berthId, berths.id))
.where(and(eq(interests.portId, portId), isNull(interests.archivedAt), isActiveInterest));
// Build stageBreakdown
const stageMap: Record<string, { count: number; weightedValue: number }> = {};

View File

@@ -27,6 +27,7 @@ import { buildDocumensoPayload, getPortEoiSigners } from '@/lib/services/documen
import { generateEoiPdfFromTemplate } from '@/lib/pdf/fill-eoi-form';
import { MERGE_FIELDS, type MergeFieldCatalog } from '@/lib/templates/merge-fields';
import { buildEoiContext } from '@/lib/services/eoi-context';
import { getPrimaryBerth } from '@/lib/services/interest-berths.service';
import { sendEmail } from '@/lib/email';
import type {
CreateTemplateInput,
@@ -374,15 +375,16 @@ export async function resolveTemplate(
? new Date(interest.dateContractSigned).toLocaleDateString('en-GB')
: '';
// Derive berth number from the interest when berthId wasn't passed and
// the EOI path didn't already populate it.
if (!eoiContextLoaded && interest.berthId && !context.berthId) {
const interestBerth = await db.query.berths.findFirst({
where: eq(berths.id, interest.berthId),
});
if (interestBerth) {
tokenMap['{{interest.berthNumber}}'] = interestBerth.mooringNumber;
// the EOI path didn't already populate it. Resolves through the
// interest_berths junction (plan §3.4) - the legacy interest.berth_id
// column has been removed.
const interestPrimaryBerth =
!eoiContextLoaded && !context.berthId ? await getPrimaryBerth(interest.id) : null;
if (!eoiContextLoaded && interestPrimaryBerth?.berthId && !context.berthId) {
if (interestPrimaryBerth.mooringNumber) {
tokenMap['{{interest.berthNumber}}'] = interestPrimaryBerth.mooringNumber;
if (!tokenMap['{{berth.mooringNumber}}']) {
tokenMap['{{berth.mooringNumber}}'] = interestBerth.mooringNumber;
tokenMap['{{berth.mooringNumber}}'] = interestPrimaryBerth.mooringNumber;
}
} else {
tokenMap['{{interest.berthNumber}}'] ??= '';

View File

@@ -9,6 +9,7 @@ import { ports } from '@/lib/db/schema/ports';
import { yachts } from '@/lib/db/schema/yachts';
import { getCountryName } from '@/lib/i18n/countries';
import { NotFoundError, ValidationError } from '@/lib/errors';
import { getPrimaryBerth } from '@/lib/services/interest-berths.service';
// ─── Types ────────────────────────────────────────────────────────────────────
@@ -96,6 +97,11 @@ export async function buildEoiContext(interestId: string, portId: string): Promi
throw new NotFoundError('Interest');
}
// Resolve the interest's primary berth via the junction (plan §3.4).
// EOI Section 3 stays blank when no primary is set.
const primaryBerth = await getPrimaryBerth(interest.id);
const primaryBerthId = primaryBerth?.berthId ?? null;
// Parallelise independent reads. Yacht and berth are both nullable -
// the EOI's Section 3 stays blank when they're absent.
const [yacht, berth, client, port] = await Promise.all([
@@ -104,9 +110,9 @@ export async function buildEoiContext(interestId: string, portId: string): Promi
where: and(eq(yachts.id, interest.yachtId), eq(yachts.portId, portId)),
})
: Promise.resolve(undefined),
interest.berthId
primaryBerthId
? db.query.berths.findFirst({
where: and(eq(berths.id, interest.berthId), eq(berths.portId, portId)),
where: and(eq(berths.id, primaryBerthId), eq(berths.portId, portId)),
})
: Promise.resolve(undefined),
db.query.clients.findFirst({

View File

@@ -22,6 +22,8 @@ import { db } from '@/lib/db';
import { interestBerths, type InterestBerth } from '@/lib/db/schema/interests';
import { berths } from '@/lib/db/schema/berths';
type DbOrTx = typeof db | Parameters<Parameters<typeof db.transaction>[0]>[0];
// ─── Reads ──────────────────────────────────────────────────────────────────
export interface PrimaryBerthRef {
@@ -156,40 +158,54 @@ export async function upsertInterestBerth(
opts: AddOrUpdateOpts = {},
): Promise<InterestBerth> {
return db.transaction(async (tx) => {
if (opts.isPrimary === true) {
await tx
.update(interestBerths)
.set({ isPrimary: false })
.where(and(eq(interestBerths.interestId, interestId), eq(interestBerths.isPrimary, true)));
}
const setForUpdate: Partial<InterestBerth> = {};
if (opts.isPrimary !== undefined) setForUpdate.isPrimary = opts.isPrimary;
if (opts.isSpecificInterest !== undefined)
setForUpdate.isSpecificInterest = opts.isSpecificInterest;
if (opts.isInEoiBundle !== undefined) setForUpdate.isInEoiBundle = opts.isInEoiBundle;
if (opts.addedBy !== undefined) setForUpdate.addedBy = opts.addedBy;
if (opts.notes !== undefined) setForUpdate.notes = opts.notes;
const [row] = await tx
.insert(interestBerths)
.values({
interestId,
berthId,
isPrimary: opts.isPrimary ?? false,
isSpecificInterest: opts.isSpecificInterest ?? true,
isInEoiBundle: opts.isInEoiBundle ?? false,
addedBy: opts.addedBy,
notes: opts.notes,
})
.onConflictDoUpdate({
target: [interestBerths.interestId, interestBerths.berthId],
set: setForUpdate,
})
.returning();
return row!;
return upsertInterestBerthTx(tx, interestId, berthId, opts);
});
}
/**
* Transaction-bound variant of {@link upsertInterestBerth}. Use this when the
* junction write must roll back together with another write (e.g. inserting
* the parent interest row in the same transaction).
*/
export async function upsertInterestBerthTx(
tx: DbOrTx,
interestId: string,
berthId: string,
opts: AddOrUpdateOpts = {},
): Promise<InterestBerth> {
if (opts.isPrimary === true) {
await tx
.update(interestBerths)
.set({ isPrimary: false })
.where(and(eq(interestBerths.interestId, interestId), eq(interestBerths.isPrimary, true)));
}
const setForUpdate: Partial<InterestBerth> = {};
if (opts.isPrimary !== undefined) setForUpdate.isPrimary = opts.isPrimary;
if (opts.isSpecificInterest !== undefined)
setForUpdate.isSpecificInterest = opts.isSpecificInterest;
if (opts.isInEoiBundle !== undefined) setForUpdate.isInEoiBundle = opts.isInEoiBundle;
if (opts.addedBy !== undefined) setForUpdate.addedBy = opts.addedBy;
if (opts.notes !== undefined) setForUpdate.notes = opts.notes;
const [row] = await tx
.insert(interestBerths)
.values({
interestId,
berthId,
isPrimary: opts.isPrimary ?? false,
isSpecificInterest: opts.isSpecificInterest ?? true,
isInEoiBundle: opts.isInEoiBundle ?? false,
addedBy: opts.addedBy,
notes: opts.notes,
})
.onConflictDoUpdate({
target: [interestBerths.interestId, interestBerths.berthId],
set: setForUpdate,
})
.returning();
return row!;
}
/** Promote a single berth to primary for the interest. Demotes any prior primary. */
export async function setPrimaryBerth(interestId: string, berthId: string): Promise<void> {
await upsertInterestBerth(interestId, berthId, { isPrimary: true });

View File

@@ -2,7 +2,7 @@ import { and, count, eq, gte, isNull } from 'drizzle-orm';
import { db } from '@/lib/db';
import { redis } from '@/lib/redis';
import { interests, interestNotes } from '@/lib/db/schema/interests';
import { interests, interestBerths, interestNotes } from '@/lib/db/schema/interests';
import { reminders } from '@/lib/db/schema/operations';
import { emailThreads } from '@/lib/db/schema/email';
import { logger } from '@/lib/logger';
@@ -134,7 +134,7 @@ export async function calculateInterestScore(
// 4. Engagement - notes, emails, reminders in last 30 days
const thirtyDaysAgo = new Date(Date.now() - 30 * 24 * 60 * 60 * 1000);
const [notesResult, remindersResult, emailResult] = await Promise.all([
const [notesResult, remindersResult, emailResult, berthLinkResult] = await Promise.all([
db
.select({ value: count() })
.from(interestNotes)
@@ -161,6 +161,13 @@ export async function calculateInterestScore(
gte(emailThreads.lastMessageAt, thirtyDaysAgo),
),
),
// Plan §3.4: any junction row counts as "berth linked", not just the
// primary - the score awards engagement for an interest that has *any*
// berth association at all.
db
.select({ value: count() })
.from(interestBerths)
.where(eq(interestBerths.interestId, interestId)),
]);
const notesCount = notesResult[0]?.value ?? 0;
@@ -172,8 +179,8 @@ export async function calculateInterestScore(
const remindersScore = Math.min(remindersCount * 10, 20);
const engagement = Math.min(notesScore + emailScore + remindersScore, 100);
// 5. Berth linked
const berthLinked = interest.berthId != null ? 25 : 0;
// 5. Berth linked - true when the interest has at least one junction row.
const berthLinked = (berthLinkResult[0]?.value ?? 0) > 0 ? 25 : 0;
// ── Normalise: max raw = 100+100+100+100+25 = 425 → /425 * 100 ──
const RAW_MAX = 425;

View File

@@ -1,7 +1,7 @@
import { and, desc, eq, inArray, isNull, sql } from 'drizzle-orm';
import { and, desc, eq, exists, inArray, isNull, sql } from 'drizzle-orm';
import { db } from '@/lib/db';
import { interests, interestTags, interestNotes } from '@/lib/db/schema/interests';
import { interests, interestBerths, interestTags, interestNotes } from '@/lib/db/schema/interests';
import { reminders } from '@/lib/db/schema/operations';
import { clients, clientAddresses, clientContacts } from '@/lib/db/schema/clients';
import { berths } from '@/lib/db/schema/berths';
@@ -12,6 +12,13 @@ import { createAuditLog, type AuditMeta } from '@/lib/audit';
import { NotFoundError, ConflictError, ValidationError } from '@/lib/errors';
import { emitToRoom } from '@/lib/socket/server';
import { setEntityTags } from '@/lib/services/entity-tags.helper';
import {
getPrimaryBerth,
getPrimaryBerthsForInterests,
removeInterestBerth,
upsertInterestBerth,
upsertInterestBerthTx,
} from '@/lib/services/interest-berths.service';
import { buildListQuery } from '@/lib/db/query-builder';
import { diffEntity } from '@/lib/entity-diff';
import { softDelete, restore, withTransaction } from '@/lib/db/utils';
@@ -151,7 +158,19 @@ export async function listInterests(portId: string, query: ListInterestsInput) {
filters.push(eq(interests.yachtId, yachtId));
}
if (berthId) {
filters.push(eq(interests.berthId, berthId));
// EXISTS subquery against the junction: matches whether or not the
// berth is the interest's primary, mirroring "this berth is linked
// to this interest in any role" semantics from plan §3.4.
filters.push(
exists(
db
.select({ one: sql`1` })
.from(interestBerths)
.where(
and(eq(interestBerths.interestId, interests.id), eq(interestBerths.berthId, berthId)),
),
),
);
}
if (pipelineStage && pipelineStage.length > 0) {
filters.push(inArray(interests.pipelineStage, pipelineStage));
@@ -209,20 +228,11 @@ export async function listInterests(portId: string, query: ListInterestsInput) {
archivedAtColumn: interests.archivedAt,
});
// Join client names, berth mooring numbers, and yacht names.
const interestIds = (
result.data as Array<{ id: string; clientId: string; berthId: string | null }>
).map((i) => i.id);
// Join client names, primary-berth mooring numbers, and yacht names.
const interestIds = (result.data as Array<{ id: string; clientId: string }>).map((i) => i.id);
const clientIds = [
...new Set((result.data as Array<{ clientId: string }>).map((i) => i.clientId)),
];
const berthIds = [
...new Set(
(result.data as Array<{ berthId: string | null }>)
.map((i) => i.berthId)
.filter(Boolean) as string[],
),
];
const yachtIds = [
...new Set(
(result.data as Array<{ yachtId: string | null }>)
@@ -232,7 +242,6 @@ export async function listInterests(portId: string, query: ListInterestsInput) {
];
let clientsMap: Record<string, string> = {};
let berthsMap: Record<string, string> = {};
let yachtsMap: Record<string, string> = {};
const tagsByInterestId: Record<string, Array<{ id: string; name: string; color: string }>> = {};
const notesCountByInterestId: Record<string, number> = {};
@@ -245,13 +254,10 @@ export async function listInterests(portId: string, query: ListInterestsInput) {
clientsMap = Object.fromEntries(clientRows.map((c) => [c.id, c.fullName]));
}
if (berthIds.length > 0) {
const berthRows = await db
.select({ id: berths.id, mooringNumber: berths.mooringNumber })
.from(berths)
.where(inArray(berths.id, berthIds));
berthsMap = Object.fromEntries(berthRows.map((b) => [b.id, b.mooringNumber]));
}
// Primary-berth lookup via the interest_berths junction. Single round-trip
// by interestId list - see plan §3.4: every "the berth for this interest"
// surface resolves through getPrimaryBerth(...) rather than a column read.
const primaryBerthMap = await getPrimaryBerthsForInterests(interestIds);
if (yachtIds.length > 0) {
const yachtRows = await db
@@ -292,14 +298,18 @@ export async function listInterests(portId: string, query: ListInterestsInput) {
}
}
const data = (result.data as Array<Record<string, unknown>>).map((i) => ({
...i,
clientName: clientsMap[i.clientId as string] ?? null,
berthMooringNumber: i.berthId ? (berthsMap[i.berthId as string] ?? null) : null,
yachtName: i.yachtId ? (yachtsMap[i.yachtId as string] ?? null) : null,
tags: tagsByInterestId[i.id as string] ?? [],
notesCount: notesCountByInterestId[i.id as string] ?? 0,
}));
const data = (result.data as Array<Record<string, unknown>>).map((i) => {
const primary = primaryBerthMap.get(i.id as string) ?? null;
return {
...i,
clientName: clientsMap[i.clientId as string] ?? null,
berthId: primary?.berthId ?? null,
berthMooringNumber: primary?.mooringNumber ?? null,
yachtName: i.yachtId ? (yachtsMap[i.yachtId as string] ?? null) : null,
tags: tagsByInterestId[i.id as string] ?? [],
notesCount: notesCountByInterestId[i.id as string] ?? 0,
};
});
return { data, total: result.total };
}
@@ -351,14 +361,10 @@ export async function getInterestById(id: string, portId: string) {
)
.limit(1);
let berthMooringNumber: string | null = null;
if (interest.berthId) {
const [berthRow] = await db
.select({ mooringNumber: berths.mooringNumber })
.from(berths)
.where(eq(berths.id, interest.berthId));
berthMooringNumber = berthRow?.mooringNumber ?? null;
}
// Primary berth comes from the interest_berths junction (plan §3.4).
const primaryBerth = await getPrimaryBerth(interest.id);
const berthId = primaryBerth?.berthId ?? null;
const berthMooringNumber = primaryBerth?.mooringNumber ?? null;
const tagRows = await db
.select({ id: tags.id, name: tags.name, color: tags.color })
@@ -401,6 +407,7 @@ export async function getInterestById(id: string, portId: string) {
clientPrimaryPhone: phoneContact?.value ?? null,
clientPrimaryPhoneE164: phoneContact?.valueE164 ?? null,
clientHasAddress: !!addressRow,
berthId,
berthMooringNumber,
tags: tagRows,
notesCount,
@@ -422,7 +429,7 @@ export async function createInterest(portId: string, data: CreateInterestInput,
await assertYachtBelongsToClient(portId, data.yachtId, data.clientId);
}
const { tagIds, ...interestData } = data;
const { tagIds, berthId: inputBerthId, ...interestData } = data;
// BR-011: auto-promote leadCategory
const resolvedLeadCategory = await resolveLeadCategory(
@@ -447,6 +454,18 @@ export async function createInterest(portId: string, data: CreateInterestInput,
.values(tagIds.map((tagId) => ({ interestId: interest!.id, tagId })));
}
// Plan §3.4: when berthId is provided we materialise it as a junction
// row inside the same transaction so an interest is never created
// without its primary-berth link surviving rollback.
if (inputBerthId) {
await upsertInterestBerthTx(tx, interest!.id, inputBerthId, {
isPrimary: true,
isSpecificInterest: true,
isInEoiBundle: false,
addedBy: meta.userId,
});
}
return interest!;
});
@@ -464,7 +483,7 @@ export async function createInterest(portId: string, data: CreateInterestInput,
emitToRoom(`port:${portId}`, 'interest:created', {
interestId: result.id,
clientId: result.clientId,
berthId: result.berthId ?? null,
berthId: inputBerthId ?? null,
source: result.source ?? '',
});
@@ -494,8 +513,13 @@ export async function updateInterest(
throw new NotFoundError('Interest');
}
// berthId no longer lives on the interests row - resolve current primary
// via the junction so we know whether the caller is asking for a change.
const currentPrimary = await getPrimaryBerth(id);
const currentBerthId = currentPrimary?.berthId ?? null;
await assertInterestFksInPort(portId, {
berthId: data.berthId && data.berthId !== existing.berthId ? data.berthId : null,
berthId: data.berthId && data.berthId !== currentBerthId ? data.berthId : null,
yachtId: data.yachtId && data.yachtId !== existing.yachtId ? data.yachtId : null,
});
@@ -513,10 +537,14 @@ export async function updateInterest(
)) as typeof data.leadCategory;
}
const updateData = { ...data, leadCategory: resolvedLeadCategory };
// Strip berthId out of the row write - the column was removed by the
// junction-migration. We keep the value for diff/audit purposes and
// dispatch the junction write separately.
const { berthId: incomingBerthId, ...rowData } = data;
const updateData = { ...rowData, leadCategory: resolvedLeadCategory };
const { diff } = diffEntity(
existing as Record<string, unknown>,
updateData as Record<string, unknown>,
{ ...(existing as Record<string, unknown>), berthId: currentBerthId },
{ ...(updateData as Record<string, unknown>), berthId: incomingBerthId ?? currentBerthId },
);
const [updated] = await db
@@ -525,6 +553,20 @@ export async function updateInterest(
.where(and(eq(interests.id, id), eq(interests.portId, portId)))
.returning();
// Apply primary-berth change through the junction so the unique
// partial index is respected and the previous primary is demoted.
if ('berthId' in data && incomingBerthId !== currentBerthId) {
if (incomingBerthId) {
await upsertInterestBerth(id, incomingBerthId, {
isPrimary: true,
isSpecificInterest: true,
addedBy: meta.userId,
});
} else if (currentBerthId) {
await removeInterestBerth(id, currentBerthId);
}
}
void createAuditLog({
userId: meta.userId,
portId,
@@ -888,9 +930,19 @@ export async function linkBerth(id: string, portId: string, berthId: string, met
await assertInterestFksInPort(portId, { berthId });
const previousPrimary = await getPrimaryBerth(id);
const oldBerthId = previousPrimary?.berthId ?? null;
await upsertInterestBerth(id, berthId, {
isPrimary: true,
isSpecificInterest: true,
addedBy: meta.userId,
});
// Touch updatedAt so list/sort surfaces still reflect the change.
const [updated] = await db
.update(interests)
.set({ berthId, updatedAt: new Date() })
.set({ updatedAt: new Date() })
.where(and(eq(interests.id, id), eq(interests.portId, portId)))
.returning();
@@ -900,7 +952,7 @@ export async function linkBerth(id: string, portId: string, berthId: string, met
action: 'update',
entityType: 'interest',
entityId: id,
oldValue: { berthId: existing.berthId },
oldValue: { berthId: oldBerthId },
newValue: { berthId },
metadata: { type: 'berth_linked' },
ipAddress: meta.ipAddress,
@@ -925,11 +977,16 @@ export async function unlinkBerth(id: string, portId: string, meta: AuditMeta) {
throw new NotFoundError('Interest');
}
const oldBerthId = existing.berthId;
const previousPrimary = await getPrimaryBerth(id);
const oldBerthId = previousPrimary?.berthId ?? null;
if (oldBerthId) {
await removeInterestBerth(id, oldBerthId);
}
const [updated] = await db
.update(interests)
.set({ berthId: null, updatedAt: new Date() })
.set({ updatedAt: new Date() })
.where(and(eq(interests.id, id), eq(interests.portId, portId)))
.returning();

View File

@@ -3,6 +3,7 @@ import { and, eq, count, inArray, isNull, desc } from 'drizzle-orm';
import { db } from '@/lib/db';
import { clients } from '@/lib/db/schema/clients';
import { interests } from '@/lib/db/schema/interests';
import { getPrimaryBerthsForInterests } from '@/lib/services/interest-berths.service';
import { documents, files } from '@/lib/db/schema/documents';
import { invoices } from '@/lib/db/schema/financial';
import { berths } from '@/lib/db/schema/berths';
@@ -123,7 +124,6 @@ export async function getClientInterests(
id: interests.id,
pipelineStage: interests.pipelineStage,
leadCategory: interests.leadCategory,
berthId: interests.berthId,
eoiStatus: interests.eoiStatus,
contractStatus: interests.contractStatus,
dateFirstContact: interests.dateFirstContact,
@@ -133,31 +133,39 @@ export async function getClientInterests(
.where(and(eq(interests.clientId, clientId), eq(interests.portId, portId)))
.orderBy(interests.createdAt);
// Fetch berth details for interests that have a berth
const berthIds = rows.flatMap((r) => (r.berthId ? [r.berthId] : []));
const berthMap = new Map<string, { mooringNumber: string; area: string | null }>();
// Resolve each interest's primary berth via the junction (plan §3.4) -
// single round-trip for the whole list.
const primaryBerthMap = await getPrimaryBerthsForInterests(rows.map((r) => r.id));
const primaryBerthIds = Array.from(
new Set(Array.from(primaryBerthMap.values(), (b) => b.berthId)),
);
if (berthIds.length > 0) {
const berthMap = new Map<string, { mooringNumber: string; area: string | null }>();
if (primaryBerthIds.length > 0) {
const berthRows = await db
.select({ id: berths.id, mooringNumber: berths.mooringNumber, area: berths.area })
.from(berths)
.where(eq(berths.portId, portId));
.where(and(eq(berths.portId, portId), inArray(berths.id, primaryBerthIds)));
for (const b of berthRows) {
berthMap.set(b.id, { mooringNumber: b.mooringNumber, area: b.area });
}
}
return rows.map((r) => ({
id: r.id,
pipelineStage: r.pipelineStage,
leadCategory: r.leadCategory,
berthMooringNumber: r.berthId ? (berthMap.get(r.berthId)?.mooringNumber ?? null) : null,
berthArea: r.berthId ? (berthMap.get(r.berthId)?.area ?? null) : null,
eoiStatus: r.eoiStatus,
contractStatus: r.contractStatus,
dateFirstContact: r.dateFirstContact,
createdAt: r.createdAt,
}));
return rows.map((r) => {
const primary = primaryBerthMap.get(r.id);
const berthMeta = primary ? (berthMap.get(primary.berthId) ?? null) : null;
return {
id: r.id,
pipelineStage: r.pipelineStage,
leadCategory: r.leadCategory,
berthMooringNumber: berthMeta?.mooringNumber ?? null,
berthArea: berthMeta?.area ?? null,
eoiStatus: r.eoiStatus,
contractStatus: r.contractStatus,
dateFirstContact: r.dateFirstContact,
createdAt: r.createdAt,
};
});
}
// ─── Documents ────────────────────────────────────────────────────────────────

View File

@@ -1,9 +1,13 @@
import { and, desc, eq, inArray, isNull, or } from 'drizzle-orm';
import { and, desc, eq, exists, inArray, isNull, or, sql } from 'drizzle-orm';
import { db } from '@/lib/db';
import { clients, clientContacts } from '@/lib/db/schema/clients';
import { interests } from '@/lib/db/schema/interests';
import { interests, interestBerths } from '@/lib/db/schema/interests';
import { berths, berthWaitingList, berthMaintenanceLog } from '@/lib/db/schema/berths';
import {
getPrimaryBerth,
getPrimaryBerthsForInterests,
} from '@/lib/services/interest-berths.service';
import { yachts } from '@/lib/db/schema/yachts';
import { companyMemberships } from '@/lib/db/schema/companies';
import { auditLogs } from '@/lib/db/schema/system';
@@ -61,22 +65,18 @@ export async function exportClientPdf(clientId: string, portId: string): Promise
.orderBy(desc(auditLogs.createdAt))
.limit(20);
// Enrich interests with berth mooring numbers
const berthIds = interestList.map((i) => i.berthId).filter(Boolean) as string[];
// Enrich interests with primary-berth mooring numbers (plan §3.4 - the
// legacy interest.berth_id column has been replaced by the junction).
const primaryBerthMap = await getPrimaryBerthsForInterests(interestList.map((i) => i.id));
let berthsMap: Record<string, string> = {};
if (berthIds.length > 0) {
const berthRows = await db
.select({ id: berths.id, mooringNumber: berths.mooringNumber })
.from(berths)
.where(inArray(berths.id, berthIds));
berthsMap = Object.fromEntries(berthRows.map((b) => [b.id, b.mooringNumber]));
}
const enrichedInterests = interestList.map((i) => ({
...i,
berthMooringNumber: i.berthId ? (berthsMap[i.berthId] ?? null) : null,
}));
const enrichedInterests = interestList.map((i) => {
const primary = primaryBerthMap.get(i.id);
return {
...i,
berthId: primary?.berthId ?? null,
berthMooringNumber: primary?.mooringNumber ?? null,
};
});
// Yachts owned by the client directly OR by a company they're an active
// member of. Active membership = no end date.
@@ -169,11 +169,24 @@ export async function exportBerthPdf(berthId: string, portId: string): Promise<U
.orderBy(desc(berthMaintenanceLog.performedDate))
.limit(20);
// Linked interests
// Linked interests - "this berth is linked to this interest in any role"
// (plan §3.4 - EXISTS against the junction).
const linkedInterests = await db
.select()
.from(interests)
.where(and(eq(interests.berthId, berthId), eq(interests.portId, portId)))
.where(
and(
eq(interests.portId, portId),
exists(
db
.select({ one: sql`1` })
.from(interestBerths)
.where(
and(eq(interestBerths.interestId, interests.id), eq(interestBerths.berthId, berthId)),
),
),
),
)
.orderBy(desc(interests.updatedAt))
.limit(20);
@@ -204,9 +217,11 @@ export async function exportInterestPdf(interestId: string, portId: string): Pro
db.query.ports.findFirst({ where: eq(ports.id, portId) }),
]);
// Resolve primary berth via the junction (plan §3.4).
const primaryBerth = await getPrimaryBerth(interest.id);
let berth = null;
if (interest.berthId) {
berth = await db.query.berths.findFirst({ where: eq(berths.id, interest.berthId) });
if (primaryBerth?.berthId) {
berth = await db.query.berths.findFirst({ where: eq(berths.id, primaryBerth.berthId) });
}
let yacht = null;

View File

@@ -1,7 +1,7 @@
import { and, count, eq, gte, isNull, lte, sql, sum } from 'drizzle-orm';
import { db } from '@/lib/db';
import { interests } from '@/lib/db/schema/interests';
import { interests, interestBerths } from '@/lib/db/schema/interests';
import { berths } from '@/lib/db/schema/berths';
import { auditLogs } from '@/lib/db/schema/system';
@@ -64,7 +64,7 @@ export async function fetchPipelineData(
stageCountMap[row.stage] = row.count;
}
// Top 10 interests by berth price (via join)
// Top 10 interests by berth price (via primary-berth junction join, plan §3.4).
const topInterestsRows = await db
.select({
id: interests.id,
@@ -73,7 +73,11 @@ export async function fetchPipelineData(
berthPrice: berths.price,
})
.from(interests)
.leftJoin(berths, eq(interests.berthId, berths.id))
.leftJoin(
interestBerths,
and(eq(interestBerths.interestId, interests.id), eq(interestBerths.isPrimary, true)),
)
.leftJoin(berths, eq(interestBerths.berthId, berths.id))
.where(and(eq(interests.portId, portId), isNull(interests.archivedAt)))
.orderBy(sql`${berths.price} DESC NULLS LAST`)
.limit(10);
@@ -96,14 +100,20 @@ export async function fetchRevenueData(
portId: string,
_params: Record<string, unknown>,
): Promise<RevenueData> {
// Sum berth prices grouped by pipeline stage
// Sum berth prices grouped by pipeline stage. Reads the primary-berth link
// via interest_berths (plan §3.4) - non-primary junction rows do not
// contribute to the revenue rollup.
const stageRevenue = await db
.select({
stage: interests.pipelineStage,
revenue: sum(berths.price),
})
.from(interests)
.leftJoin(berths, eq(interests.berthId, berths.id))
.leftJoin(
interestBerths,
and(eq(interestBerths.interestId, interests.id), eq(interestBerths.isPrimary, true)),
)
.leftJoin(berths, eq(interestBerths.berthId, berths.id))
.where(and(eq(interests.portId, portId), isNull(interests.archivedAt)))
.groupBy(interests.pipelineStage);
@@ -112,11 +122,15 @@ export async function fetchRevenueData(
stageRevenueMap[row.stage] = row.revenue ? String(row.revenue) : '0';
}
// Total revenue from completed interests
// Total revenue from completed interests (primary-berth link only).
const completedRevenue = await db
.select({ total: sum(berths.price) })
.from(interests)
.leftJoin(berths, eq(interests.berthId, berths.id))
.leftJoin(
interestBerths,
and(eq(interestBerths.interestId, interests.id), eq(interestBerths.isPrimary, true)),
)
.leftJoin(berths, eq(interestBerths.berthId, berths.id))
.where(
and(
eq(interests.portId, portId),
@@ -146,10 +160,7 @@ export async function fetchActivityData(
const fromDate = dateFrom ? new Date(dateFrom) : thirtyDaysAgo;
const conditions = [
eq(auditLogs.portId, portId),
gte(auditLogs.createdAt, fromDate),
];
const conditions = [eq(auditLogs.portId, portId), gte(auditLogs.createdAt, fromDate)];
if (dateTo) {
conditions.push(lte(auditLogs.createdAt, new Date(dateTo)));
@@ -205,8 +216,7 @@ export async function fetchOccupancyData(
totalBerths += row.count;
}
const occupiedCount =
(statusCountMap['under_offer'] ?? 0) + (statusCountMap['sold'] ?? 0);
const occupiedCount = (statusCountMap['under_offer'] ?? 0) + (statusCountMap['sold'] ?? 0);
const occupancyRate = totalBerths > 0 ? (occupiedCount / totalBerths) * 100 : 0;
return {

View File

@@ -75,7 +75,9 @@ export async function search(portId: string, query: string): Promise<SearchResul
LIMIT 10
`),
// Interests: JOIN to clients and berths, ILIKE search
// Interests: JOIN to clients and primary-berth via interest_berths
// (plan §3.4 - the legacy interests.berth_id column has been replaced
// by the junction).
db.execute<{
id: string;
full_name: string;
@@ -89,7 +91,9 @@ export async function search(portId: string, query: string): Promise<SearchResul
i.pipeline_stage
FROM interests i
JOIN clients c ON i.client_id = c.id
LEFT JOIN berths b ON i.berth_id = b.id
LEFT JOIN interest_berths ib
ON ib.interest_id = i.id AND ib.is_primary = true
LEFT JOIN berths b ON ib.berth_id = b.id
WHERE i.port_id = ${portId}
AND i.archived_at IS NULL
AND (