Compare commits
2 Commits
94f049c8b8
...
2fa70f4582
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2fa70f4582 | ||
|
|
01b201e1a2 |
@@ -50,6 +50,8 @@ export async function registerRecurringJobs(): Promise<void> {
|
|||||||
|
|
||||||
// Phase B: alert rule engine sweep
|
// Phase B: alert rule engine sweep
|
||||||
{ queue: 'maintenance', name: 'alerts-evaluate', pattern: '*/5 * * * *' },
|
{ queue: 'maintenance', name: 'alerts-evaluate', pattern: '*/5 * * * *' },
|
||||||
|
// Phase B: analytics snapshot warm
|
||||||
|
{ queue: 'maintenance', name: 'analytics-refresh', pattern: '*/15 * * * *' },
|
||||||
];
|
];
|
||||||
|
|
||||||
for (const job of recurring) {
|
for (const job of recurring) {
|
||||||
|
|||||||
@@ -34,6 +34,20 @@ export const maintenanceWorker = new Worker(
|
|||||||
logger.info(summary, 'Alert engine sweep complete');
|
logger.info(summary, 'Alert engine sweep complete');
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
case 'analytics-refresh': {
|
||||||
|
const { ports } = await import('@/lib/db/schema/ports');
|
||||||
|
const { refreshSnapshotsForPort } = await import('@/lib/services/analytics.service');
|
||||||
|
const allPorts = await db.select({ id: ports.id }).from(ports);
|
||||||
|
for (const p of allPorts) {
|
||||||
|
try {
|
||||||
|
await refreshSnapshotsForPort(p.id);
|
||||||
|
} catch (err) {
|
||||||
|
logger.warn({ portId: p.id, err }, 'Analytics refresh failed for port');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
logger.info({ count: allPorts.length }, 'Analytics snapshot refresh complete');
|
||||||
|
break;
|
||||||
|
}
|
||||||
default:
|
default:
|
||||||
logger.warn({ jobName: job.name }, 'Unknown maintenance job');
|
logger.warn({ jobName: job.name }, 'Unknown maintenance job');
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,35 +1,50 @@
|
|||||||
/**
|
/**
|
||||||
* Phase B analytics service. Reads pre-computed snapshots from
|
* Phase B analytics service. Reads pre-computed snapshots from
|
||||||
* `analytics_snapshots` keyed by `metric_id` and recomputes on demand if
|
* `analytics_snapshots`; recomputes on demand if older than `SNAPSHOT_TTL_MS`.
|
||||||
* the cached row is older than `SNAPSHOT_TTL_MS`. The recomputation jobs
|
* The recurring `analytics-refresh` BullMQ job (PR3) warms the table
|
||||||
* land in `analytics-snapshot-job.ts` (PR3).
|
* every 15 minutes per port × per metric.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
import { and, eq } from 'drizzle-orm';
|
import { and, eq, gte, isNull, sql } from 'drizzle-orm';
|
||||||
|
|
||||||
import { db } from '@/lib/db';
|
import { db } from '@/lib/db';
|
||||||
import { analyticsSnapshots } from '@/lib/db/schema/insights';
|
import { analyticsSnapshots } from '@/lib/db/schema/insights';
|
||||||
|
import { interests } from '@/lib/db/schema/interests';
|
||||||
|
import { invoices } from '@/lib/db/schema/financial';
|
||||||
|
import { berthReservations } from '@/lib/db/schema/reservations';
|
||||||
|
|
||||||
export type DateRange = '7d' | '30d' | '90d' | 'today';
|
export type DateRange = '7d' | '30d' | '90d' | 'today';
|
||||||
|
|
||||||
export type MetricId =
|
export type MetricBase =
|
||||||
| `pipeline_funnel.${DateRange}`
|
| 'pipeline_funnel'
|
||||||
| `occupancy_timeline.${DateRange}`
|
| 'occupancy_timeline'
|
||||||
| `revenue_breakdown.${DateRange}`
|
| 'revenue_breakdown'
|
||||||
| `lead_source_attribution.${DateRange}`;
|
| 'lead_source_attribution';
|
||||||
|
|
||||||
|
export type MetricId = `${MetricBase}.${DateRange}`;
|
||||||
|
|
||||||
|
export const ALL_RANGES: readonly DateRange[] = ['today', '7d', '30d', '90d'] as const;
|
||||||
|
export const ALL_METRICS: readonly MetricBase[] = [
|
||||||
|
'pipeline_funnel',
|
||||||
|
'occupancy_timeline',
|
||||||
|
'revenue_breakdown',
|
||||||
|
'lead_source_attribution',
|
||||||
|
] as const;
|
||||||
|
|
||||||
export const SNAPSHOT_TTL_MS = 15 * 60 * 1000; // 15 minutes
|
export const SNAPSHOT_TTL_MS = 15 * 60 * 1000; // 15 minutes
|
||||||
|
|
||||||
|
// ─── Output shapes ────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
export interface PipelineFunnelData {
|
export interface PipelineFunnelData {
|
||||||
stages: Array<{ stage: string; count: number; conversionPct: number }>;
|
stages: Array<{ stage: string; count: number; conversionPct: number }>;
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface OccupancyTimelineData {
|
export interface OccupancyTimelineData {
|
||||||
points: Array<{ date: string; available: number; underOffer: number; sold: number }>;
|
points: Array<{ date: string; occupied: number; total: number; occupancyPct: number }>;
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface RevenueBreakdownData {
|
export interface RevenueBreakdownData {
|
||||||
bars: Array<{ category: string; amount: number; currency: string }>;
|
bars: Array<{ status: string; amount: number; currency: string }>;
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface LeadSourceAttributionData {
|
export interface LeadSourceAttributionData {
|
||||||
@@ -42,11 +57,8 @@ export type SnapshotData =
|
|||||||
| RevenueBreakdownData
|
| RevenueBreakdownData
|
||||||
| LeadSourceAttributionData;
|
| LeadSourceAttributionData;
|
||||||
|
|
||||||
/**
|
// ─── Cache layer ──────────────────────────────────────────────────────────────
|
||||||
* Read a snapshot by `(portId, metricId)`. Returns null when missing or
|
|
||||||
* stale; the caller should request a recompute (or the recurring job
|
|
||||||
* eventually fills it).
|
|
||||||
*/
|
|
||||||
export async function readSnapshot<T extends SnapshotData>(
|
export async function readSnapshot<T extends SnapshotData>(
|
||||||
portId: string,
|
portId: string,
|
||||||
metricId: MetricId,
|
metricId: MetricId,
|
||||||
@@ -74,33 +86,235 @@ export async function writeSnapshot(
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
// Computation entrypoints — bodies land in PR3 along with the recurring
|
// ─── Range helpers ────────────────────────────────────────────────────────────
|
||||||
// snapshot job. Exported as no-op stubs so PR1's tsc/lint stay green.
|
|
||||||
|
function rangeToCutoff(range: DateRange): Date {
|
||||||
|
const now = Date.now();
|
||||||
|
switch (range) {
|
||||||
|
case 'today':
|
||||||
|
return new Date(now - 1 * 86_400_000);
|
||||||
|
case '7d':
|
||||||
|
return new Date(now - 7 * 86_400_000);
|
||||||
|
case '30d':
|
||||||
|
return new Date(now - 30 * 86_400_000);
|
||||||
|
case '90d':
|
||||||
|
return new Date(now - 90 * 86_400_000);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function rangeToDays(range: DateRange): number {
|
||||||
|
switch (range) {
|
||||||
|
case 'today':
|
||||||
|
return 1;
|
||||||
|
case '7d':
|
||||||
|
return 7;
|
||||||
|
case '30d':
|
||||||
|
return 30;
|
||||||
|
case '90d':
|
||||||
|
return 90;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ─── Computations ─────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
const PIPELINE_STAGES = [
|
||||||
|
'open',
|
||||||
|
'details_sent',
|
||||||
|
'in_communication',
|
||||||
|
'visited',
|
||||||
|
'signed_eoi_nda',
|
||||||
|
'deposit_10pct',
|
||||||
|
'contract',
|
||||||
|
'completed',
|
||||||
|
] as const;
|
||||||
|
|
||||||
export async function computePipelineFunnel(
|
export async function computePipelineFunnel(
|
||||||
_portId: string,
|
portId: string,
|
||||||
_range: DateRange,
|
range: DateRange,
|
||||||
): Promise<PipelineFunnelData> {
|
): Promise<PipelineFunnelData> {
|
||||||
return { stages: [] };
|
const cutoff = rangeToCutoff(range);
|
||||||
|
const rows = await db
|
||||||
|
.select({ stage: interests.pipelineStage, count: sql<number>`count(*)::int` })
|
||||||
|
.from(interests)
|
||||||
|
.where(
|
||||||
|
and(
|
||||||
|
eq(interests.portId, portId),
|
||||||
|
isNull(interests.archivedAt),
|
||||||
|
gte(interests.createdAt, cutoff),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
.groupBy(interests.pipelineStage);
|
||||||
|
|
||||||
|
const counts = new Map(rows.map((r) => [r.stage, r.count]));
|
||||||
|
const top = counts.get('open') ?? 0;
|
||||||
|
|
||||||
|
const stages = PIPELINE_STAGES.map((stage) => {
|
||||||
|
const count = counts.get(stage) ?? 0;
|
||||||
|
const conversionPct = top === 0 ? 0 : Math.round((count / top) * 1000) / 10;
|
||||||
|
return { stage, count, conversionPct };
|
||||||
|
});
|
||||||
|
|
||||||
|
return { stages };
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function computeOccupancyTimeline(
|
export async function computeOccupancyTimeline(
|
||||||
_portId: string,
|
portId: string,
|
||||||
_range: DateRange,
|
range: DateRange,
|
||||||
): Promise<OccupancyTimelineData> {
|
): Promise<OccupancyTimelineData> {
|
||||||
return { points: [] };
|
const days = rangeToDays(range);
|
||||||
|
// Total berths per port (current count — assumes no churn).
|
||||||
|
const totalRow = await db.execute<{ total: number }>(
|
||||||
|
sql`SELECT count(*)::int AS total FROM berths WHERE port_id = ${portId}`,
|
||||||
|
);
|
||||||
|
const total = totalRow[0]?.total ?? 0;
|
||||||
|
|
||||||
|
// For each day in range, count berths that have an active reservation
|
||||||
|
// covering that day. A reservation is "covering" if start_date <= day
|
||||||
|
// AND (end_date IS NULL OR end_date >= day).
|
||||||
|
const points: OccupancyTimelineData['points'] = [];
|
||||||
|
for (let i = days - 1; i >= 0; i--) {
|
||||||
|
const day = new Date(Date.now() - i * 86_400_000);
|
||||||
|
const dayStr = day.toISOString().slice(0, 10);
|
||||||
|
const occRow = await db
|
||||||
|
.select({ occupied: sql<number>`count(distinct ${berthReservations.berthId})::int` })
|
||||||
|
.from(berthReservations)
|
||||||
|
.where(
|
||||||
|
and(
|
||||||
|
eq(berthReservations.portId, portId),
|
||||||
|
eq(berthReservations.status, 'active'),
|
||||||
|
sql`${berthReservations.startDate} <= ${dayStr}::date`,
|
||||||
|
sql`(${berthReservations.endDate} IS NULL OR ${berthReservations.endDate} >= ${dayStr}::date)`,
|
||||||
|
),
|
||||||
|
);
|
||||||
|
const occupied = occRow[0]?.occupied ?? 0;
|
||||||
|
const occupancyPct = total === 0 ? 0 : Math.round((occupied / total) * 1000) / 10;
|
||||||
|
points.push({ date: dayStr, occupied, total, occupancyPct });
|
||||||
|
}
|
||||||
|
return { points };
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function computeRevenueBreakdown(
|
export async function computeRevenueBreakdown(
|
||||||
_portId: string,
|
portId: string,
|
||||||
_range: DateRange,
|
range: DateRange,
|
||||||
): Promise<RevenueBreakdownData> {
|
): Promise<RevenueBreakdownData> {
|
||||||
return { bars: [] };
|
const cutoff = rangeToCutoff(range);
|
||||||
|
const rows = await db
|
||||||
|
.select({
|
||||||
|
status: invoices.status,
|
||||||
|
currency: invoices.currency,
|
||||||
|
amount: sql<string>`coalesce(sum(${invoices.total}), 0)::text`,
|
||||||
|
})
|
||||||
|
.from(invoices)
|
||||||
|
.where(
|
||||||
|
and(
|
||||||
|
eq(invoices.portId, portId),
|
||||||
|
isNull(invoices.archivedAt),
|
||||||
|
gte(invoices.createdAt, cutoff),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
.groupBy(invoices.status, invoices.currency);
|
||||||
|
|
||||||
|
return {
|
||||||
|
bars: rows.map((r) => ({
|
||||||
|
status: r.status,
|
||||||
|
currency: r.currency,
|
||||||
|
amount: Number(r.amount),
|
||||||
|
})),
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function computeLeadSourceAttribution(
|
export async function computeLeadSourceAttribution(
|
||||||
_portId: string,
|
portId: string,
|
||||||
_range: DateRange,
|
range: DateRange,
|
||||||
): Promise<LeadSourceAttributionData> {
|
): Promise<LeadSourceAttributionData> {
|
||||||
return { slices: [] };
|
const cutoff = rangeToCutoff(range);
|
||||||
|
const rows = await db
|
||||||
|
.select({ source: interests.source, count: sql<number>`count(*)::int` })
|
||||||
|
.from(interests)
|
||||||
|
.where(
|
||||||
|
and(
|
||||||
|
eq(interests.portId, portId),
|
||||||
|
isNull(interests.archivedAt),
|
||||||
|
gte(interests.createdAt, cutoff),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
.groupBy(interests.source);
|
||||||
|
|
||||||
|
return {
|
||||||
|
slices: rows
|
||||||
|
.map((r) => ({
|
||||||
|
source: r.source ?? 'unspecified',
|
||||||
|
count: r.count,
|
||||||
|
}))
|
||||||
|
.sort((a, b) => b.count - a.count),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
// ─── Public read API (cache → compute → write back) ──────────────────────────
|
||||||
|
|
||||||
|
export async function getPipelineFunnel(
|
||||||
|
portId: string,
|
||||||
|
range: DateRange,
|
||||||
|
): Promise<PipelineFunnelData> {
|
||||||
|
const metricId = `pipeline_funnel.${range}` as const;
|
||||||
|
const cached = await readSnapshot<PipelineFunnelData>(portId, metricId);
|
||||||
|
if (cached) return cached;
|
||||||
|
const fresh = await computePipelineFunnel(portId, range);
|
||||||
|
await writeSnapshot(portId, metricId, fresh);
|
||||||
|
return fresh;
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function getOccupancyTimeline(
|
||||||
|
portId: string,
|
||||||
|
range: DateRange,
|
||||||
|
): Promise<OccupancyTimelineData> {
|
||||||
|
const metricId = `occupancy_timeline.${range}` as const;
|
||||||
|
const cached = await readSnapshot<OccupancyTimelineData>(portId, metricId);
|
||||||
|
if (cached) return cached;
|
||||||
|
const fresh = await computeOccupancyTimeline(portId, range);
|
||||||
|
await writeSnapshot(portId, metricId, fresh);
|
||||||
|
return fresh;
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function getRevenueBreakdown(
|
||||||
|
portId: string,
|
||||||
|
range: DateRange,
|
||||||
|
): Promise<RevenueBreakdownData> {
|
||||||
|
const metricId = `revenue_breakdown.${range}` as const;
|
||||||
|
const cached = await readSnapshot<RevenueBreakdownData>(portId, metricId);
|
||||||
|
if (cached) return cached;
|
||||||
|
const fresh = await computeRevenueBreakdown(portId, range);
|
||||||
|
await writeSnapshot(portId, metricId, fresh);
|
||||||
|
return fresh;
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function getLeadSourceAttribution(
|
||||||
|
portId: string,
|
||||||
|
range: DateRange,
|
||||||
|
): Promise<LeadSourceAttributionData> {
|
||||||
|
const metricId = `lead_source_attribution.${range}` as const;
|
||||||
|
const cached = await readSnapshot<LeadSourceAttributionData>(portId, metricId);
|
||||||
|
if (cached) return cached;
|
||||||
|
const fresh = await computeLeadSourceAttribution(portId, range);
|
||||||
|
await writeSnapshot(portId, metricId, fresh);
|
||||||
|
return fresh;
|
||||||
|
}
|
||||||
|
|
||||||
|
// ─── Cron entrypoint: warm every (port × metric × range) ────────────────────
|
||||||
|
|
||||||
|
export async function refreshSnapshotsForPort(portId: string): Promise<void> {
|
||||||
|
for (const range of ALL_RANGES) {
|
||||||
|
const [funnel, occupancy, revenue, leadSource] = await Promise.all([
|
||||||
|
computePipelineFunnel(portId, range),
|
||||||
|
computeOccupancyTimeline(portId, range),
|
||||||
|
computeRevenueBreakdown(portId, range),
|
||||||
|
computeLeadSourceAttribution(portId, range),
|
||||||
|
]);
|
||||||
|
await Promise.all([
|
||||||
|
writeSnapshot(portId, `pipeline_funnel.${range}`, funnel),
|
||||||
|
writeSnapshot(portId, `occupancy_timeline.${range}`, occupancy),
|
||||||
|
writeSnapshot(portId, `revenue_breakdown.${range}`, revenue),
|
||||||
|
writeSnapshot(portId, `lead_source_attribution.${range}`, leadSource),
|
||||||
|
]);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
208
tests/integration/analytics-service.test.ts
Normal file
208
tests/integration/analytics-service.test.ts
Normal file
@@ -0,0 +1,208 @@
|
|||||||
|
/**
|
||||||
|
* Analytics service integration tests — exercise the four computations
|
||||||
|
* against a seeded port + assert the cache layer reads/writes correctly.
|
||||||
|
*/
|
||||||
|
|
||||||
|
import { describe, it, expect } from 'vitest';
|
||||||
|
import { eq, and } from 'drizzle-orm';
|
||||||
|
|
||||||
|
import { db } from '@/lib/db';
|
||||||
|
import { interests } from '@/lib/db/schema/interests';
|
||||||
|
import { invoices } from '@/lib/db/schema/financial';
|
||||||
|
import { berthReservations } from '@/lib/db/schema/reservations';
|
||||||
|
import { analyticsSnapshots } from '@/lib/db/schema/insights';
|
||||||
|
import {
|
||||||
|
computePipelineFunnel,
|
||||||
|
computeOccupancyTimeline,
|
||||||
|
computeRevenueBreakdown,
|
||||||
|
computeLeadSourceAttribution,
|
||||||
|
getPipelineFunnel,
|
||||||
|
refreshSnapshotsForPort,
|
||||||
|
ALL_METRICS,
|
||||||
|
ALL_RANGES,
|
||||||
|
SNAPSHOT_TTL_MS,
|
||||||
|
} from '@/lib/services/analytics.service';
|
||||||
|
import { makePort, makeClient, makeBerth, makeYacht } from '../helpers/factories';
|
||||||
|
|
||||||
|
describe('analytics service', () => {
|
||||||
|
describe('computePipelineFunnel', () => {
|
||||||
|
it('aggregates interests by stage with conversion percentages', async () => {
|
||||||
|
const port = await makePort();
|
||||||
|
const client = await makeClient({ portId: port.id });
|
||||||
|
// 3 open, 2 details_sent, 1 visited
|
||||||
|
for (const stage of ['open', 'open', 'open', 'details_sent', 'details_sent', 'visited']) {
|
||||||
|
await db.insert(interests).values({
|
||||||
|
portId: port.id,
|
||||||
|
clientId: client.id,
|
||||||
|
pipelineStage: stage,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
const result = await computePipelineFunnel(port.id, '30d');
|
||||||
|
|
||||||
|
const open = result.stages.find((s) => s.stage === 'open');
|
||||||
|
const details = result.stages.find((s) => s.stage === 'details_sent');
|
||||||
|
const visited = result.stages.find((s) => s.stage === 'visited');
|
||||||
|
expect(open?.count).toBe(3);
|
||||||
|
expect(open?.conversionPct).toBe(100);
|
||||||
|
expect(details?.count).toBe(2);
|
||||||
|
expect(details?.conversionPct).toBeCloseTo(66.7, 0);
|
||||||
|
expect(visited?.count).toBe(1);
|
||||||
|
expect(visited?.conversionPct).toBeCloseTo(33.3, 0);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('returns zeros when port has no interests', async () => {
|
||||||
|
const port = await makePort();
|
||||||
|
const result = await computePipelineFunnel(port.id, '30d');
|
||||||
|
expect(result.stages).toHaveLength(8);
|
||||||
|
expect(result.stages.every((s) => s.count === 0)).toBe(true);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('computeOccupancyTimeline', () => {
|
||||||
|
it('returns 7 points for 7d range with occupancy percentages', async () => {
|
||||||
|
const port = await makePort();
|
||||||
|
await makeBerth({ portId: port.id });
|
||||||
|
await makeBerth({ portId: port.id });
|
||||||
|
const client = await makeClient({ portId: port.id });
|
||||||
|
const yacht = await makeYacht({
|
||||||
|
portId: port.id,
|
||||||
|
ownerType: 'client',
|
||||||
|
ownerId: client.id,
|
||||||
|
});
|
||||||
|
const berth = await makeBerth({ portId: port.id });
|
||||||
|
// Active reservation covering today
|
||||||
|
await db.insert(berthReservations).values({
|
||||||
|
portId: port.id,
|
||||||
|
berthId: berth.id,
|
||||||
|
clientId: client.id,
|
||||||
|
yachtId: yacht.id,
|
||||||
|
status: 'active',
|
||||||
|
startDate: new Date(Date.now() - 5 * 86_400_000),
|
||||||
|
createdBy: 'seed',
|
||||||
|
});
|
||||||
|
|
||||||
|
const result = await computeOccupancyTimeline(port.id, '7d');
|
||||||
|
expect(result.points).toHaveLength(7);
|
||||||
|
// Last point is today; should reflect 1/3 occupancy.
|
||||||
|
const today = result.points[result.points.length - 1]!;
|
||||||
|
expect(today.total).toBe(3);
|
||||||
|
expect(today.occupied).toBe(1);
|
||||||
|
expect(today.occupancyPct).toBeCloseTo(33.3, 0);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('computeRevenueBreakdown', () => {
|
||||||
|
it('groups invoice totals by status and currency', async () => {
|
||||||
|
const port = await makePort();
|
||||||
|
const baseInvoice = {
|
||||||
|
portId: port.id,
|
||||||
|
clientName: 'Acme',
|
||||||
|
billingEntityType: 'client' as const,
|
||||||
|
billingEntityId: 'client-id',
|
||||||
|
dueDate: '2026-12-31',
|
||||||
|
currency: 'USD',
|
||||||
|
subtotal: '0',
|
||||||
|
createdBy: 'seed',
|
||||||
|
};
|
||||||
|
await db.insert(invoices).values([
|
||||||
|
{ ...baseInvoice, invoiceNumber: 'INV-001', total: '1000', status: 'paid' },
|
||||||
|
{ ...baseInvoice, invoiceNumber: 'INV-002', total: '500', status: 'paid' },
|
||||||
|
{ ...baseInvoice, invoiceNumber: 'INV-003', total: '2000', status: 'sent' },
|
||||||
|
]);
|
||||||
|
|
||||||
|
const result = await computeRevenueBreakdown(port.id, '30d');
|
||||||
|
const paid = result.bars.find((b) => b.status === 'paid');
|
||||||
|
const sent = result.bars.find((b) => b.status === 'sent');
|
||||||
|
expect(paid?.amount).toBe(1500);
|
||||||
|
expect(sent?.amount).toBe(2000);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('computeLeadSourceAttribution', () => {
|
||||||
|
it('counts interests grouped by source descending', async () => {
|
||||||
|
const port = await makePort();
|
||||||
|
const client = await makeClient({ portId: port.id });
|
||||||
|
for (const source of ['website', 'website', 'website', 'manual', 'referral', 'referral']) {
|
||||||
|
await db.insert(interests).values({
|
||||||
|
portId: port.id,
|
||||||
|
clientId: client.id,
|
||||||
|
pipelineStage: 'open',
|
||||||
|
source,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
const result = await computeLeadSourceAttribution(port.id, '30d');
|
||||||
|
expect(result.slices[0]).toEqual({ source: 'website', count: 3 });
|
||||||
|
expect(result.slices[1]).toEqual({ source: 'referral', count: 2 });
|
||||||
|
expect(result.slices[2]).toEqual({ source: 'manual', count: 1 });
|
||||||
|
});
|
||||||
|
|
||||||
|
it('groups null source as "unspecified"', async () => {
|
||||||
|
const port = await makePort();
|
||||||
|
const client = await makeClient({ portId: port.id });
|
||||||
|
await db.insert(interests).values({
|
||||||
|
portId: port.id,
|
||||||
|
clientId: client.id,
|
||||||
|
pipelineStage: 'open',
|
||||||
|
source: null,
|
||||||
|
});
|
||||||
|
|
||||||
|
const result = await computeLeadSourceAttribution(port.id, '30d');
|
||||||
|
expect(result.slices.find((s) => s.source === 'unspecified')?.count).toBe(1);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('cache', () => {
|
||||||
|
it('getPipelineFunnel writes a snapshot and returns it on subsequent calls', async () => {
|
||||||
|
const port = await makePort();
|
||||||
|
const client = await makeClient({ portId: port.id });
|
||||||
|
await db.insert(interests).values({
|
||||||
|
portId: port.id,
|
||||||
|
clientId: client.id,
|
||||||
|
pipelineStage: 'open',
|
||||||
|
});
|
||||||
|
|
||||||
|
const first = await getPipelineFunnel(port.id, '30d');
|
||||||
|
// Snapshot written.
|
||||||
|
const row = await db.query.analyticsSnapshots.findFirst({
|
||||||
|
where: and(
|
||||||
|
eq(analyticsSnapshots.portId, port.id),
|
||||||
|
eq(analyticsSnapshots.metricId, 'pipeline_funnel.30d'),
|
||||||
|
),
|
||||||
|
});
|
||||||
|
expect(row).toBeDefined();
|
||||||
|
expect(row?.data).toEqual(first);
|
||||||
|
|
||||||
|
// Mutate the snapshot row directly to confirm cache is being read,
|
||||||
|
// not recomputed.
|
||||||
|
const sentinel = { stages: [{ stage: 'sentinel', count: 999, conversionPct: 0 }] };
|
||||||
|
await db
|
||||||
|
.update(analyticsSnapshots)
|
||||||
|
.set({ data: sentinel })
|
||||||
|
.where(
|
||||||
|
and(
|
||||||
|
eq(analyticsSnapshots.portId, port.id),
|
||||||
|
eq(analyticsSnapshots.metricId, 'pipeline_funnel.30d'),
|
||||||
|
),
|
||||||
|
);
|
||||||
|
const second = await getPipelineFunnel(port.id, '30d');
|
||||||
|
expect(second).toEqual(sentinel);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('refreshSnapshotsForPort warms every metric × range combo', async () => {
|
||||||
|
const port = await makePort();
|
||||||
|
await refreshSnapshotsForPort(port.id);
|
||||||
|
const rows = await db
|
||||||
|
.select({ metricId: analyticsSnapshots.metricId })
|
||||||
|
.from(analyticsSnapshots)
|
||||||
|
.where(eq(analyticsSnapshots.portId, port.id));
|
||||||
|
const expected = ALL_METRICS.length * ALL_RANGES.length;
|
||||||
|
expect(rows).toHaveLength(expected);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('snapshot ttl constant is 15 minutes', () => {
|
||||||
|
expect(SNAPSHOT_TTL_MS).toBe(15 * 60 * 1000);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
Reference in New Issue
Block a user