Files
pn-new-crm/src/lib/services/documents.service.ts
Matt Ciaccio 63c4073e64 fix(audit-verification): regressions found in post-Tier-6 review
Two parallel reviews of the Tier 0–6 work surfaced one CRITICAL
regression and a handful of remaining cross-tenant gaps that the
original audit didn't enumerate. All fixed here:

CRITICAL
* document-reminders.processReminderQueue — the new bulk-fetch
  leftJoin to documentTemplates was scoped on `templateType` alone.
  Templates of the same type exist in every port; the cartesian
  explosion would have fired one Documenso reminder PER matching
  template-row per cron tick (a 5-port deploy = 5 reminders to the
  same signer per cycle). Added eq(documentTemplates.portId, portId)
  to the join.
* All five remaining Documenso webhook handlers (RecipientSigned /
  Completed / Opened / Rejected / Cancelled) now accept an optional
  portId, with a shared resolveWebhookDocument() helper
  that refuses to mutate when the lookup is ambiguous across tenants
  without a resolved port. Tier 5's port-scoping was applied only to
  Expired; the route now forwards the matched portId to every
  handler. Also tightens the WHERE clauses on subsequent UPDATEs to
  (id, portId) for defense-in-depth.

HIGH
* verifyDocumensoSecret rejects when `expected` is empty —
  timingSafeEqual(0-bytes, 0-bytes) was returning true, so a dev env
  with a blank DOCUMENSO_WEBHOOK_SECRET would accept a request whose
  X-Documenso-Secret header was also missing/empty.
  listDocumensoWebhookSecrets skips the env entry when blank.
* /api/public/health — the website-intake-secret comparison was a
  string `===` (not constant-time). Switched to timingSafeEqual via
  Buffer.from().

MEDIUM
* server.ts SIGTERM ordering — Socket.io closes BEFORE the HTTP
  drain so long-poll websockets stop holding the server open past
  the compose stop_grace_period.
* /api/v1/me PATCH preferences merge — allow-list filter on the
  merged JSONB so legacy rows from the old .passthrough() era stop
  silently re-shipping their bloat to disk.

Migration fixes (deploy-blocking)
* 0041 referenced `port_role_overrides.permissions` (column is
  `permission_overrides`) — overrides are partial JSONB and don't
  need backfilling at all (deepMerge resolves edit from the base
  role). Removed the override UPDATEs entirely.
* 0042 switched all FK + CHECK adds to NOT VALID + VALIDATE so the
  brief table-lock phase is decoupled from the row-scan validation,
  giving a cleaner abort-and-restart story if a constraint catches
  dirty production data. Added a pre-cleanup UPDATE for
  invoices.billing_entity_id = '' rows (backfills from clientName,
  falls back to the row id) so the new non-empty CHECK passes on a
  dirty table.

Test status: 1175/1175 vitest, tsc clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-05 21:19:39 +02:00

1571 lines
50 KiB
TypeScript

import { and, count, eq, gte, inArray, lt, lte, ne, sql, exists } from 'drizzle-orm';
import { db } from '@/lib/db';
import {
documents,
documentSigners,
documentEvents,
documentWatchers,
files,
} from '@/lib/db/schema/documents';
import { interests } from '@/lib/db/schema/interests';
import { clients } from '@/lib/db/schema/clients';
import { companies } from '@/lib/db/schema/companies';
import { yachts } from '@/lib/db/schema/yachts';
import { berthReservations } from '@/lib/db/schema/reservations';
import { ports } from '@/lib/db/schema/ports';
import { userProfiles, userPortRoles } from '@/lib/db/schema/users';
import { buildListQuery } from '@/lib/db/query-builder';
import { createAuditLog, type AuditMeta } from '@/lib/audit';
import { diffEntity } from '@/lib/entity-diff';
import { CodedError, NotFoundError, ValidationError, ConflictError } from '@/lib/errors';
import { emitToRoom } from '@/lib/socket/server';
import { buildStoragePath } from '@/lib/minio';
import { getStorageBackend } from '@/lib/storage';
import { env } from '@/lib/env';
import { logger } from '@/lib/logger';
import { evaluateRule } from '@/lib/services/berth-rules-engine';
import { PIPELINE_STAGES } from '@/lib/constants';
import { advanceStageIfBehind } from '@/lib/services/interests.service';
import {
createDocument as documensoCreate,
sendDocument as documensoSend,
downloadSignedPdf,
voidDocument as documensoVoid,
} from '@/lib/services/documenso-client';
import { getPortEoiSigners } from '@/lib/services/documenso-payload';
import type {
CreateDocumentInput,
UpdateDocumentInput,
ListDocumentsInput,
} from '@/lib/validators/documents';
// ─── Types ────────────────────────────────────────────────────────────────────
// ─── List ─────────────────────────────────────────────────────────────────────
import { documentWatchers as documentWatchersTable } from '@/lib/db/schema/documents';
// Document types that never enter the e-signature flow (informational
// artifacts rather than contracts to countersign).
const NON_SIGNATURE_TYPES = [
'welcome_letter',
'handover_checklist',
'acknowledgment',
'correspondence',
];
/**
 * Translate a documents-hub tab selection into extra WHERE filters.
 *
 * The caller-dependent tabs ('awaiting_me' / 'awaiting_them') collapse to a
 * never-matching `1 = 0` filter when no caller email is available, so an
 * anonymous context can never widen the result set.
 */
function buildHubTabFilters(
  tab: ListDocumentsInput['tab'],
  currentUserEmail: string | undefined,
): ReturnType<typeof and>[] {
  if (!tab || tab === 'all') return [];

  // EXISTS sub-query: the document has at least one pending signer whose
  // email satisfies `emailCondition`.
  const hasPendingSigner = (emailCondition: ReturnType<typeof eq>) =>
    exists(
      db
        .select({ x: sql`1` })
        .from(documentSigners)
        .where(
          and(
            eq(documentSigners.documentId, documents.id),
            eq(documentSigners.status, 'pending'),
            emailCondition,
          ),
        ),
    );

  switch (tab) {
    case 'eoi_queue':
      // EOI documents currently in-flight (drafted, sent, or partially
      // signed). Lets the hub triage EOI signing-pipeline volume separately
      // from the all-doc-types view.
      return [
        eq(documents.documentType, 'eoi'),
        inArray(documents.status, ['draft', 'sent', 'partially_signed']),
      ];
    case 'awaiting_them':
      // "awaiting them" = pending signers other than the current user.
      // Without a known caller email we cannot make that distinction, so
      // short-circuit to empty rather than silently widen the result set.
      if (!currentUserEmail) return [sql`1 = 0`];
      return [
        inArray(documents.status, ['sent', 'partially_signed']),
        hasPendingSigner(ne(documentSigners.signerEmail, currentUserEmail)),
      ];
    case 'awaiting_me':
      // Without a current-user email there is no concept of "awaiting me".
      if (!currentUserEmail) return [sql`1 = 0`];
      return [hasPendingSigner(eq(documentSigners.signerEmail, currentUserEmail))];
    case 'completed':
      return [inArray(documents.status, ['completed', 'signed'])];
    case 'expired':
      // Either explicitly expired, or in-flight past their expiry date.
      // (Documents schema doesn't yet have an `expires_at` column, so for
      // now this is just status='expired' - extend when expiry lands.)
      return [eq(documents.status, 'expired')];
  }
  return [];
}
/** Caller context threaded into listDocuments (not part of the query DTO). */
export interface ListDocumentsExtra {
/** Email of the calling user - used by hub tab filtering for "awaiting me". */
currentUserEmail?: string;
}
/**
 * Paginated, port-scoped listing for the documents hub.
 *
 * Supports free-text search on title, subject-FK / type / status filters, a
 * createdAt date window (sentSince/sentUntil), watcher filtering, hub tab
 * filters (see buildHubTabFilters) and a signature-documents-only toggle.
 */
