연결 풀링과 성능 최적화
데이터베이스 성능의 핵심은 연결 관리, 인덱스 설계, 쿼리 분석입니다.
커넥션 풀링
# SQLAlchemy 연결 풀 설정
from sqlalchemy import create_engine, event
from sqlalchemy.pool import QueuePool, NullPool
engine = create_engine(
"postgresql://user:pass@localhost/dbname",
poolclass=QueuePool, # 기본값 (연결 재사용)
pool_size=10, # 유지할 연결 수
max_overflow=20, # 초과 허용 연결 (최대 30개)
pool_timeout=30, # 연결 대기 타임아웃(초)
pool_recycle=3600, # 1시간마다 연결 재생성 (stale 방지)
pool_pre_ping=True, # 사용 전 연결 유효성 확인
)
# 연결 풀 상태 확인
def pool_status():
pool = engine.pool
return {
"size": pool.size(),
"checked_in": pool.checkedin(),
"checked_out": pool.checkedout(),
"overflow": pool.overflow(),
}
# Redis 연결 풀
import redis
pool = redis.ConnectionPool(
host="localhost",
port=6379,
db=0,
max_connections=50,
decode_responses=True,
)
r = redis.Redis(connection_pool=pool)
# MongoDB 연결 풀
from pymongo import MongoClient
client = MongoClient(
"mongodb://localhost:27017/",
maxPoolSize=50, # 최대 연결 수
minPoolSize=10, # 최소 유지 연결 수
maxIdleTimeMS=60000, # 유휴 연결 유지 시간
waitQueueTimeoutMS=5000,
)
PostgreSQL 인덱스 전략
-- 단일 컬럼 인덱스
CREATE INDEX idx_products_category ON products(category_id);
CREATE INDEX idx_products_created ON products(created_at DESC);
-- 복합 인덱스 (순서 중요: 선택성 높은 컬럼 먼저)
CREATE INDEX idx_products_active_cat ON products(is_active, category_id);
-- 부분 인덱스 (조건 필터링 — 인덱스 크기 절감)
CREATE INDEX idx_active_products ON products(price) WHERE is_active = true;
-- 커버링 인덱스 (쿼리에 필요한 모든 컬럼 포함 → 테이블 접근 없음)
CREATE INDEX idx_product_list ON products(category_id, is_active)
INCLUDE (name, price, stock);
-- CONCURRENTLY: 테이블 락 없이 인덱스 생성 (프로덕션)
CREATE INDEX CONCURRENTLY idx_products_name ON products(name);
# SQLAlchemy에서 인덱스 정의
from sqlalchemy import Index
class Product(Base):
__tablename__ = "products"
# ... 컬럼 정의 ...
__table_args__ = (
Index("idx_products_active_cat", "is_active", "category_id"),
Index("idx_products_created", "created_at", postgresql_ops={"created_at": "DESC"}),
)
EXPLAIN ANALYZE — 쿼리 분석
-- 쿼리 실행 계획 분석
EXPLAIN ANALYZE
SELECT p.*, c.name as category_name
FROM products p
JOIN categories c ON p.category_id = c.id
WHERE p.is_active = true AND p.price > 10000
ORDER BY p.created_at DESC
LIMIT 20;
-- 결과 해석
Limit (cost=0.43..15.23 rows=20 width=...) (actual time=0.123..0.456 rows=20 loops=1)
-> Index Scan using idx_products_created on products (cost=...)
Filter: (is_active = true AND price > 10000)
Rows Removed by Filter: 150
Planning Time: 0.5 ms
Execution Time: 1.2 ms ← 목표: < 100ms
-- 주의 신호
-- Seq Scan (대용량 테이블): 인덱스 추가 검토
-- Rows Removed by Filter 값이 크면: 더 선택적인 인덱스 필요
-- Hash Join vs Nested Loop: 데이터 크기에 따라 다름
# Python에서 EXPLAIN 실행
from sqlalchemy import text
def explain_query(db, stmt):
explain_stmt = text(f"EXPLAIN ANALYZE {stmt}")
result = db.execute(explain_stmt)
for row in result:
print(row[0])
N+1 쿼리 탐지와 해결
# ❌ N+1 문제
products = db.scalars(select(Product)).all()
for product in products:
# 루프마다 쿼리 발생!
print(product.category.name)
# ✅ joinedload (FK 관계)
from sqlalchemy.orm import joinedload
stmt = select(Product).options(joinedload(Product.category))
products = db.scalars(stmt).unique().all()
# ✅ selectinload (역방향 관계, 비동기에서도 안전)
from sqlalchemy.orm import selectinload
stmt = select(Category).options(selectinload(Category.products))
categories = db.scalars(stmt).all()
# ✅ contains_eager (이미 join한 경우)
from sqlalchemy.orm import contains_eager
stmt = (
select(Product)
.join(Product.category)
.options(contains_eager(Product.category))
.where(Category.slug == "books")
)
쿼리 성능 모니터링
import time
import logging
from sqlalchemy import event
from sqlalchemy.engine import Engine
logger = logging.getLogger("sqlalchemy.slow")
SLOW_QUERY_THRESHOLD = 0.5 # 500ms 이상은 slow query
@event.listens_for(Engine, "before_cursor_execute")
def before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
conn.info.setdefault("query_start_time", []).append(time.time())
@event.listens_for(Engine, "after_cursor_execute")
def after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
total = time.time() - conn.info["query_start_time"].pop(-1)
if total > SLOW_QUERY_THRESHOLD:
logger.warning(f"Slow query ({total:.3f}s): {statement[:200]}")
벌크 연산 최적화
from sqlalchemy import insert, update, delete
# 벌크 INSERT (가장 빠름)
def bulk_create_products(db, products_data: list[dict]):
db.execute(insert(Product), products_data)
db.flush()
# 벌크 UPDATE
def bulk_update_prices(db, updates: list[dict]):
# updates = [{"id": 1, "price": 35000}, ...]
db.execute(update(Product), updates)
# 청크 단위 처리 (메모리 관리)
def process_all_products(db, chunk_size: int = 1000):
offset = 0
while True:
stmt = select(Product).offset(offset).limit(chunk_size)
products = db.scalars(stmt).all()
if not products:
break
for product in products:
process(product)
db.expunge_all() # 메모리 해제
offset += chunk_size
정리
| 최적화 항목 | 방법 |
|---|---|
| 연결 재사용 | pool_size, max_overflow 설정 |
| 연결 유효성 | pool_pre_ping=True |
| 쿼리 최적화 | EXPLAIN ANALYZE 분석 |
| 인덱스 | 복합 인덱스 + 부분 인덱스 |
| N+1 제거 | joinedload, selectinload |
| 대량 처리 | bulk_create, iterator |
| 느린 쿼리 | SQLAlchemy 이벤트 훅으로 감지 |
데이터베이스 성능의 80%는 적절한 인덱스와 N+1 제거로 해결됩니다.