sqs-consumer¶
Consumes messages from an SQS queue and launches ECS tasks for model retraining, with batch processing and partial-failure handling.
Overview¶
| Property | Value |
|---|---|
| Trigger | SQS Event Source |
| Runtime | Python 3.11 |
| Timeout | 30 seconds |
| Memory | 256 MB |
| Batch Size | 10 (configurable) |
| Settings class | SQSConsumerSettings |
Input Schema¶
SQS message body:
Output Schema¶
Empty batchItemFailures indicates all messages processed successfully.
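As an illustration of the response shape, here is a minimal sketch of a helper that builds the partial batch response Lambda expects when ReportBatchItemFailures is enabled. The helper name batch_response is hypothetical; the batchItemFailures and itemIdentifier keys are the standard Lambda response fields.

```python
from typing import Iterable


def batch_response(failed_message_ids: Iterable[str]) -> dict:
    """Build a partial batch response for an SQS event source.

    Each entry names one SQS messageId that should be retried.
    An empty list tells Lambda that the whole batch succeeded.
    """
    return {
        "batchItemFailures": [
            {"itemIdentifier": message_id} for message_id in failed_message_ids
        ]
    }


# All messages processed: nothing is reported for retry.
all_ok = batch_response([])

# One message failed: only that message is reported for retry.
one_failed = batch_response(["msg-2"])
```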
Environment Variables¶
| Variable | Required | Default | Description |
|---|---|---|---|
| ECS_CLUSTER | Yes | - | ECS cluster name/ARN |
| ECS_TASK_DEFINITION_ARN | No | - | Full task definition ARN (preferred) |
| ECS_TASK_DEFINITION_FAMILY | No | tradai-strategy-generic | Fallback family prefix if ARN not set |
| ECS_SUBNETS | Yes | - | Comma-separated subnet IDs |
| ECS_SECURITY_GROUPS | Yes | - | Comma-separated SG IDs |
| ECS_CONTAINER_NAME | No | strategy | Container name for overrides |
| USE_SPOT | No | false | Use Fargate Spot |
| RETRAINING_STATE_TABLE | No | - | DynamoDB state table for recording launches |
| MAX_CONCURRENT_TASKS | No | 5 | Max concurrent ECS tasks |
Task Definition Resolution¶
The task definition is resolved via settings.get_task_definition():
- Preferred: Use ECS_TASK_DEFINITION_ARN directly if set (full ARN or family:revision)
- Fallback: Construct {ECS_TASK_DEFINITION_FAMILY}-{environment} (e.g., tradai-strategy-generic-dev)
The default family is tradai-strategy-generic, providing a generic task definition usable across strategies.
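The resolution order above can be sketched as follows. This is a standalone approximation, not the actual SQSConsumerSettings method: the function signature and the way the environment name is passed in are assumptions.

```python
from typing import Optional


def get_task_definition(
    task_definition_arn: Optional[str],
    task_definition_family: str = "tradai-strategy-generic",
    environment: str = "dev",  # assumed to come from the deployment settings
) -> str:
    """Resolve the task definition passed to ECS when launching a task."""
    if task_definition_arn:
        # Preferred: a full ARN or family:revision is used as-is.
        return task_definition_arn
    # Fallback: family prefix plus environment suffix.
    return f"{task_definition_family}-{environment}"


fallback = get_task_definition(None)
explicit = get_task_definition("tradai-strategy-generic-dev:7")
```

Treating an empty string like an unset value keeps the fallback working when the variable is defined but blank.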
Container Environment Overrides¶
Each ECS task is launched with these container environment overrides (matching EntrypointSettings):
| Env Var | Value | Description |
|---|---|---|
| JOB_ID | UUID | Unique job identifier for tracking |
| TRADING_MODE | train | Retraining mode |
| STRATEGY | Extracted from model name | Strategy name (first segment before -) |
| MODEL_NAME | From SQS message | Full model identifier |
| RETRAINING_TRIGGER | From SQS message | Trigger type |
| ENVIRONMENT | From settings | Deployment environment |
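A minimal sketch of how these overrides could be assembled into the overrides argument of the ECS RunTask call. The function name and the example model name pascal-v2 are hypothetical. The STRATEGY value follows the rule stated in the table (first segment before the hyphen); the real extraction logic may differ.

```python
import uuid


def build_container_overrides(
    container_name: str, model_name: str, trigger: str, environment: str
) -> dict:
    """Build the RunTask 'overrides' structure for one retraining job."""
    env = {
        "JOB_ID": str(uuid.uuid4()),            # unique per launch
        "TRADING_MODE": "train",                # fixed for retraining
        "STRATEGY": model_name.split("-")[0],   # first segment before "-"
        "MODEL_NAME": model_name,
        "RETRAINING_TRIGGER": trigger,
        "ENVIRONMENT": environment,
    }
    return {
        "containerOverrides": [
            {
                "name": container_name,
                # ECS expects a list of {"name": ..., "value": ...} string pairs.
                "environment": [
                    {"name": key, "value": value} for key, value in env.items()
                ],
            }
        ]
    }


overrides = build_container_overrides("strategy", "pascal-v2", "scheduled", "dev")
```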
Capacity Provider¶
The ECS task launch uses capacityProviderStrategy (not launchType) to support Fargate Spot:
- USE_SPOT=true: Uses FARGATE_SPOT capacity provider (weight=1)
- USE_SPOT=false: Uses FARGATE capacity provider (weight=1)
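The selection can be sketched as below. The helper names are hypothetical, and how the real settings class parses USE_SPOT is an assumption (here, only a case-insensitive "true" enables Spot). ECS rejects a RunTask request that sets both launchType and capacityProviderStrategy, which is why only the strategy is built.

```python
import os
from typing import Mapping


def use_spot_from_env(env: Mapping[str, str] = os.environ) -> bool:
    """Read USE_SPOT, defaulting to false when the variable is unset."""
    return env.get("USE_SPOT", "false").strip().lower() == "true"


def capacity_provider_strategy(use_spot: bool) -> list:
    """Return the capacityProviderStrategy argument for ECS RunTask."""
    provider = "FARGATE_SPOT" if use_spot else "FARGATE"
    return [{"capacityProvider": provider, "weight": 1}]


spot_strategy = capacity_provider_strategy(use_spot_from_env({"USE_SPOT": "true"}))
on_demand_strategy = capacity_provider_strategy(use_spot_from_env({}))
```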
Processing Flow¶
flowchart TD
A[SQS Messages] --> B[Count Running Tasks via ECS]
B --> C{At Capacity?}
C -->|Yes| D[Defer to batchItemFailures]
C -->|No| E[Parse Message JSON]
E --> F{Valid JSON?}
F -->|No| G[Skip invalid message - no retry]
F -->|Yes| H{model_name present?}
H -->|No| I[Skip incomplete message - no retry]
H -->|Yes| J[Launch ECS Task]
J --> K{Task launched?}
K -->|Yes| L[Record in DynamoDB]
K -->|No| M[Add to batchItemFailures]
L --> N[Continue to next]
M --> N
G --> N
I --> N
N --> O[Publish Metrics]
O --> P[Return batchItemFailures]
Key Features¶
Partial Failure Handling¶
Returns batchItemFailures so that only the failed messages are retried. Messages not listed in batchItemFailures are deleted from the queue.
Capacity Management¶
- Counts currently running ECS tasks using startedBy="retraining-scheduler" filter
- Defers messages when at MAX_CONCURRENT_TASKS limit (adds to batchItemFailures for retry)
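A minimal sketch of the capacity check, assuming a boto3 ECS client is passed in. The function names, the desiredStatus="RUNNING" filter, and the >= comparison are assumptions about details this page does not spell out.

```python
def count_running_tasks(
    ecs_client, cluster: str, started_by: str = "retraining-scheduler"
) -> int:
    """Count tasks started by the given startedBy value, across all pages."""
    paginator = ecs_client.get_paginator("list_tasks")
    pages = paginator.paginate(
        cluster=cluster,
        startedBy=started_by,
        desiredStatus="RUNNING",
    )
    return sum(len(page.get("taskArns", [])) for page in pages)


def at_capacity(running: int, max_concurrent_tasks: int = 5) -> bool:
    """True when no further task should be launched in this invocation."""
    return running >= max_concurrent_tasks


# Example wiring (requires boto3 and AWS credentials, so it is left commented):
# import boto3
# running = count_running_tasks(boto3.client("ecs"), cluster="my-cluster")
# defer = at_capacity(running)
```

Taking the client as a parameter keeps the counting logic testable with a stub instead of a live ECS endpoint.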
Task Tagging¶
ECS tasks are tagged for traceability:
tags=[
{"key": "job_id", "value": "uuid"},
{"key": "model_name", "value": "PascalStrategy"},
{"key": "strategy", "value": "pascal"},
{"key": "trigger", "value": "scheduled"},
{"key": "environment", "value": "prod"},
]
CloudWatch Metrics¶
Namespace suffix: RetrainingPipeline
| Metric | Dimensions | Description |
|---|---|---|
| SQSMessagesReceived | Environment | Total messages in batch |
| SQSMessagesProcessed | Environment | Successfully processed |
| SQSMessagesFailed | Environment | Failed messages (retried) |
| ECSTasksLaunched | Environment | Tasks actually launched |
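A sketch of how the metrics above could be shaped for CloudWatch PutMetricData. The helper name and the Count unit are assumptions, and the full namespace is not given on this page, so the publishing call is shown only as a comment with a placeholder.

```python
def build_metric_data(environment: str, counts: dict) -> list:
    """Turn {metric name: count} into PutMetricData entries."""
    return [
        {
            "MetricName": name,
            "Dimensions": [{"Name": "Environment", "Value": environment}],
            "Value": float(value),
            "Unit": "Count",
        }
        for name, value in counts.items()
    ]


metric_data = build_metric_data(
    "dev",
    {
        "SQSMessagesReceived": 10,
        "SQSMessagesProcessed": 8,
        "SQSMessagesFailed": 2,
        "ECSTasksLaunched": 8,
    },
)

# Publishing (requires boto3; the namespace prefix is a placeholder):
# boto3.client("cloudwatch").put_metric_data(
#     Namespace="<prefix>/RetrainingPipeline", MetricData=metric_data
# )
```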
DynamoDB State Record¶
Written to RETRAINING_STATE_TABLE via update_item:
{
"model_name": "PascalStrategy", # Partition key
"task_arn": "arn:aws:ecs:...",
"job_id": "uuid-123",
"trigger_type": "scheduled",
"started_at": "2024-01-01T12:00:00+00:00",
"status": "running",
"updated_at": "2024-01-01T12:00:00+00:00"
}
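A minimal sketch of the update_item arguments that would produce a record like the one above. The function name is hypothetical and the real update expression may differ. Attribute names are aliased with # placeholders because status is a DynamoDB reserved word.

```python
from datetime import datetime, timezone
from typing import Optional


def build_state_update(
    model_name: str,
    task_arn: str,
    job_id: str,
    trigger_type: str,
    now: Optional[datetime] = None,
) -> dict:
    """Build keyword arguments for a DynamoDB Table.update_item call."""
    timestamp = (now or datetime.now(timezone.utc)).isoformat()
    attributes = {
        "task_arn": task_arn,
        "job_id": job_id,
        "trigger_type": trigger_type,
        "started_at": timestamp,
        "status": "running",
        "updated_at": timestamp,
    }
    return {
        "Key": {"model_name": model_name},  # partition key
        "UpdateExpression": "SET "
        + ", ".join(f"#{name} = :{name}" for name in attributes),
        "ExpressionAttributeNames": {f"#{name}": name for name in attributes},
        "ExpressionAttributeValues": {
            f":{name}": value for name, value in attributes.items()
        },
    }


# Usage with a boto3 Table resource (left commented; needs AWS access):
# table.update_item(**build_state_update("PascalStrategy", arn, job_id, "scheduled"))
```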
SQS Configuration¶
{
"QueueName": "tradai-retraining-queue",
"VisibilityTimeout": 600,
"MessageRetentionPeriod": 86400,
"RedrivePolicy": {
"maxReceiveCount": 3,
"deadLetterTargetArn": "arn:aws:sqs:...:retraining-dlq"
}
}
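If the queue is created through the SQS API rather than infrastructure tooling, the configuration above needs converting first: CreateQueue takes attributes as a string-to-string map, with RedrivePolicy encoded as a JSON string. The sketch below shows that conversion; the helper name is hypothetical and the elided ARN is kept as written.

```python
import json


def to_queue_attributes(config: dict) -> dict:
    """Convert a queue config into the Attributes map for SQS CreateQueue."""
    attributes = {}
    for key, value in config.items():
        if key == "QueueName":
            continue  # passed separately, not as an attribute
        # Nested policies are JSON-encoded; scalars become plain strings.
        attributes[key] = json.dumps(value) if isinstance(value, dict) else str(value)
    return attributes


queue_config = {
    "QueueName": "tradai-retraining-queue",
    "VisibilityTimeout": 600,
    "MessageRetentionPeriod": 86400,
    "RedrivePolicy": {
        "maxReceiveCount": 3,
        "deadLetterTargetArn": "arn:aws:sqs:...:retraining-dlq",
    },
}
queue_attributes = to_queue_attributes(queue_config)

# sqs.create_queue(QueueName=queue_config["QueueName"], Attributes=queue_attributes)
```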
Lambda Event Source Mapping¶
{
"EventSourceArn": "arn:aws:sqs:...:retraining-queue",
"FunctionName": "sqs-consumer",
"BatchSize": 10,
"MaximumBatchingWindowInSeconds": 5,
"FunctionResponseTypes": ["ReportBatchItemFailures"]
}
Error Handling¶
| Error Type | Behavior |
|---|---|
| Invalid JSON | Skip message (no retry) |
| Missing model_name | Skip message (no retry) |
| ECS launch failure | Add to batchItemFailures (retry) |
| At capacity | Add to batchItemFailures (retry) |
| ClientError | Add to batchItemFailures (retry) |
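The table can be read as a per-record decision, sketched below. This is an illustration rather than the handler's actual code: process_batch and the launch_task callable are hypothetical, the capacity check is omitted, and the sketch catches any exception where the real handler is described as handling ClientError.

```python
import json
from typing import Callable


def process_batch(records: list, launch_task: Callable[[dict], None]) -> dict:
    """Apply the skip/retry rules to each SQS record in a Lambda event."""
    failures = []
    for record in records:
        try:
            message = json.loads(record["body"])
        except json.JSONDecodeError:
            continue  # invalid JSON: skipped, never retried
        if not isinstance(message, dict) or not message.get("model_name"):
            continue  # missing model_name: skipped, never retried
        try:
            launch_task(message)
        except Exception:
            # Launch failure: report the message so SQS delivers it again.
            failures.append({"itemIdentifier": record["messageId"]})
    return {"batchItemFailures": failures}
```

Skipping malformed messages instead of retrying them means a message that can never succeed does not consume its maxReceiveCount attempts before reaching the dead-letter queue.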
Related¶
- retraining-scheduler - EventBridge-based scheduling
- backtest-consumer - Similar SQS consumer for backtests