import { and, count, eq, gt, sql } from 'drizzle-orm'; import { db } from '@/lib/db'; import { documents, documentWatchers } from '@/lib/db/schema/documents'; import { notifications } from '@/lib/db/schema/operations'; import { userNotificationPreferences } from '@/lib/db/schema/system'; import { emitToRoom } from '@/lib/socket/server'; import { getQueue } from '@/lib/queue'; import { NotFoundError } from '@/lib/errors'; import { logger } from '@/lib/logger'; import type { ListNotificationsInput, UpdatePreferencesInput, } from '@/lib/validators/notifications'; // ─── Types ──────────────────────────────────────────────────────────────────── export interface CreateNotificationParams { portId: string; userId: string; type: string; title: string; description?: string; link?: string; entityType?: string; entityId?: string; dedupeKey?: string; cooldownMs?: number; } // ─── Helpers ───────────────────────────────────────────────────────────────── async function getUnreadCountValue(userId: string, portId: string): Promise { const [row] = await db .select({ count: count() }) .from(notifications) .where( and( eq(notifications.userId, userId), eq(notifications.portId, portId), eq(notifications.isRead, false), ), ); return row?.count ?? 0; } // ─── createNotification ─────────────────────────────────────────────────────── export async function createNotification( params: CreateNotificationParams, ): Promise { const { portId, userId, type, title, description, link, entityType, entityId, dedupeKey, cooldownMs = 300_000, } = params; // 1. Cooldown / deduplication check if (dedupeKey) { const cutoff = new Date(Date.now() - cooldownMs); const [existing] = await db .select({ id: notifications.id }) .from(notifications) .where( and( eq(notifications.userId, userId), eq(notifications.type, type), gt(notifications.createdAt, cutoff), sql`${notifications.metadata}->>'dedupeKey' = ${dedupeKey}`, ), ) .limit(1); if (existing) { return null; // suppressed by cooldown } } // 2. Preference check (skip for system_alert type — always delivered) if (type !== 'system_alert') { const [pref] = await db .select({ inApp: userNotificationPreferences.inApp, email: userNotificationPreferences.email, }) .from(userNotificationPreferences) .where( and( eq(userNotificationPreferences.userId, userId), eq(userNotificationPreferences.portId, portId), eq(userNotificationPreferences.notificationType, type), ), ) .limit(1); if (pref && pref.inApp === false) { // Check if email is enabled — if neither, skip entirely if (pref.email === false) { return null; } // inApp disabled but email enabled: still enqueue email but skip insert // We can't insert and emit, so just enqueue if there were a row — but we need an ID. // Per spec: if inApp=false, skip insert. Email requires notificationId so skip email too. return null; } } // 3. Insert notification const [notif] = await db .insert(notifications) .values({ portId, userId, type, title, description: description ?? null, link: link ?? null, entityType: entityType ?? null, entityId: entityId ?? null, isRead: false, emailSent: false, metadata: dedupeKey ? { dedupeKey } : {}, }) .returning(); if (!notif) return null; // 4. Emit socket events emitToRoom(`user:${userId}`, 'notification:new', { notificationId: notif.id, type: notif.type, title: notif.title, description: notif.description ?? '', link: notif.link ?? '', }); const unreadCount = await getUnreadCountValue(userId, portId); emitToRoom(`user:${userId}`, 'notification:unreadCount', { count: unreadCount }); // 5. Check email preference and enqueue if needed const [pref] = await db .select({ email: userNotificationPreferences.email }) .from(userNotificationPreferences) .where( and( eq(userNotificationPreferences.userId, userId), eq(userNotificationPreferences.portId, portId), eq(userNotificationPreferences.notificationType, type), ), ) .limit(1); // Default to sending email if no preference record exists (opt-in by default) const shouldEmail = pref ? pref.email : false; if (shouldEmail) { const queue = getQueue('notifications'); await queue.add('send-notification-email', { notificationId: notif.id }); } return notif; } // ─── listNotifications ──────────────────────────────────────────────────────── export async function listNotifications( userId: string, portId: string, query: ListNotificationsInput, ): Promise<{ data: (typeof notifications.$inferSelect)[]; total: number }> { const { page, limit, unreadOnly } = query; const offset = (page - 1) * limit; const conditions = [eq(notifications.userId, userId), eq(notifications.portId, portId)]; if (unreadOnly) { conditions.push(eq(notifications.isRead, false)); } const where = and(...conditions); const [totalRow, rows] = await Promise.all([ db.select({ count: count() }).from(notifications).where(where), db .select() .from(notifications) .where(where) .orderBy(sql`${notifications.createdAt} DESC`) .limit(limit) .offset(offset), ]); return { data: rows, total: totalRow[0]?.count ?? 0, }; } // ─── markRead ───────────────────────────────────────────────────────────────── export async function markRead(notificationId: string, userId: string): Promise { const [notif] = await db .select({ id: notifications.id, portId: notifications.portId, userId: notifications.userId }) .from(notifications) .where(eq(notifications.id, notificationId)) .limit(1); if (!notif || notif.userId !== userId) { throw new NotFoundError('Notification'); } await db .update(notifications) .set({ isRead: true }) .where(and(eq(notifications.id, notificationId), eq(notifications.userId, userId))); const unreadCount = await getUnreadCountValue(userId, notif.portId); emitToRoom(`user:${userId}`, 'notification:unreadCount', { count: unreadCount }); } // ─── markAllRead ────────────────────────────────────────────────────────────── export async function markAllRead(userId: string, portId: string): Promise { await db .update(notifications) .set({ isRead: true }) .where( and( eq(notifications.userId, userId), eq(notifications.portId, portId), eq(notifications.isRead, false), ), ); emitToRoom(`user:${userId}`, 'notification:unreadCount', { count: 0 }); } // ─── getUnreadCount ─────────────────────────────────────────────────────────── export async function getUnreadCount(userId: string, portId: string): Promise<{ count: number }> { const c = await getUnreadCountValue(userId, portId); return { count: c }; } // ─── getPreferences ─────────────────────────────────────────────────────────── export async function getPreferences(userId: string, portId: string) { return db .select() .from(userNotificationPreferences) .where( and( eq(userNotificationPreferences.userId, userId), eq(userNotificationPreferences.portId, portId), ), ); } // ─── notifyDocumentEvent ────────────────────────────────────────────────────── export type DocumentEventType = | 'sent' | 'signed' | 'completed' | 'expired' | 'cancelled' | 'rejected'; const DOCUMENT_EVENT_TITLES: Record = { sent: 'Document sent for signing', signed: 'Document signed', completed: 'Document fully signed', expired: 'Document expired', cancelled: 'Document cancelled', rejected: 'Document rejected', }; const DOCUMENT_EVENT_NOTIF_TYPES: Record = { sent: 'document_sent', signed: 'document_signed', completed: 'document_completed', expired: 'document_expired', cancelled: 'document_cancelled', rejected: 'document_rejected', }; /** * Fan out an in-app notification for a document lifecycle event to: * - the document creator * - all rows in `document_watchers` for the document * * Existing socket events (`document:created`, `document:sent`, etc.) keep * firing from `documents.service.ts`; this helper only adds in-app * notifications. Used by PR4/PR5 detail page + watcher feature. * * Future: also notify the entity assignee once that concept exists on * interests/reservations. */ export async function notifyDocumentEvent( documentId: string, eventType: DocumentEventType, ): Promise { const doc = await db.query.documents.findFirst({ where: eq(documents.id, documentId), }); if (!doc) { logger.warn({ documentId }, 'notifyDocumentEvent: document not found'); return; } const watcherRows = await db .select({ userId: documentWatchers.userId }) .from(documentWatchers) .where(eq(documentWatchers.documentId, documentId)); const recipientIds = new Set(); if (doc.createdBy && doc.createdBy !== 'system') { recipientIds.add(doc.createdBy); } for (const row of watcherRows) { recipientIds.add(row.userId); } const title = DOCUMENT_EVENT_TITLES[eventType]; const notifType = DOCUMENT_EVENT_NOTIF_TYPES[eventType]; await Promise.all( Array.from(recipientIds).map((userId) => createNotification({ portId: doc.portId, userId, type: notifType, title, description: `"${doc.title}"`, link: `/documents/${doc.id}`, entityType: 'document', entityId: doc.id, dedupeKey: `document:${doc.id}:${eventType}`, }), ), ); } // ─── updatePreferences ──────────────────────────────────────────────────────── export async function updatePreferences( userId: string, portId: string, data: UpdatePreferencesInput, ) { for (const pref of data.preferences) { await db .insert(userNotificationPreferences) .values({ userId, portId, notificationType: pref.notificationType, inApp: pref.inApp, email: pref.email, }) .onConflictDoUpdate({ target: [ userNotificationPreferences.userId, userNotificationPreferences.portId, userNotificationPreferences.notificationType, ], set: { inApp: pref.inApp, email: pref.email, }, }); } return getPreferences(userId, portId); }