165 lines
4.9 KiB
TypeScript
165 lines
4.9 KiB
TypeScript
|
|
/**
 * A unit of work waiting in a queue, together with the callbacks that settle
 * the promise handed back to whoever enqueued it.
 */
interface QueuedOperation<T> {
  /** Starts the work and returns a promise for its result. */
  operation: () => Promise<T>;
  /** Fulfils the caller's promise with the operation's result. */
  resolve: (value: T) => void;
  /** Rejects the caller's promise; typed `unknown` because any value can be thrown. */
  reject: (error: unknown) => void;
  /** Millisecond timestamp (`Date.now()`) recorded when the operation was queued. */
  timestamp: number;
}
|
||
|
|
|
||
|
|
/**
 * Queuing system for graceful handling of rapid operations.
 *
 * Operations are grouped by a string key. For each key at most one drain loop
 * runs at a time, so operations that share a key never execute concurrently.
 *
 * - Keys starting with `berth-` are handled in batches: the processor waits
 *   `batchDelay` ms, takes everything queued so far, and runs those
 *   operations one after another.
 * - Every other key is drained one operation at a time, in FIFO order.
 */
class OperationQueue {
  /** Pending operations per queue key. */
  private readonly queues = new Map<string, QueuedOperation<any>[]>();
  /** Keys that currently have an active drain loop. */
  private readonly processing = new Set<string>();
  /** Maximum number of pending operations allowed per key. */
  private readonly maxQueueSize: number;
  /** Delay (ms) used to collect rapid berth operations and to pace batches. */
  private readonly batchDelay: number;
  /** Age (ms) after which a still-pending operation is rejected by `cleanup()`. */
  private readonly maxAge: number;

  /**
   * @param options Optional overrides. Defaults: `maxQueueSize` 10,
   *   `batchDelay` 100 ms, `maxAge` 300000 ms (5 minutes).
   */
  constructor(options: { maxQueueSize?: number; batchDelay?: number; maxAge?: number } = {}) {
    this.maxQueueSize = options.maxQueueSize ?? 10;
    this.batchDelay = options.batchDelay ?? 100;
    this.maxAge = options.maxAge ?? 300000;
  }

  /**
   * Adds an operation to the queue identified by `queueKey`.
   *
   * @param queueKey Name of the queue; operations sharing a key are serialized.
   * @param operation Function that starts the work when its turn comes.
   * @returns A promise that settles with the operation's result or error. It
   *   rejects with `Queue for <key> is full` when the key already holds
   *   `maxQueueSize` pending operations, and with `Operation timeout` when
   *   `cleanup()` drops the operation before it started.
   */
  async enqueue<T>(queueKey: string, operation: () => Promise<T>): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      // Get or create the queue for this key.
      let queue = this.queues.get(queueKey);
      if (!queue) {
        queue = [];
        this.queues.set(queueKey, queue);
      }

      // Enforce the per-key size limit.
      if (queue.length >= this.maxQueueSize) {
        reject(new Error(`Queue for ${queueKey} is full`));
        return;
      }

      queue.push({ operation, resolve, reject, timestamp: Date.now() });

