# Source: letsbe-sysadmin/app/executors/__init__.py
# (file-viewer metadata: 70 lines, 2.0 KiB, Python)

"""Task executors registry."""
from typing import Type
from app.executors.base import BaseExecutor, ExecutionResult
from app.executors.composite_executor import CompositeExecutor
from app.executors.docker_executor import DockerExecutor
from app.executors.echo_executor import EchoExecutor
from app.executors.env_inspect_executor import EnvInspectExecutor
from app.executors.env_update_executor import EnvUpdateExecutor
from app.executors.file_executor import FileExecutor
from app.executors.file_inspect_executor import FileInspectExecutor
from app.executors.nextcloud_executor import NextcloudSetDomainExecutor
from app.executors.playwright_executor import PlaywrightExecutor
from app.executors.shell_executor import ShellExecutor
# Lookup table from task-type string to the executor class that handles it.
# Values are classes, not instances; get_executor() instantiates on demand.
# Entry order is preserved by dict and is the order shown in the
# "Available: [...]" part of get_executor()'s error message.
EXECUTOR_REGISTRY: dict[str, Type[BaseExecutor]] = {
    "ECHO": EchoExecutor,
    "SHELL": ShellExecutor,
    "FILE_WRITE": FileExecutor,
    "ENV_UPDATE": EnvUpdateExecutor,
    "ENV_INSPECT": EnvInspectExecutor,
    "FILE_INSPECT": FileInspectExecutor,
    "DOCKER_RELOAD": DockerExecutor,
    "COMPOSITE": CompositeExecutor,
    "PLAYWRIGHT": PlaywrightExecutor,
    "NEXTCLOUD_SET_DOMAIN": NextcloudSetDomainExecutor,
}
def get_executor(task_type: str) -> BaseExecutor:
    """Build a fresh executor for the given task type.

    The task type is looked up in ``EXECUTOR_REGISTRY`` and the registered
    class is instantiated with no arguments.

    Args:
        task_type: Registry key identifying the kind of task to execute.

    Returns:
        A new instance of the executor class registered under ``task_type``.

    Raises:
        ValueError: If ``task_type`` is not a key of ``EXECUTOR_REGISTRY``.
            The message lists every registered task type.
    """
    if task_type in EXECUTOR_REGISTRY:
        # Each call constructs a new instance; nothing is cached here.
        return EXECUTOR_REGISTRY[task_type]()

    # Unknown key: report it together with the currently registered types.
    message = (
        f"Unknown task type: {task_type}. "
        f"Available: {list(EXECUTOR_REGISTRY.keys())}"
    )
    raise ValueError(message)
# Names exported by a star-import of this package.
__all__ = [
    # Base types re-exported from app.executors.base
    "BaseExecutor",
    "ExecutionResult",
    # Executor classes (each one is registered in EXECUTOR_REGISTRY)
    "EchoExecutor",
    "ShellExecutor",
    "FileExecutor",
    "FileInspectExecutor",
    "EnvUpdateExecutor",
    "EnvInspectExecutor",
    "DockerExecutor",
    "CompositeExecutor",
    "PlaywrightExecutor",
    "NextcloudSetDomainExecutor",
    # Registry and lookup helper defined in this module
    "EXECUTOR_REGISTRY",
    "get_executor",
]