/**
 * Berth PDF management service (Phase 6b — see plan §4.7b, §11.1, §14.6).
 *
 * Responsibilities:
 *   - Upload a per-berth PDF (versioned), via the active `StorageBackend`.
 *   - Verify the magic bytes (`%PDF-`) before persisting; delete the storage
 *     object on mismatch (§14.6 critical).
 *   - Reconcile the parsed fields against the current berth row, surfacing
 *     conflicts for the rep's diff dialog and auto-applying nullable gaps.
 *   - Enforce per-port size cap from `system_settings.berth_pdf_max_upload_mb`.
 *   - Generate signed download URLs for the version list.
 */

import { and, desc, eq, isNull, max } from 'drizzle-orm';

import { db } from '@/lib/db';
import { berths, berthPdfVersions } from '@/lib/db/schema/berths';
import { systemSettings } from '@/lib/db/schema/system';
import { ConflictError, NotFoundError, ValidationError } from '@/lib/errors';
import { logger } from '@/lib/logger';
import { getStorageBackend } from '@/lib/storage';

import {
  type ExtractedBerthFields,
  type ParseResult,
  type ParserEngine,
  isPdfMagic,
} from './berth-pdf-parser';

// ─── shared types ────────────────────────────────────────────────────────────

export interface ReconcileConflict {
  field: keyof ExtractedBerthFields;
  crmValue: string | number | null;
  pdfValue: string | number | null;
  /** Confidence the parser assigned to the PDF value (0..1). */
  pdfConfidence: number;
}

export interface ReconcileResult {
  /** Fields where CRM was null and the PDF supplied a value; these can be
   *  applied automatically (the rep still sees them as "Auto-applied" chips). */
  autoApplied: Array<{ field: keyof ExtractedBerthFields; value: string | number }>;
  /** Fields where CRM and PDF disagree on a non-null value. The diff dialog
   *  shows these as a side-by-side comparison; nothing is written until the
   *  rep confirms via the apply endpoint. */
  conflicts: ReconcileConflict[];
  /** Pure-warning bucket — e.g. mooring-number mismatch with the berth being
   *  uploaded to (§14.6). */
  warnings: string[];
  /** Engine that produced the parse — surfaced on the diff UI. */
  engine: ParserEngine;
}

// Field allowlist for reconcile/apply. Mirrors `berths` columns; nothing
// outside this set is ever written (e.g. `id`, `portId`, `mooringNumber`,
// `currentPdfVersionId` are deliberately absent), so a rogue parser tier
// can't poison the schema.
const APPLIABLE_FIELDS: ReadonlyArray<keyof ExtractedBerthFields> = [
  'lengthFt',
  'lengthM',
  'widthFt',
  'widthM',
  'draftFt',
  'draftM',
  'waterDepth',
  'waterDepthM',
  'bowFacing',
  'sidePontoon',
  'powerCapacity',
  'voltage',
  'mooringType',
  'cleatType',
  'cleatCapacity',
  'bollardType',
  'bollardCapacity',
  'access',
  'price',
  'weeklyRateHighUsd',
  'weeklyRateLowUsd',
  'dailyRateHighUsd',
  'dailyRateLowUsd',
  'pricingValidUntil',
];

// Numeric berths columns are stored as `numeric` (Drizzle returns string).
// This set tells the apply path which fields need stringification.
const NUMERIC_FIELDS = new Set<keyof ExtractedBerthFields>([
  'lengthFt',
  'lengthM',
  'widthFt',
  'widthM',
  'draftFt',
  'draftM',
  'waterDepth',
  'waterDepthM',
  'powerCapacity',
  'voltage',
  'price',
  'weeklyRateHighUsd',
  'weeklyRateLowUsd',
  'dailyRateHighUsd',
  'dailyRateLowUsd',
]);

// Tolerance for imperial vs metric reconcile. Same threshold as the parser.
const IMPERIAL_METRIC_TOLERANCE = 0.01;

/** Fallback upload cap (MB) when neither a port nor a global setting exists. */
const DEFAULT_MAX_UPLOAD_MB = 15;

/** Lifetime of the pre-signed download URLs in the version list (15 min). */
const DOWNLOAD_URL_TTL_SECONDS = 900;

/** Longest file-name segment `sanitizeFileName` will emit. */
const MAX_FILE_NAME_LENGTH = 200;

// ─── settings helpers ────────────────────────────────────────────────────────

