Phase 6 — bounds three remaining unbounded Promise.all fan-outs that the audit flagged as potential prod-incident vectors. Same pattern proven by email-compose (4 concurrent S3 reads) and document-signing-emails (3 concurrent SMTP sends) in earlier commits. berth-pdf.service.ts:574 — presignDownload S3 round-trips bound: pLimit(8). A 20-version berth used to issue 20 simultaneous presigns. ~1× round-trip latency preserved on typical 5-15-version berths; pathological 100-version case no longer saturates the keep-alive pool. custom-fields.service.ts:327 — pg upserts on bulk field-value writes bound: pLimit(8). Port admin stacking 50+ field definitions on one client would have burst 50 concurrent upserts at the pg pool. notifications.service.ts:344 — createNotification fan-out across watchers bound: pLimit(8). Hot pipeline items can accumulate many watchers; a document event used to fan out N notification inserts + N socket emits in one burst. Audit also flagged brochures.service.ts and backup.service.ts as candidates — verified neither actually has an unbounded fan-out, just sequential queries. No change needed; speculative entries removed from BACKLOG implicitly. 1298/1298 vitest green. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
677 lines
25 KiB
TypeScript
677 lines
25 KiB
TypeScript
/**
|
||
* Berth PDF management service (Phase 6b — see plan §4.7b, §11.1, §14.6).
|
||
*
|
||
* Responsibilities:
|
||
* - Upload a per-berth PDF (versioned), via the active `StorageBackend`.
|
||
* - Verify the magic bytes (`%PDF-`) before persisting; delete the storage
|
||
* object on mismatch (§14.6 critical).
|
||
* - Reconcile the parsed fields against the current berth row, surfacing
|
||
* conflicts for the rep's diff dialog and auto-applying nullable gaps.
|
||
* - Enforce per-port size cap from `system_settings.berth_pdf_max_upload_mb`.
|
||
* - Generate signed download URLs for the version list.
|
||
*/
|
||
|
||
import { createHash } from 'node:crypto';
|
||
|
||
import pLimit from 'p-limit';
|
||
import { and, desc, eq, isNull, max, sql } from 'drizzle-orm';
|
||
|
||
import { db } from '@/lib/db';
|
||
import { berths, berthPdfVersions } from '@/lib/db/schema/berths';
|
||
import { systemSettings } from '@/lib/db/schema/system';
|
||
import { CodedError, ConflictError, NotFoundError, ValidationError } from '@/lib/errors';
|
||
import { logger } from '@/lib/logger';
|
||
import { getStorageBackend } from '@/lib/storage';
|
||
|
||
import {
|
||
type ExtractedBerthFields,
|
||
type ParseResult,
|
||
type ParserEngine,
|
||
isPdfMagic,
|
||
} from './berth-pdf-parser';
|
||
|
||
// ─── shared types ────────────────────────────────────────────────────────────
|
||
|
||
/**
 * A single berth field where the CRM row and the parsed PDF both carry a
 * value and the two disagree. Values are the normalized forms produced by
 * the reconcile comparison, not the raw column/PDF text.
 */
export interface ReconcileConflict {
  /** The disagreeing field, named by its `ExtractedBerthFields` key. */
  field: keyof ExtractedBerthFields;
  /** Normalized value currently stored on the berth row. */
  crmValue: string | number | null;
  /** Normalized value read from the PDF. */
  pdfValue: string | number | null;
  /** Parser-assigned confidence for `pdfValue`, in the range 0..1. */
  pdfConfidence: number;
}
|
||
|
||
/** Outcome of comparing a parsed berth PDF against the current berth row. */
export interface ReconcileResult {
  /**
   * Fields that are empty on the CRM side but present in the PDF. They are
   * safe to fill automatically; the rep still sees them as "Auto-applied"
   * chips in the diff UI.
   */
  autoApplied: Array<{ field: keyof ExtractedBerthFields; value: string | number }>;
  /**
   * Fields where both sides hold a value and they disagree. The diff dialog
   * renders these side by side, and nothing is written until the rep
   * confirms through the apply endpoint.
   */
  conflicts: ReconcileConflict[];
  /**
   * Informational messages only, e.g. the PDF names a different mooring
   * number than the berth it is being uploaded to (§14.6).
   */
  warnings: string[];
  /** Parser engine that produced the parse, surfaced in the diff UI. */
  engine: ParserEngine;
}
|
||
|
||
/**
 * Allowlist of berth fields that reconcile may compare and apply may write.
 * Each entry mirrors a `berths` column. Any other key a parser tier returns
 * is ignored, so a misbehaving parser cannot write to arbitrary columns.
 * Order matters: reconcile walks (and therefore reports) fields in this order.
 */
