/**
 * Client merge service - atomically combines two client records.
 *
 * Used by:
 *   - /admin/duplicates review queue (when an admin confirms a merge)
 *   - the at-create suggestion path ("use existing client") - though
 *     that path uses the lighter `attachInterestToClient` and never
 *     actually merges two pre-existing clients
 *   - the migration script's `--apply` (eventually)
 *
 * NOT reversible: a merge is permanent. Every merge writes a
 * `client_merge_log` row containing a snapshot of the loser's pre-merge
 * state, but this is a forensic/audit record only — there is NO
 * `unmergeClients` implementation, and the child rows reattached to the
 * winner are not restorable from the snapshot. Operators must treat
 * merge as a destructive, one-way operation. (The original design called
 * for a 7-day `dedup_undo_window_days` reversibility window; that undo
 * pathway was never built, so the setting has no effect and the snapshot
 * is retained purely as an audit trail.)
 *
 * Design reference: docs/superpowers/specs/2026-05-03-dedup-and-migration-design.md §6.
 */

import { and, eq, sql } from 'drizzle-orm';

import { db } from '@/lib/db';
import {
  clients,
  clientContacts,
  clientAddresses,
  clientNotes,
  clientTags,
  clientRelationships,
  clientMergeLog,
  clientMergeCandidates,
} from '@/lib/db/schema/clients';
import { interests } from '@/lib/db/schema/interests';
import { berthTenancies } from '@/lib/db/schema/tenancies';
import { payments } from '@/lib/db/schema/pipeline';
import { companyMemberships } from '@/lib/db/schema/companies';
import { yachts } from '@/lib/db/schema/yachts';
import { invoices } from '@/lib/db/schema/financial';
import { auditLogs } from '@/lib/db/schema/system';
import { ConflictError, NotFoundError, ValidationError } from '@/lib/errors';

// ─── Public API ─────────────────────────────────────────────────────────────

export interface MergeFieldChoices {
  /** Per-field overrides - `winner` keeps the surviving client's value;
   *  `loser` copies the loser's value over. Fields not listed default
   *  to `winner` (no change). */
  fullName?: 'winner' | 'loser';
  nationalityIso?: 'winner' | 'loser';
  preferredContactMethod?: 'winner' | 'loser';
  preferredLanguage?: 'winner' | 'loser';
  timezone?: 'winner' | 'loser';
  source?: 'winner' | 'loser';
  sourceDetails?: 'winner' | 'loser';
}

export interface MergeOptions {
  winnerId: string;
  loserId: string;
  /** ID of the user performing the merge (for audit + clientMergeLog.mergedBy). */
  mergedBy: string;
  /**
   * Caller's port - defends against any future caller forgetting to
   * pre-validate. Today the sole route caller pre-checks via ctx.portId,
   * so this is defense-in-depth: a future bulk-import or CLI caller
   * that omits the check still cannot trigger a cross-tenant merge.
   * Optional for backwards compat; mergeClients enforces same-port
   * regardless, but when set the assertion is "winner.portId === callerPortId"
   * (and by transitive same-port rule, loser too). Any non-nullish value,
   * including an empty string, is enforced.
   */
  callerPortId?: string;
  /** Per-field choice overrides. Multi-value fields (contacts, addresses,
   *  notes, tags) are always preserved from both sides; this only
   *  affects single-value scalar fields on the `clients` row. */
  fieldChoices?: MergeFieldChoices;
}

export interface MergeResult {
  mergeLogId: string;
  /** Per-table count of rows that were re-pointed (or, for tags, copied)
   *  to the winner. Rows skipped as duplicates are not counted. */
  movedRows: {
    interests: number;
    contacts: number;
    addresses: number;
    notes: number;
    tags: number;
    relationships: number;
    tenancies: number;
    payments: number;
    companyMemberships: number;
    yachts: number;
    invoices: number;
  };
}

/**
 * Atomically merge `loserId` into `winnerId` inside a single transaction.
 *
 * @param opts - winner/loser ids, acting user, optional caller port and
 *   per-field overrides (see {@link MergeOptions}).
 * @returns the id of the written `client_merge_log` row plus per-table
 *   moved-row counts.
 * @throws ValidationError if winner and loser are the same id, belong to
 *   different ports, or the winner is not in `callerPortId` (when given).
 * @throws NotFoundError if either id doesn't exist.
 * @throws ConflictError if the loser has already been merged
 *   (`mergedIntoClientId` set) or the winner is itself archived.
 */
