Skip to content

Adapters & Repositories

Auto-generated API reference for key adapters and service classes.

ArcticDB Adapter

tradai.data.infrastructure.adapters.arctic_adapter.ArcticAdapter

Bases: LoggerMixin, DataAdapter

ArcticDB storage adapter for OHLCV market data.

Implements DataAdapter interface for persisting data to ArcticDB on S3. Uses dependency injection for testability - accepts optional library client.

Connection string format: s3s://endpoint:bucket?aws_auth=true

Attributes:

Name Type Description
SYMBOL_SEPARATOR

Separator used to normalize symbol names

Example

Default: creates ArcticDB connection lazily

adapter = ArcticAdapter(
    bucket="my-bucket",
    endpoint="s3.eu-central-1.amazonaws.com",
    library_name="futures",
)

For testing: inject mock library

mock_library = Mock()
adapter = ArcticAdapter(
    bucket="my-bucket",
    library_name="futures",
    arctic_library=mock_library,
)

Source code in libs/tradai-data/src/tradai/data/infrastructure/adapters/arctic_adapter.py
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
class ArcticAdapter(LoggerMixin, DataAdapter):
    """
    ArcticDB storage adapter for OHLCV market data.

    Implements DataAdapter interface for persisting data to ArcticDB on S3.
    Uses dependency injection for testability - accepts optional library client.

    Connection string format: s3s://endpoint:bucket?aws_auth=true

    Attributes:
        SYMBOL_SEPARATOR: Separator used to normalize symbol names

    Example:
        >>> # Default: creates ArcticDB connection lazily
        >>> adapter = ArcticAdapter(
        ...     bucket="my-bucket",
        ...     endpoint="s3.eu-central-1.amazonaws.com",
        ...     library_name="futures"
        ... )

        >>> # For testing: inject mock library
        >>> mock_library = Mock()
        >>> adapter = ArcticAdapter(
        ...     bucket="my-bucket",
        ...     library_name="futures",
        ...     arctic_library=mock_library
        ... )
    """

    # ArcticDB symbol names cannot contain "/" or ":" (see _normalize_symbol),
    # so both are replaced with this separator when persisting.
    SYMBOL_SEPARATOR = "__"

    @staticmethod
    def _get_arcticdb_request_type(type_name: str) -> Any | None:
        """Get request type from arcticdb if installed, else None."""
        # importlib lookup keeps arcticdb an optional dependency: callers fall
        # back to local stand-in models when the package is unavailable.
        try:
            module = importlib.import_module("arcticdb")
        except ImportError:
            return None
        return getattr(module, type_name, None)

    def __init__(
        self,
        bucket: str,
        library_name: str,
        endpoint: str = "s3.eu-central-1.amazonaws.com",
        arctic_library: ArcticLibraryProtocol | None = None,
        *,
        use_ssl: bool = True,
        access_key: str | None = None,
        secret_key: str | None = None,
        region: str = "eu-central-1",
        use_virtual_addressing: bool = True,
    ):
        """
        Initialize ArcticDB adapter.

        Args:
            bucket: S3 bucket name for ArcticDB storage
            library_name: ArcticDB library name (e.g., "futures_1h")
            endpoint: S3 endpoint (default: eu-central-1)
            arctic_library: Optional pre-configured library for testing
            use_ssl: Use SSL/TLS (default: True). Set False for LocalStack.
            access_key: Optional explicit access key (for LocalStack/MinIO)
            secret_key: Optional explicit secret key (for LocalStack/MinIO)
            region: AWS region (default: eu-central-1)
            use_virtual_addressing: Use virtual-hosted style URLs (default: True).
                Set False for LocalStack/MinIO path-style addressing.

        Example:
            >>> # AWS S3 (default - uses aws_auth)
            >>> adapter = ArcticAdapter(bucket="tradai", library_name="futures")

            >>> # LocalStack (explicit credentials, no SSL, path-style)
            >>> adapter = ArcticAdapter(
            ...     bucket="tradai",
            ...     library_name="futures",
            ...     endpoint="localstack:4566",
            ...     use_ssl=False,
            ...     access_key="test",
            ...     secret_key="test",
            ...     use_virtual_addressing=False,
            ... )
        """
        super().__init__()

        # H9: Block LocalStack endpoints in production (SSL enabled = production indicator)
        if use_ssl and endpoint:
            endpoint_lower = endpoint.lower()
            if "localstack" in endpoint_lower or "localhost:4566" in endpoint_lower:
                from tradai.common.exceptions import ConfigurationError

                raise ConfigurationError(
                    f"ArcticDB endpoint '{endpoint}' appears to be LocalStack, "
                    f"but use_ssl=True indicates production. "
                    f"Set use_ssl=False for LocalStack or use a production endpoint."
                )

        self._bucket = bucket
        self._library_name = library_name
        self._endpoint = endpoint
        # If a library was injected (tests), the `library` property never
        # opens a real connection.
        self._arctic_library: ArcticLibraryProtocol | None = arctic_library
        self._connection: Any = None
        self._use_ssl = use_ssl
        self._access_key = access_key
        self._secret_key = secret_key
        self._region = region
        self._use_virtual_addressing = use_virtual_addressing

    @property
    def library(self) -> ArcticLibraryProtocol:
        """
        Lazy-loaded ArcticDB library.

        Creates connection and gets library on first access.
        Uses aws_auth=true for AWS S3, or explicit credentials for LocalStack/MinIO.

        Returns:
            ArcticDB library instance

        Raises:
            ImportError: If ArcticDB is not installed
        """
        if self._arctic_library is None:
            try:
                from arcticdb import Arctic
            except ImportError as e:
                raise ImportError(
                    "ArcticDB is not installed. Install with: pip install arcticdb "
                    "(only available on Linux x86_64 and Windows)"
                ) from e

            s3_url = self._build_connection_string()
            # Connection kept on self so it outlives this call; the library
            # handle is cached so subsequent accesses reuse it.
            self._connection = Arctic(s3_url)
            self._arctic_library = self._connection.get_library(
                self._library_name,
                create_if_missing=True,
            )
            self.logger.info(f"Connected to ArcticDB: {self._bucket}/{self._library_name}")

        return self._arctic_library

    def _build_connection_string(self) -> str:
        """
        Build ArcticDB S3 connection string.

        Format: s3[s]://host:bucket?port=X&params

        For LocalStack/MinIO, endpoint can be "host:port" and the port
        will be extracted and added as a query parameter.

        Returns:
            Connection string for ArcticDB
        """
        protocol = "s3s" if self._use_ssl else "s3"

        # Parse endpoint to extract host and optional port
        host = self._endpoint
        port: str | None = None
        if ":" in self._endpoint:
            parts = self._endpoint.rsplit(":", 1)
            # Check if the part after : is a port number (not part of hostname)
            if parts[1].isdigit():
                host = parts[0]
                port = parts[1]

        # Build query parameters
        params: list[str] = []

        # Add port if specified
        if port:
            params.append(f"port={port}")

        if self._access_key and self._secret_key:
            # Explicit credentials (LocalStack/MinIO)
            params.append(f"access={self._access_key}")
            params.append(f"secret={self._secret_key}")
            params.append(f"region={self._region}")
        else:
            # AWS credential delegation
            # NOTE(review): if only one of access_key/secret_key is set, we
            # silently fall back to aws_auth — confirm that is intended.
            params.append("aws_auth=true")

        # Add virtual addressing option (false for LocalStack/MinIO)
        if not self._use_virtual_addressing:
            params.append("use_virtual_addressing=false")

        query_string = "&".join(params)
        return f"{protocol}://{host}:{self._bucket}?{query_string}"

    def save(
        self,
        data: OHLCVData,
        symbols: SymbolList,
        latest_query_date: datetime,
        timeframe: str | None = None,
    ) -> None:
        """
        Save OHLCV data to ArcticDB.

        Groups data by symbol and uses upsert semantics:
        - New symbols: uses write
        - Existing symbols: uses update (upserts rows by index)

        Stores metadata_version, last_query_date, last_candle_date,
        and optional timeframe in metadata for incremental updates.

        Args:
            data: Market data to save
            symbols: Symbols being saved (for metadata)
            latest_query_date: Timestamp of when data was queried
            timeframe: Optional CCXT timeframe string stored in metadata

        Raises:
            StorageError: If saving fails

        Example:
            >>> adapter.save(
            ...     data=ohlcv_data,
            ...     symbols=SymbolList.from_input(["BTC/USDT:USDT"]),
            ...     latest_query_date=datetime.now(UTC),
            ...     timeframe="1h",
            ... )
        """
        if data.is_empty:
            self.logger.warning("Attempted to save empty OHLCVData, skipping")
            return

        try:
            df = data.to_dataframe()
            grouped = df.groupby("symbol")

            saved_count = 0
            for symbol in symbols.to_list():
                # Symbols requested but absent from the data are skipped silently.
                if symbol not in grouped.groups:
                    continue

                symbol_df = grouped.get_group(symbol).copy()

                # Prepare for ArcticDB: drop symbol column, set date as index
                prepared_df = symbol_df.drop(columns=["symbol"]).set_index("date")

                arctic_symbol = self._normalize_symbol(symbol)
                metadata = self._build_symbol_metadata(
                    latest_query_date=latest_query_date,
                    last_candle_date=prepared_df.index.max(),
                    timeframe=timeframe,
                )

                # Use upsert semantics: update existing, write new
                if self.library.has_symbol(arctic_symbol):
                    # Update existing symbol (upserts rows by index)
                    self.library.update(
                        arctic_symbol,
                        prepared_df,
                        metadata=metadata,
                        upsert=True,
                        prune_previous_versions=True,
                    )
                    self.logger.debug(f"Updated symbol: {symbol}")
                else:
                    # Write new symbol
                    self.library.write(
                        arctic_symbol,
                        prepared_df,
                        metadata=metadata,
                        prune_previous_versions=True,
                    )
                    self.logger.debug(f"Created symbol: {symbol}")

                saved_count += 1

            if saved_count > 0:
                self.logger.info(f"Saved {saved_count} symbols, {data.row_count} rows")

        except (KeyError, ValueError, AttributeError) as e:
            raise StorageError(f"Data format error during save: {e}") from e
        except Exception as e:
            self.logger.error("Unexpected error during save: %s", e)
            raise StorageError(f"Failed to save data: {e}") from e

    def save_batch(
        self,
        data: OHLCVData,
        symbols: SymbolList,
        latest_query_date: datetime,
        timeframe: str | None = None,
    ) -> int:
        """
        Save OHLCV data to ArcticDB using batch write for better performance.

        Uses write_batch for 2-3x faster writes compared to sequential save().
        Best for initial data loads or when updating multiple symbols.

        Note: This method uses write (not update), so it replaces existing data.
        For incremental upserts, use save() instead.

        Args:
            data: Market data to save
            symbols: Symbols being saved (for metadata)
            latest_query_date: Timestamp of when data was queried
            timeframe: Optional CCXT timeframe string stored in metadata

        Returns:
            Number of symbols successfully saved

        Raises:
            StorageError: If batch save fails

        Example:
            >>> # Save 10 symbols in batch (2-3x faster than save())
            >>> count = adapter.save_batch(
            ...     data=ohlcv_data,
            ...     symbols=SymbolList.from_input(["BTC/USDT:USDT", ...]),
            ...     latest_query_date=datetime.now(UTC),
            ...     timeframe="1h",
            ... )
            >>> print(f"Saved {count} symbols")
        """
        if data.is_empty:
            self.logger.warning("Attempted to save_batch empty OHLCVData, skipping")
            return 0

        try:
            payloads = self._prepare_write_payloads(data, symbols, latest_query_date, timeframe)

            if not payloads:
                self.logger.warning("No data to save in batch")
                return 0

            success_count = self._execute_batch_write(payloads, data.row_count)
            return success_count

        except (KeyError, ValueError, AttributeError) as e:
            raise StorageError(f"Data format error during batch save: {e}") from e
        except Exception as e:
            self.logger.error("Unexpected error during batch save: %s", e)
            raise StorageError(f"Failed to batch save data: {e}") from e

    def _prepare_write_payloads(
        self,
        data: OHLCVData,
        symbols: SymbolList,
        latest_query_date: datetime,
        timeframe: str | None = None,
    ) -> list[Any]:
        """
        Prepare WritePayload objects for batch write operation.

        Args:
            data: Market data to prepare
            symbols: Symbols to prepare payloads for
            latest_query_date: Timestamp of when data was queried
            timeframe: Optional CCXT timeframe string stored in metadata

        Returns:
            List of WritePayload objects ready for batch write
        """
        # Prefer the real arcticdb WritePayload when installed; otherwise use
        # the local stand-in model (macOS/CI where arcticdb is unavailable).
        ArcticWritePayload = self._get_arcticdb_request_type("WritePayload") or WritePayload

        df = data.to_dataframe()
        grouped = df.groupby("symbol")

        payloads = []
        for symbol in symbols.to_list():
            if symbol not in grouped.groups:
                continue

            symbol_df = grouped.get_group(symbol).copy()

            # Prepare for ArcticDB: drop symbol column, set date as index
            prepared_df = symbol_df.drop(columns=["symbol"]).set_index("date")

            arctic_symbol = self._normalize_symbol(symbol)
            metadata = self._build_symbol_metadata(
                latest_query_date=latest_query_date,
                last_candle_date=prepared_df.index.max(),
                timeframe=timeframe,
            )

            payloads.append(
                ArcticWritePayload(symbol=arctic_symbol, data=prepared_df, metadata=metadata)
            )

        return payloads

    def _execute_batch_write(self, payloads: list[Any], row_count: int) -> int:
        """
        Execute batch write and count successful writes.

        Args:
            payloads: List of WritePayload objects to write
            row_count: Total number of rows being written (for logging)

        Returns:
            Number of symbols successfully written
        """
        results = self.library.write_batch(payloads)

        # Count successes and log errors
        success_count = 0
        for result in results:
            # Check for DataError (has error_code attribute)
            # Duck-typed check: arcticdb mixes DataError objects into results.
            if hasattr(result, "error_code"):
                self.logger.error(f"Batch write error for {result.symbol}: {result}")
            else:
                success_count += 1

        self.logger.info(f"Batch saved {success_count}/{len(payloads)} symbols, {row_count} rows")
        return success_count

    def load(
        self,
        symbols: SymbolList,
        date_range: DateRange,
    ) -> OHLCVData:
        """
        Load OHLCV data from ArcticDB.

        Uses read_batch for efficient multi-symbol loading.
        Reconstructs DataFrame with symbol column.

        Args:
            symbols: Symbols to load
            date_range: Date range to load

        Returns:
            OHLCVData from storage

        Raises:
            DataNotFoundError: If no data found for symbols
            StorageError: If loading fails

        Example:
            >>> data = adapter.load(
            ...     symbols=SymbolList.from_input(["BTC/USDT:USDT"]),
            ...     date_range=DateRange.from_strings("2024-01-01", "2024-01-31")
            ... )
        """
        try:
            # Prefer real arcticdb request types when available (production),
            # but fall back to local models for macOS/CI environments where
            # arcticdb isn't installable (tests inject a mock library).
            ArcticReadRequest = self._get_arcticdb_request_type("ReadRequest") or ReadRequest

            read_requests = [
                ArcticReadRequest(
                    symbol=self._normalize_symbol(symbol),
                    date_range=(date_range.start, date_range.end),
                )
                for symbol in symbols.to_list()
            ]

            results = self.library.read_batch(read_requests)

            dataframes = []
            for result in results:
                # Skip DataError results (check for 'data' attribute)
                if not hasattr(result, "data") or result.data is None:
                    continue
                if hasattr(result, "error_code"):
                    # This is a DataError, skip it
                    continue

                df = result.data.reset_index()  # date back as column

                # Denormalize symbol name back to trading format
                trading_symbol = self._denormalize_symbol(result.symbol)
                df.insert(0, "symbol", trading_symbol)

                dataframes.append(df)

            if not dataframes:
                raise DataNotFoundError(
                    f"No data found for {symbols.to_list()} "
                    f"between {date_range.start} and {date_range.end}"
                )

            combined_df = pd.concat(dataframes, ignore_index=True)
            return OHLCVData.from_dataframe(combined_df)

        except DataNotFoundError:
            # Re-raise unchanged so callers can distinguish "no data" from
            # storage failures.
            raise
        except (KeyError, ValueError, AttributeError) as e:
            raise StorageError(f"Data format error during load: {e}") from e
        except Exception as e:
            self.logger.error("Unexpected error during load: %s", e)
            raise StorageError(f"Failed to load data: {e}") from e

    def exists(self, symbols: SymbolList) -> dict[str, bool]:
        """
        Check if data exists for given symbols.

        Uses has_symbol for efficient individual checks.

        Args:
            symbols: Symbols to check

        Returns:
            Dictionary mapping symbol -> exists (True/False)

        Example:
            >>> result = adapter.exists(
            ...     SymbolList.from_input(["BTC/USDT:USDT", "ETH/USDT:USDT"])
            ... )
            >>> result
            {'BTC/USDT:USDT': True, 'ETH/USDT:USDT': False}
        """
        return {
            symbol: self.library.has_symbol(self._normalize_symbol(symbol))
            for symbol in symbols.to_list()
        }

    def get_latest_date(self, symbols: SymbolList) -> dict[str, datetime]:
        """
        Get latest available date for each symbol.

        Returns the actual last candle date from stored data.
        Falls back to last_query_date for backwards compatibility with older data.

        Args:
            symbols: Symbols to check

        Returns:
            Dictionary mapping symbol -> latest candle date (only for existing symbols)

        Example:
            >>> result = adapter.get_latest_date(
            ...     SymbolList.from_input(["BTC/USDT:USDT"])
            ... )
            >>> result
            {'BTC/USDT:USDT': datetime(2024, 1, 31, 23, 0, 0)}
        """
        # Prefer real arcticdb request types when available (production),
        # but fall back to local models for macOS/CI environments.
        ArcticReadInfoRequest = (
            self._get_arcticdb_request_type("ReadInfoRequest") or ReadInfoRequest
        )

        read_info_requests = [
            ArcticReadInfoRequest(symbol=self._normalize_symbol(symbol))
            for symbol in symbols.to_list()
        ]

        results = self.library.read_metadata_batch(read_info_requests)

        latest_dates: dict[str, datetime] = {}
        for result in results:
            # Skip DataError results
            if hasattr(result, "error_code"):
                continue
            if not hasattr(result, "metadata") or result.metadata is None:
                continue

            trading_symbol = self._denormalize_symbol(result.symbol)

            if "metadata_version" not in result.metadata:
                self.logger.debug(f"Symbol '{trading_symbol}' has pre-versioning metadata")

            # Prefer last_candle_date (actual data end date)
            # Fall back to last_query_date for backwards compatibility
            date_str = result.metadata.get("last_candle_date") or result.metadata.get(
                "last_query_date"
            )
            if date_str:
                # Dates are stored as ISO strings (see _build_symbol_metadata);
                # normalized to UTC-naive for comparison with stored data.
                latest_dates[trading_symbol] = ensure_utc_naive(datetime.fromisoformat(date_str))

        return latest_dates

    def get_stored_timeframe(self, symbols: SymbolList) -> dict[str, str | None]:
        """Get stored timeframe for each symbol from ArcticDB metadata.

        Returns None for legacy data without timeframe metadata.

        Args:
            symbols: Symbols to check

        Returns:
            Dictionary mapping symbol -> timeframe string or None
        """
        ArcticReadInfoRequest = (
            self._get_arcticdb_request_type("ReadInfoRequest") or ReadInfoRequest
        )

        read_info_requests = [
            ArcticReadInfoRequest(symbol=self._normalize_symbol(symbol))
            for symbol in symbols.to_list()
        ]

        results = self.library.read_metadata_batch(read_info_requests)

        timeframes: dict[str, str | None] = {}
        for result in results:
            # Skip DataError results and entries without metadata.
            if hasattr(result, "error_code"):
                continue
            if not hasattr(result, "metadata") or result.metadata is None:
                continue

            trading_symbol = self._denormalize_symbol(result.symbol)
            timeframes[trading_symbol] = result.metadata.get("timeframe")

        return timeframes

    def list_symbols(self) -> list[str]:
        """
        List all symbols stored in ArcticDB.

        Returns symbols in trading format (e.g., "BTC/USDT:USDT").

        Returns:
            List of trading symbols stored in the library

        Example:
            >>> symbols = adapter.list_symbols()
            >>> symbols
            ['BTC/USDT:USDT', 'ETH/USDT:USDT']
        """
        arctic_symbols = self.library.list_symbols()
        return [self._denormalize_symbol(s) for s in arctic_symbols]

    def delete(self, symbols: SymbolList) -> dict[str, bool]:
        """
        Delete data for given symbols from ArcticDB.

        Args:
            symbols: Symbols to delete

        Returns:
            Dictionary mapping symbol -> deleted (True if existed and deleted)

        Raises:
            StorageError: If deletion fails

        Example:
            >>> result = adapter.delete(
            ...     SymbolList.from_input(["BTC/USDT:USDT"])
            ... )
            >>> result
            {'BTC/USDT:USDT': True}
        """
        try:
            results: dict[str, bool] = {}
            for symbol in symbols.to_list():
                arctic_symbol = self._normalize_symbol(symbol)
                if self.library.has_symbol(arctic_symbol):
                    self.library.delete(arctic_symbol)
                    results[symbol] = True
                    self.logger.info(f"Deleted symbol: {symbol}")
                else:
                    results[symbol] = False
            return results
        except Exception as e:
            raise StorageError(f"Failed to delete symbols: {e}") from e

    @staticmethod
    def _build_symbol_metadata(
        latest_query_date: datetime,
        last_candle_date: datetime,
        timeframe: str | None = None,
    ) -> dict[str, Any]:
        """Build metadata dict for a single symbol write.

        Args:
            latest_query_date: Timestamp of when data was queried
            last_candle_date: Timestamp of the last candle in the data
            timeframe: Optional CCXT timeframe string (e.g., "1h")

        Returns:
            Metadata dict with version, query date, candle date, and optional timeframe
        """
        meta: dict[str, Any] = {
            "metadata_version": 2,
            "last_query_date": latest_query_date.isoformat(),
            "last_candle_date": last_candle_date.isoformat(),
        }
        if timeframe is not None:
            meta["timeframe"] = timeframe
        return meta

    @classmethod
    def _normalize_symbol(cls, symbol: str) -> str:
        """
        Convert trading symbol to ArcticDB-safe format.

        ArcticDB symbols cannot contain / or : characters.

        Args:
            symbol: Trading symbol (e.g., "BTC/USDT:USDT")

        Returns:
            Normalized symbol (e.g., "BTC__USDT__USDT")

        Example:
            >>> ArcticAdapter._normalize_symbol("BTC/USDT:USDT")
            'BTC__USDT__USDT'
        """
        return symbol.replace("/", cls.SYMBOL_SEPARATOR).replace(":", cls.SYMBOL_SEPARATOR)

    @classmethod
    def _denormalize_symbol(cls, arctic_symbol: str) -> str:
        """
        Convert ArcticDB symbol back to trading format.

        Args:
            arctic_symbol: ArcticDB symbol (e.g., "BTC__USDT__USDT")

        Returns:
            Trading symbol (e.g., "BTC/USDT:USDT")

        Example:
            >>> ArcticAdapter._denormalize_symbol("BTC__USDT__USDT")
            'BTC/USDT:USDT'
        """
        parts = arctic_symbol.split(cls.SYMBOL_SEPARATOR)
        if len(parts) >= 3:
            # Futures: BTC__USDT__USDT -> BTC/USDT:USDT
            # Re-join any extra parts so a settle portion that itself produced
            # multiple splits is reassembled with the separator.
            settle = cls.SYMBOL_SEPARATOR.join(parts[2:])
            return f"{parts[0]}/{parts[1]}:{settle}"
        elif len(parts) == 2:
            # Spot: BTC__USDT -> BTC/USDT
            return f"{parts[0]}/{parts[1]}"
        return arctic_symbol

