337 lines
11 KiB
TypeScript
337 lines
11 KiB
TypeScript
|
|
/**
 * Apply phase for the legacy NocoDB → CRM migration. Walks a
 * `MigrationPlan` produced by {@link transformSnapshot} and writes
 * the new client / contact / address / yacht / interest rows into the
 * target port.
 *
 * Idempotent: every insert is guarded by a `migration_source_links`
 * lookup keyed on `(source_system, source_id, target_entity_type)`, so
 * a partial failure can be resumed by re-running the script. Re-runs
 * against an already-applied plan are a near-no-op.
 *
 * Per-entity transactions (not one giant transaction) — the design
 * favours visible partial progress on failure over all-or-nothing.
 *
 * @see src/lib/dedup/migration-transform.ts for the input shape.
 * @see src/lib/db/schema/migration.ts for the idempotency ledger.
 */
|
||
|
|
|
||
|
|
import { and, eq, inArray } from 'drizzle-orm';
|
||
|
|
|
||
|
|
import { db } from '@/lib/db';
|
||
|
|
import { clients, clientContacts, clientAddresses } from '@/lib/db/schema/clients';
|
||
|
|
import { interests } from '@/lib/db/schema/interests';
|
||
|
|
import { yachts } from '@/lib/db/schema/yachts';
|
||
|
|
import { berths } from '@/lib/db/schema/berths';
|
||
|
|
import { migrationSourceLinks } from '@/lib/db/schema/migration';
|
||
|
|
import type { MigrationPlan, PlannedClient, PlannedInterest } from './migration-transform';
|
||
|
|
|
||
|
|
// Value written to / matched against `migrationSourceLinks.sourceSystem` for
// every ledger row this module reads or creates.
const SOURCE_SYSTEM = 'nocodb_interests';
|
||
|
|
|
||
|
|
/**
 * Running tally returned by {@link applyPlan}. The per-entity apply helpers
 * mutate a single instance of this object as they walk the plan.
 */
export interface ApplyResult {
  /** Identifier of this apply run, copied from `ApplyOptions.applyId`. */
  applyId: string;
  /** Planned clients for which a new client record was created. */
  clientsInserted: number;
  /** Planned clients whose source cluster was already linked to a client. */
  clientsSkipped: number;
  /** Contact rows written alongside newly created clients. */
  contactsInserted: number;
  /** Address rows written alongside newly created clients. */
  addressesInserted: number;
  /** Yacht stub rows created while applying interests. */
  yachtsInserted: number;
  /** Interests created (or, in rehearsal mode, that would be created). */
  interestsInserted: number;
  /** Interests whose source row was already linked to an interest. */
  interestsSkipped: number;
  /** Human-readable notes about rows that were skipped or only partially linked. */
  warnings: string[];
}
|
||
|
|
|
||
|
|
/** Caller-supplied settings for one {@link applyPlan} run. */
export interface ApplyOptions {
  /** Target port; its `id` is stamped on the rows this module inserts. */
  port: { id: string; slug: string };
  /** Identifier for this run; recorded on every idempotency-ledger row it writes. */
  applyId: string;
  /**
   * Preview mode. When true the lookups still run but the inserts are
   * skipped, and clients that would be created get a synthetic
   * `rehearsal-<tempId>` id. Useful for verifying mappings before committing.
   */
  rehearsal?: boolean;
  /** Optional operator identifier; copied onto ledger rows when provided. */
  appliedBy?: string;
}
|
||
|
|
|
||
|
|
/**
 * Fetch the ledger entry, if any, recording where a legacy row ended up.
 *
 * The lookup filters `migrationSourceLinks` on this module's source system,
 * the string form of `sourceId`, and the requested target entity type.
 *
 * @param sourceId - Legacy row id; compared against the ledger in its string form.
 * @param targetEntityType - Kind of target entity the link must point at.
 * @returns The linked target entity id, or `null` when no link exists.
 */
async function resolveExistingLink(
  sourceId: number,
  targetEntityType: 'client' | 'interest' | 'yacht' | 'address',
): Promise<string | null> {
  const linkMatches = and(
    eq(migrationSourceLinks.sourceSystem, SOURCE_SYSTEM),
    eq(migrationSourceLinks.sourceId, String(sourceId)),
    eq(migrationSourceLinks.targetEntityType, targetEntityType),
  );

  // At most one row is requested, so destructuring the head is enough.
  const [firstLink] = await db
    .select({ id: migrationSourceLinks.targetEntityId })
    .from(migrationSourceLinks)
    .where(linkMatches)
    .limit(1);

  return firstLink?.id ?? null;
}
|
||
|
|
|
||
|
|
/**
 * Look for a client already linked to any source row of a dedup cluster.
 *
 * A cluster can be larger than the set that was applied previously (the
 * dedup algorithm may have collapsed an extra duplicate this run), so every
 * member id is checked rather than a single one.
 *
 * The query asks for one row without an explicit ordering, so when several
 * cluster members are linked, which link is returned is up to the database.
 *
 * @param sourceIds - Legacy row ids that make up the cluster.
 * @returns The id of a previously linked client, or `null` if none (including for an empty cluster).
 */
