import { and, eq, isNull, lte, gte, desc, asc, inArray, sql } from 'drizzle-orm';

import { db } from '@/lib/db';
import { reminders, interests, clients } from '@/lib/db/schema';
import { berths } from '@/lib/db/schema/berths';
import { yachts } from '@/lib/db/schema/yachts';
import { createAuditLog, type AuditMeta } from '@/lib/audit';
import { NotFoundError, ValidationError } from '@/lib/errors';
import { emitToRoom } from '@/lib/socket/server';
import { activeInterestsWhere } from '@/lib/services/active-interest';
import { createNotification } from '@/lib/services/notifications.service';
import { logger } from '@/lib/logger';
import type {
  CreateReminderInput,
  UpdateReminderInput,
  SnoozeReminderInput,
  ReminderListQuery,
} from '@/lib/validators/reminders';

// ─── Internal helpers ────────────────────────────────────────────────────────

/**
 * Escape the LIKE/ILIKE metacharacters (`\`, `%`, `_`) in user-supplied text
 * so the term is matched literally. Relies on backslash being the default
 * LIKE escape character in PostgreSQL.
 */
function escapeLikePattern(term: string): string {
  return term.replace(/[\\%_]/g, '\\$&');
}

/**
 * Load a reminder scoped to `portId` (no relations hydrated).
 *
 * @throws NotFoundError when no reminder with that id exists in the port.
 */
async function findReminderOrThrow(id: string, portId: string) {
  const row = await db.query.reminders.findFirst({
    where: and(eq(reminders.id, id), eq(reminders.portId, portId)),
  });
  if (!row) throw new NotFoundError('Reminder');
  return row;
}

// ─── List ────────────────────────────────────────────────────────────────────

/**
 * Paginated, filterable reminder listing for one port.
 *
 * Every filter on `query` is optional and the filters are AND-ed together.
 * Sorting is by `priority` when `query.sort === 'priority'`, otherwise by
 * `dueAt`; the direction is ascending only when `query.order === 'asc'`.
 *
 * @returns The page of rows plus `{ page, limit, total }` pagination info.
 */
export async function listReminders(portId: string, query: ReminderListQuery) {
  const conditions = [eq(reminders.portId, portId)];

  if (query.status) conditions.push(eq(reminders.status, query.status));
  if (query.priority) conditions.push(eq(reminders.priority, query.priority));
  if (query.assignedTo) conditions.push(eq(reminders.assignedTo, query.assignedTo));
  if (query.clientId) conditions.push(eq(reminders.clientId, query.clientId));
  if (query.interestId) conditions.push(eq(reminders.interestId, query.interestId));
  if (query.berthId) conditions.push(eq(reminders.berthId, query.berthId));
  if (query.yachtId) conditions.push(eq(reminders.yachtId, query.yachtId));
  if (query.dueBefore) conditions.push(lte(reminders.dueAt, new Date(query.dueBefore)));
  if (query.dueAfter) conditions.push(gte(reminders.dueAt, new Date(query.dueAfter)));
  if (query.search) {
    // The value is bound as a parameter (no injection risk); escaping the
    // wildcards keeps a search for e.g. "50%" from matching every title.
    const pattern = `%${escapeLikePattern(query.search)}%`;
    conditions.push(sql`${reminders.title} ILIKE ${pattern}`);
  }

  const orderDir = query.order === 'asc' ? asc : desc;
  const orderCol = query.sort === 'priority' ? reminders.priority : reminders.dueAt;
  const offset = (query.page - 1) * query.limit;
  const where = and(...conditions);

  // The page fetch and the total count are independent, so run them in parallel.
  const [data, countResult] = await Promise.all([
    db
      .select()
      .from(reminders)
      .where(where)
      .orderBy(orderDir(orderCol))
      .limit(query.limit)
      .offset(offset),
    db
      .select({ count: sql<number>`count(*)` })
      .from(reminders)
      .where(where),
  ]);

  return {
    data,
    pagination: {
      page: query.page,
      limit: query.limit,
      // Number() normalises the driver's representation of count(*).
      total: Number(countResult[0]?.count ?? 0),
    },
  };
}