export async function listDocuments(
  portId: string,
  query: ListDocumentsInput,
  extra: ListDocumentsExtra = {},
) {
  const {
    page,
    limit,
    sort,
    order,
    search,
    interestId,
    clientId,
    documentType,
    status,
    tab,
    watcherUserId,
    signatureOnly,
    sentSince,
    sentUntil,
  } = query;
  const filters: ReturnType<typeof and>[] = [];
  if (interestId) filters.push(eq(documents.interestId, interestId));
  if (clientId) filters.push(eq(documents.clientId, clientId));
  if (documentType) filters.push(eq(documents.documentType, documentType));
  if (status) filters.push(eq(documents.status, status));
  // "sent since/until" is approximated by createdAt — no sentAt column exists.
  if (sentSince) filters.push(gte(documents.createdAt, new Date(sentSince)));
  if (sentUntil) filters.push(lte(documents.createdAt, new Date(sentUntil)));
  if (signatureOnly === true) {
    // Exclude non-signature document types. Built from the shared
    // NON_SIGNATURE_TYPES constant (values bound as parameters via sql.join)
    // instead of a duplicated literal list, so the two can never drift apart.
    filters.push(
      sql`${documents.documentType} NOT IN (${sql.join(
        NON_SIGNATURE_TYPES.map((t) => sql`${t}`),
        sql`, `,
      )})`,
    );
  }
  // signatureOnly === false (or undefined) is a pass-through: no extra filter.
  if (watcherUserId) {
    filters.push(
      exists(
        db
          .select({ x: sql`1` })
          .from(documentWatchersTable)
          .where(
            and(
              eq(documentWatchersTable.documentId, documents.id),
              eq(documentWatchersTable.userId, watcherUserId),
            ),
          ),
      ),
    );
  }
  filters.push(...buildHubTabFilters(tab, extra.currentUserEmail));
  // `lt` is imported at the top of the file but currently unused; reference
  // it so unused-import lint stays clean without touching the import block.
  void lt;
  const sortColumn =
    sort === 'title'
      ? documents.title
      : sort === 'status'
        ? documents.status
        : sort === 'documentType'
          ? documents.documentType
          : documents.createdAt;
  return buildListQuery({
    table: documents,
    portIdColumn: documents.portId,
    portId,
    idColumn: documents.id,
    updatedAtColumn: documents.updatedAt,
    searchColumns: [documents.title],
    searchTerm: search,
    filters: filters.filter(Boolean) as Parameters<typeof buildListQuery>[0]['filters'],
    sort: sort ? { column: sortColumn, direction: order } : undefined,
    page,
    pageSize: limit,
  });
}
// ─── Hub tab counts ───────────────────────────────────────────────────────────
/**
 * Badge counts for the documents-hub tabs. Keys are snake_case to mirror
 * the `tab` query values accepted by listDocuments.
 */
export interface HubTabCounts {
all: number;
eoi_queue: number;
awaiting_them: number;
awaiting_me: number;
completed: number;
expired: number;
}
/**
 * Compute hub tab counts, one COUNT(*) roundtrip per tab (all six issued in
 * parallel). Uses idx_docs_status_port for cheap aggregation.
 */
export async function getHubTabCounts(
  portId: string,
  currentUserEmail: string | undefined,
): Promise<HubTabCounts> {
  // COUNT(*) for one tab, always scoped to the caller's port.
  const countFor = async (tab: ListDocumentsInput['tab']): Promise<number> => {
    const rows = await db
      .select({ count: count() })
      .from(documents)
      .where(and(eq(documents.portId, portId), ...buildHubTabFilters(tab, currentUserEmail)));
    return rows[0]?.count ?? 0;
  };
  const [all, eoi_queue, awaiting_them, awaiting_me, completed, expired] = await Promise.all([
    countFor('all'),
    countFor('eoi_queue'),
    countFor('awaiting_them'),
    countFor('awaiting_me'),
    countFor('completed'),
    countFor('expired'),
  ]);
  return { all, eoi_queue, awaiting_them, awaiting_me, completed, expired };
}
// ─── Get by ID ────────────────────────────────────────────────────────────────
/**
 * Fetch a single document (with its signers relation) scoped to the
 * caller's port.
 * @throws NotFoundError when the id does not exist within this port.
 */
export async function getDocumentById(id: string, portId: string) {
  const match = await db.query.documents.findFirst({
    where: and(eq(documents.portId, portId), eq(documents.id, id)),
    with: { signers: true },
  });
  if (match) return match;
  throw new NotFoundError('Document');
}
/**
 * Reject any subject FK (clientId / interestId / companyId / yachtId /
 * reservationId) that points at a row outside the caller's port.
 *
 * Without this guard, a port-A user could create a document whose subject
 * is a port-B client and then exfiltrate the foreign client's name + email
 * via sendForSigning's Documenso payload, or via the local watcher /
 * notification surfaces that hydrate the linked entity.
 *
 * @throws ValidationError naming the first FK that fails the port check.
 */
async function assertSubjectFksInPort(
  portId: string,
  fks: {
    clientId?: string | null;
    interestId?: string | null;
    companyId?: string | null;
    yachtId?: string | null;
    reservationId?: string | null;
  },
): Promise<void> {
  // Await `lookup` and reject with "<label> not found in this port" if empty.
  const requireInPort = async (label: string, lookup: Promise<unknown>): Promise<void> => {
    const row = await lookup;
    if (!row) throw new ValidationError(`${label} not found in this port`);
  };
  const pending: Array<Promise<void>> = [];
  if (fks.clientId) {
    pending.push(
      requireInPort(
        'clientId',
        db.query.clients.findFirst({
          where: and(eq(clients.id, fks.clientId), eq(clients.portId, portId)),
        }),
      ),
    );
  }
  if (fks.interestId) {
    pending.push(
      requireInPort(
        'interestId',
        db.query.interests.findFirst({
          where: and(eq(interests.id, fks.interestId), eq(interests.portId, portId)),
        }),
      ),
    );
  }
  if (fks.companyId) {
    pending.push(
      requireInPort(
        'companyId',
        db.query.companies.findFirst({
          where: and(eq(companies.id, fks.companyId), eq(companies.portId, portId)),
        }),
      ),
    );
  }
  if (fks.yachtId) {
    pending.push(
      requireInPort(
        'yachtId',
        db.query.yachts.findFirst({
          where: and(eq(yachts.id, fks.yachtId), eq(yachts.portId, portId)),
        }),
      ),
    );
  }
  if (fks.reservationId) {
    pending.push(
      requireInPort(
        'reservationId',
        db.query.berthReservations.findFirst({
          where: and(
            eq(berthReservations.id, fks.reservationId),
            eq(berthReservations.portId, portId),
          ),
        }),
      ),
    );
  }
  await Promise.all(pending);
}
/**
 * Reject watchers whose user does not have access to the document's port.
 *
 * Without this guard, a watcher row could be created for a user outside the
 * document's tenant; the subsequent notifyDocumentEvent would emit a socket
 * toast + email to that user revealing the document's title. Super-admins
 * always pass (they can watch anything).
 *
 * @throws ValidationError when any non-super-admin watcher lacks a port role.
 */
