// port-nimara-client-portal/server/utils/request-queue.ts
//
// 191 lines
// 5.4 KiB
// TypeScript

/**
 * One unit of work waiting in (or being run by) a request queue, together
 * with the callbacks that settle the promise handed back to the caller.
 */
interface QueuedRequest<T> {
  /** Identifier used in log messages. */
  id: string;
  /** Scheduling class; decides where the request is inserted in the queue. */
  priority: 'high' | 'normal' | 'low';
  /** Starts the actual work. Invoked again for every retry. */
  execute: () => Promise<T>;
  /** Fulfils the caller's promise with the result of `execute`. */
  resolve: (value: T) => void;
  /**
   * Rejects the caller's promise. The reason is `unknown` because anything
   * can be thrown; consumers must narrow before inspecting it.
   */
  reject: (error: unknown) => void;
  /** Creation time in epoch milliseconds. */
  timestamp: number;
  /** Number of failed attempts so far. */
  retries: number;
}
/**
 * Limits applied by a RequestQueue.
 */
interface QueueOptions {
/** Upper bound on the number of requests being executed at the same time. */
maxConcurrent: number;
/** Number of waiting requests at which `add` refuses new work with a "is full" error. */
maxQueueSize: number;
/** Delay in milliseconds passed to `setTimeout` before an attempt fails with "Request timeout". */
requestTimeout: number;
/** How many times a failed request is put back on the queue before its promise is rejected. */
maxRetries: number;
}
/**
 * Named in-memory queue that runs asynchronous jobs with a concurrency cap,
 * three priority levels, a per-attempt timeout and automatic retries.
 *
 * Instances are shared per name through `RequestQueue.getInstance`.
 */
class RequestQueue {
  /** Registry backing `getInstance`, keyed by queue name. */
  private static readonly instances = new Map<string, RequestQueue>();

  /** Dequeue order of each priority; a lower rank is served first. */
  private static readonly priorityRank = { high: 0, normal: 1, low: 2 } as const;

  /** Fully resolved options (caller overrides merged over the defaults). */
  private readonly options: QueueOptions;

  // `any` is deliberate: the queue holds requests with different result
  // types, and `resolve` is contravariant in T, so `unknown` would not accept them.
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  private queue: QueuedRequest<any>[] = [];
  private activeRequests = 0;
  private requestId = 0;

  /**
   * @param name Label used in request ids, log lines and error messages.
   * @param options Overrides for the defaults (5 concurrent, 100 queued,
   *   30000 ms timeout, 2 retries). Missing or `undefined` fields fall back
   *   to the default instead of disabling the limit.
   */
  constructor(private readonly name: string, options: Partial<QueueOptions> = {}) {
    this.options = {
      maxConcurrent: options.maxConcurrent ?? 5,
      maxQueueSize: options.maxQueueSize ?? 100,
      requestTimeout: options.requestTimeout ?? 30000,
      maxRetries: options.maxRetries ?? 2
    };
  }

  /**
   * Returns the queue registered under `name`, creating it on first use.
   *
   * @param name Registry key and display name of the queue.
   * @param options Only applied when the queue is created; ignored when a
   *   queue with this name already exists.
   */
  static getInstance(name: string, options?: Partial<QueueOptions>): RequestQueue {
    let instance = RequestQueue.instances.get(name);
    if (!instance) {
      instance = new RequestQueue(name, options);
      RequestQueue.instances.set(name, instance);
    }
    return instance;
  }

  /**
   * Schedules `execute` and returns a promise for its result.
   *
   * @param execute Starts the work. It is called once per attempt, so it may
   *   run up to `maxRetries + 1` times.
   * @param priority Scheduling class; defaults to `'normal'`.
   * @returns A promise that fulfils with the result of the first successful
   *   attempt, or rejects with the last failure, with "Queue cleared" if the
   *   request is dropped by `clear()`, or with a "is full" error when
   *   `maxQueueSize` requests are already waiting.
   */
  async add<T>(
    execute: () => Promise<T>,
    priority: 'high' | 'normal' | 'low' = 'normal'
  ): Promise<T> {
    if (this.queue.length >= this.options.maxQueueSize) {
      throw new Error(`Queue ${this.name} is full (${this.queue.length} items)`);
    }
    return new Promise<T>((resolve, reject) => {
      const request: QueuedRequest<T> = {
        id: `${this.name}-${++this.requestId}`,
        priority,
        execute,
        resolve,
        reject,
        timestamp: Date.now(),
        retries: 0
      };
      this.enqueue(request);
      console.log(`[Queue ${this.name}] Added request ${request.id} (priority: ${priority}, queue size: ${this.queue.length})`);
      this.processQueue();
    });
  }

