Files
pn-new-crm/scripts/import-berths-from-nocodb.ts
Matt Ciaccio 18119644ae feat(berths): nocodb berth import script + helpers + unit tests
Idempotent NocoDB Berths -> CRM `berths` import script with full
re-run safety. Re-running picks up NocoDB additions/edits without
clobbering CRM-side overrides (compares updated_at vs last_imported_at,
1-second tolerance for sub-second clock drift). --force overrides the
edit guard.

Mitigates the §14.1 critical/high cases:
- Mooring collisions: unique (port_id, mooring_number) on the table.
- Concurrent runs: pg_advisory_xact_lock on a stable BIGINT key.
- Numeric-with-units inputs: parseDecimalWithUnit() strips trailing
  ft/m/kw/v/usd/$ markers before parsing.
- Metric drift: NocoDB's metric formula columns are ignored; metric
  values recomputed from imperial via 0.3048 + round-to-2-decimals to
  match NocoDB's `precision: 2` columns and avoid spurious diffs.
- Map Data shape: zod-validated; failures are skipped rather than
  aborting the import.
- Status enum mapping: NocoDB display strings -> CRM snake_case.
- NocoDB row deleted: reported as "orphaned in CRM"; never auto-
  deleted (rep decides via admin UI in a future phase).

Pure helpers (parseDecimalWithUnit, mapStatus, parseMapData,
extractNumerics, mapRow, buildPlan) live in
src/lib/services/berth-import.ts so vitest can exercise the mapping
logic without triggering the script's top-level db connection.

40 new unit tests (956 -> 996 passing).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-05 02:07:58 +02:00

406 lines
15 KiB
TypeScript

/**
* Idempotent NocoDB Berths → CRM `berths` import.
*
* Re-running picks up NocoDB additions/edits without clobbering CRM-side
* overrides: rows where `updated_at > last_imported_at` are treated as
* human-edited and skipped (use `--force` to override). Map Data JSON
* is validated and upserted into `berth_map_data` as a separate step.
*
* Usage:
* pnpm tsx scripts/import-berths-from-nocodb.ts --dry-run [--port-slug port-nimara]
* pnpm tsx scripts/import-berths-from-nocodb.ts --apply [--port-slug port-nimara]
* pnpm tsx scripts/import-berths-from-nocodb.ts --apply --force
* pnpm tsx scripts/import-berths-from-nocodb.ts --apply --update-snapshot
*
* Edge cases mitigated (see plan §14.1):
* - Mooring collisions : unique (port_id, mooring_number) on the table.
* - Concurrent runs : pg_advisory_xact_lock on a stable key.
* - Numeric-with-units : parseDecimalWithUnit() strips trailing units.
* - Metric drift : NocoDB metric formula columns are ignored;
* metric values are recomputed from imperial.
* - Map Data shape : zod-validated; invalid rows are skipped (with a count
* reported) rather than aborting the whole import.
* - Status enum : NocoDB display strings → CRM snake_case.
* - NocoDB row deleted : reported as "orphaned in CRM"; not auto-deleted.
*/
import 'dotenv/config';
import { eq, sql } from 'drizzle-orm';
import { promises as fs } from 'node:fs';
import path from 'node:path';
import { db } from '@/lib/db';
import { ports } from '@/lib/db/schema/ports';
import { berths, berthMapData } from '@/lib/db/schema/berths';
import { fetchAllRows, loadNocoDbConfig, NOCO_TABLES } from '@/lib/dedup/nocodb-source';
import {
buildPlan,
mapRow,
type Action,
type ImportedBerth,
type PlanEntry,
type ExistingBerthRow,
} from '@/lib/services/berth-import';
// ─── CLI ────────────────────────────────────────────────────────────────────
/** Parsed command-line flags; see printHelp() for the user-facing docs. */
interface CliArgs {
  dryRun: boolean; // --dry-run: diff NocoDB vs CRM, no writes
  apply: boolean; // --apply: write the plan to the DB
  portSlug: string; // --port-slug: target port slug (default 'port-nimara')
  force: boolean; // --force: overwrite rows the edit guard would skip
  updateSnapshot: boolean; // --update-snapshot: rewrite seed-data JSON after apply
}
/**
 * Parse CLI flags into a CliArgs record.
 *
 * Exits the process (code 1) on an unknown flag, a missing `--port-slug`
 * value, when neither mode flag is given, or when both `--dry-run` and
 * `--apply` are given (previously `--dry-run` silently won in main()).
 *
 * @param argv process.argv.slice(2) — flags only, no node/script prefix.
 * @returns Fully-populated CliArgs (defaults applied).
 */
