Files
pn-new-crm/src/lib/services/documents.service.ts
Matt ccc775dc66 feat(tenancies-p2): rename berth_reservations → berth_tenancies (schema + perms + UI)
73-file atomic rename per docs/tenancies-design.md:

- Migration 0085: rename table + indexes + FK constraints; rename
  documents.reservation_id → tenancy_id; migrate jsonb permission maps
  (reservations resource → tenancies; collapse create+activate → manage);
  rewrite historical audit_logs.entity_type='berth_reservation' →
  'berth_tenancy'. FK renames wrapped in DO blocks so dev DBs that pre-date
  the FK additions don't abort.
- Schema: berthReservations → berthTenancies; BerthReservation type →
  BerthTenancy; indexes idx_br_* / idx_brr_* → idx_bt_*.
- RolePermissions: resource { view, create, activate, cancel } collapses to
  { view, manage, cancel }; all 8 default seed bundles + role-form + matrix
  updated.
- Service: berth-reservations.service.ts → berth-tenancies.service.ts;
  endReservation → endTenancy; listReservations → listTenancies.
- API: /api/v1/berth-reservations → /api/v1/tenancies (+ nested [id]);
  /api/v1/berths/[id]/reservations → /api/v1/berths/[id]/tenancies.
- Validators: reservations.ts → tenancies.ts; RESERVATION_STATUSES →
  TENANCY_STATUSES; endReservationSchema → endTenancySchema.
- Routes: /{portSlug}/berth-reservations → /{portSlug}/tenancies;
  /portal/my-reservations → /portal/my-tenancies.
- Components: src/components/reservations/* → src/components/tenancies/*;
  BerthReservationsTab → BerthTenanciesTab; ClientReservationsTab →
  ClientTenanciesTab; ReservationList → TenancyList.
- Socket events: berth_reservation:* → berth_tenancy:*; payload
  reservationId → tenancyId.
- Webhook events: berth_reservation.* → berth_tenancy.*.
- Portal: getPortalUserReservations → getPortalUserTenancies;
  PortalReservation → PortalTenancy; PortalDashboard.counts.activeReservations
  → activeTenancies; PortalNav label "Reservations" → "Tenancies".
- Dossier: DossierReservation → DossierTenancy; reservationDecisions →
  tenancyDecisions across smart-archive-dialog + bulk-archive routes.
- Documents schema: documents.reservationId → documents.tenancyId
  (TS + DB column + index + FK constraint).
- Activity feed label berth_reservation → berth_tenancy (matched against
  migrated historical audit rows).

KEPT (separate concepts):
- Reservation Agreement document type (the contract sent to clients).
- "Reservation" pipeline stage name.
- {{reservation.*}} merge tokens in template authoring.
- interest.reservationStatus / reservationDocStatus / dateReservationSent
  fields (track agreement signing on the deal).
- reservation-agreement-context.ts service (builds merge context for the
  Reservation Agreement doc; only its DB imports were renamed).

Verified: tsc clean, 1480/1480 vitest passing, migration applied.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-25 15:09:35 +02:00

2717 lines
93 KiB
TypeScript

import { and, desc, eq, gte, inArray, isNull, lt, lte, ne, or, sql, exists } from 'drizzle-orm';
import { db } from '@/lib/db';
import {
documents,
documentSigners,
documentEvents,
documentWatchers,
files,
} from '@/lib/db/schema/documents';
import { interests, interestBerths } from '@/lib/db/schema/interests';
import { clients } from '@/lib/db/schema/clients';
import { companies } from '@/lib/db/schema/companies';
import { yachts } from '@/lib/db/schema/yachts';
import { berths } from '@/lib/db/schema/berths';
import { formatBerthRange } from '@/lib/templates/berth-range';
import { berthTenancies } from '@/lib/db/schema/tenancies';
import { ports } from '@/lib/db/schema/ports';
import { userProfiles, userPortRoles } from '@/lib/db/schema/users';
import { buildListQuery } from '@/lib/db/query-builder';
import { createAuditLog, toAuditJson, type AuditMeta } from '@/lib/audit';
import { diffEntity } from '@/lib/entity-diff';
import { CodedError, NotFoundError, ValidationError, ConflictError } from '@/lib/errors';
import { emitToRoom } from '@/lib/socket/server';
import { buildStoragePath } from '@/lib/minio';
import { getStorageBackend } from '@/lib/storage';
import { env } from '@/lib/env';
import { logger } from '@/lib/logger';
import { evaluateRule } from '@/lib/services/berth-rules-engine';
import { PIPELINE_STAGES } from '@/lib/constants';
import { advanceStageIfBehind, advanceStageIfBehindGated } from '@/lib/services/interests.service';
import {
createDocument as documensoCreate,
sendDocument as documensoSend,
downloadSignedPdf,
voidDocument as documensoVoid,
} from '@/lib/services/documenso-client';
import { getPortEoiSigners } from '@/lib/services/documenso-payload';
import { getPortDocumensoConfig } from '@/lib/services/port-config';
import {
DOC_TYPE_LABEL,
extractSigningToken,
nextPendingSigner,
} from '@/lib/services/documenso-signers';
import {
sendSigningInvitation,
sendSigningCompleted,
type SignerRole,
} from '@/lib/services/document-signing-emails.service';
import {
listTree,
collectDescendantIds,
ensureEntityFolder,
type FolderNode,
type EntityType,
} from '@/lib/services/document-folders.service';
import { assertEntityInPort, collectRelatedEntities } from '@/lib/services/files';
import type {
CreateDocumentInput,
UpdateDocumentInput,
ListDocumentsInput,
} from '@/lib/validators/documents';
// ─── Types ────────────────────────────────────────────────────────────────────
// ─── List ─────────────────────────────────────────────────────────────────────
import { documentWatchers as documentWatchersTable } from '@/lib/db/schema/documents';
/**
 * Document types treated as non-signature documents. The `signatureOnly`
 * filter in `listDocuments` excludes exactly these types.
 */
const NON_SIGNATURE_TYPES = [
'welcome_letter',
'handover_checklist',
'acknowledgment',
'correspondence',
];
/**
 * Map a documents-hub tab to the extra WHERE predicates it implies.
 *
 * @param tab - Selected hub tab; `undefined` and `'all'` add no predicates.
 * @param currentUserEmail - Email of the calling user. Required by the
 *   `awaiting_me` / `awaiting_them` tabs; when absent those tabs match nothing.
 * @returns Predicates to AND into the list query (possibly empty).
 */
function buildHubTabFilters(
  tab: ListDocumentsInput['tab'],
  currentUserEmail: string | undefined,
): ReturnType<typeof and>[] {
  if (!tab || tab === 'all') return [];

  // EXISTS(pending signer on this document matching `emailCondition`).
  const pendingSignerExists = (emailCondition: ReturnType<typeof and>) =>
    exists(
      db
        .select({ x: sql`1` })
        .from(documentSigners)
        .where(
          and(
            eq(documentSigners.documentId, documents.id),
            eq(documentSigners.status, 'pending'),
            emailCondition,
          ),
        ),
    );

  switch (tab) {
    case 'in_progress':
      // Every document type that is currently in flight.
      return [
        inArray(documents.status, ['draft', 'sent', 'partially_signed']),
        sql`${documents.status} != 'expired'`,
      ];

    case 'eoi_queue':
      // Same in-flight statuses, restricted to EOI documents, so the EOI
      // signing pipeline can be triaged separately from the all-types view.
      return [
        eq(documents.documentType, 'eoi'),
        inArray(documents.status, ['draft', 'sent', 'partially_signed']),
      ];

    case 'awaiting_them':
      // "Them" = pending signers other than the caller. Without the caller's
      // email that distinction is impossible, so match nothing instead of
      // silently widening the result set.
      if (!currentUserEmail) return [sql`1 = 0`];
      return [
        inArray(documents.status, ['sent', 'partially_signed']),
        pendingSignerExists(ne(documentSigners.signerEmail, currentUserEmail)),
      ];

    case 'awaiting_me':
      // No caller email means there is no notion of "me" - match nothing.
      if (!currentUserEmail) return [sql`1 = 0`];
      return [pendingSignerExists(eq(documentSigners.signerEmail, currentUserEmail))];

    case 'completed':
      return [inArray(documents.status, ['completed', 'signed'])];

    case 'expired':
      // Only status='expired' for now; the schema has no `expires_at` column
      // yet, so "in flight but past expiry" cannot be expressed here.
      return [eq(documents.status, 'expired')];

    default:
      return [];
  }
}
/** Caller context for `listDocuments` that is not part of the validated query. */
export interface ListDocumentsExtra {
/**
 * Email of the calling user - used by hub tab filtering to tell "awaiting me"
 * apart from "awaiting them". When omitted, both of those tabs return no rows.
 */
currentUserEmail?: string;
}
/**
 * Paginated, port-scoped document listing with optional filters.
 *
 * Filters applied (all ANDed):
 * - `interestId` / `clientId` / `documentType`: exact match.
 * - `status`: exact match; when omitted, soft-deleted rows are hidden.
 * - `folderId`: `null` selects unfiled documents; otherwise the folder
 *   itself, plus its descendants when `includeDescendants` is set. Any
 *   folder filter also hides `completed` workflows.
 * - `sentSince` / `sentUntil`: inclusive bounds on `createdAt`.
 * - `signatureOnly === true`: excludes the `NON_SIGNATURE_TYPES`.
 * - `watcherUserId`: only documents that user watches.
 * - `tab`: hub tab predicates (see `buildHubTabFilters`).
 *
 * @param portId - Port (tenant) whose documents are listed.
 * @param query - Validated list query (paging, sort, search, filters).
 * @param extra - Caller context, e.g. the current user's email.
 * @returns The list-query result with each row extended by `downloadUrl`.
 */
export async function listDocuments(
  portId: string,
  query: ListDocumentsInput,
  extra: ListDocumentsExtra = {},
) {
  const {
    page,
    limit,
    sort,
    order,
    search,
    interestId,
    clientId,
    documentType,
    status,
    tab,
    watcherUserId,
    signatureOnly,
    sentSince,
    sentUntil,
  } = query;
  const filters: ReturnType<typeof and>[] = [];
  if (interestId) filters.push(eq(documents.interestId, interestId));
  if (clientId) filters.push(eq(documents.clientId, clientId));
  if (documentType) filters.push(eq(documents.documentType, documentType));
  if (status) {
    filters.push(eq(documents.status, status));
  } else {
    // Hide soft-deleted rows by default from the standard list endpoint.
    // Callers that explicitly want the deleted bucket pass `status='deleted'`
    // (e.g. the "Deleted" filter chip on the EOI history list).
    filters.push(ne(documents.status, 'deleted'));
  }
  if (query.folderId !== undefined) {
    if (query.folderId === null) {
      filters.push(isNull(documents.folderId));
    } else if (query.includeDescendants) {
      const tree = await listTree(portId);
      const descendantIds = collectDescendantIds(tree, query.folderId);
      filters.push(inArray(documents.folderId, [query.folderId, ...descendantIds]));
    } else {
      filters.push(eq(documents.folderId, query.folderId));
    }
    // When viewing a specific folder, hide completed workflows - they surface
    // via their resulting signed-PDF file row in the Files section, not the
    // Signing section.
    filters.push(ne(documents.status, 'completed'));
  }
  if (sentSince) filters.push(gte(documents.createdAt, new Date(sentSince)));
  if (sentUntil) filters.push(lte(documents.createdAt, new Date(sentUntil)));
  if (signatureOnly === true) {
    // Build the exclusion list from the shared constant so the filter and
    // the constant cannot drift apart. Each type is sent as a bound parameter.
    const excludedTypes = sql.join(
      NON_SIGNATURE_TYPES.map((type) => sql`${type}`),
      sql`, `,
    );
    filters.push(sql`${documents.documentType} NOT IN (${excludedTypes})`);
  }
  if (watcherUserId) {
    filters.push(
      exists(
        db
          .select({ x: sql`1` })
          .from(documentWatchersTable)
          .where(
            and(
              eq(documentWatchersTable.documentId, documents.id),
              eq(documentWatchersTable.userId, watcherUserId),
            ),
          ),
      ),
    );
  }
  filters.push(...buildHubTabFilters(tab, extra.currentUserEmail));
  // Keeps the file-level `lt` import referenced (pre-existing lint silencer).
  void lt;
  const sortColumn =
    sort === 'title'
      ? documents.title
      : sort === 'status'
        ? documents.status
        : sort === 'documentType'
          ? documents.documentType
          : documents.createdAt;
  const result = await buildListQuery<typeof documents.$inferSelect>({
    table: documents,
    portIdColumn: documents.portId,
    portId,
    idColumn: documents.id,
    updatedAtColumn: documents.updatedAt,
    searchColumns: [documents.title],
    searchTerm: search,
    filters: filters.filter(Boolean) as Parameters<typeof buildListQuery>[0]['filters'],
    sort: sort ? { column: sortColumn, direction: order } : undefined,
    page,
    pageSize: limit,
  });
  const hydrated = await hydrateDocumentsWithDownloadUrl(portId, result.data);
  return { ...result, data: hydrated };
}
// ─── Download URL helpers ─────────────────────────────────────────────────────
/**
 * Build the rep-facing download URL for a document.
 *
 * The URL has the shape
 * `/api/v1/documents/{id}/download/{folder}/{...}/{filename}`, where every
 * folder name and the filename are URI-component encoded. The folder path
 * and filename are embedded for readability in browser tabs and shared
 * links; per the original design note, the route handler looks the file up
 * by document id and validates the slug, so a hand-edited path 404s rather
 * than serving the wrong file.
 *
 * Resolve the folder tree once per request and reuse it for every document
 * so the tree is not refetched per row.
 *
 * @param doc - Document id, its folder (if any) and the attached filename.
 * @param folderTree - Folder tree used to expand `doc.folderId` into a path.
 * @returns The download URL, or `null` when the document has no filename
 *   (e.g. signature-only docs before completion) so UI consumers can hide
 *   the download affordance.
 */
export function buildDocumentDownloadUrl(
  doc: { id: string; folderId: string | null; filename: string | null },
  folderTree: readonly FolderNode[],
): string | null {
  const { id, folderId, filename } = doc;
  if (!filename) return null;

  // An unknown folder id yields an empty ancestor chain, i.e. a root-level URL.
  const folderNames = folderId
    ? findFolderPath(folderTree, folderId).map((folder) => folder.name)
    : [];
  const slug = [...folderNames, filename].map((part) => encodeURIComponent(part)).join('/');
  return `/api/v1/documents/${id}/download/${slug}`;
}
/**
 * Locate folder `id` in the tree and return its ancestor chain.
 *
 * The search is depth-first in tree order and stops at the first match.
 *
 * @param tree - Root nodes of the folder tree.
 * @param id - Id of the folder to locate.
 * @returns Nodes from the root down to (and including) the matching folder;
 *   an empty array when the id is not in the tree (e.g. an orphan
 *   `folder_id` pointer).
 */
