Files
pn-new-crm/src/lib/dedup/migration-apply.ts
Matt Ciaccio 05be89ec6f feat(berths): normalize mooring numbers to canonical form
Sweep CRM mooring numbers from the legacy hyphen+padded form ("A-01")
to the canonical bare form ("A1") used by NocoDB, the public website,
the per-berth PDFs, and the Documenso EOI templates. Drift was
introduced by the original load-berths-to-port-nimara.ts seed; this
gates the Phase 3 public-website cutover where /berths/A1 URLs would
404 against a CRM still storing "A-01".

- 0024 data migration: idempotent regexp_replace + post-update sanity
  check that surfaces any non-conforming rows for manual triage.
- Invert normalizeLegacyMooring in dedup/migration-apply: it now
  canonicalizes ("D-32" -> "D32") instead of legacy-izing.
- Update tiptap-to-pdfme example tokens, EOI fixture moorings, and
  smoke-test seed moorings.
- Refresh seed-data/berths.json to canonical form; drop the now-
  redundant legacyMooringNumber field.
- Delete scripts/load-berths-to-port-nimara.ts (superseded in 0c).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-05 01:59:26 +02:00

573 lines
19 KiB
TypeScript

/**
* Apply phase for the legacy NocoDB → CRM migration. Walks a
* `MigrationPlan` produced by {@link transformSnapshot} and writes
* the new client / contact / address / yacht / interest rows into the
* target port.
*
* Idempotent: every insert is guarded by a `migration_source_links`
* lookup keyed on `(source_system, source_id, target_entity_type)`, so
* a partial failure can be resumed by re-running the script. Re-runs
* against an already-applied plan are a near-no-op.
*
* Per-entity transactions (not one giant transaction) - the design
* favours visible partial progress on failure over all-or-nothing.
*
* @see src/lib/dedup/migration-transform.ts for the input shape.
* @see src/lib/db/schema/migration.ts for the idempotency ledger.
*/
import { and, eq, inArray } from 'drizzle-orm';
import { db } from '@/lib/db';
import { clients, clientContacts, clientAddresses } from '@/lib/db/schema/clients';
import { interests } from '@/lib/db/schema/interests';
import { yachts } from '@/lib/db/schema/yachts';
import { berths } from '@/lib/db/schema/berths';
import { documents, documentSigners } from '@/lib/db/schema/documents';
import { residentialClients, residentialInterests } from '@/lib/db/schema/residential';
import { migrationSourceLinks } from '@/lib/db/schema/migration';
import type {
MigrationPlan,
PlannedClient,
PlannedDocument,
PlannedInterest,
PlannedResidentialClient,
} from './migration-transform';
const SOURCE_SYSTEM = 'nocodb_interests';
/**
 * Canonicalize a mooring string to the unified bare form ("A1", "D32",
 * "E18") used by the new berths schema after the Phase 0 mooring-number
 * sweep. Historical inputs may be bare ("D32"), dashed ("D-32"), or
 * dashed with zero padding ("D-032"); all collapse to the bare form.
 * Anything that does not fit the letters[-]digits shape (multi-mooring
 * strings like "A3, D30", non-numeric suffixes like "B-LEG") is passed
 * through untouched so a literal map lookup can still succeed and the
 * caller can flag the row for manual review.
 */
function normalizeLegacyMooring(raw: string): string {
  const match = raw.trim().match(/^([A-Z]+)-?0*(\d+)$/i);
  if (match === null) return raw;
  const [, letters, digits] = match;
  // Number() strips any leading zeros the 0* group did not consume.
  return letters!.toUpperCase() + String(Number(digits));
}
/**
 * Aggregate counters for a single {@link applyPlan} run.
 *
 * "Inserted" counters track rows written this run (or, in rehearsal
 * mode, rows the apply function simulated). "Skipped" counters track
 * rows that already had a migration ledger entry from a previous run.
 */
