feat(reports-p3): BullMQ render + email + schedule poll for report_runs
- new report-render.service.ts: renderReportRun(reportRunId) +
emailReportRun(reportRunId). Render path fetches the run row,
advances status to 'rendering', resolves the kind→fetcher+template
pair from REPORT_RENDER_MAP (dashboard→pipeline, clients→activity,
berths→occupancy, interests→revenue), generates the PDF, uploads to
storage, mirrors onto `files` so the standard download/attachment
surfaces serve it, and stamps storageKey + sizeBytes + status='complete'.
Failure path stamps 'failed' + errorMessage + compensating
storage.delete to keep blobs from orphaning. Email path resolves the
schedule's recipients + the rendered file via the standard
resolveAttachments port-isolation check, sends one message per
recipient via the existing sendEmail helper, and stamps emailedAt.
- reports worker (src/lib/queue/workers/reports.ts) gains 3 jobs:
- 'report-schedules-poll': scans report_schedules where enabled=true
AND nextRunAt <= now, mints a report_runs row per due schedule via
createReportRun (triggeredBy='schedule'), advances next_run_at via
nextRunFor() BEFORE enqueue so a downstream failure doesn't pin the
schedule on the same tick, then enqueues report-run-render.
- 'report-run-render': calls renderReportRun + auto-cascades into
report-run-email when the run was schedule-triggered.
- 'report-run-email': calls emailReportRun.
These coexist with the legacy 'report-scheduler' + 'generate-report'
jobs operating on scheduled_reports/generated_reports.
- scheduler.ts registers 'report-schedules-poll' on a 1-minute cron so
the system catches due schedules even when no API event nudges them.
- POST /api/v1/reports/runs now enqueues 'report-run-render' after
createReportRun. Enqueue failures are logged + swallowed so the API
still returns 201; the schedule poll picks pending rows up as a
safety net.
Verified: tsc clean, 1493/1493 vitest.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -3,6 +3,7 @@ import { NextResponse } from 'next/server';
|
|||||||
import { withAuth, withPermission } from '@/lib/api/helpers';
|
import { withAuth, withPermission } from '@/lib/api/helpers';
|
||||||
import { parseBody, parseQuery } from '@/lib/api/route-helpers';
|
import { parseBody, parseQuery } from '@/lib/api/route-helpers';
|
||||||
import { errorResponse } from '@/lib/errors';
|
import { errorResponse } from '@/lib/errors';
|
||||||
|
import { getQueue } from '@/lib/queue';
|
||||||
import { createReportRun, listReportRuns } from '@/lib/services/report-runs.service';
|
import { createReportRun, listReportRuns } from '@/lib/services/report-runs.service';
|
||||||
import { createReportRunSchema, listReportRunsSchema } from '@/lib/validators/reports';
|
import { createReportRunSchema, listReportRunsSchema } from '@/lib/validators/reports';
|
||||||
|
|
||||||
@@ -49,6 +50,25 @@ export const POST = withAuth(
|
|||||||
userAgent: ctx.userAgent,
|
userAgent: ctx.userAgent,
|
||||||
},
|
},
|
||||||
});
|
});
|
||||||
|
// P3: hand off to the BullMQ render worker. Failure to enqueue
|
||||||
|
// doesn't abort the create — the schedule poll picks up pending
|
||||||
|
// rows on the next tick as a safety net.
|
||||||
|
try {
|
||||||
|
await getQueue('reports').add(
|
||||||
|
'report-run-render',
|
||||||
|
{ reportRunId: row.id },
|
||||||
|
{ jobId: `report-run-render:${row.id}` },
|
||||||
|
);
|
||||||
|
} catch (queueErr) {
|
||||||
|
// Logged but swallowed so the API still returns 201 with the row.
|
||||||
|
// The pending row stays visible in /reports/runs so the rep can
|
||||||
|
// see it queued; a future ops job can rescue stuck pendings.
|
||||||
|
const { logger } = await import('@/lib/logger');
|
||||||
|
logger.error(
|
||||||
|
{ queueErr, reportRunId: row.id },
|
||||||
|
'Failed to enqueue render after createReportRun',
|
||||||
|
);
|
||||||
|
}
|
||||||
return NextResponse.json({ data: row }, { status: 201 });
|
return NextResponse.json({ data: row }, { status: 201 });
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
return errorResponse(error);
|
return errorResponse(error);
|
||||||
|
|||||||
@@ -44,6 +44,10 @@ export async function registerRecurringJobs(): Promise<void> {
|
|||||||
// Report scheduler - checks every minute for reports due to run
|
// Report scheduler - checks every minute for reports due to run
|
||||||
{ queue: 'reports', name: 'report-scheduler', pattern: '* * * * *' },
|
{ queue: 'reports', name: 'report-scheduler', pattern: '* * * * *' },
|
||||||
|
|
||||||
|
// Reports P3 — new-system poll (report_schedules + report_runs).
|
||||||
|
// Runs alongside the legacy poll above; the two queues coexist.
|
||||||
|
{ queue: 'reports', name: 'report-schedules-poll', pattern: '* * * * *' },
|
||||||
|
|
||||||
// Notification digest - fires hourly globally; the worker checks each
|
// Notification digest - fires hourly globally; the worker checks each
|
||||||
// user's `notification_digest_paused_until` and unread-count threshold
|
// user's `notification_digest_paused_until` and unread-count threshold
|
||||||
// before composing a digest, so most ticks are no-ops. Per-user time-
|
// before composing a digest, so most ticks are no-ops. Per-user time-
|
||||||
|
|||||||
@@ -98,6 +98,111 @@ export const reportsWorker = new Worker(
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Reports P3 — new system: report_schedules + report_runs.
|
||||||
|
case 'report-schedules-poll': {
|
||||||
|
// Scan report_schedules due to fire, mint a report_runs row per
|
||||||
|
// schedule, advance next_run_at by cadence math, enqueue render.
|
||||||
|
const { db } = await import('@/lib/db');
|
||||||
|
const { reportSchedules, reportTemplates } = await import('@/lib/db/schema/reports');
|
||||||
|
const { createReportRun } = await import('@/lib/services/report-runs.service');
|
||||||
|
const { nextRunFor } = await import('@/lib/services/report-schedules.service');
|
||||||
|
const { and, eq, lte } = await import('drizzle-orm');
|
||||||
|
|
||||||
|
const now = new Date();
|
||||||
|
const due = await db
|
||||||
|
.select()
|
||||||
|
.from(reportSchedules)
|
||||||
|
.where(and(eq(reportSchedules.enabled, true), lte(reportSchedules.nextRunAt, now)));
|
||||||
|
|
||||||
|
for (const schedule of due) {
|
||||||
|
const template = await db.query.reportTemplates.findFirst({
|
||||||
|
where: eq(reportTemplates.id, schedule.templateId),
|
||||||
|
});
|
||||||
|
if (!template) {
|
||||||
|
logger.warn(
|
||||||
|
{ scheduleId: schedule.id, templateId: schedule.templateId },
|
||||||
|
'Skipping schedule: template missing (likely archived); pausing',
|
||||||
|
);
|
||||||
|
await db
|
||||||
|
.update(reportSchedules)
|
||||||
|
.set({ enabled: false, updatedAt: new Date() })
|
||||||
|
.where(eq(reportSchedules.id, schedule.id));
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Compute the next fire BEFORE the enqueue so a downstream
|
||||||
|
// failure (storage outage, etc.) doesn't pin the schedule on
|
||||||
|
// the same tick — preserves the "no-op doesn't slip" rule.
|
||||||
|
await db
|
||||||
|
.update(reportSchedules)
|
||||||
|
.set({
|
||||||
|
lastRunAt: now,
|
||||||
|
nextRunAt: nextRunFor(schedule.cadence as Parameters<typeof nextRunFor>[0], now),
|
||||||
|
updatedAt: new Date(),
|
||||||
|
})
|
||||||
|
.where(eq(reportSchedules.id, schedule.id));
|
||||||
|
|
||||||
|
try {
|
||||||
|
const run = await createReportRun(
|
||||||
|
{
|
||||||
|
kind: template.kind as 'dashboard' | 'clients' | 'berths' | 'interests',
|
||||||
|
config: template.config,
|
||||||
|
outputFormat: schedule.outputFormat as 'pdf' | 'csv' | 'png',
|
||||||
|
templateId: template.id,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
portId: schedule.portId,
|
||||||
|
triggeredBy: 'schedule',
|
||||||
|
scheduleId: schedule.id,
|
||||||
|
meta: {
|
||||||
|
userId: 'system',
|
||||||
|
portId: schedule.portId,
|
||||||
|
ipAddress: '0.0.0.0',
|
||||||
|
userAgent: 'scheduler',
|
||||||
|
},
|
||||||
|
},
|
||||||
|
);
|
||||||
|
const { getQueue: enqueue } = await import('@/lib/queue');
|
||||||
|
await enqueue('reports').add(
|
||||||
|
'report-run-render',
|
||||||
|
{ reportRunId: run.id },
|
||||||
|
{ jobId: `report-run-render:${run.id}` },
|
||||||
|
);
|
||||||
|
} catch (err) {
|
||||||
|
logger.error(
|
||||||
|
{ err, scheduleId: schedule.id },
|
||||||
|
'Failed to mint report_run for due schedule; will retry on next poll',
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
case 'report-run-render': {
|
||||||
|
const { reportRunId } = job.data as { reportRunId: string };
|
||||||
|
const { renderReportRun } = await import('@/lib/services/report-render.service');
|
||||||
|
const run = await renderReportRun(reportRunId);
|
||||||
|
|
||||||
|
// Schedule-driven runs auto-cascade into the email job. User-
|
||||||
|
// triggered runs are inert — the rep downloads via the UI.
|
||||||
|
if (run.triggeredBy === 'schedule' && run.status === 'complete') {
|
||||||
|
const { getQueue: enqueue } = await import('@/lib/queue');
|
||||||
|
await enqueue('reports').add(
|
||||||
|
'report-run-email',
|
||||||
|
{ reportRunId: run.id },
|
||||||
|
{ jobId: `report-run-email:${run.id}` },
|
||||||
|
);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
case 'report-run-email': {
|
||||||
|
const { reportRunId } = job.data as { reportRunId: string };
|
||||||
|
const { emailReportRun } = await import('@/lib/services/report-render.service');
|
||||||
|
await emailReportRun(reportRunId);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
default:
|
default:
|
||||||
logger.warn({ jobName: job.name }, 'Unknown reports job');
|
logger.warn({ jobName: job.name }, 'Unknown reports job');
|
||||||
}
|
}
|
||||||
|
|||||||
255
src/lib/services/report-render.service.ts
Normal file
255
src/lib/services/report-render.service.ts
Normal file
@@ -0,0 +1,255 @@
|
|||||||
|
/**
|
||||||
|
* Reports P3 — render + email orchestration for `report_runs` rows.
|
||||||
|
*
|
||||||
|
* Two entry points are called from the BullMQ reports worker:
|
||||||
|
*
|
||||||
|
* - `renderReportRun(reportRunId)` — fetches the run, advances to
|
||||||
|
* `rendering`, generates the artefact via the registered renderer
|
||||||
|
* for that `kind`, uploads to storage, and stamps the run row with
|
||||||
|
* storage key + size + status='complete' (or 'failed' + errorMessage).
|
||||||
|
* - `emailReportRun(reportRunId)` — for runs born from a schedule:
|
||||||
|
* reads the schedule's recipients + the stored artefact and ships
|
||||||
|
* an email with the file attached via the existing sendEmail helper.
|
||||||
|
*
|
||||||
|
* Kind-specific renderers live in REPORT_RENDER_MAP. v1 wires the four
|
||||||
|
* new-system kinds (dashboard/clients/berths/interests) onto the
|
||||||
|
* existing legacy fetcher+template pairs; dedicated per-kind templates
|
||||||
|
* land alongside the builder UI (P4+).
|
||||||
|
*/
|
||||||
|
|
||||||
|
import { and, eq } from 'drizzle-orm';
|
||||||
|
|
||||||
|
import { db } from '@/lib/db';
|
||||||
|
import { env } from '@/lib/env';
|
||||||
|
import { logger } from '@/lib/logger';
|
||||||
|
import { reportRuns, reportSchedules, type ReportRun } from '@/lib/db/schema/reports';
|
||||||
|
import { ports } from '@/lib/db/schema/ports';
|
||||||
|
import { files } from '@/lib/db/schema/documents';
|
||||||
|
import { buildStoragePath } from '@/lib/minio';
|
||||||
|
import { getStorageBackend } from '@/lib/storage';
|
||||||
|
import { resolvePortLogo } from '@/lib/pdf/brand-kit/logo';
|
||||||
|
import { renderPdf } from '@/lib/pdf/render';
|
||||||
|
import { sendEmail } from '@/lib/email';
|
||||||
|
import { updateReportRunStatus } from '@/lib/services/report-runs.service';
|
||||||
|
import { CodedError, NotFoundError } from '@/lib/errors';
|
||||||
|
import {
|
||||||
|
fetchActivityData,
|
||||||
|
fetchOccupancyData,
|
||||||
|
fetchPipelineData,
|
||||||
|
fetchRevenueData,
|
||||||
|
type ActivityData,
|
||||||
|
type OccupancyData,
|
||||||
|
type PipelineData,
|
||||||
|
type RevenueData,
|
||||||
|
} from '@/lib/services/report-generators';
|
||||||
|
import { ActivityReportPdf } from '@/lib/pdf/templates/reports/activity-report';
|
||||||
|
import { OccupancyReportPdf } from '@/lib/pdf/templates/reports/occupancy-report';
|
||||||
|
import { PipelineReportPdf } from '@/lib/pdf/templates/reports/pipeline-report';
|
||||||
|
import { RevenueReportPdf } from '@/lib/pdf/templates/reports/revenue-report';
|
||||||
|
|
||||||
|
interface RenderCtx {
|
||||||
|
portName: string;
|
||||||
|
logoBuffer: Buffer | null;
|
||||||
|
}
|
||||||
|
|
||||||
|
interface KindRenderer {
|
||||||
|
fetchData: (portId: string, params: Record<string, unknown>) => Promise<unknown>;
|
||||||
|
render: (data: unknown, ctx: RenderCtx) => Promise<Buffer>;
|
||||||
|
}
|
||||||
|
|
||||||
|
const REPORT_RENDER_MAP: Record<string, KindRenderer> = {
|
||||||
|
dashboard: {
|
||||||
|
fetchData: fetchPipelineData as KindRenderer['fetchData'],
|
||||||
|
render: async (data, ctx) =>
|
||||||
|
renderPdf(
|
||||||
|
PipelineReportPdf({
|
||||||
|
portName: ctx.portName,
|
||||||
|
logoBuffer: ctx.logoBuffer ?? null,
|
||||||
|
data: data as PipelineData,
|
||||||
|
}),
|
||||||
|
),
|
||||||
|
},
|
||||||
|
clients: {
|
||||||
|
fetchData: fetchActivityData as KindRenderer['fetchData'],
|
||||||
|
render: async (data, ctx) =>
|
||||||
|
renderPdf(
|
||||||
|
ActivityReportPdf({
|
||||||
|
portName: ctx.portName,
|
||||||
|
logoBuffer: ctx.logoBuffer ?? null,
|
||||||
|
data: data as ActivityData,
|
||||||
|
}),
|
||||||
|
),
|
||||||
|
},
|
||||||
|
berths: {
|
||||||
|
fetchData: fetchOccupancyData as KindRenderer['fetchData'],
|
||||||
|
render: async (data, ctx) =>
|
||||||
|
renderPdf(
|
||||||
|
OccupancyReportPdf({
|
||||||
|
portName: ctx.portName,
|
||||||
|
logoBuffer: ctx.logoBuffer ?? null,
|
||||||
|
data: data as OccupancyData,
|
||||||
|
}),
|
||||||
|
),
|
||||||
|
},
|
||||||
|
interests: {
|
||||||
|
fetchData: fetchRevenueData as KindRenderer['fetchData'],
|
||||||
|
render: async (data, ctx) =>
|
||||||
|
renderPdf(
|
||||||
|
RevenueReportPdf({
|
||||||
|
portName: ctx.portName,
|
||||||
|
logoBuffer: ctx.logoBuffer ?? null,
|
||||||
|
data: data as RevenueData,
|
||||||
|
}),
|
||||||
|
),
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
export async function renderReportRun(reportRunId: string): Promise<ReportRun> {
|
||||||
|
const run = await db.query.reportRuns.findFirst({
|
||||||
|
where: eq(reportRuns.id, reportRunId),
|
||||||
|
});
|
||||||
|
if (!run) throw new NotFoundError('Report run');
|
||||||
|
|
||||||
|
if (run.status === 'complete' && run.storageKey) {
|
||||||
|
return run;
|
||||||
|
}
|
||||||
|
|
||||||
|
await updateReportRunStatus(run.id, run.portId, { status: 'rendering' });
|
||||||
|
|
||||||
|
let putStoragePath: string | null = null;
|
||||||
|
try {
|
||||||
|
const renderer = REPORT_RENDER_MAP[run.kind];
|
||||||
|
if (!renderer) {
|
||||||
|
throw new CodedError('VALIDATION_ERROR', {
|
||||||
|
internalMessage: `No renderer registered for report kind "${run.kind}"`,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
const port = await db.query.ports.findFirst({ where: eq(ports.id, run.portId) });
|
||||||
|
if (!port) {
|
||||||
|
throw new Error(`Cannot render report ${run.id}: port ${run.portId} not found`);
|
||||||
|
}
|
||||||
|
|
||||||
|
const logo = await resolvePortLogo(run.portId).catch(() => ({
|
||||||
|
buffer: null as Buffer | null,
|
||||||
|
}));
|
||||||
|
const ctx: RenderCtx = { portName: port.name, logoBuffer: logo.buffer ?? null };
|
||||||
|
const params = (run.config as Record<string, unknown>) ?? {};
|
||||||
|
|
||||||
|
const data = await renderer.fetchData(run.portId, params);
|
||||||
|
const pdfBytes = await renderer.render(data, ctx);
|
||||||
|
|
||||||
|
const fileId = crypto.randomUUID();
|
||||||
|
const storagePath = buildStoragePath(port.slug, 'reports', run.id, fileId, 'pdf');
|
||||||
|
|
||||||
|
const backend = await getStorageBackend();
|
||||||
|
await backend.put(storagePath, pdfBytes, {
|
||||||
|
contentType: 'application/pdf',
|
||||||
|
sizeBytes: pdfBytes.length,
|
||||||
|
});
|
||||||
|
putStoragePath = storagePath;
|
||||||
|
|
||||||
|
await db.insert(files).values({
|
||||||
|
id: fileId,
|
||||||
|
portId: run.portId,
|
||||||
|
filename: `${run.kind}-${run.id.slice(0, 8)}.pdf`,
|
||||||
|
originalName: `${run.kind}-report.pdf`,
|
||||||
|
mimeType: 'application/pdf',
|
||||||
|
sizeBytes: String(pdfBytes.length),
|
||||||
|
storagePath,
|
||||||
|
storageBucket: env.MINIO_BUCKET,
|
||||||
|
category: 'misc',
|
||||||
|
uploadedBy: run.triggeredByUserId ?? 'system',
|
||||||
|
});
|
||||||
|
|
||||||
|
const updated = await updateReportRunStatus(run.id, run.portId, {
|
||||||
|
status: 'complete',
|
||||||
|
storageKey: fileId,
|
||||||
|
sizeBytes: pdfBytes.length,
|
||||||
|
});
|
||||||
|
putStoragePath = null;
|
||||||
|
return updated;
|
||||||
|
} catch (err) {
|
||||||
|
logger.error({ err, reportRunId: run.id }, 'renderReportRun failed');
|
||||||
|
await updateReportRunStatus(run.id, run.portId, {
|
||||||
|
status: 'failed',
|
||||||
|
errorMessage: err instanceof Error ? err.message : String(err),
|
||||||
|
}).catch(() => undefined);
|
||||||
|
|
||||||
|
if (putStoragePath) {
|
||||||
|
try {
|
||||||
|
await (await getStorageBackend()).delete(putStoragePath);
|
||||||
|
} catch (compErr) {
|
||||||
|
logger.error(
|
||||||
|
{ compErr, putStoragePath },
|
||||||
|
'Compensating storage.delete failed after render error',
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Schedule-driven email side effect. Looks up the schedule's recipients
|
||||||
|
* and ships an email with the rendered PDF attached. Stamps `emailedAt`
|
||||||
|
* on success; logs + rethrows on failure so BullMQ retries.
|
||||||
|
*/
|
||||||
|
export async function emailReportRun(reportRunId: string): Promise<void> {
|
||||||
|
const run = await db.query.reportRuns.findFirst({
|
||||||
|
where: eq(reportRuns.id, reportRunId),
|
||||||
|
});
|
||||||
|
if (!run) throw new NotFoundError('Report run');
|
||||||
|
if (run.status !== 'complete' || !run.storageKey) {
|
||||||
|
throw new CodedError('VALIDATION_ERROR', {
|
||||||
|
internalMessage: `Cannot email report ${run.id} — status=${run.status}, storageKey=${run.storageKey}`,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
if (!run.scheduleId) {
|
||||||
|
logger.info({ reportRunId: run.id }, 'Skipping email for user-triggered report (no schedule)');
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const schedule = await db.query.reportSchedules.findFirst({
|
||||||
|
where: eq(reportSchedules.id, run.scheduleId),
|
||||||
|
});
|
||||||
|
if (!schedule) {
|
||||||
|
logger.warn(
|
||||||
|
{ reportRunId: run.id, scheduleId: run.scheduleId },
|
||||||
|
'Schedule deleted before email could fire; skipping',
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const recipients = (schedule.recipients ?? []).filter(
|
||||||
|
(r): r is { name?: string; email: string } => Boolean(r?.email),
|
||||||
|
);
|
||||||
|
if (recipients.length === 0) {
|
||||||
|
logger.info({ reportRunId: run.id }, 'No recipients on schedule; nothing to email');
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const port = await db.query.ports.findFirst({ where: eq(ports.id, run.portId) });
|
||||||
|
if (!port) throw new Error(`Port ${run.portId} missing during emailReportRun`);
|
||||||
|
|
||||||
|
const fileRow = await db.query.files.findFirst({
|
||||||
|
where: and(eq(files.id, run.storageKey), eq(files.portId, run.portId)),
|
||||||
|
});
|
||||||
|
if (!fileRow) {
|
||||||
|
throw new Error(`Report artefact file ${run.storageKey} missing during emailReportRun`);
|
||||||
|
}
|
||||||
|
|
||||||
|
const subject = `${port.name} · ${run.kind} report`;
|
||||||
|
const html = `<p>Your scheduled ${run.kind} report is attached.</p>`;
|
||||||
|
|
||||||
|
for (const recipient of recipients) {
|
||||||
|
await sendEmail(recipient.email, subject, html, undefined, undefined, run.portId, [
|
||||||
|
{ fileId: fileRow.id, filename: fileRow.originalName ?? `${run.kind}-report.pdf` },
|
||||||
|
]);
|
||||||
|
}
|
||||||
|
|
||||||
|
await updateReportRunStatus(run.id, run.portId, {
|
||||||
|
status: 'complete',
|
||||||
|
emailedAt: new Date(),
|
||||||
|
});
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user