sqs-consumer

Consumes messages from an SQS queue and launches ECS tasks for model retraining, with batch processing and partial failure handling.

Overview

Property        Value
Trigger         SQS Event Source
Runtime         Python 3.11
Timeout         30 seconds
Memory          256 MB
Batch Size      10 (configurable)
Settings class  SQSConsumerSettings

Input Schema

SQS message body:

{
    "model_name": "PascalStrategy",
    "trigger": "scheduled"
}

The trigger field is one of scheduled, drift_detected, or manual.
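
The validation rules for this body (invalid JSON and a missing model_name are both skipped without retry) can be sketched as a small parser. This is a minimal sketch, not the consumer's actual code: the function name parse_message is hypothetical, and treating a non-object payload as invalid is an assumption.

```python
import json
from typing import Optional


def parse_message(body: str) -> Optional[dict]:
    """Return the parsed message, or None if it should be skipped (no retry).

    Hypothetical helper: mirrors the documented rules only.
    """
    try:
        payload = json.loads(body)
    except json.JSONDecodeError:
        return None  # invalid JSON: skip, do not retry
    # Assumption: anything that is not a JSON object is treated as invalid.
    if not isinstance(payload, dict) or not payload.get("model_name"):
        return None  # missing model_name: skip, do not retry
    return payload
```

A None result means the message is dropped rather than added to batchItemFailures, since retrying a malformed message cannot succeed.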

Output Schema

{
    "batchItemFailures": [
        {"itemIdentifier": "message-id-1"},
        {"itemIdentifier": "message-id-2"}
    ]
}

An empty batchItemFailures list indicates that all messages were processed successfully.

Environment Variables

Variable                    Required  Default                  Description
ECS_CLUSTER                 Yes       -                        ECS cluster name/ARN
ECS_TASK_DEFINITION_ARN     No        -                        Full task definition ARN (preferred)
ECS_TASK_DEFINITION_FAMILY  No        tradai-strategy-generic  Fallback family prefix if ARN not set
ECS_SUBNETS                 Yes       -                        Comma-separated subnet IDs
ECS_SECURITY_GROUPS         Yes       -                        Comma-separated SG IDs
ECS_CONTAINER_NAME          No        strategy                 Container name for overrides
USE_SPOT                    No        false                    Use Fargate Spot
RETRAINING_STATE_TABLE      No        -                        DynamoDB state table for recording launches
MAX_CONCURRENT_TASKS        No        5                        Max concurrent ECS tasks
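
As an illustration of how these variables might be loaded, the sketch below reads them into a frozen dataclass. It is not the real SQSConsumerSettings: the class name ConsumerSettingsSketch, the from_env method, and the rule that only the string "true" (case-insensitive) enables USE_SPOT are all assumptions.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


def _split_csv(value: str) -> list[str]:
    """Split a comma-separated value, dropping blanks and surrounding spaces."""
    return [item.strip() for item in value.split(",") if item.strip()]


@dataclass(frozen=True)
class ConsumerSettingsSketch:
    """Hypothetical stand-in for SQSConsumerSettings, using documented defaults."""

    ecs_cluster: str
    ecs_subnets: list[str]
    ecs_security_groups: list[str]
    ecs_task_definition_arn: Optional[str] = None
    ecs_task_definition_family: str = "tradai-strategy-generic"
    ecs_container_name: str = "strategy"
    use_spot: bool = False
    retraining_state_table: Optional[str] = None
    max_concurrent_tasks: int = 5

    @classmethod
    def from_env(cls, env: Mapping[str, str] = os.environ) -> "ConsumerSettingsSketch":
        return cls(
            # Required variables: a KeyError here surfaces misconfiguration early.
            ecs_cluster=env["ECS_CLUSTER"],
            ecs_subnets=_split_csv(env["ECS_SUBNETS"]),
            ecs_security_groups=_split_csv(env["ECS_SECURITY_GROUPS"]),
            # Optional variables fall back to the documented defaults.
            ecs_task_definition_arn=env.get("ECS_TASK_DEFINITION_ARN") or None,
            ecs_task_definition_family=env.get(
                "ECS_TASK_DEFINITION_FAMILY", "tradai-strategy-generic"
            ),
            ecs_container_name=env.get("ECS_CONTAINER_NAME", "strategy"),
            # Assumption: only "true" (any case) turns Spot on.
            use_spot=env.get("USE_SPOT", "false").strip().lower() == "true",
            retraining_state_table=env.get("RETRAINING_STATE_TABLE") or None,
            max_concurrent_tasks=int(env.get("MAX_CONCURRENT_TASKS", "5")),
        )
```

Passing a plain dict instead of os.environ keeps the loader easy to exercise in unit tests.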

Task Definition Resolution

The task definition is resolved via settings.get_task_definition():

  1. Preferred: Use ECS_TASK_DEFINITION_ARN directly if set (full ARN or family:revision)
  2. Fallback: Construct {ECS_TASK_DEFINITION_FAMILY}-{environment} (e.g., tradai-strategy-generic-dev)

The default family is tradai-strategy-generic, providing a generic task definition usable across strategies.
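
The two-step resolution above can be sketched as a standalone function. The name resolve_task_definition and its parameters are hypothetical; the real logic lives in settings.get_task_definition() and may differ in detail.

```python
from typing import Optional


def resolve_task_definition(arn: Optional[str], family: str, environment: str) -> str:
    """Sketch of the documented resolution order (hypothetical helper)."""
    if arn:
        # Preferred: use the configured ARN (or family:revision) unchanged.
        return arn
    # Fallback: build "{family}-{environment}".
    return f"{family}-{environment}"
```

With no ARN set, the default family, and environment "dev", this yields tradai-strategy-generic-dev, matching the example above.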

Container Environment Overrides

Each ECS task is launched with these container environment overrides (matching EntrypointSettings):

Env Var             Value                      Description
JOB_ID              UUID                       Unique job identifier for tracking
TRADING_MODE        train                      Retraining mode
STRATEGY            Extracted from model name  Strategy name (first segment before -)
MODEL_NAME          From SQS message           Full model identifier
RETRAINING_TRIGGER  From SQS message           Trigger type
ENVIRONMENT         From settings              Deployment environment
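
A minimal sketch of how these overrides could be assembled into the overrides argument of the ECS RunTask call. The function name build_container_overrides is hypothetical, and the STRATEGY value follows the table's wording literally (the text before the first "-"), which may not match the real extraction logic.

```python
import uuid
from typing import Optional


def build_container_overrides(
    container_name: str,
    model_name: str,
    trigger: str,
    environment: str,
    job_id: Optional[str] = None,
) -> dict:
    """Build the `overrides` structure for ECS RunTask (hypothetical helper)."""
    env = {
        "JOB_ID": job_id or str(uuid.uuid4()),
        "TRADING_MODE": "train",
        # Assumption: "first segment before -" means a plain split on "-".
        "STRATEGY": model_name.split("-", 1)[0],
        "MODEL_NAME": model_name,
        "RETRAINING_TRIGGER": trigger,
        "ENVIRONMENT": environment,
    }
    return {
        "containerOverrides": [
            {
                "name": container_name,
                "environment": [{"name": k, "value": v} for k, v in env.items()],
            }
        ]
    }
```

The container name must match a container in the task definition (ECS_CONTAINER_NAME, default strategy), otherwise ECS rejects the override.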

Capacity Provider

The ECS task launch uses capacityProviderStrategy (not launchType) to support Fargate Spot:

  • USE_SPOT=true: Uses FARGATE_SPOT capacity provider (weight=1)
  • USE_SPOT=false: Uses FARGATE capacity provider (weight=1)
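
The selection above maps to the capacityProviderStrategy argument of RunTask roughly as follows. The helper name is hypothetical; the returned structure uses the field names the ECS API expects.

```python
def capacity_provider_strategy(use_spot: bool) -> list[dict]:
    """Return the capacityProviderStrategy list for RunTask (hypothetical helper)."""
    provider = "FARGATE_SPOT" if use_spot else "FARGATE"
    return [{"capacityProvider": provider, "weight": 1}]
```

RunTask requires launchType to be omitted when capacityProviderStrategy is supplied, which is why the two are not combined.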

Processing Flow

flowchart TD
    A[SQS Messages] --> B[Count Running Tasks via ECS]
    B --> C{At Capacity?}
    C -->|Yes| D[Defer to batchItemFailures]
    C -->|No| E[Parse Message JSON]
    E --> F{Valid JSON?}
    F -->|No| G[Skip invalid message - no retry]
    F -->|Yes| H{model_name present?}
    H -->|No| I[Skip incomplete message - no retry]
    H -->|Yes| J[Launch ECS Task]
    J --> K{Task launched?}
    K -->|Yes| L[Record in DynamoDB]
    K -->|No| M[Add to batchItemFailures]
    L --> N[Continue to next]
    M --> N
    G --> N
    I --> N
    N --> O[Publish Metrics]
    O --> P[Return batchItemFailures]

Key Features

Partial Failure Handling

Returns batchItemFailures so that SQS redelivers only the failed messages; messages not listed are deleted from the queue:

{
    "batchItemFailures": [
        {"itemIdentifier": "failed-message-id"}
    ]
}
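
A minimal sketch of a handler loop that produces this response. The names handle_batch and launch are hypothetical: launch stands in for the per-message work and is assumed to return True when the message was handled (launched or intentionally skipped) and False when it should be retried. Catching every exception as a retry is also an assumption.

```python
from typing import Callable


def handle_batch(event: dict, launch: Callable[[dict], bool]) -> dict:
    """Process each SQS record and report only the failed ones."""
    failures = []
    for record in event.get("Records", []):
        try:
            ok = launch(record)
        except Exception:
            # Assumption: any unexpected error is treated as retryable.
            ok = False
        if not ok:
            # messageId is the identifier SQS expects in itemIdentifier.
            failures.append({"itemIdentifier": record["messageId"]})
    return {"batchItemFailures": failures}
```

This response shape only takes effect when the event source mapping sets FunctionResponseTypes to ReportBatchItemFailures; without it, Lambda treats the batch as all-or-nothing.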

Capacity Management

  • Counts currently running ECS tasks using startedBy="retraining-scheduler" filter
  • Defers messages when at MAX_CONCURRENT_TASKS limit (adds to batchItemFailures for retry)
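
The running-task count can be sketched with the ECS ListTasks call, following nextToken until all pages are read. The function name count_running_tasks is hypothetical, and ecs_client is assumed to be a boto3 ECS client (or any object with a compatible list_tasks method).

```python
def count_running_tasks(
    ecs_client, cluster: str, started_by: str = "retraining-scheduler"
) -> int:
    """Count tasks started by the retraining pipeline (hypothetical helper)."""
    kwargs = {"cluster": cluster, "startedBy": started_by, "desiredStatus": "RUNNING"}
    count = 0
    while True:
        page = ecs_client.list_tasks(**kwargs)
        count += len(page.get("taskArns", []))
        token = page.get("nextToken")
        if not token:
            return count
        # More results remain: request the next page.
        kwargs["nextToken"] = token
```

The batch is then at capacity when the returned count is greater than or equal to MAX_CONCURRENT_TASKS. Filtering on desiredStatus RUNNING also counts tasks that are still provisioning, so launches in flight are included.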

Task Tagging

ECS tasks are tagged for traceability:

tags=[
    {"key": "job_id", "value": "uuid"},
    {"key": "model_name", "value": "PascalStrategy"},
    {"key": "strategy", "value": "pascal"},
    {"key": "trigger", "value": "scheduled"},
    {"key": "environment", "value": "prod"},
]

CloudWatch Metrics

Namespace suffix: RetrainingPipeline

Metric                Dimensions   Description
SQSMessagesReceived   Environment  Total messages in batch
SQSMessagesProcessed  Environment  Successfully processed
SQSMessagesFailed     Environment  Failed messages (retried)
ECSTasksLaunched      Environment  Tasks actually launched
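
A sketch of the MetricData payload these metrics would need for CloudWatch PutMetricData. The function name build_metric_data is hypothetical, and the Count unit is an assumption. The full namespace is left to the caller because only its suffix is documented here.

```python
def build_metric_data(
    environment: str, received: int, processed: int, failed: int, launched: int
) -> list[dict]:
    """Build the MetricData list for PutMetricData (hypothetical helper)."""
    counts = {
        "SQSMessagesReceived": received,
        "SQSMessagesProcessed": processed,
        "SQSMessagesFailed": failed,
        "ECSTasksLaunched": launched,
    }
    return [
        {
            "MetricName": name,
            "Dimensions": [{"Name": "Environment", "Value": environment}],
            "Value": float(value),
            "Unit": "Count",  # assumption: the unit is not documented
        }
        for name, value in counts.items()
    ]
```

The result would be passed as MetricData alongside a Namespace argument in a single put_metric_data call.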

DynamoDB State Record

Written to RETRAINING_STATE_TABLE via update_item:

{
    "model_name": "PascalStrategy",     # Partition key
    "task_arn": "arn:aws:ecs:...",
    "job_id": "uuid-123",
    "trigger_type": "scheduled",
    "started_at": "2024-01-01T12:00:00+00:00",
    "status": "running",
    "updated_at": "2024-01-01T12:00:00+00:00"
}
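
A sketch of update_item arguments that would write this record using the low-level DynamoDB client format. The function name build_state_update is hypothetical, and the exact expression the consumer uses is not documented. Because status is a DynamoDB reserved word, the sketch aliases it through ExpressionAttributeNames.

```python
from datetime import datetime, timezone
from typing import Optional


def build_state_update(
    table_name: str,
    model_name: str,
    task_arn: str,
    job_id: str,
    trigger: str,
    now: Optional[datetime] = None,
) -> dict:
    """Build keyword arguments for DynamoDB update_item (hypothetical helper)."""
    timestamp = (now or datetime.now(timezone.utc)).isoformat()
    return {
        "TableName": table_name,
        "Key": {"model_name": {"S": model_name}},  # partition key
        "UpdateExpression": (
            "SET task_arn = :arn, job_id = :job, trigger_type = :trig, "
            "started_at = :ts, #status = :status, updated_at = :ts"
        ),
        # "status" is reserved, so it must be referenced via a placeholder.
        "ExpressionAttributeNames": {"#status": "status"},
        "ExpressionAttributeValues": {
            ":arn": {"S": task_arn},
            ":job": {"S": job_id},
            ":trig": {"S": trigger},
            ":ts": {"S": timestamp},
            ":status": {"S": "running"},
        },
    }
```

The returned dict would be unpacked into the call, for example client.update_item(**kwargs) with a boto3 DynamoDB client.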

SQS Configuration

{
  "QueueName": "tradai-retraining-queue",
  "VisibilityTimeout": 600,
  "MessageRetentionPeriod": 86400,
  "RedrivePolicy": {
    "maxReceiveCount": 3,
    "deadLetterTargetArn": "arn:aws:sqs:...:retraining-dlq"
  }
}

Lambda Event Source Mapping

{
  "EventSourceArn": "arn:aws:sqs:...:retraining-queue",
  "FunctionName": "sqs-consumer",
  "BatchSize": 10,
  "MaximumBatchingWindowInSeconds": 5,
  "FunctionResponseTypes": ["ReportBatchItemFailures"]
}

Error Handling

Error Type          Behavior
Invalid JSON        Skip message (no retry)
Missing model_name  Skip message (no retry)
ECS launch failure  Add to batchItemFailures (retry)
At capacity         Add to batchItemFailures (retry)
ClientError         Add to batchItemFailures (retry)