
TradAI Final Architecture - Pulumi Infrastructure Code

Version: 8.0 | Date: 2025-11-27


Overview

This document contains production-ready Pulumi Python code for deploying the TradAI infrastructure on AWS.


Project Structure

infra/
├── __main__.py              # Main entry point
├── Pulumi.yaml              # Project configuration
├── Pulumi.dev.yaml          # Dev stack config
├── Pulumi.prod.yaml         # Production stack config
├── config.py                # Shared configuration
├── vpc/
│   ├── __init__.py
│   ├── network.py           # VPC, subnets, route tables
│   └── security.py          # Security groups, NACLs
├── compute/
│   ├── __init__.py
│   ├── ecs.py               # ECS cluster and services
│   ├── lambda_funcs.py      # Lambda functions
│   └── nat.py               # NAT instance
├── storage/
│   ├── __init__.py
│   ├── rds.py               # PostgreSQL database
│   ├── s3.py                # S3 buckets
│   └── dynamodb.py          # DynamoDB tables
├── orchestration/
│   ├── __init__.py
│   ├── step_functions.py    # State machines
│   └── sqs.py               # SQS queues
├── security/
│   ├── __init__.py
│   ├── iam.py               # IAM roles and policies
│   ├── cognito.py           # User authentication
│   └── secrets.py           # Secrets Manager
└── networking/
    ├── __init__.py
    ├── alb.py               # Application Load Balancer
    └── api_gateway.py       # API Gateway

1. Configuration

Pulumi.yaml

# Pulumi project manifest: names the project and pins the Python runtime.
name: tradai-infrastructure
runtime:
  name: python
  options:
    virtualenv: venv  # dependencies are resolved from the local ./venv
description: TradAI AWS Infrastructure

config.py

"""Shared configuration for TradAI infrastructure.

CANONICAL SOURCE: See 10-CANONICAL-CONFIG.md for authoritative values.
This file implements those values in Pulumi-compatible format.
"""

import pulumi
from typing import Optional

config = pulumi.Config()
aws_config = pulumi.Config("aws")

# Environment
# NOTE: the Pulumi stack name doubles as the environment label
# ("dev" / "staging" / "prod" — enforced by validate_environment() below).
ENVIRONMENT = pulumi.get_stack()
PROJECT_NAME = "tradai"
AWS_REGION = aws_config.get("region") or "us-east-1"

# Network Configuration (CANONICAL: Section 1)
VPC_CIDR = "10.0.0.0/16"
# NOTE: hard-coded to us-east-1 AZs; must be changed together with AWS_REGION
# if this stack is ever deployed to another region.
AVAILABILITY_ZONES = ["us-east-1a", "us-east-1b"]

# Subnet CIDRs - CANONICAL VALUES from 10-CANONICAL-CONFIG.md
# One CIDR per entry of AVAILABILITY_ZONES, for each tier.
SUBNETS = {
    "public": ["10.0.1.0/24", "10.0.2.0/24"],
    "private": ["10.0.11.0/24", "10.0.12.0/24"],
    "database": ["10.0.21.0/24", "10.0.22.0/24"],
}

# ECS Configuration
ECS_CLUSTER_NAME = f"{PROJECT_NAME}-{ENVIRONMENT}"

# Service Definitions - CANONICAL VALUES (ports from Section 2.1)
# desired_count doubles in prod for the always-on services.
SERVICES = {
    "backend-api": {
        "cpu": 512,
        "memory": 1024,
        "port": 8000,
        "desired_count": 1 if ENVIRONMENT != "prod" else 2,
        "health_check_path": "/health",
    },
    "data-collection": {
        "cpu": 256,
        "memory": 512,
        "port": 8002,  # FIXED: Was 8001, canonical is 8002
        "desired_count": 1 if ENVIRONMENT != "prod" else 2,
        "health_check_path": "/health",
    },
    "mlflow": {
        "cpu": 512,
        "memory": 1024,
        "port": 5000,
        "desired_count": 1,
        "health_check_path": "/health",
    },
    "strategy-service": {
        "cpu": 512,
        "memory": 1024,
        "port": 8003,
        "desired_count": 0,  # On-demand only
        "health_check_path": "/health",
    },
}

# Strategy Container (Fargate Spot for backtests)
STRATEGY_CONTAINER = {
    "cpu": 1024,
    "memory": 2048,
    "use_spot": True,
}

# Database Configuration - Environment-aware (CANONICAL: Section 3.3)
RDS_CONFIG = {
    "instance_class": "db.t4g.micro" if ENVIRONMENT != "prod" else "db.t4g.small",
    "allocated_storage": 20 if ENVIRONMENT != "prod" else 50,
    "engine_version": "15.4",
    "multi_az": ENVIRONMENT == "prod",
    "backup_retention": 7 if ENVIRONMENT != "prod" else 14,
}

# S3 Buckets - CANONICAL NAMES (Section 3.1)
# Bucket names embed the environment so parallel stacks never collide.
S3_BUCKETS = {
    "configs": f"{PROJECT_NAME}-configs-{ENVIRONMENT}",
    "results": f"{PROJECT_NAME}-results-{ENVIRONMENT}",
    "arcticdb": f"{PROJECT_NAME}-arcticdb-{ENVIRONMENT}",
    "logs": f"{PROJECT_NAME}-logs-{ENVIRONMENT}",
    "mlflow": f"{PROJECT_NAME}-mlflow-{ENVIRONMENT}",
}

# SQS Configuration - CANONICAL VALUES (Section 6)
# All durations below are in seconds.
SQS_CONFIG = {
    "visibility_timeout": 900,  # 15 minutes
    "message_retention": 345600,  # 4 days
    "dlq_retention": 1209600,  # 14 days
    "max_receive_count": 3,
}

# Lambda Functions - CANONICAL LIST (Section 7)
LAMBDA_FUNCTIONS = [
    "validate-strategy",
    "validate-data",
    "sqs-consumer",
    "transform-results",
    "cleanup-resources",
    "notify-completion",
    "update-nat-routes",
]

# Log Retention - Environment-aware (CANONICAL: Section 10)
# Unknown environments fall back to the dev value (7 days).
LOG_RETENTION_DAYS = {
    "dev": 7,
    "staging": 14,
    "prod": 30,
}.get(ENVIRONMENT, 7)

# Required config inputs with validation
def get_required_config(key: str) -> str:
    """Fetch a required stack config value; Pulumi raises if it is missing."""
    return config.require(key)

def get_optional_config(key: str, default: Optional[str] = None) -> Optional[str]:
    """Get an optional config value, falling back to *default* when unset.

    Args:
        key: Config key within this project's namespace.
        default: Value returned when the key is not present in the stack config.

    Returns:
        The configured value, or *default* if the key is absent.

    FIXED: the previous ``config.get(key) or default`` also replaced an
    explicitly configured empty string with the default; only a missing key
    (None) should fall back now.
    """
    value = config.get(key)
    return value if value is not None else default

# Tags - CANONICAL REQUIREMENTS (Section 9.2)
def get_tags(service: Optional[str] = None) -> dict:
    """Generate the standard resource tag set per the canonical spec.

    Args:
        service: Optional service name; when truthy it is added as a
            ``Service`` tag.

    Returns:
        A fresh tag dict (safe for callers to merge with ``|`` or mutate).

    FIXED: ``service`` was annotated ``str`` while defaulting to None;
    corrected to ``Optional[str]``.
    """
    tags = {
        "Application": PROJECT_NAME,
        "Environment": ENVIRONMENT,
        "ManagedBy": "pulumi",
        "CostCenter": "trading-platform",
        "Owner": "platform-team",
    }
    if service:
        tags["Service"] = service
    return tags

# Validation helper
def validate_environment():
    """Fail fast when the stack name is not a supported environment."""
    supported = ["dev", "staging", "prod"]
    if ENVIRONMENT in supported:
        return
    raise ValueError(f"Invalid environment: {ENVIRONMENT}. Must be one of {supported}")

validate_environment()

2. VPC and Networking

vpc/network.py

"""VPC and subnet configuration."""

import pulumi
import pulumi_aws as aws
from config import (
    VPC_CIDR,
    AVAILABILITY_ZONES,
    SUBNETS,
    PROJECT_NAME,
    ENVIRONMENT,
    get_tags,
)


class VpcNetwork:
    """Provisions the TradAI VPC and its three subnet tiers.

    Resources are created in dependency order: VPC, internet gateway,
    public/private/database subnets, then the RDS subnet group spanning
    the database tier. Private subnets get no route table here — the NAT
    module attaches its own route table to them.
    """

    def __init__(self):
        self.vpc = self._create_vpc()
        self.internet_gateway = self._create_internet_gateway()
        self.public_subnets = self._create_public_subnets()
        self.private_subnets = self._create_private_subnets()
        self.database_subnets = self._create_database_subnets()
        self.db_subnet_group = self._create_db_subnet_group()

    def _create_vpc(self) -> aws.ec2.Vpc:
        """The VPC itself, with DNS resolution and hostnames enabled."""
        return aws.ec2.Vpc(
            f"{PROJECT_NAME}-vpc",
            cidr_block=VPC_CIDR,
            enable_dns_hostnames=True,
            enable_dns_support=True,
            tags=get_tags() | {"Name": f"{PROJECT_NAME}-{ENVIRONMENT}-vpc"},
        )

    def _create_internet_gateway(self) -> aws.ec2.InternetGateway:
        """Internet gateway used by the public route table's default route."""
        return aws.ec2.InternetGateway(
            f"{PROJECT_NAME}-igw",
            vpc_id=self.vpc.id,
            tags=get_tags() | {"Name": f"{PROJECT_NAME}-{ENVIRONMENT}-igw"},
        )

    def _build_tier(self, tier: str, *, public: bool = False) -> list[aws.ec2.Subnet]:
        """Create one subnet per AZ for the given tier key in SUBNETS."""
        extra = {"map_public_ip_on_launch": True} if public else {}
        return [
            aws.ec2.Subnet(
                f"{PROJECT_NAME}-{tier}-{n}",
                vpc_id=self.vpc.id,
                cidr_block=block,
                availability_zone=zone,
                tags=get_tags() | {"Name": f"{PROJECT_NAME}-{tier}-{zone[-2:]}"},
                **extra,
            )
            for n, (zone, block) in enumerate(
                zip(AVAILABILITY_ZONES, SUBNETS[tier]), start=1
            )
        ]

    def _create_public_subnets(self) -> list[aws.ec2.Subnet]:
        """Public subnets plus an internet-facing route table and associations."""
        subnets = self._build_tier("public", public=True)

        rt = aws.ec2.RouteTable(
            f"{PROJECT_NAME}-public-rt",
            vpc_id=self.vpc.id,
            routes=[
                aws.ec2.RouteTableRouteArgs(
                    cidr_block="0.0.0.0/0",
                    gateway_id=self.internet_gateway.id,
                )
            ],
            tags=get_tags() | {"Name": f"{PROJECT_NAME}-public-rt"},
        )

        for n, subnet in enumerate(subnets, start=1):
            aws.ec2.RouteTableAssociation(
                f"{PROJECT_NAME}-public-rta-{n}",
                subnet_id=subnet.id,
                route_table_id=rt.id,
            )

        return subnets

    def _create_private_subnets(self) -> list[aws.ec2.Subnet]:
        """Private subnets for workloads; routing is attached by the NAT module."""
        return self._build_tier("private")

    def _create_database_subnets(self) -> list[aws.ec2.Subnet]:
        """Isolated subnets reserved for the database tier."""
        return self._build_tier("database")

    def _create_db_subnet_group(self) -> aws.rds.SubnetGroup:
        """RDS subnet group spanning the database subnets."""
        return aws.rds.SubnetGroup(
            f"{PROJECT_NAME}-db-subnet-group",
            subnet_ids=[subnet.id for subnet in self.database_subnets],
            tags=get_tags() | {"Name": f"{PROJECT_NAME}-db-subnet-group"},
        )

vpc/security.py

"""Security groups and NACLs.

FIXED: Added port 80 to ALB SG for HTTP redirect.
FIXED: Added MLflow port 5000 to ECS SG.
FIXED: Updated NAT SG to use canonical CIDRs.
FIXED: Updated database subnet CIDRs.
"""

import pulumi
import pulumi_aws as aws
from config import PROJECT_NAME, ENVIRONMENT, VPC_CIDR, SUBNETS, get_tags


