import { and, eq, sql } from 'drizzle-orm';
import { db } from '@/lib/db';
import { webhooks, webhookDeliveries } from '@/lib/db/schema/system';
import { getQueue } from '@/lib/queue';
import { logger } from '@/lib/logger';
import { INTERNAL_TO_WEBHOOK_MAP } from '@/lib/services/webhook-event-map';

/**
 * Translates an internal socket event to its outbound webhook event name,
 * looks up every active webhook on the given port that subscribes to that
 * event, and for each one records a `pending` delivery row and enqueues a
 * `deliver` job on the `webhooks` queue.
 *
 * This function is fire-and-forget — callers should use
 * `void dispatchWebhookEvent(...)`. It never rejects: all failures are logged
 * and swallowed. A failure for one webhook does not prevent the remaining
 * webhooks from being processed.
 *
 * @param portId - Port whose webhooks should be considered.
 * @param internalEvent - Internal event name; events without an entry in
 *   `INTERNAL_TO_WEBHOOK_MAP` are ignored silently.
 * @param payload - Event payload, stored on the delivery row and passed to the
 *   queued job unchanged.
 * @returns A promise that resolves once every matching webhook has been
 *   attempted.
 */
export async function dispatchWebhookEvent(
  portId: string,
  internalEvent: string,
  payload: Record<string, unknown>,
): Promise<void> {
  const webhookEvent = INTERNAL_TO_WEBHOOK_MAP[internalEvent];
  if (!webhookEvent) {
    // No mapping for this event — skip silently
    return;
  }

  try {
    // Active webhooks for this port whose `events` array contains the event.
    const matchingWebhooks = await db
      .select({ id: webhooks.id })
      .from(webhooks)
      .where(
        and(
          eq(webhooks.portId, portId),
          eq(webhooks.isActive, true),
          sql`${webhooks.events} @> ARRAY[${webhookEvent}]::text[]`,
        ),
      );

    if (matchingWebhooks.length === 0) {
      return;
    }

    const queue = getQueue('webhooks');

    for (const webhook of matchingWebhooks) {
      // Tracked outside the inner try so the error log can name the row that
      // was created when the failure happens at the enqueue step.
      let deliveryId: unknown;

      // Isolate each webhook: one failing insert/enqueue must not stop the
      // event from reaching the other subscribers.
      try {
        // Create a pending delivery record before enqueueing
        const [delivery] = await db
          .insert(webhookDeliveries)
          .values({
            webhookId: webhook.id,
            eventType: webhookEvent,
            payload,
            status: 'pending',
          })
          .returning({ id: webhookDeliveries.id });

        if (!delivery) {
          throw new Error('Webhook delivery insert returned no row');
        }
        deliveryId = delivery.id;

        await queue.add('deliver', {
          webhookId: webhook.id,
          portId,
          event: webhookEvent,
          deliveryId: delivery.id,
          payload,
        });
      } catch (err) {
        // If `deliveryId` is set here, the row was inserted but the job was
        // not enqueued; this function leaves that row in `pending`.
        logger.error(
          { portId, internalEvent, webhookEvent, webhookId: webhook.id, deliveryId, err },
          'Failed to enqueue webhook delivery',
        );
      }
    }
  } catch (err) {
    // Never block callers — log and swallow
    logger.error(
      { portId, internalEvent, webhookEvent, err },
      'Failed to dispatch webhook event',
    );
  }
}