asyncio 완전 정복
asyncio는 단일 스레드에서 이벤트 루프를 통해 수천 개의 I/O 작업을 동시에 처리하는 파이썬의 비동기 프레임워크입니다. async/await 키워드로 직관적인 비동기 코드를 작성할 수 있습니다.
async/await 기초
import asyncio
import time
# 동기 함수
def sync_sleep(name: str, delay: float) -> None:
print(f"[동기] {name} 시작")
time.sleep(delay) # 블로킹
print(f"[동기] {name} 완료")
# 비동기 함수 (코루틴)
async def async_sleep(name: str, delay: float) -> str:
print(f"[비동기] {name} 시작")
await asyncio.sleep(delay) # 논블로킹 대기
print(f"[비동기] {name} 완료")
return f"{name} 결과"
# 비동기 실행
async def main():
# 순차 실행 (여전히 순서대로)
result1 = await async_sleep("태스크 A", 1.0)
result2 = await async_sleep("태스크 B", 0.5)
print(f"결과: {result1}, {result2}")
asyncio.run(main())
# 동시 실행 — gather
async def concurrent_main():
start = time.perf_counter()
results = await asyncio.gather(
async_sleep("A", 1.0),
async_sleep("B", 0.5),
async_sleep("C", 1.5),
)
elapsed = time.perf_counter() - start
print(f"동시 실행 결과: {results}")
print(f"소요 시간: {elapsed:.2f}s (순차면 3s)") # ~1.5s
asyncio.run(concurrent_main())
Task — 백그라운드 작업
import asyncio
import time
async def background_task(name: str, delay: float) -> str:
await asyncio.sleep(delay)
return f"{name} 완료"
async def main():
# create_task: 즉시 스케줄링, await 없이 실행 시작
task1 = asyncio.create_task(background_task("Task-1", 1.0))
task2 = asyncio.create_task(background_task("Task-2", 0.5))
print("태스크 생성 완료, 다른 작업 진행 중...")
await asyncio.sleep(0.1)
print("다른 작업 완료")
# 결과 수집
result1 = await task1
result2 = await task2
print(f"{result1}, {result2}")
asyncio.run(main())
# TaskGroup (Python 3.11+): 에러 처리 개선
async def task_group_example():
async with asyncio.TaskGroup() as tg:
task1 = tg.create_task(background_task("A", 1.0))
task2 = tg.create_task(background_task("B", 0.5))
task3 = tg.create_task(background_task("C", 1.5))
# 모든 태스크 완료 후 여기에 도달
print(task1.result(), task2.result(), task3.result())
asyncio.run(task_group_example())
타임아웃과 취소
import asyncio
async def slow_operation() -> str:
await asyncio.sleep(10)
return "완료"
async def main():
# wait_for: 타임아웃
try:
result = await asyncio.wait_for(slow_operation(), timeout=2.0)
except asyncio.TimeoutError:
print("타임아웃!")
# shield: 취소로부터 보호
task = asyncio.create_task(slow_operation())
protected = asyncio.shield(task)
try:
result = await asyncio.wait_for(protected, timeout=1.0)
except asyncio.TimeoutError:
print("shield 타임아웃 — 내부 태스크는 계속 실행")
task.cancel() # 명시적 취소
# 태스크 취소
async def cancellable():
try:
await asyncio.sleep(100)
except asyncio.CancelledError:
print("취소됨! 정리 중...")
raise # 반드시 재발생
t = asyncio.create_task(cancellable())
await asyncio.sleep(0.1)
t.cancel()
try:
await t
except asyncio.CancelledError:
print("태스크 취소 확인")
asyncio.run(main())
비동기 컨텍스트 매니저와 이터레이터
import asyncio
# 비동기 컨텍스트 매니저
class AsyncDatabaseConnection:
def __init__(self, url: str):
self.url = url
self.connected = False
async def __aenter__(self):
print(f"DB 연결: {self.url}")
await asyncio.sleep(0.1) # 연결 대기 시뮬레이션
self.connected = True
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
print("DB 연결 해제")
await asyncio.sleep(0.05)
self.connected = False
async def execute(self, query: str) -> list:
if not self.connected:
raise RuntimeError("연결되지 않음")
await asyncio.sleep(0.05)
return [{"id": 1, "data": query}]
async def main():
async with AsyncDatabaseConnection("postgresql://localhost/mydb") as db:
rows = await db.execute("SELECT * FROM users")
print(f"조회 결과: {rows}")
asyncio.run(main())
# 비동기 이터레이터
class AsyncRange:
def __init__(self, start: int, stop: int, delay: float = 0.0):
self.current = start
self.stop = stop
self.delay = delay
def __aiter__(self):
return self
async def __anext__(self) -> int:
if self.current >= self.stop:
raise StopAsyncIteration
await asyncio.sleep(self.delay)
value = self.current
self.current += 1
return value
async def iterate():
async for value in AsyncRange(0, 5, delay=0.1):
print(value, end=" ")
print()
# 비동기 제너레이터
async def async_gen(n: int):
for i in range(n):
await asyncio.sleep(0.01)
yield i * i
async for sq in async_gen(5):
print(sq, end=" ")
print()
asyncio.run(iterate())
Semaphore와 동시성 제어
import asyncio
import aiohttp # pip install aiohttp
# Semaphore: 동시 실행 수 제한
async def fetch(session, url: str, semaphore: asyncio.Semaphore) -> dict:
async with semaphore: # 동시에 최대 N개만 실행
async with session.get(url) as resp:
return {"url": url, "status": resp.status, "size": len(await resp.read())}
async def crawl(urls: list[str], max_concurrent: int = 10):
semaphore = asyncio.Semaphore(max_concurrent)
async with aiohttp.ClientSession() as session:
tasks = [fetch(session, url, semaphore) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
return results
# aiohttp 없이 asyncio만으로 구현 예시
async def limited_fetch(url: str, sem: asyncio.Semaphore) -> str:
async with sem:
await asyncio.sleep(0.1) # HTTP 요청 시뮬레이션
return f"fetched: {url}"
async def main():
sem = asyncio.Semaphore(3) # 동시 3개 제한
urls = [f"https://example.com/page/{i}" for i in range(10)]
results = await asyncio.gather(*[limited_fetch(url, sem) for url in urls])
print(f"완료: {len(results)}개")
asyncio.run(main())
실전 예제: 비동기 웹 스크래퍼
import asyncio
import time
from dataclasses import dataclass, field
from typing import Any
@dataclass
class Page:
url: str
status: int = 0
content_length: int = 0
elapsed: float = 0.0
error: str = ""
async def fetch_page(url: str, session_semaphore: asyncio.Semaphore) -> Page:
"""실제로는 aiohttp.ClientSession을 사용"""
start = time.perf_counter()
async with session_semaphore:
try:
# 네트워크 요청 시뮬레이션
import random
await asyncio.sleep(random.uniform(0.1, 0.5))
return Page(
url=url,
status=200,
content_length=random.randint(1000, 50000),
elapsed=time.perf_counter() - start,
)
except Exception as e:
return Page(url=url, error=str(e), elapsed=time.perf_counter() - start)
async def scrape(urls: list[str], max_concurrent: int = 5) -> list[Page]:
sem = asyncio.Semaphore(max_concurrent)
tasks = [asyncio.create_task(fetch_page(url, sem)) for url in urls]
return await asyncio.gather(*tasks)
async def main():
urls = [f"https://example.com/article/{i}" for i in range(20)]
start = time.perf_counter()
pages = await scrape(urls, max_concurrent=5)
elapsed = time.perf_counter() - start
success = [p for p in pages if p.status == 200]
print(f"완료: {len(success)}/{len(pages)}, 소요: {elapsed:.2f}s")
avg_size = sum(p.content_length for p in success) / len(success)
print(f"평균 크기: {avg_size:.0f} bytes")
asyncio.run(main())
정리
| 개념 | 설명 |
|---|---|
async def | 코루틴 함수 정의 |
await | 코루틴 실행 및 결과 대기 |
asyncio.gather() | 여러 코루틴 동시 실행 |
asyncio.create_task() | 백그라운드 태스크 예약 |
asyncio.TaskGroup | 구조화된 태스크 그룹 (3.11+) |
asyncio.wait_for() | 타임아웃 설정 |
asyncio.Semaphore | 동시 실행 수 제한 |
async with | 비동기 컨텍스트 매니저 |
async for | 비동기 이터레이터 |
asyncio는 I/O-bound + 고도의 동시성 시나리오(웹 서버, 크롤러, API 클라이언트)에서 가장 강력합니다. CPU-bound는 loop.run_in_executor()로 스레드풀/프로세스풀에 위임하세요.