/**
 * Notification digest dispatcher.
 *
 * Cron `notification-digest` fires hourly via the email worker. For
 * every port with `reminder_digest_enabled === true`, we check whether
 * the hour of the configured `reminder_digest_time` (HH:MM in
 * `reminder_digest_timezone`) matches the current local hour (the
 * minutes component is ignored). If yes, we batch each user's unread
 * notifications from the last 24h into a single digest email and mark
 * those notifications as `email_sent` so they don't appear in
 * tomorrow's digest.
 *
 * Per-user respect:
 *   - The digest is skipped for users with no unread notifications.
 *   - Per-user opt-out via `userNotificationPreferences` is NOT applied
 *     yet: every user holding a role on the port is a recipient.
 */

import { and, desc, eq, gte, inArray, isNull, or } from 'drizzle-orm';

import { db } from '@/lib/db';
import { ports } from '@/lib/db/schema/ports';
import { notifications } from '@/lib/db/schema/operations';
import { user as authUser } from '@/lib/db/schema/users';
import { userPortRoles } from '@/lib/db/schema/users';
import { sendEmail } from '@/lib/email';
import { notificationDigestEmail } from '@/lib/email/templates/notification-digest';
import { getPortReminderConfig } from '@/lib/services/port-config';
import { env } from '@/lib/env';
import { logger } from '@/lib/logger';
import { resolveSubject } from '@/lib/email/resolve-subject';
import { getBrandingShell } from '@/lib/email/branding-resolver';

/** How far back (from the run's `now`) a notification may be created and still be digested. */
const DIGEST_LOOKBACK_MS = 24 * 60 * 60 * 1000;
/** Maximum number of notifications rendered in a single digest email. */
const MAX_ITEMS_PER_USER = 20;
/**
 * Maximum number of notifications fetched (and marked as emailed) per
 * user per run. `totalUnread` in the email is capped at this value.
 */
const MAX_ROWS_PER_USER = 100;
/** Hour used when a port's configured digest time cannot be parsed. */
const DEFAULT_DIGEST_HOUR = 9;

/**
 * Returns the local hour (0-23) for `at` in IANA `timezone`. Falls
 * back to UTC if the timezone is unparseable so a misconfigured port
 * still gets exactly one digest fire per day instead of zero.
 */
function localHourFor(at: Date, timezone: string): number {
  try {
    const fmt = new Intl.DateTimeFormat('en-US', {
      timeZone: timezone,
      hour: 'numeric',
      hour12: false,
    });
    const hourPart = fmt.formatToParts(at).find((p) => p.type === 'hour')?.value;
    if (hourPart === undefined) return at.getUTCHours();
    const n = Number.parseInt(hourPart, 10);
    // `% 24` normalises a midnight that is rendered as "24" back to 0.
    return Number.isFinite(n) ? n % 24 : at.getUTCHours();
  } catch {
    // Intl throws a RangeError for an unknown timezone identifier.
    return at.getUTCHours();
  }
}

/**
 * Extracts the hour (0-23) from an `HH:MM` digest time. Anything that
 * does not yield an integer hour in range falls back to
 * `DEFAULT_DIGEST_HOUR`, so a malformed setting cannot silently
 * disable a port's digest.
 */
function targetHourFor(digestTime: string): number {
  const hour = Number.parseInt(digestTime.split(':')[0] ?? '', 10);
  return Number.isInteger(hour) && hour >= 0 && hour <= 23 ? hour : DEFAULT_DIGEST_HOUR;
}

/** Counters describing one run of {@link runNotificationDigest}. */
export interface DigestRunResult {
  /** Number of ports read from the `ports` table. */
  portsConsidered: number;
  /** Ports with the digest enabled whose configured hour matched `now`. */
  portsFired: number;
  /** Digest emails sent and successfully marked as emailed. */
  digestsSent: number;
  /** Port-level and user-level failures that were logged and skipped. */
  errors: number;
}

/**
 * Sends the daily notification digest for every port whose configured
 * digest hour matches `now` in the port's timezone.
 *
 * Failures while loading a port's config, recipients or branding, and
 * failures while dispatching to a single user, are logged and counted
 * in `errors`; they do not stop the remaining ports or users.
 *
 * @param now Reference instant for the hour gate and the 24h lookback;
 *   defaults to the current time.
 * @returns Counters for the run.
 * @throws If the initial query listing the ports fails.
 */