export function findFolderPath(tree: readonly FolderNode[], id: string): FolderNode[] {
  const search = (nodes: readonly FolderNode[], trail: FolderNode[]): FolderNode[] | null => {
    for (const candidate of nodes) {
      const chain = [...trail, candidate];
      if (candidate.id === id) return chain;
      const deeper = search(candidate.children, chain);
      if (deeper) return deeper;
    }
    return null;
  };
  return search(tree, []) ?? [];
}
type DocumentRow = typeof documents.$inferSelect;
type DocumentRowWithDownload = DocumentRow & { downloadUrl: string | null };
/**
 * Attach a `downloadUrl` to each document row.
 *
 * Filenames for all referenced files are fetched in one port-scoped query
 * and the folder tree is loaded once, then both are reused for every row.
 * Rows without a file, or whose file is not found in this port, get
 * `downloadUrl: null`.
 *
 * @param portId - Port used to scope the file lookup and the folder tree.
 * @param rows - Document rows to extend.
 * @returns The rows, in input order, each with `downloadUrl` added.
 */
async function hydrateDocumentsWithDownloadUrl(
  portId: string,
  rows: DocumentRow[],
): Promise<DocumentRowWithDownload[]> {
  if (rows.length === 0) return [];

  const distinctFileIds = new Set<string>();
  for (const row of rows) {
    if (row.fileId !== null) distinctFileIds.add(row.fileId);
  }

  const filenameById = new Map<string, string | null>();
  if (distinctFileIds.size > 0) {
    const fileRows = await db
      .select({ id: files.id, filename: files.filename })
      .from(files)
      .where(and(inArray(files.id, Array.from(distinctFileIds)), eq(files.portId, portId)));
    for (const { id, filename } of fileRows) filenameById.set(id, filename);
  }

  const tree = await listTree(portId);
  return rows.map((row) => ({
    ...row,
    downloadUrl: buildDocumentDownloadUrl(
      {
        id: row.id,
        folderId: row.folderId,
        filename: row.fileId ? (filenameById.get(row.fileId) ?? null) : null,
      },
      tree,
    ),
  }));
}
// ─── Deal docs for a berth ────────────────────────────────────────────────────
/** Summary of a document reached through an interest linked to a berth. */
export interface BerthDealDoc {
/** Document id. */
id: string;
/** Document title. */
title: string;
/** Document type as stored on the document row (e.g. `eoi`). */
documentType: string;
/** Current document status as stored on the document row. */
status: string;
/** When the document row was created. */
createdAt: Date;
/** Interest the document is attached to; always set for deal documents. */
interestId: string;
}
/**
 * List documents attached to any interest linked to this berth through
 * `interest_berths`, newest first. Per the original note this backs the
 * Deal Documents tab on the berth detail page, so reps can see EOIs,
 * contracts and similar documents without leaving the page.
 *
 * Read-only. The query is port-scoped on both the document and the
 * interest; no permission check happens here, so access control is expected
 * to be enforced by the caller (the berth page's permission gate).
 *
 * @param portId - Port that both the documents and the interests must belong to.
 * @param berthId - Berth whose linked interests are inspected.
 * @returns Matching documents ordered by `createdAt` descending.
 */
export async function listDealDocumentsForBerth(
  portId: string,
  berthId: string,
): Promise<BerthDealDoc[]> {
  const joined = await db
    .select({
      id: documents.id,
      title: documents.title,
      documentType: documents.documentType,
      status: documents.status,
      createdAt: documents.createdAt,
      interestId: documents.interestId,
    })
    .from(documents)
    .innerJoin(interestBerths, eq(interestBerths.interestId, documents.interestId))
    .innerJoin(interests, eq(interests.id, documents.interestId))
    .where(
      and(
        eq(interestBerths.berthId, berthId),
        eq(documents.portId, portId),
        eq(interests.portId, portId),
      ),
    )
    .orderBy(sql`${documents.createdAt} DESC`);

  const dealDocs: BerthDealDoc[] = [];
  for (const row of joined) {
    const { interestId } = row;
    // The column is nullable in the row type; skip rows without an interest
    // so the result satisfies the non-null `interestId` contract.
    if (!interestId) continue;
    dealDocs.push({
      id: row.id,
      title: row.title,
      documentType: row.documentType,
      status: row.status,
      createdAt: row.createdAt,
      interestId,
    });
  }
  return dealDocs;
}
// ─── Get by ID ────────────────────────────────────────────────────────────────
/**
 * Load one document (with its signers) from the given port and attach its
 * download URL.
 *
 * @param id - Document id.
 * @param portId - Port the document must belong to.
 * @returns The document row, its `signers`, and `downloadUrl` (`null` when
 *   no file is attached or the file is not found in this port).
 * @throws NotFoundError when no document with this id exists in the port.
 */
export async function getDocumentById(id: string, portId: string) {
  const doc = await db.query.documents.findFirst({
    where: and(eq(documents.id, id), eq(documents.portId, portId)),
    with: { signers: true },
  });
  if (!doc) throw new NotFoundError('Document');

  // The file lookup is port-scoped too, so a foreign-port file id resolves
  // to "no filename".
  const fileRows = doc.fileId
    ? await db
        .select({ filename: files.filename })
        .from(files)
        .where(and(eq(files.id, doc.fileId), eq(files.portId, portId)))
        .limit(1)
    : [];
  const filename: string | null = fileRows[0]?.filename ?? null;

  const tree = await listTree(portId);
  return {
    ...doc,
    downloadUrl: buildDocumentDownloadUrl({ id: doc.id, folderId: doc.folderId, filename }, tree),
  };
}
/**
 * Verify that every supplied subject FK (clientId / interestId / companyId /
 * yachtId / tenancyId) points at a row in the caller's port.
 *
 * Without this guard a port-A user could create a document whose subject is
 * a port-B client and then exfiltrate that client's name + email through
 * sendForSigning's Documenso payload, or through the watcher / notification
 * surfaces that hydrate the linked entity.
 *
 * Absent (null / undefined / empty) ids are skipped. The lookups are started
 * together and awaited as a group.
 *
 * @param portId - Port every referenced row must belong to.
 * @param fks - Subject foreign keys to verify.
 * @throws ValidationError when a supplied id does not resolve within the port.
 */
async function assertSubjectFksInPort(
  portId: string,
  fks: {
    clientId?: string | null;
    interestId?: string | null;
    companyId?: string | null;
    yachtId?: string | null;
    tenancyId?: string | null;
  },
): Promise<void> {
  // Rejects with a field-specific ValidationError when the lookup finds no row.
  const mustExist = async (lookup: PromiseLike<unknown>, field: string): Promise<void> => {
    if (!(await lookup)) throw new ValidationError(`${field} not found in this port`);
  };

  const { clientId, interestId, companyId, yachtId, tenancyId } = fks;
  const pending: Array<Promise<void>> = [];

  if (clientId) {
    pending.push(
      mustExist(
        db.query.clients.findFirst({
          where: and(eq(clients.id, clientId), eq(clients.portId, portId)),
        }),
        'clientId',
      ),
    );
  }
  if (interestId) {
    pending.push(
      mustExist(
        db.query.interests.findFirst({
          where: and(eq(interests.id, interestId), eq(interests.portId, portId)),
        }),
        'interestId',
      ),
    );
  }
  if (companyId) {
    pending.push(
      mustExist(
        db.query.companies.findFirst({
          where: and(eq(companies.id, companyId), eq(companies.portId, portId)),
        }),
        'companyId',
      ),
    );
  }
  if (yachtId) {
    pending.push(
      mustExist(
        db.query.yachts.findFirst({
          where: and(eq(yachts.id, yachtId), eq(yachts.portId, portId)),
        }),
        'yachtId',
      ),
    );
  }
  if (tenancyId) {
    pending.push(
      mustExist(
        db.query.berthTenancies.findFirst({
          where: and(eq(berthTenancies.id, tenancyId), eq(berthTenancies.portId, portId)),
        }),
        'tenancyId',
      ),
    );
  }

  await Promise.all(pending);
}
/**
 * Verify that every watcher has access to the document's port.
 *
 * Without this guard a watcher row could be created for a user outside the
 * document's tenant; a later notifyDocumentEvent would then send that user
 * a socket toast + email revealing the document's title.
 *
 * Users flagged `isSuperAdmin` on their profile always pass. Everyone else
 * needs a `user_port_roles` row for this port.
 *
 * @param portId - Port the watchers must have a role in.
 * @param userIds - Candidate watcher user ids (duplicates allowed).
 * @throws ValidationError when at least one user has no access to the port.
 */
async function assertWatchersInPort(portId: string, userIds: string[]): Promise<void> {
  if (userIds.length === 0) return;
  const candidates = Array.from(new Set(userIds));

  // Super-admins bypass the port check.
  const profiles = await db.query.userProfiles.findMany({
    where: inArray(userProfiles.userId, candidates),
  });
  const superAdminIds = new Set<string>();
  for (const profile of profiles) {
    if (profile.isSuperAdmin) superAdminIds.add(profile.userId);
  }

  const toVerify = candidates.filter((userId) => !superAdminIds.has(userId));
  if (toVerify.length === 0) return;

  const memberships = await db
    .select({ userId: userPortRoles.userId })
    .from(userPortRoles)
    .where(and(inArray(userPortRoles.userId, toVerify), eq(userPortRoles.portId, portId)));
  const members = new Set(memberships.map((membership) => membership.userId));

  if (toVerify.some((userId) => !members.has(userId))) {
    throw new ValidationError('One or more watchers do not have access to this port');
  }
}
// ─── Create ───────────────────────────────────────────────────────────────────
/**
 * Create a draft document in the given port.
 *
 * The client and interest references are verified to belong to the port
 * before the insert. After the insert an audit-log entry is written
 * (fire-and-forget) and `document:created` is emitted to the port room.
 *
 * @param portId - Port that owns the new document.
 * @param data - Validated creation payload.
 * @param meta - Acting user plus request metadata for the audit log.
 * @returns The inserted document row.
 * @throws ValidationError when `clientId` / `interestId` is not in this port.
 */
export async function createDocument(portId: string, data: CreateDocumentInput, meta: AuditMeta) {
  const { clientId, interestId, folderId, documentType, title, notes } = data;
  await assertSubjectFksInPort(portId, { clientId, interestId });

  const inserted = await db
    .insert(documents)
    .values({
      portId,
      interestId: interestId ?? null,
      clientId: clientId ?? null,
      folderId: folderId ?? null,
      documentType,
      title,
      notes: notes ?? null,
      status: 'draft',
      createdBy: meta.userId,
    })
    .returning();
  const doc = inserted[0]!;

  // Audit logging is intentionally not awaited.
  void createAuditLog({
    userId: meta.userId,
    portId,
    action: 'create',
    entityType: 'document',
    entityId: doc.id,
    newValue: { documentType: doc.documentType, title: doc.title },
    ipAddress: meta.ipAddress,
    userAgent: meta.userAgent,
  });
  emitToRoom(`port:${portId}`, 'document:created', { documentId: doc.id });
  return doc;
}
// ─── Update ───────────────────────────────────────────────────────────────────
/**
 * Update a document's title, notes and/or status.
 *
 * Only fields present in `data` are written; `updatedAt` is always bumped.
 * An audit-log entry holding the before/after snapshots is written
 * (fire-and-forget) and `document:updated` is emitted to the port room.
 *
 * @param id - Document id.
 * @param portId - Port the document must belong to.
 * @param data - Validated partial update.
 * @param meta - Acting user plus request metadata for the audit log.
 * @returns The updated document row.
 * @throws NotFoundError when the document does not exist in this port,
 *   including when it disappears between the initial read and the update.
 */
export async function updateDocument(
  id: string,
  portId: string,
  data: UpdateDocumentInput,
  meta: AuditMeta,
) {
  const existing = await getDocumentById(id, portId);
  const updates: Partial<typeof documents.$inferInsert> = { updatedAt: new Date() };
  if (data.title !== undefined) updates.title = data.title;
  if (data.notes !== undefined) updates.notes = data.notes;
  if (data.status !== undefined) updates.status = data.status;
  const [updated] = await db
    .update(documents)
    .set(updates)
    .where(and(eq(documents.id, id), eq(documents.portId, portId)))
    .returning();
  // The row can vanish between the read above and this write; fail with a
  // proper NotFoundError instead of passing `undefined` downstream.
  if (!updated) throw new NotFoundError('Document');
  // NOTE(review): the result of diffEntity is discarded - confirm whether the
  // diff was meant to feed the audit entry below, or whether this call can go.
  diffEntity(existing, updated);
  void createAuditLog({
    userId: meta.userId,
    portId,
    action: 'update',
    entityType: 'document',
    entityId: id,
    oldValue: toAuditJson(existing),
    newValue: toAuditJson(updated),
    ipAddress: meta.ipAddress,
    userAgent: meta.userAgent,
  });
  emitToRoom(`port:${portId}`, 'document:updated', { documentId: id });
  return updated;
}
// ─── Delete ───────────────────────────────────────────────────────────────────
/**
 * Soft-delete a document by setting `status='deleted'`.
 *
 * The row is kept so audit-log and event history can still reference it;
 * read paths hide it through their `status != 'deleted'` filter.
 *
 * When the document is linked to a Documenso envelope the envelope is
 * voided upstream first, best-effort: a failure there is logged and the
 * CRM-side delete proceeds anyway. Per the original design note, a
 * Documenso DELETE voids rather than hard-erases, so the envelope stops
 * accepting signatures and outstanding signing URLs are invalidated.
 *
 * Deleting an already-deleted document is a silent no-op.
 *
 * @param id - Document id.
 * @param portId - Port the document must belong to.
 * @param meta - Acting user plus request metadata for the audit log.
 * @throws NotFoundError when the document does not exist in this port.
 * @throws ConflictError while signing is in progress (`sent` /
 *   `partially_signed`); the rep must cancel first, then delete.
 */