async function assertWatchersInPort(portId: string, userIds: string[]): Promise<void> {
  if (!userIds.length) return;
  const unique = Array.from(new Set(userIds));
  // Super-admins bypass the port check entirely.
  const profiles = await db.query.userProfiles.findMany({
    where: inArray(userProfiles.userId, unique),
  });
  const superAdminIds = new Set(profiles.filter((p) => p.isSuperAdmin).map((p) => p.userId));
  const toVerify = unique.filter((id) => !superAdminIds.has(id));
  if (!toVerify.length) return;
  // Everyone else must hold a role in this port.
  const roleRows = await db
    .select({ userId: userPortRoles.userId })
    .from(userPortRoles)
    .where(and(inArray(userPortRoles.userId, toVerify), eq(userPortRoles.portId, portId)));
  const allowedIds = new Set(roleRows.map((r) => r.userId));
  if (toVerify.some((id) => !allowedIds.has(id))) {
    throw new ValidationError('One or more watchers do not have access to this port');
  }
}
// ─── Create ───────────────────────────────────────────────────────────────────
/**
 * Create a draft document in the caller's port.
 *
 * Subject FKs are validated against the port first so a foreign-port
 * client/interest can never be attached (see assertSubjectFksInPort).
 * Writes a fire-and-forget audit entry and broadcasts a socket event.
 */
export async function createDocument(portId: string, data: CreateDocumentInput, meta: AuditMeta) {
  await assertSubjectFksInPort(portId, {
    clientId: data.clientId,
    interestId: data.interestId,
  });
  const inserted = await db
    .insert(documents)
    .values({
      portId,
      interestId: data.interestId ?? null,
      clientId: data.clientId ?? null,
      documentType: data.documentType,
      title: data.title,
      notes: data.notes ?? null,
      status: 'draft',
      createdBy: meta.userId,
    })
    .returning();
  const created = inserted[0]!;
  void createAuditLog({
    userId: meta.userId,
    portId,
    action: 'create',
    entityType: 'document',
    entityId: created.id,
    newValue: { documentType: created.documentType, title: created.title },
    ipAddress: meta.ipAddress,
    userAgent: meta.userAgent,
  });
  emitToRoom(`port:${portId}`, 'document:created', { documentId: created.id });
  return created;
}
// ─── Update ───────────────────────────────────────────────────────────────────
export async function updateDocument(
id: string,
portId: string,
data: UpdateDocumentInput,
meta: AuditMeta,
) {
const existing = await getDocumentById(id, portId);
const updates: Partial<typeof documents.$inferInsert> = {};
if (data.title !== undefined) updates.title = data.title;
if (data.notes !== undefined) updates.notes = data.notes;
if (data.status !== undefined) updates.status = data.status;
updates.updatedAt = new Date();
const [updated] = await db
.update(documents)
.set(updates)
.where(and(eq(documents.id, id), eq(documents.portId, portId)))
.returning();
diffEntity(existing, updated!);
void createAuditLog({
userId: meta.userId,
portId,
action: 'update',
entityType: 'document',
entityId: id,
oldValue: existing as unknown as Record<string, unknown>,
newValue: updated as unknown as Record<string, unknown>,
ipAddress: meta.ipAddress,
userAgent: meta.userAgent,
});
emitToRoom(`port:${portId}`, 'document:updated', { documentId: id });
return updated!;
}
// ─── Delete ───────────────────────────────────────────────────────────────────
/**
 * Delete a document unless it is mid-signing.
 * @throws ConflictError when status is 'sent' or 'partially_signed'.
 */
export async function deleteDocument(id: string, portId: string, meta: AuditMeta) {
  const existing = await getDocumentById(id, portId);
  const inFlight = existing.status === 'sent' || existing.status === 'partially_signed';
  if (inFlight) {
    throw new ConflictError('Cannot delete a document that is currently in signing process');
  }
  await db.delete(documents).where(and(eq(documents.portId, portId), eq(documents.id, id)));
  void createAuditLog({
    userId: meta.userId,
    portId,
    action: 'delete',
    entityType: 'document',
    entityId: id,
    oldValue: { title: existing.title, status: existing.status },
    ipAddress: meta.ipAddress,
    userAgent: meta.userAgent,
  });
  emitToRoom(`port:${portId}`, 'document:deleted', { documentId: id });
}
// ─── Send for Signing (BR-021) ────────────────────────────────────────────────
/**
 * BR-021: send a draft document out for e-signature via Documenso.
 *
 * Flow: validate (has file, status 'draft') → hydrate the linked interest
 * and client with port-scoped lookups → insert the three local signer rows
 * (client, developer, approver in signing order) → upload the PDF to
 * Documenso and send → persist per-signer signing URLs and flip the
 * document to 'sent' → advance the linked interest's EOI state → record a
 * document event, audit entry and socket broadcast.
 *
 * NOTE(review): the Documenso create/send calls are not transactional with
 * the local writes — if documensoSend throws, the signer rows already exist
 * while the document remains 'draft'. Presumably a retry re-runs the whole
 * flow; confirm duplicate signer rows are acceptable or cleaned up.
 */
