feat(berths): nocodb berth import script + helpers + unit tests
Idempotent NocoDB Berths -> CRM `berths` import script with full re-run safety. Re-running picks up NocoDB additions/edits without clobbering CRM-side overrides (compares updated_at vs last_imported_at, 1-second tolerance for sub-second clock drift). --force overrides the edit guard. Mitigates the §14.1 critical/high cases: - Mooring collisions: unique (port_id, mooring_number) on the table. - Concurrent runs: pg_advisory_xact_lock on a stable BIGINT key. - Numeric-with-units inputs: parseDecimalWithUnit() strips trailing ft/m/kw/v/usd/$ markers before parsing. - Metric drift: NocoDB's metric formula columns are ignored; metric values recomputed from imperial via 0.3048 + round-to-2-decimals to match NocoDB's `precision: 2` columns and avoid spurious diffs. - Map Data shape: zod-validated; failures are skipped rather than aborting the import. - Status enum mapping: NocoDB display strings -> CRM snake_case. - NocoDB row deleted: reported as "orphaned in CRM"; never auto- deleted (rep decides via admin UI in a future phase). Pure helpers (parseDecimalWithUnit, mapStatus, parseMapData, extractNumerics, mapRow, buildPlan) live in src/lib/services/berth-import.ts so vitest can exercise the mapping logic without triggering the script's top-level db connection. 40 new unit tests (956 -> 996 passing). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
405
scripts/import-berths-from-nocodb.ts
Normal file
405
scripts/import-berths-from-nocodb.ts
Normal file
@@ -0,0 +1,405 @@
|
||||
/**
|
||||
* Idempotent NocoDB Berths → CRM `berths` import.
|
||||
*
|
||||
* Re-running picks up NocoDB additions/edits without clobbering CRM-side
|
||||
* overrides: rows where `updated_at > last_imported_at` are treated as
|
||||
* human-edited and skipped (use `--force` to override). Map Data JSON
|
||||
* is validated and upserted into `berth_map_data` as a separate step.
|
||||
*
|
||||
* Usage:
|
||||
* pnpm tsx scripts/import-berths-from-nocodb.ts --dry-run [--port-slug port-nimara]
|
||||
* pnpm tsx scripts/import-berths-from-nocodb.ts --apply [--port-slug port-nimara]
|
||||
* pnpm tsx scripts/import-berths-from-nocodb.ts --apply --force
|
||||
* pnpm tsx scripts/import-berths-from-nocodb.ts --apply --update-snapshot
|
||||
*
|
||||
* Edge cases mitigated (see plan §14.1):
|
||||
* - Mooring collisions : unique (port_id, mooring_number) on the table.
|
||||
* - Concurrent runs : pg_advisory_xact_lock on a stable key.
|
||||
* - Numeric-with-units : parseDecimalWithUnit() strips trailing units.
|
||||
* - Metric drift : NocoDB metric formula columns are ignored;
|
||||
* metric values are recomputed from imperial.
|
||||
* - Map Data shape : zod-validated; failures are skipped silently
|
||||
* rather than aborting the whole import.
|
||||
* - Status enum : NocoDB display strings → CRM snake_case.
|
||||
* - NocoDB row deleted : reported as "orphaned in CRM"; not auto-deleted.
|
||||
*/
|
||||
|
||||
import 'dotenv/config';
|
||||
import { eq, sql } from 'drizzle-orm';
|
||||
import { promises as fs } from 'node:fs';
|
||||
import path from 'node:path';
|
||||
|
||||
import { db } from '@/lib/db';
|
||||
import { ports } from '@/lib/db/schema/ports';
|
||||
import { berths, berthMapData } from '@/lib/db/schema/berths';
|
||||
import { fetchAllRows, loadNocoDbConfig, NOCO_TABLES } from '@/lib/dedup/nocodb-source';
|
||||
import {
|
||||
buildPlan,
|
||||
mapRow,
|
||||
type Action,
|
||||
type ImportedBerth,
|
||||
type PlanEntry,
|
||||
type ExistingBerthRow,
|
||||
} from '@/lib/services/berth-import';
|
||||
|
||||
// ─── CLI ────────────────────────────────────────────────────────────────────
|
||||
|
||||
interface CliArgs {
|
||||
dryRun: boolean;
|
||||
apply: boolean;
|
||||
portSlug: string;
|
||||
force: boolean;
|
||||
updateSnapshot: boolean;
|
||||
}
|
||||
|
||||
function parseArgs(argv: string[]): CliArgs {
|
||||
const args: CliArgs = {
|
||||
dryRun: false,
|
||||
apply: false,
|
||||
portSlug: 'port-nimara',
|
||||
force: false,
|
||||
updateSnapshot: false,
|
||||
};
|
||||
for (let i = 0; i < argv.length; i += 1) {
|
||||
const a = argv[i]!;
|
||||
if (a === '--dry-run') args.dryRun = true;
|
||||
else if (a === '--apply') args.apply = true;
|
||||
else if (a === '--port-slug') args.portSlug = argv[++i] ?? 'port-nimara';
|
||||
else if (a === '--force') args.force = true;
|
||||
else if (a === '--update-snapshot') args.updateSnapshot = true;
|
||||
else if (a === '-h' || a === '--help') {
|
||||
printHelp();
|
||||
process.exit(0);
|
||||
} else {
|
||||
console.error(`Unknown argument: ${a}`);
|
||||
printHelp();
|
||||
process.exit(1);
|
||||
}
|
||||
}
|
||||
if (!args.dryRun && !args.apply) {
|
||||
console.error('Must specify either --dry-run or --apply.');
|
||||
printHelp();
|
||||
process.exit(1);
|
||||
}
|
||||
return args;
|
||||
}
|
||||
|
||||
function printHelp(): void {
|
||||
console.log(`Usage:
|
||||
pnpm tsx scripts/import-berths-from-nocodb.ts --dry-run [--port-slug <slug>]
|
||||
pnpm tsx scripts/import-berths-from-nocodb.ts --apply [--port-slug <slug>] [--force] [--update-snapshot]
|
||||
|
||||
Flags:
|
||||
--dry-run Read NocoDB + diff vs CRM. No writes.
|
||||
--apply Apply the plan to the DB.
|
||||
--port-slug <slug> Target port slug (default: port-nimara).
|
||||
--force Overwrite rows where CRM updated_at > last_imported_at.
|
||||
--update-snapshot Rewrite src/lib/db/seed-data/berths.json after apply.
|
||||
-h, --help Show this help.
|
||||
`);
|
||||
}
|
||||
|
||||
// ─── Stable advisory lock key ───────────────────────────────────────────────
|
||||
// 64-bit BIGINT - first 4 bytes spell "BRTH" so it's grep-able in pg_locks.
|
||||
const BERTH_IMPORT_LOCK_KEY = 0x4252544800000001n;
|
||||
|
||||
// ─── Apply ──────────────────────────────────────────────────────────────────
|
||||
|
||||
interface ApplyResult {
|
||||
inserted: number;
|
||||
updated: number;
|
||||
skipped: number;
|
||||
mapDataWritten: number;
|
||||
warnings: string[];
|
||||
}
|
||||
|
||||
async function apply(
|
||||
portId: string,
|
||||
plan: PlanEntry[],
|
||||
orphans: ExistingBerthRow[],
|
||||
importedAt: Date,
|
||||
): Promise<ApplyResult> {
|
||||
const result: ApplyResult = {
|
||||
inserted: 0,
|
||||
updated: 0,
|
||||
skipped: 0,
|
||||
mapDataWritten: 0,
|
||||
warnings: [],
|
||||
};
|
||||
for (const orphan of orphans) {
|
||||
result.warnings.push(
|
||||
`Orphan: CRM has mooring="${orphan.mooringNumber}" but NocoDB no longer does (id=${orphan.id})`,
|
||||
);
|
||||
}
|
||||
|
||||
await db.transaction(async (tx) => {
|
||||
// Stable lock so two simultaneous --apply runs serialize.
|
||||
await tx.execute(sql`SELECT pg_advisory_xact_lock(${BERTH_IMPORT_LOCK_KEY})`);
|
||||
|
||||
for (const entry of plan) {
|
||||
if (entry.action === 'skip-edited' || entry.action === 'noop') {
|
||||
result.skipped += 1;
|
||||
result.warnings.push(`Skipped ${entry.imported.mooringNumber}: ${entry.reason ?? 'no-op'}`);
|
||||
continue;
|
||||
}
|
||||
const i = entry.imported;
|
||||
const n = i.numerics;
|
||||
const baseValues = {
|
||||
portId,
|
||||
mooringNumber: i.mooringNumber,
|
||||
area: i.area,
|
||||
status: i.status,
|
||||
lengthFt: n.lengthFt != null ? String(n.lengthFt) : null,
|
||||
widthFt: n.widthFt != null ? String(n.widthFt) : null,
|
||||
draftFt: n.draftFt != null ? String(n.draftFt) : null,
|
||||
lengthM: n.lengthM != null ? String(n.lengthM) : null,
|
||||
widthM: n.widthM != null ? String(n.widthM) : null,
|
||||
draftM: n.draftM != null ? String(n.draftM) : null,
|
||||
widthIsMinimum: i.widthIsMinimum,
|
||||
nominalBoatSize: n.nominalBoatSize != null ? String(n.nominalBoatSize) : null,
|
||||
nominalBoatSizeM: n.nominalBoatSizeM != null ? String(n.nominalBoatSizeM) : null,
|
||||
waterDepth: n.waterDepth != null ? String(n.waterDepth) : null,
|
||||
waterDepthM: n.waterDepthM != null ? String(n.waterDepthM) : null,
|
||||
waterDepthIsMinimum: i.waterDepthIsMinimum,
|
||||
sidePontoon: i.sidePontoon,
|
||||
powerCapacity: n.powerCapacity != null ? String(n.powerCapacity) : null,
|
||||
voltage: n.voltage != null ? String(n.voltage) : null,
|
||||
mooringType: i.mooringType,
|
||||
cleatType: i.cleatType,
|
||||
cleatCapacity: i.cleatCapacity,
|
||||
bollardType: i.bollardType,
|
||||
bollardCapacity: i.bollardCapacity,
|
||||
access: i.access,
|
||||
price: n.price != null ? String(n.price) : null,
|
||||
priceCurrency: 'USD' as const,
|
||||
bowFacing: i.bowFacing,
|
||||
berthApproved: i.berthApproved,
|
||||
statusOverrideMode: i.statusOverrideMode,
|
||||
lastImportedAt: importedAt,
|
||||
updatedAt: importedAt,
|
||||
};
|
||||
|
||||
let berthId: string;
|
||||
if (entry.action === 'insert') {
|
||||
const [inserted] = await tx
|
||||
.insert(berths)
|
||||
.values({ ...baseValues, tenureType: 'permanent' })
|
||||
.returning({ id: berths.id });
|
||||
berthId = inserted!.id;
|
||||
result.inserted += 1;
|
||||
} else {
|
||||
await tx.update(berths).set(baseValues).where(eq(berths.id, entry.existing!.id));
|
||||
berthId = entry.existing!.id;
|
||||
result.updated += 1;
|
||||
}
|
||||
|
||||
if (i.mapData) {
|
||||
const mapValues = {
|
||||
berthId,
|
||||
svgPath: i.mapData.path ?? null,
|
||||
x: i.mapData.x != null ? String(i.mapData.x) : null,
|
||||
y: i.mapData.y != null ? String(i.mapData.y) : null,
|
||||
transform: i.mapData.transform ?? null,
|
||||
fontSize: i.mapData.fontSize != null ? String(i.mapData.fontSize) : null,
|
||||
updatedAt: importedAt,
|
||||
};
|
||||
await tx
|
||||
.insert(berthMapData)
|
||||
.values(mapValues)
|
||||
.onConflictDoUpdate({
|
||||
target: berthMapData.berthId,
|
||||
set: {
|
||||
svgPath: mapValues.svgPath,
|
||||
x: mapValues.x,
|
||||
y: mapValues.y,
|
||||
transform: mapValues.transform,
|
||||
fontSize: mapValues.fontSize,
|
||||
updatedAt: importedAt,
|
||||
},
|
||||
});
|
||||
result.mapDataWritten += 1;
|
||||
}
|
||||
}
|
||||
});
|
||||
return result;
|
||||
}
|
||||
|
||||
// ─── Snapshot writer (for seed-data refresh) ────────────────────────────────
|
||||
|
||||
async function writeSnapshot(imported: ImportedBerth[]): Promise<string> {
|
||||
// Ordering: idx 0..4 available (small), 5..9 under_offer (medium),
|
||||
// 10..11 sold (large), then everything else by mooring number. The
|
||||
// first 12 indexes feed `seed-data.ts` interest/reservation stubs.
|
||||
const sortByLength = (a: ImportedBerth, b: ImportedBerth) =>
|
||||
(a.numerics.lengthFt ?? 0) - (b.numerics.lengthFt ?? 0);
|
||||
const available = imported
|
||||
.filter((b) => b.status === 'available')
|
||||
.sort(sortByLength)
|
||||
.slice(0, 5);
|
||||
const underOffer = imported
|
||||
.filter((b) => b.status === 'under_offer')
|
||||
.sort(sortByLength)
|
||||
.slice(0, 5);
|
||||
const sold = imported
|
||||
.filter((b) => b.status === 'sold')
|
||||
.sort((a, b) => -sortByLength(a, b))
|
||||
.slice(0, 2);
|
||||
const featured = new Set([...available, ...underOffer, ...sold].map((b) => b.mooringNumber));
|
||||
const rest = imported
|
||||
.filter((b) => !featured.has(b.mooringNumber))
|
||||
.sort((a, b) => a.mooringNumber.localeCompare(b.mooringNumber, 'en', { numeric: true }));
|
||||
const ordered = [...available, ...underOffer, ...sold, ...rest];
|
||||
|
||||
const payload = ordered.map((b) => ({
|
||||
legacyId: b.legacyId,
|
||||
mooringNumber: b.mooringNumber,
|
||||
area: b.area,
|
||||
status: b.status,
|
||||
lengthFt: b.numerics.lengthFt,
|
||||
widthFt: b.numerics.widthFt,
|
||||
draftFt: b.numerics.draftFt,
|
||||
lengthM: b.numerics.lengthM,
|
||||
widthM: b.numerics.widthM,
|
||||
draftM: b.numerics.draftM,
|
||||
widthIsMinimum: b.widthIsMinimum,
|
||||
nominalBoatSize: b.numerics.nominalBoatSize,
|
||||
nominalBoatSizeM: b.numerics.nominalBoatSizeM,
|
||||
waterDepth: b.numerics.waterDepth,
|
||||
waterDepthM: b.numerics.waterDepthM,
|
||||
waterDepthIsMinimum: b.waterDepthIsMinimum,
|
||||
sidePontoon: b.sidePontoon,
|
||||
powerCapacity: b.numerics.powerCapacity,
|
||||
voltage: b.numerics.voltage,
|
||||
mooringType: b.mooringType,
|
||||
cleatType: b.cleatType,
|
||||
cleatCapacity: b.cleatCapacity,
|
||||
bollardType: b.bollardType,
|
||||
bollardCapacity: b.bollardCapacity,
|
||||
access: b.access,
|
||||
price: b.numerics.price,
|
||||
bowFacing: b.bowFacing,
|
||||
berthApproved: b.berthApproved,
|
||||
statusOverrideMode: b.statusOverrideMode,
|
||||
}));
|
||||
|
||||
const target = path.resolve(process.cwd(), 'src/lib/db/seed-data/berths.json');
|
||||
await fs.writeFile(target, JSON.stringify(payload, null, 2) + '\n', 'utf8');
|
||||
return target;
|
||||
}
|
||||
|
||||
// ─── Main ───────────────────────────────────────────────────────────────────
|
||||
|
||||
async function main(): Promise<void> {
|
||||
const args = parseArgs(process.argv.slice(2));
|
||||
const config = loadNocoDbConfig();
|
||||
|
||||
const [port] = await db
|
||||
.select({ id: ports.id, slug: ports.slug })
|
||||
.from(ports)
|
||||
.where(eq(ports.slug, args.portSlug))
|
||||
.limit(1);
|
||||
if (!port) {
|
||||
console.error(`No port found with slug "${args.portSlug}".`);
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
console.log(`> Fetching NocoDB Berths…`);
|
||||
const rows = await fetchAllRows(NOCO_TABLES.berths, config);
|
||||
console.log(` fetched ${rows.length} rows from NocoDB`);
|
||||
|
||||
const imported: ImportedBerth[] = [];
|
||||
let skippedMalformed = 0;
|
||||
for (const r of rows) {
|
||||
const m = mapRow(r);
|
||||
if (m) imported.push(m);
|
||||
else skippedMalformed += 1;
|
||||
}
|
||||
if (skippedMalformed > 0) {
|
||||
console.warn(` ${skippedMalformed} rows skipped (missing Mooring Number)`);
|
||||
}
|
||||
|
||||
// De-dup against any same-mooring twins surfacing from NocoDB
|
||||
// (defensive — the Berths table is keyed on Mooring Number in NocoDB).
|
||||
const seen = new Set<string>();
|
||||
const dedup: ImportedBerth[] = [];
|
||||
for (const b of imported) {
|
||||
if (seen.has(b.mooringNumber)) {
|
||||
console.warn(` duplicate mooring "${b.mooringNumber}" in NocoDB — keeping first`);
|
||||
continue;
|
||||
}
|
||||
seen.add(b.mooringNumber);
|
||||
dedup.push(b);
|
||||
}
|
||||
|
||||
console.log(`> Reading current CRM berths for port "${port.slug}"…`);
|
||||
const existingRows = await db
|
||||
.select({
|
||||
id: berths.id,
|
||||
mooringNumber: berths.mooringNumber,
|
||||
updatedAt: berths.updatedAt,
|
||||
lastImportedAt: berths.lastImportedAt,
|
||||
})
|
||||
.from(berths)
|
||||
.where(eq(berths.portId, port.id));
|
||||
console.log(` ${existingRows.length} existing rows`);
|
||||
|
||||
const existingByMooring = new Map(existingRows.map((r) => [r.mooringNumber, r]));
|
||||
const { plan, orphans } = buildPlan(dedup, existingByMooring, args.force);
|
||||
|
||||
const counts = plan.reduce(
|
||||
(acc, e) => {
|
||||
acc[e.action] += 1;
|
||||
return acc;
|
||||
},
|
||||
{ insert: 0, update: 0, 'skip-edited': 0, noop: 0 } as Record<Action, number>,
|
||||
);
|
||||
|
||||
console.log(`> Plan:`);
|
||||
console.log(` insert : ${counts.insert}`);
|
||||
console.log(` update : ${counts.update}`);
|
||||
console.log(` skip-edited : ${counts['skip-edited']}`);
|
||||
console.log(` no-op : ${counts.noop}`);
|
||||
console.log(` orphans (CRM): ${orphans.length}`);
|
||||
|
||||
if (counts['skip-edited'] > 0) {
|
||||
console.log(` ↳ Skipped (CRM-edited; pass --force to overwrite):`);
|
||||
for (const e of plan.filter((p) => p.action === 'skip-edited').slice(0, 10)) {
|
||||
console.log(` - ${e.imported.mooringNumber} ${e.reason}`);
|
||||
}
|
||||
if (counts['skip-edited'] > 10) console.log(` …and ${counts['skip-edited'] - 10} more`);
|
||||
}
|
||||
if (orphans.length > 0) {
|
||||
console.log(` ↳ Orphans (in CRM but missing from NocoDB):`);
|
||||
for (const o of orphans.slice(0, 10)) console.log(` - ${o.mooringNumber}`);
|
||||
if (orphans.length > 10) console.log(` …and ${orphans.length - 10} more`);
|
||||
}
|
||||
|
||||
if (args.dryRun) {
|
||||
console.log(`\n[dry-run] no writes performed.`);
|
||||
return;
|
||||
}
|
||||
|
||||
console.log(`> Applying…`);
|
||||
const result = await apply(port.id, plan, orphans, new Date());
|
||||
console.log(` inserted : ${result.inserted}`);
|
||||
console.log(` updated : ${result.updated}`);
|
||||
console.log(` skipped : ${result.skipped}`);
|
||||
console.log(` map data writes : ${result.mapDataWritten}`);
|
||||
if (result.warnings.length) {
|
||||
console.log(` warnings :`);
|
||||
for (const w of result.warnings.slice(0, 20)) console.log(` - ${w}`);
|
||||
if (result.warnings.length > 20) console.log(` …and ${result.warnings.length - 20} more`);
|
||||
}
|
||||
|
||||
if (args.updateSnapshot) {
|
||||
const written = await writeSnapshot(dedup);
|
||||
console.log(`> Wrote ${dedup.length} rows to ${path.relative(process.cwd(), written)}`);
|
||||
}
|
||||
}
|
||||
|
||||
main()
|
||||
.then(() => process.exit(0))
|
||||
.catch((err: unknown) => {
|
||||
console.error(err);
|
||||
process.exit(1);
|
||||
});
|
||||
Reference in New Issue
Block a user