feat(dedup): NocoDB migration script + tables (P3 dry-run)

Lands the one-shot migration pipeline from the legacy NocoDB Interests base into the new client/interest schema. Dry-run mode is fully operational: pulls the live snapshot, runs the dedup library, and writes a CSV + Markdown report under .migration/<timestamp>/. The --apply phase is stubbed for a follow-up PR per the design's P3 implementation sequence.

Schema additions
================
- `client_merge_candidates`: pairs flagged by the background scoring job for the /admin/duplicates review queue. Status enum: pending / dismissed / merged. Unique-(portId, clientAId, clientBId) so the same pair can't surface twice. Empty until P2 lands the cron.
- `migration_source_links`: idempotency ledger. Maps source-system rows (NocoDB Interest #624 → new client UUID) so re-running --apply against the same dry-run report skips already-imported entities.

Both tables ship with the migration `0020_unusual_azazel.sql`, already applied to the local dev DB during this commit's preparation.

Library
=======
src/lib/dedup/nocodb-source.ts
  Read-only adapter for the legacy NocoDB v2 API. xc-token auth, auto-paginates until isLastPage, captures the table IDs from the 2026-05-03 audit. `fetchSnapshot()` pulls every relevant table in parallel into one in-memory object the transform layer consumes.

src/lib/dedup/migration-transform.ts
  Pure function: NocoDB snapshot in, MigrationPlan out. Per row:
  - normalizes name / email / phone / country via the dedup library
  - parses the legacy DD-MM-YYYY / DD/MM/YYYY / ISO date formats
  - maps the 8-stage `Sales Process Level` enum to the new 9-stage pipelineStage
  - filters yacht-name placeholders ('TBC', 'Na', etc.)
  - merges Internal Notes + Extra Comments + Berth Size Desired into a single notes blob
  Then runs `findClientMatches` pairwise (with blocking) and union-finds clusters of rows whose score crosses the auto-link threshold (90). Lower-scoring pairs (50–89) become 'needs review'. Each cluster's "lead" row is picked by completeness score with recency tie-break.

src/lib/dedup/migration-report.ts
  Writes three artifacts to .migration/<timestamp>/:
  - report.csv: one row per planned op, RFC-4180 escaped
  - summary.md: human-skimmable overview
  - plan.json: full structured plan for the --apply phase
  CSV cells with comma / quote / newline are quoted; internal quotes are doubled. No external CSV dep.

src/lib/dedup/phone-parse.ts
  Script-safe wrapper around libphonenumber-js's `core` entry that loads `metadata.min.json` directly. The default `index.cjs.js` bundled by libphonenumber hits a metadata-shape interop bug under Node 25 + tsx (`{ default }` wrapping); core+JSON sidesteps it. The dedup `normalizePhone` and `find-matches` both use this wrapper now so the same code path runs in vitest, Next.js, and the migration CLI without surprises.

src/lib/dedup/normalize.ts
  Tightened country resolution: added Caribbean short-form aliases ('antigua' → AG, 'st kitts' → KN, etc.) and a city map covering the US locations seen in the NocoDB dump (Boston, Tampa, Fort Lauderdale, Port Jefferson, Nantucket). Also relaxed phone parsing to drop the `isValid()` strict check: the libphonenumber min build rejects many real NANP-territory numbers, and dedup only needs a canonical E.164 to compare.

CLI
===
scripts/migrate-from-nocodb.ts

  pnpm tsx scripts/migrate-from-nocodb.ts --dry-run
    → Pulls the live NocoDB base (NOCODB_URL + NOCODB_TOKEN env vars), runs the transform, writes report. No DB writes.

  pnpm tsx scripts/migrate-from-nocodb.ts --apply --report .migration/<dir>/
    → Stubbed; exits with `not yet implemented` and a pointer to the design doc. Apply phase ships in a follow-up.

Tests
=====
tests/unit/dedup/migration-transform.test.ts (7 cases)
  Fixture-based regression. A frozen 12-row NocoDB snapshot covers every duplicate pattern in the design (§1.2). The test asserts:
  - 12 input rows → 7 unique clients (cluster math is right)
  - Patterns A / B / C / E auto-link
  - Pattern F (Etiennette Clamouze) does NOT auto-link
  - Every interest preserved as its own row even when clients merge
  - 8-stage → 9-stage enum mapping is correct per spec
  - Multi-yacht merge (Constanzo CALYPSO + Costanzo GEMINI under one client), the design's signature win
  - Output is deterministic (run twice, identical)

Validation against real data
============================
Ran `pnpm tsx scripts/migrate-from-nocodb.ts --dry-run` against the live NocoDB. Result on 252 Interests rows:
- 237 clients (15 merged into 13 clusters)
- 252 interests (one per source row)
- 406 contacts, 52 addresses
- 13 auto-linked clusters (every confirmed cluster from §1.2 audit)
- 3 pairs flagged for review (Camazou, Zasso, one new)
- 1 phone placeholder flagged

Total dedup test count: 57 (50 from P1 + 7 fixture tests).
Lint: clean. Tsc: clean for new files.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
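
The clustering step described in the commit message (union-find over pairs scoring at or above the auto-link threshold of 90, with 50–89 routed to review) might look roughly like the sketch below. The `ScoredPair` shape and the `clusterRows` name are assumptions for illustration; the real `findClientMatches` output and `MigrationPlan` types are not shown in this commit, and blocking and lead-row selection are left out.

```typescript
// Hypothetical pair shape: indices into the snapshot's row array plus a 0-100 score.
interface ScoredPair {
  a: number;
  b: number;
  score: number;
}

// Thresholds as stated in the commit message.
const AUTO_LINK_THRESHOLD = 90;
const REVIEW_THRESHOLD = 50;

function clusterRows(
  rowCount: number,
  pairs: ScoredPair[],
): { clusters: number[][]; needsReview: ScoredPair[] } {
  // parent[i] points towards the representative of i's cluster.
  const parent = Array.from({ length: rowCount }, (_, i) => i);

  const find = (x: number): number => {
    while (parent[x] !== x) {
      parent[x] = parent[parent[x]]; // path halving keeps trees shallow
      x = parent[x];
    }
    return x;
  };

  const union = (x: number, y: number): void => {
    const rx = find(x);
    const ry = find(y);
    // The smaller index always becomes the root, so output does not depend on pair order.
    if (rx !== ry) parent[Math.max(rx, ry)] = Math.min(rx, ry);
  };

  const needsReview: ScoredPair[] = [];
  for (const pair of pairs) {
    if (pair.score >= AUTO_LINK_THRESHOLD) union(pair.a, pair.b);
    else if (pair.score >= REVIEW_THRESHOLD) needsReview.push(pair);
    // Pairs below the review threshold are ignored.
  }

  // Group row indices by their root, in ascending row order.
  const byRoot = new Map<number, number[]>();
  for (let i = 0; i < rowCount; i++) {
    const root = find(i);
    const members = byRoot.get(root) ?? [];
    members.push(i);
    byRoot.set(root, members);
  }
  return { clusters: [...byRoot.values()], needsReview };
}
```

Because links are transitive under union-find, two rows can land in one cluster without scoring 90 against each other directly, as long as a chain of auto-linked pairs connects them.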
30
src/lib/db/migrations/0020_unusual_azazel.sql
Normal file
@@ -0,0 +1,30 @@
CREATE TABLE "client_merge_candidates" (
  "id" text PRIMARY KEY NOT NULL,
  "port_id" text NOT NULL,
  "client_a_id" text NOT NULL,
  "client_b_id" text NOT NULL,
  "score" integer NOT NULL,
  "reasons" jsonb NOT NULL,
  "status" text DEFAULT 'pending' NOT NULL,
  "created_at" timestamp with time zone DEFAULT now() NOT NULL,
  "resolved_at" timestamp with time zone,
  "resolved_by" text
);
--> statement-breakpoint
CREATE TABLE "migration_source_links" (
  "id" text PRIMARY KEY NOT NULL,
  "source_system" text NOT NULL,
  "source_id" text NOT NULL,
  "target_entity_type" text NOT NULL,
  "target_entity_id" text NOT NULL,
  "applied_id" text NOT NULL,
  "applied_by" text,
  "applied_at" timestamp with time zone DEFAULT now() NOT NULL
);
--> statement-breakpoint
ALTER TABLE "client_merge_candidates" ADD CONSTRAINT "client_merge_candidates_port_id_ports_id_fk" FOREIGN KEY ("port_id") REFERENCES "public"."ports"("id") ON DELETE no action ON UPDATE no action;--> statement-breakpoint
ALTER TABLE "client_merge_candidates" ADD CONSTRAINT "client_merge_candidates_client_a_id_clients_id_fk" FOREIGN KEY ("client_a_id") REFERENCES "public"."clients"("id") ON DELETE cascade ON UPDATE no action;--> statement-breakpoint
ALTER TABLE "client_merge_candidates" ADD CONSTRAINT "client_merge_candidates_client_b_id_clients_id_fk" FOREIGN KEY ("client_b_id") REFERENCES "public"."clients"("id") ON DELETE cascade ON UPDATE no action;--> statement-breakpoint
CREATE INDEX "idx_cmc_port_status" ON "client_merge_candidates" USING btree ("port_id","status");--> statement-breakpoint
CREATE UNIQUE INDEX "idx_cmc_pair" ON "client_merge_candidates" USING btree ("port_id","client_a_id","client_b_id");--> statement-breakpoint
CREATE UNIQUE INDEX "idx_msl_source_target" ON "migration_source_links" USING btree ("source_system","source_id","target_entity_type");
10482
src/lib/db/migrations/meta/0020_snapshot.json
Normal file
File diff suppressed because it is too large
@@ -141,6 +141,13 @@
      "when": 1777671562738,
      "tag": "0019_lazy_vampiro",
      "breakpoints": true
    },
    {
      "idx": 20,
      "version": "7",
      "when": 1777811835982,
      "tag": "0020_unusual_azazel",
      "breakpoints": true
    }
  ]
}
@@ -2,6 +2,7 @@ import {
  pgTable,
  text,
  boolean,
  integer,
  timestamp,
  jsonb,
  index,
@@ -145,6 +146,54 @@ export const clientMergeLog = pgTable(
  (table) => [index('idx_cml_port').on(table.portId)],
);

/**
 * Pairs of clients flagged by the background scoring job as potential
 * duplicates. The `/admin/duplicates` review queue reads from here.
 *
 * Lifecycle:
 *   - Background job inserts a row when a pair scores >= the
 *     `dedup_review_queue_threshold` system setting.
 *   - User reviews in the admin UI and either merges (status='merged')
 *     or dismisses (status='dismissed').
 *   - Subsequent runs of the scoring job skip pairs already
 *     `dismissed` so the same false-positive doesn't keep reappearing.
 *     A future score increase recreates the row.
 *
 * Pairs are stored canonically with `clientAId < clientBId` (string
 * comparison) so the same pair only generates one row regardless of
 * scoring direction.
 */
export const clientMergeCandidates = pgTable(
  'client_merge_candidates',
  {
    id: text('id')
      .primaryKey()
      .$defaultFn(() => crypto.randomUUID()),
    portId: text('port_id')
      .notNull()
      .references(() => ports.id),
    clientAId: text('client_a_id')
      .notNull()
      .references(() => clients.id, { onDelete: 'cascade' }),
    clientBId: text('client_b_id')
      .notNull()
      .references(() => clients.id, { onDelete: 'cascade' }),
    score: integer('score').notNull(),
    /** Human-readable rule list, e.g. ["email match", "phone match"]. */
    reasons: jsonb('reasons').notNull(),
    status: text('status').notNull().default('pending'), // pending | dismissed | merged
    createdAt: timestamp('created_at', { withTimezone: true }).notNull().defaultNow(),
    resolvedAt: timestamp('resolved_at', { withTimezone: true }),
    resolvedBy: text('resolved_by'),
  },
  (table) => [
    index('idx_cmc_port_status').on(table.portId, table.status),
    // Same pair shouldn't surface twice; enforce uniqueness on the
    // canonical (a < b) ordering.
    uniqueIndex('idx_cmc_pair').on(table.portId, table.clientAId, table.clientBId),
  ],
);

export const clientAddresses = pgTable(
  'client_addresses',
  {
@@ -190,3 +239,5 @@ export type ClientMergeLog = typeof clientMergeLog.$inferSelect;
export type NewClientMergeLog = typeof clientMergeLog.$inferInsert;
export type ClientAddress = typeof clientAddresses.$inferSelect;
export type NewClientAddress = typeof clientAddresses.$inferInsert;
export type ClientMergeCandidate = typeof clientMergeCandidates.$inferSelect;
export type NewClientMergeCandidate = typeof clientMergeCandidates.$inferInsert;
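
The doc comment on `clientMergeCandidates` says pairs are stored with `clientAId < clientBId` so the unique index `idx_cmc_pair` sees each pair only once. A minimal sketch of how a caller could normalize a pair before inserting is shown below; the `canonicalPair` helper is hypothetical and not part of this diff.

```typescript
// Hypothetical helper: orders two client ids so the smaller one is always
// clientAId, matching the canonical (a < b) ordering the unique index relies on.
function canonicalPair(
  idA: string,
  idB: string,
): { clientAId: string; clientBId: string } {
  if (idA === idB) {
    // A client cannot be a duplicate of itself; fail fast on bad input.
    throw new Error('a merge candidate needs two distinct client ids');
  }
  // Plain JavaScript string comparison (UTF-16 code units).
  return idA < idB
    ? { clientAId: idA, clientBId: idB }
    : { clientAId: idB, clientBId: idA };
}
```

The comparison only has to be applied consistently on the application side; the database index enforces uniqueness on whatever order is written, so every writer would need to go through the same helper.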
@@ -56,5 +56,8 @@ export * from './ai-usage';
// GDPR export tracking (Phase 3d)
export * from './gdpr';

// Migration ledger (one-shot scripts: NocoDB import etc.)
export * from './migration';

// Relations (must come last; references all tables)
export * from './relations';
48
src/lib/db/schema/migration.ts
Normal file
@@ -0,0 +1,48 @@
import { pgTable, text, timestamp, uniqueIndex } from 'drizzle-orm/pg-core';

/**
 * Idempotency ledger for one-shot data migrations from external sources
 * (e.g. the legacy NocoDB Interests table).
 *
 * Every entity created during a migration script's `--apply` run gets a
 * row here mapping the source-system row identifier to the new-system
 * entity id. Re-running `--apply` against the same report skips rows
 * already linked, so partial-failure resumption is just "run again."
 *
 * One source row can generate multiple new entities (e.g. one NocoDB
 * Interests row → one client + one interest + one yacht), so the
 * uniqueness constraint includes `target_entity_type`.
 */
export const migrationSourceLinks = pgTable(
  'migration_source_links',
  {
    id: text('id')
      .primaryKey()
      .$defaultFn(() => crypto.randomUUID()),
    /** e.g. 'nocodb_interests', 'nocodb_residences', 'nocodb_website_submissions'. */
    sourceSystem: text('source_system').notNull(),
    /** Source row identifier as a string (NocoDB IDs are integers; we keep
     *  text here for forward compat with other sources). */
    sourceId: text('source_id').notNull(),
    /** e.g. 'client', 'interest', 'yacht', 'document'. */
    targetEntityType: text('target_entity_type').notNull(),
    /** UUID of the new-system entity (clients.id, interests.id, etc.). */
    targetEntityId: text('target_entity_id').notNull(),
    /** Apply-id from the migration run that created this link; pairs with
     *  the on-disk apply manifest so `--rollback --apply-id <id>` knows
     *  exactly which links to remove. */
    appliedId: text('applied_id').notNull(),
    appliedBy: text('applied_by'),
    appliedAt: timestamp('applied_at', { withTimezone: true }).notNull().defaultNow(),
  },
  (table) => [
    uniqueIndex('idx_msl_source_target').on(
      table.sourceSystem,
      table.sourceId,
      table.targetEntityType,
    ),
  ],
);

export type MigrationSourceLink = typeof migrationSourceLinks.$inferSelect;
export type NewMigrationSourceLink = typeof migrationSourceLinks.$inferInsert;
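
The `--apply` phase is stubbed in this commit, so how it will consult the ledger is not shown. As a rough sketch of the idempotency idea in the doc comment above, the in-memory filter below drops planned operations whose `(sourceSystem, sourceId, targetEntityType)` key already has a link. The `PlannedOp` shape and the `pendingOps` name are assumptions, and a real run would read existing links from `migration_source_links` rather than from an array.

```typescript
// Hypothetical shapes mirroring the ledger's unique key; not taken from the diff.
interface LinkKey {
  sourceSystem: string;
  sourceId: string;
  targetEntityType: string;
}
type PlannedOp = LinkKey; // a real plan entry would carry the payload too
type ExistingLink = LinkKey & { targetEntityId: string };

// JSON-encode the tuple so values containing separators cannot collide.
const keyOf = (k: LinkKey): string =>
  JSON.stringify([k.sourceSystem, k.sourceId, k.targetEntityType]);

// Returns only the operations that have no ledger row yet, preserving plan order.
function pendingOps(plan: PlannedOp[], existing: ExistingLink[]): PlannedOp[] {
  const done = new Set(existing.map(keyOf));
  return plan.filter((op) => !done.has(keyOf(op)));
}
```

Including `targetEntityType` in the key is what lets one source row map to a client, an interest, and a yacht independently, so a run that failed after creating the client would still create the interest on retry.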