/**
 * Apply phase for the legacy NocoDB → CRM migration. Walks a
 * `MigrationPlan` produced by {@link transformSnapshot} and writes
 * the new client / contact / address / yacht / interest rows into the
 * target port.
 *
 * Idempotent: every insert is guarded by a `migration_source_links`
 * lookup keyed on `(source_system, source_id, target_entity_type)`, so
 * a partial failure can be resumed by re-running the script. Re-runs
 * against an already-applied plan are a near-no-op.
 *
 * Per-entity transactions (not one giant transaction) - the design
 * favours visible partial progress on failure over all-or-nothing.
 * Each entity's rows and its ledger entries are written in the same
 * transaction, so a failure never leaves a row without its ledger entry.
 *
 * @see src/lib/dedup/migration-transform.ts for the input shape.
 * @see src/lib/db/schema/migration.ts for the idempotency ledger.
 */
import { and, eq, inArray } from 'drizzle-orm';
import { db } from '@/lib/db';
import { clients, clientContacts, clientAddresses } from '@/lib/db/schema/clients';
import { interests, interestBerths } from '@/lib/db/schema/interests';
import { yachts } from '@/lib/db/schema/yachts';
import { berths } from '@/lib/db/schema/berths';
import { documents, documentSigners } from '@/lib/db/schema/documents';
import { residentialClients, residentialInterests } from '@/lib/db/schema/residential';
import { migrationSourceLinks } from '@/lib/db/schema/migration';
import type {
  MigrationPlan,
  PlannedClient,
  PlannedDocument,
  PlannedInterest,
  PlannedResidentialClient,
} from './migration-transform';

/** Ledger `source_system` used for client / interest / yacht / document rows. */
const SOURCE_SYSTEM = 'nocodb_interests';

/** Ledger `source_system` used for residential client / interest rows. */
const RESIDENTIAL_SOURCE_SYSTEM = 'nocodb_residential_interests';

/** Every `source_system` value this module reads from or writes to the ledger. */
type SourceSystem = typeof SOURCE_SYSTEM | typeof RESIDENTIAL_SOURCE_SYSTEM;

/** Target entity kinds that can be looked up in the idempotency ledger. */
type TargetEntityType =
  | 'client'
  | 'interest'
  | 'yacht'
  | 'address'
  | 'document'
  | 'residential_client'
  | 'residential_interest';

/**
 * Canonicalize a mooring string to the unified form ("A1", "D32", "E18")
 * used by the new berths schema after the Phase 0 mooring-number sweep.
 * Accepts inputs in any of the historical forms - bare ("D32"), dashed
 * ("D-32"), or dashed-padded ("D-032") - and returns the canonical bare
 * form. Inputs that don't match (multi-mooring strings like "A3, D30",
 * non-numeric suffixes like "B-LEG") are returned unchanged so a literal
 * lookup can still hit and the caller can flag the row for review.
 *
 * @param raw - Mooring string as recorded on the legacy row.
 * @returns The canonical bare form, or `raw` untouched when it does not match.
 */
function normalizeLegacyMooring(raw: string): string {
  const m = /^([A-Z]+)-?0*(\d+)$/i.exec(raw.trim());
  if (!m) return raw;
  // Both capture groups are mandatory in the pattern, so they are always
  // present on a successful match.
  return `${m[1]!.toUpperCase()}${parseInt(m[2]!, 10)}`;
}

/** Counters and warnings accumulated over one {@link applyPlan} run. */
export interface ApplyResult {
  applyId: string;
  clientsInserted: number;
  clientsSkipped: number;
  contactsInserted: number;
  addressesInserted: number;
  yachtsInserted: number;
  interestsInserted: number;
  interestsSkipped: number;
  documentsInserted: number;
  documentsSkipped: number;
  documentSignersInserted: number;
  residentialClientsInserted: number;
  residentialClientsSkipped: number;
  residentialInterestsInserted: number;
  warnings: string[];
}

/** Options controlling a single {@link applyPlan} run. */
export interface ApplyOptions {
  port: { id: string; slug: string };
  applyId: string;
  /** Set to true for the "preview the writes" mode - runs every read but
   *  skips every insert, while still counting what would have been
   *  written. Useful for verifying mappings before committing. */
  rehearsal?: boolean;
  appliedBy?: string;
}

