diff --git a/scripts/migration/connect-berth-links.ts b/scripts/migration/connect-berth-links.ts new file mode 100644 index 00000000..e07bbc49 --- /dev/null +++ b/scripts/migration/connect-berth-links.ts @@ -0,0 +1,143 @@ +/** + * Fix-up: connect the multi-berth links the main pipeline missed. + * + * The dedup pipeline migrated only each interest's single `Berth Number` text + * field; the legacy `_nc_m2m_Berths_Interests` junction (multi-berth deals) was + * not carried over. This reads that junction from the `nocodb_legacy` snapshot, + * resolves each legacy interest → its migrated interest (via the ledger) and + * each mooring → the migrated berth, and inserts the missing `interest_berths` + * rows. + * + * Idempotent: `ON CONFLICT (interest_id, berth_id) DO NOTHING`. Primary safety: + * only makes a berth primary when the interest has no primary yet (≤1 primary + * per interest is a partial unique index). + * + * pnpm tsx scripts/migration/connect-berth-links.ts [--port-slug port-nimara] + */ +import 'dotenv/config'; +import { randomUUID } from 'node:crypto'; +import postgres from 'postgres'; + +const slugArg = (() => { + const i = process.argv.indexOf('--port-slug'); + return i >= 0 ? (process.argv[i + 1] ?? 'port-nimara') : 'port-nimara'; +})(); +const DRY = process.argv.includes('--dry-run'); + +const CRM_URL = process.env.DATABASE_URL!; +const LEGACY_URL = process.env.LEGACY_DB_URL ?? CRM_URL.replace(/\/[^/]+$/, '/nocodb_legacy'); +const crm = postgres(CRM_URL, { max: 4 }); +const legacy = postgres(LEGACY_URL, { max: 4 }); + +const canonMoo = (raw: string): string => { + const m = /^([A-Za-z]+)-?0*(\d+)$/.exec((raw ?? '').trim()); + return m ? `${m[1]!.toUpperCase()}${parseInt(m[2]!, 10)}` : (raw ?? '').trim(); +}; + +async function main() { + const [port] = await crm`select id from ports where slug=${slugArg} limit 1`; + if (!port) throw new Error(`no port ${slugArg}`); + const portId = port.id as string; + + // legacy junction: interestId → set(moorings) (+ the Berth_Number text field) + const mooById = new Map(); + for (const b of await legacy`select id, "Mooring_Number" m from plplouets5zw1um."Berths"`) + mooById.set(b.id as number, canonMoo(b.m as string)); + const legacyMoo = new Map>(); + for (const j of await legacy`select "Interests_id" i, "Berths_id" b from plplouets5zw1um."_nc_m2m_Berths_Interests"`) { + const set = legacyMoo.get(j.i as number) ?? new Set(); + const m = mooById.get(j.b as number); + if (m) set.add(m); + legacyMoo.set(j.i as number, set); + } + // EOI-signed flag per legacy interest (for is_in_eoi_bundle) + const signed = new Set(); + for (const r of await legacy`select id, "EOI_Status" e, "LOI_NDA_Document" l from plplouets5zw1um."Interests"`) { + const e = ((r.e as string) ?? '').trim(); + const l = ((r.l as string) ?? '').trim(); + if ( + e === 'Signed' || + ['Signing Complete', 'Signed by Client', 'Signed by Developer'].includes(l) + ) + signed.add(r.id as number); + } + + // ledger: legacy interest id → new interest id + const links = + await crm`select source_id, target_entity_id from migration_source_links where source_system='nocodb_interests' and target_entity_type='interest'`; + const newInterestBySrc = new Map( + links.map((l) => [Number(l.source_id), l.target_entity_id as string]), + ); + + // CRM berth id by mooring (this port) + const berthByMoo = new Map( + (await crm`select id, mooring_number m from berths where port_id=${portId}`).map((b) => [ + b.m as string, + b.id as string, + ]), + ); + + let inserted = 0; + let madePrimary = 0; + let skipped = 0; + const unresolved: string[] = []; + + for (const [legacyId, moorings] of legacyMoo) { + const interestId = newInterestBySrc.get(legacyId); + if (!interestId) continue; // not a migrated interest (backup/copy tables) + // does this interest already have a primary? + const primaryCheck = + await crm`select exists(select 1 from interest_berths where interest_id=${interestId} and is_primary) as has`; + let hasPrimary = (primaryCheck[0]?.has as boolean | undefined) ?? false; + + for (const moo of moorings) { + const berthId = berthByMoo.get(moo); + if (!berthId) { + unresolved.push(`${legacyId}:${moo}`); + continue; + } + const makePrimary = !hasPrimary; + if (DRY) { + inserted++; + if (makePrimary) { + madePrimary++; + hasPrimary = true; + } + continue; + } + const res = await crm` + insert into interest_berths (id, interest_id, berth_id, is_primary, is_specific_interest, is_in_eoi_bundle) + values (${randomUUID()}, ${interestId}, ${berthId}, ${makePrimary}, true, ${signed.has(legacyId)}) + on conflict (interest_id, berth_id) do nothing + returning id`; + if (res.length > 0) { + inserted++; + if (makePrimary) { + madePrimary++; + hasPrimary = true; + } + } else { + skipped++; + } + } + } + + console.log( + `connect-berth-links ${DRY ? '(DRY)' : ''}: inserted ${inserted} links (${madePrimary} new primary), ${skipped} already linked`, + ); + if (unresolved.length) + console.log( + ` ⚠ ${unresolved.length} moorings with no CRM berth: ${unresolved.slice(0, 20).join(', ')}`, + ); + + await crm.end(); + await legacy.end(); + process.exit(0); +} + +main().catch(async (e) => { + console.error('connect-berth-links failed:', e); + await crm.end().catch(() => {}); + await legacy.end().catch(() => {}); + process.exit(1); +}); diff --git a/scripts/migration/reconcile-migration.ts b/scripts/migration/reconcile-migration.ts new file mode 100644 index 00000000..0c5beda6 --- /dev/null +++ b/scripts/migration/reconcile-migration.ts @@ -0,0 +1,277 @@ +/** + * Exhaustive migration reconciliation (read-only): cross-checks EVERY migrated + * record against its legacy NocoDB source row (via the migration ledger) and + * verifies every relationship is connected. Independently re-derives the + * expected mapped values (stage, eoiStatus, berth, …) so it validates the + * migration logic, not just echoes it. + * + * Connects to BOTH local DBs: + * - CRM : DATABASE_URL (the migrated data) + * - legacy : LEGACY_DB_URL (the nocodb_legacy snapshot); defaults to the + * CRM url with the db name swapped to `nocodb_legacy`. + * + * pnpm tsx scripts/migration/reconcile-migration.ts [--port-slug port-nimara] + */ +import 'dotenv/config'; +import postgres from 'postgres'; + +const slugArg = (() => { + const i = process.argv.indexOf('--port-slug'); + return i >= 0 ? (process.argv[i + 1] ?? 'port-nimara') : 'port-nimara'; +})(); + +const CRM_URL = process.env.DATABASE_URL!; +const LEGACY_URL = process.env.LEGACY_DB_URL ?? CRM_URL.replace(/\/[^/]+$/, '/nocodb_legacy'); +const crm = postgres(CRM_URL, { max: 4 }); +const legacy = postgres(LEGACY_URL, { max: 4 }); + +// ── transforms, re-implemented independently (cross-validation) ────────────── +const STAGE_MAP: Record = { + 'General Qualified Interest': 'qualified', + 'Specific Qualified Interest': 'nurturing', + 'EOI and NDA Sent': 'eoi', + 'Signed EOI and NDA': 'eoi', + 'Made Reservation': 'reservation', + 'Contract Negotiation': 'contract', + 'Contract Negotiations Finalized': 'contract', + 'Contract Signed': 'contract', +}; +const expectStage = (level: string | undefined, deposit: string | undefined): string => { + let s = STAGE_MAP[(level ?? '').trim()] ?? 'enquiry'; + if ((deposit ?? '').trim() === 'Received' && s !== 'contract') s = 'deposit_paid'; + return s; +}; +const expectEoi = ( + eoiStatus: string | undefined, + loi: string | undefined, + documensoId: string | undefined, +): string | null => { + const e = (eoiStatus ?? '').trim(); + const l = (loi ?? '').trim(); + if (e === 'Signed' || ['Signing Complete', 'Signed by Client', 'Signed by Developer'].includes(l)) + return 'signed'; + if (e === 'Waiting for Signatures' || (documensoId ?? '').trim()) return 'waiting_for_signatures'; + return null; +}; +const canonMoo = (raw: string): string => { + const m = /^([A-Za-z]+)-?0*(\d+)$/.exec((raw ?? '').trim()); + return m ? `${m[1]!.toUpperCase()}${parseInt(m[2]!, 10)}` : (raw ?? '').trim(); +}; +const normEmail = (e: string) => (e ?? '').trim().toLowerCase(); + +const issues: string[] = []; +const add = (cat: string, msg: string) => issues.push(`[${cat}] ${msg}`); + +async function main() { + const [port] = await crm`select id, slug from ports where slug=${slugArg} limit 1`; + if (!port) throw new Error(`no port ${slugArg}`); + const portId = port.id as string; + + // ── load legacy source (by id) ─────────────────────────────────────────── + const legacyInterests = new Map>(); + for (const r of await legacy`select * from plplouets5zw1um."Interests"`) + legacyInterests.set(r.id as number, r); + const legacyExpenses = new Map>(); + for (const r of await legacy`select * from p3hq2fxdevqcaq8."Expenses"`) + legacyExpenses.set(r.id as number, r); + const legacyRes = new Map>(); + for (const r of await legacy`select * from plplouets5zw1um."Interests (Residences)"`) + legacyRes.set(r.id as number, r); + // legacy berth links per interest (Interests_id -> [mooring]) + const berthMooById = new Map(); + for (const b of await legacy`select id, "Mooring_Number" m from plplouets5zw1um."Berths"`) + berthMooById.set(b.id as number, b.m as string); + const legacyBerthsByInterest = new Map(); + for (const j of await legacy`select "Interests_id" i, "Berths_id" b from plplouets5zw1um."_nc_m2m_Berths_Interests"`) { + const arr = legacyBerthsByInterest.get(j.i as number) ?? []; + const moo = berthMooById.get(j.b as number); + if (moo) arr.push(canonMoo(moo)); + legacyBerthsByInterest.set(j.i as number, arr); + } + + // ── ledger ──────────────────────────────────────────────────────────────── + const ledger = + await crm`select source_system, source_id, target_entity_type, target_entity_id from migration_source_links`; + const interestLinks = ledger.filter((l) => l.target_entity_type === 'interest'); // sourceId(legacy interest) -> new interest + const expenseLinks = ledger.filter((l) => l.target_entity_type === 'expense'); + const resLinks = ledger.filter((l) => l.target_entity_type === 'residential_client'); + const clientLinks = ledger.filter((l) => l.target_entity_type === 'client'); + + // ── 1. COVERAGE — every legacy row migrated; nothing dropped ────────────── + const migratedInterestSrc = new Set(interestLinks.map((l) => Number(l.source_id))); + const droppedInterests = [...legacyInterests.keys()].filter((id) => !migratedInterestSrc.has(id)); + const migratedExpSrc = new Set(expenseLinks.map((l) => Number(l.source_id))); + const droppedExp = [...legacyExpenses.keys()].filter((id) => !migratedExpSrc.has(id)); + const migratedResSrc = new Set(resLinks.map((l) => Number(l.source_id))); + const droppedRes = [...legacyRes.keys()].filter((id) => !migratedResSrc.has(id)); + for (const id of droppedInterests) + add( + 'COVERAGE', + `legacy interest #${id} NOT migrated (${(legacyInterests.get(id) as { Full_Name?: string }).Full_Name ?? '?'})`, + ); + for (const id of droppedExp) add('COVERAGE', `legacy expense #${id} NOT migrated`); + for (const id of droppedRes) add('COVERAGE', `legacy residential #${id} NOT migrated`); + + // ── 2. INTEREST field fidelity (every migrated deal vs legacy) ──────────── + const newInterests = await crm` + select i.id, i.pipeline_stage, i.lead_category, i.source, i.eoi_status, i.documenso_id, i.client_id, i.yacht_id + from interests i where i.port_id=${portId}`; + const newInterestById = new Map(newInterests.map((i) => [i.id as string, i])); + // berths per new interest + const ibRows = await crm` + select ib.interest_id, b.mooring_number from interest_berths ib join berths b on b.id=ib.berth_id where b.port_id=${portId}`; + const newBerthsByInterest = new Map(); + for (const r of ibRows) { + const a = newBerthsByInterest.get(r.interest_id as string) ?? []; + a.push(r.mooring_number as string); + newBerthsByInterest.set(r.interest_id as string, a); + } + let stageMiss = 0, + eoiMiss = 0, + docMiss = 0, + berthMiss = 0; + for (const l of interestLinks) { + const legacyRow = legacyInterests.get(Number(l.source_id)); + const ni = newInterestById.get(l.target_entity_id as string); + if (!legacyRow || !ni) { + add( + 'INTEGRITY', + `interest link sourceId=${l.source_id} → ${l.target_entity_id}: ${!legacyRow ? 'legacy row missing' : 'new interest missing'}`, + ); + continue; + } + const lr = legacyRow as Record; + const exp = expectStage(lr.Sales_Process_Level, lr.Deposit_10__Status); + if (ni.pipeline_stage !== exp) { + stageMiss++; + add( + 'STAGE', + `interest src#${l.source_id} (${lr.Full_Name}): legacy "${lr.Sales_Process_Level}" → expected ${exp}, got ${ni.pipeline_stage}`, + ); + } + const expEoi = expectEoi(lr.EOI_Status, lr.LOI_NDA_Document, lr.documensoID); + if ((ni.eoi_status ?? null) !== expEoi) { + eoiMiss++; + add( + 'EOI', + `interest src#${l.source_id} (${lr.Full_Name}): expected eoiStatus ${expEoi}, got ${ni.eoi_status}`, + ); + } + if ((ni.documenso_id ?? null) !== ((lr.documensoID ?? '').trim() || null)) { + docMiss++; + add( + 'DOCID', + `interest src#${l.source_id} (${lr.Full_Name}): documensoId legacy="${lr.documensoID}" vs new="${ni.documenso_id}"`, + ); + } + // berth: every legacy-linked mooring should be present on the new interest + const legacyMoo = new Set([...(legacyBerthsByInterest.get(Number(l.source_id)) ?? [])]); + if (lr.Berth_Number && /^[A-Za-z]+-?0*\d+$/.test(lr.Berth_Number.trim())) + legacyMoo.add(canonMoo(lr.Berth_Number)); + const newMoo = new Set(newBerthsByInterest.get(ni.id as string) ?? []); + const missingBerths = [...legacyMoo].filter((m) => !newMoo.has(m)); + if (missingBerths.length > 0) { + berthMiss++; + add( + 'BERTH', + `interest src#${l.source_id} (${lr.Full_Name}): legacy berths [${[...legacyMoo].join(',')}] but new has [${[...newMoo].join(',') || '-'}] (missing ${missingBerths.join(',')})`, + ); + } + } + + // ── 3. CLIENT contact fidelity (migrated email is from a legacy source row) + const clientContacts = await crm` + select c.id, c.full_name, string_agg(cc.value, '|') filter (where cc.channel='email') emails + from clients c left join client_contacts cc on cc.client_id=c.id + where c.port_id=${portId} group by c.id, c.full_name`; + const emailsByClient = new Map( + clientContacts.map((c) => [ + c.id as string, + (c.emails as string | null)?.split('|').map(normEmail) ?? [], + ]), + ); + // group ledger client links: client -> its legacy source emails + const legacyEmailsByClient = new Map>(); + for (const l of clientLinks) { + const lr = legacyInterests.get(Number(l.source_id)) as Record | undefined; + const e = normEmail(lr?.Email_Address ?? ''); + if (!e) continue; + const set = legacyEmailsByClient.get(l.target_entity_id as string) ?? new Set(); + set.add(e); + legacyEmailsByClient.set(l.target_entity_id as string, set); + } + let emailMiss = 0; + for (const [cid, legacyEmails] of legacyEmailsByClient) { + const newEmails = new Set(emailsByClient.get(cid) ?? []); + const missing = [...legacyEmails].filter((e) => !newEmails.has(e)); + if (missing.length > 0) { + emailMiss++; + const nm = clientContacts.find((c) => c.id === cid)?.full_name; + add( + 'EMAIL', + `client ${nm}: legacy email(s) [${[...legacyEmails].join(',')}] not all on client (have [${[...newEmails].join(',') || '-'}])`, + ); + } + } + + // ── 4. RELATIONSHIP integrity (orphans / dangling FKs) ──────────────────── + const orphanInterests = + await crm`select count(*) n from interests i where i.port_id=${portId} and not exists (select 1 from clients c where c.id=i.client_id)`; + const orphanIB = + await crm`select count(*) n from interest_berths ib where not exists (select 1 from interests i where i.id=ib.interest_id) or not exists (select 1 from berths b where b.id=ib.berth_id)`; + const orphanDocs = + await crm`select count(*) n from documents d where d.port_id=${portId} and d.interest_id is not null and not exists (select 1 from interests i where i.id=d.interest_id)`; + const orphanYachts = + await crm`select count(*) n from yachts y where y.port_id=${portId} and y.current_owner_type='client' and not exists (select 1 from clients c where c.id=y.current_owner_id)`; + const danglingSignedFile = + await crm`select count(*) n from documents d where d.signed_file_id is not null and not exists (select 1 from files f where f.id=d.signed_file_id)`; + if (Number(orphanInterests[0]!.n) > 0) + add('INTEGRITY', `${orphanInterests[0]!.n} interests with no client`); + if (Number(orphanIB[0]!.n) > 0) + add('INTEGRITY', `${orphanIB[0]!.n} interest_berths with dangling FK`); + if (Number(orphanDocs[0]!.n) > 0) + add('INTEGRITY', `${orphanDocs[0]!.n} documents with dangling interest`); + if (Number(orphanYachts[0]!.n) > 0) + add('INTEGRITY', `${orphanYachts[0]!.n} yachts with missing owner`); + if (Number(danglingSignedFile[0]!.n) > 0) + add('INTEGRITY', `${danglingSignedFile[0]!.n} documents with dangling signed_file_id`); + + // ── report ──────────────────────────────────────────────────────────────── + console.log('═══════════ MIGRATION RECONCILIATION ═══════════\n'); + console.log( + `Coverage: legacy interests ${legacyInterests.size} → migrated ${migratedInterestSrc.size} (dropped ${droppedInterests.length})`, + ); + console.log( + ` legacy expenses ${legacyExpenses.size} → migrated ${migratedExpSrc.size} (dropped ${droppedExp.length})`, + ); + console.log( + ` legacy residential ${legacyRes.size} → migrated ${migratedResSrc.size} (dropped ${droppedRes.length})`, + ); + console.log( + `Fidelity: stage mismatches ${stageMiss} · eoiStatus ${eoiMiss} · documensoId ${docMiss} · berth-link ${berthMiss} · client-email ${emailMiss}`, + ); + console.log( + `Integrity: orphan interests ${orphanInterests[0]!.n} · interest_berths ${orphanIB[0]!.n} · docs ${orphanDocs[0]!.n} · yachts ${orphanYachts[0]!.n} · signed-file ${danglingSignedFile[0]!.n}`, + ); + console.log(`\nTotal discrepancies: ${issues.length}`); + const byCat = issues.reduce>((a, s) => { + const c = s.slice(1, s.indexOf(']')); + a[c] = (a[c] || 0) + 1; + return a; + }, {}); + console.log('By category:', JSON.stringify(byCat)); + console.log('\n── discrepancy detail (first 60) ──'); + for (const i of issues.slice(0, 60)) console.log(' ' + i); + if (issues.length > 60) console.log(` … +${issues.length - 60} more`); + + await crm.end(); + await legacy.end(); + process.exit(0); +} + +main().catch(async (e) => { + console.error('reconcile failed:', e); + await crm.end().catch(() => {}); + await legacy.end().catch(() => {}); + process.exit(1); +});