class SecurityGroups:
    """Creates all security groups for TradAI services.

    Construction order matters: the ALB SG is created first because the
    ECS SG references it inline. Rules that would form circular
    references between SGs (Lambda <-> ECS, Lambda -> RDS) are added
    afterwards as standalone SecurityGroupRule resources in
    ``_add_cross_references``.
    """

    def __init__(self, vpc_id: pulumi.Output[str]):
        self.vpc_id = vpc_id
        # Create in dependency order
        self.alb_sg = self._create_alb_sg()
        self.ecs_sg = self._create_ecs_sg()
        self.lambda_sg = self._create_lambda_sg()
        self.rds_sg = self._create_rds_sg()
        self.nat_sg = self._create_nat_sg()
        # Add cross-references after creation
        self._add_cross_references()

    def _create_alb_sg(self) -> aws.ec2.SecurityGroup:
        """ALB Security Group - allows HTTP (for redirect) and HTTPS."""
        return aws.ec2.SecurityGroup(
            f"{PROJECT_NAME}-alb-sg",
            vpc_id=self.vpc_id,
            description="ALB Security Group - HTTP/HTTPS",
            ingress=[
                # FIXED: Added port 80 for HTTP->HTTPS redirect
                aws.ec2.SecurityGroupIngressArgs(
                    description="HTTP for redirect",
                    from_port=80,
                    to_port=80,
                    protocol="tcp",
                    cidr_blocks=["0.0.0.0/0"],
                ),
                aws.ec2.SecurityGroupIngressArgs(
                    description="HTTPS from anywhere",
                    from_port=443,
                    to_port=443,
                    protocol="tcp",
                    cidr_blocks=["0.0.0.0/0"],
                ),
            ],
            egress=[
                # Outbound limited to the VPC: the ALB only forwards to targets.
                aws.ec2.SecurityGroupEgressArgs(
                    description="All outbound to VPC",
                    from_port=0,
                    to_port=0,
                    protocol="-1",
                    cidr_blocks=[VPC_CIDR],
                ),
            ],
            tags=get_tags() | {"Name": f"{PROJECT_NAME}-alb-sg"},
        )

    def _create_ecs_sg(self) -> aws.ec2.SecurityGroup:
        """ECS Tasks Security Group - receives traffic from ALB and Lambda.

        Ingress here covers ALB-originated traffic only; Lambda-originated
        ingress is added later in ``_add_cross_references``.
        """
        return aws.ec2.SecurityGroup(
            f"{PROJECT_NAME}-ecs-sg",
            vpc_id=self.vpc_id,
            description="ECS Tasks Security Group",
            ingress=[
                # Backend API (8000)
                aws.ec2.SecurityGroupIngressArgs(
                    description="Backend API from ALB",
                    from_port=8000,
                    to_port=8000,
                    protocol="tcp",
                    security_groups=[self.alb_sg.id],
                ),
                # Data Collection (8002) - FIXED: was 8001
                aws.ec2.SecurityGroupIngressArgs(
                    description="Data Collection from ALB",
                    from_port=8002,
                    to_port=8002,
                    protocol="tcp",
                    security_groups=[self.alb_sg.id],
                ),
                # Strategy Service (8003)
                aws.ec2.SecurityGroupIngressArgs(
                    description="Strategy Service from ALB",
                    from_port=8003,
                    to_port=8003,
                    protocol="tcp",
                    security_groups=[self.alb_sg.id],
                ),
                # FIXED: Added MLflow (5000)
                aws.ec2.SecurityGroupIngressArgs(
                    description="MLflow from ALB",
                    from_port=5000,
                    to_port=5000,
                    protocol="tcp",
                    security_groups=[self.alb_sg.id],
                ),
            ],
            egress=[
                aws.ec2.SecurityGroupEgressArgs(
                    description="HTTPS to internet (AWS APIs)",
                    from_port=443,
                    to_port=443,
                    protocol="tcp",
                    cidr_blocks=["0.0.0.0/0"],
                ),
                # FIXED: Use canonical database subnet CIDRs
                aws.ec2.SecurityGroupEgressArgs(
                    description="PostgreSQL to DB subnets",
                    from_port=5432,
                    to_port=5432,
                    protocol="tcp",
                    cidr_blocks=SUBNETS["database"],  # 10.0.21.0/24, 10.0.22.0/24
                ),
            ],
            tags=get_tags() | {"Name": f"{PROJECT_NAME}-ecs-sg"},
        )

    def _create_lambda_sg(self) -> aws.ec2.SecurityGroup:
        """Lambda Functions Security Group."""
        return aws.ec2.SecurityGroup(
            f"{PROJECT_NAME}-lambda-sg",
            vpc_id=self.vpc_id,
            description="Lambda Functions Security Group",
            # No ingress - Lambda functions initiate connections
            egress=[
                aws.ec2.SecurityGroupEgressArgs(
                    description="HTTPS to internet (AWS APIs)",
                    from_port=443,
                    to_port=443,
                    protocol="tcp",
                    cidr_blocks=["0.0.0.0/0"],
                ),
                # Will add ECS service ports via cross-reference
            ],
            tags=get_tags() | {"Name": f"{PROJECT_NAME}-lambda-sg"},
        )

    def _create_rds_sg(self) -> aws.ec2.SecurityGroup:
        """RDS PostgreSQL Security Group - only accepts connections from ECS and Lambda.

        Lambda ingress is added in ``_add_cross_references``; only the ECS
        rule is defined inline here.
        """
        return aws.ec2.SecurityGroup(
            f"{PROJECT_NAME}-rds-sg",
            vpc_id=self.vpc_id,
            description="RDS PostgreSQL Security Group",
            ingress=[
                aws.ec2.SecurityGroupIngressArgs(
                    description="PostgreSQL from ECS",
                    from_port=5432,
                    to_port=5432,
                    protocol="tcp",
                    security_groups=[self.ecs_sg.id],
                ),
            ],
            egress=[],  # No outbound needed for RDS
            tags=get_tags() | {"Name": f"{PROJECT_NAME}-rds-sg"},
        )

    def _create_nat_sg(self) -> aws.ec2.SecurityGroup:
        """NAT Instance Security Group - allows traffic from private subnets."""
        return aws.ec2.SecurityGroup(
            f"{PROJECT_NAME}-nat-sg",
            vpc_id=self.vpc_id,
            description="NAT Instance Security Group",
            ingress=[
                # FIXED: Use canonical private subnet CIDRs
                aws.ec2.SecurityGroupIngressArgs(
                    description="All TCP from private subnets",
                    from_port=0,
                    to_port=65535,
                    protocol="tcp",
                    cidr_blocks=SUBNETS["private"],  # 10.0.11.0/24, 10.0.12.0/24
                ),
                aws.ec2.SecurityGroupIngressArgs(
                    description="All UDP from private subnets",
                    from_port=0,
                    to_port=65535,
                    protocol="udp",
                    cidr_blocks=SUBNETS["private"],
                ),
                aws.ec2.SecurityGroupIngressArgs(
                    description="ICMP from private subnets",
                    from_port=-1,
                    to_port=-1,
                    protocol="icmp",
                    cidr_blocks=SUBNETS["private"],
                ),
            ],
            egress=[
                aws.ec2.SecurityGroupEgressArgs(
                    description="All outbound to internet",
                    from_port=0,
                    to_port=0,
                    protocol="-1",
                    cidr_blocks=["0.0.0.0/0"],
                ),
            ],
            tags=get_tags() | {"Name": f"{PROJECT_NAME}-nat-sg"},
        )

    def _add_cross_references(self):
        """Add security group rules that require cross-references.

        NOTE(review): for ``type="egress"`` rules the AWS provider treats
        ``source_security_group_id`` as the *destination* security group —
        confirm against the provider docs if these rules are modified.
        """
        # Lambda -> ECS services
        aws.ec2.SecurityGroupRule(
            f"{PROJECT_NAME}-lambda-to-ecs-backend",
            type="egress",
            security_group_id=self.lambda_sg.id,
            source_security_group_id=self.ecs_sg.id,
            from_port=8000,
            to_port=8000,
            protocol="tcp",
            description="Lambda to Backend API",
        )

        aws.ec2.SecurityGroupRule(
            f"{PROJECT_NAME}-lambda-to-ecs-data",
            type="egress",
            security_group_id=self.lambda_sg.id,
            source_security_group_id=self.ecs_sg.id,
            from_port=8002,
            to_port=8002,
            protocol="tcp",
            description="Lambda to Data Collection",
        )

        aws.ec2.SecurityGroupRule(
            f"{PROJECT_NAME}-lambda-to-ecs-mlflow",
            type="egress",
            security_group_id=self.lambda_sg.id,
            source_security_group_id=self.ecs_sg.id,
            from_port=5000,
            to_port=5000,
            protocol="tcp",
            description="Lambda to MLflow",
        )

        # ECS ingress from Lambda
        aws.ec2.SecurityGroupRule(
            f"{PROJECT_NAME}-ecs-from-lambda",
            type="ingress",
            security_group_id=self.ecs_sg.id,
            source_security_group_id=self.lambda_sg.id,
            from_port=8000,
            to_port=8003,
            protocol="tcp",
            description="ECS services from Lambda",
        )

        aws.ec2.SecurityGroupRule(
            f"{PROJECT_NAME}-ecs-mlflow-from-lambda",
            type="ingress",
            security_group_id=self.ecs_sg.id,
            source_security_group_id=self.lambda_sg.id,
            from_port=5000,
            to_port=5000,
            protocol="tcp",
            description="MLflow from Lambda",
        )

        # RDS ingress from Lambda
        aws.ec2.SecurityGroupRule(
            f"{PROJECT_NAME}-rds-from-lambda",
            type="ingress",
            security_group_id=self.rds_sg.id,
            source_security_group_id=self.lambda_sg.id,
            from_port=5432,
            to_port=5432,
            protocol="tcp",
            description="PostgreSQL from Lambda",
        )

3. NAT Instance

compute/nat.py

"""NAT Instance for cost-optimized internet access.

FIXED: Uses ARM-compatible AMI with t4g.nano instance.
ADDED: Auto Scaling Group for HA with lifecycle hook for route updates.
"""

import base64
import json

import pulumi
import pulumi_aws as aws

from config import PROJECT_NAME, ENVIRONMENT, get_tags, SUBNETS


class NatInstance:
    """Creates a NAT instance with ASG for HA instead of NAT Gateway for cost savings."""

    def __init__(
        self,
        public_subnet_ids: list[pulumi.Output[str]],
        private_subnet_ids: list[pulumi.Output[str]],
        security_group_id: pulumi.Output[str],
        vpc_id: pulumi.Output[str],
    ):
        """Wire up all NAT-related resources in strict dependency order.

        Args:
            public_subnet_ids: Subnets the NAT ASG may launch into.
            private_subnet_ids: Subnets whose egress is routed via the NAT.
            security_group_id: SG attached to the NAT instance.
            vpc_id: VPC hosting the route table and route-update Lambda.
        """
        # EIP and IAM first: the launch template's user data references both.
        self.eip = self._create_eip()
        self.iam_role = self._create_iam_role()
        self.instance_profile = self._create_instance_profile()
        self.launch_template = self._create_launch_template(
            public_subnet_ids[0], security_group_id
        )
        self.asg = self._create_asg(public_subnet_ids)
        # Route table starts without a default route; the route-update Lambda
        # points 0.0.0.0/0 at whichever NAT instance the ASG launches.
        self.route_table = self._create_private_route_table(vpc_id)
        self._associate_subnets(private_subnet_ids)
        self.lifecycle_hook = self._create_lifecycle_hook()
        self.route_update_lambda = self._create_route_update_lambda(vpc_id)

    def _create_eip(self) -> aws.ec2.Eip:
        """Allocate the Elastic IP that the active NAT instance claims at boot."""
        label = f"{PROJECT_NAME}-nat-eip"
        return aws.ec2.Eip(
            label,
            domain="vpc",
            tags=get_tags() | {"Name": label},
        )

    def _create_iam_role(self) -> aws.iam.Role:
        """Create the IAM role the NAT instance assumes at boot.

        The user-data script associates the Elastic IP, disables the
        source/dest check on the instance itself, and route management
        actions are granted for NAT failover, so the inline policy must
        cover all of those EC2 calls.
        """
        assume_role_policy = json.dumps({
            "Version": "2012-10-17",
            "Statement": [{
                "Effect": "Allow",
                "Principal": {"Service": "ec2.amazonaws.com"},
                "Action": "sts:AssumeRole",
            }],
        })

        role = aws.iam.Role(
            f"{PROJECT_NAME}-nat-role",
            name=f"{PROJECT_NAME}-nat-role",
            assume_role_policy=assume_role_policy,
            tags=get_tags(),
        )

        # Policy to associate EIP, self-configure, and modify routes
        aws.iam.RolePolicy(
            f"{PROJECT_NAME}-nat-policy",
            role=role.id,
            policy=json.dumps({
                "Version": "2012-10-17",
                "Statement": [
                    {
                        "Effect": "Allow",
                        "Action": [
                            "ec2:AssociateAddress",
                            "ec2:DisassociateAddress",
                            "ec2:DescribeAddresses",
                            # FIXED: user data runs `aws ec2 modify-instance-attribute`
                            # to disable the source/dest check; without this action
                            # that call fails with AccessDenied and NAT never works.
                            "ec2:ModifyInstanceAttribute",
                        ],
                        "Resource": "*",
                    },
                    {
                        "Effect": "Allow",
                        "Action": [
                            "ec2:ReplaceRoute",
                            "ec2:CreateRoute",
                            "ec2:DescribeRouteTables",
                        ],
                        "Resource": "*",
                    },
                ],
            }),
        )

        return role

    def _create_instance_profile(self) -> aws.iam.InstanceProfile:
        """Wrap the NAT role in an instance profile so EC2 can assume it."""
        profile_name = f"{PROJECT_NAME}-nat-profile"
        return aws.iam.InstanceProfile(
            profile_name,
            name=profile_name,
            role=self.iam_role.name,
        )

    def _create_launch_template(
        self,
        subnet_id: pulumi.Output[str],
        sg_id: pulumi.Output[str],
    ) -> aws.ec2.LaunchTemplate:
        """Create launch template for NAT instance ASG."""

        # FIXED: Use Amazon Linux 2023 ARM AMI (compatible with t4g.nano)
        ami = aws.ec2.get_ami(
            most_recent=True,
            owners=["amazon"],
            filters=[
                aws.ec2.GetAmiFilterArgs(
                    name="name",
                    values=["al2023-ami-*-arm64"],  # ARM64 for Graviton
                ),
                aws.ec2.GetAmiFilterArgs(
                    name="architecture",
                    values=["arm64"],
                ),
                aws.ec2.GetAmiFilterArgs(
                    name="virtualization-type",
                    values=["hvm"],
                ),
            ],
        )

        # User data script to configure NAT and associate EIP
        user_data = pulumi.Output.all(self.eip.allocation_id).apply(
            lambda args: f"""#!/bin/bash
set -e

# Install required packages
yum install -y iptables-services

# Enable IP forwarding
echo "net.ipv4.ip_forward = 1" >> /etc/sysctl.conf
sysctl -p

# Configure iptables for NAT
iptables -t nat -A POSTROUTING -o ens5 -j MASQUERADE
iptables -A FORWARD -i ens5 -o ens5 -m state --state RELATED,ESTABLISHED -j ACCEPT
iptables -A FORWARD -i ens5 -o ens5 -j ACCEPT

# Save iptables rules
service iptables save
systemctl enable iptables

# Get instance ID and region
INSTANCE_ID=$(curl -s http://169.254.169.254/latest/meta-data/instance-id)
REGION=$(curl -s http://169.254.169.254/latest/meta-data/placement/region)

# Associate Elastic IP
aws ec2 associate-address --instance-id $INSTANCE_ID --allocation-id {args[0]} --region $REGION

# Disable source/dest check (required for NAT)
aws ec2 modify-instance-attribute --instance-id $INSTANCE_ID --no-source-dest-check --region $REGION

# Signal success to CloudFormation/ASG
/opt/aws/bin/cfn-signal -e $? --region $REGION || true
"""
        )

        return aws.ec2.LaunchTemplate(
            f"{PROJECT_NAME}-nat-lt",
            name=f"{PROJECT_NAME}-nat-lt",
            image_id=ami.id,
            instance_type="t4g.nano",  # ARM instance - matches ARM AMI
            vpc_security_group_ids=[sg_id],
            iam_instance_profile=aws.ec2.LaunchTemplateIamInstanceProfileArgs(
                arn=self.instance_profile.arn,
            ),
            user_data=user_data.apply(lambda ud: __import__('base64').b64encode(ud.encode()).decode()),
            metadata_options=aws.ec2.LaunchTemplateMetadataOptionsArgs(
                http_endpoint="enabled",
                http_tokens="required",  # IMDSv2 required
            ),
            monitoring=aws.ec2.LaunchTemplateMonitoringArgs(
                enabled=True,
            ),
            tag_specifications=[
                aws.ec2.LaunchTemplateTagSpecificationArgs(
                    resource_type="instance",
                    tags=get_tags() | {"Name": f"{PROJECT_NAME}-nat-instance"},
                ),
            ],
            tags=get_tags(),
        )

    def _create_asg(
        self,
        subnet_ids: list[pulumi.Output[str]],
    ) -> aws.autoscaling.Group:
        """Single-instance ASG: automatically replaces the NAT box if it dies."""
        propagated = [
            ("Name", f"{PROJECT_NAME}-nat-instance"),
            ("Application", PROJECT_NAME),
        ]
        return aws.autoscaling.Group(
            f"{PROJECT_NAME}-nat-asg",
            name=f"{PROJECT_NAME}-nat-asg",
            min_size=1,
            max_size=1,
            desired_capacity=1,
            vpc_zone_identifiers=subnet_ids,
            launch_template=aws.autoscaling.GroupLaunchTemplateArgs(
                id=self.launch_template.id,
                version="$Latest",
            ),
            health_check_type="EC2",
            health_check_grace_period=300,
            termination_policies=["OldestInstance"],
            # Instance tags must be propagated explicitly for ASG-launched EC2.
            tags=[
                aws.autoscaling.GroupTagArgs(
                    key=key,
                    value=value,
                    propagate_at_launch=True,
                )
                for key, value in propagated
            ],
        )

    def _create_private_route_table(
        self,
        vpc_id: pulumi.Output[str],
    ) -> aws.ec2.RouteTable:
        """Private route table; intentionally created with no routes.

        The 0.0.0.0/0 route is injected by the route-update Lambda each
        time a fresh NAT instance launches.
        """
        label = f"{PROJECT_NAME}-private-rt"
        return aws.ec2.RouteTable(
            label,
            vpc_id=vpc_id,
            tags=get_tags() | {"Name": label},
        )

    def _associate_subnets(
        self,
        subnet_ids: list[pulumi.Output[str]],
    ):
        """Point every private subnet at the NAT route table."""
        for n, sid in enumerate(subnet_ids, start=1):
            aws.ec2.RouteTableAssociation(
                f"{PROJECT_NAME}-private-rta-{n}",
                subnet_id=sid,
                route_table_id=self.route_table.id,
            )

    def _create_lifecycle_hook(self) -> aws.autoscaling.LifecycleHook:
        """Launch-time lifecycle hook on the NAT ASG.

        Gives the route-update Lambda a 300-second window to repoint the
        private default route at the new instance; CONTINUE means the
        launch proceeds even if the hook times out.
        """
        return aws.autoscaling.LifecycleHook(
            f"{PROJECT_NAME}-nat-lifecycle-hook",
            name=f"{PROJECT_NAME}-nat-launching",
            autoscaling_group_name=self.asg.name,
            lifecycle_transition="autoscaling:EC2_INSTANCE_LAUNCHING",
            default_result="CONTINUE",
            heartbeat_timeout=300,
        )

    def _create_route_update_lambda(
        self,
        vpc_id: pulumi.Output[str],
    ) -> aws.lambda_.Function:
        """Create Lambda function to update routes when NAT instance changes."""

        # IAM role for Lambda
        assume_role = json.dumps({
            "Version": "2012-10-17",
            "Statement": [{
                "Effect": "Allow",
                "Principal": {"Service": "lambda.amazonaws.com"},
                "Action": "sts:AssumeRole",
            }],
        })

        role = aws.iam.Role(
            f"{PROJECT_NAME}-nat-route-lambda-role",
            assume_role_policy=assume_role,
            tags=get_tags(),
        )

        aws.iam.RolePolicyAttachment(
            f"{PROJECT_NAME}-nat-route-lambda-basic",
            role=role.name,
            policy_arn="arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole",
        )

        aws.iam.RolePolicy(
            f"{PROJECT_NAME}-nat-route-lambda-policy",
            role=role.id,
            policy=pulumi.Output.all(self.route_table.id).apply(lambda args: json.dumps({
                "Version": "2012-10-17",
                "Statement": [
                    {
                        "Effect": "Allow",
                        "Action": [
                            "ec2:CreateRoute",
                            "ec2:ReplaceRoute",
                            "ec2:DeleteRoute",
                            "ec2:DescribeRouteTables",
                            "ec2:DescribeInstances",
                        ],
                        "Resource": "*",
                    },
                    {
                        "Effect": "Allow",
                        "Action": [
                            "autoscaling:CompleteLifecycleAction",
                        ],
                        "Resource": "*",
                    },
                ],
            })),
        )

        # Lambda function code
        lambda_code = """
import boto3
import json
import os

def handler(event, context):
    print(f"Received event: {json.dumps(event)}")

    ec2 = boto3.client('ec2')
    asg = boto3.client('autoscaling')

    route_table_id = os.environ['ROUTE_TABLE_ID']

    # Get the instance ID from the event
    detail = event.get('detail', {})
    instance_id = detail.get('EC2InstanceId')
    asg_name = detail.get('AutoScalingGroupName')
    lifecycle_hook = detail.get('LifecycleHookName')
    lifecycle_token = detail.get('LifecycleActionToken')

    if not instance_id:
        print("No instance ID found in event")
        return

    try:
        # Wait for instance to be running
        waiter = ec2.get_waiter('instance_running')
        waiter.wait(InstanceIds=[instance_id], WaiterConfig={'Delay': 5, 'MaxAttempts': 60})

        # Update route table to point to new NAT instance
        try:
            ec2.replace_route(
                RouteTableId=route_table_id,
                DestinationCidrBlock='0.0.0.0/0',
                InstanceId=instance_id,
            )
            print(f"Replaced route to {instance_id}")
        except ec2.exceptions.ClientError as e:
            if 'InvalidParameterValue' in str(e):
                # Route doesn't exist, create it
                ec2.create_route(
                    RouteTableId=route_table_id,
                    DestinationCidrBlock='0.0.0.0/0',
                    InstanceId=instance_id,
                )
                print(f"Created route to {instance_id}")
            else:
                raise

        # Complete lifecycle action
        if lifecycle_hook and lifecycle_token:
            asg.complete_lifecycle_action(
                LifecycleHookName=lifecycle_hook,
                AutoScalingGroupName=asg_name,
                LifecycleActionToken=lifecycle_token,
                LifecycleActionResult='CONTINUE',
            )
            print("Completed lifecycle action")

        return {'statusCode': 200, 'body': 'Route updated successfully'}

    except Exception as e:
        print(f"Error: {str(e)}")
        raise
"""

        func = aws.lambda_.Function(
            f"{PROJECT_NAME}-update-nat-routes",
            function_name=f"{PROJECT_NAME}-update-nat-routes",
            runtime="python3.11",
            handler="index.handler",
            role=role.arn,
            timeout=120,
            memory_size=128,
            environment=aws.lambda_.FunctionEnvironmentArgs(
                variables={
                    "ROUTE_TABLE_ID": self.route_table.id,
                },
            ),
            code=pulumi.AssetArchive({
                "index.py": pulumi.StringAsset(lambda_code),
            }),
            tags=get_tags("lambda"),
        )

        # EventBridge rule to trigger Lambda on ASG lifecycle events
        rule = aws.cloudwatch.EventRule(
            f"{PROJECT_NAME}-nat-lifecycle-rule",
            name=f"{PROJECT_NAME}-nat-lifecycle",
            event_pattern=json.dumps({
                "source": ["aws.autoscaling"],
                "detail-type": ["EC2 Instance-launch Lifecycle Action"],
                "detail": {
                    "AutoScalingGroupName": [f"{PROJECT_NAME}-nat-asg"],
                },
            }),
            tags=get_tags(),
        )

        aws.cloudwatch.EventTarget(
            f"{PROJECT_NAME}-nat-lifecycle-target",
            rule=rule.name,
            arn=func.arn,
        )

        aws.lambda_.Permission(
            f"{PROJECT_NAME}-nat-lambda-permission",
            action="lambda:InvokeFunction",
            function=func.name,
            principal="events.amazonaws.com",
            source_arn=rule.arn,
        )

        return func

