Skip to main content
Advertisement

Mastering asyncio

asyncio is Python's standard library for asynchronous programming: it handles thousands of I/O operations concurrently on a single thread via an event loop. The async/await keywords make asynchronous code read almost like ordinary sequential code.


async/await Basics

import asyncio
import time


# Synchronous function
def sync_sleep(name: str, delay: float) -> None:
    """Blocking baseline: pause with time.sleep(), printing start/done markers.

    While this sleeps, the whole thread is stuck — nothing else can run.
    """
    print(f"[Sync] {name} start")
    time.sleep(delay)  # Blocking
    print(f"[Sync] {name} done")


# Asynchronous function (coroutine)
async def async_sleep(name: str, delay: float) -> str:
    """Coroutine that pauses without blocking and returns a result string."""
    print(f"[Async] {name} start")
    # Suspends only this coroutine; the event loop keeps running others.
    await asyncio.sleep(delay)
    print(f"[Async] {name} done")
    return f"{name} result"


# Run asynchronously
async def main():
    """Await two coroutines one after the other — sequential, not concurrent."""
    # Each await completes before the next starts, so total time is the sum.
    first = await async_sleep("Task A", 1.0)
    second = await async_sleep("Task B", 0.5)
    print(f"Results: {first}, {second}")


asyncio.run(main())


# Concurrent execution — gather
async def concurrent_main():
    """Run three coroutines concurrently with gather() and time the batch."""
    t0 = time.perf_counter()

    # gather() schedules every coroutine at once, so the total time is
    # roughly the slowest single task rather than the sum of all three.
    outcome = await asyncio.gather(
        async_sleep("A", 1.0),
        async_sleep("B", 0.5),
        async_sleep("C", 1.5),
    )
    took = time.perf_counter() - t0
    print(f"Concurrent results: {outcome}")
    print(f"Elapsed: {took:.2f}s (sequential would be 3s)")  # ~1.5s


asyncio.run(concurrent_main())

Task — Background Execution

import asyncio
import time


async def background_task(name: str, delay: float) -> str:
    """Sleep for *delay* seconds, then report completion for *name*."""
    await asyncio.sleep(delay)
    message = f"{name} done"
    return message


async def main():
    """Show create_task(): tasks start running as soon as the loop is free."""
    # Scheduling happens at creation — no await is needed to start them.
    first = asyncio.create_task(background_task("Task-1", 1.0))
    second = asyncio.create_task(background_task("Task-2", 0.5))

    print("Tasks created, doing other work...")
    await asyncio.sleep(0.1)  # both tasks make progress during this pause
    print("Other work complete")

    # Awaiting a task yields its return value once it finishes.
    outcome_one = await first
    outcome_two = await second
    print(f"{outcome_one}, {outcome_two}")


asyncio.run(main())


# TaskGroup (Python 3.11+): improved error handling
async def task_group_example():
    """Structured concurrency with asyncio.TaskGroup (Python 3.11+).

    If any task raises, the group cancels the others and re-raises as an
    ExceptionGroup — no manually orphaned tasks.
    """
    async with asyncio.TaskGroup() as tg:
        first = tg.create_task(background_task("A", 1.0))
        second = tg.create_task(background_task("B", 0.5))
        third = tg.create_task(background_task("C", 1.5))

    # Leaving the `async with` block guarantees every task has finished.
    print(first.result(), second.result(), third.result())


asyncio.run(task_group_example())

Timeouts and Cancellation

import asyncio


async def slow_operation() -> str:
    """Simulate a long-running job (10 s) used below to trigger timeouts."""
    # Far longer than any timeout applied to it, so wait_for always expires.
    await asyncio.sleep(10)
    return "done"


async def main():
    """Demonstrate timeouts (wait_for), shielding, and task cancellation."""
    # wait_for: apply a timeout; on expiry the inner coroutine is cancelled
    # and asyncio.TimeoutError is raised here.
    try:
        result = await asyncio.wait_for(slow_operation(), timeout=2.0)
    except asyncio.TimeoutError:
        print("Timed out!")

    # shield: the timeout cancels only the shield wrapper, not the task.
    task = asyncio.create_task(slow_operation())
    protected = asyncio.shield(task)

    try:
        result = await asyncio.wait_for(protected, timeout=1.0)
    except asyncio.TimeoutError:
        print("Shield timed out — inner task keeps running")
        task.cancel()  # Explicit cancellation
        # Fix: await the cancelled task so it is reaped deterministically
        # here, instead of leaving it pending until loop shutdown.
        try:
            await task
        except asyncio.CancelledError:
            pass

    # Task cancellation: CancelledError is raised inside the coroutine.
    async def cancellable():
        try:
            await asyncio.sleep(100)
        except asyncio.CancelledError:
            print("Cancelled! Cleaning up...")
            raise  # Must re-raise so the task ends in the cancelled state

    t = asyncio.create_task(cancellable())
    await asyncio.sleep(0.1)
    t.cancel()
    try:
        await t
    except asyncio.CancelledError:
        print("Task cancellation confirmed")


asyncio.run(main())

Async Context Managers and Iterators

import asyncio


# Async context manager
class AsyncDatabaseConnection:
    """Toy async DB connection demonstrating the `async with` protocol.

    __aenter__/__aexit__ make instances usable as async context managers;
    the sleeps stand in for real network latency.
    """

    def __init__(self, url: str):
        self.url = url          # connection string (never actually parsed)
        self.connected = False  # toggled by __aenter__/__aexit__

    async def __aenter__(self):
        print(f"DB connect: {self.url}")
        await asyncio.sleep(0.1)  # Simulate connection delay
        self.connected = True
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("DB disconnect")
        await asyncio.sleep(0.05)
        self.connected = False

    async def execute(self, query: str) -> list:
        # Guard clause: refuse queries issued outside the `async with` block.
        if not self.connected:
            raise RuntimeError("Not connected")
        await asyncio.sleep(0.05)
        rows = [{"id": 1, "data": query}]
        return rows


