Files
pn-new-crm/src/lib/queue/scheduler.ts
Matt e9ef5831aa feat(reports-p3): BullMQ render + email + schedule poll for report_runs
- new report-render.service.ts: renderReportRun(reportRunId) +
  emailReportRun(reportRunId). Render path fetches the run row,
  advances status to 'rendering', resolves the kind→fetcher+template
  pair from REPORT_RENDER_MAP (dashboard→pipeline, clients→activity,
  berths→occupancy, interests→revenue), generates the PDF, uploads to
  storage, mirrors onto `files` so the standard download/attachment
  surfaces serve it, and stamps storageKey + sizeBytes + status='complete'.
  Failure path stamps 'failed' + errorMessage + compensating
  storage.delete to keep blobs from orphaning. Email path resolves the
  schedule's recipients + the rendered file via the standard
  resolveAttachments port-isolation check, sends one message per
  recipient via the existing sendEmail helper, and stamps emailedAt.
- reports worker (src/lib/queue/workers/reports.ts) gains 3 jobs:
  - 'report-schedules-poll': scans report_schedules where enabled=true
    AND nextRunAt <= now, mints a report_runs row per due schedule via
    createReportRun (triggeredBy='schedule'), advances next_run_at via
    nextRunFor() BEFORE enqueue so a downstream failure doesn't pin the
    schedule on the same tick, then enqueues report-run-render.
  - 'report-run-render': calls renderReportRun + auto-cascades into
    report-run-email when the run was schedule-triggered.
  - 'report-run-email': calls emailReportRun.
  These coexist with the legacy 'report-scheduler' + 'generate-report'
  jobs operating on scheduled_reports/generated_reports.
- scheduler.ts registers 'report-schedules-poll' on a 1-minute cron so
  the system catches due schedules even when no API event nudges them.
- POST /api/v1/reports/runs now enqueues 'report-run-render' after
  createReportRun. Enqueue failures are logged + swallowed so the API
  still returns 201; the schedule poll picks pending rows up as a
  safety net.

Verified: tsc clean, 1493/1493 vitest.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-25 15:42:53 +02:00

125 lines
5.8 KiB
TypeScript

