Redis Caching: Complete Guide to High-Performance Data Caching
Master Redis caching for web applications. Learn cache strategies, data structures, pub/sub, sessions, and build scalable caching solutions.
Moshiour Rahman
Advertisement
What is Redis?
Redis is an in-memory data store used as a cache, message broker, and database. Its speed and versatility make it essential for high-performance applications.
Redis Use Cases
| Use Case | Description |
|---|---|
| Caching | Store frequently accessed data |
| Sessions | User session management |
| Rate Limiting | API request throttling |
| Pub/Sub | Real-time messaging |
| Queues | Job and task queues |
Getting Started
Installation
# Docker (recommended)
docker run -d --name redis -p 6379:6379 redis:latest
# macOS
brew install redis
brew services start redis
# Ubuntu
sudo apt install redis-server
sudo systemctl start redis
# Python client
pip install redis
Basic Operations
import redis
# Connect to Redis (decode_responses=True returns str instead of bytes)
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
# String operations
r.set('name', 'John')
r.set('age', 30)  # stored as the string "30" -- Redis values are strings
r.set('session', 'abc123', ex=3600) # Expires in 1 hour (ex = TTL in seconds)
name = r.get('name')
print(f"Name: {name}")
# Check existence (returns the number of given keys that exist)
exists = r.exists('name')
print(f"Exists: {exists}")
# Delete keys
r.delete('name')
# Set multiple values in one round trip
r.mset({'key1': 'value1', 'key2': 'value2'})
# Get multiple values (None for keys that do not exist)
values = r.mget(['key1', 'key2'])
print(values)
# Increment/Decrement -- atomic, operate on integer-valued strings
r.set('counter', 0)
r.incr('counter') # 1
r.incrby('counter', 5) # 6
r.decr('counter') # 5
Data Structures
Lists
import redis
r = redis.Redis(decode_responses=True)
# Add to list
r.lpush('queue', 'task1', 'task2') # Left push: items pushed one at a time, so task2 ends up first
r.rpush('queue', 'task3') # Right push (append)
# Get the whole list (indexes 0 to -1 = first to last, inclusive)
items = r.lrange('queue', 0, -1)
print(items) # ['task2', 'task1', 'task3']
# Pop from list
task = r.lpop('queue') # Remove from left
task = r.rpop('queue') # Remove from right
# Blocking pop: waits up to `timeout` seconds for an item (for worker queues)
task = r.blpop('queue', timeout=5)
# List length
length = r.llen('queue')
# Get by index (0-based; negative indexes count from the right)
item = r.lindex('queue', 0)
Hashes
# Store user object as a hash (all field values are stored as strings)
r.hset('user:1', mapping={
    'name': 'John',
    'email': 'john@example.com',
    'age': 30
})
# Get single field
name = r.hget('user:1', 'name')
# Get all fields -- note that numbers come back as strings
user = r.hgetall('user:1')
print(user) # {'name': 'John', 'email': 'john@example.com', 'age': '30'}
# Update field
r.hset('user:1', 'age', 31)
# Increment a numeric field atomically
r.hincrby('user:1', 'age', 1)
# Delete field
r.hdel('user:1', 'email')
# Check field exists
exists = r.hexists('user:1', 'name')
Sets
# Add members to sets (duplicates are silently ignored)
r.sadd('tags:article1', 'python', 'redis', 'database')
r.sadd('tags:article2', 'python', 'django', 'web')
# Get all members (unordered)
tags = r.smembers('tags:article1')
print(tags) # {'python', 'redis', 'database'}
# Check membership (O(1))
is_member = r.sismember('tags:article1', 'python')
# Set algebra, computed server-side
intersection = r.sinter('tags:article1', 'tags:article2') # Common tags
union = r.sunion('tags:article1', 'tags:article2') # All tags
diff = r.sdiff('tags:article1', 'tags:article2') # In article1 but not article2
# Random member (does not remove it)
random_tag = r.srandmember('tags:article1')
# Remove member
r.srem('tags:article1', 'database')
Sorted Sets
# Leaderboard example: members are players, scores drive the ordering
r.zadd('leaderboard', {
    'player1': 100,
    'player2': 200,
    'player3': 150
})
# Top 3 players, highest score first (indexes 0..2 are inclusive)
top = r.zrevrange('leaderboard', 0, 2, withscores=True)
print(top) # [('player2', 200.0), ('player3', 150.0), ('player1', 100.0)]
# Player's rank in descending score order (0 = best)
rank = r.zrevrank('leaderboard', 'player1')
# Get player score (returned as float)
score = r.zscore('leaderboard', 'player1')
# Add 50 to player1's score atomically
r.zincrby('leaderboard', 50, 'player1')
# Players with score in [100, 200] -- both bounds inclusive
players = r.zrangebyscore('leaderboard', 100, 200)
Caching Strategies
Cache-Aside Pattern
import redis
import json
from typing import Optional, Any
from functools import wraps
r = redis.Redis(decode_responses=True)
def get_user_from_db(user_id: int) -> dict:
    """Stand-in for a real database lookup; returns a minimal user record."""
    return dict(id=user_id, name=f'User {user_id}')
