feat(migration): old-LOI EOI recovery, folded berth-links, contactless flag
Three polish items so the legacy seed is one-shot and complete: - backfill-documents: recover the ~10 pre-Documenso "LOI process" EOIs whose signed PDF lives only as a NocoDB attachment in the `database` MinIO bucket (the pipeline keys EOI-doc creation off documensoID, so it never created rows for them). Reads EOI_Document attachment metadata from the local nocodb_legacy dump, pulls the PDF (read-only) from the `database` bucket, and CREATES the document + file + folder, linking the signed PDF. Idempotent via a `nocodb_eoi_document` ledger entry. - connect-berth-links: refactored into an exported connectBerthLinks() and folded into migrate-from-nocodb --apply (best-effort; skips with a warning if the local dump isn't restored) so the multi-berth junction is reconnected as part of the one-shot seed, not a separate manual step. - migration-apply: contactless legacy clients (no email/phone across the whole dedup cluster) get a per-port "Needs contact info" tag so staff can filter + chase them, instead of being dropped. The current dev DB's 29 contactless clients were tagged via a one-off mirroring the pipeline logic. EOI recovery code is ready but the actual run needs LEGACY_MINIO_* read creds supplied at the command line. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -247,6 +247,27 @@ async function main(): Promise<void> {
|
|||||||
console.log(` … ${result.warnings.length - 20} more`);
|
console.log(` … ${result.warnings.length - 20} more`);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ── Multi-berth links (folded in for the one-shot seed) ──────────────────
|
||||||
|
// The dedup plan only carries each deal's single `Berth Number`; the legacy
|
||||||
|
// `_nc_m2m_Berths_Interests` junction (multi-berth deals) is reconnected
|
||||||
|
// here from the local `nocodb_legacy` snapshot. Best-effort: if the dump
|
||||||
|
// isn't restored, log + continue (the standalone script can run it later).
|
||||||
|
try {
|
||||||
|
const { connectBerthLinks } = await import('./migration/connect-berth-links');
|
||||||
|
const bl = await connectBerthLinks({ portSlug: port.slug });
|
||||||
|
console.log(
|
||||||
|
` Berths: ${bl.inserted} multi-berth links inserted (${bl.madePrimary} new primary), ${bl.skipped} already linked`,
|
||||||
|
);
|
||||||
|
if (bl.unresolved.length > 0) {
|
||||||
|
console.log(` ⚠ ${bl.unresolved.length} moorings with no CRM berth`);
|
||||||
|
}
|
||||||
|
} catch (err) {
|
||||||
|
console.log(
|
||||||
|
` Berths: ⚠ multi-berth link step skipped (${(err as Error).message}). ` +
|
||||||
|
`Run scripts/migration/connect-berth-links.ts once the nocodb_legacy dump is restored.`,
|
||||||
|
);
|
||||||
|
}
|
||||||
console.log('');
|
console.log('');
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -19,6 +19,7 @@
|
|||||||
import 'dotenv/config';
|
import 'dotenv/config';
|
||||||
import { randomUUID } from 'node:crypto';
|
import { randomUUID } from 'node:crypto';
|
||||||
import { Client as MinioClient } from 'minio';
|
import { Client as MinioClient } from 'minio';
|
||||||
|
import postgres from 'postgres';
|
||||||
import { and, eq, isNull } from 'drizzle-orm';
|
import { and, eq, isNull } from 'drizzle-orm';
|
||||||
|
|
||||||
import { db, closeDb } from '@/lib/db';
|
import { db, closeDb } from '@/lib/db';
|
||||||
@@ -26,6 +27,8 @@ import { ports } from '@/lib/db/schema/ports';
|
|||||||
import { berths } from '@/lib/db/schema/berths';
|
import { berths } from '@/lib/db/schema/berths';
|
||||||
import { documents, files } from '@/lib/db/schema/documents';
|
import { documents, files } from '@/lib/db/schema/documents';
|
||||||
import { clients } from '@/lib/db/schema/clients';
|
import { clients } from '@/lib/db/schema/clients';
|
||||||
|
import { interests } from '@/lib/db/schema/interests';
|
||||||
|
import { migrationSourceLinks } from '@/lib/db/schema/migration';
|
||||||
import { getStorageBackend } from '@/lib/storage';
|
import { getStorageBackend } from '@/lib/storage';
|
||||||
import { buildStoragePath } from '@/lib/minio';
|
import { buildStoragePath } from '@/lib/minio';
|
||||||
import { ensureEntityFolder } from '@/lib/services/document-folders.service';
|
import { ensureEntityFolder } from '@/lib/services/document-folders.service';
|
||||||
@@ -40,6 +43,8 @@ const slugArg = (() => {
|
|||||||
})();
|
})();
|
||||||
|
|
||||||
const LEGACY_BUCKET = process.env.LEGACY_MINIO_BUCKET ?? 'client-portal';
|
const LEGACY_BUCKET = process.env.LEGACY_MINIO_BUCKET ?? 'client-portal';
|
||||||
|
// NocoDB's own attachment store — where pre-Documenso "LOI process" EOIs live.
|
||||||
|
const DATABASE_BUCKET = process.env.LEGACY_MINIO_DATABASE_BUCKET ?? 'database';
|
||||||
const legacy = new MinioClient({
|
const legacy = new MinioClient({
|
||||||
endPoint: process.env.LEGACY_MINIO_ENDPOINT ?? 's3.portnimara.com',
|
endPoint: process.env.LEGACY_MINIO_ENDPOINT ?? 's3.portnimara.com',
|
||||||
port: 443,
|
port: 443,
|
||||||
@@ -48,6 +53,11 @@ const legacy = new MinioClient({
|
|||||||
secretKey: process.env.LEGACY_MINIO_SECRET_KEY ?? '',
|
secretKey: process.env.LEGACY_MINIO_SECRET_KEY ?? '',
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// Read-only connection to the LOCAL restored NocoDB dump (`nocodb_legacy`) —
|
||||||
|
// used to read the `EOI_Document` attachment metadata. Never prod.
|
||||||
|
const CRM_DB_URL = process.env.DATABASE_URL ?? '';
|
||||||
|
const LEGACY_DB_URL = process.env.LEGACY_DB_URL ?? CRM_DB_URL.replace(/\/[^/]+$/, '/nocodb_legacy');
|
||||||
|
|
||||||
/** Levenshtein edit distance — conservative fuzzy name matching for legacy
|
/** Levenshtein edit distance — conservative fuzzy name matching for legacy
|
||||||
* spelling/format drift (Koshbin↔Khoshbin, Costanzo↔Constanzo). */
|
* spelling/format drift (Koshbin↔Khoshbin, Costanzo↔Constanzo). */
|
||||||
function lev(a: string, b: string): number {
|
function lev(a: string, b: string): number {
|
||||||
@@ -275,6 +285,171 @@ async function backfillEois(port: { id: string; slug: string }) {
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ─── Old-LOI EOIs (NocoDB `database` bucket attachments) ─────────────────────
|
||||||
|
// The ~10 pre-Documenso "LOI process" deals have no documensoID and no curated
|
||||||
|
// client-portal/EOIs copy; their signed PDF lives only as a NocoDB attachment
|
||||||
|
// in the `database` bucket. The main pipeline keys EOI-doc creation off
|
||||||
|
// documensoID, so it never created a document row for them. Here we CREATE the
|
||||||
|
// document + file + folder and link the recovered PDF. Idempotent via a
|
||||||
|
// `nocodb_eoi_document` ledger entry per legacy interest.
|
||||||
|
function legacyKeyFromUrl(url: string): string | null {
|
||||||
|
// https://<host>/database/nc/uploads/... → nc/uploads/...
|
||||||
|
const marker = `/${DATABASE_BUCKET}/`;
|
||||||
|
const i = url.indexOf(marker);
|
||||||
|
if (i < 0) return null;
|
||||||
|
return decodeURIComponent(url.slice(i + marker.length));
|
||||||
|
}
|
||||||
|
|
||||||
|
async function backfillOldLoiEois(
|
||||||
|
port: { id: string; slug: string },
|
||||||
|
legacyDb: ReturnType<typeof postgres>,
|
||||||
|
) {
|
||||||
|
const rows = (await legacyDb`
|
||||||
|
select id, "EOI_Document"::text as doc
|
||||||
|
from plplouets5zw1um."Interests"
|
||||||
|
where "EOI_Document" is not null and "EOI_Document"::text not in ('', '[]', 'null')
|
||||||
|
`) as unknown as Array<{ id: number; doc: string }>;
|
||||||
|
|
||||||
|
const backend = await getStorageBackend();
|
||||||
|
let created = 0;
|
||||||
|
let skipped = 0;
|
||||||
|
let unmatched = 0;
|
||||||
|
const unresolved: string[] = [];
|
||||||
|
|
||||||
|
for (const r of rows) {
|
||||||
|
let url: string | null = null;
|
||||||
|
let title: string | null = null;
|
||||||
|
try {
|
||||||
|
const parsed = JSON.parse(r.doc) as unknown;
|
||||||
|
const first = Array.isArray(parsed) && parsed.length > 0 ? parsed[0] : null;
|
||||||
|
if (first && typeof first === 'object') {
|
||||||
|
const rec = first as Record<string, unknown>;
|
||||||
|
if (typeof rec.url === 'string') url = rec.url;
|
||||||
|
if (typeof rec.title === 'string') title = rec.title;
|
||||||
|
}
|
||||||
|
} catch {
|
||||||
|
// ignore malformed attachment JSON
|
||||||
|
}
|
||||||
|
const key = url ? legacyKeyFromUrl(url) : null;
|
||||||
|
if (!key) {
|
||||||
|
unmatched++;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// legacy interest id → migrated interest
|
||||||
|
const [link] = await db
|
||||||
|
.select({ interestId: migrationSourceLinks.targetEntityId })
|
||||||
|
.from(migrationSourceLinks)
|
||||||
|
.where(
|
||||||
|
and(
|
||||||
|
eq(migrationSourceLinks.sourceSystem, 'nocodb_interests'),
|
||||||
|
eq(migrationSourceLinks.sourceId, String(r.id)),
|
||||||
|
eq(migrationSourceLinks.targetEntityType, 'interest'),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
.limit(1);
|
||||||
|
if (!link) {
|
||||||
|
unresolved.push(`legacy#${r.id} (not a migrated interest)`);
|
||||||
|
unmatched++;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
const interestId = link.interestId;
|
||||||
|
|
||||||
|
// Idempotency: skip if this attachment was already recovered.
|
||||||
|
const [already] = await db
|
||||||
|
.select({ id: migrationSourceLinks.id })
|
||||||
|
.from(migrationSourceLinks)
|
||||||
|
.where(
|
||||||
|
and(
|
||||||
|
eq(migrationSourceLinks.sourceSystem, 'nocodb_eoi_document'),
|
||||||
|
eq(migrationSourceLinks.sourceId, String(r.id)),
|
||||||
|
eq(migrationSourceLinks.targetEntityType, 'document'),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
.limit(1);
|
||||||
|
if (already) {
|
||||||
|
skipped++;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
const [intRow] = await db
|
||||||
|
.select({ clientId: interests.clientId, yachtId: interests.yachtId })
|
||||||
|
.from(interests)
|
||||||
|
.where(eq(interests.id, interestId))
|
||||||
|
.limit(1);
|
||||||
|
if (!intRow?.clientId) {
|
||||||
|
unmatched++;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
const clientId = intRow.clientId;
|
||||||
|
|
||||||
|
if (DRY) {
|
||||||
|
created++;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
const buf = await streamToBuffer(await legacy.getObject(DATABASE_BUCKET, key));
|
||||||
|
const docId = randomUUID();
|
||||||
|
const storageKey = buildStoragePath(port.slug, 'eoi-signed', docId, randomUUID(), 'pdf');
|
||||||
|
const putRes = await backend.put(storageKey, buf, {
|
||||||
|
contentType: 'application/pdf',
|
||||||
|
sizeBytes: buf.length,
|
||||||
|
});
|
||||||
|
const folder = await ensureEntityFolder(port.id, 'client', clientId, SUPER_ADMIN_USER_ID);
|
||||||
|
const fileName = title || key.split('/').pop() || 'eoi-signed.pdf';
|
||||||
|
|
||||||
|
await db.transaction(async (tx) => {
|
||||||
|
const [f] = await tx
|
||||||
|
.insert(files)
|
||||||
|
.values({
|
||||||
|
portId: port.id,
|
||||||
|
filename: fileName,
|
||||||
|
originalName: fileName,
|
||||||
|
storagePath: putRes.key,
|
||||||
|
mimeType: 'application/pdf',
|
||||||
|
sizeBytes: String(putRes.sizeBytes),
|
||||||
|
category: 'eoi',
|
||||||
|
folderId: folder.id,
|
||||||
|
clientId,
|
||||||
|
interestId,
|
||||||
|
uploadedBy: 'system',
|
||||||
|
})
|
||||||
|
.returning({ id: files.id });
|
||||||
|
if (!f) throw new Error('files insert returned no row');
|
||||||
|
|
||||||
|
await tx.insert(documents).values({
|
||||||
|
id: docId,
|
||||||
|
portId: port.id,
|
||||||
|
interestId,
|
||||||
|
clientId,
|
||||||
|
yachtId: intRow.yachtId ?? null,
|
||||||
|
documentType: 'eoi',
|
||||||
|
title: `External EOI (legacy) - ${fileName}`,
|
||||||
|
status: 'completed',
|
||||||
|
isManualUpload: true,
|
||||||
|
signedFileId: f.id,
|
||||||
|
createdBy: SUPER_ADMIN_USER_ID,
|
||||||
|
});
|
||||||
|
|
||||||
|
await tx
|
||||||
|
.update(interests)
|
||||||
|
.set({ eoiDocStatus: 'signed', updatedAt: new Date() })
|
||||||
|
.where(eq(interests.id, interestId));
|
||||||
|
|
||||||
|
await tx.insert(migrationSourceLinks).values({
|
||||||
|
sourceSystem: 'nocodb_eoi_document',
|
||||||
|
sourceId: String(r.id),
|
||||||
|
targetEntityType: 'document',
|
||||||
|
targetEntityId: docId,
|
||||||
|
appliedId: `oldloi-${docId}`,
|
||||||
|
appliedBy: SUPER_ADMIN_USER_ID,
|
||||||
|
});
|
||||||
|
});
|
||||||
|
created++;
|
||||||
|
}
|
||||||
|
return { total: rows.length, created, skipped, unmatched, unresolved };
|
||||||
|
}
|
||||||
|
|
||||||
async function main() {
|
async function main() {
|
||||||
if (!process.env.LEGACY_MINIO_ACCESS_KEY || !process.env.LEGACY_MINIO_SECRET_KEY) {
|
if (!process.env.LEGACY_MINIO_ACCESS_KEY || !process.env.LEGACY_MINIO_SECRET_KEY) {
|
||||||
console.error(
|
console.error(
|
||||||
@@ -303,6 +478,20 @@ async function main() {
|
|||||||
for (const n of eoiRes.unresolved.slice(0, 25)) console.log(` - ${n}`);
|
for (const n of eoiRes.unresolved.slice(0, 25)) console.log(` - ${n}`);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
console.log('[backfill] Old-LOI EOIs (NocoDB `database` bucket)…');
|
||||||
|
const legacyDb = postgres(LEGACY_DB_URL, { max: 2 });
|
||||||
|
try {
|
||||||
|
const loiRes = await backfillOldLoiEois(port, legacyDb);
|
||||||
|
console.log(
|
||||||
|
` old-LOI EOIs: ${loiRes.total} attachments → ${loiRes.created} created, ${loiRes.skipped} already done, ${loiRes.unmatched} unmatched`,
|
||||||
|
);
|
||||||
|
if (loiRes.unresolved.length > 0) {
|
||||||
|
for (const n of loiRes.unresolved.slice(0, 25)) console.log(` - ${n}`);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
await legacyDb.end().catch(() => {});
|
||||||
|
}
|
||||||
|
|
||||||
await closeDb();
|
await closeDb();
|
||||||
process.exit(0);
|
process.exit(0);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,143 +1,175 @@
|
|||||||
/**
|
/**
|
||||||
* Fix-up: connect the multi-berth links the main pipeline missed.
|
* Fix-up: connect the multi-berth links the main dedup pipeline misses.
|
||||||
*
|
*
|
||||||
* The dedup pipeline migrated only each interest's single `Berth Number` text
|
* The dedup pipeline migrates only each interest's single `Berth Number` text
|
||||||
* field; the legacy `_nc_m2m_Berths_Interests` junction (multi-berth deals) was
|
* field; the legacy `_nc_m2m_Berths_Interests` junction (multi-berth deals) is
|
||||||
* not carried over. This reads that junction from the `nocodb_legacy` snapshot,
|
* not carried over by it. This reads that junction from the `nocodb_legacy`
|
||||||
* resolves each legacy interest → its migrated interest (via the ledger) and
|
* snapshot, resolves each legacy interest → its migrated interest (via the
|
||||||
* each mooring → the migrated berth, and inserts the missing `interest_berths`
|
* ledger) and each mooring → the migrated berth, and inserts the missing
|
||||||
* rows.
|
* `interest_berths` rows.
|
||||||
*
|
*
|
||||||
* Idempotent: `ON CONFLICT (interest_id, berth_id) DO NOTHING`. Primary safety:
|
* Idempotent: `ON CONFLICT (interest_id, berth_id) DO NOTHING`. Primary safety:
|
||||||
* only makes a berth primary when the interest has no primary yet (≤1 primary
|
* only makes a berth primary when the interest has no primary yet (≤1 primary
|
||||||
* per interest is a partial unique index).
|
* per interest is a partial unique index).
|
||||||
*
|
*
|
||||||
* pnpm tsx scripts/migration/connect-berth-links.ts [--port-slug port-nimara]
|
* Exposed as `connectBerthLinks(...)` so `migrate-from-nocodb.ts --apply` can
|
||||||
|
* fold it into the one-shot seed; also runnable standalone:
|
||||||
|
*
|
||||||
|
* pnpm tsx scripts/migration/connect-berth-links.ts [--port-slug port-nimara] [--dry-run]
|
||||||
*/
|
*/
|
||||||
import 'dotenv/config';
|
import 'dotenv/config';
|
||||||
import { randomUUID } from 'node:crypto';
|
import { randomUUID } from 'node:crypto';
|
||||||
import postgres from 'postgres';
|
import postgres from 'postgres';
|
||||||
|
|
||||||
const slugArg = (() => {
|
|
||||||
const i = process.argv.indexOf('--port-slug');
|
|
||||||
return i >= 0 ? (process.argv[i + 1] ?? 'port-nimara') : 'port-nimara';
|
|
||||||
})();
|
|
||||||
const DRY = process.argv.includes('--dry-run');
|
|
||||||
|
|
||||||
const CRM_URL = process.env.DATABASE_URL!;
|
|
||||||
const LEGACY_URL = process.env.LEGACY_DB_URL ?? CRM_URL.replace(/\/[^/]+$/, '/nocodb_legacy');
|
|
||||||
const crm = postgres(CRM_URL, { max: 4 });
|
|
||||||
const legacy = postgres(LEGACY_URL, { max: 4 });
|
|
||||||
|
|
||||||
const canonMoo = (raw: string): string => {
|
const canonMoo = (raw: string): string => {
|
||||||
const m = /^([A-Za-z]+)-?0*(\d+)$/.exec((raw ?? '').trim());
|
const m = /^([A-Za-z]+)-?0*(\d+)$/.exec((raw ?? '').trim());
|
||||||
return m ? `${m[1]!.toUpperCase()}${parseInt(m[2]!, 10)}` : (raw ?? '').trim();
|
return m ? `${m[1]!.toUpperCase()}${parseInt(m[2]!, 10)}` : (raw ?? '').trim();
|
||||||
};
|
};
|
||||||
|
|
||||||
async function main() {
|
export interface ConnectBerthLinksResult {
|
||||||
const [port] = await crm`select id from ports where slug=${slugArg} limit 1`;
|
inserted: number;
|
||||||
if (!port) throw new Error(`no port ${slugArg}`);
|
madePrimary: number;
|
||||||
const portId = port.id as string;
|
skipped: number;
|
||||||
|
unresolved: string[];
|
||||||
// legacy junction: interestId → set(moorings) (+ the Berth_Number text field)
|
|
||||||
const mooById = new Map<number, string>();
|
|
||||||
for (const b of await legacy`select id, "Mooring_Number" m from plplouets5zw1um."Berths"`)
|
|
||||||
mooById.set(b.id as number, canonMoo(b.m as string));
|
|
||||||
const legacyMoo = new Map<number, Set<string>>();
|
|
||||||
for (const j of await legacy`select "Interests_id" i, "Berths_id" b from plplouets5zw1um."_nc_m2m_Berths_Interests"`) {
|
|
||||||
const set = legacyMoo.get(j.i as number) ?? new Set<string>();
|
|
||||||
const m = mooById.get(j.b as number);
|
|
||||||
if (m) set.add(m);
|
|
||||||
legacyMoo.set(j.i as number, set);
|
|
||||||
}
|
|
||||||
// EOI-signed flag per legacy interest (for is_in_eoi_bundle)
|
|
||||||
const signed = new Set<number>();
|
|
||||||
for (const r of await legacy`select id, "EOI_Status" e, "LOI_NDA_Document" l from plplouets5zw1um."Interests"`) {
|
|
||||||
const e = ((r.e as string) ?? '').trim();
|
|
||||||
const l = ((r.l as string) ?? '').trim();
|
|
||||||
if (
|
|
||||||
e === 'Signed' ||
|
|
||||||
['Signing Complete', 'Signed by Client', 'Signed by Developer'].includes(l)
|
|
||||||
)
|
|
||||||
signed.add(r.id as number);
|
|
||||||
}
|
|
||||||
|
|
||||||
// ledger: legacy interest id → new interest id
|
|
||||||
const links =
|
|
||||||
await crm`select source_id, target_entity_id from migration_source_links where source_system='nocodb_interests' and target_entity_type='interest'`;
|
|
||||||
const newInterestBySrc = new Map(
|
|
||||||
links.map((l) => [Number(l.source_id), l.target_entity_id as string]),
|
|
||||||
);
|
|
||||||
|
|
||||||
// CRM berth id by mooring (this port)
|
|
||||||
const berthByMoo = new Map(
|
|
||||||
(await crm`select id, mooring_number m from berths where port_id=${portId}`).map((b) => [
|
|
||||||
b.m as string,
|
|
||||||
b.id as string,
|
|
||||||
]),
|
|
||||||
);
|
|
||||||
|
|
||||||
let inserted = 0;
|
|
||||||
let madePrimary = 0;
|
|
||||||
let skipped = 0;
|
|
||||||
const unresolved: string[] = [];
|
|
||||||
|
|
||||||
for (const [legacyId, moorings] of legacyMoo) {
|
|
||||||
const interestId = newInterestBySrc.get(legacyId);
|
|
||||||
if (!interestId) continue; // not a migrated interest (backup/copy tables)
|
|
||||||
// does this interest already have a primary?
|
|
||||||
const primaryCheck =
|
|
||||||
await crm`select exists(select 1 from interest_berths where interest_id=${interestId} and is_primary) as has`;
|
|
||||||
let hasPrimary = (primaryCheck[0]?.has as boolean | undefined) ?? false;
|
|
||||||
|
|
||||||
for (const moo of moorings) {
|
|
||||||
const berthId = berthByMoo.get(moo);
|
|
||||||
if (!berthId) {
|
|
||||||
unresolved.push(`${legacyId}:${moo}`);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
const makePrimary = !hasPrimary;
|
|
||||||
if (DRY) {
|
|
||||||
inserted++;
|
|
||||||
if (makePrimary) {
|
|
||||||
madePrimary++;
|
|
||||||
hasPrimary = true;
|
|
||||||
}
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
const res = await crm`
|
|
||||||
insert into interest_berths (id, interest_id, berth_id, is_primary, is_specific_interest, is_in_eoi_bundle)
|
|
||||||
values (${randomUUID()}, ${interestId}, ${berthId}, ${makePrimary}, true, ${signed.has(legacyId)})
|
|
||||||
on conflict (interest_id, berth_id) do nothing
|
|
||||||
returning id`;
|
|
||||||
if (res.length > 0) {
|
|
||||||
inserted++;
|
|
||||||
if (makePrimary) {
|
|
||||||
madePrimary++;
|
|
||||||
hasPrimary = true;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
skipped++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
console.log(
|
|
||||||
`connect-berth-links ${DRY ? '(DRY)' : ''}: inserted ${inserted} links (${madePrimary} new primary), ${skipped} already linked`,
|
|
||||||
);
|
|
||||||
if (unresolved.length)
|
|
||||||
console.log(
|
|
||||||
` ⚠ ${unresolved.length} moorings with no CRM berth: ${unresolved.slice(0, 20).join(', ')}`,
|
|
||||||
);
|
|
||||||
|
|
||||||
await crm.end();
|
|
||||||
await legacy.end();
|
|
||||||
process.exit(0);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
main().catch(async (e) => {
|
/**
|
||||||
console.error('connect-berth-links failed:', e);
|
* Self-contained: opens its own CRM + legacy connections (read-only on the
|
||||||
await crm.end().catch(() => {});
|
* legacy snapshot), does the work, closes them, returns stats. Safe to call
|
||||||
await legacy.end().catch(() => {});
|
* from the runner or standalone.
|
||||||
process.exit(1);
|
*/
|
||||||
});
|
export async function connectBerthLinks(opts: {
|
||||||
|
portSlug?: string;
|
||||||
|
dryRun?: boolean;
|
||||||
|
}): Promise<ConnectBerthLinksResult> {
|
||||||
|
const slug = opts.portSlug ?? 'port-nimara';
|
||||||
|
const dry = opts.dryRun ?? false;
|
||||||
|
|
||||||
|
const CRM_URL = process.env.DATABASE_URL!;
|
||||||
|
const LEGACY_URL = process.env.LEGACY_DB_URL ?? CRM_URL.replace(/\/[^/]+$/, '/nocodb_legacy');
|
||||||
|
const crm = postgres(CRM_URL, { max: 4 });
|
||||||
|
const legacy = postgres(LEGACY_URL, { max: 4 });
|
||||||
|
|
||||||
|
try {
|
||||||
|
const [port] = await crm`select id from ports where slug=${slug} limit 1`;
|
||||||
|
if (!port) throw new Error(`no port ${slug}`);
|
||||||
|
const portId = port.id as string;
|
||||||
|
|
||||||
|
// legacy junction: interestId → set(moorings)
|
||||||
|
const mooById = new Map<number, string>();
|
||||||
|
for (const b of await legacy`select id, "Mooring_Number" m from plplouets5zw1um."Berths"`)
|
||||||
|
mooById.set(b.id as number, canonMoo(b.m as string));
|
||||||
|
const legacyMoo = new Map<number, Set<string>>();
|
||||||
|
for (const j of await legacy`select "Interests_id" i, "Berths_id" b from plplouets5zw1um."_nc_m2m_Berths_Interests"`) {
|
||||||
|
const set = legacyMoo.get(j.i as number) ?? new Set<string>();
|
||||||
|
const m = mooById.get(j.b as number);
|
||||||
|
if (m) set.add(m);
|
||||||
|
legacyMoo.set(j.i as number, set);
|
||||||
|
}
|
||||||
|
// EOI-signed flag per legacy interest (for is_in_eoi_bundle)
|
||||||
|
const signed = new Set<number>();
|
||||||
|
for (const r of await legacy`select id, "EOI_Status" e, "LOI_NDA_Document" l from plplouets5zw1um."Interests"`) {
|
||||||
|
const e = ((r.e as string) ?? '').trim();
|
||||||
|
const l = ((r.l as string) ?? '').trim();
|
||||||
|
if (
|
||||||
|
e === 'Signed' ||
|
||||||
|
['Signing Complete', 'Signed by Client', 'Signed by Developer'].includes(l)
|
||||||
|
)
|
||||||
|
signed.add(r.id as number);
|
||||||
|
}
|
||||||
|
|
||||||
|
// ledger: legacy interest id → new interest id
|
||||||
|
const links =
|
||||||
|
await crm`select source_id, target_entity_id from migration_source_links where source_system='nocodb_interests' and target_entity_type='interest'`;
|
||||||
|
const newInterestBySrc = new Map(
|
||||||
|
links.map((l) => [Number(l.source_id), l.target_entity_id as string]),
|
||||||
|
);
|
||||||
|
|
||||||
|
// CRM berth id by mooring (this port)
|
||||||
|
const berthByMoo = new Map(
|
||||||
|
(await crm`select id, mooring_number m from berths where port_id=${portId}`).map((b) => [
|
||||||
|
b.m as string,
|
||||||
|
b.id as string,
|
||||||
|
]),
|
||||||
|
);
|
||||||
|
|
||||||
|
let inserted = 0;
|
||||||
|
let madePrimary = 0;
|
||||||
|
let skipped = 0;
|
||||||
|
const unresolved: string[] = [];
|
||||||
|
|
||||||
|
for (const [legacyId, moorings] of legacyMoo) {
|
||||||
|
const interestId = newInterestBySrc.get(legacyId);
|
||||||
|
if (!interestId) continue; // not a migrated interest (backup/copy tables)
|
||||||
|
const primaryCheck =
|
||||||
|
await crm`select exists(select 1 from interest_berths where interest_id=${interestId} and is_primary) as has`;
|
||||||
|
let hasPrimary = (primaryCheck[0]?.has as boolean | undefined) ?? false;
|
||||||
|
|
||||||
|
for (const moo of moorings) {
|
||||||
|
const berthId = berthByMoo.get(moo);
|
||||||
|
if (!berthId) {
|
||||||
|
unresolved.push(`${legacyId}:${moo}`);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
const makePrimary = !hasPrimary;
|
||||||
|
if (dry) {
|
||||||
|
inserted++;
|
||||||
|
if (makePrimary) {
|
||||||
|
madePrimary++;
|
||||||
|
hasPrimary = true;
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
const res = await crm`
|
||||||
|
insert into interest_berths (id, interest_id, berth_id, is_primary, is_specific_interest, is_in_eoi_bundle)
|
||||||
|
values (${randomUUID()}, ${interestId}, ${berthId}, ${makePrimary}, true, ${signed.has(legacyId)})
|
||||||
|
on conflict (interest_id, berth_id) do nothing
|
||||||
|
returning id`;
|
||||||
|
if (res.length > 0) {
|
||||||
|
inserted++;
|
||||||
|
if (makePrimary) {
|
||||||
|
madePrimary++;
|
||||||
|
hasPrimary = true;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
skipped++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return { inserted, madePrimary, skipped, unresolved };
|
||||||
|
} finally {
|
||||||
|
await crm.end().catch(() => {});
|
||||||
|
await legacy.end().catch(() => {});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ─── Standalone CLI ──────────────────────────────────────────────────────────
|
||||||
|
function isMain(): boolean {
|
||||||
|
const arg = process.argv[1] ?? '';
|
||||||
|
return arg.includes('connect-berth-links');
|
||||||
|
}
|
||||||
|
|
||||||
|
if (isMain()) {
|
||||||
|
const slugArg = (() => {
|
||||||
|
const i = process.argv.indexOf('--port-slug');
|
||||||
|
return i >= 0 ? (process.argv[i + 1] ?? 'port-nimara') : 'port-nimara';
|
||||||
|
})();
|
||||||
|
const dry = process.argv.includes('--dry-run');
|
||||||
|
|
||||||
|
connectBerthLinks({ portSlug: slugArg, dryRun: dry })
|
||||||
|
.then((r) => {
|
||||||
|
console.log(
|
||||||
|
`connect-berth-links ${dry ? '(DRY)' : ''}: inserted ${r.inserted} links (${r.madePrimary} new primary), ${r.skipped} already linked`,
|
||||||
|
);
|
||||||
|
if (r.unresolved.length)
|
||||||
|
console.log(
|
||||||
|
` ⚠ ${r.unresolved.length} moorings with no CRM berth: ${r.unresolved.slice(0, 20).join(', ')}`,
|
||||||
|
);
|
||||||
|
process.exit(0);
|
||||||
|
})
|
||||||
|
.catch((e) => {
|
||||||
|
console.error('connect-berth-links failed:', e);
|
||||||
|
process.exit(1);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|||||||
@@ -19,13 +19,14 @@
|
|||||||
import { and, eq, inArray } from 'drizzle-orm';
|
import { and, eq, inArray } from 'drizzle-orm';
|
||||||
|
|
||||||
import { db } from '@/lib/db';
|
import { db } from '@/lib/db';
|
||||||
import { clients, clientContacts, clientAddresses } from '@/lib/db/schema/clients';
|
import { clients, clientContacts, clientAddresses, clientTags } from '@/lib/db/schema/clients';
|
||||||
import { interests, interestBerths } from '@/lib/db/schema/interests';
|
import { interests, interestBerths } from '@/lib/db/schema/interests';
|
||||||
import { yachts } from '@/lib/db/schema/yachts';
|
import { yachts } from '@/lib/db/schema/yachts';
|
||||||
import { berths } from '@/lib/db/schema/berths';
|
import { berths } from '@/lib/db/schema/berths';
|
||||||
import { documents, documentSigners } from '@/lib/db/schema/documents';
|
import { documents, documentSigners } from '@/lib/db/schema/documents';
|
||||||
import { residentialClients, residentialInterests } from '@/lib/db/schema/residential';
|
import { residentialClients, residentialInterests } from '@/lib/db/schema/residential';
|
||||||
import { expenses } from '@/lib/db/schema/financial';
|
import { expenses } from '@/lib/db/schema/financial';
|
||||||
|
import { tags } from '@/lib/db/schema/system';
|
||||||
import { migrationSourceLinks } from '@/lib/db/schema/migration';
|
import { migrationSourceLinks } from '@/lib/db/schema/migration';
|
||||||
import type {
|
import type {
|
||||||
MigrationPlan,
|
MigrationPlan,
|
||||||
@@ -38,6 +39,29 @@ import type {
|
|||||||
|
|
||||||
const SOURCE_SYSTEM = 'nocodb_interests';
|
const SOURCE_SYSTEM = 'nocodb_interests';
|
||||||
|
|
||||||
|
/** Tag applied to legacy clients that arrived with no email/phone, so staff
|
||||||
|
* can filter + chase them. Kept here (not in the transform) because it's an
|
||||||
|
* apply-side side effect, not part of the canonical planned shape. */
|
||||||
|
const CONTACTLESS_TAG_NAME = 'Needs contact info';
|
||||||
|
const contactlessTagByPort = new Map<string, string>();
|
||||||
|
|
||||||
|
async function ensureContactlessTag(portId: string): Promise<string> {
|
||||||
|
const cached = contactlessTagByPort.get(portId);
|
||||||
|
if (cached) return cached;
|
||||||
|
await db
|
||||||
|
.insert(tags)
|
||||||
|
.values({ portId, name: CONTACTLESS_TAG_NAME, color: '#F59E0B' })
|
||||||
|
.onConflictDoNothing();
|
||||||
|
const [row] = await db
|
||||||
|
.select({ id: tags.id })
|
||||||
|
.from(tags)
|
||||||
|
.where(and(eq(tags.portId, portId), eq(tags.name, CONTACTLESS_TAG_NAME)))
|
||||||
|
.limit(1);
|
||||||
|
if (!row) throw new Error('failed to ensure contactless tag');
|
||||||
|
contactlessTagByPort.set(portId, row.id);
|
||||||
|
return row.id;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Canonicalize a mooring string to the unified form ("A1", "D32", "E18")
|
* Canonicalize a mooring string to the unified form ("A1", "D32", "E18")
|
||||||
* used by the new berths schema after the Phase 0 mooring-number sweep.
|
* used by the new berths schema after the Phase 0 mooring-number sweep.
|
||||||
@@ -223,6 +247,14 @@ async function applyClient(
|
|||||||
result.addressesInserted += addressRows.length;
|
result.addressesInserted += addressRows.length;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Flag legacy records that arrived with no contact info so staff can chase
|
||||||
|
// them. The dedup cluster aggregates all source rows, so an empty contacts
|
||||||
|
// list means truly no email/phone for this person across every merged row.
|
||||||
|
if (planned.contacts.length === 0) {
|
||||||
|
const tagId = await ensureContactlessTag(opts.port.id);
|
||||||
|
await db.insert(clientTags).values({ clientId, tagId }).onConflictDoNothing();
|
||||||
|
}
|
||||||
|
|
||||||
result.clientsInserted += 1;
|
result.clientsInserted += 1;
|
||||||
return { clientId, inserted: true };
|
return { clientId, inserted: true };
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user