TradAI Final Architecture - Services

Version: 9.2.1 | Date: 2025-12-09


Service Overview

| Service | Type | Resources | Port | Purpose |
|---|---|---|---|---|
| Backend API | ECS Fargate (Long-running) | 0.5 vCPU, 1 GB | 8000 | API endpoints, orchestration |
| Data Collection | ECS Fargate (Long-running) | 0.25 vCPU, 512 MB | 8002 | Data freshness, metadata |
| MLflow | ECS Fargate (Long-running) | 0.5 vCPU, 1 GB | 5000 | Experiment tracking, model registry |
| Strategy Task | ECS Fargate (On-demand) | 0.5 vCPU, 1 GB | - | Config preparation |
| Strategy Container (Backtest) | ECS Fargate (On-demand) | 1 vCPU, 2 GB | - | Backtest execution |
| Strategy Container (Live) | ECS Fargate (Long-running) | 0.5 vCPU, 1 GB | 8080 | Live/Dry-run trading (v9.1) |

1. Backend API Service

Task Definition Specification

Family: tradai-backend-api
NetworkMode: awsvpc
Cpu: 512 (0.5 vCPU)
Memory: 1024 (1 GB)

Container: backend-api
  Image: {account}.dkr.ecr.us-east-1.amazonaws.com/tradai-backend:latest
  Port: 8000

  Environment Variables:
    - SERVICE_NAME: backend-api
    - LOG_LEVEL: INFO
    - DATA_COLLECTION_URL: http://data-collection.tradai.local:8002
    - MLFLOW_TRACKING_URI: http://mlflow.tradai.local:5000
    - BACKTEST_QUEUE_URL: (from SQS)
    - WORKFLOW_STATE_TABLE: tradai-workflow-state

  Secrets (from Secrets Manager):
    - MLFLOW_TRACKING_USERNAME
    - MLFLOW_TRACKING_PASSWORD

  Health Check:
    Command: curl -f http://localhost:8000/health || exit 1
    Interval: 30s
    Timeout: 5s
    Retries: 3
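
The health check settings above correspond to the healthCheck block of an ECS container definition. A minimal sketch of that mapping, assuming the task definition is registered programmatically; build_health_check is a hypothetical helper, not part of the TradAI codebase:

```python
def build_health_check(port: int, interval: int = 30, timeout: int = 5, retries: int = 3) -> dict:
    """Build the healthCheck block of an ECS container definition.

    Hypothetical helper for illustration; the defaults mirror the spec above.
    """
    return {
        # CMD-SHELL runs the string through the container's default shell
        "command": ["CMD-SHELL", f"curl -f http://localhost:{port}/health || exit 1"],
        "interval": interval,  # seconds between checks
        "timeout": timeout,    # seconds before a single check counts as failed
        "retries": retries,    # consecutive failures before the container is unhealthy
    }


backend_health_check = build_health_check(8000)
```

The command runs inside the container, so the image has to ship curl for the check to pass.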

Application Code Structure

services/api_gateway/
├── api/
│   ├── __init__.py
│   ├── main.py              # FastAPI app
│   ├── routes/
│   │   ├── strategies.py    # GET /strategies/*
│   │   ├── backtest.py      # POST /backtest, GET /backtest/{id}
│   │   ├── data.py          # GET /data/*
│   │   └── health.py        # GET /health, GET /ready
│   ├── schemas/
│   │   ├── backtest.py      # Pydantic models
│   │   └── strategy.py
│   └── middleware/
│       ├── logging.py       # Request/response logging
│       └── security.py      # Security headers
├── core/
│   ├── config.py            # Settings management
│   ├── services/
│   │   ├── mlflow_client.py # MLflow integration
│   │   ├── data_client.py   # Data Collection client
│   │   ├── sqs_client.py    # SQS integration
│   │   └── dynamodb_client.py
│   └── circuit_breaker.py   # Resilience pattern
└── Dockerfile

Key Endpoint: Trigger Backtest

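# services/api_gateway/api/routes/backtest.py
import uuid

from fastapi import APIRouter, Depends, HTTPException

# The import paths below are assumed from the directory layout shown above.
from api.schemas.backtest import BacktestRequest, BacktestResponse, BacktestStatus
from core.config import settings
from core.services.dynamodb_client import DynamoDBClient
from core.services.sqs_client import SQSClient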

router = APIRouter(prefix="/backtest", tags=["backtest"])

@router.post("", response_model=BacktestResponse)
async def trigger_backtest(
    request: BacktestRequest,
    sqs: SQSClient = Depends(),
    dynamo: DynamoDBClient = Depends()
):
    """
    Trigger a new backtest workflow.
    Returns immediately with run_id for status polling.
    """
    run_id = str(uuid.uuid4())

    # Register in DynamoDB
    await dynamo.create_workflow(
        run_id=run_id,
        workflow_type="backtest",
        status="QUEUED",
        metadata={
            "strategy_name": request.strategy_name,
            "strategy_version": request.strategy_version,
            "experiment_name": request.experiment_name
        }
    )

    # Send to SQS (async processing)
    await sqs.send_message(
        queue_url=settings.BACKTEST_QUEUE_URL,
        # model_dump() is the Pydantic v2 replacement for the deprecated .dict()
        message={"run_id": run_id, **request.model_dump()},
        message_group_id=request.strategy_name,
        deduplication_id=run_id
    )

    return BacktestResponse(
        run_id=run_id,
        status="QUEUED",
        status_url=f"/backtest/{run_id}/status"
    )


@router.get("/{run_id}/status", response_model=BacktestStatus)
async def get_backtest_status(
    run_id: str,
    dynamo: DynamoDBClient = Depends()
):
    """Get backtest status and results."""
    workflow = await dynamo.get_workflow(run_id)

    if not workflow:
        raise HTTPException(status_code=404, detail="Backtest not found")

    response = BacktestStatus(
        run_id=run_id,
        status=workflow["status"],
        created_at=workflow["created_at"],
        updated_at=workflow["updated_at"]
    )

    if workflow["status"] == "COMPLETED":
        response.result = workflow.get("result")
        response.mlflow_url = f"{settings.MLFLOW_URL}/experiments/{run_id}"

    return response
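
Because POST /backtest returns immediately, callers are expected to poll the status endpoint. A minimal client-side sketch, assuming the caller supplies a fetch_status callable that performs the GET and returns the decoded JSON; poll_backtest, its parameters, and the set of terminal statuses are illustrative choices, not part of the API:

```python
import time
from typing import Callable

# Statuses after which polling can stop (assumed to be the terminal ones)
TERMINAL_STATUSES = {"COMPLETED", "FAILED"}


def poll_backtest(
    fetch_status: Callable[[str], dict],
    run_id: str,
    interval_s: float = 5.0,
    max_polls: int = 120,
    sleep: Callable[[float], None] = time.sleep,
) -> dict:
    """Poll until the backtest reaches a terminal status or max_polls is exhausted."""
    for _ in range(max_polls):
        status = fetch_status(run_id)
        if status["status"] in TERMINAL_STATUSES:
            return status
        sleep(interval_s)
    raise TimeoutError(f"Backtest {run_id} did not finish after {max_polls} polls")
```

Injecting fetch_status and sleep keeps the loop testable without a running service.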

Health Check Endpoints

# services/api_gateway/api/routes/health.py
import json

import httpx
from fastapi import APIRouter, Response

router = APIRouter(tags=["health"])

@router.get("/health")
async def health_check():
    """Basic liveness check."""
    return {"status": "healthy", "service": "backend-api"}


@router.get("/ready")
async def readiness_check():
    """Readiness check - are dependencies available?"""
    checks = {"mlflow": False, "data_collection": False, "dynamodb": False}

    # Check MLflow
    try:
        async with httpx.AsyncClient(timeout=5.0) as client:
            resp = await client.get(f"{settings.MLFLOW_URL}/health")
            checks["mlflow"] = resp.status_code == 200
    except Exception:
        # Unreachable dependency: leave the check as False
        pass

    # Check Data Collection
    try:
        async with httpx.AsyncClient(timeout=5.0) as client:
            resp = await client.get(f"{settings.DATA_COLLECTION_URL}/health")
            checks["data_collection"] = resp.status_code == 200
    except Exception:
        pass

    # Check DynamoDB
    try:
        dynamo = DynamoDBClient()
        await dynamo.health_check()
        checks["dynamodb"] = True
    except Exception:
        pass

    all_healthy = all(checks.values())
    return Response(
        content=json.dumps({"status": "ready" if all_healthy else "not_ready", "checks": checks}),
        status_code=200 if all_healthy else 503,
        media_type="application/json"
    )
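
The readiness checks above run one after another, so three slow dependencies can add up to roughly 15 seconds. A minimal sketch of running them concurrently instead, assuming each check is wrapped in a zero-argument coroutine function; run_checks is a hypothetical helper:

```python
import asyncio
from typing import Awaitable, Callable, Dict


async def run_checks(
    checks: Dict[str, Callable[[], Awaitable[bool]]],
    timeout: float = 5.0,
) -> Dict[str, bool]:
    """Run all dependency checks concurrently; an error or timeout counts as False."""

    async def guarded(check: Callable[[], Awaitable[bool]]) -> bool:
        try:
            return bool(await asyncio.wait_for(check(), timeout=timeout))
        except Exception:
            return False

    names = list(checks)
    results = await asyncio.gather(*(guarded(checks[name]) for name in names))
    return dict(zip(names, results))
```

With this shape the endpoint's worst-case latency is bounded by the single slowest check rather than the sum of all of them.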

2. Data Collection Service

Task Definition Specification

Family: tradai-data-collection
Cpu: 256 (0.25 vCPU)
Memory: 512 (512 MB)

Container: data-collection
  Image: {account}.dkr.ecr.us-east-1.amazonaws.com/tradai-data-collection:latest
  Port: 8002

  Environment Variables:
    - SERVICE_NAME: data-collection
    - ARCTIC_S3_BUCKET: tradai-arcticdb
    - ARCTIC_LIBRARY: futures

  Health Check:
    Command: curl -f http://localhost:8002/health || exit 1

Data Freshness Check

# services/data_collection/api/routes/data.py
from datetime import datetime, timezone

import arcticdb as adb
from fastapi import APIRouter
from pydantic import BaseModel

router = APIRouter(prefix="/data", tags=["data"])

class FreshnessRequest(BaseModel):
    symbols: list[str]
    timeframe: str

class FreshnessResponse(BaseModel):
    results: dict[str, dict]
    all_fresh: bool

@router.post("/freshness", response_model=FreshnessResponse)
async def check_data_freshness(request: FreshnessRequest):
    """
    Check if data is fresh enough for backtesting.
    Fresh = last update within 24 hours.
    """
    # ArcticDB S3 URIs have the form s3://<endpoint>:<bucket>; aws_auth=true uses
    # the task role credentials. The us-east-1 endpoint is assumed from the ECR region.
    arctic = adb.Arctic(
        f"s3://s3.us-east-1.amazonaws.com:{settings.ARCTIC_S3_BUCKET}?aws_auth=true"
    )
    library = arctic.get_library(settings.ARCTIC_LIBRARY)

    results = {}
    all_fresh = True

    for symbol in request.symbols:
        symbol_key = f"{symbol}_{request.timeframe}"

        try:
            # get_description() exposes row_count and last_update_time (UTC)
            info = library.get_description(symbol_key)
            last_update = info.last_update_time
            age_hours = (datetime.now(timezone.utc) - last_update).total_seconds() / 3600

            is_fresh = age_hours < 24
            results[symbol] = {
                "last_update": last_update.isoformat(),
                "age_hours": round(age_hours, 2),
                "is_fresh": is_fresh,
                "row_count": info.row_count
            }

            if not is_fresh:
                all_fresh = False

        except Exception as e:
            results[symbol] = {"error": str(e), "is_fresh": False}
            all_fresh = False

    return FreshnessResponse(results=results, all_fresh=all_fresh)
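
The freshness rule itself (last update within 24 hours) is independent of ArcticDB and can be isolated for testing. A minimal sketch, assuming timestamps without timezone information are in UTC; the freshness function is a hypothetical helper:

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def freshness(last_update: datetime, now: Optional[datetime] = None, max_age_hours: float = 24.0) -> dict:
    """Return the age and freshness flag for one symbol.

    Naive timestamps are treated as UTC (an assumption of this sketch).
    """
    if last_update.tzinfo is None:
        last_update = last_update.replace(tzinfo=timezone.utc)
    now = now or datetime.now(timezone.utc)
    age_hours = (now - last_update).total_seconds() / 3600
    return {"age_hours": round(age_hours, 2), "is_fresh": age_hours < max_age_hours}


reference = datetime(2025, 12, 9, 12, 0, tzinfo=timezone.utc)
three_hours_old = freshness(reference - timedelta(hours=3), now=reference)
```

Passing now explicitly makes the boundary case (exactly 24 hours old, which is not fresh) easy to pin down in a unit test.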

3. MLflow Service

Task Definition Specification

Family: tradai-mlflow
Cpu: 512 (0.5 vCPU)
Memory: 1024 (1 GB)

Container: mlflow
  Image: {account}.dkr.ecr.us-east-1.amazonaws.com/tradai-mlflow:latest
  Port: 5000

  Command:
    mlflow server
      --host=0.0.0.0
      --port=5000
      # Connection values come from the DB_* secrets listed below
      --backend-store-uri=postgresql://${DB_USER}:${DB_PASSWORD}@${DB_HOST}:5432/mlflow
      --default-artifact-root=s3://tradai-results/mlflow-artifacts

  Secrets (from Secrets Manager):
    - DB_HOST, DB_USER, DB_PASSWORD

4. Strategy Service Task (On-Demand)

Task Definition Specification

Family: tradai-strategy-service
Cpu: 512 (0.5 vCPU)
Memory: 1024 (1 GB)

Container: strategy-service
  Image: {account}.dkr.ecr.us-east-1.amazonaws.com/tradai-strategy-service:latest

  # Command set by Step Functions via environment variables
  Environment Variables:
    - COMMAND: (set by Step Functions)
    - INPUT_METHOD: s3
    - INPUT_S3_PATH: (set by Step Functions)
    - S3_CONFIG_BUCKET: tradai-configs
    - MLFLOW_TRACKING_URI: http://mlflow.tradai.local:5000

Prepare Config Command

# services/strategy_service/commands/prepare_config.py
import boto3
import mlflow
from omegaconf import OmegaConf
import yaml
import os

def prepare_config(input_data: dict) -> dict:
    """
    Prepare configuration for backtest.
    1. Fetch strategy metadata from MLflow
    2. Download base config from S3
    3. Merge with overrides
    4. Upload merged config to S3
    """
    s3 = boto3.client("s3")

    # Get strategy from MLflow
    mlflow.set_tracking_uri(os.environ["MLFLOW_TRACKING_URI"])
    client = mlflow.tracking.MlflowClient()

    model_version = client.get_model_version(
        name=input_data["strategy_name"],
        version=input_data["strategy_version"]
    )

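    ecr_url = model_version.tags.get("ecr_url")
    base_config_path = model_version.tags.get("config_path")
    # Fail early with a clear message instead of an AttributeError on None below
    if not ecr_url or not base_config_path:
        raise ValueError(
            f"Model version {input_data['strategy_name']}/{input_data['strategy_version']} "
            "must define both the 'ecr_url' and 'config_path' tags"
        )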

    # Download base config
    bucket, key = base_config_path.replace("s3://", "").split("/", 1)
    response = s3.get_object(Bucket=bucket, Key=key)
    base_config = OmegaConf.create(yaml.safe_load(response["Body"]))

    # Merge with overrides
    overrides = OmegaConf.create(input_data.get("config_overrides", {}))
    merged_config = OmegaConf.merge(base_config, overrides)
    merged_config.run_id = input_data["run_id"]
    merged_config.experiment_name = input_data["experiment_name"]

    # Upload merged config
    temp_key = f"temp/{input_data['run_id']}/config.yaml"
    s3.put_object(
        Bucket=os.environ["S3_CONFIG_BUCKET"],
        Key=temp_key,
        Body=OmegaConf.to_yaml(merged_config)
    )

    return {
        "ecr_url": ecr_url,
        "config_path": f"s3://{os.environ['S3_CONFIG_BUCKET']}/{temp_key}",
        "strategy_name": input_data["strategy_name"],
        "strategy_version": input_data["strategy_version"]
    }
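
The merge step gives overrides precedence over the base config while keeping untouched nested keys. A minimal standard-library sketch of that behaviour for plain dictionaries; deep_merge and the sample keys are illustrative only, and the service itself relies on OmegaConf.merge:

```python
from copy import deepcopy


def deep_merge(base: dict, overrides: dict) -> dict:
    """Recursively merge overrides into a copy of base; override values win."""
    merged = deepcopy(base)
    for key, value in overrides.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            # Both sides are mappings: merge key by key
            merged[key] = deep_merge(merged[key], value)
        else:
            # Scalars and lists are replaced wholesale
            merged[key] = deepcopy(value)
    return merged


# Hypothetical config keys, for illustration only
base = {"timeframe": "1h", "stake": {"currency": "USDT", "amount": 100}}
overrides = {"stake": {"amount": 250}}
merged = deep_merge(base, overrides)
```

Only stake.amount changes in the result; stake.currency and timeframe carry over from the base config.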

5. Strategy Container (Unified)

The strategy container is a unified image that handles all trading modes via the TRADING_MODE environment variable. Configuration is loaded from MLflow tags + S3 at runtime (see 11-LIVE-TRADING.md).

Trading Modes

| Mode | Launch Type | Spot OK? | Use Case |
|---|---|---|---|
| backtest | ECS Task | Yes | Historical simulation |
| hyperopt | ECS Task | Yes | Parameter optimization |
| dry-run | ECS Service | No | Paper trading |
| live | ECS Service | No | Production trading |
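
The table can be encoded as a lookup so that deployment code cannot place a live strategy on Spot by accident. A minimal sketch; LaunchPlan and launch_plan are hypothetical names, and the provider lists follow the Spot column above:

```python
from dataclasses import dataclass
from typing import Tuple


@dataclass(frozen=True)
class LaunchPlan:
    launch_type: str                      # "task" runs to completion, "service" is kept running
    capacity_providers: Tuple[str, ...]   # providers the mode is allowed to use


_PLANS = {
    "backtest": LaunchPlan("task", ("FARGATE_SPOT", "FARGATE")),
    "hyperopt": LaunchPlan("task", ("FARGATE_SPOT", "FARGATE")),
    "dry-run": LaunchPlan("service", ("FARGATE",)),
    "live": LaunchPlan("service", ("FARGATE",)),
}


def launch_plan(trading_mode: str) -> LaunchPlan:
    """Return the launch plan for a TRADING_MODE value, rejecting unknown modes."""
    try:
        return _PLANS[trading_mode]
    except KeyError:
        raise ValueError(f"Unknown TRADING_MODE: {trading_mode!r}") from None
```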

Task Definition Specification (Backtest Mode)

Family: tradai-strategy-container
Cpu: 1024 (1 vCPU)
Memory: 2048 (2 GB)

Container: strategy-container
  # Image set dynamically per strategy by Step Functions

  Environment Variables:
    - TRADING_MODE: backtest
    - STRATEGY_NAME: (from Step Functions)
    - STRATEGY_STAGE: (from Step Functions)
    - MLFLOW_TRACKING_URI: http://mlflow.tradai.local:5000
    - ARCTICDB_S3_URI: s3://tradai-arcticdb
    - DYNAMODB_TABLE: tradai-workflow-state

  # Use Fargate Spot for up to 70% cost savings (tasks can be interrupted)
  Capacity Provider: FARGATE_SPOT (primary), FARGATE (fallback)

Service Definition Specification (Live Trading Mode - v9.1)

Family: tradai-strategy-{name}
Cpu: 512 (0.5 vCPU)
Memory: 1024 (1 GB)

Container: strategy-container
  Image: {account}.dkr.ecr.us-east-1.amazonaws.com/tradai-{strategy}:{version}

  Environment Variables (MINIMAL - config from MLflow):
    - TRADING_MODE: live
    - STRATEGY_NAME: PascalStrategy
    - STRATEGY_STAGE: Production
    - MLFLOW_TRACKING_URI: http://mlflow.tradai.local:5000
    - ARCTICDB_S3_URI: s3://tradai-arcticdb
    - DYNAMODB_TABLE: tradai-workflow-state
    - S3_CONFIG_BUCKET: tradai-configs

  Secrets (from Secrets Manager):
    - EXCHANGE_API_KEY
    - EXCHANGE_API_SECRET

  Health Check:
    Command: curl -f http://localhost:8080/health || exit 1
    Interval: 30s
    Timeout: 5s
    Retries: 3

  # NEVER use Spot for live trading
  Capacity Provider: FARGATE only (NOT FARGATE_SPOT)

MLflow-Centric Config Loading (v9.1)

At container startup, the following sequence loads configuration:

  1. Bootstrap: Read STRATEGY_NAME and STRATEGY_STAGE from env
  2. Query MLflow: MLflowAdapter.get_model_version(name, stage)
  3. Extract tags: timeframe, pairs, warmup_days, configuration_file
  4. Load S3 config: ConfigMergeService.load_config(config_s3_path)
  5. Apply overrides: Runtime overrides from DynamoDB session (if resuming)

This design uses existing TradAI patterns:

- StrategyMetadata.to_mlflow_tags() for registration
- MLflowAdapter for MLflow REST API
- ConfigMergeService for S3 config loading + OmegaConf merge
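
The startup sequence can be sketched as one function with the MLflow and S3 lookups injected. This is an illustration under stated assumptions, not the TradAI implementation: load_startup_config and its parameters are hypothetical, the configuration_file tag is assumed to hold the S3 path, tags are assumed to take precedence over the S3 file, and a shallow update stands in for the OmegaConf merge.

```python
from typing import Callable, Mapping, Optional


def load_startup_config(
    env: Mapping[str, str],
    get_model_tags: Callable[[str, str], dict],
    load_s3_config: Callable[[str], dict],
    session_overrides: Optional[dict] = None,
) -> dict:
    """Assemble the runtime config in the order described above."""
    # 1. Bootstrap from the environment
    name, stage = env["STRATEGY_NAME"], env["STRATEGY_STAGE"]
    # 2. Query MLflow for the model version tags
    tags = get_model_tags(name, stage)
    # 4. Load the S3 config referenced by the tags (path tag is an assumption)
    config = dict(load_s3_config(tags["configuration_file"]))
    # 3. Apply tag values on top of the file (precedence is an assumption)
    for key in ("timeframe", "pairs", "warmup_days"):
        if key in tags:
            config[key] = tags[key]
    # 5. Runtime overrides from a resumed session win last
    config.update(session_overrides or {})
    return config
```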


6. Backtest Execution Modes (v9.2)

The backtesting system supports four execution modes through a Protocol-based executor pattern. The EXECUTOR_MODE environment variable selects the mode.

6.1 Execution Mode Comparison

| Mode | Use Case | Latency | Complexity | Cost |
|---|---|---|---|---|
| local | Development/testing | ~5s | Low | Free |
| ecs | Simple production | ~30s | Medium | ECS only |
| sqs | High-volume production | ~45s | Medium-High | ECS+SQS+Lambda |
| stepfunctions | Complex workflows | ~60s | High | ECS+SFN+Lambda |

6.2 BacktestExecutor Protocol

# services/backend/src/tradai/backend/core/repositories.py
from typing import Protocol

class BacktestExecutor(Protocol):
    """Protocol for backtest execution backends."""

    def submit(self, job: BacktestJobMessage) -> str:
        """
        Submit a backtest job for execution.

        Args:
            job: Backtest job message with config and metadata

        Returns:
            Execution identifier (task ARN, message ID, container ID, etc.)

        Raises:
            ExternalServiceError: If submission fails
        """
        ...
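
Because the executor is a Protocol, any class with a matching submit method qualifies without inheriting from it. A minimal sketch of an in-memory executor for unit tests; the BacktestJobMessage stand-in, the runtime_checkable decorator, and InMemoryExecutor are assumptions added for illustration:

```python
from dataclasses import dataclass, field
from typing import List, Protocol, runtime_checkable


@dataclass
class BacktestJobMessage:
    """Simplified stand-in for the real job message."""
    job_id: str
    experiment_name: str = "default"


@runtime_checkable
class BacktestExecutor(Protocol):
    def submit(self, job: BacktestJobMessage) -> str: ...


@dataclass
class InMemoryExecutor:
    """Records submitted jobs instead of launching anything."""
    submitted: List[BacktestJobMessage] = field(default_factory=list)

    def submit(self, job: BacktestJobMessage) -> str:
        self.submitted.append(job)
        return f"memory-{job.job_id}"


executor = InMemoryExecutor()
execution_id = executor.submit(BacktestJobMessage(job_id="abc"))
```

InMemoryExecutor never references BacktestExecutor, yet a type checker accepts it wherever the protocol is expected, and runtime_checkable additionally allows an isinstance check on the method's presence.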

6.3 ECS Direct Executor (Mode: ecs)

Launches ECS tasks directly without queue buffering. Best for low-volume production.

# services/backend/src/tradai/backend/infrastructure/ecs.py
from typing import Any

import boto3

class ECSBacktestExecutor(LoggerMixin):
    """Launches strategy container as ECS Fargate task."""

    def __init__(
        self,
        cluster: str,
        task_definition_prefix: str,
        subnets: list[str],
        security_groups: list[str],
        dynamodb_table: str,
        mlflow_uri: str,
        results_bucket: str,
        ecs_client: Any = None,
    ) -> None:
        super().__init__()
        self._cluster = cluster
        self._task_prefix = task_definition_prefix
        self._subnets = subnets
        self._security_groups = security_groups
        self._table_name = dynamodb_table
        self._mlflow_uri = mlflow_uri
        self._results_bucket = results_bucket
        self._ecs = ecs_client or boto3.client("ecs")

    def submit(self, job: BacktestJobMessage) -> str:
        """Launch ECS task for backtest execution."""
        task_def = f"{self._task_prefix}{job.config.strategy}"

        # launchType is omitted on purpose: ECS rejects RunTask requests that set
        # both launchType and capacityProviderStrategy.
        response = self._ecs.run_task(
            cluster=self._cluster,
            taskDefinition=task_def,
            networkConfiguration={
                "awsvpcConfiguration": {
                    "subnets": self._subnets,
                    "securityGroups": self._security_groups,
                    "assignPublicIp": "DISABLED",
                }
            },
            overrides={
                "containerOverrides": [{
                    "name": "strategy",
                    "command": self._build_command(job),
                    "environment": [
                        {"name": "RUN_ID", "value": job.job_id},
                        {"name": "EXPERIMENT_NAME", "value": job.experiment_name},
                        {"name": "DYNAMODB_TABLE", "value": self._table_name},
                        {"name": "MLFLOW_TRACKING_URI", "value": self._mlflow_uri},
                        {"name": "S3_RESULTS_BUCKET", "value": self._results_bucket},
                    ],
                }],
            },
            capacityProviderStrategy=[
                {"capacityProvider": "FARGATE_SPOT", "weight": 1},
                {"capacityProvider": "FARGATE", "weight": 0, "base": 1},
            ],
        )

        # RunTask reports placement problems in "failures" instead of raising.
        # ExternalServiceError is the project exception named in the protocol docstring.
        if not response.get("tasks"):
            raise ExternalServiceError(
                f"ECS run_task failed for job {job.job_id}: {response.get('failures')}"
            )

        task_arn = response["tasks"][0]["taskArn"]
        self.logger.info(f"Launched ECS task: {task_arn} for job {job.job_id}")
        return task_arn

    def _build_command(self, job: BacktestJobMessage) -> list[str]:
        """Build container entrypoint command."""
        return [
            "python", "/app/entrypoint.py",
            "--strategy", job.config.strategy,
            "--timeframe", job.config.timeframe,
            "--timerange", f"{job.config.start_date}-{job.config.end_date}",
            "--pairs", ",".join(job.config.symbols),
        ]
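
Freqtrade expects --timerange in YYYYMMDD-YYYYMMDD form, so joining two dates with a hyphen only works if the dates are already formatted that way. A minimal sketch of an explicit formatter, assuming start_date and end_date are datetime.date values (the source does not state their type); format_timerange is a hypothetical helper:

```python
from datetime import date


def format_timerange(start: date, end: date) -> str:
    """Format a date range as Freqtrade's YYYYMMDD-YYYYMMDD timerange."""
    if end <= start:
        raise ValueError("end must be after start")
    return f"{start:%Y%m%d}-{end:%Y%m%d}"


timerange = format_timerange(date(2024, 1, 1), date(2024, 6, 30))
```

Interpolating date objects directly would instead produce 2024-01-01-2024-06-30, which is not in the YYYYMMDD-YYYYMMDD form.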

6.4 SQS Queue Executor (Mode: sqs)

Submits jobs to an SQS queue. A Lambda trigger consumes the queue, so bursts are buffered and the queue provides backpressure.

# services/backend/src/tradai/backend/infrastructure/sqs.py (existing)

class SQSBacktestExecutor(LoggerMixin):
    """Submits backtest jobs to SQS queue for async processing."""

    def __init__(self, sqs_adapter: SQSAdapter) -> None:
        super().__init__()
        self._adapter = sqs_adapter

    def submit(self, job: BacktestJobMessage) -> str:
        """Submit job to SQS queue."""
        self.logger.info(f"Submitting job {job.job_id} to SQS queue")
        message_id = self._adapter.send_backtest_job(job)
        return message_id

6.5 Local Docker Executor (Mode: local)

Runs the strategy container locally via the Docker SDK. For development only.

# services/backend/src/tradai/backend/infrastructure/local.py
from pathlib import Path
from typing import Any

import docker

class LocalBacktestExecutor(LoggerMixin):
    """Runs strategy container locally via Docker."""

    def __init__(
        self,
        docker_client: Any = None,
        data_dir: Path | None = None,
        results_dir: Path | None = None,
        mlflow_uri: str = "http://localhost:5000",
    ) -> None:
        super().__init__()
        self._docker = docker_client or docker.from_env()
        self._data_dir = data_dir or Path.home() / ".tradai" / "data"
        self._results_dir = results_dir or Path.home() / ".tradai" / "results"
        self._mlflow_uri = mlflow_uri

    def submit(self, job: BacktestJobMessage) -> str:
        """Run strategy container locally."""
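        # Create the per-job results directory up front so the bind mount below
        # is not created (root-owned) by the Docker daemon.
        (self._results_dir / job.job_id).mkdir(parents=True, exist_ok=True)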

        container = self._docker.containers.run(
            image=f"strategy-{job.config.strategy}:latest",
            command=self._build_command(job),
            environment={
                "RUN_ID": job.job_id,
                "MLFLOW_TRACKING_URI": self._mlflow_uri,
                "EXPERIMENT_NAME": job.experiment_name,
            },
            volumes={
                str(self._data_dir): {"bind": "/data", "mode": "ro"},
                str(self._results_dir / job.job_id): {"bind": "/results", "mode": "rw"},
            },
            detach=True,
            remove=True,
        )

        self.logger.info(f"Started local container: {container.id[:12]} for job {job.job_id}")
        return container.id

6.6 Step Functions Executor (Mode: stepfunctions)

Starts a Step Functions execution for complex multi-step workflows.

# services/backend/src/tradai/backend/infrastructure/stepfunctions.py
import json
from typing import Any

import boto3

class StepFunctionsExecutor(LoggerMixin):
    """Executes backtest via Step Functions state machine."""

    def __init__(
        self,
        state_machine_arn: str,
        sfn_client: Any = None,
    ) -> None:
        super().__init__()
        self._state_machine_arn = state_machine_arn
        self._sfn = sfn_client or boto3.client("stepfunctions")

    def submit(self, job: BacktestJobMessage) -> str:
        """Start Step Functions execution."""
        response = self._sfn.start_execution(
            stateMachineArn=self._state_machine_arn,
            name=f"backtest-{job.job_id}",
            input=json.dumps({
                "run_id": job.job_id,
                # mode="json" converts dates and similar types to JSON-safe values
                "config": job.config.model_dump(mode="json"),
                "experiment_name": job.experiment_name,
            }),
        )

        execution_arn = response["executionArn"]
        self.logger.info(f"Started Step Functions execution: {execution_arn}")
        return execution_arn

6.7 Executor Factory

# services/backend/src/tradai/backend/api/dependencies.py
from fastapi import Depends

def get_backtest_executor(
    settings: BackendSettings = Depends(get_settings),
) -> BacktestExecutor | None:
    """Get executor based on EXECUTOR_MODE environment variable."""
    mode = settings.executor_mode

    if mode == "local":
        import docker
        return LocalBacktestExecutor(
            docker_client=docker.from_env(),
            mlflow_uri=settings.mlflow_tracking_uri,
        )

    elif mode == "ecs":
        return ECSBacktestExecutor(
            cluster=settings.ecs_cluster,
            task_definition_prefix=settings.ecs_task_prefix,
            subnets=settings.ecs_subnets,
            security_groups=settings.ecs_security_groups,
            dynamodb_table=settings.workflow_state_table,
            mlflow_uri=settings.mlflow_tracking_uri,
            results_bucket=settings.s3_results_bucket,
        )

    elif mode == "sqs":
        sqs_adapter = get_sqs_adapter(settings)
        return SQSBacktestExecutor(sqs_adapter) if sqs_adapter else None

    elif mode == "stepfunctions":
        return StepFunctionsExecutor(
            state_machine_arn=settings.step_functions_arn,
        )

    else:
        # Dev mode - no executor (jobs stay PENDING)
        return None

6.8 Container Entrypoint Script

The strategy container handles its own lifecycle via an entrypoint script:

#!/usr/bin/env python3
"""Strategy container entrypoint - handles backtest execution and status updates."""
# strategies/base/entrypoint.py

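import json
import os
import subprocess
import sys
from datetime import datetime
from pathlib import Path

import boto3
from freqtrade.data.btanalysis import load_backtest_stats

# extract_metrics, summarize_results and upload_results_to_s3 are helpers
# that this listing does not show.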


def wait_for_mlflow(uri: str, max_retries: int = 10, backoff_base: float = 2.0) -> bool:
    """Wait for MLflow to be healthy with exponential backoff.

    CRITICAL: Prevents container failures when MLflow is not yet ready.

    Args:
        uri: MLflow tracking URI
        max_retries: Maximum number of attempts (default: 10 = ~8.5 min of backoff)
        backoff_base: Base for exponential backoff in seconds

    Returns:
        True if MLflow is healthy, False if all retries exhausted
    """
    import time
    import requests

    health_url = f"{uri.rstrip('/')}/health"

    for attempt in range(max_retries):
        try:
            response = requests.get(health_url, timeout=5)
            if response.status_code == 200:
                print(f"MLflow healthy after {attempt + 1} attempt(s)")
                return True
            reason = f"HTTP {response.status_code}"
        except requests.RequestException as e:
            reason = str(e)

        print(f"MLflow not ready (attempt {attempt + 1}/{max_retries}): {reason}")
        # Back off after every failed attempt (including non-200 responses),
        # but do not sleep once the final attempt has failed.
        if attempt < max_retries - 1:
            wait_time = backoff_base ** attempt
            print(f"Retrying in {wait_time:.1f}s...")
            time.sleep(wait_time)

    print(f"MLflow not healthy after {max_retries} attempts")
    return False


def main() -> int:
    """Main entrypoint for strategy container."""
    run_id = os.environ["RUN_ID"]
    strategy = os.environ.get("STRATEGY", os.environ.get("FREQTRADE_STRATEGY"))
    dynamodb_table = os.environ.get("DYNAMODB_TABLE")
    mlflow_uri = os.environ.get("MLFLOW_TRACKING_URI")
    experiment_name = os.environ.get("EXPERIMENT_NAME", f"backtests/{strategy}")

    # Initialize DynamoDB (optional - graceful degradation for local mode)
    dynamodb = None
    if dynamodb_table:
        dynamodb = boto3.resource("dynamodb").Table(dynamodb_table)

    try:
        # 0. CRITICAL: Wait for MLflow to be healthy (prevents race condition)
        if mlflow_uri:
            if not wait_for_mlflow(mlflow_uri):
                raise RuntimeError(f"MLflow at {mlflow_uri} not available after retries")

        # 1. Update status to RUNNING
        if dynamodb:
            dynamodb.update_item(
                Key={"run_id": run_id},
                UpdateExpression="SET #s = :s, updated_at = :t",
                ExpressionAttributeNames={"#s": "status"},
                ExpressionAttributeValues={":s": "RUNNING", ":t": datetime.utcnow().isoformat()},
            )

        # 2. Build and execute Freqtrade command
        command = build_freqtrade_command()
        print(f"Executing: {' '.join(command)}")
        result = subprocess.run(command, check=True, capture_output=True, text=True)
        print(result.stdout)

        # 3. Parse results
        results_dir = Path(os.environ.get("RESULTS_DIR", "/freqtrade/user_data/backtest_results"))
        stats = load_backtest_stats(results_dir)

        # 4. Log to MLflow
        if mlflow_uri:
            import mlflow
            mlflow.set_tracking_uri(mlflow_uri)
            mlflow.set_experiment(experiment_name)
            with mlflow.start_run(run_name=run_id):
                mlflow.log_metrics(extract_metrics(stats))
                mlflow.log_params({"strategy": strategy, "run_id": run_id})

        # 5. Upload results to S3
        upload_results_to_s3(run_id, stats)

        # 6. Update status to COMPLETED
        if dynamodb:
            dynamodb.update_item(
                Key={"run_id": run_id},
                UpdateExpression="SET #s = :s, #r = :r, updated_at = :t",
                ExpressionAttributeNames={"#s": "status", "#r": "result"},
                ExpressionAttributeValues={
                    ":s": "COMPLETED",
                    ":r": json.dumps(summarize_results(stats)),
                    ":t": datetime.utcnow().isoformat(),
                },
            )

        print(f"Backtest completed successfully: {run_id}")
        return 0

    except subprocess.CalledProcessError as e:
        error_msg = f"Freqtrade execution failed: {e.stderr}"
        handle_failure(dynamodb, run_id, error_msg)
        return 1

    except Exception as e:
        error_msg = f"Unexpected error: {str(e)}"
        handle_failure(dynamodb, run_id, error_msg)
        return 1


def build_freqtrade_command() -> list[str]:
    """Build Freqtrade CLI command from environment variables."""
    cmd = ["freqtrade", "backtesting"]

    if strategy := os.environ.get("STRATEGY"):
        cmd.extend(["--strategy", strategy])
    if timeframe := os.environ.get("TIMEFRAME"):
        cmd.extend(["--timeframe", timeframe])
    if timerange := os.environ.get("TIMERANGE"):
        cmd.extend(["--timerange", timerange])
    if pairs := os.environ.get("PAIRS"):
        # PAIRS is comma-separated; freqtrade expects each pair as its own argument
        cmd.extend(["--pairs", *pairs.split(",")])
    if config := os.environ.get("CONFIG_PATH"):
        cmd.extend(["--config", config])

    cmd.extend(["--export", "trades"])
    return cmd


def handle_failure(dynamodb, run_id: str, error_msg: str) -> None:
    """Update status to FAILED."""
    print(error_msg, file=sys.stderr)
    if dynamodb:
        dynamodb.update_item(
            Key={"run_id": run_id},
            UpdateExpression="SET #s = :s, #e = :e, updated_at = :t",
            ExpressionAttributeNames={"#s": "status", "#e": "error"},
            ExpressionAttributeValues={
                ":s": "FAILED",
                ":e": error_msg[:1000],
                ":t": datetime.utcnow().isoformat(),
            },
        )


if __name__ == "__main__":
    sys.exit(main())
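
Uncapped exponential backoff grows quickly, so it helps to look at the delay schedule before choosing max_retries. A minimal sketch of a capped schedule; backoff_delays and the 60-second cap are illustrative choices and are not taken from the entrypoint above:

```python
from typing import List


def backoff_delays(max_retries: int = 10, base: float = 2.0, cap: float = 60.0) -> List[float]:
    """Delays in seconds between consecutive attempts, each limited to cap."""
    # There is one fewer delay than attempts: nothing follows the last attempt
    return [min(base ** attempt, cap) for attempt in range(max_retries - 1)]


delays = backoff_delays()
total_wait = sum(delays)
```

With these defaults the schedule is 1, 2, 4, 8, 16, 32, 60, 60, 60 seconds, for a total of 243 seconds of waiting across ten attempts.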

6.9 When to Use Each Mode

Decision Tree:

Is this development/testing?
├── YES → Use LOCAL mode
└── NO → Is this a single simple backtest?
         ├── YES → Use ECS DIRECT mode
         └── NO → Is ordering/reliability critical?
                  ├── YES → Use SQS mode
                  └── NO → Is this a multi-step workflow?
                           ├── YES → Use STEP FUNCTIONS mode
                           └── NO → Use ECS DIRECT mode
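
The decision tree translates directly into a small function whose return values match the EXECUTOR_MODE settings. A minimal sketch; choose_executor_mode and its parameter names are hypothetical:

```python
def choose_executor_mode(
    is_development: bool,
    single_simple_backtest: bool,
    ordering_or_reliability_critical: bool,
    multi_step_workflow: bool,
) -> str:
    """Walk the decision tree top to bottom and return the executor mode."""
    if is_development:
        return "local"
    if single_simple_backtest:
        return "ecs"
    if ordering_or_reliability_critical:
        return "sqs"
    if multi_step_workflow:
        return "stepfunctions"
    return "ecs"
```

The order of the checks matters: a workflow that is both ordering-critical and multi-step resolves to sqs, exactly as in the tree.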

7. Lambda Functions

Lambda Inventory

| Function | Memory | Timeout | Trigger | Purpose |
|---|---|---|---|---|
| sqs-consumer | 256 MB | 60s | SQS | Process backtest queue |
| data-collection-proxy | 256 MB | 60s | Step Functions | Call Data Collection |
| validate-strategy | 256 MB | 30s | Step Functions | Check ECR/S3 exists |
| transform-results | 512 MB | 60s | Step Functions | Format results |
| cleanup-resources | 256 MB | 30s | Step Functions | Delete temp files |
| notify-completion | 256 MB | 30s | Step Functions | Send notifications |
| orphan-scanner | 128 MB | 60s | EventBridge (15 min) | Detect stuck RUNNING jobs (v9.2) |

SQS Consumer Lambda

Supports both Step Functions and direct ECS launch, selected by the LAUNCH_MODE environment variable.

# lambda/sqs_consumer/index.py
import json
import os
import boto3
from datetime import datetime

ecs_client = boto3.client('ecs')
sf_client = boto3.client('stepfunctions')
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(os.environ['WORKFLOW_STATE_TABLE'])

# Configuration
LAUNCH_MODE = os.environ.get('LAUNCH_MODE', 'ecs')  # 'ecs' or 'stepfunctions'
ECS_CLUSTER = os.environ.get('ECS_CLUSTER')
ECS_TASK_PREFIX = os.environ.get('ECS_TASK_PREFIX', 'strategy-')
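# Drop empty entries so an unset variable yields [] instead of ['']
ECS_SUBNETS = [s for s in os.environ.get('ECS_SUBNETS', '').split(',') if s]
ECS_SECURITY_GROUPS = [g for g in os.environ.get('ECS_SECURITY_GROUPS', '').split(',') if g]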
STEP_FUNCTIONS_ARN = os.environ.get('BACKTEST_WORKFLOW_ARN')


def handler(event, context):
    """Process SQS messages and launch backtests."""
    batch_item_failures = []

    for record in event['Records']:
        message_id = record['messageId']

        try:
            message = json.loads(record['body'])
            run_id = message['run_id']

            # Idempotency check
            if is_already_running(run_id):
                print(f"Backtest {run_id} already running, skipping")
                continue

            # Launch based on mode
            if LAUNCH_MODE == 'stepfunctions':
                execution_arn = launch_step_functions(message, run_id)
            else:  # Default: ECS direct
                execution_arn = launch_ecs_task(message, run_id)

            # Store execution ARN
            store_execution_arn(run_id, execution_arn)
            print(f"Launched backtest {run_id}: {execution_arn}")

        except Exception as e:
            print(f"Error processing {message_id}: {str(e)}")
            batch_item_failures.append({"itemIdentifier": message_id})

    return {"batchItemFailures": batch_item_failures}


def launch_ecs_task(message: dict, run_id: str) -> str:
    """Launch ECS task directly (simpler, lower latency)."""
    strategy = message['config']['strategy']
    task_def = f"{ECS_TASK_PREFIX}{strategy}"

    # launchType is omitted: ECS rejects RunTask calls that combine it with
    # capacityProviderStrategy.
    response = ecs_client.run_task(
        cluster=ECS_CLUSTER,
        taskDefinition=task_def,
        networkConfiguration={
            'awsvpcConfiguration': {
                'subnets': ECS_SUBNETS,
                'securityGroups': ECS_SECURITY_GROUPS,
                'assignPublicIp': 'DISABLED',
            }
        },
        overrides={
            'containerOverrides': [{
                'name': 'strategy',
                'environment': [
                    {'name': 'RUN_ID', 'value': run_id},
                    {'name': 'STRATEGY', 'value': strategy},
                    {'name': 'TIMEFRAME', 'value': message['config']['timeframe']},
                    {'name': 'TIMERANGE', 'value': f"{message['config']['start_date']}-{message['config']['end_date']}"},
                    {'name': 'PAIRS', 'value': ','.join(message['config']['symbols'])},
                    {'name': 'EXPERIMENT_NAME', 'value': message.get('experiment_name', f'backtests/{strategy}')},
                    {'name': 'DYNAMODB_TABLE', 'value': os.environ['WORKFLOW_STATE_TABLE']},
                    {'name': 'MLFLOW_TRACKING_URI', 'value': os.environ.get('MLFLOW_TRACKING_URI', '')},
                ],
            }],
        },
        capacityProviderStrategy=[
            {'capacityProvider': 'FARGATE_SPOT', 'weight': 1},
            {'capacityProvider': 'FARGATE', 'weight': 0, 'base': 1},
        ],
    )

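    # run_task reports placement errors in 'failures' rather than raising
    if not response['tasks']:
        raise RuntimeError(f"ECS run_task failed for {run_id}: {response.get('failures')}")
    return response['tasks'][0]['taskArn']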


def launch_step_functions(message: dict, run_id: str) -> str:
    """Launch Step Functions execution (for complex workflows)."""
    response = sf_client.start_execution(
        stateMachineArn=STEP_FUNCTIONS_ARN,
        name=f"backtest-{run_id}",
        input=json.dumps(message)
    )
    return response['executionArn']


def is_already_running(run_id: str) -> bool:
    response = table.get_item(Key={'run_id': run_id})
    return response.get('Item', {}).get('status') == 'RUNNING'


def store_execution_arn(run_id: str, execution_arn: str) -> None:
    table.update_item(
        Key={'run_id': run_id},
        UpdateExpression='SET execution_arn = :arn, updated_at = :t',
        ExpressionAttributeValues={
            ':arn': execution_arn,
            ':t': datetime.utcnow().isoformat(),
        },
    )

Data Collection Proxy Lambda

# lambda/data_collection_proxy/index.py
import json
import httpx
import os

DATA_COLLECTION_URL = os.environ.get(
    'DATA_COLLECTION_URL',
    'http://data-collection.tradai.local:8002'
)

def handler(event, context):
    """Proxy requests to Data Collection Service (VPC internal)."""
    operation = event.get('operation')

    with httpx.Client(timeout=30.0) as client:
        if operation == 'check-freshness':
            response = client.post(
                f"{DATA_COLLECTION_URL}/data/freshness",
                json={"symbols": event['symbols'], "timeframe": event['timeframe']}
            )
        elif operation == 'validate':
            response = client.post(
                f"{DATA_COLLECTION_URL}/data/validate",
                json={"symbols": event['symbols']}
            )
        else:
            raise ValueError(f"Unknown operation: {operation}")

        response.raise_for_status()
        return response.json()
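To make the expected event shape explicit, the sketch below mirrors the two branches of the proxy handler as a pure function that maps an event to a request path and JSON body. The function name `build_request` and the sample symbol are illustrative assumptions; the operations, paths, and payload keys are taken from the handler above.

```python
def build_request(event: dict) -> tuple[str, dict]:
    """Map a proxy event to (path, JSON body), mirroring the handler's branches."""
    operation = event.get("operation")
    if operation == "check-freshness":
        return "/data/freshness", {
            "symbols": event["symbols"],
            "timeframe": event["timeframe"],
        }
    if operation == "validate":
        return "/data/validate", {"symbols": event["symbols"]}
    raise ValueError(f"Unknown operation: {operation}")


# Example event, e.g. as sent by a workflow step (symbol is a made-up sample)
freshness_event = {
    "operation": "check-freshness",
    "symbols": ["BTC/USDT"],
    "timeframe": "1h",
}
path, body = build_request(freshness_event)
print(path, body)
```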

7. Circuit Breaker Implementation

# libs/common/src/circuit_breaker.py
from enum import Enum
from datetime import datetime
from typing import Callable
import functools

class CircuitState(Enum):
    CLOSED = "closed"       # Normal
    OPEN = "open"           # Failing
    HALF_OPEN = "half_open" # Testing

class CircuitBreakerOpenError(Exception):
    pass

class CircuitBreaker:
    """
    Circuit breaker for resilient service calls.

    Usage:
        cb = CircuitBreaker(failure_threshold=5, recovery_timeout=30)

        @cb
        async def call_external_service():
            ...
    """
    def __init__(self, failure_threshold=5, recovery_timeout=30):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.state = CircuitState.CLOSED
        self.failures = 0
        self.last_failure_time = None

    def __call__(self, func: Callable):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            self._check_state()
            try:
                result = await func(*args, **kwargs)
                self._on_success()
                return result
            except Exception:
                self._on_failure()
                raise
        return wrapper

    def _check_state(self):
        if self.state == CircuitState.OPEN:
            if self._should_attempt_reset():
                self.state = CircuitState.HALF_OPEN
            else:
                raise CircuitBreakerOpenError("Circuit is open")

    def _should_attempt_reset(self) -> bool:
        if not self.last_failure_time:
            return True
        elapsed = (datetime.utcnow() - self.last_failure_time).total_seconds()
        return elapsed >= self.recovery_timeout

    def _on_success(self):
        self.failures = 0
        self.state = CircuitState.CLOSED

    def _on_failure(self):
        self.failures += 1
        self.last_failure_time = datetime.utcnow()
        if self.failures >= self.failure_threshold:
            self.state = CircuitState.OPEN
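The state transitions are easier to follow in a small simulation. The sketch below uses a condensed copy of the breaker logic with one addition that is not in the original class: an injectable `clock`, so the recovery timeout can be exercised without sleeping. `DemoBreaker` and the two coroutines are illustrative names only.

```python
import asyncio
from enum import Enum


class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class CircuitBreakerOpenError(Exception):
    pass


class DemoBreaker:
    """Condensed breaker; `clock` is injected so the demo needs no real waiting."""

    def __init__(self, failure_threshold, recovery_timeout, clock):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.clock = clock
        self.state = CircuitState.CLOSED
        self.failures = 0
        self.last_failure_time = None

    async def call(self, func):
        if self.state == CircuitState.OPEN:
            if self.clock() - self.last_failure_time >= self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN  # allow one trial call
            else:
                raise CircuitBreakerOpenError("Circuit is open")
        try:
            result = await func()
        except Exception:
            self.failures += 1
            self.last_failure_time = self.clock()
            if self.failures >= self.failure_threshold:
                self.state = CircuitState.OPEN
            raise
        self.failures = 0
        self.state = CircuitState.CLOSED
        return result


async def demo():
    now = [0.0]
    cb = DemoBreaker(failure_threshold=2, recovery_timeout=30, clock=lambda: now[0])
    observed = []

    async def failing():
        raise RuntimeError("downstream unavailable")

    async def healthy():
        return "ok"

    for _ in range(2):
        try:
            await cb.call(failing)
        except RuntimeError:
            pass
    observed.append(cb.state)  # threshold reached, circuit opens

    try:
        await cb.call(healthy)
    except CircuitBreakerOpenError:
        observed.append("rejected")  # fails fast, healthy() is never invoked

    now[0] = 31.0  # recovery timeout has elapsed
    await cb.call(healthy)  # half-open trial call succeeds
    observed.append(cb.state)
    return observed


states = asyncio.run(demo())
print(states)
```

Note that a single failure during the half-open trial reopens the circuit, because the failure counter is only reset on success.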

8. Service Cost Summary

| Service | Type | Monthly Cost |
|---|---|---|
| Backend API | Long-running | $14.60 |
| Data Collection | Long-running | $7.30 |
| MLflow | Long-running | $14.60 |
| Strategy Tasks (Spot) | On-demand | $1.92 |
| Lambda Functions | Serverless | $0.00 |
| TOTAL | | $38.42 |
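The total can be checked with a few lines of arithmetic. The table does not state its pricing inputs, so the hourly rates below are only what the monthly figures imply under an assumed 730-hour month; they are not quoted AWS prices.

```python
HOURS_PER_MONTH = 730  # assumption: 24 h x 365 d / 12

monthly_costs = {
    "Backend API": 14.60,
    "Data Collection": 7.30,
    "MLflow": 14.60,
    "Strategy Tasks (Spot)": 1.92,
    "Lambda Functions": 0.00,
}
long_running = ["Backend API", "Data Collection", "MLflow"]

# Sum of all rows, rounded to cents
total = round(sum(monthly_costs.values()), 2)

# Hourly rate implied by each always-on service's monthly figure
implied_hourly = {
    name: round(monthly_costs[name] / HOURS_PER_MONTH, 4) for name in long_running
}

print(f"Total: ${total:.2f}")
print(implied_hourly)
```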

9. Common Service Package Structure

All TradAI services follow a standardized package structure to ensure consistency, maintainability, and ease of local development.

9.1 Standard Service Layout

libs/tradai-{service}/
├── src/tradai/{service}/
│   ├── __init__.py           # Package exports
│   ├── __main__.py           # CLI entry point: python -m tradai.{service}
│   ├── app.py                # FastAPI application factory
│   ├── cli.py                # Typer CLI commands
│   ├── config.py             # Pydantic settings (extends tradai.common.Settings)
│   │
│   ├── api/                  # FastAPI routes (presentation layer)
│   │   ├── __init__.py
│   │   ├── routes.py         # Route definitions
│   │   ├── deps.py           # Dependency injection
│   │   └── schemas.py        # Request/Response Pydantic models
│   │
│   ├── core/                 # Business logic (domain layer)
│   │   ├── __init__.py
│   │   ├── entities.py       # Domain entities
│   │   ├── services.py       # Business logic services
│   │   └── repositories.py   # Repository interfaces
│   │
│   └── infrastructure/       # External integrations (infrastructure layer)
│       ├── __init__.py
│       ├── repositories/     # Repository implementations
│       │   └── {adapter}.py
│       └── adapters/         # External service adapters
│           └── {adapter}.py
├── tests/
│   ├── __init__.py
│   ├── conftest.py           # Pytest fixtures
│   ├── test_api/             # API tests
│   ├── test_core/            # Unit tests
│   └── test_infrastructure/  # Integration tests
├── Dockerfile                # Service container
├── pyproject.toml            # Dependencies and config
└── README.md                 # Service documentation

9.2 Base Classes from tradai-common

All services inherit from common base classes:

# libs/tradai-common/src/tradai/common/
# Interface summary only: method bodies are omitted.
import logging
from typing import Self  # Python 3.11+

from omegaconf import DictConfig
from pydantic_settings import BaseSettings


# Logger mixin (listed first because BaseService inherits from it)
class LoggerMixin:
    """Provides structured JSON logging."""

    @property
    def logger(self) -> logging.Logger:
        """Get configured logger for this class."""


# Base settings class
class Settings(BaseSettings):
    """Pydantic settings with Hydra integration."""

    @classmethod
    def from_hydra_cfg(cls, cfg: DictConfig, config_name: str) -> Self:
        """Load settings from Hydra config."""


# Base service class
class BaseService(LoggerMixin):
    """Base class with logging and Hydra config support."""
    settings_class: type[Settings]

    @classmethod
    def from_hydra_cfg(cls, cfg: DictConfig, config_name: str) -> Self:
        """Create service from Hydra configuration."""

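The listing above shows only the `LoggerMixin` interface. As an illustration of how such a mixin could work, here is a minimal sketch using only the standard library. The `JsonFormatter` class, the chosen JSON fields, and `DemoService` are assumptions for the example and may differ from the real tradai-common implementation.

```python
import json
import logging


class JsonFormatter(logging.Formatter):
    """Render each log record as a single JSON object."""

    def format(self, record: logging.LogRecord) -> str:
        return json.dumps(
            {
                "level": record.levelname,
                "logger": record.name,
                "message": record.getMessage(),
            }
        )


class LoggerMixin:
    """Give every subclass a logger named after its module and class."""

    @property
    def logger(self) -> logging.Logger:
        cls = type(self)
        logger = logging.getLogger(f"{cls.__module__}.{cls.__name__}")
        if not logger.handlers:  # configure once per logger name
            handler = logging.StreamHandler()
            handler.setFormatter(JsonFormatter())
            logger.addHandler(handler)
            logger.setLevel(logging.INFO)
        return logger


class DemoService(LoggerMixin):
    def run(self) -> str:
        self.logger.info("service started")
        return self.logger.name


logger_name = DemoService().run()
```

Because `logging.getLogger` caches loggers by name, repeated property access returns the same configured logger without adding duplicate handlers.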
9.3 Service Template (cookiecutter-tradai-service)

New services can be scaffolded using the cookiecutter template:

# Create new service
uv run cookiecutter cookiecutter-tradai-service/

# Prompts:
#   service_name: MyService
#   service_slug: my_service
#   port: 8004
#   description: My new TradAI service
#   with_database: [yes/no]
#   with_sqs: [yes/no]

Template Structure:

cookiecutter-tradai-service/
├── cookiecutter.json
├── hooks/
│   ├── pre_gen_project.py    # Validation
│   └── post_gen_project.py   # Post-generation setup
└── {{cookiecutter.service_slug}}/
    ├── src/tradai/{{cookiecutter.service_slug}}/
    │   ├── __init__.py
    │   ├── __main__.py
    │   ├── app.py
    │   ├── cli.py
    │   ├── config.py
    │   ├── api/
    │   │   ├── __init__.py
    │   │   ├── routes.py
    │   │   ├── deps.py
    │   │   └── schemas.py
    │   ├── core/
    │   │   ├── __init__.py
    │   │   ├── entities.py
    │   │   ├── services.py
    │   │   └── repositories.py
    │   └── infrastructure/
    │       └── __init__.py
    ├── tests/
    │   ├── conftest.py
    │   └── test_api.py
    ├── Dockerfile
    ├── pyproject.toml
    └── README.md

cookiecutter.json:

{
  "service_name": "MyService",
  "service_slug": "{{ cookiecutter.service_name | lower | replace(' ', '_') }}",
  "service_class": "{{ cookiecutter.service_name | replace(' ', '') }}",
  "port": "8004",
  "description": "A TradAI service",
  "author": "TradAI Team",
  "with_database": ["no", "yes"],
  "with_sqs": ["no", "yes"],
  "with_s3": ["yes", "no"]
}
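The two derived fields use the Jinja `lower` and `replace` filters, which behave like the corresponding Python string methods. The sketch below reproduces them in plain Python (the function name `derive_names` is illustrative) to show what the defaults evaluate to.

```python
def derive_names(service_name: str) -> dict:
    """Plain-Python equivalent of the service_slug / service_class templates."""
    return {
        "service_slug": service_name.lower().replace(" ", "_"),
        "service_class": service_name.replace(" ", ""),
    }


spaced = derive_names("My Service")
unspaced = derive_names("MyService")
print(spaced)
print(unspaced)
```

A name without spaces, such as the default `MyService`, yields the slug `myservice`. To get `my_service`, either enter the name with a space or override the slug at the prompt.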

10. CLI Support

All services support both CLI and programmatic usage, enabling local development and testing.

10.1 CLI Entry Point Pattern

Each service implements a standardized CLI using Typer:

# libs/tradai-{service}/src/tradai/{service}/__main__.py
"""Entry point for: python -m tradai.{service}"""

from tradai.{service}.cli import app

if __name__ == "__main__":
    app()

# libs/tradai-{service}/src/tradai/{service}/cli.py
"""CLI commands for {service}."""

import typer
import uvicorn
from typing import Optional

from tradai.{service}.config import Settings
from tradai.{service}.app import create_app

app = typer.Typer(
    name="tradai-{service}",
    help="TradAI {Service} - {description}",
    add_completion=False,
)


@app.command()
def serve(
    host: str = typer.Option("0.0.0.0", "--host", "-h", help="Bind host"),
    # envvar="PORT" lets the container's PORT variable select the bind port
    port: int = typer.Option(8000, "--port", "-p", envvar="PORT", help="Bind port"),
    reload: bool = typer.Option(False, "--reload", "-r", help="Enable hot reload"),
    workers: int = typer.Option(1, "--workers", "-w", help="Number of workers"),
    log_level: str = typer.Option("info", "--log-level", "-l", help="Log level"),
):
    """Start the service HTTP server."""
    typer.echo(f"Starting {service} on {host}:{port}")

    uvicorn.run(
        "tradai.{service}.app:create_app",
        factory=True,
        host=host,
        port=port,
        reload=reload,
        workers=workers,
        log_level=log_level,
    )


@app.command()
def health():
    """Check service health (without starting server)."""
    from tradai.{service}.core.services import HealthService

    settings = Settings()
    health_service = HealthService(settings)
    result = health_service.check()

    if result.healthy:
        typer.echo("✓ Service is healthy")
        raise typer.Exit(0)
    else:
        typer.echo(f"✗ Service unhealthy: {result.message}")
        raise typer.Exit(1)


@app.command()
def config():
    """Show current configuration."""
    settings = Settings()
    typer.echo(settings.model_dump_json(indent=2))


@app.command()
def version():
    """Show version information."""
    from tradai.{service} import __version__
    typer.echo(f"tradai-{service} {__version__}")


if __name__ == "__main__":
    app()

10.2 CLI Usage Examples

# Run service via Python module
python -m tradai.data serve --port 8002
python -m tradai.backend serve --port 8000 --reload

# Run via installed CLI (after pip install)
tradai-data serve --port 8002
tradai-backend serve --port 8000 --reload

# Other CLI commands
python -m tradai.data health          # Check health
python -m tradai.data config          # Show config
python -m tradai.data version         # Show version
python -m tradai.data --help          # Show help

# With environment variables
LOG_LEVEL=debug python -m tradai.data serve
ARCTIC_S3_BUCKET=my-bucket python -m tradai.data serve

10.3 pyproject.toml CLI Scripts

# libs/tradai-{service}/pyproject.toml

[project.scripts]
tradai-{service} = "tradai.{service}.cli:app"

# Example for tradai-data:
# tradai-data = "tradai.data.cli:app"

10.4 Root-Level Unified CLI

A root-level tradai CLI provides unified access to all services:

# src/tradai_cli/__main__.py
"""Unified TradAI CLI."""

import typer

app = typer.Typer(
    name="tradai",
    help="TradAI Platform CLI",
    add_completion=False,
)

# Import and register sub-apps
from tradai.data.cli import app as data_app
from tradai.backend.cli import app as backend_app

app.add_typer(data_app, name="data", help="Data Collection service")
app.add_typer(backend_app, name="backend", help="Backend API service")


@app.command()
def up(
    services: list[str] = typer.Argument(None, help="Services to start (default: all)"),
    detach: bool = typer.Option(False, "--detach", "-d", help="Run in background"),
):
    """Start services using Docker Compose."""
    import subprocess

    cmd = ["docker", "compose", "up"]
    if detach:
        cmd.append("-d")
    if services:
        cmd.extend(services)

    subprocess.run(cmd)


@app.command()
def down():
    """Stop all services."""
    import subprocess
    subprocess.run(["docker", "compose", "down"])


@app.command()
def logs(
    service: str = typer.Argument(..., help="Service name"),
    follow: bool = typer.Option(False, "--follow", "-f", help="Follow logs"),
):
    """View service logs."""
    import subprocess

    cmd = ["docker", "compose", "logs"]
    if follow:
        cmd.append("-f")
    cmd.append(service)

    subprocess.run(cmd)


if __name__ == "__main__":
    app()

Usage:

# Unified CLI
tradai up                      # Start all services
tradai up backend data         # Start specific services
tradai down                    # Stop all services
tradai logs backend -f         # Follow backend logs

# Service-specific commands via unified CLI
tradai data serve --port 8002
tradai backend health
tradai data config
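The `up`, `down`, and `logs` commands are thin wrappers around `docker compose`. One way to keep them unit-testable, sketched below under the assumption that the command construction is split from execution, is to build the argument list in pure functions and pass the runner in. The helper names are hypothetical and not part of the CLI above.

```python
import subprocess
from types import SimpleNamespace
from typing import Callable, Optional, Sequence


def build_up_command(
    services: Optional[Sequence[str]] = None, detach: bool = False
) -> list[str]:
    """Build the argument list used by `tradai up`."""
    cmd = ["docker", "compose", "up"]
    if detach:
        cmd.append("-d")
    if services:
        cmd.extend(services)
    return cmd


def build_logs_command(service: str, follow: bool = False) -> list[str]:
    """Build the argument list used by `tradai logs`."""
    cmd = ["docker", "compose", "logs"]
    if follow:
        cmd.append("-f")
    cmd.append(service)
    return cmd


def run_compose(cmd: list[str], runner: Callable = subprocess.run) -> int:
    """Run the command and return its exit code so callers can propagate failures."""
    return runner(cmd).returncode


# Demo with a fake runner, so nothing is executed
up_cmd = build_up_command(["backend", "data"], detach=True)
exit_code = run_compose(up_cmd, runner=lambda cmd: SimpleNamespace(returncode=0))
print(up_cmd, exit_code)
```

Inside a Typer command, the returned code could be passed to `raise typer.Exit(code)` so that a failing `docker compose` call is visible to shells and CI.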

11. Docker Support

11.1 Service Dockerfile Template

Each service uses a standardized multi-stage Dockerfile:

# libs/tradai-{service}/Dockerfile

# ============================================
# Stage 1: Builder
# ============================================
FROM python:3.11-slim AS builder

WORKDIR /app

# Install build dependencies
RUN apt-get update && apt-get install -y --no-install-recommends \
    build-essential \
    curl \
    && rm -rf /var/lib/apt/lists/*

# Install uv for fast dependency resolution
RUN curl -LsSf https://astral.sh/uv/install.sh | sh
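# Recent uv installers place the binary in ~/.local/bin; older ones used ~/.cargo/bin
ENV PATH="/root/.local/bin:/root/.cargo/bin:$PATH"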

# Copy dependency files
COPY pyproject.toml .
COPY libs/tradai-common/pyproject.toml libs/tradai-common/
COPY libs/tradai-{service}/pyproject.toml libs/tradai-{service}/

# Install dependencies
RUN uv pip install --system --no-cache \
    -e libs/tradai-common \
    -e libs/tradai-{service}

# ============================================
# Stage 2: Runtime
# ============================================
FROM python:3.11-slim AS runtime

WORKDIR /app

# Install runtime dependencies only
RUN apt-get update && apt-get install -y --no-install-recommends \
    curl \
    && rm -rf /var/lib/apt/lists/*

# Copy installed packages from builder
COPY --from=builder /usr/local/lib/python3.11/site-packages /usr/local/lib/python3.11/site-packages
COPY --from=builder /usr/local/bin /usr/local/bin

# Copy application code
COPY libs/tradai-common/src libs/tradai-common/src
COPY libs/tradai-{service}/src libs/tradai-{service}/src

# Set Python path
ENV PYTHONPATH="/app/libs/tradai-common/src:/app/libs/tradai-{service}/src"
ENV PYTHONUNBUFFERED=1

# Health check
HEALTHCHECK --interval=30s --timeout=5s --start-period=10s --retries=3 \
    CMD curl -f http://localhost:${PORT:-8000}/health || exit 1

# Default port (override via environment)
ENV PORT=8000
EXPOSE ${PORT}

# Run the service
ENTRYPOINT ["python", "-m", "tradai.{service}"]
CMD ["serve", "--host", "0.0.0.0"]

11.2 Docker Compose for Local Development

# docker-compose.yaml (project root)

services:
  # ===========================================
  # Backend API Service
  # ===========================================
  backend:
    build:
      context: .
      dockerfile: libs/tradai-backend/Dockerfile
    container_name: tradai-backend
    ports:
      - "8000:8000"
    environment:
      - PORT=8000
      - LOG_LEVEL=debug
      - DATA_COLLECTION_URL=http://data:8002
      - MLFLOW_TRACKING_URI=http://mlflow:5000
      - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID:-}
      - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY:-}
      - AWS_DEFAULT_REGION=${AWS_DEFAULT_REGION:-us-east-1}
    volumes:
      - ./libs/tradai-common/src:/app/libs/tradai-common/src:ro
      - ./libs/tradai-backend/src:/app/libs/tradai-backend/src:ro
    depends_on:
      - data
      - mlflow
    networks:
      - tradai-network
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 30s
      timeout: 5s
      retries: 3

  # ===========================================
  # Data Collection Service
  # ===========================================
  data:
    build:
      context: .
      dockerfile: libs/tradai-data/Dockerfile
    container_name: tradai-data
    ports:
      - "8002:8002"
    environment:
      - PORT=8002
      - LOG_LEVEL=debug
      - ARCTIC_S3_BUCKET=${ARCTIC_S3_BUCKET:-tradai-arcticdb-dev}
      - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID:-}
      - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY:-}
      - AWS_DEFAULT_REGION=${AWS_DEFAULT_REGION:-us-east-1}
    volumes:
      - ./libs/tradai-common/src:/app/libs/tradai-common/src:ro
      - ./libs/tradai-data/src:/app/libs/tradai-data/src:ro
    networks:
      - tradai-network
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8002/health"]
      interval: 30s
      timeout: 5s
      retries: 3

  # ===========================================
  # MLflow Tracking Server
  # ===========================================
  mlflow:
    image: ghcr.io/mlflow/mlflow:v2.18.0
    container_name: tradai-mlflow
    ports:
      - "5000:5000"
    environment:
      # Four slashes = absolute path /mlflow/mlflow.db (inside the mounted volume)
      - MLFLOW_BACKEND_STORE_URI=sqlite:////mlflow/mlflow.db
      - MLFLOW_DEFAULT_ARTIFACT_ROOT=/mlflow/artifacts
    volumes:
      - mlflow-data:/mlflow
    command: >
      mlflow server
      --host 0.0.0.0
      --port 5000
      --backend-store-uri sqlite:////mlflow/mlflow.db
      --default-artifact-root /mlflow/artifacts
    networks:
      - tradai-network
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:5000/health"]
      interval: 30s
      timeout: 5s
      retries: 3

  # ===========================================
  # PostgreSQL (for local MLflow/testing)
  # ===========================================
  postgres:
    image: postgres:15-alpine
    container_name: tradai-postgres
    ports:
      - "5432:5432"
    environment:
      - POSTGRES_USER=tradai
      - POSTGRES_PASSWORD=tradai_dev
      - POSTGRES_DB=tradai
    volumes:
      - postgres-data:/var/lib/postgresql/data
    networks:
      - tradai-network
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U tradai"]
      interval: 10s
      timeout: 5s
      retries: 5
    profiles:
      - full  # Only start with: docker compose --profile full up

  # ===========================================
  # Redis (for caching - optional)
  # ===========================================
  redis:
    image: redis:7-alpine
    container_name: tradai-redis
    ports:
      - "6379:6379"
    volumes:
      - redis-data:/data
    networks:
      - tradai-network
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 5s
      retries: 5
    profiles:
      - full  # Only start with: docker compose --profile full up

networks:
  tradai-network:
    driver: bridge
    name: tradai-network

volumes:
  mlflow-data:
  postgres-data:
  redis-data:

11.3 Docker Compose Override for Development

# docker-compose.override.yaml (auto-loaded in development)

services:
  backend:
    build:
      target: builder  # Use builder stage with all dev dependencies
    environment:
      - LOG_LEVEL=debug
      - RELOAD=true
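    # The builder stage defines no ENTRYPOINT, so the full command is spelled out
    command: ["python", "-m", "tradai.backend", "serve", "--host", "0.0.0.0", "--reload"]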
    volumes:
      # Mount source for hot reload
      - ./libs/tradai-common/src:/app/libs/tradai-common/src
      - ./libs/tradai-backend/src:/app/libs/tradai-backend/src

  data:
    build:
      target: builder
    environment:
      - LOG_LEVEL=debug
      - RELOAD=true
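    # The builder stage defines no ENTRYPOINT, so the full command is spelled out
    command: ["python", "-m", "tradai.data", "serve", "--host", "0.0.0.0", "--port", "8002", "--reload"]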
    volumes:
      - ./libs/tradai-common/src:/app/libs/tradai-common/src
      - ./libs/tradai-data/src:/app/libs/tradai-data/src

11.4 Local Development Commands

# ===========================================
# Docker Compose Commands
# ===========================================

# Start all services (with hot reload in dev)
docker compose up

# Start specific services
docker compose up backend data

# Start in background
docker compose up -d

# Start with full stack (postgres, redis)
docker compose --profile full up

# Stop all services
docker compose down

# Rebuild and start
docker compose up --build

# View logs
docker compose logs -f backend
docker compose logs -f data

# Execute command in running container
docker compose exec backend python -m tradai.backend health

# ===========================================
# Without Docker (CLI directly)
# ===========================================

# Install dependencies
uv sync

# Run services directly
uv run python -m tradai.data serve --port 8002
uv run python -m tradai.backend serve --port 8000 --reload

# Run with specific config
LOG_LEVEL=debug uv run python -m tradai.data serve

# Run tests
uv run pytest libs/tradai-data/tests/
uv run pytest libs/tradai-backend/tests/

# ===========================================
# Mixed Mode (some Docker, some local)
# ===========================================

# Start only MLflow in Docker
docker compose up mlflow -d

# Run backend locally pointing to Docker MLflow
MLFLOW_TRACKING_URI=http://localhost:5000 uv run python -m tradai.backend serve
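In mixed mode the locally started backend may come up before the MLflow container is ready. A small readiness poll can bridge that gap. The sketch below is one possible helper, not part of the TradAI tooling; the function names, timeouts, and the injectable `sleep` and `clock` parameters are assumptions made so the logic can be tested without a network.

```python
import time
import urllib.request
from typing import Callable


def http_ok(url: str, timeout: float = 2.0) -> bool:
    """Return True if the URL answers with HTTP 200, False on any network error."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            return resp.status == 200
    except OSError:  # URLError, HTTPError, and socket timeouts all derive from OSError
        return False


def wait_until_healthy(
    probe: Callable[[], bool],
    timeout: float = 60.0,
    interval: float = 2.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> bool:
    """Call `probe` until it succeeds or `timeout` seconds have passed."""
    deadline = clock() + timeout
    while True:
        if probe():
            return True
        if clock() >= deadline:
            return False
        sleep(interval)


# Demo with a fake probe that succeeds on the third attempt
_attempts = iter([False, False, True])
became_healthy = wait_until_healthy(lambda: next(_attempts), sleep=lambda s: None)
print(became_healthy)
```

Against the Compose setup above, the probe would be `lambda: http_ok("http://localhost:5000/health")`, which is the same endpoint the MLflow container healthcheck uses.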

12. Application Factory Pattern

12.1 FastAPI App Factory

Each service uses an application factory for flexible configuration:

# libs/tradai-{service}/src/tradai/{service}/app.py
"""FastAPI application factory."""

from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

from tradai.{service}.config import Settings
from tradai.{service}.api.routes import router
from tradai.{service}.api.deps import get_settings


def create_app(settings: Settings | None = None) -> FastAPI:
    """
    Create and configure FastAPI application.

    Args:
        settings: Optional settings override (for testing)

    Returns:
        Configured FastAPI application
    """
    if settings is None:
        settings = Settings()

    @asynccontextmanager
    async def lifespan(app: FastAPI):
        """Application lifespan handler."""
        # Startup
        app.state.settings = settings
        yield
        # Shutdown
        # Cleanup resources here

    app = FastAPI(
        title=f"TradAI {settings.service_name}",
        description=settings.description,
        version=settings.version,
        lifespan=lifespan,
        docs_url="/docs" if settings.debug else None,
        redoc_url="/redoc" if settings.debug else None,
    )

    # Middleware
    app.add_middleware(
        CORSMiddleware,
        allow_origins=settings.cors_origins,
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )

    # Routes
    app.include_router(router)

    # Dependency override for settings
    app.dependency_overrides[get_settings] = lambda: settings

    return app

12.2 Service Configuration

# libs/tradai-{service}/src/tradai/{service}/config.py
"""Service configuration."""

from typing import Literal

from pydantic_settings import SettingsConfigDict
from tradai.common import Settings as BaseSettings


class Settings(BaseSettings):
    """Configuration for {service}."""

    # Pydantic v2 style, consistent with model_dump_json() in cli.py.
    # Each field is populated from the environment variable of the same name
    # (case-insensitive): HOST, PORT, DEBUG, LOG_LEVEL, CORS_ORIGINS.
    model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8")

    # Service identity
    service_name: str = "{service}"
    description: str = "TradAI {Service}"
    version: str = "0.1.0"

    # Server settings
    host: str = "0.0.0.0"
    port: int = 8000
    debug: bool = False
    log_level: Literal["debug", "info", "warning", "error"] = "info"

    # CORS (set CORS_ORIGINS as a JSON list, e.g. '["http://localhost:3000"]')
    cors_origins: list[str] = ["http://localhost:3000", "http://localhost:8000"]

    # Service-specific settings (override in subclass)
    # ...

14. Orphan Job Detection (v9.2)

CRITICAL: Jobs can become orphaned (stuck in RUNNING) if the container crashes before updating status. This section defines the detection and recovery mechanism.

14.1 Problem Statement

Container Crash Scenarios:
├── OOM Kill (memory exhausted)
├── Spot Instance interruption (2-min warning, but may not complete)
├── Network partition (can't reach DynamoDB)
├── Unhandled exception before finally block
└── ECS task killed (deployment, scaling)

Result: Job stays RUNNING in DynamoDB forever

14.2 CloudWatch Alarm for Orphan Detection

# Pulumi/CloudFormation: Orphan Job Detection Alarm
OrphanJobAlarm:
  Type: AWS::CloudWatch::Alarm
  Properties:
    AlarmName: tradai-orphan-jobs
    AlarmDescription: "Jobs stuck in RUNNING state for >2x expected duration"
    MetricName: OrphanedJobs
    Namespace: TradAI/Backtests
    Dimensions:  # must match the dimension the scanner attaches in put_metric_data
      - Name: Environment
        Value: !Ref Environment  # assumes an Environment template parameter
    Statistic: Sum
    Period: 900  # 15 minutes, matching the scanner schedule (one datapoint per run)
    EvaluationPeriods: 1
    Threshold: 1
    ComparisonOperator: GreaterThanOrEqualToThreshold
    TreatMissingData: notBreaching
    AlarmActions:
      - !Ref AlertTopic

14.3 Lambda: Orphan Job Scanner

The scanner Lambda runs every 15 minutes via EventBridge and flags jobs that have been RUNNING longer than MAX_RUNNING_MINUTES (default: 90).

# lambda/orphan_scanner/index.py
import os
import boto3
from datetime import datetime, timedelta

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(os.environ['WORKFLOW_STATE_TABLE'])
cloudwatch = boto3.client('cloudwatch')
sns = boto3.client('sns')

# Configuration
MAX_RUNNING_MINUTES = int(os.environ.get('MAX_RUNNING_MINUTES', '90'))  # 1.5 hours
ALERT_TOPIC_ARN = os.environ.get('ALERT_TOPIC_ARN')


def handler(event, context):
    """Scan for orphaned jobs and emit metrics/alerts."""
    threshold = datetime.utcnow() - timedelta(minutes=MAX_RUNNING_MINUTES)
    threshold_iso = threshold.isoformat()

    # Query for RUNNING jobs older than threshold
    response = table.query(
        IndexName='status-created_at-index',
        KeyConditionExpression='#s = :status AND created_at < :threshold',
        ExpressionAttributeNames={'#s': 'status'},
        ExpressionAttributeValues={
            ':status': 'RUNNING',
            ':threshold': threshold_iso,
        },
    )

    orphaned_jobs = response.get('Items', [])
    orphan_count = len(orphaned_jobs)

    # Emit CloudWatch metric
    cloudwatch.put_metric_data(
        Namespace='TradAI/Backtests',
        MetricData=[{
            'MetricName': 'OrphanedJobs',
            'Value': orphan_count,
            'Unit': 'Count',
            'Dimensions': [
                {'Name': 'Environment', 'Value': os.environ.get('ENVIRONMENT', 'dev')},
            ],
        }],
    )

    # Mark orphaned jobs as FAILED and alert
    for job in orphaned_jobs:
        run_id = job['run_id']
        running_since = job.get('updated_at', job.get('created_at'))

        # Update status to FAILED with orphan reason
        table.update_item(
            Key={'run_id': run_id},
            UpdateExpression='SET #s = :s, #e = :e, updated_at = :t',
            ExpressionAttributeNames={'#s': 'status', '#e': 'error'},
            ExpressionAttributeValues={
                ':s': 'FAILED',
                ':e': f'Orphaned: RUNNING since {running_since}, exceeded {MAX_RUNNING_MINUTES}min threshold',
                ':t': datetime.utcnow().isoformat(),
            },
        )
        print(f"Marked orphaned job as FAILED: {run_id}")

    # Send alert if orphans found
    if orphan_count > 0 and ALERT_TOPIC_ARN:
        sns.publish(
            TopicArn=ALERT_TOPIC_ARN,
            Subject=f'[TradAI] {orphan_count} Orphaned Jobs Detected',
            Message=f'''
Orphan Job Scanner detected {orphan_count} job(s) stuck in RUNNING state.

Jobs have been automatically marked as FAILED.

Job IDs:
{chr(10).join(f"- {job['run_id']}" for job in orphaned_jobs)}

Threshold: {MAX_RUNNING_MINUTES} minutes
Scan Time: {datetime.utcnow().isoformat()}

Action Required: Investigate why containers are not completing status updates.
Possible causes:
- Container OOM kills
- Spot instance interruptions
- Network issues preventing DynamoDB access
- Unhandled exceptions in entrypoint
''',
        )

    return {
        'orphan_count': orphan_count,
        'jobs_marked_failed': [job['run_id'] for job in orphaned_jobs],
    }
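The orphan check comes down to a string comparison between `created_at` and an ISO 8601 threshold. The sketch below reproduces that rule on in-memory records so it can be tested without DynamoDB. `find_orphans` and the sample jobs are illustrative; the comparison is only valid if all timestamps are written in the same format and timezone, as with the naive UTC `isoformat()` values used above.

```python
from datetime import datetime, timedelta


def find_orphans(jobs: list[dict], now: datetime, max_running_minutes: int = 90) -> list[dict]:
    """Return RUNNING jobs created before now - max_running_minutes."""
    threshold_iso = (now - timedelta(minutes=max_running_minutes)).isoformat()
    return [
        job
        for job in jobs
        # ISO 8601 strings in one format sort chronologically, so '<' works
        if job["status"] == "RUNNING" and job["created_at"] < threshold_iso
    ]


now = datetime(2025, 12, 9, 12, 0, 0)
jobs = [
    {"run_id": "a", "status": "RUNNING", "created_at": "2025-12-09T10:00:00"},    # 120 min old
    {"run_id": "b", "status": "RUNNING", "created_at": "2025-12-09T11:30:00"},    # 30 min old
    {"run_id": "c", "status": "COMPLETED", "created_at": "2025-12-09T08:00:00"},  # finished
]
orphans = [job["run_id"] for job in find_orphans(jobs, now)]
print(orphans)
```

Only job `a` exceeds the 90-minute threshold while still RUNNING. Job `b` is too recent and job `c` has already completed.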

14.4 EventBridge Rule

OrphanScannerSchedule:
  Type: AWS::Events::Rule
  Properties:
    Name: tradai-orphan-scanner-schedule
    Description: "Trigger orphan job scanner every 15 minutes"
    ScheduleExpression: "rate(15 minutes)"
    State: ENABLED
    Targets:
      - Id: OrphanScannerLambda
        Arn: !GetAtt OrphanScannerLambda.Arn

14.5 Lambda Configuration

| Parameter | Value |
|---|---|
| Function Name | tradai-orphan-scanner |
| Runtime | Python 3.11 |
| Memory | 128 MB |
| Timeout | 60s |
| VPC | Yes (needs DynamoDB access) |
| Trigger | EventBridge (every 15 min) |
| Cost | ~$0.05/month |

14.6 DynamoDB GSI Requirement

The orphan scanner requires a Global Secondary Index for efficient querying:

# Already defined in 10-CANONICAL-CONFIG.md
GlobalSecondaryIndex:
  IndexName: status-created_at-index
  KeySchema:
    - AttributeName: status
      KeyType: HASH
    - AttributeName: created_at
      KeyType: RANGE
  Projection:
    ProjectionType: ALL
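A single DynamoDB `query` call returns at most one page of results and signals further pages through `LastEvaluatedKey`. If many jobs can be RUNNING at once, the scanner's query against this index would need to follow those pages. The sketch below shows the loop. `query_all` is a hypothetical helper, and `FakeTable` is a test double standing in for a boto3 `Table` resource.

```python
def query_all(table, **query_kwargs) -> list[dict]:
    """Follow LastEvaluatedKey until every page of a query has been read."""
    items = []
    while True:
        response = table.query(**query_kwargs)
        items.extend(response.get("Items", []))
        last_key = response.get("LastEvaluatedKey")
        if not last_key:
            return items
        # Resume the same query where the previous page stopped
        query_kwargs["ExclusiveStartKey"] = last_key


class FakeTable:
    """Test double that serves pre-built pages like a paginated query would."""

    def __init__(self, pages):
        self.pages = pages
        self.calls = []

    def query(self, **kwargs):
        self.calls.append(dict(kwargs))
        index = kwargs.get("ExclusiveStartKey", {}).get("page", 0)
        page = {"Items": self.pages[index]}
        if index + 1 < len(self.pages):
            page["LastEvaluatedKey"] = {"page": index + 1}
        return page


fake = FakeTable([[{"run_id": "a"}], [{"run_id": "b"}]])
all_items = query_all(fake, IndexName="status-created_at-index")
print(all_items)
```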

Next Steps

  1. Review 06-STEP-FUNCTIONS.md for workflow orchestration
  2. Review 09-PULUMI-CODE.md for infrastructure code
  3. Use cookiecutter-tradai-service to scaffold new services
  4. Run locally with docker compose up or python -m tradai.{service} serve