Skip to content

Common Entities

Auto-generated API reference for tradai.common entities.

Core Entities

tradai.common.entities.strategy.Strategy

Bases: BaseModel

Trading strategy entity.

Represents a registered trading strategy with its metadata.

Attributes:

Name Type Description
name str

Strategy name (alphanumeric, underscores, hyphens)

version str

Semantic version (e.g., "1.0.0" or "v1.0.0")

ecr_url str

ECR image URL for deployment

created_at datetime

Creation timestamp

tags dict[str, str]

Optional metadata tags

Example

strategy = Strategy( ... name="PascalStrategy", ... version="2.0.0", ... ecr_url="123456789.dkr.ecr.eu-central-1.amazonaws.com/tradai-strategies:pascal-2.0.0" ... )

Source code in libs/tradai-common/src/tradai/common/entities/strategy.py
class Strategy(BaseModel):
    """Trading strategy entity.

    Represents a registered trading strategy with its metadata.

    Attributes:
        name: Strategy name (alphanumeric, underscores, hyphens)
        version: Semantic version (e.g., "1.0.0" or "v1.0.0")
        ecr_url: ECR image URL for deployment
        created_at: Creation timestamp
        tags: Optional metadata tags

    Example:
        >>> strategy = Strategy(
        ...     name="PascalStrategy",
        ...     version="2.0.0",
        ...     ecr_url="123456789.dkr.ecr.eu-central-1.amazonaws.com/tradai-strategies:pascal-2.0.0"
        ... )
    """

    name: str = Field(..., min_length=1, max_length=100, pattern=r"^[a-zA-Z0-9_-]+$")
    version: str = Field(..., pattern=r"^v?\d+\.\d+\.\d+$")
    ecr_url: str = Field(..., description="ECR image URL")
    created_at: datetime = Field(default_factory=lambda: datetime.now(UTC))
    tags: dict[str, str] = Field(default_factory=dict)

    model_config = {"frozen": True}  # Immutable

    @field_validator("ecr_url")
    @classmethod
    def validate_ecr_url(cls, v: str) -> str:
        """Validate ECR URL format."""
        if ".dkr.ecr." not in v or ".amazonaws.com/" not in v:
            raise ValueError("ecr_url must be a valid ECR URL")
        return v

validate_ecr_url(v: str) -> str classmethod

Validate ECR URL format.

Source code in libs/tradai-common/src/tradai/common/entities/strategy.py
@field_validator("ecr_url")
@classmethod
def validate_ecr_url(cls, v: str) -> str:
    """Validate ECR URL format."""
    if ".dkr.ecr." not in v or ".amazonaws.com/" not in v:
        raise ValueError("ecr_url must be a valid ECR URL")
    return v

tradai.common.entities.backtest.BacktestConfig

Bases: BaseModel

Backtest configuration with validation.

Defines all parameters needed to run a backtest. Validates symbols use proper trading pair format and date range is valid.

Attributes:

Name Type Description
strategy str

Strategy name to backtest

timeframe str

Trading timeframe (e.g., "1h", "4h", "1d")

start_date str

Start date in YYYY-MM-DD format

end_date str

End date in YYYY-MM-DD format

symbols list[str]

List of trading pairs

stoploss float | None

Optional stoploss percentage (-1.0 to 0.0)

stake_amount float

Position size in stake currency

exchange str

Exchange key (e.g., "binance_futures")

strategy_version str

MLflow version specifier ("latest", stage name, or number)

config_version_id str | None

Optional ConfigVersion ID for reproducibility

Example

config = BacktestConfig( ... strategy="PascalStrategy", ... timeframe="1h", ... start_date="2024-01-01", ... end_date="2024-06-30", ... symbols=["BTC/USDT:USDT", "ETH/USDT:USDT"], ... exchange="binance_futures", ... strategy_version="Production", ... )

Source code in libs/tradai-common/src/tradai/common/entities/backtest.py
class BacktestConfig(BaseModel):
    """Backtest configuration with validation.

    Defines all parameters needed to run a backtest.
    Validates symbols use proper trading pair format and date range is valid.

    Attributes:
        strategy: Strategy name to backtest
        timeframe: Trading timeframe (e.g., "1h", "4h", "1d")
        start_date: Start date in YYYY-MM-DD format
        end_date: End date in YYYY-MM-DD format
        symbols: List of trading pairs
        stoploss: Optional stoploss percentage (-1.0 to 0.0)
        stake_amount: Position size in stake currency
        exchange: Exchange key (e.g., "binance_futures")
        strategy_version: MLflow version specifier ("latest", stage name, or number)
        config_version_id: Optional ConfigVersion ID for reproducibility

    Example:
        >>> config = BacktestConfig(
        ...     strategy="PascalStrategy",
        ...     timeframe="1h",
        ...     start_date="2024-01-01",
        ...     end_date="2024-06-30",
        ...     symbols=["BTC/USDT:USDT", "ETH/USDT:USDT"],
        ...     exchange="binance_futures",
        ...     strategy_version="Production",
        ... )
    """

    strategy: str = Field(..., min_length=1)
    timeframe: str = Field(..., pattern=r"^\d+[mhd]$")  # e.g., "1h", "4h", "1d"
    start_date: str = Field(..., pattern=r"^\d{4}-\d{2}-\d{2}$")
    end_date: str = Field(..., pattern=r"^\d{4}-\d{2}-\d{2}$")
    symbols: list[str] = Field(..., min_length=1)
    stoploss: float | None = Field(default=None, ge=-1.0, le=0.0)
    stake_amount: float = Field(default=1000.0, gt=0)
    stake_currency: str = Field(
        default="USDT",
        min_length=1,
        description="Stake currency (e.g., 'USDT', 'BTC')",
    )

    # Exchange configuration (default matches existing hardcoded value)
    exchange: str = Field(
        default="binance_futures",
        # Pattern derived from TradingMode enum values (see _EXCHANGE_PATTERN above)
        pattern=_EXCHANGE_PATTERN,
        description="Exchange key: '{name}_{trading_mode}' (e.g., 'binance_futures')",
    )

    # Strategy version for MLflow resolution (default matches existing hardcoded value)
    strategy_version: str = Field(
        default="latest",
        description="'latest', 'Production', 'Staging', or specific version number",
    )

    # Config version reference for reproducibility (opt-in)
    config_version_id: str | None = Field(
        default=None,
        description="ConfigVersion ID for reproducibility tracking",
    )

    # FreqAI model override (required for FreqAI strategies)
    freqai_model: str | None = Field(
        default=None,
        description="FreqAI model class name (e.g., 'LightGBMRegressor'). Required for FreqAI strategies.",
    )

    # ECS task definition override (bypass unified convention)
    task_definition: str | None = Field(
        default=None,
        min_length=1,
        description=(
            "ECS task definition family name override. If None, computed from "
            "strategy name + environment via get_strategy_task_definition(). "
            "WARNING: Setting this bypasses the unified naming convention. "
            "Use only for emergency overrides or migration."
        ),
    )

    model_config = {"frozen": True}

    @field_validator("start_date", "end_date")
    @classmethod
    def validate_date_is_real(cls, v: str) -> str:
        """Validate date string represents an actual calendar date (C6).

        The pattern regex only checks format (YYYY-MM-DD), not validity.
        This validator catches impossible dates like 2024-02-30 or 2024-13-01.
        """
        try:
            datetime.strptime(v, "%Y-%m-%d")
        except ValueError as e:
            raise ValueError(f"Invalid date '{v}': {e}") from e
        return v

    @field_validator("strategy_version")
    @classmethod
    def validate_strategy_version(cls, v: str) -> str:
        """Validate strategy_version format.

        Valid formats:
        - "latest" - resolve to highest version number
        - "Production", "Staging", "Archived", "None" - resolve by MLflow stage
        - Numeric string like "1", "2", "3" - specific version number
        """
        valid_specifiers = {"latest", "production", "staging", "archived", "none"}
        if v.lower() in valid_specifiers or v.isdigit():
            return v
        raise ValueError(
            f"Invalid strategy_version '{v}'. Use 'latest', stage name, or version number"
        )

    @field_validator("task_definition")
    @classmethod
    def validate_task_definition(cls, v: str | None) -> str | None:
        """Restrict task_definition to the tradai-strategy naming convention.

        Accepts family name (with optional `:revision`) and full ARN form.
        Prevents arbitrary task definition injection via untrusted config sources.
        """
        if v is None:
            return v
        # Slug from strategy_name_to_slug may contain underscores (e.g., "my_strategy_v2");
        # ECS task definition family allows [a-zA-Z0-9_-]. Require lowercase here to match slug output.
        pattern = (
            r"^(arn:aws:ecs:[a-z0-9-]+:\d+:task-definition/)?"
            r"tradai-strategy-[a-z0-9][a-z0-9_-]*"
            r"(:\d+)?$"
        )
        if not re.match(pattern, v):
            raise ValueError(
                f"Invalid task_definition {v!r}: must match "
                "'tradai-strategy-<slug>[-<env>][:<revision>]' "
                "or full ECS task-definition ARN"
            )
        return v

    @field_validator("symbols")
    @classmethod
    def validate_symbols_format(cls, v: list[str]) -> list[str]:
        """Validate all symbols use proper trading pair format.

        Uses validate_trading_pair to ensure format like BTC/USDT or BTC/USDT:USDT.
        Prevents command injection and normalizes to uppercase.
        """
        return [validate_trading_pair(s) for s in v]

    @model_validator(mode="after")
    def validate_date_range(self) -> BacktestConfig:
        """Ensure end_date is not before start_date."""
        start = datetime.strptime(self.start_date, "%Y-%m-%d")
        end = datetime.strptime(self.end_date, "%Y-%m-%d")
        if end < start:
            raise ValueError(
                f"end_date ({self.end_date}) must be >= start_date ({self.start_date})"
            )
        return self

