Files
pn-new-crm/src/lib/queue/workers/ai.ts
Matt Ciaccio 0ed401d083 refactor(clients): drop deprecated yacht/company/proxy columns
PR 13: now that all reads are migrated to the dedicated yacht / company
/ membership entities, drop the columns that mirrored them on `clients`:
companyName, isProxy, proxyType, actualOwnerName, relationshipNotes,
yachtName, yachtLength{Ft,M}, yachtWidth{Ft,M}, yachtDraft{Ft,M},
berthSizeDesired.

Migration `0008_loud_ikaris.sql` issues the destructive ALTER TABLE
DROP COLUMN statements. Run `pnpm db:push` (or the migration runner) to
apply.

Caller cleanup (zero behavioral change to remaining flows):

- Drops the legacy `generateEoi` flow entirely (route, service function,
  pdfme template, validator schema). The dual-path generate-and-sign
  service from PR 11 has fully replaced it; the route was no longer
  wired to the UI.
- `clients.service`: company-name search column / WHERE / audit value
  removed; search now ranks by full name only.
- `interests.service`: `resolveLeadCategory` reads dimensions from
  `yachts` via `interest.yachtId` instead of the dropped
  `client.yachtLength{Ft,M}`.
- `record-export`: client-summary now lists yachts via owner-side
  lookup (direct + active company memberships); interest-summary fetches
  yacht via `interest.yachtId`. Both PDF templates updated to read
  yacht details from the new entity.
- `client-detail-header`, `client-picker`, `command-search`,
  `search-result-item`, `use-search` hook, `types/domain.ts`,
  `search.service` — drop the companyName badge / sub-label / typed
  field everywhere it was rendered or fetched.
- `ai.ts` worker: drop the company / yacht context lines from the
  prompt (will be re-added later sourced from the new entities).
- `validators/interests.ts`: remove the deprecated public-form flat
  yacht/company fields. The route already ignores them.
- `factories.ts`: drop the `isProxy: false` default.

Tests: 652/652 green; type-check clean. The
`security-sensitive-data` tests use `companyName` / `isProxy` as
arbitrary record keys for a generic util — left unchanged.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-26 13:57:54 +02:00

243 lines
8.9 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import { Worker, type Job } from 'bullmq';
import type { ConnectionOptions } from 'bullmq';
import { logger } from '@/lib/logger';
import { QUEUE_CONFIGS } from '@/lib/queue';
// ─── Email draft generation ───────────────────────────────────────────────────
/** Upper bound on the size of the raw model response accepted before it is parsed; larger responses are rejected. */
const MAX_OUTPUT_BYTES = 10 * 1024; // 10 KB
/** Delay, in milliseconds, after which the in-flight OpenAI request is aborted. */
const OPENAI_TIMEOUT_MS = 30_000; // 30 s
/**
 * Data carried by a `generate-email-draft` job.
 *
 * Only identifiers and drafting options travel through the queue; the worker
 * loads the interest and client records itself.
 */
interface GenerateEmailDraftPayload {
/** Id of the interest to draft for; also selects the notes included in the prompt. */
interestId: string;
/** Id of the client to address; also selects the email threads included in the prompt. */
clientId: string;
/** Port id used to scope the interest lookup and the email-thread query. */
portId: string;
/** Kind of email to write; picks both the prompt wording and the fallback template. */
context: 'follow_up' | 'introduction' | 'stage_update' | 'general';
/** Optional free text appended to the prompt as "Additional instructions". */
additionalInstructions?: string;
/** Not read by the draft generator in this file — presumably the requesting user's id; confirm against the producer. */
requestedBy: string;
}
/** Email draft returned as the job result, whether AI-generated or template-based. */
interface DraftResult {
/** Subject line of the draft. */
subject: string;
/** Body text of the draft. */
body: string;
/** Creation time of the draft, as produced by `Date.prototype.toISOString()`. */
generatedAt: string;
}
/**
 * Builds the user prompt sent to the model.
 *
 * The prompt is assembled from sections (instruction, client facts, optional
 * history, output format) separated by a single blank line. Sections that end
 * up empty are omitted so the prompt never contains stray blank runs.
 */