function parseArgs(argv: string[]): CliArgs {
  const args: CliArgs = {
    dryRun: false,
    apply: false,
    portSlug: 'port-nimara',
    force: false,
    updateSnapshot: false,
  };
  for (let i = 0; i < argv.length; i += 1) {
    const a = argv[i]!;
    if (a === '--dry-run') args.dryRun = true;
    else if (a === '--apply') args.apply = true;
    else if (a === '--port-slug') {
      const slug = argv[++i];
      // Fix: a missing value used to silently fall back to the default port,
      // which could import into the wrong port. Treat it as a usage error.
      if (!slug || slug.startsWith('--')) {
        console.error('--port-slug requires a value.');
        printHelp();
        process.exit(1);
      }
      args.portSlug = slug;
    } else if (a === '--force') args.force = true;
    else if (a === '--update-snapshot') args.updateSnapshot = true;
    else if (a === '-h' || a === '--help') {
      printHelp();
      process.exit(0);
    } else {
      console.error(`Unknown argument: ${a}`);
      printHelp();
      process.exit(1);
    }
  }
  if (!args.dryRun && !args.apply) {
    console.error('Must specify either --dry-run or --apply.');
    printHelp();
    process.exit(1);
  }
  // Fix: the two modes are mutually exclusive; previously passing both
  // silently behaved as --dry-run.
  if (args.dryRun && args.apply) {
    console.error('--dry-run and --apply are mutually exclusive.');
    printHelp();
    process.exit(1);
  }
  return args;
}
/** Print CLI usage to stdout (trailing blank line matches the original output). */
function printHelp(): void {
  const usage = [
    'Usage:',
    'pnpm tsx scripts/import-berths-from-nocodb.ts --dry-run [--port-slug <slug>]',
    'pnpm tsx scripts/import-berths-from-nocodb.ts --apply [--port-slug <slug>] [--force] [--update-snapshot]',
    'Flags:',
    '--dry-run Read NocoDB + diff vs CRM. No writes.',
    '--apply Apply the plan to the DB.',
    '--port-slug <slug> Target port slug (default: port-nimara).',
    '--force Overwrite rows where CRM updated_at > last_imported_at.',
    '--update-snapshot Rewrite src/lib/db/seed-data/berths.json after apply.',
    '-h, --help Show this help.',
    '', // preserves the trailing newline the original template literal had
  ];
  console.log(usage.join('\n'));
}
// ─── Stable advisory lock key ───────────────────────────────────────────────
// 64-bit BIGINT - first 4 bytes spell "BRTH" (0x42 0x52 0x54 0x48) so it's
// grep-able in pg_locks. Declared as a BigInt literal because the value
// exceeds Number.MAX_SAFE_INTEGER and would lose precision as a number.
const BERTH_IMPORT_LOCK_KEY = 0x4252544800000001n;
// ─── Apply ──────────────────────────────────────────────────────────────────
/** Outcome counters returned by apply() and printed by main(). */
interface ApplyResult {
  inserted: number; // rows newly inserted into `berths`
  updated: number; // existing `berths` rows overwritten
  skipped: number; // skip-edited + noop plan entries left untouched
  mapDataWritten: number; // `berth_map_data` upserts performed
  warnings: string[]; // orphan / skip notices for the operator
}
/**
 * Execute the import plan against the CRM inside a single transaction.
 *
 * Takes a pg advisory transaction lock first so two concurrent --apply runs
 * serialize rather than race on the same moorings. Orphans are surfaced as
 * warnings only — this function never deletes CRM rows.
 *
 * @param portId     CRM port the berths belong to.
 * @param plan       Per-mooring actions produced by buildPlan().
 * @param orphans    CRM rows with no NocoDB counterpart (report-only).
 * @param importedAt Timestamp stamped into lastImportedAt / updatedAt.
 * @returns Counters plus human-readable warnings for the caller to print.
 */
