TradAI Observability Architecture

Version: 1.0.0 | Date: 2026-03-28 | Status: CURRENT


1. TL;DR

TradAI observability is built on four pillars: structured logging via LoggerMixin, metrics collection from Freqtrade into DynamoDB and CloudWatch, health monitoring through a protocol-based checker framework with async concurrent execution, and risk monitoring via a pure-evaluator RiskMonitor that triggers circuit-breaker actions. End-to-end traceability is achieved through a trace_id correlation field that flows from job submission through Step Functions, ECS tasks, DynamoDB records, and MLflow experiment tags. CloudWatch provides the unified alerting and dashboarding layer with custom metric namespaces, composite alarms, and a pre-built operations dashboard.


2. Logging Architecture

LoggerMixin Pattern

All TradAI classes use LoggerMixin for structured logging via cooperative multiple inheritance. The mixin auto-initializes a logger named after the fully qualified class (module.ClassName).

Source: libs/tradai-common/src/tradai/common/logger_mixin.py

class LoggerMixin:
    LOGGER_FORMAT: str = "[%(asctime)s][%(name)s][%(levelname)s] - %(message)s"

Key behaviors:

  • Logger is auto-initialized in __init__ with fully qualified class name
  • Lazy re-initialization on first self.logger access (handles unpickling)
  • Pickle-safe: logger is excluded from __getstate__ and re-created in __setstate__
  • Hydra integration via get_hydra_logfile_dir_and_path() for file-based logging
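The behaviors above can be sketched roughly as follows. This is a minimal illustration of the pattern, not the actual tradai-common implementation (`ExampleStrategy` is purely for demonstration):

```python
import logging


class LoggerMixin:
    """Minimal sketch of the LoggerMixin pattern: fully qualified logger
    name, pickle safety, and lazy re-initialization on access."""

    LOGGER_FORMAT: str = "[%(asctime)s][%(name)s][%(levelname)s] - %(message)s"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)  # cooperative multiple inheritance
        self._logger = self._make_logger()

    def _make_logger(self) -> logging.Logger:
        # Logger named after the fully qualified class: module.ClassName
        cls = type(self)
        return logging.getLogger(f"{cls.__module__}.{cls.__qualname__}")

    @property
    def logger(self) -> logging.Logger:
        # Lazy re-initialization: covers instances restored by unpickling
        if getattr(self, "_logger", None) is None:
            self._logger = self._make_logger()
        return self._logger

    def __getstate__(self):
        # Pickle-safe: the logger (and its handlers) never enters the pickle
        state = self.__dict__.copy()
        state.pop("_logger", None)
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)
        self._logger = None  # re-created on first self.logger access


class ExampleStrategy(LoggerMixin):
    pass
```

The property-based lazy path is what makes the mixin survive a pickle round trip: the restored instance has no logger until the first `self.logger` access recreates it.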

Log Level Convention

  • DEBUG: Internal diagnostics (heartbeat counts, metric parse details)
  • INFO: Key operations (status updates, MLflow logging, strategy start/stop)
  • WARNING: Recoverable failures (heartbeat fails, metric collection fails)
  • ERROR: Unrecoverable failures (health check exceptions, critical breaches)
  • Never log secrets or PII.

CloudWatch Log Groups

All log groups use the canonical retention policy from infra/shared/tradai_infra_shared/config.py:

LOG_RETENTION_DAYS = 30 if ENVIRONMENT != "prod" else 90

| Log Group Pattern | Source | Retention |
| --- | --- | --- |
| /ecs/tradai/{env} | ECS cluster (shared) | 30d / 90d prod |
| /ecs/tradai/{env}/services | ECS service-level logs | 30d / 90d prod |
| /aws/lambda/tradai-{func}-{env} | Lambda functions (18 handlers) | 30d / 90d prod |
| /aws/states/tradai-backtest-workflow-{env} | Step Functions execution logs | 30d / 90d prod |
| aws-waf-logs-tradai-{env} | WAF request logs | 30d / 90d prod |
| /aws/cloudtrail/tradai | CloudTrail audit logs | 30d / 90d prod |
| /tradai/consolidated/user-data | EC2 consolidated bootstrap | 7d |
| /tradai/consolidated/containers | EC2 Docker container logs | 7d |

3. Metrics Collection

The metrics pipeline collects real-time PnL and trade data from Freqtrade during live trading, persists snapshots to DynamoDB, and publishes derived metrics to CloudWatch.

Source files:

  • libs/tradai-common/src/tradai/common/health/metrics_collector.py
  • libs/tradai-common/src/tradai/common/health/freqtrade_client.py
  • libs/tradai-common/src/tradai/common/health/reporter.py

Collection Flow

sequenceDiagram
    participant HR as HealthReporter (30s)
    participant MC as MetricsCollector
    participant FT as Freqtrade API
    participant DB as DynamoDB

    HR->>MC: collect_all()
    MC->>FT: GET /api/v1/profit
    FT-->>MC: profit_data (ratios)
    MC->>FT: GET /api/v1/status
    FT-->>MC: open_trades[]
    MC-->>HR: (StrategyPnL, list[LiveTrade])
    HR->>DB: update_metrics(strategy_id, pnl, trades)
    HR->>HR: _check_risk(pnl, trades)

MetricsCollector

MetricsCollector transforms Freqtrade API responses into typed entities:

  • GET /api/v1/profit produces StrategyPnL with: realized_pnl_pct, drawdown_pct, open_trades, closed_trades, timestamp
  • GET /api/v1/status produces list[LiveTrade] with: pair, is_open, is_short, open_rate, stake_amount, profit_pct, leverage
  • Unrealized PnL is computed as stake-weighted average of open trade profits

Ratio to Percentage Conversion

Freqtrade returns ratios (0.0-1.0). MetricsCollector multiplies by 100 for all *_pct fields. This conversion happens exactly once at the collection boundary.
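A sketch of the stake-weighted unrealized-PnL computation combined with the single ratio-to-percent conversion. The field names (`stake_amount`, `profit_ratio`) are illustrative, not the exact LiveTrade schema:

```python
def unrealized_pnl_pct(open_trades: list[dict]) -> float:
    """Stake-weighted average of open-trade profit ratios, converted to
    percent exactly once at the collection boundary."""
    total_stake = sum(t["stake_amount"] for t in open_trades)
    if total_stake == 0:
        return 0.0  # no open trades, no unrealized PnL
    weighted = sum(t["stake_amount"] * t["profit_ratio"] for t in open_trades)
    return (weighted / total_stake) * 100  # ratio -> percent, once
```

For example, a 100-stake trade at +2% and a 300-stake trade at -1% average out to -0.25%, not +0.5%: larger positions dominate the aggregate.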

FreqtradeAPIClient

The FreqtradeAPIClient wraps the Freqtrade REST API (localhost:8080) with JWT auth. It provides get_profit(), get_open_trades(), get_balance(), pause(), and resume(). All calls use 5-second timeouts and are best-effort: failures are logged, never raised.
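The best-effort pattern can be sketched with the standard library. The real client's JWT negotiation lives in freqtrade_client.py; the pre-obtained token here is a simplification, and the class name is illustrative:

```python
import json
import logging
import urllib.request

log = logging.getLogger(__name__)


class BestEffortFreqtradeClient:
    """Sketch of the best-effort call pattern: 5-second timeouts,
    failures logged and swallowed, never raised to the caller."""

    def __init__(self, base_url: str = "http://localhost:8080", token: str = ""):
        self.base_url = base_url
        self.token = token  # the real client negotiates a JWT via the API

    def _get(self, path: str):
        req = urllib.request.Request(
            f"{self.base_url}/api/v1/{path}",
            headers={"Authorization": f"Bearer {self.token}"},
        )
        try:
            with urllib.request.urlopen(req, timeout=5) as resp:
                return json.loads(resp.read())
        except (OSError, ValueError) as exc:  # URLError subclasses OSError
            log.warning("Freqtrade call %s failed: %s", path, exc)
            return None  # best-effort: caller sees None, never an exception

    def get_profit(self):
        return self._get("profit")

    def get_open_trades(self):
        return self._get("status")
```

Returning None instead of raising keeps the heartbeat loop alive when Freqtrade is briefly unreachable; the fail-closed logic in Section 5 handles prolonged outages.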


4. Health Monitoring

The health monitoring framework uses a protocol-based architecture with async concurrent execution and three-state health status (HEALTHY, DEGRADED, UNHEALTHY).

Module Files

| File | Purpose |
| --- | --- |
| health/__init__.py | Module entry point with lazy loading |
| health/protocols.py | HealthChecker protocol (@runtime_checkable) |
| health/base.py | BaseHealthChecker with latency measurement and error handling |
| health/aws_checkers.py | AWSHealthChecker base with lazy boto3 client management |
| health/checkers.py | Pre-built checkers: Redis, HTTP, Database, DynamoDB, SQS, S3, MLflow, SNS |
| health/service.py | HealthService aggregator with async concurrent checks |
| health/reporter.py | HealthReporter background heartbeat thread |
| health/metrics_collector.py | MetricsCollector for Freqtrade PnL/trade data |
| health/risk_monitor.py | RiskMonitor pure evaluator for risk limits |
| health/freqtrade_client.py | FreqtradeAPIClient for Freqtrade REST API |

Architecture Flowchart

flowchart TB
    subgraph "HTTP Health Endpoint"
        HS[HealthService<br/>async concurrent]
        HS --> RC[RedisHealthChecker]
        HS --> HC[HTTPHealthChecker]
        HS --> DC[DatabaseHealthChecker]
        HS --> DDB[DynamoDBHealthChecker]
        HS --> SQC[SQSHealthChecker]
        HS --> S3C[S3HealthChecker]
        HS --> MLC[MLflowHealthChecker<br/>critical=False]
        HS --> SNSC[SNSHealthChecker]
    end

    subgraph "Background Heartbeat"
        HR[HealthReporter<br/>30s interval thread]
        HR -->|collect_all| MC[MetricsCollector]
        HR -->|record_heartbeat| DY[(DynamoDB<br/>trading-state)]
        HR -->|evaluate| RM[RiskMonitor]
        HR -->|pause/resume| FT[FreqtradeAPIClient]
        HR -->|send_critical| AS[AlertService<br/>SNS]
    end

    subgraph "Aggregated Result"
        HS -->|HealthResult| HR2{Status}
        HR2 -->|all critical OK| HEALTHY[HEALTHY]
        HR2 -->|non-critical failing| DEGRADED[DEGRADED]
        HR2 -->|critical failing| UNHEALTHY[UNHEALTHY]
    end

HealthService

HealthService runs all registered HealthChecker instances concurrently via asyncio.gather (timeout: 5s). The aggregate status is HEALTHY (all checks pass), DEGRADED (only non-critical checks failing), or UNHEALTHY (any critical check failing).
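A sketch of the aggregation logic, assuming checkers expose an async check() returning a simple result; the real protocol in health/protocols.py is richer, and StubChecker exists only for illustration:

```python
import asyncio
from dataclasses import dataclass
from enum import Enum


class HealthStatus(Enum):
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"


@dataclass
class CheckResult:
    name: str
    ok: bool
    critical: bool


class StubChecker:
    """Stand-in for a real HealthChecker, used for illustration only."""

    def __init__(self, name: str, ok: bool, critical: bool = True):
        self.name, self.ok, self.critical = name, ok, critical

    async def check(self) -> CheckResult:
        return CheckResult(self.name, self.ok, self.critical)


async def run_checks(checkers) -> HealthStatus:
    # Run every checker concurrently, bounded by a 5s overall timeout
    results = await asyncio.wait_for(
        asyncio.gather(*(c.check() for c in checkers)), timeout=5
    )
    if any(not r.ok and r.critical for r in results):
        return HealthStatus.UNHEALTHY   # a critical dependency is down
    if any(not r.ok for r in results):
        return HealthStatus.DEGRADED    # only non-critical failures
    return HealthStatus.HEALTHY
```

The concurrent gather means total latency is bounded by the slowest single check, not the sum of all checks.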

HealthReporter

HealthReporter is a background non-daemon thread running a 30-second heartbeat loop: (1) collect metrics from Freqtrade, (2) send heartbeat and PnL snapshots to DynamoDB, (3) check risk via RiskMonitor, (4) check the pause/resume state in DynamoDB for operator control.

Alerting on Heartbeat Failures

After 3 consecutive failures (configurable), an SNS alert fires via AlertService. Only one alert is sent per failure run to avoid alert storms; the failure counter resets on the next successful heartbeat.
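The counter logic can be sketched as follows. The send_alert callback stands in for AlertService.send_critical, and the class name is illustrative:

```python
class HeartbeatAlerter:
    """Sketch of the consecutive-failure alert logic: fire one alert per
    failure run once the threshold is reached, reset on success."""

    def __init__(self, send_alert, threshold: int = 3):
        self.send_alert = send_alert
        self.threshold = threshold
        self.failures = 0
        self.alerted = False

    def record(self, success: bool) -> None:
        if success:
            self.failures = 0
            self.alerted = False  # the next failure run may alert again
            return
        self.failures += 1
        if self.failures >= self.threshold and not self.alerted:
            self.send_alert(f"{self.failures} consecutive heartbeat failures")
            self.alerted = True   # one alert per run: no storms
```

Note that a run of ten failures still produces exactly one alert; only an intervening success re-arms the alerter.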

Pre-built Health Checkers

| Checker | Check Method | Critical |
| --- | --- | --- |
| RedisHealthChecker | PING command | Yes |
| HTTPHealthChecker | GET + status code validation | Yes |
| DatabaseHealthChecker | Async ping callable | Yes |
| DynamoDBHealthChecker | DescribeTable (ACTIVE) | Yes |
| SQSHealthChecker | GetQueueAttributes | Yes |
| S3HealthChecker | HeadBucket | Yes |
| SNSHealthChecker | GetTopicAttributes | Yes |
| MLflowHealthChecker | is_available() via thread pool | No |

AWS checkers inherit AWSHealthChecker which provides lazy boto3 client caching, LocalStack endpoint compatibility, and short timeouts (3s connect, 3s read, no retries).
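The lazy-caching idea can be sketched with an injected factory standing in for boto3.client, keeping the example dependency-free. In the real base class a botocore Config carries the 3s connect/read timeouts and disabled retries; the class name here is illustrative:

```python
class LazyAWSChecker:
    """Illustrative sketch of lazy, cached boto3-style client creation.
    client_factory stands in for boto3.client."""

    def __init__(self, service_name, client_factory, endpoint_url=None):
        self.service_name = service_name
        self.endpoint_url = endpoint_url  # e.g. a LocalStack endpoint
        self._client_factory = client_factory
        self._client = None

    @property
    def client(self):
        if self._client is None:  # created once, on first use
            self._client = self._client_factory(
                self.service_name, endpoint_url=self.endpoint_url
            )
        return self._client
```

Deferring client creation keeps checker construction cheap (no credential resolution at import time) and lets tests inject fakes through the factory.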


5. Risk Monitoring

The RiskMonitor is a pure evaluator with no I/O side effects. It receives pre-collected metrics and returns a RiskCheckResult indicating whether any limits are breached.

Source files:

  • libs/tradai-common/src/tradai/common/health/risk_monitor.py
  • libs/tradai-common/src/tradai/common/entities/risk_limits.py

RiskLimits Configuration

class RiskLimits(BaseModel):
    max_drawdown_pct: float = 20.0    # 1.0 - 100.0 (deployment: 1.0 - 50.0)
    max_open_trades: int = 5          # 1 - 50 (deployment: 1 - 20)
    max_leverage: float = 3.0         # 1.0 - 125.0 (deployment: 1.0 - 10.0)
    action_on_breach: str = "pause"   # "pause" or "stop"

Deployment-Safe Bounds

RiskLimits enforces stricter deployment bounds (via validate_deployment_bounds()) than the entity validation bounds. This guards against misconfiguration before any container runs.
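A sketch of what such a bounds check might look like. The real validate_deployment_bounds() lives on the Pydantic entity; this dependency-free version only illustrates the idea, using the deployment ranges from the comments above:

```python
def validate_deployment_bounds(limits: dict) -> list[str]:
    """Return a list of violations of the stricter deployment bounds.
    Entity validation allows wider ranges (e.g. max_drawdown_pct up to
    100.0), but deployments are rejected outside these tighter ones."""
    bounds = {
        "max_drawdown_pct": (1.0, 50.0),
        "max_open_trades": (1, 20),
        "max_leverage": (1.0, 10.0),
    }
    errors = []
    for field, (lo, hi) in bounds.items():
        value = limits[field]
        if not (lo <= value <= hi):
            errors.append(f"{field}={value} outside deployment bounds [{lo}, {hi}]")
    return errors
```

Running the check at deploy time, before any container starts, means a fat-fingered limit (say, max_drawdown_pct=80) is caught as a config error rather than discovered during a live breach.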

Evaluation Order

RiskMonitor evaluates checks in order: (1) drawdown vs max_drawdown_pct, (2) open trade count vs max_open_trades, (3) maximum trade leverage vs max_leverage. It also fails closed: after 5 consecutive metric-collection failures it raises a monitoring_failure breach, so trading never continues blind.
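A minimal pure-evaluator sketch of this ordering and the fail-closed behavior. Entity shapes are simplified to dicts for illustration; the real evaluator consumes StrategyPnL and LiveTrade objects:

```python
from dataclasses import dataclass, field


@dataclass
class RiskCheckResult:
    breached: bool
    reasons: list = field(default_factory=list)


class RiskMonitor:
    """Pure-evaluator sketch: no I/O, checks in the documented order,
    fails closed after 5 consecutive metric-collection failures."""

    MAX_METRIC_FAILURES = 5

    def __init__(self, max_drawdown_pct=20.0, max_open_trades=5, max_leverage=3.0):
        self.max_drawdown_pct = max_drawdown_pct
        self.max_open_trades = max_open_trades
        self.max_leverage = max_leverage
        self._metric_failures = 0

    def evaluate(self, pnl, trades) -> RiskCheckResult:
        # pnl: dict with drawdown_pct, or None when collection failed;
        # trades: dicts with a leverage field (illustrative shapes)
        if pnl is None:
            self._metric_failures += 1
            if self._metric_failures >= self.MAX_METRIC_FAILURES:
                return RiskCheckResult(True, ["monitoring_failure"])
            return RiskCheckResult(False)
        self._metric_failures = 0  # successful collection resets the counter
        reasons = []
        if pnl["drawdown_pct"] > self.max_drawdown_pct:
            reasons.append("max_drawdown")
        if len(trades) > self.max_open_trades:
            reasons.append("max_open_trades")
        if trades and max(t["leverage"] for t in trades) > self.max_leverage:
            reasons.append("max_leverage")
        return RiskCheckResult(bool(reasons), reasons)
```

Because the evaluator performs no I/O, it is trivially unit-testable; the HealthReporter owns all side effects (pause, alerts, DynamoDB writes).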

Circuit Breaker Flow

flowchart LR
    RM[RiskMonitor.evaluate] -->|breached=True| BA{action_on_breach}
    BA -->|pause| FTP[FreqtradeAPIClient.pause]
    BA -->|stop| SC[shutdown_callback]
    SC -->|fallback on error| FTP
    FTP --> DDB[DynamoDB: status=PAUSED]
    DDB --> SNS[AlertService.send_critical]

    RM -->|breached=False| REC{Was previously breached?}
    REC -->|Yes| RECA[AlertService.send_info<br/>Recovery notification]
    REC -->|No| NOP[No action]

Breach actions execute once per breach run: "stop" calls shutdown_callback (falls back to pause), "pause" calls FreqtradeAPIClient.pause(). DynamoDB status is set to PAUSED with breach details, and a critical SNS alert fires. On recovery, an info-level notification is sent.
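The dispatch-with-fallback step can be sketched as follows; the callables stand in for shutdown_callback, FreqtradeAPIClient.pause(), and the logger:

```python
def execute_breach_action(action: str, shutdown_callback, pause, log) -> str:
    """Sketch of the breach-action dispatch: 'stop' tries the shutdown
    callback and falls back to pause on error; 'pause' pauses directly."""
    if action == "stop":
        try:
            shutdown_callback()
            return "stopped"
        except Exception as exc:
            log(f"shutdown failed, falling back to pause: {exc}")
    pause()  # reached for action == "pause" or after a failed shutdown
    return "paused"
```

The fallback guarantees the circuit breaker always ends in a safe state: even if shutdown fails, trading is at minimum paused.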


6. Tracing & Correlation

Every backtest and training execution carries a unified trace context for end-to-end debugging.

Trace Context Fields

| Field | Set By | Stored In |
| --- | --- | --- |
| trace_id (E2E correlation UUID) | API Gateway / CLI | DynamoDB, Step Functions, ECS env, MLflow tags |
| job_id (DynamoDB PK) | Backend API | DynamoDB (run_id), S3, MLflow tags |
| mlflow_run_id | Strategy container | DynamoDB, BacktestResult |
| git_commit (7 or 40 char SHA) | get_git_commit() | BacktestResult, MLflow tags |

Trace ID Flow

flowchart LR
    API[API Gateway<br/>POST /backtests] -->|trace_id in SQS message| SQS[SQS FIFO Queue]
    SQS -->|BacktestJobMessage| SFN[Step Functions<br/>trace_id in input]
    SFN -->|TRACE_ID env var| ECS[ECS Task<br/>Strategy Container]
    ECS -->|trace_id tag| MLF[MLflow Run]
    ECS -->|trace_id field| DDB[(DynamoDB<br/>BacktestJobStatus)]
    DDB -->|trace_id-index GSI| QUERY[Correlation Queries]

Propagation chain: BacktestJobMessage.trace_id (SQS) -> Step Functions input ("trace_id") -> ECS env var (TRACE_ID) -> MLflow tag (trace_id) -> DynamoDB field (trace_id).
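Inside the container, picking up and re-attaching the trace context might look like this. The helper names (resolve_trace_id, trace_tags) and the local-run UUID fallback are illustrative, not the exact tradai helpers:

```python
import os
import uuid


def resolve_trace_id() -> str:
    """Read the TRACE_ID env var injected by the Step Functions -> ECS
    handoff, falling back to a fresh UUID for local runs."""
    return os.environ.get("TRACE_ID") or str(uuid.uuid4())


def trace_tags(trace_id: str, job_id: str, git_commit: str) -> dict:
    # The tags attached to the MLflow run and mirrored into DynamoDB
    return {"trace_id": trace_id, "job_id": job_id, "git_commit": git_commit}
```

Tagging every downstream artifact with the same trace_id is what makes a single correlation query recover the whole execution path.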

DynamoDB GSI Queries

The workflow-state table has two GSIs:

| GSI Name | Hash Key | Range Key | Use Case |
| --- | --- | --- | --- |
| status-created_at-index | status | created_at | List jobs by status |
| trace_id-index | trace_id | -- | E2E correlation lookup |
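A correlation lookup amounts to a standard DynamoDB query against the GSI. The dict below shows the parameters one would pass to a boto3 Table.query call, kept dependency-free for illustration:

```python
def trace_lookup_query(trace_id: str) -> dict:
    """Request parameters for querying the trace_id-index GSI on the
    workflow-state table (expression syntax is standard DynamoDB)."""
    return {
        "IndexName": "trace_id-index",
        "KeyConditionExpression": "trace_id = :tid",
        "ExpressionAttributeValues": {":tid": trace_id},
    }
```

With boto3 this would be invoked roughly as `table.query(**trace_lookup_query(tid))`, returning every job record that carries the given trace_id.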

7. MLflow Experiment Tracking

MLflow provides experiment tracking for backtest results, model versioning, and artifact storage.

Source files:

  • libs/tradai-common/src/tradai/common/mlflow/adapter.py
  • libs/tradai-common/src/tradai/common/mlflow/backtest_logger.py

Tracking URI

MLflow is accessed via Service Discovery:

http://mlflow.tradai-{env}.local:5000/mlflow

The MLflowAdapter uses the REST API with Basic Authentication and supports lazy initialization with double-checked locking for thread safety.

What Gets Logged

Metrics: total_trades, winning_trades, losing_trades, total_profit, total_profit_pct (required). Optional: sharpe_ratio, sortino_ratio, calmar_ratio, profit_factor, win_rate, max_drawdown, sqn, cagr, expectancy.

Tags: strategy, timeframe, exchange, start_date, end_date, symbols, trading_mode, job_id, trace_id, git_commit, logged_at.

Parameters: strategy, timeframe, start_date, end_date, symbols, exchange, stake_amount, stoploss (logged via log_parameters()).

Artifacts: configs/config.json -- full strategy configuration for reproducibility.
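The required/optional split can be sketched as a validation step ahead of the mlflow.log_metrics call. The function name prepare_backtest_metrics is illustrative, not the backtest_logger API:

```python
REQUIRED_METRICS = {
    "total_trades", "winning_trades", "losing_trades",
    "total_profit", "total_profit_pct",
}


def prepare_backtest_metrics(metrics: dict) -> dict:
    """Reject a payload missing any required metric; pass optional
    metrics (sharpe_ratio, sortino_ratio, ...) through unchanged."""
    missing = REQUIRED_METRICS - metrics.keys()
    if missing:
        raise ValueError(f"missing required metrics: {sorted(missing)}")
    return {k: float(v) for k, v in metrics.items()}  # MLflow metrics are floats
```

Validating before logging keeps partially failed backtests from producing MLflow runs that look complete but lack the core trade counts.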

MLflow Health Check

MLflowHealthChecker is non-critical (critical=False). If unavailable, the service enters DEGRADED status but continues operating.


8. CloudWatch Integration

CloudWatch serves as the centralized monitoring layer with custom metric namespaces, configurable alarms, and an operations dashboard.

Source files:

  • infra/edge/modules/cloudwatch_alarms.py
  • infra/edge/modules/cloudwatch_dashboard.py
  • infra/ami/files/cloudwatch-agent.json

Custom Metric Namespaces

| Namespace | Metrics | Published By |
| --- | --- | --- |
| tradai/ServiceHealth | ServiceHealthy, HealthCheckLatency, ConsecutiveFailures | Lambda health-check (every 2 min) |
| tradai/TradingHealth | ActiveStrategies, StaleHeartbeats, HealthyHeartbeats | Lambda trading-heartbeat-check (every 5 min) |
| tradai/InfraDrift | DriftDetected, ResourcesToCreate/Update/Delete, CheckSuccess | Lambda pulumi-drift-detector (every 6 hrs) |
| TradAI/Consolidated | disk_used_percent, mem_used_percent | CloudWatch Agent on EC2 (every 5 min) |
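For the tradai/ServiceHealth namespace, the health-check Lambda's payload to CloudWatch put_metric_data would be shaped roughly like this. The MetricData structure is standard CloudWatch; the Service dimension name and helper function are assumptions:

```python
from datetime import datetime, timezone


def service_health_payload(service: str, healthy: bool, latency_ms: float) -> dict:
    """Sketch of the kwargs a health-check Lambda could pass to
    cloudwatch_client.put_metric_data(**payload)."""
    ts = datetime.now(timezone.utc)
    dims = [{"Name": "Service", "Value": service}]  # dimension name assumed
    return {
        "Namespace": "tradai/ServiceHealth",
        "MetricData": [
            {"MetricName": "ServiceHealthy", "Value": 1 if healthy else 0,
             "Unit": "Count", "Dimensions": dims, "Timestamp": ts},
            {"MetricName": "HealthCheckLatency", "Value": latency_ms,
             "Unit": "Milliseconds", "Dimensions": dims, "Timestamp": ts},
        ],
    }
```

Publishing ServiceHealthy as a 0/1 gauge is what lets the alarms below use a simple `< 1 for 2 periods` threshold per service.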

CloudWatch Alarms

All alarms notify via SNS topic (tradai-alerts-{env}).

| Alarm | Metric / Namespace | Condition | Period | Missing Data |
| --- | --- | --- | --- | --- |
| Service Health (per service) | ServiceHealthy / ServiceHealth | < 1 for 2 periods | 120s | breaching (always-on) or notBreaching (on-demand) |
| Service Latency (per service) | HealthCheckLatency / ServiceHealth | > 5000ms for 3 periods | 120s | notBreaching |
| Stale Heartbeats | StaleHeartbeats / TradingHealth | >= 1 for 2 periods | 300s | notBreaching |
| Active Strategies | ActiveStrategies / TradingHealth | < configured min | 300s | notBreaching |
| Lambda Errors (per function) | Errors / AWS/Lambda | >= 1 | 300s | notBreaching |
| DLQ Messages | ApproximateNumberOfMessagesVisible / AWS/SQS | > 0 | 300s | notBreaching |
| Infrastructure Drift | DriftDetected / InfraDrift | >= 1 | 21600s | notBreaching |
| RDS CPU | CPUUtilization / AWS/RDS | > 80% for 2 periods | 300s | notBreaching |
| RDS Storage | FreeStorageSpace / AWS/RDS | < 2GB | 300s | notBreaching |
| API 5xx Rate | Math: 5xx / Count * 100 / AWS/ApiGateway | > 1% for 2 periods | 300s | notBreaching |
| Step Function Failures | ExecutionsFailed / AWS/States | > 0 | 300s | notBreaching |

On-Demand Service Handling

Services with desired_count=0 (live-trading, dry-run-trading, strategy-container) use treat_missing_data="notBreaching" so they do not trigger false alarms when not running. Note: strategy-service (desired_count=1) is always-on and uses treat_missing_data="breaching" like other always-on services.

Composite Alarm

A composite alarm (tradai-system-health-{env}) triggers when ANY of these fire:

  • Any critical service health alarm (backend-api, data-collection, mlflow, strategy-service, live-trading, dry-run-trading)
  • Stale heartbeats alarm
  • RDS CPU or storage alarms
  • API Gateway 5xx rate alarm
  • Step Functions failure alarms

Operations Dashboard

The CloudWatch dashboard (tradai-{env}) auto-refreshes every 60 seconds and is organized into four rows:

| Row | Widgets | Source |
| --- | --- | --- |
| 1 | ECS CPU, Memory, Running Task Count | AWS/ECS, ECS/ContainerInsights |
| 2 | Health Check Latency (p50/p95/p99), Service Health | tradai/ServiceHealth |
| 3 | Active Strategies, Stale/Healthy Heartbeats | tradai/TradingHealth |
| 4 | Alarm Status overview, Recent Logs (ERROR/WARNING) | Alarms, Logs Insights |

Monitored Lambda Functions

| Function | Schedule | Purpose |
| --- | --- | --- |
| orphan-scanner | 5 min | Detect orphaned ECS tasks |
| health-check | 2 min | Publish service health metrics |
| trading-heartbeat-check | 5 min | Publish trading heartbeat metrics |
| drift-monitor | 12 hrs | ML model/data drift detection |
| retraining-scheduler | 6 hrs | Schedule model retraining |
| pulumi-drift-detector | 6 hrs | Infrastructure drift detection |

Changelog

| Version | Date | Changes |
| --- | --- | --- |
| 1.0.0 | 2026-03-28 | Initial creation from health module analysis |

Dependencies

| If This Changes | Update This Doc |
| --- | --- |
| libs/tradai-common/src/tradai/common/health/ | Health monitoring (Section 4) |
| libs/tradai-common/src/tradai/common/logger_mixin.py | Logging architecture (Section 2) |
| infra/edge/modules/cloudwatch_alarms.py | CloudWatch integration (Section 8) |
| New CloudWatch namespace or alarm added | Metrics and alarms tables |