letsbe-sysadmin/app/main.py

201 lines
5.7 KiB
Python
Raw Permalink Normal View History

"""Main entry point for the LetsBe SysAdmin Agent."""
import asyncio
import signal
import sys
from pathlib import Path
from typing import Optional
from app import __version__
from app.agent import Agent
from app.clients.orchestrator_client import OrchestratorClient
from app.config import get_settings
from app.task_manager import TaskManager
from app.utils.logger import configure_logging, get_logger
def print_banner() -> None:
    """Write the startup banner to stdout.

    The banner shows the agent version plus the configured hostname,
    orchestrator URL and log level taken from ``get_settings()``.
    """
    cfg = get_settings()
    border = "+==============================================================+"
    rows = [
        "",
        border,
        f"| LetsBe SysAdmin Agent v{__version__:<24}|",
        border,
        f"| Hostname: {cfg.hostname:<45}|",
        f"| Orchestrator: {cfg.orchestrator_url:<45}|",
        f"| Log Level: {cfg.log_level:<45}|",
        border,
        "",
    ]
    # Leading/trailing empty rows reproduce the blank line before and the
    # newline after the box.
    print("\n".join(rows))
def validate_mounted_directories() -> None:
    """Warn when any expected host directory is not present.

    Checks /opt/letsbe/env, /opt/letsbe/stacks and /opt/letsbe/nginx.
    Missing directories are only logged as a warning; startup continues
    either way.
    """
    log = get_logger("main")
    expected = [
        "/opt/letsbe/env",
        "/opt/letsbe/stacks",
        "/opt/letsbe/nginx",
    ]
    absent = [path for path in expected if not Path(path).is_dir()]

    if not absent:
        log.info("mounted_directories_ok", directories=expected)
        return

    log.warning(
        "mounted_directories_missing",
        missing=absent,
        message="Some host directories are not mounted. Tasks requiring these paths will fail.",
    )
def _registration_backoff(attempt: int) -> float:
    """Return the wait in seconds before retrying registration attempt ``attempt``.

    Starts at 30 s, grows by a factor of 1.5 per attempt and is capped at 60 s.
    """
    return min(30 * (1.5 ** min(attempt - 1, 4)), 60)


async def _register_with_retry(
    agent: "Agent",
    client: "OrchestratorClient",
    shutdown_event: asyncio.Event,
    logger,
) -> bool:
    """Register with the orchestrator, retrying until success or shutdown.

    Retrying indefinitely lets the agent survive orchestrator restarts/updates.

    Returns:
        True once registration succeeded and no shutdown is pending,
        False if shutdown was requested first.
    """
    attempt = 0
    while not shutdown_event.is_set():
        attempt += 1
        # Reset the circuit breaker before each attempt: after waiting, the
        # orchestrator may have come back up and deserves a fresh chance.
        client.reset_circuit_breaker()
        logger.info(
            "registration_attempt",
            attempt=attempt,
            message="Attempting to register with orchestrator...",
        )
        if await agent.register():
            break

        wait_time = _registration_backoff(attempt)
        logger.warning(
            "registration_retry_wait",
            attempt=attempt,
            wait_seconds=wait_time,
            message="Orchestrator unavailable, will retry...",
        )
        # Sleep, but let a shutdown signal interrupt the wait.
        try:
            await asyncio.wait_for(shutdown_event.wait(), timeout=wait_time)
        except asyncio.TimeoutError:
            continue
        logger.info("shutdown_during_registration")
        return False

    if shutdown_event.is_set():
        logger.info("shutdown_before_registration_complete")
        return False
    return True


async def _stop_background_tasks(tasks: list, logger) -> None:
    """Cancel ``tasks``, wait for them to finish and log any that had failed.

    Cancellation itself is not reported; only real errors raised by a task are.
    """
    for task in tasks:
        task.cancel()
    results = await asyncio.gather(*tasks, return_exceptions=True)
    for task, result in zip(tasks, results):
        # CancelledError derives from BaseException (3.8+), so it is skipped here.
        if isinstance(result, Exception):
            logger.error(
                "background_task_failed",
                task=task.get_name(),
                error=str(result),
            )


async def main() -> int:
    """Main async entry point.

    Configures logging, prints the banner, checks host mounts, registers with
    the orchestrator (retrying until success or shutdown), then runs the
    heartbeat and task-polling loops until a shutdown signal arrives.

    The orchestrator client is closed on every exit path, and background
    tasks are cancelled even when shutdown of a component fails.

    Returns:
        Exit code (0 for success, non-zero for failure)
    """
    settings = get_settings()

    # Configure logging
    configure_logging(settings.log_level, settings.log_json)
    logger = get_logger("main")

    print_banner()
    validate_mounted_directories()

    logger.info(
        "agent_starting",
        version=__version__,
        hostname=settings.hostname,
        orchestrator_url=settings.orchestrator_url,
    )

    # Create components
    client = OrchestratorClient(settings)
    agent = Agent(client, settings)
    task_manager = TaskManager(client, settings)

    # Set by the signal handler; the waits below are interruptible by it.
    shutdown_event = asyncio.Event()

    def handle_signal(sig: int) -> None:
        """Log the received signal and request a graceful shutdown."""
        logger.info("signal_received", signal=signal.Signals(sig).name)
        shutdown_event.set()

    # add_signal_handler is Unix-only; on Windows Ctrl+C surfaces as
    # KeyboardInterrupt and is handled by run().
    if sys.platform != "win32":
        loop = asyncio.get_running_loop()
        for sig in (signal.SIGTERM, signal.SIGINT):
            loop.add_signal_handler(sig, handle_signal, sig)

    background_tasks: list[asyncio.Task] = []
    try:
        if not await _register_with_retry(agent, client, shutdown_event, logger):
            return 0

        # Start background tasks
        background_tasks = [
            asyncio.create_task(agent.heartbeat_loop(), name="heartbeat"),
            asyncio.create_task(task_manager.poll_loop(), name="poll"),
        ]
        logger.info("agent_running")

        # Wait for shutdown signal
        await shutdown_event.wait()
        logger.info("shutdown_initiated")

        # Graceful shutdown: components clean up before their loops are cancelled.
        await task_manager.shutdown()
        await agent.shutdown()
        await _stop_background_tasks(background_tasks, logger)

        logger.info("agent_stopped")
        return 0
    except Exception as e:
        logger.error("agent_fatal_error", error=str(e))
        return 1
    finally:
        # On the error path the loops may still be running; stop them so
        # they do not outlive the client closed below.
        still_running = [task for task in background_tasks if not task.done()]
        if still_running:
            await _stop_background_tasks(still_running, logger)
        # Close the client on every exit path (successful shutdown, shutdown
        # during registration, fatal error). Guarded so a failing close cannot
        # replace the exit code with an unhandled exception.
        try:
            await client.close()
        except Exception as close_error:
            logger.warning("client_close_failed", error=str(close_error))
def run() -> None:
    """CLI entry point: run ``main()`` on a fresh event loop and exit with its code.

    A KeyboardInterrupt escaping the event loop prints a short notice and
    exits with status 130.
    """
    try:
        status = asyncio.run(main())
    except KeyboardInterrupt:
        print("\nAgent interrupted by user")
        sys.exit(130)
    sys.exit(status)
# Start the agent when this file is executed as a script.
if __name__ == "__main__":
    run()