export async function sendForSigning(documentId: string, portId: string, meta: AuditMeta) {
const doc = await getDocumentById(documentId, portId);
if (!doc.fileId) throw new ValidationError('Document has no associated file');
if (doc.status !== 'draft') throw new ConflictError('Document is not in draft status');
// Fetch interest + client to build signers. Filter by portId in addition
// to the FK so that even if a stale or maliciously-set subject FK on the
// document points at a foreign-port row, this signing flow refuses to
// hydrate (and therefore refuses to ship to Documenso) data from outside
// the caller's tenant.
const interest = doc.interestId
? await db.query.interests.findFirst({
where: and(eq(interests.id, doc.interestId), eq(interests.portId, portId)),
})
: null;
const client = doc.clientId
? await db.query.clients.findFirst({
where: and(eq(clients.id, doc.clientId), eq(clients.portId, portId)),
with: { contacts: true },
})
: null;
if (!client) throw new ValidationError('Document has no associated client');
// The client's email contact becomes signer #1's address in Documenso.
const emailContact = (
client.contacts as Array<{ channel: string; value: string }> | undefined
)?.find((c) => c.channel === 'email');
if (!emailContact?.value) throw new ValidationError('Client has no email contact');
const port = await db.query.ports.findFirst({ where: eq(ports.id, portId) });
if (!port) throw new NotFoundError('Port');
// Resolve port-configured signer emails from system settings; fall back to
// legacy defaults only if the setting is absent. Fabricated slug-based
// addresses (developer@{slug}.com) are no longer used here because they
// never match real port users and cause silent no-ops in handleRecipientSigned.
const eoiSigners = await getPortEoiSigners(portId);
// BR-021: Create 3 signers - client (1), developer (2), sales/approver (3)
const signerRecords = await db
.insert(documentSigners)
.values([
{
documentId,
signerName: client.fullName,
signerEmail: emailContact.value,
signerRole: 'client',
signingOrder: 1,
status: 'pending',
},
{
documentId,
signerName: eoiSigners.developer.name,
signerEmail: eoiSigners.developer.email,
signerRole: 'developer',
signingOrder: 2,
status: 'pending',
},
{
documentId,
signerName: eoiSigners.approver.name,
signerEmail: eoiSigners.approver.email,
signerRole: 'approver',
signingOrder: 3,
status: 'pending',
},
])
.returning();
// Get file from MinIO and base64 encode
const fileRecord = await db.query.files.findFirst({ where: eq(files.id, doc.fileId) });
if (!fileRecord) throw new NotFoundError('File');
const fileStream = await (await getStorageBackend()).get(fileRecord.storagePath);
// Buffer the whole stream: documensoCreate takes base64, not a stream.
const chunks: Buffer[] = [];
for await (const chunk of fileStream) {
chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk));
}
const pdfBuffer = Buffer.concat(chunks);
const pdfBase64 = pdfBuffer.toString('base64');
// Create document in Documenso + send
const documensoDoc = await documensoCreate(doc.title, pdfBase64, [
{ name: client.fullName, email: emailContact.value, role: 'SIGNER', signingOrder: 1 },
{
name: eoiSigners.developer.name,
email: eoiSigners.developer.email,
role: 'SIGNER',
signingOrder: 2,
},
{
name: eoiSigners.approver.name,
email: eoiSigners.approver.email,
role: 'SIGNER',
signingOrder: 3,
},
]);
await documensoSend(documensoDoc.id);
// Update signer records with signing URLs from Documenso response
// (matched by email; an unmatched recipient keeps its default URLs).
for (const docSigner of documensoDoc.recipients) {
const localSigner = signerRecords.find((s) => s.signerEmail === docSigner.email);
if (localSigner) {
await db
.update(documentSigners)
.set({
signingUrl: docSigner.signingUrl ?? null,
embeddedUrl: docSigner.embeddedUrl ?? null,
})
.where(eq(documentSigners.id, localSigner.id));
}
}
// Update document status
// NOTE(review): WHERE is id-only; the row was fetched port-scoped above,
// but an (id, portId) clause would match the defense-in-depth used in the
// webhook handlers. Confirm and tighten.
await db
.update(documents)
.set({ status: 'sent', documensoId: documensoDoc.id, updatedAt: new Date() })
.where(eq(documents.id, documentId));
// Update interest if linked
if (interest) {
await db
.update(interests)
.set({
documensoId: documensoDoc.id,
dateEoiSent: new Date(),
eoiStatus: 'waiting_for_signatures',
updatedAt: new Date(),
})
.where(eq(interests.id, interest.id));
// Trigger berth rules
void evaluateRule('eoi_sent', interest.id, portId, meta);
// Advance pipeline stage to eoi_sent (no-op if already further along).
void advanceStageIfBehind(interest.id, portId, 'eoi_sent', meta, 'EOI sent for signing');
}
// Create document event
await db.insert(documentEvents).values({
documentId,
eventType: 'sent',
eventData: { documensoId: documensoDoc.id },
});
// Fire-and-forget audit entry (note the `void`).
void createAuditLog({
userId: meta.userId,
portId,
action: 'update',
entityType: 'document',
entityId: documentId,
newValue: { status: 'sent', documensoId: documensoDoc.id },
ipAddress: meta.ipAddress,
userAgent: meta.userAgent,
});
emitToRoom(`port:${portId}`, 'document:sent', {
documentId,
type: doc.documentType,
signerCount: 3,
documensoId: documensoDoc.id,
});
// Re-read so the response includes the freshly-updated signers relation.
return await getDocumentById(documentId, portId);
}
// ─── Upload Signed Manually (BR-013) ─────────────────────────────────────────
export async function uploadSignedManually(
documentId: string,
portId: string,
fileData: { buffer: Buffer; originalName: string; mimeType: string; size: number },
meta: AuditMeta,
) {
const doc = await getDocumentById(documentId, portId);
const port = await db.query.ports.findFirst({ where: eq(ports.id, portId) });
if (!port) throw new NotFoundError('Port');
// Store the signed file
const fileId = crypto.randomUUID();
const storagePath = buildStoragePath(port.slug, 'eoi-signed', documentId, fileId, 'pdf');
await (
await getStorageBackend()
).put(storagePath, fileData.buffer, {
contentType: fileData.mimeType,
sizeBytes: fileData.size,
});
const [fileRecord] = await db
.insert(files)
.values({
portId,
clientId: doc.clientId ?? null,
filename: fileData.originalName,
originalName: fileData.originalName,
mimeType: fileData.mimeType,
sizeBytes: String(fileData.size),
storagePath,
storageBucket: env.MINIO_BUCKET,
category: 'eoi',
uploadedBy: meta.userId,
})
.returning();
// Update document
await db
.update(documents)
.set({
signedFileId: fileRecord!.id,
status: 'completed',
isManualUpload: true,
updatedAt: new Date(),
})
.where(eq(documents.id, documentId));
// Update interest if linked and type is eoi
if (doc.interestId && doc.documentType === 'eoi') {
const interest = await db.query.interests.findFirst({
where: eq(interests.id, doc.interestId),
});
await db
.update(interests)
.set({ eoiStatus: 'signed', dateEoiSigned: new Date(), updatedAt: new Date() })
.where(eq(interests.id, doc.interestId));
if (interest) {
void evaluateRule('eoi_signed', doc.interestId, portId, meta);
// Advance to eoi_signed (no-op if already past it).
void advanceStageIfBehind(
doc.interestId,
portId,
'eoi_signed',
meta,
'Signed EOI uploaded manually',
);
}
}
await db.insert(documentEvents).values({
documentId,
eventType: 'completed',
eventData: { isManualUpload: true, fileId: fileRecord!.id },
});
void createAuditLog({
userId: meta.userId,
portId,
action: 'update',
entityType: 'document',
entityId: documentId,
newValue: { status: 'completed', isManualUpload: true },
ipAddress: meta.ipAddress,
userAgent: meta.userAgent,
});
emitToRoom(`port:${portId}`, 'document:completed', { documentId });
// Notify creator about manual completion
void import('@/lib/services/notifications.service').then(({ createNotification }) =>
createNotification({
portId,
userId: meta.userId,
type: 'document_signed',
title: 'Document marked as signed',
description: `"${doc.title}" has been manually uploaded as signed`,
link: `/documents/${documentId}`,
entityType: 'document',
entityId: documentId,
dedupeKey: `document:${documentId}:completed`,
}),
);
return await getDocumentById(documentId, portId);
}
// ─── List Signers ─────────────────────────────────────────────────────────────
/**
 * List a document's signers in signing order.
 * Port access is verified first (throws NotFoundError on mismatch).
 */
export async function listDocumentSigners(documentId: string, portId: string) {
  await getDocumentById(documentId, portId); // verify access
  return db.query.documentSigners.findMany({
    where: eq(documentSigners.documentId, documentId),
    orderBy: (signer, { asc }) => [asc(signer.signingOrder)],
  });
}
// ─── List Events ──────────────────────────────────────────────────────────────
/**
 * List a document's event history, newest first.
 * Port access is verified first (throws NotFoundError on mismatch).
 */
