backtest-consumer

Consumes SQS messages containing backtest requests and launches execution via ECS Fargate tasks or Step Functions, based on the configured launch mode.

Overview

| Property | Value |
| --- | --- |
| Trigger | SQS Event Source Mapping |
| Runtime | Python 3.11 |
| Timeout | 30 seconds |
| Memory | 256 MB |
| Settings class | SQSTriggerSettings |

Implementation

The handler does not use the @lambda_handler decorator. Instead, it directly instantiates SQSTriggerSettings and delegates batch processing to SQSJobProcessor from tradai.common.lambda_.backtest_processor.

from tradai.common.aws.dynamodb import DynamoDBAdapter
from tradai.common.lambda_.backtest_processor import SQSJobProcessor
from tradai.common.lambda_.settings import SQSTriggerSettings

def handler(event: dict, context) -> dict:
    # Settings are instantiated inside the handler, so they are rebuilt on every invocation.
    settings = SQSTriggerSettings()
    adapter = DynamoDBAdapter(table_name=settings.dynamodb_table_name)
    processor = SQSJobProcessor(settings=settings, dynamodb_adapter=adapter)
    result = processor.process_batch(event["Records"])
    # Report only the failed message IDs so SQS redelivers just those messages.
    return {"batchItemFailures": [{"itemIdentifier": mid} for mid in result.failed_message_ids]}

Launch Modes

The SQSTriggerSettings.launch_mode field (LaunchMode enum) determines how jobs are executed:

| Mode | Value | Description |
| --- | --- | --- |
| ECS | ecs | Launches ECS Fargate tasks directly |
| Step Functions | stepfunctions | Starts Step Functions state machine execution |
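
As a rough sketch of how a two-valued mode switch like this can be modelled, the enum below uses the documented values ecs and stepfunctions. The class body and the describe_launch helper are illustrative assumptions, not the actual tradai definitions.

```python
from enum import Enum


class LaunchMode(str, Enum):
    """Hypothetical stand-in for LaunchMode; only the two values come from the docs."""

    ECS = "ecs"
    STEP_FUNCTIONS = "stepfunctions"


def describe_launch(mode_value: str) -> str:
    """Map a raw LAUNCH_MODE string to the action taken (illustrative helper)."""
    # Enum lookup by value raises ValueError for anything other than the two modes.
    mode = LaunchMode(mode_value.strip().lower())
    if mode is LaunchMode.ECS:
        return "run ECS Fargate task"
    return "start Step Functions execution"
```

Looking the mode up by value means a misspelled LAUNCH_MODE fails fast with a ValueError instead of silently falling through to a default.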

Input Schema

# SQS Message Body (JSON)
{
    "job_id": "uuid-string",           # Optional, auto-generated if not provided
    "config": {                        # Nested format (preferred)
        "strategy": "MyStrategy",      # Required
        "timeframe": "1h",             # Default: "1h"
        "start_date": "2024-01-01",    # YYYY-MM-DD format
        "end_date": "2024-03-01",      # YYYY-MM-DD format
        "symbols": ["BTC/USDT:USDT"],  # Default: ["BTC/USDT:USDT"]
        "stoploss": -0.05,             # Optional
        "stake_amount": 1000.0         # Default: 1000.0
    },
    # OR legacy flat format:
    "strategy_name": "MyStrategy",
    "experiment_name": "backtest-exp", # Optional, for MLflow tracking
    "priority": 0,                     # Default: 0
    "submitted_at": "2024-01-01T00:00:00Z"
}
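
One way to accept both formats is to normalize every message into the nested shape before processing. The sketch below is an assumption about how that could look: normalize_message is a hypothetical helper, and it assumes that in the legacy flat format the config keys sit at the top level next to strategy_name. The defaults are the ones listed in the schema above.

```python
import copy
import json
import uuid
from typing import Any

# Defaults taken from the schema above.
CONFIG_DEFAULTS: dict[str, Any] = {
    "timeframe": "1h",
    "symbols": ["BTC/USDT:USDT"],
    "stake_amount": 1000.0,
}

# Assumed set of config keys that may appear at the top level in the legacy format.
FLAT_CONFIG_KEYS = ("timeframe", "start_date", "end_date", "symbols", "stoploss", "stake_amount")


def normalize_message(body: str) -> dict[str, Any]:
    """Parse an SQS message body into one canonical job dict (hypothetical helper)."""
    raw = json.loads(body)
    if "config" in raw:
        config = dict(raw["config"])  # nested (preferred) format
    else:
        # Legacy flat format: strategy arrives as "strategy_name".
        config = {key: raw[key] for key in FLAT_CONFIG_KEYS if key in raw}
        if "strategy_name" in raw:
            config["strategy"] = raw["strategy_name"]
    if not config.get("strategy"):
        raise ValueError("strategy is required")
    # Explicit values win over defaults; deepcopy keeps the default list unshared.
    config = {**copy.deepcopy(CONFIG_DEFAULTS), **config}
    return {
        "job_id": raw.get("job_id") or str(uuid.uuid4()),
        "config": config,
        "priority": raw.get("priority", 0),
        "experiment_name": raw.get("experiment_name"),
    }
```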

Output

# SQS Batch Response
{
    "batchItemFailures": [
        {"itemIdentifier": "message-id"}  # For partial failure handling
    ]
}
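
This shape follows Lambda's partial batch response contract for SQS, which only takes effect when the event source mapping lists ReportBatchItemFailures in its FunctionResponseTypes. An empty batchItemFailures list signals that the whole batch succeeded. The sketch below shows how the response could be built, including the fallback that marks every record as failed; both function names are hypothetical and not part of tradai.

```python
from typing import Callable, Iterable


def build_batch_response(failed_message_ids: Iterable[str]) -> dict:
    """Build the partial batch response that Lambda expects for SQS."""
    return {"batchItemFailures": [{"itemIdentifier": mid} for mid in failed_message_ids]}


def handle_with_fallback(event: dict, process: Callable[[list[dict]], list[str]]) -> dict:
    """Run `process` over the batch; on an unhandled error, fail every record."""
    records = event.get("Records", [])
    try:
        failed = process(records)
    except Exception:
        # Nothing can be trusted after an unexpected error, so ask SQS to
        # redeliver the whole batch.
        failed = [record["messageId"] for record in records]
    return build_batch_response(failed)
```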

Environment Variables

| Variable | Required | Default | Description |
| --- | --- | --- | --- |
| LAUNCH_MODE | Yes | - | Launch mode: ecs or stepfunctions |
| DYNAMODB_TABLE_NAME | Yes | - | DynamoDB table for workflow state (also accepts WORKFLOW_STATE_TABLE) |
| ENVIRONMENT | No | dev | Environment name |
| ECS_CLUSTER | ECS mode | - | ECS cluster name or ARN |
| ECS_TASK_DEFINITION_PREFIX | No | strategy- | Task definition prefix |
| ECS_SUBNETS | ECS mode | - | Comma-separated subnet IDs |
| ECS_SECURITY_GROUPS | ECS mode | - | Comma-separated security group IDs |
| ECS_CONTAINER_NAME | No | strategy | Container name in task definition |
| ECS_ASSIGN_PUBLIC_IP | No | false | Whether to assign public IP |
| STEP_FUNCTIONS_ARN | SF mode | - | State machine ARN (required for stepfunctions mode) |
| MLFLOW_TRACKING_URI | No | - | MLflow server URL |
| S3_RESULTS_BUCKET | No | - | S3 bucket for results |
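
To illustrate how these variables might be turned into typed values, here is a minimal sketch that applies the documented defaults, splits the comma-separated lists, and falls back to WORKFLOW_STATE_TABLE. The read_settings function and its output keys are hypothetical, and the boolean parsing rule is an assumption; the real SQSTriggerSettings may behave differently.

```python
from typing import Any, Mapping


def _split_csv(value: str) -> list[str]:
    """Split a comma-separated string, dropping blanks and surrounding spaces."""
    return [item.strip() for item in value.split(",") if item.strip()]


def read_settings(env: Mapping[str, str]) -> dict[str, Any]:
    """Collect settings from an environment-style mapping (hypothetical helper)."""
    return {
        "launch_mode": env.get("LAUNCH_MODE", "").lower(),
        # DYNAMODB_TABLE_NAME takes precedence; WORKFLOW_STATE_TABLE is the alias.
        "dynamodb_table_name": env.get("DYNAMODB_TABLE_NAME") or env.get("WORKFLOW_STATE_TABLE"),
        "environment": env.get("ENVIRONMENT", "dev"),
        "ecs_cluster": env.get("ECS_CLUSTER"),
        "ecs_task_definition_prefix": env.get("ECS_TASK_DEFINITION_PREFIX", "strategy-"),
        "ecs_subnets": _split_csv(env.get("ECS_SUBNETS", "")),
        "ecs_security_groups": _split_csv(env.get("ECS_SECURITY_GROUPS", "")),
        "ecs_container_name": env.get("ECS_CONTAINER_NAME", "strategy"),
        # Assumption: only the literal string "true" (any case) enables a public IP.
        "ecs_assign_public_ip": env.get("ECS_ASSIGN_PUBLIC_IP", "false").lower() == "true",
        "step_functions_arn": env.get("STEP_FUNCTIONS_ARN"),
        "mlflow_tracking_uri": env.get("MLFLOW_TRACKING_URI"),
        "s3_results_bucket": env.get("S3_RESULTS_BUCKET"),
    }
```

Passing os.environ as the mapping reads the live environment, while a plain dict keeps the function easy to test.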

AWS Services Used

  • SQS - Event source for backtest requests
  • ECS - Launches Fargate tasks for backtesting (ECS mode)
  • Step Functions - Starts state machine executions (Step Functions mode)
  • DynamoDB - Idempotency checking and job state tracking

Key Features

  • Supports both ecs and stepfunctions launch modes via LaunchMode enum
  • Idempotent processing: checks DynamoDB for existing jobs before launching
  • Supports partial batch failure handling for SQS
  • Supports both nested and legacy flat message formats
  • On unhandled exception, returns all records as failed for SQS retry
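
The idempotency check and the per-message failure reporting can be sketched together as follows. This is an illustration, not the SQSJobProcessor implementation: an in-memory set stands in for the DynamoDB lookup, and the launch callable stands in for the ECS or Step Functions call.

```python
import json
from typing import Callable


def process_batch(
    records: list[dict],
    seen_jobs: set[str],
    launch: Callable[[dict], None],
) -> list[str]:
    """Launch each job at most once and return the IDs of messages that failed."""
    failed: list[str] = []
    for record in records:
        try:
            job = json.loads(record["body"])
            job_id = job["job_id"]
            if job_id in seen_jobs:
                # Duplicate delivery: the job was already launched, so the
                # message counts as handled and is not reported as failed.
                continue
            launch(job)
            seen_jobs.add(job_id)  # record the job only after a successful launch
        except Exception:
            # Malformed bodies and launch errors fail only this message.
            failed.append(record["messageId"])
    return failed
```

With a real DynamoDB table, a conditional write (for example a put with an attribute_not_exists condition on the key) is a common way to close the gap between the check and the launch when two invocations see the same job concurrently.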

Example Usage

Submit Backtest via SQS

import boto3
import json

sqs = boto3.client('sqs')

message = {
    "config": {
        "strategy": "TrendFollowingStrategy",
        "timeframe": "1h",
        "start_date": "2024-01-01",
        "end_date": "2024-03-01",
        "symbols": ["BTC/USDT:USDT", "ETH/USDT:USDT"]
    }
}

sqs.send_message(
    QueueUrl="https://sqs.eu-central-1.amazonaws.com/123456789012/backtest-queue",
    MessageBody=json.dumps(message)
)
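
For local testing without a queue, the same message can be wrapped in the envelope that Lambda passes to an SQS-triggered handler. The make_sqs_event helper below is hypothetical and includes only a subset of the fields a real record carries (real records also contain receiptHandle, attributes, and others); which fields the processor actually reads is not documented here.

```python
import json
import uuid


def make_sqs_event(messages: list[dict]) -> dict:
    """Wrap message dicts in a minimal SQS event envelope (hypothetical helper)."""
    return {
        "Records": [
            {
                "messageId": str(uuid.uuid4()),  # echoed back in batchItemFailures
                "body": json.dumps(message),     # SQS delivers the body as a string
                "eventSource": "aws:sqs",
            }
            for message in messages
        ]
    }


event = make_sqs_event([{"config": {"strategy": "TrendFollowingStrategy"}}])
```

The resulting event can then be passed to the handler as handler(event, None) in an environment where the required variables are set.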

Error Handling

| Error | Cause | Resolution |
| --- | --- | --- |
| LAUNCH_MODE is required | Missing LAUNCH_MODE env var | Set LAUNCH_MODE to ecs or stepfunctions |
| ECS_CLUSTER is required | Missing ECS config in ECS mode | Set required ECS env vars |
| STEP_FUNCTIONS_ARN is required | Missing ARN in SF mode | Set STEP_FUNCTIONS_ARN env var |
| Unhandled exception | Lambda-level failure | All records returned as batch failures for retry |
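
The configuration errors above can be caught before deployment with a small pre-flight check. The sketch below is an assumption about the validation rules, derived from the Required column of the environment variable table; validate_env is a hypothetical helper, and only the three "... is required" messages shown in the table are taken from the documentation.

```python
from typing import Mapping

# Variables marked as required in ECS mode in the table of environment variables.
ECS_REQUIRED = ("ECS_CLUSTER", "ECS_SUBNETS", "ECS_SECURITY_GROUPS")


def validate_env(env: Mapping[str, str]) -> list[str]:
    """Return a list of configuration errors; an empty list means the env looks valid."""
    mode = env.get("LAUNCH_MODE", "").lower()
    if not mode:
        return ["LAUNCH_MODE is required"]
    if mode not in ("ecs", "stepfunctions"):
        # Message wording is illustrative, not taken from the real settings class.
        return [f"LAUNCH_MODE must be ecs or stepfunctions, got {mode!r}"]

    errors: list[str] = []
    if not (env.get("DYNAMODB_TABLE_NAME") or env.get("WORKFLOW_STATE_TABLE")):
        errors.append("DYNAMODB_TABLE_NAME is required")
    if mode == "ecs":
        errors += [f"{name} is required" for name in ECS_REQUIRED if not env.get(name)]
    elif not env.get("STEP_FUNCTIONS_ARN"):
        errors.append("STEP_FUNCTIONS_ARN is required")
    return errors
```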
