"""Agent management endpoints.""" import hashlib import logging import secrets import uuid from fastapi import APIRouter, Depends, Header, HTTPException, Query, Request, Response, status from pydantic import ValidationError from slowapi import Limiter from slowapi.util import get_remote_address from sqlalchemy import select from app.db import AsyncSessionDep from app.dependencies.auth import CurrentAgentDep from app.dependencies.local_agent_auth import verify_local_agent_key from app.models.agent import Agent, AgentStatus from app.models.base import utc_now from app.models.registration_token import RegistrationToken from app.models.tenant import Tenant from app.schemas.agent import ( AgentHeartbeatResponse, AgentRegisterRequest, AgentRegisterRequestLegacy, AgentRegisterResponse, AgentRegisterResponseLegacy, AgentResponse, LocalAgentRegisterRequest, LocalAgentRegisterResponse, ) from app.services.local_bootstrap import LocalBootstrapService logger = logging.getLogger(__name__) limiter = Limiter(key_func=get_remote_address) router = APIRouter(prefix="/agents", tags=["Agents"]) # --- Helper functions (embryonic service layer) --- async def get_agent_by_id(db: AsyncSessionDep, agent_id: uuid.UUID) -> Agent | None: """Retrieve an agent by ID.""" result = await db.execute(select(Agent).where(Agent.id == agent_id)) return result.scalar_one_or_none() async def get_tenant_by_id(db: AsyncSessionDep, tenant_id: uuid.UUID) -> Tenant | None: """Retrieve a tenant by ID.""" result = await db.execute(select(Tenant).where(Tenant.id == tenant_id)) return result.scalar_one_or_none() async def get_registration_token_by_hash( db: AsyncSessionDep, token_hash: str ) -> RegistrationToken | None: """Retrieve a registration token by its hash.""" result = await db.execute( select(RegistrationToken).where(RegistrationToken.token_hash == token_hash) ) return result.scalar_one_or_none() async def get_agent_by_tenant( db: AsyncSessionDep, tenant_id: uuid.UUID ) -> Agent | None: """Retrieve the first agent for a tenant (used for local mode single-agent).""" result = await db.execute( select(Agent).where(Agent.tenant_id == tenant_id).limit(1) ) return result.scalar_one_or_none() async def validate_agent_token( db: AsyncSessionDep, agent_id: uuid.UUID, authorization: str | None, ) -> Agent: """ Validate agent exists and token matches (legacy method). Args: db: Database session agent_id: Agent UUID authorization: Authorization header value Returns: Agent if valid Raises: HTTPException: 401 if invalid """ if authorization is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Missing Authorization header", ) # Parse Bearer token parts = authorization.split(" ", 1) if len(parts) != 2 or parts[0].lower() != "bearer": raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid Authorization header format. Expected: Bearer ", ) token = parts[1] # Find and validate agent agent = await get_agent_by_id(db, agent_id) if agent is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid agent credentials", ) # Use secrets.compare_digest for timing-attack-safe comparison if not secrets.compare_digest(agent.token, token): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid agent credentials", ) return agent # --- Route handlers (thin controllers) --- @router.get( "", response_model=list[AgentResponse], summary="List all agents", description="Retrieve all registered agents, optionally filtered by tenant.", ) async def list_agents( db: AsyncSessionDep, tenant_id: uuid.UUID | None = None, ) -> list[Agent]: """List all agents, optionally filtered by tenant.""" query = select(Agent) if tenant_id: query = query.where(Agent.tenant_id == tenant_id) query = query.order_by(Agent.created_at.desc()) result = await db.execute(query) return list(result.scalars().all()) @router.get( "/{agent_id}", response_model=AgentResponse, summary="Get agent by ID", description="Retrieve a specific agent by its UUID.", ) async def get_agent( agent_id: uuid.UUID, db: AsyncSessionDep, ) -> Agent: """Get a specific agent by ID.""" agent = await get_agent_by_id(db, agent_id) if agent is None: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Agent {agent_id} not found", ) return agent @router.post( "/register", response_model=AgentRegisterResponse | AgentRegisterResponseLegacy, status_code=status.HTTP_201_CREATED, summary="Register a new agent", description=""" Register a new SysAdmin agent with the orchestrator. **New Secure Flow (Recommended):** - Provide `registration_token` obtained from `/api/v1/tenants/{id}/registration-tokens` - The token determines which tenant the agent belongs to - Returns `agent_id`, `agent_secret`, and `tenant_id` - Store `agent_secret` securely - it's only shown once **Legacy Flow (Deprecated):** - Provide optional `tenant_id` directly - Returns `agent_id` and `token` - This flow will be removed in a future version """, ) @limiter.limit("5/minute") async def register_agent( request: Request, body: dict, db: AsyncSessionDep, ) -> AgentRegisterResponse | AgentRegisterResponseLegacy: """ Register a new SysAdmin agent. Supports both new (registration_token) and legacy (tenant_id) flows. """ # Determine which registration flow to use if "registration_token" in body: # New secure registration flow try: parsed = AgentRegisterRequest.model_validate(body) except ValidationError as e: raise HTTPException( status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=e.errors(), ) return await _register_agent_secure(parsed, db) else: # Legacy registration flow (deprecated) logger.warning( "legacy_registration_used", extra={"message": "Agent using deprecated registration without token"}, ) try: parsed = AgentRegisterRequestLegacy.model_validate(body) except ValidationError as e: raise HTTPException( status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=e.errors(), ) return await _register_agent_legacy(parsed, db) async def _register_agent_secure( request: AgentRegisterRequest, db: AsyncSessionDep, ) -> AgentRegisterResponse: """Register agent using the new secure token-based flow.""" # Hash the provided registration token token_hash = hashlib.sha256(request.registration_token.encode()).hexdigest() # Look up the registration token reg_token = await get_registration_token_by_hash(db, token_hash) if reg_token is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid registration token", ) # Validate token state if not reg_token.is_valid(): if reg_token.revoked: detail = "Registration token has been revoked" elif reg_token.expires_at and reg_token.expires_at < utc_now(): detail = "Registration token has expired" else: detail = "Registration token has been exhausted" raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail=detail, ) # Increment use count reg_token.use_count += 1 # Generate agent credentials agent_id = uuid.uuid4() agent_secret = secrets.token_hex(32) secret_hash = hashlib.sha256(agent_secret.encode()).hexdigest() # Create agent with tenant from token agent = Agent( id=agent_id, name=request.hostname, version=request.version, status=AgentStatus.ONLINE.value, last_heartbeat=utc_now(), token="", # Legacy field - empty for new agents secret_hash=secret_hash, tenant_id=reg_token.tenant_id, registration_token_id=reg_token.id, ) db.add(agent) await db.commit() logger.info( "agent_registered", extra={ "agent_id": str(agent_id), "tenant_id": str(reg_token.tenant_id), "hostname": request.hostname, "registration_token_id": str(reg_token.id), }, ) return AgentRegisterResponse( agent_id=agent_id, agent_secret=agent_secret, tenant_id=reg_token.tenant_id, ) async def _register_agent_legacy( request: AgentRegisterRequestLegacy, db: AsyncSessionDep, ) -> AgentRegisterResponseLegacy: """Register agent using the legacy flow (deprecated).""" # Validate tenant exists if provided if request.tenant_id is not None: tenant = await get_tenant_by_id(db, request.tenant_id) if tenant is None: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Tenant {request.tenant_id} not found", ) agent_id = uuid.uuid4() token = secrets.token_hex(32) # For legacy agents, also compute the secret_hash from the token # This allows them to work with the new auth scheme secret_hash = hashlib.sha256(token.encode()).hexdigest() agent = Agent( id=agent_id, name=request.hostname, version=request.version, status=AgentStatus.ONLINE.value, last_heartbeat=utc_now(), token=token, # Legacy field - used for backward compatibility secret_hash=secret_hash, # Also set for new auth scheme tenant_id=request.tenant_id, ) db.add(agent) await db.commit() logger.info( "agent_registered_legacy", extra={ "agent_id": str(agent_id), "tenant_id": str(request.tenant_id) if request.tenant_id else None, "hostname": request.hostname, }, ) return AgentRegisterResponseLegacy(agent_id=agent_id, token=token) @router.post( "/register-local", response_model=LocalAgentRegisterResponse, summary="Register agent in LOCAL_MODE", description=""" Register the local SysAdmin agent in LOCAL_MODE. **Important:** This endpoint only exists when `LOCAL_MODE=true`. **Authentication:** - Requires `X-Local-Agent-Key` header (NOT `X-Admin-Api-Key`) - LOCAL_AGENT_KEY has minimal scope - can only register the local agent **Idempotent Behavior:** - First call: Creates agent, returns `agent_secret` (201 Created) - Subsequent calls: Returns existing `agent_id`, NO secret (200 OK) - With `rotate=true`: Deletes existing agent, returns new credentials (201 Created) **HTTP Status Codes:** - 201: New agent created (or rotated) - 200: Existing agent returned (no secret) - 404: Endpoint hidden (LOCAL_MODE is false) - 401: Invalid or missing LOCAL_AGENT_KEY - 503: Local tenant not bootstrapped yet **Security:** - LOCAL_AGENT_KEY is separate from ADMIN_API_KEY (principle of least privilege) - Agent secret is only shown once (on first registration or rotation) - Rotation is logged as a security event """, responses={ 201: {"description": "Agent created or rotated"}, 200: {"description": "Existing agent returned (no secret)"}, 401: {"description": "Invalid LOCAL_AGENT_KEY"}, 404: {"description": "Endpoint hidden (LOCAL_MODE=false)"}, 503: {"description": "Local tenant not bootstrapped"}, }, ) @limiter.limit("5/minute") async def register_agent_local( request: Request, body: LocalAgentRegisterRequest, response: Response, db: AsyncSessionDep, rotate: bool = Query( default=False, description="Force credential rotation (deletes existing agent, creates new one)", ), _auth: None = Depends(verify_local_agent_key), ) -> LocalAgentRegisterResponse: """ Register an agent in LOCAL_MODE using LOCAL_AGENT_KEY. This endpoint: - Only works when LOCAL_MODE=true - Requires valid LOCAL_AGENT_KEY (NOT ADMIN_API_KEY) - Creates agent for the auto-bootstrapped local tenant - Idempotent: if agent exists, returns existing agent_id (no new secret) - With rotate=true: deletes existing, creates new with fresh credentials Security: LOCAL_AGENT_KEY has minimal scope - can only register the local agent, nothing else. """ # Get local tenant ID from bootstrap service local_tenant_id = LocalBootstrapService.get_local_tenant_id() if local_tenant_id is None: raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Local tenant not bootstrapped. Orchestrator is starting up.", headers={"Retry-After": "5"}, ) # Check if agent already exists for this tenant existing_agent = await get_agent_by_tenant(db, local_tenant_id) # Handle rotation request if rotate and existing_agent: logger.warning( "local_agent_credentials_rotated", extra={ "agent_id": str(existing_agent.id), "tenant_id": str(local_tenant_id), "hostname": existing_agent.name, "new_hostname": body.hostname, }, ) await db.delete(existing_agent) await db.commit() existing_agent = None # Proceed to create new agent # Idempotent: return existing agent without secret if existing_agent: logger.info( "local_agent_already_registered", extra={ "agent_id": str(existing_agent.id), "tenant_id": str(local_tenant_id), "hostname": existing_agent.name, }, ) response.status_code = status.HTTP_200_OK return LocalAgentRegisterResponse( agent_id=existing_agent.id, tenant_id=local_tenant_id, agent_secret=None, already_registered=True, ) # Create new agent agent_id = uuid.uuid4() agent_secret = secrets.token_hex(32) secret_hash = hashlib.sha256(agent_secret.encode()).hexdigest() agent = Agent( id=agent_id, name=body.hostname, version=body.version, status=AgentStatus.ONLINE.value, last_heartbeat=utc_now(), token="", # Legacy field - empty for new agents secret_hash=secret_hash, tenant_id=local_tenant_id, registration_token_id=None, # No registration token in local mode ) db.add(agent) await db.commit() logger.info( "local_agent_registered", extra={ "agent_id": str(agent_id), "tenant_id": str(local_tenant_id), "hostname": body.hostname, "rotated": rotate, }, ) response.status_code = status.HTTP_201_CREATED return LocalAgentRegisterResponse( agent_id=agent_id, tenant_id=local_tenant_id, agent_secret=agent_secret, already_registered=False, ) @router.post( "/{agent_id}/heartbeat", response_model=AgentHeartbeatResponse, summary="Send agent heartbeat", description=""" Send a heartbeat from an agent. Updates the agent's last_heartbeat timestamp and sets status to online. **Authentication:** - New: X-Agent-Id and X-Agent-Secret headers - Legacy: Authorization: Bearer header """, ) async def agent_heartbeat( agent_id: uuid.UUID, db: AsyncSessionDep, current_agent: CurrentAgentDep, ) -> AgentHeartbeatResponse: """ Send heartbeat from agent. Updates last_heartbeat timestamp and sets status to online. """ # Verify the path agent_id matches the authenticated agent if agent_id != current_agent.id: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Agent ID mismatch", ) # Update heartbeat current_agent.last_heartbeat = utc_now() current_agent.status = AgentStatus.ONLINE.value await db.commit() return AgentHeartbeatResponse(status="ok")