# Port Nimara CRM: Real-Time Events & Background Jobs

**Compiled:** 2026-03-11
**Real-Time:** Socket.io (WebSocket alongside Next.js)
**Job Queue:** BullMQ + Redis
**Transport:** Redis adapter for Socket.io (multi-instance ready)

---

## 1. Socket.io Architecture

### 1.1 Connection

```
Client connects on app load:
  → Authenticate via session cookie (same as HTTP auth)
  → Join room: port:{portId} (scoped to current port)
  → Join room: user:{userId} (for personal notifications)
  → If super admin with cross-port view: join room: global
```

### 1.2 Room Structure

| Room | Members | Purpose |
| --- | --- | --- |
| `port:{portId}` | All users at that port | Port-scoped data updates |
| `user:{userId}` | Single user (all their tabs/devices) | Personal notifications, reminder alerts, calendar sync updates |
| `global` | Super admin(s) in cross-port mode | System alerts, cross-port updates |
| `berth:{berthId}` | Users viewing a specific berth detail | Granular berth updates |
| `client:{clientId}` | Users viewing a specific client detail | Granular client updates |
| `interest:{interestId}` | Users viewing a specific interest detail | Granular interest updates |

---

## 2. Real-Time Event Catalog

### 2.1 Berth Events

| Event | Room | Payload | Trigger |
| --- | --- | --- | --- |
| `berth:statusChanged` | `port:{portId}` | `{ berthId, oldStatus, newStatus, triggeredBy }` | Berth status updated (manual or auto) |
| `berth:updated` | `port:{portId}`, `berth:{berthId}` | `{ berthId, changedFields }` | Any berth field updated |
| `berth:waitingListChanged` | `berth:{berthId}` | `{ berthId, action, entry }` | Waiting list entry added/removed/reordered |
| `berth:maintenanceAdded` | `berth:{berthId}` | `{ berthId, logEntry }` | New maintenance log entry |

### 2.2 Client Events

| Event | Room | Payload | Trigger |
| --- | --- | --- | --- |
| `client:created` | `port:{portId}` | `{ clientId, clientName, source }` | New client created |
| `client:updated` | `port:{portId}`, `client:{clientId}` | `{ clientId, changedFields }` | Client fields updated |
| `client:archived` | `port:{portId}` | `{ clientId }` | Client archived |
| `client:restored` | `port:{portId}` | `{ clientId }` | Client restored from archive |
| `client:merged` | `port:{portId}` | `{ survivingId, mergedId }` | Clients merged |
| `client:noteAdded` | `client:{clientId}` | `{ clientId, noteId, authorName, preview }` | New note added |
| `client:duplicateDetected` | `port:{portId}` | `{ clientAId, clientBId, score, reason }` | Duplicate alert created |

### 2.3 Interest Events

| Event | Room | Payload | Trigger |
| --- | --- | --- | --- |
| `interest:created` | `port:{portId}` | `{ interestId, clientId, berthId, source }` | New interest created |
| `interest:updated` | `port:{portId}`, `interest:{interestId}` | `{ interestId, changedFields }` | Interest fields updated |
| `interest:stageChanged` | `port:{portId}` | `{ interestId, oldStage, newStage, clientName, berthNumber }` | Pipeline stage changed |
| `interest:berthLinked` | `port:{portId}` | `{ interestId, berthId }` | Berth linked to interest |
| `interest:berthUnlinked` | `port:{portId}` | `{ interestId, berthId }` | Berth unlinked from interest |
| `interest:archived` | `port:{portId}` | `{ interestId }` | Interest archived |
| `interest:noteAdded` | `interest:{interestId}` | `{ interestId, noteId, authorName, preview }` | Note added to interest |

### 2.4 Document Events

| Event | Room | Payload | Trigger |
| --- | --- | --- | --- |
| `document:created` | `port:{portId}` | `{ documentId, type, interestId }` | Document record created |
| `document:sent` | `port:{portId}` | `{ documentId, type, signerCount }` | Document sent for signing |
| `document:signed` | `port:{portId}`, `interest:{interestId}` | `{ documentId, signerName, signerRole, remainingSigners }` | Individual signer completed |
| `document:completed` | `port:{portId}`, `interest:{interestId}` | `{ documentId, type, interestId, clientName }` | All signers completed |
| `document:expired` | `port:{portId}` | `{ documentId }` | Document expired |
| `document:reminderSent` | `interest:{interestId}` | `{ documentId, recipientEmail }` | Signing reminder sent |

### 2.5 Financial Events

| Event | Room | Payload | Trigger |
| --- | --- | --- | --- |
| `expense:created` | `port:{portId}` | `{ expenseId, amount, currency, category }` | Expense created |
| `expense:updated` | `port:{portId}` | `{ expenseId, changedFields }` | Expense updated |
| `invoice:created` | `port:{portId}` | `{ invoiceId, invoiceNumber, total, clientName }` | Invoice created |
| `invoice:sent` | `port:{portId}` | `{ invoiceId, invoiceNumber, recipientEmail }` | Invoice emailed |
| `invoice:paid` | `port:{portId}` | `{ invoiceId, invoiceNumber, amount }` | Payment recorded |
| `invoice:overdue` | `port:{portId}` | `{ invoiceId, invoiceNumber, daysPastDue }` | Invoice became overdue |

### 2.6 Reminder & Calendar Events

| Event | Room | Payload | Trigger |
| --- | --- | --- | --- |
| `reminder:created` | `user:{assigneeId}`, `port:{portId}` | `{ reminderId, title, assignedTo, dueAt }` | Reminder created |
| `reminder:updated` | `user:{assigneeId}` | `{ reminderId, changedFields }` | Reminder updated |
| `reminder:completed` | `user:{assigneeId}`, `port:{portId}` | `{ reminderId, title, completedBy }` | Reminder completed |
| `reminder:overdue` | `user:{assigneeId}` | `{ reminderId, title, dueAt }` | Reminder became overdue |
| `reminder:snoozed` | `user:{assigneeId}` | `{ reminderId, snoozedUntil }` | Reminder snoozed |
| `calendar:synced` | `user:{userId}` | `{ eventCount, lastSyncAt }` | Google Calendar sync completed |
| `calendar:disconnected` | `user:{userId}` | `{ reason }` | Google Calendar token revoked/expired |

### 2.7 Notification Events

| Event | Room | Payload | Trigger |
| --- | --- | --- | --- |
| `notification:new` | `user:{userId}` | `{ notificationId, type, title, description, link }` | New notification created |
| `notification:unreadCount` | `user:{userId}` | `{ count }` | Unread count changed |

### 2.8 System Events

| Event | Room | Payload | Trigger |
| --- | --- | --- | --- |
| `system:alert` | `global` | `{ alertType, message, severity }` | System alert triggered |
| `system:jobFailed` | `global` | `{ queueName, jobId, error }` | Background job failure |
| `registration:new` | `port:{portId}` | `{ clientId, interestId, clientName, berthNumber }` | New website registration |

### 2.9 Recommendation & Misc Events

| Event | Room | Payload | Trigger |
| --- | --- | --- | --- |
| `interest:recommendationsGenerated` | `interest:{interestId}`, `port:{portId}` | `{ interestId, count, topBerthId }` | AI berth recommendations generated |
| `interest:recommendationAdded` | `interest:{interestId}` | `{ interestId, berthId, source, matchScore }` | Manual recommendation added |
| `interest:leadCategoryChanged` | `port:{portId}`, `interest:{interestId}` | `{ interestId, oldCategory, newCategory, auto }` | Lead category changed (including auto-promotion) |
| `file:uploaded` | `client:{clientId}`, `port:{portId}` | `{ fileId, filename, clientId, category }` | File uploaded |
| `file:deleted` | `client:{clientId}` | `{ fileId, filename }` | File deleted |

---

## 3. Background Jobs (BullMQ)

### 3.1 Queue Structure

| Queue | Concurrency | Description |
| --- | --- | --- |
| `email` | 5 | All email sending (transactional + user) |
| `documents` | 3 | Documenso API interactions (create, poll, download) |
| `notifications` | 10 | In-app notification creation and email notification sending |
| `import` | 1 | Data import jobs (CSV, Excel, berth specs) |
| `export` | 2 | Data export jobs (CSV, PDF, parent company report) |
| `reports` | 1 | Scheduled report generation |
| `webhooks` | 5 | Outbound webhook delivery |
| `maintenance` | 1 | System maintenance (backups, cleanup, rate refresh) |
| `ai` | 2 | AI-assisted tasks (receipt scanning, berth spec parsing, file migration) |
| `bulk` | 2 | Bulk operations (status change, tag, delete) |

### 3.2 Recurring Jobs

| Job | Queue | Schedule | Description |
| --- | --- | --- | --- |
| `signature-poll` | `documents` | Every 6 hours | Rare fallback poll of Documenso for interests with pending signatures. Primary mechanism is Documenso webhooks (instant). This job is a safety net only: it catches any edge case where a webhook was lost. |
| `reminder-check` | `notifications` | Every hour | Check interests with reminder_enabled for inactivity → create follow-up reminders |
| `reminder-overdue-check` | `notifications` | Every 15 minutes | Check for reminders past due_at → create overdue notifications |
| `calendar-sync` | `maintenance` | Every 30 minutes | Background poll for all connected Google Calendar users: fetch upcoming events (14 days), upsert into google_calendar_cache, detect moved/deleted CRM-pushed events. Note: additional event-driven syncs fire on user login and on navigation to calendar-displaying pages (if last sync > 5 min ago) |
| `invoice-overdue-check` | `notifications` | Daily at 08:00 | Check for overdue invoices → update status, create notifications |
| `tenure-expiry-check` | `notifications` | Daily at 08:00 | Check for berth tenure approaching expiry → create notifications |
| `currency-refresh` | `maintenance` | Every 6 hours | Refresh exchange rates from Frankfurter API |
| `database-backup` | `maintenance` | Daily at 02:00 | pg_dump → store in MinIO |
| `backup-cleanup` | `maintenance` | Weekly (Sunday 03:00) | Delete backups older than retention period (default: 30 days, configurable in system_settings) |
| `session-cleanup` | `maintenance` | Daily at 04:00 | Remove expired Better Auth sessions |
| `report-scheduler` | `reports` | Every minute | Check scheduled_reports for reports due to run → enqueue generation |
| `notification-digest` | `email` | Configurable per user | Send batched email digest of unread notifications |
| `temp-file-cleanup` | `maintenance` | Daily at 05:00 | Clean up temporary files (report PDFs > 7 days, orphaned uploads) |
| `form-expiry-check` | `maintenance` | Hourly | Mark expired form submissions as expired (BR-130) |

### 3.3 Job Retry & Error Handling

Default retry configuration:

- **Attempts:** 3
- **Backoff:** Exponential (1s, 10s, 100s)
- **Dead letter:** After all retries exhausted, move to dead letter queue
- **Alert:** Dead letter creates system alert notification for super admin

Per-queue overrides:

| Queue | Max Attempts | Notes |
| --- | --- | --- |
| `email` | 5 | Email delivery can have transient failures |
| `webhooks` | 3 | Standard exponential backoff |
| `documents` | 5 | Documenso API can be temporarily unavailable |
| `import` | 1 | Imports are idempotent; user retries manually |
| `maintenance` | 3 | Backup failures alert immediately |

### 3.4 Job Priority

BullMQ supports job priority (lower number = higher priority):

| Priority | Jobs |
| --- | --- |
| 1 (highest) | Password reset emails, system alerts |
| 2 | Signature webhooks, EOI reminders, reminder overdue checks |
| 3 (default) | Standard emails, notifications, webhook deliveries, calendar sync |
| 4 | Export jobs, report generation |
| 5 (lowest) | Data imports, AI tasks, backup cleanup |

---

## 4. Event Flow Examples

### 4.1 Website Interest Registration

```
Client submits form on website
  → POST /api/public/interests
    → Create client (or find existing via email)
    → Create interest record
    → Run duplicate detection (BR-030, BR-031)
    → Emit Socket.io: registration:new to port:{portId}
    → Enqueue notification job: notify salesperson(s)
    → If duplicate detected: enqueue notification job: duplicate alert
    → Return 201 to website
```

### 4.2 EOI Signing Flow

```
Salesperson generates EOI
  → POST /api/documents/generate-eoi
    → Generate PDF via @pdfme
    → Enqueue documents job: create Documenso document + assign signers
    → Update interest: eoi_status = waiting_for_signatures
    → Emit: document:sent, interest:stageChanged
    → Enqueue email job: send signing link to first signer (client)

Client signs
  → Documenso webhook: DOCUMENT_SIGNED
  → POST /api/webhooks/documenso
    → Deduplicate (signature_hash check)
    → Update document_signers record
    → Emit: document:signed to port + interest rooms
    → Enqueue notification: notify next signer (developer) + interest owner
    → Enqueue email: send signing link to developer

Developer signs
  → same flow
  → notify sales/approver

All sign (DOCUMENT_COMPLETED)
  → Enqueue documents job: download signed PDF from Documenso
  → Store in MinIO under client's EOI folder
  → Update document status = completed, interest eoi_status = signed
  → Emit: document:completed, interest:stageChanged
  → Enqueue email job: send signed PDF to all 3 parties
  → Enqueue notification: EOI completed
```

### 4.3 Berth Link with Auto-Status

```
Salesperson links berth to interest
  → POST /api/interests/:id/berth { berthId }
    → In transaction:
      → Create interest-berth link
      → If berth.status = 'available': update to 'under_offer' (BR-001)
    → Emit: interest:berthLinked
    → If status changed: emit berth:statusChanged (updates website map in real-time)
    → Audit log: interest updated + berth status changed
```

---

## 5. Redis Usage Summary

| Purpose | Redis Feature | Details |
| --- | --- | --- |
| BullMQ job queues | Lists + Sorted Sets | 10 queues, recurring + one-off jobs |
| Socket.io adapter | Pub/Sub | Cross-instance message relay |
| Session cache | Key-Value | Better Auth session acceleration |
| Rate limiting | Sorted Sets | Per-IP and per-user request counting |
| Application cache | Key-Value with TTL | Exchange rates (6h), dashboard aggregates (5m), search results (1m) |
| Deduplication locks | Key-Value with TTL | Webhook dedup, notification cooldowns |
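
The deduplication locks in the last row can be illustrated with a small sketch. It is not the project's implementation: `TtlLockStore`, `webhookDedupKey`, the key format, and the 60-second TTL are all hypothetical, and an in-memory `Map` stands in for Redis so the example runs on its own. Against a real Redis instance the same check would typically be one atomic `SET key value NX PX ttlMs` command.

```typescript
// In-memory stand-in for a Redis key-value store with TTL.
// Assumption: in production, acquire() would be a single atomic Redis
// command (SET <key> <value> NX PX <ttlMs>). All names here are hypothetical.
class TtlLockStore {
  private readonly expiresAt = new Map<string, number>();

  /**
   * Try to claim `key` for `ttlMs` milliseconds.
   * Returns true if the caller now holds the lock (first delivery),
   * false if an unexpired lock already exists (duplicate).
   */
  acquire(key: string, ttlMs: number, nowMs: number = Date.now()): boolean {
    const expiry = this.expiresAt.get(key);
    if (expiry !== undefined && expiry > nowMs) {
      return false; // lock is still live: treat this call as a duplicate
    }
    this.expiresAt.set(key, nowMs + ttlMs);
    return true;
  }
}

// Hypothetical key builder for the Documenso webhook dedup in section 4.2,
// which the flow describes as a signature_hash check.
function webhookDedupKey(documentId: string, signatureHash: string): string {
  return `dedup:webhook:documenso:${documentId}:${signatureHash}`;
}

// Usage with explicit timestamps so the behaviour is deterministic.
const store = new TtlLockStore();
const dedupKey = webhookDedupKey("doc_123", "abc");

const firstDelivery = store.acquire(dedupKey, 60000, 0);     // processed
const redelivery = store.acquire(dedupKey, 60000, 30000);    // skipped: inside the TTL
const afterExpiry = store.acquire(dedupKey, 60000, 60000);   // processed: lock has expired

console.log({ firstDelivery, redelivery, afterExpiry });
```

The same pattern would cover notification cooldowns: the key identifies the notification being suppressed, and the TTL is the cooldown window.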