본문으로 건너뛰기
Advertisement

Redis 활용

Redis는 인메모리 데이터 구조 저장소입니다. 캐싱, 세션, 큐, Pub/Sub 등 다양한 용도로 활용됩니다.


설치

pip install redis           # 동기
pip install redis[asyncio] # 비동기 (aioredis 내장)

기본 연결

import redis

# 동기 클라이언트
r = redis.Redis(
host="localhost",
port=6379,
db=0,
decode_responses=True, # bytes → str 자동 변환
)

# 연결 풀 (권장)
pool = redis.ConnectionPool(host="localhost", port=6379, db=0, decode_responses=True)
r = redis.Redis(connection_pool=pool)

# 비동기 클라이언트
import redis.asyncio as aioredis

async_r = aioredis.Redis(host="localhost", port=6379, db=0, decode_responses=True)

기본 자료구조

# STRING — 기본 키-값
r.set("user:1:name", "Alice")
r.set("user:1:score", 100, ex=3600) # ex=만료(초), px=밀리초
name = r.get("user:1:name") # "Alice"

r.incr("visit_count") # 원자적 증가
r.incrby("visit_count", 5)

# HASH — 딕셔너리
r.hset("user:1", mapping={"name": "Alice", "email": "alice@example.com", "age": 30})
user = r.hgetall("user:1") # 전체 조회
email = r.hget("user:1", "email") # 단일 필드
r.hincrby("user:1", "age", 1) # 숫자 필드 증가

# LIST — 연결 리스트 (큐/스택)
r.lpush("queue", "task1", "task2") # 왼쪽에 추가
r.rpush("queue", "task3") # 오른쪽에 추가
item = r.lpop("queue") # 왼쪽에서 꺼내기 (FIFO)
item = r.brpop("queue", timeout=5) # blocking pop (타임아웃)
items = r.lrange("queue", 0, -1) # 전체 조회

# SET — 중복 없는 집합
r.sadd("tags:post:1", "python", "backend", "api")
r.smembers("tags:post:1") # {'python', 'backend', 'api'}
r.sinter("tags:post:1", "tags:post:2") # 교집합
r.sunion("tags:post:1", "tags:post:2") # 합집합

# SORTED SET — 점수 기반 정렬 집합 (랭킹 구현)
r.zadd("leaderboard", {"Alice": 1500, "Bob": 1300, "Charlie": 1700})
top3 = r.zrevrange("leaderboard", 0, 2, withscores=True)
# [('Charlie', 1700.0), ('Alice', 1500.0), ('Bob', 1300.0)]
rank = r.zrevrank("leaderboard", "Alice") # 1 (0부터 시작)

캐싱 패턴

import json
from functools import wraps


# Cache-Aside 패턴
def get_product(product_id: int) -> dict:
cache_key = f"product:{product_id}"

# 1. 캐시 조회
cached = r.get(cache_key)
if cached:
return json.loads(cached)

# 2. DB 조회
product = db.query(Product).get(product_id)
if not product:
return None

data = {"id": product.id, "name": product.name, "price": product.price}

# 3. 캐시 저장 (5분)
r.set(cache_key, json.dumps(data), ex=300)
return data


# 캐시 데코레이터
def cache(key_prefix: str, ttl: int = 300):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
cache_key = f"{key_prefix}:{':'.join(str(a) for a in args)}"
cached = r.get(cache_key)
if cached:
return json.loads(cached)
result = func(*args, **kwargs)
r.set(cache_key, json.dumps(result), ex=ttl)
return result
return wrapper
return decorator


@cache("user", ttl=600)
def get_user(user_id: int) -> dict:
user = db.query(User).get(user_id)
return {"id": user.id, "name": user.name}


# 캐시 무효화 (패턴 삭제)
def invalidate_product_cache(product_id: int):
r.delete(f"product:{product_id}")
# 패턴 기반 삭제 (주의: 대량 키가 있으면 느림)
for key in r.scan_iter("product_list:*"):
r.delete(key)

Pub/Sub — 메시지 발행/구독

# 발행자 (Publisher)
def publish_event(channel: str, event: dict):
r.publish(channel, json.dumps(event))


# 사용자 가입 이벤트 발행
publish_event("user:events", {"type": "signup", "user_id": 42, "email": "user@example.com"})


# 구독자 (Subscriber) — 별도 스레드/프로세스에서 실행
def start_subscriber():
pubsub = r.pubsub()
pubsub.subscribe("user:events")

for message in pubsub.listen():
if message["type"] == "message":
event = json.loads(message["data"])
if event["type"] == "signup":
send_welcome_email(event["user_id"])


# 패턴 구독
pubsub.psubscribe("user:*") # user:로 시작하는 모든 채널

세션 저장소 패턴

import secrets
from datetime import timedelta


SESSION_TTL = 3600 # 1시간


def create_session(user_id: int) -> str:
session_id = secrets.token_urlsafe(32)
session_data = {
"user_id": user_id,
"created_at": str(datetime.utcnow()),
}
r.hset(f"session:{session_id}", mapping=session_data)
r.expire(f"session:{session_id}", SESSION_TTL)
return session_id


def get_session(session_id: str) -> dict | None:
data = r.hgetall(f"session:{session_id}")
if not data:
return None
# 세션 갱신 (슬라이딩 만료)
r.expire(f"session:{session_id}", SESSION_TTL)
return data


def delete_session(session_id: str):
r.delete(f"session:{session_id}")

Rate Limiting

def rate_limit(user_id: int, limit: int = 100, window: int = 3600) -> bool:
"""시간 창(window)당 limit 횟수 이내인지 확인"""
key = f"rate_limit:{user_id}"
current = r.incr(key)
if current == 1:
r.expire(key, window)
return current <= limit


# 사용 예
def api_view(request):
if not rate_limit(request.user.id, limit=100, window=3600):
return {"error": "Rate limit exceeded"}, 429
# ... 처리

정리

자료구조활용 사례
String키-값 캐시, 카운터, 토큰
Hash사용자 정보, 세션, 설정
List작업 큐, 최근 이력
Set태그, 팔로우 관계
Sorted Set랭킹, 타임라인
Pub/Sub이벤트 알림, 실시간 메시지

Redis의 핵심은 원자적 연산TTL 기반 자동 만료로 복잡한 동시성 문제를 단순하게 해결하는 것입니다.

Advertisement