import { Worker, type Job } from 'bullmq';
import { createHmac } from 'node:crypto';
import type { ConnectionOptions } from 'bullmq';

import { logger } from '@/lib/logger';
import { QUEUE_CONFIGS } from '@/lib/queue';

// ─── Job Payload ─────────────────────────────────────────────────────────────

/** Data carried by a `deliver` job on the `webhooks` queue. */
interface WebhookDeliverPayload {
  webhookId: string;
  portId: string;
  event: string;
  deliveryId: string;
  payload: Record<string, unknown>;
}

// ─── Delivery helpers ────────────────────────────────────────────────────────

/** Upper bound for the whole HTTP exchange (request + response body read). */
const DELIVERY_TIMEOUT_MS = 10_000;

/** Maximum number of characters of the response body (or error message) stored. */
const MAX_STORED_BODY_CHARS = 1024;

/** Result of a completed HTTP exchange with the webhook endpoint. */
interface DeliveryResponse {
  status: number;
  body: string;
}

/**
 * Builds the `X-Webhook-Signature` header value: `sha256=<hex HMAC-SHA256 of body>`.
 * Returns an empty string when the webhook has no secret.
 */
function signBody(secret: string, body: string): string {
  if (!secret) return '';
  return `sha256=${createHmac('sha256', secret).update(body).digest('hex')}`;
}

/**
 * POSTs `body` to `url` and returns the status plus the truncated response body.
 *
 * The abort timer covers both the request and the body read, and is always
 * cleared in `finally` so it cannot outlive the call when `fetch` rejects.
 *
 * @throws whatever `fetch` / `response.text()` rejects with (network error or abort).
 */
async function postWebhook(
  url: string,
  headers: Record<string, string>,
  body: string,
): Promise<DeliveryResponse> {
  const controller = new AbortController();
  const timeoutId = setTimeout(() => controller.abort(), DELIVERY_TIMEOUT_MS);
  try {
    const response = await fetch(url, {
      method: 'POST',
      headers,
      body,
      signal: controller.signal,
    });
    // The full body is read, then only the first MAX_STORED_BODY_CHARS characters are kept.
    const rawBody = await response.text();
    return { status: response.status, body: rawBody.slice(0, MAX_STORED_BODY_CHARS) };
  } finally {
    clearTimeout(timeoutId);
  }
}

// ─── Worker ──────────────────────────────────────────────────────────────────

/**
 * Worker for the `webhooks` queue.
 *
 * Handles `deliver` jobs: loads the webhook, signs the payload, POSTs it to the
 * webhook URL and records the outcome on the matching `webhookDeliveries` row
 * (`success`, `failed`, or `dead_letter` on the final attempt). Any failed
 * delivery throws so the job is reported as failed to BullMQ.
 */