function buildEmailDraftPrompt(opts: {
  context: string;
  clientName: string;
  berthMooring: string | null;
  pipelineStage: string;
  recentNotes: ReadonlyArray<{ content: string }>;
  recentThreads: ReadonlyArray<{ subject: string | null }>;
  additionalInstructions?: string;
}): string {
  const {
    context,
    clientName,
    berthMooring,
    pipelineStage,
    recentNotes,
    recentThreads,
    additionalInstructions,
  } = opts;

  // A switch (rather than an object lookup) so that an unexpected context
  // coming off the queue can never resolve to an inherited Object property.
  let description: string;
  switch (context) {
    case 'follow_up':
      description = 'a friendly follow-up email';
      break;
    case 'introduction':
      description = 'an initial introduction email';
      break;
    case 'stage_update':
      description = `an email informing the client about their pipeline progression to stage "${pipelineStage}"`;
      break;
    case 'general':
      description = 'a general communication email';
      break;
    default:
      description = 'an email';
  }

  const instruction = `Write ${description} to a marina berth client.`;

  const facts = [
    `Client name: ${clientName}`,
    berthMooring ? `Berth: ${berthMooring}` : 'Berth: not yet assigned',
    `Pipeline stage: ${pipelineStage}`,
  ].join('\n');

  const historyParts: string[] = [];
  if (recentNotes.length > 0) {
    // Each note is truncated to 200 characters to keep the prompt small.
    historyParts.push(
      `Recent notes:\n${recentNotes.map((n) => `- ${n.content.slice(0, 200)}`).join('\n')}`,
    );
  }
  if (recentThreads.length > 0) {
    historyParts.push(
      `Recent email subjects:\n${recentThreads.map((t) => `- ${t.subject ?? '(no subject)'}`).join('\n')}`,
    );
  }
  if (additionalInstructions) {
    historyParts.push(`Additional instructions: ${additionalInstructions}`);
  }
  const history = historyParts.join('\n');

  const outputFormat = 'Return JSON with keys: subject (string) and body (string, plain text).';

  return [instruction, facts, history, outputFormat]
    .filter((section) => section.length > 0)
    .join('\n\n');
}

/**
 * Validates the raw model output and extracts the draft fields.
 *
 * @param content - Raw message content returned by the model (expected to be a JSON object).
 * @param fallbackSubject - Subject used when the model omits a usable one.
 * @returns The subject and body to use for the draft.
 * @throws Error when the content exceeds the size cap, is not valid JSON, is
 *   not a JSON object, or does not contain a non-empty string `body`.
 */
function parseDraftContent(
  content: string,
  fallbackSubject: string,
): { subject: string; body: string } {
  // Measure encoded bytes: `content.length` counts UTF-16 code units, which
  // under-reports the size of non-ASCII text.
  if (Buffer.byteLength(content, 'utf8') > MAX_OUTPUT_BYTES) {
    throw new Error('AI output exceeded 10 KB cap');
  }

  const parsed: unknown = JSON.parse(content);
  if (typeof parsed !== 'object' || parsed === null) {
    throw new Error('AI output was not a JSON object');
  }

  const { subject, body } = parsed as { subject?: unknown; body?: unknown };
  if (typeof body !== 'string' || body.trim() === '') {
    throw new Error('AI output did not contain a usable body');
  }

  return {
    subject: typeof subject === 'string' && subject.trim() !== '' ? subject : fallbackSubject,
    body,
  };
}

/**
 * Generates an email draft for an interest/client pair.
 *
 * Loads the interest, client and (when assigned) berth by id, then asks OpenAI
 * for a draft using the latest notes and email subjects as context. When
 * `OPENAI_API_KEY` is not set, or when the OpenAI call fails, times out or
 * returns unusable output, a template-based draft is returned instead.
 *
 * @param payload - Ids and drafting options taken from the queue job.
 * @returns The generated (or template) draft.
 * @throws Error when the interest or the client cannot be found. Database
 *   errors are not caught here and propagate to the caller.
 */
