letsbe-sysadmin/app/executors/playwright_executor.py

330 lines
12 KiB
Python

"""Playwright browser automation executor.
Executes deterministic, scenario-based browser automation tasks.
Each scenario is a reusable workflow registered in the scenario registry.
"""
import time
import uuid
from pathlib import Path
from typing import Any
from playwright.async_api import async_playwright, Route, Request
from app.config import get_settings
from app.executors.base import BaseExecutor, ExecutionResult
from app.playwright_scenarios import get_scenario, get_scenario_names, ScenarioOptions
from app.utils.validation import is_domain_allowed, validate_allowed_domains, ValidationError
class PlaywrightExecutor(BaseExecutor):
"""Browser automation executor using Playwright scenarios.
Executes pre-defined browser automation scenarios with strict security controls.
Each execution creates an isolated browser context with domain restrictions.
Payload:
{
"scenario": "nextcloud_initial_setup", # Required: registered scenario name
"inputs": { # Required: scenario-specific inputs
"base_url": "https://cloud.example.com",
"admin_username": "admin",
"admin_password": "secret123"
},
"options": { # Optional configuration
"timeout_ms": 60000, # Action timeout (default: 60000)
"screenshot_on_failure": true, # Screenshot on fail (default: true)
"screenshot_on_success": false, # Screenshot on success (default: false)
"save_trace": false, # Save trace file (default: false)
"allowed_domains": ["cloud.example.com"] # REQUIRED: domain allowlist
}
}
Security:
- allowed_domains is REQUIRED - blocks all requests to non-listed domains
- Browser runs in headless mode only (not configurable)
- Each execution gets an isolated browser context
- Artifacts are stored in per-task directories
"""
@property
def task_type(self) -> str:
return "PLAYWRIGHT"
async def execute(self, payload: dict[str, Any]) -> ExecutionResult:
"""Execute a Playwright scenario.
Args:
payload: Task payload with scenario, inputs, and options
Returns:
ExecutionResult with scenario output and artifact paths
"""
start_time = time.time()
settings = get_settings()
try:
# Validate required fields
self.validate_payload(payload, ["scenario", "inputs"])
scenario_name = payload["scenario"]
inputs = payload["inputs"]
options_dict = payload.get("options", {})
# Validate allowed_domains is present
allowed_domains = options_dict.get("allowed_domains")
if not allowed_domains:
return ExecutionResult(
success=False,
data={"scenario": scenario_name},
error="Security error: 'allowed_domains' is required in options",
duration_ms=(time.time() - start_time) * 1000,
)
# Validate domain patterns
try:
allowed_domains = validate_allowed_domains(allowed_domains)
except ValidationError as e:
return ExecutionResult(
success=False,
data={"scenario": scenario_name},
error=f"Invalid allowed_domains: {e}",
duration_ms=(time.time() - start_time) * 1000,
)
# Get scenario from registry
scenario = get_scenario(scenario_name)
if scenario is None:
available = get_scenario_names()
return ExecutionResult(
success=False,
data={
"scenario": scenario_name,
"available_scenarios": available,
},
error=f"Unknown scenario: '{scenario_name}'. Available: {available}",
duration_ms=(time.time() - start_time) * 1000,
)
# Validate scenario inputs
missing_inputs = scenario.validate_inputs(inputs)
if missing_inputs:
return ExecutionResult(
success=False,
data={
"scenario": scenario_name,
"missing_inputs": missing_inputs,
"required_inputs": scenario.required_inputs,
},
error=f"Missing required inputs: {missing_inputs}",
duration_ms=(time.time() - start_time) * 1000,
)
# Create artifacts directory for this execution
task_id = str(uuid.uuid4())[:8]
artifacts_dir = Path(settings.playwright_artifacts_dir) / f"task-{task_id}"
artifacts_dir.mkdir(parents=True, exist_ok=True)
# Build scenario options
scenario_options = ScenarioOptions(
timeout_ms=options_dict.get("timeout_ms", settings.playwright_default_timeout_ms),
screenshot_on_failure=options_dict.get("screenshot_on_failure", True),
screenshot_on_success=options_dict.get("screenshot_on_success", False),
save_trace=options_dict.get("save_trace", False),
allowed_domains=allowed_domains,
artifacts_dir=artifacts_dir,
)
self.logger.info(
"playwright_scenario_starting",
scenario=scenario_name,
task_id=task_id,
allowed_domains=allowed_domains,
)
# Execute scenario with browser
result = await self._run_scenario(
scenario=scenario,
inputs=inputs,
options=scenario_options,
task_id=task_id,
)
duration_ms = (time.time() - start_time) * 1000
self.logger.info(
"playwright_scenario_completed",
scenario=scenario_name,
success=result.success,
duration_ms=duration_ms,
)
return ExecutionResult(
success=result.success,
data={
"scenario": scenario_name,
"result": result.data,
"screenshots": result.screenshots,
"artifacts_dir": str(artifacts_dir),
"trace_path": result.trace_path,
},
error=result.error,
duration_ms=duration_ms,
)
except ValueError as e:
# Validation errors
return ExecutionResult(
success=False,
data={},
error=str(e),
duration_ms=(time.time() - start_time) * 1000,
)
except Exception as e:
self.logger.error(
"playwright_executor_error",
error=str(e),
error_type=type(e).__name__,
)
return ExecutionResult(
success=False,
data={},
error=f"Playwright executor error: {e}",
duration_ms=(time.time() - start_time) * 1000,
)
async def _run_scenario(
self,
scenario,
inputs: dict[str, Any],
options: ScenarioOptions,
task_id: str,
):
"""Run a scenario with browser and domain restrictions.
Args:
scenario: The scenario instance to execute
inputs: Scenario inputs
options: Scenario options
task_id: Task identifier for logging
Returns:
ScenarioResult from the scenario execution
"""
from app.playwright_scenarios import ScenarioResult
settings = get_settings()
blocked_requests: list[str] = []
async def route_handler(route: Route, request: Request) -> None:
"""Block requests to non-allowed domains."""
url = request.url
if is_domain_allowed(url, options.allowed_domains):
await route.continue_()
else:
blocked_requests.append(url)
self.logger.warning(
"playwright_blocked_request",
url=url,
task_id=task_id,
)
await route.abort("blockedbyclient")
async with async_playwright() as p:
# Launch browser in headless mode (always)
browser = await p.chromium.launch(
headless=True,
args=[
"--no-sandbox",
"--disable-setuid-sandbox",
"--disable-dev-shm-usage",
"--disable-gpu",
],
)
try:
# Create isolated context
context = await browser.new_context(
viewport={"width": 1280, "height": 720},
user_agent="LetsBe-SysAdmin-Agent/1.0 Playwright",
)
# Set default timeouts
context.set_default_timeout(options.timeout_ms)
context.set_default_navigation_timeout(
settings.playwright_navigation_timeout_ms
)
# Start tracing if enabled
if options.save_trace and options.artifacts_dir:
await context.tracing.start(
screenshots=True,
snapshots=True,
)
# Apply domain restrictions via route interception
await context.route("**/*", route_handler)
# Create page
page = await context.new_page()
try:
# Run scenario setup hook
await scenario.setup(page, options)
# Execute the scenario
result = await scenario.execute(page, inputs, options)
# Take success screenshot if enabled
if options.screenshot_on_success and options.artifacts_dir:
screenshot_path = options.artifacts_dir / "success.png"
await page.screenshot(path=str(screenshot_path))
result.screenshots.append(str(screenshot_path))
except Exception as e:
# Capture failure screenshot
screenshots = []
if options.screenshot_on_failure and options.artifacts_dir:
try:
screenshot_path = options.artifacts_dir / "failure.png"
await page.screenshot(path=str(screenshot_path))
screenshots.append(str(screenshot_path))
except Exception as screenshot_error:
self.logger.warning(
"playwright_screenshot_failed",
error=str(screenshot_error),
)
result = ScenarioResult(
success=False,
data={"blocked_requests": blocked_requests},
screenshots=screenshots,
error=str(e),
)
finally:
# Run scenario teardown hook
try:
await scenario.teardown(page, options)
except Exception as teardown_error:
self.logger.warning(
"playwright_teardown_error",
error=str(teardown_error),
)
# Stop tracing and save
if options.save_trace and options.artifacts_dir:
trace_path = options.artifacts_dir / "trace.zip"
await context.tracing.stop(path=str(trace_path))
result.trace_path = str(trace_path)
# Add blocked requests info
if blocked_requests:
result.data["blocked_requests"] = blocked_requests
return result
finally:
await browser.close()