validate_date_is_real(v: str) -> str classmethod

Validate date string represents an actual calendar date (C6).

The pattern regex only checks format (YYYY-MM-DD), not validity. This validator catches impossible dates like 2024-02-30 or 2024-13-01.

Source code in libs/tradai-common/src/tradai/common/entities/backtest.py
@field_validator("start_date", "end_date")
@classmethod
def validate_date_is_real(cls, v: str) -> str:
    """Validate date string represents an actual calendar date (C6).

    The pattern regex only checks format (YYYY-MM-DD), not validity.
    This validator catches impossible dates like 2024-02-30 or 2024-13-01.
    """
    try:
        datetime.strptime(v, "%Y-%m-%d")
    except ValueError as e:
        raise ValueError(f"Invalid date '{v}': {e}") from e
    return v

validate_strategy_version(v: str) -> str classmethod

Validate strategy_version format.

Valid formats: - "latest" - resolve to highest version number - "Production", "Staging", "Archived", "None" - resolve by MLflow stage - Numeric string like "1", "2", "3" - specific version number

Source code in libs/tradai-common/src/tradai/common/entities/backtest.py
@field_validator("strategy_version")
@classmethod
def validate_strategy_version(cls, v: str) -> str:
    """Validate strategy_version format.

    Valid formats:
    - "latest" - resolve to highest version number
    - "Production", "Staging", "Archived", "None" - resolve by MLflow stage
    - Numeric string like "1", "2", "3" - specific version number
    """
    valid_specifiers = {"latest", "production", "staging", "archived", "none"}
    if v.lower() in valid_specifiers or v.isdigit():
        return v
    raise ValueError(
        f"Invalid strategy_version '{v}'. Use 'latest', stage name, or version number"
    )

validate_task_definition(v: str | None) -> str | None classmethod

Restrict task_definition to the tradai-strategy naming convention.

Accepts family name (with optional :revision) and full ARN form. Prevents arbitrary task definition injection via untrusted config sources.

Source code in libs/tradai-common/src/tradai/common/entities/backtest.py
@field_validator("task_definition")
@classmethod
def validate_task_definition(cls, v: str | None) -> str | None:
    """Restrict task_definition to the tradai-strategy naming convention.

    Accepts family name (with optional `:revision`) and full ARN form.
    Prevents arbitrary task definition injection via untrusted config sources.
    """
    if v is None:
        return v
    # Slug from strategy_name_to_slug may contain underscores (e.g., "my_strategy_v2");
    # ECS task definition family allows [a-zA-Z0-9_-]. Require lowercase here to match slug output.
    pattern = (
        r"^(arn:aws:ecs:[a-z0-9-]+:\d+:task-definition/)?"
        r"tradai-strategy-[a-z0-9][a-z0-9_-]*"
        r"(:\d+)?$"
    )
    if not re.match(pattern, v):
        raise ValueError(
            f"Invalid task_definition {v!r}: must match "
            "'tradai-strategy-<slug>[-<env>][:<revision>]' "
            "or full ECS task-definition ARN"
        )
    return v

validate_symbols_format(v: list[str]) -> list[str] classmethod

Validate all symbols use proper trading pair format.

Uses validate_trading_pair to ensure format like BTC/USDT or BTC/USDT:USDT. Prevents command injection and normalizes to uppercase.

Source code in libs/tradai-common/src/tradai/common/entities/backtest.py
@field_validator("symbols")
@classmethod
def validate_symbols_format(cls, v: list[str]) -> list[str]:
    """Validate all symbols use proper trading pair format.

    Uses validate_trading_pair to ensure format like BTC/USDT or BTC/USDT:USDT.
    Prevents command injection and normalizes to uppercase.
    """
    return [validate_trading_pair(s) for s in v]

validate_date_range() -> BacktestConfig

Ensure end_date is not before start_date.

Source code in libs/tradai-common/src/tradai/common/entities/backtest.py
@model_validator(mode="after")
def validate_date_range(self) -> BacktestConfig:
    """Ensure end_date is not before start_date."""
    start = datetime.strptime(self.start_date, "%Y-%m-%d")
    end = datetime.strptime(self.end_date, "%Y-%m-%d")
    if end < start:
        raise ValueError(
            f"end_date ({self.end_date}) must be >= start_date ({self.start_date})"
        )
    return self

tradai.common.entities.backtest.BacktestResult

Bases: BaseModel

Backtest execution result.

Contains complete metrics and trades from a Freqtrade backtest. Includes all key ratios: Sharpe, Sortino, Calmar, SQN, etc. Additional metrics stored in the metrics dict for MLflow logging.

Attributes:

Name Type Description
total_trades int

Total number of trades executed

winning_trades int

Number of profitable trades

losing_trades int

Number of losing trades

draw_trades int

Number of break-even trades

total_profit float

Total profit in stake currency

total_profit_pct float

Total profit as percentage

profit_factor float | None

Ratio of gross profit to gross loss

sharpe_ratio float | None

Risk-adjusted return metric

sortino_ratio float | None

Downside risk-adjusted return

calmar_ratio float | None

Return over max drawdown

sqn float | None

System Quality Number

cagr float | None

Compound Annual Growth Rate

max_drawdown float | None

Maximum drawdown in stake currency

max_drawdown_pct float | None

Maximum drawdown as percentage

win_rate float | None

Percentage of winning trades

metrics dict[str, float]

Additional custom metrics

trades list[dict[str, Any]]

Raw trade data (limited)

raw_stats dict[str, Any]

Full raw stats from Freqtrade

mlflow_run_id str | None

MLflow run ID for traceability

config_artifact_uri str | None

URI of config artifact in MLflow

results_artifact_uri str | None

S3 URI of results file

job_id str | None

DynamoDB job ID for reverse lookup

git_commit str | None

Git commit SHA for reproducibility

resolved_strategy_version str | None

Resolved MLflow version number

config_version_id str | None

ConfigVersion ID used for this backtest

