Files
pn-new-crm/src/lib/queue/workers/webhooks.ts

266 lines
9.0 KiB
TypeScript
Raw Normal View History

import { Worker, type Job } from 'bullmq';
import { createHmac } from 'node:crypto';
sec: webhook SSRF guard, IMAP-sync owner check, watcher port membership Three findings from a fourth-pass review: 1. MEDIUM — webhook URL SSRF. The validator only enforced HTTPS+URL parse; it accepted private/loopback/link-local/.internal hosts. The delivery worker fetched arbitrary URLs and persisted up to 1KB of response body into webhook_deliveries.response_body, which is then surfaced via the deliveries listing endpoint — a port admin could register a webhook to an internal HTTPS endpoint, hit the test endpoint to force immediate dispatch, and read the response back. Validator now rejects RFC-1918/loopback/link-local/CGNAT/ULA IPs (v4 + v6) and .internal/.local/.localhost/.lan/.intranet/.corp suffixes; the worker re-resolves the hostname at dispatch time and blocks before fetch (DNS rebinding defense). 21-case unit test covers the matrix. 2. MEDIUM — POST /api/v1/email/accounts/[id]/sync had no owner check. Any user with email:view could enqueue an inbox-sync job for any accountId, which the worker would honour using the foreign user's decrypted IMAP credentials and advance the account's lastSyncAt (data-loss risk on the legitimate owner's next sync). Route now asserts account.userId === ctx.userId before enqueueing, matching the toggle/disconnect endpoints. 3. MEDIUM — addDocumentWatcher (and the wizard / upload watcher inserts) didn't validate the watcher's userId belonged to the document's port. notifyDocumentEvent then emitted a real-time socket toast + email containing the document title to the foreign user. New assertWatchersInPort helper verifies each candidate has a userPortRoles row for the port (super-admin bypass). 818 vitest tests pass. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-29 03:15:39 +02:00
import { lookup } from 'node:dns/promises';
import type { ConnectionOptions } from 'bullmq';
import { logger } from '@/lib/logger';
import { QUEUE_CONFIGS } from '@/lib/queue';
sec: webhook SSRF guard, IMAP-sync owner check, watcher port membership Three findings from a fourth-pass review: 1. MEDIUM — webhook URL SSRF. The validator only enforced HTTPS+URL parse; it accepted private/loopback/link-local/.internal hosts. The delivery worker fetched arbitrary URLs and persisted up to 1KB of response body into webhook_deliveries.response_body, which is then surfaced via the deliveries listing endpoint — a port admin could register a webhook to an internal HTTPS endpoint, hit the test endpoint to force immediate dispatch, and read the response back. Validator now rejects RFC-1918/loopback/link-local/CGNAT/ULA IPs (v4 + v6) and .internal/.local/.localhost/.lan/.intranet/.corp suffixes; the worker re-resolves the hostname at dispatch time and blocks before fetch (DNS rebinding defense). 21-case unit test covers the matrix. 2. MEDIUM — POST /api/v1/email/accounts/[id]/sync had no owner check. Any user with email:view could enqueue an inbox-sync job for any accountId, which the worker would honour using the foreign user's decrypted IMAP credentials and advance the account's lastSyncAt (data-loss risk on the legitimate owner's next sync). Route now asserts account.userId === ctx.userId before enqueueing, matching the toggle/disconnect endpoints. 3. MEDIUM — addDocumentWatcher (and the wizard / upload watcher inserts) didn't validate the watcher's userId belonged to the document's port. notifyDocumentEvent then emitted a real-time socket toast + email containing the document title to the foreign user. New assertWatchersInPort helper verifies each candidate has a userPortRoles row for the port (super-admin bypass). 818 vitest tests pass. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-29 03:15:39 +02:00
import { isLocalOrPrivateHost } from '@/lib/validators/webhooks';
/**
* Resolve the webhook hostname and reject if any returned address is in a
* disallowed range. Defends against DNS rebinding where the validator-time
* resolution returned a public address but dispatch-time resolution
* returns a private one.
*/
async function resolveAndCheckHost(
rawUrl: string,
): Promise<{ ok: true } | { ok: false; reason: string }> {
if (isLocalOrPrivateHost(rawUrl)) {
return { ok: false, reason: 'webhook URL host blocked by static check' };
}
let host: string;
try {
host = new URL(rawUrl).hostname;
} catch {
return { ok: false, reason: 'invalid URL' };
}
try {
const addresses = await lookup(host, { all: true });
for (const a of addresses) {
// Reuse the validator's literal-address checks on each resolved IP.
if (isLocalOrPrivateHost(`https://${a.family === 6 ? `[${a.address}]` : a.address}`)) {
return { ok: false, reason: `resolved address ${a.address} is in a blocked range` };
}
}
} catch (err) {
return {
ok: false,
reason: `DNS resolution failed: ${err instanceof Error ? err.message : 'unknown'}`,
};
}
return { ok: true };
}
// ─── Job Payload ─────────────────────────────────────────────────────────────
interface WebhookDeliverPayload {
webhookId: string;
portId: string;
event: string;
deliveryId: string;
payload: Record<string, unknown>;
}
// ─── Worker ──────────────────────────────────────────────────────────────────
export const webhooksWorker = new Worker(
'webhooks',
async (job: Job) => {
logger.info({ jobId: job.id, jobName: job.name }, 'Processing webhooks job');
if (job.name !== 'deliver') {
logger.warn({ jobName: job.name }, 'Unknown webhooks job');
return;
}
sec: webhook SSRF guard, IMAP-sync owner check, watcher port membership Three findings from a fourth-pass review: 1. MEDIUM — webhook URL SSRF. The validator only enforced HTTPS+URL parse; it accepted private/loopback/link-local/.internal hosts. The delivery worker fetched arbitrary URLs and persisted up to 1KB of response body into webhook_deliveries.response_body, which is then surfaced via the deliveries listing endpoint — a port admin could register a webhook to an internal HTTPS endpoint, hit the test endpoint to force immediate dispatch, and read the response back. Validator now rejects RFC-1918/loopback/link-local/CGNAT/ULA IPs (v4 + v6) and .internal/.local/.localhost/.lan/.intranet/.corp suffixes; the worker re-resolves the hostname at dispatch time and blocks before fetch (DNS rebinding defense). 21-case unit test covers the matrix. 2. MEDIUM — POST /api/v1/email/accounts/[id]/sync had no owner check. Any user with email:view could enqueue an inbox-sync job for any accountId, which the worker would honour using the foreign user's decrypted IMAP credentials and advance the account's lastSyncAt (data-loss risk on the legitimate owner's next sync). Route now asserts account.userId === ctx.userId before enqueueing, matching the toggle/disconnect endpoints. 3. MEDIUM — addDocumentWatcher (and the wizard / upload watcher inserts) didn't validate the watcher's userId belonged to the document's port. notifyDocumentEvent then emitted a real-time socket toast + email containing the document title to the foreign user. New assertWatchersInPort helper verifies each candidate has a userPortRoles row for the port (super-admin bypass). 818 vitest tests pass. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-29 03:15:39 +02:00
const { webhookId, portId, event, deliveryId, payload } = job.data as WebhookDeliverPayload;
const { db } = await import('@/lib/db');
const { webhooks, webhookDeliveries } = await import('@/lib/db/schema/system');
const { userProfiles } = await import('@/lib/db/schema/users');
const { decrypt } = await import('@/lib/utils/encryption');
const { createNotification } = await import('@/lib/services/notifications.service');
const { eq, and } = await import('drizzle-orm');
// 1. Fetch webhook — skip if deleted
const webhook = await db.query.webhooks.findFirst({
where: eq(webhooks.id, webhookId),
});
if (!webhook) {
logger.info({ webhookId }, 'Webhook deleted — skipping delivery');
sec: webhook SSRF guard, IMAP-sync owner check, watcher port membership Three findings from a fourth-pass review: 1. MEDIUM — webhook URL SSRF. The validator only enforced HTTPS+URL parse; it accepted private/loopback/link-local/.internal hosts. The delivery worker fetched arbitrary URLs and persisted up to 1KB of response body into webhook_deliveries.response_body, which is then surfaced via the deliveries listing endpoint — a port admin could register a webhook to an internal HTTPS endpoint, hit the test endpoint to force immediate dispatch, and read the response back. Validator now rejects RFC-1918/loopback/link-local/CGNAT/ULA IPs (v4 + v6) and .internal/.local/.localhost/.lan/.intranet/.corp suffixes; the worker re-resolves the hostname at dispatch time and blocks before fetch (DNS rebinding defense). 21-case unit test covers the matrix. 2. MEDIUM — POST /api/v1/email/accounts/[id]/sync had no owner check. Any user with email:view could enqueue an inbox-sync job for any accountId, which the worker would honour using the foreign user's decrypted IMAP credentials and advance the account's lastSyncAt (data-loss risk on the legitimate owner's next sync). Route now asserts account.userId === ctx.userId before enqueueing, matching the toggle/disconnect endpoints. 3. MEDIUM — addDocumentWatcher (and the wizard / upload watcher inserts) didn't validate the watcher's userId belonged to the document's port. notifyDocumentEvent then emitted a real-time socket toast + email containing the document title to the foreign user. New assertWatchersInPort helper verifies each candidate has a userPortRoles row for the port (super-admin bypass). 818 vitest tests pass. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-29 03:15:39 +02:00
await db.delete(webhookDeliveries).where(eq(webhookDeliveries.id, deliveryId));
return;
}
// 2. Decrypt secret
let secret: string;
try {
secret = webhook.secret ? decrypt(webhook.secret) : '';
} catch (err) {
logger.error({ webhookId, err }, 'Failed to decrypt webhook secret');
throw err; // Let BullMQ retry
}
// 3. Build final payload
const finalPayload = {
id: deliveryId,
event,
timestamp: new Date().toISOString(),
port_id: portId,
data: payload,
};
const bodyString = JSON.stringify(finalPayload);
// 4. Sign with HMAC-SHA256
const signature = secret
? `sha256=${createHmac('sha256', secret).update(bodyString).digest('hex')}`
: '';
const attempt = (job.attemptsMade ?? 0) + 1;
// 5. POST to webhook URL with 10s timeout
let responseStatus: number | null = null;
let responseBody: string | null = null;
let success = false;
sec: webhook SSRF guard, IMAP-sync owner check, watcher port membership Three findings from a fourth-pass review: 1. MEDIUM — webhook URL SSRF. The validator only enforced HTTPS+URL parse; it accepted private/loopback/link-local/.internal hosts. The delivery worker fetched arbitrary URLs and persisted up to 1KB of response body into webhook_deliveries.response_body, which is then surfaced via the deliveries listing endpoint — a port admin could register a webhook to an internal HTTPS endpoint, hit the test endpoint to force immediate dispatch, and read the response back. Validator now rejects RFC-1918/loopback/link-local/CGNAT/ULA IPs (v4 + v6) and .internal/.local/.localhost/.lan/.intranet/.corp suffixes; the worker re-resolves the hostname at dispatch time and blocks before fetch (DNS rebinding defense). 21-case unit test covers the matrix. 2. MEDIUM — POST /api/v1/email/accounts/[id]/sync had no owner check. Any user with email:view could enqueue an inbox-sync job for any accountId, which the worker would honour using the foreign user's decrypted IMAP credentials and advance the account's lastSyncAt (data-loss risk on the legitimate owner's next sync). Route now asserts account.userId === ctx.userId before enqueueing, matching the toggle/disconnect endpoints. 3. MEDIUM — addDocumentWatcher (and the wizard / upload watcher inserts) didn't validate the watcher's userId belonged to the document's port. notifyDocumentEvent then emitted a real-time socket toast + email containing the document title to the foreign user. New assertWatchersInPort helper verifies each candidate has a userPortRoles row for the port (super-admin bypass). 818 vitest tests pass. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-29 03:15:39 +02:00
// SSRF gate: re-resolve the hostname at dispatch time and reject if it
// points anywhere internal. The validator already filtered literal
// hostnames at create/update time, but DNS rebinding could swap the
// answer between then and now.
const hostCheck = await resolveAndCheckHost(webhook.url);
if (!hostCheck.ok) {
logger.warn(
{ webhookId, deliveryId, url: webhook.url, reason: hostCheck.reason },
'Webhook dispatch blocked by SSRF guard',
);
// Persist the failure so the deliveries listing reflects it.
const { db: dbInner } = await import('@/lib/db');
const { webhookDeliveries } = await import('@/lib/db/schema/system');
const { eq } = await import('drizzle-orm');
await dbInner
.update(webhookDeliveries)
.set({
status: 'dead_letter',
responseStatus: null,
responseBody: `Blocked: ${hostCheck.reason}`,
deliveredAt: new Date(),
})
.where(eq(webhookDeliveries.id, deliveryId));
return;
}
try {
const controller = new AbortController();
const timeoutId = setTimeout(() => controller.abort(), 10_000);
const response = await fetch(webhook.url, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'User-Agent': 'PortNimara-Webhook/1.0',
'X-Webhook-Id': webhookId,
'X-Webhook-Event': event,
'X-Webhook-Signature': signature,
'X-Webhook-Delivery': deliveryId,
},
body: bodyString,
signal: controller.signal,
});
clearTimeout(timeoutId);
responseStatus = response.status;
// Read up to 1KB of response body
const rawBody = await response.text();
responseBody = rawBody.slice(0, 1024);
success = response.status >= 200 && response.status < 300;
} catch (err) {
// Network error or timeout
logger.warn({ webhookId, deliveryId, err }, 'Webhook delivery network error');
responseStatus = null;
responseBody = err instanceof Error ? err.message.slice(0, 1024) : String(err).slice(0, 1024);
success = false;
}
const maxAttempts = QUEUE_CONFIGS.webhooks.maxAttempts;
const isFinalAttempt = attempt >= maxAttempts;
if (success) {
// 6a. Record success
await db
.update(webhookDeliveries)
.set({
status: 'success',
responseStatus,
responseBody,
attempt,
deliveredAt: new Date(),
})
.where(eq(webhookDeliveries.id, deliveryId));
logger.info({ webhookId, deliveryId, event }, 'Webhook delivered successfully');
} else if (!success && isFinalAttempt) {
// 6b. Final failure → dead_letter + system alert
await db
.update(webhookDeliveries)
.set({
status: 'dead_letter',
responseStatus,
responseBody,
attempt,
})
.where(eq(webhookDeliveries.id, deliveryId));
logger.error(
{ webhookId, deliveryId, event, attempt },
'Webhook delivery permanently failed — dead_letter',
);
// Notify all super admins
try {
const superAdmins = await db
.select({ userId: userProfiles.userId })
.from(userProfiles)
.where(and(eq(userProfiles.isSuperAdmin, true), eq(userProfiles.isActive, true)));
for (const admin of superAdmins) {
void createNotification({
portId,
userId: admin.userId,
type: 'system_alert',
title: 'Webhook delivery failed permanently',
description: `Webhook "${webhook.name}" failed to deliver event "${event}" after ${maxAttempts} attempts.`,
link: `/admin/webhooks/${webhookId}`,
entityType: 'webhook',
entityId: webhookId,
dedupeKey: `webhook:dead_letter:${deliveryId}`,
cooldownMs: 0,
});
}
} catch (notifyErr) {
logger.error({ notifyErr }, 'Failed to send dead_letter notification');
}
// Throw to let BullMQ mark job as failed (it won't retry since it's the final attempt)
throw new Error(
`Webhook delivery failed after ${attempt} attempts. Status: ${responseStatus ?? 'network error'}`,
);
} else {
// 6c. Non-final failure → update record then throw to trigger retry
await db
.update(webhookDeliveries)
.set({
status: 'failed',
responseStatus,
responseBody,
attempt,
})
.where(eq(webhookDeliveries.id, deliveryId));
throw new Error(
`Webhook delivery attempt ${attempt} failed. Status: ${responseStatus ?? 'network error'}. Retrying...`,
);
}
},
{
connection: { url: process.env.REDIS_URL! } as ConnectionOptions,
concurrency: QUEUE_CONFIGS.webhooks.concurrency,
},
);
webhooksWorker.on('failed', (job, err) => {
logger.error({ jobId: job?.id, jobName: job?.name, err }, 'Webhooks job failed');
});