letsbe-orchestrator/app/models/registration_token.py

102 lines
3.0 KiB
Python

"""Registration token model for secure agent registration."""
import uuid
from datetime import datetime
from typing import TYPE_CHECKING
from sqlalchemy import Boolean, DateTime, ForeignKey, Integer, String
from sqlalchemy.orm import Mapped, mapped_column, relationship
from app.models.base import Base, TimestampMixin, UUIDMixin
if TYPE_CHECKING:
from app.models.tenant import Tenant
class RegistrationToken(UUIDMixin, TimestampMixin, Base):
    """
    Registration token for secure agent registration.

    Tokens are pre-provisioned by admins and map to specific tenants.
    Agents use these tokens during initial registration to:

    1. Authenticate the registration request
    2. Associate themselves with the correct tenant

    Tokens can be:

    - Single-use (max_uses=1, default)
    - Limited-use (max_uses > 1)
    - Unlimited (max_uses=0)
    - Time-limited (expires_at set)
    - Manually revoked (revoked=True)

    Only a hash of the token is stored in ``token_hash``; this model does
    not compute the hash or increment ``use_count`` itself.
    """

    __tablename__ = "registration_tokens"

    # Owning tenant; token rows are removed with the tenant (ON DELETE CASCADE).
    tenant_id: Mapped[uuid.UUID] = mapped_column(
        ForeignKey("tenants.id", ondelete="CASCADE"),
        nullable=False,
        index=True,
    )
    # 64 characters is the length of a hex-encoded SHA-256 digest.
    token_hash: Mapped[str] = mapped_column(
        String(64),
        nullable=False,
        index=True,
        comment="SHA-256 hash of the registration token",
    )
    description: Mapped[str | None] = mapped_column(
        String(255),
        nullable=True,
        comment="Human-readable description for the token",
    )
    # 0 is a sentinel meaning "no usage limit" (see is_valid).
    max_uses: Mapped[int] = mapped_column(
        Integer,
        nullable=False,
        default=1,
        comment="Maximum number of uses (0 = unlimited)",
    )
    use_count: Mapped[int] = mapped_column(
        Integer,
        nullable=False,
        default=0,
        comment="Current number of times this token has been used",
    )
    # NULL means the token never expires on its own.
    expires_at: Mapped[datetime | None] = mapped_column(
        DateTime(timezone=True),
        nullable=True,
        comment="Optional expiration timestamp",
    )
    revoked: Mapped[bool] = mapped_column(
        Boolean,
        nullable=False,
        default=False,
        comment="Whether this token has been manually revoked",
    )
    created_by: Mapped[str | None] = mapped_column(
        String(255),
        nullable=True,
        comment="Identifier of who created this token (for audit)",
    )

    # Relationships
    tenant: Mapped["Tenant"] = relationship(
        back_populates="registration_tokens",
    )

    def __repr__(self) -> str:
        """Return a debug representation showing id, tenant and usage."""
        return (
            f"<RegistrationToken(id={self.id}, tenant_id={self.tenant_id}, "
            f"uses={self.use_count}/{self.max_uses})>"
        )

    @staticmethod
    def _as_aware(moment: datetime) -> datetime:
        """Return *moment* as a timezone-aware datetime.

        Naive values are interpreted as UTC.
        NOTE(review): this assumes naive timestamps in this project are UTC,
        in line with the use of ``utc_now`` -- confirm against callers.
        """
        # Function-scope import, consistent with the local import in is_valid.
        from datetime import timezone

        # utcoffset() is None for naive datetimes, and also for a tzinfo
        # that declines to report an offset.
        if moment.utcoffset() is None:
            return moment.replace(tzinfo=timezone.utc)
        return moment

    def is_valid(self, now: datetime | None = None) -> bool:
        """Check if the token can still be used for registration.

        Args:
            now: Reference time for the expiry check. Defaults to
                ``utc_now()`` when omitted.

        Returns:
            ``False`` if the token is revoked, if ``expires_at`` lies
            strictly before ``now``, or if ``max_uses`` is positive and
            ``use_count`` has reached it; ``True`` otherwise. A token whose
            ``expires_at`` equals ``now`` is still considered valid.
        """
        from app.models.base import utc_now

        if now is None:
            now = utc_now()

        if self.revoked:
            return False

        if self.expires_at is not None:
            # Some backends (e.g. SQLite) return naive datetimes even for
            # DateTime(timezone=True) columns, and callers may pass a naive
            # ``now``. Comparing naive with aware raises TypeError, so both
            # sides are normalised first.
            if self._as_aware(self.expires_at) < self._as_aware(now):
                return False

        # max_uses == 0 means unlimited, so the limit only applies when > 0.
        if self.max_uses > 0 and self.use_count >= self.max_uses:
            return False

        return True