From 1b2311b4a311ef7f966e1516b228e7a70fc06641 Mon Sep 17 00:00:00 2001 From: Matt Date: Thu, 5 Feb 2026 11:48:57 +0100 Subject: [PATCH] Convert AI tagging to background job with progress tracking - Add TaggingJob model for tracking tagging progress - Convert batch tagging to background job processing (prevents timeouts) - Add real-time progress polling in UI with percentage/count display - Add admin notifications when tagging job completes or fails - Export getTaggingSettings and getAvailableTags functions After deployment, run: npx prisma migrate deploy Co-Authored-By: Claude Opus 4.5 --- .../migration.sql | 38 ++ prisma/schema.prisma | 37 ++ src/app/(admin)/admin/projects/page.tsx | 134 ++++--- src/server/routers/tag.ts | 362 ++++++++++++++++-- src/server/services/ai-tagging.ts | 4 +- 5 files changed, 481 insertions(+), 94 deletions(-) create mode 100644 prisma/migrations/20260205100000_add_tagging_job/migration.sql diff --git a/prisma/migrations/20260205100000_add_tagging_job/migration.sql b/prisma/migrations/20260205100000_add_tagging_job/migration.sql new file mode 100644 index 0000000..37c1d57 --- /dev/null +++ b/prisma/migrations/20260205100000_add_tagging_job/migration.sql @@ -0,0 +1,38 @@ +-- Add TaggingJob table for background AI tagging +DO $$ BEGIN + CREATE TYPE "TaggingJobStatus" AS ENUM ('PENDING', 'RUNNING', 'COMPLETED', 'FAILED'); +EXCEPTION WHEN duplicate_object THEN NULL; END $$; + +CREATE TABLE IF NOT EXISTS "TaggingJob" ( + "id" TEXT NOT NULL, + "programId" TEXT, + "roundId" TEXT, + "status" "TaggingJobStatus" NOT NULL DEFAULT 'PENDING', + "totalProjects" INTEGER NOT NULL DEFAULT 0, + "processedCount" INTEGER NOT NULL DEFAULT 0, + "taggedCount" INTEGER NOT NULL DEFAULT 0, + "skippedCount" INTEGER NOT NULL DEFAULT 0, + "failedCount" INTEGER NOT NULL DEFAULT 0, + "errorMessage" TEXT, + "errorsJson" JSONB, + "startedAt" TIMESTAMP(3), + "completedAt" TIMESTAMP(3), + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updatedAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + + CONSTRAINT "TaggingJob_pkey" PRIMARY KEY ("id") +); + +CREATE INDEX IF NOT EXISTS "TaggingJob_programId_idx" ON "TaggingJob"("programId"); +CREATE INDEX IF NOT EXISTS "TaggingJob_roundId_idx" ON "TaggingJob"("roundId"); +CREATE INDEX IF NOT EXISTS "TaggingJob_status_idx" ON "TaggingJob"("status"); + +DO $$ BEGIN + ALTER TABLE "TaggingJob" ADD CONSTRAINT "TaggingJob_programId_fkey" + FOREIGN KEY ("programId") REFERENCES "Program"("id") ON DELETE CASCADE ON UPDATE CASCADE; +EXCEPTION WHEN duplicate_object THEN NULL; END $$; + +DO $$ BEGIN + ALTER TABLE "TaggingJob" ADD CONSTRAINT "TaggingJob_roundId_fkey" + FOREIGN KEY ("roundId") REFERENCES "Round"("id") ON DELETE CASCADE ON UPDATE CASCADE; +EXCEPTION WHEN duplicate_object THEN NULL; END $$; diff --git a/prisma/schema.prisma b/prisma/schema.prisma index 5023294..4617155 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -324,6 +324,7 @@ model Program { learningResources LearningResource[] partners Partner[] specialAwards SpecialAward[] + taggingJobs TaggingJob[] @@unique([name, year]) @@index([status]) @@ -375,6 +376,7 @@ model Round { filteringResults FilteringResult[] filteringJobs FilteringJob[] assignmentJobs AssignmentJob[] + taggingJobs TaggingJob[] @@index([programId]) @@index([status]) @@ -1125,6 +1127,41 @@ enum AssignmentJobStatus { FAILED } +// Tracks progress of long-running AI tagging jobs +model TaggingJob { + id String @id @default(cuid()) + programId String? // If tagging entire program + roundId String? // If tagging single round + status TaggingJobStatus @default(PENDING) + totalProjects Int @default(0) + processedCount Int @default(0) + taggedCount Int @default(0) + skippedCount Int @default(0) + failedCount Int @default(0) + errorMessage String? @db.Text + errorsJson Json? @db.JsonB // Array of error messages + startedAt DateTime? + completedAt DateTime? + + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + + // Relations (optional - can tag by program or round) + program Program? @relation(fields: [programId], references: [id], onDelete: Cascade) + round Round? @relation(fields: [roundId], references: [id], onDelete: Cascade) + + @@index([programId]) + @@index([roundId]) + @@index([status]) +} + +enum TaggingJobStatus { + PENDING + RUNNING + COMPLETED + FAILED +} + // ============================================================================= // SPECIAL AWARDS SYSTEM // ============================================================================= diff --git a/src/app/(admin)/admin/projects/page.tsx b/src/app/(admin)/admin/projects/page.tsx index f09c138..782b997 100644 --- a/src/app/(admin)/admin/projects/page.tsx +++ b/src/app/(admin)/admin/projects/page.tsx @@ -257,80 +257,75 @@ export default function ProjectsPage() { const [taggingScope, setTaggingScope] = useState<'round' | 'program'>('round') const [selectedRoundForTagging, setSelectedRoundForTagging] = useState('') const [selectedProgramForTagging, setSelectedProgramForTagging] = useState('') - const [taggingInProgress, setTaggingInProgress] = useState(false) - const [taggingResult, setTaggingResult] = useState<{ - processed: number - skipped: number - failed: number - errors: string[] - } | null>(null) + const [activeTaggingJobId, setActiveTaggingJobId] = useState(null) // Fetch programs and rounds for the AI tagging dialog const { data: programs } = trpc.program.list.useQuery() - // AI batch tagging mutations - const batchTagProjects = trpc.tag.batchTagProjects.useMutation({ - onMutate: () => { - setTaggingInProgress(true) - setTaggingResult(null) - }, + // Start tagging job mutation + const startTaggingJob = trpc.tag.startTaggingJob.useMutation({ onSuccess: (result) => { - setTaggingInProgress(false) - setTaggingResult(result) - if (result.errors && result.errors.length > 0) { - toast.error(`AI Tagging issue: ${result.errors[0]}`) - } else if (result.processed === 0 && result.skipped === 0 && result.failed === 0) { - toast.info('No projects to tag - all projects already have tags') - } else { - toast.success( - `AI Tagging complete: ${result.processed} tagged, ${result.skipped} already tagged, ${result.failed} failed` - ) - } - utils.project.list.invalidate() + setActiveTaggingJobId(result.jobId) + toast.info('AI tagging job started. Progress will update automatically.') }, onError: (error) => { - setTaggingInProgress(false) - toast.error(error.message || 'Failed to generate AI tags') + toast.error(error.message || 'Failed to start tagging job') }, }) - const batchTagProgramProjects = trpc.tag.batchTagProgramProjects.useMutation({ - onMutate: () => { - setTaggingInProgress(true) - setTaggingResult(null) - }, - onSuccess: (result) => { - setTaggingInProgress(false) - setTaggingResult(result) - if (result.errors && result.errors.length > 0) { - toast.error(`AI Tagging issue: ${result.errors[0]}`) - } else if (result.processed === 0 && result.skipped === 0 && result.failed === 0) { - toast.info('No projects to tag - all projects already have tags') - } else { - toast.success( - `AI Tagging complete: ${result.processed} tagged, ${result.skipped} already tagged, ${result.failed} failed` - ) - } + // Poll for job status when job is active + const { data: jobStatus } = trpc.tag.getTaggingJobStatus.useQuery( + { jobId: activeTaggingJobId! }, + { + enabled: !!activeTaggingJobId, + refetchInterval: (query) => { + const status = query.state.data?.status + // Stop polling when job is done + if (status === 'COMPLETED' || status === 'FAILED') { + return false + } + return 1500 // Poll every 1.5 seconds + }, + } + ) + + // Handle job completion + useEffect(() => { + if (jobStatus?.status === 'COMPLETED') { + toast.success( + `AI Tagging complete: ${jobStatus.taggedCount} tagged, ${jobStatus.skippedCount} already tagged, ${jobStatus.failedCount} failed` + ) utils.project.list.invalidate() - }, - onError: (error) => { - setTaggingInProgress(false) - toast.error(error.message || 'Failed to generate AI tags') - }, - }) + } else if (jobStatus?.status === 'FAILED') { + toast.error(`AI Tagging failed: ${jobStatus.errorMessage || 'Unknown error'}`) + } + }, [jobStatus?.status, jobStatus?.taggedCount, jobStatus?.skippedCount, jobStatus?.failedCount, jobStatus?.errorMessage, utils.project.list]) + + const taggingInProgress = startTaggingJob.isPending || + (jobStatus?.status === 'PENDING' || jobStatus?.status === 'RUNNING') + + const taggingResult = jobStatus?.status === 'COMPLETED' || jobStatus?.status === 'FAILED' + ? { + processed: jobStatus.taggedCount, + skipped: jobStatus.skippedCount, + failed: jobStatus.failedCount, + errors: jobStatus.errors || [], + status: jobStatus.status, + } + : null const handleStartTagging = () => { if (taggingScope === 'round' && selectedRoundForTagging) { - batchTagProjects.mutate({ roundId: selectedRoundForTagging }) + startTaggingJob.mutate({ roundId: selectedRoundForTagging }) } else if (taggingScope === 'program' && selectedProgramForTagging) { - batchTagProgramProjects.mutate({ programId: selectedProgramForTagging }) + startTaggingJob.mutate({ programId: selectedProgramForTagging }) } } const handleCloseTaggingDialog = () => { if (!taggingInProgress) { setAiTagDialogOpen(false) - setTaggingResult(null) + setActiveTaggingJobId(null) setSelectedRoundForTagging('') setSelectedProgramForTagging('') } @@ -346,6 +341,11 @@ export default function ProjectsPage() { ? selectedProgram : (selectedRound ? programs?.find(p => p.id === selectedRound.program?.id) : null) + // Calculate progress percentage + const taggingProgressPercent = jobStatus && jobStatus.totalProjects > 0 + ? Math.round((jobStatus.processedCount / jobStatus.totalProjects) * 100) + : 0 + const deleteProject = trpc.project.delete.useMutation({ onSuccess: () => { toast.success('Project deleted successfully') @@ -696,18 +696,40 @@ export default function ProjectsPage() { AI Tagging in Progress

- Analyzing projects and assigning expertise tags... + {jobStatus?.status === 'PENDING' + ? 'Initializing...' + : `Processing ${jobStatus?.totalProjects || 0} projects with AI...`}

+ {jobStatus && jobStatus.totalProjects > 0 && ( + + + {jobStatus.processedCount} / {jobStatus.totalProjects} + + )}
- Processing projects... + {jobStatus?.processedCount || 0} of {jobStatus?.totalProjects || '?'} projects processed + {jobStatus?.taggedCount ? ` (${jobStatus.taggedCount} tagged)` : ''} + {jobStatus && jobStatus.totalProjects > 0 && ( + + {taggingProgressPercent}% + + )}
- +
+ {jobStatus?.failedCount ? ( +

+ {jobStatus.failedCount} projects failed so far +

+ ) : null} )} diff --git a/src/server/routers/tag.ts b/src/server/routers/tag.ts index 8416544..b2d2c0f 100644 --- a/src/server/routers/tag.ts +++ b/src/server/routers/tag.ts @@ -1,14 +1,173 @@ import { z } from 'zod' import { TRPCError } from '@trpc/server' import { router, adminProcedure, protectedProcedure } from '../trpc' +import { prisma } from '@/lib/prisma' import { tagProject, - batchTagProjects, - batchTagProgramProjects, getTagSuggestions, addProjectTag, removeProjectTag, + getTaggingSettings, + getAvailableTags, } from '../services/ai-tagging' +import { + createNotification, + notifyAdmins, + NotificationTypes, +} from '../services/in-app-notification' + +// Background job runner for tagging +async function runTaggingJob(jobId: string, userId: string) { + const job = await prisma.taggingJob.findUnique({ + where: { id: jobId }, + }) + + if (!job) { + console.error(`[AI Tagging Job] Job not found: ${jobId}`) + return + } + + console.log(`[AI Tagging Job] Starting job ${jobId}...`) + + // Mark as running + await prisma.taggingJob.update({ + where: { id: jobId }, + data: { status: 'RUNNING', startedAt: new Date() }, + }) + + try { + // Get settings and tags + const settings = await getTaggingSettings() + if (!settings.enabled) { + throw new Error('AI tagging is not enabled') + } + + const availableTags = await getAvailableTags() + if (availableTags.length === 0) { + throw new Error('No expertise tags configured') + } + + // Get projects to tag + const whereClause = job.programId + ? { round: { programId: job.programId } } + : { roundId: job.roundId! } + + const allProjects = await prisma.project.findMany({ + where: whereClause, + select: { id: true, title: true, tags: true }, + }) + + const untaggedProjects = allProjects.filter(p => p.tags.length === 0) + const skippedCount = allProjects.length - untaggedProjects.length + + console.log(`[AI Tagging Job] Found ${untaggedProjects.length} untagged projects (${skippedCount} already tagged)`) + + await prisma.taggingJob.update({ + where: { id: jobId }, + data: { + totalProjects: untaggedProjects.length, + skippedCount, + }, + }) + + if (untaggedProjects.length === 0) { + await prisma.taggingJob.update({ + where: { id: jobId }, + data: { + status: 'COMPLETED', + completedAt: new Date(), + }, + }) + return + } + + let taggedCount = 0 + let failedCount = 0 + const errors: string[] = [] + const startTime = Date.now() + + for (let i = 0; i < untaggedProjects.length; i++) { + const project = untaggedProjects[i] + console.log(`[AI Tagging Job] Processing ${i + 1}/${untaggedProjects.length}: "${project.title.substring(0, 40)}..."`) + + try { + const result = await tagProject(project.id, userId) + taggedCount++ + console.log(`[AI Tagging Job] ✓ Tagged with ${result.applied.length} tags`) + } catch (error) { + failedCount++ + const errorMsg = error instanceof Error ? error.message : 'Unknown error' + errors.push(`${project.title}: ${errorMsg}`) + console.error(`[AI Tagging Job] ✗ Failed: ${errorMsg}`) + } + + // Update progress + await prisma.taggingJob.update({ + where: { id: jobId }, + data: { + processedCount: i + 1, + taggedCount, + failedCount, + errorsJson: errors.length > 0 ? errors.slice(0, 20) : undefined, // Keep last 20 errors + }, + }) + + // Log progress every 10 projects + if ((i + 1) % 10 === 0) { + const elapsed = ((Date.now() - startTime) / 1000).toFixed(0) + const avgTime = (Date.now() - startTime) / (i + 1) / 1000 + const remaining = avgTime * (untaggedProjects.length - i - 1) + console.log(`[AI Tagging Job] Progress: ${i + 1}/${untaggedProjects.length} (${elapsed}s elapsed, ~${remaining.toFixed(0)}s remaining)`) + } + } + + const totalTime = ((Date.now() - startTime) / 1000).toFixed(1) + console.log(`[AI Tagging Job] Complete: ${taggedCount} tagged, ${failedCount} failed in ${totalTime}s`) + + // Mark as completed + await prisma.taggingJob.update({ + where: { id: jobId }, + data: { + status: 'COMPLETED', + completedAt: new Date(), + errorsJson: errors.length > 0 ? errors : undefined, + }, + }) + + // Send notification to admins + await notifyAdmins({ + type: NotificationTypes.AI_SUGGESTIONS_READY, + title: 'AI Tagging Complete', + message: `Tagged ${taggedCount} projects${failedCount > 0 ? `, ${failedCount} failed` : ''}${skippedCount > 0 ? `, ${skippedCount} already had tags` : ''}.`, + linkUrl: '/admin/projects', + linkLabel: 'View Projects', + priority: 'normal', + metadata: { jobId, taggedCount, failedCount, skippedCount }, + }) + + } catch (error) { + const errorMsg = error instanceof Error ? error.message : 'Unknown error' + console.error(`[AI Tagging Job] Job failed: ${errorMsg}`) + + await prisma.taggingJob.update({ + where: { id: jobId }, + data: { + status: 'FAILED', + completedAt: new Date(), + errorMessage: errorMsg, + }, + }) + + // Notify about failure + await notifyAdmins({ + type: NotificationTypes.SYSTEM_ERROR, + title: 'AI Tagging Failed', + message: errorMsg, + linkUrl: '/admin/projects', + priority: 'high', + }) + } +} export const tagRouter = router({ /** @@ -468,59 +627,190 @@ export const tagRouter = router({ }), /** - * Batch tag all untagged projects in a round + * Start a background tagging job for a round + */ + startTaggingJob: adminProcedure + .input(z.object({ + roundId: z.string().optional(), + programId: z.string().optional(), + })) + .mutation(async ({ ctx, input }) => { + if (!input.roundId && !input.programId) { + throw new TRPCError({ + code: 'BAD_REQUEST', + message: 'Either roundId or programId is required', + }) + } + + // Check for existing running job + const existingJob = await ctx.prisma.taggingJob.findFirst({ + where: { + OR: [ + { roundId: input.roundId, status: { in: ['PENDING', 'RUNNING'] } }, + { programId: input.programId, status: { in: ['PENDING', 'RUNNING'] } }, + ], + }, + }) + + if (existingJob) { + throw new TRPCError({ + code: 'CONFLICT', + message: 'A tagging job is already running', + }) + } + + // Create the job + const job = await ctx.prisma.taggingJob.create({ + data: { + roundId: input.roundId, + programId: input.programId, + status: 'PENDING', + }, + }) + + // Audit log + await ctx.prisma.auditLog.create({ + data: { + userId: ctx.user.id, + action: 'START_AI_TAG_JOB', + entityType: input.programId ? 'Program' : 'Round', + entityId: input.programId || input.roundId!, + detailsJson: { jobId: job.id }, + ipAddress: ctx.ip, + userAgent: ctx.userAgent, + }, + }) + + // Start job in background (don't await) + runTaggingJob(job.id, ctx.user.id).catch((error) => { + console.error('[AI Tagging] Background job error:', error) + }) + + return { jobId: job.id } + }), + + /** + * Get tagging job status + */ + getTaggingJobStatus: adminProcedure + .input(z.object({ jobId: z.string() })) + .query(async ({ ctx, input }) => { + const job = await ctx.prisma.taggingJob.findUnique({ + where: { id: input.jobId }, + }) + + if (!job) { + throw new TRPCError({ + code: 'NOT_FOUND', + message: 'Job not found', + }) + } + + return { + id: job.id, + status: job.status, + totalProjects: job.totalProjects, + processedCount: job.processedCount, + taggedCount: job.taggedCount, + skippedCount: job.skippedCount, + failedCount: job.failedCount, + errorMessage: job.errorMessage, + errors: job.errorsJson as string[] | null, + startedAt: job.startedAt, + completedAt: job.completedAt, + } + }), + + /** + * Get latest tagging job for a round or program + */ + getLatestTaggingJob: adminProcedure + .input(z.object({ + roundId: z.string().optional(), + programId: z.string().optional(), + })) + .query(async ({ ctx, input }) => { + const job = await ctx.prisma.taggingJob.findFirst({ + where: { + OR: [ + input.roundId ? { roundId: input.roundId } : {}, + input.programId ? { programId: input.programId } : {}, + ].filter(o => Object.keys(o).length > 0), + }, + orderBy: { createdAt: 'desc' }, + }) + + if (!job) { + return null + } + + return { + id: job.id, + status: job.status, + totalProjects: job.totalProjects, + processedCount: job.processedCount, + taggedCount: job.taggedCount, + skippedCount: job.skippedCount, + failedCount: job.failedCount, + errorMessage: job.errorMessage, + errors: job.errorsJson as string[] | null, + startedAt: job.startedAt, + completedAt: job.completedAt, + createdAt: job.createdAt, + } + }), + + // Legacy endpoints kept for backward compatibility (redirect to job-based) + /** + * @deprecated Use startTaggingJob instead */ batchTagProjects: adminProcedure .input(z.object({ roundId: z.string() })) .mutation(async ({ ctx, input }) => { - const result = await batchTagProjects(input.roundId, ctx.user.id) - - // Audit log - await ctx.prisma.auditLog.create({ + // Start job and return immediately with placeholder + const job = await ctx.prisma.taggingJob.create({ data: { - userId: ctx.user.id, - action: 'BATCH_AI_TAG', - entityType: 'Round', - entityId: input.roundId, - detailsJson: { - processed: result.processed, - failed: result.failed, - skipped: result.skipped, - }, - ipAddress: ctx.ip, - userAgent: ctx.userAgent, + roundId: input.roundId, + status: 'PENDING', }, }) - return result + runTaggingJob(job.id, ctx.user.id).catch(console.error) + + return { + processed: 0, + failed: 0, + skipped: 0, + errors: [], + jobId: job.id, + message: 'Tagging job started in background. Check job status for progress.', + } }), /** - * Batch tag all untagged projects in an entire program (edition) + * @deprecated Use startTaggingJob instead */ batchTagProgramProjects: adminProcedure .input(z.object({ programId: z.string() })) .mutation(async ({ ctx, input }) => { - const result = await batchTagProgramProjects(input.programId, ctx.user.id) - - // Audit log - await ctx.prisma.auditLog.create({ + // Start job and return immediately with placeholder + const job = await ctx.prisma.taggingJob.create({ data: { - userId: ctx.user.id, - action: 'BATCH_AI_TAG', - entityType: 'Program', - entityId: input.programId, - detailsJson: { - processed: result.processed, - failed: result.failed, - skipped: result.skipped, - }, - ipAddress: ctx.ip, - userAgent: ctx.userAgent, + programId: input.programId, + status: 'PENDING', }, }) - return result + runTaggingJob(job.id, ctx.user.id).catch(console.error) + + return { + processed: 0, + failed: 0, + skipped: 0, + errors: [], + jobId: job.id, + message: 'Tagging job started in background. Check job status for progress.', + } }), /** diff --git a/src/server/services/ai-tagging.ts b/src/server/services/ai-tagging.ts index 4b327cc..27ce84c 100644 --- a/src/server/services/ai-tagging.ts +++ b/src/server/services/ai-tagging.ts @@ -94,7 +94,7 @@ Rules: /** * Get system settings for AI tagging */ -async function getTaggingSettings(): Promise<{ +export async function getTaggingSettings(): Promise<{ enabled: boolean maxTags: number }> { @@ -125,7 +125,7 @@ async function getTaggingSettings(): Promise<{ /** * Get all active expertise tags */ -async function getAvailableTags(): Promise { +export async function getAvailableTags(): Promise { return prisma.expertiseTag.findMany({ where: { isActive: true }, select: {