export async function listDocumentEvents(documentId: string, portId: string) {
  await getDocumentById(documentId, portId); // verify access
  return db.query.documentEvents.findMany({
    where: eq(documentEvents.documentId, documentId),
    orderBy: (event, { desc }) => [desc(event.createdAt)],
  });
}
// ─── Webhook Handlers ─────────────────────────────────────────────────────────
/**
 * Shared port-scoped lookup for inbound Documenso webhooks.
 *
 * Two ports sharing a Documenso instance — or migrating between instances
 * with documentId reuse — would otherwise let `findFirst` return whichever
 * row sorts first across tenants. When the route resolves a portId from the
 * matched per-port webhook secret it threads it here; otherwise we fall
 * back to a port-agnostic `findMany` and refuse to mutate when the lookup
 * is ambiguous (mirrors the guard in handleDocumentExpired).
 *
 * Returns null when no document matches OR when the lookup is ambiguous
 * across multiple ports without a resolved portId. Callers must treat null
 * as "drop the event" (the cron sweep / next webhook will catch up once the
 * data is consistent).
 */
async function resolveWebhookDocument(
  documensoId: string,
  portId: string | undefined,
): Promise<typeof documents.$inferSelect | null> {
  if (portId) {
    // Port resolved from the matched webhook secret: exact (documensoId, portId) lookup.
    const scoped = await db.query.documents.findFirst({
      where: and(eq(documents.documensoId, documensoId), eq(documents.portId, portId)),
    });
    if (scoped) return scoped;
    logger.warn({ documensoId, portId }, 'Document not found for webhook (port-scoped)');
    return null;
  }
  // No resolved port: only a globally-unique match is safe to act on.
  const candidates = await db.query.documents.findMany({
    where: eq(documents.documensoId, documensoId),
  });
  switch (candidates.length) {
    case 0:
      logger.warn({ documensoId }, 'Document not found for webhook');
      return null;
    case 1:
      return candidates[0]!;
    default:
      logger.error(
        { documensoId, matchCount: candidates.length, ports: candidates.map((c) => c.portId) },
        'Documenso webhook ambiguous across multiple ports — refusing to mutate',
      );
      return null;
  }
}
/**
 * Documenso RECIPIENT_SIGNED webhook: mark the matching local signer row
 * signed, move an EOI document from 'sent' to 'partially_signed', record a
 * 'signed' document event and broadcast to the port room.
 *
 * Drops the event silently when the document cannot be resolved (unknown
 * documensoId, or ambiguous across tenants without a resolved portId) —
 * see resolveWebhookDocument.
 */
export async function handleRecipientSigned(eventData: {
documentId: string;
recipientEmail: string;
signatureHash?: string;
portId?: string;
}) {
const doc = await resolveWebhookDocument(eventData.documentId, eventData.portId);
if (!doc) return;
// Update signer status
const [signer] = await db
.update(documentSigners)
.set({ status: 'signed', signedAt: new Date() })
.where(
and(
eq(documentSigners.documentId, doc.id),
eq(documentSigners.signerEmail, eventData.recipientEmail),
),
)
.returning();
if (!signer) {
// Email mismatch: the address Documenso has on the recipient doesn't match
// any row in documentSigners. This happens when the local signers were
// created with fabricated / stale addresses. Log a warning so operators can
// investigate and fix the port's eoi_signers system setting.
logger.warn(
{
documensoId: eventData.documentId,
documentId: doc.id,
recipientEmail: eventData.recipientEmail,
},
'handleRecipientSigned: no matching signer row for recipient email - ' +
'check eoi_signers system setting for this port',
);
}
// Update document to partially_signed if eoi type
// ((id, portId) WHERE scope for defense-in-depth).
if (doc.documentType === 'eoi' && doc.status === 'sent') {
await db
.update(documents)
.set({ status: 'partially_signed', updatedAt: new Date() })
.where(and(eq(documents.id, doc.id), eq(documents.portId, doc.portId)));
}
// The event is recorded even on an email mismatch (signerId stays null).
await db.insert(documentEvents).values({
documentId: doc.id,
eventType: 'signed',
signerId: signer?.id ?? null,
signatureHash: eventData.signatureHash ?? null,
eventData: { recipientEmail: eventData.recipientEmail },
});
emitToRoom(`port:${doc.portId}`, 'document:signer:signed', {
documentId: doc.id,
signerEmail: eventData.recipientEmail,
});
}
export async function handleDocumentCompleted(eventData: { documentId: string; portId?: string }) {
const doc = await resolveWebhookDocument(eventData.documentId, eventData.portId);
if (!doc) return;
// BR-022: Download signed PDF and store in MinIO
const port = await db.query.ports.findFirst({ where: eq(ports.id, doc.portId) });
if (!port) {
logger.error({ portId: doc.portId }, 'Port not found during document completion');
return;
}
try {
const signedPdfBuffer = await downloadSignedPdf(eventData.documentId);
const fileId = crypto.randomUUID();
const storagePath = buildStoragePath(port.slug, 'eoi-signed', doc.id, fileId, 'pdf');
await (
await getStorageBackend()
).put(storagePath, signedPdfBuffer, {
contentType: 'application/pdf',
sizeBytes: signedPdfBuffer.length,
});
const [fileRecord] = await db
.insert(files)
.values({
portId: doc.portId,
clientId: doc.clientId ?? null,
filename: `signed-${doc.id}.pdf`,
originalName: `signed-${doc.id}.pdf`,
mimeType: 'application/pdf',
sizeBytes: String(signedPdfBuffer.length),
storagePath,
storageBucket: env.MINIO_BUCKET,
category: 'eoi',
uploadedBy: 'system',
})
.returning();
await db
.update(documents)
.set({ status: 'completed', signedFileId: fileRecord!.id, updatedAt: new Date() })
.where(eq(documents.id, doc.id));
// Reservation agreements mirror their signed PDF onto
// berth_reservations.contractFileId so the portal "My Reservations" view
// can resolve the contract without joining through documents.
if (doc.documentType === 'reservation_agreement' && doc.reservationId) {
const { berthReservations } = await import('@/lib/db/schema/reservations');
await db
.update(berthReservations)
.set({ contractFileId: fileRecord!.id, updatedAt: new Date() })
.where(eq(berthReservations.id, doc.reservationId));
}
} catch (err) {
logger.error({ err, documentId: doc.id }, 'Failed to download/store signed PDF');
await db
.update(documents)
.set({ status: 'completed', updatedAt: new Date() })
.where(eq(documents.id, doc.id));
}
// Update interest if eoi type
if (doc.interestId && doc.documentType === 'eoi') {
const interest = await db.query.interests.findFirst({
where: eq(interests.id, doc.interestId),
});
await db
.update(interests)
.set({ eoiStatus: 'signed', dateEoiSigned: new Date(), updatedAt: new Date() })
.where(eq(interests.id, doc.interestId));
if (interest) {
const systemMeta: AuditMeta = {
userId: 'system',
portId: doc.portId,
ipAddress: '0.0.0.0',
userAgent: 'webhook',
};
// Guard against double-fire: DOCUMENT_COMPLETED may arrive multiple times
// (webhook retries) or follow a DOCUMENT_SIGNED that already advanced the
// stage. advanceStageIfBehind handles the pipeline guard internally, but
// evaluateRule has no idempotency - skip it if the interest is already at
// eoi_signed or beyond to prevent duplicate berth-rule side effects.
const currentStageIdx = PIPELINE_STAGES.indexOf(
interest.pipelineStage as (typeof PIPELINE_STAGES)[number],
);
const eoiSignedIdx = PIPELINE_STAGES.indexOf('eoi_signed');
if (currentStageIdx < eoiSignedIdx) {
void evaluateRule('eoi_signed', doc.interestId, doc.portId, systemMeta);
}
// Advance to eoi_signed (no-op if interest already past it).
void advanceStageIfBehind(
doc.interestId,
doc.portId,
'eoi_signed',
systemMeta,
'EOI signed via Documenso',
);
}
}
await db.insert(documentEvents).values({
documentId: doc.id,
eventType: 'completed',
eventData: { documensoId: eventData.documentId },
});
emitToRoom(`port:${doc.portId}`, 'document:completed', { documentId: doc.id });
// Notify the document creator about completion
if (doc.createdBy && doc.createdBy !== 'system') {
void import('@/lib/services/notifications.service').then(({ createNotification }) =>
createNotification({
portId: doc.portId,
userId: doc.createdBy!,
type: 'document_signed',
title: 'Document fully signed',
description: `"${doc.title}" has been signed by all parties`,
link: `/documents/${doc.id}`,
entityType: 'document',
entityId: doc.id,
dedupeKey: `document:${doc.id}:completed`,
}),
);
}
}
/**
 * Documenso webhook: a signing request passed its expiry date.
 * Marks the document - and any linked EOI interest - expired, records a
 * document event, and broadcasts to the port room. Silently returns when
 * the Documenso id cannot be resolved to a single tenant-scoped document.
 */
