"""File inspection executor for reading portions of text files.""" import time from typing import Any from app.config import get_settings from app.executors.base import BaseExecutor, ExecutionResult from app.utils.validation import ValidationError, validate_file_path class FileInspectExecutor(BaseExecutor): """Read portions of files for inspection. Security measures: - Path validation against allowed file root (/opt/letsbe) - Directory traversal prevention - File must exist (no blind path probing) - Read-only operation (no file modification) - Byte limit enforced (max 1MB) Payload: { "path": "/opt/letsbe/env/chatwoot.env", "max_bytes": 4096 # optional, default 4096, max 1MB } Result (success): { "path": "/opt/letsbe/env/chatwoot.env", "bytes_read": 123, "truncated": false, "content": "..." } """ # Default and maximum byte limits DEFAULT_MAX_BYTES = 4096 ABSOLUTE_MAX_BYTES = 1_048_576 # 1 MB @property def task_type(self) -> str: return "FILE_INSPECT" async def execute(self, payload: dict[str, Any]) -> ExecutionResult: """Read file content up to max_bytes. Args: payload: Must contain "path", optionally "max_bytes" Returns: ExecutionResult with file content and metadata """ # Path is always required if "path" not in payload: raise ValueError("Missing required field: path") settings = get_settings() raw_path = payload["path"] max_bytes = payload.get("max_bytes", self.DEFAULT_MAX_BYTES) # Validate max_bytes is a valid integer try: max_bytes_int = int(max_bytes) except (TypeError, ValueError): return ExecutionResult( success=False, data={}, error=f"Invalid max_bytes value: {max_bytes!r}", ) # Validate max_bytes is within allowed range if max_bytes_int <= 0 or max_bytes_int > self.ABSOLUTE_MAX_BYTES: return ExecutionResult( success=False, data={}, error=f"max_bytes must be between 1 and {self.ABSOLUTE_MAX_BYTES}", ) # Validate path is under allowed file root try: validated_path = validate_file_path( raw_path, settings.allowed_file_root, must_exist=True, ) except ValidationError as e: self.logger.warning("file_path_validation_failed", path=raw_path, error=str(e)) return ExecutionResult( success=False, data={}, error=f"Path validation failed: {e}", ) self.logger.info( "file_inspecting", path=str(validated_path), max_bytes=max_bytes_int, ) start_time = time.time() try: # Read up to max_bytes + 1 to detect truncation with validated_path.open("rb") as f: content_bytes = f.read(max_bytes_int + 1) truncated = len(content_bytes) > max_bytes_int if truncated: content_bytes = content_bytes[:max_bytes_int] # Decode as UTF-8 with errors replaced content_text = content_bytes.decode("utf-8", errors="replace") duration_ms = (time.time() - start_time) * 1000 self.logger.info( "file_inspected", path=str(validated_path), bytes_read=len(content_bytes), truncated=truncated, duration_ms=duration_ms, ) return ExecutionResult( success=True, data={ "path": str(validated_path), "bytes_read": len(content_bytes), "truncated": truncated, "content": content_text, }, duration_ms=duration_ms, ) except OSError as e: duration_ms = (time.time() - start_time) * 1000 self.logger.error("file_inspect_read_error", path=str(validated_path), error=str(e)) return ExecutionResult( success=False, data={}, error=f"Failed to read file: {e}", duration_ms=duration_ms, ) except Exception as e: duration_ms = (time.time() - start_time) * 1000 self.logger.error("file_inspect_error", path=str(validated_path), error=str(e)) return ExecutionResult( success=False, data={}, error=str(e), duration_ms=duration_ms, )