
Data Collection

Market data collection service for fetching and storing OHLCV data from exchanges.

Service Info

Property   Value
Version    0.1.0
Type       full
Port       8002
Author     TradAI Team

Installation

# Install with uv (recommended)
cd services/data-collection
uv sync

# Or with pip
pip install -e .

Configuration

Required Environment Variables

Variable                           Description                      Example
DATA_COLLECTION_ARCTIC_S3_BUCKET   S3 bucket for ArcticDB storage   tradai-arcticdb-prod
DATA_COLLECTION_EXCHANGES          Exchange configs as JSON         See below

Optional Environment Variables

Variable                         Description             Default
DATA_COLLECTION_HOST             Server host             0.0.0.0
DATA_COLLECTION_PORT             Server port             8002
DATA_COLLECTION_DEBUG            Debug mode              false
DATA_COLLECTION_LOG_LEVEL        Log level               INFO
DATA_COLLECTION_ARCTIC_LIBRARY   ArcticDB library name   ohlcv

Exchange Configuration Format

export DATA_COLLECTION_EXCHANGES='{
  "binance_futures": {
    "name": "binance",
    "type": "futures",
    "api_key": "your-api-key",
    "api_secret": "your-api-secret"
  }
}'

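Multiple exchanges can be configured by adding more entries to the same JSON object. Each top-level key (for example binance_futures) is the identifier passed to --exchange on the CLI and in the exchange field of API requests: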

export DATA_COLLECTION_EXCHANGES='{
  "binance_futures": {"name": "binance", "type": "futures", "api_key": "...", "api_secret": "..."},
  "binance_spot": {"name": "binance", "type": "spot", "api_key": "...", "api_secret": "..."}
}'
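A minimal sketch of how a deployment script might parse and sanity-check DATA_COLLECTION_EXCHANGES before starting the service. It assumes only the four fields shown above; the ExchangeEntry dataclass and parse_exchanges helper are hypothetical and are not the service's actual ExchangeConfig in core/settings.py.

```python
import json
from dataclasses import dataclass


@dataclass(frozen=True)
class ExchangeEntry:
    """Hypothetical container for one entry of DATA_COLLECTION_EXCHANGES."""

    name: str        # exchange id, e.g. "binance"
    type: str        # market type, e.g. "futures" or "spot"
    api_key: str
    api_secret: str


REQUIRED_FIELDS = ("name", "type", "api_key", "api_secret")


def parse_exchanges(raw: str) -> dict[str, ExchangeEntry]:
    """Parse the JSON object keyed by exchange identifier (e.g. "binance_futures")."""
    data = json.loads(raw)  # json.JSONDecodeError is a subclass of ValueError
    if not isinstance(data, dict) or not data:
        raise ValueError("expected a non-empty JSON object of exchange configs")
    entries: dict[str, ExchangeEntry] = {}
    for key, cfg in data.items():
        if not isinstance(cfg, dict):
            raise ValueError(f"{key}: expected a JSON object")
        missing = [field for field in REQUIRED_FIELDS if field not in cfg]
        if missing:
            raise ValueError(f"{key}: missing fields {missing}")
        # Extra keys are ignored in this sketch; the real ExchangeConfig may accept more.
        entries[key] = ExchangeEntry(**{field: cfg[field] for field in REQUIRED_FIELDS})
    return entries


# Usage (e.g. in a pre-deploy check):
#   import os
#   configs = parse_exchanges(os.environ["DATA_COLLECTION_EXCHANGES"])
#   print(sorted(configs))
```

Failing fast on a malformed or incomplete entry gives a clearer error than discovering the problem on the first sync request.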

Usage

CLI Commands

Start API Server

data_collection serve
data_collection serve --port 8080
data_collection serve --reload  # Development mode

Sync Data

# Sync specific symbols
data_collection sync BTC/USDT:USDT ETH/USDT:USDT \
  --start 2024-01-01 --end 2024-01-31

# With options
data_collection sync BTC/USDT:USDT \
  --start 2024-01-01 --end 2024-01-31 \
  --timeframe 1h \
  --exchange binance_futures

# Incremental sync (only new data)
data_collection sync BTC/USDT:USDT \
  --start 2024-01-01 --end 2024-01-31 \
  --incremental
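Incremental sync fetches only candles newer than what is already stored. One plausible way to compute the remaining fetch window is sketched below, assuming the service knows the timestamp of the newest stored candle. The incremental_window helper and the TIMEFRAMES mapping are hypothetical, not taken from the service code.

```python
from datetime import datetime, timedelta
from typing import Optional

# Hypothetical mapping from timeframe strings (as passed to --timeframe) to candle durations.
TIMEFRAMES = {
    "1m": timedelta(minutes=1),
    "5m": timedelta(minutes=5),
    "15m": timedelta(minutes=15),
    "1h": timedelta(hours=1),
    "4h": timedelta(hours=4),
    "1d": timedelta(days=1),
}


def incremental_window(
    start: datetime,
    end: datetime,
    last_stored: Optional[datetime],
    timeframe: str,
) -> Optional[tuple[datetime, datetime]]:
    """Return the (fetch_start, fetch_end) range still missing, or None if up to date."""
    step = TIMEFRAMES[timeframe]
    fetch_start = start
    if last_stored is not None:
        # Resume one candle after the newest stored candle, never before the requested start.
        fetch_start = max(start, last_stored + step)
    if fetch_start > end:
        return None  # everything in the requested range is already stored
    return fetch_start, end
```

With nothing stored, the window is the full --start/--end range; once data exists, repeated runs only request the tail.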

List Available Symbols

data_collection list-symbols
data_collection list-symbols --exchange binance_futures
data_collection list-symbols --limit 10

Check Data Freshness

data_collection check-freshness BTC/USDT:USDT ETH/USDT:USDT
data_collection check-freshness BTC/USDT:USDT --threshold 48  # 48h stale threshold
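The --threshold value is given in hours. A sketch of the staleness rule it implies, assuming a symbol counts as stale when its newest stored candle is older than the threshold or when no data exists at all. The is_stale helper is hypothetical, and the service's SymbolFreshness entity may track more than this.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def is_stale(
    last_candle: Optional[datetime],
    threshold_hours: float,
    now: Optional[datetime] = None,
) -> bool:
    """Return True if the newest candle is older than threshold_hours (or missing).

    Expects timezone-aware UTC datetimes so the subtraction is well defined.
    """
    if last_candle is None:
        return True  # no stored data for this symbol
    now = now or datetime.now(timezone.utc)
    return now - last_candle > timedelta(hours=threshold_hours)
```

Passing now explicitly keeps the check deterministic in tests.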

Health & Config

data_collection health
data_collection config

API Endpoints

Method   Path                        Description
GET      /api/v1/health              Health check with dependency status
POST     /api/v1/sync                Full sync for symbols
POST     /api/v1/sync/incremental    Incremental sync (new data only)
GET      /api/v1/freshness           Check data freshness for symbols
GET      /api/v1/symbols             List available symbols from exchange
POST     /api/v1/export              Export OHLCV data as typed rows
GET      /docs                       OpenAPI documentation

Streaming Endpoints (4 endpoints)

Real-time market data streaming via WebSocket connections to exchanges.

Method   Path                           Description
POST     /api/v1/streams                Start a new data stream
GET      /api/v1/streams                List active streams
GET      /api/v1/streams/{stream_id}    Get stream status
DELETE   /api/v1/streams/{stream_id}    Stop a stream

Start Stream Request

curl -X POST http://localhost:8002/api/v1/streams \
  -H "Content-Type: application/json" \
  -d '{
    "symbols": ["BTC/USDT:USDT", "ETH/USDT:USDT"],
    "exchange": "binance_futures",
    "timeframe": "1m",
    "buffer_size": 1000,
    "flush_interval": 60
  }'

Response:

{
  "stream_id": "stream-abc123",
  "status": "running",
  "symbols": ["BTC/USDT:USDT", "ETH/USDT:USDT"],
  "message": "Started streaming 2 symbols"
}
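The four streaming endpoints can also be called from Python. The sketch below only builds urllib.request.Request objects for each call, so it can be inspected without a running server. BASE_URL and the helper names are illustrative assumptions; the request body mirrors the curl example above.

```python
import json
from urllib.parse import quote
from urllib.request import Request

BASE_URL = "http://localhost:8002/api/v1"  # assumed local deployment on the default port


def start_stream_request(symbols, exchange, timeframe="1m",
                         buffer_size=1000, flush_interval=60) -> Request:
    """Build POST /streams; defaults are copied from the curl example above."""
    body = json.dumps({
        "symbols": list(symbols),
        "exchange": exchange,
        "timeframe": timeframe,
        "buffer_size": buffer_size,
        "flush_interval": flush_interval,
    }).encode("utf-8")
    return Request(f"{BASE_URL}/streams", data=body,
                   headers={"Content-Type": "application/json"}, method="POST")


def list_streams_request() -> Request:
    return Request(f"{BASE_URL}/streams", method="GET")


def _stream_url(stream_id: str) -> str:
    # Percent-encode the id so it is always a single path segment.
    return f"{BASE_URL}/streams/{quote(stream_id, safe='')}"


def stream_status_request(stream_id: str) -> Request:
    return Request(_stream_url(stream_id), method="GET")


def stop_stream_request(stream_id: str) -> Request:
    return Request(_stream_url(stream_id), method="DELETE")


# Sending a request against a running service:
#   from urllib.request import urlopen
#   with urlopen(start_stream_request(["BTC/USDT:USDT"], "binance_futures")) as resp:
#       stream_id = json.load(resp)["stream_id"]
```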

Streaming Configuration

Variable                                   Description                        Default
DATA_COLLECTION_STREAMING_BUFFER_SIZE      Candles buffered before a flush    1000
DATA_COLLECTION_STREAMING_FLUSH_INTERVAL   Seconds between flushes            60
DATA_COLLECTION_STREAMING_MAX_RECONNECTS   Max WebSocket reconnect attempts   5
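BUFFER_SIZE and FLUSH_INTERVAL suggest a size-or-time flush policy: buffered candles are written when either limit is reached first. The sketch below illustrates that policy under that assumption; CandleBuffer and its write callback are hypothetical stand-ins for whatever the service uses to persist candles to ArcticDB.

```python
import time
from typing import Any, Callable


class CandleBuffer:
    """Collects candles and flushes them by count or elapsed time (hypothetical)."""

    def __init__(self, write: Callable[[list], None], buffer_size: int = 1000,
                 flush_interval: float = 60.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._write = write              # persistence callback, e.g. an ArcticDB append
        self._buffer_size = buffer_size
        self._flush_interval = flush_interval
        self._clock = clock              # injectable for deterministic tests
        self._items: list = []
        self._last_flush = clock()

    def add(self, candle: Any) -> bool:
        """Append a candle; return True if this call triggered a flush."""
        self._items.append(candle)
        full = len(self._items) >= self._buffer_size
        due = self._clock() - self._last_flush >= self._flush_interval
        if full or due:
            self.flush()
            return True
        return False

    def flush(self) -> None:
        """Write any buffered candles and reset the flush timer."""
        if self._items:
            self._write(self._items)
            self._items = []
        self._last_flush = self._clock()
```

In this sketch the time limit is only checked when a candle arrives; a real stream would likely also flush from a background timer and on shutdown so a quiet market does not leave data unwritten.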

Example API Calls

# Health check
curl http://localhost:8002/api/v1/health

# Sync data
curl -X POST http://localhost:8002/api/v1/sync \
  -H "Content-Type: application/json" \
  -d '{
    "symbols": ["BTC/USDT:USDT"],
    "start_date": "2024-01-01",
    "end_date": "2024-01-31",
    "timeframe": "1h",
    "exchange": "binance_futures"
  }'

# Check freshness
curl "http://localhost:8002/api/v1/freshness?symbols=BTC/USDT:USDT&symbols=ETH/USDT:USDT"

# List symbols
curl "http://localhost:8002/api/v1/symbols?exchange=binance_futures"
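The same calls can be made from Python using only the standard library. The helpers and BASE_URL below are illustrative assumptions; the request body and query parameters mirror the curl examples above. urlencode(..., doseq=True) expands a list into repeated symbols= parameters, which is the form the freshness example uses.

```python
import json
from urllib.parse import urlencode
from urllib.request import Request, urlopen

BASE_URL = "http://localhost:8002/api/v1"  # assumed local deployment


def freshness_url(symbols: list[str]) -> str:
    # doseq=True turns the list into ?symbols=...&symbols=... (values percent-encoded)
    return f"{BASE_URL}/freshness?{urlencode({'symbols': symbols}, doseq=True)}"


def get_json(url: str, timeout: float = 30.0):
    """GET a URL and decode the JSON response body."""
    with urlopen(url, timeout=timeout) as resp:
        return json.load(resp)


def post_json(path: str, payload: dict, timeout: float = 30.0):
    """POST a JSON body to BASE_URL + path and decode the JSON response."""
    req = Request(f"{BASE_URL}{path}", data=json.dumps(payload).encode("utf-8"),
                  headers={"Content-Type": "application/json"}, method="POST")
    with urlopen(req, timeout=timeout) as resp:
        return json.load(resp)


# With the service running:
#   get_json(f"{BASE_URL}/health")
#   post_json("/sync", {"symbols": ["BTC/USDT:USDT"], "start_date": "2024-01-01",
#                       "end_date": "2024-01-31", "timeframe": "1h",
#                       "exchange": "binance_futures"}, timeout=300)
#   get_json(freshness_url(["BTC/USDT:USDT", "ETH/USDT:USDT"]))
#   get_json(f"{BASE_URL}/symbols?{urlencode({'exchange': 'binance_futures'})}")
```

A full sync over a long date range may take a while, so callers will probably want a larger timeout for /sync than for the read-only endpoints.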

Docker

Build

# Build from workspace root
docker build -f services/data-collection/Dockerfile -t data-collection:latest .

Run

docker run -p 8002:8002 \
  -e DATA_COLLECTION_ARCTIC_S3_BUCKET=my-bucket \
  -e DATA_COLLECTION_EXCHANGES='{"binance_futures":{"name":"binance","type":"futures","api_key":"xxx","api_secret":"xxx"}}' \
  data-collection:latest

Testing

# Run all tests
uv run pytest

# Run with coverage
uv run pytest --cov=tradai.data_collection

# Run specific test file
uv run pytest tests/unit/test_service.py

# Run by marker
uv run pytest -m unit
uv run pytest -m integration

Architecture

This service follows the TradAI 3-layer architecture:

src/tradai/data_collection/
├── api/              # Presentation layer
│   ├── routes.py     # API endpoints (/sync, /freshness, /symbols)
│   ├── schemas.py    # Request/response models
│   └── dependencies.py
├── core/             # Business logic
│   ├── service.py    # DataCollectionService
│   ├── settings.py   # Configuration (Settings + ExchangeConfig)
│   └── entities.py   # SyncResult, SymbolFreshness
├── infrastructure/   # External adapters
│   └── health_checkers.py
├── cli.py            # Typer CLI (sync, list-symbols, check-freshness)
└── main.py           # FastAPI app factory
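A minimal sketch of how the three layers can depend on each other, assuming the core service reaches the exchange and storage only through narrow interfaces; in the real service, tradai-data's CCXTRepository and ArcticAdapter presumably play those roles. Only DataCollectionService and SyncResult are names from the tree above; their fields and method signatures here, and the OHLCVSource/OHLCVStore protocols and in-memory adapters, are hypothetical.

```python
from dataclasses import dataclass
from typing import Protocol


# core/entities.py (hypothetical fields)
@dataclass(frozen=True)
class SyncResult:
    symbol: str
    rows_written: int


# Interfaces the core layer depends on (hypothetical)
class OHLCVSource(Protocol):
    def fetch(self, symbol: str, start: str, end: str, timeframe: str) -> list[dict]: ...


class OHLCVStore(Protocol):
    def write(self, symbol: str, rows: list[dict]) -> int: ...


# core/service.py: business logic with no HTTP or CLI concerns
class DataCollectionService:
    def __init__(self, source: OHLCVSource, store: OHLCVStore) -> None:
        self._source = source
        self._store = store

    def sync(self, symbols: list[str], start: str, end: str,
             timeframe: str) -> list[SyncResult]:
        results = []
        for symbol in symbols:
            rows = self._source.fetch(symbol, start, end, timeframe)
            written = self._store.write(symbol, rows)
            results.append(SyncResult(symbol=symbol, rows_written=written))
        return results


# In-memory adapters standing in for the infrastructure layer in unit tests
class FakeSource:
    def fetch(self, symbol: str, start: str, end: str, timeframe: str) -> list[dict]:
        return [{"symbol": symbol, "close": 1.0} for _ in range(3)]


class MemoryStore:
    def __init__(self) -> None:
        self.data: dict[str, list[dict]] = {}

    def write(self, symbol: str, rows: list[dict]) -> int:
        self.data.setdefault(symbol, []).extend(rows)
        return len(rows)
```

Because both routes.py and cli.py sit on top of the service, the HTTP and CLI sync paths can share one code path, and swapping adapters lets unit tests run without touching the exchange or S3.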

Dependencies

  • tradai-common: BaseService, logging, health checks
  • tradai-data: CCXTRepository, ArcticAdapter, OHLCV entities

License

Proprietary - TradAI