TradAI Final Architecture - Pulumi Infrastructure Code¶
Version: 8.0 | Date: 2025-11-27
Overview¶
This document contains production-ready Pulumi Python code for deploying the TradAI infrastructure on AWS.
Project Structure¶
infra/
├── __main__.py # Main entry point
├── Pulumi.yaml # Project configuration
├── Pulumi.dev.yaml # Dev stack config
├── Pulumi.prod.yaml # Production stack config
├── config.py # Shared configuration
├── vpc/
│ ├── __init__.py
│ ├── network.py # VPC, subnets, route tables
│ └── security.py # Security groups, NACLs
├── compute/
│ ├── __init__.py
│ ├── ecs.py # ECS cluster and services
│ ├── lambda_funcs.py # Lambda functions
│ └── nat.py # NAT instance
├── storage/
│ ├── __init__.py
│ ├── rds.py # PostgreSQL database
│ ├── s3.py # S3 buckets
│ └── dynamodb.py # DynamoDB tables
├── orchestration/
│ ├── __init__.py
│ ├── step_functions.py # State machines
│ └── sqs.py # SQS queues
├── security/
│ ├── __init__.py
│ ├── iam.py # IAM roles and policies
│ ├── cognito.py # User authentication
│ └── secrets.py # Secrets Manager
└── networking/
├── __init__.py
├── alb.py # Application Load Balancer
└── api_gateway.py # API Gateway
1. Configuration¶
Pulumi.yaml¶
# Pulumi project manifest for the TradAI infrastructure.
# The stack name (dev / staging / prod) selects per-environment config.
name: tradai-infrastructure
runtime:
  name: python
  options:
    # Pulumi creates/uses this virtualenv for the program's dependencies.
    virtualenv: venv
description: TradAI AWS Infrastructure
config.py¶
"""Shared configuration for TradAI infrastructure.
CANONICAL SOURCE: See 10-CANONICAL-CONFIG.md for authoritative values.
This file implements those values in Pulumi-compatible format.
"""
import pulumi
from typing import Optional
config = pulumi.Config()
aws_config = pulumi.Config("aws")
# Environment
ENVIRONMENT = pulumi.get_stack()
PROJECT_NAME = "tradai"
AWS_REGION = aws_config.get("region") or "us-east-1"
# Network Configuration (CANONICAL: Section 1)
VPC_CIDR = "10.0.0.0/16"
AVAILABILITY_ZONES = ["us-east-1a", "us-east-1b"]
# Subnet CIDRs - CANONICAL VALUES from 10-CANONICAL-CONFIG.md
SUBNETS = {
"public": ["10.0.1.0/24", "10.0.2.0/24"],
"private": ["10.0.11.0/24", "10.0.12.0/24"],
"database": ["10.0.21.0/24", "10.0.22.0/24"],
}
# ECS Configuration
ECS_CLUSTER_NAME = f"{PROJECT_NAME}-{ENVIRONMENT}"
# Service Definitions - CANONICAL VALUES (ports from Section 2.1)
SERVICES = {
"backend-api": {
"cpu": 512,
"memory": 1024,
"port": 8000,
"desired_count": 1 if ENVIRONMENT != "prod" else 2,
"health_check_path": "/health",
},
"data-collection": {
"cpu": 256,
"memory": 512,
"port": 8002, # FIXED: Was 8001, canonical is 8002
"desired_count": 1 if ENVIRONMENT != "prod" else 2,
"health_check_path": "/health",
},
"mlflow": {
"cpu": 512,
"memory": 1024,
"port": 5000,
"desired_count": 1,
"health_check_path": "/health",
},
"strategy-service": {
"cpu": 512,
"memory": 1024,
"port": 8003,
"desired_count": 0, # On-demand only
"health_check_path": "/health",
},
}
# Strategy Container (Fargate Spot for backtests)
STRATEGY_CONTAINER = {
"cpu": 1024,
"memory": 2048,
"use_spot": True,
}
# Database Configuration - Environment-aware (CANONICAL: Section 3.3)
RDS_CONFIG = {
"instance_class": "db.t4g.micro" if ENVIRONMENT != "prod" else "db.t4g.small",
"allocated_storage": 20 if ENVIRONMENT != "prod" else 50,
"engine_version": "15.4",
"multi_az": ENVIRONMENT == "prod",
"backup_retention": 7 if ENVIRONMENT != "prod" else 14,
}
# S3 Buckets - CANONICAL NAMES (Section 3.1)
S3_BUCKETS = {
"configs": f"{PROJECT_NAME}-configs-{ENVIRONMENT}",
"results": f"{PROJECT_NAME}-results-{ENVIRONMENT}",
"arcticdb": f"{PROJECT_NAME}-arcticdb-{ENVIRONMENT}",
"logs": f"{PROJECT_NAME}-logs-{ENVIRONMENT}",
"mlflow": f"{PROJECT_NAME}-mlflow-{ENVIRONMENT}",
}
# SQS Configuration - CANONICAL VALUES (Section 6)
SQS_CONFIG = {
"visibility_timeout": 900, # 15 minutes
"message_retention": 345600, # 4 days
"dlq_retention": 1209600, # 14 days
"max_receive_count": 3,
}
# Lambda Functions - CANONICAL LIST (Section 7)
LAMBDA_FUNCTIONS = [
"validate-strategy",
"validate-data",
"sqs-consumer",
"transform-results",
"cleanup-resources",
"notify-completion",
"update-nat-routes",
]
# Log Retention - Environment-aware (CANONICAL: Section 10)
LOG_RETENTION_DAYS = {
"dev": 7,
"staging": 14,
"prod": 30,
}.get(ENVIRONMENT, 7)
# Required config inputs with validation
def get_required_config(key: str) -> str:
"""Get required config value with validation."""
value = config.require(key)
return value
def get_optional_config(key: str, default: str = None) -> Optional[str]:
"""Get optional config value."""
return config.get(key) or default
# Tags - CANONICAL REQUIREMENTS (Section 9.2)
def get_tags(service: str = None) -> dict:
"""Generate standard tags for resources per canonical spec."""
tags = {
"Application": PROJECT_NAME,
"Environment": ENVIRONMENT,
"ManagedBy": "pulumi",
"CostCenter": "trading-platform",
"Owner": "platform-team",
}
if service:
tags["Service"] = service
return tags
# Validation helper
def validate_environment():
"""Validate environment configuration."""
valid_envs = ["dev", "staging", "prod"]
if ENVIRONMENT not in valid_envs:
raise ValueError(f"Invalid environment: {ENVIRONMENT}. Must be one of {valid_envs}")
validate_environment()
2. VPC and Networking¶
vpc/network.py¶
"""VPC and subnet configuration."""
import pulumi
import pulumi_aws as aws
from config import (
VPC_CIDR,
AVAILABILITY_ZONES,
SUBNETS,
PROJECT_NAME,
ENVIRONMENT,
get_tags,
)
class VpcNetwork:
    """Creates VPC with public, private, and database subnets.

    Layout (from config.SUBNETS, one subnet of each tier per AZ):
      - public:   internet-facing, auto-assign public IPs, routed to the IGW
      - private:  workload subnets (no route created here; NAT route is
        managed separately by the NAT-instance stack)
      - database: isolated subnets wrapped in an RDS subnet group
    """

    def __init__(self):
        # Creation order follows resource dependencies: VPC first, then the
        # IGW and subnets that reference it, finally the RDS subnet group
        # built over the database subnets.
        self.vpc = self._create_vpc()
        self.internet_gateway = self._create_internet_gateway()
        self.public_subnets = self._create_public_subnets()
        self.private_subnets = self._create_private_subnets()
        self.database_subnets = self._create_database_subnets()
        self.db_subnet_group = self._create_db_subnet_group()

    def _create_vpc(self) -> aws.ec2.Vpc:
        # DNS hostnames+support are required for RDS endpoints and for
        # service discovery names to resolve inside the VPC.
        return aws.ec2.Vpc(
            f"{PROJECT_NAME}-vpc",
            cidr_block=VPC_CIDR,
            enable_dns_hostnames=True,
            enable_dns_support=True,
            tags=get_tags() | {"Name": f"{PROJECT_NAME}-{ENVIRONMENT}-vpc"},
        )

    def _create_internet_gateway(self) -> aws.ec2.InternetGateway:
        return aws.ec2.InternetGateway(
            f"{PROJECT_NAME}-igw",
            vpc_id=self.vpc.id,
            tags=get_tags() | {"Name": f"{PROJECT_NAME}-{ENVIRONMENT}-igw"},
        )

    def _create_public_subnets(self) -> list[aws.ec2.Subnet]:
        """Create one public subnet per AZ plus the shared public route table."""
        subnets = []
        # AZs and CIDRs are zipped positionally; config guarantees equal length.
        for i, (az, cidr) in enumerate(
            zip(AVAILABILITY_ZONES, SUBNETS["public"])
        ):
            subnet = aws.ec2.Subnet(
                f"{PROJECT_NAME}-public-{i+1}",
                vpc_id=self.vpc.id,
                cidr_block=cidr,
                availability_zone=az,
                map_public_ip_on_launch=True,
                # az[-2:] yields the AZ suffix, e.g. "1a" for "us-east-1a".
                tags=get_tags() | {"Name": f"{PROJECT_NAME}-public-{az[-2:]}"},
            )
            subnets.append(subnet)
        # Public route table: default route to the internet gateway.
        public_rt = aws.ec2.RouteTable(
            f"{PROJECT_NAME}-public-rt",
            vpc_id=self.vpc.id,
            routes=[
                aws.ec2.RouteTableRouteArgs(
                    cidr_block="0.0.0.0/0",
                    gateway_id=self.internet_gateway.id,
                )
            ],
            tags=get_tags() | {"Name": f"{PROJECT_NAME}-public-rt"},
        )
        # Associate subnets with route table
        for i, subnet in enumerate(subnets):
            aws.ec2.RouteTableAssociation(
                f"{PROJECT_NAME}-public-rta-{i+1}",
                subnet_id=subnet.id,
                route_table_id=public_rt.id,
            )
        return subnets

    def _create_private_subnets(self) -> list[aws.ec2.Subnet]:
        """Create one private subnet per AZ (no public IPs, no route here)."""
        subnets = []
        for i, (az, cidr) in enumerate(
            zip(AVAILABILITY_ZONES, SUBNETS["private"])
        ):
            subnet = aws.ec2.Subnet(
                f"{PROJECT_NAME}-private-{i+1}",
                vpc_id=self.vpc.id,
                cidr_block=cidr,
                availability_zone=az,
                tags=get_tags() | {"Name": f"{PROJECT_NAME}-private-{az[-2:]}"},
            )
            subnets.append(subnet)
        return subnets

    def _create_database_subnets(self) -> list[aws.ec2.Subnet]:
        """Create one isolated database subnet per AZ."""
        subnets = []
        for i, (az, cidr) in enumerate(
            zip(AVAILABILITY_ZONES, SUBNETS["database"])
        ):
            subnet = aws.ec2.Subnet(
                f"{PROJECT_NAME}-database-{i+1}",
                vpc_id=self.vpc.id,
                cidr_block=cidr,
                availability_zone=az,
                tags=get_tags() | {"Name": f"{PROJECT_NAME}-database-{az[-2:]}"},
            )
            subnets.append(subnet)
        return subnets

    def _create_db_subnet_group(self) -> aws.rds.SubnetGroup:
        """Group the database subnets for RDS multi-AZ placement."""
        return aws.rds.SubnetGroup(
            f"{PROJECT_NAME}-db-subnet-group",
            subnet_ids=[s.id for s in self.database_subnets],
            tags=get_tags() | {"Name": f"{PROJECT_NAME}-db-subnet-group"},
        )
