Files
pn-new-crm/src/lib/services/documents.service.ts
Matt bb9b5bb1a3 fix(audit-wave-1): orphan-blob window in handleDocumentCompleted
Closes Wave 1.3 (CRITICAL). The previous storage.put → files.insert
→ documents.update sequence had two real failure modes:

1. **Orphan blob.** If storage.put succeeded but the files.insert or
   documents.update failed, the blob lived forever in MinIO with no
   DB pointer. Re-runs re-uploaded a new blob without cleaning up
   the previous one.

2. **Zombie completed state.** The catch block at the end ran
   `documents.update({status: 'completed'})` with NO signedFileId
   on any failure path. The idempotency early-return at the top
   requires BOTH status='completed' AND signedFileId, so retries
   *did* still re-attempt — but reps saw a "completed" document
   with no signed file, hiding the failure.

Fix:
- Track `putStoragePath` outside the try. After storage.put lands,
  the variable holds the path; cleared once the DB commit succeeds.
- files.insert + documents.update + reservation contract mirror all
  run in a single `db.transaction(...)`. Atomic commit-or-rollback.
- Catch block: compensating `storage.delete(putStoragePath)` if the
  DB commit didn't land. Logs at error level on compensating-delete
  failure so a human can clean up.
- Catch block no longer sets `status='completed'`. The doc stays
  in its prior state; Documenso's retry (or our poll-worker) re-
  attempts the full sequence safely thanks to the unchanged
  idempotency gate.

Verified: tsc clean, documents-completion-auto-deposit tests all
pass (5/5).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-13 00:07:08 +02:00

2026 lines
67 KiB
TypeScript

import { and, desc, eq, gte, inArray, isNull, lt, lte, ne, sql, exists } from 'drizzle-orm';
import { db } from '@/lib/db';
import {
documents,
documentSigners,
documentEvents,
documentWatchers,
files,
} from '@/lib/db/schema/documents';
import { interests, interestBerths } from '@/lib/db/schema/interests';
import { clients } from '@/lib/db/schema/clients';
import { companies } from '@/lib/db/schema/companies';
import { yachts } from '@/lib/db/schema/yachts';
import { berthReservations } from '@/lib/db/schema/reservations';
import { ports } from '@/lib/db/schema/ports';
import { userProfiles, userPortRoles } from '@/lib/db/schema/users';
import { buildListQuery } from '@/lib/db/query-builder';
import { createAuditLog, type AuditMeta } from '@/lib/audit';
import { diffEntity } from '@/lib/entity-diff';
import { CodedError, NotFoundError, ValidationError, ConflictError } from '@/lib/errors';
import { emitToRoom } from '@/lib/socket/server';
import { buildStoragePath } from '@/lib/minio';
import { getStorageBackend } from '@/lib/storage';
import { env } from '@/lib/env';
import { logger } from '@/lib/logger';
import { evaluateRule } from '@/lib/services/berth-rules-engine';
import { PIPELINE_STAGES } from '@/lib/constants';
import { advanceStageIfBehind } from '@/lib/services/interests.service';
import {
createDocument as documensoCreate,
sendDocument as documensoSend,
downloadSignedPdf,
voidDocument as documensoVoid,
} from '@/lib/services/documenso-client';
import { getPortEoiSigners } from '@/lib/services/documenso-payload';
import { getPortDocumensoConfig } from '@/lib/services/port-config';
import {
listTree,
collectDescendantIds,
ensureEntityFolder,
type FolderNode,
type EntityType,
} from '@/lib/services/document-folders.service';
import { assertEntityInPort, collectRelatedEntities } from '@/lib/services/files';
import type {
CreateDocumentInput,
UpdateDocumentInput,
ListDocumentsInput,
} from '@/lib/validators/documents';
// ─── Types ────────────────────────────────────────────────────────────────────
// ─── List ─────────────────────────────────────────────────────────────────────
import { documentWatchers as documentWatchersTable } from '@/lib/db/schema/documents';
/**
 * Document types excluded when a caller asks for signature documents only.
 *
 * NOTE(review): `listDocuments` currently hard-codes these same four values in
 * its raw-SQL `NOT IN (...)` filter and only references this constant through
 * a `void` statement, so the two lists must be kept in sync by hand.
 */
const NON_SIGNATURE_TYPES = [
'welcome_letter',
'handover_checklist',
'acknowledgment',
'correspondence',
];
/**
 * Translate a documents-hub tab into the extra SQL predicates it implies.
 *
 * @param tab - Hub tab from the list query; `undefined` and `'all'` add no
 *   predicates.
 * @param currentUserEmail - Email of the caller. Required by the two
 *   "awaiting" tabs; when it is missing those tabs produce a predicate that
 *   matches nothing instead of silently widening the result set.
 * @returns Predicates to be AND-ed onto the caller's filter list.
 */
function buildHubTabFilters(
  tab: ListDocumentsInput['tab'],
  currentUserEmail: string | undefined,
): ReturnType<typeof and>[] {
  if (!tab || tab === 'all') return [];

  // EXISTS(pending signer on this document whose email satisfies the predicate).
  const hasPendingSigner = (emailPredicate: ReturnType<typeof and>) =>
    exists(
      db
        .select({ x: sql`1` })
        .from(documentSigners)
        .where(
          and(
            eq(documentSigners.documentId, documents.id),
            eq(documentSigners.status, 'pending'),
            emailPredicate,
          ),
        ),
    );

  switch (tab) {
    case 'in_progress':
      // Every document type that is currently in flight. The second predicate
      // is implied by the first; it is retained unchanged.
      return [
        inArray(documents.status, ['draft', 'sent', 'partially_signed']),
        sql`${documents.status} != 'expired'`,
      ];

    case 'eoi_queue':
      // In-flight EOIs only, so the EOI signing pipeline can be triaged
      // separately from the all-types view.
      return [
        eq(documents.documentType, 'eoi'),
        inArray(documents.status, ['draft', 'sent', 'partially_signed']),
      ];

    case 'awaiting_them':
      // Pending signers other than the caller. Without the caller's email the
      // distinction cannot be made, so match nothing.
      if (!currentUserEmail) return [sql`1 = 0`];
      return [
        inArray(documents.status, ['sent', 'partially_signed']),
        hasPendingSigner(ne(documentSigners.signerEmail, currentUserEmail)),
      ];

    case 'awaiting_me':
      // No caller email means there is no "me" to be waiting on.
      if (!currentUserEmail) return [sql`1 = 0`];
      return [hasPendingSigner(eq(documentSigners.signerEmail, currentUserEmail))];

    case 'completed':
      return [inArray(documents.status, ['completed', 'signed'])];

    case 'expired':
      // The schema has no `expires_at` column yet, so this is status-only.
      return [eq(documents.status, 'expired')];

    default:
      return [];
  }
}
/** Caller context passed to `listDocuments` alongside the list query. */
export interface ListDocumentsExtra {
/**
 * Email of the calling user. Hub tab filtering compares it with pending
 * signer emails for both the "awaiting me" and "awaiting them" tabs; when it
 * is absent those two tabs match no documents.
 */
currentUserEmail?: string;
}
/**
 * List the documents of one port with filtering, search, sorting and
 * pagination, and attach a `downloadUrl` to each returned row.
 *
 * @param portId - Port whose documents are listed.
 * @param query - List query (paging, sort, search and the optional filters).
 * @param extra - Caller context; `currentUserEmail` feeds the hub tab filters.
 * @returns The `buildListQuery` result with `data` replaced by the rows
 *   extended with `downloadUrl`.
 */
export async function listDocuments(
  portId: string,
  query: ListDocumentsInput,
  extra: ListDocumentsExtra = {},
) {
  const { page, limit, sort, order, search, tab, watcherUserId, signatureOnly, folderId } = query;
  const conditions: ReturnType<typeof and>[] = [];

  // Plain equality filters.
  if (query.interestId) conditions.push(eq(documents.interestId, query.interestId));
  if (query.clientId) conditions.push(eq(documents.clientId, query.clientId));
  if (query.documentType) conditions.push(eq(documents.documentType, query.documentType));
  if (query.status) conditions.push(eq(documents.status, query.status));

  // Folder scoping: `null` selects documents without a folder, a folder id
  // selects that folder (optionally with its whole subtree).
  if (folderId !== undefined) {
    if (folderId === null) {
      conditions.push(isNull(documents.folderId));
    } else if (query.includeDescendants) {
      const folderTree = await listTree(portId);
      const subtreeIds = collectDescendantIds(folderTree, folderId);
      conditions.push(inArray(documents.folderId, [folderId, ...subtreeIds]));
    } else {
      conditions.push(eq(documents.folderId, folderId));
    }
    // Inside a folder view completed workflows are hidden — they surface via
    // their resulting signed-PDF file row in the Files section, not the
    // Signing section.
    conditions.push(ne(documents.status, 'completed'));
  }

  // Date window; both bounds are compared against `createdAt`.
  if (query.sentSince) conditions.push(gte(documents.createdAt, new Date(query.sentSince)));
  if (query.sentUntil) conditions.push(lte(documents.createdAt, new Date(query.sentUntil)));

  // `signatureOnly === false` (and undefined) intentionally adds nothing.
  if (signatureOnly === true) {
    conditions.push(
      sql`${documents.documentType} NOT IN ('welcome_letter','handover_checklist','acknowledgment','correspondence')`,
    );
  }

  if (watcherUserId) {
    const watchedByUser = db
      .select({ x: sql`1` })
      .from(documentWatchersTable)
      .where(
        and(
          eq(documentWatchersTable.documentId, documents.id),
          eq(documentWatchersTable.userId, watcherUserId),
        ),
      );
    conditions.push(exists(watchedByUser));
  }

  conditions.push(...buildHubTabFilters(tab, extra.currentUserEmail));

  // No-op references that keep otherwise-unused bindings "used".
  void NON_SIGNATURE_TYPES;
  void lt;

  // Unknown or missing sort keys fall back to creation time.
  const pickSortColumn = () => {
    switch (sort) {
      case 'title':
        return documents.title;
      case 'status':
        return documents.status;
      case 'documentType':
        return documents.documentType;
      default:
        return documents.createdAt;
    }
  };
  const sortColumn = pickSortColumn();

  const result = await buildListQuery<typeof documents.$inferSelect>({
    table: documents,
    portIdColumn: documents.portId,
    portId,
    idColumn: documents.id,
    updatedAtColumn: documents.updatedAt,
    searchColumns: [documents.title],
    searchTerm: search,
    filters: conditions.filter(Boolean) as Parameters<typeof buildListQuery>[0]['filters'],
    sort: sort ? { column: sortColumn, direction: order } : undefined,
    page,
    pageSize: limit,
  });

  const rowsWithDownload = await hydrateDocumentsWithDownloadUrl(portId, result.data);
  return { ...result, data: rowsWithDownload };
}
// ─── Download URL helpers ─────────────────────────────────────────────────────
/**
 * Build the rep-facing download URL for a document.
 *
 * The URL has the form `/api/v1/documents/<id>/download/<folder…>/<filename>`
 * where every folder name and the filename are URI-component encoded. The
 * folder path and filename are there for readability in browser tabs and
 * shared links; the route handler looks the document up by id and validates
 * the slug, so a hand-edited path 404s rather than serving the wrong file.
 *
 * Resolve the folder tree once per request and reuse it for every document so
 * the tree is not refetched per row.
 *
 * @param doc - Document id, optional folder id and the attached filename.
 * @param folderTree - Folder tree used to resolve the ancestor chain.
 * @returns The URL, or `null` when the document has no filename (for example
 *   signature-only documents before completion) so the UI can hide the
 *   download affordance.
 */
