import { randomBytes } from 'node:crypto';

import { and, desc, eq, count } from 'drizzle-orm';

import { db } from '@/lib/db';
import { webhooks, webhookDeliveries } from '@/lib/db/schema/system';
import { createAuditLog, type AuditMeta } from '@/lib/audit';
import { encrypt, decrypt } from '@/lib/utils/encryption';
import { NotFoundError } from '@/lib/errors';
import { getQueue } from '@/lib/queue';
import type {
  CreateWebhookInput,
  UpdateWebhookInput,
  ListDeliveriesInput,
} from '@/lib/validators/webhooks';
import type { WebhookEvent } from '@/lib/services/webhook-event-map';

// ─── Types ────────────────────────────────────────────────────────────────────

// ─── Helpers ─────────────────────────────────────────────────────────────────

/** Placeholder returned whenever a secret cannot (or should not) be shown. */
const HIDDEN_SECRET = '***';

/** Generates a random 32-byte secret, hex-encoded, for signing webhook payloads. */
function generateSecret(): string {
  return randomBytes(32).toString('hex');
}

/**
 * Returns a masked representation of the plaintext secret.
 *
 * Shows the first 5 and last 3 characters joined by `...`, e.g. `3fa9c...b07`.
 * Inputs shorter than 10 characters are fully hidden as `***`.
 */
function maskSecret(plaintext: string): string {
  if (plaintext.length < 10) return HIDDEN_SECRET;
  return `${plaintext.slice(0, 5)}...${plaintext.slice(-3)}`;
}

/**
 * Decrypts a stored secret and returns its masked form.
 *
 * Falls back to `***` when no secret is stored or when decryption throws, so a
 * single unreadable secret never breaks a read endpoint.
 */
function maskStoredSecret(encrypted: string | null | undefined): string {
  if (!encrypted) return HIDDEN_SECRET;
  try {
    return maskSecret(decrypt(encrypted));
  } catch {
    // Deliberate best-effort: an undecryptable secret is shown as fully hidden.
    return HIDDEN_SECRET;
  }
}

/**
 * Loads a webhook by id and verifies that it belongs to `portId`.
 *
 * @throws NotFoundError when the webhook does not exist or belongs to another
 *   port. Both cases raise the same error.
 */
async function findWebhookForPort(portId: string, webhookId: string) {
  const webhook = await db.query.webhooks.findFirst({
    where: eq(webhooks.id, webhookId),
  });
  if (!webhook || webhook.portId !== portId) {
    throw new NotFoundError('Webhook');
  }
  return webhook;
}

// ─── Create ───────────────────────────────────────────────────────────────────

/**
 * Creates a webhook for a port with a freshly generated signing secret.
 *
 * The secret is stored encrypted. The returned object carries the plaintext
 * secret (shown ONCE, only on creation) plus its masked form.
 *
 * @param portId - Port that owns the webhook.
 * @param userId - Stored as the webhook's `createdBy`.
 * @param data - Webhook fields (name, url, events, isActive).
 * @param meta - Request metadata written to the audit log.
 * @throws Error if the insert returns no row.
 */
export async function createWebhook(
  portId: string,
  userId: string,
  data: CreateWebhookInput,
  meta: AuditMeta,
) {
  const plaintextSecret = generateSecret();
  const encryptedSecret = encrypt(plaintextSecret);

  const [webhook] = await db
    .insert(webhooks)
    .values({
      portId,
      name: data.name,
      url: data.url,
      secret: encryptedSecret,
      events: data.events,
      isActive: data.isActive,
      createdBy: userId,
    })
    .returning();

  if (!webhook) {
    throw new Error('Failed to create webhook');
  }

  // Audit logging is not awaited.
  void createAuditLog({
    userId: meta.userId,
    portId,
    action: 'create',
    entityType: 'webhook',
    entityId: webhook.id,
    newValue: { name: data.name, url: data.url, events: data.events },
    ipAddress: meta.ipAddress,
    userAgent: meta.userAgent,
  });

  // Return with plaintext secret - shown ONCE only on creation
  return {
    ...webhook,
    secret: plaintextSecret,
    secretMasked: maskSecret(plaintextSecret),
  };
}

// ─── List ─────────────────────────────────────────────────────────────────────

/**
 * Lists all webhooks of a port, newest first.
 *
 * The stored secret is never returned; each row carries `secretMasked` instead.
 */
export async function listWebhooks(portId: string) {
  const rows = await db
    .select()
    .from(webhooks)
    .where(eq(webhooks.portId, portId))
    .orderBy(desc(webhooks.createdAt));

  return rows.map((w) => ({
    ...w,
    secret: undefined,
    secretMasked: maskStoredSecret(w.secret),
  }));
}