vpc/security.py¶
"""Security groups and NACLs.
FIXED: Added port 80 to ALB SG for HTTP redirect.
FIXED: Added MLflow port 5000 to ECS SG.
FIXED: Updated NAT SG to use canonical CIDRs.
FIXED: Updated database subnet CIDRs.
"""
import pulumi
import pulumi_aws as aws
from config import PROJECT_NAME, ENVIRONMENT, VPC_CIDR, SUBNETS, get_tags
class SecurityGroups:
    """Creates all security groups for TradAI services.

    Traffic model:
      internet -> ALB (80/443) -> ECS tasks (8000/8002/8003/5000)
      Lambda  -> ECS tasks and RDS (rules added via cross-references)
      ECS     -> RDS (5432) and AWS APIs (443)
      private subnets -> NAT instance -> internet
    """

    def __init__(self, vpc_id: pulumi.Output[str]):
        self.vpc_id = vpc_id
        # Create in dependency order: inline rules may only reference SGs
        # created earlier; mutually-referencing rules are added afterwards
        # as standalone SecurityGroupRule resources to avoid cycles.
        self.alb_sg = self._create_alb_sg()
        self.ecs_sg = self._create_ecs_sg()
        self.lambda_sg = self._create_lambda_sg()
        self.rds_sg = self._create_rds_sg()
        self.nat_sg = self._create_nat_sg()
        # Add cross-references after creation
        self._add_cross_references()

    def _create_alb_sg(self) -> aws.ec2.SecurityGroup:
        """ALB Security Group - allows HTTP (for redirect) and HTTPS."""
        return aws.ec2.SecurityGroup(
            f"{PROJECT_NAME}-alb-sg",
            vpc_id=self.vpc_id,
            description="ALB Security Group - HTTP/HTTPS",
            ingress=[
                # FIXED: Added port 80 for HTTP->HTTPS redirect
                aws.ec2.SecurityGroupIngressArgs(
                    description="HTTP for redirect",
                    from_port=80,
                    to_port=80,
                    protocol="tcp",
                    cidr_blocks=["0.0.0.0/0"],
                ),
                aws.ec2.SecurityGroupIngressArgs(
                    description="HTTPS from anywhere",
                    from_port=443,
                    to_port=443,
                    protocol="tcp",
                    cidr_blocks=["0.0.0.0/0"],
                ),
            ],
            egress=[
                # Egress restricted to the VPC: the ALB only forwards to
                # targets inside the VPC, never out to the internet.
                aws.ec2.SecurityGroupEgressArgs(
                    description="All outbound to VPC",
                    from_port=0,
                    to_port=0,
                    protocol="-1",
                    cidr_blocks=[VPC_CIDR],
                ),
            ],
            tags=get_tags() | {"Name": f"{PROJECT_NAME}-alb-sg"},
        )

    def _create_ecs_sg(self) -> aws.ec2.SecurityGroup:
        """ECS Tasks Security Group - receives traffic from ALB and Lambda."""
        return aws.ec2.SecurityGroup(
            f"{PROJECT_NAME}-ecs-sg",
            vpc_id=self.vpc_id,
            description="ECS Tasks Security Group",
            ingress=[
                # Backend API (8000)
                aws.ec2.SecurityGroupIngressArgs(
                    description="Backend API from ALB",
                    from_port=8000,
                    to_port=8000,
                    protocol="tcp",
                    security_groups=[self.alb_sg.id],
                ),
                # Data Collection (8002) - FIXED: was 8001
                aws.ec2.SecurityGroupIngressArgs(
                    description="Data Collection from ALB",
                    from_port=8002,
                    to_port=8002,
                    protocol="tcp",
                    security_groups=[self.alb_sg.id],
                ),
                # Strategy Service (8003)
                aws.ec2.SecurityGroupIngressArgs(
                    description="Strategy Service from ALB",
                    from_port=8003,
                    to_port=8003,
                    protocol="tcp",
                    security_groups=[self.alb_sg.id],
                ),
                # FIXED: Added MLflow (5000)
                aws.ec2.SecurityGroupIngressArgs(
                    description="MLflow from ALB",
                    from_port=5000,
                    to_port=5000,
                    protocol="tcp",
                    security_groups=[self.alb_sg.id],
                ),
            ],
            egress=[
                # AWS API calls (ECR pulls, Secrets Manager, etc.) go out 443
                # via the NAT instance.
                aws.ec2.SecurityGroupEgressArgs(
                    description="HTTPS to internet (AWS APIs)",
                    from_port=443,
                    to_port=443,
                    protocol="tcp",
                    cidr_blocks=["0.0.0.0/0"],
                ),
                # FIXED: Use canonical database subnet CIDRs
                aws.ec2.SecurityGroupEgressArgs(
                    description="PostgreSQL to DB subnets",
                    from_port=5432,
                    to_port=5432,
                    protocol="tcp",
                    cidr_blocks=SUBNETS["database"],  # 10.0.21.0/24, 10.0.22.0/24
                ),
            ],
            tags=get_tags() | {"Name": f"{PROJECT_NAME}-ecs-sg"},
        )

    def _create_lambda_sg(self) -> aws.ec2.SecurityGroup:
        """Lambda Functions Security Group."""
        return aws.ec2.SecurityGroup(
            f"{PROJECT_NAME}-lambda-sg",
            vpc_id=self.vpc_id,
            description="Lambda Functions Security Group",
            # No ingress - Lambda functions initiate connections
            egress=[
                aws.ec2.SecurityGroupEgressArgs(
                    description="HTTPS to internet (AWS APIs)",
                    from_port=443,
                    to_port=443,
                    protocol="tcp",
                    cidr_blocks=["0.0.0.0/0"],
                ),
                # Will add ECS service ports via cross-reference
            ],
            tags=get_tags() | {"Name": f"{PROJECT_NAME}-lambda-sg"},
        )

    def _create_rds_sg(self) -> aws.ec2.SecurityGroup:
        """RDS PostgreSQL Security Group - only accepts connections from ECS and Lambda."""
        return aws.ec2.SecurityGroup(
            f"{PROJECT_NAME}-rds-sg",
            vpc_id=self.vpc_id,
            description="RDS PostgreSQL Security Group",
            ingress=[
                # Lambda ingress is added later in _add_cross_references.
                aws.ec2.SecurityGroupIngressArgs(
                    description="PostgreSQL from ECS",
                    from_port=5432,
                    to_port=5432,
                    protocol="tcp",
                    security_groups=[self.ecs_sg.id],
                ),
            ],
            egress=[],  # No outbound needed for RDS
            tags=get_tags() | {"Name": f"{PROJECT_NAME}-rds-sg"},
        )

    def _create_nat_sg(self) -> aws.ec2.SecurityGroup:
        """NAT Instance Security Group - allows traffic from private subnets."""
        return aws.ec2.SecurityGroup(
            f"{PROJECT_NAME}-nat-sg",
            vpc_id=self.vpc_id,
            description="NAT Instance Security Group",
            ingress=[
                # FIXED: Use canonical private subnet CIDRs
                aws.ec2.SecurityGroupIngressArgs(
                    description="All TCP from private subnets",
                    from_port=0,
                    to_port=65535,
                    protocol="tcp",
                    cidr_blocks=SUBNETS["private"],  # 10.0.11.0/24, 10.0.12.0/24
                ),
                aws.ec2.SecurityGroupIngressArgs(
                    description="All UDP from private subnets",
                    from_port=0,
                    to_port=65535,
                    protocol="udp",
                    cidr_blocks=SUBNETS["private"],
                ),
                # ICMP so instances in private subnets can ping/traceroute out.
                aws.ec2.SecurityGroupIngressArgs(
                    description="ICMP from private subnets",
                    from_port=-1,
                    to_port=-1,
                    protocol="icmp",
                    cidr_blocks=SUBNETS["private"],
                ),
            ],
            egress=[
                aws.ec2.SecurityGroupEgressArgs(
                    description="All outbound to internet",
                    from_port=0,
                    to_port=0,
                    protocol="-1",
                    cidr_blocks=["0.0.0.0/0"],
                ),
            ],
            tags=get_tags() | {"Name": f"{PROJECT_NAME}-nat-sg"},
        )

    def _add_cross_references(self):
        """Add security group rules that require cross-references.

        These are standalone rules (not inline) because the Lambda/ECS/RDS
        groups reference each other; inline definitions would create a
        dependency cycle. NOTE(review): for ``type="egress"`` rules the AWS
        provider treats ``source_security_group_id`` as the *destination*
        group — confirm against the provider docs.
        """
        # Lambda -> ECS services
        aws.ec2.SecurityGroupRule(
            f"{PROJECT_NAME}-lambda-to-ecs-backend",
            type="egress",
            security_group_id=self.lambda_sg.id,
            source_security_group_id=self.ecs_sg.id,
            from_port=8000,
            to_port=8000,
            protocol="tcp",
            description="Lambda to Backend API",
        )
        aws.ec2.SecurityGroupRule(
            f"{PROJECT_NAME}-lambda-to-ecs-data",
            type="egress",
            security_group_id=self.lambda_sg.id,
            source_security_group_id=self.ecs_sg.id,
            from_port=8002,
            to_port=8002,
            protocol="tcp",
            description="Lambda to Data Collection",
        )
        aws.ec2.SecurityGroupRule(
            f"{PROJECT_NAME}-lambda-to-ecs-mlflow",
            type="egress",
            security_group_id=self.lambda_sg.id,
            source_security_group_id=self.ecs_sg.id,
            from_port=5000,
            to_port=5000,
            protocol="tcp",
            description="Lambda to MLflow",
        )
        # ECS ingress from Lambda (covers 8000-8003 in one range rule).
        aws.ec2.SecurityGroupRule(
            f"{PROJECT_NAME}-ecs-from-lambda",
            type="ingress",
            security_group_id=self.ecs_sg.id,
            source_security_group_id=self.lambda_sg.id,
            from_port=8000,
            to_port=8003,
            protocol="tcp",
            description="ECS services from Lambda",
        )
        aws.ec2.SecurityGroupRule(
            f"{PROJECT_NAME}-ecs-mlflow-from-lambda",
            type="ingress",
            security_group_id=self.ecs_sg.id,
            source_security_group_id=self.lambda_sg.id,
            from_port=5000,
            to_port=5000,
            protocol="tcp",
            description="MLflow from Lambda",
        )
        # RDS ingress from Lambda
        aws.ec2.SecurityGroupRule(
            f"{PROJECT_NAME}-rds-from-lambda",
            type="ingress",
            security_group_id=self.rds_sg.id,
            source_security_group_id=self.lambda_sg.id,
            from_port=5432,
            to_port=5432,
            protocol="tcp",
            description="PostgreSQL from Lambda",
        )
3. NAT Instance¶
compute/nat.py¶
"""NAT Instance for cost-optimized internet access.
FIXED: Uses ARM-compatible AMI with t4g.nano instance.
ADDED: Auto Scaling Group for HA with lifecycle hook for route updates.
"""
import pulumi
import pulumi_aws as aws
import json
from config import PROJECT_NAME, ENVIRONMENT, get_tags, SUBNETS
class NatInstance:
    """Creates a NAT instance with ASG for HA instead of NAT Gateway for cost savings.

    A single t4g.nano instance runs in an ASG (min=max=1). On every
    (re)launch, a lifecycle hook emits an EventBridge event that invokes a
    Lambda, which repoints the private route table's 0.0.0.0/0 route at the
    new instance and then completes the lifecycle action.

    FIXED: user-data metadata queries now use an IMDSv2 session token —
    the launch template enforces ``http_tokens="required"``, so the
    previous token-less curls would have returned 401 and left
    INSTANCE_ID/REGION empty, breaking EIP association and the
    source/dest-check change.
    """

    def __init__(
        self,
        public_subnet_ids: list[pulumi.Output[str]],
        private_subnet_ids: list[pulumi.Output[str]],
        security_group_id: pulumi.Output[str],
        vpc_id: pulumi.Output[str],
    ):
        self.eip = self._create_eip()
        self.iam_role = self._create_iam_role()
        self.instance_profile = self._create_instance_profile()
        self.launch_template = self._create_launch_template(
            public_subnet_ids[0], security_group_id
        )
        self.asg = self._create_asg(public_subnet_ids)
        self.route_table = self._create_private_route_table(vpc_id)
        self._associate_subnets(private_subnet_ids)
        self.lifecycle_hook = self._create_lifecycle_hook()
        self.route_update_lambda = self._create_route_update_lambda(vpc_id)

    def _create_eip(self) -> aws.ec2.Eip:
        """Create Elastic IP for NAT instance (survives instance replacement)."""
        return aws.ec2.Eip(
            f"{PROJECT_NAME}-nat-eip",
            domain="vpc",
            tags=get_tags() | {"Name": f"{PROJECT_NAME}-nat-eip"},
        )

    def _create_iam_role(self) -> aws.iam.Role:
        """Create IAM role for NAT instance to associate EIP and edit routes."""
        assume_role_policy = json.dumps({
            "Version": "2012-10-17",
            "Statement": [{
                "Effect": "Allow",
                "Principal": {"Service": "ec2.amazonaws.com"},
                "Action": "sts:AssumeRole",
            }],
        })
        role = aws.iam.Role(
            f"{PROJECT_NAME}-nat-role",
            name=f"{PROJECT_NAME}-nat-role",
            assume_role_policy=assume_role_policy,
            tags=get_tags(),
        )
        # Policy to associate EIP and modify routes. EC2 address/route
        # actions don't support resource-level scoping well, hence "*".
        aws.iam.RolePolicy(
            f"{PROJECT_NAME}-nat-policy",
            role=role.id,
            policy=json.dumps({
                "Version": "2012-10-17",
                "Statement": [
                    {
                        "Effect": "Allow",
                        "Action": [
                            "ec2:AssociateAddress",
                            "ec2:DisassociateAddress",
                            "ec2:DescribeAddresses",
                        ],
                        "Resource": "*",
                    },
                    {
                        "Effect": "Allow",
                        "Action": [
                            "ec2:ReplaceRoute",
                            "ec2:CreateRoute",
                            "ec2:DescribeRouteTables",
                        ],
                        "Resource": "*",
                    },
                ],
            }),
        )
        return role

    def _create_instance_profile(self) -> aws.iam.InstanceProfile:
        """Create instance profile so the NAT instance can assume its role."""
        return aws.iam.InstanceProfile(
            f"{PROJECT_NAME}-nat-profile",
            name=f"{PROJECT_NAME}-nat-profile",
            role=self.iam_role.name,
        )

    def _create_launch_template(
        self,
        subnet_id: pulumi.Output[str],
        sg_id: pulumi.Output[str],
    ) -> aws.ec2.LaunchTemplate:
        """Create launch template for NAT instance ASG."""
        import base64  # local import; used once to encode user data below

        # FIXED: Use Amazon Linux 2023 ARM AMI (compatible with t4g.nano)
        ami = aws.ec2.get_ami(
            most_recent=True,
            owners=["amazon"],
            filters=[
                aws.ec2.GetAmiFilterArgs(
                    name="name",
                    values=["al2023-ami-*-arm64"],  # ARM64 for Graviton
                ),
                aws.ec2.GetAmiFilterArgs(
                    name="architecture",
                    values=["arm64"],
                ),
                aws.ec2.GetAmiFilterArgs(
                    name="virtualization-type",
                    values=["hvm"],
                ),
            ],
        )
        # User data script to configure NAT and associate the EIP. The EIP
        # allocation id is the only Pulumi-interpolated value.
        user_data = pulumi.Output.all(self.eip.allocation_id).apply(
            lambda args: f"""#!/bin/bash
set -e
# Install required packages
yum install -y iptables-services
# Enable IP forwarding
echo "net.ipv4.ip_forward = 1" >> /etc/sysctl.conf
sysctl -p
# Configure iptables for NAT
iptables -t nat -A POSTROUTING -o ens5 -j MASQUERADE
iptables -A FORWARD -i ens5 -o ens5 -m state --state RELATED,ESTABLISHED -j ACCEPT
iptables -A FORWARD -i ens5 -o ens5 -j ACCEPT
# Save iptables rules
service iptables save
systemctl enable iptables
# FIXED: IMDSv2 is enforced (http_tokens=required), so metadata requests
# must carry a session token; token-less requests return 401.
TOKEN=$(curl -s -X PUT "http://169.254.169.254/latest/api/token" -H "X-aws-ec2-metadata-token-ttl-seconds: 21600")
INSTANCE_ID=$(curl -s -H "X-aws-ec2-metadata-token: $TOKEN" http://169.254.169.254/latest/meta-data/instance-id)
REGION=$(curl -s -H "X-aws-ec2-metadata-token: $TOKEN" http://169.254.169.254/latest/meta-data/placement/region)
# Associate Elastic IP
aws ec2 associate-address --instance-id $INSTANCE_ID --allocation-id {args[0]} --region $REGION
# Disable source/dest check (required for NAT)
aws ec2 modify-instance-attribute --instance-id $INSTANCE_ID --no-source-dest-check --region $REGION
# Signal success to CloudFormation/ASG
/opt/aws/bin/cfn-signal -e $? --region $REGION || true
"""
        )
        return aws.ec2.LaunchTemplate(
            f"{PROJECT_NAME}-nat-lt",
            name=f"{PROJECT_NAME}-nat-lt",
            image_id=ami.id,
            instance_type="t4g.nano",  # ARM instance - matches ARM AMI
            vpc_security_group_ids=[sg_id],
            iam_instance_profile=aws.ec2.LaunchTemplateIamInstanceProfileArgs(
                arn=self.instance_profile.arn,
            ),
            # Launch template user data must be base64-encoded.
            # FIXED: use the base64 module directly instead of __import__().
            user_data=user_data.apply(
                lambda ud: base64.b64encode(ud.encode()).decode()
            ),
            metadata_options=aws.ec2.LaunchTemplateMetadataOptionsArgs(
                http_endpoint="enabled",
                http_tokens="required",  # IMDSv2 required
            ),
            monitoring=aws.ec2.LaunchTemplateMonitoringArgs(
                enabled=True,
            ),
            tag_specifications=[
                aws.ec2.LaunchTemplateTagSpecificationArgs(
                    resource_type="instance",
                    tags=get_tags() | {"Name": f"{PROJECT_NAME}-nat-instance"},
                ),
            ],
            tags=get_tags(),
        )

    def _create_asg(
        self,
        subnet_ids: list[pulumi.Output[str]],
    ) -> aws.autoscaling.Group:
        """Create Auto Scaling Group for NAT instance HA (self-healing, 1 instance)."""
        return aws.autoscaling.Group(
            f"{PROJECT_NAME}-nat-asg",
            name=f"{PROJECT_NAME}-nat-asg",
            min_size=1,
            max_size=1,
            desired_capacity=1,
            vpc_zone_identifiers=subnet_ids,
            launch_template=aws.autoscaling.GroupLaunchTemplateArgs(
                id=self.launch_template.id,
                version="$Latest",
            ),
            health_check_type="EC2",
            health_check_grace_period=300,
            termination_policies=["OldestInstance"],
            tags=[
                aws.autoscaling.GroupTagArgs(
                    key="Name",
                    value=f"{PROJECT_NAME}-nat-instance",
                    propagate_at_launch=True,
                ),
                aws.autoscaling.GroupTagArgs(
                    key="Application",
                    value=PROJECT_NAME,
                    propagate_at_launch=True,
                ),
            ],
        )

    def _create_private_route_table(
        self,
        vpc_id: pulumi.Output[str],
    ) -> aws.ec2.RouteTable:
        """Create private route table (route will be added by Lambda on instance launch)."""
        return aws.ec2.RouteTable(
            f"{PROJECT_NAME}-private-rt",
            vpc_id=vpc_id,
            # Note: Default route added by Lambda when NAT instance launches
            tags=get_tags() | {"Name": f"{PROJECT_NAME}-private-rt"},
        )

    def _associate_subnets(
        self,
        subnet_ids: list[pulumi.Output[str]],
    ):
        """Associate private subnets with the NAT route table."""
        for i, subnet_id in enumerate(subnet_ids):
            aws.ec2.RouteTableAssociation(
                f"{PROJECT_NAME}-private-rta-{i+1}",
                subnet_id=subnet_id,
                route_table_id=self.route_table.id,
            )

    def _create_lifecycle_hook(self) -> aws.autoscaling.LifecycleHook:
        """Create lifecycle hook to trigger route update on instance launch."""
        return aws.autoscaling.LifecycleHook(
            f"{PROJECT_NAME}-nat-lifecycle-hook",
            name=f"{PROJECT_NAME}-nat-launching",
            autoscaling_group_name=self.asg.name,
            lifecycle_transition="autoscaling:EC2_INSTANCE_LAUNCHING",
            heartbeat_timeout=300,
            # CONTINUE: even if the Lambda fails, let the instance launch
            # rather than terminating it in a loop.
            default_result="CONTINUE",
        )

    def _create_route_update_lambda(
        self,
        vpc_id: pulumi.Output[str],
    ) -> aws.lambda_.Function:
        """Create Lambda function to update routes when NAT instance changes."""
        # IAM role for Lambda
        assume_role = json.dumps({
            "Version": "2012-10-17",
            "Statement": [{
                "Effect": "Allow",
                "Principal": {"Service": "lambda.amazonaws.com"},
                "Action": "sts:AssumeRole",
            }],
        })
        role = aws.iam.Role(
            f"{PROJECT_NAME}-nat-route-lambda-role",
            assume_role_policy=assume_role,
            tags=get_tags(),
        )
        aws.iam.RolePolicyAttachment(
            f"{PROJECT_NAME}-nat-route-lambda-basic",
            role=role.name,
            policy_arn="arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole",
        )
        aws.iam.RolePolicy(
            f"{PROJECT_NAME}-nat-route-lambda-policy",
            role=role.id,
            policy=pulumi.Output.all(self.route_table.id).apply(lambda args: json.dumps({
                "Version": "2012-10-17",
                "Statement": [
                    {
                        "Effect": "Allow",
                        "Action": [
                            "ec2:CreateRoute",
                            "ec2:ReplaceRoute",
                            "ec2:DeleteRoute",
                            "ec2:DescribeRouteTables",
                            "ec2:DescribeInstances",
                        ],
                        "Resource": "*",
                    },
                    {
                        "Effect": "Allow",
                        "Action": [
                            "autoscaling:CompleteLifecycleAction",
                        ],
                        "Resource": "*",
                    },
                ],
            })),
        )
        # Inline Lambda handler: waits for the new instance, repoints the
        # default route, then completes the ASG lifecycle action.
        lambda_code = """
import boto3
import json
import os

def handler(event, context):
    print(f"Received event: {json.dumps(event)}")
    ec2 = boto3.client('ec2')
    asg = boto3.client('autoscaling')
    route_table_id = os.environ['ROUTE_TABLE_ID']
    # Get the instance ID from the event
    detail = event.get('detail', {})
    instance_id = detail.get('EC2InstanceId')
    asg_name = detail.get('AutoScalingGroupName')
    lifecycle_hook = detail.get('LifecycleHookName')
    lifecycle_token = detail.get('LifecycleActionToken')
    if not instance_id:
        print("No instance ID found in event")
        return
    try:
        # Wait for instance to be running
        waiter = ec2.get_waiter('instance_running')
        waiter.wait(InstanceIds=[instance_id], WaiterConfig={'Delay': 5, 'MaxAttempts': 60})
        # Update route table to point to new NAT instance
        try:
            ec2.replace_route(
                RouteTableId=route_table_id,
                DestinationCidrBlock='0.0.0.0/0',
                InstanceId=instance_id,
            )
            print(f"Replaced route to {instance_id}")
        except ec2.exceptions.ClientError as e:
            if 'InvalidParameterValue' in str(e):
                # Route doesn't exist, create it
                ec2.create_route(
                    RouteTableId=route_table_id,
                    DestinationCidrBlock='0.0.0.0/0',
                    InstanceId=instance_id,
                )
                print(f"Created route to {instance_id}")
            else:
                raise
        # Complete lifecycle action
        if lifecycle_hook and lifecycle_token:
            asg.complete_lifecycle_action(
                LifecycleHookName=lifecycle_hook,
                AutoScalingGroupName=asg_name,
                LifecycleActionToken=lifecycle_token,
                LifecycleActionResult='CONTINUE',
            )
            print("Completed lifecycle action")
        return {'statusCode': 200, 'body': 'Route updated successfully'}
    except Exception as e:
        print(f"Error: {str(e)}")
        raise
"""
        func = aws.lambda_.Function(
            f"{PROJECT_NAME}-update-nat-routes",
            function_name=f"{PROJECT_NAME}-update-nat-routes",
            runtime="python3.11",
            handler="index.handler",
            role=role.arn,
            # Timeout must cover the instance_running waiter (up to 5 min);
            # NOTE(review): 120s may be tight for a slow launch — confirm.
            timeout=120,
            memory_size=128,
            environment=aws.lambda_.FunctionEnvironmentArgs(
                variables={
                    "ROUTE_TABLE_ID": self.route_table.id,
                },
            ),
            code=pulumi.AssetArchive({
                "index.py": pulumi.StringAsset(lambda_code),
            }),
            tags=get_tags("lambda"),
        )
        # EventBridge rule to trigger Lambda on ASG lifecycle events
        rule = aws.cloudwatch.EventRule(
            f"{PROJECT_NAME}-nat-lifecycle-rule",
            name=f"{PROJECT_NAME}-nat-lifecycle",
            event_pattern=json.dumps({
                "source": ["aws.autoscaling"],
                "detail-type": ["EC2 Instance-launch Lifecycle Action"],
                "detail": {
                    "AutoScalingGroupName": [f"{PROJECT_NAME}-nat-asg"],
                },
            }),
            tags=get_tags(),
        )
        aws.cloudwatch.EventTarget(
            f"{PROJECT_NAME}-nat-lifecycle-target",
            rule=rule.name,
            arn=func.arn,
        )
        aws.lambda_.Permission(
            f"{PROJECT_NAME}-nat-lambda-permission",
            action="lambda:InvokeFunction",
            function=func.name,
            principal="events.amazonaws.com",
            source_arn=rule.arn,
        )
        return func
