Async DB Integration — SQLAlchemy 2.0 async
Combining SQLAlchemy 2.0's async engine with FastAPI's Depends enables non-blocking database operations.
Installation
pip install sqlalchemy[asyncio] asyncpg # PostgreSQL
pip install aiosqlite # SQLite (development/testing)
Models and Engine Setup
# database.py
from sqlalchemy.ext.asyncio import (
create_async_engine,
AsyncSession,
async_sessionmaker,
)
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from sqlalchemy import String, Integer, Float, DateTime, func
# Engine (connection pool)
DATABASE_URL = "sqlite+aiosqlite:///./test.db"
# PostgreSQL: "postgresql+asyncpg://user:pass@localhost/dbname"
engine = create_async_engine(
DATABASE_URL,
echo=True, # log SQL (dev only)
)
# Session factory
AsyncSessionFactory = async_sessionmaker(
engine,
expire_on_commit=False,
class_=AsyncSession,
)
# Base class
class Base(DeclarativeBase):
pass
# Model definition
class User(Base):
__tablename__ = "users"
id: Mapped[int] = mapped_column(Integer, primary_key=True, index=True)
name: Mapped[str] = mapped_column(String(100), nullable=False)
email: Mapped[str] = mapped_column(String(200), unique=True, index=True)
hashed_password: Mapped[str] = mapped_column(String(200))
created_at: Mapped[DateTime] = mapped_column(
DateTime(timezone=True), server_default=func.now()
)
class Item(Base):
__tablename__ = "items"
id: Mapped[int] = mapped_column(Integer, primary_key=True, index=True)
name: Mapped[str] = mapped_column(String(100), nullable=False)
price: Mapped[float] = mapped_column(Float, nullable=False)
owner_id: Mapped[int] = mapped_column(Integer, nullable=False)
# Create tables on startup
async def create_tables():
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
DB Session Dependency
# dependencies.py
from typing import AsyncGenerator, Annotated
from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession
from database import AsyncSessionFactory
async def get_db() -> AsyncGenerator[AsyncSession, None]:
async with AsyncSessionFactory() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
DB = Annotated[AsyncSession, Depends(get_db)]
CRUD Pattern
# crud.py
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, update, delete
from database import User, Item
from pydantic import BaseModel
class UserCreate(BaseModel):
name: str
email: str
password: str
class UserOut(BaseModel):
id: int
name: str
email: str
model_config = {"from_attributes": True}
async def create_user(db: AsyncSession, user_data: UserCreate) -> User:
user = User(
name=user_data.name,
email=user_data.email,
hashed_password=f"hashed_{user_data.password}", # use bcrypt in production
)
db.add(user)
await db.flush() # assign ID (before commit)
await db.refresh(user)
return user
async def get_user(db: AsyncSession, user_id: int) -> User | None:
result = await db.execute(select(User).where(User.id == user_id))
return result.scalar_one_or_none()
async def get_users(db: AsyncSession, skip: int = 0, limit: int = 100) -> list[User]:
result = await db.execute(select(User).offset(skip).limit(limit))
return list(result.scalars().all())
async def get_user_by_email(db: AsyncSession, email: str) -> User | None:
result = await db.execute(select(User).where(User.email == email))
return result.scalar_one_or_none()
async def update_user(db: AsyncSession, user_id: int, name: str) -> User | None:
await db.execute(
update(User).where(User.id == user_id).values(name=name)
)
return await get_user(db, user_id)
async def delete_user(db: AsyncSession, user_id: int) -> bool:
result = await db.execute(delete(User).where(User.id == user_id))
return result.rowcount > 0
Connecting to FastAPI Router
# routers/users.py
from fastapi import APIRouter, HTTPException, status
from dependencies import DB
import crud
router = APIRouter(prefix="/users", tags=["users"])
@router.post("/", response_model=crud.UserOut, status_code=status.HTTP_201_CREATED)
async def create_user(user_data: crud.UserCreate, db: DB):
existing = await crud.get_user_by_email(db, user_data.email)
if existing:
raise HTTPException(status_code=400, detail="Email already in use")
return await crud.create_user(db, user_data)
@router.get("/", response_model=list[crud.UserOut])
async def list_users(db: DB, skip: int = 0, limit: int = 100):
return await crud.get_users(db, skip=skip, limit=limit)
@router.get("/{user_id}", response_model=crud.UserOut)
async def get_user(user_id: int, db: DB):
user = await crud.get_user(db, user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user
@router.put("/{user_id}", response_model=crud.UserOut)
async def update_user(user_id: int, name: str, db: DB):
user = await crud.update_user(db, user_id, name)
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user
@router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_user(user_id: int, db: DB):
deleted = await crud.delete_user(db, user_id)
if not deleted:
raise HTTPException(status_code=404, detail="User not found")
App Integration
# main.py
from contextlib import asynccontextmanager
from fastapi import FastAPI
from database import create_tables
from routers import users
@asynccontextmanager
async def lifespan(app: FastAPI):
await create_tables() # create tables on startup
yield
app = FastAPI(lifespan=lifespan)
app.include_router(users.router)
Summary
| Component | Role |
|---|---|
create_async_engine | Async DB connection pool |
async_sessionmaker | Session factory |
AsyncSession | Async DB session |
Mapped, mapped_column | Type-safe ORM column definition |
select() | Async SELECT query |
session.flush() | Send to DB (get ID before commit) |
session.commit() | Finalize transaction |
SQLAlchemy 2.0 async integrates perfectly with FastAPI's async architecture for high concurrency.