496 lines
19 KiB
Python
496 lines
19 KiB
Python
"""Unit tests for CompositeExecutor."""
|
|
|
|
import pytest
|
|
from unittest.mock import MagicMock, patch, AsyncMock
|
|
|
|
from app.executors.base import ExecutionResult
|
|
|
|
|
|
# Patch the logger before importing the executor
|
|
with patch("app.utils.logger.get_logger", return_value=MagicMock()):
|
|
from app.executors.composite_executor import CompositeExecutor
|
|
|
|
|
|
class TestCompositeExecutor:
|
|
"""Tests for CompositeExecutor."""
|
|
|
|
@pytest.fixture
def executor(self):
    """Provide a CompositeExecutor constructed while the base logger factory is patched."""
    logger_patch = patch("app.executors.base.get_logger", return_value=MagicMock())
    with logger_patch:
        instance = CompositeExecutor()
    return instance
|
|
|
|
def _create_mock_executor(self, success: bool, data: dict, error: str | None = None):
    """Build a stand-in executor whose async ``execute`` returns a fixed result.

    Args:
        success: Value for the ``success`` field of the returned ExecutionResult.
        data: Value for the ``data`` field of the returned ExecutionResult.
        error: Value for the ``error`` field of the returned ExecutionResult.

    Returns:
        A MagicMock whose ``execute`` attribute is an AsyncMock.
    """
    canned_result = ExecutionResult(success=success, data=data, error=error)
    stub = MagicMock()
    stub.execute = AsyncMock(return_value=canned_result)
    return stub
|
|
|
|
# =========================================================================
|
|
# HAPPY PATH TESTS
|
|
# =========================================================================
|
|
|
|
@pytest.mark.asyncio
async def test_two_steps_both_succeed(self, executor):
    """Two successful steps give a successful result holding both step records."""
    env_stub = self._create_mock_executor(
        success=True,
        data={"updated_keys": ["API_KEY"], "removed_keys": [], "path": "/opt/letsbe/env/app.env"},
    )
    docker_stub = self._create_mock_executor(
        success=True,
        data={"compose_dir": "/opt/letsbe/stacks/myapp", "pull_ran": True, "logs": {}},
    )
    stubs_by_type = {"ENV_UPDATE": env_stub, "DOCKER_RELOAD": docker_stub}

    def mock_get_executor(task_type: str):
        # Any type other than the two stubbed ones raises ValueError.
        if task_type not in stubs_by_type:
            raise ValueError(f"Unknown task type: {task_type}")
        return stubs_by_type[task_type]

    request = {
        "steps": [
            {"type": "ENV_UPDATE", "payload": {"path": "/opt/letsbe/env/app.env", "updates": {"API_KEY": "secret"}}},
            {"type": "DOCKER_RELOAD", "payload": {"compose_dir": "/opt/letsbe/stacks/myapp", "pull": True}},
        ]
    }
    with patch("app.executors.get_executor", side_effect=mock_get_executor):
        result = await executor.execute(request)

    assert result.success is True
    assert result.error is None
    step_records = result.data["steps"]
    assert len(step_records) == 2

    # Record for the ENV_UPDATE step
    env_record = step_records[0]
    assert env_record["index"] == 0
    assert env_record["type"] == "ENV_UPDATE"
    assert env_record["status"] == "completed"
    assert env_record["result"]["updated_keys"] == ["API_KEY"]

    # Record for the DOCKER_RELOAD step
    docker_record = step_records[1]
    assert docker_record["index"] == 1
    assert docker_record["type"] == "DOCKER_RELOAD"
    assert docker_record["status"] == "completed"
    assert docker_record["result"]["compose_dir"] == "/opt/letsbe/stacks/myapp"
|
|
|
|
@pytest.mark.asyncio
async def test_single_step_succeeds(self, executor):
    """A lone successful step produces one completed step record."""
    stub = self._create_mock_executor(
        success=True,
        data={"written": True, "path": "/opt/letsbe/env/test.env", "size": 100},
    )
    request = {
        "steps": [
            {"type": "FILE_WRITE", "payload": {"path": "/opt/letsbe/env/test.env", "content": "KEY=value"}},
        ]
    }

    with patch("app.executors.get_executor", return_value=stub):
        result = await executor.execute(request)

    assert result.success is True
    step_records = result.data["steps"]
    assert len(step_records) == 1
    assert step_records[0]["status"] == "completed"
