/**
 * Idempotent NocoDB Berths → CRM `berths` import.
 *
 * Re-running picks up NocoDB additions/edits without clobbering CRM-side
 * overrides: rows where `updated_at > last_imported_at` are treated as
 * human-edited and skipped (use `--force` to override). Map Data JSON
 * is validated and upserted into `berth_map_data` as a separate step.
 *
 * Usage:
 *   pnpm tsx scripts/import-berths-from-nocodb.ts --dry-run [--port-slug port-nimara]
 *   pnpm tsx scripts/import-berths-from-nocodb.ts --apply   [--port-slug port-nimara]
 *   pnpm tsx scripts/import-berths-from-nocodb.ts --apply --force
 *   pnpm tsx scripts/import-berths-from-nocodb.ts --apply --update-snapshot
 *
 * Edge cases mitigated (see plan §14.1):
 *   - Mooring collisions  : unique (port_id, mooring_number) on the table.
 *   - Concurrent runs     : pg_advisory_xact_lock on a stable key.
 *   - Numeric-with-units  : parseDecimalWithUnit() strips trailing units.
 *   - Metric drift        : NocoDB metric formula columns are ignored;
 *                           metric values are recomputed from imperial.
 *   - Map Data shape      : zod-validated; failures are skipped silently
 *                           rather than aborting the whole import.
 *   - Status enum         : NocoDB display strings → CRM snake_case.
 *   - NocoDB row deleted  : reported as "orphaned in CRM"; not auto-deleted.
 */
import 'dotenv/config';

import { eq, sql } from 'drizzle-orm';
import { promises as fs } from 'node:fs';
import path from 'node:path';

import { db } from '@/lib/db';
import { ports } from '@/lib/db/schema/ports';
import { berths, berthMapData } from '@/lib/db/schema/berths';
import { fetchAllRows, loadNocoDbConfig, NOCO_TABLES } from '@/lib/dedup/nocodb-source';
import {
  buildPlan,
  mapRow,
  type Action,
  type ImportedBerth,
  type PlanEntry,
  type ExistingBerthRow,
} from '@/lib/services/berth-import';

// ─── CLI ────────────────────────────────────────────────────────────────────

/** Port slug used when `--port-slug` is not passed. */
const DEFAULT_PORT_SLUG = 'port-nimara';

/** Parsed command-line flags. */
interface CliArgs {
  /** Diff only; `main` returns before any write when this is set. */
  dryRun: boolean;
  /** Apply the computed plan to the database. */
  apply: boolean;
  /** Slug of the port whose berths are imported. */
  portSlug: string;
  /** Passed through to `buildPlan` to override the "CRM-edited" skip. */
  force: boolean;
  /** Rewrite the seed-data snapshot after a successful apply. */
  updateSnapshot: boolean;
}

/**
 * Parses the CLI arguments (without the `node`/script prefix).
 *
 * Exits the process with code 1 on an unknown flag, on `--port-slug` without
 * a value, or when neither `--dry-run` nor `--apply` is given; exits with
 * code 0 after printing help for `-h`/`--help`.
 *
 * @param argv Arguments as returned by `process.argv.slice(2)`.
 * @returns The parsed flags.
 */
function parseArgs(argv: string[]): CliArgs {
  const args: CliArgs = {
    dryRun: false,
    apply: false,
    portSlug: DEFAULT_PORT_SLUG,
    force: false,
    updateSnapshot: false,
  };

  for (let i = 0; i < argv.length; i += 1) {
    const a = argv[i];
    if (a === undefined) continue;

    if (a === '--dry-run') {
      args.dryRun = true;
    } else if (a === '--apply') {
      args.apply = true;
    } else if (a === '--port-slug') {
      // Require an explicit value: silently falling back to the default port
      // (or swallowing the next flag as the slug) could point an --apply run
      // at the wrong port.
      const slug = argv[i + 1];
      if (slug === undefined || slug.startsWith('-')) {
        console.error('--port-slug requires a value.');
        printHelp();
        process.exit(1);
      }
      args.portSlug = slug;
      i += 1;
    } else if (a === '--force') {
      args.force = true;
    } else if (a === '--update-snapshot') {
      args.updateSnapshot = true;
    } else if (a === '-h' || a === '--help') {
      printHelp();
      process.exit(0);
    } else {
      console.error(`Unknown argument: ${a}`);
      printHelp();
      process.exit(1);
    }
  }

  if (!args.dryRun && !args.apply) {
    console.error('Must specify either --dry-run or --apply.');
    printHelp();
    process.exit(1);
  }
  return args;
}