Source code in libs/tradai-common/src/tradai/common/entities/backtest.py
class BacktestResult(BaseModel):
    """Backtest execution result.

    Contains complete metrics and trades from a Freqtrade backtest.
    Includes all key ratios: Sharpe, Sortino, Calmar, SQN, etc.
    Additional metrics stored in the metrics dict for MLflow logging.

    Attributes:
        total_trades: Total number of trades executed
        winning_trades: Number of profitable trades
        losing_trades: Number of losing trades
        draw_trades: Number of break-even trades
        total_profit: Total profit in stake currency
        total_profit_pct: Total profit as percentage
        profit_factor: Ratio of gross profit to gross loss
        sharpe_ratio: Risk-adjusted return metric
        sortino_ratio: Downside risk-adjusted return
        calmar_ratio: Return over max drawdown
        sqn: System Quality Number
        cagr: Compound Annual Growth Rate
        max_drawdown: Maximum drawdown in stake currency
        max_drawdown_pct: Maximum drawdown as percentage
        win_rate: Percentage of winning trades
        metrics: Additional custom metrics
        trades: Raw trade data (limited)
        raw_stats: Full raw stats from Freqtrade
        mlflow_run_id: MLflow run ID for traceability
        config_artifact_uri: URI of config artifact in MLflow
        results_artifact_uri: S3 URI of results file
        job_id: DynamoDB job ID for reverse lookup
        git_commit: Git commit SHA for reproducibility
        resolved_strategy_version: Resolved MLflow version number
        config_version_id: ConfigVersion ID used for this backtest
    """

    # Trade summary
    total_trades: int = Field(ge=0)
    winning_trades: int = Field(ge=0)
    losing_trades: int = Field(default=0, ge=0)
    draw_trades: int = Field(default=0, ge=0)

    # Profit metrics
    total_profit: float = Field(default=0.0)
    total_profit_pct: float = Field(default=0.0)
    profit_factor: float | None = Field(default=None, ge=0)
    expectancy: float | None = Field(default=None)
    expectancy_ratio: float | None = Field(default=None)

    # Risk ratios (Freqtrade 2023.1+)
    sharpe_ratio: float | None = Field(default=None)
    sortino_ratio: float | None = Field(default=None)
    calmar_ratio: float | None = Field(default=None)
    sqn: float | None = Field(default=None)  # System Quality Number
    cagr: float | None = Field(default=None)  # Compound Annual Growth Rate

    # Drawdown metrics
    max_drawdown: float | None = Field(default=None)
    max_drawdown_pct: float | None = Field(
        default=None,
        description="Maximum drawdown as positive percentage (e.g., 15.0 means 15%).",
    )
    drawdown_start: str | None = Field(default=None)
    drawdown_end: str | None = Field(default=None)

    # Trade durations
    avg_trade_duration: str | None = Field(default=None)
    winning_avg_duration: str | None = Field(default=None)
    losing_avg_duration: str | None = Field(default=None)

    # Win rate
    win_rate: float | None = Field(default=None, ge=0.0, le=1.0)

    # Best/worst performance
    best_pair: str | None = Field(default=None)
    worst_pair: str | None = Field(default=None)
    best_trade_profit: float | None = Field(default=None)
    worst_trade_profit: float | None = Field(default=None)

    # Additional metrics dict (for MLflow logging)
    metrics: dict[str, float] = Field(default_factory=dict)

    # Raw trades (limited to prevent memory issues)
    # Expected keys: pair, profit_ratio, profit_abs, open_date, close_date,
    # trade_duration, is_open, is_short, open_rate, close_rate, leverage
    trades: list[dict[str, Any]] = Field(default_factory=list)

    # Full raw stats from Freqtrade (for complete logging)
    raw_stats: dict[str, Any] = Field(default_factory=dict)

    # MLflow traceability fields (SS010/BE010)
    mlflow_run_id: str | None = Field(default=None, description="MLflow run ID for traceability")
    config_artifact_uri: str | None = Field(
        default=None, description="URI of uploaded config artifact in MLflow"
    )
    results_artifact_uri: str | None = Field(default=None, description="S3 URI of results file")

    # E2E traceability fields (P1.2/P1.3)
    job_id: str | None = Field(default=None, description="DynamoDB job ID for reverse lookup")
    git_commit: str | None = Field(default=None, description="Git commit SHA for reproducibility")

    # Internal run correlation (S3 paths and MLflow tags)
    tradai_run_id: str | None = Field(
        default=None, description="Internal run ID used for S3 paths and MLflow tags"
    )

    # Strategy versioning traceability (V-003)
    resolved_strategy_version: str | None = Field(
        default=None, description="Resolved MLflow version number (e.g., '3')"
    )
    config_version_id: str | None = Field(
        default=None, description="ConfigVersion ID used for this backtest"
    )

    # Exchange traceability
    exchange: str | None = Field(default=None, description="Exchange key (e.g., 'binance_futures')")

    model_config = {"frozen": True}

    @field_validator("max_drawdown_pct", mode="before")
    @classmethod
    def normalize_drawdown_sign(cls, v: float | None) -> float | None:
        """Normalize max_drawdown_pct to positive percentage."""
        if v is not None:
            return abs(v)
        return v

    @field_validator("winning_trades")
    @classmethod
    def validate_winning_trades(cls, v: int, info: Any) -> int:
        """Ensure winning trades <= total trades."""
        if "total_trades" in info.data and v > info.data["total_trades"]:
            raise ValueError("winning_trades cannot exceed total_trades")
        return v

    @field_validator("git_commit")
    @classmethod
    def validate_git_commit(cls, v: str | None) -> str | None:
        """Validate git commit SHA format (P1.3).

        Accepts:
        - None (no git info available)
        - "unknown" (not in a git repository)
        - 7-char short SHA (e.g., "abc1234")
        - 40-char full SHA (e.g., "abc1234567890...")
        """
        if v is None or v == "unknown":
            return v
        if len(v) not in (7, 40):
            raise ValueError(f"Invalid git commit SHA length: {len(v)} (expected 7 or 40)")
        if not all(c in "0123456789abcdef" for c in v.lower()):
            raise ValueError(f"Invalid git commit SHA (not hex): {v}")
        return v

    @classmethod
    def from_mlflow_metrics(
        cls,
        metrics: dict[str, float],
        mlflow_run_id: str,
        config_artifact_uri: str | None = None,
    ) -> BacktestResult:
        """Create BacktestResult from MLflow run metrics dict.

        Extracts standard metric keys from MLflow's flat metrics dictionary.

        Args:
            metrics: MLflow run metrics (e.g., run.data.metrics)
            mlflow_run_id: MLflow run ID for traceability
            config_artifact_uri: Optional URI of config artifact

        Returns:
            BacktestResult with metrics populated from the dict

        Example:
            >>> run = mlflow_client.get_run(run_id)
            >>> result = BacktestResult.from_mlflow_metrics(
            ...     run.data.metrics, run_id
            ... )
        """
        return cls(
            total_trades=int(metrics.get("total_trades", 0)),
            winning_trades=int(metrics.get("winning_trades", 0)),
            losing_trades=int(metrics.get("losing_trades", 0)),
            total_profit=metrics.get("total_profit", 0.0),
            total_profit_pct=metrics.get("total_profit_pct", 0.0),
            sharpe_ratio=metrics.get("sharpe_ratio"),
            profit_factor=metrics.get("profit_factor"),
            win_rate=metrics.get("win_rate"),
            max_drawdown_pct=extract_drawdown_pct(metrics),
            mlflow_run_id=mlflow_run_id,
            config_artifact_uri=config_artifact_uri,
        )

normalize_drawdown_sign(v: float | None) -> float | None classmethod

Normalize max_drawdown_pct to positive percentage.

Source code in libs/tradai-common/src/tradai/common/entities/backtest.py
@field_validator("max_drawdown_pct", mode="before")
@classmethod
def normalize_drawdown_sign(cls, v: float | None) -> float | None:
    """Normalize max_drawdown_pct to positive percentage."""
    if v is not None:
        return abs(v)
    return v

validate_winning_trades(v: int, info: Any) -> int classmethod

Ensure winning trades <= total trades.

Source code in libs/tradai-common/src/tradai/common/entities/backtest.py
@field_validator("winning_trades")
@classmethod
def validate_winning_trades(cls, v: int, info: Any) -> int:
    """Ensure winning trades <= total trades."""
    if "total_trades" in info.data and v > info.data["total_trades"]:
        raise ValueError("winning_trades cannot exceed total_trades")
    return v

validate_git_commit(v: str | None) -> str | None classmethod

Validate git commit SHA format (P1.3).

Accepts: - None (no git info available) - "unknown" (not in a git repository) - 7-char short SHA (e.g., "abc1234") - 40-char full SHA (e.g., "abc1234567890...")

Source code in libs/tradai-common/src/tradai/common/entities/backtest.py
@field_validator("git_commit")
@classmethod
def validate_git_commit(cls, v: str | None) -> str | None:
    """Validate git commit SHA format (P1.3).

    Accepts:
    - None (no git info available)
    - "unknown" (not in a git repository)
    - 7-char short SHA (e.g., "abc1234")
    - 40-char full SHA (e.g., "abc1234567890...")
    """
    if v is None or v == "unknown":
        return v
    if len(v) not in (7, 40):
        raise ValueError(f"Invalid git commit SHA length: {len(v)} (expected 7 or 40)")
    if not all(c in "0123456789abcdef" for c in v.lower()):
        raise ValueError(f"Invalid git commit SHA (not hex): {v}")
    return v

from_mlflow_metrics(metrics: dict[str, float], mlflow_run_id: str, config_artifact_uri: str | None = None) -> BacktestResult classmethod

Create BacktestResult from MLflow run metrics dict.

Extracts standard metric keys from MLflow's flat metrics dictionary.

Parameters:

Name Type Description Default
metrics dict[str, float]

MLflow run metrics (e.g., run.data.metrics)

required
mlflow_run_id str

MLflow run ID for traceability

required
config_artifact_uri str | None

Optional URI of config artifact

None

Returns:

Type Description
BacktestResult

BacktestResult with metrics populated from the dict

Example

run = mlflow_client.get_run(run_id) result = BacktestResult.from_mlflow_metrics( ... run.data.metrics, run_id ... )

Source code in libs/tradai-common/src/tradai/common/entities/backtest.py
@classmethod
def from_mlflow_metrics(
    cls,
    metrics: dict[str, float],
    mlflow_run_id: str,
    config_artifact_uri: str | None = None,
) -> BacktestResult:
    """Create BacktestResult from MLflow run metrics dict.

    Extracts standard metric keys from MLflow's flat metrics dictionary.

    Args:
        metrics: MLflow run metrics (e.g., run.data.metrics)
        mlflow_run_id: MLflow run ID for traceability
        config_artifact_uri: Optional URI of config artifact

    Returns:
        BacktestResult with metrics populated from the dict

    Example:
        >>> run = mlflow_client.get_run(run_id)
        >>> result = BacktestResult.from_mlflow_metrics(
        ...     run.data.metrics, run_id
        ... )
    """
    return cls(
        total_trades=int(metrics.get("total_trades", 0)),
        winning_trades=int(metrics.get("winning_trades", 0)),
        losing_trades=int(metrics.get("losing_trades", 0)),
        total_profit=metrics.get("total_profit", 0.0),
        total_profit_pct=metrics.get("total_profit_pct", 0.0),
        sharpe_ratio=metrics.get("sharpe_ratio"),
        profit_factor=metrics.get("profit_factor"),
        win_rate=metrics.get("win_rate"),
        max_drawdown_pct=extract_drawdown_pct(metrics),
        mlflow_run_id=mlflow_run_id,
        config_artifact_uri=config_artifact_uri,
    )

Configuration

tradai.common.entities.config_version.ConfigVersion

Bases: DynamoDBSerializableMixin

Immutable config version entity with DynamoDB serialization.

