Examples
This section contains practical examples of using Adaptive Executor in various scenarios.
Time-based Scaling
"""
Time-based Scaling Example
==========================
"""
# This example demonstrates how to use TimeCriterion to scale workers
# based on time of day. Perfect for applications that should be more
# aggressive during off-peak hours.
import datetime
import logging
import time
from datetime import time as dt_time
from adaptive_executor import (
AdaptiveExecutor,
MultiCriterionPolicy,
TimeCriterion,
setup_logger,
)
# Configure logging for the example
# DEBUG is requested so the logger.debug() call in process_task is not filtered
# out (assumes setup_logger installs handlers that cover this logger -- TODO confirm).
setup_logger(level=logging.DEBUG)
# Named logger used by the functions in this example.
logger = logging.getLogger("example.time_scaling")
# Logs the start of the task at DEBUG level and records the wall-clock start time.
# NOTE(review): this excerpt is cut off right after start_time is taken; the
# rest of the task body (and any use of start_time) is not shown on this page.
def process_task(task_id: int, executor_name: str):
"""Simulate a task that runs for some time."""
logger.debug("Starting task %d for %s", task_id, executor_name)
start_time = datetime.datetime.now()
Scale workers based on time of day. Perfect for applications that should be more aggressive during off-peak hours.
Resource-based Scaling
"""
Resource-based Scaling Example
==============================
This example demonstrates how to use CpuCriterion and MemoryCriterion to scale workers
based on system resource usage. Perfect for resource-intensive applications.
"""
import random
from adaptive_executor import AdaptiveExecutor, MultiCriterionPolicy
from adaptive_executor.criteria import CpuCriterion, MemoryCriterion, MultiCriterion
# Builds a CPU/memory-driven scaling policy and an executor that uses it.
# NOTE(review): the excerpt stops once the executor is created; task
# submission and shutdown are not shown on this page.
def main():
# Create resource-based criteria
# Scale to 4 workers when CPU >= 80%
cpu_criterion = CpuCriterion(threshold=80.0, workers=4)
# Scale to 6 workers when memory >= 85%
memory_criterion = MemoryCriterion(threshold=85.0, workers=6)
# Combine criteria with OR logic (scale if either resource is high)
# The second element of each tuple repeats that criterion's worker count --
# presumably the value MultiCriterion applies for it; confirm against its API.
resource_criterion = MultiCriterion(
criteria=[(cpu_criterion, 4), (memory_criterion, 6)], logic="or"
)
# hard_cap matches max_workers below (both 8).
resource_policy = MultiCriterionPolicy(criteria=[resource_criterion], hard_cap=8)
# Create executor with resource-based scaling
executor = AdaptiveExecutor(
max_workers=8,
policy=resource_policy,
check_interval=15, # Check every 15 seconds for resources
)
Scale workers based on CPU and memory usage. Ideal for resource-intensive applications.
Complex Logic
"""
Complex Scaling Example
=======================
This example demonstrates advanced scaling with MultiCriterion using AND logic.
Perfect for applications that need multiple conditions to be met simultaneously.
"""
from datetime import time as dt_time
from adaptive_executor import AdaptiveExecutor, MultiCriterionPolicy
from adaptive_executor.criteria import MemoryCriterion, MultiCriterion, TimeCriterion
# Builds a policy that combines a time window and a memory threshold with AND logic.
# NOTE(review): the excerpt ends at the "Create executor" comment; the executor
# construction itself is not shown on this page.
def main():
# Create criteria for complex logic
# Time criterion: 10PM-3AM window
# The window crosses midnight (start 22:00 is later than end 03:00).
time_criterion = TimeCriterion(
worker_count=2,
active_start=dt_time(22, 0),
active_end=dt_time(3, 0),
timezone="UTC",
)
# Memory criterion: >80% usage
memory_criterion = MemoryCriterion(threshold=80.0, workers=2)
# Multi-criterion with AND logic
# BOTH time window AND memory threshold must be met to scale to 2 workers
complex_criterion = MultiCriterion(
criteria=[(time_criterion, 2), (memory_criterion, 2)], logic="and"
)
complex_policy = MultiCriterionPolicy(criteria=[complex_criterion], hard_cap=6)
# Create executor with complex scaling
Combine multiple criteria with AND logic. Perfect for applications that need multiple conditions met simultaneously.
Conditional Scaling
"""
Conditional Scaling Example
===========================
This example demonstrates ConditionalCriterion for applying different scaling
based on dynamic conditions.
"""
from datetime import time as dt_time
from adaptive_executor import AdaptiveExecutor, MultiCriterionPolicy
from adaptive_executor.criteria import (
ConditionalCriterion,
MemoryCriterion,
TimeCriterion,
)
# Prepares a memory-based condition and a time-based action for a ConditionalCriterion.
# NOTE(review): the excerpt ends inside the ConditionalCriterion(...) call, so
# its arguments are not shown on this page.
def main():
# Create condition and action criteria
# Condition: Check if memory usage is high
memory_condition = MemoryCriterion(threshold=80.0, workers=2)
# Action: Use time-based scaling when condition is met
# The window crosses midnight (start 20:00 is later than end 06:00).
time_action = TimeCriterion(
worker_count=4,
active_start=dt_time(20, 0),
active_end=dt_time(6, 0),
timezone="UTC",
)
# Conditional criterion
# If memory > 80%, use time-based scaling (4 workers)
# Otherwise, use memory criterion behavior (2 workers)
conditional_policy = ConditionalCriterion(
Apply different scaling based on dynamic conditions. Great for adaptive behavior.
Configuration Management
"""
Configuration Management Example
================================
This example demonstrates how to save and load scaling criteria configurations
using JSON serialization. Perfect for production deployments.
"""
import json
from datetime import time as dt_time
from adaptive_executor import AdaptiveExecutor
from adaptive_executor.criteria import CpuCriterion, MemoryCriterion, TimeCriterion
from adaptive_executor.policies import MultiCriterionPolicy
def save_configuration(criteria, config_file):
    """Save criteria configuration to a JSON file.

    Args:
        criteria: Iterable of criterion objects. Each one must provide a
            ``to_dict()`` method whose result is JSON-serializable.
        config_file: Path of the file to create or overwrite.
    """
    config = {
        "criteria": [criterion.to_dict() for criterion in criteria],
        # NOTE(review): "created_at" is a fixed placeholder, not the real
        # creation time; replace it before relying on this metadata.
        "metadata": {"version": "1.0", "created_at": "2024-01-01T00:00:00Z"},
    }
    # Pin the encoding: the platform default is not UTF-8 everywhere, and a
    # configuration file should read back the same on every machine.
    with open(config_file, "w", encoding="utf-8") as f:
        json.dump(config, f, indent=2)
    print(f"Configuration saved to {config_file}")
# Reads the JSON document at config_file into `config`.
# NOTE(review): the excerpt ends at the "Recreate criteria" comment; the code
# that rebuilds criteria from `config` and the return value are not shown here.
def load_configuration(config_file):
"""Load criteria configuration from JSON file"""
with open(config_file, "r") as f:
config = json.load(f)
# Recreate criteria from saved configuration
Save and load scaling configurations using JSON serialization. Essential for production deployments.
Legacy Examples
"""Basic usage example for adaptive-executor with enhanced logging."""
import logging
import time
from datetime import time as datetime_time
from adaptive_executor import (
AdaptiveExecutor,
MultiCriterionPolicy,
TimeCriterion,
setup_logger,
)
# Configure logging for the example
# DEBUG is requested so the logger.debug() call in simple_task is not filtered
# out (assumes setup_logger installs handlers that cover this logger -- TODO confirm).
setup_logger(level=logging.DEBUG)
# Named logger used by the functions in this example.
logger = logging.getLogger("example.basic_usage")
def simple_task(task_id):
    """Pretend to work on one task for a second and report completion.

    Args:
        task_id: Identifier echoed in the debug log line and in the result.

    Returns:
        The string ``"Completed task <task_id>"``.
    """
    logger.debug("Processing task %s", task_id)
    # Fixed one-second pause stands in for real work.
    time.sleep(1)
    outcome = f"Completed task {task_id}"
    return outcome
# NOTE(review): the excerpt ends immediately after `try:`; the guarded body and
# its handlers are not shown on this page.
def main():
"""Main function demonstrating basic usage of the adaptive executor."""
logger.info("Starting basic usage example")
try:
Web Scraping with Time-Based Scaling
import random
import time
from datetime import time as dt_time
from adaptive_executor import AdaptiveExecutor, MultiCriterionPolicy
from adaptive_executor.criteria import TimeCriterion
def scrape_url(url, *, min_delay=0.5, max_delay=2.0):
    """Simulate scraping ``url`` with a random latency and payload size.

    Args:
        url: Address echoed in the progress output and in the result.
        min_delay: Lower bound, in seconds, of the simulated latency.
        max_delay: Upper bound, in seconds, of the simulated latency.
            The defaults reproduce the original 0.5-2.0 second range; pass
            smaller values to run the example without waiting.

    Returns:
        A dict with the keys ``"url"``, ``"size"`` (simulated byte count
        between 1000 and 5000) and ``"time"`` (seconds slept).
    """
    print(f"Scraping {url}")
    # Simulate network latency and processing time
    processing_time = random.uniform(min_delay, max_delay)
    time.sleep(processing_time)
    # Simulate successful scrape
    data_size = random.randint(1000, 5000)
    print(f" Scraped {data_size} bytes from {url} in {processing_time:.2f}s")
    return {"url": url, "size": data_size, "time": processing_time}
# NOTE(review): only the banner line of this entry point is shown; the rest of
# the example is cut off on this page.
def main():
print("=== Web Scraping with Time-Based Scaling ===")
Data Processing with Resource Awareness
import random
import time
from adaptive_executor import AdaptiveExecutor, MultiCriterionPolicy
from adaptive_executor.criteria import CpuCriterion, MemoryCriterion
def process_data_chunk(chunk_id, data_size):
    """Simulate processing a data chunk with CPU and memory usage.

    One processing step is performed per 100 items (integer division), and
    each step does a small computation followed by a short sleep.

    Args:
        chunk_id: Identifier echoed in the progress output and the result.
        data_size: Number of items in the chunk.

    Returns:
        A dict with the keys ``"chunk_id"``, ``"processed_count"`` (equal to
        ``data_size`` for non-negative sizes) and ``"processing_time"`` (the
        nominal sleep time in seconds, i.e. steps * 0.01).
    """
    # Nominal duration of one step; used both for the sleep and for the
    # reported processing time so the two cannot drift apart.
    step_seconds = 0.01

    print(f"Processing chunk {chunk_id} ({data_size} items)")

    # Simulate CPU-intensive processing
    processing_steps = data_size // 100
    for _ in range(processing_steps):
        sum(range(100))  # token computation; the result is not needed
        time.sleep(step_seconds)

    # Simulate memory usage
    result_data = [random.random() for _ in range(data_size)]
    processed_count = len(result_data)
    print(f" Processed {processed_count} items from chunk {chunk_id}")
    return {
        "chunk_id": chunk_id,
        "processed_count": processed_count,
        "processing_time": processing_steps * step_seconds,
    }
Multi-Criteria Scaling
import random
import time
from datetime import time as dt_time
from adaptive_executor import AdaptiveExecutor, MultiCriterionPolicy
from adaptive_executor.criteria import CpuCriterion, MemoryCriterion, TimeCriterion
def background_task(task_id, duration):
    """Simulate a background task with variable workload.

    The task runs ``int(duration * 10)`` iterations, each doing a small
    computation and then sleeping for 0.1 seconds.

    Args:
        task_id: Identifier echoed in the progress output and the result.
        duration: Approximate run time in seconds.

    Returns:
        The string ``"Task <task_id> result"``.
    """
    print(f"Task {task_id} started (duration: {duration:.1f}s)")

    # Simulate mixed workload
    for _ in range(int(duration * 10)):
        sum(range(50))  # CPU work; the result is not needed
        time.sleep(0.1)

    print(f"Task {task_id} completed")
    return f"Task {task_id} result"
# Creates one time-based and two resource-based criteria.
# NOTE(review): the excerpt ends after the criteria are created; combining them
# into a policy and running the executor are not shown on this page.
def main():
print("=== Multi-Criteria Scaling Example ===")
# Create multiple scaling criteria
# The *_policy names below hold criterion objects, not policy objects.
time_policy = TimeCriterion(
worker_count=12, # More workers at night
active_start=dt_time(20, 0), # 8 PM to 6 AM
active_end=dt_time(6, 0),
timezone="UTC",
)
cpu_policy = CpuCriterion(threshold=75, workers=3)
memory_policy = MemoryCriterion(threshold=80, workers=3)
Custom Scaling Criteria
import random
import time
from datetime import datetime
import pytz
from adaptive_executor import AdaptiveExecutor, MultiCriterionPolicy
from adaptive_executor.criteria import ScalingCriterion
class LoadBasedCriterion(ScalingCriterion):
    """Custom scaling criterion based on current task load.

    The caller feeds in pending and completed task counts through
    ``update_metrics``; ``max_workers`` turns the ratio of the two into a
    worker count.
    """

    def __init__(self, low_threshold=5, high_threshold=15):
        """Store the worker bounds and start with empty metrics.

        Args:
            low_threshold: Worker count used as the floor under low load.
            high_threshold: Upper bound on workers under medium and high load.
        """
        # NOTE(review): the base-class initializer is not called here, as in
        # the original; confirm ScalingCriterion does not require it.
        self.low_threshold = low_threshold
        self.high_threshold = high_threshold
        self.pending_tasks = 0
        self.completed_tasks = 0

    def update_metrics(self, pending, completed):
        """Update internal metrics for scaling decisions."""
        self.pending_tasks = pending
        self.completed_tasks = completed

    def max_workers(self):
        """Calculate optimal workers based on current load.

        Returns:
            2 when nothing is pending; otherwise a count chosen from the
            ratio of pending to completed tasks.
        """
        if self.pending_tasks == 0:
            return 2  # Minimum workers when no work

        # max(..., 1) avoids dividing by zero before any task has completed.
        load_ratio = self.pending_tasks / max(self.completed_tasks, 1)

        if load_ratio > 3.0:  # High load
            return min(self.high_threshold, self.pending_tasks + 2)
        if load_ratio > 1.5:  # Medium load
            # Apply the same cap as the high-load branch; without it a medium
            # load with many pending tasks asked for more workers than a high
            # load ever could.
            return min(self.high_threshold, max(6, self.pending_tasks))
        # Low load
        return max(3, self.low_threshold)
# NOTE(review): the excerpt ends after the timezone is resolved; the method
# that computes the worker count is not shown on this page.
class TimeOfDayCriterion(ScalingCriterion):
"""Custom time-based criterion with more granular control."""
def __init__(self, timezone="UTC"):
# Resolve the timezone name into a pytz timezone object once, at construction.
self.tz = pytz.timezone(timezone)
Running Examples
Install Adaptive Executor with all features:
pip install "adaptive-executor[standard]"
Run any example:
python examples/basic_usage.py
For more examples and details, see the examples/ directory in the repository.