/** Open (pending or snoozed) reminders assigned to `userId` in the port, soonest due first. */
export async function getMyReminders(userId: string, portId: string) {
  return db
    .select()
    .from(reminders)
    .where(
      and(
        eq(reminders.portId, portId),
        eq(reminders.assignedTo, userId),
        inArray(reminders.status, ['pending', 'snoozed']),
      ),
    )
    .orderBy(asc(reminders.dueAt));
}

/** Open (pending or snoozed) reminders in the port whose `dueAt` is at or before now. */
export async function getOverdueReminders(portId: string) {
  return db
    .select()
    .from(reminders)
    .where(
      and(
        eq(reminders.portId, portId),
        inArray(reminders.status, ['pending', 'snoozed']),
        lte(reminders.dueAt, new Date()),
      ),
    )
    .orderBy(asc(reminders.dueAt));
}

/**
 * Open (pending or snoozed) reminders in the port due between now and
 * `days` days from now (inclusive on both ends), soonest first.
 */
export async function getUpcomingReminders(portId: string, days: number = 14) {
  // Derive both window bounds from a single timestamp.
  const now = new Date();
  const until = new Date(now);
  until.setDate(until.getDate() + days);

  return db
    .select()
    .from(reminders)
    .where(
      and(
        eq(reminders.portId, portId),
        inArray(reminders.status, ['pending', 'snoozed']),
        lte(reminders.dueAt, until),
        gte(reminders.dueAt, now),
      ),
    )
    .orderBy(asc(reminders.dueAt));
}

/**
 * Validate that the supplied subject FKs (clientId / interestId / berthId /
 * yachtId) all point at rows inside the caller's port. Without this guard, a
 * reminder created with a foreign-port FK would later be hydrated with
 * `with: { client, interest, berth, yacht }` joins (no port filter on the
 * relation) - leaking the foreign-tenant rows back to the attacker.
 *
 * Null / undefined FKs are skipped. The lookups run in parallel.
 *
 * @throws ValidationError for an FK that does not resolve within `portId`.
 */
async function assertReminderFksInPort(
  portId: string,
  fks: {
    clientId?: string | null;
    interestId?: string | null;
    berthId?: string | null;
    yachtId?: string | null;
  },
): Promise<void> {
  const checks: Array<Promise<void>> = [];

  if (fks.clientId) {
    checks.push(
      db.query.clients
        .findFirst({ where: and(eq(clients.id, fks.clientId), eq(clients.portId, portId)) })
        .then((row) => {
          if (!row) throw new ValidationError('clientId not found in this port');
        }),
    );
  }

  if (fks.interestId) {
    checks.push(
      db.query.interests
        .findFirst({
          where: and(eq(interests.id, fks.interestId), eq(interests.portId, portId)),
        })
        .then((row) => {
          if (!row) throw new ValidationError('interestId not found in this port');
        }),
    );
  }

  if (fks.berthId) {
    checks.push(
      db.query.berths
        .findFirst({ where: and(eq(berths.id, fks.berthId), eq(berths.portId, portId)) })
        .then((row) => {
          if (!row) throw new ValidationError('berthId not found in this port');
        }),
    );
  }

  if (fks.yachtId) {
    checks.push(
      db.query.yachts
        .findFirst({ where: and(eq(yachts.id, fks.yachtId), eq(yachts.portId, portId)) })
        .then((row) => {
          if (!row) throw new ValidationError('yachtId not found in this port');
        }),
    );
  }

  await Promise.all(checks);
}

// ─── CRUD ────────────────────────────────────────────────────────────────────

/**
 * Fetch one reminder in the port with its client / interest / berth / yacht
 * relations hydrated.
 *
 * @throws NotFoundError when the reminder does not exist in the port.
 */
