LetsBeBiz-Redesign/letsbe-sysadmin-agent/app/utils/credential_reader.py

157 lines
4.6 KiB
Python

"""
Credential reader utility for reading credentials from the credentials.env file.
Used by the agent to report credentials back to the Hub during heartbeat.
"""
import os
import stat
from pathlib import Path
from typing import Optional
from app.utils.logger import get_logger
logger = get_logger(__name__)
# Default credentials file location
CREDENTIALS_FILE = Path("/opt/letsbe/env/credentials.env")
def check_credentials_permissions(path: str) -> None:
"""Warn if credentials file has overly permissive permissions."""
try:
if not os.path.exists(path):
return
file_stat = os.stat(path)
mode = file_stat.st_mode
# Check if group or others have any permissions
if mode & (stat.S_IRWXG | stat.S_IRWXO):
logger.warning(
f"Credentials file {path} has overly permissive permissions "
f"(mode={oct(mode)}). Recommended: chmod 600"
)
except OSError:
pass
def read_credentials_file(file_path: Optional[Path] = None) -> dict[str, str]:
"""
Read credentials.env file and return as a dictionary.
Args:
file_path: Optional path to credentials file. Defaults to /opt/letsbe/env/credentials.env
Returns:
Dictionary of key-value pairs from the credentials file
"""
credentials: dict[str, str] = {}
creds_file = file_path or CREDENTIALS_FILE
if not creds_file.exists():
logger.debug(f"Credentials file not found: {creds_file}")
return credentials
check_credentials_permissions(str(creds_file))
try:
with open(creds_file, 'r') as f:
for line_num, line in enumerate(f, 1):
line = line.strip()
# Skip empty lines and comments
if not line or line.startswith('#'):
continue
# Parse KEY=VALUE
if '=' in line:
key, value = line.split('=', 1)
credentials[key.strip()] = value.strip()
else:
logger.warning(f"Invalid line {line_num} in credentials file: {line}")
except Exception as e:
logger.error(f"Failed to read credentials file: {e}")
return credentials
def get_portainer_credentials() -> Optional[dict[str, str]]:
"""
Extract Portainer-specific credentials from the credentials file.
Returns:
Dictionary with 'username' and 'password' keys, or None if not configured
"""
creds = read_credentials_file()
username = creds.get('PORTAINER_ADMIN_USER')
password = creds.get('PORTAINER_ADMIN_PASSWORD')
if username and password:
return {
'username': username,
'password': password,
}
return None
def get_all_tool_credentials() -> dict[str, dict[str, str]]:
"""
Extract all tool credentials from the credentials file.
Groups credentials by tool name.
Returns:
Dictionary where keys are tool names and values are credential dictionaries
"""
creds = read_credentials_file()
tool_credentials: dict[str, dict[str, str]] = {}
# Portainer credentials
portainer = get_portainer_credentials()
if portainer:
tool_credentials['portainer'] = portainer
# Add other tool credentials as needed
# Example patterns that might exist in credentials.env:
# NEXTCLOUD_ADMIN_USER, NEXTCLOUD_ADMIN_PASSWORD
# KEYCLOAK_ADMIN_USER, KEYCLOAK_ADMIN_PASSWORD
# etc.
tool_mappings = [
('nextcloud', ['NEXTCLOUD_ADMIN_USER', 'NEXTCLOUD_ADMIN_PASSWORD']),
('keycloak', ['KEYCLOAK_ADMIN_USER', 'KEYCLOAK_ADMIN_PASSWORD']),
('minio', ['MINIO_ROOT_USER', 'MINIO_ROOT_PASSWORD']),
('poste', ['POSTE_ADMIN_EMAIL', 'POSTE_ADMIN_PASSWORD']),
]
for tool_name, (user_key, pass_key) in tool_mappings:
username = creds.get(user_key)
password = creds.get(pass_key)
if username and password:
tool_credentials[tool_name] = {
'username': username,
'password': password,
}
return tool_credentials
def get_credential_hash() -> str:
"""
Generate a hash of the credentials file content.
Used to detect changes without sending full credentials each time.
Returns:
SHA-256 hash of the credentials file content, or empty string if file doesn't exist
"""
import hashlib
if not CREDENTIALS_FILE.exists():
return ""
try:
content = CREDENTIALS_FILE.read_bytes()
return hashlib.sha256(content).hexdigest()
except Exception as e:
logger.error(f"Failed to hash credentials file: {e}")
return ""