Content-addressable versioning using SHA256 hash for deduplication. Follows existing patterns: - frozen=True for immutability (like BacktestJobStatus) - with_status() for state transitions - DynamoDBSerializableMixin provides to_dynamodb_item()/from_dynamodb_item() - ClassVar for constants (not dict)

DynamoDB Schema

PK: strategy_name SK: config_id GSIs: - status-index (PK: strategy_name, SK: status) for get_active - config_hash-index (PK: config_hash) for deduplication

Example

version = ConfigVersion( ... strategy_name="PascalStrategy", ... config_id="v1-abc12345", ... config_hash="a" * 64, ... config_data={"timeframe": "1h"}, ... version_number=1, ... ) activated = version.with_status(ConfigVersionStatus.ACTIVE) activated.deployed_at # Set automatically

Source code in libs/tradai-common/src/tradai/common/entities/config_version.py
class ConfigVersion(DynamoDBSerializableMixin):
    """Immutable config version entity with DynamoDB serialization.

    Content-addressable versioning using SHA256 hash for deduplication.
    Follows existing patterns:
    - frozen=True for immutability (like BacktestJobStatus)
    - with_status() for state transitions
    - DynamoDBSerializableMixin provides to_dynamodb_item()/from_dynamodb_item()
    - ClassVar for constants (not dict)

    DynamoDB Schema:
        PK: strategy_name
        SK: config_id
        GSIs:
            - status-index (PK: strategy_name, SK: status) for get_active
            - config_hash-index (PK: config_hash) for deduplication

    Example:
        >>> version = ConfigVersion(
        ...     strategy_name="PascalStrategy",
        ...     config_id="v1-abc12345",
        ...     config_hash="a" * 64,
        ...     config_data={"timeframe": "1h"},
        ...     version_number=1,
        ... )
        >>> activated = version.with_status(ConfigVersionStatus.ACTIVE)
        >>> activated.deployed_at  # Set automatically
    """

    model_config = ConfigDict(frozen=True, extra="forbid")

    # Keys
    strategy_name: str = Field(..., description="Partition key")
    config_id: str = Field(..., description="Sort key: v{version}-{hash[:8]}")

    # Content-addressable
    config_hash: str = Field(..., description="SHA256 of normalized config content")
    config_data: dict[str, Any] = Field(default_factory=dict, description="Frozen config content")

    # Lifecycle
    status: ConfigVersionStatus = Field(default=ConfigVersionStatus.DRAFT)
    version_number: int = Field(ge=1, description="Sequential version per strategy")

    # Metadata
    description: str = Field(default="", description="Version description")
    created_by: str = Field(default="system", description="User/system that created version")
    created_at: datetime = Field(default_factory=lambda: datetime.now(UTC))
    deployed_at: datetime | None = Field(default=None, description="When activated")
    deprecated_at: datetime | None = Field(default=None, description="When deprecated")
    superseded_by: str | None = Field(default=None, description="config_id of newer version")
    ttl: int | None = Field(default=None, description="Unix epoch for DynamoDB auto-cleanup")

    # Constants (ClassVar, not PrivateAttr)
    _TTL_DAYS: ClassVar[int] = 90

    @field_validator("config_hash")
    @classmethod
    def validate_hash(cls, v: str) -> str:
        """Validate SHA256 format and normalize to lowercase.

        Args:
            v: Hash string to validate

        Returns:
            Lowercase normalized hash

        Raises:
            ValueError: If hash is not valid SHA256 (64 hex chars)
        """
        if len(v) != 64:
            raise ValueError(f"Invalid SHA256 hash: length {len(v)}, expected 64")
        if not all(c in "0123456789abcdefABCDEF" for c in v):
            raise ValueError(f"Invalid SHA256 hash: non-hex characters in {v}")
        return v.lower()

    def with_status(self, status: ConfigVersionStatus) -> ConfigVersion:
        """Return new instance with updated status (immutable update pattern).

        Automatically sets:
        - deployed_at when transitioning to ACTIVE
        - deprecated_at and ttl when transitioning to DEPRECATED

        Args:
            status: New status

        Returns:
            New ConfigVersion with updated status
        """
        updates: dict[str, Any] = {"status": status}

        if status == ConfigVersionStatus.ACTIVE:
            updates["deployed_at"] = datetime.now(UTC)
        elif status == ConfigVersionStatus.DEPRECATED:
            updates["deprecated_at"] = datetime.now(UTC)
            # Set TTL for auto-cleanup
            updates["ttl"] = int(datetime.now(UTC).timestamp() + (self._TTL_DAYS * 86400))

        return ConfigVersion(**{**self.model_dump(), **updates})

validate_hash(v: str) -> str classmethod

Validate SHA256 format and normalize to lowercase.

Parameters:

Name Type Description Default
v str

Hash string to validate

required

Returns:

Type Description
str

Lowercase normalized hash

Raises:

Type Description
ValueError

If hash is not valid SHA256 (64 hex chars)

Source code in libs/tradai-common/src/tradai/common/entities/config_version.py
@field_validator("config_hash")
@classmethod
def validate_hash(cls, v: str) -> str:
    """Validate SHA256 format and normalize to lowercase.

    Args:
        v: Hash string to validate

    Returns:
        Lowercase normalized hash

    Raises:
        ValueError: If hash is not valid SHA256 (64 hex chars)
    """
    if len(v) != 64:
        raise ValueError(f"Invalid SHA256 hash: length {len(v)}, expected 64")
    if not all(c in "0123456789abcdefABCDEF" for c in v):
        raise ValueError(f"Invalid SHA256 hash: non-hex characters in {v}")
    return v.lower()

with_status(status: ConfigVersionStatus) -> ConfigVersion

Return new instance with updated status (immutable update pattern).

Automatically sets: - deployed_at when transitioning to ACTIVE - deprecated_at and ttl when transitioning to DEPRECATED

Parameters:

Name Type Description Default
status ConfigVersionStatus

New status

required

Returns:

Type Description
ConfigVersion

New ConfigVersion with updated status

Source code in libs/tradai-common/src/tradai/common/entities/config_version.py
def with_status(self, status: ConfigVersionStatus) -> ConfigVersion:
    """Return new instance with updated status (immutable update pattern).

    Automatically sets:
    - deployed_at when transitioning to ACTIVE
    - deprecated_at and ttl when transitioning to DEPRECATED

    Args:
        status: New status

    Returns:
        New ConfigVersion with updated status
    """
    updates: dict[str, Any] = {"status": status}

    if status == ConfigVersionStatus.ACTIVE:
        updates["deployed_at"] = datetime.now(UTC)
    elif status == ConfigVersionStatus.DEPRECATED:
        updates["deprecated_at"] = datetime.now(UTC)
        # Set TTL for auto-cleanup
        updates["ttl"] = int(datetime.now(UTC).timestamp() + (self._TTL_DAYS * 86400))

    return ConfigVersion(**{**self.model_dump(), **updates})

tradai.common.entities.config_version.ConfigVersionStatus

Bases: str, Enum

Config version lifecycle status.

Lifecycle: DRAFT → ACTIVE → DEPRECATED

  • DRAFT: Not yet validated/deployed
  • ACTIVE: Currently deployed (only one per strategy)
  • DEPRECATED: Superseded by newer version (auto-cleanup via TTL)
Source code in libs/tradai-common/src/tradai/common/entities/config_version.py
class ConfigVersionStatus(str, Enum):
    """Config version lifecycle status.

    Lifecycle: DRAFT → ACTIVE → DEPRECATED

    - DRAFT: Not yet validated/deployed
    - ACTIVE: Currently deployed (only one per strategy)
    - DEPRECATED: Superseded by newer version (auto-cleanup via TTL)
    """

    DRAFT = "draft"
    ACTIVE = "active"
    DEPRECATED = "deprecated"

    @property
    def is_terminal(self) -> bool:
        """DEPRECATED is terminal (no further transitions)."""
        return self == ConfigVersionStatus.DEPRECATED

    @property
    def is_active(self) -> bool:
        """Only ACTIVE versions are deployable."""
        return self == ConfigVersionStatus.ACTIVE

is_terminal: bool property

DEPRECATED is terminal (no further transitions).

is_active: bool property

Only ACTIVE versions are deployable.

Exchange

tradai.common.entities.exchange.ExchangeConfig

Bases: BaseModel

Configuration for a CCXT exchange connection.

Supports both CEX (API key/secret) and DEX (private key) authentication. Uses TradingMode enum for market type. Provides ccxt_types property for CCXT market filtering.

Example

CEX (Binance)

config = ExchangeConfig(name="binance", trading_mode=TradingMode.FUTURES) config.key 'binance_futures'

DEX (Hyperliquid)

config = ExchangeConfig( ... name="hyperliquid", ... auth_type=AuthType.PRIVATE_KEY, ... private_key=SecretStr("0x..."), ... )

From Secrets Manager

config = ExchangeConfig.from_secret("tradai/prod/exchange/binance")

