208 lines
6.7 KiB
Python
208 lines
6.7 KiB
Python
"""Composite executor for sequential task execution."""
|
|
|
|
import time
|
|
from typing import Any
|
|
|
|
from app.executors.base import BaseExecutor, ExecutionResult
|
|
|
|
|
|
class CompositeExecutor(BaseExecutor):
|
|
"""Execute a sequence of tasks in order.
|
|
|
|
Executes each task in the sequence using the appropriate executor.
|
|
Stops on first failure and returns partial results.
|
|
|
|
Security measures:
|
|
- Each sub-task uses the same validated executors
|
|
- Sequential execution only (no parallelism)
|
|
- Stops immediately on first failure
|
|
|
|
Payload:
|
|
{
|
|
"steps": [
|
|
{"type": "ENV_UPDATE", "payload": {...}},
|
|
{"type": "DOCKER_RELOAD", "payload": {...}}
|
|
]
|
|
}
|
|
|
|
Result (success):
|
|
{
|
|
"steps": [
|
|
{"index": 0, "type": "ENV_UPDATE", "status": "completed", "result": {...}},
|
|
{"index": 1, "type": "DOCKER_RELOAD", "status": "completed", "result": {...}}
|
|
]
|
|
}
|
|
|
|
Result (failure at step 1):
|
|
ExecutionResult.success = False
|
|
ExecutionResult.error = "Step 1 (DOCKER_RELOAD) failed: <error message>"
|
|
ExecutionResult.data = {
|
|
"steps": [
|
|
{"index": 0, "type": "ENV_UPDATE", "status": "completed", "result": {...}},
|
|
{"index": 1, "type": "DOCKER_RELOAD", "status": "failed", "error": "..."}
|
|
]
|
|
}
|
|
"""
|
|
|
|
@property
|
|
def task_type(self) -> str:
|
|
return "COMPOSITE"
|
|
|
|
async def execute(self, payload: dict[str, Any]) -> ExecutionResult:
|
|
"""Execute a sequence of tasks.
|
|
|
|
Args:
|
|
payload: Must contain "steps" list of step definitions
|
|
|
|
Returns:
|
|
ExecutionResult with execution summary
|
|
"""
|
|
self.validate_payload(payload, ["steps"])
|
|
|
|
steps = payload["steps"]
|
|
|
|
# Validate steps is a non-empty list
|
|
if not isinstance(steps, list):
|
|
return ExecutionResult(
|
|
success=False,
|
|
data={"steps": []},
|
|
error="'steps' must be a list of step definitions",
|
|
)
|
|
|
|
if not steps:
|
|
return ExecutionResult(
|
|
success=False,
|
|
data={"steps": []},
|
|
error="'steps' cannot be empty",
|
|
)
|
|
|
|
# Import registry here to avoid circular imports
|
|
from app.executors import get_executor
|
|
|
|
self.logger.info(
|
|
"composite_starting",
|
|
total_steps=len(steps),
|
|
step_types=[step.get("type", "UNKNOWN") if isinstance(step, dict) else "INVALID" for step in steps],
|
|
)
|
|
|
|
start_time = time.time()
|
|
results: list[dict[str, Any]] = []
|
|
|
|
for i, step in enumerate(steps):
|
|
# Validate step structure
|
|
if not isinstance(step, dict):
|
|
self.logger.error("composite_invalid_step", step_index=i)
|
|
return ExecutionResult(
|
|
success=False,
|
|
data={"steps": results},
|
|
error=f"Step {i} is not a valid step definition (must be dict)",
|
|
)
|
|
|
|
step_type = step.get("type")
|
|
step_payload = step.get("payload", {})
|
|
|
|
if not step_type:
|
|
self.logger.error("composite_missing_type", step_index=i)
|
|
return ExecutionResult(
|
|
success=False,
|
|
data={"steps": results},
|
|
error=f"Step {i} missing 'type' field",
|
|
)
|
|
|
|
self.logger.info(
|
|
"composite_step_starting",
|
|
step_index=i,
|
|
step_type=step_type,
|
|
)
|
|
|
|
# Get executor for this step type
|
|
try:
|
|
executor = get_executor(step_type)
|
|
except ValueError as e:
|
|
self.logger.error(
|
|
"composite_unknown_type",
|
|
step_index=i,
|
|
step_type=step_type,
|
|
error=str(e),
|
|
)
|
|
return ExecutionResult(
|
|
success=False,
|
|
data={"steps": results},
|
|
error=f"Step {i} ({step_type}) failed: {e}",
|
|
)
|
|
|
|
# Execute the step
|
|
try:
|
|
result = await executor.execute(step_payload)
|
|
|
|
step_result: dict[str, Any] = {
|
|
"index": i,
|
|
"type": step_type,
|
|
"status": "completed" if result.success else "failed",
|
|
"result": result.data,
|
|
}
|
|
if result.error:
|
|
step_result["error"] = result.error
|
|
|
|
results.append(step_result)
|
|
|
|
self.logger.info(
|
|
"composite_step_completed",
|
|
step_index=i,
|
|
step_type=step_type,
|
|
success=result.success,
|
|
)
|
|
|
|
# Stop on first failure
|
|
if not result.success:
|
|
duration_ms = (time.time() - start_time) * 1000
|
|
self.logger.warning(
|
|
"composite_step_failed",
|
|
step_index=i,
|
|
step_type=step_type,
|
|
error=result.error,
|
|
)
|
|
return ExecutionResult(
|
|
success=False,
|
|
data={"steps": results},
|
|
error=f"Step {i} ({step_type}) failed: {result.error}",
|
|
duration_ms=duration_ms,
|
|
)
|
|
|
|
except Exception as e:
|
|
duration_ms = (time.time() - start_time) * 1000
|
|
self.logger.error(
|
|
"composite_step_exception",
|
|
step_index=i,
|
|
step_type=step_type,
|
|
error=str(e),
|
|
)
|
|
# Add failed step to results
|
|
results.append({
|
|
"index": i,
|
|
"type": step_type,
|
|
"status": "failed",
|
|
"error": str(e),
|
|
})
|
|
return ExecutionResult(
|
|
success=False,
|
|
data={"steps": results},
|
|
error=f"Step {i} ({step_type}) failed: {e}",
|
|
duration_ms=duration_ms,
|
|
)
|
|
|
|
# All steps completed successfully
|
|
duration_ms = (time.time() - start_time) * 1000
|
|
|
|
self.logger.info(
|
|
"composite_completed",
|
|
steps_completed=len(results),
|
|
duration_ms=duration_ms,
|
|
)
|
|
|
|
return ExecutionResult(
|
|
success=True,
|
|
data={"steps": results},
|
|
duration_ms=duration_ms,
|
|
)
|