/*
 * Initial commit: Port Nimara CRM (Layers 0-4)
 *
 * Full CRM rebuild with Next.js 15, TypeScript, Tailwind, Drizzle ORM,
 * PostgreSQL, Redis, BullMQ, MinIO, and Socket.io. Includes 461 source
 * files covering clients, berths, interests/pipeline, documents/EOI,
 * expenses/invoices, email, notifications, dashboard, admin, and
 * client portal. CI/CD via Gitea Actions with Docker builds.
 *
 * Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
 * 2026-03-26 11:52:51 +01:00
 */
import { Worker , type Job } from 'bullmq' ;
import type { ConnectionOptions } from 'bullmq' ;
import { logger } from '@/lib/logger' ;
import { QUEUE_CONFIGS } from '@/lib/queue' ;
// ─── Email draft generation ───────────────────────────────────────────────────
/** Upper bound on the size of the raw model output accepted before it is parsed (10 KB). */
const MAX_OUTPUT_BYTES = 10 * 1024;

/** Milliseconds to wait for the OpenAI request before it is aborted (30 s). */
const OPENAI_TIMEOUT_MS = 30_000;
/**
 * Queue payload for the `generate-email-draft` job.
 *
 * Carries identifiers only; the worker loads the actual records from the
 * database by these IDs.
 */
interface GenerateEmailDraftPayload {
  /** ID of the interest the draft is about. */
  interestId: string;
  /** ID of the client the draft is addressed to. */
  clientId: string;
  /** ID of the port used to scope the interest and email-thread lookups. */
  portId: string;
  /** Kind of email to draft; selects the prompt wording and the fallback template. */
  context: 'follow_up' | 'introduction' | 'stage_update' | 'general';
  /** Optional free-text guidance appended to the model prompt. */
  additionalInstructions?: string;
  /** Identifier of the requester; not read by the draft generator in this file. */
  requestedBy: string;
}
/** Email draft returned as the job result, from either the model or a template. */
interface DraftResult {
  /** Subject line of the draft. */
  subject: string;
  /** Body text of the draft. */
  body: string;
  /** ISO-8601 timestamp (from `Date#toISOString`) of when the draft was produced. */
  generatedAt: string;
}
/**
 * Generates an email draft for an interest/client pair.
 *
 * Loads the interest, client and (if assigned) berth from the database, then:
 * - without `OPENAI_API_KEY`, returns a template-based draft immediately;
 * - otherwise loads recent notes and email subjects, asks OpenAI for a JSON
 *   `{ subject, body }` draft, and falls back to the template draft when the
 *   request fails, times out, exceeds the size cap, or yields no usable body.
 *
 * @param payload - IDs and context describing which draft to generate.
 * @returns The generated (or template) draft with a generation timestamp.
 * @throws Error when the interest or client cannot be found; database errors
 *   from the lookups also propagate to the caller.
 */