4. ECS Cluster and Services¶
compute/ecs.py¶
"""ECS Fargate cluster and service definitions."""
import json
import pulumi
import pulumi_aws as aws
from config import (
PROJECT_NAME,
ENVIRONMENT,
ECS_CLUSTER_NAME,
SERVICES,
get_tags,
)
class EcsCluster:
    """Creates ECS Fargate cluster with services.

    One task definition + service per entry in ``config.SERVICES``.
    Services with a target group are registered behind the ALB; the
    strategy-service runs with desired_count=0 (started on demand).
    """

    def __init__(
        self,
        private_subnet_ids: list[pulumi.Output[str]],
        security_group_id: pulumi.Output[str],
        execution_role_arn: pulumi.Output[str],
        task_role_arn: pulumi.Output[str],
        target_groups: dict,
    ):
        self.cluster = self._create_cluster()
        self.log_group = self._create_log_group()
        self.services = {}
        for service_name, config in SERVICES.items():
            self.services[service_name] = self._create_service(
                service_name,
                config,
                private_subnet_ids,
                security_group_id,
                execution_role_arn,
                task_role_arn,
                # Services without a target group (e.g. strategy-service)
                # simply get no load balancer attachment.
                target_groups.get(service_name),
            )

    def _create_cluster(self) -> aws.ecs.Cluster:
        return aws.ecs.Cluster(
            f"{PROJECT_NAME}-cluster",
            name=ECS_CLUSTER_NAME,
            settings=[
                aws.ecs.ClusterSettingArgs(
                    name="containerInsights",
                    value="enabled",
                )
            ],
            tags=get_tags(),
        )

    def _create_log_group(self) -> aws.cloudwatch.LogGroup:
        # FIXED: use the canonical environment-aware retention instead of a
        # hard-coded 7 days, so staging keeps 14 and prod 30 days
        # (config Section 10). Local import keeps this change self-contained.
        from config import LOG_RETENTION_DAYS
        return aws.cloudwatch.LogGroup(
            f"{PROJECT_NAME}-logs",
            name=f"/ecs/{PROJECT_NAME}",
            retention_in_days=LOG_RETENTION_DAYS,
            tags=get_tags(),
        )

    def _create_service(
        self,
        service_name: str,
        config: dict,
        subnet_ids: list[pulumi.Output[str]],
        sg_id: pulumi.Output[str],
        execution_role_arn: pulumi.Output[str],
        task_role_arn: pulumi.Output[str],
        target_group: aws.lb.TargetGroup | None,
    ) -> dict:
        """Create one task definition + ECS service pair.

        Returns a dict with keys ``task_definition`` and ``service``.
        """
        # Task definition container JSON.
        # NOTE(review): the ${AWS_ACCOUNT_ID}/${AWS_REGION} placeholders are
        # emitted literally — Pulumi does not substitute them; presumably a
        # CI/templating step does. Confirm, or interpolate real values here.
        container_def = json.dumps([{
            "name": service_name,
            "image": f"${{AWS_ACCOUNT_ID}}.dkr.ecr.${{AWS_REGION}}.amazonaws.com/{PROJECT_NAME}-{service_name}:latest",
            "cpu": config["cpu"],
            "memory": config["memory"],
            "essential": True,
            "portMappings": [{
                "containerPort": config["port"],
                "protocol": "tcp",
            }],
            "environment": [
                {"name": "ENVIRONMENT", "value": ENVIRONMENT},
                {"name": "SERVICE_NAME", "value": service_name},
            ],
            "secrets": [
                {
                    "name": "DATABASE_URL",
                    "valueFrom": f"arn:aws:secretsmanager:${{AWS_REGION}}:${{AWS_ACCOUNT_ID}}:secret:{PROJECT_NAME}/database-url",
                },
            ],
            "logConfiguration": {
                "logDriver": "awslogs",
                "options": {
                    "awslogs-group": f"/ecs/{PROJECT_NAME}",
                    "awslogs-region": "${AWS_REGION}",
                    "awslogs-stream-prefix": service_name,
                },
            },
            "healthCheck": {
                "command": ["CMD-SHELL", f"curl -f http://localhost:{config['port']}{config['health_check_path']} || exit 1"],
                "interval": 30,
                "timeout": 5,
                "retries": 3,
                "startPeriod": 60,
            },
        }])
        task_def = aws.ecs.TaskDefinition(
            f"{PROJECT_NAME}-{service_name}-task",
            family=f"{PROJECT_NAME}-{service_name}",
            cpu=str(config["cpu"]),
            memory=str(config["memory"]),
            network_mode="awsvpc",
            requires_compatibilities=["FARGATE"],
            execution_role_arn=execution_role_arn,
            task_role_arn=task_role_arn,
            container_definitions=container_def,
            tags=get_tags(service_name),
        )
        # Attach to the ALB only when a target group exists for this service.
        load_balancers = []
        if target_group:
            load_balancers = [
                aws.ecs.ServiceLoadBalancerArgs(
                    target_group_arn=target_group.arn,
                    container_name=service_name,
                    container_port=config["port"],
                )
            ]
        service = aws.ecs.Service(
            f"{PROJECT_NAME}-{service_name}-service",
            name=service_name,
            cluster=self.cluster.arn,
            task_definition=task_def.arn,
            desired_count=config["desired_count"],
            launch_type="FARGATE",
            network_configuration=aws.ecs.ServiceNetworkConfigurationArgs(
                subnets=subnet_ids,
                security_groups=[sg_id],
                assign_public_ip=False,  # tasks reach out via the NAT instance
            ),
            load_balancers=load_balancers,
            # Grace period is only valid when a load balancer is attached.
            health_check_grace_period_seconds=60 if load_balancers else None,
            tags=get_tags(service_name),
        )
        return {"task_definition": task_def, "service": service}
5. Lambda Functions¶
compute/lambda_funcs.py¶
"""Lambda function definitions."""
import json
import pulumi
import pulumi_aws as aws
from config import PROJECT_NAME, ENVIRONMENT, get_tags
class LambdaFunctions:
    """Creates Lambda functions for TradAI.

    All functions run inside the VPC (private subnets + Lambda SG) so they
    can reach the ECS services via service-discovery DNS names.
    """

    def __init__(
        self,
        subnet_ids: list[pulumi.Output[str]],
        security_group_id: pulumi.Output[str],
        role_arn: pulumi.Output[str],
        sqs_queue_arn: pulumi.Output[str],
    ):
        self.sqs_consumer = self._create_sqs_consumer(
            subnet_ids, security_group_id, role_arn, sqs_queue_arn
        )
        self.data_collection_proxy = self._create_data_collection_proxy(
            subnet_ids, security_group_id, role_arn
        )
        self.backtest_trigger = self._create_backtest_trigger(
            subnet_ids, security_group_id, role_arn
        )

    def _create_sqs_consumer(
        self,
        subnet_ids: list[pulumi.Output[str]],
        sg_id: pulumi.Output[str],
        role_arn: pulumi.Output[str],
        sqs_queue_arn: pulumi.Output[str],
    ) -> aws.lambda_.Function:
        """Lambda that drains the work queue and forwards to the backend API."""
        func = aws.lambda_.Function(
            f"{PROJECT_NAME}-sqs-consumer",
            function_name=f"{PROJECT_NAME}-sqs-consumer",
            runtime="python3.11",
            handler="handler.lambda_handler",
            role=role_arn,
            timeout=30,
            memory_size=256,
            vpc_config=aws.lambda_.FunctionVpcConfigArgs(
                subnet_ids=subnet_ids,
                security_group_ids=[sg_id],
            ),
            environment=aws.lambda_.FunctionEnvironmentArgs(
                variables={
                    "ENVIRONMENT": ENVIRONMENT,
                    "BACKEND_API_URL": "http://backend-api.tradai.local:8000",
                },
            ),
            code=pulumi.AssetArchive({
                ".": pulumi.FileArchive("./lambda/sqs-consumer"),
            }),
            tags=get_tags("sqs-consumer"),
        )
        # SQS trigger; batch_size=1 so each backtest message is processed
        # (and retried/dead-lettered) independently.
        aws.lambda_.EventSourceMapping(
            f"{PROJECT_NAME}-sqs-trigger",
            event_source_arn=sqs_queue_arn,
            function_name=func.name,
            batch_size=1,
        )
        return func

    def _create_data_collection_proxy(
        self,
        subnet_ids: list[pulumi.Output[str]],
        sg_id: pulumi.Output[str],
        role_arn: pulumi.Output[str],
    ) -> aws.lambda_.Function:
        """Lambda proxy in front of the data-collection ECS service."""
        return aws.lambda_.Function(
            f"{PROJECT_NAME}-data-collection-proxy",
            function_name=f"{PROJECT_NAME}-data-collection-proxy",
            runtime="python3.11",
            handler="handler.lambda_handler",
            role=role_arn,
            timeout=60,
            memory_size=256,
            vpc_config=aws.lambda_.FunctionVpcConfigArgs(
                subnet_ids=subnet_ids,
                security_group_ids=[sg_id],
            ),
            environment=aws.lambda_.FunctionEnvironmentArgs(
                variables={
                    "ENVIRONMENT": ENVIRONMENT,
                    # FIXED: canonical data-collection port is 8002 (was
                    # 8001) — matches config.SERVICES and the ECS/Lambda
                    # security-group rules, which only open 8002.
                    "DATA_COLLECTION_URL": "http://data-collection.tradai.local:8002",
                },
            ),
            code=pulumi.AssetArchive({
                ".": pulumi.FileArchive("./lambda/data-collection-proxy"),
            }),
            tags=get_tags("data-collection-proxy"),
        )

    def _create_backtest_trigger(
        self,
        subnet_ids: list[pulumi.Output[str]],
        sg_id: pulumi.Output[str],
        role_arn: pulumi.Output[str],
    ) -> aws.lambda_.Function:
        """Lambda that starts the backtest Step Functions workflow."""
        return aws.lambda_.Function(
            f"{PROJECT_NAME}-backtest-trigger",
            function_name=f"{PROJECT_NAME}-backtest-trigger",
            runtime="python3.11",
            handler="handler.lambda_handler",
            role=role_arn,
            timeout=30,
            memory_size=128,
            vpc_config=aws.lambda_.FunctionVpcConfigArgs(
                subnet_ids=subnet_ids,
                security_group_ids=[sg_id],
            ),
            environment=aws.lambda_.FunctionEnvironmentArgs(
                variables={
                    "ENVIRONMENT": ENVIRONMENT,
                    # NOTE(review): ${AWS_REGION}/${AWS_ACCOUNT_ID} are
                    # emitted literally, not substituted by Pulumi —
                    # presumably replaced by a deploy step; confirm.
                    "STEP_FUNCTION_ARN": f"arn:aws:states:${{AWS_REGION}}:${{AWS_ACCOUNT_ID}}:stateMachine:{PROJECT_NAME}-backtest-workflow",
                },
            ),
            code=pulumi.AssetArchive({
                ".": pulumi.FileArchive("./lambda/backtest-trigger"),
            }),
            tags=get_tags("backtest-trigger"),
        )
