Source code for adaptive_executor.criteria.base

"""Base class for all scaling criteria."""

import datetime
import json
from typing import Any, Dict, Type, TypeVar

from ..utils import get_logger

# Module-level logger, created by passing this module's name to the package's
# get_logger helper. Used below to report serialization failures.
logger = get_logger(__name__)

# Type variable bound to ScalingCriterion. The from_dict() and from_json()
# classmethods annotate ``cls`` as Type[C] and return C, so a type checker
# sees the concrete subclass the classmethod was called on rather than the
# base class.
C = TypeVar("C", bound="ScalingCriterion")


class ScalingCriterion:
    """Base class for all scaling criteria.

    Subclasses must implement :meth:`max_workers` to define their scaling
    logic, and :meth:`to_dict` / :meth:`from_dict` to take part in the
    dictionary and JSON (de)serialization offered by :meth:`to_json` and
    :meth:`from_json`.
    """

    def max_workers(self) -> int:
        """Calculate the maximum number of workers based on this criterion.

        Returns:
            int: The maximum number of workers (must be at least 1)

        Raises:
            NotImplementedError: If the subclass doesn't implement this method
        """
        raise NotImplementedError("Subclasses must implement max_workers()")

    def to_dict(self) -> Dict[str, Any]:
        """Serialize criterion to a dictionary.

        Returns:
            Dict[str, Any]: A dictionary representation of the criterion

        Raises:
            NotImplementedError: If the subclass doesn't implement this method
        """
        raise NotImplementedError("Subclasses must implement to_dict()")

    @classmethod
    def from_dict(cls: Type[C], data: Dict[str, Any]) -> C:
        """Deserialize criterion from a dictionary.

        Args:
            data: Dictionary containing serialized criterion data

        Returns:
            An instance of the criterion

        Raises:
            NotImplementedError: If the subclass doesn't implement this method
            ValueError: If the data is invalid
        """
        raise NotImplementedError("Subclasses must implement from_dict()")

    def to_json(self) -> str:
        """Serialize criterion to a JSON string.

        The string is ``json.dumps`` applied to the result of :meth:`to_dict`.
        Any failure is logged and then re-raised unchanged.

        Returns:
            str: JSON string representation of the criterion

        Raises:
            TypeError: If ``to_dict()`` returns a value that ``json.dumps``
                cannot encode
            ValueError: If ``to_dict()`` returns a structure with circular
                references
            NotImplementedError: If the subclass doesn't implement
                ``to_dict()``
        """
        try:
            return json.dumps(self.to_dict())
        except Exception as e:
            # Log with traceback for diagnosis, but let the caller decide
            # how to handle the original exception.
            logger.error(
                "Failed to serialize criterion to JSON: %s", str(e), exc_info=True
            )
            raise

    @classmethod
    def from_json(cls: Type[C], json_str: str) -> C:
        """Deserialize criterion from a JSON string.

        The string is decoded with ``json.loads`` and the resulting object is
        handed to :meth:`from_dict`. Any failure is logged and then re-raised.

        Args:
            json_str: JSON string containing serialized criterion data

        Returns:
            An instance of the criterion

        Raises:
            json.JSONDecodeError: If the JSON string is invalid
            ValueError: If the decoded JSON is not an object (dict), or if
                ``from_dict()`` rejects the data
            NotImplementedError: If the criterion type is not supported
        """
        try:
            data = json.loads(json_str)
            # from_dict() is specified to take a dict; reject lists, numbers,
            # strings and null here so callers get the documented ValueError
            # instead of an incidental TypeError/AttributeError from a
            # subclass.
            if not isinstance(data, dict):
                raise ValueError(
                    "Serialized criterion must be a JSON object, got "
                    f"{type(data).__name__}"
                )
            return cls.from_dict(data)
        except json.JSONDecodeError as e:
            # JSONDecodeError subclasses ValueError, so it must be handled
            # before the generic branch to keep its dedicated log message.
            logger.error("Invalid JSON string: %s", str(e))
            raise
        except Exception as e:
            logger.error(
                "Failed to deserialize criterion from JSON: %s", str(e), exc_info=True
            )
            raise

    @staticmethod
    def _parse_datetime(dt_val: str) -> datetime.datetime:
        """Parse an ISO 8601 formatted string into a datetime.

        Args:
            dt_val: ISO 8601 string as produced by ``datetime.isoformat()``.
                A trailing ``"Z"`` is accepted as a UTC designator.

        Returns:
            The parsed datetime; it is timezone-aware only if the string
            carries a UTC offset (or ``"Z"``).

        Raises:
            ValueError: If the string is not a valid ISO 8601 datetime
            TypeError: If ``dt_val`` is not a string
        """
        try:
            return datetime.datetime.fromisoformat(dt_val)
        except ValueError:
            # fromisoformat() only understands the "Z" suffix from Python
            # 3.11 on; retry with the equivalent explicit offset so older
            # interpreters accept it too. Anything else is a real error.
            if dt_val.endswith("Z"):
                return datetime.datetime.fromisoformat(dt_val[:-1] + "+00:00")
            raise

    @staticmethod
    def _format_with_tz(dt: datetime.datetime) -> str:
        """Format a datetime as an ISO 8601 string.

        Args:
            dt: The datetime to format

        Returns:
            ``dt.isoformat()``; the UTC offset is part of the output when
            ``dt`` is timezone-aware and absent when it is naive.
        """
        return dt.isoformat()