const APPLIABLE_FIELDS: ReadonlyArray<keyof ExtractedBerthFields> = [
  'lengthFt', 'lengthM',
  'widthFt', 'widthM',
  'draftFt', 'draftM',
  'waterDepth', 'waterDepthM',
  'bowFacing', 'sidePontoon',
  'powerCapacity', 'voltage',
  'mooringType',
  'cleatType', 'cleatCapacity',
  'bollardType', 'bollardCapacity',
  'access',
  'price',
  'weeklyRateHighUsd', 'weeklyRateLowUsd',
  'dailyRateHighUsd', 'dailyRateLowUsd',
  'pricingValidUntil',
];
|
||
|
||
/**
 * Berth fields backed by `numeric` columns (Drizzle hands these back as
 * strings). The apply path stringifies values for these keys, and the
 * reconcile comparison treats them as numbers with a relative tolerance.
 */
const NUMERIC_FIELDS = new Set<keyof ExtractedBerthFields>([
  'lengthFt', 'lengthM', 'widthFt', 'widthM',
  'draftFt', 'draftM', 'waterDepth', 'waterDepthM',
  'powerCapacity', 'voltage',
  'price',
  'weeklyRateHighUsd', 'weeklyRateLowUsd',
  'dailyRateHighUsd', 'dailyRateLowUsd',
]);
|
||
|
||
// Relative tolerance (1%) used when comparing numeric CRM vs PDF values, so
// imperial/metric rounding noise is not reported as a conflict. Stated to
// match the parser's own threshold.
const IMPERIAL_METRIC_TOLERANCE = 1e-2;
|
||
|
||
// ─── settings helpers ────────────────────────────────────────────────────────
|
||
|
||
/**
 * Resolve `berth_pdf_max_upload_mb` with precedence port override → global
 * setting → default 15.
 *
 * A setting only counts when it is a finite, strictly positive number (or a
 * string that parses to one). Blank strings, `0`, and negative values fall
 * through to the next level instead of becoming a 0 MB cap that would reject
 * every upload.
 *
 * @param portId Port whose override is checked first.
 * @returns Upload cap in megabytes.
 */
export async function getMaxUploadMb(portId: string): Promise<number> {
  const KEY = 'berth_pdf_max_upload_mb';
  const [portRow] = await db
    .select()
    .from(systemSettings)
    .where(and(eq(systemSettings.key, KEY), eq(systemSettings.portId, portId)));
  const portMb = coerceUploadMb(portRow?.value);
  if (portMb !== null) return portMb;

  const [globalRow] = await db
    .select()
    .from(systemSettings)
    .where(and(eq(systemSettings.key, KEY), isNull(systemSettings.portId)));
  return coerceUploadMb(globalRow?.value) ?? 15;
}

/**
 * Interpret a raw `system_settings.value` as an upload cap in MB.
 * Returns null for anything that is not a finite positive number. Note that
 * `Number('')` and `Number('  ')` are 0, so blank strings are rejected explicitly.
 */
