Skip to main content
Advertisement

concurrent.futures — High-Level Parallel Execution

concurrent.futures provides a unified interface for thread pools and process pools. It abstracts the low-level APIs of threading and multiprocessing so you can write concise, powerful parallel code.


ThreadPoolExecutor — Thread Pool

from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import urllib.request


def fetch_url(url: str) -> tuple[str, int]:
    """Download *url* and report the size of the response body (I/O-bound).

    Args:
        url: Address to fetch.

    Returns:
        ``(url, size)`` where ``size`` is the body length in bytes, or ``-1``
        when the request could not be completed.
    """
    # Function-scope import keeps this snippet self-contained.
    import http.client

    try:
        with urllib.request.urlopen(url, timeout=5) as resp:
            return url, len(resp.read())
    except (OSError, ValueError, http.client.HTTPException):
        # OSError covers URLError/HTTPError, socket errors and timeouts;
        # ValueError covers malformed URLs; HTTPException covers protocol
        # errors such as a truncated body. Anything else is a programming
        # error and is allowed to propagate instead of being hidden as -1.
        return url, -1


urls = [
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/1",
    "https://httpbin.org/get",
    "https://httpbin.org/uuid",
]

# submit: schedule tasks one by one; each call returns a Future
with ThreadPoolExecutor(max_workers=5) as executor:
    # Map each Future back to its URL so results can be labelled.
    futures = {executor.submit(fetch_url, url): url for url in urls}

    # as_completed yields futures as they finish, not in submission order.
    for future in as_completed(futures):
        url = futures[future]
        try:
            _, size = future.result()
        except Exception as e:
            print(f"Error: {url}: {e}")
        else:
            if size < 0:
                # fetch_url signals a failed download with a size of -1.
                print(f"Error: {url}: download failed")
            else:
                print(f"Done: {url}: {size} bytes")


# map: preserves order, simple interface
def square(n: int) -> int:
    """Return ``n`` multiplied by itself after a 10 ms pause simulating work."""
    time.sleep(0.01)
    squared = n * n
    return squared


with ThreadPoolExecutor(max_workers=4) as executor:
    # map() hands back results in input order, whatever order they finish in.
    results = [*executor.map(square, range(10))]
print(f"Squares: {results}")

ProcessPoolExecutor — Process Pool

from concurrent.futures import ProcessPoolExecutor, as_completed
import time


def cpu_intensive(n: int) -> int:
    """Return the sum of the squares of every integer in ``range(n)`` (CPU-bound)."""
    total = 0
    for value in range(n):
        total += value * value
    return total


def find_primes(limit: int) -> list[int]:
    """Return every prime ``p`` with ``2 <= p <= limit`` (Sieve of Eratosthenes).

    Args:
        limit: Inclusive upper bound of the search range.

    Returns:
        The primes in ascending order; an empty list when ``limit < 2``.
    """
    # Function-scope import keeps this snippet self-contained.
    import math

    # Without this guard the sieve has fewer than two slots and the
    # initialisation of indices 0 and 1 raises IndexError.
    if limit < 2:
        return []

    sieve = [True] * (limit + 1)
    sieve[0] = sieve[1] = False
    # isqrt is exact for arbitrarily large ints, unlike int(limit ** 0.5).
    for i in range(2, math.isqrt(limit) + 1):
        if sieve[i]:
            # Multiples below i*i were already crossed out by smaller primes.
            for j in range(i * i, limit + 1, i):
                sieve[j] = False
    return [i for i, is_prime in enumerate(sieve) if is_prime]


