feat(audit-cleanup): finish all 15 outstanding items from verified backlog

Audit cleanup completion plan, all tiers shipped:

Tier 1 (security + data integrity)
- A.7 RTBF true wipe: redact email_messages body/subject/addresses for
  threads owned by deleted client; redact document_sends.recipient_email;
  collect file storage keys + delete blobs post-commit.
- A.8 user_permission_overrides FK: documented inline why cascade is
  correct (not set-null as audit suggested) — overrides have no value
  without their user.
- W2.14 PII redaction: camelCase normalization in audit.ts +
  error-events.service.ts isSensitiveKey; added city/postal/country/
  birth fragments. firstName/lastName/dateOfBirth/postalCode etc. now
  caught in BOTH masker paths. 12 new test cases lock the coverage.

Tier 2 (Documenso completion + refactor)
- C.2: documentEvents.recipient_email column + partial unique index for
  per-recipient webhook dedup (migration 0075). handleDocumentSigned
  now sets recipient_email on insert.
- Phase 2: completion_cc_emails distribution. handleDocumentCompleted
  reads documents.completionCcEmails, filters out signer-duplicates
  case-insensitively, fans signed PDF out to non-signer recipients.
- C.4: extracted createPublicInterest() service from the 346-line
  api/public/interests route. Route becomes a thin shell (rate-limit,
  port resolution, audit log, email fan-out). The trio creation logic
  is now unit-testable without an HTTP fixture.
- Phase 4: POST /api/v1/document-templates/[id]/detect-fields wired
  to document-field-detector.detectFields(). Sparkles "Auto-detect"
  button added to template-editor.tsx — maps DetectedField → marker
  with best-guess merge token (DATE / NAME / EMAIL); user retags.

Tier 3 (reporting + recommender snapshot lockfiles)
- W7.reports: extracted rollupStageRevenue / rollupStageCounts /
  computeTotalForecast / computeOccupancyRate / rollupBerthStatusCounts
  into src/lib/services/report-math.ts (pure functions). 16 new tests
  including an inline-snapshot lockfile on a representative 7-stage
  forecast. report-generators.ts now delegates.
- W7.recommender: 18 new toMatchSnapshot tripwires on classifyTier
  boundaries + computeHeat at canonical input points.

Tier 4 (rolling)
- W6.attach: fixed outdated CLAUDE.md claim — threshold banner is
  informational and never depended on IMAP; bounce monitoring (the
  IMAP poller) is separate.
- D.1 + D.2: documented deferral inline with full why-not-build-it
  reasoning so a future engineer sees the rationale.
- G.1: representative formatDate sweep (audit-log-list, user-list,
  document-templates merge tokens, document-signing email). Rest of
  the ~100 sites stay rolling.

Quality gates: 1420/1420 vitest (46 new tests above baseline of 1374),
tsc clean, 0 lint errors.

Plan: docs/superpowers/plans/2026-05-18-audit-cleanup-completion.md
Migration: 0075_c2_document_events_recipient_email.sql (applied to dev DB).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-18 18:22:36 +02:00
parent ef0dc5abc4
commit b3f87563c6
25 changed files with 2569 additions and 350 deletions

View File