6. Step Functions¶
orchestration/step_functions.py¶
"""Step Functions state machine definitions."""
import json
import pulumi
import pulumi_aws as aws
from config import PROJECT_NAME, ENVIRONMENT, get_tags
class StepFunctions:
    """Creates Step Functions state machines for TradAI workflows."""

    def __init__(
        self,
        role_arn: pulumi.Output[str],
        ecs_cluster_arn: pulumi.Output[str],
        task_definition_arn: pulumi.Output[str],
        subnet_ids: list[pulumi.Output[str]],
        security_group_id: pulumi.Output[str],
    ):
        """Build the backtest workflow.

        Args:
            role_arn: Role assumed by the state machine.
            ecs_cluster_arn: Cluster the backtest Fargate task runs on.
            task_definition_arn: Task definition for the strategy container.
            subnet_ids: Private subnets for the task's awsvpc networking.
            security_group_id: Security group attached to the task.
        """
        self.backtest_workflow = self._create_backtest_workflow(
            role_arn,
            ecs_cluster_arn,
            task_definition_arn,
            subnet_ids,
            security_group_id,
        )

    def _create_backtest_workflow(
        self,
        role_arn: pulumi.Output[str],
        cluster_arn: pulumi.Output[str],
        task_def_arn: pulumi.Output[str],
        subnet_ids: list[pulumi.Output[str]],
        sg_id: pulumi.Output[str],
    ) -> aws.sfn.StateMachine:
        """Create the backtest state machine.

        Flow: record the backtest in DynamoDB -> validate strategy and data in
        parallel -> mark RUNNING -> run the ECS task synchronously on
        FARGATE_SPOT -> mark COMPLETED. Validation/execution failures update
        the DynamoDB row before ending in the Fail state.
        """
        # State machine definition. Note that cluster_arn, task_def_arn,
        # subnet_ids and sg_id are pulumi Outputs embedded in this dict.
        definition = {
            "Comment": "TradAI Backtest Workflow v2.0",
            "StartAt": "InitializeBacktest",
            "States": {
                "InitializeBacktest": {
                    "Type": "Task",
                    "Resource": "arn:aws:states:::dynamodb:putItem",
                    "Parameters": {
                        "TableName": f"{PROJECT_NAME}-backtests",
                        "Item": {
                            "backtest_id": {"S.$": "$.backtest_id"},
                            "status": {"S": "INITIALIZING"},
                            "created_at": {"S.$": "$$.State.EnteredTime"},
                            "strategy_name": {"S.$": "$.strategy_name"},
                            "config": {"S.$": "States.JsonToString($.config)"},
                        },
                    },
                    "ResultPath": "$.dynamodb_result",
                    "Next": "ValidateInputs",
                },
                "ValidateInputs": {
                    "Type": "Parallel",
                    "Branches": [
                        {
                            "StartAt": "ValidateStrategy",
                            "States": {
                                "ValidateStrategy": {
                                    "Type": "Task",
                                    "Resource": "arn:aws:states:::lambda:invoke",
                                    "Parameters": {
                                        "FunctionName": f"{PROJECT_NAME}-validate-strategy",
                                        "Payload.$": "$",
                                    },
                                    "ResultPath": "$.strategy_validation",
                                    "End": True,
                                },
                            },
                        },
                        {
                            "StartAt": "ValidateData",
                            "States": {
                                "ValidateData": {
                                    "Type": "Task",
                                    "Resource": "arn:aws:states:::lambda:invoke",
                                    "Parameters": {
                                        "FunctionName": f"{PROJECT_NAME}-validate-data",
                                        "Payload.$": "$",
                                    },
                                    "ResultPath": "$.data_validation",
                                    "End": True,
                                },
                            },
                        },
                    ],
                    "ResultPath": "$.validation_results",
                    "Next": "UpdateStatusToRunning",
                    "Catch": [
                        {
                            "ErrorEquals": ["States.ALL"],
                            "ResultPath": "$.error",
                            "Next": "HandleValidationError",
                        }
                    ],
                },
                "HandleValidationError": {
                    "Type": "Task",
                    "Resource": "arn:aws:states:::dynamodb:updateItem",
                    "Parameters": {
                        "TableName": f"{PROJECT_NAME}-backtests",
                        "Key": {"backtest_id": {"S.$": "$.backtest_id"}},
                        "UpdateExpression": "SET #status = :status, error_message = :error",
                        "ExpressionAttributeNames": {"#status": "status"},
                        "ExpressionAttributeValues": {
                            ":status": {"S": "VALIDATION_FAILED"},
                            ":error": {"S.$": "States.JsonToString($.error)"},
                        },
                    },
                    "Next": "BacktestFailed",
                },
                "UpdateStatusToRunning": {
                    "Type": "Task",
                    "Resource": "arn:aws:states:::dynamodb:updateItem",
                    "Parameters": {
                        "TableName": f"{PROJECT_NAME}-backtests",
                        "Key": {"backtest_id": {"S.$": "$.backtest_id"}},
                        "UpdateExpression": "SET #status = :status",
                        "ExpressionAttributeNames": {"#status": "status"},
                        "ExpressionAttributeValues": {
                            ":status": {"S": "RUNNING"},
                        },
                    },
                    "ResultPath": None,
                    "Next": "RunBacktest",
                },
                "RunBacktest": {
                    "Type": "Task",
                    # .sync: wait for the ECS task to finish before moving on.
                    "Resource": "arn:aws:states:::ecs:runTask.sync",
                    "Parameters": {
                        "Cluster": cluster_arn,
                        "TaskDefinition": task_def_arn,
                        "LaunchType": "FARGATE",
                        "NetworkConfiguration": {
                            "AwsvpcConfiguration": {
                                "Subnets": subnet_ids,
                                "SecurityGroups": [sg_id],
                                "AssignPublicIp": "DISABLED",
                            }
                        },
                        "Overrides": {
                            "ContainerOverrides": [
                                {
                                    "Name": "strategy-container",
                                    "Environment": [
                                        {"Name": "BACKTEST_ID", "Value.$": "$.backtest_id"},
                                        {"Name": "STRATEGY_NAME", "Value.$": "$.strategy_name"},
                                        {"Name": "CONFIG", "Value.$": "States.JsonToString($.config)"},
                                    ],
                                }
                            ]
                        },
                        "CapacityProviderStrategy": [
                            {
                                "CapacityProvider": "FARGATE_SPOT",
                                "Weight": 1,
                            }
                        ],
                    },
                    "ResultPath": "$.ecs_result",
                    "Next": "UpdateStatusToCompleted",
                    "Retry": [
                        {
                            "ErrorEquals": ["ECS.AmazonECSException"],
                            "IntervalSeconds": 30,
                            "MaxAttempts": 3,
                            "BackoffRate": 2,
                        }
                    ],
                    "Catch": [
                        {
                            "ErrorEquals": ["States.ALL"],
                            "ResultPath": "$.error",
                            "Next": "HandleBacktestError",
                        }
                    ],
                },
                "HandleBacktestError": {
                    "Type": "Task",
                    "Resource": "arn:aws:states:::dynamodb:updateItem",
                    "Parameters": {
                        "TableName": f"{PROJECT_NAME}-backtests",
                        "Key": {"backtest_id": {"S.$": "$.backtest_id"}},
                        "UpdateExpression": "SET #status = :status, error_message = :error, completed_at = :completed",
                        "ExpressionAttributeNames": {"#status": "status"},
                        "ExpressionAttributeValues": {
                            ":status": {"S": "FAILED"},
                            ":error": {"S.$": "States.JsonToString($.error)"},
                            ":completed": {"S.$": "$$.State.EnteredTime"},
                        },
                    },
                    "Next": "BacktestFailed",
                },
                "UpdateStatusToCompleted": {
                    "Type": "Task",
                    "Resource": "arn:aws:states:::dynamodb:updateItem",
                    "Parameters": {
                        "TableName": f"{PROJECT_NAME}-backtests",
                        "Key": {"backtest_id": {"S.$": "$.backtest_id"}},
                        "UpdateExpression": "SET #status = :status, completed_at = :completed",
                        "ExpressionAttributeNames": {"#status": "status"},
                        "ExpressionAttributeValues": {
                            ":status": {"S": "COMPLETED"},
                            ":completed": {"S.$": "$$.State.EnteredTime"},
                        },
                    },
                    "ResultPath": None,
                    "Next": "BacktestSucceeded",
                },
                "BacktestSucceeded": {
                    "Type": "Succeed",
                },
                "BacktestFailed": {
                    "Type": "Fail",
                    "Error": "BacktestFailed",
                    "Cause": "Backtest execution failed",
                },
            },
        }
        # Resolve region/account for the CloudWatch Logs destination ARN.
        # BUG FIX: previously an f-string emitted the literal "${AWS_REGION}"
        # / "${AWS_ACCOUNT_ID}" into the ARN.
        region = aws.get_region().name
        account_id = aws.get_caller_identity().account_id
        return aws.sfn.StateMachine(
            f"{PROJECT_NAME}-backtest-workflow",
            name=f"{PROJECT_NAME}-backtest-workflow",
            role_arn=role_arn,
            type="STANDARD",
            # BUG FIX: json.dumps() cannot serialize the pulumi Outputs embedded
            # in `definition` (cluster/task-def ARNs, subnets, security group).
            # Output.json_dumps resolves nested Outputs before serializing.
            definition=pulumi.Output.json_dumps(definition),
            logging_configuration=aws.sfn.StateMachineLoggingConfigurationArgs(
                level="ERROR",
                include_execution_data=True,
                log_destination=(
                    f"arn:aws:logs:{region}:{account_id}:"
                    f"log-group:/aws/states/{PROJECT_NAME}:*"
                ),
            ),
            tags=get_tags("step-functions"),
        )
7. Storage¶
storage/rds.py¶
"""RDS PostgreSQL database."""
import pulumi
import pulumi_aws as aws
from config import PROJECT_NAME, ENVIRONMENT, RDS_CONFIG, get_tags
class RdsDatabase:
    """Creates RDS PostgreSQL instance."""

    def __init__(
        self,
        subnet_group_name: pulumi.Output[str],
        security_group_id: pulumi.Output[str],
        secret_arn: pulumi.Output[str],
    ):
        # The parameter group is created first so the instance can reference it.
        self.parameter_group = self._create_parameter_group()
        self.instance = self._create_instance(
            subnet_group_name,
            security_group_id,
            secret_arn,
        )

    def _create_parameter_group(self) -> aws.rds.ParameterGroup:
        """Postgres 15 parameter group with DDL and slow-query logging."""
        # (name, value) pairs: log every DDL statement, and any query > 1 second.
        logging_settings = [
            ("log_statement", "ddl"),
            ("log_min_duration_statement", "1000"),
        ]
        return aws.rds.ParameterGroup(
            f"{PROJECT_NAME}-pg-params",
            family="postgres15",
            description="TradAI PostgreSQL parameters",
            parameters=[
                aws.rds.ParameterGroupParameterArgs(name=key, value=val)
                for key, val in logging_settings
            ],
            tags=get_tags(),
        )

    def _create_instance(
        self,
        subnet_group_name: pulumi.Output[str],
        sg_id: pulumi.Output[str],
        secret_arn: pulumi.Output[str],
    ) -> aws.rds.Instance:
        """Encrypted gp3 Postgres instance.

        Prod gets deletion protection and a final snapshot; non-prod skips both.

        NOTE(review): secret_arn is accepted but never used here — the master
        password is managed by RDS itself (manage_master_user_password=True).
        Confirm whether the parameter can be dropped at the call site.
        """
        is_prod = ENVIRONMENT == "prod"
        return aws.rds.Instance(
            f"{PROJECT_NAME}-postgres",
            identifier=f"{PROJECT_NAME}-{ENVIRONMENT}",
            engine="postgres",
            engine_version=RDS_CONFIG["engine_version"],
            instance_class=RDS_CONFIG["instance_class"],
            allocated_storage=RDS_CONFIG["allocated_storage"],
            storage_type="gp3",
            storage_encrypted=True,
            db_subnet_group_name=subnet_group_name,
            vpc_security_group_ids=[sg_id],
            parameter_group_name=self.parameter_group.name,
            db_name="tradai",
            username="tradai_admin",
            manage_master_user_password=True,
            master_user_secret_kms_key_id=None,  # use the default KMS key
            multi_az=RDS_CONFIG["multi_az"],
            backup_retention_period=7,
            backup_window="03:00-04:00",
            maintenance_window="Mon:04:00-Mon:05:00",
            deletion_protection=is_prod,
            skip_final_snapshot=not is_prod,
            final_snapshot_identifier=f"{PROJECT_NAME}-final-snapshot" if is_prod else None,
            enabled_cloudwatch_logs_exports=["postgresql"],
            performance_insights_enabled=True,
            performance_insights_retention_period=7,
            tags=get_tags("rds"),
        )
storage/s3.py¶
"""S3 buckets for TradAI."""
import pulumi
import pulumi_aws as aws
from config import PROJECT_NAME, ENVIRONMENT, get_tags
class S3Buckets:
    """Creates S3 buckets with lifecycle policies.

    CONSISTENCY FIX: previously only the data bucket had a public-access
    block, and the mlflow bucket had neither encryption nor a public-access
    block. All three buckets now get SSE-S3 encryption and a public-access
    block via shared helpers (existing Pulumi resource names are preserved
    so already-deployed resources are not replaced).
    """

    def __init__(self):
        self.data_bucket = self._create_data_bucket()
        self.results_bucket = self._create_results_bucket()
        self.mlflow_bucket = self._create_mlflow_bucket()

    @staticmethod
    def _enable_encryption(prefix: str, bucket: aws.s3.BucketV2) -> None:
        """Apply default SSE-S3 (AES256) encryption to *bucket*."""
        aws.s3.BucketServerSideEncryptionConfigurationV2(
            f"{PROJECT_NAME}-{prefix}-encryption",
            bucket=bucket.id,
            rules=[
                aws.s3.BucketServerSideEncryptionConfigurationV2RuleArgs(
                    apply_server_side_encryption_by_default=aws.s3.BucketServerSideEncryptionConfigurationV2RuleApplyServerSideEncryptionByDefaultArgs(
                        sse_algorithm="AES256",
                    ),
                )
            ],
        )

    @staticmethod
    def _block_public_access(prefix: str, bucket: aws.s3.BucketV2) -> None:
        """Block all public ACLs and policies on *bucket*."""
        aws.s3.BucketPublicAccessBlock(
            f"{PROJECT_NAME}-{prefix}-public-block",
            bucket=bucket.id,
            block_public_acls=True,
            block_public_policy=True,
            ignore_public_acls=True,
            restrict_public_buckets=True,
        )

    def _create_data_bucket(self) -> aws.s3.BucketV2:
        """Versioned, encrypted, private bucket for market data."""
        bucket = aws.s3.BucketV2(
            f"{PROJECT_NAME}-data",
            bucket=f"{PROJECT_NAME}-data-{ENVIRONMENT}",
            tags=get_tags("data"),
        )
        # Versioning protects against accidental overwrite/delete.
        aws.s3.BucketVersioningV2(
            f"{PROJECT_NAME}-data-versioning",
            bucket=bucket.id,
            versioning_configuration=aws.s3.BucketVersioningV2VersioningConfigurationArgs(
                status="Enabled",
            ),
        )
        self._enable_encryption("data", bucket)
        self._block_public_access("data", bucket)
        return bucket

    def _create_results_bucket(self) -> aws.s3.BucketV2:
        """Backtest-results bucket: temp files expire, old results go to Glacier IR."""
        bucket = aws.s3.BucketV2(
            f"{PROJECT_NAME}-results",
            bucket=f"{PROJECT_NAME}-results-{ENVIRONMENT}",
            tags=get_tags("results"),
        )
        aws.s3.BucketLifecycleConfigurationV2(
            f"{PROJECT_NAME}-results-lifecycle",
            bucket=bucket.id,
            rules=[
                # Scratch output under temp/ is deleted after a week.
                aws.s3.BucketLifecycleConfigurationV2RuleArgs(
                    id="expire-temp-files",
                    status="Enabled",
                    filter=aws.s3.BucketLifecycleConfigurationV2RuleFilterArgs(
                        prefix="temp/",
                    ),
                    expiration=aws.s3.BucketLifecycleConfigurationV2RuleExpirationArgs(
                        days=7,
                    ),
                ),
                # Results older than 30 days move to Glacier Instant Retrieval.
                aws.s3.BucketLifecycleConfigurationV2RuleArgs(
                    id="archive-old-results",
                    status="Enabled",
                    filter=aws.s3.BucketLifecycleConfigurationV2RuleFilterArgs(
                        prefix="results/",
                    ),
                    transitions=[
                        aws.s3.BucketLifecycleConfigurationV2RuleTransitionArgs(
                            days=30,
                            storage_class="GLACIER_IR",
                        ),
                    ],
                ),
            ],
        )
        self._enable_encryption("results", bucket)
        # FIX: previously missing — results bucket had no public-access block.
        self._block_public_access("results", bucket)
        return bucket

    def _create_mlflow_bucket(self) -> aws.s3.BucketV2:
        """Versioned bucket backing the MLflow artifact store."""
        bucket = aws.s3.BucketV2(
            f"{PROJECT_NAME}-mlflow",
            bucket=f"{PROJECT_NAME}-mlflow-{ENVIRONMENT}",
            tags=get_tags("mlflow"),
        )
        # Versioning for experiment tracking.
        aws.s3.BucketVersioningV2(
            f"{PROJECT_NAME}-mlflow-versioning",
            bucket=bucket.id,
            versioning_configuration=aws.s3.BucketVersioningV2VersioningConfigurationArgs(
                status="Enabled",
            ),
        )
        # FIX: previously missing — mlflow bucket was unencrypted and had no
        # public-access block, unlike the other buckets.
        self._enable_encryption("mlflow", bucket)
        self._block_public_access("mlflow", bucket)
        return bucket