async function generateEmailDraft(payload: GenerateEmailDraftPayload): Promise<DraftResult> {
  const { interestId, clientId, portId, context, additionalInstructions } = payload;

  // Fetch data by IDs in the worker — never trust PII from the queue payload
  const { db } = await import('@/lib/db');
  const { interests, interestNotes } = await import('@/lib/db/schema/interests');
  const { clients } = await import('@/lib/db/schema/clients');
  const { berths } = await import('@/lib/db/schema/berths');
  const { emailThreads } = await import('@/lib/db/schema/email');
  const { and, eq, desc } = await import('drizzle-orm');

  // NOTE(review): the client lookup is neither scoped to `portId` nor checked
  // against the interest's own client — confirm whether that is intended.
  const [interest, client] = await Promise.all([
    db.query.interests.findFirst({
      where: and(eq(interests.id, interestId), eq(interests.portId, portId)),
    }),
    db.query.clients.findFirst({ where: eq(clients.id, clientId) }),
  ]);
  if (!interest || !client) {
    throw new Error('Interest or client not found');
  }

  let berthMooring: string | null = null;
  if (interest.berthId) {
    const berth = await db.query.berths.findFirst({
      where: eq(berths.id, interest.berthId),
    });
    berthMooring = berth?.mooringNumber ?? null;
  }

  const templateInput = {
    clientName: client.fullName,
    context,
    berthMooring,
    pipelineStage: interest.pipelineStage,
  };

  const apiKey = process.env.OPENAI_API_KEY;
  if (!apiKey) {
    // Fallback: template-based draft. Checked before loading notes/threads
    // because the template does not use them.
    return buildTemplateDraft(templateInput);
  }

  // Last 5 notes and last 5 email subjects (via threads linked to the client);
  // the two queries are independent, so run them concurrently.
  const [recentNotes, recentThreads] = await Promise.all([
    db
      .select({ content: interestNotes.content, createdAt: interestNotes.createdAt })
      .from(interestNotes)
      .where(eq(interestNotes.interestId, interestId))
      .orderBy(desc(interestNotes.createdAt))
      .limit(5),
    db
      .select({ subject: emailThreads.subject, lastMessageAt: emailThreads.lastMessageAt })
      .from(emailThreads)
      .where(and(eq(emailThreads.clientId, clientId), eq(emailThreads.portId, portId)))
      .orderBy(desc(emailThreads.lastMessageAt))
      .limit(5),
  ]);

  const prompt = buildEmailDraftPrompt({
    context,
    clientName: client.fullName,
    berthMooring,
    pipelineStage: interest.pipelineStage,
    recentNotes,
    recentThreads,
    additionalInstructions,
  });

  // Call OpenAI with a timeout. The timer stays armed until the response body
  // has been fully read (it is cleared in `finally`), so a stalled body
  // download is aborted as well as a stalled connection.
  const controller = new AbortController();
  const timeoutId = setTimeout(() => controller.abort(), OPENAI_TIMEOUT_MS);
  try {
    const response = await fetch('https://api.openai.com/v1/chat/completions', {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        Authorization: `Bearer ${apiKey}`,
      },
      body: JSON.stringify({
        model: 'gpt-4o-mini',
        messages: [
          {
            role: 'system',
            content:
              'You are an expert marina sales and relationship manager. Generate professional, concise emails. Always return valid JSON with "subject" and "body" keys only.',
          },
          { role: 'user', content: prompt },
        ],
        max_tokens: 800,
        temperature: 0.7,
        response_format: { type: 'json_object' },
      }),
      signal: controller.signal,
    });

    if (!response.ok) {
      const errorText = await response.text().catch(() => '');
      throw new Error(`OpenAI API error ${response.status}: ${errorText}`);
    }

    const data = (await response.json()) as {
      choices?: Array<{ message?: { content?: string } }>;
    };
    const content = data.choices?.[0]?.message?.content ?? '{}';

    const { subject, body } = parseDraftContent(content, `Follow-up: ${client.fullName}`);
    return { subject, body, generatedAt: new Date().toISOString() };
  } catch (err) {
    // Any failure on the AI path (network, timeout, HTTP error, bad output)
    // degrades to the template rather than failing the job.
    logger.warn({ err, interestId }, 'OpenAI call failed, falling back to template draft');
    return buildTemplateDraft(templateInput);
  } finally {
    clearTimeout(timeoutId);
  }
}
// ─── Template fallback ────────────────────────────────────────────────────────
/**
 * Builds a canned email draft without calling any external service.
 *
 * @param opts.clientName - Name used in the greeting and the subject line.
 * @param opts.context - Template to use: `introduction`, `follow_up`,
 *   `stage_update` or `general`. Any other value yields the `general` template.
 * @param opts.berthMooring - Mooring number to mention, or `null` to refer to
 *   "your requested berth".
 * @param opts.pipelineStage - Stage name shown in the `stage_update` body, with
 *   underscores replaced by spaces.
 * @returns The selected template stamped with the current time.
 */
