Files
pn-new-crm/src/lib/socket/server.ts
Matt 67d7e6e3d5
Some checks failed
Build & Push Docker Images / build-and-push (push) Has been cancelled
Build & Push Docker Images / deploy (push) Has been cancelled
Build & Push Docker Images / lint (push) Has been cancelled
Initial commit: Port Nimara CRM (Layers 0-4)
Full CRM rebuild with Next.js 15, TypeScript, Tailwind, Drizzle ORM,
PostgreSQL, Redis, BullMQ, MinIO, and Socket.io. Includes 461 source
files covering clients, berths, interests/pipeline, documents/EOI,
expenses/invoices, email, notifications, dashboard, admin, and
client portal. CI/CD via Gitea Actions with Docker builds.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-26 11:52:51 +01:00

104 lines
3.2 KiB
TypeScript

import { Server } from 'socket.io';
import { createAdapter } from '@socket.io/redis-adapter';
import type { Server as HTTPServer } from 'node:http';
import { redis } from '@/lib/redis';
import { auth } from '@/lib/auth';
import { logger } from '@/lib/logger';
import type { ServerToClientEvents, ClientToServerEvents } from './events';
// Module-level Socket.IO server instance; stays null until initSocketServer() assigns it.
let io: Server<ClientToServerEvents, ServerToClientEvents> | null = null;
/** Maximum number of simultaneous sockets a single user may hold. */
const MAX_SOCKETS_PER_USER = 10;

/** Idle timeout (30 seconds — for development only, would be longer in prod). */
const SOCKET_IDLE_TIMEOUT_MS = 30_000;

/**
 * Room-name prefixes whose membership is decided by the server only.
 * `user:` rooms are joined automatically on connect and are also what the
 * per-user connection limit counts, so clients must not join or leave them.
 */
const SERVER_MANAGED_ROOM_PREFIXES: readonly string[] = ['user:'];

/**
 * Converts an untrusted `join:entity` / `leave:entity` payload into a room name.
 *
 * @param payload - Whatever the client sent as the first event argument.
 * @returns `"<type>:<id>"` when the payload is well-formed and does not target
 *   a server-managed room; otherwise `null`.
 */
function resolveEntityRoom(payload: unknown): string | null {
  if (typeof payload !== 'object' || payload === null) return null;

  const { type, id } = payload as { type?: unknown; id?: unknown };
  if (typeof type !== 'string' || type.length === 0) return null;
  if (typeof id !== 'string' && typeof id !== 'number') return null;
  if (id === '') return null;

  const room = `${type}:${id}`;
  // Check the final room name (not just `type`) so that no type/id split can
  // produce a server-managed room.
  return SERVER_MANAGED_ROOM_PREFIXES.some((prefix) => room.startsWith(prefix)) ? null : room;
}

/**
 * Creates the Socket.IO server on top of `httpServer`, stores it in the
 * module-level `io` singleton and wires up authentication and room handling.
 *
 * - Every handshake must carry a cookie that `auth.api.getSession` resolves to
 *   a user; otherwise the connection is refused.
 * - A user may hold at most {@link MAX_SOCKETS_PER_USER} sockets.
 * - Each socket joins `user:<userId>` and, when supplied, `port:<portId>`.
 * - Clients may join/leave entity rooms via `join:entity` / `leave:entity`;
 *   malformed payloads and server-managed rooms are rejected.
 * - A socket that sends no event for {@link SOCKET_IDLE_TIMEOUT_MS} is disconnected.
 *
 * @param httpServer - The Node HTTP server Socket.IO attaches to.
 * @returns The newly created Socket.IO server.
 */