async function apply(
  portId: string,
  plan: PlanEntry[],
  orphans: ExistingBerthRow[],
  importedAt: Date,
): Promise<ApplyResult> {
  const summary: ApplyResult = {
    inserted: 0,
    updated: 0,
    skipped: 0,
    mapDataWritten: 0,
    warnings: [],
  };
  for (const orphan of orphans) {
    summary.warnings.push(
      `Orphan: CRM has mooring="${orphan.mooringNumber}" but NocoDB no longer does (id=${orphan.id})`,
    );
  }
  // Decimal columns are written as strings; null/undefined pass through as NULL.
  const dec = (value: number | null | undefined): string | null =>
    value != null ? String(value) : null;
  await db.transaction(async (tx) => {
    // Stable lock so two simultaneous --apply runs serialize.
    await tx.execute(sql`SELECT pg_advisory_xact_lock(${BERTH_IMPORT_LOCK_KEY})`);
    for (const entry of plan) {
      // Skipped entries (edit-guarded or unchanged) are counted + noted only.
      if (entry.action === 'skip-edited' || entry.action === 'noop') {
        summary.skipped += 1;
        summary.warnings.push(
          `Skipped ${entry.imported.mooringNumber}: ${entry.reason ?? 'no-op'}`,
        );
        continue;
      }
      const row = entry.imported;
      const nums = row.numerics;
      const baseValues = {
        portId,
        mooringNumber: row.mooringNumber,
        area: row.area,
        status: row.status,
        lengthFt: dec(nums.lengthFt),
        widthFt: dec(nums.widthFt),
        draftFt: dec(nums.draftFt),
        lengthM: dec(nums.lengthM),
        widthM: dec(nums.widthM),
        draftM: dec(nums.draftM),
        widthIsMinimum: row.widthIsMinimum,
        nominalBoatSize: dec(nums.nominalBoatSize),
        nominalBoatSizeM: dec(nums.nominalBoatSizeM),
        waterDepth: dec(nums.waterDepth),
        waterDepthM: dec(nums.waterDepthM),
        waterDepthIsMinimum: row.waterDepthIsMinimum,
        sidePontoon: row.sidePontoon,
        powerCapacity: dec(nums.powerCapacity),
        voltage: dec(nums.voltage),
        mooringType: row.mooringType,
        cleatType: row.cleatType,
        cleatCapacity: row.cleatCapacity,
        bollardType: row.bollardType,
        bollardCapacity: row.bollardCapacity,
        access: row.access,
        price: dec(nums.price),
        priceCurrency: 'USD' as const,
        bowFacing: row.bowFacing,
        berthApproved: row.berthApproved,
        statusOverrideMode: row.statusOverrideMode,
        lastImportedAt: importedAt,
        updatedAt: importedAt,
      };
      let berthId: string;
      if (entry.action === 'insert') {
        // New moorings always start with 'permanent' tenure.
        const [created] = await tx
          .insert(berths)
          .values({ ...baseValues, tenureType: 'permanent' })
          .returning({ id: berths.id });
        berthId = created!.id;
        summary.inserted += 1;
      } else {
        await tx.update(berths).set(baseValues).where(eq(berths.id, entry.existing!.id));
        berthId = entry.existing!.id;
        summary.updated += 1;
      }
      // Map Data is written as a second step, keyed one-per-berth; re-imports
      // overwrite the existing row in place.
      if (row.mapData) {
        const mapValues = {
          berthId,
          svgPath: row.mapData.path ?? null,
          x: dec(row.mapData.x),
          y: dec(row.mapData.y),
          transform: row.mapData.transform ?? null,
          fontSize: dec(row.mapData.fontSize),
          updatedAt: importedAt,
        };
        // The conflict SET is everything except the conflict key itself.
        const { berthId: _conflictKey, ...conflictSet } = mapValues;
        await tx
          .insert(berthMapData)
          .values(mapValues)
          .onConflictDoUpdate({ target: berthMapData.berthId, set: conflictSet });
        summary.mapDataWritten += 1;
      }
    }
  });
  return summary;
}
// ─── Snapshot writer (for seed-data refresh) ────────────────────────────────
/**
 * Rewrite src/lib/db/seed-data/berths.json from the imported rows.
 *
 * Ordering contract: idx 0..4 available (smallest first), 5..9 under_offer
 * (smallest first), 10..11 sold (largest first), then everything else by
 * mooring number (natural/numeric order). The first 12 indexes feed
 * `seed-data.ts` interest/reservation stubs.
 *
 * @param imported De-duplicated berth rows from NocoDB.
 * @returns Absolute path of the file written.
 */
