TradAI Data Library - Design Documentation

Overview

tradai-data is a clean architecture implementation for market data collection and storage, following SOLID principles and Test-Driven Development (TDD).

Test Coverage: 93% (67 tests passing)

Architecture

3-Layer Clean Architecture

┌─────────────────────────────────────────────────────────┐
│                    Core Layer                            │
│  - Entities (Value Objects)                             │
│  - Repository Interfaces (ABCs)                         │
│  - Services (Business Logic)                            │
│  - NO external dependencies                             │
└─────────────────────────────────────────────────────────┘
                         ▲ depends on (Infrastructure → Core)
┌─────────────────────────────────────────────────────────┐
│              Infrastructure Layer                        │
│  - CCXTRepository (CCXT exchange data)                  │
│  - ArcticAdapter (S3 storage)                           │
│  - Concrete implementations                             │
└─────────────────────────────────────────────────────────┘

Core Layer

Entities (Value Objects)

Purpose: Eliminate code duplication through validated, immutable domain objects.

DateRange

  • Eliminates: 6 duplicate validation sites
  • Features:
      • Start/end validation
      • Duration calculations
      • contains() check
      • Handles datetime/string/pandas Timestamp
  • Test Coverage: 7 tests
from datetime import datetime

from tradai.data.core.entities import DateRange

# From strings
dr = DateRange.from_strings("2024-01-01", "2024-01-31")

# Properties
dr.duration_days  # 30
dr.contains(datetime(2024, 1, 15))  # True
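
To illustrate how a validated, immutable value object can replace scattered checks, here is a minimal sketch. `DateRangeSketch` is a hypothetical stand-in, not the library's `DateRange`; rejecting `start > end` with `ValueError` and accepting only ISO date strings are assumptions of this example.

```python
from dataclasses import dataclass
from datetime import datetime


@dataclass(frozen=True)
class DateRangeSketch:
    """Hypothetical stand-in for DateRange (illustration only)."""

    start: datetime
    end: datetime

    def __post_init__(self) -> None:
        # Validate once, at construction, so callers never re-check.
        if self.start > self.end:
            raise ValueError("start must not be after end")

    @classmethod
    def from_strings(cls, start: str, end: str) -> "DateRangeSketch":
        # Assumption: inputs are ISO 8601 strings such as "2024-01-01".
        return cls(datetime.fromisoformat(start), datetime.fromisoformat(end))

    @property
    def duration_days(self) -> int:
        return (self.end - self.start).days

    def contains(self, moment: datetime) -> bool:
        return self.start <= moment <= self.end


dr = DateRangeSketch.from_strings("2024-01-01", "2024-01-31")
print(dr.duration_days)                    # 30
print(dr.contains(datetime(2024, 1, 15)))  # True
```

Because the dataclass is frozen, validation runs once in `__post_init__` and the object cannot be mutated into an invalid state afterwards.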

SymbolList

  • Eliminates: 4 duplicate conversions
  • Features:
      • Automatic deduplication
      • Handles string/list inputs
      • Validation (non-empty)
      • Sorted output for APIs
  • Test Coverage: 7 tests
from tradai.data.core.entities import SymbolList

# From various inputs
sl = SymbolList.from_input("BTC/USDT:USDT")
sl = SymbolList.from_input(["BTC/USDT:USDT", "ETH/USDT:USDT"])

# Deduplication automatic
sl = SymbolList.from_input(["BTC/USDT:USDT", "BTC/USDT:USDT"])
len(sl.symbols)  # 1

# To list (sorted)
sl.to_list()  # ['BTC/USDT:USDT', 'ETH/USDT:USDT']
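
The deduplication and input handling described above could be implemented along these lines. `SymbolListSketch` is a hypothetical name used for illustration; the real `SymbolList` may differ in details.

```python
from dataclasses import dataclass
from typing import FrozenSet, Iterable, List, Union


@dataclass(frozen=True)
class SymbolListSketch:
    """Hypothetical stand-in for SymbolList (illustration only)."""

    symbols: FrozenSet[str]

    @classmethod
    def from_input(cls, value: Union[str, Iterable[str]]) -> "SymbolListSketch":
        # A bare string is a single symbol, not an iterable of characters.
        items = (value,) if isinstance(value, str) else tuple(value)
        unique = frozenset(items)  # the frozenset removes duplicates
        if not unique:
            raise ValueError("Symbols cannot be empty.")
        return cls(unique)

    def to_list(self) -> List[str]:
        # Sorting gives a deterministic order for API calls and cache keys.
        return sorted(self.symbols)


sl = SymbolListSketch.from_input(["ETH/USDT:USDT", "BTC/USDT:USDT", "BTC/USDT:USDT"])
print(sl.to_list())  # ['BTC/USDT:USDT', 'ETH/USDT:USDT']
```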

Timeframe

  • Eliminates: 3 fragile regex parsings
  • Features:
      • Parse string to seconds
      • Supports: s/m/h/d/w/mo
      • Complex formats ("1h 30m")
      • Unit conversions
  • Test Coverage: 7 tests
from tradai.data.core.entities import Timeframe

tf = Timeframe.parse("1h")
tf.seconds   # 3600
tf.minutes   # 60

tf = Timeframe.parse("1h 30m")
tf.seconds   # 5400
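
As a rough sketch of how compound timeframes such as "1h 30m" can be parsed without ad hoc regex handling at each call site, consider the following. `TimeframeSketch` is hypothetical, and treating "mo" as 30 days is an assumption made only for this example.

```python
import re
from dataclasses import dataclass

# Seconds per unit. "mo" = 30 days is an assumption of this sketch.
_UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600, "d": 86400, "w": 604800, "mo": 2592000}
# "mo" is listed first so it is not read as "m" followed by a stray "o".
_TOKEN = re.compile(r"(\d+)\s*(mo|[smhdw])")


@dataclass(frozen=True)
class TimeframeSketch:
    """Hypothetical stand-in for Timeframe (illustration only)."""

    seconds: int

    @classmethod
    def parse(cls, text: str) -> "TimeframeSketch":
        cleaned = text.strip().lower()
        total, pos = 0, 0
        for match in _TOKEN.finditer(cleaned):
            # Only whitespace may appear between tokens.
            if cleaned[pos:match.start()].strip():
                raise ValueError(f"Invalid timeframe: {text!r}")
            total += int(match.group(1)) * _UNIT_SECONDS[match.group(2)]
            pos = match.end()
        # Reject empty input, zero durations, and trailing garbage.
        if total == 0 or cleaned[pos:].strip():
            raise ValueError(f"Invalid timeframe: {text!r}")
        return cls(total)

    @property
    def minutes(self) -> float:
        return self.seconds / 60


print(TimeframeSketch.parse("1h").seconds)      # 3600
print(TimeframeSketch.parse("1h 30m").seconds)  # 5400
```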

OHLCVData

  • Features:
      • Validated DataFrame wrapper
      • Required columns enforced
      • Immutable (copies returned)
      • Symbol filtering
      • Date range extraction
  • Test Coverage: 6 tests
from tradai.data.core.entities import OHLCVData

data = OHLCVData.from_dataframe(df)

data.row_count       # Number of candles
data.symbols         # frozenset of symbols
data.date_range      # DateRange object

# Get data for specific symbol
btc_data = data.get_symbol_data("BTC/USDT:USDT")

Repositories (Interfaces)

Purpose: Abstract external dependencies following Dependency Inversion Principle.

DataRepository (ABC)

  • Interface for: Data sources (Binance, mock, etc.)
  • Method: fetch_ohlcv(symbols, date_range, timeframe) -> OHLCVData
  • Test Coverage: 4 tests
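from tradai.data.core.entities import OHLCVData
from tradai.data.core.repositories import DataRepository

class MyRepository(DataRepository):
    def fetch_ohlcv(self, symbols, date_range, timeframe):
        # Implementation: build a DataFrame `df` from your source, then wrap it
        return OHLCVData.from_dataframe(df)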

DataAdapter (ABC)

  • Interface for: Storage backends (ArcticDB, PostgreSQL, etc.)
  • Methods:
      • save(data, symbols, latest_query_date)
      • load(symbols, date_range) -> OHLCVData
      • exists(symbols) -> dict[str, bool]
      • get_latest_date(symbols) -> dict[str, datetime]
  • Test Coverage: 9 tests
from tradai.data.core.entities import OHLCVData
from tradai.data.core.repositories import DataAdapter

class MyAdapter(DataAdapter):
    def save(self, data, symbols, latest_query_date):
        # Store to backend
        pass

    def load(self, symbols, date_range):
        # Load from backend
        return OHLCVData.from_dataframe(df)

    # ... implement exists, get_latest_date
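
To make the four-method contract concrete, here is a self-contained sketch of an in-memory adapter that could serve as a test double. Everything in it is hypothetical: `DataAdapterSketch` only mirrors the method names listed above, candles are plain `(symbol, timestamp, close)` tuples rather than `OHLCVData`, a date range is a `(start, end)` tuple, and `latest_query_date` is assumed to be the value later returned by `get_latest_date`.

```python
from abc import ABC, abstractmethod
from datetime import datetime


class DataAdapterSketch(ABC):
    """Hypothetical mirror of the DataAdapter method list (illustration only)."""

    @abstractmethod
    def save(self, data, symbols, latest_query_date): ...

    @abstractmethod
    def load(self, symbols, date_range): ...

    @abstractmethod
    def exists(self, symbols): ...

    @abstractmethod
    def get_latest_date(self, symbols): ...


class InMemoryAdapter(DataAdapterSketch):
    """Keeps candles in dicts keyed by symbol; useful only for tests."""

    def __init__(self):
        self._rows = {}    # symbol -> list of (symbol, timestamp, close)
        self._latest = {}  # symbol -> latest_query_date passed to save()

    def save(self, data, symbols, latest_query_date):
        for symbol in symbols:
            rows = [row for row in data if row[0] == symbol]
            self._rows.setdefault(symbol, []).extend(rows)
            self._latest[symbol] = latest_query_date

    def load(self, symbols, date_range):
        start, end = date_range
        return [
            row
            for symbol in symbols
            for row in self._rows.get(symbol, [])
            if start <= row[1] <= end
        ]

    def exists(self, symbols):
        return {symbol: symbol in self._rows for symbol in symbols}

    def get_latest_date(self, symbols):
        return {s: self._latest[s] for s in symbols if s in self._latest}


adapter = InMemoryAdapter()
candles = [("BTC/USDT:USDT", datetime(2024, 1, 1), 42000.0)]
adapter.save(candles, ["BTC/USDT:USDT"], datetime(2024, 1, 1))
print(adapter.exists(["BTC/USDT:USDT", "ETH/USDT:USDT"]))
```

Any service written against the abstract class works unchanged with this adapter, which is the substitution property the interface is meant to guarantee.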

Services (Business Logic)

Purpose: Coordinate repositories and adapters with NO global state.

DataQueryService

  • Features:
      • Query with caching
      • Storage fallback
      • Dependency injection
      • No StaticScope!
  • Test Coverage: 10 tests

Query Flow:

  1. Check in-memory cache (if enabled)
  2. Try loading from storage adapter
  3. Fetch from repository (source)
  4. Save to storage adapter
  5. Cache in memory
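
The five steps of the query flow can be sketched roughly as follows. This is not the library's `DataQueryService`: `QueryServiceSketch` and `CountingRepository` are hypothetical, data is a plain list, a date range is a `(start, end)` tuple, an empty storage result is treated as a miss, and the range end is passed as `latest_query_date`. All of these are assumptions.

```python
class QueryServiceSketch:
    """Hypothetical cache -> storage -> source lookup (illustration only)."""

    def __init__(self, repository, adapter=None, enable_cache=False):
        self._repository = repository
        self._adapter = adapter
        self._cache = {} if enable_cache else None

    def query(self, symbols, date_range, timeframe):
        key = (tuple(sorted(symbols)), date_range, timeframe)

        # 1. Check the in-memory cache (if enabled).
        if self._cache is not None and key in self._cache:
            return self._cache[key]

        # 2. Try loading from the storage adapter.
        data = None
        if self._adapter is not None:
            data = self._adapter.load(symbols, date_range)

        # 3 and 4. On a storage miss, fetch from the source and persist it.
        if not data:
            data = self._repository.fetch_ohlcv(symbols, date_range, timeframe)
            if self._adapter is not None:
                self._adapter.save(data, symbols, date_range[1])

        # 5. Cache the result in memory.
        if self._cache is not None:
            self._cache[key] = data
        return data


class CountingRepository:
    """Fake source that records how often it is hit."""

    def __init__(self):
        self.calls = 0

    def fetch_ohlcv(self, symbols, date_range, timeframe):
        self.calls += 1
        return [(symbol, date_range[0], 100.0) for symbol in symbols]


repo = CountingRepository()
service = QueryServiceSketch(repo, enable_cache=True)
args = (["BTC/USDT:USDT"], ("2024-01-01", "2024-01-31"), "1h")
first = service.query(*args)
second = service.query(*args)
print(repo.calls)  # 1, the second call was served from the cache
```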

from tradai.common import ExchangeConfig, TradingMode
from tradai.data.core.entities import DateRange, SymbolList, Timeframe
from tradai.data.core.services import DataQueryService
from tradai.data.infrastructure.repositories import CCXTRepository

# Setup with dependency injection
config = ExchangeConfig(name="binance", trading_mode=TradingMode.FUTURES)
repository = CCXTRepository(config)
service = DataQueryService(
    repository=repository,
    adapter=None,  # Optional
    enable_cache=True
)

# Query with convenient API
data = service.query(
    symbols="BTC/USDT:USDT",
    start_date="2024-01-01",
    end_date="2024-01-31",
    timeframe="1h"
)

# Or with value objects
data = service.query(
    symbols=SymbolList.from_input(["BTC/USDT:USDT"]),
    date_range=DateRange.from_strings("2024-01-01", "2024-01-31"),
    timeframe=Timeframe.parse("1h")
)

DataCollectionService

  • Features:
      • Batch data collection
      • Incremental updates
      • Automatic storage
  • Test Coverage: 5 tests
from tradai.common import ExchangeConfig, TradingMode
from tradai.data.core.services import DataCollectionService
from tradai.data.infrastructure.repositories import CCXTRepository

config = ExchangeConfig(name="binance", trading_mode=TradingMode.FUTURES)
service = DataCollectionService(
    repository=CCXTRepository(config),
    adapter=my_adapter  # any DataAdapter implementation
)

# Collect and store
service.collect(
    symbols=["BTC/USDT:USDT", "ETH/USDT:USDT"],
    start_date="2024-01-01",
    end_date="2024-01-31",
    timeframe="1h"
)

# Incremental (only new data since last stored date)
service.collect_incremental(
    symbols="BTC/USDT:USDT",
    start_date="2024-01-01",
    end_date="2024-01-31",
    timeframe="1h"
)
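
One way to think about incremental collection is as a per-symbol planning step that moves the fetch start forward to the last stored date. The helper below is a hypothetical sketch, not the behaviour of `collect_incremental` itself; whether the real service resumes at the last stored date or one candle after it is not stated here, so resuming at the stored date is an assumption.

```python
from datetime import datetime


def plan_incremental(symbols, start, end, latest_dates):
    """Return {symbol: (fetch_start, end)} for symbols that still need data.

    latest_dates maps symbol -> last stored date, in the shape that a
    get_latest_date(symbols) call is described as returning.
    """
    plan = {}
    for symbol in symbols:
        latest = latest_dates.get(symbol)
        # Nothing stored yet: fetch the full requested range.
        fetch_start = start if latest is None else max(start, latest)
        # Skip symbols whose stored data already reaches the end date.
        if fetch_start < end:
            plan[symbol] = (fetch_start, end)
    return plan


latest = {
    "BTC/USDT:USDT": datetime(2024, 1, 20),
    "SOL/USDT:USDT": datetime(2024, 1, 31),
}
plan = plan_incremental(
    ["BTC/USDT:USDT", "ETH/USDT:USDT", "SOL/USDT:USDT"],
    start=datetime(2024, 1, 1),
    end=datetime(2024, 1, 31),
    latest_dates=latest,
)
# BTC resumes from Jan 20, ETH starts from Jan 1, SOL is already up to date.
for symbol, window in plan.items():
    print(symbol, window)
```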

Infrastructure Layer

CCXTRepository

Concrete implementation of DataRepository using CCXT. Supports any CCXT-compatible exchange.

  • Features:
      • Fetches from any CCXT-supported exchange (Binance, Hyperliquid, Kraken, etc.)
      • Automatic pagination (exchange-specific limits)
      • Error handling for API failures with partial success support
      • Multi-symbol support
      • Converts CCXT → OHLCVData
  • Test Coverage: 12 tests, 100% coverage
from tradai.common import ExchangeConfig, TradingMode
from tradai.data.infrastructure.repositories import CCXTRepository
from tradai.data.core.entities import SymbolList, DateRange, Timeframe

# Binance Futures
config = ExchangeConfig(name="binance", trading_mode=TradingMode.FUTURES)
repository = CCXTRepository(config)

# Fetch OHLCV data
data = repository.fetch_ohlcv(
    symbols=SymbolList.from_input(["BTC/USDT:USDT"]),
    date_range=DateRange.from_strings("2024-01-01", "2024-01-31"),
    timeframe=Timeframe.parse("1h")
)

# Hyperliquid DEX with credentials from Secrets Manager
config = ExchangeConfig.from_secret("tradai/prod/hyperliquid", name="hyperliquid")
repo = CCXTRepository(config)
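
Automatic pagination generally means requesting pages of at most `limit` candles and advancing a cursor until the range is covered. The sketch below shows that loop against a fake page fetcher; it is not `CCXTRepository`'s code. `fetch_all_candles` and `fake_fetch_page` are hypothetical names. In practice `fetch_page` would wrap CCXT's `exchange.fetch_ohlcv(symbol, timeframe, since, limit)`, which returns rows of `[timestamp_ms, open, high, low, close, volume]`.

```python
HOUR_MS = 3_600_000


def fetch_all_candles(fetch_page, since_ms, until_ms, step_ms, limit=1000):
    """Collect candles in [since_ms, until_ms) by paging through fetch_page."""
    candles = []
    cursor = since_ms
    while cursor < until_ms:
        page = fetch_page(cursor, limit)
        if not page:
            break  # the exchange has no more data
        # Drop rows past the requested end of the range.
        candles.extend(row for row in page if row[0] < until_ms)
        # Move past the last candle; max() guarantees forward progress.
        cursor = max(cursor + step_ms, page[-1][0] + step_ms)
    return candles


def fake_fetch_page(since, limit):
    # Pretend the exchange holds 2500 hourly candles starting at timestamp 0.
    first = -(-since // HOUR_MS)  # round up to the next candle index
    last = min(first + limit, 2500)
    return [[i * HOUR_MS, 1.0, 1.0, 1.0, 1.0, 0.0] for i in range(first, last)]


rows = fetch_all_candles(fake_fetch_page, 0, 2200 * HOUR_MS, HOUR_MS, limit=1000)
print(len(rows))  # 2200, gathered over three pages
```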

ArcticAdapter (Future)

Note: ArcticDB does not ship macOS ARM wheels, so this adapter is deferred until the Linux deployment.

Will implement:

  • Save to ArcticDB on S3
  • Load from ArcticDB
  • Check existence
  • Get latest dates

SOLID Principles Applied

Single Responsibility Principle (SRP)

  • Entities: Only data validation and conversions
  • Repositories: Only data fetching/storage
  • Services: Only business logic coordination

Open/Closed Principle (OCP)

  • ABCs: New repositories/adapters can be added without modifying core
  • Value Objects: Extensible through composition

Liskov Substitution Principle (LSP)

  • Any DataRepository implementation is substitutable
  • Any DataAdapter implementation is substitutable
  • All value objects are immutable and consistent

Interface Segregation Principle (ISP)

  • DataRepository: Single focused method (fetch_ohlcv)
  • DataAdapter: Focused methods for storage operations
  • No fat interfaces

Dependency Inversion Principle (DIP)

  • Services depend on: DataRepository, DataAdapter (abstractions)
  • Infrastructure depends on: Same abstractions
  • No dependencies on: Concrete implementations

DRY (Don't Repeat Yourself)

Before Migration

# Date validation (repeated 6 times)
start_date = to_datetime(start_date)
end_date = to_datetime(end_date)
verify_date_range(start_date, end_date)

# Symbol preparation (repeated 4 times)
unique_syms = frozenset((symbols,)) if isinstance(symbols, str) else frozenset(symbols)
if not unique_syms:
    raise ValueError("Symbols cannot be empty.")

# Timeframe parsing (repeated 3 times)
tf_seconds = to_seconds(timeframe)
pattern = r"(\d+)\s*([a-z]+)"
# ... fragile regex

After Migration

# Date validation (1 place)
date_range = DateRange.from_strings("2024-01-01", "2024-01-31")

# Symbol preparation (1 place)
symbols = SymbolList.from_input(["BTC/USDT:USDT"])

# Timeframe parsing (1 place)
timeframe = Timeframe.parse("1h")

Eliminated Patterns

❌ Global State (StaticScope)

# OLD (bad)
scope = StaticScope.instance()  # Global singleton
cache = scope.data_source_cache

# NEW (good)
service = DataQueryService(
    repository=repository,
    adapter=adapter,
    enable_cache=True
)

❌ Dead Code

  • PlaceHolderSource: Incomplete stub removed
  • SourceFactory: Overly complex Enum pattern removed
  • 8 unused imports: Removed
  • 6 commented code blocks: Removed

❌ Tight Coupling

# OLD (bad)
class Binance(DataSource):  # Tightly coupled to DataSource
    def query(self, *args, **kwargs):
        scope = StaticScope.instance()  # Global state
        ...  # implementation

# NEW (good)
class CCXTRepository(DataRepository):  # Depends on abstraction
    def fetch_ohlcv(self, symbols, date_range, timeframe):
        # No global state, configurable via ExchangeConfig
        ...  # implementation

Testing Strategy

Test-Driven Development (TDD)

  1. RED: Write failing tests
  2. GREEN: Implement minimal code to pass
  3. REFACTOR: Improve code quality

Test Coverage: 93%

Core Entities:      27 tests, 93% coverage
Core Repositories:  13 tests, 75% coverage (ABCs)
Core Services:      15 tests, 94% coverage
Infrastructure:     12 tests, 100% coverage
────────────────────────────────────────────
TOTAL:              67 tests, 93% coverage

Test Categories

Unit Tests

  • Mock all external dependencies
  • Test business logic in isolation
  • Fast execution (~6 seconds)
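
As an illustration of mocking external dependencies, the following sketch tests a collection step with `unittest.mock.Mock` standing in for both the repository and the adapter. The `collect` function is a hypothetical stand-in for a service method, not the library's code; passing the range end as `latest_query_date` is an assumption.

```python
from unittest.mock import Mock


def collect(repository, adapter, symbols, date_range, timeframe):
    """Hypothetical collection step: fetch from the source, then store."""
    data = repository.fetch_ohlcv(symbols, date_range, timeframe)
    adapter.save(data, symbols, date_range[1])
    return data


def test_collect_saves_what_it_fetches():
    # No network or storage is touched: both collaborators are mocks.
    repository = Mock()
    adapter = Mock()
    repository.fetch_ohlcv.return_value = ["candle"]
    symbols = ["BTC/USDT:USDT"]
    date_range = ("2024-01-01", "2024-01-31")

    result = collect(repository, adapter, symbols, date_range, "1h")

    repository.fetch_ohlcv.assert_called_once_with(symbols, date_range, "1h")
    adapter.save.assert_called_once_with(["candle"], symbols, "2024-01-31")
    assert result == ["candle"]


test_collect_saves_what_it_fetches()
```

Because the collaborators are injected rather than looked up globally, the test needs no patching of module-level state.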

Integration Tests

  • Test with real CCXT (optional)
  • Marked with @pytest.mark.integration
  • Skipped by default
# Run unit tests
uv run pytest -m "not integration"

# Run integration tests
uv run pytest -m integration

# Run all tests
uv run pytest

Usage Examples

Simple Query

from tradai.common import ExchangeConfig, TradingMode
from tradai.data.core.services import DataQueryService
from tradai.data.infrastructure.repositories import CCXTRepository

config = ExchangeConfig(name="binance", trading_mode=TradingMode.FUTURES)
repository = CCXTRepository(config)
service = DataQueryService(repository=repository)

data = service.query(
    symbols="BTC/USDT:USDT",
    start_date="2024-01-01",
    end_date="2024-01-31",
    timeframe="1h"
)

print(f"Fetched {data.row_count} candles")

With Caching

from tradai.common import ExchangeConfig, TradingMode
from tradai.data.core.services import DataQueryService
from tradai.data.infrastructure.repositories import CCXTRepository

config = ExchangeConfig(name="binance", trading_mode=TradingMode.FUTURES)
service = DataQueryService(
    repository=CCXTRepository(config),
    enable_cache=True
)

# First call - fetches from exchange
data1 = service.query("BTC/USDT:USDT", "2024-01-01", "2024-01-31", "1h")

# Second call - uses cache
data2 = service.query("BTC/USDT:USDT", "2024-01-01", "2024-01-31", "1h")

With Storage Adapter

from tradai.common import ExchangeConfig, TradingMode
from tradai.data.core.services import DataQueryService
# ArcticAdapter is still planned (see "ArcticAdapter (Future)"); any DataAdapter works here
from tradai.data.infrastructure.adapters.arctic_adapter import ArcticAdapter
from tradai.data.infrastructure.repositories import CCXTRepository

config = ExchangeConfig(name="binance", trading_mode=TradingMode.FUTURES)
adapter = ArcticAdapter(bucket_name="my-bucket", library="futures")
service = DataQueryService(
    repository=CCXTRepository(config),
    adapter=adapter,
    enable_cache=True
)

# Query flow: Cache → Storage → Source → Store & Cache
data = service.query("BTC/USDT:USDT", "2024-01-01", "2024-01-31", "1h")

Batch Collection

from tradai.common import ExchangeConfig, TradingMode
from tradai.data.core.services import DataCollectionService
from tradai.data.infrastructure.repositories import CCXTRepository

config = ExchangeConfig(name="binance", trading_mode=TradingMode.FUTURES)
collector = DataCollectionService(
    repository=CCXTRepository(config),
    adapter=adapter  # a DataAdapter instance, e.g. the one from "With Storage Adapter"
)

# Collect multiple symbols
collector.collect(
    symbols=["BTC/USDT:USDT", "ETH/USDT:USDT", "SOL/USDT:USDT"],
    start_date="2024-01-01",
    end_date="2024-01-31",
    timeframe="1h"
)

Incremental Updates

# Only fetches data since last stored date
collector.collect_incremental(
    symbols="BTC/USDT:USDT",
    start_date="2024-01-01",
    end_date="2024-01-31",
    timeframe="1h"
)

Migration from Old Codebase

Removed Files (485 lines)

  • source.py (330 lines) - Replaced by entities + repositories + CCXTRepository
  • adapter.py (155 lines) - Replaced by DataAdapter ABC + concrete implementations

Code Reduction

  • Old: 485 lines with global state, duplication, dead code
  • New: 257 lines with clean architecture, SOLID, DRY
  • Reduction: 47% less code (228 fewer lines), with global state, duplication, and dead code removed

Breaking Changes

  1. No StaticScope: Use dependency injection
  2. Different API: Services instead of direct class usage
  3. Value Objects: Use entities for type safety

Migration Guide

# OLD
from libs.data.src.source import Binance
binance = Binance()
df = binance.query(
    symbols="BTC/USDT:USDT",
    start_date="2024-01-01",
    end_date="2024-01-31",
    timeframe="1h"
)

# NEW
from tradai.common import ExchangeConfig, TradingMode
from tradai.data.core.services import DataQueryService
from tradai.data.infrastructure.repositories import CCXTRepository

config = ExchangeConfig(name="binance", trading_mode=TradingMode.FUTURES)
service = DataQueryService(repository=CCXTRepository(config))
data = service.query(
    symbols="BTC/USDT:USDT",
    start_date="2024-01-01",
    end_date="2024-01-31",
    timeframe="1h"
)
df = data.to_dataframe()

Performance

Optimizations

  • Caching: In-memory cache for repeated queries
  • Pagination: Automatic for large date ranges
  • Validation: Early validation with value objects
  • Immutability: Thread-safe by default

Benchmarks (TODO)

The figures below are estimates; measurements are still to be done.

  • Query with cache hit: <1ms
  • Query with cache miss: ~500ms (network bound)
  • Value object creation: <0.1ms

Future Enhancements

Planned Features

  1. ArcticAdapter: For production S3 storage (Linux only)
  2. PostgresAdapter: SQL-based storage option
  3. Rate Limiting: Respect exchange API limits
  4. Retry Logic: Automatic retry with exponential backoff
  5. Multiple Exchanges: Broader coverage beyond Binance (CCXTRepository already accepts any CCXT-supported exchange)
  6. WebSocket Support: Real-time data streaming

Extensibility

  • Add new repositories: Implement DataRepository
  • Add new adapters: Implement DataAdapter
  • Add new services: Use existing repositories/adapters
  • All via dependency injection, no core changes needed

Conclusion

The tradai-data library demonstrates:

  • ✅ Clean Architecture
  • ✅ SOLID Principles
  • ✅ DRY (Don't Repeat Yourself)
  • ✅ Test-Driven Development
  • ✅ 93% Test Coverage
  • ✅ Zero Dead Code
  • ✅ Type Safety (100% type hints)
  • ✅ Thread Safety (immutable entities)

Result: Production-ready, maintainable, extensible data layer for the TradAI platform.