/** Prints the usage text to stdout. */
function printHelp(): void {
  console.log(`Usage:
  pnpm tsx scripts/import-berths-from-nocodb.ts --dry-run [--port-slug <slug>]
  pnpm tsx scripts/import-berths-from-nocodb.ts --apply [--port-slug <slug>] [--force] [--update-snapshot]

Flags:
  --dry-run            Read NocoDB + diff vs CRM. No writes.
  --apply              Apply the plan to the DB.
  --port-slug <slug>   Target port slug (default: port-nimara).
  --force              Overwrite rows where CRM updated_at > last_imported_at.
  --update-snapshot    Rewrite src/lib/db/seed-data/berths.json after apply.
  -h, --help           Show this help.
`);
}

// ─── Stable advisory lock key ───────────────────────────────────────────────
// 64-bit BIGINT - first 4 bytes spell "BRTH" so it's grep-able in pg_locks.
const BERTH_IMPORT_LOCK_KEY = 0x4252544800000001n;

// ─── Apply ──────────────────────────────────────────────────────────────────

/** Counters and messages collected while applying a plan. */
interface ApplyResult {
  inserted: number;
  updated: number;
  /** Plan entries with action `skip-edited` or `noop`. */
  skipped: number;
  /** Number of `berth_map_data` upserts issued. */
  mapDataWritten: number;
  /** One message per orphan and per skipped entry. */
  warnings: string[];
}

/** Converts a nullable numeric value to the string form written to the DB, keeping null as null. */
function toNumericColumn(value: number | string | null | undefined): string | null {
  return value != null ? String(value) : null;
}

/**
 * Applies an import plan inside a single transaction.
 *
 * Takes a transaction-scoped advisory lock first so that two simultaneous
 * `--apply` runs serialize. Entries with action `skip-edited` or `noop` are
 * counted and reported as warnings; `insert` entries create a berth, every
 * other action updates the existing berth. When the imported row carries map
 * data it is upserted into `berth_map_data` keyed on the berth id.
 *
 * @param portId     Id of the port every written berth is attached to.
 * @param plan       Plan entries as produced by `buildPlan`.
 * @param orphans    CRM rows with no NocoDB counterpart; only reported, never deleted.
 * @param importedAt Timestamp written to both `lastImportedAt` and `updatedAt`.
 * @returns Counters and warnings describing what was done.
 * @throws Error when an update entry has no existing CRM row, or an insert
 *         returns no row; the transaction is rolled back in either case.
 */
