# Source: letsbe-sysadmin/tests/executors/test_composite_executor.py
# (Python, 496 lines, 19 KiB)

"""Unit tests for CompositeExecutor."""
import pytest
from unittest.mock import MagicMock, patch, AsyncMock
from app.executors.base import ExecutionResult
# Patch the logger factory before importing the executor. The import must sit
# inside the ``with`` body so the patch is active while the module is loaded.
# NOTE(review): presumably the executor module (or something it imports)
# resolves ``get_logger`` at import time -- confirm against the module.
with patch("app.utils.logger.get_logger", return_value=MagicMock()):
    from app.executors.composite_executor import CompositeExecutor
class TestCompositeExecutor:
    """Tests for CompositeExecutor.

    Tests that actually run steps patch ``app.executors.get_executor`` so each
    step type resolves to a mock whose ``execute`` coroutine returns a canned
    ``ExecutionResult``; no real sub-executor is invoked.

    Sections: happy path, failure handling, validation, result structure,
    payload handling, task type, execution order.
    """

    @pytest.fixture
    def executor(self):
        """Create executor with mocked logger."""
        with patch("app.executors.base.get_logger", return_value=MagicMock()):
            return CompositeExecutor()

    def _create_mock_executor(self, success: bool, data: dict, error: str | None = None):
        """Create a mock executor that returns specified result.

        Args:
            success: Value for ``ExecutionResult.success``.
            data: Value for ``ExecutionResult.data``.
            error: Value for ``ExecutionResult.error`` (``None`` by default).

        Returns:
            A ``MagicMock`` whose ``execute`` attribute is an ``AsyncMock``
            resolving to the described ``ExecutionResult``.
        """
        mock_executor = MagicMock()
        mock_executor.execute = AsyncMock(
            return_value=ExecutionResult(
                success=success,
                data=data,
                error=error,
            )
        )
        return mock_executor

    # =========================================================================
    # HAPPY PATH TESTS
    # =========================================================================

    @pytest.mark.asyncio
    async def test_two_steps_both_succeed(self, executor):
        """Test successful execution of two steps."""
        mock_env_executor = self._create_mock_executor(
            success=True,
            data={
                "updated_keys": ["API_KEY"],
                "removed_keys": [],
                "path": "/opt/letsbe/env/app.env",
            },
        )
        mock_docker_executor = self._create_mock_executor(
            success=True,
            data={"compose_dir": "/opt/letsbe/stacks/myapp", "pull_ran": True, "logs": {}},
        )

        def mock_get_executor(task_type: str):
            if task_type == "ENV_UPDATE":
                return mock_env_executor
            elif task_type == "DOCKER_RELOAD":
                return mock_docker_executor
            raise ValueError(f"Unknown task type: {task_type}")

        with patch("app.executors.get_executor", side_effect=mock_get_executor):
            result = await executor.execute({
                "steps": [
                    {
                        "type": "ENV_UPDATE",
                        "payload": {
                            "path": "/opt/letsbe/env/app.env",
                            "updates": {"API_KEY": "secret"},
                        },
                    },
                    {
                        "type": "DOCKER_RELOAD",
                        "payload": {"compose_dir": "/opt/letsbe/stacks/myapp", "pull": True},
                    },
                ]
            })

        assert result.success is True
        assert result.error is None
        assert len(result.data["steps"]) == 2

        # Verify first step
        assert result.data["steps"][0]["index"] == 0
        assert result.data["steps"][0]["type"] == "ENV_UPDATE"
        assert result.data["steps"][0]["status"] == "completed"
        assert result.data["steps"][0]["result"]["updated_keys"] == ["API_KEY"]

        # Verify second step
        assert result.data["steps"][1]["index"] == 1
        assert result.data["steps"][1]["type"] == "DOCKER_RELOAD"
        assert result.data["steps"][1]["status"] == "completed"
        assert result.data["steps"][1]["result"]["compose_dir"] == "/opt/letsbe/stacks/myapp"

    @pytest.mark.asyncio
    async def test_single_step_succeeds(self, executor):
        """Test successful execution of single step."""
        mock_executor = self._create_mock_executor(
            success=True,
            data={"written": True, "path": "/opt/letsbe/env/test.env", "size": 100},
        )

        with patch("app.executors.get_executor", return_value=mock_executor):
            result = await executor.execute({
                "steps": [
                    {
                        "type": "FILE_WRITE",
                        "payload": {"path": "/opt/letsbe/env/test.env", "content": "KEY=value"},
                    },
                ]
            })

        assert result.success is True
        assert len(result.data["steps"]) == 1
        assert result.data["steps"][0]["status"] == "completed"

    @pytest.mark.asyncio
    async def test_three_steps_all_succeed(self, executor):
        """Test successful execution of three steps."""
        mock_executor = self._create_mock_executor(success=True, data={"success": True})

        with patch("app.executors.get_executor", return_value=mock_executor):
            result = await executor.execute({
                "steps": [
                    {"type": "FILE_WRITE", "payload": {}},
                    {"type": "ENV_UPDATE", "payload": {}},
                    {"type": "DOCKER_RELOAD", "payload": {}},
                ]
            })

        assert result.success is True
        assert len(result.data["steps"]) == 3
        assert all(s["status"] == "completed" for s in result.data["steps"])

    # =========================================================================
    # FAILURE HANDLING TESTS
    # =========================================================================

    @pytest.mark.asyncio
    async def test_first_step_fails_stops_execution(self, executor):
        """Test that first step failure stops execution."""
        mock_executor = self._create_mock_executor(
            success=False,
            data={"partial": "data"},
            error="Validation failed: invalid key format",
        )

        with patch("app.executors.get_executor", return_value=mock_executor):
            result = await executor.execute({
                "steps": [
                    {"type": "ENV_UPDATE", "payload": {}},
                    {"type": "DOCKER_RELOAD", "payload": {}},  # Should NOT be called
                ]
            })

        assert result.success is False
        assert "Step 0 (ENV_UPDATE) failed" in result.error
        assert "invalid key format" in result.error
        assert len(result.data["steps"]) == 1  # Only first step
        assert result.data["steps"][0]["status"] == "failed"
        assert result.data["steps"][0]["error"] == "Validation failed: invalid key format"
        # Both step types resolve to the same mock, so exactly one await
        # proves the DOCKER_RELOAD step was never executed.
        mock_executor.execute.assert_awaited_once()

    @pytest.mark.asyncio
    async def test_second_step_fails_preserves_first_result(self, executor):
        """Test that second step failure preserves first step result."""
        mock_env_executor = self._create_mock_executor(
            success=True,
            data={"updated_keys": ["KEY1"]},
        )
        mock_docker_executor = self._create_mock_executor(
            success=False,
            data={},
            error="No compose file found",
        )

        def mock_get_executor(task_type: str):
            if task_type == "ENV_UPDATE":
                return mock_env_executor
            elif task_type == "DOCKER_RELOAD":
                return mock_docker_executor
            raise ValueError(f"Unknown task type: {task_type}")

        with patch("app.executors.get_executor", side_effect=mock_get_executor):
            result = await executor.execute({
                "steps": [
                    {"type": "ENV_UPDATE", "payload": {}},
                    {"type": "DOCKER_RELOAD", "payload": {}},
                ]
            })

        assert result.success is False
        assert "Step 1 (DOCKER_RELOAD) failed" in result.error
        assert len(result.data["steps"]) == 2

        # First step completed
        assert result.data["steps"][0]["index"] == 0
        assert result.data["steps"][0]["status"] == "completed"
        assert result.data["steps"][0]["result"]["updated_keys"] == ["KEY1"]

        # Second step failed
        assert result.data["steps"][1]["index"] == 1
        assert result.data["steps"][1]["status"] == "failed"
        assert result.data["steps"][1]["error"] == "No compose file found"

    @pytest.mark.asyncio
    async def test_executor_raises_exception(self, executor):
        """Test handling of executor that raises exception."""
        mock_executor = MagicMock()
        mock_executor.execute = AsyncMock(side_effect=RuntimeError("Unexpected database error"))

        with patch("app.executors.get_executor", return_value=mock_executor):
            result = await executor.execute({
                "steps": [
                    {"type": "ENV_UPDATE", "payload": {}},
                ]
            })

        assert result.success is False
        assert "Step 0 (ENV_UPDATE) failed" in result.error
        assert "Unexpected database error" in result.error
        assert len(result.data["steps"]) == 1
        assert result.data["steps"][0]["status"] == "failed"
        assert "Unexpected database error" in result.data["steps"][0]["error"]

    # =========================================================================
    # VALIDATION TESTS
    # =========================================================================

    @pytest.mark.asyncio
    async def test_empty_steps_validation_error(self, executor):
        """Test that empty steps list fails validation."""
        result = await executor.execute({"steps": []})

        assert result.success is False
        assert "cannot be empty" in result.error
        assert result.data["steps"] == []

    @pytest.mark.asyncio
    async def test_missing_steps_field(self, executor):
        """Test that missing steps field raises ValueError."""
        with pytest.raises(ValueError, match="Missing required fields: steps"):
            await executor.execute({})

    @pytest.mark.asyncio
    async def test_steps_not_a_list(self, executor):
        """Test that non-list steps fails validation."""
        result = await executor.execute({"steps": "not a list"})

        assert result.success is False
        assert "must be a list" in result.error

    @pytest.mark.asyncio
    async def test_step_missing_type_field(self, executor):
        """Test that step without type field fails."""
        result = await executor.execute({
            "steps": [
                {"payload": {"key": "value"}}
            ]
        })

        assert result.success is False
        assert "Step 0 missing 'type' field" in result.error

    @pytest.mark.asyncio
    async def test_step_not_a_dict(self, executor):
        """Test that non-dict step fails validation."""
        result = await executor.execute({
            "steps": ["not a dict"]
        })

        assert result.success is False
        assert "Step 0 is not a valid step definition" in result.error

    @pytest.mark.asyncio
    async def test_unknown_step_type_fails(self, executor):
        """Test that unknown step type fails with clear error."""
        with patch("app.executors.get_executor") as mock_get:
            # Simulate the registry rejecting the step type.
            mock_get.side_effect = ValueError(
                "Unknown task type: INVALID_TYPE. Available: ['ECHO', 'SHELL']"
            )
            result = await executor.execute({
                "steps": [
                    {"type": "INVALID_TYPE", "payload": {}}
                ]
            })

        assert result.success is False
        assert "Unknown task type" in result.error
        assert "INVALID_TYPE" in result.error

    # =========================================================================
    # RESULT STRUCTURE TESTS
    # =========================================================================

    @pytest.mark.asyncio
    async def test_result_has_correct_structure(self, executor):
        """Test that result has all required fields."""
        mock_executor = self._create_mock_executor(
            success=True,
            data={"key": "value"},
        )

        with patch("app.executors.get_executor", return_value=mock_executor):
            result = await executor.execute({
                "steps": [
                    {"type": "ECHO", "payload": {"message": "test"}},
                ]
            })

        assert result.success is True
        assert "steps" in result.data
        assert isinstance(result.data["steps"], list)

        step = result.data["steps"][0]
        assert "index" in step
        assert "type" in step
        assert "status" in step
        assert "result" in step
        assert step["index"] == 0
        assert step["type"] == "ECHO"
        assert step["status"] == "completed"

    @pytest.mark.asyncio
    async def test_error_field_present_on_failure(self, executor):
        """Test that error field is present in step result on failure."""
        mock_executor = self._create_mock_executor(
            success=False,
            data={},
            error="Something went wrong",
        )

        with patch("app.executors.get_executor", return_value=mock_executor):
            result = await executor.execute({
                "steps": [
                    {"type": "SHELL", "payload": {}},
                ]
            })

        assert result.success is False
        assert "error" in result.data["steps"][0]
        assert result.data["steps"][0]["error"] == "Something went wrong"

    @pytest.mark.asyncio
    async def test_error_field_absent_on_success(self, executor):
        """Test that error field is not present in step result on success."""
        mock_executor = self._create_mock_executor(
            success=True,
            data={"result": "ok"},
            error=None,
        )

        with patch("app.executors.get_executor", return_value=mock_executor):
            result = await executor.execute({
                "steps": [
                    {"type": "ECHO", "payload": {}},
                ]
            })

        assert result.success is True
        assert "error" not in result.data["steps"][0]

    @pytest.mark.asyncio
    async def test_propagates_underlying_executor_results(self, executor):
        """Test that underlying executor data is propagated correctly."""
        specific_data = {
            "updated_keys": ["DB_HOST", "DB_PORT"],
            "removed_keys": ["OLD_KEY"],
            "path": "/opt/letsbe/env/database.env",
            "custom_field": "custom_value",
        }
        mock_executor = self._create_mock_executor(
            success=True,
            data=specific_data,
        )

        with patch("app.executors.get_executor", return_value=mock_executor):
            result = await executor.execute({
                "steps": [
                    {"type": "ENV_UPDATE", "payload": {}},
                ]
            })

        assert result.success is True
        assert result.data["steps"][0]["result"] == specific_data

    @pytest.mark.asyncio
    async def test_duration_ms_populated(self, executor):
        """Test that duration_ms is populated."""
        mock_executor = self._create_mock_executor(success=True, data={})

        with patch("app.executors.get_executor", return_value=mock_executor):
            result = await executor.execute({
                "steps": [
                    {"type": "ECHO", "payload": {}},
                ]
            })

        assert result.duration_ms is not None
        assert result.duration_ms >= 0

    # =========================================================================
    # PAYLOAD HANDLING TESTS
    # =========================================================================

    @pytest.mark.asyncio
    async def test_step_payload_defaults_to_empty_dict(self, executor):
        """Test that missing payload in step defaults to empty dict."""
        mock_executor = MagicMock()
        mock_executor.execute = AsyncMock(return_value=ExecutionResult(success=True, data={}))

        with patch("app.executors.get_executor", return_value=mock_executor):
            result = await executor.execute({
                "steps": [
                    {"type": "ECHO"}  # No payload field
                ]
            })

        assert result.success is True
        # Verify execute was called with empty dict
        mock_executor.execute.assert_called_once_with({})

    @pytest.mark.asyncio
    async def test_step_payload_passed_correctly(self, executor):
        """Test that step payload is passed to executor correctly."""
        mock_executor = MagicMock()
        mock_executor.execute = AsyncMock(return_value=ExecutionResult(success=True, data={}))
        expected_payload = {"path": "/opt/letsbe/env/app.env", "updates": {"KEY": "value"}}

        with patch("app.executors.get_executor", return_value=mock_executor):
            await executor.execute({
                "steps": [
                    {"type": "ENV_UPDATE", "payload": expected_payload}
                ]
            })

        mock_executor.execute.assert_called_once_with(expected_payload)

    # =========================================================================
    # TASK TYPE TEST
    # =========================================================================

    def test_task_type(self, executor):
        """Test task_type property."""
        assert executor.task_type == "COMPOSITE"

    # =========================================================================
    # EXECUTION ORDER TESTS
    # =========================================================================

    @pytest.mark.asyncio
    async def test_steps_executed_in_order(self, executor):
        """Test that steps are executed in sequential order."""
        execution_order = []

        def create_tracking_executor(name: str):
            # Each executor records its own name when its execute() runs.
            mock = MagicMock()

            async def track_execute(payload):
                execution_order.append(name)
                return ExecutionResult(success=True, data={"name": name})

            mock.execute = track_execute
            return mock

        def mock_get_executor(task_type: str):
            return create_tracking_executor(task_type)

        with patch("app.executors.get_executor", side_effect=mock_get_executor):
            result = await executor.execute({
                "steps": [
                    {"type": "STEP_A", "payload": {}},
                    {"type": "STEP_B", "payload": {}},
                    {"type": "STEP_C", "payload": {}},
                ]
            })

        assert result.success is True
        assert execution_order == ["STEP_A", "STEP_B", "STEP_C"]

    @pytest.mark.asyncio
    async def test_failure_stops_subsequent_steps(self, executor):
        """Test that failure at step N prevents steps N+1 and beyond from running."""
        execution_order = []

        def create_tracking_executor(name: str, should_fail: bool = False):
            # Records the step name on execution and optionally reports failure.
            mock = MagicMock()

            async def track_execute(payload):
                execution_order.append(name)
                return ExecutionResult(
                    success=not should_fail,
                    data={},
                    error="Failed" if should_fail else None,
                )

            mock.execute = track_execute
            return mock

        def mock_get_executor(task_type: str):
            if task_type == "STEP_B":
                return create_tracking_executor(task_type, should_fail=True)
            return create_tracking_executor(task_type)

        with patch("app.executors.get_executor", side_effect=mock_get_executor):
            result = await executor.execute({
                "steps": [
                    {"type": "STEP_A", "payload": {}},
                    {"type": "STEP_B", "payload": {}},  # This fails
                    {"type": "STEP_C", "payload": {}},  # Should NOT run
                ]
            })

        assert result.success is False
        assert execution_order == ["STEP_A", "STEP_B"]  # STEP_C not executed