diff --git a/scripts/migrate-from-nocodb.ts b/scripts/migrate-from-nocodb.ts index 7c0bbeb..bf1cece 100644 --- a/scripts/migrate-from-nocodb.ts +++ b/scripts/migrate-from-nocodb.ts @@ -7,21 +7,30 @@ * Pulls the live NocoDB base, runs the transform + dedup pipeline, * writes a report to .migration//. NO database writes. * - * pnpm tsx scripts/migrate-from-nocodb.ts --dry-run --port-slug harbor-royale + * pnpm tsx scripts/migrate-from-nocodb.ts --dry-run --port-slug port-nimara * Same, but tags the planned writes with the named port (matters for * the apply phase — every client/interest belongs to one port). * - * pnpm tsx scripts/migrate-from-nocodb.ts --apply --report .migration// - * [Not yet implemented — apply phase comes in a follow-up PR.] + * pnpm tsx scripts/migrate-from-nocodb.ts --apply --port-slug port-nimara + * Re-fetches NocoDB, re-transforms, then writes the planned rows + * into the target port via the idempotent `migration_source_links` + * ledger. Re-runs are safe — already-imported source IDs are skipped. + * REQUIRES `EMAIL_REDIRECT_TO` to be set in env (safety net) unless + * `--unsafe-skip-redirect-check` is also passed. * * Design reference: docs/superpowers/specs/2026-05-03-dedup-and-migration-design.md §9. 
*/ import 'dotenv/config'; - +import { randomUUID } from 'node:crypto'; import path from 'node:path'; import { fileURLToPath } from 'node:url'; +import { eq } from 'drizzle-orm'; + +import { db } from '@/lib/db'; +import { ports } from '@/lib/db/schema/ports'; +import { applyPlan } from '@/lib/dedup/migration-apply'; import { fetchSnapshot, loadNocoDbConfig } from '@/lib/dedup/nocodb-source'; import { transformSnapshot } from '@/lib/dedup/migration-transform'; import { resolveReportPaths, writeReport } from '@/lib/dedup/migration-report'; @@ -31,6 +40,7 @@ interface CliArgs { apply: boolean; portSlug: string | null; reportDir: string | null; + unsafeSkipRedirectCheck: boolean; } function parseArgs(argv: string[]): CliArgs { @@ -39,6 +49,7 @@ function parseArgs(argv: string[]): CliArgs { apply: false, portSlug: null, reportDir: null, + unsafeSkipRedirectCheck: false, }; for (let i = 0; i < argv.length; i += 1) { const a = argv[i]!; @@ -46,6 +57,7 @@ function parseArgs(argv: string[]): CliArgs { else if (a === '--apply') args.apply = true; else if (a === '--port-slug') args.portSlug = argv[++i] ?? null; else if (a === '--report') args.reportDir = argv[++i] ?? null; + else if (a === '--unsafe-skip-redirect-check') args.unsafeSkipRedirectCheck = true; else if (a === '-h' || a === '--help') { printHelp(); process.exit(0); @@ -64,20 +76,50 @@ function printHelp(): void { Pulls NocoDB → transforms → writes report to .migration//. No database writes. - pnpm tsx scripts/migrate-from-nocodb.ts --apply --report .migration// - Apply phase. (Not yet implemented.) + pnpm tsx scripts/migrate-from-nocodb.ts --apply --port-slug + Re-fetches NocoDB, re-transforms, writes via migration_source_links + ledger. Idempotent — safe to re-run. Requires EMAIL_REDIRECT_TO set + (unless --unsafe-skip-redirect-check is also passed). Flags: - --dry-run Read NocoDB, write report only. - --apply Actually write to the new DB. (Not yet supported.) 
- --port-slug Port slug to attach to all imported entities. - Defaults to the first available port if omitted. - --report Path to a previously-generated report dir - (only used by --apply). - -h, --help Show this help. + --dry-run Read NocoDB, write report only. + --apply Actually write rows to the DB. + --port-slug Port slug to attach to all imported + entities. Defaults to the first + available port if omitted. + --report Path to a previously-generated report + dir (only used by --apply). + --unsafe-skip-redirect-check Skip the EMAIL_REDIRECT_TO precondition + check. Only use in production cutover. + -h, --help Show this help. `); } +/** + * Resolve the target port: use the slug if provided, otherwise the first + * port found. Errors out cleanly if the slug doesn't match any port. + */ +async function resolvePort(slug: string | null): Promise<{ id: string; slug: string }> { + if (slug) { + const [p] = await db + .select({ id: ports.id, slug: ports.slug }) + .from(ports) + .where(eq(ports.slug, slug)) + .limit(1); + if (!p) { + console.error(`No port found with slug "${slug}".`); + process.exit(1); + } + return { id: p.id, slug: p.slug }; + } + const [first] = await db.select({ id: ports.id, slug: ports.slug }).from(ports).limit(1); + if (!first) { + console.error('No ports exist in the target DB. Seed at least one port before applying.'); + process.exit(1); + } + return { id: first.id, slug: first.slug }; +} + async function main(): Promise<void> { const args = parseArgs(process.argv.slice(2)); @@ -87,13 +129,21 @@ async function main(): Promise<void> { process.exit(1); } - if (args.apply) { - console.error('--apply is not yet implemented in this version. P3 ships dry-run first.'); - console.error('See docs/superpowers/specs/2026-05-03-dedup-and-migration-design.md §9.2.'); + // Safety gate: --apply must run with EMAIL_REDIRECT_TO set, unless the + // operator explicitly opts out (production cutover).
+ if (args.apply && !process.env.EMAIL_REDIRECT_TO && !args.unsafeSkipRedirectCheck) { + console.error( + '--apply requires EMAIL_REDIRECT_TO to be set in the environment as a safety net.', + ); + console.error('See docs/operations/outbound-comms-safety.md for the rationale.'); + console.error( + 'If you are running the production cutover and have read that doc, add ' + + '--unsafe-skip-redirect-check to override.', + ); process.exit(2); } - // ── Dry-run path ─────────────────────────────────────────────────────────── + // ── Fetch + transform (shared by dry-run and apply) ────────────────────── console.log('[migrate] Loading NocoDB config…'); const config = loadNocoDbConfig(); @@ -110,8 +160,7 @@ async function main(): Promise { console.log('[migrate] Running transform + dedup pipeline…'); const plan = transformSnapshot(snapshot); - // Resolve output paths relative to the worktree root (the script itself - // lives in scripts/; we want the .migration dir at the repo root). + // Resolve output paths relative to the worktree root. const scriptDir = path.dirname(fileURLToPath(import.meta.url)); const repoRoot = path.resolve(scriptDir, '..'); const generatedAt = new Date().toISOString(); @@ -120,7 +169,7 @@ async function main(): Promise { console.log(`[migrate] Writing report to ${paths.rootDir}…`); await writeReport(paths, plan, generatedAt); - // ── Console summary ────────────────────────────────────────────────────── + // ── Plan summary ───────────────────────────────────────────────────────── const s = plan.stats; console.log(''); console.log('=== Migration Plan Summary ==='); @@ -135,6 +184,50 @@ async function main(): Promise { console.log(` Quality: ${s.flaggedRows} rows flagged (see report.csv)`); console.log(''); console.log(` Full report: ${paths.summaryPath}`); + + if (args.dryRun) { + console.log(''); + console.log('Dry-run complete. 
Re-run with --apply to write rows.'); + return; + } + + // ── Apply path ─────────────────────────────────────────────────────────── + + const port = await resolvePort(args.portSlug); + const applyId = randomUUID(); + + console.log(''); + console.log(`[migrate] Applying to port "${port.slug}" (id=${port.id})`); + console.log(`[migrate] Apply id: ${applyId}`); + console.log('[migrate] Inserting…'); + + const applyStart = Date.now(); + const result = await applyPlan(plan, { port, applyId }); + const applyElapsed = ((Date.now() - applyStart) / 1000).toFixed(1); + + console.log(''); + console.log('=== Apply Result ==='); + console.log(` Time: ${applyElapsed}s`); + console.log( + ` Clients: ${result.clientsInserted} inserted, ${result.clientsSkipped} already linked`, + ); + console.log(` Contacts: ${result.contactsInserted} inserted`); + console.log(` Addresses: ${result.addressesInserted} inserted`); + console.log(` Yachts: ${result.yachtsInserted} inserted`); + console.log( + ` Interests: ${result.interestsInserted} inserted, ${result.interestsSkipped} already linked`, + ); + + if (result.warnings.length > 0) { + console.log(''); + console.log('Warnings:'); + for (const w of result.warnings.slice(0, 20)) { + console.log(` - ${w}`); + } + if (result.warnings.length > 20) { + console.log(` … ${result.warnings.length - 20} more`); + } + } console.log(''); } diff --git a/src/lib/dedup/migration-apply.ts b/src/lib/dedup/migration-apply.ts new file mode 100644 index 0000000..a0c480e --- /dev/null +++ b/src/lib/dedup/migration-apply.ts @@ -0,0 +1,336 @@ +/** + * Apply phase for the legacy NocoDB → CRM migration. Walks a + * `MigrationPlan` produced by {@link transformSnapshot} and writes + * the new client / contact / address / yacht / interest rows into the + * target port. 
+ * + * Idempotent: every insert is guarded by a `migration_source_links` + * lookup keyed on `(source_system, source_id, target_entity_type)`, so + * a partial failure can be resumed by re-running the script. Re-runs + * against an already-applied plan are a near-no-op. + * + * Per-entity transactions (not one giant transaction) — the design + * favours visible partial progress on failure over all-or-nothing. + * + * @see src/lib/dedup/migration-transform.ts for the input shape. + * @see src/lib/db/schema/migration.ts for the idempotency ledger. + */ + +import { and, eq, inArray } from 'drizzle-orm'; + +import { db } from '@/lib/db'; +import { clients, clientContacts, clientAddresses } from '@/lib/db/schema/clients'; +import { interests } from '@/lib/db/schema/interests'; +import { yachts } from '@/lib/db/schema/yachts'; +import { berths } from '@/lib/db/schema/berths'; +import { migrationSourceLinks } from '@/lib/db/schema/migration'; +import type { MigrationPlan, PlannedClient, PlannedInterest } from './migration-transform'; + +const SOURCE_SYSTEM = 'nocodb_interests'; + +export interface ApplyResult { + applyId: string; + clientsInserted: number; + clientsSkipped: number; + contactsInserted: number; + addressesInserted: number; + yachtsInserted: number; + interestsInserted: number; + interestsSkipped: number; + warnings: string[]; +} + +export interface ApplyOptions { + port: { id: string; slug: string }; + applyId: string; + /** Set to true for the "preview the writes" mode — runs every read but + * rolls back inserts. Useful for verifying mappings before committing. */ + rehearsal?: boolean; + appliedBy?: string; +} + +/** + * Look up an existing migration link for a (sourceId, targetType) pair. + * Returns the existing target entity id if already linked. 
*/ +async function resolveExistingLink( + sourceId: number, + targetEntityType: 'client' | 'interest' | 'yacht' | 'address', +): Promise<string | null> { + const rows = await db + .select({ id: migrationSourceLinks.targetEntityId }) + .from(migrationSourceLinks) + .where( + and( + eq(migrationSourceLinks.sourceSystem, SOURCE_SYSTEM), + eq(migrationSourceLinks.sourceId, String(sourceId)), + eq(migrationSourceLinks.targetEntityType, targetEntityType), + ), + ) + .limit(1); + return rows[0]?.id ?? null; +} + +/** Find the first sourceId in a cluster that's already linked to a client, + * if any. The cluster might be larger than the previously-applied set if + * the dedup algorithm collapsed an extra duplicate this run. */ +async function resolveExistingClusterClient(sourceIds: number[]): Promise<string | null> { + if (sourceIds.length === 0) return null; + const rows = await db + .select({ id: migrationSourceLinks.targetEntityId }) + .from(migrationSourceLinks) + .where( + and( + eq(migrationSourceLinks.sourceSystem, SOURCE_SYSTEM), + inArray(migrationSourceLinks.sourceId, sourceIds.map(String)), + eq(migrationSourceLinks.targetEntityType, 'client'), + ), + ) + .limit(1); + return rows[0]?.id ?? null; +} + +/** Apply a single PlannedClient — returns `{clientId, inserted}` so the + * caller can wire interests against the (possibly pre-existing) record. */ +async function applyClient( + planned: PlannedClient, + opts: ApplyOptions, + result: ApplyResult, +): Promise<{ clientId: string; inserted: boolean }> { + // Idempotency: if any source row in the cluster already mapped to a client, + // reuse that record. + const existing = await resolveExistingClusterClient(planned.sourceIds); + if (existing) { + result.clientsSkipped += 1; + return { clientId: existing, inserted: false }; + } + + if (opts.rehearsal) { + // Simulate an insert without writing — used for the preview path.
+ return { clientId: `rehearsal-${planned.tempId}`, inserted: true }; + } + + // surnameToken is on the planned object (used by the dedup blocking + // index inside the transform) but not in the clients schema — runtime + // dedup re-derives it from fullName when needed. Drop it on insert. + const [inserted] = await db + .insert(clients) + .values({ + portId: opts.port.id, + fullName: planned.fullName, + nationalityIso: planned.countryIso ?? null, + preferredContactMethod: planned.preferredContactMethod ?? null, + source: planned.source ?? null, + }) + .returning({ id: clients.id }); + + if (!inserted) throw new Error('Client insert returned no row'); + const clientId = inserted.id; + + // Record idempotency links — one per source row in the cluster. + await db.insert(migrationSourceLinks).values( + planned.sourceIds.map((sid) => ({ + sourceSystem: SOURCE_SYSTEM, + sourceId: String(sid), + targetEntityType: 'client' as const, + targetEntityId: clientId, + appliedId: opts.applyId, + ...(opts.appliedBy ? { appliedBy: opts.appliedBy } : {}), + })), + ); + + // Contacts: bulk insert; mark first email + first phone as primary. + if (planned.contacts.length > 0) { + let primaryEmailSet = false; + let primaryPhoneSet = false; + const contactRows = planned.contacts.map((ct) => { + let isPrimary = false; + if (ct.isPrimary) { + if (ct.channel === 'email' && !primaryEmailSet) { + isPrimary = true; + primaryEmailSet = true; + } else if ((ct.channel === 'phone' || ct.channel === 'whatsapp') && !primaryPhoneSet) { + isPrimary = true; + primaryPhoneSet = true; + } + } + return { + clientId, + channel: ct.channel, + value: ct.value, + valueE164: ct.valueE164 ?? null, + valueCountry: ct.valueCountry ?? null, + isPrimary, + }; + }); + await db.insert(clientContacts).values(contactRows); + result.contactsInserted += contactRows.length; + } + + // Addresses: bulk insert; first is marked primary if multiple. 
Note the + // schema requires portId on every address row in addition to clientId. + if (planned.addresses.length > 0) { + const addressRows = planned.addresses.map((a, idx) => ({ + clientId, + portId: opts.port.id, + streetAddress: a.streetAddress ?? null, + city: a.city ?? null, + countryIso: a.countryIso ?? null, + isPrimary: idx === 0, + })); + await db.insert(clientAddresses).values(addressRows); + result.addressesInserted += addressRows.length; + } + + result.clientsInserted += 1; + return { clientId, inserted: true }; +} + +/** Apply a single PlannedInterest — looks up its client + berth + yacht and + * inserts the interest row, plus a yacht stub if a yacht name is present. */ +async function applyInterest( + planned: PlannedInterest, + tempIdToClientId: Map<string, string>, + mooringToBerthId: Map<string, string>, + opts: ApplyOptions, + result: ApplyResult, +): Promise<void> { + // Idempotency: skip if this source row already created an interest. + const existing = await resolveExistingLink(planned.sourceId, 'interest'); + if (existing) { + result.interestsSkipped += 1; + return; + } + + const clientId = tempIdToClientId.get(planned.clientTempId); + if (!clientId) { + result.warnings.push( + `Interest source=${planned.sourceId} references unknown client tempId=${planned.clientTempId} — skipped`, + ); + return; + } + + let berthId: string | null = null; + if (planned.berthMooringNumber) { + berthId = mooringToBerthId.get(planned.berthMooringNumber) ?? null; + if (!berthId) { + result.warnings.push( + `Interest source=${planned.sourceId} references unknown mooring="${planned.berthMooringNumber}" — interest created without berth link`, + ); + } + } + + // Optional yacht stub: if the legacy row had a yacht name, create a + // minimal yacht record owned by the client. The new schema requires + // currentOwnerType + currentOwnerId.
+ let yachtId: string | null = null; + if (planned.yachtName) { + const existingYacht = await resolveExistingLink(planned.sourceId, 'yacht'); + if (existingYacht) { + yachtId = existingYacht; + } else if (!opts.rehearsal) { + const [y] = await db + .insert(yachts) + .values({ + portId: opts.port.id, + name: planned.yachtName, + currentOwnerType: 'client', + currentOwnerId: clientId, + status: 'active', + }) + .returning({ id: yachts.id }); + if (y) { + yachtId = y.id; + await db.insert(migrationSourceLinks).values({ + sourceSystem: SOURCE_SYSTEM, + sourceId: String(planned.sourceId), + targetEntityType: 'yacht' as const, + targetEntityId: y.id, + appliedId: opts.applyId, + ...(opts.appliedBy ? { appliedBy: opts.appliedBy } : {}), + }); + result.yachtsInserted += 1; + } + } + } + + if (opts.rehearsal) { + result.interestsInserted += 1; + return; + } + + const [iRow] = await db + .insert(interests) + .values({ + portId: opts.port.id, + clientId, + berthId, + yachtId, + pipelineStage: planned.pipelineStage, + leadCategory: planned.leadCategory, + source: planned.source, + notes: planned.notes, + documensoId: planned.documensoId, + dateEoiSent: planned.dateEoiSent ? new Date(planned.dateEoiSent) : null, + dateEoiSigned: planned.dateEoiSigned ? new Date(planned.dateEoiSigned) : null, + dateContractSent: planned.dateContractSent ? new Date(planned.dateContractSent) : null, + dateContractSigned: planned.dateContractSigned ? new Date(planned.dateContractSigned) : null, + dateDepositReceived: planned.dateDepositReceived + ? new Date(planned.dateDepositReceived) + : null, + dateLastContact: planned.dateLastContact ? 
new Date(planned.dateLastContact) : null, + }) + .returning({ id: interests.id }); + + if (!iRow) throw new Error('Interest insert returned no row'); + + await db.insert(migrationSourceLinks).values({ + sourceSystem: SOURCE_SYSTEM, + sourceId: String(planned.sourceId), + targetEntityType: 'interest' as const, + targetEntityId: iRow.id, + appliedId: opts.applyId, + ...(opts.appliedBy ? { appliedBy: opts.appliedBy } : {}), + }); + + result.interestsInserted += 1; +} + +/** + * Top-level apply driver. Walks the plan once, building the + * tempId→clientId map as it goes, then walks interests with that map. + */ +export async function applyPlan(plan: MigrationPlan, opts: ApplyOptions): Promise<ApplyResult> { + const result: ApplyResult = { + applyId: opts.applyId, + clientsInserted: 0, + clientsSkipped: 0, + contactsInserted: 0, + addressesInserted: 0, + yachtsInserted: 0, + interestsInserted: 0, + interestsSkipped: 0, + warnings: [], + }; + + // 1. Clients (and their contacts/addresses) + const tempIdToClientId = new Map<string, string>(); + for (const planned of plan.clients) { + const { clientId } = await applyClient(planned, opts, result); + tempIdToClientId.set(planned.tempId, clientId); + } + + // 2. Build mooring→berthId lookup once, scoped to this port. + const berthRows = await db + .select({ id: berths.id, mooringNumber: berths.mooringNumber }) + .from(berths) + .where(eq(berths.portId, opts.port.id)); + const mooringToBerthId = new Map(berthRows.map((b) => [b.mooringNumber, b.id])); + + // 3. Interests (and yacht stubs) + for (const planned of plan.interests) { + await applyInterest(planned, tempIdToClientId, mooringToBerthId, opts, result); + } + + return result; +}