본문으로 건너뛰기
Advertisement

multiprocessing — 프로세스 기반 병렬화

multiprocessing 모듈은 각 프로세스가 독립적인 메모리 공간과 GIL을 가지므로 CPU-bound 작업의 진정한 병렬 실행을 가능하게 합니다.


Process 기본

import multiprocessing
import os
import time


def worker(name: str, n: int) -> int:
"""CPU-bound 작업"""
pid = os.getpid()
print(f"[{name}] PID={pid} 시작")
result = sum(range(n))
print(f"[{name}] PID={pid} 완료")
return result


if __name__ == "__main__": # Windows에서 필수!
# 프로세스 생성
p1 = multiprocessing.Process(target=worker, args=("P1", 10_000_000))
p2 = multiprocessing.Process(target=worker, args=("P2", 10_000_000))

p1.start()
p2.start()

p1.join()
p2.join()

print(f"메인 PID: {os.getpid()}")
print(f"p1 종료 코드: {p1.exitcode}")
print(f"p2 종료 코드: {p2.exitcode}")


# 반환값을 가져오려면 Queue나 Pipe 사용
def compute(n: int, result_queue: multiprocessing.Queue) -> None:
result = sum(range(n))
result_queue.put(result)


if __name__ == "__main__":
q = multiprocessing.Queue()
p = multiprocessing.Process(target=compute, args=(50_000_000, q))
p.start()
p.join()
print(f"결과: {q.get()}")

Pool — 프로세스 풀

import multiprocessing
import time


def square(x: int) -> int:
return x * x


def is_prime(n: int) -> bool:
if n < 2:
return False
for i in range(2, int(n**0.5) + 1):
if n % i == 0:
return False
return True


if __name__ == "__main__":
numbers = list(range(1, 10001))

# map: 순서 보장, 결과 수집
with multiprocessing.Pool() as pool:
squares = pool.map(square, numbers)

print(f"첫 5개: {squares[:5]}")

# map_async: 비동기 버전
with multiprocessing.Pool(processes=4) as pool:
async_result = pool.map_async(is_prime, range(100_000, 100_100))
# 다른 작업 가능
primes = async_result.get(timeout=10)

print(f"소수 개수: {sum(primes)}")

# imap: 이터레이터 반환 (메모리 효율)
with multiprocessing.Pool() as pool:
for result in pool.imap(square, range(10), chunksize=3):
print(result, end=" ")
print()

# starmap: 여러 인수 전달
def power(base: int, exp: int) -> int:
return base ** exp

args = [(2, 10), (3, 5), (4, 3)]
with multiprocessing.Pool() as pool:
results = pool.starmap(power, args)
print(f"거듭제곱: {results}")

프로세스 간 통신 (IPC)

import multiprocessing
import time


# Pipe: 양방향 통신 채널
def pipe_worker(conn: multiprocessing.Connection) -> None:
msg = conn.recv()
print(f"[워커] 수신: {msg}")
conn.send(f"처리 완료: {msg.upper()}")
conn.close()


if __name__ == "__main__":
parent_conn, child_conn = multiprocessing.Pipe()
p = multiprocessing.Process(target=pipe_worker, args=(child_conn,))
p.start()

parent_conn.send("hello world")
response = parent_conn.recv()
print(f"[메인] 응답: {response}")
p.join()


# Queue: 멀티 프로듀서-컨슈머
def producer(q: multiprocessing.Queue, n: int) -> None:
for i in range(n):
q.put(f"item-{i}")
time.sleep(0.01)
q.put(None) # 종료 신호


def consumer(q: multiprocessing.Queue, worker_id: int) -> None:
while True:
item = q.get()
if item is None:
q.put(None)
break
print(f"[소비-{worker_id}] {item}")


if __name__ == "__main__":
q = multiprocessing.Queue()
prod = multiprocessing.Process(target=producer, args=(q, 10))
cons = [multiprocessing.Process(target=consumer, args=(q, i)) for i in range(2)]

