비동기 DB 연동 — SQLAlchemy 2.0 async
SQLAlchemy 2.0의 async 엔진과 FastAPI의 Depends를 결합해 비동기 DB 연산을 처리합니다.
설치
pip install sqlalchemy[asyncio] asyncpg # PostgreSQL
pip install aiosqlite # SQLite (개발·테스트)
모델과 엔진 설정
# database.py
from datetime import datetime

from sqlalchemy import DateTime, Float, Integer, String, func
from sqlalchemy.ext.asyncio import (
    AsyncSession,
    async_sessionmaker,
    create_async_engine,
)
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
# 엔진 (connection pool)
# Local SQLite file for development; for PostgreSQL use
# "postgresql+asyncpg://user:pass@localhost/dbname" instead.
DATABASE_URL = "sqlite+aiosqlite:///./test.db"
# echo=True prints the emitted SQL statements (development aid).
engine = create_async_engine(DATABASE_URL, echo=True)
# 세션 팩토리
# Calling AsyncSessionFactory() returns a new AsyncSession bound to `engine`.
# expire_on_commit=False: instances are not expired when the session commits.
AsyncSessionFactory = async_sessionmaker(
    bind=engine,
    class_=AsyncSession,
    expire_on_commit=False,
)
# 베이스 클래스
class Base(DeclarativeBase):
    """Declarative base class; models in this module inherit from it."""
# 모델 정의
class User(Base):
    """ORM model mapped to the ``users`` table."""

    __tablename__ = "users"

    id: Mapped[int] = mapped_column(Integer, primary_key=True, index=True)
    name: Mapped[str] = mapped_column(String(100), nullable=False)
    # unique=True makes the database reject duplicate e-mail addresses.
    email: Mapped[str] = mapped_column(String(200), unique=True, index=True)
    hashed_password: Mapped[str] = mapped_column(String(200))
    # Mapped[...] carries the Python-side type (datetime); the SQL column
    # type DateTime(timezone=True) is passed to mapped_column separately.
    # The value is filled in by the database through server_default.
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), server_default=func.now()
    )
class Item(Base):
    """ORM model mapped to the ``items`` table."""

    __tablename__ = "items"

    id: Mapped[int] = mapped_column(Integer, primary_key=True, index=True)
    name: Mapped[str] = mapped_column(String(100), nullable=False)
    price: Mapped[float] = mapped_column(Float, nullable=False)
    # NOTE(review): plain integer column with no ForeignKey; presumably it
    # refers to users.id - confirm whether a constraint is wanted.
    owner_id: Mapped[int] = mapped_column(Integer, nullable=False)
# 테이블 생성 (앱 시작 시)
async def create_tables():
    """Create the tables registered on ``Base.metadata``.

    Runs inside ``engine.begin()``, so the DDL is executed within a
    transaction block opened on the async engine.
    """
    async with engine.begin() as connection:
        # create_all is a synchronous callable, so it goes through run_sync.
        await connection.run_sync(Base.metadata.create_all)
DB 세션 의존성
# dependencies.py
from typing import AsyncGenerator, Annotated
from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession
from database import AsyncSessionFactory
async def get_db() -> AsyncGenerator[AsyncSession, None]:
    """Yield a session from ``AsyncSessionFactory`` and finish its transaction.

    The session is committed once the consumer hands control back without
    raising; on any ``Exception`` it is rolled back and the error re-raised.
    """
    async with AsyncSessionFactory() as db_session:
        try:
            yield db_session
            # Reached only when the consumer finished without raising.
            await db_session.commit()
        except Exception:
            # Discard pending changes, then let the original error propagate.
            await db_session.rollback()
            raise
# Type alias: a parameter annotated with DB is declared as an AsyncSession
# that FastAPI resolves through Depends(get_db).
DB = Annotated[AsyncSession, Depends(get_db)]
CRUD 패턴
# crud.py
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, update, delete
from database import User, Item
from pydantic import BaseModel
# Pydantic 스키마
# Input schema for creating a user; carries the raw password.
class UserCreate(BaseModel):
    name: str
    email: str
    password: str
# Output schema for a user; it has no password field.
class UserOut(BaseModel):
    id: int
    name: str
    email: str

    # from_attributes lets the schema be populated from object attributes
    # (for example an ORM instance) instead of a dict.
    model_config = {"from_attributes": True}
# Input schema for an item: name and price only.
class ItemCreate(BaseModel):
    name: str
    price: float
# Output schema for an item, including its id and owner_id.
class ItemOut(BaseModel):
    id: int
    name: str
    price: float
    owner_id: int

    # from_attributes lets the schema be populated from object attributes
    # (for example an ORM instance) instead of a dict.
    model_config = {"from_attributes": True}
# CRUD 함수
async def create_user(db: AsyncSession, user_data: UserCreate) -> User:
    """Add a new ``User`` built from *user_data* and return it.

    The row is flushed but not committed here; the caller owns the
    transaction.
    """
    # Placeholder only - a real application must use bcrypt or similar.
    placeholder_hash = f"hashed_{user_data.password}"
    new_user = User(
        name=user_data.name,
        email=user_data.email,
        hashed_password=placeholder_hash,
    )
    db.add(new_user)
    # flush sends the INSERT so the primary key is assigned before commit.
    await db.flush()
    # refresh reloads the instance's attributes from the database.
    await db.refresh(new_user)
    return new_user
