import { and, count, desc, eq, gte, inArray, isNull, lte, sql } from 'drizzle-orm'; import { db } from '@/lib/db'; import { clients, clientNotes } from '@/lib/db/schema/clients'; import { yachts, yachtNotes } from '@/lib/db/schema/yachts'; import { companies, companyNotes } from '@/lib/db/schema/companies'; import { interests, interestBerths, interestNotes } from '@/lib/db/schema/interests'; import { berths } from '@/lib/db/schema/berths'; import { berthTenancies } from '@/lib/db/schema/tenancies'; import { invoices, expenses } from '@/lib/db/schema/financial'; import { payments } from '@/lib/db/schema/pipeline'; import { documents } from '@/lib/db/schema/documents'; import { reminders } from '@/lib/db/schema/operations'; import { residentialClients, residentialInterests } from '@/lib/db/schema/residential'; import { ports } from '@/lib/db/schema/ports'; import { systemSettings, auditLogs } from '@/lib/db/schema/system'; import { userProfiles } from '@/lib/db/schema/users'; import { PIPELINE_STAGES, STAGE_WEIGHTS, canonicalizeStage } from '@/lib/constants'; import { activeInterestsWhere } from '@/lib/services/active-interest'; import { convert as convertCurrency } from '@/lib/services/currency'; const DEFAULT_PIPELINE_WEIGHTS: Record = STAGE_WEIGHTS; // ─── KPIs ───────────────────────────────────────────────────────────────────── /** * Pipeline KPIs. When `range` is supplied the pipeline-value calculation * is scoped to interests whose `createdAt` falls inside the range - lets * leadership see "what was added to the pipeline this period" rather * than the all-time snapshot. Active-interests count + occupancy are * always all-active (no temporal sense for "active right now"). */ export async function getKpis(portId: string, range?: { from: Date; to: Date } | null) { const [totalClientsRow] = await db .select({ value: count() }) .from(clients) .where(and(eq(clients.portId, portId), isNull(clients.archivedAt))); // Range filter - clamp to the interest's createdAt. Returns undefined // when no range is provided so the existing all-time queries stay // unaffected. const rangeClause = range ? and(gte(interests.createdAt, range.from), lte(interests.createdAt, range.to)) : undefined; const [activeInterestsRow] = await db .select({ value: count() }) .from(interests) .where( rangeClause ? and(activeInterestsWhere(portId), rangeClause) : activeInterestsWhere(portId), ); // Pipeline value: SUM each berth's price ONCE regardless of how many // active interests reference it. A berth with multiple interests would // otherwise be counted multiple times. Reads the primary-berth link // via interest_berths (plan §3.4). // // Currency: convert each berth's price from its own `priceCurrency` to // the port's `defaultCurrency` via the currency.service rate table. // Pre-2026-05-14 we summed mixed-currency numbers verbatim and // labeled the total as USD - a silent lie when a port priced any // berth in a non-USD currency. const [portRow] = await db .select({ defaultCurrency: ports.defaultCurrency }) .from(ports) .where(eq(ports.id, portId)); const targetCurrency = portRow?.defaultCurrency ?? 'USD'; const pipelineRows = await db .selectDistinct({ berthId: interestBerths.berthId, price: berths.price, priceCurrency: berths.priceCurrency, }) .from(interests) .innerJoin( interestBerths, and(eq(interestBerths.interestId, interests.id), eq(interestBerths.isPrimary, true)), ) .innerJoin(berths, eq(interestBerths.berthId, berths.id)) .where( rangeClause ? and(activeInterestsWhere(portId), rangeClause) : activeInterestsWhere(portId), ); let pipelineValue = 0; for (const row of pipelineRows) { if (!row.price) continue; const amount = parseFloat(String(row.price)); if (!Number.isFinite(amount) || amount === 0) continue; const sourceCurrency = (row.priceCurrency ?? targetCurrency).toUpperCase(); if (sourceCurrency === targetCurrency.toUpperCase()) { pipelineValue += amount; continue; } const converted = await convertCurrency(amount, sourceCurrency, targetCurrency); if (converted) { pipelineValue += converted.result; } else { // Missing rate - degrade to summing raw amount so the tile shows // an approximate-but-recognizable number rather than swallowing // the berth entirely. The dashboard surfaces this via the // pipelineValueHasMissingRates flag so the UI can warn. pipelineValue += amount; } } // Occupancy rate: berths with `status='sold'` / total * 100. Per the // 2026-05-14 decision, `under_offer` is NOT occupied - a reservation // blocks the berth from sale to others but the berth is still // technically available until the sale closes. const allBerthsRows = await db .select({ status: berths.status }) .from(berths) // F5: archived berths excluded so retired moorings don't dilute denominator. .where(and(eq(berths.portId, portId), isNull(berths.archivedAt))); const totalBerths = allBerthsRows.length; const occupiedBerths = allBerthsRows.filter((b) => b.status === 'sold').length; const occupancyRate = totalBerths > 0 ? (occupiedBerths / totalBerths) * 100 : 0; return { totalClients: totalClientsRow?.value ?? 0, activeInterests: activeInterestsRow?.value ?? 0, pipelineValue, pipelineValueCurrency: targetCurrency, occupancyRate, }; } // ─── Pipeline Counts ────────────────────────────────────────────────────────── export async function getPipelineCounts(portId: string) { const rows = await db .select({ stage: interests.pipelineStage, count: sql`count(*)::int`, }) .from(interests) .where(activeInterestsWhere(portId)) .groupBy(interests.pipelineStage); const countsByStage = Object.fromEntries(rows.map((r) => [r.stage, r.count])); return PIPELINE_STAGES.map((stage) => ({ stage, count: countsByStage[stage] ?? 0, })); } // ─── Revenue Forecast ───────────────────────────────────────────────────────── export async function getRevenueForecast(portId: string, range?: { from: Date; to: Date } | null) { // Load weights from systemSettings let weights: Record = DEFAULT_PIPELINE_WEIGHTS; let weightsSource: 'db' | 'default' = 'default'; const settingRow = await db.query.systemSettings.findFirst({ where: and(eq(systemSettings.key, 'pipeline_weights'), eq(systemSettings.portId, portId)), }); if (settingRow?.value) { try { const parsed = settingRow.value as Record; if (typeof parsed === 'object' && parsed !== null) { weights = parsed; weightsSource = 'db'; } } catch { // Fall through to defaults } } // Forecast excludes lost/cancelled - only currently-active or won-out // interests should affect the weighted pipeline value. Reads the // primary-berth link via interest_berths (plan §3.4). const forecastRangeClause = range ? and(gte(interests.createdAt, range.from), lte(interests.createdAt, range.to)) : undefined; const interestRows = await db .select({ id: interests.id, pipelineStage: interests.pipelineStage, berthPrice: berths.price, }) .from(interests) .innerJoin( interestBerths, and(eq(interestBerths.interestId, interests.id), eq(interestBerths.isPrimary, true)), ) .innerJoin(berths, eq(interestBerths.berthId, berths.id)) .where( forecastRangeClause ? and(activeInterestsWhere(portId), forecastRangeClause) : activeInterestsWhere(portId), ); // Build stageBreakdown - gross value, weighted value, per-stage weight, // and `dealsMissingPrice` (deals whose primary berth has no/zero price) // all surface to callers. The dashboard tile shows a warning chip when // any deals in a stage are missing a berth price so the $0 line item // doesn't read as legitimate. const stageMap: Record< string, { count: number; grossValue: number; weightedValue: number; dealsMissingPrice: number } > = {}; for (const row of interestRows) { const stage = canonicalizeStage(row.pipelineStage); const price = row.berthPrice ? parseFloat(String(row.berthPrice)) : 0; const weight = weights[stage] ?? 0; const weighted = price * weight; if (!stageMap[stage]) { stageMap[stage] = { count: 0, grossValue: 0, weightedValue: 0, dealsMissingPrice: 0 }; } stageMap[stage]!.count += 1; stageMap[stage]!.grossValue += price; stageMap[stage]!.weightedValue += weighted; if (!(price > 0)) stageMap[stage]!.dealsMissingPrice += 1; } const stageBreakdown = PIPELINE_STAGES.map((stage) => ({ stage, count: stageMap[stage]?.count ?? 0, grossValue: stageMap[stage]?.grossValue ?? 0, weightedValue: stageMap[stage]?.weightedValue ?? 0, weight: weights[stage] ?? 0, dealsMissingPrice: stageMap[stage]?.dealsMissingPrice ?? 0, })); const totalGrossValue = stageBreakdown.reduce((acc, s) => acc + s.grossValue, 0); const totalWeightedValue = stageBreakdown.reduce((acc, s) => acc + s.weightedValue, 0); return { totalGrossValue, totalWeightedValue, stageBreakdown, weightsSource, }; } // ─── Compact widget queries ─────────────────────────────────────────────────── /** * Berth status split for the donut widget. Returns counts plus the total * so the chart can show "12 of 47 sold" alongside the segment percentage. */ export async function getBerthStatusDistribution(portId: string) { const rows = await db .select({ status: berths.status, c: sql`count(*)::int` }) .from(berths) .where(and(eq(berths.portId, portId), isNull(berths.archivedAt))) .groupBy(berths.status); const counts: Record = {}; for (const r of rows) counts[r.status] = r.c; const total = Object.values(counts).reduce((a, b) => a + b, 0); return { total, available: counts['available'] ?? 0, underOffer: counts['under_offer'] ?? 0, sold: counts['sold'] ?? 0, }; } /** * Top 5 active interests closest to closing - ranked by pipeline stage * (further = closer to closing) with most-recent activity as a * tiebreaker. Surfaces the deals reps should actually be chasing on the * dashboard without making them open the pipeline board. */ export async function getHotDeals(portId: string, limit = 5) { // Stage rank: bigger = closer to closing. Mirrors the 7-stage pipeline // shipped 2026-05-14 (pipeline-refactor wave). Nurturing is a holding // pen below qualified - supply-constrained ports flip deals there when // they can't progress. Won/lost/cancelled outcomes are filtered out via // `outcome IS NULL` below, so they don't need a rank slot. const rank = sql`CASE ${interests.pipelineStage} WHEN 'contract' THEN 7 WHEN 'deposit_paid' THEN 6 WHEN 'reservation' THEN 5 WHEN 'eoi' THEN 4 WHEN 'qualified' THEN 3 WHEN 'nurturing' THEN 2 WHEN 'enquiry' THEN 1 ELSE 0 END`; const rows = await db .select({ id: interests.id, stage: interests.pipelineStage, clientName: clients.fullName, mooring: berths.mooringNumber, lastContact: interests.dateLastContact, updatedAt: interests.updatedAt, rank, }) .from(interests) .innerJoin(clients, eq(interests.clientId, clients.id)) .leftJoin( interestBerths, and(eq(interestBerths.interestId, interests.id), eq(interestBerths.isPrimary, true)), ) .leftJoin(berths, eq(interestBerths.berthId, berths.id)) .where(activeInterestsWhere(portId)) .orderBy(desc(rank), desc(interests.updatedAt)) .limit(limit); return rows.map((r) => ({ id: r.id, stage: r.stage, clientName: r.clientName, mooringNumber: r.mooring, lastContact: r.lastContact ? r.lastContact.toISOString() : null, })); } /** * Source-conversion breakdown for the marketing widget. Returns per- * source totals (active + won + lost) and a derived conversion rate so * reps see which channels deliver buyers vs tire-kickers - orthogonal * to the existing "lead source attribution" chart which only counts * inbound volume. */ export async function getSourceConversion(portId: string) { const rows = await db .select({ source: interests.source, total: sql`count(*)::int`, won: sql`sum(case when ${interests.outcome} = 'won' then 1 else 0 end)::int`, lost: sql`sum(case when ${interests.outcome} = 'lost' then 1 else 0 end)::int`, }) .from(interests) .where(and(eq(interests.portId, portId), isNull(interests.archivedAt))) .groupBy(interests.source); return rows .filter((r) => r.source) .map((r) => ({ source: r.source!, total: r.total, won: r.won, lost: r.lost, conversionRate: r.total > 0 ? r.won / r.total : 0, })) .sort((a, b) => b.total - a.total); } // ─── Recent Activity ────────────────────────────────────────────────────────── export async function getRecentActivity(portId: string, limit = 20) { const rows = await db .select({ id: auditLogs.id, action: auditLogs.action, entityType: auditLogs.entityType, entityId: auditLogs.entityId, userId: auditLogs.userId, fieldChanged: auditLogs.fieldChanged, oldValue: auditLogs.oldValue, newValue: auditLogs.newValue, metadata: auditLogs.metadata, createdAt: auditLogs.createdAt, }) .from(auditLogs) .where(eq(auditLogs.portId, portId)) .orderBy(desc(auditLogs.createdAt)) .limit(limit); // Resolve a human label per row (client name, yacht name, invoice number, // …). The dashboard widget previously rendered the bare UUID prefix which // told reps nothing about which entity was touched. We batch one SELECT // per entityType, capping at the row set's natural size (<= `limit`). const byType = new Map>(); for (const r of rows) { if (!r.entityId) continue; if (!byType.has(r.entityType)) byType.set(r.entityType, new Set()); byType.get(r.entityType)!.add(r.entityId); } const labels = new Map(); // `${type}:${id}` → label async function loadLabels( type: string, fetcher: (ids: string[]) => Promise, pick: (row: T) => string, ) { const ids = Array.from(byType.get(type) ?? []); if (ids.length === 0) return; const fetched = await fetcher(ids); for (const row of fetched) labels.set(`${type}:${row.id}`, pick(row)); } await Promise.all([ loadLabels( 'client', (ids) => db .select({ id: clients.id, name: clients.fullName }) .from(clients) .where(and(eq(clients.portId, portId), inArray(clients.id, ids))), (r) => r.name, ), loadLabels( 'yacht', (ids) => db .select({ id: yachts.id, name: yachts.name }) .from(yachts) .where(and(eq(yachts.portId, portId), inArray(yachts.id, ids))), (r) => r.name, ), loadLabels( 'company', (ids) => db .select({ id: companies.id, name: companies.name }) .from(companies) .where(and(eq(companies.portId, portId), inArray(companies.id, ids))), (r) => r.name, ), loadLabels( 'interest', (ids) => db .select({ id: interests.id, clientName: clients.fullName }) .from(interests) .innerJoin(clients, eq(interests.clientId, clients.id)) .where(and(eq(interests.portId, portId), inArray(interests.id, ids))), (r) => r.clientName, ), loadLabels( 'berth', (ids) => db .select({ id: berths.id, mooring: berths.mooringNumber }) .from(berths) .where(and(eq(berths.portId, portId), inArray(berths.id, ids))), (r) => `Berth ${r.mooring}`, ), loadLabels( 'invoice', (ids) => db .select({ id: invoices.id, num: invoices.invoiceNumber }) .from(invoices) .where(and(eq(invoices.portId, portId), inArray(invoices.id, ids))), (r) => r.num, ), loadLabels( 'expense', (ids) => db .select({ id: expenses.id, desc: expenses.description, vendor: expenses.establishmentName, }) .from(expenses) .where(and(eq(expenses.portId, portId), inArray(expenses.id, ids))), (r) => r.desc ?? r.vendor ?? 'Expense', ), loadLabels( 'document', (ids) => db .select({ id: documents.id, title: documents.title }) .from(documents) .where(and(eq(documents.portId, portId), inArray(documents.id, ids))), (r) => r.title, ), loadLabels( 'reminder', (ids) => db .select({ id: reminders.id, title: reminders.title }) .from(reminders) .where(and(eq(reminders.portId, portId), inArray(reminders.id, ids))), (r) => r.title, ), loadLabels( 'residential_client', (ids) => db .select({ id: residentialClients.id, name: residentialClients.fullName }) .from(residentialClients) .where(and(eq(residentialClients.portId, portId), inArray(residentialClients.id, ids))), (r) => r.name, ), loadLabels( 'residential_interest', (ids) => db .select({ id: residentialInterests.id, clientName: residentialClients.fullName, }) .from(residentialInterests) .innerJoin( residentialClients, eq(residentialInterests.residentialClientId, residentialClients.id), ) .where( and(eq(residentialInterests.portId, portId), inArray(residentialInterests.id, ids)), ), (r) => r.clientName, ), loadLabels( 'berth_tenancy', (ids) => db .select({ id: berthTenancies.id, mooring: berths.mooringNumber, clientName: clients.fullName, }) .from(berthTenancies) .innerJoin(berths, eq(berthTenancies.berthId, berths.id)) .leftJoin(clients, eq(berthTenancies.clientId, clients.id)) .where(and(eq(berthTenancies.portId, portId), inArray(berthTenancies.id, ids))), (r) => `Berth ${r.mooring}${r.clientName ? ` · ${r.clientName}` : ''}`, ), loadLabels( 'payment', (ids) => db .select({ id: payments.id, clientName: clients.fullName, amount: payments.amount, currency: payments.currency, }) .from(payments) .innerJoin(interests, eq(payments.interestId, interests.id)) .innerJoin(clients, eq(interests.clientId, clients.id)) .where(and(eq(payments.portId, portId), inArray(payments.id, ids))), (r) => `${r.clientName} · ${r.currency} ${r.amount}`, ), // Notes resolve to their parent entity's name so the feed reads // "Client note on Matthew Ciaccio" rather than a UUID-prefix fallback // when the note itself has no human-readable identifier. loadLabels( 'client_note', (ids) => db .select({ id: clientNotes.id, parent: clients.fullName }) .from(clientNotes) .innerJoin(clients, eq(clientNotes.clientId, clients.id)) .where(and(eq(clients.portId, portId), inArray(clientNotes.id, ids))), (r) => `Note on ${r.parent}`, ), loadLabels( 'interest_note', (ids) => db .select({ id: interestNotes.id, parent: clients.fullName }) .from(interestNotes) .innerJoin(interests, eq(interestNotes.interestId, interests.id)) .innerJoin(clients, eq(interests.clientId, clients.id)) .where(and(eq(interests.portId, portId), inArray(interestNotes.id, ids))), (r) => `Note on ${r.parent}`, ), loadLabels( 'yacht_note', (ids) => db .select({ id: yachtNotes.id, parent: yachts.name }) .from(yachtNotes) .innerJoin(yachts, eq(yachtNotes.yachtId, yachts.id)) .where(and(eq(yachts.portId, portId), inArray(yachtNotes.id, ids))), (r) => `Note on ${r.parent}`, ), loadLabels( 'company_note', (ids) => db .select({ id: companyNotes.id, parent: companies.name }) .from(companyNotes) .innerJoin(companies, eq(companyNotes.companyId, companies.id)) .where(and(eq(companies.portId, portId), inArray(companyNotes.id, ids))), (r) => `Note on ${r.parent}`, ), ]); // Resolve user UUIDs that appear as the actor (auditLogs.userId) and // as oldValue/newValue on user-FK diff rows (assignedTo, ownerId, // reassignedTo, createdBy). Activity-feed audit-log rows previously // rendered the raw UUID prefix, which was unreadable. const USER_FK_FIELDS = new Set([ 'assignedTo', 'ownerId', 'reassignedTo', 'createdBy', 'addedBy', 'changedBy', 'transferredBy', ]); const userIds = new Set(); for (const r of rows) { if (r.userId) userIds.add(r.userId); if (r.fieldChanged && USER_FK_FIELDS.has(r.fieldChanged)) { if (typeof r.oldValue === 'string') userIds.add(r.oldValue); if (typeof r.newValue === 'string') userIds.add(r.newValue); } } const userNames = new Map(); if (userIds.size > 0) { const profiles = await db .select({ userId: userProfiles.userId, displayName: userProfiles.displayName, firstName: userProfiles.firstName, lastName: userProfiles.lastName, }) .from(userProfiles) .where(inArray(userProfiles.userId, Array.from(userIds))); for (const p of profiles) { const name = [p.firstName, p.lastName].filter(Boolean).join(' ').trim() || p.displayName; userNames.set(p.userId, name); } } function resolveUser(id: unknown): unknown { if (typeof id !== 'string') return id; const name = userNames.get(id); if (name) return name; return `Unknown user (#${id.slice(0, 8)})`; } return rows.map((r) => { const isUserFk = r.fieldChanged && USER_FK_FIELDS.has(r.fieldChanged); return { ...r, label: r.entityId ? (labels.get(`${r.entityType}:${r.entityId}`) ?? null) : null, // Replace user UUIDs with display names; non-user-FK rows pass through. oldValue: isUserFk ? resolveUser(r.oldValue) : r.oldValue, newValue: isUserFk ? resolveUser(r.newValue) : r.newValue, // Surfaces the actor's name to the renderer; original userId stays // available for forensics / deep-link if a later UI needs it. actorName: r.userId ? (userNames.get(r.userId) ?? null) : null, }; }); }