TradAI Final Architecture - Services
Version: 9.2.1 | Date: 2025-12-09
Service Overview
| Service | Type | Resources | Port | Purpose |
|---|---|---|---|---|
| Backend API | ECS Fargate (Long-running) | 0.5 vCPU, 1GB | 8000 | API endpoints, orchestration |
| Data Collection | ECS Fargate (Long-running) | 0.25 vCPU, 512MB | 8002 | Data freshness, metadata |
| MLflow | ECS Fargate (Long-running) | 0.5 vCPU, 1GB | 5000 | Experiment tracking, model registry |
| Strategy Task | ECS Fargate (On-demand) | 0.5 vCPU, 1GB | - | Config preparation |
| Strategy Container (Backtest) | ECS Fargate (On-demand) | 1 vCPU, 2GB | - | Backtest execution |
| Strategy Container (Live) | ECS Fargate (Long-running) | 0.5 vCPU, 1GB | 8080 | Live/Dry-run trading (v9.1) |
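Fargate accepts only specific CPU/memory pairings, so every size in the Resources column has to be one of them. The sketch below is a quick sanity check: it encodes the valid memory sizes for the three CPU sizes used in this table (0.25, 0.5 and 1 vCPU) and validates each row. The SERVICES mapping is transcribed from the table; the helper names are hypothetical, and pairings for larger CPU sizes are deliberately left out.

```python
# Valid Fargate memory sizes (MiB) for the CPU units used in this document.
# 256 = 0.25 vCPU, 512 = 0.5 vCPU, 1024 = 1 vCPU.
VALID_FARGATE_MEMORY: dict[int, set[int]] = {
    256: {512, 1024, 2048},
    512: {1024, 2048, 3072, 4096},
    1024: set(range(2048, 8192 + 1, 1024)),  # 2-8 GB in 1 GB steps
}

# (cpu_units, memory_mib) per service, transcribed from the overview table
SERVICES: dict[str, tuple[int, int]] = {
    "backend-api": (512, 1024),
    "data-collection": (256, 512),
    "mlflow": (512, 1024),
    "strategy-task": (512, 1024),
    "strategy-backtest": (1024, 2048),
    "strategy-live": (512, 1024),
}


def is_valid_fargate_size(cpu: int, memory: int) -> bool:
    """Return True if (cpu, memory) is an accepted Fargate task size."""
    return memory in VALID_FARGATE_MEMORY.get(cpu, set())


def invalid_services(services: dict[str, tuple[int, int]]) -> list[str]:
    """List the services whose CPU/memory pairing Fargate would reject."""
    return [
        name
        for name, (cpu, mem) in services.items()
        if not is_valid_fargate_size(cpu, mem)
    ]
```

Running invalid_services(SERVICES) in CI would catch a resize to an unsupported pairing before the task definition is registered.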
1. Backend API Service
Task Definition Specification
```
Family: tradai-backend-api
NetworkMode: awsvpc
Cpu: 512 (0.5 vCPU)
Memory: 1024 (1 GB)
Container: backend-api
  Image: {account}.dkr.ecr.us-east-1.amazonaws.com/tradai-backend:latest
  Port: 8000
  Environment Variables:
    - SERVICE_NAME: backend-api
    - LOG_LEVEL: INFO
    - DATA_COLLECTION_URL: http://data-collection.tradai.local:8002
    - MLFLOW_TRACKING_URI: http://mlflow.tradai.local:5000
    - BACKTEST_QUEUE_URL: (from SQS)
    - WORKFLOW_STATE_TABLE: tradai-workflow-state
  Secrets (from Secrets Manager):
    - MLFLOW_TRACKING_USERNAME
    - MLFLOW_TRACKING_PASSWORD
  Health Check:
    Command: curl -f http://localhost:8000/health || exit 1
    Interval: 30s
    Timeout: 5s
    Retries: 3
```
Application Code Structure
```
services/api_gateway/
├── api/
│   ├── __init__.py
│   ├── main.py               # FastAPI app
│   ├── routes/
│   │   ├── strategies.py     # GET /strategies/*
│   │   ├── backtest.py       # POST /backtest, GET /backtest/{run_id}/status
│   │   ├── data.py           # GET /data/*
│   │   └── health.py         # GET /health, GET /ready
│   ├── schemas/
│   │   ├── backtest.py       # Pydantic models
│   │   └── strategy.py
│   └── middleware/
│       ├── logging.py        # Request/response logging
│       └── security.py       # Security headers
├── core/
│   ├── config.py             # Settings management
│   ├── services/
│   │   ├── mlflow_client.py  # MLflow integration
│   │   ├── data_client.py    # Data Collection client
│   │   ├── sqs_client.py     # SQS integration
│   │   └── dynamodb_client.py
│   └── circuit_breaker.py    # Resilience pattern
└── Dockerfile
```
Key Endpoint: Trigger Backtest
```python
# services/api_gateway/api/routes/backtest.py
import uuid

from fastapi import APIRouter, Depends, HTTPException

# Project-internal imports; module paths follow the code structure above.
from api.schemas.backtest import BacktestRequest, BacktestResponse, BacktestStatus
from core.config import settings
from core.services.dynamodb_client import DynamoDBClient
from core.services.sqs_client import SQSClient

router = APIRouter(prefix="/backtest", tags=["backtest"])


@router.post("", response_model=BacktestResponse)
async def trigger_backtest(
    request: BacktestRequest,
    sqs: SQSClient = Depends(),
    dynamo: DynamoDBClient = Depends(),
):
    """
    Trigger a new backtest workflow.

    Returns immediately with a run_id for status polling.
    """
    run_id = str(uuid.uuid4())

    # Register in DynamoDB first, so the status endpoint can find the run
    # as soon as the client starts polling.
    await dynamo.create_workflow(
        run_id=run_id,
        workflow_type="backtest",
        status="QUEUED",
        metadata={
            "strategy_name": request.strategy_name,
            "strategy_version": request.strategy_version,
            "experiment_name": request.experiment_name,
        },
    )

    # Send to SQS for async processing. Group and deduplication IDs imply a
    # FIFO queue: ordered per strategy, de-duplicated per run.
    await sqs.send_message(
        queue_url=settings.BACKTEST_QUEUE_URL,
        message={"run_id": run_id, **request.model_dump()},
        message_group_id=request.strategy_name,
        deduplication_id=run_id,
    )

    return BacktestResponse(
        run_id=run_id,
        status="QUEUED",
        status_url=f"/backtest/{run_id}/status",
    )


@router.get("/{run_id}/status", response_model=BacktestStatus)
async def get_backtest_status(
    run_id: str,
    dynamo: DynamoDBClient = Depends(),
):
    """Get backtest status and results."""
    workflow = await dynamo.get_workflow(run_id)
    if not workflow:
        raise HTTPException(status_code=404, detail="Backtest not found")

    response = BacktestStatus(
        run_id=run_id,
        status=workflow["status"],
        created_at=workflow["created_at"],
        updated_at=workflow["updated_at"],
    )
    if workflow["status"] == "COMPLETED":
        response.result = workflow.get("result")
        # NOTE: run_id is the workflow ID, not an MLflow experiment ID; an exact
        # MLflow UI link would require looking up the experiment first.
        response.mlflow_url = f"{settings.MLFLOW_TRACKING_URI}/experiments/{run_id}"
    return response
```
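Clients are expected to poll status_url until the run reaches a terminal state. A minimal client-side sketch, assuming the statuses written elsewhere in this document (QUEUED, RUNNING, COMPLETED, FAILED). fetch_status is a hypothetical callable standing in for an HTTP GET of /backtest/{run_id}/status; injecting it (and the sleep/clock functions) lets the loop be tested without a server. The interval and timeout defaults are illustrative.

```python
import time
from typing import Callable

TERMINAL_STATUSES = {"COMPLETED", "FAILED"}


def poll_backtest(
    fetch_status: Callable[[], dict],
    interval_s: float = 5.0,
    timeout_s: float = 3600.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> dict:
    """Poll a backtest status endpoint until it reaches a terminal state.

    fetch_status: hypothetical callable returning the parsed JSON body of
        GET /backtest/{run_id}/status, e.g. {"status": "RUNNING", ...}.
    Raises TimeoutError if no terminal state is seen within timeout_s.
    """
    deadline = clock() + timeout_s
    while True:
        body = fetch_status()
        if body["status"] in TERMINAL_STATUSES:
            return body
        if clock() >= deadline:
            raise TimeoutError(f"backtest still {body['status']} after {timeout_s}s")
        sleep(interval_s)
```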
Health Check Endpoints
```python
# services/api_gateway/api/routes/health.py
import httpx
from fastapi import APIRouter
from fastapi.responses import JSONResponse

from core.config import settings
from core.services.dynamodb_client import DynamoDBClient

router = APIRouter(tags=["health"])


@router.get("/health")
async def health_check():
    """Basic liveness check (used by the ECS container health check)."""
    return {"status": "healthy", "service": "backend-api"}


async def _http_ok(url: str) -> bool:
    """Return True if GET url answers 200 within 5 seconds."""
    try:
        async with httpx.AsyncClient(timeout=5.0) as client:
            resp = await client.get(url)
            return resp.status_code == 200
    except httpx.HTTPError:  # connection errors, timeouts, protocol errors
        return False


@router.get("/ready")
async def readiness_check():
    """Readiness check: are the downstream dependencies available?"""
    checks = {
        "mlflow": await _http_ok(f"{settings.MLFLOW_TRACKING_URI}/health"),
        "data_collection": await _http_ok(f"{settings.DATA_COLLECTION_URL}/health"),
        "dynamodb": False,
    }
    try:
        await DynamoDBClient().health_check()
        checks["dynamodb"] = True
    except Exception:
        pass  # any failure means "not ready"; the probe itself must never crash

    all_healthy = all(checks.values())
    return JSONResponse(
        content={"status": "ready" if all_healthy else "not_ready", "checks": checks},
        status_code=200 if all_healthy else 503,
    )
```
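The readiness check probes its dependencies one after another, so several slow dependencies can each add up to 5 s to the response. One possible alternative, sketched with the standard library only, runs the probes concurrently with asyncio.gather and bounds each with asyncio.wait_for. The probe callables are hypothetical stand-ins for the MLflow, Data Collection and DynamoDB checks, and the (status, body) return shape is an assumption chosen for testability.

```python
import asyncio
from typing import Awaitable, Callable

Probe = Callable[[], Awaitable[bool]]


async def _run_probe(probe: Probe, timeout_s: float) -> bool:
    """Run one probe; any exception or timeout counts as 'not ready'."""
    try:
        return bool(await asyncio.wait_for(probe(), timeout=timeout_s))
    except Exception:
        return False


async def readiness(probes: dict[str, Probe], timeout_s: float = 5.0) -> tuple[int, dict]:
    """Run all probes concurrently and return (http_status, body)."""
    names = list(probes)
    results = await asyncio.gather(*(_run_probe(probes[n], timeout_s) for n in names))
    checks = dict(zip(names, results))
    ready = all(checks.values())
    body = {"status": "ready" if ready else "not_ready", "checks": checks}
    return (200 if ready else 503), body
```

With concurrent probes the worst-case latency is one timeout rather than the sum of all of them.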
2. Data Collection Service
Task Definition Specification
```
Family: tradai-data-collection
Cpu: 256 (0.25 vCPU)
Memory: 512 (512 MB)
Container: data-collection
  Image: {account}.dkr.ecr.us-east-1.amazonaws.com/tradai-data-collection:latest
  Port: 8002
  Environment Variables:
    - SERVICE_NAME: data-collection
    - ARCTIC_S3_BUCKET: tradai-arcticdb
    - ARCTIC_LIBRARY: futures
  Health Check:
    Command: curl -f http://localhost:8002/health || exit 1
```
Data Freshness Check
```python
# services/data_collection/api/routes/data.py
import arcticdb as adb
import pandas as pd
from fastapi import APIRouter
from pydantic import BaseModel

from core.config import settings  # service settings; module path assumed

router = APIRouter(prefix="/data", tags=["data"])

FRESHNESS_THRESHOLD_HOURS = 24


class FreshnessRequest(BaseModel):
    symbols: list[str]
    timeframe: str


class FreshnessResponse(BaseModel):
    results: dict[str, dict]
    all_fresh: bool


# Plain `def`: ArcticDB calls block, so FastAPI runs this in its threadpool.
@router.post("/freshness", response_model=FreshnessResponse)
def check_data_freshness(request: FreshnessRequest):
    """
    Check whether data is fresh enough for backtesting.

    Fresh = last update within the past 24 hours.
    """
    # ArcticDB S3 URIs have the form s3://<endpoint>:<bucket>?aws_auth=true;
    # us-east-1 matches the region of the ECR images above.
    arctic = adb.Arctic(
        f"s3://s3.us-east-1.amazonaws.com:{settings.ARCTIC_S3_BUCKET}?aws_auth=true"
    )
    library = arctic.get_library(settings.ARCTIC_LIBRARY)
    now = pd.Timestamp.now(tz="UTC")

    results: dict[str, dict] = {}
    all_fresh = True
    for symbol in request.symbols:
        symbol_key = f"{symbol}_{request.timeframe}"
        try:
            # get_description() reports row_count and last_update_time per symbol
            desc = library.get_description(symbol_key)
            last_update = pd.Timestamp(desc.last_update_time)
            if last_update.tzinfo is None:
                last_update = last_update.tz_localize("UTC")
            age_hours = (now - last_update).total_seconds() / 3600
            is_fresh = age_hours < FRESHNESS_THRESHOLD_HOURS
            results[symbol] = {
                "last_update": last_update.isoformat(),
                "age_hours": round(age_hours, 2),
                "is_fresh": is_fresh,
                "row_count": desc.row_count,
            }
        except Exception as e:  # missing symbol, storage error, ...
            results[symbol] = {"error": str(e), "is_fresh": False}
            is_fresh = False
        all_fresh = all_fresh and is_fresh

    return FreshnessResponse(results=results, all_fresh=all_fresh)
```
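The freshness rule reduces to a pure function of the last update time and the 24-hour threshold, which makes it easy to unit-test without ArcticDB. A minimal sketch; the function names are hypothetical, and both timestamps are assumed to be timezone-aware UTC. Note that the comparison is strict, so data exactly 24 hours old counts as stale.

```python
from datetime import datetime, timedelta, timezone

FRESHNESS_THRESHOLD = timedelta(hours=24)


def freshness(last_update: datetime, now: datetime) -> dict:
    """Describe one symbol's freshness; both datetimes must be tz-aware."""
    age = now - last_update
    return {
        "last_update": last_update.isoformat(),
        "age_hours": round(age.total_seconds() / 3600, 2),
        "is_fresh": age < FRESHNESS_THRESHOLD,
    }


def all_fresh(results: dict[str, dict]) -> bool:
    """True only if every symbol is fresh; error entries count as stale."""
    return all(r.get("is_fresh", False) for r in results.values())
```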
3. MLflow Service
Task Definition Specification
```
Family: tradai-mlflow
Cpu: 512 (0.5 vCPU)
Memory: 1024 (1 GB)
Container: mlflow
  Image: {account}.dkr.ecr.us-east-1.amazonaws.com/tradai-mlflow:latest
  Port: 5000
  # ECS runs the command without a shell, so it is wrapped in "sh -c" for the
  # ${DB_*} secrets to be expanded at container start (no credentials inline).
  Command: ["sh", "-c", "mlflow server --host=0.0.0.0 --port=5000 --backend-store-uri=postgresql://${DB_USER}:${DB_PASSWORD}@${DB_HOST}:5432/mlflow --default-artifact-root=s3://tradai-results/mlflow-artifacts"]
  Secrets (from Secrets Manager):
    - DB_HOST, DB_USER, DB_PASSWORD
```
4. Strategy Service Task (On-Demand)
Task Definition Specification
```
Family: tradai-strategy-service
Cpu: 512 (0.5 vCPU)
Memory: 1024 (1 GB)
Container: strategy-service
  Image: {account}.dkr.ecr.us-east-1.amazonaws.com/tradai-strategy-service:latest
  # Command set by Step Functions via environment variables
  Environment Variables:
    - COMMAND: (set by Step Functions)
    - INPUT_METHOD: s3
    - INPUT_S3_PATH: (set by Step Functions)
    - S3_CONFIG_BUCKET: tradai-configs
    - MLFLOW_TRACKING_URI: http://mlflow.tradai.local:5000
```
Prepare Config Command
```python
# services/strategy_service/commands/prepare_config.py
import os

import boto3
import mlflow
import yaml
from omegaconf import OmegaConf


def prepare_config(input_data: dict) -> dict:
    """
    Prepare configuration for a backtest.

    1. Fetch strategy metadata from MLflow
    2. Download the base config from S3
    3. Merge it with the request overrides
    4. Upload the merged config to S3
    """
    s3 = boto3.client("s3")
    config_bucket = os.environ["S3_CONFIG_BUCKET"]

    # 1. Look up the registered strategy version in MLflow
    mlflow.set_tracking_uri(os.environ["MLFLOW_TRACKING_URI"])
    client = mlflow.tracking.MlflowClient()
    model_version = client.get_model_version(
        name=input_data["strategy_name"],
        version=input_data["strategy_version"],
    )
    ecr_url = model_version.tags.get("ecr_url")
    base_config_path = model_version.tags.get("config_path")
    if not ecr_url or not base_config_path:
        raise ValueError(
            f"{input_data['strategy_name']} v{input_data['strategy_version']} "
            "is missing the 'ecr_url' or 'config_path' tag"
        )

    # 2. Download the base config (s3://bucket/key)
    bucket, key = base_config_path.removeprefix("s3://").split("/", 1)
    response = s3.get_object(Bucket=bucket, Key=key)
    base_config = OmegaConf.create(yaml.safe_load(response["Body"].read()))

    # 3. Merge with overrides (override values win)
    overrides = OmegaConf.create(input_data.get("config_overrides", {}))
    merged_config = OmegaConf.merge(base_config, overrides)
    merged_config.run_id = input_data["run_id"]
    merged_config.experiment_name = input_data["experiment_name"]

    # 4. Upload the merged config under a per-run temp prefix
    temp_key = f"temp/{input_data['run_id']}/config.yaml"
    s3.put_object(
        Bucket=config_bucket,
        Key=temp_key,
        Body=OmegaConf.to_yaml(merged_config).encode("utf-8"),
    )

    return {
        "ecr_url": ecr_url,
        "config_path": f"s3://{config_bucket}/{temp_key}",
        "strategy_name": input_data["strategy_name"],
        "strategy_version": input_data["strategy_version"],
    }
```
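prepare_config relies on two small operations: splitting an s3://bucket/key path and layering overrides over a base config. A stdlib-only sketch of both, useful for unit tests that avoid S3 and OmegaConf. The recursive merge only approximates OmegaConf.merge for plain nested dicts (override values win, nested mappings merge key by key, lists are replaced); it ignores OmegaConf features such as interpolation and structured configs. The function names are hypothetical.

```python
from copy import deepcopy


def split_s3_uri(uri: str) -> tuple[str, str]:
    """Split 's3://bucket/key/path' into ('bucket', 'key/path')."""
    if not uri.startswith("s3://"):
        raise ValueError(f"not an S3 URI: {uri!r}")
    bucket, sep, key = uri[len("s3://"):].partition("/")
    if not bucket or not sep or not key:
        raise ValueError(f"S3 URI needs both a bucket and a key: {uri!r}")
    return bucket, key


def deep_merge(base: dict, overrides: dict) -> dict:
    """Return a new dict with overrides applied on top of base, recursively."""
    merged = deepcopy(base)
    for key, value in overrides.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = deepcopy(value)  # scalars and lists replace outright
    return merged
```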
5. Strategy Container (Unified)
The strategy container is a unified image that handles all trading modes via the TRADING_MODE environment variable. Configuration is loaded at runtime from MLflow tags plus S3 (see 11-LIVE-TRADING.md).
Trading Modes
| Mode | Launch Type | Spot OK? | Use Case |
|---|---|---|---|
| backtest | ECS Task | Yes | Historical simulation |
| hyperopt | ECS Task | Yes | Parameter optimization |
| dry-run | ECS Service | No | Paper trading |
| live | ECS Service | No | Production trading |
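The mode table is effectively a lookup that launch code can enforce. A sketch, assuming a hypothetical launch_policy helper called before starting a task or service: it returns the capacity providers in the order they should be tried and never offers FARGATE_SPOT to the long-running dry-run and live modes.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class LaunchPolicy:
    launch_type: str                     # "ECS Task" or "ECS Service"
    capacity_providers: tuple[str, ...]  # in the order they should be tried


# (launch type, Spot OK?) transcribed from the Trading Modes table
_MODES: dict[str, tuple[str, bool]] = {
    "backtest": ("ECS Task", True),
    "hyperopt": ("ECS Task", True),
    "dry-run": ("ECS Service", False),
    "live": ("ECS Service", False),
}


def launch_policy(trading_mode: str) -> LaunchPolicy:
    """Map a TRADING_MODE value to its launch type and capacity providers."""
    try:
        launch_type, spot_ok = _MODES[trading_mode]
    except KeyError:
        raise ValueError(f"unknown TRADING_MODE: {trading_mode!r}") from None
    providers = ("FARGATE_SPOT", "FARGATE") if spot_ok else ("FARGATE",)
    return LaunchPolicy(launch_type, providers)
```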
Task Definition Specification (Backtest Mode)
```
Family: tradai-strategy-container
Cpu: 1024 (1 vCPU)
Memory: 2048 (2 GB)
Container: strategy-container
  # Image set dynamically per strategy by Step Functions
  Environment Variables:
    - TRADING_MODE: backtest
    - STRATEGY_NAME: (from Step Functions)
    - STRATEGY_STAGE: (from Step Functions)
    - MLFLOW_TRACKING_URI: http://mlflow.tradai.local:5000
    - ARCTICDB_S3_URI: s3://tradai-arcticdb
    - DYNAMODB_TABLE: tradai-workflow-state
# Fargate Spot is discounted by up to 70% compared with on-demand Fargate
Capacity Provider: FARGATE_SPOT (primary), FARGATE (fallback)
```
Service Definition Specification (Live Trading Mode - v9.1)
```
Family: tradai-strategy-{name}
Cpu: 512 (0.5 vCPU)
Memory: 1024 (1 GB)
Container: strategy-container
  Image: {account}.dkr.ecr.us-east-1.amazonaws.com/tradai-{strategy}:{version}
  Port: 8080
  Environment Variables (minimal; the remaining config comes from MLflow):
    - TRADING_MODE: live
    - STRATEGY_NAME: PascalStrategy
    - STRATEGY_STAGE: Production
    - MLFLOW_TRACKING_URI: http://mlflow.tradai.local:5000
    - ARCTICDB_S3_URI: s3://tradai-arcticdb
    - DYNAMODB_TABLE: tradai-workflow-state
    - S3_CONFIG_BUCKET: tradai-configs
  Secrets (from Secrets Manager):
    - EXCHANGE_API_KEY
    - EXCHANGE_API_SECRET
  Health Check:
    Command: curl -f http://localhost:8080/health || exit 1
    Interval: 30s
    Timeout: 5s
    Retries: 3
# NEVER use Spot for live trading: Spot tasks can be interrupted at two minutes' notice
Capacity Provider: FARGATE only (NOT FARGATE_SPOT)
```
MLflow-Centric Config Loading (v9.1)
At container startup, configuration is loaded in the following sequence:
- Bootstrap: read STRATEGY_NAME and STRATEGY_STAGE from the environment.
- Query MLflow: MLflowAdapter.get_model_version(name, stage).
- Extract tags: timeframe, pairs, warmup_days, configuration_file.
- Load S3 config: ConfigMergeService.load_config(config_s3_path).
- Apply overrides: runtime overrides from the DynamoDB session (if resuming).

This design reuses existing TradAI patterns:
- StrategyMetadata.to_mlflow_tags() for registration
- MLflowAdapter for the MLflow REST API
- ConfigMergeService for S3 config loading + OmegaConf merge
6. Backtest Execution Modes (v9.2)
The backtesting system supports four execution modes via a Protocol-based executor pattern. The mode is selected via the EXECUTOR_MODE environment variable.
6.1 Execution Mode Comparison
| Mode | Use Case | Latency | Complexity | Cost |
|---|---|---|---|---|
| local | Development/testing | ~5s | Low | Free |
| ecs | Simple production | ~30s | Medium | ECS only |
| sqs | High-volume production | ~45s | Medium-High | ECS+SQS+Lambda |
| stepfunctions | Complex workflows | ~60s | High | ECS+SFN+Lambda |
6.2 BacktestExecutor Protocol
```python
# services/backend/src/tradai/backend/core/repositories.py
from __future__ import annotations

from typing import Protocol

# BacktestJobMessage and ExternalServiceError are defined elsewhere in the
# backend package (imports omitted here).


class BacktestExecutor(Protocol):
    """Protocol for backtest execution backends."""

    def submit(self, job: BacktestJobMessage) -> str:
        """
        Submit a backtest job for execution.

        Args:
            job: Backtest job message with config and metadata

        Returns:
            Execution identifier (task ARN, message ID, container ID, etc.)

        Raises:
            ExternalServiceError: If submission fails
        """
        ...
```
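Because BacktestExecutor is a typing.Protocol, executors satisfy it structurally: any class with a matching submit method qualifies, with no inheritance required. A sketch with a hypothetical in-memory executor that could serve as a test double. The Job dataclass is a simplified stand-in for BacktestJobMessage, and @runtime_checkable is added here only so isinstance can be demonstrated (it checks that the method exists, not its signature).

```python
from dataclasses import dataclass, field
from typing import Protocol, runtime_checkable


@dataclass
class Job:  # simplified stand-in for BacktestJobMessage
    job_id: str
    strategy: str


@runtime_checkable
class BacktestExecutor(Protocol):
    def submit(self, job: Job) -> str: ...


@dataclass
class InMemoryExecutor:
    """Records submitted jobs instead of launching anything."""

    submitted: list[Job] = field(default_factory=list)

    def submit(self, job: Job) -> str:
        self.submitted.append(job)
        return f"inmemory-{job.job_id}"


def launch(executor: BacktestExecutor, job: Job) -> str:
    # Callers depend only on the protocol, never on a concrete backend
    return executor.submit(job)
```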
6.3 ECS Direct Executor (Mode: ecs)
Launches ECS tasks directly, without queue buffering. Best suited to low-volume production.
```python
# services/backend/src/tradai/backend/infrastructure/ecs.py
from __future__ import annotations

from typing import Any

import boto3

from tradai.common import LoggerMixin  # base class from tradai-common (9.2)

# BacktestJobMessage import omitted.
# Must match the container name in the strategy task definitions.
CONTAINER_NAME = "strategy-container"


class ECSBacktestExecutor(LoggerMixin):
    """Launches the strategy container as an ECS Fargate task."""

    def __init__(
        self,
        cluster: str,
        task_definition_prefix: str,
        subnets: list[str],
        security_groups: list[str],
        dynamodb_table: str,
        mlflow_uri: str,
        results_bucket: str,
        ecs_client: Any = None,
    ) -> None:
        super().__init__()
        self._cluster = cluster
        self._task_prefix = task_definition_prefix
        self._subnets = subnets
        self._security_groups = security_groups
        self._table_name = dynamodb_table
        self._mlflow_uri = mlflow_uri
        self._results_bucket = results_bucket
        self._ecs = ecs_client or boto3.client("ecs")

    def submit(self, job: BacktestJobMessage) -> str:
        """Launch an ECS task: Fargate Spot first, on-demand Fargate as fallback."""
        for provider in ("FARGATE_SPOT", "FARGATE"):
            # launchType must be omitted when capacityProviderStrategy is set
            response = self._ecs.run_task(
                cluster=self._cluster,
                taskDefinition=f"{self._task_prefix}{job.config.strategy}",
                capacityProviderStrategy=[{"capacityProvider": provider, "weight": 1}],
                networkConfiguration={
                    "awsvpcConfiguration": {
                        "subnets": self._subnets,
                        "securityGroups": self._security_groups,
                        "assignPublicIp": "DISABLED",
                    }
                },
                overrides={
                    "containerOverrides": [{
                        "name": CONTAINER_NAME,
                        "environment": self._build_environment(job),
                    }],
                },
            )
            if response.get("tasks"):
                task_arn = response["tasks"][0]["taskArn"]
                self.logger.info(f"Launched ECS task on {provider}: {task_arn} for job {job.job_id}")
                return task_arn
            self.logger.warning(f"run_task on {provider} failed: {response.get('failures')}")
        raise RuntimeError(f"Could not launch ECS task for job {job.job_id}")

    def _build_environment(self, job: BacktestJobMessage) -> list[dict[str, str]]:
        """Environment variables read by the container entrypoint (6.8)."""
        cfg = job.config
        # Freqtrade expects --timerange as YYYYMMDD-YYYYMMDD
        timerange = "-".join(str(d).replace("-", "") for d in (cfg.start_date, cfg.end_date))
        return [
            {"name": "RUN_ID", "value": job.job_id},
            {"name": "STRATEGY", "value": cfg.strategy},
            {"name": "TIMEFRAME", "value": cfg.timeframe},
            {"name": "TIMERANGE", "value": timerange},
            {"name": "PAIRS", "value": ",".join(cfg.symbols)},
            {"name": "EXPERIMENT_NAME", "value": job.experiment_name},
            {"name": "DYNAMODB_TABLE", "value": self._table_name},
            {"name": "MLFLOW_TRACKING_URI", "value": self._mlflow_uri},
            {"name": "S3_RESULTS_BUCKET", "value": self._results_bucket},
        ]
```
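One way to get the "FARGATE_SPOT (primary), FARGATE (fallback)" behavior described for backtest tasks is to try Fargate Spot on its own and retry on plain Fargate only when ECS reports that no task was started. A sketch with the ECS client injected: run_with_spot_fallback and FakeECS are hypothetical names, the response shape mirrors the tasks/failures keys that boto3's run_task returns, and the failure reason in the fake is illustrative.

```python
from typing import Any


def run_with_spot_fallback(ecs: Any, **run_task_kwargs: Any) -> tuple[str, str]:
    """Try FARGATE_SPOT, then FARGATE; return (provider, task_arn).

    run_task_kwargs must not include launchType, which ECS rejects when a
    capacityProviderStrategy is supplied.
    """
    failures: list[dict] = []
    for provider in ("FARGATE_SPOT", "FARGATE"):
        response = ecs.run_task(
            capacityProviderStrategy=[{"capacityProvider": provider, "weight": 1}],
            **run_task_kwargs,
        )
        if response.get("tasks"):
            return provider, response["tasks"][0]["taskArn"]
        failures.extend(response.get("failures", []))
    raise RuntimeError(f"no capacity on any provider: {failures}")


class FakeECS:
    """Hypothetical stand-in for a boto3 ECS client."""

    def __init__(self, spot_available: bool) -> None:
        self.spot_available = spot_available
        self.calls: list[str] = []

    def run_task(self, **kwargs: Any) -> dict:
        provider = kwargs["capacityProviderStrategy"][0]["capacityProvider"]
        self.calls.append(provider)
        if provider == "FARGATE_SPOT" and not self.spot_available:
            return {"tasks": [], "failures": [{"reason": "capacity unavailable"}]}
        return {"tasks": [{"taskArn": f"arn:aws:ecs:task/{provider.lower()}"}], "failures": []}
```

Giving a single provider weight 1 per call keeps the placement explicit; the Spot discount is only lost when Spot capacity is actually unavailable.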
6.4 SQS Queue Executor (Mode: sqs)
Submits jobs to an SQS queue; the sqs-consumer Lambda (section 7) then launches them, which provides backpressure handling.
```python
# services/backend/src/tradai/backend/infrastructure/sqs.py (existing)
from __future__ import annotations

from tradai.common import LoggerMixin  # base class from tradai-common (9.2)

# SQSAdapter and BacktestJobMessage imports omitted.


class SQSBacktestExecutor(LoggerMixin):
    """Submits backtest jobs to an SQS queue for async processing."""

    def __init__(self, sqs_adapter: SQSAdapter) -> None:
        super().__init__()
        self._adapter = sqs_adapter

    def submit(self, job: BacktestJobMessage) -> str:
        """Submit the job to the SQS queue and return the SQS message ID."""
        self.logger.info(f"Submitting job {job.job_id} to SQS queue")
        return self._adapter.send_backtest_job(job)
```
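6.5 Local Docker Executor (Mode: local)
Runs the strategy container locally via the Docker SDK. For development only.
```python
# services/backend/src/tradai/backend/infrastructure/local.py
from __future__ import annotations

from pathlib import Path
from typing import Any

import docker

from tradai.common import LoggerMixin  # base class from tradai-common (9.2)

# BacktestJobMessage import omitted.


class LocalBacktestExecutor(LoggerMixin):
    """Runs the strategy container locally via Docker."""

    def __init__(
        self,
        docker_client: Any = None,
        data_dir: Path | None = None,
        results_dir: Path | None = None,
        mlflow_uri: str = "http://localhost:5000",
    ) -> None:
        super().__init__()
        self._docker = docker_client or docker.from_env()
        self._data_dir = data_dir or Path.home() / ".tradai" / "data"
        self._results_dir = results_dir or Path.home() / ".tradai" / "results"
        self._mlflow_uri = mlflow_uri

    def submit(self, job: BacktestJobMessage) -> str:
        """Run the strategy container locally and return its container ID."""
        job_results = self._results_dir / job.job_id
        job_results.mkdir(parents=True, exist_ok=True)  # bind-mounted at /results
        cfg = job.config
        container = self._docker.containers.run(
            image=f"strategy-{cfg.strategy}:latest",
            # The entrypoint (6.8) reads its inputs from env vars. DYNAMODB_TABLE
            # is deliberately unset, so status updates are skipped locally.
            environment={
                "RUN_ID": job.job_id,
                "STRATEGY": cfg.strategy,
                "TIMEFRAME": cfg.timeframe,
                # Freqtrade expects --timerange as YYYYMMDD-YYYYMMDD
                "TIMERANGE": "-".join(str(d).replace("-", "") for d in (cfg.start_date, cfg.end_date)),
                "PAIRS": ",".join(cfg.symbols),
                "MLFLOW_TRACKING_URI": self._mlflow_uri,
                "EXPERIMENT_NAME": job.experiment_name,
            },
            volumes={
                str(self._data_dir): {"bind": "/data", "mode": "ro"},
                str(job_results): {"bind": "/results", "mode": "rw"},
            },
            detach=True,
            remove=True,  # remove the container once it exits
        )
        self.logger.info(f"Started local container: {container.id[:12]} for job {job.job_id}")
        return container.id
```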
6.6 Step Functions Executor (Mode: stepfunctions)
Starts a Step Functions execution for complex multi-step workflows.
```python
# services/backend/src/tradai/backend/infrastructure/stepfunctions.py
from __future__ import annotations

import json
from typing import Any

import boto3

from tradai.common import LoggerMixin  # base class from tradai-common (9.2)

# BacktestJobMessage import omitted.


class StepFunctionsExecutor(LoggerMixin):
    """Executes a backtest via a Step Functions state machine."""

    def __init__(
        self,
        state_machine_arn: str,
        sfn_client: Any = None,
    ) -> None:
        super().__init__()
        self._state_machine_arn = state_machine_arn
        self._sfn = sfn_client or boto3.client("stepfunctions")

    def submit(self, job: BacktestJobMessage) -> str:
        """Start a Step Functions execution and return its ARN."""
        response = self._sfn.start_execution(
            stateMachineArn=self._state_machine_arn,
            name=f"backtest-{job.job_id}",  # unique per state machine, max 80 chars
            input=json.dumps({
                "run_id": job.job_id,
                # mode="json" turns dates and similar types into JSON-serializable values
                "config": job.config.model_dump(mode="json"),
                "experiment_name": job.experiment_name,
            }),
        )
        execution_arn = response["executionArn"]
        self.logger.info(f"Started Step Functions execution: {execution_arn}")
        return execution_arn
```
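Step Functions execution names are limited to 80 characters and must be unique within the state machine, which is why the executor derives the name from the job ID. If job IDs could ever contain other characters, a conservative sanitizer such as the hypothetical one below restricts names to ASCII letters, digits, hyphens and underscores and truncates to 80 characters. Truncation can break uniqueness, so IDs should stay well under the limit (a UUID plus the "backtest-" prefix is 45 characters).

```python
import re

_MAX_NAME_LEN = 80
_DISALLOWED = re.compile(r"[^A-Za-z0-9_-]")


def execution_name(job_id: str, prefix: str = "backtest-") -> str:
    """Build a conservative Step Functions execution name from a job ID."""
    name = prefix + _DISALLOWED.sub("-", job_id)
    return name[:_MAX_NAME_LEN]
```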
6.7 Executor Factory
```python
# services/backend/src/tradai/backend/api/dependencies.py
from fastapi import Depends

# BackendSettings, get_settings, get_sqs_adapter, BacktestExecutor and the
# executor classes from 6.3-6.6 are project imports (omitted here).


def get_backtest_executor(
    settings: BackendSettings = Depends(get_settings),
) -> BacktestExecutor | None:
    """Return the executor selected by the EXECUTOR_MODE environment variable."""
    mode = settings.executor_mode

    if mode == "local":
        import docker

        return LocalBacktestExecutor(
            docker_client=docker.from_env(),
            mlflow_uri=settings.mlflow_tracking_uri,
        )
    if mode == "ecs":
        return ECSBacktestExecutor(
            cluster=settings.ecs_cluster,
            task_definition_prefix=settings.ecs_task_prefix,
            subnets=settings.ecs_subnets,
            security_groups=settings.ecs_security_groups,
            dynamodb_table=settings.workflow_state_table,
            mlflow_uri=settings.mlflow_tracking_uri,
            results_bucket=settings.s3_results_bucket,
        )
    if mode == "sqs":
        sqs_adapter = get_sqs_adapter(settings)
        return SQSBacktestExecutor(sqs_adapter) if sqs_adapter else None
    if mode == "stepfunctions":
        return StepFunctionsExecutor(
            state_machine_arn=settings.step_functions_arn,
        )
    # Dev mode: no executor (jobs stay PENDING)
    return None
```
6.8 Container Entrypoint Script
The strategy container handles its own lifecycle via an entrypoint script:
```python
#!/usr/bin/env python3
"""Strategy container entrypoint - handles backtest execution and status updates."""
# strategies/base/entrypoint.py
import json
import os
import subprocess
import sys
import time
from datetime import datetime, timezone
from pathlib import Path

import boto3
import requests
from freqtrade.data.btanalysis import load_backtest_stats

# extract_metrics(), summarize_results() and upload_results_to_s3() are
# helpers defined elsewhere in this module (not shown here).


def utc_now() -> str:
    """Current UTC time as an ISO-8601 string."""
    return datetime.now(timezone.utc).isoformat()


def wait_for_mlflow(uri: str, max_retries: int = 10, backoff_base: float = 2.0) -> bool:
    """Wait for MLflow to be healthy, with exponential backoff.

    CRITICAL: prevents container failures when MLflow is not yet ready.

    Args:
        uri: MLflow tracking URI
        max_retries: Maximum number of attempts (default 10: 1 + 2 + ... + 512 s,
            about 17 min of backoff in total)
        backoff_base: Base of the exponential backoff, in seconds

    Returns:
        True if MLflow is healthy, False if all retries are exhausted
    """
    health_url = f"{uri.rstrip('/')}/health"
    for attempt in range(max_retries):
        try:
            response = requests.get(health_url, timeout=5)
            if response.status_code == 200:
                print(f"MLflow healthy after {attempt + 1} attempt(s)")
                return True
            reason = f"HTTP {response.status_code}"
        except requests.RequestException as e:
            reason = str(e)
        # Back off on non-200 responses as well as on connection errors
        wait_time = backoff_base ** attempt
        print(f"MLflow not ready (attempt {attempt + 1}/{max_retries}): {reason}")
        print(f"Retrying in {wait_time:.1f}s...")
        time.sleep(wait_time)
    print(f"MLflow not healthy after {max_retries} attempts")
    return False


def main() -> int:
    """Main entrypoint for the strategy container."""
    run_id = os.environ["RUN_ID"]
    strategy = os.environ.get("STRATEGY", os.environ.get("FREQTRADE_STRATEGY"))
    dynamodb_table = os.environ.get("DYNAMODB_TABLE")
    mlflow_uri = os.environ.get("MLFLOW_TRACKING_URI")
    experiment_name = os.environ.get("EXPERIMENT_NAME", f"backtests/{strategy}")

    # DynamoDB is optional: graceful degradation for local mode
    dynamodb = None
    if dynamodb_table:
        dynamodb = boto3.resource("dynamodb").Table(dynamodb_table)

    try:
        # 0. CRITICAL: wait for MLflow to be healthy (prevents a startup race)
        if mlflow_uri and not wait_for_mlflow(mlflow_uri):
            raise RuntimeError(f"MLflow at {mlflow_uri} not available after retries")

        # 1. Update status to RUNNING
        if dynamodb:
            dynamodb.update_item(
                Key={"run_id": run_id},
                UpdateExpression="SET #s = :s, updated_at = :t",
                ExpressionAttributeNames={"#s": "status"},
                ExpressionAttributeValues={":s": "RUNNING", ":t": utc_now()},
            )

        # 2. Build and execute the Freqtrade command
        command = build_freqtrade_command()
        print(f"Executing: {' '.join(command)}")
        result = subprocess.run(command, check=True, capture_output=True, text=True)
        print(result.stdout)

        # 3. Parse results (given a directory, the latest result is loaded)
        results_dir = Path(os.environ.get("RESULTS_DIR", "/freqtrade/user_data/backtest_results"))
        stats = load_backtest_stats(results_dir)

        # 4. Log to MLflow
        if mlflow_uri:
            import mlflow

            mlflow.set_tracking_uri(mlflow_uri)
            mlflow.set_experiment(experiment_name)
            with mlflow.start_run(run_name=run_id):
                mlflow.log_metrics(extract_metrics(stats))
                mlflow.log_params({"strategy": strategy, "run_id": run_id})

        # 5. Upload results to S3
        upload_results_to_s3(run_id, stats)

        # 6. Update status to COMPLETED
        if dynamodb:
            dynamodb.update_item(
                Key={"run_id": run_id},
                UpdateExpression="SET #s = :s, #r = :r, updated_at = :t",
                ExpressionAttributeNames={"#s": "status", "#r": "result"},
                ExpressionAttributeValues={
                    ":s": "COMPLETED",
                    ":r": json.dumps(summarize_results(stats)),
                    ":t": utc_now(),
                },
            )
        print(f"Backtest completed successfully: {run_id}")
        return 0

    except subprocess.CalledProcessError as e:
        handle_failure(dynamodb, run_id, f"Freqtrade execution failed: {e.stderr}")
        return 1
    except Exception as e:
        handle_failure(dynamodb, run_id, f"Unexpected error: {e}")
        return 1


def build_freqtrade_command() -> list[str]:
    """Build the Freqtrade CLI command from environment variables."""
    cmd = ["freqtrade", "backtesting"]
    if strategy := os.environ.get("STRATEGY"):
        cmd.extend(["--strategy", strategy])
    if timeframe := os.environ.get("TIMEFRAME"):
        cmd.extend(["--timeframe", timeframe])
    if timerange := os.environ.get("TIMERANGE"):  # YYYYMMDD-YYYYMMDD
        cmd.extend(["--timerange", timerange])
    if pairs := os.environ.get("PAIRS"):
        # PAIRS is comma-separated; Freqtrade's --pairs takes space-separated values
        cmd.extend(["--pairs", *pairs.split(",")])
    if config := os.environ.get("CONFIG_PATH"):
        cmd.extend(["--config", config])
    cmd.extend(["--export", "trades"])
    return cmd


def handle_failure(dynamodb, run_id: str, error_msg: str) -> None:
    """Log the error and set status to FAILED (message truncated to 1000 chars)."""
    print(error_msg, file=sys.stderr)
    if dynamodb:
        dynamodb.update_item(
            Key={"run_id": run_id},
            UpdateExpression="SET #s = :s, #e = :e, updated_at = :t",
            ExpressionAttributeNames={"#s": "status", "#e": "error"},
            ExpressionAttributeValues={
                ":s": "FAILED",
                ":e": error_msg[:1000],
                ":t": utc_now(),
            },
        )


if __name__ == "__main__":
    sys.exit(main())
```
6.9 When to Use Each Mode
Decision Tree:
```
Is this development/testing?
├── YES → Use LOCAL mode
└── NO → Is this a single simple backtest?
    ├── YES → Use ECS DIRECT mode
    └── NO → Is ordering/reliability critical?
        ├── YES → Use SQS mode
        └── NO → Is this a multi-step workflow?
            ├── YES → Use STEP FUNCTIONS mode
            └── NO → Use ECS DIRECT mode
```
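The tree maps directly onto the EXECUTOR_MODE values read by get_backtest_executor. A sketch encoding it as a function, for example for a setup script or CLI helper; the boolean parameter names are hypothetical.

```python
def choose_executor_mode(
    is_dev: bool,
    single_simple_backtest: bool,
    ordering_or_reliability_critical: bool,
    multi_step_workflow: bool,
) -> str:
    """Walk the decision tree top to bottom and return an EXECUTOR_MODE value."""
    if is_dev:
        return "local"
    if single_simple_backtest:
        return "ecs"
    if ordering_or_reliability_critical:
        return "sqs"
    if multi_step_workflow:
        return "stepfunctions"
    return "ecs"  # default for production when nothing else applies
```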
7. Lambda Functions
Lambda Inventory
| Function | Memory | Timeout | Trigger | Purpose |
|---|---|---|---|---|
| sqs-consumer | 256MB | 60s | SQS | Process backtest queue |
| data-collection-proxy | 256MB | 60s | Step Functions | Call Data Collection |
| validate-strategy | 256MB | 30s | Step Functions | Check ECR/S3 exists |
| transform-results | 512MB | 60s | Step Functions | Format results |
| cleanup-resources | 256MB | 30s | Step Functions | Delete temp files |
| notify-completion | 256MB | 30s | Step Functions | Send notifications |
| orphan-scanner | 128MB | 60s | EventBridge (15 min) | Detect stuck RUNNING jobs (v9.2) |
SQS Consumer Lambda
Supports both Step Functions and direct ECS launch, selected via the LAUNCH_MODE environment variable.
```python
# lambda/sqs_consumer/index.py
import json
import os
from datetime import datetime, timezone

import boto3

ecs_client = boto3.client('ecs')
sf_client = boto3.client('stepfunctions')
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(os.environ['WORKFLOW_STATE_TABLE'])

# Configuration
LAUNCH_MODE = os.environ.get('LAUNCH_MODE', 'ecs')  # 'ecs' or 'stepfunctions'
ECS_CLUSTER = os.environ.get('ECS_CLUSTER')
ECS_TASK_PREFIX = os.environ.get('ECS_TASK_PREFIX', 'strategy-')
# Drop empty entries so an unset variable yields [] rather than ['']
ECS_SUBNETS = [s for s in os.environ.get('ECS_SUBNETS', '').split(',') if s]
ECS_SECURITY_GROUPS = [s for s in os.environ.get('ECS_SECURITY_GROUPS', '').split(',') if s]
STEP_FUNCTIONS_ARN = os.environ.get('BACKTEST_WORKFLOW_ARN')
CONTAINER_NAME = 'strategy-container'  # must match the strategy task definition


def handler(event, context):
    """Process SQS messages and launch backtests.

    Returns a partial batch response so only failed messages are retried
    (requires ReportBatchItemFailures on the event source mapping).
    """
    batch_item_failures = []
    for record in event['Records']:
        message_id = record['messageId']
        try:
            message = json.loads(record['body'])
            run_id = message['run_id']

            # Idempotency check: SQS delivers at least once
            if is_already_launched(run_id):
                print(f"Backtest {run_id} already launched, skipping")
                continue

            # Launch based on mode
            if LAUNCH_MODE == 'stepfunctions':
                execution_arn = launch_step_functions(message, run_id)
            else:  # Default: ECS direct
                execution_arn = launch_ecs_task(message, run_id)

            store_execution_arn(run_id, execution_arn)
            print(f"Launched backtest {run_id}: {execution_arn}")
        except Exception as e:
            print(f"Error processing {message_id}: {e}")
            batch_item_failures.append({"itemIdentifier": message_id})
    return {"batchItemFailures": batch_item_failures}


def launch_ecs_task(message: dict, run_id: str) -> str:
    """Launch an ECS task directly (simpler, lower latency).

    Tries Fargate Spot first and falls back to on-demand Fargate if ECS
    could not place the task.
    """
    config = message['config']
    strategy = config['strategy']
    # Freqtrade expects --timerange as YYYYMMDD-YYYYMMDD
    timerange = '-'.join(str(config[k]).replace('-', '') for k in ('start_date', 'end_date'))
    environment = [
        {'name': 'RUN_ID', 'value': run_id},
        {'name': 'STRATEGY', 'value': strategy},
        {'name': 'TIMEFRAME', 'value': config['timeframe']},
        {'name': 'TIMERANGE', 'value': timerange},
        {'name': 'PAIRS', 'value': ','.join(config['symbols'])},
        {'name': 'EXPERIMENT_NAME', 'value': message.get('experiment_name', f'backtests/{strategy}')},
        {'name': 'DYNAMODB_TABLE', 'value': os.environ['WORKFLOW_STATE_TABLE']},
        {'name': 'MLFLOW_TRACKING_URI', 'value': os.environ.get('MLFLOW_TRACKING_URI', '')},
    ]
    for provider in ('FARGATE_SPOT', 'FARGATE'):
        # launchType must be omitted when capacityProviderStrategy is given
        response = ecs_client.run_task(
            cluster=ECS_CLUSTER,
            taskDefinition=f"{ECS_TASK_PREFIX}{strategy}",
            capacityProviderStrategy=[{'capacityProvider': provider, 'weight': 1}],
            networkConfiguration={
                'awsvpcConfiguration': {
                    'subnets': ECS_SUBNETS,
                    'securityGroups': ECS_SECURITY_GROUPS,
                    'assignPublicIp': 'DISABLED',
                }
            },
            overrides={
                'containerOverrides': [{'name': CONTAINER_NAME, 'environment': environment}],
            },
        )
        if response.get('tasks'):
            return response['tasks'][0]['taskArn']
        print(f"run_task on {provider} failed: {response.get('failures')}")
    raise RuntimeError(f"Could not launch ECS task for backtest {run_id}")


def launch_step_functions(message: dict, run_id: str) -> str:
    """Launch a Step Functions execution (for complex workflows)."""
    response = sf_client.start_execution(
        stateMachineArn=STEP_FUNCTIONS_ARN,
        name=f"backtest-{run_id}",
        input=json.dumps(message),
    )
    return response['executionArn']


def is_already_launched(run_id: str) -> bool:
    """True if an earlier delivery already launched (or finished) this run.

    Checking execution_arn as well as status closes the window between the
    launch and the container setting RUNNING, which can take minutes while
    it waits for MLflow.
    """
    item = table.get_item(Key={'run_id': run_id}).get('Item', {})
    return 'execution_arn' in item or item.get('status') in ('RUNNING', 'COMPLETED')


def store_execution_arn(run_id: str, execution_arn: str) -> None:
    table.update_item(
        Key={'run_id': run_id},
        UpdateExpression='SET execution_arn = :arn, updated_at = :t',
        ExpressionAttributeValues={
            ':arn': execution_arn,
            ':t': datetime.now(timezone.utc).isoformat(),
        },
    )
```
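The consumer relies on SQS partial batch responses: only the messages listed in batchItemFailures return to the queue, so one bad record does not force the whole batch to be retried (this requires ReportBatchItemFailures on the Lambda event source mapping). A boto3-free sketch of the same loop with the idempotency and launch steps injected; process_batch and its parameters are hypothetical names chosen for testability.

```python
import json
from typing import Callable


def process_batch(
    event: dict,
    already_launched: Callable[[str], bool],
    launch: Callable[[dict, str], str],
) -> dict:
    """Launch each message's backtest and report per-message failures."""
    failures = []
    for record in event["Records"]:
        try:
            message = json.loads(record["body"])
            run_id = message["run_id"]
            if not already_launched(run_id):
                launch(message, run_id)
        except Exception:
            # Only this record is retried; the rest of the batch is deleted
            failures.append({"itemIdentifier": record["messageId"]})
    return {"batchItemFailures": failures}
```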
Data Collection Proxy Lambda
```python
# lambda/data_collection_proxy/index.py
import os

import httpx

DATA_COLLECTION_URL = os.environ.get(
    'DATA_COLLECTION_URL',
    'http://data-collection.tradai.local:8002',
)


def handler(event, context):
    """Proxy requests to the Data Collection Service (VPC-internal)."""
    operation = event.get('operation')
    with httpx.Client(timeout=30.0) as client:
        if operation == 'check-freshness':
            response = client.post(
                f"{DATA_COLLECTION_URL}/data/freshness",
                json={"symbols": event['symbols'], "timeframe": event['timeframe']},
            )
        elif operation == 'validate':
            response = client.post(
                f"{DATA_COLLECTION_URL}/data/validate",
                json={"symbols": event['symbols']},
            )
        else:
            raise ValueError(f"Unknown operation: {operation}")
        # Raise on 4xx/5xx so Step Functions sees the task as failed
        response.raise_for_status()
        return response.json()
```
7. Circuit Breaker Implementation
```python
# libs/common/src/circuit_breaker.py
import functools
import time
from enum import Enum
from typing import Callable


class CircuitState(Enum):
    CLOSED = "closed"        # Normal: calls pass through
    OPEN = "open"            # Failing: calls are rejected immediately
    HALF_OPEN = "half_open"  # Testing: a call is let through to probe recovery


class CircuitBreakerOpenError(Exception):
    """Raised when a call is rejected because the circuit is open."""


class CircuitBreaker:
    """
    Circuit breaker for resilient async service calls.

    Usage:
        cb = CircuitBreaker(failure_threshold=5, recovery_timeout=30)

        @cb
        async def call_external_service():
            ...
    """

    def __init__(self, failure_threshold: int = 5, recovery_timeout: float = 30):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout  # seconds
        self.state = CircuitState.CLOSED
        self.failures = 0
        self.last_failure_time: float | None = None

    def __call__(self, func: Callable):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            self._check_state()
            try:
                result = await func(*args, **kwargs)
            except Exception:
                self._on_failure()
                raise
            self._on_success()
            return result

        return wrapper

    def _check_state(self) -> None:
        if self.state == CircuitState.OPEN:
            if self._should_attempt_reset():
                self.state = CircuitState.HALF_OPEN
            else:
                raise CircuitBreakerOpenError("Circuit is open")

    def _should_attempt_reset(self) -> bool:
        if self.last_failure_time is None:
            return True
        # Monotonic clock: elapsed time is unaffected by wall-clock changes
        return time.monotonic() - self.last_failure_time >= self.recovery_timeout

    def _on_success(self) -> None:
        self.failures = 0
        self.state = CircuitState.CLOSED

    def _on_failure(self) -> None:
        self.failures += 1
        self.last_failure_time = time.monotonic()
        # A failed probe in HALF_OPEN reopens the circuit immediately
        if self.state == CircuitState.HALF_OPEN or self.failures >= self.failure_threshold:
            self.state = CircuitState.OPEN
```
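The CLOSED, OPEN, HALF_OPEN cycle is easier to follow in a synchronous variant whose clock can be controlled. The sketch below is a simplified restatement for illustration, not the library class: SyncCircuitBreaker is a hypothetical name, it wraps plain callables, and it takes a clock function so a test can advance time without sleeping.

```python
import time
from typing import Any, Callable


class SyncCircuitBreaker:
    """Minimal synchronous circuit breaker with an injectable clock."""

    def __init__(self, failure_threshold: int = 5, recovery_timeout: float = 30.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.clock = clock
        self.state = "closed"
        self.failures = 0
        self.opened_at = 0.0

    def call(self, func: Callable[[], Any]) -> Any:
        if self.state == "open":
            if self.clock() - self.opened_at < self.recovery_timeout:
                raise RuntimeError("circuit open")
            self.state = "half_open"  # let one probe call through
        try:
            result = func()
        except Exception:
            self.failures += 1
            if self.state == "half_open" or self.failures >= self.failure_threshold:
                self.state = "open"
                self.opened_at = self.clock()
            raise
        self.failures = 0
        self.state = "closed"
        return result
```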
8. Service Cost Summary
| Service | Type | Monthly Cost |
|---|---|---|
| Backend API | Long-running | $14.60 |
| Data Collection | Long-running | $7.30 |
| MLflow | Long-running | $14.60 |
| Strategy Tasks (Spot) | On-demand | $1.92 |
| Lambda Functions | Serverless | $0.00 |
| TOTAL | | $38.42 |
9. Common Service Package Structure
All TradAI services follow a standardized package structure to ensure consistency, maintainability, and ease of local development.
9.1 Standard Service Layout
```
libs/tradai-{service}/
├── src/tradai/{service}/
│   ├── __init__.py          # Package exports
│   ├── __main__.py          # CLI entry point: python -m tradai.{service}
│   ├── app.py               # FastAPI application factory
│   ├── cli.py               # Typer CLI commands
│   ├── config.py            # Pydantic settings (extends tradai.common.Settings)
│   │
│   ├── api/                 # FastAPI routes (presentation layer)
│   │   ├── __init__.py
│   │   ├── routes.py        # Route definitions
│   │   ├── deps.py          # Dependency injection
│   │   └── schemas.py       # Request/Response Pydantic models
│   │
│   ├── core/                # Business logic (domain layer)
│   │   ├── __init__.py
│   │   ├── entities.py      # Domain entities
│   │   ├── services.py      # Business logic services
│   │   └── repositories.py  # Repository interfaces
│   │
│   └── infrastructure/      # External integrations (infrastructure layer)
│       ├── __init__.py
│       ├── repositories/    # Repository implementations
│       │   └── {adapter}.py
│       └── adapters/        # External service adapters
│           └── {adapter}.py
│
├── tests/
│   ├── __init__.py
│   ├── conftest.py          # Pytest fixtures
│   ├── test_api/            # API tests
│   ├── test_core/           # Unit tests
│   └── test_infrastructure/ # Integration tests
│
├── Dockerfile               # Service container
├── pyproject.toml           # Dependencies and config
└── README.md                # Service documentation
```
9.2 Base Classes from tradai-common
All services build on these common base classes (interfaces shown; method bodies omitted):
# libs/tradai-common/src/tradai/common/
import logging
from typing import Self  # Python 3.11+
from omegaconf import DictConfig
from pydantic_settings import BaseSettings
# Logger mixin
class LoggerMixin:
    """Provides structured JSON logging."""
    @property
    def logger(self) -> logging.Logger:
        """Get configured logger for this class."""
# Base settings class
class Settings(BaseSettings):
    """Pydantic settings with Hydra integration."""
    @classmethod
    def from_hydra_cfg(cls, cfg: DictConfig, config_name: str) -> Self:
        """Load settings from Hydra config."""
# Base service class (defined after LoggerMixin and Settings, which it uses)
class BaseService(LoggerMixin):
    """Base class with logging and Hydra config support."""
    settings_class: type[Settings]
    @classmethod
    def from_hydra_cfg(cls, cfg: DictConfig, config_name: str) -> Self:
        """Create service from Hydra configuration."""
9.3 Service Template (cookiecutter-tradai-service)
New services can be scaffolded with the cookiecutter template:
# Create new service
uv run cookiecutter cookiecutter-tradai-service/
# Prompts (defaults come from cookiecutter.json):
# service_name: MyService
# service_slug: my_service
# service_class: MyService
# port: 8004
# description: My new TradAI service
# author: TradAI Team
# with_database: [no/yes]
# with_sqs: [no/yes]
# with_s3: [yes/no]
Template Structure:
cookiecutter-tradai-service/
├── cookiecutter.json
├── hooks/
│ ├── pre_gen_project.py # Validation
│ └── post_gen_project.py # Post-generation setup
└── {{cookiecutter.service_slug}}/
├── src/tradai/{{cookiecutter.service_slug}}/
│ ├── __init__.py
│ ├── __main__.py
│ ├── app.py
│ ├── cli.py
│ ├── config.py
│ ├── api/
│ │ ├── __init__.py
│ │ ├── routes.py
│ │ ├── deps.py
│ │ └── schemas.py
│ ├── core/
│ │ ├── __init__.py
│ │ ├── entities.py
│ │ ├── services.py
│ │ └── repositories.py
│ └── infrastructure/
│ └── __init__.py
├── tests/
│ ├── conftest.py
│ └── test_api.py
├── Dockerfile
├── pyproject.toml
└── README.md
cookiecutter.json:
{
"service_name": "MyService",
"service_slug": "{{ cookiecutter.service_name | lower | replace(' ', '_') }}",
"service_class": "{{ cookiecutter.service_name | replace(' ', '') }}",
"port": "8004",
"description": "A TradAI service",
"author": "TradAI Team",
"with_database": ["no", "yes"],
"with_sqs": ["no", "yes"],
"with_s3": ["yes", "no"]
}
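The derived defaults for service_slug and service_class come from the Jinja filters above (lower, replace). The sketch below mirrors those filters in Python. It also adds the kind of check a hooks/pre_gen_project.py script could run. The slug regex and the port range are assumptions, not rules taken from the template:

```python
import re


# Mirrors: {{ cookiecutter.service_name | lower | replace(' ', '_') }}
def default_slug(service_name: str) -> str:
    return service_name.lower().replace(" ", "_")


# Mirrors: {{ cookiecutter.service_name | replace(' ', '') }}
def default_class(service_name: str) -> str:
    return service_name.replace(" ", "")


# Assumed rule: the slug must be a valid, lowercase Python package name.
SLUG_RE = re.compile(r"^[a-z][a-z0-9_]*$")


def validate(slug: str, port: str) -> list[str]:
    """Return a list of problems; an empty list means the inputs look valid."""
    problems = []
    if not SLUG_RE.match(slug):
        problems.append(f"service_slug {slug!r} is not a valid package name")
    if not port.isdigit() or not 1024 <= int(port) <= 65535:
        problems.append(f"port {port!r} must be an integer in 1024-65535")
    return problems


print(default_slug("MyService"), default_class("My Service"))  # myservice MyService
print(default_slug("My Service"))  # my_service
```

The default service_name "MyService" yields the slug myservice. The my_service in the prompt example matches entering "My Service" or overriding the slug. A real pre-gen hook would read the rendered `{{ cookiecutter.service_slug }}` and `{{ cookiecutter.port }}` values and exit non-zero when validate() reports problems, which aborts generation.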
10. CLI Support
All services support both CLI and programmatic usage, enabling local development and testing.
10.1 CLI Entry Point Pattern
Each service implements a standardized CLI using Typer:
# libs/tradai-{service}/src/tradai/{service}/__main__.py
"""Entry point for: python -m tradai.{service}"""
from tradai.{service}.cli import app
if __name__ == "__main__":
    app()
# libs/tradai-{service}/src/tradai/{service}/cli.py
"""CLI commands for {service}."""
import typer
import uvicorn
from tradai.{service}.config import Settings
app = typer.Typer(
    name="tradai-{service}",
    help="TradAI {Service} - {description}",
    add_completion=False,
)
@app.command()
def serve(
    # envvar= lets HOST/PORT/LOG_LEVEL from the container environment apply when no flag is given
    host: str = typer.Option("0.0.0.0", "--host", "-h", envvar="HOST", help="Bind host"),
    port: int = typer.Option(8000, "--port", "-p", envvar="PORT", help="Bind port"),
    reload: bool = typer.Option(False, "--reload", "-r", help="Enable hot reload"),
    workers: int = typer.Option(1, "--workers", "-w", help="Number of workers (ignored with --reload)"),
    log_level: str = typer.Option("info", "--log-level", "-l", envvar="LOG_LEVEL", help="Log level"),
):
    """Start the service HTTP server."""
    typer.echo(f"Starting {service} on {host}:{port}")
    # An import string plus factory=True lets uvicorn call create_app() itself,
    # which uvicorn requires for --reload and for multiple workers
    uvicorn.run(
        "tradai.{service}.app:create_app",
        factory=True,
        host=host,
        port=port,
        reload=reload,
        workers=workers,
        log_level=log_level.lower(),  # uvicorn expects lowercase level names
    )
@app.command()
def health():
    """Check service health (without starting server)."""
    from tradai.{service}.core.services import HealthService
    settings = Settings()
    health_service = HealthService(settings)
    result = health_service.check()
    if result.healthy:
        typer.echo("✓ Service is healthy")
        raise typer.Exit(0)
    typer.echo(f"✗ Service unhealthy: {result.message}")
    raise typer.Exit(1)
@app.command()
def config():
    """Show current configuration."""
    settings = Settings()
    typer.echo(settings.model_dump_json(indent=2))
@app.command()
def version():
    """Show version information."""
    from tradai.{service} import __version__
    typer.echo(f"tradai-{service} {__version__}")
if __name__ == "__main__":
    app()
10.2 CLI Usage Examples
# Run service via Python module
python -m tradai.data serve --port 8002
python -m tradai.backend serve --port 8000 --reload
# Run via installed CLI (after pip install)
tradai-data serve --port 8002
tradai-backend serve --port 8000 --reload
# Other CLI commands
python -m tradai.data health # Check health
python -m tradai.data config # Show config
python -m tradai.data version # Show version
python -m tradai.data --help # Show help
# With environment variables
LOG_LEVEL=debug python -m tradai.data serve
ARCTIC_S3_BUCKET=my-bucket python -m tradai.data serve
10.3 pyproject.toml CLI Scripts
# libs/tradai-{service}/pyproject.toml
[project.scripts]
tradai-{service} = "tradai.{service}.cli:app"
# Example for tradai-data:
# tradai-data = "tradai.data.cli:app"
10.4 Root-Level Unified CLI
A root-level tradai CLI provides unified access to all services:
# src/tradai_cli/__main__.py
"""Unified TradAI CLI."""
import subprocess
import typer
from tradai.data.cli import app as data_app
from tradai.backend.cli import app as backend_app
app = typer.Typer(
    name="tradai",
    help="TradAI Platform CLI",
    add_completion=False,
)
# Register each service CLI as a sub-command group
app.add_typer(data_app, name="data", help="Data Collection service")
app.add_typer(backend_app, name="backend", help="Backend API service")
def _compose(*args: str) -> None:
    """Run a docker compose command and exit with its return code."""
    result = subprocess.run(["docker", "compose", *args])
    raise typer.Exit(result.returncode)
@app.command()
def up(
    services: list[str] = typer.Argument(None, help="Services to start (default: all)"),
    detach: bool = typer.Option(False, "--detach", "-d", help="Run in background"),
):
    """Start services using Docker Compose."""
    args = ["up"]
    if detach:
        args.append("-d")
    if services:
        args.extend(services)
    _compose(*args)
@app.command()
def down():
    """Stop all services."""
    _compose("down")
@app.command()
def logs(
    service: str = typer.Argument(..., help="Service name"),
    follow: bool = typer.Option(False, "--follow", "-f", help="Follow logs"),
):
    """View service logs."""
    args = ["logs"]
    if follow:
        args.append("-f")
    args.append(service)
    _compose(*args)
if __name__ == "__main__":
    app()
Usage:
# Unified CLI
tradai up # Start all services
tradai up backend data # Start specific services
tradai down # Stop all services
tradai logs backend -f # Follow backend logs
# Service-specific commands via unified CLI
tradai data serve --port 8002
tradai backend health
tradai data config
11. Docker Support
11.1 Service Dockerfile Template
Each service uses a standardized multi-stage Dockerfile:
# libs/tradai-{service}/Dockerfile
# ============================================
# Stage 1: Builder
# ============================================
FROM python:3.11-slim AS builder
WORKDIR /app
# Install build dependencies
RUN apt-get update && apt-get install -y --no-install-recommends \
    build-essential \
    curl \
    && rm -rf /var/lib/apt/lists/*
# Install uv for fast dependency resolution
# (current installers put uv in ~/.local/bin; older ones used ~/.cargo/bin)
RUN curl -LsSf https://astral.sh/uv/install.sh | sh
ENV PATH="/root/.local/bin:/root/.cargo/bin:$PATH"
# Copy the workspace root and both packages; building a package
# (even in editable mode) needs its source tree, not just pyproject.toml
COPY pyproject.toml .
COPY libs/tradai-common libs/tradai-common
COPY libs/tradai-{service} libs/tradai-{service}
# Install dependencies (editable installs point at /app/libs/*/src,
# which the runtime stage recreates at the same paths)
RUN uv pip install --system --no-cache \
    -e libs/tradai-common \
    -e libs/tradai-{service}
# ============================================
# Stage 2: Runtime
# ============================================
FROM python:3.11-slim AS runtime
WORKDIR /app
# Install runtime dependencies only (curl is used by HEALTHCHECK)
RUN apt-get update && apt-get install -y --no-install-recommends \
    curl \
    && rm -rf /var/lib/apt/lists/*
# Copy installed packages from builder
COPY --from=builder /usr/local/lib/python3.11/site-packages /usr/local/lib/python3.11/site-packages
COPY --from=builder /usr/local/bin /usr/local/bin
# Copy application code
COPY libs/tradai-common/src libs/tradai-common/src
COPY libs/tradai-{service}/src libs/tradai-{service}/src
# Set Python path
ENV PYTHONPATH="/app/libs/tradai-common/src:/app/libs/tradai-{service}/src"
ENV PYTHONUNBUFFERED=1
# Health check (shell form, so ${PORT} is expanded when the check runs)
HEALTHCHECK --interval=30s --timeout=5s --start-period=10s --retries=3 \
    CMD curl -f http://localhost:${PORT:-8000}/health || exit 1
# Default port (override via environment; the serve command must read PORT to bind to it)
ENV PORT=8000
EXPOSE ${PORT}
# Run the service
ENTRYPOINT ["python", "-m", "tradai.{service}"]
CMD ["serve", "--host", "0.0.0.0"]
11.2 Docker Compose for Local Development
# docker-compose.yaml (project root)
services:
  # ===========================================
  # Backend API Service
  # ===========================================
  backend:
    build:
      context: .
      dockerfile: libs/tradai-backend/Dockerfile
    container_name: tradai-backend
    ports:
      - "8000:8000"
    environment:
      - PORT=8000
      - LOG_LEVEL=debug
      - DATA_COLLECTION_URL=http://data:8002
      - MLFLOW_TRACKING_URI=http://mlflow:5000
      - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID:-}
      - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY:-}
      - AWS_DEFAULT_REGION=${AWS_DEFAULT_REGION:-us-east-1}
    volumes:
      - ./libs/tradai-common/src:/app/libs/tradai-common/src:ro
      - ./libs/tradai-backend/src:/app/libs/tradai-backend/src:ro
    depends_on:
      - data
      - mlflow
    networks:
      - tradai-network
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 30s
      timeout: 5s
      retries: 3
  # ===========================================
  # Data Collection Service
  # ===========================================
  data:
    build:
      context: .
      dockerfile: libs/tradai-data/Dockerfile
    container_name: tradai-data
    ports:
      - "8002:8002"
    environment:
      - PORT=8002
      - LOG_LEVEL=debug
      - ARCTIC_S3_BUCKET=${ARCTIC_S3_BUCKET:-tradai-arcticdb-dev}
      - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID:-}
      - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY:-}
      - AWS_DEFAULT_REGION=${AWS_DEFAULT_REGION:-us-east-1}
    volumes:
      - ./libs/tradai-common/src:/app/libs/tradai-common/src:ro
      - ./libs/tradai-data/src:/app/libs/tradai-data/src:ro
    networks:
      - tradai-network
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8002/health"]
      interval: 30s
      timeout: 5s
      retries: 3
  # ===========================================
  # MLflow Tracking Server
  # ===========================================
  mlflow:
    image: ghcr.io/mlflow/mlflow:v2.18.0
    container_name: tradai-mlflow
    ports:
      - "5000:5000"
    volumes:
      - mlflow-data:/mlflow
    # sqlite:////mlflow/... (four slashes) is an absolute path inside the volume.
    # --artifacts-destination makes the server proxy artifact uploads, so other
    # containers can log artifacts without sharing the /mlflow filesystem.
    command: >
      mlflow server
      --host 0.0.0.0
      --port 5000
      --backend-store-uri sqlite:////mlflow/mlflow.db
      --artifacts-destination /mlflow/artifacts
    networks:
      - tradai-network
    healthcheck:
      # python is always present in the MLflow image; curl may not be
      test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:5000/health')"]
      interval: 30s
      timeout: 5s
      retries: 3
  # ===========================================
  # PostgreSQL (for local MLflow/testing)
  # ===========================================
  postgres:
    image: postgres:15-alpine
    container_name: tradai-postgres
    ports:
      - "5432:5432"
    environment:
      - POSTGRES_USER=tradai
      - POSTGRES_PASSWORD=tradai_dev
      - POSTGRES_DB=tradai
    volumes:
      - postgres-data:/var/lib/postgresql/data
    networks:
      - tradai-network
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U tradai"]
      interval: 10s
      timeout: 5s
      retries: 5
    profiles:
      - full  # Only start with: docker compose --profile full up
  # ===========================================
  # Redis (for caching - optional)
  # ===========================================
  redis:
    image: redis:7-alpine
    container_name: tradai-redis
    ports:
      - "6379:6379"
    volumes:
      - redis-data:/data
    networks:
      - tradai-network
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 5s
      retries: 5
    profiles:
      - full  # Only start with: docker compose --profile full up
networks:
  tradai-network:
    driver: bridge
    name: tradai-network
volumes:
  mlflow-data:
  postgres-data:
  redis-data:
11.3 Docker Compose Override for Development
# docker-compose.override.yaml (auto-loaded by docker compose when present)
services:
  backend:
    # Uses the runtime stage, which sets ENTRYPOINT and PYTHONPATH; the builder
    # stage has neither, so "serve" would not resolve there
    environment:
      - LOG_LEVEL=debug
      - RELOAD=true
    command: ["serve", "--host", "0.0.0.0", "--reload"]
    volumes:
      # Mount source read-write for hot reload
      - ./libs/tradai-common/src:/app/libs/tradai-common/src
      - ./libs/tradai-backend/src:/app/libs/tradai-backend/src
  data:
    environment:
      - LOG_LEVEL=debug
      - RELOAD=true
    command: ["serve", "--host", "0.0.0.0", "--reload"]
    volumes:
      - ./libs/tradai-common/src:/app/libs/tradai-common/src
      - ./libs/tradai-data/src:/app/libs/tradai-data/src
11.4 Local Development Commands
# ===========================================
# Docker Compose Commands
# ===========================================
# Start all services (with hot reload in dev)
docker compose up
# Start specific services
docker compose up backend data
# Start in background
docker compose up -d
# Start with full stack (postgres, redis)
docker compose --profile full up
# Stop all services
docker compose down
# Rebuild and start
docker compose up --build
# View logs
docker compose logs -f backend
docker compose logs -f data
# Execute command in running container
docker compose exec backend python -m tradai.backend health
# ===========================================
# Without Docker (CLI directly)
# ===========================================
# Install dependencies
uv sync
# Run services directly
uv run python -m tradai.data serve --port 8002
uv run python -m tradai.backend serve --port 8000 --reload
# Run with specific config
LOG_LEVEL=debug uv run python -m tradai.data serve
# Run tests
uv run pytest libs/tradai-data/tests/
uv run pytest libs/tradai-backend/tests/
# ===========================================
# Mixed Mode (some Docker, some local)
# ===========================================
# Start only MLflow in Docker
docker compose up mlflow -d
# Run backend locally pointing to Docker MLflow
MLFLOW_TRACKING_URI=http://localhost:5000 uv run python -m tradai.backend serve
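In mixed mode, the locally run backend can start before the MLflow container is ready. A small stdlib poller can wait for a /health endpoint first. The function name and timeouts below are illustrative and not part of the TradAI CLI:

```python
import time
import urllib.request


def wait_for_http(url: str, timeout: float = 30.0, interval: float = 0.5) -> bool:
    """Poll `url` until it answers with HTTP 2xx or `timeout` seconds elapse."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            with urllib.request.urlopen(url, timeout=interval) as response:
                if 200 <= response.status < 300:
                    return True
        except OSError:
            # URLError/HTTPError subclass OSError: refused, reset, timeout, or non-2xx
            pass
        time.sleep(interval)
    return False
```

For example, call `wait_for_http("http://localhost:5000/health", timeout=60)` after `docker compose up mlflow -d` and before starting the backend.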
12. Application Factory Pattern
12.1 FastAPI App Factory
Each service uses an application factory for flexible configuration:
# libs/tradai-{service}/src/tradai/{service}/app.py
"""FastAPI application factory."""
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from tradai.{service}.config import Settings
from tradai.{service}.api.routes import router
from tradai.{service}.api.deps import get_settings
def create_app(settings: Settings | None = None) -> FastAPI:
    """
    Create and configure FastAPI application.
    Args:
        settings: Optional settings override (for testing)
    Returns:
        Configured FastAPI application
    """
    if settings is None:
        settings = Settings()
    @asynccontextmanager
    async def lifespan(app: FastAPI):
        """Application lifespan handler."""
        # Startup
        app.state.settings = settings
        yield
        # Shutdown
        # Cleanup resources here
    app = FastAPI(
        title=f"TradAI {settings.service_name}",
        description=settings.description,
        version=settings.version,
        lifespan=lifespan,
        docs_url="/docs" if settings.debug else None,
        redoc_url="/redoc" if settings.debug else None,
    )
    # Middleware
    app.add_middleware(
        CORSMiddleware,
        allow_origins=settings.cors_origins,
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )
    # Routes
    app.include_router(router)
    # Dependency override for settings
    app.dependency_overrides[get_settings] = lambda: settings
    return app
12.2 Service Configuration
# libs/tradai-{service}/src/tradai/{service}/config.py
"""Service configuration."""
from typing import Literal
from pydantic import field_validator
from pydantic_settings import SettingsConfigDict
from tradai.common import Settings as BaseSettings
class Settings(BaseSettings):
    """Configuration for {service}."""
    # pydantic-settings v2 matches env vars to field names case-insensitively
    # (HOST -> host, PORT -> port, ...) unless the base class sets an env_prefix,
    # so the v1-style Field(env=...) and class Config are not needed
    model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8")
    # Service identity
    service_name: str = "{service}"
    description: str = "TradAI {Service}"
    version: str = "0.1.0"
    # Server settings
    host: str = "0.0.0.0"
    port: int = 8000
    debug: bool = False
    log_level: Literal["debug", "info", "warning", "error"] = "info"
    # CORS (CORS_ORIGINS must be a JSON list, e.g. '["http://localhost:3000"]')
    cors_origins: list[str] = ["http://localhost:3000", "http://localhost:8000"]
    # Service-specific settings (override in subclass)
    # ...
    @field_validator("log_level", mode="before")
    @classmethod
    def _lowercase_log_level(cls, value):
        # Accept LOG_LEVEL=INFO (as in the ECS task definition) as well as "info"
        return value.lower() if isinstance(value, str) else value
14. Orphan Job Detection (v9.2)
CRITICAL: Jobs can become orphaned (stuck in RUNNING) if the container crashes before updating status. This section defines the detection and recovery mechanism.
14.1 Problem Statement
Container Crash Scenarios:
├── OOM Kill (memory exhausted)
├── Spot interruption (2-minute warning, but the status update may not complete in time)
├── Network partition (can't reach DynamoDB)
├── Unhandled exception before finally block
└── ECS task killed (deployment, scaling)
Result: Job stays RUNNING in DynamoDB indefinitely
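14.2 CloudWatch Alarm for Orphan Detection
# Pulumi/CloudFormation: Orphan Job Detection Alarm
OrphanJobAlarm:
  Type: AWS::CloudWatch::Alarm
  Properties:
    AlarmName: tradai-orphan-jobs
    AlarmDescription: "Jobs stuck in RUNNING state for >2x expected duration"
    MetricName: OrphanedJobs
    Namespace: TradAI/Backtests
    # Must match the dimensions the scanner publishes, or the alarm never sees data
    Dimensions:
      - Name: Environment
        Value: !Ref Environment  # assumed stack parameter (e.g. dev/prod)
    Statistic: Sum
    # The scanner publishes once every 15 minutes; with 5-minute periods and
    # 2 evaluation periods, one period would always be missing (notBreaching)
    Period: 900  # 15 minutes
    EvaluationPeriods: 1
    Threshold: 1
    ComparisonOperator: GreaterThanOrEqualToThreshold
    TreatMissingData: notBreaching
    AlarmActions:
      - !Ref AlertTopic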
14.3 Lambda: Orphan Job Scanner
Runs every 15 minutes via EventBridge. It finds jobs still RUNNING whose created_at is older than MAX_RUNNING_MINUTES (default 90), marks them FAILED, publishes the OrphanedJobs metric, and sends an SNS alert.
# lambda/orphan_scanner/index.py
import os
from datetime import datetime, timedelta
import boto3
from botocore.exceptions import ClientError
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(os.environ['WORKFLOW_STATE_TABLE'])
cloudwatch = boto3.client('cloudwatch')
sns = boto3.client('sns')
# Configuration
MAX_RUNNING_MINUTES = int(os.environ.get('MAX_RUNNING_MINUTES', '90'))  # 1.5 hours
ALERT_TOPIC_ARN = os.environ.get('ALERT_TOPIC_ARN')
def query_stale_running_jobs(threshold_iso):
    """Return every RUNNING job created before threshold_iso, following pagination."""
    kwargs = {
        'IndexName': 'status-created_at-index',
        'KeyConditionExpression': '#s = :status AND created_at < :threshold',
        'ExpressionAttributeNames': {'#s': 'status'},
        'ExpressionAttributeValues': {':status': 'RUNNING', ':threshold': threshold_iso},
    }
    items = []
    while True:
        response = table.query(**kwargs)
        items.extend(response.get('Items', []))
        if 'LastEvaluatedKey' not in response:
            return items
        kwargs['ExclusiveStartKey'] = response['LastEvaluatedKey']
def handler(event, context):
    """Scan for orphaned jobs, mark them FAILED, and emit metrics/alerts."""
    threshold_iso = (datetime.utcnow() - timedelta(minutes=MAX_RUNNING_MINUTES)).isoformat()
    candidates = query_stale_running_jobs(threshold_iso)
    # Mark orphaned jobs as FAILED. GSI reads are eventually consistent, so the
    # condition skips jobs that finished after the query instead of overwriting them.
    marked = []
    for job in candidates:
        run_id = job['run_id']
        running_since = job.get('updated_at', job.get('created_at'))
        try:
            table.update_item(
                Key={'run_id': run_id},
                UpdateExpression='SET #s = :s, #e = :e, updated_at = :t',
                ConditionExpression='#s = :running',
                ExpressionAttributeNames={'#s': 'status', '#e': 'error'},
                ExpressionAttributeValues={
                    ':s': 'FAILED',
                    ':running': 'RUNNING',
                    ':e': f'Orphaned: RUNNING since {running_since}, exceeded {MAX_RUNNING_MINUTES}min threshold',
                    ':t': datetime.utcnow().isoformat(),
                },
            )
        except ClientError as err:
            if err.response['Error']['Code'] == 'ConditionalCheckFailedException':
                continue  # job changed state since the query; not an orphan
            raise
        marked.append(run_id)
        print(f"Marked orphaned job as FAILED: {run_id}")
    orphan_count = len(marked)
    # Emit CloudWatch metric (the alarm must filter on the same Environment dimension)
    cloudwatch.put_metric_data(
        Namespace='TradAI/Backtests',
        MetricData=[{
            'MetricName': 'OrphanedJobs',
            'Value': orphan_count,
            'Unit': 'Count',
            'Dimensions': [
                {'Name': 'Environment', 'Value': os.environ.get('ENVIRONMENT', 'dev')},
            ],
        }],
    )
    # Send alert if orphans found
    if orphan_count > 0 and ALERT_TOPIC_ARN:
        job_list = '\n'.join(f'- {job_id}' for job_id in marked)
        sns.publish(
            TopicArn=ALERT_TOPIC_ARN,
            Subject=f'[TradAI] {orphan_count} Orphaned Jobs Detected',
            Message=f'''
Orphan Job Scanner detected {orphan_count} job(s) stuck in RUNNING state.
Jobs have been automatically marked as FAILED.
Job IDs:
{job_list}
Threshold: {MAX_RUNNING_MINUTES} minutes
Scan Time: {datetime.utcnow().isoformat()}
Action Required: Investigate why containers are not completing status updates.
Possible causes:
- Container OOM kills
- Spot instance interruptions
- Network issues preventing DynamoDB access
- Unhandled exceptions in entrypoint
''',
        )
    return {
        'orphan_count': orphan_count,
        'jobs_marked_failed': marked,
    }
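The selection rule can be unit-tested without AWS by isolating it in a pure function. find_orphans below is a hypothetical helper. It applies the same test as the GSI query (status is RUNNING and created_at is older than the threshold) to plain dictionaries:

```python
from datetime import datetime, timedelta


def find_orphans(items: list[dict], now: datetime, max_running_minutes: int = 90) -> list[str]:
    """Return run_ids of RUNNING items whose created_at is older than the threshold.

    Timestamps are compared as ISO-8601 strings, as the DynamoDB key condition
    does, so every item must use the same format (naive UTC here).
    """
    threshold_iso = (now - timedelta(minutes=max_running_minutes)).isoformat()
    return [
        item["run_id"]
        for item in items
        if item.get("status") == "RUNNING" and item["created_at"] < threshold_iso
    ]


now = datetime(2025, 12, 9, 12, 0, 0)
jobs = [
    {"run_id": "a", "status": "RUNNING", "created_at": "2025-12-09T09:00:00"},
    {"run_id": "b", "status": "RUNNING", "created_at": "2025-12-09T11:30:00"},
    {"run_id": "c", "status": "COMPLETED", "created_at": "2025-12-09T08:00:00"},
]
print(find_orphans(jobs, now))  # ['a']
```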
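14.4 EventBridge Rule
OrphanScannerSchedule:
  Type: AWS::Events::Rule
  Properties:
    Name: tradai-orphan-scanner-schedule
    Description: "Trigger orphan job scanner every 15 minutes"
    ScheduleExpression: "rate(15 minutes)"
    State: ENABLED
    Targets:
      - Id: OrphanScannerLambda
        Arn: !GetAtt OrphanScannerLambda.Arn
# EventBridge also needs permission to invoke the function
OrphanScannerInvokePermission:
  Type: AWS::Lambda::Permission
  Properties:
    FunctionName: !Ref OrphanScannerLambda
    Action: lambda:InvokeFunction
    Principal: events.amazonaws.com
    SourceArn: !GetAtt OrphanScannerSchedule.Arn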
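14.5 Lambda Configuration
| Parameter | Value |
|---|---|
| Function Name | tradai-orphan-scanner |
| Runtime | Python 3.11 |
| Memory | 128 MB |
| Timeout | 60s |
| VPC | Optional: DynamoDB, CloudWatch, and SNS are reachable without a VPC. If VPC-attached, add a DynamoDB gateway endpoint plus a NAT gateway or interface endpoints for CloudWatch and SNS |
| Trigger | EventBridge (every 15 min) |
| Cost | ~$0.05/month |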
14.6 DynamoDB GSI Requirement
The orphan scanner requires a Global Secondary Index for efficient querying:
# Already defined in 10-CANONICAL-CONFIG.md
GlobalSecondaryIndex:
  IndexName: status-created_at-index
  KeySchema:
    - AttributeName: status
      KeyType: HASH
    - AttributeName: created_at
      KeyType: RANGE
  Projection:
    ProjectionType: ALL
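created_at is the index's range key, and the scanner compares it as a string. ISO-8601 strings sort chronologically only when they share one layout, so every writer must use the same format. The stdlib sketch below shows one normalization. The chosen format is an assumption: naive UTC with fixed microsecond precision, which matches datetime.utcnow().isoformat() whenever microseconds are non-zero:

```python
from datetime import datetime, timedelta, timezone


def to_sort_key(dt: datetime) -> str:
    """Normalize to naive UTC with fixed microsecond precision.

    Aware datetimes are converted to UTC; naive ones are assumed to be UTC already.
    """
    if dt.tzinfo is not None:
        dt = dt.astimezone(timezone.utc).replace(tzinfo=None)
    return dt.isoformat(timespec="microseconds")


# The same instant written with two offsets: the raw strings misorder it.
utc_form = datetime(2025, 12, 9, 10, 0, tzinfo=timezone.utc)
plus_two = datetime(2025, 12, 9, 12, 0, tzinfo=timezone(timedelta(hours=2)))
print(utc_form.isoformat() < plus_two.isoformat())  # True, although the instants are equal
print(to_sort_key(utc_form) == to_sort_key(plus_two))  # True
print(to_sort_key(plus_two))  # 2025-12-09T10:00:00.000000
```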
Next Steps
- Review 06-STEP-FUNCTIONS.md for workflow orchestration
- Review 09-PULUMI-CODE.md for infrastructure code
- Use cookiecutter-tradai-service to scaffold new services
- Run locally with docker compose up or python -m tradai.{service} serve