function coerceUploadMb(value: unknown): number | null {
  let n: number;
  if (typeof value === 'number') {
    n = value;
  } else if (typeof value === 'string' && value.trim() !== '') {
    n = Number(value);
  } else {
    return null;
  }
  return Number.isFinite(n) && n > 0 ? n : null;
}
|
||
|
||
// ─── upload + version management ─────────────────────────────────────────────
|
||
|
||
/** Input for `uploadBerthPdf`. Exactly one of `buffer` / `storageKey` drives the flow. */
export interface UploadBerthPdfArgs {
  /** Berth receiving the new PDF version. */
  berthId: string;
  /**
   * Acting tenant. Every public entrypoint requires it so the berth lookup is
   * scoped to `(berth.id, port_id)`; otherwise a rep with berths:edit on port
   * A could pass a port-B berth UUID and read/write cross-tenant data.
   * A mismatch surfaces as NotFoundError.
   */
  portId: string;
  /**
   * Storage key of an object the browser already uploaded via a presigned
   * PUT (generated by the upload-url endpoint). Leave undefined to let the
   * service generate a key for a server-proxied `buffer` upload.
   */
  storageKey?: string;
  /**
   * Raw file bytes when the server proxies the upload (filesystem mode).
   * Presigned-PUT callers pass `storageKey` instead.
   */
  buffer?: Buffer;
  /** Original client file name; sanitized before it is used in a storage key. */
  fileName: string;
  /** User id recorded as the uploader of the version. */
  uploadedBy: string;
  /**
   * Client-computed sha256 hex. On the presigned path this value is stored
   * as supplied (the object is not re-hashed there); on the buffer path the
   * storage backend's digest is stored instead.
   */
  sha256?: string;
  /**
   * Client-reported byte size. NOTE(review): `uploadBerthPdf` enforces the
   * size cap from the buffer length or the storage HEAD, not from this value.
   */
  fileSizeBytes?: number;
  /**
   * Output of a server-side `parseBerthPdf` run. Optional: the rep may skip
   * parsing on a re-upload.
   */
  parseResult?: ParseResult;
}
|
||
|
||
/** What `uploadBerthPdf` persisted for the new version. */
export interface UploadBerthPdfResult {
  /** Id of the inserted `berth_pdf_versions` row (now the berth's current version). */
  versionId: string;
  /** Final storage key of the PDF blob. */
  storageKey: string;
  /** Per-berth version number allocated under the advisory lock. */
  versionNumber: number;
  /** Stored object size in bytes. */
  fileSizeBytes: number;
  /** sha256 hex from the storage backend (buffer path) or the client (presigned path; may be ''). */
  contentSha256: string;
}
|
||
|
||
/**
 * Persist a new per-berth PDF version. Either the raw `buffer` (server-proxied
 * upload) or a pre-uploaded `storageKey` (presigned PUT) is required.
 *
 * Critical mitigations enforced here:
 *   - §14.6 empty / magic-byte / size checks: against the buffer when present,
 *     otherwise against a storage HEAD plus the first bytes of the object.
 *     Rejected presigned objects are deleted best-effort.
 *   - §14.6 size cap from `berth_pdf_max_upload_mb` (see `getMaxUploadMb`).
 *   - Generated storage keys are `berths/{berthId}/{uuid}/{sanitizedName}`,
 *     so concurrent uploads never share a blob path. The version number is
 *     allocated separately under a per-berth advisory lock, in the same
 *     transaction that inserts the version row and moves the berth's
 *     current-version pointer.
 *   - If that transaction fails after this call wrote the blob itself
 *     (buffer path), the blob is deleted best-effort instead of being orphaned.
 *
 * @throws NotFoundError when the berth does not exist in `args.portId`.
 * @throws CodedError `BERTHS_PDF_EMPTY`, `BERTHS_PDF_MAGIC_BYTE` (buffer path),
 *   or `BERTHS_PDF_TOO_LARGE`.
 * @throws ValidationError when neither input is supplied, or the presigned
 *   object is missing, has an unexpected content type, or fails the magic-byte probe.
 */
