From 9890d065f8dd0ec544e5cdd1ba4e55e3f5df1197 Mon Sep 17 00:00:00 2001 From: Matt Ciaccio Date: Wed, 6 May 2026 20:44:38 +0200 Subject: [PATCH] =?UTF-8?q?feat(audit):=20wider=20coverage=20=E2=80=94=20s?= =?UTF-8?q?ensitive=20views,=20cron,=20jobs,=20portal=20abuse?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Builds on the audit infra split (severity/source) by emitting events from every place a security or operations review would want to see: Sensitive data views (severity=warning): - GDPR export download URL issued - Audit log page opened (watch-the-watchers; first page only) - CSV export of expenses - Webhook secret regenerated Authentication abuse (severity=warning, source=auth): - Portal sign-in: success + failed-credentials + portal-disabled - Portal password reset: unknown email + portal-disabled + bad token - Portal activation: bad/expired token Inbound webhook hardening: - Documenso webhook with invalid X-Documenso-Secret now writes webhook_failed instead of being silently logged Background work (source=cron / job): - New attachWorkerAudit() helper wires every BullMQ worker to emit job_failed (severity=error) on .on('failed') and cron_run on .on('completed') for any job whose name matches the recurring scheduler list. Applied across all 10 workers. 1175/1175 vitest passing. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/app/api/v1/admin/audit/route.ts | 29 ++++++ .../[id]/gdpr-export/[exportId]/route.ts | 19 ++++ src/app/api/v1/expenses/export/csv/route.ts | 13 +++ src/app/api/webhooks/documenso/route.ts | 16 ++++ src/lib/queue/audit-helpers.ts | 92 +++++++++++++++++++ src/lib/queue/workers/ai.ts | 3 + src/lib/queue/workers/bulk.ts | 3 + src/lib/queue/workers/documents.ts | 3 + src/lib/queue/workers/email.ts | 3 + src/lib/queue/workers/export.ts | 3 + src/lib/queue/workers/import.ts | 3 + src/lib/queue/workers/maintenance.ts | 3 + src/lib/queue/workers/notifications.ts | 3 + src/lib/queue/workers/reports.ts | 3 + src/lib/queue/workers/webhooks.ts | 3 + src/lib/services/portal-auth.service.ts | 61 ++++++++++++ src/lib/services/webhooks.service.ts | 1 + 17 files changed, 261 insertions(+) create mode 100644 src/lib/queue/audit-helpers.ts diff --git a/src/app/api/v1/admin/audit/route.ts b/src/app/api/v1/admin/audit/route.ts index 74854a9..d27c7ba 100644 --- a/src/app/api/v1/admin/audit/route.ts +++ b/src/app/api/v1/admin/audit/route.ts @@ -8,6 +8,7 @@ import { searchAuditLogs } from '@/lib/services/audit-search.service'; import { db } from '@/lib/db'; import { user } from '@/lib/db/schema/users'; import { errorResponse } from '@/lib/errors'; +import { createAuditLog } from '@/lib/audit'; const auditQuerySchema = z.object({ limit: z.coerce.number().int().min(1).max(200).default(50), @@ -67,6 +68,34 @@ export const GET = withAuth( actor: r.userId ? (userMap.get(r.userId) ?? null) : null, })); + // Watch-the-watchers: record that an operator opened the audit log + // page. Only fire on the first page (no cursor) so paginating + // through doesn't spam the log; use 'view' at warning severity so + // the entry stands out in the inspector. + if (!cursor) { + void createAuditLog({ + userId: ctx.userId, + portId: ctx.portId, + action: 'view', + entityType: 'audit_log', + entityId: 'list', + metadata: { + filters: { + entityType: query.entityType, + action: query.action, + severity: query.severity, + source: query.source, + userId: query.userId, + entityId: query.entityId, + search: query.search, + }, + }, + ipAddress: ctx.ipAddress, + userAgent: ctx.userAgent, + severity: 'warning', + }); + } + return NextResponse.json({ data, pagination: { diff --git a/src/app/api/v1/clients/[id]/gdpr-export/[exportId]/route.ts b/src/app/api/v1/clients/[id]/gdpr-export/[exportId]/route.ts index bf48cb2..922ec01 100644 --- a/src/app/api/v1/clients/[id]/gdpr-export/[exportId]/route.ts +++ b/src/app/api/v1/clients/[id]/gdpr-export/[exportId]/route.ts @@ -3,10 +3,16 @@ import { NextResponse } from 'next/server'; import { withAuth, withPermission, withRateLimit } from '@/lib/api/helpers'; import { errorResponse } from '@/lib/errors'; import { getExportDownloadUrl } from '@/lib/services/gdpr-export.service'; +import { createAuditLog } from '@/lib/audit'; /** * Returns a fresh signed URL for an existing GDPR export. Staff use this * from the admin UI; the email path embeds its own signed URL. + * + * Every call writes a `view` audit row at 'warning' severity — GDPR + * exports contain the entire personal data of a client and a fresh + * presigned URL would let the operator download it; we want a clear + * trail of who pulled what when. */ export const GET = withAuth( withPermission( @@ -15,6 +21,19 @@ export const GET = withAuth( withRateLimit('exports', async (req, ctx, params) => { try { const url = await getExportDownloadUrl(params.exportId!, ctx.portId); + + void createAuditLog({ + userId: ctx.userId, + portId: ctx.portId, + action: 'view', + entityType: 'gdpr_export', + entityId: params.exportId!, + metadata: { clientId: params.id ?? null, urlIssued: true }, + ipAddress: ctx.ipAddress, + userAgent: ctx.userAgent, + severity: 'warning', + }); + return NextResponse.json({ data: { url } }); } catch (error) { return errorResponse(error); diff --git a/src/app/api/v1/expenses/export/csv/route.ts b/src/app/api/v1/expenses/export/csv/route.ts index 7c35711..befd283 100644 --- a/src/app/api/v1/expenses/export/csv/route.ts +++ b/src/app/api/v1/expenses/export/csv/route.ts @@ -4,6 +4,7 @@ import { withAuth, withPermission } from '@/lib/api/helpers'; import { errorResponse } from '@/lib/errors'; import { exportCsv } from '@/lib/services/expense-export'; import { listExpensesSchema } from '@/lib/validators/expenses'; +import { createAuditLog } from '@/lib/audit'; export const POST = withAuth( withPermission('expenses', 'view', async (req, ctx) => { @@ -12,6 +13,18 @@ export const POST = withAuth( const query = listExpensesSchema.parse(body); const csv = await exportCsv(ctx.portId, query); + void createAuditLog({ + userId: ctx.userId, + portId: ctx.portId, + action: 'send', + entityType: 'expense_export', + entityId: 'csv', + metadata: { format: 'csv', filterCount: Object.keys(query).length, byteSize: csv.length }, + ipAddress: ctx.ipAddress, + userAgent: ctx.userAgent, + severity: 'warning', + }); + return new NextResponse(csv, { status: 200, headers: { diff --git a/src/app/api/webhooks/documenso/route.ts b/src/app/api/webhooks/documenso/route.ts index 0f5a0f0..255df92 100644 --- a/src/app/api/webhooks/documenso/route.ts +++ b/src/app/api/webhooks/documenso/route.ts @@ -13,6 +13,7 @@ import { handleDocumentCancelled, } from '@/lib/services/documents.service'; import { logger } from '@/lib/logger'; +import { createAuditLog } from '@/lib/audit'; // BR-024: Dedup via signatureHash unique index on documentEvents // Always return 200 from webhook (webhook best practice) @@ -66,6 +67,21 @@ export async function POST(req: NextRequest): Promise { } if (!matched) { logger.warn({ providedLen: providedSecret.length }, 'Invalid Documenso webhook secret'); + void createAuditLog({ + userId: null, + portId: null, + action: 'webhook_failed', + entityType: 'webhook_inbound', + entityId: 'documenso', + metadata: { + reason: 'invalid_secret', + providedLen: providedSecret.length, + }, + ipAddress: req.headers.get('x-forwarded-for')?.split(',')[0]?.trim() ?? '', + userAgent: req.headers.get('user-agent') ?? '', + severity: 'warning', + source: 'webhook', + }); return NextResponse.json({ ok: false, error: 'Invalid secret' }, { status: 200 }); } diff --git a/src/lib/queue/audit-helpers.ts b/src/lib/queue/audit-helpers.ts new file mode 100644 index 0000000..ffafbf9 --- /dev/null +++ b/src/lib/queue/audit-helpers.ts @@ -0,0 +1,92 @@ +/** + * Shared BullMQ → audit log glue. + * + * Each worker calls `attachWorkerAudit(worker, workerName)` after + * defining itself. We listen on the worker's BullMQ events and emit: + * + * - `job_failed` (severity error, source 'job') for every BullMQ + * `failed` event, regardless of attempt number. (Producers know if + * this was a final failure via the existing per-worker logic.) + * - `cron_run` (severity info, source 'cron') for every successful + * completion of a job whose name matches a recurring scheduler + * entry — gives operators a heartbeat row per cron tick. + * + * Audit writes are fire-and-forget and never throw. + */ + +import type { Job, Worker } from 'bullmq'; + +import { createAuditLog } from '@/lib/audit'; +import { logger } from '@/lib/logger'; + +/** + * Names that match recurring jobs registered in `scheduler.ts`. + * Keep in sync — a typo here just means the cron-tick row gets logged + * as a regular job instead of a cron run, no functional impact. + */ +const RECURRING_JOB_NAMES: ReadonlySet = new Set([ + 'signature-poll', + 'reminder-check', + 'reminder-overdue-check', + 'calendar-sync', + 'invoice-overdue-check', + 'tenure-expiry-check', + 'currency-refresh', + 'database-backup', + 'backup-cleanup', + 'session-cleanup', + 'report-scheduler', + 'notification-digest', + 'temp-file-cleanup', + 'form-expiry-check', + 'alerts-evaluate', + 'analytics-refresh', + 'gdpr-export-cleanup', + 'ai-usage-retention', + 'error-events-retention', + 'website-submissions-retention', +]); + +export function attachWorkerAudit(worker: Worker, workerName: string): void { + worker.on('failed', (job: Job | undefined, err: Error) => { + void createAuditLog({ + userId: null, + portId: null, + action: 'job_failed', + entityType: 'queue_job', + entityId: job?.id ?? `${workerName}:unknown`, + metadata: { + worker: workerName, + jobName: job?.name ?? 'unknown', + attemptsMade: job?.attemptsMade ?? null, + opts: job?.opts ? { attempts: job.opts.attempts } : null, + error: err.message?.slice(0, 1024) ?? null, + }, + severity: 'error', + source: 'job', + }); + }); + + worker.on('completed', (job: Job) => { + if (!RECURRING_JOB_NAMES.has(job.name)) return; + void createAuditLog({ + userId: null, + portId: null, + action: 'cron_run', + entityType: 'cron', + entityId: job.name, + metadata: { + worker: workerName, + jobId: job.id ?? null, + durationMs: job.processedOn && job.finishedOn ? job.finishedOn - job.processedOn : null, + }, + severity: 'info', + source: 'cron', + }); + }); + + // Defensive logger — surface any audit-side failure to the worker log. + worker.on('error', (err) => { + logger.warn({ workerName, err }, 'BullMQ worker error'); + }); +} diff --git a/src/lib/queue/workers/ai.ts b/src/lib/queue/workers/ai.ts index 3427c76..dcf54eb 100644 --- a/src/lib/queue/workers/ai.ts +++ b/src/lib/queue/workers/ai.ts @@ -3,6 +3,7 @@ import { env } from '@/lib/env'; import type { ConnectionOptions } from 'bullmq'; import { logger } from '@/lib/logger'; +import { attachWorkerAudit } from '@/lib/queue/audit-helpers'; import { QUEUE_CONFIGS } from '@/lib/queue'; // ─── Email draft generation ─────────────────────────────────────────────────── @@ -319,3 +320,5 @@ export const aiWorker = new Worker( aiWorker.on('failed', (job, err) => { logger.error({ jobId: job?.id, jobName: job?.name, err }, 'AI job failed'); }); + +attachWorkerAudit(aiWorker, 'ai'); diff --git a/src/lib/queue/workers/bulk.ts b/src/lib/queue/workers/bulk.ts index dbfae8c..c71b8e5 100644 --- a/src/lib/queue/workers/bulk.ts +++ b/src/lib/queue/workers/bulk.ts @@ -3,6 +3,7 @@ import { env } from '@/lib/env'; import type { ConnectionOptions } from 'bullmq'; import { logger } from '@/lib/logger'; +import { attachWorkerAudit } from '@/lib/queue/audit-helpers'; import { QUEUE_CONFIGS } from '@/lib/queue'; /** @@ -30,3 +31,5 @@ export const bulkWorker = new Worker( bulkWorker.on('failed', (job, err) => { logger.error({ jobId: job?.id, jobName: job?.name, err }, 'Bulk job failed'); }); + +attachWorkerAudit(bulkWorker, 'bulk'); diff --git a/src/lib/queue/workers/documents.ts b/src/lib/queue/workers/documents.ts index 52e9a2d..4a35abb 100644 --- a/src/lib/queue/workers/documents.ts +++ b/src/lib/queue/workers/documents.ts @@ -3,6 +3,7 @@ import { env } from '@/lib/env'; import type { ConnectionOptions } from 'bullmq'; import { logger } from '@/lib/logger'; +import { attachWorkerAudit } from '@/lib/queue/audit-helpers'; import { QUEUE_CONFIGS } from '@/lib/queue'; export const documentsWorker = new Worker( @@ -48,3 +49,5 @@ export const documentsWorker = new Worker( documentsWorker.on('failed', (job, err) => { logger.error({ jobId: job?.id, jobName: job?.name, err }, 'Documents job failed'); }); + +attachWorkerAudit(documentsWorker, 'documents'); diff --git a/src/lib/queue/workers/email.ts b/src/lib/queue/workers/email.ts index a41c43c..9911458 100644 --- a/src/lib/queue/workers/email.ts +++ b/src/lib/queue/workers/email.ts @@ -3,6 +3,7 @@ import { env } from '@/lib/env'; import type { ConnectionOptions } from 'bullmq'; import { logger } from '@/lib/logger'; +import { attachWorkerAudit } from '@/lib/queue/audit-helpers'; import { QUEUE_CONFIGS } from '@/lib/queue'; export const emailWorker = new Worker( @@ -65,3 +66,5 @@ export const emailWorker = new Worker( emailWorker.on('failed', (job, err) => { logger.error({ jobId: job?.id, jobName: job?.name, err }, 'Email job failed'); }); + +attachWorkerAudit(emailWorker, 'email'); diff --git a/src/lib/queue/workers/export.ts b/src/lib/queue/workers/export.ts index 401bb22..ca958ad 100644 --- a/src/lib/queue/workers/export.ts +++ b/src/lib/queue/workers/export.ts @@ -3,6 +3,7 @@ import { env } from '@/lib/env'; import type { ConnectionOptions } from 'bullmq'; import { logger } from '@/lib/logger'; +import { attachWorkerAudit } from '@/lib/queue/audit-helpers'; import { QUEUE_CONFIGS } from '@/lib/queue'; export const exportWorker = new Worker( @@ -35,3 +36,5 @@ export const exportWorker = new Worker( exportWorker.on('failed', (job, err) => { logger.error({ jobId: job?.id, jobName: job?.name, err }, 'Export job failed'); }); + +attachWorkerAudit(exportWorker, 'export'); diff --git a/src/lib/queue/workers/import.ts b/src/lib/queue/workers/import.ts index 908a22a..3f3b120 100644 --- a/src/lib/queue/workers/import.ts +++ b/src/lib/queue/workers/import.ts @@ -3,6 +3,7 @@ import { env } from '@/lib/env'; import type { ConnectionOptions } from 'bullmq'; import { logger } from '@/lib/logger'; +import { attachWorkerAudit } from '@/lib/queue/audit-helpers'; import { QUEUE_CONFIGS } from '@/lib/queue'; export const importWorker = new Worker( @@ -23,3 +24,5 @@ export const importWorker = new Worker( importWorker.on('failed', (job, err) => { logger.error({ jobId: job?.id, jobName: job?.name, err }, 'Import job failed'); }); + +attachWorkerAudit(importWorker, 'import'); diff --git a/src/lib/queue/workers/maintenance.ts b/src/lib/queue/workers/maintenance.ts index f52d7bb..f67ce2f 100644 --- a/src/lib/queue/workers/maintenance.ts +++ b/src/lib/queue/workers/maintenance.ts @@ -10,6 +10,7 @@ import { aiUsageLedger } from '@/lib/db/schema/ai-usage'; import { errorEvents } from '@/lib/db/schema/system'; import { websiteSubmissions } from '@/lib/db/schema/website-submissions'; import { logger } from '@/lib/logger'; +import { attachWorkerAudit } from '@/lib/queue/audit-helpers'; import { getStorageBackend } from '@/lib/storage'; import { QUEUE_CONFIGS } from '@/lib/queue'; @@ -168,3 +169,5 @@ export const maintenanceWorker = new Worker( maintenanceWorker.on('failed', (job, err) => { logger.error({ jobId: job?.id, jobName: job?.name, err }, 'Maintenance job failed'); }); + +attachWorkerAudit(maintenanceWorker, 'maintenance'); diff --git a/src/lib/queue/workers/notifications.ts b/src/lib/queue/workers/notifications.ts index 50a9993..a19dd0d 100644 --- a/src/lib/queue/workers/notifications.ts +++ b/src/lib/queue/workers/notifications.ts @@ -3,6 +3,7 @@ import { env } from '@/lib/env'; import type { ConnectionOptions } from 'bullmq'; import { logger } from '@/lib/logger'; +import { attachWorkerAudit } from '@/lib/queue/audit-helpers'; import { QUEUE_CONFIGS } from '@/lib/queue'; export const notificationsWorker = new Worker( @@ -88,3 +89,5 @@ export const notificationsWorker = new Worker( notificationsWorker.on('failed', (job, err) => { logger.error({ jobId: job?.id, jobName: job?.name, err }, 'Notifications job failed'); }); + +attachWorkerAudit(notificationsWorker, 'notifications'); diff --git a/src/lib/queue/workers/reports.ts b/src/lib/queue/workers/reports.ts index 06942e2..631743b 100644 --- a/src/lib/queue/workers/reports.ts +++ b/src/lib/queue/workers/reports.ts @@ -3,6 +3,7 @@ import { env } from '@/lib/env'; import type { ConnectionOptions } from 'bullmq'; import { logger } from '@/lib/logger'; +import { attachWorkerAudit } from '@/lib/queue/audit-helpers'; import { QUEUE_CONFIGS } from '@/lib/queue'; export const reportsWorker = new Worker( @@ -70,3 +71,5 @@ export const reportsWorker = new Worker( reportsWorker.on('failed', (job, err) => { logger.error({ jobId: job?.id, jobName: job?.name, err }, 'Reports job failed'); }); + +attachWorkerAudit(reportsWorker, 'reports'); diff --git a/src/lib/queue/workers/webhooks.ts b/src/lib/queue/workers/webhooks.ts index 5c2ae28..bfdc46b 100644 --- a/src/lib/queue/workers/webhooks.ts +++ b/src/lib/queue/workers/webhooks.ts @@ -5,6 +5,7 @@ import { lookup } from 'node:dns/promises'; import type { ConnectionOptions } from 'bullmq'; import { logger } from '@/lib/logger'; +import { attachWorkerAudit } from '@/lib/queue/audit-helpers'; import { QUEUE_CONFIGS } from '@/lib/queue'; import { isLocalOrPrivateHost } from '@/lib/validators/webhooks'; @@ -321,3 +322,5 @@ export const webhooksWorker = new Worker( webhooksWorker.on('failed', (job, err) => { logger.error({ jobId: job?.id, jobName: job?.name, err }, 'Webhooks job failed'); }); + +attachWorkerAudit(webhooksWorker, 'webhooks'); diff --git a/src/lib/services/portal-auth.service.ts b/src/lib/services/portal-auth.service.ts index f2bbdce..c7194d7 100644 --- a/src/lib/services/portal-auth.service.ts +++ b/src/lib/services/portal-auth.service.ts @@ -212,6 +212,16 @@ export async function signIn(args: { : (await verifyPassword(args.password, dummyHash), false); if (!user || !user.isActive || !user.passwordHash || !ok) { + void createAuditLog({ + userId: null, + portId: user?.portId ?? null, + action: 'login', + entityType: 'portal_session', + entityId: user?.id ?? normalizedEmail, + metadata: { ok: false, attemptedEmail: normalizedEmail, reason: 'invalid_credentials' }, + severity: 'warning', + source: 'auth', + }); throw new UnauthorizedError('Invalid email or password'); } @@ -219,6 +229,16 @@ export async function signIn(args: { // password on a disabled-port account still surfaces "invalid email or // password" - we never leak which ports have the portal turned off. if (!(await isPortalEnabledForPort(user.portId))) { + void createAuditLog({ + userId: null, + portId: user.portId, + action: 'login', + entityType: 'portal_session', + entityId: user.id, + metadata: { ok: false, attemptedEmail: normalizedEmail, reason: 'portal_disabled' }, + severity: 'warning', + source: 'auth', + }); throw new UnauthorizedError('Invalid email or password'); } @@ -230,6 +250,17 @@ export async function signIn(args: { await db.update(portalUsers).set({ lastLoginAt: new Date() }).where(eq(portalUsers.id, user.id)); + void createAuditLog({ + userId: null, + portId: user.portId, + action: 'login', + entityType: 'portal_session', + entityId: user.id, + metadata: { ok: true, email: user.email }, + severity: 'info', + source: 'auth', + }); + return { token, clientId: user.clientId, portId: user.portId, email: user.email }; } @@ -246,6 +277,16 @@ export async function requestPasswordReset(email: string): Promise { // Silently no-op so unknown emails don't leak through timing or // response shape. Caller surfaces "if the email matches an account…". logger.debug({ email: normalizedEmail }, 'Password reset for unknown email'); + void createAuditLog({ + userId: null, + portId: null, + action: 'portal_password_reset_request', + entityType: 'portal_user', + entityId: 'unknown', + metadata: { email: normalizedEmail, reason: 'unknown_or_inactive' }, + severity: 'warning', + source: 'auth', + }); return; } @@ -253,6 +294,16 @@ export async function requestPasswordReset(email: string): Promise { // disabled-state from leaking through the public reset endpoint. if (!(await isPortalEnabledForPort(user.portId))) { logger.debug({ portId: user.portId }, 'Password reset on disabled-portal port'); + void createAuditLog({ + userId: null, + portId: user.portId, + action: 'portal_password_reset_request', + entityType: 'portal_user', + entityId: user.id, + metadata: { email: normalizedEmail, reason: 'portal_disabled' }, + severity: 'warning', + source: 'auth', + }); return; } @@ -342,6 +393,16 @@ async function consumeToken( }); if (!row) { + void createAuditLog({ + userId: null, + portId: null, + action: type === 'reset' ? 'portal_password_reset' : 'portal_activate', + entityType: 'portal_auth_token', + entityId: 'invalid', + metadata: { type, reason: 'invalid_or_expired_token' }, + severity: 'warning', + source: 'auth', + }); throw new ValidationError('Invalid or expired token'); } diff --git a/src/lib/services/webhooks.service.ts b/src/lib/services/webhooks.service.ts index 16680a0..57836f3 100644 --- a/src/lib/services/webhooks.service.ts +++ b/src/lib/services/webhooks.service.ts @@ -218,6 +218,7 @@ export async function regenerateSecret(portId: string, webhookId: string, meta: metadata: { type: 'secret_regenerated' }, ipAddress: meta.ipAddress, userAgent: meta.userAgent, + severity: 'warning', }); // Return new plaintext secret - shown ONCE