Skip to content

tradai-data

Market data collection, storage, and streaming for the TradAI platform.

from tradai.data.core.entities import OHLCVData, SymbolList, Timeframe, DateRange
from tradai.data.core.repositories import DataRepository
from tradai.data.infrastructure.adapters import ArcticAdapter

Core Entities

SymbolList

Validated collection of trading symbols.

from tradai.data.core.entities import SymbolList

# Create from list of strings
symbols = SymbolList.from_input(["BTC/USDT:USDT", "ETH/USDT:USDT"])

# Access symbols
print(len(symbols))           # 2
print(symbols.symbols)        # frozenset of symbols

Methods:

Method Description
from_input(symbols) Create from list of symbol strings

Timeframe

Candle timeframe with validation.

from tradai.data.core.entities import Timeframe

# Parse from string
tf = Timeframe.parse("1h")

# Access properties
print(tf.value)       # "1h"

Supported Values:

- Minutes: 1m, 3m, 5m, 15m, 30m
- Hours: 1h, 2h, 4h, 6h, 8h, 12h
- Days/Weeks/Months: 1d, 3d, 1w, 1M


DateRange

Time range for data queries.

from tradai.data.core.entities import DateRange
from datetime import datetime, timedelta

# Create from datetime objects
date_range = DateRange(
    start=datetime(2024, 1, 1),
    end=datetime(2024, 6, 1)
)

# Create from strings
date_range = DateRange.from_strings("2024-01-01", "2024-06-01")

# Create relative range
end = datetime.now()
start = end - timedelta(days=30)
date_range = DateRange(start=start, end=end)

# Access properties
print(date_range.duration_days)    # Total duration in days

OHLCVData

OHLCV candlestick data container.

from tradai.data.core.entities import OHLCVData

# Access data
df = ohlcv.to_dataframe()      # Pandas DataFrame
print(ohlcv.row_count)         # Number of candles

# DataFrame columns:
# - timestamp (datetime)
# - open (float)
# - high (float)
# - low (float)
# - close (float)
# - volume (float)

Repository Protocols

DataRepository

Protocol for synchronous OHLCV data fetching.

from tradai.data.core.repositories import DataRepository
from typing import Protocol

class DataRepository(Protocol):
    """Structural protocol for synchronous OHLCV data sources.

    Any class providing a matching `fetch_ohlcv` satisfies this protocol;
    no explicit inheritance is required.
    """
    def fetch_ohlcv(
        self,
        symbols: SymbolList,
        date_range: DateRange,
        timeframe: Timeframe
    ) -> OHLCVData: ...

AsyncDataRepository

Protocol for async data fetching.

from tradai.data.core.repositories import AsyncDataRepository

class AsyncDataRepository(Protocol):
    """Structural protocol for asynchronous OHLCV data sources.

    Same contract as DataRepository, but `fetch_ohlcv` is a coroutine and
    must be awaited.
    """
    async def fetch_ohlcv(
        self,
        symbols: SymbolList,
        date_range: DateRange,
        timeframe: Timeframe
    ) -> OHLCVData: ...

Exchange Repositories

CCXTRepository

Synchronous CCXT-based data fetching.

from tradai.common import ExchangeConfig, TradingMode
from tradai.data.infrastructure.repositories import CCXTRepository
from tradai.data.core.entities import SymbolList, Timeframe, DateRange

# Create exchange config
config = ExchangeConfig(name="binance", trading_mode=TradingMode.FUTURES)

# Create repository
repo = CCXTRepository(config=config)

# Fetch data
symbols = SymbolList.from_input(["BTC/USDT:USDT"])
timeframe = Timeframe.parse("1h")
date_range = DateRange.from_strings("2024-01-01", "2024-01-31")

ohlcv = repo.fetch_ohlcv(symbols, date_range, timeframe)
print(f"Fetched {ohlcv.row_count} candles")

Supported Exchanges:

- binance - Binance Spot
- binance with TradingMode.FUTURES - Binance Futures
- hyperliquid - Hyperliquid
- Other CCXT-supported exchanges


AsyncCCXTRepository

Async version for concurrent fetching.

from tradai.data.infrastructure.repositories import AsyncCCXTRepository
import asyncio

async def fetch_data():
    """Fetch OHLCV candles asynchronously from Binance futures.

    Relies on `symbols`, `date_range`, and `timeframe` being defined in the
    surrounding scope (as in the earlier examples).
    """
    exchange_config = ExchangeConfig(name="binance", trading_mode=TradingMode.FUTURES)
    repository = AsyncCCXTRepository(config=exchange_config)
    # Await the fetch and hand the candle container straight back.
    return await repository.fetch_ohlcv(symbols, date_range, timeframe)

# Run async
data = asyncio.run(fetch_data())

CCXTProRepository

WebSocket-based real-time data via CCXT Pro.

from tradai.data.infrastructure.repositories import CCXTProRepository

config = ExchangeConfig(name="binance", trading_mode=TradingMode.FUTURES)
repo = CCXTProRepository(config=config)

# Watch real-time OHLCV
async for candle in repo.watch_ohlcv(symbols, timeframe):
    print(f"New candle: {candle}")

Storage Adapters

ArcticAdapter

High-performance time-series storage on S3 using ArcticDB.

from tradai.data.infrastructure.adapters import ArcticAdapter
from datetime import datetime, UTC

# Create adapter (S3 storage)
adapter = ArcticAdapter(bucket="my-bucket", library_name="futures")

# Save data
adapter.save(ohlcv, symbols, datetime.now(UTC))

# Load data
loaded = adapter.load(symbols, date_range)
print(f"Loaded {loaded.row_count} rows")

Features:

- Versioned data storage
- Efficient range queries
- Compression
- S3-native storage


InMemoryAdapter

In-memory storage for testing.

from tradai.data.infrastructure.adapters import InMemoryAdapter

adapter = InMemoryAdapter()

# Same interface as ArcticAdapter
adapter.save(ohlcv, symbols, datetime.now(UTC))
loaded = adapter.load(symbols, date_range)

Resilient Repositories

ResilientDataRepository

Wrapper with retry and circuit breaker protection.

from tradai.data.infrastructure.repositories import ResilientDataRepository, CCXTRepository
from tradai.common import CircuitBreaker, CircuitBreakerConfig

config = ExchangeConfig(name="binance", trading_mode=TradingMode.FUTURES)

repo = ResilientDataRepository(
    repository=CCXTRepository(config=config),
    circuit_breaker=CircuitBreaker(
        config=CircuitBreakerConfig(
            failure_threshold=5,
            recovery_timeout=30.0
        )
    )
)

# Automatic retry on failure with circuit breaker
ohlcv = repo.fetch_ohlcv(symbols, date_range, timeframe)

Streaming

WebSocketDataCollector

Real-time data streaming with buffered storage.

from tradai.data.infrastructure.repositories import CCXTProRepository
from tradai.data.infrastructure.adapters import ArcticAdapter
from tradai.data.core.streaming_service import WebSocketDataCollector
from tradai.data.core.streaming_entities import StreamConfig

# Setup components
config = ExchangeConfig(name="binance", trading_mode=TradingMode.FUTURES)
repo = CCXTProRepository(config=config)
adapter = ArcticAdapter(bucket="my-bucket", library_name="futures")

# Create stream config
stream_config = StreamConfig.create(
    symbols=symbols,
    timeframe=timeframe,
    buffer_size=100
)

# Start collector
collector = WebSocketDataCollector(
    repository=repo,
    adapter=adapter,
    config=stream_config
)
await collector.start()

StreamConfig

Configuration for WebSocket streaming.

from tradai.data.core.streaming_entities import StreamConfig, StreamState

