tradai-data¶
Market data collection, storage, and streaming for the TradAI platform.
from tradai.data.core.entities import OHLCVData, SymbolList, Timeframe, DateRange
from tradai.data.core.repositories import DataRepository
from tradai.data.infrastructure.adapters import ArcticAdapter
Core Entities¶
SymbolList¶
Validated collection of trading symbols.
from tradai.data.core.entities import SymbolList
# Create from list of strings
symbols = SymbolList.from_input(["BTC/USDT:USDT", "ETH/USDT:USDT"])
# Access symbols
print(len(symbols)) # 2
print(symbols.symbols) # frozenset of symbols
Methods:
| Method | Description |
|---|---|
| from_input(symbols) | Create from list of symbol strings |
Timeframe¶
Candle timeframe with validation.
from tradai.data.core.entities import Timeframe
# Parse from string
tf = Timeframe.parse("1h")
# Access properties
print(tf.value) # "1h"
Supported Values: - 1m, 3m, 5m, 15m, 30m - Minutes - 1h, 2h, 4h, 6h, 8h, 12h - Hours - 1d, 3d, 1w, 1M - Days/Weeks/Months
DateRange¶
Time range for data queries.
from tradai.data.core.entities import DateRange
from datetime import datetime, timedelta
# Create from datetime objects
date_range = DateRange(
start=datetime(2024, 1, 1),
end=datetime(2024, 6, 1)
)
# Create from strings
date_range = DateRange.from_strings("2024-01-01", "2024-06-01")
# Create relative range
end = datetime.now()
start = end - timedelta(days=30)
date_range = DateRange(start=start, end=end)
# Access properties
print(date_range.duration_days) # Total duration in days
OHLCVData¶
OHLCV candlestick data container.
from tradai.data.core.entities import OHLCVData
# Access data
df = ohlcv.to_dataframe() # Pandas DataFrame
print(ohlcv.row_count) # Number of candles
# DataFrame columns:
# - timestamp (datetime)
# - open (float)
# - high (float)
# - low (float)
# - close (float)
# - volume (float)
Repository Protocols¶
DataRepository¶
Protocol for synchronous OHLCV data fetching.
from tradai.data.core.repositories import DataRepository
from typing import Protocol
class DataRepository(Protocol):
def fetch_ohlcv(
self,
symbols: SymbolList,
date_range: DateRange,
timeframe: Timeframe
) -> OHLCVData: ...
AsyncDataRepository¶
Protocol for async data fetching.
from tradai.data.core.repositories import AsyncDataRepository
class AsyncDataRepository(Protocol):
async def fetch_ohlcv(
self,
symbols: SymbolList,
date_range: DateRange,
timeframe: Timeframe
) -> OHLCVData: ...
Exchange Repositories¶
CCXTRepository¶
Synchronous CCXT-based data fetching.
from tradai.common import ExchangeConfig, TradingMode
from tradai.data.infrastructure.repositories import CCXTRepository
from tradai.data.core.entities import SymbolList, Timeframe, DateRange
# Create exchange config
config = ExchangeConfig(name="binance", trading_mode=TradingMode.FUTURES)
# Create repository
repo = CCXTRepository(config=config)
# Fetch data
symbols = SymbolList.from_input(["BTC/USDT:USDT"])
timeframe = Timeframe.parse("1h")
date_range = DateRange.from_strings("2024-01-01", "2024-01-31")
ohlcv = repo.fetch_ohlcv(symbols, date_range, timeframe)
print(f"Fetched {ohlcv.row_count} candles")
Supported Exchanges: - binance - Binance Spot - binance with TradingMode.FUTURES - Binance Futures - hyperliquid - Hyperliquid - Other CCXT-supported exchanges
AsyncCCXTRepository¶
Async version for concurrent fetching.
from tradai.data.infrastructure.repositories import AsyncCCXTRepository
import asyncio
async def fetch_data():
config = ExchangeConfig(name="binance", trading_mode=TradingMode.FUTURES)
repo = AsyncCCXTRepository(config=config)
ohlcv = await repo.fetch_ohlcv(symbols, date_range, timeframe)
return ohlcv
# Run async
data = asyncio.run(fetch_data())
CCXTProRepository¶
WebSocket-based real-time data via CCXT Pro.
from tradai.data.infrastructure.repositories import CCXTProRepository
config = ExchangeConfig(name="binance", trading_mode=TradingMode.FUTURES)
repo = CCXTProRepository(config=config)
# Watch real-time OHLCV
async for candle in repo.watch_ohlcv(symbols, timeframe):
print(f"New candle: {candle}")
Storage Adapters¶
ArcticAdapter¶
High-performance time-series storage on S3 using ArcticDB.
from tradai.data.infrastructure.adapters import ArcticAdapter
from datetime import datetime, UTC
# Create adapter (S3 storage)
adapter = ArcticAdapter(bucket="my-bucket", library_name="futures")
# Save data
adapter.save(ohlcv, symbols, datetime.now(UTC))
# Load data
loaded = adapter.load(symbols, date_range)
print(f"Loaded {loaded.row_count} rows")
Features: - Versioned data storage - Efficient range queries - Compression - S3-native storage
InMemoryAdapter¶
In-memory storage for testing.
from tradai.data.infrastructure.adapters import InMemoryAdapter
adapter = InMemoryAdapter()
# Same interface as ArcticAdapter
adapter.save(ohlcv, symbols, datetime.now(UTC))
loaded = adapter.load(symbols, date_range)
Resilient Repositories¶
ResilientDataRepository¶
Wrapper with retry and circuit breaker protection.
from tradai.data.infrastructure.repositories import ResilientDataRepository, CCXTRepository
from tradai.common import CircuitBreaker, CircuitBreakerConfig
config = ExchangeConfig(name="binance", trading_mode=TradingMode.FUTURES)
repo = ResilientDataRepository(
repository=CCXTRepository(config=config),
circuit_breaker=CircuitBreaker(
config=CircuitBreakerConfig(
failure_threshold=5,
recovery_timeout=30.0
)
)
)
# Automatic retry on failure with circuit breaker
ohlcv = repo.fetch_ohlcv(symbols, date_range, timeframe)
Streaming¶
WebSocketDataCollector¶
Real-time data streaming with buffered storage.
from tradai.data.infrastructure.repositories import CCXTProRepository
from tradai.data.infrastructure.adapters import ArcticAdapter
from tradai.data.core.streaming_service import WebSocketDataCollector
from tradai.data.core.streaming_entities import StreamConfig
# Setup components
config = ExchangeConfig(name="binance", trading_mode=TradingMode.FUTURES)
repo = CCXTProRepository(config=config)
adapter = ArcticAdapter(bucket="my-bucket", library_name="futures")
# Create stream config
stream_config = StreamConfig.create(
symbols=symbols,
timeframe=timeframe,
buffer_size=100
)
# Start collector
collector = WebSocketDataCollector(
repository=repo,
adapter=adapter,
config=stream_config
)
await collector.start()
StreamConfig¶
Configuration for WebSocket streaming.
from tradai.data.core.streaming_entities import StreamConfig, StreamState
config = StreamConfig.create(
symbols=symbols,
timeframe=timeframe,
buffer_size=100,
max_reconnect_attempts=5,
reconnect_delay_seconds=1.0
)
StreamState¶
Connection state enum.
from tradai.data.core.streaming_entities import StreamState
# Available states
StreamState.CONNECTING
StreamState.CONNECTED
StreamState.RECONNECTING
StreamState.DISCONNECTED
StreamState.ERROR
StreamState.STOPPED
Data Quality¶
DataQualityService¶
Validate data quality before use.
from tradai.data.quality.service import DataQualityService
# Create service with adapter
service = DataQualityService(adapter=adapter)
# Run quality checks
reports = service.check(symbols, date_range, timeframe)
for report in reports:
if report.has_issues:
print(f"Symbol: {report.symbol}")
for issue in report.issues:
print(f" {issue.type}: {issue.description}")
GapDetector¶
Detect missing candles in time series.
from tradai.data.quality.detectors import GapDetector
detector = GapDetector()
# Detect gaps in DataFrame
gaps = detector.detect(df, timeframe, symbol="BTC/USDT:USDT")
for gap in gaps:
print(f"Gap: {gap.start} to {gap.end} ({gap.missing_candles} candles)")
AnomalyDetector¶
Detect price/volume anomalies.
from tradai.data.quality.detectors import AnomalyDetector
detector = AnomalyDetector(
price_spike_threshold=0.1, # 10% price spike
min_volume_threshold=0.0 # Minimum volume
)
anomalies = detector.detect(df, symbol="BTC/USDT:USDT")
for anomaly in anomalies:
print(f"Anomaly at {anomaly.timestamp}: {anomaly.type}")
Anomaly Types: - Price spikes - Zero volumes - Invalid OHLC (high < low, etc.) - Duplicate timestamps
Usage Examples¶
Complete Data Pipeline¶
from tradai.common import ExchangeConfig, TradingMode
from tradai.data.infrastructure.repositories import CCXTRepository
from tradai.data.infrastructure.adapters import ArcticAdapter
from tradai.data.quality.service import DataQualityService
from tradai.data.core.entities import SymbolList, Timeframe, DateRange
from datetime import datetime, UTC
# 1. Create components
config = ExchangeConfig(name="binance", trading_mode=TradingMode.FUTURES)
exchange_repo = CCXTRepository(config=config)
storage = ArcticAdapter(bucket="my-bucket", library_name="futures")
quality_service = DataQualityService(adapter=storage)
# 2. Fetch from exchange
symbols = SymbolList.from_input(["BTC/USDT:USDT"])
timeframe = Timeframe.parse("1h")
date_range = DateRange.from_strings("2024-01-01", "2024-03-31")
ohlcv = exchange_repo.fetch_ohlcv(symbols, date_range, timeframe)
print(f"Fetched {ohlcv.row_count} candles")
# 3. Store to ArcticDB
storage.save(ohlcv, symbols, datetime.now(UTC))
print("Stored to ArcticDB")
# 4. Validate quality
reports = quality_service.check(symbols, date_range, timeframe)
for report in reports:
if report.has_issues:
print(f"Quality issues: {len(report.issues)}")
# 5. Read back
loaded = storage.load(symbols, date_range)
print(f"Loaded {loaded.row_count} rows")
Multi-Symbol Fetch¶
import asyncio
from tradai.common import ExchangeConfig, TradingMode
from tradai.data.infrastructure.repositories import AsyncCCXTRepository
from tradai.data.core.entities import SymbolList, Timeframe, DateRange
async def fetch_multiple_symbols():
config = ExchangeConfig(name="binance", trading_mode=TradingMode.FUTURES)
repo = AsyncCCXTRepository(config=config)
# All symbols in one call
symbols = SymbolList.from_input([
"BTC/USDT:USDT",
"ETH/USDT:USDT",
"SOL/USDT:USDT",
])
timeframe = Timeframe.parse("1h")
date_range = DateRange.from_strings("2024-01-01", "2024-01-31")
ohlcv = await repo.fetch_ohlcv(symbols, date_range, timeframe)
print(f"Fetched {ohlcv.row_count} candles for {len(symbols)} symbols")
return ohlcv
# Run
data = asyncio.run(fetch_multiple_symbols())
Resilient Data Fetching¶
Production-grade fetching with circuit breaker and retries:
from datetime import datetime, timedelta
import logging
from tradai.common import (
ExchangeConfig,
TradingMode,
CircuitBreaker,
CircuitBreakerConfig,
LoggerMixin,
)
from tradai.data.infrastructure.repositories import (
CCXTRepository,
ResilientDataRepository,
)
from tradai.data.core.entities import SymbolList, Timeframe, DateRange
class DataFetcher(LoggerMixin):
"""Production data fetcher with resilience patterns."""
def __init__(self, exchange: str = "binance"):
self.config = ExchangeConfig(
name=exchange,
trading_mode=TradingMode.FUTURES,
)
# Create resilient repository with circuit breaker
base_repo = CCXTRepository(config=self.config)
self.repository = ResilientDataRepository(
repository=base_repo,
circuit_breaker=CircuitBreaker(
config=CircuitBreakerConfig(
failure_threshold=5, # Open after 5 failures
recovery_timeout=30.0, # Try recovery after 30s
half_open_requests=2, # Allow 2 requests in half-open
)
),
max_retries=3,
retry_delay=1.0,
)
def fetch_historical(
self,
symbols: list[str],
days: int = 30,
timeframe: str = "1h",
) -> dict:
"""Fetch historical data with error handling."""
symbol_list = SymbolList.from_input(symbols)
tf = Timeframe.parse(timeframe)
date_range = DateRange(
start=datetime.now() - timedelta(days=days),
end=datetime.now(),
)
self.logger.info(
f"Fetching {days} days of {timeframe} data for {len(symbols)} symbols"
)
try:
ohlcv = self.repository.fetch_ohlcv(symbol_list, date_range, tf)
self.logger.info(f"Successfully fetched {ohlcv.row_count} candles")
return {
"success": True,
"row_count": ohlcv.row_count,
"dataframe": ohlcv.to_dataframe(),
}
except Exception as e:
self.logger.error(f"Failed to fetch data: {e}")
return {
"success": False,
"error": str(e),
}
# Usage
fetcher = DataFetcher(exchange="binance")
result = fetcher.fetch_historical(
symbols=["BTC/USDT:USDT", "ETH/USDT:USDT"],
days=90,
timeframe="4h",
)
if result["success"]:
df = result["dataframe"]
print(f"Got {len(df)} rows")
Real-Time WebSocket Streaming¶
Set up live data streaming with automatic reconnection:
import asyncio
from datetime import datetime, UTC
from tradai.common import ExchangeConfig, TradingMode, LoggerMixin
from tradai.data.infrastructure.repositories import CCXTProRepository
from tradai.data.infrastructure.adapters import ArcticAdapter
from tradai.data.core.streaming_service import WebSocketDataCollector
from tradai.data.core.streaming_entities import StreamConfig, StreamState
from tradai.data.core.entities import SymbolList, Timeframe
class LiveDataCollector(LoggerMixin):
"""Real-time data collection service."""
def __init__(
self,
symbols: list[str],
timeframe: str = "1m",
storage_bucket: str = "tradai-data",
):
self.symbols = SymbolList.from_input(symbols)
self.timeframe = Timeframe.parse(timeframe)
# WebSocket repository
config = ExchangeConfig(name="binance", trading_mode=TradingMode.FUTURES)
self.ws_repo = CCXTProRepository(config=config)
# Storage adapter
self.storage = ArcticAdapter(
bucket=storage_bucket,
library_name="live_candles",
)
# Stream configuration
self.stream_config = StreamConfig.create(
symbols=self.symbols,
timeframe=self.timeframe,
buffer_size=100, # Buffer 100 candles before flush
max_reconnect_attempts=10,
reconnect_delay_seconds=1.0,
)
self.collector = None
self._running = False
async def start(self):
"""Start live data collection."""
self.logger.info(
f"Starting live collection for {len(self.symbols.symbols)} symbols"
)
self.collector = WebSocketDataCollector(
repository=self.ws_repo,
adapter=self.storage,
config=self.stream_config,
)
# Register callbacks
self.collector.on_state_change = self._handle_state_change
self.collector.on_candle = self._handle_candle
self.collector.on_error = self._handle_error
self._running = True
await self.collector.start()
async def stop(self):
"""Stop collection gracefully."""
self._running = False
if self.collector:
await self.collector.stop()
self.logger.info("Live collection stopped")
def _handle_state_change(self, old_state: StreamState, new_state: StreamState):
"""Handle connection state changes."""
self.logger.info(f"Stream state: {old_state.name} -> {new_state.name}")
if new_state == StreamState.RECONNECTING:
self.logger.warning("Connection lost, attempting reconnect...")
elif new_state == StreamState.CONNECTED:
self.logger.info("Successfully connected to WebSocket")
elif new_state == StreamState.ERROR:
self.logger.error("Stream encountered an error")
def _handle_candle(self, symbol: str, candle: dict):
"""Process incoming candles."""
self.logger.debug(
f"New candle: {symbol} @ {candle['timestamp']} "
f"O={candle['open']:.2f} H={candle['high']:.2f} "
f"L={candle['low']:.2f} C={candle['close']:.2f}"
)
def _handle_error(self, error: Exception):
"""Handle stream errors."""
self.logger.error(f"Stream error: {error}")
# Usage
async def main():
collector = LiveDataCollector(
symbols=["BTC/USDT:USDT", "ETH/USDT:USDT"],
timeframe="1m",
storage_bucket="my-tradai-bucket",
)
try:
await collector.start()
# Run for 1 hour
await asyncio.sleep(3600)
finally:
await collector.stop()
if __name__ == "__main__":
asyncio.run(main())
See Also¶
Related SDKs:
- tradai-common - Base utilities (ExchangeConfig, CircuitBreaker)
- tradai-strategy - Strategy framework using data
Services:
- Data Collection Service - REST API for data fetching
- Data Collection README - Streaming API
Architecture:
- Architecture Overview - Streaming data flow diagram
- DESIGN.md - Design decisions and patterns
Lambdas:
- data-collection-proxy - Lambda proxy to data service
CLI:
- CLI Reference - tradai data commands