API Rate Limiting: Complete Implementation Guide
Master API rate limiting for production systems. Learn token bucket, sliding window, Redis-based limiting, and protect your APIs from abuse.
Moshiour Rahman
Advertisement
What is Rate Limiting?
Rate limiting controls how many requests a client can make to an API within a time window. It protects services from abuse, ensures fair usage, and maintains system stability.
Why Rate Limit?
| Purpose | Description |
|---|---|
| Prevent Abuse | Stop malicious requests |
| Fair Usage | Equal access for all users |
| Cost Control | Limit resource consumption |
| Stability | Prevent system overload |
Rate Limiting Algorithms
Fixed Window
import time
from typing import Tuple
import redis
class FixedWindowLimiter:
    """Fixed-window rate limiter backed by Redis.

    Requests are counted in discrete windows of ``window_seconds``.
    Cheap (one counter per key per window) but allows up to 2x
    ``max_requests`` in a burst straddling a window boundary — the
    classic fixed-window artifact.
    """

    def __init__(self, redis_client, max_requests: int, window_seconds: int):
        self.redis = redis_client
        self.max_requests = max_requests
        self.window_seconds = window_seconds

    def is_allowed(self, key: str) -> Tuple[bool, dict]:
        """Count one request for ``key`` and return (allowed, info).

        info contains ``limit``, ``remaining``, and ``reset`` (epoch
        seconds at which the current window ends).
        """
        current_window = int(time.time() // self.window_seconds)
        redis_key = f"rate_limit:{key}:{current_window}"
        # INCR and EXPIRE run together in one MULTI/EXEC transaction.
        # The original issued EXPIRE in a separate round-trip only after
        # the first INCR; a crash between the two left a counter key
        # with no TTL that lingered forever. Refreshing the TTL on every
        # hit is harmless because the key name changes each window.
        pipe = self.redis.pipeline()
        pipe.incr(redis_key)
        pipe.expire(redis_key, self.window_seconds)
        current = pipe.execute()[0]
        remaining = max(0, self.max_requests - current)
        reset_time = (current_window + 1) * self.window_seconds
        return current <= self.max_requests, {
            "limit": self.max_requests,
            "remaining": remaining,
            "reset": reset_time
        }
# Usage: requires a running Redis server (localhost:6379 by default).
r = redis.Redis()
# Allow at most 100 requests per 60-second window, per key.
limiter = FixedWindowLimiter(r, max_requests=100, window_seconds=60)
# The key can be any stable client identifier (user id, API key, IP).
allowed, info = limiter.is_allowed("user:123")
print(f"Allowed: {allowed}, Remaining: {info['remaining']}")
Sliding Window Log
import time
from typing import Tuple  # fix: is_allowed() below is annotated with Tuple

import redis
class SlidingWindowLogLimiter:
    """Sliding-window-log rate limiter backed by a Redis sorted set.

    Keeps one zset entry per request, scored by its timestamp, so the
    window slides smoothly instead of resetting at fixed boundaries.
    Most accurate of the classic algorithms, at the cost of one zset
    entry per request in the window.
    """

    def __init__(self, redis_client, max_requests: int, window_seconds: int):
        self.redis = redis_client
        self.max_requests = max_requests
        self.window_seconds = window_seconds

    def is_allowed(self, key: str) -> Tuple[bool, dict]:
        """Record one request for ``key`` and return (allowed, info)."""
        import uuid  # local: only needed to build a unique zset member

        now = time.time()
        window_start = now - self.window_seconds
        redis_key = f"rate_limit:swl:{key}"
        # The member must be unique per request. The original used
        # str(now) alone, so two requests landing on the same float
        # timestamp collapsed into a single zset entry and were
        # undercounted.
        member = f"{now}:{uuid.uuid4().hex}"
        pipe = self.redis.pipeline()
        # Remove entries that have slid out of the window.
        pipe.zremrangebyscore(redis_key, 0, window_start)
        # Count what remains in the window.
        pipe.zcard(redis_key)
        # Log this request (even when it ends up denied, so persistently
        # over-limit clients keep the window full — same behaviour as
        # the original).
        pipe.zadd(redis_key, {member: now})
        # Let an idle key expire on its own.
        pipe.expire(redis_key, self.window_seconds)
        results = pipe.execute()
        request_count = results[1]
        allowed = request_count < self.max_requests
        remaining = max(0, self.max_requests - request_count - 1)
        return allowed, {
            "limit": self.max_requests,
            "remaining": remaining,
            "reset": int(now + self.window_seconds)
        }
# Usage: requires a running Redis server.
limiter = SlidingWindowLogLimiter(redis.Redis(), max_requests=100, window_seconds=60)
Token Bucket
import time
from typing import Tuple  # fix: is_allowed() below is annotated with Tuple

import redis
class TokenBucketLimiter:
    """Token-bucket rate limiter backed by a Redis hash.

    Tokens refill continuously at ``refill_rate`` per second up to
    ``bucket_size``, so bursts up to the bucket size are absorbed while
    the sustained rate stays bounded.

    NOTE(review): the read-modify-write below is not atomic; two
    concurrent calls for the same key can both spend the same tokens.
    Use a Lua script (as in DistributedRateLimiter) where that matters.
    """

    def __init__(
        self,
        redis_client,
        bucket_size: int,
        refill_rate: float,  # tokens added per second
    ):
        self.redis = redis_client
        self.bucket_size = bucket_size
        self.refill_rate = refill_rate

    @staticmethod
    def _field(bucket_data: dict, name: str) -> float:
        """Read a hash field whether the client returns str or bytes keys.

        The original indexed ``bucket_data[b'tokens']`` only, which
        raises KeyError when the Redis client is created with
        ``decode_responses=True`` (as later examples in this file do).
        """
        value = bucket_data.get(name, bucket_data.get(name.encode()))
        return float(value)

    def is_allowed(self, key: str, tokens: int = 1) -> Tuple[bool, dict]:
        """Try to take ``tokens`` from the bucket; return (allowed, info)."""
        redis_key = f"rate_limit:tb:{key}"
        now = time.time()
        # Load the persisted bucket state, if any.
        bucket_data = self.redis.hgetall(redis_key)
        if bucket_data:
            last_update = self._field(bucket_data, 'last_update')
            current_tokens = self._field(bucket_data, 'tokens')
            # Refill proportionally to elapsed time, capped at the
            # bucket size.
            time_passed = now - last_update
            tokens_to_add = time_passed * self.refill_rate
            current_tokens = min(self.bucket_size, current_tokens + tokens_to_add)
        else:
            # First sighting of this key: start with a full bucket.
            current_tokens = self.bucket_size
        # Spend the tokens only if the bucket holds enough.
        if current_tokens >= tokens:
            current_tokens -= tokens
            allowed = True
        else:
            allowed = False
        # Persist the new level; the TTL merely garbage-collects idle keys.
        self.redis.hset(redis_key, mapping={
            'tokens': current_tokens,
            'last_update': now
        })
        self.redis.expire(redis_key, 3600)
        return allowed, {
            "limit": self.bucket_size,
            "remaining": int(current_tokens),
            "refill_rate": self.refill_rate
        }
# Usage - 100 tokens max, refills at 10 tokens/second, so bursts of up
# to 100 requests are absorbed. Requires a running Redis server.
limiter = TokenBucketLimiter(redis.Redis(), bucket_size=100, refill_rate=10)
Leaky Bucket
import time
import redis
from typing import Tuple
class LeakyBucketLimiter:
    """Leaky-bucket rate limiter backed by a Redis hash.

    Each request adds one unit of "water"; the bucket drains at
    ``leak_rate`` units per second. Requests are rejected while the
    bucket is full, smoothing throughput to at most ``leak_rate``
    sustained requests per second.

    NOTE(review): like TokenBucketLimiter, the read-modify-write is not
    atomic across concurrent callers.
    """

    def __init__(
        self,
        redis_client,
        bucket_size: int,
        leak_rate: float  # drained units (requests) per second
    ):
        self.redis = redis_client
        self.bucket_size = bucket_size
        self.leak_rate = leak_rate

    @staticmethod
    def _field(bucket_data: dict, name: str) -> float:
        """Read a hash field whether the client returns str or bytes keys.

        The original indexed ``bucket_data[b'water_level']`` only, which
        raises KeyError with ``decode_responses=True`` clients.
        """
        value = bucket_data.get(name, bucket_data.get(name.encode()))
        return float(value)

    def is_allowed(self, key: str) -> Tuple[bool, dict]:
        """Record one request for ``key`` and return (allowed, info)."""
        redis_key = f"rate_limit:lb:{key}"
        now = time.time()
        # Load the persisted bucket state, if any.
        bucket_data = self.redis.hgetall(redis_key)
        if bucket_data:
            last_update = self._field(bucket_data, 'last_update')
            water_level = self._field(bucket_data, 'water_level')
            # Drain proportionally to elapsed time, never below empty.
            time_passed = now - last_update
            leaked = time_passed * self.leak_rate
            water_level = max(0, water_level - leaked)
        else:
            water_level = 0
        # Admit the request only while the bucket has room.
        if water_level < self.bucket_size:
            water_level += 1
            allowed = True
        else:
            allowed = False
        # Persist the new level; the TTL merely garbage-collects idle keys.
        self.redis.hset(redis_key, mapping={
            'water_level': water_level,
            'last_update': now
        })
        self.redis.expire(redis_key, 3600)
        return allowed, {
            "limit": self.bucket_size,
            "current_level": int(water_level),
            "leak_rate": self.leak_rate
        }
FastAPI Integration
Middleware Implementation
import time
from typing import Tuple  # fix: RateLimitMiddleware._check_limit is annotated with Tuple

import redis
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse

app = FastAPI()
# decode_responses=True makes the client return str instead of bytes.
redis_client = redis.Redis(decode_responses=True)
class RateLimitMiddleware:
    """Pure-ASGI middleware that sliding-window rate limits every HTTP request.

    Clients are identified by the X-API-Key header when present, else by
    client IP. Over-limit requests are answered with 429 plus standard
    X-RateLimit-* headers; allowed requests get the same headers
    appended to the downstream response.
    """

    def __init__(
        self,
        app,
        max_requests: int = 100,
        window_seconds: int = 60
    ):
        self.app = app
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        # Shares the module-level client (created with decode_responses=True).
        self.redis = redis_client

    async def __call__(self, scope, receive, send):
        # Only HTTP requests are limited; lifespan/websocket pass through.
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return
        request = Request(scope, receive)
        # Get client identifier: prefer the API key header, fall back to
        # the client's IP address.
        client_ip = request.client.host
        api_key = request.headers.get("X-API-Key", "")
        identifier = api_key if api_key else client_ip
        # Check rate limit
        allowed, info = self._check_limit(identifier)
        if not allowed:
            # Over budget: short-circuit with 429, standard headers, and
            # a Retry-After hint in seconds.
            response = JSONResponse(
                status_code=429,
                content={"error": "Rate limit exceeded"},
                headers={
                    "X-RateLimit-Limit": str(info["limit"]),
                    "X-RateLimit-Remaining": str(info["remaining"]),
                    "X-RateLimit-Reset": str(info["reset"]),
                    "Retry-After": str(info["reset"] - int(time.time()))
                }
            )
            await response(scope, receive, send)
            return
        # Add rate limit headers to response
        async def send_wrapper(message):
            # Intercept only the response-start message, which carries
            # the status and header list.
            if message["type"] == "http.response.start":
                headers = list(message.get("headers", []))
                headers.extend([
                    (b"X-RateLimit-Limit", str(info["limit"]).encode()),
                    (b"X-RateLimit-Remaining", str(info["remaining"]).encode()),
                    (b"X-RateLimit-Reset", str(info["reset"]).encode()),
                ])
                message["headers"] = headers
            await send(message)
        await self.app(scope, receive, send_wrapper)

    def _check_limit(self, key: str) -> Tuple[bool, dict]:
        # Sliding-window log in a Redis sorted set, one pipeline round-trip.
        # NOTE(review): this snippet needs `from typing import Tuple` for
        # the annotation above to evaluate.
        # NOTE(review): the zset member is str(now); two requests with an
        # identical timestamp collapse into one entry and are undercounted.
        now = time.time()
        window_start = now - self.window_seconds
        redis_key = f"rate_limit:{key}"
        pipe = self.redis.pipeline()
        pipe.zremrangebyscore(redis_key, 0, window_start)
        pipe.zcard(redis_key)
        pipe.zadd(redis_key, {str(now): now})
        pipe.expire(redis_key, self.window_seconds)
        results = pipe.execute()
        count = results[1]
        return count < self.max_requests, {
            "limit": self.max_requests,
            "remaining": max(0, self.max_requests - count - 1),
            "reset": int(now + self.window_seconds)
        }
# Add middleware; add_middleware passes the wrapped ASGI app as the
# first constructor argument, the keyword arguments follow.
app.add_middleware(RateLimitMiddleware, max_requests=100, window_seconds=60)
Decorator-Based Limiting
import time  # fix: the rate_limit decorator below calls time.time()
from functools import wraps

import redis
from fastapi import FastAPI, Request, HTTPException, Depends

app = FastAPI()
redis_client = redis.Redis(decode_responses=True)
def rate_limit(max_requests: int, window_seconds: int):
    """Fixed-window rate-limit decorator for individual endpoints.

    Counts requests per (endpoint name, client IP) in Redis and raises
    HTTPException(429) once the window's budget is spent.
    """
    def decorator(func):
        @wraps(func)
        async def wrapper(request: Request, *args, **kwargs):
            # Local import: this snippet's module-level imports omit
            # time, which would otherwise raise NameError on first call.
            import time

            client_ip = request.client.host
            # One counter per endpoint per client per window.
            key = f"rate_limit:{func.__name__}:{client_ip}"
            current_window = int(time.time() // window_seconds)
            redis_key = f"{key}:{current_window}"
            current = redis_client.incr(redis_key)
            if current == 1:
                # First hit in this window: start the TTL clock.
                redis_client.expire(redis_key, window_seconds)
            if current > max_requests:
                raise HTTPException(
                    status_code=429,
                    detail="Rate limit exceeded"
                )
            return await func(request, *args, **kwargs)
        return wrapper
    return decorator
@app.get("/api/data")
@rate_limit(max_requests=10, window_seconds=60)  # strict: 10 requests/min per client
async def get_data(request: Request):
    return {"data": "limited endpoint"}

@app.get("/api/public")
@rate_limit(max_requests=100, window_seconds=60)  # looser: 100 requests/min per client
async def get_public(request: Request):
    return {"data": "less limited endpoint"}
Tiered Rate Limits
import time  # fix: TieredRateLimiter.check_limit below calls time.time()
from enum import Enum
from typing import Tuple  # fix: Tuple annotations used below

import redis
from fastapi import FastAPI, Request, Depends, HTTPException

app = FastAPI()
redis_client = redis.Redis(decode_responses=True)
# Subscription tiers, built with the Enum functional API; the str mixin
# (type=str) makes members compare equal to their plain string values.
UserTier = Enum(
    "UserTier",
    [
        ("FREE", "free"),
        ("BASIC", "basic"),
        ("PRO", "pro"),
        ("ENTERPRISE", "enterprise"),
    ],
    type=str,
)

# Per-tier request budgets; every tier uses the same one-hour window.
TIER_LIMITS = {
    tier: {"requests": budget, "window": 3600}
    for tier, budget in [
        (UserTier.FREE, 100),
        (UserTier.BASIC, 1000),
        (UserTier.PRO, 10000),
        (UserTier.ENTERPRISE, 100000),
    ]
}
class TieredRateLimiter:
    """Sliding-window-log limiter whose budget depends on the user's tier."""

    def __init__(self, redis_client):
        self.redis = redis_client

    def check_limit(self, user_id: str, tier: UserTier) -> Tuple[bool, dict]:
        """Record one request for ``user_id`` and return (allowed, info).

        Budget and window come from TIER_LIMITS[tier].
        """
        import uuid  # local: only needed to build a unique zset member

        limits = TIER_LIMITS[tier]
        key = f"rate_limit:{tier}:{user_id}"
        now = time.time()
        window_start = now - limits["window"]
        # Evict old entries, count the rest, log this request, and
        # refresh the TTL — all in one pipeline round-trip.
        pipe = self.redis.pipeline()
        pipe.zremrangebyscore(key, 0, window_start)
        pipe.zcard(key)
        # Unique member: the original used str(now), so two requests with
        # the same float timestamp collapsed into one entry and were
        # undercounted.
        pipe.zadd(key, {f"{now}:{uuid.uuid4().hex}": now})
        pipe.expire(key, limits["window"])
        results = pipe.execute()
        count = results[1]
        allowed = count < limits["requests"]
        remaining = max(0, limits["requests"] - count - 1)
        return allowed, {
            "tier": tier,
            "limit": limits["requests"],
            "remaining": remaining,
            "reset": int(now + limits["window"])
        }
# Single shared limiter instance used by the endpoint below.
limiter = TieredRateLimiter(redis_client)

async def get_user_tier(request: Request) -> Tuple[str, UserTier]:
    """Resolve the caller's (user_id, tier); stubbed for this example."""
    # In production, get from auth token or database
    api_key = request.headers.get("X-API-Key", "")
    # Lookup user tier from API key
    # NOTE(review): demo returns hard-coded values; api_key is read but
    # not used here.
    return "user_123", UserTier.BASIC
@app.get("/api/resource")
async def get_resource(
    request: Request,
    user_info: Tuple[str, UserTier] = Depends(get_user_tier)
):
    """Tier-limited endpoint: checks the caller's budget before serving."""
    user_id, tier = user_info
    allowed, info = limiter.check_limit(user_id, tier)
    if not allowed:
        # The 429 payload tells the client its tier, cap, and reset time.
        raise HTTPException(
            status_code=429,
            detail={
                "error": "Rate limit exceeded",
                "tier": tier,
                "limit": info["limit"],
                "reset": info["reset"]
            }
        )
    return {"data": "success", "rate_limit": info}
Distributed Rate Limiting
Redis Cluster
import time
from typing import Tuple  # fix: is_allowed() below is annotated with Tuple

import redis
from redis.cluster import RedisCluster
class DistributedRateLimiter:
    """Rate limiter for distributed systems using Redis Cluster."""

    def __init__(self, startup_nodes: list):
        # decode_responses=True: replies come back as str, not bytes.
        self.redis = RedisCluster(
            startup_nodes=startup_nodes,
            decode_responses=True
        )

    def is_allowed(
        self,
        key: str,
        max_requests: int,
        window_seconds: int
    ) -> Tuple[bool, dict]:
        # Sliding-window-log check; the whole remove/count/add sequence
        # runs inside one Lua script, so it is atomic even when many app
        # servers share the cluster. Note the member is built from
        # now .. '-' .. math.random() so concurrent same-timestamp
        # requests do not collide.
        # NOTE(review): this snippet needs `from typing import Tuple`
        # for the annotation above to evaluate.
        # Use Lua script for atomic operation
        lua_script = """
        local key = KEYS[1]
        local max_requests = tonumber(ARGV[1])
        local window = tonumber(ARGV[2])
        local now = tonumber(ARGV[3])
        -- Remove old entries
        redis.call('ZREMRANGEBYSCORE', key, 0, now - window)
        -- Get current count
        local count = redis.call('ZCARD', key)
        if count < max_requests then
            -- Add request
            redis.call('ZADD', key, now, now .. '-' .. math.random())
            redis.call('EXPIRE', key, window)
            return {1, max_requests - count - 1}
        else
            return {0, 0}
        end
        """
        now = time.time()
        # EVAL with one key: the script sees the prefixed key in KEYS[1]
        # and limit/window/now in ARGV.
        result = self.redis.eval(
            lua_script,
            1,  # number of keys
            f"rate_limit:{key}",
            max_requests,
            window_seconds,
            now
        )
        allowed = result[0] == 1
        remaining = result[1]
        return allowed, {
            "limit": max_requests,
            "remaining": remaining,
            "reset": int(now + window_seconds)
        }
Response Headers
from fastapi import FastAPI, Request, Response
from fastapi.middleware.base import BaseHTTPMiddleware
class RateLimitHeadersMiddleware(BaseHTTPMiddleware):
    """Copies rate-limit info stashed on ``request.state`` onto the
    outgoing response as X-RateLimit-* headers."""

    async def dispatch(self, request: Request, call_next):
        response = await call_next(request)
        # An upstream limiter may have stored its verdict on request.state.
        info = getattr(request.state, "rate_limit_info", None)
        if not info:
            return response
        header_values = {
            "X-RateLimit-Limit": str(info["limit"]),
            "X-RateLimit-Remaining": str(info["remaining"]),
            "X-RateLimit-Reset": str(info["reset"]),
            "X-RateLimit-Policy": info.get("policy", "sliding_window"),
        }
        for name, value in header_values.items():
            response.headers[name] = value
        return response
app = FastAPI()
# Register the middleware so every response passes through dispatch().
app.add_middleware(RateLimitHeadersMiddleware)
Summary
| Algorithm | Best For |
|---|---|
| Fixed Window | Simple, high performance |
| Sliding Window | Accurate, smooth |
| Token Bucket | Allowing bursts |
| Leaky Bucket | Smooth output |
Rate limiting protects APIs and ensures fair resource usage across all clients.
Advertisement
Moshiour Rahman
Software Architect & AI Engineer
Enterprise software architect with deep expertise in financial systems, distributed architecture, and AI-powered applications. Building large-scale systems at Fortune 500 companies. Specializing in LLM orchestration, multi-agent systems, and cloud-native solutions. I share battle-tested patterns from real enterprise projects.
Related Articles
FastAPI Tutorial Part 18: API Security Best Practices
Secure your FastAPI application against common vulnerabilities. Learn input validation, rate limiting, CORS, and OWASP security patterns.
PythonFastAPI Tutorial: Build Modern Python APIs
Master FastAPI for building high-performance Python APIs. Learn async endpoints, validation, authentication, database integration, and deployment.
PythonFastAPI Tutorial Part 20: Building a Production-Ready API
Build a complete production-ready FastAPI application. Combine all concepts into a real-world e-commerce API with authentication, database, and deployment.
Comments
Comments are powered by GitHub Discussions.
Configure Giscus at giscus.app to enable comments.