def get_user(user_id: int) -> Optional[dict]:
    """Cache-aside read: serve from Redis when present, otherwise load
    from the database and populate the cache for one hour."""
    cache_key = f'user:{user_id}'
    hit = r.get(cache_key)
    if hit:
        # Cache hit -- payload is the JSON we wrote below
        return json.loads(hit)
    # Cache miss: fall through to the source of truth, then cache it
    user = get_user_from_db(user_id)
    r.set(cache_key, json.dumps(user), ex=3600)
    return user
# Decorator version
def cached(prefix: str, ttl: int = 3600):
    """Decorator applying the cache-aside pattern around a function.

    The cache key is built from *prefix* plus the stringified positional
    arguments AND keyword arguments (sorted by name for determinism).
    Fix: the original ignored kwargs, so two calls differing only in
    keyword arguments shared a single cache entry.

    Args:
        prefix: namespace prefix for the cache keys.
        ttl: cache entry lifetime in seconds (default 1 hour).
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            key_parts = [prefix, *map(str, args)]
            # Sort kwargs so 'f(a=1, b=2)' and 'f(b=2, a=1)' hit the same key
            key_parts += [f"{k}={kwargs[k]}" for k in sorted(kwargs)]
            cache_key = ":".join(key_parts)
            hit = r.get(cache_key)
            if hit:
                return json.loads(hit)
            result = func(*args, **kwargs)
            # Result must be JSON-serializable
            r.set(cache_key, json.dumps(result), ex=ttl)
            return result
        return wrapper
    return decorator
@cached('product', ttl=1800)
def get_product(product_id: int) -> dict:
    """Return a (simulated) product record; results cached for 30 minutes."""
    record = {'id': product_id}
    record['name'] = f'Product {product_id}'
    return record
Write-Through Cache
class WriteThoughCache:
    """Write-through cache: writes hit the database first, then the cache.

    NOTE(review): the class name is missing an 'r' ("Through"); it is kept
    for backward compatibility -- prefer the correctly spelled alias below.
    """

    def __init__(self, redis_client, db, default_ttl: int = 3600):
        self.redis = redis_client
        self.db = db
        # TTL applied when re-populating the cache on a read miss
        # (previously hard-coded to 3600 inside get()).
        self.default_ttl = default_ttl

    def set(self, key: str, value: Any, ttl: int = 3600):
        """Write to both cache and database.

        Database first: if the DB write fails, the cache never holds data
        the database lost.
        """
        self.db.save(key, value)
        self.redis.set(key, json.dumps(value), ex=ttl)

    def get(self, key: str) -> Optional[Any]:
        """Read from cache, falling back to (and re-populating from) the DB."""
        cached = self.redis.get(key)
        if cached:
            return json.loads(cached)
        value = self.db.get(key)
        if value:
            self.redis.set(key, json.dumps(value), ex=self.default_ttl)
        return value


# Correctly spelled alias; the misspelled name above is retained for callers.
WriteThroughCache = WriteThoughCache
Cache Invalidation
class CacheManager:
    """Helpers for evicting cached entries from Redis."""

    def __init__(self, redis_client):
        self.redis = redis_client

    def invalidate_key(self, key: str):
        """Remove one exact key."""
        self.redis.delete(key)

    def invalidate_pattern(self, pattern: str):
        """Remove every key matching *pattern* via incremental SCAN.

        SCAN avoids blocking the server the way KEYS would on large datasets.
        """
        cursor = None
        while cursor != 0:
            cursor, batch = self.redis.scan(cursor or 0, match=pattern, count=100)
            if batch:
                self.redis.delete(*batch)

    def invalidate_user_cache(self, user_id: int):
        """Drop every cache entry related to one user."""
        for pattern in (
            f'user:{user_id}',
            f'user:{user_id}:*',
            f'orders:user:{user_id}:*',
        ):
            self.invalidate_pattern(pattern)
Session Management
import redis
import uuid
import json
from datetime import timedelta
class SessionManager:
    """Redis-backed user sessions with a TTL that refreshes on access."""

    def __init__(self, redis_client, ttl: int = 86400):
        self.redis = redis_client
        self.ttl = ttl  # session lifetime in seconds (default: 24 hours)
        self.prefix = 'session:'

    def create_session(self, user_id: int, data: dict = None) -> str:
        """Create a new session for *user_id* and return its id."""
        # Fix: `datetime` was referenced but never imported in the original
        # (only `timedelta` was), which raised NameError at runtime.
        from datetime import datetime
        session_id = str(uuid.uuid4())
        session_data = {
            'user_id': user_id,
            'created_at': str(datetime.now()),
            **(data or {})
        }
        key = f'{self.prefix}{session_id}'
        self.redis.set(key, json.dumps(session_data), ex=self.ttl)
        return session_id

    def get_session(self, session_id: str) -> Optional[dict]:
        """Return session data, or None if missing/expired.

        Reading a session refreshes its TTL (sliding expiry).
        """
        key = f'{self.prefix}{session_id}'
        data = self.redis.get(key)
        if data:
            self.redis.expire(key, self.ttl)
            return json.loads(data)
        return None

    def update_session(self, session_id: str, data: dict):
        """Merge *data* into an existing session (no-op if the session is gone).

        NOTE(review): this read-modify-write is not atomic; concurrent
        updates to the same session can lose fields.
        """
        key = f'{self.prefix}{session_id}'
        current = self.get_session(session_id)
        if current:
            current.update(data)
            self.redis.set(key, json.dumps(current), ex=self.ttl)

    def destroy_session(self, session_id: str):
        """Delete a session immediately (e.g. on logout)."""
        key = f'{self.prefix}{session_id}'
        self.redis.delete(key)

    def get_user_sessions(self, user_id: int) -> list:
        """Return ids of all sessions belonging to *user_id*.

        O(total sessions): SCANs every session key and inspects its payload.
        Fine for admin tooling; keep a per-user index if this is hot-path.
        """
        sessions = []
        cursor = 0
        while True:
            cursor, keys = self.redis.scan(
                cursor, match=f'{self.prefix}*', count=100
            )
            for key in keys:
                data = self.redis.get(key)
                if data:
                    session = json.loads(data)
                    if session.get('user_id') == user_id:
                        sessions.append(key.replace(self.prefix, ''))
            if cursor == 0:
                break
        return sessions
Rate Limiting
Sliding Window Log
import redis
import time
class RateLimiter:
    """Redis-backed request throttling.

    Two strategies:
      * ``is_allowed``     -- sliding-window *log* (sorted set of timestamps).
      * ``sliding_window`` -- despite its name, a fixed-window counter;
        the name is kept for backward compatibility.
    """

    def __init__(self, redis_client):
        self.redis = redis_client

    def is_allowed(
        self,
        key: str,
        max_requests: int,
        window_seconds: int
    ) -> tuple[bool, int]:
        """Sliding-window-log check; returns (allowed, remaining).

        NOTE(review): the request timestamp is recorded even when the
        request is denied, so rejected traffic also consumes capacity.
        """
        now = time.time()
        window_start = now - window_seconds
        pipe = self.redis.pipeline()
        # Drop timestamps that fell out of the window
        pipe.zremrangebyscore(key, 0, window_start)
        # Count what remains inside the window
        pipe.zcard(key)
        # Record the current request (timestamp doubles as member and score)
        pipe.zadd(key, {str(now): now})
        # Let an idle key expire on its own
        pipe.expire(key, window_seconds)
        results = pipe.execute()
        request_count = results[1]  # zcard result, before this request was added
        if request_count < max_requests:
            return True, max_requests - request_count - 1
        return False, 0

    def sliding_window(
        self,
        key: str,
        max_requests: int,
        window_seconds: int
    ) -> bool:
        """Fixed-window counter check (see class docstring about the name)."""
        current = self.redis.incr(key)
        if current == 1:
            self.redis.expire(key, window_seconds)
        elif self.redis.ttl(key) == -1:
            # Repair: a crash between INCR and EXPIRE leaves the key without a
            # TTL (ttl == -1), throttling this caller forever. Re-arm it.
            self.redis.expire(key, window_seconds)
        return current <= max_requests
# FastAPI middleware example
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse

app = FastAPI()
limiter = RateLimiter(redis.Redis())

@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
    """Per-client-IP rate limit: 100 requests per 60-second window."""
    client_ip = request.client.host
    key = f"rate_limit:{client_ip}"
    allowed, remaining = limiter.is_allowed(key, max_requests=100, window_seconds=60)
    if not allowed:
        # Fix: exceptions raised inside an HTTP middleware bypass FastAPI's
        # exception handlers (they surface as a 500), so build and return the
        # 429 response directly instead of raising HTTPException.
        return JSONResponse({"detail": "Too many requests"}, status_code=429)
    response = await call_next(request)
    response.headers["X-RateLimit-Remaining"] = str(remaining)
    return response
Pub/Sub Messaging
import redis
import threading
import json
r = redis.Redis(decode_responses=True)

# Publisher: fan a JSON-encoded message out to all subscribers of `channel`.
def publish_message(channel: str, message: dict):
    r.publish(channel, json.dumps(message))

# Subscriber: blocks forever iterating messages -- run it on its own thread.
def subscribe_handler(channel: str, callback):
    pubsub = r.pubsub()
    pubsub.subscribe(channel)
    for message in pubsub.listen():
        # listen() also yields subscribe confirmations; only handle real messages
        if message['type'] == 'message':
            data = json.loads(message['data'])
            callback(data)

# Example usage
def on_message(data):
    print(f"Received: {data}")

# Start subscriber in a background daemon thread (won't block interpreter exit)
thread = threading.Thread(
    target=subscribe_handler,
    args=('notifications', on_message)
)
thread.daemon = True
thread.start()

# Publish messages -- only subscribers connected at publish time receive them
publish_message('notifications', {'type': 'alert', 'message': 'Hello!'})
FastAPI Integration
from fastapi import FastAPI, Depends
from contextlib import asynccontextmanager
import redis.asyncio as redis

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Open a shared async Redis client at startup; close it at shutdown."""
    app.state.redis = redis.Redis(decode_responses=True)
    yield
    await app.state.redis.close()

