Files
pn-new-crm/src/lib/queue/workers/notifications.ts

90 lines
3.2 KiB
TypeScript
Raw Normal View History

import { Worker, type Job } from 'bullmq';
import type { ConnectionOptions } from 'bullmq';
import { logger } from '@/lib/logger';
import { QUEUE_CONFIGS } from '@/lib/queue';
/**
 * Escapes the characters that are significant in HTML text and quoted
 * attribute values, so plain-text notification fields can be embedded in an
 * HTML email body without altering its markup.
 */
function escapeHtml(text: string): string {
  return text
    .replace(/&/g, '&amp;')
    .replace(/</g, '&lt;')
    .replace(/>/g, '&gt;')
    .replace(/"/g, '&quot;')
    .replace(/'/g, '&#39;');
}

/**
 * BullMQ worker for the `notifications` queue.
 *
 * Dispatches on `job.name`:
 * - `invoice-overdue-check`: runs `detectOverdue` for every port id; a failure
 *   for one port is logged and does not stop the remaining ports.
 * - `reminder-check`: runs document-signing reminders, then CRM follow-up
 *   reminders.
 * - `reminder-overdue-check`: runs `processOverdueReminders`.
 * - `send-notification-email`: emails the notification identified by
 *   `job.data.notificationId` to its user, then sets `emailSent` on the row.
 *
 * Unknown job names are logged as a warning and otherwise ignored. Service
 * modules are loaded with dynamic `import()` inside the case that needs them.
 */
export const notificationsWorker = new Worker(
  'notifications',
  async (job: Job): Promise<void> => {
    logger.info({ jobId: job.id, jobName: job.name }, 'Processing notifications job');
    switch (job.name) {
      case 'invoice-overdue-check': {
        const { detectOverdue } = await import('@/lib/services/invoices');
        const { db } = await import('@/lib/db');
        const { ports } = await import('@/lib/db/schema/ports');
        const allPorts = await db.select({ id: ports.id }).from(ports);
        for (const port of allPorts) {
          // Best-effort per port: log and continue so one failing port does
          // not prevent the check from running for the others.
          try {
            await detectOverdue(port.id);
          } catch (err) {
            logger.error({ err, portId: port.id }, 'Overdue detection failed');
          }
        }
        break;
      }
      case 'reminder-check': {
        // Document signing reminders (EOI)
        const { processDocumentReminders } = await import('@/jobs/processors/document-reminder');
        await processDocumentReminders();
        // CRM follow-up reminders (BR-060)
        const { processFollowUpReminders } = await import('@/lib/services/reminders.service');
        await processFollowUpReminders();
        break;
      }
      case 'reminder-overdue-check': {
        const { processOverdueReminders } = await import('@/lib/services/reminders.service');
        await processOverdueReminders();
        break;
      }
      case 'send-notification-email': {
        const { notificationId } = job.data as { notificationId?: string };
        // Job payloads are not type-checked at runtime; refuse to query with
        // a missing id instead of passing `undefined` into the WHERE clause.
        if (!notificationId) {
          logger.warn(
            { jobId: job.id, jobName: job.name },
            'Notification email job has no notificationId; skipping',
          );
          break;
        }
        const { db } = await import('@/lib/db');
        const { notifications } = await import('@/lib/db/schema/operations');
        const { user } = await import('@/lib/db/schema/users');
        const { eq } = await import('drizzle-orm');
        const { sendEmail } = await import('@/lib/email/index');
        const [notif] = await db
          .select()
          .from(notifications)
          .where(eq(notifications.id, notificationId))
          .limit(1);
        if (!notif) {
          logger.warn({ jobId: job.id, notificationId }, 'Notification not found; skipping email');
          break;
        }
        // Get user email from the Better Auth user table
        const [authUser] = await db
          .select({ email: user.email, name: user.name })
          .from(user)
          .where(eq(user.id, notif.userId))
          .limit(1);
        if (!authUser?.email) {
          logger.warn(
            { jobId: job.id, notificationId, userId: notif.userId },
            'Notification recipient has no email address; skipping email',
          );
          break;
        }
        // Title/description/link are stored text, so escape them before
        // placing them inside HTML. The subject line is plain text and is
        // intentionally left unescaped.
        const bodyText = escapeHtml(String(notif.description ?? notif.title));
        const appUrl = process.env.APP_URL;
        let linkHtml = '';
        if (notif.link) {
          if (appUrl) {
            const href = escapeHtml(`${appUrl}${notif.link}`);
            linkHtml = `<p><a href="${href}">View in CRM</a></p>`;
          } else {
            // Without APP_URL the href would render as "undefined/..."; send
            // the email without the link rather than with a broken one.
            logger.warn(
              { jobId: job.id, notificationId },
              'APP_URL is not set; omitting CRM link from notification email',
            );
          }
        }
        await sendEmail(
          authUser.email,
          `[Port Nimara] ${notif.title}`,
          `<p>${bodyText}</p>${linkHtml}`,
        );
        // NOTE(review): the flag is written only after a successful send; if
        // this update throws and the job is retried, the email may be sent
        // again — confirm whether that is acceptable.
        await db
          .update(notifications)
          .set({ emailSent: true })
          .where(eq(notifications.id, notificationId));
        break;
      }
      default:
        logger.warn({ jobName: job.name }, 'Unknown notifications job');
    }
  },
  {
    connection: { url: process.env.REDIS_URL! } as ConnectionOptions,
    concurrency: QUEUE_CONFIGS.notifications.concurrency,
  },
);
// Log every job the worker reports as failed. The job argument is read with
// optional chaining, so its fields are logged as undefined when it is absent.
notificationsWorker.on('failed', (failedJob, error) => {
  const context = {
    jobId: failedJob?.id,
    jobName: failedJob?.name,
    err: error,
  };
  logger.error(context, 'Notifications job failed');
});