"""Task management endpoints.""" import uuid from fastapi import APIRouter, Header, HTTPException, Query, status from sqlalchemy import select from app.db import AsyncSessionDep from app.models.agent import Agent from app.models.task import Task, TaskStatus from app.routes.agents import validate_agent_token from app.schemas.task import TaskCreate, TaskResponse, TaskUpdate router = APIRouter(prefix="/tasks", tags=["Tasks"]) # --- Helper functions (embryonic service layer) --- async def create_task(db: AsyncSessionDep, task_in: TaskCreate) -> Task: """Create a new task in the database.""" task = Task( tenant_id=task_in.tenant_id, type=task_in.type, payload=task_in.payload, status=TaskStatus.PENDING.value, ) db.add(task) await db.commit() await db.refresh(task) return task async def get_tasks( db: AsyncSessionDep, tenant_id: uuid.UUID | None = None, task_status: TaskStatus | None = None, ) -> list[Task]: """Retrieve tasks with optional filtering.""" query = select(Task).order_by(Task.created_at.desc()) if tenant_id is not None: query = query.where(Task.tenant_id == tenant_id) if task_status is not None: query = query.where(Task.status == task_status.value) result = await db.execute(query) return list(result.scalars().all()) async def get_task_by_id(db: AsyncSessionDep, task_id: uuid.UUID) -> Task | None: """Retrieve a task by ID.""" result = await db.execute(select(Task).where(Task.id == task_id)) return result.scalar_one_or_none() async def update_task( db: AsyncSessionDep, task: Task, task_update: TaskUpdate, ) -> Task: """Update a task's status and/or result.""" if task_update.status is not None: task.status = task_update.status.value if task_update.result is not None: task.result = task_update.result await db.commit() await db.refresh(task) return task # --- Route handlers (thin controllers) --- @router.post("/", response_model=TaskResponse, status_code=status.HTTP_201_CREATED) async def create_task_endpoint( task_in: TaskCreate, db: AsyncSessionDep, ) -> Task: """ Create a new task for agent execution. ## Parameters - **tenant_id**: UUID of the tenant this task belongs to - **type**: Task type string (see supported types below) - **payload**: JSON payload with task-specific parameters ## Supported Task Types | Type | Description | Payload | |------|-------------|---------| | FILE_WRITE | Write content to a file | `{"path": str, "content": str}` | | ENV_UPDATE | Update .env key/value pairs | `{"path": str, "updates": {str: str}}` | | DOCKER_RELOAD | Reload Docker Compose stack | `{"compose_dir": str}` | | COMPOSITE | Execute sequence of sub-tasks | `{"sequence": [{task, payload}, ...]}` | ## Agent Behavior 1. Agent polls `GET /tasks/next` to claim pending tasks 2. Agent executes the task based on type and payload 3. Agent updates task status via `PATCH /tasks/{id}` ## Example Payloads **FILE_WRITE:** ```json {"path": "/opt/app/config.json", "content": "{\"key\": \"value\"}"} ``` **ENV_UPDATE:** ```json {"path": "/opt/app/.env", "updates": {"DB_HOST": "localhost", "DB_PORT": "5432"}} ``` **DOCKER_RELOAD:** ```json {"compose_dir": "/opt/stacks/keycloak"} ``` **COMPOSITE:** ```json { "sequence": [ {"task": "FILE_WRITE", "payload": {"path": "/opt/app/config.json", "content": "{}"}}, {"task": "DOCKER_RELOAD", "payload": {"compose_dir": "/opt/stacks/app"}} ] } ``` """ return await create_task(db, task_in) @router.get("/", response_model=list[TaskResponse]) async def list_tasks_endpoint( db: AsyncSessionDep, tenant_id: uuid.UUID | None = Query(None, description="Filter by tenant ID"), status: TaskStatus | None = Query(None, description="Filter by task status"), ) -> list[Task]: """ List all tasks with optional filtering. ## Query Parameters - **tenant_id**: Optional filter by tenant UUID - **status**: Optional filter by task status (pending, running, completed, failed) ## Task Types Tasks may have the following types: - **FILE_WRITE**: Write content to a file - **ENV_UPDATE**: Update .env key/value pairs - **DOCKER_RELOAD**: Reload Docker Compose stack - **COMPOSITE**: Execute sequence of sub-tasks - Legacy types: provision_server, configure_keycloak, etc. ## Response Returns tasks ordered by created_at descending (newest first). Each task includes: id, tenant_id, agent_id, type, payload, status, result, timestamps. """ return await get_tasks(db, tenant_id=tenant_id, task_status=status) # --- Agent task acquisition --- # NOTE: /next must be defined BEFORE /{task_id} to avoid path matching issues async def get_next_pending_task(db: AsyncSessionDep, agent: Agent) -> Task | None: """Get the oldest pending task for the agent's tenant. If the agent has a tenant_id, only returns tasks for that tenant. If the agent has no tenant_id (shared agent), returns any pending task. """ query = select(Task).where(Task.status == TaskStatus.PENDING.value) # Filter by agent's tenant if agent is tenant-specific if agent.tenant_id is not None: query = query.where(Task.tenant_id == agent.tenant_id) query = query.order_by(Task.created_at.asc()).limit(1) result = await db.execute(query) return result.scalar_one_or_none() @router.get("/next", response_model=TaskResponse | None) async def get_next_task_endpoint( db: AsyncSessionDep, agent_id: uuid.UUID = Query(..., description="Agent UUID requesting the task"), authorization: str | None = Header(None), ) -> Task | None: """ Get the next pending task for an agent. Requires Bearer token authentication matching the agent. Atomically claims the oldest pending task by: - Setting status to 'running' - Assigning agent_id to the requesting agent Tasks are filtered by the agent's tenant_id: - If agent has a tenant_id, only returns tasks for that tenant - If agent has no tenant_id (shared agent), can claim any task Returns null (200) if no pending tasks are available. """ # Validate agent credentials and get agent object agent = await validate_agent_token(db, agent_id, authorization) # Get next pending task for this agent's tenant task = await get_next_pending_task(db, agent) if task is None: return None # Claim the task task.status = TaskStatus.RUNNING.value task.agent_id = agent_id await db.commit() await db.refresh(task) return task @router.get("/{task_id}", response_model=TaskResponse) async def get_task_endpoint( task_id: uuid.UUID, db: AsyncSessionDep, ) -> Task: """ Get a task by ID. Returns the task with the specified UUID. """ task = await get_task_by_id(db, task_id) if task is None: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Task {task_id} not found", ) return task @router.patch("/{task_id}", response_model=TaskResponse) async def update_task_endpoint( task_id: uuid.UUID, task_update: TaskUpdate, db: AsyncSessionDep, ) -> Task: """ Update a task's status and/or result. Only status and result fields can be updated. - **status**: New task status - **result**: JSON result payload """ task = await get_task_by_id(db, task_id) if task is None: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Task {task_id} not found", ) return await update_task(db, task, task_update)