4. ECS Cluster and Services

compute/ecs.py

"""ECS Fargate cluster and service definitions."""

import json
import pulumi
import pulumi_aws as aws
from config import (
    PROJECT_NAME,
    ENVIRONMENT,
    ECS_CLUSTER_NAME,
    SERVICES,
    get_tags,
)


class EcsCluster:
    """Creates the ECS Fargate cluster and one service per entry in SERVICES.

    Resolves the deployment account id and region at program time so that
    image URIs, secret ARNs, and log configuration contain real values.
    (The previous version embedded literal "${AWS_ACCOUNT_ID}" /
    "${AWS_REGION}" placeholder strings in the container definitions, which
    ECS does not substitute.)
    """

    def __init__(
        self,
        private_subnet_ids: list[pulumi.Output[str]],
        security_group_id: pulumi.Output[str],
        execution_role_arn: pulumi.Output[str],
        task_role_arn: pulumi.Output[str],
        target_groups: dict,
    ):
        # Resolve once; reused by every service's container definition.
        self._account_id = aws.get_caller_identity().account_id
        self._region = aws.get_region().name

        self.cluster = self._create_cluster()
        self.log_group = self._create_log_group()
        self.services = {}

        for service_name, svc_cfg in SERVICES.items():
            self.services[service_name] = self._create_service(
                service_name,
                svc_cfg,
                private_subnet_ids,
                security_group_id,
                execution_role_arn,
                task_role_arn,
                target_groups.get(service_name),
            )

    def _create_cluster(self) -> aws.ecs.Cluster:
        """Create the Fargate cluster with Container Insights enabled."""
        return aws.ecs.Cluster(
            f"{PROJECT_NAME}-cluster",
            name=ECS_CLUSTER_NAME,
            settings=[
                aws.ecs.ClusterSettingArgs(
                    name="containerInsights",
                    value="enabled",
                )
            ],
            tags=get_tags(),
        )

    def _create_log_group(self) -> aws.cloudwatch.LogGroup:
        """Create the shared /ecs/<project> log group."""
        return aws.cloudwatch.LogGroup(
            f"{PROJECT_NAME}-logs",
            name=f"/ecs/{PROJECT_NAME}",
            retention_in_days=7,  # Cost optimization
            tags=get_tags(),
        )

    def _create_service(
        self,
        service_name: str,
        svc_cfg: dict,
        subnet_ids: list[pulumi.Output[str]],
        sg_id: pulumi.Output[str],
        execution_role_arn: pulumi.Output[str],
        task_role_arn: pulumi.Output[str],
        target_group: aws.lb.TargetGroup | None,
    ) -> dict:
        """Create one Fargate task definition + service.

        Args:
            service_name: Key from SERVICES; also the container/service name.
            svc_cfg: Per-service settings (cpu, memory, port, desired_count,
                health_check_path) — assumes the SERVICES schema; confirm in
                config.py.
            target_group: Optional ALB target group; when present the
                service is registered behind the load balancer.

        Returns:
            ``{"task_definition": ..., "service": ...}``.
        """
        # Container definition with fully-resolved account/region values.
        container_def = json.dumps([{
            "name": service_name,
            "image": f"{self._account_id}.dkr.ecr.{self._region}.amazonaws.com/{PROJECT_NAME}-{service_name}:latest",
            "cpu": svc_cfg["cpu"],
            "memory": svc_cfg["memory"],
            "essential": True,
            "portMappings": [{
                "containerPort": svc_cfg["port"],
                "protocol": "tcp",
            }],
            "environment": [
                {"name": "ENVIRONMENT", "value": ENVIRONMENT},
                {"name": "SERVICE_NAME", "value": service_name},
            ],
            "secrets": [
                {
                    "name": "DATABASE_URL",
                    # Partial (name-based) secret ARN; ECS resolves it.
                    "valueFrom": f"arn:aws:secretsmanager:{self._region}:{self._account_id}:secret:{PROJECT_NAME}/database-url",
                },
            ],
            "logConfiguration": {
                "logDriver": "awslogs",
                "options": {
                    "awslogs-group": f"/ecs/{PROJECT_NAME}",
                    "awslogs-region": self._region,
                    "awslogs-stream-prefix": service_name,
                },
            },
            "healthCheck": {
                "command": ["CMD-SHELL", f"curl -f http://localhost:{svc_cfg['port']}{svc_cfg['health_check_path']} || exit 1"],
                "interval": 30,
                "timeout": 5,
                "retries": 3,
                "startPeriod": 60,
            },
        }])

        task_def = aws.ecs.TaskDefinition(
            f"{PROJECT_NAME}-{service_name}-task",
            family=f"{PROJECT_NAME}-{service_name}",
            cpu=str(svc_cfg["cpu"]),
            memory=str(svc_cfg["memory"]),
            network_mode="awsvpc",
            requires_compatibilities=["FARGATE"],
            execution_role_arn=execution_role_arn,
            task_role_arn=task_role_arn,
            container_definitions=container_def,
            tags=get_tags(service_name),
        )

        # Only attach a load balancer when a target group was provided.
        load_balancers = []
        if target_group:
            load_balancers = [
                aws.ecs.ServiceLoadBalancerArgs(
                    target_group_arn=target_group.arn,
                    container_name=service_name,
                    container_port=svc_cfg["port"],
                )
            ]

        service = aws.ecs.Service(
            f"{PROJECT_NAME}-{service_name}-service",
            name=service_name,
            cluster=self.cluster.arn,
            task_definition=task_def.arn,
            desired_count=svc_cfg["desired_count"],
            launch_type="FARGATE",
            network_configuration=aws.ecs.ServiceNetworkConfigurationArgs(
                subnets=subnet_ids,
                security_groups=[sg_id],
                assign_public_ip=False,
            ),
            load_balancers=load_balancers,
            # Grace period is only valid when the service sits behind an LB.
            health_check_grace_period_seconds=60 if load_balancers else None,
            tags=get_tags(service_name),
        )

        return {"task_definition": task_def, "service": service}

5. Lambda Functions

compute/lambda_funcs.py

"""Lambda function definitions."""

import json
import pulumi
import pulumi_aws as aws
from config import PROJECT_NAME, ENVIRONMENT, get_tags


class LambdaFunctions:
    """Creates TradAI's VPC-attached Lambda functions.

    All three functions run inside the private subnets with the provided
    security group and execution role.
    """

    def __init__(
        self,
        subnet_ids: list[pulumi.Output[str]],
        security_group_id: pulumi.Output[str],
        role_arn: pulumi.Output[str],
        sqs_queue_arn: pulumi.Output[str],
    ):
        self.sqs_consumer = self._create_sqs_consumer(
            subnet_ids, security_group_id, role_arn, sqs_queue_arn
        )
        self.data_collection_proxy = self._create_data_collection_proxy(
            subnet_ids, security_group_id, role_arn
        )
        self.backtest_trigger = self._create_backtest_trigger(
            subnet_ids, security_group_id, role_arn
        )

    def _create_sqs_consumer(
        self,
        subnet_ids: list[pulumi.Output[str]],
        sg_id: pulumi.Output[str],
        role_arn: pulumi.Output[str],
        sqs_queue_arn: pulumi.Output[str],
    ) -> aws.lambda_.Function:
        """SQS-triggered consumer that forwards messages to the backend API."""
        func = aws.lambda_.Function(
            f"{PROJECT_NAME}-sqs-consumer",
            function_name=f"{PROJECT_NAME}-sqs-consumer",
            runtime="python3.11",
            handler="handler.lambda_handler",
            role=role_arn,
            timeout=30,
            memory_size=256,
            vpc_config=aws.lambda_.FunctionVpcConfigArgs(
                subnet_ids=subnet_ids,
                security_group_ids=[sg_id],
            ),
            environment=aws.lambda_.FunctionEnvironmentArgs(
                variables={
                    "ENVIRONMENT": ENVIRONMENT,
                    "BACKEND_API_URL": "http://backend-api.tradai.local:8000",
                },
            ),
            code=pulumi.AssetArchive({
                ".": pulumi.FileArchive("./lambda/sqs-consumer"),
            }),
            tags=get_tags("sqs-consumer"),
        )

        # Wire the queue to the function; batch_size=1 keeps per-message
        # error isolation (one bad message does not fail a batch).
        aws.lambda_.EventSourceMapping(
            f"{PROJECT_NAME}-sqs-trigger",
            event_source_arn=sqs_queue_arn,
            function_name=func.name,
            batch_size=1,
        )

        return func

    def _create_data_collection_proxy(
        self,
        subnet_ids: list[pulumi.Output[str]],
        sg_id: pulumi.Output[str],
        role_arn: pulumi.Output[str],
    ) -> aws.lambda_.Function:
        """Proxy function forwarding requests to the data-collection service."""
        return aws.lambda_.Function(
            f"{PROJECT_NAME}-data-collection-proxy",
            function_name=f"{PROJECT_NAME}-data-collection-proxy",
            runtime="python3.11",
            handler="handler.lambda_handler",
            role=role_arn,
            timeout=60,
            memory_size=256,
            vpc_config=aws.lambda_.FunctionVpcConfigArgs(
                subnet_ids=subnet_ids,
                security_group_ids=[sg_id],
            ),
            environment=aws.lambda_.FunctionEnvironmentArgs(
                variables={
                    "ENVIRONMENT": ENVIRONMENT,
                    "DATA_COLLECTION_URL": "http://data-collection.tradai.local:8001",
                },
            ),
            code=pulumi.AssetArchive({
                ".": pulumi.FileArchive("./lambda/data-collection-proxy"),
            }),
            tags=get_tags("data-collection-proxy"),
        )

    def _create_backtest_trigger(
        self,
        subnet_ids: list[pulumi.Output[str]],
        sg_id: pulumi.Output[str],
        role_arn: pulumi.Output[str],
    ) -> aws.lambda_.Function:
        """Trigger function that starts the backtest state machine."""
        # Build a concrete state-machine ARN. The previous version emitted
        # literal "${AWS_REGION}"/"${AWS_ACCOUNT_ID}" placeholders, which
        # nothing substitutes at deploy time.
        region = aws.get_region().name
        account_id = aws.get_caller_identity().account_id
        state_machine_arn = (
            f"arn:aws:states:{region}:{account_id}:"
            f"stateMachine:{PROJECT_NAME}-backtest-workflow"
        )

        return aws.lambda_.Function(
            f"{PROJECT_NAME}-backtest-trigger",
            function_name=f"{PROJECT_NAME}-backtest-trigger",
            runtime="python3.11",
            handler="handler.lambda_handler",
            role=role_arn,
            timeout=30,
            memory_size=128,
            vpc_config=aws.lambda_.FunctionVpcConfigArgs(
                subnet_ids=subnet_ids,
                security_group_ids=[sg_id],
            ),
            environment=aws.lambda_.FunctionEnvironmentArgs(
                variables={
                    "ENVIRONMENT": ENVIRONMENT,
                    "STEP_FUNCTION_ARN": state_machine_arn,
                },
            ),
            code=pulumi.AssetArchive({
                ".": pulumi.FileArchive("./lambda/backtest-trigger"),
            }),
            tags=get_tags("backtest-trigger"),
        )

6. Step Functions

orchestration/step_functions.py

"""Step Functions state machine definitions."""

import json
import pulumi
import pulumi_aws as aws
from config import PROJECT_NAME, ENVIRONMENT, get_tags


