pn-new-crm/11-REALTIME-AND-BACKGROUND-JOBS.md
Matt 67d7e6e3d5
2026-03-26 11:52:51 +01:00

Port Nimara CRM — Real-Time Events & Background Jobs

Compiled: 2026-03-11
Real-Time: Socket.io (WebSocket alongside Next.js)
Job Queue: BullMQ + Redis
Transport: Redis adapter for Socket.io (multi-instance ready)


1. Socket.io Architecture

1.1 Connection

Client connects on app load:
  → Authenticate via session cookie (same as HTTP auth)
  → Join room: port:{portId} (scoped to current port)
  → Join room: user:{userId} (for personal notifications)
  → If super admin with cross-port view: join room: global
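
The room-joining rules above can be sketched as a pure function. This is a minimal illustration, not the project's implementation: the `SessionInfo` shape and its field names are assumptions, since the document does not specify what the authenticated session exposes.

```typescript
// Hypothetical session shape; field names are assumptions for illustration.
interface SessionInfo {
  userId: string;
  portId: string;
  isSuperAdmin: boolean;
  crossPortView: boolean;
}

// Returns the rooms a freshly authenticated socket should join.
function initialRooms(session: SessionInfo): string[] {
  const rooms = [`port:${session.portId}`, `user:${session.userId}`];
  // Only super admins who enabled the cross-port view get the global room.
  if (session.isSuperAdmin && session.crossPortView) {
    rooms.push("global");
  }
  return rooms;
}

const staffRooms = initialRooms({
  userId: "u1",
  portId: "p1",
  isSuperAdmin: false,
  crossPortView: false,
});
// staffRooms is ["port:p1", "user:u1"]
```

In a Socket.io connection handler the result could be passed to `socket.join`, which accepts an array of room names.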

1.2 Room Structure

| Room | Members | Purpose |
| --- | --- | --- |
| port:{portId} | All users at that port | Port-scoped data updates |
| user:{userId} | Single user (all their tabs/devices) | Personal notifications, reminder alerts, calendar sync updates |
| global | Super admin(s) in cross-port mode | System alerts, cross-port updates |
| berth:{berthId} | Users viewing a specific berth detail | Granular berth updates |
| client:{clientId} | Users viewing a specific client detail | Granular client updates |
| interest:{interestId} | Users viewing a specific interest detail | Granular interest updates |
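
The detail rooms differ from the others in that membership follows what the user is currently viewing. The sketch below assumes the client asks to join a detail room when a detail page opens (the document does not say how joins are requested) and shows one way the server could validate such a request so that `port:`, `user:` and `global` rooms cannot be joined on demand. The helper names are hypothetical.

```typescript
// Entity kinds that have per-record detail rooms in the table above.
const DETAIL_KINDS = ["berth", "client", "interest"] as const;
type DetailKind = (typeof DETAIL_KINDS)[number];

// Builds a detail room name such as "berth:42".
function detailRoom(kind: DetailKind, id: string): string {
  return `${kind}:${id}`;
}

// Parses a room name requested by a client. Returns null for anything that
// is not a well-formed detail room.
function parseDetailRoom(room: string): { kind: DetailKind; id: string } | null {
  const sep = room.indexOf(":");
  if (sep <= 0 || sep === room.length - 1) return null;
  const kind = room.slice(0, sep);
  const id = room.slice(sep + 1);
  const match = DETAIL_KINDS.find((k) => k === kind);
  return match ? { kind: match, id } : null;
}
```

A real handler would also need to check that the requested record belongs to the user's port before joining; that check is outside this sketch.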

2. Real-Time Event Catalog

2.1 Berth Events

| Event | Room | Payload | Trigger |
| --- | --- | --- | --- |
| berth:statusChanged | port:{portId} | { berthId, oldStatus, newStatus, triggeredBy } | Berth status updated (manual or auto) |
| berth:updated | port:{portId}, berth:{berthId} | { berthId, changedFields } | Any berth field updated |
| berth:waitingListChanged | berth:{berthId} | { berthId, action, entry } | Waiting list entry added/removed/reordered |
| berth:maintenanceAdded | berth:{berthId} | { berthId, logEntry } | New maintenance log entry |
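
The Room column determines where each event is broadcast. As an illustration, the berth rows above can be encoded as a lookup from event name to target rooms. The function name is hypothetical; the event names and rooms are taken from the table.

```typescript
type BerthEventName =
  | "berth:statusChanged"
  | "berth:updated"
  | "berth:waitingListChanged"
  | "berth:maintenanceAdded";

// Returns the rooms a berth event should be emitted to, per the table above.
function berthEventRooms(
  event: BerthEventName,
  portId: string,
  berthId: string,
): string[] {
  const portRoom = `port:${portId}`;
  const berthRoom = `berth:${berthId}`;
  switch (event) {
    case "berth:statusChanged":
      return [portRoom];
    case "berth:updated":
      return [portRoom, berthRoom];
    case "berth:waitingListChanged":
    case "berth:maintenanceAdded":
      return [berthRoom];
  }
}

const updatedRooms = berthEventRooms("berth:updated", "p1", "b7");
// updatedRooms is ["port:p1", "berth:b7"]
```

Keeping the routing in one typed function means a new event name fails to compile until its rooms are decided. The same pattern would apply to the other event families in this catalog.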

2.2 Client Events

| Event | Room | Payload | Trigger |
| --- | --- | --- | --- |
| client:created | port:{portId} | { clientId, clientName, source } | New client created |
| client:updated | port:{portId}, client:{clientId} | { clientId, changedFields } | Client fields updated |
| client:archived | port:{portId} | { clientId } | Client archived |
| client:restored | port:{portId} | { clientId } | Client restored from archive |
| client:merged | port:{portId} | { survivingId, mergedId } | Clients merged |
| client:noteAdded | client:{clientId} | { clientId, noteId, authorName, preview } | New note added |
| client:duplicateDetected | port:{portId} | { clientAId, clientBId, score, reason } | Duplicate alert created |

2.3 Interest Events

| Event | Room | Payload | Trigger |
| --- | --- | --- | --- |
| interest:created | port:{portId} | { interestId, clientId, berthId, source } | New interest created |
| interest:updated | port:{portId}, interest:{interestId} | { interestId, changedFields } | Interest fields updated |
| interest:stageChanged | port:{portId} | { interestId, oldStage, newStage, clientName, berthNumber } | Pipeline stage changed |
| interest:berthLinked | port:{portId} | { interestId, berthId } | Berth linked to interest |
| interest:berthUnlinked | port:{portId} | { interestId, berthId } | Berth unlinked from interest |
| interest:archived | port:{portId} | { interestId } | Interest archived |
| interest:noteAdded | interest:{interestId} | { interestId, noteId, authorName, preview } | Note added to interest |

2.4 Document Events

| Event | Room | Payload | Trigger |
| --- | --- | --- | --- |
| document:created | port:{portId} | { documentId, type, interestId } | Document record created |
| document:sent | port:{portId} | { documentId, type, signerCount } | Document sent for signing |
| document:signed | port:{portId}, interest:{interestId} | { documentId, signerName, signerRole, remainingSigners } | Individual signer completed |
| document:completed | port:{portId}, interest:{interestId} | { documentId, type, interestId, clientName } | All signers completed |
| document:expired | port:{portId} | { documentId } | Document expired |
| document:reminderSent | interest:{interestId} | { documentId, recipientEmail } | Signing reminder sent |

2.5 Financial Events

| Event | Room | Payload | Trigger |
| --- | --- | --- | --- |
| expense:created | port:{portId} | { expenseId, amount, currency, category } | Expense created |
| expense:updated | port:{portId} | { expenseId, changedFields } | Expense updated |
| invoice:created | port:{portId} | { invoiceId, invoiceNumber, total, clientName } | Invoice created |
| invoice:sent | port:{portId} | { invoiceId, invoiceNumber, recipientEmail } | Invoice emailed |
| invoice:paid | port:{portId} | { invoiceId, invoiceNumber, amount } | Payment recorded |
| invoice:overdue | port:{portId} | { invoiceId, invoiceNumber, daysPastDue } | Invoice became overdue |

2.6 Reminder & Calendar Events

| Event | Room | Payload | Trigger |
| --- | --- | --- | --- |
| reminder:created | user:{assigneeId}, port:{portId} | { reminderId, title, assignedTo, dueAt } | Reminder created |
| reminder:updated | user:{assigneeId} | { reminderId, changedFields } | Reminder updated |
| reminder:completed | user:{assigneeId}, port:{portId} | { reminderId, title, completedBy } | Reminder completed |
| reminder:overdue | user:{assigneeId} | { reminderId, title, dueAt } | Reminder became overdue |
| reminder:snoozed | user:{assigneeId} | { reminderId, snoozedUntil } | Reminder snoozed |
| calendar:synced | user:{userId} | { eventCount, lastSyncAt } | Google Calendar sync completed |
| calendar:disconnected | user:{userId} | { reason } | Google Calendar token revoked/expired |

2.7 Notification Events

| Event | Room | Payload | Trigger |
| --- | --- | --- | --- |
| notification:new | user:{userId} | { notificationId, type, title, description, link } | New notification created |
| notification:unreadCount | user:{userId} | { count } | Unread count changed |

2.8 System Events

| Event | Room | Payload | Trigger |
| --- | --- | --- | --- |
| system:alert | global | { alertType, message, severity } | System alert triggered |
| system:jobFailed | global | { queueName, jobId, error } | Background job failure |
| registration:new | port:{portId} | { clientId, interestId, clientName, berthNumber } | New website registration |

2.9 Recommendation & Misc Events

| Event | Room | Payload | Trigger |
| --- | --- | --- | --- |
| interest:recommendationsGenerated | interest:{interestId}, port:{portId} | { interestId, count, topBerthId } | AI berth recommendations generated |
| interest:recommendationAdded | interest:{interestId} | { interestId, berthId, source, matchScore } | Manual recommendation added |
| interest:leadCategoryChanged | port:{portId}, interest:{interestId} | { interestId, oldCategory, newCategory, auto } | Lead category changed (including auto-promotion) |
| file:uploaded | client:{clientId}, port:{portId} | { fileId, filename, clientId, category } | File uploaded |
| file:deleted | client:{clientId} | { fileId, filename } | File deleted |

3. Background Jobs (BullMQ)

3.1 Queue Structure

| Queue | Concurrency | Description |
| --- | --- | --- |
| email | 5 | All email sending (transactional + user) |
| documents | 3 | Documenso API interactions (create, poll, download) |
| notifications | 10 | In-app notification creation and email notification sending |
| import | 1 | Data import jobs (CSV, Excel, berth specs) |
| export | 2 | Data export jobs (CSV, PDF, parent company report) |
| reports | 1 | Scheduled report generation |
| webhooks | 5 | Outbound webhook delivery |
| maintenance | 1 | System maintenance (backups, cleanup, rate refresh) |
| ai | 2 | AI-assisted tasks (receipt scanning, berth spec parsing, file migration) |
| bulk | 2 | Bulk operations (status change, tag, delete) |
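
The queue table lends itself to a single typed configuration object. The sketch below uses the names and concurrency values from the table; the constant and type names are assumptions. In BullMQ, concurrency is set per worker through the `concurrency` option, so these values would presumably be passed when each queue's worker is created.

```typescript
// Queue names and concurrency values copied from the table above.
const QUEUE_CONCURRENCY = {
  email: 5,
  documents: 3,
  notifications: 10,
  import: 1,
  export: 2,
  reports: 1,
  webhooks: 5,
  maintenance: 1,
  ai: 2,
  bulk: 2,
} as const;

type QueueName = keyof typeof QUEUE_CONCURRENCY;

const queueNames = Object.keys(QUEUE_CONCURRENCY) as QueueName[];

// Upper bound on jobs running at once in a process that hosts one worker
// per queue.
const maxParallelJobs = queueNames.reduce<number>(
  (sum, name) => sum + QUEUE_CONCURRENCY[name],
  0,
);
// maxParallelJobs is 32
```

The total matters for sizing the PostgreSQL and Redis connection pools, since each concurrent job may hold a connection.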

3.2 Recurring Jobs

| Job | Queue | Schedule | Description |
| --- | --- | --- | --- |
| signature-poll | documents | Every 6 hours | Fallback poll of Documenso for interests with pending signatures. The primary mechanism is Documenso webhooks (instant); this job is a safety net that catches any webhook that was lost. |
| reminder-check | notifications | Every hour | Check interests with reminder_enabled for inactivity → create follow-up reminders |
| reminder-overdue-check | notifications | Every 15 minutes | Check for reminders past due_at → create overdue notifications |
| calendar-sync | maintenance | Every 30 minutes | Background poll for all connected Google Calendar users: fetch upcoming events (14 days), upsert into google_calendar_cache, detect moved/deleted CRM-pushed events. Additional event-driven syncs fire on user login and on navigation to calendar-displaying pages (if the last sync was more than 5 minutes ago). |
| invoice-overdue-check | notifications | Daily at 08:00 | Check for overdue invoices → update status, create notifications |
| tenure-expiry-check | notifications | Daily at 08:00 | Check for berth tenure approaching expiry → create notifications |
| currency-refresh | maintenance | Every 6 hours | Refresh exchange rates from Frankfurter API |
| database-backup | maintenance | Daily at 02:00 | pg_dump → store in MinIO |
| backup-cleanup | maintenance | Weekly (Sunday 03:00) | Delete backups older than retention period (default: 30 days, configurable in system_settings) |
| session-cleanup | maintenance | Daily at 04:00 | Remove expired Better Auth sessions |
| report-scheduler | reports | Every minute | Check scheduled_reports for reports due to run → enqueue generation |
| notification-digest | email | Configurable per user | Send batched email digest of unread notifications |
| temp-file-cleanup | maintenance | Daily at 05:00 | Clean up temporary files (report PDFs > 7 days, orphaned uploads) |
| form-expiry-check | maintenance | Every hour | Mark expired form submissions as expired (BR-130) |
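
The fixed schedules above translate naturally into five-field cron expressions, which is the form BullMQ repeatable jobs accept. The mapping below is a sketch: the exact minute offsets (for example, running hourly jobs at minute 0) are assumptions, and `notification-digest` is omitted because its schedule is configured per user.

```typescript
interface RecurringJob {
  queue: string;
  cron: string; // minute hour day-of-month month day-of-week
}

const RECURRING_JOBS: Record<string, RecurringJob> = {
  "signature-poll": { queue: "documents", cron: "0 */6 * * *" },
  "reminder-check": { queue: "notifications", cron: "0 * * * *" },
  "reminder-overdue-check": { queue: "notifications", cron: "*/15 * * * *" },
  "calendar-sync": { queue: "maintenance", cron: "*/30 * * * *" },
  "invoice-overdue-check": { queue: "notifications", cron: "0 8 * * *" },
  "tenure-expiry-check": { queue: "notifications", cron: "0 8 * * *" },
  "currency-refresh": { queue: "maintenance", cron: "0 */6 * * *" },
  "database-backup": { queue: "maintenance", cron: "0 2 * * *" },
  "backup-cleanup": { queue: "maintenance", cron: "0 3 * * 0" }, // Sunday
  "session-cleanup": { queue: "maintenance", cron: "0 4 * * *" },
  "report-scheduler": { queue: "reports", cron: "* * * * *" },
  "temp-file-cleanup": { queue: "maintenance", cron: "0 5 * * *" },
  "form-expiry-check": { queue: "maintenance", cron: "0 * * * *" },
};

// Lists the recurring jobs that run on a given queue.
function jobsOnQueue(queue: string): string[] {
  return Object.keys(RECURRING_JOBS).filter(
    (name) => RECURRING_JOBS[name].queue === queue,
  );
}

const maintenanceJobs = jobsOnQueue("maintenance");
// maintenanceJobs has 7 entries
```

Seven of the recurring jobs share the maintenance queue, which has a concurrency of 1, so a long database backup would delay any calendar sync scheduled behind it.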

3.3 Job Retry & Error Handling

Default retry configuration:

  • Attempts: 3
  • Backoff: Exponential (1s, 10s, 100s)
  • Dead letter: After all retries exhausted, move to dead letter queue
  • Alert: Dead letter creates system alert notification for super admin

Per-queue overrides:

| Queue | Max Attempts | Notes |
| --- | --- | --- |
| email | 5 | Email delivery can have transient failures |
| webhooks | 3 | Standard exponential backoff |
| documents | 5 | Documenso API can be temporarily unavailable |
| import | 1 | Imports are idempotent, so the user can safely retry manually |
| maintenance | 3 | Backup failures alert immediately |
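
To make the retry numbers concrete, the sketch below computes the delay schedule per queue. Two points are worth noting. First, a job with N attempts is retried at most N - 1 times, so the default of 3 attempts only ever uses the 1s and 10s delays; the 100s delay is reached only on queues with more attempts. Second, BullMQ's built-in `exponential` backoff doubles the delay each time, so a tenfold progression would presumably need a custom backoff strategy. The function names here are hypothetical.

```typescript
const DEFAULT_ATTEMPTS = 3;

// Per-queue overrides copied from the table above.
const ATTEMPT_OVERRIDES: Record<string, number> = {
  email: 5,
  webhooks: 3,
  documents: 5,
  import: 1,
  maintenance: 3,
};

function maxAttempts(queue: string): number {
  return ATTEMPT_OVERRIDES[queue] ?? DEFAULT_ATTEMPTS;
}

// Delay before the retry that follows failed attempt `attemptsMade`
// (1-based): 1s, 10s, 100s, ...
function backoffDelayMs(attemptsMade: number, baseMs: number = 1000): number {
  return baseMs * Math.pow(10, attemptsMade - 1);
}

// All delays a job on this queue can experience before it is dead-lettered.
function retrySchedule(queue: string): number[] {
  const retries = maxAttempts(queue) - 1;
  return Array.from({ length: retries }, (_, i) => backoffDelayMs(i + 1));
}

const exportSchedule = retrySchedule("export"); // [1000, 10000]
const emailSchedule = retrySchedule("email"); // [1000, 10000, 100000, 1000000]
const importSchedule = retrySchedule("import"); // []
```

Under this progression the last email retry waits roughly 17 minutes, which may or may not be acceptable for time-sensitive mail such as password resets.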

3.4 Job Priority

BullMQ supports job priority (lower number = higher priority):

| Priority | Jobs |
| --- | --- |
| 1 (highest) | Password reset emails, system alerts |
| 2 | Signature webhooks, EOI reminders, reminder overdue checks |
| 3 (default) | Standard emails, notifications, webhook deliveries, calendar sync |
| 4 | Export jobs, report generation |
| 5 (lowest) | Data imports, AI tasks, backup cleanup |
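
The effect of these priority levels can be illustrated with a small ordering function. The level names in `JOB_PRIORITY` are assumptions (the document only gives numbers), and the tie-break by enqueue time is an assumption that models first-in-first-out handling within one priority level. Note that priorities only order jobs within a single queue; jobs on different queues are processed independently.

```typescript
// Numeric levels from the table above; the level names are hypothetical.
const JOB_PRIORITY = {
  critical: 1,
  high: 2,
  standard: 3,
  low: 4,
  background: 5,
} as const;

interface PendingJob {
  name: string;
  priority: number;
  enqueuedAt: number; // ms timestamp
}

// Lower priority number first; ties resolved by enqueue time.
function processingOrder(jobs: PendingJob[]): string[] {
  return [...jobs]
    .sort((a, b) => a.priority - b.priority || a.enqueuedAt - b.enqueuedAt)
    .map((job) => job.name);
}

const emailOrder = processingOrder([
  { name: "digest-email", priority: JOB_PRIORITY.standard, enqueuedAt: 1 },
  { name: "password-reset", priority: JOB_PRIORITY.critical, enqueuedAt: 2 },
  { name: "invoice-email", priority: JOB_PRIORITY.standard, enqueuedAt: 3 },
  { name: "eoi-reminder", priority: JOB_PRIORITY.high, enqueuedAt: 4 },
]);
// emailOrder is ["password-reset", "eoi-reminder", "digest-email", "invoice-email"]
```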

4. Event Flow Examples

4.1 Website Interest Registration

Client submits form on website
  → POST /api/public/interests
  → Create client (or find existing via email)
  → Create interest record
  → Run duplicate detection (BR-030, BR-031)
  → Emit Socket.io: registration:new to port:{portId}
  → Enqueue notification job: notify salesperson(s)
  → If duplicate detected: enqueue notification job: duplicate alert
  → Return 201 to website
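
The ordering of the steps above can be sketched with injected dependencies. Everything here is illustrative: the `RegistrationForm` fields, the `RegistrationDeps` interface, and the job names are assumptions, and a real handler would be asynchronous and would also include `berthNumber` in the event payload.

```typescript
// Hypothetical shapes; the real form fields and service signatures are not
// specified in this document.
interface RegistrationForm {
  portId: string;
  email: string;
  name: string;
  berthId: string;
}

interface RegistrationDeps {
  findOrCreateClient(email: string, name: string): string; // returns clientId
  createInterest(clientId: string, berthId: string): string; // returns interestId
  detectDuplicate(clientId: string): boolean;
  emit(room: string, event: string, payload: object): void;
  enqueue(queue: string, job: string, data: object): void;
}

// Runs the steps in the order listed above and returns the HTTP status.
function handleRegistration(form: RegistrationForm, deps: RegistrationDeps): number {
  const clientId = deps.findOrCreateClient(form.email, form.name);
  const interestId = deps.createInterest(clientId, form.berthId);
  const duplicate = deps.detectDuplicate(clientId);
  deps.emit(`port:${form.portId}`, "registration:new", {
    clientId,
    interestId,
    clientName: form.name,
  });
  deps.enqueue("notifications", "notify-salespeople", { interestId });
  if (duplicate) {
    deps.enqueue("notifications", "duplicate-alert", { clientId });
  }
  return 201;
}

// In-memory fakes that record each side effect, for illustration only.
const registrationLog: string[] = [];
const fakeDeps: RegistrationDeps = {
  findOrCreateClient: () => { registrationLog.push("client"); return "c1"; },
  createInterest: () => { registrationLog.push("interest"); return "i1"; },
  detectDuplicate: () => { registrationLog.push("duplicate-check"); return true; },
  emit: (room, event) => { registrationLog.push(`emit ${event} -> ${room}`); },
  enqueue: (queue, job) => { registrationLog.push(`enqueue ${queue}/${job}`); },
};

const registrationStatus = handleRegistration(
  { portId: "p1", email: "a@example.com", name: "Ada", berthId: "b1" },
  fakeDeps,
);
```

Only the database writes and the socket emit sit on the request path; the notifications are queued, which keeps the public endpoint's response time independent of email delivery.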

4.2 EOI Signing Flow

Salesperson generates EOI
  → POST /api/documents/generate-eoi
  → Generate PDF via @pdfme
  → Enqueue documents job: create Documenso document + assign signers
  → Update interest: eoi_status = waiting_for_signatures
  → Emit: document:sent, interest:stageChanged
  → Enqueue email job: send signing link to first signer (client)

Client signs
  → Documenso webhook: DOCUMENT_SIGNED
  → POST /api/webhooks/documenso
  → Deduplicate (signature_hash check)
  → Update document_signers record
  → Emit: document:signed to port + interest rooms
  → Enqueue notification: notify next signer (developer) + interest owner
  → Enqueue email: send signing link to developer

Developer signs → same flow → notify sales/approver

All sign (DOCUMENT_COMPLETED)
  → Enqueue documents job: download signed PDF from Documenso
  → Store in MinIO under client's EOI folder
  → Update document status = completed, interest eoi_status = signed
  → Emit: document:completed, interest:stageChanged
  → Enqueue email job: send signed PDF to all 3 parties
  → Enqueue notification: EOI completed
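
The deduplication step in the webhook handler matters because webhook providers may deliver the same event more than once. The sketch below models a claim-once check with an in-memory map so that it runs standalone; in Redis the same behaviour is available atomically through `SET key value NX PX ttl`. The key prefix, the 24-hour TTL, and the function names are assumptions.

```typescript
// In-memory stand-in for a Redis key with TTL.
class DedupStore {
  private expiries = new Map<string, number>();

  // Returns true the first time a key is claimed within its TTL and false
  // for repeats. `nowMs` is passed in to keep the logic deterministic.
  claim(key: string, ttlMs: number, nowMs: number): boolean {
    const expiresAt = this.expiries.get(key);
    if (expiresAt !== undefined && expiresAt > nowMs) {
      return false;
    }
    this.expiries.set(key, nowMs + ttlMs);
    return true;
  }
}

const WEBHOOK_DEDUP_TTL_MS = 24 * 60 * 60 * 1000; // assumed retention window

// Decides whether a Documenso webhook with this signature hash should be
// processed or dropped as a duplicate.
function shouldProcessWebhook(
  store: DedupStore,
  signatureHash: string,
  nowMs: number,
): boolean {
  return store.claim(`webhook:documenso:${signatureHash}`, WEBHOOK_DEDUP_TTL_MS, nowMs);
}

const webhookStore = new DedupStore();
const firstDelivery = shouldProcessWebhook(webhookStore, "abc123", 0); // true
const redelivery = shouldProcessWebhook(webhookStore, "abc123", 5000); // false
```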

4.3 Berth Linking

Salesperson links berth to interest
  → POST /api/interests/:id/berth { berthId }
  → In transaction:
    → Create interest-berth link
    → If berth.status = 'available': update to 'under_offer' (BR-001)
  → Emit: interest:berthLinked
  → If status changed: emit berth:statusChanged (updates website map in real-time)
  → Audit log: interest updated + berth status changed

5. Redis Usage Summary

| Purpose | Redis Feature | Details |
| --- | --- | --- |
| BullMQ job queues | Lists + Sorted Sets | 10 queues, recurring + one-off jobs |
| Socket.io adapter | Pub/Sub | Cross-instance message relay |
| Session cache | Key-Value | Better Auth session acceleration |
| Rate limiting | Sorted Sets | Per-IP and per-user request counting |
| Application cache | Key-Value with TTL | Exchange rates (6h), dashboard aggregates (5m), search results (1m) |
| Deduplication locks | Key-Value with TTL | Webhook dedup, notification cooldowns |
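
Rate limiting with sorted sets usually means a sliding-window log: each request is stored with its timestamp as the score, old entries are trimmed, and the remaining count is compared with the limit. The document does not state which algorithm is used, so the sketch below is an assumption. It models the window in memory so that it runs standalone; the Redis counterparts of the three steps are `ZREMRANGEBYSCORE`, `ZCARD` and `ZADD`.

```typescript
// In-memory model of a sliding-window log; one timestamp list per key.
class SlidingWindowLimiter {
  private hits = new Map<string, number[]>();
  private limit: number;
  private windowMs: number;

  constructor(limit: number, windowMs: number) {
    this.limit = limit;
    this.windowMs = windowMs;
  }

  // Returns true if the request is allowed and records it; returns false
  // if the key already used its quota within the window.
  allow(key: string, nowMs: number): boolean {
    const windowStart = nowMs - this.windowMs;
    // Trim entries that fell out of the window (ZREMRANGEBYSCORE).
    const recent = (this.hits.get(key) ?? []).filter((t) => t > windowStart);
    // Count what is left (ZCARD) and record the new hit (ZADD) if allowed.
    const allowed = recent.length < this.limit;
    if (allowed) {
      recent.push(nowMs);
    }
    this.hits.set(key, recent);
    return allowed;
  }
}

const limiter = new SlidingWindowLimiter(2, 1000); // 2 requests per second
const rateResults = [
  limiter.allow("ip:203.0.113.5", 0), // true
  limiter.allow("ip:203.0.113.5", 100), // true
  limiter.allow("ip:203.0.113.5", 200), // false, quota used
  limiter.allow("ip:203.0.113.5", 1001), // true, the hit at t=0 expired
];
```

Per-IP and per-user limits would use different key prefixes (for example `ip:` and `user:`) against the same structure.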