Merge feat/dedup-migration: client dedup library + NocoDB migration script + admin queue

# Conflicts:
#	.gitignore
#	src/lib/db/migrations/meta/_journal.json
This commit is contained in:
Matt Ciaccio
2026-05-03 16:24:13 +02:00
32 changed files with 25389 additions and 1 deletions

View File

@@ -0,0 +1,30 @@
-- Dedup review queue: pairs of clients flagged as potential duplicates
-- by the background scoring job; read by the /admin/duplicates UI.
-- Mirrors clientMergeCandidates in the Drizzle schema.
CREATE TABLE "client_merge_candidates" (
"id" text PRIMARY KEY NOT NULL,
"port_id" text NOT NULL,
"client_a_id" text NOT NULL,
"client_b_id" text NOT NULL,
-- Similarity score from the scoring job; higher means more likely duplicates.
"score" integer NOT NULL,
-- Human-readable rule list, e.g. ["email match", "phone match"].
"reasons" jsonb NOT NULL,
-- pending | dismissed | merged
"status" text DEFAULT 'pending' NOT NULL,
"created_at" timestamp with time zone DEFAULT now() NOT NULL,
"resolved_at" timestamp with time zone,
"resolved_by" text
);
--> statement-breakpoint
-- Idempotency ledger for one-shot data migrations: maps a source-system
-- row (e.g. a NocoDB Interests row) to each new-system entity created
-- from it, so re-running --apply skips rows already linked.
CREATE TABLE "migration_source_links" (
"id" text PRIMARY KEY NOT NULL,
"source_system" text NOT NULL,
"source_id" text NOT NULL,
"target_entity_type" text NOT NULL,
"target_entity_id" text NOT NULL,
-- Apply-run identifier; pairs with the on-disk apply manifest so
-- --rollback --apply-id <id> knows which links to remove.
"applied_id" text NOT NULL,
"applied_by" text,
"applied_at" timestamp with time zone DEFAULT now() NOT NULL
);
--> statement-breakpoint
-- Deleting a client removes its candidate pairs (cascade); the port FK
-- is deliberately NO ACTION.
ALTER TABLE "client_merge_candidates" ADD CONSTRAINT "client_merge_candidates_port_id_ports_id_fk" FOREIGN KEY ("port_id") REFERENCES "public"."ports"("id") ON DELETE no action ON UPDATE no action;--> statement-breakpoint
ALTER TABLE "client_merge_candidates" ADD CONSTRAINT "client_merge_candidates_client_a_id_clients_id_fk" FOREIGN KEY ("client_a_id") REFERENCES "public"."clients"("id") ON DELETE cascade ON UPDATE no action;--> statement-breakpoint
ALTER TABLE "client_merge_candidates" ADD CONSTRAINT "client_merge_candidates_client_b_id_clients_id_fk" FOREIGN KEY ("client_b_id") REFERENCES "public"."clients"("id") ON DELETE cascade ON UPDATE no action;--> statement-breakpoint
-- Queue listing: candidates for a port filtered by review status.
CREATE INDEX "idx_cmc_port_status" ON "client_merge_candidates" USING btree ("port_id","status");--> statement-breakpoint
-- Pairs are stored canonically (client_a_id < client_b_id), so this
-- uniqueness guarantees one row per pair regardless of scoring direction.
CREATE UNIQUE INDEX "idx_cmc_pair" ON "client_merge_candidates" USING btree ("port_id","client_a_id","client_b_id");--> statement-breakpoint
-- One link per (source system, source row, entity type): a single source
-- row may create several entities, but at most one of each type.
CREATE UNIQUE INDEX "idx_msl_source_target" ON "migration_source_links" USING btree ("source_system","source_id","target_entity_type");

View File