class StepFunctions:
    """Creates Step Functions state machines for TradAI workflows."""

    def __init__(
        self,
        role_arn: pulumi.Output[str],
        ecs_cluster_arn: pulumi.Output[str],
        task_definition_arn: pulumi.Output[str],
        subnet_ids: list[pulumi.Output[str]],
        security_group_id: pulumi.Output[str],
    ):
        self.backtest_workflow = self._create_backtest_workflow(
            role_arn,
            ecs_cluster_arn,
            task_definition_arn,
            subnet_ids,
            security_group_id,
        )

    def _create_backtest_workflow(
        self,
        role_arn: pulumi.Output[str],
        cluster_arn: pulumi.Output[str],
        task_def_arn: pulumi.Output[str],
        subnet_ids: list[pulumi.Output[str]],
        sg_id: pulumi.Output[str],
    ) -> aws.sfn.StateMachine:
        """Create the backtest workflow state machine.

        Flow: record the backtest in DynamoDB, validate strategy and data in
        parallel, mark RUNNING, execute the strategy container on Fargate
        Spot, then mark COMPLETED/FAILED accordingly.
        """
        # Resolve account/region for the log-group ARN. The previous version
        # emitted literal "${AWS_REGION}"/"${AWS_ACCOUNT_ID}" placeholders.
        region = aws.get_region().name
        account_id = aws.get_caller_identity().account_id

        # NOTE: the dict below contains pulumi.Output values (cluster_arn,
        # task_def_arn, subnet_ids, sg_id), so it must be serialized with
        # pulumi.Output.json_dumps — a plain json.dumps would serialize the
        # Output objects themselves, not their resolved values.
        definition = {
            "Comment": "TradAI Backtest Workflow v2.0",
            "StartAt": "InitializeBacktest",
            "States": {
                "InitializeBacktest": {
                    "Type": "Task",
                    "Resource": "arn:aws:states:::dynamodb:putItem",
                    "Parameters": {
                        "TableName": f"{PROJECT_NAME}-backtests",
                        "Item": {
                            "backtest_id": {"S.$": "$.backtest_id"},
                            "status": {"S": "INITIALIZING"},
                            "created_at": {"S.$": "$$.State.EnteredTime"},
                            "strategy_name": {"S.$": "$.strategy_name"},
                            "config": {"S.$": "States.JsonToString($.config)"},
                        },
                    },
                    "ResultPath": "$.dynamodb_result",
                    "Next": "ValidateInputs",
                },
                "ValidateInputs": {
                    "Type": "Parallel",
                    "Branches": [
                        {
                            "StartAt": "ValidateStrategy",
                            "States": {
                                "ValidateStrategy": {
                                    "Type": "Task",
                                    "Resource": "arn:aws:states:::lambda:invoke",
                                    "Parameters": {
                                        "FunctionName": f"{PROJECT_NAME}-validate-strategy",
                                        "Payload.$": "$",
                                    },
                                    "ResultPath": "$.strategy_validation",
                                    "End": True,
                                },
                            },
                        },
                        {
                            "StartAt": "ValidateData",
                            "States": {
                                "ValidateData": {
                                    "Type": "Task",
                                    "Resource": "arn:aws:states:::lambda:invoke",
                                    "Parameters": {
                                        "FunctionName": f"{PROJECT_NAME}-validate-data",
                                        "Payload.$": "$",
                                    },
                                    "ResultPath": "$.data_validation",
                                    "End": True,
                                },
                            },
                        },
                    ],
                    "ResultPath": "$.validation_results",
                    "Next": "UpdateStatusToRunning",
                    "Catch": [
                        {
                            "ErrorEquals": ["States.ALL"],
                            "ResultPath": "$.error",
                            "Next": "HandleValidationError",
                        }
                    ],
                },
                "HandleValidationError": {
                    "Type": "Task",
                    "Resource": "arn:aws:states:::dynamodb:updateItem",
                    "Parameters": {
                        "TableName": f"{PROJECT_NAME}-backtests",
                        "Key": {"backtest_id": {"S.$": "$.backtest_id"}},
                        "UpdateExpression": "SET #status = :status, error_message = :error",
                        "ExpressionAttributeNames": {"#status": "status"},
                        "ExpressionAttributeValues": {
                            ":status": {"S": "VALIDATION_FAILED"},
                            ":error": {"S.$": "States.JsonToString($.error)"},
                        },
                    },
                    "Next": "BacktestFailed",
                },
                "UpdateStatusToRunning": {
                    "Type": "Task",
                    "Resource": "arn:aws:states:::dynamodb:updateItem",
                    "Parameters": {
                        "TableName": f"{PROJECT_NAME}-backtests",
                        "Key": {"backtest_id": {"S.$": "$.backtest_id"}},
                        "UpdateExpression": "SET #status = :status",
                        "ExpressionAttributeNames": {"#status": "status"},
                        "ExpressionAttributeValues": {
                            ":status": {"S": "RUNNING"},
                        },
                    },
                    "ResultPath": None,
                    "Next": "RunBacktest",
                },
                "RunBacktest": {
                    "Type": "Task",
                    # .sync: wait for the ECS task to finish before moving on.
                    "Resource": "arn:aws:states:::ecs:runTask.sync",
                    "Parameters": {
                        "Cluster": cluster_arn,
                        "TaskDefinition": task_def_arn,
                        # NOTE: LaunchType is intentionally omitted — the ECS
                        # RunTask API rejects requests that specify both
                        # LaunchType and CapacityProviderStrategy.
                        "NetworkConfiguration": {
                            "AwsvpcConfiguration": {
                                "Subnets": subnet_ids,
                                "SecurityGroups": [sg_id],
                                "AssignPublicIp": "DISABLED",
                            }
                        },
                        "Overrides": {
                            "ContainerOverrides": [
                                {
                                    "Name": "strategy-container",
                                    "Environment": [
                                        {"Name": "BACKTEST_ID", "Value.$": "$.backtest_id"},
                                        {"Name": "STRATEGY_NAME", "Value.$": "$.strategy_name"},
                                        {"Name": "CONFIG", "Value.$": "States.JsonToString($.config)"},
                                    ],
                                }
                            ]
                        },
                        # Run on Fargate Spot for cost savings.
                        "CapacityProviderStrategy": [
                            {
                                "CapacityProvider": "FARGATE_SPOT",
                                "Weight": 1,
                            }
                        ],
                    },
                    "ResultPath": "$.ecs_result",
                    "Next": "UpdateStatusToCompleted",
                    "Retry": [
                        {
                            "ErrorEquals": ["ECS.AmazonECSException"],
                            "IntervalSeconds": 30,
                            "MaxAttempts": 3,
                            "BackoffRate": 2,
                        }
                    ],
                    "Catch": [
                        {
                            "ErrorEquals": ["States.ALL"],
                            "ResultPath": "$.error",
                            "Next": "HandleBacktestError",
                        }
                    ],
                },
                "HandleBacktestError": {
                    "Type": "Task",
                    "Resource": "arn:aws:states:::dynamodb:updateItem",
                    "Parameters": {
                        "TableName": f"{PROJECT_NAME}-backtests",
                        "Key": {"backtest_id": {"S.$": "$.backtest_id"}},
                        "UpdateExpression": "SET #status = :status, error_message = :error, completed_at = :completed",
                        "ExpressionAttributeNames": {"#status": "status"},
                        "ExpressionAttributeValues": {
                            ":status": {"S": "FAILED"},
                            ":error": {"S.$": "States.JsonToString($.error)"},
                            ":completed": {"S.$": "$$.State.EnteredTime"},
                        },
                    },
                    "Next": "BacktestFailed",
                },
                "UpdateStatusToCompleted": {
                    "Type": "Task",
                    "Resource": "arn:aws:states:::dynamodb:updateItem",
                    "Parameters": {
                        "TableName": f"{PROJECT_NAME}-backtests",
                        "Key": {"backtest_id": {"S.$": "$.backtest_id"}},
                        "UpdateExpression": "SET #status = :status, completed_at = :completed",
                        "ExpressionAttributeNames": {"#status": "status"},
                        "ExpressionAttributeValues": {
                            ":status": {"S": "COMPLETED"},
                            ":completed": {"S.$": "$$.State.EnteredTime"},
                        },
                    },
                    "ResultPath": None,
                    "Next": "BacktestSucceeded",
                },
                "BacktestSucceeded": {
                    "Type": "Succeed",
                },
                "BacktestFailed": {
                    "Type": "Fail",
                    "Error": "BacktestFailed",
                    "Cause": "Backtest execution failed",
                },
            },
        }

        return aws.sfn.StateMachine(
            f"{PROJECT_NAME}-backtest-workflow",
            name=f"{PROJECT_NAME}-backtest-workflow",
            role_arn=role_arn,
            type="STANDARD",
            # Output.json_dumps resolves nested Outputs before serializing.
            definition=pulumi.Output.json_dumps(definition),
            logging_configuration=aws.sfn.StateMachineLoggingConfigurationArgs(
                level="ERROR",
                include_execution_data=True,
                log_destination=f"arn:aws:logs:{region}:{account_id}:log-group:/aws/states/{PROJECT_NAME}:*",
            ),
            tags=get_tags("step-functions"),
        )

7. Storage

storage/rds.py

"""RDS PostgreSQL database."""

import pulumi
import pulumi_aws as aws
from config import PROJECT_NAME, ENVIRONMENT, RDS_CONFIG, get_tags


class RdsDatabase:
    """Creates the RDS PostgreSQL instance and its parameter group."""

    def __init__(
        self,
        subnet_group_name: pulumi.Output[str],
        security_group_id: pulumi.Output[str],
        secret_arn: pulumi.Output[str],
    ):
        # NOTE(review): secret_arn is accepted but never used — the instance
        # uses manage_master_user_password, which provisions its own Secrets
        # Manager secret. Kept for call-site compatibility; confirm whether
        # the caller still needs to pass it.
        self.parameter_group = self._create_parameter_group()
        self.instance = self._create_instance(
            subnet_group_name,
            security_group_id,
            secret_arn,
        )

    def _create_parameter_group(self) -> aws.rds.ParameterGroup:
        """Create a parameter group whose family tracks RDS_CONFIG."""
        # Derive the family from the configured engine version so the two
        # cannot drift apart (e.g. "15.4" -> "postgres15"); previously the
        # family was hard-coded to postgres15.
        major_version = str(RDS_CONFIG["engine_version"]).split(".")[0]
        return aws.rds.ParameterGroup(
            f"{PROJECT_NAME}-pg-params",
            family=f"postgres{major_version}",
            description="TradAI PostgreSQL parameters",
            parameters=[
                aws.rds.ParameterGroupParameterArgs(
                    name="log_statement",
                    value="ddl",  # log schema changes only
                ),
                aws.rds.ParameterGroupParameterArgs(
                    name="log_min_duration_statement",
                    value="1000",  # Log queries > 1 second
                ),
            ],
            tags=get_tags(),
        )

    def _create_instance(
        self,
        subnet_group_name: pulumi.Output[str],
        sg_id: pulumi.Output[str],
        secret_arn: pulumi.Output[str],
    ) -> aws.rds.Instance:
        """Create the encrypted, backed-up PostgreSQL instance.

        Production gets deletion protection and a final snapshot; other
        environments skip both for faster teardown.
        """
        return aws.rds.Instance(
            f"{PROJECT_NAME}-postgres",
            identifier=f"{PROJECT_NAME}-{ENVIRONMENT}",
            engine="postgres",
            engine_version=RDS_CONFIG["engine_version"],
            instance_class=RDS_CONFIG["instance_class"],
            allocated_storage=RDS_CONFIG["allocated_storage"],
            storage_type="gp3",
            storage_encrypted=True,
            db_subnet_group_name=subnet_group_name,
            vpc_security_group_ids=[sg_id],
            parameter_group_name=self.parameter_group.name,
            db_name="tradai",
            username="tradai_admin",
            # RDS generates and rotates the master password in Secrets Manager.
            manage_master_user_password=True,
            master_user_secret_kms_key_id=None,  # Use default
            multi_az=RDS_CONFIG["multi_az"],
            backup_retention_period=7,
            backup_window="03:00-04:00",
            maintenance_window="Mon:04:00-Mon:05:00",
            deletion_protection=ENVIRONMENT == "prod",
            skip_final_snapshot=ENVIRONMENT != "prod",
            final_snapshot_identifier=f"{PROJECT_NAME}-final-snapshot" if ENVIRONMENT == "prod" else None,
            enabled_cloudwatch_logs_exports=["postgresql"],
            performance_insights_enabled=True,
            performance_insights_retention_period=7,
            tags=get_tags("rds"),
        )

storage/s3.py

"""S3 buckets for TradAI."""

import pulumi
import pulumi_aws as aws
from config import PROJECT_NAME, ENVIRONMENT, get_tags


class S3Buckets:
    """Creates TradAI S3 buckets with uniform hardening.

    Every bucket gets default AES256 server-side encryption and a full
    public-access block. (Previously only the data bucket had a
    public-access block, and the mlflow bucket was entirely unencrypted
    and publicly blockable — an inconsistency fixed here.)

    Existing Pulumi logical resource names are preserved so already-deployed
    resources are not replaced.
    """

    def __init__(self):
        self.data_bucket = self._create_data_bucket()
        self.results_bucket = self._create_results_bucket()
        self.mlflow_bucket = self._create_mlflow_bucket()

    def _enable_versioning(self, prefix: str, bucket: aws.s3.BucketV2) -> None:
        """Enable object versioning on *bucket* (resource name <prefix>-versioning)."""
        aws.s3.BucketVersioningV2(
            f"{prefix}-versioning",
            bucket=bucket.id,
            versioning_configuration=aws.s3.BucketVersioningV2VersioningConfigurationArgs(
                status="Enabled",
            ),
        )

    def _enable_encryption(self, prefix: str, bucket: aws.s3.BucketV2) -> None:
        """Apply default SSE-S3 (AES256) encryption to *bucket*."""
        aws.s3.BucketServerSideEncryptionConfigurationV2(
            f"{prefix}-encryption",
            bucket=bucket.id,
            rules=[
                aws.s3.BucketServerSideEncryptionConfigurationV2RuleArgs(
                    apply_server_side_encryption_by_default=aws.s3.BucketServerSideEncryptionConfigurationV2RuleApplyServerSideEncryptionByDefaultArgs(
                        sse_algorithm="AES256",
                    ),
                )
            ],
        )

    def _block_public_access(self, prefix: str, bucket: aws.s3.BucketV2) -> None:
        """Deny every form of public access to *bucket*."""
        aws.s3.BucketPublicAccessBlock(
            f"{prefix}-public-block",
            bucket=bucket.id,
            block_public_acls=True,
            block_public_policy=True,
            ignore_public_acls=True,
            restrict_public_buckets=True,
        )

    def _create_data_bucket(self) -> aws.s3.BucketV2:
        """Raw-data bucket: versioned, encrypted, private."""
        bucket = aws.s3.BucketV2(
            f"{PROJECT_NAME}-data",
            bucket=f"{PROJECT_NAME}-data-{ENVIRONMENT}",
            tags=get_tags("data"),
        )
        prefix = f"{PROJECT_NAME}-data"
        self._enable_versioning(prefix, bucket)
        self._enable_encryption(prefix, bucket)
        self._block_public_access(prefix, bucket)
        return bucket

    def _create_results_bucket(self) -> aws.s3.BucketV2:
        """Backtest-results bucket: lifecycle-managed, encrypted, private."""
        bucket = aws.s3.BucketV2(
            f"{PROJECT_NAME}-results",
            bucket=f"{PROJECT_NAME}-results-{ENVIRONMENT}",
            tags=get_tags("results"),
        )

        # temp/ objects expire after a week; results/ move to Glacier
        # Instant Retrieval after 30 days for cost control.
        aws.s3.BucketLifecycleConfigurationV2(
            f"{PROJECT_NAME}-results-lifecycle",
            bucket=bucket.id,
            rules=[
                aws.s3.BucketLifecycleConfigurationV2RuleArgs(
                    id="expire-temp-files",
                    status="Enabled",
                    filter=aws.s3.BucketLifecycleConfigurationV2RuleFilterArgs(
                        prefix="temp/",
                    ),
                    expiration=aws.s3.BucketLifecycleConfigurationV2RuleExpirationArgs(
                        days=7,
                    ),
                ),
                aws.s3.BucketLifecycleConfigurationV2RuleArgs(
                    id="archive-old-results",
                    status="Enabled",
                    filter=aws.s3.BucketLifecycleConfigurationV2RuleFilterArgs(
                        prefix="results/",
                    ),
                    transitions=[
                        aws.s3.BucketLifecycleConfigurationV2RuleTransitionArgs(
                            days=30,
                            storage_class="GLACIER_IR",
                        ),
                    ],
                ),
            ],
        )

        prefix = f"{PROJECT_NAME}-results"
        self._enable_encryption(prefix, bucket)
        self._block_public_access(prefix, bucket)  # was missing
        return bucket

    def _create_mlflow_bucket(self) -> aws.s3.BucketV2:
        """MLflow artifact bucket: versioned, encrypted, private."""
        bucket = aws.s3.BucketV2(
            f"{PROJECT_NAME}-mlflow",
            bucket=f"{PROJECT_NAME}-mlflow-{ENVIRONMENT}",
            tags=get_tags("mlflow"),
        )
        prefix = f"{PROJECT_NAME}-mlflow"
        # Versioning supports experiment-artifact history.
        self._enable_versioning(prefix, bucket)
        self._enable_encryption(prefix, bucket)   # was missing
        self._block_public_access(prefix, bucket)  # was missing
        return bucket