export function buildDocumentDownloadUrl(
  doc: { id: string; folderId: string | null; filename: string | null },
  folderTree: readonly FolderNode[],
): string | null {
  const { id, folderId, filename } = doc;
  if (!filename) return null;

  // An unknown folder id resolves to an empty chain, leaving just the filename.
  const folderNames = folderId
    ? findFolderPath(folderTree, folderId).map((folder) => folder.name)
    : [];
  const slug = [...folderNames, filename].map((part) => encodeURIComponent(part)).join('/');

  return `/api/v1/documents/${id}/download/${slug}`;
}
/**
 * Find the chain of folders leading from a root of `tree` down to the folder
 * whose id is `id`.
 *
 * The tree is searched depth-first in declaration order, so the first match
 * in pre-order wins.
 *
 * @param tree - Root nodes of the folder tree.
 * @param id - Folder id to locate.
 * @returns The ancestor chain, root first and ending at the matching node;
 *   an empty array when no node has that id (e.g. an orphan `folder_id`).
 */
export function findFolderPath(tree: readonly FolderNode[], id: string): FolderNode[] {
  // Explicit stack instead of recursion; entries are pushed in reverse so the
  // leftmost sibling is popped (visited) first.
  const pending: Array<{ node: FolderNode; ancestors: FolderNode[] }> = [];
  for (const root of [...tree].reverse()) {
    pending.push({ node: root, ancestors: [] });
  }

  for (let entry = pending.pop(); entry !== undefined; entry = pending.pop()) {
    const trail = [...entry.ancestors, entry.node];
    if (entry.node.id === id) return trail;
    for (const child of [...entry.node.children].reverse()) {
      pending.push({ node: child, ancestors: trail });
    }
  }

  return [];
}
/** A row as selected from the `documents` table. */
type DocumentRow = typeof documents.$inferSelect;
/** A document row extended with its resolved download URL (or `null`). */
type DocumentRowWithDownload = DocumentRow & { downloadUrl: string | null };

/**
 * Attach a `downloadUrl` to each document row.
 *
 * Filenames for all referenced files are loaded in one port-scoped query and
 * the folder tree is fetched once, then both are reused for every row.
 *
 * @param portId - Port the rows belong to; file lookups are limited to it.
 * @param rows - Document rows to extend.
 * @returns The rows in their original order, each with `downloadUrl` set
 *   (`null` when the row has no file or the file was not found in this port).
 */
async function hydrateDocumentsWithDownloadUrl(
  portId: string,
  rows: DocumentRow[],
): Promise<DocumentRowWithDownload[]> {
  if (rows.length === 0) return [];

  // Distinct file ids referenced by the page of rows.
  const referencedFileIds = new Set<string>();
  for (const row of rows) {
    if (row.fileId !== null) referencedFileIds.add(row.fileId);
  }

  const filenames = new Map<string, string | null>();
  if (referencedFileIds.size > 0) {
    const matches = await db
      .select({ id: files.id, filename: files.filename })
      .from(files)
      .where(and(inArray(files.id, [...referencedFileIds]), eq(files.portId, portId)));
    matches.forEach((match) => {
      filenames.set(match.id, match.filename);
    });
  }

  const folderTree = await listTree(portId);

  return rows.map((row) => {
    const filename = row.fileId ? (filenames.get(row.fileId) ?? null) : null;
    const downloadUrl = buildDocumentDownloadUrl(
      { id: row.id, folderId: row.folderId, filename },
      folderTree,
    );
    return { ...row, downloadUrl };
  });
}
// ─── Deal docs for a berth ────────────────────────────────────────────────────
/**
 * Summary of one document as returned by `listDealDocumentsForBerth`.
 * Every field is copied from the matching `documents` column.
 */
export interface BerthDealDoc {
// Primary key of the document.
id: string;
// Display title of the document.
title: string;
// Value of `documents.documentType`.
documentType: string;
// Value of `documents.status`.
status: string;
// Creation timestamp; the list is ordered by it, newest first.
createdAt: Date;
// Interest the document belongs to; rows without one are filtered out.
interestId: string;
}
/**
 * List the documents attached to any interest that is linked to a berth
 * through `interest_berths`, newest first.
 *
 * Backs the Deal Documents tab on the berth detail page so reps can see the
 * EOIs / contracts tied to prospects on that slip without leaving the page.
 * The listing is read-only: entry is guarded by the berth page's permission
 * gate, and only rows where both the document and the interest belong to
 * `portId` are returned. Edits and sends still happen on the interest page.
 *
 * @param portId - Port the caller is operating in.
 * @param berthId - Berth whose linked interests are inspected.
 * @returns Document summaries ordered by `createdAt` descending.
 */
export async function listDealDocumentsForBerth(
  portId: string,
  berthId: string,
): Promise<BerthDealDoc[]> {
  const berthInThisPort = and(
    eq(interestBerths.berthId, berthId),
    eq(documents.portId, portId),
    eq(interests.portId, portId),
  );

  const joined = await db
    .select({
      id: documents.id,
      title: documents.title,
      documentType: documents.documentType,
      status: documents.status,
      createdAt: documents.createdAt,
      interestId: documents.interestId,
    })
    .from(documents)
    .innerJoin(interestBerths, eq(interestBerths.interestId, documents.interestId))
    .innerJoin(interests, eq(interests.id, documents.interestId))
    .where(berthInThisPort)
    .orderBy(desc(documents.createdAt));

  const dealDocs: BerthDealDoc[] = [];
  for (const row of joined) {
    // The column is nullable in the row type; skip rows without an interest.
    if (!row.interestId) continue;
    dealDocs.push({
      id: row.id,
      title: row.title,
      documentType: row.documentType,
      status: row.status,
      createdAt: row.createdAt,
      interestId: row.interestId,
    });
  }
  return dealDocs;
}
// ─── Get by ID ────────────────────────────────────────────────────────────────
/**
 * Load one document, with its signers, scoped to a port.
 *
 * @param id - Document id.
 * @param portId - Port the document must belong to.
 * @returns The document row plus `signers` and a `downloadUrl` (`null` when
 *   no file is attached or the file is not found in this port).
 * @throws NotFoundError when no document with that id exists in the port.
 */
export async function getDocumentById(id: string, portId: string) {
  const doc = await db.query.documents.findFirst({
    where: and(eq(documents.id, id), eq(documents.portId, portId)),
    with: { signers: true },
  });
  if (!doc) throw new NotFoundError('Document');

  // Resolve the attached file's name; the lookup is port-scoped as well.
  let filename: string | null = null;
  if (doc.fileId) {
    const [attachedFile] = await db
      .select({ filename: files.filename })
      .from(files)
      .where(and(eq(files.id, doc.fileId), eq(files.portId, portId)))
      .limit(1);
    filename = attachedFile?.filename ?? null;
  }

  const folderTree = await listTree(portId);
  return {
    ...doc,
    downloadUrl: buildDocumentDownloadUrl(
      { id: doc.id, folderId: doc.folderId, filename },
      folderTree,
    ),
  };
}
/**
 * Verify that every supplied subject FK (client, interest, company, yacht,
 * reservation) refers to a row in the caller's port.
 *
 * Without this guard a port-A user could create a document whose subject is a
 * port-B client and then leak that client's name and email through
 * sendForSigning's Documenso payload, or through the watcher / notification
 * surfaces that hydrate the linked entity.
 *
 * All lookups are started up front and awaited together.
 *
 * @param portId - Port every referenced row must belong to.
 * @param fks - Subject ids to validate; null/undefined/empty ids are skipped.
 * @throws ValidationError naming the first FK whose row is not in this port.
 */
async function assertSubjectFksInPort(
  portId: string,
  fks: {
    clientId?: string | null;
    interestId?: string | null;
    companyId?: string | null;
    yachtId?: string | null;
    reservationId?: string | null;
  },
): Promise<void> {
  const { clientId, interestId, companyId, yachtId, reservationId } = fks;
  const pending: Array<Promise<void>> = [];

  // Queue a lookup that must yield a row, rejecting with `message` otherwise.
  const requireRow = (lookup: Promise<unknown>, message: string): void => {
    pending.push(
      lookup.then((row) => {
        if (!row) throw new ValidationError(message);
      }),
    );
  };

  if (clientId) {
    requireRow(
      db.query.clients.findFirst({
        where: and(eq(clients.id, clientId), eq(clients.portId, portId)),
      }),
      'clientId not found in this port',
    );
  }
  if (interestId) {
    requireRow(
      db.query.interests.findFirst({
        where: and(eq(interests.id, interestId), eq(interests.portId, portId)),
      }),
      'interestId not found in this port',
    );
  }
  if (companyId) {
    requireRow(
      db.query.companies.findFirst({
        where: and(eq(companies.id, companyId), eq(companies.portId, portId)),
      }),
      'companyId not found in this port',
    );
  }
  if (yachtId) {
    requireRow(
      db.query.yachts.findFirst({
        where: and(eq(yachts.id, yachtId), eq(yachts.portId, portId)),
      }),
      'yachtId not found in this port',
    );
  }
  if (reservationId) {
    requireRow(
      db.query.berthReservations.findFirst({
        where: and(
          eq(berthReservations.id, reservationId),
          eq(berthReservations.portId, portId),
        ),
      }),
      'reservationId not found in this port',
    );
  }

  await Promise.all(pending);
}
/**
 * Verify that every prospective watcher may access the document's port.
 *
 * A watcher row for a user outside the tenant would cause
 * notifyDocumentEvent to send that user a socket toast and email revealing
 * the document's title. Super-admins are exempt from the port check.
 *
 * @param portId - Port of the document being watched.
 * @param userIds - Candidate watcher user ids; duplicates are ignored.
 * @throws ValidationError when at least one non-super-admin user has no role
 *   in the port.
 */
