import { and, count, eq, gte, isNull, lte, sql, sum } from 'drizzle-orm';

import { db } from '@/lib/db';
import { interests, interestBerths } from '@/lib/db/schema/interests';
import { berths } from '@/lib/db/schema/berths';
import { auditLogs, systemSettings } from '@/lib/db/schema/system';
import { STAGE_WEIGHTS, canonicalizeStage } from '@/lib/constants';
import { activeInterestsWhere } from '@/lib/services/active-interest';

// ─── Types ────────────────────────────────────────────────────────────────────

export interface PipelineData {
  stageCounts: Record<string, number>;
  topInterests: Array<{
    id: string;
    clientId: string;
    pipelineStage: string;
    berthPrice: string | null;
  }>;
  generatedAt: string;
}

export interface RevenueData {
  /** Gross berth prices per pipeline stage (unweighted). */
  stageRevenue: Record<string, string>;
  /** Money-changed-hands total: sum of berth prices for won deals. */
  totalCompleted: string;
  /** Pipeline-weighted forecast: sum of (berth price × stage weight)
   *  for every active interest. Aligns with the dashboard forecast tile
   *  so the PDF and dashboard reconcile. */
  totalForecast: string;
  /** Pipeline weights actually applied (port-customizable). Echoes
   *  `system_settings.pipeline_weights` when set, otherwise the
   *  STAGE_WEIGHTS defaults. */
  pipelineWeights: Record<string, number>;
  generatedAt: string;
}

export interface ActivityData {
  logs: Array<{
    id: string;
    action: string;
    entityType: string;
    entityId: string | null;
    userId: string | null;
    createdAt: Date;
  }>;
  summary: Record<string, number>;
  generatedAt: string;
}

export interface OccupancyData {
  statusCounts: Record<string, number>;
  occupancyRate: number;
  totalBerths: number;
  generatedAt: string;
}

// ─── Internal helpers ─────────────────────────────────────────────────────────

/** Number of interests listed in the pipeline report's "top interests" table. */
const TOP_INTERESTS_LIMIT = 10;

/** Maximum number of audit-log rows returned by the activity report. */
const ACTIVITY_LOG_LIMIT = 200;

/** Default look-back window (in days) when no `dateFrom` filter is supplied. */
const DEFAULT_ACTIVITY_WINDOW_DAYS = 30;

/**
 * Join condition that links an interest to its primary berth only
 * (plan §3.4). Non-primary junction rows never contribute to price figures.
 * Built fresh on every call so no SQL fragment is shared between queries.
 */
function primaryBerthLink() {
  return and(eq(interestBerths.interestId, interests.id), eq(interestBerths.isPrimary, true));
}

/**
 * Converts an optional report date filter into a `Date`.
 *
 * Falsy values (undefined, null, empty string, 0) mean "filter not supplied"
 * and yield `undefined`, matching the truthiness check the report has always
 * used.
 *
 * @throws RangeError when a value is supplied but cannot be parsed as a date,
 *   so the caller gets a clear message instead of an Invalid Date reaching
 *   the query builder.
 */
function parseReportDate(value: unknown, label: string): Date | undefined {
  if (!value) return undefined;
  const parsed = new Date(value as string | number | Date);
  if (Number.isNaN(parsed.getTime())) {
    throw new RangeError(`Invalid ${label} date filter`);
  }
  return parsed;
}

// ─── Pipeline ─────────────────────────────────────────────────────────────────

/**
 * Builds the pipeline report: interest counts per canonical stage plus the
 * highest-priced interests for the port.
 *
 * @param portId  Port whose active interests are reported.
 * @param _params Unused; kept so all report fetchers share one signature.
 * @returns Stage counts, the top interests by primary-berth price, and the
 *   generation timestamp (ISO 8601).
 */