  /** Inserts `request` ahead of the first waiting item with a lower priority. */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  private enqueue(request: QueuedRequest<any>): void {
    const rank = RequestQueue.priorityRank[request.priority];
    const index = this.queue.findIndex(
      item => RequestQueue.priorityRank[item.priority] > rank
    );
    if (index === -1) {
      this.queue.push(request);
    } else {
      this.queue.splice(index, 0, request);
    }
  }

  /** Starts waiting requests until the queue is empty or every slot is busy. */
  private processQueue(): void {
    while (this.queue.length > 0 && this.activeRequests < this.options.maxConcurrent) {
      const request = this.queue.shift();
      if (!request) continue;
      this.activeRequests++;
      // runRequest handles its own failures, so the promise is safe to drop.
      void this.runRequest(request);
    }
  }

  /** Runs one attempt of `request`, then settles it or puts it back for a retry. */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  private async runRequest(request: QueuedRequest<any>): Promise<void> {
    try {
      console.log(`[Queue ${this.name}] Processing request ${request.id} (active: ${this.activeRequests}/${this.options.maxConcurrent})`);
      const result = await this.runWithTimeout(() => request.execute());
      request.resolve(result);
      console.log(`[Queue ${this.name}] Request ${request.id} completed successfully`);
    } catch (error) {
      console.error(`[Queue ${this.name}] Request ${request.id} failed:`, error);
      if (request.retries < this.options.maxRetries) {
        request.retries++;
        console.log(`[Queue ${this.name}] Retrying request ${request.id} (attempt ${request.retries + 1}/${this.options.maxRetries + 1})`);
        // Goes to the very front, so the retry is the next request started.
        this.queue.unshift(request);
      } else {
        request.reject(error);
      }
    } finally {
      this.activeRequests--;
      // A slot just freed up; hand it to the next waiting request.
      this.processQueue();
    }
  }

  /**
   * Races `execute()` against the configured timeout and always disposes of
   * the timer, so finished requests neither keep the process alive nor leave
   * a rejection without a handler.
   *
   * The work itself is not cancelled on timeout; it keeps running in the
   * background while the queue treats the attempt as failed.
   */
  private async runWithTimeout<T>(execute: () => Promise<T>): Promise<T> {
    let timer: ReturnType<typeof setTimeout> | undefined;
    const timeout = new Promise<never>((_, reject) => {
      timer = setTimeout(() => reject(new Error('Request timeout')), this.options.requestTimeout);
    });
    try {
      return await Promise.race([execute(), timeout]);
    } finally {
      clearTimeout(timer);
    }
  }

  /** Snapshot of the queue: waiting and running counts plus a per-priority breakdown. */
  getStatus() {
    const queueByPriority = { high: 0, normal: 0, low: 0 };
    for (const { priority } of this.queue) {
      queueByPriority[priority]++;
    }
    return {
      name: this.name,
      queueLength: this.queue.length,
      activeRequests: this.activeRequests,
      maxConcurrent: this.options.maxConcurrent,
      queueByPriority
    };
  }

  /**
   * Rejects every waiting request with "Queue cleared" and empties the queue.
   * Requests that are already running are not affected.
   *
   * @returns The number of requests that were dropped.
   */
  clear() {
    const dropped = this.queue;
    this.queue = [];
    for (const request of dropped) {
      request.reject(new Error('Queue cleared'));
    }
    console.log(`[Queue ${this.name}] Cleared ${dropped.length} requests`);
    return dropped.length;
  }
}
// Shared queue instances, each registered under its own name with its own limits.
const documensoQueue = RequestQueue.getInstance('documenso', {
  maxConcurrent: 3,
  maxQueueSize: 50,
  requestTimeout: 45000,
  maxRetries: 2
});

const emailQueue = RequestQueue.getInstance('email', {
  maxConcurrent: 2,
  maxQueueSize: 30,
  requestTimeout: 60000,
  maxRetries: 1
});

const nocodbQueue = RequestQueue.getInstance('nocodb', {
  maxConcurrent: 5,
  maxQueueSize: 100,
  requestTimeout: 20000,
  maxRetries: 3
});

/** Lookup of the pre-configured queues by operation name. */
export const queues = {
  documenso: documensoQueue,
  email: emailQueue,
  nocodb: nocodbQueue
};
/**
 * Collects the current `getStatus()` snapshot of every pre-configured queue,
 * keyed by the same names as `queues`.
 */
export function getAllQueueStatuses() {
  const { documenso, email, nocodb } = queues;
  const documensoStatus = documenso.getStatus();
  const emailStatus = email.getStatus();
  const nocodbStatus = nocodb.getStatus();
  return {
    documenso: documensoStatus,
    email: emailStatus,
    nocodb: nocodbStatus
  };
}