/**
 * Coerce a stored setting value into a number.
 *
 * Numbers pass through untouched; strings are accepted when they parse to a
 * finite number. A blank string is treated as "unset" — `Number('')` is `0`,
 * which would otherwise turn into a 0 MB cap that rejects every upload.
 *
 * @returns the numeric value, or `null` when the value is unusable.
 */
function settingAsNumber(value: unknown): number | null {
  if (typeof value === 'number') return value;
  if (typeof value === 'string' && value.trim() !== '') {
    const parsed = Number(value);
    if (Number.isFinite(parsed)) return parsed;
  }
  return null;
}

/**
 * Resolve `berth_pdf_max_upload_mb` with port-override → global → default 15.
 *
 * The global row is only queried when the port row is missing or unusable.
 *
 * @param portId Port whose override should win over the global setting.
 * @returns The upload cap in megabytes.
 */
export async function getMaxUploadMb(portId: string): Promise<number> {
  const KEY = 'berth_pdf_max_upload_mb';

  const [portRow] = await db
    .select()
    .from(systemSettings)
    .where(and(eq(systemSettings.key, KEY), eq(systemSettings.portId, portId)));
  const portValue = settingAsNumber(portRow?.value);
  if (portValue !== null) return portValue;

  const [globalRow] = await db
    .select()
    .from(systemSettings)
    .where(and(eq(systemSettings.key, KEY), isNull(systemSettings.portId)));
  return settingAsNumber(globalRow?.value) ?? DEFAULT_MAX_UPLOAD_MB;
}

// ─── upload + version management ─────────────────────────────────────────────

export interface UploadBerthPdfArgs {
  berthId: string;
  /** Already-uploaded storage key (the upload-url endpoint generated it) OR
   *  undefined to make this service compute one. */
  storageKey?: string;
  /** Raw bytes when the server proxies the upload (filesystem mode); when
   *  callers used a presigned PUT they pass `storageKey` and skip this. */
  buffer?: Buffer;
  fileName: string;
  uploadedBy: string;
  /** Pre-computed sha256 hex from the client (verified server-side anyway). */
  sha256?: string;
  /** Pre-computed bytes (used for the size cap pre-flight on direct uploads). */
  fileSizeBytes?: number;
  /** Result of running `parseBerthPdf` server-side. Optional — the rep may
   *  have skipped parsing on a re-upload. */
  parseResult?: ParseResult;
}

export interface UploadBerthPdfResult {
  versionId: string;
  storageKey: string;
  versionNumber: number;
  fileSizeBytes: number;
  contentSha256: string;
}

/**
 * Persist a per-berth PDF version. Either the raw `buffer` or a pre-uploaded
 * `storageKey` (with optional `buffer` for verification) is required.
 *
 * Critical mitigations enforced here:
 *   - §14.6 magic-byte check against the buffer when present.
 *   - §14.6 size cap from `berth_pdf_max_upload_mb`.
 *   - Storage key namespaced under `berths/{id}/v{n}/...`; the version-number
 *     unique index in the DB is what ultimately dedups two reps racing on the
 *     same berth.
 *
 * @throws NotFoundError   when the berth does not exist.
 * @throws ValidationError when the payload is missing, empty, not a PDF, too
 *                         large, or (direct uploads) has an unexpected
 *                         content type. A pre-uploaded object that fails
 *                         validation is deleted best-effort.
 * @throws ConflictError   when a concurrent upload claimed the same version
 *                         number (unique violation on insert).
 */