|
|
|
|
@pytest.mark.asyncio
async def test_three_steps_all_succeed(self, executor):
    """Three successful steps are all reported as completed."""
    stub = self._create_mock_executor(success=True, data={"success": True})
    step_types = ("FILE_WRITE", "ENV_UPDATE", "DOCKER_RELOAD")
    request = {"steps": [{"type": step_type, "payload": {}} for step_type in step_types]}

    with patch("app.executors.get_executor", return_value=stub):
        result = await executor.execute(request)

    assert result.success is True
    step_records = result.data["steps"]
    assert len(step_records) == 3
    statuses = [record["status"] for record in step_records]
    assert all(status == "completed" for status in statuses)
|
|
|
|
# =========================================================================
|
|
# FAILURE HANDLING TESTS
|
|
# =========================================================================
|
|
|
|
@pytest.mark.asyncio
async def test_first_step_fails_stops_execution(self, executor):
    """A failing first step aborts the run before the second step executes.

    Checks the aggregated error message, that only the failed step is
    recorded, and that the underlying executor was invoked exactly once.
    """
    mock_executor = self._create_mock_executor(
        success=False,
        data={"partial": "data"},
        error="Validation failed: invalid key format",
    )

    with patch("app.executors.get_executor", return_value=mock_executor):
        result = await executor.execute({
            "steps": [
                {"type": "ENV_UPDATE", "payload": {}},
                {"type": "DOCKER_RELOAD", "payload": {}},  # Should NOT be called
            ]
        })

    assert result.success is False
    assert "Step 0 (ENV_UPDATE) failed" in result.error
    assert "invalid key format" in result.error
    assert len(result.data["steps"]) == 1  # Only first step
    assert result.data["steps"][0]["status"] == "failed"
    assert result.data["steps"][0]["error"] == "Validation failed: invalid key format"

    # Both step types resolve to the same stub, so a single call shows
    # that the DOCKER_RELOAD step was never executed.
    mock_executor.execute.assert_called_once()
|
|
|
|
@pytest.mark.asyncio
async def test_second_step_fails_preserves_first_result(self, executor):
    """A failure in step 1 keeps the completed record of step 0.

    The overall result must be a failure naming step 1, while the step
    list still carries the first step's result next to the failed record.
    """
    mock_env_executor = self._create_mock_executor(
        success=True,
        data={"updated_keys": ["KEY1"]},
    )
    mock_docker_executor = self._create_mock_executor(
        success=False,
        data={},
        error="No compose file found",
    )

    def mock_get_executor(task_type: str):
        # Hand out the stub matching the requested step type.
        if task_type == "ENV_UPDATE":
            return mock_env_executor
        elif task_type == "DOCKER_RELOAD":
            return mock_docker_executor
        raise ValueError(f"Unknown task type: {task_type}")

    with patch("app.executors.get_executor", side_effect=mock_get_executor):
        result = await executor.execute({
            "steps": [
                {"type": "ENV_UPDATE", "payload": {}},
                {"type": "DOCKER_RELOAD", "payload": {}},
            ]
        })

    assert result.success is False
    assert "Step 1 (DOCKER_RELOAD) failed" in result.error
    assert len(result.data["steps"]) == 2

    # First step completed
    assert result.data["steps"][0]["index"] == 0
    assert result.data["steps"][0]["status"] == "completed"
    assert result.data["steps"][0]["result"]["updated_keys"] == ["KEY1"]

    # Second step failed
    assert result.data["steps"][1]["index"] == 1
    assert result.data["steps"][1]["status"] == "failed"
    assert result.data["steps"][1]["error"] == "No compose file found"
|
|
|
|
@pytest.mark.asyncio
async def test_executor_raises_exception(self, executor):
    """An exception raised by a step executor is reported as a failed step."""
    raising_stub = MagicMock()
    raising_stub.execute = AsyncMock(side_effect=RuntimeError("Unexpected database error"))
    request = {
        "steps": [
            {"type": "ENV_UPDATE", "payload": {}},
        ]
    }

    with patch("app.executors.get_executor", return_value=raising_stub):
        result = await executor.execute(request)

    assert result.success is False
    assert "Step 0 (ENV_UPDATE) failed" in result.error
    assert "Unexpected database error" in result.error

    step_records = result.data["steps"]
    assert len(step_records) == 1
    failed_record = step_records[0]
    assert failed_record["status"] == "failed"
    assert "Unexpected database error" in failed_record["error"]
|
|
|
|
# =========================================================================
|
|
# VALIDATION TESTS
|
|
# =========================================================================
|
|
|
|
@pytest.mark.asyncio
async def test_empty_steps_validation_error(self, executor):
    """An empty ``steps`` list is rejected with a failure result."""
    request = {"steps": []}
    result = await executor.execute(request)

    assert result.success is False
    assert "cannot be empty" in result.error
    recorded_steps = result.data["steps"]
    assert recorded_steps == []
