""" Credential reader utility for reading credentials from the credentials.env file. Used by the agent to report credentials back to the Hub during heartbeat. """ import os import stat from pathlib import Path from typing import Optional from app.utils.logger import get_logger logger = get_logger(__name__) # Default credentials file location CREDENTIALS_FILE = Path("/opt/letsbe/env/credentials.env") def check_credentials_permissions(path: str) -> None: """Warn if credentials file has overly permissive permissions.""" try: if not os.path.exists(path): return file_stat = os.stat(path) mode = file_stat.st_mode # Check if group or others have any permissions if mode & (stat.S_IRWXG | stat.S_IRWXO): logger.warning( f"Credentials file {path} has overly permissive permissions " f"(mode={oct(mode)}). Recommended: chmod 600" ) except OSError: pass def read_credentials_file(file_path: Optional[Path] = None) -> dict[str, str]: """ Read credentials.env file and return as a dictionary. Args: file_path: Optional path to credentials file. Defaults to /opt/letsbe/env/credentials.env Returns: Dictionary of key-value pairs from the credentials file """ credentials: dict[str, str] = {} creds_file = file_path or CREDENTIALS_FILE if not creds_file.exists(): logger.debug(f"Credentials file not found: {creds_file}") return credentials check_credentials_permissions(str(creds_file)) try: with open(creds_file, 'r') as f: for line_num, line in enumerate(f, 1): line = line.strip() # Skip empty lines and comments if not line or line.startswith('#'): continue # Parse KEY=VALUE if '=' in line: key, value = line.split('=', 1) credentials[key.strip()] = value.strip() else: logger.warning(f"Invalid line {line_num} in credentials file: {line}") except Exception as e: logger.error(f"Failed to read credentials file: {e}") return credentials def get_portainer_credentials() -> Optional[dict[str, str]]: """ Extract Portainer-specific credentials from the credentials file. Returns: Dictionary with 'username' and 'password' keys, or None if not configured """ creds = read_credentials_file() username = creds.get('PORTAINER_ADMIN_USER') password = creds.get('PORTAINER_ADMIN_PASSWORD') if username and password: return { 'username': username, 'password': password, } return None def get_all_tool_credentials() -> dict[str, dict[str, str]]: """ Extract all tool credentials from the credentials file. Groups credentials by tool name. Returns: Dictionary where keys are tool names and values are credential dictionaries """ creds = read_credentials_file() tool_credentials: dict[str, dict[str, str]] = {} # Portainer credentials portainer = get_portainer_credentials() if portainer: tool_credentials['portainer'] = portainer # Add other tool credentials as needed # Example patterns that might exist in credentials.env: # NEXTCLOUD_ADMIN_USER, NEXTCLOUD_ADMIN_PASSWORD # KEYCLOAK_ADMIN_USER, KEYCLOAK_ADMIN_PASSWORD # etc. tool_mappings = [ ('nextcloud', ['NEXTCLOUD_ADMIN_USER', 'NEXTCLOUD_ADMIN_PASSWORD']), ('keycloak', ['KEYCLOAK_ADMIN_USER', 'KEYCLOAK_ADMIN_PASSWORD']), ('minio', ['MINIO_ROOT_USER', 'MINIO_ROOT_PASSWORD']), ('poste', ['POSTE_ADMIN_EMAIL', 'POSTE_ADMIN_PASSWORD']), ] for tool_name, (user_key, pass_key) in tool_mappings: username = creds.get(user_key) password = creds.get(pass_key) if username and password: tool_credentials[tool_name] = { 'username': username, 'password': password, } return tool_credentials def get_credential_hash() -> str: """ Generate a hash of the credentials file content. Used to detect changes without sending full credentials each time. Returns: SHA-256 hash of the credentials file content, or empty string if file doesn't exist """ import hashlib if not CREDENTIALS_FILE.exists(): return "" try: content = CREDENTIALS_FILE.read_bytes() return hashlib.sha256(content).hexdigest() except Exception as e: logger.error(f"Failed to hash credentials file: {e}") return ""