/** Convert a legacy date value to a `Date`; any falsy input becomes `null`. */
function toDate(value: string | number | Date | null | undefined): Date | null {
  return value ? new Date(value) : null;
}

/**
 * Build one `migration_source_links` row. Generic over the literal
 * arguments so the row keeps the exact literal types of its
 * `sourceSystem` / `targetEntityType` values.
 */
function ledgerRow<S extends SourceSystem, T extends TargetEntityType>(
  sourceSystem: S,
  sourceId: number,
  targetEntityType: T,
  targetEntityId: string,
  opts: ApplyOptions,
) {
  return {
    sourceSystem,
    sourceId: String(sourceId),
    targetEntityType,
    targetEntityId,
    appliedId: opts.applyId,
    // Only set appliedBy when the caller supplied one.
    ...(opts.appliedBy ? { appliedBy: opts.appliedBy } : {}),
  };
}

/**
 * Look up an existing migration link for a (sourceId, targetType) pair.
 *
 * @param sourceId - Legacy row id.
 * @param targetEntityType - Kind of target entity the link points at.
 * @param sourceSystem - Ledger `source_system` to search. Defaults to
 *   {@link SOURCE_SYSTEM}; residential lookups must pass
 *   {@link RESIDENTIAL_SOURCE_SYSTEM} because that is what their ledger
 *   rows are written with.
 * @returns The existing target entity id if already linked, else `null`.
 */
async function resolveExistingLink(
  sourceId: number,
  targetEntityType: TargetEntityType,
  sourceSystem: SourceSystem = SOURCE_SYSTEM,
): Promise<string | null> {
  const rows = await db
    .select({ id: migrationSourceLinks.targetEntityId })
    .from(migrationSourceLinks)
    .where(
      and(
        eq(migrationSourceLinks.sourceSystem, sourceSystem),
        eq(migrationSourceLinks.sourceId, String(sourceId)),
        eq(migrationSourceLinks.targetEntityType, targetEntityType),
      ),
    )
    .limit(1);
  return rows[0]?.id ?? null;
}

/** Find the first sourceId in a cluster that's already linked to a client,
 *  if any. The cluster might be larger than the previously-applied set if
 *  the dedup algorithm collapsed an extra duplicate this run.
 *
 *  @returns The linked client id, or `null` when no member is linked (or
 *    the cluster is empty).
 */
async function resolveExistingClusterClient(sourceIds: number[]): Promise<string | null> {
  // inArray with an empty list is never a useful query; short-circuit.
  if (sourceIds.length === 0) return null;
  const rows = await db
    .select({ id: migrationSourceLinks.targetEntityId })
    .from(migrationSourceLinks)
    .where(
      and(
        eq(migrationSourceLinks.sourceSystem, SOURCE_SYSTEM),
        inArray(migrationSourceLinks.sourceId, sourceIds.map(String)),
        eq(migrationSourceLinks.targetEntityType, 'client'),
      ),
    )
    .limit(1);
  return rows[0]?.id ?? null;
}

/** Apply a single PlannedClient - returns `{clientId, inserted}` so the
 *  caller can wire interests against the (possibly pre-existing) record.
 *
 *  The client row, its ledger links, contacts and addresses are written in
 *  one transaction; counters on `result` are only bumped after it commits.
 *
 *  @throws Error if the client insert returns no row.
 */