export interface ApplyResult {
// Echo of ApplyOptions.applyId, for correlating logs with ledger rows.
applyId: string;
clientsInserted: number;
clientsSkipped: number;
contactsInserted: number;
addressesInserted: number;
yachtsInserted: number;
interestsInserted: number;
interestsSkipped: number;
documentsInserted: number;
documentsSkipped: number;
documentSignersInserted: number;
residentialClientsInserted: number;
residentialClientsSkipped: number;
residentialInterestsInserted: number;
// Human-readable notes on rows applied partially or not at all (unknown
// moorings, unresolved client tempIds, missing parent interests, ...)
// for manual triage after the run.
warnings: string[];
}
/** Options for {@link applyPlan}. */
export interface ApplyOptions {
// Target port. `id` scopes every insert and the berth lookup; `slug` is
// not used by the apply phase itself in this module.
port: { id: string; slug: string };
// Run identifier stamped onto every migration ledger row (appliedId).
applyId: string;
/** Set to true for the "preview the writes" mode - runs every read but
* performs no inserts (each apply function returns before writing,
* counting the rows it would have inserted). */
rehearsal?: boolean;
// Optional operator identity; recorded as appliedBy on ledger rows and
// as createdBy on migrated documents.
appliedBy?: string;
}
/**
 * Query the migration ledger for an existing (sourceId, targetType) pair
 * under the marina source system. Returns the previously written target
 * entity id, or null when this pair has never been applied.
 */
async function resolveExistingLink(
  sourceId: number,
  targetEntityType:
    | 'client'
    | 'interest'
    | 'yacht'
    | 'address'
    | 'document'
    | 'residential_client'
    | 'residential_interest',
): Promise<string | null> {
  // Ledger keys are (source_system, source_id, target_entity_type);
  // source ids are stored as strings.
  const predicate = and(
    eq(migrationSourceLinks.sourceSystem, SOURCE_SYSTEM),
    eq(migrationSourceLinks.sourceId, String(sourceId)),
    eq(migrationSourceLinks.targetEntityType, targetEntityType),
  );
  const [link] = await db
    .select({ id: migrationSourceLinks.targetEntityId })
    .from(migrationSourceLinks)
    .where(predicate)
    .limit(1);
  return link ? link.id : null;
}
/**
 * Find any source row in a dedup cluster that is already linked to a
 * client, if one exists. A cluster can be larger than the previously
 * applied set (the dedup algorithm may collapse an extra duplicate on a
 * later run), so any member's ledger link counts.
 */
async function resolveExistingClusterClient(sourceIds: number[]): Promise<string | null> {
  if (!sourceIds.length) return null;
  const idStrings = sourceIds.map((sid) => String(sid));
  const [link] = await db
    .select({ id: migrationSourceLinks.targetEntityId })
    .from(migrationSourceLinks)
    .where(
      and(
        eq(migrationSourceLinks.sourceSystem, SOURCE_SYSTEM),
        inArray(migrationSourceLinks.sourceId, idStrings),
        eq(migrationSourceLinks.targetEntityType, 'client'),
      ),
    )
    .limit(1);
  return link?.id ?? null;
}
/**
 * Apply a single PlannedClient - returns `{clientId, inserted}` so the
 * caller can wire interests against the (possibly pre-existing) record.
 *
 * Idempotency: if ANY source row in the planned cluster is already
 * linked to a client in the migration ledger, that client is reused and
 * nothing is written.
 */