export async function fetchPipelineData(
  portId: string,
  _params: Record<string, unknown>,
): Promise<PipelineData> {
  // The two reads are independent, so they are issued concurrently.
  const [stageCounts, topInterestsRows] = await Promise.all([
    // Count interests per pipeline stage (non-archived).
    // The reporting audit caught the missing .groupBy() — without it,
    // postgres rejects the SELECT or collapses every interest into a
    // single ELSE-stage row. groupBy fixes the per-stage breakdown.
    db
      .select({
        stage: interests.pipelineStage,
        count: count(),
      })
      .from(interests)
      .where(activeInterestsWhere(portId))
      .groupBy(interests.pipelineStage),

    // Top interests by berth price (via primary-berth junction join, plan §3.4).
    // Interests without a priced primary berth sort last.
    db
      .select({
        id: interests.id,
        clientId: interests.clientId,
        pipelineStage: interests.pipelineStage,
        berthPrice: berths.price,
      })
      .from(interests)
      .leftJoin(interestBerths, primaryBerthLink())
      .leftJoin(berths, eq(interestBerths.berthId, berths.id))
      .where(activeInterestsWhere(portId))
      .orderBy(sql`${berths.price} DESC NULLS LAST`)
      .limit(TOP_INTERESTS_LIMIT),
  ]);

  // M-L02: legacy 9-stage values (deposit_10pct, contract_sent…) may
  // still be present on historical rows. canonicalizeStage maps them
  // back to the modern 7-stage keys so the rollup doesn't carry phantom
  // buckets through to the PDF.
  const stageCountMap: Record<string, number> = {};
  for (const row of stageCounts) {
    const key = canonicalizeStage(row.stage);
    stageCountMap[key] = (stageCountMap[key] ?? 0) + row.count;
  }

  return {
    stageCounts: stageCountMap,
    topInterests: topInterestsRows.map((r) => ({
      id: r.id,
      clientId: r.clientId,
      // M-L02: canonicalize for the same reason — the PDF stage label
      // should always resolve from the modern 7-stage set.
      pipelineStage: canonicalizeStage(r.pipelineStage),
      berthPrice: r.berthPrice ? String(r.berthPrice) : null,
    })),
    generatedAt: new Date().toISOString(),
  };
}

// ─── Revenue ──────────────────────────────────────────────────────────────────

/**
 * Builds the revenue report: gross berth value per stage, the total for won
 * deals, and the pipeline-weighted forecast.
 *
 * @param portId  Port whose interests are reported.
 * @param _params Unused; kept so all report fetchers share one signature.
 * @returns Per-stage gross revenue, won-deal total, weighted forecast, the
 *   weights that were applied, and the generation timestamp (ISO 8601).
 */
export async function fetchRevenueData(
  portId: string,
  _params: Record<string, unknown>,
): Promise<RevenueData> {
  // The three reads are independent, so they are issued concurrently.
  const [stageRevenue, completedRevenue, weightsSetting] = await Promise.all([
    // Sum berth prices grouped by pipeline stage. Reads the primary-berth
    // link via interest_berths (plan §3.4) - non-primary junction rows do
    // not contribute to the revenue rollup. These rows feed BOTH the
    // per-stage breakdown and the weighted forecast, so the two figures
    // are always derived from the same snapshot.
    db
      .select({
        stage: interests.pipelineStage,
        revenue: sum(berths.price),
      })
      .from(interests)
      .leftJoin(interestBerths, primaryBerthLink())
      .leftJoin(berths, eq(interestBerths.berthId, berths.id))
      .where(activeInterestsWhere(portId))
      .groupBy(interests.pipelineStage),

    // Total revenue from WON interests only. `outcome='won'` is the
    // canonical money-changed-hands signal — won deals can technically be
    // set from any pipeline stage, and the legacy belt-and-suspenders
    // `pipeline_stage='completed'` filter is brittle to future cleanup of
    // the 'completed' sentinel-stage convention (see PRE-DEPLOY-PLAN
    // follow-ups). The outcome filter alone catches every won deal
    // regardless of the stage it closed at.
    db
      .select({ total: sum(berths.price) })
      .from(interests)
      .leftJoin(interestBerths, primaryBerthLink())
      .leftJoin(berths, eq(interestBerths.berthId, berths.id))
      .where(
        and(
          eq(interests.portId, portId),
          eq(interests.outcome, 'won'),
          isNull(interests.archivedAt),
        ),
      ),

    // Per-port admin override for the stage weights, if one is stored.
    db.query.systemSettings.findFirst({
      where: and(eq(systemSettings.key, 'pipeline_weights'), eq(systemSettings.portId, portId)),
    }),
  ]);

  // Stage weights resolve from `system_settings.pipeline_weights` and fall
  // back to STAGE_WEIGHTS defaults. Arrays are rejected explicitly: they
  // pass a bare `typeof === 'object'` check but can never hold stage keys,
  // which would silently zero the whole forecast.
  const storedWeights = weightsSetting?.value;
  const pipelineWeights: Record<string, number> =
    storedWeights && typeof storedWeights === 'object' && !Array.isArray(storedWeights)
      ? (storedWeights as Record<string, number>)
      : STAGE_WEIGHTS;

  // Pipeline-weighted forecast — sums (berth price × stage weight) for
  // every active interest. The PDF surfaces this number alongside
  // totalCompleted so investors / leadership see both "money in the bank"
  // and "expected from pipeline" on the same page.
  const stageRevenueMap: Record<string, string> = {};
  let totalForecast = 0;
  for (const row of stageRevenue) {
    // M-L02: canonicalize so legacy 9-stage rows fold into the modern
    // bucket, and so legacy keys hit pipelineWeights via their modern
    // equivalent (otherwise the lookup falls through to 0 and the
    // forecast silently undershoots).
    const key = canonicalizeStage(row.stage);
    const gross = row.revenue ? parseFloat(String(row.revenue)) : 0;

    const prior = parseFloat(stageRevenueMap[key] ?? '0');
    stageRevenueMap[key] = String(prior + gross);

    // Stages with no priced primary berth add nothing to the forecast.
    if (!row.revenue) continue;
    const weight = pipelineWeights[key] ?? 0;
    totalForecast += gross * weight;
  }

  const completedTotal = completedRevenue[0]?.total;

  return {
    stageRevenue: stageRevenueMap,
    totalCompleted: completedTotal ? String(completedTotal) : '0',
    totalForecast: totalForecast.toFixed(2),
    pipelineWeights,
    generatedAt: new Date().toISOString(),
  };
}

