FastAPI Tutorial Part 12: WebSockets - Real-Time Communication
Build real-time features with FastAPI WebSockets. Learn bidirectional communication, chat systems, live updates, and broadcasting patterns.
Moshiour Rahman
Advertisement
Understanding WebSockets
WebSockets enable persistent, bidirectional communication between client and server, unlike HTTP’s request-response model.
HTTP: Client --request--> Server --response--> Client (connection closes)
WebSocket: Client <========== Full Duplex ===========> Server (stays open)
Basic WebSocket
from fastapi import FastAPI, WebSocket
app = FastAPI()
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
while True:
data = await websocket.receive_text()
await websocket.send_text(f"Message received: {data}")
Client (JavaScript)
const ws = new WebSocket("ws://localhost:8000/ws");
ws.onopen = () => {
console.log("Connected");
ws.send("Hello Server!");
};
ws.onmessage = (event) => {
console.log("Received:", event.data);
};
ws.onclose = () => {
console.log("Disconnected");
};
Connection Management
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import List
app = FastAPI()
class ConnectionManager:
def __init__(self):
self.active_connections: List[WebSocket] = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)
async def send_personal(self, message: str, websocket: WebSocket):
await websocket.send_text(message)
async def broadcast(self, message: str):
for connection in self.active_connections:
await connection.send_text(message)
manager = ConnectionManager()
@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
await manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
await manager.broadcast(f"{client_id}: {data}")
except WebSocketDisconnect:
manager.disconnect(websocket)
await manager.broadcast(f"{client_id} left the chat")
Chat Room Example
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import Dict, Set
from pydantic import BaseModel
import json
app = FastAPI()
class Room:
def __init__(self):
self.connections: Dict[str, WebSocket] = {}
async def connect(self, user_id: str, websocket: WebSocket):
await websocket.accept()
self.connections[user_id] = websocket
await self.broadcast_system(f"{user_id} joined")
def disconnect(self, user_id: str):
if user_id in self.connections:
del self.connections[user_id]
async def broadcast(self, message: dict):
for websocket in self.connections.values():
await websocket.send_json(message)
async def broadcast_system(self, text: str):
await self.broadcast({"type": "system", "message": text})
async def send_to_user(self, user_id: str, message: dict):
if user_id in self.connections:
await self.connections[user_id].send_json(message)
rooms: Dict[str, Room] = {}
def get_room(room_id: str) -> Room:
if room_id not in rooms:
rooms[room_id] = Room()
return rooms[room_id]
@app.websocket("/chat/{room_id}/{user_id}")
async def chat_endpoint(
websocket: WebSocket,
room_id: str,
user_id: str
):
room = get_room(room_id)
await room.connect(user_id, websocket)
try:
while True:
data = await websocket.receive_json()
if data["type"] == "message":
await room.broadcast({
"type": "message",
"user": user_id,
"content": data["content"]
})
elif data["type"] == "private":
await room.send_to_user(data["to"], {
"type": "private",
"from": user_id,
"content": data["content"]
})
except WebSocketDisconnect:
room.disconnect(user_id)
await room.broadcast_system(f"{user_id} left")
Authentication
from fastapi import WebSocket, status, Query
from .auth.jwt import decode_token
async def get_ws_user(
websocket: WebSocket,
token: str = Query(...)
) -> User:
payload = decode_token(token)
if not payload:
await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
return None
user = get_user_by_id(payload["sub"])
return user
@app.websocket("/ws")
async def authenticated_ws(
websocket: WebSocket,
user: User = Depends(get_ws_user)
):
if not user:
return
await websocket.accept()
# Handle authenticated connection
Live Data Streaming
import asyncio
from datetime import datetime
@app.websocket("/ws/prices")
async def price_stream(websocket: WebSocket):
await websocket.accept()
try:
while True:
prices = get_current_prices()
await websocket.send_json({
"timestamp": datetime.now().isoformat(),
"prices": prices
})
await asyncio.sleep(1) # Send every second
except WebSocketDisconnect:
pass
@app.websocket("/ws/notifications/{user_id}")
async def notifications_stream(websocket: WebSocket, user_id: str):
await websocket.accept()
async def notification_listener():
while True:
notification = await get_user_notifications(user_id)
if notification:
await websocket.send_json(notification)
await asyncio.sleep(0.5)
try:
asyncio.create_task(notification_listener())
while True:
# Keep connection alive, handle client messages
data = await websocket.receive_text()
if data == "ping":
await websocket.send_text("pong")
except WebSocketDisconnect:
pass
Complete Example
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.staticfiles import StaticFiles
from typing import Dict, List
import json
app = FastAPI()
class ChatManager:
def __init__(self):
self.rooms: Dict[str, Dict[str, WebSocket]] = {}
async def join_room(self, room_id: str, user_id: str, ws: WebSocket):
await ws.accept()
if room_id not in self.rooms:
self.rooms[room_id] = {}
self.rooms[room_id][user_id] = ws
await self.broadcast_to_room(room_id, {
"type": "join",
"user": user_id,
"users": list(self.rooms[room_id].keys())
})
def leave_room(self, room_id: str, user_id: str):
if room_id in self.rooms and user_id in self.rooms[room_id]:
del self.rooms[room_id][user_id]
async def broadcast_to_room(self, room_id: str, message: dict):
if room_id in self.rooms:
for ws in self.rooms[room_id].values():
await ws.send_json(message)
manager = ChatManager()
@app.websocket("/ws/chat/{room_id}/{user_id}")
async def chat(websocket: WebSocket, room_id: str, user_id: str):
await manager.join_room(room_id, user_id, websocket)
try:
while True:
data = await websocket.receive_json()
await manager.broadcast_to_room(room_id, {
"type": "message",
"user": user_id,
"content": data.get("content", "")
})
except WebSocketDisconnect:
manager.leave_room(room_id, user_id)
await manager.broadcast_to_room(room_id, {
"type": "leave",
"user": user_id
})
Summary
| Feature | Description |
|---|---|
websocket.accept() | Accept connection |
websocket.receive_*() | Receive data |
websocket.send_*() | Send data |
WebSocketDisconnect | Handle disconnection |
| Pattern | Use Case |
|---|---|
| Connection Manager | Multi-client broadcast |
| Rooms | Group communication |
| Streaming | Live data updates |
Next Steps
In Part 13, we’ll explore Middleware and Events - intercepting requests and handling application lifecycle.
Series Navigation:
- Part 1-11: Foundations & Tasks
- Part 12: WebSockets (You are here)
- Part 13: Middleware & Events
Advertisement
Moshiour Rahman
Software Architect & AI Engineer
Enterprise software architect with deep expertise in financial systems, distributed architecture, and AI-powered applications. Building large-scale systems at Fortune 500 companies. Specializing in LLM orchestration, multi-agent systems, and cloud-native solutions. I share battle-tested patterns from real enterprise projects.
Related Articles
FastAPI Tutorial Part 20: Building a Production-Ready API
Build a complete production-ready FastAPI application. Combine all concepts into a real-world e-commerce API with authentication, database, and deployment.
PythonFastAPI Tutorial Part 19: OpenAPI and API Documentation
Create professional API documentation with FastAPI. Learn OpenAPI customization, Swagger UI, ReDoc, and documentation best practices.
PythonFastAPI Tutorial Part 16: Docker and Deployment
Deploy FastAPI applications to production. Learn Docker containerization, Docker Compose, cloud deployment, and production best practices.
Comments
Comments are powered by GitHub Discussions.
Configure Giscus at giscus.app to enable comments.