API Reference#

Core Classes#

AdaptiveExecutor#

class adaptive_executor.AdaptiveExecutor(max_workers, policy, check_interval=60)[source]#

Bases: object

__init__(max_workers, policy, check_interval=60)[source]#

submit(fn: Callable, *args, **kwargs) None[source]#

Submit a task to be executed by the worker pool.

Parameters:
  • fn – The function to execute

  • *args – Positional arguments to pass to the function

  • **kwargs – Keyword arguments to pass to the function

join(timeout: float = None) bool[source]#

Wait until all tasks in the queue are processed.

Parameters:

timeout – Maximum time to wait in seconds

Returns:

True if all tasks completed, False if timed out

Return type:

bool

shutdown() None[source]#

Shut down the executor and all worker threads.
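The submit/join/shutdown contract described above can be illustrated with a small stand-in. This is a sketch, not the library's implementation: MiniExecutor and its internals are hypothetical, and the policy and check_interval arguments (and any resizing of the pool) are left out. It shows two points from the signatures: submit() returns None, so results are collected through side effects, and join() reports completion as a bool.

```python
import queue
import threading


class MiniExecutor:
    """Hypothetical stand-in that mirrors the documented method names only."""

    def __init__(self, max_workers):
        self._tasks = queue.Queue()
        self._pending = 0                      # submitted but not yet finished
        self._cond = threading.Condition()
        self._threads = [
            threading.Thread(target=self._work, daemon=True)
            for _ in range(max_workers)
        ]
        for thread in self._threads:
            thread.start()

    def _work(self):
        while True:
            item = self._tasks.get()
            if item is None:                   # sentinel: stop this worker
                return
            fn, args, kwargs = item
            try:
                fn(*args, **kwargs)
            except Exception:
                pass                           # assumed: a failing task does not kill the worker
            finally:
                with self._cond:
                    self._pending -= 1
                    self._cond.notify_all()

    def submit(self, fn, *args, **kwargs):
        with self._cond:
            self._pending += 1
        self._tasks.put((fn, args, kwargs))

    def join(self, timeout=None):
        # True once every submitted task has finished, False on timeout.
        with self._cond:
            return self._cond.wait_for(lambda: self._pending == 0, timeout)

    def shutdown(self):
        for _ in self._threads:
            self._tasks.put(None)
        for thread in self._threads:
            thread.join()


results = []
executor = MiniExecutor(max_workers=2)
for n in range(5):
    executor.submit(results.append, n * n)     # no return value to inspect
finished = executor.join(timeout=5.0)
executor.shutdown()
```

Calling join() before shutdown() lets the caller decide what to do when the timeout expires with work still outstanding.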

MultiCriterionPolicy#

class adaptive_executor.MultiCriterionPolicy(criteria: List[object], hard_cap: int)[source]#

Bases: object

A policy that combines multiple scaling criteria.

This policy takes the minimum worker count across all criteria and clamps the result to the range 1 to hard_cap (inclusive).

__init__(criteria: List[object], hard_cap: int)[source]#

Initialize the policy with scaling criteria and a hard cap.

Parameters:
  • criteria – List of scaling criterion objects, each implementing max_workers()

  • hard_cap – Maximum number of workers allowed

target_workers() int[source]#

Calculate the target number of workers based on all criteria.

Returns:

The target number of workers, between 1 and hard_cap (inclusive)

Return type:

int
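
The combination rule described for target_workers() can be sketched as a standalone function. This is a minimal illustration under assumptions, not the library's code: FixedCriterion is a hypothetical helper, and falling back to hard_cap for an empty criteria list is a guess, since the reference does not say what happens in that case.

```python
from typing import List


class FixedCriterion:
    """Hypothetical criterion that always reports the same limit."""

    def __init__(self, limit: int) -> None:
        self.limit = limit

    def max_workers(self) -> int:
        return self.limit


def target_workers(criteria: List[object], hard_cap: int) -> int:
    # The most restrictive criterion wins; an empty list falls back to
    # hard_cap (an assumption of this sketch).
    lowest = min((c.max_workers() for c in criteria), default=hard_cap)
    # Clamp into the documented range of 1 to hard_cap, inclusive.
    return max(1, min(lowest, hard_cap))


print(target_workers([FixedCriterion(8), FixedCriterion(3)], hard_cap=10))  # 3
print(target_workers([FixedCriterion(50)], hard_cap=10))                    # 10
print(target_workers([FixedCriterion(0)], hard_cap=10))                     # 1
```

Taking the minimum means a single conservative criterion is enough to hold the pool down, whatever the others report.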

Scaling Criteria#

ScalingCriterion#

class adaptive_executor.ScalingCriterion[source]#

Bases: object

Base class for all scaling criteria.

Subclasses must implement the max_workers() method to define their scaling logic.

max_workers() int[source]#

Calculate the maximum number of workers based on this criterion.

Returns:

The maximum number of workers (must be at least 1)

Return type:

int

Raises:

NotImplementedError – If the subclass doesn’t implement this method

to_dict() Dict[str, Any][source]#

Serialize criterion to a dictionary.

Returns:

A dictionary representation of the criterion

Return type:

Dict[str, Any]

Raises:

NotImplementedError – If the subclass doesn’t implement this method

classmethod from_dict(data: Dict[str, Any]) C[source]#

Deserialize criterion from a dictionary.

Parameters:

data – Dictionary containing serialized criterion data

Returns:

An instance of the criterion

Raises:
  • NotImplementedError – If the subclass doesn’t implement this method

  • ValueError – If the data is invalid

to_json() str[source]#

Serialize criterion to a JSON string.

Returns:

JSON string representation of the criterion

Return type:

str

Raises:

TypeError – If the criterion cannot be serialized to JSON

classmethod from_json(json_str: str) C[source]#

Deserialize criterion from a JSON string.

Parameters:

json_str – JSON string containing serialized criterion data

Returns:

An instance of the criterion

Raises:
  • json.JSONDecodeError – If the JSON string is invalid

  • ValueError – If the data is invalid

  • NotImplementedError – If the criterion type is not supported
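
One plausible way for the JSON methods to layer on top of to_dict() and from_dict() is sketched below. FixedCriterion, its limit field, and the "type" tag in the dictionary are all hypothetical, and the class deliberately does not inherit from ScalingCriterion so that the example runs without the library installed.

```python
import json
from typing import Any, Dict


class FixedCriterion:
    """Hypothetical criterion used only for illustration."""

    def __init__(self, limit: int) -> None:
        if limit < 1:
            raise ValueError("limit must be at least 1")
        self.limit = limit

    def max_workers(self) -> int:
        return self.limit

    def to_dict(self) -> Dict[str, Any]:
        # The "type" tag is an assumed convention, not a documented key.
        return {"type": "FixedCriterion", "limit": self.limit}

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "FixedCriterion":
        if data.get("type") != "FixedCriterion":
            raise ValueError("unexpected criterion type")
        return cls(limit=int(data["limit"]))

    def to_json(self) -> str:
        return json.dumps(self.to_dict())

    @classmethod
    def from_json(cls, json_str: str) -> "FixedCriterion":
        # json.loads raises json.JSONDecodeError on malformed input.
        return cls.from_dict(json.loads(json_str))


restored = FixedCriterion.from_json(FixedCriterion(4).to_json())
print(restored.max_workers())  # 4
```

Keeping the validation in from_dict() means both entry points reject bad data the same way.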

TimeCriterion#

class adaptive_executor.TimeCriterion(worker_count: int, active_start: time, active_end: time, timezone: str = 'UTC')[source]#

Bases: ScalingCriterion

A criterion that scales workers based on time of day.

This criterion returns the configured worker count if the current time of day falls within the specified time range (inclusive start, exclusive end), otherwise returns 1 (minimum workers).

__init__(worker_count: int, active_start: time, active_end: time, timezone: str = 'UTC')[source]#

Initialize the time-based criterion.

Parameters:
  • worker_count – Number of workers to use during active hours

  • active_start – Start time of the active period (time of day)

  • active_end – End time of the active period (time of day)

  • timezone – Timezone for the active period (default: 'UTC')

Raises:
  • ImportError – If pytz is not installed

  • ValueError – If worker_count is less than 1 or time values are invalid

  • TypeError – If active_start or active_end are not time objects

max_workers() int[source]#

Get the maximum number of workers based on the current time.

Returns:

self.worker_count if current time is within active hours, else 1

Return type:

int
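
The window test (inclusive start, exclusive end) can be sketched as a pure function. The name workers_for is hypothetical, the current time is passed in rather than read from the clock, and timezone conversion is omitted. The sketch also covers only same-day windows where active_start is earlier than active_end; how the class treats a window that crosses midnight is not stated in this reference.

```python
from datetime import time


def workers_for(now: time, active_start: time, active_end: time,
                worker_count: int) -> int:
    """Return worker_count inside [active_start, active_end), else 1."""
    if active_start <= now < active_end:
        return worker_count
    return 1


start, end = time(9, 0), time(17, 0)
print(workers_for(time(9, 0), start, end, 8))    # 8: the start is inclusive
print(workers_for(time(16, 59), start, end, 8))  # 8
print(workers_for(time(17, 0), start, end, 8))   # 1: the end is exclusive
```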

to_dict() Dict[str, Any][source]#

Serialize criterion to a dictionary.

Returns:

A dictionary representation of the criterion

Return type:

Dict[str, Any]

classmethod from_dict(data: Dict[str, Any]) TimeCriterion[source]#

Deserialize criterion from a dictionary.

Parameters:

data – Dictionary containing serialized criterion data

Returns:

An instance of the criterion

Raises:

ValueError – If the data is invalid

CpuCriterion#

class adaptive_executor.CpuCriterion(threshold: float, workers: int)[source]#

Bases: ScalingCriterion

A criterion that scales workers based on CPU usage.

This criterion returns the configured worker count if the current CPU usage is at or above the threshold, otherwise returns 1 (minimum workers).

__init__(threshold: float, workers: int)[source]#

Initialize the CPU-based criterion.

Parameters:
  • threshold – CPU usage threshold as a percentage (0-100); the criterion scales up when usage is at or above it

  • workers – Number of workers to use when CPU usage is at or above the threshold

Raises:
  • ImportError – If psutil is not installed

  • ValueError – If threshold is not between 0 and 100 or workers < 1

max_workers() int[source]#

Get the maximum number of workers based on current CPU usage.

Returns:

self.workers if CPU usage >= threshold, else 1

Return type:

int
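
The threshold rule can be sketched without touching the system at all. In this illustration the usage reading is passed in as a number; the class itself presumably takes it from psutil, given the ImportError noted above, but that is an assumption. The function name workers_for_usage is hypothetical.

```python
def workers_for_usage(usage_percent: float, threshold: float, workers: int) -> int:
    """Return workers when usage is at or above threshold, else 1."""
    if not 0 <= threshold <= 100:
        raise ValueError("threshold must be between 0 and 100")
    if workers < 1:
        raise ValueError("workers must be at least 1")
    return workers if usage_percent >= threshold else 1


print(workers_for_usage(82.5, threshold=75.0, workers=6))  # 6
print(workers_for_usage(75.0, threshold=75.0, workers=6))  # 6: the boundary counts
print(workers_for_usage(40.0, threshold=75.0, workers=6))  # 1
```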

to_dict() Dict[str, Any][source]#

Serialize the criterion to a dictionary.

Returns:

Dictionary containing the criterion’s state

Return type:

Dict[str, Any]

classmethod from_dict(data: Dict[str, Any]) CpuCriterion[source]#

Create a CpuCriterion from a dictionary.

Parameters:

data – Dictionary containing ‘threshold’ and ‘workers’ keys

Returns:

A new instance of CpuCriterion

Return type:

CpuCriterion

Raises:
  • KeyError – If required keys are missing

  • ValueError – If values are invalid

MemoryCriterion#

class adaptive_executor.MemoryCriterion(threshold: float, workers: int)[source]#

Bases: ScalingCriterion

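A criterion that scales workers based on memory usage.

This criterion returns the configured worker count if the current memory usage is at or above the threshold, otherwise returns 1 (minimum workers).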

__init__(threshold: float, workers: int)[source]#

Initialize the memory-based criterion.

Parameters:
  • threshold – Memory usage threshold as a percentage (0-100); the criterion scales up when usage is at or above it

  • workers – Number of workers to use when memory usage is at or above the threshold

Raises:
  • ImportError – If psutil is not installed

  • ValueError – If threshold is not between 0 and 100 or workers < 1

max_workers() int[source]#

Get the maximum number of workers based on current memory usage.

Returns:

self.workers if memory usage >= threshold, else 1

Return type:

int

to_dict() Dict[str, Any][source]#

Serialize the criterion to a dictionary.

Returns:

Dictionary containing the criterion’s state

Return type:

Dict[str, Any]

classmethod from_dict(data: Dict[str, Any]) MemoryCriterion[source]#

Create a MemoryCriterion from a dictionary.

Parameters:

data – Dictionary containing ‘threshold’ and ‘workers’ keys

Returns:

A new instance of MemoryCriterion

Return type:

MemoryCriterion

Raises:
  • KeyError – If required keys are missing

  • ValueError – If values are invalid
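
The dictionary round trip and its two error cases can be sketched with a stand-in class. ThresholdCriterion is hypothetical and only mirrors the documented constructor arguments and the 'threshold' and 'workers' keys; the real to_dict() may emit additional keys that this sketch does not model.

```python
from typing import Any, Dict


class ThresholdCriterion:
    """Hypothetical stand-in for a threshold-based criterion."""

    def __init__(self, threshold: float, workers: int) -> None:
        if not 0 <= threshold <= 100:
            raise ValueError("threshold must be between 0 and 100")
        if workers < 1:
            raise ValueError("workers must be at least 1")
        self.threshold = threshold
        self.workers = workers

    def to_dict(self) -> Dict[str, Any]:
        return {"threshold": self.threshold, "workers": self.workers}

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "ThresholdCriterion":
        # A missing key surfaces as KeyError; a bad value as ValueError
        # from the constructor.
        return cls(threshold=data["threshold"], workers=data["workers"])


original = ThresholdCriterion(threshold=80.0, workers=4)
copy = ThresholdCriterion.from_dict(original.to_dict())
print(copy.threshold, copy.workers)  # 80.0 4
```

Routing from_dict() through the constructor keeps validation in one place, so stored configuration is checked as strictly as values passed in code.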