library: ArcticLibraryProtocol property

Lazy-loaded ArcticDB library.

Creates connection and gets library on first access. Uses aws_auth=true for AWS S3, or explicit credentials for LocalStack/MinIO.

Returns:

Type Description
ArcticLibraryProtocol

ArcticDB library instance

Raises:

Type Description
ImportError

If ArcticDB is not installed

save(data: OHLCVData, symbols: SymbolList, latest_query_date: datetime, timeframe: str | None = None) -> None

Save OHLCV data to ArcticDB.

Groups data by symbol and uses upsert semantics:

- New symbols: uses write
- Existing symbols: uses update (upserts rows by index)

Stores metadata_version, last_query_date, last_candle_date, and optional timeframe in metadata for incremental updates.

Parameters:

Name Type Description Default
data OHLCVData

Market data to save

required
symbols SymbolList

Symbols being saved (for metadata)

required
latest_query_date datetime

Timestamp of when data was queried

required
timeframe str | None

Optional CCXT timeframe string stored in metadata

None

Raises:

Type Description
StorageError

If saving fails

Example

adapter.save(
    data=ohlcv_data,
    symbols=SymbolList.from_input(["BTC/USDT:USDT"]),
    latest_query_date=datetime.now(UTC),
    timeframe="1h",
)

Source code in libs/tradai-data/src/tradai/data/infrastructure/adapters/arctic_adapter.py
def save(
    self,
    data: OHLCVData,
    symbols: SymbolList,
    latest_query_date: datetime,
    timeframe: str | None = None,
) -> None:
    """
    Persist OHLCV data to ArcticDB with per-symbol upsert semantics.

    Each symbol's rows are written independently: symbols already present
    in the library are updated (rows upserted by index) while unseen
    symbols are created with a fresh write. The metadata recorded for each
    symbol (metadata_version, last_query_date, last_candle_date, optional
    timeframe) supports later incremental updates.

    Args:
        data: Market data to persist
        symbols: Symbols being persisted (drives which groups are written)
        latest_query_date: Timestamp of when the data was queried
        timeframe: Optional CCXT timeframe string stored in metadata

    Raises:
        StorageError: If persisting fails

    Example:
        >>> adapter.save(
        ...     data=ohlcv_data,
        ...     symbols=SymbolList.from_input(["BTC/USDT:USDT"]),
        ...     latest_query_date=datetime.now(UTC),
        ...     timeframe="1h",
        ... )
    """
    if data.is_empty:
        self.logger.warning("Attempted to save empty OHLCVData, skipping")
        return

    try:
        by_symbol = data.to_dataframe().groupby("symbol")
        written = 0

        for trading_symbol in symbols.to_list():
            # Only symbols actually present in the data are written.
            if trading_symbol not in by_symbol.groups:
                continue

            # ArcticDB stores one frame per symbol, so the symbol column
            # is redundant and the date becomes the index.
            frame = (
                by_symbol.get_group(trading_symbol)
                .copy()
                .drop(columns=["symbol"])
                .set_index("date")
            )

            key = self._normalize_symbol(trading_symbol)
            meta = self._build_symbol_metadata(
                latest_query_date=latest_query_date,
                last_candle_date=frame.index.max(),
                timeframe=timeframe,
            )

            # Upsert semantics: update rows in place for known symbols,
            # create the symbol otherwise.
            if self.library.has_symbol(key):
                self.library.update(
                    key,
                    frame,
                    metadata=meta,
                    upsert=True,
                    prune_previous_versions=True,
                )
                self.logger.debug(f"Updated symbol: {trading_symbol}")
            else:
                self.library.write(
                    key,
                    frame,
                    metadata=meta,
                    prune_previous_versions=True,
                )
                self.logger.debug(f"Created symbol: {trading_symbol}")

            written += 1

        if written:
            self.logger.info(f"Saved {written} symbols, {data.row_count} rows")

    except (KeyError, ValueError, AttributeError) as e:
        raise StorageError(f"Data format error during save: {e}") from e
    except Exception as e:
        self.logger.error("Unexpected error during save: %s", e)
        raise StorageError(f"Failed to save data: {e}") from e

