import { and, eq, inArray, isNull, sql } from 'drizzle-orm';

import { db } from '@/lib/db';
import { interests, interestTags } from '@/lib/db/schema/interests';
import { clients } from '@/lib/db/schema/clients';
import { berths } from '@/lib/db/schema/berths';
import { tags } from '@/lib/db/schema/system';
import { createAuditLog } from '@/lib/audit';
import { NotFoundError, ConflictError } from '@/lib/errors';
import { emitToRoom } from '@/lib/socket/server';
import { buildListQuery } from '@/lib/db/query-builder';
import { diffEntity } from '@/lib/entity-diff';
import { softDelete, restore, withTransaction } from '@/lib/db/utils';
import type {
  CreateInterestInput,
  UpdateInterestInput,
  ChangeStageInput,
  ListInterestsInput,
} from '@/lib/validators/interests';

// ─── Types ────────────────────────────────────────────────────────────────────

/** Request context copied into the audit-log entries written by this module. */
interface AuditMeta {
  userId: string;
  portId: string;
  ipAddress: string;
  userAgent: string;
}

/** Loosely typed list row: only the keys this module reads are pinned down. */
type ListedInterestRow = Record<string, unknown> & {
  id: string;
  clientId: string;
  berthId: string | null;
};

// ─── Shared helpers ───────────────────────────────────────────────────────────

/**
 * Loads an interest by id and verifies that it belongs to `portId`.
 *
 * @throws NotFoundError when no row matches or the row belongs to another port.
 */
async function requireInterestInPort(id: string, portId: string) {
  const interest = await db.query.interests.findFirst({
    where: eq(interests.id, id),
  });

  if (!interest || interest.portId !== portId) {
    throw new NotFoundError('Interest');
  }

  return interest;
}

/** Maps client id → full name for the given ids (no query is issued for an empty list). */
async function loadClientNames(clientIds: string[]) {
  const rows =
    clientIds.length > 0
      ? await db
          .select({ id: clients.id, fullName: clients.fullName })
          .from(clients)
          .where(inArray(clients.id, clientIds))
      : [];

  return new Map(rows.map((row) => [row.id, row.fullName] as const));
}

/** Maps berth id → mooring number for the given ids (no query is issued for an empty list). */
async function loadMooringNumbers(berthIds: string[]) {
  const rows =
    berthIds.length > 0
      ? await db
          .select({ id: berths.id, mooringNumber: berths.mooringNumber })
          .from(berths)
          .where(inArray(berths.id, berthIds))
      : [];

  return new Map(rows.map((row) => [row.id, row.mooringNumber] as const));
}

/** Groups `{ id, name, color }` tag summaries by interest id (no query is issued for an empty list). */
async function loadTagsByInterestId(interestIds: string[]) {
  const rows =
    interestIds.length > 0
      ? await db
          .select({
            interestId: interestTags.interestId,
            id: tags.id,
            name: tags.name,
            color: tags.color,
          })
          .from(interestTags)
          .innerJoin(tags, eq(interestTags.tagId, tags.id))
          .where(inArray(interestTags.interestId, interestIds))
      : [];

  type TagSummary = Omit<(typeof rows)[number], 'interestId'>;
  const grouped = new Map<string, TagSummary[]>();

  for (const { interestId, ...tag } of rows) {
    const bucket = grouped.get(interestId);
    if (bucket) {
      bucket.push(tag);
    } else {
      grouped.set(interestId, [tag]);
    }
  }

  return grouped;
}

/** BR-133: milestone date column(s) to stamp when an interest enters `stage`. */
function milestoneTimestamps(stage: string, at: Date): Record<string, Date> {
  const stamps: Record<string, Date> = {};
  if (stage === 'signed_eoi_nda') stamps.dateEoiSigned = at;
  if (stage === 'contract') stamps.dateContractSigned = at;
  if (stage === 'deposit_10pct') stamps.dateDepositReceived = at;
  return stamps;
}

// ─── BR-011: Auto-promote leadCategory ───────────────────────────────────────

/**
 * Resolves the lead category to store for an interest.
 *
 * A category other than `'general_interest'` is returned unchanged. Otherwise
 * the client is looked up and, if it has a truthy `yachtLengthFt` or
 * `yachtLengthM`, the category is promoted to `'specific_qualified'`. In every
 * other case the input is returned with `null` normalised to `undefined`.
 */
