Redis 활용
Redis는 인메모리 데이터 구조 저장소입니다. 캐싱, 세션, 큐, Pub/Sub 등 다양한 용도로 활용됩니다.
설치
pip install redis # 동기
pip install redis[asyncio] # 비동기 (aioredis 내장)
기본 연결
import redis
# 동기 클라이언트
r = redis.Redis(
host="localhost",
port=6379,
db=0,
decode_responses=True, # bytes → str 자동 변환
)
# 연결 풀 (권장)
pool = redis.ConnectionPool(host="localhost", port=6379, db=0, decode_responses=True)
r = redis.Redis(connection_pool=pool)
# 비동기 클라이언트
import redis.asyncio as aioredis
async_r = aioredis.Redis(host="localhost", port=6379, db=0, decode_responses=True)
기본 자료구조
# STRING — 기본 키-값
r.set("user:1:name", "Alice")
r.set("user:1:score", 100, ex=3600) # ex=만료(초), px=밀리초
name = r.get("user:1:name") # "Alice"
r.incr("visit_count") # 원자적 증가
r.incrby("visit_count", 5)
# HASH — 딕셔너리
r.hset("user:1", mapping={"name": "Alice", "email": "alice@example.com", "age": 30})
user = r.hgetall("user:1") # 전체 조회
email = r.hget("user:1", "email") # 단일 필드
r.hincrby("user:1", "age", 1) # 숫자 필드 증가
# LIST — 연결 리스트 (큐/스택)
r.lpush("queue", "task1", "task2") # 왼쪽에 추가
r.rpush("queue", "task3") # 오른쪽에 추가
item = r.lpop("queue") # 왼쪽에서 꺼내기 (FIFO)
item = r.brpop("queue", timeout=5) # blocking pop (타임아웃)
items = r.lrange("queue", 0, -1) # 전체 조회
# SET — 중복 없는 집합
r.sadd("tags:post:1", "python", "backend", "api")
r.smembers("tags:post:1") # {'python', 'backend', 'api'}
r.sinter("tags:post:1", "tags:post:2") # 교집합
r.sunion("tags:post:1", "tags:post:2") # 합집합
# SORTED SET — 점수 기반 정렬 집합 (랭킹 구현)
r.zadd("leaderboard", {"Alice": 1500, "Bob": 1300, "Charlie": 1700})
top3 = r.zrevrange("leaderboard", 0, 2, withscores=True)
# [('Charlie', 1700.0), ('Alice', 1500.0), ('Bob', 1300.0)]
rank = r.zrevrank("leaderboard", "Alice") # 1 (0부터 시작)
캐싱 패턴
import json
from functools import wraps
# Cache-Aside 패턴
def get_product(product_id: int) -> dict:
cache_key = f"product:{product_id}"
# 1. 캐시 조회
cached = r.get(cache_key)
if cached:
return json.loads(cached)
# 2. DB 조회
product = db.query(Product).get(product_id)
if not product:
return None
data = {"id": product.id, "name": product.name, "price": product.price}
# 3. 캐시 저장 (5분)
r.set(cache_key, json.dumps(data), ex=300)
return data
# 캐시 데코레이터
def cache(key_prefix: str, ttl: int = 300):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
cache_key = f"{key_prefix}:{':'.join(str(a) for a in args)}"
cached = r.get(cache_key)
if cached:
return json.loads(cached)
result = func(*args, **kwargs)
r.set(cache_key, json.dumps(result), ex=ttl)
return result
return wrapper
return decorator
@cache("user", ttl=600)
def get_user(user_id: int) -> dict:
user = db.query(User).get(user_id)
return {"id": user.id, "name": user.name}
# 캐시 무효화 (패턴 삭제)
def invalidate_product_cache(product_id: int):
r.delete(f"product:{product_id}")
# 패턴 기반 삭제 (주의: 대량 키가 있으면 느림)
for key in r.scan_iter("product_list:*"):
r.delete(key)
Pub/Sub — 메시지 발행/구독
# 발행자 (Publisher)
def publish_event(channel: str, event: dict):
r.publish(channel, json.dumps(event))
# 사용자 가입 이벤트 발행
publish_event("user:events", {"type": "signup", "user_id": 42, "email": "user@example.com"})
# 구독자 (Subscriber) — 별도 스레드/프로세스에서 실행
def start_subscriber():
pubsub = r.pubsub()
pubsub.subscribe("user:events")
for message in pubsub.listen():
if message["type"] == "message":
event = json.loads(message["data"])
if event["type"] == "signup":
send_welcome_email(event["user_id"])
# 패턴 구독
pubsub.psubscribe("user:*") # user:로 시작하는 모든 채널
세션 저장소 패턴
import secrets
from datetime import timedelta
SESSION_TTL = 3600 # 1시간
def create_session(user_id: int) -> str:
session_id = secrets.token_urlsafe(32)
session_data = {
"user_id": user_id,
"created_at": str(datetime.utcnow()),
}
r.hset(f"session:{session_id}", mapping=session_data)
r.expire(f"session:{session_id}", SESSION_TTL)
return session_id
def get_session(session_id: str) -> dict | None:
data = r.hgetall(f"session:{session_id}")
if not data:
return None
# 세션 갱신 (슬라이딩 만료)
r.expire(f"session:{session_id}", SESSION_TTL)
return data
def delete_session(session_id: str):
r.delete(f"session:{session_id}")
Rate Limiting
def rate_limit(user_id: int, limit: int = 100, window: int = 3600) -> bool:
"""시간 창(window)당 limit 횟수 이내인지 확인"""
key = f"rate_limit:{user_id}"
current = r.incr(key)
if current == 1:
r.expire(key, window)
return current <= limit
# 사용 예
def api_view(request):
if not rate_limit(request.user.id, limit=100, window=3600):
return {"error": "Rate limit exceeded"}, 429
# ... 처리
정리
| 자료구조 | 활용 사례 |
|---|---|
| String | 키-값 캐시, 카운터, 토큰 |
| Hash | 사용자 정보, 세션, 설정 |
| List | 작업 큐, 최근 이력 |
| Set | 태그, 팔로우 관계 |
| Sorted Set | 랭킹, 타임라인 |
| Pub/Sub | 이벤트 알림, 실시간 메시지 |
Redis의 핵심은 원자적 연산과 TTL 기반 자동 만료로 복잡한 동시성 문제를 단순하게 해결하는 것입니다.