feat: add secure registration with credential persistence

- Add REGISTRATION_TOKEN config for new secure registration flow
- Add agent_secret and credentials_path config options
- Update HTTP client to use X-Agent-Id/X-Agent-Secret headers
- Add credential persistence to ~/.letsbe-agent/credentials.json
- Load persisted credentials on startup to survive restarts
- Verify credentials via heartbeat before skipping registration
- Maintain backward compatibility with legacy Bearer token auth

The agent now:
1. First tries to load persisted credentials
2. Validates them via heartbeat
3. Falls back to registration if invalid/missing
4. Persists new credentials after successful registration

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Matt 2025-12-07 11:11:59 +01:00
parent cea54183cc
commit 51b3050b5c
3 changed files with 272 additions and 37 deletions

View File

@ -49,30 +49,65 @@ class Agent:
async def register(self, max_retries: int = 5) -> bool:
"""Register agent with the orchestrator.
First attempts to load persisted credentials from a previous session.
If credentials exist and are valid, skips registration.
Otherwise, registers using REGISTRATION_TOKEN (new) or TENANT_ID (legacy).
Args:
max_retries: Maximum registration attempts
Returns:
True if registration succeeded
True if registration succeeded or credentials were loaded
"""
if self._registered:
logger.info("agent_already_registered", agent_id=self.client.agent_id)
return True
# Try to load persisted credentials first
if self.client.load_credentials():
self._registered = True
logger.info(
"credentials_restored",
agent_id=self.client.agent_id,
tenant_id=self.client.tenant_id,
)
# Verify credentials still work by sending heartbeat
if await self.client.heartbeat():
logger.info("credentials_verified")
# Retry any pending results from previous session
await self.client.retry_pending_results()
return True
else:
# Credentials may be invalid, clear and re-register
logger.warning("credentials_invalid_reregistering")
self.client.clear_credentials()
self._registered = False
# Check if we have registration token or can do legacy registration
if not self.settings.registration_token and not self.settings.tenant_id:
# For backward compatibility, allow registration without token
# (orchestrator will create shared agent)
logger.warning(
"registration_no_token",
message="No REGISTRATION_TOKEN provided. Using legacy registration.",
)
metadata = self._get_metadata()
for attempt in range(max_retries):
try:
# register() returns (agent_id, token)
agent_id, token = await self.client.register(metadata)
# register() returns (agent_id, secret_or_token, tenant_id)
agent_id, secret, tenant_id = await self.client.register(metadata)
self._registered = True
logger.info(
"agent_registered",
agent_id=agent_id,
tenant_id=tenant_id,
hostname=self.settings.hostname,
version=self.settings.agent_version,
token_received=bool(token),
auth_type="secure" if self.client.agent_secret else "legacy",
)
# Send registration event

View File

