Connection Pooling and Performance Optimization
The keys to database performance are connection management, index design, and query analysis.
Connection Pooling
# SQLAlchemy connection pool configuration
from sqlalchemy import create_engine, event
from sqlalchemy.pool import QueuePool, NullPool

engine = create_engine(
    "postgresql://user:pass@localhost/dbname",
    poolclass=QueuePool,  # the default pool class: connections are kept open and reused
    pool_size=10,         # connections kept persistently in the pool
    max_overflow=20,      # extra connections allowed beyond pool_size (10 + 20 = 30 total max)
    pool_timeout=30,      # seconds to wait for a free connection before raising a timeout
    pool_recycle=3600,    # recreate connections older than 1 hour (avoids server-side idle disconnects)
    pool_pre_ping=True,   # lightweight liveness check at checkout; stale connections are replaced
)
# Check pool status
def pool_status():
    """Return a snapshot of the engine's connection-pool counters.

    Useful for exposing pool health on a metrics/health endpoint.
    """
    p = engine.pool
    return dict(
        size=p.size(),              # configured pool size
        checked_in=p.checkedin(),   # idle connections sitting in the pool
        checked_out=p.checkedout(), # connections currently handed out
        overflow=p.overflow(),      # overflow connections beyond pool_size
    )
# Redis connection pool: share one pool of TCP connections across clients
import redis

pool = redis.ConnectionPool(
    host="localhost",
    port=6379,
    db=0,                   # logical Redis database index
    max_connections=50,     # hard cap on simultaneous connections from this pool
    decode_responses=True,  # return str instead of bytes from command replies
)
r = redis.Redis(connection_pool=pool)
# MongoDB connection pool (pymongo manages pooling internally per client)
from pymongo import MongoClient

client = MongoClient(
    "mongodb://localhost:27017/",
    maxPoolSize=50,           # maximum connections
    minPoolSize=10,           # minimum maintained connections, kept open even when idle
    maxIdleTimeMS=60000,      # close connections idle longer than 60 s
    waitQueueTimeoutMS=5000,  # max wait (ms) for a free connection before timing out
)
PostgreSQL Index Strategy
-- Single column indexes
CREATE INDEX idx_products_category ON products(category_id);
CREATE INDEX idx_products_created ON products(created_at DESC);

-- Compound index (column order matters: the index is only usable when the
-- query filters on the leading column(s))
CREATE INDEX idx_products_active_cat ON products(is_active, category_id);

-- Partial index (indexes only the matching rows — smaller and cheaper to maintain)
CREATE INDEX idx_active_products ON products(price) WHERE is_active = true;

-- Covering index (INCLUDE carries the extra columns so matching queries can be
-- answered from the index alone — an index-only scan, no table access)
CREATE INDEX idx_product_list ON products(category_id, is_active)
INCLUDE (name, price, stock);

-- CONCURRENTLY: create the index without locking out writes (production);
-- note it cannot run inside a transaction block
CREATE INDEX CONCURRENTLY idx_products_name ON products(name);
# Define index in SQLAlchemy
from sqlalchemy import Index

class Product(Base):
    """ORM model for the products table (column definitions elided here)."""
    __tablename__ = "products"
    # ... column definitions ...
    __table_args__ = (
        # Composite index mirroring idx_products_active_cat above
        Index("idx_products_active_cat", "is_active", "category_id"),
        # NOTE(review): postgresql_ops is documented for operator classes, not
        # sort direction; it happens to render "created_at DESC" here, but the
        # documented way is Index("...", Product.created_at.desc()) — confirm.
        Index("idx_products_created", "created_at", postgresql_ops={"created_at": "DESC"}),
    )
EXPLAIN ANALYZE — Query Analysis
-- Analyze the actual execution plan (EXPLAIN ANALYZE runs the query and
-- reports real row counts and timings, not just planner estimates)
EXPLAIN ANALYZE
SELECT p.*, c.name as category_name
FROM products p
JOIN categories c ON p.category_id = c.id
WHERE p.is_active = true AND p.price > 10000
ORDER BY p.created_at DESC
LIMIT 20;
-- Interpreting results
Limit (cost=0.43..15.23 rows=20 width=...) (actual time=0.123..0.456 rows=20 loops=1)
-> Index Scan using idx_products_created on products (cost=...)
Filter: (is_active = true AND price > 10000)
Rows Removed by Filter: 150
Planning Time: 0.5 ms
Execution Time: 1.2 ms ← target: < 100ms
-- Warning signs
-- Seq Scan on large tables: consider adding an index
-- Large "Rows Removed by Filter": need a more selective index
-- Hash Join vs Nested Loop: the planner chooses by data size — Nested Loop suits small or well-indexed inner relations; Hash Join suits large joins
# Run EXPLAIN from Python
from sqlalchemy import text