export async function runNotificationDigest(now: Date = new Date()): Promise<DigestRunResult> {
  const allPorts = await db.select({ id: ports.id, name: ports.name }).from(ports);
  const since = new Date(now.getTime() - DIGEST_LOOKBACK_MS);

  let portsFired = 0;
  let digestsSent = 0;
  let errors = 0;

  for (const port of allPorts) {
    let cfg: Awaited<ReturnType<typeof getPortReminderConfig>>;
    try {
      cfg = await getPortReminderConfig(port.id);
    } catch (err) {
      logger.warn({ err, portId: port.id }, 'digest: failed to load port reminder config');
      errors += 1;
      continue;
    }
    if (!cfg.digestEnabled) continue;

    // Only fire when the current local hour in the port's TZ matches
    // the configured digest time. The cron pattern is hourly so this
    // gate ensures we send once per day per port.
    if (localHourFor(now, cfg.digestTimezone) !== targetHourFor(cfg.digestTime)) continue;

    portsFired += 1;

    try {
      // Find all users with a port-role on this port — that's the
      // recipient set. Future iteration could honor per-user opt-out
      // flags from userNotificationPreferences.
      const portUsers = await db
        .select({
          userId: userPortRoles.userId,
          email: authUser.email,
          name: authUser.name,
        })
        .from(userPortRoles)
        .innerJoin(authUser, eq(userPortRoles.userId, authUser.id))
        .where(eq(userPortRoles.portId, port.id));

      if (portUsers.length === 0) continue;

      // Resolve branding once per port — every user on this port gets
      // the same shell.
      const branding = await getBrandingShell(port.id);

      for (const u of portUsers) {
        try {
          const rows = await db
            .select({
              id: notifications.id,
              type: notifications.type,
              title: notifications.title,
              description: notifications.description,
              link: notifications.link,
              createdAt: notifications.createdAt,
            })
            .from(notifications)
            .where(
              and(
                eq(notifications.portId, port.id),
                eq(notifications.userId, u.userId),
                eq(notifications.isRead, false),
                eq(notifications.emailSent, false),
                gte(notifications.createdAt, since),
              ),
            )
            .orderBy(desc(notifications.createdAt))
            .limit(MAX_ROWS_PER_USER);

          if (rows.length === 0) continue;

          // Only the newest items are rendered; `totalUnread` tells the
          // recipient how many were found in total.
          const visible = rows.slice(0, MAX_ITEMS_PER_USER);
          const inboxLink = `${env.APP_URL}/notifications`;

          const result = await notificationDigestEmail(
            {
              portName: port.name,
              recipientName: u.name ?? '',
              items: visible.map((r) => ({
                type: r.type,
                title: r.title,
                description: r.description,
                link: r.link ? `${env.APP_URL}${r.link}` : null,
                createdAt: r.createdAt,
              })),
              totalUnread: rows.length,
              inboxLink,
            },
            { branding },
          );

          // M-EM04: dedicated catalog key — admins can override the
          // digest subject from /admin/email without an unsafe cast and
          // the digest's setting key now namespaces cleanly.
          const subject = await resolveSubject({
            key: 'notification_digest',
            portId: port.id,
            fallback: result.subject,
            tokens: { portName: port.name },
          });

          await sendEmail(u.email, subject, result.html, undefined, result.text, port.id);

          // Mark every fetched row (not just the rendered ones) as
          // emailed. The unread condition must be built with `or(...)`:
          // a JavaScript `||` between two SQL expressions evaluates to
          // the left operand only, which reduced the filter to
          // `is_read IS NULL` and matched none of the selected rows.
          await db
            .update(notifications)
            .set({ emailSent: true })
            .where(
              and(
                eq(notifications.portId, port.id),
                eq(notifications.userId, u.userId),
                inArray(
                  notifications.id,
                  rows.map((r) => r.id),
                ),
                or(isNull(notifications.isRead), eq(notifications.isRead, false)),
              ),
            );

          digestsSent += 1;
        } catch (err) {
          logger.error(
            { err, portId: port.id, userId: u.userId },
            'digest: per-user dispatch failed',
          );
          errors += 1;
        }
      }
    } catch (err) {
      // Per-user failures are handled inside the loop above, so this
      // only sees failures of the recipient query or branding lookup.
      // Keep going so one broken port does not cost every later port
      // its digest for the day.
      logger.error({ err, portId: port.id }, 'digest: failed to load port recipients or branding');
      errors += 1;
    }
  }

  logger.info(
    { portsConsidered: allPorts.length, portsFired, digestsSent, errors },
    'notification-digest run complete',
  );

  return {
    portsConsidered: allPorts.length,
    portsFired,
    digestsSent,
    errors,
  };
}