import { getQueue, type QueueName } from './index';
import { logger } from '@/lib/logger';
interface RecurringJobDef {
queue: QueueName;
name: string;
pattern: string;
}
/**
* Register all recurring jobs from 11-REALTIME-AND-BACKGROUND-JOBS.md Section 3.2.
* Called once on server startup.
*/
export async function registerRecurringJobs(): Promise<void> {
const recurring: RecurringJobDef[] = [
// Documenso signature fallback poll - primary is webhooks, this is the
// safety net for any missed delivery (cloudflared tunnel hiccup, transient
// 5xx on our receiver, Documenso quirk). Tightened from 6h to 5m so the
// user-facing "stuck on partially_signed" symptom only persists for the
// 5-min window between polls. Cheap query: ~1 GET per in-flight doc.
{ queue: 'documents', name: 'signature-poll', pattern: '*/5 * * * *' },
// Reminder checks
{ queue: 'notifications', name: 'reminder-check', pattern: '0 * * * *' },
{ queue: 'notifications', name: 'reminder-overdue-check', pattern: '*/15 * * * *' },
// Google Calendar background sync
{ queue: 'maintenance', name: 'calendar-sync', pattern: '*/30 * * * *' },
// Daily checks at 08:00
{ queue: 'notifications', name: 'invoice-overdue-check', pattern: '0 8 * * *' },
{ queue: 'notifications', name: 'tenure-expiry-check', pattern: '0 8 * * *' },
// Exchange rate refresh every 6 hours
{ queue: 'maintenance', name: 'currency-refresh', pattern: '0 */6 * * *' },
// Database backup / cleanup
{ queue: 'maintenance', name: 'database-backup', pattern: '0 2 * * *' },
{ queue: 'maintenance', name: 'backup-cleanup', pattern: '0 3 * * 0' }, // Sunday 03:00
// Session cleanup
{ queue: 'maintenance', name: 'session-cleanup', pattern: '0 4 * * *' },
// Report scheduler - checks every minute for reports due to run
{ queue: 'reports', name: 'report-scheduler', pattern: '* * * * *' },
// Reports P3 — new-system poll (report_schedules + report_runs).
// Runs alongside the legacy poll above; the two queues coexist.
{ queue: 'reports', name: 'report-schedules-poll', pattern: '* * * * *' },
// Notification digest - fires hourly globally; the worker checks each
// user's `notification_digest_paused_until` and unread-count threshold
// before composing a digest, so most ticks are no-ops. Per-user time-
// of-day scheduling is DEFERRED - implementing it requires a product
// decision on UX (slider? time picker? per-channel toggles?) and adds
// a per-user cron path that doesn't pay off until enough users are
// actively customizing it. The hourly bucket aligns with how reps
// already check inboxes ("on the hour") so the current behavior is
// operationally acceptable without per-user override. Revisit when
// a customer asks for digest-time control.
{ queue: 'email', name: 'notification-digest', pattern: '0 * * * *' },
// Cleanup jobs
{ queue: 'maintenance', name: 'temp-file-cleanup', pattern: '0 5 * * *' },
{ queue: 'maintenance', name: 'form-expiry-check', pattern: '0 * * * *' },
// Phase B: alert rule engine sweep
{ queue: 'maintenance', name: 'alerts-evaluate', pattern: '*/5 * * * *' },
// Phase 6: IMAP bounce poller - matches NDRs to document_sends rows
// and fires email_bounced notifications. No-op when IMAP_* env unset.
{ queue: 'maintenance', name: 'bounce-poll', pattern: '*/15 * * * *' },
// Phase B: analytics snapshot warm
{ queue: 'maintenance', name: 'analytics-refresh', pattern: '*/15 * * * *' },
// Phase 3d: GDPR Article 17 - actually delete expired export bundles
{ queue: 'maintenance', name: 'gdpr-export-cleanup', pattern: '0 4 * * *' },
// Phase 3b: AI usage ledger retention (90-day rolling window)
{ queue: 'maintenance', name: 'ai-usage-retention', pattern: '0 5 * * *' },
// Migration 0040 contract: error_events older than 90 days get pruned.
{ queue: 'maintenance', name: 'error-events-retention', pattern: '0 6 * * *' },
// 90-day retention for audit_logs - mirrors error_events. Metadata
// is masked at insert time but old rows still represent stale PII
// exposure that has no operational value past the window.
{ queue: 'maintenance', name: 'audit-logs-retention', pattern: '15 6 * * *' },
// Raw website inquiry payloads - 180-day retention.
{ queue: 'maintenance', name: 'website-submissions-retention', pattern: '0 7 * * *' },
];
// BullMQ defaults `tz` to UTC. The cron patterns above are spelled
// in port-local time (e.g. "0 8 * * *" = 8 AM local), so without an
// explicit `tz` the jobs fire in UTC and silently drift across DST
// - twice a year the local firing time shifts by an hour and admin
// docs ("daily check at 8 AM") break. datetime-auditor C2.
//
// The CRM is single-port today (Port Nimara, Europe/Warsaw); when
// multi-port admin schedules ship, this should resolve per-job from
// the owning port's `ports.timezone`. SCHEDULER_TZ overrides for
// ops debugging without a redeploy.
const schedulerTz = process.env.SCHEDULER_TZ ?? 'Europe/Warsaw';
for (const job of recurring) {
const queue = getQueue(job.queue);
// Sub-hourly / hourly patterns are TZ-invariant; skip the `tz`
// option for them so a misconfigured SCHEDULER_TZ can't perturb
// them.
const tzInvariant = /^\*|\*\/\d+ \*|^0 \*/.test(job.pattern);
await queue.upsertJobScheduler(
job.name,
{ pattern: job.pattern, ...(tzInvariant ? {} : { tz: schedulerTz }) },
{ data: {}, name: job.name },
);
logger.info(
{
queue: job.queue,
job: job.name,
pattern: job.pattern,
tz: tzInvariant ? 'UTC (invariant)' : schedulerTz,
},
'Registered recurring job',
);
}
logger.info({ count: recurring.length }, 'All recurring jobs registered');
}