storage/dynamodb.py¶
"""DynamoDB tables for TradAI."""
import pulumi
import pulumi_aws as aws
from config import PROJECT_NAME, ENVIRONMENT, get_tags
class DynamoDBTables:
    """Creates DynamoDB tables for backtest tracking."""

    def __init__(self):
        self.backtests_table = self._create_backtests_table()

    def _create_backtests_table(self) -> aws.dynamodb.Table:
        """On-demand table keyed by backtest_id, with a status/created_at GSI.

        The GSI supports "list backtests in state X, newest first" queries;
        PITR, encryption at rest, and TTL (attribute "ttl") are all enabled.
        """
        table_name = f"{PROJECT_NAME}-backtests"
        # Every indexed attribute is string-typed: the hash key plus both GSI keys.
        key_attributes = ("backtest_id", "status", "created_at")
        return aws.dynamodb.Table(
            table_name,
            name=table_name,
            billing_mode="PAY_PER_REQUEST",
            hash_key="backtest_id",
            attributes=[
                aws.dynamodb.TableAttributeArgs(name=attr, type="S")
                for attr in key_attributes
            ],
            global_secondary_indexes=[
                aws.dynamodb.TableGlobalSecondaryIndexArgs(
                    name="status-created_at-index",
                    hash_key="status",
                    range_key="created_at",
                    projection_type="ALL",
                ),
            ],
            point_in_time_recovery=aws.dynamodb.TablePointInTimeRecoveryArgs(
                enabled=True,
            ),
            server_side_encryption=aws.dynamodb.TableServerSideEncryptionArgs(
                enabled=True,
            ),
            ttl=aws.dynamodb.TableTtlArgs(
                attribute_name="ttl",
                enabled=True,
            ),
            tags=get_tags("dynamodb"),
        )
8. SQS Queues¶
orchestration/sqs.py¶
"""SQS queues for async processing."""
import pulumi
import pulumi_aws as aws
from config import PROJECT_NAME, ENVIRONMENT, get_tags
class SqsQueues:
    """Creates SQS queues with DLQ."""

    def __init__(self):
        # The DLQ must exist first: the work queue's redrive policy needs its ARN.
        self.dlq = self._create_dlq()
        self.backtest_queue = self._create_backtest_queue()

    def _create_dlq(self) -> aws.sqs.Queue:
        """FIFO dead-letter queue; messages kept for the SQS maximum (14 days)."""
        dlq_name = f"{PROJECT_NAME}-backtest-dlq"
        return aws.sqs.Queue(
            dlq_name,
            name=f"{dlq_name}.fifo",
            fifo_queue=True,
            content_based_deduplication=True,
            message_retention_seconds=1209600,  # 14 days
            tags=get_tags("sqs-dlq"),
        )

    def _create_backtest_queue(self) -> aws.sqs.Queue:
        """Primary FIFO work queue; messages dead-letter after 3 failed receives."""
        redrive = self.dlq.arn.apply(
            lambda arn: f'{{"deadLetterTargetArn":"{arn}","maxReceiveCount":3}}'
        )
        queue_name = f"{PROJECT_NAME}-backtest-queue"
        return aws.sqs.Queue(
            queue_name,
            name=f"{queue_name}.fifo",
            fifo_queue=True,
            content_based_deduplication=True,
            visibility_timeout_seconds=300,  # 5 minutes
            message_retention_seconds=86400,  # 1 day
            redrive_policy=redrive,
            tags=get_tags("sqs"),
        )
9. IAM Roles and Policies¶
security/iam.py¶
"""IAM roles and policies."""
import json
import pulumi
import pulumi_aws as aws
from config import PROJECT_NAME, ENVIRONMENT, get_tags
class IamRoles:
    """Creates IAM roles for TradAI services.

    Four roles: ECS task execution, ECS task, Lambda, and Step Functions.
    The identical service trust policies are produced by a shared helper,
    and resources are scoped to the project prefix wherever the service
    APIs allow it.
    """

    def __init__(self):
        self.ecs_execution_role = self._create_ecs_execution_role()
        self.ecs_task_role = self._create_ecs_task_role()
        self.lambda_role = self._create_lambda_role()
        self.step_functions_role = self._create_step_functions_role()

    @staticmethod
    def _assume_role_policy(service: str) -> str:
        """Return a JSON trust policy allowing *service* to assume the role."""
        return json.dumps({
            "Version": "2012-10-17",
            "Statement": [{
                "Effect": "Allow",
                "Principal": {"Service": service},
                "Action": "sts:AssumeRole",
            }],
        })

    def _create_ecs_execution_role(self) -> aws.iam.Role:
        """Execution role: image pulls / log writes plus project secret reads."""
        role = aws.iam.Role(
            f"{PROJECT_NAME}-ecs-execution-role",
            name=f"{PROJECT_NAME}-ecs-execution-role",
            assume_role_policy=self._assume_role_policy("ecs-tasks.amazonaws.com"),
            tags=get_tags(),
        )
        # AWS-managed baseline: pull images from ECR, write container logs.
        aws.iam.RolePolicyAttachment(
            f"{PROJECT_NAME}-ecs-execution-policy",
            role=role.name,
            policy_arn="arn:aws:iam::aws:policy/service-role/AmazonECSTaskExecutionRolePolicy",
        )
        # Inject project secrets into containers at startup. (The previous
        # code bound this to an unused local variable; Pulumi tracks the
        # resource by registration, so no reference is needed.)
        aws.iam.RolePolicy(
            f"{PROJECT_NAME}-ecs-secrets-policy",
            role=role.id,
            policy=json.dumps({
                "Version": "2012-10-17",
                "Statement": [{
                    "Effect": "Allow",
                    "Action": [
                        "secretsmanager:GetSecretValue",
                    ],
                    "Resource": f"arn:aws:secretsmanager:*:*:secret:{PROJECT_NAME}/*",
                }],
            }),
        )
        return role

    def _create_ecs_task_role(self) -> aws.iam.Role:
        """Task role: S3 and DynamoDB access for the application containers."""
        role = aws.iam.Role(
            f"{PROJECT_NAME}-ecs-task-role",
            name=f"{PROJECT_NAME}-ecs-task-role",
            assume_role_policy=self._assume_role_policy("ecs-tasks.amazonaws.com"),
            tags=get_tags(),
        )
        # Read/write project buckets and tables only (prefix-scoped ARNs).
        aws.iam.RolePolicy(
            f"{PROJECT_NAME}-ecs-s3-policy",
            role=role.id,
            policy=json.dumps({
                "Version": "2012-10-17",
                "Statement": [
                    {
                        "Effect": "Allow",
                        "Action": [
                            "s3:GetObject",
                            "s3:PutObject",
                            "s3:ListBucket",
                        ],
                        "Resource": [
                            f"arn:aws:s3:::{PROJECT_NAME}-*",
                            f"arn:aws:s3:::{PROJECT_NAME}-*/*",
                        ],
                    },
                    {
                        "Effect": "Allow",
                        "Action": [
                            "dynamodb:GetItem",
                            "dynamodb:PutItem",
                            "dynamodb:UpdateItem",
                            "dynamodb:Query",
                        ],
                        "Resource": f"arn:aws:dynamodb:*:*:table/{PROJECT_NAME}-*",
                    },
                ],
            }),
        )
        return role

    def _create_lambda_role(self) -> aws.iam.Role:
        """Lambda role: VPC networking plus SQS/Step Functions/DynamoDB access."""
        role = aws.iam.Role(
            f"{PROJECT_NAME}-lambda-role",
            name=f"{PROJECT_NAME}-lambda-role",
            assume_role_policy=self._assume_role_policy("lambda.amazonaws.com"),
            tags=get_tags(),
        )
        # ENI management + CloudWatch Logs for VPC-attached functions.
        aws.iam.RolePolicyAttachment(
            f"{PROJECT_NAME}-lambda-vpc-policy",
            role=role.name,
            policy_arn="arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole",
        )
        # Consume the backtest queue, start workflows, track state in DynamoDB.
        aws.iam.RolePolicy(
            f"{PROJECT_NAME}-lambda-custom-policy",
            role=role.id,
            policy=json.dumps({
                "Version": "2012-10-17",
                "Statement": [
                    {
                        "Effect": "Allow",
                        "Action": [
                            "sqs:ReceiveMessage",
                            "sqs:DeleteMessage",
                            "sqs:GetQueueAttributes",
                        ],
                        "Resource": f"arn:aws:sqs:*:*:{PROJECT_NAME}-*",
                    },
                    {
                        "Effect": "Allow",
                        "Action": [
                            "states:StartExecution",
                        ],
                        "Resource": f"arn:aws:states:*:*:stateMachine:{PROJECT_NAME}-*",
                    },
                    {
                        "Effect": "Allow",
                        "Action": [
                            "dynamodb:GetItem",
                            "dynamodb:PutItem",
                            "dynamodb:UpdateItem",
                        ],
                        "Resource": f"arn:aws:dynamodb:*:*:table/{PROJECT_NAME}-*",
                    },
                ],
            }),
        )
        return role

    def _create_step_functions_role(self) -> aws.iam.Role:
        """Step Functions role: run ECS tasks, invoke Lambdas, update DynamoDB,
        deliver logs, and manage the EventBridge rules used by .sync tasks."""
        role = aws.iam.Role(
            f"{PROJECT_NAME}-sfn-role",
            name=f"{PROJECT_NAME}-sfn-role",
            assume_role_policy=self._assume_role_policy("states.amazonaws.com"),
            tags=get_tags(),
        )
        aws.iam.RolePolicy(
            f"{PROJECT_NAME}-sfn-policy",
            role=role.id,
            policy=json.dumps({
                "Version": "2012-10-17",
                "Statement": [
                    {
                        # RunTask requires Resource "*" but is restricted to
                        # project clusters via the condition.
                        "Effect": "Allow",
                        "Action": [
                            "ecs:RunTask",
                            "ecs:StopTask",
                            "ecs:DescribeTasks",
                        ],
                        "Resource": "*",
                        "Condition": {
                            "ArnEquals": {
                                "ecs:cluster": f"arn:aws:ecs:*:*:cluster/{PROJECT_NAME}-*",
                            }
                        },
                    },
                    {
                        # Needed to hand the task/execution roles to ECS.
                        "Effect": "Allow",
                        "Action": "iam:PassRole",
                        "Resource": [
                            f"arn:aws:iam::*:role/{PROJECT_NAME}-ecs-*",
                        ],
                    },
                    {
                        "Effect": "Allow",
                        "Action": [
                            "lambda:InvokeFunction",
                        ],
                        "Resource": f"arn:aws:lambda:*:*:function:{PROJECT_NAME}-*",
                    },
                    {
                        "Effect": "Allow",
                        "Action": [
                            "dynamodb:GetItem",
                            "dynamodb:PutItem",
                            "dynamodb:UpdateItem",
                        ],
                        "Resource": f"arn:aws:dynamodb:*:*:table/{PROJECT_NAME}-*",
                    },
                    {
                        # Log-delivery APIs only work against "*".
                        "Effect": "Allow",
                        "Action": [
                            "logs:CreateLogDelivery",
                            "logs:GetLogDelivery",
                            "logs:UpdateLogDelivery",
                            "logs:DeleteLogDelivery",
                            "logs:ListLogDeliveries",
                            "logs:PutResourcePolicy",
                            "logs:DescribeResourcePolicies",
                            "logs:DescribeLogGroups",
                        ],
                        "Resource": "*",
                    },
                    {
                        # Used internally by the .sync ECS integration.
                        "Effect": "Allow",
                        "Action": [
                            "events:PutTargets",
                            "events:PutRule",
                            "events:DescribeRule",
                        ],
                        "Resource": "arn:aws:events:*:*:rule/StepFunctions*",
                    },
                ],
            }),
        )
        return role
10. ALB and API Gateway¶
networking/alb.py¶
"""Application Load Balancer configuration."""
import pulumi
import pulumi_aws as aws
from config import PROJECT_NAME, ENVIRONMENT, SERVICES, get_tags
class ApplicationLoadBalancer:
    """Creates ALB with target groups for ECS services.

    One IP-targeted group per entry in SERVICES; the HTTPS listener routes
    /api/<service>/* paths to the matching group, and the HTTP listener
    permanently redirects to HTTPS.
    """

    def __init__(
        self,
        vpc_id: pulumi.Output[str],
        public_subnet_ids: list[pulumi.Output[str]],
        security_group_id: pulumi.Output[str],
        certificate_arn: str,
    ):
        self.alb = self._create_alb(public_subnet_ids, security_group_id)
        self.target_groups = self._create_target_groups(vpc_id)
        self.https_listener = self._create_https_listener(certificate_arn)
        self.http_listener = self._create_http_redirect_listener()

    def _create_alb(
        self,
        subnet_ids: list[pulumi.Output[str]],
        sg_id: pulumi.Output[str],
    ) -> aws.lb.LoadBalancer:
        """Internet-facing ALB in the public subnets."""
        is_prod = ENVIRONMENT == "prod"
        return aws.lb.LoadBalancer(
            f"{PROJECT_NAME}-alb",
            name=f"{PROJECT_NAME}-{ENVIRONMENT}",
            internal=False,
            load_balancer_type="application",
            security_groups=[sg_id],
            subnets=subnet_ids,
            enable_deletion_protection=is_prod,  # prod only
            tags=get_tags("alb"),
        )

    def _create_target_groups(
        self,
        vpc_id: pulumi.Output[str],
    ) -> dict[str, aws.lb.TargetGroup]:
        """Build one HTTP target group per configured service."""

        def build_group(service_name: str, svc: dict) -> aws.lb.TargetGroup:
            # ALB target-group names are limited to 32 characters.
            return aws.lb.TargetGroup(
                f"{PROJECT_NAME}-{service_name}-tg",
                name=f"{PROJECT_NAME}-{service_name}"[:32],
                port=svc["port"],
                protocol="HTTP",
                target_type="ip",
                vpc_id=vpc_id,
                health_check=aws.lb.TargetGroupHealthCheckArgs(
                    enabled=True,
                    path=svc["health_check_path"],
                    port="traffic-port",
                    protocol="HTTP",
                    healthy_threshold=2,
                    unhealthy_threshold=3,
                    timeout=5,
                    interval=30,
                    matcher="200",
                ),
                tags=get_tags(service_name),
            )

        return {
            service_name: build_group(service_name, svc)
            for service_name, svc in SERVICES.items()
        }

    def _create_https_listener(
        self,
        certificate_arn: str,
    ) -> aws.lb.Listener:
        """HTTPS listener with a 404 default and per-service path rules."""
        listener = aws.lb.Listener(
            f"{PROJECT_NAME}-https-listener",
            load_balancer_arn=self.alb.arn,
            port=443,
            protocol="HTTPS",
            ssl_policy="ELBSecurityPolicy-TLS13-1-2-2021-06",
            certificate_arn=certificate_arn,
            # Anything not matched by a rule gets a plain 404.
            default_action=aws.lb.ListenerDefaultActionArgs(
                type="fixed-response",
                fixed_response=aws.lb.ListenerDefaultActionFixedResponseArgs(
                    content_type="text/plain",
                    message_body="Not Found",
                    status_code="404",
                ),
            ),
            tags=get_tags(),
        )
        # Path-based routing: priorities 100, 110, 120, ...
        for offset, (service_name, tg) in enumerate(self.target_groups.items()):
            aws.lb.ListenerRule(
                f"{PROJECT_NAME}-{service_name}-rule",
                listener_arn=listener.arn,
                priority=100 + offset * 10,
                actions=[
                    aws.lb.ListenerRuleActionArgs(
                        type="forward",
                        target_group_arn=tg.arn,
                    )
                ],
                conditions=[
                    aws.lb.ListenerRuleConditionArgs(
                        path_pattern=aws.lb.ListenerRuleConditionPathPatternArgs(
                            values=[f"/api/{service_name}/*"],
                        ),
                    )
                ],
                tags=get_tags(service_name),
            )
        return listener

    def _create_http_redirect_listener(self) -> aws.lb.Listener:
        """Port-80 listener that 301-redirects everything to HTTPS."""
        return aws.lb.Listener(
            f"{PROJECT_NAME}-http-listener",
            load_balancer_arn=self.alb.arn,
            port=80,
            protocol="HTTP",
            default_action=aws.lb.ListenerDefaultActionArgs(
                type="redirect",
                redirect=aws.lb.ListenerDefaultActionRedirectArgs(
                    port="443",
                    protocol="HTTPS",
                    status_code="HTTP_301",
                ),
            ),
            tags=get_tags(),
        )
