import { Worker, type Job } from 'bullmq';
import { env } from '@/lib/env';
import { createHmac } from 'node:crypto';
import { lookup } from 'node:dns/promises';
import type { ConnectionOptions } from 'bullmq';

import { logger } from '@/lib/logger';
import { QUEUE_CONFIGS } from '@/lib/queue';
import { isLocalOrPrivateHost } from '@/lib/validators/webhooks';

/** Upper bound on one delivery attempt (connect + headers + body snippet). */
const DELIVERY_TIMEOUT_MS = 10_000;

/** Maximum number of characters of the response body (or error text) persisted. */
const RESPONSE_BODY_MAX_CHARS = 1024;

/** Outcome of the dispatch-time host check. */
type HostCheckResult = { ok: true } | { ok: false; reason: string };

/**
 * Resolve the webhook hostname and reject if any returned address is in a
 * disallowed range. Defends against DNS rebinding where the validator-time
 * resolution returned a public address but dispatch-time resolution
 * returns a private one.
 *
 * NOTE(review): `fetch` performs its own name resolution after this check,
 * so a narrow window between the two lookups remains. Closing it would
 * require connecting to the already-vetted address directly.
 *
 * @param rawUrl - The webhook URL exactly as stored.
 * @returns `{ ok: true }` when every resolved address passes, otherwise
 *   `{ ok: false, reason }` with a human-readable explanation.
 */
async function resolveAndCheckHost(rawUrl: string): Promise<HostCheckResult> {
  if (isLocalOrPrivateHost(rawUrl)) {
    return { ok: false, reason: 'webhook URL host blocked by static check' };
  }

  let host: string;
  try {
    host = new URL(rawUrl).hostname;
  } catch {
    return { ok: false, reason: 'invalid URL' };
  }

  try {
    const addresses = await lookup(host, { all: true });
    for (const a of addresses) {
      // Reuse the validator's literal-address checks on each resolved IP.
      // IPv6 literals must be bracketed to form a valid URL authority.
      const literal = a.family === 6 ? `[${a.address}]` : a.address;
      if (isLocalOrPrivateHost(`https://${literal}`)) {
        return { ok: false, reason: `resolved address ${a.address} is in a blocked range` };
      }
    }
  } catch (err) {
    return {
      ok: false,
      reason: `DNS resolution failed: ${err instanceof Error ? err.message : 'unknown'}`,
    };
  }

  return { ok: true };
}

/**
 * Read at most `maxChars` characters from the start of a response body
 * without buffering the whole body in memory.
 *
 * The result equals `(await response.text()).slice(0, maxChars)`: the body is
 * decoded as UTF-8 (BOM stripped). Reading stops once `maxChars * 4` bytes
 * have been consumed, which always covers `maxChars` UTF-16 code units
 * (at most 3 bytes per unit, plus slack for a BOM and a trailing partial
 * sequence). The rest of the stream is cancelled.
 *
 * @param response - The fetch response whose body should be sampled.
 * @param maxChars - Maximum number of characters to return.
 * @returns The leading snippet of the body, or `''` when there is no body.
 */
async function readBodySnippet(response: Response, maxChars: number): Promise<string> {
  if (!response.body) return '';

  const maxBytes = maxChars * 4;
  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let text = '';
  let bytesRead = 0;

  try {
    while (bytesRead < maxBytes) {
      const chunk = await reader.read();
      if (chunk.done) {
        // Flush so a truncated trailing sequence decodes as it would in text().
        text += decoder.decode();
        break;
      }
      bytesRead += chunk.value.byteLength;
      text += decoder.decode(chunk.value, { stream: true });
    }
  } finally {
    // Release the connection; a cancel failure on an errored stream is not actionable.
    await reader.cancel().catch(() => undefined);
  }

  return text.slice(0, maxChars);
}

// ─── Job Payload ─────────────────────────────────────────────────────────────

/** Data carried by a `deliver` job on the `webhooks` queue. */
interface WebhookDeliverPayload {
  webhookId: string;
  portId: string;
  event: string;
  deliveryId: string;
  payload: Record<string, unknown>;
}

// ─── Worker ──────────────────────────────────────────────────────────────────

/**
 * BullMQ worker for the `webhooks` queue.
 *
 * For each `deliver` job it loads the webhook, signs the JSON body with
 * HMAC-SHA256 when a secret is configured, POSTs it to the webhook URL and
 * records the outcome on the matching `webhookDeliveries` row. Failed
 * attempts throw so BullMQ retries; the final failed attempt is marked
 * `dead_letter` and active super admins are notified.
 */