// ─── Get Single ───────────────────────────────────────────────────────────────

/**
 * Returns a single webhook of a port with its secret replaced by a masked form.
 *
 * @throws NotFoundError when the webhook does not exist or belongs to another port.
 */
export async function getWebhook(portId: string, webhookId: string) {
  const webhook = await findWebhookForPort(portId, webhookId);
  return {
    ...webhook,
    secret: undefined,
    secretMasked: maskStoredSecret(webhook.secret),
  };
}

// ─── Update ───────────────────────────────────────────────────────────────────

/**
 * Applies `data` to a webhook and bumps `updatedAt`.
 *
 * Returns the updated row with the secret stripped and `secretMasked` added,
 * matching the shape returned by `getWebhook`.
 *
 * @throws NotFoundError when the webhook does not exist, belongs to another
 *   port, or no row was updated (e.g. it was deleted concurrently).
 */
export async function updateWebhook(
  portId: string,
  webhookId: string,
  data: UpdateWebhookInput,
  meta: AuditMeta,
) {
  const existing = await findWebhookForPort(portId, webhookId);

  const [updated] = await db
    .update(webhooks)
    .set({ ...data, updatedAt: new Date() })
    .where(and(eq(webhooks.id, webhookId), eq(webhooks.portId, portId)))
    .returning();

  // The row can vanish between the lookup above and the UPDATE.
  if (!updated) {
    throw new NotFoundError('Webhook');
  }

  void createAuditLog({
    userId: meta.userId,
    portId,
    action: 'update',
    entityType: 'webhook',
    entityId: webhookId,
    oldValue: {
      name: existing.name,
      url: existing.url,
      events: existing.events,
      isActive: existing.isActive,
    },
    newValue: data as Record<string, unknown>,
    ipAddress: meta.ipAddress,
    userAgent: meta.userAgent,
  });

  return {
    ...updated,
    secret: undefined,
    secretMasked: maskStoredSecret(updated.secret),
  };
}

// ─── Delete ───────────────────────────────────────────────────────────────────

/**
 * Deletes a webhook of a port and writes an audit entry.
 *
 * @throws NotFoundError when the webhook does not exist or belongs to another port.
 */
export async function deleteWebhook(portId: string, webhookId: string, meta: AuditMeta) {
  const existing = await findWebhookForPort(portId, webhookId);

  // CASCADE deletes webhook_deliveries
  await db.delete(webhooks).where(and(eq(webhooks.id, webhookId), eq(webhooks.portId, portId)));

  void createAuditLog({
    userId: meta.userId,
    portId,
    action: 'delete',
    entityType: 'webhook',
    entityId: webhookId,
    oldValue: { name: existing.name, url: existing.url },
    ipAddress: meta.ipAddress,
    userAgent: meta.userAgent,
  });
}

// ─── Regenerate Secret ────────────────────────────────────────────────────────

/**
 * Replaces a webhook's signing secret with a newly generated one.
 *
 * Returns the new plaintext secret (shown ONCE) and its masked form.
 *
 * @throws NotFoundError when the webhook does not exist, belongs to another
 *   port, or no row was updated, so a secret that was never stored is not
 *   handed back to the caller.
 */
export async function regenerateSecret(portId: string, webhookId: string, meta: AuditMeta) {
  await findWebhookForPort(portId, webhookId);

  const plaintextSecret = generateSecret();
  const encryptedSecret = encrypt(plaintextSecret);

  const [rotated] = await db
    .update(webhooks)
    .set({ secret: encryptedSecret, updatedAt: new Date() })
    .where(and(eq(webhooks.id, webhookId), eq(webhooks.portId, portId)))
    .returning({ id: webhooks.id });

  if (!rotated) {
    throw new NotFoundError('Webhook');
  }

  void createAuditLog({
    userId: meta.userId,
    portId,
    action: 'update',
    entityType: 'webhook',
    entityId: webhookId,
    metadata: { type: 'secret_regenerated' },
    ipAddress: meta.ipAddress,
    userAgent: meta.userAgent,
  });

  // Return new plaintext secret - shown ONCE
  return {
    webhookId,
    secret: plaintextSecret,
    secretMasked: maskSecret(plaintextSecret),
  };
}

// ─── List Deliveries ─────────────────────────────────────────────────────────

/**
 * Returns one page of a webhook's deliveries (newest first) together with the
 * total number of deliveries matching the same filters.
 *
 * @param query - `page` and `limit` select the page; `status`, when set,
 *   restricts both the page and the total to that status.
 * @throws NotFoundError when the webhook does not exist or belongs to another port.
 */
