concurrent.futures — High-Level Parallel Execution
concurrent.futures provides a unified interface for thread pools and process pools. It abstracts the low-level APIs of threading and multiprocessing so you can write concise, powerful parallel code.
ThreadPoolExecutor — Thread Pool
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import urllib.request
def fetch_url(url: str) -> tuple[str, int]:
    """Download *url* and return ``(url, size_in_bytes)``.

    I/O-bound, so it suits a thread pool. On any failure (malformed URL,
    timeout, HTTP error) it returns ``(url, -1)`` instead of raising, so a
    single bad URL cannot abort a batch driven by ``as_completed()``.

    Fix: dropped the unused ``as e`` binding on the except clause.
    """
    try:
        with urllib.request.urlopen(url, timeout=5) as resp:
            return url, len(resp.read())
    except Exception:
        # Broad catch is deliberate: the -1 sentinel is the failure signal.
        return url, -1
# Sample endpoints; the repeated /delay/1 URLs show that three 1-second
# downloads overlap when dispatched to a thread pool.
urls = [
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/1",
    "https://httpbin.org/get",
    "https://httpbin.org/uuid",
]
# submit: submit individual tasks
with ThreadPoolExecutor(max_workers=5) as executor:
    # Map each future back to its URL: as_completed yields futures in
    # completion order, not submission order, so we need the reverse lookup.
    futures = {executor.submit(fetch_url, url): url for url in urls}
    for future in as_completed(futures):
        url = futures[future]
        try:
            # result() re-raises any exception raised inside the worker.
            _, size = future.result()
            print(f"Done: {url} — {size} bytes")
        except Exception as e:
            print(f"Error: {url} — {e}")
# map: preserves order, simple interface
def square(n: int) -> int:
    """Return ``n`` squared after a short simulated delay."""
    time.sleep(0.01)  # stand-in for a small piece of blocking work
    return n ** 2
with ThreadPoolExecutor(max_workers=4) as executor:
    # Unlike as_completed, executor.map yields results in input order,
    # so the list lines up with range(10).
    results = list(executor.map(square, range(10)))
    print(f"Squares: {results}")
ProcessPoolExecutor — Process Pool
from concurrent.futures import ProcessPoolExecutor, as_completed
import time
def cpu_intensive(n: int) -> int:
    """Return the sum of squares 0² + 1² + … + (n-1)² — pure CPU work."""
    total = 0
    for value in range(n):
        total += value * value
    return total
def find_primes(limit: int) -> list[int]:
    """Return all primes <= ``limit`` using the Sieve of Eratosthenes.

    Fix: the original raised IndexError for ``limit < 1`` when writing
    ``sieve[0] = sieve[1] = False`` into a too-small list; any ``limit < 2``
    now simply returns an empty list.
    """
    if limit < 2:
        return []
    sieve = [True] * (limit + 1)
    sieve[0] = sieve[1] = False  # 0 and 1 are not prime
    for i in range(2, int(limit**0.5) + 1):
        if sieve[i]:
            # Start at i*i: smaller multiples were already crossed off
            # by smaller prime factors.
            for j in range(i * i, limit + 1, i):
                sieve[j] = False
    return [i for i in range(2, limit + 1) if sieve[i]]
if __name__ == "__main__":
    # The guard is mandatory with ProcessPoolExecutor: child processes
    # re-import this module, and the guard keeps them from re-running the demo.
    # Find primes over multiple ranges (in parallel)
    limits = [100_000, 200_000, 300_000, 400_000]
    start = time.perf_counter()
    with ProcessPoolExecutor() as executor:
        # future -> limit mapping recovers which range each result belongs to.
        futures = {executor.submit(find_primes, limit): limit for limit in limits}
        for future in as_completed(futures):
            limit = futures[future]
            primes = future.result()  # re-raises any worker exception here
            print(f"Limit {limit}: {len(primes)} primes")
    elapsed = time.perf_counter() - start
    print(f"Parallel: {elapsed:.2f}s")
    # chunksize: optimize map performance
    numbers = list(range(1, 100_001))
    with ProcessPoolExecutor() as executor:
        # chunksize batches many tiny tasks per IPC round-trip; without it,
        # each item pays its own pickling/transport overhead.
        results = list(executor.map(cpu_intensive, numbers, chunksize=1000))
        print(f"Done: {len(results)} results")
Future Object In Depth
from concurrent.futures import ThreadPoolExecutor, Future, wait, FIRST_COMPLETED, ALL_COMPLETED
import time
import random
def task(task_id: int, delay: float) -> dict:
    """Sleep for ``delay`` seconds, then report the id, delay and id squared."""
    time.sleep(delay)
    return {"id": task_id, "delay": delay, "result": task_id * task_id}
with ThreadPoolExecutor(max_workers=5) as executor:
    futures = [
        executor.submit(task, i, random.uniform(0.5, 2.0))
        for i in range(8)
    ]
    # wait: block until a condition is met — here, until the first future
    # completes OR 1 second elapses, whichever happens first.
    done, not_done = wait(futures, timeout=1.0, return_when=FIRST_COMPLETED)
    print(f"Completed within 1s: {len(done)}")
    print(f"Still running: {len(not_done)}")
    # Process completed ones first
    for future in done:
        print(f" Done: {future.result()}")
    # Wait for the rest (no timeout: blocks until every future finishes)
    wait(not_done, return_when=ALL_COMPLETED)
    for future in not_done:
        print(f" Finally done: {future.result()}")
