Source code for adaptive_executor.criteria.cpu
"""CPU-based scaling criterion."""
import importlib.util
from typing import Any, Dict, Type
from ..utils import get_logger
from .base import ScalingCriterion
logger = get_logger(__name__)
[docs]
class CpuCriterion(ScalingCriterion):
"""A criterion that scales workers based on CPU usage.
This criterion returns the configured worker count if the current CPU usage
is above the threshold, otherwise returns 1 (minimum workers).
"""
[docs]
def __init__(self, threshold: float, workers: int):
"""Initialize the CPU-based criterion.
Args:
threshold: CPU usage threshold (0-100) above which to scale up
workers: Number of workers to use when CPU usage is above threshold
Raises:
ImportError: If psutil is not installed
ValueError: If threshold is not between 0 and 100 or workers < 1
"""
if not importlib.util.find_spec("psutil"):
error_msg = (
"CpuCriterion requires 'psutil' package. "
"Install with: pip install adaptive-executor[cpu]"
)
logger.error(error_msg)
raise ImportError(error_msg)
if not (0 <= threshold <= 100):
error_msg = f"threshold must be between 0 and 100, got {threshold}"
logger.error(error_msg)
raise ValueError(error_msg)
if workers < 1:
error_msg = f"workers must be at least 1, got {workers}"
logger.error(error_msg)
raise ValueError(error_msg)
self.threshold = threshold
self.workers = workers
logger.debug(
"Initialized CpuCriterion: threshold=%.1f%%, workers=%d", threshold, workers
)
[docs]
def max_workers(self) -> int:
"""Get the maximum number of workers based on current CPU usage.
Returns:
int: self.workers if CPU usage >= threshold, else 1
"""
try:
import psutil
# Get CPU usage with a short interval for more accurate reading
cpu_percent = psutil.cpu_percent(interval=0.1)
is_above_threshold = cpu_percent >= self.threshold
if is_above_threshold:
logger.debug(
"CpuCriterion: CPU usage %.1f%% >= %.1f%% -> %d workers",
cpu_percent,
self.threshold,
self.workers,
)
return self.workers
else:
logger.debug(
"CpuCriterion: CPU usage %.1f%% < %.1f%% -> 1 worker",
cpu_percent,
self.threshold,
)
return 1
except Exception as e:
logger.error("Error in CpuCriterion.max_workers: %s", str(e), exc_info=True)
return 1 # Fallback to minimum workers on error
[docs]
def to_dict(self) -> Dict[str, Any]:
"""Serialize the criterion to a dictionary.
Returns:
Dict[str, Any]: Dictionary containing the criterion's state
"""
try:
return {
"type": "CpuCriterion",
"threshold": self.threshold,
"workers": self.workers,
}
except Exception as e:
logger.error(
"Error serializing CpuCriterion to dict: %s", str(e), exc_info=True
)
raise
[docs]
@classmethod
def from_dict(cls: Type["CpuCriterion"], data: Dict[str, Any]) -> "CpuCriterion":
"""Create a CpuCriterion from a dictionary.
Args:
data: Dictionary containing 'threshold' and 'workers' keys
Returns:
CpuCriterion: A new instance of CpuCriterion
Raises:
KeyError: If required keys are missing
ValueError: If values are invalid
"""
try:
return cls(threshold=data["threshold"], workers=data["workers"])
except KeyError as e:
logger.error("Missing required key in CpuCriterion data: %s", str(e))
raise
except Exception as e:
logger.error(
"Error creating CpuCriterion from dict: %s", str(e), exc_info=True
)
raise