save_batch(data: OHLCVData, symbols: SymbolList, latest_query_date: datetime, timeframe: str | None = None) -> int

Save OHLCV data to ArcticDB using batch write for better performance.

Uses write_batch for 2-3x faster writes compared to sequential save(). Best for initial data loads or when updating multiple symbols.

Note: This method uses write (not update), so it replaces existing data. For incremental upserts, use save() instead.

Parameters:

Name Type Description Default
data OHLCVData

Market data to save

required
symbols SymbolList

Symbols being saved (for metadata)

required
latest_query_date datetime

Timestamp of when data was queried

required
timeframe str | None

Optional CCXT timeframe string stored in metadata

None

Returns:

Type Description
int

Number of symbols successfully saved

Raises:

Type Description
StorageError

If batch save fails

Example

Save 10 symbols in batch (2-3x faster than save())

count = adapter.save_batch( ... data=ohlcv_data, ... symbols=SymbolList.from_input(["BTC/USDT:USDT", ...]), ... latest_query_date=datetime.now(UTC), ... timeframe="1h", ... ) print(f"Saved {count} symbols")

Source code in libs/tradai-data/src/tradai/data/infrastructure/adapters/arctic_adapter.py
def save_batch(
    self,
    data: OHLCVData,
    symbols: SymbolList,
    latest_query_date: datetime,
    timeframe: str | None = None,
) -> int:
    """
    Persist OHLCV data to ArcticDB via a single batch write.

    Batch writes are 2-3x faster than the sequential save() and are best
    suited for initial loads or refreshing many symbols at once.

    Note: batch writes use write (not update), so existing data for a
    symbol is replaced. For incremental upserts use save() instead.

    Args:
        data: Market data to persist
        symbols: Symbols being persisted (for metadata)
        latest_query_date: Timestamp of when the data was queried
        timeframe: Optional CCXT timeframe string stored in metadata

    Returns:
        Number of symbols successfully saved

    Raises:
        StorageError: If the batch save fails

    Example:
        >>> # Save 10 symbols in batch (2-3x faster than save())
        >>> count = adapter.save_batch(
        ...     data=ohlcv_data,
        ...     symbols=SymbolList.from_input(["BTC/USDT:USDT", ...]),
        ...     latest_query_date=datetime.now(UTC),
        ...     timeframe="1h",
        ... )
        >>> print(f"Saved {count} symbols")
    """
    if data.is_empty:
        self.logger.warning("Attempted to save_batch empty OHLCVData, skipping")
        return 0

    try:
        batch = self._prepare_write_payloads(
            data, symbols, latest_query_date, timeframe
        )
        if not batch:
            self.logger.warning("No data to save in batch")
            return 0

        # The helper performs the write_batch call and per-result error
        # accounting, returning the number of successful writes.
        return self._execute_batch_write(batch, data.row_count)

    except (KeyError, ValueError, AttributeError) as e:
        raise StorageError(f"Data format error during batch save: {e}") from e
    except Exception as e:
        self.logger.error("Unexpected error during batch save: %s", e)
        raise StorageError(f"Failed to batch save data: {e}") from e

load(symbols: SymbolList, date_range: DateRange) -> OHLCVData

Load OHLCV data from ArcticDB.

Uses read_batch for efficient multi-symbol loading. Reconstructs DataFrame with symbol column.

Parameters:

Name Type Description Default
symbols SymbolList

Symbols to load

required
date_range DateRange

Date range to load

required

Returns:

Type Description
OHLCVData

OHLCVData from storage

Raises:

Type Description
DataNotFoundError

If no data found for symbols

StorageError

If loading fails

Example

data = adapter.load( ... symbols=SymbolList.from_input(["BTC/USDT:USDT"]), ... date_range=DateRange.from_strings("2024-01-01", "2024-01-31") ... )

Source code in libs/tradai-data/src/tradai/data/infrastructure/adapters/arctic_adapter.py
def load(
    self,
    symbols: SymbolList,
    date_range: DateRange,
) -> OHLCVData:
    """
    Read OHLCV data for the given symbols and date window from ArcticDB.

    Issues one read_batch call covering all requested symbols and stitches
    the per-symbol frames back together, re-attaching the trading-format
    symbol column that storage strips off.

    Args:
        symbols: Symbols to load
        date_range: Date range to load

    Returns:
        OHLCVData reconstructed from storage

    Raises:
        DataNotFoundError: If no data was found for any symbol
        StorageError: If loading fails

    Example:
        >>> data = adapter.load(
        ...     symbols=SymbolList.from_input(["BTC/USDT:USDT"]),
        ...     date_range=DateRange.from_strings("2024-01-01", "2024-01-31")
        ... )
    """
    try:
        # Use the real arcticdb request type when the package is importable
        # (production); otherwise fall back to the local stand-in so that
        # macOS/CI environments without arcticdb can inject a mock library.
        request_cls = self._get_arcticdb_request_type("ReadRequest") or ReadRequest

        requests = [
            request_cls(
                symbol=self._normalize_symbol(sym),
                date_range=(date_range.start, date_range.end),
            )
            for sym in symbols.to_list()
        ]

        frames = []
        for item in self.library.read_batch(requests):
            # Results without a usable payload are skipped.
            payload = getattr(item, "data", None)
            if payload is None:
                continue
            if hasattr(item, "error_code"):
                # DataError objects are skipped, not raised.
                continue

            frame = payload.reset_index()  # restore date as a column

            # Map the storage key back to trading format for callers.
            frame.insert(0, "symbol", self._denormalize_symbol(item.symbol))
            frames.append(frame)

        if not frames:
            raise DataNotFoundError(
                f"No data found for {symbols.to_list()} "
                f"between {date_range.start} and {date_range.end}"
            )

        return OHLCVData.from_dataframe(pd.concat(frames, ignore_index=True))

    except DataNotFoundError:
        raise
    except (KeyError, ValueError, AttributeError) as e:
        raise StorageError(f"Data format error during load: {e}") from e
    except Exception as e:
        self.logger.error("Unexpected error during load: %s", e)
        raise StorageError(f"Failed to load data: {e}") from e

exists(symbols: SymbolList) -> dict[str, bool]

Check if data exists for given symbols.

Uses has_symbol for efficient individual checks.

Parameters:

Name Type Description Default
symbols SymbolList

Symbols to check

required

Returns:

Type Description
dict[str, bool]

Dictionary mapping symbol -> exists (True/False)

Example

>>> result = adapter.exists(
...     SymbolList.from_input(["BTC/USDT:USDT", "ETH/USDT:USDT"])
... )
>>> result
{'BTC/USDT:USDT': True, 'ETH/USDT:USDT': False}

Source code in libs/tradai-data/src/tradai/data/infrastructure/adapters/arctic_adapter.py
def exists(self, symbols: SymbolList) -> dict[str, bool]:
    """
    Report, per symbol, whether any data is stored for it.

    Performs one has_symbol check per symbol against the library.

    Args:
        symbols: Symbols to check

    Returns:
        Dictionary mapping symbol -> exists (True/False)

    Example:
        >>> result = adapter.exists(
        ...     SymbolList.from_input(["BTC/USDT:USDT", "ETH/USDT:USDT"])
        ... )
        >>> result
        {'BTC/USDT:USDT': True, 'ETH/USDT:USDT': False}
    """
    presence: dict[str, bool] = {}
    for sym in symbols.to_list():
        presence[sym] = self.library.has_symbol(self._normalize_symbol(sym))
    return presence

get_latest_date(symbols: SymbolList) -> dict[str, datetime]

Get latest available date for each symbol.

Returns the actual last candle date from stored data. Falls back to last_query_date for backwards compatibility with older data.

Parameters:

Name Type Description Default
symbols SymbolList

Symbols to check

required

Returns:

Type Description
dict[str, datetime]

Dictionary mapping symbol -> latest candle date (only for existing symbols)

Example

>>> result = adapter.get_latest_date(
...     SymbolList.from_input(["BTC/USDT:USDT"])
... )
>>> result
{'BTC/USDT:USDT': datetime(2024, 1, 31, 23, 0, 0)}

Source code in libs/tradai-data/src/tradai/data/infrastructure/adapters/arctic_adapter.py
def get_latest_date(self, symbols: SymbolList) -> dict[str, datetime]:
    """
    Resolve the most recent stored candle date for each symbol.

    Reads only symbol metadata (no candle payloads). Prefers the
    last_candle_date field; records written before that field existed
    fall back to last_query_date for backwards compatibility.

    Args:
        symbols: Symbols to inspect

    Returns:
        Dictionary mapping symbol -> latest candle date (only for existing symbols)

    Example:
        >>> result = adapter.get_latest_date(
        ...     SymbolList.from_input(["BTC/USDT:USDT"])
        ... )
        >>> result
        {'BTC/USDT:USDT': datetime(2024, 1, 31, 23, 0, 0)}
    """
    # Real arcticdb request type in production; local stand-in on
    # macOS/CI environments where arcticdb is not installable.
    request_cls = (
        self._get_arcticdb_request_type("ReadInfoRequest") or ReadInfoRequest
    )

    requests = [
        request_cls(symbol=self._normalize_symbol(sym))
        for sym in symbols.to_list()
    ]

    dates: dict[str, datetime] = {}
    for item in self.library.read_metadata_batch(requests):
        if hasattr(item, "error_code"):
            # DataError result - symbol missing or unreadable, skip it.
            continue
        meta = getattr(item, "metadata", None)
        if meta is None:
            continue

        trading_symbol = self._denormalize_symbol(item.symbol)

        if "metadata_version" not in meta:
            self.logger.debug(f"Symbol '{trading_symbol}' has pre-versioning metadata")

        # last_candle_date is the actual data end date; last_query_date
        # is the legacy fallback.
        raw = meta.get("last_candle_date") or meta.get("last_query_date")
        if raw:
            dates[trading_symbol] = ensure_utc_naive(datetime.fromisoformat(raw))

    return dates

get_stored_timeframe(symbols: SymbolList) -> dict[str, str | None]

Get stored timeframe for each symbol from ArcticDB metadata.

Returns None for legacy data without timeframe metadata.

Parameters:

Name Type Description Default
symbols SymbolList

Symbols to check

required

Returns:

Type Description
dict[str, str | None]

Dictionary mapping symbol -> timeframe string or None

Source code in libs/tradai-data/src/tradai/data/infrastructure/adapters/arctic_adapter.py
def get_stored_timeframe(self, symbols: SymbolList) -> dict[str, str | None]:
    """Read the timeframe recorded in each symbol's ArcticDB metadata.

    Legacy symbols written before timeframe tracking yield None.

    Args:
        symbols: Symbols to check

    Returns:
        Dictionary mapping symbol -> timeframe string or None
    """
    # Real arcticdb request type in production; local stand-in otherwise.
    request_cls = (
        self._get_arcticdb_request_type("ReadInfoRequest") or ReadInfoRequest
    )

    requests = [
        request_cls(symbol=self._normalize_symbol(sym))
        for sym in symbols.to_list()
    ]

    stored: dict[str, str | None] = {}
    for item in self.library.read_metadata_batch(requests):
        if hasattr(item, "error_code"):
            # DataError result - skip silently.
            continue
        meta = getattr(item, "metadata", None)
        if meta is None:
            continue
        stored[self._denormalize_symbol(item.symbol)] = meta.get("timeframe")

    return stored

list_symbols() -> list[str]

List all symbols stored in ArcticDB.

Returns symbols in trading format (e.g., "BTC/USDT:USDT").

Returns:

Type Description
list[str]

List of trading symbols stored in the library

Example

>>> symbols = adapter.list_symbols()
>>> symbols
['BTC/USDT:USDT', 'ETH/USDT:USDT']

Source code in libs/tradai-data/src/tradai/data/infrastructure/adapters/arctic_adapter.py
def list_symbols(self) -> list[str]:
    """
    Enumerate every symbol stored in the library, in trading format.

    Storage keys are denormalized back to trading symbols (e.g.
    "BTC/USDT:USDT") before being returned.

    Returns:
        List of trading symbols stored in the library

    Example:
        >>> symbols = adapter.list_symbols()
        >>> symbols
        ['BTC/USDT:USDT', 'ETH/USDT:USDT']
    """
    return [
        self._denormalize_symbol(stored)
        for stored in self.library.list_symbols()
    ]

