letsbe-hub/app/services/redactor.py

143 lines
3.3 KiB
Python

"""
Strict ALLOW-LIST redaction for telemetry data.
PRIVACY GUARANTEE: If a field is not explicitly allowed, it is removed.
This module ensures NO sensitive data ever reaches the Hub database.
"""
from typing import Any
# Allow list of metadata keys. redact_metadata() keeps an entry only when its
# key is a member of this set; every other key is dropped.
ALLOWED_METADATA_FIELDS = frozenset(
    (
        "component",
        "duration_ms",
        "error_code",
        "status",
        "tool_name",
        "version",
    )
)
# Defense in depth: fragments that mark a key name as potentially sensitive.
# redact_metadata() tests them as substrings of the lower-cased key, so the
# longer spellings ("environment", "certificate") are already covered by the
# shorter ones ("env", "cert"); they are listed for readability.
SENSITIVE_PATTERNS = frozenset(
    (
        # Credentials and authentication
        "password",
        "secret",
        "token",
        "key",
        "credential",
        "auth",
        "cookie",
        "session",
        "bearer",
        "private",
        "cert",
        "certificate",
        # Free-form content carriers
        "content",
        "body",
        "payload",
        "data",
        "file",
        # Configuration and environment
        "env",
        "environment",
        "config",
        "setting",
        # Media and encoded blobs
        "screenshot",
        "image",
        "base64",
        "binary",
    )
)
def redact_metadata(metadata: dict[str, Any] | None) -> dict[str, Any]:
"""
Filter metadata to ONLY allowed fields.
Uses allow-list approach: if not explicitly allowed, it's removed.
This provides defense against accidentally storing sensitive data.
Args:
metadata: Raw metadata from telemetry
Returns:
Filtered metadata with only safe fields
"""
if metadata is None:
return {}
redacted: dict[str, Any] = {}
for key, value in metadata.items():
# Must be in allow list
if key not in ALLOWED_METADATA_FIELDS:
continue
# Defense in depth: reject if key contains sensitive pattern
key_lower = key.lower()
if any(pattern in key_lower for pattern in SENSITIVE_PATTERNS):
continue
# Only primitive types (no nested objects that could hide data)
if isinstance(value, (str, int, float, bool)):
# String length limit to prevent large data blobs
if isinstance(value, str) and len(value) > 100:
continue
redacted[key] = value
return redacted
def validate_tool_name(tool_name: str) -> bool:
    """
    Check that a tool name has an acceptable format.

    A name is accepted when all of the following hold:
      * it starts with one of the known prefixes
        ("sysadmin.", "browser.", "gateway.", "llm.");
      * it is at most 100 characters long;
      * it contains none of: semicolon, single quote, double quote,
        backslash, newline, carriage return, tab, NUL.

    Args:
        tool_name: Tool name to validate.

    Returns:
        True if the name passes every check, False otherwise.
    """
    known_prefixes = ("sysadmin.", "browser.", "gateway.", "llm.")
    forbidden = frozenset((";", "'", '"', "\\", "\n", "\r", "\t", "\0"))

    return (
        tool_name.startswith(known_prefixes)
        and len(tool_name) <= 100
        # True only when the name shares no character with the forbidden set.
        and forbidden.isdisjoint(tool_name)
    )
def sanitize_error_code(error_code: str | None) -> str | None:
"""
Sanitize an error code to ensure it doesn't contain sensitive data.
Args:
error_code: Raw error code
Returns:
Sanitized error code or None if invalid
"""
if error_code is None:
return None
# Length limit
if len(error_code) > 50:
return None
# Must be alphanumeric with underscores/dashes
allowed = set("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_-")
if not all(c in allowed for c in error_code):
return None
return error_code