async function resolveExistingClusterClient(sourceIds: number[]): Promise<string | null> {
  // Nothing to look up for an empty cluster — skip the query entirely.
  if (sourceIds.length === 0) return null;

  const memberKeys = sourceIds.map(String);
  const clusterClientLink = and(
    eq(migrationSourceLinks.sourceSystem, SOURCE_SYSTEM),
    inArray(migrationSourceLinks.sourceId, memberKeys),
    eq(migrationSourceLinks.targetEntityType, 'client'),
  );

  const [firstLink] = await db
    .select({ id: migrationSourceLinks.targetEntityId })
    .from(migrationSourceLinks)
    .where(clusterClientLink)
    .limit(1);

  return firstLink?.id ?? null;
}
|
||
|
|
|
||
|
|
/**
 * Build the contact rows for a freshly inserted client.
 *
 * A contact is only stored as primary when the plan flagged it AND it is the
 * first flagged contact of its family: one primary e-mail, and one primary
 * across phone + WhatsApp combined.
 */
function buildContactRows(contacts: PlannedClient['contacts'], clientId: string) {
  let primaryEmailSet = false;
  let primaryPhoneSet = false;
  return contacts.map((ct) => {
    let isPrimary = false;
    if (ct.isPrimary) {
      if (ct.channel === 'email' && !primaryEmailSet) {
        isPrimary = true;
        primaryEmailSet = true;
      } else if ((ct.channel === 'phone' || ct.channel === 'whatsapp') && !primaryPhoneSet) {
        isPrimary = true;
        primaryPhoneSet = true;
      }
    }
    return {
      clientId,
      channel: ct.channel,
      value: ct.value,
      valueE164: ct.valueE164 ?? null,
      valueCountry: ct.valueCountry ?? null,
      isPrimary,
    };
  });
}

/**
 * Apply a single PlannedClient — returns `{clientId, inserted}` so the
 * caller can wire interests against the (possibly pre-existing) record.
 *
 * Behaviour:
 * - If any source row of the cluster is already linked to a client, that
 *   client is reused, `clientsSkipped` is bumped and nothing is written.
 * - In rehearsal mode nothing is written; the would-be client, contact and
 *   address counts are tallied and a synthetic `rehearsal-<tempId>` id is
 *   returned.
 * - Otherwise the client row, its idempotency links, contacts and addresses
 *   are written inside ONE transaction. Without that, a failure between the
 *   client insert and the link insert would leave an unlinked client that a
 *   re-run duplicates, and a failure after the link insert would make the
 *   re-run skip the client and never write its contacts/addresses.
 *
 * NOTE(review): relies on `db.transaction` being supported by the configured
 * Drizzle driver — confirm for this project's driver.
 *
 * @param planned - Client produced by the transform phase.
 * @param opts - Apply options (target port, run id, rehearsal flag, operator).
 * @param result - Running tally; mutated in place. Write counters are only
 *   bumped after the transaction has committed.
 * @returns The client id to attach interests to, and whether it was (or, in
 *   rehearsal, would have been) newly created.
 * @throws Error if the plan carries no source ids for this client, or if the
 *   client insert returns no row.
 */
