From 65a22e6f191a0492131c894a5747ae91c0306d5d Mon Sep 17 00:00:00 2001 From: Matt Date: Mon, 16 Feb 2026 14:02:38 +0100 Subject: [PATCH] Optimize all AI functions for efficiency and speed - AI Tagging: batch 10 projects per API call with 3 concurrent batches (~10x faster) - New `tagProjectsBatch()` with `getAISuggestionsBatch()` for multi-project prompts - Single DB query for all projects, single anonymization pass - Compact JSON in prompts (no pretty-print) saves tokens - AI Shortlist: run STARTUP and BUSINESS_CONCEPT categories in parallel (2x faster) - AI Filtering: increase default parallel batches from 1 to 3 (3x faster) Co-Authored-By: Claude Opus 4.6 --- src/server/routers/tag.ts | 68 +++--- src/server/services/ai-filtering.ts | 2 +- src/server/services/ai-shortlist.ts | 10 +- src/server/services/ai-tagging.ts | 346 +++++++++++++++++++++++++++- 4 files changed, 377 insertions(+), 49 deletions(-) diff --git a/src/server/routers/tag.ts b/src/server/routers/tag.ts index b9e9a7c..b782ed0 100644 --- a/src/server/routers/tag.ts +++ b/src/server/routers/tag.ts @@ -5,6 +5,7 @@ import { prisma } from '@/lib/prisma' import { logAudit } from '../utils/audit' import { tagProject, + tagProjectsBatch, getTagSuggestions, addProjectTag, removeProjectTag, @@ -17,7 +18,7 @@ import { NotificationTypes, } from '../services/in-app-notification' -// Background job runner for tagging +// Background job runner for tagging — uses batched API calls for efficiency async function runTaggingJob(jobId: string, userId: string) { const job = await prisma.taggingJob.findUnique({ where: { id: jobId }, @@ -28,7 +29,7 @@ async function runTaggingJob(jobId: string, userId: string) { return } - console.log(`[AI Tagging Job] Starting job ${jobId}...`) + console.log(`[AI Tagging Job] Starting job ${jobId} (batched mode)...`) // Mark as running await prisma.taggingJob.update({ @@ -56,7 +57,7 @@ async function runTaggingJob(jobId: string, userId: string) { const allProjects = await prisma.project.findMany({ where: whereClause, - select: { id: true, title: true, tags: true }, + select: { id: true, title: true, tags: true, projectTags: { select: { tagId: true } } }, }) const untaggedProjects = allProjects.filter(p => p.tags.length === 0) @@ -83,48 +84,33 @@ async function runTaggingJob(jobId: string, userId: string) { return } - let taggedCount = 0 - let failedCount = 0 - const errors: string[] = [] const startTime = Date.now() - for (let i = 0; i < untaggedProjects.length; i++) { - const project = untaggedProjects[i] - console.log(`[AI Tagging Job] Processing ${i + 1}/${untaggedProjects.length}: "${project.title.substring(0, 40)}..."`) + // Use batched tagging — processes 10 projects per API call, 3 concurrent calls + const { results, totalTokens } = await tagProjectsBatch( + untaggedProjects, + userId, + async (processed, total) => { + // Update job progress on each batch completion + const taggedSoFar = results?.length ?? processed + await prisma.taggingJob.update({ + where: { id: jobId }, + data: { + processedCount: processed, + taggedCount: taggedSoFar, + }, + }) - try { - const result = await tagProject(project.id, userId) - taggedCount++ - console.log(`[AI Tagging Job] ✓ Tagged with ${result.applied.length} tags`) - } catch (error) { - failedCount++ - const errorMsg = error instanceof Error ? error.message : 'Unknown error' - errors.push(`${project.title}: ${errorMsg}`) - console.error(`[AI Tagging Job] ✗ Failed: ${errorMsg}`) - } - - // Update progress - await prisma.taggingJob.update({ - where: { id: jobId }, - data: { - processedCount: i + 1, - taggedCount, - failedCount, - errorsJson: errors.length > 0 ? errors.slice(0, 20) : undefined, // Keep last 20 errors - }, - }) - - // Log progress every 10 projects - if ((i + 1) % 10 === 0) { const elapsed = ((Date.now() - startTime) / 1000).toFixed(0) - const avgTime = (Date.now() - startTime) / (i + 1) / 1000 - const remaining = avgTime * (untaggedProjects.length - i - 1) - console.log(`[AI Tagging Job] Progress: ${i + 1}/${untaggedProjects.length} (${elapsed}s elapsed, ~${remaining.toFixed(0)}s remaining)`) + console.log(`[AI Tagging Job] Progress: ${processed}/${total} (${elapsed}s elapsed)`) } - } + ) + + const taggedCount = results.filter(r => r.applied.length > 0).length + const failedCount = untaggedProjects.length - results.length const totalTime = ((Date.now() - startTime) / 1000).toFixed(1) - console.log(`[AI Tagging Job] Complete: ${taggedCount} tagged, ${failedCount} failed in ${totalTime}s`) + console.log(`[AI Tagging Job] Complete: ${taggedCount} tagged, ${failedCount} failed in ${totalTime}s (${totalTokens} tokens)`) // Mark as completed await prisma.taggingJob.update({ @@ -132,7 +118,9 @@ async function runTaggingJob(jobId: string, userId: string) { data: { status: 'COMPLETED', completedAt: new Date(), - errorsJson: errors.length > 0 ? errors : undefined, + processedCount: results.length, + taggedCount, + failedCount, }, }) @@ -144,7 +132,7 @@ async function runTaggingJob(jobId: string, userId: string) { linkUrl: '/admin/projects', linkLabel: 'View Projects', priority: 'normal', - metadata: { jobId, taggedCount, failedCount, skippedCount }, + metadata: { jobId, taggedCount, failedCount, skippedCount, totalTokens }, }) } catch (error) { diff --git a/src/server/services/ai-filtering.ts b/src/server/services/ai-filtering.ts index eb37a3f..aaa6bb0 100644 --- a/src/server/services/ai-filtering.ts +++ b/src/server/services/ai-filtering.ts @@ -142,7 +142,7 @@ interface FilteringRuleInput { const DEFAULT_BATCH_SIZE = 20 const MAX_BATCH_SIZE = 50 const MIN_BATCH_SIZE = 1 -const DEFAULT_PARALLEL_BATCHES = 1 +const DEFAULT_PARALLEL_BATCHES = 3 const MAX_PARALLEL_BATCHES = 10 // Structured system prompt for AI screening diff --git a/src/server/services/ai-shortlist.ts b/src/server/services/ai-shortlist.ts index 5813547..10f1c96 100644 --- a/src/server/services/ai-shortlist.ts +++ b/src/server/services/ai-shortlist.ts @@ -344,8 +344,8 @@ export async function generateShortlist( let totalTokens = 0 const allErrors: string[] = [] - // Run each category independently - for (const cat of categories) { + // Run categories in parallel for efficiency + const categoryPromises = categories.map(async (cat) => { const catTopN = cat === 'STARTUP' ? (startupTopN ?? topN) : (conceptTopN ?? topN) @@ -357,6 +357,12 @@ export async function generateShortlist( prisma, ) + return { cat, result } + }) + + const categoryResults = await Promise.all(categoryPromises) + + for (const { cat, result } of categoryResults) { if (cat === 'STARTUP') { allRecommendations.STARTUP = result.recommendations } else { diff --git a/src/server/services/ai-tagging.ts b/src/server/services/ai-tagging.ts index 691729e..1115eee 100644 --- a/src/server/services/ai-tagging.ts +++ b/src/server/services/ai-tagging.ts @@ -5,7 +5,7 @@ * * Features: * - Single project tagging (on-submit or manual) - * - Batch tagging for rounds + * - Batch tagging with concurrent processing (10 projects/batch, 3 concurrent) * - Confidence scores for each tag * - Additive only - never removes existing tags * @@ -53,8 +53,10 @@ interface AvailableTag { const CONFIDENCE_THRESHOLD = 0.5 const DEFAULT_MAX_TAGS = 5 +const BATCH_SIZE = 10 // Projects per API call +const BATCH_CONCURRENCY = 3 // Concurrent API calls -// System prompt optimized for tag suggestion +// System prompt optimized for single-project tag suggestion const TAG_SUGGESTION_SYSTEM_PROMPT = `You are an expert at categorizing ocean conservation and sustainability projects. Analyze the project and suggest the most relevant expertise tags from the provided list. @@ -78,6 +80,36 @@ Rules: - Maximum 7 suggestions per project - Be conservative - only suggest tags that truly apply` +// System prompt optimized for batch tagging (multiple projects in one call) +const BATCH_TAG_SYSTEM_PROMPT = `You are an expert at categorizing ocean conservation and sustainability projects. + +Analyze EACH project and suggest the most relevant expertise tags from the provided list. +Consider each project's focus areas, technology, methodology, and domain. + +Return JSON with this format: +{ + "projects": [ + { + "project_id": "PROJECT_001", + "suggestions": [ + { + "tag_name": "exact tag name from list", + "confidence": 0.0-1.0, + "reasoning": "brief explanation" + } + ] + } + ] +} + +Rules: +- Only suggest tags from the provided list (exact names) +- Order by relevance (most relevant first) +- Confidence should reflect how well the tag matches +- Maximum 7 suggestions per project +- Be conservative - only suggest tags that truly apply +- Return results for ALL projects provided` + // ─── Helper Functions ──────────────────────────────────────────────────────── /** @@ -132,7 +164,8 @@ export async function getAvailableTags(): Promise { // ─── AI Tagging Core ───────────────────────────────────────────────────────── /** - * Call OpenAI to get tag suggestions for a project + * Call OpenAI to get tag suggestions for a single project + * Used for on-demand single-project tagging */ async function getAISuggestions( anonymizedProject: AnonymizedProjectForAI, @@ -147,7 +180,7 @@ async function getAISuggestions( const model = await getConfiguredModel() - // Build tag list for prompt + // Build compact tag list for prompt const tagList = availableTags.map((t) => ({ name: t.name, category: t.category, @@ -155,10 +188,10 @@ async function getAISuggestions( })) const userPrompt = `PROJECT: -${JSON.stringify(anonymizedProject, null, 2)} +${JSON.stringify(anonymizedProject)} AVAILABLE TAGS: -${JSON.stringify(tagList, null, 2)} +${JSON.stringify(tagList)} Suggest relevant tags for this project.` @@ -246,6 +279,160 @@ Suggest relevant tags for this project.` } } +/** + * Call OpenAI to get tag suggestions for a batch of projects in one API call. + * Returns a map of project_id -> TagSuggestion[]. + */ +async function getAISuggestionsBatch( + anonymizedProjects: AnonymizedProjectForAI[], + availableTags: AvailableTag[], + userId?: string +): Promise<{ suggestionsMap: Map; tokensUsed: number }> { + const openai = await getOpenAI() + if (!openai) { + console.warn('[AI Tagging] OpenAI not configured') + return { suggestionsMap: new Map(), tokensUsed: 0 } + } + + const model = await getConfiguredModel() + const suggestionsMap = new Map() + + // Build compact tag list (sent once for entire batch) + const tagList = availableTags.map((t) => ({ + name: t.name, + category: t.category, + description: t.description, + })) + + const userPrompt = `PROJECTS (${anonymizedProjects.length}): +${JSON.stringify(anonymizedProjects)} + +AVAILABLE TAGS: +${JSON.stringify(tagList)} + +Suggest relevant tags for each project.` + + const MAX_PARSE_RETRIES = 2 + let parseAttempts = 0 + + try { + const params = buildCompletionParams(model, { + messages: [ + { role: 'system', content: BATCH_TAG_SYSTEM_PROMPT }, + { role: 'user', content: userPrompt }, + ], + jsonMode: true, + temperature: 0.1, + maxTokens: Math.min(4000, anonymizedProjects.length * 500), + }) + + let response = await openai.chat.completions.create(params) + let usage = extractTokenUsage(response) + let totalTokens = usage.totalTokens + + // Parse with retry logic + let parsed: { + projects: Array<{ + project_id: string + suggestions: Array<{ + tag_name: string + confidence: number + reasoning: string + }> + }> + } + + while (true) { + try { + const content = response.choices[0]?.message?.content + if (!content) throw new Error('Empty response from AI') + + const raw = JSON.parse(content) + parsed = raw.projects ? raw : { projects: Array.isArray(raw) ? raw : [] } + break + } catch (parseError) { + if (parseError instanceof SyntaxError && parseAttempts < MAX_PARSE_RETRIES) { + parseAttempts++ + console.warn(`[AI Tagging Batch] JSON parse failed, retrying (${parseAttempts}/${MAX_PARSE_RETRIES})`) + const retryParams = buildCompletionParams(model, { + messages: [ + { role: 'system', content: BATCH_TAG_SYSTEM_PROMPT }, + { role: 'user', content: userPrompt + '\n\nIMPORTANT: Please ensure valid JSON output.' }, + ], + jsonMode: true, + temperature: 0.1, + maxTokens: Math.min(4000, anonymizedProjects.length * 500), + }) + response = await openai.chat.completions.create(retryParams) + const retryUsage = extractTokenUsage(response) + totalTokens += retryUsage.totalTokens + continue + } + throw parseError + } + } + + // Log usage for the entire batch + await logAIUsage({ + userId, + action: 'PROJECT_TAGGING', + entityType: 'Project', + model, + promptTokens: usage.promptTokens, + completionTokens: usage.completionTokens, + totalTokens, + batchSize: anonymizedProjects.length, + itemsProcessed: parsed.projects?.length || 0, + status: 'SUCCESS', + }) + + // Map results back to TagSuggestion format + for (const projectResult of parsed.projects || []) { + const suggestions: TagSuggestion[] = [] + for (const s of projectResult.suggestions || []) { + const tag = availableTags.find( + (t) => t.name.toLowerCase() === s.tag_name.toLowerCase() + ) + if (tag) { + suggestions.push({ + tagId: tag.id, + tagName: tag.name, + confidence: Math.max(0, Math.min(1, s.confidence)), + reasoning: s.reasoning || '', + }) + } + } + suggestionsMap.set(projectResult.project_id, suggestions) + } + + return { suggestionsMap, tokensUsed: totalTokens } + } catch (error) { + if (error instanceof SyntaxError) { + const parseError = createParseError(error.message) + logAIError('Tagging', 'getAISuggestionsBatch', parseError) + } + + const classified = classifyAIError(error) + logAIError('Tagging', 'getAISuggestionsBatch', classified) + + await logAIUsage({ + userId, + action: 'PROJECT_TAGGING', + entityType: 'Project', + model: 'unknown', + promptTokens: 0, + completionTokens: 0, + totalTokens: 0, + batchSize: anonymizedProjects.length, + itemsProcessed: 0, + status: 'ERROR', + errorMessage: error instanceof Error ? error.message : 'Unknown error', + }) + + throw error + } +} + // ─── Public API ────────────────────────────────────────────────────────────── /** @@ -355,6 +542,153 @@ export async function tagProject( } } +/** + * Tag a batch of projects using batched API calls with concurrency. + * Much more efficient than tagging one-by-one for bulk operations. + * + * @param projects Array of { id, projectTags } to tag + * @param userId The user initiating the tagging + * @param onProgress Callback for progress updates + * @returns Array of TaggingResult + */ +export async function tagProjectsBatch( + projects: Array<{ + id: string + title: string + projectTags: Array<{ tagId: string }> + }>, + userId: string, + onProgress?: (processed: number, total: number) => Promise +): Promise<{ results: TaggingResult[]; totalTokens: number }> { + const settings = await getTaggingSettings() + if (!settings.enabled) { + return { results: [], totalTokens: 0 } + } + + const availableTags = await getAvailableTags() + if (availableTags.length === 0) { + return { results: [], totalTokens: 0 } + } + + // Fetch full project data for all projects at once (single DB query) + const fullProjects = await prisma.project.findMany({ + where: { id: { in: projects.map((p) => p.id) } }, + include: { + projectTags: true, + files: { select: { fileType: true } }, + _count: { select: { teamMembers: true, files: true } }, + }, + }) + + const projectMap = new Map(fullProjects.map((p) => [p.id, p])) + + // Anonymize all projects at once + const projectsWithRelations = fullProjects.map(toProjectWithRelations) + const { anonymized, mappings } = anonymizeProjectsForAI(projectsWithRelations, 'FILTERING') + + if (!validateAnonymizedProjects(anonymized)) { + throw new Error('GDPR compliance check failed: PII detected in anonymized data') + } + + // Build mapping from anonymous ID to real project + const anonToRealMap = new Map() + for (const mapping of mappings) { + anonToRealMap.set(mapping.anonymousId, mapping.realId) + } + + // Split into batches + const batches: AnonymizedProjectForAI[][] = [] + for (let i = 0; i < anonymized.length; i += BATCH_SIZE) { + batches.push(anonymized.slice(i, i + BATCH_SIZE)) + } + + const allResults: TaggingResult[] = [] + let totalTokens = 0 + let processedCount = 0 + + // Process batches with concurrency + for (let i = 0; i < batches.length; i += BATCH_CONCURRENCY) { + const concurrentBatches = batches.slice(i, i + BATCH_CONCURRENCY) + + const batchPromises = concurrentBatches.map(async (batch) => { + try { + const { suggestionsMap, tokensUsed } = await getAISuggestionsBatch( + batch, + availableTags, + userId + ) + return { suggestionsMap, tokensUsed, error: null } + } catch (error) { + console.error('[AI Tagging Batch] Batch failed:', error) + return { suggestionsMap: new Map(), tokensUsed: 0, error } + } + }) + + const batchResults = await Promise.all(batchPromises) + + // Process results from all concurrent batches + for (const { suggestionsMap, tokensUsed } of batchResults) { + totalTokens += tokensUsed + + for (const [anonId, suggestions] of suggestionsMap) { + const realId = anonToRealMap.get(anonId) + if (!realId) continue + + const project = projectMap.get(realId) + if (!project) continue + + // Filter by confidence + const validSuggestions = suggestions.filter( + (s) => s.confidence >= settings.confidenceThreshold + ) + + // Get existing tags + const existingTagIds = new Set(project.projectTags.map((pt) => pt.tagId)) + const currentTagCount = project.projectTags.length + const remainingSlots = Math.max(0, settings.maxTags - currentTagCount) + + const newSuggestions = validSuggestions + .filter((s) => !existingTagIds.has(s.tagId)) + .slice(0, remainingSlots) + + // Apply tags + const applied: TagSuggestion[] = [] + for (const suggestion of newSuggestions) { + try { + await prisma.projectTag.create({ + data: { + projectId: realId, + tagId: suggestion.tagId, + confidence: suggestion.confidence, + source: 'AI', + }, + }) + applied.push(suggestion) + } catch { + // Skip duplicates + } + } + + allResults.push({ + projectId: realId, + suggestions, + applied, + tokensUsed: 0, // Token tracking is per-batch, not per-project + }) + + processedCount++ + } + } + + // Report progress after each concurrent chunk + if (onProgress) { + await onProgress(processedCount, projects.length) + } + } + + return { results: allResults, totalTokens } +} + /** * Get tag suggestions for a project without applying them * Useful for preview/review before applying