Files
pn-new-crm/src/lib/queue/workers/webhooks.ts
Matt Ciaccio 9890d065f8 feat(audit): wider coverage — sensitive views, cron, jobs, portal abuse
Builds on the audit infra split (severity/source) by emitting events
from every place a security or operations review would want to see:

Sensitive data views (severity=warning):
- GDPR export download URL issued
- Audit log page opened (watch-the-watchers; first page only)
- CSV export of expenses
- Webhook secret regenerated

Authentication abuse (severity=warning, source=auth):
- Portal sign-in: success + failed-credentials + portal-disabled
- Portal password reset: unknown email + portal-disabled + bad token
- Portal activation: bad/expired token

Inbound webhook hardening:
- Documenso webhook with invalid X-Documenso-Secret now writes
  webhook_failed instead of being silently logged

Background work (source=cron / job):
- New attachWorkerAudit() helper wires every BullMQ worker to emit
  job_failed (severity=error) on .on('failed') and cron_run on
  .on('completed') for any job whose name matches the recurring
  scheduler list. Applied across all 10 workers.

1175/1175 vitest passing.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-06 20:44:38 +02:00

327 lines
11 KiB
TypeScript

import { Worker, type Job } from 'bullmq';
import { env } from '@/lib/env';
import { createHmac } from 'node:crypto';
import { lookup } from 'node:dns/promises';
import type { ConnectionOptions } from 'bullmq';
import { logger } from '@/lib/logger';
import { attachWorkerAudit } from '@/lib/queue/audit-helpers';
import { QUEUE_CONFIGS } from '@/lib/queue';
import { isLocalOrPrivateHost } from '@/lib/validators/webhooks';
/**
* Resolve the webhook hostname and reject if any returned address is in a
* disallowed range. Defends against DNS rebinding where the validator-time
* resolution returned a public address but dispatch-time resolution
* returns a private one.
*/
async function resolveAndCheckHost(
rawUrl: string,
): Promise<{ ok: true } | { ok: false; reason: string }> {
if (isLocalOrPrivateHost(rawUrl)) {
return { ok: false, reason: 'webhook URL host blocked by static check' };
}
let host: string;
try {
host = new URL(rawUrl).hostname;
} catch {
return { ok: false, reason: 'invalid URL' };
}
try {
const addresses = await lookup(host, { all: true });
for (const a of addresses) {
// Reuse the validator's literal-address checks on each resolved IP.
if (isLocalOrPrivateHost(`https://${a.family === 6 ? `[${a.address}]` : a.address}`)) {
return { ok: false, reason: `resolved address ${a.address} is in a blocked range` };
}
}
} catch (err) {
return {
ok: false,
reason: `DNS resolution failed: ${err instanceof Error ? err.message : 'unknown'}`,
};
}
return { ok: true };
}
// ─── Job Payload ─────────────────────────────────────────────────────────────
interface WebhookDeliverPayload {
webhookId: string;
portId: string;
event: string;
deliveryId: string;
payload: Record<string, unknown>;
}
// ─── Worker ──────────────────────────────────────────────────────────────────
export const webhooksWorker = new Worker(
'webhooks',
async (job: Job) => {
logger.info({ jobId: job.id, jobName: job.name }, 'Processing webhooks job');
if (job.name !== 'deliver') {
logger.warn({ jobName: job.name }, 'Unknown webhooks job');
return;
}
const { webhookId, portId, event, deliveryId, payload } = job.data as WebhookDeliverPayload;
const { db } = await import('@/lib/db');
const { webhooks, webhookDeliveries } = await import('@/lib/db/schema/system');
const { userProfiles } = await import('@/lib/db/schema/users');
const { decrypt } = await import('@/lib/utils/encryption');
const { createNotification } = await import('@/lib/services/notifications.service');
const { eq, and } = await import('drizzle-orm');
// 1. Fetch webhook - skip if deleted
const webhook = await db.query.webhooks.findFirst({
where: eq(webhooks.id, webhookId),
});
if (!webhook) {
logger.info({ webhookId }, 'Webhook deleted - skipping delivery');
await db.delete(webhookDeliveries).where(eq(webhookDeliveries.id, deliveryId));
return;
}
// Safety net: when EMAIL_REDIRECT_TO is set (dev / staging / migration
// dry-run), short-circuit webhook delivery so we don't accidentally
// ping a user-configured production endpoint with synthetic events.
// Records the delivery as `dead_letter` with a clear reason so the
// attempt is still visible in the deliveries listing.
if (process.env.EMAIL_REDIRECT_TO) {
logger.info(
{ webhookId, deliveryId, url: webhook.url },
'Webhook delivery skipped (EMAIL_REDIRECT_TO is set - outbound comms are paused)',
);
await db
.update(webhookDeliveries)
.set({
status: 'dead_letter',
responseStatus: null,
responseBody: 'Skipped: EMAIL_REDIRECT_TO is set, outbound comms paused.',
deliveredAt: new Date(),
})
.where(eq(webhookDeliveries.id, deliveryId));
return;
}
// 2. Decrypt secret
let secret: string;
try {
secret = webhook.secret ? decrypt(webhook.secret) : '';
} catch (err) {
logger.error({ webhookId, err }, 'Failed to decrypt webhook secret');
throw err; // Let BullMQ retry
}
// 3. Build final payload
const finalPayload = {
id: deliveryId,
event,
timestamp: new Date().toISOString(),
port_id: portId,
data: payload,
};
const bodyString = JSON.stringify(finalPayload);
// 4. Sign with HMAC-SHA256
const signature = secret
? `sha256=${createHmac('sha256', secret).update(bodyString).digest('hex')}`
: '';
const attempt = (job.attemptsMade ?? 0) + 1;
// 5. POST to webhook URL with 10s timeout
let responseStatus: number | null = null;
let responseBody: string | null = null;
let success = false;
// SSRF gate: re-resolve the hostname at dispatch time and reject if it
// points anywhere internal. The validator already filtered literal
// hostnames at create/update time, but DNS rebinding could swap the
// answer between then and now.
const hostCheck = await resolveAndCheckHost(webhook.url);
if (!hostCheck.ok) {
logger.warn(
{ webhookId, deliveryId, url: webhook.url, reason: hostCheck.reason },
'Webhook dispatch blocked by SSRF guard',
);
// Persist the failure so the deliveries listing reflects it.
const { db: dbInner } = await import('@/lib/db');
const { webhookDeliveries } = await import('@/lib/db/schema/system');
const { eq } = await import('drizzle-orm');
await dbInner
.update(webhookDeliveries)
.set({
status: 'dead_letter',
responseStatus: null,
responseBody: `Blocked: ${hostCheck.reason}`,
deliveredAt: new Date(),
})
.where(eq(webhookDeliveries.id, deliveryId));
return;
}
try {
const controller = new AbortController();
const timeoutId = setTimeout(() => controller.abort(), 10_000);
const response = await fetch(webhook.url, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'User-Agent': 'PortNimara-Webhook/1.0',
'X-Webhook-Id': webhookId,
'X-Webhook-Event': event,
'X-Webhook-Signature': signature,
'X-Webhook-Delivery': deliveryId,
},
body: bodyString,
signal: controller.signal,
});
clearTimeout(timeoutId);
responseStatus = response.status;
// Read up to 1KB of response body
const rawBody = await response.text();
responseBody = rawBody.slice(0, 1024);
success = response.status >= 200 && response.status < 300;
} catch (err) {
// Network error or timeout
logger.warn({ webhookId, deliveryId, err }, 'Webhook delivery network error');
responseStatus = null;
responseBody = err instanceof Error ? err.message.slice(0, 1024) : String(err).slice(0, 1024);
success = false;
}
const maxAttempts = QUEUE_CONFIGS.webhooks.maxAttempts;
const isFinalAttempt = attempt >= maxAttempts;
const { createAuditLog } = await import('@/lib/audit');
if (success) {
// 6a. Record success
await db
.update(webhookDeliveries)
.set({
status: 'success',
responseStatus,
responseBody,
attempt,
deliveredAt: new Date(),
})
.where(eq(webhookDeliveries.id, deliveryId));
logger.info({ webhookId, deliveryId, event }, 'Webhook delivered successfully');
void createAuditLog({
userId: null,
portId,
action: 'webhook_delivered',
entityType: 'webhook_delivery',
entityId: deliveryId,
metadata: { webhookId, event, responseStatus, attempt },
source: 'webhook',
severity: 'info',
});
} else if (!success && isFinalAttempt) {
// 6b. Final failure → dead_letter + system alert
await db
.update(webhookDeliveries)
.set({
status: 'dead_letter',
responseStatus,
responseBody,
attempt,
})
.where(eq(webhookDeliveries.id, deliveryId));
logger.error(
{ webhookId, deliveryId, event, attempt },
'Webhook delivery permanently failed - dead_letter',
);
void createAuditLog({
userId: null,
portId,
action: 'webhook_dead_letter',
entityType: 'webhook_delivery',
entityId: deliveryId,
metadata: { webhookId, event, responseStatus, attempt, responseBody },
source: 'webhook',
severity: 'error',
});
// Notify all super admins
try {
const superAdmins = await db
.select({ userId: userProfiles.userId })
.from(userProfiles)
.where(and(eq(userProfiles.isSuperAdmin, true), eq(userProfiles.isActive, true)));
for (const admin of superAdmins) {
void createNotification({
portId,
userId: admin.userId,
type: 'system_alert',
title: 'Webhook delivery failed permanently',
description: `Webhook "${webhook.name}" failed to deliver event "${event}" after ${maxAttempts} attempts.`,
link: `/admin/webhooks/${webhookId}`,
entityType: 'webhook',
entityId: webhookId,
dedupeKey: `webhook:dead_letter:${deliveryId}`,
cooldownMs: 0,
});
}
} catch (notifyErr) {
logger.error({ notifyErr }, 'Failed to send dead_letter notification');
}
// Throw to let BullMQ mark job as failed (it won't retry since it's the final attempt)
throw new Error(
`Webhook delivery failed after ${attempt} attempts. Status: ${responseStatus ?? 'network error'}`,
);
} else {
// 6c. Non-final failure → update record then throw to trigger retry
await db
.update(webhookDeliveries)
.set({
status: 'failed',
responseStatus,
responseBody,
attempt,
})
.where(eq(webhookDeliveries.id, deliveryId));
void createAuditLog({
userId: null,
portId,
action: 'webhook_failed',
entityType: 'webhook_delivery',
entityId: deliveryId,
metadata: { webhookId, event, responseStatus, attempt },
source: 'webhook',
severity: 'warning',
});
throw new Error(
`Webhook delivery attempt ${attempt} failed. Status: ${responseStatus ?? 'network error'}. Retrying...`,
);
}
},
{
connection: { url: env.REDIS_URL } as ConnectionOptions,
concurrency: QUEUE_CONFIGS.webhooks.concurrency,
},
);
webhooksWorker.on('failed', (job, err) => {
logger.error({ jobId: job?.id, jobName: job?.name, err }, 'Webhooks job failed');
});
attachWorkerAudit(webhooksWorker, 'webhooks');