config = StreamConfig.create(
    symbols=symbols,
    timeframe=timeframe,
    buffer_size=100,
    max_reconnect_attempts=5,
    reconnect_delay_seconds=1.0
)

StreamState

Connection state enum.

from tradai.data.core.streaming_entities import StreamState

# Available states
StreamState.CONNECTING
StreamState.CONNECTED
StreamState.RECONNECTING
StreamState.DISCONNECTED
StreamState.ERROR
StreamState.STOPPED

Data Quality

DataQualityService

Validate data quality before use.

from tradai.data.quality.service import DataQualityService

# Create service with adapter
service = DataQualityService(adapter=adapter)

# Run quality checks
reports = service.check(symbols, date_range, timeframe)

for report in reports:
    if report.has_issues:
        print(f"Symbol: {report.symbol}")
        for issue in report.issues:
            print(f"  {issue.type}: {issue.description}")

GapDetector

Detect missing candles in time series.

from tradai.data.quality.detectors import GapDetector

detector = GapDetector()

# Detect gaps in DataFrame
gaps = detector.detect(df, timeframe, symbol="BTC/USDT:USDT")

for gap in gaps:
    print(f"Gap: {gap.start} to {gap.end} ({gap.missing_candles} candles)")

AnomalyDetector

Detect price/volume anomalies.

from tradai.data.quality.detectors import AnomalyDetector

detector = AnomalyDetector(
    price_spike_threshold=0.1,    # 10% price spike
    min_volume_threshold=0.0      # Minimum volume
)

anomalies = detector.detect(df, symbol="BTC/USDT:USDT")

for anomaly in anomalies:
    print(f"Anomaly at {anomaly.timestamp}: {anomaly.type}")

Anomaly Types:

- Price spikes
- Zero volumes
- Invalid OHLC (high < low, etc.)
- Duplicate timestamps


Usage Examples

Complete Data Pipeline

from tradai.common import ExchangeConfig, TradingMode
from tradai.data.infrastructure.repositories import CCXTRepository
from tradai.data.infrastructure.adapters import ArcticAdapter
from tradai.data.quality.service import DataQualityService
from tradai.data.core.entities import SymbolList, Timeframe, DateRange
from datetime import datetime, UTC

# 1. Create components
config = ExchangeConfig(name="binance", trading_mode=TradingMode.FUTURES)
exchange_repo = CCXTRepository(config=config)
storage = ArcticAdapter(bucket="my-bucket", library_name="futures")
quality_service = DataQualityService(adapter=storage)

# 2. Fetch from exchange
symbols = SymbolList.from_input(["BTC/USDT:USDT"])
timeframe = Timeframe.parse("1h")
date_range = DateRange.from_strings("2024-01-01", "2024-03-31")

ohlcv = exchange_repo.fetch_ohlcv(symbols, date_range, timeframe)
print(f"Fetched {ohlcv.row_count} candles")

# 3. Store to ArcticDB
storage.save(ohlcv, symbols, datetime.now(UTC))
print("Stored to ArcticDB")

# 4. Validate quality
reports = quality_service.check(symbols, date_range, timeframe)
for report in reports:
    if report.has_issues:
        print(f"Quality issues: {len(report.issues)}")

# 5. Read back
loaded = storage.load(symbols, date_range)
print(f"Loaded {loaded.row_count} rows")

Multi-Symbol Fetch

import asyncio
from tradai.common import ExchangeConfig, TradingMode
from tradai.data.infrastructure.repositories import AsyncCCXTRepository
from tradai.data.core.entities import SymbolList, Timeframe, DateRange