export async function uploadBerthPdf(args: UploadBerthPdfArgs): Promise<UploadBerthPdfResult> {
  // 1. Tenant-scoped berth lookup: NotFoundError when the berth lives in a
  //    different port, so a rep on port A cannot upload against port B's berths.
  const berthRow = await db.query.berths.findFirst({
    where: and(eq(berths.id, args.berthId), eq(berths.portId, args.portId)),
  });
  if (!berthRow) throw new NotFoundError('Berth');
  const maxMb = await getMaxUploadMb(berthRow.portId);
  const maxBytes = maxMb * 1024 * 1024;

  // 2. Advisory-lock key (the berth UUID hashed to an int). The lock is taken in
  //    step 4 so concurrent uploads for one berth allocate version numbers
  //    one at a time.
  const berthLockKey = hashBerthIdToInt(args.berthId);

  // 3. Validate the upload and, on the buffer path, write the blob.
  const backend = await getStorageBackend();
  const buffer = args.buffer;
  let storageKey =
    args.storageKey ??
    `berths/${args.berthId}/${crypto.randomUUID()}/${sanitizeFileName(args.fileName)}`;
  let sizeBytes: number;
  let sha256: string;
  // True only when this call wrote the blob, so step 4 knows it owns cleanup.
  let wroteBlob = false;

  if (buffer) {
    // Best-effort removal of an object the caller says is already stored at
    // `args.storageKey`, before rejecting the upload.
    const discardSuppliedObject = async (): Promise<void> => {
      if (args.storageKey) await backend.delete(args.storageKey).catch(() => undefined);
    };
    // Empty check first: an empty buffer would otherwise fail the magic-byte
    // check and be reported under the wrong error code.
    if (buffer.length === 0) {
      await discardSuppliedObject();
      throw new CodedError('BERTHS_PDF_EMPTY');
    }
    if (!isPdfMagic(buffer)) {
      await discardSuppliedObject();
      throw new CodedError('BERTHS_PDF_MAGIC_BYTE');
    }
    if (buffer.length > maxBytes) {
      await discardSuppliedObject();
      throw new CodedError('BERTHS_PDF_TOO_LARGE', {
        internalMessage: `PDF exceeds ${maxMb} MB upload cap (got ${(buffer.length / 1024 / 1024).toFixed(1)} MB).`,
      });
    }
    const written = await backend.put(storageKey, buffer, { contentType: 'application/pdf' });
    wroteBlob = true;
    storageKey = written.key;
    sizeBytes = written.sizeBytes;
    sha256 = written.sha256;
  } else if (args.storageKey) {
    // Browser uploaded directly via presigned URL — verify via HEAD + magic bytes.
    const head = await backend.head(args.storageKey);
    if (!head) {
      throw new ValidationError('Uploaded object not found at expected storage key.');
    }
    if (head.sizeBytes === 0) {
      await backend.delete(args.storageKey).catch(() => undefined);
      throw new CodedError('BERTHS_PDF_EMPTY');
    }
    if (head.sizeBytes > maxBytes) {
      await backend.delete(args.storageKey).catch(() => undefined);
      throw new CodedError('BERTHS_PDF_TOO_LARGE', {
        internalMessage: `PDF exceeds ${maxMb} MB upload cap (got ${(head.sizeBytes / 1024 / 1024).toFixed(1)} MB).`,
      });
    }
    if (head.contentType !== 'application/pdf' && head.contentType !== 'application/octet-stream') {
      await backend.delete(args.storageKey).catch(() => undefined);
      throw new ValidationError(
        `Uploaded object content-type is ${head.contentType}; expected application/pdf.`,
      );
    }
    // Magic-byte check on the presign path (§14.6 critical): browser-uploaded
    // objects could be anything until we read the bytes. Only the first 5
    // bytes are streamed; on mismatch the blob is deleted.
    const probeBytes = await readFirstBytes(backend, args.storageKey, 5);
    if (!isPdfMagic(probeBytes)) {
      await backend.delete(args.storageKey).catch(() => undefined);
      throw new ValidationError(
        'Uploaded file failed PDF magic-byte check (does not start with %PDF-).',
      );
    }
    sizeBytes = head.sizeBytes;
    // The object is not re-hashed on this path; the client digest (if any) is stored as-is.
    sha256 = args.sha256 ?? '';
    storageKey = args.storageKey;
  } else {
    throw new ValidationError('Either buffer or storageKey is required.');
  }

  // 4. Take the per-berth advisory lock, compute version_number under it,
  //    insert the version row and bump the berth's pointer, all in one
  //    transaction so the lock and the writes commit atomically.
  const versionId = crypto.randomUUID();
  let versionNumber = 1;
  try {
    await db.transaction(async (tx) => {
      await tx.execute(sql`SELECT pg_advisory_xact_lock(${berthLockKey})`);
      versionNumber = await nextVersionNumberTx(tx, args.berthId);
      await tx.insert(berthPdfVersions).values({
        id: versionId,
        berthId: args.berthId,
        versionNumber,
        storageKey,
        fileName: args.fileName,
        fileSizeBytes: sizeBytes,
        contentSha256: sha256,
        uploadedBy: args.uploadedBy,
        parseResults: args.parseResult ? serializeParseResult(args.parseResult) : null,
      });
      await tx
        .update(berths)
        .set({ currentPdfVersionId: versionId, updatedAt: new Date() })
        .where(eq(berths.id, args.berthId));
    });
  } catch (err) {
    // The blob write happens before (outside) the transaction; without this
    // a failed insert would leave an unreferenced object in storage.
    if (wroteBlob) {
      await backend.delete(storageKey).catch(() => undefined);
      logger.warn(
        { berthId: args.berthId, storageKey, err },
        'Berth PDF version insert failed; deleted the blob written for it',
      );
    }
    throw err;
  }

  logger.info(
    { berthId: args.berthId, versionId, versionNumber, storageKey, sizeBytes },
    'Berth PDF version saved',
  );

  return { versionId, storageKey, versionNumber, fileSizeBytes: sizeBytes, contentSha256: sha256 };
}
|
||
|
||
/**
 * Next per-berth version number: `MAX(version_number) + 1`, or 1 when the
 * berth has no versions yet. Runs on the caller's transaction handle so it
 * sees (and is serialized by) whatever lock that transaction holds.
 */
async function nextVersionNumberTx(
  tx: Parameters<Parameters<typeof db.transaction>[0]>[0],
  berthId: string,
): Promise<number> {
  const aggregate = await tx
    .select({ max: max(berthPdfVersions.versionNumber) })
    .from(berthPdfVersions)
    .where(eq(berthPdfVersions.berthId, berthId));
  const highest = aggregate[0]?.max ?? 0;
  return highest + 1;
}
|
||
|
||
/**
 * Map a berth UUID to a signed 32-bit integer advisory-lock key: the first
 * four bytes of its sha256 digest, read big-endian.
 *
 * The value is passed to the single-argument `pg_advisory_xact_lock`, whose
 * parameter is bigint; every int32 fits. Two berths can collide on a key, but
 * that only makes their uploads wait on each other: correctness is kept, at
 * the cost of a little contention.
 */