async function applyClient(
  planned: PlannedClient,
  opts: ApplyOptions,
  result: ApplyResult,
): Promise<{ clientId: string; inserted: boolean }> {
  // Idempotency: if any source row in the cluster already mapped to a client,
  // reuse that record.
  const existing = await resolveExistingClusterClient(planned.sourceIds);
  if (existing) {
    result.clientsSkipped += 1;
    return { clientId: existing, inserted: false };
  }
  if (opts.rehearsal) {
    // Simulate the insert without writing. Count the rows that WOULD be
    // written so the rehearsal summary matches a real run - previously
    // this path reported zero clients/contacts/addresses, inconsistent
    // with the rehearsal accounting in the other apply* functions.
    result.clientsInserted += 1;
    result.contactsInserted += planned.contacts.length;
    result.addressesInserted += planned.addresses.length;
    return { clientId: `rehearsal-${planned.tempId}`, inserted: true };
  }
  // surnameToken is on the planned object (used by the dedup blocking
  // index inside the transform) but not in the clients schema - runtime
  // dedup re-derives it from fullName when needed. Drop it on insert.
  const [inserted] = await db
    .insert(clients)
    .values({
      portId: opts.port.id,
      fullName: planned.fullName,
      nationalityIso: planned.countryIso ?? null,
      preferredContactMethod: planned.preferredContactMethod ?? null,
      source: planned.source ?? null,
    })
    .returning({ id: clients.id });
  if (!inserted) throw new Error('Client insert returned no row');
  const clientId = inserted.id;
  // Record idempotency links - one per source row in the cluster.
  await db.insert(migrationSourceLinks).values(
    planned.sourceIds.map((sid) => ({
      sourceSystem: SOURCE_SYSTEM,
      sourceId: String(sid),
      targetEntityType: 'client' as const,
      targetEntityId: clientId,
      appliedId: opts.applyId,
      ...(opts.appliedBy ? { appliedBy: opts.appliedBy } : {}),
    })),
  );
  // Contacts: bulk insert; the first flagged email and the first flagged
  // phone/whatsapp each win the primary slot for their channel group.
  if (planned.contacts.length > 0) {
    let primaryEmailSet = false;
    let primaryPhoneSet = false;
    const contactRows = planned.contacts.map((ct) => {
      let isPrimary = false;
      if (ct.isPrimary) {
        if (ct.channel === 'email' && !primaryEmailSet) {
          isPrimary = true;
          primaryEmailSet = true;
        } else if ((ct.channel === 'phone' || ct.channel === 'whatsapp') && !primaryPhoneSet) {
          isPrimary = true;
          primaryPhoneSet = true;
        }
      }
      return {
        clientId,
        channel: ct.channel,
        value: ct.value,
        valueE164: ct.valueE164 ?? null,
        valueCountry: ct.valueCountry ?? null,
        isPrimary,
      };
    });
    await db.insert(clientContacts).values(contactRows);
    result.contactsInserted += contactRows.length;
  }
  // Addresses: bulk insert; first is marked primary if multiple. Note the
  // schema requires portId on every address row in addition to clientId.
  if (planned.addresses.length > 0) {
    const addressRows = planned.addresses.map((a, idx) => ({
      clientId,
      portId: opts.port.id,
      streetAddress: a.streetAddress ?? null,
      city: a.city ?? null,
      countryIso: a.countryIso ?? null,
      isPrimary: idx === 0,
    }));
    await db.insert(clientAddresses).values(addressRows);
    result.addressesInserted += addressRows.length;
  }
  result.clientsInserted += 1;
  return { clientId, inserted: true };
}
/**
 * Apply a single PlannedInterest - looks up its client + berth + yacht and
 * inserts the interest row, plus a yacht stub if a yacht name is present.
 *
 * @param planned           Transformed legacy interest row to apply.
 * @param tempIdToClientId  Map built while applying clients; resolves the
 *                          planned clientTempId to the new client id.
 * @param mooringToBerthId  Mooring-number -> berth id lookup for this port.
 * @param opts              Port / applyId / rehearsal options.
 * @param result            Mutable counters + warnings for the whole run.
 *
 * Unresolvable references are downgraded, not fatal: an unknown client
 * tempId skips the row with a warning; an unknown mooring still creates
 * the interest, just without a berth link.
 */