@@ -0,0 +1,2 @@
-- Dedup merge support: when a client is merged away (the "loser"), this
-- column points at the surviving client; NULL for live clients.
-- NOTE(review): intentionally no FK to clients(id), matching the Drizzle
-- schema — presumably so the pointer survives deletes/unmerge; confirm.
ALTER TABLE "clients" ADD COLUMN "merged_into_client_id" text;--> statement-breakpoint
-- Supports "which clients were merged into X" lookups and redirects.
CREATE INDEX "idx_clients_merged_into" ON "clients" USING btree ("merged_into_client_id");

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -148,6 +148,20 @@
"when": 1777814682110,
"tag": "0020_medical_betty_brant",
"breakpoints": true
},
{
"idx": 21,
"version": "7",
"when": 1777811835982,
"tag": "0021_unusual_azazel",
"breakpoints": true
},
{
"idx": 22,
"version": "7",
"when": 1777812671833,
"tag": "0022_magenta_madame_hydra",
"breakpoints": true
}
]
}

View File

@@ -2,6 +2,7 @@ import {
pgTable,
text,
boolean,
integer,
timestamp,
jsonb,
index,
@@ -30,6 +31,11 @@ export const clients = pgTable(
source: text('source'), // website, manual, referral, broker
sourceDetails: text('source_details'),
archivedAt: timestamp('archived_at', { withTimezone: true }),
/** When this client was merged into another (the "loser" of a dedup
* merge), this points at the surviving client. Used by the
* /admin/duplicates review queue to redirect any stragglers, and by
* the unmerge flow to restore. Null for live clients. */
mergedIntoClientId: text('merged_into_client_id'),
createdAt: timestamp('created_at', { withTimezone: true }).notNull().defaultNow(),
updatedAt: timestamp('updated_at', { withTimezone: true }).notNull().defaultNow(),
},
@@ -38,6 +44,7 @@ export const clients = pgTable(
index('idx_clients_name').on(table.portId, table.fullName),
index('idx_clients_archived').on(table.portId, table.archivedAt),
index('idx_clients_nationality_iso').on(table.nationalityIso),
index('idx_clients_merged_into').on(table.mergedIntoClientId),
],
);
@@ -145,6 +152,54 @@ export const clientMergeLog = pgTable(
(table) => [index('idx_cml_port').on(table.portId)],
);
/**
* Pairs of clients flagged by the background scoring job as potential
* duplicates. The `/admin/duplicates` review queue reads from here.
*
* Lifecycle:
* - Background job inserts a row when a pair scores >= the
* `dedup_review_queue_threshold` system setting.
* - User reviews in the admin UI and either merges (status='merged')
* or dismisses (status='dismissed').
* - Subsequent runs of the scoring job skip pairs already
* `dismissed` so the same false-positive doesn't keep reappearing.
* A future score increase recreates the row.
*
* Pairs are stored canonically with `clientAId < clientBId` (string
* comparison) so the same pair only generates one row regardless of
* scoring direction.
*/
export const clientMergeCandidates = pgTable(
  'client_merge_candidates',
  {
    // App-side UUID primary key.
    id: text('id')
      .primaryKey()
      .$defaultFn(() => crypto.randomUUID()),
    // Owning port; FK has no cascade, unlike the client FKs below.
    portId: text('port_id')
      .notNull()
      .references(() => ports.id),
    // The two sides of the candidate pair, stored canonically with
    // clientAId < clientBId (string comparison). Deleting either client
    // removes the row.
    clientAId: text('client_a_id')
      .notNull()
      .references(() => clients.id, { onDelete: 'cascade' }),
    clientBId: text('client_b_id')
      .notNull()
      .references(() => clients.id, { onDelete: 'cascade' }),
    // Similarity score assigned by the background scoring job; rows are
    // created when it meets the dedup_review_queue_threshold setting.
    score: integer('score').notNull(),
    /** Human-readable rule list, e.g. ["email match", "phone match"]. */
    reasons: jsonb('reasons').notNull(),
    status: text('status').notNull().default('pending'), // pending | dismissed | merged
    createdAt: timestamp('created_at', { withTimezone: true }).notNull().defaultNow(),
    // Review outcome metadata; null while status is 'pending'.
    // NOTE(review): resolvedBy presumably holds the reviewing user's id —
    // confirm against the /admin/duplicates handler.
    resolvedAt: timestamp('resolved_at', { withTimezone: true }),
    resolvedBy: text('resolved_by'),
  },
  (table) => [
    // Queue listing: candidates for a port filtered by status.
    index('idx_cmc_port_status').on(table.portId, table.status),
    // Same pair shouldn't surface twice — enforce uniqueness on the
    // canonical (a < b) ordering.
    uniqueIndex('idx_cmc_pair').on(table.portId, table.clientAId, table.clientBId),
  ],
);
export const clientAddresses = pgTable(
'client_addresses',
{
@@ -190,3 +245,5 @@ export type ClientMergeLog = typeof clientMergeLog.$inferSelect;
export type NewClientMergeLog = typeof clientMergeLog.$inferInsert;
export type ClientAddress = typeof clientAddresses.$inferSelect;
export type NewClientAddress = typeof clientAddresses.$inferInsert;
// Row types inferred from the table definition (select vs. insert shape).
export type ClientMergeCandidate = typeof clientMergeCandidates.$inferSelect;
export type NewClientMergeCandidate = typeof clientMergeCandidates.$inferInsert;

View File

@@ -56,5 +56,8 @@ export * from './ai-usage';
// GDPR export tracking (Phase 3d)
export * from './gdpr';
// Migration ledger (one-shot scripts — NocoDB import etc.)
export * from './migration';
// Relations (must come last — references all tables)
export * from './relations';

View File

@@ -0,0 +1,48 @@
import { pgTable, text, timestamp, uniqueIndex } from 'drizzle-orm/pg-core';
/**
* Idempotency ledger for one-shot data migrations from external sources
* (e.g. the legacy NocoDB Interests table).
*
* Every entity created during a migration script's `--apply` run gets a
* row here mapping the source-system row identifier to the new-system
* entity id. Re-running `--apply` against the same report skips rows
* already linked, so partial-failure resumption is just "run again."
*
* One source row can generate multiple new entities (e.g. one NocoDB
* Interests row → one client + one interest + one yacht), so the
* uniqueness constraint includes `target_entity_type`.
*/
export const migrationSourceLinks = pgTable(
  'migration_source_links',
  {
    // App-side UUID primary key.
    id: text('id')
      .primaryKey()
      .$defaultFn(() => crypto.randomUUID()),
    /** e.g. 'nocodb_interests', 'nocodb_residences', 'nocodb_website_submissions'. */
    sourceSystem: text('source_system').notNull(),
    /** Source row identifier as a string (NocoDB IDs are integers; we keep
     * text here for forward compat with other sources). */
    sourceId: text('source_id').notNull(),
    /** e.g. 'client', 'interest', 'yacht', 'document'. */
    targetEntityType: text('target_entity_type').notNull(),
    /** UUID of the new-system entity (clients.id, interests.id, etc.). */
    targetEntityId: text('target_entity_id').notNull(),
    /** Apply-id from the migration run that created this link — pairs with
     * the on-disk apply manifest so `--rollback --apply-id <id>` knows
     * exactly which links to remove. */
    appliedId: text('applied_id').notNull(),
    // Who/when the link was written; appliedBy is nullable (unattended runs).
    appliedBy: text('applied_by'),
    appliedAt: timestamp('applied_at', { withTimezone: true }).notNull().defaultNow(),
  },
  (table) => [
    // One link per (source system, source row, entity type): a source row
    // may spawn several entities, but at most one of each type. This is
    // what makes re-running --apply idempotent.
    uniqueIndex('idx_msl_source_target').on(
      table.sourceSystem,
      table.sourceId,
      table.targetEntityType,
    ),
  ],
);
// Row types inferred from the table definition (select vs. insert shape).
export type MigrationSourceLink = typeof migrationSourceLinks.$inferSelect;
export type NewMigrationSourceLink = typeof migrationSourceLinks.$inferInsert;