delete(symbols: SymbolList) -> dict[str, bool]

Delete data for given symbols from ArcticDB.

Parameters:

Name Type Description Default
symbols SymbolList

Symbols to delete

required

Returns:

Type Description
dict[str, bool]

Dictionary mapping symbol -> deleted (True if existed and deleted)

Raises:

Type Description
StorageError

If deletion fails

Example

>>> result = adapter.delete(
...     SymbolList.from_input(["BTC/USDT:USDT"])
... )
>>> result
{'BTC/USDT:USDT': True}

Source code in libs/tradai-data/src/tradai/data/infrastructure/adapters/arctic_adapter.py
def delete(self, symbols: SymbolList) -> dict[str, bool]:
    """
    Remove stored data for the given symbols from ArcticDB.

    Args:
        symbols: Symbols to delete

    Returns:
        Dictionary mapping symbol -> deleted (True if it existed and was
        deleted, False if nothing was stored for it)

    Raises:
        StorageError: If deletion fails

    Example:
        >>> result = adapter.delete(
        ...     SymbolList.from_input(["BTC/USDT:USDT"])
        ... )
        >>> result
        {'BTC/USDT:USDT': True}
    """
    outcome: dict[str, bool] = {}
    try:
        for sym in symbols.to_list():
            key = self._normalize_symbol(sym)
            found = self.library.has_symbol(key)
            if found:
                self.library.delete(key)
                self.logger.info(f"Deleted symbol: {sym}")
            outcome[sym] = found
        return outcome
    except Exception as e:
        raise StorageError(f"Failed to delete symbols: {e}") from e

MLflow Adapter

tradai.common.mlflow.adapter.MLflowAdapter

Bases: MLflowClientMixin, ExperimentsMixin, RegistryMixin, ArtifactsMixin

MLflow REST API adapter with automatic base path discovery.

Compatible with MLflow 3.x API. Uses Basic Authentication. Follows DI pattern - accepts optional session for testing.

This class composes functionality from specialized mixins: - MLflowClientMixin: HTTP client infrastructure, session management - ExperimentsMixin: Experiment creation, run tracking, metrics - RegistryMixin: Model version management, stage transitions - ArtifactsMixin: Artifact upload and download

Attributes:

Name Type Description
base_url str

Base URL of the MLflow server

verify_ssl bool

Whether to verify SSL certificates

Example

adapter = MLflowAdapter( ... base_url="https://mlflow.tradai.smartml.me", ... username="admin", ... password="secret", ... ) version = adapter.get_model_version("PascalStrategy") print(version.version) '3'

Methods from MLflowClientMixin
  • api_base: Property to get/discover API base URL
  • mlflow_client: Property to get MLflow Python SDK client
  • is_available(): Check if MLflow server is reachable
  • _request(): Send HTTP request to MLflow API
Methods from ExperimentsMixin
  • log_experiment(): Log metrics to an experiment
  • get_experiment_results(): Get experiment with all runs
  • create_run(): Create a new run
  • log_metric(): Log a single metric
  • end_run(): End a run
  • log_parameter(): Log a parameter
  • set_tag(): Set a tag on a run
  • search_experiments(): Search all experiments
  • search_runs(): Search runs within experiments
Methods from RegistryMixin
  • get_model_version(): Get model version info
  • search_registered_models(): Search models in registry
  • get_registered_model(): Get registered model details
  • get_production_version(): Get production version for a model
  • create_model_version(): Create a new model version
  • transition_model_version_stage(): Transition version stage
  • delete_model_version(): Delete a model version
  • delete_registered_model(): Delete a registered model
  • load_model(): Load a model for inference
Methods from ArtifactsMixin
  • log_artifact(): Upload a file as artifact
  • upload_artifact(): Upload directly to S3
  • download_artifacts(): Download artifacts from a run
Source code in libs/tradai-common/src/tradai/common/mlflow/adapter.py
class MLflowAdapter(
    MLflowClientMixin,
    ExperimentsMixin,
    RegistryMixin,
    ArtifactsMixin,
):
    """REST adapter for MLflow 3.x with automatic API base-path discovery.

    Authenticates with HTTP Basic auth and follows the project's DI
    pattern: an optional requests.Session may be injected for testing.

    All behaviour is composed from four mixins:
    - MLflowClientMixin: HTTP session handling and base-path discovery
    - ExperimentsMixin: experiment creation, run tracking, metrics
    - RegistryMixin: model version management, stage transitions
    - ArtifactsMixin: artifact upload and download

    Attributes:
        base_url: Base URL of the MLflow server
        verify_ssl: Whether SSL certificates are verified

    Example:
        >>> adapter = MLflowAdapter(
        ...     base_url="https://mlflow.tradai.smartml.me",
        ...     username="admin",
        ...     password="secret",
        ... )
        >>> version = adapter.get_model_version("PascalStrategy")
        >>> print(version.version)
        '3'

    Methods from MLflowClientMixin:
        - api_base: Property to get/discover API base URL
        - mlflow_client: Property to get MLflow Python SDK client
        - is_available(): Check if MLflow server is reachable
        - _request(): Send HTTP request to MLflow API

    Methods from ExperimentsMixin:
        - log_experiment(): Log metrics to an experiment
        - get_experiment_results(): Get experiment with all runs
        - create_run(): Create a new run
        - log_metric(): Log a single metric
        - end_run(): End a run
        - log_parameter(): Log a parameter
        - set_tag(): Set a tag on a run
        - search_experiments(): Search all experiments
        - search_runs(): Search runs within experiments

    Methods from RegistryMixin:
        - get_model_version(): Get model version info
        - search_registered_models(): Search models in registry
        - get_registered_model(): Get registered model details
        - get_production_version(): Get production version for a model
        - create_model_version(): Create a new model version
        - transition_model_version_stage(): Transition version stage
        - delete_model_version(): Delete a model version
        - delete_registered_model(): Delete a registered model
        - load_model(): Load a model for inference

    Methods from ArtifactsMixin:
        - log_artifact(): Upload a file as artifact
        - upload_artifact(): Upload directly to S3
        - download_artifacts(): Download artifacts from a run
    """

    def __init__(
        self,
        base_url: str,
        username: str | None = None,
        password: str | None = None,
        verify_ssl: bool = True,
        session: requests.Session | None = None,
        lazy: bool = True,
    ) -> None:
        """Set up the adapter; connection may be deferred until first use.

        Args:
            base_url: MLflow server URL (without trailing slash)
            username: Optional username for Basic Authentication
            password: Optional password for Basic Authentication
            verify_ssl: Verify SSL certificates (default: True)
            session: Optional session for DI/testing
            lazy: If True, defer connection until first use (default: True)

        Raises:
            ExternalServiceError: If no valid MLflow API path is found
                (only raised immediately when lazy=False)
        """
        # The client mixin owns all shared state; everything is forwarded.
        settings = dict(
            base_url=base_url,
            username=username,
            password=password,
            verify_ssl=verify_ssl,
            session=session,
            lazy=lazy,
        )
        self._init_client(**settings)

Configuration Services

tradai.common.config.service.ConfigVersionService

Bases: LoggerMixin

High-level service for config version management.

Provides content-addressable versioning with deduplication, lifecycle management (DRAFT → ACTIVE → DEPRECATED), and atomic activation (deprecates previous active).

Follows existing patterns: - LoggerMixin for logging - Constructor DI for repository - Protocol-based dependencies

Example

repo = ConfigVersionRepository() service = ConfigVersionService(repository=repo) version = service.create_version( ... strategy_name="PascalStrategy", ... config_data={"timeframe": "1h", "stoploss": -0.05}, ... ) print(f"Created: {version.config_id}") activated = service.activate("PascalStrategy", version.config_id)

Source code in libs/tradai-common/src/tradai/common/config/service.py
class ConfigVersionService(LoggerMixin):
    """Manage strategy config versions with content-addressable identity.

    Versions are deduplicated by a SHA256 hash of their key-sorted
    content, move through a DRAFT → ACTIVE → DEPRECATED lifecycle, and
    activation is atomic in the sense that the previously active version
    is deprecated (and linked via superseded_by) within the same call.

    Follows existing patterns:
    - LoggerMixin for logging
    - Constructor DI for repository
    - Protocol-based dependencies

    Example:
        >>> repo = ConfigVersionRepository()
        >>> service = ConfigVersionService(repository=repo)
        >>> version = service.create_version(
        ...     strategy_name="PascalStrategy",
        ...     config_data={"timeframe": "1h", "stoploss": -0.05},
        ... )
        >>> print(f"Created: {version.config_id}")
        >>> activated = service.activate("PascalStrategy", version.config_id)
    """

    def __init__(
        self,
        repository: ConfigVersionRepository,
    ) -> None:
        """Initialize service.

        Args:
            repository: ConfigVersionRepository used for persistence
        """
        super().__init__()
        self._repository = repository

    def create_version(
        self,
        strategy_name: str,
        config_data: dict[str, Any],
        description: str = "",
        created_by: str = "system",
    ) -> ConfigVersion:
        """Create a config version, reusing any existing identical one.

        The version's identity derives from a SHA256 hash of its content,
        so creating the same content twice returns the already-stored
        version (deduplication).

        NOTE(review): the hash lookup is not scoped to strategy_name, so
        identical config content under two strategies resolves to
        whichever version was stored first — confirm this is intended.

        Args:
            strategy_name: Strategy this config belongs to
            config_data: Configuration dictionary
            description: Optional version description
            created_by: User/system creating the version

        Returns:
            Created or existing ConfigVersion

        Raises:
            ValidationError: If config_data is empty
        """
        if not config_data:
            raise ValidationError("config_data cannot be empty")

        digest = self._compute_hash(config_data)
        self.logger.debug(f"Config hash: {digest[:8]}")

        # Deduplicate on content hash before allocating a version number.
        duplicate = self._repository.get_by_hash(digest)
        if duplicate:
            self.logger.info(f"Config already exists for '{strategy_name}': {duplicate.config_id}")
            return duplicate

        serial = self._repository.get_next_version_number(strategy_name)
        identifier = f"v{serial}-{digest[:8]}"

        fresh = ConfigVersion(
            strategy_name=strategy_name,
            config_id=identifier,
            config_hash=digest,
            config_data=config_data,
            version_number=serial,
            description=description,
            created_by=created_by,
        )
        self._repository.put(fresh)
        self.logger.info(f"Created config version: {identifier}")
        return fresh

    def activate(self, strategy_name: str, config_id: str) -> ConfigVersion:
        """Make a config version the active one for its strategy.

        Transitions:
        - Current ACTIVE → DEPRECATED (with superseded_by set)
        - Target DRAFT → ACTIVE (sets deployed_at)

        Args:
            strategy_name: Strategy name
            config_id: Version to activate

        Returns:
            Activated ConfigVersion

        Raises:
            NotFoundError: If the version doesn't exist
            ValidationError: If the version is already DEPRECATED
        """
        target = self._repository.get(strategy_name, config_id)
        if not target:
            raise NotFoundError(f"Config version not found: {config_id}")

        if target.status == ConfigVersionStatus.ACTIVE:
            # Idempotent: re-activating the active version is a no-op.
            self.logger.info(f"Version already active: {config_id}")
            return target

        if target.status == ConfigVersionStatus.DEPRECATED:
            raise ValidationError(f"Cannot activate deprecated version: {config_id}")

        previous = self._repository.get_active(strategy_name)
        if previous and previous.config_id != config_id:
            # Rebuild the deprecated record so superseded_by points at the
            # version replacing it.
            retired = ConfigVersion(
                **previous.with_status(ConfigVersionStatus.DEPRECATED).model_dump(
                    exclude={"superseded_by"}
                ),
                superseded_by=config_id,
            )
            self._repository.put(retired)
            self.logger.info(f"Deprecated previous active: {previous.config_id}")

        result = target.with_status(ConfigVersionStatus.ACTIVE)
        self._repository.put(result)
        self.logger.info(f"Activated config version: {config_id}")
        return result

    def get_active(self, strategy_name: str) -> ConfigVersion | None:
        """Get the currently active config version for a strategy.

        Args:
            strategy_name: Strategy to get active config for

        Returns:
            Active ConfigVersion if one exists, None otherwise
        """
        return self._repository.get_active(strategy_name)

    def get_version(self, strategy_name: str, config_id: str) -> ConfigVersion | None:
        """Get a specific config version.

        Args:
            strategy_name: Strategy name
            config_id: Version identifier

        Returns:
            ConfigVersion if found, None otherwise
        """
        return self._repository.get(strategy_name, config_id)

    def list_versions(
        self,
        strategy_name: str,
        limit: int = 100,
        status_filter: ConfigVersionStatus | None = None,
    ) -> list[ConfigVersion]:
        """List config versions for a strategy, newest first.

        Args:
            strategy_name: Strategy to list versions for
            limit: Maximum number of versions to return
            status_filter: Optional status to filter by

        Returns:
            List of matching ConfigVersion entities (newest first)
        """
        return self._repository.list_by_strategy(
            strategy_name,
            limit=limit,
            status_filter=status_filter,
        )

    def deprecate(self, strategy_name: str, config_id: str) -> ConfigVersion:
        """Manually mark a config version as deprecated.

        Args:
            strategy_name: Strategy name
            config_id: Version to deprecate

        Returns:
            Deprecated ConfigVersion

        Raises:
            NotFoundError: If the version doesn't exist
            ValidationError: If it is already deprecated
        """
        target = self._repository.get(strategy_name, config_id)
        if not target:
            raise NotFoundError(f"Config version not found: {config_id}")

        if target.status == ConfigVersionStatus.DEPRECATED:
            raise ValidationError(f"Version already deprecated: {config_id}")

        retired = target.with_status(ConfigVersionStatus.DEPRECATED)
        self._repository.put(retired)
        self.logger.info(f"Deprecated config version: {config_id}")
        return retired

    @staticmethod
    def _compute_hash(config_data: dict[str, Any]) -> str:
        """Return the SHA256 hex digest of the key-sorted config JSON.

        Sorting keys makes the serialization — and therefore the hash —
        deterministic for equal dictionaries.

        Args:
            config_data: Configuration dictionary

        Returns:
            SHA256 hex string (64 characters)
        """
        canonical = json.dumps(config_data, sort_keys=True)
        return hashlib.sha256(canonical.encode()).hexdigest()