async function applyClient(
  planned: PlannedClient,
  opts: ApplyOptions,
  result: ApplyResult,
): Promise<{ clientId: string; inserted: boolean }> {
  // Idempotency: if any source row in the cluster already mapped to a client,
  // reuse that record.
  const existing = await resolveExistingClusterClient(planned.sourceIds);
  if (existing) {
    result.clientsSkipped += 1;
    return { clientId: existing, inserted: false };
  }

  if (opts.rehearsal) {
    // Simulate an insert without writing - used for the preview path.
    // Count what would have been written, matching the other rehearsal paths.
    result.clientsInserted += 1;
    result.contactsInserted += planned.contacts.length;
    result.addressesInserted += planned.addresses.length;
    return { clientId: `rehearsal-${planned.tempId}`, inserted: true };
  }

  const clientId = await db.transaction(async (tx) => {
    // surnameToken is on the planned object (used by the dedup blocking
    // index inside the transform) but not in the clients schema - runtime
    // dedup re-derives it from fullName when needed. Drop it on insert.
    const [inserted] = await tx
      .insert(clients)
      .values({
        portId: opts.port.id,
        fullName: planned.fullName,
        nationalityIso: planned.countryIso ?? null,
        preferredContactMethod: planned.preferredContactMethod ?? null,
        source: planned.source ?? null,
      })
      .returning({ id: clients.id });
    if (!inserted) throw new Error('Client insert returned no row');
    const newClientId = inserted.id;

    // Record idempotency links - one per source row in the cluster.
    await tx
      .insert(migrationSourceLinks)
      .values(
        planned.sourceIds.map((sid) => ledgerRow(SOURCE_SYSTEM, sid, 'client', newClientId, opts)),
      );

    // Contacts: bulk insert; mark first email + first phone as primary.
    if (planned.contacts.length > 0) {
      let primaryEmailSet = false;
      let primaryPhoneSet = false;
      const contactRows = planned.contacts.map((ct) => {
        let isPrimary = false;
        if (ct.isPrimary) {
          if (ct.channel === 'email' && !primaryEmailSet) {
            isPrimary = true;
            primaryEmailSet = true;
          } else if ((ct.channel === 'phone' || ct.channel === 'whatsapp') && !primaryPhoneSet) {
            // phone and whatsapp share a single "primary phone" slot.
            isPrimary = true;
            primaryPhoneSet = true;
          }
        }
        return {
          clientId: newClientId,
          channel: ct.channel,
          value: ct.value,
          valueE164: ct.valueE164 ?? null,
          valueCountry: ct.valueCountry ?? null,
          isPrimary,
        };
      });
      await tx.insert(clientContacts).values(contactRows);
    }

    // Addresses: bulk insert; the first one is marked primary. Note the
    // schema requires portId on every address row in addition to clientId.
    if (planned.addresses.length > 0) {
      const addressRows = planned.addresses.map((a, idx) => ({
        clientId: newClientId,
        portId: opts.port.id,
        streetAddress: a.streetAddress ?? null,
        city: a.city ?? null,
        countryIso: a.countryIso ?? null,
        isPrimary: idx === 0,
      }));
      await tx.insert(clientAddresses).values(addressRows);
    }

    return newClientId;
  });

  // Counters reflect committed rows only.
  result.clientsInserted += 1;
  result.contactsInserted += planned.contacts.length;
  result.addressesInserted += planned.addresses.length;
  return { clientId, inserted: true };
}

/** Apply a single PlannedInterest - looks up its client + berth + yacht and
 *  inserts the interest row, plus a yacht stub if a yacht name is present.
 *
 *  All writes for one interest (yacht stub, interest, berth link, ledger
 *  rows) share one transaction.
 *
 *  @param rehearsedInterestSourceIds - In rehearsal mode, receives the
 *    source id of every interest that would have been inserted, so the
 *    document phase can resolve its parent without a ledger row.
 *  @throws Error if the yacht or interest insert returns no row.
 */