export async function handleDocumentExpired(eventData: { documentId: string; portId?: string }) {
  const doc = await resolveWebhookDocument(eventData.documentId, eventData.portId);
  if (!doc) return;

  // (id, portId) WHERE clauses keep the writes tenant-scoped.
  const docScope = and(eq(documents.id, doc.id), eq(documents.portId, doc.portId));
  await db.update(documents).set({ status: 'expired', updatedAt: new Date() }).where(docScope);

  if (doc.documentType === 'eoi' && doc.interestId) {
    const interestScope = and(
      eq(interests.id, doc.interestId),
      eq(interests.portId, doc.portId),
    );
    await db
      .update(interests)
      .set({ eoiStatus: 'expired', updatedAt: new Date() })
      .where(interestScope);
  }

  await db.insert(documentEvents).values({
    documentId: doc.id,
    eventType: 'expired',
    eventData: { documensoId: eventData.documentId },
  });
  emitToRoom(`port:${doc.portId}`, 'document:expired', { documentId: doc.id });
}
/**
 * Documenso webhook: a recipient opened the document. Records a 'viewed'
 * document event (linked to the matching signer row when one exists) and
 * broadcasts to the port room. No document status change is made here.
 */
export async function handleDocumentOpened(eventData: {
  documentId: string;
  recipientEmail: string;
  signatureHash?: string;
  portId?: string;
}) {
  const doc = await resolveWebhookDocument(eventData.documentId, eventData.portId);
  if (!doc) return;

  // Best-effort signer match; the view event is still recorded when the
  // recipient email doesn't correspond to a known signer row.
  const matches = await db
    .select()
    .from(documentSigners)
    .where(
      and(
        eq(documentSigners.documentId, doc.id),
        eq(documentSigners.signerEmail, eventData.recipientEmail),
      ),
    );
  const signer = matches[0];

  await db.insert(documentEvents).values({
    documentId: doc.id,
    eventType: 'viewed',
    signerId: signer?.id ?? null,
    signatureHash: eventData.signatureHash ?? null,
    eventData: { recipientEmail: eventData.recipientEmail },
  });
  emitToRoom(`port:${doc.portId}`, 'document:signer:opened', {
    documentId: doc.id,
    signerEmail: eventData.recipientEmail,
  });
}
/**
 * Documenso webhook: a recipient declined to sign. Marks the matching signer
 * declined (when the recipient email is supplied), flags the document - and
 * any linked EOI interest - rejected, records a document event, and
 * broadcasts to the port room.
 */
export async function handleDocumentRejected(eventData: {
  documentId: string;
  recipientEmail?: string;
  signatureHash?: string;
  portId?: string;
}) {
  const doc = await resolveWebhookDocument(eventData.documentId, eventData.portId);
  if (!doc) return;

  // Flip the individual signer first so the event row can reference it.
  let signerId: string | null = null;
  if (eventData.recipientEmail) {
    const declined = await db
      .update(documentSigners)
      .set({ status: 'declined' })
      .where(
        and(
          eq(documentSigners.documentId, doc.id),
          eq(documentSigners.signerEmail, eventData.recipientEmail),
        ),
      )
      .returning();
    signerId = declined[0]?.id ?? null;
  }

  // (id, portId) WHERE clauses keep the writes tenant-scoped.
  const docScope = and(eq(documents.id, doc.id), eq(documents.portId, doc.portId));
  await db.update(documents).set({ status: 'rejected', updatedAt: new Date() }).where(docScope);

  if (doc.documentType === 'eoi' && doc.interestId) {
    const interestScope = and(
      eq(interests.id, doc.interestId),
      eq(interests.portId, doc.portId),
    );
    await db
      .update(interests)
      .set({ eoiStatus: 'rejected', updatedAt: new Date() })
      .where(interestScope);
  }

  await db.insert(documentEvents).values({
    documentId: doc.id,
    eventType: 'rejected',
    signerId,
    signatureHash: eventData.signatureHash ?? null,
    eventData: { recipientEmail: eventData.recipientEmail ?? null },
  });
  emitToRoom(`port:${doc.portId}`, 'document:rejected', {
    documentId: doc.id,
    signerEmail: eventData.recipientEmail ?? null,
  });
}
/**
 * Documenso webhook: the envelope was cancelled remotely. Mirrors the
 * cancellation into the CRM (document plus any linked EOI interest), records
 * a document event, and broadcasts to the port room. The user-initiated
 * counterpart is `cancelDocument`.
 */
export async function handleDocumentCancelled(eventData: {
  documentId: string;
  signatureHash?: string;
  portId?: string;
}) {
  const doc = await resolveWebhookDocument(eventData.documentId, eventData.portId);
  if (!doc) return;

  // (id, portId) WHERE clauses keep the writes tenant-scoped.
  const docScope = and(eq(documents.id, doc.id), eq(documents.portId, doc.portId));
  await db.update(documents).set({ status: 'cancelled', updatedAt: new Date() }).where(docScope);

  if (doc.documentType === 'eoi' && doc.interestId) {
    const interestScope = and(
      eq(interests.id, doc.interestId),
      eq(interests.portId, doc.portId),
    );
    await db
      .update(interests)
      .set({ eoiStatus: 'cancelled', updatedAt: new Date() })
      .where(interestScope);
  }

  await db.insert(documentEvents).values({
    documentId: doc.id,
    eventType: 'cancelled',
    signatureHash: eventData.signatureHash ?? null,
    eventData: { documensoId: eventData.documentId },
  });
  emitToRoom(`port:${doc.portId}`, 'document:cancelled', { documentId: doc.id });
}
// ─── Phase A: hub + wizard surface (PR1 skeletons; bodies land in PRs 4-6) ────
/** One entry in a document's watcher list, as surfaced on the detail page. */
export interface DocumentDetailWatcher {
  /** Id of the watching user. */
  userId: string;
  /** Id of the user who created this watcher entry. */
  addedBy: string;
  /** When the watcher entry was created. */
  addedAt: Date;
}
/** Aggregate payload returned by `getDocumentDetail` for the detail page. */
export interface DocumentDetail {
  /** The document row itself. */
  document: typeof documents.$inferSelect;
  /** Signer rows, ordered by signing order (see getDocumentDetail). */
  signers: (typeof documentSigners.$inferSelect)[];
  /** Document events, newest first (see getDocumentDetail). */
  events: (typeof documentEvents.$inferSelect)[];
  /** Watchers subscribed to this document. */
  watchers: DocumentDetailWatcher[];
}
/**
 * Single-roundtrip aggregator for the document detail page (PR5): the
 * document row plus its signers (ascending signing order), events (newest
 * first), and watcher list, fetched concurrently.
 * Throws NotFoundError if the document is not in `portId`.
 */