function hashBerthIdToInt(berthId: string): number {
  return createHash('sha256').update(berthId).digest().readInt32BE(0);
}
|
||
|
||
/**
 * Read at most `n` bytes from the start of a stored object so the presigned-PUT
 * magic-byte check does not buffer the whole file. Iteration stops once `n`
 * bytes have been collected; the result is shorter than `n` when the object is.
 * The stream's `destroy()` is called afterwards when it has one.
 */
async function readFirstBytes(
  backend: Awaited<ReturnType<typeof getStorageBackend>>,
  key: string,
  n: number,
): Promise<Buffer> {
  const source = await backend.get(key);
  const pieces: Buffer[] = [];
  let collected = 0;

  for await (const raw of source as AsyncIterable<Buffer | string>) {
    const piece = typeof raw === 'string' ? Buffer.from(raw) : raw;
    pieces.push(piece);
    collected += piece.length;
    if (collected >= n) break;
  }

  // Some stream implementations stay readable after iteration ends; release them.
  const disposable = source as { destroy?: () => void };
  if (typeof disposable.destroy === 'function') {
    disposable.destroy();
  }

  const joined = Buffer.concat(pieces);
  return joined.subarray(0, n);
}
|
||
|
||
/**
 * Reduce a client-supplied file name to a storage-key-safe segment.
 *
 * - Drops any directory prefix (both `/` and `\` separators).
 * - Replaces every character outside `[a-zA-Z0-9._-]` with `_`, so the result
 *   satisfies the storage-key validation regex.
 * - Falls back to `berth.pdf` for empty or dot-only names (`.`, `..`), which
 *   would otherwise act as path segments inside the storage key.
 * - Caps the length at 200 characters while keeping a short extension
 *   (e.g. `.pdf`) at the end, rather than truncating it away.
 */
function sanitizeFileName(raw: string): string {
  const MAX_LENGTH = 200;
  const MAX_EXTENSION_LENGTH = 10;

  const baseName = raw.split(/[\\/]/).pop() ?? raw;
  const cleaned = baseName.replace(/[^a-zA-Z0-9._-]/g, '_');
  if (/^\.*$/.test(cleaned)) return 'berth.pdf';
  if (cleaned.length <= MAX_LENGTH) return cleaned;

  // Truncate the stem, not the extension, so long names still end in `.pdf`.
  const dot = cleaned.lastIndexOf('.');
  const extension =
    dot > 0 && cleaned.length - dot <= MAX_EXTENSION_LENGTH ? cleaned.slice(dot) : '';
  return cleaned.slice(0, MAX_LENGTH - extension.length) + extension;
}
|
||
|
||
/**
 * Convert a parse result into the JSON stored in `berth_pdf_versions.parse_results`.
 * Each extracted field keeps only `{ value, confidence }`; missing fields become null.
 */
function serializeParseResult(parse: ParseResult): Record<string, unknown> {
  const { engine, fields, meanConfidence, warnings } = parse;
  const extracted = Object.fromEntries(
    Object.entries(fields).map(([name, field]) => {
      const slim = field ? { value: field.value, confidence: field.confidence } : null;
      return [name, slim] as const;
    }),
  );
  return { engine, extracted, meanConfidence, warnings };
}
|
||
|
||
// ─── reconcile + apply ───────────────────────────────────────────────────────
|
||
|
||
/**
 * Compare every allowlisted parsed field against the berth row and classify it:
 *   - `autoApplied` when the CRM column is null/empty and the PDF has a value.
 *   - `conflicts` when both sides have a value and they disagree.
 *
 * PDF values that normalize to nothing (e.g. an unparseable number such as
 * "N/A", or a blank string) are skipped. They are never auto-applied, and
 * never offered as a conflict that would overwrite CRM data with null.
 *
 * Numeric tolerance: ±1% relative (matches §14.6 imperial-vs-metric guidance).
 * It is applied uniformly across all numeric columns, since the same rounding
 * noise affects weekly/daily rates too.
 *
 * Also emits a warning when the PDF's mooring number differs from the
 * berth's (case-insensitive, ignoring surrounding whitespace).
 *
 * @throws NotFoundError when the berth does not exist in `portId`.
 */