function buildTemplateDraft(opts: {
  clientName: string;
  context: string;
  berthMooring: string | null;
  pipelineStage: string;
}): DraftResult {
  const { clientName, context, berthMooring, pipelineStage } = opts;
  const berthText = berthMooring ? `berth ${berthMooring}` : 'your requested berth';

  const generalTemplate = {
    subject: `Message from Port Nimara ${clientName}`,
    body: `Dear ${clientName},\n\nThank you for your continued interest in Port Nimara. We appreciate your patience and look forward to assisting you with ${berthText}.\n\nKind regards,\nPort Nimara Team`,
  };

  // A Map instead of a plain object: `context` is an arbitrary string, and an
  // object lookup would resolve inherited keys such as "toString" or
  // "constructor" to non-template values.
  const templates = new Map<string, { subject: string; body: string }>([
    [
      'introduction',
      {
        subject: `Welcome to Port Nimara ${clientName}`,
        body: `Dear ${clientName},\n\nThank you for your interest in Port Nimara. We are delighted to introduce our marina facilities and look forward to discussing how we can accommodate your needs for ${berthText}.\n\nPlease feel free to reach out at any time.\n\nKind regards,\nPort Nimara Team`,
      },
    ],
    [
      'follow_up',
      {
        subject: `Following up ${clientName}`,
        body: `Dear ${clientName},\n\nI wanted to follow up regarding your interest in ${berthText}. Please let us know if you have any questions or if there is anything we can assist you with.\n\nWe look forward to hearing from you.\n\nKind regards,\nPort Nimara Team`,
      },
    ],
    [
      'stage_update',
      {
        subject: `Update on your application ${clientName}`,
        body: `Dear ${clientName},\n\nWe are pleased to inform you that your application for ${berthText} has progressed to the "${pipelineStage.replace(/_/g, ' ')}" stage.\n\nWe will be in touch shortly with the next steps.\n\nKind regards,\nPort Nimara Team`,
      },
    ],
    ['general', generalTemplate],
  ]);

  const template = templates.get(context) ?? generalTemplate;
  return { ...template, generatedAt: new Date().toISOString() };
}
// ─── Worker ───────────────────────────────────────────────────────────────────
/**
 * Processor for the `ai` queue: dispatches a job to its handler by job name.
 *
 * `generate-email-draft` jobs resolve to the generated draft. Any other job
 * name is logged as a warning and resolves to `undefined`.
 */
async function processAiJob(job: Job): Promise<DraftResult | undefined> {
  const { id: jobId, name: jobName } = job;
  logger.info({ jobId, jobName }, 'Processing AI job');

  if (jobName === 'generate-email-draft') {
    return generateEmailDraft(job.data as GenerateEmailDraftPayload);
  }

  logger.warn({ jobName }, 'Unknown AI job');
  return undefined;
}

/** BullMQ worker consuming the `ai` queue, connected via `REDIS_URL`. */
export const aiWorker = new Worker('ai', processAiJob, {
  connection: { url: process.env.REDIS_URL! } as ConnectionOptions,
  concurrency: QUEUE_CONFIGS.ai.concurrency,
});
// Log every failed AI job together with its id, name and the error.
aiWorker.on('failed', (failedJob, error) => {
  const jobId = failedJob?.id;
  const jobName = failedJob?.name;
  logger.error({ jobId, jobName, err: error }, 'AI job failed');
});