create_version(strategy_name: str, config_data: dict[str, Any], description: str = '', created_by: str = 'system') -> ConfigVersion

Create new config version with content-addressable deduplication.

If a version with the same content hash already exists, returns the existing version (deduplication).

Parameters:

Name Type Description Default
strategy_name str

Strategy this config belongs to

required
config_data dict[str, Any]

Configuration dictionary

required
description str

Optional version description

''
created_by str

User/system creating the version

'system'

Returns:

Type Description
ConfigVersion

Created or existing ConfigVersion

Raises:

Type Description
ValidationError

If config_data is empty

Source code in libs/tradai-common/src/tradai/common/config/service.py
def create_version(
    self,
    strategy_name: str,
    config_data: dict[str, Any],
    description: str = "",
    created_by: str = "system",
) -> ConfigVersion:
    """Create new config version with content-addressable deduplication.

    If a version of this strategy with the same content hash already
    exists, returns the existing version (deduplication).

    Args:
        strategy_name: Strategy this config belongs to
        config_data: Configuration dictionary
        description: Optional version description
        created_by: User/system creating the version

    Returns:
        Created or existing ConfigVersion

    Raises:
        ValidationError: If config_data is empty
    """
    if not config_data:
        raise ValidationError("config_data cannot be empty")

    # Compute content hash for deduplication
    config_hash = self._compute_hash(config_data)
    self.logger.debug(f"Config hash: {config_hash[:8]}")

    # Check for duplicate. The hash lookup is content-addressable and not
    # strategy-scoped, so confirm the match belongs to *this* strategy
    # before short-circuiting -- otherwise two strategies that happen to
    # share identical config data would alias each other's versions.
    existing = self._repository.get_by_hash(config_hash)
    if existing and existing.strategy_name == strategy_name:
        self.logger.info(f"Config already exists for '{strategy_name}': {existing.config_id}")
        return existing

    # Get next version number
    version_number = self._repository.get_next_version_number(strategy_name)
    config_id = f"v{version_number}-{config_hash[:8]}"

    version = ConfigVersion(
        strategy_name=strategy_name,
        config_id=config_id,
        config_hash=config_hash,
        config_data=config_data,
        version_number=version_number,
        description=description,
        created_by=created_by,
    )

    self._repository.put(version)
    self.logger.info(f"Created config version: {config_id}")
    return version

activate(strategy_name: str, config_id: str) -> ConfigVersion

Activate a config version (deprecates previous active).

Transitions: - Current ACTIVE → DEPRECATED (with superseded_by) - Target DRAFT → ACTIVE (sets deployed_at)

Parameters:

Name Type Description Default
strategy_name str

Strategy name

required
config_id str

Version to activate

required

Returns:

Type Description
ConfigVersion

Activated ConfigVersion

Raises:

Type Description
NotFoundError

If version doesn't exist

ValidationError

If version is not in DRAFT status

Source code in libs/tradai-common/src/tradai/common/config/service.py
def activate(self, strategy_name: str, config_id: str) -> ConfigVersion:
    """Activate a config version (deprecates previous active).

    Transitions:
    - Current ACTIVE → DEPRECATED (with superseded_by)
    - Target DRAFT → ACTIVE (sets deployed_at)

    Args:
        strategy_name: Strategy name
        config_id: Version to activate

    Returns:
        Activated ConfigVersion

    Raises:
        NotFoundError: If version doesn't exist
        ValidationError: If version is not in DRAFT status
    """
    target = self._repository.get(strategy_name, config_id)
    if not target:
        raise NotFoundError(f"Config version not found: {config_id}")

    # Idempotent: re-activating the already-active version is a no-op.
    if target.status == ConfigVersionStatus.ACTIVE:
        self.logger.info(f"Version already active: {config_id}")
        return target

    if target.status == ConfigVersionStatus.DEPRECATED:
        raise ValidationError(f"Cannot activate deprecated version: {config_id}")

    # Retire whatever is currently active, recording what replaced it.
    previous = self._repository.get_active(strategy_name)
    if previous and previous.config_id != config_id:
        retired = previous.with_status(ConfigVersionStatus.DEPRECATED)
        # Rebuild with the superseded_by back-reference set.
        retired = ConfigVersion(
            **retired.model_dump(exclude={"superseded_by"}),
            superseded_by=config_id,
        )
        self._repository.put(retired)
        self.logger.info(f"Deprecated previous active: {previous.config_id}")

    # Promote the target version.
    activated = target.with_status(ConfigVersionStatus.ACTIVE)
    self._repository.put(activated)
    self.logger.info(f"Activated config version: {config_id}")
    return activated

get_active(strategy_name: str) -> ConfigVersion | None

Get currently active config version for strategy.

Parameters:

Name Type Description Default
strategy_name str

Strategy to get active config for

required

Returns:

Type Description
ConfigVersion | None

Active ConfigVersion if exists, None otherwise

Source code in libs/tradai-common/src/tradai/common/config/service.py
def get_active(self, strategy_name: str) -> ConfigVersion | None:
    """Return the strategy's currently active config version, if any.

    Args:
        strategy_name: Strategy to get active config for

    Returns:
        Active ConfigVersion if exists, None otherwise
    """
    # Pure delegation -- the repository owns the "active" lookup.
    repo = self._repository
    return repo.get_active(strategy_name)

get_version(strategy_name: str, config_id: str) -> ConfigVersion | None

Get specific config version.

Parameters:

Name Type Description Default
strategy_name str

Strategy name

required
config_id str

Version identifier

required

Returns:

Type Description
ConfigVersion | None

ConfigVersion if found, None otherwise

Source code in libs/tradai-common/src/tradai/common/config/service.py
def get_version(self, strategy_name: str, config_id: str) -> ConfigVersion | None:
    """Look up a single config version by its identifier.

    Args:
        strategy_name: Strategy name
        config_id: Version identifier

    Returns:
        ConfigVersion if found, None otherwise
    """
    # Pure delegation to the repository's keyed lookup.
    repo = self._repository
    return repo.get(strategy_name, config_id)

list_versions(strategy_name: str, limit: int = 100, status_filter: ConfigVersionStatus | None = None) -> list[ConfigVersion]

List config versions for a strategy.

Parameters:

Name Type Description Default
strategy_name str

Strategy to list versions for

required
limit int

Maximum versions to return

100
status_filter ConfigVersionStatus | None

Optional status filter

None

Returns:

Type Description
list[ConfigVersion]

List of ConfigVersion entities (newest first)

Source code in libs/tradai-common/src/tradai/common/config/service.py
def list_versions(
    self,
    strategy_name: str,
    limit: int = 100,
    status_filter: ConfigVersionStatus | None = None,
) -> list[ConfigVersion]:
    """Return config versions for a strategy, newest first.

    Args:
        strategy_name: Strategy to list versions for
        limit: Maximum versions to return
        status_filter: Optional status filter

    Returns:
        List of ConfigVersion entities (newest first)
    """
    # Delegates paging and status filtering to the repository.
    return self._repository.list_by_strategy(
        strategy_name, limit=limit, status_filter=status_filter
    )

deprecate(strategy_name: str, config_id: str) -> ConfigVersion

Manually deprecate a config version.

Parameters:

Name Type Description Default
strategy_name str

Strategy name

required
config_id str

Version to deprecate

required

Returns:

Type Description
ConfigVersion

Deprecated ConfigVersion

Raises:

Type Description
NotFoundError

If version doesn't exist

ValidationError

If version is already deprecated

Source code in libs/tradai-common/src/tradai/common/config/service.py
def deprecate(self, strategy_name: str, config_id: str) -> ConfigVersion:
    """Manually deprecate a config version.

    Args:
        strategy_name: Strategy name
        config_id: Version to deprecate

    Returns:
        Deprecated ConfigVersion

    Raises:
        NotFoundError: If version doesn't exist
        ValidationError: If version is already deprecated
    """
    target = self._repository.get(strategy_name, config_id)
    if not target:
        raise NotFoundError(f"Config version not found: {config_id}")
    # Deprecating twice is treated as a caller error, not a no-op.
    if target.status == ConfigVersionStatus.DEPRECATED:
        raise ValidationError(f"Version already deprecated: {config_id}")

    retired = target.with_status(ConfigVersionStatus.DEPRECATED)
    self._repository.put(retired)
    self.logger.info(f"Deprecated config version: {config_id}")
    return retired

tradai.common.config.merge.ConfigMergeService

Bases: LoggerMixin

Generic config merging service using OmegaConf.

Provides deep merge capabilities for configuration dictionaries, with optional validation using required fields or Pydantic schemas.

Works with raw dicts for maximum flexibility. Domain-specific services can wrap this and convert to/from their entity types.

Parameters:

Name Type Description Default
config_repository S3ConfigRepository | None

S3ConfigRepository for loading/saving configs (DI)

None
bucket str | None

S3 bucket name (creates repository if not provided)

None
prefix str

S3 key prefix for configs

''
Example

    >>> service = ConfigMergeService(config_repository=my_repo)
    >>> merged = service.merge_configs(
    ...     base_config_name="base-config",
    ...     overrides={"version": "2.0.0"},
    ... )
    >>> merged["version"]
    '2.0.0'