async function resolveLeadCategory(
  clientId: string,
  leadCategory: string | undefined | null,
): Promise<string | undefined> {
  if (leadCategory && leadCategory !== 'general_interest') {
    return leadCategory;
  }

  const client = await db.query.clients.findFirst({
    where: eq(clients.id, clientId),
  });

  if (client && (client.yachtLengthFt || client.yachtLengthM)) {
    return 'specific_qualified';
  }

  return leadCategory ?? undefined;
}

// ─── List ─────────────────────────────────────────────────────────────────────

/**
 * Returns one page of interests for a port, each row decorated with
 * `clientName`, `berthMooringNumber` and `tags`.
 *
 * @param portId - Port whose interests are listed.
 * @param query - Paging, sorting and filter options.
 * @returns `{ data, total }`; `{ data: [], total: 0 }` when `tagIds` is given
 *   and no interest carries any of those tags.
 */
export async function listInterests(portId: string, query: ListInterestsInput) {
  const {
    page,
    limit,
    sort,
    order,
    search,
    includeArchived,
    clientId,
    berthId,
    pipelineStage,
    leadCategory,
    eoiStatus,
    tagIds,
  } = query;

  const filters = [];

  if (clientId) filters.push(eq(interests.clientId, clientId));
  if (berthId) filters.push(eq(interests.berthId, berthId));
  if (pipelineStage && pipelineStage.length > 0) {
    filters.push(inArray(interests.pipelineStage, pipelineStage));
  }
  if (leadCategory) filters.push(eq(interests.leadCategory, leadCategory));
  if (eoiStatus) filters.push(eq(interests.eoiStatus, eoiStatus));

  if (tagIds && tagIds.length > 0) {
    // An interest matches when it carries at least one of the requested tags.
    const tagged = await db
      .selectDistinct({ interestId: interestTags.interestId })
      .from(interestTags)
      .where(inArray(interestTags.tagId, tagIds));

    if (tagged.length === 0) {
      return { data: [], total: 0 };
    }

    filters.push(inArray(interests.id, tagged.map((row) => row.interestId)));
  }

  const sortColumn = (() => {
    switch (sort) {
      case 'pipelineStage':
        return interests.pipelineStage;
      case 'leadCategory':
        return interests.leadCategory;
      case 'createdAt':
        return interests.createdAt;
      default:
        return interests.updatedAt;
    }
  })();

  // NOTE(review): `searchColumns` is empty, so whether `search` has any effect
  // depends on buildListQuery — confirm.
  const result = await buildListQuery({
    table: interests,
    portIdColumn: interests.portId,
    portId,
    idColumn: interests.id,
    updatedAtColumn: interests.updatedAt,
    filters,
    sort: { column: sortColumn, direction: order },
    page,
    pageSize: limit,
    searchColumns: [],
    searchTerm: search,
    includeArchived,
    archivedAtColumn: interests.archivedAt,
  });

  // Join client names, berth mooring numbers and tags onto the page rows.
  const rows = result.data as ListedInterestRow[];
  const interestIds = rows.map((row) => row.id);
  const clientIds = [...new Set(rows.map((row) => row.clientId))];
  const berthIds = [...new Set(rows.flatMap((row) => (row.berthId ? [row.berthId] : [])))];

  // The three lookups are independent of each other, so run them concurrently.
  const [clientNameById, mooringNumberById, tagsByInterestId] = await Promise.all([
    loadClientNames(clientIds),
    loadMooringNumbers(berthIds),
    loadTagsByInterestId(interestIds),
  ]);

  const data = rows.map((row) => ({
    ...row,
    clientName: clientNameById.get(row.clientId) ?? null,
    berthMooringNumber: row.berthId ? (mooringNumberById.get(row.berthId) ?? null) : null,
    tags: tagsByInterestId.get(row.id) ?? [],
  }));

  return { data, total: result.total };
}

// ─── Get by ID ────────────────────────────────────────────────────────────────

/**
 * Loads one interest together with its client name, berth mooring number and tags.
 *
 * @throws NotFoundError when the interest does not exist in `portId`.
 */
export async function getInterestById(id: string, portId: string) {
  const interest = await requireInterestInPort(id, portId);

  // Lookups are keyed by the stored ids so map hits do not depend on how the
  // caller spelled `id`.
  const [clientNameById, mooringNumberById, tagsByInterestId] = await Promise.all([
    loadClientNames([interest.clientId]),
    loadMooringNumbers(interest.berthId ? [interest.berthId] : []),
    loadTagsByInterestId([interest.id]),
  ]);

  return {
    ...interest,
    clientName: clientNameById.get(interest.clientId) ?? null,
    berthMooringNumber: interest.berthId
      ? (mooringNumberById.get(interest.berthId) ?? null)
      : null,
    tags: tagsByInterestId.get(interest.id) ?? [],
  };
}