async function apply(
  portId: string,
  plan: PlanEntry[],
  orphans: ExistingBerthRow[],
  importedAt: Date,
): Promise<ApplyResult> {
  const result: ApplyResult = {
    inserted: 0,
    updated: 0,
    skipped: 0,
    mapDataWritten: 0,
    warnings: [],
  };

  for (const orphan of orphans) {
    result.warnings.push(
      `Orphan: CRM has mooring="${orphan.mooringNumber}" but NocoDB no longer does (id=${orphan.id})`,
    );
  }

  await db.transaction(async (tx) => {
    // Stable lock so two simultaneous --apply runs serialize.
    await tx.execute(sql`SELECT pg_advisory_xact_lock(${BERTH_IMPORT_LOCK_KEY})`);

    for (const entry of plan) {
      if (entry.action === 'skip-edited' || entry.action === 'noop') {
        result.skipped += 1;
        result.warnings.push(`Skipped ${entry.imported.mooringNumber}: ${entry.reason ?? 'no-op'}`);
        continue;
      }

      const i = entry.imported;
      const n = i.numerics;
      const baseValues = {
        portId,
        mooringNumber: i.mooringNumber,
        area: i.area,
        status: i.status,
        lengthFt: toNumericColumn(n.lengthFt),
        widthFt: toNumericColumn(n.widthFt),
        draftFt: toNumericColumn(n.draftFt),
        lengthM: toNumericColumn(n.lengthM),
        widthM: toNumericColumn(n.widthM),
        draftM: toNumericColumn(n.draftM),
        widthIsMinimum: i.widthIsMinimum,
        nominalBoatSize: toNumericColumn(n.nominalBoatSize),
        nominalBoatSizeM: toNumericColumn(n.nominalBoatSizeM),
        waterDepth: toNumericColumn(n.waterDepth),
        waterDepthM: toNumericColumn(n.waterDepthM),
        waterDepthIsMinimum: i.waterDepthIsMinimum,
        sidePontoon: i.sidePontoon,
        powerCapacity: toNumericColumn(n.powerCapacity),
        voltage: toNumericColumn(n.voltage),
        mooringType: i.mooringType,
        cleatType: i.cleatType,
        cleatCapacity: i.cleatCapacity,
        bollardType: i.bollardType,
        bollardCapacity: i.bollardCapacity,
        access: i.access,
        price: toNumericColumn(n.price),
        priceCurrency: 'USD' as const,
        bowFacing: i.bowFacing,
        berthApproved: i.berthApproved,
        statusOverrideMode: i.statusOverrideMode,
        // Both timestamps get the same value, so a freshly imported row does
        // not satisfy `updated_at > last_imported_at` on the next run.
        lastImportedAt: importedAt,
        updatedAt: importedAt,
      };

      let berthId: string;
      if (entry.action === 'insert') {
        const [inserted] = await tx
          .insert(berths)
          .values({ ...baseValues, tenureType: 'permanent' })
          .returning({ id: berths.id });
        if (!inserted) {
          throw new Error(`Insert of berth "${i.mooringNumber}" returned no row`);
        }
        berthId = inserted.id;
        result.inserted += 1;
      } else {
        const existing = entry.existing;
        if (!existing) {
          throw new Error(
            `Plan entry for "${i.mooringNumber}" has action "${entry.action}" but no existing CRM row`,
          );
        }
        await tx.update(berths).set(baseValues).where(eq(berths.id, existing.id));
        berthId = existing.id;
        result.updated += 1;
      }

      if (i.mapData) {
        const mapValues = {
          berthId,
          svgPath: i.mapData.path ?? null,
          x: toNumericColumn(i.mapData.x),
          y: toNumericColumn(i.mapData.y),
          transform: i.mapData.transform ?? null,
          fontSize: toNumericColumn(i.mapData.fontSize),
          updatedAt: importedAt,
        };
        await tx
          .insert(berthMapData)
          .values(mapValues)
          .onConflictDoUpdate({
            target: berthMapData.berthId,
            set: {
              svgPath: mapValues.svgPath,
              x: mapValues.x,
              y: mapValues.y,
              transform: mapValues.transform,
              fontSize: mapValues.fontSize,
              updatedAt: importedAt,
            },
          });
        result.mapDataWritten += 1;
      }
    }
  });

  return result;
}

// ─── Snapshot writer (for seed-data refresh) ────────────────────────────────

/**
 * Writes the imported berths to `src/lib/db/seed-data/berths.json`
 * (resolved against the current working directory).
 *
 * @param imported De-duplicated berths to serialize.
 * @returns Absolute path of the file that was written.
 */
