SQLAlchemy 2.0
SQLAlchemy is Python's most powerful ORM library. Version 2.0 significantly improved declarative mapping and async support.
Installation and Setup
pip install sqlalchemy psycopg2-binary # PostgreSQL
# or
pip install sqlalchemy aiosqlite # SQLite (async)
pip install sqlalchemy asyncpg # PostgreSQL (async)
# database.py
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, DeclarativeBase
# Synchronous engine
engine = create_engine(
"postgresql://user:password@localhost/dbname",
pool_size=10,
max_overflow=20,
echo=False, # whether to output SQL logs
)
SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False)
class Base(DeclarativeBase):
pass
Declarative Model Mapping
from sqlalchemy import String, Integer, ForeignKey, Text, Boolean, DateTime
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.sql import func
from datetime import datetime
class Category(Base):
__tablename__ = "categories"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(100), unique=True, nullable=False)
slug: Mapped[str] = mapped_column(String(100), unique=True)
products: Mapped[list["Product"]] = relationship(back_populates="category")
def __repr__(self):
return f"<Category(id={self.id}, name={self.name})>"
class Product(Base):
__tablename__ = "products"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(200), nullable=False)
description: Mapped[str | None] = mapped_column(Text)
price: Mapped[float] = mapped_column(nullable=False)
stock: Mapped[int] = mapped_column(default=0)
is_active: Mapped[bool] = mapped_column(Boolean, default=True)
created_at: Mapped[datetime] = mapped_column(DateTime, server_default=func.now())
updated_at: Mapped[datetime] = mapped_column(
DateTime, server_default=func.now(), onupdate=func.now()
)
category_id: Mapped[int | None] = mapped_column(ForeignKey("categories.id"))
category: Mapped["Category | None"] = relationship(back_populates="products")
def __repr__(self):
return f"<Product(id={self.id}, name={self.name})>"
Table Creation and Session Management
# Create tables
Base.metadata.create_all(bind=engine)
# Session context manager (works for FastAPI and general apps)
from contextlib import contextmanager
@contextmanager
def get_db():
db = SessionLocal()
try:
yield db
db.commit()
except Exception:
db.rollback()
raise
finally:
db.close()
CRUD Patterns
from sqlalchemy.orm import Session
from sqlalchemy import select, update, delete
# CREATE
def create_product(db: Session, name: str, price: float, category_id: int) -> Product:
product = Product(name=name, price=price, category_id=category_id)
db.add(product)
db.flush() # flush to DB (before commit)
db.refresh(product) # fetch auto-generated values (id, created_at)
return product
# READ — select() style (recommended in 2.0)
def get_product(db: Session, product_id: int) -> Product | None:
stmt = select(Product).where(Product.id == product_id)
return db.scalar(stmt)
def get_products(
db: Session,
skip: int = 0,
limit: int = 20,
is_active: bool = True,
) -> list[Product]:
stmt = (
select(Product)
.where(Product.is_active == is_active)
.order_by(Product.created_at.desc())
.offset(skip)
.limit(limit)
)
return list(db.scalars(stmt))
# Fetch with relationships (prevent N+1)
from sqlalchemy.orm import selectinload, joinedload
def get_products_with_category(db: Session) -> list[Product]:
stmt = (
select(Product)
.options(joinedload(Product.category)) # fetch in single JOIN
.where(Product.is_active == True)
)
return list(db.scalars(stmt).unique())
# UPDATE
def update_product(db: Session, product_id: int, **kwargs) -> Product | None:
product = get_product(db, product_id)
if not product:
return None
for key, value in kwargs.items():
setattr(product, key, value)
db.flush()
db.refresh(product)
return product
# Bulk UPDATE
def deactivate_products(db: Session, category_id: int) -> int:
stmt = (
update(Product)
.where(Product.category_id == category_id)
.values(is_active=False)
.execution_options(synchronize_session="fetch")
)
result = db.execute(stmt)
return result.rowcount
# DELETE
def delete_product(db: Session, product_id: int) -> bool:
product = get_product(db, product_id)
if not product:
return False
db.delete(product)
return True
Aggregation and Subqueries
from sqlalchemy import func, and_, or_
# Aggregation
def get_category_stats(db: Session) -> list[dict]:
stmt = (
select(
Category.name,
func.count(Product.id).label("product_count"),
func.avg(Product.price).label("avg_price"),
func.sum(Product.stock).label("total_stock"),
)
.join(Product, isouter=True)
.group_by(Category.id)
.having(func.count(Product.id) > 0)
.order_by(func.count(Product.id).desc())
)
rows = db.execute(stmt).all()
return [row._asdict() for row in rows]
# Subquery
def get_expensive_products(db: Session) -> list[Product]:
avg_price_subq = select(func.avg(Product.price)).scalar_subquery()
stmt = select(Product).where(Product.price > avg_price_subq)
return list(db.scalars(stmt))
Async SQLAlchemy
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
# Async engine
async_engine = create_async_engine(
"postgresql+asyncpg://user:password@localhost/dbname",
echo=False,
)
AsyncSessionLocal = async_sessionmaker(
bind=async_engine, expire_on_commit=False
)
# FastAPI dependency
async def get_async_db():
async with AsyncSessionLocal() as db:
try:
yield db
await db.commit()
except Exception:
await db.rollback()
raise
# Async query
async def get_product_async(db: AsyncSession, product_id: int) -> Product | None:
stmt = (
select(Product)
.options(selectinload(Product.category)) # selectinload recommended for async
.where(Product.id == product_id)
)
return await db.scalar(stmt)
Summary
| Feature | Method |
|---|---|
| Model definition | Mapped[type] = mapped_column(...) |
| Single query | db.scalar(select(Model).where(...)) |
| Multiple query | db.scalars(stmt) |
| Eager loading | joinedload() / selectinload() |
| Bulk update | update(Model).where(...).values(...) |
| Aggregation | func.count(), func.avg(), group_by() |
| Async | AsyncSession + create_async_engine |
SQLAlchemy 2.0's select() style is recommended as the standard for superior type safety and IDE autocompletion.