export async function deleteDocument(id: string, portId: string, meta: AuditMeta) {
  const existing = await getDocumentById(id, portId);
  const { status: priorStatus, documensoId, title } = existing;

  if (priorStatus === 'sent' || priorStatus === 'partially_signed') {
    throw new ConflictError(
      'Cannot delete a document while signing is in progress - cancel it first, then delete the cancelled record.',
    );
  }
  // Idempotent: a second DELETE is a no-op rather than a 409.
  if (priorStatus === 'deleted') return;

  // Best-effort upstream void. A transient Documenso failure must not block
  // the CRM-side delete; the document_events row + audit log record what
  // happened.
  if (documensoId) {
    try {
      await documensoVoid(documensoId, portId);
    } catch (err) {
      logger.warn(
        { err, documentId: id, documensoId },
        'Documenso void failed during delete; soft-deleting CRM-side anyway',
      );
    }
  }

  await db
    .update(documents)
    .set({ status: 'deleted', updatedAt: new Date() })
    .where(and(eq(documents.id, id), eq(documents.portId, portId)));

  await db.insert(documentEvents).values({
    documentId: id,
    eventType: 'deleted',
    eventData: { initiatedBy: meta.userId },
  });

  // Audit logging is intentionally not awaited.
  void createAuditLog({
    userId: meta.userId,
    portId,
    action: 'delete',
    entityType: 'document',
    entityId: id,
    oldValue: { title, status: priorStatus },
    newValue: { status: 'deleted' },
    ipAddress: meta.ipAddress,
    userAgent: meta.userAgent,
  });
  emitToRoom(`port:${portId}`, 'document:deleted', { documentId: id });
}
// ─── Send for Signing (BR-021) ────────────────────────────────────────────────
/**
 * Send a draft document out for signature through Documenso (BR-021).
 *
 * Flow:
 * 1. Validate the document (has a file, is `draft`) and resolve the client,
 *    its email contact, the port and the port-configured signers.
 * 2. Load the PDF from storage. This happens before any row is written so
 *    a missing file cannot leave partial state behind.
 * 3. Insert three pending signer rows: client (1), developer (2),
 *    approver (3).
 * 4. Create + send the Documenso document. If either call fails, the signer
 *    rows from step 3 are removed again and the error is rethrown, so a
 *    retry does not accumulate duplicate signers.
 * 5. Copy signing URLs / tokens from the Documenso recipients onto the local
 *    signer rows, mark the document `sent`, stamp the linked interest, then
 *    record a document event, an audit-log entry and a socket event.
 *
 * @param documentId - Document to send.
 * @param portId - Port the document, client, interest and file must belong to.
 * @param meta - Acting user plus request metadata for audit / rules.
 * @returns The refreshed document, as returned by `getDocumentById`.
 * @throws NotFoundError when the document, port or file is missing in this port.
 * @throws ValidationError when the document has no file, no client, or the
 *   client has no email contact.
 * @throws ConflictError when the document is not in `draft` status.
 */
export async function sendForSigning(documentId: string, portId: string, meta: AuditMeta) {
  const doc = await getDocumentById(documentId, portId);
  if (!doc.fileId) throw new ValidationError('Document has no associated file');
  if (doc.status !== 'draft') throw new ConflictError('Document is not in draft status');
  // Fetch interest + client to build signers. Filter by portId in addition
  // to the FK so that even if a stale or maliciously-set subject FK on the
  // document points at a foreign-port row, this signing flow refuses to
  // hydrate (and therefore refuses to ship to Documenso) data from outside
  // the caller's tenant.
  const interest = doc.interestId
    ? await db.query.interests.findFirst({
        where: and(eq(interests.id, doc.interestId), eq(interests.portId, portId)),
      })
    : null;
  const client = doc.clientId
    ? await db.query.clients.findFirst({
        where: and(eq(clients.id, doc.clientId), eq(clients.portId, portId)),
        with: { contacts: true },
      })
    : null;
  if (!client) throw new ValidationError('Document has no associated client');
  const emailContact = (
    client.contacts as Array<{ channel: string; value: string }> | undefined
  )?.find((c) => c.channel === 'email');
  if (!emailContact?.value) throw new ValidationError('Client has no email contact');
  const port = await db.query.ports.findFirst({ where: eq(ports.id, portId) });
  if (!port) throw new NotFoundError('Port');
  // Resolve port-configured signer emails from system settings; fall back to
  // legacy defaults only if the setting is absent. Fabricated slug-based
  // addresses (developer@{slug}.com) are no longer used here because they
  // never match real port users and cause silent no-ops in handleRecipientSigned.
  const eoiSigners = await getPortEoiSigners(portId);

  // Load the PDF before writing anything. The lookup is port-scoped like the
  // interest/client lookups above, so a file id pointing at another tenant's
  // file is never read and shipped to Documenso.
  const fileRecord = await db.query.files.findFirst({
    where: and(eq(files.id, doc.fileId), eq(files.portId, portId)),
  });
  if (!fileRecord) throw new NotFoundError('File');
  const fileStream = await (await getStorageBackend()).get(fileRecord.storagePath);
  const chunks: Buffer[] = [];
  for await (const chunk of fileStream) {
    chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk));
  }
  const pdfBase64 = Buffer.concat(chunks).toString('base64');
  // Read per-port v2 signing settings (PARALLEL/SEQUENTIAL + redirect URL).
  // Both are optional - passing undefined yields v1's legacy behavior.
  const docCfg = await getPortDocumensoConfig(portId);

  // BR-021: Create 3 signers - client (1), developer (2), sales/approver (3)
  const signerRecords = await db
    .insert(documentSigners)
    .values([
      {
        documentId,
        signerName: client.fullName,
        signerEmail: emailContact.value,
        signerRole: 'client',
        signingOrder: 1,
        status: 'pending',
      },
      {
        documentId,
        signerName: eoiSigners.developer.name,
        signerEmail: eoiSigners.developer.email,
        signerRole: 'developer',
        signingOrder: 2,
        status: 'pending',
      },
      {
        documentId,
        signerName: eoiSigners.approver.name,
        signerEmail: eoiSigners.approver.email,
        signerRole: 'approver',
        signingOrder: 3,
        status: 'pending',
      },
    ])
    .returning();

  // Create document in Documenso + send. portId is required for the v2
  // envelope/create code path (which routes by per-port apiVersion);
  // meta.signingOrder is honoured only on v2 instances.
  let documensoDoc: Awaited<ReturnType<typeof documensoCreate>>;
  try {
    documensoDoc = await documensoCreate(
      doc.title,
      pdfBase64,
      [
        { name: client.fullName, email: emailContact.value, role: 'SIGNER', signingOrder: 1 },
        {
          name: eoiSigners.developer.name,
          email: eoiSigners.developer.email,
          role: 'SIGNER',
          signingOrder: 2,
        },
        {
          name: eoiSigners.approver.name,
          email: eoiSigners.approver.email,
          role: 'SIGNER',
          signingOrder: 3,
        },
      ],
      portId,
      {
        ...(docCfg.signingOrder ? { signingOrder: docCfg.signingOrder } : {}),
        ...(docCfg.redirectUrl ? { redirectUrl: docCfg.redirectUrl } : {}),
      },
    );
    await documensoSend(documensoDoc.id, portId);
  } catch (err) {
    // The document stays in `draft`, so the pending signer rows written above
    // would be orphans (and a retry would add three more). Remove them
    // best-effort, then surface the original failure.
    try {
      await db.delete(documentSigners).where(
        inArray(
          documentSigners.id,
          signerRecords.map((s) => s.id),
        ),
      );
    } catch (cleanupErr) {
      logger.warn(
        { err: cleanupErr, documentId },
        'Failed to remove pending signer rows after Documenso send failure',
      );
    }
    throw err;
  }

  // Update signer records with signing URLs + tokens from the Documenso
  // response. The signingToken column powers the webhook recipient-match
  // path (more robust than email match - same email can serve multiple
  // roles on a contract). Documenso's recipient response carries `token`
  // explicitly per the OpenAPI spec; we keep the URL-extraction fallback
  // for any v2 deployment whose distribute response omits the field.
  //
  // Each local signer is claimed at most once, so when one email serves two
  // roles the second recipient maps to the second local row instead of
  // overwriting the first. This assumes recipients come back in the order
  // they were submitted - TODO confirm against the Documenso response.
  // Emails are compared case-insensitively so a provider that normalises
  // casing still matches.
  const normalizeEmail = (email: string | null | undefined) => (email ?? '').trim().toLowerCase();
  const claimedSignerIds = new Set<(typeof signerRecords)[number]['id']>();
  for (const docSigner of documensoDoc.recipients) {
    const recipientEmail = normalizeEmail(docSigner.email);
    const localSigner = signerRecords.find(
      (s) => !claimedSignerIds.has(s.id) && normalizeEmail(s.signerEmail) === recipientEmail,
    );
    if (!localSigner) continue;
    claimedSignerIds.add(localSigner.id);
    await db
      .update(documentSigners)
      .set({
        signingUrl: docSigner.signingUrl ?? null,
        embeddedUrl: docSigner.embeddedUrl ?? null,
        signingToken: docSigner.token ?? extractSigningToken(docSigner.signingUrl),
      })
      .where(eq(documentSigners.id, localSigner.id));
  }
  // Update document status (port-scoped, like the other document writes).
  await db
    .update(documents)
    .set({ status: 'sent', documensoId: documensoDoc.id, updatedAt: new Date() })
    .where(and(eq(documents.id, documentId), eq(documents.portId, portId)));
  // Update interest if linked
  if (interest) {
    // NOTE(review): the EOI fields and the 'eoi' stage advance below run for
    // every document type sent through here, including reservation
    // agreements - confirm that is intended.
    await db
      .update(interests)
      .set({
        documensoId: documensoDoc.id,
        dateEoiSent: new Date(),
        eoiStatus: 'waiting_for_signatures',
        updatedAt: new Date(),
      })
      .where(eq(interests.id, interest.id));
    // Trigger berth rules
    void evaluateRule('eoi_sent', interest.id, portId, meta);
    // Advance pipeline stage to eoi (no-op if already further along).
    // Doc sub-status is set by the webhook receiver when Documenso confirms;
    // we stamp eoiDocStatus optimistically here so the UI shows "sent".
    void advanceStageIfBehindGated(
      interest.id,
      portId,
      'eoi',
      meta,
      'EOI sent for signing',
      'eoi_sent',
    );
    await db
      .update(interests)
      .set({ eoiDocStatus: 'sent', updatedAt: new Date() })
      .where(eq(interests.id, interest.id));
    // Reservation agreements drive the reservation stage; the contract
    // pathway uses its own send call and stamps contractDocStatus.
    if (doc.documentType === 'reservation_agreement') {
      void advanceStageIfBehind(
        interest.id,
        portId,
        'reservation',
        meta,
        'Reservation agreement sent',
      );
      await db
        .update(interests)
        .set({ reservationDocStatus: 'sent', updatedAt: new Date() })
        .where(eq(interests.id, interest.id));
    }
  }
  // Create document event
  await db.insert(documentEvents).values({
    documentId,
    eventType: 'sent',
    eventData: { documensoId: documensoDoc.id },
  });
  void createAuditLog({
    userId: meta.userId,
    portId,
    action: 'update',
    entityType: 'document',
    entityId: documentId,
    newValue: { status: 'sent', documensoId: documensoDoc.id },
    ipAddress: meta.ipAddress,
    userAgent: meta.userAgent,
  });
  emitToRoom(`port:${portId}`, 'document:sent', {
    documentId,
    type: doc.documentType,
    signerCount: signerRecords.length,
    documensoId: documensoDoc.id,
  });
  return await getDocumentById(documentId, portId);
}
// ─── Upload Signed Manually (BR-013) ─────────────────────────────────────────
/**
 * Attach a manually uploaded signed file to a document and mark the
 * document `completed` (BR-013).
 *
 * The file is written to storage, recorded in `files`, and linked as the
 * document's `signedFileId` with `isManualUpload: true`. For EOI documents
 * linked to an interest in this port, the interest's EOI fields are stamped
 * as signed and the `eoi_signed` rule / stage advance are triggered.
 * A document event, an audit-log entry, a socket event and a notification
 * to the acting user follow.
 *
 * @param documentId - Document being completed.
 * @param portId - Port the document (and any linked interest) must belong to.
 * @param fileData - Uploaded file contents and metadata.
 * @param meta - Acting user plus request metadata for audit / rules.
 * @returns The refreshed document, as returned by `getDocumentById`.
 * @throws NotFoundError when the document or the port does not exist.
 * @throws Error when the file row insert returns no row.
 */
export async function uploadSignedManually(
  documentId: string,
  portId: string,
  fileData: { buffer: Buffer; originalName: string; mimeType: string; size: number },
  meta: AuditMeta,
) {
  const doc = await getDocumentById(documentId, portId);
  const port = await db.query.ports.findFirst({ where: eq(ports.id, portId) });
  if (!port) throw new NotFoundError('Port');
  // Store the signed file
  const fileId = crypto.randomUUID();
  const storagePath = buildStoragePath(port.slug, 'eoi-signed', documentId, fileId, 'pdf');
  await (
    await getStorageBackend()
  ).put(storagePath, fileData.buffer, {
    contentType: fileData.mimeType,
    sizeBytes: fileData.size,
  });
  const [fileRecord] = await db
    .insert(files)
    .values({
      portId,
      clientId: doc.clientId ?? null,
      filename: fileData.originalName,
      originalName: fileData.originalName,
      mimeType: fileData.mimeType,
      sizeBytes: String(fileData.size),
      storagePath,
      storageBucket: env.MINIO_BUCKET,
      category: 'eoi',
      uploadedBy: meta.userId,
    })
    .returning();
  // Fail with a clear message instead of dereferencing `undefined` below.
  if (!fileRecord) throw new Error('Failed to record the uploaded signed file');
  // Update document (port-scoped, like the other document writes).
  await db
    .update(documents)
    .set({
      signedFileId: fileRecord.id,
      status: 'completed',
      isManualUpload: true,
      updatedAt: new Date(),
    })
    .where(and(eq(documents.id, documentId), eq(documents.portId, portId)));
  // Update interest if linked and type is eoi. The lookup is port-scoped and
  // the write only happens when it succeeds, so a stale or foreign-port
  // `interestId` on the document can never mutate another tenant's interest.
  if (doc.interestId && doc.documentType === 'eoi') {
    const interest = await db.query.interests.findFirst({
      where: and(eq(interests.id, doc.interestId), eq(interests.portId, portId)),
    });
    if (interest) {
      await db
        .update(interests)
        .set({
          eoiStatus: 'signed',
          eoiDocStatus: 'signed',
          dateEoiSigned: new Date(),
          updatedAt: new Date(),
        })
        .where(and(eq(interests.id, interest.id), eq(interests.portId, portId)));
      void evaluateRule('eoi_signed', interest.id, portId, meta);
      // Stage stays at 'eoi' - sub-status badge flips to "signed".
      void advanceStageIfBehind(
        interest.id,
        portId,
        'eoi',
        meta,
        'Signed EOI uploaded manually',
      );
    }
  }
  await db.insert(documentEvents).values({
    documentId,
    eventType: 'completed',
    eventData: { isManualUpload: true, fileId: fileRecord.id },
  });
  void createAuditLog({
    userId: meta.userId,
    portId,
    action: 'update',
    entityType: 'document',
    entityId: documentId,
    newValue: { status: 'completed', isManualUpload: true },
    ipAddress: meta.ipAddress,
    userAgent: meta.userAgent,
  });
  emitToRoom(`port:${portId}`, 'document:completed', { documentId });
  // Notify the acting user about the manual completion. The notifications
  // service is imported lazily; the call is fire-and-forget.
  void import('@/lib/services/notifications.service').then(({ createNotification }) =>
    createNotification({
      portId,
      userId: meta.userId,
      type: 'document_signed',
      title: 'Document marked as signed',
      description: `"${doc.title}" has been manually uploaded as signed`,
      link: `/documents/${documentId}`,
      entityType: 'document',
      entityId: documentId,
      dedupeKey: `document:${documentId}:completed`,
    }),
  );
  return await getDocumentById(documentId, portId);
}
// ─── List Signers ─────────────────────────────────────────────────────────────
/**
 * Lists the signer rows attached to a document, in ascending signing order.
 *
 * The document is first fetched through `getDocumentById` using the caller's
 * port. The result of that call is discarded - it is made purely as an
 * access check (presumably it throws when the document is not visible to
 * the port; confirm against `getDocumentById`).
 *
 * @param documentId - Id of the document whose signers are requested.
 * @param portId - Port the caller is operating in.
 * @returns The matching `documentSigners` rows ordered by `signingOrder`.
 */