export async function listDeliveries(
  portId: string,
  webhookId: string,
  query: ListDeliveriesInput,
) {
  // Verify webhook belongs to port
  await findWebhookForPort(portId, webhookId);

  const { page, limit, status } = query;
  const offset = (page - 1) * limit;

  const filters = [eq(webhookDeliveries.webhookId, webhookId)];
  if (status) {
    filters.push(eq(webhookDeliveries.status, status));
  }

  // The count and the page query are independent, so run them concurrently.
  const [[countRow], data] = await Promise.all([
    db
      .select({ total: count() })
      .from(webhookDeliveries)
      .where(and(...filters)),
    db
      .select()
      .from(webhookDeliveries)
      .where(and(...filters))
      .orderBy(desc(webhookDeliveries.createdAt))
      .limit(limit)
      .offset(offset),
  ]);

  return { data, total: countRow?.total ?? 0 };
}

// ─── Redeliver a previously failed / dead-letter delivery ───────────────────

/**
 * Clones an existing delivery into a fresh `pending` row and re-enqueues it.
 * Intended for failed or dead-letter deliveries.
 *
 * Multi-tenant safe: the webhook is checked against `portId`, and the source
 * delivery is looked up by its id together with `webhookId`. Each redeliver
 * creates a new row, so the original record is left untouched for audit. The
 * new delivery's payload includes `retried_from` and `retried_at` markers that
 * downstream receivers can use to recognise replays.
 *
 * NOTE(review): the source delivery's status is not checked here, so any
 * delivery of this webhook can be replayed. Confirm that callers restrict this
 * to failed / dead-letter deliveries.
 *
 * @throws NotFoundError when the webhook or the delivery is not found, and also
 *   when the webhook is inactive.
 * @throws Error if the insert returns no row.
 */
export async function redeliverWebhookDelivery(
  portId: string,
  webhookId: string,
  deliveryId: string,
  meta: AuditMeta,
) {
  const webhook = await findWebhookForPort(portId, webhookId);
  if (!webhook.isActive) {
    // NOTE(review): an inactive webhook is reported with NotFoundError; confirm
    // this is the intended error type for callers.
    throw new NotFoundError('Webhook is inactive');
  }

  const [source] = await db
    .select()
    .from(webhookDeliveries)
    .where(and(eq(webhookDeliveries.id, deliveryId), eq(webhookDeliveries.webhookId, webhookId)))
    .limit(1);
  if (!source) throw new NotFoundError('Delivery');

  const replayPayload = {
    ...(source.payload as Record<string, unknown>),
    retried_from: deliveryId,
    retried_at: new Date().toISOString(),
  };

  const [next] = await db
    .insert(webhookDeliveries)
    .values({
      webhookId,
      eventType: source.eventType,
      payload: replayPayload,
      status: 'pending',
    })
    .returning();

  if (!next) {
    throw new Error('Failed to create webhook delivery');
  }

  const queue = getQueue('webhooks');
  await queue.add('deliver', {
    webhookId,
    portId,
    event: source.eventType,
    deliveryId: next.id,
    payload: replayPayload,
  });

  void createAuditLog({
    userId: meta.userId,
    portId,
    action: 'send',
    entityType: 'webhook_delivery',
    entityId: next.id,
    metadata: { redeliveredFrom: deliveryId, originalStatus: source.status },
    ipAddress: meta.ipAddress,
    userAgent: meta.userAgent,
  });

  return { deliveryId: next.id, status: 'queued' };
}

// ─── Send Test Webhook ────────────────────────────────────────────────────────

/**
 * Records a `pending` test delivery for a webhook and enqueues it.
 *
 * The payload is a fixed test body carrying `eventType` and `portId`.
 *
 * @throws NotFoundError when the webhook does not exist or belongs to another port.
 * @throws Error if the insert returns no row.
 */
export async function sendTestWebhook(portId: string, webhookId: string, eventType: WebhookEvent) {
  await findWebhookForPort(portId, webhookId);

  // Create a pending delivery record
  const [delivery] = await db
    .insert(webhookDeliveries)
    .values({
      webhookId,
      eventType,
      payload: {
        test: true,
        event: eventType,
        port_id: portId,
        data: { message: 'This is a test webhook delivery' },
      },
      status: 'pending',
    })
    .returning();

  if (!delivery) {
    throw new Error('Failed to create webhook delivery');
  }

  // Enqueue the job
  const queue = getQueue('webhooks');
  await queue.add('deliver', {
    webhookId,
    portId,
    event: eventType,
    deliveryId: delivery.id,
    payload: delivery.payload,
  });

  return { deliveryId: delivery.id, status: 'queued' };
}