From 2d27775a2c59877bf1c765dc1a56f3db65f746b7 Mon Sep 17 00:00:00 2001 From: Matt Date: Tue, 9 Dec 2025 15:25:44 +0100 Subject: [PATCH] fix: Persist credentials across container and orchestrator restarts MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously, the agent would clear credentials on ANY heartbeat failure, causing infinite re-registration loops when: - Agent container was updated while orchestrator was running - Orchestrator was restarted while agent was running Changes: - Add HeartbeatStatus enum and HeartbeatResult dataclass - Modify heartbeat() to return status info instead of just bool - Only clear credentials on 401/403 (AUTH_FAILED) - Keep credentials on transient errors (NETWORK_ERROR, SERVER_ERROR) - Handle AUTH_FAILED in heartbeat_loop() for mid-session invalidation Scenarios now handled: - Agent restart: keeps creds, retries until orchestrator responds - Orchestrator restart: keeps creds, retries with backoff - Admin deletes agent: clears creds, breaks out for re-registration 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- app/agent.py | 81 +++++++++++++++++++++--------- app/clients/orchestrator_client.py | 59 +++++++++++++++++++--- 2 files changed, 108 insertions(+), 32 deletions(-) diff --git a/app/agent.py b/app/agent.py index d1e1081..f2dd77b 100644 --- a/app/agent.py +++ b/app/agent.py @@ -5,7 +5,13 @@ import platform import random from typing import Optional -from app.clients.orchestrator_client import CircuitBreakerOpen, EventLevel, OrchestratorClient +from app.clients.orchestrator_client import ( + CircuitBreakerOpen, + EventLevel, + HeartbeatResult, + HeartbeatStatus, + OrchestratorClient, +) from app.config import Settings, get_settings from app.utils.logger import get_logger @@ -73,16 +79,37 @@ class Agent: ) # Verify credentials still work by sending heartbeat - if await self.client.heartbeat(): + result = await self.client.heartbeat() + + if 
result.status == HeartbeatStatus.SUCCESS: logger.info("credentials_verified") # Retry any pending results from previous session await self.client.retry_pending_results() return True - else: - # Credentials may be invalid, clear and re-register - logger.warning("credentials_invalid_reregistering") + + elif result.status == HeartbeatStatus.AUTH_FAILED: + # Only clear credentials on explicit auth failure (401/403) + logger.warning("credentials_invalid_clearing", reason=result.message) self.client.clear_credentials() self._registered = False + # Fall through to registration + + elif result.status == HeartbeatStatus.NOT_REGISTERED: + # Should not happen if load_credentials succeeded, but handle it + logger.warning("credentials_not_registered_state") + self._registered = False + # Fall through to registration + + elif result.status in (HeartbeatStatus.SERVER_ERROR, HeartbeatStatus.NETWORK_ERROR): + # Transient error - keep credentials, retry later + # Do NOT retry_pending_results here - orchestrator is unhealthy + # Main heartbeat loop will handle retries with backoff + logger.warning( + "credentials_verification_transient_error", + status=result.status.value, + message=result.message, + ) + return True # Check if we have registration token or can do legacy registration if not self.settings.registration_token and not self.settings.tenant_id: @@ -169,31 +196,35 @@ class Agent: backoff_multiplier = 1.0 while not self._shutdown_event.is_set(): - try: - success = await self.client.heartbeat() + result = await self.client.heartbeat() - if success: - consecutive_failures = 0 - backoff_multiplier = 1.0 - logger.debug("heartbeat_sent", agent_id=self.client.agent_id) - else: - consecutive_failures += 1 - backoff_multiplier = min(backoff_multiplier * 1.5, 4.0) - logger.warning( - "heartbeat_failed", - consecutive_failures=consecutive_failures, - ) + if result.status == HeartbeatStatus.SUCCESS: + consecutive_failures = 0 + backoff_multiplier = 1.0 + logger.debug("heartbeat_sent", 
agent_id=self.client.agent_id) - except CircuitBreakerOpen: - logger.warning("heartbeat_circuit_breaker_open") - backoff_multiplier = 4.0 # Max backoff during circuit break + elif result.status == HeartbeatStatus.AUTH_FAILED: + # Credentials truly invalid (e.g., agent deleted in orchestrator) + logger.warning( + "heartbeat_auth_failed_clearing_credentials", + message=result.message, + ) + self.client.clear_credentials() + self._registered = False # Outer loop will re-run register() + consecutive_failures = 0 + backoff_multiplier = 1.0 + # Break out of heartbeat loop to trigger re-registration + break - except Exception as e: + else: + # NETWORK_ERROR / SERVER_ERROR / NOT_REGISTERED + # Transient issues - keep credentials, just backoff consecutive_failures += 1 backoff_multiplier = min(backoff_multiplier * 1.5, 4.0) - logger.error( - "heartbeat_error", - error=str(e), + logger.warning( + "heartbeat_failed_transient", + status=result.status.value, + message=result.message, consecutive_failures=consecutive_failures, ) diff --git a/app/clients/orchestrator_client.py b/app/clients/orchestrator_client.py index d2caaf7..2ef94f5 100644 --- a/app/clients/orchestrator_client.py +++ b/app/clients/orchestrator_client.py @@ -52,6 +52,24 @@ class CircuitBreakerOpen(Exception): pass +class HeartbeatStatus(str, Enum): + """Status of a heartbeat attempt.""" + + SUCCESS = "success" + AUTH_FAILED = "auth_failed" # 401/403 - credentials invalid + SERVER_ERROR = "server_error" # 5xx - transient, retry + NETWORK_ERROR = "network_error" # Connection failed, timeout + NOT_REGISTERED = "not_registered" # No agent_id/secret set + + +@dataclass +class HeartbeatResult: + """Result of a heartbeat attempt with status and optional message.""" + + status: HeartbeatStatus + message: str = "" + + class OrchestratorClient: """Async client for Orchestrator REST API. 
@@ -377,15 +395,20 @@ class OrchestratorClient:
         )
         return self._agent_id, self._token, self._tenant_id
 
