Skip to main content
Advertisement

Redis Usage

Redis is an in-memory data structure store. It's used for caching, sessions, queues, Pub/Sub, and more.


Installation

pip install redis           # redis-py (async support built in since 4.2 via redis.asyncio)
pip install redis[hiredis]  # optional: hiredis C parser for faster protocol decoding

Basic Connection

import redis

# Synchronous client — connection is established lazily on first command.
r = redis.Redis(
    host="localhost",
    port=6379,
    db=0,
    decode_responses=True,  # auto convert bytes → str
)

# Connection pool (recommended): clients share sockets instead of
# opening a new connection per Redis() instance.
pool = redis.ConnectionPool(host="localhost", port=6379, db=0, decode_responses=True)
r = redis.Redis(connection_pool=pool)

# Async client — same API surface, but every command is awaitable.
import redis.asyncio as aioredis

async_r = aioredis.Redis(host="localhost", port=6379, db=0, decode_responses=True)

Basic Data Structures

# STRING — basic key-value
r.set("user:1:name", "Alice")
r.set("user:1:score", 100, ex=3600)  # ex=expiry(seconds), px=milliseconds
name = r.get("user:1:name")  # "Alice"

r.incr("visit_count")  # atomic increment (creates key at 1 if missing)
r.incrby("visit_count", 5)

# HASH — dictionary
r.hset("user:1", mapping={"name": "Alice", "email": "alice@example.com", "age": 30})
user = r.hgetall("user:1")  # get all fields
email = r.hget("user:1", "email")  # get single field
r.hincrby("user:1", "age", 1)  # increment numeric field

# LIST — linked list (queue/stack)
r.lpush("queue", "task1", "task2")  # add to left
r.rpush("queue", "task3")  # add to right
item = r.lpop("queue")  # pop from left (FIFO only when paired with rpush; lpush+lpop is LIFO)
item = r.brpop("queue", timeout=5)  # blocking pop from the right (with timeout)
items = r.lrange("queue", 0, -1)  # get all items

# SET — unique collection
r.sadd("tags:post:1", "python", "backend", "api")
r.smembers("tags:post:1")  # {'python', 'backend', 'api'}
r.sinter("tags:post:1", "tags:post:2")  # intersection
r.sunion("tags:post:1", "tags:post:2")  # union

# SORTED SET — score-based ordered collection (rankings)
r.zadd("leaderboard", {"Alice": 1500, "Bob": 1300, "Charlie": 1700})
top3 = r.zrevrange("leaderboard", 0, 2, withscores=True)
# [('Charlie', 1700.0), ('Alice', 1500.0), ('Bob', 1300.0)]
rank = r.zrevrank("leaderboard", "Alice")  # 1 (0-indexed, highest score first)

Caching Patterns

import json
from functools import wraps


# Cache-Aside pattern
def get_product(product_id: int) -> dict | None:
    """Cache-aside lookup of a product.

    Checks Redis first; on a miss, loads from the DB and caches the
    result for 5 minutes.

    Returns:
        The product as a plain dict, or None when no such product
        exists. (Annotation fixed: the original claimed ``dict`` but
        returned None on a miss.)
    """
    cache_key = f"product:{product_id}"

    # 1. Check cache
    cached = r.get(cache_key)
    if cached:
        return json.loads(cached)

    # 2. Cache miss → query DB
    product = db.query(Product).get(product_id)
    if not product:
        # NOTE(review): misses are not cached, so repeated lookups of a
        # missing id hit the DB every time — consider a short negative TTL.
        return None

    data = {"id": product.id, "name": product.name, "price": product.price}

    # 3. Store in cache (5 minutes)
    r.set(cache_key, json.dumps(data), ex=300)
    return data