// ─── Create ───────────────────────────────────────────────────────────────────

/**
 * Creates an interest (and its tag links) in a single transaction, then writes
 * an audit entry, emits `interest:created` to the port room and dispatches the
 * matching webhook event. The audit write and webhook dispatch are not awaited.
 *
 * @returns The inserted interest row.
 */
export async function createInterest(
  portId: string,
  data: CreateInterestInput,
  meta: AuditMeta,
) {
  const { tagIds, ...interestData } = data;

  // BR-011: auto-promote leadCategory
  const resolvedLeadCategory = await resolveLeadCategory(data.clientId, data.leadCategory);

  const created = await withTransaction(async (tx) => {
    const [interest] = await tx
      .insert(interests)
      .values({
        portId,
        ...interestData,
        leadCategory: resolvedLeadCategory,
      })
      .returning();

    if (!interest) {
      // Thrown inside the transaction callback so no tag rows are written either.
      throw new Error('Interest insert returned no row');
    }

    if (tagIds && tagIds.length > 0) {
      await tx
        .insert(interestTags)
        .values(tagIds.map((tagId) => ({ interestId: interest.id, tagId })));
    }

    return interest;
  });

  void createAuditLog({
    userId: meta.userId,
    portId,
    action: 'create',
    entityType: 'interest',
    entityId: created.id,
    newValue: { clientId: created.clientId, pipelineStage: created.pipelineStage },
    ipAddress: meta.ipAddress,
    userAgent: meta.userAgent,
  });

  emitToRoom(`port:${portId}`, 'interest:created', {
    interestId: created.id,
    clientId: created.clientId,
    berthId: created.berthId ?? null,
    source: created.source ?? '',
  });

  void import('@/lib/services/webhook-dispatch').then(({ dispatchWebhookEvent }) =>
    dispatchWebhookEvent(portId, 'interest:created', {
      interestId: created.id,
      clientId: created.clientId,
    }),
  );

  return created;
}

// ─── Update ───────────────────────────────────────────────────────────────────

/**
 * Applies a partial update to an interest, then writes an audit entry and emits
 * `interest:updated` with the names of the changed fields.
 *
 * @returns The updated interest row.
 * @throws NotFoundError when the interest does not exist in `portId`, or the
 *   row is gone by the time the update runs.
 */
export async function updateInterest(
  id: string,
  portId: string,
  data: UpdateInterestInput,
  meta: AuditMeta,
) {
  const existing = await requireInterestInPort(id, portId);

  // BR-011: auto-promote leadCategory if provided. The key is only added when
  // the caller sent it, so the diff and audit payload never carry a synthetic
  // `leadCategory: undefined` entry.
  const updateData =
    'leadCategory' in data
      ? {
          ...data,
          leadCategory: (await resolveLeadCategory(
            existing.clientId,
            data.leadCategory,
          )) as typeof data.leadCategory,
        }
      : { ...data };

  const { diff } = diffEntity(
    existing as Record<string, unknown>,
    updateData as Record<string, unknown>,
  );

  const [updated] = await db
    .update(interests)
    .set({ ...updateData, updatedAt: new Date() })
    .where(and(eq(interests.id, id), eq(interests.portId, portId)))
    .returning();

  if (!updated) {
    throw new NotFoundError('Interest');
  }

  void createAuditLog({
    userId: meta.userId,
    portId,
    action: 'update',
    entityType: 'interest',
    entityId: id,
    oldValue: diff as Record<string, unknown>,
    newValue: updateData as Record<string, unknown>,
    ipAddress: meta.ipAddress,
    userAgent: meta.userAgent,
  });

  emitToRoom(`port:${portId}`, 'interest:updated', {
    interestId: id,
    changedFields: Object.keys(diff),
  });

  return updated;
}

// ─── Change Stage ─────────────────────────────────────────────────────────────

/**
 * Moves an interest to a new pipeline stage and stamps the matching milestone
 * date (BR-133) in the same statement, so the returned row already contains it.
 * Afterwards writes an audit entry, emits `interest:stageChanged`, dispatches
 * the webhook event and creates a notification for the acting user.
 *
 * @returns The updated interest row.
 * @throws NotFoundError when the interest does not exist in `portId`, or the
 *   row is gone by the time the update runs.
 */
