Add NEXTCLOUD_SET_DOMAIN executor for domain configuration

Implements a Nextcloud-specific executor that accepts a high-level
public_url payload and runs the appropriate occ config:system:set
commands via docker compose exec. The Orchestrator remains unaware
of container names, occ paths, and docker-compose syntax.

Features:
- Task type: NEXTCLOUD_SET_DOMAIN
- Payload: { "public_url": "https://cloud.example.com" }
- Parses URL into scheme and host, defaults to https if not provided
- Strips trailing slashes from URLs
- Runs three occ commands: overwritehost, overwriteprotocol, overwrite.cli.url
- Returns partial results with failed_args for debugging on failure
- Configurable constants for stack dir, service name, occ path, and user

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Matt 2025-12-04 14:01:41 +01:00
parent e8eae5a8e0
commit 47b3422829
3 changed files with 885 additions and 0 deletions

View File

@ -10,6 +10,7 @@ from app.executors.env_inspect_executor import EnvInspectExecutor
from app.executors.env_update_executor import EnvUpdateExecutor from app.executors.env_update_executor import EnvUpdateExecutor
from app.executors.file_executor import FileExecutor from app.executors.file_executor import FileExecutor
from app.executors.file_inspect_executor import FileInspectExecutor from app.executors.file_inspect_executor import FileInspectExecutor
from app.executors.nextcloud_executor import NextcloudSetDomainExecutor
from app.executors.playwright_executor import PlaywrightExecutor from app.executors.playwright_executor import PlaywrightExecutor
from app.executors.shell_executor import ShellExecutor from app.executors.shell_executor import ShellExecutor
@ -24,6 +25,7 @@ EXECUTOR_REGISTRY: dict[str, Type[BaseExecutor]] = {
"DOCKER_RELOAD": DockerExecutor, "DOCKER_RELOAD": DockerExecutor,
"COMPOSITE": CompositeExecutor, "COMPOSITE": CompositeExecutor,
"PLAYWRIGHT": PlaywrightExecutor, "PLAYWRIGHT": PlaywrightExecutor,
"NEXTCLOUD_SET_DOMAIN": NextcloudSetDomainExecutor,
} }
@ -61,6 +63,7 @@ __all__ = [
"DockerExecutor", "DockerExecutor",
"CompositeExecutor", "CompositeExecutor",
"PlaywrightExecutor", "PlaywrightExecutor",
"NextcloudSetDomainExecutor",
"EXECUTOR_REGISTRY", "EXECUTOR_REGISTRY",
"get_executor", "get_executor",
] ]

View File

