"""Tests for telemetry redactor.""" import pytest from app.services.redactor import redact_metadata, sanitize_error_code, validate_tool_name class TestRedactMetadata: """Tests for redact_metadata function.""" def test_allows_safe_fields(self): """Test that allowed fields pass through.""" metadata = { "tool_name": "sysadmin.env_update", "duration_ms": 150, "status": "success", "error_code": "E001", } result = redact_metadata(metadata) assert result == metadata def test_removes_unknown_fields(self): """Test that unknown fields are removed.""" metadata = { "tool_name": "sysadmin.env_update", "password": "secret123", "file_content": "sensitive data", "custom_field": "value", } result = redact_metadata(metadata) assert "password" not in result assert "file_content" not in result assert "custom_field" not in result assert result["tool_name"] == "sysadmin.env_update" def test_removes_nested_objects(self): """Test that nested objects are removed.""" metadata = { "tool_name": "sysadmin.env_update", "nested": {"password": "secret"}, } result = redact_metadata(metadata) assert "nested" not in result def test_handles_none(self): """Test handling of None input.""" assert redact_metadata(None) == {} def test_handles_empty(self): """Test handling of empty dict.""" assert redact_metadata({}) == {} def test_truncates_long_strings(self): """Test that very long strings are removed.""" metadata = { "tool_name": "a" * 200, # Too long "status": "success", } result = redact_metadata(metadata) assert "tool_name" not in result assert result["status"] == "success" def test_defense_in_depth_patterns(self): """Test that sensitive patterns in field names are caught.""" # Even if somehow in allowed list, sensitive patterns should be caught metadata = { "status": "success", "password_hash": "abc123", # Contains 'password' } result = redact_metadata(metadata) assert "password_hash" not in result class TestValidateToolName: """Tests for validate_tool_name function.""" def test_valid_sysadmin_tool(self): """Test valid sysadmin tool name.""" assert validate_tool_name("sysadmin.env_update") is True assert validate_tool_name("sysadmin.file_write") is True def test_valid_browser_tool(self): """Test valid browser tool name.""" assert validate_tool_name("browser.navigate") is True assert validate_tool_name("browser.click") is True def test_valid_gateway_tool(self): """Test valid gateway tool name.""" assert validate_tool_name("gateway.proxy") is True def test_invalid_prefix(self): """Test that unknown prefixes are rejected.""" assert validate_tool_name("unknown.tool") is False assert validate_tool_name("custom.action") is False def test_too_long(self): """Test that very long names are rejected.""" assert validate_tool_name("sysadmin." + "a" * 100) is False def test_suspicious_chars(self): """Test that suspicious characters are rejected.""" assert validate_tool_name("sysadmin.tool;drop table") is False assert validate_tool_name("sysadmin.tool'or'1'='1") is False assert validate_tool_name("sysadmin.tool\ninjection") is False class TestSanitizeErrorCode: """Tests for sanitize_error_code function.""" def test_valid_codes(self): """Test valid error codes.""" assert sanitize_error_code("E001") == "E001" assert sanitize_error_code("connection_timeout") == "connection_timeout" assert sanitize_error_code("AUTH-FAILED") == "AUTH-FAILED" def test_none_input(self): """Test None input.""" assert sanitize_error_code(None) is None def test_too_long(self): """Test that long codes are rejected.""" assert sanitize_error_code("a" * 60) is None def test_invalid_chars(self): """Test that invalid characters are rejected.""" assert sanitize_error_code("error code") is None # space assert sanitize_error_code("error;drop") is None # semicolon assert sanitize_error_code("error\ntable") is None # newline