import { and, desc, eq, gte, inArray, isNull, lt, lte, ne, sql, exists } from 'drizzle-orm'; import { db } from '@/lib/db'; import { documents, documentSigners, documentEvents, documentWatchers, files, } from '@/lib/db/schema/documents'; import { interests, interestBerths } from '@/lib/db/schema/interests'; import { clients } from '@/lib/db/schema/clients'; import { companies } from '@/lib/db/schema/companies'; import { yachts } from '@/lib/db/schema/yachts'; import { berthReservations } from '@/lib/db/schema/reservations'; import { ports } from '@/lib/db/schema/ports'; import { userProfiles, userPortRoles } from '@/lib/db/schema/users'; import { buildListQuery } from '@/lib/db/query-builder'; import { createAuditLog, toAuditJson, type AuditMeta } from '@/lib/audit'; import { diffEntity } from '@/lib/entity-diff'; import { CodedError, NotFoundError, ValidationError, ConflictError } from '@/lib/errors'; import { emitToRoom } from '@/lib/socket/server'; import { buildStoragePath } from '@/lib/minio'; import { getStorageBackend } from '@/lib/storage'; import { env } from '@/lib/env'; import { logger } from '@/lib/logger'; import { evaluateRule } from '@/lib/services/berth-rules-engine'; import { PIPELINE_STAGES } from '@/lib/constants'; import { advanceStageIfBehind } from '@/lib/services/interests.service'; import { createDocument as documensoCreate, sendDocument as documensoSend, downloadSignedPdf, voidDocument as documensoVoid, } from '@/lib/services/documenso-client'; import { getPortEoiSigners } from '@/lib/services/documenso-payload'; import { getPortDocumensoConfig } from '@/lib/services/port-config'; import { DOC_TYPE_LABEL, extractSigningToken, nextPendingSigner, } from '@/lib/services/documenso-signers'; import { sendSigningInvitation, sendSigningCompleted, type SignerRole, } from '@/lib/services/document-signing-emails.service'; import { listTree, collectDescendantIds, ensureEntityFolder, type FolderNode, type EntityType, } from '@/lib/services/document-folders.service'; import { assertEntityInPort, collectRelatedEntities } from '@/lib/services/files'; import type { CreateDocumentInput, UpdateDocumentInput, ListDocumentsInput, } from '@/lib/validators/documents'; // ─── Types ──────────────────────────────────────────────────────────────────── // ─── List ───────────────────────────────────────────────────────────────────── import { documentWatchers as documentWatchersTable } from '@/lib/db/schema/documents'; const NON_SIGNATURE_TYPES = [ 'welcome_letter', 'handover_checklist', 'acknowledgment', 'correspondence', ]; function buildHubTabFilters( tab: ListDocumentsInput['tab'], currentUserEmail: string | undefined, ): ReturnType[] { const filters: ReturnType[] = []; if (!tab || tab === 'all') return filters; switch (tab) { case 'in_progress': // All document types currently in-flight — the everyday "what's in flight" view. filters.push( inArray(documents.status, ['draft', 'sent', 'partially_signed']), sql`${documents.status} != 'expired'`, ); break; case 'eoi_queue': // EOI documents currently in-flight (drafted, sent, or partially signed). // Used by the dedicated tab on the documents hub to triage EOI signing // pipeline volume separate from the all-doc-types view. filters.push(eq(documents.documentType, 'eoi')); filters.push(inArray(documents.status, ['draft', 'sent', 'partially_signed'])); break; case 'awaiting_them': // "awaiting them" = pending signers other than the current user. // Without a known caller email we cannot make that distinction, so // short-circuit to empty rather than silently widen the result set. if (!currentUserEmail) { filters.push(sql`1 = 0`); break; } filters.push(inArray(documents.status, ['sent', 'partially_signed'])); filters.push( exists( db .select({ x: sql`1` }) .from(documentSigners) .where( and( eq(documentSigners.documentId, documents.id), eq(documentSigners.status, 'pending'), ne(documentSigners.signerEmail, currentUserEmail), ), ), ), ); break; case 'awaiting_me': if (!currentUserEmail) { // Without a current-user email there is no concept of "awaiting me" filters.push(sql`1 = 0`); break; } filters.push( exists( db .select({ x: sql`1` }) .from(documentSigners) .where( and( eq(documentSigners.documentId, documents.id), eq(documentSigners.status, 'pending'), eq(documentSigners.signerEmail, currentUserEmail), ), ), ), ); break; case 'completed': filters.push(inArray(documents.status, ['completed', 'signed'])); break; case 'expired': // Either explicitly expired, or in-flight past their expiry date. // (Documents schema doesn't yet have an `expires_at` column, so for // now this is just status='expired' - extend when expiry lands.) filters.push(eq(documents.status, 'expired')); break; } return filters; } export interface ListDocumentsExtra { /** Email of the calling user - used by hub tab filtering for "awaiting me". */ currentUserEmail?: string; } export async function listDocuments( portId: string, query: ListDocumentsInput, extra: ListDocumentsExtra = {}, ) { const { page, limit, sort, order, search, interestId, clientId, documentType, status, tab, watcherUserId, signatureOnly, sentSince, sentUntil, } = query; const filters: ReturnType[] = []; if (interestId) filters.push(eq(documents.interestId, interestId)); if (clientId) filters.push(eq(documents.clientId, clientId)); if (documentType) filters.push(eq(documents.documentType, documentType)); if (status) filters.push(eq(documents.status, status)); if (query.folderId !== undefined) { if (query.folderId === null) { filters.push(isNull(documents.folderId)); } else if (query.includeDescendants) { const tree = await listTree(portId); const descendantIds = collectDescendantIds(tree, query.folderId); filters.push(inArray(documents.folderId, [query.folderId, ...descendantIds])); } else { filters.push(eq(documents.folderId, query.folderId)); } // When viewing a specific folder, hide completed workflows — they surface // via their resulting signed-PDF file row in the Files section, not the // Signing section. filters.push(ne(documents.status, 'completed')); } if (sentSince) filters.push(gte(documents.createdAt, new Date(sentSince))); if (sentUntil) filters.push(lte(documents.createdAt, new Date(sentUntil))); if (signatureOnly === true) { filters.push( sql`${documents.documentType} NOT IN ('welcome_letter','handover_checklist','acknowledgment','correspondence')`, ); } else if (signatureOnly === false) { // Pass-through, no extra filter needed. } if (watcherUserId) { filters.push( exists( db .select({ x: sql`1` }) .from(documentWatchersTable) .where( and( eq(documentWatchersTable.documentId, documents.id), eq(documentWatchersTable.userId, watcherUserId), ), ), ), ); } filters.push(...buildHubTabFilters(tab, extra.currentUserEmail)); void NON_SIGNATURE_TYPES; void lt; const sortColumn = sort === 'title' ? documents.title : sort === 'status' ? documents.status : sort === 'documentType' ? documents.documentType : documents.createdAt; const result = await buildListQuery({ table: documents, portIdColumn: documents.portId, portId, idColumn: documents.id, updatedAtColumn: documents.updatedAt, searchColumns: [documents.title], searchTerm: search, filters: filters.filter(Boolean) as Parameters[0]['filters'], sort: sort ? { column: sortColumn, direction: order } : undefined, page, pageSize: limit, }); const hydrated = await hydrateDocumentsWithDownloadUrl(portId, result.data); return { ...result, data: hydrated }; } // ─── Download URL helpers ───────────────────────────────────────────────────── /** * Resolve the rep-facing download URL for a document. The URL embeds the * folder path + filename for browser-tab / shared-link readability, but the * route handler keys lookup off the doc id and validates the slug for truth * — a hand-edited URL with the wrong path 404s instead of silently serving * the wrong file. * * Pass the resolved folder tree once per request and call this for each doc * so we don't refetch the tree per row. Returns `null` when the document has * no attached file (signature-only docs pre-completion); UI consumers branch * on that to decide whether to render the download affordance. */ export function buildDocumentDownloadUrl( doc: { id: string; folderId: string | null; filename: string | null }, folderTree: readonly FolderNode[], ): string | null { if (!doc.filename) return null; const segments: string[] = []; if (doc.folderId) { const path = findFolderPath(folderTree, doc.folderId); for (const node of path) segments.push(encodeURIComponent(node.name)); } segments.push(encodeURIComponent(doc.filename)); return `/api/v1/documents/${doc.id}/download/${segments.join('/')}`; } /** * Walk the folder tree to materialize the ancestor chain that ends at * `id`. Returns roots-first; empty when the id is missing (orphan * `folder_id` pointer — see listTree's intentional silent drop). */ export function findFolderPath(tree: readonly FolderNode[], id: string): FolderNode[] { for (const node of tree) { if (node.id === id) return [node]; const inChild = findFolderPath(node.children, id); if (inChild.length > 0) return [node, ...inChild]; } return []; } type DocumentRow = typeof documents.$inferSelect; type DocumentRowWithDownload = DocumentRow & { downloadUrl: string | null }; async function hydrateDocumentsWithDownloadUrl( portId: string, rows: DocumentRow[], ): Promise { if (rows.length === 0) return []; const fileIds = Array.from( new Set(rows.map((r) => r.fileId).filter((v): v is string => v !== null)), ); const filenameById = new Map(); if (fileIds.length > 0) { const fileRows = await db .select({ id: files.id, filename: files.filename }) .from(files) .where(and(inArray(files.id, fileIds), eq(files.portId, portId))); for (const f of fileRows) filenameById.set(f.id, f.filename); } const tree = await listTree(portId); return rows.map((row) => { const filename = row.fileId ? (filenameById.get(row.fileId) ?? null) : null; return { ...row, downloadUrl: buildDocumentDownloadUrl({ id: row.id, folderId: row.folderId, filename }, tree), }; }); } // ─── Deal docs for a berth ──────────────────────────────────────────────────── export interface BerthDealDoc { id: string; title: string; documentType: string; status: string; createdAt: Date; interestId: string; } /** * Documents attached to any interest currently linked to this berth via * `interest_berths`. Used by the Deal Documents tab on the berth detail * page so reps can see EOIs / contracts / etc. associated with active * prospects on this slip without leaving the page. * * Read-only; visibility piggybacks on the interest tenancy (the * permission-gate on the berth page guards entry, and we only return * documents for interests in the same port). Edits / sends still happen * from the interest's own page. */ export async function listDealDocumentsForBerth( portId: string, berthId: string, ): Promise { const rows = await db .select({ id: documents.id, title: documents.title, documentType: documents.documentType, status: documents.status, createdAt: documents.createdAt, interestId: documents.interestId, }) .from(documents) .innerJoin(interestBerths, eq(interestBerths.interestId, documents.interestId)) .innerJoin(interests, eq(interests.id, documents.interestId)) .where( and( eq(interestBerths.berthId, berthId), eq(documents.portId, portId), eq(interests.portId, portId), ), ) .orderBy(sql`${documents.createdAt} DESC`); return rows .filter((r): r is typeof r & { interestId: string } => Boolean(r.interestId)) .map((r) => ({ id: r.id, title: r.title, documentType: r.documentType, status: r.status, createdAt: r.createdAt, interestId: r.interestId, })); } // ─── Get by ID ──────────────────────────────────────────────────────────────── export async function getDocumentById(id: string, portId: string) { const doc = await db.query.documents.findFirst({ where: and(eq(documents.id, id), eq(documents.portId, portId)), with: { signers: true }, }); if (!doc) throw new NotFoundError('Document'); let filename: string | null = null; if (doc.fileId) { const fileRow = await db .select({ filename: files.filename }) .from(files) .where(and(eq(files.id, doc.fileId), eq(files.portId, portId))) .limit(1) .then((r) => r[0]); filename = fileRow?.filename ?? null; } const tree = await listTree(portId); const downloadUrl = buildDocumentDownloadUrl( { id: doc.id, folderId: doc.folderId, filename }, tree, ); return { ...doc, downloadUrl }; } /** * Reject any subject FK (clientId / interestId / companyId / yachtId / * reservationId) that points at a row outside the caller's port. Without * this guard, a port-A user could create a document whose subject is a * port-B client and then exfiltrate the foreign client's name + email * via sendForSigning's Documenso payload, or via the local watcher / * notification surfaces that hydrate the linked entity. */ async function assertSubjectFksInPort( portId: string, fks: { clientId?: string | null; interestId?: string | null; companyId?: string | null; yachtId?: string | null; reservationId?: string | null; }, ): Promise { const checks: Array> = []; if (fks.clientId) { checks.push( db.query.clients .findFirst({ where: and(eq(clients.id, fks.clientId), eq(clients.portId, portId)) }) .then((row) => { if (!row) throw new ValidationError('clientId not found in this port'); }), ); } if (fks.interestId) { checks.push( db.query.interests .findFirst({ where: and(eq(interests.id, fks.interestId), eq(interests.portId, portId)), }) .then((row) => { if (!row) throw new ValidationError('interestId not found in this port'); }), ); } if (fks.companyId) { checks.push( db.query.companies .findFirst({ where: and(eq(companies.id, fks.companyId), eq(companies.portId, portId)), }) .then((row) => { if (!row) throw new ValidationError('companyId not found in this port'); }), ); } if (fks.yachtId) { checks.push( db.query.yachts .findFirst({ where: and(eq(yachts.id, fks.yachtId), eq(yachts.portId, portId)) }) .then((row) => { if (!row) throw new ValidationError('yachtId not found in this port'); }), ); } if (fks.reservationId) { checks.push( db.query.berthReservations .findFirst({ where: and( eq(berthReservations.id, fks.reservationId), eq(berthReservations.portId, portId), ), }) .then((row) => { if (!row) throw new ValidationError('reservationId not found in this port'); }), ); } await Promise.all(checks); } /** * Reject watchers whose user does not have access to the document's port. * Without this guard, a document watcher row could be created for a user * outside the document's tenant; subsequent notifyDocumentEvent emits a * socket toast + email to that user revealing the document's title. * Super-admins are always allowed (they can watch anything). */ async function assertWatchersInPort(portId: string, userIds: string[]): Promise { if (userIds.length === 0) return; const unique = [...new Set(userIds)]; // Super-admins bypass the port check. const profiles = await db.query.userProfiles.findMany({ where: inArray(userProfiles.userId, unique), }); const superAdmins = new Set(profiles.filter((p) => p.isSuperAdmin).map((p) => p.userId)); const needsCheck = unique.filter((u) => !superAdmins.has(u)); if (needsCheck.length === 0) return; const roles = await db .select({ userId: userPortRoles.userId }) .from(userPortRoles) .where(and(inArray(userPortRoles.userId, needsCheck), eq(userPortRoles.portId, portId))); const allowed = new Set(roles.map((r) => r.userId)); const denied = needsCheck.filter((u) => !allowed.has(u)); if (denied.length > 0) { throw new ValidationError('One or more watchers do not have access to this port'); } } // ─── Create ─────────────────────────────────────────────────────────────────── export async function createDocument(portId: string, data: CreateDocumentInput, meta: AuditMeta) { await assertSubjectFksInPort(portId, { clientId: data.clientId, interestId: data.interestId, }); const [doc] = await db .insert(documents) .values({ portId, interestId: data.interestId ?? null, clientId: data.clientId ?? null, folderId: data.folderId ?? null, documentType: data.documentType, title: data.title, notes: data.notes ?? null, status: 'draft', createdBy: meta.userId, }) .returning(); void createAuditLog({ userId: meta.userId, portId, action: 'create', entityType: 'document', entityId: doc!.id, newValue: { documentType: doc!.documentType, title: doc!.title }, ipAddress: meta.ipAddress, userAgent: meta.userAgent, }); emitToRoom(`port:${portId}`, 'document:created', { documentId: doc!.id }); return doc!; } // ─── Update ─────────────────────────────────────────────────────────────────── export async function updateDocument( id: string, portId: string, data: UpdateDocumentInput, meta: AuditMeta, ) { const existing = await getDocumentById(id, portId); const updates: Partial = {}; if (data.title !== undefined) updates.title = data.title; if (data.notes !== undefined) updates.notes = data.notes; if (data.status !== undefined) updates.status = data.status; updates.updatedAt = new Date(); const [updated] = await db .update(documents) .set(updates) .where(and(eq(documents.id, id), eq(documents.portId, portId))) .returning(); diffEntity(existing, updated!); void createAuditLog({ userId: meta.userId, portId, action: 'update', entityType: 'document', entityId: id, oldValue: toAuditJson(existing), newValue: toAuditJson(updated!), ipAddress: meta.ipAddress, userAgent: meta.userAgent, }); emitToRoom(`port:${portId}`, 'document:updated', { documentId: id }); return updated!; } // ─── Delete ─────────────────────────────────────────────────────────────────── export async function deleteDocument(id: string, portId: string, meta: AuditMeta) { const existing = await getDocumentById(id, portId); if (['sent', 'partially_signed'].includes(existing.status)) { throw new ConflictError('Cannot delete a document that is currently in signing process'); } await db.delete(documents).where(and(eq(documents.id, id), eq(documents.portId, portId))); void createAuditLog({ userId: meta.userId, portId, action: 'delete', entityType: 'document', entityId: id, oldValue: { title: existing.title, status: existing.status }, ipAddress: meta.ipAddress, userAgent: meta.userAgent, }); emitToRoom(`port:${portId}`, 'document:deleted', { documentId: id }); } // ─── Send for Signing (BR-021) ──────────────────────────────────────────────── export async function sendForSigning(documentId: string, portId: string, meta: AuditMeta) { const doc = await getDocumentById(documentId, portId); if (!doc.fileId) throw new ValidationError('Document has no associated file'); if (doc.status !== 'draft') throw new ConflictError('Document is not in draft status'); // Fetch interest + client to build signers. Filter by portId in addition // to the FK so that even if a stale or maliciously-set subject FK on the // document points at a foreign-port row, this signing flow refuses to // hydrate (and therefore refuses to ship to Documenso) data from outside // the caller's tenant. const interest = doc.interestId ? await db.query.interests.findFirst({ where: and(eq(interests.id, doc.interestId), eq(interests.portId, portId)), }) : null; const client = doc.clientId ? await db.query.clients.findFirst({ where: and(eq(clients.id, doc.clientId), eq(clients.portId, portId)), with: { contacts: true }, }) : null; if (!client) throw new ValidationError('Document has no associated client'); const emailContact = ( client.contacts as Array<{ channel: string; value: string }> | undefined )?.find((c) => c.channel === 'email'); if (!emailContact?.value) throw new ValidationError('Client has no email contact'); const port = await db.query.ports.findFirst({ where: eq(ports.id, portId) }); if (!port) throw new NotFoundError('Port'); // Resolve port-configured signer emails from system settings; fall back to // legacy defaults only if the setting is absent. Fabricated slug-based // addresses (developer@{slug}.com) are no longer used here because they // never match real port users and cause silent no-ops in handleRecipientSigned. const eoiSigners = await getPortEoiSigners(portId); // BR-021: Create 3 signers - client (1), developer (2), sales/approver (3) const signerRecords = await db .insert(documentSigners) .values([ { documentId, signerName: client.fullName, signerEmail: emailContact.value, signerRole: 'client', signingOrder: 1, status: 'pending', }, { documentId, signerName: eoiSigners.developer.name, signerEmail: eoiSigners.developer.email, signerRole: 'developer', signingOrder: 2, status: 'pending', }, { documentId, signerName: eoiSigners.approver.name, signerEmail: eoiSigners.approver.email, signerRole: 'approver', signingOrder: 3, status: 'pending', }, ]) .returning(); // Get file from MinIO and base64 encode const fileRecord = await db.query.files.findFirst({ where: eq(files.id, doc.fileId) }); if (!fileRecord) throw new NotFoundError('File'); const fileStream = await (await getStorageBackend()).get(fileRecord.storagePath); const chunks: Buffer[] = []; for await (const chunk of fileStream) { chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk)); } const pdfBuffer = Buffer.concat(chunks); const pdfBase64 = pdfBuffer.toString('base64'); // Read per-port v2 signing settings (PARALLEL/SEQUENTIAL + redirect URL). // Both are optional — passing undefined yields v1's legacy behavior. const docCfg = await getPortDocumensoConfig(portId); // Create document in Documenso + send. portId is required for the v2 // envelope/create code path (which routes by per-port apiVersion); // meta.signingOrder is honoured only on v2 instances. const documensoDoc = await documensoCreate( doc.title, pdfBase64, [ { name: client.fullName, email: emailContact.value, role: 'SIGNER', signingOrder: 1 }, { name: eoiSigners.developer.name, email: eoiSigners.developer.email, role: 'SIGNER', signingOrder: 2, }, { name: eoiSigners.approver.name, email: eoiSigners.approver.email, role: 'SIGNER', signingOrder: 3, }, ], portId, { ...(docCfg.signingOrder ? { signingOrder: docCfg.signingOrder } : {}), ...(docCfg.redirectUrl ? { redirectUrl: docCfg.redirectUrl } : {}), }, ); await documensoSend(documensoDoc.id, portId); // Update signer records with signing URLs + tokens from the Documenso // response. The signingToken column powers the webhook recipient-match // path (more robust than email match — same email can serve multiple // roles on a contract). Documenso's recipient response carries `token` // explicitly per the OpenAPI spec; we keep the URL-extraction fallback // for any v2 deployment whose distribute response omits the field. for (const docSigner of documensoDoc.recipients) { const localSigner = signerRecords.find((s) => s.signerEmail === docSigner.email); if (localSigner) { await db .update(documentSigners) .set({ signingUrl: docSigner.signingUrl ?? null, embeddedUrl: docSigner.embeddedUrl ?? null, signingToken: docSigner.token ?? extractSigningToken(docSigner.signingUrl), }) .where(eq(documentSigners.id, localSigner.id)); } } // Update document status await db .update(documents) .set({ status: 'sent', documensoId: documensoDoc.id, updatedAt: new Date() }) .where(eq(documents.id, documentId)); // Update interest if linked if (interest) { await db .update(interests) .set({ documensoId: documensoDoc.id, dateEoiSent: new Date(), eoiStatus: 'waiting_for_signatures', updatedAt: new Date(), }) .where(eq(interests.id, interest.id)); // Trigger berth rules void evaluateRule('eoi_sent', interest.id, portId, meta); // Advance pipeline stage to eoi (no-op if already further along). // Doc sub-status is set by the webhook receiver when Documenso confirms; // we stamp eoiDocStatus optimistically here so the UI shows "sent". void advanceStageIfBehind(interest.id, portId, 'eoi', meta, 'EOI sent for signing'); await db .update(interests) .set({ eoiDocStatus: 'sent', updatedAt: new Date() }) .where(eq(interests.id, interest.id)); // Reservation agreements drive the reservation stage; the contract // pathway uses its own send call and stamps contractDocStatus. if (doc.documentType === 'reservation_agreement') { void advanceStageIfBehind( interest.id, portId, 'reservation', meta, 'Reservation agreement sent', ); await db .update(interests) .set({ reservationDocStatus: 'sent', updatedAt: new Date() }) .where(eq(interests.id, interest.id)); } } // Create document event await db.insert(documentEvents).values({ documentId, eventType: 'sent', eventData: { documensoId: documensoDoc.id }, }); void createAuditLog({ userId: meta.userId, portId, action: 'update', entityType: 'document', entityId: documentId, newValue: { status: 'sent', documensoId: documensoDoc.id }, ipAddress: meta.ipAddress, userAgent: meta.userAgent, }); emitToRoom(`port:${portId}`, 'document:sent', { documentId, type: doc.documentType, signerCount: 3, documensoId: documensoDoc.id, }); return await getDocumentById(documentId, portId); } // ─── Upload Signed Manually (BR-013) ───────────────────────────────────────── export async function uploadSignedManually( documentId: string, portId: string, fileData: { buffer: Buffer; originalName: string; mimeType: string; size: number }, meta: AuditMeta, ) { const doc = await getDocumentById(documentId, portId); const port = await db.query.ports.findFirst({ where: eq(ports.id, portId) }); if (!port) throw new NotFoundError('Port'); // Store the signed file const fileId = crypto.randomUUID(); const storagePath = buildStoragePath(port.slug, 'eoi-signed', documentId, fileId, 'pdf'); await ( await getStorageBackend() ).put(storagePath, fileData.buffer, { contentType: fileData.mimeType, sizeBytes: fileData.size, }); const [fileRecord] = await db .insert(files) .values({ portId, clientId: doc.clientId ?? null, filename: fileData.originalName, originalName: fileData.originalName, mimeType: fileData.mimeType, sizeBytes: String(fileData.size), storagePath, storageBucket: env.MINIO_BUCKET, category: 'eoi', uploadedBy: meta.userId, }) .returning(); // Update document await db .update(documents) .set({ signedFileId: fileRecord!.id, status: 'completed', isManualUpload: true, updatedAt: new Date(), }) .where(eq(documents.id, documentId)); // Update interest if linked and type is eoi if (doc.interestId && doc.documentType === 'eoi') { const interest = await db.query.interests.findFirst({ where: eq(interests.id, doc.interestId), }); await db .update(interests) .set({ eoiStatus: 'signed', eoiDocStatus: 'signed', dateEoiSigned: new Date(), updatedAt: new Date(), }) .where(eq(interests.id, doc.interestId)); if (interest) { void evaluateRule('eoi_signed', doc.interestId, portId, meta); // Stage stays at 'eoi' — sub-status badge flips to "signed". void advanceStageIfBehind( doc.interestId, portId, 'eoi', meta, 'Signed EOI uploaded manually', ); } } await db.insert(documentEvents).values({ documentId, eventType: 'completed', eventData: { isManualUpload: true, fileId: fileRecord!.id }, }); void createAuditLog({ userId: meta.userId, portId, action: 'update', entityType: 'document', entityId: documentId, newValue: { status: 'completed', isManualUpload: true }, ipAddress: meta.ipAddress, userAgent: meta.userAgent, }); emitToRoom(`port:${portId}`, 'document:completed', { documentId }); // Notify creator about manual completion void import('@/lib/services/notifications.service').then(({ createNotification }) => createNotification({ portId, userId: meta.userId, type: 'document_signed', title: 'Document marked as signed', description: `"${doc.title}" has been manually uploaded as signed`, link: `/documents/${documentId}`, entityType: 'document', entityId: documentId, dedupeKey: `document:${documentId}:completed`, }), ); return await getDocumentById(documentId, portId); } // ─── List Signers ───────────────────────────────────────────────────────────── export async function listDocumentSigners(documentId: string, portId: string) { await getDocumentById(documentId, portId); // verify access return db.query.documentSigners.findMany({ where: eq(documentSigners.documentId, documentId), orderBy: (ds, { asc }) => [asc(ds.signingOrder)], }); } // ─── List Events ────────────────────────────────────────────────────────────── export async function listDocumentEvents(documentId: string, portId: string) { await getDocumentById(documentId, portId); // verify access return db.query.documentEvents.findMany({ where: eq(documentEvents.documentId, documentId), orderBy: (de, { desc }) => [desc(de.createdAt)], }); } // ─── Webhook Handlers ───────────────────────────────────────────────────────── /** * Shared port-scoped lookup for inbound Documenso webhooks. Two ports * sharing a Documenso instance — or migrating between instances with * documentId reuse — would otherwise let `findFirst` return whichever * row sorts first across tenants. When the route resolves a portId from * the matched per-port webhook secret it threads it here; otherwise we * fall back to a port-agnostic `findMany` and refuse to mutate when the * lookup is ambiguous (mirrors the guard in handleDocumentExpired). * * Returns null when no document matches OR when the lookup is ambiguous * across multiple ports without a resolved portId. Callers must treat * null as "drop the event" (the cron sweep / next webhook will catch up * once the data is consistent). */ async function resolveWebhookDocument( documensoId: string, portId: string | undefined, ): Promise { if (portId) { const doc = await db.query.documents.findFirst({ where: and(eq(documents.documensoId, documensoId), eq(documents.portId, portId)), }); if (!doc) { logger.warn({ documensoId, portId }, 'Document not found for webhook (port-scoped)'); return null; } return doc; } const matches = await db.query.documents.findMany({ where: eq(documents.documensoId, documensoId), }); if (matches.length === 0) { logger.warn({ documensoId }, 'Document not found for webhook'); return null; } if (matches.length > 1) { logger.error( { documensoId, matchCount: matches.length, ports: matches.map((m) => m.portId) }, 'Documenso webhook ambiguous across multiple ports — refusing to mutate', ); return null; } return matches[0]!; } export async function handleRecipientSigned(eventData: { documentId: string; recipientEmail: string; /** Optional Documenso recipient token — when supplied (webhook * payload exposes it on v1.13 + 2.x), preferred over the email * match because a single email can serve multiple roles on one * document. Falls back to email match when null. */ recipientToken?: string | null; signatureHash?: string; portId?: string; }) { const doc = await resolveWebhookDocument(eventData.documentId, eventData.portId); if (!doc) return; // Token-match first, fall back to email match. Phase 2: webhook // payloads carry `recipients[].token` which we captured at send-time // via extractSigningToken — that's the authoritative identifier. const signerWhere = eventData.recipientToken ? and( eq(documentSigners.documentId, doc.id), eq(documentSigners.signingToken, eventData.recipientToken), ) : and( eq(documentSigners.documentId, doc.id), eq(documentSigners.signerEmail, eventData.recipientEmail), ); const [signer] = await db .update(documentSigners) .set({ status: 'signed', signedAt: new Date() }) .where(signerWhere) .returning(); if (!signer) { // Mismatch: neither token nor email matched. This happens when the // local signers were created with fabricated / stale addresses or // the document was created out-of-band. Log a warning so operators // can investigate and fix the port's eoi_signers system setting. logger.warn( { documensoId: eventData.documentId, documentId: doc.id, recipientEmail: eventData.recipientEmail, hadToken: Boolean(eventData.recipientToken), }, 'handleRecipientSigned: no matching signer row for recipient - ' + 'check eoi_signers system setting for this port', ); } // Update document to partially_signed if eoi type if (doc.documentType === 'eoi' && doc.status === 'sent') { await db .update(documents) .set({ status: 'partially_signed', updatedAt: new Date() }) .where(and(eq(documents.id, doc.id), eq(documents.portId, doc.portId))); } await db.insert(documentEvents).values({ documentId: doc.id, eventType: 'signed', signerId: signer?.id ?? null, signatureHash: eventData.signatureHash ?? null, eventData: { recipientEmail: eventData.recipientEmail }, }); emitToRoom(`port:${doc.portId}`, 'document:signer:signed', { documentId: doc.id, signerEmail: eventData.recipientEmail, }); // Phase 2 cascade: now that this signer is done, fire the branded // "your turn" invitation to the next pending signer in signing order. // The webhook may fire multiple times per document (one per recipient // sign event); the `invitedAt` guard prevents duplicate invites. if (signer) { await sendCascadingInviteForNextSigner(doc).catch((err) => { // Cascading-invite failure is non-fatal — the webhook itself // succeeded. The rep can manually click "Send invitation" if the // email worker is down. logger.error( { err, documentId: doc.id, justSignedSigner: signer.id }, 'cascading "your turn" invite failed after recipient signed', ); }); } } /** * Phase 2 — cascading invite logic extracted so the * `handleRecipientSigned` handler stays readable and so the same path * can be exercised by a dedicated unit test. Finds the next pending * signer (lowest signing order), sends them a branded invitation, and * stamps `invitedAt` so a duplicate webhook delivery doesn't re-send. * * Phase 6: when the document carries a rep-authored * `invitation_message`, it flows through as `customMessage` so every * cascaded recipient (not just the first one) sees the same note. */ async function sendCascadingInviteForNextSigner(doc: { id: string; portId: string; documentType: string; title: string; invitationMessage: string | null; }): Promise { const signers = await db .select() .from(documentSigners) .where(eq(documentSigners.documentId, doc.id)) .orderBy(documentSigners.signingOrder); const next = nextPendingSigner(signers); if (!next) return; if (next.invitedAt) { // We've already invited them — either via the auto-send wiring at // document creation (first signer) or via an earlier cascade. Do // nothing rather than spam them with a second copy. return; } if (!next.signingUrl) { logger.warn( { documentId: doc.id, signerId: next.id }, 'cascading invite skipped: signer has no signing URL', ); return; } const port = await db.query.ports.findFirst({ where: eq(ports.id, doc.portId) }); const docCfg = await getPortDocumensoConfig(doc.portId); await sendSigningInvitation({ portId: doc.portId, portName: port?.name ?? 'Port Nimara', recipient: { name: next.signerName, email: next.signerEmail }, documensoSigningUrl: next.signingUrl, documentLabel: DOC_TYPE_LABEL[doc.documentType] ?? 'Expression of Interest', signerRole: (next.signerRole as SignerRole) ?? 'other', senderName: docCfg.developerName ?? null, customMessage: doc.invitationMessage, }); await db .update(documentSigners) .set({ invitedAt: new Date() }) .where(eq(documentSigners.id, next.id)); // Phase 7 — Project Director RBAC binding: when the per-port settings // map the developer / approver slot to a CRM user (developerUserId / // approverUserId), fire an in-CRM notification so the user sees their // pending signing turn alongside the branded email. The email is the // primary channel; the notification is a defense-in-depth nudge for // users who live in the CRM all day. Falls back silently when the // settings aren't wired or the signer role doesn't match. const linkedUserId = next.signerRole === 'developer' ? (docCfg.developerUserId ?? null) : next.signerRole === 'approver' ? (docCfg.approverUserId ?? null) : null; if (linkedUserId) { void import('@/lib/services/notifications.service').then(({ createNotification }) => createNotification({ portId: doc.portId, userId: linkedUserId, type: 'document_signing_your_turn', title: 'Your signature is needed', description: `"${doc.title}" is waiting for you to sign.`, link: `/documents/${doc.id}`, entityType: 'document', entityId: doc.id, dedupeKey: `document:${doc.id}:your-turn:${next.id}`, }).catch((err) => { logger.warn( { err, documentId: doc.id, signerId: next.id, linkedUserId }, 'phase-7 in-CRM your-turn notification failed (email still sent)', ); }), ); } } // ─── Owner-wins resolution ──────────────────────────────────────────────────── interface ResolvedOwner { entityType: EntityType; entityId: string; } /** * Owner-wins owner resolution chain — see spec §"Routing on workflow * completion" §3a. Returns the first non-null candidate in priority * order: direct client/company/yacht FK on the document, then via the * linked interest's client / yacht FK. The interests table has no * companyId (per schema), so the company branch is omitted from the * interest fallback. Returns null when no owner is resolvable. */ async function resolveDocumentOwner( portId: string, doc: { clientId: string | null; companyId: string | null; yachtId: string | null; interestId: string | null; }, ): Promise { if (doc.clientId) return { entityType: 'client', entityId: doc.clientId }; if (doc.companyId) return { entityType: 'company', entityId: doc.companyId }; if (doc.yachtId) return { entityType: 'yacht', entityId: doc.yachtId }; if (doc.interestId) { // interests.clientId is NOT NULL — if the interest row exists, the // client owner is always resolvable through it. The yacht-only path // would require relaxing the schema constraint first. const interest = await db.query.interests.findFirst({ where: and(eq(interests.id, doc.interestId), eq(interests.portId, portId)), columns: { clientId: true }, }); if (interest?.clientId) { return { entityType: 'client', entityId: interest.clientId }; } } return null; } /** * Sentinel thrown by `handleDocumentCompleted`'s in-tx race-check when a * concurrent webhook delivery has already committed the signed-PDF * file. Caught by the outer try so we can run the compensating blob * delete + log at info level instead of error. */ class DocumentAlreadyCompletedError extends Error { constructor() { super('document already marked completed by a concurrent webhook'); this.name = 'DocumentAlreadyCompletedError'; } } export async function handleDocumentCompleted(eventData: { documentId: string; portId?: string }) { const doc = await resolveWebhookDocument(eventData.documentId, eventData.portId); if (!doc) return; // A1: Idempotency gate. Documenso retries DOCUMENT_COMPLETED on receiver // 5xx (and the poll worker also reconciles). Without this guard, a second // delivery re-runs downloadSignedPdf + storage.put + db.insert(files) and // then clobbers the previous signedFileId on the UPDATE — leaking the // first file as an orphan blob with no DB pointer. Once we have a signed // file id we are done. if (doc.status === 'completed' && doc.signedFileId) return; // BR-022: Download signed PDF and store in MinIO const port = await db.query.ports.findFirst({ where: eq(ports.id, doc.portId) }); if (!port) { logger.error({ portId: doc.portId }, 'Port not found during document completion'); return; } // Tracked outside the try so the catch can attempt a compensating // storage.delete if we put a blob but failed to commit the DB rows. // Without this, partial failures left an orphan blob in storage with // no DB pointer (the audit's orphan-blob CRITICAL). let putStoragePath: string | null = null; try { const signedPdfBuffer = await downloadSignedPdf(eventData.documentId); // Guard: a 0-byte response from Documenso would otherwise persist a // permanent corrupt signedFileId pointing at a blob with no content. // Refuse and let the next retry / poll-worker pass re-fetch. if (signedPdfBuffer.length === 0) { throw new CodedError('DOCUMENSO_EMPTY_PDF', { internalMessage: `Documenso returned 0-byte signed PDF for documensoId=${eventData.documentId}`, }); } const fileId = crypto.randomUUID(); const storagePath = buildStoragePath(port.slug, 'eoi-signed', doc.id, fileId, 'pdf'); await ( await getStorageBackend() ).put(storagePath, signedPdfBuffer, { contentType: 'application/pdf', sizeBytes: signedPdfBuffer.length, }); putStoragePath = storagePath; // Resolve owner via the Owner-wins chain. The signed PDF lands in // this owner's auto-created entity subfolder (or at root if no owner). const owner = await resolveDocumentOwner(doc.portId, doc); let entityFolderId: string | null = null; if (owner) { try { const folder = await ensureEntityFolder( doc.portId, owner.entityType, owner.entityId, 'system', ); entityFolderId = folder.id; } catch (err) { // Folder creation is best-effort — signed file still lands at root. // Logged at warn level: missing entity folder is recoverable via // the backfill script. logger.warn( { err, documentId: doc.id, owner }, 'ensureEntityFolder failed during document completion', ); } } // Atomic: the files row + the documents.signedFileId pointer + the // reservation contract mirror commit together. If any throws, the // outer catch fires storage.delete on the orphan blob. // // concurrency-auditor C-1: re-check the idempotency gate INSIDE the // tx with SELECT … FOR UPDATE so two near-simultaneous webhook // retries can't both pass the read-outside-lock gate at line 1110 // and both insert into `files` (the losing row would orphan its blob // since `documents.signed_file_id` only points at one). The outer // catch handles the "we put a blob but a concurrent worker won the // race" cleanup via the existing putStoragePath finalizer. const fileRecord = await db.transaction(async (tx) => { const [locked] = await tx .select({ status: documents.status, signedFileId: documents.signedFileId }) .from(documents) .where(eq(documents.id, doc.id)) .for('update'); if (locked && locked.status === 'completed' && locked.signedFileId) { // Concurrent webhook beat us — abort so the outer catch deletes // the duplicate blob we just put into storage. Throw a sentinel // we recognize so we don't log it as an error. throw new DocumentAlreadyCompletedError(); } const [inserted] = await tx .insert(files) .values({ portId: doc.portId, clientId: owner?.entityType === 'client' ? owner.entityId : (doc.clientId ?? null), companyId: owner?.entityType === 'company' ? owner.entityId : (doc.companyId ?? null), yachtId: owner?.entityType === 'yacht' ? owner.entityId : (doc.yachtId ?? null), folderId: entityFolderId, filename: `signed-${doc.id}.pdf`, originalName: `signed-${doc.id}.pdf`, mimeType: 'application/pdf', sizeBytes: String(signedPdfBuffer.length), storagePath, storageBucket: env.MINIO_BUCKET, category: 'eoi', uploadedBy: 'system', }) .returning(); if (!inserted) { throw new Error('files.insert returned no row'); } await tx .update(documents) .set({ status: 'completed', signedFileId: inserted.id, updatedAt: new Date() }) .where(eq(documents.id, doc.id)); // Reservation agreements mirror their signed PDF onto // berth_reservations.contractFileId so the portal "My Reservations" view // can resolve the contract without joining through documents. if (doc.documentType === 'reservation_agreement' && doc.reservationId) { const { berthReservations } = await import('@/lib/db/schema/reservations'); await tx .update(berthReservations) .set({ contractFileId: inserted.id, updatedAt: new Date() }) .where(eq(berthReservations.id, doc.reservationId)); } return inserted; }); // Mark as durably committed BEFORE side effects so the catch block // doesn't try to undo a blob whose DB pointer just landed. putStoragePath = null; // Audit log: the webhook just minted a new signed-PDF file row owned by // 'system'. Without this entry the file appears in the aggregated view // with no provenance trail; auditors need to see who/what wrote it. void createAuditLog({ userId: 'system', portId: doc.portId, action: 'create', entityType: 'file', entityId: fileRecord.id, newValue: { filename: fileRecord.filename, mimeType: 'application/pdf', size: signedPdfBuffer.length, documentId: doc.id, source: 'documenso_completion', }, ipAddress: '0.0.0.0', userAgent: 'webhook', }); } catch (err) { // Distinguish "we lost the concurrent race" from a real failure — // the loser of the SELECT FOR UPDATE re-check should clean up its // blob silently, not log an error. if (err instanceof DocumentAlreadyCompletedError) { logger.info( { documentId: doc.id, portId: doc.portId }, 'Webhook race lost — another worker already committed the signed PDF; deleting our duplicate blob', ); } else { logger.error( { err, documentId: doc.id, portId: doc.portId }, 'Failed to download/store signed PDF', ); } // Compensating delete: storage.put landed but the DB commit didn't. // Without this the blob lives forever with no row pointing at it. if (putStoragePath) { try { await (await getStorageBackend()).delete(putStoragePath); logger.info( { documentId: doc.id, storagePath: putStoragePath }, 'Compensating storage.delete after failed signed-PDF commit', ); } catch (compErr) { // We tried — log so a human can clean up the orphan if needed. logger.error( { compErr, documentId: doc.id, storagePath: putStoragePath }, 'Compensating storage.delete also failed — manual cleanup required', ); } } // Critical: do NOT set documents.status = 'completed' on failure. // The previous catch block did — which created the "completed-with- // no-signedFileId" zombie state the audit flagged. Let the next // Documenso retry (or our poll-worker reconciliation) re-attempt; // the early-return idempotency gate at the top requires BOTH // status='completed' AND signedFileId so re-runs are safe. } // Update interest if eoi type if (doc.interestId && doc.documentType === 'eoi') { const interest = await db.query.interests.findFirst({ where: eq(interests.id, doc.interestId), }); await db .update(interests) .set({ eoiStatus: 'signed', eoiDocStatus: 'signed', dateEoiSigned: new Date(), updatedAt: new Date(), }) .where(eq(interests.id, doc.interestId)); if (interest) { const systemMeta: AuditMeta = { userId: 'system', portId: doc.portId, ipAddress: '0.0.0.0', userAgent: 'webhook', }; // Guard against double-fire: DOCUMENT_COMPLETED may arrive multiple // times. evaluateRule has no idempotency — skip when the interest is // already past the EOI stage so the berth-rule side effect runs once. const currentStageIdx = PIPELINE_STAGES.indexOf( interest.pipelineStage as (typeof PIPELINE_STAGES)[number], ); const eoiIdx = PIPELINE_STAGES.indexOf('eoi'); if (currentStageIdx <= eoiIdx) { void evaluateRule('eoi_signed', doc.interestId, doc.portId, systemMeta); } // Stage stays at 'eoi' — sub-status flips to signed. void advanceStageIfBehind( doc.interestId, doc.portId, 'eoi', systemMeta, 'EOI signed via Documenso', ); } } // Update interest if reservation_agreement type — kept out of the // signed-PDF try/catch above so a Documenso PDF-download failure doesn't // also lose the sub-status stamp (which the rep can see immediately on // the interest detail page). if (doc.interestId && doc.documentType === 'reservation_agreement') { const systemMeta: AuditMeta = { userId: 'system', portId: doc.portId, ipAddress: '0.0.0.0', userAgent: 'webhook', }; await db .update(interests) .set({ reservationDocStatus: 'signed', dateReservationSigned: new Date(), updatedAt: new Date(), }) .where(eq(interests.id, doc.interestId)); void advanceStageIfBehind( doc.interestId, doc.portId, 'reservation', systemMeta, 'Reservation agreement signed', ); void import('@/lib/services/berth-rules-engine').then(({ evaluateRule }) => evaluateRule('contract_signed', doc.interestId!, doc.portId, systemMeta), ); } // Update interest if contract type. Outcome flip to 'won' is a separate // explicit decision so reps can record a contract as signed without // prematurely closing the deal. if (doc.interestId && doc.documentType === 'contract') { const systemMeta: AuditMeta = { userId: 'system', portId: doc.portId, ipAddress: '0.0.0.0', userAgent: 'webhook', }; await db .update(interests) .set({ contractDocStatus: 'signed', dateContractSigned: new Date(), updatedAt: new Date(), }) .where(eq(interests.id, doc.interestId)); void advanceStageIfBehind( doc.interestId, doc.portId, 'contract', systemMeta, 'Contract signed via Documenso', ); void import('@/lib/services/berth-rules-engine').then(({ evaluateRule }) => evaluateRule('contract_signed', doc.interestId!, doc.portId, systemMeta), ); } await db.insert(documentEvents).values({ documentId: doc.id, eventType: 'completed', eventData: { documensoId: eventData.documentId }, }); emitToRoom(`port:${doc.portId}`, 'document:completed', { documentId: doc.id }); // Phase 2: distribute the fully-signed PDF to every recipient via a // branded "all signed" email. Re-read the document so we see the // signedFileId the transaction above just committed. const completedDoc = await db.query.documents.findFirst({ where: eq(documents.id, doc.id), columns: { signedFileId: true }, }); if (completedDoc?.signedFileId) { const signers = await db .select({ name: documentSigners.signerName, email: documentSigners.signerEmail, }) .from(documentSigners) .where(eq(documentSigners.documentId, doc.id)); if (signers.length > 0) { const portRow = await db.query.ports.findFirst({ where: eq(ports.id, doc.portId), columns: { name: true }, }); // Resolve the deal's primary client name for the salutation — // falls back to the document title when the owner chain doesn't // surface a client. let clientName = doc.title; const owner = await resolveDocumentOwner(doc.portId, doc); if (owner?.entityType === 'client') { const client = await db.query.clients.findFirst({ where: eq(clients.id, owner.entityId), columns: { fullName: true }, }); if (client?.fullName) clientName = client.fullName; } await sendSigningCompleted({ portId: doc.portId, portName: portRow?.name ?? 'Port Nimara', recipients: signers, clientName, documentLabel: DOC_TYPE_LABEL[doc.documentType] ?? 'Expression of Interest', completedAt: new Date(), signedPdfFileId: completedDoc.signedFileId, signedPdfFilename: `signed-${doc.id}.pdf`, }).catch((err) => { // Don't let a downstream email failure undo the completion — // the signed PDF is already stored and the document row is // marked completed. Log + emit so admins can re-trigger via // the manual "Send copy" flow. logger.error( { err, documentId: doc.id }, 'sendSigningCompleted fan-out failed after document completed', ); }); } } // Notify the document creator about completion if (doc.createdBy && doc.createdBy !== 'system') { void import('@/lib/services/notifications.service').then(({ createNotification }) => createNotification({ portId: doc.portId, userId: doc.createdBy!, type: 'document_signed', title: 'Document fully signed', description: `"${doc.title}" has been signed by all parties`, link: `/documents/${doc.id}`, entityType: 'document', entityId: doc.id, dedupeKey: `document:${doc.id}:completed`, }), ); } } export async function handleDocumentExpired(eventData: { documentId: string; portId?: string }) { const doc = await resolveWebhookDocument(eventData.documentId, eventData.portId); if (!doc) return; await db .update(documents) .set({ status: 'expired', updatedAt: new Date() }) .where(and(eq(documents.id, doc.id), eq(documents.portId, doc.portId))); if (doc.interestId && doc.documentType === 'eoi') { await db .update(interests) .set({ eoiStatus: 'expired', updatedAt: new Date() }) .where(and(eq(interests.id, doc.interestId), eq(interests.portId, doc.portId))); } await db.insert(documentEvents).values({ documentId: doc.id, eventType: 'expired', eventData: { documensoId: eventData.documentId }, }); emitToRoom(`port:${doc.portId}`, 'document:expired', { documentId: doc.id }); } export async function handleDocumentOpened(eventData: { documentId: string; recipientEmail: string; /** Optional Documenso recipient token — preferred over email match * (same email may serve multiple roles on one document). */ recipientToken?: string | null; signatureHash?: string; portId?: string; }) { const doc = await resolveWebhookDocument(eventData.documentId, eventData.portId); if (!doc) return; const signerWhere = eventData.recipientToken ? and( eq(documentSigners.documentId, doc.id), eq(documentSigners.signingToken, eventData.recipientToken), ) : and( eq(documentSigners.documentId, doc.id), eq(documentSigners.signerEmail, eventData.recipientEmail), ); const [signer] = await db.select().from(documentSigners).where(signerWhere); // Stamp openedAt the first time we see a viewed event for this signer. // Re-deliveries (v2 can fire RECIPIENT_VIEWED multiple times per visit) // hit the idempotent UPDATE without overwriting the original timestamp. if (signer && !signer.openedAt) { await db .update(documentSigners) .set({ openedAt: new Date() }) .where(eq(documentSigners.id, signer.id)); } await db.insert(documentEvents).values({ documentId: doc.id, eventType: 'viewed', signerId: signer?.id ?? null, signatureHash: eventData.signatureHash ?? null, eventData: { recipientEmail: eventData.recipientEmail }, }); emitToRoom(`port:${doc.portId}`, 'document:signer:opened', { documentId: doc.id, signerEmail: eventData.recipientEmail, }); } export async function handleDocumentRejected(eventData: { documentId: string; recipientEmail?: string; signatureHash?: string; portId?: string; }) { const doc = await resolveWebhookDocument(eventData.documentId, eventData.portId); if (!doc) return; let signerId: string | null = null; if (eventData.recipientEmail) { const [signer] = await db .update(documentSigners) .set({ status: 'declined' }) .where( and( eq(documentSigners.documentId, doc.id), eq(documentSigners.signerEmail, eventData.recipientEmail), ), ) .returning(); signerId = signer?.id ?? null; } await db .update(documents) .set({ status: 'rejected', updatedAt: new Date() }) .where(and(eq(documents.id, doc.id), eq(documents.portId, doc.portId))); if (doc.interestId && doc.documentType === 'eoi') { await db .update(interests) .set({ eoiStatus: 'rejected', updatedAt: new Date() }) .where(and(eq(interests.id, doc.interestId), eq(interests.portId, doc.portId))); } await db.insert(documentEvents).values({ documentId: doc.id, eventType: 'rejected', signerId, signatureHash: eventData.signatureHash ?? null, eventData: { recipientEmail: eventData.recipientEmail ?? null }, }); emitToRoom(`port:${doc.portId}`, 'document:rejected', { documentId: doc.id, signerEmail: eventData.recipientEmail ?? null, }); } export async function handleDocumentCancelled(eventData: { documentId: string; signatureHash?: string; portId?: string; }) { const doc = await resolveWebhookDocument(eventData.documentId, eventData.portId); if (!doc) return; await db .update(documents) .set({ status: 'cancelled', updatedAt: new Date() }) .where(and(eq(documents.id, doc.id), eq(documents.portId, doc.portId))); if (doc.interestId && doc.documentType === 'eoi') { await db .update(interests) .set({ eoiStatus: 'cancelled', updatedAt: new Date() }) .where(and(eq(interests.id, doc.interestId), eq(interests.portId, doc.portId))); } await db.insert(documentEvents).values({ documentId: doc.id, eventType: 'cancelled', signatureHash: eventData.signatureHash ?? null, eventData: { documensoId: eventData.documentId }, }); emitToRoom(`port:${doc.portId}`, 'document:cancelled', { documentId: doc.id }); } // ─── Phase A: hub + wizard surface (PR1 skeletons; bodies land in PRs 4-6) ──── export interface DocumentDetailWatcher { userId: string; addedBy: string; addedAt: Date; } export interface DocumentDetail { document: typeof documents.$inferSelect; signers: (typeof documentSigners.$inferSelect)[]; events: (typeof documentEvents.$inferSelect)[]; watchers: DocumentDetailWatcher[]; } /** * Single-roundtrip aggregator for the document detail page (PR5). * Returns the document plus all signers, events (newest first), and watchers. * Throws NotFoundError if the document is not in `portId`. */ export async function getDocumentDetail(id: string, portId: string): Promise { const document = await getDocumentById(id, portId); const [signers, events, watchers] = await Promise.all([ db.query.documentSigners.findMany({ where: eq(documentSigners.documentId, id), orderBy: (ds, { asc }) => [asc(ds.signingOrder)], }), db.query.documentEvents.findMany({ where: eq(documentEvents.documentId, id), orderBy: (de, { desc }) => [desc(de.createdAt)], }), db .select({ userId: documentWatchers.userId, addedBy: documentWatchers.addedBy, addedAt: documentWatchers.addedAt, }) .from(documentWatchers) .where(eq(documentWatchers.documentId, id)), ]); return { document, signers, events, watchers }; } /** * User-initiated cancel of an in-flight document. Voids the doc in Documenso * (when present), updates DB status, logs an event, emits socket. Webhook * receiver also handles documenso-initiated cancellations via * `handleDocumentCancelled`. * * The actual Documenso void call lands in PR2 (`documenso-client.voidDocument`); * this skeleton updates DB state only. */ export async function cancelDocument( documentId: string, portId: string, meta: AuditMeta, ): Promise { const existing = await getDocumentById(documentId, portId); if (['completed', 'cancelled', 'rejected'].includes(existing.status)) { throw new ConflictError(`Document is already ${existing.status}`); } // CRM is the system of record for cancellation status. A transient // Documenso failure shouldn't block the user from marking the doc cancelled // here - voidDocument already treats 404 as success, and the periodic // webhook receiver will reconcile if the remote void eventually lands. if (existing.documensoId) { try { await documensoVoid(existing.documensoId, portId); } catch (err) { logger.warn( { err, documentId, documensoId: existing.documensoId }, 'Documenso void failed; cancelling locally anyway', ); } } const [updated] = await db .update(documents) .set({ status: 'cancelled', updatedAt: new Date() }) .where(and(eq(documents.id, documentId), eq(documents.portId, portId))) .returning(); await db.insert(documentEvents).values({ documentId, eventType: 'cancelled', eventData: { initiatedBy: meta.userId }, }); void createAuditLog({ userId: meta.userId, portId, action: 'update', entityType: 'document', entityId: documentId, oldValue: { status: existing.status }, newValue: { status: 'cancelled' }, ipAddress: meta.ipAddress, userAgent: meta.userAgent, }); emitToRoom(`port:${portId}`, 'document:cancelled', { documentId }); return updated!; } /** * Returns prefilled email composer payload for the "Email signed PDF to all * signatories" action on the document detail page. * * Available for `status='completed' && signedFileId !== null`. * * Body content (from per-port `signed_doc_completion` template), full * sender-resolution, and watcher-cc helpers land alongside PR8 (email * composer with attachments). For PR1 this returns the minimal correct * recipients + auto-attachment shape so detail-page integration tests can * assert against it. */ export interface ComposeSignedDocEmailResult { to: string[]; cc: string[]; subject: string; body: string; attachments: Array<{ fileId: string; filename?: string }>; defaultSenderType: 'system' | 'user'; } export async function composeSignedDocEmail( documentId: string, portId: string, ): Promise { const doc = await getDocumentById(documentId, portId); if (doc.status !== 'completed') { throw new ConflictError('Document is not completed'); } if (!doc.signedFileId) { throw new ValidationError('Document has no signed PDF'); } const signers = await db .select({ email: documentSigners.signerEmail }) .from(documentSigners) .where(eq(documentSigners.documentId, documentId)); const dedupedRecipients = Array.from(new Set(signers.map((s) => s.email))); return { to: dedupedRecipients, cc: [], subject: `Signed ${doc.documentType.replace(/_/g, ' ')} - ${doc.title}`, body: '', attachments: [{ fileId: doc.signedFileId }], defaultSenderType: 'system', }; } // ─── Watchers (PR5) ─────────────────────────────────────────────────────────── export async function listDocumentWatchers(documentId: string, portId: string) { await getDocumentById(documentId, portId); // port-scope check return db .select({ userId: documentWatchers.userId, addedBy: documentWatchers.addedBy, addedAt: documentWatchers.addedAt, }) .from(documentWatchers) .where(eq(documentWatchers.documentId, documentId)); } export async function addDocumentWatcher( documentId: string, portId: string, userId: string, meta: AuditMeta, ): Promise<{ userId: string; addedAt: Date }> { await getDocumentById(documentId, portId); await assertWatchersInPort(portId, [userId]); const [row] = await db .insert(documentWatchers) .values({ documentId, userId, addedBy: meta.userId }) .onConflictDoNothing() .returning(); void createAuditLog({ userId: meta.userId, portId, action: 'create', entityType: 'document_watcher', entityId: documentId, newValue: { documentId, watcherUserId: userId }, ipAddress: meta.ipAddress, userAgent: meta.userAgent, }); emitToRoom(`port:${portId}`, 'document:updated', { documentId }); return row ? { userId: row.userId, addedAt: row.addedAt } : { userId, addedAt: new Date() }; } export async function removeDocumentWatcher( documentId: string, portId: string, userId: string, meta: AuditMeta, ): Promise { await getDocumentById(documentId, portId); await db .delete(documentWatchers) .where(and(eq(documentWatchers.documentId, documentId), eq(documentWatchers.userId, userId))); void createAuditLog({ userId: meta.userId, portId, action: 'delete', entityType: 'document_watcher', entityId: documentId, oldValue: { documentId, watcherUserId: userId }, ipAddress: meta.ipAddress, userAgent: meta.userAgent, }); emitToRoom(`port:${portId}`, 'document:updated', { documentId }); } /** * Create-document wizard entry point (PR6). * * Dispatches across pathways: * - 'documenso-template' - Documenso renders + signs from its own template * - 'inapp' - render PDF locally from a CRM template, upload to Documenso * - 'upload' - admin-supplied PDF, upload to Documenso (auto-place signature * fields if `autoPlaceFields`) * * Persists the document, applies reminder overrides, attaches watchers, and * triggers send when `sendImmediately`. */ import type { CreateDocumentWizardInput } from '@/lib/validators/documents'; export async function createFromWizard( portId: string, data: CreateDocumentWizardInput, meta: AuditMeta, ): Promise { if (data.source === 'upload') { return createFromUpload(portId, data, meta); } if (!data.templateId) { throw new ValidationError('templateId is required for template source'); } await assertSubjectFksInPort(portId, { clientId: data.clientId, interestId: data.interestId, companyId: data.companyId, yachtId: data.yachtId, reservationId: data.reservationId, }); const [doc] = await db .insert(documents) .values({ portId, interestId: data.interestId ?? null, reservationId: data.reservationId ?? null, clientId: data.clientId ?? null, companyId: data.companyId ?? null, yachtId: data.yachtId ?? null, documentType: data.documentType, title: data.title, notes: data.notes ?? null, status: 'draft', remindersDisabled: data.remindersDisabled, reminderCadenceOverride: data.reminderCadenceOverride ?? null, createdBy: meta.userId, }) .returning(); if (!doc) throw new CodedError('INSERT_RETURNING_EMPTY', { internalMessage: 'Failed to insert document', }); if (data.watchers.length > 0) { await assertWatchersInPort(portId, data.watchers); await db.insert(documentWatchers).values( data.watchers.map((userId) => ({ documentId: doc.id, userId, addedBy: meta.userId, })), ); } void createAuditLog({ userId: meta.userId, portId, action: 'create', entityType: 'document', entityId: doc.id, newValue: { documentType: doc.documentType, title: doc.title, pathway: data.pathway, source: data.source, }, ipAddress: meta.ipAddress, userAgent: meta.userAgent, }); emitToRoom(`port:${portId}`, 'document:created', { documentId: doc.id }); return doc; } /** * Upload-driven creation path. Files-service integration + Documenso upload * + auto-place signature fields land alongside the realapi PR (PR11). For * PR6 we persist the document row + signers + watchers and leave the * Documenso upload step to the existing sendForSigning flow on first send. */ export async function createFromUpload( portId: string, data: CreateDocumentWizardInput, meta: AuditMeta, ): Promise { if (!data.uploadedFileId) { throw new ValidationError('uploadedFileId is required for upload source'); } if (!data.signers || data.signers.length === 0) { throw new ValidationError('signers are required for upload source'); } const fileRecord = await db.query.files.findFirst({ where: and(eq(files.id, data.uploadedFileId), eq(files.portId, portId)), }); if (!fileRecord) { throw new NotFoundError('File'); } await assertSubjectFksInPort(portId, { clientId: data.clientId, interestId: data.interestId, companyId: data.companyId, yachtId: data.yachtId, reservationId: data.reservationId, }); const [doc] = await db .insert(documents) .values({ portId, interestId: data.interestId ?? null, reservationId: data.reservationId ?? null, clientId: data.clientId ?? null, companyId: data.companyId ?? null, yachtId: data.yachtId ?? null, documentType: data.documentType, title: data.title, notes: data.notes ?? null, status: 'draft', fileId: fileRecord.id, remindersDisabled: data.remindersDisabled, reminderCadenceOverride: data.reminderCadenceOverride ?? null, createdBy: meta.userId, }) .returning(); if (!doc) throw new CodedError('INSERT_RETURNING_EMPTY', { internalMessage: 'Failed to insert document', }); await db.insert(documentSigners).values( data.signers.map((s) => ({ documentId: doc.id, signerName: s.signerName, signerEmail: s.signerEmail, signerRole: s.signerRole, signingOrder: s.signingOrder, status: 'pending' as const, })), ); if (data.watchers.length > 0) { await assertWatchersInPort(portId, data.watchers); await db.insert(documentWatchers).values( data.watchers.map((userId) => ({ documentId: doc.id, userId, addedBy: meta.userId, })), ); } void createAuditLog({ userId: meta.userId, portId, action: 'create', entityType: 'document', entityId: doc.id, newValue: { documentType: doc.documentType, title: doc.title, pathway: 'upload', source: 'upload', uploadedFileId: fileRecord.id, }, ipAddress: meta.ipAddress, userAgent: meta.userAgent, }); emitToRoom(`port:${portId}`, 'document:created', { documentId: doc.id }); return doc; } // ─── Aggregated Workflow Projection ─────────────────────────────────────────── export interface AggregatedWorkflowGroup { label: string; source: 'direct' | 'client' | 'company' | 'yacht'; workflows: Array; total: number; } const WORKFLOW_GROUP_LIMIT = 20; const INFLIGHT_STATUSES = ['draft', 'sent', 'partially_signed'] as const; /** * Same projection shape as listFilesAggregatedByEntity but for in-flight * signing workflows. Completed/expired/cancelled workflows are hidden — * they surface via their signed-PDF file row. */ export async function listInflightWorkflowsAggregatedByEntity( portId: string, entityType: 'client' | 'company' | 'yacht', entityId: string, ): Promise<{ groups: AggregatedWorkflowGroup[] }> { const entityExists = await assertEntityInPort(portId, entityType, entityId); if (!entityExists) return { groups: [] }; const related = await collectRelatedEntities(portId, entityType, entityId); const groups: AggregatedWorkflowGroup[] = []; const directColumn = entityType === 'client' ? documents.clientId : entityType === 'company' ? documents.companyId : documents.yachtId; const direct = await fetchWorkflowGroupRows(portId, eq(directColumn, entityId)); if (direct.rows.length > 0) { groups.push({ label: 'DIRECTLY ATTACHED', source: 'direct', workflows: direct.rows, total: direct.total, }); } for (const { id, name } of related.companies) { const g = await fetchWorkflowGroupRows(portId, eq(documents.companyId, id)); if (g.rows.length === 0) continue; groups.push({ label: `FROM COMPANY: ${name.toUpperCase()}`, source: 'company', workflows: g.rows, total: g.total, }); } for (const { id, name } of related.yachts) { const g = await fetchWorkflowGroupRows(portId, eq(documents.yachtId, id)); if (g.rows.length === 0) continue; groups.push({ label: `FROM YACHT: ${name.toUpperCase()}`, source: 'yacht', workflows: g.rows, total: g.total, }); } for (const { id, name } of related.clients) { const g = await fetchWorkflowGroupRows(portId, eq(documents.clientId, id)); if (g.rows.length === 0) continue; groups.push({ label: `FROM CLIENT: ${name.toUpperCase()}`, source: 'client', workflows: g.rows, total: g.total, }); } return { groups }; } async function fetchWorkflowGroupRows( portId: string, predicate: ReturnType, ): Promise<{ rows: Array; total: number }> { const inflightStatuses = INFLIGHT_STATUSES as unknown as string[]; const rows = await db .select() .from(documents) .where( and(eq(documents.portId, portId), inArray(documents.status, inflightStatuses), predicate), ) .orderBy(desc(documents.updatedAt)) .limit(WORKFLOW_GROUP_LIMIT); const [countRow] = await db .select({ count: sql`count(*)::int` }) .from(documents) .where( and(eq(documents.portId, portId), inArray(documents.status, inflightStatuses), predicate), ); return { rows, total: Number(countRow?.count ?? 0) }; }