Files
pn-new-crm/src/lib/queue/workers/webhooks.ts

289 lines
9.9 KiB
TypeScript
Raw Normal View History

import { Worker, type Job } from 'bullmq';
import { env } from '@/lib/env';
import { createHmac } from 'node:crypto';
sec: webhook SSRF guard, IMAP-sync owner check, watcher port membership Three findings from a fourth-pass review: 1. MEDIUM — webhook URL SSRF. The validator only enforced HTTPS+URL parse; it accepted private/loopback/link-local/.internal hosts. The delivery worker fetched arbitrary URLs and persisted up to 1KB of response body into webhook_deliveries.response_body, which is then surfaced via the deliveries listing endpoint — a port admin could register a webhook to an internal HTTPS endpoint, hit the test endpoint to force immediate dispatch, and read the response back. Validator now rejects RFC-1918/loopback/link-local/CGNAT/ULA IPs (v4 + v6) and .internal/.local/.localhost/.lan/.intranet/.corp suffixes; the worker re-resolves the hostname at dispatch time and blocks before fetch (DNS rebinding defense). 21-case unit test covers the matrix. 2. MEDIUM — POST /api/v1/email/accounts/[id]/sync had no owner check. Any user with email:view could enqueue an inbox-sync job for any accountId, which the worker would honour using the foreign user's decrypted IMAP credentials and advance the account's lastSyncAt (data-loss risk on the legitimate owner's next sync). Route now asserts account.userId === ctx.userId before enqueueing, matching the toggle/disconnect endpoints. 3. MEDIUM — addDocumentWatcher (and the wizard / upload watcher inserts) didn't validate the watcher's userId belonged to the document's port. notifyDocumentEvent then emitted a real-time socket toast + email containing the document title to the foreign user. New assertWatchersInPort helper verifies each candidate has a userPortRoles row for the port (super-admin bypass). 818 vitest tests pass. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-29 03:15:39 +02:00
import { lookup } from 'node:dns/promises';
import type { ConnectionOptions } from 'bullmq';
import { logger } from '@/lib/logger';
import { QUEUE_CONFIGS } from '@/lib/queue';
sec: webhook SSRF guard, IMAP-sync owner check, watcher port membership Three findings from a fourth-pass review: 1. MEDIUM — webhook URL SSRF. The validator only enforced HTTPS+URL parse; it accepted private/loopback/link-local/.internal hosts. The delivery worker fetched arbitrary URLs and persisted up to 1KB of response body into webhook_deliveries.response_body, which is then surfaced via the deliveries listing endpoint — a port admin could register a webhook to an internal HTTPS endpoint, hit the test endpoint to force immediate dispatch, and read the response back. Validator now rejects RFC-1918/loopback/link-local/CGNAT/ULA IPs (v4 + v6) and .internal/.local/.localhost/.lan/.intranet/.corp suffixes; the worker re-resolves the hostname at dispatch time and blocks before fetch (DNS rebinding defense). 21-case unit test covers the matrix. 2. MEDIUM — POST /api/v1/email/accounts/[id]/sync had no owner check. Any user with email:view could enqueue an inbox-sync job for any accountId, which the worker would honour using the foreign user's decrypted IMAP credentials and advance the account's lastSyncAt (data-loss risk on the legitimate owner's next sync). Route now asserts account.userId === ctx.userId before enqueueing, matching the toggle/disconnect endpoints. 3. MEDIUM — addDocumentWatcher (and the wizard / upload watcher inserts) didn't validate the watcher's userId belonged to the document's port. notifyDocumentEvent then emitted a real-time socket toast + email containing the document title to the foreign user. New assertWatchersInPort helper verifies each candidate has a userPortRoles row for the port (super-admin bypass). 818 vitest tests pass. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-29 03:15:39 +02:00
import { isLocalOrPrivateHost } from '@/lib/validators/webhooks';
/**
* Resolve the webhook hostname and reject if any returned address is in a
* disallowed range. Defends against DNS rebinding where the validator-time
* resolution returned a public address but dispatch-time resolution
* returns a private one.
*/
async function resolveAndCheckHost(
rawUrl: string,
): Promise<{ ok: true } | { ok: false; reason: string }> {
if (isLocalOrPrivateHost(rawUrl)) {
return { ok: false, reason: 'webhook URL host blocked by static check' };
}
let host: string;
try {
host = new URL(rawUrl).hostname;
} catch {
return { ok: false, reason: 'invalid URL' };
}
try {
const addresses = await lookup(host, { all: true });
for (const a of addresses) {
// Reuse the validator's literal-address checks on each resolved IP.
if (isLocalOrPrivateHost(`https://${a.family === 6 ? `[${a.address}]` : a.address}`)) {
return { ok: false, reason: `resolved address ${a.address} is in a blocked range` };
}
}
} catch (err) {
return {
ok: false,
reason: `DNS resolution failed: ${err instanceof Error ? err.message : 'unknown'}`,
};
}
return { ok: true };
}
// ─── Job Payload ─────────────────────────────────────────────────────────────
interface WebhookDeliverPayload {
webhookId: string;
portId: string;
event: string;
deliveryId: string;
payload: Record<string, unknown>;
}
// ─── Worker ──────────────────────────────────────────────────────────────────
export const webhooksWorker = new Worker(
'webhooks',
async (job: Job) => {
logger.info({ jobId: job.id, jobName: job.name }, 'Processing webhooks job');
if (job.name !== 'deliver') {
logger.warn({ jobName: job.name }, 'Unknown webhooks job');
return;
}
sec: webhook SSRF guard, IMAP-sync owner check, watcher port membership Three findings from a fourth-pass review: 1. MEDIUM — webhook URL SSRF. The validator only enforced HTTPS+URL parse; it accepted private/loopback/link-local/.internal hosts. The delivery worker fetched arbitrary URLs and persisted up to 1KB of response body into webhook_deliveries.response_body, which is then surfaced via the deliveries listing endpoint — a port admin could register a webhook to an internal HTTPS endpoint, hit the test endpoint to force immediate dispatch, and read the response back. Validator now rejects RFC-1918/loopback/link-local/CGNAT/ULA IPs (v4 + v6) and .internal/.local/.localhost/.lan/.intranet/.corp suffixes; the worker re-resolves the hostname at dispatch time and blocks before fetch (DNS rebinding defense). 21-case unit test covers the matrix. 2. MEDIUM — POST /api/v1/email/accounts/[id]/sync had no owner check. Any user with email:view could enqueue an inbox-sync job for any accountId, which the worker would honour using the foreign user's decrypted IMAP credentials and advance the account's lastSyncAt (data-loss risk on the legitimate owner's next sync). Route now asserts account.userId === ctx.userId before enqueueing, matching the toggle/disconnect endpoints. 3. MEDIUM — addDocumentWatcher (and the wizard / upload watcher inserts) didn't validate the watcher's userId belonged to the document's port. notifyDocumentEvent then emitted a real-time socket toast + email containing the document title to the foreign user. New assertWatchersInPort helper verifies each candidate has a userPortRoles row for the port (super-admin bypass). 818 vitest tests pass. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-29 03:15:39 +02:00
const { webhookId, portId, event, deliveryId, payload } = job.data as WebhookDeliverPayload;
const { db } = await import('@/lib/db');
const { webhooks, webhookDeliveries } = await import('@/lib/db/schema/system');
const { userProfiles } = await import('@/lib/db/schema/users');
const { decrypt } = await import('@/lib/utils/encryption');
const { createNotification } = await import('@/lib/services/notifications.service');
const { eq, and } = await import('drizzle-orm');
// 1. Fetch webhook - skip if deleted
const webhook = await db.query.webhooks.findFirst({
where: eq(webhooks.id, webhookId),
});
if (!webhook) {
logger.info({ webhookId }, 'Webhook deleted - skipping delivery');
sec: webhook SSRF guard, IMAP-sync owner check, watcher port membership Three findings from a fourth-pass review: 1. MEDIUM — webhook URL SSRF. The validator only enforced HTTPS+URL parse; it accepted private/loopback/link-local/.internal hosts. The delivery worker fetched arbitrary URLs and persisted up to 1KB of response body into webhook_deliveries.response_body, which is then surfaced via the deliveries listing endpoint — a port admin could register a webhook to an internal HTTPS endpoint, hit the test endpoint to force immediate dispatch, and read the response back. Validator now rejects RFC-1918/loopback/link-local/CGNAT/ULA IPs (v4 + v6) and .internal/.local/.localhost/.lan/.intranet/.corp suffixes; the worker re-resolves the hostname at dispatch time and blocks before fetch (DNS rebinding defense). 21-case unit test covers the matrix. 2. MEDIUM — POST /api/v1/email/accounts/[id]/sync had no owner check. Any user with email:view could enqueue an inbox-sync job for any accountId, which the worker would honour using the foreign user's decrypted IMAP credentials and advance the account's lastSyncAt (data-loss risk on the legitimate owner's next sync). Route now asserts account.userId === ctx.userId before enqueueing, matching the toggle/disconnect endpoints. 3. MEDIUM — addDocumentWatcher (and the wizard / upload watcher inserts) didn't validate the watcher's userId belonged to the document's port. notifyDocumentEvent then emitted a real-time socket toast + email containing the document title to the foreign user. New assertWatchersInPort helper verifies each candidate has a userPortRoles row for the port (super-admin bypass). 818 vitest tests pass. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-29 03:15:39 +02:00
await db.delete(webhookDeliveries).where(eq(webhookDeliveries.id, deliveryId));
return;
}
feat(safety): EMAIL_REDIRECT_TO now also pauses Documenso + webhooks Closes a gap exposed by the comms safety audit: the existing EMAIL_REDIRECT_TO env var only redirected outbound SMTP via the sendEmail() bottleneck. Two channels still leaked when set: 1. Documenso e-signature recipients — Documenso's own server emails them on our behalf, so SMTP redirect doesn't help. We were sending real client emails to the Documenso REST API, which would then deliver to the real client. 2. Outbound webhooks — fire from the BullMQ worker to user-configured URLs. SSRF guard blocks internal hosts but doesn't pause production endpoints. Documenso (src/lib/services/documenso-client.ts): - createDocument: rewrite every recipient.email to EMAIL_REDIRECT_TO and prefix the recipient.name with the original email so the doc is traceable. - generateDocumentFromTemplate: same treatment for both v1.13 formValues.*Email keys and v2.x recipients[]. The redirect happens BEFORE the API call, so even Documenso's own retry logic can't reach the original recipient. - Both paths log when they redirect so it's visible in dev. Webhooks (src/lib/queue/workers/webhooks.ts): - When EMAIL_REDIRECT_TO is set, short-circuit the dispatch and write a `dead_letter` row with reason "Skipped: EMAIL_REDIRECT_TO is set, outbound comms paused." so the attempt is still visible in the deliveries listing. Doc: docs/operations/outbound-comms-safety.md catalogs every outbound comms channel (email, Documenso, webhooks, WhatsApp/phone deep-links, SMS-not-implemented) and explains how each one respects the env flag. Includes a verification checklist to run before any production data import + cutover steps for going live. Single env var EMAIL_REDIRECT_TO now reliably pauses ALL automated outbound comms. Unset for production. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-03 17:24:41 +02:00
// Safety net: when EMAIL_REDIRECT_TO is set (dev / staging / migration
// dry-run), short-circuit webhook delivery so we don't accidentally
// ping a user-configured production endpoint with synthetic events.
// Records the delivery as `dead_letter` with a clear reason so the
// attempt is still visible in the deliveries listing.
if (process.env.EMAIL_REDIRECT_TO) {
logger.info(
{ webhookId, deliveryId, url: webhook.url },
'Webhook delivery skipped (EMAIL_REDIRECT_TO is set - outbound comms are paused)',
feat(safety): EMAIL_REDIRECT_TO now also pauses Documenso + webhooks Closes a gap exposed by the comms safety audit: the existing EMAIL_REDIRECT_TO env var only redirected outbound SMTP via the sendEmail() bottleneck. Two channels still leaked when set: 1. Documenso e-signature recipients — Documenso's own server emails them on our behalf, so SMTP redirect doesn't help. We were sending real client emails to the Documenso REST API, which would then deliver to the real client. 2. Outbound webhooks — fire from the BullMQ worker to user-configured URLs. SSRF guard blocks internal hosts but doesn't pause production endpoints. Documenso (src/lib/services/documenso-client.ts): - createDocument: rewrite every recipient.email to EMAIL_REDIRECT_TO and prefix the recipient.name with the original email so the doc is traceable. - generateDocumentFromTemplate: same treatment for both v1.13 formValues.*Email keys and v2.x recipients[]. The redirect happens BEFORE the API call, so even Documenso's own retry logic can't reach the original recipient. - Both paths log when they redirect so it's visible in dev. Webhooks (src/lib/queue/workers/webhooks.ts): - When EMAIL_REDIRECT_TO is set, short-circuit the dispatch and write a `dead_letter` row with reason "Skipped: EMAIL_REDIRECT_TO is set, outbound comms paused." so the attempt is still visible in the deliveries listing. Doc: docs/operations/outbound-comms-safety.md catalogs every outbound comms channel (email, Documenso, webhooks, WhatsApp/phone deep-links, SMS-not-implemented) and explains how each one respects the env flag. Includes a verification checklist to run before any production data import + cutover steps for going live. Single env var EMAIL_REDIRECT_TO now reliably pauses ALL automated outbound comms. Unset for production. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-03 17:24:41 +02:00
);
await db
.update(webhookDeliveries)
.set({
status: 'dead_letter',
responseStatus: null,
responseBody: 'Skipped: EMAIL_REDIRECT_TO is set, outbound comms paused.',
deliveredAt: new Date(),
})
.where(eq(webhookDeliveries.id, deliveryId));
return;
}
// 2. Decrypt secret
let secret: string;
try {
secret = webhook.secret ? decrypt(webhook.secret) : '';
} catch (err) {
logger.error({ webhookId, err }, 'Failed to decrypt webhook secret');
throw err; // Let BullMQ retry
}
// 3. Build final payload
const finalPayload = {
id: deliveryId,
event,
timestamp: new Date().toISOString(),
port_id: portId,
data: payload,
};
const bodyString = JSON.stringify(finalPayload);
// 4. Sign with HMAC-SHA256
const signature = secret
? `sha256=${createHmac('sha256', secret).update(bodyString).digest('hex')}`
: '';
const attempt = (job.attemptsMade ?? 0) + 1;
// 5. POST to webhook URL with 10s timeout
let responseStatus: number | null = null;
let responseBody: string | null = null;
let success = false;
sec: webhook SSRF guard, IMAP-sync owner check, watcher port membership Three findings from a fourth-pass review: 1. MEDIUM — webhook URL SSRF. The validator only enforced HTTPS+URL parse; it accepted private/loopback/link-local/.internal hosts. The delivery worker fetched arbitrary URLs and persisted up to 1KB of response body into webhook_deliveries.response_body, which is then surfaced via the deliveries listing endpoint — a port admin could register a webhook to an internal HTTPS endpoint, hit the test endpoint to force immediate dispatch, and read the response back. Validator now rejects RFC-1918/loopback/link-local/CGNAT/ULA IPs (v4 + v6) and .internal/.local/.localhost/.lan/.intranet/.corp suffixes; the worker re-resolves the hostname at dispatch time and blocks before fetch (DNS rebinding defense). 21-case unit test covers the matrix. 2. MEDIUM — POST /api/v1/email/accounts/[id]/sync had no owner check. Any user with email:view could enqueue an inbox-sync job for any accountId, which the worker would honour using the foreign user's decrypted IMAP credentials and advance the account's lastSyncAt (data-loss risk on the legitimate owner's next sync). Route now asserts account.userId === ctx.userId before enqueueing, matching the toggle/disconnect endpoints. 3. MEDIUM — addDocumentWatcher (and the wizard / upload watcher inserts) didn't validate the watcher's userId belonged to the document's port. notifyDocumentEvent then emitted a real-time socket toast + email containing the document title to the foreign user. New assertWatchersInPort helper verifies each candidate has a userPortRoles row for the port (super-admin bypass). 818 vitest tests pass. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-29 03:15:39 +02:00
// SSRF gate: re-resolve the hostname at dispatch time and reject if it
// points anywhere internal. The validator already filtered literal
// hostnames at create/update time, but DNS rebinding could swap the
// answer between then and now.
const hostCheck = await resolveAndCheckHost(webhook.url);
if (!hostCheck.ok) {
logger.warn(
{ webhookId, deliveryId, url: webhook.url, reason: hostCheck.reason },
'Webhook dispatch blocked by SSRF guard',
);
// Persist the failure so the deliveries listing reflects it.
const { db: dbInner } = await import('@/lib/db');
const { webhookDeliveries } = await import('@/lib/db/schema/system');
const { eq } = await import('drizzle-orm');
await dbInner
.update(webhookDeliveries)
.set({
status: 'dead_letter',
responseStatus: null,
responseBody: `Blocked: ${hostCheck.reason}`,
deliveredAt: new Date(),
})
.where(eq(webhookDeliveries.id, deliveryId));
return;
}
try {
const controller = new AbortController();
const timeoutId = setTimeout(() => controller.abort(), 10_000);
const response = await fetch(webhook.url, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'User-Agent': 'PortNimara-Webhook/1.0',
'X-Webhook-Id': webhookId,
'X-Webhook-Event': event,
'X-Webhook-Signature': signature,
'X-Webhook-Delivery': deliveryId,
},
body: bodyString,
signal: controller.signal,
});
clearTimeout(timeoutId);
responseStatus = response.status;
// Read up to 1KB of response body
const rawBody = await response.text();
responseBody = rawBody.slice(0, 1024);
success = response.status >= 200 && response.status < 300;
} catch (err) {
// Network error or timeout
logger.warn({ webhookId, deliveryId, err }, 'Webhook delivery network error');
responseStatus = null;
responseBody = err instanceof Error ? err.message.slice(0, 1024) : String(err).slice(0, 1024);
success = false;
}
const maxAttempts = QUEUE_CONFIGS.webhooks.maxAttempts;
const isFinalAttempt = attempt >= maxAttempts;
if (success) {
// 6a. Record success
await db
.update(webhookDeliveries)
.set({
status: 'success',
responseStatus,
responseBody,
attempt,
deliveredAt: new Date(),
})
.where(eq(webhookDeliveries.id, deliveryId));
logger.info({ webhookId, deliveryId, event }, 'Webhook delivered successfully');
} else if (!success && isFinalAttempt) {
// 6b. Final failure → dead_letter + system alert
await db
.update(webhookDeliveries)
.set({
status: 'dead_letter',
responseStatus,
responseBody,
attempt,
})
.where(eq(webhookDeliveries.id, deliveryId));
logger.error(
{ webhookId, deliveryId, event, attempt },
'Webhook delivery permanently failed - dead_letter',
);
// Notify all super admins
try {
const superAdmins = await db
.select({ userId: userProfiles.userId })
.from(userProfiles)
.where(and(eq(userProfiles.isSuperAdmin, true), eq(userProfiles.isActive, true)));
for (const admin of superAdmins) {
void createNotification({
portId,
userId: admin.userId,
type: 'system_alert',
title: 'Webhook delivery failed permanently',
description: `Webhook "${webhook.name}" failed to deliver event "${event}" after ${maxAttempts} attempts.`,
link: `/admin/webhooks/${webhookId}`,
entityType: 'webhook',
entityId: webhookId,
dedupeKey: `webhook:dead_letter:${deliveryId}`,
cooldownMs: 0,
});
}
} catch (notifyErr) {
logger.error({ notifyErr }, 'Failed to send dead_letter notification');
}
// Throw to let BullMQ mark job as failed (it won't retry since it's the final attempt)
throw new Error(
`Webhook delivery failed after ${attempt} attempts. Status: ${responseStatus ?? 'network error'}`,
);
} else {
// 6c. Non-final failure → update record then throw to trigger retry
await db
.update(webhookDeliveries)
.set({
status: 'failed',
responseStatus,
responseBody,
attempt,
})
.where(eq(webhookDeliveries.id, deliveryId));
throw new Error(
`Webhook delivery attempt ${attempt} failed. Status: ${responseStatus ?? 'network error'}. Retrying...`,
);
}
},
{
connection: { url: env.REDIS_URL } as ConnectionOptions,
concurrency: QUEUE_CONFIGS.webhooks.concurrency,
},
);
webhooksWorker.on('failed', (job, err) => {
logger.error({ jobId: job?.id, jobName: job?.name, err }, 'Webhooks job failed');
});