# Single app instance, wired to the lifespan handler.
# (Fix: the original created a throwaway FastAPI() first, then shadowed it
# with a second FastAPI(lifespan=...) -- the first instance was dead code.)
app = FastAPI(lifespan=lifespan)

async def get_redis():
    """FastAPI dependency returning the shared Redis client."""
    return app.state.redis

@app.get("/cached/{key}")
async def get_cached(key: str, r = Depends(get_redis)):
    """Read a cached value; `value` is None when the key is absent."""
    value = await r.get(key)
    return {"key": key, "value": value}

@app.post("/cache/{key}")
async def set_cached(key: str, value: str, ttl: int = 3600, r = Depends(get_redis)):
    """Store `value` under `key` with a TTL (seconds, default 1 hour)."""
    await r.set(key, value, ex=ttl)
    return {"status": "cached"}
Summary
| Feature | Command |
|---|---|
| Set value | r.set(key, value) |
| Get value | r.get(key) |
| Hash | r.hset(), r.hget() |
| List | r.lpush(), r.lpop() |
| Set | r.sadd(), r.smembers() |
| Sorted Set | r.zadd(), r.zrange() |
| Pub/Sub | r.publish(), r.pubsub().subscribe() |
Redis provides lightning-fast caching and data structures for building scalable applications.
Advertisement
Moshiour Rahman
Software Architect & AI Engineer
Enterprise software architect with deep expertise in financial systems, distributed architecture, and AI-powered applications. Building large-scale systems at Fortune 500 companies. Specializing in LLM orchestration, multi-agent systems, and cloud-native solutions. I share battle-tested patterns from real enterprise projects.
Related Articles
API Design Part 5: Caching Strategies
Master multi-layer caching architecture, HTTP cache headers, ETags, and cache invalidation patterns. Build fast, scalable APIs with proper caching.
System DesignAPI Design Part 3: Rate Limiting & Pagination
Master rate limiting algorithms, production Redis implementations, and cursor pagination. Protect your API from abuse while efficiently serving large datasets.
System DesignElasticsearch: Complete Full-Text Search Guide
Master Elasticsearch for full-text search. Learn indexing, queries, aggregations, and build powerful search applications with Python.
Comments
Comments are powered by GitHub Discussions.
Configure Giscus at giscus.app to enable comments.