export async function uploadBerthPdf(args: UploadBerthPdfArgs): Promise<UploadBerthPdfResult> {
  // 1. Resolve the berth + port for size-cap lookup.
  const berthRow = await db.query.berths.findFirst({ where: eq(berths.id, args.berthId) });
  if (!berthRow) throw new NotFoundError('Berth');
  const maxMb = await getMaxUploadMb(berthRow.portId);
  const maxBytes = maxMb * 1024 * 1024;

  // 2. Compute next version number. This is a plain read, NOT a serializable
  //    transaction, so two concurrent uploads can both pick `v3`. The unique
  //    index catches that on insert; step 4 turns the resulting 23505 into a
  //    clean ConflictError.
  const versionNumber = await nextVersionNumber(args.berthId);

  // 3. Magic bytes + size when we have the buffer in hand.
  const backend = await getStorageBackend();
  const buffer = args.buffer;
  const preUploadedKey = args.storageKey;

  // Best-effort removal of a pre-uploaded object that failed validation; a
  // failed delete must never mask the validation error itself.
  const discardPreUploaded = async (): Promise<void> => {
    if (preUploadedKey) await backend.delete(preUploadedKey).catch(() => undefined);
  };

  let storageKey =
    preUploadedKey ??
    `berths/${args.berthId}/v${versionNumber}/${sanitizeFileName(args.fileName)}`;
  let sizeBytes = args.fileSizeBytes ?? buffer?.length ?? 0;
  let sha256 = args.sha256 ?? '';

  if (buffer) {
    // Empty check first: a zero-byte buffer can never pass the magic check,
    // so testing it afterwards would make this message unreachable.
    if (buffer.length === 0) {
      await discardPreUploaded();
      throw new ValidationError('Uploaded PDF is empty (0 bytes).');
    }
    if (!isPdfMagic(buffer)) {
      await discardPreUploaded();
      throw new ValidationError(
        'Uploaded file failed PDF magic-byte check (does not start with %PDF-).',
      );
    }
    if (buffer.length > maxBytes) {
      await discardPreUploaded();
      throw new ValidationError(
        `PDF exceeds ${maxMb} MB upload cap (got ${(buffer.length / 1024 / 1024).toFixed(1)} MB).`,
      );
    }
    const written = await backend.put(storageKey, buffer, { contentType: 'application/pdf' });
    storageKey = written.key;
    sizeBytes = written.sizeBytes;
    sha256 = written.sha256;
  } else if (preUploadedKey) {
    // Browser uploaded directly via presigned URL — verify via HEAD.
    const head = await backend.head(preUploadedKey);
    if (!head) {
      throw new ValidationError('Uploaded object not found at expected storage key.');
    }
    if (head.sizeBytes === 0) {
      await discardPreUploaded();
      throw new ValidationError('Uploaded PDF is empty (0 bytes).');
    }
    if (head.sizeBytes > maxBytes) {
      await discardPreUploaded();
      throw new ValidationError(
        `PDF exceeds ${maxMb} MB upload cap (got ${(head.sizeBytes / 1024 / 1024).toFixed(1)} MB).`,
      );
    }
    if (head.contentType !== 'application/pdf' && head.contentType !== 'application/octet-stream') {
      await discardPreUploaded();
      throw new ValidationError(
        `Uploaded object content-type is ${head.contentType}; expected application/pdf.`,
      );
    }
    // `storageKey` and `sha256` already hold the caller-supplied values; only
    // the size needs replacing with the authoritative one from storage.
    sizeBytes = head.sizeBytes;
  } else {
    throw new ValidationError('Either buffer or storageKey is required.');
  }

  // 4. Insert version row + bump current pointer in one transaction.
  const versionId = crypto.randomUUID();
  try {
    await db.transaction(async (tx) => {
      await tx.insert(berthPdfVersions).values({
        id: versionId,
        berthId: args.berthId,
        versionNumber,
        storageKey,
        fileName: args.fileName,
        fileSizeBytes: sizeBytes,
        contentSha256: sha256,
        uploadedBy: args.uploadedBy,
        parseResults: args.parseResult ? serializeParseResult(args.parseResult) : null,
      });
      await tx
        .update(berths)
        .set({ currentPdfVersionId: versionId, updatedAt: new Date() })
        .where(eq(berths.id, args.berthId));
    });
  } catch (err: unknown) {
    // Lost the version-number race described in step 2. The stored object is
    // intentionally left alone: the winning upload may share the same key.
    if (isUniqueViolation(err)) {
      throw new ConflictError(
        'Another PDF version was saved for this berth at the same time; please retry the upload.',
      );
    }
    throw err;
  }

  logger.info(
    { berthId: args.berthId, versionId, versionNumber, storageKey, sizeBytes },
    'Berth PDF version saved',
  );

  return { versionId, storageKey, versionNumber, fileSizeBytes: sizeBytes, contentSha256: sha256 };
}

/**
 * Return `max(version_number) + 1` for the berth (1 when it has no versions).
 *
 * Not race-free on its own: concurrent callers can receive the same number.
 */
async function nextVersionNumber(berthId: string): Promise<number> {
  const [row] = await db
    .select({ max: max(berthPdfVersions.versionNumber) })
    .from(berthPdfVersions)
    .where(eq(berthPdfVersions.berthId, berthId));
  return (row?.max ?? 0) + 1;
}

/**
 * Detect a unique-constraint violation (SQLSTATE `23505`) on an error or on
 * anything in its `cause` chain, since wrappers may nest the driver error.
 * The walk is depth-limited so a cyclic `cause` cannot loop forever.
 */