export async function listDocumentSigners(documentId: string, portId: string) {
  // Access check only - the fetched document itself is not needed.
  await getDocumentById(documentId, portId);

  const belongsToDocument = eq(documentSigners.documentId, documentId);
  const signerRows = db.query.documentSigners.findMany({
    where: belongsToDocument,
    orderBy: (signer, ops) => [ops.asc(signer.signingOrder)],
  });
  return signerRows;
}
// ─── List Events ──────────────────────────────────────────────────────────────
/**
 * Lists the event rows recorded for a document, newest first.
 *
 * The document is first fetched through `getDocumentById` using the caller's
 * port. The result of that call is discarded - it is made purely as an
 * access check (presumably it throws when the document is not visible to
 * the port; confirm against `getDocumentById`).
 *
 * @param documentId - Id of the document whose events are requested.
 * @param portId - Port the caller is operating in.
 * @returns The matching `documentEvents` rows ordered by `createdAt` descending.
 */
export async function listDocumentEvents(documentId: string, portId: string) {
  // Access check only - the fetched document itself is not needed.
  await getDocumentById(documentId, portId);

  const belongsToDocument = eq(documentEvents.documentId, documentId);
  const eventRows = db.query.documentEvents.findMany({
    where: belongsToDocument,
    orderBy: (event, ops) => [ops.desc(event.createdAt)],
  });
  return eventRows;
}
// ─── Webhook Handlers ─────────────────────────────────────────────────────────
/**
 * Looks up the local document row an inbound Documenso webhook refers to.
 *
 * The id from the payload is compared against both `documents.documensoId`
 * and `documents.documensoNumericId`: Documenso v1 sends the same numeric id
 * that was stored in `documenso_id`, whereas v2 sends its internal numeric
 * pk while `documenso_id` holds the public `envelope_xxx` string and the
 * numeric pk lives in `documenso_numeric_id`.
 *
 * Two lookup modes:
 *  - `portId` supplied (the route resolved it from the matched per-port
 *    webhook secret): the search is restricted to that port, so ports that
 *    share a Documenso instance cannot resolve each other's rows.
 *  - `portId` absent: the search spans all ports and the result is only
 *    accepted when exactly one row matches.
 *
 * @param documensoId - Document id as delivered in the webhook payload.
 * @param portId - Port resolved by the webhook route, if any.
 * @returns The matched row, or null when nothing matched or when the
 *   port-agnostic lookup matched more than one row. Callers must treat null
 *   as "drop the event" and must not mutate anything.
 */
async function resolveWebhookDocument(
  documensoId: string,
  portId: string | undefined,
): Promise<typeof documents.$inferSelect | null> {
  // Accept a hit on either id column so v1 and v2 payloads both resolve.
  const matchesEitherIdColumn = or(
    eq(documents.documensoId, documensoId),
    eq(documents.documensoNumericId, documensoId),
  );

  if (!portId) {
    // No tenant context: only a single unambiguous hit is safe to act on.
    const candidates = await db.query.documents.findMany({
      where: matchesEitherIdColumn,
    });
    if (candidates.length > 1) {
      logger.error(
        {
          documensoId,
          matchCount: candidates.length,
          ports: candidates.map((candidate) => candidate.portId),
        },
        'Documenso webhook ambiguous across multiple ports - refusing to mutate',
      );
      return null;
    }
    const [soleMatch] = candidates;
    if (!soleMatch) {
      logger.warn({ documensoId }, 'Document not found for webhook');
      return null;
    }
    return soleMatch;
  }

  // Tenant context known: restrict the search to that port.
  const scopedMatch = await db.query.documents.findFirst({
    where: and(matchesEitherIdColumn, eq(documents.portId, portId)),
  });
  if (scopedMatch) return scopedMatch;

  logger.warn({ documensoId, portId }, 'Document not found for webhook (port-scoped)');
  return null;
}
/**
 * Webhook handler for a single recipient completing their signature.
 *
 * Steps, in order:
 *  1. Resolve the local document (drops the event when unresolvable).
 *  2. Read the matching signer's current status to detect re-deliveries.
 *  3. Mark the signer `signed`, stamping `signedAt` only on the first
 *     transition.
 *  4. Move an EOI document from `sent` to `partially_signed`.
 *  5. Record a `signed` document event (duplicate inserts are ignored).
 *  6. Emit `document:signer:signed` to the port room.
 *  7. On the first transition only, invite the next pending signer; a
 *     failure there is logged and does not fail the handler.
 *
 * @param eventData - Normalised webhook payload; `documentId` is the
 *   Documenso id, not the local document id.
 */
export async function handleRecipientSigned(eventData: {
  documentId: string;
  recipientEmail: string;
  /** Documenso recipient token, when the payload carries one. Used in
   * preference to the email because one address can fill several signer
   * roles on the same document; a null/absent token means email match. */
  recipientToken?: string | null;
  signatureHash?: string;
  portId?: string;
}) {
  const doc = await resolveWebhookDocument(eventData.documentId, eventData.portId);
  if (!doc) return;

  const { recipientEmail, recipientToken } = eventData;

  // Identify the signer row: token when available, email otherwise.
  const signerIdentity = recipientToken
    ? eq(documentSigners.signingToken, recipientToken)
    : eq(documentSigners.signerEmail, recipientEmail);
  const signerWhere = and(eq(documentSigners.documentId, doc.id), signerIdentity);

  // Snapshot the status before writing. Documenso v2 re-delivers the same
  // recipient-completed event with differing raw bodies, so the handler
  // has to tell a first delivery from a repeat by itself.
  const [existingSigner] = await db.select().from(documentSigners).where(signerWhere);
  const isRedelivery = existingSigner?.status === 'signed';

  // A repeat delivery keeps the original signedAt so the UI continues to
  // show the real signing time rather than the latest webhook time.
  const [updatedSigner] = await db
    .update(documentSigners)
    .set(isRedelivery ? { status: 'signed' } : { status: 'signed', signedAt: new Date() })
    .where(signerWhere)
    .returning();

  if (!updatedSigner) {
    // Neither token nor email matched a local signer row. The handler
    // still records the event below; the warning is for operators.
    logger.warn(
      {
        documensoId: eventData.documentId,
        documentId: doc.id,
        recipientEmail,
        hadToken: Boolean(recipientToken),
      },
      'handleRecipientSigned: no matching signer row for recipient - check eoi_signers system setting for this port',
    );
  }

  // First signature on a sent EOI flips the document to partially_signed.
  const isSentEoi = doc.documentType === 'eoi' && doc.status === 'sent';
  if (isSentEoi) {
    await db
      .update(documents)
      .set({ status: 'partially_signed', updatedAt: new Date() })
      .where(and(eq(documents.id, doc.id), eq(documents.portId, doc.portId)));
  }

  // Conflicts are ignored so a re-delivered event for the same recipient
  // is a no-op instead of an error that would abort the steps below.
  await db
    .insert(documentEvents)
    .values({
      documentId: doc.id,
      eventType: 'signed',
      signerId: updatedSigner?.id ?? null,
      recipientEmail: recipientEmail ?? null,
      signatureHash: eventData.signatureHash ?? null,
      eventData: { recipientEmail },
    })
    .onConflictDoNothing();

  emitToRoom(`port:${doc.portId}`, 'document:signer:signed', {
    documentId: doc.id,
    signerEmail: recipientEmail,
  });

  // Cascade only on a genuine pending -> signed transition of a known signer.
  if (!updatedSigner || isRedelivery) return;

  try {
    await sendCascadingInviteForNextSigner(doc);
  } catch (err) {
    // Non-fatal: the webhook itself has been processed; the invitation can
    // be re-sent manually.
    logger.error(
      { err, documentId: doc.id, justSignedSigner: updatedSigner.id },
      'cascading "your turn" invite failed after recipient signed',
    );
  }
}
/**
 * Sends the branded signing invitation to the next signer in line.
 *
 * Loads the document's signers in signing order, picks the next pending one
 * via `nextPendingSigner`, emails them, and then stamps `invitedAt` on their
 * row. Nothing is sent when there is no pending signer, when the signer
 * already has `invitedAt` set, or when the signer has no signing URL (the
 * last case is logged as a warning).
 *
 * The document's `invitationMessage` is forwarded as the email's
 * `customMessage`. If the port's Documenso config maps the signer's role
 * (`developer` / `approver`) to a CRM user id, that user additionally gets
 * an in-app notification; a failure of that notification is logged only.
 *
 * @param doc - The document fields needed to build the invitation.
 */
async function sendCascadingInviteForNextSigner(doc: {
  id: string;
  portId: string;
  documentType: string;
  title: string;
  invitationMessage: string | null;
}): Promise<void> {
  const orderedSigners = await db
    .select()
    .from(documentSigners)
    .where(eq(documentSigners.documentId, doc.id))
    .orderBy(documentSigners.signingOrder);

  const upcoming = nextPendingSigner(orderedSigners);
  // Nobody left to invite, or this signer was already invited earlier -
  // never send a second copy.
  if (!upcoming || upcoming.invitedAt) return;

  if (!upcoming.signingUrl) {
    logger.warn(
      { documentId: doc.id, signerId: upcoming.id },
      'cascading invite skipped: signer has no signing URL',
    );
    return;
  }

  const portRow = await db.query.ports.findFirst({ where: eq(ports.id, doc.portId) });
  const documensoConfig = await getPortDocumensoConfig(doc.portId);

  await sendSigningInvitation({
    portId: doc.portId,
    portName: portRow?.name ?? 'Port Nimara',
    recipient: { name: upcoming.signerName, email: upcoming.signerEmail },
    documensoSigningUrl: upcoming.signingUrl,
    documentLabel: DOC_TYPE_LABEL[doc.documentType] ?? 'Expression of Interest',
    signerRole: (upcoming.signerRole as SignerRole) ?? 'other',
    senderName: documensoConfig.developerName ?? null,
    customMessage: doc.invitationMessage,
  });

  // Stamp only after the send resolved, so a failed send stays retryable.
  await db
    .update(documentSigners)
    .set({ invitedAt: new Date() })
    .where(eq(documentSigners.id, upcoming.id));

  // Optional in-CRM nudge: only the developer / approver roles can be
  // mapped to a CRM user in the port config; every other role yields null.
  const isDeveloper = upcoming.signerRole === 'developer';
  const isApprover = upcoming.signerRole === 'approver';
  const linkedUserId = isDeveloper
    ? (documensoConfig.developerUserId ?? null)
    : isApprover
      ? (documensoConfig.approverUserId ?? null)
      : null;
  if (!linkedUserId) return;

  // Fire-and-forget; the email above is the primary channel.
  void import('@/lib/services/notifications.service').then(({ createNotification }) =>
    createNotification({
      portId: doc.portId,
      userId: linkedUserId,
      type: 'document_signing_your_turn',
      title: 'Your signature is needed',
      description: `"${doc.title}" is waiting for you to sign.`,
      link: `/documents/${doc.id}`,
      entityType: 'document',
      entityId: doc.id,
      dedupeKey: `document:${doc.id}:your-turn:${upcoming.id}`,
    }).catch((err) => {
      logger.warn(
        { err, documentId: doc.id, signerId: upcoming.id, linkedUserId },
        'phase-7 in-CRM your-turn notification failed (email still sent)',
      );
    }),
  );
}
// ─── Owner-wins resolution ────────────────────────────────────────────────────
/**
 * The owning entity picked for a document by `resolveDocumentOwner`.
 */
interface ResolvedOwner {
  // Kind of owner; `resolveDocumentOwner` produces 'client', 'company' or 'yacht'.
  entityType: EntityType;
  // Id of the owning row - taken from the document's own FK, or from the
  // linked interest's clientId when the document has no direct owner FK.
  entityId: string;
}
/**
 * Picks the entity that owns a document, using a fixed priority order.
 *
 * Direct foreign keys on the document win, checked as client, then company,
 * then yacht. When none is set and the document links to an interest, the
 * interest (looked up within `portId`) contributes its client. There is no
 * company fallback through the interest because only `clientId` is read
 * from it.
 *
 * @param portId - Port used to scope the interest lookup.
 * @param doc - The document's owner-related foreign keys.
 * @returns The first owner found, or null when none is resolvable.
 */
async function resolveDocumentOwner(
  portId: string,
  doc: {
    clientId: string | null;
    companyId: string | null;
    yachtId: string | null;
    interestId: string | null;
  },
): Promise<ResolvedOwner | null> {
  // Direct FKs in priority order; the first non-empty one wins.
  const directCandidates: ReadonlyArray<readonly [EntityType, string | null]> = [
    ['client', doc.clientId],
    ['company', doc.companyId],
    ['yacht', doc.yachtId],
  ];
  for (const [entityType, entityId] of directCandidates) {
    if (entityId) return { entityType, entityId };
  }

  if (!doc.interestId) return null;

  // Fall back to the client behind the linked interest, scoped to the port.
  const linkedInterest = await db.query.interests.findFirst({
    where: and(eq(interests.id, doc.interestId), eq(interests.portId, portId)),
    columns: { clientId: true },
  });
  return linkedInterest?.clientId
    ? { entityType: 'client', entityId: linkedInterest.clientId }
    : null;
}
/**
 * Marker error raised inside `handleDocumentCompleted`'s transaction when
 * the row-locked re-check finds the document already completed with a
 * signed file attached - i.e. a concurrent webhook delivery committed
 * first. The handler's catch block recognises this class so it can clean
 * up its duplicate blob and log at info level rather than error.
 */
