From 51b3050b5cd62cdd0203458f03487ba1f86a88ba Mon Sep 17 00:00:00 2001 From: Matt Date: Sun, 7 Dec 2025 11:11:59 +0100 Subject: [PATCH] feat: add secure registration with credential persistence MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add REGISTRATION_TOKEN config for new secure registration flow - Add agent_secret and credentials_path config options - Update HTTP client to use X-Agent-Id/X-Agent-Secret headers - Add credential persistence to ~/.letsbe-agent/credentials.json - Load persisted credentials on startup to survive restarts - Verify credentials via heartbeat before skipping registration - Maintain backward compatibility with legacy Bearer token auth The agent now: 1. First tries to load persisted credentials 2. Validates them via heartbeat 3. Falls back to registration if invalid/missing 4. Persists new credentials after successful registration 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- app/agent.py | 43 +++++- app/clients/orchestrator_client.py | 238 +++++++++++++++++++++++++---- app/config.py | 28 +++- 3 files changed, 272 insertions(+), 37 deletions(-) diff --git a/app/agent.py b/app/agent.py index 0b51dbd..d1e1081 100644 --- a/app/agent.py +++ b/app/agent.py @@ -49,30 +49,65 @@ class Agent: async def register(self, max_retries: int = 5) -> bool: """Register agent with the orchestrator. + First attempts to load persisted credentials from a previous session. + If credentials exist and are valid, skips registration. + Otherwise, registers using REGISTRATION_TOKEN (new) or TENANT_ID (legacy). + Args: max_retries: Maximum registration attempts Returns: - True if registration succeeded + True if registration succeeded or credentials were loaded """ if self._registered: logger.info("agent_already_registered", agent_id=self.client.agent_id) return True + # Try to load persisted credentials first + if self.client.load_credentials(): + self._registered = True + logger.info( + "credentials_restored", + agent_id=self.client.agent_id, + tenant_id=self.client.tenant_id, + ) + + # Verify credentials still work by sending heartbeat + if await self.client.heartbeat(): + logger.info("credentials_verified") + # Retry any pending results from previous session + await self.client.retry_pending_results() + return True + else: + # Credentials may be invalid, clear and re-register + logger.warning("credentials_invalid_reregistering") + self.client.clear_credentials() + self._registered = False + + # Check if we have registration token or can do legacy registration + if not self.settings.registration_token and not self.settings.tenant_id: + # For backward compatibility, allow registration without token + # (orchestrator will create shared agent) + logger.warning( + "registration_no_token", + message="No REGISTRATION_TOKEN provided. Using legacy registration.", + ) + metadata = self._get_metadata() for attempt in range(max_retries): try: - # register() returns (agent_id, token) - agent_id, token = await self.client.register(metadata) + # register() returns (agent_id, secret_or_token, tenant_id) + agent_id, secret, tenant_id = await self.client.register(metadata) self._registered = True logger.info( "agent_registered", agent_id=agent_id, + tenant_id=tenant_id, hostname=self.settings.hostname, version=self.settings.agent_version, - token_received=bool(token), + auth_type="secure" if self.client.agent_secret else "legacy", ) # Send registration event diff --git a/app/clients/orchestrator_client.py b/app/clients/orchestrator_client.py index ce34b1d..6699279 100644 --- a/app/clients/orchestrator_client.py +++ b/app/clients/orchestrator_client.py @@ -58,9 +58,11 @@ class OrchestratorClient: Features: - Exponential backoff with jitter on failures - Circuit breaker to prevent hammering during outages - - X-Agent-Version header on all requests + - X-Agent-Id and X-Agent-Secret headers for new auth + - Backward compatible with legacy Bearer token auth - Event logging to orchestrator - Local result persistence for retry + - Credential persistence to survive restarts """ # API version prefix for all endpoints @@ -70,9 +72,17 @@ class OrchestratorClient: self.settings = settings or get_settings() self._client: Optional[httpx.AsyncClient] = None self._agent_id: Optional[str] = None - self._token: Optional[str] = None # Token received from registration or env + self._agent_secret: Optional[str] = None # New auth scheme + self._tenant_id: Optional[str] = None # Set after registration + self._token: Optional[str] = None # Legacy token (deprecated) - # Initialize token from settings if provided + # Initialize from settings if provided + if self.settings.agent_id: + self._agent_id = self.settings.agent_id + if self.settings.agent_secret: + self._agent_secret = self.settings.agent_secret + if self.settings.tenant_id: + self._tenant_id = self.settings.tenant_id if self.settings.agent_token: self._token = self.settings.agent_token @@ -80,8 +90,9 @@ class OrchestratorClient: self._consecutive_failures = 0 self._circuit_open_until: Optional[float] = None - # Pending results path + # Persistence paths self._pending_path = Path(self.settings.pending_results_path).expanduser() + self._credentials_path = Path(self.settings.credentials_path).expanduser() @property def agent_id(self) -> Optional[str]: @@ -92,30 +103,73 @@ class OrchestratorClient: def agent_id(self, value: str) -> None: """Set the agent ID after registration.""" self._agent_id = value + self._invalidate_client() + + @property + def agent_secret(self) -> Optional[str]: + """Get the current agent secret (new auth scheme).""" + return self._agent_secret + + @agent_secret.setter + def agent_secret(self, value: str) -> None: + """Set the agent secret after registration.""" + self._agent_secret = value + self._invalidate_client() + + @property + def tenant_id(self) -> Optional[str]: + """Get the tenant ID.""" + return self._tenant_id + + @tenant_id.setter + def tenant_id(self, value: str) -> None: + """Set the tenant ID.""" + self._tenant_id = value @property def token(self) -> Optional[str]: - """Get the current authentication token.""" + """Get the legacy authentication token (deprecated).""" return self._token @token.setter def token(self, value: str) -> None: - """Set the authentication token (from registration or env).""" + """Set the legacy authentication token (deprecated).""" self._token = value - # Force client recreation to pick up new headers + self._invalidate_client() + + @property + def is_registered(self) -> bool: + """Check if agent has credentials (registered or loaded).""" + return self._agent_id is not None and ( + self._agent_secret is not None or self._token is not None + ) + + def _invalidate_client(self) -> None: + """Force client recreation to pick up new headers.""" if self._client and not self._client.is_closed: asyncio.create_task(self._client.aclose()) self._client = None def _get_headers(self) -> dict[str, str]: - """Get headers for API requests including version and auth.""" + """Get headers for API requests including version and auth. + + Uses new X-Agent-Id/X-Agent-Secret scheme if available, + falls back to legacy Bearer token for backward compatibility. + """ headers = { "Content-Type": "application/json", "X-Agent-Version": self.settings.agent_version, "X-Agent-Hostname": self.settings.hostname, } - if self._token: + + # Prefer new auth scheme + if self._agent_id and self._agent_secret: + headers["X-Agent-Id"] = self._agent_id + headers["X-Agent-Secret"] = self._agent_secret + # Fall back to legacy Bearer token + elif self._token: headers["Authorization"] = f"Bearer {self._token}" + return headers async def _get_client(self) -> httpx.AsyncClient: @@ -243,14 +297,18 @@ class OrchestratorClient: raise last_error or Exception("Unknown error during request") - async def register(self, metadata: Optional[dict] = None) -> tuple[str, str]: + async def register(self, metadata: Optional[dict] = None) -> tuple[str, str, Optional[str]]: """Register agent with the orchestrator. + Supports two registration flows: + 1. New (secure): Uses REGISTRATION_TOKEN from settings + 2. Legacy (deprecated): Uses TENANT_ID directly + Args: metadata: Optional metadata about the agent Returns: - Tuple of (agent_id, token) assigned by orchestrator + Tuple of (agent_id, secret_or_token, tenant_id) """ payload = { "hostname": self.settings.hostname, @@ -258,15 +316,24 @@ class OrchestratorClient: "metadata": metadata or {}, } - # Include tenant_id if configured - if self.settings.tenant_id: - payload["tenant_id"] = self.settings.tenant_id - - logger.info( - "registering_agent", - hostname=self.settings.hostname, - tenant_id=self.settings.tenant_id, - ) + # Determine registration flow + if self.settings.registration_token: + # New secure registration flow + payload["registration_token"] = self.settings.registration_token + logger.info( + "registering_agent_secure", + hostname=self.settings.hostname, + ) + else: + # Legacy registration flow (deprecated) + if self.settings.tenant_id: + payload["tenant_id"] = self.settings.tenant_id + logger.warning( + "registering_agent_legacy", + hostname=self.settings.hostname, + tenant_id=self.settings.tenant_id, + message="Using deprecated registration flow. Consider using REGISTRATION_TOKEN.", + ) response = await self._request_with_retry( "POST", @@ -276,14 +343,37 @@ class OrchestratorClient: response.raise_for_status() data = response.json() - self._agent_id = data["agent_id"] - # Use property setter to force client recreation with new token - new_token = data.get("token") - if new_token: - self.token = new_token # Property setter forces client recreation - logger.info("agent_registered", agent_id=self._agent_id) - return self._agent_id, self._token + # Handle response based on registration flow + if "agent_secret" in data: + # New secure registration response + self._agent_id = data["agent_id"] + self._agent_secret = data["agent_secret"] + self._tenant_id = data.get("tenant_id") + + # Persist credentials for restart recovery + await self._save_credentials() + + logger.info( + "agent_registered_secure", + agent_id=self._agent_id, + tenant_id=self._tenant_id, + ) + return self._agent_id, self._agent_secret, self._tenant_id + else: + # Legacy registration response + self._agent_id = data["agent_id"] + self._token = data.get("token") + self._tenant_id = self.settings.tenant_id + + # Also persist legacy credentials + await self._save_credentials() + + logger.info( + "agent_registered_legacy", + agent_id=self._agent_id, + ) + return self._agent_id, self._token, self._tenant_id async def heartbeat(self) -> bool: """Send heartbeat to orchestrator. @@ -312,15 +402,15 @@ class OrchestratorClient: Returns: Task if available, None otherwise """ - if not self._agent_id: + if not self.is_registered: logger.warning("fetch_task_skipped", reason="not_registered") return None try: + # Note: agent_id is now in headers (X-Agent-Id), not query params response = await self._request_with_retry( "GET", f"{self.API_PREFIX}/tasks/next", - params={"agent_id": self._agent_id}, max_retries=1, ) @@ -525,6 +615,96 @@ class OrchestratorClient: return successful + async def _save_credentials(self) -> None: + """Persist agent credentials to disk for restart recovery. + + Credentials are stored with secure file permissions (0600). + """ + try: + # Ensure directory exists + self._credentials_path.parent.mkdir(parents=True, exist_ok=True) + + credentials = { + "agent_id": self._agent_id, + "tenant_id": self._tenant_id, + } + + # Include appropriate credential based on auth type + if self._agent_secret: + credentials["agent_secret"] = self._agent_secret + elif self._token: + credentials["token"] = self._token + + # Write with secure permissions + self._credentials_path.write_text(json.dumps(credentials, indent=2)) + + # Set secure permissions (owner read/write only) + # Note: On Windows, this has limited effect + try: + self._credentials_path.chmod(0o600) + except OSError: + pass # Ignore on Windows + + logger.info( + "credentials_saved", + path=str(self._credentials_path), + agent_id=self._agent_id, + ) + + except Exception as e: + logger.error("credentials_save_failed", error=str(e)) + + def load_credentials(self) -> bool: + """Load persisted credentials from disk. + + Returns: + True if credentials were loaded successfully + """ + if not self._credentials_path.exists(): + return False + + try: + data = json.loads(self._credentials_path.read_text()) + + self._agent_id = data.get("agent_id") + self._tenant_id = data.get("tenant_id") + + # Load appropriate credential + if "agent_secret" in data: + self._agent_secret = data["agent_secret"] + elif "token" in data: + self._token = data["token"] + + if self._agent_id: + logger.info( + "credentials_loaded", + agent_id=self._agent_id, + tenant_id=self._tenant_id, + auth_type="secure" if self._agent_secret else "legacy", + ) + return True + return False + + except Exception as e: + logger.error("credentials_load_failed", error=str(e)) + return False + + def clear_credentials(self) -> None: + """Clear persisted credentials (useful for re-registration).""" + self._agent_id = None + self._agent_secret = None + self._token = None + self._tenant_id = None + + if self._credentials_path.exists(): + try: + self._credentials_path.unlink() + logger.info("credentials_cleared") + except Exception as e: + logger.error("credentials_clear_failed", error=str(e)) + + self._invalidate_client() + async def close(self) -> None: """Close the HTTP client.""" if self._client and not self._client.is_closed: diff --git a/app/config.py b/app/config.py index 9f144fa..425b91f 100644 --- a/app/config.py +++ b/app/config.py @@ -27,10 +27,22 @@ class Settings(BaseSettings): hostname: str = Field(default_factory=socket.gethostname, description="Agent hostname") agent_id: Optional[str] = Field(default=None, description="Assigned by orchestrator after registration") - # Tenant assignment + # New secure registration (recommended) + registration_token: Optional[str] = Field( + default=None, + description="Registration token from orchestrator. Required for first-time registration." + ) + + # Agent credentials (set after registration, persisted to disk) + agent_secret: Optional[str] = Field( + default=None, + description="Agent secret for authentication. Set after registration." + ) + + # Tenant assignment (derived from registration token, or can be set directly for legacy) tenant_id: Optional[str] = Field( default=None, - description="Tenant UUID this agent belongs to. Required in production." + description="Tenant UUID this agent belongs to. Set after registration." ) # Orchestrator connection @@ -41,8 +53,12 @@ class Settings(BaseSettings): default="http://host.docker.internal:8000", description="Orchestrator API base URL" ) - # Token may be None initially; will be set after registration or provided via env - agent_token: Optional[str] = Field(default=None, description="Authentication token for API calls") + + # Legacy auth (deprecated - use registration_token + agent_secret instead) + agent_token: Optional[str] = Field( + default=None, + description="[DEPRECATED] Legacy authentication token. Use agent_secret instead." + ) # Timing intervals (seconds) heartbeat_interval: int = Field(default=30, ge=5, le=300, description="Heartbeat interval") @@ -82,6 +98,10 @@ class Settings(BaseSettings): default="~/.letsbe-agent/pending_results.json", description="Path for buffering unsent task results" ) + credentials_path: str = Field( + default="~/.letsbe-agent/credentials.json", + description="Path for persisting agent credentials after registration" + ) @lru_cache