async function applyInterest(
planned: PlannedInterest,
tempIdToClientId: Map<string, string>,
mooringToBerthId: Map<string, string>,
opts: ApplyOptions,
result: ApplyResult,
): Promise<void> {
// Idempotency: skip if this source row already created an interest.
const existing = await resolveExistingLink(planned.sourceId, 'interest');
if (existing) {
result.interestsSkipped += 1;
return;
}
const clientId = tempIdToClientId.get(planned.clientTempId);
if (!clientId) {
result.warnings.push(
`Interest source=${planned.sourceId} references unknown client tempId=${planned.clientTempId} - skipped`,
);
return;
}
let berthId: string | null = null;
if (planned.berthMooringNumber) {
berthId =
mooringToBerthId.get(planned.berthMooringNumber) ??
// Legacy NocoDB Interests rows recorded mooring strings inconsistently
// ("D32", "D-32", "D-032"). The new berths schema stores the canonical
// bare form ("D32") - canonicalize the lookup key as a fallback.
mooringToBerthId.get(normalizeLegacyMooring(planned.berthMooringNumber)) ??
null;
if (!berthId) {
result.warnings.push(
`Interest source=${planned.sourceId} references unknown mooring="${planned.berthMooringNumber}" - interest created without berth link`,
);
}
}
// Optional yacht stub: if the legacy row had a yacht name, create a
// minimal yacht record owned by the client. The new schema requires
// currentOwnerType + currentOwnerId.
let yachtId: string | null = null;
if (planned.yachtName) {
// The yacht link is keyed on the SAME legacy sourceId as the interest,
// under target_entity_type='yacht', so a resumed run reuses the stub.
const existingYacht = await resolveExistingLink(planned.sourceId, 'yacht');
if (existingYacht) {
yachtId = existingYacht;
} else if (!opts.rehearsal) {
const [y] = await db
.insert(yachts)
.values({
portId: opts.port.id,
name: planned.yachtName,
currentOwnerType: 'client',
currentOwnerId: clientId,
status: 'active',
})
.returning({ id: yachts.id });
if (y) {
yachtId = y.id;
await db.insert(migrationSourceLinks).values({
sourceSystem: SOURCE_SYSTEM,
sourceId: String(planned.sourceId),
targetEntityType: 'yacht' as const,
targetEntityId: y.id,
appliedId: opts.applyId,
...(opts.appliedBy ? { appliedBy: opts.appliedBy } : {}),
});
result.yachtsInserted += 1;
}
}
}
// Rehearsal: count the would-be interest insert and stop before writing.
// NOTE(review): yachtsInserted is NOT simulated here (the stub branch
// above is skipped in rehearsal), so rehearsal yacht counts read low -
// confirm whether that is intended.
if (opts.rehearsal) {
result.interestsInserted += 1;
return;
}
const [iRow] = await db
.insert(interests)
.values({
portId: opts.port.id,
clientId,
berthId,
yachtId,
pipelineStage: planned.pipelineStage,
leadCategory: planned.leadCategory,
source: planned.source,
notes: planned.notes,
documensoId: planned.documensoId,
// Optional legacy date values -> Date objects (null when absent).
dateEoiSent: planned.dateEoiSent ? new Date(planned.dateEoiSent) : null,
dateEoiSigned: planned.dateEoiSigned ? new Date(planned.dateEoiSigned) : null,
dateContractSent: planned.dateContractSent ? new Date(planned.dateContractSent) : null,
dateContractSigned: planned.dateContractSigned ? new Date(planned.dateContractSigned) : null,
dateDepositReceived: planned.dateDepositReceived
? new Date(planned.dateDepositReceived)
: null,
dateLastContact: planned.dateLastContact ? new Date(planned.dateLastContact) : null,
})
.returning({ id: interests.id });
if (!iRow) throw new Error('Interest insert returned no row');
// Ledger entry keyed on the legacy row id makes re-runs skip this row.
await db.insert(migrationSourceLinks).values({
sourceSystem: SOURCE_SYSTEM,
sourceId: String(planned.sourceId),
targetEntityType: 'interest' as const,
targetEntityId: iRow.id,
appliedId: opts.applyId,
...(opts.appliedBy ? { appliedBy: opts.appliedBy } : {}),
});
result.interestsInserted += 1;
}
/**
 * Apply a single PlannedDocument - looks up the parent interest's id from
 * the migration ledger, materializes a documents row, and inserts the
 * signer rows. Idempotent via target_entity_type='document'.
 *
 * Ordering contract: the parent interest MUST have been applied first -
 * its ledger entry (same legacy sourceId, type 'interest') is how the
 * new interestId is resolved here. Otherwise the document is skipped
 * with a warning.
 */