async function applyClient(
  planned: PlannedClient,
  opts: ApplyOptions,
  result: ApplyResult,
): Promise<{ clientId: string; inserted: boolean }> {
  // Idempotency: if any source row in the cluster already mapped to a client,
  // reuse that record.
  const existing = await resolveExistingClusterClient(planned.sourceIds);
  if (existing) {
    result.clientsSkipped += 1;
    return { clientId: existing, inserted: false };
  }

  if (opts.rehearsal) {
    // Simulate the insert without writing, but still tally what would have
    // been written so the preview totals line up with the interest counts
    // (which are also tallied in rehearsal mode).
    result.clientsInserted += 1;
    result.contactsInserted += planned.contacts.length;
    result.addressesInserted += planned.addresses.length;
    return { clientId: `rehearsal-${planned.tempId}`, inserted: true };
  }

  // A client without source ids cannot be recorded in the idempotency
  // ledger, so a re-run could never recognise it. Fail before writing
  // anything instead of failing on an empty link insert after the client
  // row already exists.
  if (planned.sourceIds.length === 0) {
    throw new Error(
      `Planned client tempId=${planned.tempId} has no source ids — cannot record idempotency links`,
    );
  }

  const written = await db.transaction(async (tx) => {
    // surnameToken is on the planned object (used by the dedup blocking
    // index inside the transform) but not in the clients schema — runtime
    // dedup re-derives it from fullName when needed. Drop it on insert.
    const [inserted] = await tx
      .insert(clients)
      .values({
        portId: opts.port.id,
        fullName: planned.fullName,
        nationalityIso: planned.countryIso ?? null,
        preferredContactMethod: planned.preferredContactMethod ?? null,
        source: planned.source ?? null,
      })
      .returning({ id: clients.id });

    if (!inserted) throw new Error('Client insert returned no row');
    const newClientId = inserted.id;

    // Record idempotency links — one per source row in the cluster.
    await tx.insert(migrationSourceLinks).values(
      planned.sourceIds.map((sid) => ({
        sourceSystem: SOURCE_SYSTEM,
        sourceId: String(sid),
        targetEntityType: 'client' as const,
        targetEntityId: newClientId,
        appliedId: opts.applyId,
        ...(opts.appliedBy ? { appliedBy: opts.appliedBy } : {}),
      })),
    );

    // Contacts: bulk insert (skipped when the plan has none).
    const contactRows = buildContactRows(planned.contacts, newClientId);
    if (contactRows.length > 0) {
      await tx.insert(clientContacts).values(contactRows);
    }

    // Addresses: bulk insert; the first one is marked primary. The schema
    // requires portId on every address row in addition to clientId.
    const addressRows = planned.addresses.map((a, idx) => ({
      clientId: newClientId,
      portId: opts.port.id,
      streetAddress: a.streetAddress ?? null,
      city: a.city ?? null,
      countryIso: a.countryIso ?? null,
      isPrimary: idx === 0,
    }));
    if (addressRows.length > 0) {
      await tx.insert(clientAddresses).values(addressRows);
    }

    return {
      clientId: newClientId,
      contactCount: contactRows.length,
      addressCount: addressRows.length,
    };
  });

  // Only count rows once the transaction has committed, so a rolled-back
  // client never inflates the tally.
  result.contactsInserted += written.contactCount;
  result.addressesInserted += written.addressCount;
  result.clientsInserted += 1;
  return { clientId: written.clientId, inserted: true };
}
|
||
|
|
|
||
|
|
/**
 * Apply a single PlannedInterest — looks up its client + berth + yacht and
 * inserts the interest row, plus a yacht stub if a yacht name is present.
 *
 * Behaviour:
 * - Skipped (and counted in `interestsSkipped`) when the source row is
 *   already linked to an interest.
 * - Skipped with a warning when the planned client tempId is unknown.
 * - An unknown mooring number only produces a warning; the interest is still
 *   created, without a berth link.
 * - In rehearsal mode nothing is written; the would-be interest (and yacht
 *   stub, if one would be created) is tallied.
 * - Otherwise the yacht stub, its ledger link, the interest and its ledger
 *   link are written inside ONE transaction. Without that, a failure between
 *   an entity insert and its link insert would leave an unlinked yacht or
 *   interest that a re-run duplicates.
 *
 * NOTE(review): relies on `db.transaction` being supported by the configured
 * Drizzle driver — confirm for this project's driver.
 *
 * @param planned - Interest produced by the transform phase.
 * @param tempIdToClientId - Plan tempId → client id map built while applying clients.
 * @param mooringToBerthId - Mooring number → berth id lookup for the target port.
 * @param opts - Apply options (target port, run id, rehearsal flag, operator).
 * @param result - Running tally; mutated in place. Write counters are only
 *   bumped after the transaction has committed.
 * @throws Error if the interest insert returns no row.
 */