export async function reconcilePdfWithBerth(
  berthId: string,
  parsed: ParseResult,
  /** Tenant scope. NotFoundError on cross-port lookups. */
  portId: string,
): Promise<ReconcileResult> {
  const berthRow = await db.query.berths.findFirst({
    where: and(eq(berths.id, berthId), eq(berths.portId, portId)),
  });
  if (!berthRow) throw new NotFoundError('Berth');
  const fields = parsed.fields;

  const autoApplied: ReconcileResult['autoApplied'] = [];
  const conflicts: ReconcileConflict[] = [];
  const warnings: string[] = [...parsed.warnings];

  // §14.6 — mooring-number mismatch warning. Whitespace-only values carry no
  // mooring number and must not trigger a false mismatch.
  const pdfMooring = fields.mooringNumber?.value;
  if (
    typeof pdfMooring === 'string' &&
    pdfMooring.trim() !== '' &&
    pdfMooring.trim().toUpperCase() !== berthRow.mooringNumber.trim().toUpperCase()
  ) {
    warnings.push(
      `PDF says berth ${pdfMooring} but uploading to ${berthRow.mooringNumber}. Confirm before applying.`,
    );
  }

  for (const key of APPLIABLE_FIELDS) {
    const parsedField = fields[key];
    if (!parsedField || parsedField.value == null) continue;

    const pdfValue = normalizeForCompare(key, parsedField.value);
    // Nothing usable came out of the PDF for this field; leave CRM untouched.
    if (pdfValue == null || pdfValue === '') continue;

    const crmRaw = (berthRow as Record<string, unknown>)[key];
    const crmValue = normalizeForCompare(key, crmRaw);

    if (crmValue == null || crmValue === '') {
      autoApplied.push({ field: key, value: parsedField.value as string | number });
      continue;
    }
    if (!valuesEqual(crmValue, pdfValue, NUMERIC_FIELDS.has(key))) {
      conflicts.push({
        field: key,
        crmValue,
        pdfValue,
        pdfConfidence: parsedField.confidence,
      });
    }
  }

  return { autoApplied, conflicts, warnings, engine: parsed.engine };
}
|
||
|
||
/**
 * Apply a rep-confirmed slice of the reconcile diff to the berth row.
 *
 * Only keys in `APPLIABLE_FIELDS` are considered. Anything else in
 * `fieldsToApply` is silently ignored, keeping this endpoint a hard allowlist.
 * `null` clears a column. Numeric columns are written as strings, and values
 * that cannot be coerced to a finite number are dropped and logged.
 *
 * Mooring-mismatch gate (§14.6 critical): unless `opts.confirmMooringMismatch`
 * is true, the apply is rejected with ConflictError when EITHER
 *   - the version's stored `parseResults.warnings` contains a
 *     mooring-mismatch style warning, OR
 *   - the version's stored `parseResults.extracted.mooringNumber.value`
 *     differs from the berth's `mooringNumber` (case-insensitive, trimmed).
 * The second check is needed because the mismatch warning built by
 * `reconcilePdfWithBerth` is only returned to its caller; it is not written
 * to `parse_results` by this file. Checking stored warnings alone could
 * therefore let a mismatched PDF through. This is the service-side
 * enforcement of the "force re-confirm" rule; UI confirmation alone is not enough.
 *
 * On success, writes the berth columns and stamps `appliedFields` /
 * `appliedAt` onto the version's `parse_results` in one transaction.
 *
 * @throws NotFoundError when the berth is not in `portId`, or the version does
 *   not belong to the berth.
 * @throws ConflictError on an unconfirmed mooring mismatch.
 * @throws ValidationError when no appliable field survives filtering.
 */