# add_done_callback: callback on completion
def on_complete(future: Future) -> None:
    """Done-callback: print the error if the future failed, else its result."""
    exc = future.exception()
    if exc is not None:
        print(f"Error: {exc}")
    else:
        print(f"Callback fired: {future.result()}")
with ThreadPoolExecutor() as executor:
    f = executor.submit(task, 99, 0.1)
    # The callback fires in the worker thread when the future resolves;
    # if the future were already done it would run immediately here.
    f.add_done_callback(on_complete)
Using with asyncio
import asyncio
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import time
def cpu_work(n: int) -> int:
    """CPU-bound helper: return the sum 0 + 1 + … + (n - 1)."""
    total = 0
    for value in range(n):
        total += value
    return total
def blocking_io(path: str) -> str:
    """Blocking-I/O stand-in: pause briefly, then pretend we read ``path``."""
    time.sleep(0.1)  # Simulate file read
    return "read: " + path
async def main():
    """Demo: offload CPU-bound and blocking-I/O work off the event loop.

    Fix: uses ``asyncio.get_running_loop()`` — ``asyncio.get_event_loop()``
    has been deprecated inside coroutines since Python 3.10.
    """
    loop = asyncio.get_running_loop()
    # Process pool: run CPU-bound work asynchronously
    with ProcessPoolExecutor() as proc_pool:
        result = await loop.run_in_executor(proc_pool, cpu_work, 50_000_000)
        print(f"CPU result: {result}")
    # Thread pool: run blocking I/O asynchronously; gather awaits all five
    # in parallel, so total wait is ~0.1s rather than 0.5s.
    with ThreadPoolExecutor(max_workers=10) as thread_pool:
        tasks = [
            loop.run_in_executor(thread_pool, blocking_io, f"/data/file_{i}.txt")
            for i in range(5)
        ]
        results = await asyncio.gather(*tasks)
        for r in results:
            print(r)
asyncio.run(main())
Real-World Pattern: Retry + Parallel Processing
import random
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from typing import Any
@dataclass
class TaskResult:
    """Outcome of one task's attempt series (success or exhausted retries)."""

    task_id: int        # which task this result belongs to
    success: bool       # True if some attempt eventually succeeded
    result: Any = None  # payload on success (fix: was annotated with builtin `any`)
    attempts: int = 1   # total attempts consumed, including the successful one
    error: str = ""     # last error message when success is False
def unreliable_task(task_id: int) -> int:
    """Simulated flaky operation: raises ConnectionError on ~40% of calls."""
    failed = random.random() < 0.4
    if failed:
        raise ConnectionError(f"Task {task_id} connection failed")
    # Success path: brief random delay, then a deterministic payload.
    time.sleep(random.uniform(0.01, 0.1))
    return task_id * 10
def task_with_retry(task_id: int, max_retries: int = 3) -> TaskResult:
    """Run unreliable_task, retrying up to ``max_retries`` times.

    Always returns a TaskResult: the final ConnectionError is captured in
    the result rather than propagated, so a pool of these never raises.
    """
    attempt = 0
    while attempt < max_retries:
        attempt += 1
        try:
            value = unreliable_task(task_id)
        except ConnectionError as exc:
            if attempt == max_retries:
                return TaskResult(task_id=task_id, success=False, error=str(exc), attempts=attempt)
            time.sleep(0.05 * attempt)  # Linear backoff: 0.05s, 0.10s, ...
        else:
            return TaskResult(task_id=task_id, success=True, result=value, attempts=attempt)
if __name__ == "__main__":
    task_ids = list(range(20))
    results = []
    with ThreadPoolExecutor(max_workers=5) as executor:
        # Fan out one retrying task per id; the future -> id map identifies
        # tasks as they finish in completion order.
        futures = {executor.submit(task_with_retry, tid): tid for tid in task_ids}
        for future in as_completed(futures):
            # task_with_retry never raises, so no try/except is needed here.
            results.append(future.result())
    success = [r for r in results if r.success]
    failed = [r for r in results if not r.success]
    avg_attempts = sum(r.attempts for r in results) / len(results)
    print(f"Success: {len(success)}/{len(task_ids)}")
    print(f"Failed: {len(failed)}")
    print(f"Avg attempts: {avg_attempts:.2f}")
Comparison Summary
| Tool | Best For | Notes |
|---|---|---|
| ThreadPoolExecutor | I/O-bound | Thread reuse, context manager |
| ProcessPoolExecutor | CPU-bound | Process reuse, bypasses GIL |
| as_completed() | Process in completion order | Fastest-first processing |
| executor.map() | Order-preserving processing | Simple interface |
| wait() | Conditional waiting | FIRST_COMPLETED, etc. |
concurrent.futures is cleaner than raw threading/multiprocessing, and integrates easily with asyncio via run_in_executor(). It is the recommended default for new projects.