async function applyInterest(
  planned: PlannedInterest,
  tempIdToClientId: Map<string, string>,
  mooringToBerthId: Map<string, string>,
  opts: ApplyOptions,
  result: ApplyResult,
): Promise<void> {
  // Idempotency: skip if this source row already created an interest.
  const existing = await resolveExistingLink(planned.sourceId, 'interest');
  if (existing) {
    result.interestsSkipped += 1;
    return;
  }

  const clientId = tempIdToClientId.get(planned.clientTempId);
  if (!clientId) {
    result.warnings.push(
      `Interest source=${planned.sourceId} references unknown client tempId=${planned.clientTempId} — skipped`,
    );
    return;
  }

  let berthId: string | null = null;
  if (planned.berthMooringNumber) {
    berthId = mooringToBerthId.get(planned.berthMooringNumber) ?? null;
    if (!berthId) {
      result.warnings.push(
        `Interest source=${planned.sourceId} references unknown mooring="${planned.berthMooringNumber}" — interest created without berth link`,
      );
    }
  }

  // Optional yacht stub: if the legacy row had a yacht name, a minimal yacht
  // record owned by the client is created — unless this source row already
  // produced one, in which case that yacht is reused.
  const yachtName = planned.yachtName;
  const existingYachtId = yachtName
    ? await resolveExistingLink(planned.sourceId, 'yacht')
    : null;

  if (opts.rehearsal) {
    // Preview path: tally what would be written, write nothing.
    if (yachtName && !existingYachtId) result.yachtsInserted += 1;
    result.interestsInserted += 1;
    return;
  }

  const yachtCreated = await db.transaction(async (tx) => {
    let yachtId: string | null = existingYachtId;
    let createdYacht = false;

    if (yachtName && !yachtId) {
      // The new schema requires currentOwnerType + currentOwnerId.
      const [y] = await tx
        .insert(yachts)
        .values({
          portId: opts.port.id,
          name: yachtName,
          currentOwnerType: 'client',
          currentOwnerId: clientId,
          status: 'active',
        })
        .returning({ id: yachts.id });
      if (y) {
        yachtId = y.id;
        await tx.insert(migrationSourceLinks).values({
          sourceSystem: SOURCE_SYSTEM,
          sourceId: String(planned.sourceId),
          targetEntityType: 'yacht' as const,
          targetEntityId: y.id,
          appliedId: opts.applyId,
          ...(opts.appliedBy ? { appliedBy: opts.appliedBy } : {}),
        });
        createdYacht = true;
      }
    }

    const [iRow] = await tx
      .insert(interests)
      .values({
        portId: opts.port.id,
        clientId,
        berthId,
        yachtId,
        pipelineStage: planned.pipelineStage,
        leadCategory: planned.leadCategory,
        source: planned.source,
        notes: planned.notes,
        documensoId: planned.documensoId,
        dateEoiSent: planned.dateEoiSent ? new Date(planned.dateEoiSent) : null,
        dateEoiSigned: planned.dateEoiSigned ? new Date(planned.dateEoiSigned) : null,
        dateContractSent: planned.dateContractSent ? new Date(planned.dateContractSent) : null,
        dateContractSigned: planned.dateContractSigned ? new Date(planned.dateContractSigned) : null,
        dateDepositReceived: planned.dateDepositReceived
          ? new Date(planned.dateDepositReceived)
          : null,
        dateLastContact: planned.dateLastContact ? new Date(planned.dateLastContact) : null,
      })
      .returning({ id: interests.id });

    if (!iRow) throw new Error('Interest insert returned no row');

    await tx.insert(migrationSourceLinks).values({
      sourceSystem: SOURCE_SYSTEM,
      sourceId: String(planned.sourceId),
      targetEntityType: 'interest' as const,
      targetEntityId: iRow.id,
      appliedId: opts.applyId,
      ...(opts.appliedBy ? { appliedBy: opts.appliedBy } : {}),
    });

    return createdYacht;
  });

  // Only count rows once the transaction has committed.
  if (yachtCreated) result.yachtsInserted += 1;
  result.interestsInserted += 1;
}
|
||
|
|
|
||
|
|
/**
 * Entry point of the apply phase.
 *
 * Runs three sequential steps: apply every planned client while recording
 * which client id each plan tempId resolved to, load the target port's
 * berths into a mooring-number lookup, then apply every planned interest
 * using both lookups.
 *
 * @param plan - Output of the transform phase.
 * @param opts - Target port, run id and mode flags.
 * @returns Counters and warnings accumulated over the whole run.
 */
export async function applyPlan(plan: MigrationPlan, opts: ApplyOptions): Promise<ApplyResult> {
  const result: ApplyResult = {
    applyId: opts.applyId,
    clientsInserted: 0,
    clientsSkipped: 0,
    contactsInserted: 0,
    addressesInserted: 0,
    yachtsInserted: 0,
    interestsInserted: 0,
    interestsSkipped: 0,
    warnings: [],
  };

  // Step 1 — clients, together with their contacts and addresses. Applied
  // one at a time; each resolved id is remembered under the plan's tempId.
  const tempIdToClientId = new Map<string, string>();
  for (const plannedClient of plan.clients) {
    const applied = await applyClient(plannedClient, opts, result);
    tempIdToClientId.set(plannedClient.tempId, applied.clientId);
  }

  // Step 2 — one query for all berths of this port, indexed by mooring
  // number. A repeated mooring number keeps the last row seen.
  const portBerths = await db
    .select({ id: berths.id, mooringNumber: berths.mooringNumber })
    .from(berths)
    .where(eq(berths.portId, opts.port.id));
  const mooringToBerthId = new Map<string, string>();
  for (const berth of portBerths) {
    mooringToBerthId.set(berth.mooringNumber, berth.id);
  }

  // Step 3 — interests and their optional yacht stubs, again one at a time.
  for (const plannedInterest of plan.interests) {
    await applyInterest(plannedInterest, tempIdToClientId, mooringToBerthId, opts, result);
  }

  return result;
}
|