async def get_user(db: AsyncSession, user_id: int) -> User | None:
    """Return the user whose id equals *user_id*, or ``None`` if absent."""
    by_id = select(User).where(User.id == user_id)
    rows = await db.execute(by_id)
    return rows.scalar_one_or_none()
async def get_users(db: AsyncSession, skip: int = 0, limit: int = 100) -> list[User]:
    """Return one page of users ordered by primary key.

    Args:
        db: Session used to run the query.
        skip: Number of rows to skip (OFFSET).
        limit: Maximum number of rows to return (LIMIT).

    Returns:
        The users of the requested page as a list.
    """
    # OFFSET/LIMIT without ORDER BY has no guaranteed row order, so pages
    # could overlap or skip rows between calls; order by the primary key.
    page_query = select(User).order_by(User.id).offset(skip).limit(limit)
    result = await db.execute(page_query)
    return list(result.scalars().all())
async def get_user_by_email(db: AsyncSession, email: str) -> User | None:
    """Return the user whose e-mail equals *email*, or ``None`` if absent."""
    by_email = select(User).where(User.email == email)
    rows = await db.execute(by_email)
    return rows.scalar_one_or_none()
async def update_user(db: AsyncSession, user_id: int, name: str) -> User | None:
    """Set the name of user *user_id* and return the user via ``get_user``.

    Returns ``None`` when ``get_user`` finds no row with that id.
    """
    rename = update(User).where(User.id == user_id).values(name=name)
    await db.execute(rename)
    # Look the row up again so the caller gets the user (or None).
    return await get_user(db, user_id)
async def delete_user(db: AsyncSession, user_id: int) -> bool:
    """Delete user *user_id*; return ``True`` if a row was removed."""
    removal = delete(User).where(User.id == user_id)
    outcome = await db.execute(removal)
    affected = outcome.rowcount
    return affected > 0
FastAPI 라우터 연결
# routers/users.py
from fastapi import APIRouter, HTTPException, status
from typing import Annotated
from sqlalchemy.ext.asyncio import AsyncSession
from dependencies import DB
import crud
# Router for the user endpoints; routes registered on it share the /users prefix.
router = APIRouter(prefix="/users", tags=["users"])
@router.post("/", response_model=crud.UserOut, status_code=status.HTTP_201_CREATED)
async def create_user(user_data: crud.UserCreate, db: DB):
    # Reject the request with 400 when the e-mail is already registered.
    duplicate = await crud.get_user_by_email(db, user_data.email)
    if duplicate:
        raise HTTPException(status_code=400, detail="이미 사용 중인 이메일")
    created = await crud.create_user(db, user_data)
    return created
@router.get("/", response_model=list[crud.UserOut])
async def list_users(db: DB, skip: int = 0, limit: int = 100):
    # skip/limit are forwarded unchanged to the CRUD layer.
    page = await crud.get_users(db, skip=skip, limit=limit)
    return page
@router.get("/{user_id}", response_model=crud.UserOut)
async def get_user(user_id: int, db: DB):
    # Respond with 404 when no user has the requested id.
    found = await crud.get_user(db, user_id)
    if found:
        return found
    raise HTTPException(status_code=404, detail="사용자 없음")
@router.put("/{user_id}", response_model=crud.UserOut)
async def update_user(user_id: int, name: str, db: DB):
    # Respond with 404 when the CRUD layer finds no user with this id.
    updated = await crud.update_user(db, user_id, name)
    if updated:
        return updated
    raise HTTPException(status_code=404, detail="사용자 없음")
@router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_user(user_id: int, db: DB):
    # crud.delete_user reports whether a row was removed; 404 if none was.
    was_removed = await crud.delete_user(db, user_id)
    if was_removed:
        return
    raise HTTPException(status_code=404, detail="사용자 없음")
앱 통합
# main.py
from contextlib import asynccontextmanager
from fastapi import FastAPI
from database import create_tables
from routers import users
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Create the tables on startup and dispose of the engine on shutdown.

    Args:
        app: The FastAPI application this lifespan is attached to (unused).
    """
    # Function-scope import: the module header only imports create_tables.
    from database import engine

    await create_tables()  # create tables when the app starts
    try:
        yield
    finally:
        # Close the engine's pooled connections so none are left open
        # when the application shuts down.
        await engine.dispose()
# Application instance; `lifespan` is passed in as its lifespan handler.
app = FastAPI(lifespan=lifespan)
# Register the user routes defined in routers/users.py.
app.include_router(users.router)
정리
| 구성 요소 | 역할 |
|---|---|
| `create_async_engine` | 비동기 DB 연결 풀 |
| `async_sessionmaker` | 세션 팩토리 |
| `AsyncSession` | 비동기 DB 세션 |
| `Mapped`, `mapped_column` | 타입 안전 ORM 컬럼 정의 |
| `select()` | 비동기 SELECT 쿼리 |
| `session.flush()` | DB 전송 (commit 전 ID 확인) |
| `session.commit()` | 트랜잭션 확정 |
SQLAlchemy 2.0 async는 FastAPI의 비동기 아키텍처와 자연스럽게 통합되며, DB I/O를 기다리는 동안 이벤트 루프를 막지 않으므로 높은 동시성을 확보할 수 있습니다.