"""Agent authentication dependencies.""" import hashlib import secrets import uuid from typing import Annotated from fastapi import Depends, Header, HTTPException, status from sqlalchemy import select from app.db import AsyncSessionDep from app.models.agent import Agent async def get_current_agent( db: AsyncSessionDep, x_agent_id: str = Header(..., alias="X-Agent-Id"), x_agent_secret: str = Header(..., alias="X-Agent-Secret"), ) -> Agent: """ Validate agent credentials using X-Agent-Id/X-Agent-Secret headers. Args: db: Database session x_agent_id: Agent UUID from X-Agent-Id header x_agent_secret: Agent secret from X-Agent-Secret header Returns: Agent if credentials are valid Raises: HTTPException: 401 if credentials are invalid """ # Parse agent ID try: agent_id = uuid.UUID(x_agent_id) except ValueError: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid Agent ID format", ) # Look up agent result = await db.execute(select(Agent).where(Agent.id == agent_id)) agent = result.scalar_one_or_none() if agent is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid agent credentials", ) # Verify secret using timing-safe comparison provided_hash = hashlib.sha256(x_agent_secret.encode()).hexdigest() if not secrets.compare_digest(agent.secret_hash, provided_hash): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid agent credentials", ) return agent # Type alias for dependency injection CurrentAgentDep = Annotated[Agent, Depends(get_current_agent)]