export async function changeInterestStage(
  id: string,
  portId: string,
  data: ChangeStageInput,
  meta: AuditMeta,
) {
  const existing = await requireInterestInPort(id, portId);
  const oldStage = existing.pipelineStage;
  const now = new Date();

  // BR-133: Auto-populate milestones based on stage — written together with the
  // stage itself so the change is a single port-scoped statement.
  const [updated] = await db
    .update(interests)
    .set({
      ...milestoneTimestamps(data.pipelineStage, now),
      pipelineStage: data.pipelineStage,
      updatedAt: now,
    })
    .where(and(eq(interests.id, id), eq(interests.portId, portId)))
    .returning();

  if (!updated) {
    throw new NotFoundError('Interest');
  }

  void createAuditLog({
    userId: meta.userId,
    portId,
    action: 'update',
    entityType: 'interest',
    entityId: id,
    oldValue: { pipelineStage: oldStage },
    newValue: { pipelineStage: data.pipelineStage, reason: data.reason },
    metadata: { type: 'stage_change', reason: data.reason },
    ipAddress: meta.ipAddress,
    userAgent: meta.userAgent,
  });

  emitToRoom(`port:${portId}`, 'interest:stageChanged', {
    interestId: id,
    oldStage: oldStage ?? '',
    newStage: data.pipelineStage,
    clientName: '',
    berthNumber: '',
  });

  void import('@/lib/services/webhook-dispatch').then(({ dispatchWebhookEvent }) =>
    dispatchWebhookEvent(portId, 'interest:stageChanged', {
      interestId: id,
      oldStage: oldStage ?? null,
      newStage: data.pipelineStage,
    }),
  );

  // Fire-and-forget notification to the acting user
  void import('@/lib/services/notifications.service').then(({ createNotification }) =>
    createNotification({
      portId,
      userId: meta.userId,
      type: 'interest_stage_changed',
      title: `Interest moved to ${data.pipelineStage}`,
      description: `Interest ${id} stage changed from ${oldStage ?? 'unknown'} to ${data.pipelineStage}`,
      link: `/interests/${id}`,
      entityType: 'interest',
      entityId: id,
      dedupeKey: `interest:${id}:stage:${data.pipelineStage}`,
      cooldownMs: 300_000,
    }),
  );

  return updated;
}

// ─── Archive / Restore ────────────────────────────────────────────────────────

/**
 * Archives an interest via `softDelete`, then writes an audit entry and emits
 * `interest:archived`.
 *
 * @throws NotFoundError when the interest does not exist in `portId`.
 * @throws ConflictError (BR-014) when the EOI is waiting for signatures or the
 *   contract is pending.
 */
export async function archiveInterest(id: string, portId: string, meta: AuditMeta) {
  const existing = await requireInterestInPort(id, portId);

  // BR-014: Block archive if pending EOI/contract
  if (existing.eoiStatus === 'waiting_for_signatures' || existing.contractStatus === 'pending') {
    throw new ConflictError(
      'Cannot archive interest with pending documents. Cancel documents first.',
    );
  }

  await softDelete(interests, interests.id, id);

  void createAuditLog({
    userId: meta.userId,
    portId,
    action: 'archive',
    entityType: 'interest',
    entityId: id,
    ipAddress: meta.ipAddress,
    userAgent: meta.userAgent,
  });

  emitToRoom(`port:${portId}`, 'interest:archived', { interestId: id });
}

/**
 * Restores an interest via `restore`, then writes an audit entry and emits
 * `interest:updated` with an empty `changedFields` list.
 *
 * @throws NotFoundError when the interest does not exist in `portId`.
 */
export async function restoreInterest(id: string, portId: string, meta: AuditMeta) {
  await requireInterestInPort(id, portId);

  await restore(interests, interests.id, id);

  void createAuditLog({
    userId: meta.userId,
    portId,
    action: 'restore',
    entityType: 'interest',
    entityId: id,
    ipAddress: meta.ipAddress,
    userAgent: meta.userAgent,
  });

  emitToRoom(`port:${portId}`, 'interest:updated', { interestId: id, changedFields: [] });
}

// ─── Set Tags ─────────────────────────────────────────────────────────────────

/**
 * Replaces the full tag set of an interest, then writes an audit entry and
 * emits `interest:updated` with `changedFields: ['tags']`.
 *
 * @param tagIds - Tags the interest should have afterwards; `[]` clears them.
 * @returns `{ interestId, tagIds }` echoing the applied tag set.
 * @throws NotFoundError when the interest does not exist in `portId`.
 */
