154 lines
4.8 KiB
Python
154 lines
4.8 KiB
Python
"""File inspection executor for reading portions of text files."""
|
|
|
|
import time
|
|
from typing import Any
|
|
|
|
from app.config import get_settings
|
|
from app.executors.base import BaseExecutor, ExecutionResult
|
|
from app.utils.validation import ValidationError, validate_file_path
|
|
|
|
|
|
class FileInspectExecutor(BaseExecutor):
|
|
"""Read portions of files for inspection.
|
|
|
|
Security measures:
|
|
- Path validation against allowed file root (/opt/letsbe)
|
|
- Directory traversal prevention
|
|
- File must exist (no blind path probing)
|
|
- Read-only operation (no file modification)
|
|
- Byte limit enforced (max 1MB)
|
|
|
|
Payload:
|
|
{
|
|
"path": "/opt/letsbe/env/chatwoot.env",
|
|
"max_bytes": 4096 # optional, default 4096, max 1MB
|
|
}
|
|
|
|
Result (success):
|
|
{
|
|
"path": "/opt/letsbe/env/chatwoot.env",
|
|
"bytes_read": 123,
|
|
"truncated": false,
|
|
"content": "..."
|
|
}
|
|
"""
|
|
|
|
# Default and maximum byte limits
|
|
DEFAULT_MAX_BYTES = 4096
|
|
ABSOLUTE_MAX_BYTES = 1_048_576 # 1 MB
|
|
|
|
@property
|
|
def task_type(self) -> str:
|
|
return "FILE_INSPECT"
|
|
|
|
async def execute(self, payload: dict[str, Any]) -> ExecutionResult:
|
|
"""Read file content up to max_bytes.
|
|
|
|
Args:
|
|
payload: Must contain "path", optionally "max_bytes"
|
|
|
|
Returns:
|
|
ExecutionResult with file content and metadata
|
|
"""
|
|
# Path is always required
|
|
if "path" not in payload:
|
|
raise ValueError("Missing required field: path")
|
|
|
|
settings = get_settings()
|
|
|
|
raw_path = payload["path"]
|
|
max_bytes = payload.get("max_bytes", self.DEFAULT_MAX_BYTES)
|
|
|
|
# Validate max_bytes is a valid integer
|
|
try:
|
|
max_bytes_int = int(max_bytes)
|
|
except (TypeError, ValueError):
|
|
return ExecutionResult(
|
|
success=False,
|
|
data={},
|
|
error=f"Invalid max_bytes value: {max_bytes!r}",
|
|
)
|
|
|
|
# Validate max_bytes is within allowed range
|
|
if max_bytes_int <= 0 or max_bytes_int > self.ABSOLUTE_MAX_BYTES:
|
|
return ExecutionResult(
|
|
success=False,
|
|
data={},
|
|
error=f"max_bytes must be between 1 and {self.ABSOLUTE_MAX_BYTES}",
|
|
)
|
|
|
|
# Validate path is under allowed file root
|
|
try:
|
|
validated_path = validate_file_path(
|
|
raw_path,
|
|
settings.allowed_file_root,
|
|
must_exist=True,
|
|
)
|
|
except ValidationError as e:
|
|
self.logger.warning("file_path_validation_failed", path=raw_path, error=str(e))
|
|
return ExecutionResult(
|
|
success=False,
|
|
data={},
|
|
error=f"Path validation failed: {e}",
|
|
)
|
|
|
|
self.logger.info(
|
|
"file_inspecting",
|
|
path=str(validated_path),
|
|
max_bytes=max_bytes_int,
|
|
)
|
|
|
|
start_time = time.time()
|
|
|
|
try:
|
|
# Read up to max_bytes + 1 to detect truncation
|
|
with validated_path.open("rb") as f:
|
|
content_bytes = f.read(max_bytes_int + 1)
|
|
|
|
truncated = len(content_bytes) > max_bytes_int
|
|
if truncated:
|
|
content_bytes = content_bytes[:max_bytes_int]
|
|
|
|
# Decode as UTF-8 with errors replaced
|
|
content_text = content_bytes.decode("utf-8", errors="replace")
|
|
|
|
duration_ms = (time.time() - start_time) * 1000
|
|
|
|
self.logger.info(
|
|
"file_inspected",
|
|
path=str(validated_path),
|
|
bytes_read=len(content_bytes),
|
|
truncated=truncated,
|
|
duration_ms=duration_ms,
|
|
)
|
|
|
|
return ExecutionResult(
|
|
success=True,
|
|
data={
|
|
"path": str(validated_path),
|
|
"bytes_read": len(content_bytes),
|
|
"truncated": truncated,
|
|
"content": content_text,
|
|
},
|
|
duration_ms=duration_ms,
|
|
)
|
|
|
|
except OSError as e:
|
|
duration_ms = (time.time() - start_time) * 1000
|
|
self.logger.error("file_inspect_read_error", path=str(validated_path), error=str(e))
|
|
return ExecutionResult(
|
|
success=False,
|
|
data={},
|
|
error=f"Failed to read file: {e}",
|
|
duration_ms=duration_ms,
|
|
)
|
|
except Exception as e:
|
|
duration_ms = (time.time() - start_time) * 1000
|
|
self.logger.error("file_inspect_error", path=str(validated_path), error=str(e))
|
|
return ExecutionResult(
|
|
success=False,
|
|
data={},
|
|
error=str(e),
|
|
duration_ms=duration_ms,
|
|
)
|