async function assertWatchersInPort(portId: string, userIds: string[]): Promise<void> {
  if (userIds.length === 0) return;
  const candidates = Array.from(new Set(userIds));

  // Super-admins can watch anything, so drop them before the role lookup.
  const profiles = await db.query.userProfiles.findMany({
    where: inArray(userProfiles.userId, candidates),
  });
  const superAdminIds = new Set(
    profiles.flatMap((profile) => (profile.isSuperAdmin ? [profile.userId] : [])),
  );
  const portScoped = candidates.filter((userId) => !superAdminIds.has(userId));
  if (portScoped.length === 0) return;

  const grants = await db
    .select({ userId: userPortRoles.userId })
    .from(userPortRoles)
    .where(and(inArray(userPortRoles.userId, portScoped), eq(userPortRoles.portId, portId)));
  const grantedIds = new Set(grants.map((grant) => grant.userId));

  const everyoneHasAccess = portScoped.every((userId) => grantedIds.has(userId));
  if (!everyoneHasAccess) {
    throw new ValidationError('One or more watchers do not have access to this port');
  }
}
// ─── Create ───────────────────────────────────────────────────────────────────
/**
 * Create a draft document in a port.
 *
 * The client and interest FKs are validated against the port first. After the
 * insert an audit log entry is written (fire-and-forget) and
 * `document:created` is emitted to the port room.
 *
 * @param portId - Port the document is created in.
 * @param data - Validated creation payload.
 * @param meta - Acting user and request metadata for the audit log.
 * @returns The inserted document row.
 * @throws ValidationError when `clientId` or `interestId` is not in the port.
 */
export async function createDocument(portId: string, data: CreateDocumentInput, meta: AuditMeta) {
  const { clientId, interestId } = data;
  await assertSubjectFksInPort(portId, { clientId, interestId });

  const insertedRows = await db
    .insert(documents)
    .values({
      portId,
      interestId: interestId ?? null,
      clientId: clientId ?? null,
      folderId: data.folderId ?? null,
      documentType: data.documentType,
      title: data.title,
      notes: data.notes ?? null,
      status: 'draft',
      createdBy: meta.userId,
    })
    .returning();
  const created = insertedRows[0]!;

  // Audit logging is deliberately not awaited.
  void createAuditLog({
    userId: meta.userId,
    portId,
    action: 'create',
    entityType: 'document',
    entityId: created.id,
    newValue: { documentType: created.documentType, title: created.title },
    ipAddress: meta.ipAddress,
    userAgent: meta.userAgent,
  });

  emitToRoom(`port:${portId}`, 'document:created', { documentId: created.id });
  return created;
}
// ─── Update ───────────────────────────────────────────────────────────────────
/**
 * Update the editable fields (`title`, `notes`, `status`) of a document.
 *
 * `updatedAt` is always refreshed, even when no editable field was supplied.
 * After the write an audit log entry is recorded (fire-and-forget) and
 * `document:updated` is emitted to the port room.
 *
 * @param id - Document id.
 * @param portId - Port the document must belong to.
 * @param data - Validated patch; `undefined` fields are left untouched.
 * @param meta - Acting user and request metadata for the audit log.
 * @returns The updated document row.
 * @throws NotFoundError when the document does not exist in the port, either
 *   at load time or because it disappeared before the UPDATE ran.
 */
export async function updateDocument(
  id: string,
  portId: string,
  data: UpdateDocumentInput,
  meta: AuditMeta,
) {
  const existing = await getDocumentById(id, portId);

  const updates: Partial<typeof documents.$inferInsert> = {};
  if (data.title !== undefined) updates.title = data.title;
  if (data.notes !== undefined) updates.notes = data.notes;
  if (data.status !== undefined) updates.status = data.status;
  updates.updatedAt = new Date();

  const [updated] = await db
    .update(documents)
    .set(updates)
    .where(and(eq(documents.id, id), eq(documents.portId, portId)))
    .returning();

  // The row can vanish between the load above and this UPDATE (concurrent
  // delete). Without this guard the function would return `undefined` typed
  // as a row and still audit/emit an update that never happened.
  if (!updated) throw new NotFoundError('Document');

  // Retained from the previous implementation; the result is not consumed.
  diffEntity(existing, updated);

  // Audit logging is deliberately not awaited.
  void createAuditLog({
    userId: meta.userId,
    portId,
    action: 'update',
    entityType: 'document',
    entityId: id,
    oldValue: existing as unknown as Record<string, unknown>,
    newValue: updated as unknown as Record<string, unknown>,
    ipAddress: meta.ipAddress,
    userAgent: meta.userAgent,
  });

  emitToRoom(`port:${portId}`, 'document:updated', { documentId: id });
  return updated;
}
// ─── Delete ───────────────────────────────────────────────────────────────────
/**
 * Delete a document unless it is currently out for signature.
 *
 * After the delete an audit log entry is written (fire-and-forget) and
 * `document:deleted` is emitted to the port room.
 *
 * @param id - Document id.
 * @param portId - Port the document must belong to.
 * @param meta - Acting user and request metadata for the audit log.
 * @throws NotFoundError when the document does not exist in the port.
 * @throws ConflictError when the status is `sent` or `partially_signed`.
 */
export async function deleteDocument(id: string, portId: string, meta: AuditMeta) {
  const existing = await getDocumentById(id, portId);

  const isOutForSignature = existing.status === 'sent' || existing.status === 'partially_signed';
  if (isOutForSignature) {
    throw new ConflictError('Cannot delete a document that is currently in signing process');
  }

  const inThisPort = and(eq(documents.id, id), eq(documents.portId, portId));
  await db.delete(documents).where(inThisPort);

  // Audit logging is deliberately not awaited.
  void createAuditLog({
    userId: meta.userId,
    portId,
    action: 'delete',
    entityType: 'document',
    entityId: id,
    oldValue: { title: existing.title, status: existing.status },
    ipAddress: meta.ipAddress,
    userAgent: meta.userAgent,
  });

  emitToRoom(`port:${portId}`, 'document:deleted', { documentId: id });
}
// ─── Send for Signing (BR-021) ────────────────────────────────────────────────
/**
 * Send a draft document to Documenso for signing (BR-021).
 *
 * Flow:
 * 1. Validate the document (has a file, is a draft) and resolve the client,
 *    its email contact, the port and the port-configured signers.
 * 2. Load the PDF from storage — before anything is written locally.
 * 3. Create and send the document in Documenso.
 * 4. In one DB transaction: insert the three signer rows (with their signing
 *    URLs), mark the document `sent`, update the linked interest and record
 *    the `sent` event.
 * 5. Fire-and-forget: berth rules, pipeline stage advances, audit log; then
 *    emit `document:sent` to the port room.
 *
 * Signer rows are written only after Documenso accepted the document. A
 * failed file load or Documenso call therefore leaves the draft untouched,
 * so a retry cannot accumulate duplicate pending signers. If the transaction
 * itself fails after Documenso has already sent the document, the local
 * draft stays unchanged and the remote document has to be reconciled by hand.
 *
 * @param documentId - Document to send.
 * @param portId - Port the document, client, interest and file must be in.
 * @param meta - Acting user and request metadata.
 * @returns The refreshed document (see `getDocumentById`).
 * @throws ValidationError when the document has no file, no client, or the
 *   client has no email contact.
 * @throws ConflictError when the document is not in `draft` status.
 * @throws NotFoundError when the document, port or file cannot be found.
 */
