/**
 * Berth PDF management service (Phase 6b — see plan §4.7b, §11.1, §14.6).
 *
 * Responsibilities:
 * - Upload a per-berth PDF (versioned), via the active `StorageBackend`.
 * - Verify the magic bytes (`%PDF-`) before persisting; delete the storage
 *   object on mismatch (§14.6 critical).
 * - Reconcile the parsed fields against the current berth row, surfacing
 *   conflicts for the rep's diff dialog and auto-applying nullable gaps.
 * - Enforce the per-port size cap from `system_settings.berth_pdf_max_upload_mb`.
 * - Generate signed download URLs for the version list.
 */
import { createHash, randomUUID } from 'node:crypto';

import { and, desc, eq, isNull, max, sql } from 'drizzle-orm';

import { db } from '@/lib/db';
import { berths, berthPdfVersions } from '@/lib/db/schema/berths';
import { systemSettings } from '@/lib/db/schema/system';
import { ConflictError, NotFoundError, ValidationError } from '@/lib/errors';
import { logger } from '@/lib/logger';
import { getStorageBackend } from '@/lib/storage';
import {
  type ExtractedBerthFields,
  type ParseResult,
  type ParserEngine,
  isPdfMagic,
} from './berth-pdf-parser';

// ─── shared types ────────────────────────────────────────────────────────────

export interface ReconcileConflict {
  field: keyof ExtractedBerthFields;
  crmValue: string | number | null;
  pdfValue: string | number | null;
  /** Confidence the parser assigned to the PDF value (0..1). */
  pdfConfidence: number;
}

export interface ReconcileResult {
  /** Fields where CRM was null and the PDF supplied a value; these can be
   * applied automatically (the rep still sees them as "Auto-applied" chips). */
  autoApplied: Array<{ field: keyof ExtractedBerthFields; value: string | number }>;
  /** Fields where CRM and PDF disagree on a non-null value. The diff dialog
   * shows these as a side-by-side comparison; nothing is written until the
   * rep confirms via the apply endpoint. */
  conflicts: ReconcileConflict[];
  /** Pure-warning bucket — e.g. mooring-number mismatch with the berth being
   * uploaded to (§14.6). */
  warnings: string[];
  /** Engine that produced the parse — surfaced on the diff UI. */
  engine: ParserEngine;
}

// Field allowlist for reconcile/apply. Mirrors `berths` columns; we never
// blindly write parser output outside this set, so a rogue parser tier
// can't poison the schema.
const APPLIABLE_FIELDS: ReadonlyArray<keyof ExtractedBerthFields> = [
  'lengthFt', 'lengthM', 'widthFt', 'widthM', 'draftFt', 'draftM',
  'waterDepth', 'waterDepthM', 'bowFacing', 'sidePontoon', 'powerCapacity',
  'voltage', 'mooringType', 'cleatType', 'cleatCapacity', 'bollardType',
  'bollardCapacity', 'access', 'price', 'weeklyRateHighUsd',
  'weeklyRateLowUsd', 'dailyRateHighUsd', 'dailyRateLowUsd',
  'pricingValidUntil',
];

// Numeric berths columns are stored as `numeric` (Drizzle returns string).
// This set tells the apply path which fields need stringification.
const NUMERIC_FIELDS = new Set<keyof ExtractedBerthFields>([
  'lengthFt', 'lengthM', 'widthFt', 'widthM', 'draftFt', 'draftM',
  'waterDepth', 'waterDepthM', 'powerCapacity', 'voltage', 'price',
  'weeklyRateHighUsd', 'weeklyRateLowUsd', 'dailyRateHighUsd',
  'dailyRateLowUsd',
]);

// Tolerance for imperial-vs-metric reconcile. Same threshold as the parser.
const IMPERIAL_METRIC_TOLERANCE = 0.01;
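// Illustrative arithmetic for the ±1% band (values made up): a CRM length
// of 32.8 ft vs a parsed 10 m converted to 32.808 ft differs by ~0.03%,
// well inside the band, so reconcile treats them as equal rather than
// flagging a conflict; 110 V vs 220 V differs by 50% and is surfaced.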
// ─── settings helpers ────────────────────────────────────────────────────────

/** Resolve `berth_pdf_max_upload_mb` with port-override → global → default 15. */
export async function getMaxUploadMb(portId: string): Promise<number> {
  const KEY = 'berth_pdf_max_upload_mb';

  const [portRow] = await db
    .select()
    .from(systemSettings)
    .where(and(eq(systemSettings.key, KEY), eq(systemSettings.portId, portId)));
  if (portRow && typeof portRow.value === 'number') return portRow.value;
  if (portRow && typeof portRow.value === 'string') {
    const n = Number(portRow.value);
    if (Number.isFinite(n)) return n;
  }

  const [globalRow] = await db
    .select()
    .from(systemSettings)
    .where(and(eq(systemSettings.key, KEY), isNull(systemSettings.portId)));
  if (globalRow && typeof globalRow.value === 'number') return globalRow.value;
  if (globalRow && typeof globalRow.value === 'string') {
    const n = Number(globalRow.value);
    if (Number.isFinite(n)) return n;
  }

  return 15;
}
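// Example (hypothetical rows): with a port-scoped row storing "25" and a
// global row storing 20, `await getMaxUploadMb(portId)` returns 25; the
// string value is coerced with Number() before the global row or the
// hard default of 15 is ever consulted.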
// ─── upload + version management ─────────────────────────────────────────────

export interface UploadBerthPdfArgs {
  berthId: string;
  /** Already-uploaded storage key (the upload-url endpoint generated it) OR
   * undefined to make this service compute one. */
  storageKey?: string;
  /** Raw bytes when the server proxies the upload (filesystem mode); when
   * callers used a presigned PUT they pass `storageKey` and skip this. */
  buffer?: Buffer;
  fileName: string;
  uploadedBy: string;
  /** Pre-computed sha256 hex from the client (verified server-side anyway). */
  sha256?: string;
  /** Pre-computed bytes (used for the size-cap pre-flight on direct uploads). */
  fileSizeBytes?: number;
  /** Result of running `parseBerthPdf` server-side. Optional — the rep may
   * have skipped parsing on a re-upload. */
  parseResult?: ParseResult;
}

export interface UploadBerthPdfResult {
  versionId: string;
  storageKey: string;
  versionNumber: number;
  fileSizeBytes: number;
  contentSha256: string;
}

/**
 * Persist a per-berth PDF version. Either the raw `buffer` or a pre-uploaded
 * `storageKey` (with optional `buffer` for verification) is required.
 *
 * Critical mitigations enforced here:
 * - §14.6 magic-byte check against the buffer when present.
 * - §14.6 size cap from `berth_pdf_max_upload_mb`.
 * - Storage key namespaced under `berths/{id}/{uuid}/...` so two reps racing
 *   on the same berth can't collide on a blob path; the per-berth advisory
 *   lock (step 4) serializes version-number allocation, with the unique
 *   index on (berth_id, version_number) as the DB-level backstop.
 */
export async function uploadBerthPdf(args: UploadBerthPdfArgs): Promise<UploadBerthPdfResult> {
  // 1. Resolve the berth + port for the size-cap lookup.
  const berthRow = await db.query.berths.findFirst({ where: eq(berths.id, args.berthId) });
  if (!berthRow) throw new NotFoundError('Berth');
  const maxMb = await getMaxUploadMb(berthRow.portId);
  const maxBytes = maxMb * 1024 * 1024;

  // 2. Per-berth advisory lock prevents two concurrent uploads from both
  //    computing version `v3` and racing to write blobs (the unique index
  //    on (berth_id, version_number) would catch the second insert, but
  //    only AFTER its blob is already in storage — leaving an orphan).
  //    The lock is scoped to a transaction wrapping the version-number
  //    read AND the version-row insert, so concurrent uploads serialize
  //    cleanly. NB: hash the UUID into a 32-bit int for
  //    pg_advisory_xact_lock (see hashBerthIdToInt).
  const berthLockKey = hashBerthIdToInt(args.berthId);

  // 3. Magic bytes + size when we have the buffer in hand.
  const backend = await getStorageBackend();
  const buffer = args.buffer;

  // UUID-based storage-key path so two concurrent uploads can't collide on
  // the same blob path (the version_number suffix used to be in the key but
  // is now a separate DB column allocated under the per-berth advisory
  // lock — see step 4).
  let versionNumber = 1;
  let storageKey =
    args.storageKey ??
    `berths/${args.berthId}/${randomUUID()}/${sanitizeFileName(args.fileName)}`;
  let sizeBytes = args.fileSizeBytes ?? buffer?.length ?? 0;
  let sha256 = args.sha256 ?? '';

  if (buffer) {
    if (!isPdfMagic(buffer)) {
      // Best-effort cleanup if the storage already has a partial.
      if (args.storageKey) await backend.delete(args.storageKey).catch(() => undefined);
      throw new ValidationError(
        'Uploaded file failed PDF magic-byte check (does not start with %PDF-).',
      );
    }
    if (buffer.length === 0) throw new ValidationError('Uploaded PDF is empty (0 bytes).');
    if (buffer.length > maxBytes) {
      throw new ValidationError(
        `PDF exceeds ${maxMb} MB upload cap (got ${(buffer.length / 1024 / 1024).toFixed(1)} MB).`,
      );
    }
    const written = await backend.put(storageKey, buffer, { contentType: 'application/pdf' });
    storageKey = written.key;
    sizeBytes = written.sizeBytes;
    sha256 = written.sha256;
  } else if (args.storageKey) {
    // Browser uploaded directly via presigned URL — verify via HEAD + magic bytes.
    const head = await backend.head(args.storageKey);
    if (!head) {
      throw new ValidationError('Uploaded object not found at expected storage key.');
    }
    if (head.sizeBytes === 0) {
      await backend.delete(args.storageKey).catch(() => undefined);
      throw new ValidationError('Uploaded PDF is empty (0 bytes).');
    }
    if (head.sizeBytes > maxBytes) {
      await backend.delete(args.storageKey).catch(() => undefined);
      throw new ValidationError(
        `PDF exceeds ${maxMb} MB upload cap (got ${(head.sizeBytes / 1024 / 1024).toFixed(1)} MB).`,
      );
    }
    if (head.contentType !== 'application/pdf' && head.contentType !== 'application/octet-stream') {
      await backend.delete(args.storageKey).catch(() => undefined);
      throw new ValidationError(
        `Uploaded object content-type is ${head.contentType}; expected application/pdf.`,
      );
    }
    // Magic-byte check on the presign path (§14.6 critical) — browser-
    // uploaded objects could be anything until we read the bytes. Stream
    // just the first 5 bytes; abort early on mismatch and delete the blob.
    const probeBytes = await readFirstBytes(backend, args.storageKey, 5);
    if (!isPdfMagic(probeBytes)) {
      await backend.delete(args.storageKey).catch(() => undefined);
      throw new ValidationError(
        'Uploaded file failed PDF magic-byte check (does not start with %PDF-).',
      );
    }
    sizeBytes = head.sizeBytes;
    sha256 = args.sha256 ?? '';
    storageKey = args.storageKey;
  } else {
    throw new ValidationError('Either buffer or storageKey is required.');
  }
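  // Illustrative timeline of the race step 4 prevents: reps A and B upload
  // concurrently and both write distinct UUID-keyed blobs above; they then
  // serialize on the advisory lock below, so A reads MAX(version_number) = 2
  // and inserts v3, and only after A commits does B read 3 and insert v4.
  // Without the lock, both would compute v3 and one insert would fail on the
  // unique index after its blob had already landed in storage.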
  // 4. Take the per-berth advisory lock, compute version_number under the
  //    lock, insert + bump the current pointer. All inside a single
  //    transaction so the lock + writes commit atomically.
  const versionId = randomUUID();
  await db.transaction(async (tx) => {
    await tx.execute(sql`SELECT pg_advisory_xact_lock(${berthLockKey})`);
    versionNumber = await nextVersionNumberTx(tx, args.berthId);
    await tx.insert(berthPdfVersions).values({
      id: versionId,
      berthId: args.berthId,
      versionNumber,
      storageKey,
      fileName: args.fileName,
      fileSizeBytes: sizeBytes,
      contentSha256: sha256,
      uploadedBy: args.uploadedBy,
      parseResults: args.parseResult ? serializeParseResult(args.parseResult) : null,
    });
    await tx
      .update(berths)
      .set({ currentPdfVersionId: versionId, updatedAt: new Date() })
      .where(eq(berths.id, args.berthId));
  });

  logger.info(
    { berthId: args.berthId, versionId, versionNumber, storageKey, sizeBytes },
    'Berth PDF version saved',
  );

  return { versionId, storageKey, versionNumber, fileSizeBytes: sizeBytes, contentSha256: sha256 };
}

/** Tx-bound variant — same SELECT MAX(...) but inside the caller's transaction. */
async function nextVersionNumberTx(
  tx: Parameters<Parameters<typeof db.transaction>[0]>[0],
  berthId: string,
): Promise<number> {
  const [row] = await tx
    .select({ max: max(berthPdfVersions.versionNumber) })
    .from(berthPdfVersions)
    .where(eq(berthPdfVersions.berthId, berthId));
  return (row?.max ?? 0) + 1;
}

/**
 * Hash a UUID berthId into a 32-bit signed integer for pg_advisory_xact_lock
 * (the single-argument form takes a bigint, so an int32 fits). Uses the first
 * 4 bytes of sha256 reinterpreted as int32 — collisions are theoretically
 * possible, but the lock is per-berth, so a collision just means two
 * different berths' uploads serialize through the same key, which is
 * harmless (correctness preserved, slight contention only).
 */
function hashBerthIdToInt(berthId: string): number {
  const h = createHash('sha256').update(berthId).digest();
  // Read as signed 32-bit big-endian.
  return h.readInt32BE(0);
}

/**
 * Stream just the first `n` bytes of a stored object so the magic-byte
 * check on the presigned-PUT path can run without buffering the whole
 * file. Returns a Buffer of up to `n` bytes (less if the file is shorter).
 */
async function readFirstBytes(
  backend: Awaited<ReturnType<typeof getStorageBackend>>,
  key: string,
  n: number,
): Promise<Buffer> {
  const stream = await backend.get(key);
  const chunks: Buffer[] = [];
  let total = 0;
  for await (const chunk of stream as AsyncIterable<Buffer | string>) {
    const buf = typeof chunk === 'string' ? Buffer.from(chunk) : chunk;
    chunks.push(buf);
    total += buf.length;
    if (total >= n) break;
  }
  // Best-effort dispose - some streams are still readable after iteration.
  if (typeof (stream as { destroy?: () => void }).destroy === 'function') {
    (stream as unknown as { destroy: () => void }).destroy();
  }
  return Buffer.concat(chunks).subarray(0, n);
}

function sanitizeFileName(raw: string): string {
  // Preserve the extension; replace spaces / disallowed chars with '_' so the
  // result satisfies the storage-key validation regex.
  const last = raw.split(/[\\/]/).pop() ?? raw;
  return last.replace(/[^a-zA-Z0-9._-]/g, '_').slice(0, 200) || 'berth.pdf';
}

function serializeParseResult(parse: ParseResult): Record<string, unknown> {
  return {
    engine: parse.engine,
    extracted: Object.fromEntries(
      Object.entries(parse.fields).map(([k, v]) => [
        k,
        v ? { value: v.value, confidence: v.confidence } : null,
      ]),
    ),
    meanConfidence: parse.meanConfidence,
    warnings: parse.warnings,
  };
}
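// Illustrative shape of the persisted `parse_results` JSON (engine label
// and field names are examples, not the full set):
//   {
//     "engine": "text-layer",
//     "extracted": { "lengthM": { "value": 30, "confidence": 0.92 }, "voltage": null },
//     "meanConfidence": 0.88,
//     "warnings": []
//   }
// `applyParseResults` later merges `appliedFields` / `appliedAt` into this
// same object for audit.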
// ─── reconcile + apply ───────────────────────────────────────────────────────

/**
 * Walk every parsed field; classify into:
 * - `autoApplied` when the CRM column is null/empty.
 * - `conflicts` when both sides have a non-null value and they disagree.
 *
 * Numeric tolerance: ±1% (matches §14.6 imperial-vs-metric guidance, applied
 * uniformly across all numeric columns since the same rounding noise affects
 * weekly/daily rates too).
 */
export async function reconcilePdfWithBerth(
  berthId: string,
  parsed: ParseResult,
): Promise<ReconcileResult> {
  const berthRow = await db.query.berths.findFirst({ where: eq(berths.id, berthId) });
  if (!berthRow) throw new NotFoundError('Berth');

  const fields = parsed.fields;
  const autoApplied: ReconcileResult['autoApplied'] = [];
  const conflicts: ReconcileConflict[] = [];
  const warnings: string[] = [...parsed.warnings];

  // §14.6 — mooring-number mismatch warning.
  const pdfMooring = fields.mooringNumber?.value;
  if (
    pdfMooring &&
    typeof pdfMooring === 'string' &&
    pdfMooring.toUpperCase() !== berthRow.mooringNumber.toUpperCase()
  ) {
    warnings.push(
      `PDF says berth ${pdfMooring} but uploading to ${berthRow.mooringNumber}. Confirm before applying.`,
    );
  }

  for (const key of APPLIABLE_FIELDS) {
    const parsedField = fields[key];
    if (!parsedField || parsedField.value == null) continue;

    const crmRaw = (berthRow as Record<string, unknown>)[key];
    const crmValue = normalizeForCompare(key, crmRaw);
    const pdfValue = normalizeForCompare(key, parsedField.value);

    if (crmValue == null || crmValue === '') {
      autoApplied.push({ field: key, value: parsedField.value as string | number });
      continue;
    }
    if (!valuesEqual(crmValue, pdfValue, NUMERIC_FIELDS.has(key))) {
      conflicts.push({
        field: key,
        crmValue: crmValue as string | number | null,
        pdfValue: pdfValue as string | number | null,
        pdfConfidence: parsedField.confidence,
      });
    }
  }

  return { autoApplied, conflicts, warnings, engine: parsed.engine };
}
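// Worked classification (hypothetical berth): CRM has lengthM = null and
// voltage = 220; the PDF parses lengthM = 30 (conf 0.92) and voltage = 110
// (conf 0.75). Result: lengthM lands in `autoApplied` (CRM gap), voltage
// lands in `conflicts` for the diff dialog, and nothing touches the DB
// until the rep confirms via applyParseResults below.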
/**
 * Apply a rep-confirmed slice of the reconcile diff to the berth row. The
 * caller passes the canonical `ExtractedBerthFields` keys; anything outside
 * `APPLIABLE_FIELDS` is silently dropped to keep this endpoint a hard
 * allowlist.
 *
 * Mooring-mismatch gate (§14.6 critical): when the version's stored
 * `parseResults.warnings` contains a mooring-mismatch warning, the apply
 * is rejected unless the caller passes `confirmMooringMismatch: true`.
 * This is the service-side enforcement of the "force re-confirm" rule —
 * UI confirmation alone is not enough.
 */
export async function applyParseResults(
  berthId: string,
  versionId: string,
  fieldsToApply: Partial<ExtractedBerthFields>,
  opts: { confirmMooringMismatch?: boolean } = {},
): Promise<{ updatedFields: Array<keyof ExtractedBerthFields> }> {
  const berthRow = await db.query.berths.findFirst({ where: eq(berths.id, berthId) });
  if (!berthRow) throw new NotFoundError('Berth');

  const versionRow = await db.query.berthPdfVersions.findFirst({
    where: and(eq(berthPdfVersions.id, versionId), eq(berthPdfVersions.berthId, berthId)),
  });
  if (!versionRow) throw new NotFoundError('Berth PDF version');

  // §14.6 mooring-mismatch gate.
  const priorParse = versionRow.parseResults as { warnings?: string[] } | null;
  const hasMooringMismatch = (priorParse?.warnings ?? []).some(
    (w) => /uploading to/i.test(w) && /berth/i.test(w),
  );
  if (hasMooringMismatch && !opts.confirmMooringMismatch) {
    throw new ConflictError(
      'PDF mooring mismatch with target berth. Pass confirmMooringMismatch=true to override.',
    );
  }

  const update: Record<string, unknown> = {};
  const applied: Array<keyof ExtractedBerthFields> = [];
  for (const key of APPLIABLE_FIELDS) {
    const value = fieldsToApply[key];
    if (value === undefined) continue;
    if (value === null) {
      update[key] = null;
      applied.push(key);
      continue;
    }
    if (NUMERIC_FIELDS.has(key)) {
      const n = typeof value === 'number' ? value : Number(value);
      if (!Number.isFinite(n)) continue;
      // Numeric columns expect strings to preserve precision.
      update[key] = String(n);
    } else {
      update[key] = String(value);
    }
    applied.push(key);
  }

  if (applied.length === 0) {
    throw new ValidationError('No appliable fields supplied.');
  }
  update.updatedAt = new Date();

  await db.transaction(async (tx) => {
    await tx.update(berths).set(update).where(eq(berths.id, berthId));
    // Stamp the applied-field set onto parse_results for audit.
    const prior = (versionRow.parseResults as Record<string, unknown> | null) ?? {};
    await tx
      .update(berthPdfVersions)
      .set({
        parseResults: {
          ...prior,
          appliedFields: applied,
          appliedAt: new Date().toISOString(),
        },
      })
      .where(eq(berthPdfVersions.id, versionId));
  });

  return { updatedFields: applied };
}
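// Example call (hypothetical IDs), applying two confirmed fields from the
// diff dialog while overriding a recorded mooring-mismatch warning:
//   await applyParseResults(berthId, versionId, { lengthM: 30, voltage: 110 }, {
//     confirmMooringMismatch: true,
//   });
// The same call without the flag throws ConflictError when the stored
// warnings include a mooring mismatch.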
// ─── version listing + rollback ──────────────────────────────────────────────

export interface BerthPdfVersionListItem {
  id: string;
  versionNumber: number;
  fileName: string;
  fileSizeBytes: number;
  uploadedBy: string;
  uploadedAt: Date;
  isCurrent: boolean;
  /** Pre-signed download URL (15-min TTL). */
  downloadUrl: string;
  downloadUrlExpiresAt: Date;
  parseEngine: ParserEngine | null;
}

export async function listBerthPdfVersions(berthId: string): Promise<BerthPdfVersionListItem[]> {
  const berthRow = await db.query.berths.findFirst({ where: eq(berths.id, berthId) });
  if (!berthRow) throw new NotFoundError('Berth');

  const rows = await db
    .select()
    .from(berthPdfVersions)
    .where(eq(berthPdfVersions.berthId, berthId))
    .orderBy(desc(berthPdfVersions.versionNumber));

  const backend = await getStorageBackend();

  // Presign in parallel — for an S3 backend each call is a separate HTTP
  // round-trip, so a 20-version berth used to take 20× the latency in the
  // sequential loop. Promise.all collapses that to ~1× round-trip.
  const presigned = await Promise.all(
    rows.map((row) =>
      backend.presignDownload(row.storageKey, {
        expirySeconds: 900,
        filename: row.fileName,
        contentType: 'application/pdf',
      }),
    ),
  );

  return rows.map((row, i) => {
    const parseEngine = (row.parseResults as { engine?: ParserEngine } | null)?.engine ?? null;
    return {
      id: row.id,
      versionNumber: row.versionNumber,
      fileName: row.fileName,
      fileSizeBytes: row.fileSizeBytes,
      uploadedBy: row.uploadedBy,
      uploadedAt: row.uploadedAt,
      isCurrent: berthRow.currentPdfVersionId === row.id,
      downloadUrl: presigned[i]!.url,
      downloadUrlExpiresAt: presigned[i]!.expiresAt,
      parseEngine,
    };
  });
}

/**
 * Set `berths.current_pdf_version_id` to the requested version. Per §14.6,
 * this does NOT re-parse and re-update the berth columns — that's a separate,
 * deliberate "extract data from this version" action.
 */
export async function rollbackToVersion(
  berthId: string,
  versionId: string,
): Promise<{ versionId: string; versionNumber: number }> {
  const versionRow = await db.query.berthPdfVersions.findFirst({
    where: and(eq(berthPdfVersions.id, versionId), eq(berthPdfVersions.berthId, berthId)),
  });
  if (!versionRow) throw new NotFoundError('Berth PDF version');

  const berthRow = await db.query.berths.findFirst({ where: eq(berths.id, berthId) });
  if (!berthRow) throw new NotFoundError('Berth');
  if (berthRow.currentPdfVersionId === versionId) {
    throw new ConflictError('That version is already current; rollback is a no-op.');
  }

  await db
    .update(berths)
    .set({ currentPdfVersionId: versionId, updatedAt: new Date() })
    .where(eq(berths.id, berthId));

  return { versionId, versionNumber: versionRow.versionNumber };
}

// ─── compare helpers ─────────────────────────────────────────────────────────

function normalizeForCompare(
  key: keyof ExtractedBerthFields,
  raw: unknown,
): string | number | null {
  if (raw == null) return null;
  if (NUMERIC_FIELDS.has(key)) {
    const n = typeof raw === 'number' ? raw : Number(String(raw).replace(/[^0-9.\-]/g, ''));
    return Number.isFinite(n) ? n : null;
  }
  if (typeof raw === 'string') return raw.trim();
  return String(raw);
}

function valuesEqual(a: unknown, b: unknown, isNumeric: boolean): boolean {
  if (a == null && b == null) return true;
  if (a == null || b == null) return false;
  if (isNumeric) {
    const an = Number(a);
    const bn = Number(b);
    if (!Number.isFinite(an) || !Number.isFinite(bn)) return false;
    if (an === bn) return true;
    if (bn === 0) return Math.abs(an - bn) < 0.0001;
    return Math.abs(an - bn) / Math.abs(bn) <= IMPERIAL_METRIC_TOLERANCE;
  }
  return String(a).trim().toLowerCase() === String(b).trim().toLowerCase();
}

// ─── re-exports the route layer leans on ─────────────────────────────────────

export { parseBerthPdf } from './berth-pdf-parser';
export type {
  ExtractedBerthFields,
  ParsedField,
  ParseResult,
  ParserEngine,
} from './berth-pdf-parser';
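// Trailing notes on the compare helpers above (values made up):
//   valuesEqual('Stern-To ', 'stern-to', false) → true  (trimmed, case-insensitive)
//   valuesEqual(0.00005, 0, true)               → true  (absolute epsilon guard
//                                                        when the PDF side is 0
//                                                        avoids divide-by-zero)
//   normalizeForCompare('price', '$1,250')      → 1250  (currency noise stripped)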