|
|
|
|
@pytest.mark.asyncio
async def test_missing_steps_field(self, executor):
    """A payload without ``steps`` raises ValueError naming the missing field."""
    payload_without_steps = {}
    expected_message = "Missing required fields: steps"
    with pytest.raises(ValueError, match=expected_message):
        await executor.execute(payload_without_steps)
|
|
|
|
@pytest.mark.asyncio
async def test_steps_not_a_list(self, executor):
    """A ``steps`` value that is not a list is rejected with a failure result."""
    request = {"steps": "not a list"}
    result = await executor.execute(request)

    assert result.success is False
    assert "must be a list" in result.error
|
|
|
|
@pytest.mark.asyncio
async def test_step_missing_type_field(self, executor):
    """A step lacking the ``type`` key is rejected with a failure result."""
    untyped_step = {"payload": {"key": "value"}}
    result = await executor.execute({"steps": [untyped_step]})

    assert result.success is False
    assert "Step 0 missing 'type' field" in result.error
|
|
|
|
@pytest.mark.asyncio
async def test_step_not_a_dict(self, executor):
    """A step that is not a dict is rejected with a failure result."""
    request = {"steps": ["not a dict"]}
    result = await executor.execute(request)

    assert result.success is False
    assert "Step 0 is not a valid step definition" in result.error
|
|
|
|
@pytest.mark.asyncio
async def test_unknown_step_type_fails(self, executor):
    """A lookup error for an unknown step type surfaces in the result error."""
    lookup_error = ValueError("Unknown task type: INVALID_TYPE. Available: ['ECHO', 'SHELL']")
    request = {
        "steps": [
            {"type": "INVALID_TYPE", "payload": {}}
        ]
    }

    # The patched lookup raises for every requested type.
    with patch("app.executors.get_executor", side_effect=lookup_error):
        result = await executor.execute(request)

    assert result.success is False
    assert "Unknown task type" in result.error
    assert "INVALID_TYPE" in result.error
|
|
|
|
# =========================================================================
|
|
# RESULT STRUCTURE TESTS
|
|
# =========================================================================
|
|
|
|
@pytest.mark.asyncio
async def test_result_has_correct_structure(self, executor):
    """A successful step record carries index, type, status and result keys."""
    stub = self._create_mock_executor(
        success=True,
        data={"key": "value"},
    )
    request = {
        "steps": [
            {"type": "ECHO", "payload": {"message": "test"}},
        ]
    }

    with patch("app.executors.get_executor", return_value=stub):
        result = await executor.execute(request)

    assert result.success is True
    assert "steps" in result.data
    assert isinstance(result.data["steps"], list)

    record = result.data["steps"][0]
    for required_key in ("index", "type", "status", "result"):
        assert required_key in record
    assert record["index"] == 0
    assert record["type"] == "ECHO"
    assert record["status"] == "completed"
|
|
|
|
@pytest.mark.asyncio
async def test_error_field_present_on_failure(self, executor):
    """A failed step record includes the underlying executor's error text."""
    failing_stub = self._create_mock_executor(
        success=False,
        data={},
        error="Something went wrong",
    )
    request = {
        "steps": [
            {"type": "SHELL", "payload": {}},
        ]
    }

    with patch("app.executors.get_executor", return_value=failing_stub):
        result = await executor.execute(request)

    assert result.success is False
    failed_record = result.data["steps"][0]
    assert "error" in failed_record
    assert failed_record["error"] == "Something went wrong"
|
|
|
|
@pytest.mark.asyncio
async def test_error_field_absent_on_success(self, executor):
    """A successful step record has no ``error`` key."""
    passing_stub = self._create_mock_executor(
        success=True,
        data={"result": "ok"},
        error=None,
    )
    request = {
        "steps": [
            {"type": "ECHO", "payload": {}},
        ]
    }

    with patch("app.executors.get_executor", return_value=passing_stub):
        result = await executor.execute(request)

    assert result.success is True
    completed_record = result.data["steps"][0]
    assert "error" not in completed_record
|
|
|
|
@pytest.mark.asyncio
async def test_propagates_underlying_executor_results(self, executor):
    """The step record's ``result`` equals the data returned by the step executor."""
    specific_data = {
        "updated_keys": ["DB_HOST", "DB_PORT"],
        "removed_keys": ["OLD_KEY"],
        "path": "/opt/letsbe/env/database.env",
        "custom_field": "custom_value",
    }
    stub = self._create_mock_executor(success=True, data=specific_data)
    request = {
        "steps": [
            {"type": "ENV_UPDATE", "payload": {}},
        ]
    }

    with patch("app.executors.get_executor", return_value=stub):
        result = await executor.execute(request)

    assert result.success is True
    propagated = result.data["steps"][0]["result"]
    assert propagated == specific_data
