Examples#

This section contains practical examples of using Adaptive Executor in various scenarios.

Time-based Scaling#

"""
Time-based Scaling Example
=========================
"""
# This example demonstrates how to use TimeCriterion to scale workers
# based on time of day. Perfect for applications that should be more
# aggressive during off-peak hours.

import datetime
import logging
import time
from datetime import time as dt_time

from adaptive_executor import (
    AdaptiveExecutor,
    MultiCriterionPolicy,
    TimeCriterion,
    setup_logger,
)

# Configure logging for the example
setup_logger(level=logging.DEBUG)
logger = logging.getLogger("example.time_scaling")


def process_task(task_id: int, executor_name: str):
    """Simulate a task that runs for some time."""
    logger.debug("Starting task %d for %s", task_id, executor_name)
    start_time = datetime.datetime.now()

Scale workers based on the time of day. Useful for applications that should run more workers during off-peak hours.

Resource-based Scaling#

"""
Resource-based Scaling Example
===========================

This example demonstrates how to use CpuCriterion and MemoryCriterion to scale workers
based on system resource usage. Perfect for resource-intensive applications.
"""

import random

from adaptive_executor import AdaptiveExecutor, MultiCriterionPolicy
from adaptive_executor.criteria import CpuCriterion, MemoryCriterion, MultiCriterion


def main():
    # Create resource-based criteria
    # Scale to 4 workers when CPU >= 80%
    cpu_criterion = CpuCriterion(threshold=80.0, workers=4)

    # Scale to 6 workers when memory >= 85%
    memory_criterion = MemoryCriterion(threshold=85.0, workers=6)

    # Combine criteria with OR logic (scale if either resource is high)
    resource_criterion = MultiCriterion(
        criteria=[(cpu_criterion, 4), (memory_criterion, 6)], logic="or"
    )
    resource_policy = MultiCriterionPolicy(criteria=[resource_criterion], hard_cap=8)

    # Create executor with resource-based scaling
    executor = AdaptiveExecutor(
        max_workers=8,
        policy=resource_policy,
        check_interval=15,  # Check every 15 seconds for resources
    )

Scale workers based on CPU and memory usage. Ideal for resource-intensive applications.

Complex Logic#

"""
Complex Scaling Example
====================

This example demonstrates advanced scaling with MultiCriterion using AND logic.
Perfect for applications that need multiple conditions to be met simultaneously.
"""

from datetime import time as dt_time

from adaptive_executor import AdaptiveExecutor, MultiCriterionPolicy
from adaptive_executor.criteria import MemoryCriterion, MultiCriterion, TimeCriterion


def main():
    # Create criteria for complex logic
    # Time criterion: 10PM-3AM window
    time_criterion = TimeCriterion(
        worker_count=2,
        active_start=dt_time(22, 0),
        active_end=dt_time(3, 0),
        timezone="UTC",
    )

    # Memory criterion: >80% usage
    memory_criterion = MemoryCriterion(threshold=80.0, workers=2)

    # Multi-criterion with AND logic
    # BOTH time window AND memory threshold must be met to scale to 2 workers
    complex_criterion = MultiCriterion(
        criteria=[(time_criterion, 2), (memory_criterion, 2)], logic="and"
    )
    complex_policy = MultiCriterionPolicy(criteria=[complex_criterion], hard_cap=6)

    # Create executor with complex scaling

Combine multiple criteria with AND logic. Perfect for applications that need multiple conditions met simultaneously.

Conditional Scaling#

"""
Conditional Scaling Example
========================

This example demonstrates ConditionalCriterion for applying different scaling
based on dynamic conditions.
"""

from datetime import time as dt_time

from adaptive_executor import AdaptiveExecutor, MultiCriterionPolicy
from adaptive_executor.criteria import (
    ConditionalCriterion,
    MemoryCriterion,
    TimeCriterion,
)


