# sqs-consumer
Consumes messages from an SQS queue and launches ECS tasks for model retraining, with batch processing and partial-failure handling.
## Overview
| Property | Value |
|---|---|
| Trigger | SQS Event Source |
| Runtime | Python 3.11 |
| Timeout | 300 seconds |
| Memory | 256 MB |
| Batch Size | 10 (configurable) |
## Input Schema
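The SQS message body must be valid JSON and include `model_name`; bodies that fail either check are skipped without retry.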
## Output Schema
An empty `batchItemFailures` list indicates that all messages in the batch were processed successfully.
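A minimal sketch of assembling that return value, assuming the handler has collected the SQS `messageId` of each message it wants retried. The `batchItemFailures` / `itemIdentifier` shape is the standard Lambda partial batch response for SQS; `build_batch_response` is a hypothetical helper name, not taken from the function's source.

```python
from typing import Iterable


def build_batch_response(failed_message_ids: Iterable[str]) -> dict:
    """Build a Lambda partial batch response for an SQS event source.

    Each itemIdentifier must be the messageId of a record from the incoming
    event; only those messages become visible again for retry.
    """
    # dict.fromkeys drops duplicate IDs while preserving their order
    unique_ids = dict.fromkeys(failed_message_ids)
    return {
        "batchItemFailures": [{"itemIdentifier": message_id} for message_id in unique_ids]
    }


print(build_batch_response(["msg-1", "msg-2"]))
# → {'batchItemFailures': [{'itemIdentifier': 'msg-1'}, {'itemIdentifier': 'msg-2'}]}
```

With `ReportBatchItemFailures` enabled on the event source mapping, Lambda deletes every message whose ID is absent from the list. If the handler raises instead of returning, the whole batch is treated as failed and retried.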
## Environment Variables
| Variable | Required | Default | Description |
|---|---|---|---|
| `ECS_CLUSTER` | Yes | - | ECS cluster name or ARN |
| `ECS_TASK_DEFINITION_ARN` | Yes | - | Full task definition ARN |
| `ECS_TASK_DEFINITION_FAMILY` | No | `tradai-strategy-generic` | Fallback task definition family |
| `ECS_SUBNETS` | Yes | - | Comma-separated subnet IDs |
| `ECS_SECURITY_GROUPS` | Yes | - | Comma-separated security group IDs |
| `ECS_CONTAINER_NAME` | No | `strategy` | Container name in the task definition |
| `USE_SPOT` | No | `false` | Launch tasks on Fargate Spot |
| `RETRAINING_STATE_TABLE` | Yes | - | DynamoDB table holding retraining state |
| `MAX_CONCURRENT_TASKS` | No | `5` | Maximum number of concurrent ECS tasks |
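A sketch of reading these variables once at cold start, applying the defaults from the table above. It assumes `USE_SPOT` accepts `true`, `1`, or `yes` (case-insensitive) and that comma-separated lists may contain spaces; `ConsumerConfig` and `load_config` are hypothetical names, not the function's actual code.

```python
import os
from dataclasses import dataclass
from typing import Mapping


def _split_csv(value: str) -> list[str]:
    """Split a comma-separated value, trimming whitespace and dropping blanks."""
    return [item.strip() for item in value.split(",") if item.strip()]


@dataclass(frozen=True)
class ConsumerConfig:
    cluster: str
    task_definition_arn: str
    task_definition_family: str
    subnets: list[str]
    security_groups: list[str]
    container_name: str
    use_spot: bool
    state_table: str
    max_concurrent_tasks: int


def load_config(env: Mapping[str, str] = os.environ) -> ConsumerConfig:
    """Build the config; raises KeyError if a required variable is missing."""
    return ConsumerConfig(
        cluster=env["ECS_CLUSTER"],
        task_definition_arn=env["ECS_TASK_DEFINITION_ARN"],
        task_definition_family=env.get("ECS_TASK_DEFINITION_FAMILY", "tradai-strategy-generic"),
        subnets=_split_csv(env["ECS_SUBNETS"]),
        security_groups=_split_csv(env["ECS_SECURITY_GROUPS"]),
        container_name=env.get("ECS_CONTAINER_NAME", "strategy"),
        # Assumption: these spellings enable Spot; anything else means on-demand
        use_spot=env.get("USE_SPOT", "false").strip().lower() in {"true", "1", "yes"},
        state_table=env["RETRAINING_STATE_TABLE"],
        max_concurrent_tasks=int(env.get("MAX_CONCURRENT_TASKS", "5")),
    )
```

Failing fast with `KeyError` on a missing required variable surfaces misconfiguration on the first invocation rather than partway through a batch.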
## Processing Flow
```mermaid
flowchart TD
A[SQS Messages] --> B[Count Running Tasks]
B --> C{At Capacity?}
C -->|Yes| D[Defer to batchItemFailures]
C -->|No| E[Parse Message]
E --> F{Valid JSON?}
F -->|No| G[Skip invalid message]
F -->|Yes| H{model_name present?}
H -->|No| I[Skip incomplete message]
H -->|Yes| J[Launch ECS Task]
J --> K{Task launched?}
K -->|Yes| L[Record in DynamoDB]
K -->|No| M[Add to failures]
L --> N[Continue to next]
M --> N
G --> N
I --> N
N --> O[Publish Metrics]
O --> P[Return batchItemFailures]
```

## Key Features
### Partial Failure Handling
The handler returns `batchItemFailures` so that SQS redelivers only the listed messages; the rest of the batch is deleted from the queue.
### Capacity Management
- Counts currently running ECS tasks before processing the batch
- Defers messages when the `MAX_CONCURRENT_TASKS` limit is reached
- Uses the `startedBy` filter so that only tasks launched by sqs-consumer are counted
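A sketch of the capacity check with boto3, assuming the consumer passes `startedBy="sqs-consumer"` when launching tasks (the exact string is an assumption). The ECS client is injected so the logic can be exercised without AWS; `count_running_tasks` and `available_slots` are hypothetical names.

```python
from typing import Any

STARTED_BY = "sqs-consumer"  # assumption: the startedBy value used at launch


def count_running_tasks(ecs_client: Any, cluster: str, started_by: str = STARTED_BY) -> int:
    """Count tasks in `cluster` that were started with `startedBy=started_by`.

    ListTasks filters on desiredStatus=RUNNING by default, which also covers
    tasks that are still PENDING, so they count against capacity too.
    """
    paginator = ecs_client.get_paginator("list_tasks")
    total = 0
    for page in paginator.paginate(cluster=cluster, startedBy=started_by):
        total += len(page.get("taskArns", []))
    return total


def available_slots(running: int, max_concurrent: int) -> int:
    """How many new tasks this invocation may launch; never negative."""
    return max(0, max_concurrent - running)
```

In the Lambda itself, `ecs_client` would be `boto3.client("ecs")`; paginating matters once the cluster has more tasks than fit in one `ListTasks` page.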
### Task Tagging
ECS tasks are tagged for traceability:
```python
tags=[
    {"key": "job_id", "value": "uuid"},
    {"key": "model_name", "value": "PascalStrategy"},
    {"key": "strategy", "value": "pascal"},
    {"key": "trigger", "value": "scheduled"},
    {"key": "environment", "value": "prod"},
]
```
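A sketch of how the launch call might be assembled with these tags for boto3's `ecs.run_task`. The `startedBy` value, the `assignPublicIp` setting, and the container environment variable names are assumptions; `build_run_task_kwargs` is a hypothetical helper. When Spot is enabled, the sketch swaps `launchType` for a `FARGATE_SPOT` capacity provider strategy, because RunTask rejects requests that set both.

```python
from typing import Any


def build_run_task_kwargs(
    *,
    cluster: str,
    task_definition: str,
    container_name: str,
    subnets: list[str],
    security_groups: list[str],
    use_spot: bool,
    job_id: str,
    model_name: str,
    strategy: str,
    trigger: str,
    environment: str,
    started_by: str = "sqs-consumer",  # assumption, see capacity check
) -> dict[str, Any]:
    """Assemble keyword arguments for ecs_client.run_task()."""
    kwargs: dict[str, Any] = {
        "cluster": cluster,
        "taskDefinition": task_definition,
        "count": 1,
        "startedBy": started_by,
        "networkConfiguration": {
            "awsvpcConfiguration": {
                "subnets": subnets,
                "securityGroups": security_groups,
                "assignPublicIp": "DISABLED",  # assumption: private subnets
            }
        },
        "overrides": {
            "containerOverrides": [
                {
                    "name": container_name,
                    # Hypothetical variable names read by the training container
                    "environment": [
                        {"name": "JOB_ID", "value": job_id},
                        {"name": "MODEL_NAME", "value": model_name},
                    ],
                }
            ]
        },
        "tags": [
            {"key": "job_id", "value": job_id},
            {"key": "model_name", "value": model_name},
            {"key": "strategy", "value": strategy},
            {"key": "trigger", "value": trigger},
            {"key": "environment", "value": environment},
        ],
    }
    # launchType and capacityProviderStrategy are mutually exclusive
    if use_spot:
        kwargs["capacityProviderStrategy"] = [{"capacityProvider": "FARGATE_SPOT", "weight": 1}]
    else:
        kwargs["launchType"] = "FARGATE"
    return kwargs
```

`run_task` can succeed at the HTTP level while returning an empty `tasks` list and a populated `failures` list, so a caller should check both before treating the launch as successful and reading `response["tasks"][0]["taskArn"]`.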
## CloudWatch Metrics
| Metric | Description |
|---|---|
| `SQSMessagesReceived` | Total messages in the batch |
| `SQSMessagesProcessed` | Messages processed successfully |
| `SQSMessagesFailed` | Messages reported as failed (retried) |
| `ECSTasksLaunched` | ECS tasks actually launched |
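A sketch of publishing these four counters in a single `put_metric_data` call. The namespace is hypothetical because the source does not state it, and `build_metric_data` / `publish_metrics` are illustrative names.

```python
from typing import Any

METRIC_NAMESPACE = "TradAI/Retraining"  # hypothetical namespace


def build_metric_data(received: int, processed: int, failed: int, launched: int) -> list[dict[str, Any]]:
    """Build the MetricData list for cloudwatch.put_metric_data()."""
    values = {
        "SQSMessagesReceived": received,
        "SQSMessagesProcessed": processed,
        "SQSMessagesFailed": failed,
        "ECSTasksLaunched": launched,
    }
    return [
        {"MetricName": name, "Value": float(value), "Unit": "Count"}
        for name, value in values.items()
    ]


def publish_metrics(cloudwatch_client: Any, **counts: int) -> None:
    # cloudwatch_client would be boto3.client("cloudwatch")
    cloudwatch_client.put_metric_data(
        Namespace=METRIC_NAMESPACE, MetricData=build_metric_data(**counts)
    )
```

Sending all four values in one request keeps metric publishing to a single API call per invocation.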
## DynamoDB State Record
```python
{
    "model_name": "PascalStrategy",  # Partition key
    "task_arn": "arn:aws:ecs:...",
    "job_id": "uuid-123",
    "trigger_type": "scheduled",
    "started_at": "2024-01-01T12:00:00Z",
    "status": "running",
    "updated_at": "2024-01-01T12:00:00Z"
}
```
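A sketch of building and writing that record after a successful launch, assuming a boto3 `Table` resource for `RETRAINING_STATE_TABLE`. `build_state_record` and `record_launch` are hypothetical names; the timestamp format mirrors the example record.

```python
from datetime import datetime, timezone
from typing import Any, Optional


def build_state_record(
    model_name: str,
    task_arn: str,
    job_id: str,
    trigger_type: str,
    now: Optional[datetime] = None,
) -> dict[str, Any]:
    """Build the state item; `now` should be timezone-aware if supplied."""
    now = (now or datetime.now(timezone.utc)).astimezone(timezone.utc)
    timestamp = now.strftime("%Y-%m-%dT%H:%M:%SZ")  # ISO 8601 in UTC
    return {
        "model_name": model_name,  # partition key
        "task_arn": task_arn,
        "job_id": job_id,
        "trigger_type": trigger_type,
        "started_at": timestamp,
        "status": "running",
        "updated_at": timestamp,
    }


def record_launch(table: Any, **fields: Any) -> None:
    # table would be boto3.resource("dynamodb").Table(state_table_name)
    table.put_item(Item=build_state_record(**fields))
```

If the table has no sort key, `put_item` with `model_name` as the only key replaces the previous item for that model, so the table holds only the latest launch per model.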
## SQS Configuration
```json
{
  "QueueName": "tradai-retraining-queue",
  "VisibilityTimeout": 600,
  "MessageRetentionPeriod": 86400,
  "RedrivePolicy": {
    "maxReceiveCount": 3,
    "deadLetterTargetArn": "arn:aws:sqs:...:retraining-dlq"
  }
}
```
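When applying this configuration through the SQS API, every attribute value is a string and `RedrivePolicy` is itself a JSON document serialized into a string. A sketch of that conversion, assuming boto3; `build_queue_attributes` is a hypothetical helper.

```python
import json
from typing import Any


def build_queue_attributes(
    visibility_timeout: int, retention_seconds: int, dlq_arn: str, max_receive_count: int
) -> dict[str, Any]:
    """Convert queue settings into the string-valued Attributes map."""
    return {
        "VisibilityTimeout": str(visibility_timeout),
        "MessageRetentionPeriod": str(retention_seconds),
        "RedrivePolicy": json.dumps(
            {"deadLetterTargetArn": dlq_arn, "maxReceiveCount": max_receive_count}
        ),
    }
```

Usage would be `boto3.client("sqs").create_queue(QueueName="tradai-retraining-queue", Attributes=build_queue_attributes(600, 86400, dlq_arn, 3))`, with `dlq_arn` supplied by the caller. When sizing these values, note that the AWS Lambda documentation recommends a source-queue visibility timeout of at least six times the function timeout.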
## Lambda Event Source Mapping
```json
{
  "EventSourceArn": "arn:aws:sqs:...:retraining-queue",
  "FunctionName": "sqs-consumer",
  "BatchSize": 10,
  "MaximumBatchingWindowInSeconds": 5,
  "FunctionResponseTypes": ["ReportBatchItemFailures"]
}
```
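A sketch of the same mapping as keyword arguments for boto3's `lambda_client.create_event_source_mapping`. `build_event_source_mapping` is a hypothetical helper; the batch-size check reflects the documented rule that SQS batch sizes above 10 require a batching window of at least one second.

```python
from typing import Any


def build_event_source_mapping(
    queue_arn: str,
    function_name: str,
    batch_size: int = 10,
    batching_window_seconds: int = 5,
) -> dict[str, Any]:
    """Keyword arguments for create_event_source_mapping()."""
    if batch_size > 10 and batching_window_seconds < 1:
        raise ValueError("BatchSize above 10 requires MaximumBatchingWindowInSeconds >= 1")
    return {
        "EventSourceArn": queue_arn,
        "FunctionName": function_name,
        "BatchSize": batch_size,
        "MaximumBatchingWindowInSeconds": batching_window_seconds,
        # Without this, Lambda ignores a returned batchItemFailures list and
        # deletes every message of a batch whose invocation returns normally.
        "FunctionResponseTypes": ["ReportBatchItemFailures"],
    }
```

Usage: `boto3.client("lambda").create_event_source_mapping(**build_event_source_mapping(queue_arn, "sqs-consumer"))`, where `queue_arn` is the full ARN elided above.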
## Error Handling
| Error Type | Behavior |
|---|---|
| Invalid JSON | Skip message (no retry) |
| Missing model_name | Skip message (no retry) |
| ECS launch failure | Add to batchItemFailures (retry) |
| At capacity | Add to batchItemFailures (retry) |
| ClientError | Add to batchItemFailures (retry) |
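A sketch of the per-message decisions in the table and flowchart. It assumes free slots are computed once per batch from the running-task count and decremented after each launch, and that `launch` raises on any ECS failure (including botocore `ClientError`). `handle_batch` and the `launch` callback are hypothetical.

```python
import json
from typing import Any, Callable


def handle_batch(
    records: list[dict[str, Any]],
    free_slots: int,
    launch: Callable[[dict[str, Any]], str],
) -> dict[str, Any]:
    """Process SQS records and return a partial batch response."""
    failures: list[dict[str, str]] = []
    for record in records:
        message_id = record["messageId"]
        if free_slots <= 0:
            # At capacity: defer, so SQS redelivers after the visibility timeout
            failures.append({"itemIdentifier": message_id})
            continue
        try:
            body = json.loads(record["body"])
        except json.JSONDecodeError:
            continue  # invalid JSON: skip, a retry would fail the same way
        if not isinstance(body, dict) or not body.get("model_name"):
            continue  # missing model_name: skip without retry
        try:
            launch(body)  # expected to return the task ARN
        except Exception:
            # ECS launch failure or ClientError: report for retry
            failures.append({"itemIdentifier": message_id})
            continue
        free_slots -= 1
    return {"batchItemFailures": failures}
```

Deferred messages still consume delivery attempts: each redelivery counts toward `maxReceiveCount`, so with the redrive policy above a message deferred on three consecutive receives would move to the dead-letter queue even though it was never invalid.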
## Related
- retraining-scheduler - EventBridge-based scheduling
- backtest-consumer - Similar SQS consumer for backtests