if __name__ == "__main__":
    # Everything that creates a ProcessPoolExecutor lives under this guard:
    # worker processes may re-import the main module, and unguarded pool
    # creation would then run again inside every worker.

    # Find primes over multiple ranges (in parallel)
    limits = [100_000, 200_000, 300_000, 400_000]

    start = time.perf_counter()
    with ProcessPoolExecutor() as executor:
        # Map each Future back to the limit it was submitted with.
        futures = {executor.submit(find_primes, limit): limit for limit in limits}
        for future in as_completed(futures):
            limit = futures[future]
            primes = future.result()
            print(f"Limit {limit}: {len(primes)} primes")
    elapsed = time.perf_counter() - start
    print(f"Parallel: {elapsed:.2f}s")

    # chunksize: optimize map performance by sending work items to the
    # workers in batches instead of one at a time
    numbers = list(range(1, 100_001))
    with ProcessPoolExecutor() as executor:
        results = list(executor.map(cpu_intensive, numbers, chunksize=1000))
    print(f"Done: {len(results)} results")

Future Object In Depth

from concurrent.futures import ThreadPoolExecutor, Future, wait, FIRST_COMPLETED, ALL_COMPLETED
import time
import random


def task(task_id: int, delay: float) -> dict:
    """Sleep for ``delay`` seconds, then return a summary of the task.

    The summary holds the task id, the delay that was used, and the id squared.
    """
    time.sleep(delay)
    outcome = dict(id=task_id, delay=delay, result=task_id ** 2)
    return outcome


with ThreadPoolExecutor(max_workers=5) as executor:
    # Queue eight tasks, each with its own random delay.
    futures = []
    for i in range(8):
        futures.append(executor.submit(task, i, random.uniform(0.5, 2.0)))

    # wait: block until a condition is met -- here, until the first future
    # finishes or the one-second timeout runs out, whichever comes first
    done, not_done = wait(futures, timeout=1.0, return_when=FIRST_COMPLETED)

    print(f"Completed within 1s: {len(done)}")
    print(f"Still running: {len(not_done)}")

    # Report the futures that are already finished.
    for future in done:
        print(f" Done: {future.result()}")

    # Block until every remaining future has finished, then report those too.
    wait(not_done, return_when=ALL_COMPLETED)
    for future in not_done:
        print(f" Finally done: {future.result()}")


# add_done_callback: callback on completion
def on_complete(future: Future) -> None:
    """Print the outcome of a finished future; meant for ``add_done_callback``.

    Args:
        future: The future that has finished, failed, or been cancelled.
    """
    # Done-callbacks also fire for cancelled futures, and calling
    # exception() or result() on one raises CancelledError.
    if future.cancelled():
        print("Cancelled")
        return

    # Fetch the exception once and compare against None explicitly, so an
    # exception object that happens to be falsy is still reported.
    error = future.exception()
    if error is not None:
        print(f"Error: {error}")
    else:
        print(f"Callback fired: {future.result()}")


with ThreadPoolExecutor() as executor:
    # Submit one short task and register the completion callback on its future.
    f = executor.submit(task, 99, delay=0.1)
    f.add_done_callback(on_complete)

Using with asyncio

import asyncio
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import time


def cpu_work(n: int) -> int:
    """Add up every integer in ``range(n)``; CPU-bound, so run it in a process pool."""
    total = sum(range(n))
    return total


def blocking_io(path: str) -> str:
    """Pretend to read ``path`` with a blocking call; run it in a thread pool."""
    # The 100 ms sleep stands in for a real file read.
    time.sleep(0.1)
    return "read: {}".format(path)


async def main():
    """Offload CPU-bound and blocking work from the event loop to executors.

    Runs ``cpu_work`` in a process pool and prints its result, then runs five
    ``blocking_io`` calls concurrently in a thread pool and prints each result.
    """
    # Inside a coroutine the loop is already running; get_running_loop() is
    # the supported way to obtain it and never creates a new loop implicitly.
    loop = asyncio.get_running_loop()

    # Process pool: run CPU-bound work asynchronously
    with ProcessPoolExecutor() as proc_pool:
        result = await loop.run_in_executor(proc_pool, cpu_work, 50_000_000)
        print(f"CPU result: {result}")

    # Thread pool: run blocking I/O asynchronously
    with ThreadPoolExecutor(max_workers=10) as thread_pool:
        tasks = [
            loop.run_in_executor(thread_pool, blocking_io, f"/data/file_{i}.txt")
            for i in range(5)
        ]
        # gather() returns the results in the same order as `tasks`.
        results = await asyncio.gather(*tasks)
        for r in results:
            print(r)