export async function mergeClients(opts: MergeOptions): Promise<MergeResult> {
  if (opts.winnerId === opts.loserId) {
    throw new ValidationError('Cannot merge a client into itself');
  }

  return await db.transaction(async (tx) => {
    // One timestamp for every row touched by this merge, so the audit
    // trail shows a single consistent instant.
    const now = new Date();

    // ── Lock both rows for the duration. A concurrent second merge of
    //    the same loser blocks here, then sees `mergedIntoClientId` set
    //    and bails. Locks are taken in ascending id order (not
    //    winner-then-loser) so that two opposite-direction merges of the
    //    same pair (A→B and B→A) cannot deadlock on each other. ─────────
    const lockClient = async (id: string) => {
      const [row] = await tx.select().from(clients).where(eq(clients.id, id)).for('update');
      return row;
    };
    const winnerLocksFirst = opts.winnerId < opts.loserId;
    const firstLocked = await lockClient(winnerLocksFirst ? opts.winnerId : opts.loserId);
    const secondLocked = await lockClient(winnerLocksFirst ? opts.loserId : opts.winnerId);
    const winnerRow = winnerLocksFirst ? firstLocked : secondLocked;
    const loserRow = winnerLocksFirst ? secondLocked : firstLocked;

    if (!winnerRow) throw new NotFoundError('client');
    if (!loserRow) throw new NotFoundError('client');

    if (winnerRow.portId !== loserRow.portId) {
      throw new ValidationError('Cannot merge clients across different ports');
    }
    // Defense-in-depth: even though the route already pre-checks, a
    // future caller that wires this service from a CLI / bulk-import
    // path would otherwise be able to merge any two clients sharing a
    // port (or, if the same-port check above were ever weakened, two
    // arbitrary clients). Refuse upfront. Compared with `!= null` rather
    // than truthiness so an empty-string port id fails closed instead of
    // silently skipping the guard.
    if (opts.callerPortId != null && winnerRow.portId !== opts.callerPortId) {
      throw new ValidationError('Cannot merge clients in another port');
    }
    if (loserRow.mergedIntoClientId) {
      throw new ConflictError('That client has already been merged into another record.');
    }
    if (winnerRow.archivedAt) {
      throw new ConflictError('Cannot merge into an archived client');
    }

    const portId = winnerRow.portId;

    // ── Snapshot the loser's full state before any mutation. Written to
    //    `client_merge_log.mergeDetails` as a forensic/audit record only;
    //    NOT used to restore — merge is one-way (no `unmergeClients`). ─────
    const loserContacts = await tx
      .select()
      .from(clientContacts)
      .where(eq(clientContacts.clientId, opts.loserId));
    const loserAddresses = await tx
      .select()
      .from(clientAddresses)
      .where(eq(clientAddresses.clientId, opts.loserId));
    const loserNotes = await tx
      .select()
      .from(clientNotes)
      .where(eq(clientNotes.clientId, opts.loserId));
    const loserTags = await tx
      .select()
      .from(clientTags)
      .where(eq(clientTags.clientId, opts.loserId));
    const loserInterests = await tx
      .select({ id: interests.id })
      .from(interests)
      .where(eq(interests.clientId, opts.loserId));
    const loserTenancies = await tx
      .select({ id: berthTenancies.id })
      .from(berthTenancies)
      .where(eq(berthTenancies.clientId, opts.loserId));
    const loserRelationshipsAsA = await tx
      .select()
      .from(clientRelationships)
      .where(eq(clientRelationships.clientAId, opts.loserId));
    const loserRelationshipsAsB = await tx
      .select()
      .from(clientRelationships)
      .where(eq(clientRelationships.clientBId, opts.loserId));
    // Memberships that collide with one of the winner's are hard-deleted
    // further down, so the full rows are captured here - otherwise the
    // deleted data would leave no trace in the audit record.
    const loserMemberships = await tx
      .select()
      .from(companyMemberships)
      .where(eq(companyMemberships.clientId, opts.loserId));

    const snapshot = {
      loser: loserRow,
      contacts: loserContacts,
      addresses: loserAddresses,
      notes: loserNotes,
      tags: loserTags,
      interests: loserInterests.map((r) => r.id),
      tenancies: loserTenancies.map((r) => r.id),
      relationshipsAsA: loserRelationshipsAsA,
      relationshipsAsB: loserRelationshipsAsB,
      companyMemberships: loserMemberships,
      fieldChoices: opts.fieldChoices ?? {},
      mergedAt: now.toISOString(),
    };

    // ── Apply field choices on the winner. We only touch fields the
    //    caller explicitly asked to copy from the loser; everything
    //    else stays as-is. ────────────────────────────────────────────────
    const choices = opts.fieldChoices ?? {};
    const fieldUpdates: Partial<typeof clients.$inferInsert> = {};
    if (choices.fullName === 'loser') fieldUpdates.fullName = loserRow.fullName;
    if (choices.nationalityIso === 'loser') fieldUpdates.nationalityIso = loserRow.nationalityIso;
    if (choices.preferredContactMethod === 'loser')
      fieldUpdates.preferredContactMethod = loserRow.preferredContactMethod;
    if (choices.preferredLanguage === 'loser')
      fieldUpdates.preferredLanguage = loserRow.preferredLanguage;
    if (choices.timezone === 'loser') fieldUpdates.timezone = loserRow.timezone;
    if (choices.source === 'loser') fieldUpdates.source = loserRow.source;
    if (choices.sourceDetails === 'loser') fieldUpdates.sourceDetails = loserRow.sourceDetails;

    if (Object.keys(fieldUpdates).length > 0) {
      await tx
        .update(clients)
        .set({ ...fieldUpdates, updatedAt: now })
        .where(eq(clients.id, opts.winnerId));
    }

    // ── Reattach. Each table that points at the loser via clientId
    //    gets pointed at the winner instead. ─────────────────────────────
    const movedInterests = (
      await tx
        .update(interests)
        .set({ clientId: opts.winnerId, updatedAt: now })
        .where(eq(interests.clientId, opts.loserId))
        .returning({ id: interests.id })
    ).length;

    const movedTenancies = (
      await tx
        .update(berthTenancies)
        .set({ clientId: opts.winnerId, updatedAt: now })
        .where(eq(berthTenancies.clientId, opts.loserId))
        .returning({ id: berthTenancies.id })
    ).length;

    // Contacts: move loser's contacts to winner, but DON'T duplicate any
    // already-present (channel, value) pair (value compared
    // case-insensitively). Loser-only ones get demoted to non-primary so
    // the winner's primary stays intact.
    const winnerContacts = await tx
      .select({ channel: clientContacts.channel, value: clientContacts.value })
      .from(clientContacts)
      .where(eq(clientContacts.clientId, opts.winnerId));
    const contactKey = (c: (typeof winnerContacts)[number]): string =>
      `${c.channel}::${c.value.toLowerCase()}`;
    const winnerContactKeys = new Set(winnerContacts.map(contactKey));

    let movedContacts = 0;
    for (const contact of loserContacts) {
      const key = contactKey(contact);
      if (winnerContactKeys.has(key)) {
        // Winner already has this contact - skip it. The row is not
        // deleted; it stays attached to the (soft-archived) loser and is
        // recorded in the snapshot for audit.
        continue;
      }
      await tx
        .update(clientContacts)
        .set({ clientId: opts.winnerId, isPrimary: false, updatedAt: now })
        .where(eq(clientContacts.id, contact.id));
      // Remember the key so a second loser contact with the same
      // (channel, value) isn't moved as a duplicate.
      winnerContactKeys.add(key);
      movedContacts += 1;
    }

    // Addresses: same shape as contacts, but uniqueness is harder to
    // detect cleanly (free-text street). Just move them all and let the
    // user dedupe in the UI later.
    const movedAddresses = (
      await tx
        .update(clientAddresses)
        .set({ clientId: opts.winnerId, isPrimary: false, updatedAt: now })
        .where(eq(clientAddresses.clientId, opts.loserId))
        .returning({ id: clientAddresses.id })
    ).length;

    const movedNotes = (
      await tx
        .update(clientNotes)
        .set({ clientId: opts.winnerId, updatedAt: now })
        .where(eq(clientNotes.clientId, opts.loserId))
        .returning({ id: clientNotes.id })
    ).length;

    // Tags: copy any loser-only tag to the winner; drop overlap. The
    // loser's tag ids are de-duplicated first, then inserted in one
    // statement (`values([])` is rejected by Drizzle, hence the guard).
    const winnerTags = await tx
      .select({ tagId: clientTags.tagId })
      .from(clientTags)
      .where(eq(clientTags.clientId, opts.winnerId));
    const winnerTagSet = new Set(winnerTags.map((t) => t.tagId));
    const loserOnlyTagIds = Array.from(new Set(loserTags.map((t) => t.tagId))).filter(
      (tagId) => !winnerTagSet.has(tagId),
    );
    if (loserOnlyTagIds.length > 0) {
      await tx
        .insert(clientTags)
        .values(loserOnlyTagIds.map((tagId) => ({ clientId: opts.winnerId, tagId })));
    }
    const movedTags = loserOnlyTagIds.length;
    await tx.delete(clientTags).where(eq(clientTags.clientId, opts.loserId));

    // Relationships: rewrite each FK side to point at the winner. Keep
    // both sides regardless - even if A and B both end up as the same
    // person, the row is preserved for audit; the UI hides self-loops.
    const movedRelationshipsAsA = (
      await tx
        .update(clientRelationships)
        .set({ clientAId: opts.winnerId })
        .where(eq(clientRelationships.clientAId, opts.loserId))
        .returning({ id: clientRelationships.id })
    ).length;
    const movedRelationshipsAsB = (
      await tx
        .update(clientRelationships)
        .set({ clientBId: opts.winnerId })
        .where(eq(clientRelationships.clientBId, opts.loserId))
        .returning({ id: clientRelationships.id })
    ).length;
    const movedRelationships = movedRelationshipsAsA + movedRelationshipsAsB;

    // Payments: re-point every payment row from the loser to the winner.
    // Critical: payments.clientId is notNull onDelete:'cascade', so leaving
    // them on the archived loser would let a later hard-delete cascade away
    // the winner's financial history. Scoped to the port defensively.
    const movedPayments = (
      await tx
        .update(payments)
        .set({ clientId: opts.winnerId })
        .where(and(eq(payments.clientId, opts.loserId), eq(payments.portId, portId)))
        .returning({ id: payments.id })
    ).length;

    // Company memberships: re-point, but dedup against the unique constraint
    // `unique_cm_exact` (companyId, clientId, role, startDate). If the winner
    // already has an equivalent membership, drop the loser's row instead of
    // re-pointing so we don't violate the unique index.
    const winnerMemberships = await tx
      .select({
        companyId: companyMemberships.companyId,
        role: companyMemberships.role,
        startDate: companyMemberships.startDate,
      })
      .from(companyMemberships)
      .where(eq(companyMemberships.clientId, opts.winnerId));
    const membershipKey = (m: (typeof winnerMemberships)[number]): string =>
      `${m.companyId}::${m.role}::${m.startDate ? m.startDate.toISOString() : ''}`;
    const winnerMembershipKeys = new Set(winnerMemberships.map(membershipKey));

    let movedCompanyMemberships = 0;
    for (const membership of loserMemberships) {
      if (winnerMembershipKeys.has(membershipKey(membership))) {
        // Winner already has an equivalent membership - drop the loser's
        // row (its pre-merge state is preserved in the snapshot).
        await tx.delete(companyMemberships).where(eq(companyMemberships.id, membership.id));
        continue;
      }
      await tx
        .update(companyMemberships)
        .set({ clientId: opts.winnerId, updatedAt: now })
        .where(eq(companyMemberships.id, membership.id));
      movedCompanyMemberships += 1;
    }

    // Yachts: re-point polymorphic ownership on the denormalized owner
    // columns only. yacht_ownership_history is handled separately.
    const movedYachts = (
      await tx
        .update(yachts)
        .set({ currentOwnerId: opts.winnerId, updatedAt: now })
        .where(
          and(
            eq(yachts.currentOwnerType, 'client'),
            eq(yachts.currentOwnerId, opts.loserId),
            eq(yachts.portId, portId),
          ),
        )
        .returning({ id: yachts.id })
    ).length;

    // Invoices: re-point polymorphic billing entity from loser to winner.
    const movedInvoices = (
      await tx
        .update(invoices)
        .set({ billingEntityId: opts.winnerId, updatedAt: now })
        .where(
          and(
            eq(invoices.billingEntityType, 'client'),
            eq(invoices.billingEntityId, opts.loserId),
            eq(invoices.portId, portId),
          ),
        )
        .returning({ id: invoices.id })
    ).length;

    // ── Archive the loser. The row stays in the DB (soft-archived, not
    //    deleted) so `mergedIntoClientId` can act as the redirect pointer
    //    for any stragglers (links / direct queries / saved views). This
    //    is a permanent redirect — the loser is never un-archived by a
    //    reverse-merge, as no unmerge pathway exists. ─────────────────────
    await tx
      .update(clients)
      .set({
        archivedAt: now,
        mergedIntoClientId: opts.winnerId,
        updatedAt: now,
      })
      .where(eq(clients.id, opts.loserId));

    // ── Mark any open merge candidate row for this pair as resolved. ───
    await tx
      .update(clientMergeCandidates)
      .set({
        status: 'merged',
        resolvedAt: now,
        resolvedBy: opts.mergedBy,
      })
      .where(
        and(
          eq(clientMergeCandidates.portId, portId),
          // pair stored in canonical order - match either direction
          sql`(
            (${clientMergeCandidates.clientAId} = ${opts.winnerId} AND ${clientMergeCandidates.clientBId} = ${opts.loserId})
            OR
            (${clientMergeCandidates.clientAId} = ${opts.loserId} AND ${clientMergeCandidates.clientBId} = ${opts.winnerId})
          )`,
        ),
      );

    // ── Write the merge log + audit log. ────────────────────────────────
    const [logRow] = await tx
      .insert(clientMergeLog)
      .values({
        portId,
        survivingClientId: opts.winnerId,
        mergedClientId: opts.loserId,
        mergedBy: opts.mergedBy,
        mergeDetails: snapshot,
      })
      .returning({ id: clientMergeLog.id });
    if (!logRow) {
      // Throwing inside the transaction callback rolls the whole merge back.
      throw new Error('client_merge_log insert returned no row');
    }

    await tx.insert(auditLogs).values({
      portId,
      userId: opts.mergedBy,
      entityType: 'client',
      entityId: opts.winnerId,
      action: 'merge',
      newValue: {
        loserId: opts.loserId,
        loserName: loserRow.fullName,
        movedInterests,
        movedTenancies,
        movedContacts,
        movedAddresses,
        movedNotes,
        movedTags,
        movedRelationships,
        movedPayments,
        movedCompanyMemberships,
        movedYachts,
        movedInvoices,
      },
    });

    return {
      mergeLogId: logRow.id,
      movedRows: {
        interests: movedInterests,
        contacts: movedContacts,
        addresses: movedAddresses,
        notes: movedNotes,
        tags: movedTags,
        relationships: movedRelationships,
        tenancies: movedTenancies,
        payments: movedPayments,
        companyMemberships: movedCompanyMemberships,
        yachts: movedYachts,
        invoices: movedInvoices,
      },
    };
  });
}

// ─── Convenience: list merge candidates for a port ──────────────────────────

export interface MergeCandidatePair {
  id: string;
  clientAId: string;
  clientBId: string;
  score: number;
  reasons: string[];
  status: string;
  createdAt: Date;
}

/**
 * Fetch pending merge candidate pairs for the admin review queue.
 *
 * @param portId - port whose candidates are listed.
 * @returns candidates with status `'pending'`, highest score first. A
 *   `reasons` value that is not an array is returned as `[]`.
 */
export async function listPendingMergeCandidates(portId: string): Promise<MergeCandidatePair[]> {
  const rows = await db
    .select()
    .from(clientMergeCandidates)
    .where(
      and(eq(clientMergeCandidates.portId, portId), eq(clientMergeCandidates.status, 'pending')),
    )
    .orderBy(sql`${clientMergeCandidates.score} DESC`);

  return rows.map((r) => ({
    id: r.id,
    clientAId: r.clientAId,
    clientBId: r.clientBId,
    score: r.score,
    reasons: Array.isArray(r.reasons) ? (r.reasons as string[]) : [],
    status: r.status,
    createdAt: r.createdAt,
  }));
}