Files
pn-new-crm/src/lib/queue/workers/ai.ts
Matt Ciaccio e06fb9545b sec: lock down 5 cross-tenant IDORs uncovered in second-pass review
1. HIGH — /api/v1/admin/ports/[id] PATCH+GET let any port-admin
   (manage_settings) mutate any other tenant's port row by passing the
   foreign id in the path. Now non-super-admins must target their own
   ctx.portId; listPorts and createPort are super-admin only.

2. HIGH — Invoice create/update accepted arbitrary expenseIds and
   linked them into invoice_expenses with no port check; the GET
   response then re-emitted those foreign expense rows via the
   linkedExpenses join. assertExpensesInPort now validates each id
   belongs to the caller's portId before insert; getInvoiceById's
   join filters by expenses.portId as defense-in-depth.

3. HIGH — Document creation paths (createDocument, createFromWizard,
   createFromUpload) persisted user-supplied clientId/interestId/
   companyId/yachtId/reservationId without verifying those FKs were
   in-port. sendForSigning then loaded the foreign client/interest by
   id alone and pushed their PII into the Documenso payload. New
   assertSubjectFksInPort helper rejects out-of-port FKs at create
   time; sendForSigning's interest+client lookups now also filter by
   portId.

4. MEDIUM — calculateInterestScore read its redis cache before
   verifying portId, and the cache key was interestId-only — a
   foreign-port caller could observe a cached score breakdown.
   Cache key now includes portId, and the port-scope DB lookup runs
   before any cache.get.

5. MEDIUM — AI email-draft job results were retrievable by anyone who
   could guess the BullMQ jobId (default sequential integers). Job
   ids are now random UUIDs, requestEmailDraft validates interestId/
   clientId belong to ctx.portId before enqueueing, the worker's
   client lookup is port-scoped, and getEmailDraftResult requires
   the caller to match the original requester's userId+portId before
   returning the drafted subject/body.

The interest-scoring unit test that asserted "DB is bypassed on cache
hit" is updated to reflect the new (security-correct) ordering.
Two new regression test files cover the email-draft binding (5 tests).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-29 02:48:43 +02:00

246 lines
9.1 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import { Worker, type Job } from 'bullmq';
import type { ConnectionOptions } from 'bullmq';
import { logger } from '@/lib/logger';
import { QUEUE_CONFIGS } from '@/lib/queue';
// ─── Email draft generation ───────────────────────────────────────────────────
// Upper bound applied to the raw model reply before it is parsed; larger
// replies are rejected and the template draft is used instead.
const MAX_OUTPUT_BYTES = 10 * 1024; // 10 KB
// Delay after which the in-flight OpenAI request is aborted.
const OPENAI_TIMEOUT_MS = 30_000; // 30 s
/**
 * Job data for the `generate-email-draft` job.
 *
 * Only identifiers are carried here; the interest and client records are
 * loaded by the worker itself, filtered by `portId`.
 */
interface GenerateEmailDraftPayload {
/** Id of the interest the draft is about; looked up together with `portId`. */
interestId: string;
/** Id of the client the email is addressed to; looked up together with `portId`. */
clientId: string;
/** Port (tenant) used to scope the interest, client and email-thread queries. */
portId: string;
/** Kind of email to draft; selects the prompt wording and the fallback template. */
context: 'follow_up' | 'introduction' | 'stage_update' | 'general';
/** Optional free-text guidance appended to the prompt. */
additionalInstructions?: string;
/**
 * Identifier of whoever requested the draft. Not read by the code in this
 * file — presumably consumed by whatever reads the job result; verify against caller.
 */
requestedBy: string;
}
/** Value returned by the `generate-email-draft` job processor. */
interface DraftResult {
/** Subject line of the drafted email. */
subject: string;
/** Plain-text body of the drafted email. */
body: string;
/** ISO-8601 timestamp (`Date#toISOString`) taken when the draft was produced. */
generatedAt: string;
}
/**
 * Validates the raw JSON text returned by the model and extracts the draft.
 *
 * @param content - Raw `message.content` string from the chat completion.
 * @param fallbackSubject - Subject used when the model omits a usable one.
 * @returns The validated subject and body.
 * @throws Error when the reply exceeds `MAX_OUTPUT_BYTES`, is not a JSON
 *   object, or carries no non-empty string body. The caller treats any throw
 *   as "fall back to the template draft".
 */
