backtest-consumer¶
Consumes SQS messages containing backtest requests and launches execution via ECS Fargate tasks or Step Functions, based on the configured launch mode.
Overview¶
| Property | Value |
|---|---|
| Trigger | SQS Event Source Mapping |
| Runtime | Python 3.11 |
| Timeout | 30 seconds |
| Memory | 256 MB |
| Settings class | SQSTriggerSettings |
Implementation¶
The handler does not use the @lambda_handler decorator. Instead, it directly instantiates SQSTriggerSettings and delegates batch processing to SQSJobProcessor from tradai.common.lambda_.backtest_processor.
from tradai.common.aws.dynamodb import DynamoDBAdapter
from tradai.common.lambda_.backtest_processor import SQSJobProcessor
from tradai.common.lambda_.settings import SQSTriggerSettings


def handler(event: dict, context) -> dict:
    # Load configuration (see Environment Variables).
    settings = SQSTriggerSettings()
    adapter = DynamoDBAdapter(table_name=settings.dynamodb_table_name)
    processor = SQSJobProcessor(settings=settings, dynamodb_adapter=adapter)
    result = processor.process_batch(event["Records"])
    # Report only the failed message IDs so SQS retries just those records.
    return {"batchItemFailures": [{"itemIdentifier": mid} for mid in result.failed_message_ids]}
Launch Modes¶
The SQSTriggerSettings.launch_mode field (LaunchMode enum) determines how jobs are executed:
| Mode | Value | Description |
|---|---|---|
| ECS | ecs | Launches ECS Fargate tasks directly |
| Step Functions | stepfunctions | Starts Step Functions state machine execution |
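As a rough illustration of how the two values in the table might map to an enum, here is a minimal sketch. The member names `ECS` and `STEP_FUNCTIONS` and the `describe_launch` helper are assumptions made for this example; only the string values `ecs` and `stepfunctions` come from the table, and the real `LaunchMode` definition may differ.

```python
from enum import Enum


class LaunchMode(str, Enum):
    """Hypothetical mirror of LaunchMode; only the string values come from the docs."""

    ECS = "ecs"
    STEP_FUNCTIONS = "stepfunctions"


def describe_launch(mode_value: str) -> str:
    """Map a raw LAUNCH_MODE string to the action described in the table."""
    mode = LaunchMode(mode_value)  # raises ValueError for unknown values
    if mode is LaunchMode.ECS:
        return "run ECS Fargate task"
    return "start Step Functions execution"
```

Parsing the raw string through the enum means an unsupported value fails immediately instead of silently falling through to a default branch.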
Input Schema¶
# SQS Message Body (JSON)
{
    "job_id": "uuid-string",  # Optional, auto-generated if not provided
    "config": {  # Nested format (preferred)
        "strategy": "MyStrategy",  # Required
        "timeframe": "1h",  # Default: "1h"
        "start_date": "2024-01-01",  # YYYY-MM-DD format
        "end_date": "2024-03-01",  # YYYY-MM-DD format
        "symbols": ["BTC/USDT:USDT"],  # Default: ["BTC/USDT:USDT"]
        "stoploss": -0.05,  # Optional
        "stake_amount": 1000.0  # Default: 1000.0
    },
    # OR legacy flat format:
    "strategy_name": "MyStrategy",
    "experiment_name": "backtest-exp",  # Optional, for MLflow tracking
    "priority": 0,  # Default: 0
    "submitted_at": "2024-01-01T00:00:00Z"
}
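The defaults and the nested-versus-flat fallback described in the schema can be sketched as follows. This is an illustration under assumptions, not the processor's actual parsing logic: `normalize_request` is a hypothetical helper, and because the schema only documents `strategy_name` for the legacy format, the sketch reads the remaining fields from `config` only. The `#` comments in the schema are annotations; the real message body must be plain JSON.

```python
import json
import uuid
from typing import Any


def normalize_request(body: str) -> dict[str, Any]:
    """Illustrative only: reduce either message format to one dict with defaults."""
    msg = json.loads(body)
    config = msg.get("config") or {}
    # Prefer the nested "config.strategy"; fall back to the legacy "strategy_name".
    strategy = config.get("strategy") or msg.get("strategy_name")
    if not strategy:
        raise ValueError("strategy is required")
    return {
        # job_id is optional; generate a UUID when the sender omits it.
        "job_id": msg.get("job_id") or str(uuid.uuid4()),
        "strategy": strategy,
        "timeframe": config.get("timeframe", "1h"),
        "symbols": config.get("symbols", ["BTC/USDT:USDT"]),
        "stake_amount": config.get("stake_amount", 1000.0),
        "priority": msg.get("priority", 0),
    }
```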
Output¶
# SQS Batch Response
{
    "batchItemFailures": [
        {"itemIdentifier": "message-id"}  # For partial failure handling
    ]
}
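To show how such a response could be assembled, the sketch below processes records one at a time and collects the IDs of those that fail. `process_records` and the `launch` callback are hypothetical names for this example; the real logic lives in `SQSJobProcessor.process_batch`. The `messageId` key is the standard field on SQS records delivered to Lambda.

```python
from typing import Any, Callable


def process_records(
    records: list[dict[str, Any]],
    launch: Callable[[dict[str, Any]], None],
) -> dict[str, list[dict[str, str]]]:
    """Run `launch` per record and report only the failures back to SQS."""
    failed_ids: list[str] = []
    for record in records:
        try:
            launch(record)
        except Exception:
            # Keep going: one bad message should not fail the whole batch.
            failed_ids.append(record["messageId"])
    return {"batchItemFailures": [{"itemIdentifier": mid} for mid in failed_ids]}
```

An empty `batchItemFailures` list tells SQS that every message in the batch succeeded. Lambda only honors this response shape when the event source mapping has `ReportBatchItemFailures` enabled.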
Environment Variables¶
| Variable | Required | Default | Description |
|---|---|---|---|
| LAUNCH_MODE | Yes | - | Launch mode: ecs or stepfunctions |
| DYNAMODB_TABLE_NAME | Yes | - | DynamoDB table for workflow state (also accepts WORKFLOW_STATE_TABLE) |
| ENVIRONMENT | No | dev | Environment name |
| ECS_CLUSTER | ECS mode | - | ECS cluster name or ARN |
| ECS_TASK_DEFINITION_PREFIX | No | strategy- | Task definition prefix |
| ECS_SUBNETS | ECS mode | - | Comma-separated subnet IDs |
| ECS_SECURITY_GROUPS | ECS mode | - | Comma-separated security group IDs |
| ECS_CONTAINER_NAME | No | strategy | Container name in task definition |
| ECS_ASSIGN_PUBLIC_IP | No | false | Whether to assign public IP |
| STEP_FUNCTIONS_ARN | SF mode | - | State machine ARN (required for stepfunctions mode) |
| MLFLOW_TRACKING_URI | No | - | MLflow server URL |
| S3_RESULTS_BUCKET | No | - | S3 bucket for results |
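The mode-dependent requirements in the table can be checked before deployment with a small script along these lines. This is a sketch under assumptions: `validate_env` and `split_csv` are hypothetical helpers, not part of `SQSTriggerSettings`, and the message for an unrecognized mode is invented for illustration.

```python
def split_csv(value: str) -> list[str]:
    """Split a comma-separated variable such as ECS_SUBNETS into clean items."""
    return [item.strip() for item in value.split(",") if item.strip()]


def validate_env(env: dict[str, str]) -> list[str]:
    """Return a list of problems; an empty list means the config looks complete."""
    errors: list[str] = []
    mode = env.get("LAUNCH_MODE", "")
    if not mode:
        errors.append("LAUNCH_MODE is required")
    elif mode not in ("ecs", "stepfunctions"):
        errors.append(f"LAUNCH_MODE must be ecs or stepfunctions, got {mode!r}")
    # The table name may be supplied under either variable.
    if not (env.get("DYNAMODB_TABLE_NAME") or env.get("WORKFLOW_STATE_TABLE")):
        errors.append("DYNAMODB_TABLE_NAME is required")
    if mode == "ecs":
        for name in ("ECS_CLUSTER", "ECS_SUBNETS", "ECS_SECURITY_GROUPS"):
            if not env.get(name):
                errors.append(f"{name} is required")
    elif mode == "stepfunctions":
        if not env.get("STEP_FUNCTIONS_ARN"):
            errors.append("STEP_FUNCTIONS_ARN is required")
    return errors
```

Passing `dict(os.environ)` checks the current process environment; passing a plain dict makes the same check usable in tests or CI.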
AWS Services Used¶
- SQS - Event source for backtest requests
- ECS - Launches Fargate tasks for backtesting (ECS mode)
- Step Functions - Starts state machine executions (Step Functions mode)
- DynamoDB - Idempotency checking and job state tracking
Key Features¶
- Supports both ecs and stepfunctions launch modes via LaunchMode enum
- Idempotent processing: checks DynamoDB for existing jobs before launching
- Supports partial batch failure handling for SQS
- Supports both nested and legacy flat message formats
- On unhandled exception, returns all records as failed for SQS retry
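Two of the behaviors listed above, the idempotency check and the fail-everything fallback, can be sketched in isolation. This is a simplified model and not the actual `SQSJobProcessor` code: a plain dict stands in for the DynamoDB table, and `launch_if_new`, `with_batch_fallback`, and the `"LAUNCHED"` status value are hypothetical.

```python
from typing import Any, Callable


def launch_if_new(
    job_id: str,
    job_table: dict[str, str],
    launch: Callable[[str], Any],
) -> bool:
    """Launch a job unless its ID is already recorded. Returns True if launched."""
    if job_id in job_table:  # stand-in for a DynamoDB lookup
        return False
    launch(job_id)
    job_table[job_id] = "LAUNCHED"  # hypothetical status value
    return True


def with_batch_fallback(
    event: dict[str, Any],
    process: Callable[[list[dict[str, Any]]], dict[str, Any]],
) -> dict[str, Any]:
    """Run `process`; on any unhandled error, mark every record as failed."""
    try:
        return process(event["Records"])
    except Exception:
        return {
            "batchItemFailures": [
                {"itemIdentifier": record["messageId"]} for record in event["Records"]
            ]
        }
```

The check-then-write order in `launch_if_new` is not atomic. Against a real DynamoDB table, a conditional write using `attribute_not_exists` on the key would close that gap; whether the service does so is not stated here.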
Example Usage¶
Submit Backtest via SQS¶
import boto3
import json

sqs = boto3.client('sqs')

message = {
    "config": {
        "strategy": "TrendFollowingStrategy",
        "timeframe": "1h",
        "start_date": "2024-01-01",
        "end_date": "2024-03-01",
        "symbols": ["BTC/USDT:USDT", "ETH/USDT:USDT"]
    }
}

sqs.send_message(
    QueueUrl="https://sqs.eu-central-1.amazonaws.com/123456789/backtest-queue",
    MessageBody=json.dumps(message)
)
Error Handling¶
| Error | Cause | Resolution |
|---|---|---|
| LAUNCH_MODE is required | Missing LAUNCH_MODE env var | Set LAUNCH_MODE to ecs or stepfunctions |
| ECS_CLUSTER is required | Missing ECS config in ECS mode | Set required ECS env vars |
| STEP_FUNCTIONS_ARN is required | Missing ARN in SF mode | Set STEP_FUNCTIONS_ARN env var |
| Unhandled exception | Lambda-level failure | All records returned as batch failures for retry |
See Also¶
Related Lambdas:
- sqs-consumer - Similar pattern for retraining tasks
- cleanup-resources - Handles failed task cleanup
- notify-completion - Sends completion notifications
Architecture:
- Step Functions Workflows - Backtest workflow orchestration
- Step Functions Workflows - Workflow diagram
Services:
- Backend Service - API for submitting backtests
- Strategy Service - Strategy configuration
CLI:
- CLI Reference - tradai backtest commands