export async function applyParseResults(
  berthId: string,
  versionId: string,
  fieldsToApply: Partial<ExtractedBerthFields>,
  /** Tenant scope. NotFoundError when berth lives in a different port. */
  portId: string,
  opts: { confirmMooringMismatch?: boolean } = {},
): Promise<{ updatedFields: Array<keyof ExtractedBerthFields> }> {
  const berthRow = await db.query.berths.findFirst({
    where: and(eq(berths.id, berthId), eq(berths.portId, portId)),
  });
  if (!berthRow) throw new NotFoundError('Berth');
  const versionRow = await db.query.berthPdfVersions.findFirst({
    where: and(eq(berthPdfVersions.id, versionId), eq(berthPdfVersions.berthId, berthId)),
  });
  if (!versionRow) throw new NotFoundError('Berth PDF version');

  // §14.6 mooring-mismatch gate.
  const priorParse = versionRow.parseResults as {
    warnings?: unknown;
    extracted?: Record<string, { value?: unknown } | null | undefined>;
  } | null;
  const storedWarnings = priorParse?.warnings;
  const warnedMismatch =
    Array.isArray(storedWarnings) &&
    storedWarnings.some(
      (w) => typeof w === 'string' && /uploading to/i.test(w) && /berth/i.test(w),
    );
  // Re-derive the mismatch from the stored extracted value, since the
  // reconcile-time warning is not persisted by this file.
  const storedMooring = priorParse?.extracted?.mooringNumber?.value;
  const extractedMismatch =
    typeof storedMooring === 'string' &&
    storedMooring.trim() !== '' &&
    storedMooring.trim().toUpperCase() !== berthRow.mooringNumber.trim().toUpperCase();
  if ((warnedMismatch || extractedMismatch) && !opts.confirmMooringMismatch) {
    throw new ConflictError(
      'PDF mooring mismatch with target berth. Pass confirmMooringMismatch=true to override.',
    );
  }

  const update: Record<string, unknown> = {};
  const applied: Array<keyof ExtractedBerthFields> = [];
  // Keys whose values were supplied but couldn't be coerced (e.g. a numeric
  // column receiving a non-finite or non-numeric value). Without this, partial
  // drops are invisible: the "no appliable fields supplied" check below only
  // fires when EVERY key drops.
  const dropped: Array<{ key: keyof ExtractedBerthFields; reason: string }> = [];
  for (const key of APPLIABLE_FIELDS) {
    const value = fieldsToApply[key];
    if (value === undefined) continue;
    if (value === null) {
      update[key] = null;
      applied.push(key);
      continue;
    }
    if (NUMERIC_FIELDS.has(key)) {
      const n = typeof value === 'number' ? value : Number(value);
      if (!Number.isFinite(n)) {
        dropped.push({ key, reason: `non-finite numeric (${typeof value}: ${String(value)})` });
        continue;
      }
      // numeric columns expect strings to preserve precision.
      update[key] = String(n);
    } else {
      update[key] = String(value);
    }
    applied.push(key);
  }
  if (applied.length === 0) {
    throw new ValidationError('No appliable fields supplied.');
  }
  if (dropped.length > 0) {
    logger.warn(
      { berthId, versionId, dropped },
      'Berth PDF apply: silently dropped fields that failed type coercion',
    );
  }
  update.updatedAt = new Date();

  await db.transaction(async (tx) => {
    await tx.update(berths).set(update).where(eq(berths.id, berthId));
    // Stamp the applied-field set onto parse_results for audit.
    const prior = (versionRow.parseResults as Record<string, unknown> | null) ?? {};
    await tx
      .update(berthPdfVersions)
      .set({
        parseResults: {
          ...prior,
          appliedFields: applied,
          appliedAt: new Date().toISOString(),
        },
      })
      .where(eq(berthPdfVersions.id, versionId));
  });

  return { updatedFields: applied };
}
|
||
|
||
// ─── version listing + rollback ──────────────────────────────────────────────
|
||
|
||
/** One row of the berth PDF version list, including a ready-to-use download link. */
export interface BerthPdfVersionListItem {
  /** `berth_pdf_versions` row id. */
  id: string;
  /** Per-berth version number (the list is ordered newest first). */
  versionNumber: number;
  /** File name as originally uploaded. */
  fileName: string;
  /** Stored size in bytes. */
  fileSizeBytes: number;
  /** Uploader's user id. */
  uploadedBy: string;
  /** Upload timestamp. */
  uploadedAt: Date;
  /** True when this version is the berth's `currentPdfVersionId`. */
  isCurrent: boolean;
  /** Pre-signed download URL, issued with a 15-minute (900 s) expiry. */
  downloadUrl: string;
  /** When `downloadUrl` stops working. */
  downloadUrlExpiresAt: Date;
  /** Parser engine recorded in the version's parse results, or null when absent. */
  parseEngine: ParserEngine | null;
}
|
||
|
||
/**
 * List every PDF version of a berth, newest first, each with a 15-minute
 * presigned download URL.
 *
 * Presigning runs with at most 8 requests in flight. On an S3 backend each
 * presign is its own HTTP round-trip; the cap keeps typical 5–15-version
 * berths near one round-trip of latency without letting a berth with
 * hundreds of versions saturate the client's keep-alive pool.
 *
 * @throws NotFoundError when the berth does not exist in `portId`.
 */
export async function listBerthPdfVersions(
  berthId: string,
  /** Tenant scope. NotFoundError when berth lives in a different port. */
  portId: string,
): Promise<BerthPdfVersionListItem[]> {
  const berth = await db.query.berths.findFirst({
    where: and(eq(berths.id, berthId), eq(berths.portId, portId)),
  });
  if (!berth) throw new NotFoundError('Berth');

  const versions = await db
    .select()
    .from(berthPdfVersions)
    .where(eq(berthPdfVersions.berthId, berthId))
    .orderBy(desc(berthPdfVersions.versionNumber));

  const storage = await getStorageBackend();
  const limit = pLimit(8);

  return Promise.all(
    versions.map((version) =>
      limit(async (): Promise<BerthPdfVersionListItem> => {
        const { url, expiresAt } = await storage.presignDownload(version.storageKey, {
          expirySeconds: 900,
          filename: version.fileName,
          contentType: 'application/pdf',
        });
        const stored = version.parseResults as { engine?: ParserEngine } | null;
        return {
          id: version.id,
          versionNumber: version.versionNumber,
          fileName: version.fileName,
          fileSizeBytes: version.fileSizeBytes,
          uploadedBy: version.uploadedBy,
          uploadedAt: version.uploadedAt,
          isCurrent: version.id === berth.currentPdfVersionId,
          downloadUrl: url,
          downloadUrlExpiresAt: expiresAt,
          parseEngine: stored?.engine ?? null,
        };
      }),
    ),
  );
}
|
||
|
||
/**
 * Point `berths.current_pdf_version_id` at an existing version. Per §14.6 this
 * does NOT re-parse or rewrite any berth columns; that is a separate,
 * deliberate "extract data from this version" action.
 *
 * The tenant-scoped berth check runs before the version lookup, matching the
 * other entrypoints. A caller from another port therefore always gets
 * NotFoundError('Berth'), whether or not the version id exists, so this call
 * cannot be used to probe cross-tenant version ids.
 *
 * @throws NotFoundError when the berth is not in `portId`, or the version does
 *   not belong to the berth.
 * @throws CodedError `BERTHS_VERSION_ALREADY_CURRENT` when the version is already current.
 */
