
DataCollection

Market data collection service that fetches OHLCV (open, high, low, close, volume) candles from exchanges and stores them in ArcticDB.

Service Info

Property  Value
Version   0.1.0
Type      full
Port      8002
Author    TradAI Team

Installation

# Install with uv (recommended)
cd services/data-collection
uv sync

# Or with pip (from services/data-collection)
pip install -e .

Configuration

Required Environment Variables

Variable                          Description                     Example
DATA_COLLECTION_ARCTIC_S3_BUCKET  S3 bucket for ArcticDB storage  tradai-data-prod
DATA_COLLECTION_EXCHANGES         Exchange configs as JSON        See below

Optional Environment Variables

Variable                        Description            Default
DATA_COLLECTION_HOST            Server host            0.0.0.0
DATA_COLLECTION_PORT            Server port            8002
DATA_COLLECTION_DEBUG           Debug mode             false
DATA_COLLECTION_LOG_LEVEL       Log level              INFO
DATA_COLLECTION_ARCTIC_LIBRARY  ArcticDB library name  futures

Exchange Configuration Format

export DATA_COLLECTION_EXCHANGES='{
  "binance_futures": {
    "name": "binance",
    "type": "futures",
    "api_key": "your-api-key",
    "api_secret": "your-api-secret"
  }
}'

Multiple exchanges can be configured by adding one key per exchange to the same JSON object:

export DATA_COLLECTION_EXCHANGES='{
  "binance_futures": {"name": "binance", "type": "futures", "api_key": "...", "api_secret": "..."},
  "binance_spot": {"name": "binance", "type": "spot", "api_key": "...", "api_secret": "..."}
}'
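The architecture section lists `Settings` and `ExchangeConfig` in `core/settings.py`, but their exact fields are not documented here. The sketch below is a minimal stand-in, assuming each entry needs exactly the four keys shown above, that illustrates how the JSON maps to one config object per key and how a malformed entry can be rejected at startup. `ExchangeConfig` and `parse_exchanges` here are hypothetical, not the service's actual code.

```python
import json
from dataclasses import dataclass
from typing import Dict

# Keys every exchange entry must provide (assumption based on the examples above).
REQUIRED_KEYS = ("name", "type", "api_key", "api_secret")


@dataclass(frozen=True)
class ExchangeConfig:
    """Hypothetical stand-in for the service's ExchangeConfig; fields mirror the JSON keys."""

    name: str  # exchange id, e.g. "binance" (presumably a CCXT exchange id)
    type: str  # market type, e.g. "futures" or "spot"
    api_key: str
    api_secret: str


def parse_exchanges(raw: str) -> Dict[str, ExchangeConfig]:
    """Parse DATA_COLLECTION_EXCHANGES into {config key: ExchangeConfig}."""
    data = json.loads(raw)
    if not isinstance(data, dict):
        raise ValueError("DATA_COLLECTION_EXCHANGES must be a JSON object")
    configs: Dict[str, ExchangeConfig] = {}
    for key, entry in data.items():
        if not isinstance(entry, dict):
            raise ValueError(f"exchange {key!r} must be a JSON object")
        missing = [k for k in REQUIRED_KEYS if k not in entry]
        if missing:
            raise ValueError(f"exchange {key!r} is missing keys: {missing}")
        # Extra keys are ignored in this sketch; the real model may accept or reject them.
        configs[key] = ExchangeConfig(**{k: entry[k] for k in REQUIRED_KEYS})
    return configs
```

The top-level keys (`binance_futures`, `binance_spot`) are the names that the CLI `--exchange` option and the `exchange` request field refer to, so the result is naturally a dict keyed by them.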

Usage

CLI Commands

Start API Server

data_collection serve
data_collection serve --port 8080
data_collection serve --reload  # Development mode

Sync Data

# Sync specific symbols
data_collection sync BTC/USDT:USDT ETH/USDT:USDT \
  --start 2024-01-01 --end 2024-01-31

# With options
data_collection sync BTC/USDT:USDT \
  --start 2024-01-01 --end 2024-01-31 \
  --timeframe 1h \
  --exchange binance_futures

# Incremental sync (only new data)
data_collection sync BTC/USDT:USDT \
  --start 2024-01-01 --end 2024-01-31 \
  --incremental
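How `--incremental` decides which data is new is not specified here. One plausible approach, sketched below as an assumption, is to resume one candle after the newest stored timestamp, clamped so it never starts before the requested `--start`. The helpers `timeframe_to_timedelta` and `incremental_start` are hypothetical and only handle minute, hour, and day timeframes.

```python
from datetime import datetime, timedelta
from typing import Optional

# Seconds per unit for CCXT-style timeframe strings such as "1m", "15m", "1h", "1d".
# Other units (e.g. weeks, months) are intentionally not handled in this sketch.
_UNIT_SECONDS = {"m": 60, "h": 3_600, "d": 86_400}


def timeframe_to_timedelta(timeframe: str) -> timedelta:
    """Convert a timeframe like "1h" to a timedelta; raises KeyError for unknown units."""
    value, unit = int(timeframe[:-1]), timeframe[-1]
    return timedelta(seconds=value * _UNIT_SECONDS[unit])


def incremental_start(
    requested_start: datetime,
    last_stored: Optional[datetime],
    timeframe: str,
) -> datetime:
    """Return the timestamp an incremental sync would start fetching from."""
    if last_stored is None:
        # Nothing stored yet: behave like a full sync from the requested start.
        return requested_start
    # Resume one candle after the newest stored candle, never before the requested start.
    return max(requested_start, last_stored + timeframe_to_timedelta(timeframe))
```

With hourly candles stored up to 2024-01-15 23:00, an incremental run over January would begin fetching at 2024-01-16 00:00 instead of re-downloading the first half of the month.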

List Available Symbols

data_collection list-symbols
data_collection list-symbols --exchange binance_futures
data_collection list-symbols --limit 10

Check Data Freshness

data_collection check-freshness BTC/USDT:USDT ETH/USDT:USDT
data_collection check-freshness BTC/USDT:USDT --threshold 48  # 48h stale threshold
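`--threshold` is given in hours. The sketch below expresses one possible staleness rule, assuming a symbol is stale when its newest candle is strictly older than the threshold or when no data is stored at all; the service's own comparison and default threshold may differ. `is_stale` and `freshness_report` are hypothetical names.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict, Optional


def is_stale(last_candle: Optional[datetime], threshold_hours: float, now: datetime) -> bool:
    """True if the newest candle is older than threshold_hours (or missing)."""
    if last_candle is None:
        # No stored data at all counts as stale (assumption).
        return True
    return now - last_candle > timedelta(hours=threshold_hours)


def freshness_report(
    last_candles: Dict[str, Optional[datetime]],
    threshold_hours: float,
    now: Optional[datetime] = None,
) -> Dict[str, bool]:
    """Map each symbol to True if its data is stale."""
    now = now or datetime.now(timezone.utc)
    return {symbol: is_stale(ts, threshold_hours, now) for symbol, ts in last_candles.items()}
```

Under this rule, with `--threshold 48`, a candle exactly 48 hours old still counts as fresh and one 49 hours old is stale.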

Health & Config

data_collection health
data_collection config

API Endpoints

Method  Path                      Description
GET     /api/v1/health            Health check with dependency status
POST    /api/v1/sync              Full sync for symbols
POST    /api/v1/sync/incremental  Incremental sync (new data only)
GET     /api/v1/freshness         Check data freshness for symbols
GET     /api/v1/symbols           List available symbols from exchange
GET     /docs                     OpenAPI documentation

Streaming Endpoints (4 endpoints)

Real-time market data streaming via WebSocket connections to exchanges.

Method  Path                         Description
POST    /api/v1/streams              Start a new data stream
GET     /api/v1/streams              List active streams
GET     /api/v1/streams/{stream_id}  Get stream status
DELETE  /api/v1/streams/{stream_id}  Stop a stream

Start Stream Request

curl -X POST http://localhost:8002/api/v1/streams \
  -H "Content-Type: application/json" \
  -d '{
    "symbols": ["BTC/USDT:USDT", "ETH/USDT:USDT"],
    "exchange": "binance_futures",
    "timeframe": "1m",
    "buffer_size": 1000,
    "flush_interval": 60
  }'

Response:

{
  "stream_id": "stream-abc123",
  "status": "running",
  "symbols": ["BTC/USDT:USDT", "ETH/USDT:USDT"],
  "message": "Started streaming 2 symbols"
}

Streaming Configuration

Variable                                  Description              Default
DATA_COLLECTION_STREAMING_BUFFER_SIZE     Candles before flush     1000
DATA_COLLECTION_STREAMING_FLUSH_INTERVAL  Seconds between flushes  60
DATA_COLLECTION_STREAMING_MAX_RECONNECTS  Max reconnect attempts   5
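The two buffer settings describe a size trigger and a time trigger for writing streamed candles to storage. The exact flushing logic is not documented; the sketch below assumes a flush happens as soon as either limit is reached. `CandleBuffer` is hypothetical, takes an injectable clock so it can be tested, and only checks the interval when a candle arrives (a production stream would likely also flush on a timer). Reconnect handling (`MAX_RECONNECTS`) is not modeled.

```python
import time
from typing import Any, Callable, List


class CandleBuffer:
    """Hypothetical buffer: flush when buffer_size candles accumulate or flush_interval seconds pass."""

    def __init__(
        self,
        on_flush: Callable[[List[Any]], None],
        buffer_size: int = 1000,  # mirrors the DATA_COLLECTION_STREAMING_BUFFER_SIZE default
        flush_interval: float = 60.0,  # mirrors the DATA_COLLECTION_STREAMING_FLUSH_INTERVAL default
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._on_flush = on_flush
        self.buffer_size = buffer_size
        self.flush_interval = flush_interval
        self._clock = clock
        self._items: List[Any] = []
        self._last_flush = clock()

    def add(self, candle: Any) -> bool:
        """Buffer one candle; return True if this call triggered a flush."""
        self._items.append(candle)
        size_reached = len(self._items) >= self.buffer_size
        interval_elapsed = self._clock() - self._last_flush >= self.flush_interval
        if size_reached or interval_elapsed:
            self.flush()
            return True
        return False

    def flush(self) -> None:
        """Hand buffered candles to the writer and reset the interval timer."""
        if self._items:
            self._on_flush(self._items)
            self._items = []
        self._last_flush = self._clock()
```

Larger buffers mean fewer storage writes but more candles at risk if the process dies between flushes; the interval bounds how long data can sit unwritten on a quiet stream.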

Example API Calls

# Health check
curl http://localhost:8002/api/v1/health

# Sync data
curl -X POST http://localhost:8002/api/v1/sync \
  -H "Content-Type: application/json" \
  -d '{
    "symbols": ["BTC/USDT:USDT"],
    "start_date": "2024-01-01",
    "end_date": "2024-01-31",
    "timeframe": "1h",
    "exchange": "binance_futures"
  }'

# Check freshness
curl "http://localhost:8002/api/v1/freshness?symbols=BTC/USDT:USDT&symbols=ETH/USDT:USDT"

# List symbols
curl "http://localhost:8002/api/v1/symbols?exchange=binance_futures"
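The same calls can be made from Python with only the standard library. The sketch below builds (but does not send) the sync request and the freshness URL; `build_sync_request` and `build_freshness_url` are illustrative helpers, and the request fields are copied from the curl examples above. `urlencode(..., doseq=True)` repeats the `symbols` key once per symbol and percent-encodes `/` and `:`, which the server should decode back to the original symbol strings.

```python
import json
from typing import Sequence
from urllib.parse import urlencode
from urllib.request import Request

# Assumes the service runs locally on its default port (8002).
BASE_URL = "http://localhost:8002/api/v1"


def build_sync_request(
    symbols: Sequence[str],
    start_date: str,
    end_date: str,
    timeframe: str,
    exchange: str,
) -> Request:
    """Build a POST /sync request with the same JSON body as the curl example."""
    payload = {
        "symbols": list(symbols),
        "start_date": start_date,
        "end_date": end_date,
        "timeframe": timeframe,
        "exchange": exchange,
    }
    return Request(
        f"{BASE_URL}/sync",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def build_freshness_url(symbols: Sequence[str]) -> str:
    """Build GET /freshness with one repeated `symbols` parameter per symbol."""
    return f"{BASE_URL}/freshness?" + urlencode({"symbols": list(symbols)}, doseq=True)
```

To actually send a request, pass it to `urllib.request.urlopen(req)` and read the JSON body with `json.load(response)`.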

Docker

Build

# Build from workspace root
docker build -f services/data-collection/Dockerfile -t data-collection:latest .

Run

docker run -p 8002:8002 \
  -e DATA_COLLECTION_ARCTIC_S3_BUCKET=my-bucket \
  -e DATA_COLLECTION_EXCHANGES='{"binance_futures":{"name":"binance","type":"futures","api_key":"xxx","api_secret":"xxx"}}' \
  data-collection:latest

Testing

# Run all tests
uv run pytest

# Run with coverage
uv run pytest --cov=tradai.data_collection

# Run specific test file
uv run pytest tests/unit/test_service.py

# Run by marker
uv run pytest -m unit
uv run pytest -m integration

Architecture

This service follows the TradAI 3-layer architecture: presentation (api/), business logic (core/), and external adapters (infrastructure/).

src/tradai/data_collection/
├── api/              # Presentation layer
│   ├── routes.py     # API endpoints (/sync, /freshness, /symbols)
│   ├── schemas.py    # Request/response models
│   └── dependencies.py
├── core/             # Business logic
│   ├── service.py    # DataCollectionService
│   ├── settings.py   # Configuration (Settings + ExchangeConfig)
│   └── entities.py   # SyncResult, SymbolFreshness
├── infrastructure/   # External adapters
│   └── health_checkers.py
├── cli.py            # Typer CLI (sync, list-symbols, check-freshness)
└── main.py           # FastAPI app factory
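In this layering, routes translate HTTP payloads, the core service holds the sync logic, and adapters (the health checkers here, plus `CCXTRepository` and `ArcticAdapter` from tradai-data) presumably handle exchange and storage I/O. The sketch below illustrates that dependency direction with in-memory stand-ins. Only the names `DataCollectionService` and `SyncResult` come from the tree above; their fields and methods here, along with `CandleSource`, `CandleStore`, `StaticSource`, `InMemoryStore`, and `sync_route`, are hypothetical and do not mirror the real code.

```python
from dataclasses import dataclass
from typing import Dict, List, Protocol, Sequence


# core/entities.py: SyncResult exists there; these fields are hypothetical.
@dataclass(frozen=True)
class SyncResult:
    symbol: str
    rows_written: int


# core: hypothetical ports the service depends on, implemented by adapters.
class CandleSource(Protocol):
    def fetch(self, symbol: str, start: str, end: str, timeframe: str) -> List[list]: ...


class CandleStore(Protocol):
    def write(self, symbol: str, rows: List[list]) -> int: ...


# core/service.py: business logic only, no HTTP or exchange-specific code.
class DataCollectionService:
    def __init__(self, source: CandleSource, store: CandleStore) -> None:
        self._source = source
        self._store = store

    def sync(self, symbols: Sequence[str], start: str, end: str, timeframe: str) -> List[SyncResult]:
        results = []
        for symbol in symbols:
            rows = self._source.fetch(symbol, start, end, timeframe)
            results.append(SyncResult(symbol, self._store.write(symbol, rows)))
        return results


# Adapter stand-ins: in-memory replacements for the exchange and ArcticDB adapters.
class StaticSource:
    def __init__(self, rows: List[list]) -> None:
        self._rows = rows

    def fetch(self, symbol: str, start: str, end: str, timeframe: str) -> List[list]:
        return list(self._rows)


class InMemoryStore:
    def __init__(self) -> None:
        self.data: Dict[str, List[list]] = {}

    def write(self, symbol: str, rows: List[list]) -> int:
        self.data.setdefault(symbol, []).extend(rows)
        return len(rows)


# api/routes.py: translate the request payload into a service call and back.
def sync_route(service: DataCollectionService, payload: dict) -> dict:
    results = service.sync(
        payload["symbols"], payload["start_date"], payload["end_date"], payload["timeframe"]
    )
    return {"results": [{"symbol": r.symbol, "rows_written": r.rows_written} for r in results]}
```

Because the core only sees the two ports, unit tests can swap in in-memory adapters like these while integration tests use the real exchange and ArcticDB adapters.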

Dependencies

  • tradai-common: BaseService, logging, health checks
  • tradai-data: CCXTRepository, ArcticAdapter, OHLCV entities

License

Proprietary - TradAI