prod.start()
for c in cons: c.start()
prod.join()
for c in cons: c.join()

공유 메모리 (Shared Memory)

import multiprocessing
import ctypes


def increment_array(shared_arr, start: int, end: int) -> None:
for i in range(start, end):
shared_arr[i] += 1


def increment_safe(shared_arr, lock, start: int, end: int) -> None:
for i in range(start, end):
with lock:
shared_arr[i] += 1


if __name__ == "__main__":
N = 100

# Value: 단일 공유 값
counter = multiprocessing.Value(ctypes.c_int, 0)
lock = multiprocessing.Lock()

def increment_counter(val, lk, times: int):
for _ in range(times):
with lk:
val.value += 1

procs = [
multiprocessing.Process(target=increment_counter, args=(counter, lock, 1000))
for _ in range(4)
]
for p in procs: p.start()
for p in procs: p.join()
print(f"카운터: {counter.value}") # 4000

# Array: 공유 배열
arr = multiprocessing.Array(ctypes.c_int, N)
for i in range(N):
arr[i] = i

p1 = multiprocessing.Process(target=increment_array, args=(arr, 0, N // 2))
p2 = multiprocessing.Process(target=increment_array, args=(arr, N // 2, N))
p1.start(); p2.start()
p1.join(); p2.join()

print(f"배열[0]: {arr[0]}, 배열[50]: {arr[50]}") # 각각 1 증가


# Python 3.8+: shared_memory 모듈 (제로카피 공유)
from multiprocessing import shared_memory
import numpy as np

if __name__ == "__main__":
shm = shared_memory.SharedMemory(create=True, size=100 * 4)
shared_arr = np.ndarray((100,), dtype=np.int32, buffer=shm.buf)
shared_arr[:] = range(100)
print(f"공유 메모리 합계: {shared_arr.sum()}")
shm.close()
shm.unlink() # 정리

실전 예제: 이미지 일괄 처리

import multiprocessing
import time
from pathlib import Path
from dataclasses import dataclass


@dataclass
class ProcessResult:
filename: str
success: bool
elapsed: float
message: str = ""


def process_image(filepath: str) -> ProcessResult:
"""이미지 처리 시뮬레이션 (실제는 PIL/OpenCV 사용)"""
start = time.perf_counter()
try:
# 실제 처리: resize, compress, watermark 등
time.sleep(0.1) # 처리 시뮬레이션
return ProcessResult(
filename=Path(filepath).name,
success=True,
elapsed=time.perf_counter() - start,
)
except Exception as e:
return ProcessResult(
filename=Path(filepath).name,
success=False,
elapsed=time.perf_counter() - start,
message=str(e),
)


if __name__ == "__main__":
# 100개 파일 시뮬레이션
filepaths = [f"/images/photo_{i:04d}.jpg" for i in range(20)]

# 순차 처리
start = time.perf_counter()
sequential = [process_image(fp) for fp in filepaths]
seq_time = time.perf_counter() - start

# 병렬 처리
start = time.perf_counter()
with multiprocessing.Pool() as pool:
parallel = pool.map(process_image, filepaths)
par_time = time.perf_counter() - start

success = sum(1 for r in parallel if r.success)
print(f"처리 완료: {success}/{len(filepaths)}")
print(f"순차: {seq_time:.2f}s, 병렬: {par_time:.2f}s")
print(f"속도 향상: {seq_time / par_time:.1f}x")

정리

도구용도
Process개별 프로세스 생성/제어
Pool프로세스 풀로 작업 분배
Queue프로세스 간 메시지 전달
Pipe두 프로세스 간 채널
Value / Array공유 메모리 기본형
shared_memory고성능 제로카피 공유
Lock공유 자원 보호

Windows에서는 if __name__ == "__main__": 가드가 필수입니다. 자식 프로세스 생성 시 모듈을 다시 임포트하기 때문입니다.

Advertisement