diff --git a/src/app/api/v1/reports/runs/route.ts b/src/app/api/v1/reports/runs/route.ts index 7906583c..d40a4a08 100644 --- a/src/app/api/v1/reports/runs/route.ts +++ b/src/app/api/v1/reports/runs/route.ts @@ -3,6 +3,7 @@ import { NextResponse } from 'next/server'; import { withAuth, withPermission } from '@/lib/api/helpers'; import { parseBody, parseQuery } from '@/lib/api/route-helpers'; import { errorResponse } from '@/lib/errors'; +import { getQueue } from '@/lib/queue'; import { createReportRun, listReportRuns } from '@/lib/services/report-runs.service'; import { createReportRunSchema, listReportRunsSchema } from '@/lib/validators/reports'; @@ -49,6 +50,25 @@ export const POST = withAuth( userAgent: ctx.userAgent, }, }); + // P3: hand off to the BullMQ render worker. Failure to enqueue + // doesn't abort the create — the schedule poll picks up pending + // rows on the next tick as a safety net. + try { + await getQueue('reports').add( + 'report-run-render', + { reportRunId: row.id }, + { jobId: `report-run-render:${row.id}` }, + ); + } catch (queueErr) { + // Logged but swallowed so the API still returns 201 with the row. + // The pending row stays visible in /reports/runs so the rep can + // see it queued; a future ops job can rescue stuck pendings. + const { logger } = await import('@/lib/logger'); + logger.error( + { queueErr, reportRunId: row.id }, + 'Failed to enqueue render after createReportRun', + ); + } return NextResponse.json({ data: row }, { status: 201 }); } catch (error) { return errorResponse(error); diff --git a/src/lib/queue/scheduler.ts b/src/lib/queue/scheduler.ts index ee905e5b..cf065dec 100644 --- a/src/lib/queue/scheduler.ts +++ b/src/lib/queue/scheduler.ts @@ -44,6 +44,10 @@ export async function registerRecurringJobs(): Promise { // Report scheduler - checks every minute for reports due to run { queue: 'reports', name: 'report-scheduler', pattern: '* * * * *' }, + // Reports P3 — new-system poll (report_schedules + report_runs). + // Runs alongside the legacy poll above; the two queues coexist. + { queue: 'reports', name: 'report-schedules-poll', pattern: '* * * * *' }, + // Notification digest - fires hourly globally; the worker checks each // user's `notification_digest_paused_until` and unread-count threshold // before composing a digest, so most ticks are no-ops. Per-user time- diff --git a/src/lib/queue/workers/reports.ts b/src/lib/queue/workers/reports.ts index b52f26c6..e738bd1c 100644 --- a/src/lib/queue/workers/reports.ts +++ b/src/lib/queue/workers/reports.ts @@ -98,6 +98,111 @@ export const reportsWorker = new Worker( break; } + // Reports P3 — new system: report_schedules + report_runs. + case 'report-schedules-poll': { + // Scan report_schedules due to fire, mint a report_runs row per + // schedule, advance next_run_at by cadence math, enqueue render. + const { db } = await import('@/lib/db'); + const { reportSchedules, reportTemplates } = await import('@/lib/db/schema/reports'); + const { createReportRun } = await import('@/lib/services/report-runs.service'); + const { nextRunFor } = await import('@/lib/services/report-schedules.service'); + const { and, eq, lte } = await import('drizzle-orm'); + + const now = new Date(); + const due = await db + .select() + .from(reportSchedules) + .where(and(eq(reportSchedules.enabled, true), lte(reportSchedules.nextRunAt, now))); + + for (const schedule of due) { + const template = await db.query.reportTemplates.findFirst({ + where: eq(reportTemplates.id, schedule.templateId), + }); + if (!template) { + logger.warn( + { scheduleId: schedule.id, templateId: schedule.templateId }, + 'Skipping schedule: template missing (likely archived); pausing', + ); + await db + .update(reportSchedules) + .set({ enabled: false, updatedAt: new Date() }) + .where(eq(reportSchedules.id, schedule.id)); + continue; + } + + // Compute the next fire BEFORE the enqueue so a downstream + // failure (storage outage, etc.) doesn't pin the schedule on + // the same tick — preserves the "no-op doesn't slip" rule. + await db + .update(reportSchedules) + .set({ + lastRunAt: now, + nextRunAt: nextRunFor(schedule.cadence as Parameters[0], now), + updatedAt: new Date(), + }) + .where(eq(reportSchedules.id, schedule.id)); + + try { + const run = await createReportRun( + { + kind: template.kind as 'dashboard' | 'clients' | 'berths' | 'interests', + config: template.config, + outputFormat: schedule.outputFormat as 'pdf' | 'csv' | 'png', + templateId: template.id, + }, + { + portId: schedule.portId, + triggeredBy: 'schedule', + scheduleId: schedule.id, + meta: { + userId: 'system', + portId: schedule.portId, + ipAddress: '0.0.0.0', + userAgent: 'scheduler', + }, + }, + ); + const { getQueue: enqueue } = await import('@/lib/queue'); + await enqueue('reports').add( + 'report-run-render', + { reportRunId: run.id }, + { jobId: `report-run-render:${run.id}` }, + ); + } catch (err) { + logger.error( + { err, scheduleId: schedule.id }, + 'Failed to mint report_run for due schedule; will retry on next poll', + ); + } + } + break; + } + + case 'report-run-render': { + const { reportRunId } = job.data as { reportRunId: string }; + const { renderReportRun } = await import('@/lib/services/report-render.service'); + const run = await renderReportRun(reportRunId); + + // Schedule-driven runs auto-cascade into the email job. User- + // triggered runs are inert — the rep downloads via the UI. + if (run.triggeredBy === 'schedule' && run.status === 'complete') { + const { getQueue: enqueue } = await import('@/lib/queue'); + await enqueue('reports').add( + 'report-run-email', + { reportRunId: run.id }, + { jobId: `report-run-email:${run.id}` }, + ); + } + break; + } + + case 'report-run-email': { + const { reportRunId } = job.data as { reportRunId: string }; + const { emailReportRun } = await import('@/lib/services/report-render.service'); + await emailReportRun(reportRunId); + break; + } + default: logger.warn({ jobName: job.name }, 'Unknown reports job'); } diff --git a/src/lib/services/report-render.service.ts b/src/lib/services/report-render.service.ts new file mode 100644 index 00000000..9d7d4ded --- /dev/null +++ b/src/lib/services/report-render.service.ts @@ -0,0 +1,255 @@ +/** + * Reports P3 — render + email orchestration for `report_runs` rows. + * + * Two entry points are called from the BullMQ reports worker: + * + * - `renderReportRun(reportRunId)` — fetches the run, advances to + * `rendering`, generates the artefact via the registered renderer + * for that `kind`, uploads to storage, and stamps the run row with + * storage key + size + status='complete' (or 'failed' + errorMessage). + * - `emailReportRun(reportRunId)` — for runs born from a schedule: + * reads the schedule's recipients + the stored artefact and ships + * an email with the file attached via the existing sendEmail helper. + * + * Kind-specific renderers live in REPORT_RENDER_MAP. v1 wires the four + * new-system kinds (dashboard/clients/berths/interests) onto the + * existing legacy fetcher+template pairs; dedicated per-kind templates + * land alongside the builder UI (P4+). + */ + +import { and, eq } from 'drizzle-orm'; + +import { db } from '@/lib/db'; +import { env } from '@/lib/env'; +import { logger } from '@/lib/logger'; +import { reportRuns, reportSchedules, type ReportRun } from '@/lib/db/schema/reports'; +import { ports } from '@/lib/db/schema/ports'; +import { files } from '@/lib/db/schema/documents'; +import { buildStoragePath } from '@/lib/minio'; +import { getStorageBackend } from '@/lib/storage'; +import { resolvePortLogo } from '@/lib/pdf/brand-kit/logo'; +import { renderPdf } from '@/lib/pdf/render'; +import { sendEmail } from '@/lib/email'; +import { updateReportRunStatus } from '@/lib/services/report-runs.service'; +import { CodedError, NotFoundError } from '@/lib/errors'; +import { + fetchActivityData, + fetchOccupancyData, + fetchPipelineData, + fetchRevenueData, + type ActivityData, + type OccupancyData, + type PipelineData, + type RevenueData, +} from '@/lib/services/report-generators'; +import { ActivityReportPdf } from '@/lib/pdf/templates/reports/activity-report'; +import { OccupancyReportPdf } from '@/lib/pdf/templates/reports/occupancy-report'; +import { PipelineReportPdf } from '@/lib/pdf/templates/reports/pipeline-report'; +import { RevenueReportPdf } from '@/lib/pdf/templates/reports/revenue-report'; + +interface RenderCtx { + portName: string; + logoBuffer: Buffer | null; +} + +interface KindRenderer { + fetchData: (portId: string, params: Record) => Promise; + render: (data: unknown, ctx: RenderCtx) => Promise; +} + +const REPORT_RENDER_MAP: Record = { + dashboard: { + fetchData: fetchPipelineData as KindRenderer['fetchData'], + render: async (data, ctx) => + renderPdf( + PipelineReportPdf({ + portName: ctx.portName, + logoBuffer: ctx.logoBuffer ?? null, + data: data as PipelineData, + }), + ), + }, + clients: { + fetchData: fetchActivityData as KindRenderer['fetchData'], + render: async (data, ctx) => + renderPdf( + ActivityReportPdf({ + portName: ctx.portName, + logoBuffer: ctx.logoBuffer ?? null, + data: data as ActivityData, + }), + ), + }, + berths: { + fetchData: fetchOccupancyData as KindRenderer['fetchData'], + render: async (data, ctx) => + renderPdf( + OccupancyReportPdf({ + portName: ctx.portName, + logoBuffer: ctx.logoBuffer ?? null, + data: data as OccupancyData, + }), + ), + }, + interests: { + fetchData: fetchRevenueData as KindRenderer['fetchData'], + render: async (data, ctx) => + renderPdf( + RevenueReportPdf({ + portName: ctx.portName, + logoBuffer: ctx.logoBuffer ?? null, + data: data as RevenueData, + }), + ), + }, +}; + +export async function renderReportRun(reportRunId: string): Promise { + const run = await db.query.reportRuns.findFirst({ + where: eq(reportRuns.id, reportRunId), + }); + if (!run) throw new NotFoundError('Report run'); + + if (run.status === 'complete' && run.storageKey) { + return run; + } + + await updateReportRunStatus(run.id, run.portId, { status: 'rendering' }); + + let putStoragePath: string | null = null; + try { + const renderer = REPORT_RENDER_MAP[run.kind]; + if (!renderer) { + throw new CodedError('VALIDATION_ERROR', { + internalMessage: `No renderer registered for report kind "${run.kind}"`, + }); + } + + const port = await db.query.ports.findFirst({ where: eq(ports.id, run.portId) }); + if (!port) { + throw new Error(`Cannot render report ${run.id}: port ${run.portId} not found`); + } + + const logo = await resolvePortLogo(run.portId).catch(() => ({ + buffer: null as Buffer | null, + })); + const ctx: RenderCtx = { portName: port.name, logoBuffer: logo.buffer ?? null }; + const params = (run.config as Record) ?? {}; + + const data = await renderer.fetchData(run.portId, params); + const pdfBytes = await renderer.render(data, ctx); + + const fileId = crypto.randomUUID(); + const storagePath = buildStoragePath(port.slug, 'reports', run.id, fileId, 'pdf'); + + const backend = await getStorageBackend(); + await backend.put(storagePath, pdfBytes, { + contentType: 'application/pdf', + sizeBytes: pdfBytes.length, + }); + putStoragePath = storagePath; + + await db.insert(files).values({ + id: fileId, + portId: run.portId, + filename: `${run.kind}-${run.id.slice(0, 8)}.pdf`, + originalName: `${run.kind}-report.pdf`, + mimeType: 'application/pdf', + sizeBytes: String(pdfBytes.length), + storagePath, + storageBucket: env.MINIO_BUCKET, + category: 'misc', + uploadedBy: run.triggeredByUserId ?? 'system', + }); + + const updated = await updateReportRunStatus(run.id, run.portId, { + status: 'complete', + storageKey: fileId, + sizeBytes: pdfBytes.length, + }); + putStoragePath = null; + return updated; + } catch (err) { + logger.error({ err, reportRunId: run.id }, 'renderReportRun failed'); + await updateReportRunStatus(run.id, run.portId, { + status: 'failed', + errorMessage: err instanceof Error ? err.message : String(err), + }).catch(() => undefined); + + if (putStoragePath) { + try { + await (await getStorageBackend()).delete(putStoragePath); + } catch (compErr) { + logger.error( + { compErr, putStoragePath }, + 'Compensating storage.delete failed after render error', + ); + } + } + throw err; + } +} + +/** + * Schedule-driven email side effect. Looks up the schedule's recipients + * and ships an email with the rendered PDF attached. Stamps `emailedAt` + * on success; logs + rethrows on failure so BullMQ retries. + */ +export async function emailReportRun(reportRunId: string): Promise { + const run = await db.query.reportRuns.findFirst({ + where: eq(reportRuns.id, reportRunId), + }); + if (!run) throw new NotFoundError('Report run'); + if (run.status !== 'complete' || !run.storageKey) { + throw new CodedError('VALIDATION_ERROR', { + internalMessage: `Cannot email report ${run.id} — status=${run.status}, storageKey=${run.storageKey}`, + }); + } + if (!run.scheduleId) { + logger.info({ reportRunId: run.id }, 'Skipping email for user-triggered report (no schedule)'); + return; + } + + const schedule = await db.query.reportSchedules.findFirst({ + where: eq(reportSchedules.id, run.scheduleId), + }); + if (!schedule) { + logger.warn( + { reportRunId: run.id, scheduleId: run.scheduleId }, + 'Schedule deleted before email could fire; skipping', + ); + return; + } + + const recipients = (schedule.recipients ?? []).filter( + (r): r is { name?: string; email: string } => Boolean(r?.email), + ); + if (recipients.length === 0) { + logger.info({ reportRunId: run.id }, 'No recipients on schedule; nothing to email'); + return; + } + + const port = await db.query.ports.findFirst({ where: eq(ports.id, run.portId) }); + if (!port) throw new Error(`Port ${run.portId} missing during emailReportRun`); + + const fileRow = await db.query.files.findFirst({ + where: and(eq(files.id, run.storageKey), eq(files.portId, run.portId)), + }); + if (!fileRow) { + throw new Error(`Report artefact file ${run.storageKey} missing during emailReportRun`); + } + + const subject = `${port.name} · ${run.kind} report`; + const html = `

Your scheduled ${run.kind} report is attached.

`; + + for (const recipient of recipients) { + await sendEmail(recipient.email, subject, html, undefined, undefined, run.portId, [ + { fileId: fileRow.id, filename: fileRow.originalName ?? `${run.kind}-report.pdf` }, + ]); + } + + await updateReportRunStatus(run.id, run.portId, { + status: 'complete', + emailedAt: new Date(), + }); +}