271 lines
9.4 KiB
Python
271 lines
9.4 KiB
Python
|
|
"""Hub Telemetry Service - sends aggregated metrics to the Hub.

This background service periodically collects metrics from the local database
and sends them to the central Hub for license compliance and usage analytics.

Key design choices:
- Since-last-send windowing (avoids double-counting)
- SQL aggregates (never loads task objects into Python)
- Reusable httpx.AsyncClient (single connection pool)
- Jitter ±15% (prevents thundering herd)
- Exponential backoff on consecutive errors, added on top of the base
  interval and capped at 60s
"""
|
||
|
|
|
||
|
|
import asyncio
|
||
|
|
import logging
|
||
|
|
import random
|
||
|
|
from datetime import datetime, timedelta, timezone
|
||
|
|
from typing import Any, Optional
|
||
|
|
|
||
|
|
import httpx
|
||
|
|
from sqlalchemy import func, select
|
||
|
|
|
||
|
|
from app.config import get_settings
|
||
|
|
from app.db import async_session_maker
|
||
|
|
from app.models.agent import Agent, AgentStatus
|
||
|
|
from app.models.server import Server
|
||
|
|
from app.models.task import Task
|
||
|
|
|
||
|
|
# Module-level logger, named after this module's import path.
logger: logging.Logger = logging.getLogger(__name__)

