/** * Bulk berth price reconciliation (CM-2 Part A). * * Re-parses each berth's CURRENT spec-sheet PDF (stored parseResults are * stale/wrong — the old purchase-price regex matched 0/113 real sheets), * surfaces old→new price diffs for an admin review page, and applies only the * rows a rep explicitly approves. Nothing is written until apply. */ import pLimit from 'p-limit'; import { and, eq, inArray, isNull } from 'drizzle-orm'; import { db } from '@/lib/db'; import { berths, berthPdfVersions } from '@/lib/db/schema/berths'; import { getStorageBackend } from '@/lib/storage'; import { logger } from '@/lib/logger'; import { parseBerthPdf, extractPurchasePrice } from './berth-pdf-parser'; export interface PriceReconcileRow { berthId: string; mooringNumber: string; area: string | null; currentPrice: number | null; currentCurrency: string; parsedPrice: number | null; parsedCurrency: string | null; versionId: string | null; status: 'changed' | 'matched' | 'needs_review' | 'no_pdf'; warning?: string; } async function streamToBuffer(stream: AsyncIterable): Promise { const chunks: Buffer[] = []; for await (const chunk of stream) { chunks.push(typeof chunk === 'string' ? Buffer.from(chunk) : chunk); } return Buffer.concat(chunks); } /** * For every active berth in the port, re-parse the current spec-sheet PDF and * report the parsed main price alongside the stored price. Tenant-scoped by * `portId`. Bounded concurrency keeps the S3/filesystem round-trips in check. */ export async function listPriceReconciliation(portId: string): Promise { const rows = await db .select({ berthId: berths.id, mooringNumber: berths.mooringNumber, area: berths.area, currentPrice: berths.price, currentCurrency: berths.priceCurrency, versionId: berths.currentPdfVersionId, storageKey: berthPdfVersions.storageKey, }) .from(berths) .leftJoin(berthPdfVersions, eq(berthPdfVersions.id, berths.currentPdfVersionId)) .where(and(eq(berths.portId, portId), isNull(berths.archivedAt))) .orderBy(berths.mooringNumber); const backend = await getStorageBackend(); const limit = pLimit(8); return Promise.all( rows.map((r) => limit(async (): Promise => { const currentPrice = r.currentPrice == null ? null : Number(r.currentPrice); const base = { berthId: r.berthId, mooringNumber: r.mooringNumber, area: r.area, currentPrice, currentCurrency: r.currentCurrency, versionId: r.versionId, }; if (!r.versionId || !r.storageKey) { return { ...base, parsedPrice: null, parsedCurrency: null, status: 'no_pdf' }; } try { const buffer = await streamToBuffer( (await backend.get(r.storageKey)) as AsyncIterable, ); const parse = await parseBerthPdf(buffer); const price = extractPurchasePrice(parse.rawText ?? ''); if (price.value == null) { return { ...base, parsedPrice: null, parsedCurrency: null, status: 'needs_review', warning: price.warning, }; } const status = currentPrice === price.value ? 'matched' : 'changed'; return { ...base, parsedPrice: price.value, parsedCurrency: price.currency, status }; } catch (err) { logger.warn({ berthId: r.berthId, err }, 'price-reconcile: parse failed'); return { ...base, parsedPrice: null, parsedCurrency: null, status: 'needs_review', warning: 'PDF could not be parsed.', }; } }), ), ); } /** * Apply a rep-approved slice of parsed prices to `berths.price`/`priceCurrency`. * Tenant-scoped: cross-port berth ids are silently skipped (defense in depth on * top of the route's permission gate). Stamps each berth's current PDF version * `parseResults.bulkPriceApplied` for audit. */ export async function applyBulkBerthPrices( portId: string, approvals: Array<{ berthId: string; price: number; currency: string }>, actingUserId: string, ): Promise<{ updated: number }> { if (approvals.length === 0) return { updated: 0 }; const ids = approvals.map((a) => a.berthId); const owned = await db .select({ id: berths.id, vid: berths.currentPdfVersionId }) .from(berths) .where(and(eq(berths.portId, portId), inArray(berths.id, ids))); const ownedVid = new Map(owned.map((b) => [b.id, b.vid])); let updated = 0; await db.transaction(async (tx) => { for (const a of approvals) { if (!ownedVid.has(a.berthId)) continue; // cross-port → skip if (!Number.isFinite(a.price) || a.price < 0) continue; await tx .update(berths) .set({ price: String(a.price), priceCurrency: a.currency, updatedAt: new Date() }) .where(and(eq(berths.id, a.berthId), eq(berths.portId, portId))); const vid = ownedVid.get(a.berthId); if (vid) { const [ver] = await tx .select({ pr: berthPdfVersions.parseResults }) .from(berthPdfVersions) .where(eq(berthPdfVersions.id, vid)); const prior = (ver?.pr as Record | null) ?? {}; await tx .update(berthPdfVersions) .set({ parseResults: { ...prior, bulkPriceApplied: { price: a.price, currency: a.currency, by: actingUserId, at: new Date().toISOString(), }, }, }) .where(eq(berthPdfVersions.id, vid)); } updated += 1; } }); return { updated }; }