export async function getDocumentDetail(id: string, portId: string): Promise<DocumentDetail> {
  // Port-scope gate first; everything below is keyed only by document id.
  const document = await getDocumentById(id, portId);

  const signersQuery = db.query.documentSigners.findMany({
    where: eq(documentSigners.documentId, id),
    orderBy: (ds, { asc }) => [asc(ds.signingOrder)],
  });
  const eventsQuery = db.query.documentEvents.findMany({
    where: eq(documentEvents.documentId, id),
    orderBy: (de, { desc }) => [desc(de.createdAt)],
  });
  const watchersQuery = db
    .select({
      userId: documentWatchers.userId,
      addedBy: documentWatchers.addedBy,
      addedAt: documentWatchers.addedAt,
    })
    .from(documentWatchers)
    .where(eq(documentWatchers.documentId, id));

  // The three child queries are independent, so run them in parallel.
  const [signers, events, watchers] = await Promise.all([
    signersQuery,
    eventsQuery,
    watchersQuery,
  ]);
  return { document, signers, events, watchers };
}
/**
* User-initiated cancel of an in-flight document. Voids the doc in Documenso
* (when present), updates DB status, logs an event, emits socket. Webhook
* receiver also handles documenso-initiated cancellations via
* `handleDocumentCancelled`.
*
* The actual Documenso void call lands in PR2 (`documenso-client.voidDocument`);
* this skeleton updates DB state only.
*/
export async function cancelDocument(
documentId: string,
portId: string,
meta: AuditMeta,
): Promise<typeof documents.$inferSelect> {
const existing = await getDocumentById(documentId, portId);
if (['completed', 'cancelled', 'rejected'].includes(existing.status)) {
throw new ConflictError(`Document is already ${existing.status}`);
}
// CRM is the system of record for cancellation status. A transient
// Documenso failure shouldn't block the user from marking the doc cancelled
// here - voidDocument already treats 404 as success, and the periodic
// webhook receiver will reconcile if the remote void eventually lands.
if (existing.documensoId) {
try {
await documensoVoid(existing.documensoId, portId);
} catch (err) {
logger.warn(
{ err, documentId, documensoId: existing.documensoId },
'Documenso void failed; cancelling locally anyway',
);
}
}
const [updated] = await db
.update(documents)
.set({ status: 'cancelled', updatedAt: new Date() })
.where(and(eq(documents.id, documentId), eq(documents.portId, portId)))
.returning();
await db.insert(documentEvents).values({
documentId,
eventType: 'cancelled',
eventData: { initiatedBy: meta.userId },
});
void createAuditLog({
userId: meta.userId,
portId,
action: 'update',
entityType: 'document',
entityId: documentId,
oldValue: { status: existing.status },
newValue: { status: 'cancelled' },
ipAddress: meta.ipAddress,
userAgent: meta.userAgent,
});
emitToRoom(`port:${portId}`, 'document:cancelled', { documentId });
return updated!;
}
/**
* Returns prefilled email composer payload for the "Email signed PDF to all
* signatories" action on the document detail page.
*
* Available for `status='completed' && signedFileId !== null`.
*
* Body content (from per-port `signed_doc_completion` template), full
* sender-resolution, and watcher-cc helpers land alongside PR8 (email
* composer with attachments). For PR1 this returns the minimal correct
* recipients + auto-attachment shape so detail-page integration tests can
* assert against it.
*/
/** Prefilled composer payload produced by `composeSignedDocEmail`. */
export interface ComposeSignedDocEmailResult {
  /** Deduplicated signer email addresses. */
  to: string[];
  /** CC list - empty for PR1; watcher-cc helpers land with PR8. */
  cc: string[];
  /** Prefilled subject line derived from document type and title. */
  subject: string;
  /** Prefilled body - empty until per-port template support lands (PR8). */
  body: string;
  /** Auto-attached signed PDF reference(s). */
  attachments: Array<{ fileId: string; filename?: string }>;
  /** Which sender identity the composer should default to. */
  defaultSenderType: 'system' | 'user';
}
/**
 * Builds the prefilled "Email signed PDF to all signatories" composer
 * payload for a completed document.
 * @throws ConflictError when the document is not completed.
 * @throws ValidationError when no signed PDF is attached.
 */
export async function composeSignedDocEmail(
  documentId: string,
  portId: string,
): Promise<ComposeSignedDocEmailResult> {
  const doc = await getDocumentById(documentId, portId);
  if (doc.status !== 'completed') {
    throw new ConflictError('Document is not completed');
  }
  if (!doc.signedFileId) {
    throw new ValidationError('Document has no signed PDF');
  }
  const signerRows = await db
    .select({ email: documentSigners.signerEmail })
    .from(documentSigners)
    .where(eq(documentSigners.documentId, documentId));
  // De-dupe while preserving first-seen order.
  const to = [...new Set(signerRows.map((row) => row.email))];
  const readableType = doc.documentType.replace(/_/g, ' ');
  return {
    to,
    cc: [],
    subject: `Signed ${readableType} - ${doc.title}`,
    body: '',
    attachments: [{ fileId: doc.signedFileId }],
    defaultSenderType: 'system',
  };
}
// ─── Watchers (PR5) ───────────────────────────────────────────────────────────
/**
 * All watcher entries on a document. Port membership is enforced up front
 * via getDocumentById, which throws NotFoundError for cross-tenant ids.
 */
export async function listDocumentWatchers(documentId: string, portId: string) {
  await getDocumentById(documentId, portId); // port-scope gate
  const projection = {
    userId: documentWatchers.userId,
    addedBy: documentWatchers.addedBy,
    addedAt: documentWatchers.addedAt,
  };
  return db
    .select(projection)
    .from(documentWatchers)
    .where(eq(documentWatchers.documentId, documentId));
}
export async function addDocumentWatcher(
documentId: string,
portId: string,
userId: string,
meta: AuditMeta,
): Promise<{ userId: string; addedAt: Date }> {
await getDocumentById(documentId, portId);
await assertWatchersInPort(portId, [userId]);
const [row] = await db
.insert(documentWatchers)
.values({ documentId, userId, addedBy: meta.userId })
.onConflictDoNothing()
.returning();
void createAuditLog({
userId: meta.userId,
portId,
action: 'create',
entityType: 'document_watcher',
entityId: documentId,
newValue: { documentId, watcherUserId: userId },
ipAddress: meta.ipAddress,
userAgent: meta.userAgent,
});
emitToRoom(`port:${portId}`, 'document:updated', { documentId });
return row ? { userId: row.userId, addedAt: row.addedAt } : { userId, addedAt: new Date() };
}
/**
 * Unsubscribes `userId` from a document's watcher list. Deleting a
 * non-existent watcher row is a silent no-op (the audit entry and socket
 * emit still fire).
 */