storage/dynamodb.py

"""DynamoDB tables for TradAI."""

import pulumi
import pulumi_aws as aws
from config import PROJECT_NAME, ENVIRONMENT, get_tags


class DynamoDBTables:
    """Creates DynamoDB tables for backtest tracking."""

    def __init__(self):
        self.backtests_table = self._create_backtests_table()

    def _create_backtests_table(self) -> aws.dynamodb.Table:
        """Create the backtests table keyed on ``backtest_id``.

        A ``status``/``created_at`` GSI supports listing jobs by state in
        chronological order, and TTL on the ``ttl`` attribute lets old
        items expire automatically.
        """
        # Only key attributes (table hash key + GSI keys) need declaring.
        key_attributes = [
            aws.dynamodb.TableAttributeArgs(name=attr_name, type="S")
            for attr_name in ("backtest_id", "status", "created_at")
        ]

        # NOTE(review): the physical table name carries no ENVIRONMENT
        # suffix, so dev and prod stacks in one account/region would
        # collide — confirm this is intended.
        return aws.dynamodb.Table(
            f"{PROJECT_NAME}-backtests",
            name=f"{PROJECT_NAME}-backtests",
            billing_mode="PAY_PER_REQUEST",  # on-demand; no capacity planning
            hash_key="backtest_id",
            attributes=key_attributes,
            global_secondary_indexes=[
                aws.dynamodb.TableGlobalSecondaryIndexArgs(
                    name="status-created_at-index",
                    hash_key="status",
                    range_key="created_at",
                    projection_type="ALL",
                ),
            ],
            point_in_time_recovery=aws.dynamodb.TablePointInTimeRecoveryArgs(
                enabled=True,
            ),
            server_side_encryption=aws.dynamodb.TableServerSideEncryptionArgs(
                enabled=True,
            ),
            ttl=aws.dynamodb.TableTtlArgs(attribute_name="ttl", enabled=True),
            tags=get_tags("dynamodb"),
        )

8. SQS Queues

orchestration/sqs.py

"""SQS queues for async processing."""

import json

import pulumi
import pulumi_aws as aws

from config import PROJECT_NAME, ENVIRONMENT, get_tags


class SqsQueues:
    """Creates the FIFO backtest queue and its dead-letter queue."""

    def __init__(self):
        # The DLQ must exist first so the main queue's redrive policy can
        # reference its ARN.
        self.dlq = self._create_dlq()
        self.backtest_queue = self._create_backtest_queue()

    def _create_dlq(self) -> aws.sqs.Queue:
        """Create the dead-letter queue (a FIFO queue's DLQ must also be FIFO)."""
        return aws.sqs.Queue(
            f"{PROJECT_NAME}-backtest-dlq",
            name=f"{PROJECT_NAME}-backtest-dlq.fifo",
            fifo_queue=True,
            content_based_deduplication=True,
            message_retention_seconds=1209600,  # 14 days
            tags=get_tags("sqs-dlq"),
        )

    def _create_backtest_queue(self) -> aws.sqs.Queue:
        """Create the main backtest queue.

        Messages that fail processing 3 times are moved to the DLQ.
        """
        return aws.sqs.Queue(
            f"{PROJECT_NAME}-backtest-queue",
            name=f"{PROJECT_NAME}-backtest-queue.fifo",
            fifo_queue=True,
            content_based_deduplication=True,
            visibility_timeout_seconds=300,  # 5 minutes
            message_retention_seconds=86400,  # 1 day
            # Build the redrive policy with json.dumps rather than a
            # hand-written f-string so the document is guaranteed valid JSON.
            redrive_policy=self.dlq.arn.apply(
                lambda arn: json.dumps(
                    {"deadLetterTargetArn": arn, "maxReceiveCount": 3}
                )
            ),
            tags=get_tags("sqs"),
        )

9. IAM Roles and Policies

security/iam.py

"""IAM roles and policies."""

import json
import pulumi
import pulumi_aws as aws
from config import PROJECT_NAME, ENVIRONMENT, get_tags


class IamRoles:
    """Creates IAM roles for TradAI services.

    One role per runtime principal:
      * ECS execution role — image pull, logs, secrets injection
      * ECS task role      — runtime S3 + DynamoDB access
      * Lambda role        — VPC access, SQS consume, Step Functions start
      * Step Functions role — run ECS tasks, invoke Lambdas, logging, events

    All four trust policies are identical except for the service principal,
    so they are generated by a single helper.
    """

    def __init__(self):
        self.ecs_execution_role = self._create_ecs_execution_role()
        self.ecs_task_role = self._create_ecs_task_role()
        self.lambda_role = self._create_lambda_role()
        self.step_functions_role = self._create_step_functions_role()

    @staticmethod
    def _assume_role_policy(service: str) -> str:
        """Return a JSON trust policy allowing *service* to assume the role."""
        return json.dumps({
            "Version": "2012-10-17",
            "Statement": [{
                "Effect": "Allow",
                "Principal": {"Service": service},
                "Action": "sts:AssumeRole",
            }],
        })

    def _create_ecs_execution_role(self) -> aws.iam.Role:
        """Role ECS uses to launch tasks: pull images, write logs, read secrets."""
        role = aws.iam.Role(
            f"{PROJECT_NAME}-ecs-execution-role",
            name=f"{PROJECT_NAME}-ecs-execution-role",
            assume_role_policy=self._assume_role_policy("ecs-tasks.amazonaws.com"),
            tags=get_tags(),
        )

        # AWS-managed baseline: ECR pull + CloudWatch Logs.
        aws.iam.RolePolicyAttachment(
            f"{PROJECT_NAME}-ecs-execution-policy",
            role=role.name,
            policy_arn="arn:aws:iam::aws:policy/service-role/AmazonECSTaskExecutionRolePolicy",
        )

        # Allow injecting container secrets from this project's namespace.
        aws.iam.RolePolicy(
            f"{PROJECT_NAME}-ecs-secrets-policy",
            role=role.id,
            policy=json.dumps({
                "Version": "2012-10-17",
                "Statement": [{
                    "Effect": "Allow",
                    "Action": [
                        "secretsmanager:GetSecretValue",
                    ],
                    "Resource": f"arn:aws:secretsmanager:*:*:secret:{PROJECT_NAME}/*",
                }],
            }),
        )

        return role

    def _create_ecs_task_role(self) -> aws.iam.Role:
        """Role the application code inside ECS tasks runs as."""
        role = aws.iam.Role(
            f"{PROJECT_NAME}-ecs-task-role",
            name=f"{PROJECT_NAME}-ecs-task-role",
            assume_role_policy=self._assume_role_policy("ecs-tasks.amazonaws.com"),
            tags=get_tags(),
        )

        # Runtime access to project S3 buckets and DynamoDB tables only.
        aws.iam.RolePolicy(
            f"{PROJECT_NAME}-ecs-s3-policy",
            role=role.id,
            policy=json.dumps({
                "Version": "2012-10-17",
                "Statement": [
                    {
                        "Effect": "Allow",
                        "Action": [
                            "s3:GetObject",
                            "s3:PutObject",
                            "s3:ListBucket",
                        ],
                        "Resource": [
                            f"arn:aws:s3:::{PROJECT_NAME}-*",
                            f"arn:aws:s3:::{PROJECT_NAME}-*/*",
                        ],
                    },
                    {
                        "Effect": "Allow",
                        "Action": [
                            "dynamodb:GetItem",
                            "dynamodb:PutItem",
                            "dynamodb:UpdateItem",
                            "dynamodb:Query",
                        ],
                        "Resource": f"arn:aws:dynamodb:*:*:table/{PROJECT_NAME}-*",
                    },
                ],
            }),
        )

        return role

    def _create_lambda_role(self) -> aws.iam.Role:
        """Role for Lambda functions: VPC networking plus project resources."""
        role = aws.iam.Role(
            f"{PROJECT_NAME}-lambda-role",
            name=f"{PROJECT_NAME}-lambda-role",
            assume_role_policy=self._assume_role_policy("lambda.amazonaws.com"),
            tags=get_tags(),
        )

        # ENI management for VPC-attached Lambdas + CloudWatch Logs.
        aws.iam.RolePolicyAttachment(
            f"{PROJECT_NAME}-lambda-vpc-policy",
            role=role.name,
            policy_arn="arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole",
        )

        # Consume project queues, start project state machines, touch tables.
        aws.iam.RolePolicy(
            f"{PROJECT_NAME}-lambda-custom-policy",
            role=role.id,
            policy=json.dumps({
                "Version": "2012-10-17",
                "Statement": [
                    {
                        "Effect": "Allow",
                        "Action": [
                            "sqs:ReceiveMessage",
                            "sqs:DeleteMessage",
                            "sqs:GetQueueAttributes",
                        ],
                        "Resource": f"arn:aws:sqs:*:*:{PROJECT_NAME}-*",
                    },
                    {
                        "Effect": "Allow",
                        "Action": [
                            "states:StartExecution",
                        ],
                        "Resource": f"arn:aws:states:*:*:stateMachine:{PROJECT_NAME}-*",
                    },
                    {
                        "Effect": "Allow",
                        "Action": [
                            "dynamodb:GetItem",
                            "dynamodb:PutItem",
                            "dynamodb:UpdateItem",
                        ],
                        "Resource": f"arn:aws:dynamodb:*:*:table/{PROJECT_NAME}-*",
                    },
                ],
            }),
        )

        return role

    def _create_step_functions_role(self) -> aws.iam.Role:
        """Role Step Functions assumes to orchestrate ECS, Lambda, and logging."""
        role = aws.iam.Role(
            f"{PROJECT_NAME}-sfn-role",
            name=f"{PROJECT_NAME}-sfn-role",
            assume_role_policy=self._assume_role_policy("states.amazonaws.com"),
            tags=get_tags(),
        )

        aws.iam.RolePolicy(
            f"{PROJECT_NAME}-sfn-policy",
            role=role.id,
            policy=json.dumps({
                "Version": "2012-10-17",
                "Statement": [
                    {
                        # ecs:RunTask does not support resource-level ARNs for
                        # all cases, so scope via the cluster condition instead.
                        "Effect": "Allow",
                        "Action": [
                            "ecs:RunTask",
                            "ecs:StopTask",
                            "ecs:DescribeTasks",
                        ],
                        "Resource": "*",
                        "Condition": {
                            "ArnEquals": {
                                "ecs:cluster": f"arn:aws:ecs:*:*:cluster/{PROJECT_NAME}-*",
                            }
                        },
                    },
                    {
                        # Needed to hand the ECS task/execution roles to tasks.
                        "Effect": "Allow",
                        "Action": "iam:PassRole",
                        "Resource": [
                            f"arn:aws:iam::*:role/{PROJECT_NAME}-ecs-*",
                        ],
                    },
                    {
                        "Effect": "Allow",
                        "Action": [
                            "lambda:InvokeFunction",
                        ],
                        "Resource": f"arn:aws:lambda:*:*:function:{PROJECT_NAME}-*",
                    },
                    {
                        "Effect": "Allow",
                        "Action": [
                            "dynamodb:GetItem",
                            "dynamodb:PutItem",
                            "dynamodb:UpdateItem",
                        ],
                        "Resource": f"arn:aws:dynamodb:*:*:table/{PROJECT_NAME}-*",
                    },
                    {
                        # CloudWatch Logs delivery for state machine logging;
                        # these actions do not support resource scoping.
                        "Effect": "Allow",
                        "Action": [
                            "logs:CreateLogDelivery",
                            "logs:GetLogDelivery",
                            "logs:UpdateLogDelivery",
                            "logs:DeleteLogDelivery",
                            "logs:ListLogDeliveries",
                            "logs:PutResourcePolicy",
                            "logs:DescribeResourcePolicies",
                            "logs:DescribeLogGroups",
                        ],
                        "Resource": "*",
                    },
                    {
                        # EventBridge rules Step Functions creates for .sync tasks.
                        "Effect": "Allow",
                        "Action": [
                            "events:PutTargets",
                            "events:PutRule",
                            "events:DescribeRule",
                        ],
                        "Resource": "arn:aws:events:*:*:rule/StepFunctions*",
                    },
                ],
            }),
        )

        return role

10. ALB and API Gateway

networking/alb.py

"""Application Load Balancer configuration."""

import pulumi
import pulumi_aws as aws
from config import PROJECT_NAME, ENVIRONMENT, SERVICES, get_tags


class ApplicationLoadBalancer:
    """Creates an internet-facing ALB with one target group per ECS service.

    HTTPS (443) terminates TLS and routes by path prefix; HTTP (80) only
    issues a permanent redirect to HTTPS.
    """

    def __init__(
        self,
        vpc_id: pulumi.Output[str],
        public_subnet_ids: list[pulumi.Output[str]],
        security_group_id: pulumi.Output[str],
        certificate_arn: str,
    ):
        # Creation order matters: listeners reference the ALB and the
        # target groups built before them.
        self.alb = self._create_alb(public_subnet_ids, security_group_id)
        self.target_groups = self._create_target_groups(vpc_id)
        self.https_listener = self._create_https_listener(certificate_arn)
        self.http_listener = self._create_http_redirect_listener()

    def _create_alb(
        self,
        subnet_ids: list[pulumi.Output[str]],
        sg_id: pulumi.Output[str],
    ) -> aws.lb.LoadBalancer:
        """Create the internet-facing application load balancer."""
        return aws.lb.LoadBalancer(
            f"{PROJECT_NAME}-alb",
            name=f"{PROJECT_NAME}-{ENVIRONMENT}",
            internal=False,
            load_balancer_type="application",
            security_groups=[sg_id],
            subnets=subnet_ids,
            # Guard against accidental deletion in production only.
            enable_deletion_protection=ENVIRONMENT == "prod",
            tags=get_tags("alb"),
        )

    def _create_target_groups(
        self,
        vpc_id: pulumi.Output[str],
    ) -> dict[str, aws.lb.TargetGroup]:
        """Create an IP-target group per service, keyed by service name."""
        return {
            service_name: aws.lb.TargetGroup(
                f"{PROJECT_NAME}-{service_name}-tg",
                # ALB target group names are capped at 32 characters.
                name=f"{PROJECT_NAME}-{service_name}"[:32],
                port=service_config["port"],
                protocol="HTTP",
                target_type="ip",  # Fargate tasks register by ENI IP
                vpc_id=vpc_id,
                health_check=aws.lb.TargetGroupHealthCheckArgs(
                    enabled=True,
                    path=service_config["health_check_path"],
                    port="traffic-port",
                    protocol="HTTP",
                    healthy_threshold=2,
                    unhealthy_threshold=3,
                    timeout=5,
                    interval=30,
                    matcher="200",
                ),
                tags=get_tags(service_name),
            )
            for service_name, service_config in SERVICES.items()
        }

    def _create_https_listener(
        self,
        certificate_arn: str,
    ) -> aws.lb.Listener:
        """Create the TLS listener plus one path-routing rule per service."""
        https_listener = aws.lb.Listener(
            f"{PROJECT_NAME}-https-listener",
            load_balancer_arn=self.alb.arn,
            port=443,
            protocol="HTTPS",
            ssl_policy="ELBSecurityPolicy-TLS13-1-2-2021-06",
            certificate_arn=certificate_arn,
            # Anything that matches no rule gets a plain 404.
            default_action=aws.lb.ListenerDefaultActionArgs(
                type="fixed-response",
                fixed_response=aws.lb.ListenerDefaultActionFixedResponseArgs(
                    content_type="text/plain",
                    message_body="Not Found",
                    status_code="404",
                ),
            ),
            tags=get_tags(),
        )

        # /api/<service>/* forwards to that service's target group.
        # Priorities start at 100 and step by 10 to leave insertion room.
        for index, (service_name, target_group) in enumerate(
            self.target_groups.items()
        ):
            aws.lb.ListenerRule(
                f"{PROJECT_NAME}-{service_name}-rule",
                listener_arn=https_listener.arn,
                priority=100 + 10 * index,
                actions=[
                    aws.lb.ListenerRuleActionArgs(
                        type="forward",
                        target_group_arn=target_group.arn,
                    )
                ],
                conditions=[
                    aws.lb.ListenerRuleConditionArgs(
                        path_pattern=aws.lb.ListenerRuleConditionPathPatternArgs(
                            values=[f"/api/{service_name}/*"],
                        ),
                    )
                ],
                tags=get_tags(service_name),
            )

        return https_listener

    def _create_http_redirect_listener(self) -> aws.lb.Listener:
        """Create the port-80 listener that 301-redirects everything to HTTPS."""
        return aws.lb.Listener(
            f"{PROJECT_NAME}-http-listener",
            load_balancer_arn=self.alb.arn,
            port=80,
            protocol="HTTP",
            default_action=aws.lb.ListenerDefaultActionArgs(
                type="redirect",
                redirect=aws.lb.ListenerDefaultActionRedirectArgs(
                    port="443",
                    protocol="HTTPS",
                    status_code="HTTP_301",
                ),
            ),
            tags=get_tags(),
        )