class DocumentAlreadyCompletedError extends Error {
  constructor() {
    const reason = 'document already marked completed by a concurrent webhook';
    super(reason);
    // Explicit name so logs and `err.name` checks identify the sentinel.
    this.name = 'DocumentAlreadyCompletedError';
  }
}
/**
 * Webhook handler for Documenso's "document completed" event.
 *
 * Flow:
 *  1. Resolve the local document; return when it cannot be resolved or when
 *     it is already `completed` with a `signedFileId` (idempotency gate).
 *  2. Download the signed PDF, store it, and - inside one transaction that
 *     re-checks the gate under a row lock - insert the `files` row, point
 *     `documents.signedFileId` at it and mirror it onto the tenancy for
 *     reservation agreements. A stored blob whose DB commit did not land is
 *     deleted again.
 *  3. Stamp the linked interest according to the document type and trigger
 *     stage advancement / rule evaluation (fire-and-forget).
 *  4. Record a `completed` document event and emit `document:completed`.
 *  5. When a signed file is attached, email the signed PDF to all signers
 *     plus the completion CC list; notify the document creator in-app.
 *
 * A download/store failure is logged and does NOT stop steps 3-5 (except
 * the email, which needs the signed file). Losing the concurrent-delivery
 * race DOES stop the handler: the delivery that committed the file performs
 * steps 3-5, so running them again here would only duplicate them.
 *
 * @param eventData - `documentId` is the Documenso id from the payload;
 *   `portId` is the port resolved by the webhook route, if any.
 */
export async function handleDocumentCompleted(eventData: { documentId: string; portId?: string }) {
  const doc = await resolveWebhookDocument(eventData.documentId, eventData.portId);
  if (!doc) return;
  // A1: Idempotency gate. Documenso retries DOCUMENT_COMPLETED on receiver
  // 5xx (and the poll worker also reconciles). Without this guard, a second
  // delivery re-runs downloadSignedPdf + storage.put + db.insert(files) and
  // then clobbers the previous signedFileId on the UPDATE - leaking the
  // first file as an orphan blob with no DB pointer. Once we have a signed
  // file id we are done.
  if (doc.status === 'completed' && doc.signedFileId) return;
  // BR-022: Download signed PDF and store in MinIO
  const port = await db.query.ports.findFirst({ where: eq(ports.id, doc.portId) });
  if (!port) {
    logger.error({ portId: doc.portId }, 'Port not found during document completion');
    return;
  }
  // Tracked outside the try so the catch can attempt a compensating
  // storage.delete if we put a blob but failed to commit the DB rows.
  // Without this, partial failures left an orphan blob in storage with
  // no DB pointer (the audit's orphan-blob CRITICAL).
  let putStoragePath: string | null = null;
  try {
    // Download by the stored Documenso ID (envelope_xxx on v2, numeric on
    // v1) rather than `eventData.documentId` - webhooks deliver the v2
    // numeric internal pk, but the download endpoint expects the public
    // envelope_xxx string. Falls back to the webhook's value when the
    // stored ID is somehow missing (e.g. legacy pre-#69 rows).
    const downloadId = doc.documensoId ?? eventData.documentId;
    const signedPdfBuffer = await downloadSignedPdf(downloadId, doc.portId);
    // Guard: a 0-byte response from Documenso would otherwise persist a
    // permanent corrupt signedFileId pointing at a blob with no content.
    // Refuse and let the next retry / poll-worker pass re-fetch.
    if (signedPdfBuffer.length === 0) {
      throw new CodedError('DOCUMENSO_EMPTY_PDF', {
        internalMessage: `Documenso returned 0-byte signed PDF for documensoId=${eventData.documentId}`,
      });
    }
    const fileId = crypto.randomUUID();
    const storagePath = buildStoragePath(port.slug, 'eoi-signed', doc.id, fileId, 'pdf');
    await (
      await getStorageBackend()
    ).put(storagePath, signedPdfBuffer, {
      contentType: 'application/pdf',
      sizeBytes: signedPdfBuffer.length,
    });
    putStoragePath = storagePath;
    // Resolve owner via the Owner-wins chain. The signed PDF lands in
    // this owner's auto-created entity subfolder (or at root if no owner).
    const owner = await resolveDocumentOwner(doc.portId, doc);
    let entityFolderId: string | null = null;
    if (owner) {
      try {
        const folder = await ensureEntityFolder(
          doc.portId,
          owner.entityType,
          owner.entityId,
          'system',
        );
        entityFolderId = folder.id;
      } catch (err) {
        // Folder creation is best-effort - signed file still lands at root.
        // Logged at warn level: missing entity folder is recoverable via
        // the backfill script.
        logger.warn(
          { err, documentId: doc.id, owner },
          'ensureEntityFolder failed during document completion',
        );
      }
    }
    // Atomic: the files row + the documents.signedFileId pointer + the
    // reservation contract mirror commit together. If any throws, the
    // outer catch fires storage.delete on the orphan blob.
    //
    // concurrency-auditor C-1: re-check the idempotency gate INSIDE the
    // tx with SELECT … FOR UPDATE so two near-simultaneous webhook
    // retries can't both pass the read-outside-lock gate at the top of
    // this function and both insert into `files` (the losing row would
    // orphan its blob since `documents.signed_file_id` only points at
    // one). The outer catch handles the "we put a blob but a concurrent
    // worker won the race" cleanup via the existing putStoragePath
    // finalizer.
    const fileRecord = await db.transaction(async (tx) => {
      const [locked] = await tx
        .select({ status: documents.status, signedFileId: documents.signedFileId })
        .from(documents)
        .where(eq(documents.id, doc.id))
        .for('update');
      if (locked && locked.status === 'completed' && locked.signedFileId) {
        // Concurrent webhook beat us - abort so the outer catch deletes
        // the duplicate blob we just put into storage. Throw a sentinel
        // we recognize so we don't log it as an error.
        throw new DocumentAlreadyCompletedError();
      }
      const [inserted] = await tx
        .insert(files)
        .values({
          portId: doc.portId,
          clientId: owner?.entityType === 'client' ? owner.entityId : (doc.clientId ?? null),
          companyId: owner?.entityType === 'company' ? owner.entityId : (doc.companyId ?? null),
          yachtId: owner?.entityType === 'yacht' ? owner.entityId : (doc.yachtId ?? null),
          folderId: entityFolderId,
          filename: `signed-${doc.id}.pdf`,
          originalName: `signed-${doc.id}.pdf`,
          mimeType: 'application/pdf',
          sizeBytes: String(signedPdfBuffer.length),
          storagePath,
          storageBucket: env.MINIO_BUCKET,
          category: 'eoi',
          uploadedBy: 'system',
        })
        .returning();
      if (!inserted) {
        throw new Error('files.insert returned no row');
      }
      await tx
        .update(documents)
        .set({ status: 'completed', signedFileId: inserted.id, updatedAt: new Date() })
        .where(eq(documents.id, doc.id));
      // Reservation agreements mirror their signed PDF onto
      // berth_tenancies.contractFileId so the portal "My Tenancies" view
      // can resolve the contract without joining through documents.
      if (doc.documentType === 'reservation_agreement' && doc.tenancyId) {
        const { berthTenancies } = await import('@/lib/db/schema/tenancies');
        await tx
          .update(berthTenancies)
          .set({ contractFileId: inserted.id, updatedAt: new Date() })
          .where(eq(berthTenancies.id, doc.tenancyId));
      }
      return inserted;
    });
    // Mark as durably committed BEFORE side effects so the catch block
    // doesn't try to undo a blob whose DB pointer just landed.
    putStoragePath = null;
    // Audit log: the webhook just minted a new signed-PDF file row owned by
    // 'system'. Without this entry the file appears in the aggregated view
    // with no provenance trail; auditors need to see who/what wrote it.
    void createAuditLog({
      userId: 'system',
      portId: doc.portId,
      action: 'create',
      entityType: 'file',
      entityId: fileRecord.id,
      newValue: {
        filename: fileRecord.filename,
        mimeType: 'application/pdf',
        size: signedPdfBuffer.length,
        documentId: doc.id,
        source: 'documenso_completion',
      },
      ipAddress: '0.0.0.0',
      userAgent: 'webhook',
    });
  } catch (err) {
    // Distinguish "we lost the concurrent race" from a real failure -
    // the loser of the SELECT FOR UPDATE re-check should clean up its
    // blob silently, not log an error.
    const lostRace = err instanceof DocumentAlreadyCompletedError;
    if (lostRace) {
      logger.info(
        { documentId: doc.id, portId: doc.portId },
        'Webhook race lost - another worker already committed the signed PDF; deleting our duplicate blob',
      );
    } else {
      logger.error(
        { err, documentId: doc.id, portId: doc.portId },
        'Failed to download/store signed PDF',
      );
    }
    // Compensating delete: storage.put landed but the DB commit didn't.
    // Without this the blob lives forever with no row pointing at it.
    if (putStoragePath) {
      try {
        await (await getStorageBackend()).delete(putStoragePath);
        logger.info(
          { documentId: doc.id, storagePath: putStoragePath },
          'Compensating storage.delete after failed signed-PDF commit',
        );
      } catch (compErr) {
        // We tried - log so a human can clean up the orphan if needed.
        logger.error(
          { compErr, documentId: doc.id, storagePath: putStoragePath },
          'Compensating storage.delete also failed - manual cleanup required',
        );
      }
    }
    // Race lost: the delivery that committed the file runs every side
    // effect below (interest stamps, completed event, socket emit, the
    // "all signed" email fan-out). Falling through would repeat them -
    // most visibly a second copy of the completion email, because the
    // re-read further down now sees the winner's signedFileId. Stop here,
    // exactly as a sequential retry stops at the idempotency gate above.
    if (lostRace) return;
    // Critical: do NOT set documents.status = 'completed' on failure.
    // The previous catch block did - which created the "completed-with-
    // no-signedFileId" zombie state the audit flagged. Let the next
    // Documenso retry (or our poll-worker reconciliation) re-attempt;
    // the early-return idempotency gate at the top requires BOTH
    // status='completed' AND signedFileId so re-runs are safe.
  }
  // Update interest if eoi type
  if (doc.interestId && doc.documentType === 'eoi') {
    const interest = await db.query.interests.findFirst({
      where: eq(interests.id, doc.interestId),
    });
    await db
      .update(interests)
      .set({
        eoiStatus: 'signed',
        eoiDocStatus: 'signed',
        dateEoiSigned: new Date(),
        updatedAt: new Date(),
      })
      .where(eq(interests.id, doc.interestId));
    if (interest) {
      const systemMeta: AuditMeta = {
        userId: 'system',
        portId: doc.portId,
        ipAddress: '0.0.0.0',
        userAgent: 'webhook',
      };
      // Guard against double-fire: DOCUMENT_COMPLETED may arrive multiple
      // times. evaluateRule has no idempotency - skip when the interest is
      // already past the EOI stage so the berth-rule side effect runs once.
      const currentStageIdx = PIPELINE_STAGES.indexOf(
        interest.pipelineStage as (typeof PIPELINE_STAGES)[number],
      );
      const eoiIdx = PIPELINE_STAGES.indexOf('eoi');
      if (currentStageIdx <= eoiIdx) {
        void evaluateRule('eoi_signed', doc.interestId, doc.portId, systemMeta);
      }
      // EOI signed = formal commitment to proceed → advance to 'reservation'
      // (the next milestone). Conventional CRM behaviour: stage reflects the
      // deal's CURRENT pursuit phase, not the most recently signed document.
      // Per-port admins can override the rule via the `eoi_signed` entry in
      // the stage_advance_rules setting (auto / suggest / off).
      void advanceStageIfBehindGated(
        doc.interestId,
        doc.portId,
        'reservation',
        systemMeta,
        'EOI signed via Documenso',
        'eoi_signed',
      );
      // Phase 7 - Umami attribution. EOI signed is the headline
      // conversion event so it gets its own Umami event for funnel
      // visibility (rather than rolling up into "interest-stage-changed").
      void import('@/lib/services/umami.service').then(({ trackEvent }) =>
        trackEvent(doc.portId, 'eoi-signed', {
          interestId: doc.interestId,
          documentId: doc.id,
        }),
      );
    }
  }
  // Update interest if reservation_agreement type - kept out of the
  // signed-PDF try/catch above so a Documenso PDF-download failure doesn't
  // also lose the sub-status stamp (which the rep can see immediately on
  // the interest detail page).
  if (doc.interestId && doc.documentType === 'reservation_agreement') {
    const systemMeta: AuditMeta = {
      userId: 'system',
      portId: doc.portId,
      ipAddress: '0.0.0.0',
      userAgent: 'webhook',
    };
    await db
      .update(interests)
      .set({
        reservationDocStatus: 'signed',
        dateReservationSigned: new Date(),
        updatedAt: new Date(),
      })
      .where(eq(interests.id, doc.interestId));
    void advanceStageIfBehindGated(
      doc.interestId,
      doc.portId,
      'reservation',
      systemMeta,
      'Reservation agreement signed',
      'reservation_signed',
    );
    void import('@/lib/services/berth-rules-engine').then(({ evaluateRule }) =>
      evaluateRule('contract_signed', doc.interestId!, doc.portId, systemMeta),
    );
  }
  // Update interest if contract type. Outcome flip to 'won' is a separate
  // explicit decision so reps can record a contract as signed without
  // prematurely closing the deal.
  if (doc.interestId && doc.documentType === 'contract') {
    const systemMeta: AuditMeta = {
      userId: 'system',
      portId: doc.portId,
      ipAddress: '0.0.0.0',
      userAgent: 'webhook',
    };
    await db
      .update(interests)
      .set({
        contractDocStatus: 'signed',
        dateContractSigned: new Date(),
        updatedAt: new Date(),
      })
      .where(eq(interests.id, doc.interestId));
    void advanceStageIfBehindGated(
      doc.interestId,
      doc.portId,
      'contract',
      systemMeta,
      'Contract signed via Documenso',
      'contract_signed',
    );
    void import('@/lib/services/berth-rules-engine').then(({ evaluateRule }) =>
      evaluateRule('contract_signed', doc.interestId!, doc.portId, systemMeta),
    );
  }
  await db.insert(documentEvents).values({
    documentId: doc.id,
    eventType: 'completed',
    eventData: { documensoId: eventData.documentId },
  });
  emitToRoom(`port:${doc.portId}`, 'document:completed', { documentId: doc.id });
  // Phase 2: distribute the fully-signed PDF to every recipient via a
  // branded "all signed" email. Re-read the document so we see the
  // signedFileId the transaction above just committed + the
  // completionCcEmails list (Phase 2 - sales mgr / accounts etc who get
  // a copy without being a signer).
  const completedDoc = await db.query.documents.findFirst({
    where: eq(documents.id, doc.id),
    columns: { signedFileId: true, completionCcEmails: true },
  });
  if (completedDoc?.signedFileId) {
    const signers = await db
      .select({
        name: documentSigners.signerName,
        email: documentSigners.signerEmail,
      })
      .from(documentSigners)
      .where(eq(documentSigners.documentId, doc.id));
    // Phase 2 CC list - emails that weren't signers but get a copy of
    // the finalized PDF on completion. Filter to addresses not already
    // in the signer set (case-insensitive) so a sales mgr who's also
    // a signer doesn't get two emails.
    const signerEmailSet = new Set(signers.map((s) => s.email.toLowerCase()));
    const ccRecipients = (completedDoc.completionCcEmails ?? [])
      .filter((e): e is string => typeof e === 'string' && e.trim().length > 0)
      .map((e) => e.trim())
      .filter((e) => !signerEmailSet.has(e.toLowerCase()))
      .map((email) => ({ name: '', email }));
    const allRecipients = [...signers, ...ccRecipients];
    if (allRecipients.length > 0) {
      const portRow = await db.query.ports.findFirst({
        where: eq(ports.id, doc.portId),
        columns: { name: true },
      });
      // Resolve the deal's primary client name for the salutation -
      // falls back to the document title when the owner chain doesn't
      // surface a client.
      let clientName = doc.title;
      const owner = await resolveDocumentOwner(doc.portId, doc);
      if (owner?.entityType === 'client') {
        const client = await db.query.clients.findFirst({
          where: eq(clients.id, owner.entityId),
          columns: { fullName: true },
        });
        if (client?.fullName) clientName = client.fullName;
      }
      await sendSigningCompleted({
        portId: doc.portId,
        portName: portRow?.name ?? 'Port Nimara',
        recipients: allRecipients,
        clientName,
        documentLabel: DOC_TYPE_LABEL[doc.documentType] ?? 'Expression of Interest',
        completedAt: new Date(),
        signedPdfFileId: completedDoc.signedFileId,
        signedPdfFilename: `signed-${doc.id}.pdf`,
      }).catch((err) => {
        // Don't let a downstream email failure undo the completion -
        // the signed PDF is already stored and the document row is
        // marked completed. Log + emit so admins can re-trigger via
        // the manual "Send copy" flow.
        logger.error(
          { err, documentId: doc.id },
          'sendSigningCompleted fan-out failed after document completed',
        );
      });
    }
  }
  // Notify the document creator about completion
  if (doc.createdBy && doc.createdBy !== 'system') {
    void import('@/lib/services/notifications.service').then(({ createNotification }) =>
      createNotification({
        portId: doc.portId,
        userId: doc.createdBy!,
        type: 'document_signed',
        title: 'Document fully signed',
        description: `"${doc.title}" has been signed by all parties`,
        link: `/documents/${doc.id}`,
        entityType: 'document',
        entityId: doc.id,
        dedupeKey: `document:${doc.id}:completed`,
      }),
    );
  }
}
/**
 * Webhook handler for a document expiring in Documenso.
 *
 * Marks the local document `expired`, mirrors that onto the linked
 * interest's `eoiStatus` when the document is an EOI, records an `expired`
 * document event and emits `document:expired` to the port room. The event
 * is dropped when the document cannot be resolved.
 *
 * @param eventData - `documentId` is the Documenso id from the payload;
 *   `portId` is the port resolved by the webhook route, if any.
 */
