296 lines
9.2 KiB
TypeScript
296 lines
9.2 KiB
TypeScript
|
|
import { and, count, eq, gt, sql } from 'drizzle-orm';
|
||
|
|
|
||
|
|
import { db } from '@/lib/db';
|
||
|
|
import { notifications } from '@/lib/db/schema/operations';
|
||
|
|
import { userNotificationPreferences } from '@/lib/db/schema/system';
|
||
|
|
import { emitToRoom } from '@/lib/socket/server';
|
||
|
|
import { getQueue } from '@/lib/queue';
|
||
|
|
import { NotFoundError } from '@/lib/errors';
|
||
|
|
import type { ListNotificationsInput, UpdatePreferencesInput } from '@/lib/validators/notifications';
|
||
|
|
|
||
|
|
// ─── Types ────────────────────────────────────────────────────────────────────
|
||
|
|
|
||
|
|
export interface CreateNotificationParams {
|
||
|
|
portId: string;
|
||
|
|
userId: string;
|
||
|
|
type: string;
|
||
|
|
title: string;
|
||
|
|
description?: string;
|
||
|
|
link?: string;
|
||
|
|
entityType?: string;
|
||
|
|
entityId?: string;
|
||
|
|
dedupeKey?: string;
|
||
|
|
cooldownMs?: number;
|
||
|
|
}
|
||
|
|
|
||
|
|
// ─── Helpers ─────────────────────────────────────────────────────────────────
|
||
|
|
|
||
|
|
async function getUnreadCountValue(userId: string, portId: string): Promise<number> {
|
||
|
|
const [row] = await db
|
||
|
|
.select({ count: count() })
|
||
|
|
.from(notifications)
|
||
|
|
.where(
|
||
|
|
and(
|
||
|
|
eq(notifications.userId, userId),
|
||
|
|
eq(notifications.portId, portId),
|
||
|
|
eq(notifications.isRead, false),
|
||
|
|
),
|
||
|
|
);
|
||
|
|
return row?.count ?? 0;
|
||
|
|
}
|
||
|
|
|
||
|
|
// ─── createNotification ───────────────────────────────────────────────────────
|
||
|
|
|
||
|
|
export async function createNotification(
|
||
|
|
params: CreateNotificationParams,
|
||
|
|
): Promise<typeof notifications.$inferSelect | null> {
|
||
|
|
const {
|
||
|
|
portId,
|
||
|
|
userId,
|
||
|
|
type,
|
||
|
|
title,
|
||
|
|
description,
|
||
|
|
link,
|
||
|
|
entityType,
|
||
|
|
entityId,
|
||
|
|
dedupeKey,
|
||
|
|
cooldownMs = 300_000,
|
||
|
|
} = params;
|
||
|
|
|
||
|
|
// 1. Cooldown / deduplication check
|
||
|
|
if (dedupeKey) {
|
||
|
|
const cutoff = new Date(Date.now() - cooldownMs);
|
||
|
|
const [existing] = await db
|
||
|
|
.select({ id: notifications.id })
|
||
|
|
.from(notifications)
|
||
|
|
.where(
|
||
|
|
and(
|
||
|
|
eq(notifications.userId, userId),
|
||
|
|
eq(notifications.type, type),
|
||
|
|
gt(notifications.createdAt, cutoff),
|
||
|
|
sql`${notifications.metadata}->>'dedupeKey' = ${dedupeKey}`,
|
||
|
|
),
|
||
|
|
)
|
||
|
|
.limit(1);
|
||
|
|
|
||
|
|
if (existing) {
|
||
|
|
return null; // suppressed by cooldown
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
// 2. Preference check (skip for system_alert type — always delivered)
|
||
|
|
if (type !== 'system_alert') {
|
||
|
|
const [pref] = await db
|
||
|
|
.select({ inApp: userNotificationPreferences.inApp, email: userNotificationPreferences.email })
|
||
|
|
.from(userNotificationPreferences)
|
||
|
|
.where(
|
||
|
|
and(
|
||
|
|
eq(userNotificationPreferences.userId, userId),
|
||
|
|
eq(userNotificationPreferences.portId, portId),
|
||
|
|
eq(userNotificationPreferences.notificationType, type),
|
||
|
|
),
|
||
|
|
)
|
||
|
|
.limit(1);
|
||
|
|
|
||
|
|
if (pref && pref.inApp === false) {
|
||
|
|
// Check if email is enabled — if neither, skip entirely
|
||
|
|
if (pref.email === false) {
|
||
|
|
return null;
|
||
|
|
}
|
||
|
|
// inApp disabled but email enabled: still enqueue email but skip insert
|
||
|
|
// We can't insert and emit, so just enqueue if there were a row — but we need an ID.
|
||
|
|
// Per spec: if inApp=false, skip insert. Email requires notificationId so skip email too.
|
||
|
|
return null;
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
// 3. Insert notification
|
||
|
|
const [notif] = await db
|
||
|
|
.insert(notifications)
|
||
|
|
.values({
|
||
|
|
portId,
|
||
|
|
userId,
|
||
|
|
type,
|
||
|
|
title,
|
||
|
|
description: description ?? null,
|
||
|
|
link: link ?? null,
|
||
|
|
entityType: entityType ?? null,
|
||
|
|
entityId: entityId ?? null,
|
||
|
|
isRead: false,
|
||
|
|
emailSent: false,
|
||
|
|
metadata: dedupeKey ? { dedupeKey } : {},
|
||
|
|
})
|
||
|
|
.returning();
|
||
|
|
|
||
|
|
if (!notif) return null;
|
||
|
|
|
||
|
|
// 4. Emit socket events
|
||
|
|
emitToRoom(`user:${userId}`, 'notification:new', {
|
||
|
|
notificationId: notif.id,
|
||
|
|
type: notif.type,
|
||
|
|
title: notif.title,
|
||
|
|
description: notif.description ?? '',
|
||
|
|
link: notif.link ?? '',
|
||
|
|
});
|
||
|
|
|
||
|
|
const unreadCount = await getUnreadCountValue(userId, portId);
|
||
|
|
emitToRoom(`user:${userId}`, 'notification:unreadCount', { count: unreadCount });
|
||
|
|
|
||
|
|
// 5. Check email preference and enqueue if needed
|
||
|
|
const [pref] = await db
|
||
|
|
.select({ email: userNotificationPreferences.email })
|
||
|
|
.from(userNotificationPreferences)
|
||
|
|
.where(
|
||
|
|
and(
|
||
|
|
eq(userNotificationPreferences.userId, userId),
|
||
|
|
eq(userNotificationPreferences.portId, portId),
|
||
|
|
eq(userNotificationPreferences.notificationType, type),
|
||
|
|
),
|
||
|
|
)
|
||
|
|
.limit(1);
|
||
|
|
|
||
|
|
// Default to sending email if no preference record exists (opt-in by default)
|
||
|
|
const shouldEmail = pref ? pref.email : false;
|
||
|
|
|
||
|
|
if (shouldEmail) {
|
||
|
|
const queue = getQueue('notifications');
|
||
|
|
await queue.add('send-notification-email', { notificationId: notif.id });
|
||
|
|
}
|
||
|
|
|
||
|
|
return notif;
|
||
|
|
}
|
||
|
|
|
||
|
|
// ─── listNotifications ────────────────────────────────────────────────────────
|
||
|
|
|
||
|
|
export async function listNotifications(
|
||
|
|
userId: string,
|
||
|
|
portId: string,
|
||
|
|
query: ListNotificationsInput,
|
||
|
|
): Promise<{ data: (typeof notifications.$inferSelect)[]; total: number }> {
|
||
|
|
const { page, limit, unreadOnly } = query;
|
||
|
|
const offset = (page - 1) * limit;
|
||
|
|
|
||
|
|
const conditions = [
|
||
|
|
eq(notifications.userId, userId),
|
||
|
|
eq(notifications.portId, portId),
|
||
|
|
];
|
||
|
|
|
||
|
|
if (unreadOnly) {
|
||
|
|
conditions.push(eq(notifications.isRead, false));
|
||
|
|
}
|
||
|
|
|
||
|
|
const where = and(...conditions);
|
||
|
|
|
||
|
|
const [totalRow, rows] = await Promise.all([
|
||
|
|
db.select({ count: count() }).from(notifications).where(where),
|
||
|
|
db
|
||
|
|
.select()
|
||
|
|
.from(notifications)
|
||
|
|
.where(where)
|
||
|
|
.orderBy(sql`${notifications.createdAt} DESC`)
|
||
|
|
.limit(limit)
|
||
|
|
.offset(offset),
|
||
|
|
]);
|
||
|
|
|
||
|
|
return {
|
||
|
|
data: rows,
|
||
|
|
total: totalRow[0]?.count ?? 0,
|
||
|
|
};
|
||
|
|
}
|
||
|
|
|
||
|
|
// ─── markRead ─────────────────────────────────────────────────────────────────
|
||
|
|
|
||
|
|
export async function markRead(notificationId: string, userId: string): Promise<void> {
|
||
|
|
const [notif] = await db
|
||
|
|
.select({ id: notifications.id, portId: notifications.portId, userId: notifications.userId })
|
||
|
|
.from(notifications)
|
||
|
|
.where(eq(notifications.id, notificationId))
|
||
|
|
.limit(1);
|
||
|
|
|
||
|
|
if (!notif || notif.userId !== userId) {
|
||
|
|
throw new NotFoundError('Notification');
|
||
|
|
}
|
||
|
|
|
||
|
|
await db
|
||
|
|
.update(notifications)
|
||
|
|
.set({ isRead: true })
|
||
|
|
.where(and(eq(notifications.id, notificationId), eq(notifications.userId, userId)));
|
||
|
|
|
||
|
|
const unreadCount = await getUnreadCountValue(userId, notif.portId);
|
||
|
|
emitToRoom(`user:${userId}`, 'notification:unreadCount', { count: unreadCount });
|
||
|
|
}
|
||
|
|
|
||
|
|
// ─── markAllRead ──────────────────────────────────────────────────────────────
|
||
|
|
|
||
|
|
export async function markAllRead(userId: string, portId: string): Promise<void> {
|
||
|
|
await db
|
||
|
|
.update(notifications)
|
||
|
|
.set({ isRead: true })
|
||
|
|
.where(
|
||
|
|
and(
|
||
|
|
eq(notifications.userId, userId),
|
||
|
|
eq(notifications.portId, portId),
|
||
|
|
eq(notifications.isRead, false),
|
||
|
|
),
|
||
|
|
);
|
||
|
|
|
||
|
|
emitToRoom(`user:${userId}`, 'notification:unreadCount', { count: 0 });
|
||
|
|
}
|
||
|
|
|
||
|
|
// ─── getUnreadCount ───────────────────────────────────────────────────────────
|
||
|
|
|
||
|
|
export async function getUnreadCount(
|
||
|
|
userId: string,
|
||
|
|
portId: string,
|
||
|
|
): Promise<{ count: number }> {
|
||
|
|
const c = await getUnreadCountValue(userId, portId);
|
||
|
|
return { count: c };
|
||
|
|
}
|
||
|
|
|
||
|
|
// ─── getPreferences ───────────────────────────────────────────────────────────
|
||
|
|
|
||
|
|
export async function getPreferences(userId: string, portId: string) {
|
||
|
|
return db
|
||
|
|
.select()
|
||
|
|
.from(userNotificationPreferences)
|
||
|
|
.where(
|
||
|
|
and(
|
||
|
|
eq(userNotificationPreferences.userId, userId),
|
||
|
|
eq(userNotificationPreferences.portId, portId),
|
||
|
|
),
|
||
|
|
);
|
||
|
|
}
|
||
|
|
|
||
|
|
// ─── updatePreferences ────────────────────────────────────────────────────────
|
||
|
|
|
||
|
|
export async function updatePreferences(
|
||
|
|
userId: string,
|
||
|
|
portId: string,
|
||
|
|
data: UpdatePreferencesInput,
|
||
|
|
) {
|
||
|
|
for (const pref of data.preferences) {
|
||
|
|
await db
|
||
|
|
.insert(userNotificationPreferences)
|
||
|
|
.values({
|
||
|
|
userId,
|
||
|
|
portId,
|
||
|
|
notificationType: pref.notificationType,
|
||
|
|
inApp: pref.inApp,
|
||
|
|
email: pref.email,
|
||
|
|
})
|
||
|
|
.onConflictDoUpdate({
|
||
|
|
target: [
|
||
|
|
userNotificationPreferences.userId,
|
||
|
|
userNotificationPreferences.portId,
|
||
|
|
userNotificationPreferences.notificationType,
|
||
|
|
],
|
||
|
|
set: {
|
||
|
|
inApp: pref.inApp,
|
||
|
|
email: pref.email,
|
||
|
|
},
|
||
|
|
});
|
||
|
|
}
|
||
|
|
|
||
|
|
return getPreferences(userId, portId);
|
||
|
|
}
|