async function generateEmailDraft(payload: GenerateEmailDraftPayload): Promise<DraftResult> {
  const { interestId, clientId, portId, context, additionalInstructions } = payload;

  // Fetch data by IDs in the worker — never trust PII from the queue payload
  const { db } = await import('@/lib/db');
  const { interests, interestNotes } = await import('@/lib/db/schema/interests');
  const { clients } = await import('@/lib/db/schema/clients');
  const { berths } = await import('@/lib/db/schema/berths');
  const { emailThreads } = await import('@/lib/db/schema/email');
  const { and, eq, desc } = await import('drizzle-orm');

  // Fetch interest and client.
  // NOTE(review): the interest lookup is scoped by portId but the client lookup
  // is keyed by id only — confirm whether clients should also be port-scoped.
  const [interest, client] = await Promise.all([
    db.query.interests.findFirst({
      where: and(eq(interests.id, interestId), eq(interests.portId, portId)),
    }),
    db.query.clients.findFirst({ where: eq(clients.id, clientId) }),
  ]);

  if (!interest || !client) {
    throw new Error('Interest or client not found');
  }

  let berthMooring: string | null = null;
  if (interest.berthId) {
    const berth = await db.query.berths.findFirst({
      where: eq(berths.id, interest.berthId),
    });
    berthMooring = berth?.mooringNumber ?? null;
  }

  // Single place that builds the template draft used by every fallback path.
  const fallbackDraft = (): DraftResult =>
    buildTemplateDraft({
      clientName: client.fullName,
      context,
      berthMooring,
      pipelineStage: interest.pipelineStage,
    });

  const apiKey = process.env.OPENAI_API_KEY;
  if (!apiKey) {
    // Fallback: template-based draft. Checked before loading notes/threads,
    // which only feed the model prompt.
    return fallbackDraft();
  }

  // Last 5 notes and last 5 email subjects (via threads linked to the client);
  // the two queries are independent, so run them in parallel.
  const [recentNotes, recentThreads] = await Promise.all([
    db
      .select({ content: interestNotes.content, createdAt: interestNotes.createdAt })
      .from(interestNotes)
      .where(eq(interestNotes.interestId, interestId))
      .orderBy(desc(interestNotes.createdAt))
      .limit(5),
    db
      .select({ subject: emailThreads.subject, lastMessageAt: emailThreads.lastMessageAt })
      .from(emailThreads)
      .where(and(eq(emailThreads.clientId, clientId), eq(emailThreads.portId, portId)))
      .orderBy(desc(emailThreads.lastMessageAt))
      .limit(5),
  ]);

  // Build prompt
  const contextDescriptions: Record<string, string> = {
    follow_up: 'a friendly follow-up email',
    introduction: 'an initial introduction email',
    stage_update: `an email informing the client about their pipeline progression to stage "${interest.pipelineStage}"`,
    general: 'a general communication email',
  };

  const clientDetails = [
    `Client name: ${client.fullName}`,
    client.companyName ? `Company: ${client.companyName}` : null,
    client.yachtName ? `Yacht: ${client.yachtName}` : null,
    berthMooring ? `Berth: ${berthMooring}` : 'Berth: not yet assigned',
    `Pipeline stage: ${interest.pipelineStage}`,
  ];

  const history = [
    recentNotes.length > 0
      ? `Recent notes:\n${recentNotes.map((n) => `- ${n.content.slice(0, 200)}`).join('\n')}`
      : null,
    recentThreads.length > 0
      ? `Recent email subjects:\n${recentThreads.map((t) => `- ${t.subject ?? '(no subject)'}`).join('\n')}`
      : null,
    additionalInstructions ? `Additional instructions: ${additionalInstructions}` : null,
  ];

  // Drop only the absent (null) lines, then separate sections with a blank
  // line. Filtering with Boolean would also strip intentional empty
  // separator strings.
  const keepPresent = (lines: Array<string | null>): string =>
    lines.filter((line): line is string => line !== null).join('\n');

  const prompt = [
    `Write ${contextDescriptions[context] ?? 'an email'} to a marina berth client.`,
    keepPresent(clientDetails),
    keepPresent(history),
    'Return JSON with keys: subject (string) and body (string, plain text).',
  ]
    .filter((section) => section !== '')
    .join('\n\n');

  // Call OpenAI with timeout. The timer stays armed until the response body
  // has been fully read and parsed, so a stalled body is aborted as well.
  const controller = new AbortController();
  const timeoutId = setTimeout(() => controller.abort(), OPENAI_TIMEOUT_MS);

  try {
    const response = await fetch('https://api.openai.com/v1/chat/completions', {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        Authorization: `Bearer ${apiKey}`,
      },
      body: JSON.stringify({
        model: 'gpt-4o-mini',
        messages: [
          {
            role: 'system',
            content:
              'You are an expert marina sales and relationship manager. Generate professional, concise emails. Always return valid JSON with "subject" and "body" keys only.',
          },
          { role: 'user', content: prompt },
        ],
        max_tokens: 800,
        temperature: 0.7,
        response_format: { type: 'json_object' },
      }),
      signal: controller.signal,
    });

    if (!response.ok) {
      const errorText = await response.text().catch(() => '');
      throw new Error(`OpenAI API error ${response.status}: ${errorText}`);
    }

    const data = (await response.json()) as {
      choices?: Array<{ message?: { content?: string | null } }>;
    };
    const content = data.choices?.[0]?.message?.content ?? '{}';

    // Enforce output size cap in bytes (string length counts UTF-16 code
    // units, which under-counts multi-byte characters).
    if (Buffer.byteLength(content, 'utf8') > MAX_OUTPUT_BYTES) {
      throw new Error('AI output exceeded 10 KB cap');
    }

    // The model output is untrusted: validate field types instead of casting.
    const parsed: unknown = JSON.parse(content);
    const fields: Record<string, unknown> =
      typeof parsed === 'object' && parsed !== null ? (parsed as Record<string, unknown>) : {};

    const body = typeof fields.body === 'string' ? fields.body : '';
    if (body.trim() === '') {
      // A draft without a body is useless; use the template instead.
      throw new Error('AI output did not contain a usable "body"');
    }

    const subject =
      typeof fields.subject === 'string' && fields.subject.trim() !== ''
        ? fields.subject
        : `Follow-up: ${client.fullName}`;

    return { subject, body, generatedAt: new Date().toISOString() };
  } catch (err) {
    logger.warn({ err, interestId }, 'OpenAI call failed, falling back to template draft');
    return fallbackDraft();
  } finally {
    clearTimeout(timeoutId);
  }
}
// ─── Template fallback ────────────────────────────────────────────────────────
/**
 * Builds a draft from a fixed template, without calling any external service.
 *
 * @param opts.clientName - Name used in the greeting and the subject line.
 * @param opts.context - Template key (`introduction`, `follow_up`,
 *   `stage_update` or `general`); any other value selects the `general` template.
 * @param opts.berthMooring - Mooring number to mention, or `null` to use the
 *   generic wording "your requested berth".
 * @param opts.pipelineStage - Stage identifier; underscores are shown as spaces
 *   in the `stage_update` template.
 * @returns The template's subject and body plus the current ISO timestamp.
 */