async function writeSnapshot(imported: ImportedBerth[]): Promise<string> {
  const byLengthAsc = (a: ImportedBerth, b: ImportedBerth): number =>
    (a.numerics.lengthFt ?? 0) - (b.numerics.lengthFt ?? 0);
  // Select up to `count` rows of one status, ordered by boat length.
  // Descending is done via a flipped comparator (not reverse()) so stable-sort
  // tie order matches the original implementation.
  const pick = (status: ImportedBerth['status'], count: number, largestFirst = false) =>
    imported
      .filter((b) => b.status === status)
      .sort((a, b) => (largestFirst ? byLengthAsc(b, a) : byLengthAsc(a, b)))
      .slice(0, count);
  const featured = [
    ...pick('available', 5),
    ...pick('under_offer', 5),
    ...pick('sold', 2, true),
  ];
  const featuredMoorings = new Set(featured.map((b) => b.mooringNumber));
  const tail = imported
    .filter((b) => !featuredMoorings.has(b.mooringNumber))
    .sort((a, b) => a.mooringNumber.localeCompare(b.mooringNumber, 'en', { numeric: true }));
  // Key order below is deliberate — it determines the JSON field order.
  const payload = [...featured, ...tail].map((b) => ({
    legacyId: b.legacyId,
    mooringNumber: b.mooringNumber,
    area: b.area,
    status: b.status,
    lengthFt: b.numerics.lengthFt,
    widthFt: b.numerics.widthFt,
    draftFt: b.numerics.draftFt,
    lengthM: b.numerics.lengthM,
    widthM: b.numerics.widthM,
    draftM: b.numerics.draftM,
    widthIsMinimum: b.widthIsMinimum,
    nominalBoatSize: b.numerics.nominalBoatSize,
    nominalBoatSizeM: b.numerics.nominalBoatSizeM,
    waterDepth: b.numerics.waterDepth,
    waterDepthM: b.numerics.waterDepthM,
    waterDepthIsMinimum: b.waterDepthIsMinimum,
    sidePontoon: b.sidePontoon,
    powerCapacity: b.numerics.powerCapacity,
    voltage: b.numerics.voltage,
    mooringType: b.mooringType,
    cleatType: b.cleatType,
    cleatCapacity: b.cleatCapacity,
    bollardType: b.bollardType,
    bollardCapacity: b.bollardCapacity,
    access: b.access,
    price: b.numerics.price,
    bowFacing: b.bowFacing,
    berthApproved: b.berthApproved,
    statusOverrideMode: b.statusOverrideMode,
  }));
  const target = path.resolve(process.cwd(), 'src/lib/db/seed-data/berths.json');
  await fs.writeFile(target, JSON.stringify(payload, null, 2) + '\n', 'utf8');
  return target;
}
// ─── Main ───────────────────────────────────────────────────────────────────
/**
 * Entry point: fetch NocoDB berths, map + de-dup them, diff against the CRM
 * `berths` rows for the target port, print the plan, then (with --apply)
 * execute it. --update-snapshot additionally rewrites the seed-data JSON.
 */
