Quick Start

Get up and running with Adaptive Executor in minutes. This guide will show you how to create intelligent thread pools that automatically scale based on system resources and time-based policies.

Basic Usage

The simplest way to use Adaptive Executor is with a basic scaling policy:

from datetime import time
from adaptive_executor import AdaptiveExecutor, MultiCriterionPolicy
from adaptive_executor.criteria import TimeCriterion, CpuCriterion

# Create criteria for scaling
time_criterion = TimeCriterion(
    worker_count=8,
    active_start=time(22, 0),
    active_end=time(3, 0),
    timezone="UTC",
)
cpu_criterion = CpuCriterion(threshold=75.0, workers=4)

# Combine criteria with a policy
policy = MultiCriterionPolicy([time_criterion, cpu_criterion], hard_cap=10)

# Create and use the executor
executor = AdaptiveExecutor(max_workers=15, policy=policy, check_interval=30)

# Submit tasks
def my_task(task_id):
    print(f"Processing task {task_id}")

for i in range(5):
    executor.submit(my_task, i)

# Wait for all tasks to complete
executor.join()
executor.shutdown()

This example creates an executor that:

  • Uses 8 workers between 10PM-3AM (UTC), 1 worker otherwise
  • Scales to 4 workers when CPU usage >= 75%
  • Never exceeds 10 workers (hard cap)
  • Checks scaling conditions every 30 seconds

Scaling Criteria

Time-based scaling

Optimize performance based on time of day - perfect for applications that should be more aggressive during off-peak hours:

from datetime import time
from adaptive_executor import AdaptiveExecutor, MultiCriterionPolicy
from adaptive_executor.criteria import TimeCriterion

# Scale based on time of day
time_crit = TimeCriterion(
    worker_count=10,       # Aggressive during off-peak hours
    active_start=time(22, 0),
    active_end=time(6, 0),
    timezone="America/New_York",
)

# Use with executor
policy = MultiCriterionPolicy([time_crit], hard_cap=12)
executor = AdaptiveExecutor(max_workers=20, policy=policy)

Use cases:

  • Web scraping (more aggressive when target servers are less busy)
  • Data processing (batch jobs during off-peak hours)
  • Background maintenance tasks
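Note that the active window above crosses midnight, so the criterion has to treat the range as wrapping around. A minimal standard-library sketch of that check (the `in_window` helper is illustrative, not part of the package):

```python
from datetime import time

def in_window(now: time, start: time, end: time) -> bool:
    """Return True if `now` falls inside [start, end), handling
    windows that wrap past midnight (e.g. 22:00 -> 06:00)."""
    if start <= end:
        return start <= now < end
    # Wrapping window: active from `start` to midnight, and from midnight to `end`
    return now >= start or now < end

# 23:30 is inside the 22:00 -> 06:00 off-peak window; noon is not
print(in_window(time(23, 30), time(22, 0), time(6, 0)))  # True
print(in_window(time(12, 0), time(22, 0), time(6, 0)))   # False
```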

CPU-based scaling

Automatically reduce concurrency when system resources are constrained:

from adaptive_executor import AdaptiveExecutor, MultiCriterionPolicy
from adaptive_executor.criteria import CpuCriterion

# Scale based on CPU usage
cpu_crit = CpuCriterion(threshold=80, workers=4)
# Uses 4 workers when CPU >= 80%, otherwise 1 worker

policy = MultiCriterionPolicy([cpu_crit], hard_cap=15)
executor = AdaptiveExecutor(max_workers=20, policy=policy)

Benefits:

  • Prevents CPU overload during intensive tasks
  • Maintains system responsiveness
  • Automatically adapts to varying workloads
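The threshold rule itself is simple to reason about. A pure-Python sketch of the documented behaviour (the `desired_workers` helper is illustrative; in practice the criterion would feed it a live reading, e.g. from a sampler like `psutil.cpu_percent()`):

```python
def desired_workers(cpu_percent: float, threshold: float = 80.0,
                    busy_workers: int = 4, idle_workers: int = 1) -> int:
    """Mirror CpuCriterion's documented rule: use `busy_workers`
    once CPU usage reaches the threshold, otherwise `idle_workers`."""
    return busy_workers if cpu_percent >= threshold else idle_workers

print(desired_workers(85.0))  # 4
print(desired_workers(40.0))  # 1
```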

Memory-based scaling

Protect your system from memory exhaustion during data-intensive operations:

from adaptive_executor import AdaptiveExecutor, MultiCriterionPolicy
from adaptive_executor.criteria import MemoryCriterion

# Scale based on memory usage
mem_crit = MemoryCriterion(threshold=85, workers=4)
# Uses 4 workers when memory >= 85%, otherwise 1 worker

policy = MultiCriterionPolicy([mem_crit], hard_cap=15)
executor = AdaptiveExecutor(max_workers=20, policy=policy)

Ideal for:

  • Large file processing
  • Data analysis pipelines
  • Image/video processing workflows

Advanced Configuration

Multi-criteria Scaling

Combine multiple criteria for sophisticated scaling behavior:

from datetime import time
from adaptive_executor import AdaptiveExecutor, MultiCriterionPolicy
from adaptive_executor.criteria import TimeCriterion, CpuCriterion, MemoryCriterion

# Create multiple scaling criteria
time_crit = TimeCriterion(
    worker_count=12,
    active_start=time(20, 0),
    active_end=time(6, 0),
    timezone="UTC",
)
cpu_crit = CpuCriterion(threshold=75, workers=3)
memory_crit = MemoryCriterion(threshold=80, workers=3)

# Combine all criteria into a single policy
policy = MultiCriterionPolicy(
    [time_crit, cpu_crit, memory_crit],
    hard_cap=15,
)

# Create executor with frequent checking
executor = AdaptiveExecutor(
    max_workers=20,
    policy=policy,
    check_interval=45  # Check every 45 seconds
)

This creates an intelligent executor that:

  • Uses time-based scaling as the baseline
  • Reduces workers when CPU or memory is constrained
  • Never exceeds 15 workers regardless of conditions
  • Monitors system state every 45 seconds
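How the policy resolves several simultaneous proposals is not spelled out here; one plausible reading (an assumption, not the library's documented algorithm) is that the time criterion sets a baseline, any triggered resource criterion caps it, and the hard cap is applied last:

```python
def resolve_workers(time_workers: int, resource_caps: list, hard_cap: int) -> int:
    """Combine proposals conservatively: start from the time-based
    baseline, let any triggered resource criterion cap it, then apply
    the policy-wide hard cap. Always run at least one worker."""
    candidate = min([time_workers, *resource_caps, hard_cap])
    return max(1, candidate)

# Baseline of 12 workers, but CPU is constrained and caps us at 3
print(resolve_workers(12, [3], hard_cap=15))  # 3
# No resource pressure: the time-based baseline stands
print(resolve_workers(12, [], hard_cap=15))   # 12
```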

Real-world Examples

Web Scraping with Time Optimization

import time
from datetime import time as dt_time
from adaptive_executor import AdaptiveExecutor, MultiCriterionPolicy
from adaptive_executor.criteria import TimeCriterion
import requests

def scrape_url(url):
    try:
        response = requests.get(url, timeout=10)
        print(f"Scraped {url}: {len(response.content)} bytes")
        return len(response.content)
    except Exception as e:
        print(f"Error scraping {url}: {e}")
        return 0

# Aggressive scraping during off-peak hours
time_crit = TimeCriterion(
    worker_count=20,  # Aggressive during off-peak
    active_start=dt_time(22, 0),
    active_end=dt_time(6, 0),
    timezone="America/New_York",
)

policy = MultiCriterionPolicy([time_crit], hard_cap=25)
executor = AdaptiveExecutor(max_workers=30, policy=policy, check_interval=60)

urls = [f"https://example.com/page/{i}" for i in range(100)]

start_time = time.time()
for url in urls:
    executor.submit(scrape_url, url)

executor.join()
print(f"Completed in {time.time() - start_time:.2f} seconds")
executor.shutdown()

Data Processing with Resource Awareness

import threading

import pandas as pd
from adaptive_executor import AdaptiveExecutor, MultiCriterionPolicy
from adaptive_executor.criteria import CpuCriterion, MemoryCriterion

# Thread-safe running total of rows processed across workers
total_rows = 0
total_rows_lock = threading.Lock()

def process_chunk(chunk_file):
    global total_rows
    try:
        df = pd.read_csv(chunk_file)
        processed = df.groupby('category').agg({'value': ['sum', 'mean']})
        output_file = chunk_file.replace('.csv', '_processed.csv')
        processed.to_csv(output_file)
        print(f"Processed {chunk_file} -> {output_file}")
        with total_rows_lock:
            total_rows += len(df)
        return len(df)
    except Exception as e:
        print(f"Error processing {chunk_file}: {e}")
        return 0

# Conservative scaling to protect system resources
cpu_crit = CpuCriterion(threshold=70, workers=4)
memory_crit = MemoryCriterion(threshold=75, workers=4)

policy = MultiCriterionPolicy([cpu_crit, memory_crit], hard_cap=8)
executor = AdaptiveExecutor(max_workers=12, policy=policy, check_interval=30)

chunk_files = [f"data/chunk_{i}.csv" for i in range(50)]

for chunk_file in chunk_files:
    executor.submit(process_chunk, chunk_file)

executor.join()
print(f"Processed {total_rows} total rows")
executor.shutdown()

Best Practices

  • Start conservative: Begin with lower worker counts and monitor performance

  • Set appropriate check intervals: checking too frequently adds overhead, while checking too rarely misses scaling opportunities

  • Use hard caps: Always set a reasonable maximum to prevent system overload

  • Monitor system metrics: Use tools like htop or Activity Monitor to understand scaling behavior

  • Test different thresholds: Optimize CPU/memory thresholds for your specific workload
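While tuning thresholds, it helps to watch the same kind of signals the criteria react to. A minimal sampler using only the standard library (POSIX load average as a stand-in for the CPU/memory percentages; for cross-platform percentages a library such as psutil is the usual choice):

```python
import os
import time

def log_system_load(samples: int = 3, interval: float = 1.0) -> list:
    """Print 1-minute load average snapshots while tuning thresholds.
    os.getloadavg() is POSIX-only; it is a coarse proxy for CPU pressure."""
    readings = []
    for _ in range(samples):
        one_min, _, _ = os.getloadavg()
        readings.append(one_min)
        print(f"1-min load average: {one_min:.2f}")
        time.sleep(interval)
    return readings
```

Run this alongside your workload at different threshold settings to see when the criteria would trigger.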

Next Steps

Now that you understand the basics, explore these topics: