threading — Thread-Based Concurrency
The threading module is effective for handling I/O-bound tasks (network, file, DB) concurrently. Use Lock, Event, and Queue to safely share data and synchronize between threads.
Thread Basics
import threading
import time
def worker(name: str, duration: float) -> None:
    """Simulated I/O-bound task: announce start, sleep, announce completion.

    Args:
        name: label used in the log lines.
        duration: how long to sleep, in seconds.
    """
    print(f"[{name}] Start")
    time.sleep(duration)
    print(f"[{name}] Done ({duration}s)")
# Create and start threads; the last one is a daemon and is not joined —
# it is killed automatically when the main thread exits.
workers = [
    threading.Thread(target=worker, args=("Worker-1", 1.0)),
    threading.Thread(target=worker, args=("Worker-2", 0.5)),
    threading.Thread(target=worker, args=("Worker-3", 1.5), daemon=True),  # Daemon thread
]
for t in workers:
    t.start()
# Wait for completion of the non-daemon workers only.
for t in workers[:2]:
    t.join()
print("Main thread done")
print(f"Active threads: {threading.active_count()}")
# Thread subclassing
class DownloadThread(threading.Thread):
    """Fetch a URL on a background thread, storing the payload or the error.

    Exceptions cannot propagate out of a thread's run(), so both the
    downloaded bytes and any failure are captured as attributes for the
    caller to inspect after join().
    """

    def __init__(self, url: str, name: str = ""):
        # Fall back to the URL itself as the thread name.
        super().__init__(name=name or url)
        self.url = url
        self.result: bytes | None = None
        self.error: Exception | None = None

    def run(self) -> None:
        try:
            import urllib.request
            with urllib.request.urlopen(self.url, timeout=5) as resp:
                self.result = resp.read()
        except Exception as exc:
            # Store the failure instead of letting the thread die silently.
            self.error = exc

    @property
    def success(self) -> bool:
        """True once a payload has been downloaded."""
        return self.result is not None
Lock — Preventing Race Conditions
import threading
# Race condition example
class UnsafeCounter:
    """Counter with a read-modify-write gap: concurrent increments can be lost."""

    def __init__(self):
        self.value = 0

    def increment(self):
        snapshot = self.value
        # ← Another thread can interleave here
        self.value = snapshot + 1
class SafeCounter:
    """Counter whose updates are serialized through a Lock, so no increment is lost."""

    def __init__(self):
        self.value = 0
        self._lock = threading.Lock()

    def increment(self):
        # Critical section: only one thread mutates value at a time.
        with self._lock:
            self.value += 1

    def get(self) -> int:
        # Read under the lock too, so a torn update is never observed.
        with self._lock:
            return self.value