Source code in libs/tradai-common/src/tradai/common/entities/exchange.py
class ExchangeConfig(BaseModel):
    """Configuration for a CCXT exchange connection.

    Supports both CEX (API key/secret) and DEX (private key) authentication.
    Uses TradingMode enum for market type.
    Provides ccxt_types property for CCXT market filtering.

    Example:
        >>> # CEX (Binance)
        >>> config = ExchangeConfig(name="binance", trading_mode=TradingMode.FUTURES)
        >>> config.key
        'binance_futures'

        >>> # DEX (Hyperliquid)
        >>> config = ExchangeConfig(
        ...     name="hyperliquid",
        ...     auth_type=AuthType.PRIVATE_KEY,
        ...     private_key=SecretStr("0x..."),
        ... )

        >>> # From Secrets Manager
        >>> config = ExchangeConfig.from_secret("tradai/prod/exchange/binance")
    """

    model_config = {"frozen": True}

    name: str = Field(..., description="CCXT exchange name (e.g., 'binance', 'hyperliquid')")
    trading_mode: TradingMode = Field(default=TradingMode.FUTURES)
    auth_type: AuthType = Field(default=AuthType.API_KEY)

    # CEX fields (API key/secret pattern)
    api_key: SecretStr = Field(default=SecretStr(""), description="Exchange API key (CEX)")
    api_secret: SecretStr = Field(default=SecretStr(""), description="Exchange API secret (CEX)")
    passphrase: SecretStr = Field(
        default=SecretStr(""), description="Exchange passphrase (Coinbase, Kucoin)"
    )

    # DEX fields (private key pattern)
    private_key: SecretStr = Field(default=SecretStr(""), description="Private key (DEX)")
    wallet_address: str = Field(default="", description="Wallet address (DEX, optional)")

    @classmethod
    def from_secret(
        cls,
        secret_name: str,
        name: str | None = None,
        trading_mode: TradingMode = TradingMode.FUTURES,
        client: Any = None,
    ) -> ExchangeConfig:
        """Load credentials from AWS Secrets Manager.

        Expected secret format for CEX:
        ```json
        {
            "auth_type": "api_key",
            "api_key": "xxx",
            "api_secret": "yyy",
            "passphrase": "zzz"
        }
        ```

        Expected secret format for DEX:
        ```json
        {
            "auth_type": "private_key",
            "private_key": "0x...",
            "wallet_address": "0x..."
        }
        ```

        Args:
            secret_name: Name or ARN of the secret in Secrets Manager
            name: Exchange name (optional, can be in secret as "exchange_name")
            trading_mode: Trading mode (default: FUTURES)
            client: Optional Secrets Manager client for testing

        Returns:
            ExchangeConfig instance

        Raises:
            ExternalServiceError: If secret retrieval fails
            ValidationError: If secret format is invalid
        """
        from tradai.common.aws.secrets_manager import get_secret

        secret = get_secret(secret_name, client=client)
        auth_type = AuthType(secret.get("auth_type", AuthType.API_KEY.value))
        exchange_name = name or secret.get("exchange_name", "")

        if not exchange_name:
            raise ValueError("Exchange name must be provided or included in secret")

        try:
            if auth_type == AuthType.PRIVATE_KEY:
                return cls(
                    name=exchange_name,
                    trading_mode=trading_mode,
                    auth_type=auth_type,
                    private_key=SecretStr(secret["private_key"]),
                    wallet_address=secret.get("wallet_address", ""),
                )

            return cls(
                name=exchange_name,
                trading_mode=trading_mode,
                auth_type=auth_type,
                api_key=SecretStr(secret["api_key"]),
                api_secret=SecretStr(secret["api_secret"]),
                passphrase=SecretStr(secret.get("passphrase", "")),
            )
        except KeyError as e:
            raise ValidationError(f"Missing required secret field: {e.args[0]}") from e

    @property
    def key(self) -> str:
        """Return dict key for this exchange (e.g., 'binance_futures')."""
        return f"{self.name}_{self.trading_mode.value}"

    @property
    def ccxt_types(self) -> frozenset[str]:
        """Get CCXT market types for filtering."""
        return self.trading_mode.ccxt_types

    @property
    def is_authenticated(self) -> bool:
        """Check if credentials are present for authentication.

        Returns:
            True if required credentials are set.
        """
        if self.auth_type == AuthType.PRIVATE_KEY:
            return bool(self.private_key.get_secret_value())
        return bool(self.api_key.get_secret_value() and self.api_secret.get_secret_value())

    def to_ccxt_config(self) -> dict[str, str]:
        """Convert credentials to CCXT configuration format.

        Returns:
            Dictionary suitable for CCXT exchange initialization.

        Example:
            >>> config = ExchangeConfig(name="binance", api_key=SecretStr("key"), ...)
            >>> ccxt_config = config.to_ccxt_config()
            >>> exchange = ccxt.binance(ccxt_config)
        """
        result: dict[str, str] = {}

        if self.auth_type == AuthType.API_KEY:
            if self.api_key.get_secret_value():
                result["apiKey"] = self.api_key.get_secret_value()
            if self.api_secret.get_secret_value():
                result["secret"] = self.api_secret.get_secret_value()
            if self.passphrase.get_secret_value():
                result["password"] = self.passphrase.get_secret_value()
        elif self.auth_type == AuthType.PRIVATE_KEY:
            if self.private_key.get_secret_value():
                result["privateKey"] = self.private_key.get_secret_value()
            if self.wallet_address:
                result["walletAddress"] = self.wallet_address

        return result

    def validate_credentials(self, timeout: int = 10) -> None:
        """Validate exchange credentials by making an authenticated API call.

        Uses fetch_balance() as the validation method - it's lightweight and
        requires authentication on all exchanges.

        Args:
            timeout: Request timeout in seconds

        Raises:
            AuthenticationError: If credentials are invalid or missing
            ExternalServiceError: If exchange API call fails for other reasons

        Example:
            >>> config = ExchangeConfig.from_secret("tradai/prod/binance")
            >>> config.validate_credentials()  # Raises if invalid
        """
        if not self.is_authenticated:
            raise AuthenticationError(f"No credentials configured for {self.name}")

        exchange = None
        try:
            import ccxt

            # Create exchange client
            exchange_id = self.name.lower()
            if not hasattr(ccxt, exchange_id):
                raise ExternalServiceError(f"Unknown exchange: {exchange_id}")

            exchange_class = getattr(ccxt, exchange_id)
            # Build config with proper types (to_ccxt_config returns dict[str, str])
            ccxt_config: dict[str, Any] = {
                **self.to_ccxt_config(),
                "enableRateLimit": True,
                "timeout": timeout * 1000,  # ccxt uses milliseconds
            }

            exchange = exchange_class(ccxt_config)

            # Validate by calling an authenticated endpoint
            exchange.fetch_balance()

        except ccxt.AuthenticationError as e:
            raise AuthenticationError(f"Invalid credentials for {self.name}: {e}") from e
        except ccxt.ExchangeError as e:
            raise ExternalServiceError(
                f"Exchange error validating credentials for {self.name}: {e}"
            ) from e
        except ImportError as e:
            from tradai.common._optional import MissingDependencyError

            raise MissingDependencyError(
                "ccxt", "ccxt", "ExchangeConfig.validate_credentials"
            ) from e
        finally:
            # Clean up exchange client to prevent resource leak
            if exchange is not None and hasattr(exchange, "close"):
                try:
                    exchange.close()
                except Exception as e:
                    _logger.debug("Exchange cleanup failed: %s", e)

    @field_validator("name")
    @classmethod
    def validate_name(cls, v: str) -> str:
        """Normalize exchange name to lowercase."""
        if not v:
            raise ValueError("Exchange name cannot be empty")
        return v.lower()

    @field_validator("trading_mode", mode="before")
    @classmethod
    def parse_trading_mode(cls, v: str | TradingMode) -> TradingMode:
        """Parse trading mode from string or TradingMode enum."""
        if isinstance(v, TradingMode):
            return v
        if isinstance(v, str):
            return TradingMode.from_string(v)
        raise ValueError(f"trading_mode must be str or TradingMode, got {type(v)}")

key: str property

Return dict key for this exchange (e.g., 'binance_futures').

ccxt_types: frozenset[str] property

Get CCXT market types for filtering.

is_authenticated: bool property

Check if credentials are present for authentication.

Returns:

Type Description
bool

True if required credentials are set.

from_secret(secret_name: str, name: str | None = None, trading_mode: TradingMode = TradingMode.FUTURES, client: Any = None) -> ExchangeConfig classmethod

Load credentials from AWS Secrets Manager.

Expected secret format for CEX:

{
    "auth_type": "api_key",
    "api_key": "xxx",
    "api_secret": "yyy",
    "passphrase": "zzz"
}

Expected secret format for DEX:

{
    "auth_type": "private_key",
    "private_key": "0x...",
    "wallet_address": "0x..."
}

Parameters:

Name Type Description Default
secret_name str

Name or ARN of the secret in Secrets Manager

required
name str | None

Exchange name (optional, can be in secret as "exchange_name")

None
trading_mode TradingMode

Trading mode (default: FUTURES)

FUTURES
client Any

Optional Secrets Manager client for testing

None

Returns:

Type Description
ExchangeConfig

ExchangeConfig instance

Raises:

Type Description
ExternalServiceError

If secret retrieval fails

ValidationError

If secret format is invalid