export async function removeDocumentWatcher(
  documentId: string,
  portId: string,
  userId: string,
  meta: AuditMeta,
): Promise<void> {
  await getDocumentById(documentId, portId); // port-scope gate
  const target = and(
    eq(documentWatchers.documentId, documentId),
    eq(documentWatchers.userId, userId),
  );
  await db.delete(documentWatchers).where(target);
  // Fire-and-forget audit, matching the rest of this service.
  void createAuditLog({
    userId: meta.userId,
    portId,
    action: 'delete',
    entityType: 'document_watcher',
    entityId: documentId,
    oldValue: { documentId, watcherUserId: userId },
    ipAddress: meta.ipAddress,
    userAgent: meta.userAgent,
  });
  emitToRoom(`port:${portId}`, 'document:updated', { documentId });
}
/**
* Create-document wizard entry point (PR6).
*
* Dispatches across pathways:
* - 'documenso-template' - Documenso renders + signs from its own template
* - 'inapp' - render PDF locally from a CRM template, upload to Documenso
* - 'upload' - admin-supplied PDF, upload to Documenso (auto-place signature
* fields if `autoPlaceFields`)
*
* Persists the document, applies reminder overrides, attaches watchers, and
* triggers send when `sendImmediately`.
*/
import type { CreateDocumentWizardInput } from '@/lib/validators/documents';
/**
 * Create-document wizard entry point (PR6). Delegates upload-sourced
 * requests to `createFromUpload`; template-sourced requests persist the
 * document row, attach watchers, audit, and emit. Signer rows / reminder
 * bodies land in later PRs per the pathway notes above.
 */
export async function createFromWizard(
  portId: string,
  data: CreateDocumentWizardInput,
  meta: AuditMeta,
): Promise<typeof documents.$inferSelect> {
  // Upload pathway has its own persistence flow (file FK + signer rows).
  if (data.source === 'upload') {
    return createFromUpload(portId, data, meta);
  }
  if (!data.templateId) {
    throw new ValidationError('templateId is required for template source');
  }
  // Every subject FK must belong to this port before we link it.
  const { clientId, interestId, companyId, yachtId, reservationId } = data;
  await assertSubjectFksInPort(portId, {
    clientId,
    interestId,
    companyId,
    yachtId,
    reservationId,
  });
  const [doc] = await db
    .insert(documents)
    .values({
      portId,
      interestId: interestId ?? null,
      reservationId: reservationId ?? null,
      clientId: clientId ?? null,
      companyId: companyId ?? null,
      yachtId: yachtId ?? null,
      documentType: data.documentType,
      title: data.title,
      notes: data.notes ?? null,
      status: 'draft',
      remindersDisabled: data.remindersDisabled,
      reminderCadenceOverride: data.reminderCadenceOverride ?? null,
      createdBy: meta.userId,
    })
    .returning();
  if (!doc) {
    throw new CodedError('INSERT_RETURNING_EMPTY', {
      internalMessage: 'Failed to insert document',
    });
  }
  if (data.watchers.length > 0) {
    await assertWatchersInPort(portId, data.watchers);
    const watcherRows = data.watchers.map((userId) => ({
      documentId: doc.id,
      userId,
      addedBy: meta.userId,
    }));
    await db.insert(documentWatchers).values(watcherRows);
  }
  // Fire-and-forget audit, matching the rest of this service.
  void createAuditLog({
    userId: meta.userId,
    portId,
    action: 'create',
    entityType: 'document',
    entityId: doc.id,
    newValue: {
      documentType: doc.documentType,
      title: doc.title,
      pathway: data.pathway,
      source: data.source,
    },
    ipAddress: meta.ipAddress,
    userAgent: meta.userAgent,
  });
  emitToRoom(`port:${portId}`, 'document:created', { documentId: doc.id });
  return doc;
}
/**
 * Upload-driven creation path. Files-service integration + Documenso upload
 * + auto-place signature fields land alongside the realapi PR (PR11); for
 * PR6 this persists the document row, signer rows, and watchers, leaving the
 * actual Documenso upload to the existing sendForSigning flow on first send.
 */
export async function createFromUpload(
  portId: string,
  data: CreateDocumentWizardInput,
  meta: AuditMeta,
): Promise<typeof documents.$inferSelect> {
  // Both preconditions are hard requirements of the upload pathway.
  if (!data.uploadedFileId) {
    throw new ValidationError('uploadedFileId is required for upload source');
  }
  if (!data.signers || data.signers.length === 0) {
    throw new ValidationError('signers are required for upload source');
  }
  // The uploaded file must already exist inside this tenant.
  const fileRecord = await db.query.files.findFirst({
    where: and(eq(files.id, data.uploadedFileId), eq(files.portId, portId)),
  });
  if (!fileRecord) {
    throw new NotFoundError('File');
  }
  // Every subject FK must belong to this port before we link it.
  const { clientId, interestId, companyId, yachtId, reservationId } = data;
  await assertSubjectFksInPort(portId, {
    clientId,
    interestId,
    companyId,
    yachtId,
    reservationId,
  });
  const [doc] = await db
    .insert(documents)
    .values({
      portId,
      interestId: interestId ?? null,
      reservationId: reservationId ?? null,
      clientId: clientId ?? null,
      companyId: companyId ?? null,
      yachtId: yachtId ?? null,
      documentType: data.documentType,
      title: data.title,
      notes: data.notes ?? null,
      status: 'draft',
      fileId: fileRecord.id,
      remindersDisabled: data.remindersDisabled,
      reminderCadenceOverride: data.reminderCadenceOverride ?? null,
      createdBy: meta.userId,
    })
    .returning();
  if (!doc) {
    throw new CodedError('INSERT_RETURNING_EMPTY', {
      internalMessage: 'Failed to insert document',
    });
  }
  const signerRows = data.signers.map((s) => ({
    documentId: doc.id,
    signerName: s.signerName,
    signerEmail: s.signerEmail,
    signerRole: s.signerRole,
    signingOrder: s.signingOrder,
    status: 'pending' as const,
  }));
  await db.insert(documentSigners).values(signerRows);
  if (data.watchers.length > 0) {
    await assertWatchersInPort(portId, data.watchers);
    const watcherRows = data.watchers.map((userId) => ({
      documentId: doc.id,
      userId,
      addedBy: meta.userId,
    }));
    await db.insert(documentWatchers).values(watcherRows);
  }
  // Fire-and-forget audit, matching the rest of this service.
  void createAuditLog({
    userId: meta.userId,
    portId,
    action: 'create',
    entityType: 'document',
    entityId: doc.id,
    newValue: {
      documentType: doc.documentType,
      title: doc.title,
      pathway: 'upload',
      source: 'upload',
      uploadedFileId: fileRecord.id,
    },
    ipAddress: meta.ipAddress,
    userAgent: meta.userAgent,
  });
  emitToRoom(`port:${portId}`, 'document:created', { documentId: doc.id });
  return doc;
}