export async function handleDocumentExpired(eventData: { documentId: string; portId?: string }) {
  const doc = await resolveWebhookDocument(eventData.documentId, eventData.portId);
  if (!doc) return;

  // Both updates are scoped by id AND port.
  const thisDocument = and(eq(documents.id, doc.id), eq(documents.portId, doc.portId));
  await db
    .update(documents)
    .set({ status: 'expired', updatedAt: new Date() })
    .where(thisDocument);

  const isInterestEoi = doc.interestId && doc.documentType === 'eoi';
  if (isInterestEoi) {
    const linkedInterest = and(
      eq(interests.id, doc.interestId),
      eq(interests.portId, doc.portId),
    );
    await db
      .update(interests)
      .set({ eoiStatus: 'expired', updatedAt: new Date() })
      .where(linkedInterest);
  }

  const expiredEvent = {
    documentId: doc.id,
    eventType: 'expired',
    eventData: { documensoId: eventData.documentId },
  };
  await db.insert(documentEvents).values(expiredEvent);

  emitToRoom(`port:${doc.portId}`, 'document:expired', { documentId: doc.id });
}
/**
 * Webhook handler for a recipient opening (viewing) a document.
 *
 * Finds the matching signer row (by token when supplied, by email
 * otherwise), stamps `openedAt` only if it is still empty, records a
 * `viewed` document event (duplicate inserts are ignored) and emits
 * `document:signer:opened` to the port room. The event is dropped when the
 * document cannot be resolved; an unmatched signer still produces the
 * event row and the socket emit.
 *
 * @param eventData - Normalised webhook payload; `documentId` is the
 *   Documenso id, not the local document id.
 */
export async function handleDocumentOpened(eventData: {
  documentId: string;
  recipientEmail: string;
  /** Documenso recipient token, when the payload carries one. Used in
   * preference to the email since one address may hold several roles on
   * the same document. */
  recipientToken?: string | null;
  signatureHash?: string;
  portId?: string;
}) {
  const doc = await resolveWebhookDocument(eventData.documentId, eventData.portId);
  if (!doc) return;

  const { recipientEmail, recipientToken } = eventData;

  // Identify the signer row: token when available, email otherwise.
  const signerIdentity = recipientToken
    ? eq(documentSigners.signingToken, recipientToken)
    : eq(documentSigners.signerEmail, recipientEmail);
  const [viewer] = await db
    .select()
    .from(documentSigners)
    .where(and(eq(documentSigners.documentId, doc.id), signerIdentity));

  // Only the first view is timestamped; repeated view events leave the
  // original openedAt untouched.
  const isFirstView = viewer && !viewer.openedAt;
  if (isFirstView) {
    await db
      .update(documentSigners)
      .set({ openedAt: new Date() })
      .where(eq(documentSigners.id, viewer.id));
  }

  await db
    .insert(documentEvents)
    .values({
      documentId: doc.id,
      eventType: 'viewed',
      signerId: viewer?.id ?? null,
      signatureHash: eventData.signatureHash ?? null,
      eventData: { recipientEmail },
    })
    .onConflictDoNothing();

  emitToRoom(`port:${doc.portId}`, 'document:signer:opened', {
    documentId: doc.id,
    signerEmail: recipientEmail,
  });
}
/**
 * Webhook handler for a signer declining a document.
 *
 * Steps, in order:
 *  1. Resolve the local document (drops the event when unresolvable).
 *  2. When the payload names a recipient, mark that signer `declined`
 *     (matched by email within the document).
 *  3. Mark the document `rejected`; for an EOI also set the linked
 *     interest's `eoiStatus` to `rejected`.
 *  4. Record a `rejected` document event (duplicate inserts are ignored)
 *     and emit `document:rejected` to the port room.
 *  5. Notify the interest's assigned user in-app, if there is one. A
 *     notification failure is logged and never fails the handler.
 *  6. Write an audit-log entry tagged `document_declined`.
 *
 * No email is sent to the other signers from here.
 *
 * @param eventData - Normalised webhook payload; `documentId` is the
 *   Documenso id, not the local document id.
 */
export async function handleDocumentRejected(eventData: {
  documentId: string;
  recipientEmail?: string;
  signatureHash?: string;
  portId?: string;
}) {
  const doc = await resolveWebhookDocument(eventData.documentId, eventData.portId);
  if (!doc) return;
  let signerId: string | null = null;
  if (eventData.recipientEmail) {
    const [signer] = await db
      .update(documentSigners)
      .set({ status: 'declined' })
      .where(
        and(
          eq(documentSigners.documentId, doc.id),
          eq(documentSigners.signerEmail, eventData.recipientEmail),
        ),
      )
      .returning();
    signerId = signer?.id ?? null;
  }
  await db
    .update(documents)
    .set({ status: 'rejected', updatedAt: new Date() })
    .where(and(eq(documents.id, doc.id), eq(documents.portId, doc.portId)));
  if (doc.interestId && doc.documentType === 'eoi') {
    await db
      .update(interests)
      .set({ eoiStatus: 'rejected', updatedAt: new Date() })
      .where(and(eq(interests.id, doc.interestId), eq(interests.portId, doc.portId)));
  }
  await db
    .insert(documentEvents)
    .values({
      documentId: doc.id,
      eventType: 'rejected',
      signerId,
      signatureHash: eventData.signatureHash ?? null,
      eventData: { recipientEmail: eventData.recipientEmail ?? null },
    })
    .onConflictDoNothing();
  emitToRoom(`port:${doc.portId}`, 'document:rejected', {
    documentId: doc.id,
    signerEmail: eventData.recipientEmail ?? null,
  });
  // §4.13: rejection cascade. When any signer declines:
  // 1. Notify the interest's assigned rep in-CRM (drives the EOI tab
  // banner via the realtime invalidation + the bell).
  // 2. Audit-log so the timeline surfaces the rejection.
  // Email cascade to the other signers is intentionally NOT fired -
  // the legal flow is "this EOI is dead, regenerate"; messaging the
  // co-signers would create noise. The rep handles outreach manually.
  if (doc.interestId) {
    const interest = await db.query.interests.findFirst({
      where: and(eq(interests.id, doc.interestId), eq(interests.portId, doc.portId)),
      columns: { assignedTo: true, clientId: true },
    });
    const targetUserId = interest?.assignedTo ?? null;
    if (targetUserId) {
      const { createNotification } = await import('@/lib/services/notifications.service');
      void createNotification({
        portId: doc.portId,
        userId: targetUserId,
        type: 'document_rejected',
        title: 'EOI declined',
        description: eventData.recipientEmail
          ? `${eventData.recipientEmail} declined to sign - review and regenerate.`
          : 'A signer declined the EOI - review and regenerate.',
        link: `/interests/${doc.interestId}?tab=eoi`,
        entityType: 'document',
        entityId: doc.id,
        dedupeKey: `document:${doc.id}:rejected`,
      }).catch((err) => {
        // Notification failure shouldn't block the webhook handler, but it
        // must not vanish either - the assigned rep would never learn of
        // the rejection and nobody would know why.
        logger.warn(
          { err, documentId: doc.id, interestId: doc.interestId, targetUserId },
          'document-rejected in-CRM notification failed',
        );
      });
    }
  }
  // Audit verb so the rep's timeline surfaces the rejection with a
  // distinct icon/copy rather than a generic document_event row.
  const { createAuditLog } = await import('@/lib/audit');
  void createAuditLog({
    userId: 'system',
    portId: doc.portId,
    action: 'update',
    entityType: 'document',
    entityId: doc.id,
    metadata: {
      type: 'document_declined',
      signerEmail: eventData.recipientEmail ?? null,
    },
    // NOTE(review): other system-originated audit entries in this file use
    // ipAddress '0.0.0.0' / userAgent 'webhook'; the empty strings here
    // look inconsistent - confirm how createAuditLog treats '' before
    // aligning them.
    ipAddress: '',
    userAgent: '',
  });
}
/**
 * Webhook-side handler for a cancellation reported by the signing provider.
 *
 * Resolves the local document from the webhook payload, marks it
 * `cancelled`, mirrors that onto the linked interest's `eoiStatus` when the
 * document is an EOI, records a `cancelled` document event and broadcasts
 * `document:cancelled` to the port room. Returns silently when no local
 * document can be resolved.
 *
 * @param eventData - Webhook payload. `documentId` is the id carried by the
 *   webhook (persisted on the event row as `documensoId`); `signatureHash`
 *   and `portId` are optional, the latter being handed to the resolver.
 */
export async function handleDocumentCancelled(eventData: {
  documentId: string;
  signatureHash?: string;
  portId?: string;
}) {
  const { documentId: webhookDocumentId, signatureHash, portId: portHint } = eventData;

  const target = await resolveWebhookDocument(webhookDocumentId, portHint);
  if (!target) {
    return;
  }

  // Writes are scoped by both the row id and the owning port.
  const documentScope = and(eq(documents.id, target.id), eq(documents.portId, target.portId));
  await db
    .update(documents)
    .set({ status: 'cancelled', updatedAt: new Date() })
    .where(documentScope);

  // Only EOI documents drive the interest's EOI status.
  if (target.interestId && target.documentType === 'eoi') {
    const interestScope = and(
      eq(interests.id, target.interestId),
      eq(interests.portId, target.portId),
    );
    await db
      .update(interests)
      .set({ eoiStatus: 'cancelled', updatedAt: new Date() })
      .where(interestScope);
  }

  // NOTE(review): onConflictDoNothing presumably makes webhook redelivery a
  // no-op via a unique constraint on document_events - confirm in the schema.
  const cancelledEvent = {
    documentId: target.id,
    eventType: 'cancelled',
    signatureHash: signatureHash ?? null,
    eventData: { documensoId: webhookDocumentId },
  };
  await db.insert(documentEvents).values(cancelledEvent).onConflictDoNothing();

  emitToRoom(`port:${target.portId}`, 'document:cancelled', { documentId: target.id });
}
// ─── Phase A: hub + wizard surface (PR1 skeletons; bodies land in PRs 4-6) ────
/**
 * Watcher row as exposed on the document detail payload; a projection of
 * the `documentWatchers` table.
 */
export interface DocumentDetailWatcher {
  /** User who watches the document. */
  userId: string;
  /** User who registered the watcher (the acting user at insert time). */
  addedBy: string;
  /** Timestamp stored on the watcher row. */
  addedAt: Date;
}
/**
 * #67 linked-entity resolution: resolve each polymorphic FK on the
 * document to a human-readable name so the doc-detail "Linked entity"
 * card can render "Interest - Matt Ciaccio" instead of "Interest →".
 * Each side is null when the FK is null or the row was deleted.
 */
export interface DocumentDetailLinkedEntities {
  /**
   * Linked interest. `clientName` is the full name of the interest's client
   * (null when no client row joins); `berthLabel` is a formatted range of
   * the interest's mooring numbers (null when none resolve).
   */
  interest: { id: string; clientName: string | null; berthLabel: string | null } | null;
  /** Client referenced directly by the document. */
  client: { id: string; fullName: string } | null;
  /** Yacht referenced directly by the document. */
  yacht: { id: string; name: string } | null;
  /** Company referenced directly by the document. */
  company: { id: string; name: string } | null;
}
/** Aggregate payload returned by `getDocumentDetail` for the detail page. */
export interface DocumentDetail {
  /** The full document row. */
  document: typeof documents.$inferSelect;
  /** Signer rows, ordered by ascending `signingOrder`. */
  signers: (typeof documentSigners.$inferSelect)[];
  /** Event rows, newest first (descending `createdAt`). */
  events: (typeof documentEvents.$inferSelect)[];
  /** Users watching the document. */
  watchers: DocumentDetailWatcher[];
  /** Human-readable names for the entities the document points at. */
  linked: DocumentDetailLinkedEntities;
}
/**
 * One-call aggregator for the document detail page (PR5).
 *
 * Loads the document, then (in parallel) its signers, events (newest
 * first) and watchers, then (in parallel) the display names of the linked
 * interest / client / yacht / company, and finally a berth label for the
 * linked interest.
 *
 * @param id - Document id.
 * @param portId - Port the caller is scoped to.
 * @returns The document together with its signers, events, watchers and
 *   resolved linked-entity names.
 * @throws NotFoundError if the document is not in `portId`.
 */