export const webhooksWorker = new Worker(
  'webhooks',
  async (job: Job) => {
    logger.info({ jobId: job.id, jobName: job.name }, 'Processing webhooks job');

    if (job.name !== 'deliver') {
      logger.warn({ jobName: job.name }, 'Unknown webhooks job');
      return;
    }

    const { webhookId, portId, event, deliveryId, payload } = job.data as WebhookDeliverPayload;

    // Dependencies are loaded lazily inside the handler.
    // NOTE(review): presumably to avoid loading the DB layer at module import time — confirm.
    const { db } = await import('@/lib/db');
    const { webhooks, webhookDeliveries } = await import('@/lib/db/schema/system');
    const { userProfiles } = await import('@/lib/db/schema/users');
    const { decrypt } = await import('@/lib/utils/encryption');
    const { createNotification } = await import('@/lib/services/notifications.service');
    const { eq, and } = await import('drizzle-orm');

    // 1. Fetch webhook — skip if deleted
    const webhook = await db.query.webhooks.findFirst({
      where: eq(webhooks.id, webhookId),
    });

    if (!webhook) {
      logger.info({ webhookId }, 'Webhook deleted — skipping delivery');
      await db.delete(webhookDeliveries).where(eq(webhookDeliveries.id, deliveryId));
      return;
    }

    // 2. Decrypt secret
    let secret: string;
    try {
      secret = webhook.secret ? decrypt(webhook.secret) : '';
    } catch (err) {
      logger.error({ webhookId, err }, 'Failed to decrypt webhook secret');
      throw err; // Let BullMQ retry
    }

    // 3. Build final payload
    const bodyString = JSON.stringify({
      id: deliveryId,
      event,
      timestamp: new Date().toISOString(),
      port_id: portId,
      data: payload,
    });

    // 4. Sign with HMAC-SHA256 (empty signature when the webhook has no secret)
    const signature = signBody(secret, bodyString);

    const attempt = (job.attemptsMade ?? 0) + 1;

    // 5. POST to webhook URL with 10s timeout
    let responseStatus: number | null = null;
    let responseBody: string | null = null;
    let success = false;

    try {
      const response = await postWebhook(
        webhook.url,
        {
          'Content-Type': 'application/json',
          'User-Agent': 'PortNimara-Webhook/1.0',
          'X-Webhook-Id': webhookId,
          'X-Webhook-Event': event,
          'X-Webhook-Signature': signature,
          'X-Webhook-Delivery': deliveryId,
        },
        bodyString,
      );
      responseStatus = response.status;
      responseBody = response.body;
      success = response.status >= 200 && response.status < 300;
    } catch (err) {
      // Network error or timeout: no HTTP status, store the error text instead of a body.
      logger.warn({ webhookId, deliveryId, err }, 'Webhook delivery network error');
      const message = err instanceof Error ? err.message : String(err);
      responseBody = message.slice(0, MAX_STORED_BODY_CHARS);
    }

    // 6a. Record success
    if (success) {
      await db
        .update(webhookDeliveries)
        .set({
          status: 'success',
          responseStatus,
          responseBody,
          attempt,
          deliveredAt: new Date(),
        })
        .where(eq(webhookDeliveries.id, deliveryId));

      logger.info({ webhookId, deliveryId, event }, 'Webhook delivered successfully');
      return;
    }

    const maxAttempts = QUEUE_CONFIGS.webhooks.maxAttempts;
    const isFinalAttempt = attempt >= maxAttempts;

    if (!isFinalAttempt) {
      // 6c. Non-final failure → update record then throw to trigger retry
      await db
        .update(webhookDeliveries)
        .set({
          status: 'failed',
          responseStatus,
          responseBody,
          attempt,
        })
        .where(eq(webhookDeliveries.id, deliveryId));

      throw new Error(
        `Webhook delivery attempt ${attempt} failed. Status: ${responseStatus ?? 'network error'}. Retrying...`,
      );
    }

    // 6b. Final failure → dead_letter + system alert
    await db
      .update(webhookDeliveries)
      .set({
        status: 'dead_letter',
        responseStatus,
        responseBody,
        attempt,
      })
      .where(eq(webhookDeliveries.id, deliveryId));

    logger.error(
      { webhookId, deliveryId, event, attempt },
      'Webhook delivery permanently failed — dead_letter',
    );

    // Notify all active super admins (best effort — never blocks or fails the job).
    try {
      const superAdmins = await db
        .select({ userId: userProfiles.userId })
        .from(userProfiles)
        .where(and(eq(userProfiles.isSuperAdmin, true), eq(userProfiles.isActive, true)));

      for (const admin of superAdmins) {
        // Fire-and-forget, but with a rejection handler: the surrounding try/catch
        // cannot see an asynchronous failure, which would otherwise surface as an
        // unhandled promise rejection.
        void Promise.resolve(
          createNotification({
            portId,
            userId: admin.userId,
            type: 'system_alert',
            title: 'Webhook delivery failed permanently',
            description: `Webhook "${webhook.name}" failed to deliver event "${event}" after ${maxAttempts} attempts.`,
            link: `/admin/webhooks/${webhookId}`,
            entityType: 'webhook',
            entityId: webhookId,
            dedupeKey: `webhook:dead_letter:${deliveryId}`,
            cooldownMs: 0,
          }),
        ).catch((notifyErr: unknown) => {
          logger.error({ notifyErr }, 'Failed to send dead_letter notification');
        });
      }
    } catch (notifyErr) {
      logger.error({ notifyErr }, 'Failed to send dead_letter notification');
    }

    // Throw so BullMQ marks the job as failed.
    // NOTE(review): this assumes QUEUE_CONFIGS.webhooks.maxAttempts matches the
    // `attempts` option the job was enqueued with, so no further retry happens — confirm.
    throw new Error(
      `Webhook delivery failed after ${attempt} attempts. Status: ${responseStatus ?? 'network error'}`,
    );
  },
  {
    connection: { url: process.env.REDIS_URL! } as ConnectionOptions,
    concurrency: QUEUE_CONFIGS.webhooks.concurrency,
  },
);

// Log every job failure reported by the worker (retryable and final alike).
webhooksWorker.on('failed', (job, err) => {
  logger.error({ jobId: job?.id, jobName: job?.name, err }, 'Webhooks job failed');
});