11. Cognito Authentication¶
security/cognito.py¶
"""Cognito User Pool for authentication.
NEW: This module was missing from the original implementation.
Implements authentication per CANONICAL Section 4.2.
"""
import pulumi
import pulumi_aws as aws
from config import PROJECT_NAME, ENVIRONMENT, get_tags
class CognitoAuth:
    """Creates Cognito User Pool and App Client for JWT authentication."""

    def __init__(self):
        self.user_pool = self._create_user_pool()
        self.user_pool_client = self._create_user_pool_client()
        self.user_pool_domain = self._create_user_pool_domain()

    def _create_user_pool(self) -> aws.cognito.UserPool:
        """Create Cognito User Pool with MFA required."""
        # Users sign in with e-mail; required profile fields are e-mail and
        # display name, both mutable strings.
        required_attrs = [
            aws.cognito.UserPoolSchemaArgs(
                name=attr_name,
                attribute_data_type="String",
                required=True,
                mutable=True,
            )
            for attr_name in ("email", "name")
        ]
        return aws.cognito.UserPool(
            f"{PROJECT_NAME}-user-pool",
            name=f"{PROJECT_NAME}-users",
            # Password policy (CANONICAL Section 4.2): 12+ chars, all classes.
            password_policy=aws.cognito.UserPoolPasswordPolicyArgs(
                minimum_length=12,
                require_lowercase=True,
                require_uppercase=True,
                require_numbers=True,
                require_symbols=True,
                temporary_password_validity_days=7,
            ),
            # TOTP MFA is mandatory for every user.
            mfa_configuration="ON",
            software_token_mfa_configuration=aws.cognito.UserPoolSoftwareTokenMfaConfigurationArgs(
                enabled=True,
            ),
            # Account recovery only via a verified e-mail address.
            account_recovery_setting=aws.cognito.UserPoolAccountRecoverySettingArgs(
                recovery_mechanisms=[
                    aws.cognito.UserPoolAccountRecoverySettingRecoveryMechanismArgs(
                        name="verified_email",
                        priority=1,
                    ),
                ],
            ),
            auto_verified_attributes=["email"],
            username_attributes=["email"],
            username_configuration=aws.cognito.UserPoolUsernameConfigurationArgs(
                case_sensitive=False,
            ),
            schemas=required_attrs,
            # Compromised-credentials / risk detection always enforced.
            user_pool_add_ons=aws.cognito.UserPoolUserPoolAddOnsArgs(
                advanced_security_mode="ENFORCED",
            ),
            email_configuration=aws.cognito.UserPoolEmailConfigurationArgs(
                email_sending_account="COGNITO_DEFAULT",
            ),
            # Only production pools are protected from deletion.
            deletion_protection="ACTIVE" if ENVIRONMENT == "prod" else "INACTIVE",
            tags=get_tags("cognito"),
        )

    def _create_user_pool_client(self) -> aws.cognito.UserPoolClient:
        """Create App Client for API authentication."""
        # Localhost entries support local frontend development.
        callback_urls = [
            "https://api.tradai.smartml.me/callback",
            "http://localhost:3000/callback",  # Dev
        ]
        logout_urls = [
            "https://api.tradai.smartml.me/logout",
            "http://localhost:3000/logout",
        ]
        return aws.cognito.UserPoolClient(
            f"{PROJECT_NAME}-api-client",
            name=f"{PROJECT_NAME}-api-client",
            user_pool_id=self.user_pool.id,
            # Authorization-code OAuth flow with standard OIDC scopes.
            allowed_oauth_flows=["code"],
            allowed_oauth_flows_user_pool_client=True,
            allowed_oauth_scopes=["email", "openid", "profile"],
            callback_urls=callback_urls,
            logout_urls=logout_urls,
            supported_identity_providers=["COGNITO"],
            # Short-lived access/id tokens; 30-day refresh tokens.
            access_token_validity=1,
            id_token_validity=1,
            refresh_token_validity=30,
            token_validity_units=aws.cognito.UserPoolClientTokenValidityUnitsArgs(
                access_token="hours",
                id_token="hours",
                refresh_token="days",
            ),
            # Don't reveal whether a username exists; allow revoking tokens.
            prevent_user_existence_errors="ENABLED",
            enable_token_revocation=True,
            # SRP sign-in plus refresh — no plaintext password flows.
            explicit_auth_flows=[
                "ALLOW_REFRESH_TOKEN_AUTH",
                "ALLOW_USER_SRP_AUTH",
            ],
        )

    def _create_user_pool_domain(self) -> aws.cognito.UserPoolDomain:
        """Create Cognito hosted UI domain."""
        return aws.cognito.UserPoolDomain(
            f"{PROJECT_NAME}-domain",
            domain=f"{PROJECT_NAME}-{ENVIRONMENT}",
            user_pool_id=self.user_pool.id,
        )
12. API Gateway¶
networking/api_gateway.py¶
"""AWS API Gateway HTTP API with JWT authorization.
NEW: This module was missing from the original implementation.
Implements API Gateway per CANONICAL Section 5.
"""
import json
import pulumi
import pulumi_aws as aws
from config import PROJECT_NAME, ENVIRONMENT, get_tags, SQS_CONFIG
class ApiGateway:
    """Creates HTTP API Gateway with Cognito JWT authorization."""

    def __init__(
        self,
        cognito_user_pool_id: pulumi.Output[str],
        cognito_client_id: pulumi.Output[str],
        alb_listener_arn: pulumi.Output[str],
        alb_dns_name: pulumi.Output[str],
        sqs_queue_arn: pulumi.Output[str],
        vpc_id: pulumi.Output[str],
        private_subnet_ids: list[pulumi.Output[str]],
        security_group_id: pulumi.Output[str],
    ):
        """Wire up the API, VPC link, authorizer, integrations, routes and stage."""
        self.api = self._create_api()
        self.vpc_link = self._create_vpc_link(private_subnet_ids, security_group_id)
        self.authorizer = self._create_authorizer(cognito_user_pool_id, cognito_client_id)
        self.sqs_role = self._create_sqs_integration_role(sqs_queue_arn)
        # Create integrations
        self.alb_integration = self._create_alb_integration(alb_dns_name)
        self.sqs_integration = self._create_sqs_integration(sqs_queue_arn)
        # Create routes
        self._create_routes()
        # Create stage
        self.stage = self._create_stage()
        # Custom domain (optional)
        self.custom_domain = self._create_custom_domain()

    def _create_api(self) -> aws.apigatewayv2.Api:
        """Create HTTP API with environment-dependent CORS."""
        return aws.apigatewayv2.Api(
            f"{PROJECT_NAME}-api",
            name=f"{PROJECT_NAME}-api-{ENVIRONMENT}",
            protocol_type="HTTP",
            cors_configuration=aws.apigatewayv2.ApiCorsConfigurationArgs(
                # Dev allows any origin; other stacks only the known frontends.
                allow_origins=["*"] if ENVIRONMENT == "dev" else [
                    "https://tradai.smartml.me",
                    "https://app.tradai.smartml.me",
                ],
                allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"],
                allow_headers=["Content-Type", "Authorization", "X-Request-ID"],
                max_age=86400,
            ),
            tags=get_tags("api-gateway"),
        )

    def _create_vpc_link(
        self,
        subnet_ids: list[pulumi.Output[str]],
        sg_id: pulumi.Output[str],
    ) -> aws.apigatewayv2.VpcLink:
        """Create VPC Link for private ALB integration."""
        return aws.apigatewayv2.VpcLink(
            f"{PROJECT_NAME}-vpc-link",
            name=f"{PROJECT_NAME}-vpc-link",
            subnet_ids=subnet_ids,
            security_group_ids=[sg_id],
            tags=get_tags("vpc-link"),
        )

    def _create_authorizer(
        self,
        user_pool_id: pulumi.Output[str],
        client_id: pulumi.Output[str],
    ) -> aws.apigatewayv2.Authorizer:
        """Create JWT authorizer using Cognito."""
        # FIX: resolve the deployment region instead of hard-coding us-east-1,
        # so the issuer URL stays correct if the stack moves region.
        region = aws.get_region().name
        return aws.apigatewayv2.Authorizer(
            f"{PROJECT_NAME}-jwt-authorizer",
            api_id=self.api.id,
            authorizer_type="JWT",
            name=f"{PROJECT_NAME}-cognito-authorizer",
            identity_sources=["$request.header.Authorization"],
            jwt_configuration=aws.apigatewayv2.AuthorizerJwtConfigurationArgs(
                audiences=[client_id],
                issuer=user_pool_id.apply(
                    lambda pool_id: f"https://cognito-idp.{region}.amazonaws.com/{pool_id}"
                ),
            ),
        )

    def _create_sqs_integration_role(
        self,
        queue_arn: pulumi.Output[str],
    ) -> aws.iam.Role:
        """Create IAM role for API Gateway -> SQS integration (SendMessage only)."""
        assume_role = json.dumps({
            "Version": "2012-10-17",
            "Statement": [{
                "Effect": "Allow",
                "Principal": {"Service": "apigateway.amazonaws.com"},
                "Action": "sts:AssumeRole",
            }],
        })
        role = aws.iam.Role(
            f"{PROJECT_NAME}-apigw-sqs-role",
            name=f"{PROJECT_NAME}-apigw-sqs-role",
            assume_role_policy=assume_role,
            tags=get_tags(),
        )
        aws.iam.RolePolicy(
            f"{PROJECT_NAME}-apigw-sqs-policy",
            role=role.id,
            policy=queue_arn.apply(lambda arn: json.dumps({
                "Version": "2012-10-17",
                "Statement": [{
                    "Effect": "Allow",
                    "Action": ["sqs:SendMessage"],
                    "Resource": arn,
                }],
            })),
        )
        return role

    def _create_alb_integration(
        self,
        alb_dns: pulumi.Output[str],
    ) -> aws.apigatewayv2.Integration:
        """Create integration to ALB via VPC Link."""
        return aws.apigatewayv2.Integration(
            f"{PROJECT_NAME}-alb-integration",
            api_id=self.api.id,
            integration_type="HTTP_PROXY",
            integration_method="ANY",
            integration_uri=alb_dns.apply(lambda dns: f"http://{dns}"),
            connection_type="VPC_LINK",
            connection_id=self.vpc_link.id,
        )

    @staticmethod
    def _sqs_queue_url(arn: str) -> str:
        """Convert an SQS queue ARN into its HTTPS queue URL.

        arn:aws:sqs:<region>:<account-id>:<queue-name>
          -> https://sqs.<region>.amazonaws.com/<account-id>/<queue-name>
        """
        # BUG FIX: the previous chained .replace(":", ".amazonaws.com/")
        # replaced EVERY colon, producing a malformed URL such as
        # https://sqs.us-east-1.amazonaws.com/123.amazonaws.com/queue.
        _arn, _partition, _service, region, account_id, queue_name = arn.split(":", 5)
        return f"https://sqs.{region}.amazonaws.com/{account_id}/{queue_name}"

    def _create_sqs_integration(
        self,
        queue_arn: pulumi.Output[str],
    ) -> aws.apigatewayv2.Integration:
        """Create direct integration to SQS for backtest requests."""
        return aws.apigatewayv2.Integration(
            f"{PROJECT_NAME}-sqs-integration",
            api_id=self.api.id,
            integration_type="AWS_PROXY",
            integration_subtype="SQS-SendMessage",
            credentials_arn=self.sqs_role.arn,
            request_parameters={
                "QueueUrl": queue_arn.apply(self._sqs_queue_url),
                "MessageBody": "$request.body",
                # FIFO queue: group id taken from the request-id header.
                "MessageGroupId": "$request.header.X-Request-ID",
            },
        )

    def _create_routes(self):
        """Create all API routes per CANONICAL Section 5.1."""
        # (method, path, integration id, auth required)
        routes = [
            # Health check (no auth)
            ("GET", "/api/v1/health", self.alb_integration.id, False),
            # Strategies (auth required)
            ("GET", "/api/v1/strategies", self.alb_integration.id, True),
            ("GET", "/api/v1/strategies/{id}", self.alb_integration.id, True),
            ("POST", "/api/v1/strategies", self.alb_integration.id, True),
            ("POST", "/api/v1/strategies/{name}/stage", self.alb_integration.id, True),
            ("POST", "/api/v1/strategies/{name}/promote", self.alb_integration.id, True),
            # Backtests (auth required, SQS integration for submission)
            ("POST", "/api/v1/backtests", self.sqs_integration.id, True),
            ("GET", "/api/v1/backtests", self.alb_integration.id, True),
            ("GET", "/api/v1/backtests/{job_id}", self.alb_integration.id, True),
            ("POST", "/api/v1/backtests/{job_id}/cancel", self.alb_integration.id, True),
            ("GET", "/api/v1/backtests/{job_id}/equity", self.alb_integration.id, True),
            ("GET", "/api/v1/backtests/{job_id}/report-data", self.alb_integration.id, True),
            # Data (auth required)
            ("GET", "/api/v1/data/symbols", self.alb_integration.id, True),
            ("GET", "/api/v1/data/freshness", self.alb_integration.id, True),
            ("POST", "/api/v1/data/sync", self.alb_integration.id, True),
            # MLflow proxy (auth required)
            ("ANY", "/mlflow/{proxy+}", self.alb_integration.id, True),
        ]
        for i, (method, path, integration_id, auth_required) in enumerate(routes):
            route_key = f"{method} {path}"
            aws.apigatewayv2.Route(
                f"{PROJECT_NAME}-route-{i}",
                api_id=self.api.id,
                route_key=route_key,
                target=pulumi.Output.concat("integrations/", integration_id),
                authorization_type="JWT" if auth_required else "NONE",
                authorizer_id=self.authorizer.id if auth_required else None,
            )

    def _create_stage(self) -> aws.apigatewayv2.Stage:
        """Create deployment stage with throttling and JSON access logs."""
        # FIX: the log-group ARN previously used a "*" account id, which is
        # not a valid log destination. Resolve the real region/account.
        region = aws.get_region().name
        account_id = aws.get_caller_identity().account_id
        return aws.apigatewayv2.Stage(
            f"{PROJECT_NAME}-stage",
            api_id=self.api.id,
            name="$default",
            auto_deploy=True,
            default_route_settings=aws.apigatewayv2.StageDefaultRouteSettingsArgs(
                # CANONICAL Section 5.2
                throttling_burst_limit=200,
                throttling_rate_limit=100,
            ),
            access_log_settings=aws.apigatewayv2.StageAccessLogSettingsArgs(
                destination_arn=(
                    f"arn:aws:logs:{region}:{account_id}:"
                    f"log-group:/aws/api-gateway/{PROJECT_NAME}"
                ),
                format=json.dumps({
                    "requestId": "$context.requestId",
                    "ip": "$context.identity.sourceIp",
                    "requestTime": "$context.requestTime",
                    "httpMethod": "$context.httpMethod",
                    "routeKey": "$context.routeKey",
                    "status": "$context.status",
                    "responseLength": "$context.responseLength",
                    "integrationError": "$context.integrationErrorMessage",
                }),
            ),
            tags=get_tags("api-gateway"),
        )

    def _create_custom_domain(self) -> aws.apigatewayv2.DomainName | None:
        """Create custom domain for API.

        Returns None when no certificate ARN is configured for the stack.
        """
        config = pulumi.Config()
        certificate_arn = config.get("api_certificate_arn")
        if not certificate_arn:
            return None
        domain = aws.apigatewayv2.DomainName(
            f"{PROJECT_NAME}-domain",
            domain_name="api.tradai.smartml.me",
            domain_name_configuration=aws.apigatewayv2.DomainNameDomainNameConfigurationArgs(
                certificate_arn=certificate_arn,
                endpoint_type="REGIONAL",
                security_policy="TLS_1_2",
            ),
            tags=get_tags("api-gateway"),
        )
        aws.apigatewayv2.ApiMapping(
            f"{PROJECT_NAME}-api-mapping",
            api_id=self.api.id,
            domain_name=domain.id,
            stage=self.stage.id,
        )
        return domain