export async function getDocumentDetail(id: string, portId: string): Promise<DocumentDetail> {
  // Port gate first: the child-table lookups below filter on document id only.
  const document = await getDocumentById(id, portId);
  const { interestId, clientId, yachtId, companyId } = document;

  const [signers, events, watchers] = await Promise.all([
    db.query.documentSigners.findMany({
      where: eq(documentSigners.documentId, id),
      orderBy: (signer, ops) => [ops.asc(signer.signingOrder)],
    }),
    db.query.documentEvents.findMany({
      where: eq(documentEvents.documentId, id),
      orderBy: (event, ops) => [ops.desc(event.createdAt)],
    }),
    db
      .select({
        userId: documentWatchers.userId,
        addedBy: documentWatchers.addedBy,
        addedAt: documentWatchers.addedAt,
      })
      .from(documentWatchers)
      .where(eq(documentWatchers.documentId, id)),
  ]);

  // #67: look up a display name for every populated FK. Each lookup is
  // port-scoped, a null FK short-circuits to an empty result, and the four
  // run concurrently because they touch different tables.
  const [interestRow, clientRow, yachtRow, companyRow] = await Promise.all([
    interestId
      ? db
          .select({
            id: interests.id,
            clientId: interests.clientId,
            clientName: clients.fullName,
          })
          .from(interests)
          .leftJoin(clients, eq(clients.id, interests.clientId))
          .where(and(eq(interests.id, interestId), eq(interests.portId, portId)))
          .limit(1)
          .then((matches) => matches[0] ?? null)
      : Promise.resolve(null),
    clientId
      ? db.query.clients.findFirst({
          where: and(eq(clients.id, clientId), eq(clients.portId, portId)),
          columns: { id: true, fullName: true },
        })
      : Promise.resolve(undefined),
    yachtId
      ? db.query.yachts.findFirst({
          where: and(eq(yachts.id, yachtId), eq(yachts.portId, portId)),
          columns: { id: true, name: true },
        })
      : Promise.resolve(undefined),
    companyId
      ? db.query.companies.findFirst({
          where: and(eq(companies.id, companyId), eq(companies.portId, portId)),
          columns: { id: true, name: true },
        })
      : Promise.resolve(undefined),
  ]);

  // The berth label gives the Interest link information the Client link
  // does not carry (both would otherwise show the same client name).
  // Selection order: berths flagged in the EOI bundle, else berths flagged
  // primary, else every berth linked to the interest.
  let interestBerthLabel: string | null = null;
  if (interestRow) {
    const linkedBerths = await db
      .select({
        mooringNumber: berths.mooringNumber,
        isPrimary: interestBerths.isPrimary,
        isInEoiBundle: interestBerths.isInEoiBundle,
      })
      .from(interestBerths)
      .innerJoin(berths, eq(berths.id, interestBerths.berthId))
      .where(eq(interestBerths.interestId, interestRow.id));

    let chosen = linkedBerths.filter((berth) => berth.isInEoiBundle);
    if (chosen.length === 0) {
      chosen = linkedBerths.filter((berth) => berth.isPrimary);
    }
    if (chosen.length === 0) {
      chosen = linkedBerths;
    }

    // Berths without a mooring number contribute nothing to the label.
    const mooringNumbers: string[] = [];
    for (const berth of chosen) {
      if (berth.mooringNumber) {
        mooringNumbers.push(berth.mooringNumber);
      }
    }
    if (mooringNumbers.length > 0) {
      interestBerthLabel = formatBerthRange(mooringNumbers);
    }
  }

  const linkedInterest = interestRow
    ? {
        id: interestRow.id,
        clientName: interestRow.clientName ?? null,
        berthLabel: interestBerthLabel,
      }
    : null;
  const linkedClient = clientRow ? { id: clientRow.id, fullName: clientRow.fullName } : null;
  const linkedYacht = yachtRow ? { id: yachtRow.id, name: yachtRow.name } : null;
  const linkedCompany = companyRow ? { id: companyRow.id, name: companyRow.name } : null;

  const linked: DocumentDetailLinkedEntities = {
    interest: linkedInterest,
    client: linkedClient,
    yacht: linkedYacht,
    company: linkedCompany,
  };

  return { document, signers, events, watchers, linked };
}
/**
 * Options for `cancelDocument`, the user-initiated cancel of an in-flight
 * document. That function voids the envelope in Documenso (when the
 * document has one and `cancelMode` allows it), updates the DB status,
 * logs an event and emits a socket message. Documenso-initiated
 * cancellations arrive separately through the webhook receiver via
 * `handleDocumentCancelled`.
 */
export interface CancelDocumentOptions {
  /** Rep-authored reason inlined into notification emails + audit log. */
  reason?: string | null;
  /** Document_signers ids the rep wants to email about the cancellation.
   * Empty list = silent void (Regenerate flow). Each id is validated to
   * belong to this document before any email fires. */
  notifyRecipients?: string[];
  /**
   * How to handle the upstream Documenso envelope. `'delete'` (the
   * default) fires `DELETE /api/v2/envelope/{id}` so the envelope is
   * removed from the Documenso instance - useful for keeping the
   * Documenso log clean when drafts get abandoned. `'keep_remote'`
   * leaves the envelope intact; the local CRM row still flips to
   * `status='cancelled'` and the cancelled-doc badge surfaces the
   * "Kept on Documenso" variant so audit-trail expectations are met.
   */
  cancelMode?: 'delete' | 'keep_remote';
}
/**
 * User-initiated cancel of an in-flight document.
 *
 * Steps, in order:
 *  1. Load the document (port-scoped) and refuse terminal states.
 *  2. Best-effort void of the Documenso envelope when the document has a
 *     `documensoId` and `cancelMode` is `'delete'` (the default).
 *  3. Flip the local row to `cancelled`, record a `cancelled` event, write
 *     an audit-log entry and emit `document:cancelled` to the port room.
 *  4. Optionally email the signers listed in `options.notifyRecipients`.
 *
 * @param documentId - Document to cancel.
 * @param portId - Port the caller is scoped to.
 * @param meta - Acting user plus request metadata for the audit trail.
 * @param options - Reason, notification recipients and upstream handling.
 * @returns The updated document row.
 * @throws ConflictError when the document is already completed, cancelled
 *   or rejected.
 * @throws NotFoundError when the document is not in `portId`, or when the
 *   row disappears between the initial read and the status update.
 */
export async function cancelDocument(
  documentId: string,
  portId: string,
  meta: AuditMeta,
  options: CancelDocumentOptions = {},
): Promise<typeof documents.$inferSelect> {
  const existing = await getDocumentById(documentId, portId);
  if (['completed', 'cancelled', 'rejected'].includes(existing.status)) {
    throw new ConflictError(`Document is already ${existing.status}`);
  }
  const cancelMode = options.cancelMode ?? 'delete';

  // CRM is the system of record for cancellation status. A transient
  // Documenso failure shouldn't block the user from marking the doc cancelled
  // here - voidDocument already treats 404 as success, and the periodic
  // webhook receiver will reconcile if the remote void eventually lands.
  // `cancelMode='keep_remote'` skips the upstream DELETE entirely so the
  // envelope stays available in Documenso for audit/forensics.
  if (existing.documensoId && cancelMode === 'delete') {
    try {
      await documensoVoid(existing.documensoId, portId);
    } catch (err) {
      logger.warn(
        { err, documentId, documensoId: existing.documensoId },
        'Documenso void failed; cancelling locally anyway',
      );
    }
  }

  const [updated] = await db
    .update(documents)
    .set({ status: 'cancelled', updatedAt: new Date() })
    .where(and(eq(documents.id, documentId), eq(documents.portId, portId)))
    .returning();
  // The UPDATE matches nothing if the row was removed after the read above.
  // Fail loudly here instead of returning `undefined` typed as a document
  // and writing an event for a row that no longer exists.
  if (!updated) {
    throw new NotFoundError('Document');
  }

  await db.insert(documentEvents).values({
    documentId,
    eventType: 'cancelled',
    eventData: {
      initiatedBy: meta.userId,
      reason: options.reason ?? null,
      notifyCount: options.notifyRecipients?.length ?? 0,
      cancelMode,
    },
  });

  void createAuditLog({
    userId: meta.userId,
    portId,
    action: 'update',
    entityType: 'document',
    entityId: documentId,
    oldValue: { status: existing.status },
    newValue: { status: 'cancelled', reason: options.reason ?? null, cancelMode },
    ipAddress: meta.ipAddress,
    userAgent: meta.userAgent,
  });

  emitToRoom(`port:${portId}`, 'document:cancelled', { documentId });

  // Notify selected signers (rep-picked subset via the cancel-with-notify
  // modal). Pull the matching signer rows so we can render the recipient's
  // canonical name; skip silently when the rep passed no ids (Regenerate
  // flow). Failure to send is logged + non-fatal - the cancellation has
  // already committed locally.
  const notifyIds = options.notifyRecipients ?? [];
  if (notifyIds.length > 0) {
    try {
      // Filtering on documentId drops ids that belong to other documents.
      const rows = await db
        .select({
          id: documentSigners.id,
          signerName: documentSigners.signerName,
          signerEmail: documentSigners.signerEmail,
        })
        .from(documentSigners)
        .where(
          and(eq(documentSigners.documentId, documentId), inArray(documentSigners.id, notifyIds)),
        );
      if (rows.length > 0) {
        const portRow = await db.query.ports.findFirst({
          where: eq(ports.id, portId),
          columns: { name: true },
        });
        const { sendSigningCancelled } =
          await import('@/lib/services/document-signing-emails.service');
        await sendSigningCancelled({
          portId,
          portName: portRow?.name ?? 'Port Nimara',
          recipients: rows.map((r) => ({ name: r.signerName, email: r.signerEmail })),
          documentLabel:
            (DOC_TYPE_LABEL[existing.documentType] as
              | 'Expression of Interest'
              | 'Sales Contract'
              | 'Reservation Agreement') ?? 'Expression of Interest',
          reason: options.reason ?? null,
        });
      }
    } catch (err) {
      logger.error(
        { err, documentId, notifyIds },
        'cancel-with-notify email fan-out failed; cancellation already committed',
      );
    }
  }

  return updated;
}
/**
 * Prefilled email-composer payload returned by `composeSignedDocEmail` for
 * the "Email signed PDF to all signatories" action on the document detail
 * page.
 *
 * Available for `status='completed' && signedFileId !== null`.
 *
 * Body content (from per-port `signed_doc_completion` template), full
 * sender-resolution, and watcher-cc helpers land alongside PR8 (email
 * composer with attachments). For PR1 this carries the minimal correct
 * recipients + auto-attachment shape so detail-page integration tests can
 * assert against it.
 */
export interface ComposeSignedDocEmailResult {
  /** De-duplicated signer email addresses. */
  to: string[];
  /** Carbon-copy recipients; currently always empty. */
  cc: string[];
  /** Subject line built from the document type and title. */
  subject: string;
  /** Message body; currently always an empty string. */
  body: string;
  /** Files to attach; currently the signed PDF only. */
  attachments: Array<{ fileId: string; filename?: string }>;
  /** Sender preselected in the composer; currently always `'system'`. */
  defaultSenderType: 'system' | 'user';
}
/**
 * Builds the prefilled composer payload for emailing a signed PDF to every
 * signer of a completed document.
 *
 * @param documentId - Document whose signed PDF should be sent.
 * @param portId - Port the caller is scoped to.
 * @returns Recipients (unique signer emails, first occurrence order), a
 *   subject line, an empty body and the signed file as sole attachment.
 * @throws ConflictError when the document is not `completed`.
 * @throws ValidationError when the document has no signed file.
 */
export async function composeSignedDocEmail(
  documentId: string,
  portId: string,
): Promise<ComposeSignedDocEmailResult> {
  const signedDoc = await getDocumentById(documentId, portId);

  if (signedDoc.status !== 'completed') {
    throw new ConflictError('Document is not completed');
  }
  const signedFileId = signedDoc.signedFileId;
  if (!signedFileId) {
    throw new ValidationError('Document has no signed PDF');
  }

  const signerRows = await db
    .select({ email: documentSigners.signerEmail })
    .from(documentSigners)
    .where(eq(documentSigners.documentId, documentId));

  // Keep the first occurrence of each address, preserving row order.
  const alreadyListed = new Set<string>();
  const recipients: string[] = [];
  for (const { email } of signerRows) {
    if (!alreadyListed.has(email)) {
      alreadyListed.add(email);
      recipients.push(email);
    }
  }

  // Document types are snake_case; show them with spaces in the subject.
  const readableType = signedDoc.documentType.replace(/_/g, ' ');

  return {
    to: recipients,
    cc: [],
    subject: `Signed ${readableType} - ${signedDoc.title}`,
    body: '',
    attachments: [{ fileId: signedFileId }],
    defaultSenderType: 'system',
  };
}
// ─── Watchers (PR5) ───────────────────────────────────────────────────────────
/**
 * Lists the watchers of a document.
 *
 * @param documentId - Document whose watchers are requested.
 * @param portId - Port the caller is scoped to.
 * @returns One `{ userId, addedBy, addedAt }` entry per watcher row.
 * @throws NotFoundError if the document is not in `portId`.
 */
export async function listDocumentWatchers(documentId: string, portId: string) {
  // Called purely for its port-scope check; the row itself is not needed.
  await getDocumentById(documentId, portId);

  const watcherRows = await db
    .select({
      userId: documentWatchers.userId,
      addedBy: documentWatchers.addedBy,
      addedAt: documentWatchers.addedAt,
    })
    .from(documentWatchers)
    .where(eq(documentWatchers.documentId, documentId));

  return watcherRows;
}
/**
 * Registers `userId` as a watcher of a document. Idempotent: adding a user
 * who is already watching changes nothing and reports the original
 * `addedAt`.
 *
 * An audit-log entry and a `document:updated` socket message are produced
 * only when a watcher row was actually inserted.
 *
 * @param documentId - Document to watch.
 * @param portId - Port the caller is scoped to.
 * @param userId - User to register as watcher.
 * @param meta - Acting user plus request metadata for the audit trail.
 * @returns The watcher's user id and the time the watcher row was created.
 * @throws NotFoundError if the document is not in `portId`.
 * @throws Whatever `assertWatchersInPort` raises when the user fails its
 *   port check.
 */