export async function getReminder(id: string, portId: string) {
  const reminder = await db.query.reminders.findFirst({
    where: and(eq(reminders.id, id), eq(reminders.portId, portId)),
    with: { client: true, interest: true, berth: true, yacht: true },
  });
  if (!reminder) throw new NotFoundError('Reminder');
  return reminder;
}

/**
 * Create a reminder in the port, write an audit entry and broadcast
 * `reminder:created` to the port room and to the assignee's user room.
 * When `data.assignedTo` is absent the reminder is assigned to `meta.userId`.
 *
 * @throws ValidationError when a subject FK does not belong to the port.
 */
export async function createReminder(portId: string, data: CreateReminderInput, meta: AuditMeta) {
  await assertReminderFksInPort(portId, {
    clientId: data.clientId,
    interestId: data.interestId,
    berthId: data.berthId,
    yachtId: data.yachtId,
  });

  const [reminder] = await db
    .insert(reminders)
    .values({
      portId,
      title: data.title,
      note: data.note ?? null,
      dueAt: new Date(data.dueAt),
      priority: data.priority,
      assignedTo: data.assignedTo ?? meta.userId,
      createdBy: meta.userId,
      clientId: data.clientId ?? null,
      interestId: data.interestId ?? null,
      berthId: data.berthId ?? null,
      yachtId: data.yachtId ?? null,
    })
    .returning();

  // A single-row INSERT ... RETURNING should always yield one row; fail
  // loudly instead of dereferencing undefined if it ever does not.
  if (!reminder) throw new Error('Reminder insert returned no row');

  // Audit logging is fire-and-forget so it never blocks the request.
  void createAuditLog({
    userId: meta.userId,
    portId,
    action: 'create',
    entityType: 'reminder',
    entityId: reminder.id,
    newValue: { title: reminder.title, dueAt: reminder.dueAt, priority: reminder.priority },
    ipAddress: meta.ipAddress,
    userAgent: meta.userAgent,
  });

  emitToRoom(`port:${portId}`, 'reminder:created', {
    reminderId: reminder.id,
    title: reminder.title,
    dueAt: reminder.dueAt.toISOString(),
    assignedTo: reminder.assignedTo ?? meta.userId,
  });

  if (reminder.assignedTo) {
    emitToRoom(`user:${reminder.assignedTo}`, 'reminder:created', {
      reminderId: reminder.id,
      title: reminder.title,
      dueAt: reminder.dueAt.toISOString(),
      assignedTo: reminder.assignedTo,
    });
  }

  return reminder;
}

/**
 * Apply a partial update to a reminder. Only fields present on `data`
 * (i.e. not `undefined`) are written; `updatedAt` is always refreshed.
 *
 * @throws NotFoundError when the reminder does not exist in the port (or is
 *   deleted concurrently before the update lands).
 * @throws ValidationError when a changed subject FK does not belong to the port.
 */
export async function updateReminder(
  id: string,
  portId: string,
  data: UpdateReminderInput,
  meta: AuditMeta,
) {
  const existing = await findReminderOrThrow(id, portId);

  // Re-validate any subject-FK changes against the caller's port.
  await assertReminderFksInPort(portId, {
    clientId: data.clientId,
    interestId: data.interestId,
    berthId: data.berthId,
    yachtId: data.yachtId,
  });

  const updates: Record<string, unknown> = { updatedAt: new Date() };
  if (data.title !== undefined) updates.title = data.title;
  if (data.note !== undefined) updates.note = data.note;
  if (data.dueAt !== undefined) updates.dueAt = new Date(data.dueAt);
  if (data.priority !== undefined) updates.priority = data.priority;
  if (data.assignedTo !== undefined) updates.assignedTo = data.assignedTo;
  if (data.clientId !== undefined) updates.clientId = data.clientId;
  if (data.interestId !== undefined) updates.interestId = data.interestId;
  if (data.berthId !== undefined) updates.berthId = data.berthId;
  if (data.yachtId !== undefined) updates.yachtId = data.yachtId;

  const [updated] = await db
    .update(reminders)
    .set(updates)
    .where(and(eq(reminders.id, id), eq(reminders.portId, portId)))
    .returning();

  // The row can vanish between the existence check and the UPDATE.
  if (!updated) throw new NotFoundError('Reminder');

  void createAuditLog({
    userId: meta.userId,
    portId,
    action: 'update',
    entityType: 'reminder',
    entityId: id,
    oldValue: { title: existing.title, dueAt: existing.dueAt, priority: existing.priority },
    newValue: { title: updated.title, dueAt: updated.dueAt, priority: updated.priority },
    ipAddress: meta.ipAddress,
    userAgent: meta.userAgent,
  });

  emitToRoom(`port:${portId}`, 'reminder:updated', {
    reminderId: updated.id,
    changedFields: Object.keys(data),
  });

  return updated;
}