export async function sendForSigning(documentId: string, portId: string, meta: AuditMeta) {
  const doc = await getDocumentById(documentId, portId);
  if (!doc.fileId) throw new ValidationError('Document has no associated file');
  if (doc.status !== 'draft') throw new ConflictError('Document is not in draft status');

  // Fetch interest + client to build signers. Filtering by portId as well as
  // the FK means a stale or maliciously-set subject FK pointing at another
  // port is never hydrated (and therefore never shipped to Documenso).
  const interest = doc.interestId
    ? await db.query.interests.findFirst({
        where: and(eq(interests.id, doc.interestId), eq(interests.portId, portId)),
      })
    : null;
  const client = doc.clientId
    ? await db.query.clients.findFirst({
        where: and(eq(clients.id, doc.clientId), eq(clients.portId, portId)),
        with: { contacts: true },
      })
    : null;
  if (!client) throw new ValidationError('Document has no associated client');

  const emailContact = (
    client.contacts as Array<{ channel: string; value: string }> | undefined
  )?.find((c) => c.channel === 'email');
  if (!emailContact?.value) throw new ValidationError('Client has no email contact');
  const clientEmail = emailContact.value;

  const port = await db.query.ports.findFirst({ where: eq(ports.id, portId) });
  if (!port) throw new NotFoundError('Port');

  // Port-configured signer identities. Fabricated slug-based addresses are
  // not used because they never match real port users and make
  // handleRecipientSigned a silent no-op.
  const eoiSigners = await getPortEoiSigners(portId);

  // Load the PDF before any local write. The lookup is port-scoped like every
  // other file lookup in this module.
  const fileRecord = await db.query.files.findFirst({
    where: and(eq(files.id, doc.fileId), eq(files.portId, portId)),
  });
  if (!fileRecord) throw new NotFoundError('File');
  const fileStream = await (await getStorageBackend()).get(fileRecord.storagePath);
  const chunks: Buffer[] = [];
  for await (const chunk of fileStream) {
    chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk));
  }
  const pdfBase64 = Buffer.concat(chunks).toString('base64');

  // Per-port v2 signing settings (PARALLEL/SEQUENTIAL + redirect URL). Both
  // are optional — omitting them yields v1's legacy behavior.
  const docCfg = await getPortDocumensoConfig(portId);

  // Create + send in Documenso. portId is required for the v2 envelope/create
  // code path (which routes by per-port apiVersion); meta.signingOrder is
  // honoured only on v2 instances.
  // BR-021: 3 signers - client (1), developer (2), sales/approver (3).
  const documensoDoc = await documensoCreate(
    doc.title,
    pdfBase64,
    [
      { name: client.fullName, email: clientEmail, role: 'SIGNER', signingOrder: 1 },
      {
        name: eoiSigners.developer.name,
        email: eoiSigners.developer.email,
        role: 'SIGNER',
        signingOrder: 2,
      },
      {
        name: eoiSigners.approver.name,
        email: eoiSigners.approver.email,
        role: 'SIGNER',
        signingOrder: 3,
      },
    ],
    portId,
    {
      ...(docCfg.signingOrder ? { signingOrder: docCfg.signingOrder } : {}),
      ...(docCfg.redirectUrl ? { redirectUrl: docCfg.redirectUrl } : {}),
    },
  );
  await documensoSend(documensoDoc.id, portId);

  // Index the signing URLs Documenso returned by recipient email so they can
  // be stored together with the signer rows.
  const urlsByEmail = new Map<string, { signingUrl: string | null; embeddedUrl: string | null }>();
  for (const recipient of documensoDoc.recipients) {
    urlsByEmail.set(recipient.email, {
      signingUrl: recipient.signingUrl ?? null,
      embeddedUrl: recipient.embeddedUrl ?? null,
    });
  }
  const urlsFor = (email: string) =>
    urlsByEmail.get(email) ?? { signingUrl: null, embeddedUrl: null };
  const clientUrls = urlsFor(clientEmail);
  const developerUrls = urlsFor(eoiSigners.developer.email);
  const approverUrls = urlsFor(eoiSigners.approver.email);

  const sentAt = new Date();

  // All local state changes commit or roll back together.
  await db.transaction(async (tx) => {
    await tx.insert(documentSigners).values([
      {
        documentId,
        signerName: client.fullName,
        signerEmail: clientEmail,
        signerRole: 'client',
        signingOrder: 1,
        status: 'pending',
        signingUrl: clientUrls.signingUrl,
        embeddedUrl: clientUrls.embeddedUrl,
      },
      {
        documentId,
        signerName: eoiSigners.developer.name,
        signerEmail: eoiSigners.developer.email,
        signerRole: 'developer',
        signingOrder: 2,
        status: 'pending',
        signingUrl: developerUrls.signingUrl,
        embeddedUrl: developerUrls.embeddedUrl,
      },
      {
        documentId,
        signerName: eoiSigners.approver.name,
        signerEmail: eoiSigners.approver.email,
        signerRole: 'approver',
        signingOrder: 3,
        status: 'pending',
        signingUrl: approverUrls.signingUrl,
        embeddedUrl: approverUrls.embeddedUrl,
      },
    ]);

    await tx
      .update(documents)
      .set({ status: 'sent', documensoId: documensoDoc.id, updatedAt: sentAt })
      .where(and(eq(documents.id, documentId), eq(documents.portId, portId)));

    if (interest) {
      await tx
        .update(interests)
        .set({
          documensoId: documensoDoc.id,
          dateEoiSent: sentAt,
          eoiStatus: 'waiting_for_signatures',
          updatedAt: sentAt,
        })
        .where(eq(interests.id, interest.id));
    }

    await tx.insert(documentEvents).values({
      documentId,
      eventType: 'sent',
      eventData: { documensoId: documensoDoc.id },
    });
  });

  if (interest) {
    // Trigger berth rules.
    void evaluateRule('eoi_sent', interest.id, portId, meta);
    // Advance pipeline stage to eoi_sent (no-op if already further along).
    void advanceStageIfBehind(interest.id, portId, 'eoi_sent', meta, 'EOI sent for signing');
    // G-C5: reservation agreements drive the contract_sent stage. The EOI
    // and contract flows share `sendForSigning`, so they are told apart by
    // documentType here rather than by splitting the entry point.
    if (doc.documentType === 'reservation_agreement') {
      void advanceStageIfBehind(
        interest.id,
        portId,
        'contract_sent',
        meta,
        'Reservation agreement sent',
      );
    }
  }

  // Audit logging is deliberately not awaited.
  void createAuditLog({
    userId: meta.userId,
    portId,
    action: 'update',
    entityType: 'document',
    entityId: documentId,
    newValue: { status: 'sent', documensoId: documensoDoc.id },
    ipAddress: meta.ipAddress,
    userAgent: meta.userAgent,
  });

  emitToRoom(`port:${portId}`, 'document:sent', {
    documentId,
    type: doc.documentType,
    signerCount: 3,
    documensoId: documensoDoc.id,
  });

  return await getDocumentById(documentId, portId);
}
// ─── Upload Signed Manually (BR-013) ─────────────────────────────────────────
/**
 * Attach a manually uploaded signed file to a document and mark it completed
 * (BR-013).
 *
 * Flow:
 * 1. Write the blob to storage.
 * 2. In one DB transaction insert the `files` row and point the document at
 *    it (`signedFileId`, `status = 'completed'`, `isManualUpload = true`).
 *    If the transaction fails, the freshly written blob is deleted again so
 *    no orphan object is left behind; a failed cleanup is logged at error
 *    level for manual follow-up. The original error is rethrown either way.
 * 3. For EOI documents linked to an interest in this port: mark the interest
 *    signed, then trigger the `eoi_signed` rule and stage advance
 *    (fire-and-forget).
 * 4. Record the `completed` event, audit log, `document:completed` socket
 *    event and a notification to the acting user.
 *
 * @param documentId - Document being completed.
 * @param portId - Port the document must belong to.
 * @param fileData - Uploaded file contents and metadata.
 * @param meta - Acting user and request metadata.
 * @returns The refreshed document (see `getDocumentById`).
 * @throws NotFoundError when the document or port cannot be found.
 */
export async function uploadSignedManually(
  documentId: string,
  portId: string,
  fileData: { buffer: Buffer; originalName: string; mimeType: string; size: number },
  meta: AuditMeta,
) {
  const doc = await getDocumentById(documentId, portId);
  const port = await db.query.ports.findFirst({ where: eq(ports.id, portId) });
  if (!port) throw new NotFoundError('Port');

  // Store the signed file.
  const fileId = crypto.randomUUID();
  const storagePath = buildStoragePath(port.slug, 'eoi-signed', documentId, fileId, 'pdf');
  const storage = await getStorageBackend();
  await storage.put(storagePath, fileData.buffer, {
    contentType: fileData.mimeType,
    sizeBytes: fileData.size,
  });

  // files.insert + documents.update commit or roll back together.
  let fileRecord: typeof files.$inferSelect;
  try {
    fileRecord = await db.transaction(async (tx) => {
      const [inserted] = await tx
        .insert(files)
        .values({
          portId,
          clientId: doc.clientId ?? null,
          filename: fileData.originalName,
          originalName: fileData.originalName,
          mimeType: fileData.mimeType,
          sizeBytes: String(fileData.size),
          storagePath,
          storageBucket: env.MINIO_BUCKET,
          category: 'eoi',
          uploadedBy: meta.userId,
        })
        .returning();
      if (!inserted) throw new Error('files insert returned no row');

      await tx
        .update(documents)
        .set({
          signedFileId: inserted.id,
          status: 'completed',
          isManualUpload: true,
          updatedAt: new Date(),
        })
        .where(and(eq(documents.id, documentId), eq(documents.portId, portId)));

      return inserted;
    });
  } catch (err) {
    // The DB commit did not land, so nothing points at the blob: remove it.
    try {
      await storage.delete(storagePath);
    } catch (cleanupErr) {
      logger.error(
        { err: cleanupErr, storagePath, documentId, portId },
        'uploadSignedManually: compensating storage delete failed - orphan blob needs manual cleanup',
      );
    }
    throw err;
  }

  // Update the interest if linked and the document is an EOI. Lookup and
  // update are both port-scoped so a foreign-port FK is never touched.
  if (doc.interestId && doc.documentType === 'eoi') {
    const interestInPort = and(eq(interests.id, doc.interestId), eq(interests.portId, portId));
    const interest = await db.query.interests.findFirst({ where: interestInPort });
    if (interest) {
      await db
        .update(interests)
        .set({ eoiStatus: 'signed', dateEoiSigned: new Date(), updatedAt: new Date() })
        .where(interestInPort);
      void evaluateRule('eoi_signed', doc.interestId, portId, meta);
      // Advance to eoi_signed (no-op if already past it).
      void advanceStageIfBehind(
        doc.interestId,
        portId,
        'eoi_signed',
        meta,
        'Signed EOI uploaded manually',
      );
    }
  }

  await db.insert(documentEvents).values({
    documentId,
    eventType: 'completed',
    eventData: { isManualUpload: true, fileId: fileRecord.id },
  });

  // Audit logging is deliberately not awaited.
  void createAuditLog({
    userId: meta.userId,
    portId,
    action: 'update',
    entityType: 'document',
    entityId: documentId,
    newValue: { status: 'completed', isManualUpload: true },
    ipAddress: meta.ipAddress,
    userAgent: meta.userAgent,
  });

  emitToRoom(`port:${portId}`, 'document:completed', { documentId });

  // Notify the acting user about the manual completion. The notifications
  // service is imported lazily at the call site.
  void import('@/lib/services/notifications.service').then(({ createNotification }) =>
    createNotification({
      portId,
      userId: meta.userId,
      type: 'document_signed',
      title: 'Document marked as signed',
      description: `"${doc.title}" has been manually uploaded as signed`,
      link: `/documents/${documentId}`,
      entityType: 'document',
      entityId: documentId,
      dedupeKey: `document:${documentId}:completed`,
    }),
  );

  return await getDocumentById(documentId, portId);
}
// ─── List Signers ─────────────────────────────────────────────────────────────
/**
 * List a document's signers in signing order.
 *
 * @param documentId - Document whose signers are listed.
 * @param portId - Port the document must belong to.
 * @returns Signer rows ordered by `signingOrder` ascending.
 * @throws NotFoundError when the document does not exist in the port.
 */
export async function listDocumentSigners(documentId: string, portId: string) {
  // Access check: throws unless the document exists in this port.
  await getDocumentById(documentId, portId);

  const signers = await db.query.documentSigners.findMany({
    where: eq(documentSigners.documentId, documentId),
    orderBy: (signer, { asc }) => [asc(signer.signingOrder)],
  });
  return signers;
}
// ─── List Events ──────────────────────────────────────────────────────────────
/**
 * List a document's events, newest first.
 *
 * @param documentId - Document whose events are listed.
 * @param portId - Port the document must belong to.
 * @returns Event rows ordered by `createdAt` descending.
 * @throws NotFoundError when the document does not exist in the port.
 */
export async function listDocumentEvents(documentId: string, portId: string) {
  // Access check: throws unless the document exists in this port.
  await getDocumentById(documentId, portId);

  const events = await db.query.documentEvents.findMany({
    where: eq(documentEvents.documentId, documentId),
    orderBy: (event, { desc: newestFirst }) => [newestFirst(event.createdAt)],
  });
  return events;
}
// ─── Webhook Handlers ─────────────────────────────────────────────────────────
/**
 * Resolve the local document an inbound Documenso webhook refers to.
 *
 * Two ports sharing a Documenso instance — or migrating between instances
 * with documentId reuse — could otherwise let an unscoped lookup return
 * whichever row sorts first across tenants. When the route resolved a portId
 * from the matched per-port webhook secret, the lookup is limited to that
 * port. Otherwise every match is fetched and the event is refused when more
 * than one port matches.
 *
 * @param documensoId - Document id as reported by Documenso.
 * @param portId - Port resolved from the webhook secret, when known.
 * @returns The matching document, or `null` when nothing matches or the
 *   unscoped lookup is ambiguous. Callers must treat `null` as "drop the
 *   event".
 */