@ -0,0 +1,358 @@
"""Nextcloud domain configuration executor."""
import asyncio
import subprocess
import time
from pathlib import Path
from typing import Any
from urllib.parse import urlparse
from app.executors.base import BaseExecutor, ExecutionResult
class NextcloudSetDomainExecutor(BaseExecutor):
"""Execute Nextcloud domain configuration via occ commands.
This executor configures Nextcloud's external domain settings by running
occ config:system:set commands via docker compose exec. It keeps the
Orchestrator unaware of container names, occ paths, and docker-compose syntax.
Security measures:
- URL parsing with validation
- No shell=True, command list only
- Timeout enforcement on each subprocess
Payload:
{
"public_url": "https://cloud.example.com"
}
Result (success):
{
"public_url": "https://cloud.example.com",
"host": "cloud.example.com",
"scheme": "https",
"commands_executed": 3,
"logs": {
"overwritehost": "<stdout+stderr>",
"overwriteprotocol": "<stdout+stderr>",
"overwrite.cli.url": "<stdout+stderr>"
}
}
Result (failure):
{
"public_url": "https://cloud.example.com",
"host": "cloud.example.com",
"scheme": "https",
"commands_executed": 2,
"failed_command": "overwriteprotocol",
"failed_args": ["config:system:set", "overwriteprotocol", "--value=https"],
"logs": {...}
}
"""
# TODO: These constants may need adjustment based on actual Nextcloud stack setup
NEXTCLOUD_STACK_DIR = "/opt/letsbe/stacks/nextcloud"
NEXTCLOUD_SERVICE_NAME = "app"
NEXTCLOUD_OCC_PATH = "/var/www/html/occ"
NEXTCLOUD_USER = "www-data"
# Compose file search order (matches DockerExecutor)
COMPOSE_FILE_NAMES = ["docker-compose.yml", "compose.yml"]
# Default timeout for each occ command (seconds)
DEFAULT_COMMAND_TIMEOUT = 60
@property
def task_type(self) -> str:
return "NEXTCLOUD_SET_DOMAIN"
async def execute(self, payload: dict[str, Any]) -> ExecutionResult:
"""Execute Nextcloud domain configuration commands.
Runs three occ config:system:set commands to configure:
- overwritehost: The domain/host portion of the URL
- overwriteprotocol: The scheme (http/https)
- overwrite.cli.url: The full public URL
Args:
payload: Must contain "public_url", optionally "timeout"
Returns:
ExecutionResult with configuration confirmation and logs
"""
self.validate_payload(payload, ["public_url"])
public_url = payload["public_url"]
timeout = payload.get("timeout", self.DEFAULT_COMMAND_TIMEOUT)
# Parse URL into components
try:
scheme, host, normalized_url = self._parse_public_url(public_url)
except ValueError as e:
return ExecutionResult(
success=False,
data={"public_url": public_url},
error=str(e),
)
# Find compose file in the Nextcloud stack directory
stack_dir = Path(self.NEXTCLOUD_STACK_DIR)
compose_file = self._find_compose_file(stack_dir)
if compose_file is None:
self.logger.warning("nextcloud_compose_not_found", dir=self.NEXTCLOUD_STACK_DIR)
return ExecutionResult(
success=False,
data={"public_url": public_url, "host": host, "scheme": scheme},
error=f"Nextcloud compose file not found in {self.NEXTCLOUD_STACK_DIR}. "
f"Looked for: {', '.join(self.COMPOSE_FILE_NAMES)}",
)
self.logger.info(
"nextcloud_setting_domain",
public_url=normalized_url,
host=host,
scheme=scheme,
compose_file=str(compose_file),
)
start_time = time.time()
logs: dict[str, str] = {}
commands_executed = 0
# Define the three occ commands to run
occ_commands = [
("overwritehost", ["config:system:set", "overwritehost", f"--value={host}"]),
("overwriteprotocol", ["config:system:set", "overwriteprotocol", f"--value={scheme}"]),
("overwrite.cli.url", ["config:system:set", "overwrite.cli.url", f"--value={normalized_url}"]),
]
try:
for cmd_name, occ_args in occ_commands:
exit_code, stdout, stderr = await self._run_occ_command(
compose_file,
occ_args,
timeout,
)
logs[cmd_name] = self._combine_output(stdout, stderr)
commands_executed += 1
if exit_code != 0:
duration_ms = (time.time() - start_time) * 1000
self.logger.warning(
"nextcloud_occ_command_failed",
command=cmd_name,
occ_args=occ_args,
exit_code=exit_code,
stderr=stderr[:500] if stderr else None,
)
return ExecutionResult(
success=False,
data={
"public_url": normalized_url,
"host": host,
"scheme": scheme,
"commands_executed": commands_executed,
"failed_command": cmd_name,
"failed_args": occ_args,
"logs": logs,
},
error=f"occ {cmd_name} failed with exit code {exit_code}",
duration_ms=duration_ms,
)
duration_ms = (time.time() - start_time) * 1000
self.logger.info(
"nextcloud_domain_set",
public_url=normalized_url,
host=host,
scheme=scheme,
commands_executed=commands_executed,
duration_ms=duration_ms,
)
return ExecutionResult(
success=True,
data={
"public_url": normalized_url,
"host": host,
"scheme": scheme,
"commands_executed": commands_executed,
"logs": logs,
},
duration_ms=duration_ms,
)
except asyncio.TimeoutError:
duration_ms = (time.time() - start_time) * 1000
self.logger.error(
"nextcloud_timeout",
public_url=normalized_url,
timeout=timeout,
commands_executed=commands_executed,
)
return ExecutionResult(
success=False,
data={
"public_url": normalized_url,
"host": host,
"scheme": scheme,
"commands_executed": commands_executed,
"logs": logs,
},
error=f"Nextcloud occ operation timed out after {timeout} seconds",
duration_ms=duration_ms,
)
except Exception as e:
duration_ms = (time.time() - start_time) * 1000
self.logger.error(
"nextcloud_error",
public_url=normalized_url,
error=str(e),
commands_executed=commands_executed,
)
return ExecutionResult(
success=False,
data={
"public_url": normalized_url,
"host": host,
"scheme": scheme,
"commands_executed": commands_executed,
"logs": logs,
},
error=str(e),
duration_ms=duration_ms,
)
def _parse_public_url(self, public_url: str) -> tuple[str, str, str]:
"""Parse public URL into scheme, host, and normalized URL.
Args:
public_url: Full URL like "https://cloud.example.com" or just "cloud.example.com"
Returns:
Tuple of (scheme, host, normalized_url)
- scheme: "http" or "https" (defaults to "https" if not provided)
- host: Domain with optional port (e.g., "cloud.example.com:8443")
- normalized_url: Full URL with trailing slash stripped
Raises:
ValueError: If URL is invalid or missing host
"""
if not public_url or not public_url.strip():
raise ValueError("public_url cannot be empty")
url = public_url.strip()
# Parse the URL
parsed = urlparse(url)
# Extract scheme, default to "https" if not provided
scheme = parsed.scheme if parsed.scheme else "https"
# Extract host (netloc includes port if present)
host = parsed.netloc
# Handle URLs without scheme (e.g., "cloud.example.com" or "cloud.example.com/path")
# urlparse treats "cloud.example.com" as a path, not netloc
if not host and not parsed.scheme:
# The URL was provided without a scheme, so we need to re-parse with scheme
url_with_scheme = f"https://{url}"
parsed = urlparse(url_with_scheme)
host = parsed.netloc
scheme = "https"
if not host:
raise ValueError(f"Invalid URL - no host found: {public_url}")
# Reconstruct normalized URL (with trailing slash stripped)
normalized_url = f"{scheme}://{host}"
if parsed.path and parsed.path != "/":
normalized_url += parsed.path.rstrip("/")
return scheme, host, normalized_url
def _find_compose_file(self, compose_dir: Path) -> Path | None:
"""Find compose file in the directory.
Searches in order: docker-compose.yml, compose.yml
Args:
compose_dir: Directory to search in
Returns:
Path to compose file, or None if not found
"""
for filename in self.COMPOSE_FILE_NAMES:
compose_file = compose_dir / filename
if compose_file.exists():
return compose_file
return None
def _combine_output(self, stdout: str, stderr: str) -> str:
"""Combine stdout and stderr into a single string.
Args:
stdout: Standard output
stderr: Standard error
Returns:
Combined output string
"""
parts = []
if stdout:
parts.append(stdout)
if stderr:
parts.append(stderr)
return "\n".join(parts)
async def _run_occ_command(
self,
compose_file: Path,
occ_args: list[str],
timeout: int,
) -> tuple[int, str, str]:
"""Run a Nextcloud occ command via docker compose exec.
Args:
compose_file: Path to the docker-compose file
occ_args: Arguments to pass to occ (e.g., ["config:system:set", "overwritehost", "--value=..."])
timeout: Operation timeout in seconds
Returns:
Tuple of (exit_code, stdout, stderr)
"""
def _run() -> tuple[int, str, str]:
# Build command: docker compose -f <file> exec -T --user <user> <service> php <occ_path> <args>
cmd = [
"docker",
"compose",
"-f",
str(compose_file),
"exec",
"-T", # Disable pseudo-TTY allocation
"--user",
self.NEXTCLOUD_USER,
self.NEXTCLOUD_SERVICE_NAME,
"php",
self.NEXTCLOUD_OCC_PATH,
] + occ_args
# Run command from stack directory, no shell=True
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=timeout,
cwd=self.NEXTCLOUD_STACK_DIR,
)
return result.returncode, result.stdout, result.stderr
return await asyncio.wait_for(
asyncio.to_thread(_run),
timeout=timeout + 30, # Watchdog with buffer
)

