Skip to main content
Advertisement

multiprocessing — Process-Based Parallelism

The multiprocessing module gives each process its own independent memory space and GIL, enabling true parallel execution of CPU-bound tasks.


Process Basics

import multiprocessing
import os
import time


def worker(name: str, n: int) -> int:
    """Simulate a CPU-bound task: sum the integers 0..n-1.

    Prints start/done markers tagged with this process's PID so the
    separate process identities are visible in the interleaved output.
    """
    pid = os.getpid()
    print(f"[{name}] PID={pid} start")
    total = sum(range(n))
    print(f"[{name}] PID={pid} done")
    return total


if __name__ == "__main__":  # Required on Windows!
    # Two independent OS processes, each running `worker` with its own GIL.
    p1 = multiprocessing.Process(target=worker, args=("P1", 10_000_000))
    p2 = multiprocessing.Process(target=worker, args=("P2", 10_000_000))

    for proc in (p1, p2):
        proc.start()
    for proc in (p1, p2):
        proc.join()  # block until the child exits

    print(f"Main PID: {os.getpid()}")
    print(f"p1 exit code: {p1.exitcode}")
    print(f"p2 exit code: {p2.exitcode}")


# Use Queue or Pipe to retrieve return values
def compute(n: int, result_queue: multiprocessing.Queue) -> None:
    """Sum 0..n-1 and push the total onto *result_queue*.

    A worker process cannot hand a return value back directly, so the
    result travels through the shared queue instead.
    """
    result_queue.put(sum(range(n)))


if __name__ == "__main__":
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=compute, args=(50_000_000, q))
    p.start()
    # Drain the queue BEFORE join(): a child that has put data on a queue
    # flushes it through a background feeder thread and may not terminate
    # until the data is consumed, so joining first can deadlock (see
    # "Joining processes that use queues" in the multiprocessing docs).
    print(f"Result: {q.get()}")
    p.join()

Pool — Process Pool

import multiprocessing
import time


def square(x: int) -> int:
    """Return x squared."""
    return x ** 2


def is_prime(n: int) -> bool:
    """Trial-division primality test: True iff n is a prime number."""
    if n < 2:
        return False
    # No divisor up to sqrt(n) means n is prime.
    return all(n % d for d in range(2, int(n**0.5) + 1))


def power(base: int, exp: int) -> int:
    """Return base raised to exp.

    Defined at module level (not inside the __main__ guard): under the
    spawn start method (the default on Windows and macOS) pool workers
    re-import this module, and a function nested in the guard would not
    exist in the child, so starmap would fail to unpickle it.
    """
    return base ** exp


if __name__ == "__main__":
    numbers = list(range(1, 10001))

    # map: preserves order, collects results
    with multiprocessing.Pool() as pool:
        squares = pool.map(square, numbers)
    print(f"First 5: {squares[:5]}")

    # map_async: asynchronous version
    with multiprocessing.Pool(processes=4) as pool:
        async_result = pool.map_async(is_prime, range(100_000, 100_100))
        # Can do other work here
        primes = async_result.get(timeout=10)
    print(f"Prime count: {sum(primes)}")

    # imap: returns an iterator (memory efficient)
    with multiprocessing.Pool() as pool:
        for result in pool.imap(square, range(10), chunksize=3):
            print(result, end=" ")
        print()

    # starmap: pass multiple arguments (power is module-level, see above)
    args = [(2, 10), (3, 5), (4, 3)]
    with multiprocessing.Pool() as pool:
        results = pool.starmap(power, args)
    print(f"Powers: {results}")

Inter-Process Communication (IPC)

import multiprocessing
import time


# Pipe: bidirectional communication channel
def pipe_worker(conn: "multiprocessing.connection.Connection") -> None:
    """Receive one message, send back its uppercased form, close the pipe.

    The annotation is a string on purpose: `multiprocessing.Connection`
    does not exist (the class lives in `multiprocessing.connection`),
    so the original eager annotation raised AttributeError at definition
    time. Quoting it avoids evaluating the attribute at import.
    """
    msg = conn.recv()
    print(f"[Worker] Received: {msg}")
    conn.send(f"Processed: {msg.upper()}")
    conn.close()


if __name__ == "__main__":
    # Parent keeps one end of the pipe; the child gets the other.
    parent_conn, child_conn = multiprocessing.Pipe()
    worker_proc = multiprocessing.Process(target=pipe_worker, args=(child_conn,))
    worker_proc.start()

    # Round trip: send a request, then block until the child replies.
    parent_conn.send("hello world")
    response = parent_conn.recv()
    print(f"[Main] Response: {response}")
    worker_proc.join()


# Queue: multi-producer/consumer
def producer(q: multiprocessing.Queue, n: int) -> None:
    """Feed n work items into q, then a None sentinel for shutdown."""
    for index in range(n):
        q.put(f"item-{index}")
        time.sleep(0.01)  # simulate production latency
    # A single None tells the consumers there is no more work.
    q.put(None)  # Shutdown signal


def consumer(q: multiprocessing.Queue, worker_id: int) -> None:
    """Drain items from q until the None shutdown sentinel arrives.

    The sentinel is pushed back onto the queue before returning so that
    sibling consumers also see it and shut down.
    """
    while (item := q.get()) is not None:
        print(f"[Consumer-{worker_id}] {item}")
    q.put(None)


