134 lines
4.5 KiB
Python
134 lines
4.5 KiB
Python
"""Tests for telemetry redactor."""
|
|
|
|
import pytest
|
|
|
|
from app.services.redactor import redact_metadata, sanitize_error_code, validate_tool_name
|
|
|
|
|
|
class TestRedactMetadata:
    """Checks on what redact_metadata keeps and what it drops."""

    def test_allows_safe_fields(self):
        """A payload of tool_name/duration_ms/status/error_code comes back equal."""
        safe_payload = {
            "tool_name": "sysadmin.env_update",
            "duration_ms": 150,
            "status": "success",
            "error_code": "E001",
        }

        assert redact_metadata(safe_payload) == safe_payload

    def test_removes_unknown_fields(self):
        """Unexpected keys are dropped while tool_name survives unchanged."""
        payload = {
            "tool_name": "sysadmin.env_update",
            "password": "secret123",
            "file_content": "sensitive data",
            "custom_field": "value",
        }

        cleaned = redact_metadata(payload)

        # Same order as the keys appear in the payload above.
        for dropped_key in ("password", "file_content", "custom_field"):
            assert dropped_key not in cleaned
        assert cleaned["tool_name"] == "sysadmin.env_update"

    def test_removes_nested_objects(self):
        """A key whose value is a dict does not appear in the output."""
        payload = {
            "tool_name": "sysadmin.env_update",
            "nested": {"password": "secret"},
        }

        cleaned = redact_metadata(payload)

        assert "nested" not in cleaned

    def test_handles_none(self):
        """Passing None yields an empty dict."""
        cleaned = redact_metadata(None)

        assert cleaned == {}

    def test_handles_empty(self):
        """Passing an empty dict yields an empty dict."""
        cleaned = redact_metadata({})

        assert cleaned == {}

    def test_truncates_long_strings(self):
        """A 200-character tool_name is dropped entirely; status is kept."""
        oversized_name = "a" * 200  # expected to exceed the accepted length
        payload = {
            "tool_name": oversized_name,
            "status": "success",
        }

        cleaned = redact_metadata(payload)

        assert "tool_name" not in cleaned
        assert cleaned["status"] == "success"

    def test_defense_in_depth_patterns(self):
        """A key containing 'password' is absent from the output."""
        # NOTE(review): "password_hash" is presumably not on the allow-list
        # either, so this may overlap with test_removes_unknown_fields --
        # confirm against the redactor's allow-list.
        payload = {
            "status": "success",
            "password_hash": "abc123",  # key name embeds 'password'
        }

        cleaned = redact_metadata(payload)

        assert "password_hash" not in cleaned
|
|
|
|
|
|
class TestValidateToolName:
    """Checks on which tool names validate_tool_name accepts or rejects."""

    def test_valid_sysadmin_tool(self):
        """Names under the 'sysadmin.' prefix are accepted."""
        for name in ("sysadmin.env_update", "sysadmin.file_write"):
            assert validate_tool_name(name) is True

    def test_valid_browser_tool(self):
        """Names under the 'browser.' prefix are accepted."""
        for name in ("browser.navigate", "browser.click"):
            assert validate_tool_name(name) is True

    def test_valid_gateway_tool(self):
        """A name under the 'gateway.' prefix is accepted."""
        accepted = validate_tool_name("gateway.proxy")

        assert accepted is True

    def test_invalid_prefix(self):
        """Names with the prefixes 'unknown.' and 'custom.' are rejected."""
        for name in ("unknown.tool", "custom.action"):
            assert validate_tool_name(name) is False

    def test_too_long(self):
        """A 'sysadmin.' name followed by 100 characters is rejected."""
        overlong_name = "sysadmin." + "a" * 100

        assert validate_tool_name(overlong_name) is False

    def test_suspicious_chars(self):
        """Names containing ';', quotes/'=' or a newline are rejected."""
        hostile_names = (
            "sysadmin.tool;drop table",
            "sysadmin.tool'or'1'='1",
            "sysadmin.tool\ninjection",
        )

        for name in hostile_names:
            assert validate_tool_name(name) is False
|
|
|
|
|
|
class TestSanitizeErrorCode:
    """Checks on which error codes sanitize_error_code returns or discards."""

    def test_valid_codes(self):
        """Well-formed codes are returned unchanged."""
        for code in ("E001", "connection_timeout", "AUTH-FAILED"):
            assert sanitize_error_code(code) == code

    def test_none_input(self):
        """None in, None out."""
        sanitized = sanitize_error_code(None)

        assert sanitized is None

    def test_too_long(self):
        """A 60-character code is discarded (None is returned)."""
        overlong_code = "a" * 60

        assert sanitize_error_code(overlong_code) is None

    def test_invalid_chars(self):
        """Codes containing a space, a semicolon or a newline are discarded."""
        rejected_codes = (
            "error code",    # embedded space
            "error;drop",    # embedded semicolon
            "error\ntable",  # embedded newline
        )

        for code in rejected_codes:
            assert sanitize_error_code(code) is None
|