multiprocessing — 프로세스 기반 병렬화
multiprocessing 모듈은 각 프로세스가 독립적인 메모리 공간과 GIL을 가지므로 CPU-bound 작업의 진정한 병렬 실행을 가능하게 합니다.
Process 기본
import multiprocessing
import os
import time
def worker(name: str, n: int) -> int:
"""CPU-bound 작업"""
pid = os.getpid()
print(f"[{name}] PID={pid} 시작")
result = sum(range(n))
print(f"[{name}] PID={pid} 완료")
return result
if __name__ == "__main__": # Windows에서 필수!
# 프로세스 생성
p1 = multiprocessing.Process(target=worker, args=("P1", 10_000_000))
p2 = multiprocessing.Process(target=worker, args=("P2", 10_000_000))
p1.start()
p2.start()
p1.join()
p2.join()
print(f"메인 PID: {os.getpid()}")
print(f"p1 종료 코드: {p1.exitcode}")
print(f"p2 종료 코드: {p2.exitcode}")
# 반환값을 가져오려면 Queue나 Pipe 사용
def compute(n: int, result_queue: multiprocessing.Queue) -> None:
result = sum(range(n))
result_queue.put(result)
if __name__ == "__main__":
q = multiprocessing.Queue()
p = multiprocessing.Process(target=compute, args=(50_000_000, q))
p.start()
p.join()
print(f"결과: {q.get()}")
Pool — 프로세스 풀
import multiprocessing
import time
def square(x: int) -> int:
return x * x
def is_prime(n: int) -> bool:
if n < 2:
return False
for i in range(2, int(n**0.5) + 1):
if n % i == 0:
return False
return True
if __name__ == "__main__":
numbers = list(range(1, 10001))
# map: 순서 보장, 결과 수집
with multiprocessing.Pool() as pool:
squares = pool.map(square, numbers)
print(f"첫 5개: {squares[:5]}")
# map_async: 비동기 버전
with multiprocessing.Pool(processes=4) as pool:
async_result = pool.map_async(is_prime, range(100_000, 100_100))
# 다른 작업 가능
primes = async_result.get(timeout=10)
print(f"소수 개수: {sum(primes)}")
# imap: 이터레이터 반환 (메모리 효율)
with multiprocessing.Pool() as pool:
for result in pool.imap(square, range(10), chunksize=3):
print(result, end=" ")
print()
# starmap: 여러 인수 전달
def power(base: int, exp: int) -> int:
return base ** exp
args = [(2, 10), (3, 5), (4, 3)]
with multiprocessing.Pool() as pool:
results = pool.starmap(power, args)
print(f"거듭제곱: {results}")
프로세스 간 통신 (IPC)
import multiprocessing
import time
# Pipe: 양방향 통신 채널
def pipe_worker(conn: multiprocessing.Connection) -> None:
msg = conn.recv()
print(f"[워커] 수신: {msg}")
conn.send(f"처리 완료: {msg.upper()}")
conn.close()
if __name__ == "__main__":
parent_conn, child_conn = multiprocessing.Pipe()
p = multiprocessing.Process(target=pipe_worker, args=(child_conn,))
p.start()
parent_conn.send("hello world")
response = parent_conn.recv()
print(f"[메인] 응답: {response}")
p.join()
# Queue: 멀티 프로듀서-컨슈머
def producer(q: multiprocessing.Queue, n: int) -> None:
for i in range(n):
q.put(f"item-{i}")
time.sleep(0.01)
q.put(None) # 종료 신호
def consumer(q: multiprocessing.Queue, worker_id: int) -> None:
while True:
item = q.get()
if item is None:
q.put(None)
break
print(f"[소비-{worker_id}] {item}")
if __name__ == "__main__":
q = multiprocessing.Queue()
prod = multiprocessing.Process(target=producer, args=(q, 10))
cons = [multiprocessing.Process(target=consumer, args=(q, i)) for i in range(2)]
prod.start()
for c in cons: c.start()
prod.join()
for c in cons: c.join()
공유 메모리 (Shared Memory)
import multiprocessing
import ctypes
def increment_array(shared_arr, start: int, end: int) -> None:
for i in range(start, end):
shared_arr[i] += 1
def increment_safe(shared_arr, lock, start: int, end: int) -> None:
for i in range(start, end):
with lock:
shared_arr[i] += 1
if __name__ == "__main__":
N = 100
# Value: 단일 공유 값
counter = multiprocessing.Value(ctypes.c_int, 0)
lock = multiprocessing.Lock()
def increment_counter(val, lk, times: int):
for _ in range(times):
with lk:
val.value += 1
procs = [
multiprocessing.Process(target=increment_counter, args=(counter, lock, 1000))
for _ in range(4)
]
for p in procs: p.start()
for p in procs: p.join()
print(f"카운터: {counter.value}") # 4000
# Array: 공유 배열
arr = multiprocessing.Array(ctypes.c_int, N)
for i in range(N):
arr[i] = i
p1 = multiprocessing.Process(target=increment_array, args=(arr, 0, N // 2))
p2 = multiprocessing.Process(target=increment_array, args=(arr, N // 2, N))
p1.start(); p2.start()
p1.join(); p2.join()
print(f"배열[0]: {arr[0]}, 배열[50]: {arr[50]}") # 각각 1 증가
# Python 3.8+: shared_memory 모듈 (제로카피 공유)
from multiprocessing import shared_memory
import numpy as np
if __name__ == "__main__":
shm = shared_memory.SharedMemory(create=True, size=100 * 4)
shared_arr = np.ndarray((100,), dtype=np.int32, buffer=shm.buf)
shared_arr[:] = range(100)
print(f"공유 메모리 합계: {shared_arr.sum()}")
shm.close()
shm.unlink() # 정리
실전 예제: 이미지 일괄 처리
import multiprocessing
import time
from pathlib import Path
from dataclasses import dataclass
@dataclass
class ProcessResult:
filename: str
success: bool
elapsed: float
message: str = ""
def process_image(filepath: str) -> ProcessResult:
"""이미지 처리 시뮬레이션 (실제는 PIL/OpenCV 사용)"""
start = time.perf_counter()
try:
# 실제 처리: resize, compress, watermark 등
time.sleep(0.1) # 처리 시뮬레이션
return ProcessResult(
filename=Path(filepath).name,
success=True,
elapsed=time.perf_counter() - start,
)
except Exception as e:
return ProcessResult(
filename=Path(filepath).name,
success=False,
elapsed=time.perf_counter() - start,
message=str(e),
)
if __name__ == "__main__":
# 100개 파일 시뮬레이션
filepaths = [f"/images/photo_{i:04d}.jpg" for i in range(20)]
# 순차 처리
start = time.perf_counter()
sequential = [process_image(fp) for fp in filepaths]
seq_time = time.perf_counter() - start
# 병렬 처리
start = time.perf_counter()
with multiprocessing.Pool() as pool:
parallel = pool.map(process_image, filepaths)
par_time = time.perf_counter() - start
success = sum(1 for r in parallel if r.success)
print(f"처리 완료: {success}/{len(filepaths)}")
print(f"순차: {seq_time:.2f}s, 병렬: {par_time:.2f}s")
print(f"속도 향상: {seq_time / par_time:.1f}x")
정리
| 도구 | 용도 |
|---|---|
Process | 개별 프로세스 생성/제어 |
Pool | 프로세스 풀로 작업 분배 |
Queue | 프로세스 간 메시지 전달 |
Pipe | 두 프로세스 간 채널 |
Value / Array | 공유 메모리 기본형 |
shared_memory | 고성능 제로카피 공유 |
Lock | 공유 자원 보호 |
Windows에서는 if __name__ == "__main__": 가드가 필수입니다. 자식 프로세스 생성 시 모듈을 다시 임포트하기 때문입니다.