|
|
|
|
@pytest.mark.asyncio
async def test_duration_ms_populated(self, executor):
    """The composite result reports a non-negative ``duration_ms``."""
    stub = self._create_mock_executor(success=True, data={})
    request = {
        "steps": [
            {"type": "ECHO", "payload": {}},
        ]
    }

    with patch("app.executors.get_executor", return_value=stub):
        result = await executor.execute(request)

    elapsed = result.duration_ms
    assert elapsed is not None
    assert elapsed >= 0
|
|
|
|
# =========================================================================
|
|
# PAYLOAD HANDLING TESTS
|
|
# =========================================================================
|
|
|
|
@pytest.mark.asyncio
async def test_step_payload_defaults_to_empty_dict(self, executor):
    """A step with no ``payload`` key is executed with an empty dict."""
    ok_result = ExecutionResult(success=True, data={})
    stub = MagicMock()
    stub.execute = AsyncMock(return_value=ok_result)
    step_without_payload = {"type": "ECHO"}  # No payload field

    with patch("app.executors.get_executor", return_value=stub):
        result = await executor.execute({"steps": [step_without_payload]})

    assert result.success is True
    # The step executor must have received an empty dict as its payload.
    stub.execute.assert_called_once_with({})
|
|
|
|
@pytest.mark.asyncio
async def test_step_payload_passed_correctly(self, executor):
    """The step's payload is handed to the step executor as given."""
    expected_payload = {"path": "/opt/letsbe/env/app.env", "updates": {"KEY": "value"}}
    ok_result = ExecutionResult(success=True, data={})
    stub = MagicMock()
    stub.execute = AsyncMock(return_value=ok_result)
    request = {
        "steps": [
            {"type": "ENV_UPDATE", "payload": expected_payload}
        ]
    }

    with patch("app.executors.get_executor", return_value=stub):
        await executor.execute(request)

    stub.execute.assert_called_once_with(expected_payload)
|
|
|
|
# =========================================================================
|
|
# TASK TYPE TEST
|
|
# =========================================================================
|
|
|
|
def test_task_type(self, executor):
    """The executor reports ``COMPOSITE`` as its task type."""
    reported_type = executor.task_type
    assert reported_type == "COMPOSITE"
|
|
|
|
# =========================================================================
|
|
# EXECUTION ORDER TESTS
|
|
# =========================================================================
|
|
|
|
@pytest.mark.asyncio
async def test_steps_executed_in_order(self, executor):
    """Steps run in the order in which they are listed."""
    seen = []

    def create_tracking_executor(name: str):
        # Stub whose execute() records its own name before reporting success.
        async def track_execute(payload):
            seen.append(name)
            return ExecutionResult(success=True, data={"name": name})

        tracker = MagicMock()
        tracker.execute = track_execute
        return tracker

    def mock_get_executor(task_type: str):
        return create_tracking_executor(task_type)

    step_types = ("STEP_A", "STEP_B", "STEP_C")
    request = {"steps": [{"type": step_type, "payload": {}} for step_type in step_types]}

    with patch("app.executors.get_executor", side_effect=mock_get_executor):
        result = await executor.execute(request)

    assert result.success is True
    assert seen == ["STEP_A", "STEP_B", "STEP_C"]
|
|
|
|
@pytest.mark.asyncio
async def test_failure_stops_subsequent_steps(self, executor):
    """Once a step fails, none of the steps listed after it are executed."""
    seen = []

    def create_tracking_executor(name: str, should_fail: bool = False):
        # Stub whose execute() records its name, then succeeds or fails as configured.
        async def track_execute(payload):
            seen.append(name)
            failure_text = "Failed" if should_fail else None
            return ExecutionResult(
                success=not should_fail,
                data={},
                error=failure_text,
            )

        tracker = MagicMock()
        tracker.execute = track_execute
        return tracker

    def mock_get_executor(task_type: str):
        # Only STEP_B is configured to fail.
        return create_tracking_executor(task_type, should_fail=(task_type == "STEP_B"))

    request = {
        "steps": [
            {"type": "STEP_A", "payload": {}},
            {"type": "STEP_B", "payload": {}},  # This fails
            {"type": "STEP_C", "payload": {}},  # Should NOT run
        ]
    }

    with patch("app.executors.get_executor", side_effect=mock_get_executor):
        result = await executor.execute(request)

    assert result.success is False
    assert seen == ["STEP_A", "STEP_B"]  # STEP_C not executed
|