-    async def heartbeat(self) -> bool:
+    async def heartbeat(self) -> HeartbeatResult:
         """Send heartbeat to orchestrator.
 
         Returns:
-            True if heartbeat was acknowledged
+            HeartbeatResult with status indicating success or failure type.
+            - SUCCESS: Heartbeat acknowledged (200)
+            - AUTH_FAILED: Credentials invalid (401/403 only)
+            - SERVER_ERROR: 5xx or any other unexpected status, transient
+            - NETWORK_ERROR: Connection failed or circuit breaker open, transient
+            - NOT_REGISTERED: No agent_id set
         """
         if not self._agent_id:
             logger.warning("heartbeat_skipped", reason="not_registered")
-            return False
+            return HeartbeatResult(HeartbeatStatus.NOT_REGISTERED, "No agent_id set")
 
         try:
             response = await self._request_with_retry(
@@ -393,10 +416,32 @@ class OrchestratorClient:
                 f"{self.API_PREFIX}/agents/{self._agent_id}/heartbeat",
                 max_retries=1,  # Don't retry too aggressively for heartbeats
             )
-            return response.status_code == 200
-        except (httpx.HTTPError, CircuitBreakerOpen) as e:
-            logger.warning("heartbeat_failed", error=str(e))
-            return False
+
+            if response.status_code == 200:
+                return HeartbeatResult(HeartbeatStatus.SUCCESS)
+            elif response.status_code in (401, 403):
+                msg = f"HTTP {response.status_code}: {response.text[:200]}"
+                logger.warning("heartbeat_auth_failed", status_code=response.status_code)
+                return HeartbeatResult(HeartbeatStatus.AUTH_FAILED, msg)
+            elif response.status_code >= 500:
+                msg = f"HTTP {response.status_code}: {response.text[:200]}"
+                logger.warning("heartbeat_server_error", status_code=response.status_code)
+                return HeartbeatResult(HeartbeatStatus.SERVER_ERROR, msg)
+            else:
+                # Not 200/401/403/5xx: keep credentials (transient); add 404/410 to the auth branch if they mean "agent deleted"
+                msg = f"HTTP {response.status_code}: {response.text[:200]}"
+                logger.warning("heartbeat_client_error", status_code=response.status_code)
+                return HeartbeatResult(HeartbeatStatus.SERVER_ERROR, msg)
+
+        except (httpx.ConnectError, httpx.TimeoutException) as e:
+            logger.warning("heartbeat_network_error", error=str(e))
+            return HeartbeatResult(HeartbeatStatus.NETWORK_ERROR, str(e))
+        except httpx.HTTPError as e:
+            logger.warning("heartbeat_http_error", error=str(e))
+            return HeartbeatResult(HeartbeatStatus.NETWORK_ERROR, str(e))
+        except CircuitBreakerOpen:
+            logger.warning("heartbeat_circuit_breaker_open")
+            return HeartbeatResult(HeartbeatStatus.NETWORK_ERROR, "Circuit breaker open")
 
     async def fetch_next_task(self) -> Optional[Task]:
         """Fetch the next available task for this agent.