# Performance Tuning

Guide for optimizing TradAI performance.
## Backtest Optimization
### Data Loading

**Problem:** Slow data loading for large backtests.

**Solutions:**

- Pre-fetch data to ArcticDB.
- Use appropriate timeframes:
    - Development: use 1h or 4h for faster iteration.
    - Production: use 5m or 15m for final validation.
- Limit symbol count during development.
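These settings can be bundled into a small helper. A minimal sketch, assuming backtest parameters are passed as plain values; the `dev` flag, symbol cap, and `BacktestDataConfig` name are illustrative, not part of TradAI's API:

```python
from dataclasses import dataclass

@dataclass
class BacktestDataConfig:
    """Illustrative data-loading settings for dev vs. production runs."""
    timeframe: str
    symbols: list[str]

def data_config(all_symbols: list[str], dev: bool) -> BacktestDataConfig:
    if dev:
        # Coarse timeframe and a capped symbol list for fast iteration
        return BacktestDataConfig(timeframe="1h", symbols=all_symbols[:5])
    # Fine timeframe and the full universe for final validation
    return BacktestDataConfig(timeframe="5m", symbols=all_symbols)
```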
### Memory Management

**Problem:** Out of memory during backtest.

**Solutions:**

- Reduce the data window.
- Use chunked processing.
- Enable swap space.
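The chunked-processing approach can be sketched as follows; `run_chunk` and the 30-day window size are illustrative assumptions, not TradAI's actual API:

```python
from datetime import date, timedelta
from typing import Callable, Iterator

def date_chunks(start: date, end: date, days: int = 30) -> Iterator[tuple[date, date]]:
    """Yield consecutive (start, end) windows covering [start, end)."""
    cursor = start
    step = timedelta(days=days)
    while cursor < end:
        yield cursor, min(cursor + step, end)
        cursor += step

def backtest_in_chunks(start: date, end: date,
                       run_chunk: Callable[[date, date], dict]) -> list[dict]:
    """Run a backtest one window at a time so only one chunk's data is in memory."""
    return [run_chunk(a, b) for a, b in date_chunks(start, end)]
```

Each chunk's data can be loaded, processed, and released before the next begins, keeping peak memory bounded by the window size rather than the full backtest range.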
### Parallel Backtests

Run multiple backtests concurrently:

```bash
# Using GNU parallel
parallel -j 4 tradai backtest --strategy {} ::: Strategy1 Strategy2 Strategy3 Strategy4
```
## Data Sync Optimization

### Batch Size

Adjust batch size based on exchange rate limits:

```bash
# data-collection settings
DATA_COLLECTION_STREAMING_BUFFER_SIZE=100    # Candles per batch
DATA_COLLECTION_STREAMING_FLUSH_INTERVAL=60  # Seconds between flushes
```
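A sketch of how such settings might be read at startup, assuming they arrive as plain environment variables; the `streaming_settings` helper is illustrative, and the defaults mirror the values above:

```python
import os

def streaming_settings() -> dict:
    """Read data-collection batching settings from the environment."""
    return {
        "buffer_size": int(os.environ.get("DATA_COLLECTION_STREAMING_BUFFER_SIZE", "100")),
        "flush_interval": int(os.environ.get("DATA_COLLECTION_STREAMING_FLUSH_INTERVAL", "60")),
    }
```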
### Concurrent Fetching

Use AsyncCCXTRepository for concurrent symbol fetching:

```python
import asyncio

from tradai.data.infrastructure.repositories import AsyncCCXTRepository

async def fetch_all_symbols():
    repo = AsyncCCXTRepository(config=exchange_config)
    # Fetch all symbols concurrently
    tasks = [
        repo.fetch_ohlcv(SymbolList.from_input([symbol]), date_range, timeframe)
        for symbol in symbols
    ]
    results = await asyncio.gather(*tasks)
    return results
```
## ArcticDB Optimization

- Use versioning wisely.
- Configure deduplication.
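A sketch covering both points, assuming the `arcticdb` package is available; the library name, URI, and `write_candles` helper are illustrative, not TradAI's actual storage layer:

```python
# Hypothetical helper: dedup skips rewriting unchanged data segments, and
# prune_previous_versions keeps the version store from growing unbounded.
try:
    import arcticdb as adb
except ImportError:  # arcticdb may not be installed in every environment
    adb = None

def write_candles(df, symbol: str, uri: str = "lmdb://./arctic"):
    if adb is None:
        return None
    ac = adb.Arctic(uri)
    lib = ac.get_library(
        "ohlcv",
        create_if_missing=True,
        library_options=adb.LibraryOptions(dedup=True),  # enable deduplication
    )
    # Overwrite the symbol but drop older versions that are no longer needed
    return lib.write(symbol, df, prune_previous_versions=True)
```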
## API Performance

### Caching

Enable response caching for read-heavy endpoints:

```python
from fastapi_cache.decorator import cache

@router.get("/strategies")
@cache(expire=60)  # Cache for 60 seconds
async def list_strategies():
    ...
```
### Connection Pooling

Configure database connection pooling:
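A minimal sketch using SQLAlchemy's async engine; the connection URL and pool sizes are illustrative assumptions, not TradAI's actual configuration:

```python
# Hypothetical pooling setup; tune pool_size/max_overflow to your workload.
try:
    from sqlalchemy.ext.asyncio import create_async_engine
except ImportError:  # SQLAlchemy may not be installed in every environment
    create_async_engine = None

def make_engine(url: str = "postgresql+asyncpg://user:pass@localhost/tradai"):
    if create_async_engine is None:
        return None
    return create_async_engine(
        url,
        pool_size=10,        # persistent connections kept open
        max_overflow=20,     # extra connections allowed under burst load
        pool_pre_ping=True,  # validate connections before handing them out
        pool_recycle=1800,   # recycle connections after 30 minutes
    )
```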
### Async Operations

Use async for I/O-bound operations:

```python
# Good: async database query
async def get_strategy(strategy_id: str):
    async with session.begin():
        return await session.get(Strategy, strategy_id)

# Bad: sync query blocks the event loop in an async context
def get_strategy_sync(strategy_id: str):
    return session.query(Strategy).get(strategy_id)
```
## ML Training Optimization

### FreqAI Settings

```python
# Reduce model complexity for faster training
freqai_config = {
    "feature_parameters": {
        "include_timeframes": ["1h"],           # Fewer timeframes
        "indicator_periods_candles": [10, 20],  # Fewer periods
    },
    "data_split_parameters": {
        "test_size": 0.2,  # Smaller test set leaves more data for training
    },
    "model_training_parameters": {
        "n_estimators": 100,  # Reduced from 1000
        "max_depth": 5,       # Limit tree depth
    },
}
```
### GPU Acceleration

For supported models:

```bash
# Enable CUDA
export CUDA_VISIBLE_DEVICES=0

# Install GPU dependencies (GPU support is included in the standard
# tensorflow package since 2.x; torch needs the CUDA wheel index)
pip install tensorflow
pip install torch --index-url https://download.pytorch.org/whl/cu118
```
### Model Caching

Cache trained models to avoid retraining:

```python
# Load the latest production model from the MLflow registry
import mlflow
from mlflow.tracking import MlflowClient

client = MlflowClient()
model = mlflow.pyfunc.load_model(f"models:/{model_name}/Production")
```
## Infrastructure Optimization

### ECS Task Sizing

| Task Type | CPU (units) | Memory (MiB) | Use Case |
|---|---|---|---|
| Small | 256 | 512 | API services |
| Medium | 512 | 1024 | Data sync |
| Large | 1024 | 2048 | Backtests |
| XLarge | 2048 | 4096 | ML training |
```python
# Pulumi task definition
import pulumi_aws as aws

task_def = aws.ecs.TaskDefinition(
    "strategy-task",
    cpu="1024",
    memory="2048",
    ...
)
```
### Auto-Scaling

Configure ECS auto-scaling:

```python
import pulumi_aws as aws

# Scale on CPU utilization (cluster and service names are illustrative)
scaling_target = aws.appautoscaling.Target(
    "strategy-scaling-target",
    max_capacity=10,
    min_capacity=1,
    resource_id="service/my-cluster/strategy-service",
    scalable_dimension="ecs:service:DesiredCount",
    service_namespace="ecs",
)

scaling_policy = aws.appautoscaling.Policy(
    "strategy-scaling-policy",
    policy_type="TargetTrackingScaling",
    resource_id=scaling_target.resource_id,
    scalable_dimension=scaling_target.scalable_dimension,
    service_namespace=scaling_target.service_namespace,
    target_tracking_scaling_policy_configuration={
        "target_value": 70.0,  # Target 70% average CPU
        "predefined_metric_specification": {
            "predefined_metric_type": "ECSServiceAverageCPUUtilization",
        },
    },
)
```
### Fargate Spot

Use Fargate Spot for cost savings on non-critical workloads:

```python
# Use Spot capacity for backtest services (passed to aws.ecs.Service)
capacity_provider_strategies=[
    {
        "capacity_provider": "FARGATE_SPOT",
        "weight": 1,
        "base": 0,
    },
]
```
## Profiling Tools

### Python Profiling

```python
import cProfile
import pstats

# Profile a function
profiler = cProfile.Profile()
profiler.enable()
result = slow_function()
profiler.disable()

# Print the 20 most expensive calls by cumulative time
stats = pstats.Stats(profiler)
stats.sort_stats("cumulative")
stats.print_stats(20)
```
### Memory Profiling

```bash
# Install memory profiler
pip install memory-profiler

# Run with profiling (reports functions decorated with @profile)
python -m memory_profiler script.py
```
### Line Profiling

```bash
# Install line profiler
pip install line_profiler

# Profile functions decorated with @profile, line by line
kernprof -l -v script.py
```
## Monitoring Performance

### CloudWatch Metrics

Key metrics to monitor:

| Metric | Alert Threshold | Description |
|---|---|---|
| CPU Utilization | > 80% | Task overloaded |
| Memory Utilization | > 80% | Memory pressure |
| Request Latency p99 | > 1s | Slow responses |
| Error Rate | > 1% | Too many failures |
### Custom Metrics

```python
import boto3

cloudwatch = boto3.client("cloudwatch")

def record_metric(name: str, value: float, unit: str = "Count"):
    cloudwatch.put_metric_data(
        Namespace="TradAI",
        MetricData=[{
            "MetricName": name,
            "Value": value,
            "Unit": unit,
        }],
    )

# Record backtest duration
record_metric("BacktestDuration", duration_seconds, "Seconds")
```
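Building on this, a small decorator can time any operation and report its duration automatically. A sketch; `timed_metric` and the metric name are illustrative, and the recorder is passed in so the decorator stays independent of boto3:

```python
import time
from functools import wraps
from typing import Callable

def timed_metric(name: str, record: Callable[[str, float, str], None]):
    """Decorate a function so its wall-clock duration is sent to `record`."""
    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            start = time.perf_counter()
            try:
                return fn(*args, **kwargs)
            finally:
                # Report even if the wrapped call raises
                record(name, time.perf_counter() - start, "Seconds")
        return wrapper
    return decorator

# Usage: @timed_metric("BacktestDuration", record_metric) on the backtest entry point
```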
## Benchmarks

### Expected Performance

| Operation | Target Time | Notes |
|---|---|---|
| Backtest (30 days, 1h) | < 30 s | Single symbol |
| Backtest (365 days, 1h) | < 5 min | Single symbol |
| Data sync (1 symbol, 1h) | < 10 s | Per 1000 candles |
| API response (cached) | < 50 ms | With caching |
| API response (uncached) | < 500 ms | Without caching |
### Benchmark Script

```bash
# Run performance benchmark
tradai benchmark --strategy MyStrategy --days 30

# Example output:
# Data loading: 2.3s
# Feature calculation: 5.1s
# Model inference: 1.2s
# Total backtest time: 8.6s
```