11. Cognito Authentication

security/cognito.py

"""Cognito User Pool for authentication.

NEW: This module was missing from the original implementation.
Implements authentication per CANONICAL Section 4.2.
"""

import pulumi
import pulumi_aws as aws
from config import PROJECT_NAME, ENVIRONMENT, get_tags


class CognitoAuth:
    """Creates Cognito User Pool and App Client for JWT authentication."""

    def __init__(self):
        self.user_pool = self._create_user_pool()
        self.user_pool_client = self._create_user_pool_client()
        self.user_pool_domain = self._create_user_pool_domain()

    def _create_user_pool(self) -> aws.cognito.UserPool:
        """Create Cognito User Pool with MFA required.

        Users sign in with their email address (case-insensitive); TOTP MFA
        is mandatory and advanced security is enforced.
        """
        return aws.cognito.UserPool(
            f"{PROJECT_NAME}-user-pool",
            name=f"{PROJECT_NAME}-users",
            # Password policy (CANONICAL Section 4.2)
            password_policy=aws.cognito.UserPoolPasswordPolicyArgs(
                minimum_length=12,
                require_lowercase=True,
                require_uppercase=True,
                require_numbers=True,
                require_symbols=True,
                temporary_password_validity_days=7,
            ),
            # MFA is mandatory for every user (software TOTP only).
            mfa_configuration="ON",
            software_token_mfa_configuration=aws.cognito.UserPoolSoftwareTokenMfaConfigurationArgs(
                enabled=True,
            ),
            # Account recovery via verified email only.
            account_recovery_setting=aws.cognito.UserPoolAccountRecoverySettingArgs(
                recovery_mechanisms=[
                    aws.cognito.UserPoolAccountRecoverySettingRecoveryMechanismArgs(
                        name="verified_email",
                        priority=1,
                    ),
                ],
            ),
            # Auto-verified attributes
            auto_verified_attributes=["email"],
            # Email doubles as the username, matched case-insensitively.
            username_attributes=["email"],
            username_configuration=aws.cognito.UserPoolUsernameConfigurationArgs(
                case_sensitive=False,
            ),
            # Required, user-editable profile attributes.
            schemas=[
                aws.cognito.UserPoolSchemaArgs(
                    name="email",
                    attribute_data_type="String",
                    required=True,
                    mutable=True,
                ),
                aws.cognito.UserPoolSchemaArgs(
                    name="name",
                    attribute_data_type="String",
                    required=True,
                    mutable=True,
                ),
            ],
            # Risk-based adaptive authentication.
            user_pool_add_ons=aws.cognito.UserPoolUserPoolAddOnsArgs(
                advanced_security_mode="ENFORCED",
            ),
            # Cognito's built-in sender (subject to its daily email quota).
            email_configuration=aws.cognito.UserPoolEmailConfigurationArgs(
                email_sending_account="COGNITO_DEFAULT",
            ),
            # Deletion protection in production only.
            deletion_protection="ACTIVE" if ENVIRONMENT == "prod" else "INACTIVE",
            tags=get_tags("cognito"),
        )

    def _create_user_pool_client(self) -> aws.cognito.UserPoolClient:
        """Create the App Client used by the API for OAuth code-flow auth."""
        return aws.cognito.UserPoolClient(
            f"{PROJECT_NAME}-api-client",
            name=f"{PROJECT_NAME}-api-client",
            user_pool_id=self.user_pool.id,
            # Authorization-code grant only; no implicit flow.
            allowed_oauth_flows=["code"],
            allowed_oauth_flows_user_pool_client=True,
            allowed_oauth_scopes=["email", "openid", "profile"],
            callback_urls=[
                "https://api.tradai.smartml.me/callback",
                "http://localhost:3000/callback",  # Dev
            ],
            logout_urls=[
                "https://api.tradai.smartml.me/logout",
                "http://localhost:3000/logout",
            ],
            supported_identity_providers=["COGNITO"],
            # Short-lived access/ID tokens; 30-day refresh.
            access_token_validity=1,  # 1 hour
            id_token_validity=1,  # 1 hour
            refresh_token_validity=30,  # 30 days
            token_validity_units=aws.cognito.UserPoolClientTokenValidityUnitsArgs(
                access_token="hours",
                id_token="hours",
                refresh_token="days",
            ),
            # Avoid leaking whether an account exists; allow token revocation.
            prevent_user_existence_errors="ENABLED",
            enable_token_revocation=True,
            # SRP + refresh only — no password-in-plaintext auth flows.
            explicit_auth_flows=[
                "ALLOW_REFRESH_TOKEN_AUTH",
                "ALLOW_USER_SRP_AUTH",
            ],
        )

    def _create_user_pool_domain(self) -> aws.cognito.UserPoolDomain:
        """Create the Cognito-hosted UI domain prefix for this stack."""
        return aws.cognito.UserPoolDomain(
            f"{PROJECT_NAME}-domain",
            domain=f"{PROJECT_NAME}-{ENVIRONMENT}",
            user_pool_id=self.user_pool.id,
        )

12. API Gateway

networking/api_gateway.py

"""AWS API Gateway HTTP API with JWT authorization.

NEW: This module was missing from the original implementation.
Implements API Gateway per CANONICAL Section 5.
"""

import json
import pulumi
import pulumi_aws as aws
from config import PROJECT_NAME, ENVIRONMENT, get_tags, SQS_CONFIG


class ApiGateway:
    """Creates HTTP API Gateway with Cognito JWT authorization.

    Routes are either proxied to the internal ALB through a VPC Link or
    (for backtest submission) sent straight to SQS via a first-class
    AWS_PROXY integration.
    """

    def __init__(
        self,
        cognito_user_pool_id: pulumi.Output[str],
        cognito_client_id: pulumi.Output[str],
        alb_listener_arn: pulumi.Output[str],
        alb_dns_name: pulumi.Output[str],
        sqs_queue_arn: pulumi.Output[str],
        vpc_id: pulumi.Output[str],
        private_subnet_ids: list[pulumi.Output[str]],
        security_group_id: pulumi.Output[str],
    ):
        self.api = self._create_api()
        self.vpc_link = self._create_vpc_link(private_subnet_ids, security_group_id)
        self.authorizer = self._create_authorizer(cognito_user_pool_id, cognito_client_id)
        self.sqs_role = self._create_sqs_integration_role(sqs_queue_arn)

        # Create integrations
        self.alb_integration = self._create_alb_integration(alb_dns_name)
        self.sqs_integration = self._create_sqs_integration(sqs_queue_arn)

        # Create routes
        self._create_routes()

        # Create stage
        self.stage = self._create_stage()

        # Custom domain (None when no certificate is configured)
        self.custom_domain = self._create_custom_domain()

    def _create_api(self) -> aws.apigatewayv2.Api:
        """Create the HTTP API with environment-appropriate CORS."""
        return aws.apigatewayv2.Api(
            f"{PROJECT_NAME}-api",
            name=f"{PROJECT_NAME}-api-{ENVIRONMENT}",
            protocol_type="HTTP",
            cors_configuration=aws.apigatewayv2.ApiCorsConfigurationArgs(
                # Wildcard origins in dev only; prod pins the app domains.
                allow_origins=["*"] if ENVIRONMENT == "dev" else [
                    "https://tradai.smartml.me",
                    "https://app.tradai.smartml.me",
                ],
                allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"],
                allow_headers=["Content-Type", "Authorization", "X-Request-ID"],
                max_age=86400,
            ),
            tags=get_tags("api-gateway"),
        )

    def _create_vpc_link(
        self,
        subnet_ids: list[pulumi.Output[str]],
        sg_id: pulumi.Output[str],
    ) -> aws.apigatewayv2.VpcLink:
        """Create VPC Link for private ALB integration."""
        return aws.apigatewayv2.VpcLink(
            f"{PROJECT_NAME}-vpc-link",
            name=f"{PROJECT_NAME}-vpc-link",
            subnet_ids=subnet_ids,
            security_group_ids=[sg_id],
            tags=get_tags("vpc-link"),
        )

    def _create_authorizer(
        self,
        user_pool_id: pulumi.Output[str],
        client_id: pulumi.Output[str],
    ) -> aws.apigatewayv2.Authorizer:
        """Create JWT authorizer validating Cognito-issued tokens."""
        return aws.apigatewayv2.Authorizer(
            f"{PROJECT_NAME}-jwt-authorizer",
            api_id=self.api.id,
            authorizer_type="JWT",
            name=f"{PROJECT_NAME}-cognito-authorizer",
            identity_sources=["$request.header.Authorization"],
            jwt_configuration=aws.apigatewayv2.AuthorizerJwtConfigurationArgs(
                audiences=[client_id],
                # NOTE(review): region is hardcoded; breaks if the stack is
                # deployed outside us-east-1 — consider using AWS_REGION.
                issuer=user_pool_id.apply(
                    lambda id: f"https://cognito-idp.us-east-1.amazonaws.com/{id}"
                ),
            ),
        )

    def _create_sqs_integration_role(
        self,
        queue_arn: pulumi.Output[str],
    ) -> aws.iam.Role:
        """Create IAM role letting API Gateway send to the backtest queue."""
        assume_role = json.dumps({
            "Version": "2012-10-17",
            "Statement": [{
                "Effect": "Allow",
                "Principal": {"Service": "apigateway.amazonaws.com"},
                "Action": "sts:AssumeRole",
            }],
        })

        role = aws.iam.Role(
            f"{PROJECT_NAME}-apigw-sqs-role",
            name=f"{PROJECT_NAME}-apigw-sqs-role",
            assume_role_policy=assume_role,
            tags=get_tags(),
        )

        # Send-only, scoped to the exact queue ARN.
        aws.iam.RolePolicy(
            f"{PROJECT_NAME}-apigw-sqs-policy",
            role=role.id,
            policy=queue_arn.apply(lambda arn: json.dumps({
                "Version": "2012-10-17",
                "Statement": [{
                    "Effect": "Allow",
                    "Action": ["sqs:SendMessage"],
                    "Resource": arn,
                }],
            })),
        )

        return role

    def _create_alb_integration(
        self,
        alb_dns: pulumi.Output[str],
    ) -> aws.apigatewayv2.Integration:
        """Create integration to ALB via VPC Link."""
        return aws.apigatewayv2.Integration(
            f"{PROJECT_NAME}-alb-integration",
            api_id=self.api.id,
            integration_type="HTTP_PROXY",
            integration_method="ANY",
            integration_uri=alb_dns.apply(lambda dns: f"http://{dns}"),
            connection_type="VPC_LINK",
            connection_id=self.vpc_link.id,
        )

    @staticmethod
    def _queue_url_from_arn(queue_arn: str) -> str:
        """Convert an SQS queue ARN into its HTTPS queue URL.

        ``arn:aws:sqs:<region>:<account>:<name>`` becomes
        ``https://sqs.<region>.amazonaws.com/<account>/<name>``.
        """
        _arn, _partition, _service, region, account, name = queue_arn.split(":", 5)
        return f"https://sqs.{region}.amazonaws.com/{account}/{name}"

    def _create_sqs_integration(
        self,
        queue_arn: pulumi.Output[str],
    ) -> aws.apigatewayv2.Integration:
        """Create direct integration to SQS for backtest requests."""
        return aws.apigatewayv2.Integration(
            f"{PROJECT_NAME}-sqs-integration",
            api_id=self.api.id,
            integration_type="AWS_PROXY",
            integration_subtype="SQS-SendMessage",
            credentials_arn=self.sqs_role.arn,
            request_parameters={
                # BUG FIX: the previous chained str.replace() substituted
                # every ':' in the ARN, producing an invalid QueueUrl.
                # Derive the URL from the ARN components instead.
                "QueueUrl": queue_arn.apply(self._queue_url_from_arn),
                "MessageBody": "$request.body",
                # NOTE(review): FIFO sends fail when this header is absent —
                # confirm X-Request-ID is enforced on the client side.
                "MessageGroupId": "$request.header.X-Request-ID",
            },
        )

    def _create_routes(self):
        """Create all API routes per CANONICAL Section 5.1."""
        # (method, path, integration id, JWT auth required)
        routes = [
            # Health check (no auth)
            ("GET", "/api/v1/health", self.alb_integration.id, False),

            # Strategies (auth required)
            ("GET", "/api/v1/strategies", self.alb_integration.id, True),
            ("GET", "/api/v1/strategies/{id}", self.alb_integration.id, True),
            ("POST", "/api/v1/strategies", self.alb_integration.id, True),
            ("POST", "/api/v1/strategies/{name}/stage", self.alb_integration.id, True),
            ("POST", "/api/v1/strategies/{name}/promote", self.alb_integration.id, True),

            # Backtests (auth required; submission goes straight to SQS)
            ("POST", "/api/v1/backtests", self.sqs_integration.id, True),
            ("GET", "/api/v1/backtests", self.alb_integration.id, True),
            ("GET", "/api/v1/backtests/{job_id}", self.alb_integration.id, True),
            ("POST", "/api/v1/backtests/{job_id}/cancel", self.alb_integration.id, True),
            ("GET", "/api/v1/backtests/{job_id}/equity", self.alb_integration.id, True),
            ("GET", "/api/v1/backtests/{job_id}/report-data", self.alb_integration.id, True),

            # Data (auth required)
            ("GET", "/api/v1/data/symbols", self.alb_integration.id, True),
            ("GET", "/api/v1/data/freshness", self.alb_integration.id, True),
            ("POST", "/api/v1/data/sync", self.alb_integration.id, True),

            # MLflow proxy (auth required)
            ("ANY", "/mlflow/{proxy+}", self.alb_integration.id, True),
        ]

        for i, (method, path, integration_id, auth_required) in enumerate(routes):
            route_key = f"{method} {path}"
            aws.apigatewayv2.Route(
                f"{PROJECT_NAME}-route-{i}",
                api_id=self.api.id,
                route_key=route_key,
                target=pulumi.Output.concat("integrations/", integration_id),
                authorization_type="JWT" if auth_required else "NONE",
                authorizer_id=self.authorizer.id if auth_required else None,
            )

    def _create_stage(self) -> aws.apigatewayv2.Stage:
        """Create the auto-deployed $default stage with throttling and logs."""
        return aws.apigatewayv2.Stage(
            f"{PROJECT_NAME}-stage",
            api_id=self.api.id,
            name="$default",
            auto_deploy=True,
            default_route_settings=aws.apigatewayv2.StageDefaultRouteSettingsArgs(
                # CANONICAL Section 5.2
                throttling_burst_limit=200,
                throttling_rate_limit=100,
            ),
            access_log_settings=aws.apigatewayv2.StageAccessLogSettingsArgs(
                # NOTE(review): a wildcard account id is not a valid log-group
                # ARN, and the region is hardcoded — build this from the
                # current account/region (e.g. aws.get_caller_identity()).
                destination_arn=f"arn:aws:logs:us-east-1:*:log-group:/aws/api-gateway/{PROJECT_NAME}",
                format=json.dumps({
                    "requestId": "$context.requestId",
                    "ip": "$context.identity.sourceIp",
                    "requestTime": "$context.requestTime",
                    "httpMethod": "$context.httpMethod",
                    "routeKey": "$context.routeKey",
                    "status": "$context.status",
                    "responseLength": "$context.responseLength",
                    "integrationError": "$context.integrationErrorMessage",
                }),
            ),
            tags=get_tags("api-gateway"),
        )

    def _create_custom_domain(self) -> aws.apigatewayv2.DomainName:
        """Create the custom API domain, or return None when no
        ``api_certificate_arn`` is configured for this stack."""
        config = pulumi.Config()
        certificate_arn = config.get("api_certificate_arn")

        if not certificate_arn:
            return None

        domain = aws.apigatewayv2.DomainName(
            f"{PROJECT_NAME}-domain",
            domain_name="api.tradai.smartml.me",
            domain_name_configuration=aws.apigatewayv2.DomainNameDomainNameConfigurationArgs(
                certificate_arn=certificate_arn,
                endpoint_type="REGIONAL",
                security_policy="TLS_1_2",
            ),
            tags=get_tags("api-gateway"),
        )

        # Map the domain root onto the $default stage.
        aws.apigatewayv2.ApiMapping(
            f"{PROJECT_NAME}-api-mapping",
            api_id=self.api.id,
            domain_name=domain.id,
            stage=self.stage.id,
        )

        return domain

