diff --git a/src/lib/services/berth-price-reconcile.service.ts b/src/lib/services/berth-price-reconcile.service.ts new file mode 100644 index 00000000..82fc978c --- /dev/null +++ b/src/lib/services/berth-price-reconcile.service.ts @@ -0,0 +1,166 @@ +/** + * Bulk berth price reconciliation (CM-2 Part A). + * + * Re-parses each berth's CURRENT spec-sheet PDF (stored parseResults are + * stale/wrong — the old purchase-price regex matched 0/113 real sheets), + * surfaces old→new price diffs for an admin review page, and applies only the + * rows a rep explicitly approves. Nothing is written until apply. + */ + +import pLimit from 'p-limit'; +import { and, eq, inArray, isNull } from 'drizzle-orm'; + +import { db } from '@/lib/db'; +import { berths, berthPdfVersions } from '@/lib/db/schema/berths'; +import { getStorageBackend } from '@/lib/storage'; +import { logger } from '@/lib/logger'; + +import { parseBerthPdf, extractPurchasePrice } from './berth-pdf-parser'; + +export interface PriceReconcileRow { + berthId: string; + mooringNumber: string; + area: string | null; + currentPrice: number | null; + currentCurrency: string; + parsedPrice: number | null; + parsedCurrency: string | null; + versionId: string | null; + status: 'changed' | 'matched' | 'needs_review' | 'no_pdf'; + warning?: string; +} + +async function streamToBuffer(stream: AsyncIterable): Promise { + const chunks: Buffer[] = []; + for await (const chunk of stream) { + chunks.push(typeof chunk === 'string' ? Buffer.from(chunk) : chunk); + } + return Buffer.concat(chunks); +} + +/** + * For every active berth in the port, re-parse the current spec-sheet PDF and + * report the parsed main price alongside the stored price. Tenant-scoped by + * `portId`. Bounded concurrency keeps the S3/filesystem round-trips in check. + */ +export async function listPriceReconciliation(portId: string): Promise { + const rows = await db + .select({ + berthId: berths.id, + mooringNumber: berths.mooringNumber, + area: berths.area, + currentPrice: berths.price, + currentCurrency: berths.priceCurrency, + versionId: berths.currentPdfVersionId, + storageKey: berthPdfVersions.storageKey, + }) + .from(berths) + .leftJoin(berthPdfVersions, eq(berthPdfVersions.id, berths.currentPdfVersionId)) + .where(and(eq(berths.portId, portId), isNull(berths.archivedAt))) + .orderBy(berths.mooringNumber); + + const backend = await getStorageBackend(); + const limit = pLimit(8); + + return Promise.all( + rows.map((r) => + limit(async (): Promise => { + const currentPrice = r.currentPrice == null ? null : Number(r.currentPrice); + const base = { + berthId: r.berthId, + mooringNumber: r.mooringNumber, + area: r.area, + currentPrice, + currentCurrency: r.currentCurrency, + versionId: r.versionId, + }; + if (!r.versionId || !r.storageKey) { + return { ...base, parsedPrice: null, parsedCurrency: null, status: 'no_pdf' }; + } + try { + const buffer = await streamToBuffer( + (await backend.get(r.storageKey)) as AsyncIterable, + ); + const parse = await parseBerthPdf(buffer); + const price = extractPurchasePrice(parse.rawText ?? ''); + if (price.value == null) { + return { + ...base, + parsedPrice: null, + parsedCurrency: null, + status: 'needs_review', + warning: price.warning, + }; + } + const status = currentPrice === price.value ? 'matched' : 'changed'; + return { ...base, parsedPrice: price.value, parsedCurrency: price.currency, status }; + } catch (err) { + logger.warn({ berthId: r.berthId, err }, 'price-reconcile: parse failed'); + return { + ...base, + parsedPrice: null, + parsedCurrency: null, + status: 'needs_review', + warning: 'PDF could not be parsed.', + }; + } + }), + ), + ); +} + +/** + * Apply a rep-approved slice of parsed prices to `berths.price`/`priceCurrency`. + * Tenant-scoped: cross-port berth ids are silently skipped (defense in depth on + * top of the route's permission gate). Stamps each berth's current PDF version + * `parseResults.bulkPriceApplied` for audit. + */ +export async function applyBulkBerthPrices( + portId: string, + approvals: Array<{ berthId: string; price: number; currency: string }>, + actingUserId: string, +): Promise<{ updated: number }> { + if (approvals.length === 0) return { updated: 0 }; + const ids = approvals.map((a) => a.berthId); + const owned = await db + .select({ id: berths.id, vid: berths.currentPdfVersionId }) + .from(berths) + .where(and(eq(berths.portId, portId), inArray(berths.id, ids))); + const ownedVid = new Map(owned.map((b) => [b.id, b.vid])); + + let updated = 0; + await db.transaction(async (tx) => { + for (const a of approvals) { + if (!ownedVid.has(a.berthId)) continue; // cross-port → skip + if (!Number.isFinite(a.price) || a.price < 0) continue; + await tx + .update(berths) + .set({ price: String(a.price), priceCurrency: a.currency, updatedAt: new Date() }) + .where(and(eq(berths.id, a.berthId), eq(berths.portId, portId))); + const vid = ownedVid.get(a.berthId); + if (vid) { + const [ver] = await tx + .select({ pr: berthPdfVersions.parseResults }) + .from(berthPdfVersions) + .where(eq(berthPdfVersions.id, vid)); + const prior = (ver?.pr as Record | null) ?? {}; + await tx + .update(berthPdfVersions) + .set({ + parseResults: { + ...prior, + bulkPriceApplied: { + price: a.price, + currency: a.currency, + by: actingUserId, + at: new Date().toISOString(), + }, + }, + }) + .where(eq(berthPdfVersions.id, vid)); + } + updated += 1; + } + }); + return { updated }; +} diff --git a/tests/integration/berth-price-reconcile.test.ts b/tests/integration/berth-price-reconcile.test.ts new file mode 100644 index 00000000..afdaace1 --- /dev/null +++ b/tests/integration/berth-price-reconcile.test.ts @@ -0,0 +1,86 @@ +/** + * Integration tests for the bulk berth price-reconcile service (CM-2 Part A). + * + * Uses the real filesystem storage backend (seeded below) + a real spec-sheet + * PDF, so the full upload → store → re-parse → extract path is exercised end to + * end with no storage mock. + */ + +import { readFileSync } from 'node:fs'; +import path from 'node:path'; + +import { eq } from 'drizzle-orm'; +import { beforeEach, describe, expect, it } from 'vitest'; + +import { + listPriceReconciliation, + applyBulkBerthPrices, +} from '@/lib/services/berth-price-reconcile.service'; +import { uploadBerthPdf } from '@/lib/services/berth-pdf.service'; +import { db } from '@/lib/db'; +import { berths } from '@/lib/db/schema/berths'; +import { systemSettings } from '@/lib/db/schema/system'; + +import { makeBerth, makePort } from '../helpers/factories'; + +const A1_PDF = readFileSync(path.join(process.cwd(), 'berth_pdf_example/Berth_Spec_Sheet_A1.pdf')); + +beforeEach(async () => { + await db + .insert(systemSettings) + .values({ key: 'storage_backend', value: 'filesystem', portId: null, updatedBy: null }) + .onConflictDoNothing(); +}); + +describe('listPriceReconciliation', () => { + it('parses the main price for a berth with a PDF and flags one without', async () => { + const port = await makePort(); + const withPdf = await makeBerth({ portId: port.id, overrides: { mooringNumber: 'A1' } }); + const withoutPdf = await makeBerth({ portId: port.id, overrides: { mooringNumber: 'Z9' } }); + + await uploadBerthPdf({ + berthId: withPdf.id, + portId: port.id, + buffer: A1_PDF, + fileName: 'Berth_Spec_Sheet_A1.pdf', + uploadedBy: 'test-user', + }); + + const rows = await listPriceReconciliation(port.id); + const w = rows.find((r) => r.mooringNumber === 'A1'); + const wo = rows.find((r) => r.mooringNumber === 'Z9'); + + expect(w?.parsedPrice).toBe(3880800); + expect(w?.parsedCurrency).toBe('USD'); + expect(w?.currentPrice).toBeNull(); + expect(w?.status).toBe('changed'); // CRM price null → changed + expect(wo?.status).toBe('no_pdf'); + }); +}); + +describe('applyBulkBerthPrices', () => { + it('writes only approved, in-port berths and skips cross-port ids', async () => { + const portA = await makePort(); + const portB = await makePort(); + const berthA = await makeBerth({ portId: portA.id, overrides: { mooringNumber: 'A1' } }); + const berthB = await makeBerth({ portId: portB.id, overrides: { mooringNumber: 'A1' } }); + + const res = await applyBulkBerthPrices( + portA.id, + [ + { berthId: berthA.id, price: 3880800, currency: 'USD' }, + { berthId: berthB.id, price: 999, currency: 'USD' }, // foreign port → skipped + ], + 'test-user', + ); + + expect(res.updated).toBe(1); + + const [a] = await db.select().from(berths).where(eq(berths.id, berthA.id)); + expect(Number(a!.price)).toBe(3880800); + expect(a!.priceCurrency).toBe('USD'); + + const [b] = await db.select().from(berths).where(eq(berths.id, berthB.id)); + expect(b!.price).toBeNull(); // untouched + }); +});