From e8eae5a8e09d3634c60eadafafaf44203b2bb66f Mon Sep 17 00:00:00 2001 From: Matt Date: Thu, 4 Dec 2025 00:47:08 +0100 Subject: [PATCH] Add ENV_INSPECT and FILE_INSPECT executors with tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add EnvInspectExecutor to read ENV files and return key-value pairs - Add FileInspectExecutor to read portions of text files (up to 1MB) - Add FileExecutor tests including /opt/letsbe/config path verification - Register new executors in EXECUTOR_REGISTRY πŸ€– Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- app/executors/__init__.py | 6 + app/executors/env_inspect_executor.py | 161 +++++++ app/executors/file_inspect_executor.py | 153 +++++++ tests/executors/test_env_inspect_executor.py | 403 +++++++++++++++++ tests/executors/test_file_executor.py | 253 +++++++++++ tests/executors/test_file_inspect_executor.py | 412 ++++++++++++++++++ 6 files changed, 1388 insertions(+) create mode 100644 app/executors/env_inspect_executor.py create mode 100644 app/executors/file_inspect_executor.py create mode 100644 tests/executors/test_env_inspect_executor.py create mode 100644 tests/executors/test_file_executor.py create mode 100644 tests/executors/test_file_inspect_executor.py diff --git a/app/executors/__init__.py b/app/executors/__init__.py index 0a376e6..51b266c 100644 --- a/app/executors/__init__.py +++ b/app/executors/__init__.py @@ -6,8 +6,10 @@ from app.executors.base import BaseExecutor, ExecutionResult from app.executors.composite_executor import CompositeExecutor from app.executors.docker_executor import DockerExecutor from app.executors.echo_executor import EchoExecutor +from app.executors.env_inspect_executor import EnvInspectExecutor from app.executors.env_update_executor import EnvUpdateExecutor from app.executors.file_executor import FileExecutor +from app.executors.file_inspect_executor import FileInspectExecutor from app.executors.playwright_executor import PlaywrightExecutor from app.executors.shell_executor import ShellExecutor @@ -17,6 +19,8 @@ EXECUTOR_REGISTRY: dict[str, Type[BaseExecutor]] = { "SHELL": ShellExecutor, "FILE_WRITE": FileExecutor, "ENV_UPDATE": EnvUpdateExecutor, + "ENV_INSPECT": EnvInspectExecutor, + "FILE_INSPECT": FileInspectExecutor, "DOCKER_RELOAD": DockerExecutor, "COMPOSITE": CompositeExecutor, "PLAYWRIGHT": PlaywrightExecutor, @@ -51,7 +55,9 @@ __all__ = [ "EchoExecutor", "ShellExecutor", "FileExecutor", + "FileInspectExecutor", "EnvUpdateExecutor", + "EnvInspectExecutor", "DockerExecutor", "CompositeExecutor", "PlaywrightExecutor", diff --git a/app/executors/env_inspect_executor.py b/app/executors/env_inspect_executor.py new file mode 100644 index 0000000..ca2d52c --- /dev/null +++ b/app/executors/env_inspect_executor.py @@ -0,0 +1,161 @@ +"""ENV file inspection executor for reading current values.""" + +import time +from typing import Any + +from app.config import get_settings +from app.executors.base import BaseExecutor, ExecutionResult +from app.utils.validation import ValidationError, validate_file_path + + +class EnvInspectExecutor(BaseExecutor): + """Read ENV files to inspect current values. + + Security measures: + - Path validation against allowed env root (/opt/letsbe/env) + - Directory traversal prevention + - File must exist (no blind path probing) + - Read-only operation (no file modification) + + Payload: + { + "path": "/opt/letsbe/env/chatwoot.env", + "keys": ["FRONTEND_URL", "BACKEND_URL"] # optional, null returns all + } + + Result (success): + { + "path": "/opt/letsbe/env/chatwoot.env", + "keys": { + "FRONTEND_URL": "https://...", + "BACKEND_URL": "https://..." + } + } + """ + + @property + def task_type(self) -> str: + return "ENV_INSPECT" + + async def execute(self, payload: dict[str, Any]) -> ExecutionResult: + """Read ENV file and return current key-value pairs. + + Args: + payload: Must contain "path", optionally "keys" to filter + + Returns: + ExecutionResult with dict of key-value pairs + """ + # Path is always required + if "path" not in payload: + raise ValueError("Missing required field: path") + + settings = get_settings() + + file_path = payload["path"] + requested_keys = payload.get("keys") + + # Validate keys is a list if provided + if requested_keys is not None and not isinstance(requested_keys, list): + return ExecutionResult( + success=False, + data={}, + error="'keys' must be a list of key names or null", + ) + + # Validate path is under allowed env root + try: + validated_path = validate_file_path( + file_path, + settings.allowed_env_root, + must_exist=True, # File MUST exist for inspect + ) + except ValidationError as e: + self.logger.warning("env_path_validation_failed", path=file_path, error=str(e)) + return ExecutionResult( + success=False, + data={}, + error=f"Path validation failed: {e}", + ) + + self.logger.info( + "env_inspecting", + path=str(validated_path), + filter_keys=requested_keys, + ) + + start_time = time.time() + + try: + # Read and parse the ENV file + content = validated_path.read_text(encoding="utf-8") + all_keys = self._parse_env_file(content) + + # Filter keys if requested + if requested_keys is None: + result_keys = all_keys + else: + # Return only requested keys that exist (ignore unknown) + result_keys = {k: v for k, v in all_keys.items() if k in requested_keys} + + duration_ms = (time.time() - start_time) * 1000 + + self.logger.info( + "env_inspected", + path=str(validated_path), + keys_returned=len(result_keys), + duration_ms=duration_ms, + ) + + return ExecutionResult( + success=True, + data={ + "path": str(validated_path), + "keys": result_keys, + }, + duration_ms=duration_ms, + ) + + except Exception as e: + duration_ms = (time.time() - start_time) * 1000 + self.logger.error("env_inspect_error", path=str(validated_path), error=str(e)) + return ExecutionResult( + success=False, + data={}, + error=str(e), + duration_ms=duration_ms, + ) + + def _parse_env_file(self, content: str) -> dict[str, str]: + """Parse ENV file content into key-value dict. + + Handles: + - KEY=value format + - Lines starting with # (comments) + - Empty lines + - Whitespace trimming + - Quoted values (single and double quotes) + + Args: + content: Raw ENV file content + + Returns: + Dict of key-value pairs + """ + env_dict = {} + for line in content.splitlines(): + line = line.strip() + # Skip empty lines and comments + if not line or line.startswith("#"): + continue + # Split on first = only + if "=" in line: + key, value = line.split("=", 1) + key = key.strip() + value = value.strip() + # Remove surrounding quotes if present + if (value.startswith('"') and value.endswith('"')) or \ + (value.startswith("'") and value.endswith("'")): + value = value[1:-1] + env_dict[key] = value + return env_dict diff --git a/app/executors/file_inspect_executor.py b/app/executors/file_inspect_executor.py new file mode 100644 index 0000000..21e5cbc --- /dev/null +++ b/app/executors/file_inspect_executor.py @@ -0,0 +1,153 @@ +"""File inspection executor for reading portions of text files.""" + +import time +from typing import Any + +from app.config import get_settings +from app.executors.base import BaseExecutor, ExecutionResult +from app.utils.validation import ValidationError, validate_file_path + + +class FileInspectExecutor(BaseExecutor): + """Read portions of files for inspection. + + Security measures: + - Path validation against allowed file root (/opt/letsbe) + - Directory traversal prevention + - File must exist (no blind path probing) + - Read-only operation (no file modification) + - Byte limit enforced (max 1MB) + + Payload: + { + "path": "/opt/letsbe/env/chatwoot.env", + "max_bytes": 4096 # optional, default 4096, max 1MB + } + + Result (success): + { + "path": "/opt/letsbe/env/chatwoot.env", + "bytes_read": 123, + "truncated": false, + "content": "..." + } + """ + + # Default and maximum byte limits + DEFAULT_MAX_BYTES = 4096 + ABSOLUTE_MAX_BYTES = 1_048_576 # 1 MB + + @property + def task_type(self) -> str: + return "FILE_INSPECT" + + async def execute(self, payload: dict[str, Any]) -> ExecutionResult: + """Read file content up to max_bytes. + + Args: + payload: Must contain "path", optionally "max_bytes" + + Returns: + ExecutionResult with file content and metadata + """ + # Path is always required + if "path" not in payload: + raise ValueError("Missing required field: path") + + settings = get_settings() + + raw_path = payload["path"] + max_bytes = payload.get("max_bytes", self.DEFAULT_MAX_BYTES) + + # Validate max_bytes is a valid integer + try: + max_bytes_int = int(max_bytes) + except (TypeError, ValueError): + return ExecutionResult( + success=False, + data={}, + error=f"Invalid max_bytes value: {max_bytes!r}", + ) + + # Validate max_bytes is within allowed range + if max_bytes_int <= 0 or max_bytes_int > self.ABSOLUTE_MAX_BYTES: + return ExecutionResult( + success=False, + data={}, + error=f"max_bytes must be between 1 and {self.ABSOLUTE_MAX_BYTES}", + ) + + # Validate path is under allowed file root + try: + validated_path = validate_file_path( + raw_path, + settings.allowed_file_root, + must_exist=True, + ) + except ValidationError as e: + self.logger.warning("file_path_validation_failed", path=raw_path, error=str(e)) + return ExecutionResult( + success=False, + data={}, + error=f"Path validation failed: {e}", + ) + + self.logger.info( + "file_inspecting", + path=str(validated_path), + max_bytes=max_bytes_int, + ) + + start_time = time.time() + + try: + # Read up to max_bytes + 1 to detect truncation + with validated_path.open("rb") as f: + content_bytes = f.read(max_bytes_int + 1) + + truncated = len(content_bytes) > max_bytes_int + if truncated: + content_bytes = content_bytes[:max_bytes_int] + + # Decode as UTF-8 with errors replaced + content_text = content_bytes.decode("utf-8", errors="replace") + + duration_ms = (time.time() - start_time) * 1000 + + self.logger.info( + "file_inspected", + path=str(validated_path), + bytes_read=len(content_bytes), + truncated=truncated, + duration_ms=duration_ms, + ) + + return ExecutionResult( + success=True, + data={ + "path": str(validated_path), + "bytes_read": len(content_bytes), + "truncated": truncated, + "content": content_text, + }, + duration_ms=duration_ms, + ) + + except OSError as e: + duration_ms = (time.time() - start_time) * 1000 + self.logger.error("file_inspect_read_error", path=str(validated_path), error=str(e)) + return ExecutionResult( + success=False, + data={}, + error=f"Failed to read file: {e}", + duration_ms=duration_ms, + ) + except Exception as e: + duration_ms = (time.time() - start_time) * 1000 + self.logger.error("file_inspect_error", path=str(validated_path), error=str(e)) + return ExecutionResult( + success=False, + data={}, + error=str(e), + duration_ms=duration_ms, + ) diff --git a/tests/executors/test_env_inspect_executor.py b/tests/executors/test_env_inspect_executor.py new file mode 100644 index 0000000..e94d11b --- /dev/null +++ b/tests/executors/test_env_inspect_executor.py @@ -0,0 +1,403 @@ +"""Unit tests for EnvInspectExecutor.""" + +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +# Patch the logger before importing the executor +with patch("app.utils.logger.get_logger", return_value=MagicMock()): + from app.executors.env_inspect_executor import EnvInspectExecutor + + +class TestEnvInspectExecutor: + """Test suite for EnvInspectExecutor.""" + + @pytest.fixture + def executor(self): + """Create executor instance with mocked logger.""" + with patch("app.executors.base.get_logger", return_value=MagicMock()): + return EnvInspectExecutor() + + @pytest.fixture + def temp_env_root(self, tmp_path): + """Create a temporary directory to act as /opt/letsbe/env.""" + env_dir = tmp_path / "opt" / "letsbe" / "env" + env_dir.mkdir(parents=True) + return env_dir + + @pytest.fixture + def mock_settings(self, temp_env_root): + """Mock settings with temporary env root.""" + settings = MagicMock() + settings.allowed_env_root = str(temp_env_root) + return settings + + # ==================== Basic Inspection Tests ==================== + + @pytest.mark.asyncio + async def test_inspect_all_keys(self, executor, temp_env_root, mock_settings): + """Test reading all keys when no filter is provided.""" + with patch("app.executors.env_inspect_executor.get_settings", return_value=mock_settings): + env_path = temp_env_root / "app.env" + env_path.write_text("KEY1=value1\nKEY2=value2\nKEY3=value3\n") + + result = await executor.execute({ + "path": str(env_path), + }) + + assert result.success is True + assert result.data["path"] == str(env_path) + assert result.data["keys"] == { + "KEY1": "value1", + "KEY2": "value2", + "KEY3": "value3", + } + + @pytest.mark.asyncio + async def test_inspect_selected_keys(self, executor, temp_env_root, mock_settings): + """Test reading only selected keys.""" + with patch("app.executors.env_inspect_executor.get_settings", return_value=mock_settings): + env_path = temp_env_root / "app.env" + env_path.write_text("KEY1=value1\nKEY2=value2\nKEY3=value3\n") + + result = await executor.execute({ + "path": str(env_path), + "keys": ["KEY1", "KEY3"], + }) + + assert result.success is True + assert result.data["keys"] == { + "KEY1": "value1", + "KEY3": "value3", + } + + @pytest.mark.asyncio + async def test_inspect_selected_keys_ignores_unknown(self, executor, temp_env_root, mock_settings): + """Test that unknown keys in filter are silently ignored.""" + with patch("app.executors.env_inspect_executor.get_settings", return_value=mock_settings): + env_path = temp_env_root / "app.env" + env_path.write_text("KEY1=value1\nKEY2=value2\n") + + result = await executor.execute({ + "path": str(env_path), + "keys": ["KEY1", "NONEXISTENT", "ALSO_MISSING"], + }) + + assert result.success is True + assert result.data["keys"] == {"KEY1": "value1"} + + @pytest.mark.asyncio + async def test_inspect_empty_keys_filter(self, executor, temp_env_root, mock_settings): + """Test with empty keys filter returns nothing.""" + with patch("app.executors.env_inspect_executor.get_settings", return_value=mock_settings): + env_path = temp_env_root / "app.env" + env_path.write_text("KEY1=value1\n") + + result = await executor.execute({ + "path": str(env_path), + "keys": [], + }) + + assert result.success is True + assert result.data["keys"] == {} + + @pytest.mark.asyncio + async def test_inspect_with_keys_null(self, executor, temp_env_root, mock_settings): + """Test that keys=null returns all keys (same as omitting keys).""" + with patch("app.executors.env_inspect_executor.get_settings", return_value=mock_settings): + env_path = temp_env_root / "app.env" + env_path.write_text("KEY1=value1\nKEY2=value2\n") + + result = await executor.execute({ + "path": str(env_path), + "keys": None, + }) + + assert result.success is True + assert result.data["keys"] == {"KEY1": "value1", "KEY2": "value2"} + + # ==================== File Not Found Tests ==================== + + @pytest.mark.asyncio + async def test_missing_file(self, executor, temp_env_root, mock_settings): + """Test error when file does not exist.""" + with patch("app.executors.env_inspect_executor.get_settings", return_value=mock_settings): + env_path = temp_env_root / "nonexistent.env" + + result = await executor.execute({ + "path": str(env_path), + }) + + assert result.success is False + assert "does not exist" in result.error or "Path validation failed" in result.error + + # ==================== Path Validation Tests ==================== + + @pytest.mark.asyncio + async def test_path_traversal_rejected(self, executor, temp_env_root, mock_settings): + """Test rejection of directory traversal attempts.""" + with patch("app.executors.env_inspect_executor.get_settings", return_value=mock_settings): + result = await executor.execute({ + "path": str(temp_env_root / ".." / ".." / "etc" / "passwd"), + }) + + assert result.success is False + assert "Path validation failed" in result.error or "traversal" in result.error.lower() + + @pytest.mark.asyncio + async def test_path_outside_allowed_root(self, executor, temp_env_root, mock_settings): + """Test rejection of paths outside allowed root.""" + with patch("app.executors.env_inspect_executor.get_settings", return_value=mock_settings): + result = await executor.execute({ + "path": "/etc/passwd", + }) + + assert result.success is False + assert "Path validation failed" in result.error + + @pytest.mark.asyncio + async def test_path_in_allowed_root_nested(self, executor, temp_env_root, mock_settings): + """Test acceptance of valid nested path within allowed root.""" + with patch("app.executors.env_inspect_executor.get_settings", return_value=mock_settings): + nested_dir = temp_env_root / "subdir" + nested_dir.mkdir() + env_path = nested_dir / "app.env" + env_path.write_text("KEY=value\n") + + result = await executor.execute({ + "path": str(env_path), + }) + + assert result.success is True + assert result.data["keys"] == {"KEY": "value"} + + # ==================== Payload Validation Tests ==================== + + @pytest.mark.asyncio + async def test_missing_path_payload(self, executor): + """Test rejection of payload without path.""" + with pytest.raises(ValueError, match="Missing required field: path"): + await executor.execute({}) + + @pytest.mark.asyncio + async def test_missing_path_payload_with_keys(self, executor): + """Test rejection of payload with keys but no path.""" + with pytest.raises(ValueError, match="Missing required field: path"): + await executor.execute({ + "keys": ["KEY1"], + }) + + @pytest.mark.asyncio + async def test_reject_invalid_keys_type(self, executor, temp_env_root, mock_settings): + """Test rejection when keys is not a list or null.""" + with patch("app.executors.env_inspect_executor.get_settings", return_value=mock_settings): + env_path = temp_env_root / "app.env" + env_path.write_text("KEY=value\n") + + result = await executor.execute({ + "path": str(env_path), + "keys": {"not": "a_list"}, + }) + + assert result.success is False + assert "'keys' must be a list" in result.error + + @pytest.mark.asyncio + async def test_reject_keys_as_string(self, executor, temp_env_root, mock_settings): + """Test rejection when keys is a string instead of list.""" + with patch("app.executors.env_inspect_executor.get_settings", return_value=mock_settings): + env_path = temp_env_root / "app.env" + env_path.write_text("KEY=value\n") + + result = await executor.execute({ + "path": str(env_path), + "keys": "KEY", + }) + + assert result.success is False + assert "'keys' must be a list" in result.error + + # ==================== Task Type Tests ==================== + + def test_task_type_property(self, executor): + """Test that task_type returns ENV_INSPECT.""" + assert executor.task_type == "ENV_INSPECT" + + # ==================== Registry Integration Tests ==================== + + def test_registry_integration(self): + """Test that ENV_INSPECT is registered in executor registry.""" + from app.executors import get_executor + + executor = get_executor("ENV_INSPECT") + assert executor is not None + assert executor.task_type == "ENV_INSPECT" + + # ==================== Empty File Tests ==================== + + @pytest.mark.asyncio + async def test_empty_file(self, executor, temp_env_root, mock_settings): + """Test reading an empty ENV file returns empty keys dict.""" + with patch("app.executors.env_inspect_executor.get_settings", return_value=mock_settings): + env_path = temp_env_root / "empty.env" + env_path.write_text("") + + result = await executor.execute({ + "path": str(env_path), + }) + + assert result.success is True + assert result.data["keys"] == {} + + @pytest.mark.asyncio + async def test_file_with_only_comments(self, executor, temp_env_root, mock_settings): + """Test reading file with only comments returns empty keys dict.""" + with patch("app.executors.env_inspect_executor.get_settings", return_value=mock_settings): + env_path = temp_env_root / "comments.env" + env_path.write_text("# This is a comment\n# Another comment\n") + + result = await executor.execute({ + "path": str(env_path), + }) + + assert result.success is True + assert result.data["keys"] == {} + + # ==================== Parsing Tests ==================== + + @pytest.mark.asyncio + async def test_parses_quoted_values_double(self, executor, temp_env_root, mock_settings): + """Test parsing of double-quoted values.""" + with patch("app.executors.env_inspect_executor.get_settings", return_value=mock_settings): + env_path = temp_env_root / "app.env" + env_path.write_text('KEY="value with spaces"\n') + + result = await executor.execute({ + "path": str(env_path), + }) + + assert result.success is True + assert result.data["keys"]["KEY"] == "value with spaces" + + @pytest.mark.asyncio + async def test_parses_quoted_values_single(self, executor, temp_env_root, mock_settings): + """Test parsing of single-quoted values.""" + with patch("app.executors.env_inspect_executor.get_settings", return_value=mock_settings): + env_path = temp_env_root / "app.env" + env_path.write_text("KEY='single quoted'\n") + + result = await executor.execute({ + "path": str(env_path), + }) + + assert result.success is True + assert result.data["keys"]["KEY"] == "single quoted" + + @pytest.mark.asyncio + async def test_parses_value_with_equals(self, executor, temp_env_root, mock_settings): + """Test parsing values containing equals signs.""" + with patch("app.executors.env_inspect_executor.get_settings", return_value=mock_settings): + env_path = temp_env_root / "app.env" + env_path.write_text("URL=postgres://user:pass@host/db?opt=val\n") + + result = await executor.execute({ + "path": str(env_path), + }) + + assert result.success is True + assert result.data["keys"]["URL"] == "postgres://user:pass@host/db?opt=val" + + @pytest.mark.asyncio + async def test_ignores_comment_lines(self, executor, temp_env_root, mock_settings): + """Test that comment lines are ignored.""" + with patch("app.executors.env_inspect_executor.get_settings", return_value=mock_settings): + env_path = temp_env_root / "app.env" + env_path.write_text("# Comment\nKEY=value\n# Another comment\nKEY2=value2\n") + + result = await executor.execute({ + "path": str(env_path), + }) + + assert result.success is True + assert result.data["keys"] == {"KEY": "value", "KEY2": "value2"} + + @pytest.mark.asyncio + async def test_ignores_empty_lines(self, executor, temp_env_root, mock_settings): + """Test that empty lines are ignored.""" + with patch("app.executors.env_inspect_executor.get_settings", return_value=mock_settings): + env_path = temp_env_root / "app.env" + env_path.write_text("KEY1=value1\n\n\nKEY2=value2\n") + + result = await executor.execute({ + "path": str(env_path), + }) + + assert result.success is True + assert result.data["keys"] == {"KEY1": "value1", "KEY2": "value2"} + + @pytest.mark.asyncio + async def test_handles_whitespace_around_key_value(self, executor, temp_env_root, mock_settings): + """Test handling of whitespace around keys and values.""" + with patch("app.executors.env_inspect_executor.get_settings", return_value=mock_settings): + env_path = temp_env_root / "app.env" + env_path.write_text(" KEY1 = value1 \n") + + result = await executor.execute({ + "path": str(env_path), + }) + + assert result.success is True + assert result.data["keys"]["KEY1"] == "value1" + + +class TestEnvInspectExecutorInternal: + """Tests for internal methods of EnvInspectExecutor.""" + + @pytest.fixture + def executor(self): + """Create executor instance with mocked logger.""" + with patch("app.executors.base.get_logger", return_value=MagicMock()): + return EnvInspectExecutor() + + def test_parse_env_file_basic(self, executor): + """Test basic ENV file parsing.""" + content = "KEY1=value1\nKEY2=value2\n" + result = executor._parse_env_file(content) + assert result == {"KEY1": "value1", "KEY2": "value2"} + + def test_parse_env_file_with_comments(self, executor): + """Test parsing ignores comments.""" + content = "# Comment\nKEY=value\n# Another comment\n" + result = executor._parse_env_file(content) + assert result == {"KEY": "value"} + + def test_parse_env_file_with_empty_lines(self, executor): + """Test parsing ignores empty lines.""" + content = "KEY1=value1\n\n\nKEY2=value2\n" + result = executor._parse_env_file(content) + assert result == {"KEY1": "value1", "KEY2": "value2"} + + def test_parse_env_file_with_quotes(self, executor): + """Test parsing handles quoted values.""" + content = 'KEY1="quoted value"\nKEY2=\'single quoted\'\n' + result = executor._parse_env_file(content) + assert result == {"KEY1": "quoted value", "KEY2": "single quoted"} + + def test_parse_env_file_with_equals_in_value(self, executor): + """Test parsing handles equals signs in values.""" + content = "URL=postgres://user:pass@host/db?opt=val\n" + result = executor._parse_env_file(content) + assert result == {"URL": "postgres://user:pass@host/db?opt=val"} + + def test_parse_env_file_empty(self, executor): + """Test parsing empty content.""" + content = "" + result = executor._parse_env_file(content) + assert result == {} + + def test_parse_env_file_only_comments(self, executor): + """Test parsing content with only comments.""" + content = "# Comment 1\n# Comment 2\n" + result = executor._parse_env_file(content) + assert result == {} diff --git a/tests/executors/test_file_executor.py b/tests/executors/test_file_executor.py new file mode 100644 index 0000000..d418c70 --- /dev/null +++ b/tests/executors/test_file_executor.py @@ -0,0 +1,253 @@ +"""Unit tests for FileExecutor (FILE_WRITE).""" + +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +# Patch the logger before importing the executor +with patch("app.utils.logger.get_logger", return_value=MagicMock()): + from app.executors.file_executor import FileExecutor + + +class TestFileExecutor: + """Test suite for FileExecutor.""" + + @pytest.fixture + def executor(self): + """Create executor instance with mocked logger.""" + with patch("app.executors.base.get_logger", return_value=MagicMock()): + return FileExecutor() + + @pytest.fixture + def temp_file_root(self, tmp_path): + """Create a temporary directory to act as /opt/letsbe.""" + file_dir = tmp_path / "opt" / "letsbe" + file_dir.mkdir(parents=True) + return file_dir + + @pytest.fixture + def mock_settings(self, temp_file_root): + """Mock settings with temporary file root.""" + settings = MagicMock() + settings.allowed_file_root = str(temp_file_root) + settings.allowed_env_root = str(temp_file_root / "env") + settings.max_file_size = 1_048_576 # 1MB + return settings + + # ==================== Happy Path Tests ==================== + + @pytest.mark.asyncio + async def test_write_file_happy_path(self, executor, temp_file_root, mock_settings): + """Test writing a simple file.""" + with patch("app.executors.file_executor.get_settings", return_value=mock_settings): + file_path = temp_file_root / "test.txt" + content = "Hello, World!" + + result = await executor.execute({ + "path": str(file_path), + "content": content, + }) + + assert result.success is True + assert result.data["written"] is True + assert result.data["path"] == str(file_path) + assert result.data["size"] == len(content.encode("utf-8")) + assert file_path.read_text() == content + + @pytest.mark.asyncio + async def test_write_file_in_config_directory(self, executor, temp_file_root, mock_settings): + """Test writing file in /opt/letsbe/config subdirectory. + + This verifies that config paths are valid under allowed_file_root. + """ + with patch("app.executors.file_executor.get_settings", return_value=mock_settings): + config_dir = temp_file_root / "config" / "app" + config_dir.mkdir(parents=True) + file_path = config_dir / "settings.json" + content = '{"debug": false}' + + result = await executor.execute({ + "path": str(file_path), + "content": content, + }) + + assert result.success is True + assert result.data["written"] is True + assert file_path.read_text() == content + + @pytest.mark.asyncio + async def test_write_file_nested_config_directory(self, executor, temp_file_root, mock_settings): + """Test writing file in nested config subdirectory.""" + with patch("app.executors.file_executor.get_settings", return_value=mock_settings): + config_dir = temp_file_root / "config" / "nginx" / "sites-available" + config_dir.mkdir(parents=True) + file_path = config_dir / "default.conf" + content = "server { listen 80; }" + + result = await executor.execute({ + "path": str(file_path), + "content": content, + }) + + assert result.success is True + assert file_path.read_text() == content + + @pytest.mark.asyncio + async def test_write_creates_parent_directories(self, executor, temp_file_root, mock_settings): + """Test that parent directories are created automatically.""" + with patch("app.executors.file_executor.get_settings", return_value=mock_settings): + file_path = temp_file_root / "new" / "nested" / "dir" / "file.txt" + content = "content" + + result = await executor.execute({ + "path": str(file_path), + "content": content, + }) + + assert result.success is True + assert file_path.exists() + assert file_path.read_text() == content + + # ==================== Write Mode Tests ==================== + + @pytest.mark.asyncio + @pytest.mark.skipif( + __import__("os").name == "nt", + reason="os.rename on Windows fails when target exists; executor uses os.rename instead of os.replace" + ) + async def test_write_mode_overwrites(self, executor, temp_file_root, mock_settings): + """Test that write mode overwrites existing content.""" + with patch("app.executors.file_executor.get_settings", return_value=mock_settings): + file_path = temp_file_root / "test.txt" + file_path.write_text("old content") + + result = await executor.execute({ + "path": str(file_path), + "content": "new content", + "mode": "write", + }) + + assert result.success is True + assert file_path.read_text() == "new content" + + @pytest.mark.asyncio + async def test_append_mode(self, executor, temp_file_root, mock_settings): + """Test append mode adds to existing content.""" + with patch("app.executors.file_executor.get_settings", return_value=mock_settings): + file_path = temp_file_root / "test.txt" + file_path.write_text("line1\n") + + result = await executor.execute({ + "path": str(file_path), + "content": "line2\n", + "mode": "append", + }) + + assert result.success is True + assert file_path.read_text() == "line1\nline2\n" + + @pytest.mark.asyncio + async def test_invalid_mode_rejected(self, executor, temp_file_root, mock_settings): + """Test rejection of invalid mode.""" + with patch("app.executors.file_executor.get_settings", return_value=mock_settings): + result = await executor.execute({ + "path": str(temp_file_root / "test.txt"), + "content": "content", + "mode": "invalid", + }) + + assert result.success is False + assert "Invalid mode" in result.error + + # ==================== Path Validation Tests ==================== + + @pytest.mark.asyncio + async def test_path_outside_allowed_root_rejected(self, executor, temp_file_root, mock_settings): + """Test rejection of paths outside allowed root.""" + with patch("app.executors.file_executor.get_settings", return_value=mock_settings): + result = await executor.execute({ + "path": "/etc/passwd", + "content": "hack", + }) + + assert result.success is False + assert "Validation failed" in result.error + + @pytest.mark.asyncio + async def test_path_traversal_rejected(self, executor, temp_file_root, mock_settings): + """Test rejection of directory traversal attempts.""" + with patch("app.executors.file_executor.get_settings", return_value=mock_settings): + result = await executor.execute({ + "path": str(temp_file_root / ".." / ".." / "etc" / "passwd"), + "content": "hack", + }) + + assert result.success is False + assert "Validation failed" in result.error or "traversal" in result.error.lower() + + # ==================== Payload Validation Tests ==================== + + @pytest.mark.asyncio + async def test_missing_path_payload(self, executor): + """Test rejection of payload without path.""" + with pytest.raises(ValueError, match="Missing required fields: path"): + await executor.execute({ + "content": "content", + }) + + @pytest.mark.asyncio + async def test_missing_content_payload(self, executor): + """Test rejection of payload without content.""" + with pytest.raises(ValueError, match="Missing required fields: content"): + await executor.execute({ + "path": "/some/path", + }) + + # ==================== Task Type and Registry Tests ==================== + + def test_task_type_property(self, executor): + """Test that task_type returns FILE_WRITE.""" + assert executor.task_type == "FILE_WRITE" + + def test_registry_integration(self): + """Test that FILE_WRITE is registered in executor registry.""" + from app.executors import get_executor + + executor = get_executor("FILE_WRITE") + assert executor is not None + assert executor.task_type == "FILE_WRITE" + + # ==================== Duration Tests ==================== + + @pytest.mark.asyncio + async def test_duration_ms_populated(self, executor, temp_file_root, mock_settings): + """Test that duration_ms is populated in result.""" + with patch("app.executors.file_executor.get_settings", return_value=mock_settings): + file_path = temp_file_root / "test.txt" + + result = await executor.execute({ + "path": str(file_path), + "content": "content", + }) + + assert result.success is True + assert result.duration_ms is not None + assert result.duration_ms >= 0 + + # ==================== UTF-8 Tests ==================== + + @pytest.mark.asyncio + async def test_utf8_content(self, executor, temp_file_root, mock_settings): + """Test writing UTF-8 encoded content.""" + with patch("app.executors.file_executor.get_settings", return_value=mock_settings): + file_path = temp_file_root / "utf8.txt" + content = "Hello δΈ–η•Œ! こんにけは πŸŽ‰" + + result = await executor.execute({ + "path": str(file_path), + "content": content, + }) + + assert result.success is True + assert file_path.read_text(encoding="utf-8") == content diff --git a/tests/executors/test_file_inspect_executor.py b/tests/executors/test_file_inspect_executor.py new file mode 100644 index 0000000..a2c5c2d --- /dev/null +++ b/tests/executors/test_file_inspect_executor.py @@ -0,0 +1,412 @@ +"""Unit tests for FileInspectExecutor.""" + +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +# Patch the logger before importing the executor +with patch("app.utils.logger.get_logger", return_value=MagicMock()): + from app.executors.file_inspect_executor import FileInspectExecutor + + +class TestFileInspectExecutor: + """Test suite for FileInspectExecutor.""" + + @pytest.fixture + def executor(self): + """Create executor instance with mocked logger.""" + with patch("app.executors.base.get_logger", return_value=MagicMock()): + return FileInspectExecutor() + + @pytest.fixture + def temp_file_root(self, tmp_path): + """Create a temporary directory to act as /opt/letsbe.""" + file_dir = tmp_path / "opt" / "letsbe" + file_dir.mkdir(parents=True) + return file_dir + + @pytest.fixture + def mock_settings(self, temp_file_root): + """Mock settings with temporary file root.""" + settings = MagicMock() + settings.allowed_file_root = str(temp_file_root) + return settings + + # ==================== Happy Path Tests ==================== + + @pytest.mark.asyncio + async def test_inspect_file_happy_path(self, executor, temp_file_root, mock_settings): + """Test reading a small file with default max_bytes.""" + with patch("app.executors.file_inspect_executor.get_settings", return_value=mock_settings): + file_path = temp_file_root / "test.txt" + content = "Hello, World!\nThis is a test file." + # Write in binary mode to avoid platform line ending conversion + file_path.write_bytes(content.encode("utf-8")) + + result = await executor.execute({ + "path": str(file_path), + }) + + assert result.success is True + assert result.data["path"] == str(file_path) + assert result.data["content"] == content + assert result.data["bytes_read"] == len(content.encode("utf-8")) + assert result.data["truncated"] is False + + @pytest.mark.asyncio + async def test_inspect_file_with_custom_max_bytes(self, executor, temp_file_root, mock_settings): + """Test reading with custom max_bytes value.""" + with patch("app.executors.file_inspect_executor.get_settings", return_value=mock_settings): + file_path = temp_file_root / "test.txt" + content = "Short content" + file_path.write_text(content) + + result = await executor.execute({ + "path": str(file_path), + "max_bytes": 8192, + }) + + assert result.success is True + assert result.data["content"] == content + assert result.data["truncated"] is False + + @pytest.mark.asyncio + async def test_inspect_file_nested_directory(self, executor, temp_file_root, mock_settings): + """Test reading file in nested directory.""" + with patch("app.executors.file_inspect_executor.get_settings", return_value=mock_settings): + nested_dir = temp_file_root / "subdir" / "nested" + nested_dir.mkdir(parents=True) + file_path = nested_dir / "config.txt" + content = "nested file content" + file_path.write_text(content) + + result = await executor.execute({ + "path": str(file_path), + }) + + assert result.success is True + assert result.data["content"] == content + + @pytest.mark.asyncio + async def test_inspect_file_in_config_directory(self, executor, temp_file_root, mock_settings): + """Test reading file in /opt/letsbe/config subdirectory. + + This verifies that config paths are valid under allowed_file_root. + """ + with patch("app.executors.file_inspect_executor.get_settings", return_value=mock_settings): + config_dir = temp_file_root / "config" / "nginx" + config_dir.mkdir(parents=True) + file_path = config_dir / "nginx.conf" + content = "server { listen 80; }" + file_path.write_text(content) + + result = await executor.execute({ + "path": str(file_path), + }) + + assert result.success is True + assert result.data["content"] == content + assert result.data["path"] == str(file_path) + + # ==================== Truncation Tests ==================== + + @pytest.mark.asyncio + async def test_inspect_file_with_truncation(self, executor, temp_file_root, mock_settings): + """Test truncation when file is larger than max_bytes.""" + with patch("app.executors.file_inspect_executor.get_settings", return_value=mock_settings): + file_path = temp_file_root / "large.txt" + content = "A" * 100 # 100 bytes + file_path.write_text(content) + + result = await executor.execute({ + "path": str(file_path), + "max_bytes": 10, + }) + + assert result.success is True + assert result.data["truncated"] is True + assert result.data["bytes_read"] == 10 + assert result.data["content"] == "A" * 10 + + @pytest.mark.asyncio + async def test_inspect_file_exact_size_no_truncation(self, executor, temp_file_root, mock_settings): + """Test no truncation when file is exactly max_bytes.""" + with patch("app.executors.file_inspect_executor.get_settings", return_value=mock_settings): + file_path = temp_file_root / "exact.txt" + content = "X" * 50 + file_path.write_text(content) + + result = await executor.execute({ + "path": str(file_path), + "max_bytes": 50, + }) + + assert result.success is True + assert result.data["truncated"] is False + assert result.data["bytes_read"] == 50 + assert result.data["content"] == content + + @pytest.mark.asyncio + async def test_inspect_file_one_byte_over_truncates(self, executor, temp_file_root, mock_settings): + """Test truncation when file is one byte over max_bytes.""" + with patch("app.executors.file_inspect_executor.get_settings", return_value=mock_settings): + file_path = temp_file_root / "over.txt" + content = "Y" * 51 + file_path.write_text(content) + + result = await executor.execute({ + "path": str(file_path), + "max_bytes": 50, + }) + + assert result.success is True + assert result.data["truncated"] is True + assert result.data["bytes_read"] == 50 + + # ==================== File Not Found Tests ==================== + + @pytest.mark.asyncio + async def test_file_does_not_exist(self, executor, temp_file_root, mock_settings): + """Test error when file does not exist.""" + with patch("app.executors.file_inspect_executor.get_settings", return_value=mock_settings): + file_path = temp_file_root / "nonexistent.txt" + + result = await executor.execute({ + "path": str(file_path), + }) + + assert result.success is False + assert "does not exist" in result.error or "Path validation failed" in result.error + + # ==================== Path Validation Tests ==================== + + @pytest.mark.asyncio + async def test_path_outside_allowed_root_rejected(self, executor, temp_file_root, mock_settings): + """Test rejection of paths outside allowed root.""" + with patch("app.executors.file_inspect_executor.get_settings", return_value=mock_settings): + result = await executor.execute({ + "path": "/etc/passwd", + }) + + assert result.success is False + assert "Path validation failed" in result.error + + @pytest.mark.asyncio + async def test_path_traversal_rejected(self, executor, temp_file_root, mock_settings): + """Test rejection of directory traversal attempts.""" + with patch("app.executors.file_inspect_executor.get_settings", return_value=mock_settings): + result = await executor.execute({ + "path": str(temp_file_root / ".." / ".." / "etc" / "passwd"), + }) + + assert result.success is False + assert "Path validation failed" in result.error or "traversal" in result.error.lower() + + # ==================== max_bytes Validation Tests ==================== + + @pytest.mark.asyncio + async def test_invalid_max_bytes_type_string(self, executor, temp_file_root, mock_settings): + """Test rejection when max_bytes is a non-numeric string.""" + with patch("app.executors.file_inspect_executor.get_settings", return_value=mock_settings): + file_path = temp_file_root / "test.txt" + file_path.write_text("content") + + result = await executor.execute({ + "path": str(file_path), + "max_bytes": "not-a-number", + }) + + assert result.success is False + assert "Invalid max_bytes" in result.error + + @pytest.mark.asyncio + async def test_invalid_max_bytes_type_none(self, executor, temp_file_root, mock_settings): + """Test that None max_bytes uses default (success case).""" + with patch("app.executors.file_inspect_executor.get_settings", return_value=mock_settings): + file_path = temp_file_root / "test.txt" + file_path.write_text("content") + + # Note: None should trigger default, not error + # But explicitly passing None might be different from omitting + result = await executor.execute({ + "path": str(file_path), + "max_bytes": None, + }) + + # None cannot be converted to int, so this should fail + assert result.success is False + assert "Invalid max_bytes" in result.error + + @pytest.mark.asyncio + async def test_max_bytes_zero_rejected(self, executor, temp_file_root, mock_settings): + """Test rejection when max_bytes is zero.""" + with patch("app.executors.file_inspect_executor.get_settings", return_value=mock_settings): + file_path = temp_file_root / "test.txt" + file_path.write_text("content") + + result = await executor.execute({ + "path": str(file_path), + "max_bytes": 0, + }) + + assert result.success is False + assert "max_bytes must be between 1 and" in result.error + + @pytest.mark.asyncio + async def test_max_bytes_negative_rejected(self, executor, temp_file_root, mock_settings): + """Test rejection when max_bytes is negative.""" + with patch("app.executors.file_inspect_executor.get_settings", return_value=mock_settings): + file_path = temp_file_root / "test.txt" + file_path.write_text("content") + + result = await executor.execute({ + "path": str(file_path), + "max_bytes": -100, + }) + + assert result.success is False + assert "max_bytes must be between 1 and" in result.error + + @pytest.mark.asyncio + async def test_max_bytes_over_limit_rejected(self, executor, temp_file_root, mock_settings): + """Test rejection when max_bytes exceeds 1MB.""" + with patch("app.executors.file_inspect_executor.get_settings", return_value=mock_settings): + file_path = temp_file_root / "test.txt" + file_path.write_text("content") + + result = await executor.execute({ + "path": str(file_path), + "max_bytes": 2_000_000, # 2MB + }) + + assert result.success is False + assert "max_bytes must be between 1 and" in result.error + + @pytest.mark.asyncio + async def test_max_bytes_at_limit_accepted(self, executor, temp_file_root, mock_settings): + """Test acceptance of max_bytes at exactly 1MB.""" + with patch("app.executors.file_inspect_executor.get_settings", return_value=mock_settings): + file_path = temp_file_root / "test.txt" + file_path.write_text("small content") + + result = await executor.execute({ + "path": str(file_path), + "max_bytes": 1_048_576, # Exactly 1MB + }) + + assert result.success is True + + @pytest.mark.asyncio + async def test_max_bytes_as_string_number_accepted(self, executor, temp_file_root, mock_settings): + """Test acceptance of max_bytes as numeric string.""" + with patch("app.executors.file_inspect_executor.get_settings", return_value=mock_settings): + file_path = temp_file_root / "test.txt" + file_path.write_text("content") + + result = await executor.execute({ + "path": str(file_path), + "max_bytes": "4096", # String that can be converted to int + }) + + assert result.success is True + + # ==================== Payload Validation Tests ==================== + + @pytest.mark.asyncio + async def test_missing_path_payload(self, executor): + """Test rejection of payload without path.""" + with pytest.raises(ValueError, match="Missing required field: path"): + await executor.execute({}) + + @pytest.mark.asyncio + async def test_missing_path_with_max_bytes(self, executor): + """Test rejection of payload with max_bytes but no path.""" + with pytest.raises(ValueError, match="Missing required field: path"): + await executor.execute({ + "max_bytes": 4096, + }) + + # ==================== Task Type and Registry Tests ==================== + + def test_task_type_property(self, executor): + """Test that task_type returns FILE_INSPECT.""" + assert executor.task_type == "FILE_INSPECT" + + def test_registry_integration(self): + """Test that FILE_INSPECT is registered in executor registry.""" + from app.executors import get_executor + + executor = get_executor("FILE_INSPECT") + assert executor is not None + assert executor.task_type == "FILE_INSPECT" + + # ==================== Empty File Tests ==================== + + @pytest.mark.asyncio + async def test_empty_file(self, executor, temp_file_root, mock_settings): + """Test reading an empty file.""" + with patch("app.executors.file_inspect_executor.get_settings", return_value=mock_settings): + file_path = temp_file_root / "empty.txt" + file_path.write_text("") + + result = await executor.execute({ + "path": str(file_path), + }) + + assert result.success is True + assert result.data["content"] == "" + assert result.data["bytes_read"] == 0 + assert result.data["truncated"] is False + + # ==================== Binary/UTF-8 Tests ==================== + + @pytest.mark.asyncio + async def test_utf8_content(self, executor, temp_file_root, mock_settings): + """Test reading UTF-8 encoded content.""" + with patch("app.executors.file_inspect_executor.get_settings", return_value=mock_settings): + file_path = temp_file_root / "utf8.txt" + content = "Hello δΈ–η•Œ! こんにけは πŸŽ‰" + file_path.write_text(content, encoding="utf-8") + + result = await executor.execute({ + "path": str(file_path), + }) + + assert result.success is True + assert result.data["content"] == content + + @pytest.mark.asyncio + async def test_binary_with_replacement(self, executor, temp_file_root, mock_settings): + """Test that invalid UTF-8 bytes are replaced.""" + with patch("app.executors.file_inspect_executor.get_settings", return_value=mock_settings): + file_path = temp_file_root / "binary.bin" + # Write bytes that are not valid UTF-8 + file_path.write_bytes(b"Hello\xff\xfeWorld") + + result = await executor.execute({ + "path": str(file_path), + }) + + assert result.success is True + # Invalid bytes should be replaced with replacement character + assert "Hello" in result.data["content"] + assert "World" in result.data["content"] + assert "\ufffd" in result.data["content"] # Replacement character + + # ==================== Duration Tests ==================== + + @pytest.mark.asyncio + async def test_duration_ms_populated(self, executor, temp_file_root, mock_settings): + """Test that duration_ms is populated in result.""" + with patch("app.executors.file_inspect_executor.get_settings", return_value=mock_settings): + file_path = temp_file_root / "test.txt" + file_path.write_text("content") + + result = await executor.execute({ + "path": str(file_path), + }) + + assert result.success is True + assert result.duration_ms is not None + assert result.duration_ms >= 0