13. WAF Web ACL

security/waf.py

"""WAF Web ACL for API protection.

NEW: This module was missing from the original implementation.
Implements WAF per CANONICAL Section 4.3.
"""

import pulumi
import pulumi_aws as aws
from config import PROJECT_NAME, ENVIRONMENT, get_tags


class WafWebAcl:
    """Creates a WAF Web ACL with rate limiting and AWS managed rule groups.

    Implements WAF per CANONICAL Section 4.3: a 100 req / 5 min per-IP rate
    limit plus three AWS managed rule groups (common rules, known bad
    inputs, SQL injection). The ACL can optionally be associated with an
    API Gateway stage and/or an ALB.
    """

    def __init__(
        self,
        api_gateway_stage_arn: "pulumi.Output[str] | None" = None,
        alb_arn: "pulumi.Output[str] | None" = None,
    ):
        """Create the Web ACL and associate it with any resources provided.

        Args:
            api_gateway_stage_arn: ARN of an API Gateway stage to protect,
                or None to skip that association.
            alb_arn: ARN of an ALB to protect, or None to skip.
        """
        self.web_acl = self._create_web_acl()

        # Associations are optional — only create the ones requested.
        if api_gateway_stage_arn:
            self._associate_api_gateway(api_gateway_stage_arn)
        if alb_arn:
            self._associate_alb(alb_arn)

    @staticmethod
    def _managed_rule(name: str, priority: int) -> aws.wafv2.WebAclRuleArgs:
        """Build a rule entry for an AWS managed rule group.

        All managed groups share the same shape: no override action (the
        group's own rule actions apply) and per-rule CloudWatch metrics
        named after the group. Factored out to avoid triplicating ~20
        lines per group.
        """
        return aws.wafv2.WebAclRuleArgs(
            name=name,
            priority=priority,
            override_action=aws.wafv2.WebAclRuleOverrideActionArgs(
                none=aws.wafv2.WebAclRuleOverrideActionNoneArgs(),
            ),
            statement=aws.wafv2.WebAclRuleStatementArgs(
                managed_rule_group_statement=aws.wafv2.WebAclRuleStatementManagedRuleGroupStatementArgs(
                    vendor_name="AWS",
                    name=name,
                ),
            ),
            visibility_config=aws.wafv2.WebAclRuleVisibilityConfigArgs(
                cloudwatch_metrics_enabled=True,
                metric_name=name,
                sampled_requests_enabled=True,
            ),
        )

    def _create_web_acl(self) -> aws.wafv2.WebAcl:
        """Create WAF Web ACL with rules per CANONICAL Section 4.3."""
        # Rule 1: Rate limiting (CANONICAL: 100 req/5min per IP)
        rate_limit_rule = aws.wafv2.WebAclRuleArgs(
            name="RateLimitRule",
            priority=1,
            action=aws.wafv2.WebAclRuleActionArgs(
                block=aws.wafv2.WebAclRuleActionBlockArgs(),
            ),
            statement=aws.wafv2.WebAclRuleStatementArgs(
                rate_based_statement=aws.wafv2.WebAclRuleStatementRateBasedStatementArgs(
                    limit=100,  # per 5-minute window
                    aggregate_key_type="IP",
                ),
            ),
            visibility_config=aws.wafv2.WebAclRuleVisibilityConfigArgs(
                cloudwatch_metrics_enabled=True,
                metric_name="RateLimitRule",
                sampled_requests_enabled=True,
            ),
        )

        return aws.wafv2.WebAcl(
            f"{PROJECT_NAME}-waf",
            name=f"{PROJECT_NAME}-waf-{ENVIRONMENT}",
            scope="REGIONAL",  # REGIONAL covers ALB and regional API Gateway
            default_action=aws.wafv2.WebAclDefaultActionArgs(
                allow=aws.wafv2.WebAclDefaultActionAllowArgs(),
            ),
            visibility_config=aws.wafv2.WebAclVisibilityConfigArgs(
                cloudwatch_metrics_enabled=True,
                metric_name=f"{PROJECT_NAME}-waf-metrics",
                sampled_requests_enabled=True,
            ),
            rules=[
                rate_limit_rule,
                # Rules 2-4: AWS managed rule groups (see _managed_rule).
                self._managed_rule("AWSManagedRulesCommonRuleSet", 2),
                self._managed_rule("AWSManagedRulesKnownBadInputsRuleSet", 3),
                self._managed_rule("AWSManagedRulesSQLiRuleSet", 4),
            ],
            tags=get_tags("waf"),
        )

    def _associate_api_gateway(self, stage_arn: pulumi.Output[str]) -> None:
        """Associate the WAF Web ACL with an API Gateway stage."""
        aws.wafv2.WebAclAssociation(
            f"{PROJECT_NAME}-waf-apigw-association",
            resource_arn=stage_arn,
            web_acl_arn=self.web_acl.arn,
        )

    def _associate_alb(self, alb_arn: pulumi.Output[str]) -> None:
        """Associate the WAF Web ACL with an ALB."""
        aws.wafv2.WebAclAssociation(
            f"{PROJECT_NAME}-waf-alb-association",
            resource_arn=alb_arn,
            web_acl_arn=self.web_acl.arn,
        )

14. Main Entry Point

__main__.py

"""Main Pulumi program for TradAI infrastructure.

UPDATED: Added Cognito, API Gateway, WAF, VPC Endpoints, CloudTrail.
"""

import pulumi
from vpc.network import VpcNetwork
from vpc.security import SecurityGroups
from vpc.endpoints import VpcEndpoints
from vpc.nacls import NetworkAcls
from compute.nat import NatInstance
from compute.ecs import EcsCluster
from compute.lambda_funcs import LambdaFunctions
from orchestration.step_functions import StepFunctions
from orchestration.sqs import SqsQueues
from storage.rds import RdsDatabase
from storage.s3 import S3Buckets
from storage.dynamodb import DynamoDBTables
from security.iam import IamRoles
from security.cognito import CognitoAuth
from security.waf import WafWebAcl
from security.cloudtrail import CloudTrailAudit
from networking.alb import ApplicationLoadBalancer
from networking.api_gateway import ApiGateway
from observability.cloudwatch import CloudWatchAlarms
from config import PROJECT_NAME, ENVIRONMENT


def main():
    """Provision the complete TradAI stack in dependency order.

    Ordering matters: networking and IAM come first, storage and messaging
    next, then compute/orchestration, and finally the public edge
    (API Gateway + WAF) and observability. Constructors whose handles are
    never needed later are invoked for their side effects only, without
    binding an unused local.
    """
    # 1. Create VPC and networking
    vpc = VpcNetwork()

    # 2. Create security groups
    security_groups = SecurityGroups(vpc.vpc.id)

    # 3. Create NAT instance with HA
    nat = NatInstance(
        public_subnet_ids=[s.id for s in vpc.public_subnets],
        private_subnet_ids=[s.id for s in vpc.private_subnets],
        security_group_id=security_groups.nat_sg.id,
        vpc_id=vpc.vpc.id,
    )

    # 4. Create VPC Endpoints (S3, DynamoDB - free gateway endpoints);
    #    side-effect only, no downstream step needs a handle.
    VpcEndpoints(
        vpc_id=vpc.vpc.id,
        route_table_ids=[nat.route_table.id],
    )

    # 5. Create Network ACLs (side-effect only)
    NetworkAcls(
        vpc_id=vpc.vpc.id,
        public_subnet_ids=[s.id for s in vpc.public_subnets],
        private_subnet_ids=[s.id for s in vpc.private_subnets],
        database_subnet_ids=[s.id for s in vpc.database_subnets],
    )

    # 6. Create IAM roles
    iam = IamRoles()

    # 7. Create storage
    s3 = S3Buckets()
    dynamodb = DynamoDBTables()

    # 8. Create SQS queues
    sqs = SqsQueues()

    # 9. Create Cognito authentication
    cognito = CognitoAuth()

    # 10. Create ALB (internal, accessed via API Gateway)
    alb = ApplicationLoadBalancer(
        vpc_id=vpc.vpc.id,
        public_subnet_ids=[s.id for s in vpc.public_subnets],
        security_group_id=security_groups.alb_sg.id,
        certificate_arn=pulumi.Config().require("certificate_arn"),
    )

    # 11. Create RDS database
    rds = RdsDatabase(
        subnet_group_name=vpc.db_subnet_group.name,
        security_group_id=security_groups.rds_sg.id,
    )

    # 12. Create ECS cluster and services
    ecs = EcsCluster(
        private_subnet_ids=[s.id for s in vpc.private_subnets],
        security_group_id=security_groups.ecs_sg.id,
        execution_role_arn=iam.ecs_execution_role.arn,
        task_role_arn=iam.ecs_task_role.arn,
        target_groups=alb.target_groups,
    )

    # 13. Create Lambda functions
    lambdas = LambdaFunctions(
        subnet_ids=[s.id for s in vpc.private_subnets],
        security_group_id=security_groups.lambda_sg.id,
        role_arn=iam.lambda_role.arn,
        sqs_queue_arn=sqs.backtest_queue.arn,
    )

    # 14. Create Step Functions
    step_functions = StepFunctions(
        role_arn=iam.step_functions_role.arn,
        ecs_cluster_arn=ecs.cluster.arn,
        strategy_task_definition_arn=ecs.strategy_task_definition.arn,
        subnet_ids=[s.id for s in vpc.private_subnets],
        security_group_id=security_groups.ecs_sg.id,
        dynamodb_table_name=dynamodb.workflow_table.name,
        lambdas=lambdas,
    )

    # 15. Create API Gateway
    api_gateway = ApiGateway(
        cognito_user_pool_id=cognito.user_pool.id,
        cognito_client_id=cognito.user_pool_client.id,
        alb_listener_arn=alb.https_listener.arn,
        alb_dns_name=alb.alb.dns_name,
        sqs_queue_arn=sqs.backtest_queue.arn,
        vpc_id=vpc.vpc.id,
        private_subnet_ids=[s.id for s in vpc.private_subnets],
        security_group_id=security_groups.lambda_sg.id,
    )

    # 16. Create WAF (handle kept for the export below)
    waf = WafWebAcl(
        api_gateway_stage_arn=api_gateway.stage.arn,
        alb_arn=alb.alb.arn,
    )

    # 17. Create CloudTrail audit logging (side-effect only).
    # NOTE(review): CloudTrailAudit also provisions the VPC flow-logs log
    # group, but VpcFlowLogs itself is never instantiated here — confirm
    # whether VPC Flow Logs should be wired up.
    CloudTrailAudit(
        s3_bucket_name=s3.logs_bucket.id,
    )

    # 18. Create CloudWatch alarms (side-effect only)
    CloudWatchAlarms(
        ecs_cluster_name=ecs.cluster.name,
        sqs_queue_name=sqs.backtest_queue.name,
        dlq_name=sqs.dlq.name,
        step_function_arn=step_functions.backtest_workflow.arn,
    )

    # Export stack outputs consumed by tooling and dependent stacks.
    pulumi.export("vpc_id", vpc.vpc.id)
    pulumi.export("api_gateway_url", api_gateway.api.api_endpoint)
    pulumi.export("cognito_user_pool_id", cognito.user_pool.id)
    pulumi.export("cognito_client_id", cognito.user_pool_client.id)
    pulumi.export("alb_dns", alb.alb.dns_name)
    pulumi.export("rds_endpoint", rds.instance.endpoint)
    pulumi.export("ecs_cluster_name", ecs.cluster.name)
    pulumi.export("backtest_queue_url", sqs.backtest_queue.url)
    pulumi.export("step_function_arn", step_functions.backtest_workflow.arn)
    pulumi.export("waf_web_acl_arn", waf.web_acl.arn)


if __name__ == "__main__":
    main()

15. VPC Endpoints and NACLs

vpc/endpoints.py

"""VPC Endpoints for AWS services.

NEW: S3 and DynamoDB gateway endpoints (free) for cost and security.
"""

import pulumi
import pulumi_aws as aws
from config import PROJECT_NAME, ENVIRONMENT, AWS_REGION, get_tags


class VpcEndpoints:
    """Creates free VPC gateway endpoints for S3 and DynamoDB.

    Gateway endpoints keep S3/DynamoDB traffic on the AWS network (avoiding
    NAT data-processing charges) and carry no hourly cost. Both endpoints
    are attached to the same set of route tables.
    """

    def __init__(
        self,
        vpc_id: pulumi.Output[str],
        route_table_ids: list[pulumi.Output[str]],
    ):
        """Create both gateway endpoints.

        Args:
            vpc_id: VPC to attach the endpoints to.
            route_table_ids: Route tables that should route via the endpoints.
        """
        # The two endpoints differ only in the service token, so they share
        # one parameterized factory (was previously duplicated per service).
        self.s3_endpoint = self._create_gateway_endpoint(
            "s3", vpc_id, route_table_ids
        )
        self.dynamodb_endpoint = self._create_gateway_endpoint(
            "dynamodb", vpc_id, route_table_ids
        )

    def _create_gateway_endpoint(
        self,
        service: str,
        vpc_id: pulumi.Output[str],
        route_table_ids: list[pulumi.Output[str]],
    ) -> aws.ec2.VpcEndpoint:
        """Create a gateway endpoint for *service* ("s3" or "dynamodb")."""
        return aws.ec2.VpcEndpoint(
            f"{PROJECT_NAME}-{service}-endpoint",
            vpc_id=vpc_id,
            service_name=f"com.amazonaws.{AWS_REGION}.{service}",
            vpc_endpoint_type="Gateway",
            route_table_ids=route_table_ids,
            tags=get_tags() | {"Name": f"{PROJECT_NAME}-{service}-endpoint"},
        )

vpc/nacls.py

"""Network ACLs for defense in depth.

NEW: Stateless firewall layer in addition to security groups.
"""

import pulumi
import pulumi_aws as aws
from config import PROJECT_NAME, SUBNETS, get_tags


