import { Worker, type Job } from 'bullmq'; import type { ConnectionOptions } from 'bullmq'; import { logger } from '@/lib/logger'; import { QUEUE_CONFIGS } from '@/lib/queue'; export const notificationsWorker = new Worker( 'notifications', async (job: Job) => { logger.info({ jobId: job.id, jobName: job.name }, 'Processing notifications job'); switch (job.name) { case 'invoice-overdue-check': { const { detectOverdue } = await import('@/lib/services/invoices'); const { db } = await import('@/lib/db'); const { ports } = await import('@/lib/db/schema/ports'); const allPorts = await db.select({ id: ports.id }).from(ports); for (const port of allPorts) { try { await detectOverdue(port.id); } catch (err) { logger.error({ err, portId: port.id }, 'Overdue detection failed'); } } break; } case 'reminder-check': { const { processDocumentReminders } = await import( '@/jobs/processors/document-reminder' ); await processDocumentReminders(); break; } case 'send-notification-email': { const { notificationId } = job.data as { notificationId: string }; const { db } = await import('@/lib/db'); const { notifications } = await import('@/lib/db/schema/operations'); const { user } = await import('@/lib/db/schema/users'); const { eq } = await import('drizzle-orm'); const { sendEmail } = await import('@/lib/email/index'); const [notif] = await db .select() .from(notifications) .where(eq(notifications.id, notificationId)) .limit(1); if (!notif) break; // Get user email from the Better Auth user table const [authUser] = await db .select({ email: user.email, name: user.name }) .from(user) .where(eq(user.id, notif.userId)) .limit(1); if (!authUser?.email) break; await sendEmail( authUser.email, `[Port Nimara] ${notif.title}`, `
${notif.description ?? notif.title}
${ notif.link ? `` : '' }`, ); await db .update(notifications) .set({ emailSent: true }) .where(eq(notifications.id, notificationId)); break; } default: logger.warn({ jobName: job.name }, 'Unknown notifications job'); } }, { connection: { url: process.env.REDIS_URL! } as ConnectionOptions, concurrency: QUEUE_CONFIGS.notifications.concurrency, }, ); notificationsWorker.on('failed', (job, err) => { logger.error({ jobId: job?.id, jobName: job?.name, err }, 'Notifications job failed'); });