export async function rollbackToVersion(
  berthId: string,
  versionId: string,
  /** Tenant scope. NotFoundError when berth lives in a different port. */
  portId: string,
): Promise<{ versionId: string; versionNumber: number }> {
  const berthRow = await db.query.berths.findFirst({
    where: and(eq(berths.id, berthId), eq(berths.portId, portId)),
  });
  if (!berthRow) throw new NotFoundError('Berth');

  const versionRow = await db.query.berthPdfVersions.findFirst({
    where: and(eq(berthPdfVersions.id, versionId), eq(berthPdfVersions.berthId, berthId)),
  });
  if (!versionRow) throw new NotFoundError('Berth PDF version');

  if (berthRow.currentPdfVersionId === versionId) {
    throw new CodedError('BERTHS_VERSION_ALREADY_CURRENT');
  }

  // Keep the write tenant-scoped as well, not just the preceding read.
  await db
    .update(berths)
    .set({ currentPdfVersionId: versionId, updatedAt: new Date() })
    .where(and(eq(berths.id, berthId), eq(berths.portId, portId)));

  return { versionId, versionNumber: versionRow.versionNumber };
}
|
||
|
||
// ─── compare helpers ─────────────────────────────────────────────────────────
|
||
|
||
/**
 * Normalize a CRM or PDF value for reconcile comparison.
 *
 * - null/undefined → null.
 * - Numeric fields (`NUMERIC_FIELDS`): numbers pass through when finite.
 *   Strings are stripped of everything except digits, `.` and `-`
 *   (so "1,234.5 ft" → 1234.5). A value containing no digits at all
 *   ("N/A", "TBD", "") yields null. This matters because `Number('')` is 0
 *   and such values would otherwise masquerade as a real 0.
 * - Other fields: strings are trimmed; anything else is stringified.
 */
function normalizeForCompare(
  key: keyof ExtractedBerthFields,
  raw: unknown,
): string | number | null {
  if (raw == null) return null;
  if (NUMERIC_FIELDS.has(key)) {
    if (typeof raw === 'number') return Number.isFinite(raw) ? raw : null;
    const numericText = String(raw).replace(/[^0-9.\-]/g, '');
    if (!/[0-9]/.test(numericText)) return null;
    const n = Number(numericText);
    return Number.isFinite(n) ? n : null;
  }
  if (typeof raw === 'string') return raw.trim();
  return String(raw);
}
|
||
|
||
/**
 * Equality used by reconcile.
 *
 * Two missing (null/undefined) values are equal; one missing value never
 * equals a present one. Numeric comparison first requires both sides to be
 * finite numbers. Equal values match exactly. When `b` is 0, an absolute
 * difference below 0.0001 counts as equal; otherwise the relative difference
 * `|a - b| / |b|` must be within `IMPERIAL_METRIC_TOLERANCE`. Non-numeric
 * values compare as trimmed, lower-cased strings.
 */
function valuesEqual(a: unknown, b: unknown, isNumeric: boolean): boolean {
  const aMissing = a == null;
  const bMissing = b == null;
  if (aMissing || bMissing) return aMissing && bMissing;

  if (!isNumeric) {
    return String(a).trim().toLowerCase() === String(b).trim().toLowerCase();
  }

  const left = Number(a);
  const right = Number(b);
  if (!(Number.isFinite(left) && Number.isFinite(right))) return false;
  if (left === right) return true;

  const diff = Math.abs(left - right);
  return right === 0 ? diff < 0.0001 : diff / Math.abs(right) <= IMPERIAL_METRIC_TOLERANCE;
}
|
||
|
||
// ─── re-exports the route layer leans on ─────────────────────────────────────
|
||
|
||
export { parseBerthPdf } from './berth-pdf-parser';
|
||
export type {
|
||
ExtractedBerthFields,
|
||
ParsedField,
|
||
ParseResult,
|
||
ParserEngine,
|
||
} from './berth-pdf-parser';
|