본문으로 건너뛰기
Advertisement

concurrent.futures — 고수준 병렬 실행

concurrent.futures는 스레드풀과 프로세스풀을 통일된 인터페이스로 제공합니다. threading과 multiprocessing의 저수준 API를 추상화하여 간결하고 강력한 병렬 코드를 작성할 수 있습니다.


ThreadPoolExecutor — 스레드풀

from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import urllib.request


def fetch_url(url: str) -> tuple[str, int]:
    """Download *url* and return ``(url, body size in bytes)``.

    I/O-bound work suited to a thread pool.  On any failure (timeout,
    HTTP error, malformed URL) it returns ``(url, -1)`` instead of
    raising, so callers can treat the -1 sentinel as "failed".
    """
    try:
        with urllib.request.urlopen(url, timeout=5) as resp:
            return url, len(resp.read())
    except Exception:
        # Best-effort demo: swallow the error and signal failure via -1.
        # (Original bound the exception to an unused name `e`.)
        return url, -1


urls = [
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/1",
    "https://httpbin.org/get",
    "https://httpbin.org/uuid",
]

# submit: schedule each task individually; map Future -> url for lookup.
with ThreadPoolExecutor(max_workers=5) as executor:
    futures = {executor.submit(fetch_url, url): url for url in urls}

    # as_completed yields futures in completion order, not submission order.
    for future in as_completed(futures):
        url = futures[future]
        try:
            _, size = future.result()
            # The scraped original printed "{url}{size}" with no separator;
            # restore a readable separator between the URL and the value.
            print(f"완료: {url} → {size} bytes")
        except Exception as e:
            print(f"오류: {url} → {e}")


# map: preserves input order, simplest interface
def square(n: int) -> int:
    """Simulate a short task and return the square of *n*."""
    time.sleep(0.01)
    return n ** 2


with ThreadPoolExecutor(max_workers=4) as executor:
    # Unlike as_completed(), map() returns results in input order.
    squares = list(executor.map(square, range(10)))
    print(f"제곱: {squares}")

ProcessPoolExecutor — 프로세스풀

from concurrent.futures import ProcessPoolExecutor, as_completed
import time


def cpu_intensive(n: int) -> int:
    """Pure CPU work: sum of squares 0² + 1² + ... + (n-1)²."""
    squares = (value * value for value in range(n))
    return sum(squares)


def find_primes(limit: int) -> list[int]:
    """Return all primes <= *limit* via the sieve of Eratosthenes.

    Handles ``limit < 2`` (including 0 and negatives) by returning an
    empty list; the original indexed ``sieve[1]`` unconditionally and
    raised IndexError for ``limit < 1``.
    """
    if limit < 2:
        return []
    sieve = [True] * (limit + 1)
    sieve[0] = sieve[1] = False
    for i in range(2, int(limit**0.5) + 1):
        if sieve[i]:
            # Start at i*i: smaller multiples were already crossed off
            # by smaller primes.
            for j in range(i * i, limit + 1, i):
                sieve[j] = False
    return [i for i in range(2, limit + 1) if sieve[i]]


if __name__ == "__main__":
    # Find primes over several ranges in parallel, one task per range.
    limits = [100_000, 200_000, 300_000, 400_000]

    start = time.perf_counter()
    with ProcessPoolExecutor() as pool:
        fut_to_limit = {pool.submit(find_primes, n): n for n in limits}
        for fut in as_completed(fut_to_limit):
            limit = fut_to_limit[fut]
            primes = fut.result()
            print(f"범위 {limit}: 소수 {len(primes)}개")
    elapsed = time.perf_counter() - start
    print(f"병렬 처리: {elapsed:.2f}s")

    # chunksize: ship items to workers in batches to cut per-item IPC overhead.
    numbers = list(range(1, 100_001))
    with ProcessPoolExecutor() as pool:
        results = list(pool.map(cpu_intensive, numbers, chunksize=1000))
        print(f"완료: {len(results)}개")

Future 객체 심화

from concurrent.futures import ThreadPoolExecutor, Future, wait, FIRST_COMPLETED, ALL_COMPLETED
import time
import random


def task(task_id: int, delay: float) -> dict:
    """Simulate work: sleep *delay* seconds, then return an info dict."""
    time.sleep(delay)
    payload = {"id": task_id, "delay": delay, "result": task_id * task_id}
    return payload


with ThreadPoolExecutor(max_workers=5) as executor:
    submitted = [
        executor.submit(task, i, random.uniform(0.5, 2.0)) for i in range(8)
    ]

    # wait() blocks until the condition or the timeout, whichever comes first.
    done, not_done = wait(submitted, timeout=1.0, return_when=FIRST_COMPLETED)

    print(f"1초 안에 완료된 작업: {len(done)}개")
    print(f"아직 진행 중: {len(not_done)}개")

    # Drain whatever has already finished.
    for fut in done:
        print(f" 완료: {fut.result()}")

    # Then block until everything else is done too.
    wait(not_done, return_when=ALL_COMPLETED)
    for fut in not_done:
        print(f" 최종 완료: {fut.result()}")