// ─── Activity ─────────────────────────────────────────────────────────────────

/**
 * Builds the activity report from the port's audit log.
 *
 * @param portId Port whose audit log is reported.
 * @param params Optional filters: `dateFrom` (defaults to 30 days ago) and
 *   `dateTo` (no upper bound when omitted).
 * @returns The most recent matching log rows (newest first, capped at 200),
 *   a count per `action:entityType` pair, and the generation timestamp.
 *   The summary is computed over the returned rows only, so it is subject
 *   to the same row cap.
 * @throws RangeError when a supplied date filter cannot be parsed.
 */
export async function fetchActivityData(
  portId: string,
  params: Record<string, unknown>,
): Promise<ActivityData> {
  const defaultFrom = new Date();
  defaultFrom.setDate(defaultFrom.getDate() - DEFAULT_ACTIVITY_WINDOW_DAYS);

  const fromDate = parseReportDate(params.dateFrom, 'dateFrom') ?? defaultFrom;
  const toDate = parseReportDate(params.dateTo, 'dateTo');

  const conditions = [eq(auditLogs.portId, portId), gte(auditLogs.createdAt, fromDate)];
  if (toDate) {
    conditions.push(lte(auditLogs.createdAt, toDate));
  }

  const logs = await db
    .select({
      id: auditLogs.id,
      action: auditLogs.action,
      entityType: auditLogs.entityType,
      entityId: auditLogs.entityId,
      userId: auditLogs.userId,
      createdAt: auditLogs.createdAt,
    })
    .from(auditLogs)
    .where(and(...conditions))
    .orderBy(sql`${auditLogs.createdAt} DESC`)
    .limit(ACTIVITY_LOG_LIMIT);

  // Group by action + entity type.
  const summary: Record<string, number> = {};
  for (const log of logs) {
    const key = `${log.action}:${log.entityType}`;
    summary[key] = (summary[key] ?? 0) + 1;
  }

  return {
    logs,
    summary,
    generatedAt: new Date().toISOString(),
  };
}

// ─── Occupancy ────────────────────────────────────────────────────────────────

/**
 * Builds the occupancy report: berth counts per status and the share of
 * berths that are sold.
 *
 * @param portId  Port whose berths are reported.
 * @param _params Unused; kept so all report fetchers share one signature.
 * @returns Counts per status, the occupancy rate as a percentage rounded to
 *   one decimal place (0 when the port has no berths), the total berth
 *   count, and the generation timestamp (ISO 8601).
 */
export async function fetchOccupancyData(
  portId: string,
  _params: Record<string, unknown>,
): Promise<OccupancyData> {
  const statusCounts = await db
    .select({
      status: berths.status,
      count: count(),
    })
    .from(berths)
    .where(eq(berths.portId, portId))
    .groupBy(berths.status);

  const statusCountMap: Record<string, number> = {};
  let totalBerths = 0;
  for (const row of statusCounts) {
    statusCountMap[row.status] = row.count;
    totalBerths += row.count;
  }

  // Occupied = sold only. Per 2026-05-14 decision, `under_offer` is a
  // hold (blocks the berth from sale to other clients) but the berth is
  // still technically available until the deal closes. Aligned with the
  // KPI tile + analytics timeline so the same dashboard shows one number.
  const occupiedCount = statusCountMap['sold'] ?? 0;
  const occupancyRate = totalBerths > 0 ? (occupiedCount / totalBerths) * 100 : 0;

  return {
    statusCounts: statusCountMap,
    occupancyRate: Math.round(occupancyRate * 10) / 10,
    totalBerths,
    generatedAt: new Date().toISOString(),
  };
}