letsbe-orchestrator/app/dependencies/auth.py

167 lines
5.2 KiB
Python

"""Agent authentication dependencies."""
import hashlib
import logging
import secrets
import uuid
from typing import Annotated
from fastapi import Depends, Header, HTTPException, status
from sqlalchemy import select
from app.db import AsyncSessionDep
from app.models.agent import Agent
logger = logging.getLogger(__name__)
async def get_current_agent(
db: AsyncSessionDep,
x_agent_id: str = Header(..., alias="X-Agent-Id"),
x_agent_secret: str = Header(..., alias="X-Agent-Secret"),
) -> Agent:
"""
Validate agent credentials using the new X-Agent-Id/X-Agent-Secret scheme.
This is the preferred authentication method for agents.
Args:
db: Database session
x_agent_id: Agent UUID from X-Agent-Id header
x_agent_secret: Agent secret from X-Agent-Secret header
Returns:
Agent if credentials are valid
Raises:
HTTPException: 401 if credentials are invalid
"""
# Parse agent ID
try:
agent_id = uuid.UUID(x_agent_id)
except ValueError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid Agent ID format",
)
# Look up agent
result = await db.execute(select(Agent).where(Agent.id == agent_id))
agent = result.scalar_one_or_none()
if agent is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid agent credentials",
)
# Verify secret using timing-safe comparison
provided_hash = hashlib.sha256(x_agent_secret.encode()).hexdigest()
if not secrets.compare_digest(agent.secret_hash, provided_hash):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid agent credentials",
)
return agent
async def _validate_agent_token_legacy(
db: AsyncSessionDep,
agent_id: uuid.UUID,
token: str,
) -> Agent:
"""
Validate agent using legacy plaintext token (for backward compatibility).
This method is DEPRECATED and will be removed after migration period.
"""
result = await db.execute(select(Agent).where(Agent.id == agent_id))
agent = result.scalar_one_or_none()
if agent is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid agent credentials",
)
# Use timing-safe comparison for legacy token
if not agent.token or not secrets.compare_digest(agent.token, token):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid agent credentials",
)
return agent
async def get_current_agent_compat(
db: AsyncSessionDep,
x_agent_id: str | None = Header(None, alias="X-Agent-Id"),
x_agent_secret: str | None = Header(None, alias="X-Agent-Secret"),
authorization: str | None = Header(None),
agent_id: uuid.UUID | None = None, # Query param for legacy /tasks/next
) -> Agent:
"""
Backward-compatible agent authentication.
Supports both:
1. New scheme: X-Agent-Id + X-Agent-Secret headers (preferred)
2. Legacy scheme: Authorization: Bearer <token> header + agent_id param
The legacy scheme will log a deprecation warning.
Args:
db: Database session
x_agent_id: Agent UUID from X-Agent-Id header (new scheme)
x_agent_secret: Agent secret from X-Agent-Secret header (new scheme)
authorization: Authorization header (legacy scheme)
agent_id: Agent UUID from query param (legacy scheme for /tasks/next)
Returns:
Agent if credentials are valid
Raises:
HTTPException: 401 if credentials are invalid or missing
"""
# Prefer new authentication scheme
if x_agent_id and x_agent_secret:
return await get_current_agent(db, x_agent_id, x_agent_secret)
# Fall back to legacy Bearer token authentication
if authorization:
logger.warning(
"deprecated_auth_scheme: Bearer token auth is deprecated. Use X-Agent-Id and X-Agent-Secret headers. agent_id=%s",
str(agent_id) if agent_id else None,
)
# Parse Bearer token
parts = authorization.split(" ", 1)
if len(parts) != 2 or parts[0].lower() != "bearer":
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid Authorization header format. Expected: Bearer <token>",
)
token = parts[1]
# For legacy auth, we need the agent_id from somewhere
# It could come from the path param or query param
if agent_id is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Agent ID required for Bearer token authentication",
)
return await _validate_agent_token_legacy(db, agent_id, token)
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Missing authentication credentials. Use X-Agent-Id and X-Agent-Secret headers.",
)
# Type aliases for dependency injection
CurrentAgentDep = Annotated[Agent, Depends(get_current_agent)]
CurrentAgentCompatDep = Annotated[Agent, Depends(get_current_agent_compat)]