/**
 * Hard-delete a reminder in the port and write an audit entry.
 *
 * @throws NotFoundError when the reminder does not exist in the port.
 */
export async function deleteReminder(id: string, portId: string, meta: AuditMeta) {
  const existing = await findReminderOrThrow(id, portId);

  await db.delete(reminders).where(and(eq(reminders.id, id), eq(reminders.portId, portId)));

  void createAuditLog({
    userId: meta.userId,
    portId,
    action: 'delete',
    entityType: 'reminder',
    entityId: id,
    oldValue: { title: existing.title },
    ipAddress: meta.ipAddress,
    userAgent: meta.userAgent,
  });
}

// ─── Status Actions ──────────────────────────────────────────────────────────

/**
 * Mark a reminder completed, stamp `completedAt`, audit the transition and
 * broadcast `reminder:completed` to the port room.
 *
 * @throws NotFoundError when the reminder does not exist in the port.
 * @throws ValidationError when the reminder is already completed.
 */
export async function completeReminder(id: string, portId: string, meta: AuditMeta) {
  const existing = await findReminderOrThrow(id, portId);
  if (existing.status === 'completed') throw new ValidationError('Reminder already completed');

  const now = new Date();
  const [updated] = await db
    .update(reminders)
    .set({ status: 'completed', completedAt: now, updatedAt: now })
    .where(and(eq(reminders.id, id), eq(reminders.portId, portId)))
    .returning();

  if (!updated) throw new NotFoundError('Reminder');

  void createAuditLog({
    userId: meta.userId,
    portId,
    action: 'update',
    entityType: 'reminder',
    entityId: id,
    oldValue: { status: existing.status },
    newValue: { status: 'completed' },
    ipAddress: meta.ipAddress,
    userAgent: meta.userAgent,
  });

  emitToRoom(`port:${portId}`, 'reminder:completed', {
    reminderId: updated.id,
    title: updated.title,
    completedBy: meta.userId,
  });

  return updated;
}

/**
 * Put a reminder into the `snoozed` state until `data.snoozeUntil`, audit
 * the transition and broadcast `reminder:snoozed` to the port room.
 *
 * @throws NotFoundError when the reminder does not exist in the port.
 */
export async function snoozeReminder(
  id: string,
  portId: string,
  data: SnoozeReminderInput,
  meta: AuditMeta,
) {
  const existing = await findReminderOrThrow(id, portId);

  const [updated] = await db
    .update(reminders)
    .set({
      status: 'snoozed',
      snoozedUntil: new Date(data.snoozeUntil),
      updatedAt: new Date(),
    })
    .where(and(eq(reminders.id, id), eq(reminders.portId, portId)))
    .returning();

  if (!updated) throw new NotFoundError('Reminder');

  void createAuditLog({
    userId: meta.userId,
    portId,
    action: 'update',
    entityType: 'reminder',
    entityId: id,
    oldValue: { status: existing.status },
    newValue: { status: 'snoozed', snoozedUntil: data.snoozeUntil },
    ipAddress: meta.ipAddress,
    userAgent: meta.userAgent,
  });

  emitToRoom(`port:${portId}`, 'reminder:snoozed', {
    reminderId: updated.id,
    snoozedUntil: data.snoozeUntil,
  });

  return updated;
}

