diff --git a/app/executors/__init__.py b/app/executors/__init__.py index 51b266c..c64aa8a 100644 --- a/app/executors/__init__.py +++ b/app/executors/__init__.py @@ -10,6 +10,7 @@ from app.executors.env_inspect_executor import EnvInspectExecutor from app.executors.env_update_executor import EnvUpdateExecutor from app.executors.file_executor import FileExecutor from app.executors.file_inspect_executor import FileInspectExecutor +from app.executors.nextcloud_executor import NextcloudSetDomainExecutor from app.executors.playwright_executor import PlaywrightExecutor from app.executors.shell_executor import ShellExecutor @@ -24,6 +25,7 @@ EXECUTOR_REGISTRY: dict[str, Type[BaseExecutor]] = { "DOCKER_RELOAD": DockerExecutor, "COMPOSITE": CompositeExecutor, "PLAYWRIGHT": PlaywrightExecutor, + "NEXTCLOUD_SET_DOMAIN": NextcloudSetDomainExecutor, } @@ -61,6 +63,7 @@ __all__ = [ "DockerExecutor", "CompositeExecutor", "PlaywrightExecutor", + "NextcloudSetDomainExecutor", "EXECUTOR_REGISTRY", "get_executor", ] diff --git a/app/executors/nextcloud_executor.py b/app/executors/nextcloud_executor.py new file mode 100644 index 0000000..5b4a1c5 --- /dev/null +++ b/app/executors/nextcloud_executor.py @@ -0,0 +1,358 @@ +"""Nextcloud domain configuration executor.""" + +import asyncio +import subprocess +import time +from pathlib import Path +from typing import Any +from urllib.parse import urlparse + +from app.executors.base import BaseExecutor, ExecutionResult + + +class NextcloudSetDomainExecutor(BaseExecutor): + """Execute Nextcloud domain configuration via occ commands. + + This executor configures Nextcloud's external domain settings by running + occ config:system:set commands via docker compose exec. It keeps the + Orchestrator unaware of container names, occ paths, and docker-compose syntax. + + Security measures: + - URL parsing with validation + - No shell=True, command list only + - Timeout enforcement on each subprocess + + Payload: + { + "public_url": "https://cloud.example.com" + } + + Result (success): + { + "public_url": "https://cloud.example.com", + "host": "cloud.example.com", + "scheme": "https", + "commands_executed": 3, + "logs": { + "overwritehost": "", + "overwriteprotocol": "", + "overwrite.cli.url": "" + } + } + + Result (failure): + { + "public_url": "https://cloud.example.com", + "host": "cloud.example.com", + "scheme": "https", + "commands_executed": 2, + "failed_command": "overwriteprotocol", + "failed_args": ["config:system:set", "overwriteprotocol", "--value=https"], + "logs": {...} + } + """ + + # TODO: These constants may need adjustment based on actual Nextcloud stack setup + NEXTCLOUD_STACK_DIR = "/opt/letsbe/stacks/nextcloud" + NEXTCLOUD_SERVICE_NAME = "app" + NEXTCLOUD_OCC_PATH = "/var/www/html/occ" + NEXTCLOUD_USER = "www-data" + + # Compose file search order (matches DockerExecutor) + COMPOSE_FILE_NAMES = ["docker-compose.yml", "compose.yml"] + + # Default timeout for each occ command (seconds) + DEFAULT_COMMAND_TIMEOUT = 60 + + @property + def task_type(self) -> str: + return "NEXTCLOUD_SET_DOMAIN" + + async def execute(self, payload: dict[str, Any]) -> ExecutionResult: + """Execute Nextcloud domain configuration commands. + + Runs three occ config:system:set commands to configure: + - overwritehost: The domain/host portion of the URL + - overwriteprotocol: The scheme (http/https) + - overwrite.cli.url: The full public URL + + Args: + payload: Must contain "public_url", optionally "timeout" + + Returns: + ExecutionResult with configuration confirmation and logs + """ + self.validate_payload(payload, ["public_url"]) + + public_url = payload["public_url"] + timeout = payload.get("timeout", self.DEFAULT_COMMAND_TIMEOUT) + + # Parse URL into components + try: + scheme, host, normalized_url = self._parse_public_url(public_url) + except ValueError as e: + return ExecutionResult( + success=False, + data={"public_url": public_url}, + error=str(e), + ) + + # Find compose file in the Nextcloud stack directory + stack_dir = Path(self.NEXTCLOUD_STACK_DIR) + compose_file = self._find_compose_file(stack_dir) + + if compose_file is None: + self.logger.warning("nextcloud_compose_not_found", dir=self.NEXTCLOUD_STACK_DIR) + return ExecutionResult( + success=False, + data={"public_url": public_url, "host": host, "scheme": scheme}, + error=f"Nextcloud compose file not found in {self.NEXTCLOUD_STACK_DIR}. " + f"Looked for: {', '.join(self.COMPOSE_FILE_NAMES)}", + ) + + self.logger.info( + "nextcloud_setting_domain", + public_url=normalized_url, + host=host, + scheme=scheme, + compose_file=str(compose_file), + ) + + start_time = time.time() + logs: dict[str, str] = {} + commands_executed = 0 + + # Define the three occ commands to run + occ_commands = [ + ("overwritehost", ["config:system:set", "overwritehost", f"--value={host}"]), + ("overwriteprotocol", ["config:system:set", "overwriteprotocol", f"--value={scheme}"]), + ("overwrite.cli.url", ["config:system:set", "overwrite.cli.url", f"--value={normalized_url}"]), + ] + + try: + for cmd_name, occ_args in occ_commands: + exit_code, stdout, stderr = await self._run_occ_command( + compose_file, + occ_args, + timeout, + ) + logs[cmd_name] = self._combine_output(stdout, stderr) + commands_executed += 1 + + if exit_code != 0: + duration_ms = (time.time() - start_time) * 1000 + self.logger.warning( + "nextcloud_occ_command_failed", + command=cmd_name, + occ_args=occ_args, + exit_code=exit_code, + stderr=stderr[:500] if stderr else None, + ) + return ExecutionResult( + success=False, + data={ + "public_url": normalized_url, + "host": host, + "scheme": scheme, + "commands_executed": commands_executed, + "failed_command": cmd_name, + "failed_args": occ_args, + "logs": logs, + }, + error=f"occ {cmd_name} failed with exit code {exit_code}", + duration_ms=duration_ms, + ) + + duration_ms = (time.time() - start_time) * 1000 + self.logger.info( + "nextcloud_domain_set", + public_url=normalized_url, + host=host, + scheme=scheme, + commands_executed=commands_executed, + duration_ms=duration_ms, + ) + + return ExecutionResult( + success=True, + data={ + "public_url": normalized_url, + "host": host, + "scheme": scheme, + "commands_executed": commands_executed, + "logs": logs, + }, + duration_ms=duration_ms, + ) + + except asyncio.TimeoutError: + duration_ms = (time.time() - start_time) * 1000 + self.logger.error( + "nextcloud_timeout", + public_url=normalized_url, + timeout=timeout, + commands_executed=commands_executed, + ) + return ExecutionResult( + success=False, + data={ + "public_url": normalized_url, + "host": host, + "scheme": scheme, + "commands_executed": commands_executed, + "logs": logs, + }, + error=f"Nextcloud occ operation timed out after {timeout} seconds", + duration_ms=duration_ms, + ) + + except Exception as e: + duration_ms = (time.time() - start_time) * 1000 + self.logger.error( + "nextcloud_error", + public_url=normalized_url, + error=str(e), + commands_executed=commands_executed, + ) + return ExecutionResult( + success=False, + data={ + "public_url": normalized_url, + "host": host, + "scheme": scheme, + "commands_executed": commands_executed, + "logs": logs, + }, + error=str(e), + duration_ms=duration_ms, + ) + + def _parse_public_url(self, public_url: str) -> tuple[str, str, str]: + """Parse public URL into scheme, host, and normalized URL. + + Args: + public_url: Full URL like "https://cloud.example.com" or just "cloud.example.com" + + Returns: + Tuple of (scheme, host, normalized_url) + - scheme: "http" or "https" (defaults to "https" if not provided) + - host: Domain with optional port (e.g., "cloud.example.com:8443") + - normalized_url: Full URL with trailing slash stripped + + Raises: + ValueError: If URL is invalid or missing host + """ + if not public_url or not public_url.strip(): + raise ValueError("public_url cannot be empty") + + url = public_url.strip() + + # Parse the URL + parsed = urlparse(url) + + # Extract scheme, default to "https" if not provided + scheme = parsed.scheme if parsed.scheme else "https" + + # Extract host (netloc includes port if present) + host = parsed.netloc + + # Handle URLs without scheme (e.g., "cloud.example.com" or "cloud.example.com/path") + # urlparse treats "cloud.example.com" as a path, not netloc + if not host and not parsed.scheme: + # The URL was provided without a scheme, so we need to re-parse with scheme + url_with_scheme = f"https://{url}" + parsed = urlparse(url_with_scheme) + host = parsed.netloc + scheme = "https" + + if not host: + raise ValueError(f"Invalid URL - no host found: {public_url}") + + # Reconstruct normalized URL (with trailing slash stripped) + normalized_url = f"{scheme}://{host}" + if parsed.path and parsed.path != "/": + normalized_url += parsed.path.rstrip("/") + + return scheme, host, normalized_url + + def _find_compose_file(self, compose_dir: Path) -> Path | None: + """Find compose file in the directory. + + Searches in order: docker-compose.yml, compose.yml + + Args: + compose_dir: Directory to search in + + Returns: + Path to compose file, or None if not found + """ + for filename in self.COMPOSE_FILE_NAMES: + compose_file = compose_dir / filename + if compose_file.exists(): + return compose_file + return None + + def _combine_output(self, stdout: str, stderr: str) -> str: + """Combine stdout and stderr into a single string. + + Args: + stdout: Standard output + stderr: Standard error + + Returns: + Combined output string + """ + parts = [] + if stdout: + parts.append(stdout) + if stderr: + parts.append(stderr) + return "\n".join(parts) + + async def _run_occ_command( + self, + compose_file: Path, + occ_args: list[str], + timeout: int, + ) -> tuple[int, str, str]: + """Run a Nextcloud occ command via docker compose exec. + + Args: + compose_file: Path to the docker-compose file + occ_args: Arguments to pass to occ (e.g., ["config:system:set", "overwritehost", "--value=..."]) + timeout: Operation timeout in seconds + + Returns: + Tuple of (exit_code, stdout, stderr) + """ + def _run() -> tuple[int, str, str]: + # Build command: docker compose -f exec -T --user php + cmd = [ + "docker", + "compose", + "-f", + str(compose_file), + "exec", + "-T", # Disable pseudo-TTY allocation + "--user", + self.NEXTCLOUD_USER, + self.NEXTCLOUD_SERVICE_NAME, + "php", + self.NEXTCLOUD_OCC_PATH, + ] + occ_args + + # Run command from stack directory, no shell=True + result = subprocess.run( + cmd, + capture_output=True, + text=True, + timeout=timeout, + cwd=self.NEXTCLOUD_STACK_DIR, + ) + + return result.returncode, result.stdout, result.stderr + + return await asyncio.wait_for( + asyncio.to_thread(_run), + timeout=timeout + 30, # Watchdog with buffer + ) diff --git a/tests/executors/test_nextcloud_executor.py b/tests/executors/test_nextcloud_executor.py new file mode 100644 index 0000000..db4ded3 --- /dev/null +++ b/tests/executors/test_nextcloud_executor.py @@ -0,0 +1,524 @@ +"""Unit tests for NextcloudSetDomainExecutor.""" + +import asyncio +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + + +# Patch the logger before importing the executor +with patch("app.utils.logger.get_logger", return_value=MagicMock()): + from app.executors.nextcloud_executor import NextcloudSetDomainExecutor + + +class TestNextcloudSetDomainExecutor: + """Tests for NextcloudSetDomainExecutor.""" + + @pytest.fixture + def executor(self): + """Create executor with mocked logger.""" + with patch("app.executors.base.get_logger", return_value=MagicMock()): + return NextcloudSetDomainExecutor() + + @pytest.fixture + def temp_nextcloud_stack(self, tmp_path): + """Create a temporary Nextcloud stack directory with compose file.""" + stack_dir = tmp_path / "opt" / "letsbe" / "stacks" / "nextcloud" + stack_dir.mkdir(parents=True) + compose_file = stack_dir / "docker-compose.yml" + compose_file.write_text("""version: '3.8' +services: + app: + image: nextcloud:latest +""") + return stack_dir + + @pytest.fixture + def executor_with_temp_stack(self, executor, temp_nextcloud_stack): + """Configure executor to use temporary stack directory.""" + executor.NEXTCLOUD_STACK_DIR = str(temp_nextcloud_stack) + return executor + + # ========================================================================= + # TASK TYPE TEST + # ========================================================================= + + def test_task_type(self, executor): + """Test that task_type property returns correct value.""" + assert executor.task_type == "NEXTCLOUD_SET_DOMAIN" + + # ========================================================================= + # URL PARSING TESTS + # ========================================================================= + + def test_parse_public_url_with_https(self, executor): + """Test URL parsing with explicit https scheme.""" + scheme, host, normalized_url = executor._parse_public_url("https://cloud.example.com") + assert scheme == "https" + assert host == "cloud.example.com" + assert normalized_url == "https://cloud.example.com" + + def test_parse_public_url_with_http(self, executor): + """Test URL parsing with http scheme.""" + scheme, host, normalized_url = executor._parse_public_url("http://cloud.example.com") + assert scheme == "http" + assert host == "cloud.example.com" + assert normalized_url == "http://cloud.example.com" + + def test_parse_public_url_with_port(self, executor): + """Test URL parsing with port number.""" + scheme, host, normalized_url = executor._parse_public_url("https://cloud.example.com:8443") + assert scheme == "https" + assert host == "cloud.example.com:8443" + assert normalized_url == "https://cloud.example.com:8443" + + def test_parse_public_url_without_scheme_defaults_to_https(self, executor): + """Test that URLs without scheme default to https.""" + scheme, host, normalized_url = executor._parse_public_url("cloud.example.com") + assert scheme == "https" + assert host == "cloud.example.com" + assert normalized_url == "https://cloud.example.com" + + def test_parse_public_url_trailing_slash_stripped(self, executor): + """Test that trailing slash is stripped from URL.""" + scheme, host, normalized_url = executor._parse_public_url("https://cloud.example.com/") + assert scheme == "https" + assert host == "cloud.example.com" + assert normalized_url == "https://cloud.example.com" + + def test_parse_public_url_with_path(self, executor): + """Test URL parsing with path (trailing slash stripped).""" + scheme, host, normalized_url = executor._parse_public_url("https://cloud.example.com/nextcloud/") + assert scheme == "https" + assert host == "cloud.example.com" + assert normalized_url == "https://cloud.example.com/nextcloud" + + def test_parse_public_url_empty_raises_error(self, executor): + """Test that empty URL raises ValueError.""" + with pytest.raises(ValueError, match="cannot be empty"): + executor._parse_public_url("") + + def test_parse_public_url_whitespace_only_raises_error(self, executor): + """Test that whitespace-only URL raises ValueError.""" + with pytest.raises(ValueError, match="cannot be empty"): + executor._parse_public_url(" ") + + def test_parse_public_url_invalid_no_host_raises_error(self, executor): + """Test that URL with no host raises ValueError.""" + with pytest.raises(ValueError, match="no host found"): + executor._parse_public_url("https://") + + # ========================================================================= + # SUCCESS CASES + # ========================================================================= + + @pytest.mark.asyncio + async def test_success_all_commands(self, executor_with_temp_stack): + """Test successful domain configuration with all three occ commands.""" + with patch.object(executor_with_temp_stack, "_run_occ_command") as mock_run: + mock_run.return_value = (0, "System config value overwritehost set to cloud.example.com", "") + + result = await executor_with_temp_stack.execute({ + "public_url": "https://cloud.example.com" + }) + + assert result.success is True + assert result.data["public_url"] == "https://cloud.example.com" + assert result.data["host"] == "cloud.example.com" + assert result.data["scheme"] == "https" + assert result.data["commands_executed"] == 3 + assert "overwritehost" in result.data["logs"] + assert "overwriteprotocol" in result.data["logs"] + assert "overwrite.cli.url" in result.data["logs"] + assert result.error is None + assert result.duration_ms is not None + + # Verify all three commands were called + assert mock_run.call_count == 3 + + @pytest.mark.asyncio + async def test_success_with_http_scheme(self, executor_with_temp_stack): + """Test successful configuration with http scheme.""" + with patch.object(executor_with_temp_stack, "_run_occ_command") as mock_run: + mock_run.return_value = (0, "success", "") + + result = await executor_with_temp_stack.execute({ + "public_url": "http://cloud.example.com" + }) + + assert result.success is True + assert result.data["scheme"] == "http" + assert result.data["public_url"] == "http://cloud.example.com" + + @pytest.mark.asyncio + async def test_success_url_without_scheme(self, executor_with_temp_stack): + """Test successful configuration when URL lacks scheme (defaults to https).""" + with patch.object(executor_with_temp_stack, "_run_occ_command") as mock_run: + mock_run.return_value = (0, "success", "") + + result = await executor_with_temp_stack.execute({ + "public_url": "cloud.example.com" + }) + + assert result.success is True + assert result.data["scheme"] == "https" + assert result.data["host"] == "cloud.example.com" + assert result.data["public_url"] == "https://cloud.example.com" + + # ========================================================================= + # COMMAND ARGUMENTS TESTS + # ========================================================================= + + @pytest.mark.asyncio + async def test_command_arguments_correct(self, executor_with_temp_stack): + """Test that occ commands receive correct arguments.""" + calls = [] + + async def capture_calls(compose_file, occ_args, timeout): + calls.append(occ_args) + return (0, "success", "") + + with patch.object(executor_with_temp_stack, "_run_occ_command", side_effect=capture_calls): + await executor_with_temp_stack.execute({ + "public_url": "https://cloud.example.com" + }) + + assert len(calls) == 3 + assert calls[0] == ["config:system:set", "overwritehost", "--value=cloud.example.com"] + assert calls[1] == ["config:system:set", "overwriteprotocol", "--value=https"] + assert calls[2] == ["config:system:set", "overwrite.cli.url", "--value=https://cloud.example.com"] + + @pytest.mark.asyncio + async def test_command_arguments_with_port(self, executor_with_temp_stack): + """Test that host with port is passed correctly to occ command.""" + calls = [] + + async def capture_calls(compose_file, occ_args, timeout): + calls.append(occ_args) + return (0, "success", "") + + with patch.object(executor_with_temp_stack, "_run_occ_command", side_effect=capture_calls): + await executor_with_temp_stack.execute({ + "public_url": "https://cloud.example.com:8443" + }) + + assert calls[0] == ["config:system:set", "overwritehost", "--value=cloud.example.com:8443"] + + # ========================================================================= + # PAYLOAD VALIDATION TESTS + # ========================================================================= + + @pytest.mark.asyncio + async def test_missing_public_url_raises_error(self, executor_with_temp_stack): + """Test that missing public_url raises ValueError.""" + with pytest.raises(ValueError, match="Missing required fields"): + await executor_with_temp_stack.execute({}) + + @pytest.mark.asyncio + async def test_invalid_url_returns_failure(self, executor_with_temp_stack): + """Test that invalid URL returns failure result.""" + result = await executor_with_temp_stack.execute({ + "public_url": "" + }) + + assert result.success is False + assert "cannot be empty" in result.error + + # ========================================================================= + # COMPOSE FILE TESTS + # ========================================================================= + + @pytest.mark.asyncio + async def test_compose_file_not_found(self, executor, tmp_path): + """Test failure when compose file doesn't exist.""" + nonexistent_dir = tmp_path / "nonexistent" + nonexistent_dir.mkdir() + executor.NEXTCLOUD_STACK_DIR = str(nonexistent_dir) + + result = await executor.execute({ + "public_url": "https://cloud.example.com" + }) + + assert result.success is False + assert "compose file not found" in result.error.lower() + assert "docker-compose.yml" in result.error + + @pytest.mark.asyncio + async def test_compose_yml_fallback(self, executor, tmp_path): + """Test that compose.yml is used as fallback.""" + stack_dir = tmp_path / "nextcloud" + stack_dir.mkdir() + (stack_dir / "compose.yml").write_text("version: '3'\n") + executor.NEXTCLOUD_STACK_DIR = str(stack_dir) + + with patch.object(executor, "_run_occ_command") as mock_run: + mock_run.return_value = (0, "success", "") + + result = await executor.execute({ + "public_url": "https://cloud.example.com" + }) + + assert result.success is True + + # ========================================================================= + # COMMAND FAILURE TESTS + # ========================================================================= + + @pytest.mark.asyncio + async def test_first_command_fails(self, executor_with_temp_stack): + """Test that failure on first occ command returns partial results.""" + with patch.object(executor_with_temp_stack, "_run_occ_command") as mock_run: + mock_run.return_value = (1, "", "Error: Unable to write config") + + result = await executor_with_temp_stack.execute({ + "public_url": "https://cloud.example.com" + }) + + assert result.success is False + assert result.data["commands_executed"] == 1 + assert result.data["failed_command"] == "overwritehost" + assert result.data["failed_args"] == ["config:system:set", "overwritehost", "--value=cloud.example.com"] + assert "overwritehost" in result.data["logs"] + assert "occ overwritehost failed with exit code 1" in result.error + + @pytest.mark.asyncio + async def test_second_command_fails(self, executor_with_temp_stack): + """Test that failure on second occ command returns partial results.""" + call_count = 0 + + async def mock_occ(compose_file, occ_args, timeout): + nonlocal call_count + call_count += 1 + if call_count == 1: + return (0, "success", "") # First succeeds + return (1, "", "Error setting overwriteprotocol") # Second fails + + with patch.object(executor_with_temp_stack, "_run_occ_command", side_effect=mock_occ): + result = await executor_with_temp_stack.execute({ + "public_url": "https://cloud.example.com" + }) + + assert result.success is False + assert result.data["commands_executed"] == 2 + assert result.data["failed_command"] == "overwriteprotocol" + assert "overwritehost" in result.data["logs"] + assert "overwriteprotocol" in result.data["logs"] + assert "overwrite.cli.url" not in result.data["logs"] + + @pytest.mark.asyncio + async def test_third_command_fails(self, executor_with_temp_stack): + """Test that failure on third occ command returns partial results.""" + call_count = 0 + + async def mock_occ(compose_file, occ_args, timeout): + nonlocal call_count + call_count += 1 + if call_count <= 2: + return (0, "success", "") # First two succeed + return (1, "", "Error setting overwrite.cli.url") # Third fails + + with patch.object(executor_with_temp_stack, "_run_occ_command", side_effect=mock_occ): + result = await executor_with_temp_stack.execute({ + "public_url": "https://cloud.example.com" + }) + + assert result.success is False + assert result.data["commands_executed"] == 3 + assert result.data["failed_command"] == "overwrite.cli.url" + assert "overwritehost" in result.data["logs"] + assert "overwriteprotocol" in result.data["logs"] + assert "overwrite.cli.url" in result.data["logs"] + + # ========================================================================= + # TIMEOUT HANDLING TESTS + # ========================================================================= + + @pytest.mark.asyncio + async def test_timeout_handling(self, executor_with_temp_stack): + """Test that asyncio.TimeoutError is caught and returns failure.""" + with patch.object(executor_with_temp_stack, "_run_occ_command") as mock_run: + mock_run.side_effect = asyncio.TimeoutError() + + result = await executor_with_temp_stack.execute({ + "public_url": "https://cloud.example.com" + }) + + assert result.success is False + assert "timed out" in result.error.lower() + assert result.data["commands_executed"] == 0 + assert result.duration_ms is not None + + @pytest.mark.asyncio + async def test_timeout_after_partial_success(self, executor_with_temp_stack): + """Test timeout after some commands succeeded.""" + call_count = 0 + + async def mock_occ(compose_file, occ_args, timeout): + nonlocal call_count + call_count += 1 + if call_count == 1: + return (0, "success", "") + raise asyncio.TimeoutError() + + with patch.object(executor_with_temp_stack, "_run_occ_command", side_effect=mock_occ): + result = await executor_with_temp_stack.execute({ + "public_url": "https://cloud.example.com" + }) + + assert result.success is False + assert "timed out" in result.error.lower() + assert result.data["commands_executed"] == 1 + assert "overwritehost" in result.data["logs"] + + # ========================================================================= + # EXCEPTION HANDLING TESTS + # ========================================================================= + + @pytest.mark.asyncio + async def test_unexpected_exception_handling(self, executor_with_temp_stack): + """Test that unexpected exceptions are caught and returned.""" + with patch.object(executor_with_temp_stack, "_run_occ_command") as mock_run: + mock_run.side_effect = RuntimeError("Unexpected docker error") + + result = await executor_with_temp_stack.execute({ + "public_url": "https://cloud.example.com" + }) + + assert result.success is False + assert "Unexpected docker error" in result.error + assert result.duration_ms is not None + + # ========================================================================= + # RESULT STRUCTURE TESTS + # ========================================================================= + + @pytest.mark.asyncio + async def test_result_structure_on_success(self, executor_with_temp_stack): + """Test that successful result contains all expected keys.""" + with patch.object(executor_with_temp_stack, "_run_occ_command") as mock_run: + mock_run.return_value = (0, "success", "") + + result = await executor_with_temp_stack.execute({ + "public_url": "https://cloud.example.com" + }) + + assert result.success is True + assert "public_url" in result.data + assert "host" in result.data + assert "scheme" in result.data + assert "commands_executed" in result.data + assert "logs" in result.data + assert result.error is None + assert result.duration_ms is not None + + @pytest.mark.asyncio + async def test_result_structure_on_failure(self, executor_with_temp_stack): + """Test that failure result contains all expected keys including failed_command.""" + with patch.object(executor_with_temp_stack, "_run_occ_command") as mock_run: + mock_run.return_value = (1, "", "error") + + result = await executor_with_temp_stack.execute({ + "public_url": "https://cloud.example.com" + }) + + assert result.success is False + assert "public_url" in result.data + assert "host" in result.data + assert "scheme" in result.data + assert "commands_executed" in result.data + assert "failed_command" in result.data + assert "failed_args" in result.data + assert "logs" in result.data + assert result.error is not None + assert result.duration_ms is not None + + # ========================================================================= + # OUTPUT COMBINATION TESTS + # ========================================================================= + + def test_combine_output_both(self, executor): + """Test combining stdout and stderr.""" + result = executor._combine_output("stdout content", "stderr content") + assert "stdout content" in result + assert "stderr content" in result + + def test_combine_output_stdout_only(self, executor): + """Test combining with only stdout.""" + result = executor._combine_output("stdout content", "") + assert result == "stdout content" + + def test_combine_output_stderr_only(self, executor): + """Test combining with only stderr.""" + result = executor._combine_output("", "stderr content") + assert result == "stderr content" + + def test_combine_output_empty(self, executor): + """Test combining empty outputs.""" + result = executor._combine_output("", "") + assert result == "" + + # ========================================================================= + # FIND COMPOSE FILE TESTS + # ========================================================================= + + def test_find_compose_file_docker_compose_yml(self, executor, tmp_path): + """Test finding docker-compose.yml.""" + (tmp_path / "docker-compose.yml").write_text("version: '3'\n") + result = executor._find_compose_file(tmp_path) + assert result == tmp_path / "docker-compose.yml" + + def test_find_compose_file_compose_yml(self, executor, tmp_path): + """Test finding compose.yml as fallback.""" + (tmp_path / "compose.yml").write_text("version: '3'\n") + result = executor._find_compose_file(tmp_path) + assert result == tmp_path / "compose.yml" + + def test_find_compose_file_prefers_docker_compose_yml(self, executor, tmp_path): + """Test that docker-compose.yml is preferred over compose.yml.""" + (tmp_path / "docker-compose.yml").write_text("version: '3'\n") + (tmp_path / "compose.yml").write_text("version: '3'\n") + result = executor._find_compose_file(tmp_path) + assert result == tmp_path / "docker-compose.yml" + + def test_find_compose_file_not_found(self, executor, tmp_path): + """Test returning None when no compose file found.""" + result = executor._find_compose_file(tmp_path) + assert result is None + + # ========================================================================= + # CUSTOM TIMEOUT TESTS + # ========================================================================= + + @pytest.mark.asyncio + async def test_custom_timeout_passed(self, executor_with_temp_stack): + """Test that custom timeout from payload is passed to _run_occ_command.""" + received_timeouts = [] + + async def capture_timeout(compose_file, occ_args, timeout): + received_timeouts.append(timeout) + return (0, "success", "") + + with patch.object(executor_with_temp_stack, "_run_occ_command", side_effect=capture_timeout): + await executor_with_temp_stack.execute({ + "public_url": "https://cloud.example.com", + "timeout": 120, + }) + + assert all(t == 120 for t in received_timeouts) + + @pytest.mark.asyncio + async def test_default_timeout_used(self, executor_with_temp_stack): + """Test that default timeout is used when not specified in payload.""" + received_timeouts = [] + + async def capture_timeout(compose_file, occ_args, timeout): + received_timeouts.append(timeout) + return (0, "success", "") + + with patch.object(executor_with_temp_stack, "_run_occ_command", side_effect=capture_timeout): + await executor_with_temp_stack.execute({ + "public_url": "https://cloud.example.com", + }) + + assert all(t == executor_with_temp_stack.DEFAULT_COMMAND_TIMEOUT for t in received_timeouts)