async def fetch_multiple_symbols():
    """Fetch one month of hourly candles for several futures pairs at once."""
    repository = AsyncCCXTRepository(
        config=ExchangeConfig(name="binance", trading_mode=TradingMode.FUTURES)
    )

    # A single SymbolList lets the repository handle all pairs in one call.
    pairs = SymbolList.from_input(
        [
            "BTC/USDT:USDT",
            "ETH/USDT:USDT",
            "SOL/USDT:USDT",
        ]
    )

    candles = await repository.fetch_ohlcv(
        pairs,
        DateRange.from_strings("2024-01-01", "2024-01-31"),
        Timeframe.parse("1h"),
    )
    print(f"Fetched {candles.row_count} candles for {len(pairs)} symbols")

    return candles

# Run
data = asyncio.run(fetch_multiple_symbols())

Resilient Data Fetching

Production-grade fetching with circuit breaker and retries:

from datetime import datetime, timedelta
import logging

from tradai.common import (
    ExchangeConfig,
    TradingMode,
    CircuitBreaker,
    CircuitBreakerConfig,
    LoggerMixin,
)
from tradai.data.infrastructure.repositories import (
    CCXTRepository,
    ResilientDataRepository,
)
from tradai.data.core.entities import SymbolList, Timeframe, DateRange


class DataFetcher(LoggerMixin):
    """Production data fetcher with resilience patterns.

    Wraps a CCXTRepository in a ResilientDataRepository so transient
    exchange failures are retried and sustained failures trip a circuit
    breaker instead of hammering the API.
    """

    def __init__(self, exchange: str = "binance"):
        self.config = ExchangeConfig(
            name=exchange,
            trading_mode=TradingMode.FUTURES,
        )

        # Create resilient repository with circuit breaker
        base_repo = CCXTRepository(config=self.config)
        self.repository = ResilientDataRepository(
            repository=base_repo,
            circuit_breaker=CircuitBreaker(
                config=CircuitBreakerConfig(
                    failure_threshold=5,     # Open after 5 failures
                    recovery_timeout=30.0,   # Try recovery after 30s
                    half_open_requests=2,    # Allow 2 requests in half-open
                )
            ),
            max_retries=3,
            retry_delay=1.0,
        )

    def fetch_historical(
        self,
        symbols: list[str],
        days: int = 30,
        timeframe: str = "1h",
    ) -> dict:
        """Fetch `days` of historical candles for `symbols`.

        Returns a dict: on success {"success": True, "row_count", "dataframe"},
        on failure {"success": False, "error"}. Never raises — all fetch
        errors are caught, logged, and reported in the return value.
        """
        symbol_list = SymbolList.from_input(symbols)
        tf = Timeframe.parse(timeframe)
        # Capture "now" once so start and end derive from the same instant;
        # two separate datetime.now() calls would make the range slightly
        # longer than `days` and non-deterministic.
        now = datetime.now()
        date_range = DateRange(
            start=now - timedelta(days=days),
            end=now,
        )

        self.logger.info(
            f"Fetching {days} days of {timeframe} data for {len(symbols)} symbols"
        )

        try:
            ohlcv = self.repository.fetch_ohlcv(symbol_list, date_range, tf)
            self.logger.info(f"Successfully fetched {ohlcv.row_count} candles")
            return {
                "success": True,
                "row_count": ohlcv.row_count,
                "dataframe": ohlcv.to_dataframe(),
            }
        except Exception as e:
            self.logger.error(f"Failed to fetch data: {e}")
            return {
                "success": False,
                "error": str(e),
            }


# Usage
fetcher = DataFetcher(exchange="binance")
result = fetcher.fetch_historical(
    symbols=["BTC/USDT:USDT", "ETH/USDT:USDT"],
    days=90,
    timeframe="4h",
)

if result["success"]:
    df = result["dataframe"]
    print(f"Got {len(df)} rows")

Real-Time WebSocket Streaming

Set up live data streaming with automatic reconnection:

import asyncio
from datetime import datetime, UTC