async function applyInterest(
  planned: PlannedInterest,
  tempIdToClientId: Map<string, string>,
  mooringToBerthId: Map<string, string>,
  opts: ApplyOptions,
  result: ApplyResult,
  rehearsedInterestSourceIds: Set<number>,
): Promise<void> {
  // Idempotency: skip if this source row already created an interest.
  const existing = await resolveExistingLink(planned.sourceId, 'interest');
  if (existing) {
    result.interestsSkipped += 1;
    return;
  }

  const clientId = tempIdToClientId.get(planned.clientTempId);
  if (!clientId) {
    result.warnings.push(
      `Interest source=${planned.sourceId} references unknown client tempId=${planned.clientTempId} - skipped`,
    );
    return;
  }

  let berthId: string | null = null;
  if (planned.berthMooringNumber) {
    berthId =
      mooringToBerthId.get(planned.berthMooringNumber) ??
      // Legacy NocoDB Interests rows recorded mooring strings inconsistently
      // ("D32", "D-32", "D-032"). The new berths schema stores the canonical
      // bare form ("D32") - canonicalize the lookup key as a fallback.
      mooringToBerthId.get(normalizeLegacyMooring(planned.berthMooringNumber)) ??
      null;
    if (!berthId) {
      result.warnings.push(
        `Interest source=${planned.sourceId} references unknown mooring="${planned.berthMooringNumber}" - interest created without berth link`,
      );
    }
  }

  // Optional yacht stub: if the legacy row had a yacht name, create a
  // minimal yacht record owned by the client - unless a previous run
  // already created one for this source row.
  const existingYachtId = planned.yachtName
    ? await resolveExistingLink(planned.sourceId, 'yacht')
    : null;
  const yachtStubName = planned.yachtName && !existingYachtId ? planned.yachtName : null;

  if (opts.rehearsal) {
    if (yachtStubName) result.yachtsInserted += 1;
    result.interestsInserted += 1;
    rehearsedInterestSourceIds.add(planned.sourceId);
    return;
  }

  // Parsed once so the date column and the EOI-bundle flag always agree.
  const dateEoiSigned = toDate(planned.dateEoiSigned);

  await db.transaction(async (tx) => {
    let yachtId: string | null = existingYachtId;
    if (yachtStubName) {
      // The new schema requires currentOwnerType + currentOwnerId.
      const [yachtRow] = await tx
        .insert(yachts)
        .values({
          portId: opts.port.id,
          name: yachtStubName,
          currentOwnerType: 'client',
          currentOwnerId: clientId,
          status: 'active',
        })
        .returning({ id: yachts.id });
      if (!yachtRow) throw new Error('Yacht insert returned no row');
      yachtId = yachtRow.id;
      await tx
        .insert(migrationSourceLinks)
        .values(ledgerRow(SOURCE_SYSTEM, planned.sourceId, 'yacht', yachtRow.id, opts));
    }

    const [iRow] = await tx
      .insert(interests)
      .values({
        portId: opts.port.id,
        clientId,
        yachtId,
        pipelineStage: planned.pipelineStage,
        leadCategory: planned.leadCategory,
        source: planned.source,
        documensoId: planned.documensoId,
        dateEoiSent: toDate(planned.dateEoiSent),
        dateEoiSigned,
        dateContractSent: toDate(planned.dateContractSent),
        dateContractSigned: toDate(planned.dateContractSigned),
        dateDepositReceived: toDate(planned.dateDepositReceived),
        dateLastContact: toDate(planned.dateLastContact),
      })
      .returning({ id: interests.id });
    if (!iRow) throw new Error('Interest insert returned no row');

    // Plan §3.4: the legacy interests.berth_id column has been replaced by
    // the interest_berths junction. Materialise the legacy single-berth link
    // as a primary/specific row. is_in_eoi_bundle = true only when the
    // legacy row already had a signed EOI.
    if (berthId) {
      await tx
        .insert(interestBerths)
        .values({
          interestId: iRow.id,
          berthId,
          isPrimary: true,
          isSpecificInterest: true,
          isInEoiBundle: dateEoiSigned !== null,
          addedBy: opts.appliedBy ?? null,
        })
        .onConflictDoNothing({ target: [interestBerths.interestId, interestBerths.berthId] });
    }

    await tx
      .insert(migrationSourceLinks)
      .values(ledgerRow(SOURCE_SYSTEM, planned.sourceId, 'interest', iRow.id, opts));
  });

  // Counters reflect committed rows only.
  if (yachtStubName) result.yachtsInserted += 1;
  result.interestsInserted += 1;
}

/**
 * Apply a single PlannedDocument - looks up the parent interest's id from
 * the migration ledger, materializes a documents row, and inserts the
 * signer rows. Idempotent via target_entity_type='document'.
 *
 * The document, its ledger row and its signers share one transaction, so
 * a failed signer insert cannot leave a ledgered document without signers.
 *
 * @param rehearsedInterestSourceIds - Source ids of interests that the
 *   rehearsal pass would have inserted; consulted only in rehearsal mode,
 *   where no interest ledger rows are written.
 * @throws Error if the document insert returns no row.
 */