export const webhooksWorker = new Worker(
  'webhooks',
  async (job: Job) => {
    logger.info({ jobId: job.id, jobName: job.name }, 'Processing webhooks job');

    if (job.name !== 'deliver') {
      logger.warn({ jobName: job.name }, 'Unknown webhooks job');
      return;
    }

    const { webhookId, portId, event, deliveryId, payload } = job.data as WebhookDeliverPayload;

    const { db } = await import('@/lib/db');
    const { webhooks, webhookDeliveries } = await import('@/lib/db/schema/system');
    const { userProfiles } = await import('@/lib/db/schema/users');
    const { decrypt } = await import('@/lib/utils/encryption');
    const { createNotification } = await import('@/lib/services/notifications.service');
    const { eq, and } = await import('drizzle-orm');

    /**
     * Mark this delivery `dead_letter` without attempting it, storing `reason`
     * as the response body so the deliveries listing shows why it was skipped.
     */
    const recordSkipped = async (reason: string): Promise<void> => {
      await db
        .update(webhookDeliveries)
        .set({
          status: 'dead_letter',
          responseStatus: null,
          responseBody: reason,
          deliveredAt: new Date(),
        })
        .where(eq(webhookDeliveries.id, deliveryId));
    };

    // 1. Fetch webhook - skip if deleted
    const webhook = await db.query.webhooks.findFirst({
      where: eq(webhooks.id, webhookId),
    });

    if (!webhook) {
      logger.info({ webhookId }, 'Webhook deleted - skipping delivery');
      await db.delete(webhookDeliveries).where(eq(webhookDeliveries.id, deliveryId));
      return;
    }

    // Safety net: when EMAIL_REDIRECT_TO is set (dev / staging / migration
    // dry-run), short-circuit webhook delivery so we don't accidentally
    // ping a user-configured production endpoint with synthetic events.
    // Records the delivery as `dead_letter` with a clear reason so the
    // attempt is still visible in the deliveries listing.
    if (process.env.EMAIL_REDIRECT_TO) {
      logger.info(
        { webhookId, deliveryId, url: webhook.url },
        'Webhook delivery skipped (EMAIL_REDIRECT_TO is set - outbound comms are paused)',
      );
      await recordSkipped('Skipped: EMAIL_REDIRECT_TO is set, outbound comms paused.');
      return;
    }

    // 2. Decrypt secret
    let secret: string;
    try {
      secret = webhook.secret ? decrypt(webhook.secret) : '';
    } catch (err) {
      logger.error({ webhookId, err }, 'Failed to decrypt webhook secret');
      throw err; // Let BullMQ retry
    }

    // 3. Build final payload
    const finalPayload = {
      id: deliveryId,
      event,
      timestamp: new Date().toISOString(),
      port_id: portId,
      data: payload,
    };
    const bodyString = JSON.stringify(finalPayload);

    // 4. Sign with HMAC-SHA256 (empty signature header when no secret is set)
    const signature = secret
      ? `sha256=${createHmac('sha256', secret).update(bodyString).digest('hex')}`
      : '';

    const attempt = (job.attemptsMade ?? 0) + 1;

    // SSRF gate: re-resolve the hostname at dispatch time and reject if it
    // points anywhere internal. The validator already filtered literal
    // hostnames at create/update time, but DNS rebinding could swap the
    // answer between then and now.
    const hostCheck = await resolveAndCheckHost(webhook.url);
    if (!hostCheck.ok) {
      logger.warn(
        { webhookId, deliveryId, url: webhook.url, reason: hostCheck.reason },
        'Webhook dispatch blocked by SSRF guard',
      );
      // Persist the failure so the deliveries listing reflects it.
      await recordSkipped(`Blocked: ${hostCheck.reason}`);
      return;
    }

    // 5. POST to webhook URL with 10s timeout
    let responseStatus: number | null = null;
    let responseBody: string | null = null;
    let success = false;

    const controller = new AbortController();
    const timeoutId = setTimeout(() => controller.abort(), DELIVERY_TIMEOUT_MS);
    try {
      const response = await fetch(webhook.url, {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
          'User-Agent': 'PortNimara-Webhook/1.0',
          'X-Webhook-Id': webhookId,
          'X-Webhook-Event': event,
          'X-Webhook-Signature': signature,
          'X-Webhook-Delivery': deliveryId,
        },
        body: bodyString,
        // Do not follow redirects: the redirect target has not been through
        // the SSRF gate, so following it could reach an internal address.
        // A 3xx is recorded as a non-2xx (failed) delivery instead.
        redirect: 'manual',
        signal: controller.signal,
      });

      responseStatus = response.status;
      // Read up to 1KB of response body. The abort timer is still armed here,
      // so a stalled body stream cannot hang the job.
      responseBody = await readBodySnippet(response, RESPONSE_BODY_MAX_CHARS);
      success = response.ok;
    } catch (err) {
      // Network error or timeout
      logger.warn({ webhookId, deliveryId, err }, 'Webhook delivery network error');
      responseStatus = null;
      responseBody =
        err instanceof Error
          ? err.message.slice(0, RESPONSE_BODY_MAX_CHARS)
          : String(err).slice(0, RESPONSE_BODY_MAX_CHARS);
      success = false;
    } finally {
      // Always disarm the timer, including when fetch itself rejected.
      clearTimeout(timeoutId);
    }

    const maxAttempts = QUEUE_CONFIGS.webhooks.maxAttempts;
    const isFinalAttempt = attempt >= maxAttempts;

    const { createAuditLog } = await import('@/lib/audit');

    if (success) {
      // 6a. Record success
      await db
        .update(webhookDeliveries)
        .set({
          status: 'success',
          responseStatus,
          responseBody,
          attempt,
          deliveredAt: new Date(),
        })
        .where(eq(webhookDeliveries.id, deliveryId));

      logger.info({ webhookId, deliveryId, event }, 'Webhook delivered successfully');
      void createAuditLog({
        userId: null,
        portId,
        action: 'webhook_delivered',
        entityType: 'webhook_delivery',
        entityId: deliveryId,
        metadata: { webhookId, event, responseStatus, attempt },
        source: 'webhook',
        severity: 'info',
      });
    } else if (isFinalAttempt) {
      // 6b. Final failure → dead_letter + system alert
      await db
        .update(webhookDeliveries)
        .set({
          status: 'dead_letter',
          responseStatus,
          responseBody,
          attempt,
        })
        .where(eq(webhookDeliveries.id, deliveryId));

      logger.error(
        { webhookId, deliveryId, event, attempt },
        'Webhook delivery permanently failed - dead_letter',
      );
      void createAuditLog({
        userId: null,
        portId,
        action: 'webhook_dead_letter',
        entityType: 'webhook_delivery',
        entityId: deliveryId,
        metadata: { webhookId, event, responseStatus, attempt, responseBody },
        source: 'webhook',
        severity: 'error',
      });

      // Notify all super admins
      try {
        const superAdmins = await db
          .select({ userId: userProfiles.userId })
          .from(userProfiles)
          .where(and(eq(userProfiles.isSuperAdmin, true), eq(userProfiles.isActive, true)));

        for (const admin of superAdmins) {
          // Fire-and-forget, but attach a handler: the surrounding try/catch
          // only sees synchronous throws, not a later promise rejection.
          void Promise.resolve(
            createNotification({
              portId,
              userId: admin.userId,
              type: 'system_alert',
              title: 'Webhook delivery failed permanently',
              description: `Webhook "${webhook.name}" failed to deliver event "${event}" after ${maxAttempts} attempts.`,
              link: `/admin/webhooks/${webhookId}`,
              entityType: 'webhook',
              entityId: webhookId,
              dedupeKey: `webhook:dead_letter:${deliveryId}`,
              cooldownMs: 0,
            }),
          ).catch((notifyErr: unknown) => {
            logger.error({ notifyErr }, 'Failed to send dead_letter notification');
          });
        }
      } catch (notifyErr) {
        logger.error({ notifyErr }, 'Failed to send dead_letter notification');
      }

      // Throw to let BullMQ mark job as failed (it won't retry since it's the final attempt)
      throw new Error(
        `Webhook delivery failed after ${attempt} attempts. Status: ${responseStatus ?? 'network error'}`,
      );
    } else {
      // 6c. Non-final failure → update record then throw to trigger retry
      await db
        .update(webhookDeliveries)
        .set({
          status: 'failed',
          responseStatus,
          responseBody,
          attempt,
        })
        .where(eq(webhookDeliveries.id, deliveryId));

      void createAuditLog({
        userId: null,
        portId,
        action: 'webhook_failed',
        entityType: 'webhook_delivery',
        entityId: deliveryId,
        metadata: { webhookId, event, responseStatus, attempt },
        source: 'webhook',
        severity: 'warning',
      });

      throw new Error(
        `Webhook delivery attempt ${attempt} failed. Status: ${responseStatus ?? 'network error'}. Retrying...`,
      );
    }
  },
  {
    connection: { url: env.REDIS_URL } as ConnectionOptions,
    concurrency: QUEUE_CONFIGS.webhooks.concurrency,
  },
);

// Log every job that BullMQ reports as failed (including attempts that will be retried).
webhooksWorker.on('failed', (job, err) => {
  logger.error({ jobId: job?.id, jobName: job?.name, err }, 'Webhooks job failed');
});