function parseAiDraftContent(
  content: string,
  fallbackSubject: string,
): { subject: string; body: string } {
  // The cap is expressed in bytes, so measure the UTF-8 encoding. String.length
  // counts UTF-16 code units and under-counts non-ASCII output.
  if (new TextEncoder().encode(content).length > MAX_OUTPUT_BYTES) {
    throw new Error('AI output exceeded 10 KB cap');
  }

  const parsed: unknown = JSON.parse(content);
  if (typeof parsed !== 'object' || parsed === null) {
    throw new Error('AI output was not a JSON object');
  }

  const { subject, body } = parsed as { subject?: unknown; body?: unknown };
  // An empty or non-string body is useless to the requester; reject it so the
  // template draft is returned instead of a blank email.
  if (typeof body !== 'string' || body.trim() === '') {
    throw new Error('AI output did not contain a body');
  }

  const hasSubject = typeof subject === 'string' && subject.trim() !== '';
  return { subject: hasSubject ? subject : fallbackSubject, body };
}

/**
 * Produces an email draft for one interest/client pair.
 *
 * Loads the interest and client (both filtered by `portId`), the berth mooring
 * number, the five most recent interest notes and the five most recent email
 * thread subjects, then asks OpenAI for a JSON `{ subject, body }` draft.
 *
 * When `OPENAI_API_KEY` is unset, or the OpenAI call fails, times out, or
 * returns unusable output, the static template from `buildTemplateDraft` is
 * returned instead — the job does not fail for those reasons.
 *
 * @param payload - Job data; only the ids in it are trusted, never PII.
 * @returns The generated (or template) draft with a generation timestamp.
 * @throws Error if the interest or client does not exist within `portId`.
 */
