# LetsBeBiz-Redesign/letsbe-orchestrator/app/services/hub_telemetry.py
"""Hub Telemetry Service - sends aggregated metrics to Hub.
This background service periodically collects metrics from the local database
and sends them to the central Hub for license compliance and usage analytics.
Key design choices:
- Since-last-send windowing (avoids double-counting)
- SQL aggregates (never loads task objects into Python)
- Reusable httpx.AsyncClient (single connection pool)
- Jitter ±15% (prevents thundering herd)
- Exponential backoff on errors (1s 2s 4s ... 60s max)
"""
import asyncio
import logging
import random
from datetime import datetime, timedelta, timezone
from typing import Any, Optional
import httpx
from sqlalchemy import func, select
from app.config import get_settings
from app.db import async_session_maker
from app.models.agent import Agent, AgentStatus
from app.models.server import Server
from app.models.task import Task
# Module-level singletons, resolved once at import time: a named logger for
# this module and the cached application settings object.
logger = logging.getLogger(__name__)
settings = get_settings()
class HubTelemetryService:
    """Background service that periodically sends usage telemetry to the Hub.

    Process-wide singleton: all state lives at class level.  Call
    :meth:`start` once at application startup and :meth:`stop` at shutdown.
    Design choices (see module docstring): since-last-send windowing,
    SQL-level aggregation, a single shared ``httpx.AsyncClient``, ±15%
    jitter, and exponential backoff (1s, 2s, 4s, ... capped at 60s).
    """

    # Handle of the running background loop task (None when stopped).
    _task: Optional[asyncio.Task] = None
    # Set by stop() to signal the loop to exit.
    _shutdown_event: Optional[asyncio.Event] = None
    # Service start time; used to report uptime_seconds.
    _start_time: Optional[datetime] = None
    # End of the last successfully reported window (since-last-send windowing).
    _last_sent_at: Optional[datetime] = None
    # Shared HTTP client so every send reuses one connection pool.
    # (Annotation quoted so the class does not require httpx at definition time.)
    _client: Optional["httpx.AsyncClient"] = None
    # Consecutive send failures; drives the exponential backoff.
    _consecutive_failures: int = 0

    @classmethod
    async def start(cls) -> None:
        """Start the telemetry background task.  Never blocks startup.

        Returns immediately (with a log line) when telemetry is disabled or
        any required Hub setting (URL, API key, instance id) is missing.
        """
        if not settings.HUB_TELEMETRY_ENABLED:
            logger.info("hub_telemetry_disabled")
            return
        if not settings.HUB_URL:
            logger.warning("hub_telemetry_missing_hub_url")
            return
        if not settings.HUB_API_KEY:
            logger.warning("hub_telemetry_missing_hub_api_key")
            return
        if not settings.INSTANCE_ID:
            logger.warning("hub_telemetry_missing_instance_id")
            return
        now = datetime.now(timezone.utc)
        cls._start_time = now
        # Initialize the window to (now - interval) so the first send covers
        # a full interval instead of an empty window.
        cls._last_sent_at = now - timedelta(
            seconds=settings.HUB_TELEMETRY_INTERVAL_SECONDS
        )
        cls._shutdown_event = asyncio.Event()
        cls._consecutive_failures = 0
        cls._client = httpx.AsyncClient(timeout=30.0)
        cls._task = asyncio.create_task(cls._telemetry_loop())
        logger.info(
            "hub_telemetry_started",
            extra={
                "interval_seconds": settings.HUB_TELEMETRY_INTERVAL_SECONDS,
                "hub_url": settings.HUB_URL,
                "instance_id": settings.INSTANCE_ID,
            },
        )

    @classmethod
    async def stop(cls) -> None:
        """Stop the telemetry background task and close the HTTP client.

        Waits up to 5 seconds for a graceful exit, then cancels the task.
        Safe to call when the service never started.
        """
        if cls._shutdown_event:
            cls._shutdown_event.set()
        if cls._task:
            try:
                await asyncio.wait_for(cls._task, timeout=5.0)
            except asyncio.TimeoutError:
                cls._task.cancel()
                try:
                    await cls._task
                except asyncio.CancelledError:
                    pass
            # Clear the handle so stop() is idempotent and a later start()
            # begins from a clean slate.
            cls._task = None
        if cls._client:
            await cls._client.aclose()
            cls._client = None
        logger.info("hub_telemetry_stopped")

    @classmethod
    async def _telemetry_loop(cls) -> None:
        """Main loop: send, then sleep base interval ± 15% jitter (+ backoff).

        A send failure never kills the loop; it is logged and retried on the
        next iteration with additional exponential backoff.
        """
        base_interval = settings.HUB_TELEMETRY_INTERVAL_SECONDS
        while not cls._shutdown_event.is_set():
            try:
                await cls._send_telemetry()
                cls._consecutive_failures = 0  # Reset on success
            except Exception as e:
                cls._consecutive_failures += 1
                logger.warning(
                    "hub_telemetry_send_failed",
                    extra={
                        "error": str(e),
                        "error_type": type(e).__name__,
                        "consecutive_failures": cls._consecutive_failures,
                    },
                )
            # Interval = base ± 15% jitter, plus backoff on failures.
            # Backoff is 2**(n-1) — 1s, 2s, 4s, ... capped at 60s — so the
            # FIRST failure adds 1s, matching the documented schedule
            # (2**n would start at 2s).
            jitter = random.uniform(-0.15, 0.15) * base_interval
            backoff = (
                min(2 ** (cls._consecutive_failures - 1), 60)
                if cls._consecutive_failures
                else 0
            )
            interval = base_interval + jitter + backoff
            try:
                await asyncio.wait_for(
                    cls._shutdown_event.wait(), timeout=interval
                )
                break  # Shutdown requested
            except asyncio.TimeoutError:
                pass  # Normal timeout, continue loop

    @classmethod
    async def _send_telemetry(cls) -> None:
        """Collect metrics for the current window and POST them to the Hub.

        The window end is advanced only after a successful (2xx) response,
        so a failed send is retried over the same (grown) window and nothing
        is double-counted or dropped.

        Raises:
            httpx.HTTPError: on transport errors or non-2xx responses
                (handled by the loop's backoff logic).
        """
        window_start = cls._last_sent_at
        window_end = datetime.now(timezone.utc)
        payload = await cls._collect_metrics(window_start, window_end)
        response = await cls._client.post(
            f"{settings.HUB_URL}/api/v1/instances/{settings.INSTANCE_ID}/telemetry",
            json=payload,
            headers={"X-Hub-Api-Key": settings.HUB_API_KEY},
        )
        response.raise_for_status()
        # Only update window on success
        cls._last_sent_at = window_end
        logger.debug(
            "hub_telemetry_sent",
            extra={
                "window_seconds": (window_end - window_start).total_seconds(),
                "status_code": response.status_code,
            },
        )

    @classmethod
    async def _collect_metrics(
        cls, window_start: datetime, window_end: datetime
    ) -> dict[str, Any]:
        """Collect metrics using SQL aggregates (never loads ORM objects).

        Agent and server counts are point-in-time snapshots; task counts are
        restricted to [window_start, window_end] via ``updated_at``.
        """
        async with async_session_maker() as db:
            # Agent counts by status (all agents, not windowed)
            agent_result = await db.execute(
                select(Agent.status, func.count(Agent.id).label("count")).group_by(
                    Agent.status
                )
            )
            agent_rows = agent_result.all()
            # Task counts by status and type (windowed by updated_at).
            # Duration approximated as (updated_at - created_at), meaningful
            # for completed/failed tasks.
            task_result = await db.execute(
                select(
                    Task.status,
                    Task.type,
                    func.count(Task.id).label("count"),
                    func.avg(
                        func.extract("epoch", Task.updated_at - Task.created_at) * 1000
                    ).label("avg_duration_ms"),
                )
                .where(Task.updated_at.between(window_start, window_end))
                .group_by(Task.status, Task.type)
            )
            task_rows = task_result.all()
            # Server count (simple count, not windowed)
            server_count = await db.scalar(select(func.count(Server.id)))
        return {
            "instance_id": str(settings.INSTANCE_ID),
            "window_start": window_start.isoformat(),
            "window_end": window_end.isoformat(),
            "uptime_seconds": int((window_end - cls._start_time).total_seconds()),
            "metrics": {
                "agents": cls._format_agent_counts(agent_rows),
                "tasks": cls._format_task_counts(task_rows),
                "servers": {"total_count": server_count or 0},
            },
        }

    @classmethod
    def _format_agent_counts(cls, rows: list) -> dict[str, int]:
        """Format per-status agent count rows into the payload structure.

        GROUP BY status yields at most one row per status, so plain
        assignment (not +=) is correct for the per-status fields.
        """
        counts = {
            "online_count": 0,
            "offline_count": 0,
            "total_count": 0,
        }
        for row in rows:
            status, count = row.status, row.count
            counts["total_count"] += count
            if status == AgentStatus.ONLINE:
                counts["online_count"] = count
            elif status == AgentStatus.OFFLINE:
                counts["offline_count"] = count
            # INVALID agents are counted in total but not separately
        return counts

    @classmethod
    def _format_task_counts(cls, rows: list) -> dict[str, Any]:
        """Format task aggregate rows into the payload structure.

        Each row carries (status, type, count, avg_duration_ms).  The
        per-type average duration is weighted ONLY by rows that actually
        have a duration: status groups whose avg_duration_ms is NULL
        (e.g. pending tasks) contribute to counts but are excluded from the
        average, so they no longer dilute it toward zero.
        """
        by_status: dict[str, int] = {}
        by_type: dict[str, dict[str, Any]] = {}
        # Per-type number of tasks that contributed a duration — the
        # denominator of the weighted average; not part of the payload.
        duration_weights: dict[str, int] = {}
        for row in rows:
            status_str = row.status.value if hasattr(row.status, "value") else str(row.status)
            type_str = row.type.value if hasattr(row.type, "value") else str(row.type)
            count = row.count
            avg_duration_ms = row.avg_duration_ms
            # Aggregate by status
            by_status[status_str] = by_status.get(status_str, 0) + count
            # Aggregate by type
            if type_str not in by_type:
                by_type[type_str] = {"count": 0, "avg_duration_ms": 0}
                duration_weights[type_str] = 0
            entry = by_type[type_str]
            entry["count"] += count
            if avg_duration_ms is not None and count:
                # Weighted average over duration-bearing rows only.
                weight = duration_weights[type_str]
                entry["avg_duration_ms"] = (
                    entry["avg_duration_ms"] * weight + avg_duration_ms * count
                ) / (weight + count)
                duration_weights[type_str] = weight + count
        # Round durations for cleaner output
        for type_data in by_type.values():
            type_data["avg_duration_ms"] = round(type_data["avg_duration_ms"], 2)
        return {
            "by_status": by_status,
            "by_type": by_type,
        }