async function applyDocument(
  planned: PlannedDocument,
  tempIdToClientId: Map<string, string>,
  opts: ApplyOptions,
  result: ApplyResult,
  rehearsedInterestSourceIds: Set<number>,
): Promise<void> {
  const existing = await resolveExistingLink(planned.sourceId, 'document');
  if (existing) {
    result.documentsSkipped += 1;
    return;
  }

  const interestId = await resolveExistingLink(planned.sourceId, 'interest');
  if (!interestId) {
    const previewedInRehearsal =
      opts.rehearsal === true && rehearsedInterestSourceIds.has(planned.sourceId);
    if (!previewedInRehearsal) {
      result.warnings.push(
        `Document source=${planned.sourceId} cannot resolve parent interest - skipped (interest must apply first)`,
      );
      return;
    }
  }

  const clientId = tempIdToClientId.get(planned.clientTempId);
  if (!clientId) {
    result.warnings.push(
      `Document source=${planned.sourceId} references unknown client tempId=${planned.clientTempId} - skipped`,
    );
    return;
  }

  // A null interestId can only get here in rehearsal mode (see above); the
  // second operand exists so the compiler narrows interestId below.
  if (opts.rehearsal || interestId === null) {
    result.documentsInserted += 1;
    result.documentSignersInserted += planned.signers.length;
    return;
  }

  await db.transaction(async (tx) => {
    const [docRow] = await tx
      .insert(documents)
      .values({
        portId: opts.port.id,
        interestId,
        clientId,
        documentType: planned.documentType,
        title: planned.title,
        status: planned.status,
        documensoId: planned.documensoId,
        isManualUpload: false,
        notes: planned.notes,
        createdBy: opts.appliedBy ?? 'migration',
      })
      .returning({ id: documents.id });
    if (!docRow) throw new Error('Document insert returned no row');

    await tx
      .insert(migrationSourceLinks)
      .values(ledgerRow(SOURCE_SYSTEM, planned.sourceId, 'document', docRow.id, opts));

    if (planned.signers.length > 0) {
      await tx.insert(documentSigners).values(
        planned.signers.map((s) => ({
          documentId: docRow.id,
          signerName: s.signerName,
          signerEmail: s.signerEmail,
          signerRole: s.signerRole,
          signingOrder: s.signingOrder,
          status: s.status,
          signedAt: toDate(s.signedAt),
          signingUrl: s.signingUrl,
          embeddedUrl: s.embeddedUrl,
        })),
      );
    }
  });

  // Counters reflect committed rows only.
  result.documentSignersInserted += planned.signers.length;
  result.documentsInserted += 1;
}

/**
 * Apply a single PlannedResidentialClient - creates a residential_clients
 * row plus a default residential_interests row at pipeline_stage='new'
 * so the lead surfaces in the residential funnel. Two ledger entries
 * record both targets.
 *
 * @throws Error if either insert returns no row.
 */