Source code in libs/tradai-common/src/tradai/common/entities/exchange.py
@classmethod
def from_secret(
    cls,
    secret_name: str,
    name: str | None = None,
    trading_mode: TradingMode = TradingMode.FUTURES,
    client: Any = None,
) -> ExchangeConfig:
    """Load credentials from AWS Secrets Manager.

    Expected secret format for CEX:
    ```json
    {
        "auth_type": "api_key",
        "api_key": "xxx",
        "api_secret": "yyy",
        "passphrase": "zzz"
    }
    ```

    Expected secret format for DEX:
    ```json
    {
        "auth_type": "private_key",
        "private_key": "0x...",
        "wallet_address": "0x..."
    }
    ```

    Args:
        secret_name: Name or ARN of the secret in Secrets Manager
        name: Exchange name (optional, can be in secret as "exchange_name")
        trading_mode: Trading mode (default: FUTURES)
        client: Optional Secrets Manager client for testing

    Returns:
        ExchangeConfig instance

    Raises:
        ExternalServiceError: If secret retrieval fails
        ValidationError: If secret format is invalid
    """
    from tradai.common.aws.secrets_manager import get_secret

    secret = get_secret(secret_name, client=client)
    auth_type = AuthType(secret.get("auth_type", AuthType.API_KEY.value))
    exchange_name = name or secret.get("exchange_name", "")

    if not exchange_name:
        raise ValueError("Exchange name must be provided or included in secret")

    try:
        if auth_type == AuthType.PRIVATE_KEY:
            return cls(
                name=exchange_name,
                trading_mode=trading_mode,
                auth_type=auth_type,
                private_key=SecretStr(secret["private_key"]),
                wallet_address=secret.get("wallet_address", ""),
            )

        return cls(
            name=exchange_name,
            trading_mode=trading_mode,
            auth_type=auth_type,
            api_key=SecretStr(secret["api_key"]),
            api_secret=SecretStr(secret["api_secret"]),
            passphrase=SecretStr(secret.get("passphrase", "")),
        )
    except KeyError as e:
        raise ValidationError(f"Missing required secret field: {e.args[0]}") from e

to_ccxt_config() -> dict[str, str]

Convert credentials to CCXT configuration format.

Returns:

Type Description
dict[str, str]

Dictionary suitable for CCXT exchange initialization.

Example

config = ExchangeConfig(name="binance", api_key=SecretStr("key"), ...) ccxt_config = config.to_ccxt_config() exchange = ccxt.binance(ccxt_config)

Source code in libs/tradai-common/src/tradai/common/entities/exchange.py
def to_ccxt_config(self) -> dict[str, str]:
    """Convert credentials to CCXT configuration format.

    Returns:
        Dictionary suitable for CCXT exchange initialization.

    Example:
        >>> config = ExchangeConfig(name="binance", api_key=SecretStr("key"), ...)
        >>> ccxt_config = config.to_ccxt_config()
        >>> exchange = ccxt.binance(ccxt_config)
    """
    result: dict[str, str] = {}

    if self.auth_type == AuthType.API_KEY:
        if self.api_key.get_secret_value():
            result["apiKey"] = self.api_key.get_secret_value()
        if self.api_secret.get_secret_value():
            result["secret"] = self.api_secret.get_secret_value()
        if self.passphrase.get_secret_value():
            result["password"] = self.passphrase.get_secret_value()
    elif self.auth_type == AuthType.PRIVATE_KEY:
        if self.private_key.get_secret_value():
            result["privateKey"] = self.private_key.get_secret_value()
        if self.wallet_address:
            result["walletAddress"] = self.wallet_address

    return result

validate_credentials(timeout: int = 10) -> None

Validate exchange credentials by making an authenticated API call.

Uses fetch_balance() as the validation method - it's lightweight and requires authentication on all exchanges.

Parameters:

Name Type Description Default
timeout int

Request timeout in seconds

10

Raises:

Type Description
AuthenticationError

If credentials are invalid or missing

ExternalServiceError

If exchange API call fails for other reasons

Example

config = ExchangeConfig.from_secret("tradai/prod/binance") config.validate_credentials() # Raises if invalid

Source code in libs/tradai-common/src/tradai/common/entities/exchange.py
def validate_credentials(self, timeout: int = 10) -> None:
    """Validate exchange credentials by making an authenticated API call.

    Uses fetch_balance() as the validation method - it's lightweight and
    requires authentication on all exchanges.

    Args:
        timeout: Request timeout in seconds

    Raises:
        AuthenticationError: If credentials are invalid or missing
        ExternalServiceError: If exchange API call fails for other reasons

    Example:
        >>> config = ExchangeConfig.from_secret("tradai/prod/binance")
        >>> config.validate_credentials()  # Raises if invalid
    """
    if not self.is_authenticated:
        raise AuthenticationError(f"No credentials configured for {self.name}")

    exchange = None
    try:
        import ccxt

        # Create exchange client
        exchange_id = self.name.lower()
        if not hasattr(ccxt, exchange_id):
            raise ExternalServiceError(f"Unknown exchange: {exchange_id}")

        exchange_class = getattr(ccxt, exchange_id)
        # Build config with proper types (to_ccxt_config returns dict[str, str])
        ccxt_config: dict[str, Any] = {
            **self.to_ccxt_config(),
            "enableRateLimit": True,
            "timeout": timeout * 1000,  # ccxt uses milliseconds
        }

        exchange = exchange_class(ccxt_config)

        # Validate by calling an authenticated endpoint
        exchange.fetch_balance()

    except ccxt.AuthenticationError as e:
        raise AuthenticationError(f"Invalid credentials for {self.name}: {e}") from e
    except ccxt.ExchangeError as e:
        raise ExternalServiceError(
            f"Exchange error validating credentials for {self.name}: {e}"
        ) from e
    except ImportError as e:
        from tradai.common._optional import MissingDependencyError

        raise MissingDependencyError(
            "ccxt", "ccxt", "ExchangeConfig.validate_credentials"
        ) from e
    finally:
        # Clean up exchange client to prevent resource leak
        if exchange is not None and hasattr(exchange, "close"):
            try:
                exchange.close()
            except Exception as e:
                _logger.debug("Exchange cleanup failed: %s", e)

validate_name(v: str) -> str classmethod

Normalize exchange name to lowercase.

Source code in libs/tradai-common/src/tradai/common/entities/exchange.py
@field_validator("name")
@classmethod
def validate_name(cls, v: str) -> str:
    """Normalize exchange name to lowercase."""
    if not v:
        raise ValueError("Exchange name cannot be empty")
    return v.lower()

parse_trading_mode(v: str | TradingMode) -> TradingMode classmethod

Parse trading mode from string or TradingMode enum.

Source code in libs/tradai-common/src/tradai/common/entities/exchange.py
@field_validator("trading_mode", mode="before")
@classmethod
def parse_trading_mode(cls, v: str | TradingMode) -> TradingMode:
    """Parse trading mode from string or TradingMode enum."""
    if isinstance(v, TradingMode):
        return v
    if isinstance(v, str):
        return TradingMode.from_string(v)
    raise ValueError(f"trading_mode must be str or TradingMode, got {type(v)}")

tradai.common.entities.exchange.TradingMode

Bases: str, Enum

Trading mode enum compatible with Freqtrade's TradingMode.

Defines market types for exchange connections: - SPOT: Spot trading markets - MARGIN: Margin trading markets - FUTURES: Futures markets (perpetual swaps and delivery)

Maps to CCXT market types via ccxt_types property.

Example

mode = TradingMode.FUTURES mode.value 'futures' mode.ccxt_types frozenset({'swap', 'future'})

Source code in libs/tradai-common/src/tradai/common/entities/exchange.py
class TradingMode(str, Enum):
    """Trading mode enum compatible with Freqtrade's TradingMode.

    Defines market types for exchange connections:
    - SPOT: Spot trading markets
    - MARGIN: Margin trading markets
    - FUTURES: Futures markets (perpetual swaps and delivery)

    Maps to CCXT market types via ccxt_types property.

    Example:
        >>> mode = TradingMode.FUTURES
        >>> mode.value
        'futures'
        >>> mode.ccxt_types
        frozenset({'swap', 'future'})
    """

    SPOT = "spot"
    MARGIN = "margin"
    FUTURES = "futures"

    @property
    def ccxt_types(self) -> frozenset[str]:
        """Get CCXT market types for filtering.

        Returns:
            Set of CCXT type strings for this trading mode
        """
        ccxt_map: dict[str, frozenset[str]] = {
            "spot": frozenset({"spot"}),
            "margin": frozenset({"margin"}),
            "futures": frozenset({"swap", "future"}),  # swap=perpetual, future=delivery
        }
        return ccxt_map[self.value]

    @classmethod
    def from_string(cls, value: str) -> TradingMode:
        """Parse trading mode from string (case-insensitive).

        Args:
            value: Trading mode string ('spot', 'margin', 'futures')

        Returns:
            TradingMode enum value

        Raises:
            ValueError: If value is not a valid trading mode
        """
        normalized = value.lower().strip()
        for member in cls:
            if member.value == normalized:
                return member
        valid = [m.value for m in cls]
        raise ValueError(f"Invalid trading mode '{value}'. Valid: {valid}")

ccxt_types: frozenset[str] property

Get CCXT market types for filtering.

Returns:

Type Description
frozenset[str]

Set of CCXT type strings for this trading mode

from_string(value: str) -> TradingMode classmethod

Parse trading mode from string (case-insensitive).

Parameters:

Name Type Description Default
value str

Trading mode string ('spot', 'margin', 'futures')

