# Source: LetsBeBiz-Redesign/letsbe-orchestrator/tests/test_events.py
# (repository listing: 193 lines, 6.0 KiB, Python)

"""Tests for the Event model: creation, retrieval, filtering, ordering, and repr."""
import uuid
import pytest
import pytest_asyncio
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.event import Event
from app.models.tenant import Tenant
@pytest.mark.asyncio
class TestEventModel:
    """Tests for Event model creation and retrieval.

    Every test receives two fixtures defined outside this file:

    * ``db`` - an ``AsyncSession`` used to persist and query events.
    * ``test_tenant`` - a ``Tenant`` whose ``id`` owns the created events.

    NOTE(review): whether ``db`` is rolled back between tests is not visible
    from here, so queries that assert exact counts are scoped to
    ``test_tenant`` rather than to the whole table.
    """

    async def test_create_event(self, db: AsyncSession, test_tenant: Tenant) -> None:
        """Verify that an event can be created with required fields."""
        event = Event(
            tenant_id=test_tenant.id,
            event_type="agent.registered",
            payload={"agent_name": "test-agent", "version": "1.0.0"},
        )
        db.add(event)
        await db.commit()
        # Refresh so generated columns (id, created_at) are loaded explicitly.
        await db.refresh(event)

        assert event.id is not None
        assert event.tenant_id == test_tenant.id
        assert event.event_type == "agent.registered"
        assert event.payload["agent_name"] == "test-agent"
        assert event.created_at is not None

    async def test_create_event_with_task_id(
        self, db: AsyncSession, test_tenant: Tenant
    ) -> None:
        """Verify that an event can be created with an optional task_id."""
        task_id = uuid.uuid4()
        event = Event(
            tenant_id=test_tenant.id,
            task_id=task_id,
            event_type="task.completed",
            payload={"result": "success"},
        )
        db.add(event)
        await db.commit()
        await db.refresh(event)

        assert event.task_id == task_id

    async def test_create_event_without_task_id(
        self, db: AsyncSession, test_tenant: Tenant
    ) -> None:
        """Verify that task_id defaults to None."""
        event = Event(
            tenant_id=test_tenant.id,
            event_type="system.startup",
            payload={},
        )
        db.add(event)
        await db.commit()
        await db.refresh(event)

        assert event.task_id is None

    async def test_create_event_with_empty_payload(
        self, db: AsyncSession, test_tenant: Tenant
    ) -> None:
        """Verify that events can have empty payloads."""
        event = Event(
            tenant_id=test_tenant.id,
            event_type="heartbeat",
            payload={},
        )
        db.add(event)
        await db.commit()
        await db.refresh(event)

        assert event.payload == {}

    async def test_create_event_with_complex_payload(
        self, db: AsyncSession, test_tenant: Tenant
    ) -> None:
        """Verify that events can have complex nested payloads."""
        payload = {
            "metrics": {
                "cpu_usage": 45.2,
                "memory_mb": 1024,
                "disk_io": {"read": 100, "write": 200},
            },
            "tags": ["production", "high-priority"],
            "count": 42,
        }
        event = Event(
            tenant_id=test_tenant.id,
            event_type="metrics.collected",
            payload=payload,
        )
        db.add(event)
        await db.commit()
        await db.refresh(event)

        assert event.payload["metrics"]["cpu_usage"] == 45.2
        assert event.payload["tags"] == ["production", "high-priority"]

    async def test_retrieve_event_by_id(
        self, db: AsyncSession, test_tenant: Tenant
    ) -> None:
        """Verify that an event can be retrieved by its ID."""
        event = Event(
            tenant_id=test_tenant.id,
            event_type="test.retrieve",
            payload={"key": "value"},
        )
        db.add(event)
        await db.commit()
        await db.refresh(event)

        result = await db.execute(select(Event).where(Event.id == event.id))
        retrieved = result.scalar_one_or_none()

        assert retrieved is not None
        assert retrieved.event_type == "test.retrieve"
        assert retrieved.payload["key"] == "value"

    async def test_filter_events_by_type(
        self, db: AsyncSession, test_tenant: Tenant
    ) -> None:
        """Verify that events can be filtered by event_type."""
        # Create events of different types; two of the three should match.
        for event_type in ["agent.registered", "task.completed", "agent.registered"]:
            db.add(
                Event(
                    tenant_id=test_tenant.id,
                    event_type=event_type,
                    payload={},
                )
            )
        await db.commit()

        # Scope to this test's tenant as well: other tests in this class also
        # insert "agent.registered" rows, and an exact-count assertion must not
        # depend on rows owned by other tenants.
        result = await db.execute(
            select(Event)
            .where(Event.tenant_id == test_tenant.id)
            .where(Event.event_type == "agent.registered")
        )
        events = list(result.scalars().all())

        assert len(events) == 2
        assert all(e.event_type == "agent.registered" for e in events)

    async def test_filter_events_by_tenant(
        self, db: AsyncSession, test_tenant: Tenant
    ) -> None:
        """Verify that events can be filtered by tenant_id."""
        # Create event for test tenant
        event = Event(
            tenant_id=test_tenant.id,
            event_type="test.tenant_filter",
            payload={},
        )
        db.add(event)
        await db.commit()

        result = await db.execute(
            select(Event).where(Event.tenant_id == test_tenant.id)
        )
        events = list(result.scalars().all())

        assert len(events) >= 1
        assert all(e.tenant_id == test_tenant.id for e in events)

    async def test_events_ordered_by_created_at(
        self, db: AsyncSession, test_tenant: Tenant
    ) -> None:
        """Verify that events can be ordered by created_at."""
        expected_types = {f"test.order_{i}" for i in range(3)}
        for i in range(3):
            db.add(
                Event(
                    tenant_id=test_tenant.id,
                    event_type=f"test.order_{i}",
                    payload={"index": i},
                )
            )
        await db.commit()

        result = await db.execute(
            select(Event)
            .where(Event.tenant_id == test_tenant.id)
            .order_by(Event.created_at.desc())
        )
        events = list(result.scalars().all())

        assert len(events) >= 3
        # All three events created above must be part of the result.
        assert expected_types <= {e.event_type for e in events}
        # Actually check the ordering: timestamps must be non-increasing.
        # Comparing against a sorted copy tolerates rows with equal timestamps.
        timestamps = [e.created_at for e in events]
        assert timestamps == sorted(timestamps, reverse=True)

    async def test_event_repr(self, db: AsyncSession, test_tenant: Tenant) -> None:
        """Verify the __repr__ method of Event."""
        event = Event(
            tenant_id=test_tenant.id,
            event_type="test.repr",
            payload={},
        )
        db.add(event)
        await db.commit()
        await db.refresh(event)

        repr_str = repr(event)

        assert "Event" in repr_str
        assert "test.repr" in repr_str