async function resolveWebhookDocument(
  documensoId: string,
  portId: string | undefined,
): Promise<typeof documents.$inferSelect | null> {
  if (!portId) {
    // No port resolved: accept the match only when it is unique.
    const candidates = await db.query.documents.findMany({
      where: eq(documents.documensoId, documensoId),
    });
    switch (candidates.length) {
      case 0:
        logger.warn({ documensoId }, 'Document not found for webhook');
        return null;
      case 1:
        return candidates[0]!;
      default:
        logger.error(
          {
            documensoId,
            matchCount: candidates.length,
            ports: candidates.map((candidate) => candidate.portId),
          },
          'Documenso webhook ambiguous across multiple ports — refusing to mutate',
        );
        return null;
    }
  }

  const scoped = await db.query.documents.findFirst({
    where: and(eq(documents.documensoId, documensoId), eq(documents.portId, portId)),
  });
  if (scoped) return scoped;

  logger.warn({ documensoId, portId }, 'Document not found for webhook (port-scoped)');
  return null;
}
/**
 * Handle Documenso's "recipient signed" webhook.
 *
 * Marks the signer row matching the recipient email as signed, moves an EOI
 * from `sent` to `partially_signed`, records a `signed` document event and
 * emits `document:signer:signed` to the port room. The event is dropped when
 * the document cannot be resolved. A recipient email that matches no signer
 * row is logged as a warning; the event is still recorded, with no signer id.
 *
 * @param eventData - Webhook payload: Documenso document id, recipient email,
 *   optional signature hash and the port resolved from the webhook secret.
 */
export async function handleRecipientSigned(eventData: {
  documentId: string;
  recipientEmail: string;
  signatureHash?: string;
  portId?: string;
}) {
  const { documentId: documensoId, recipientEmail, signatureHash, portId } = eventData;

  const doc = await resolveWebhookDocument(documensoId, portId);
  if (!doc) return;

  // Mark the matching signer as signed.
  const signedRows = await db
    .update(documentSigners)
    .set({ status: 'signed', signedAt: new Date() })
    .where(
      and(
        eq(documentSigners.documentId, doc.id),
        eq(documentSigners.signerEmail, recipientEmail),
      ),
    )
    .returning();
  const signer = signedRows[0];

  if (!signer) {
    // Email mismatch: Documenso's recipient address matches no local signer
    // row, which happens when the signers were created with fabricated or
    // stale addresses. Warn so operators can fix the port's eoi_signers
    // system setting.
    logger.warn(
      {
        documensoId,
        documentId: doc.id,
        recipientEmail,
      },
      'handleRecipientSigned: no matching signer row for recipient email - ' +
        'check eoi_signers system setting for this port',
    );
  }

  // Only EOIs that are still `sent` move to partially_signed.
  const isEoiStillSent = doc.documentType === 'eoi' && doc.status === 'sent';
  if (isEoiStillSent) {
    await db
      .update(documents)
      .set({ status: 'partially_signed', updatedAt: new Date() })
      .where(and(eq(documents.id, doc.id), eq(documents.portId, doc.portId)));
  }

  await db.insert(documentEvents).values({
    documentId: doc.id,
    eventType: 'signed',
    signerId: signer?.id ?? null,
    signatureHash: signatureHash ?? null,
    eventData: { recipientEmail },
  });

  emitToRoom(`port:${doc.portId}`, 'document:signer:signed', {
    documentId: doc.id,
    signerEmail: recipientEmail,
  });
}
// ─── Owner-wins resolution ────────────────────────────────────────────────────
/** Owning entity picked by `resolveDocumentOwner` for a document. */
interface ResolvedOwner {
// Kind of the owning entity ('client', 'company' or 'yacht' in the resolver).
entityType: EntityType;
// Primary key of the owning entity.
entityId: string;
}
/**
 * Owner-wins resolution chain (spec §"Routing on workflow completion" §3a).
 *
 * Priority order, first hit wins:
 *   1. the document's own client FK
 *   2. the document's own company FK
 *   3. the document's own yacht FK
 *   4. the client of the linked interest (looked up within `portId`)
 *
 * The interest fallback only yields a client: the lookup selects
 * `clientId` alone, and the interests table has no companyId.
 *
 * @param portId Port used to scope the interest lookup.
 * @param doc    The owner-related foreign keys of the document.
 * @returns The resolved owner, or null when nothing in the chain matches.
 */
async function resolveDocumentOwner(
  portId: string,
  doc: {
    clientId: string | null;
    companyId: string | null;
    yachtId: string | null;
    interestId: string | null;
  },
): Promise<ResolvedOwner | null> {
  const { clientId, companyId, yachtId, interestId } = doc;

  if (clientId) return { entityType: 'client', entityId: clientId };
  if (companyId) return { entityType: 'company', entityId: companyId };
  if (yachtId) return { entityType: 'yacht', entityId: yachtId };
  if (!interestId) return null;

  // interests.clientId is NOT NULL, so an existing interest row always
  // yields a client owner. A yacht-only path would require relaxing that
  // schema constraint first.
  const linkedInterest = await db.query.interests.findFirst({
    where: and(eq(interests.id, interestId), eq(interests.portId, portId)),
    columns: { clientId: true },
  });
  if (linkedInterest?.clientId) {
    return { entityType: 'client', entityId: linkedInterest.clientId };
  }
  return null;
}
/**
 * Webhook handler for a fully-signed (completed) Documenso document.
 *
 * Steps, in order:
 *  1. Resolve the local document; drop the event when it cannot be resolved.
 *  2. Return early when the document is already `completed` AND has a
 *     `signedFileId` (idempotency gate for webhook retries).
 *  3. Download the signed PDF, store it, then commit the `files` row, the
 *     `documents.signedFileId` pointer and (reservation agreements only) the
 *     reservation's `contractFileId` in one transaction. If anything fails
 *     after the blob was stored, the blob is deleted again. Failures in this
 *     step are logged and swallowed; the document keeps its prior status.
 *  4. Whether or not step 3 succeeded: mark a linked EOI interest as signed,
 *     insert a `completed` document event, emit `document:completed` and
 *     notify the creator.
 *
 * NOTE(review): because step 4 also runs after a failed step 3, a retried
 * delivery inserts another `completed` event — confirm this is intended.
 *
 * @param eventData.documentId Documenso's id for the document.
 * @param eventData.portId     Optional port scope supplied by the webhook route.
 */
