concurrent.futures — 고수준 병렬 실행
concurrent.futures는 스레드풀과 프로세스풀을 통일된 인터페이스로 제공합니다. threading과 multiprocessing의 저수준 API를 추상화하여 간결하고 강력한 병렬 코드를 작성할 수 있습니다.
ThreadPoolExecutor — 스레드풀
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import urllib.request
def fetch_url(url: str) -> tuple[str, int]:
"""URL 다운로드 (I/O-bound)"""
try:
with urllib.request.urlopen(url, timeout=5) as resp:
return url, len(resp.read())
except Exception as e:
return url, -1
urls = [
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1",
"https://httpbin.org/get",
"https://httpbin.org/uuid",
]
# submit: 개별 작업 제출
with ThreadPoolExecutor(max_workers=5) as executor:
futures = {executor.submit(fetch_url, url): url for url in urls}
for future in as_completed(futures):
url = futures[future]
try:
_, size = future.result()
print(f"완료: {url} — {size} bytes")
except Exception as e:
print(f"오류: {url} — {e}")
# map: 순서 보장, 간단한 인터페이스
def square(n: int) -> int:
time.sleep(0.01)
return n * n
with ThreadPoolExecutor(max_workers=4) as executor:
results = list(executor.map(square, range(10)))
print(f"제곱: {results}")
ProcessPoolExecutor — 프로세스풀
from concurrent.futures import ProcessPoolExecutor, as_completed
import time
def cpu_intensive(n: int) -> int:
"""CPU-bound 작업"""
return sum(i * i for i in range(n))
def find_primes(limit: int) -> list[int]:
"""에라토스테네스의 체"""
sieve = [True] * (limit + 1)
sieve[0] = sieve[1] = False
for i in range(2, int(limit**0.5) + 1):
if sieve[i]:
for j in range(i*i, limit + 1, i):
sieve[j] = False
return [i for i in range(2, limit + 1) if sieve[i]]
if __name__ == "__main__":
# 여러 범위에서 소수 찾기 (병렬)
limits = [100_000, 200_000, 300_000, 400_000]
start = time.perf_counter()
with ProcessPoolExecutor() as executor:
futures = {executor.submit(find_primes, limit): limit for limit in limits}
for future in as_completed(futures):
limit = futures[future]
primes = future.result()
print(f"범위 {limit}: 소수 {len(primes)}개")
elapsed = time.perf_counter() - start
print(f"병렬 처리: {elapsed:.2f}s")
# chunksize: map 성능 최적화
numbers = list(range(1, 100_001))
with ProcessPoolExecutor() as executor:
results = list(executor.map(cpu_intensive, numbers, chunksize=1000))
print(f"완료: {len(results)}개")
Future 객체 심화
from concurrent.futures import ThreadPoolExecutor, Future, wait, FIRST_COMPLETED, ALL_COMPLETED
import time
import random
def task(task_id: int, delay: float) -> dict:
time.sleep(delay)
return {"id": task_id, "delay": delay, "result": task_id ** 2}
with ThreadPoolExecutor(max_workers=5) as executor:
futures = [
executor.submit(task, i, random.uniform(0.5, 2.0))
for i in range(8)
]
# wait: 특정 조건까지 대기
done, not_done = wait(futures, timeout=1.0, return_when=FIRST_COMPLETED)
print(f"1초 안에 완료된 작업: {len(done)}개")
print(f"아직 진행 중: {len(not_done)}개")
# 완료된 것부터 처리
for future in done:
print(f" 완료: {future.result()}")
# 나머지 완료 대기
wait(not_done, return_when=ALL_COMPLETED)
for future in not_done:
print(f" 최종 완료: {future.result()}")
# add_done_callback: 완료 시 콜백
def on_complete(future: Future) -> None:
if future.exception():
print(f"오류 발생: {future.exception()}")
else:
print(f"콜백 호출: {future.result()}")
with ThreadPoolExecutor() as executor:
f = executor.submit(task, 99, 0.1)
f.add_done_callback(on_complete)
asyncio와 함께 사용하기
import asyncio
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import time
def cpu_work(n: int) -> int:
"""CPU-bound — 프로세스풀에서 실행"""
return sum(range(n))
def blocking_io(path: str) -> str:
"""블로킹 I/O — 스레드풀에서 실행"""
time.sleep(0.1) # 파일 읽기 시뮬레이션
return f"read: {path}"
async def main():
loop = asyncio.get_event_loop()
# 프로세스풀: CPU-bound 작업을 비동기로 실행
with ProcessPoolExecutor() as proc_pool:
result = await loop.run_in_executor(proc_pool, cpu_work, 50_000_000)
print(f"CPU 작업 결과: {result}")
# 스레드풀: 블로킹 I/O를 비동기로 실행
with ThreadPoolExecutor(max_workers=10) as thread_pool:
tasks = [
loop.run_in_executor(thread_pool, blocking_io, f"/data/file_{i}.txt")
for i in range(5)
]
results = await asyncio.gather(*tasks)
for r in results:
print(r)
asyncio.run(main())
실전 패턴: 재시도 + 병렬 처리
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
import time
import random
@dataclass
class TaskResult:
task_id: int
success: bool
result: any = None
attempts: int = 1
error: str = ""
def unreliable_task(task_id: int) -> int:
"""40% 확률로 실패하는 작업"""
if random.random() < 0.4:
raise ConnectionError(f"Task {task_id} 연결 실패")
time.sleep(random.uniform(0.01, 0.1))
return task_id * 10
def task_with_retry(task_id: int, max_retries: int = 3) -> TaskResult:
for attempt in range(1, max_retries + 1):
try:
result = unreliable_task(task_id)
return TaskResult(task_id=task_id, success=True, result=result, attempts=attempt)
except ConnectionError as e:
if attempt == max_retries:
return TaskResult(task_id=task_id, success=False, error=str(e), attempts=attempt)
time.sleep(0.05 * attempt) # 지수 백오프
if __name__ == "__main__":
task_ids = list(range(20))
results = []
with ThreadPoolExecutor(max_workers=5) as executor:
futures = {executor.submit(task_with_retry, tid): tid for tid in task_ids}
for future in as_completed(futures):
results.append(future.result())
success = [r for r in results if r.success]
failed = [r for r in results if not r.success]
avg_attempts = sum(r.attempts for r in results) / len(results)
print(f"성공: {len(success)}/{len(task_ids)}")
print(f"실패: {len(failed)}")
print(f"평균 시도 횟수: {avg_attempts:.2f}")
비교 정리
| 도구 | 적합한 작업 | 특징 |
|---|---|---|
ThreadPoolExecutor | I/O-bound | 스레드 재사용, 컨텍스트 매니저 |
ProcessPoolExecutor | CPU-bound | 프로세스 재사용, GIL 우회 |
as_completed() | 완료 순 처리 | 가장 빠른 것부터 |
executor.map() | 순서 보장 처리 | 간단한 인터페이스 |
wait() | 조건부 대기 | FIRST_COMPLETED 등 |
concurrent.futures는 threading과 multiprocessing보다 코드가 간결하고, asyncio와 run_in_executor()로 쉽게 통합됩니다. 새 프로젝트에서는 concurrent.futures를 기본 선택으로 권장합니다.