"""Task management endpoints.""" import uuid from fastapi import APIRouter, Header, HTTPException, Query, status from sqlalchemy import select from app.db import AsyncSessionDep from app.models.task import Task, TaskStatus from app.routes.agents import validate_agent_token from app.schemas.task import TaskCreate, TaskResponse, TaskUpdate router = APIRouter(prefix="/tasks", tags=["Tasks"]) # --- Helper functions (embryonic service layer) --- async def create_task(db: AsyncSessionDep, task_in: TaskCreate) -> Task: """Create a new task in the database.""" task = Task( tenant_id=task_in.tenant_id, type=task_in.type, payload=task_in.payload, status=TaskStatus.PENDING.value, ) db.add(task) await db.commit() await db.refresh(task) return task async def get_tasks( db: AsyncSessionDep, tenant_id: uuid.UUID | None = None, task_status: TaskStatus | None = None, ) -> list[Task]: """Retrieve tasks with optional filtering.""" query = select(Task).order_by(Task.created_at.desc()) if tenant_id is not None: query = query.where(Task.tenant_id == tenant_id) if task_status is not None: query = query.where(Task.status == task_status.value) result = await db.execute(query) return list(result.scalars().all()) async def get_task_by_id(db: AsyncSessionDep, task_id: uuid.UUID) -> Task | None: """Retrieve a task by ID.""" result = await db.execute(select(Task).where(Task.id == task_id)) return result.scalar_one_or_none() async def update_task( db: AsyncSessionDep, task: Task, task_update: TaskUpdate, ) -> Task: """Update a task's status and/or result.""" if task_update.status is not None: task.status = task_update.status.value if task_update.result is not None: task.result = task_update.result await db.commit() await db.refresh(task) return task # --- Route handlers (thin controllers) --- @router.post("/", response_model=TaskResponse, status_code=status.HTTP_201_CREATED) async def create_task_endpoint( task_in: TaskCreate, db: AsyncSessionDep, ) -> Task: """ Create a new task for agent execution. ## Parameters - **tenant_id**: UUID of the tenant this task belongs to - **type**: Task type string (see supported types below) - **payload**: JSON payload with task-specific parameters ## Supported Task Types | Type | Description | Payload | |------|-------------|---------| | FILE_WRITE | Write content to a file | `{"path": str, "content": str}` | | ENV_UPDATE | Update .env key/value pairs | `{"path": str, "updates": {str: str}}` | | DOCKER_RELOAD | Reload Docker Compose stack | `{"compose_dir": str}` | | COMPOSITE | Execute sequence of sub-tasks | `{"sequence": [{task, payload}, ...]}` | ## Agent Behavior 1. Agent polls `GET /tasks/next` to claim pending tasks 2. Agent executes the task based on type and payload 3. Agent updates task status via `PATCH /tasks/{id}` ## Example Payloads **FILE_WRITE:** ```json {"path": "/opt/app/config.json", "content": "{\"key\": \"value\"}"} ``` **ENV_UPDATE:** ```json {"path": "/opt/app/.env", "updates": {"DB_HOST": "localhost", "DB_PORT": "5432"}} ``` **DOCKER_RELOAD:** ```json {"compose_dir": "/opt/stacks/keycloak"} ``` **COMPOSITE:** ```json { "sequence": [ {"task": "FILE_WRITE", "payload": {"path": "/opt/app/config.json", "content": "{}"}}, {"task": "DOCKER_RELOAD", "payload": {"compose_dir": "/opt/stacks/app"}} ] } ``` """ return await create_task(db, task_in) @router.get("/", response_model=list[TaskResponse]) async def list_tasks_endpoint( db: AsyncSessionDep, tenant_id: uuid.UUID | None = Query(None, description="Filter by tenant ID"), status: TaskStatus | None = Query(None, description="Filter by task status"), ) -> list[Task]: """ List all tasks with optional filtering. ## Query Parameters - **tenant_id**: Optional filter by tenant UUID - **status**: Optional filter by task status (pending, running, completed, failed) ## Task Types Tasks may have the following types: - **FILE_WRITE**: Write content to a file - **ENV_UPDATE**: Update .env key/value pairs - **DOCKER_RELOAD**: Reload Docker Compose stack - **COMPOSITE**: Execute sequence of sub-tasks - Legacy types: provision_server, configure_keycloak, etc. ## Response Returns tasks ordered by created_at descending (newest first). Each task includes: id, tenant_id, agent_id, type, payload, status, result, timestamps. """ return await get_tasks(db, tenant_id=tenant_id, task_status=status) # --- Agent task acquisition --- # NOTE: /next must be defined BEFORE /{task_id} to avoid path matching issues async def get_next_pending_task(db: AsyncSessionDep) -> Task | None: """Get the oldest pending task.""" query = ( select(Task) .where(Task.status == TaskStatus.PENDING.value) .order_by(Task.created_at.asc()) .limit(1) ) result = await db.execute(query) return result.scalar_one_or_none() @router.get("/next", response_model=TaskResponse | None) async def get_next_task_endpoint( db: AsyncSessionDep, agent_id: uuid.UUID = Query(..., description="Agent UUID requesting the task"), authorization: str | None = Header(None), ) -> Task | None: """ Get the next pending task for an agent. Requires Bearer token authentication matching the agent. Atomically claims the oldest pending task by: - Setting status to 'running' - Assigning agent_id to the requesting agent Returns null (200) if no pending tasks are available. """ # Validate agent credentials await validate_agent_token(db, agent_id, authorization) # Get next pending task task = await get_next_pending_task(db) if task is None: return None # Claim the task task.status = TaskStatus.RUNNING.value task.agent_id = agent_id await db.commit() await db.refresh(task) return task @router.get("/{task_id}", response_model=TaskResponse) async def get_task_endpoint( task_id: uuid.UUID, db: AsyncSessionDep, ) -> Task: """ Get a task by ID. Returns the task with the specified UUID. """ task = await get_task_by_id(db, task_id) if task is None: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Task {task_id} not found", ) return task @router.patch("/{task_id}", response_model=TaskResponse) async def update_task_endpoint( task_id: uuid.UUID, task_update: TaskUpdate, db: AsyncSessionDep, ) -> Task: """ Update a task's status and/or result. Only status and result fields can be updated. - **status**: New task status - **result**: JSON result payload """ task = await get_task_by_id(db, task_id) if task is None: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Task {task_id} not found", ) return await update_task(db, task, task_update)