letsbe-hub/alembic/versions/002_add_telemetry_samples.py

64 lines
2.0 KiB
Python

"""Add telemetry_samples table for aggregated orchestrator metrics.
Revision ID: 002
Revises: 001
Create Date: 2024-12-17
This table stores aggregated telemetry from orchestrator instances.
Uses a unique constraint on (instance_id, window_start) for de-duplication.
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
revision: str = "002"
down_revision: Union[str, None] = "001"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# Create telemetry_samples table
op.create_table(
"telemetry_samples",
sa.Column("id", sa.UUID(), nullable=False),
sa.Column("instance_id", sa.UUID(), nullable=False),
# Time window
sa.Column("window_start", sa.DateTime(timezone=True), nullable=False),
sa.Column("window_end", sa.DateTime(timezone=True), nullable=False),
# Orchestrator uptime
sa.Column("uptime_seconds", sa.Integer(), nullable=False),
# Aggregated metrics stored as JSONB
sa.Column("metrics", postgresql.JSONB(astext_type=sa.Text()), nullable=False),
# Foreign key and primary key
sa.ForeignKeyConstraint(
["instance_id"],
["instances.id"],
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("id"),
# Unique constraint for de-duplication
# Prevents double-counting if orchestrator retries submissions
sa.UniqueConstraint(
"instance_id",
"window_start",
name="uq_telemetry_instance_window",
),
)
# Index on instance_id for efficient queries
op.create_index(
op.f("ix_telemetry_samples_instance_id"),
"telemetry_samples",
["instance_id"],
unique=False,
)
def downgrade() -> None:
op.drop_index(op.f("ix_telemetry_samples_instance_id"), table_name="telemetry_samples")
op.drop_table("telemetry_samples")