Source code for adaptive_executor.criteria.time
"""Time-based scaling criterion."""
import datetime
import importlib.util
from typing import Any, Dict
import pytz
from ..utils import get_logger
from .base import ScalingCriterion
logger = get_logger(__name__)
[docs]
class TimeCriterion(ScalingCriterion):
"""A criterion that scales workers based on time of day.
This criterion returns the configured worker count if the current time of day
falls within the specified time range (inclusive start, exclusive end), otherwise
returns 1 (minimum workers).
"""
[docs]
def __init__(
self,
worker_count: int,
active_start: datetime.time,
active_end: datetime.time,
timezone: str = "UTC",
):
"""Initialize the time-based criterion.
Args:
worker_count: Number of workers to use during active hours
active_start: Start time of the active period (time of day)
active_end: End time of the active period (time of day)
timezone: Timezone for the active period (default: "UTC")
Raises:
ImportError: If pytz is not installed
ValueError: If worker_count is less than 1 or time values are invalid
TypeError: If active_start or active_end are not time objects
"""
if not importlib.util.find_spec("pytz"):
error_msg = (
"TimeCriterion requires 'pytz' package. "
"Install with: pip install adaptive-executor[time]"
)
logger.error(error_msg)
raise ImportError(error_msg)
# Validate types
if not isinstance(active_start, datetime.time):
raise TypeError("active_start must be a datetime.time instance")
if not isinstance(active_end, datetime.time):
raise TypeError("active_end must be a datetime.time instance")
# Validate worker count
if worker_count < 1:
raise ValueError("worker_count must be at least 1")
# Set up timezone
try:
self.tz = pytz.timezone(timezone)
except pytz.exceptions.UnknownTimeZoneError as e:
error_msg = f"Invalid timezone: {timezone}"
logger.error(error_msg)
raise ValueError(error_msg) from e
# Store the time objects and other attributes
self.active_start = active_start
self.active_end = active_end
self.worker_count = worker_count
# Log the configured time window
logger.debug(
"Initialized TimeCriterion: worker_count=%d, active_window=%s to %s %s",
worker_count,
active_start.strftime("%H:%M:%S"),
active_end.strftime("%H:%M:%S"),
timezone,
)
[docs]
def max_workers(self) -> int:
"""Get the maximum number of workers based on the current time.
Returns:
int: self.worker_count if current time is within active hours, else 1
"""
try:
now = datetime.datetime.now(self.tz)
current_time = now.time()
# Handle time ranges that cross midnight
if self.active_start <= self.active_end:
# Normal range: start <= end (e.g., 9:00 to 17:00)
is_active = self.active_start <= current_time <= self.active_end
else:
# Cross-midnight range: start > end (e.g., 22:00 to 06:00)
is_active = (
current_time >= self.active_start or current_time < self.active_end
)
if is_active:
logger.debug(
"TimeCriterion: Active time %s-%s, current time %s -> %d workers",
self.active_start.strftime("%H:%M"),
self.active_end.strftime("%H:%M"),
current_time.strftime("%H:%M"),
self.worker_count,
)
return self.worker_count
else:
logger.debug(
"TimeCriterion: Outside active time %s-%s, "
"current time %s -> 1 worker",
self.active_start.strftime("%H:%M"),
self.active_end.strftime("%H:%M"),
current_time.strftime("%H:%M"),
)
return 1 # Minimum workers outside time range
except Exception as e:
logger.error(
"Error in TimeCriterion.max_workers: %s", str(e), exc_info=True
)
return 1 # Fallback to minimum workers on error
[docs]
def to_dict(self) -> Dict[str, Any]:
return {
"type": "TimeCriterion",
"worker_count": self.worker_count,
"active_start": self.active_start.isoformat(),
"active_end": self.active_end.isoformat(),
"timezone": self.tz.zone,
}
[docs]
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "TimeCriterion":
# Parse time objects from ISO format
active_start = datetime.time.fromisoformat(data["active_start"])
active_end = datetime.time.fromisoformat(data["active_end"])
return cls(
worker_count=data["worker_count"],
active_start=active_start,
active_end=active_end,
timezone=data["timezone"],
)