def main():
    # Create condition and action criteria
    # Condition: Check if memory usage is high
    memory_condition = MemoryCriterion(threshold=80.0, workers=2)

    # Action: Use time-based scaling when condition is met
    time_action = TimeCriterion(
        worker_count=4,
        active_start=dt_time(20, 0),
        active_end=dt_time(6, 0),
        timezone="UTC",
    )

    # Conditional criterion
    # If memory > 80%, use time-based scaling (4 workers)
    # Otherwise, use memory criterion behavior (2 workers)
    conditional_policy = ConditionalCriterion(

Apply different scaling based on dynamic conditions. Great for adaptive behavior.

Configuration Management#

"""
Configuration Management Example
=============================

This example demonstrates how to save and load scaling criteria configurations
using JSON serialization. Perfect for production deployments.
"""

import json
from datetime import time as dt_time

from adaptive_executor import AdaptiveExecutor
from adaptive_executor.criteria import CpuCriterion, MemoryCriterion, TimeCriterion
from adaptive_executor.policies import MultiCriterionPolicy


def save_configuration(criteria, config_file):
    """Serialize scaling criteria to a JSON configuration file.

    Args:
        criteria: Iterable of criterion objects. Each one must provide a
            ``to_dict()`` method whose result is JSON-serializable.
        config_file: Path of the file to create (an existing file is
            overwritten).

    The written document has two top-level keys: ``"criteria"`` (one dict
    per criterion, in input order) and ``"metadata"`` (the format
    ``"version"`` and a UTC ``"created_at"`` timestamp).
    """
    # Imported locally because this example only imports ``datetime.time``
    # at module level.
    from datetime import datetime, timezone

    config = {
        "criteria": [criterion.to_dict() for criterion in criteria],
        "metadata": {
            "version": "1.0",
            # Record when the file was actually written instead of a fixed
            # placeholder date; the "YYYY-MM-DDTHH:MM:SSZ" layout is kept.
            "created_at": datetime.now(timezone.utc).strftime(
                "%Y-%m-%dT%H:%M:%SZ"
            ),
        },
    }

    # An explicit encoding keeps the file independent of the platform locale.
    with open(config_file, "w", encoding="utf-8") as f:
        json.dump(config, f, indent=2)

    print(f"Configuration saved to {config_file}")


def load_configuration(config_file):
    """Load criteria configuration from JSON file"""
    with open(config_file, "r") as f:
        config = json.load(f)

    # Recreate criteria from saved configuration

Save and load scaling configurations using JSON serialization. Essential for production deployments.

Legacy Examples#

"""Basic usage example for adaptive-executor with enhanced logging."""

import logging
import time
from datetime import time as datetime_time

from adaptive_executor import (
    AdaptiveExecutor,
    MultiCriterionPolicy,
    TimeCriterion,
    setup_logger,
)

# Configure logging for the example
setup_logger(level=logging.DEBUG)
logger = logging.getLogger("example.basic_usage")


def simple_task(task_id):
    """Stand in for real work: log the task, pause one second, report back.

    Args:
        task_id: Identifier interpolated into the log line and the result.

    Returns:
        The string ``"Completed task <task_id>"``.
    """
    logger.debug("Processing task %s", task_id)

    # The one-second pause is the simulated workload.
    time.sleep(1)

    outcome = f"Completed task {task_id}"
    return outcome


def main():
    """Main function demonstrating basic usage of the adaptive executor."""
    logger.info("Starting basic usage example")

    try:

Web Scraping with Time-Based Scaling#

import random
import time
from datetime import time as dt_time

from adaptive_executor import AdaptiveExecutor, MultiCriterionPolicy
from adaptive_executor.criteria import TimeCriterion


def scrape_url(url):
    """Pretend to scrape *url* and return a summary of the fake fetch.

    The function sleeps for a random 0.5-2.0 second interval, then picks a
    random payload size between 1000 and 5000 bytes. Progress is printed
    before and after the pause.

    Args:
        url: The address to "scrape"; only used for display and in the result.

    Returns:
        A dict with keys ``"url"``, ``"size"`` (bytes, int) and ``"time"``
        (seconds slept, float).
    """
    print(f"Scraping {url}")

    # Stand-in for network latency plus parsing cost.
    elapsed = random.uniform(0.5, 2.0)
    time.sleep(elapsed)

    # Stand-in for the size of the fetched payload.
    payload_bytes = random.randint(1000, 5000)
    print(f"  Scraped {payload_bytes} bytes from {url} in {elapsed:.2f}s")

    summary = dict(url=url, size=payload_bytes, time=elapsed)
    return summary


def main():
    print("=== Web Scraping with Time-Based Scaling ===")

Data Processing with Resource Awareness#

import random
import time

from adaptive_executor import AdaptiveExecutor, MultiCriterionPolicy
from adaptive_executor.criteria import CpuCriterion, MemoryCriterion


def process_data_chunk(chunk_id, data_size):
    """Simulate crunching one chunk of *data_size* items.

    The work has two phases: a CPU phase of ``data_size // 100`` steps (each
    a small sum followed by a 0.01 s pause) and a memory phase that builds a
    list of ``data_size`` random floats.

    Args:
        chunk_id: Identifier echoed in the output and the result.
        data_size: Number of items in the chunk (int).

    Returns:
        A dict with ``"chunk_id"``, ``"processed_count"`` (length of the
        generated list) and ``"processing_time"`` (steps * 0.01, i.e. the
        nominal time slept, not a measured duration).
    """
    print(f"Processing chunk {chunk_id} ({data_size} items)")

    step_delay = 0.01  # seconds slept per simulated step
    step_count = data_size // 100

    # CPU phase: a token computation, then a short pause, once per step.
    for _ in range(step_count):
        sum(range(100))
        time.sleep(step_delay)

    # Memory phase: materialize one random float per item.
    buffer = [random.random() for _ in range(data_size)]
    processed_count = len(buffer)

    print(f"  Processed {processed_count} items from chunk {chunk_id}")
    return dict(
        chunk_id=chunk_id,
        processed_count=processed_count,
        processing_time=step_count * step_delay,
    )


Multi-Criteria Scaling#

import random
import time
from datetime import time as dt_time

from adaptive_executor import AdaptiveExecutor, MultiCriterionPolicy
from adaptive_executor.criteria import CpuCriterion, MemoryCriterion, TimeCriterion


def background_task(task_id, duration):
    """Simulate a background job lasting roughly *duration* seconds.

    The job runs ``int(duration * 10)`` ticks; each tick does a small sum
    and then sleeps 0.1 s. Start and completion are announced on stdout.

    Args:
        task_id: Identifier used in the printed messages and the result.
        duration: Nominal run time in seconds (number).

    Returns:
        The string ``"Task <task_id> result"``.
    """
    print(f"Task {task_id} started (duration: {duration:.1f}s)")

    # Ten ticks per second of requested duration; none if that rounds to <= 0.
    ticks_left = int(duration * 10)
    while ticks_left > 0:
        sum(range(50))  # token CPU work
        time.sleep(0.1)
        ticks_left -= 1

    print(f"Task {task_id} completed")
    outcome = f"Task {task_id} result"
    return outcome


def main():
    print("=== Multi-Criteria Scaling Example ===")

    # Create multiple scaling criteria
    time_policy = TimeCriterion(
        worker_count=12,  # More workers at night
        active_start=dt_time(20, 0),  # 8 PM to 6 AM
        active_end=dt_time(6, 0),
        timezone="UTC",
    )

    cpu_policy = CpuCriterion(threshold=75, workers=3)
    memory_policy = MemoryCriterion(threshold=80, workers=3)

Custom Scaling Criteria#

import random
import time
from datetime import datetime

import pytz

from adaptive_executor import AdaptiveExecutor, MultiCriterionPolicy
from adaptive_executor.criteria import ScalingCriterion


class LoadBasedCriterion(ScalingCriterion):
    """Custom scaling criterion based on current task load.

    Callers push queue statistics in through ``update_metrics``;
    ``max_workers`` converts the most recent snapshot into a worker count.
    """

    def __init__(self, low_threshold=5, high_threshold=15):
        """Store the worker bounds and start with empty metrics.

        Args:
            low_threshold: Worker count used under low load (never fewer
                than 3 are requested in that case).
            high_threshold: Upper bound on the worker count requested under
                medium or high load.
        """
        # NOTE(review): the base-class initializer is not called here;
        # confirm ScalingCriterion needs no setup of its own.
        self.low_threshold = low_threshold
        self.high_threshold = high_threshold
        self.pending_tasks = 0
        self.completed_tasks = 0

    def update_metrics(self, pending, completed):
        """Replace the stored pending/completed task counts."""
        self.pending_tasks = pending
        self.completed_tasks = completed

    def max_workers(self):
        """Return the worker count for the current load.

        Load is the ratio of pending to completed tasks (completed is
        treated as at least 1 to avoid division by zero):

        * no pending tasks -> 2 workers;
        * ratio > 3.0 -> ``pending + 2``, capped at ``high_threshold``;
        * ratio > 1.5 -> at least 6 and up to ``pending``, capped at
          ``high_threshold``;
        * otherwise -> ``low_threshold``, but never fewer than 3.
        """
        if self.pending_tasks == 0:
            return 2  # Minimum workers when no work

        load_ratio = self.pending_tasks / max(self.completed_tasks, 1)

        if load_ratio > 3.0:  # High load
            return min(self.high_threshold, self.pending_tasks + 2)

        if load_ratio > 1.5:  # Medium load
            # Apply the same ceiling as the high-load branch; without it a
            # large backlog at medium load would request more workers than
            # high load is allowed to.
            return min(self.high_threshold, max(6, self.pending_tasks))

        # Low load
        return max(3, self.low_threshold)


class TimeOfDayCriterion(ScalingCriterion):
    """Custom time-based criterion with more granular control."""

    def __init__(self, timezone="UTC"):
        self.tz = pytz.timezone(timezone)

Running Examples#

  1. Install Adaptive Executor with all features:

pip install "adaptive-executor[standard]"
  2. Run any example:

python examples/basic_usage.py

For more examples and details, see the examples/ directory in the repository.