# DataCollection
Market data collection service for fetching and storing OHLCV data from exchanges.
## Service Info
| Property | Value |
|---|---|
| Version | 0.1.0 |
| Type | full |
| Port | 8002 |
| Author | TradAI Team |
## Installation
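No install command is given here, so this is a minimal sketch assuming the service lives in the same uv-managed workspace that the Testing section uses (`uv run ...`):

```bash
# From the workspace root: create the virtual environment and install dependencies
uv sync

# Confirm the CLI entry point documented under Usage is available
uv run data_collection --help
```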
## Configuration
### Required Environment Variables

| Variable | Description | Example |
|---|---|---|
| `DATA_COLLECTION_ARCTIC_S3_BUCKET` | S3 bucket for ArcticDB storage | `tradai-data-prod` |
| `DATA_COLLECTION_EXCHANGES` | Exchange configs as JSON | See below |
### Optional Environment Variables

| Variable | Description | Default |
|---|---|---|
| `DATA_COLLECTION_HOST` | Server host | `0.0.0.0` |
| `DATA_COLLECTION_PORT` | Server port | `8002` |
| `DATA_COLLECTION_DEBUG` | Debug mode | `false` |
| `DATA_COLLECTION_LOG_LEVEL` | Log level | `INFO` |
| `DATA_COLLECTION_ARCTIC_LIBRARY` | ArcticDB library name | `futures` |
### Exchange Configuration Format

```bash
export DATA_COLLECTION_EXCHANGES='{
  "binance_futures": {
    "name": "binance",
    "type": "futures",
    "api_key": "your-api-key",
    "api_secret": "your-api-secret"
  }
}'
```
Multiple exchanges can be configured:
```bash
export DATA_COLLECTION_EXCHANGES='{
  "binance_futures": {"name": "binance", "type": "futures", "api_key": "...", "api_secret": "..."},
  "binance_spot": {"name": "binance", "type": "spot", "api_key": "...", "api_secret": "..."}
}'
```
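Because the whole exchange map travels as one JSON string, a stray quote or comma silently breaks configuration. One way to validate the value before starting the service (assuming a Python interpreter is available on the host):

```bash
# Pretty-prints the configured exchanges, or exits non-zero with a parse error if the JSON is invalid
echo "$DATA_COLLECTION_EXCHANGES" | python -m json.tool
```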
## Usage
### CLI Commands
#### Start API Server

```bash
data_collection serve
data_collection serve --port 8080
data_collection serve --reload  # Development mode
```
#### Sync Data

```bash
# Sync specific symbols
data_collection sync BTC/USDT:USDT ETH/USDT:USDT \
  --start 2024-01-01 --end 2024-01-31

# With options
data_collection sync BTC/USDT:USDT \
  --start 2024-01-01 --end 2024-01-31 \
  --timeframe 1h \
  --exchange binance_futures

# Incremental sync (only new data)
data_collection sync BTC/USDT:USDT \
  --start 2024-01-01 --end 2024-01-31 \
  --incremental
```
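For larger backfills, the `sync` command composes with a plain shell loop; a sketch using only the flags shown above (the symbol list and date range are illustrative):

```bash
# Backfill several symbols one at a time so one failure does not abort the rest
for symbol in "BTC/USDT:USDT" "ETH/USDT:USDT" "SOL/USDT:USDT"; do
  data_collection sync "$symbol" \
    --start 2024-01-01 --end 2024-01-31 \
    --timeframe 1h \
    --exchange binance_futures || echo "sync failed for $symbol"
done
```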
#### List Available Symbols

```bash
data_collection list-symbols
data_collection list-symbols --exchange binance_futures
data_collection list-symbols --limit 10
```
#### Check Data Freshness

```bash
data_collection check-freshness BTC/USDT:USDT ETH/USDT:USDT
data_collection check-freshness BTC/USDT:USDT --threshold 48  # 48h stale threshold
```
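A hypothetical way to keep data current is to schedule an incremental sync; the crontab entry below assumes the CLI is on the cron user's PATH and reuses only the flags documented above:

```bash
# Every hour, pull only candles newer than what is already stored
# (the % in the date format must be escaped inside crontab)
0 * * * * data_collection sync BTC/USDT:USDT --start 2024-01-01 --end "$(date +\%F)" --incremental >> /var/log/data_collection_sync.log 2>&1
```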
#### Health & Config
### API Endpoints
| Method | Path | Description |
|---|---|---|
| GET | /api/v1/health | Health check with dependency status |
| POST | /api/v1/sync | Full sync for symbols |
| POST | /api/v1/sync/incremental | Incremental sync (new data only) |
| GET | /api/v1/freshness | Check data freshness for symbols |
| GET | /api/v1/symbols | List available symbols from exchange |
| GET | /docs | OpenAPI documentation |
### Streaming Endpoints
Real-time market data streaming via WebSocket connections to exchanges.
| Method | Path | Description |
|---|---|---|
| POST | /api/v1/streams | Start a new data stream |
| GET | /api/v1/streams | List active streams |
| GET | /api/v1/streams/{stream_id} | Get stream status |
| DELETE | /api/v1/streams/{stream_id} | Stop a stream |
#### Start Stream Request

```bash
curl -X POST http://localhost:8002/api/v1/streams \
  -H "Content-Type: application/json" \
  -d '{
    "symbols": ["BTC/USDT:USDT", "ETH/USDT:USDT"],
    "exchange": "binance_futures",
    "timeframe": "1m",
    "buffer_size": 1000,
    "flush_interval": 60
  }'
```
Response:

```json
{
  "stream_id": "stream-abc123",
  "status": "running",
  "symbols": ["BTC/USDT:USDT", "ETH/USDT:USDT"],
  "message": "Started streaming 2 symbols"
}
```
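The remaining stream endpoints from the table above are exercised the same way; the stream ID below is the one returned in the example response:

```bash
# List active streams
curl http://localhost:8002/api/v1/streams

# Get the status of a single stream
curl http://localhost:8002/api/v1/streams/stream-abc123

# Stop the stream
curl -X DELETE http://localhost:8002/api/v1/streams/stream-abc123
```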
#### Streaming Configuration

| Variable | Description | Default |
|---|---|---|
| `DATA_COLLECTION_STREAMING_BUFFER_SIZE` | Candles before flush | `1000` |
| `DATA_COLLECTION_STREAMING_FLUSH_INTERVAL` | Seconds between flushes | `60` |
| `DATA_COLLECTION_STREAMING_MAX_RECONNECTS` | Max reconnect attempts | `5` |
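These variables are set like any other service setting; for example, to flush smaller batches more frequently than the defaults:

```bash
export DATA_COLLECTION_STREAMING_BUFFER_SIZE=500
export DATA_COLLECTION_STREAMING_FLUSH_INTERVAL=30
export DATA_COLLECTION_STREAMING_MAX_RECONNECTS=10
```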
### Example API Calls

```bash
# Health check
curl http://localhost:8002/api/v1/health

# Sync data
curl -X POST http://localhost:8002/api/v1/sync \
  -H "Content-Type: application/json" \
  -d '{
    "symbols": ["BTC/USDT:USDT"],
    "start_date": "2024-01-01",
    "end_date": "2024-01-31",
    "timeframe": "1h",
    "exchange": "binance_futures"
  }'

# Check freshness
curl "http://localhost:8002/api/v1/freshness?symbols=BTC/USDT:USDT&symbols=ETH/USDT:USDT"

# List symbols
curl "http://localhost:8002/api/v1/symbols?exchange=binance_futures"
```
## Docker
### Build

```bash
# Build from workspace root
docker build -f services/data-collection/Dockerfile -t data-collection:latest .
```
### Run

```bash
docker run -p 8002:8002 \
  -e DATA_COLLECTION_ARCTIC_S3_BUCKET=my-bucket \
  -e DATA_COLLECTION_EXCHANGES='{"binance_futures":{"name":"binance","type":"futures","api_key":"xxx","api_secret":"xxx"}}' \
  data-collection:latest
```
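Passing API secrets on the command line leaves them in shell history; an alternative sketch using Docker's standard `--env-file` flag:

```bash
# .env holds the same variables shown above, one KEY=value per line
# (values are taken literally, so do not wrap the JSON in shell quotes)
docker run -p 8002:8002 --env-file .env data-collection:latest
```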
## Testing

```bash
# Run all tests
uv run pytest

# Run with coverage
uv run pytest --cov=tradai.data_collection

# Run specific test file
uv run pytest tests/unit/test_service.py

# Run by marker
uv run pytest -m unit
uv run pytest -m integration
```
## Architecture
This service follows the TradAI 3-layer architecture:
```text
src/tradai/data_collection/
├── api/                   # Presentation layer
│   ├── routes.py          # API endpoints (/sync, /freshness, /symbols)
│   ├── schemas.py         # Request/response models
│   └── dependencies.py
├── core/                  # Business logic
│   ├── service.py         # DataCollectionService
│   ├── settings.py        # Configuration (Settings + ExchangeConfig)
│   └── entities.py        # SyncResult, SymbolFreshness
├── infrastructure/        # External adapters
│   └── health_checkers.py
├── cli.py                 # Typer CLI (sync, list-symbols, check-freshness)
└── main.py                # FastAPI app factory
```
## Dependencies
- tradai-common: BaseService, logging, health checks
- tradai-data: CCXTRepository, ArcticAdapter, OHLCV entities
## License
Proprietary - TradAI