async function main(): Promise<void> {
  const args = parseArgs(process.argv.slice(2));
  const config = loadNocoDbConfig();
  // The import is scoped to exactly one port, resolved by slug.
  const [port] = await db
    .select({ id: ports.id, slug: ports.slug })
    .from(ports)
    .where(eq(ports.slug, args.portSlug))
    .limit(1);
  if (!port) {
    console.error(`No port found with slug "${args.portSlug}".`);
    process.exit(1);
  }
  console.log(`> Fetching NocoDB Berths…`);
  const rows = await fetchAllRows(NOCO_TABLES.berths, config);
  console.log(` fetched ${rows.length} rows from NocoDB`);
  // mapRow() returns null for unusable rows (per the warning below, rows
  // missing a Mooring Number); those are counted, not fatal.
  const imported: ImportedBerth[] = [];
  let skippedMalformed = 0;
  for (const r of rows) {
    const m = mapRow(r);
    if (m) imported.push(m);
    else skippedMalformed += 1;
  }
  if (skippedMalformed > 0) {
    console.warn(` ${skippedMalformed} rows skipped (missing Mooring Number)`);
  }
  // De-dup against any same-mooring twins surfacing from NocoDB
  // (defensive — the Berths table is keyed on Mooring Number in NocoDB).
  const seen = new Set<string>();
  const dedup: ImportedBerth[] = [];
  for (const b of imported) {
    if (seen.has(b.mooringNumber)) {
      console.warn(` duplicate mooring "${b.mooringNumber}" in NocoDB — keeping first`);
      continue;
    }
    seen.add(b.mooringNumber);
    dedup.push(b);
  }
  console.log(`> Reading current CRM berths for port "${port.slug}"…`);
  // Only the columns buildPlan() needs: identity + the two timestamps that
  // drive the edit guard (updated_at vs last_imported_at).
  const existingRows = await db
    .select({
      id: berths.id,
      mooringNumber: berths.mooringNumber,
      updatedAt: berths.updatedAt,
      lastImportedAt: berths.lastImportedAt,
    })
    .from(berths)
    .where(eq(berths.portId, port.id));
  console.log(` ${existingRows.length} existing rows`);
  const existingByMooring = new Map(existingRows.map((r) => [r.mooringNumber, r]));
  // buildPlan() decides insert/update/skip-edited/noop per mooring and reports
  // CRM rows with no NocoDB counterpart as orphans.
  const { plan, orphans } = buildPlan(dedup, existingByMooring, args.force);
  // Tally actions for the plan summary printed below.
  const counts = plan.reduce(
    (acc, e) => {
      acc[e.action] += 1;
      return acc;
    },
    { insert: 0, update: 0, 'skip-edited': 0, noop: 0 } as Record<Action, number>,
  );
  console.log(`> Plan:`);
  console.log(` insert : ${counts.insert}`);
  console.log(` update : ${counts.update}`);
  console.log(` skip-edited : ${counts['skip-edited']}`);
  console.log(` no-op : ${counts.noop}`);
  console.log(` orphans (CRM): ${orphans.length}`);
  // Detail listings are capped at 10 entries each to keep the output readable.
  if (counts['skip-edited'] > 0) {
    console.log(` ↳ Skipped (CRM-edited; pass --force to overwrite):`);
    for (const e of plan.filter((p) => p.action === 'skip-edited').slice(0, 10)) {
      console.log(` - ${e.imported.mooringNumber} ${e.reason}`);
    }
    if (counts['skip-edited'] > 10) console.log(` …and ${counts['skip-edited'] - 10} more`);
  }
  if (orphans.length > 0) {
    console.log(` ↳ Orphans (in CRM but missing from NocoDB):`);
    for (const o of orphans.slice(0, 10)) console.log(` - ${o.mooringNumber}`);
    if (orphans.length > 10) console.log(` …and ${orphans.length - 10} more`);
  }
  if (args.dryRun) {
    console.log(`\n[dry-run] no writes performed.`);
    return;
  }
  console.log(`> Applying…`);
  const result = await apply(port.id, plan, orphans, new Date());
  console.log(` inserted : ${result.inserted}`);
  console.log(` updated : ${result.updated}`);
  console.log(` skipped : ${result.skipped}`);
  console.log(` map data writes : ${result.mapDataWritten}`);
  if (result.warnings.length) {
    console.log(` warnings :`);
    for (const w of result.warnings.slice(0, 20)) console.log(` - ${w}`);
    if (result.warnings.length > 20) console.log(` …and ${result.warnings.length - 20} more`);
  }
  // Snapshot is written from the de-duped import set (not the CRM state).
  if (args.updateSnapshot) {
    const written = await writeSnapshot(dedup);
    console.log(`> Wrote ${dedup.length} rows to ${path.relative(process.cwd(), written)}`);
  }
}
// Run the import and map the outcome to a process exit code: 0 on success,
// 1 (with the error logged) on any failure.
void (async () => {
  try {
    await main();
    process.exit(0);
  } catch (err: unknown) {
    console.error(err);
    process.exit(1);
  }
})();