def explain_query(db, stmt):
    """Print the EXPLAIN ANALYZE plan for *stmt* via session/connection *db*.

    NOTE(review): *stmt* is interpolated directly into the SQL string, so this
    helper must only be called with trusted, developer-supplied SQL — never
    user input (SQL injection risk).  EXPLAIN ANALYZE also *executes* the
    statement, so be careful running it against writes.
    """
    explain_stmt = text(f"EXPLAIN ANALYZE {stmt}")
    result = db.execute(explain_stmt)
    for row in result:
        # each plan line comes back as a one-column row of text
        print(row[0])
N+1 Query Detection and Resolution
# ❌ N+1 problem: one query for the products, then one extra query per product
products = db.scalars(select(Product)).all()
for product in products:
    # Touching the lazy relationship fires a SELECT on every iteration!
    print(product.category.name)

# ✅ joinedload: eager-load the relationship with a JOIN in the same query
# (well suited to many-to-one/FK relationships)
from sqlalchemy.orm import joinedload
stmt = select(Product).options(joinedload(Product.category))
products = db.scalars(stmt).unique().all()  # unique() de-duplicates joined rows

# ✅ selectinload: issues a second SELECT ... WHERE ... IN (...) query;
# suited to collection (reverse) relationships and safe for async sessions
from sqlalchemy.orm import selectinload
stmt = select(Category).options(selectinload(Category.products))
categories = db.scalars(stmt).all()

# ✅ contains_eager: reuse a JOIN you already added for filtering to also
# populate the relationship, avoiding a redundant second JOIN
from sqlalchemy.orm import contains_eager
stmt = (
    select(Product)
    .join(Product.category)
    .options(contains_eager(Product.category))
    .where(Category.slug == "books")
)
Query Performance Monitoring
import time
import logging
from sqlalchemy import event
from sqlalchemy.engine import Engine

# Dedicated logger so slow-query output can be routed/filtered independently
logger = logging.getLogger("sqlalchemy.slow")
SLOW_QUERY_THRESHOLD = 0.5  # seconds; statements slower than 500 ms are logged as slow
@event.listens_for(Engine, "before_cursor_execute")
def before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
    """Stamp the wall-clock start time of each statement on the connection.

    A list is used as a stack so nested executions on the same connection
    each keep their own start time.
    """
    start_times = conn.info.setdefault("query_start_time", [])
    start_times.append(time.time())
@event.listens_for(Engine, "after_cursor_execute")
def after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
    """Log any statement that ran longer than SLOW_QUERY_THRESHOLD seconds."""
    # Pop the matching start time pushed by before_cursor_execute (stack
    # discipline pairs nested executions correctly).
    total = time.time() - conn.info["query_start_time"].pop(-1)
    if total > SLOW_QUERY_THRESHOLD:
        # Lazy %-style args: the message string is only built if the record is
        # actually emitted; statement is truncated to keep log lines bounded.
        logger.warning("Slow query (%.3fs): %s", total, statement[:200])
Bulk Operation Optimization
from sqlalchemy import insert, update, delete
# Bulk INSERT (fastest)
def bulk_create_products(db, products_data: list[dict]):
    """Insert many product rows in a single executemany round trip.

    products_data: one column->value dict per row.  Executing Core insert()
    with a list of dicts skips per-object ORM overhead; flush() sends the
    statement without committing the transaction.
    """
    db.execute(insert(Product), products_data)
    db.flush()
# Bulk UPDATE
def bulk_update_prices(db, updates: list[dict]):
    """Update many rows in one round trip (bulk UPDATE by primary key).

    updates: list of dicts, each carrying the primary key plus the columns to
    change — executing update(Product) with a dict list triggers SQLAlchemy's
    executemany UPDATE-by-primary-key path.
    """
    # updates = [{"id": 1, "price": 35000}, ...]
    db.execute(update(Product), updates)
# Process in chunks (memory management)
def process_all_products(db, chunk_size: int = 1000):
    """Run process() over every product, loading rows in fixed-size chunks.

    Chunking bounds memory use; expunge_all() detaches the processed rows so
    the session's identity map does not grow without limit.
    """
    offset = 0
    while True:
        # ORDER BY the primary key: OFFSET/LIMIT without a stable ordering is
        # nondeterministic in SQL, so successive pages could skip or repeat
        # rows (the "id" primary key is the same one used by the bulk-update
        # example above).
        stmt = (
            select(Product)
            .order_by(Product.id)
            .offset(offset)
            .limit(chunk_size)
        )
        products = db.scalars(stmt).all()
        if not products:
            break
        for product in products:
            process(product)
        db.expunge_all()  # release memory held by the identity map
        offset += chunk_size
Summary
| Optimization Area | Method |
|---|---|
| Connection reuse | pool_size, max_overflow configuration |
| Connection validity | pool_pre_ping=True |
| Query optimization | EXPLAIN ANALYZE analysis |
| Indexing | Compound indexes + partial indexes |
| Eliminate N+1 | joinedload, selectinload |
| Bulk operations | executemany-style bulk INSERT/UPDATE, chunked iteration |
| Slow queries | Detect with SQLAlchemy event hooks |
80% of database performance issues are solved by proper indexing and N+1 elimination.