import { Worker, type Job } from 'bullmq'; import type { ConnectionOptions } from 'bullmq'; import { logger } from '@/lib/logger'; import { QUEUE_CONFIGS } from '@/lib/queue'; // ─── Email draft generation ─────────────────────────────────────────────────── const MAX_OUTPUT_BYTES = 10 * 1024; // 10 KB const OPENAI_TIMEOUT_MS = 30_000; // 30 s interface GenerateEmailDraftPayload { interestId: string; clientId: string; portId: string; context: 'follow_up' | 'introduction' | 'stage_update' | 'general'; additionalInstructions?: string; requestedBy: string; } interface DraftResult { subject: string; body: string; generatedAt: string; } async function generateEmailDraft(payload: GenerateEmailDraftPayload): Promise { const { interestId, clientId, portId, context, additionalInstructions } = payload; // Fetch data by IDs in the worker — never trust PII from the queue payload const { db } = await import('@/lib/db'); const { interests } = await import('@/lib/db/schema/interests'); const { clients } = await import('@/lib/db/schema/clients'); const { berths } = await import('@/lib/db/schema/berths'); const { interestNotes } = await import('@/lib/db/schema/interests'); const { emailThreads } = await import('@/lib/db/schema/email'); const { and, eq, desc } = await import('drizzle-orm'); // Fetch interest, client, berth const [interest, client] = await Promise.all([ db.query.interests.findFirst({ where: and(eq(interests.id, interestId), eq(interests.portId, portId)), }), db.query.clients.findFirst({ where: eq(clients.id, clientId) }), ]); if (!interest || !client) { throw new Error('Interest or client not found'); } let berthMooring: string | null = null; if (interest.berthId) { const berth = await db.query.berths.findFirst({ where: eq(berths.id, interest.berthId), }); berthMooring = berth?.mooringNumber ?? null; } // Fetch last 5 notes const recentNotes = await db .select({ content: interestNotes.content, createdAt: interestNotes.createdAt }) .from(interestNotes) .where(eq(interestNotes.interestId, interestId)) .orderBy(desc(interestNotes.createdAt)) .limit(5); // Fetch last 5 email subjects (via threads linked to client) const recentThreads = await db .select({ subject: emailThreads.subject, lastMessageAt: emailThreads.lastMessageAt }) .from(emailThreads) .where(and(eq(emailThreads.clientId, clientId), eq(emailThreads.portId, portId))) .orderBy(desc(emailThreads.lastMessageAt)) .limit(5); const apiKey = process.env.OPENAI_API_KEY; if (!apiKey) { // Fallback: template-based draft return buildTemplateDraft({ clientName: client.fullName, context, berthMooring, pipelineStage: interest.pipelineStage, }); } // Build prompt const contextDescriptions: Record = { follow_up: 'a friendly follow-up email', introduction: 'an initial introduction email', stage_update: `an email informing the client about their pipeline progression to stage "${interest.pipelineStage}"`, general: 'a general communication email', }; const prompt = [ `Write ${contextDescriptions[context] ?? 'an email'} to a marina berth client.`, '', `Client name: ${client.fullName}`, berthMooring ? `Berth: ${berthMooring}` : 'Berth: not yet assigned', `Pipeline stage: ${interest.pipelineStage}`, '', recentNotes.length > 0 ? `Recent notes:\n${recentNotes.map((n) => `- ${n.content.slice(0, 200)}`).join('\n')}` : null, recentThreads.length > 0 ? `Recent email subjects:\n${recentThreads.map((t) => `- ${t.subject ?? '(no subject)'}`).join('\n')}` : null, additionalInstructions ? `Additional instructions: ${additionalInstructions}` : null, '', 'Return JSON with keys: subject (string) and body (string, plain text).', ] .filter(Boolean) .join('\n'); // Call OpenAI with timeout const controller = new AbortController(); const timeoutId = setTimeout(() => controller.abort(), OPENAI_TIMEOUT_MS); let subject: string; let body: string; try { const response = await fetch('https://api.openai.com/v1/chat/completions', { method: 'POST', headers: { 'Content-Type': 'application/json', Authorization: `Bearer ${apiKey}`, }, body: JSON.stringify({ model: 'gpt-4o-mini', messages: [ { role: 'system', content: 'You are an expert marina sales and relationship manager. Generate professional, concise emails. Always return valid JSON with "subject" and "body" keys only.', }, { role: 'user', content: prompt }, ], max_tokens: 800, temperature: 0.7, response_format: { type: 'json_object' }, }), signal: controller.signal, }); clearTimeout(timeoutId); if (!response.ok) { const errorText = await response.text().catch(() => ''); throw new Error(`OpenAI API error ${response.status}: ${errorText}`); } const data = (await response.json()) as { choices: Array<{ message: { content: string } }>; }; const content = data.choices[0]?.message?.content ?? '{}'; // Enforce output size cap if (content.length > MAX_OUTPUT_BYTES) { throw new Error('AI output exceeded 10 KB cap'); } const parsed = JSON.parse(content) as { subject?: string; body?: string }; subject = parsed.subject ?? `Follow-up: ${client.fullName}`; body = parsed.body ?? ''; } catch (err) { clearTimeout(timeoutId); logger.warn({ err, interestId }, 'OpenAI call failed, falling back to template draft'); return buildTemplateDraft({ clientName: client.fullName, context, berthMooring, pipelineStage: interest.pipelineStage, }); } return { subject, body, generatedAt: new Date().toISOString() }; } // ─── Template fallback ──────────────────────────────────────────────────────── function buildTemplateDraft(opts: { clientName: string; context: string; berthMooring: string | null; pipelineStage: string; }): DraftResult { const { clientName, context, berthMooring, pipelineStage } = opts; const berthText = berthMooring ? `berth ${berthMooring}` : 'your requested berth'; const templates: Record = { introduction: { subject: `Welcome to Port Nimara – ${clientName}`, body: `Dear ${clientName},\n\nThank you for your interest in Port Nimara. We are delighted to introduce our marina facilities and look forward to discussing how we can accommodate your needs for ${berthText}.\n\nPlease feel free to reach out at any time.\n\nKind regards,\nPort Nimara Team`, }, follow_up: { subject: `Following up – ${clientName}`, body: `Dear ${clientName},\n\nI wanted to follow up regarding your interest in ${berthText}. Please let us know if you have any questions or if there is anything we can assist you with.\n\nWe look forward to hearing from you.\n\nKind regards,\nPort Nimara Team`, }, stage_update: { subject: `Update on your application – ${clientName}`, body: `Dear ${clientName},\n\nWe are pleased to inform you that your application for ${berthText} has progressed to the "${pipelineStage.replace(/_/g, ' ')}" stage.\n\nWe will be in touch shortly with the next steps.\n\nKind regards,\nPort Nimara Team`, }, general: { subject: `Message from Port Nimara – ${clientName}`, body: `Dear ${clientName},\n\nThank you for your continued interest in Port Nimara. We appreciate your patience and look forward to assisting you with ${berthText}.\n\nKind regards,\nPort Nimara Team`, }, }; const template = templates[context] ?? templates['general']!; return { ...template, generatedAt: new Date().toISOString() }; } // ─── Worker ─────────────────────────────────────────────────────────────────── export const aiWorker = new Worker( 'ai', async (job: Job) => { logger.info({ jobId: job.id, jobName: job.name }, 'Processing AI job'); switch (job.name) { case 'generate-email-draft': { const payload = job.data as GenerateEmailDraftPayload; const result = await generateEmailDraft(payload); return result; } default: logger.warn({ jobName: job.name }, 'Unknown AI job'); return undefined; } }, { connection: { url: process.env.REDIS_URL! } as ConnectionOptions, concurrency: QUEUE_CONFIGS.ai.concurrency, }, ); aiWorker.on('failed', (job, err) => { logger.error({ jobId: job?.id, jobName: job?.name, err }, 'AI job failed'); });