157 lines
4.6 KiB
Python
157 lines
4.6 KiB
Python
"""
|
|
Credential reader utility for reading credentials from the credentials.env file.
|
|
Used by the agent to report credentials back to the Hub during heartbeat.
|
|
"""
|
|
|
|
import os
|
|
import stat
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
from app.utils.logger import get_logger
|
|
|
|
logger = get_logger(__name__)
|
|
|
|
# Default credentials file location
|
|
CREDENTIALS_FILE = Path("/opt/letsbe/env/credentials.env")
|
|
|
|
|
|
def check_credentials_permissions(path: str) -> None:
|
|
"""Warn if credentials file has overly permissive permissions."""
|
|
try:
|
|
if not os.path.exists(path):
|
|
return
|
|
file_stat = os.stat(path)
|
|
mode = file_stat.st_mode
|
|
# Check if group or others have any permissions
|
|
if mode & (stat.S_IRWXG | stat.S_IRWXO):
|
|
logger.warning(
|
|
f"Credentials file {path} has overly permissive permissions "
|
|
f"(mode={oct(mode)}). Recommended: chmod 600"
|
|
)
|
|
except OSError:
|
|
pass
|
|
|
|
|
|
def read_credentials_file(file_path: Optional[Path] = None) -> dict[str, str]:
|
|
"""
|
|
Read credentials.env file and return as a dictionary.
|
|
|
|
Args:
|
|
file_path: Optional path to credentials file. Defaults to /opt/letsbe/env/credentials.env
|
|
|
|
Returns:
|
|
Dictionary of key-value pairs from the credentials file
|
|
"""
|
|
credentials: dict[str, str] = {}
|
|
creds_file = file_path or CREDENTIALS_FILE
|
|
|
|
if not creds_file.exists():
|
|
logger.debug(f"Credentials file not found: {creds_file}")
|
|
return credentials
|
|
|
|
check_credentials_permissions(str(creds_file))
|
|
|
|
try:
|
|
with open(creds_file, 'r') as f:
|
|
for line_num, line in enumerate(f, 1):
|
|
line = line.strip()
|
|
# Skip empty lines and comments
|
|
if not line or line.startswith('#'):
|
|
continue
|
|
|
|
# Parse KEY=VALUE
|
|
if '=' in line:
|
|
key, value = line.split('=', 1)
|
|
credentials[key.strip()] = value.strip()
|
|
else:
|
|
logger.warning(f"Invalid line {line_num} in credentials file: {line}")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to read credentials file: {e}")
|
|
|
|
return credentials
|
|
|
|
|
|
def get_portainer_credentials() -> Optional[dict[str, str]]:
|
|
"""
|
|
Extract Portainer-specific credentials from the credentials file.
|
|
|
|
Returns:
|
|
Dictionary with 'username' and 'password' keys, or None if not configured
|
|
"""
|
|
creds = read_credentials_file()
|
|
|
|
username = creds.get('PORTAINER_ADMIN_USER')
|
|
password = creds.get('PORTAINER_ADMIN_PASSWORD')
|
|
|
|
if username and password:
|
|
return {
|
|
'username': username,
|
|
'password': password,
|
|
}
|
|
|
|
return None
|
|
|
|
|
|
def get_all_tool_credentials() -> dict[str, dict[str, str]]:
|
|
"""
|
|
Extract all tool credentials from the credentials file.
|
|
Groups credentials by tool name.
|
|
|
|
Returns:
|
|
Dictionary where keys are tool names and values are credential dictionaries
|
|
"""
|
|
creds = read_credentials_file()
|
|
tool_credentials: dict[str, dict[str, str]] = {}
|
|
|
|
# Portainer credentials
|
|
portainer = get_portainer_credentials()
|
|
if portainer:
|
|
tool_credentials['portainer'] = portainer
|
|
|
|
# Add other tool credentials as needed
|
|
# Example patterns that might exist in credentials.env:
|
|
# NEXTCLOUD_ADMIN_USER, NEXTCLOUD_ADMIN_PASSWORD
|
|
# KEYCLOAK_ADMIN_USER, KEYCLOAK_ADMIN_PASSWORD
|
|
# etc.
|
|
|
|
tool_mappings = [
|
|
('nextcloud', ['NEXTCLOUD_ADMIN_USER', 'NEXTCLOUD_ADMIN_PASSWORD']),
|
|
('keycloak', ['KEYCLOAK_ADMIN_USER', 'KEYCLOAK_ADMIN_PASSWORD']),
|
|
('minio', ['MINIO_ROOT_USER', 'MINIO_ROOT_PASSWORD']),
|
|
('poste', ['POSTE_ADMIN_EMAIL', 'POSTE_ADMIN_PASSWORD']),
|
|
]
|
|
|
|
for tool_name, (user_key, pass_key) in tool_mappings:
|
|
username = creds.get(user_key)
|
|
password = creds.get(pass_key)
|
|
if username and password:
|
|
tool_credentials[tool_name] = {
|
|
'username': username,
|
|
'password': password,
|
|
}
|
|
|
|
return tool_credentials
|
|
|
|
|
|
def get_credential_hash() -> str:
|
|
"""
|
|
Generate a hash of the credentials file content.
|
|
Used to detect changes without sending full credentials each time.
|
|
|
|
Returns:
|
|
SHA-256 hash of the credentials file content, or empty string if file doesn't exist
|
|
"""
|
|
import hashlib
|
|
|
|
if not CREDENTIALS_FILE.exists():
|
|
return ""
|
|
|
|
try:
|
|
content = CREDENTIALS_FILE.read_bytes()
|
|
return hashlib.sha256(content).hexdigest()
|
|
except Exception as e:
|
|
logger.error(f"Failed to hash credentials file: {e}")
|
|
return ""
|