      // Start a drain loop unless one already owns this key. The loop settles
      // every operation itself, so its own promise is intentionally ignored.
      if (!this.processing.has(queueKey)) {
        void this.processQueue(queueKey);
      }
    });
  }

  /**
   * Drains the queue for `queueKey` until it is empty.
   *
   * The key stays marked as processing for the whole loop, including the
   * pause between batches, so `enqueue()` never starts a second loop for the
   * same key. The flag is cleared in the same synchronous step that observes
   * the empty queue, so a later `enqueue()` always finds either a running
   * loop or a cleared flag.
   */
  private async processQueue(queueKey: string): Promise<void> {
    if (this.processing.has(queueKey)) {
      return;
    }
    this.processing.add(queueKey);

    try {
      // For berth operations, rapid selections are batched.
      const isBerthQueue = queueKey.startsWith('berth-');
      let isFirstBatch = true;

      for (;;) {
        const queue = this.queues.get(queueKey);
        if (!queue || queue.length === 0) {
          return;
        }

        // Operations arrived while the previous batch ran: delay slightly
        // before processing the next batch.
        if (!isFirstBatch) {
          await this.delay(this.batchDelay);
        }
        isFirstBatch = false;

        if (isBerthQueue) {
          await this.processBerthQueue(queueKey, queue);
        } else {
          await this.processSequentialQueue(queueKey, queue);
        }
      }
    } finally {
      this.processing.delete(queueKey);
    }
  }

  /**
   * Processes one batch of berth operations: waits `batchDelay` ms to collect
   * rapid selections, then runs everything queued so far in order.
   */
  private async processBerthQueue(queueKey: string, queue: QueuedOperation<any>[]): Promise<void> {
    await this.delay(this.batchDelay);

    // Take all pending operations and leave the live queue empty so new
    // arrivals form the next batch.
    const operations = queue.splice(0);
    if (operations.length === 0) return;

    console.log(`[OperationQueue] Processing ${operations.length} berth operations for ${queueKey}`);

    for (const entry of operations) {
      await this.runOperation(queueKey, entry);
    }
  }

  /** Runs queued operations one by one until the queue is empty. */
  private async processSequentialQueue(queueKey: string, queue: QueuedOperation<any>[]): Promise<void> {
    let entry = queue.shift();
    while (entry !== undefined) {
      await this.runOperation(queueKey, entry);
      entry = queue.shift();
    }
  }

  /**
   * Executes a single operation and settles its promise. Failures are logged
   * and forwarded to the caller; they never abort the drain loop.
   */
  private async runOperation(queueKey: string, entry: QueuedOperation<any>): Promise<void> {
    try {
      const result = await entry.operation();
      entry.resolve(result);
    } catch (error) {
      console.error(`[OperationQueue] Operation failed for ${queueKey}:`, error);
      entry.reject(error);
    }
  }

  /** Resolves after `ms` milliseconds. */
  private delay(ms: number): Promise<void> {
    return new Promise<void>((resolve) => setTimeout(resolve, ms));
  }

  /**
   * Rejects pending operations older than `maxAge` with `Operation timeout`
   * and removes empty queues.
   *
   * A key with an active drain loop is kept even when its queue is empty:
   * an operation taken from that queue may still be running, and removing
   * the key would let a new `enqueue()` start a second, concurrent loop.
   */
  cleanup(): void {
    const now = Date.now();

    for (const [queueKey, queue] of this.queues) {
      // Walk backwards so splicing does not shift the indices still to visit.
      for (let i = queue.length - 1; i >= 0; i--) {
        const entry = queue[i];
        if (entry && now - entry.timestamp > this.maxAge) {
          queue.splice(i, 1);
          entry.reject(new Error('Operation timeout'));
        }
      }

      if (queue.length === 0 && !this.processing.has(queueKey)) {
        this.queues.delete(queueKey);
      }
    }
  }
}
|
||
|
|
|
||
|
|
/** Module-level singleton; stays `null` until `getOperationQueue()` is first called. */
let globalOperationQueue: OperationQueue | null = null;

/**
 * Returns the shared `OperationQueue`, creating it on first use.
 *
 * The first call also starts an interval that invokes `cleanup()` on the
 * shared instance once a minute. The interval is never cleared.
 */
export function getOperationQueue(): OperationQueue {
  if (globalOperationQueue) {
    return globalOperationQueue;
  }

  const instance = new OperationQueue();
  globalOperationQueue = instance;

  // Clean up expired queues every minute.
  const sweepIntervalMs = 60000;
  setInterval(() => {
    globalOperationQueue?.cleanup();
  }, sweepIntervalMs);

  return instance;
}
|
||
|
|
|
||
|
|
/**
 * Runs `operation` through the shared operation queue under the key
 * `berth-<interestId>`, which allows rapid berth selections to be queued.
 *
 * @param interestId Identifier appended to the `berth-` queue key.
 * @param operation Function that performs the work when dequeued.
 * @returns The promise returned by the queue for this operation.
 */
export async function withBerthQueue<T>(interestId: string, operation: () => Promise<T>): Promise<T> {
  const queueKey = `berth-${interestId}`;
  return getOperationQueue().enqueue(queueKey, operation);
}
|
||
|
|
|
||
|
|
/**
 * Runs `operation` through the shared operation queue under the key
 * `eoi-<interestId>`.
 *
 * @param interestId Identifier appended to the `eoi-` queue key.
 * @param operation Function that performs the work when dequeued.
 * @returns The promise returned by the queue for this operation.
 */
export async function withEOIQueue<T>(interestId: string, operation: () => Promise<T>): Promise<T> {
  const queueKey = `eoi-${interestId}`;
  return getOperationQueue().enqueue(queueKey, operation);
}
|
||
|
|
|
||
|
|
/** Legacy name for `withBerthQueue`, kept for backward compatibility. */
export const withBerthLock: typeof withBerthQueue = withBerthQueue;

/** Legacy name for `withEOIQueue`, kept for backward compatibility. */
export const withEOILock: typeof withEOIQueue = withEOIQueue;
|