# Verification
def stress_test(counter, n: int = 10000, batch: int = 100) -> None:
    """Hammer ``counter.increment()`` from multiple threads, exactly n times total.

    Args:
        counter: any object exposing an ``increment()`` method.
        n: total number of increments to perform.
        batch: increments performed by each worker thread (was a hard-coded 100).

    Fix: the original spawned ``n // 100`` threads of 100 increments each and
    silently dropped the ``n % 100`` remainder (e.g. n=10050 only did 10000).
    The remainder now gets its own, smaller worker thread.
    """
    def hammer(count: int) -> None:
        # Plain loop instead of a list comprehension used for side effects.
        for _ in range(count):
            counter.increment()

    sizes = [batch] * (n // batch)
    if n % batch:
        sizes.append(n % batch)  # leftover increments
    threads = [threading.Thread(target=hammer, args=(size,)) for size in sizes]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
unsafe = UnsafeCounter()
safe = SafeCounter()
# Hammer both counters with the same workload; only SafeCounter survives intact.
for counter in (unsafe, safe):
    stress_test(counter, 10000)
print(f"UnsafeCounter: {unsafe.value} (expected: 10000, often less)")
print(f"SafeCounter: {safe.get()} (expected: 10000, always correct)")
# RLock: can be acquired multiple times by the same thread
class RecursiveResource:
    """Demonstrates re-entrant locking: method_a holds the lock while
    calling method_b, which acquires it again without deadlocking."""

    def __init__(self):
        # A plain Lock would deadlock on the nested acquire; RLock counts
        # acquisitions per owning thread.
        self._lock = threading.RLock()  # Reentrant Lock

    def method_a(self):
        with self._lock:
            self.method_b()  # re-acquired by the same thread

    def method_b(self):
        with self._lock:
            print("method_b executing")
Event and Condition — Thread Communication
import threading
import time
import random
# Event: signal-based synchronization
class DataPipeline:
    """One-shot producer/consumer handoff signalled through an Event."""

    def __init__(self):
        self._data: list = []
        self._ready = threading.Event()  # starts cleared; set() flips it once

    def producer(self):
        """Generate the data, then signal any waiting consumers."""
        print("[Producer] Generating data...")
        time.sleep(1.0)  # simulate slow generation
        self._data = [random.randint(1, 100) for _ in range(10)]
        self._ready.set()  # Send signal
        print("[Producer] Data ready")

    def consumer(self):
        """Block until the producer signals, then report the data."""
        print("[Consumer] Waiting for data...")
        self._ready.wait()  # Block until signal
        print(f"[Consumer] Received: {self._data}")
pipeline = DataPipeline()
prod = threading.Thread(target=pipeline.producer)
cons = threading.Thread(target=pipeline.consumer)
# Starting the consumer first is safe: it simply blocks on the Event
# until the producer calls set().
cons.start()
prod.start()
for t in (prod, cons):
    t.join()
# Condition: condition variable
class BoundedBuffer:
    """Producer-consumer pattern with a size-limited buffer"""

    def __init__(self, maxsize: int = 5):
        self._buffer: list = []
        self._maxsize = maxsize
        self._cond = threading.Condition()

    def put(self, item) -> None:
        """Append item, blocking while the buffer is full."""
        with self._cond:
            # Always re-check in a loop: wait() can wake without the
            # condition actually holding.
            while len(self._buffer) >= self._maxsize:
                self._cond.wait()
            self._buffer.append(item)
            self._cond.notify_all()  # wake any blocked consumers

    def get(self):
        """Remove and return the oldest item, blocking while empty."""
        with self._cond:
            while not self._buffer:
                self._cond.wait()  # sleep until a producer adds an item
            head = self._buffer.pop(0)  # FIFO order
            self._cond.notify_all()  # wake any blocked producers
            return head
Queue — Thread-Safe Task Queue
import threading
import queue
import time
import random
def producer(q: queue.Queue, n: int) -> None:
    """Feed n string items into q, then a single None shutdown sentinel."""
    for i in range(n):
        label = f"item-{i}"
        q.put(label)
        print(f"[Produced] {label}")
        time.sleep(random.uniform(0.05, 0.1))  # simulate variable work
    q.put(None)  # Shutdown signal
def consumer(q: queue.Queue, worker_id: int) -> None:
    """Drain q until the None sentinel appears, then re-post it for peers."""
    while (item := q.get()) is not None:
        print(f"[Consumed-{worker_id}] {item}")
        time.sleep(random.uniform(0.1, 0.2))  # simulate processing time
        q.task_done()
    q.put(None)  # Re-insert for the next consumer
# Run: one producer feeds three competing consumers through a bounded queue.
q: queue.Queue = queue.Queue(maxsize=5)
prod = threading.Thread(target=producer, args=(q, 10))
consumers = [threading.Thread(target=consumer, args=(q, i)) for i in range(3)]
prod.start()
for c in consumers:
    c.start()
prod.join()
for c in consumers:
    c.join()
print("Pipeline complete")
# PriorityQueue: priority-based ordering — lowest tuple first, regardless
# of insertion order.
pq: queue.PriorityQueue = queue.PriorityQueue()
for entry in [(3, "low priority"), (1, "high priority"), (2, "medium priority")]:
    pq.put(entry)
while not pq.empty():
    priority, item = pq.get()
    print(f" Priority {priority}: {item}")
ThreadLocal — Thread-Specific Data
import threading
# threading.local(): each thread has its own independent value
_local = threading.local()
def set_user(user_id: int) -> None:
_local.user_id = user_id
_local.request_id = f"req-{user_id}-{id(threading.current_thread())}"
def get_current_user() -> int | None:
return getattr(_local, "user_id", None)
def handle_request(user_id: int) -> None:
set_user(user_id)
# Safe to access anywhere within the same thread
print(f"Thread {threading.current_thread().name}: user={get_current_user()}, "
f"req={_local.request_id}")
threads = [
    threading.Thread(target=handle_request, args=(i,), name=f"Thread-{i}")
    for i in range(5)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
# Each thread has independent user_id and request_id
Real-World Example: Parallel Web Requests
import threading
import time
from urllib.request import urlopen
from urllib.error import URLError
from dataclasses import dataclass
@dataclass
class FetchResult:
    """Outcome of fetching one URL.

    Attributes:
        url: the address that was requested.
        status: HTTP status code (0 when the request never completed).
        size: payload size in bytes.
        elapsed: wall-clock duration of the attempt, in seconds.
        error: error text; empty string on success.
    """
    url: str
    status: int = 0
    size: int = 0
    elapsed: float = 0.0
    error: str = ""
def fetch_url(url: str, results: list, lock: threading.Lock) -> None:
    """Download url and append a FetchResult to the shared results list.

    Intended as a Thread target; the append is guarded by lock because
    several fetcher threads share one results list.
    """
    started = time.perf_counter()
    try:
        with urlopen(url, timeout=5) as resp:
            body = resp.read()
            outcome = FetchResult(
                url=url,
                status=resp.status,
                size=len(body),
                elapsed=time.perf_counter() - started,
            )
    except URLError as exc:
        outcome = FetchResult(url=url, error=str(exc), elapsed=time.perf_counter() - started)
    with lock:
        results.append(outcome)
# Three identical slow endpoints: fetched concurrently, total wall time
# is roughly one request, not three.
urls = ["https://httpbin.org/delay/1"] * 3
results: list[FetchResult] = []
lock = threading.Lock()
start = time.perf_counter()
threads = [threading.Thread(target=fetch_url, args=(u, results, lock)) for u in urls]
for t in threads:
    t.start()
for t in threads:
    t.join()
total = time.perf_counter() - start
print(f"Total time: {total:.2f}s (sequential would be ~3s)")
for r in results:
    if r.error:
        print(f" Error: {r.url} — {r.error}")
    else:
        print(f" Success: {r.status}, {r.size} bytes, {r.elapsed:.2f}s")
Summary
| Tool | Purpose |
|---|---|
| Thread | Unit of parallel execution |
| Lock / RLock | Protect critical sections |
| Event | Signal between threads |
| Condition | Complex wait/notify patterns |
| Queue | Thread-safe task queue |
| threading.local() | Thread-specific data |
Threading is most effective for I/O-bound tasks. For CPU-bound work use multiprocessing, and for async I/O consider asyncio first.