async def main():
    """Open the fake DB via `async with`, run one query, auto-disconnect."""
    # __aexit__ runs even if execute() raises — no leaked connection.
    async with AsyncDatabaseConnection("postgresql://localhost/mydb") as db:
        rows = await db.execute("SELECT * FROM users")
        print(f"Query result: {rows}")


asyncio.run(main())


# Async iterator
class AsyncRange:
    """Async counterpart of range(): yields ints with an optional delay."""

    def __init__(self, start: int, stop: int, delay: float = 0.0):
        self.current = start  # next value to produce
        self.stop = stop      # exclusive upper bound
        self.delay = delay    # pause before each value

    def __aiter__(self):
        # The instance is its own iterator.
        return self

    async def __anext__(self) -> int:
        # Exhausted: signal the end of the `async for` loop.
        if self.current >= self.stop:
            raise StopAsyncIteration
        await asyncio.sleep(self.delay)
        result, self.current = self.current, self.current + 1
        return result


async def iterate():
    """Consume an async iterator and an async generator, printing values."""
    async for item in AsyncRange(0, 5, delay=0.1):
        print(item, end=" ")
    print()

    # Async generator: a `yield` inside `async def` — much less boilerplate
    # than a hand-written __anext__.
    async def async_gen(n: int):
        for i in range(n):
            await asyncio.sleep(0.01)
            yield i * i

    async for square in async_gen(5):
        print(square, end=" ")
    print()


asyncio.run(iterate())

Semaphore and Concurrency Control

import asyncio


# Semaphore: limit concurrent executions
async def fetch(url: str, semaphore: asyncio.Semaphore) -> dict:
    """Fetch a URL (simulated), holding one semaphore slot while in flight."""
    # The semaphore caps how many fetches run concurrently.
    async with semaphore:
        await asyncio.sleep(0.1)  # Simulate HTTP request
        return {"url": url, "status": 200}


async def crawl(urls: list[str], max_concurrent: int = 10):
    """Fetch every URL with at most *max_concurrent* requests in flight."""
    gate = asyncio.Semaphore(max_concurrent)
    pending = [fetch(u, gate) for u in urls]
    return await asyncio.gather(*pending)


async def main():
    """Fetch ten URLs while allowing only three concurrent requests."""
    sem = asyncio.Semaphore(3)  # Max 3 concurrent
    urls = [f"https://example.com/page/{i}" for i in range(10)]
    results = await asyncio.gather(*(fetch(u, sem) for u in urls))
    print(f"Done: {len(results)} pages")


asyncio.run(main())

Real-World Example: Async Web Scraper

import asyncio
import time
from dataclasses import dataclass


@dataclass
class Page:
    """Result record for one scraped URL."""
    url: str
    status: int = 0           # HTTP status; 0 means the request never completed
    content_length: int = 0   # response size in bytes (simulated)
    elapsed: float = 0.0      # wall-clock seconds, including semaphore wait
    error: str = ""           # exception text when the fetch failed


async def fetch_page(url: str, session_semaphore: asyncio.Semaphore) -> Page:
    """Simulate fetching one page; in production, use aiohttp.ClientSession.

    The timer starts before the semaphore is acquired, so `elapsed`
    includes any time spent queued behind other requests.
    """
    import random  # local import, kept function-scoped as in the tutorial

    start = time.perf_counter()
    async with session_semaphore:
        try:
            # Random latency stands in for the real HTTP round trip.
            await asyncio.sleep(random.uniform(0.1, 0.5))
            return Page(
                url=url,
                status=200,
                content_length=random.randint(1000, 50000),
                elapsed=time.perf_counter() - start,
            )
        except Exception as e:
            return Page(url=url, error=str(e), elapsed=time.perf_counter() - start)


async def scrape(urls: list[str], max_concurrent: int = 5) -> list[Page]:
    """Fetch all *urls* concurrently, capped at *max_concurrent* in flight."""
    gate = asyncio.Semaphore(max_concurrent)
    jobs = [asyncio.create_task(fetch_page(u, gate)) for u in urls]
    return await asyncio.gather(*jobs)


async def main():
    """Scrape 20 fake article URLs and report success rate and timing."""
    urls = [f"https://example.com/article/{i}" for i in range(20)]

    start = time.perf_counter()
    pages = await scrape(urls, max_concurrent=5)
    elapsed = time.perf_counter() - start

    success = [p for p in pages if p.status == 200]
    print(f"Done: {len(success)}/{len(pages)}, elapsed: {elapsed:.2f}s")
    # Fix: guard the average — dividing by len(success) raised
    # ZeroDivisionError when every fetch failed.
    if success:
        avg_size = sum(p.content_length for p in success) / len(success)
        print(f"Average size: {avg_size:.0f} bytes")


asyncio.run(main())

Summary

| Concept | Description |
| --- | --- |
| `async def` | Define a coroutine function |
| `await` | Run a coroutine and wait for its result |
| `asyncio.gather()` | Run multiple coroutines concurrently |
| `asyncio.create_task()` | Schedule a background task |
| `asyncio.TaskGroup` | Structured task group (3.11+) |
| `asyncio.wait_for()` | Apply a timeout |
| `asyncio.Semaphore` | Limit concurrent executions |
| `async with` | Async context manager |
| `async for` | Async iterator |

asyncio is most powerful for I/O-bound + high-concurrency scenarios (web servers, crawlers, API clients). For CPU-bound work, delegate to a thread or process pool with loop.run_in_executor().

Advertisement