export async function setInterestTags(
  id: string,
  portId: string,
  tagIds: string[],
  meta: AuditMeta,
) {
  await requireInterestInPort(id, portId);

  // Delete and re-insert inside one transaction so a failing insert cannot
  // leave the interest with its previous tags already removed.
  await withTransaction(async (tx) => {
    await tx.delete(interestTags).where(eq(interestTags.interestId, id));

    if (tagIds.length > 0) {
      await tx
        .insert(interestTags)
        .values(tagIds.map((tagId) => ({ interestId: id, tagId })));
    }
  });

  void createAuditLog({
    userId: meta.userId,
    portId,
    action: 'update',
    entityType: 'interest',
    entityId: id,
    metadata: { type: 'tags_updated', tagIds },
    ipAddress: meta.ipAddress,
    userAgent: meta.userAgent,
  });

  emitToRoom(`port:${portId}`, 'interest:updated', { interestId: id, changedFields: ['tags'] });

  return { interestId: id, tagIds };
}

// ─── Link / Unlink Berth ──────────────────────────────────────────────────────

/**
 * Points an interest at `berthId`, then writes an audit entry, emits
 * `interest:berthLinked` and dispatches the matching webhook event.
 *
 * NOTE(review): `berthId` is not checked against `portId` in this function —
 * confirm that callers validate it.
 *
 * @returns The updated interest row.
 * @throws NotFoundError when the interest does not exist in `portId`, or the
 *   row is gone by the time the update runs.
 */
export async function linkBerth(
  id: string,
  portId: string,
  berthId: string,
  meta: AuditMeta,
) {
  const existing = await requireInterestInPort(id, portId);

  const [updated] = await db
    .update(interests)
    .set({ berthId, updatedAt: new Date() })
    .where(and(eq(interests.id, id), eq(interests.portId, portId)))
    .returning();

  if (!updated) {
    throw new NotFoundError('Interest');
  }

  void createAuditLog({
    userId: meta.userId,
    portId,
    action: 'update',
    entityType: 'interest',
    entityId: id,
    oldValue: { berthId: existing.berthId },
    newValue: { berthId },
    metadata: { type: 'berth_linked' },
    ipAddress: meta.ipAddress,
    userAgent: meta.userAgent,
  });

  emitToRoom(`port:${portId}`, 'interest:berthLinked', { interestId: id, berthId });

  void import('@/lib/services/webhook-dispatch').then(({ dispatchWebhookEvent }) =>
    dispatchWebhookEvent(portId, 'interest:berthLinked', { interestId: id, berthId }),
  );

  return updated;
}

/**
 * Clears the berth of an interest, then writes an audit entry and emits
 * `interest:berthUnlinked` carrying the previous berth id (`''` if there was none).
 *
 * @returns The updated interest row.
 * @throws NotFoundError when the interest does not exist in `portId`, or the
 *   row is gone by the time the update runs.
 */
export async function unlinkBerth(id: string, portId: string, meta: AuditMeta) {
  const existing = await requireInterestInPort(id, portId);
  const oldBerthId = existing.berthId;

  const [updated] = await db
    .update(interests)
    .set({ berthId: null, updatedAt: new Date() })
    .where(and(eq(interests.id, id), eq(interests.portId, portId)))
    .returning();

  if (!updated) {
    throw new NotFoundError('Interest');
  }

  void createAuditLog({
    userId: meta.userId,
    portId,
    action: 'update',
    entityType: 'interest',
    entityId: id,
    oldValue: { berthId: oldBerthId },
    newValue: { berthId: null },
    metadata: { type: 'berth_unlinked' },
    ipAddress: meta.ipAddress,
    userAgent: meta.userAgent,
  });

  emitToRoom(`port:${portId}`, 'interest:berthUnlinked', {
    interestId: id,
    berthId: oldBerthId ?? '',
  });

  return updated;
}

// ─── Stage Counts (for board) ────────────────────────────────────────────────

/**
 * Counts the non-archived interests of a port per pipeline stage.
 *
 * @returns An object keyed by pipeline stage with the row count as value;
 *   stages without any interest are absent.
 */
export async function getInterestStageCounts(portId: string) {
  const rows = await db
    .select({ stage: interests.pipelineStage, count: sql<number>`count(*)::int` })
    .from(interests)
    .where(and(eq(interests.portId, portId), isNull(interests.archivedAt)))
    .groupBy(interests.pipelineStage);

  return Object.fromEntries(rows.map((row) => [row.stage, row.count]));
}