MOPC-App/src/server/routers/pipeline.ts

1126 lines
35 KiB
TypeScript

import { z } from 'zod'
import { TRPCError } from '@trpc/server'
import { Prisma } from '@prisma/client'
import { router, protectedProcedure, adminProcedure, observerProcedure } from '../trpc'
import { logAudit } from '@/server/utils/audit'
export const pipelineRouter = router({
/**
* Create a new pipeline for a program
*/
create: adminProcedure
.input(
z.object({
programId: z.string(),
name: z.string().min(1).max(255),
slug: z.string().min(1).max(100).regex(/^[a-z0-9-]+$/),
settingsJson: z.record(z.unknown()).optional(),
})
)
.mutation(async ({ ctx, input }) => {
// Check for slug uniqueness
const existing = await ctx.prisma.pipeline.findUnique({
where: { slug: input.slug },
})
if (existing) {
throw new TRPCError({
code: 'CONFLICT',
message: `A pipeline with slug "${input.slug}" already exists`,
})
}
// Verify program exists
await ctx.prisma.program.findUniqueOrThrow({
where: { id: input.programId },
})
const { settingsJson, ...rest } = input
const pipeline = await ctx.prisma.$transaction(async (tx) => {
const created = await tx.pipeline.create({
data: {
...rest,
settingsJson: (settingsJson as Prisma.InputJsonValue) ?? undefined,
},
})
await logAudit({
prisma: tx,
userId: ctx.user.id,
action: 'CREATE',
entityType: 'Pipeline',
entityId: created.id,
detailsJson: { name: input.name, programId: input.programId },
ipAddress: ctx.ip,
userAgent: ctx.userAgent,
})
return created
})
return pipeline
}),
/**
* Update pipeline settings
*/
update: adminProcedure
.input(
z.object({
id: z.string(),
name: z.string().min(1).max(255).optional(),
slug: z.string().min(1).max(100).regex(/^[a-z0-9-]+$/).optional(),
status: z.enum(['DRAFT', 'ACTIVE', 'CLOSED', 'ARCHIVED']).optional(),
settingsJson: z.record(z.unknown()).optional(),
})
)
.mutation(async ({ ctx, input }) => {
const { id, settingsJson, ...data } = input
// Check slug uniqueness if changing
if (data.slug) {
const existing = await ctx.prisma.pipeline.findFirst({
where: { slug: data.slug, id: { not: id } },
})
if (existing) {
throw new TRPCError({
code: 'CONFLICT',
message: `A pipeline with slug "${data.slug}" already exists`,
})
}
}
const pipeline = await ctx.prisma.$transaction(async (tx) => {
const updated = await tx.pipeline.update({
where: { id },
data: {
...data,
settingsJson: (settingsJson as Prisma.InputJsonValue) ?? undefined,
},
})
await logAudit({
prisma: tx,
userId: ctx.user.id,
action: 'UPDATE',
entityType: 'Pipeline',
entityId: id,
detailsJson: { ...data, settingsJson } as Record<string, unknown>,
ipAddress: ctx.ip,
userAgent: ctx.userAgent,
})
return updated
})
return pipeline
}),
/**
* Archive a pipeline (soft delete)
*/
delete: adminProcedure
.input(z.object({ id: z.string() }))
.mutation(async ({ ctx, input }) => {
const pipeline = await ctx.prisma.pipeline.findUniqueOrThrow({
where: { id: input.id },
})
if (pipeline.status === 'ARCHIVED') {
throw new TRPCError({
code: 'PRECONDITION_FAILED',
message: 'Pipeline is already archived',
})
}
const updated = await ctx.prisma.$transaction(async (tx) => {
const result = await tx.pipeline.update({
where: { id: input.id },
data: { status: 'ARCHIVED' },
})
await logAudit({
prisma: tx,
userId: ctx.user.id,
action: 'PIPELINE_ARCHIVED',
entityType: 'Pipeline',
entityId: input.id,
detailsJson: {
previousStatus: pipeline.status,
name: pipeline.name,
},
ipAddress: ctx.ip,
userAgent: ctx.userAgent,
})
return result
})
return updated
}),
/**
* Get a single pipeline with tracks and stages
*/
get: protectedProcedure
.input(z.object({ id: z.string() }))
.query(async ({ ctx, input }) => {
return ctx.prisma.pipeline.findUniqueOrThrow({
where: { id: input.id },
include: {
program: { select: { id: true, name: true } },
tracks: {
orderBy: { sortOrder: 'asc' },
include: {
stages: {
orderBy: { sortOrder: 'asc' },
include: {
_count: {
select: { projectStageStates: true, cohorts: true },
},
},
},
_count: {
select: { projectStageStates: true },
},
},
},
routingRules: {
where: { isActive: true },
orderBy: { priority: 'desc' },
},
},
})
}),
/**
* Get pipeline summary with counts
*/
getSummary: protectedProcedure
.input(z.object({ id: z.string() }))
.query(async ({ ctx, input }) => {
const pipeline = await ctx.prisma.pipeline.findUniqueOrThrow({
where: { id: input.id },
include: {
program: { select: { id: true, name: true } },
tracks: {
include: {
_count: { select: { stages: true, projectStageStates: true } },
},
},
_count: { select: { tracks: true, routingRules: true } },
},
})
// Aggregate project state counts across all tracks
const stateCounts = await ctx.prisma.projectStageState.groupBy({
by: ['state'],
where: {
track: { pipelineId: input.id },
},
_count: true,
})
return {
...pipeline,
stateSummary: stateCounts.reduce(
(acc, curr) => {
acc[curr.state] = curr._count
return acc
},
{} as Record<string, number>
),
}
}),
/**
* List pipelines for a program
*/
list: protectedProcedure
.input(z.object({ programId: z.string() }))
.query(async ({ ctx, input }) => {
return ctx.prisma.pipeline.findMany({
where: { programId: input.programId },
orderBy: { createdAt: 'desc' },
include: {
_count: { select: { tracks: true, routingRules: true } },
},
})
}),
/**
* Publish (activate) a pipeline
*/
publish: adminProcedure
.input(z.object({ id: z.string() }))
.mutation(async ({ ctx, input }) => {
const pipeline = await ctx.prisma.pipeline.findUniqueOrThrow({
where: { id: input.id },
include: {
tracks: { include: { stages: true } },
},
})
if (pipeline.status === 'ACTIVE') {
throw new TRPCError({
code: 'PRECONDITION_FAILED',
message: 'Pipeline is already active',
})
}
// Validate: must have at least one track with at least one stage
if (pipeline.tracks.length === 0) {
throw new TRPCError({
code: 'PRECONDITION_FAILED',
message: 'Pipeline must have at least one track before publishing',
})
}
const emptyTracks = pipeline.tracks.filter((t) => t.stages.length === 0)
if (emptyTracks.length > 0) {
throw new TRPCError({
code: 'PRECONDITION_FAILED',
message: `Track(s) "${emptyTracks.map((t) => t.name).join(', ')}" have no stages. All tracks must have at least one stage.`,
})
}
const updated = await ctx.prisma.$transaction(async (tx) => {
const result = await tx.pipeline.update({
where: { id: input.id },
data: { status: 'ACTIVE' },
})
await logAudit({
prisma: tx,
userId: ctx.user.id,
action: 'PIPELINE_PUBLISHED',
entityType: 'Pipeline',
entityId: input.id,
detailsJson: {
previousStatus: pipeline.status,
trackCount: pipeline.tracks.length,
},
ipAddress: ctx.ip,
userAgent: ctx.userAgent,
})
return result
})
return updated
}),
/**
* Create a full pipeline structure atomically (Pipeline + Tracks + Stages + Transitions)
*/
createStructure: adminProcedure
.input(
z.object({
programId: z.string().min(1, 'Program ID is required'),
name: z.string().min(1).max(255),
slug: z.string().min(1).max(100).regex(/^[a-z0-9-]+$/),
settingsJson: z.record(z.unknown()).optional(),
tracks: z.array(
z.object({
name: z.string().min(1).max(255),
slug: z.string().min(1).max(100).regex(/^[a-z0-9-]+$/),
kind: z.enum(['MAIN', 'AWARD', 'SHOWCASE']),
sortOrder: z.number().int().min(0),
routingModeDefault: z.enum(['PARALLEL', 'EXCLUSIVE', 'POST_MAIN']).optional(),
decisionMode: z.enum(['JURY_VOTE', 'AWARD_MASTER_DECISION', 'ADMIN_DECISION']).optional(),
stages: z.array(
z.object({
name: z.string().min(1).max(255),
slug: z.string().min(1).max(100).regex(/^[a-z0-9-]+$/),
stageType: z.enum(['INTAKE', 'FILTER', 'EVALUATION', 'SELECTION', 'LIVE_FINAL', 'RESULTS']),
sortOrder: z.number().int().min(0),
configJson: z.record(z.unknown()).optional(),
})
),
awardConfig: z
.object({
name: z.string(),
description: z.string().optional(),
scoringMode: z.enum(['PICK_WINNER', 'RANKED', 'SCORED']).optional(),
})
.optional(),
})
),
autoTransitions: z.boolean().default(true),
})
)
.mutation(async ({ ctx, input }) => {
// Check slug uniqueness
const existing = await ctx.prisma.pipeline.findUnique({
where: { slug: input.slug },
})
if (existing) {
throw new TRPCError({
code: 'CONFLICT',
message: `A pipeline with slug "${input.slug}" already exists`,
})
}
// Verify program exists
await ctx.prisma.program.findUniqueOrThrow({
where: { id: input.programId },
})
const result = await ctx.prisma.$transaction(async (tx) => {
// 1. Create pipeline
const pipeline = await tx.pipeline.create({
data: {
programId: input.programId,
name: input.name,
slug: input.slug,
settingsJson: (input.settingsJson as Prisma.InputJsonValue) ?? undefined,
},
})
const createdTracks: Array<{
id: string
name: string
kind: string
stages: Array<{ id: string; name: string; sortOrder: number }>
}> = []
// 2. Create tracks and stages
for (const trackInput of input.tracks) {
const track = await tx.track.create({
data: {
pipelineId: pipeline.id,
name: trackInput.name,
slug: trackInput.slug,
kind: trackInput.kind,
sortOrder: trackInput.sortOrder,
routingMode: trackInput.routingModeDefault ?? null,
decisionMode: trackInput.decisionMode ?? null,
},
})
// 3. Create stages for this track
const createdStages: Array<{ id: string; name: string; sortOrder: number }> = []
for (const stageInput of trackInput.stages) {
const stage = await tx.stage.create({
data: {
trackId: track.id,
name: stageInput.name,
slug: stageInput.slug,
stageType: stageInput.stageType,
sortOrder: stageInput.sortOrder,
configJson: (stageInput.configJson as Prisma.InputJsonValue) ?? undefined,
},
})
createdStages.push({ id: stage.id, name: stage.name, sortOrder: stage.sortOrder })
}
// Create SpecialAward if AWARD kind
if (trackInput.kind === 'AWARD' && trackInput.awardConfig) {
await tx.specialAward.create({
data: {
programId: input.programId,
name: trackInput.awardConfig.name,
description: trackInput.awardConfig.description,
scoringMode: trackInput.awardConfig.scoringMode ?? 'PICK_WINNER',
trackId: track.id,
},
})
}
createdTracks.push({
id: track.id,
name: track.name,
kind: track.kind,
stages: createdStages,
})
}
// 4. Auto-create linear transitions within each track
if (input.autoTransitions) {
for (const track of createdTracks) {
const sorted = [...track.stages].sort((a, b) => a.sortOrder - b.sortOrder)
for (let i = 0; i < sorted.length - 1; i++) {
await tx.stageTransition.create({
data: {
fromStageId: sorted[i].id,
toStageId: sorted[i + 1].id,
isDefault: true,
},
})
}
}
}
await logAudit({
prisma: tx,
userId: ctx.user.id,
action: 'PIPELINE_STRUCTURE_CREATED',
entityType: 'Pipeline',
entityId: pipeline.id,
detailsJson: {
name: input.name,
programId: input.programId,
trackCount: createdTracks.length,
totalStages: createdTracks.reduce((sum, t) => sum + t.stages.length, 0),
},
ipAddress: ctx.ip,
userAgent: ctx.userAgent,
})
return {
pipeline,
tracks: createdTracks,
}
})
return result
}),
/**
* Get full pipeline draft structure for the edit wizard
*/
getDraft: adminProcedure
.input(z.object({ id: z.string() }))
.query(async ({ ctx, input }) => {
const pipeline = await ctx.prisma.pipeline.findUniqueOrThrow({
where: { id: input.id },
include: {
program: { select: { id: true, name: true } },
tracks: {
orderBy: { sortOrder: 'asc' },
include: {
stages: {
orderBy: { sortOrder: 'asc' },
include: {
transitionsFrom: {
include: { toStage: { select: { id: true, name: true, slug: true } } },
},
transitionsTo: {
include: { fromStage: { select: { id: true, name: true, slug: true } } },
},
_count: {
select: { projectStageStates: true },
},
},
},
specialAward: {
select: {
id: true,
name: true,
description: true,
scoringMode: true,
status: true,
},
},
_count: {
select: { projectStageStates: true },
},
},
},
routingRules: {
orderBy: { priority: 'desc' },
include: {
sourceTrack: { select: { id: true, name: true } },
destinationTrack: { select: { id: true, name: true } },
},
},
},
})
return pipeline
}),
/**
* Update pipeline structure (diff-based: create/update/delete tracks and stages)
*/
updateStructure: adminProcedure
.input(
z.object({
id: z.string(),
name: z.string().min(1).max(255).optional(),
slug: z.string().min(1).max(100).regex(/^[a-z0-9-]+$/).optional(),
settingsJson: z.record(z.unknown()).optional(),
tracks: z.array(
z.object({
id: z.string().optional(), // present = update, absent = create
name: z.string().min(1).max(255),
slug: z.string().min(1).max(100).regex(/^[a-z0-9-]+$/),
kind: z.enum(['MAIN', 'AWARD', 'SHOWCASE']),
sortOrder: z.number().int().min(0),
routingModeDefault: z.enum(['PARALLEL', 'EXCLUSIVE', 'POST_MAIN']).optional(),
decisionMode: z.enum(['JURY_VOTE', 'AWARD_MASTER_DECISION', 'ADMIN_DECISION']).optional(),
stages: z.array(
z.object({
id: z.string().optional(),
name: z.string().min(1).max(255),
slug: z.string().min(1).max(100).regex(/^[a-z0-9-]+$/),
stageType: z.enum(['INTAKE', 'FILTER', 'EVALUATION', 'SELECTION', 'LIVE_FINAL', 'RESULTS']),
sortOrder: z.number().int().min(0),
configJson: z.record(z.unknown()).optional(),
})
),
awardConfig: z
.object({
name: z.string(),
description: z.string().optional(),
scoringMode: z.enum(['PICK_WINNER', 'RANKED', 'SCORED']).optional(),
})
.optional(),
})
),
autoTransitions: z.boolean().default(true),
})
)
.mutation(async ({ ctx, input }) => {
// Check slug uniqueness if changing
if (input.slug) {
const existing = await ctx.prisma.pipeline.findFirst({
where: { slug: input.slug, id: { not: input.id } },
})
if (existing) {
throw new TRPCError({
code: 'CONFLICT',
message: `A pipeline with slug "${input.slug}" already exists`,
})
}
}
// Load existing structure
const existingPipeline = await ctx.prisma.pipeline.findUniqueOrThrow({
where: { id: input.id },
include: {
tracks: {
include: {
stages: {
include: { _count: { select: { projectStageStates: true } } },
},
},
},
},
})
const result = await ctx.prisma.$transaction(async (tx) => {
// Update pipeline fields
const pipeline = await tx.pipeline.update({
where: { id: input.id },
data: {
...(input.name ? { name: input.name } : {}),
...(input.slug ? { slug: input.slug } : {}),
...(input.settingsJson
? { settingsJson: input.settingsJson as Prisma.InputJsonValue }
: {}),
},
})
const inputTrackIds = new Set(
input.tracks.filter((t) => t.id).map((t) => t.id!)
)
// Delete removed tracks (safety: check no active ProjectStageStates)
for (const existingTrack of existingPipeline.tracks) {
if (!inputTrackIds.has(existingTrack.id)) {
const activeStates = existingTrack.stages.reduce(
(sum, s) => sum + s._count.projectStageStates,
0
)
if (activeStates > 0) {
throw new TRPCError({
code: 'PRECONDITION_FAILED',
message: `Cannot delete track "${existingTrack.name}" — it has ${activeStates} active project states`,
})
}
// Delete transitions, stages, then track
for (const stage of existingTrack.stages) {
await tx.stageTransition.deleteMany({
where: { OR: [{ fromStageId: stage.id }, { toStageId: stage.id }] },
})
}
await tx.stage.deleteMany({ where: { trackId: existingTrack.id } })
await tx.track.delete({ where: { id: existingTrack.id } })
}
}
const allStageIds: Array<{ id: string; sortOrder: number; trackId: string }> = []
// Create or update tracks
for (const trackInput of input.tracks) {
let trackId: string
if (trackInput.id) {
// Update existing track
await tx.track.update({
where: { id: trackInput.id },
data: {
name: trackInput.name,
slug: trackInput.slug,
kind: trackInput.kind,
sortOrder: trackInput.sortOrder,
routingMode: trackInput.routingModeDefault ?? null,
decisionMode: trackInput.decisionMode ?? null,
},
})
trackId = trackInput.id
} else {
// Create new track
const newTrack = await tx.track.create({
data: {
pipelineId: input.id,
name: trackInput.name,
slug: trackInput.slug,
kind: trackInput.kind,
sortOrder: trackInput.sortOrder,
routingMode: trackInput.routingModeDefault ?? null,
decisionMode: trackInput.decisionMode ?? null,
},
})
trackId = newTrack.id
// Create award if needed
if (trackInput.kind === 'AWARD' && trackInput.awardConfig) {
await tx.specialAward.create({
data: {
programId: existingPipeline.programId,
name: trackInput.awardConfig.name,
description: trackInput.awardConfig.description,
scoringMode: trackInput.awardConfig.scoringMode ?? 'PICK_WINNER',
trackId,
},
})
}
}
// Handle stages within this track
const existingTrack = existingPipeline.tracks.find((t) => t.id === trackId)
const inputStageIds = new Set(
trackInput.stages.filter((s) => s.id).map((s) => s.id!)
)
// Delete removed stages
if (existingTrack) {
for (const existingStage of existingTrack.stages) {
if (!inputStageIds.has(existingStage.id)) {
if (existingStage._count.projectStageStates > 0) {
throw new TRPCError({
code: 'PRECONDITION_FAILED',
message: `Cannot delete stage "${existingStage.name}" — it has ${existingStage._count.projectStageStates} active project states`,
})
}
await tx.stageTransition.deleteMany({
where: { OR: [{ fromStageId: existingStage.id }, { toStageId: existingStage.id }] },
})
await tx.stage.delete({ where: { id: existingStage.id } })
}
}
}
// Create or update stages
for (const stageInput of trackInput.stages) {
if (stageInput.id) {
await tx.stage.update({
where: { id: stageInput.id },
data: {
name: stageInput.name,
slug: stageInput.slug,
stageType: stageInput.stageType,
sortOrder: stageInput.sortOrder,
configJson: (stageInput.configJson as Prisma.InputJsonValue) ?? undefined,
},
})
allStageIds.push({ id: stageInput.id, sortOrder: stageInput.sortOrder, trackId })
} else {
const newStage = await tx.stage.create({
data: {
trackId,
name: stageInput.name,
slug: stageInput.slug,
stageType: stageInput.stageType,
sortOrder: stageInput.sortOrder,
configJson: (stageInput.configJson as Prisma.InputJsonValue) ?? undefined,
},
})
allStageIds.push({ id: newStage.id, sortOrder: stageInput.sortOrder, trackId })
}
}
}
// Recreate transitions if autoTransitions
if (input.autoTransitions) {
// Delete all existing transitions for this pipeline's tracks
const trackIds = allStageIds.map((s) => s.trackId)
const uniqueTrackIds = [...new Set(trackIds)]
for (const tId of uniqueTrackIds) {
const trackStages = await tx.stage.findMany({
where: { trackId: tId },
select: { id: true },
})
const stageIds = trackStages.map((s) => s.id)
if (stageIds.length > 0) {
await tx.stageTransition.deleteMany({
where: {
OR: [
{ fromStageId: { in: stageIds } },
{ toStageId: { in: stageIds } },
],
},
})
}
}
// Create new linear transitions per track
for (const tId of uniqueTrackIds) {
const trackStages = allStageIds
.filter((s) => s.trackId === tId)
.sort((a, b) => a.sortOrder - b.sortOrder)
for (let i = 0; i < trackStages.length - 1; i++) {
await tx.stageTransition.create({
data: {
fromStageId: trackStages[i].id,
toStageId: trackStages[i + 1].id,
isDefault: true,
},
})
}
}
}
await logAudit({
prisma: tx,
userId: ctx.user.id,
action: 'PIPELINE_STRUCTURE_UPDATED',
entityType: 'Pipeline',
entityId: input.id,
detailsJson: {
name: input.name,
trackCount: input.tracks.length,
},
ipAddress: ctx.ip,
userAgent: ctx.userAgent,
})
return pipeline
})
return result
}),
/**
* Simulate pipeline execution (dry run)
* Shows what would happen if projects were routed through the pipeline
*/
simulate: adminProcedure
.input(
z.object({
id: z.string(),
projectIds: z.array(z.string()).min(1).max(500),
})
)
.mutation(async ({ ctx, input }) => {
const pipeline = await ctx.prisma.pipeline.findUniqueOrThrow({
where: { id: input.id },
include: {
tracks: {
include: { stages: { orderBy: { sortOrder: 'asc' } } },
},
routingRules: {
where: { isActive: true },
orderBy: { priority: 'desc' },
},
},
})
// Load projects with their current state
const projects = await ctx.prisma.project.findMany({
where: { id: { in: input.projectIds } },
select: {
id: true,
title: true,
tags: true,
status: true,
metadataJson: true,
},
})
if (projects.length === 0) {
throw new TRPCError({
code: 'NOT_FOUND',
message: 'No projects found with the provided IDs',
})
}
// Simulate: for each project, determine which track/stage it would land in
const mainTrack = pipeline.tracks.find((t) => t.kind === 'MAIN')
const simulations = projects.map((project) => {
// Default: route to first stage of MAIN track
let targetTrack = mainTrack
let targetStage = mainTrack?.stages[0] ?? null
// Check routing rules (highest priority first)
for (const rule of pipeline.routingRules) {
const predicate = rule.predicateJson as Record<string, unknown>
if (predicate && evaluateSimplePredicate(predicate, project)) {
const destTrack = pipeline.tracks.find(
(t) => t.id === rule.destinationTrackId
)
if (destTrack) {
targetTrack = destTrack
targetStage = rule.destinationStageId
? destTrack.stages.find((s) => s.id === rule.destinationStageId) ?? destTrack.stages[0]
: destTrack.stages[0]
break
}
}
}
return {
projectId: project.id,
projectTitle: project.title,
currentStatus: project.status,
targetTrackId: targetTrack?.id ?? null,
targetTrackName: targetTrack?.name ?? 'Unrouted',
targetStageId: targetStage?.id ?? null,
targetStageName: targetStage?.name ?? 'None',
}
})
await logAudit({
prisma: ctx.prisma,
userId: ctx.user.id,
action: 'PIPELINE_SIMULATE',
entityType: 'Pipeline',
entityId: input.id,
detailsJson: { projectCount: projects.length },
ipAddress: ctx.ip,
userAgent: ctx.userAgent,
})
return {
pipelineId: input.id,
pipelineName: pipeline.name,
projectCount: projects.length,
simulations,
}
}),
// =========================================================================
// Phase 4: Participant-facing procedures
// =========================================================================
/**
* Get pipeline view for an applicant showing their project's journey
*/
getApplicantView: protectedProcedure
.input(
z.object({
programId: z.string(),
projectId: z.string(),
})
)
.query(async ({ ctx, input }) => {
// Get the pipeline for this program
const pipeline = await ctx.prisma.pipeline.findFirst({
where: { programId: input.programId, status: 'ACTIVE' },
include: {
tracks: {
where: { kind: 'MAIN' },
orderBy: { sortOrder: 'asc' },
include: {
stages: {
orderBy: { sortOrder: 'asc' },
select: {
id: true,
name: true,
stageType: true,
status: true,
sortOrder: true,
windowOpenAt: true,
windowCloseAt: true,
},
},
},
},
},
})
if (!pipeline) {
return null
}
// Get project stage states
const projectStates = await ctx.prisma.projectStageState.findMany({
where: {
projectId: input.projectId,
track: { pipelineId: pipeline.id },
},
select: {
stageId: true,
state: true,
enteredAt: true,
exitedAt: true,
},
})
const stateMap = new Map(projectStates.map((s) => [s.stageId, s]))
const mainTrack = pipeline.tracks[0]
if (!mainTrack) return null
// Determine current stage
const currentState = projectStates.find((s) => !s.exitedAt)
return {
pipelineId: pipeline.id,
pipelineName: pipeline.name,
trackId: mainTrack.id,
trackName: mainTrack.name,
currentStageId: currentState?.stageId ?? null,
stages: mainTrack.stages.map((stage) => {
const state = stateMap.get(stage.id)
return {
id: stage.id,
name: stage.name,
stageType: stage.stageType,
status: stage.status,
projectState: state?.state ?? null,
enteredAt: state?.enteredAt ?? null,
exitedAt: state?.exitedAt ?? null,
isCurrent: currentState?.stageId === stage.id,
}
}),
}
}),
/**
* Get stage-level analytics for a pipeline (observer/admin)
*/
getStageAnalytics: observerProcedure
.input(
z.object({
pipelineId: z.string(),
stageId: z.string().optional(),
})
)
.query(async ({ ctx, input }) => {
// Get pipeline with tracks and stages
const pipeline = await ctx.prisma.pipeline.findUniqueOrThrow({
where: { id: input.pipelineId },
include: {
tracks: {
orderBy: { sortOrder: 'asc' },
include: {
stages: {
orderBy: { sortOrder: 'asc' },
where: input.stageId ? { id: input.stageId } : {},
select: { id: true, name: true, stageType: true },
},
},
},
},
})
const allStages = pipeline.tracks.flatMap((t) => t.stages)
// Get per-stage project state counts
const stageAnalytics = await Promise.all(
allStages.map(async (stage) => {
const [stateCounts, assignmentCount, completedEvaluations, totalEvaluations] =
await Promise.all([
ctx.prisma.projectStageState.groupBy({
by: ['state'],
where: { stageId: stage.id },
_count: true,
}),
ctx.prisma.assignment.count({
where: { stageId: stage.id },
}),
ctx.prisma.evaluation.count({
where: {
assignment: { stageId: stage.id },
status: 'SUBMITTED',
},
}),
ctx.prisma.evaluation.count({
where: { assignment: { stageId: stage.id } },
}),
])
return {
stageId: stage.id,
stageName: stage.name,
stageType: stage.stageType,
projectCounts: stateCounts.reduce(
(acc, curr) => {
acc[curr.state] = curr._count
return acc
},
{} as Record<string, number>
),
totalProjects: stateCounts.reduce((sum, c) => sum + c._count, 0),
assignmentCoverage: assignmentCount,
evaluationCompletion: {
completed: completedEvaluations,
total: totalEvaluations,
percentage:
totalEvaluations > 0
? Math.round((completedEvaluations / totalEvaluations) * 100)
: 0,
},
}
})
)
return {
pipelineId: input.pipelineId,
pipelineName: pipeline.name,
stages: stageAnalytics,
}
}),
})
/**
* Simple predicate evaluator for simulation.
* Supports basic field matching on project data.
*/
function evaluateSimplePredicate(
predicate: Record<string, unknown>,
project: { tags: string[]; status: string; metadataJson: unknown }
): boolean {
const { field, operator, value } = predicate as {
field?: string
operator?: string
value?: unknown
}
if (!field || !operator) return false
let fieldValue: unknown
if (field === 'tags') {
fieldValue = project.tags
} else if (field === 'status') {
fieldValue = project.status
} else {
// Check metadataJson
const meta = (project.metadataJson as Record<string, unknown>) ?? {}
fieldValue = meta[field]
}
switch (operator) {
case 'equals':
return fieldValue === value
case 'contains':
if (Array.isArray(fieldValue)) return fieldValue.includes(value)
if (typeof fieldValue === 'string' && typeof value === 'string')
return fieldValue.includes(value)
return false
case 'in':
if (Array.isArray(value)) return value.includes(fieldValue)
return false
case 'hasAny':
if (Array.isArray(fieldValue) && Array.isArray(value))
return fieldValue.some((v) => value.includes(v))
return false
default:
return false
}
}