export async function handleDocumentCompleted(eventData: { documentId: string; portId?: string }) {
  const doc = await resolveWebhookDocument(eventData.documentId, eventData.portId);
  if (!doc) return;

  // A1: Idempotency gate. Documenso retries DOCUMENT_COMPLETED on receiver
  // 5xx (and the poll worker also reconciles). Without this guard, a second
  // delivery re-runs downloadSignedPdf + storage.put + db.insert(files) and
  // then clobbers the previous signedFileId on the UPDATE — leaking the
  // first file as an orphan blob with no DB pointer. Once we have a signed
  // file id we are done.
  if (doc.status === 'completed' && doc.signedFileId) return;

  // BR-022: Download signed PDF and store in MinIO
  const port = await db.query.ports.findFirst({ where: eq(ports.id, doc.portId) });
  if (!port) {
    logger.error({ portId: doc.portId }, 'Port not found during document completion');
    return;
  }

  // Tracked outside the try so the catch can attempt a compensating
  // storage.delete if we put a blob but failed to commit the DB rows.
  // Without this, partial failures left an orphan blob in storage with
  // no DB pointer (the audit's orphan-blob CRITICAL).
  let putStoragePath: string | null = null;
  try {
    const signedPdfBuffer = await downloadSignedPdf(eventData.documentId);
    // Guard: a 0-byte response from Documenso would otherwise persist a
    // permanent corrupt signedFileId pointing at a blob with no content.
    // Refuse and let the next retry / poll-worker pass re-fetch.
    if (signedPdfBuffer.length === 0) {
      throw new CodedError('DOCUMENSO_EMPTY_PDF', {
        internalMessage: `Documenso returned 0-byte signed PDF for documensoId=${eventData.documentId}`,
      });
    }

    const fileId = crypto.randomUUID();
    const storagePath = buildStoragePath(port.slug, 'eoi-signed', doc.id, fileId, 'pdf');
    await (
      await getStorageBackend()
    ).put(storagePath, signedPdfBuffer, {
      contentType: 'application/pdf',
      sizeBytes: signedPdfBuffer.length,
    });
    putStoragePath = storagePath;

    // Resolve owner via the Owner-wins chain. The signed PDF lands in
    // this owner's auto-created entity subfolder (or at root if no owner).
    const owner = await resolveDocumentOwner(doc.portId, doc);
    let entityFolderId: string | null = null;
    if (owner) {
      try {
        const folder = await ensureEntityFolder(
          doc.portId,
          owner.entityType,
          owner.entityId,
          'system',
        );
        entityFolderId = folder.id;
      } catch (err) {
        // Folder creation is best-effort — signed file still lands at root.
        // Logged at warn level: missing entity folder is recoverable via
        // the backfill script.
        logger.warn(
          { err, documentId: doc.id, owner },
          'ensureEntityFolder failed during document completion',
        );
      }
    }

    // Atomic: the files row + the documents.signedFileId pointer + the
    // reservation contract mirror commit together. If any throws, the
    // outer catch fires storage.delete on the orphan blob.
    const fileRecord = await db.transaction(async (tx) => {
      const [inserted] = await tx
        .insert(files)
        .values({
          portId: doc.portId,
          clientId: owner?.entityType === 'client' ? owner.entityId : (doc.clientId ?? null),
          companyId: owner?.entityType === 'company' ? owner.entityId : (doc.companyId ?? null),
          yachtId: owner?.entityType === 'yacht' ? owner.entityId : (doc.yachtId ?? null),
          folderId: entityFolderId,
          filename: `signed-${doc.id}.pdf`,
          originalName: `signed-${doc.id}.pdf`,
          mimeType: 'application/pdf',
          sizeBytes: String(signedPdfBuffer.length),
          storagePath,
          storageBucket: env.MINIO_BUCKET,
          category: 'eoi',
          uploadedBy: 'system',
        })
        .returning();
      if (!inserted) {
        throw new Error('files.insert returned no row');
      }

      // Port-scoped like every other webhook handler's documents UPDATE.
      await tx
        .update(documents)
        .set({ status: 'completed', signedFileId: inserted.id, updatedAt: new Date() })
        .where(and(eq(documents.id, doc.id), eq(documents.portId, doc.portId)));

      // Reservation agreements mirror their signed PDF onto
      // berth_reservations.contractFileId so the portal "My Reservations" view
      // can resolve the contract without joining through documents.
      if (doc.documentType === 'reservation_agreement' && doc.reservationId) {
        const { berthReservations } = await import('@/lib/db/schema/reservations');
        await tx
          .update(berthReservations)
          .set({ contractFileId: inserted.id, updatedAt: new Date() })
          .where(eq(berthReservations.id, doc.reservationId));
      }
      return inserted;
    });

    // Mark as durably committed BEFORE side effects so the catch block
    // doesn't try to undo a blob whose DB pointer just landed.
    putStoragePath = null;

    // Audit log: the webhook just minted a new signed-PDF file row owned by
    // 'system'. Without this entry the file appears in the aggregated view
    // with no provenance trail; auditors need to see who/what wrote it.
    void createAuditLog({
      userId: 'system',
      portId: doc.portId,
      action: 'create',
      entityType: 'file',
      entityId: fileRecord.id,
      newValue: {
        filename: fileRecord.filename,
        mimeType: 'application/pdf',
        size: signedPdfBuffer.length,
        documentId: doc.id,
        source: 'documenso_completion',
      },
      ipAddress: '0.0.0.0',
      userAgent: 'webhook',
    });

    // G-C5: reservation agreement signing-complete → contract_signed.
    // Fired here (not below in the eoi-only branch) so contract pipeline
    // tracks reality the same way EOIs do via the eoi_signed advance.
    if (doc.documentType === 'reservation_agreement' && doc.interestId) {
      // Captured so the async callback below needs no non-null assertion.
      const interestId = doc.interestId;
      const systemMeta: AuditMeta = {
        userId: 'system',
        portId: doc.portId,
        ipAddress: '0.0.0.0',
        userAgent: 'webhook',
      };
      void advanceStageIfBehind(
        interestId,
        doc.portId,
        'contract_signed',
        systemMeta,
        'Reservation agreement signed',
      );
      // Dynamic import mirrors the eoi_signed pattern below to avoid the
      // berth-rules-engine module-cycle risk during cold-start. The chain is
      // fire-and-forget, so it carries its own rejection handler: a failing
      // import or rule must not surface as an unhandled rejection.
      void import('@/lib/services/berth-rules-engine')
        .then(({ evaluateRule }) =>
          evaluateRule('contract_signed', interestId, doc.portId, systemMeta),
        )
        .catch((err: unknown) => {
          logger.error(
            { err, documentId: doc.id, interestId },
            'contract_signed rule evaluation failed after document completion',
          );
        });
    }
  } catch (err) {
    logger.error(
      { err, documentId: doc.id, portId: doc.portId },
      'Failed to download/store signed PDF',
    );
    // Compensating delete: storage.put landed but the DB commit didn't.
    // Without this the blob lives forever with no row pointing at it.
    if (putStoragePath) {
      try {
        await (await getStorageBackend()).delete(putStoragePath);
        logger.info(
          { documentId: doc.id, storagePath: putStoragePath },
          'Compensating storage.delete after failed signed-PDF commit',
        );
      } catch (compErr) {
        // We tried — log so a human can clean up the orphan if needed.
        logger.error(
          { compErr, documentId: doc.id, storagePath: putStoragePath },
          'Compensating storage.delete also failed — manual cleanup required',
        );
      }
    }
    // Critical: do NOT set documents.status = 'completed' on failure.
    // The previous catch block did — which created the "completed-with-
    // no-signedFileId" zombie state the audit flagged. Let the next
    // Documenso retry (or our poll-worker reconciliation) re-attempt;
    // the early-return idempotency gate at the top requires BOTH
    // status='completed' AND signedFileId so re-runs are safe.
  }

  // Update interest if eoi type
  if (doc.interestId && doc.documentType === 'eoi') {
    // Port-scoped, matching the expired/rejected/cancelled handlers and
    // resolveDocumentOwner: never touch an interest outside the doc's port.
    const interestInPort = and(eq(interests.id, doc.interestId), eq(interests.portId, doc.portId));
    const interest = await db.query.interests.findFirst({ where: interestInPort });
    await db
      .update(interests)
      .set({ eoiStatus: 'signed', dateEoiSigned: new Date(), updatedAt: new Date() })
      .where(interestInPort);
    if (interest) {
      const systemMeta: AuditMeta = {
        userId: 'system',
        portId: doc.portId,
        ipAddress: '0.0.0.0',
        userAgent: 'webhook',
      };
      // Guard against double-fire: DOCUMENT_COMPLETED may arrive multiple times
      // (webhook retries) or follow a DOCUMENT_SIGNED that already advanced the
      // stage. advanceStageIfBehind handles the pipeline guard internally, but
      // evaluateRule has no idempotency - skip it if the interest is already at
      // eoi_signed or beyond to prevent duplicate berth-rule side effects.
      const currentStageIdx = PIPELINE_STAGES.indexOf(
        interest.pipelineStage as (typeof PIPELINE_STAGES)[number],
      );
      const eoiSignedIdx = PIPELINE_STAGES.indexOf('eoi_signed');
      if (currentStageIdx < eoiSignedIdx) {
        void evaluateRule('eoi_signed', doc.interestId, doc.portId, systemMeta);
      }
      // Advance to eoi_signed (no-op if interest already past it).
      void advanceStageIfBehind(
        doc.interestId,
        doc.portId,
        'eoi_signed',
        systemMeta,
        'EOI signed via Documenso',
      );
    }
  }

  await db.insert(documentEvents).values({
    documentId: doc.id,
    eventType: 'completed',
    eventData: { documensoId: eventData.documentId },
  });
  emitToRoom(`port:${doc.portId}`, 'document:completed', { documentId: doc.id });

  // Notify the document creator about completion
  const creatorId = doc.createdBy;
  if (creatorId && creatorId !== 'system') {
    void import('@/lib/services/notifications.service')
      .then(({ createNotification }) =>
        createNotification({
          portId: doc.portId,
          userId: creatorId,
          type: 'document_signed',
          title: 'Document fully signed',
          description: `"${doc.title}" has been signed by all parties`,
          link: `/documents/${doc.id}`,
          entityType: 'document',
          entityId: doc.id,
          dedupeKey: `document:${doc.id}:completed`,
        }),
      )
      .catch((err: unknown) => {
        // Best-effort: a failed notification must not become an unhandled
        // rejection.
        logger.warn(
          { err, documentId: doc.id, userId: creatorId },
          'Failed to create document-completed notification',
        );
      });
  }
}
/**
 * Webhook handler: the document expired in Documenso.
 *
 * Sets the document to `expired`, mirrors that onto the linked interest's
 * `eoiStatus` for EOIs, records an `expired` event and broadcasts
 * `document:expired` to the port room.
 */
export async function handleDocumentExpired(eventData: { documentId: string; portId?: string }) {
  const doc = await resolveWebhookDocument(eventData.documentId, eventData.portId);
  if (!doc) return;

  const { id: localId, portId, interestId } = doc;

  await db
    .update(documents)
    .set({ status: 'expired', updatedAt: new Date() })
    .where(and(eq(documents.id, localId), eq(documents.portId, portId)));

  // Only EOIs propagate their state to the interest row.
  if (interestId && doc.documentType === 'eoi') {
    await db
      .update(interests)
      .set({ eoiStatus: 'expired', updatedAt: new Date() })
      .where(and(eq(interests.id, interestId), eq(interests.portId, portId)));
  }

  await db.insert(documentEvents).values({
    documentId: localId,
    eventType: 'expired',
    eventData: { documensoId: eventData.documentId },
  });

  emitToRoom(`port:${portId}`, 'document:expired', { documentId: localId });
}
/**
 * Webhook handler: a recipient opened the document.
 *
 * Read-only with respect to document and signer state: it only records a
 * `viewed` event (linked to the signer row when the email matches one) and
 * broadcasts `document:signer:opened` to the port room.
 */
export async function handleDocumentOpened(eventData: {
  documentId: string;
  recipientEmail: string;
  signatureHash?: string;
  portId?: string;
}) {
  const doc = await resolveWebhookDocument(eventData.documentId, eventData.portId);
  if (!doc) return;

  const { recipientEmail, signatureHash } = eventData;

  // Look up (not mutate) the signer so the event can reference it.
  const signerRows = await db
    .select()
    .from(documentSigners)
    .where(
      and(
        eq(documentSigners.documentId, doc.id),
        eq(documentSigners.signerEmail, recipientEmail),
      ),
    );
  const [viewer] = signerRows;

  await db.insert(documentEvents).values({
    documentId: doc.id,
    eventType: 'viewed',
    signerId: viewer?.id ?? null,
    signatureHash: signatureHash ?? null,
    eventData: { recipientEmail },
  });

  emitToRoom(`port:${doc.portId}`, 'document:signer:opened', {
    documentId: doc.id,
    signerEmail: recipientEmail,
  });
}
/**
 * Webhook handler: the document was rejected/declined.
 *
 * When the rejecting recipient is known, their signer row is set to
 * `declined`. The document becomes `rejected`, an EOI's linked interest gets
 * `eoiStatus = 'rejected'`, a `rejected` event is recorded and
 * `document:rejected` is broadcast to the port room.
 */
export async function handleDocumentRejected(eventData: {
  documentId: string;
  recipientEmail?: string;
  signatureHash?: string;
  portId?: string;
}) {
  const doc = await resolveWebhookDocument(eventData.documentId, eventData.portId);
  if (!doc) return;

  const { recipientEmail, signatureHash } = eventData;

  // The recipient email is optional on this event; without it no signer row
  // is touched and the event is stored with a null signer.
  const declinedRows = recipientEmail
    ? await db
        .update(documentSigners)
        .set({ status: 'declined' })
        .where(
          and(
            eq(documentSigners.documentId, doc.id),
            eq(documentSigners.signerEmail, recipientEmail),
          ),
        )
        .returning()
    : [];
  const [declinedSigner] = declinedRows;
  const signerId: string | null = declinedSigner?.id ?? null;

  await db
    .update(documents)
    .set({ status: 'rejected', updatedAt: new Date() })
    .where(and(eq(documents.id, doc.id), eq(documents.portId, doc.portId)));

  const shouldMirrorToInterest = Boolean(doc.interestId) && doc.documentType === 'eoi';
  if (shouldMirrorToInterest && doc.interestId) {
    await db
      .update(interests)
      .set({ eoiStatus: 'rejected', updatedAt: new Date() })
      .where(and(eq(interests.id, doc.interestId), eq(interests.portId, doc.portId)));
  }

  await db.insert(documentEvents).values({
    documentId: doc.id,
    eventType: 'rejected',
    signerId,
    signatureHash: signatureHash ?? null,
    eventData: { recipientEmail: recipientEmail ?? null },
  });

  emitToRoom(`port:${doc.portId}`, 'document:rejected', {
    documentId: doc.id,
    signerEmail: recipientEmail ?? null,
  });
}
/**
 * Webhook handler: the document was cancelled on the Documenso side.
 *
 * Sets the document to `cancelled`, mirrors that onto the linked interest's
 * `eoiStatus` for EOIs, records a `cancelled` event and broadcasts
 * `document:cancelled` to the port room.
 */