async function writeSnapshot(imported: ImportedBerth[]): Promise<string> {
  // Ordering: idx 0..4 available (small), 5..9 under_offer (medium),
  // 10..11 sold (large), then everything else by mooring number. The
  // first 12 indexes feed `seed-data.ts` interest/reservation stubs.
  // If a status bucket holds fewer rows than its quota, the following
  // buckets shift to lower indexes.
  const sortByLength = (a: ImportedBerth, b: ImportedBerth) =>
    (a.numerics.lengthFt ?? 0) - (b.numerics.lengthFt ?? 0);

  // `filter` returns a fresh array, so the in-place `sort` never touches `imported`.
  const available = imported
    .filter((b) => b.status === 'available')
    .sort(sortByLength)
    .slice(0, 5);
  const underOffer = imported
    .filter((b) => b.status === 'under_offer')
    .sort(sortByLength)
    .slice(0, 5);
  const sold = imported
    .filter((b) => b.status === 'sold')
    .sort((a, b) => -sortByLength(a, b))
    .slice(0, 2);

  const featured = new Set([...available, ...underOffer, ...sold].map((b) => b.mooringNumber));
  const rest = imported
    .filter((b) => !featured.has(b.mooringNumber))
    .sort((a, b) => a.mooringNumber.localeCompare(b.mooringNumber, 'en', { numeric: true }));

  const ordered = [...available, ...underOffer, ...sold, ...rest];

  const payload = ordered.map((b) => ({
    legacyId: b.legacyId,
    mooringNumber: b.mooringNumber,
    area: b.area,
    status: b.status,
    lengthFt: b.numerics.lengthFt,
    widthFt: b.numerics.widthFt,
    draftFt: b.numerics.draftFt,
    lengthM: b.numerics.lengthM,
    widthM: b.numerics.widthM,
    draftM: b.numerics.draftM,
    widthIsMinimum: b.widthIsMinimum,
    nominalBoatSize: b.numerics.nominalBoatSize,
    nominalBoatSizeM: b.numerics.nominalBoatSizeM,
    waterDepth: b.numerics.waterDepth,
    waterDepthM: b.numerics.waterDepthM,
    waterDepthIsMinimum: b.waterDepthIsMinimum,
    sidePontoon: b.sidePontoon,
    powerCapacity: b.numerics.powerCapacity,
    voltage: b.numerics.voltage,
    mooringType: b.mooringType,
    cleatType: b.cleatType,
    cleatCapacity: b.cleatCapacity,
    bollardType: b.bollardType,
    bollardCapacity: b.bollardCapacity,
    access: b.access,
    price: b.numerics.price,
    bowFacing: b.bowFacing,
    berthApproved: b.berthApproved,
    statusOverrideMode: b.statusOverrideMode,
  }));

  const target = path.resolve(process.cwd(), 'src/lib/db/seed-data/berths.json');
  await fs.writeFile(target, JSON.stringify(payload, null, 2) + '\n', 'utf8');
  return target;
}

// ─── Main ───────────────────────────────────────────────────────────────────

/**
 * Script entry point: resolves the target port, fetches and maps the NocoDB
 * rows, prints the diff plan against the CRM, and — unless `--dry-run` was
 * passed — applies it and optionally rewrites the seed snapshot.
 *
 * When both `--dry-run` and `--apply` are passed, the dry-run check comes
 * first and no writes are performed.
 */