async function applyDocument(
planned: PlannedDocument,
tempIdToClientId: Map<string, string>,
opts: ApplyOptions,
result: ApplyResult,
): Promise<void> {
// Idempotency: bail if this source row already produced a document.
const existing = await resolveExistingLink(planned.sourceId, 'document');
if (existing) {
result.documentsSkipped += 1;
return;
}
// Resolve the parent via the ledger entry written by applyInterest.
const interestId = await resolveExistingLink(planned.sourceId, 'interest');
if (!interestId) {
result.warnings.push(
`Document source=${planned.sourceId} cannot resolve parent interest - skipped (interest must apply first)`,
);
return;
}
const clientId = tempIdToClientId.get(planned.clientTempId);
if (!clientId) {
result.warnings.push(
`Document source=${planned.sourceId} references unknown client tempId=${planned.clientTempId} - skipped`,
);
return;
}
// Rehearsal: count the would-be writes and stop before touching the db.
if (opts.rehearsal) {
result.documentsInserted += 1;
result.documentSignersInserted += planned.signers.length;
return;
}
const [docRow] = await db
.insert(documents)
.values({
portId: opts.port.id,
interestId,
clientId,
documentType: planned.documentType,
title: planned.title,
status: planned.status,
documensoId: planned.documensoId,
// NOTE(review): migrated docs are flagged as not manually uploaded -
// presumably because they originated in Documenso (documensoId above);
// confirm against the documents schema semantics.
isManualUpload: false,
notes: planned.notes,
createdBy: opts.appliedBy ?? 'migration',
})
.returning({ id: documents.id });
if (!docRow) throw new Error('Document insert returned no row');
await db.insert(migrationSourceLinks).values({
sourceSystem: SOURCE_SYSTEM,
sourceId: String(planned.sourceId),
targetEntityType: 'document' as const,
targetEntityId: docRow.id,
appliedId: opts.applyId,
...(opts.appliedBy ? { appliedBy: opts.appliedBy } : {}),
});
// Signers ride along under the document and get no ledger entries of
// their own - the document-level idempotency guard covers them.
if (planned.signers.length > 0) {
await db.insert(documentSigners).values(
planned.signers.map((s) => ({
documentId: docRow.id,
signerName: s.signerName,
signerEmail: s.signerEmail,
signerRole: s.signerRole,
signingOrder: s.signingOrder,
status: s.status,
signedAt: s.signedAt ? new Date(s.signedAt) : null,
signingUrl: s.signingUrl,
embeddedUrl: s.embeddedUrl,
})),
);
result.documentSignersInserted += planned.signers.length;
}
result.documentsInserted += 1;
}
/**
* Apply a single PlannedResidentialClient - creates a residential_clients
* row plus a default residential_interests row at pipeline_stage='new'
* so the lead surfaces in the residential funnel. Two ledger entries
* record both targets.
*/
async function applyResidentialClient(
planned: PlannedResidentialClient,
opts: ApplyOptions,
result: ApplyResult,
): Promise<void> {
const existingClient = await resolveExistingLink(planned.sourceId, 'residential_client');
if (existingClient) {
result.residentialClientsSkipped += 1;
return;
}
if (opts.rehearsal) {
result.residentialClientsInserted += 1;
result.residentialInterestsInserted += 1;
return;
}
// Wrap the three writes in a transaction so a partial failure (e.g. the
// residential_interests insert throws) does NOT leave an orphan
// residential_clients row. Without the wrap, a later --apply re-run
// would not see a ledger entry for the orphan and would happily insert
// a duplicate residential_clients row.
await db.transaction(async (tx) => {
const [resClient] = await tx
.insert(residentialClients)
.values({
portId: opts.port.id,
fullName: planned.fullName,
email: planned.email,
phone: planned.phoneE164,
phoneE164: planned.phoneE164,
phoneCountry: planned.phoneCountry,
placeOfResidence: planned.placeOfResidence,
placeOfResidenceCountryIso: planned.placeOfResidenceCountryIso,
source: planned.source,
notes: planned.notes,
status: 'prospect',
})
.returning({ id: residentialClients.id });
if (!resClient) throw new Error('Residential client insert returned no row');
const [resInterest] = await tx
.insert(residentialInterests)
.values({
portId: opts.port.id,
residentialClientId: resClient.id,
pipelineStage: 'new',
source: planned.source,
notes: planned.notes,
dateFirstContact: planned.dateFirstContact ? new Date(planned.dateFirstContact) : null,
dateLastContact: planned.dateFirstContact ? new Date(planned.dateFirstContact) : null,
})
.returning({ id: residentialInterests.id });
if (!resInterest) throw new Error('Residential interest insert returned no row');
// Two ledger entries - one per target - both keyed on the same legacy
// sourceId. Keeps re-runs idempotent on either target type.
await tx.insert(migrationSourceLinks).values([
{
sourceSystem: 'nocodb_residential_interests',
sourceId: String(planned.sourceId),
targetEntityType: 'residential_client' as const,
targetEntityId: resClient.id,
appliedId: opts.applyId,
...(opts.appliedBy ? { appliedBy: opts.appliedBy } : {}),
},
{
sourceSystem: 'nocodb_residential_interests',
sourceId: String(planned.sourceId),
targetEntityType: 'residential_interest' as const,
targetEntityId: resInterest.id,
appliedId: opts.applyId,
...(opts.appliedBy ? { appliedBy: opts.appliedBy } : {}),
},
]);
});
result.residentialClientsInserted += 1;
result.residentialInterestsInserted += 1;
}
/**
 * Top-level apply driver. Phases run strictly in dependency order:
 * clients first (building the tempId -> clientId map), then interests
 * (which need that map plus the port's berth lookup), then documents
 * (which resolve their parent interest through the migration ledger),
 * and finally the independent residential leads.
 */
