fix(migration): NocoDB import safety + dedup helpers + lead-source backfill

migration-apply: residential client + interest inserts now wrap in
db.transaction so a partial failure can't leave an orphan client
row without its interest (or vice versa).

migration-transform: buildPlannedDocument returns null when there
are no signers so the apply pass doesn't try to send a Documenso
envelope without recipients. mapDocumentStatus gets an explicit
"Awaiting Further Details" branch that no longer auto-promotes via
stale sign-time fields. parseFlexibleDate handles ISO and DD-MM-YYYY
inputs uniformly.

backfill-legacy-lead-source: chunk UPDATE WHERE clause now adds an
isNull(source) condition on top of the inArray match, so a re-run can't
overwrite a more accurate source written between batches.

Adds 235 lines of vitest coverage on migration-transform.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Matt Ciaccio
2026-05-04 22:56:18 +02:00
parent 089f4a67a4
commit d62822c284
9 changed files with 938 additions and 47 deletions

View File

@@ -9,7 +9,7 @@
* a partial failure can be resumed by re-running the script. Re-runs
* against an already-applied plan are a near-no-op.
*
* Per-entity transactions (not one giant transaction) the design
* Per-entity transactions (not one giant transaction) - the design
* favours visible partial progress on failure over all-or-nothing.
*
* @see src/lib/dedup/migration-transform.ts for the input shape.
@@ -23,8 +23,16 @@ import { clients, clientContacts, clientAddresses } from '@/lib/db/schema/client
import { interests } from '@/lib/db/schema/interests';
import { yachts } from '@/lib/db/schema/yachts';
import { berths } from '@/lib/db/schema/berths';
import { documents, documentSigners } from '@/lib/db/schema/documents';
import { residentialClients, residentialInterests } from '@/lib/db/schema/residential';
import { migrationSourceLinks } from '@/lib/db/schema/migration';
import type { MigrationPlan, PlannedClient, PlannedInterest } from './migration-transform';
import type {
MigrationPlan,
PlannedClient,
PlannedDocument,
PlannedInterest,
PlannedResidentialClient,
} from './migration-transform';
const SOURCE_SYSTEM = 'nocodb_interests';
@@ -35,7 +43,7 @@ const SOURCE_SYSTEM = 'nocodb_interests';
* unchanged so a literal lookup can still hit (handles the case where
* the legacy data already has the dashed form).
*
* Multi-mooring strings ("A3, D30") return the original string
* Multi-mooring strings ("A3, D30") return the original string -
* those need human review and we don't want to silently pick one half.
*/
function normalizeLegacyMooring(raw: string): string {
@@ -56,13 +64,19 @@ export interface ApplyResult {
yachtsInserted: number;
interestsInserted: number;
interestsSkipped: number;
documentsInserted: number;
documentsSkipped: number;
documentSignersInserted: number;
residentialClientsInserted: number;
residentialClientsSkipped: number;
residentialInterestsInserted: number;
warnings: string[];
}
export interface ApplyOptions {
port: { id: string; slug: string };
applyId: string;
/** Set to true for the "preview the writes" mode runs every read but
/** Set to true for the "preview the writes" mode - runs every read but
* rolls back inserts. Useful for verifying mappings before committing. */
rehearsal?: boolean;
appliedBy?: string;
@@ -74,7 +88,14 @@ export interface ApplyOptions {
*/
async function resolveExistingLink(
sourceId: number,
targetEntityType: 'client' | 'interest' | 'yacht' | 'address',
targetEntityType:
| 'client'
| 'interest'
| 'yacht'
| 'address'
| 'document'
| 'residential_client'
| 'residential_interest',
): Promise<string | null> {
const rows = await db
.select({ id: migrationSourceLinks.targetEntityId })
@@ -109,7 +130,7 @@ async function resolveExistingClusterClient(sourceIds: number[]): Promise<string
return rows[0]?.id ?? null;
}
/** Apply a single PlannedClient returns `{clientId, inserted}` so the
/** Apply a single PlannedClient - returns `{clientId, inserted}` so the
* caller can wire interests against the (possibly pre-existing) record. */
async function applyClient(
planned: PlannedClient,
@@ -125,12 +146,12 @@ async function applyClient(
}
if (opts.rehearsal) {
// Simulate an insert without writing used for the preview path.
// Simulate an insert without writing - used for the preview path.
return { clientId: `rehearsal-${planned.tempId}`, inserted: true };
}
// surnameToken is on the planned object (used by the dedup blocking
// index inside the transform) but not in the clients schema runtime
// index inside the transform) but not in the clients schema - runtime
// dedup re-derives it from fullName when needed. Drop it on insert.
const [inserted] = await db
.insert(clients)
@@ -146,7 +167,7 @@ async function applyClient(
if (!inserted) throw new Error('Client insert returned no row');
const clientId = inserted.id;
// Record idempotency links one per source row in the cluster.
// Record idempotency links - one per source row in the cluster.
await db.insert(migrationSourceLinks).values(
planned.sourceIds.map((sid) => ({
sourceSystem: SOURCE_SYSTEM,
@@ -205,7 +226,7 @@ async function applyClient(
return { clientId, inserted: true };
}
/** Apply a single PlannedInterest looks up its client + berth + yacht and
/** Apply a single PlannedInterest - looks up its client + berth + yacht and
* inserts the interest row, plus a yacht stub if a yacht name is present. */
async function applyInterest(
planned: PlannedInterest,
@@ -224,7 +245,7 @@ async function applyInterest(
const clientId = tempIdToClientId.get(planned.clientTempId);
if (!clientId) {
result.warnings.push(
`Interest source=${planned.sourceId} references unknown client tempId=${planned.clientTempId} skipped`,
`Interest source=${planned.sourceId} references unknown client tempId=${planned.clientTempId} - skipped`,
);
return;
}
@@ -241,7 +262,7 @@ async function applyInterest(
null;
if (!berthId) {
result.warnings.push(
`Interest source=${planned.sourceId} references unknown mooring="${planned.berthMooringNumber}" interest created without berth link`,
`Interest source=${planned.sourceId} references unknown mooring="${planned.berthMooringNumber}" - interest created without berth link`,
);
}
}
@@ -322,6 +343,181 @@ async function applyInterest(
result.interestsInserted += 1;
}
/**
 * Apply a single PlannedDocument - looks up the parent interest's id from
 * the migration ledger, materializes a documents row, and inserts the
 * signer rows. Idempotent via target_entity_type='document'.
 */
async function applyDocument(
  planned: PlannedDocument,
  tempIdToClientId: Map<string, string>,
  opts: ApplyOptions,
  result: ApplyResult,
): Promise<void> {
  // Idempotency gate: a ledger entry for this sourceId means the document
  // was fully applied on a previous run.
  const existing = await resolveExistingLink(planned.sourceId, 'document');
  if (existing) {
    result.documentsSkipped += 1;
    return;
  }
  // The parent interest must have been applied first (applyPlan runs the
  // interests pass before the documents pass); its new id lives in the ledger.
  const interestId = await resolveExistingLink(planned.sourceId, 'interest');
  if (!interestId) {
    result.warnings.push(
      `Document source=${planned.sourceId} cannot resolve parent interest - skipped (interest must apply first)`,
    );
    return;
  }
  const clientId = tempIdToClientId.get(planned.clientTempId);
  if (!clientId) {
    result.warnings.push(
      `Document source=${planned.sourceId} references unknown client tempId=${planned.clientTempId} - skipped`,
    );
    return;
  }
  if (opts.rehearsal) {
    // Preview mode: count the would-be writes without touching the db.
    result.documentsInserted += 1;
    result.documentSignersInserted += planned.signers.length;
    return;
  }
  // Wrap document + ledger + signer writes in one transaction, mirroring
  // applyResidentialClient. Without the wrap, a failure in the signers
  // insert would strand a ledger-linked documents row with no signer rows,
  // and a re-run would skip it as "already applied" instead of repairing it.
  await db.transaction(async (tx) => {
    const [docRow] = await tx
      .insert(documents)
      .values({
        portId: opts.port.id,
        interestId,
        clientId,
        documentType: planned.documentType,
        title: planned.title,
        status: planned.status,
        documensoId: planned.documensoId,
        isManualUpload: false,
        notes: planned.notes,
        createdBy: opts.appliedBy ?? 'migration',
      })
      .returning({ id: documents.id });
    if (!docRow) throw new Error('Document insert returned no row');
    // Ledger entry makes the re-run skip above hit on the next pass.
    await tx.insert(migrationSourceLinks).values({
      sourceSystem: SOURCE_SYSTEM,
      sourceId: String(planned.sourceId),
      targetEntityType: 'document' as const,
      targetEntityId: docRow.id,
      appliedId: opts.applyId,
      ...(opts.appliedBy ? { appliedBy: opts.appliedBy } : {}),
    });
    if (planned.signers.length > 0) {
      await tx.insert(documentSigners).values(
        planned.signers.map((s) => ({
          documentId: docRow.id,
          signerName: s.signerName,
          signerEmail: s.signerEmail,
          signerRole: s.signerRole,
          signingOrder: s.signingOrder,
          status: s.status,
          signedAt: s.signedAt ? new Date(s.signedAt) : null,
          signingUrl: s.signingUrl,
          embeddedUrl: s.embeddedUrl,
        })),
      );
    }
  });
  // Counters only advance after the transaction commits, so a failed apply
  // never over-reports.
  result.documentSignersInserted += planned.signers.length;
  result.documentsInserted += 1;
}
/**
 * Apply a single PlannedResidentialClient - creates a residential_clients
 * row plus a default residential_interests row at pipeline_stage='new'
 * so the lead surfaces in the residential funnel. Two ledger entries
 * record both targets.
 */
async function applyResidentialClient(
  planned: PlannedResidentialClient,
  opts: ApplyOptions,
  result: ApplyResult,
): Promise<void> {
  // A ledger hit on the residential_client target means a previous run
  // already applied this lead - skip it.
  const alreadyApplied = await resolveExistingLink(planned.sourceId, 'residential_client');
  if (alreadyApplied) {
    result.residentialClientsSkipped += 1;
    return;
  }
  // Preview mode counts the would-be writes without touching the db.
  if (opts.rehearsal) {
    result.residentialClientsInserted += 1;
    result.residentialInterestsInserted += 1;
    return;
  }
  // All three writes commit together or not at all: if the
  // residential_interests insert threw outside a transaction, the orphan
  // residential_clients row would carry no ledger entry and the next
  // --apply re-run would insert it a second time.
  await db.transaction(async (tx) => {
    const clientRows = await tx
      .insert(residentialClients)
      .values({
        portId: opts.port.id,
        fullName: planned.fullName,
        email: planned.email,
        phone: planned.phoneE164,
        phoneE164: planned.phoneE164,
        phoneCountry: planned.phoneCountry,
        placeOfResidence: planned.placeOfResidence,
        placeOfResidenceCountryIso: planned.placeOfResidenceCountryIso,
        source: planned.source,
        notes: planned.notes,
        status: 'prospect',
      })
      .returning({ id: residentialClients.id });
    const newClient = clientRows[0];
    if (!newClient) throw new Error('Residential client insert returned no row');

    // First contact doubles as last contact on import - there is no later
    // touchpoint in the legacy data at this stage.
    const firstContact = planned.dateFirstContact ? new Date(planned.dateFirstContact) : null;
    const interestRows = await tx
      .insert(residentialInterests)
      .values({
        portId: opts.port.id,
        residentialClientId: newClient.id,
        pipelineStage: 'new',
        source: planned.source,
        notes: planned.notes,
        dateFirstContact: firstContact,
        dateLastContact: firstContact,
      })
      .returning({ id: residentialInterests.id });
    const newInterest = interestRows[0];
    if (!newInterest) throw new Error('Residential interest insert returned no row');

    // One ledger row per created target, both keyed on the same legacy
    // sourceId, so re-runs stay idempotent on either target type.
    const ledgerBase = {
      sourceSystem: 'nocodb_residential_interests',
      sourceId: String(planned.sourceId),
      appliedId: opts.applyId,
      ...(opts.appliedBy ? { appliedBy: opts.appliedBy } : {}),
    };
    await tx.insert(migrationSourceLinks).values([
      {
        ...ledgerBase,
        targetEntityType: 'residential_client' as const,
        targetEntityId: newClient.id,
      },
      {
        ...ledgerBase,
        targetEntityType: 'residential_interest' as const,
        targetEntityId: newInterest.id,
      },
    ]);
  });
  result.residentialClientsInserted += 1;
  result.residentialInterestsInserted += 1;
}
/**
* Top-level apply driver. Walks the plan once, building the
* tempId→clientId map as it goes, then walks interests with that map.
@@ -336,6 +532,12 @@ export async function applyPlan(plan: MigrationPlan, opts: ApplyOptions): Promis
yachtsInserted: 0,
interestsInserted: 0,
interestsSkipped: 0,
documentsInserted: 0,
documentsSkipped: 0,
documentSignersInserted: 0,
residentialClientsInserted: 0,
residentialClientsSkipped: 0,
residentialInterestsInserted: 0,
warnings: [],
};
@@ -358,5 +560,18 @@ export async function applyPlan(plan: MigrationPlan, opts: ApplyOptions): Promis
await applyInterest(planned, tempIdToClientId, mooringToBerthId, opts, result);
}
// 4. Documents (depend on interests being applied first - applyDocument
// looks up the new interest_id via the migration ledger).
for (const planned of plan.documents) {
await applyDocument(planned, tempIdToClientId, opts, result);
}
// 5. Residential leads - independent domain, no dependency on the marina
// apply phase. Each lead gets a residential_clients row + a default
// residential_interests row.
for (const planned of plan.residentialClients) {
await applyResidentialClient(planned, opts, result);
}
return result;
}