"""Agent lifecycle management: registration and heartbeat.""" import asyncio import platform import random from typing import Optional from app.clients.orchestrator_client import ( CircuitBreakerOpen, EventLevel, HeartbeatResult, HeartbeatStatus, OrchestratorClient, ) from app.config import Settings, get_settings from app.utils.logger import get_logger logger = get_logger("agent") class Agent: """Agent lifecycle manager. Handles: - Registration with orchestrator - Periodic heartbeat - Graceful shutdown """ def __init__( self, client: Optional[OrchestratorClient] = None, settings: Optional[Settings] = None, ): self.settings = settings or get_settings() self.client = client or OrchestratorClient(self.settings) self._shutdown_event = asyncio.Event() self._registered = False @property def is_registered(self) -> bool: """Check if agent is registered with orchestrator.""" return self._registered and self.client.agent_id is not None def _get_metadata(self) -> dict: """Gather agent metadata for registration.""" return { "platform": platform.system(), "platform_version": platform.version(), "python_version": platform.python_version(), "hostname": self.settings.hostname, "version": self.settings.agent_version, } async def register(self, max_retries: int = 5) -> bool: """Register agent with the orchestrator. First attempts to load persisted credentials from a previous session. If credentials exist and are valid, skips registration. Otherwise, registers using REGISTRATION_TOKEN (new) or TENANT_ID (legacy). Args: max_retries: Maximum registration attempts Returns: True if registration succeeded or credentials were loaded """ if self._registered: logger.info("agent_already_registered", agent_id=self.client.agent_id) return True # Try to load persisted credentials first if self.client.load_credentials(): self._registered = True logger.info( "credentials_restored", agent_id=self.client.agent_id, tenant_id=self.client.tenant_id, ) # Verify credentials still work by sending heartbeat result = await self.client.heartbeat() if result.status == HeartbeatStatus.SUCCESS: logger.info("credentials_verified") # Retry any pending results from previous session await self.client.retry_pending_results() return True elif result.status == HeartbeatStatus.AUTH_FAILED: # Only clear credentials on explicit auth failure (401/403) logger.warning("credentials_invalid_clearing", reason=result.message) self.client.clear_credentials() self._registered = False # Fall through to registration elif result.status == HeartbeatStatus.NOT_REGISTERED: # Should not happen if load_credentials succeeded, but handle it logger.warning("credentials_not_registered_state") self._registered = False # Fall through to registration elif result.status in (HeartbeatStatus.SERVER_ERROR, HeartbeatStatus.NETWORK_ERROR): # Transient error - keep credentials, retry later # Do NOT retry_pending_results here - orchestrator is unhealthy # Main heartbeat loop will handle retries with backoff logger.warning( "credentials_verification_transient_error", status=result.status.value, message=result.message, ) return True # Check if we have registration token or can do legacy registration if not self.settings.registration_token and not self.settings.tenant_id: # For backward compatibility, allow registration without token # (orchestrator will create shared agent) logger.warning( "registration_no_token", message="No REGISTRATION_TOKEN provided. Using legacy registration.", ) metadata = self._get_metadata() for attempt in range(max_retries): try: # register() returns (agent_id, secret_or_token, tenant_id) agent_id, secret, tenant_id = await self.client.register(metadata) self._registered = True logger.info( "agent_registered", agent_id=agent_id, tenant_id=tenant_id, hostname=self.settings.hostname, version=self.settings.agent_version, auth_type="secure" if self.client.agent_secret else "legacy", ) # Send registration event await self.client.send_event( EventLevel.INFO, f"Agent registered: {self.settings.hostname}", metadata=metadata, ) # Retry any pending results from previous session await self.client.retry_pending_results() return True except CircuitBreakerOpen: logger.warning( "registration_circuit_breaker_open", attempt=attempt + 1, ) # Wait for cooldown await asyncio.sleep(self.settings.circuit_breaker_cooldown) except Exception as e: delay = self.settings.backoff_base * (2 ** attempt) delay = min(delay, self.settings.backoff_max) # Add jitter delay += random.uniform(0, delay * 0.25) logger.error( "registration_failed", attempt=attempt + 1, max_retries=max_retries, error=str(e), retry_in=delay, ) if attempt < max_retries - 1: await asyncio.sleep(delay) logger.error("registration_exhausted", max_retries=max_retries) return False async def heartbeat_loop(self) -> None: """Run the heartbeat loop until shutdown. Sends periodic heartbeats to the orchestrator. Uses exponential backoff on failures. """ if not self.is_registered: logger.warning("heartbeat_loop_not_registered") return logger.info( "heartbeat_loop_started", interval=self.settings.heartbeat_interval, ) consecutive_failures = 0 backoff_multiplier = 1.0 while not self._shutdown_event.is_set(): result = await self.client.heartbeat() if result.status == HeartbeatStatus.SUCCESS: consecutive_failures = 0 backoff_multiplier = 1.0 logger.debug("heartbeat_sent", agent_id=self.client.agent_id) elif result.status == HeartbeatStatus.AUTH_FAILED: # Credentials truly invalid (e.g., agent deleted in orchestrator) logger.warning( "heartbeat_auth_failed_clearing_credentials", message=result.message, ) self.client.clear_credentials() self._registered = False # Outer loop will re-run register() consecutive_failures = 0 backoff_multiplier = 1.0 # Break out of heartbeat loop to trigger re-registration break else: # NETWORK_ERROR / SERVER_ERROR / NOT_REGISTERED # Transient issues - keep credentials, just backoff consecutive_failures += 1 backoff_multiplier = min(backoff_multiplier * 1.5, 4.0) logger.warning( "heartbeat_failed_transient", status=result.status.value, message=result.message, consecutive_failures=consecutive_failures, ) # Calculate next interval with backoff interval = self.settings.heartbeat_interval * backoff_multiplier # Add jitter (0-10% of interval) interval += random.uniform(0, interval * 0.1) # Wait for next heartbeat or shutdown try: await asyncio.wait_for( self._shutdown_event.wait(), timeout=interval, ) break # Shutdown requested except asyncio.TimeoutError: pass # Normal timeout, continue loop logger.info("heartbeat_loop_stopped") async def shutdown(self) -> None: """Initiate graceful shutdown.""" logger.info("agent_shutdown_initiated") # Signal shutdown self._shutdown_event.set() # Send shutdown event if we can if self.is_registered: try: await self.client.send_event( EventLevel.INFO, f"Agent shutting down: {self.settings.hostname}", ) except Exception: pass # Best effort # Close client await self.client.close() logger.info("agent_shutdown_complete")