if __name__ == "__main__":
    # main() starts a ProcessPoolExecutor, whose workers may re-import this
    # module; the guard stops each worker from running the program again.
    asyncio.run(main())

Real-World Pattern: Retry + Parallel Processing

from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
import time
import random


@dataclass
class TaskResult:
    """Outcome of one task after all of its attempts."""

    # Identifier of the task this result belongs to.
    task_id: int
    # True when an attempt succeeded, False when every attempt failed.
    success: bool
    # Value produced by the task; stays None on failure. Annotated as
    # `object` because the builtin function `any` is not a type.
    result: object = None
    # Number of attempts made before the result was produced.
    attempts: int = 1
    # Message of the last error; empty on success.
    error: str = ""


def unreliable_task(task_id: int) -> int:
    """Simulate a flaky call that fails roughly 40% of the time.

    Raises ``ConnectionError`` on failure; otherwise pauses briefly and
    returns ``task_id`` multiplied by ten.
    """
    roll = random.random()
    if roll < 0.4:
        raise ConnectionError(f"Task {task_id} connection failed")

    pause = random.uniform(0.01, 0.1)
    time.sleep(pause)
    return task_id * 10


def task_with_retry(task_id: int, max_retries: int = 3) -> TaskResult:
    """Run ``unreliable_task`` up to ``max_retries`` times with backoff.

    Args:
        task_id: Identifier passed through to ``unreliable_task``.
        max_retries: Total number of attempts allowed; must be at least 1.

    Returns:
        A ``TaskResult`` describing the success, or the final failure once
        every attempt has raised ``ConnectionError``.

    Raises:
        ValueError: If ``max_retries`` is below 1. Without this check the loop
            never runs and the function silently returns ``None``.
    """
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")

    for attempt in range(1, max_retries + 1):
        try:
            result = unreliable_task(task_id)
        except ConnectionError as e:
            if attempt == max_retries:
                return TaskResult(task_id=task_id, success=False, error=str(e), attempts=attempt)
            # Exponential backoff: 0.05s, 0.10s, 0.20s, ... The first two
            # delays match the previous linear schedule, so the default of
            # three attempts waits exactly as long as before.
            time.sleep(0.05 * 2 ** (attempt - 1))
        else:
            return TaskResult(task_id=task_id, success=True, result=result, attempts=attempt)

    # The final iteration always returns; this only guards against edits.
    raise AssertionError("unreachable: retry loop ended without a result")


if __name__ == "__main__":
    task_ids = list(range(20))
    results = []

    # Run every task through the retry wrapper, five at a time, and collect
    # the outcomes in completion order.
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = {executor.submit(task_with_retry, tid): tid for tid in task_ids}
        for future in as_completed(futures):
            results.append(future.result())

    # Split the outcomes into successes and failures in one pass.
    success, failed = [], []
    for r in results:
        if r.success:
            success.append(r)
        else:
            failed.append(r)

    total_attempts = sum(r.attempts for r in results)
    avg_attempts = total_attempts / len(results)

    print(f"Success: {len(success)}/{len(task_ids)}")
    print(f"Failed: {len(failed)}")
    print(f"Avg attempts: {avg_attempts:.2f}")

Comparison Summary

| Tool | Best For | Notes |
|---|---|---|
| ThreadPoolExecutor | I/O-bound | Thread reuse, context manager |
| ProcessPoolExecutor | CPU-bound | Process reuse, bypasses GIL |
| as_completed() | Process in completion order | Fastest-first processing |
| executor.map() | Order-preserving processing | Simple interface |
| wait() | Conditional waiting | FIRST_COMPLETED, etc. |

concurrent.futures is cleaner than raw threading/multiprocessing, and integrates easily with asyncio via run_in_executor(). It is the recommended default for new projects.

Advertisement