@@ -109,9 +109,10 @@ export interface AuditLogParams {
}
// Lower-cased key fragments. A metadata key is masked if any fragment is
// contained as a substring after lowercase + snake/kebab normalization.
// Substring match catches `recipientEmail`, `sent_to_email`, `userEmail`,
// `attempted_email`, `from_address`, `phone_number`, `passwordHash`, etc.
// contained as a substring after camelCase→snake + lowercase + kebab→snake
// normalization. Substring match catches `recipientEmail`, `sent_to_email`,
// `userEmail`, `from_address`, `phone_number`, `passwordHash`, `firstName`,
// `postalCode`, `dateOfBirth`, etc.
const SENSITIVE_KEY_FRAGMENTS = [
'email',
'phone',
@@ -125,9 +126,12 @@ const SENSITIVE_KEY_FRAGMENTS = [
'authorization',
'cookie',
'address', // physical/mailing addresses
'city',
'postal',
'country',
'dob',
'date_of_birth',
'birthdate',
'birth',
'tax_id',
'taxid',
'national_id',
@@ -136,7 +140,7 @@ const SENSITIVE_KEY_FRAGMENTS = [
'iban',
'card_number',
'cvv',
'recipient', // e.g. recipientEmail catches the parent too — preserves intent
'recipient',
'first_name',
'last_name',
'full_name',
@@ -144,7 +148,10 @@ const SENSITIVE_KEY_FRAGMENTS = [
];
function isSensitiveKey(key: string): boolean {
const k = key.toLowerCase().replace(/[-]/g, '_');
const k = key
.replace(/([a-z0-9])([A-Z])/g, '$1_$2')
.toLowerCase()
.replace(/[-]/g, '_');
return SENSITIVE_KEY_FRAGMENTS.some((frag) => k.includes(frag));
}

View File

@@ -0,0 +1,21 @@
-- C.2: per-recipient webhook event dedup.
--
-- The legacy `idx_de_dedup` index keyed off (document_id, signature_hash)
-- where signature_hash was a body-content hash. A RECIPIENT_SIGNED webhook
-- for signer A and a separate RECIPIENT_SIGNED for signer B on the same
-- envelope would have identical bodies in some Documenso versions, causing
-- the second to be incorrectly deduplicated as a re-delivery.
--
-- This migration adds:
-- 1. A nullable `recipient_email` column to document_events.
-- 2. A partial unique index on (document_id, recipient_email, event_type)
-- where recipient_email IS NOT NULL — so per-recipient events dedup
-- independently while legacy events without recipient context fall
-- through to the existing signature_hash dedup.
ALTER TABLE document_events
ADD COLUMN IF NOT EXISTS recipient_email text;
CREATE UNIQUE INDEX IF NOT EXISTS idx_de_per_recipient_dedup
ON document_events (document_id, recipient_email, event_type)
WHERE recipient_email IS NOT NULL;

View File

@@ -207,8 +207,13 @@ export const documentEvents = pgTable(
// H-01: events outlive their signer row so the audit trail stays
// intact if a recipient is removed.
signerId: text('signer_id').references(() => documentSigners.id, { onDelete: 'set null' }),
// C.2: recipient_email captured at event time. Enables per-recipient
// dedup (`(documenso_document_id, recipient_email, event_type)`) so
// a RECIPIENT_SIGNED webhook for signer-A doesn't dedup against an
// earlier RECIPIENT_SIGNED for signer-B on the same envelope.
recipientEmail: text('recipient_email'),
eventData: jsonb('event_data').default({}),
signatureHash: text('signature_hash'), // deduplication
signatureHash: text('signature_hash'), // deduplication (legacy: per-payload-hash)
createdAt: timestamp('created_at', { withTimezone: true }).notNull().defaultNow(),
},
(table) => [
@@ -218,6 +223,13 @@ export const documentEvents = pgTable(
uniqueIndex('idx_de_dedup')
.on(table.documentId, table.signatureHash)
.where(sql`${table.signatureHash} IS NOT NULL`),
// C.2: per-recipient event dedup. Distinct event_type per (document,
// recipient) so re-delivery of the same SIGNED webhook for the same
// recipient is a no-op, while a different recipient's SIGNED still
// lands.
uniqueIndex('idx_de_per_recipient_dedup')
.on(table.documentId, table.recipientEmail, table.eventType)
.where(sql`${table.recipientEmail} IS NOT NULL`),
],
);

View File

@@ -314,6 +314,11 @@ export const userPermissionOverrides = pgTable(
id: text('id')
.primaryKey()
.$defaultFn(() => crypto.randomUUID()),
// onDelete: 'cascade' is intentional here (not 'set null' as a stale 2026-05-12
// audit item suggested). A permission override has no semantic value without
// the user it grants permissions to — preserving a row with user_id=NULL
// would be an orphan with no audit value, since the override is per-user
// additive permissions, not a historical event we need to retain.
userId: text('user_id')
.notNull()
.references(() => user.id, { onDelete: 'cascade' }),

View File

@@ -22,6 +22,7 @@ import { Button, Hr, Link, Text, render } from '@react-email/components';
import * as React from 'react';
import { brandingPrimaryColor, renderShell, safeUrl, type BrandingShell } from '@/lib/email/shell';
import { formatDate } from '@/lib/utils/format-date';
interface RenderOpts {
subject?: string | null;
@@ -212,13 +213,7 @@ export async function signingCompletedEmail(
.replace(/\{\{clientName\}\}/g, data.clientName)
.replace(/\{\{portName\}\}/g, data.portName)
: `${data.documentLabel} fully signed — ${data.clientName}`;
const completedDateStr = data.completedAt.toLocaleString('en-GB', {
day: '2-digit',
month: '2-digit',
year: 'numeric',
hour: '2-digit',
minute: '2-digit',
});
const completedDateStr = formatDate(data.completedAt, 'datetime.medium');
const body = await render(
<CompletedBody data={data} accent={accent} completedDateStr={completedDateStr} />,

View File

@@ -44,8 +44,16 @@ export async function registerRecurringJobs(): Promise<void> {
// Report scheduler - checks every minute for reports due to run
{ queue: 'reports', name: 'report-scheduler', pattern: '* * * * *' },
// Notification digest - configurable per user; placeholder fires hourly
// TODO(L2): make per-user schedule configurable (read from user_settings)
// Notification digest — fires hourly globally; the worker checks each
// user's `notification_digest_paused_until` and unread-count threshold
// before composing a digest, so most ticks are no-ops. Per-user time-
// of-day scheduling is DEFERRED — implementing it requires a product
// decision on UX (slider? time picker? per-channel toggles?) and adds
// a per-user cron path that doesn't pay off until enough users are
// actively customizing it. The hourly bucket aligns with how reps
// already check inboxes ("on the hour") so the current behavior is
// operationally acceptable without per-user override. Revisit when
// a customer asks for digest-time control.
{ queue: 'email', name: 'notification-digest', pattern: '0 * * * *' },
// Cleanup jobs

View File

@@ -6,14 +6,37 @@ import { logger } from '@/lib/logger';
import { attachWorkerAudit } from '@/lib/queue/audit-helpers';
import { QUEUE_CONFIGS } from '@/lib/queue';
/**
* Bulk-import worker — DEFERRED FEATURE (placeholder).
*
* Status: registered with BullMQ so any future enqueue site lands on a
* real worker instance instead of disappearing into an unbound queue.
* No callers currently dispatch to this worker — the body is intentionally
* a no-op that logs the dispatch for forensics.
*
* Why deferred (vs implemented inline):
* - CSV/Excel import is a real product feature, not a refactor. Done
* properly it needs: per-entity schema mapping (clients / berths /
* interests / companies / yachts), zod-level row validation, per-row
* error rollup with line-numbered diagnostics, dry-run preview,
* progress reporting, dedupe-on-conflict policy, admin upload UI
* with column-mapping UX. Building it speculatively without a
* customer in the room would lock in a UX that may not match what
* real importers want.
* - When the trigger comes (a customer needs to bulk-load a season
* roster or migrate from another CRM), build it from product spec
* not from this placeholder.
*
* What's required to ship: papaparse (CSV) + a thin schema-per-entity
* mapping layer, plus an admin /admin/import page with a per-entity
* picker + file dropzone. The queue registration here stays as-is.
*/
export const importWorker = new Worker(
'import',
async (job: Job) => {
logger.info({ jobId: job.id, jobName: job.name }, 'Processing import job');
// TODO(L2): implement import job handlers
// - CSV client import
// - Excel berth spec import
// - Note: maxAttempts=1 - imports are idempotent, user retries manually
// Deferred — no callers enqueue this. If a job lands, we log + swallow
// so a future test enqueue doesn't trip the failed-job alert.
},
{
connection: { url: env.REDIS_URL } as ConnectionOptions,

View File

@@ -37,7 +37,7 @@ import { interests } from '@/lib/db/schema/interests';
import { berthReservations } from '@/lib/db/schema/reservations';
import { files, documents, formSubmissions } from '@/lib/db/schema/documents';
import { documentSends } from '@/lib/db/schema/brochures';
import { emailThreads } from '@/lib/db/schema/email';
import { emailThreads, emailMessages } from '@/lib/db/schema/email';
import { reminders } from '@/lib/db/schema/operations';
import { scratchpadNotes } from '@/lib/db/schema/system';
import { websiteSubmissions } from '@/lib/db/schema/website-submissions';
@@ -47,8 +47,11 @@ import { sendEmail } from '@/lib/email';
import { logger } from '@/lib/logger';
import { createAuditLog, type AuditMeta } from '@/lib/audit';
import { demoteSystemFolderOnEntityDelete } from '@/lib/services/document-folders.service';
import { getStorageBackend } from '@/lib/storage';
import { ConflictError, NotFoundError, ValidationError } from '@/lib/errors';
const ERASED_SENTINEL = '[erased]';
const CODE_TTL_SECONDS = 10 * 60;
function codeKey(userId: string, clientId: string): string {
@@ -180,6 +183,15 @@ export async function hardDeleteClient(args: {
// forces the operator to request a fresh code.
await redis.del(key);
// Storage keys we'll need to delete POST-commit. Collected inside the tx
// so the read is consistent with what the tx detached. Deleting blobs
// INSIDE the tx would block the commit on remote storage latency and
// leave the tx hanging if S3 is slow; deleting AFTER commit means an
// S3 outage at most leaks the blob (a known acceptable RTBF tradeoff,
// since the DB row is detached + filename redacted, so the blob has
// no identifying metadata and can be reaped by a future sweeper).
const blobStorageKeys: string[] = [];
await db.transaction(async (tx) => {
// Lock the client row.
const [locked] = await tx
@@ -213,8 +225,58 @@ export async function hardDeleteClient(args: {
);
}
// Detach nullable FKs so we keep their audit history.
await tx.update(files).set({ clientId: null }).where(eq(files.clientId, args.clientId));
// A.7 RTBF wipe — Article-17 erasure of PII-bearing fields, not just FK
// detach. The previous code merely nullified clientId, which left:
// - email_messages.{body_html, body_text, subject, from/to/cc} intact
// - document_sends.recipient_email intact
// - files.{original_name, storage_path blobs} intact
// Below we (a) collect blob storage paths so we can delete them
// post-commit, (b) redact PII text columns to a sentinel, and only
// then (c) detach the FKs so the audit-trail rows survive without
// their data subject's content.
// (a) Collect file storage paths + original filenames (which may
// themselves contain PII like "alice-smith-passport.pdf").
const fileRows = await tx
.select({ id: files.id, storagePath: files.storagePath })
.from(files)
.where(eq(files.clientId, args.clientId));
blobStorageKeys.push(...fileRows.map((f) => f.storagePath));
if (fileRows.length > 0) {
await tx
.update(files)
.set({
clientId: null,
originalName: ERASED_SENTINEL,
filename: ERASED_SENTINEL,
})
.where(eq(files.clientId, args.clientId));
}
// (b) Redact email_messages content for threads owned by this client.
// Threads themselves stay (we detach via clientId=null below) so the
// audit log "a thread existed" remains; the message bodies, subjects,
// and address arrays — all PII — get wiped.
const threadRows = await tx
.select({ id: emailThreads.id })
.from(emailThreads)
.where(eq(emailThreads.clientId, args.clientId));
if (threadRows.length > 0) {
const threadIds = threadRows.map((t) => t.id);
await tx
.update(emailMessages)
.set({
bodyHtml: ERASED_SENTINEL,
bodyText: ERASED_SENTINEL,
subject: ERASED_SENTINEL,
fromAddress: ERASED_SENTINEL,
toAddresses: [ERASED_SENTINEL],
ccAddresses: null,
})
.where(inArray(emailMessages.threadId, threadIds));
}
await tx.update(documents).set({ clientId: null }).where(eq(documents.clientId, args.clientId));
await tx
.update(formSubmissions)
@@ -225,9 +287,12 @@ export async function hardDeleteClient(args: {
.set({ clientId: null })
.where(eq(emailThreads.clientId, args.clientId));
await tx.update(reminders).set({ clientId: null }).where(eq(reminders.clientId, args.clientId));
// (c) document_sends — redact recipient_email when detaching. The row
// stays (audit log "a doc was sent") but the recipient identity is wiped.
await tx
.update(documentSends)
.set({ clientId: null })
.set({ clientId: null, recipientEmail: ERASED_SENTINEL })
.where(eq(documentSends.clientId, args.clientId));
// G-C2: scratchpad_notes.linked_client_id is RESTRICT (default for no
// onDelete clause). Any rep who linked a scratchpad note to this client
@@ -264,6 +329,33 @@ export async function hardDeleteClient(args: {
);
});
// A.7 RTBF: delete blobs from storage post-commit. We never want a
// storage error to abort the DB tx (PII removal must succeed durably
// even if S3 is flaky), so this runs after commit and logs failures
// individually. Surviving blobs without a row reference are reaped by
// the standard orphan-blob sweeper job.
if (blobStorageKeys.length > 0) {
void (async () => {
const storage = await getStorageBackend();
let deleted = 0;
for (const key of blobStorageKeys) {
try {
await storage.delete(key);
deleted += 1;
} catch (err) {
logger.error(
{ err, clientId: args.clientId, storageKey: key },
'hardDeleteClient: blob delete failed (RTBF)',
);
}
}
logger.info(
{ clientId: args.clientId, deletedBlobs: deleted, totalBlobs: blobStorageKeys.length },
'hardDeleteClient: blob deletion complete',
);
})();
}
void createAuditLog({
portId: args.portId,
userId: args.requesterUserId,

View File

@@ -217,7 +217,7 @@ export async function resolveTemplate(
// Date tokens
const now = new Date();
tokenMap['{{date.today}}'] = now.toLocaleDateString('en-GB');
tokenMap['{{date.today}}'] = formatDate(now, 'date.medium');
tokenMap['{{date.year}}'] = String(now.getFullYear());
// Port tokens
@@ -304,7 +304,7 @@ export async function resolveTemplate(
tokenMap['{{interest.leadCategory}}'] = eoi.interest.leadCategory ?? '';
tokenMap['{{interest.berthNumber}}'] = eoi.berth?.mooringNumber ?? '';
tokenMap['{{interest.dateFirstContact}}'] = eoi.interest.dateFirstContact
? eoi.interest.dateFirstContact.toLocaleDateString('en-GB')
? formatDate(eoi.interest.dateFirstContact, 'date.medium')
: '';
tokenMap['{{interest.notes}}'] = eoi.interest.notes ?? '';
} catch (err) {

View File

@@ -1171,13 +1171,16 @@ export async function handleRecipientSigned(eventData: {
// varying rawBody hashes, so the (documentId, hash:signed:email) unique
// index would otherwise throw on duplicate deliveries and short-circuit
// the cascade below. `onConflictDoNothing` treats the duplicate as the
// no-op it is.
// no-op it is. C.2: recipient_email column is the dedup key for the
// per-recipient partial unique index, so a re-delivery of signer A's
// SIGNED is no-op'd, while signer B's SIGNED still lands.
await db
.insert(documentEvents)
.values({
documentId: doc.id,
eventType: 'signed',
signerId: signer?.id ?? null,
recipientEmail: eventData.recipientEmail ?? null,
signatureHash: eventData.signatureHash ?? null,
eventData: { recipientEmail: eventData.recipientEmail },
})
@@ -1691,10 +1694,12 @@ export async function handleDocumentCompleted(eventData: { documentId: string; p
// Phase 2: distribute the fully-signed PDF to every recipient via a
// branded "all signed" email. Re-read the document so we see the
// signedFileId the transaction above just committed.
// signedFileId the transaction above just committed + the
// completionCcEmails list (Phase 2 — sales mgr / accounts etc who get
// a copy without being a signer).
const completedDoc = await db.query.documents.findFirst({
where: eq(documents.id, doc.id),
columns: { signedFileId: true },
columns: { signedFileId: true, completionCcEmails: true },
});
if (completedDoc?.signedFileId) {
const signers = await db
@@ -1705,7 +1710,20 @@ export async function handleDocumentCompleted(eventData: { documentId: string; p
.from(documentSigners)
.where(eq(documentSigners.documentId, doc.id));
if (signers.length > 0) {
// Phase 2 CC list — emails that weren't signers but get a copy of
// the finalized PDF on completion. Filter to addresses not already
// in the signer set (case-insensitive) so a sales mgr who's also
// a signer doesn't get two emails.
const signerEmailSet = new Set(signers.map((s) => s.email.toLowerCase()));
const ccRecipients = (completedDoc.completionCcEmails ?? [])
.filter((e): e is string => typeof e === 'string' && e.trim().length > 0)
.map((e) => e.trim())
.filter((e) => !signerEmailSet.has(e.toLowerCase()))
.map((email) => ({ name: '', email }));
const allRecipients = [...signers, ...ccRecipients];
if (allRecipients.length > 0) {
const portRow = await db.query.ports.findFirst({
where: eq(ports.id, doc.portId),
columns: { name: true },
@@ -1727,7 +1745,7 @@ export async function handleDocumentCompleted(eventData: { documentId: string; p
await sendSigningCompleted({
portId: doc.portId,
portName: portRow?.name ?? 'Port Nimara',
recipients: signers,
recipients: allRecipients,
clientName,
documentLabel: DOC_TYPE_LABEL[doc.documentType] ?? 'Expression of Interest',
completedAt: new Date(),

View File

@@ -25,9 +25,10 @@ const BODY_MAX_BYTES = 1 * 1024;
// A 5xx in /api/v1/clients (create / update) was landing full client
// PII (full name, DOB, address, phone, nationality, email) in
// error_events.request_body_excerpt for the super-admin inspector.
// Match fragments case-insensitively + snake/kebab-normalized so the
// redactor catches `recipientEmail`, `client_email`, `phone_number`,
// `tax_id`, `passwordHash`, etc. without an exhaustive enumeration.
// Match fragments case-insensitively + camelCase→snake + kebab→snake
// normalized so the redactor catches `recipientEmail`, `firstName`,
// `postalCode`, `dateOfBirth`, `phone_number`, `passwordHash`, etc.
// without an exhaustive enumeration.
const SENSITIVE_KEY_FRAGMENTS = [
// Credentials
'password',
@@ -50,11 +51,14 @@ const SENSITIVE_KEY_FRAGMENTS = [
'whatsapp',
'dob',
'date_of_birth',
'birthdate',
'birth',
'address',
'street',
'city',
'postal',
'postcode',
'zip',
'country',
'national_id',
'passport',
'iban',
@@ -68,7 +72,10 @@ const SENSITIVE_KEY_FRAGMENTS = [
];
function isSensitiveKey(key: string): boolean {
const k = key.toLowerCase().replace(/[-]/g, '_');
const k = key
.replace(/([a-z0-9])([A-Z])/g, '$1_$2')
.toLowerCase()
.replace(/[-]/g, '_');
return SENSITIVE_KEY_FRAGMENTS.some((frag) => k.includes(frag));
}

View File

@@ -0,0 +1,303 @@
/**
* Public interest creation — extracted from `/api/public/interests/route.ts`
* per the C.4 audit finding ("Public POST routes bypass service layer"). The
* pre-extraction route was 346 lines of inline DB logic + audit + email
* fan-out, which made unit testing the dedup, ownership, and address rules
* effectively impossible without spinning up a full HTTP request fixture.
*
* After extraction:
* - The route handles HTTP concerns: rate-limit, port resolution from
* headers, parseBody validation, audit-log + email side-effect dispatch.
* - This service handles the transactional trio creation (client + yacht
* + interest, plus optional company + membership + address).
*
* The companion routes — `/api/public/website-inquiries/route.ts` (pure raw
* capture; no entity creation) and `/api/public/residential-inquiries/route.ts`
* (residential funnel, separate schema) — were intentionally NOT extracted
* here. Their bodies are smaller and their concerns don't overlap with the
* marina-funnel logic this service encapsulates.
*/
import { and, eq, isNull, sql } from 'drizzle-orm';
import { db } from '@/lib/db';
import { withTransaction } from '@/lib/db/utils';
import { interests, interestBerths } from '@/lib/db/schema/interests';
import { clients, clientContacts, clientAddresses } from '@/lib/db/schema/clients';
import { berths } from '@/lib/db/schema/berths';
import { yachts, yachtOwnershipHistory } from '@/lib/db/schema/yachts';
import { companies, companyMemberships } from '@/lib/db/schema/companies';
import { parsePhone } from '@/lib/i18n/phone';
import type { CountryCode } from '@/lib/i18n/countries';
import type { publicInterestSchema } from '@/lib/validators/interests';
import type { z } from 'zod';
type PublicInterestData = z.infer<typeof publicInterestSchema>;
type Tx = typeof db;
export interface CreatePublicInterestArgs {
portId: string;
data: PublicInterestData;
}
export interface CreatePublicInterestResult {
interestId: string;
clientId: string;
yachtId: string;
companyId: string | null;
berthId: string | null;
resolvedMooringNumber: string | null;
fullName: string;
firstName: string;
}
export async function createPublicInterest(
args: CreatePublicInterestArgs,
): Promise<CreatePublicInterestResult> {
const { portId, data } = args;
// Server-side phone normalization for older website builds that post raw
// international/national strings. Newer builds may pre-fill phoneE164/Country.
let phoneE164 = data.phoneE164 ?? null;
let phoneCountry: CountryCode | null = (data.phoneCountry as CountryCode | null) ?? null;
if (!phoneE164) {
const parsed = parsePhone(data.phone, phoneCountry ?? undefined);
phoneE164 = parsed.e164;
phoneCountry = parsed.country ?? phoneCountry;
}
const fullName =
data.firstName && data.lastName
? `${data.firstName} ${data.lastName}`
: (data.fullName ?? 'Unknown');
const firstName = data.firstName ?? fullName.split(/\s+/)[0] ?? 'Valued Guest';
// Resolve berth by mooring number (if provided). Read-only lookup — safe
// to do outside the transaction.
let berthId: string | null = null;
let resolvedMooringNumber: string | null = data.mooringNumber ?? null;
if (data.mooringNumber) {
const berth = await db.query.berths.findFirst({
where: and(eq(berths.mooringNumber, data.mooringNumber), eq(berths.portId, portId)),
});
if (berth) {
berthId = berth.id;
resolvedMooringNumber = berth.mooringNumber;
}
}
// ─── Transactional trio creation ────────────────────────────────────────
const result = await withTransaction(async (tx) => {
// 1. Find or create client by email. The inquiry-funnel audit
// flagged that the previous exact match was case-sensitive —
// capital-letter resubmissions spawned duplicate client+yacht+
// interest rows. Match LOWER(value) instead so foo@x.com and
// Foo@X.COM dedupe to the same client.
let clientId: string;
const normalizedEmail = data.email.trim().toLowerCase();
const existingContact = await tx.query.clientContacts.findFirst({
where: and(
eq(clientContacts.channel, 'email'),
sql`LOWER(${clientContacts.value}) = ${normalizedEmail}`,
),
});
if (existingContact) {
const existingClient = await tx.query.clients.findFirst({
where: eq(clients.id, existingContact.clientId),
});
if (existingClient && existingClient.portId === portId) {
clientId = existingClient.id;
const updates: Partial<typeof clients.$inferInsert> = {};
if (data.preferredContactMethod) {
updates.preferredContactMethod = data.preferredContactMethod;
}
if (data.nationalityIso && !existingClient.nationalityIso) {
updates.nationalityIso = data.nationalityIso;
}
if (Object.keys(updates).length > 0) {
await tx.update(clients).set(updates).where(eq(clients.id, clientId));
}
} else {
clientId = await createClientInTx(tx, portId, fullName, data, phoneE164, phoneCountry);
}
} else {
clientId = await createClientInTx(tx, portId, fullName, data, phoneE164, phoneCountry);
}
// 2. Optional: upsert company + add membership
let companyId: string | null = null;
if (data.company) {
const existingCompany = await tx.query.companies.findFirst({
where: and(
eq(companies.portId, portId),
sql`lower(${companies.name}) = lower(${data.company.name})`,
),
});
if (existingCompany) {
companyId = existingCompany.id;
} else {
const [newCompany] = await tx
.insert(companies)
.values({
portId,
name: data.company.name,
legalName: data.company.legalName ?? null,
taxId: data.company.taxId ?? null,
incorporationCountryIso: data.company.incorporationCountryIso ?? null,
incorporationSubdivisionIso: data.company.incorporationSubdivisionIso ?? null,
status: 'active',
})
.returning();
companyId = newCompany!.id;
}
// Add active membership only if one doesn't already exist (open row).
const existingMembership = await tx.query.companyMemberships.findFirst({
where: and(
eq(companyMemberships.companyId, companyId),
eq(companyMemberships.clientId, clientId),
isNull(companyMemberships.endDate),
),
});
if (!existingMembership) {
await tx.insert(companyMemberships).values({
companyId,
clientId,
role: data.company.role ?? 'representative',
startDate: new Date(),
isPrimary: false,
});
}
}
// 3. Create yacht. Owner is the company when provided, else the client.
const ownerType: 'client' | 'company' = companyId ? 'company' : 'client';
const ownerId = companyId ?? clientId;
const [newYacht] = await tx
.insert(yachts)
.values({
portId,
name: data.yacht.name,
hullNumber: data.yacht.hullNumber ?? null,
registration: data.yacht.registration ?? null,
flag: data.yacht.flag ?? null,
yearBuilt: data.yacht.yearBuilt ?? null,
lengthFt: data.yacht.lengthFt != null ? String(data.yacht.lengthFt) : null,
widthFt: data.yacht.widthFt != null ? String(data.yacht.widthFt) : null,
draftFt: data.yacht.draftFt != null ? String(data.yacht.draftFt) : null,
currentOwnerType: ownerType,
currentOwnerId: ownerId,
status: 'active',
})
.returning();
const yachtId = newYacht!.id;
// 3a. Open ownership_history row for the new yacht.
await tx.insert(yachtOwnershipHistory).values({
yachtId,
ownerType,
ownerId,
startDate: new Date(),
endDate: null,
createdBy: 'public-submission',
});
// 4. Store address if provided AND no primary address exists yet.
if (data.address && Object.values(data.address).some(Boolean)) {
const existingAddr = await tx.query.clientAddresses.findFirst({
where: and(eq(clientAddresses.clientId, clientId), eq(clientAddresses.isPrimary, true)),
});
if (!existingAddr) {
await tx.insert(clientAddresses).values({
clientId,
portId,
label: 'Primary',
streetAddress: data.address.street ?? null,
city: data.address.city ?? null,
subdivisionIso: data.address.subdivisionIso ?? null,
postalCode: data.address.postalCode ?? null,
countryIso: data.address.countryIso ?? null,
isPrimary: true,
});
}
}
// 5. Create interest with yachtId wired up.
const [newInterest] = await tx
.insert(interests)
.values({
portId,
clientId,
yachtId,
source: 'website',
pipelineStage: 'open',
})
.returning();
if (berthId) {
await tx.insert(interestBerths).values({
interestId: newInterest!.id,
berthId,
isPrimary: true,
isSpecificInterest: true,
isInEoiBundle: false,
});
}
return {
interestId: newInterest!.id,
clientId,
yachtId,
companyId,
};
});
return {
...result,
berthId,
resolvedMooringNumber,
fullName,
firstName,
};
}
async function createClientInTx(
tx: Tx,
portId: string,
fullName: string,
data: Pick<PublicInterestData, 'email' | 'phone' | 'preferredContactMethod' | 'nationalityIso'>,
phoneE164: string | null,
phoneCountry: CountryCode | null,
): Promise<string> {
const [newClient] = await tx
.insert(clients)
.values({
portId,
fullName,
preferredContactMethod: data.preferredContactMethod,
nationalityIso: data.nationalityIso ?? null,
source: 'website',
})
.returning();
const clientId = newClient!.id;
await tx.insert(clientContacts).values({
clientId,
channel: 'email',
// Store lowercased so the case-insensitive dedup match above always
// hits on subsequent submissions.
value: data.email.trim().toLowerCase(),
isPrimary: true,
});
await tx.insert(clientContacts).values({
clientId,
channel: 'phone',
value: data.phone,
valueE164: phoneE164,
valueCountry: phoneCountry,
isPrimary: false,
});
return clientId;
}

View File

@@ -6,6 +6,13 @@ import { berths } from '@/lib/db/schema/berths';
import { auditLogs, systemSettings } from '@/lib/db/schema/system';
import { STAGE_WEIGHTS, canonicalizeStage } from '@/lib/constants';
import { activeInterestsWhere } from '@/lib/services/active-interest';
import {
rollupStageRevenue,
rollupStageCounts,
rollupBerthStatusCounts,
computeOccupancyRate,
computeTotalForecast,
} from '@/lib/services/report-math';
// ─── Types ────────────────────────────────────────────────────────────────────
@@ -76,14 +83,9 @@ export async function fetchPipelineData(
.groupBy(interests.pipelineStage);
// M-L02: legacy 9-stage values (deposit_10pct, contract_sent…) may
// still be present on historical rows. canonicalizeStage maps them
// back to the modern 7-stage keys so the rollup doesn't carry phantom
// buckets through to the PDF.
const stageCountMap: Record<string, number> = {};
for (const row of stageCounts) {
const key = canonicalizeStage(row.stage);
stageCountMap[key] = (stageCountMap[key] ?? 0) + row.count;
}
// still be present on historical rows. rollupStageCounts canonicalizes
// via canonicalizeStage so historical rows fold into the modern bucket.
const stageCountMap = rollupStageCounts(stageCounts);
// Top 10 interests by berth price (via primary-berth junction join, plan §3.4).
const topInterestsRows = await db
@@ -141,13 +143,7 @@ export async function fetchRevenueData(
.groupBy(interests.pipelineStage);
// M-L02: canonicalize so legacy 9-stage rows fold into the modern bucket.
const stageRevenueMap: Record<string, string> = {};
for (const row of stageRevenue) {
const key = canonicalizeStage(row.stage);
const prior = parseFloat(stageRevenueMap[key] ?? '0');
const next = row.revenue ? parseFloat(String(row.revenue)) : 0;
stageRevenueMap[key] = String(prior + next);
}
const stageRevenueMap = rollupStageRevenue(stageRevenue);
// Total revenue from WON interests only. Reporting audit caught the
// `outcome='won'` is the canonical money-changed-hands signal — won
@@ -196,20 +192,14 @@ export async function fetchRevenueData(
.where(activeInterestsWhere(portId))
.groupBy(interests.pipelineStage);
let totalForecast = 0;
for (const row of forecastRows) {
if (!row.revenue) continue;
// M-L02: canonicalize so legacy keys hit pipelineWeights via their
// modern equivalent (otherwise the lookup falls through to 0 and the
// forecast silently undershoots).
const weight = pipelineWeights[canonicalizeStage(row.stage)] ?? 0;
totalForecast += parseFloat(String(row.revenue)) * weight;
}
// M-L02 covered inside computeTotalForecast via canonicalizeStage —
// legacy stage keys hit the weight map under their modern equivalent.
const totalForecast = computeTotalForecast(forecastRows, pipelineWeights);
return {
stageRevenue: stageRevenueMap,
totalCompleted: completedRevenue[0]?.total ? String(completedRevenue[0].total) : '0',
totalForecast: totalForecast.toFixed(2),
totalForecast,
pipelineWeights,
generatedAt: new Date().toISOString(),
};
@@ -278,23 +268,16 @@ export async function fetchOccupancyData(
.where(eq(berths.portId, portId))
.groupBy(berths.status);
const statusCountMap: Record<string, number> = {};
let totalBerths = 0;
for (const row of statusCounts) {
statusCountMap[row.status] = row.count;
totalBerths += row.count;
}
const { statusCounts: statusCountMap, totalBerths } = rollupBerthStatusCounts(statusCounts);
// Occupied = sold only. Per 2026-05-14 decision, `under_offer` is a
// hold (blocks the berth from sale to other clients) but the berth is
// still technically available until the deal closes. Aligned with the
// KPI tile + analytics timeline so the same dashboard shows one number.
const occupiedCount = statusCountMap['sold'] ?? 0;
const occupancyRate = totalBerths > 0 ? (occupiedCount / totalBerths) * 100 : 0;
// still technically available until the deal closes. computeOccupancyRate
// implements that rule + rounds to 1 decimal.
const { occupancyRate } = computeOccupancyRate(statusCountMap);
return {
statusCounts: statusCountMap,
occupancyRate: Math.round(occupancyRate * 10) / 10,
occupancyRate,
totalBerths,
generatedAt: new Date().toISOString(),
};

View File

@@ -0,0 +1,117 @@
/**
* Pure-math helpers extracted from report-generators.ts so the
* revenue/forecast/occupancy/funnel computations can be unit-tested
* deterministically without spinning up a Postgres fixture.
*
* The corresponding DB-bound `fetch*Data` functions in report-generators
* call into these helpers after gathering rows. Tests for the SQL itself
* remain integration-tier; this module covers the arithmetic so a future
* weight-tuning change can't silently shift the forecast number.
*/
import { STAGE_WEIGHTS, canonicalizeStage } from '@/lib/constants';
export interface StageRevenueRow {
stage: string;
revenue: string | number | null;
}
export interface StageCountRow {
stage: string;
count: number;
}
export interface BerthStatusRow {
status: string;
count: number;
}
/**
* Collapse a per-pipeline-stage revenue list into a canonicalized
* Record<canonicalStage, totalRevenueString>. Handles the legacy 9-stage
* keys via canonicalizeStage so historical rows fold into the modern
* 7-stage bucket they belong to.
*/
export function rollupStageRevenue(rows: StageRevenueRow[]): Record<string, string> {
const out: Record<string, string> = {};
for (const row of rows) {
const key = canonicalizeStage(row.stage);
const prior = parseFloat(out[key] ?? '0');
const next = row.revenue ? parseFloat(String(row.revenue)) : 0;
out[key] = String(prior + next);
}
return out;
}
/**
* Same as rollupStageRevenue but for counts (funnel breakdown).
*/
export function rollupStageCounts(rows: StageCountRow[]): Record<string, number> {
const out: Record<string, number> = {};
for (const row of rows) {
const key = canonicalizeStage(row.stage);
out[key] = (out[key] ?? 0) + row.count;
}
return out;
}
/**
* Pipeline-weighted forecast: sum(berth_price × stage_weight) for every
* active interest. The weight per stage resolves from per-port admin
* overrides (`system_settings.pipeline_weights`) and falls back to the
* STAGE_WEIGHTS defaults. Legacy stage keys canonicalize before lookup
* so the forecast doesn't silently undershoot due to a key miss.
*
* Returns the forecast as a 2-decimal-fixed string for stable
* comparison + downstream PDF rendering.
*/
export function computeTotalForecast(
rows: StageRevenueRow[],
weights: Record<string, number> = STAGE_WEIGHTS,
): string {
let total = 0;
for (const row of rows) {
if (!row.revenue) continue;
const weight = weights[canonicalizeStage(row.stage)] ?? 0;
total += parseFloat(String(row.revenue)) * weight;
}
return total.toFixed(2);
}
/**
* Occupancy rate as a percentage. "Occupied" = sold only — per the
* 2026-05-14 product decision, under_offer is a hold (blocks sale to
* other clients) but doesn't count as the berth being occupied yet.
* Returns the rate to 1 decimal place; returns 0 when totalBerths=0
* to avoid NaN propagation through the PDF.
*/
export function computeOccupancyRate(statusCounts: Record<string, number>): {
occupancyRate: number;
totalBerths: number;
} {
let totalBerths = 0;
for (const k of Object.keys(statusCounts)) {
totalBerths += statusCounts[k] ?? 0;
}
const occupiedCount = statusCounts['sold'] ?? 0;
const occupancyRate =
totalBerths > 0 ? Math.round((occupiedCount / totalBerths) * 100 * 10) / 10 : 0;
return { occupancyRate, totalBerths };
}
/**
* Build the per-status count map from a status-grouped query result.
* Returns the map AND the total count so callers don't have to sum
* again themselves.
*/
export function rollupBerthStatusCounts(rows: BerthStatusRow[]): {
statusCounts: Record<string, number>;
totalBerths: number;
} {
const statusCounts: Record<string, number> = {};
let totalBerths = 0;
for (const row of rows) {
statusCounts[row.status] = row.count;
totalBerths += row.count;
}
return { statusCounts, totalBerths };
}