feat(dedup): runtime surfaces — merge service, at-create suggestion, admin queue (P2)
Adds the live dedup pipeline on top of the P1 library + P3 migration
script. The new `client/interest` model now actively prevents duplicate
client records at creation time and gives admins a queue to triage
the borderline pairs the at-create check missed.
Three layers, per design §7:
Layer 1 — At-create suggestion
==============================
`GET /api/v1/clients/match-candidates`
Accepts free-text email / phone / name from the in-flight client
form, normalizes them via the dedup library, and returns scored
matches against the port's live client pool. Filters out
low-confidence noise (the background scoring queue picks those up
separately). Strict port scoping; never leaks across tenants.
`<DedupSuggestionPanel>` (`src/components/clients/dedup-suggestion-panel.tsx`)
Debounced React Query hook. Renders nothing for short inputs or
when there's no useful match. On a high-confidence match it interrupts visually
with an amber-tinted card and a "Use this client" primary button.
Medium confidence falls back to a softer "possible match — check
before creating" treatment.
`<ClientForm>`
Renders the panel above the form (create path only — skipped on
edit). New `onUseExistingClient` callback fires when the user
picks the existing client; the form closes and the parent decides
what to do (typically: navigate to that client's detail page or
open the create-interest dialog pre-filled).
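The tiering the panel keys off can be sketched as a pure function (illustrative only: the `Tier` type and `classifyScore` name are mine, not the panel's; the thresholds are the 90 / 50 design defaults noted under "Out of scope"):

```typescript
// Sketch of the confidence tiers, assuming the baked-in 90 / 50 defaults.
// Names here are hypothetical; the real logic lives in
// src/components/clients/dedup-suggestion-panel.tsx.
type Tier = 'high' | 'medium' | 'none';

const HIGH_THRESHOLD = 90;
const MEDIUM_THRESHOLD = 50;

function classifyScore(score: number): Tier {
  if (score >= HIGH_THRESHOLD) return 'high'; // amber card + "Use this client"
  if (score >= MEDIUM_THRESHOLD) return 'medium'; // softer "possible match" treatment
  return 'none'; // panel renders nothing
}
```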
Layer 2 — Merge service
=======================
`mergeClients` (`src/lib/services/client-merge.service.ts`)
The atomic merge primitive that everything else calls. Single
transaction. Per §6 of the design:
- Locks both rows (FOR UPDATE) so concurrent merges of the same
loser fail with a clear error rather than racing.
- Snapshots the full loser state (contacts / addresses / notes /
tags / interest+reservation IDs / relationship rows) into the
`client_merge_log.merge_details` JSONB column for the eventual
undo flow.
- Reattaches every loser-side row to the winner: interests,
reservations, contacts (skipping duplicates by `(channel, value)`),
addresses, notes, tags (deduped), relationships.
- Optional `fieldChoices` — per-scalar overrides letting the user
keep the loser's value for fullName / nationality / preferences /
timezone / source.
- Marks the loser archived with `mergedIntoClientId` set (a redirect
pointer for stragglers; never hard-deleted within the undo window).
- Resolves any matching `client_merge_candidates` row to status='merged'.
- Writes an audit log entry.
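The `(channel, value)` contact dedup hinges on a case-insensitive composite key. Extracted as a standalone sketch (the `contactKey` / `contactsToMove` helper names are mine, but the key shape matches the service code):

```typescript
// Sketch of the contact-dedup key used during reattach. The value is
// lowercased so "Jane@Example.com" and "jane@example.com" collide.
interface Contact {
  channel: string; // e.g. 'email', 'phone'
  value: string;
}

function contactKey(c: Contact): string {
  return `${c.channel}::${c.value.toLowerCase()}`;
}

// Returns the loser contacts that would actually move to the winner;
// pairs the winner already has are skipped (they stay in the snapshot
// so undo can still restore them).
function contactsToMove(winner: Contact[], loser: Contact[]): Contact[] {
  const seen = new Set(winner.map(contactKey));
  return loser.filter((c) => !seen.has(contactKey(c)));
}
```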
Schema additions:
- `clients.merged_into_client_id` (nullable text, indexed) — the
redirect pointer set on archive.
Tests: 6 cases against a real DB — happy path moves rows + writes log;
self-merge / cross-port / already-merged refused; duplicate-contact
deduped on reattach; fieldChoices copies loser values to winner.
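The fieldChoices semantics ("copies loser values to winner") can be shown as a pure reduction over the scalar fields. A sketch only, on a subset of the seven real fields; the service does the same per-field checks inline before issuing a single UPDATE:

```typescript
// Sketch of fieldChoices resolution: only fields explicitly marked
// 'loser' are copied onto the winner; everything else is untouched.
// Field subset is illustrative (the real service covers seven scalars).
type Choice = 'winner' | 'loser';
type ScalarField = 'fullName' | 'nationalityIso' | 'timezone' | 'source';
type ClientScalars = Record<ScalarField, string | null>;

function buildFieldUpdates(
  loser: ClientScalars,
  choices: Partial<Record<ScalarField, Choice>>,
): Partial<ClientScalars> {
  const updates: Partial<ClientScalars> = {};
  for (const field of Object.keys(choices) as ScalarField[]) {
    if (choices[field] === 'loser') updates[field] = loser[field];
  }
  return updates; // empty object means: skip the UPDATE entirely
}
```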
Layer 3 — Admin review queue
============================
`GET /api/v1/admin/duplicates`
Pending merge candidates (status='pending') for the current port,
with both client summaries hydrated for side-by-side rendering.
Skips pairs where one side is already archived/merged.
`POST /api/v1/admin/duplicates/[id]/merge`
Confirms a candidate. Body picks the winner; the other side
becomes the loser. Calls into `mergeClients` — the only path that
writes `client_merge_log`.
`POST /api/v1/admin/duplicates/[id]/dismiss`
Marks the candidate dismissed. Future scoring runs skip the same
pair until a score change recreates the row.
`<DuplicatesReviewQueue>` (`/admin/duplicates`)
Side-by-side card UI for each pending pair. Click a card to pick
the winner; the other side is automatically the loser. Toolbar:
"Merge into selected" + "Dismiss". No per-field merge editor in
this PR — that's a future polish; the simple "pick the better row"
flow handles ~80% of cases.
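Candidate pairs are stored in canonical order (the merge service matches either direction when resolving), which is what makes "dismissed stays dismissed until a score change recreates the row" work: both orientations of a pair land on the same row. A sketch of that canonicalization (helper name hypothetical):

```typescript
// Sketch: order the two client ids deterministically so (A, B) and
// (B, A) map to the same candidate row. The real storage convention
// lives in the scoring pipeline; this just illustrates the idea.
function canonicalPair(a: string, b: string): [string, string] {
  return a < b ? [a, b] : [b, a];
}
```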
Test coverage
=============
11 new integration tests (76 added in this branch total):
- 6 mergeClients (atomicity, refusal cases, contact dedup,
fieldChoices)
- 5 match-candidates API (shape, port scoping, confidence tiers,
Pattern F false-positive guard)
Full vitest: 926/926 passing (was 858 before the dedup branch).
Lint: clean. tsc: clean for new files (only pre-existing errors in
unrelated `tests/integration/` files remain, same as before this PR).
Out of scope, deferred
======================
- Background scoring cron that populates `client_merge_candidates`
(the queue is empty until this lands; manual seeding works for
now via the at-create flow).
- Side-by-side per-field merge editor with checkboxes (the simple
"pick the winner" UX shipped here covers ~80% of real cases).
- Admin settings UI for tuning the dedup thresholds. Defaults from
the design (90 / 50) are baked in for now.
- `unmergeClients` (the snapshot is captured in client_merge_log;
the undo endpoint just hasn't been wired yet).
These are all natural follow-up PRs that don't block shipping the
runtime UX.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
src/lib/services/client-merge.service.ts (new file, 393 lines):
/**
 * Client merge service — atomically combines two client records.
 *
 * Used by:
 * - /admin/duplicates review queue (when an admin confirms a merge)
 * - the at-create suggestion path ("use existing client") — though
 *   that path uses the lighter `attachInterestToClient` and never
 *   actually merges two pre-existing clients
 * - the migration script's `--apply` (eventually)
 *
 * Reversibility: every merge writes a `client_merge_log` row containing
 * the loser's full pre-merge state. Within the configured undo window
 * (default 7 days, see `dedup_undo_window_days` in system_settings) a
 * follow-up `unmergeClients` call can restore the loser and detach
 * everything that was reattached.
 *
 * Design reference: docs/superpowers/specs/2026-05-03-dedup-and-migration-design.md §6.
 */

import { and, eq, sql } from 'drizzle-orm';

import { db } from '@/lib/db';
import {
  clients,
  clientContacts,
  clientAddresses,
  clientNotes,
  clientTags,
  clientRelationships,
  clientMergeLog,
  clientMergeCandidates,
} from '@/lib/db/schema/clients';
import { interests } from '@/lib/db/schema/interests';
import { berthReservations } from '@/lib/db/schema/reservations';
import { auditLogs } from '@/lib/db/schema/system';

// ─── Public API ─────────────────────────────────────────────────────────────

export interface MergeFieldChoices {
  /** Per-field overrides — `winner` keeps the surviving client's value;
   * `loser` copies the loser's value over. Fields not listed default
   * to `winner` (no change). */
  fullName?: 'winner' | 'loser';
  nationalityIso?: 'winner' | 'loser';
  preferredContactMethod?: 'winner' | 'loser';
  preferredLanguage?: 'winner' | 'loser';
  timezone?: 'winner' | 'loser';
  source?: 'winner' | 'loser';
  sourceDetails?: 'winner' | 'loser';
}

export interface MergeOptions {
  winnerId: string;
  loserId: string;
  /** ID of the user performing the merge (for audit + clientMergeLog.mergedBy). */
  mergedBy: string;
  /** Per-field choice overrides. Multi-value fields (contacts, addresses,
   * notes, tags) are always preserved from both sides; this only
   * affects single-value scalar fields on the `clients` row. */
  fieldChoices?: MergeFieldChoices;
}

export interface MergeResult {
  mergeLogId: string;
  movedRows: {
    interests: number;
    contacts: number;
    addresses: number;
    notes: number;
    tags: number;
    relationships: number;
    reservations: number;
  };
}

/**
 * Atomically merge `loserId` into `winnerId`. Throws if:
 * - either id doesn't exist or belongs to a different port
 * - the loser has already been merged (mergedIntoClientId set)
 * - the winner is itself archived
 */
export async function mergeClients(opts: MergeOptions): Promise<MergeResult> {
  if (opts.winnerId === opts.loserId) {
    throw new Error('Cannot merge a client into itself');
  }

  return await db.transaction(async (tx) => {
    // ── Lock both rows for the duration. The first FOR UPDATE that
    //    arrives wins; a concurrent second merge of the same loser
    //    will see `mergedIntoClientId` set and bail. ──────────────────────
    const [winnerRow] = await tx
      .select()
      .from(clients)
      .where(eq(clients.id, opts.winnerId))
      .for('update');
    const [loserRow] = await tx
      .select()
      .from(clients)
      .where(eq(clients.id, opts.loserId))
      .for('update');

    if (!winnerRow) throw new Error(`Winner client ${opts.winnerId} not found`);
    if (!loserRow) throw new Error(`Loser client ${opts.loserId} not found`);
    if (winnerRow.portId !== loserRow.portId) {
      throw new Error('Cannot merge clients across different ports');
    }
    if (loserRow.mergedIntoClientId) {
      throw new Error(`Loser ${opts.loserId} already merged into ${loserRow.mergedIntoClientId}`);
    }
    if (winnerRow.archivedAt) {
      throw new Error('Cannot merge into an archived client');
    }

    // ── Snapshot the loser's full state before any mutation. Used by
    //    `unmergeClients` to restore within the undo window. ──────────────
    const loserContacts = await tx
      .select()
      .from(clientContacts)
      .where(eq(clientContacts.clientId, opts.loserId));
    const loserAddresses = await tx
      .select()
      .from(clientAddresses)
      .where(eq(clientAddresses.clientId, opts.loserId));
    const loserNotes = await tx
      .select()
      .from(clientNotes)
      .where(eq(clientNotes.clientId, opts.loserId));
    const loserTags = await tx
      .select()
      .from(clientTags)
      .where(eq(clientTags.clientId, opts.loserId));
    const loserInterests = await tx
      .select({ id: interests.id })
      .from(interests)
      .where(eq(interests.clientId, opts.loserId));
    const loserReservations = await tx
      .select({ id: berthReservations.id })
      .from(berthReservations)
      .where(eq(berthReservations.clientId, opts.loserId));
    const loserRelationshipsAsA = await tx
      .select()
      .from(clientRelationships)
      .where(eq(clientRelationships.clientAId, opts.loserId));
    const loserRelationshipsAsB = await tx
      .select()
      .from(clientRelationships)
      .where(eq(clientRelationships.clientBId, opts.loserId));

    const snapshot = {
      loser: loserRow,
      contacts: loserContacts,
      addresses: loserAddresses,
      notes: loserNotes,
      tags: loserTags,
      interests: loserInterests.map((r) => r.id),
      reservations: loserReservations.map((r) => r.id),
      relationshipsAsA: loserRelationshipsAsA,
      relationshipsAsB: loserRelationshipsAsB,
      fieldChoices: opts.fieldChoices ?? {},
      mergedAt: new Date().toISOString(),
    };

    // ── Apply field choices on the winner. We only touch fields the
    //    caller explicitly asked to copy from the loser; everything
    //    else stays as-is. ────────────────────────────────────────────────
    const fieldUpdates: Partial<typeof winnerRow> = {};
    if (opts.fieldChoices?.fullName === 'loser') fieldUpdates.fullName = loserRow.fullName;
    if (opts.fieldChoices?.nationalityIso === 'loser')
      fieldUpdates.nationalityIso = loserRow.nationalityIso;
    if (opts.fieldChoices?.preferredContactMethod === 'loser')
      fieldUpdates.preferredContactMethod = loserRow.preferredContactMethod;
    if (opts.fieldChoices?.preferredLanguage === 'loser')
      fieldUpdates.preferredLanguage = loserRow.preferredLanguage;
    if (opts.fieldChoices?.timezone === 'loser') fieldUpdates.timezone = loserRow.timezone;
    if (opts.fieldChoices?.source === 'loser') fieldUpdates.source = loserRow.source;
    if (opts.fieldChoices?.sourceDetails === 'loser')
      fieldUpdates.sourceDetails = loserRow.sourceDetails;

    if (Object.keys(fieldUpdates).length > 0) {
      await tx
        .update(clients)
        .set({ ...fieldUpdates, updatedAt: new Date() })
        .where(eq(clients.id, opts.winnerId));
    }

    // ── Reattach. Each table that points at the loser via clientId
    //    gets pointed at the winner instead. ─────────────────────────────

    const movedInterests = (
      await tx
        .update(interests)
        .set({ clientId: opts.winnerId, updatedAt: new Date() })
        .where(eq(interests.clientId, opts.loserId))
        .returning({ id: interests.id })
    ).length;

    const movedReservations = (
      await tx
        .update(berthReservations)
        .set({ clientId: opts.winnerId, updatedAt: new Date() })
        .where(eq(berthReservations.clientId, opts.loserId))
        .returning({ id: berthReservations.id })
    ).length;

    // Contacts: move loser's contacts to winner, but DON'T duplicate any
    // already-present (channel, value) pair. Loser-only ones get
    // demoted to non-primary so the winner's primary stays intact.
    const winnerContacts = await tx
      .select({ channel: clientContacts.channel, value: clientContacts.value })
      .from(clientContacts)
      .where(eq(clientContacts.clientId, opts.winnerId));
    const winnerContactKeys = new Set(
      winnerContacts.map((c) => `${c.channel}::${c.value.toLowerCase()}`),
    );

    let movedContacts = 0;
    for (const c of loserContacts) {
      const key = `${c.channel}::${c.value.toLowerCase()}`;
      if (winnerContactKeys.has(key)) {
        // Winner already has this contact — drop loser's row (cascade
        // will clean up when loser is archived). But we keep snapshot
        // so undo restores it.
        continue;
      }
      await tx
        .update(clientContacts)
        .set({ clientId: opts.winnerId, isPrimary: false, updatedAt: new Date() })
        .where(eq(clientContacts.id, c.id));
      movedContacts += 1;
    }

    // Addresses: same shape as contacts, but uniqueness is harder to
    // detect cleanly (free-text street). Just move them all and let the
    // user dedupe in the UI later.
    const movedAddresses = (
      await tx
        .update(clientAddresses)
        .set({ clientId: opts.winnerId, isPrimary: false, updatedAt: new Date() })
        .where(eq(clientAddresses.clientId, opts.loserId))
        .returning({ id: clientAddresses.id })
    ).length;

    const movedNotes = (
      await tx
        .update(clientNotes)
        .set({ clientId: opts.winnerId, updatedAt: new Date() })
        .where(eq(clientNotes.clientId, opts.loserId))
        .returning({ id: clientNotes.id })
    ).length;

    // Tags: copy any loser-only tag to the winner; drop overlap.
    const winnerTags = await tx
      .select({ tagId: clientTags.tagId })
      .from(clientTags)
      .where(eq(clientTags.clientId, opts.winnerId));
    const winnerTagSet = new Set(winnerTags.map((t) => t.tagId));
    let movedTags = 0;
    for (const t of loserTags) {
      if (!winnerTagSet.has(t.tagId)) {
        await tx.insert(clientTags).values({ clientId: opts.winnerId, tagId: t.tagId });
        movedTags += 1;
      }
    }
    await tx.delete(clientTags).where(eq(clientTags.clientId, opts.loserId));

    // Relationships: rewrite each FK side to point at the winner. Keep
    // both sides regardless — even if A and B both end up as the same
    // person, the row is preserved for audit; the UI hides self-loops.
    const movedRelationships =
      (
        await tx
          .update(clientRelationships)
          .set({ clientAId: opts.winnerId })
          .where(eq(clientRelationships.clientAId, opts.loserId))
          .returning({ id: clientRelationships.id })
      ).length +
      (
        await tx
          .update(clientRelationships)
          .set({ clientBId: opts.winnerId })
          .where(eq(clientRelationships.clientBId, opts.loserId))
          .returning({ id: clientRelationships.id })
      ).length;

    // ── Archive the loser. Row stays in DB for the undo window;
    //    `mergedIntoClientId` is the redirect pointer for any stragglers
    //    (links / direct queries / saved views). ──────────────────────────
    await tx
      .update(clients)
      .set({
        archivedAt: new Date(),
        mergedIntoClientId: opts.winnerId,
        updatedAt: new Date(),
      })
      .where(eq(clients.id, opts.loserId));

    // ── Mark any open merge candidate row for this pair as resolved. ───
    await tx
      .update(clientMergeCandidates)
      .set({
        status: 'merged',
        resolvedAt: new Date(),
        resolvedBy: opts.mergedBy,
      })
      .where(
        and(
          eq(clientMergeCandidates.portId, winnerRow.portId),
          // pair stored in canonical order — match either direction
          sql`(
            (${clientMergeCandidates.clientAId} = ${opts.winnerId}
              AND ${clientMergeCandidates.clientBId} = ${opts.loserId})
            OR
            (${clientMergeCandidates.clientAId} = ${opts.loserId}
              AND ${clientMergeCandidates.clientBId} = ${opts.winnerId})
          )`,
        ),
      );

    // ── Write the merge log + audit log. ────────────────────────────────
    const [logRow] = await tx
      .insert(clientMergeLog)
      .values({
        portId: winnerRow.portId,
        survivingClientId: opts.winnerId,
        mergedClientId: opts.loserId,
        mergedBy: opts.mergedBy,
        mergeDetails: snapshot,
      })
      .returning({ id: clientMergeLog.id });

    await tx.insert(auditLogs).values({
      portId: winnerRow.portId,
      userId: opts.mergedBy,
      entityType: 'client',
      entityId: opts.winnerId,
      action: 'merge',
      newValue: {
        loserId: opts.loserId,
        loserName: loserRow.fullName,
        movedInterests,
        movedReservations,
        movedContacts,
        movedAddresses,
      },
    });

    return {
      mergeLogId: logRow!.id,
      movedRows: {
        interests: movedInterests,
        contacts: movedContacts,
        addresses: movedAddresses,
        notes: movedNotes,
        tags: movedTags,
        relationships: movedRelationships,
        reservations: movedReservations,
      },
    };
  });
}

// ─── Convenience: list merge candidates for a port ──────────────────────────

export interface MergeCandidatePair {
  id: string;
  clientAId: string;
  clientBId: string;
  score: number;
  reasons: string[];
  status: string;
  createdAt: Date;
}

/** Fetch pending merge candidate pairs for the admin review queue. */
export async function listPendingMergeCandidates(portId: string): Promise<MergeCandidatePair[]> {
  const rows = await db
    .select()
    .from(clientMergeCandidates)
    .where(
      and(eq(clientMergeCandidates.portId, portId), eq(clientMergeCandidates.status, 'pending')),
    )
    .orderBy(sql`${clientMergeCandidates.score} DESC`);

  return rows.map((r) => ({
    id: r.id,
    clientAId: r.clientAId,
    clientBId: r.clientBId,
    score: r.score,
    reasons: Array.isArray(r.reasons) ? (r.reasons as string[]) : [],
    status: r.status,
    createdAt: r.createdAt,
  }));
}