function buildTemplateDraft(opts: {
  clientName: string;
  context: string;
  berthMooring: string | null;
  pipelineStage: string;
}): DraftResult {
  const { clientName, context, berthMooring, pipelineStage } = opts;

  const berthText = berthMooring ? `berth ${berthMooring}` : 'your requested berth';

  const generalTemplate = {
    subject: `Message from Port Nimara – ${clientName}`,
    body: `Dear ${clientName},\n\nThank you for your continued interest in Port Nimara. We appreciate your patience and look forward to assisting you with ${berthText}.\n\nKind regards,\nPort Nimara Team`,
  };

  const templates: Record<string, { subject: string; body: string }> = {
    introduction: {
      subject: `Welcome to Port Nimara – ${clientName}`,
      body: `Dear ${clientName},\n\nThank you for your interest in Port Nimara. We are delighted to introduce our marina facilities and look forward to discussing how we can accommodate your needs for ${berthText}.\n\nPlease feel free to reach out at any time.\n\nKind regards,\nPort Nimara Team`,
    },
    follow_up: {
      subject: `Following up – ${clientName}`,
      body: `Dear ${clientName},\n\nI wanted to follow up regarding your interest in ${berthText}. Please let us know if you have any questions or if there is anything we can assist you with.\n\nWe look forward to hearing from you.\n\nKind regards,\nPort Nimara Team`,
    },
    stage_update: {
      subject: `Update on your application – ${clientName}`,
      body: `Dear ${clientName},\n\nWe are pleased to inform you that your application for ${berthText} has progressed to the "${pipelineStage.replace(/_/g, ' ')}" stage.\n\nWe will be in touch shortly with the next steps.\n\nKind regards,\nPort Nimara Team`,
    },
    general: generalTemplate,
  };

  // Only accept keys defined on the table itself: a plain object lookup would
  // otherwise resolve inherited names such as "constructor" or "__proto__" to
  // truthy non-template values and produce a draft without subject/body.
  const isKnownContext = Object.prototype.hasOwnProperty.call(templates, context);
  const template = (isKnownContext ? templates[context] : undefined) ?? generalTemplate;

  return { ...template, generatedAt: new Date().toISOString() };
}
// ─── Worker ───────────────────────────────────────────────────────────────────
/**
 * Processor for jobs on the `ai` queue.
 *
 * Dispatches on the job name: `generate-email-draft` jobs produce an email
 * draft; any other name is logged as unknown and resolves to `undefined`.
 */
async function processAiJob(job: Job): Promise<DraftResult | undefined> {
  logger.info({ jobId: job.id, jobName: job.name }, 'Processing AI job');

  if (job.name === 'generate-email-draft') {
    const draftPayload = job.data as GenerateEmailDraftPayload;
    return generateEmailDraft(draftPayload);
  }

  logger.warn({ jobName: job.name }, 'Unknown AI job');
  return undefined;
}

/** BullMQ worker for the `ai` queue; created when this module is loaded. */
export const aiWorker = new Worker('ai', processAiJob, {
  connection: { url: process.env.REDIS_URL! } as ConnectionOptions,
  concurrency: QUEUE_CONFIGS.ai.concurrency,
});
// Log every failed AI job together with its id, name and the error.
aiWorker.on('failed', (failedJob, error) => {
  const jobId = failedJob?.id;
  const jobName = failedJob?.name;
  logger.error({ jobId, jobName, err: error }, 'AI job failed');
});