/**
 * Mark a reminder dismissed and audit the transition. No socket event is
 * emitted for dismissals.
 *
 * @throws NotFoundError when the reminder does not exist in the port.
 */
export async function dismissReminder(id: string, portId: string, meta: AuditMeta) {
  const existing = await findReminderOrThrow(id, portId);

  const [updated] = await db
    .update(reminders)
    .set({ status: 'dismissed', updatedAt: new Date() })
    .where(and(eq(reminders.id, id), eq(reminders.portId, portId)))
    .returning();

  if (!updated) throw new NotFoundError('Reminder');

  void createAuditLog({
    userId: meta.userId,
    portId,
    action: 'update',
    entityType: 'reminder',
    entityId: id,
    oldValue: { status: existing.status },
    newValue: { status: 'dismissed' },
    ipAddress: meta.ipAddress,
    userAgent: meta.userAgent,
  });

  return updated;
}

// ─── Background Processors ──────────────────────────────────────────────────

/**
 * Hourly check: creates auto-follow-up reminders for interests with
 * reminderEnabled=true where no activity in reminderDays days (BR-060).
 *
 * "Last activity" is `reminderLastFired` when set, otherwise the interest's
 * `updatedAt`. Each port is processed independently.
 */
export async function processFollowUpReminders() {
  const ports = await db.query.ports.findMany();

  for (const port of ports) {
    const enabledInterests = await db
      .select({
        id: interests.id,
        clientId: interests.clientId,
        reminderDays: interests.reminderDays,
        reminderLastFired: interests.reminderLastFired,
        updatedAt: interests.updatedAt,
      })
      .from(interests)
      .where(and(activeInterestsWhere(port.id), eq(interests.reminderEnabled, true)));

    const now = new Date();

    // Pick the interests whose follow-up window has elapsed. Pre-filtering
    // here means the per-row N+1 walk that used to issue (1 client lookup
    // + 1 reminder insert + 1 interest update) per interest is replaced by
    // a single client-bulk-fetch + a single reminder bulk-insert + a
    // single interests bulk-update against an `inArray` set.
    const dueInterests = enabledInterests.filter((interest) => {
      if (!interest.reminderDays) return false;
      const lastActivity = interest.reminderLastFired ?? interest.updatedAt;
      const daysSinceActivity = (now.getTime() - lastActivity.getTime()) / (1000 * 60 * 60 * 24);
      return daysSinceActivity >= interest.reminderDays;
    });

    if (dueInterests.length === 0) continue;

    const clientIds = Array.from(
      new Set(dueInterests.map((i) => i.clientId).filter((v): v is string => Boolean(v))),
    );

    // Scope the name lookup to this port as well, so a reminder title can
    // never embed a client name that belongs to another tenant.
    const clientsByIdEntries =
      clientIds.length > 0
        ? await db
            .select({ id: clients.id, fullName: clients.fullName })
            .from(clients)
            .where(and(inArray(clients.id, clientIds), eq(clients.portId, port.id)))
        : [];
    const clientById = new Map(clientsByIdEntries.map((c) => [c.id, c]));

    const newReminders = dueInterests.map((interest) => {
      const client = interest.clientId ? clientById.get(interest.clientId) : null;
      return {
        portId: port.id,
        title: client ? `Follow up with ${client.fullName}` : 'Follow up on interest',
        note: 'Auto-generated: no activity detected within the configured follow-up window.',
        dueAt: now,
        priority: 'medium',
        assignedTo: null,
        createdBy: 'system',
        interestId: interest.id,
        clientId: interest.clientId,
        autoGenerated: true,
      };
    });

    // `dueInterests` is non-empty here, so `newReminders` is too.
    // NOTE(review): the insert and the `reminderLastFired` stamp are two
    // separate statements; if the second fails the same follow-ups would be
    // recreated on the next run. Consider wrapping both in a transaction if
    // the configured driver supports it.
    await db.insert(reminders).values(newReminders);
    await db
      .update(interests)
      .set({ reminderLastFired: now })
      .where(
        inArray(
          interests.id,
          dueInterests.map((i) => i.id),
        ),
      );

    // Single port-room emit summarising the batch — the per-row emit was
    // mostly noise to the dashboard and amplified socket traffic linearly
    // with interest count.
    emitToRoom(`port:${port.id}`, 'system:alert', {
      alertType: 'follow_up_created',
      message: `${newReminders.length} follow-up reminder${
        newReminders.length === 1 ? '' : 's'
      } created`,
      severity: 'info',
    });

    logger.info(
      { portId: port.id, created: newReminders.length },
      'Auto follow-up reminders created (bulk)',
    );
  }
}