@ -58,9 +58,11 @@ class OrchestratorClient:
Features:
- Exponential backoff with jitter on failures
- Circuit breaker to prevent hammering during outages
- X-Agent-Version header on all requests
- X-Agent-Id and X-Agent-Secret headers for new auth
- Backward compatible with legacy Bearer token auth
- Event logging to orchestrator
- Local result persistence for retry
- Credential persistence to survive restarts
"""
# API version prefix for all endpoints
@ -70,9 +72,17 @@ class OrchestratorClient:
self.settings = settings or get_settings()
self._client: Optional[httpx.AsyncClient] = None
self._agent_id: Optional[str] = None
self._token: Optional[str] = None # Token received from registration or env
self._agent_secret: Optional[str] = None # New auth scheme
self._tenant_id: Optional[str] = None # Set after registration
self._token: Optional[str] = None # Legacy token (deprecated)
# Initialize token from settings if provided
# Initialize from settings if provided
if self.settings.agent_id:
self._agent_id = self.settings.agent_id
if self.settings.agent_secret:
self._agent_secret = self.settings.agent_secret
if self.settings.tenant_id:
self._tenant_id = self.settings.tenant_id
if self.settings.agent_token:
self._token = self.settings.agent_token
@ -80,8 +90,9 @@ class OrchestratorClient:
self._consecutive_failures = 0
self._circuit_open_until: Optional[float] = None
# Pending results path
# Persistence paths
self._pending_path = Path(self.settings.pending_results_path).expanduser()
self._credentials_path = Path(self.settings.credentials_path).expanduser()
@property
def agent_id(self) -> Optional[str]:
@ -92,30 +103,73 @@ class OrchestratorClient:
def agent_id(self, value: str) -> None:
"""Set the agent ID after registration."""
self._agent_id = value
self._invalidate_client()
@property
def agent_secret(self) -> Optional[str]:
"""Get the current agent secret (new auth scheme)."""
return self._agent_secret
@agent_secret.setter
def agent_secret(self, value: str) -> None:
"""Set the agent secret after registration."""
self._agent_secret = value
self._invalidate_client()
@property
def tenant_id(self) -> Optional[str]:
"""Get the tenant ID."""
return self._tenant_id
@tenant_id.setter
def tenant_id(self, value: str) -> None:
"""Set the tenant ID."""
self._tenant_id = value
@property
def token(self) -> Optional[str]:
"""Get the current authentication token."""
"""Get the legacy authentication token (deprecated)."""
return self._token
@token.setter
def token(self, value: str) -> None:
"""Set the authentication token (from registration or env)."""
"""Set the legacy authentication token (deprecated)."""
self._token = value
# Force client recreation to pick up new headers
self._invalidate_client()
@property
def is_registered(self) -> bool:
"""Check if agent has credentials (registered or loaded)."""
return self._agent_id is not None and (
self._agent_secret is not None or self._token is not None
)
def _invalidate_client(self) -> None:
"""Force client recreation to pick up new headers."""
if self._client and not self._client.is_closed:
asyncio.create_task(self._client.aclose())
self._client = None
def _get_headers(self) -> dict[str, str]:
"""Get headers for API requests including version and auth."""
"""Get headers for API requests including version and auth.
Uses new X-Agent-Id/X-Agent-Secret scheme if available,
falls back to legacy Bearer token for backward compatibility.
"""
headers = {
"Content-Type": "application/json",
"X-Agent-Version": self.settings.agent_version,
"X-Agent-Hostname": self.settings.hostname,
}
if self._token:
# Prefer new auth scheme
if self._agent_id and self._agent_secret:
headers["X-Agent-Id"] = self._agent_id
headers["X-Agent-Secret"] = self._agent_secret
# Fall back to legacy Bearer token
elif self._token:
headers["Authorization"] = f"Bearer {self._token}"
return headers
async def _get_client(self) -> httpx.AsyncClient:
@ -243,14 +297,18 @@ class OrchestratorClient:
raise last_error or Exception("Unknown error during request")
async def register(self, metadata: Optional[dict] = None) -> tuple[str, str]:
async def register(self, metadata: Optional[dict] = None) -> tuple[str, str, Optional[str]]:
"""Register agent with the orchestrator.
Supports two registration flows:
1. New (secure): Uses REGISTRATION_TOKEN from settings
2. Legacy (deprecated): Uses TENANT_ID directly
Args:
metadata: Optional metadata about the agent
Returns:
Tuple of (agent_id, token) assigned by orchestrator
Tuple of (agent_id, secret_or_token, tenant_id)
"""
payload = {
"hostname": self.settings.hostname,
@ -258,15 +316,24 @@ class OrchestratorClient:
"metadata": metadata or {},
}
# Include tenant_id if configured
if self.settings.tenant_id:
payload["tenant_id"] = self.settings.tenant_id
logger.info(
"registering_agent",
hostname=self.settings.hostname,
tenant_id=self.settings.tenant_id,
)
# Determine registration flow
if self.settings.registration_token:
# New secure registration flow
payload["registration_token"] = self.settings.registration_token
logger.info(
"registering_agent_secure",
hostname=self.settings.hostname,
)
else:
# Legacy registration flow (deprecated)
if self.settings.tenant_id:
payload["tenant_id"] = self.settings.tenant_id
logger.warning(
"registering_agent_legacy",
hostname=self.settings.hostname,
tenant_id=self.settings.tenant_id,
message="Using deprecated registration flow. Consider using REGISTRATION_TOKEN.",
)
response = await self._request_with_retry(
"POST",
@ -276,14 +343,37 @@ class OrchestratorClient:
response.raise_for_status()
data = response.json()
self._agent_id = data["agent_id"]
# Use property setter to force client recreation with new token
new_token = data.get("token")
if new_token:
self.token = new_token # Property setter forces client recreation
logger.info("agent_registered", agent_id=self._agent_id)
return self._agent_id, self._token
# Handle response based on registration flow
if "agent_secret" in data:
# New secure registration response
self._agent_id = data["agent_id"]
self._agent_secret = data["agent_secret"]
self._tenant_id = data.get("tenant_id")
# Persist credentials for restart recovery
await self._save_credentials()
logger.info(
"agent_registered_secure",
agent_id=self._agent_id,
tenant_id=self._tenant_id,
)
return self._agent_id, self._agent_secret, self._tenant_id
else:
# Legacy registration response
self._agent_id = data["agent_id"]
self._token = data.get("token")
self._tenant_id = self.settings.tenant_id
# Also persist legacy credentials
await self._save_credentials()
logger.info(
"agent_registered_legacy",
agent_id=self._agent_id,
)
return self._agent_id, self._token, self._tenant_id
async def heartbeat(self) -> bool:
"""Send heartbeat to orchestrator.
@ -312,15 +402,15 @@ class OrchestratorClient:
Returns:
Task if available, None otherwise
"""
if not self._agent_id:
if not self.is_registered:
logger.warning("fetch_task_skipped", reason="not_registered")
return None
try:
# Note: agent_id is now in headers (X-Agent-Id), not query params
response = await self._request_with_retry(
"GET",
f"{self.API_PREFIX}/tasks/next",
params={"agent_id": self._agent_id},
max_retries=1,
)
@ -525,6 +615,96 @@ class OrchestratorClient:
return successful
async def _save_credentials(self) -> None:
"""Persist agent credentials to disk for restart recovery.
Credentials are stored with secure file permissions (0600).
"""
try:
# Ensure directory exists
self._credentials_path.parent.mkdir(parents=True, exist_ok=True)
credentials = {
"agent_id": self._agent_id,
"tenant_id": self._tenant_id,
}
# Include appropriate credential based on auth type
if self._agent_secret:
credentials["agent_secret"] = self._agent_secret
elif self._token:
credentials["token"] = self._token
# Write with secure permissions
self._credentials_path.write_text(json.dumps(credentials, indent=2))
# Set secure permissions (owner read/write only)
# Note: On Windows, this has limited effect
try:
self._credentials_path.chmod(0o600)
except OSError:
pass # Ignore on Windows
logger.info(
"credentials_saved",
path=str(self._credentials_path),
agent_id=self._agent_id,
)
except Exception as e:
logger.error("credentials_save_failed", error=str(e))
def load_credentials(self) -> bool:
"""Load persisted credentials from disk.
Returns:
True if credentials were loaded successfully
"""
if not self._credentials_path.exists():
return False
try:
data = json.loads(self._credentials_path.read_text())
self._agent_id = data.get("agent_id")
self._tenant_id = data.get("tenant_id")
# Load appropriate credential
if "agent_secret" in data:
self._agent_secret = data["agent_secret"]
elif "token" in data:
self._token = data["token"]
if self._agent_id:
logger.info(
"credentials_loaded",
agent_id=self._agent_id,
tenant_id=self._tenant_id,
auth_type="secure" if self._agent_secret else "legacy",
)
return True
return False
except Exception as e:
logger.error("credentials_load_failed", error=str(e))
return False
def clear_credentials(self) -> None:
"""Clear persisted credentials (useful for re-registration)."""
self._agent_id = None
self._agent_secret = None
self._token = None
self._tenant_id = None
if self._credentials_path.exists():
try:
self._credentials_path.unlink()
logger.info("credentials_cleared")
except Exception as e:
logger.error("credentials_clear_failed", error=str(e))
self._invalidate_client()
async def close(self) -> None:
"""Close the HTTP client."""
if self._client and not self._client.is_closed:

View File

@ -27,10 +27,22 @@ class Settings(BaseSettings):
hostname: str = Field(default_factory=socket.gethostname, description="Agent hostname")
agent_id: Optional[str] = Field(default=None, description="Assigned by orchestrator after registration")
# Tenant assignment
# New secure registration (recommended)
registration_token: Optional[str] = Field(
default=None,
description="Registration token from orchestrator. Required for first-time registration."
)
# Agent credentials (set after registration, persisted to disk)
agent_secret: Optional[str] = Field(
default=None,
description="Agent secret for authentication. Set after registration."
)
# Tenant assignment (derived from registration token, or can be set directly for legacy)
tenant_id: Optional[str] = Field(
default=None,
description="Tenant UUID this agent belongs to. Required in production."
description="Tenant UUID this agent belongs to. Set after registration."
)
# Orchestrator connection
@ -41,8 +53,12 @@ class Settings(BaseSettings):
default="http://host.docker.internal:8000",
description="Orchestrator API base URL"
)
# Token may be None initially; will be set after registration or provided via env
agent_token: Optional[str] = Field(default=None, description="Authentication token for API calls")
# Legacy auth (deprecated - use registration_token + agent_secret instead)
agent_token: Optional[str] = Field(
default=None,
description="[DEPRECATED] Legacy authentication token. Use agent_secret instead."
)
# Timing intervals (seconds)
heartbeat_interval: int = Field(default=30, ge=5, le=300, description="Heartbeat interval")
@ -82,6 +98,10 @@ class Settings(BaseSettings):
default="~/.letsbe-agent/pending_results.json",
description="Path for buffering unsent task results"
)
credentials_path: str = Field(
default="~/.letsbe-agent/credentials.json",
description="Path for persisting agent credentials after registration"
)
@lru_cache