API Reference#
Core Classes#
AdaptiveExecutor#
- class adaptive_executor.AdaptiveExecutor(max_workers, policy, check_interval=60)[source]#
Bases:
object- submit(fn: Callable, *args, **kwargs) None[source]#
Submit a task to be executed by the worker pool.
- Parameters:
fn – The function to execute
*args – Positional arguments to pass to the function
**kwargs – Keyword arguments to pass to the function
MultiCriterionPolicy#
- class adaptive_executor.MultiCriterionPolicy(criteria: List[object], hard_cap: int)[source]#
Bases:
objectA policy that combines multiple scaling criteria.
This policy takes the minimum worker count from all criteria and ensures it doesn’t exceed the hard cap.
Scaling Criteria#
ScalingCriterion#
- class adaptive_executor.ScalingCriterion[source]#
Bases:
objectBase class for all scaling criteria.
Subclasses must implement the max_workers() method to define their scaling logic.
- max_workers() int[source]#
Calculate the maximum number of workers based on this criterion.
- Returns:
The maximum number of workers (must be at least 1)
- Return type:
int
- Raises:
NotImplementedError – If the subclass doesn’t implement this method
- to_dict() Dict[str, Any][source]#
Serialize criterion to a dictionary.
- Returns:
A dictionary representation of the criterion
- Return type:
Dict[str, Any]
- Raises:
NotImplementedError – If the subclass doesn’t implement this method
- classmethod from_dict(data: Dict[str, Any]) C[source]#
Deserialize criterion from a dictionary.
- Parameters:
data – Dictionary containing serialized criterion data
- Returns:
An instance of the criterion
- Raises:
NotImplementedError – If the subclass doesn’t implement this method
ValueError – If the data is invalid
- to_json() str[source]#
Serialize criterion to a JSON string.
- Returns:
JSON string representation of the criterion
- Return type:
str
- Raises:
json.JSONEncodeError – If the criterion cannot be serialized to JSON
- classmethod from_json(json_str: str) C[source]#
Deserialize criterion from a JSON string.
- Parameters:
json_str – JSON string containing serialized criterion data
- Returns:
An instance of the criterion
- Raises:
json.JSONDecodeError – If the JSON string is invalid
ValueError – If the data is invalid
NotImplementedError – If the criterion type is not supported
TimeCriterion#
- class adaptive_executor.TimeCriterion(worker_count: int, active_start: time, active_end: time, timezone: str = 'UTC')[source]#
Bases:
ScalingCriterionA criterion that scales workers based on time of day.
This criterion returns the configured worker count if the current time of day falls within the specified time range (inclusive start, exclusive end), otherwise returns 1 (minimum workers).
- __init__(worker_count: int, active_start: time, active_end: time, timezone: str = 'UTC')[source]#
Initialize the time-based criterion.
- Parameters:
worker_count – Number of workers to use during active hours
active_start – Start time of the active period (time of day)
active_end – End time of the active period (time of day)
timezone – Timezone for the active period (default: “UTC”)
- Raises:
ImportError – If pytz is not installed
ValueError – If worker_count is less than 1 or time values are invalid
TypeError – If active_start or active_end are not time objects
- max_workers() int[source]#
Get the maximum number of workers based on the current time.
- Returns:
self.worker_count if current time is within active hours, else 1
- Return type:
int
- to_dict() Dict[str, Any][source]#
Serialize criterion to a dictionary.
- Returns:
A dictionary representation of the criterion
- Return type:
Dict[str, Any]
- Raises:
NotImplementedError – If the subclass doesn’t implement this method
- classmethod from_dict(data: Dict[str, Any]) TimeCriterion[source]#
Deserialize criterion from a dictionary.
- Parameters:
data – Dictionary containing serialized criterion data
- Returns:
An instance of the criterion
- Raises:
NotImplementedError – If the subclass doesn’t implement this method
ValueError – If the data is invalid
CpuCriterion#
- class adaptive_executor.CpuCriterion(threshold: float, workers: int)[source]#
Bases:
ScalingCriterionA criterion that scales workers based on CPU usage.
This criterion returns the configured worker count if the current CPU usage is above the threshold, otherwise returns 1 (minimum workers).
- __init__(threshold: float, workers: int)[source]#
Initialize the CPU-based criterion.
- Parameters:
threshold – CPU usage threshold (0-100) above which to scale up
workers – Number of workers to use when CPU usage is above threshold
- Raises:
ImportError – If psutil is not installed
ValueError – If threshold is not between 0 and 100 or workers < 1
- max_workers() int[source]#
Get the maximum number of workers based on current CPU usage.
- Returns:
self.workers if CPU usage >= threshold, else 1
- Return type:
int
- to_dict() Dict[str, Any][source]#
Serialize the criterion to a dictionary.
- Returns:
Dictionary containing the criterion’s state
- Return type:
Dict[str, Any]
- classmethod from_dict(data: Dict[str, Any]) CpuCriterion[source]#
Create a CpuCriterion from a dictionary.
- Parameters:
data – Dictionary containing ‘threshold’ and ‘workers’ keys
- Returns:
A new instance of CpuCriterion
- Return type:
- Raises:
KeyError – If required keys are missing
ValueError – If values are invalid
MemoryCriterion#
- class adaptive_executor.MemoryCriterion(threshold: float, workers: int)[source]#
Bases:
ScalingCriterionA criterion that scales workers based on memory usage.
- __init__(threshold: float, workers: int)[source]#
Initialize the memory-based criterion.
- Parameters:
threshold – Memory usage threshold (0-100) above which to scale up
workers – Number of workers to use when memory usage is above threshold
- Raises:
ImportError – If psutil is not installed
ValueError – If threshold is not between 0 and 100 or workers < 1
- max_workers() int[source]#
Get the maximum number of workers based on current memory usage.
- Returns:
self.workers if memory usage >= threshold, else 1
- Return type:
int
- to_dict() Dict[str, Any][source]#
Serialize the criterion to a dictionary.
- Returns:
Dictionary containing the criterion’s state
- Return type:
Dict[str, Any]
- classmethod from_dict(data: Dict[str, Any]) MemoryCriterion[source]#
Create a MemoryCriterion from a dictionary.
- Parameters:
data – Dictionary containing ‘threshold’ and ‘workers’ keys
- Returns:
A new instance of MemoryCriterion
- Return type:
- Raises:
KeyError – If required keys are missing
ValueError – If values are invalid