View File

@ -0,0 +1,524 @@
"""Unit tests for NextcloudSetDomainExecutor."""
import asyncio
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
# Patch the logger before importing the executor
with patch("app.utils.logger.get_logger", return_value=MagicMock()):
from app.executors.nextcloud_executor import NextcloudSetDomainExecutor
class TestNextcloudSetDomainExecutor:
"""Tests for NextcloudSetDomainExecutor."""
@pytest.fixture
def executor(self):
"""Create executor with mocked logger."""
with patch("app.executors.base.get_logger", return_value=MagicMock()):
return NextcloudSetDomainExecutor()
@pytest.fixture
def temp_nextcloud_stack(self, tmp_path):
"""Create a temporary Nextcloud stack directory with compose file."""
stack_dir = tmp_path / "opt" / "letsbe" / "stacks" / "nextcloud"
stack_dir.mkdir(parents=True)
compose_file = stack_dir / "docker-compose.yml"
compose_file.write_text("""version: '3.8'
services:
app:
image: nextcloud:latest
""")
return stack_dir
@pytest.fixture
def executor_with_temp_stack(self, executor, temp_nextcloud_stack):
"""Configure executor to use temporary stack directory."""
executor.NEXTCLOUD_STACK_DIR = str(temp_nextcloud_stack)
return executor
# =========================================================================
# TASK TYPE TEST
# =========================================================================
def test_task_type(self, executor):
"""Test that task_type property returns correct value."""
assert executor.task_type == "NEXTCLOUD_SET_DOMAIN"
# =========================================================================
# URL PARSING TESTS
# =========================================================================
def test_parse_public_url_with_https(self, executor):
"""Test URL parsing with explicit https scheme."""
scheme, host, normalized_url = executor._parse_public_url("https://cloud.example.com")
assert scheme == "https"
assert host == "cloud.example.com"
assert normalized_url == "https://cloud.example.com"
def test_parse_public_url_with_http(self, executor):
"""Test URL parsing with http scheme."""
scheme, host, normalized_url = executor._parse_public_url("http://cloud.example.com")
assert scheme == "http"
assert host == "cloud.example.com"
assert normalized_url == "http://cloud.example.com"
def test_parse_public_url_with_port(self, executor):
"""Test URL parsing with port number."""
scheme, host, normalized_url = executor._parse_public_url("https://cloud.example.com:8443")
assert scheme == "https"
assert host == "cloud.example.com:8443"
assert normalized_url == "https://cloud.example.com:8443"
def test_parse_public_url_without_scheme_defaults_to_https(self, executor):
"""Test that URLs without scheme default to https."""
scheme, host, normalized_url = executor._parse_public_url("cloud.example.com")
assert scheme == "https"
assert host == "cloud.example.com"
assert normalized_url == "https://cloud.example.com"
def test_parse_public_url_trailing_slash_stripped(self, executor):
"""Test that trailing slash is stripped from URL."""
scheme, host, normalized_url = executor._parse_public_url("https://cloud.example.com/")
assert scheme == "https"
assert host == "cloud.example.com"
assert normalized_url == "https://cloud.example.com"
def test_parse_public_url_with_path(self, executor):
"""Test URL parsing with path (trailing slash stripped)."""
scheme, host, normalized_url = executor._parse_public_url("https://cloud.example.com/nextcloud/")
assert scheme == "https"
assert host == "cloud.example.com"
assert normalized_url == "https://cloud.example.com/nextcloud"
def test_parse_public_url_empty_raises_error(self, executor):
"""Test that empty URL raises ValueError."""
with pytest.raises(ValueError, match="cannot be empty"):
executor._parse_public_url("")
def test_parse_public_url_whitespace_only_raises_error(self, executor):
"""Test that whitespace-only URL raises ValueError."""
with pytest.raises(ValueError, match="cannot be empty"):
executor._parse_public_url(" ")
def test_parse_public_url_invalid_no_host_raises_error(self, executor):
"""Test that URL with no host raises ValueError."""
with pytest.raises(ValueError, match="no host found"):
executor._parse_public_url("https://")
# =========================================================================
# SUCCESS CASES
# =========================================================================
@pytest.mark.asyncio
async def test_success_all_commands(self, executor_with_temp_stack):
"""Test successful domain configuration with all three occ commands."""
with patch.object(executor_with_temp_stack, "_run_occ_command") as mock_run:
mock_run.return_value = (0, "System config value overwritehost set to cloud.example.com", "")
result = await executor_with_temp_stack.execute({
"public_url": "https://cloud.example.com"
})
assert result.success is True
assert result.data["public_url"] == "https://cloud.example.com"
assert result.data["host"] == "cloud.example.com"
assert result.data["scheme"] == "https"
assert result.data["commands_executed"] == 3
assert "overwritehost" in result.data["logs"]
assert "overwriteprotocol" in result.data["logs"]
assert "overwrite.cli.url" in result.data["logs"]
assert result.error is None
assert result.duration_ms is not None
# Verify all three commands were called
assert mock_run.call_count == 3
@pytest.mark.asyncio
async def test_success_with_http_scheme(self, executor_with_temp_stack):
"""Test successful configuration with http scheme."""
with patch.object(executor_with_temp_stack, "_run_occ_command") as mock_run:
mock_run.return_value = (0, "success", "")
result = await executor_with_temp_stack.execute({
"public_url": "http://cloud.example.com"
})
assert result.success is True
assert result.data["scheme"] == "http"
assert result.data["public_url"] == "http://cloud.example.com"
@pytest.mark.asyncio
async def test_success_url_without_scheme(self, executor_with_temp_stack):
"""Test successful configuration when URL lacks scheme (defaults to https)."""
with patch.object(executor_with_temp_stack, "_run_occ_command") as mock_run:
mock_run.return_value = (0, "success", "")
result = await executor_with_temp_stack.execute({
"public_url": "cloud.example.com"
})
assert result.success is True
assert result.data["scheme"] == "https"
assert result.data["host"] == "cloud.example.com"
assert result.data["public_url"] == "https://cloud.example.com"
# =========================================================================
# COMMAND ARGUMENTS TESTS
# =========================================================================
@pytest.mark.asyncio
async def test_command_arguments_correct(self, executor_with_temp_stack):
"""Test that occ commands receive correct arguments."""
calls = []
async def capture_calls(compose_file, occ_args, timeout):
calls.append(occ_args)
return (0, "success", "")
with patch.object(executor_with_temp_stack, "_run_occ_command", side_effect=capture_calls):
await executor_with_temp_stack.execute({
"public_url": "https://cloud.example.com"
})
assert len(calls) == 3
assert calls[0] == ["config:system:set", "overwritehost", "--value=cloud.example.com"]
assert calls[1] == ["config:system:set", "overwriteprotocol", "--value=https"]
assert calls[2] == ["config:system:set", "overwrite.cli.url", "--value=https://cloud.example.com"]
@pytest.mark.asyncio
async def test_command_arguments_with_port(self, executor_with_temp_stack):
"""Test that host with port is passed correctly to occ command."""
calls = []
async def capture_calls(compose_file, occ_args, timeout):
calls.append(occ_args)
return (0, "success", "")
with patch.object(executor_with_temp_stack, "_run_occ_command", side_effect=capture_calls):
await executor_with_temp_stack.execute({
"public_url": "https://cloud.example.com:8443"
})
assert calls[0] == ["config:system:set", "overwritehost", "--value=cloud.example.com:8443"]
# =========================================================================
# PAYLOAD VALIDATION TESTS
# =========================================================================
@pytest.mark.asyncio
async def test_missing_public_url_raises_error(self, executor_with_temp_stack):
"""Test that missing public_url raises ValueError."""
with pytest.raises(ValueError, match="Missing required fields"):
await executor_with_temp_stack.execute({})
@pytest.mark.asyncio
async def test_invalid_url_returns_failure(self, executor_with_temp_stack):
"""Test that invalid URL returns failure result."""
result = await executor_with_temp_stack.execute({
"public_url": ""
})
assert result.success is False
assert "cannot be empty" in result.error
# =========================================================================
# COMPOSE FILE TESTS
# =========================================================================
@pytest.mark.asyncio
async def test_compose_file_not_found(self, executor, tmp_path):
"""Test failure when compose file doesn't exist."""
nonexistent_dir = tmp_path / "nonexistent"
nonexistent_dir.mkdir()
executor.NEXTCLOUD_STACK_DIR = str(nonexistent_dir)
result = await executor.execute({
"public_url": "https://cloud.example.com"
})
assert result.success is False
assert "compose file not found" in result.error.lower()
assert "docker-compose.yml" in result.error
@pytest.mark.asyncio
async def test_compose_yml_fallback(self, executor, tmp_path):
"""Test that compose.yml is used as fallback."""
stack_dir = tmp_path / "nextcloud"
stack_dir.mkdir()
(stack_dir / "compose.yml").write_text("version: '3'\n")
executor.NEXTCLOUD_STACK_DIR = str(stack_dir)
with patch.object(executor, "_run_occ_command") as mock_run:
mock_run.return_value = (0, "success", "")
result = await executor.execute({
"public_url": "https://cloud.example.com"
})
assert result.success is True
# =========================================================================
# COMMAND FAILURE TESTS
# =========================================================================
@pytest.mark.asyncio
async def test_first_command_fails(self, executor_with_temp_stack):
"""Test that failure on first occ command returns partial results."""
with patch.object(executor_with_temp_stack, "_run_occ_command") as mock_run:
mock_run.return_value = (1, "", "Error: Unable to write config")
result = await executor_with_temp_stack.execute({
"public_url": "https://cloud.example.com"
})
assert result.success is False
assert result.data["commands_executed"] == 1
assert result.data["failed_command"] == "overwritehost"
assert result.data["failed_args"] == ["config:system:set", "overwritehost", "--value=cloud.example.com"]
assert "overwritehost" in result.data["logs"]
assert "occ overwritehost failed with exit code 1" in result.error
@pytest.mark.asyncio
async def test_second_command_fails(self, executor_with_temp_stack):
"""Test that failure on second occ command returns partial results."""
call_count = 0
async def mock_occ(compose_file, occ_args, timeout):
nonlocal call_count
call_count += 1
if call_count == 1:
return (0, "success", "") # First succeeds
return (1, "", "Error setting overwriteprotocol") # Second fails
with patch.object(executor_with_temp_stack, "_run_occ_command", side_effect=mock_occ):
result = await executor_with_temp_stack.execute({
"public_url": "https://cloud.example.com"
})
assert result.success is False
assert result.data["commands_executed"] == 2
assert result.data["failed_command"] == "overwriteprotocol"
assert "overwritehost" in result.data["logs"]
assert "overwriteprotocol" in result.data["logs"]
assert "overwrite.cli.url" not in result.data["logs"]
@pytest.mark.asyncio
async def test_third_command_fails(self, executor_with_temp_stack):
"""Test that failure on third occ command returns partial results."""
call_count = 0
async def mock_occ(compose_file, occ_args, timeout):
nonlocal call_count
call_count += 1
if call_count <= 2:
return (0, "success", "") # First two succeed
return (1, "", "Error setting overwrite.cli.url") # Third fails
with patch.object(executor_with_temp_stack, "_run_occ_command", side_effect=mock_occ):
result = await executor_with_temp_stack.execute({
"public_url": "https://cloud.example.com"
})
assert result.success is False
assert result.data["commands_executed"] == 3
assert result.data["failed_command"] == "overwrite.cli.url"
assert "overwritehost" in result.data["logs"]
assert "overwriteprotocol" in result.data["logs"]
assert "overwrite.cli.url" in result.data["logs"]
# =========================================================================
# TIMEOUT HANDLING TESTS
# =========================================================================
@pytest.mark.asyncio
async def test_timeout_handling(self, executor_with_temp_stack):
"""Test that asyncio.TimeoutError is caught and returns failure."""
with patch.object(executor_with_temp_stack, "_run_occ_command") as mock_run:
mock_run.side_effect = asyncio.TimeoutError()
result = await executor_with_temp_stack.execute({
"public_url": "https://cloud.example.com"
})
assert result.success is False
assert "timed out" in result.error.lower()
assert result.data["commands_executed"] == 0
assert result.duration_ms is not None
@pytest.mark.asyncio
async def test_timeout_after_partial_success(self, executor_with_temp_stack):
"""Test timeout after some commands succeeded."""
call_count = 0
async def mock_occ(compose_file, occ_args, timeout):
nonlocal call_count
call_count += 1
if call_count == 1:
return (0, "success", "")
raise asyncio.TimeoutError()
with patch.object(executor_with_temp_stack, "_run_occ_command", side_effect=mock_occ):
result = await executor_with_temp_stack.execute({
"public_url": "https://cloud.example.com"
})
assert result.success is False
assert "timed out" in result.error.lower()
assert result.data["commands_executed"] == 1
assert "overwritehost" in result.data["logs"]
# =========================================================================
# EXCEPTION HANDLING TESTS
# =========================================================================
@pytest.mark.asyncio
async def test_unexpected_exception_handling(self, executor_with_temp_stack):
"""Test that unexpected exceptions are caught and returned."""
with patch.object(executor_with_temp_stack, "_run_occ_command") as mock_run:
mock_run.side_effect = RuntimeError("Unexpected docker error")
result = await executor_with_temp_stack.execute({
"public_url": "https://cloud.example.com"
})
assert result.success is False
assert "Unexpected docker error" in result.error
assert result.duration_ms is not None
# =========================================================================
# RESULT STRUCTURE TESTS
# =========================================================================
@pytest.mark.asyncio
async def test_result_structure_on_success(self, executor_with_temp_stack):
"""Test that successful result contains all expected keys."""
with patch.object(executor_with_temp_stack, "_run_occ_command") as mock_run:
mock_run.return_value = (0, "success", "")
result = await executor_with_temp_stack.execute({
"public_url": "https://cloud.example.com"
})
assert result.success is True
assert "public_url" in result.data
assert "host" in result.data
assert "scheme" in result.data
assert "commands_executed" in result.data
assert "logs" in result.data
assert result.error is None
assert result.duration_ms is not None
@pytest.mark.asyncio
async def test_result_structure_on_failure(self, executor_with_temp_stack):
"""Test that failure result contains all expected keys including failed_command."""
with patch.object(executor_with_temp_stack, "_run_occ_command") as mock_run:
mock_run.return_value = (1, "", "error")
result = await executor_with_temp_stack.execute({
"public_url": "https://cloud.example.com"
})
assert result.success is False
assert "public_url" in result.data
assert "host" in result.data
assert "scheme" in result.data
assert "commands_executed" in result.data
assert "failed_command" in result.data
assert "failed_args" in result.data
assert "logs" in result.data
assert result.error is not None
assert result.duration_ms is not None
# =========================================================================
# OUTPUT COMBINATION TESTS
# =========================================================================
def test_combine_output_both(self, executor):
"""Test combining stdout and stderr."""
result = executor._combine_output("stdout content", "stderr content")
assert "stdout content" in result
assert "stderr content" in result
def test_combine_output_stdout_only(self, executor):
"""Test combining with only stdout."""
result = executor._combine_output("stdout content", "")
assert result == "stdout content"
def test_combine_output_stderr_only(self, executor):
"""Test combining with only stderr."""
result = executor._combine_output("", "stderr content")
assert result == "stderr content"
def test_combine_output_empty(self, executor):
"""Test combining empty outputs."""
result = executor._combine_output("", "")
assert result == ""
# =========================================================================
# FIND COMPOSE FILE TESTS
# =========================================================================
def test_find_compose_file_docker_compose_yml(self, executor, tmp_path):
"""Test finding docker-compose.yml."""
(tmp_path / "docker-compose.yml").write_text("version: '3'\n")
result = executor._find_compose_file(tmp_path)
assert result == tmp_path / "docker-compose.yml"
def test_find_compose_file_compose_yml(self, executor, tmp_path):
"""Test finding compose.yml as fallback."""
(tmp_path / "compose.yml").write_text("version: '3'\n")
result = executor._find_compose_file(tmp_path)
assert result == tmp_path / "compose.yml"
def test_find_compose_file_prefers_docker_compose_yml(self, executor, tmp_path):
"""Test that docker-compose.yml is preferred over compose.yml."""
(tmp_path / "docker-compose.yml").write_text("version: '3'\n")
(tmp_path / "compose.yml").write_text("version: '3'\n")
result = executor._find_compose_file(tmp_path)
assert result == tmp_path / "docker-compose.yml"
def test_find_compose_file_not_found(self, executor, tmp_path):
"""Test returning None when no compose file found."""
result = executor._find_compose_file(tmp_path)
assert result is None
# =========================================================================
# CUSTOM TIMEOUT TESTS
# =========================================================================
@pytest.mark.asyncio
async def test_custom_timeout_passed(self, executor_with_temp_stack):
"""Test that custom timeout from payload is passed to _run_occ_command."""
received_timeouts = []
async def capture_timeout(compose_file, occ_args, timeout):
received_timeouts.append(timeout)
return (0, "success", "")
with patch.object(executor_with_temp_stack, "_run_occ_command", side_effect=capture_timeout):
await executor_with_temp_stack.execute({
"public_url": "https://cloud.example.com",
"timeout": 120,
})
assert all(t == 120 for t in received_timeouts)
@pytest.mark.asyncio
async def test_default_timeout_used(self, executor_with_temp_stack):
"""Test that default timeout is used when not specified in payload."""
received_timeouts = []
async def capture_timeout(compose_file, occ_args, timeout):
received_timeouts.append(timeout)
return (0, "success", "")
with patch.object(executor_with_temp_stack, "_run_occ_command", side_effect=capture_timeout):
await executor_with_temp_stack.execute({
"public_url": "https://cloud.example.com",
})
assert all(t == executor_with_temp_stack.DEFAULT_COMMAND_TIMEOUT for t in received_timeouts)