async function generateEmailDraft(payload: GenerateEmailDraftPayload): Promise<DraftResult> {
  const { interestId, clientId, portId, context, additionalInstructions } = payload;

  // Fetch data by IDs in the worker — never trust PII from the queue payload
  const { db } = await import('@/lib/db');
  const { interests, interestNotes } = await import('@/lib/db/schema/interests');
  const { clients } = await import('@/lib/db/schema/clients');
  const { berths } = await import('@/lib/db/schema/berths');
  const { emailThreads } = await import('@/lib/db/schema/email');
  const { and, eq, desc } = await import('drizzle-orm');

  // Interest and client lookups are both port-scoped so a crafted job payload
  // cannot exfiltrate foreign-tenant data.
  const [interest, client] = await Promise.all([
    db.query.interests.findFirst({
      where: and(eq(interests.id, interestId), eq(interests.portId, portId)),
    }),
    db.query.clients.findFirst({
      where: and(eq(clients.id, clientId), eq(clients.portId, portId)),
    }),
  ]);
  if (!interest || !client) {
    throw new Error('Interest or client not found');
  }

  // The berth is reached only through the port-scoped interest above; the
  // lookup itself filters by id alone.
  // NOTE(review): if berths carries a portId column, add it here as defense-in-depth.
  let berthMooring: string | null = null;
  if (interest.berthId) {
    const berth = await db.query.berths.findFirst({
      where: eq(berths.id, interest.berthId),
    });
    berthMooring = berth?.mooringNumber ?? null;
  }

  // Fetch last 5 notes (interestId was verified in-port above)
  const recentNotes = await db
    .select({ content: interestNotes.content, createdAt: interestNotes.createdAt })
    .from(interestNotes)
    .where(eq(interestNotes.interestId, interestId))
    .orderBy(desc(interestNotes.createdAt))
    .limit(5);

  // Fetch last 5 email subjects (via threads linked to client)
  const recentThreads = await db
    .select({ subject: emailThreads.subject, lastMessageAt: emailThreads.lastMessageAt })
    .from(emailThreads)
    .where(and(eq(emailThreads.clientId, clientId), eq(emailThreads.portId, portId)))
    .orderBy(desc(emailThreads.lastMessageAt))
    .limit(5);

  // Single definition of the fallback so every failure path returns the same draft.
  const templateDraft = (): DraftResult =>
    buildTemplateDraft({
      clientName: client.fullName,
      context,
      berthMooring,
      pipelineStage: interest.pipelineStage,
    });

  const apiKey = process.env.OPENAI_API_KEY;
  if (!apiKey) {
    // Fallback: template-based draft
    return templateDraft();
  }

  // Build prompt
  const contextDescriptions: Record<string, string> = {
    follow_up: 'a friendly follow-up email',
    introduction: 'an initial introduction email',
    stage_update: `an email informing the client about their pipeline progression to stage "${interest.pipelineStage}"`,
    general: 'a general communication email',
  };
  const promptLines: Array<string | null> = [
    `Write ${contextDescriptions[context] ?? 'an email'} to a marina berth client.`,
    '',
    `Client name: ${client.fullName}`,
    berthMooring ? `Berth: ${berthMooring}` : 'Berth: not yet assigned',
    `Pipeline stage: ${interest.pipelineStage}`,
    '',
    recentNotes.length > 0
      ? `Recent notes:\n${recentNotes.map((n) => `- ${n.content.slice(0, 200)}`).join('\n')}`
      : null,
    recentThreads.length > 0
      ? `Recent email subjects:\n${recentThreads.map((t) => `- ${t.subject ?? '(no subject)'}`).join('\n')}`
      : null,
    additionalInstructions ? `Additional instructions: ${additionalInstructions}` : null,
    '',
    'Return JSON with keys: subject (string) and body (string, plain text).',
  ];
  // Drop only the omitted optional sections. A truthiness filter would also
  // strip the intentional '' separator lines.
  const prompt = promptLines.filter((line): line is string => line !== null).join('\n');

  // Call OpenAI with timeout. The timer stays armed until the response body
  // has been fully read, so a stalled body stream is aborted too.
  const controller = new AbortController();
  const timeoutId = setTimeout(() => controller.abort(), OPENAI_TIMEOUT_MS);
  try {
    const response = await fetch('https://api.openai.com/v1/chat/completions', {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        Authorization: `Bearer ${apiKey}`,
      },
      body: JSON.stringify({
        model: 'gpt-4o-mini',
        messages: [
          {
            role: 'system',
            content:
              'You are an expert marina sales and relationship manager. Generate professional, concise emails. Always return valid JSON with "subject" and "body" keys only.',
          },
          { role: 'user', content: prompt },
        ],
        max_tokens: 800,
        temperature: 0.7,
        response_format: { type: 'json_object' },
      }),
      signal: controller.signal,
    });
    if (!response.ok) {
      const errorText = await response.text().catch(() => '');
      throw new Error(`OpenAI API error ${response.status}: ${errorText}`);
    }

    const data = (await response.json()) as {
      choices?: Array<{ message?: { content?: string | null } }>;
    };
    const content = data.choices?.[0]?.message?.content ?? '{}';
    const { subject, body } = parseAiDraftContent(content, `Follow-up: ${client.fullName}`);
    return { subject, body, generatedAt: new Date().toISOString() };
  } catch (err) {
    logger.warn({ err, interestId }, 'OpenAI call failed, falling back to template draft');
    return templateDraft();
  } finally {
    clearTimeout(timeoutId);
  }
}
// ─── Template fallback ────────────────────────────────────────────────────────
/**
 * Builds a static email draft, used when no AI-generated draft is available.
 *
 * @param opts.clientName - Name used in the subject and greeting.
 * @param opts.context - Template key (`introduction`, `follow_up`,
 *   `stage_update`, `general`); any other value yields the `general` template.
 * @param opts.berthMooring - Mooring number to mention, or `null` to use the
 *   generic phrase "your requested berth".
 * @param opts.pipelineStage - Stage name; underscores are shown as spaces in
 *   the `stage_update` body.
 * @returns The template's subject and body plus the current ISO timestamp.
 */