async function main(): Promise<void> {
  const args = parseArgs(process.argv.slice(2));
  const config = loadNocoDbConfig();

  const [port] = await db
    .select({ id: ports.id, slug: ports.slug })
    .from(ports)
    .where(eq(ports.slug, args.portSlug))
    .limit(1);
  if (!port) {
    console.error(`No port found with slug "${args.portSlug}".`);
    process.exit(1);
  }

  console.log(`> Fetching NocoDB Berths…`);
  const rows = await fetchAllRows(NOCO_TABLES.berths, config);
  console.log(` fetched ${rows.length} rows from NocoDB`);

  const imported: ImportedBerth[] = [];
  let skippedMalformed = 0;
  for (const r of rows) {
    const m = mapRow(r);
    if (m) imported.push(m);
    else skippedMalformed += 1;
  }
  if (skippedMalformed > 0) {
    console.warn(` ${skippedMalformed} rows skipped (missing Mooring Number)`);
  }

  // De-dup against any same-mooring twins surfacing from NocoDB
  // (defensive — the Berths table is keyed on Mooring Number in NocoDB).
  const seen = new Set<string>();
  const dedup: ImportedBerth[] = [];
  for (const b of imported) {
    if (seen.has(b.mooringNumber)) {
      console.warn(` duplicate mooring "${b.mooringNumber}" in NocoDB — keeping first`);
      continue;
    }
    seen.add(b.mooringNumber);
    dedup.push(b);
  }

  console.log(`> Reading current CRM berths for port "${port.slug}"…`);
  const existingRows = await db
    .select({
      id: berths.id,
      mooringNumber: berths.mooringNumber,
      updatedAt: berths.updatedAt,
      lastImportedAt: berths.lastImportedAt,
    })
    .from(berths)
    .where(eq(berths.portId, port.id));
  console.log(` ${existingRows.length} existing rows`);

  const existingByMooring = new Map(existingRows.map((r) => [r.mooringNumber, r]));
  const { plan, orphans } = buildPlan(dedup, existingByMooring, args.force);

  // Typed declaration (rather than an `as` cast) so a new Action member
  // without a counter is caught at compile time.
  const counts: Record<Action, number> = { insert: 0, update: 0, 'skip-edited': 0, noop: 0 };
  for (const e of plan) counts[e.action] += 1;

  console.log(`> Plan:`);
  console.log(` insert : ${counts.insert}`);
  console.log(` update : ${counts.update}`);
  console.log(` skip-edited : ${counts['skip-edited']}`);
  console.log(` no-op : ${counts.noop}`);
  console.log(` orphans (CRM): ${orphans.length}`);

  if (counts['skip-edited'] > 0) {
    console.log(` ↳ Skipped (CRM-edited; pass --force to overwrite):`);
    for (const e of plan.filter((p) => p.action === 'skip-edited').slice(0, 10)) {
      console.log(` - ${e.imported.mooringNumber} ${e.reason}`);
    }
    if (counts['skip-edited'] > 10) console.log(` …and ${counts['skip-edited'] - 10} more`);
  }
  if (orphans.length > 0) {
    console.log(` ↳ Orphans (in CRM but missing from NocoDB):`);
    for (const o of orphans.slice(0, 10)) console.log(` - ${o.mooringNumber}`);
    if (orphans.length > 10) console.log(` …and ${orphans.length - 10} more`);
  }

  if (args.dryRun) {
    console.log(`\n[dry-run] no writes performed.`);
    return;
  }

  console.log(`> Applying…`);
  const result = await apply(port.id, plan, orphans, new Date());
  console.log(` inserted : ${result.inserted}`);
  console.log(` updated : ${result.updated}`);
  console.log(` skipped : ${result.skipped}`);
  console.log(` map data writes : ${result.mapDataWritten}`);
  if (result.warnings.length) {
    console.log(` warnings :`);
    for (const w of result.warnings.slice(0, 20)) console.log(` - ${w}`);
    if (result.warnings.length > 20) console.log(` …and ${result.warnings.length - 20} more`);
  }

  if (args.updateSnapshot) {
    const written = await writeSnapshot(dedup);
    console.log(`> Wrote ${dedup.length} rows to ${path.relative(process.cwd(), written)}`);
  }
}

// Exit explicitly in both outcomes so the process ends with a definite status code.
main()
  .then(() => process.exit(0))
  .catch((err: unknown) => {
    console.error(err);
    process.exit(1);
  });