From 64c35b70f894ac4cbbbda9822b4a929b820e979a Mon Sep 17 00:00:00 2001 From: Matt Date: Thu, 12 Jun 2025 15:53:12 +0200 Subject: [PATCH] Improve email session management and add IMAP connection pooling - Switch from localStorage to sessionStorage for email sessions - Add session validation on component mount - Implement IMAP connection pool with folder search capabilities - Add operation locking utility for concurrent request handling - Refactor EOI section component structure - Update API endpoints for better email thread management --- components/ClientEmailSection.vue | 43 +- components/EOISection.vue | 799 +--------------------- components/InterestDetailsModal.vue | 41 +- components/PhoneInput.vue | 22 +- components/SliderConfirmation.vue | 7 +- server/api/email/fetch-thread.ts | 261 ++++++- server/api/get-berths.ts | 32 + server/api/link-berths-to-interest.ts | 44 +- server/api/unlink-berths-from-interest.ts | 44 +- server/utils/imap-pool.ts | 227 ++++++ server/utils/operation-lock.ts | 164 +++++ 11 files changed, 798 insertions(+), 886 deletions(-) create mode 100644 server/utils/imap-pool.ts create mode 100644 server/utils/operation-lock.ts diff --git a/components/ClientEmailSection.vue b/components/ClientEmailSection.vue index 7a616f8..7ccdd03 100644 --- a/components/ClientEmailSection.vue +++ b/components/ClientEmailSection.vue @@ -32,8 +32,9 @@ :loading="isRefreshing" size="small" color="grey-darken-4" + icon="mdi-refresh" + class="ml-4" > - mdi-refresh Refresh Emails @@ -625,21 +626,47 @@ const allAttachments = computed(() => { return [...uploaded, ...browser]; }); -// Check for stored session on mount -onMounted(() => { - const storedSession = localStorage.getItem('emailSessionId'); +// Check for stored session on mount and validate it +onMounted(async () => { + // Use sessionStorage instead of localStorage for email sessions to require re-auth on page reload + const storedSession = sessionStorage.getItem('emailSessionId'); + if (storedSession) { - sessionId.value = storedSession; - hasEmailCredentials.value = true; + // Validate the session is still active + try { + const testResponse = await $fetch('/api/email/test-connection', { + method: 'POST', + headers: { + 'x-tag': '094ut234' + }, + body: { + sessionId: storedSession + } + }); + + if (testResponse.success) { + sessionId.value = storedSession; + hasEmailCredentials.value = true; + } else { + // Session invalid, clear it + sessionStorage.removeItem('emailSessionId'); + hasEmailCredentials.value = false; + } + } catch (error) { + // Connection test failed, clear session + console.log('[ClientEmailSection] Email session validation failed, clearing session'); + sessionStorage.removeItem('emailSessionId'); + hasEmailCredentials.value = false; + } } - // Load signature config + // Load signature config (this can persist in localStorage) const storedSignature = localStorage.getItem('emailSignature'); if (storedSignature) { signatureConfig.value = JSON.parse(storedSignature); } - // Load email thread immediately if credentials are available + // Load email thread immediately if credentials are available and valid if (hasEmailCredentials.value) { loadEmailThread(); } diff --git a/components/EOISection.vue b/components/EOISection.vue index 81abba8..53d9eda 100644 --- a/components/EOISection.vue +++ b/components/EOISection.vue @@ -1,784 +1,10 @@ - - - diff --git a/components/InterestDetailsModal.vue b/components/InterestDetailsModal.vue index ff5297a..2155b74 100644 --- a/components/InterestDetailsModal.vue +++ b/components/InterestDetailsModal.vue @@ -407,46 +407,7 @@ prepend-inner-icon="mdi-tag" > - - - - - - - + localNumber.value); // Parse initial value watch(() => props.modelValue, (newValue) => { if (newValue) { - // Find matching country code - const country = countries.find(c => newValue.startsWith(c.dialCode)); - if (country) { - selectedCountry.value = country; - localNumber.value = newValue.slice(country.dialCode.length); + // Find matching country code, but prefer current selection if it matches + const matchingCountries = countries.filter(c => newValue.startsWith(c.dialCode)); + if (matchingCountries.length > 0) { + // If current country is in the matching list, keep it + const currentMatches = matchingCountries.find(c => c.iso2 === selectedCountry.value.iso2); + if (currentMatches) { + // Keep current country + localNumber.value = newValue.slice(selectedCountry.value.dialCode.length); + } else { + // Prefer default country if it matches, otherwise use the first match + const defaultCountry = matchingCountries.find(c => c.iso2 === props.defaultCountry); + const selectedNewCountry = defaultCountry || matchingCountries[0]; + selectedCountry.value = selectedNewCountry; + localNumber.value = newValue.slice(selectedNewCountry.dialCode.length); + } } else { // Default to current country if no match localNumber.value = newValue; diff --git a/components/SliderConfirmation.vue b/components/SliderConfirmation.vue index db844f5..138daa9 100644 --- a/components/SliderConfirmation.vue +++ b/components/SliderConfirmation.vue @@ -142,7 +142,10 @@ const startSliding = (event: MouseEvent | TouchEvent) => { const rect = sliderTrack.value.getBoundingClientRect(); const clientX = 'touches' in moveEvent ? moveEvent.touches[0].clientX : moveEvent.clientX; - const progress = Math.max(0, Math.min(100, ((clientX - rect.left) / rect.width) * 100)); + const handleWidth = 50; // Handle width in pixels + const availableWidth = rect.width - handleWidth; + const relativeX = clientX - rect.left; + const progress = Math.max(0, Math.min(100, (relativeX / availableWidth) * 100)); sliderProgress.value = progress; }; @@ -226,7 +229,7 @@ watch(showDialog, (newValue) => { display: flex; align-items: center; justify-content: center; - transform: translateX(-50%); + transform: translateX(0); transition: none; will-change: transform, left; z-index: 1; diff --git a/server/api/email/fetch-thread.ts b/server/api/email/fetch-thread.ts index af97bda..3f8aa9c 100644 --- a/server/api/email/fetch-thread.ts +++ b/server/api/email/fetch-thread.ts @@ -2,6 +2,7 @@ import Imap from 'imap'; import { simpleParser } from 'mailparser'; import { getCredentialsFromSession, decryptCredentials } from '~/server/utils/encryption'; import { listFiles, getFileStats, getMinioClient, uploadFile } from '~/server/utils/minio'; +import { getIMAPPool } from '~/server/utils/imap-pool'; interface EmailMessage { id: string; @@ -13,6 +14,7 @@ interface EmailMessage { timestamp: string; direction: 'sent' | 'received'; threadId?: string; + attachments?: any[]; } export default defineEventHandler(async (event) => { @@ -144,7 +146,7 @@ export default defineEventHandler(async (event) => { try { imapEmails = await Promise.race([ - fetchImapEmails(imapConfig, userEmail, clientEmail, limit, interestId), + fetchImapEmailsWithPool(sessionId, userEmail, clientEmail, limit, interestId), timeoutPromise ]); } catch (imapError) { @@ -187,7 +189,217 @@ export default defineEventHandler(async (event) => { } }); -// Separate function for IMAP fetching with proper cleanup +// Function for IMAP fetching using connection pool +async function fetchImapEmailsWithPool( + sessionId: string, + userEmail: string, + clientEmail: string, + limit: number, + interestId?: string +): Promise { + const pool = getIMAPPool(); + const imap = await pool.getConnection(sessionId); + + return new Promise((resolve, reject) => { + const allEmails: EmailMessage[] = []; + + // Search in both INBOX and Sent folders + const foldersToSearch = ['INBOX', 'Sent', 'Sent Items', 'Sent Mail']; + let currentFolderIndex = 0; + + const searchNextFolder = () => { + if (currentFolderIndex >= foldersToSearch.length) { + resolve(allEmails); + return; + } + + const folderName = foldersToSearch[currentFolderIndex]; + currentFolderIndex++; + + imap.openBox(folderName, true, (err: any, box: any) => { + if (err) { + console.log(`[IMAPPool] Folder ${folderName} not found, trying next...`); + searchNextFolder(); + return; + } + + console.log(`[IMAPPool] Searching in folder: ${folderName}`); + + if (!clientEmail || clientEmail.trim() === '') { + console.log('[IMAPPool] No client email provided, skipping search'); + searchNextFolder(); + return; + } + + imap.search(['ALL'], (err: any, results: number[]) => { + if (err) { + console.error(`[IMAPPool] Search error in ${folderName}:`, err); + searchNextFolder(); + return; + } + + if (!results || results.length === 0) { + console.log(`[IMAPPool] No emails found in ${folderName}`); + searchNextFolder(); + return; + } + + console.log(`[IMAPPool] Found ${results.length} emails in ${folderName}`); + const messagesToFetch = results.slice(-limit); + let messagesProcessed = 0; + + const fetch = imap.fetch(messagesToFetch, { + bodies: '', + struct: true, + envelope: true + }); + + fetch.on('message', (msg: any, seqno: number) => { + msg.on('body', (stream: any, info: any) => { + simpleParser(stream as any, async (err: any, parsed: any) => { + if (err) { + console.error('[IMAPPool] Parse error:', err); + messagesProcessed++; + if (messagesProcessed === messagesToFetch.length) { + searchNextFolder(); + } + return; + } + + // Check if this email involves the client + const fromEmail = parsed.from?.text || ''; + const toEmails = Array.isArray(parsed.to) + ? parsed.to.map((addr: any) => addr.text).join(', ') + : parsed.to?.text || ''; + const ccEmails = Array.isArray(parsed.cc) + ? parsed.cc.map((addr: any) => addr.text).join(', ') + : parsed.cc?.text || ''; + + // Filter to only include emails to/from the client + const involvesClient = + fromEmail.toLowerCase().includes(clientEmail.toLowerCase()) || + toEmails.toLowerCase().includes(clientEmail.toLowerCase()) || + ccEmails.toLowerCase().includes(clientEmail.toLowerCase()); + + if (!involvesClient) { + messagesProcessed++; + if (messagesProcessed === messagesToFetch.length) { + searchNextFolder(); + } + return; + } + + // Process attachments + const attachments: any[] = []; + if (parsed.attachments && parsed.attachments.length > 0) { + for (const attachment of parsed.attachments) { + try { + // Save attachment to MinIO + const attachmentId = `${Date.now()}-${Math.random().toString(36).substr(2, 9)}`; + const fileName = attachment.filename || `attachment-${attachmentId}`; + const sanitizedFileName = fileName.replace(/[^a-zA-Z0-9.-]/g, '_'); + const objectName = `interest-${interestId}/attachments/${attachmentId}-${sanitizedFileName}`; + + const client = getMinioClient(); + await client.putObject('client-emails', objectName, attachment.content, attachment.content?.length || 0, { + 'Content-Type': attachment.contentType || 'application/octet-stream', + 'Content-Disposition': `attachment; filename="${sanitizedFileName}"` + }); + + attachments.push({ + id: attachmentId, + filename: sanitizedFileName, + originalName: attachment.filename, + contentType: attachment.contentType, + size: attachment.content?.length || attachment.size || 0, + path: objectName, + bucket: 'client-emails' + }); + + console.log(`[IMAPPool] Saved attachment: ${sanitizedFileName} (${attachment.content?.length || 0} bytes)`); + } catch (attachmentError) { + console.error('[IMAPPool] Failed to save attachment:', attachment.filename, attachmentError); + // Still include attachment info even if save failed + attachments.push({ + filename: attachment.filename || 'Unknown', + originalName: attachment.filename, + contentType: attachment.contentType, + size: attachment.content?.length || attachment.size || 0, + error: 'Failed to save' + }); + } + } + } + + const email: EmailMessage = { + id: parsed.messageId || `${Date.now()}-${seqno}`, + from: fromEmail, + to: toEmails, + subject: parsed.subject || '', + body: parsed.text || '', + html: parsed.html || undefined, + timestamp: parsed.date?.toISOString() || new Date().toISOString(), + direction: fromEmail.toLowerCase().includes(userEmail.toLowerCase()) ? 'sent' : 'received', + attachments: attachments.length > 0 ? attachments : undefined + }; + + if (parsed.headers.has('in-reply-to')) { + email.threadId = parsed.headers.get('in-reply-to') as string; + } + + allEmails.push(email); + + // Cache this email if we have an interestId + if (interestId && involvesClient) { + try { + const emailData = { + ...email, + interestId: interestId + }; + + const objectName = `interest-${interestId}/${Date.now()}-${email.direction}.json`; + const buffer = Buffer.from(JSON.stringify(emailData, null, 2)); + + // Upload to the client-emails bucket + const client = getMinioClient(); + client.putObject('client-emails', objectName, buffer, buffer.length, { + 'Content-Type': 'application/json', + }).catch(err => { + console.error('[IMAPPool] Failed to cache email:', err); + }); + } catch (cacheError) { + console.error('[IMAPPool] Failed to cache email:', cacheError); + } + } + + messagesProcessed++; + + if (messagesProcessed === messagesToFetch.length) { + searchNextFolder(); + } + }); + }); + }); + + fetch.once('error', (err: any) => { + console.error('[IMAPPool] Fetch error:', err); + searchNextFolder(); + }); + + fetch.once('end', () => { + if (messagesProcessed === 0) { + searchNextFolder(); + } + }); + }); + }); + }; + + searchNextFolder(); + }); +} + +// Separate function for IMAP fetching with proper cleanup (legacy) async function fetchImapEmails( imapConfig: any, userEmail: string, @@ -304,6 +516,48 @@ async function fetchImapEmails( return; } + // Process attachments + const attachments: any[] = []; + if (parsed.attachments && parsed.attachments.length > 0) { + for (const attachment of parsed.attachments) { + try { + // Save attachment to MinIO + const attachmentId = `${Date.now()}-${Math.random().toString(36).substr(2, 9)}`; + const fileName = attachment.filename || `attachment-${attachmentId}`; + const sanitizedFileName = fileName.replace(/[^a-zA-Z0-9.-]/g, '_'); + const objectName = `interest-${interestId}/attachments/${attachmentId}-${sanitizedFileName}`; + + const client = getMinioClient(); + await client.putObject('client-emails', objectName, attachment.content, attachment.content?.length || 0, { + 'Content-Type': attachment.contentType || 'application/octet-stream', + 'Content-Disposition': `attachment; filename="${sanitizedFileName}"` + }); + + attachments.push({ + id: attachmentId, + filename: sanitizedFileName, + originalName: attachment.filename, + contentType: attachment.contentType, + size: attachment.content?.length || attachment.size || 0, + path: objectName, + bucket: 'client-emails' + }); + + console.log(`Saved attachment: ${sanitizedFileName} (${attachment.content?.length || 0} bytes)`); + } catch (attachmentError) { + console.error('Failed to save attachment:', attachment.filename, attachmentError); + // Still include attachment info even if save failed + attachments.push({ + filename: attachment.filename || 'Unknown', + originalName: attachment.filename, + contentType: attachment.contentType, + size: attachment.content?.length || attachment.size || 0, + error: 'Failed to save' + }); + } + } + } + const email: EmailMessage = { id: parsed.messageId || `${Date.now()}-${seqno}`, from: fromEmail, @@ -312,7 +566,8 @@ async function fetchImapEmails( body: parsed.text || '', html: parsed.html || undefined, timestamp: parsed.date?.toISOString() || new Date().toISOString(), - direction: fromEmail.toLowerCase().includes(userEmail.toLowerCase()) ? 'sent' : 'received' + direction: fromEmail.toLowerCase().includes(userEmail.toLowerCase()) ? 'sent' : 'received', + attachments: attachments.length > 0 ? attachments : undefined }; if (parsed.headers.has('in-reply-to')) { diff --git a/server/api/get-berths.ts b/server/api/get-berths.ts index bcbd23c..3b3cd50 100644 --- a/server/api/get-berths.ts +++ b/server/api/get-berths.ts @@ -22,6 +22,38 @@ export default defineEventHandler(async (event) => { }); console.log('[get-berths] Successfully fetched berths, count:', berths.list?.length || 0); + + // Sort berths by letter zone and then by number + if (berths.list && Array.isArray(berths.list)) { + berths.list.sort((a, b) => { + const berthA = a['Berth Number'] || ''; + const berthB = b['Berth Number'] || ''; + + // Extract letter and number parts + const matchA = berthA.match(/^([A-Za-z]+)(\d+)$/); + const matchB = berthB.match(/^([A-Za-z]+)(\d+)$/); + + if (matchA && matchB) { + const [, letterA, numberA] = matchA; + const [, letterB, numberB] = matchB; + + // First sort by letter zone + const letterCompare = letterA.localeCompare(letterB); + if (letterCompare !== 0) { + return letterCompare; + } + + // Then sort by number within the same letter zone + return parseInt(numberA) - parseInt(numberB); + } + + // Fallback to string comparison if pattern doesn't match + return berthA.localeCompare(berthB); + }); + + console.log('[get-berths] Berths sorted by zone and number'); + } + return berths; } catch (error) { console.error('[get-berths] Error occurred:', error); diff --git a/server/api/link-berths-to-interest.ts b/server/api/link-berths-to-interest.ts index 287ac55..a082a4c 100644 --- a/server/api/link-berths-to-interest.ts +++ b/server/api/link-berths-to-interest.ts @@ -1,3 +1,5 @@ +import { withBerthQueue } from '~/server/utils/operation-lock'; + export default defineEventHandler(async (event) => { const xTagHeader = getRequestHeader(event, "x-tag"); console.log('[link-berths] Request received with x-tag:', xTagHeader); @@ -19,27 +21,31 @@ export default defineEventHandler(async (event) => { }); } - const config = getNocoDbConfiguration(); - const interestsTableId = "mbs9hjauug4eseo"; - const berthsLinkFieldId = "cj7v7bb9pa5eyo3"; // Berths field - - // Format the berth IDs for the API - const berthRecords = berthIds.map(id => ({ Id: id })); - console.log('[link-berths] Berth records to link:', berthRecords); - - const url = `${config.url}/api/v2/tables/${interestsTableId}/links/${berthsLinkFieldId}/records/${interestId}`; - console.log('[link-berths] URL:', url); - - const result = await $fetch(url, { - method: 'POST', - headers: { - "xc-token": config.token, - }, - body: berthRecords, + // Use queuing system to handle rapid berth selections gracefully + return await withBerthQueue(interestId, async () => { + const config = getNocoDbConfiguration(); + const interestsTableId = "mbs9hjauug4eseo"; + const berthsLinkFieldId = "cj7v7bb9pa5eyo3"; // Berths field + + // Format the berth IDs for the API + const berthRecords = berthIds.map(id => ({ Id: id })); + console.log('[link-berths] Berth records to link:', berthRecords); + + const url = `${config.url}/api/v2/tables/${interestsTableId}/links/${berthsLinkFieldId}/records/${interestId}`; + console.log('[link-berths] URL:', url); + + const result = await $fetch(url, { + method: 'POST', + headers: { + "xc-token": config.token, + }, + body: berthRecords, + }); + + console.log('[link-berths] Successfully linked berths to interest:', interestId); + return result; }); - console.log('[link-berths] Successfully linked berths to interest:', interestId); - return result; } catch (error) { console.error('[link-berths] Error occurred:', error); console.error('[link-berths] Error details:', error instanceof Error ? error.message : 'Unknown error'); diff --git a/server/api/unlink-berths-from-interest.ts b/server/api/unlink-berths-from-interest.ts index a8db8eb..22bfa13 100644 --- a/server/api/unlink-berths-from-interest.ts +++ b/server/api/unlink-berths-from-interest.ts @@ -1,3 +1,5 @@ +import { withBerthQueue } from '~/server/utils/operation-lock'; + export default defineEventHandler(async (event) => { const xTagHeader = getRequestHeader(event, "x-tag"); console.log('[unlink-berths] Request received with x-tag:', xTagHeader); @@ -17,23 +19,27 @@ export default defineEventHandler(async (event) => { }); } - const config = getNocoDbConfiguration(); - const interestsTableId = "mbs9hjauug4eseo"; - const berthsLinkFieldId = "cj7v7bb9pa5eyo3"; // Berths field - - // Format the berth IDs for the API - const berthRecords = berthIds.map(id => ({ Id: id })); - - const result = await $fetch( - `${config.url}/api/v2/tables/${interestsTableId}/links/${berthsLinkFieldId}/records/${interestId}`, - { - method: 'DELETE', - headers: { - "xc-token": config.token, - }, - body: berthRecords, - } - ); - - return result; + // Use queuing system to handle rapid berth operations gracefully + return await withBerthQueue(interestId, async () => { + const config = getNocoDbConfiguration(); + const interestsTableId = "mbs9hjauug4eseo"; + const berthsLinkFieldId = "cj7v7bb9pa5eyo3"; // Berths field + + // Format the berth IDs for the API + const berthRecords = berthIds.map(id => ({ Id: id })); + + const result = await $fetch( + `${config.url}/api/v2/tables/${interestsTableId}/links/${berthsLinkFieldId}/records/${interestId}`, + { + method: 'DELETE', + headers: { + "xc-token": config.token, + }, + body: berthRecords, + } + ); + + console.log('[unlink-berths] Successfully unlinked berths from interest:', interestId); + return result; + }); }); diff --git a/server/utils/imap-pool.ts b/server/utils/imap-pool.ts new file mode 100644 index 0000000..3503d59 --- /dev/null +++ b/server/utils/imap-pool.ts @@ -0,0 +1,227 @@ +import Imap from 'imap'; +import { getCredentialsFromSession, decryptCredentials } from './encryption'; + +interface ConnectionPoolItem { + connection: any; + sessionId: string; + lastUsed: number; + isConnected: boolean; +} + +class IMAPConnectionPool { + private connections: Map = new Map(); + private maxConnections = 5; + private connectionTimeout = 300000; // 5 minutes + private reconnectAttempts = 3; + private reconnectDelay = 1000; // 1 second + + private cleanupInterval: NodeJS.Timeout; + + constructor() { + // Cleanup expired connections every minute + this.cleanupInterval = setInterval(() => { + this.cleanupExpiredConnections(); + }, 60000); + } + + async getConnection(sessionId: string): Promise { + // Check if we have an existing connection + const existing = this.connections.get(sessionId); + if (existing && existing.isConnected) { + // Test connection health + if (await this.testConnection(existing.connection)) { + existing.lastUsed = Date.now(); + return existing.connection; + } else { + // Connection is dead, remove it + this.removeConnection(sessionId); + } + } + + // Create new connection + return await this.createConnection(sessionId); + } + + private async createConnection(sessionId: string): Promise { + const encryptedCredentials = getCredentialsFromSession(sessionId); + if (!encryptedCredentials) { + throw new Error('No credentials found for session'); + } + + let credentials; + try { + credentials = decryptCredentials(encryptedCredentials); + } catch (error) { + throw new Error('Failed to decrypt credentials'); + } + + const imapConfig = { + user: credentials.email, + password: credentials.password, + host: process.env.NUXT_EMAIL_IMAP_HOST || 'mail.portnimara.com', + port: parseInt(process.env.NUXT_EMAIL_IMAP_PORT || '993'), + tls: true, + tlsOptions: { + rejectUnauthorized: false + }, + connTimeout: 15000, // 15 seconds connection timeout + authTimeout: 10000, // 10 seconds auth timeout + keepalive: { + interval: 10000, // Send keepalive every 10 seconds + idleInterval: 300000, // IDLE for 5 minutes + forceNoop: true + } + }; + + return new Promise((resolve, reject) => { + const imap = new Imap(imapConfig); + let connectionAttempts = 0; + + const attemptConnection = () => { + connectionAttempts++; + + imap.once('ready', () => { + console.log(`[IMAPPool] Connection established for session ${sessionId}`); + + // Store connection + this.connections.set(sessionId, { + connection: imap, + sessionId, + lastUsed: Date.now(), + isConnected: true + }); + + // Set up error handlers + imap.on('error', (err: any) => { + console.error(`[IMAPPool] Connection error for session ${sessionId}:`, err); + this.markConnectionAsDead(sessionId); + }); + + imap.on('end', () => { + console.log(`[IMAPPool] Connection ended for session ${sessionId}`); + this.removeConnection(sessionId); + }); + + resolve(imap); + }); + + imap.once('error', (err: any) => { + console.error(`[IMAPPool] Connection attempt ${connectionAttempts} failed:`, err); + + if (connectionAttempts < this.reconnectAttempts) { + setTimeout(() => { + try { + imap.connect(); + } catch (connectError) { + console.error(`[IMAPPool] Reconnect attempt failed:`, connectError); + attemptConnection(); + } + }, this.reconnectDelay * connectionAttempts); + } else { + reject(err); + } + }); + + try { + imap.connect(); + } catch (connectError) { + console.error(`[IMAPPool] Initial connect failed:`, connectError); + if (connectionAttempts < this.reconnectAttempts) { + setTimeout(attemptConnection, this.reconnectDelay); + } else { + reject(connectError); + } + } + }; + + attemptConnection(); + }); + } + + private async testConnection(imap: any): Promise { + return new Promise((resolve) => { + try { + if (!imap || imap.state !== 'authenticated') { + resolve(false); + return; + } + + // Test with a simple NOOP command + imap.seq.fetch('1:1', { bodies: 'HEADER' }, (err: any) => { + // Even if fetch fails, if no connection error then connection is likely OK + resolve(!err || err.code !== 'ECONNRESET'); + }); + + // Timeout the test after 5 seconds + setTimeout(() => resolve(false), 5000); + } catch (error) { + resolve(false); + } + }); + } + + private markConnectionAsDead(sessionId: string): void { + const connection = this.connections.get(sessionId); + if (connection) { + connection.isConnected = false; + } + } + + private removeConnection(sessionId: string): void { + const connection = this.connections.get(sessionId); + if (connection) { + try { + if (connection.connection && typeof connection.connection.end === 'function') { + connection.connection.end(); + } + } catch (error) { + console.error(`[IMAPPool] Error closing connection:`, error); + } + this.connections.delete(sessionId); + } + } + + private cleanupExpiredConnections(): void { + const now = Date.now(); + for (const [sessionId, connection] of this.connections.entries()) { + if (now - connection.lastUsed > this.connectionTimeout) { + console.log(`[IMAPPool] Cleaning up expired connection for session ${sessionId}`); + this.removeConnection(sessionId); + } + } + } + + async closeConnection(sessionId: string): Promise { + this.removeConnection(sessionId); + } + + async closeAllConnections(): Promise { + for (const sessionId of this.connections.keys()) { + this.removeConnection(sessionId); + } + } + + destroy(): void { + if (this.cleanupInterval) { + clearInterval(this.cleanupInterval); + } + this.closeAllConnections(); + } +} + +// Global instance +let globalIMAPPool: IMAPConnectionPool | null = null; + +export function getIMAPPool(): IMAPConnectionPool { + if (!globalIMAPPool) { + globalIMAPPool = new IMAPConnectionPool(); + } + return globalIMAPPool; +} + +export function destroyIMAPPool(): void { + if (globalIMAPPool) { + globalIMAPPool.destroy(); + globalIMAPPool = null; + } +} diff --git a/server/utils/operation-lock.ts b/server/utils/operation-lock.ts new file mode 100644 index 0000000..3b89ca0 --- /dev/null +++ b/server/utils/operation-lock.ts @@ -0,0 +1,164 @@ +interface QueuedOperation { + operation: () => Promise; + resolve: (value: T) => void; + reject: (error: any) => void; + timestamp: number; +} + +// Queuing system for graceful handling of rapid operations +class OperationQueue { + private queues: Map[]> = new Map(); + private processing: Map = new Map(); + private maxQueueSize = 10; + private batchDelay = 100; // 100ms delay for batching rapid operations + + async enqueue(queueKey: string, operation: () => Promise): Promise { + return new Promise((resolve, reject) => { + // Get or create queue for this key + let queue = this.queues.get(queueKey); + if (!queue) { + queue = []; + this.queues.set(queueKey, queue); + } + + // Check queue size limit + if (queue.length >= this.maxQueueSize) { + reject(new Error(`Queue for ${queueKey} is full`)); + return; + } + + // Add operation to queue + queue.push({ + operation, + resolve, + reject, + timestamp: Date.now() + }); + + // Start processing if not already processing + if (!this.processing.get(queueKey)) { + this.processQueue(queueKey); + } + }); + } + + private async processQueue(queueKey: string): Promise { + this.processing.set(queueKey, true); + + try { + const queue = this.queues.get(queueKey); + if (!queue || queue.length === 0) { + return; + } + + // For berth operations, we can batch rapid selections + if (queueKey.startsWith('berth-')) { + await this.processBerthQueue(queueKey, queue); + } else { + // For other operations, process one by one + await this.processSequentialQueue(queueKey, queue); + } + } finally { + this.processing.set(queueKey, false); + + // Check if more operations were added during processing + const queue = this.queues.get(queueKey); + if (queue && queue.length > 0) { + // Delay slightly before processing next batch + setTimeout(() => this.processQueue(queueKey), this.batchDelay); + } + } + } + + private async processBerthQueue(queueKey: string, queue: QueuedOperation[]): Promise { + // Wait a short time to collect rapid selections + await new Promise(resolve => setTimeout(resolve, this.batchDelay)); + + // Get all pending operations + const operations = [...queue]; + queue.length = 0; // Clear the queue + + if (operations.length === 0) return; + + console.log(`[OperationQueue] Processing ${operations.length} berth operations for ${queueKey}`); + + // Process operations in order, but allow them to execute + for (const { operation, resolve, reject } of operations) { + try { + const result = await operation(); + resolve(result); + } catch (error) { + console.error(`[OperationQueue] Operation failed for ${queueKey}:`, error); + reject(error); + } + } + } + + private async processSequentialQueue(queueKey: string, queue: QueuedOperation[]): Promise { + while (queue.length > 0) { + const { operation, resolve, reject } = queue.shift()!; + + try { + const result = await operation(); + resolve(result); + } catch (error) { + console.error(`[OperationQueue] Operation failed for ${queueKey}:`, error); + reject(error); + } + } + } + + // Clean up old queues + cleanup(): void { + const now = Date.now(); + const maxAge = 300000; // 5 minutes + + for (const [queueKey, queue] of this.queues.entries()) { + // Remove operations older than maxAge + for (let i = queue.length - 1; i >= 0; i--) { + if (now - queue[i].timestamp > maxAge) { + const { reject } = queue[i]; + reject(new Error('Operation timeout')); + queue.splice(i, 1); + } + } + + // Remove empty queues + if (queue.length === 0) { + this.queues.delete(queueKey); + this.processing.delete(queueKey); + } + } + } +} + +// Global instance +let globalOperationQueue: OperationQueue | null = null; + +export function getOperationQueue(): OperationQueue { + if (!globalOperationQueue) { + globalOperationQueue = new OperationQueue(); + + // Clean up expired queues every minute + setInterval(() => { + globalOperationQueue?.cleanup(); + }, 60000); + } + return globalOperationQueue; +} + +// Helper function for berth operations (allows rapid selection queuing) +export async function withBerthQueue(interestId: string, operation: () => Promise): Promise { + const queue = getOperationQueue(); + return queue.enqueue(`berth-${interestId}`, operation); +} + +// Helper function for EOI operations +export async function withEOIQueue(interestId: string, operation: () => Promise): Promise { + const queue = getOperationQueue(); + return queue.enqueue(`eoi-${interestId}`, operation); +} + +// Legacy function names for backward compatibility +export const withBerthLock = withBerthQueue; +export const withEOILock = withEOIQueue;