Source code for adaptive_executor.policies
"""Policies for determining the target number of workers."""
from typing import List
from .utils import get_logger
logger = get_logger(__name__)
[docs]
class MultiCriterionPolicy:
"""A policy that combines multiple scaling criteria.
This policy takes the minimum worker count from all criteria and ensures
it doesn't exceed the hard cap.
"""
[docs]
def __init__(self, criteria: List[object], hard_cap: int):
"""Initialize the policy with scaling criteria and a hard cap.
Args:
criteria: List of scaling criteria objects that implement max_workers()
hard_cap: Maximum number of workers allowed
"""
if not criteria:
raise ValueError("At least one criterion is required")
if hard_cap < 1:
raise ValueError("Hard cap must be at least 1")
self.criteria = criteria
self.hard_cap = hard_cap
logger.debug(
"Initialized MultiCriterionPolicy with %d criteria and hard cap of %d",
len(criteria),
hard_cap,
)
[docs]
def target_workers(self) -> int:
"""Calculate the target number of workers based on all criteria.
Returns:
int: The target number of workers, between 1 and hard_cap (inclusive)
"""
try:
limits = []
for criterion in self.criteria:
try:
limit = criterion.max_workers()
limits.append(limit)
logger.debug(
"Criterion %s suggested %d workers",
criterion.__class__.__name__,
limit,
)
except Exception as e:
logger.error(
"Error getting worker limit from %s: %s",
criterion.__class__.__name__,
str(e),
exc_info=True,
)
# Use a safe default if a criterion fails
limits.append(1)
if not limits:
logger.warning("No valid criteria results, using 1 worker")
return 1
min_limit = min(limits)
result = max(1, min(min_limit, self.hard_cap))
logger.debug(
"Calculated target workers: min_limit=%d, hard_cap=%d, result=%d",
min_limit,
self.hard_cap,
result,
)
return result
except Exception as e:
logger.error("Error calculating target workers: %s", str(e), exc_info=True)
return 1 # Fallback to minimum workers on error