export async function addDocumentWatcher(
  documentId: string,
  portId: string,
  userId: string,
  meta: AuditMeta,
): Promise<{ userId: string; addedAt: Date }> {
  await getDocumentById(documentId, portId);
  await assertWatchersInPort(portId, [userId]);

  const [inserted] = await db
    .insert(documentWatchers)
    .values({ documentId, userId, addedBy: meta.userId })
    .onConflictDoNothing()
    .returning();

  if (!inserted) {
    // Conflict path: nothing was written. NOTE(review): this presumes the
    // conflict is the (documentId, userId) uniqueness - confirm in the
    // schema. Report the stored timestamp instead of inventing one, and do
    // not audit a "create" or broadcast an update that did not happen.
    const [existing] = await db
      .select({ addedAt: documentWatchers.addedAt })
      .from(documentWatchers)
      .where(and(eq(documentWatchers.documentId, documentId), eq(documentWatchers.userId, userId)))
      .limit(1);
    // `existing` can be missing only if the row was removed concurrently;
    // fall back to "now" as the previous implementation did.
    return { userId, addedAt: existing?.addedAt ?? new Date() };
  }

  void createAuditLog({
    userId: meta.userId,
    portId,
    action: 'create',
    entityType: 'document_watcher',
    entityId: documentId,
    newValue: { documentId, watcherUserId: userId },
    ipAddress: meta.ipAddress,
    userAgent: meta.userAgent,
  });
  emitToRoom(`port:${portId}`, 'document:updated', { documentId });

  return { userId: inserted.userId, addedAt: inserted.addedAt };
}
/**
 * Removes `userId` from a document's watchers, then writes an audit-log
 * entry and emits `document:updated` to the port room.
 *
 * @param documentId - Document to stop watching.
 * @param portId - Port the caller is scoped to.
 * @param userId - Watcher to remove.
 * @param meta - Acting user plus request metadata for the audit trail.
 * @throws NotFoundError if the document is not in `portId`.
 */
export async function removeDocumentWatcher(
  documentId: string,
  portId: string,
  userId: string,
  meta: AuditMeta,
): Promise<void> {
  // Port-scope check; the returned row is not used.
  await getDocumentById(documentId, portId);

  const watcherMatch = and(
    eq(documentWatchers.documentId, documentId),
    eq(documentWatchers.userId, userId),
  );
  await db.delete(documentWatchers).where(watcherMatch);

  const { userId: actorId, ipAddress, userAgent } = meta;
  void createAuditLog({
    userId: actorId,
    portId,
    action: 'delete',
    entityType: 'document_watcher',
    entityId: documentId,
    oldValue: { documentId, watcherUserId: userId },
    ipAddress,
    userAgent,
  });

  emitToRoom(`port:${portId}`, 'document:updated', { documentId });
}
import type { CreateDocumentWizardInput } from '@/lib/validators/documents';
/**
 * Create-document wizard entry point (PR6).
 *
 * Wizard pathways, per the original design note:
 *  - 'documenso-template' - Documenso renders + signs from its own template
 *  - 'inapp' - render PDF locally from a CRM template, upload to Documenso
 *  - 'upload' - admin-supplied PDF, upload to Documenso (auto-place signature
 *    fields if `autoPlaceFields`)
 *
 * What this function does today: `source === 'upload'` is delegated to
 * `createFromUpload`; any other source requires `templateId` and persists a
 * `draft` document row with the reminder settings and the watchers.
 * `pathway` is only recorded in the audit log.
 * NOTE(review): nothing here renders, uploads or sends - confirm that the
 * caller (or a later step) handles `sendImmediately`.
 *
 * All validation (subject FKs, watchers) runs before the first write so a
 * rejected request cannot leave an orphaned draft row behind.
 *
 * @param portId - Port the document is created in.
 * @param data - Validated wizard input.
 * @param meta - Acting user plus request metadata for the audit trail.
 * @returns The inserted document row.
 * @throws ValidationError when a template source has no `templateId`.
 * @throws CodedError `INSERT_RETURNING_EMPTY` when the insert returns no row.
 */
export async function createFromWizard(
  portId: string,
  data: CreateDocumentWizardInput,
  meta: AuditMeta,
): Promise<typeof documents.$inferSelect> {
  if (data.source === 'upload') {
    return createFromUpload(portId, data, meta);
  }
  if (!data.templateId) {
    throw new ValidationError('templateId is required for template source');
  }

  await assertSubjectFksInPort(portId, {
    clientId: data.clientId,
    interestId: data.interestId,
    companyId: data.companyId,
    yachtId: data.yachtId,
    tenancyId: data.tenancyId,
  });
  // Validate watchers BEFORE inserting the document: previously this ran
  // after the insert, so a watcher outside the port aborted the request but
  // left the freshly inserted draft row in place.
  if (data.watchers.length > 0) {
    await assertWatchersInPort(portId, data.watchers);
  }

  const [doc] = await db
    .insert(documents)
    .values({
      portId,
      interestId: data.interestId ?? null,
      tenancyId: data.tenancyId ?? null,
      clientId: data.clientId ?? null,
      companyId: data.companyId ?? null,
      yachtId: data.yachtId ?? null,
      documentType: data.documentType,
      title: data.title,
      notes: data.notes ?? null,
      status: 'draft',
      remindersDisabled: data.remindersDisabled,
      reminderCadenceOverride: data.reminderCadenceOverride ?? null,
      createdBy: meta.userId,
    })
    .returning();
  if (!doc)
    throw new CodedError('INSERT_RETURNING_EMPTY', {
      internalMessage: 'Failed to insert document',
    });

  if (data.watchers.length > 0) {
    await db.insert(documentWatchers).values(
      data.watchers.map((userId) => ({
        documentId: doc.id,
        userId,
        addedBy: meta.userId,
      })),
    );
  }

  void createAuditLog({
    userId: meta.userId,
    portId,
    action: 'create',
    entityType: 'document',
    entityId: doc.id,
    newValue: {
      documentType: doc.documentType,
      title: doc.title,
      pathway: data.pathway,
      source: data.source,
    },
    ipAddress: meta.ipAddress,
    userAgent: meta.userAgent,
  });
  emitToRoom(`port:${portId}`, 'document:created', { documentId: doc.id });
  return doc;
}
/**
 * Upload-driven creation path. Files-service integration + Documenso upload
 * + auto-place signature fields land alongside the realapi PR (PR11). For
 * PR6 we persist the document row + signers + watchers and leave the
 * Documenso upload step to the existing sendForSigning flow on first send.
 *
 * All validation (uploaded file, subject FKs, watchers) runs before the
 * first write so a rejected request cannot leave an orphaned draft document
 * or signer rows behind.
 *
 * @param portId - Port the document is created in.
 * @param data - Validated wizard input; must carry `uploadedFileId` and at
 *   least one signer.
 * @param meta - Acting user plus request metadata for the audit trail.
 * @returns The inserted document row.
 * @throws ValidationError when `uploadedFileId` or `signers` is missing.
 * @throws NotFoundError when the uploaded file is not in `portId`.
 * @throws CodedError `INSERT_RETURNING_EMPTY` when the insert returns no row.
 */
export async function createFromUpload(
  portId: string,
  data: CreateDocumentWizardInput,
  meta: AuditMeta,
): Promise<typeof documents.$inferSelect> {
  if (!data.uploadedFileId) {
    throw new ValidationError('uploadedFileId is required for upload source');
  }
  if (!data.signers || data.signers.length === 0) {
    throw new ValidationError('signers are required for upload source');
  }

  // The file must belong to the same port as the document being created.
  const fileRecord = await db.query.files.findFirst({
    where: and(eq(files.id, data.uploadedFileId), eq(files.portId, portId)),
  });
  if (!fileRecord) {
    throw new NotFoundError('File');
  }

  await assertSubjectFksInPort(portId, {
    clientId: data.clientId,
    interestId: data.interestId,
    companyId: data.companyId,
    yachtId: data.yachtId,
    tenancyId: data.tenancyId,
  });
  // Validate watchers BEFORE any insert: previously this ran after the
  // document and signer rows were written, so a watcher outside the port
  // aborted the request but left those rows in place.
  if (data.watchers.length > 0) {
    await assertWatchersInPort(portId, data.watchers);
  }

  const [doc] = await db
    .insert(documents)
    .values({
      portId,
      interestId: data.interestId ?? null,
      tenancyId: data.tenancyId ?? null,
      clientId: data.clientId ?? null,
      companyId: data.companyId ?? null,
      yachtId: data.yachtId ?? null,
      documentType: data.documentType,
      title: data.title,
      notes: data.notes ?? null,
      status: 'draft',
      fileId: fileRecord.id,
      remindersDisabled: data.remindersDisabled,
      reminderCadenceOverride: data.reminderCadenceOverride ?? null,
      createdBy: meta.userId,
    })
    .returning();
  if (!doc)
    throw new CodedError('INSERT_RETURNING_EMPTY', {
      internalMessage: 'Failed to insert document',
    });

  await db.insert(documentSigners).values(
    data.signers.map((s) => ({
      documentId: doc.id,
      signerName: s.signerName,
      signerEmail: s.signerEmail,
      signerRole: s.signerRole,
      signingOrder: s.signingOrder,
      status: 'pending' as const,
    })),
  );

  if (data.watchers.length > 0) {
    await db.insert(documentWatchers).values(
      data.watchers.map((userId) => ({
        documentId: doc.id,
        userId,
        addedBy: meta.userId,
      })),
    );
  }

  void createAuditLog({
    userId: meta.userId,
    portId,
    action: 'create',
    entityType: 'document',
    entityId: doc.id,
    newValue: {
      documentType: doc.documentType,
      title: doc.title,
      pathway: 'upload',
      source: 'upload',
      uploadedFileId: fileRecord.id,
    },
    ipAddress: meta.ipAddress,
    userAgent: meta.userAgent,
  });
  emitToRoom(`port:${portId}`, 'document:created', { documentId: doc.id });
  return doc;
}
// ─── Aggregated Workflow Projection ───────────────────────────────────────────
/** One labelled bucket of in-flight workflows in the aggregated projection. */
export interface AggregatedWorkflowGroup {
  /** Display heading, e.g. `DIRECTLY ATTACHED` or `FROM COMPANY: <NAME>`. */
  label: string;
  /** Where the workflows come from: the entity itself or a related entity. */
  source: 'direct' | 'client' | 'company' | 'yacht';
  /** Most recently updated workflows, capped at `WORKFLOW_GROUP_LIMIT`. */
  workflows: Array<typeof documents.$inferSelect>;
  /** Count of all matching workflows; may exceed `workflows.length`. */
  total: number;
}
/** Maximum number of workflow rows returned per group. */
const WORKFLOW_GROUP_LIMIT = 20;
/** Document statuses treated as in-flight by the workflow projection. */
const INFLIGHT_STATUSES = ['draft', 'sent', 'partially_signed'] as const;
/**
 * In-flight signing workflows for an entity, grouped by origin. Mirrors the
 * projection shape of listFilesAggregatedByEntity. Workflows outside the
 * in-flight statuses (completed/expired/cancelled) are left out - they
 * surface via their signed-PDF file row.
 *
 * Groups are emitted in a fixed order: directly attached first, then one
 * group per related company, per related yacht and per related client.
 * Groups without rows are omitted.
 *
 * @param portId - Port the caller is scoped to.
 * @param entityType - Kind of entity the projection is built for.
 * @param entityId - Id of that entity.
 * @returns The non-empty groups; an empty list when the entity is not in
 *   the port.
 */
export async function listInflightWorkflowsAggregatedByEntity(
  portId: string,
  entityType: 'client' | 'company' | 'yacht',
  entityId: string,
): Promise<{ groups: AggregatedWorkflowGroup[] }> {
  if (!(await assertEntityInPort(portId, entityType, entityId))) {
    return { groups: [] };
  }
  const related = await collectRelatedEntities(portId, entityType, entityId);

  // Column on `documents` that points straight at the requested entity.
  const directColumnByType = {
    client: documents.clientId,
    company: documents.companyId,
    yacht: documents.yachtId,
  };
  const directColumn = directColumnByType[entityType];

  // Every lookup is issued up front and awaited together, so wall-clock
  // time tracks the slowest single query rather than the sum of all of
  // them (the pre-2026-05-14 implementation looped sequentially). A fully
  // batched UNION query is tracked in the PRE-DEPLOY-PLAN follow-ups.
  const [directResult, companyResults, yachtResults, clientResults] = await Promise.all([
    fetchWorkflowGroupRows(portId, eq(directColumn, entityId)),
    Promise.all(
      related.companies.map(({ id, name }) =>
        fetchWorkflowGroupRows(portId, eq(documents.companyId, id)).then((result) => ({
          name,
          result,
        })),
      ),
    ),
    Promise.all(
      related.yachts.map(({ id, name }) =>
        fetchWorkflowGroupRows(portId, eq(documents.yachtId, id)).then((result) => ({
          name,
          result,
        })),
      ),
    ),
    Promise.all(
      related.clients.map(({ id, name }) =>
        fetchWorkflowGroupRows(portId, eq(documents.clientId, id)).then((result) => ({
          name,
          result,
        })),
      ),
    ),
  ]);

  // Turns the per-entity lookups of one kind into groups, skipping empties.
  const toGroups = (
    entries: ReadonlyArray<{
      name: string;
      result: { rows: Array<typeof documents.$inferSelect>; total: number };
    }>,
    source: AggregatedWorkflowGroup['source'],
    makeLabel: (name: string) => string,
  ): AggregatedWorkflowGroup[] =>
    entries
      .filter(({ result }) => result.rows.length > 0)
      .map(({ name, result }) => ({
        label: makeLabel(name),
        source,
        workflows: result.rows,
        total: result.total,
      }));

  const groups: AggregatedWorkflowGroup[] = [];
  if (directResult.rows.length > 0) {
    groups.push({
      label: 'DIRECTLY ATTACHED',
      source: 'direct',
      workflows: directResult.rows,
      total: directResult.total,
    });
  }
  groups.push(
    ...toGroups(companyResults, 'company', (name) => `FROM COMPANY: ${name.toUpperCase()}`),
    ...toGroups(yachtResults, 'yacht', (name) => `FROM YACHT: ${name.toUpperCase()}`),
    ...toGroups(clientResults, 'client', (name) => `FROM CLIENT: ${name.toUpperCase()}`),
  );

  return { groups };
}
/**
 * Loads one workflow group: the most recently updated in-flight documents
 * of a port that satisfy `predicate`, plus the total number of matches.
 *
 * @param portId - Port the lookup is scoped to.
 * @param predicate - Extra condition selecting the group's documents.
 * @returns Up to `WORKFLOW_GROUP_LIMIT` rows (newest `updatedAt` first) and
 *   the count of all matching documents.
 */
async function fetchWorkflowGroupRows(
  portId: string,
  predicate: ReturnType<typeof eq>,
): Promise<{ rows: Array<typeof documents.$inferSelect>; total: number }> {
  // Spread into a fresh mutable array: `inArray` does not need the readonly
  // tuple type, and this avoids an `as unknown as` double cast.
  const groupScope = and(
    eq(documents.portId, portId),
    inArray(documents.status, [...INFLIGHT_STATUSES]),
    predicate,
  );

  const rows = await db
    .select()
    .from(documents)
    .where(groupScope)
    .orderBy(desc(documents.updatedAt))
    .limit(WORKFLOW_GROUP_LIMIT);

  // A page that is not full already contains every match, so its length IS
  // the total; this saves the count round-trip for small groups.
  if (rows.length < WORKFLOW_GROUP_LIMIT) {
    return { rows, total: rows.length };
  }

  const [countRow] = await db
    .select({ count: sql<number>`count(*)::int` })
    .from(documents)
    .where(groupScope);
  return { rows, total: Number(countRow?.count ?? 0) };
}