letsbe-sysadmin/app/executors/env_inspect_executor.py

162 lines
5.0 KiB
Python

"""ENV file inspection executor for reading current values."""
import time
from typing import Any
from app.config import get_settings
from app.executors.base import BaseExecutor, ExecutionResult
from app.utils.validation import ValidationError, validate_file_path
class EnvInspectExecutor(BaseExecutor):
    """Read ENV files to inspect current values.

    Security measures:
    - Path validation against allowed env root (/opt/letsbe/env)
    - Directory traversal prevention
    - File must exist (no blind path probing)
    - Read-only operation (no file modification)

    Path checks are delegated to ``validate_file_path`` using
    ``settings.allowed_env_root``; this class never writes to the file.

    Payload:
        {
            "path": "/opt/letsbe/env/chatwoot.env",
            "keys": ["FRONTEND_URL", "BACKEND_URL"]  # optional, null returns all
        }

    Result (success):
        {
            "path": "/opt/letsbe/env/chatwoot.env",
            "keys": {
                "FRONTEND_URL": "https://...",
                "BACKEND_URL": "https://..."
            }
        }
    """

    @property
    def task_type(self) -> str:
        """Return the task type identifier handled by this executor."""
        return "ENV_INSPECT"

    async def execute(self, payload: dict[str, Any]) -> ExecutionResult:
        """Read ENV file and return current key-value pairs.

        Args:
            payload: Must contain "path"; may contain "keys" (a list of key
                names) to restrict the result. A missing or null "keys"
                returns every key found in the file.

        Returns:
            ExecutionResult whose data holds the validated "path" and a
            "keys" dict on success. On an invalid "keys" type, a failed path
            validation, or a read/parse error, a failed ExecutionResult with
            empty data and an error message is returned instead.

        Raises:
            ValueError: If "path" is missing from the payload.
        """
        # Path is always required
        if "path" not in payload:
            raise ValueError("Missing required field: path")

        settings = get_settings()
        file_path = payload["path"]
        requested_keys = payload.get("keys")

        # Validate keys is a list if provided
        if requested_keys is not None and not isinstance(requested_keys, list):
            return ExecutionResult(
                success=False,
                data={},
                error="'keys' must be a list of key names or null",
            )

        # Validate path is under allowed env root
        try:
            validated_path = validate_file_path(
                file_path,
                settings.allowed_env_root,
                must_exist=True,  # File MUST exist for inspect
            )
        except ValidationError as e:
            self.logger.warning("env_path_validation_failed", path=file_path, error=str(e))
            return ExecutionResult(
                success=False,
                data={},
                error=f"Path validation failed: {e}",
            )

        self.logger.info(
            "env_inspecting",
            path=str(validated_path),
            filter_keys=requested_keys,
        )

        start_time = time.time()
        try:
            # Read and parse the ENV file
            content = validated_path.read_text(encoding="utf-8")
            all_keys = self._parse_env_file(content)

            # Filter keys if requested
            if requested_keys is None:
                result_keys = all_keys
            else:
                # Return only requested keys that exist (ignore unknown).
                # Parsed keys are always strings, so non-string entries in
                # the request can never match; dropping them up front lets
                # us build a set once instead of scanning the list per key.
                wanted = {k for k in requested_keys if isinstance(k, str)}
                result_keys = {k: v for k, v in all_keys.items() if k in wanted}

            duration_ms = (time.time() - start_time) * 1000
            self.logger.info(
                "env_inspected",
                path=str(validated_path),
                keys_returned=len(result_keys),
                duration_ms=duration_ms,
            )
            return ExecutionResult(
                success=True,
                data={
                    "path": str(validated_path),
                    "keys": result_keys,
                },
                duration_ms=duration_ms,
            )
        except Exception as e:
            # Executor boundary: report any read/decode/parse failure as a
            # failed result rather than letting it propagate.
            duration_ms = (time.time() - start_time) * 1000
            self.logger.error("env_inspect_error", path=str(validated_path), error=str(e))
            return ExecutionResult(
                success=False,
                data={},
                error=str(e),
                duration_ms=duration_ms,
            )

    def _parse_env_file(self, content: str) -> dict[str, str]:
        """Parse ENV file content into key-value dict.

        Handles:
        - KEY=value format (split on the first "=" only)
        - Lines starting with # (comments)
        - Empty lines
        - Whitespace trimming around keys and values
        - Quoted values (matching single or double quotes are removed)

        Lines without "=" or with an empty key are ignored. When a key
        appears more than once, the last occurrence wins. Inline comments
        and escape sequences are not interpreted.

        Args:
            content: Raw ENV file content

        Returns:
            Dict of key-value pairs
        """
        env_dict: dict[str, str] = {}
        for raw_line in content.splitlines():
            line = raw_line.strip()

            # Skip empty lines and comments
            if not line or line.startswith("#"):
                continue

            # Split on first = only
            key, separator, value = line.partition("=")
            if not separator:
                continue

            key = key.strip()
            if not key:
                # "=value" has no usable name; do not emit an empty key.
                continue

            value = value.strip()
            # Remove surrounding quotes if present. Require at least two
            # characters so a value that is a lone quote character is kept
            # as-is instead of being collapsed to an empty string.
            if len(value) >= 2 and value[0] == value[-1] and value[0] in ('"', "'"):
                value = value[1:-1]

            env_dict[key] = value
        return env_dict