From 21364221c864a41a72ab65f89898759bdb0aed6c Mon Sep 17 00:00:00 2001 From: Matt Date: Wed, 3 Dec 2025 11:02:31 +0100 Subject: [PATCH] Initial commit: LetsBe Cloud Orchestrator MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Features: - FastAPI backend with SQLAlchemy 2.0 async ORM - Tenant management (CRUD operations) - Task management with types: FILE_WRITE, ENV_UPDATE, DOCKER_RELOAD, COMPOSITE - Agent registration, heartbeat, and task claiming (/tasks/next) - Chatwoot deployment playbook (COMPOSITE task with ENV_UPDATE + DOCKER_RELOAD) - Alembic migrations for Postgres - Docker Compose setup for local development 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- .claude/settings.local.json | 23 ++ .env.example | 6 + .gitignore | 63 +++++ CLAUDE.md | 64 +++++ Dockerfile | 22 ++ alembic.ini | 116 ++++++++ alembic/env.py | 94 +++++++ alembic/script.py.mako | 26 ++ alembic/versions/.gitkeep | 0 .../versions/4ca4b9958baf_initial_schema.py | 110 ++++++++ alembic/versions/add_agent_fields.py | 48 ++++ app/__init__.py | 1 + app/config.py | 30 +++ app/db.py | 52 ++++ app/main.py | 98 +++++++ app/models/__init__.py | 19 ++ app/models/agent.py | 74 ++++++ app/models/base.py | 44 +++ app/models/event.py | 69 +++++ app/models/server.py | 59 +++++ app/models/task.py | 82 ++++++ app/models/tenant.py | 57 ++++ app/playbooks/__init__.py | 17 ++ app/playbooks/chatwoot.py | 102 +++++++ app/routes/__init__.py | 15 ++ app/routes/agents.py | 146 ++++++++++ app/routes/health.py | 21 ++ app/routes/playbooks.py | 99 +++++++ app/routes/tasks.py | 250 ++++++++++++++++++ app/routes/tenants.py | 85 ++++++ app/schemas/__init__.py | 45 ++++ app/schemas/agent.py | 43 +++ app/schemas/common.py | 24 ++ app/schemas/task.py | 70 +++++ app/schemas/tasks_extended.py | 73 +++++ app/schemas/tenant.py | 25 ++ docker-compose.yml | 37 +++ requirements.txt | 15 ++ tests/__init__.py | 1 + tests/conftest.py | 63 +++++ tests/test_playbooks_chatwoot.py | 141 ++++++++++ 41 files changed, 2429 insertions(+) create mode 100644 .claude/settings.local.json create mode 100644 .env.example create mode 100644 .gitignore create mode 100644 CLAUDE.md create mode 100644 Dockerfile create mode 100644 alembic.ini create mode 100644 alembic/env.py create mode 100644 alembic/script.py.mako create mode 100644 alembic/versions/.gitkeep create mode 100644 alembic/versions/4ca4b9958baf_initial_schema.py create mode 100644 alembic/versions/add_agent_fields.py create mode 100644 app/__init__.py create mode 100644 app/config.py create mode 100644 app/db.py create mode 100644 app/main.py create mode 100644 app/models/__init__.py create mode 100644 app/models/agent.py create mode 100644 app/models/base.py create mode 100644 app/models/event.py create mode 100644 app/models/server.py create mode 100644 app/models/task.py create mode 100644 app/models/tenant.py create mode 100644 app/playbooks/__init__.py create mode 100644 app/playbooks/chatwoot.py create mode 100644 app/routes/__init__.py create mode 100644 app/routes/agents.py create mode 100644 app/routes/health.py create mode 100644 app/routes/playbooks.py create mode 100644 app/routes/tasks.py create mode 100644 app/routes/tenants.py create mode 100644 app/schemas/__init__.py create mode 100644 app/schemas/agent.py create mode 100644 app/schemas/common.py create mode 100644 app/schemas/task.py create mode 100644 app/schemas/tasks_extended.py create mode 100644 app/schemas/tenant.py create mode 100644 docker-compose.yml create mode 100644 requirements.txt create mode 100644 tests/__init__.py create mode 100644 tests/conftest.py create mode 100644 tests/test_playbooks_chatwoot.py diff --git a/.claude/settings.local.json b/.claude/settings.local.json new file mode 100644 index 0000000..848bf72 --- /dev/null +++ b/.claude/settings.local.json @@ -0,0 +1,23 @@ +{ + "permissions": { + "allow": [ + "mcp__serena__activate_project", + "mcp__serena__list_dir", + "mcp__context7__resolve-library-id", + "mcp__context7__get-library-docs", + "mcp__zen__consensus", + "Bash(docker compose build:*)", + "Bash(docker compose:*)", + "Bash(curl:*)", + "Bash(dir \"Z:\\Repos\\LetsBeV2\\orchestrator\\letsbe-orchestrator\\alembic\\versions\" /B)", + "Bash(dir:*)", + "Bash(timeout:*)", + "Bash(python:*)", + "Bash(git init:*)", + "Bash(git remote add:*)", + "Bash(git add:*)" + ], + "deny": [], + "ask": [] + } +} diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..3e662ba --- /dev/null +++ b/.env.example @@ -0,0 +1,6 @@ +# Database (port 5433 to avoid conflict with existing Postgres) +DATABASE_URL=postgresql+asyncpg://orchestrator:orchestrator@localhost:5433/orchestrator + +# Application +DEBUG=true +APP_NAME=LetsBe Orchestrator diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..69f8416 --- /dev/null +++ b/.gitignore @@ -0,0 +1,63 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg + +# Virtual environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# IDE +.idea/ +.vscode/ +*.swp +*.swo +*~ + +# Testing +.tox/ +.coverage +.coverage.* +htmlcov/ +.pytest_cache/ +.hypothesis/ + +# Mypy +.mypy_cache/ + +# Local development +*.log +*.sqlite +*.db + +# Docker +.docker/ + +# OS +.DS_Store +Thumbs.db + +# Serena (local MCP tool data) +.serena/ diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..5edb8b5 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,64 @@ +# CLAUDE.md — LetsBe Cloud Orchestrator + +## Overview + +You are the engineering assistant for the **LetsBe Cloud Orchestrator**, the core control-plane backend for the LetsBe Cloud platform. + +The platform automatically provisions per-tenant servers, deploys a suite of open-source tools (Poste, Keycloak, MinIO, Passbolt, Vikunja, etc.), and coordinates an **AI SysAdmin agent** that configures those tools autonomously. The Orchestrator provides APIs for: + +- Tenant lifecycle management +- Server provisioning state +- Task scheduling +- Agent registration & heartbeat +- Event logging +- DNS operations (via Entri integration) +- Secrets storage (via Vault) + +This repository contains ONLY the **Orchestrator service**, not the SysAdmin agent. + +Claude Code should generate code that is: + +- Clean, strongly typed +- Production-ready +- Following Python 3.11 best practices +- Using FastAPI + SQLAlchemy + Alembic +- Modular, scalable, logically structured + +--- + +## Project Goals + +1. Provide a REST API for managing: + - Tenants + - Servers + - Tasks + - Agents + - Events + +2. Integrate with: + - Postgres + - Vault (later) + - Entri DNS (later) + - SysAdmin agent (remote automation worker) + +3. Serve as the **central state and coordination system** for the entire LetsBe Cloud platform. + +--- + +## Tech Stack + +**Backend framework:** FastAPI +**Language:** Python 3.11 +**Database:** Postgres (via Docker Compose) +**ORM:** SQLAlchemy 2.0 +**Migrations:** Alembic +**Serialization:** Pydantic v2 +**Containerization:** Docker Compose +**Testing:** Pytest (later) +**Architecture style:** Modular monolith (service modules inside `app/`) + +Everything MUST run via Docker using: + +```bash +docker compose up --build +``` diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..29b65de --- /dev/null +++ b/Dockerfile @@ -0,0 +1,22 @@ +FROM python:3.11-slim + +WORKDIR /app + +# Install system dependencies +RUN apt-get update && apt-get install -y --no-install-recommends \ + gcc \ + libpq-dev \ + && rm -rf /var/lib/apt/lists/* + +# Copy requirements first for better caching +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +# Copy application code +COPY . . + +# Expose port +EXPOSE 8000 + +# Default command +CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"] diff --git a/alembic.ini b/alembic.ini new file mode 100644 index 0000000..228b952 --- /dev/null +++ b/alembic.ini @@ -0,0 +1,116 @@ +# A generic, single database configuration. + +[alembic] +# path to migration scripts +script_location = alembic + +# template used to generate migration file names; The default value is %%(rev)s_%%(slug)s +# Uncomment the line below if you want the files to be prepended with date and time +# file_template = %%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s + +# sys.path path, will be prepended to sys.path if present. +# defaults to the current working directory. +prepend_sys_path = . + +# timezone to use when rendering the date within the migration file +# as well as the filename. +# If specified, requires the python>=3.9 or backports.zoneinfo library. +# Any required deps can installed by adding `alembic[tz]` to the pip requirements +# string value is passed to ZoneInfo() +# leave blank for localtime +# timezone = + +# max length of characters to apply to the "slug" field +# truncate_slug_length = 40 + +# set to 'true' to run the environment during +# the 'revision' command, regardless of autogenerate +# revision_environment = false + +# set to 'true' to allow .pyc and .pyo files without +# a source .py file to be detected as revisions in the +# versions/ directory +# sourceless = false + +# version location specification; This defaults +# to alembic/versions. When using multiple version +# directories, initial revisions must be specified with --version-path. +# The path separator used here should be the separator specified by "version_path_separator" below. +# version_locations = %(here)s/bar:%(here)s/bat:alembic/versions + +# version path separator; As mentioned above, this is the character used to split +# version_locations. The default within new alembic.ini files is "os", which uses os.pathsep. +# If this key is omitted entirely, it falls back to the legacy behavior of splitting on spaces and/or commas. +# Valid values for version_path_separator are: +# +# version_path_separator = : +# version_path_separator = ; +# version_path_separator = space +# version_path_separator = newline +version_path_separator = os # Use os.pathsep. Default configuration used for new projects. + +# set to 'true' to search source files recursively +# in each "version_locations" directory +# new in Alembic version 1.10 +# recursive_version_locations = false + +# the output encoding used when revision files +# are written from script.py.mako +# output_encoding = utf-8 + +# Database URL - will be overridden by env.py from environment variable +# Host port is 5433 to avoid conflict with existing Postgres instances +sqlalchemy.url = postgresql+asyncpg://orchestrator:orchestrator@localhost:5433/orchestrator + + +[post_write_hooks] +# post_write_hooks defines scripts or Python functions that are run +# on newly generated revision scripts. See the documentation for further +# detail and examples + +# format using "black" - use the console_scripts runner, against the "black" entrypoint +# hooks = black +# black.type = console_scripts +# black.entrypoint = black +# black.options = -q + +# lint with attempts to fix using "ruff" - use the exec runner, execute a binary +# hooks = ruff +# ruff.type = exec +# ruff.executable = %(here)s/.venv/bin/ruff +# ruff.options = check --fix REVISION_SCRIPT_FILENAME + +# Logging configuration +[loggers] +keys = root,sqlalchemy,alembic + +[handlers] +keys = console + +[formatters] +keys = generic + +[logger_root] +level = WARN +handlers = console +qualname = + +[logger_sqlalchemy] +level = WARN +handlers = +qualname = sqlalchemy.engine + +[logger_alembic] +level = INFO +handlers = +qualname = alembic + +[handler_console] +class = StreamHandler +args = (sys.stderr,) +level = NOTSET +formatter = generic + +[formatter_generic] +format = %(levelname)-5.5s [%(name)s] %(message)s +datefmt = %H:%M:%S diff --git a/alembic/env.py b/alembic/env.py new file mode 100644 index 0000000..98be2a3 --- /dev/null +++ b/alembic/env.py @@ -0,0 +1,94 @@ +"""Alembic migration environment configuration for async SQLAlchemy.""" + +import asyncio +from logging.config import fileConfig + +from alembic import context +from sqlalchemy import pool +from sqlalchemy.engine import Connection +from sqlalchemy.ext.asyncio import async_engine_from_config + +from app.config import settings +from app.models import Base + +# this is the Alembic Config object, which provides +# access to the values within the .ini file in use. +config = context.config + +# Override sqlalchemy.url with environment variable +config.set_main_option("sqlalchemy.url", settings.DATABASE_URL) + +# Interpret the config file for Python logging. +# This line sets up loggers basically. +if config.config_file_name is not None: + fileConfig(config.config_file_name) + +# add your model's MetaData object here +# for 'autogenerate' support +target_metadata = Base.metadata + +# other values from the config, defined by the needs of env.py, +# can be acquired: +# my_important_option = config.get_main_option("my_important_option") +# ... etc. + + +def run_migrations_offline() -> None: + """Run migrations in 'offline' mode. + + This configures the context with just a URL + and not an Engine, though an Engine is acceptable + here as well. By skipping the Engine creation + we don't even need a DBAPI to be available. + + Calls to context.execute() here emit the given string to the + script output. + """ + url = config.get_main_option("sqlalchemy.url") + context.configure( + url=url, + target_metadata=target_metadata, + literal_binds=True, + dialect_opts={"paramstyle": "named"}, + ) + + with context.begin_transaction(): + context.run_migrations() + + +def do_run_migrations(connection: Connection) -> None: + """Run migrations with a connection.""" + context.configure( + connection=connection, + target_metadata=target_metadata, + compare_type=True, + compare_server_default=True, + ) + + with context.begin_transaction(): + context.run_migrations() + + +async def run_async_migrations() -> None: + """Run migrations in 'online' mode with async engine.""" + connectable = async_engine_from_config( + config.get_section(config.config_ini_section, {}), + prefix="sqlalchemy.", + poolclass=pool.NullPool, + ) + + async with connectable.connect() as connection: + await connection.run_sync(do_run_migrations) + + await connectable.dispose() + + +def run_migrations_online() -> None: + """Run migrations in 'online' mode.""" + asyncio.run(run_async_migrations()) + + +if context.is_offline_mode(): + run_migrations_offline() +else: + run_migrations_online() diff --git a/alembic/script.py.mako b/alembic/script.py.mako new file mode 100644 index 0000000..fbc4b07 --- /dev/null +++ b/alembic/script.py.mako @@ -0,0 +1,26 @@ +"""${message} + +Revision ID: ${up_revision} +Revises: ${down_revision | comma,n} +Create Date: ${create_date} + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa +${imports if imports else ""} + +# revision identifiers, used by Alembic. +revision: str = ${repr(up_revision)} +down_revision: Union[str, None] = ${repr(down_revision)} +branch_labels: Union[str, Sequence[str], None] = ${repr(branch_labels)} +depends_on: Union[str, Sequence[str], None] = ${repr(depends_on)} + + +def upgrade() -> None: + ${upgrades if upgrades else "pass"} + + +def downgrade() -> None: + ${downgrades if downgrades else "pass"} diff --git a/alembic/versions/.gitkeep b/alembic/versions/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/alembic/versions/4ca4b9958baf_initial_schema.py b/alembic/versions/4ca4b9958baf_initial_schema.py new file mode 100644 index 0000000..399306e --- /dev/null +++ b/alembic/versions/4ca4b9958baf_initial_schema.py @@ -0,0 +1,110 @@ +"""initial_schema + +Revision ID: 4ca4b9958baf +Revises: +Create Date: 2025-12-02 18:50:17.377481 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects import postgresql + +# revision identifiers, used by Alembic. +revision: str = '4ca4b9958baf' +down_revision: Union[str, None] = None +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.create_table('tenants', + sa.Column('name', sa.String(length=255), nullable=False), + sa.Column('domain', sa.String(length=255), nullable=True), + sa.Column('id', sa.Uuid(), nullable=False), + sa.Column('created_at', sa.DateTime(timezone=True), nullable=False), + sa.Column('updated_at', sa.DateTime(timezone=True), nullable=False), + sa.PrimaryKeyConstraint('id'), + sa.UniqueConstraint('domain') + ) + op.create_index(op.f('ix_tenants_name'), 'tenants', ['name'], unique=True) + op.create_table('agents', + sa.Column('tenant_id', sa.Uuid(), nullable=False), + sa.Column('name', sa.String(length=255), nullable=False), + sa.Column('last_heartbeat', sa.DateTime(timezone=True), nullable=True), + sa.Column('id', sa.Uuid(), nullable=False), + sa.Column('created_at', sa.DateTime(timezone=True), nullable=False), + sa.Column('updated_at', sa.DateTime(timezone=True), nullable=False), + sa.ForeignKeyConstraint(['tenant_id'], ['tenants.id'], ondelete='CASCADE'), + sa.PrimaryKeyConstraint('id') + ) + op.create_index(op.f('ix_agents_tenant_id'), 'agents', ['tenant_id'], unique=False) + op.create_table('servers', + sa.Column('tenant_id', sa.Uuid(), nullable=False), + sa.Column('hostname', sa.String(length=255), nullable=False), + sa.Column('ip_address', sa.String(length=45), nullable=True), + sa.Column('status', sa.String(length=50), nullable=False), + sa.Column('id', sa.Uuid(), nullable=False), + sa.Column('created_at', sa.DateTime(timezone=True), nullable=False), + sa.Column('updated_at', sa.DateTime(timezone=True), nullable=False), + sa.ForeignKeyConstraint(['tenant_id'], ['tenants.id'], ondelete='CASCADE'), + sa.PrimaryKeyConstraint('id') + ) + op.create_index(op.f('ix_servers_tenant_id'), 'servers', ['tenant_id'], unique=False) + op.create_table('tasks', + sa.Column('tenant_id', sa.Uuid(), nullable=False), + sa.Column('agent_id', sa.Uuid(), nullable=True), + sa.Column('type', sa.String(length=100), nullable=False), + sa.Column('payload', postgresql.JSONB(astext_type=sa.Text()), nullable=False), + sa.Column('status', sa.String(length=50), nullable=False), + sa.Column('result', postgresql.JSONB(astext_type=sa.Text()), nullable=True), + sa.Column('id', sa.Uuid(), nullable=False), + sa.Column('created_at', sa.DateTime(timezone=True), nullable=False), + sa.Column('updated_at', sa.DateTime(timezone=True), nullable=False), + sa.ForeignKeyConstraint(['agent_id'], ['agents.id'], ondelete='SET NULL'), + sa.ForeignKeyConstraint(['tenant_id'], ['tenants.id'], ondelete='CASCADE'), + sa.PrimaryKeyConstraint('id') + ) + op.create_index(op.f('ix_tasks_agent_id'), 'tasks', ['agent_id'], unique=False) + op.create_index(op.f('ix_tasks_status'), 'tasks', ['status'], unique=False) + op.create_index(op.f('ix_tasks_tenant_id'), 'tasks', ['tenant_id'], unique=False) + op.create_index(op.f('ix_tasks_type'), 'tasks', ['type'], unique=False) + op.create_table('events', + sa.Column('tenant_id', sa.Uuid(), nullable=False), + sa.Column('task_id', sa.Uuid(), nullable=True), + sa.Column('event_type', sa.String(length=100), nullable=False), + sa.Column('payload', postgresql.JSONB(astext_type=sa.Text()), nullable=False), + sa.Column('created_at', sa.DateTime(timezone=True), nullable=False), + sa.Column('id', sa.Uuid(), nullable=False), + sa.ForeignKeyConstraint(['task_id'], ['tasks.id'], ondelete='SET NULL'), + sa.ForeignKeyConstraint(['tenant_id'], ['tenants.id'], ondelete='CASCADE'), + sa.PrimaryKeyConstraint('id') + ) + op.create_index(op.f('ix_events_created_at'), 'events', ['created_at'], unique=False) + op.create_index(op.f('ix_events_event_type'), 'events', ['event_type'], unique=False) + op.create_index(op.f('ix_events_task_id'), 'events', ['task_id'], unique=False) + op.create_index(op.f('ix_events_tenant_id'), 'events', ['tenant_id'], unique=False) + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.drop_index(op.f('ix_events_tenant_id'), table_name='events') + op.drop_index(op.f('ix_events_task_id'), table_name='events') + op.drop_index(op.f('ix_events_event_type'), table_name='events') + op.drop_index(op.f('ix_events_created_at'), table_name='events') + op.drop_table('events') + op.drop_index(op.f('ix_tasks_type'), table_name='tasks') + op.drop_index(op.f('ix_tasks_tenant_id'), table_name='tasks') + op.drop_index(op.f('ix_tasks_status'), table_name='tasks') + op.drop_index(op.f('ix_tasks_agent_id'), table_name='tasks') + op.drop_table('tasks') + op.drop_index(op.f('ix_servers_tenant_id'), table_name='servers') + op.drop_table('servers') + op.drop_index(op.f('ix_agents_tenant_id'), table_name='agents') + op.drop_table('agents') + op.drop_index(op.f('ix_tenants_name'), table_name='tenants') + op.drop_table('tenants') + # ### end Alembic commands ### diff --git a/alembic/versions/add_agent_fields.py b/alembic/versions/add_agent_fields.py new file mode 100644 index 0000000..a7167a3 --- /dev/null +++ b/alembic/versions/add_agent_fields.py @@ -0,0 +1,48 @@ +"""add_agent_fields_and_nullable_tenant + +Revision ID: add_agent_fields +Revises: 4ca4b9958baf +Create Date: 2025-12-02 19:30:00.000000 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision: str = 'add_agent_fields' +down_revision: Union[str, None] = '4ca4b9958baf' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + # Add new columns to agents table + op.add_column('agents', sa.Column('version', sa.String(length=50), nullable=False, server_default='')) + op.add_column('agents', sa.Column('status', sa.String(length=20), nullable=False, server_default='offline')) + op.add_column('agents', sa.Column('token', sa.Text(), nullable=False, server_default='')) + + # Create index on status for efficient queries + op.create_index(op.f('ix_agents_status'), 'agents', ['status'], unique=False) + + # Make tenant_id nullable (agents can register without a tenant) + op.alter_column('agents', 'tenant_id', + existing_type=sa.UUID(), + nullable=True) + + +def downgrade() -> None: + # Make tenant_id NOT NULL again (will fail if there are rows with NULL tenant_id) + op.alter_column('agents', 'tenant_id', + existing_type=sa.UUID(), + nullable=False) + + # Drop the status index + op.drop_index(op.f('ix_agents_status'), table_name='agents') + + # Drop new columns + op.drop_column('agents', 'token') + op.drop_column('agents', 'status') + op.drop_column('agents', 'version') diff --git a/app/__init__.py b/app/__init__.py new file mode 100644 index 0000000..fb5d11b --- /dev/null +++ b/app/__init__.py @@ -0,0 +1 @@ +# LetsBe Cloud Orchestrator diff --git a/app/config.py b/app/config.py new file mode 100644 index 0000000..2fb90a9 --- /dev/null +++ b/app/config.py @@ -0,0 +1,30 @@ +"""Application configuration using Pydantic Settings.""" + +from pydantic_settings import BaseSettings, SettingsConfigDict + + +class Settings(BaseSettings): + """Application settings loaded from environment variables.""" + + model_config = SettingsConfigDict( + env_file=".env", + env_file_encoding="utf-8", + case_sensitive=False, + ) + + # Database (port 5433 to avoid conflict with existing Postgres) + DATABASE_URL: str = "postgresql+asyncpg://orchestrator:orchestrator@localhost:5433/orchestrator" + + # Application + DEBUG: bool = False + APP_NAME: str = "LetsBe Orchestrator" + APP_VERSION: str = "0.1.0" + + # Connection pool settings + DB_POOL_SIZE: int = 5 + DB_MAX_OVERFLOW: int = 10 + DB_POOL_TIMEOUT: int = 30 + DB_POOL_RECYCLE: int = 1800 + + +settings = Settings() diff --git a/app/db.py b/app/db.py new file mode 100644 index 0000000..bd0a535 --- /dev/null +++ b/app/db.py @@ -0,0 +1,52 @@ +"""Database configuration and session management.""" + +from collections.abc import AsyncGenerator +from typing import Annotated + +from fastapi import Depends +from sqlalchemy.ext.asyncio import ( + AsyncSession, + async_sessionmaker, + create_async_engine, +) + +from app.config import settings + +# Create async engine with connection pooling +engine = create_async_engine( + settings.DATABASE_URL, + pool_size=settings.DB_POOL_SIZE, + max_overflow=settings.DB_MAX_OVERFLOW, + pool_timeout=settings.DB_POOL_TIMEOUT, + pool_recycle=settings.DB_POOL_RECYCLE, + echo=settings.DEBUG, +) + +# Create async session factory +async_session_maker = async_sessionmaker( + engine, + class_=AsyncSession, + expire_on_commit=False, + autocommit=False, + autoflush=False, +) + + +async def get_db() -> AsyncGenerator[AsyncSession, None]: + """ + Dependency that provides an async database session. + + Yields a session and ensures proper cleanup via finally block. + """ + async with async_session_maker() as session: + try: + yield session + except Exception: + await session.rollback() + raise + finally: + await session.close() + + +# Type alias for dependency injection +AsyncSessionDep = Annotated[AsyncSession, Depends(get_db)] diff --git a/app/main.py b/app/main.py new file mode 100644 index 0000000..81a38a4 --- /dev/null +++ b/app/main.py @@ -0,0 +1,98 @@ +"""FastAPI application entry point.""" + +import uuid +from contextlib import asynccontextmanager +from typing import AsyncGenerator + +from fastapi import FastAPI, Request +from fastapi.responses import JSONResponse +from sqlalchemy.exc import IntegrityError +from starlette.middleware.base import BaseHTTPMiddleware + +from app.config import settings +from app.db import engine +from app.routes import ( + agents_router, + health_router, + playbooks_router, + tasks_router, + tenants_router, +) + + +# --- Middleware --- + + +class RequestIDMiddleware(BaseHTTPMiddleware): + """Middleware that adds a unique request ID to each request.""" + + async def dispatch(self, request: Request, call_next): + request_id = str(uuid.uuid4()) + request.state.request_id = request_id + response = await call_next(request) + response.headers["X-Request-ID"] = request_id + return response + + +# --- Lifespan --- + + +@asynccontextmanager +async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: + """Application lifespan handler for startup and shutdown.""" + # Startup + yield + # Shutdown + await engine.dispose() + + +# --- Application --- + + +app = FastAPI( + title=settings.APP_NAME, + version=settings.APP_VERSION, + description="Control-plane backend for the LetsBe Cloud platform", + lifespan=lifespan, +) + +# Add middleware +app.add_middleware(RequestIDMiddleware) + + +# --- Exception Handlers --- + + +@app.exception_handler(IntegrityError) +async def integrity_error_handler(request: Request, exc: IntegrityError) -> JSONResponse: + """Handle database integrity errors (unique constraint violations, etc.).""" + return JSONResponse( + status_code=409, + content={ + "detail": "Resource conflict: a record with these values already exists", + "request_id": getattr(request.state, "request_id", None), + }, + ) + + +# --- Routers --- + + +app.include_router(health_router) +app.include_router(tenants_router, prefix="/api/v1") +app.include_router(tasks_router, prefix="/api/v1") +app.include_router(agents_router, prefix="/api/v1") +app.include_router(playbooks_router, prefix="/api/v1") + + +# --- Root endpoint --- + + +@app.get("/") +async def root(): + """Root endpoint redirecting to docs.""" + return { + "message": f"Welcome to {settings.APP_NAME}", + "docs": "/docs", + "health": "/health", + } diff --git a/app/models/__init__.py b/app/models/__init__.py new file mode 100644 index 0000000..db08c2b --- /dev/null +++ b/app/models/__init__.py @@ -0,0 +1,19 @@ +"""SQLAlchemy models for the Orchestrator.""" + +from app.models.base import Base +from app.models.tenant import Tenant +from app.models.server import Server +from app.models.task import Task, TaskStatus +from app.models.agent import Agent, AgentStatus +from app.models.event import Event + +__all__ = [ + "Base", + "Tenant", + "Server", + "Task", + "TaskStatus", + "Agent", + "AgentStatus", + "Event", +] diff --git a/app/models/agent.py b/app/models/agent.py new file mode 100644 index 0000000..87d3bd7 --- /dev/null +++ b/app/models/agent.py @@ -0,0 +1,74 @@ +"""Agent model for SysAdmin automation workers.""" + +import uuid +from datetime import datetime +from enum import Enum +from typing import TYPE_CHECKING + +from sqlalchemy import DateTime, ForeignKey, String, Text +from sqlalchemy.orm import Mapped, mapped_column, relationship + +from app.models.base import Base, TimestampMixin, UUIDMixin + +if TYPE_CHECKING: + from app.models.task import Task + from app.models.tenant import Tenant + + +class AgentStatus(str, Enum): + """Agent status values.""" + + ONLINE = "online" + OFFLINE = "offline" + + +class Agent(UUIDMixin, TimestampMixin, Base): + """ + Agent model representing a SysAdmin automation worker. + + Agents register with the orchestrator and receive tasks to execute. + """ + + __tablename__ = "agents" + + tenant_id: Mapped[uuid.UUID | None] = mapped_column( + ForeignKey("tenants.id", ondelete="CASCADE"), + nullable=True, + index=True, + ) + name: Mapped[str] = mapped_column( + String(255), + nullable=False, + ) + version: Mapped[str] = mapped_column( + String(50), + nullable=False, + default="", + ) + status: Mapped[str] = mapped_column( + String(20), + nullable=False, + default=AgentStatus.OFFLINE.value, + index=True, + ) + last_heartbeat: Mapped[datetime | None] = mapped_column( + DateTime(timezone=True), + nullable=True, + ) + token: Mapped[str] = mapped_column( + Text, + nullable=False, + default="", + ) + + # Relationships + tenant: Mapped["Tenant | None"] = relationship( + back_populates="agents", + ) + tasks: Mapped[list["Task"]] = relationship( + back_populates="agent", + lazy="selectin", + ) + + def __repr__(self) -> str: + return f"" diff --git a/app/models/base.py b/app/models/base.py new file mode 100644 index 0000000..ed6e47b --- /dev/null +++ b/app/models/base.py @@ -0,0 +1,44 @@ +"""Base model and mixins for SQLAlchemy ORM.""" + +import uuid +from datetime import datetime, timezone + +from sqlalchemy import DateTime +from sqlalchemy.ext.asyncio import AsyncAttrs +from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column + + +def utc_now() -> datetime: + """Return current UTC datetime.""" + return datetime.now(timezone.utc) + + +class Base(AsyncAttrs, DeclarativeBase): + """Base class for all SQLAlchemy models.""" + + pass + + +class UUIDMixin: + """Mixin that adds a UUID primary key.""" + + id: Mapped[uuid.UUID] = mapped_column( + primary_key=True, + default=uuid.uuid4, + ) + + +class TimestampMixin: + """Mixin that adds created_at and updated_at timestamps.""" + + created_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), + default=utc_now, + nullable=False, + ) + updated_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), + default=utc_now, + onupdate=utc_now, + nullable=False, + ) diff --git a/app/models/event.py b/app/models/event.py new file mode 100644 index 0000000..862a272 --- /dev/null +++ b/app/models/event.py @@ -0,0 +1,69 @@ +"""Event model for audit logging.""" + +import uuid +from datetime import datetime, timezone +from typing import TYPE_CHECKING, Any + +from sqlalchemy import DateTime, ForeignKey, String +from sqlalchemy.dialects.postgresql import JSONB +from sqlalchemy.orm import Mapped, mapped_column, relationship + +from app.models.base import Base, UUIDMixin + +if TYPE_CHECKING: + from app.models.task import Task + from app.models.tenant import Tenant + + +def utc_now() -> datetime: + """Return current UTC datetime.""" + return datetime.now(timezone.utc) + + +class Event(UUIDMixin, Base): + """ + Event model for audit logging and activity tracking. + + Events are immutable records of system activity. + Only has created_at (no updated_at since events are immutable). + """ + + __tablename__ = "events" + + tenant_id: Mapped[uuid.UUID] = mapped_column( + ForeignKey("tenants.id", ondelete="CASCADE"), + nullable=False, + index=True, + ) + task_id: Mapped[uuid.UUID | None] = mapped_column( + ForeignKey("tasks.id", ondelete="SET NULL"), + nullable=True, + index=True, + ) + event_type: Mapped[str] = mapped_column( + String(100), + nullable=False, + index=True, + ) + payload: Mapped[dict[str, Any]] = mapped_column( + JSONB, + nullable=False, + default=dict, + ) + created_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), + default=utc_now, + nullable=False, + index=True, + ) + + # Relationships + tenant: Mapped["Tenant"] = relationship( + back_populates="events", + ) + task: Mapped["Task | None"] = relationship( + back_populates="events", + ) + + def __repr__(self) -> str: + return f"" diff --git a/app/models/server.py b/app/models/server.py new file mode 100644 index 0000000..a1239f2 --- /dev/null +++ b/app/models/server.py @@ -0,0 +1,59 @@ +"""Server model for provisioned infrastructure.""" + +import uuid +from enum import Enum +from typing import TYPE_CHECKING + +from sqlalchemy import ForeignKey, String +from sqlalchemy.orm import Mapped, mapped_column, relationship + +from app.models.base import Base, TimestampMixin, UUIDMixin + +if TYPE_CHECKING: + from app.models.tenant import Tenant + + +class ServerStatus(str, Enum): + """Server provisioning status.""" + + PROVISIONING = "provisioning" + READY = "ready" + ERROR = "error" + TERMINATED = "terminated" + + +class Server(UUIDMixin, TimestampMixin, Base): + """ + Server model representing a provisioned VM or container. + + Tracks provisioning state and network configuration. + """ + + __tablename__ = "servers" + + tenant_id: Mapped[uuid.UUID] = mapped_column( + ForeignKey("tenants.id", ondelete="CASCADE"), + nullable=False, + index=True, + ) + hostname: Mapped[str] = mapped_column( + String(255), + nullable=False, + ) + ip_address: Mapped[str | None] = mapped_column( + String(45), # Supports IPv6 + nullable=True, + ) + status: Mapped[str] = mapped_column( + String(50), + default=ServerStatus.PROVISIONING.value, + nullable=False, + ) + + # Relationships + tenant: Mapped["Tenant"] = relationship( + back_populates="servers", + ) + + def __repr__(self) -> str: + return f"" diff --git a/app/models/task.py b/app/models/task.py new file mode 100644 index 0000000..80763a8 --- /dev/null +++ b/app/models/task.py @@ -0,0 +1,82 @@ +"""Task model for orchestration jobs.""" + +import uuid +from enum import Enum +from typing import TYPE_CHECKING, Any + +from sqlalchemy import ForeignKey, String +from sqlalchemy.dialects.postgresql import JSONB +from sqlalchemy.orm import Mapped, mapped_column, relationship + +from app.models.base import Base, TimestampMixin, UUIDMixin + +if TYPE_CHECKING: + from app.models.agent import Agent + from app.models.event import Event + from app.models.tenant import Tenant + + +class TaskStatus(str, Enum): + """Task execution status.""" + + PENDING = "pending" + RUNNING = "running" + COMPLETED = "completed" + FAILED = "failed" + + +class Task(UUIDMixin, TimestampMixin, Base): + """ + Task model representing an orchestration job. + + Tasks are assigned to agents and track execution state. + Payload and result use JSONB for flexible, queryable storage. + """ + + __tablename__ = "tasks" + + tenant_id: Mapped[uuid.UUID] = mapped_column( + ForeignKey("tenants.id", ondelete="CASCADE"), + nullable=False, + index=True, + ) + agent_id: Mapped[uuid.UUID | None] = mapped_column( + ForeignKey("agents.id", ondelete="SET NULL"), + nullable=True, + index=True, + ) + type: Mapped[str] = mapped_column( + String(100), + nullable=False, + index=True, + ) + payload: Mapped[dict[str, Any]] = mapped_column( + JSONB, + nullable=False, + default=dict, + ) + status: Mapped[str] = mapped_column( + String(50), + default=TaskStatus.PENDING.value, + nullable=False, + index=True, + ) + result: Mapped[dict[str, Any] | None] = mapped_column( + JSONB, + nullable=True, + ) + + # Relationships + tenant: Mapped["Tenant"] = relationship( + back_populates="tasks", + ) + agent: Mapped["Agent | None"] = relationship( + back_populates="tasks", + ) + events: Mapped[list["Event"]] = relationship( + back_populates="task", + lazy="selectin", + ) + + def __repr__(self) -> str: + return f"" diff --git a/app/models/tenant.py b/app/models/tenant.py new file mode 100644 index 0000000..22b57b5 --- /dev/null +++ b/app/models/tenant.py @@ -0,0 +1,57 @@ +"""Tenant model for multi-tenancy support.""" + +from typing import TYPE_CHECKING + +from sqlalchemy import String +from sqlalchemy.orm import Mapped, mapped_column, relationship + +from app.models.base import Base, TimestampMixin, UUIDMixin + +if TYPE_CHECKING: + from app.models.agent import Agent + from app.models.event import Event + from app.models.server import Server + from app.models.task import Task + + +class Tenant(UUIDMixin, TimestampMixin, Base): + """ + Tenant model representing a customer organization. + + Each tenant has isolated servers, tasks, agents, and events. + """ + + __tablename__ = "tenants" + + name: Mapped[str] = mapped_column( + String(255), + unique=True, + nullable=False, + index=True, + ) + domain: Mapped[str | None] = mapped_column( + String(255), + unique=True, + nullable=True, + ) + + # Relationships + servers: Mapped[list["Server"]] = relationship( + back_populates="tenant", + lazy="selectin", + ) + tasks: Mapped[list["Task"]] = relationship( + back_populates="tenant", + lazy="selectin", + ) + agents: Mapped[list["Agent"]] = relationship( + back_populates="tenant", + lazy="selectin", + ) + events: Mapped[list["Event"]] = relationship( + back_populates="tenant", + lazy="selectin", + ) + + def __repr__(self) -> str: + return f"" diff --git a/app/playbooks/__init__.py b/app/playbooks/__init__.py new file mode 100644 index 0000000..3eaf103 --- /dev/null +++ b/app/playbooks/__init__.py @@ -0,0 +1,17 @@ +"""Playbooks module for infrastructure automation tasks. + +Playbooks define reusable sequences of steps (COMPOSITE tasks) for +deploying and configuring services on tenant servers. +""" + +from app.playbooks.chatwoot import ( + CompositeStep, + build_chatwoot_setup_steps, + create_chatwoot_setup_task, +) + +__all__ = [ + "CompositeStep", + "build_chatwoot_setup_steps", + "create_chatwoot_setup_task", +] diff --git a/app/playbooks/chatwoot.py b/app/playbooks/chatwoot.py new file mode 100644 index 0000000..7ca3c29 --- /dev/null +++ b/app/playbooks/chatwoot.py @@ -0,0 +1,102 @@ +"""Chatwoot deployment playbook. + +Defines the steps required to set up Chatwoot on a tenant server +that already has stacks and env templates under /opt/letsbe. +""" + +import uuid +from typing import Any + +from pydantic import BaseModel, Field +from sqlalchemy.ext.asyncio import AsyncSession + +from app.models.task import Task, TaskStatus + + +class CompositeStep(BaseModel): + """A single step in a composite playbook.""" + + type: str = Field(..., description="Task type (e.g., ENV_UPDATE, DOCKER_RELOAD)") + payload: dict[str, Any] = Field( + default_factory=dict, description="Payload for this step" + ) + + +# LetsBe standard paths +CHATWOOT_ENV_PATH = "/opt/letsbe/env/chatwoot.env" +CHATWOOT_STACK_DIR = "/opt/letsbe/stacks/chatwoot" + + +def build_chatwoot_setup_steps(*, domain: str) -> list[CompositeStep]: + """ + Build the sequence of steps required to set up Chatwoot. + + Assumes the env file already exists at /opt/letsbe/env/chatwoot.env + (created by provisioning/env_setup.sh). + + Args: + domain: The domain for Chatwoot (e.g., "support.example.com") + + Returns: + List of 2 CompositeStep objects: + 1. ENV_UPDATE - patches FRONTEND_URL and BACKEND_URL + 2. DOCKER_RELOAD - restarts the chatwoot stack with pull=True + """ + steps = [ + # Step 1: Update environment variables + CompositeStep( + type="ENV_UPDATE", + payload={ + "path": CHATWOOT_ENV_PATH, + "updates": { + "FRONTEND_URL": f"https://{domain}", + "BACKEND_URL": f"https://{domain}", + }, + }, + ), + # Step 2: Reload Docker stack + CompositeStep( + type="DOCKER_RELOAD", + payload={ + "compose_dir": CHATWOOT_STACK_DIR, + "pull": True, + }, + ), + ] + return steps + + +async def create_chatwoot_setup_task( + *, + db: AsyncSession, + tenant_id: uuid.UUID, + agent_id: uuid.UUID | None, + domain: str, +) -> Task: + """ + Create and persist a COMPOSITE task for Chatwoot setup. + + Args: + db: Async database session + tenant_id: UUID of the tenant + agent_id: Optional UUID of the agent to assign the task to + domain: The domain for Chatwoot + + Returns: + The created Task object with type="COMPOSITE" + """ + steps = build_chatwoot_setup_steps(domain=domain) + + task = Task( + tenant_id=tenant_id, + agent_id=agent_id, + type="COMPOSITE", + payload={"steps": [step.model_dump() for step in steps]}, + status=TaskStatus.PENDING.value, + ) + + db.add(task) + await db.commit() + await db.refresh(task) + + return task diff --git a/app/routes/__init__.py b/app/routes/__init__.py new file mode 100644 index 0000000..f8b5b22 --- /dev/null +++ b/app/routes/__init__.py @@ -0,0 +1,15 @@ +"""FastAPI route modules.""" + +from app.routes.health import router as health_router +from app.routes.tasks import router as tasks_router +from app.routes.tenants import router as tenants_router +from app.routes.agents import router as agents_router +from app.routes.playbooks import router as playbooks_router + +__all__ = [ + "health_router", + "tenants_router", + "tasks_router", + "agents_router", + "playbooks_router", +] diff --git a/app/routes/agents.py b/app/routes/agents.py new file mode 100644 index 0000000..af8c46e --- /dev/null +++ b/app/routes/agents.py @@ -0,0 +1,146 @@ +"""Agent management endpoints.""" + +import secrets +import uuid + +from fastapi import APIRouter, Header, HTTPException, status +from sqlalchemy import select + +from app.db import AsyncSessionDep +from app.models.agent import Agent, AgentStatus +from app.models.base import utc_now +from app.schemas.agent import ( + AgentHeartbeatResponse, + AgentRegisterRequest, + AgentRegisterResponse, +) + +router = APIRouter(prefix="/agents", tags=["Agents"]) + + +# --- Helper functions (embryonic service layer) --- + + +async def get_agent_by_id(db: AsyncSessionDep, agent_id: uuid.UUID) -> Agent | None: + """Retrieve an agent by ID.""" + result = await db.execute(select(Agent).where(Agent.id == agent_id)) + return result.scalar_one_or_none() + + +async def validate_agent_token( + db: AsyncSessionDep, + agent_id: uuid.UUID, + authorization: str | None, +) -> Agent: + """ + Validate agent exists and token matches. + + Args: + db: Database session + agent_id: Agent UUID + authorization: Authorization header value + + Returns: + Agent if valid + + Raises: + HTTPException: 401 if invalid + """ + if authorization is None: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Missing Authorization header", + ) + + # Parse Bearer token + parts = authorization.split(" ", 1) + if len(parts) != 2 or parts[0].lower() != "bearer": + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Invalid Authorization header format. Expected: Bearer ", + ) + + token = parts[1] + + # Find and validate agent + agent = await get_agent_by_id(db, agent_id) + if agent is None: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Invalid agent credentials", + ) + + # Use secrets.compare_digest for timing-attack-safe comparison + if not secrets.compare_digest(agent.token, token): + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Invalid agent credentials", + ) + + return agent + + +# --- Route handlers (thin controllers) --- + + +@router.post( + "/register", + response_model=AgentRegisterResponse, + status_code=status.HTTP_201_CREATED, +) +async def register_agent( + request: AgentRegisterRequest, + db: AsyncSessionDep, +) -> AgentRegisterResponse: + """ + Register a new SysAdmin agent. + + - **hostname**: Agent hostname (will be used as name) + - **version**: Agent software version + - **metadata**: Optional JSON metadata + + Returns agent_id and token for subsequent API calls. + """ + agent_id = uuid.uuid4() + token = secrets.token_hex(32) + + agent = Agent( + id=agent_id, + name=request.hostname, + version=request.version, + status=AgentStatus.ONLINE.value, + last_heartbeat=utc_now(), + token=token, + tenant_id=None, # Agents register without tenant initially + ) + + db.add(agent) + await db.commit() + + return AgentRegisterResponse(agent_id=agent_id, token=token) + + +@router.post( + "/{agent_id}/heartbeat", + response_model=AgentHeartbeatResponse, +) +async def agent_heartbeat( + agent_id: uuid.UUID, + db: AsyncSessionDep, + authorization: str | None = Header(None), +) -> AgentHeartbeatResponse: + """ + Send heartbeat from agent. + + Updates last_heartbeat timestamp and sets status to online. + Requires Bearer token authentication. + """ + agent = await validate_agent_token(db, agent_id, authorization) + + # Update heartbeat + agent.last_heartbeat = utc_now() + agent.status = AgentStatus.ONLINE.value + + await db.commit() + + return AgentHeartbeatResponse(status="ok") diff --git a/app/routes/health.py b/app/routes/health.py new file mode 100644 index 0000000..d090e9e --- /dev/null +++ b/app/routes/health.py @@ -0,0 +1,21 @@ +"""Health check endpoints.""" + +from fastapi import APIRouter + +from app.config import settings +from app.schemas.common import HealthResponse + +router = APIRouter(tags=["Health"]) + + +@router.get("/health", response_model=HealthResponse) +async def health_check() -> HealthResponse: + """ + Health check endpoint. + + Returns the current status and version of the API. + """ + return HealthResponse( + status="ok", + version=settings.APP_VERSION, + ) diff --git a/app/routes/playbooks.py b/app/routes/playbooks.py new file mode 100644 index 0000000..b158a21 --- /dev/null +++ b/app/routes/playbooks.py @@ -0,0 +1,99 @@ +"""Playbook endpoints for triggering infrastructure automation.""" + +import uuid + +from fastapi import APIRouter, HTTPException, status +from pydantic import BaseModel, Field +from sqlalchemy import select + +from app.db import AsyncSessionDep +from app.models.agent import Agent +from app.models.task import Task +from app.models.tenant import Tenant +from app.playbooks.chatwoot import create_chatwoot_setup_task +from app.schemas.task import TaskResponse + +router = APIRouter(prefix="/tenants/{tenant_id}", tags=["Playbooks"]) + + +class ChatwootSetupRequest(BaseModel): + """Request body for Chatwoot setup playbook.""" + + agent_id: uuid.UUID | None = Field( + None, description="Optional agent UUID to assign the task to" + ) + domain: str = Field( + ..., min_length=1, description="Domain for Chatwoot (e.g., support.example.com)" + ) + + +# --- Helper functions --- + + +async def get_tenant_by_id(db: AsyncSessionDep, tenant_id: uuid.UUID) -> Tenant | None: + """Retrieve a tenant by ID.""" + result = await db.execute(select(Tenant).where(Tenant.id == tenant_id)) + return result.scalar_one_or_none() + + +async def get_agent_by_id(db: AsyncSessionDep, agent_id: uuid.UUID) -> Agent | None: + """Retrieve an agent by ID.""" + result = await db.execute(select(Agent).where(Agent.id == agent_id)) + return result.scalar_one_or_none() + + +# --- Route handlers --- + + +@router.post( + "/chatwoot/setup", + response_model=TaskResponse, + status_code=status.HTTP_201_CREATED, +) +async def setup_chatwoot( + tenant_id: uuid.UUID, + request: ChatwootSetupRequest, + db: AsyncSessionDep, +) -> Task: + """ + Trigger Chatwoot setup playbook for a tenant. + + Creates a COMPOSITE task with the following steps: + 1. **ENV_UPDATE**: Set FRONTEND_URL and BACKEND_URL in chatwoot.env + 2. **DOCKER_RELOAD**: Restart the Chatwoot stack with pull=True + + ## Request Body + - **agent_id**: Optional agent UUID to assign the task to immediately + - **domain**: The domain for Chatwoot (e.g., "support.example.com") + + ## Response + Returns the created Task with type="COMPOSITE" and status="pending". + + The SysAdmin Agent will pick up this task and execute the steps in sequence. + """ + # Validate tenant exists + tenant = await get_tenant_by_id(db, tenant_id) + if tenant is None: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Tenant {tenant_id} not found", + ) + + # Validate agent exists if provided + if request.agent_id is not None: + agent = await get_agent_by_id(db, request.agent_id) + if agent is None: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Agent {request.agent_id} not found", + ) + + # Create the COMPOSITE task + task = await create_chatwoot_setup_task( + db=db, + tenant_id=tenant_id, + agent_id=request.agent_id, + domain=request.domain, + ) + + return task diff --git a/app/routes/tasks.py b/app/routes/tasks.py new file mode 100644 index 0000000..787d79a --- /dev/null +++ b/app/routes/tasks.py @@ -0,0 +1,250 @@ +"""Task management endpoints.""" + +import uuid + +from fastapi import APIRouter, Header, HTTPException, Query, status +from sqlalchemy import select + +from app.db import AsyncSessionDep +from app.models.task import Task, TaskStatus +from app.routes.agents import validate_agent_token +from app.schemas.task import TaskCreate, TaskResponse, TaskUpdate + +router = APIRouter(prefix="/tasks", tags=["Tasks"]) + + +# --- Helper functions (embryonic service layer) --- + + +async def create_task(db: AsyncSessionDep, task_in: TaskCreate) -> Task: + """Create a new task in the database.""" + task = Task( + tenant_id=task_in.tenant_id, + type=task_in.type, + payload=task_in.payload, + status=TaskStatus.PENDING.value, + ) + db.add(task) + await db.commit() + await db.refresh(task) + return task + + +async def get_tasks( + db: AsyncSessionDep, + tenant_id: uuid.UUID | None = None, + task_status: TaskStatus | None = None, +) -> list[Task]: + """Retrieve tasks with optional filtering.""" + query = select(Task).order_by(Task.created_at.desc()) + + if tenant_id is not None: + query = query.where(Task.tenant_id == tenant_id) + + if task_status is not None: + query = query.where(Task.status == task_status.value) + + result = await db.execute(query) + return list(result.scalars().all()) + + +async def get_task_by_id(db: AsyncSessionDep, task_id: uuid.UUID) -> Task | None: + """Retrieve a task by ID.""" + result = await db.execute(select(Task).where(Task.id == task_id)) + return result.scalar_one_or_none() + + +async def update_task( + db: AsyncSessionDep, + task: Task, + task_update: TaskUpdate, +) -> Task: + """Update a task's status and/or result.""" + if task_update.status is not None: + task.status = task_update.status.value + if task_update.result is not None: + task.result = task_update.result + + await db.commit() + await db.refresh(task) + return task + + +# --- Route handlers (thin controllers) --- + + +@router.post("/", response_model=TaskResponse, status_code=status.HTTP_201_CREATED) +async def create_task_endpoint( + task_in: TaskCreate, + db: AsyncSessionDep, +) -> Task: + """ + Create a new task for agent execution. + + ## Parameters + - **tenant_id**: UUID of the tenant this task belongs to + - **type**: Task type string (see supported types below) + - **payload**: JSON payload with task-specific parameters + + ## Supported Task Types + + | Type | Description | Payload | + |------|-------------|---------| + | FILE_WRITE | Write content to a file | `{"path": str, "content": str}` | + | ENV_UPDATE | Update .env key/value pairs | `{"path": str, "updates": {str: str}}` | + | DOCKER_RELOAD | Reload Docker Compose stack | `{"compose_dir": str}` | + | COMPOSITE | Execute sequence of sub-tasks | `{"sequence": [{task, payload}, ...]}` | + + ## Agent Behavior + 1. Agent polls `GET /tasks/next` to claim pending tasks + 2. Agent executes the task based on type and payload + 3. Agent updates task status via `PATCH /tasks/{id}` + + ## Example Payloads + + **FILE_WRITE:** + ```json + {"path": "/opt/app/config.json", "content": "{\"key\": \"value\"}"} + ``` + + **ENV_UPDATE:** + ```json + {"path": "/opt/app/.env", "updates": {"DB_HOST": "localhost", "DB_PORT": "5432"}} + ``` + + **DOCKER_RELOAD:** + ```json + {"compose_dir": "/opt/stacks/keycloak"} + ``` + + **COMPOSITE:** + ```json + { + "sequence": [ + {"task": "FILE_WRITE", "payload": {"path": "/opt/app/config.json", "content": "{}"}}, + {"task": "DOCKER_RELOAD", "payload": {"compose_dir": "/opt/stacks/app"}} + ] + } + ``` + """ + return await create_task(db, task_in) + + +@router.get("/", response_model=list[TaskResponse]) +async def list_tasks_endpoint( + db: AsyncSessionDep, + tenant_id: uuid.UUID | None = Query(None, description="Filter by tenant ID"), + status: TaskStatus | None = Query(None, description="Filter by task status"), +) -> list[Task]: + """ + List all tasks with optional filtering. + + ## Query Parameters + - **tenant_id**: Optional filter by tenant UUID + - **status**: Optional filter by task status (pending, running, completed, failed) + + ## Task Types + Tasks may have the following types: + - **FILE_WRITE**: Write content to a file + - **ENV_UPDATE**: Update .env key/value pairs + - **DOCKER_RELOAD**: Reload Docker Compose stack + - **COMPOSITE**: Execute sequence of sub-tasks + - Legacy types: provision_server, configure_keycloak, etc. + + ## Response + Returns tasks ordered by created_at descending (newest first). + Each task includes: id, tenant_id, agent_id, type, payload, status, result, timestamps. + """ + return await get_tasks(db, tenant_id=tenant_id, task_status=status) + + +# --- Agent task acquisition --- +# NOTE: /next must be defined BEFORE /{task_id} to avoid path matching issues + + +async def get_next_pending_task(db: AsyncSessionDep) -> Task | None: + """Get the oldest pending task.""" + query = ( + select(Task) + .where(Task.status == TaskStatus.PENDING.value) + .order_by(Task.created_at.asc()) + .limit(1) + ) + result = await db.execute(query) + return result.scalar_one_or_none() + + +@router.get("/next", response_model=TaskResponse | None) +async def get_next_task_endpoint( + db: AsyncSessionDep, + agent_id: uuid.UUID = Query(..., description="Agent UUID requesting the task"), + authorization: str | None = Header(None), +) -> Task | None: + """ + Get the next pending task for an agent. + + Requires Bearer token authentication matching the agent. + Atomically claims the oldest pending task by: + - Setting status to 'running' + - Assigning agent_id to the requesting agent + + Returns null (200) if no pending tasks are available. + """ + # Validate agent credentials + await validate_agent_token(db, agent_id, authorization) + + # Get next pending task + task = await get_next_pending_task(db) + + if task is None: + return None + + # Claim the task + task.status = TaskStatus.RUNNING.value + task.agent_id = agent_id + + await db.commit() + await db.refresh(task) + + return task + + +@router.get("/{task_id}", response_model=TaskResponse) +async def get_task_endpoint( + task_id: uuid.UUID, + db: AsyncSessionDep, +) -> Task: + """ + Get a task by ID. + + Returns the task with the specified UUID. + """ + task = await get_task_by_id(db, task_id) + if task is None: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Task {task_id} not found", + ) + return task + + +@router.patch("/{task_id}", response_model=TaskResponse) +async def update_task_endpoint( + task_id: uuid.UUID, + task_update: TaskUpdate, + db: AsyncSessionDep, +) -> Task: + """ + Update a task's status and/or result. + + Only status and result fields can be updated. + - **status**: New task status + - **result**: JSON result payload + """ + task = await get_task_by_id(db, task_id) + if task is None: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Task {task_id} not found", + ) + return await update_task(db, task, task_update) diff --git a/app/routes/tenants.py b/app/routes/tenants.py new file mode 100644 index 0000000..b620fcc --- /dev/null +++ b/app/routes/tenants.py @@ -0,0 +1,85 @@ +"""Tenant management endpoints.""" + +import uuid + +from fastapi import APIRouter, HTTPException, status +from sqlalchemy import select + +from app.db import AsyncSessionDep +from app.models.tenant import Tenant +from app.schemas.tenant import TenantCreate, TenantResponse + +router = APIRouter(prefix="/tenants", tags=["Tenants"]) + + +# --- Helper functions (embryonic service layer) --- + + +async def create_tenant(db: AsyncSessionDep, tenant_in: TenantCreate) -> Tenant: + """Create a new tenant in the database.""" + tenant = Tenant( + name=tenant_in.name, + domain=tenant_in.domain, + ) + db.add(tenant) + await db.commit() + await db.refresh(tenant) + return tenant + + +async def get_tenants(db: AsyncSessionDep) -> list[Tenant]: + """Retrieve all tenants from the database.""" + result = await db.execute(select(Tenant).order_by(Tenant.created_at.desc())) + return list(result.scalars().all()) + + +async def get_tenant_by_id(db: AsyncSessionDep, tenant_id: uuid.UUID) -> Tenant | None: + """Retrieve a tenant by ID.""" + result = await db.execute(select(Tenant).where(Tenant.id == tenant_id)) + return result.scalar_one_or_none() + + +# --- Route handlers (thin controllers) --- + + +@router.post("/", response_model=TenantResponse, status_code=status.HTTP_201_CREATED) +async def create_tenant_endpoint( + tenant_in: TenantCreate, + db: AsyncSessionDep, +) -> Tenant: + """ + Create a new tenant. + + - **name**: Unique tenant name (required) + - **domain**: Optional domain for the tenant + """ + return await create_tenant(db, tenant_in) + + +@router.get("/", response_model=list[TenantResponse]) +async def list_tenants_endpoint(db: AsyncSessionDep) -> list[Tenant]: + """ + List all tenants. + + Returns a list of all registered tenants. + """ + return await get_tenants(db) + + +@router.get("/{tenant_id}", response_model=TenantResponse) +async def get_tenant_endpoint( + tenant_id: uuid.UUID, + db: AsyncSessionDep, +) -> Tenant: + """ + Get a tenant by ID. + + Returns the tenant with the specified UUID. + """ + tenant = await get_tenant_by_id(db, tenant_id) + if tenant is None: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Tenant {tenant_id} not found", + ) + return tenant diff --git a/app/schemas/__init__.py b/app/schemas/__init__.py new file mode 100644 index 0000000..264a1bb --- /dev/null +++ b/app/schemas/__init__.py @@ -0,0 +1,45 @@ +"""Pydantic schemas for API request/response validation.""" + +from app.schemas.common import HealthResponse +from app.schemas.tenant import TenantCreate, TenantResponse +from app.schemas.task import ( + TaskCreate, + TaskResponse, + TaskUpdate, +) +from app.schemas.agent import ( + AgentRegisterRequest, + AgentRegisterResponse, + AgentHeartbeatResponse, + AgentResponse, +) +from app.schemas.tasks_extended import ( + FileWritePayload, + EnvUpdatePayload, + DockerReloadPayload, + CompositeSubTask, + CompositePayload, +) + +__all__ = [ + # Common + "HealthResponse", + # Tenant + "TenantCreate", + "TenantResponse", + # Task + "TaskCreate", + "TaskResponse", + "TaskUpdate", + # Task Payloads (for documentation/reference) + "FileWritePayload", + "EnvUpdatePayload", + "DockerReloadPayload", + "CompositeSubTask", + "CompositePayload", + # Agent + "AgentRegisterRequest", + "AgentRegisterResponse", + "AgentHeartbeatResponse", + "AgentResponse", +] diff --git a/app/schemas/agent.py b/app/schemas/agent.py new file mode 100644 index 0000000..f4339da --- /dev/null +++ b/app/schemas/agent.py @@ -0,0 +1,43 @@ +"""Agent schemas for API validation.""" + +import uuid +from datetime import datetime +from typing import Any + +from pydantic import BaseModel, ConfigDict, Field + + +class AgentRegisterRequest(BaseModel): + """Schema for agent registration request.""" + + hostname: str = Field(..., min_length=1, max_length=255) + version: str = Field(..., min_length=1, max_length=50) + metadata: dict[str, Any] | None = None + + +class AgentRegisterResponse(BaseModel): + """Schema for agent registration response.""" + + agent_id: uuid.UUID + token: str + + +class AgentHeartbeatResponse(BaseModel): + """Schema for agent heartbeat response.""" + + status: str = "ok" + + +class AgentResponse(BaseModel): + """Schema for agent response.""" + + model_config = ConfigDict(from_attributes=True) + + id: uuid.UUID + tenant_id: uuid.UUID | None + name: str + version: str + status: str + last_heartbeat: datetime | None + created_at: datetime + updated_at: datetime diff --git a/app/schemas/common.py b/app/schemas/common.py new file mode 100644 index 0000000..b1411eb --- /dev/null +++ b/app/schemas/common.py @@ -0,0 +1,24 @@ +"""Common schemas used across the API.""" + +from typing import Generic, TypeVar + +from pydantic import BaseModel + +T = TypeVar("T") + + +class HealthResponse(BaseModel): + """Health check response schema.""" + + status: str + version: str + + +class PaginatedResponse(BaseModel, Generic[T]): + """Generic paginated response wrapper.""" + + items: list[T] + total: int + page: int + page_size: int + total_pages: int diff --git a/app/schemas/task.py b/app/schemas/task.py new file mode 100644 index 0000000..54001eb --- /dev/null +++ b/app/schemas/task.py @@ -0,0 +1,70 @@ +"""Task schemas for API validation.""" + +import uuid +from datetime import datetime +from typing import Any + +from pydantic import BaseModel, ConfigDict, Field + +from app.models.task import TaskStatus + + +class TaskCreate(BaseModel): + """ + Schema for creating a new task. + + Supported task types and their expected payloads: + + **FILE_WRITE** - Write content to a file + payload: {"path": "/absolute/path", "content": "file content"} + + **ENV_UPDATE** - Update key/value pairs in a .env file + payload: {"path": "/path/to/.env", "updates": {"KEY": "value"}} + + **DOCKER_RELOAD** - Reload a Docker Compose stack + payload: {"compose_dir": "/path/to/compose/dir"} + + **COMPOSITE** - Execute a sequence of sub-tasks + payload: {"sequence": [{"task": "FILE_WRITE", "payload": {...}}, ...]} + + Legacy types (still supported): + - provision_server, configure_keycloak, configure_minio, etc. + + Note: Payload validation is performed agent-side. The orchestrator + accepts any dict payload to allow flexibility and forward compatibility. + """ + + tenant_id: uuid.UUID + type: str = Field( + ..., + min_length=1, + max_length=100, + description="Task type (FILE_WRITE, ENV_UPDATE, DOCKER_RELOAD, COMPOSITE, etc.)", + ) + payload: dict[str, Any] = Field( + default_factory=dict, + description="Task-specific payload (see docstring for formats)", + ) + + +class TaskUpdate(BaseModel): + """Schema for updating a task (status and result only).""" + + status: TaskStatus | None = None + result: dict[str, Any] | None = None + + +class TaskResponse(BaseModel): + """Schema for task response.""" + + model_config = ConfigDict(from_attributes=True) + + id: uuid.UUID + tenant_id: uuid.UUID + agent_id: uuid.UUID | None + type: str + payload: dict[str, Any] + status: str + result: dict[str, Any] | None + created_at: datetime + updated_at: datetime diff --git a/app/schemas/tasks_extended.py b/app/schemas/tasks_extended.py new file mode 100644 index 0000000..e047387 --- /dev/null +++ b/app/schemas/tasks_extended.py @@ -0,0 +1,73 @@ +"""Extended task payload schemas for SysAdmin Agent automation. + +These schemas define the expected payload structure for each task type. +Validation is performed agent-side; the orchestrator accepts any dict payload. +""" + +from typing import Any + +from pydantic import BaseModel, Field + + +class FileWritePayload(BaseModel): + """ + Payload for FILE_WRITE task type. + + Instructs the agent to write content to a file at the specified path. + """ + + path: str = Field(..., description="Absolute path to the target file") + content: str = Field(..., description="Content to write to the file") + + +class EnvUpdatePayload(BaseModel): + """ + Payload for ENV_UPDATE task type. + + Instructs the agent to update key/value pairs in an .env file. + Existing keys are updated; new keys are appended. + """ + + path: str = Field(..., description="Absolute path to the .env file") + updates: dict[str, str] = Field( + ..., description="Key-value pairs to update or add" + ) + + +class DockerReloadPayload(BaseModel): + """ + Payload for DOCKER_RELOAD task type. + + Instructs the agent to reload a Docker Compose stack. + Equivalent to: docker compose down && docker compose up -d + """ + + compose_dir: str = Field( + ..., description="Directory containing docker-compose.yml" + ) + + +class CompositeSubTask(BaseModel): + """ + A single sub-task within a COMPOSITE task. + + Represents one step in a multi-step automation sequence. + """ + + task: str = Field(..., description="Task type (e.g., FILE_WRITE, ENV_UPDATE)") + payload: dict[str, Any] = Field( + default_factory=dict, description="Payload for this sub-task" + ) + + +class CompositePayload(BaseModel): + """ + Payload for COMPOSITE task type. + + Instructs the agent to execute a sequence of sub-tasks in order. + If any sub-task fails, the sequence stops and the composite task fails. + """ + + sequence: list[CompositeSubTask] = Field( + ..., description="Ordered list of sub-tasks to execute" + ) diff --git a/app/schemas/tenant.py b/app/schemas/tenant.py new file mode 100644 index 0000000..9cdd9e5 --- /dev/null +++ b/app/schemas/tenant.py @@ -0,0 +1,25 @@ +"""Tenant schemas for API validation.""" + +import uuid +from datetime import datetime + +from pydantic import BaseModel, ConfigDict, Field + + +class TenantCreate(BaseModel): + """Schema for creating a new tenant.""" + + name: str = Field(..., min_length=1, max_length=255) + domain: str | None = Field(None, max_length=255) + + +class TenantResponse(BaseModel): + """Schema for tenant response.""" + + model_config = ConfigDict(from_attributes=True) + + id: uuid.UUID + name: str + domain: str | None + created_at: datetime + updated_at: datetime diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..9ed85ba --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,37 @@ +services: + db: + image: postgres:16-alpine + container_name: orchestrator-db + environment: + POSTGRES_USER: orchestrator + POSTGRES_PASSWORD: orchestrator + POSTGRES_DB: orchestrator + ports: + - "5433:5432" # Host port 5433 to avoid conflict with existing Postgres + volumes: + - postgres_data:/var/lib/postgresql/data + healthcheck: + test: ["CMD-SHELL", "pg_isready -U orchestrator -d orchestrator"] + interval: 5s + timeout: 5s + retries: 5 + + api: + build: . + container_name: orchestrator-api + ports: + - "8000:8000" + environment: + DATABASE_URL: postgresql+asyncpg://orchestrator:orchestrator@db:5432/orchestrator + DEBUG: "true" + APP_NAME: "LetsBe Orchestrator" + depends_on: + db: + condition: service_healthy + volumes: + - ./app:/app/app + - ./alembic:/app/alembic + command: uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload + +volumes: + postgres_data: diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..2439795 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,15 @@ +# Web Framework +fastapi>=0.109.0 +uvicorn[standard]>=0.27.0 + +# Database +sqlalchemy[asyncio]>=2.0.25 +asyncpg>=0.29.0 +alembic>=1.13.0 + +# Serialization & Validation +pydantic>=2.5.0 +pydantic-settings>=2.1.0 + +# Utilities +python-dotenv>=1.0.0 diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..d38971d --- /dev/null +++ b/tests/__init__.py @@ -0,0 +1 @@ +"""Test suite for letsbe-orchestrator.""" diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..7075608 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,63 @@ +"""Pytest configuration and fixtures for letsbe-orchestrator tests.""" + +import asyncio +import uuid +from collections.abc import AsyncGenerator + +import pytest +import pytest_asyncio +from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine + +from app.models.base import Base +from app.models.tenant import Tenant + +# Use in-memory SQLite for testing +TEST_DATABASE_URL = "sqlite+aiosqlite:///:memory:" + + +@pytest.fixture(scope="session") +def event_loop(): + """Create an instance of the default event loop for the test session.""" + loop = asyncio.get_event_loop_policy().new_event_loop() + yield loop + loop.close() + + +@pytest_asyncio.fixture(scope="function") +async def async_engine(): + """Create a test async engine.""" + engine = create_async_engine(TEST_DATABASE_URL, echo=False) + async with engine.begin() as conn: + await conn.run_sync(Base.metadata.create_all) + yield engine + async with engine.begin() as conn: + await conn.run_sync(Base.metadata.drop_all) + await engine.dispose() + + +@pytest_asyncio.fixture(scope="function") +async def db(async_engine) -> AsyncGenerator[AsyncSession, None]: + """Create a test database session.""" + session_factory = async_sessionmaker( + async_engine, + class_=AsyncSession, + expire_on_commit=False, + autocommit=False, + autoflush=False, + ) + async with session_factory() as session: + yield session + + +@pytest_asyncio.fixture(scope="function") +async def test_tenant(db: AsyncSession) -> Tenant: + """Create a test tenant.""" + tenant = Tenant( + id=uuid.uuid4(), + name="Test Tenant", + slug="test-tenant", + ) + db.add(tenant) + await db.commit() + await db.refresh(tenant) + return tenant diff --git a/tests/test_playbooks_chatwoot.py b/tests/test_playbooks_chatwoot.py new file mode 100644 index 0000000..193c65c --- /dev/null +++ b/tests/test_playbooks_chatwoot.py @@ -0,0 +1,141 @@ +"""Tests for the Chatwoot playbook module.""" + +import uuid + +import pytest +import pytest_asyncio +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from app.models.task import Task, TaskStatus +from app.models.tenant import Tenant +from app.playbooks.chatwoot import ( + CHATWOOT_ENV_PATH, + CHATWOOT_STACK_DIR, + CompositeStep, + build_chatwoot_setup_steps, + create_chatwoot_setup_task, +) + + +class TestBuildChatwootSetupSteps: + """Tests for the build_chatwoot_setup_steps function.""" + + def test_returns_two_steps(self): + """Verify that build_chatwoot_setup_steps returns exactly 2 steps.""" + steps = build_chatwoot_setup_steps(domain="support.example.com") + assert len(steps) == 2 + assert all(isinstance(step, CompositeStep) for step in steps) + + def test_env_update_payload(self): + """Verify the ENV_UPDATE step has the correct payload structure.""" + domain = "support.example.com" + steps = build_chatwoot_setup_steps(domain=domain) + + env_step = steps[0] + assert env_step.type == "ENV_UPDATE" + assert env_step.payload["path"] == CHATWOOT_ENV_PATH + assert env_step.payload["updates"]["FRONTEND_URL"] == f"https://{domain}" + assert env_step.payload["updates"]["BACKEND_URL"] == f"https://{domain}" + + def test_docker_reload_payload(self): + """Verify the DOCKER_RELOAD step has the correct payload structure.""" + steps = build_chatwoot_setup_steps(domain="support.example.com") + + docker_step = steps[1] + assert docker_step.type == "DOCKER_RELOAD" + assert docker_step.payload["compose_dir"] == CHATWOOT_STACK_DIR + assert docker_step.payload["pull"] is True + + def test_domain_url_formatting(self): + """Verify that domain URLs are properly formatted with https.""" + domain = "chat.mycompany.io" + steps = build_chatwoot_setup_steps(domain=domain) + + env_step = steps[0] + assert env_step.payload["updates"]["FRONTEND_URL"] == "https://chat.mycompany.io" + assert env_step.payload["updates"]["BACKEND_URL"] == "https://chat.mycompany.io" + + +@pytest.mark.asyncio +class TestCreateChatwootSetupTask: + """Tests for the create_chatwoot_setup_task function.""" + + async def test_persists_composite_task(self, db: AsyncSession, test_tenant: Tenant): + """Verify that create_chatwoot_setup_task persists a COMPOSITE task.""" + task = await create_chatwoot_setup_task( + db=db, + tenant_id=test_tenant.id, + agent_id=None, + domain="support.example.com", + ) + + assert task.id is not None + assert task.tenant_id == test_tenant.id + assert task.type == "COMPOSITE" + assert task.status == TaskStatus.PENDING.value + + async def test_task_payload_contains_steps(self, db: AsyncSession, test_tenant: Tenant): + """Verify that the task payload contains the steps array.""" + task = await create_chatwoot_setup_task( + db=db, + tenant_id=test_tenant.id, + agent_id=None, + domain="support.example.com", + ) + + assert "steps" in task.payload + assert len(task.payload["steps"]) == 2 + + async def test_task_steps_structure(self, db: AsyncSession, test_tenant: Tenant): + """Verify that the steps in the payload have the correct structure.""" + task = await create_chatwoot_setup_task( + db=db, + tenant_id=test_tenant.id, + agent_id=None, + domain="support.example.com", + ) + + steps = task.payload["steps"] + + # First step should be ENV_UPDATE + assert steps[0]["type"] == "ENV_UPDATE" + assert "path" in steps[0]["payload"] + assert "updates" in steps[0]["payload"] + + # Second step should be DOCKER_RELOAD + assert steps[1]["type"] == "DOCKER_RELOAD" + assert "compose_dir" in steps[1]["payload"] + assert steps[1]["payload"]["pull"] is True + + async def test_task_with_agent_id(self, db: AsyncSession, test_tenant: Tenant): + """Verify that agent_id is properly assigned when provided.""" + agent_id = uuid.uuid4() + + # Note: In a real scenario, the agent would need to exist in the DB + # For this test, we're just verifying the task stores the agent_id + task = await create_chatwoot_setup_task( + db=db, + tenant_id=test_tenant.id, + agent_id=agent_id, + domain="support.example.com", + ) + + assert task.agent_id == agent_id + + async def test_task_persisted_to_database(self, db: AsyncSession, test_tenant: Tenant): + """Verify the task is actually persisted and can be retrieved.""" + task = await create_chatwoot_setup_task( + db=db, + tenant_id=test_tenant.id, + agent_id=None, + domain="support.example.com", + ) + + # Query the task back from the database + result = await db.execute(select(Task).where(Task.id == task.id)) + retrieved_task = result.scalar_one_or_none() + + assert retrieved_task is not None + assert retrieved_task.type == "COMPOSITE" + assert retrieved_task.tenant_id == test_tenant.id