if __name__ == "__main__":
    q = multiprocessing.Queue()
    prod = multiprocessing.Process(target=producer, args=(q, 10))
    consumers = [
        multiprocessing.Process(target=consumer, args=(q, i))
        for i in range(2)
    ]

    prod.start()
    for c in consumers:
        c.start()
    prod.join()
    for c in consumers:
        c.join()

Shared Memory

import multiprocessing
import ctypes


def increment_array(shared_arr, start: int, end: int) -> None:
    """Add 1 to each slot of shared_arr in [start, end) — no locking."""
    for idx in range(start, end):
        shared_arr[idx] += 1


def increment_safe(shared_arr, lock, start: int, end: int) -> None:
    """Add 1 to each slot of shared_arr in [start, end), taking lock per write."""
    idx = start
    while idx < end:
        # Hold the lock only for the single read-modify-write.
        with lock:
            shared_arr[idx] += 1
        idx += 1


def increment_counter(val, lk, times: int) -> None:
    """Add 1 to the shared Value `times` times, holding the lock per update.

    Defined at module level rather than inside the __main__ guard: under
    the spawn start method (Windows/macOS default) child processes
    re-import this module and look the target function up by name — a
    function nested in the guard would not exist in the child.
    """
    for _ in range(times):
        with lk:
            val.value += 1


if __name__ == "__main__":
    N = 100

    # Value: single shared value
    counter = multiprocessing.Value(ctypes.c_int, 0)
    lock = multiprocessing.Lock()

    procs = [
        multiprocessing.Process(target=increment_counter, args=(counter, lock, 1000))
        for _ in range(4)
    ]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print(f"Counter: {counter.value}")  # 4000

    # Array: shared array
    arr = multiprocessing.Array(ctypes.c_int, N)
    for i in range(N):
        arr[i] = i

    # Disjoint halves, so the unlocked increments cannot race each other.
    p1 = multiprocessing.Process(target=increment_array, args=(arr, 0, N // 2))
    p2 = multiprocessing.Process(target=increment_array, args=(arr, N // 2, N))
    p1.start(); p2.start()
    p1.join(); p2.join()

    print(f"arr[0]: {arr[0]}, arr[50]: {arr[50]}")  # Each incremented by 1


# Python 3.8+: shared_memory module (zero-copy sharing)
from multiprocessing import shared_memory
import numpy as np

if __name__ == "__main__":
    # 100 int32 values -> 400 bytes of raw shared memory.
    shm = shared_memory.SharedMemory(create=True, size=100 * 4)
    # View the raw buffer as a numpy array: no copy is made.
    shared_arr = np.ndarray((100,), dtype=np.int32, buffer=shm.buf)
    shared_arr[:] = range(100)
    print(f"Shared memory sum: {shared_arr.sum()}")
    # close() detaches this process; unlink() frees the OS segment.
    shm.close()
    shm.unlink()  # Cleanup

Real-World Example: Batch Image Processing

import multiprocessing
import time
from pathlib import Path
from dataclasses import dataclass


@dataclass
class ProcessResult:
    """Outcome record for one processed image file."""

    filename: str      # basename of the processed file
    success: bool      # True when processing completed without error
    elapsed: float     # wall-clock seconds spent on this file
    message: str = ""  # error detail; empty on success


def process_image(filepath: str) -> ProcessResult:
    """Image processing simulation (use PIL/OpenCV in production).

    Never raises: failures are captured in the returned ProcessResult
    so a pool of workers keeps running even when one file fails.
    """
    start = time.perf_counter()
    name = Path(filepath).name
    try:
        # Actual processing: resize, compress, watermark, etc.
        time.sleep(0.1)  # Simulate processing
    except Exception as e:
        return ProcessResult(
            filename=name,
            success=False,
            elapsed=time.perf_counter() - start,
            message=str(e),
        )
    return ProcessResult(
        filename=name,
        success=True,
        elapsed=time.perf_counter() - start,
    )


if __name__ == "__main__":
    filepaths = [f"/images/photo_{i:04d}.jpg" for i in range(20)]

    # Baseline: one file at a time in this process.
    t0 = time.perf_counter()
    sequential = [process_image(fp) for fp in filepaths]
    seq_time = time.perf_counter() - t0

    # Same work fanned out across a pool of worker processes.
    t0 = time.perf_counter()
    with multiprocessing.Pool() as pool:
        parallel = pool.map(process_image, filepaths)
    par_time = time.perf_counter() - t0

    success = sum(r.success for r in parallel)
    print(f"Processed: {success}/{len(filepaths)}")
    print(f"Sequential: {seq_time:.2f}s, Parallel: {par_time:.2f}s")
    print(f"Speedup: {seq_time / par_time:.1f}x")

Summary

Tool            | Purpose
--------------- | -----------------------------------------
Process         | Create and control individual processes
Pool            | Distribute work across a process pool
Queue           | Message passing between processes
Pipe            | Channel between two processes
Value / Array   | Basic shared memory types
shared_memory   | High-performance zero-copy sharing
Lock            | Protect shared resources

On Windows, the if __name__ == "__main__": guard is mandatory because child processes re-import the module on creation.

Advertisement