export async function handleDocumentCancelled(eventData: {
  documentId: string;
  signatureHash?: string;
  portId?: string;
}) {
  const doc = await resolveWebhookDocument(eventData.documentId, eventData.portId);
  if (!doc) return;

  const { id: localId, portId, interestId } = doc;

  await db
    .update(documents)
    .set({ status: 'cancelled', updatedAt: new Date() })
    .where(and(eq(documents.id, localId), eq(documents.portId, portId)));

  // Only EOIs propagate their state to the interest row.
  if (interestId && doc.documentType === 'eoi') {
    await db
      .update(interests)
      .set({ eoiStatus: 'cancelled', updatedAt: new Date() })
      .where(and(eq(interests.id, interestId), eq(interests.portId, portId)));
  }

  await db.insert(documentEvents).values({
    documentId: localId,
    eventType: 'cancelled',
    signatureHash: eventData.signatureHash ?? null,
    eventData: { documensoId: eventData.documentId },
  });

  emitToRoom(`port:${portId}`, 'document:cancelled', { documentId: localId });
}
// ─── Phase A: hub + wizard surface (PR1 skeletons; bodies land in PRs 4-6) ────
/** Watcher projection returned as part of `DocumentDetail`. */
export interface DocumentDetailWatcher {
// User who watches the document.
userId: string;
// User who added this watcher.
addedBy: string;
// When the watcher row was created.
addedAt: Date;
}
/** Aggregate payload produced by `getDocumentDetail` for the detail page. */
export interface DocumentDetail {
// The document row itself.
document: typeof documents.$inferSelect;
// Signer rows of the document (getDocumentDetail orders them by signingOrder ascending).
signers: (typeof documentSigners.$inferSelect)[];
// Event rows of the document (getDocumentDetail orders them newest first).
events: (typeof documentEvents.$inferSelect)[];
// Users watching the document.
watchers: DocumentDetailWatcher[];
}
/**
 * Aggregator for the document detail page (PR5).
 *
 * Loads the document first (which enforces the port scope), then fetches
 * signers (by signing order), events (newest first) and watchers
 * concurrently.
 *
 * @throws NotFoundError if the document is not in `portId`.
 */
export async function getDocumentDetail(id: string, portId: string): Promise<DocumentDetail> {
  const document = await getDocumentById(id, portId);

  const signersQuery = db.query.documentSigners.findMany({
    where: eq(documentSigners.documentId, id),
    orderBy: (signer, { asc }) => [asc(signer.signingOrder)],
  });
  const eventsQuery = db.query.documentEvents.findMany({
    where: eq(documentEvents.documentId, id),
    orderBy: (event, { desc }) => [desc(event.createdAt)],
  });
  const watchersQuery = db
    .select({
      userId: documentWatchers.userId,
      addedBy: documentWatchers.addedBy,
      addedAt: documentWatchers.addedAt,
    })
    .from(documentWatchers)
    .where(eq(documentWatchers.documentId, id));

  const [signers, events, watchers] = await Promise.all([
    signersQuery,
    eventsQuery,
    watchersQuery,
  ]);

  return { document, signers, events, watchers };
}
/**
 * User-initiated cancel of an in-flight document.
 *
 * Voids the document in Documenso when it has a `documensoId` (best-effort:
 * a failed void is logged and the local cancel still proceeds), sets the
 * local status to `cancelled`, records a `cancelled` event, writes an audit
 * log entry and broadcasts `document:cancelled`. Documenso-initiated
 * cancellations are handled separately by `handleDocumentCancelled`.
 *
 * @param documentId Local document id.
 * @param portId     Port the document must belong to.
 * @param meta       Acting user and request metadata for the event/audit log.
 * @returns The updated document row.
 * @throws ConflictError when the document is already completed, cancelled
 *   or rejected.
 * @throws NotFoundError when the row no longer exists at update time (for
 *   example, deleted between the initial read and the update).
 */
export async function cancelDocument(
  documentId: string,
  portId: string,
  meta: AuditMeta,
): Promise<typeof documents.$inferSelect> {
  const existing = await getDocumentById(documentId, portId);
  if (['completed', 'cancelled', 'rejected'].includes(existing.status)) {
    throw new ConflictError(`Document is already ${existing.status}`);
  }

  // CRM is the system of record for cancellation status. A transient
  // Documenso failure shouldn't block the user from marking the doc cancelled
  // here - voidDocument already treats 404 as success, and the periodic
  // webhook receiver will reconcile if the remote void eventually lands.
  if (existing.documensoId) {
    try {
      await documensoVoid(existing.documensoId, portId);
    } catch (err) {
      logger.warn(
        { err, documentId, documensoId: existing.documensoId },
        'Documenso void failed; cancelling locally anyway',
      );
    }
  }

  const [updated] = await db
    .update(documents)
    .set({ status: 'cancelled', updatedAt: new Date() })
    .where(and(eq(documents.id, documentId), eq(documents.portId, portId)))
    .returning();

  // The row can disappear between the read above and this update. Fail
  // loudly before recording an event, audit entry or socket emit for a
  // cancel that did not happen, instead of returning `undefined` typed as a
  // document.
  if (!updated) {
    throw new NotFoundError('Document');
  }

  await db.insert(documentEvents).values({
    documentId,
    eventType: 'cancelled',
    eventData: { initiatedBy: meta.userId },
  });

  void createAuditLog({
    userId: meta.userId,
    portId,
    action: 'update',
    entityType: 'document',
    entityId: documentId,
    oldValue: { status: existing.status },
    newValue: { status: 'cancelled' },
    ipAddress: meta.ipAddress,
    userAgent: meta.userAgent,
  });

  emitToRoom(`port:${portId}`, 'document:cancelled', { documentId });
  return updated;
}
/**
* Returns prefilled email composer payload for the "Email signed PDF to all
* signatories" action on the document detail page.
*
* Available for `status='completed' && signedFileId !== null`.
*
* Body content (from per-port `signed_doc_completion` template), full
* sender-resolution, and watcher-cc helpers land alongside PR8 (email
* composer with attachments). For PR1 this returns the minimal correct
* recipients + auto-attachment shape so detail-page integration tests can
* assert against it.
*/
export interface ComposeSignedDocEmailResult {
// Primary recipients (composeSignedDocEmail fills this with de-duplicated signer emails).
to: string[];
// Carbon-copy recipients (composeSignedDocEmail currently returns an empty list).
cc: string[];
// Prefilled subject line.
subject: string;
// Prefilled body (composeSignedDocEmail currently returns an empty string).
body: string;
// Files to attach automatically, referenced by file id.
attachments: Array<{ fileId: string; filename?: string }>;
// Which sender the composer should preselect.
defaultSenderType: 'system' | 'user';
}
export async function composeSignedDocEmail(
  documentId: string,
  portId: string,
): Promise<ComposeSignedDocEmailResult> {
  const doc = await getDocumentById(documentId, portId);

  // Preconditions: the workflow must be finished and the signed PDF stored.
  if (doc.status !== 'completed') {
    throw new ConflictError('Document is not completed');
  }
  const signedFileId = doc.signedFileId;
  if (!signedFileId) {
    throw new ValidationError('Document has no signed PDF');
  }

  const signerRows = await db
    .select({ email: documentSigners.signerEmail })
    .from(documentSigners)
    .where(eq(documentSigners.documentId, documentId));

  // A Set keeps first-seen order while dropping repeated addresses.
  const uniqueEmails = new Set(signerRows.map((row) => row.email));
  const readableType = doc.documentType.replace(/_/g, ' ');

  return {
    to: Array.from(uniqueEmails),
    cc: [],
    subject: `Signed ${readableType} - ${doc.title}`,
    body: '',
    attachments: [{ fileId: signedFileId }],
    defaultSenderType: 'system',
  };
}
// ─── Watchers (PR5) ───────────────────────────────────────────────────────────
/**
 * Lists the watchers of a document as `{ userId, addedBy, addedAt }` rows.
 * The document is loaded first purely as a port-scope check.
 */
export async function listDocumentWatchers(documentId: string, portId: string) {
  // Port-scope check; the returned row itself is not needed.
  await getDocumentById(documentId, portId);

  const watcherColumns = {
    userId: documentWatchers.userId,
    addedBy: documentWatchers.addedBy,
    addedAt: documentWatchers.addedAt,
  };
  return db
    .select(watcherColumns)
    .from(documentWatchers)
    .where(eq(documentWatchers.documentId, documentId));
}
/**
 * Adds `userId` as a watcher of the document.
 *
 * The insert uses ON CONFLICT DO NOTHING, so adding an existing watcher is
 * not an error; in that case no row comes back and the returned `addedAt`
 * is the current time rather than the stored one. The audit entry and the
 * `document:updated` emit happen in both cases.
 */
export async function addDocumentWatcher(
  documentId: string,
  portId: string,
  userId: string,
  meta: AuditMeta,
): Promise<{ userId: string; addedAt: Date }> {
  // Port-scope check on the document, then on the prospective watcher.
  await getDocumentById(documentId, portId);
  await assertWatchersInPort(portId, [userId]);

  const insertedRows = await db
    .insert(documentWatchers)
    .values({ documentId, userId, addedBy: meta.userId })
    .onConflictDoNothing()
    .returning();
  const [created] = insertedRows;

  void createAuditLog({
    userId: meta.userId,
    portId,
    action: 'create',
    entityType: 'document_watcher',
    entityId: documentId,
    newValue: { documentId, watcherUserId: userId },
    ipAddress: meta.ipAddress,
    userAgent: meta.userAgent,
  });
  emitToRoom(`port:${portId}`, 'document:updated', { documentId });

  if (created) {
    return { userId: created.userId, addedAt: created.addedAt };
  }
  return { userId, addedAt: new Date() };
}
/**
 * Removes `userId` from the document's watchers.
 *
 * The delete is unconditional: the audit entry and the `document:updated`
 * emit are produced whether or not a watcher row actually existed.
 */
export async function removeDocumentWatcher(
  documentId: string,
  portId: string,
  userId: string,
  meta: AuditMeta,
): Promise<void> {
  // Port-scope check; the returned row itself is not needed.
  await getDocumentById(documentId, portId);

  const watcherRow = and(
    eq(documentWatchers.documentId, documentId),
    eq(documentWatchers.userId, userId),
  );
  await db.delete(documentWatchers).where(watcherRow);

  void createAuditLog({
    userId: meta.userId,
    portId,
    action: 'delete',
    entityType: 'document_watcher',
    entityId: documentId,
    oldValue: { documentId, watcherUserId: userId },
    ipAddress: meta.ipAddress,
    userAgent: meta.userAgent,
  });
  emitToRoom(`port:${portId}`, 'document:updated', { documentId });
}
/**
* Create-document wizard entry point (PR6).
*
* Dispatches across pathways:
* - 'documenso-template' - Documenso renders + signs from its own template
* - 'inapp' - render PDF locally from a CRM template, upload to Documenso
* - 'upload' - admin-supplied PDF, upload to Documenso (auto-place signature
* fields if `autoPlaceFields`)
*
* Persists the document, applies reminder overrides, attaches watchers, and
* triggers send when `sendImmediately`.
*/
import type { CreateDocumentWizardInput } from '@/lib/validators/documents';
/**
 * Create-document wizard entry point.
 *
 * Upload-sourced requests are delegated to `createFromUpload`. For template
 * sources this validates the input, then persists a `draft` document and
 * its watchers, writes an audit log entry and broadcasts `document:created`.
 *
 * All validation (subject FKs and watchers) runs BEFORE the first write, and
 * the document + watcher inserts share one transaction, so a rejected
 * watcher list or a failed watcher insert cannot leave an orphaned draft.
 *
 * @throws ValidationError when `templateId` is missing for a template source.
 * @throws CodedError (`INSERT_RETURNING_EMPTY`) when the insert returns no row.
 */