Source code in libs/tradai-common/src/tradai/common/config/merge.py
class ConfigMergeService(LoggerMixin):
    """Generic config merging service using OmegaConf.

    Provides deep merge capabilities for configuration dictionaries,
    with optional validation using required fields or Pydantic schemas.

    Works with raw dicts for maximum flexibility. Domain-specific services
    can wrap this and convert to/from their entity types.

    Args:
        config_repository: S3ConfigRepository for loading/saving configs (DI)
        bucket: S3 bucket name (creates repository if not provided)
        prefix: S3 key prefix for configs

    Example:
        >>> service = ConfigMergeService(config_repository=my_repo)
        >>> merged = service.merge_configs(
        ...     base_config_name="base-config",
        ...     overrides={"version": "2.0.0"},
        ... )
        >>> merged["version"]
        '2.0.0'
    """

    def __init__(
        self,
        config_repository: S3ConfigRepository | None = None,
        bucket: str | None = None,
        prefix: str = "",
    ) -> None:
        """Initialize ConfigMergeService.

        Args:
            config_repository: Optional S3ConfigRepository for DI
            bucket: S3 bucket name (used if config_repository is None)
            prefix: S3 key prefix for configs
        """
        super().__init__()
        self._config_repository = config_repository
        self._bucket = bucket
        self._prefix = prefix
        # Guards lazy creation of _config_repository in the property below.
        self._repo_lock = threading.Lock()

    @property
    def config_repository(self) -> S3ConfigRepository:
        """Get or create the config repository (lazy initialization).

        Thread-safe via double-checked locking pattern.

        Returns:
            S3ConfigRepository instance

        Raises:
            ExternalServiceError: If no bucket provided and no repository injected
        """
        # First check is unlocked for the common already-initialized path.
        if self._config_repository is None:
            with self._repo_lock:
                # Double-check after acquiring lock
                if self._config_repository is None:
                    if self._bucket is None:
                        raise ExternalServiceError(
                            "ConfigMergeService requires either config_repository or bucket"
                        )
                    # Lazy import - only needed when using S3 repository
                    from tradai.common.aws.s3_config_repository import S3ConfigRepository

                    self._config_repository = S3ConfigRepository(
                        bucket=self._bucket,
                        prefix=self._prefix,
                    )
        return self._config_repository

    def load_config(self, config_name: str) -> dict[str, Any]:
        """Load config from repository.

        Args:
            config_name: Name of the config (without extension)

        Returns:
            Configuration as dictionary

        Raises:
            ExternalServiceError: If config not found or download fails
        """
        self.logger.debug(f"Loading config: {config_name}")
        return self.config_repository.download(config_name)

    def apply_overrides(self, base: dict[str, Any], overrides: dict[str, Any]) -> dict[str, Any]:
        """Apply overrides using OmegaConf deep merge.

        Performs a deep merge where override values take precedence.
        Nested dictionaries are merged recursively.

        Args:
            base: Base configuration dictionary
            overrides: Override values to apply

        Returns:
            New dictionary with overrides applied (original unchanged)

        Example:
            >>> base = {"a": 1, "nested": {"x": 10, "y": 20}}
            >>> overrides = {"nested": {"y": 200, "z": 30}}
            >>> result = service.apply_overrides(base, overrides)
            >>> result["nested"]
            {'x': 10, 'y': 200, 'z': 30}
        """
        # Lazy import - only needed when applying overrides
        from omegaconf import OmegaConf

        self.logger.debug("Applying config overrides")

        # Convert to OmegaConf for deep merge
        base_omega = OmegaConf.create(base)
        overrides_omega = OmegaConf.create(overrides)

        # Deep merge - overrides take precedence
        merged = OmegaConf.merge(base_omega, overrides_omega)

        # Convert back to plain dict
        # NOTE(review): to_container returns a dict only when the merged
        # root is a mapping; callers pass dicts so that holds -- confirm
        # if list-rooted configs are ever introduced.
        result: dict[str, Any] = OmegaConf.to_container(merged, resolve=True)  # type: ignore[assignment]
        return result

    def validate_config(
        self,
        config: dict[str, Any],
        required_fields: list[str] | None = None,
        schema: type[BaseModel] | None = None,
    ) -> ValidationResult:
        """Validate configuration.

        Supports two validation modes:
        1. Required fields check - simple presence validation
        2. Pydantic schema - full type and constraint validation

        Both can be used together.

        Args:
            config: Configuration to validate
            required_fields: List of field names that must be present
            schema: Optional Pydantic model class for schema validation

        Returns:
            ValidationResult with errors/warnings

        Example:
            >>> result = service.validate_config(
            ...     config={"name": "test"},
            ...     required_fields=["name", "version"],
            ... )
            >>> result.valid
            False
            >>> result.errors
            ["Missing required field: version"]
        """
        errors: list[str] = []
        warnings: list[str] = []

        # Check required fields
        if required_fields:
            for field in required_fields:
                if field not in config:
                    errors.append(f"Missing required field: {field}")
                elif config[field] is None or (
                    isinstance(config[field], str) and not config[field].strip()
                ):
                    errors.append(f"Required field cannot be empty: {field}")

        # Validate against Pydantic schema
        # NOTE(review): only pydantic's ValidationError is caught here;
        # schema(**config) assumes all keys are strings -- a TypeError from
        # non-string keys would propagate.
        if schema is not None:
            try:
                schema(**config)
            except PydanticValidationError as e:
                for error in e.errors():
                    field = ".".join(str(loc) for loc in error["loc"])
                    msg = error["msg"]
                    errors.append(f"{field}: {msg}")

        return ValidationResult(
            valid=len(errors) == 0,
            errors=errors,
            warnings=warnings,
        )

    def merge_configs(
        self,
        base_config_name: str,
        overrides: dict[str, Any],
        validate: bool = True,
        required_fields: list[str] | None = None,
        schema: type[BaseModel] | None = None,
    ) -> dict[str, Any]:
        """Load base config, apply overrides, and optionally validate.

        This is the main convenience method that combines all operations.

        Args:
            base_config_name: Name of base config to load from repository
            overrides: Override values to apply
            validate: Whether to validate the merged config
            required_fields: Fields required for validation
            schema: Optional Pydantic schema for validation

        Returns:
            Merged configuration dictionary

        Raises:
            ExternalServiceError: If config not found or validation fails
        """
        self.logger.info(f"Merging config: {base_config_name}")

        # Load base config
        base = self.load_config(base_config_name)

        # Apply overrides
        merged = self.apply_overrides(base, overrides)

        # Validate if requested (skipped when no criteria are supplied)
        if validate and (required_fields or schema):
            result = self.validate_config(
                merged,
                required_fields=required_fields,
                schema=schema,
            )
            if not result.valid:
                error_msg = "; ".join(result.errors)
                raise ExternalServiceError(f"Config validation failed: {error_msg}")

        self.logger.info("Config merged successfully")
        return merged

    def save_config(self, config_name: str, config: dict[str, Any]) -> str:
        """Save config to repository.

        Args:
            config_name: Name to save as (without extension)
            config: Configuration dictionary to save

        Returns:
            S3 URI of saved config
        """
        self.logger.info(f"Saving config: {config_name}")
        return self.config_repository.upload(config_name, config)

    def list_configs(self) -> list[str]:
        """List available config names from repository.

        Returns:
            List of config names
        """
        return self.config_repository.list_configs()

config_repository: S3ConfigRepository property

Get or create the config repository (lazy initialization).

Thread-safe via double-checked locking pattern.

Returns:

Type Description
S3ConfigRepository

S3ConfigRepository instance

Raises:

Type Description
ExternalServiceError

If no bucket provided and no repository injected

load_config(config_name: str) -> dict[str, Any]

Load config from repository.

Parameters:

Name Type Description Default
config_name str

Name of the config (without extension)

required

Returns:

Type Description
dict[str, Any]

Configuration as dictionary

Raises:

Type Description
ExternalServiceError

If config not found or download fails

Source code in libs/tradai-common/src/tradai/common/config/merge.py
def load_config(self, config_name: str) -> dict[str, Any]:
    """Fetch a named configuration from the backing repository.

    Args:
        config_name: Name of the config (without extension)

    Returns:
        Configuration as dictionary

    Raises:
        ExternalServiceError: If config not found or download fails
    """
    self.logger.debug(f"Loading config: {config_name}")
    # The property handles lazy repository creation.
    repo = self.config_repository
    return repo.download(config_name)

apply_overrides(base: dict[str, Any], overrides: dict[str, Any]) -> dict[str, Any]

Apply overrides using OmegaConf deep merge.

Performs a deep merge where override values take precedence. Nested dictionaries are merged recursively.

Parameters:

Name Type Description Default
base dict[str, Any]

Base configuration dictionary

required
overrides dict[str, Any]

Override values to apply

required

Returns:

Type Description
dict[str, Any]

New dictionary with overrides applied (original unchanged)

Example

>>> base = {"a": 1, "nested": {"x": 10, "y": 20}}
>>> overrides = {"nested": {"y": 200, "z": 30}}
>>> result = service.apply_overrides(base, overrides)
>>> result["nested"]
{'x': 10, 'y': 200, 'z': 30}

Source code in libs/tradai-common/src/tradai/common/config/merge.py
def apply_overrides(self, base: dict[str, Any], overrides: dict[str, Any]) -> dict[str, Any]:
    """Deep-merge override values into a base config and return a new dict.

    Override values win; nested dictionaries are merged recursively.
    Neither input dictionary is mutated.

    Args:
        base: Base configuration dictionary
        overrides: Override values to apply

    Returns:
        New dictionary with overrides applied (original unchanged)

    Example:
        >>> base = {"a": 1, "nested": {"x": 10, "y": 20}}
        >>> overrides = {"nested": {"y": 200, "z": 30}}
        >>> result = service.apply_overrides(base, overrides)
        >>> result["nested"]
        {'x': 10, 'y': 200, 'z': 30}
    """
    # Imported here so omegaconf is only required when merging.
    from omegaconf import OmegaConf

    self.logger.debug("Applying config overrides")

    # OmegaConf.merge gives later arguments precedence, so overrides win.
    merged = OmegaConf.merge(OmegaConf.create(base), OmegaConf.create(overrides))

    # Resolve interpolations and return a plain dict.
    result: dict[str, Any] = OmegaConf.to_container(merged, resolve=True)  # type: ignore[assignment]
    return result

validate_config(config: dict[str, Any], required_fields: list[str] | None = None, schema: type[BaseModel] | None = None) -> ValidationResult

Validate configuration.

Supports two validation modes: 1. Required fields check - simple presence validation 2. Pydantic schema - full type and constraint validation

Both can be used together.

Parameters:

Name Type Description Default
config dict[str, Any]

Configuration to validate

required
required_fields list[str] | None

List of field names that must be present

None
schema type[BaseModel] | None

Optional Pydantic model class for schema validation

None

Returns:

Type Description
ValidationResult

ValidationResult with errors/warnings

Example

>>> result = service.validate_config(
...     config={"name": "test"},
...     required_fields=["name", "version"],
... )
>>> result.valid
False
>>> result.errors
["Missing required field: version"]

Source code in libs/tradai-common/src/tradai/common/config/merge.py
def validate_config(
    self,
    config: dict[str, Any],
    required_fields: list[str] | None = None,
    schema: type[BaseModel] | None = None,
) -> ValidationResult:
    """Validate a configuration dictionary.

    Two validation modes, usable independently or together:
    1. Required fields check - simple presence/non-empty validation
    2. Pydantic schema - full type and constraint validation

    Args:
        config: Configuration to validate
        required_fields: List of field names that must be present
        schema: Optional Pydantic model class for schema validation

    Returns:
        ValidationResult with errors/warnings

    Example:
        >>> result = service.validate_config(
        ...     config={"name": "test"},
        ...     required_fields=["name", "version"],
        ... )
        >>> result.valid
        False
        >>> result.errors
        ["Missing required field: version"]
    """
    errors: list[str] = []
    warnings: list[str] = []

    # Presence / non-empty checks for each required field.
    for field in required_fields or []:
        if field not in config:
            errors.append(f"Missing required field: {field}")
            continue
        value = config[field]
        if value is None or (isinstance(value, str) and not value.strip()):
            errors.append(f"Required field cannot be empty: {field}")

    # Full schema validation via Pydantic, collecting per-field messages.
    if schema is not None:
        try:
            schema(**config)
        except PydanticValidationError as exc:
            for err in exc.errors():
                location = ".".join(str(loc) for loc in err["loc"])
                errors.append(f"{location}: {err['msg']}")

    return ValidationResult(
        valid=not errors,
        errors=errors,
        warnings=warnings,
    )

merge_configs(base_config_name: str, overrides: dict[str, Any], validate: bool = True, required_fields: list[str] | None = None, schema: type[BaseModel] | None = None) -> dict[str, Any]

Load base config, apply overrides, and optionally validate.

This is the main convenience method that combines all operations.

Parameters:

Name Type Description Default
base_config_name str

Name of base config to load from repository

required
overrides dict[str, Any]

Override values to apply

required
validate bool

Whether to validate the merged config

True
required_fields list[str] | None

Fields required for validation

None
schema type[BaseModel] | None

Optional Pydantic schema for validation

None

Returns:

Type Description
dict[str, Any]

Merged configuration dictionary

Raises:

Type Description
ExternalServiceError

If config not found or validation fails

Source code in libs/tradai-common/src/tradai/common/config/merge.py
def merge_configs(
    self,
    base_config_name: str,
    overrides: dict[str, Any],
    validate: bool = True,
    required_fields: list[str] | None = None,
    schema: type[BaseModel] | None = None,
) -> dict[str, Any]:
    """Load base config, apply overrides, and optionally validate.

    Convenience method combining load, merge, and validation into one call.

    Args:
        base_config_name: Name of base config to load from repository
        overrides: Override values to apply
        validate: Whether to validate the merged config
        required_fields: Fields required for validation
        schema: Optional Pydantic schema for validation

    Returns:
        Merged configuration dictionary

    Raises:
        ExternalServiceError: If config not found or validation fails
    """
    self.logger.info(f"Merging config: {base_config_name}")

    # Load the base config, then layer the overrides on top.
    base = self.load_config(base_config_name)
    merged = self.apply_overrides(base, overrides)

    # Validation only runs when requested AND some criteria were given.
    if validate and (required_fields or schema):
        outcome = self.validate_config(
            merged,
            required_fields=required_fields,
            schema=schema,
        )
        if not outcome.valid:
            error_msg = "; ".join(outcome.errors)
            raise ExternalServiceError(f"Config validation failed: {error_msg}")

    self.logger.info("Config merged successfully")
    return merged

save_config(config_name: str, config: dict[str, Any]) -> str

Save config to repository.

Parameters:

Name Type Description Default
config_name str

Name to save as (without extension)

required
config dict[str, Any]

Configuration dictionary to save

required

Returns:

Type Description
str

S3 URI of saved config

Source code in libs/tradai-common/src/tradai/common/config/merge.py
def save_config(self, config_name: str, config: dict[str, Any]) -> str:
    """Persist a configuration dictionary to the repository.

    Args:
        config_name: Name to save as (without extension)
        config: Configuration dictionary to save

    Returns:
        S3 URI of saved config
    """
    self.logger.info(f"Saving config: {config_name}")
    # The property handles lazy repository creation.
    repo = self.config_repository
    return repo.upload(config_name, config)

list_configs() -> list[str]

List available config names from repository.

Returns:

Type Description
list[str]

List of config names

Source code in libs/tradai-common/src/tradai/common/config/merge.py
def list_configs(self) -> list[str]:
    """Return the names of all configs available in the repository.

    Returns:
        List of config names
    """
    # Pure delegation to the repository's listing.
    return self.config_repository.list_configs()

tradai.common.config.loader.StrategyConfigLoader

Bases: LoggerMixin

High-level strategy config loader for live trading.

Wraps ConfigMergeService with strategy-specific validation and entity conversion. Thread-safe lazy initialization.

Parameters:

Name Type Description Default
config_service ConfigMergeService | None

Optional ConfigMergeService for DI

None
bucket str | None

S3 bucket name (required, pass explicitly via DI)

None
prefix str

S3 key prefix for strategy configs

'strategies/'
Example

loader = StrategyConfigLoader(bucket="tradai-configs-dev") config = loader.load("pascal-strategy") config.name 'pascal-strategy'

With runtime overrides

config = loader.load_with_overrides( ... "pascal-strategy", ... {"timeframe": "4h", "parameters": {"rsi_period": 21}} ... )