export async function applyPlan(plan: MigrationPlan, opts: ApplyOptions): Promise<ApplyResult> {
  const result: ApplyResult = {
    applyId: opts.applyId,
    clientsInserted: 0,
    clientsSkipped: 0,
    contactsInserted: 0,
    addressesInserted: 0,
    yachtsInserted: 0,
    interestsInserted: 0,
    interestsSkipped: 0,
    documentsInserted: 0,
    documentsSkipped: 0,
    documentSignersInserted: 0,
    residentialClientsInserted: 0,
    residentialClientsSkipped: 0,
    residentialInterestsInserted: 0,
    warnings: [],
  };
  // Phase 1: clients (and their contacts/addresses).
  const tempIdToClientId = new Map<string, string>();
  for (const plannedClient of plan.clients) {
    const applied = await applyClient(plannedClient, opts, result);
    tempIdToClientId.set(plannedClient.tempId, applied.clientId);
  }
  // Phase 2: build the mooring -> berthId lookup once, scoped to this port.
  const berthRows = await db
    .select({ id: berths.id, mooringNumber: berths.mooringNumber })
    .from(berths)
    .where(eq(berths.portId, opts.port.id));
  const mooringToBerthId = new Map<string, string>();
  for (const row of berthRows) {
    mooringToBerthId.set(row.mooringNumber, row.id);
  }
  // Phase 3: interests (and yacht stubs). Awaited sequentially - each
  // apply writes ledger rows that later phases depend on.
  for (const plannedInterest of plan.interests) {
    await applyInterest(plannedInterest, tempIdToClientId, mooringToBerthId, opts, result);
  }
  // Phase 4: documents - applyDocument resolves its parent interest_id
  // via the migration ledger, so interests must already be applied.
  for (const plannedDocument of plan.documents) {
    await applyDocument(plannedDocument, tempIdToClientId, opts, result);
  }
  // Phase 5: residential leads - independent domain with no dependency on
  // the marina phases above.
  for (const plannedLead of plan.residentialClients) {
    await applyResidentialClient(plannedLead, opts, result);
  }
  return result;
}