required

Returns:

Type Description
TradingMode

TradingMode enum value

Raises:

Type Description
ValueError

If value is not a valid trading mode

Source code in libs/tradai-common/src/tradai/common/entities/exchange.py
@classmethod
def from_string(cls, value: str) -> TradingMode:
    """Parse trading mode from string (case-insensitive).

    Args:
        value: Trading mode string ('spot', 'margin', 'futures')

    Returns:
        TradingMode enum value

    Raises:
        ValueError: If value is not a valid trading mode
    """
    normalized = value.lower().strip()
    for member in cls:
        if member.value == normalized:
            return member
    valid = [m.value for m in cls]
    raise ValueError(f"Invalid trading mode '{value}'. Valid: {valid}")

MLflow

tradai.common.entities.mlflow.ModelStage

Bases: str, Enum

MLflow model lifecycle stages.

Defines the stages a model version can be in within the MLflow Model Registry.

Stages

NONE: Not yet staged (default for new registrations) STAGING: Ready for validation/testing PRODUCTION: Live production model ARCHIVED: Previous production version (for rollback)

Source code in libs/tradai-common/src/tradai/common/entities/mlflow.py
class ModelStage(str, Enum):
    """MLflow model lifecycle stages.

    Defines the stages a model version can be in within the
    MLflow Model Registry.

    Stages:
        NONE: Not yet staged (default for new registrations)
        STAGING: Ready for validation/testing
        PRODUCTION: Live production model
        ARCHIVED: Previous production version (for rollback)
    """

    NONE = "None"
    STAGING = "Staging"
    PRODUCTION = "Production"
    ARCHIVED = "Archived"

tradai.common.entities.mlflow.ModelVersion

Bases: BaseModel

MLflow model version information.

Represents a version of a registered model in the MLflow model registry. Used for tracking strategy versions and deployment status.

Attributes:

Name Type Description
name str

Name of the registered model

version str

Version number (string)

source str | None

URI of the model artifacts

run_id str | None

Optional run ID that produced this version

status str

Registration status (READY, PENDING_REGISTRATION, etc.)

current_stage str | None

Current stage (None, Staging, Production, Archived)

tags dict[str, str]

Version-specific tags

Example

version = ModelVersion( ... name="PascalStrategy", ... version="1", ... source="s3://tradai/models/pascal/1", ... current_stage="Production", ... ) version.current_stage 'Production'

Source code in libs/tradai-common/src/tradai/common/entities/mlflow.py
class ModelVersion(BaseModel):
    """MLflow model version information.

    Represents a version of a registered model in the MLflow model registry.
    Used for tracking strategy versions and deployment status.

    Attributes:
        name: Name of the registered model
        version: Version number (string)
        source: URI of the model artifacts
        run_id: Optional run ID that produced this version
        status: Registration status (READY, PENDING_REGISTRATION, etc.)
        current_stage: Current stage (None, Staging, Production, Archived)
        tags: Version-specific tags

    Example:
        >>> version = ModelVersion(
        ...     name="PascalStrategy",
        ...     version="1",
        ...     source="s3://tradai/models/pascal/1",
        ...     current_stage="Production",
        ... )
        >>> version.current_stage
        'Production'
    """

    model_config = ConfigDict(frozen=True)

    name: str = Field(..., description="Name of the registered model")
    version: str = Field(..., description="Version number")
    source: str | None = Field(default=None, description="URI of the model artifacts")
    run_id: str | None = Field(default=None, description="Run ID that produced this version")
    status: str = Field(default="READY", description="Registration status")
    current_stage: str | None = Field(
        default=None, description="Current stage (None, Staging, Production, Archived)"
    )
    tags: dict[str, str] = Field(default_factory=dict, description="Version-specific tags")

    @classmethod
    def from_api_response(cls, response: dict[str, Any]) -> ModelVersion:
        """Create ModelVersion from MLflow API response.

        Args:
            response: Raw API response dict from MLflow

        Returns:
            ModelVersion instance

        Example:
            >>> resp = {"name": "Strategy", "version": "1", "source": "s3://...", "current_stage": "Production"}
            >>> ModelVersion.from_api_response(resp)
            ModelVersion(name='Strategy', version='1', current_stage='Production', ...)
        """
        # Parse tags from list format [{key: k, value: v}, ...] to dict
        tags_list = response.get("tags", [])
        tags_dict: dict[str, str] = {}
        if isinstance(tags_list, list):
            for tag in tags_list:
                if isinstance(tag, dict) and "key" in tag and "value" in tag:
                    tags_dict[tag["key"]] = tag["value"]
        elif isinstance(tags_list, dict):
            tags_dict = tags_list

        return cls(
            name=response["name"],
            version=response["version"],
            source=response["source"],
            run_id=response.get("run_id"),
            status=response.get("status", "READY"),
            current_stage=response.get("current_stage"),
            tags=tags_dict,
        )

from_api_response(response: dict[str, Any]) -> ModelVersion classmethod

Create ModelVersion from MLflow API response.

Parameters:

Name Type Description Default
response dict[str, Any]

Raw API response dict from MLflow

required

Returns:

Type Description
ModelVersion

ModelVersion instance

Example

resp = {"name": "Strategy", "version": "1", "source": "s3://...", "current_stage": "Production"} ModelVersion.from_api_response(resp) ModelVersion(name='Strategy', version='1', current_stage='Production', ...)

Source code in libs/tradai-common/src/tradai/common/entities/mlflow.py
@classmethod
def from_api_response(cls, response: dict[str, Any]) -> ModelVersion:
    """Create ModelVersion from MLflow API response.

    Args:
        response: Raw API response dict from MLflow

    Returns:
        ModelVersion instance

    Example:
        >>> resp = {"name": "Strategy", "version": "1", "source": "s3://...", "current_stage": "Production"}
        >>> ModelVersion.from_api_response(resp)
        ModelVersion(name='Strategy', version='1', current_stage='Production', ...)
    """
    # Parse tags from list format [{key: k, value: v}, ...] to dict
    tags_list = response.get("tags", [])
    tags_dict: dict[str, str] = {}
    if isinstance(tags_list, list):
        for tag in tags_list:
            if isinstance(tag, dict) and "key" in tag and "value" in tag:
                tags_dict[tag["key"]] = tag["value"]
    elif isinstance(tags_list, dict):
        tags_dict = tags_list

    return cls(
        name=response["name"],
        version=response["version"],
        source=response["source"],
        run_id=response.get("run_id"),
        status=response.get("status", "READY"),
        current_stage=response.get("current_stage"),
        tags=tags_dict,
    )

State Management

tradai.common.entities.aws.JobStatus

Bases: str, Enum

Status of a backtest job in the queue.

Lifecycle: PENDING -> RUNNING -> COMPLETED/FAILED/CANCELLED

Terminal statuses (COMPLETED, FAILED, CANCELLED) indicate the job has finished processing and will not change state again.

Valid transitions (same-state allowed for idempotency): PENDING -> PENDING, RUNNING, FAILED, CANCELLED RUNNING -> RUNNING, COMPLETED, FAILED, CANCELLED COMPLETED -> (terminal — no transitions) FAILED -> (terminal — no transitions) CANCELLED -> (terminal — no transitions)

Source code in libs/tradai-common/src/tradai/common/entities/aws.py
class JobStatus(str, Enum):
    """Status of a backtest job in the queue.

    Lifecycle: PENDING -> RUNNING -> COMPLETED/FAILED/CANCELLED

    Terminal statuses (COMPLETED, FAILED, CANCELLED) indicate the job
    has finished processing and will not change state again.

    Valid transitions (same-state allowed for idempotency):
        PENDING   -> PENDING, RUNNING, FAILED, CANCELLED
        RUNNING   -> RUNNING, COMPLETED, FAILED, CANCELLED
        COMPLETED -> (terminal — no transitions)
        FAILED    -> (terminal — no transitions)
        CANCELLED -> (terminal — no transitions)
    """

    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"

    @property
    def is_terminal(self) -> bool:
        """Check if status is terminal (job finished).

        Returns:
            True if job is in a final state (completed, failed, cancelled)
        """
        return self in (JobStatus.COMPLETED, JobStatus.FAILED, JobStatus.CANCELLED)

    def can_transition_to(self, target: JobStatus) -> bool:
        """Check if transitioning to target status is valid.

        Same-state transitions are allowed (idempotent). Terminal states
        cannot transition to any other state.

        Args:
            target: Target status to transition to

        Returns:
            True if the transition is valid
        """
        if self == target:
            return True
        if self.is_terminal:
            return False
        valid = _VALID_TRANSITIONS.get(self, set())
        return target in valid

is_terminal: bool property

Check if status is terminal (job finished).

Returns:

Type Description
bool

True if job is in a final state (completed, failed, cancelled)

can_transition_to(target: JobStatus) -> bool

Check if transitioning to target status is valid.

Same-state transitions are allowed (idempotent). Terminal states cannot transition to any other state.

Parameters:

Name Type Description Default
target JobStatus

Target status to transition to

required

Returns:

Type Description
bool

True if the transition is valid

