sqs-consumer

Consumes messages from an SQS queue and launches ECS tasks for model retraining, with batch processing and partial failure handling.

Overview

| Property   | Value             |
|------------|-------------------|
| Trigger    | SQS Event Source  |
| Runtime    | Python 3.11       |
| Timeout    | 300 seconds       |
| Memory     | 256 MB            |
| Batch Size | 10 (configurable) |

Input Schema

SQS message body:

{
    "model_name": "PascalStrategy",
    "trigger": "scheduled"              # scheduled | drift_detected | manual
}
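
For reference, a producer could enqueue such a retraining request with boto3. This is a minimal sketch; the queue URL is a placeholder.

import json

import boto3

sqs = boto3.client("sqs")

sqs.send_message(
    QueueUrl="https://sqs.us-east-1.amazonaws.com/123456789012/tradai-retraining-queue",  # placeholder URL
    MessageBody=json.dumps({"model_name": "PascalStrategy", "trigger": "manual"}),
)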

Output Schema

{
    "batchItemFailures": [
        {"itemIdentifier": "message-id-1"},
        {"itemIdentifier": "message-id-2"}
    ]
}

An empty batchItemFailures list indicates that all messages in the batch were processed successfully.

Environment Variables

| Variable                   | Required | Default                   | Description                |
|----------------------------|----------|---------------------------|----------------------------|
| ECS_CLUSTER                | Yes      | -                         | ECS cluster name/ARN       |
| ECS_TASK_DEFINITION_ARN    | Yes      | -                         | Full task definition ARN   |
| ECS_TASK_DEFINITION_FAMILY | No       | "tradai-strategy-generic" | Fallback family name       |
| ECS_SUBNETS                | Yes      | -                         | Comma-separated subnet IDs |
| ECS_SECURITY_GROUPS        | Yes      | -                         | Comma-separated SG IDs     |
| ECS_CONTAINER_NAME         | No       | "strategy"                | Container name             |
| USE_SPOT                   | No       | false                     | Use Fargate Spot           |
| RETRAINING_STATE_TABLE     | Yes      | -                         | DynamoDB state table       |
| MAX_CONCURRENT_TASKS       | No       | 5                         | Max concurrent ECS tasks   |
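
A minimal sketch of reading this configuration at module load. The list splitting and boolean parsing are assumptions about the implementation, not taken from the source.

import os

ECS_CLUSTER = os.environ["ECS_CLUSTER"]
ECS_TASK_DEFINITION_ARN = os.environ["ECS_TASK_DEFINITION_ARN"]
ECS_TASK_DEFINITION_FAMILY = os.environ.get("ECS_TASK_DEFINITION_FAMILY", "tradai-strategy-generic")
ECS_SUBNETS = os.environ["ECS_SUBNETS"].split(",")                  # comma-separated subnet IDs
ECS_SECURITY_GROUPS = os.environ["ECS_SECURITY_GROUPS"].split(",")  # comma-separated SG IDs
ECS_CONTAINER_NAME = os.environ.get("ECS_CONTAINER_NAME", "strategy")
USE_SPOT = os.environ.get("USE_SPOT", "false").lower() == "true"
RETRAINING_STATE_TABLE = os.environ["RETRAINING_STATE_TABLE"]
MAX_CONCURRENT_TASKS = int(os.environ.get("MAX_CONCURRENT_TASKS", "5"))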

Processing Flow

flowchart TD
    A[SQS Messages] --> B[Count Running Tasks]
    B --> C{At Capacity?}
    C -->|Yes| D[Defer to batchItemFailures]
    C -->|No| E[Parse Message]
    E --> F{Valid JSON?}
    F -->|No| G[Skip invalid message]
    F -->|Yes| H{model_name present?}
    H -->|No| I[Skip incomplete message]
    H -->|Yes| J[Launch ECS Task]
    J --> K{Task launched?}
    K -->|Yes| L[Record in DynamoDB]
    K -->|No| M[Add to failures]
    L --> N[Continue to next]
    M --> N
    G --> N
    I --> N
    N --> O[Publish Metrics]
    O --> P[Return batchItemFailures]
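
A sketch of a handler that follows this flow. The helpers count_running_tasks, process_record, and publish_metrics are hypothetical and sketched in the sections below; they do not reflect the function's actual internals.

def lambda_handler(event, context):
    """Process one SQS batch and report per-message failures back to SQS."""
    records = event.get("Records", [])
    failures = []
    launched = 0

    # Count tasks already running so this batch never exceeds MAX_CONCURRENT_TASKS.
    capacity = max(MAX_CONCURRENT_TASKS - count_running_tasks(), 0)

    for record in records:
        if capacity - launched <= 0:
            # At capacity: defer the message so SQS redelivers it later.
            failures.append({"itemIdentifier": record["messageId"]})
            continue

        outcome = process_record(record)  # "launched" | "skipped" | "failed", see Error Handling
        if outcome == "launched":
            launched += 1
        elif outcome == "failed":
            failures.append({"itemIdentifier": record["messageId"]})
        # "skipped" covers malformed messages, which are dropped without retry.

    publish_metrics(
        received=len(records),
        processed=len(records) - len(failures),
        failed=len(failures),
        launched=launched,
    )
    return {"batchItemFailures": failures}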

Key Features

Partial Failure Handling

Returns batchItemFailures for SQS to requeue failed messages:

{
    "batchItemFailures": [
        {"itemIdentifier": "failed-message-id"}
    ]
}

Capacity Management

  • Counts currently running ECS tasks before processing
  • Defers messages when at MAX_CONCURRENT_TASKS limit
  • Uses the startedBy filter to track only sqs-consumer tasks (see the sketch below)
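
A sketch of the capacity check, assuming tasks are launched with startedBy="sqs-consumer". Pagination of list_tasks is omitted for brevity.

import boto3

ecs = boto3.client("ecs")

def count_running_tasks():
    """Count RUNNING tasks previously started by this consumer."""
    response = ecs.list_tasks(
        cluster=ECS_CLUSTER,        # from the environment-variable sketch above
        startedBy="sqs-consumer",   # assumed startedBy marker
        desiredStatus="RUNNING",
    )
    return len(response.get("taskArns", []))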

Task Tagging

ECS tasks are tagged for traceability:

tags=[
    {"key": "job_id", "value": "uuid"},
    {"key": "model_name", "value": "PascalStrategy"},
    {"key": "strategy", "value": "pascal"},
    {"key": "trigger", "value": "scheduled"},
    {"key": "environment", "value": "prod"},
]
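
A sketch of a run_task call that attaches these tags and honours USE_SPOT. The network settings, container environment override, and startedBy value are assumptions, and the strategy tag derivation is omitted.

def launch_retraining_task(model_name, trigger, job_id, environment="prod"):
    """Start one Fargate task for the requested model and tag it for traceability."""
    response = ecs.run_task(
        cluster=ECS_CLUSTER,
        taskDefinition=ECS_TASK_DEFINITION_ARN,
        count=1,
        startedBy="sqs-consumer",
        capacityProviderStrategy=[
            {"capacityProvider": "FARGATE_SPOT" if USE_SPOT else "FARGATE", "weight": 1}
        ],
        networkConfiguration={
            "awsvpcConfiguration": {
                "subnets": ECS_SUBNETS,
                "securityGroups": ECS_SECURITY_GROUPS,
                "assignPublicIp": "DISABLED",  # assumed: private subnets
            }
        },
        overrides={
            "containerOverrides": [
                {
                    "name": ECS_CONTAINER_NAME,
                    "environment": [{"name": "MODEL_NAME", "value": model_name}],
                }
            ]
        },
        tags=[
            {"key": "job_id", "value": job_id},
            {"key": "model_name", "value": model_name},
            {"key": "trigger", "value": trigger},
            {"key": "environment", "value": environment},
        ],
    )
    # Handling of response["failures"] is omitted in this sketch.
    return response["tasks"][0]["taskArn"]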

CloudWatch Metrics

| Metric               | Description               |
|----------------------|---------------------------|
| SQSMessagesReceived  | Total messages in batch   |
| SQSMessagesProcessed | Successfully processed    |
| SQSMessagesFailed    | Failed messages (retried) |
| ECSTasksLaunched     | Tasks actually launched   |
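
A sketch of publishing these metrics in a single put_metric_data call; the namespace is an assumption.

cloudwatch = boto3.client("cloudwatch")

def publish_metrics(received, processed, failed, launched):
    """Emit batch-level counters for this invocation."""
    cloudwatch.put_metric_data(
        Namespace="TradAI/Retraining",  # assumed namespace
        MetricData=[
            {"MetricName": "SQSMessagesReceived", "Value": received, "Unit": "Count"},
            {"MetricName": "SQSMessagesProcessed", "Value": processed, "Unit": "Count"},
            {"MetricName": "SQSMessagesFailed", "Value": failed, "Unit": "Count"},
            {"MetricName": "ECSTasksLaunched", "Value": launched, "Unit": "Count"},
        ],
    )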

DynamoDB State Record

{
    "model_name": "PascalStrategy",     # Partition key
    "task_arn": "arn:aws:ecs:...",
    "job_id": "uuid-123",
    "trigger_type": "scheduled",
    "started_at": "2024-01-01T12:00:00Z",
    "status": "running",
    "updated_at": "2024-01-01T12:00:00Z"
}
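
A sketch of writing this record with the boto3 DynamoDB resource, assuming model_name is the partition key.

from datetime import datetime, timezone

dynamodb = boto3.resource("dynamodb")
state_table = dynamodb.Table(RETRAINING_STATE_TABLE)

def record_task(model_name, task_arn, job_id, trigger):
    """Persist the launched task so later invocations can track its state."""
    now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    state_table.put_item(
        Item={
            "model_name": model_name,  # partition key
            "task_arn": task_arn,
            "job_id": job_id,
            "trigger_type": trigger,
            "started_at": now,
            "status": "running",
            "updated_at": now,
        }
    )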

SQS Configuration

{
  "QueueName": "tradai-retraining-queue",
  "VisibilityTimeout": 600,
  "MessageRetentionPeriod": 86400,
  "RedrivePolicy": {
    "maxReceiveCount": 3,
    "deadLetterTargetArn": "arn:aws:sqs:...:retraining-dlq"
  }
}
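
For reference, a sketch of provisioning the queue with these attributes via boto3; in practice this would normally live in infrastructure-as-code, and the DLQ ARN is a placeholder.

import json

import boto3

sqs = boto3.client("sqs")

sqs.create_queue(
    QueueName="tradai-retraining-queue",
    Attributes={
        "VisibilityTimeout": "600",
        "MessageRetentionPeriod": "86400",
        "RedrivePolicy": json.dumps({
            "maxReceiveCount": "3",
            "deadLetterTargetArn": "arn:aws:sqs:us-east-1:123456789012:retraining-dlq",  # placeholder ARN
        }),
    },
)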

Lambda Event Source Mapping

{
  "EventSourceArn": "arn:aws:sqs:...:retraining-queue",
  "FunctionName": "sqs-consumer",
  "BatchSize": 10,
  "MaximumBatchingWindowInSeconds": 5,
  "FunctionResponseTypes": ["ReportBatchItemFailures"]
}
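
A sketch of creating this mapping with boto3; ReportBatchItemFailures is what makes the batchItemFailures response take effect. The ARN is a placeholder.

lambda_client = boto3.client("lambda")

lambda_client.create_event_source_mapping(
    EventSourceArn="arn:aws:sqs:us-east-1:123456789012:tradai-retraining-queue",  # placeholder ARN
    FunctionName="sqs-consumer",
    BatchSize=10,
    MaximumBatchingWindowInSeconds=5,
    FunctionResponseTypes=["ReportBatchItemFailures"],
)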

Error Handling

| Error Type         | Behavior                         |
|--------------------|----------------------------------|
| Invalid JSON       | Skip message (no retry)          |
| Missing model_name | Skip message (no retry)          |
| ECS launch failure | Add to batchItemFailures (retry) |
| At capacity        | Add to batchItemFailures (retry) |
| ClientError        | Add to batchItemFailures (retry) |
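
A sketch of a per-message helper implementing this classification; only ClientError launch failures are reported back for retry, while malformed messages are deliberately dropped. launch_retraining_task and record_task are the hypothetical helpers sketched above, and the default trigger value is an assumption.

import json
import uuid

from botocore.exceptions import ClientError

def process_record(record):
    """Classify one SQS record: 'launched', 'skipped' (no retry), or 'failed' (retry)."""
    try:
        body = json.loads(record["body"])
    except json.JSONDecodeError:
        return "skipped"   # invalid JSON: drop, do not retry

    model_name = body.get("model_name")
    if not model_name:
        return "skipped"   # missing model_name: drop, do not retry

    trigger = body.get("trigger", "manual")  # assumed default
    job_id = str(uuid.uuid4())
    try:
        task_arn = launch_retraining_task(model_name, trigger, job_id)
        record_task(model_name, task_arn, job_id, trigger)
        return "launched"
    except ClientError:
        return "failed"    # ECS/DynamoDB error: report for retry via batchItemFailures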