from tradai.common import ExchangeConfig, TradingMode, LoggerMixin
from tradai.data.infrastructure.repositories import CCXTProRepository
from tradai.data.infrastructure.adapters import ArcticAdapter
from tradai.data.core.streaming_service import WebSocketDataCollector
from tradai.data.core.streaming_entities import StreamConfig, StreamState
from tradai.data.core.entities import SymbolList, Timeframe


class LiveDataCollector(LoggerMixin):
    """Collects live candles over WebSocket and flushes them to ArcticDB."""

    def __init__(
        self,
        symbols: list[str],
        timeframe: str = "1m",
        storage_bucket: str = "tradai-data",
    ):
        self.symbols = SymbolList.from_input(symbols)
        self.timeframe = Timeframe.parse(timeframe)

        # Source: CCXT Pro WebSocket feed against Binance futures.
        exchange_config = ExchangeConfig(name="binance", trading_mode=TradingMode.FUTURES)
        self.ws_repo = CCXTProRepository(config=exchange_config)

        # Sink: ArcticDB library on S3.
        self.storage = ArcticAdapter(
            bucket=storage_bucket,
            library_name="live_candles",
        )

        # Buffer 100 candles before each flush; reconnect up to 10 times
        # with a 1-second delay between attempts.
        self.stream_config = StreamConfig.create(
            symbols=self.symbols,
            timeframe=self.timeframe,
            buffer_size=100,
            max_reconnect_attempts=10,
            reconnect_delay_seconds=1.0,
        )

        self._running = False
        self.collector = None

    async def start(self):
        """Build the collector, wire the callbacks, and begin streaming."""
        self.logger.info(
            f"Starting live collection for {len(self.symbols.symbols)} symbols"
        )

        collector = WebSocketDataCollector(
            repository=self.ws_repo,
            adapter=self.storage,
            config=self.stream_config,
        )
        # Route collector events into this service's handlers.
        collector.on_state_change = self._handle_state_change
        collector.on_candle = self._handle_candle
        collector.on_error = self._handle_error
        self.collector = collector

        self._running = True
        await collector.start()

    async def stop(self):
        """Stop collection gracefully."""
        self._running = False
        if self.collector:
            await self.collector.stop()
        self.logger.info("Live collection stopped")

    def _handle_state_change(self, old_state: StreamState, new_state: StreamState):
        """Log every transition, with extra detail for notable states."""
        self.logger.info(f"Stream state: {old_state.name} -> {new_state.name}")

        # Map the states that deserve an extra message to (log_fn, message).
        notices = {
            StreamState.RECONNECTING: (
                self.logger.warning,
                "Connection lost, attempting reconnect...",
            ),
            StreamState.CONNECTED: (
                self.logger.info,
                "Successfully connected to WebSocket",
            ),
            StreamState.ERROR: (
                self.logger.error,
                "Stream encountered an error",
            ),
        }
        notice = notices.get(new_state)
        if notice is not None:
            log_fn, message = notice
            log_fn(message)

    def _handle_candle(self, symbol: str, candle: dict):
        """Trace each incoming candle at debug level."""
        self.logger.debug(
            f"New candle: {symbol} @ {candle['timestamp']} "
            f"O={candle['open']:.2f} H={candle['high']:.2f} "
            f"L={candle['low']:.2f} C={candle['close']:.2f}"
        )

    def _handle_error(self, error: Exception):
        """Log stream-level errors."""
        self.logger.error(f"Stream error: {error}")


# Usage
async def main():
    """Stream BTC and ETH 1m candles for one hour, then shut down cleanly."""
    service = LiveDataCollector(
        symbols=["BTC/USDT:USDT", "ETH/USDT:USDT"],
        timeframe="1m",
        storage_bucket="my-tradai-bucket",
    )

    try:
        await service.start()
        await asyncio.sleep(3600)  # collect for one hour
    finally:
        # Always stop, even if start() or the sleep raises.
        await service.stop()


if __name__ == "__main__":
    asyncio.run(main())

See Also

Related SDKs:

Services:

Architecture:

Lambdas:

CLI: