Skip to main content
Advertisement

Async DB Integration — SQLAlchemy 2.0 async

Combining SQLAlchemy 2.0's async engine with FastAPI's Depends enables non-blocking database operations.


Installation

pip install sqlalchemy[asyncio] asyncpg  # PostgreSQL
pip install aiosqlite # SQLite (development/testing)

Models and Engine Setup

# database.py
from sqlalchemy.ext.asyncio import (
create_async_engine,
AsyncSession,
async_sessionmaker,
)
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from sqlalchemy import String, Integer, Float, DateTime, func


# Engine (connection pool)
DATABASE_URL = "sqlite+aiosqlite:///./test.db"
# PostgreSQL: "postgresql+asyncpg://user:pass@localhost/dbname"

engine = create_async_engine(
DATABASE_URL,
echo=True, # log SQL (dev only)
)

# Session factory
AsyncSessionFactory = async_sessionmaker(
engine,
expire_on_commit=False,
class_=AsyncSession,
)


# Base class
class Base(DeclarativeBase):
pass


# Model definition
class User(Base):
__tablename__ = "users"

id: Mapped[int] = mapped_column(Integer, primary_key=True, index=True)
name: Mapped[str] = mapped_column(String(100), nullable=False)
email: Mapped[str] = mapped_column(String(200), unique=True, index=True)
hashed_password: Mapped[str] = mapped_column(String(200))
created_at: Mapped[DateTime] = mapped_column(
DateTime(timezone=True), server_default=func.now()
)


class Item(Base):
__tablename__ = "items"

id: Mapped[int] = mapped_column(Integer, primary_key=True, index=True)
name: Mapped[str] = mapped_column(String(100), nullable=False)
price: Mapped[float] = mapped_column(Float, nullable=False)
owner_id: Mapped[int] = mapped_column(Integer, nullable=False)


# Create tables on startup
async def create_tables():
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)

DB Session Dependency

# dependencies.py
from typing import AsyncGenerator, Annotated
from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession
from database import AsyncSessionFactory


async def get_db() -> AsyncGenerator[AsyncSession, None]:
async with AsyncSessionFactory() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise


DB = Annotated[AsyncSession, Depends(get_db)]

CRUD Pattern

# crud.py
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, update, delete
from database import User, Item
from pydantic import BaseModel


class UserCreate(BaseModel):
name: str
email: str
password: str


class UserOut(BaseModel):
id: int
name: str
email: str
model_config = {"from_attributes": True}


async def create_user(db: AsyncSession, user_data: UserCreate) -> User:
user = User(
name=user_data.name,
email=user_data.email,
hashed_password=f"hashed_{user_data.password}", # use bcrypt in production
)
db.add(user)
await db.flush() # assign ID (before commit)
await db.refresh(user)
return user


async def get_user(db: AsyncSession, user_id: int) -> User | None:
result = await db.execute(select(User).where(User.id == user_id))
return result.scalar_one_or_none()


async def get_users(db: AsyncSession, skip: int = 0, limit: int = 100) -> list[User]:
result = await db.execute(select(User).offset(skip).limit(limit))
return list(result.scalars().all())


async def get_user_by_email(db: AsyncSession, email: str) -> User | None:
result = await db.execute(select(User).where(User.email == email))
return result.scalar_one_or_none()


async def update_user(db: AsyncSession, user_id: int, name: str) -> User | None:
await db.execute(
update(User).where(User.id == user_id).values(name=name)
)
return await get_user(db, user_id)


async def delete_user(db: AsyncSession, user_id: int) -> bool:
result = await db.execute(delete(User).where(User.id == user_id))
return result.rowcount > 0

Connecting to FastAPI Router

# routers/users.py
from fastapi import APIRouter, HTTPException, status
from dependencies import DB
import crud

router = APIRouter(prefix="/users", tags=["users"])


@router.post("/", response_model=crud.UserOut, status_code=status.HTTP_201_CREATED)
async def create_user(user_data: crud.UserCreate, db: DB):
existing = await crud.get_user_by_email(db, user_data.email)
if existing:
raise HTTPException(status_code=400, detail="Email already in use")
return await crud.create_user(db, user_data)


@router.get("/", response_model=list[crud.UserOut])
async def list_users(db: DB, skip: int = 0, limit: int = 100):
return await crud.get_users(db, skip=skip, limit=limit)


@router.get("/{user_id}", response_model=crud.UserOut)
async def get_user(user_id: int, db: DB):
user = await crud.get_user(db, user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user


@router.put("/{user_id}", response_model=crud.UserOut)
async def update_user(user_id: int, name: str, db: DB):
user = await crud.update_user(db, user_id, name)
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user


@router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_user(user_id: int, db: DB):
deleted = await crud.delete_user(db, user_id)
if not deleted:
raise HTTPException(status_code=404, detail="User not found")

App Integration

# main.py
from contextlib import asynccontextmanager
from fastapi import FastAPI
from database import create_tables
from routers import users


@asynccontextmanager
async def lifespan(app: FastAPI):
await create_tables() # create tables on startup
yield


app = FastAPI(lifespan=lifespan)
app.include_router(users.router)

Summary

ComponentRole
create_async_engineAsync DB connection pool
async_sessionmakerSession factory
AsyncSessionAsync DB session
Mapped, mapped_columnType-safe ORM column definition
select()Async SELECT query
session.flush()Send to DB (get ID before commit)
session.commit()Finalize transaction

SQLAlchemy 2.0 async integrates perfectly with FastAPI's async architecture for high concurrency.

Advertisement