Source code in libs/tradai-common/src/tradai/common/config/loader.py
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
class StrategyConfigLoader(LoggerMixin):
    """High-level strategy config loader for live trading.

    Wraps ConfigMergeService with strategy-specific validation
    and entity conversion. Thread-safe lazy initialization.

    Args:
        config_service: Optional ConfigMergeService for DI
        bucket: S3 bucket name (required, pass explicitly via DI)
        prefix: S3 key prefix for strategy configs

    Example:
        >>> loader = StrategyConfigLoader(bucket="tradai-configs-dev")
        >>> config = loader.load("pascal-strategy")
        >>> config.name
        'pascal-strategy'

        >>> # With runtime overrides
        >>> config = loader.load_with_overrides(
        ...     "pascal-strategy",
        ...     {"timeframe": "4h", "parameters": {"rsi_period": 21}}
        ... )
    """

    REQUIRED_FIELDS = ["name", "version", "timeframe"]

    # Expected tags on MLflow model versions
    CONFIG_FILE_TAG = "strategy.configuration_file"
    TIMEFRAME_TAG = "strategy.timeframe"
    PAIRS_TAG = "strategy.pairs"
    WARMUP_DAYS_TAG = "strategy.warmup_days"
    # FreqAI tags (LM003)
    FREQAI_ENABLED_TAG = "strategy.freqai_enabled"
    FREQAI_MODEL_URI_TAG = "strategy.freqai_model_uri"

    # Tag extractor configuration: (tag_key, target_path, parser, log_name)
    # target_path uses dot notation for nested keys (e.g., "freqai.enabled")
    _TAG_EXTRACTORS: list[tuple[str, str, str, str]] = [
        (TIMEFRAME_TAG, "timeframe", "str", "timeframe"),
        (PAIRS_TAG, "pairs", "csv_list", "pairs"),
        (WARMUP_DAYS_TAG, "parameters.warmup_days", "int", "warmup_days"),
        (FREQAI_ENABLED_TAG, "freqai.enabled", "bool", "freqai.enabled"),
        (FREQAI_MODEL_URI_TAG, "freqai.model_uri", "str", "freqai.model_uri"),
    ]

    def __init__(
        self,
        config_service: ConfigMergeService | None = None,
        mlflow_adapter: MLflowAdapter | None = None,
        bucket: str | None = None,
        prefix: str = "strategies/",
    ) -> None:
        """Initialize StrategyConfigLoader.

        Args:
            config_service: Optional ConfigMergeService for dependency injection
            mlflow_adapter: Optional MLflowAdapter for loading configs from registry
            bucket: S3 bucket name (required, pass explicitly via DI)
            prefix: S3 key prefix for strategy configs
        """
        super().__init__()
        self._config_service = config_service
        self._mlflow_adapter = mlflow_adapter
        self._bucket = bucket
        self._prefix = prefix
        # Guards lazy creation of the ConfigMergeService (double-checked locking)
        self._service_lock = threading.Lock()

    @property
    def config_service(self) -> ConfigMergeService:
        """Get or create ConfigMergeService (thread-safe lazy init).

        Returns:
            ConfigMergeService instance

        Raises:
            ValidationError: If no bucket configured
        """
        if self._config_service is None:
            with self._service_lock:
                # Re-check under the lock: another thread may have initialized it
                if self._config_service is None:
                    if self._bucket is None:
                        raise ValidationError("StrategyConfigLoader requires bucket parameter")
                    self._config_service = ConfigMergeService(
                        bucket=self._bucket,
                        prefix=self._prefix,
                    )
        return self._config_service

    def _load_raw_config(self, strategy_name: str) -> dict[str, Any]:
        """Load raw config dict from S3 (DRY helper).

        Args:
            strategy_name: Strategy identifier

        Returns:
            Raw configuration dictionary

        Raises:
            NotFoundError: If strategy config doesn't exist
        """
        try:
            return self.config_service.load_config(strategy_name)
        except Exception as e:
            error_str = str(e).lower()
            # Translate S3 "missing key" style errors into a domain NotFoundError
            if "nosuchkey" in error_str or "not found" in error_str:
                safe_name = sanitize_for_display(strategy_name)
                raise NotFoundError(f"Strategy config not found: {safe_name}") from e
            raise

    def _build_validated_config(
        self,
        data: dict[str, Any],
        error_prefix: str,
    ) -> StrategyConfig:
        """Validate a raw config dict and convert it to a StrategyConfig (DRY helper).

        Args:
            data: Raw (possibly merged) configuration dictionary
            error_prefix: Context prefix for the ValidationError message

        Returns:
            Validated StrategyConfig entity

        Raises:
            ValidationError: If the config fails schema validation
        """
        result = self.validate(data)
        if not result.valid:
            raise ValidationError(f"{error_prefix}: {'; '.join(result.errors)}")
        return StrategyConfig.from_dict(data)

    def load(self, strategy_name: str) -> StrategyConfig:
        """Load and validate strategy configuration from S3.

        Args:
            strategy_name: Strategy identifier (e.g., "pascal-strategy")

        Returns:
            Validated StrategyConfig entity

        Raises:
            NotFoundError: If strategy config doesn't exist
            ValidationError: If config fails validation
        """
        self.logger.info(f"Loading strategy config: {strategy_name}")

        data = self._load_raw_config(strategy_name)

        # Validate and convert to entity
        config = self._build_validated_config(
            data, f"Invalid strategy config '{strategy_name}'"
        )
        self.logger.debug(f"Loaded strategy config: {config.name} v{config.version}")
        return config

    def load_with_overrides(
        self,
        strategy_name: str,
        overrides: dict[str, Any],
    ) -> StrategyConfig:
        """Load strategy config and apply runtime overrides.

        Useful for live trading containers that need to modify
        parameters at runtime (e.g., different timeframe, pairs).

        Args:
            strategy_name: Strategy identifier
            overrides: Override values to deep-merge

        Returns:
            Validated StrategyConfig with overrides applied

        Raises:
            NotFoundError: If strategy config doesn't exist
            ValidationError: If merged config fails validation
        """
        self.logger.info(f"Loading strategy config with overrides: {strategy_name}")

        base_data = self._load_raw_config(strategy_name)

        # Apply overrides via deep merge
        merged_data = self.config_service.apply_overrides(base_data, overrides)

        # Validate merged config and convert to entity
        config = self._build_validated_config(
            merged_data, f"Invalid merged config '{strategy_name}'"
        )
        self.logger.debug(f"Loaded strategy config with overrides: {config.name} v{config.version}")
        return config

    def validate(self, config: dict[str, Any]) -> ValidationResult:
        """Validate config against StrategyConfig schema.

        Args:
            config: Configuration dictionary to validate

        Returns:
            ValidationResult with errors/warnings
        """
        return self.config_service.validate_config(
            config,
            required_fields=self.REQUIRED_FIELDS,
            schema=StrategyConfig,
        )

    def list_strategies(self) -> list[str]:
        """List available strategy configurations.

        Returns:
            List of strategy names (without .yaml extension)
        """
        configs = self.config_service.list_configs()
        self.logger.debug(f"Found {len(configs)} strategy configs")
        return configs

    def exists(self, strategy_name: str) -> bool:
        """Check if strategy configuration exists.

        Args:
            strategy_name: Strategy identifier

        Returns:
            True if config exists, False otherwise

        Raises:
            ExternalServiceError: If S3 is unavailable or access denied
        """
        try:
            return self.config_service.config_repository.exists(strategy_name)
        except Exception as e:
            error_str = str(e).lower()
            # Only treat "not found" type errors as non-existence
            if "nosuchkey" in error_str or "not found" in error_str:
                self.logger.debug(f"Strategy config not found: {strategy_name}")
                return False
            # Log and re-raise other errors (access denied, network, etc.)
            self.logger.error(f"Failed to check if strategy exists: {e}")
            raise ExternalServiceError(
                f"Failed to check if strategy config exists: {strategy_name}"
            ) from e

    # =========================================================================
    # MLflow Integration Methods (LF002B)
    # =========================================================================

    def load_from_mlflow(
        self,
        strategy_name: str,
        stage: str = ModelStage.PRODUCTION.value,
    ) -> StrategyConfig:
        """Load strategy config via MLflow Model Registry.

        Flow:
        1. Get model version from MLflow registry (by name + stage)
        2. Extract 'strategy.configuration_file' tag for S3 path
        3. Extract metadata tags (timeframe, pairs, warmup_days)
        4. Load config from S3 using extracted path
        5. Apply extracted metadata as overrides
        6. Return validated StrategyConfig

        Args:
            strategy_name: Registered model name in MLflow (e.g., "PascalStrategy")
            stage: Model stage to load from ("Production", "Staging", "None")

        Returns:
            Validated StrategyConfig

        Raises:
            ExternalServiceError: If MLflow unavailable or API error
            NotFoundError: If model or config not found
            ValidationError: If config is invalid

        Example:
            >>> config = loader.load_from_mlflow("PascalStrategy", stage="Production")
            >>> config.name
            'PascalStrategy'
        """
        if self._mlflow_adapter is None:
            raise ExternalServiceError(
                "MLflowAdapter not configured. Pass mlflow_adapter to __init__ or "
                "use load() for direct S3 loading."
            )

        self.logger.info(f"Loading strategy config from MLflow: {strategy_name} ({stage})")

        # Get model version from registry
        model_version = self._resolve_model_version(strategy_name, stage)

        # Extract config path and metadata from tags
        config_name, overrides = self._extract_strategy_tags(model_version)

        # Load base config from S3
        base_data = self._load_raw_config(config_name)

        # Apply extracted metadata as overrides
        if overrides:
            merged_data = self.config_service.apply_overrides(base_data, overrides)
        else:
            merged_data = base_data

        # Validate merged config and convert to entity
        config = self._build_validated_config(
            merged_data, f"Invalid config from MLflow '{strategy_name}'"
        )
        self.logger.info(
            f"Loaded strategy config from MLflow: {config.name} v{config.version} ({stage})"
        )
        return config

    def load_with_fallback(
        self,
        strategy_name: str,
        stage: str = ModelStage.PRODUCTION.value,
    ) -> StrategyConfig:
        """Try loading from MLflow first, fall back to S3 on failure.

        Useful for environments where MLflow may be unavailable or when
        graceful degradation is preferred.

        Args:
            strategy_name: Strategy identifier
            stage: MLflow stage to try first ("Production", "Staging")

        Returns:
            Validated StrategyConfig

        Example:
            >>> # Tries MLflow registry first, then falls back to S3
            >>> config = loader.load_with_fallback("pascal-strategy")
        """
        if self._mlflow_adapter and self._mlflow_adapter.is_available():
            try:
                return self.load_from_mlflow(strategy_name, stage)
            except (ExternalServiceError, NotFoundError) as e:
                self.logger.warning(f"MLflow loading failed, falling back to S3: {e}")
        return self.load(strategy_name)

    def _resolve_model_version(
        self,
        strategy_name: str,
        stage: str,
    ) -> ModelVersion:
        """Get model version from MLflow registry by stage.

        Args:
            strategy_name: Registered model name
            stage: Target stage ("Production", "Staging", "Archived", "None")

        Returns:
            ModelVersion entity matching the requested stage

        Raises:
            NotFoundError: If no version found for stage
            ExternalServiceError: If MLflow API error
        """
        if self._mlflow_adapter is None:
            raise ExternalServiceError("MLflowAdapter not configured")

        try:
            model = self._mlflow_adapter.get_registered_model(strategy_name)

            if not model.latest_versions:
                safe_name = sanitize_for_display(strategy_name)
                raise NotFoundError(f"No versions found for model '{safe_name}'")

            # Normalize stage for comparison (MLflow uses capitalized stages)
            target_stage = stage.capitalize() if stage.lower() != "none" else "None"

            # Find version matching requested stage
            for version in model.latest_versions:
                version_stage = version.current_stage or "None"
                if version_stage.lower() == target_stage.lower():
                    self.logger.debug(
                        f"Found model version {version.name} v{version.version} "
                        f"in stage '{version_stage}'"
                    )
                    return version

            # No exact stage match - log warning and return latest if stage is "None"
            if target_stage.lower() == "none":
                self.logger.debug(
                    f"Returning latest version for stage 'None': v{model.latest_versions[0].version}"
                )
                return model.latest_versions[0]

            # Stage not found - this is an error
            safe_name = sanitize_for_display(strategy_name)
            available_stages = [v.current_stage or "None" for v in model.latest_versions]
            raise NotFoundError(
                f"No version found for model '{safe_name}' in stage '{target_stage}'. "
                f"Available stages: {available_stages}"
            )

        except NotFoundError:
            raise
        except ExternalServiceError:
            raise
        except Exception as e:
            safe_name = sanitize_for_display(strategy_name)
            raise ExternalServiceError(f"Failed to get model version for '{safe_name}': {e}") from e

    def _extract_strategy_tags(
        self,
        model_version: ModelVersion,
    ) -> tuple[str, dict[str, Any]]:
        """Extract config path and metadata from model version tags.

        Expected tags on the model version:
        - strategy.configuration_file: S3 key or URI for config (optional, falls back to source)
        - strategy.timeframe: Trading timeframe (e.g., "1h") (optional)
        - strategy.pairs: Comma-separated trading pairs (optional)
        - strategy.warmup_days: Days of historical data needed (optional)

        Args:
            model_version: ModelVersion entity from MLflow

        Returns:
            Tuple of (config_name, metadata_overrides)

        Raises:
            NotFoundError: If config name cannot be determined
        """
        tags = model_version.tags
        overrides: dict[str, Any] = {}

        # Extract config file name from tags or fall back to source URI
        config_name = self._extract_config_name(model_version, tags)

        # Extract optional metadata overrides using table-driven approach
        self._apply_tag_extractors(tags, overrides)

        return config_name, overrides

    def _extract_config_name(self, model_version: ModelVersion, tags: dict[str, str]) -> str:
        """Extract config name from tags or source URI."""
        config_tag_value = tags.get(self.CONFIG_FILE_TAG)
        if not config_tag_value:
            if not model_version.source:
                raise NotFoundError("Model source missing for config extraction")
            return self._extract_config_from_source(model_version.source)

        # Check if tag value is a URI that needs conversion
        uri_prefixes = ("mlflow-artifacts:/", "runs:/", "s3://", "models:/")
        if config_tag_value.startswith(uri_prefixes):
            converter = MLflowURIConverter(mlflow_adapter=self._mlflow_adapter)
            config_name = converter.extract_config_path(config_tag_value)
            self.logger.debug(
                f"Converted URI tag to config name: {config_tag_value} -> {config_name}"
            )
        else:
            # Fix: strip only a trailing extension. str.replace would also remove
            # ".yaml"/".json" substrings embedded in the middle of a config name.
            config_name = config_tag_value.removesuffix(".yaml").removesuffix(".json")
            self.logger.debug(f"Using config file from tag: {config_name}")

        return config_name

    def _apply_tag_extractors(self, tags: dict[str, str], overrides: dict[str, Any]) -> None:
        """Apply tag extractors to populate overrides dict."""
        for tag_key, target_path, parser, log_name in self._TAG_EXTRACTORS:
            if tag_key not in tags:
                continue

            value = self._parse_tag_value(tags[tag_key], parser, tag_key)
            if value is None:
                continue  # Parse failed, warning already logged

            self._set_nested_value(overrides, target_path, value)
            self.logger.debug(f"Override {log_name} from tag: {value}")

    def _parse_tag_value(self, raw_value: str, parser: str, tag_key: str) -> Any:
        """Parse tag value according to parser type."""
        if parser == "str":
            return raw_value
        if parser == "csv_list":
            return [p.strip() for p in raw_value.split(",") if p.strip()]
        if parser == "int":
            try:
                return int(raw_value)
            except ValueError:
                self.logger.warning(f"Invalid int value in tag {tag_key}: {raw_value}")
                return None
        if parser == "bool":
            return raw_value.lower() in ("true", "1", "yes")
        return raw_value  # Fallback to string

    def _set_nested_value(self, overrides: dict[str, Any], path: str, value: Any) -> None:
        """Set value in nested dict using dot notation path."""
        keys = path.split(".")
        current = overrides
        for key in keys[:-1]:
            # setdefault creates intermediate dicts in one step
            current = current.setdefault(key, {})
        current[keys[-1]] = value

    def _extract_config_from_source(self, source: str) -> str:
        """Extract config name from model source URI.

        Issue 3 Fix: Now supports multiple URI formats via MLflowURIConverter:
        - runs:/{run_id}/model -> Extract from run tags (strategy.configuration_file)
        - s3://bucket/strategies/pascal-strategy/v1.0.0 -> pascal-strategy
        - s3://bucket/models/PascalStrategy -> PascalStrategy
        - models:/{model_name}/{version} -> model_name

        Args:
            source: Model source URI

        Returns:
            Extracted config name

        Raises:
            NotFoundError: If config name cannot be extracted
        """
        converter = MLflowURIConverter(mlflow_adapter=self._mlflow_adapter)
        return converter.extract_config_path(source)

