From b8e3cc368540f991f6cb504d60f4e0ec6f8a0462 Mon Sep 17 00:00:00 2001
From: Matt
Date: Tue, 9 Dec 2025 13:23:21 +0100
Subject: [PATCH] fix: Added circuit breaker

---
Notes:
    Reviewer commentary only. It sits between the "---" separator and the
    first "diff --git" line, so it is not part of the commit message or of
    any hunk.

    app/clients/orchestrator_client.py
      Adds OrchestratorClient.reset_circuit_breaker(). The method sets
      _circuit_open_until to None and _consecutive_failures to 0. When
      _circuit_open_until is not None or _consecutive_failures > 0 it first
      logs "circuit_breaker_manual_reset" with was_open and
      previous_failures. The code that opens the breaker or counts failures
      is not touched by this patch.

    app/main.py
      The single registration attempt (which logged
      "registration_failed_exit" and returned 1 on failure) is replaced by
      a loop that runs while shutdown_event is not set. Each iteration:
        1. increments registration_attempt,
        2. calls client.reset_circuit_breaker(),
        3. logs "registration_attempt",
        4. awaits agent.register() and breaks out of the loop on success,
        5. otherwise logs "registration_retry_wait" and waits on
           shutdown_event with a timeout of
           min(30 * 1.5 ** min(attempt - 1, 4), 60) seconds, i.e. 30 s,
           then 45 s, then 60 s for every later attempt.
      If shutdown_event fires during the wait, the code logs
      "shutdown_during_registration" and returns 0. After the loop, if
      shutdown_event is set, it logs "shutdown_before_registration_complete"
      and returns 0. The "return 1" exit for a failed registration no
      longer exists in this hunk.

    .claude/settings.local.json
      Adds eight entries to the list that precedes "deny" and "ask": four
      mcp__serena__* names, one pytest command, "Bash(set PYTHONPATH=%cd%)",
      "Bash(python:*)" and "Bash(pip install:*)".

    NOTE(review): the subject says "Added circuit breaker", but the diff
    adds a manual reset for existing breaker state plus a registration
    retry loop. Consider rewording the subject.
    NOTE(review): the pytest entry reads ".venvScriptspython.exe"; this
    looks like a path that lost its separators. Confirm the intended value.
    NOTE(review): settings.local.json looks like a machine-local file.
    Confirm it is meant to be part of this commit.
    NOTE(review): verify the leading whitespace of the context lines
    against the target files before applying this copy (or apply with
    --ignore-whitespace).

 .claude/settings.local.json        | 10 ++++++-
 app/clients/orchestrator_client.py | 15 ++++++++++
 app/main.py                        | 45 +++++++++++++++++++++++++++---
 3 files changed, 65 insertions(+), 5 deletions(-)

diff --git a/.claude/settings.local.json b/.claude/settings.local.json
index e1e0cd3..b94a857 100644
--- a/.claude/settings.local.json
+++ b/.claude/settings.local.json
@@ -27,7 +27,15 @@
       "Bash(git push:*)",
       "Bash(git remote:*)",
       "Bash(git config:*)",
-      "Bash(git fetch:*)"
+      "Bash(git fetch:*)",
+      "mcp__serena__initial_instructions",
+      "mcp__serena__check_onboarding_performed",
+      "mcp__serena__list_dir",
+      "mcp__serena__think_about_collected_information",
+      "Bash(.venvScriptspython.exe -m pytest tests/executors/test_nextcloud_executor.py -v)",
+      "Bash(set PYTHONPATH=%cd%)",
+      "Bash(python:*)",
+      "Bash(pip install:*)"
     ],
     "deny": [],
     "ask": []
diff --git a/app/clients/orchestrator_client.py b/app/clients/orchestrator_client.py
index fab4c23..d2caaf7 100644
--- a/app/clients/orchestrator_client.py
+++ b/app/clients/orchestrator_client.py
@@ -707,6 +707,21 @@ class OrchestratorClient:
 
         self._invalidate_client()
 
+    def reset_circuit_breaker(self) -> None:
+        """Manually reset the circuit breaker.
+
+        Useful when retrying registration after a long wait period,
+        to give the orchestrator a fresh chance to respond.
+        """
+        if self._circuit_open_until is not None or self._consecutive_failures > 0:
+            logger.info(
+                "circuit_breaker_manual_reset",
+                was_open=self._circuit_open_until is not None,
+                previous_failures=self._consecutive_failures,
+            )
+        self._circuit_open_until = None
+        self._consecutive_failures = 0
+
     async def close(self) -> None:
         """Close the HTTP client."""
         if self._client and not self._client.is_closed:
diff --git a/app/main.py b/app/main.py
index 4ee4349..8b8448a 100644
--- a/app/main.py
+++ b/app/main.py
@@ -103,10 +103,47 @@ async def main() -> int:
             pass
 
     try:
-        # Register with orchestrator
-        if not await agent.register():
-            logger.error("registration_failed_exit")
-            return 1
+        # Register with orchestrator - retry indefinitely until success or shutdown
+        # This ensures the agent survives orchestrator restarts/updates
+        registration_attempt = 0
+        while not shutdown_event.is_set():
+            registration_attempt += 1
+
+            # Reset circuit breaker before each attempt to give orchestrator a fresh chance
+            # This is important after waiting - orchestrator may have come back up
+            client.reset_circuit_breaker()
+
+            logger.info(
+                "registration_attempt",
+                attempt=registration_attempt,
+                message="Attempting to register with orchestrator...",
+            )
+
+            if await agent.register():
+                break  # Registration successful
+
+            # Wait before retrying, with exponential backoff up to 60 seconds
+            wait_time = min(30 * (1.5 ** min(registration_attempt - 1, 4)), 60)
+            logger.warning(
+                "registration_retry_wait",
+                attempt=registration_attempt,
+                wait_seconds=wait_time,
+                message="Orchestrator unavailable, will retry...",
+            )
+
+            # Wait but allow shutdown to interrupt
+            try:
+                await asyncio.wait_for(shutdown_event.wait(), timeout=wait_time)
+                # If we get here, shutdown was requested
+                logger.info("shutdown_during_registration")
+                return 0
+            except asyncio.TimeoutError:
+                # Normal timeout, continue to retry
+                pass
+
+        if shutdown_event.is_set():
+            logger.info("shutdown_before_registration_complete")
+            return 0
 
         # Start background tasks
         heartbeat_task = asyncio.create_task(