threading — 스레드 기반 동시성
threading 모듈은 I/O-bound 작업(네트워크, 파일, DB)을 동시에 처리할 때 효과적입니다. Lock, Event, Queue 등으로 스레드 간 안전한 데이터 공유와 동기화를 구현합니다.
Thread 기본
import threading
import time
def worker(name: str, duration: float) -> None:
print(f"[{name}] 시작")
time.sleep(duration)
print(f"[{name}] 완료 ({duration}s)")
# 스레드 생성 및 시작
t1 = threading.Thread(target=worker, args=("Worker-1", 1.0))
t2 = threading.Thread(target=worker, args=("Worker-2", 0.5))
t3 = threading.Thread(target=worker, args=("Worker-3", 1.5), daemon=True) # 데몬 스레드
t1.start()
t2.start()
t3.start()
t1.join() # 완료 대기
t2.join()
# t3는 join하지 않음 — 메인 스레드 종료 시 함께 종료
print("메인 스레드 완료")
print(f"활성 스레드 수: {threading.active_count()}")
# Thread 서브클래싱
class DownloadThread(threading.Thread):
def __init__(self, url: str, name: str = ""):
super().__init__(name=name or url)
self.url = url
self.result: bytes | None = None
self.error: Exception | None = None
def run(self) -> None:
try:
import urllib.request
with urllib.request.urlopen(self.url, timeout=5) as resp:
self.result = resp.read()
except Exception as e:
self.error = e
@property
def success(self) -> bool:
return self.result is not None
Lock — 경쟁 조건 방지
import threading
# 경쟁 조건(Race Condition) 예시
class UnsafeCounter:
def __init__(self):
self.value = 0
def increment(self):
current = self.value
# ← 여기서 다른 스레드가 끼어들 수 있음
self.value = current + 1
class SafeCounter:
def __init__(self):
self.value = 0
self._lock = threading.Lock()
def increment(self):
with self._lock: # 임계 구역
self.value += 1
def get(self) -> int:
with self._lock:
return self.value
# 검증
def stress_test(counter, n: int = 10000):
threads = [threading.Thread(target=lambda: [counter.increment() for _ in range(100)])
for _ in range(n // 100)]
for t in threads: t.start()
for t in threads: t.join()
unsafe = UnsafeCounter()
safe = SafeCounter()
stress_test(unsafe, 10000)
stress_test(safe, 10000)
print(f"UnsafeCounter: {unsafe.value} (기대: 10000, 보통 더 작음)")
print(f"SafeCounter: {safe.get()} (기대: 10000, 항상 정확)")
# RLock: 같은 스레드에서 여러 번 획득 가능
class RecursiveResource:
def __init__(self):
self._lock = threading.RLock() # Reentrant Lock
def method_a(self):
with self._lock:
self.method_b() # 같은 스레드에서 재진입 가능
def method_b(self):
with self._lock:
print("method_b 실행")
Event와 Condition — 스레드 통신
import threading
import time
import random
# Event: 신호 기반 동기화
class DataPipeline:
def __init__(self):
self._data: list = []
self._ready = threading.Event()
def producer(self):
print("[생산자] 데이터 생성 중...")
time.sleep(1.0)
self._data = [random.randint(1, 100) for _ in range(10)]
self._ready.set() # 신호 발생
print("[생산자] 데이터 준비 완료")
def consumer(self):
print("[소비자] 데이터 대기 중...")
self._ready.wait() # 신호 대기 (블로킹)
print(f"[소비자] 수신: {self._data}")
pipeline = DataPipeline()
prod = threading.Thread(target=pipeline.producer)
cons = threading.Thread(target=pipeline.consumer)
cons.start()
prod.start()
prod.join()
cons.join()
# Condition: 조건 변수
class BoundedBuffer:
"""생산자-소비자 패턴 (크기 제한 버퍼)"""
def __init__(self, maxsize: int = 5):
self._buffer: list = []
self._maxsize = maxsize
self._cond = threading.Condition()
def put(self, item) -> None:
with self._cond:
while len(self._buffer) >= self._maxsize:
self._cond.wait() # 공간이 생길 때까지 대기
self._buffer.append(item)
self._cond.notify_all()
def get(self):
with self._cond:
while not self._buffer:
self._cond.wait() # 데이터가 생길 때까지 대기
item = self._buffer.pop(0)
self._cond.notify_all()
return item
Queue — 스레드 안전한 작업 큐
import threading
import queue
import time
import random
def producer(q: queue.Queue, n: int) -> None:
for i in range(n):
item = f"item-{i}"
q.put(item)
print(f"[생산] {item}")
time.sleep(random.uniform(0.05, 0.1))
q.put(None) # 종료 신호
def consumer(q: queue.Queue, worker_id: int) -> None:
while True:
item = q.get()
if item is None:
q.put(None) # 다음 소비자를 위해 재삽입
break
print(f"[소비-{worker_id}] {item}")
time.sleep(random.uniform(0.1, 0.2))
q.task_done()
# 실행
q: queue.Queue = queue.Queue(maxsize=5)
prod = threading.Thread(target=producer, args=(q, 10))
consumers = [threading.Thread(target=consumer, args=(q, i)) for i in range(3)]
prod.start()
for c in consumers: c.start()
prod.join()
for c in consumers: c.join()
print("파이프라인 완료")
# PriorityQueue: 우선순위 기반
pq: queue.PriorityQueue = queue.PriorityQueue()
pq.put((3, "저우선순위"))
pq.put((1, "고우선순위"))
pq.put((2, "중우선순위"))
while not pq.empty():
priority, item = pq.get()
print(f" 우선순위 {priority}: {item}")
ThreadLocal — 스레드별 독립 데이터
import threading
# threading.local(): 각 스레드가 독립된 값을 가짐
_local = threading.local()
def set_user(user_id: int) -> None:
_local.user_id = user_id
_local.request_id = f"req-{user_id}-{id(threading.current_thread())}"
def get_current_user() -> int | None:
return getattr(_local, "user_id", None)
def handle_request(user_id: int) -> None:
set_user(user_id)
# 같은 스레드 내 어디서든 안전하게 접근
print(f"스레드 {threading.current_thread().name}: user={get_current_user()}, "
f"req={_local.request_id}")
threads = [
threading.Thread(target=handle_request, args=(i,), name=f"Thread-{i}")
for i in range(5)
]
for t in threads: t.start()
for t in threads: t.join()
# 각 스레드가 독립적인 user_id, request_id를 가짐
실전 예제: 병렬 웹 요청
import threading
import time
from urllib.request import urlopen
from urllib.error import URLError
from dataclasses import dataclass, field
@dataclass
class FetchResult:
url: str
status: int = 0
size: int = 0
elapsed: float = 0.0
error: str = ""
def fetch_url(url: str, results: list, lock: threading.Lock) -> None:
start = time.perf_counter()
try:
with urlopen(url, timeout=5) as resp:
data = resp.read()
result = FetchResult(
url=url,
status=resp.status,
size=len(data),
elapsed=time.perf_counter() - start,
)
except URLError as e:
result = FetchResult(url=url, error=str(e), elapsed=time.perf_counter() - start)
with lock:
results.append(result)
urls = [
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1",
]
results: list[FetchResult] = []
lock = threading.Lock()
start = time.perf_counter()
threads = [threading.Thread(target=fetch_url, args=(url, results, lock)) for url in urls]
for t in threads: t.start()
for t in threads: t.join()
total = time.perf_counter() - start
print(f"총 소요시간: {total:.2f}s (순차면 ~3s)")
for r in results:
if r.error:
print(f" 오류: {r.url} — {r.error}")
else:
print(f" 성공: {r.status}, {r.size} bytes, {r.elapsed:.2f}s")
정리
| 도구 | 용도 |
|---|---|
Thread | 병렬 작업 실행 단위 |
Lock / RLock | 임계 구역 보호 |
Event | 스레드 간 신호 전달 |
Condition | 복잡한 대기/알림 패턴 |
Queue | 스레드 안전한 작업 큐 |
threading.local() | 스레드별 독립 데이터 |
I/O-bound 작업에서는 threading이 효과적입니다. CPU-bound는 multiprocessing을, 비동기 I/O에는 asyncio를 우선 고려하세요.