# TradAI Data Flow Architecture
Version: 1.0.0 | Date: 2026-03-28 | Status: CURRENT
## 1. TL;DR
Data enters TradAI through CCXT exchange connectors and is persisted in ArcticDB (S3-backed). Backtests flow through a Step Functions pipeline that validates strategies, ensures data coverage, runs Freqtrade in ECS containers, and stores results in DynamoDB + MLflow. Live trading containers load configs from MLflow, warm up from ArcticDB, execute Freqtrade with exchange connectivity, and report health via DynamoDB heartbeats.
## 2. Backtest Data Flow
The backtest pipeline is orchestrated by BacktestOrchestrationService using a protocol-based executor pattern. The service generates a UUID job_id and trace_id for end-to-end correlation, persists a PENDING status to DynamoDB, and dispatches the job through the configured executor (SQS, Step Functions, or local).
The Step Functions workflow (backtest_workflow.json.j2) defines 12 states (see 06-STEP-FUNCTIONS.md for the complete reference).
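The submission path described above can be sketched as follows. This is a minimal stand-in, assuming hypothetical shapes for `JobRecord` and the job store; the real `BacktestOrchestrationService` persists to DynamoDB and dispatches through the configured executor.

```python
# Hypothetical sketch of the submission path; JobRecord and InMemoryJobStore
# are illustrative stand-ins, not the real backend API.
import uuid
from dataclasses import dataclass

@dataclass
class JobRecord:
    job_id: str
    trace_id: str
    status: str = "PENDING"

class InMemoryJobStore:
    """Stand-in for the DynamoDB job table."""
    def __init__(self):
        self.jobs = {}

    def put(self, record: JobRecord) -> None:
        self.jobs[record.job_id] = record

class BacktestOrchestrationService:
    def __init__(self, store, executor):
        self.store = store
        self.executor = executor  # SQS, Step Functions, or local strategy

    def submit(self, config: dict) -> JobRecord:
        # Generate correlated identifiers for end-to-end tracing.
        record = JobRecord(job_id=str(uuid.uuid4()), trace_id=str(uuid.uuid4()))
        self.store.put(record)           # persist PENDING before dispatch
        self.executor(record, config)    # hand off to the configured executor
        return record

dispatched = []
service = BacktestOrchestrationService(
    InMemoryJobStore(), lambda rec, cfg: dispatched.append(rec.job_id)
)
job = service.submit({"strategy": "ExampleStrategy"})
```

The key ordering guarantee shown here matches the flow below: the PENDING record exists before the executor is invoked, so a crash mid-dispatch never leaves an untracked job.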
```mermaid
sequenceDiagram
    participant CLI as CLI / API
    participant Backend as Backend<br/>(FastAPI)
    participant DDB as DynamoDB
    participant SQS as SQS Queue
    participant Consumer as backtest-consumer<br/>(Lambda)
    participant SF as Step Functions
    participant Validate as validate-strategy<br/>(Lambda)
    participant Data as data-collection-proxy<br/>(Lambda)
    participant ECS as ECS Fargate<br/>(Freqtrade)
    participant MLflow as MLflow
    participant Notify as notify-completion<br/>(Lambda)
    participant WS as WebSocket<br/>(Backend)
    CLI->>Backend: POST /backtests (BacktestConfig)
    Backend->>Backend: Generate job_id + trace_id (UUID)
    Backend->>DDB: Create job (status=PENDING)
    Backend->>SQS: Submit BacktestJobMessage
    Backend-->>CLI: BacktestJobStatus (PENDING)
    SQS->>Consumer: SQS event trigger
    Consumer->>SF: StartExecution (job payload)
    SF->>Validate: ValidateStrategy
    Validate-->>SF: {valid, resolved_version}
    SF->>Data: EnsureData (symbols, timeframe, date range)
    Data-->>SF: Data coverage confirmed
    SF->>DDB: UpdateStatusRunning (via Lambda)
    SF->>ECS: runTask.sync (Fargate + Fargate Spot)
    Note over ECS: Container env: JOB_ID, TRACE_ID,<br/>STRATEGY, TIMERANGE, PAIRS,<br/>MLFLOW_TRACKING_URI, ARCTIC_BUCKET
    ECS->>ECS: Freqtrade backtesting subprocess
    ECS->>DDB: Update status (RUNNING → COMPLETED)
    ECS->>MLflow: Log metrics, params, artifacts
    SF->>Notify: NotifySuccess (SNS + optional Slack)
    SF->>DDB: UpdateStatusCompleted (via Lambda)
    CLI->>WS: ws://backend/ws/backtests/{job_id}
    WS-->>CLI: Stream progress updates (polling DynamoDB)
```

**Executor Abstraction**
BacktestJobSubmitter is a Protocol with four executor strategies: LocalExecutorStrategy (dev), ECSExecutorStrategy (direct task), SQSExecutorStrategy (queue-based), and StepFunctionsExecutorStrategy (full pipeline). The executor is selected via ExecutorFactory and injected via FastAPI Depends().
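A minimal sketch of this Protocol-plus-factory pattern, with assumed method shapes (the real strategies carry queue URLs, state machine ARNs, and so on):

```python
# Sketch of the executor Protocol described above; method signatures and the
# factory's registry are assumptions, not the real backend interface.
from typing import Protocol

class BacktestJobSubmitter(Protocol):
    def submit(self, job_id: str, payload: dict) -> str: ...

class LocalExecutorStrategy:
    """Dev mode: run in-process, no AWS dependencies."""
    def submit(self, job_id: str, payload: dict) -> str:
        return f"local:{job_id}"

class SQSExecutorStrategy:
    """Queue-based: would call sqs.send_message in production."""
    def submit(self, job_id: str, payload: dict) -> str:
        return f"sqs:{job_id}"

_EXECUTORS = {"local": LocalExecutorStrategy, "sqs": SQSExecutorStrategy}

def executor_factory(mode: str) -> BacktestJobSubmitter:
    """Select the executor strategy by configuration key."""
    return _EXECUTORS[mode]()

receipt = executor_factory("local").submit("job-1", {"strategy": "Example"})
```

Because `BacktestJobSubmitter` is a structural Protocol, each strategy satisfies it without inheritance, which is what lets FastAPI's `Depends()` inject whichever strategy the factory selects.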
**Data Guarantee**
The EnsureData step performs a DB-first coverage check: it queries ArcticDB for existing data and fetches from the exchange only where gaps exist. This avoids redundant API calls across repeated backtests.
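The core of such a coverage check can be illustrated as a gap scan over expected candle timestamps (names and data shapes here are hypothetical, not the EnsureData implementation):

```python
# Illustrative gap check in the spirit of EnsureData: fetch from the exchange
# only for candle timestamps not already present in storage.
def missing_ranges(stored: set[int], start: int, end: int, step: int) -> list[int]:
    """Return candle timestamps in [start, end) absent from storage."""
    return [ts for ts in range(start, end, step) if ts not in stored]

stored = {0, 60, 120, 240}              # e.g. 1m candles already in ArcticDB
gaps = missing_ranges(stored, 0, 300, 60)
# Only the gap timestamps would be requested from the exchange.
```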
**Failure Handling**
On ECS task failure or timeout, Step Functions routes through CleanupResources (stops orphaned ECS tasks) then UpdateStatusFailed then NotifyFailure. The container itself also updates DynamoDB directly as a belt-and-suspenders approach.
## 3. Live Trading Data Flow
The TradingHandler implements a four-phase lifecycle: INITIALIZING, WARMUP, RUNNING, STOPPED. It loads strategy configuration from MLflow (with S3 fallback via StrategyConfigLoader), fetches exchange credentials from AWS Secrets Manager, warms up historical data from ArcticDB, then starts a Freqtrade subprocess with health reporting.
The HealthReporter runs as a background thread publishing heartbeats to DynamoDB every 30 seconds. The MetricsCollector calls Freqtrade's /profit and /status API endpoints to build StrategyPnL and LiveTrade entities. The RiskMonitor evaluates RiskLimits against live positions.
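The heartbeat loop can be sketched with a stoppable background thread (interval shortened for the example; the real HealthReporter publishes to DynamoDB every 30 seconds, and the names here are illustrative):

```python
# Sketch of a HealthReporter-style background thread. The publish callback
# stands in for record_heartbeat(...) writing to DynamoDB.
import threading
import time

class HeartbeatThread:
    def __init__(self, publish, interval: float):
        self._publish = publish
        self._interval = interval
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def _run(self):
        while not self._stop.is_set():
            self._publish(time.time())        # e.g. record_heartbeat(pnl, trades)
            self._stop.wait(self._interval)   # interruptible sleep

    def start(self):
        self._thread.start()

    def stop(self):
        self._stop.set()
        self._thread.join()

beats = []
reporter = HeartbeatThread(beats.append, interval=0.01)
reporter.start()
time.sleep(0.05)
reporter.stop()
```

Using `Event.wait()` instead of `time.sleep()` lets `stop()` interrupt the interval immediately, which matters during the STOPPED phase below.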
```mermaid
sequenceDiagram
    participant ECS as ECS Task<br/>(Strategy Container)
    participant Handler as TradingHandler
    participant MLflow as MLflow
    participant Secrets as Secrets Manager
    participant Arctic as ArcticDB (S3)
    participant FT as Freqtrade<br/>(subprocess)
    participant Exchange as Exchange<br/>(Binance)
    participant Metrics as MetricsCollector
    participant Health as HealthReporter<br/>(background thread)
    participant DDB as DynamoDB
    participant SNS as SNS
    ECS->>Handler: run()
    Note over Handler: Phase 1: INITIALIZING
    Handler->>DDB: update_status(INITIALIZING)
    Handler->>MLflow: StrategyConfigLoader.load()
    MLflow-->>Handler: Strategy config JSON
    Handler->>Secrets: get_secret(exchange_credentials)
    Secrets-->>Handler: API key + secret
    Note over Handler: Phase 2: WARMUP
    Handler->>DDB: update_status(WARMUP)
    Handler->>Arctic: ArcticDBWarmupLoader.load()
    Arctic-->>Handler: Historical OHLCV data
    Note over Handler: Phase 3: RUNNING
    Handler->>DDB: update_status(RUNNING)
    Handler->>FT: Start subprocess (freqtrade trade)
    Handler->>Health: Start background thread
    loop Every 30 seconds
        Health->>Metrics: collect_all()
        Metrics->>FT: GET /profit + /status
        FT-->>Metrics: PnL + open trades
        Health->>DDB: record_heartbeat(pnl, trades, risk)
    end
    loop Trading cycle
        FT->>Exchange: fetch_ticker / create_order
        Exchange-->>FT: Market data / order confirmation
    end
    Note over Handler: Phase 4: STOPPED (or ERROR)
    Handler->>FT: SIGTERM (graceful shutdown)
    Handler->>DDB: update_status(STOPPED)
    Handler->>Health: Stop background thread
    opt On failure
        Handler->>SNS: AlertService.send_critical()
        Handler->>DDB: update_status(ERROR, error=msg)
    end
```

**Signal Handling**
A single SIGTERM triggers a graceful shutdown (the handler waits for open trades to close and sends Telegram notifications); repeated signals force an immediate kill. Docker stop sends SIGTERM, so containers shut down cleanly.
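The escalating policy can be sketched as a handler that counts signals (a hypothetical illustration of the behavior above, not the real handler):

```python
# Sketch of the escalating signal policy: first SIGTERM starts a graceful
# shutdown, any repeat forces an immediate exit. Names are illustrative.
import signal

class ShutdownPolicy:
    def __init__(self):
        self.signals_seen = 0
        self.mode = None

    def handle(self, signum, frame):
        self.signals_seen += 1
        if self.signals_seen == 1:
            self.mode = "graceful"   # wait for open trades, notify Telegram
        else:
            self.mode = "force"      # repeated signal: kill immediately

policy = ShutdownPolicy()
# In a real container: signal.signal(signal.SIGTERM, policy.handle)
policy.handle(signal.SIGTERM, None)
first_mode = policy.mode
policy.handle(signal.SIGTERM, None)
```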
**Heartbeat Monitoring**
The trading-heartbeat-check Lambda (scheduled via EventBridge) scans DynamoDB for stale heartbeats. After 3 consecutive missed heartbeats, the AlertService sends an SNS notification (MN004).
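With a 30-second interval, three consecutive misses means the last heartbeat is more than 90 seconds old. A sketch of that staleness test (constant names assumed, not the Lambda's real configuration keys):

```python
# Illustrative staleness check for a heartbeat-scan Lambda: three missed
# 30-second heartbeats => last beat is over 90 s old.
HEARTBEAT_INTERVAL_S = 30
MISSED_THRESHOLD = 3

def is_stale(last_heartbeat_ts: float, now: float) -> bool:
    """True when the strategy has missed MISSED_THRESHOLD heartbeats."""
    return (now - last_heartbeat_ts) > HEARTBEAT_INTERVAL_S * MISSED_THRESHOLD

healthy = is_stale(last_heartbeat_ts=1000.0, now=1060.0)   # 60 s ago
stale = is_stale(last_heartbeat_ts=1000.0, now=1100.0)     # 100 s ago
```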
## 4. Data Collection Flow
DataCollectionService wraps the tradai-data library's DataCollectionService and CCXTRepository. It supports per-exchange lazy-loaded repositories (keyed by exchange name like binance_futures), with ResilientDataRepository adding retry/circuit-breaker resilience. Storage uses ArcticAdapter backed by S3.
The sync_data method validates parameters, delegates to the library service's collect() which fetches via CCXT and writes to ArcticDB, then returns a SyncResult with row counts. Incremental sync (sync_incremental) fetches only data newer than what ArcticDB already holds.
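The core decision inside incremental sync can be sketched like this, with a plain dict standing in for ArcticDB and a fake exchange fetcher (all names hypothetical):

```python
# Sketch of sync_incremental's core decision: fetch only rows newer than the
# latest timestamp already stored. A dict stands in for ArcticDB.
def sync_incremental(store: dict, symbol: str, fetch_since) -> int:
    """Append only candles newer than what the store already holds."""
    existing = store.get(symbol, [])
    since = existing[-1][0] + 1 if existing else 0   # rows sorted by timestamp
    new_rows = fetch_since(symbol, since)
    store[symbol] = existing + new_rows
    return len(new_rows)

def fake_exchange(symbol, since):
    candles = [(60, 1.0), (120, 1.1), (180, 1.2)]    # (timestamp, close)
    return [c for c in candles if c[0] >= since]

store = {"BTC/USDT": [(60, 1.0)]}                    # one candle already held
added = sync_incremental(store, "BTC/USDT", fake_exchange)
```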
```mermaid
sequenceDiagram
    participant Client as CLI / API / Lambda
    participant Service as DataCollectionService
    participant Validate as _validate_sync_params()
    participant LibService as LibraryDataCollectionService
    participant CCXT as CCXTRepository<br/>(ResilientDataRepository)
    participant Exchange as Exchange API<br/>(Binance)
    participant Adapter as ArcticAdapter
    participant Arctic as ArcticDB (S3)
    Client->>Service: sync_data(symbols, start, end, timeframe, exchange)
    Service->>Validate: Validate symbols + exchange + dates
    Validate-->>Service: OK
    Service->>LibService: collect(symbols, start, end, timeframe)
    loop For each symbol
        LibService->>CCXT: fetch_ohlcv(symbol, timeframe, since, limit)
        CCXT->>Exchange: REST API call (paginated)
        Exchange-->>CCXT: OHLCV candles
        CCXT-->>LibService: OHLCVData (DataFrame)
    end
    LibService->>Adapter: write(symbol, data)
    Adapter->>Arctic: library.write(symbol, df)
    Arctic-->>Adapter: VersionedItem
    LibService-->>Service: CollectionResult
    Service-->>Client: SyncResult(symbols_synced, rows_synced)
```

**Multi-Exchange Support**
Repositories are cached per exchange key (_data_repositories dict). Each exchange gets its own CCXTRepository wrapped in ResilientDataRepository for retry logic. The _resolve_exchange_key() method defaults to the first configured exchange.
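A lazy per-key cache of this kind might look as follows (a hypothetical sketch; the real service builds CCXTRepository instances wrapped in ResilientDataRepository):

```python
# Hypothetical sketch of the per-exchange repository cache: one repository per
# exchange key, created lazily on first use and reused afterwards.
class RepositoryCache:
    def __init__(self, factory):
        self._factory = factory
        self._data_repositories: dict[str, object] = {}

    def get(self, exchange_key: str):
        if exchange_key not in self._data_repositories:
            # First request for this exchange: construct and cache.
            self._data_repositories[exchange_key] = self._factory(exchange_key)
        return self._data_repositories[exchange_key]

created = []
cache = RepositoryCache(lambda key: created.append(key) or f"repo:{key}")
r1 = cache.get("binance_futures")
r2 = cache.get("binance_futures")   # cached: no second construction
```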
**Async Variant**
AsyncCCXTRepository and LibraryAsyncDataCollectionService provide the same flow using asyncio for concurrent symbol fetches, used by the data-collection API endpoints.
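The concurrent-fetch pattern reduces to `asyncio.gather` over per-symbol coroutines; here is a self-contained sketch with a fake fetcher standing in for the async repository:

```python
# Sketch of the async variant: concurrent per-symbol fetches via asyncio.gather.
# fetch_ohlcv here is a stand-in for AsyncCCXTRepository, not its real API.
import asyncio

async def fetch_ohlcv(symbol: str) -> tuple[str, int]:
    await asyncio.sleep(0)            # stands in for network I/O
    return symbol, 100                # (symbol, rows fetched)

async def collect(symbols: list[str]) -> dict[str, int]:
    # All symbol fetches are in flight concurrently.
    results = await asyncio.gather(*(fetch_ohlcv(s) for s in symbols))
    return dict(results)

rows = asyncio.run(collect(["BTC/USDT", "ETH/USDT"]))
```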
## 5. ML Training Flow
The TrainHandler orchestrates FreqAI model training inside ECS containers. It delegates to focused collaborators: TrainingResultParser (parse Freqtrade output), MLflowReporter (log metrics/tags), ModelRegistrar (register and optionally promote models), and ReproducibilityManifestBuilder (record seeds, versions, configs).
The handler sets global random seeds for reproducibility, builds a TrainingConfig from environment variables, executes Freqtrade's training subprocess, parses results, logs everything to MLflow, and registers the model in the MLflow Model Registry.
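The two setup steps, seeding and building the config from environment variables, could look roughly like this (field names and defaults are assumptions; the real helper also seeds numpy/torch when available):

```python
# Illustrative versions of set_global_seeds and _build_config; the TrainingConfig
# fields and env defaults are assumptions, not the real schema.
import random
from dataclasses import dataclass

def set_global_seeds(seed: int) -> None:
    random.seed(seed)   # real helper also seeds numpy, torch, PYTHONHASHSEED

@dataclass
class TrainingConfig:
    strategy: str
    timeframe: str
    train_period_days: int

def build_config(env: dict) -> TrainingConfig:
    """Build a TrainingConfig from container environment variables."""
    return TrainingConfig(
        strategy=env["STRATEGY"],
        timeframe=env.get("TIMEFRAME", "5m"),
        train_period_days=int(env.get("TRAIN_PERIOD_DAYS", "30")),
    )

set_global_seeds(42)
cfg = build_config({"STRATEGY": "ExampleFreqAI", "TRAIN_PERIOD_DAYS": "60"})
```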
```mermaid
sequenceDiagram
    participant ECS as ECS Task<br/>(Strategy Container)
    participant Handler as TrainHandler
    participant Seeds as set_global_seeds()
    participant FT as Freqtrade<br/>(subprocess)
    participant FreqAI as FreqAI<br/>(LightGBM/XGBoost/CatBoost)
    participant Parser as TrainingResultParser
    participant Reporter as MLflowReporter
    participant MLflow as MLflow Server
    participant Registrar as ModelRegistrar
    participant Manifest as ReproducibilityManifestBuilder
    participant DDB as DynamoDB
    ECS->>Handler: run()
    Handler->>DDB: update_status(RUNNING)
    opt Random seed configured
        Handler->>Seeds: set_global_seeds(seed)
        Seeds-->>Handler: {numpy, torch, python, ...}
    end
    Handler->>Handler: _build_config() from env vars
    Note over Handler: STRATEGY, PAIRS, TIMEFRAME,<br/>FREQAI_MODEL, TRAIN_PERIOD_DAYS
    Handler->>FT: subprocess (freqtrade backtesting with FreqAI)
    FT->>FreqAI: Train model on historical data
    FreqAI-->>FT: Trained model + metrics
    FT-->>Handler: stdout/stderr + exit code
    Handler->>Parser: parse(stdout)
    Parser-->>Handler: TrainingResult (trades, sharpe, drawdown)
    Handler->>Reporter: log(config, result)
    Reporter->>MLflow: log_params + log_metrics + log_tags
    MLflow-->>Reporter: mlflow_run_id
    Reporter-->>Handler: run_id
    Handler->>Registrar: register(config, result)
    Registrar->>MLflow: register_model(name, run_id)
    Registrar->>Manifest: build()
    Manifest->>MLflow: log_artifact(manifest.json)
    Registrar-->>Handler: TrainingResult (with model_version)
    Handler->>DDB: update_status(COMPLETED, result)
```

**Reproducibility**
The ReproducibilityManifestBuilder records random seeds, Python/library versions, git commit SHA, training config, and hyperparameters as an MLflow artifact. This enables exact reproduction of any training run.
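A minimal manifest in this spirit can be assembled and serialized like so (the keys are illustrative, not the builder's real schema):

```python
# Minimal manifest in the spirit of ReproducibilityManifestBuilder: record the
# inputs needed to replay a run. Keys here are illustrative.
import json
import platform

def build_manifest(seed: int, git_commit: str, config: dict) -> str:
    manifest = {
        "seed": seed,
        "git_commit": git_commit,
        "python_version": platform.python_version(),
        "config": config,
    }
    # Serialized deterministically; would be logged as an MLflow artifact.
    return json.dumps(manifest, sort_keys=True)

blob = build_manifest(42, "abc1234", {"strategy": "ExampleFreqAI"})
parsed = json.loads(blob)
```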
**Model Promotion**
The ModelRegistrar registers trained models in MLflow's Model Registry. Promotion to staging/production happens separately via the promote-model Lambda, triggered by the compare-models Lambda which evaluates performance thresholds.
## 6. Cross-Cutting: Traceability and State Management
All data flows share a unified traceability model. Every job carries a trace_id (UUID) and job_id that propagate through SQS messages, Step Functions execution input, ECS container environment variables, DynamoDB records, and MLflow tags.
```mermaid
sequenceDiagram
    participant API as Backend API
    participant DDB as DynamoDB
    participant SF as Step Functions
    participant ECS as ECS Container
    participant MLflow as MLflow
    participant SNS as SNS Events
    API->>API: Generate trace_id + job_id
    API->>DDB: Create(job_id, trace_id, status=PENDING)
    API->>SF: StartExecution(trace_id, job_id)
    SF->>ECS: env: JOB_ID, TRACE_ID
    ECS->>DDB: Update(job_id, trace_id, status=RUNNING)
    ECS->>MLflow: log_tag("trace_id", trace_id)
    ECS->>MLflow: log_tag("job_id", job_id)
    ECS->>MLflow: log_tag("git_commit", sha)
    ECS->>DDB: Update(job_id, mlflow_run_id, status=COMPLETED)
    SF->>SNS: JobStateEvent(job_id, trace_id, new_status)
```

| Field | Source | Propagation Path |
|---|---|---|
| trace_id | uuid.uuid4() in Backend | DynamoDB, Step Functions input, ECS env, SNS events |
| job_id | uuid.uuid4() in Backend | DynamoDB PK, SQS message, ECS env, S3 result key |
| mlflow_run_id | MLflow start_run() | DynamoDB (written by container), BacktestResult |
| git_commit | get_git_commit() | MLflow tags, BacktestResult, reproducibility manifest |
**Event Publishing**
State transitions are published via SNSEventPublisher (implementing the EventPublisher Protocol). The NoOpEventPublisher is used in dev/test mode. Events include previous_status, new_status, trace_id, and optional error details.
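The publisher seam can be sketched as a structural Protocol with a no-op test double (shapes assumed from the description above, not the real interface):

```python
# Sketch of the EventPublisher Protocol and its NoOp test double; the event
# dict shape mirrors the fields named above but is an assumption.
from typing import Protocol

class EventPublisher(Protocol):
    def publish(self, event: dict) -> None: ...

class NoOpEventPublisher:
    """Dev/test double: swallows events instead of hitting SNS."""
    def publish(self, event: dict) -> None:
        pass

class RecordingPublisher:
    """Another test double that captures what would be sent to SNS."""
    def __init__(self):
        self.events = []

    def publish(self, event: dict) -> None:
        self.events.append(event)

def transition(pub: EventPublisher, job_id: str, old: str, new: str) -> None:
    pub.publish({"job_id": job_id, "previous_status": old, "new_status": new})

rec = RecordingPublisher()
transition(rec, "job-1", "PENDING", "RUNNING")
transition(NoOpEventPublisher(), "job-1", "RUNNING", "COMPLETED")  # discarded
```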
## 7. Key Source Files
| Flow | File | Purpose |
|---|---|---|
| Backtest orchestration | services/backend/src/tradai/backend/core/orchestration.py | Job submission, status tracking, executor dispatch |
| Step Functions workflow | infra/compute/asl_templates/backtest_workflow.json.j2 | State machine definition (12 states) |
| SQS consumer | lambdas/backtest-consumer/handler.py | SQS-to-Step Functions bridge |
| Live trading | libs/tradai-common/src/tradai/common/entrypoint/trading.py | Four-phase trading lifecycle |
| Health reporting | libs/tradai-common/src/tradai/common/health/reporter.py | DynamoDB heartbeat background thread |
| Metrics collection | libs/tradai-common/src/tradai/common/health/metrics_collector.py | Freqtrade API to StrategyPnL/LiveTrade |
| Data collection | services/data-collection/src/tradai/data_collection/core/service.py | Exchange-to-ArcticDB sync |
| ML training | libs/tradai-common/src/tradai/common/entrypoint/training/handler.py | FreqAI training orchestration |
| WebSocket streaming | services/backend/src/tradai/backend/api/websocket.py | Real-time backtest progress |
| Event publishing | libs/tradai-common/src/tradai/common/events.py | SNS state transition events |
| Notifications | lambdas/notify-completion/handler.py | SNS + Slack notification delivery |
## Changelog
| Version | Date | Changes |
|---|---|---|
| 1.0.0 | 2026-03-28 | Initial creation from codebase analysis |
## Dependencies
| If This Changes | Update This Doc |
|---|---|
| services/backend/src/tradai/backend/core/orchestration.py | Backtest flow (Section 2) |
| libs/tradai-common/src/tradai/common/entrypoint/trading.py | Live trading flow (Section 3) |
| services/data-collection/src/tradai/data_collection/core/service.py | Data collection flow (Section 4) |
| libs/tradai-common/src/tradai/common/entrypoint/training/handler.py | ML training flow (Section 5) |
| infra/compute/asl_templates/backtest_workflow.json.j2 | Step Functions states in backtest flow |