TradAI Data Flow Architecture

Version: 1.0.0 | Date: 2026-03-28 | Status: CURRENT


1. TL;DR

Data enters TradAI through CCXT exchange connectors and is persisted in ArcticDB (S3-backed). Backtests flow through a Step Functions pipeline that validates strategies, ensures data coverage, runs Freqtrade in ECS containers, and stores results in DynamoDB + MLflow. Live trading containers load configs from MLflow, warm up from ArcticDB, execute Freqtrade with exchange connectivity, and report health via DynamoDB heartbeats.


2. Backtest Data Flow

The backtest pipeline is orchestrated by BacktestOrchestrationService using a protocol-based executor pattern. The service generates a UUID job_id and trace_id for end-to-end correlation, persists a PENDING status to DynamoDB, and dispatches the job through the configured executor (SQS, Step Functions, or local).

The Step Functions workflow (backtest_workflow.json.j2) defines 12 states (see 06-STEP-FUNCTIONS.md for the complete reference).
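The submission path described above can be sketched roughly as follows. This is an illustrative stand-in, not the real code: `InMemoryJobStore` substitutes for the DynamoDB job table, and `dispatch` stands in for whichever executor is configured.

```python
import uuid
from dataclasses import dataclass


@dataclass
class BacktestJob:
    job_id: str
    trace_id: str
    status: str = "PENDING"


class InMemoryJobStore:
    """Stand-in for the DynamoDB job table."""

    def __init__(self):
        self.jobs: dict[str, BacktestJob] = {}

    def create(self, job: BacktestJob) -> None:
        self.jobs[job.job_id] = job


def submit_backtest(store: InMemoryJobStore, dispatch) -> BacktestJob:
    # One UUID pair per job gives end-to-end correlation.
    job = BacktestJob(job_id=str(uuid.uuid4()), trace_id=str(uuid.uuid4()))
    store.create(job)  # persist PENDING before dispatching
    dispatch(job)      # SQS / Step Functions / local executor
    return job
```

The important ordering property: the PENDING record exists before the job is dispatched, so a status poll can never miss the job.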

```mermaid
sequenceDiagram
    participant CLI as CLI / API
    participant Backend as Backend<br/>(FastAPI)
    participant DDB as DynamoDB
    participant SQS as SQS Queue
    participant Consumer as backtest-consumer<br/>(Lambda)
    participant SF as Step Functions
    participant Validate as validate-strategy<br/>(Lambda)
    participant Data as data-collection-proxy<br/>(Lambda)
    participant ECS as ECS Fargate<br/>(Freqtrade)
    participant MLflow as MLflow
    participant Notify as notify-completion<br/>(Lambda)
    participant WS as WebSocket<br/>(Backend)

    CLI->>Backend: POST /backtests (BacktestConfig)
    Backend->>Backend: Generate job_id + trace_id (UUID)
    Backend->>DDB: Create job (status=PENDING)
    Backend->>SQS: Submit BacktestJobMessage
    Backend-->>CLI: BacktestJobStatus (PENDING)

    SQS->>Consumer: SQS event trigger
    Consumer->>SF: StartExecution (job payload)

    SF->>Validate: ValidateStrategy
    Validate-->>SF: {valid, resolved_version}

    SF->>Data: EnsureData (symbols, timeframe, date range)
    Data-->>SF: Data coverage confirmed

    SF->>DDB: UpdateStatusRunning (via Lambda)

    SF->>ECS: runTask.sync (Fargate + Fargate Spot)
    Note over ECS: Container env: JOB_ID, TRACE_ID,<br/>STRATEGY, TIMERANGE, PAIRS,<br/>MLFLOW_TRACKING_URI, ARCTIC_BUCKET

    ECS->>ECS: Freqtrade backtesting subprocess
    ECS->>DDB: Update status (RUNNING → COMPLETED)
    ECS->>MLflow: Log metrics, params, artifacts

    SF->>Notify: NotifySuccess (SNS + optional Slack)
    SF->>DDB: UpdateStatusCompleted (via Lambda)

    CLI->>WS: ws://backend/ws/backtests/{job_id}
    WS-->>CLI: Stream progress updates (polling DynamoDB)
```

Executor Abstraction

BacktestJobSubmitter is a Protocol with four executor strategies: LocalExecutorStrategy (dev), ECSExecutorStrategy (direct task), SQSExecutorStrategy (queue-based), and StepFunctionsExecutorStrategy (full pipeline). The executor is selected via ExecutorFactory and injected via FastAPI Depends().
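A minimal sketch of this pattern, using `typing.Protocol` for structural typing. The method name `submit` and the simplified factory are assumptions for illustration; the real `ExecutorFactory` and strategy classes live in the backend service.

```python
from typing import Protocol


class BacktestJobSubmitter(Protocol):
    """Structural interface every executor strategy satisfies."""

    def submit(self, payload: dict) -> str: ...


class LocalExecutorStrategy:
    """Dev executor: handles the job inline instead of dispatching to AWS."""

    def submit(self, payload: dict) -> str:
        return f"local:{payload['job_id']}"


class SQSExecutorStrategy:
    """Queue-based executor; `send_message` stands in for boto3's
    sqs.send_message bound to the backtest queue."""

    def __init__(self, send_message):
        self._send = send_message

    def submit(self, payload: dict) -> str:
        self._send(payload)
        return f"sqs:{payload['job_id']}"


def make_executor(mode: str, **kw) -> BacktestJobSubmitter:
    """Simplified stand-in for ExecutorFactory."""
    if mode == "local":
        return LocalExecutorStrategy()
    if mode == "sqs":
        return SQSExecutorStrategy(kw["send_message"])
    raise ValueError(f"unknown executor mode: {mode}")
```

Because the Protocol is structural, no strategy needs to inherit from it, which keeps the strategies dependency-free and trivially mockable in tests.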

Data Guarantee

The EnsureData step uses a DB-first coverage check: it queries ArcticDB for existing data and fetches from the exchange only when gaps exist. This prevents redundant API calls across repeated backtests.
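The gap detection at the heart of this check can be sketched as follows; it is a simplified illustration of the idea, not the actual coverage query.

```python
def find_gaps(existing, start, end, step):
    """Return the sub-ranges of [start, end) missing from `existing`.

    `existing` is the set of candle timestamps ArcticDB already holds;
    only the returned ranges need an exchange fetch. Works with ints or
    with datetime bounds and a timedelta step.
    """
    gaps, gap_start, t = [], None, start
    while t < end:
        if t not in existing:
            if gap_start is None:
                gap_start = t      # entering a gap
        elif gap_start is not None:
            gaps.append((gap_start, t))  # leaving a gap
            gap_start = None
        t += step
    if gap_start is not None:
        gaps.append((gap_start, end))    # gap runs to the end
    return gaps
```

For example, with candles `{1, 2, 4}` stored over the range `[0, 6)`, the missing ranges are `[0, 1)`, `[3, 4)`, and `[5, 6)`; only those get fetched.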

Failure Handling

On ECS task failure or timeout, Step Functions routes through CleanupResources (which stops orphaned ECS tasks), then UpdateStatusFailed, then NotifyFailure. The container also updates DynamoDB directly, as a belt-and-suspenders measure.


3. Live Trading Data Flow

The TradingHandler implements a four-phase lifecycle: INITIALIZING, WARMUP, RUNNING, STOPPED. It loads strategy configuration from MLflow (with S3 fallback via StrategyConfigLoader), fetches exchange credentials from AWS Secrets Manager, warms up historical data from ArcticDB, then starts a Freqtrade subprocess with health reporting.

The HealthReporter runs as a background thread publishing heartbeats to DynamoDB every 30 seconds. The MetricsCollector calls Freqtrade's /profit and /status API endpoints to build StrategyPnL and LiveTrade entities. The RiskMonitor evaluates RiskLimits against live positions.
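The heartbeat loop can be sketched as a stoppable background thread. This is a minimal illustration: `publish` stands in for the DynamoDB write, and the real HealthReporter also gathers PnL, trade, and risk data on each tick.

```python
import threading
import time


class HeartbeatThread:
    """Sketch of a HealthReporter-style background thread.

    Calls `publish` every `interval` seconds until stopped. The 30s
    interval matches the text; tests shorten it.
    """

    def __init__(self, publish, interval: float = 30.0):
        self._publish = publish
        self._interval = interval
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def _run(self):
        while not self._stop.is_set():
            self._publish()
            # Event.wait doubles as an interruptible sleep, so stop()
            # takes effect immediately rather than after a full interval.
            self._stop.wait(self._interval)

    def start(self):
        self._thread.start()

    def stop(self):
        self._stop.set()
        self._thread.join()
```

Using `Event.wait` instead of `time.sleep` is the key design choice: shutdown does not have to wait out the remainder of a 30-second interval.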

```mermaid
sequenceDiagram
    participant ECS as ECS Task<br/>(Strategy Container)
    participant Handler as TradingHandler
    participant MLflow as MLflow
    participant Secrets as Secrets Manager
    participant Arctic as ArcticDB (S3)
    participant FT as Freqtrade<br/>(subprocess)
    participant Exchange as Exchange<br/>(Binance)
    participant Metrics as MetricsCollector
    participant Health as HealthReporter<br/>(background thread)
    participant DDB as DynamoDB
    participant SNS as SNS

    ECS->>Handler: run()

    Note over Handler: Phase 1: INITIALIZING
    Handler->>DDB: update_status(INITIALIZING)
    Handler->>MLflow: StrategyConfigLoader.load()
    MLflow-->>Handler: Strategy config JSON
    Handler->>Secrets: get_secret(exchange_credentials)
    Secrets-->>Handler: API key + secret

    Note over Handler: Phase 2: WARMUP
    Handler->>DDB: update_status(WARMUP)
    Handler->>Arctic: ArcticDBWarmupLoader.load()
    Arctic-->>Handler: Historical OHLCV data

    Note over Handler: Phase 3: RUNNING
    Handler->>DDB: update_status(RUNNING)
    Handler->>FT: Start subprocess (freqtrade trade)
    Handler->>Health: Start background thread

    loop Every 30 seconds
        Health->>Metrics: collect_all()
        Metrics->>FT: GET /profit + /status
        FT-->>Metrics: PnL + open trades
        Health->>DDB: record_heartbeat(pnl, trades, risk)
    end

    loop Trading cycle
        FT->>Exchange: fetch_ticker / create_order
        Exchange-->>FT: Market data / order confirmation
    end

    Note over Handler: Phase 4: STOPPED (or ERROR)
    Handler->>FT: SIGTERM (graceful shutdown)
    Handler->>DDB: update_status(STOPPED)
    Handler->>Health: Stop background thread

    opt On failure
        Handler->>SNS: AlertService.send_critical()
        Handler->>DDB: update_status(ERROR, error=msg)
    end
```

Signal Handling

A single SIGTERM triggers a graceful shutdown (the handler waits for open trades to close and sends Telegram notifications); repeated signals force-kill the process. docker stop sends SIGTERM, so containers shut down cleanly.
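The escalating policy can be sketched with a counting handler; class and callback names here are illustrative, not the real implementation.

```python
import signal


class GracefulShutdown:
    """Sketch of the escalating signal policy: the first SIGTERM
    requests a graceful stop, any further signal force-kills."""

    def __init__(self, graceful, force):
        self._graceful = graceful
        self._force = force
        self._count = 0

    def __call__(self, signum, frame):
        self._count += 1
        if self._count == 1:
            self._graceful()  # wait for open trades, send notifications
        else:
            self._force()     # operator insisted: stop immediately


# Wiring sketch (docker stop delivers SIGTERM to PID 1):
# signal.signal(signal.SIGTERM, GracefulShutdown(graceful=..., force=...))
```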

Heartbeat Monitoring

The trading-heartbeat-check Lambda (scheduled via EventBridge) scans DynamoDB for stale heartbeats. After 3 consecutive missed heartbeats, the AlertService sends an SNS notification (MN004).
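The staleness rule follows from the numbers above: 3 missed heartbeats at a 30-second interval means more than 90 seconds of silence. A sketch of the check (the dict stands in for the DynamoDB scan result; function names are illustrative):

```python
from datetime import datetime, timedelta, timezone

HEARTBEAT_INTERVAL = timedelta(seconds=30)
MAX_MISSED = 3


def is_stale(last_heartbeat: datetime, now: datetime) -> bool:
    """Stale after 3 consecutive missed 30s heartbeats (> 90s silent)."""
    return now - last_heartbeat > MAX_MISSED * HEARTBEAT_INTERVAL


def find_stale(heartbeats: dict[str, datetime], now: datetime) -> list[str]:
    """Map of strategy_id -> last heartbeat time; returns offenders."""
    return [sid for sid, ts in heartbeats.items() if is_stale(ts, now)]
```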


4. Data Collection Flow

DataCollectionService wraps the tradai-data library's DataCollectionService and CCXTRepository. It supports per-exchange lazy-loaded repositories (keyed by exchange name like binance_futures), with ResilientDataRepository adding retry/circuit-breaker resilience. Storage uses ArcticAdapter backed by S3.

The sync_data method validates parameters, delegates to the library service's collect() (which fetches via CCXT and writes to ArcticDB), and returns a SyncResult with row counts. Incremental sync (sync_incremental) fetches only data newer than what ArcticDB already holds.
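The incremental-sync decision can be sketched as a pure window calculation; the function name and epoch-millisecond convention are assumptions for illustration (CCXT's fetch_ohlcv takes `since` in epoch ms):

```python
def incremental_range(stored_max_ts, requested_end, timeframe_ms):
    """Return the (since, end) window still missing from ArcticDB,
    or None when storage is already up to date.

    stored_max_ts: timestamp of the newest stored candle (epoch ms),
    or None when nothing is stored yet.
    """
    if stored_max_ts is None:
        return (None, requested_end)        # nothing stored: full fetch
    since = stored_max_ts + timeframe_ms    # first candle after stored data
    if since >= requested_end:
        return None                         # fully covered: skip the API
    return (since, requested_end)
```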

```mermaid
sequenceDiagram
    participant Client as CLI / API / Lambda
    participant Service as DataCollectionService
    participant Validate as _validate_sync_params()
    participant LibService as LibraryDataCollectionService
    participant CCXT as CCXTRepository<br/>(ResilientDataRepository)
    participant Exchange as Exchange API<br/>(Binance)
    participant Adapter as ArcticAdapter
    participant Arctic as ArcticDB (S3)

    Client->>Service: sync_data(symbols, start, end, timeframe, exchange)
    Service->>Validate: Validate symbols + exchange + dates
    Validate-->>Service: OK

    Service->>LibService: collect(symbols, start, end, timeframe)

    loop For each symbol
        LibService->>CCXT: fetch_ohlcv(symbol, timeframe, since, limit)
        CCXT->>Exchange: REST API call (paginated)
        Exchange-->>CCXT: OHLCV candles
        CCXT-->>LibService: OHLCVData (DataFrame)
    end

    LibService->>Adapter: write(symbol, data)
    Adapter->>Arctic: library.write(symbol, df)
    Arctic-->>Adapter: VersionedItem

    LibService-->>Service: CollectionResult
    Service-->>Client: SyncResult(symbols_synced, rows_synced)
```

Multi-Exchange Support

Repositories are cached per exchange key (_data_repositories dict). Each exchange gets its own CCXTRepository wrapped in ResilientDataRepository for retry logic. The _resolve_exchange_key() method defaults to the first configured exchange.

Async Variant

AsyncCCXTRepository and LibraryAsyncDataCollectionService provide the same flow using asyncio for concurrent symbol fetches, used by the data-collection API endpoints.
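The concurrency gain comes from gathering all symbol fetches at once instead of looping sequentially. A minimal sketch (the `fetch_ohlcv` coroutine is a stand-in for the async CCXT call):

```python
import asyncio


async def fetch_all(fetch_ohlcv, symbols, timeframe):
    """Fetch every symbol concurrently; results keep symbol order."""
    results = await asyncio.gather(
        *(fetch_ohlcv(sym, timeframe) for sym in symbols)
    )
    return dict(zip(symbols, results))
```

With N symbols and per-symbol latency dominated by network I/O, wall-clock time approaches that of the slowest single fetch rather than the sum of all fetches.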


5. ML Training Flow

The TrainHandler orchestrates FreqAI model training inside ECS containers. It delegates to focused collaborators: TrainingResultParser (parse Freqtrade output), MLflowReporter (log metrics/tags), ModelRegistrar (register and optionally promote models), and ReproducibilityManifestBuilder (record seeds, versions, configs).

The handler sets global random seeds for reproducibility, builds a TrainingConfig from environment variables, executes Freqtrade's training subprocess, parses results, logs everything to MLflow, and registers the model in the MLflow Model Registry.
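Those first two steps can be sketched as below. This is a hedged illustration: the real `set_global_seeds` also seeds numpy and torch when present, and the TrainingConfig fields and defaults shown here are assumptions beyond the env var names listed in the diagram.

```python
import os
import random


def set_global_seeds(seed: int) -> dict:
    """Simplified global seeding; only the stdlib is touched here."""
    random.seed(seed)
    os.environ["PYTHONHASHSEED"] = str(seed)
    return {"python": seed}


def build_config(env: dict) -> dict:
    """Sketch of _build_config(): training settings from env vars."""
    return {
        "strategy": env["STRATEGY"],
        "pairs": env["PAIRS"].split(","),
        "timeframe": env.get("TIMEFRAME", "1h"),
        "freqai_model": env.get("FREQAI_MODEL", "LightGBMRegressor"),
        "train_period_days": int(env.get("TRAIN_PERIOD_DAYS", "30")),
    }
```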

```mermaid
sequenceDiagram
    participant ECS as ECS Task<br/>(Strategy Container)
    participant Handler as TrainHandler
    participant Seeds as set_global_seeds()
    participant FT as Freqtrade<br/>(subprocess)
    participant FreqAI as FreqAI<br/>(LightGBM/XGBoost/CatBoost)
    participant Parser as TrainingResultParser
    participant Reporter as MLflowReporter
    participant MLflow as MLflow Server
    participant Registrar as ModelRegistrar
    participant Manifest as ReproducibilityManifestBuilder
    participant DDB as DynamoDB

    ECS->>Handler: run()
    Handler->>DDB: update_status(RUNNING)

    opt Random seed configured
        Handler->>Seeds: set_global_seeds(seed)
        Seeds-->>Handler: {numpy, torch, python, ...}
    end

    Handler->>Handler: _build_config() from env vars
    Note over Handler: STRATEGY, PAIRS, TIMEFRAME,<br/>FREQAI_MODEL, TRAIN_PERIOD_DAYS

    Handler->>FT: subprocess (freqtrade backtesting with FreqAI)
    FT->>FreqAI: Train model on historical data
    FreqAI-->>FT: Trained model + metrics
    FT-->>Handler: stdout/stderr + exit code

    Handler->>Parser: parse(stdout)
    Parser-->>Handler: TrainingResult (trades, sharpe, drawdown)

    Handler->>Reporter: log(config, result)
    Reporter->>MLflow: log_params + log_metrics + log_tags
    MLflow-->>Reporter: mlflow_run_id
    Reporter-->>Handler: run_id

    Handler->>Registrar: register(config, result)
    Registrar->>MLflow: register_model(name, run_id)
    Registrar->>Manifest: build()
    Manifest->>MLflow: log_artifact(manifest.json)
    Registrar-->>Handler: TrainingResult (with model_version)

    Handler->>DDB: update_status(COMPLETED, result)
```

Reproducibility

The ReproducibilityManifestBuilder records random seeds, Python/library versions, git commit SHA, training config, and hyperparameters as an MLflow artifact. This enables exact reproduction of any training run.
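A manifest of this shape can be sketched as below; the field names are illustrative, not the actual schema, and the JSON string would be uploaded via `mlflow.log_artifact()` after being written to a file.

```python
import json
import platform
import sys


def build_manifest(seed: int, config: dict, git_commit: str) -> str:
    """Sketch of a reproducibility manifest: seeds, interpreter and
    platform versions, git SHA, and the training config, as JSON."""
    manifest = {
        "seed": seed,
        "python_version": sys.version.split()[0],
        "platform": platform.platform(),
        "git_commit": git_commit,
        "training_config": config,
    }
    # sort_keys makes manifests diffable across runs
    return json.dumps(manifest, indent=2, sort_keys=True)
```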

Model Promotion

The ModelRegistrar registers trained models in MLflow's Model Registry. Promotion to staging/production happens separately via the promote-model Lambda, triggered by the compare-models Lambda which evaluates performance thresholds.


6. Cross-Cutting: Traceability and State Management

All data flows share a unified traceability model. Every job carries a trace_id (UUID) and job_id that propagate through SQS messages, Step Functions execution input, ECS container environment variables, DynamoDB records, and MLflow tags.

```mermaid
sequenceDiagram
    participant API as Backend API
    participant DDB as DynamoDB
    participant SF as Step Functions
    participant ECS as ECS Container
    participant MLflow as MLflow
    participant SNS as SNS Events

    API->>API: Generate trace_id + job_id
    API->>DDB: Create(job_id, trace_id, status=PENDING)
    API->>SF: StartExecution(trace_id, job_id)

    SF->>ECS: env: JOB_ID, TRACE_ID
    ECS->>DDB: Update(job_id, trace_id, status=RUNNING)
    ECS->>MLflow: log_tag("trace_id", trace_id)
    ECS->>MLflow: log_tag("job_id", job_id)
    ECS->>MLflow: log_tag("git_commit", sha)

    ECS->>DDB: Update(job_id, mlflow_run_id, status=COMPLETED)
    SF->>SNS: JobStateEvent(job_id, trace_id, new_status)
```


| Field | Source | Propagation Path |
| --- | --- | --- |
| trace_id | `uuid.uuid4()` in Backend | DynamoDB, Step Functions input, ECS env, SNS events |
| job_id | `uuid.uuid4()` in Backend | DynamoDB PK, SQS message, ECS env, S3 result key |
| mlflow_run_id | MLflow `start_run()` | DynamoDB (written by container), BacktestResult |
| git_commit | `get_git_commit()` | MLflow tags, BacktestResult, reproducibility manifest |

Event Publishing

State transitions are published via SNSEventPublisher (implementing the EventPublisher Protocol). The NoOpEventPublisher is used in dev/test mode. Events include previous_status, new_status, trace_id, and optional error details.
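A sketch of this publisher Protocol and a state-transition helper; `RecordingPublisher` and the exact method signature are illustrative additions for the example, while the event fields match those listed above.

```python
from typing import Protocol


class EventPublisher(Protocol):
    """Structural interface for state-transition publishers."""

    def publish(self, event: dict) -> None: ...


class NoOpEventPublisher:
    """Dev/test publisher: swallows events."""

    def publish(self, event: dict) -> None:
        pass


class RecordingPublisher:
    """In-memory stand-in for SNSEventPublisher in examples/tests."""

    def __init__(self):
        self.events: list[dict] = []

    def publish(self, event: dict) -> None:
        self.events.append(event)


def transition(publisher: EventPublisher, job: dict, new_status: str) -> None:
    """Record a state change, then publish it with full correlation IDs."""
    event = {
        "job_id": job["job_id"],
        "trace_id": job["trace_id"],
        "previous_status": job["status"],
        "new_status": new_status,
    }
    job["status"] = new_status
    publisher.publish(event)
```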


7. Key Source Files

| Flow | File | Purpose |
| --- | --- | --- |
| Backtest orchestration | `services/backend/src/tradai/backend/core/orchestration.py` | Job submission, status tracking, executor dispatch |
| Step Functions workflow | `infra/compute/asl_templates/backtest_workflow.json.j2` | State machine definition (12 states) |
| SQS consumer | `lambdas/backtest-consumer/handler.py` | SQS-to-Step Functions bridge |
| Live trading | `libs/tradai-common/src/tradai/common/entrypoint/trading.py` | Four-phase trading lifecycle |
| Health reporting | `libs/tradai-common/src/tradai/common/health/reporter.py` | DynamoDB heartbeat background thread |
| Metrics collection | `libs/tradai-common/src/tradai/common/health/metrics_collector.py` | Freqtrade API to StrategyPnL/LiveTrade |
| Data collection | `services/data-collection/src/tradai/data_collection/core/service.py` | Exchange-to-ArcticDB sync |
| ML training | `libs/tradai-common/src/tradai/common/entrypoint/training/handler.py` | FreqAI training orchestration |
| WebSocket streaming | `services/backend/src/tradai/backend/api/websocket.py` | Real-time backtest progress |
| Event publishing | `libs/tradai-common/src/tradai/common/events.py` | SNS state transition events |
| Notifications | `lambdas/notify-completion/handler.py` | SNS + Slack notification delivery |

Changelog

| Version | Date | Changes |
| --- | --- | --- |
| 1.0.0 | 2026-03-28 | Initial creation from codebase analysis |

Dependencies

| If This Changes | Update This Doc |
| --- | --- |
| `services/backend/src/tradai/backend/core/orchestration.py` | Backtest flow (Section 2) |
| `libs/tradai-common/src/tradai/common/entrypoint/trading.py` | Live trading flow (Section 3) |
| `services/data-collection/src/tradai/data_collection/core/service.py` | Data collection flow (Section 4) |
| `libs/tradai-common/src/tradai/common/entrypoint/training/handler.py` | ML training flow (Section 5) |
| `infra/compute/asl_templates/backtest_workflow.json.j2` | Step Functions states in backtest flow |