feat(migration): add expenses + interest EOI status to NocoDB→CRM pipeline

A single idempotent --apply now seeds the full legacy dataset:
- Expenses: fetch the separate "Expenses" NocoDB base (mxfcefkk4dqs6uq),
  transform (price→amount+currency, payment status, receipt marker), apply to
  the expenses table under a new nocodb_expenses ledger tag.
- Interest EOI display state: set interests.eoiStatus/eoiDocStatus from the
  legacy EOI Status / LOI process so deals show signed / awaiting-signature
  (in-flight) state, not only a separate documents row.
- Runner reports expenses + tags createdBy with the seeded super-admin id.

Validated via --apply on the dev DB: 239 clients (multi-deal grouping intact),
255 interests (qualified 171/eoi 51/nurturing 30/reservation 2/contract 1),
48 signed + 3 in-flight EOIs, 165 expenses (5 currencies), 41 docs + 119
signers, 45 residential. tsc clean; 67 dedup unit tests pass.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-01 19:18:28 +02:00
parent 7dba1a47bb
commit 6c040a617b
4 changed files with 202 additions and 3 deletions

View File

@@ -30,6 +30,7 @@ import { eq } from 'drizzle-orm';
import { db } from '@/lib/db'; import { db } from '@/lib/db';
import { ports } from '@/lib/db/schema/ports'; import { ports } from '@/lib/db/schema/ports';
import { SUPER_ADMIN_USER_ID } from '@/lib/db/seed-bootstrap';
import { applyPlan } from '@/lib/dedup/migration-apply'; import { applyPlan } from '@/lib/dedup/migration-apply';
import { fetchSnapshot, loadNocoDbConfig } from '@/lib/dedup/nocodb-source'; import { fetchSnapshot, loadNocoDbConfig } from '@/lib/dedup/nocodb-source';
import { transformSnapshot } from '@/lib/dedup/migration-transform'; import { transformSnapshot } from '@/lib/dedup/migration-transform';
@@ -154,7 +155,7 @@ async function main(): Promise<void> {
const snapshot = await fetchSnapshot(config); const snapshot = await fetchSnapshot(config);
const elapsed = ((Date.now() - start) / 1000).toFixed(1); const elapsed = ((Date.now() - start) / 1000).toFixed(1);
console.log( console.log(
`[migrate] Snapshot fetched in ${elapsed}s — ${snapshot.interests.length} interests, ${snapshot.residentialInterests.length} residential, ${snapshot.berths.length} berths.`, `[migrate] Snapshot fetched in ${elapsed}s — ${snapshot.interests.length} interests, ${snapshot.residentialInterests.length} residential, ${snapshot.berths.length} berths, ${snapshot.expenses?.length ?? 0} expenses.`,
); );
console.log('[migrate] Running transform + dedup pipeline…'); console.log('[migrate] Running transform + dedup pipeline…');
@@ -184,6 +185,7 @@ async function main(): Promise<void> {
console.log( console.log(
` ${s.outputResidentialClients} residential clients (with default-stage interests)`, ` ${s.outputResidentialClients} residential clients (with default-stage interests)`,
); );
console.log(` ${s.outputExpenses} expenses`);
console.log( console.log(
` Dedup: ${s.autoLinkedClusters} auto-linked clusters, ${s.needsReviewPairs} pairs flagged for review`, ` Dedup: ${s.autoLinkedClusters} auto-linked clusters, ${s.needsReviewPairs} pairs flagged for review`,
); );
@@ -208,7 +210,7 @@ async function main(): Promise<void> {
console.log('[migrate] Inserting…'); console.log('[migrate] Inserting…');
const applyStart = Date.now(); const applyStart = Date.now();
const result = await applyPlan(plan, { port, applyId }); const result = await applyPlan(plan, { port, applyId, appliedBy: SUPER_ADMIN_USER_ID });
const applyElapsed = ((Date.now() - applyStart) / 1000).toFixed(1); const applyElapsed = ((Date.now() - applyStart) / 1000).toFixed(1);
console.log(''); console.log('');
@@ -231,6 +233,9 @@ async function main(): Promise<void> {
` Res-Clt: ${result.residentialClientsInserted} inserted, ${result.residentialClientsSkipped} already linked`, ` Res-Clt: ${result.residentialClientsInserted} inserted, ${result.residentialClientsSkipped} already linked`,
); );
console.log(` Res-Int: ${result.residentialInterestsInserted} inserted`); console.log(` Res-Int: ${result.residentialInterestsInserted} inserted`);
console.log(
` Expenses: ${result.expensesInserted} inserted, ${result.expensesSkipped} already linked`,
);
if (result.warnings.length > 0) { if (result.warnings.length > 0) {
console.log(''); console.log('');

View File

@@ -25,11 +25,13 @@ import { yachts } from '@/lib/db/schema/yachts';
import { berths } from '@/lib/db/schema/berths'; import { berths } from '@/lib/db/schema/berths';
import { documents, documentSigners } from '@/lib/db/schema/documents'; import { documents, documentSigners } from '@/lib/db/schema/documents';
import { residentialClients, residentialInterests } from '@/lib/db/schema/residential'; import { residentialClients, residentialInterests } from '@/lib/db/schema/residential';
import { expenses } from '@/lib/db/schema/financial';
import { migrationSourceLinks } from '@/lib/db/schema/migration'; import { migrationSourceLinks } from '@/lib/db/schema/migration';
import type { import type {
MigrationPlan, MigrationPlan,
PlannedClient, PlannedClient,
PlannedDocument, PlannedDocument,
PlannedExpense,
PlannedInterest, PlannedInterest,
PlannedResidentialClient, PlannedResidentialClient,
} from './migration-transform'; } from './migration-transform';
@@ -66,6 +68,8 @@ export interface ApplyResult {
residentialClientsInserted: number; residentialClientsInserted: number;
residentialClientsSkipped: number; residentialClientsSkipped: number;
residentialInterestsInserted: number; residentialInterestsInserted: number;
expensesInserted: number;
expensesSkipped: number;
warnings: string[]; warnings: string[];
} }
@@ -91,7 +95,8 @@ async function resolveExistingLink(
| 'address' | 'address'
| 'document' | 'document'
| 'residential_client' | 'residential_client'
| 'residential_interest', | 'residential_interest'
| 'expense',
): Promise<string | null> { ): Promise<string | null> {
const rows = await db const rows = await db
.select({ id: migrationSourceLinks.targetEntityId }) .select({ id: migrationSourceLinks.targetEntityId })
@@ -311,6 +316,8 @@ async function applyInterest(
leadCategory: planned.leadCategory, leadCategory: planned.leadCategory,
source: planned.source, source: planned.source,
documensoId: planned.documensoId, documensoId: planned.documensoId,
eoiStatus: planned.eoiStatus,
eoiDocStatus: planned.eoiDocStatus,
dateEoiSent: planned.dateEoiSent ? new Date(planned.dateEoiSent) : null, dateEoiSent: planned.dateEoiSent ? new Date(planned.dateEoiSent) : null,
dateEoiSigned: planned.dateEoiSigned ? new Date(planned.dateEoiSigned) : null, dateEoiSigned: planned.dateEoiSigned ? new Date(planned.dateEoiSigned) : null,
dateContractSent: planned.dateContractSent ? new Date(planned.dateContractSent) : null, dateContractSent: planned.dateContractSent ? new Date(planned.dateContractSent) : null,
@@ -529,6 +536,74 @@ async function applyResidentialClient(
result.residentialInterestsInserted += 1; result.residentialInterestsInserted += 1;
} }
/** Expenses come from a *separate* legacy NocoDB base, so they get their own
* source-system tag in the idempotency ledger. */
const EXPENSE_SOURCE_SYSTEM = 'nocodb_expenses';
/**
* Apply a single PlannedExpense → one `expenses` row. Independent domain
* (no FK deps). Idempotent via the ledger under `nocodb_expenses`.
*/
async function applyExpense(
planned: PlannedExpense,
opts: ApplyOptions,
result: ApplyResult,
): Promise<void> {
const existing = await db
.select({ id: migrationSourceLinks.targetEntityId })
.from(migrationSourceLinks)
.where(
and(
eq(migrationSourceLinks.sourceSystem, EXPENSE_SOURCE_SYSTEM),
eq(migrationSourceLinks.sourceId, String(planned.sourceId)),
eq(migrationSourceLinks.targetEntityType, 'expense'),
),
)
.limit(1);
if (existing[0]) {
result.expensesSkipped += 1;
return;
}
// `amount` is NOT NULL — an unparseable legacy price can't be inserted.
if (planned.amount === null) {
result.warnings.push(`Expense source=${planned.sourceId} has no parseable amount - skipped`);
return;
}
if (opts.rehearsal) {
result.expensesInserted += 1;
return;
}
const [row] = await db
.insert(expenses)
.values({
portId: opts.port.id,
amount: planned.amount,
currency: planned.currency,
establishmentName: planned.establishmentName,
paymentMethod: planned.paymentMethod,
category: planned.category,
payer: planned.payer,
expenseDate: planned.expenseDate ? new Date(planned.expenseDate) : new Date(),
description: planned.description,
paymentStatus: planned.paymentStatus,
noReceiptAcknowledged: !planned.hadReceipt,
createdBy: opts.appliedBy ?? 'migration',
})
.returning({ id: expenses.id });
if (!row) throw new Error('Expense insert returned no row');
await db.insert(migrationSourceLinks).values({
sourceSystem: EXPENSE_SOURCE_SYSTEM,
sourceId: String(planned.sourceId),
targetEntityType: 'expense' as const,
targetEntityId: row.id,
appliedId: opts.applyId,
...(opts.appliedBy ? { appliedBy: opts.appliedBy } : {}),
});
result.expensesInserted += 1;
}
/** /**
* Top-level apply driver. Walks the plan once, building the * Top-level apply driver. Walks the plan once, building the
* tempId→clientId map as it goes, then walks interests with that map. * tempId→clientId map as it goes, then walks interests with that map.
@@ -549,6 +624,8 @@ export async function applyPlan(plan: MigrationPlan, opts: ApplyOptions): Promis
residentialClientsInserted: 0, residentialClientsInserted: 0,
residentialClientsSkipped: 0, residentialClientsSkipped: 0,
residentialInterestsInserted: 0, residentialInterestsInserted: 0,
expensesInserted: 0,
expensesSkipped: 0,
warnings: [], warnings: [],
}; };
@@ -584,5 +661,10 @@ export async function applyPlan(plan: MigrationPlan, opts: ApplyOptions): Promis
await applyResidentialClient(planned, opts, result); await applyResidentialClient(planned, opts, result);
} }
// 6. Expenses - separate legacy base, independent domain, no FK deps.
for (const planned of plan.expenses) {
await applyExpense(planned, opts, result);
}
return result; return result;
} }

View File

@@ -80,6 +80,12 @@ export interface PlannedInterest {
/** Documenso linkage carried forward when present so the document /** Documenso linkage carried forward when present so the document
* record can be stitched up downstream. */ * record can be stitched up downstream. */
documensoId: string | null; documensoId: string | null;
/** Interest-level EOI signing state so the deal *displays* its EOI
* status (signed / awaiting signatures). Derived from the legacy
* `EOI Status` / `LOI-NDA Document`; the runner may override these from
* the live Documenso envelope status for authoritative in-flight state. */
eoiStatus: 'signed' | 'waiting_for_signatures' | 'expired' | null;
eoiDocStatus: 'pending' | 'sent' | 'signed' | 'declined' | 'voided' | null;
} }
/** /**
@@ -139,6 +145,28 @@ export interface PlannedResidentialClient {
dateFirstContact: string | null; dateFirstContact: string | null;
} }
/**
* Expense from the separate legacy "Expenses" NocoDB base. Maps to the
* `expenses` table (financial.ts). Receipt blobs are backfilled in Phase 2.
*/
export interface PlannedExpense {
/** Legacy Expenses row id — migration_source_links key. */
sourceId: number;
amount: string | null;
currency: string;
establishmentName: string | null;
paymentMethod: string | null;
category: string | null;
payer: string | null;
/** ISO; apply falls back to now (the column is NOT NULL). */
expenseDate: string | null;
description: string | null;
paymentStatus: 'unpaid' | 'paid' | 'partial';
/** Whether the legacy row had a Receipt attachment (sets
* noReceiptAcknowledged when false; the blob itself comes in Phase 2). */
hadReceipt: boolean;
}
export interface MigrationFlag { export interface MigrationFlag {
sourceTable: 'interests' | 'residential_interests' | 'website_interest_submissions'; sourceTable: 'interests' | 'residential_interests' | 'website_interest_submissions';
sourceId: number; sourceId: number;
@@ -153,6 +181,8 @@ export interface MigrationPlan {
documents: PlannedDocument[]; documents: PlannedDocument[];
/** Residential leads - physically separate domain, simple 1:1 mapping. */ /** Residential leads - physically separate domain, simple 1:1 mapping. */
residentialClients: PlannedResidentialClient[]; residentialClients: PlannedResidentialClient[];
/** Expenses from the separate "Expenses" NocoDB base. */
expenses: PlannedExpense[];
flags: MigrationFlag[]; flags: MigrationFlag[];
/** Pairs that the migration would auto-link (high score). */ /** Pairs that the migration would auto-link (high score). */
autoLinks: Array<{ autoLinks: Array<{
@@ -177,6 +207,7 @@ export interface MigrationStats {
outputDocuments: number; outputDocuments: number;
outputDocumentSigners: number; outputDocumentSigners: number;
outputResidentialClients: number; outputResidentialClients: number;
outputExpenses: number;
flaggedRows: number; flaggedRows: number;
autoLinkedClusters: number; autoLinkedClusters: number;
needsReviewPairs: number; needsReviewPairs: number;
@@ -326,11 +357,14 @@ export function transformSnapshot(
.map((row) => buildPlannedResidentialClient(row, opts, flags)) .map((row) => buildPlannedResidentialClient(row, opts, flags))
.filter((r): r is PlannedResidentialClient => r !== null); .filter((r): r is PlannedResidentialClient => r !== null);
const expenses = (snapshot.expenses ?? []).map(buildPlannedExpense);
return { return {
clients, clients,
interests, interests,
documents, documents,
residentialClients, residentialClients,
expenses,
flags, flags,
autoLinks, autoLinks,
needsReview, needsReview,
@@ -344,6 +378,7 @@ export function transformSnapshot(
outputDocuments: documents.length, outputDocuments: documents.length,
outputDocumentSigners: documents.reduce((sum, d) => sum + d.signers.length, 0), outputDocumentSigners: documents.reduce((sum, d) => sum + d.signers.length, 0),
outputResidentialClients: residentialClients.length, outputResidentialClients: residentialClients.length,
outputExpenses: expenses.length,
flaggedRows: flags.length, flaggedRows: flags.length,
autoLinkedClusters: autoLinks.length, autoLinkedClusters: autoLinks.length,
needsReviewPairs: needsReview.length, needsReviewPairs: needsReview.length,
@@ -633,6 +668,25 @@ function buildPlannedInterest(row: NocoDbRow, clientTempId: string): PlannedInte
let mappedStage = STAGE_MAP[stage] ?? 'enquiry'; let mappedStage = STAGE_MAP[stage] ?? 'enquiry';
if (depositReceived && mappedStage !== 'contract') mappedStage = 'deposit_paid'; if (depositReceived && mappedStage !== 'contract') mappedStage = 'deposit_paid';
// Interest-level EOI signing state (for display on the deal). "Signed"
// via the EOI Status enum OR the older LOI-NDA process; "awaiting" when an
// EOI was sent (Documenso id present or EOI Status says waiting).
const eoiStatusRaw = (row['EOI Status'] as string | undefined)?.trim();
const loiRaw = (row['LOI-NDA Document'] as string | undefined)?.trim() ?? '';
const hasDocumensoId = !!(row['documensoID'] as string | undefined)?.trim();
let eoiStatus: PlannedInterest['eoiStatus'] = null;
let eoiDocStatus: PlannedInterest['eoiDocStatus'] = null;
if (
eoiStatusRaw === 'Signed' ||
['Signing Complete', 'Signed by Client', 'Signed by Developer'].includes(loiRaw)
) {
eoiStatus = 'signed';
eoiDocStatus = 'signed';
} else if (eoiStatusRaw === 'Waiting for Signatures' || hasDocumensoId) {
eoiStatus = 'waiting_for_signatures';
eoiDocStatus = 'sent';
}
const notesParts: string[] = []; const notesParts: string[] = [];
const internalNotes = row['Internal Notes'] as string | undefined; const internalNotes = row['Internal Notes'] as string | undefined;
const extraComments = row['Extra Comments'] as string | undefined; const extraComments = row['Extra Comments'] as string | undefined;
@@ -663,6 +717,8 @@ function buildPlannedInterest(row: NocoDbRow, clientTempId: string): PlannedInte
dateContractSigned: parseFlexibleDate(row['developerSignTime']), dateContractSigned: parseFlexibleDate(row['developerSignTime']),
dateLastContact: parseFlexibleDate(row['Created At'] ?? row['Date Added']), dateLastContact: parseFlexibleDate(row['Created At'] ?? row['Date Added']),
documensoId: (row['documensoID'] as string | undefined) ?? null, documensoId: (row['documensoID'] as string | undefined) ?? null,
eoiStatus,
eoiDocStatus,
}; };
} }
@@ -872,3 +928,50 @@ function buildPlannedResidentialClient(
), ),
}; };
} }
// ─── Expense builder ────────────────────────────────────────────────────────
/** Parse a legacy price string ("€1,234.50", "$1,200", "1234") into a numeric
* string + ISO currency. Prefers the row's own `currency` field, then the
* symbol, defaulting to USD. */
function parseExpenseMoney(
priceRaw: string,
currencyField: string,
): { amount: string | null; currency: string } {
const t = (priceRaw ?? '').trim();
const symbolCurrency = t.includes('€')
? 'EUR'
: t.includes('$')
? 'USD'
: t.includes('£')
? 'GBP'
: null;
const currency = (currencyField || '').trim().toUpperCase() || symbolCurrency || 'USD';
const num = parseFloat(t.replace(/[^0-9.]/g, ''));
return { amount: Number.isFinite(num) ? String(num) : null, currency };
}
function buildPlannedExpense(row: NocoDbRow): PlannedExpense {
const { amount, currency } = parseExpenseMoney(
(row['Price'] as string | undefined) ?? '',
(row['currency'] as string | undefined) ?? '',
);
const rawStatus = ((row['payment_status'] as string | undefined) ?? '').trim().toLowerCase();
const paymentStatus: PlannedExpense['paymentStatus'] =
rawStatus === 'paid' ? 'paid' : rawStatus === 'partial' ? 'partial' : 'unpaid';
const receipt = row['Receipt'];
const hadReceipt = Array.isArray(receipt) ? receipt.length > 0 : !!receipt;
return {
sourceId: row.Id,
amount,
currency,
establishmentName: ((row['Establishment Name'] as string | undefined) ?? '').trim() || null,
paymentMethod: ((row['Payment Method'] as string | undefined) ?? '').trim() || null,
category: ((row['Category'] as string | undefined) ?? '').trim() || null,
payer: ((row['Payer'] as string | undefined) ?? '').trim() || null,
expenseDate: parseFlexibleDate(row['Time'] ?? row['CreatedAt'] ?? row['Created At']),
description: ((row['Contents'] as string | undefined) ?? '').trim() || null,
paymentStatus,
hadReceipt,
};
}

View File

@@ -44,6 +44,9 @@ export const NOCO_TABLES = {
websiteContactFormSubmissions: 'mxk5cd0pmwnwlcl', websiteContactFormSubmissions: 'mxk5cd0pmwnwlcl',
websiteBerthEoiSupplements: 'mglmioo0ku8zgqj', websiteBerthEoiSupplements: 'mglmioo0ku8zgqj',
berths: 'mczgos9hr3oa9qc', berths: 'mczgos9hr3oa9qc',
// Lives in a *separate* NocoDB base ("Expenses", p3hq2fxdevqcaq8) but the
// v2 records API addresses tables by id, so no per-base config is needed.
expenses: 'mxfcefkk4dqs6uq',
} as const; } as const;
// ─── HTTP shape ───────────────────────────────────────────────────────────── // ─── HTTP shape ─────────────────────────────────────────────────────────────
@@ -120,6 +123,9 @@ export interface NocoDbSnapshot {
websiteContactFormSubmissions: NocoDbRow[]; websiteContactFormSubmissions: NocoDbRow[];
websiteBerthEoiSupplements: NocoDbRow[]; websiteBerthEoiSupplements: NocoDbRow[];
berths: NocoDbRow[]; berths: NocoDbRow[];
/** From the separate "Expenses" base. Optional so test fixtures and other
* snapshot sources needn't provide it; `fetchSnapshot` always does. */
expenses?: NocoDbRow[];
fetchedAt: string; fetchedAt: string;
} }
@@ -131,6 +137,7 @@ export async function fetchSnapshot(config: NocoDbConfig): Promise<NocoDbSnapsho
websiteContactFormSubmissions, websiteContactFormSubmissions,
websiteBerthEoiSupplements, websiteBerthEoiSupplements,
berths, berths,
expenses,
] = await Promise.all([ ] = await Promise.all([
fetchAllRows(NOCO_TABLES.interests, config), fetchAllRows(NOCO_TABLES.interests, config),
fetchAllRows(NOCO_TABLES.residentialInterests, config), fetchAllRows(NOCO_TABLES.residentialInterests, config),
@@ -138,6 +145,7 @@ export async function fetchSnapshot(config: NocoDbConfig): Promise<NocoDbSnapsho
fetchAllRows(NOCO_TABLES.websiteContactFormSubmissions, config), fetchAllRows(NOCO_TABLES.websiteContactFormSubmissions, config),
fetchAllRows(NOCO_TABLES.websiteBerthEoiSupplements, config), fetchAllRows(NOCO_TABLES.websiteBerthEoiSupplements, config),
fetchAllRows(NOCO_TABLES.berths, config), fetchAllRows(NOCO_TABLES.berths, config),
fetchAllRows(NOCO_TABLES.expenses, config),
]); ]);
return { return {
@@ -147,6 +155,7 @@ export async function fetchSnapshot(config: NocoDbConfig): Promise<NocoDbSnapsho
websiteContactFormSubmissions, websiteContactFormSubmissions,
websiteBerthEoiSupplements, websiteBerthEoiSupplements,
berths, berths,
expenses,
fetchedAt: new Date().toISOString(), fetchedAt: new Date().toISOString(),
}; };
} }