Improve email session management and add IMAP connection pooling
- Switch from localStorage to sessionStorage for email sessions - Add session validation on component mount - Implement IMAP connection pool with folder search capabilities - Add operation locking utility for concurrent request handling - Refactor EOI section component structure - Update API endpoints for better email thread management
This commit is contained in:
@@ -2,6 +2,7 @@ import Imap from 'imap';
|
||||
import { simpleParser } from 'mailparser';
|
||||
import { getCredentialsFromSession, decryptCredentials } from '~/server/utils/encryption';
|
||||
import { listFiles, getFileStats, getMinioClient, uploadFile } from '~/server/utils/minio';
|
||||
import { getIMAPPool } from '~/server/utils/imap-pool';
|
||||
|
||||
interface EmailMessage {
|
||||
id: string;
|
||||
@@ -13,6 +14,7 @@ interface EmailMessage {
|
||||
timestamp: string;
|
||||
direction: 'sent' | 'received';
|
||||
threadId?: string;
|
||||
attachments?: any[];
|
||||
}
|
||||
|
||||
export default defineEventHandler(async (event) => {
|
||||
@@ -144,7 +146,7 @@ export default defineEventHandler(async (event) => {
|
||||
|
||||
try {
|
||||
imapEmails = await Promise.race([
|
||||
fetchImapEmails(imapConfig, userEmail, clientEmail, limit, interestId),
|
||||
fetchImapEmailsWithPool(sessionId, userEmail, clientEmail, limit, interestId),
|
||||
timeoutPromise
|
||||
]);
|
||||
} catch (imapError) {
|
||||
@@ -187,7 +189,217 @@ export default defineEventHandler(async (event) => {
|
||||
}
|
||||
});
|
||||
|
||||
// Separate function for IMAP fetching with proper cleanup
|
||||
// Function for IMAP fetching using connection pool
|
||||
async function fetchImapEmailsWithPool(
|
||||
sessionId: string,
|
||||
userEmail: string,
|
||||
clientEmail: string,
|
||||
limit: number,
|
||||
interestId?: string
|
||||
): Promise<EmailMessage[]> {
|
||||
const pool = getIMAPPool();
|
||||
const imap = await pool.getConnection(sessionId);
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
const allEmails: EmailMessage[] = [];
|
||||
|
||||
// Search in both INBOX and Sent folders
|
||||
const foldersToSearch = ['INBOX', 'Sent', 'Sent Items', 'Sent Mail'];
|
||||
let currentFolderIndex = 0;
|
||||
|
||||
const searchNextFolder = () => {
|
||||
if (currentFolderIndex >= foldersToSearch.length) {
|
||||
resolve(allEmails);
|
||||
return;
|
||||
}
|
||||
|
||||
const folderName = foldersToSearch[currentFolderIndex];
|
||||
currentFolderIndex++;
|
||||
|
||||
imap.openBox(folderName, true, (err: any, box: any) => {
|
||||
if (err) {
|
||||
console.log(`[IMAPPool] Folder ${folderName} not found, trying next...`);
|
||||
searchNextFolder();
|
||||
return;
|
||||
}
|
||||
|
||||
console.log(`[IMAPPool] Searching in folder: ${folderName}`);
|
||||
|
||||
if (!clientEmail || clientEmail.trim() === '') {
|
||||
console.log('[IMAPPool] No client email provided, skipping search');
|
||||
searchNextFolder();
|
||||
return;
|
||||
}
|
||||
|
||||
imap.search(['ALL'], (err: any, results: number[]) => {
|
||||
if (err) {
|
||||
console.error(`[IMAPPool] Search error in ${folderName}:`, err);
|
||||
searchNextFolder();
|
||||
return;
|
||||
}
|
||||
|
||||
if (!results || results.length === 0) {
|
||||
console.log(`[IMAPPool] No emails found in ${folderName}`);
|
||||
searchNextFolder();
|
||||
return;
|
||||
}
|
||||
|
||||
console.log(`[IMAPPool] Found ${results.length} emails in ${folderName}`);
|
||||
const messagesToFetch = results.slice(-limit);
|
||||
let messagesProcessed = 0;
|
||||
|
||||
const fetch = imap.fetch(messagesToFetch, {
|
||||
bodies: '',
|
||||
struct: true,
|
||||
envelope: true
|
||||
});
|
||||
|
||||
fetch.on('message', (msg: any, seqno: number) => {
|
||||
msg.on('body', (stream: any, info: any) => {
|
||||
simpleParser(stream as any, async (err: any, parsed: any) => {
|
||||
if (err) {
|
||||
console.error('[IMAPPool] Parse error:', err);
|
||||
messagesProcessed++;
|
||||
if (messagesProcessed === messagesToFetch.length) {
|
||||
searchNextFolder();
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// Check if this email involves the client
|
||||
const fromEmail = parsed.from?.text || '';
|
||||
const toEmails = Array.isArray(parsed.to)
|
||||
? parsed.to.map((addr: any) => addr.text).join(', ')
|
||||
: parsed.to?.text || '';
|
||||
const ccEmails = Array.isArray(parsed.cc)
|
||||
? parsed.cc.map((addr: any) => addr.text).join(', ')
|
||||
: parsed.cc?.text || '';
|
||||
|
||||
// Filter to only include emails to/from the client
|
||||
const involvesClient =
|
||||
fromEmail.toLowerCase().includes(clientEmail.toLowerCase()) ||
|
||||
toEmails.toLowerCase().includes(clientEmail.toLowerCase()) ||
|
||||
ccEmails.toLowerCase().includes(clientEmail.toLowerCase());
|
||||
|
||||
if (!involvesClient) {
|
||||
messagesProcessed++;
|
||||
if (messagesProcessed === messagesToFetch.length) {
|
||||
searchNextFolder();
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// Process attachments
|
||||
const attachments: any[] = [];
|
||||
if (parsed.attachments && parsed.attachments.length > 0) {
|
||||
for (const attachment of parsed.attachments) {
|
||||
try {
|
||||
// Save attachment to MinIO
|
||||
const attachmentId = `${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
|
||||
const fileName = attachment.filename || `attachment-${attachmentId}`;
|
||||
const sanitizedFileName = fileName.replace(/[^a-zA-Z0-9.-]/g, '_');
|
||||
const objectName = `interest-${interestId}/attachments/${attachmentId}-${sanitizedFileName}`;
|
||||
|
||||
const client = getMinioClient();
|
||||
await client.putObject('client-emails', objectName, attachment.content, attachment.content?.length || 0, {
|
||||
'Content-Type': attachment.contentType || 'application/octet-stream',
|
||||
'Content-Disposition': `attachment; filename="${sanitizedFileName}"`
|
||||
});
|
||||
|
||||
attachments.push({
|
||||
id: attachmentId,
|
||||
filename: sanitizedFileName,
|
||||
originalName: attachment.filename,
|
||||
contentType: attachment.contentType,
|
||||
size: attachment.content?.length || attachment.size || 0,
|
||||
path: objectName,
|
||||
bucket: 'client-emails'
|
||||
});
|
||||
|
||||
console.log(`[IMAPPool] Saved attachment: ${sanitizedFileName} (${attachment.content?.length || 0} bytes)`);
|
||||
} catch (attachmentError) {
|
||||
console.error('[IMAPPool] Failed to save attachment:', attachment.filename, attachmentError);
|
||||
// Still include attachment info even if save failed
|
||||
attachments.push({
|
||||
filename: attachment.filename || 'Unknown',
|
||||
originalName: attachment.filename,
|
||||
contentType: attachment.contentType,
|
||||
size: attachment.content?.length || attachment.size || 0,
|
||||
error: 'Failed to save'
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const email: EmailMessage = {
|
||||
id: parsed.messageId || `${Date.now()}-${seqno}`,
|
||||
from: fromEmail,
|
||||
to: toEmails,
|
||||
subject: parsed.subject || '',
|
||||
body: parsed.text || '',
|
||||
html: parsed.html || undefined,
|
||||
timestamp: parsed.date?.toISOString() || new Date().toISOString(),
|
||||
direction: fromEmail.toLowerCase().includes(userEmail.toLowerCase()) ? 'sent' : 'received',
|
||||
attachments: attachments.length > 0 ? attachments : undefined
|
||||
};
|
||||
|
||||
if (parsed.headers.has('in-reply-to')) {
|
||||
email.threadId = parsed.headers.get('in-reply-to') as string;
|
||||
}
|
||||
|
||||
allEmails.push(email);
|
||||
|
||||
// Cache this email if we have an interestId
|
||||
if (interestId && involvesClient) {
|
||||
try {
|
||||
const emailData = {
|
||||
...email,
|
||||
interestId: interestId
|
||||
};
|
||||
|
||||
const objectName = `interest-${interestId}/${Date.now()}-${email.direction}.json`;
|
||||
const buffer = Buffer.from(JSON.stringify(emailData, null, 2));
|
||||
|
||||
// Upload to the client-emails bucket
|
||||
const client = getMinioClient();
|
||||
client.putObject('client-emails', objectName, buffer, buffer.length, {
|
||||
'Content-Type': 'application/json',
|
||||
}).catch(err => {
|
||||
console.error('[IMAPPool] Failed to cache email:', err);
|
||||
});
|
||||
} catch (cacheError) {
|
||||
console.error('[IMAPPool] Failed to cache email:', cacheError);
|
||||
}
|
||||
}
|
||||
|
||||
messagesProcessed++;
|
||||
|
||||
if (messagesProcessed === messagesToFetch.length) {
|
||||
searchNextFolder();
|
||||
}
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
fetch.once('error', (err: any) => {
|
||||
console.error('[IMAPPool] Fetch error:', err);
|
||||
searchNextFolder();
|
||||
});
|
||||
|
||||
fetch.once('end', () => {
|
||||
if (messagesProcessed === 0) {
|
||||
searchNextFolder();
|
||||
}
|
||||
});
|
||||
});
|
||||
});
|
||||
};
|
||||
|
||||
searchNextFolder();
|
||||
});
|
||||
}
|
||||
|
||||
// Separate function for IMAP fetching with proper cleanup (legacy)
|
||||
async function fetchImapEmails(
|
||||
imapConfig: any,
|
||||
userEmail: string,
|
||||
@@ -304,6 +516,48 @@ async function fetchImapEmails(
|
||||
return;
|
||||
}
|
||||
|
||||
// Process attachments
|
||||
const attachments: any[] = [];
|
||||
if (parsed.attachments && parsed.attachments.length > 0) {
|
||||
for (const attachment of parsed.attachments) {
|
||||
try {
|
||||
// Save attachment to MinIO
|
||||
const attachmentId = `${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
|
||||
const fileName = attachment.filename || `attachment-${attachmentId}`;
|
||||
const sanitizedFileName = fileName.replace(/[^a-zA-Z0-9.-]/g, '_');
|
||||
const objectName = `interest-${interestId}/attachments/${attachmentId}-${sanitizedFileName}`;
|
||||
|
||||
const client = getMinioClient();
|
||||
await client.putObject('client-emails', objectName, attachment.content, attachment.content?.length || 0, {
|
||||
'Content-Type': attachment.contentType || 'application/octet-stream',
|
||||
'Content-Disposition': `attachment; filename="${sanitizedFileName}"`
|
||||
});
|
||||
|
||||
attachments.push({
|
||||
id: attachmentId,
|
||||
filename: sanitizedFileName,
|
||||
originalName: attachment.filename,
|
||||
contentType: attachment.contentType,
|
||||
size: attachment.content?.length || attachment.size || 0,
|
||||
path: objectName,
|
||||
bucket: 'client-emails'
|
||||
});
|
||||
|
||||
console.log(`Saved attachment: ${sanitizedFileName} (${attachment.content?.length || 0} bytes)`);
|
||||
} catch (attachmentError) {
|
||||
console.error('Failed to save attachment:', attachment.filename, attachmentError);
|
||||
// Still include attachment info even if save failed
|
||||
attachments.push({
|
||||
filename: attachment.filename || 'Unknown',
|
||||
originalName: attachment.filename,
|
||||
contentType: attachment.contentType,
|
||||
size: attachment.content?.length || attachment.size || 0,
|
||||
error: 'Failed to save'
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const email: EmailMessage = {
|
||||
id: parsed.messageId || `${Date.now()}-${seqno}`,
|
||||
from: fromEmail,
|
||||
@@ -312,7 +566,8 @@ async function fetchImapEmails(
|
||||
body: parsed.text || '',
|
||||
html: parsed.html || undefined,
|
||||
timestamp: parsed.date?.toISOString() || new Date().toISOString(),
|
||||
direction: fromEmail.toLowerCase().includes(userEmail.toLowerCase()) ? 'sent' : 'received'
|
||||
direction: fromEmail.toLowerCase().includes(userEmail.toLowerCase()) ? 'sent' : 'received',
|
||||
attachments: attachments.length > 0 ? attachments : undefined
|
||||
};
|
||||
|
||||
if (parsed.headers.has('in-reply-to')) {
|
||||
|
||||
@@ -22,6 +22,38 @@ export default defineEventHandler(async (event) => {
|
||||
});
|
||||
|
||||
console.log('[get-berths] Successfully fetched berths, count:', berths.list?.length || 0);
|
||||
|
||||
// Sort berths by letter zone and then by number
|
||||
if (berths.list && Array.isArray(berths.list)) {
|
||||
berths.list.sort((a, b) => {
|
||||
const berthA = a['Berth Number'] || '';
|
||||
const berthB = b['Berth Number'] || '';
|
||||
|
||||
// Extract letter and number parts
|
||||
const matchA = berthA.match(/^([A-Za-z]+)(\d+)$/);
|
||||
const matchB = berthB.match(/^([A-Za-z]+)(\d+)$/);
|
||||
|
||||
if (matchA && matchB) {
|
||||
const [, letterA, numberA] = matchA;
|
||||
const [, letterB, numberB] = matchB;
|
||||
|
||||
// First sort by letter zone
|
||||
const letterCompare = letterA.localeCompare(letterB);
|
||||
if (letterCompare !== 0) {
|
||||
return letterCompare;
|
||||
}
|
||||
|
||||
// Then sort by number within the same letter zone
|
||||
return parseInt(numberA) - parseInt(numberB);
|
||||
}
|
||||
|
||||
// Fallback to string comparison if pattern doesn't match
|
||||
return berthA.localeCompare(berthB);
|
||||
});
|
||||
|
||||
console.log('[get-berths] Berths sorted by zone and number');
|
||||
}
|
||||
|
||||
return berths;
|
||||
} catch (error) {
|
||||
console.error('[get-berths] Error occurred:', error);
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
import { withBerthQueue } from '~/server/utils/operation-lock';
|
||||
|
||||
export default defineEventHandler(async (event) => {
|
||||
const xTagHeader = getRequestHeader(event, "x-tag");
|
||||
console.log('[link-berths] Request received with x-tag:', xTagHeader);
|
||||
@@ -19,27 +21,31 @@ export default defineEventHandler(async (event) => {
|
||||
});
|
||||
}
|
||||
|
||||
const config = getNocoDbConfiguration();
|
||||
const interestsTableId = "mbs9hjauug4eseo";
|
||||
const berthsLinkFieldId = "cj7v7bb9pa5eyo3"; // Berths field
|
||||
|
||||
// Format the berth IDs for the API
|
||||
const berthRecords = berthIds.map(id => ({ Id: id }));
|
||||
console.log('[link-berths] Berth records to link:', berthRecords);
|
||||
|
||||
const url = `${config.url}/api/v2/tables/${interestsTableId}/links/${berthsLinkFieldId}/records/${interestId}`;
|
||||
console.log('[link-berths] URL:', url);
|
||||
|
||||
const result = await $fetch(url, {
|
||||
method: 'POST',
|
||||
headers: {
|
||||
"xc-token": config.token,
|
||||
},
|
||||
body: berthRecords,
|
||||
// Use queuing system to handle rapid berth selections gracefully
|
||||
return await withBerthQueue(interestId, async () => {
|
||||
const config = getNocoDbConfiguration();
|
||||
const interestsTableId = "mbs9hjauug4eseo";
|
||||
const berthsLinkFieldId = "cj7v7bb9pa5eyo3"; // Berths field
|
||||
|
||||
// Format the berth IDs for the API
|
||||
const berthRecords = berthIds.map(id => ({ Id: id }));
|
||||
console.log('[link-berths] Berth records to link:', berthRecords);
|
||||
|
||||
const url = `${config.url}/api/v2/tables/${interestsTableId}/links/${berthsLinkFieldId}/records/${interestId}`;
|
||||
console.log('[link-berths] URL:', url);
|
||||
|
||||
const result = await $fetch(url, {
|
||||
method: 'POST',
|
||||
headers: {
|
||||
"xc-token": config.token,
|
||||
},
|
||||
body: berthRecords,
|
||||
});
|
||||
|
||||
console.log('[link-berths] Successfully linked berths to interest:', interestId);
|
||||
return result;
|
||||
});
|
||||
|
||||
console.log('[link-berths] Successfully linked berths to interest:', interestId);
|
||||
return result;
|
||||
} catch (error) {
|
||||
console.error('[link-berths] Error occurred:', error);
|
||||
console.error('[link-berths] Error details:', error instanceof Error ? error.message : 'Unknown error');
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
import { withBerthQueue } from '~/server/utils/operation-lock';
|
||||
|
||||
export default defineEventHandler(async (event) => {
|
||||
const xTagHeader = getRequestHeader(event, "x-tag");
|
||||
console.log('[unlink-berths] Request received with x-tag:', xTagHeader);
|
||||
@@ -17,23 +19,27 @@ export default defineEventHandler(async (event) => {
|
||||
});
|
||||
}
|
||||
|
||||
const config = getNocoDbConfiguration();
|
||||
const interestsTableId = "mbs9hjauug4eseo";
|
||||
const berthsLinkFieldId = "cj7v7bb9pa5eyo3"; // Berths field
|
||||
|
||||
// Format the berth IDs for the API
|
||||
const berthRecords = berthIds.map(id => ({ Id: id }));
|
||||
|
||||
const result = await $fetch(
|
||||
`${config.url}/api/v2/tables/${interestsTableId}/links/${berthsLinkFieldId}/records/${interestId}`,
|
||||
{
|
||||
method: 'DELETE',
|
||||
headers: {
|
||||
"xc-token": config.token,
|
||||
},
|
||||
body: berthRecords,
|
||||
}
|
||||
);
|
||||
|
||||
return result;
|
||||
// Use queuing system to handle rapid berth operations gracefully
|
||||
return await withBerthQueue(interestId, async () => {
|
||||
const config = getNocoDbConfiguration();
|
||||
const interestsTableId = "mbs9hjauug4eseo";
|
||||
const berthsLinkFieldId = "cj7v7bb9pa5eyo3"; // Berths field
|
||||
|
||||
// Format the berth IDs for the API
|
||||
const berthRecords = berthIds.map(id => ({ Id: id }));
|
||||
|
||||
const result = await $fetch(
|
||||
`${config.url}/api/v2/tables/${interestsTableId}/links/${berthsLinkFieldId}/records/${interestId}`,
|
||||
{
|
||||
method: 'DELETE',
|
||||
headers: {
|
||||
"xc-token": config.token,
|
||||
},
|
||||
body: berthRecords,
|
||||
}
|
||||
);
|
||||
|
||||
console.log('[unlink-berths] Successfully unlinked berths from interest:', interestId);
|
||||
return result;
|
||||
});
|
||||
});
|
||||
|
||||
227
server/utils/imap-pool.ts
Normal file
227
server/utils/imap-pool.ts
Normal file
@@ -0,0 +1,227 @@
|
||||
import Imap from 'imap';
|
||||
import { getCredentialsFromSession, decryptCredentials } from './encryption';
|
||||
|
||||
interface ConnectionPoolItem {
|
||||
connection: any;
|
||||
sessionId: string;
|
||||
lastUsed: number;
|
||||
isConnected: boolean;
|
||||
}
|
||||
|
||||
class IMAPConnectionPool {
|
||||
private connections: Map<string, ConnectionPoolItem> = new Map();
|
||||
private maxConnections = 5;
|
||||
private connectionTimeout = 300000; // 5 minutes
|
||||
private reconnectAttempts = 3;
|
||||
private reconnectDelay = 1000; // 1 second
|
||||
|
||||
private cleanupInterval: NodeJS.Timeout;
|
||||
|
||||
constructor() {
|
||||
// Cleanup expired connections every minute
|
||||
this.cleanupInterval = setInterval(() => {
|
||||
this.cleanupExpiredConnections();
|
||||
}, 60000);
|
||||
}
|
||||
|
||||
async getConnection(sessionId: string): Promise<any> {
|
||||
// Check if we have an existing connection
|
||||
const existing = this.connections.get(sessionId);
|
||||
if (existing && existing.isConnected) {
|
||||
// Test connection health
|
||||
if (await this.testConnection(existing.connection)) {
|
||||
existing.lastUsed = Date.now();
|
||||
return existing.connection;
|
||||
} else {
|
||||
// Connection is dead, remove it
|
||||
this.removeConnection(sessionId);
|
||||
}
|
||||
}
|
||||
|
||||
// Create new connection
|
||||
return await this.createConnection(sessionId);
|
||||
}
|
||||
|
||||
private async createConnection(sessionId: string): Promise<any> {
|
||||
const encryptedCredentials = getCredentialsFromSession(sessionId);
|
||||
if (!encryptedCredentials) {
|
||||
throw new Error('No credentials found for session');
|
||||
}
|
||||
|
||||
let credentials;
|
||||
try {
|
||||
credentials = decryptCredentials(encryptedCredentials);
|
||||
} catch (error) {
|
||||
throw new Error('Failed to decrypt credentials');
|
||||
}
|
||||
|
||||
const imapConfig = {
|
||||
user: credentials.email,
|
||||
password: credentials.password,
|
||||
host: process.env.NUXT_EMAIL_IMAP_HOST || 'mail.portnimara.com',
|
||||
port: parseInt(process.env.NUXT_EMAIL_IMAP_PORT || '993'),
|
||||
tls: true,
|
||||
tlsOptions: {
|
||||
rejectUnauthorized: false
|
||||
},
|
||||
connTimeout: 15000, // 15 seconds connection timeout
|
||||
authTimeout: 10000, // 10 seconds auth timeout
|
||||
keepalive: {
|
||||
interval: 10000, // Send keepalive every 10 seconds
|
||||
idleInterval: 300000, // IDLE for 5 minutes
|
||||
forceNoop: true
|
||||
}
|
||||
};
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
const imap = new Imap(imapConfig);
|
||||
let connectionAttempts = 0;
|
||||
|
||||
const attemptConnection = () => {
|
||||
connectionAttempts++;
|
||||
|
||||
imap.once('ready', () => {
|
||||
console.log(`[IMAPPool] Connection established for session ${sessionId}`);
|
||||
|
||||
// Store connection
|
||||
this.connections.set(sessionId, {
|
||||
connection: imap,
|
||||
sessionId,
|
||||
lastUsed: Date.now(),
|
||||
isConnected: true
|
||||
});
|
||||
|
||||
// Set up error handlers
|
||||
imap.on('error', (err: any) => {
|
||||
console.error(`[IMAPPool] Connection error for session ${sessionId}:`, err);
|
||||
this.markConnectionAsDead(sessionId);
|
||||
});
|
||||
|
||||
imap.on('end', () => {
|
||||
console.log(`[IMAPPool] Connection ended for session ${sessionId}`);
|
||||
this.removeConnection(sessionId);
|
||||
});
|
||||
|
||||
resolve(imap);
|
||||
});
|
||||
|
||||
imap.once('error', (err: any) => {
|
||||
console.error(`[IMAPPool] Connection attempt ${connectionAttempts} failed:`, err);
|
||||
|
||||
if (connectionAttempts < this.reconnectAttempts) {
|
||||
setTimeout(() => {
|
||||
try {
|
||||
imap.connect();
|
||||
} catch (connectError) {
|
||||
console.error(`[IMAPPool] Reconnect attempt failed:`, connectError);
|
||||
attemptConnection();
|
||||
}
|
||||
}, this.reconnectDelay * connectionAttempts);
|
||||
} else {
|
||||
reject(err);
|
||||
}
|
||||
});
|
||||
|
||||
try {
|
||||
imap.connect();
|
||||
} catch (connectError) {
|
||||
console.error(`[IMAPPool] Initial connect failed:`, connectError);
|
||||
if (connectionAttempts < this.reconnectAttempts) {
|
||||
setTimeout(attemptConnection, this.reconnectDelay);
|
||||
} else {
|
||||
reject(connectError);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
attemptConnection();
|
||||
});
|
||||
}
|
||||
|
||||
private async testConnection(imap: any): Promise<boolean> {
|
||||
return new Promise((resolve) => {
|
||||
try {
|
||||
if (!imap || imap.state !== 'authenticated') {
|
||||
resolve(false);
|
||||
return;
|
||||
}
|
||||
|
||||
// Test with a simple NOOP command
|
||||
imap.seq.fetch('1:1', { bodies: 'HEADER' }, (err: any) => {
|
||||
// Even if fetch fails, if no connection error then connection is likely OK
|
||||
resolve(!err || err.code !== 'ECONNRESET');
|
||||
});
|
||||
|
||||
// Timeout the test after 5 seconds
|
||||
setTimeout(() => resolve(false), 5000);
|
||||
} catch (error) {
|
||||
resolve(false);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private markConnectionAsDead(sessionId: string): void {
|
||||
const connection = this.connections.get(sessionId);
|
||||
if (connection) {
|
||||
connection.isConnected = false;
|
||||
}
|
||||
}
|
||||
|
||||
private removeConnection(sessionId: string): void {
|
||||
const connection = this.connections.get(sessionId);
|
||||
if (connection) {
|
||||
try {
|
||||
if (connection.connection && typeof connection.connection.end === 'function') {
|
||||
connection.connection.end();
|
||||
}
|
||||
} catch (error) {
|
||||
console.error(`[IMAPPool] Error closing connection:`, error);
|
||||
}
|
||||
this.connections.delete(sessionId);
|
||||
}
|
||||
}
|
||||
|
||||
private cleanupExpiredConnections(): void {
|
||||
const now = Date.now();
|
||||
for (const [sessionId, connection] of this.connections.entries()) {
|
||||
if (now - connection.lastUsed > this.connectionTimeout) {
|
||||
console.log(`[IMAPPool] Cleaning up expired connection for session ${sessionId}`);
|
||||
this.removeConnection(sessionId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async closeConnection(sessionId: string): Promise<void> {
|
||||
this.removeConnection(sessionId);
|
||||
}
|
||||
|
||||
async closeAllConnections(): Promise<void> {
|
||||
for (const sessionId of this.connections.keys()) {
|
||||
this.removeConnection(sessionId);
|
||||
}
|
||||
}
|
||||
|
||||
destroy(): void {
|
||||
if (this.cleanupInterval) {
|
||||
clearInterval(this.cleanupInterval);
|
||||
}
|
||||
this.closeAllConnections();
|
||||
}
|
||||
}
|
||||
|
||||
// Global instance
|
||||
let globalIMAPPool: IMAPConnectionPool | null = null;
|
||||
|
||||
export function getIMAPPool(): IMAPConnectionPool {
|
||||
if (!globalIMAPPool) {
|
||||
globalIMAPPool = new IMAPConnectionPool();
|
||||
}
|
||||
return globalIMAPPool;
|
||||
}
|
||||
|
||||
export function destroyIMAPPool(): void {
|
||||
if (globalIMAPPool) {
|
||||
globalIMAPPool.destroy();
|
||||
globalIMAPPool = null;
|
||||
}
|
||||
}
|
||||
164
server/utils/operation-lock.ts
Normal file
164
server/utils/operation-lock.ts
Normal file
@@ -0,0 +1,164 @@
|
||||
interface QueuedOperation<T> {
|
||||
operation: () => Promise<T>;
|
||||
resolve: (value: T) => void;
|
||||
reject: (error: any) => void;
|
||||
timestamp: number;
|
||||
}
|
||||
|
||||
// Queuing system for graceful handling of rapid operations
|
||||
class OperationQueue {
|
||||
private queues: Map<string, QueuedOperation<any>[]> = new Map();
|
||||
private processing: Map<string, boolean> = new Map();
|
||||
private maxQueueSize = 10;
|
||||
private batchDelay = 100; // 100ms delay for batching rapid operations
|
||||
|
||||
async enqueue<T>(queueKey: string, operation: () => Promise<T>): Promise<T> {
|
||||
return new Promise((resolve, reject) => {
|
||||
// Get or create queue for this key
|
||||
let queue = this.queues.get(queueKey);
|
||||
if (!queue) {
|
||||
queue = [];
|
||||
this.queues.set(queueKey, queue);
|
||||
}
|
||||
|
||||
// Check queue size limit
|
||||
if (queue.length >= this.maxQueueSize) {
|
||||
reject(new Error(`Queue for ${queueKey} is full`));
|
||||
return;
|
||||
}
|
||||
|
||||
// Add operation to queue
|
||||
queue.push({
|
||||
operation,
|
||||
resolve,
|
||||
reject,
|
||||
timestamp: Date.now()
|
||||
});
|
||||
|
||||
// Start processing if not already processing
|
||||
if (!this.processing.get(queueKey)) {
|
||||
this.processQueue(queueKey);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private async processQueue(queueKey: string): Promise<void> {
|
||||
this.processing.set(queueKey, true);
|
||||
|
||||
try {
|
||||
const queue = this.queues.get(queueKey);
|
||||
if (!queue || queue.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
// For berth operations, we can batch rapid selections
|
||||
if (queueKey.startsWith('berth-')) {
|
||||
await this.processBerthQueue(queueKey, queue);
|
||||
} else {
|
||||
// For other operations, process one by one
|
||||
await this.processSequentialQueue(queueKey, queue);
|
||||
}
|
||||
} finally {
|
||||
this.processing.set(queueKey, false);
|
||||
|
||||
// Check if more operations were added during processing
|
||||
const queue = this.queues.get(queueKey);
|
||||
if (queue && queue.length > 0) {
|
||||
// Delay slightly before processing next batch
|
||||
setTimeout(() => this.processQueue(queueKey), this.batchDelay);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async processBerthQueue(queueKey: string, queue: QueuedOperation<any>[]): Promise<void> {
|
||||
// Wait a short time to collect rapid selections
|
||||
await new Promise(resolve => setTimeout(resolve, this.batchDelay));
|
||||
|
||||
// Get all pending operations
|
||||
const operations = [...queue];
|
||||
queue.length = 0; // Clear the queue
|
||||
|
||||
if (operations.length === 0) return;
|
||||
|
||||
console.log(`[OperationQueue] Processing ${operations.length} berth operations for ${queueKey}`);
|
||||
|
||||
// Process operations in order, but allow them to execute
|
||||
for (const { operation, resolve, reject } of operations) {
|
||||
try {
|
||||
const result = await operation();
|
||||
resolve(result);
|
||||
} catch (error) {
|
||||
console.error(`[OperationQueue] Operation failed for ${queueKey}:`, error);
|
||||
reject(error);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async processSequentialQueue(queueKey: string, queue: QueuedOperation<any>[]): Promise<void> {
|
||||
while (queue.length > 0) {
|
||||
const { operation, resolve, reject } = queue.shift()!;
|
||||
|
||||
try {
|
||||
const result = await operation();
|
||||
resolve(result);
|
||||
} catch (error) {
|
||||
console.error(`[OperationQueue] Operation failed for ${queueKey}:`, error);
|
||||
reject(error);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Clean up old queues
|
||||
cleanup(): void {
|
||||
const now = Date.now();
|
||||
const maxAge = 300000; // 5 minutes
|
||||
|
||||
for (const [queueKey, queue] of this.queues.entries()) {
|
||||
// Remove operations older than maxAge
|
||||
for (let i = queue.length - 1; i >= 0; i--) {
|
||||
if (now - queue[i].timestamp > maxAge) {
|
||||
const { reject } = queue[i];
|
||||
reject(new Error('Operation timeout'));
|
||||
queue.splice(i, 1);
|
||||
}
|
||||
}
|
||||
|
||||
// Remove empty queues
|
||||
if (queue.length === 0) {
|
||||
this.queues.delete(queueKey);
|
||||
this.processing.delete(queueKey);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Global instance
|
||||
let globalOperationQueue: OperationQueue | null = null;
|
||||
|
||||
export function getOperationQueue(): OperationQueue {
|
||||
if (!globalOperationQueue) {
|
||||
globalOperationQueue = new OperationQueue();
|
||||
|
||||
// Clean up expired queues every minute
|
||||
setInterval(() => {
|
||||
globalOperationQueue?.cleanup();
|
||||
}, 60000);
|
||||
}
|
||||
return globalOperationQueue;
|
||||
}
|
||||
|
||||
// Helper function for berth operations (allows rapid selection queuing)
|
||||
export async function withBerthQueue<T>(interestId: string, operation: () => Promise<T>): Promise<T> {
|
||||
const queue = getOperationQueue();
|
||||
return queue.enqueue(`berth-${interestId}`, operation);
|
||||
}
|
||||
|
||||
// Helper function for EOI operations
|
||||
export async function withEOIQueue<T>(interestId: string, operation: () => Promise<T>): Promise<T> {
|
||||
const queue = getOperationQueue();
|
||||
return queue.enqueue(`eoi-${interestId}`, operation);
|
||||
}
|
||||
|
||||
// Legacy function names for backward compatibility
|
||||
export const withBerthLock = withBerthQueue;
|
||||
export const withEOILock = withEOIQueue;
|
||||
Reference in New Issue
Block a user