# Settings are resolved once, when this module is first imported, and are
# read by the telemetry service methods at call time.
settings = get_settings()
|
||
|
|
|
||
|
|
|
||
|
|
class HubTelemetryService:
    """Background service that periodically sends telemetry to the Hub.

    All state lives on the class and every method is a classmethod or
    staticmethod, so callers share one telemetry loop. ``start()`` spawns the
    loop as an asyncio task; ``stop()`` signals it to exit and closes the
    HTTP client.
    """

    _task: Optional[asyncio.Task] = None
    _shutdown_event: Optional[asyncio.Event] = None
    _start_time: Optional[datetime] = None
    # Upper bound of the last successfully sent window; the next window
    # starts exactly here (half-open windows, see _collect_metrics).
    _last_sent_at: Optional[datetime] = None
    # Quoted so the class body does not evaluate httpx at definition time.
    _client: Optional["httpx.AsyncClient"] = None
    _consecutive_failures: int = 0

    # Extra delay after consecutive failures grows 1s, 2s, 4s, ... up to this.
    _MAX_BACKOFF_SECONDS = 60
    # Random jitter applied to the base interval, as a fraction (±15%).
    _JITTER_FRACTION = 0.15

    @classmethod
    async def start(cls) -> None:
        """Start the telemetry background task. Never blocks startup.

        Returns without starting anything (after logging why) when telemetry
        is disabled, when HUB_URL / HUB_API_KEY / INSTANCE_ID is missing, or
        when a previously started loop is still running.
        """
        if not settings.HUB_TELEMETRY_ENABLED:
            logger.info("hub_telemetry_disabled")
            return

        if not settings.HUB_URL:
            logger.warning("hub_telemetry_missing_hub_url")
            return

        if not settings.HUB_API_KEY:
            logger.warning("hub_telemetry_missing_hub_api_key")
            return

        if not settings.INSTANCE_ID:
            logger.warning("hub_telemetry_missing_instance_id")
            return

        # A second start() would otherwise orphan the running task and leak
        # its HTTP client (both class attributes would be overwritten).
        if cls._task is not None and not cls._task.done():
            logger.warning("hub_telemetry_already_running")
            return

        now = datetime.now(timezone.utc)
        cls._start_time = now
        # Initialize window to (now - interval) so the first send isn't empty.
        cls._last_sent_at = now - timedelta(
            seconds=settings.HUB_TELEMETRY_INTERVAL_SECONDS
        )
        cls._shutdown_event = asyncio.Event()
        cls._consecutive_failures = 0
        cls._client = httpx.AsyncClient(timeout=30.0)
        cls._task = asyncio.create_task(cls._telemetry_loop())

        logger.info(
            "hub_telemetry_started",
            extra={
                "interval_seconds": settings.HUB_TELEMETRY_INTERVAL_SECONDS,
                "hub_url": settings.HUB_URL,
                "instance_id": settings.INSTANCE_ID,
            },
        )

    @classmethod
    async def stop(cls) -> None:
        """Stop the telemetry background task and close the HTTP client.

        Sets the shutdown event, waits up to 5 seconds for the loop to exit,
        and cancels it if it does not. The client is closed even when
        awaiting the task raises.
        """
        if cls._shutdown_event is not None:
            cls._shutdown_event.set()

        # Detach the task first so a later start() is not rejected by the
        # "already running" guard.
        task, cls._task = cls._task, None
        try:
            if task is not None:
                try:
                    await asyncio.wait_for(task, timeout=5.0)
                except asyncio.TimeoutError:
                    task.cancel()
                    try:
                        await task
                    except asyncio.CancelledError:
                        pass
        finally:
            if cls._client is not None:
                await cls._client.aclose()
                cls._client = None

        logger.info("hub_telemetry_stopped")

    @classmethod
    def _backoff_seconds(cls, failures: int) -> int:
        """Return the extra delay added after ``failures`` consecutive failures.

        0 with no failures, then 1, 2, 4, ... seconds, capped at
        ``_MAX_BACKOFF_SECONDS``.
        """
        if failures <= 0:
            return 0
        # Clamp the exponent: 2**bit_length(max) already exceeds the cap, so
        # a long outage never builds arbitrarily large integers.
        exponent = min(failures - 1, cls._MAX_BACKOFF_SECONDS.bit_length())
        return min(2**exponent, cls._MAX_BACKOFF_SECONDS)

    @classmethod
    async def _telemetry_loop(cls) -> None:
        """Send telemetry repeatedly until the shutdown event is set.

        Each attempt is followed by a wait of ``base ± 15%`` jitter plus the
        failure backoff. The wait ends early as soon as the shutdown event
        is set.
        """
        base_interval = settings.HUB_TELEMETRY_INTERVAL_SECONDS

        while not cls._shutdown_event.is_set():
            try:
                await cls._send_telemetry()
                cls._consecutive_failures = 0  # Reset on success
            except Exception as e:
                # Top-level boundary of the background task: log and keep
                # looping so a Hub or DB error never kills telemetry.
                cls._consecutive_failures += 1
                logger.warning(
                    "hub_telemetry_send_failed",
                    extra={
                        "error": str(e),
                        "error_type": type(e).__name__,
                        "consecutive_failures": cls._consecutive_failures,
                    },
                )

            # Calculate interval: base ± jitter, plus backoff on failures.
            jitter = (
                random.uniform(-cls._JITTER_FRACTION, cls._JITTER_FRACTION)
                * base_interval
            )
            interval = (
                base_interval
                + jitter
                + cls._backoff_seconds(cls._consecutive_failures)
            )

            try:
                await asyncio.wait_for(
                    cls._shutdown_event.wait(), timeout=interval
                )
                break  # Shutdown requested
            except asyncio.TimeoutError:
                pass  # Normal timeout: run the next iteration

    @classmethod
    async def _send_telemetry(cls) -> None:
        """Collect metrics for the window since the last successful send and POST them.

        Propagates any error from collection or the HTTP call (including the
        one raised by ``raise_for_status``). The window only advances after a
        successful response, so a failed window is retried, widened, on the
        next attempt.
        """
        window_start = cls._last_sent_at
        window_end = datetime.now(timezone.utc)

        payload = await cls._collect_metrics(window_start, window_end)

        # str() tolerates non-str URL settings types; stripping the trailing
        # slash avoids a "//api" path when HUB_URL ends with "/".
        hub_url = str(settings.HUB_URL).rstrip("/")
        response = await cls._client.post(
            f"{hub_url}/api/v1/instances/{settings.INSTANCE_ID}/telemetry",
            json=payload,
            headers={"X-Hub-Api-Key": settings.HUB_API_KEY},
        )
        response.raise_for_status()

        # Only update window on success.
        cls._last_sent_at = window_end

        logger.debug(
            "hub_telemetry_sent",
            extra={
                "window_seconds": (window_end - window_start).total_seconds(),
                "status_code": response.status_code,
            },
        )

    @classmethod
    async def _collect_metrics(
        cls, window_start: datetime, window_end: datetime
    ) -> dict[str, Any]:
        """Collect metrics using SQL aggregates (never loads ORM objects).

        Task metrics cover the half-open window ``[window_start, window_end)``.
        Consecutive windows share a boundary, so a task updated exactly at a
        boundary is counted once. Agent and server counts are point-in-time
        totals, not windowed.
        """
        async with async_session_maker() as db:
            # Agent counts by status (all agents, not windowed).
            agent_result = await db.execute(
                select(Agent.status, func.count(Agent.id).label("count")).group_by(
                    Agent.status
                )
            )
            agent_rows = agent_result.all()

            # Task counts by status and type, windowed by updated_at.
            # Duration is approximated as (updated_at - created_at) and
            # averaged over every task in the group, whatever its status.
            # NOTE(review): in-progress tasks therefore also contribute; the
            # earlier comment implied completed/failed only. Confirm intent.
            task_result = await db.execute(
                select(
                    Task.status,
                    Task.type,
                    func.count(Task.id).label("count"),
                    func.avg(
                        func.extract("epoch", Task.updated_at - Task.created_at) * 1000
                    ).label("avg_duration_ms"),
                )
                .where(
                    Task.updated_at >= window_start,
                    Task.updated_at < window_end,
                )
                .group_by(Task.status, Task.type)
            )
            task_rows = task_result.all()

            # Server count (simple count, not windowed).
            server_count = await db.scalar(select(func.count(Server.id)))

        return {
            "instance_id": str(settings.INSTANCE_ID),
            "window_start": window_start.isoformat(),
            "window_end": window_end.isoformat(),
            "uptime_seconds": int((window_end - cls._start_time).total_seconds()),
            "metrics": {
                "agents": cls._format_agent_counts(agent_rows),
                "tasks": cls._format_task_counts(task_rows),
                "servers": {"total_count": server_count or 0},
            },
        }

    @classmethod
    def _format_agent_counts(cls, rows: list) -> dict[str, int]:
        """Fold ``(status, count)`` rows into online/offline/total counters.

        Any other status (e.g. INVALID) only contributes to ``total_count``.
        """
        counts = {
            "online_count": 0,
            "offline_count": 0,
            "total_count": 0,
        }

        for row in rows:
            status, count = row.status, row.count
            counts["total_count"] += count

            # "+=" rather than "=" so repeated statuses can never overwrite.
            if status == AgentStatus.ONLINE:
                counts["online_count"] += count
            elif status == AgentStatus.OFFLINE:
                counts["offline_count"] += count

        return counts

    @staticmethod
    def _enum_str(value: Any) -> str:
        """Return an enum member's ``.value``, or ``str(value)`` for plain values."""
        return value.value if hasattr(value, "value") else str(value)

    @classmethod
    def _format_task_counts(cls, rows: list) -> dict[str, Any]:
        """Fold ``(status, type, count, avg_duration_ms)`` rows into the payload shape.

        Returns ``{"by_status": {status: count}, "by_type": {type: {"count",
        "avg_duration_ms"}}}``. The per-type average is the count-weighted
        mean of the per-row averages. Rows whose average is NULL add to the
        count but carry no weight, so they cannot drag the mean toward zero.
        Types with no duration data report ``avg_duration_ms`` as 0.
        """
        by_status: dict[str, int] = {}
        by_type: dict[str, dict[str, Any]] = {}
        # Per-type running sums for the weighted mean, kept apart from the
        # displayed counts so NULL-average rows don't distort it.
        duration_sums: dict[str, float] = {}
        duration_weights: dict[str, int] = {}

        for row in rows:
            status_str = cls._enum_str(row.status)
            type_str = cls._enum_str(row.type)
            count = row.count

            by_status[status_str] = by_status.get(status_str, 0) + count

            entry = by_type.setdefault(type_str, {"count": 0, "avg_duration_ms": 0})
            entry["count"] += count

            if row.avg_duration_ms is not None and count:
                # float(): a numeric AVG may arrive as Decimal, which json
                # cannot serialize into the outgoing payload.
                duration_sums[type_str] = (
                    duration_sums.get(type_str, 0.0)
                    + float(row.avg_duration_ms) * count
                )
                duration_weights[type_str] = duration_weights.get(type_str, 0) + count

        # Round durations for cleaner output.
        for type_str, entry in by_type.items():
            weight = duration_weights.get(type_str, 0)
            if weight:
                entry["avg_duration_ms"] = round(duration_sums[type_str] / weight, 2)

        return {
            "by_status": by_status,
            "by_type": by_type,
        }
|