config_service: ConfigMergeService property

Get or create ConfigMergeService (thread-safe lazy init).

Returns:

Type Description
ConfigMergeService

ConfigMergeService instance

Raises:

Type Description
ValidationError

If no bucket configured

load(strategy_name: str) -> StrategyConfig

Load and validate strategy configuration from S3.

Parameters:

Name Type Description Default
strategy_name str

Strategy identifier (e.g., "pascal-strategy")

required

Returns:

Type Description
StrategyConfig

Validated StrategyConfig entity

Raises:

Type Description
NotFoundError

If strategy config doesn't exist

ValidationError

If config fails validation

Source code in libs/tradai-common/src/tradai/common/config/loader.py
def load(self, strategy_name: str) -> StrategyConfig:
    """Load and validate strategy configuration from S3.

    Args:
        strategy_name: Strategy identifier (e.g., "pascal-strategy")

    Returns:
        Validated StrategyConfig entity

    Raises:
        NotFoundError: If strategy config doesn't exist
        ValidationError: If config fails validation
    """
    self.logger.info(f"Loading strategy config: {strategy_name}")

    data = self._load_raw_config(strategy_name)

    # Validate and convert to entity
    result = self.validate(data)
    if not result.valid:
        raise ValidationError(
            f"Invalid strategy config '{strategy_name}': {'; '.join(result.errors)}"
        )

    config = StrategyConfig.from_dict(data)
    self.logger.debug(f"Loaded strategy config: {config.name} v{config.version}")
    return config

load_with_overrides(strategy_name: str, overrides: dict[str, Any]) -> StrategyConfig

Load strategy config and apply runtime overrides.

Useful for live trading containers that need to modify parameters at runtime (e.g., different timeframe, pairs).

Parameters:

Name Type Description Default
strategy_name str

Strategy identifier

required
overrides dict[str, Any]

Override values to deep-merge

required

Returns:

Type Description
StrategyConfig

Validated StrategyConfig with overrides applied

Raises:

Type Description
NotFoundError

If strategy config doesn't exist

ValidationError

If merged config fails validation

Source code in libs/tradai-common/src/tradai/common/config/loader.py
def load_with_overrides(
    self,
    strategy_name: str,
    overrides: dict[str, Any],
) -> StrategyConfig:
    """Load strategy config and apply runtime overrides.

    Useful for live trading containers that need to modify
    parameters at runtime (e.g., different timeframe, pairs).

    Args:
        strategy_name: Strategy identifier
        overrides: Override values to deep-merge

    Returns:
        Validated StrategyConfig with overrides applied

    Raises:
        NotFoundError: If strategy config doesn't exist
        ValidationError: If merged config fails validation
    """
    self.logger.info(f"Loading strategy config with overrides: {strategy_name}")

    base_data = self._load_raw_config(strategy_name)

    # Apply overrides via deep merge
    merged_data = self.config_service.apply_overrides(base_data, overrides)

    # Validate merged config
    result = self.validate(merged_data)
    if not result.valid:
        raise ValidationError(
            f"Invalid merged config '{strategy_name}': {'; '.join(result.errors)}"
        )

    config = StrategyConfig.from_dict(merged_data)
    self.logger.debug(f"Loaded strategy config with overrides: {config.name} v{config.version}")
    return config

validate(config: dict[str, Any]) -> ValidationResult

Validate config against StrategyConfig schema.

Parameters:

Name Type Description Default
config dict[str, Any]

Configuration dictionary to validate

required

Returns:

Type Description
ValidationResult

ValidationResult with errors/warnings

Source code in libs/tradai-common/src/tradai/common/config/loader.py
def validate(self, config: dict[str, Any]) -> ValidationResult:
    """Validate config against StrategyConfig schema.

    Args:
        config: Configuration dictionary to validate

    Returns:
        ValidationResult with errors/warnings
    """
    return self.config_service.validate_config(
        config,
        required_fields=self.REQUIRED_FIELDS,
        schema=StrategyConfig,
    )

list_strategies() -> list[str]

List available strategy configurations.

Returns:

Type Description
list[str]

List of strategy names (without .yaml extension)

Source code in libs/tradai-common/src/tradai/common/config/loader.py
def list_strategies(self) -> list[str]:
    """List available strategy configurations.

    Returns:
        List of strategy names (without .yaml extension)
    """
    configs = self.config_service.list_configs()
    self.logger.debug(f"Found {len(configs)} strategy configs")
    return configs

exists(strategy_name: str) -> bool

Check if strategy configuration exists.

Parameters:

Name Type Description Default
strategy_name str

Strategy identifier

required

Returns:

Type Description
bool

True if config exists, False otherwise

Raises:

Type Description
ExternalServiceError

If S3 is unavailable or access denied

Source code in libs/tradai-common/src/tradai/common/config/loader.py
def exists(self, strategy_name: str) -> bool:
    """Check if strategy configuration exists.

    Args:
        strategy_name: Strategy identifier

    Returns:
        True if config exists, False otherwise

    Raises:
        ExternalServiceError: If S3 is unavailable or access denied
    """
    try:
        return self.config_service.config_repository.exists(strategy_name)
    except Exception as e:
        error_str = str(e).lower()
        # Only treat "not found" type errors as non-existence
        if "nosuchkey" in error_str or "not found" in error_str:
            self.logger.debug(f"Strategy config not found: {strategy_name}")
            return False
        # Log and re-raise other errors (access denied, network, etc.)
        self.logger.error(f"Failed to check if strategy exists: {e}")
        raise ExternalServiceError(
            f"Failed to check if strategy config exists: {strategy_name}"
        ) from e

load_from_mlflow(strategy_name: str, stage: str = ModelStage.PRODUCTION.value) -> StrategyConfig

Load strategy config via MLflow Model Registry.

Flow:

1. Get model version from MLflow registry (by name + stage)
2. Extract 'strategy.configuration_file' tag for S3 path
3. Extract metadata tags (timeframe, pairs, warmup_days)
4. Load config from S3 using extracted path
5. Apply extracted metadata as overrides
6. Return validated StrategyConfig

Parameters:

Name Type Description Default
strategy_name str

Registered model name in MLflow (e.g., "PascalStrategy")

required
stage str

Model stage to load from ("Production", "Staging", "None")

PRODUCTION.value

Returns:

Type Description
StrategyConfig

Validated StrategyConfig

Raises:

Type Description
ExternalServiceError

If MLflow unavailable or API error

NotFoundError

If model or config not found

ValidationError

If config is invalid

Example

>>> config = loader.load_from_mlflow("PascalStrategy", stage="Production")
>>> config.name
'PascalStrategy'

Source code in libs/tradai-common/src/tradai/common/config/loader.py
def load_from_mlflow(
    self,
    strategy_name: str,
    stage: str = ModelStage.PRODUCTION.value,
) -> StrategyConfig:
    """Load strategy config via MLflow Model Registry.

    Flow:
    1. Get model version from MLflow registry (by name + stage)
    2. Extract 'strategy.configuration_file' tag for S3 path
    3. Extract metadata tags (timeframe, pairs, warmup_days)
    4. Load config from S3 using extracted path
    5. Apply extracted metadata as overrides
    6. Return validated StrategyConfig

    Args:
        strategy_name: Registered model name in MLflow (e.g., "PascalStrategy")
        stage: Model stage to load from ("Production", "Staging", "None")

    Returns:
        Validated StrategyConfig

    Raises:
        ExternalServiceError: If MLflow unavailable or API error
        NotFoundError: If model or config not found
        ValidationError: If config is invalid

    Example:
        >>> config = loader.load_from_mlflow("PascalStrategy", stage="Production")
        >>> config.name
        'PascalStrategy'
    """
    if self._mlflow_adapter is None:
        raise ExternalServiceError(
            "MLflowAdapter not configured. Pass mlflow_adapter to __init__ or "
            "use load() for direct S3 loading."
        )

    self.logger.info(f"Loading strategy config from MLflow: {strategy_name} ({stage})")

    # Get model version from registry
    model_version = self._resolve_model_version(strategy_name, stage)

    # Extract config path and metadata from tags
    config_name, overrides = self._extract_strategy_tags(model_version)

    # Load base config from S3
    base_data = self._load_raw_config(config_name)

    # Apply extracted metadata as overrides
    if overrides:
        merged_data = self.config_service.apply_overrides(base_data, overrides)
    else:
        merged_data = base_data

    # Validate merged config
    result = self.validate(merged_data)
    if not result.valid:
        raise ValidationError(
            f"Invalid config from MLflow '{strategy_name}': {'; '.join(result.errors)}"
        )

    config = StrategyConfig.from_dict(merged_data)
    self.logger.info(
        f"Loaded strategy config from MLflow: {config.name} v{config.version} ({stage})"
    )
    return config

load_with_fallback(strategy_name: str, stage: str = ModelStage.PRODUCTION.value) -> StrategyConfig

Try loading from MLflow first, fall back to S3 on failure.

Useful for environments where MLflow may be unavailable or when graceful degradation is preferred.

Parameters:

Name Type Description Default
strategy_name str

Strategy identifier

required
stage str

MLflow stage to try first ("Production", "Staging")

PRODUCTION.value

Returns:

Type Description
StrategyConfig

Validated StrategyConfig

Example

Tries MLflow registry first, then falls back to S3

config = loader.load_with_fallback("pascal-strategy")

Source code in libs/tradai-common/src/tradai/common/config/loader.py
def load_with_fallback(
    self,
    strategy_name: str,
    stage: str = ModelStage.PRODUCTION.value,
) -> StrategyConfig:
    """Try loading from MLflow first, fall back to S3 on failure.

    Useful for environments where MLflow may be unavailable or when
    graceful degradation is preferred.

    Args:
        strategy_name: Strategy identifier
        stage: MLflow stage to try first ("Production", "Staging")

    Returns:
        Validated StrategyConfig

    Example:
        >>> # Tries MLflow registry first, then falls back to S3
        >>> config = loader.load_with_fallback("pascal-strategy")
    """
    if self._mlflow_adapter and self._mlflow_adapter.is_available():
        try:
            return self.load_from_mlflow(strategy_name, stage)
        except (ExternalServiceError, NotFoundError) as e:
            self.logger.warning(f"MLflow loading failed, falling back to S3: {e}")
    return self.load(strategy_name)