export async function createFromWizard(
  portId: string,
  data: CreateDocumentWizardInput,
  meta: AuditMeta,
): Promise<typeof documents.$inferSelect> {
  if (data.source === 'upload') {
    return createFromUpload(portId, data, meta);
  }
  if (!data.templateId) {
    throw new ValidationError('templateId is required for template source');
  }

  await assertSubjectFksInPort(portId, {
    clientId: data.clientId,
    interestId: data.interestId,
    companyId: data.companyId,
    yachtId: data.yachtId,
    reservationId: data.reservationId,
  });
  // Validate watchers up front: failing here after the document row was
  // written would strand a draft nobody asked for.
  if (data.watchers.length > 0) {
    await assertWatchersInPort(portId, data.watchers);
  }

  const doc = await db.transaction(async (tx) => {
    const [inserted] = await tx
      .insert(documents)
      .values({
        portId,
        interestId: data.interestId ?? null,
        reservationId: data.reservationId ?? null,
        clientId: data.clientId ?? null,
        companyId: data.companyId ?? null,
        yachtId: data.yachtId ?? null,
        documentType: data.documentType,
        title: data.title,
        notes: data.notes ?? null,
        status: 'draft',
        remindersDisabled: data.remindersDisabled,
        reminderCadenceOverride: data.reminderCadenceOverride ?? null,
        createdBy: meta.userId,
      })
      .returning();
    if (!inserted) {
      throw new CodedError('INSERT_RETURNING_EMPTY', {
        internalMessage: 'Failed to insert document',
      });
    }

    if (data.watchers.length > 0) {
      await tx.insert(documentWatchers).values(
        data.watchers.map((userId) => ({
          documentId: inserted.id,
          userId,
          addedBy: meta.userId,
        })),
      );
    }
    return inserted;
  });

  void createAuditLog({
    userId: meta.userId,
    portId,
    action: 'create',
    entityType: 'document',
    entityId: doc.id,
    newValue: {
      documentType: doc.documentType,
      title: doc.title,
      pathway: data.pathway,
      source: data.source,
    },
    ipAddress: meta.ipAddress,
    userAgent: meta.userAgent,
  });
  emitToRoom(`port:${portId}`, 'document:created', { documentId: doc.id });
  return doc;
}
/**
 * Upload-driven creation path. Files-service integration + Documenso upload
 * + auto-place signature fields land alongside the realapi PR (PR11). For
 * PR6 we persist the document row + signers + watchers and leave the
 * Documenso upload step to the existing sendForSigning flow on first send.
 *
 * All validation (uploaded file, subject FKs, watchers) runs BEFORE the
 * first write, and the document, signer and watcher inserts share one
 * transaction, so a failure part-way through cannot leave a draft without
 * its signers or watchers.
 *
 * @throws ValidationError when `uploadedFileId` or `signers` is missing.
 * @throws NotFoundError when the uploaded file is not in `portId`.
 * @throws CodedError (`INSERT_RETURNING_EMPTY`) when the insert returns no row.
 */
export async function createFromUpload(
  portId: string,
  data: CreateDocumentWizardInput,
  meta: AuditMeta,
): Promise<typeof documents.$inferSelect> {
  if (!data.uploadedFileId) {
    throw new ValidationError('uploadedFileId is required for upload source');
  }
  const signers = data.signers;
  if (!signers || signers.length === 0) {
    throw new ValidationError('signers are required for upload source');
  }

  const fileRecord = await db.query.files.findFirst({
    where: and(eq(files.id, data.uploadedFileId), eq(files.portId, portId)),
  });
  if (!fileRecord) {
    throw new NotFoundError('File');
  }

  await assertSubjectFksInPort(portId, {
    clientId: data.clientId,
    interestId: data.interestId,
    companyId: data.companyId,
    yachtId: data.yachtId,
    reservationId: data.reservationId,
  });
  // Validate watchers up front: failing here after the document and signer
  // rows were written would strand a half-built draft.
  if (data.watchers.length > 0) {
    await assertWatchersInPort(portId, data.watchers);
  }

  const doc = await db.transaction(async (tx) => {
    const [inserted] = await tx
      .insert(documents)
      .values({
        portId,
        interestId: data.interestId ?? null,
        reservationId: data.reservationId ?? null,
        clientId: data.clientId ?? null,
        companyId: data.companyId ?? null,
        yachtId: data.yachtId ?? null,
        documentType: data.documentType,
        title: data.title,
        notes: data.notes ?? null,
        status: 'draft',
        fileId: fileRecord.id,
        remindersDisabled: data.remindersDisabled,
        reminderCadenceOverride: data.reminderCadenceOverride ?? null,
        createdBy: meta.userId,
      })
      .returning();
    if (!inserted) {
      throw new CodedError('INSERT_RETURNING_EMPTY', {
        internalMessage: 'Failed to insert document',
      });
    }

    await tx.insert(documentSigners).values(
      signers.map((s) => ({
        documentId: inserted.id,
        signerName: s.signerName,
        signerEmail: s.signerEmail,
        signerRole: s.signerRole,
        signingOrder: s.signingOrder,
        status: 'pending' as const,
      })),
    );

    if (data.watchers.length > 0) {
      await tx.insert(documentWatchers).values(
        data.watchers.map((userId) => ({
          documentId: inserted.id,
          userId,
          addedBy: meta.userId,
        })),
      );
    }
    return inserted;
  });

  void createAuditLog({
    userId: meta.userId,
    portId,
    action: 'create',
    entityType: 'document',
    entityId: doc.id,
    newValue: {
      documentType: doc.documentType,
      title: doc.title,
      pathway: 'upload',
      source: 'upload',
      uploadedFileId: fileRecord.id,
    },
    ipAddress: meta.ipAddress,
    userAgent: meta.userAgent,
  });
  emitToRoom(`port:${portId}`, 'document:created', { documentId: doc.id });
  return doc;
}
// ─── Aggregated Workflow Projection ───────────────────────────────────────────
/** One labelled group of in-flight workflows in the aggregated entity view. */
export interface AggregatedWorkflowGroup {
// Display heading, e.g. 'DIRECTLY ATTACHED' or 'FROM COMPANY: <NAME>'.
label: string;
// Where the group comes from: the entity itself or a related client/company/yacht.
source: 'direct' | 'client' | 'company' | 'yacht';
// Document rows of the group (fetchWorkflowGroupRows caps this page at WORKFLOW_GROUP_LIMIT).
workflows: Array<typeof documents.$inferSelect>;
// Count of all matching rows, which may exceed workflows.length.
total: number;
}
// Maximum number of document rows fetched per aggregated workflow group.
const WORKFLOW_GROUP_LIMIT = 20;
// Document statuses treated as "in flight" by the aggregated workflow queries.
const INFLIGHT_STATUSES = ['draft', 'sent', 'partially_signed'] as const;
/**
 * In-flight signing workflows for an entity, grouped by where they come
 * from. Mirrors the projection shape of listFilesAggregatedByEntity.
 *
 * Group order: workflows attached directly to the entity, then one group per
 * related company, per related yacht and per related client. Empty groups
 * are omitted. Workflows outside the in-flight statuses are not listed here;
 * they surface via their signed-PDF file row instead.
 *
 * @returns `{ groups: [] }` when the entity is not in `portId`.
 */
export async function listInflightWorkflowsAggregatedByEntity(
  portId: string,
  entityType: 'client' | 'company' | 'yacht',
  entityId: string,
): Promise<{ groups: AggregatedWorkflowGroup[] }> {
  const entityExists = await assertEntityInPort(portId, entityType, entityId);
  if (!entityExists) return { groups: [] };

  const related = await collectRelatedEntities(portId, entityType, entityId);
  const groups: AggregatedWorkflowGroup[] = [];

  // Fetches one group and appends it when non-empty. The label is built
  // lazily so it is only computed for groups that are actually emitted.
  const appendGroup = async (
    source: AggregatedWorkflowGroup['source'],
    predicate: ReturnType<typeof eq>,
    makeLabel: () => string,
  ): Promise<void> => {
    const { rows, total } = await fetchWorkflowGroupRows(portId, predicate);
    if (rows.length === 0) return;
    groups.push({ label: makeLabel(), source, workflows: rows, total });
  };

  const ownColumnByType = {
    client: documents.clientId,
    company: documents.companyId,
    yacht: documents.yachtId,
  };
  await appendGroup('direct', eq(ownColumnByType[entityType], entityId), () => 'DIRECTLY ATTACHED');

  for (const company of related.companies) {
    await appendGroup(
      'company',
      eq(documents.companyId, company.id),
      () => `FROM COMPANY: ${company.name.toUpperCase()}`,
    );
  }
  for (const yacht of related.yachts) {
    await appendGroup(
      'yacht',
      eq(documents.yachtId, yacht.id),
      () => `FROM YACHT: ${yacht.name.toUpperCase()}`,
    );
  }
  for (const client of related.clients) {
    await appendGroup(
      'client',
      eq(documents.clientId, client.id),
      () => `FROM CLIENT: ${client.name.toUpperCase()}`,
    );
  }

  return { groups };
}
/**
 * Loads one aggregated workflow group: the newest in-flight documents that
 * match `predicate` within the port, plus the total number of matches.
 *
 * @param portId    Port the documents must belong to.
 * @param predicate Extra filter selecting the group (e.g. by client id).
 * @returns `rows` capped at WORKFLOW_GROUP_LIMIT and ordered by `updatedAt`
 *   descending; `total` counts every match, including rows beyond the cap.
 */
async function fetchWorkflowGroupRows(
  portId: string,
  predicate: ReturnType<typeof eq>,
): Promise<{ rows: Array<typeof documents.$inferSelect>; total: number }> {
  // Spreading the readonly tuple yields a mutable array of the status
  // literals, which inArray accepts without a double type assertion.
  const inflightScope = and(
    eq(documents.portId, portId),
    inArray(documents.status, [...INFLIGHT_STATUSES]),
    predicate,
  );

  // The page and the count are independent reads, so run them concurrently.
  const [rows, countRows] = await Promise.all([
    db
      .select()
      .from(documents)
      .where(inflightScope)
      .orderBy(desc(documents.updatedAt))
      .limit(WORKFLOW_GROUP_LIMIT),
    db
      .select({ count: sql<number>`count(*)::int` })
      .from(documents)
      .where(inflightScope),
  ]);
  const [countRow] = countRows;

  return { rows, total: Number(countRow?.count ?? 0) };
}