13. WAF Web ACL¶
security/waf.py¶
"""WAF Web ACL for API protection.
NEW: This module was missing from the original implementation.
Implements WAF per CANONICAL Section 4.3.
"""
import pulumi
import pulumi_aws as aws
from config import PROJECT_NAME, ENVIRONMENT, get_tags
class WafWebAcl:
    """Creates a regional WAF Web ACL with rate limiting and AWS managed rules.

    Rules, per CANONICAL Section 4.3 (lower priority number = evaluated first):
      1. IP rate limit — block IPs exceeding 100 requests per 5-minute window.
      2. AWSManagedRulesCommonRuleSet — general common protections.
      3. AWSManagedRulesKnownBadInputsRuleSet — known exploit payloads.
      4. AWSManagedRulesSQLiRuleSet — SQL-injection signatures.

    The ACL's default action is *allow*; traffic is blocked only when a
    rule matches.
    """

    def __init__(
        self,
        api_gateway_stage_arn: "pulumi.Output[str] | None" = None,
        alb_arn: "pulumi.Output[str] | None" = None,
    ):
        """Create the Web ACL and attach it to whichever resources were given.

        Args:
            api_gateway_stage_arn: ARN of an API Gateway stage to protect;
                association is skipped when None.
            alb_arn: ARN of an Application Load Balancer to protect;
                association is skipped when None.
        """
        self.web_acl = self._create_web_acl()
        # Associate with resources
        if api_gateway_stage_arn:
            self._associate_api_gateway(api_gateway_stage_arn)
        if alb_arn:
            self._associate_alb(alb_arn)

    def _create_web_acl(self) -> aws.wafv2.WebAcl:
        """Create WAF Web ACL with rules per CANONICAL Section 4.3."""
        return aws.wafv2.WebAcl(
            f"{PROJECT_NAME}-waf",
            name=f"{PROJECT_NAME}-waf-{ENVIRONMENT}",
            # REGIONAL scope is what ALB / API Gateway associations require.
            scope="REGIONAL",
            default_action=aws.wafv2.WebAclDefaultActionArgs(
                allow=aws.wafv2.WebAclDefaultActionAllowArgs(),
            ),
            visibility_config=aws.wafv2.WebAclVisibilityConfigArgs(
                cloudwatch_metrics_enabled=True,
                metric_name=f"{PROJECT_NAME}-waf-metrics",
                sampled_requests_enabled=True,
            ),
            rules=[
                # Rule 1: Rate limiting (CANONICAL: 100 req/5min per IP)
                aws.wafv2.WebAclRuleArgs(
                    name="RateLimitRule",
                    priority=1,
                    # Custom rules use `action` (managed groups below use
                    # `override_action` instead).
                    action=aws.wafv2.WebAclRuleActionArgs(
                        block=aws.wafv2.WebAclRuleActionBlockArgs(),
                    ),
                    statement=aws.wafv2.WebAclRuleStatementArgs(
                        rate_based_statement=aws.wafv2.WebAclRuleStatementRateBasedStatementArgs(
                            limit=100,  # per 5-minute window
                            aggregate_key_type="IP",
                        ),
                    ),
                    visibility_config=aws.wafv2.WebAclRuleVisibilityConfigArgs(
                        cloudwatch_metrics_enabled=True,
                        metric_name="RateLimitRule",
                        sampled_requests_enabled=True,
                    ),
                ),
                # Rule 2: AWS Managed Common Rule Set
                aws.wafv2.WebAclRuleArgs(
                    name="AWSManagedRulesCommonRuleSet",
                    priority=2,
                    # `none` = keep the managed group's own block/allow actions.
                    override_action=aws.wafv2.WebAclRuleOverrideActionArgs(
                        none=aws.wafv2.WebAclRuleOverrideActionNoneArgs(),
                    ),
                    statement=aws.wafv2.WebAclRuleStatementArgs(
                        managed_rule_group_statement=aws.wafv2.WebAclRuleStatementManagedRuleGroupStatementArgs(
                            vendor_name="AWS",
                            name="AWSManagedRulesCommonRuleSet",
                        ),
                    ),
                    visibility_config=aws.wafv2.WebAclRuleVisibilityConfigArgs(
                        cloudwatch_metrics_enabled=True,
                        metric_name="AWSManagedRulesCommonRuleSet",
                        sampled_requests_enabled=True,
                    ),
                ),
                # Rule 3: AWS Managed Known Bad Inputs
                aws.wafv2.WebAclRuleArgs(
                    name="AWSManagedRulesKnownBadInputsRuleSet",
                    priority=3,
                    override_action=aws.wafv2.WebAclRuleOverrideActionArgs(
                        none=aws.wafv2.WebAclRuleOverrideActionNoneArgs(),
                    ),
                    statement=aws.wafv2.WebAclRuleStatementArgs(
                        managed_rule_group_statement=aws.wafv2.WebAclRuleStatementManagedRuleGroupStatementArgs(
                            vendor_name="AWS",
                            name="AWSManagedRulesKnownBadInputsRuleSet",
                        ),
                    ),
                    visibility_config=aws.wafv2.WebAclRuleVisibilityConfigArgs(
                        cloudwatch_metrics_enabled=True,
                        metric_name="AWSManagedRulesKnownBadInputsRuleSet",
                        sampled_requests_enabled=True,
                    ),
                ),
                # Rule 4: AWS Managed SQL Injection Rules
                aws.wafv2.WebAclRuleArgs(
                    name="AWSManagedRulesSQLiRuleSet",
                    priority=4,
                    override_action=aws.wafv2.WebAclRuleOverrideActionArgs(
                        none=aws.wafv2.WebAclRuleOverrideActionNoneArgs(),
                    ),
                    statement=aws.wafv2.WebAclRuleStatementArgs(
                        managed_rule_group_statement=aws.wafv2.WebAclRuleStatementManagedRuleGroupStatementArgs(
                            vendor_name="AWS",
                            name="AWSManagedRulesSQLiRuleSet",
                        ),
                    ),
                    visibility_config=aws.wafv2.WebAclRuleVisibilityConfigArgs(
                        cloudwatch_metrics_enabled=True,
                        metric_name="AWSManagedRulesSQLiRuleSet",
                        sampled_requests_enabled=True,
                    ),
                ),
            ],
            tags=get_tags("waf"),
        )

    def _associate_api_gateway(self, stage_arn: pulumi.Output[str]):
        """Associate WAF with API Gateway stage.

        NOTE(review): AWS WAF regional associations support REST API stages;
        HTTP API (apigatewayv2) stages are not supported — confirm the ARN
        passed here belongs to a REST API stage.
        """
        aws.wafv2.WebAclAssociation(
            f"{PROJECT_NAME}-waf-apigw-association",
            resource_arn=stage_arn,
            web_acl_arn=self.web_acl.arn,
        )

    def _associate_alb(self, alb_arn: pulumi.Output[str]):
        """Associate WAF with ALB."""
        aws.wafv2.WebAclAssociation(
            f"{PROJECT_NAME}-waf-alb-association",
            resource_arn=alb_arn,
            web_acl_arn=self.web_acl.arn,
        )
14. Main Entry Point¶
__main__.py¶
"""Main Pulumi program for TradAI infrastructure.
UPDATED: Added Cognito, API Gateway, WAF, VPC Endpoints, CloudTrail.
"""
import pulumi
from vpc.network import VpcNetwork
from vpc.security import SecurityGroups
from vpc.endpoints import VpcEndpoints
from vpc.nacls import NetworkAcls
from compute.nat import NatInstance
from compute.ecs import EcsCluster
from compute.lambda_funcs import LambdaFunctions
from orchestration.step_functions import StepFunctions
from orchestration.sqs import SqsQueues
from storage.rds import RdsDatabase
from storage.s3 import S3Buckets
from storage.dynamodb import DynamoDBTables
from security.iam import IamRoles
from security.cognito import CognitoAuth
from security.waf import WafWebAcl
from security.cloudtrail import CloudTrailAudit
from networking.alb import ApplicationLoadBalancer
from networking.api_gateway import ApiGateway
from observability.cloudwatch import CloudWatchAlarms
from config import PROJECT_NAME, ENVIRONMENT
def main():
    """Wire together the full TradAI stack.

    Resources are created in dependency order: networking first, then IAM
    and storage, then compute/orchestration, and finally the public-facing
    API layer plus monitoring. Outputs are exported at the end.
    """
    # 1. Create VPC and networking
    vpc = VpcNetwork()
    # 2. Create security groups
    security_groups = SecurityGroups(vpc.vpc.id)
    # 3. Create NAT instance with HA
    nat = NatInstance(
        public_subnet_ids=[s.id for s in vpc.public_subnets],
        private_subnet_ids=[s.id for s in vpc.private_subnets],
        security_group_id=security_groups.nat_sg.id,
        vpc_id=vpc.vpc.id,
    )
    # 4. Create VPC Endpoints (S3, DynamoDB - free gateway endpoints)
    # NOTE(review): only the NAT route table receives endpoint routes;
    # confirm no other route tables need S3/DynamoDB gateway routes.
    endpoints = VpcEndpoints(
        vpc_id=vpc.vpc.id,
        route_table_ids=[nat.route_table.id],
    )
    # 5. Create Network ACLs
    nacls = NetworkAcls(
        vpc_id=vpc.vpc.id,
        public_subnet_ids=[s.id for s in vpc.public_subnets],
        private_subnet_ids=[s.id for s in vpc.private_subnets],
        database_subnet_ids=[s.id for s in vpc.database_subnets],
    )
    # 6. Create IAM roles
    iam = IamRoles()
    # 7. Create storage
    s3 = S3Buckets()
    dynamodb = DynamoDBTables()
    # 8. Create SQS queues
    sqs = SqsQueues()
    # 9. Create Cognito authentication
    cognito = CognitoAuth()
    # 10. Create ALB (internal, accessed via API Gateway)
    # NOTE(review): the comment says "internal" but the ALB is placed in
    # public subnets — confirm the intended exposure.
    alb = ApplicationLoadBalancer(
        vpc_id=vpc.vpc.id,
        public_subnet_ids=[s.id for s in vpc.public_subnets],
        security_group_id=security_groups.alb_sg.id,
        # `require` fails fast if the certificate ARN is not configured.
        certificate_arn=pulumi.Config().require("certificate_arn"),
    )
    # 11. Create RDS database
    rds = RdsDatabase(
        subnet_group_name=vpc.db_subnet_group.name,
        security_group_id=security_groups.rds_sg.id,
    )
    # 12. Create ECS cluster and services
    ecs = EcsCluster(
        private_subnet_ids=[s.id for s in vpc.private_subnets],
        security_group_id=security_groups.ecs_sg.id,
        execution_role_arn=iam.ecs_execution_role.arn,
        task_role_arn=iam.ecs_task_role.arn,
        target_groups=alb.target_groups,
    )
    # 13. Create Lambda functions
    lambdas = LambdaFunctions(
        subnet_ids=[s.id for s in vpc.private_subnets],
        security_group_id=security_groups.lambda_sg.id,
        role_arn=iam.lambda_role.arn,
        sqs_queue_arn=sqs.backtest_queue.arn,
    )
    # 14. Create Step Functions
    step_functions = StepFunctions(
        role_arn=iam.step_functions_role.arn,
        ecs_cluster_arn=ecs.cluster.arn,
        strategy_task_definition_arn=ecs.strategy_task_definition.arn,
        subnet_ids=[s.id for s in vpc.private_subnets],
        security_group_id=security_groups.ecs_sg.id,
        dynamodb_table_name=dynamodb.workflow_table.name,
        lambdas=lambdas,
    )
    # 15. Create API Gateway
    api_gateway = ApiGateway(
        cognito_user_pool_id=cognito.user_pool.id,
        cognito_client_id=cognito.user_pool_client.id,
        alb_listener_arn=alb.https_listener.arn,
        alb_dns_name=alb.alb.dns_name,
        sqs_queue_arn=sqs.backtest_queue.arn,
        vpc_id=vpc.vpc.id,
        private_subnet_ids=[s.id for s in vpc.private_subnets],
        security_group_id=security_groups.lambda_sg.id,
    )
    # 16. Create WAF
    waf = WafWebAcl(
        api_gateway_stage_arn=api_gateway.stage.arn,
        alb_arn=alb.alb.arn,
    )
    # 17. Create CloudTrail audit logging
    cloudtrail = CloudTrailAudit(
        s3_bucket_name=s3.logs_bucket.id,
    )
    # 18. Create CloudWatch alarms
    alarms = CloudWatchAlarms(
        ecs_cluster_name=ecs.cluster.name,
        sqs_queue_name=sqs.backtest_queue.name,
        dlq_name=sqs.dlq.name,
        step_function_arn=step_functions.backtest_workflow.arn,
    )
    # Export outputs
    pulumi.export("vpc_id", vpc.vpc.id)
    pulumi.export("api_gateway_url", api_gateway.api.api_endpoint)
    pulumi.export("cognito_user_pool_id", cognito.user_pool.id)
    pulumi.export("cognito_client_id", cognito.user_pool_client.id)
    pulumi.export("alb_dns", alb.alb.dns_name)
    pulumi.export("rds_endpoint", rds.instance.endpoint)
    pulumi.export("ecs_cluster_name", ecs.cluster.name)
    pulumi.export("backtest_queue_url", sqs.backtest_queue.url)
    pulumi.export("step_function_arn", step_functions.backtest_workflow.arn)
    pulumi.export("waf_web_acl_arn", waf.web_acl.arn)


if __name__ == "__main__":
    main()
15. VPC Endpoints and NACLs¶
vpc/endpoints.py¶
"""VPC Endpoints for AWS services.
NEW: S3 and DynamoDB gateway endpoints (free) for cost and security.
"""
import pulumi
import pulumi_aws as aws
from config import PROJECT_NAME, ENVIRONMENT, AWS_REGION, get_tags
class VpcEndpoints:
    """Creates VPC gateway endpoints for S3 and DynamoDB.

    Gateway endpoints are free and keep S3/DynamoDB traffic on the AWS
    network, avoiding NAT data-processing charges and adding a private
    path to these services.
    """

    def __init__(
        self,
        vpc_id: pulumi.Output[str],
        route_table_ids: list[pulumi.Output[str]],
    ):
        """Create both gateway endpoints.

        Args:
            vpc_id: VPC in which to create the endpoints.
            route_table_ids: Route tables that should receive endpoint routes.
        """
        self.s3_endpoint = self._create_s3_endpoint(vpc_id, route_table_ids)
        self.dynamodb_endpoint = self._create_dynamodb_endpoint(vpc_id, route_table_ids)

    def _create_gateway_endpoint(
        self,
        service: str,
        vpc_id: pulumi.Output[str],
        route_table_ids: list[pulumi.Output[str]],
    ) -> aws.ec2.VpcEndpoint:
        """Create one gateway endpoint (free) for the given AWS service.

        Shared helper: the S3 and DynamoDB endpoints were previously built
        from duplicated code differing only in the service name.
        """
        return aws.ec2.VpcEndpoint(
            f"{PROJECT_NAME}-{service}-endpoint",
            vpc_id=vpc_id,
            service_name=f"com.amazonaws.{AWS_REGION}.{service}",
            vpc_endpoint_type="Gateway",
            route_table_ids=route_table_ids,
            tags=get_tags() | {"Name": f"{PROJECT_NAME}-{service}-endpoint"},
        )

    def _create_s3_endpoint(
        self,
        vpc_id: pulumi.Output[str],
        route_table_ids: list[pulumi.Output[str]],
    ) -> aws.ec2.VpcEndpoint:
        """Create S3 gateway endpoint (free)."""
        return self._create_gateway_endpoint("s3", vpc_id, route_table_ids)

    def _create_dynamodb_endpoint(
        self,
        vpc_id: pulumi.Output[str],
        route_table_ids: list[pulumi.Output[str]],
    ) -> aws.ec2.VpcEndpoint:
        """Create DynamoDB gateway endpoint (free)."""
        return self._create_gateway_endpoint("dynamodb", vpc_id, route_table_ids)