/**
 * Every 15 minutes: checks for past-due reminders and creates overdue notifications.
 *
 * Runs across all ports: first promotes expired snoozes back to `pending`,
 * then claims every due, un-fired pending reminder and notifies its assignee
 * (reminders without an assignee are claimed but not notified).
 */
export async function processOverdueReminders() {
  const now = new Date();

  // Un-snooze reminders whose snooze window has elapsed first, so a
  // reminder that just transitioned snoozed → pending is eligible in
  // the same tick (rather than waiting a full 15 minutes for the next
  // scan). `firedAt` is cleared as part of the transition: a reminder that
  // was snoozed *after* its overdue notification still carries the old
  // stamp, and the claim below only picks rows where fired_at IS NULL —
  // without the reset it would never notify again.
  await db
    .update(reminders)
    .set({ status: 'pending', snoozedUntil: null, firedAt: null, updatedAt: now })
    .where(and(eq(reminders.status, 'snoozed'), lte(reminders.snoozedUntil, now)));

  // Phase 4 — claim due reminders by stamping fired_at in a single
  // UPDATE...RETURNING. Postgres's row locks guarantee only one worker
  // wins per row, so parallel maintenance workers can't double-fire the
  // same reminder. Limited to status='pending' (the un-snooze pass
  // above already promoted anything that was snoozed-expired).
  //
  // Partial index `idx_reminders_due_unfired` from migration 0072
  // covers (port_id, due_at) WHERE fired_at IS NULL AND status IN
  // ('pending', 'snoozed') so the scan stays cheap even on a large
  // backlog of long-fired reminders.
  const claimed = await db
    .update(reminders)
    .set({ firedAt: now, updatedAt: now })
    .where(
      and(eq(reminders.status, 'pending'), lte(reminders.dueAt, now), isNull(reminders.firedAt)),
    )
    .returning();

  for (const reminder of claimed) {
    if (!reminder.assignedTo) continue;

    void createNotification({
      portId: reminder.portId,
      userId: reminder.assignedTo,
      type: 'reminder_overdue',
      title: 'Reminder overdue',
      description: reminder.title,
      entityType: 'reminder',
      entityId: reminder.id,
      link: '/reminders',
      // Per-reminder dedup is now redundant given fired_at, but keep
      // the key so a manual re-fire (e.g. ops clears fired_at) still
      // respects the cooldown.
      // NOTE(review): re-fires after a snooze go through the same key —
      // confirm the dedupe cooldown is shorter than typical snooze windows.
      dedupeKey: `reminder:${reminder.id}`,
    });

    emitToRoom(`user:${reminder.assignedTo}`, 'reminder:overdue', {
      reminderId: reminder.id,
      title: reminder.title,
      dueAt: reminder.dueAt.toISOString(),
    });
  }

  logger.info({ firedCount: claimed.length }, 'Reminder cron: claimed + notified');
}