# Cache decorator
def cache(key_prefix: str, ttl: int = 300):
    """Decorator that caches a function's JSON-serializable result in Redis.

    Args:
        key_prefix: namespace for the cache keys.
        ttl: entry lifetime in seconds (default 300).

    Bug fix: the cache key now includes keyword arguments. Previously
    two calls differing only in kwargs (e.g. f(1, page=1) vs
    f(1, page=2)) collided on the same key and served stale data.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            parts = [str(a) for a in args]
            # sorted() makes the key order-independent for kwargs
            parts += [f"{k}={kwargs[k]}" for k in sorted(kwargs)]
            cache_key = f"{key_prefix}:{':'.join(parts)}"
            cached = r.get(cache_key)
            if cached is not None:
                return json.loads(cached)
            result = func(*args, **kwargs)
            r.set(cache_key, json.dumps(result), ex=ttl)
            return result
        return wrapper
    return decorator


@cache("user", ttl=600)
def get_user(user_id: int) -> dict:
    """Return {"id", "name"} for *user_id*; result cached for 10 minutes."""
    user = db.query(User).get(user_id)
    return {"id": user.id, "name": user.name}


# Cache invalidation (pattern delete)
def invalidate_product_cache(product_id: int):
    """Drop the cached product and any cached product listings.

    SCAN-based pattern delete is still O(keyspace); fine for admin-path
    invalidation, avoid on hot request paths.
    """
    r.delete(f"product:{product_id}")
    # Pattern-based delete. Improvement over the original: collect the
    # matching keys and issue ONE DEL instead of one round trip per key.
    keys = list(r.scan_iter("product_list:*"))
    if keys:
        r.delete(*keys)

Pub/Sub — Message Publish/Subscribe

# Publisher
def publish_event(channel: str, event: dict):
    """Serialize *event* as JSON and publish it on *channel*."""
    payload = json.dumps(event)
    r.publish(channel, payload)


# Example: publish a user-signup event to every subscriber of "user:events"
publish_event("user:events", {"type": "signup", "user_id": 42, "email": "user@example.com"})


# Subscriber — runs in separate thread/process
def start_subscriber():
    """Blocking subscriber loop for the "user:events" channel.

    Run in a dedicated thread or process — pubsub.listen() never returns.
    """
    pubsub = r.pubsub()
    pubsub.subscribe("user:events")

    for message in pubsub.listen():
        # listen() also yields subscribe/unsubscribe confirmations; skip them
        if message["type"] != "message":
            continue
        event = json.loads(message["data"])
        if event["type"] == "signup":
            send_welcome_email(event["user_id"])


# Pattern subscription
# NOTE(review): `pubsub` is local to start_subscriber() above — this line
# only works where a PubSub object (r.pubsub()) is already in scope.
pubsub.psubscribe("user:*") # all channels starting with user:

Session Storage Pattern

import secrets
from datetime import datetime, timedelta, timezone


SESSION_TTL = 3600 # 1 hour


def create_session(user_id: int) -> str:
    """Create a server-side session hash in Redis and return its opaque id.

    Bug fix: the original called ``datetime.utcnow()`` but the file only
    imported ``timedelta``, so this raised NameError at runtime. Now uses
    a timezone-aware UTC timestamp (utcnow() is also deprecated).
    """
    session_id = secrets.token_urlsafe(32)  # 256-bit URL-safe token
    session_data = {
        "user_id": user_id,
        "created_at": datetime.now(timezone.utc).isoformat(),
    }
    # Pipeline: HSET + EXPIRE in one round trip, closing the window where
    # the hash exists without a TTL (a crash there leaked the session).
    key = f"session:{session_id}"
    pipe = r.pipeline()
    pipe.hset(key, mapping=session_data)
    pipe.expire(key, SESSION_TTL)
    pipe.execute()
    return session_id


def get_session(session_id: str) -> dict | None:
    """Fetch a session's data; returns None for unknown/expired sessions."""
    key = f"session:{session_id}"
    data = r.hgetall(key)
    if not data:
        return None
    # Sliding expiration: every successful read renews the TTL
    r.expire(key, SESSION_TTL)
    return data


def delete_session(session_id: str):
    """Log the session out by removing its Redis hash (no-op if absent)."""
    key = f"session:{session_id}"
    r.delete(key)

Rate Limiting

def rate_limit(user_id: int, limit: int = 100, window: int = 3600) -> bool:
    """Fixed-window rate limiter.

    Returns True while the caller is within *limit* requests per
    *window* seconds, False once the limit is exceeded.

    Bug fix: the original set the TTL only when the counter was first
    created (current == 1). If the process died between INCR and EXPIRE,
    the key lived forever and the user stayed throttled permanently.
    We now also repair a counter that exists without a TTL.
    """
    key = f"rate_limit:{user_id}"
    current = r.incr(key)  # atomic; creates the key at 1 if missing
    if current == 1 or r.ttl(key) == -1:  # -1 == key exists but has no TTL
        r.expire(key, window)
    return current <= limit


# Usage example
# Usage example
def api_view(request):
    """Reject the request with HTTP 429 once the caller exceeds 100 req/hour."""
    if not rate_limit(request.user.id, limit=100, window=3600):
        return {"error": "Rate limit exceeded"}, 429
    # ... handle request (body elided in this example)

Summary

| Data Structure | Use Cases |
| --- | --- |
| String | Key-value cache, counters, tokens |
| Hash | User info, sessions, config |
| List | Task queues, recent history |
| Set | Tags, follow relationships |
| Sorted Set | Rankings, timelines |
| Pub/Sub | Event notifications, real-time messages |

Redis's strength lies in atomic operations and TTL-based auto-expiration that simplify complex concurrency problems.

Advertisement