vpc/nacls.py¶
"""Network ACLs for defense in depth.
NEW: Stateless firewall layer in addition to security groups.
"""
import pulumi
import pulumi_aws as aws
from config import PROJECT_NAME, SUBNETS, get_tags
class NetworkAcls:
    """Creates Network ACLs for all subnet types.

    NACLs are stateless, so return traffic must be allowed explicitly
    (hence the ephemeral-port ingress/egress rules below). Rule numbers
    are evaluated in ascending order; anything unmatched is denied by
    the implicit default-deny.
    """

    def __init__(
        self,
        vpc_id: pulumi.Output[str],
        public_subnet_ids: list[pulumi.Output[str]],
        private_subnet_ids: list[pulumi.Output[str]],
        database_subnet_ids: list[pulumi.Output[str]],
    ):
        """Create one NACL per subnet tier and associate it with its subnets."""
        self.public_nacl = self._create_public_nacl(vpc_id, public_subnet_ids)
        self.private_nacl = self._create_private_nacl(vpc_id, private_subnet_ids)
        self.database_nacl = self._create_database_nacl(vpc_id, database_subnet_ids)

    def _create_public_nacl(
        self,
        vpc_id: pulumi.Output[str],
        subnet_ids: list[pulumi.Output[str]],
    ) -> aws.ec2.NetworkAcl:
        """Public tier: HTTP/HTTPS in from anywhere, plus ephemeral ports
        for return traffic of outbound connections; all egress allowed."""
        nacl = aws.ec2.NetworkAcl(
            f"{PROJECT_NAME}-public-nacl",
            vpc_id=vpc_id,
            tags=get_tags() | {"Name": f"{PROJECT_NAME}-public-nacl"},
        )
        # Ingress rules: (rule_number, protocol, from_port, to_port, cidr, action).
        # Protocol "6" is TCP.
        rules = [
            (100, "6", 80, 80, "0.0.0.0/0", "allow"),  # HTTP
            (110, "6", 443, 443, "0.0.0.0/0", "allow"),  # HTTPS
            (120, "6", 1024, 65535, "0.0.0.0/0", "allow"),  # Ephemeral ports
        ]
        for i, (num, proto, from_p, to_p, cidr, action) in enumerate(rules):
            aws.ec2.NetworkAclRule(
                f"{PROJECT_NAME}-public-nacl-in-{i}",
                network_acl_id=nacl.id,
                rule_number=num,
                protocol=proto,
                rule_action=action,
                egress=False,
                cidr_block=cidr,
                from_port=from_p,
                to_port=to_p,
            )
        # Egress - allow all outbound (protocol "-1" = all protocols).
        aws.ec2.NetworkAclRule(
            f"{PROJECT_NAME}-public-nacl-out",
            network_acl_id=nacl.id,
            rule_number=100,
            protocol="-1",
            rule_action="allow",
            egress=True,
            cidr_block="0.0.0.0/0",
        )
        # Associate with subnets
        for i, subnet_id in enumerate(subnet_ids):
            aws.ec2.NetworkAclAssociation(
                f"{PROJECT_NAME}-public-nacl-assoc-{i}",
                network_acl_id=nacl.id,
                subnet_id=subnet_id,
            )
        return nacl

    def _create_private_nacl(
        self,
        vpc_id: pulumi.Output[str],
        subnet_ids: list[pulumi.Output[str]],
    ) -> aws.ec2.NetworkAcl:
        """Private tier: all traffic from inside the VPC, plus TCP ephemeral
        ports from anywhere for NAT return traffic; all egress allowed."""
        nacl = aws.ec2.NetworkAcl(
            f"{PROJECT_NAME}-private-nacl",
            vpc_id=vpc_id,
            tags=get_tags() | {"Name": f"{PROJECT_NAME}-private-nacl"},
        )
        # Ingress - from VPC only (CIDR matches VPC_CIDR in config).
        aws.ec2.NetworkAclRule(
            f"{PROJECT_NAME}-private-nacl-in-vpc",
            network_acl_id=nacl.id,
            rule_number=100,
            protocol="-1",
            rule_action="allow",
            egress=False,
            cidr_block="10.0.0.0/16",
        )
        # Ingress - ephemeral for NAT return traffic
        aws.ec2.NetworkAclRule(
            f"{PROJECT_NAME}-private-nacl-in-ephemeral",
            network_acl_id=nacl.id,
            rule_number=110,
            protocol="6",
            rule_action="allow",
            egress=False,
            cidr_block="0.0.0.0/0",
            from_port=1024,
            to_port=65535,
        )
        # Egress - allow all
        aws.ec2.NetworkAclRule(
            f"{PROJECT_NAME}-private-nacl-out",
            network_acl_id=nacl.id,
            rule_number=100,
            protocol="-1",
            rule_action="allow",
            egress=True,
            cidr_block="0.0.0.0/0",
        )
        for i, subnet_id in enumerate(subnet_ids):
            aws.ec2.NetworkAclAssociation(
                f"{PROJECT_NAME}-private-nacl-assoc-{i}",
                network_acl_id=nacl.id,
                subnet_id=subnet_id,
            )
        return nacl

    def _create_database_nacl(
        self,
        vpc_id: pulumi.Output[str],
        subnet_ids: list[pulumi.Output[str]],
    ) -> aws.ec2.NetworkAcl:
        """Database tier: PostgreSQL (5432) in from the private subnets only;
        egress limited to ephemeral ports back to those subnets. Everything
        else is caught by the implicit default-deny."""
        nacl = aws.ec2.NetworkAcl(
            f"{PROJECT_NAME}-db-nacl",
            vpc_id=vpc_id,
            tags=get_tags() | {"Name": f"{PROJECT_NAME}-db-nacl"},
        )
        # Ingress - PostgreSQL from private subnets only
        for i, cidr in enumerate(SUBNETS["private"]):
            aws.ec2.NetworkAclRule(
                f"{PROJECT_NAME}-db-nacl-in-pg-{i}",
                network_acl_id=nacl.id,
                rule_number=100 + i,
                protocol="6",
                rule_action="allow",
                egress=False,
                cidr_block=cidr,
                from_port=5432,
                to_port=5432,
            )
        # Egress - ephemeral ports to private subnets (return traffic for
        # the inbound PostgreSQL connections; NACLs are stateless).
        for i, cidr in enumerate(SUBNETS["private"]):
            aws.ec2.NetworkAclRule(
                f"{PROJECT_NAME}-db-nacl-out-{i}",
                network_acl_id=nacl.id,
                rule_number=100 + i,
                protocol="6",
                rule_action="allow",
                egress=True,
                cidr_block=cidr,
                from_port=1024,
                to_port=65535,
            )
        for i, subnet_id in enumerate(subnet_ids):
            aws.ec2.NetworkAclAssociation(
                f"{PROJECT_NAME}-db-nacl-assoc-{i}",
                network_acl_id=nacl.id,
                subnet_id=subnet_id,
            )
        return nacl
16. CloudTrail and VPC Flow Logs¶
security/cloudtrail.py¶
"""CloudTrail audit logging.
NEW: Required for compliance and security forensics.
"""
import json
import pulumi
import pulumi_aws as aws
from config import PROJECT_NAME, ENVIRONMENT, get_tags
class CloudTrailAudit:
    """Provisions CloudTrail audit logging and a log group for VPC Flow Logs."""

    def __init__(self, s3_bucket_name: pulumi.Output[str]):
        """Create the flow-logs log group and the audit trail.

        Args:
            s3_bucket_name: Name of the S3 bucket that receives trail logs.
        """
        self.flow_logs_group = self._create_flow_logs_group()
        self.trail = self._create_trail(s3_bucket_name)

    def _create_flow_logs_group(self) -> aws.cloudwatch.LogGroup:
        """CloudWatch Log Group that VPC Flow Logs can write into."""
        group_name = f"/aws/vpc/{PROJECT_NAME}/flow-logs"
        return aws.cloudwatch.LogGroup(
            f"{PROJECT_NAME}-flow-logs",
            name=group_name,
            retention_in_days=7,
            tags=get_tags("flow-logs"),
        )

    def _create_trail(self, bucket_name: pulumi.Output[str]) -> aws.cloudtrail.Trail:
        """Single-region trail with log-file validation, writing to S3."""
        # Data-event selectors: project S3 objects and all DynamoDB tables.
        s3_objects = aws.cloudtrail.TrailEventSelectorDataResourceArgs(
            type="AWS::S3::Object",
            values=[f"arn:aws:s3:::{PROJECT_NAME}-*/*"],
        )
        dynamodb_tables = aws.cloudtrail.TrailEventSelectorDataResourceArgs(
            type="AWS::DynamoDB::Table",
            values=["arn:aws:dynamodb"],
        )
        selector = aws.cloudtrail.TrailEventSelectorArgs(
            read_write_type="All",
            include_management_events=True,
            data_resources=[s3_objects, dynamodb_tables],
        )
        return aws.cloudtrail.Trail(
            f"{PROJECT_NAME}-trail",
            name=f"{PROJECT_NAME}-audit-trail",
            s3_bucket_name=bucket_name,
            s3_key_prefix="cloudtrail",
            include_global_service_events=True,
            is_multi_region_trail=False,
            enable_logging=True,
            enable_log_file_validation=True,
            event_selectors=[selector],
            tags=get_tags("cloudtrail"),
        )
class VpcFlowLogs:
    """Enables VPC Flow Logs delivered to CloudWatch Logs."""

    def __init__(
        self,
        vpc_id: pulumi.Output[str],
        log_group_arn: pulumi.Output[str],
    ):
        """Create the delivery role, then the flow log itself.

        Args:
            vpc_id: VPC whose traffic is logged.
            log_group_arn: Destination CloudWatch Logs group ARN.
        """
        self.role = self._create_role()
        self.flow_log = self._create_flow_log(vpc_id, log_group_arn)

    def _create_role(self) -> aws.iam.Role:
        """IAM role the VPC Flow Logs service assumes to write log events."""
        trust_policy = {
            "Version": "2012-10-17",
            "Statement": [{
                "Effect": "Allow",
                "Principal": {"Service": "vpc-flow-logs.amazonaws.com"},
                "Action": "sts:AssumeRole",
            }],
        }
        role = aws.iam.Role(
            f"{PROJECT_NAME}-flow-logs-role",
            assume_role_policy=json.dumps(trust_policy),
            tags=get_tags(),
        )
        # Permissions required by the flow-logs service to deliver events.
        # NOTE(review): Resource is "*" — consider scoping to the target
        # log group ARN.
        log_permissions = {
            "Version": "2012-10-17",
            "Statement": [{
                "Effect": "Allow",
                "Action": [
                    "logs:CreateLogGroup",
                    "logs:CreateLogStream",
                    "logs:PutLogEvents",
                    "logs:DescribeLogGroups",
                    "logs:DescribeLogStreams",
                ],
                "Resource": "*",
            }],
        }
        aws.iam.RolePolicy(
            f"{PROJECT_NAME}-flow-logs-policy",
            role=role.id,
            policy=json.dumps(log_permissions),
        )
        return role

    def _create_flow_log(
        self,
        vpc_id: pulumi.Output[str],
        log_group_arn: pulumi.Output[str],
    ) -> aws.ec2.FlowLog:
        """Flow log capturing only rejected traffic."""
        return aws.ec2.FlowLog(
            f"{PROJECT_NAME}-vpc-flow-log",
            vpc_id=vpc_id,
            traffic_type="REJECT",  # Only log rejected traffic
            log_destination_type="cloud-watch-logs",
            log_destination=log_group_arn,
            iam_role_arn=self.role.arn,
            tags=get_tags("flow-logs"),
        )
17. CloudWatch Alarms¶
observability/cloudwatch.py¶
"""CloudWatch alarms and dashboards.
NEW: Operational monitoring per architecture spec.
"""
import pulumi
import pulumi_aws as aws
from config import PROJECT_NAME, ENVIRONMENT, get_tags
class CloudWatchAlarms:
    """Creates CloudWatch alarms for critical metrics.

    All alarms notify the same SNS topic. Note that the ecs_cluster_name
    and sqs_queue_name parameters are currently accepted but unused by
    the alarms created here.
    """

    def __init__(
        self,
        ecs_cluster_name: pulumi.Output[str],
        sqs_queue_name: pulumi.Output[str],
        dlq_name: pulumi.Output[str],
        step_function_arn: pulumi.Output[str],
    ):
        """Create the SNS topic and the three alarms.

        Args:
            ecs_cluster_name: ECS cluster name (currently unused).
            sqs_queue_name: Backtest queue name (currently unused).
            dlq_name: Dead-letter queue name monitored for stuck messages.
            step_function_arn: State machine monitored for failures/duration.
        """
        self.sns_topic = self._create_sns_topic()
        self.dlq_alarm = self._create_dlq_alarm(dlq_name)
        self.backtest_failure_alarm = self._create_backtest_failure_alarm(step_function_arn)
        self.backtest_duration_alarm = self._create_backtest_duration_alarm(step_function_arn)

    def _create_sns_topic(self) -> aws.sns.Topic:
        """Create SNS topic for alarm notifications."""
        return aws.sns.Topic(
            f"{PROJECT_NAME}-alarms",
            name=f"{PROJECT_NAME}-alarms",
            tags=get_tags("sns"),
        )

    def _create_dlq_alarm(self, dlq_name: pulumi.Output[str]) -> aws.cloudwatch.MetricAlarm:
        """Alarm when messages appear in DLQ."""
        return aws.cloudwatch.MetricAlarm(
            f"{PROJECT_NAME}-dlq-messages-alarm",
            alarm_name=f"{PROJECT_NAME}-dlq-messages",
            # Fires on ANY visible message (> 0) in a single 5-minute period.
            comparison_operator="GreaterThanThreshold",
            evaluation_periods=1,
            metric_name="ApproximateNumberOfMessagesVisible",
            namespace="AWS/SQS",
            period=300,
            statistic="Sum",
            threshold=0,
            alarm_description="Messages in DLQ indicate failed backtests",
            dimensions={"QueueName": dlq_name},
            alarm_actions=[self.sns_topic.arn],
            # Also notify on recovery so the on-call sees the queue drained.
            ok_actions=[self.sns_topic.arn],
            tags=get_tags("cloudwatch"),
        )

    def _create_backtest_failure_alarm(
        self,
        step_function_arn: pulumi.Output[str],
    ) -> aws.cloudwatch.MetricAlarm:
        """Alarm on high backtest failure rate (more than 5 failures/hour)."""
        return aws.cloudwatch.MetricAlarm(
            f"{PROJECT_NAME}-backtest-failures-alarm",
            alarm_name=f"{PROJECT_NAME}-backtest-failures-high",
            comparison_operator="GreaterThanThreshold",
            evaluation_periods=1,
            metric_name="ExecutionsFailed",
            namespace="AWS/States",
            period=3600,  # 1 hour
            statistic="Sum",
            threshold=5,
            alarm_description="High backtest failure rate",
            dimensions={"StateMachineArn": step_function_arn},
            alarm_actions=[self.sns_topic.arn],
            tags=get_tags("cloudwatch"),
        )

    def _create_backtest_duration_alarm(
        self,
        step_function_arn: pulumi.Output[str],
    ) -> aws.cloudwatch.MetricAlarm:
        """Alarm on long-running backtests (average execution over 1 hour)."""
        return aws.cloudwatch.MetricAlarm(
            f"{PROJECT_NAME}-backtest-duration-alarm",
            alarm_name=f"{PROJECT_NAME}-backtest-duration-long",
            comparison_operator="GreaterThanThreshold",
            evaluation_periods=1,
            metric_name="ExecutionTime",
            namespace="AWS/States",
            period=300,
            statistic="Average",
            threshold=3600000,  # 1 hour in ms
            alarm_description="Backtest taking too long",
            dimensions={"StateMachineArn": step_function_arn},
            alarm_actions=[self.sns_topic.arn],
            tags=get_tags("cloudwatch"),
        )
Deployment Commands¶
# Initialize Pulumi project
cd infra
pulumi stack init dev
# Configure required secrets
pulumi config set aws:region us-east-1
pulumi config set --secret certificate_arn arn:aws:acm:...
pulumi config set --secret db_secret_arn arn:aws:secretsmanager:...
# Preview changes
pulumi preview
# Deploy infrastructure
pulumi up
# View outputs
pulumi stack output
# Destroy (dev only!)
pulumi destroy
Summary¶
| Component | Resource Count | Key Features |
|---|---|---|
| VPC/Networking | 15 | Multi-AZ, NAT instance |
| Security | 12 | Least privilege IAM |
| Compute | 8 | ECS Fargate, Lambda |
| Storage | 6 | S3, RDS, DynamoDB |
| Orchestration | 3 | Step Functions, SQS |
| Load Balancing | 5 | ALB with TLS |
| Total | ~49 | Production-ready |
Next Steps¶
- Copy this code structure to the `infra/` directory
- Install Pulumi: `curl -fsSL https://get.pulumi.com | sh`
- Configure AWS credentials
- Initialize stack and deploy
See 08-IMPLEMENTATION-ROADMAP.md for detailed deployment timeline.