function isUniqueViolation(err: unknown): boolean {
  let current: unknown = err;
  for (let depth = 0; depth < 5; depth += 1) {
    if (current == null || typeof current !== 'object') return false;
    if ((current as { code?: unknown }).code === '23505') return true;
    current = (current as { cause?: unknown }).cause;
  }
  return false;
}

/**
 * Reduce a client-supplied file name to a single safe storage-key segment.
 *
 *   - Only the last path component is kept (both `/` and `\` separators).
 *   - Characters outside `[a-zA-Z0-9._-]` become `_` so the result satisfies
 *     the storage-key validation regex.
 *   - Empty and dot-only names (`.`, `..`) fall back to `berth.pdf`; left
 *     alone they would act as path segments inside the key.
 *   - Over-long names are shortened from the stem so the extension survives.
 */
function sanitizeFileName(raw: string): string {
  const lastSegment = raw.split(/[\\/]/).pop() ?? raw;
  const safe = lastSegment.replace(/[^a-zA-Z0-9._-]/g, '_');
  if (safe === '' || /^\.+$/.test(safe)) return 'berth.pdf';
  if (safe.length <= MAX_FILE_NAME_LENGTH) return safe;

  const dotIndex = safe.lastIndexOf('.');
  const extension = dotIndex > 0 ? safe.slice(dotIndex) : '';
  // An implausibly long "extension" is just part of the name; hard-truncate.
  if (extension === '' || extension.length > 16) return safe.slice(0, MAX_FILE_NAME_LENGTH);
  return safe.slice(0, MAX_FILE_NAME_LENGTH - extension.length) + extension;
}

/**
 * Flatten a `ParseResult` into the JSON shape stored in `parse_results`:
 * engine, per-field `{ value, confidence }` (or `null`), mean confidence and
 * warnings.
 */
function serializeParseResult(parse: ParseResult): Record<string, unknown> {
  const extracted = Object.fromEntries(
    Object.entries(parse.fields).map(([name, field]) => [
      name,
      field ? { value: field.value, confidence: field.confidence } : null,
    ]),
  );
  return {
    engine: parse.engine,
    extracted,
    meanConfidence: parse.meanConfidence,
    warnings: parse.warnings,
  };
}

// ─── reconcile + apply ───────────────────────────────────────────────────────

/**
 * Walk every parsed field; classify into:
 *   - `autoApplied` when the CRM column is null/empty.
 *   - `conflicts` when both sides have a non-null value and they disagree.
 *
 * Numeric tolerance: ±1% (matches §14.6 imperial-vs-metric guidance, applied
 * uniformly across all numeric columns since the same rounding noise affects
 * weekly/daily rates too).
 *
 * A numeric field whose PDF value cannot be read as a number is skipped and
 * reported in `warnings` instead of being compared or auto-applied.
 *
 * @throws NotFoundError when the berth does not exist.
 */
export async function reconcilePdfWithBerth(
  berthId: string,
  parsed: ParseResult,
): Promise<ReconcileResult> {
  const berthRow = await db.query.berths.findFirst({ where: eq(berths.id, berthId) });
  if (!berthRow) throw new NotFoundError('Berth');

  const fields = parsed.fields;
  const autoApplied: ReconcileResult['autoApplied'] = [];
  const conflicts: ReconcileConflict[] = [];
  const warnings: string[] = [...parsed.warnings];

  // §14.6 — mooring-number mismatch warning.
  const pdfMooring = fields.mooringNumber?.value;
  if (
    pdfMooring &&
    typeof pdfMooring === 'string' &&
    pdfMooring.toUpperCase() !== berthRow.mooringNumber.toUpperCase()
  ) {
    warnings.push(
      `PDF says berth ${pdfMooring} but uploading to ${berthRow.mooringNumber}. Confirm before applying.`,
    );
  }

  for (const key of APPLIABLE_FIELDS) {
    const parsedField = fields[key];
    if (!parsedField || parsedField.value == null) continue;

    const isNumeric = NUMERIC_FIELDS.has(key);
    const pdfValue = normalizeForCompare(key, parsedField.value);
    if (isNumeric && pdfValue == null) {
      // Neither side of the diff can do anything useful with a non-number.
      warnings.push(`PDF value for ${String(key)} is not a usable number; ignored.`);
      continue;
    }

    const crmRaw = (berthRow as Record<string, unknown>)[key];
    const crmValue = normalizeForCompare(key, crmRaw);

    if (crmValue == null || crmValue === '') {
      autoApplied.push({ field: key, value: parsedField.value as string | number });
      continue;
    }

    if (!valuesEqual(crmValue, pdfValue, isNumeric)) {
      conflicts.push({
        field: key,
        crmValue: crmValue as string | number | null,
        pdfValue: pdfValue as string | number | null,
        pdfConfidence: parsedField.confidence,
      });
    }
  }

  return { autoApplied, conflicts, warnings, engine: parsed.engine };
}