class NetworkAcls:
    """Creates Network ACLs for public, private, and database subnet tiers.

    NACLs are a stateless firewall layer that complements the stateful
    security groups (defense in depth). Each tier gets its own ACL:

    * public:   HTTP/HTTPS + ephemeral ports in, everything out
    * private:  VPC traffic + ephemeral return traffic in, everything out
    * database: PostgreSQL from private subnets only, ephemeral back out
    """

    def __init__(
        self,
        vpc_id: pulumi.Output[str],
        public_subnet_ids: list[pulumi.Output[str]],
        private_subnet_ids: list[pulumi.Output[str]],
        database_subnet_ids: list[pulumi.Output[str]],
    ):
        self.public_nacl = self._create_public_nacl(vpc_id, public_subnet_ids)
        self.private_nacl = self._create_private_nacl(vpc_id, private_subnet_ids)
        self.database_nacl = self._create_database_nacl(vpc_id, database_subnet_ids)

    @staticmethod
    def _associate_subnets(
        nacl: aws.ec2.NetworkAcl,
        name_prefix: str,
        subnet_ids: list[pulumi.Output[str]],
    ) -> None:
        """Associate *nacl* with every subnet in *subnet_ids*.

        Shared by all three tiers (the loop was previously duplicated);
        resource names are ``{name_prefix}-assoc-{index}``.
        """
        for i, subnet_id in enumerate(subnet_ids):
            aws.ec2.NetworkAclAssociation(
                f"{name_prefix}-assoc-{i}",
                network_acl_id=nacl.id,
                subnet_id=subnet_id,
            )

    def _create_public_nacl(
        self,
        vpc_id: pulumi.Output[str],
        subnet_ids: list[pulumi.Output[str]],
    ) -> aws.ec2.NetworkAcl:
        """Public tier: web traffic and ephemeral ports in, everything out."""
        nacl = aws.ec2.NetworkAcl(
            f"{PROJECT_NAME}-public-nacl",
            vpc_id=vpc_id,
            tags=get_tags() | {"Name": f"{PROJECT_NAME}-public-nacl"},
        )

        # Ingress rules (protocol "6" == TCP).
        rules = [
            (100, "6", 80, 80, "0.0.0.0/0", "allow"),      # HTTP
            (110, "6", 443, 443, "0.0.0.0/0", "allow"),    # HTTPS
            (120, "6", 1024, 65535, "0.0.0.0/0", "allow"), # Ephemeral ports
        ]
        for i, (num, proto, from_p, to_p, cidr, action) in enumerate(rules):
            aws.ec2.NetworkAclRule(
                f"{PROJECT_NAME}-public-nacl-in-{i}",
                network_acl_id=nacl.id,
                rule_number=num,
                protocol=proto,
                rule_action=action,
                egress=False,
                cidr_block=cidr,
                from_port=from_p,
                to_port=to_p,
            )

        # Egress - allow all outbound (protocol "-1" == all).
        aws.ec2.NetworkAclRule(
            f"{PROJECT_NAME}-public-nacl-out",
            network_acl_id=nacl.id,
            rule_number=100,
            protocol="-1",
            rule_action="allow",
            egress=True,
            cidr_block="0.0.0.0/0",
        )

        self._associate_subnets(nacl, f"{PROJECT_NAME}-public-nacl", subnet_ids)
        return nacl

    def _create_private_nacl(
        self,
        vpc_id: pulumi.Output[str],
        subnet_ids: list[pulumi.Output[str]],
    ) -> aws.ec2.NetworkAcl:
        """Private tier: VPC-internal traffic plus NAT return traffic in."""
        nacl = aws.ec2.NetworkAcl(
            f"{PROJECT_NAME}-private-nacl",
            vpc_id=vpc_id,
            tags=get_tags() | {"Name": f"{PROJECT_NAME}-private-nacl"},
        )

        # Ingress - any protocol, but only from inside the VPC CIDR.
        aws.ec2.NetworkAclRule(
            f"{PROJECT_NAME}-private-nacl-in-vpc",
            network_acl_id=nacl.id,
            rule_number=100,
            protocol="-1",
            rule_action="allow",
            egress=False,
            cidr_block="10.0.0.0/16",
        )

        # Ingress - TCP ephemeral ports so NAT return traffic is admitted.
        aws.ec2.NetworkAclRule(
            f"{PROJECT_NAME}-private-nacl-in-ephemeral",
            network_acl_id=nacl.id,
            rule_number=110,
            protocol="6",
            rule_action="allow",
            egress=False,
            cidr_block="0.0.0.0/0",
            from_port=1024,
            to_port=65535,
        )

        # Egress - allow all outbound.
        aws.ec2.NetworkAclRule(
            f"{PROJECT_NAME}-private-nacl-out",
            network_acl_id=nacl.id,
            rule_number=100,
            protocol="-1",
            rule_action="allow",
            egress=True,
            cidr_block="0.0.0.0/0",
        )

        self._associate_subnets(nacl, f"{PROJECT_NAME}-private-nacl", subnet_ids)
        return nacl

    def _create_database_nacl(
        self,
        vpc_id: pulumi.Output[str],
        subnet_ids: list[pulumi.Output[str]],
    ) -> aws.ec2.NetworkAcl:
        """Database tier: PostgreSQL from private subnets only."""
        nacl = aws.ec2.NetworkAcl(
            f"{PROJECT_NAME}-db-nacl",
            vpc_id=vpc_id,
            tags=get_tags() | {"Name": f"{PROJECT_NAME}-db-nacl"},
        )

        # Ingress - PostgreSQL (5432) from each private-subnet CIDR only.
        for i, cidr in enumerate(SUBNETS["private"]):
            aws.ec2.NetworkAclRule(
                f"{PROJECT_NAME}-db-nacl-in-pg-{i}",
                network_acl_id=nacl.id,
                rule_number=100 + i,
                protocol="6",
                rule_action="allow",
                egress=False,
                cidr_block=cidr,
                from_port=5432,
                to_port=5432,
            )

        # Egress - TCP ephemeral ports back to the private subnets (replies).
        for i, cidr in enumerate(SUBNETS["private"]):
            aws.ec2.NetworkAclRule(
                f"{PROJECT_NAME}-db-nacl-out-{i}",
                network_acl_id=nacl.id,
                rule_number=100 + i,
                protocol="6",
                rule_action="allow",
                egress=True,
                cidr_block=cidr,
                from_port=1024,
                to_port=65535,
            )

        self._associate_subnets(nacl, f"{PROJECT_NAME}-db-nacl", subnet_ids)
        return nacl

16. CloudTrail and VPC Flow Logs

security/cloudtrail.py

"""CloudTrail audit logging.

NEW: Required for compliance and security forensics.
"""

import json
import pulumi
import pulumi_aws as aws
from config import PROJECT_NAME, ENVIRONMENT, get_tags


class CloudTrailAudit:
    """Provision CloudTrail audit logging plus the VPC flow-logs log group.

    The trail writes into an existing S3 bucket (passed to the constructor)
    under the ``cloudtrail`` prefix and records S3 object-level and
    DynamoDB data events in addition to management events.
    """

    def __init__(self, s3_bucket_name: pulumi.Output[str]):
        """Create the flow-logs log group and the audit trail.

        Args:
            s3_bucket_name: Name of the bucket that receives trail logs.
        """
        self.flow_logs_group = self._create_flow_logs_group()
        self.trail = self._create_trail(s3_bucket_name)

    def _create_flow_logs_group(self) -> aws.cloudwatch.LogGroup:
        """CloudWatch Log Group that VPC Flow Logs can write into."""
        return aws.cloudwatch.LogGroup(
            f"{PROJECT_NAME}-flow-logs",
            name=f"/aws/vpc/{PROJECT_NAME}/flow-logs",
            retention_in_days=7,  # short retention keeps log storage cheap
            tags=get_tags("flow-logs"),
        )

    def _create_trail(self, bucket_name: pulumi.Output[str]) -> aws.cloudtrail.Trail:
        """Single-region trail with log-file validation enabled."""
        # Data events: objects in the project's S3 buckets, plus DynamoDB.
        data_resources = [
            aws.cloudtrail.TrailEventSelectorDataResourceArgs(
                type="AWS::S3::Object",
                values=[f"arn:aws:s3:::{PROJECT_NAME}-*/*"],
            ),
            aws.cloudtrail.TrailEventSelectorDataResourceArgs(
                type="AWS::DynamoDB::Table",
                values=["arn:aws:dynamodb"],
            ),
        ]
        selector = aws.cloudtrail.TrailEventSelectorArgs(
            read_write_type="All",
            include_management_events=True,
            data_resources=data_resources,
        )

        return aws.cloudtrail.Trail(
            f"{PROJECT_NAME}-trail",
            name=f"{PROJECT_NAME}-audit-trail",
            s3_bucket_name=bucket_name,
            s3_key_prefix="cloudtrail",
            include_global_service_events=True,
            is_multi_region_trail=False,
            enable_logging=True,
            enable_log_file_validation=True,  # tamper-evidence for audit logs
            event_selectors=[selector],
            tags=get_tags("cloudtrail"),
        )


class VpcFlowLogs:
    """Provision VPC Flow Logs (rejected traffic only) into CloudWatch Logs."""

    def __init__(
        self,
        vpc_id: pulumi.Output[str],
        log_group_arn: pulumi.Output[str],
    ):
        """Create the delivery role and attach the flow log to the VPC.

        Args:
            vpc_id: VPC whose traffic should be logged.
            log_group_arn: ARN of the CloudWatch Log Group to deliver into.
        """
        self.role = self._create_role()
        self.flow_log = self._create_flow_log(vpc_id, log_group_arn)

    def _create_role(self) -> aws.iam.Role:
        """IAM role the flow-logs service assumes to write to CloudWatch."""
        trust_policy = {
            "Version": "2012-10-17",
            "Statement": [{
                "Effect": "Allow",
                "Principal": {"Service": "vpc-flow-logs.amazonaws.com"},
                "Action": "sts:AssumeRole",
            }],
        }
        role = aws.iam.Role(
            f"{PROJECT_NAME}-flow-logs-role",
            assume_role_policy=json.dumps(trust_policy),
            tags=get_tags(),
        )

        # Inline policy granting the log-delivery permissions.
        log_write_policy = {
            "Version": "2012-10-17",
            "Statement": [{
                "Effect": "Allow",
                "Action": [
                    "logs:CreateLogGroup",
                    "logs:CreateLogStream",
                    "logs:PutLogEvents",
                    "logs:DescribeLogGroups",
                    "logs:DescribeLogStreams",
                ],
                "Resource": "*",
            }],
        }
        aws.iam.RolePolicy(
            f"{PROJECT_NAME}-flow-logs-policy",
            role=role.id,
            policy=json.dumps(log_write_policy),
        )

        return role

    def _create_flow_log(
        self,
        vpc_id: pulumi.Output[str],
        log_group_arn: pulumi.Output[str],
    ) -> aws.ec2.FlowLog:
        """Attach the flow log to the VPC."""
        return aws.ec2.FlowLog(
            f"{PROJECT_NAME}-vpc-flow-log",
            vpc_id=vpc_id,
            traffic_type="REJECT",  # Only log rejected traffic
            log_destination_type="cloud-watch-logs",
            log_destination=log_group_arn,
            iam_role_arn=self.role.arn,
            tags=get_tags("flow-logs"),
        )

17. CloudWatch Alarms

observability/cloudwatch.py

"""CloudWatch alarms and dashboards.

NEW: Operational monitoring per architecture spec.
"""

import pulumi
import pulumi_aws as aws
from config import PROJECT_NAME, ENVIRONMENT, get_tags


class CloudWatchAlarms:
    """Provision CloudWatch alarms for the backtest pipeline.

    Every alarm notifies a single shared SNS topic.

    NOTE(review): ``ecs_cluster_name`` and ``sqs_queue_name`` are accepted
    but not referenced by any alarm yet — confirm whether alarms for them
    are still planned.
    """

    def __init__(
        self,
        ecs_cluster_name: pulumi.Output[str],
        sqs_queue_name: pulumi.Output[str],
        dlq_name: pulumi.Output[str],
        step_function_arn: pulumi.Output[str],
    ):
        self.sns_topic = self._create_sns_topic()
        self.dlq_alarm = self._create_dlq_alarm(dlq_name)
        self.backtest_failure_alarm = self._create_backtest_failure_alarm(step_function_arn)
        self.backtest_duration_alarm = self._create_backtest_duration_alarm(step_function_arn)

    def _create_sns_topic(self) -> aws.sns.Topic:
        """SNS topic that receives every alarm notification."""
        return aws.sns.Topic(
            f"{PROJECT_NAME}-alarms",
            name=f"{PROJECT_NAME}-alarms",
            tags=get_tags("sns"),
        )

    def _create_dlq_alarm(self, dlq_name: pulumi.Output[str]) -> aws.cloudwatch.MetricAlarm:
        """Fire whenever any message lands in the dead-letter queue."""
        return aws.cloudwatch.MetricAlarm(
            f"{PROJECT_NAME}-dlq-messages-alarm",
            alarm_name=f"{PROJECT_NAME}-dlq-messages",
            alarm_description="Messages in DLQ indicate failed backtests",
            namespace="AWS/SQS",
            metric_name="ApproximateNumberOfMessagesVisible",
            dimensions={"QueueName": dlq_name},
            statistic="Sum",
            period=300,
            evaluation_periods=1,
            comparison_operator="GreaterThanThreshold",
            threshold=0,  # any visible DLQ message trips the alarm
            alarm_actions=[self.sns_topic.arn],
            ok_actions=[self.sns_topic.arn],  # also notify on recovery
            tags=get_tags("cloudwatch"),
        )

    def _create_backtest_failure_alarm(
        self,
        step_function_arn: pulumi.Output[str],
    ) -> aws.cloudwatch.MetricAlarm:
        """Fire when more than 5 executions fail within one hour."""
        return aws.cloudwatch.MetricAlarm(
            f"{PROJECT_NAME}-backtest-failures-alarm",
            alarm_name=f"{PROJECT_NAME}-backtest-failures-high",
            alarm_description="High backtest failure rate",
            namespace="AWS/States",
            metric_name="ExecutionsFailed",
            dimensions={"StateMachineArn": step_function_arn},
            statistic="Sum",
            period=3600,  # 1 hour
            evaluation_periods=1,
            comparison_operator="GreaterThanThreshold",
            threshold=5,
            alarm_actions=[self.sns_topic.arn],
            tags=get_tags("cloudwatch"),
        )

    def _create_backtest_duration_alarm(
        self,
        step_function_arn: pulumi.Output[str],
    ) -> aws.cloudwatch.MetricAlarm:
        """Fire when average execution time exceeds one hour."""
        return aws.cloudwatch.MetricAlarm(
            f"{PROJECT_NAME}-backtest-duration-alarm",
            alarm_name=f"{PROJECT_NAME}-backtest-duration-long",
            alarm_description="Backtest taking too long",
            namespace="AWS/States",
            metric_name="ExecutionTime",
            dimensions={"StateMachineArn": step_function_arn},
            statistic="Average",
            period=300,
            evaluation_periods=1,
            comparison_operator="GreaterThanThreshold",
            threshold=3600000,  # 1 hour in ms
            alarm_actions=[self.sns_topic.arn],
            tags=get_tags("cloudwatch"),
        )

Deployment Commands

# Initialize Pulumi project
cd infra
pulumi stack init dev

# Configure required secrets
pulumi config set aws:region us-east-1
pulumi config set --secret certificate_arn arn:aws:acm:...
pulumi config set --secret db_secret_arn arn:aws:secretsmanager:...

# Preview changes
pulumi preview

# Deploy infrastructure
pulumi up

# View outputs
pulumi stack output

# Destroy (dev only!)
pulumi destroy

Summary

Component Resource Count Key Features
VPC/Networking 15 Multi-AZ, NAT instance
Security 12 Least privilege IAM
Compute 8 ECS Fargate, Lambda
Storage 6 S3, RDS, DynamoDB
Orchestration 3 Step Functions, SQS
Load Balancing 5 ALB with TLS
Total ~49 Production-ready

Next Steps

  1. Copy this code structure to infra/ directory
  2. Install Pulumi: curl -fsSL https://get.pulumi.com | sh
  3. Configure AWS credentials
  4. Initialize stack and deploy

See 08-IMPLEMENTATION-ROADMAP.md for detailed deployment timeline.