async function applyResidentialClient(
  planned: PlannedResidentialClient,
  opts: ApplyOptions,
  result: ApplyResult,
): Promise<void> {
  // The lookup must use the same source_system the ledger rows below are
  // written with; the default one would never match and every re-run
  // would insert a duplicate.
  const existingClient = await resolveExistingLink(
    planned.sourceId,
    'residential_client',
    RESIDENTIAL_SOURCE_SYSTEM,
  );
  if (existingClient) {
    result.residentialClientsSkipped += 1;
    return;
  }

  if (opts.rehearsal) {
    result.residentialClientsInserted += 1;
    result.residentialInterestsInserted += 1;
    return;
  }

  // Wrap the three writes in a transaction so a partial failure (e.g. the
  // residential_interests insert throws) does NOT leave an orphan
  // residential_clients row. Without the wrap, a later --apply re-run
  // would not see a ledger entry for the orphan and would happily insert
  // a duplicate residential_clients row.
  await db.transaction(async (tx) => {
    const [resClient] = await tx
      .insert(residentialClients)
      .values({
        portId: opts.port.id,
        fullName: planned.fullName,
        email: planned.email,
        phone: planned.phoneE164,
        phoneE164: planned.phoneE164,
        phoneCountry: planned.phoneCountry,
        placeOfResidence: planned.placeOfResidence,
        placeOfResidenceCountryIso: planned.placeOfResidenceCountryIso,
        source: planned.source,
        notes: planned.notes,
        status: 'prospect',
      })
      .returning({ id: residentialClients.id });
    if (!resClient) throw new Error('Residential client insert returned no row');

    // NOTE(review): dateLastContact is seeded from dateFirstContact -
    // presumably because the plan carries no separate last-contact value;
    // confirm this is intended.
    const firstContact = toDate(planned.dateFirstContact);
    const [resInterest] = await tx
      .insert(residentialInterests)
      .values({
        portId: opts.port.id,
        residentialClientId: resClient.id,
        pipelineStage: 'new',
        source: planned.source,
        notes: planned.notes,
        dateFirstContact: firstContact,
        dateLastContact: firstContact,
      })
      .returning({ id: residentialInterests.id });
    if (!resInterest) throw new Error('Residential interest insert returned no row');

    // Two ledger entries - one per target - both keyed on the same legacy
    // sourceId. Keeps re-runs idempotent on either target type.
    await tx
      .insert(migrationSourceLinks)
      .values([
        ledgerRow(
          RESIDENTIAL_SOURCE_SYSTEM,
          planned.sourceId,
          'residential_client',
          resClient.id,
          opts,
        ),
        ledgerRow(
          RESIDENTIAL_SOURCE_SYSTEM,
          planned.sourceId,
          'residential_interest',
          resInterest.id,
          opts,
        ),
      ]);
  });

  result.residentialClientsInserted += 1;
  result.residentialInterestsInserted += 1;
}

/**
 * Top-level apply driver. Walks the plan once, building the
 * tempId→clientId map as it goes, then walks interests with that map,
 * then documents, then residential leads.
 *
 * @param plan - Plan to apply.
 * @param opts - Target port, apply id, and rehearsal / attribution flags.
 * @returns Counters for inserted / skipped rows plus any warnings.
 */
export async function applyPlan(plan: MigrationPlan, opts: ApplyOptions): Promise<ApplyResult> {
  const result: ApplyResult = {
    applyId: opts.applyId,
    clientsInserted: 0,
    clientsSkipped: 0,
    contactsInserted: 0,
    addressesInserted: 0,
    yachtsInserted: 0,
    interestsInserted: 0,
    interestsSkipped: 0,
    documentsInserted: 0,
    documentsSkipped: 0,
    documentSignersInserted: 0,
    residentialClientsInserted: 0,
    residentialClientsSkipped: 0,
    residentialInterestsInserted: 0,
    warnings: [],
  };

  // 1. Clients (and their contacts/addresses)
  const tempIdToClientId = new Map<string, string>();
  for (const planned of plan.clients) {
    const { clientId } = await applyClient(planned, opts, result);
    tempIdToClientId.set(planned.tempId, clientId);
  }

  // 2. Build mooring→berthId lookup once, scoped to this port.
  const berthRows = await db
    .select({ id: berths.id, mooringNumber: berths.mooringNumber })
    .from(berths)
    .where(eq(berths.portId, opts.port.id));
  const mooringToBerthId = new Map(berthRows.map((b) => [b.mooringNumber, b.id]));

  // 3. Interests (and yacht stubs). In rehearsal mode nothing reaches the
  //    ledger, so remember which interests would have been created for the
  //    document phase below.
  const rehearsedInterestSourceIds = new Set<number>();
  for (const planned of plan.interests) {
    await applyInterest(
      planned,
      tempIdToClientId,
      mooringToBerthId,
      opts,
      result,
      rehearsedInterestSourceIds,
    );
  }

  // 4. Documents (depend on interests being applied first - applyDocument
  //    looks up the new interest_id via the migration ledger).
  for (const planned of plan.documents) {
    await applyDocument(planned, tempIdToClientId, opts, result, rehearsedInterestSourceIds);
  }

  // 5. Residential leads - independent domain, no dependency on the marina
  //    apply phase. Each lead gets a residential_clients row + a default
  //    residential_interests row.
  for (const planned of plan.residentialClients) {
    await applyResidentialClient(planned, opts, result);
  }

  return result;
}