/**
 * Apply a rep-confirmed slice of the reconcile diff to the berth row. The
 * caller passes the canonical `ExtractedBerthFields` keys; anything outside
 * `APPLIABLE_FIELDS` is silently dropped to keep this endpoint a hard
 * allowlist.
 *
 * `null` clears a column. Numeric fields go through the same number parsing
 * as reconcile, so a value that reconciled can also be applied; values that
 * still are not numbers are skipped.
 *
 * @returns The keys that were actually written.
 * @throws NotFoundError   when the berth, or the version for that berth, does
 *                         not exist.
 * @throws ValidationError when no supplied field survives the allowlist.
 */
export async function applyParseResults(
  berthId: string,
  versionId: string,
  fieldsToApply: Partial<ExtractedBerthFields>,
): Promise<{ updatedFields: Array<keyof ExtractedBerthFields> }> {
  const berthRow = await db.query.berths.findFirst({ where: eq(berths.id, berthId) });
  if (!berthRow) throw new NotFoundError('Berth');
  const versionRow = await db.query.berthPdfVersions.findFirst({
    where: and(eq(berthPdfVersions.id, versionId), eq(berthPdfVersions.berthId, berthId)),
  });
  if (!versionRow) throw new NotFoundError('Berth PDF version');

  const update: Record<string, unknown> = {};
  const applied: Array<keyof ExtractedBerthFields> = [];

  for (const key of APPLIABLE_FIELDS) {
    const value = fieldsToApply[key];
    if (value === undefined) continue;
    if (value === null) {
      update[key] = null;
      applied.push(key);
      continue;
    }
    if (NUMERIC_FIELDS.has(key)) {
      const n = parseNumeric(value);
      if (n === null) continue;
      // numeric columns expect strings to preserve precision.
      update[key] = String(n);
    } else {
      update[key] = String(value);
    }
    applied.push(key);
  }

  if (applied.length === 0) {
    throw new ValidationError('No appliable fields supplied.');
  }

  update.updatedAt = new Date();

  await db.transaction(async (tx) => {
    await tx.update(berths).set(update).where(eq(berths.id, berthId));
    // Stamp the applied-field set onto parse_results for audit.
    const prior = (versionRow.parseResults as Record<string, unknown> | null) ?? {};
    await tx
      .update(berthPdfVersions)
      .set({
        parseResults: {
          ...prior,
          appliedFields: applied,
          appliedAt: new Date().toISOString(),
        },
      })
      .where(eq(berthPdfVersions.id, versionId));
  });

  return { updatedFields: applied };
}

// ─── version listing + rollback ──────────────────────────────────────────────

export interface BerthPdfVersionListItem {
  id: string;
  versionNumber: number;
  fileName: string;
  fileSizeBytes: number;
  uploadedBy: string;
  uploadedAt: Date;
  isCurrent: boolean;
  /** Pre-signed download URL (15-min TTL). */
  downloadUrl: string;
  downloadUrlExpiresAt: Date;
  parseEngine: ParserEngine | null;
}

/**
 * List every PDF version of a berth, newest version number first, each with a
 * freshly pre-signed download URL.
 *
 * The URLs are requested in parallel; `Promise.all` keeps the output in the
 * same order as the query.
 *
 * @throws NotFoundError when the berth does not exist.
 */
export async function listBerthPdfVersions(berthId: string): Promise<BerthPdfVersionListItem[]> {
  const berthRow = await db.query.berths.findFirst({ where: eq(berths.id, berthId) });
  if (!berthRow) throw new NotFoundError('Berth');

  const rows = await db
    .select()
    .from(berthPdfVersions)
    .where(eq(berthPdfVersions.berthId, berthId))
    .orderBy(desc(berthPdfVersions.versionNumber));

  const backend = await getStorageBackend();

  return Promise.all(
    rows.map(async (row): Promise<BerthPdfVersionListItem> => {
      const parseEngine = (row.parseResults as { engine?: ParserEngine } | null)?.engine ?? null;
      const presigned = await backend.presignDownload(row.storageKey, {
        expirySeconds: DOWNLOAD_URL_TTL_SECONDS,
        filename: row.fileName,
        contentType: 'application/pdf',
      });
      return {
        id: row.id,
        versionNumber: row.versionNumber,
        fileName: row.fileName,
        fileSizeBytes: row.fileSizeBytes,
        uploadedBy: row.uploadedBy,
        uploadedAt: row.uploadedAt,
        isCurrent: berthRow.currentPdfVersionId === row.id,
        downloadUrl: presigned.url,
        downloadUrlExpiresAt: presigned.expiresAt,
        parseEngine,
      };
    }),
  );
}

