"""Add telemetry_samples table for aggregated orchestrator metrics. Revision ID: 002 Revises: 001 Create Date: 2024-12-17 This table stores aggregated telemetry from orchestrator instances. Uses a unique constraint on (instance_id, window_start) for de-duplication. """ from typing import Sequence, Union from alembic import op import sqlalchemy as sa from sqlalchemy.dialects import postgresql # revision identifiers, used by Alembic. revision: str = "002" down_revision: Union[str, None] = "001" branch_labels: Union[str, Sequence[str], None] = None depends_on: Union[str, Sequence[str], None] = None def upgrade() -> None: # Create telemetry_samples table op.create_table( "telemetry_samples", sa.Column("id", sa.UUID(), nullable=False), sa.Column("instance_id", sa.UUID(), nullable=False), # Time window sa.Column("window_start", sa.DateTime(timezone=True), nullable=False), sa.Column("window_end", sa.DateTime(timezone=True), nullable=False), # Orchestrator uptime sa.Column("uptime_seconds", sa.Integer(), nullable=False), # Aggregated metrics stored as JSONB sa.Column("metrics", postgresql.JSONB(astext_type=sa.Text()), nullable=False), # Foreign key and primary key sa.ForeignKeyConstraint( ["instance_id"], ["instances.id"], ondelete="CASCADE", ), sa.PrimaryKeyConstraint("id"), # Unique constraint for de-duplication # Prevents double-counting if orchestrator retries submissions sa.UniqueConstraint( "instance_id", "window_start", name="uq_telemetry_instance_window", ), ) # Index on instance_id for efficient queries op.create_index( op.f("ix_telemetry_samples_instance_id"), "telemetry_samples", ["instance_id"], unique=False, ) def downgrade() -> None: op.drop_index(op.f("ix_telemetry_samples_instance_id"), table_name="telemetry_samples") op.drop_table("telemetry_samples")