Source code in libs/tradai-common/src/tradai/common/entities/aws.py
def can_transition_to(self, target: JobStatus) -> bool:
    """Check if transitioning to target status is valid.

    Same-state transitions are allowed (idempotent). Terminal states
    cannot transition to any other state.

    Args:
        target: Target status to transition to

    Returns:
        True if the transition is valid
    """
    if self == target:
        return True
    if self.is_terminal:
        return False
    valid = _VALID_TRANSITIONS.get(self, set())
    return target in valid

tradai.common.entities.trading_state.TradingState

Bases: DynamoDBSerializableMixin

Live trading container state for health monitoring.

Stored in DynamoDB with strategy_id as partition key. Tracks container lifecycle, NOT individual trades (Freqtrade handles that).

Follows same patterns as BacktestJobStatus: - frozen=True for immutability - with_status() for status updates - DynamoDBSerializableMixin provides to_dynamodb_item()/from_dynamodb_item() - UTC timestamps

Attributes:

Name Type Description
strategy_id str

Strategy identifier (PK)

instance_id str

Unique container instance ID (ECS task ID, etc.)

status TradingStatus

Current container status

config_snapshot dict[str, Any]

Frozen config at startup (for audit/debugging)

last_heartbeat datetime

Most recent heartbeat timestamp

started_at datetime

When container started

stopped_at datetime | None

When container stopped (if terminal)

error str | None

Error message if in ERROR status

pnl_snapshot dict[str, Any]

Latest PnL metrics as JSON-safe dict (H1)

open_trades_snapshot dict[str, Any]

Latest open trades as {"trades": [...]} (H1)

Source code in libs/tradai-common/src/tradai/common/entities/trading_state.py
class TradingState(DynamoDBSerializableMixin):
    """Live trading container state for health monitoring.

    Stored in DynamoDB with strategy_id as partition key.
    Tracks container lifecycle, NOT individual trades (Freqtrade handles that).

    Follows same patterns as BacktestJobStatus:
    - frozen=True for immutability
    - with_status() for status updates
    - DynamoDBSerializableMixin provides to_dynamodb_item()/from_dynamodb_item()
    - UTC timestamps

    Attributes:
        strategy_id: Strategy identifier (PK)
        instance_id: Unique container instance ID (ECS task ID, etc.)
        status: Current container status
        config_snapshot: Frozen config at startup (for audit/debugging)
        last_heartbeat: Most recent heartbeat timestamp
        started_at: When container started
        stopped_at: When container stopped (if terminal)
        error: Error message if in ERROR status
        pnl_snapshot: Latest PnL metrics as JSON-safe dict (H1)
        open_trades_snapshot: Latest open trades as {"trades": [...]} (H1)
    """

    model_config = ConfigDict(frozen=True, extra="forbid")

    strategy_id: str = Field(..., min_length=1)
    instance_id: str = Field(..., min_length=1)
    status: TradingStatus = Field(default=TradingStatus.INITIALIZING)
    config_snapshot: dict[str, Any] = Field(default_factory=dict)
    last_heartbeat: datetime = Field(default_factory=lambda: datetime.now(UTC))
    started_at: datetime = Field(default_factory=lambda: datetime.now(UTC))
    stopped_at: datetime | None = Field(default=None)
    error: str | None = Field(default=None, max_length=1000)
    pnl_snapshot: dict[str, Any] = Field(default_factory=dict)
    open_trades_snapshot: dict[str, Any] = Field(default_factory=dict)

    @field_validator("config_snapshot")
    @classmethod
    def validate_json_serializable(cls, v: dict[str, Any]) -> dict[str, Any]:
        """Ensure config_snapshot is JSON serializable.

        This prevents runtime errors when saving to DynamoDB.

        Raises:
            ValueError: If config contains non-JSON-serializable objects
        """
        try:
            json.dumps(v)
        except TypeError as e:
            raise ValueError(f"config_snapshot must be JSON-serializable: {e}") from e
        return v

    def with_status(
        self,
        status: TradingStatus,
        error: str | None = None,
    ) -> TradingState:
        """Return new instance with updated status (same pattern as BacktestJobStatus).

        Args:
            status: New container status
            error: Optional error message (typically for ERROR status)

        Returns:
            New TradingState instance with updated fields

        Note:
            - Terminal statuses (ERROR, STOPPED) set stopped_at to now
            - INITIALIZING clears stopped_at (for container restart scenarios)
            - Other statuses preserve the existing stopped_at value
        """
        # Determine stopped_at based on status transition
        if status.is_terminal:
            stopped_at = datetime.now(UTC)
        elif status == TradingStatus.INITIALIZING:
            stopped_at = None  # Clear when re-initializing
        else:
            stopped_at = self.stopped_at

        return self.model_copy(
            update={
                "status": status,
                "last_heartbeat": datetime.now(UTC),
                "stopped_at": stopped_at,
                "error": error,
            }
        )

    def with_heartbeat(self) -> TradingState:
        """Return new instance with updated heartbeat timestamp.

        Returns:
            New TradingState instance with updated last_heartbeat
        """
        return self.model_copy(update={"last_heartbeat": datetime.now(UTC)})

validate_json_serializable(v: dict[str, Any]) -> dict[str, Any] classmethod

Ensure config_snapshot is JSON serializable.

This prevents runtime errors when saving to DynamoDB.

Raises:

Type Description
ValueError

If config contains non-JSON-serializable objects

Source code in libs/tradai-common/src/tradai/common/entities/trading_state.py
@field_validator("config_snapshot")
@classmethod
def validate_json_serializable(cls, v: dict[str, Any]) -> dict[str, Any]:
    """Ensure config_snapshot is JSON serializable.

    This prevents runtime errors when saving to DynamoDB.

    Raises:
        ValueError: If config contains non-JSON-serializable objects
    """
    try:
        json.dumps(v)
    except TypeError as e:
        raise ValueError(f"config_snapshot must be JSON-serializable: {e}") from e
    return v

with_status(status: TradingStatus, error: str | None = None) -> TradingState

Return new instance with updated status (same pattern as BacktestJobStatus).

Parameters:

Name Type Description Default
status TradingStatus

New container status

required
error str | None

Optional error message (typically for ERROR status)

None

Returns:

Type Description
TradingState

New TradingState instance with updated fields

Note
  • Terminal statuses (ERROR, STOPPED) set stopped_at to now
  • INITIALIZING clears stopped_at (for container restart scenarios)
  • Other statuses preserve the existing stopped_at value
Source code in libs/tradai-common/src/tradai/common/entities/trading_state.py
def with_status(
    self,
    status: TradingStatus,
    error: str | None = None,
) -> TradingState:
    """Return new instance with updated status (same pattern as BacktestJobStatus).

    Args:
        status: New container status
        error: Optional error message (typically for ERROR status)

    Returns:
        New TradingState instance with updated fields

    Note:
        - Terminal statuses (ERROR, STOPPED) set stopped_at to now
        - INITIALIZING clears stopped_at (for container restart scenarios)
        - Other statuses preserve the existing stopped_at value
    """
    # Determine stopped_at based on status transition
    if status.is_terminal:
        stopped_at = datetime.now(UTC)
    elif status == TradingStatus.INITIALIZING:
        stopped_at = None  # Clear when re-initializing
    else:
        stopped_at = self.stopped_at

    return self.model_copy(
        update={
            "status": status,
            "last_heartbeat": datetime.now(UTC),
            "stopped_at": stopped_at,
            "error": error,
        }
    )

with_heartbeat() -> TradingState

Return new instance with updated heartbeat timestamp.

Returns:

Type Description
TradingState

New TradingState instance with updated last_heartbeat

Source code in libs/tradai-common/src/tradai/common/entities/trading_state.py
def with_heartbeat(self) -> TradingState:
    """Return new instance with updated heartbeat timestamp.

    Returns:
        New TradingState instance with updated last_heartbeat
    """
    return self.model_copy(update={"last_heartbeat": datetime.now(UTC)})

tradai.common.entities.trading_state.TradingStatus

Bases: str, Enum

Live trading container status.

Different from JobStatus (batch jobs) - includes states for long-running containers like WARMUP and PAUSED.

Lifecycle: INITIALIZING -> WARMUP -> RUNNING <-> PAUSED -> STOPPED/ERROR

Source code in libs/tradai-common/src/tradai/common/entities/trading_state.py
class TradingStatus(str, Enum):
    """Live trading container status.

    Different from JobStatus (batch jobs) - includes states for
    long-running containers like WARMUP and PAUSED.

    Lifecycle: INITIALIZING -> WARMUP -> RUNNING <-> PAUSED -> STOPPED/ERROR
    """

    INITIALIZING = "initializing"  # Container starting, loading config
    WARMUP = "warmup"  # Loading historical data, model inference warmup
    RUNNING = "running"  # Actively trading
    PAUSED = "paused"  # Temporarily paused (manual intervention)
    ERROR = "error"  # Fatal error, needs investigation
    STOPPED = "stopped"  # Gracefully stopped

    @property
    def is_active(self) -> bool:
        """Check if status represents an active trading state."""
        return self in (TradingStatus.WARMUP, TradingStatus.RUNNING)

    @property
    def is_terminal(self) -> bool:
        """Check if status is a terminal state (same pattern as JobStatus)."""
        return self in (TradingStatus.ERROR, TradingStatus.STOPPED)

is_active: bool property

Check if status represents an active trading state.

is_terminal: bool property

Check if status is a terminal state (same pattern as JobStatus).