/**
 * Set `berths.current_pdf_version_id` to the requested version. Per §14.6,
 * this does NOT re-parse and re-update the berth columns — that's a separate
 * deliberate "extract data from this version" action.
 *
 * @throws NotFoundError when the version (for this berth) or the berth does
 *                       not exist.
 * @throws ConflictError when the requested version is already current.
 */
export async function rollbackToVersion(
  berthId: string,
  versionId: string,
): Promise<{ versionId: string; versionNumber: number }> {
  const versionRow = await db.query.berthPdfVersions.findFirst({
    where: and(eq(berthPdfVersions.id, versionId), eq(berthPdfVersions.berthId, berthId)),
  });
  if (!versionRow) throw new NotFoundError('Berth PDF version');

  const berthRow = await db.query.berths.findFirst({ where: eq(berths.id, berthId) });
  if (!berthRow) throw new NotFoundError('Berth');

  if (berthRow.currentPdfVersionId === versionId) {
    throw new ConflictError('That version is already current; rollback is a no-op.');
  }

  await db
    .update(berths)
    .set({ currentPdfVersionId: versionId, updatedAt: new Date() })
    .where(eq(berths.id, berthId));

  return { versionId, versionNumber: versionRow.versionNumber };
}

// ─── compare helpers ─────────────────────────────────────────────────────────

/**
 * Read a loosely formatted value as a finite number.
 *
 * Numbers pass through when finite. Strings are tried as-is first (so plain
 * and exponent notation keep their meaning) and, failing that, with every
 * character other than digits, `.` and `-` removed (`"$1,200"` → `1200`).
 *
 * @returns the number, or `null` for null/blank/non-numeric input. Blank is
 *          checked explicitly because `Number('')` is `0`, not `NaN`.
 */
function parseNumeric(raw: unknown): number | null {
  if (raw == null) return null;
  if (typeof raw === 'number') return Number.isFinite(raw) ? raw : null;

  const text = String(raw).trim();
  if (text === '') return null;
  const direct = Number(text);
  if (Number.isFinite(direct)) return direct;

  const cleaned = text.replace(/[^0-9.\-]/g, '');
  if (cleaned === '') return null;
  const n = Number(cleaned);
  return Number.isFinite(n) ? n : null;
}

/**
 * Bring a CRM or PDF value into a comparable form: numeric fields become a
 * finite number (or `null`), everything else a trimmed string.
 */
function normalizeForCompare(
  key: keyof ExtractedBerthFields,
  raw: unknown,
): string | number | null {
  if (raw == null) return null;
  if (NUMERIC_FIELDS.has(key)) return parseNumeric(raw);
  return typeof raw === 'string' ? raw.trim() : String(raw);
}

/**
 * Compare two normalized values.
 *
 * Numeric: equal when identical, or within `IMPERIAL_METRIC_TOLERANCE`
 * relative to `b`; when `b` is exactly 0 an absolute epsilon is used instead
 * to avoid dividing by zero. Non-numeric: case-insensitive, trimmed string
 * equality. Two nullish values are equal; one nullish value never is.
 */
function valuesEqual(a: unknown, b: unknown, isNumeric: boolean): boolean {
  if (a == null && b == null) return true;
  if (a == null || b == null) return false;
  if (isNumeric) {
    const an = Number(a);
    const bn = Number(b);
    if (!Number.isFinite(an) || !Number.isFinite(bn)) return false;
    if (an === bn) return true;
    if (bn === 0) return Math.abs(an - bn) < 0.0001;
    return Math.abs(an - bn) / Math.abs(bn) <= IMPERIAL_METRIC_TOLERANCE;
  }
  return String(a).trim().toLowerCase() === String(b).trim().toLowerCase();
}

// ─── re-exports the route layer leans on ─────────────────────────────────────

export { parseBerthPdf } from './berth-pdf-parser';
export type {
  ExtractedBerthFields,
  ParsedField,
  ParseResult,
  ParserEngine,
} from './berth-pdf-parser';