function buildTemplateDraft(opts: {
  clientName: string;
  context: string;
  berthMooring: string | null;
  pipelineStage: string;
}): DraftResult {
  const { clientName, context, berthMooring, pipelineStage } = opts;
  const berthText = berthMooring ? `berth ${berthMooring}` : 'your requested berth';

  const generalTemplate = {
    subject: `Message from Port Nimara ${clientName}`,
    body: `Dear ${clientName},\n\nThank you for your continued interest in Port Nimara. We appreciate your patience and look forward to assisting you with ${berthText}.\n\nKind regards,\nPort Nimara Team`,
  };

  // A Map is used rather than a plain object so that a context such as
  // "constructor" or "toString" cannot resolve to an inherited
  // Object.prototype member and bypass the general fallback.
  const templates = new Map<string, { subject: string; body: string }>([
    [
      'introduction',
      {
        subject: `Welcome to Port Nimara ${clientName}`,
        body: `Dear ${clientName},\n\nThank you for your interest in Port Nimara. We are delighted to introduce our marina facilities and look forward to discussing how we can accommodate your needs for ${berthText}.\n\nPlease feel free to reach out at any time.\n\nKind regards,\nPort Nimara Team`,
      },
    ],
    [
      'follow_up',
      {
        subject: `Following up ${clientName}`,
        body: `Dear ${clientName},\n\nI wanted to follow up regarding your interest in ${berthText}. Please let us know if you have any questions or if there is anything we can assist you with.\n\nWe look forward to hearing from you.\n\nKind regards,\nPort Nimara Team`,
      },
    ],
    [
      'stage_update',
      {
        subject: `Update on your application ${clientName}`,
        body: `Dear ${clientName},\n\nWe are pleased to inform you that your application for ${berthText} has progressed to the "${pipelineStage.replace(/_/g, ' ')}" stage.\n\nWe will be in touch shortly with the next steps.\n\nKind regards,\nPort Nimara Team`,
      },
    ],
    ['general', generalTemplate],
  ]);

  const template = templates.get(context) ?? generalTemplate;
  return { ...template, generatedAt: new Date().toISOString() };
}
// ─── Worker ───────────────────────────────────────────────────────────────────
/**
 * BullMQ worker bound to the `ai` queue.
 *
 * Dispatches on the job name: `generate-email-draft` jobs are handed to
 * `generateEmailDraft` and its result becomes the job's return value; any
 * other job name is logged as a warning and completes with `undefined`.
 */
export const aiWorker = new Worker(
  'ai',
  async (job: Job) => {
    const { id: jobId, name: jobName } = job;
    logger.info({ jobId, jobName }, 'Processing AI job');

    if (jobName === 'generate-email-draft') {
      const draftPayload = job.data as GenerateEmailDraftPayload;
      return generateEmailDraft(draftPayload);
    }

    // Unrecognised job names are not treated as failures.
    logger.warn({ jobName }, 'Unknown AI job');
    return undefined;
  },
  {
    connection: { url: process.env.REDIS_URL! } as ConnectionOptions,
    concurrency: QUEUE_CONFIGS.ai.concurrency,
  },
);
// Log every failed AI job; the job reference may be absent, hence the optional access.
aiWorker.on('failed', (failedJob, err) => {
  const jobId = failedJob?.id;
  const jobName = failedJob?.name;
  logger.error({ jobId, jobName, err }, 'AI job failed');
});