Skip to content

Services & Base Classes

Auto-generated API reference for TradAI service base classes.

BaseService

tradai.common.base_service.BaseService

Bases: LoggerMixin, Generic[S]

Base class for all TradAI services.

Provides: - Settings injection with type-safe access via Generic[S] - Lifecycle hooks (startup/shutdown) for resource management - Health check endpoint using settings metadata - Hydra configuration factory method

Services must define a settings_class attribute pointing to their Settings class.

Attributes:

Name Type Description
settings_class type[S]

Pydantic Settings class (must be overridden)

settings S

Type-safe validated settings instance (read-only property)

Example

class BacktesterSettings(Settings): ... service_name: str = Field(default="Backtester") ... strategy: str ... timeframe: str ... class BacktesterService(BaseService[BacktesterSettings]): ... settings_class = BacktesterSettings ... ... async def startup(self) -> None: ... await super().startup() ... self.db = await connect_database() ... ... async def shutdown(self) -> None: ... await self.db.close() ... await super().shutdown() ...

Create from Hydra config

service = BacktesterService.from_hydra_cfg(cfg, "backtester") await service.startup()

Source code in libs/tradai-common/src/tradai/common/base_service.py
class BaseService(LoggerMixin, Generic[S]):
    """
    Base class for all TradAI services.

    Provides:
    - Settings injection with type-safe access via Generic[S]
    - Lifecycle hooks (startup/shutdown) for resource management
    - Health check endpoint using settings metadata
    - Hydra configuration factory method

    Services must define a `settings_class` attribute pointing to their Settings class.

    Attributes:
        settings_class: Pydantic Settings class (must be overridden)
        settings: Type-safe validated settings instance (read-only property)

    Example:
        >>> class BacktesterSettings(Settings):
        ...     service_name: str = Field(default="Backtester")
        ...     strategy: str
        ...     timeframe: str
        ...
        >>> class BacktesterService(BaseService[BacktesterSettings]):
        ...     settings_class = BacktesterSettings
        ...
        ...     async def startup(self) -> None:
        ...         await super().startup()
        ...         self.db = await connect_database()
        ...
        ...     async def shutdown(self) -> None:
        ...         await self.db.close()
        ...         await super().shutdown()
        ...
        >>> # Create from Hydra config
        >>> service = BacktesterService.from_hydra_cfg(cfg, "backtester")
        >>> await service.startup()
    """

    # Annotation only: no value is assigned here, so a subclass that forgets
    # to set it is detected by the hasattr() check in from_hydra_cfg().
    settings_class: type[S]

    def __init__(self, settings: S) -> None:
        """
        Initialize service with validated settings.

        Builds the service's HealthService from the settings'
        ``service_name`` and ``service_version``.

        Args:
            settings: Validated Pydantic settings instance

        Example:
            >>> settings = BacktesterSettings(
            ...     service_name="Backtester", strategy="Pascal", timeframe="1h"
            ... )
            >>> service = BacktesterService(settings)
        """
        super().__init__()  # Initialize logger via LoggerMixin
        self._settings = settings
        self._health_service = HealthService(
            service_name=settings.service_name,
            service_version=settings.service_version,
        )

    @property
    def settings(self) -> S:
        """Type-safe access to service settings."""
        return self._settings

    @property
    def health_service(self) -> HealthService:
        """Access to health service for advanced health check operations."""
        return self._health_service

    def register_health_checker(self, checker: HealthChecker) -> None:
        """
        Register a health checker for dependency monitoring.

        Args:
            checker: HealthChecker instance for a dependency

        Example:
            >>> service.register_health_checker(RedisHealthChecker(redis_client))
        """
        self._health_service.register(checker)

    async def check_health(self) -> HealthResult:
        """
        Perform async health check with all registered dependency checkers.

        Executes all registered health checkers concurrently and returns
        aggregated result.

        Returns:
            HealthResult with overall status and individual check results

        Example:
            >>> result = await service.check_health()
            >>> if result.healthy:
            ...     print("All systems operational")
        """
        return await self._health_service.check()

    def health_check(self) -> HealthResult:
        """
        Perform synchronous health check with all registered dependencies.

        Uses sync wrapper around async health check. For async contexts,
        prefer check_health() instead.

        Returns:
            HealthResult with overall status and individual check results

        Example:
            >>> result = service.health_check()
            >>> result.healthy
            True
            >>> result.service
            'MyService'
        """
        return self._health_service.check_sync()

    async def startup(self) -> None:
        """
        Called on service startup. Override for initialization.

        The base implementation only logs the service name and version.

        Use for:
        - Database connections
        - HTTP client setup
        - Cache warming
        - External service connections

        Example:
            >>> async def startup(self) -> None:
            ...     await super().startup()
            ...     self.db = await connect_database()
            ...     self.client = httpx.AsyncClient()
        """
        self.logger.info(f"Starting {self.settings.service_name} v{self.settings.service_version}")

    async def shutdown(self) -> None:
        """
        Called on service shutdown. Override for cleanup.

        The base implementation only logs the service name.

        Use for:
        - Close database connections
        - Flush buffers
        - Release resources

        Example:
            >>> async def shutdown(self) -> None:
            ...     await self.client.aclose()
            ...     await self.db.close()
            ...     await super().shutdown()
        """
        self.logger.info(f"Shutting down {self.settings.service_name}")

    @classmethod
    def from_hydra_cfg(cls, cfg: DictConfig, config_name: str) -> Self:
        """
        Create service from Hydra configuration.

        Validates that settings_class is defined and is a Settings subclass,
        then loads and validates configuration from Hydra.

        Args:
            cfg: Hydra DictConfig object
            config_name: Configuration section name

        Returns:
            Service instance with validated settings

        Raises:
            NotImplementedError: If settings_class not defined
            TypeError: If settings_class is not a Settings subclass
            ValidationError: If configuration is invalid

        Example:
            >>> # config.yaml:
            >>> # backtester:
            >>> #   strategy: PascalStrategy
            >>> #   timeframe: 1h
            >>>
            >>> cfg = hydra.compose(config_name="config")
            >>> service = BacktesterService.from_hydra_cfg(cfg, "backtester")
        """
        if not hasattr(cls, "settings_class"):
            raise NotImplementedError(
                f"{cls.__name__} must define a `settings_class` class attribute. "
                f"Example: settings_class = {cls.__name__}Settings"
            )

        settings_cls = cls.settings_class
        # issubclass() itself raises an unhelpful TypeError when handed a
        # non-class (e.g. an instance or a string), so check for that first.
        is_class = isinstance(settings_cls, type)
        if not (is_class and issubclass(settings_cls, Settings)):
            got = repr(settings_cls.__name__) if is_class else repr(settings_cls)
            raise TypeError(
                f"{cls.__name__}.settings_class must be a Settings subclass, "
                f"got {got}. "
                f"Ensure it inherits from tradai.common.Settings."
            )

        # Settings.from_hydra_cfg is annotated as returning Settings, so the
        # cast narrows it to this service's concrete settings type.
        settings = cast(S, settings_cls.from_hydra_cfg(cfg, config_name))
        return cls(settings)

settings: S property

Type-safe access to service settings.

health_service: HealthService property

Access to health service for advanced health check operations.

register_health_checker(checker: HealthChecker) -> None

Register a health checker for dependency monitoring.

Parameters:

Name Type Description Default
checker HealthChecker

HealthChecker instance for a dependency

required
Example

service.register_health_checker(RedisHealthChecker(redis_client))

Source code in libs/tradai-common/src/tradai/common/base_service.py
def register_health_checker(self, checker: HealthChecker) -> None:
    """
    Add a dependency health checker to this service's health service.

    Args:
        checker: HealthChecker instance to register

    Example:
        >>> service.register_health_checker(RedisHealthChecker(redis_client))
    """
    registry = self._health_service
    registry.register(checker)

check_health() -> HealthResult async

Perform async health check with all registered dependency checkers.

Executes all registered health checkers concurrently and returns aggregated result.

Returns:

Type Description
HealthResult

HealthResult with overall status and individual check results

Example

result = await service.check_health() if result.healthy: ... print("All systems operational")

Source code in libs/tradai-common/src/tradai/common/base_service.py
async def check_health(self) -> HealthResult:
    """
    Run the asynchronous health check and return its result.

    Awaits the health service's ``check()`` coroutine, which covers the
    registered dependency checkers.

    Returns:
        HealthResult with overall status and individual check results

    Example:
        >>> result = await service.check_health()
        >>> if result.healthy:
        ...     print("All systems operational")
    """
    outcome = await self._health_service.check()
    return outcome

health_check() -> HealthResult

Perform synchronous health check with all registered dependencies.

Uses sync wrapper around async health check. For async contexts, prefer check_health() instead.

Returns:

Type Description
HealthResult

HealthResult with overall status and individual check results

Example

result = service.health_check() result.healthy True result.service 'MyService'

Source code in libs/tradai-common/src/tradai/common/base_service.py
def health_check(self) -> HealthResult:
    """
    Run the health check synchronously and return its result.

    Delegates to the health service's ``check_sync()``. In async code,
    use check_health() instead.

    Returns:
        HealthResult with overall status and individual check results

    Example:
        >>> result = service.health_check()
        >>> result.healthy
        True
        >>> result.service
        'MyService'
    """
    outcome = self._health_service.check_sync()
    return outcome

startup() -> None async

Called on service startup. Override for initialization.

Use for: - Database connections - HTTP client setup - Cache warming - External service connections

Example

async def startup(self) -> None: ... await super().startup() ... self.db = await connect_database() ... self.client = httpx.AsyncClient()

Source code in libs/tradai-common/src/tradai/common/base_service.py
async def startup(self) -> None:
    """
    Startup lifecycle hook; override to initialize resources.

    The base implementation only logs the service name and version.

    Typical uses in overrides:
    - Database connections
    - HTTP client setup
    - Cache warming
    - External service connections

    Example:
        >>> async def startup(self) -> None:
        ...     await super().startup()
        ...     self.db = await connect_database()
        ...     self.client = httpx.AsyncClient()
    """
    emit = self.logger.info
    name = self.settings.service_name
    version = self.settings.service_version
    emit(f"Starting {name} v{version}")

shutdown() -> None async

Called on service shutdown. Override for cleanup.

Use for: - Close database connections - Flush buffers - Release resources

Example

async def shutdown(self) -> None: ... await self.client.aclose() ... await self.db.close() ... await super().shutdown()

Source code in libs/tradai-common/src/tradai/common/base_service.py
async def shutdown(self) -> None:
    """
    Shutdown lifecycle hook; override to release resources.

    The base implementation only logs the service name.

    Typical uses in overrides:
    - Close database connections
    - Flush buffers
    - Release resources

    Example:
        >>> async def shutdown(self) -> None:
        ...     await self.client.aclose()
        ...     await self.db.close()
        ...     await super().shutdown()
    """
    emit = self.logger.info
    name = self.settings.service_name
    emit(f"Shutting down {name}")

from_hydra_cfg(cfg: DictConfig, config_name: str) -> Self classmethod

Create service from Hydra configuration.

Validates that settings_class is defined, then loads and validates configuration from Hydra.

Parameters:

Name Type Description Default
cfg DictConfig

Hydra DictConfig object

required
config_name str

Configuration section name

required

Returns:

Type Description
Self

Service instance with validated settings

Raises:

Type Description
NotImplementedError

If settings_class not defined

ValidationError

If configuration is invalid

Example

config.yaml:

backtester:

strategy: PascalStrategy

timeframe: 1h

cfg = hydra.compose(config_name="config") service = BacktesterService.from_hydra_cfg(cfg, "backtester")

Source code in libs/tradai-common/src/tradai/common/base_service.py
@classmethod
def from_hydra_cfg(cls, cfg: DictConfig, config_name: str) -> Self:
    """
    Create service from Hydra configuration.

    Validates that settings_class is defined and is a Settings subclass,
    then loads and validates configuration from Hydra.

    Args:
        cfg: Hydra DictConfig object
        config_name: Configuration section name

    Returns:
        Service instance with validated settings

    Raises:
        NotImplementedError: If settings_class not defined
        TypeError: If settings_class is not a Settings subclass
        ValidationError: If configuration is invalid

    Example:
        >>> # config.yaml:
        >>> # backtester:
        >>> #   strategy: PascalStrategy
        >>> #   timeframe: 1h
        >>>
        >>> cfg = hydra.compose(config_name="config")
        >>> service = BacktesterService.from_hydra_cfg(cfg, "backtester")
    """
    if not hasattr(cls, "settings_class"):
        raise NotImplementedError(
            f"{cls.__name__} must define a `settings_class` class attribute. "
            f"Example: settings_class = {cls.__name__}Settings"
        )

    settings_cls = cls.settings_class
    # issubclass() itself raises an unhelpful TypeError when handed a
    # non-class (e.g. an instance or a string), so check for that first.
    is_class = isinstance(settings_cls, type)
    if not (is_class and issubclass(settings_cls, Settings)):
        got = repr(settings_cls.__name__) if is_class else repr(settings_cls)
        raise TypeError(
            f"{cls.__name__}.settings_class must be a Settings subclass, "
            f"got {got}. "
            f"Ensure it inherits from tradai.common.Settings."
        )

    # The settings factory is annotated as returning the base Settings type,
    # so the cast narrows it to this service's concrete settings type.
    settings = cast(S, settings_cls.from_hydra_cfg(cfg, config_name))
    return cls(settings)

Settings

tradai.common.base_settings.Settings

Bases: BaseSettings

Base settings class for all TradAI services.

Provides common configuration fields that all services need. Services inherit and override defaults as needed.

Uses Pydantic for validation with strict mode: - frozen=True: Immutable after creation - extra="ignore": Ignore unknown env vars (services share .env files) - validate_assignment=True: Validate on field assignment

Supports loading configuration from: 1. Environment variables (with optional prefix in subclass) 2. Environment variable S3_CONFIG pointing to S3 URI 3. Local YAML file via Hydra

Example

class MyServiceSettings(Settings): ... service_name: str = Field(default="MyService") ... port: int = Field(default=8001) # Override default ... api_key: str # Service-specific required field ... settings = MyServiceSettings(api_key="secret") settings.service_name 'MyService'

Source code in libs/tradai-common/src/tradai/common/base_settings.py
class Settings(BaseSettings):
    """
    Base settings class for all TradAI services.

    Provides common configuration fields that all services need.
    Services inherit and override defaults as needed.

    Uses Pydantic for validation with strict mode:
    - frozen=True: Immutable after creation
    - extra="ignore": Ignore unknown env vars (services share .env files)
    - validate_assignment=True: Validate on field assignment

    Supports loading configuration from:
    1. Environment variables (with optional prefix in subclass)
    2. Environment variable S3_CONFIG pointing to S3 URI
    3. Local YAML file via Hydra

    Example:
        >>> class MyServiceSettings(Settings):
        ...     service_name: str = Field(default="MyService")
        ...     port: int = Field(default=8001)  # Override default
        ...     api_key: str  # Service-specific required field
        ...
        >>> settings = MyServiceSettings(api_key="secret")
        >>> settings.service_name
        'MyService'
    """

    model_config = SettingsConfigDict(
        frozen=True,  # Immutable - thread-safe
        extra="ignore",  # Ignore unknown env vars (services share .env files)
        validate_assignment=True,  # Validate on assignment
        # Note: Subclasses should specify their own env_file paths
        env_file_encoding="utf-8",
    )

    # Service identity
    service_name: str = Field(..., description="Service display name")
    service_version: str = Field(default="0.1.0", description="Service version (semver)")

    # Server settings (0.0.0.0 is intentional for containerized services)
    host: str = Field(default="0.0.0.0", description="Server bind host")  # nosec B104
    port: int = Field(default=8000, ge=1, le=65535, description="Server bind port")
    debug: bool = Field(default=False, description="Enable debug mode")
    log_level: str = Field(default="INFO", description="Logging level")

    # CORS settings (empty list = CORS disabled)
    cors_origins: list[str] = Field(
        default_factory=list,
        description="Allowed CORS origins (empty = CORS disabled)",
    )

    @field_validator("cors_origins", mode="before")
    @classmethod
    def parse_cors_origins(cls, v: str | list[str] | None) -> list[str]:
        """Parse CORS origins from JSON list or comma-separated string.

        Supports formats:
        - JSON list: '["http://localhost:3000","http://example.com"]'
        - Comma-separated: 'http://localhost:3000,http://example.com'
        - Already a list: ['http://localhost:3000', 'http://example.com']

        Entries parsed from a string are stripped, and entries that are
        empty or whitespace-only are dropped in both string formats. A value
        that is already a list is returned unchanged.

        Args:
            v: Input value (string, list, or None)

        Returns:
            List of origin URLs (empty for None, blank strings, or any
            other input type)
        """
        if v is None:
            return []
        if isinstance(v, list):
            return v
        if isinstance(v, str):
            v = v.strip()
            if not v:
                return []
            # Try JSON first
            if v.startswith("["):
                try:
                    parsed = json.loads(v)
                    if isinstance(parsed, list):
                        stripped = [str(item).strip() for item in parsed if item]
                        # Drop entries that were whitespace-only, matching
                        # the comma-separated branch below.
                        return [origin for origin in stripped if origin]
                except json.JSONDecodeError:
                    # Not valid JSON: treat it as a comma-separated string.
                    pass
            # Fall back to comma-separated
            return [item.strip() for item in v.split(",") if item.strip()]
        return []

    @classmethod
    def _find_project_root(cls, start_path: Path) -> Path:
        """
        Find project root by walking up from start_path.

        Looks for common project root markers (pyproject.toml, .git, configs/).

        Args:
            start_path: Path to start searching from

        Returns:
            Project root directory. If no marker is found, the starting
            directory: start_path.parent when start_path is a file,
            otherwise start_path itself.
        """
        # Search from the containing directory when given a file path.
        base = start_path.parent if start_path.is_file() else start_path
        markers = {"pyproject.toml", ".git", "configs"}

        current = base
        for _ in range(10):  # Max 10 levels up
            if any((current / marker).exists() for marker in markers):
                return current
            parent = current.parent
            if parent == current:  # Reached filesystem root
                break
            current = parent

        # Fallback to the starting directory
        return base

    @classmethod
    def prepare_config(
        cls,
        file_path: str,
        file_name: str = "config.yaml",
        config_dir: str | Path | None = None,
    ) -> tuple[Path, str]:
        """
        Prepare configuration file path, downloading from S3 if needed.

        Checks S3_CONFIG environment variable. If set, downloads config from S3.
        Otherwise, uses local file path.

        Args:
            file_path: Path to caller file (typically __file__)
            file_name: Config filename (default: "config.yaml"). Not used
                       when a config is downloaded from S3; the S3 object's
                       filename is returned instead.
            config_dir: Explicit config directory. If None, finds project root
                        and uses <project_root>/configs/

        Returns:
            Tuple of (config_directory, config_filename)

        Raises:
            ValidationError: If S3 URI is invalid
            ExternalServiceError: If S3 download fails

        Example:
            >>> # Auto-detect project root
            >>> config_dir, config_file = Settings.prepare_config(__file__)
            >>> # config_dir = /path/to/project/configs
            >>> # config_file = config.yaml
            >>>
            >>> # Explicit config directory
            >>> config_dir, config_file = Settings.prepare_config(__file__, config_dir="./my_configs")
            >>>
            >>> # With S3 (S3_CONFIG=s3://bucket/configs/prod.yaml)
            >>> config_dir, config_file = Settings.prepare_config(__file__)
            >>> # Downloads to /path/to/project/configs/prod.yaml
        """
        if config_dir is not None:
            local_dir = Path(config_dir)
        else:
            # Find project root and use configs/ directory
            project_root = cls._find_project_root(Path(file_path))
            local_dir = project_root / "configs"

        s3_uri = resolve_key(S3_CONFIG)
        if s3_uri:
            # Lazy import - only needed when using S3 config
            from tradai.common.aws.s3_utils import download_from_s3

            # Download config from S3
            local_dir.mkdir(parents=True, exist_ok=True)

            # Extract filename from S3 key
            s3_filename = Path(s3_uri).name
            local_path = local_dir / s3_filename

            logger.info(f"Downloading config from S3: {s3_uri}")
            download_from_s3(s3_uri, local_path, create_dirs=True)
            logger.info(f"Downloaded config to: {local_path}")

            return local_dir, s3_filename

        # Use local config
        return local_dir, file_name

    @classmethod
    def from_hydra_cfg(cls, cfg: DictConfig, config_name: str) -> Settings:
        """
        Create settings from Hydra configuration.

        Extracts a specific section from Hydra config and validates it.

        Args:
            cfg: Hydra DictConfig object
            config_name: Section name in config (e.g., "backtester", "strategy")

        Returns:
            Validated settings instance

        Raises:
            ConfigurationError: If config section is missing
            ValidationError: If configuration values are invalid

        Example:
            >>> # config.yaml:
            >>> # backtester:
            >>> #   service_name: Backtester
            >>> #   strategy: PascalStrategy
            >>> #   timeframe: 1h
            >>>
            >>> cfg = hydra.compose(config_name="config")
            >>> settings = BacktesterSettings.from_hydra_cfg(cfg, "backtester")
        """
        # Lazy import - only needed when using Hydra configuration
        from omegaconf import OmegaConf

        config_dict = OmegaConf.to_container(cfg, resolve=True)
        section = config_dict.get(config_name) if isinstance(config_dict, dict) else None

        if section is None:
            raise ConfigurationError(
                f"Missing required config section '{config_name}'. "
                f"Expected a '{config_name}:' section in your config.yaml file."
            )

        try:
            return cls.model_validate(section)
        except Exception as e:
            # Log which settings class failed, then let the original error propagate.
            logger.error(f"{cls.__name__}.from_hydra_cfg failed: {e}")
            raise

parse_cors_origins(v: str | list[str] | None) -> list[str] classmethod

Parse CORS origins from JSON list or comma-separated string.

Supports formats: - JSON list: '["http://localhost:3000","http://example.com"]' - Comma-separated: 'http://localhost:3000,http://example.com' - Already a list: ['http://localhost:3000', 'http://example.com']

Parameters:

Name Type Description Default
v str | list[str] | None

Input value (string, list, or None)

required

Returns:

Type Description
list[str]

List of origin URLs

Source code in libs/tradai-common/src/tradai/common/base_settings.py
@field_validator("cors_origins", mode="before")
@classmethod
def parse_cors_origins(cls, v: str | list[str] | None) -> list[str]:
    """Parse CORS origins from JSON list or comma-separated string.

    Supports formats:
    - JSON list: '["http://localhost:3000","http://example.com"]'
    - Comma-separated: 'http://localhost:3000,http://example.com'
    - Already a list: ['http://localhost:3000', 'http://example.com']

    Entries parsed from a string are stripped, and entries that are empty
    or whitespace-only are dropped in both string formats. A value that is
    already a list is returned unchanged.

    Args:
        v: Input value (string, list, or None)

    Returns:
        List of origin URLs (empty for None, blank strings, or any other
        input type)
    """
    if v is None:
        return []
    if isinstance(v, list):
        return v
    if isinstance(v, str):
        v = v.strip()
        if not v:
            return []
        # Try JSON first
        if v.startswith("["):
            try:
                parsed = json.loads(v)
                if isinstance(parsed, list):
                    stripped = [str(item).strip() for item in parsed if item]
                    # Drop entries that were whitespace-only, matching the
                    # comma-separated branch below.
                    return [origin for origin in stripped if origin]
            except json.JSONDecodeError:
                # Not valid JSON: treat it as a comma-separated string.
                pass
        # Fall back to comma-separated
        return [item.strip() for item in v.split(",") if item.strip()]
    return []

prepare_config(file_path: str, file_name: str = 'config.yaml', config_dir: str | Path | None = None) -> tuple[Path, str] classmethod

Prepare configuration file path, downloading from S3 if needed.

Checks S3_CONFIG environment variable. If set, downloads config from S3. Otherwise, uses local file path.

Parameters:

Name Type Description Default
file_path str

Path to caller file (typically `__file__`)

required
file_name str

Config filename (default: "config.yaml")

'config.yaml'
config_dir str | Path | None

Explicit config directory. If None, finds project root and uses `<project_root>/configs/`

None

Returns:

Type Description
tuple[Path, str]

Tuple of (config_directory, config_filename)

Raises:

Type Description
ValidationError

If S3 URI is invalid

ExternalServiceError

If S3 download fails

Example

Auto-detect project root

config_dir, config_file = Settings.prepare_config(__file__)

config_dir = /path/to/project/configs

config_file = config.yaml

Explicit config directory

config_dir, config_file = Settings.prepare_config(__file__, config_dir="./my_configs")

With S3 (S3_CONFIG=s3://bucket/configs/prod.yaml)

config_dir, config_file = Settings.prepare_config(__file__)

Downloads to /path/to/project/configs/prod.yaml

Source code in libs/tradai-common/src/tradai/common/base_settings.py
@classmethod
def prepare_config(
    cls,
    file_path: str,
    file_name: str = "config.yaml",
    config_dir: str | Path | None = None,
) -> tuple[Path, str]:
    """
    Resolve the config directory and filename, fetching from S3 when configured.

    The directory is ``config_dir`` when given, otherwise ``configs/`` under
    the project root located from ``file_path``. When the S3_CONFIG key
    resolves to a value, the config is downloaded into that directory and
    the S3 object's filename is returned; otherwise ``file_name`` is returned.

    Args:
        file_path: Path to caller file (typically __file__)
        file_name: Config filename (default: "config.yaml")
        config_dir: Explicit config directory. If None, finds project root
                    and uses <project_root>/configs/

    Returns:
        Tuple of (config_directory, config_filename)

    Raises:
        ValidationError: If S3 URI is invalid
        ExternalServiceError: If S3 download fails

    Example:
        >>> # Auto-detect project root
        >>> config_dir, config_file = Settings.prepare_config(__file__)
        >>> # config_dir = /path/to/project/configs
        >>> # config_file = config.yaml
        >>>
        >>> # Explicit config directory
        >>> config_dir, config_file = Settings.prepare_config(__file__, config_dir="./my_configs")
        >>>
        >>> # With S3 (S3_CONFIG=s3://bucket/configs/prod.yaml)
        >>> config_dir, config_file = Settings.prepare_config(__file__)
        >>> # Downloads to /path/to/project/configs/prod.yaml
    """
    if config_dir is None:
        # No explicit directory: use configs/ under the detected project root
        target_dir = cls._find_project_root(Path(file_path)) / "configs"
    else:
        target_dir = Path(config_dir)

    remote_uri = resolve_key(S3_CONFIG)
    if not remote_uri:
        # No S3 override: use the local config
        return target_dir, file_name

    # Lazy import - only needed when using S3 config
    from tradai.common.aws.s3_utils import download_from_s3

    target_dir.mkdir(parents=True, exist_ok=True)

    # The downloaded file keeps the name of the S3 object
    remote_name = Path(remote_uri).name
    destination = target_dir / remote_name

    logger.info(f"Downloading config from S3: {remote_uri}")
    download_from_s3(remote_uri, destination, create_dirs=True)
    logger.info(f"Downloaded config to: {destination}")

    return target_dir, remote_name

from_hydra_cfg(cfg: DictConfig, config_name: str) -> Settings classmethod

Create settings from Hydra configuration.

Extracts a specific section from Hydra config and validates it.

Parameters:

Name Type Description Default
cfg DictConfig

Hydra DictConfig object

required
config_name str

Section name in config (e.g., "backtester", "strategy")

required

Returns:

Type Description
Settings

Validated settings instance

Raises:

Type Description
ConfigurationError

If config section is missing

ValidationError

If configuration values are invalid

Example

config.yaml:

backtester:

service_name: Backtester

strategy: PascalStrategy

timeframe: 1h

cfg = hydra.compose(config_name="config") settings = BacktesterSettings.from_hydra_cfg(cfg, "backtester")

Source code in libs/tradai-common/src/tradai/common/base_settings.py
@classmethod
def from_hydra_cfg(cls, cfg: DictConfig, config_name: str) -> Settings:
    """
    Build a validated settings instance from one section of a Hydra config.

    Converts the config to a plain container with interpolations resolved,
    picks out the ``config_name`` section, and validates it with this class.

    Args:
        cfg: Hydra DictConfig object
        config_name: Section name in config (e.g., "backtester", "strategy")

    Returns:
        Validated settings instance

    Raises:
        ConfigurationError: If config section is missing
        ValidationError: If configuration values are invalid

    Example:
        >>> # config.yaml:
        >>> # backtester:
        >>> #   service_name: Backtester
        >>> #   strategy: PascalStrategy
        >>> #   timeframe: 1h
        >>>
        >>> cfg = hydra.compose(config_name="config")
        >>> settings = BacktesterSettings.from_hydra_cfg(cfg, "backtester")
    """
    # Lazy import - only needed when using Hydra configuration
    from omegaconf import OmegaConf

    container = OmegaConf.to_container(cfg, resolve=True)

    section = None
    if isinstance(container, dict):
        section = container.get(config_name)

    if section is None:
        raise ConfigurationError(
            f"Missing required config section '{config_name}'. "
            f"Expected a '{config_name}:' section in your config.yaml file."
        )

    try:
        validated = cls.model_validate(section)
    except Exception as exc:
        # Log which settings class failed, then let the original error propagate.
        logger.error(f"{cls.__name__}.from_hydra_cfg failed: {exc}")
        raise
    return validated

LoggerMixin

tradai.common.logger_mixin.LoggerMixin

Mixin providing logging capabilities.

Uses cooperative multiple inheritance pattern - always calls `super().__init__()` to ensure proper MRO (Method Resolution Order) traversal.

Attributes:

Name Type Description
_logger Logger | None

Internal logger instance

LOGGER_FORMAT str

Default log message format

Example

```python
>>> class MyService(LoggerMixin):
...     def do_work(self):
...         self.logger.info("Working...")
...
>>> service = MyService()
>>> service.do_work()
[2024-01-01][MyService][INFO] - Working...
```

Source code in libs/tradai-common/src/tradai/common/logger_mixin.py
class LoggerMixin:
    """
    Mixin that gives a class its own ``logging.Logger``.

    Built for cooperative multiple inheritance: ``__init__`` forwards every
    argument to ``super().__init__()`` so the remainder of the MRO (Method
    Resolution Order) still runs.

    The logger is left out of the pickled state and is recreated lazily the
    first time ``logger`` is accessed afterwards.

    Attributes:
        _logger: Internal logger instance (``None`` until initialized)
        LOGGER_FORMAT: Default log message format

    Example:
        >>> class MyService(LoggerMixin):
        ...     def do_work(self):
        ...         self.logger.info("Working...")
        ...
        >>> service = MyService()
        >>> service.do_work()  # logger name is "<module>.<class>", e.g.:
        [2024-01-01 12:00:00,000][my_module.MyService][INFO] - Working...
    """

    LOGGER_FORMAT: str = "[%(asctime)s][%(name)s][%(levelname)s] - %(message)s"
    _logger: logging.Logger | None = None

    def __init__(self, *args: object, **kwargs: object) -> None:
        """
        Run the rest of the MRO, then attach the logger.

        All positional and keyword arguments are passed through to
        ``super().__init__()`` untouched.
        """
        # Cooperative multiple inheritance: hand everything to the next class.
        super().__init__(*args, **kwargs)
        self._initialize_logger()

    def _initialize_logger(self, level: int = logging.INFO) -> None:
        """
        Create and store the logger for this instance's class.

        The logger is named after the fully qualified class name
        (module + class).

        Args:
            level: Logging level (default: INFO)
        """
        logger_name = f"{self.__class__.__module__}.{self.__class__.__name__}"

        # basicConfig only installs a root handler when none exists yet, so
        # repeated calls from many instances are harmless.
        logging.basicConfig(format=self.LOGGER_FORMAT)

        class_logger = logging.getLogger(logger_name)
        class_logger.propagate = True  # Let records reach parent handlers
        class_logger.setLevel(level)
        self._logger = class_logger

    @property
    def logger(self) -> logging.Logger:
        """
        Return the logger, creating it on demand.

        If no logger has been set yet (for example after unpickling, or when
        ``__init__`` was bypassed), it is initialized with the default level
        before being returned.

        Returns:
            Configured logger
        """
        if self._logger is not None:
            return self._logger
        # Accessed before initialization: build the logger now.
        self._initialize_logger()
        return self._logger  # type: ignore[return-value]

    def __getstate__(self) -> dict[str, Any]:
        """Return the instance state for pickling, without the logger."""
        # The "_logger" entry is skipped; it is rebuilt lazily after unpickling.
        return {key: value for key, value in vars(self).items() if key != "_logger"}

    def __setstate__(self, state: dict[str, Any]) -> None:
        """Restore pickled state and mark the logger as not yet created."""
        vars(self).update(state)
        # Reset after the update so any stale "_logger" entry is discarded.
        self._logger = None

    @staticmethod
    def get_hydra_logfile_dir_and_path() -> tuple[str, str]:
        """
        Return Hydra's runtime output directory and the log file path in it.

        The log file is named after the running script (``sys.argv[0]``
        without directory or extension) with a ``.log`` suffix, and the path
        is returned in POSIX form.

        Returns:
            Tuple of (output_dir, log_path)

        Example:
            >>> log_dir, log_path = LoggerMixin.get_hydra_logfile_dir_and_path()
            >>> print(f"Logs at: {log_path}")
            Logs at: outputs/2024-01-01/12-00-00/service.log
        """
        # Lazy import - only needed when running under Hydra
        from hydra.core.hydra_config import HydraConfig

        script_file = os.path.basename(sys.argv[0])
        script_basename, _extension = os.path.splitext(script_file)
        hydra_output_dir = HydraConfig.get().runtime.output_dir
        log_file = Path(hydra_output_dir) / f"{script_basename}.log"
        return hydra_output_dir, log_file.as_posix()

logger: logging.Logger property

Get logger instance.

Returns:

Type Description
Logger

Configured logger

Raises:

Type Description
LoggerNotInitializedError

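If logger not initialized. Note: the implementation shown above does not actually raise; when the logger is missing it is initialized lazily on first access.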

get_hydra_logfile_dir_and_path() -> tuple[str, str] staticmethod

Get Hydra output directory and log file path.

Useful when running services with Hydra configuration management.

Returns:

Type Description
tuple[str, str]

Tuple of (output_dir, log_path)

Example

```python
>>> log_dir, log_path = LoggerMixin.get_hydra_logfile_dir_and_path()
>>> print(f"Logs at: {log_path}")
Logs at: outputs/2024-01-01/12-00-00/service.log
```

Source code in libs/tradai-common/src/tradai/common/logger_mixin.py
@staticmethod
def get_hydra_logfile_dir_and_path() -> tuple[str, str]:
    """
    Return Hydra's runtime output directory and the log file path in it.

    The log file is named after the running script (``sys.argv[0]`` without
    directory or extension) with a ``.log`` suffix, and the path is returned
    in POSIX form.

    Returns:
        Tuple of (output_dir, log_path)

    Example:
        >>> log_dir, log_path = LoggerMixin.get_hydra_logfile_dir_and_path()
        >>> print(f"Logs at: {log_path}")
        Logs at: outputs/2024-01-01/12-00-00/service.log
    """
    # Lazy import - only needed when running under Hydra
    from hydra.core.hydra_config import HydraConfig

    script_file = os.path.basename(sys.argv[0])
    script_basename, _extension = os.path.splitext(script_file)
    hydra_output_dir = HydraConfig.get().runtime.output_dir
    log_file = Path(hydra_output_dir) / f"{script_basename}.log"
    return hydra_output_dir, log_file.as_posix()