export function initSocketServer(httpServer: HTTPServer): Server<ClientToServerEvents, ServerToClientEvents> {
  const pubClient = redis.duplicate();
  const subClient = redis.duplicate();

  const server = new Server<ClientToServerEvents, ServerToClientEvents>(httpServer, {
    path: '/socket.io/',
    adapter: createAdapter(pubClient, subClient),
    cors: {
      origin: process.env.APP_URL,
      credentials: true,
    },
    connectionStateRecovery: { maxDisconnectionDuration: 2 * 60 * 1000 },
    maxHttpBufferSize: 1e6, // 1MB message limit
  });
  io = server;

  // Auth middleware — validate session cookie via Better Auth
  server.use(async (socket, next) => {
    try {
      const cookie = socket.handshake.headers.cookie;
      if (!cookie) return next(new Error('Authentication required'));

      const session = await auth.api.getSession({
        headers: new Headers({ cookie }),
      });
      if (!session?.user) return next(new Error('Invalid session'));

      // Enforce the per-user connection limit by counting members of the user's room.
      const userSockets = await server.in(`user:${session.user.id}`).fetchSockets();
      if (userSockets.length >= MAX_SOCKETS_PER_USER) {
        return next(new Error('Maximum connections reached'));
      }

      // The handshake `auth` object is client-controlled: accept only a non-empty string.
      // NOTE(review): portId is not checked against the session here — confirm that
      // port membership is enforced elsewhere before relying on `port:` rooms for isolation.
      const requestedPortId: unknown = socket.handshake.auth.portId;
      socket.data = {
        userId: session.user.id,
        portId: typeof requestedPortId === 'string' && requestedPortId !== '' ? requestedPortId : undefined,
      };
      next();
    } catch (err: unknown) {
      // Keep the client-facing message generic, but do not lose the real cause.
      logger.warn({ err }, 'Socket authentication failed');
      next(new Error('Authentication failed'));
    }
  });

  // Connection handler
  server.on('connection', (socket) => {
    const { userId, portId } = socket.data as { userId: string; portId: string | undefined };
    logger.debug({ userId, portId }, 'Socket connected');

    // Auto-join personal and port rooms
    void socket.join(`user:${userId}`);
    if (portId) void socket.join(`port:${portId}`);

    // Entity-level room management. Payloads are validated first because they
    // come straight from the client; destructuring a missing payload would throw.
    socket.on('join:entity', (payload: unknown) => {
      const room = resolveEntityRoom(payload);
      if (room === null) {
        logger.warn({ userId }, 'Rejected join:entity request');
        return;
      }
      void socket.join(room);
    });
    socket.on('leave:entity', (payload: unknown) => {
      const room = resolveEntityRoom(payload);
      if (room === null) {
        logger.warn({ userId }, 'Rejected leave:entity request');
        return;
      }
      void socket.leave(room);
    });

    // Idle timeout: re-armed on every incoming event, cancelled on disconnect.
    let idleTimer = setTimeout(() => socket.disconnect(), SOCKET_IDLE_TIMEOUT_MS);
    socket.onAny(() => {
      clearTimeout(idleTimer);
      idleTimer = setTimeout(() => socket.disconnect(), SOCKET_IDLE_TIMEOUT_MS);
    });

    socket.on('disconnect', () => {
      clearTimeout(idleTimer);
      logger.debug({ userId }, 'Socket disconnected');
    });
  });

  return server;
}
/**
 * Returns the Socket.IO server held in the module-level `io` variable.
 *
 * @returns The stored server instance.
 * @throws Error with message "Socket.io not initialized" when `io` is still unset.
 */
export function getIO(): Server<ClientToServerEvents, ServerToClientEvents> {
  if (io) {
    return io;
  }
  throw new Error('Socket.io not initialized');
}
/**
 * Broadcasts `event` with `args` to every socket in `room`.
 * Intended for the service layer to call after a mutation.
 * Does nothing when the module-level `io` server has not been set.
 *
 * @param room - Name of the room to emit to.
 * @param event - Server-to-client event name.
 * @param args - Arguments forwarded to the event's listeners.
 */
export function emitToRoom<E extends keyof ServerToClientEvents>(
  room: string,
  event: E,
  ...args: Parameters<ServerToClientEvents[E]>
): void {
  const server = io;
  if (server) {
    server.to(room).emit(event, ...args);
  }
}