# add_done_callback: invoked with the future as soon as it finishes.
def on_complete(future: Future) -> None:
    """Done-callback: report either the task's exception or its result."""
    exc = future.exception()
    if exc:
        print(f"오류 발생: {exc}")
    else:
        print(f"콜백 호출: {future.result()}")


with ThreadPoolExecutor() as executor:
    future = executor.submit(task, 99, 0.1)
    # The callback runs on a worker thread once the task completes.
    future.add_done_callback(on_complete)

asyncio와 함께 사용하기

import asyncio
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import time


def cpu_work(n: int) -> int:
    """CPU-bound summation 0 + 1 + ... + (n-1); meant for a process pool."""
    return sum(range(0, n))


def blocking_io(path: str) -> str:
    """Blocking-I/O stand-in — meant to run in a thread pool."""
    time.sleep(0.1)  # simulate a slow file read
    return f"read: {path}"


async def main():
    """Drive CPU-bound and blocking-I/O work from asyncio via executors."""
    # get_running_loop() is the supported call inside a coroutine;
    # get_event_loop() here has been deprecated since Python 3.10.
    loop = asyncio.get_running_loop()

    # Process pool: true parallelism for CPU-bound work.
    with ProcessPoolExecutor() as proc_pool:
        result = await loop.run_in_executor(proc_pool, cpu_work, 50_000_000)
        print(f"CPU 작업 결과: {result}")

    # Thread pool: overlap blocking I/O without stalling the event loop.
    with ThreadPoolExecutor(max_workers=10) as thread_pool:
        tasks = [
            loop.run_in_executor(thread_pool, blocking_io, f"/data/file_{i}.txt")
            for i in range(5)
        ]
        results = await asyncio.gather(*tasks)
        for r in results:
            print(r)


asyncio.run(main())

실전 패턴: 재시도 + 병렬 처리

import random
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from typing import Any


@dataclass
class TaskResult:
    """Outcome of one retried task, successful or not."""

    task_id: int        # id passed through to the task
    success: bool       # True if some attempt succeeded
    result: Any = None  # task return value; original's `any` is the builtin, not a type
    attempts: int = 1   # number of attempts actually made
    error: str = ""     # last error message when success is False


def unreliable_task(task_id: int) -> int:
    """Flaky worker: raises ConnectionError roughly 40% of the time."""
    roll = random.random()
    if roll < 0.4:
        raise ConnectionError(f"Task {task_id} 연결 실패")
    # Succeed after a small, variable delay.
    time.sleep(random.uniform(0.01, 0.1))
    return 10 * task_id


def task_with_retry(task_id: int, max_retries: int = 3) -> TaskResult:
    """Run unreliable_task with up to *max_retries* attempts.

    Always returns a TaskResult: success=True with the value on the
    first attempt that works, or success=False carrying the last error
    message after the final attempt fails.
    """
    for attempt in range(1, max_retries + 1):
        try:
            result = unreliable_task(task_id)
            return TaskResult(task_id=task_id, success=True, result=result, attempts=attempt)
        except ConnectionError as e:
            if attempt == max_retries:
                return TaskResult(task_id=task_id, success=False, error=str(e), attempts=attempt)
            # Linear backoff (0.05s, 0.10s, ...). The original comment
            # claimed "exponential", which 0.05 * attempt is not.
            time.sleep(0.05 * attempt)
    # Only reachable when max_retries < 1; the original fell off the end
    # and returned None despite the TaskResult annotation.
    return TaskResult(task_id=task_id, success=False, error="no attempts made", attempts=0)


if __name__ == "__main__":
    task_ids = list(range(20))
    outcomes = []

    # Fan the retrying tasks out over 5 threads, collecting in completion order.
    with ThreadPoolExecutor(max_workers=5) as pool:
        fut_to_id = {pool.submit(task_with_retry, tid): tid for tid in task_ids}
        for fut in as_completed(fut_to_id):
            outcomes.append(fut.result())

    succeeded = [o for o in outcomes if o.success]
    failed = [o for o in outcomes if not o.success]
    mean_attempts = sum(o.attempts for o in outcomes) / len(outcomes)

    print(f"성공: {len(succeeded)}/{len(task_ids)}")
    print(f"실패: {len(failed)}")
    print(f"평균 시도 횟수: {mean_attempts:.2f}")

비교 정리

| 도구 | 적합한 작업 | 특징 |
| --- | --- | --- |
| ThreadPoolExecutor | I/O-bound | 스레드 재사용, 컨텍스트 매니저 |
| ProcessPoolExecutor | CPU-bound | 프로세스 재사용, GIL 우회 |
| as_completed() | 완료 순 처리 | 가장 빠른 것부터 |
| executor.map() | 순서 보장 처리 | 간단한 인터페이스 |
| wait() | 조건부 대기 | FIRST_COMPLETED 등 |

concurrent.futures는 threading과 multiprocessing보다 코드가 간결하고, asyncio의 run_in_executor()로 쉽게 통합됩니다. 새 프로젝트에서는 concurrent.futures를 기본 선택으로 권장합니다.

Advertisement