FastAPI Tutorial Part 10: Role-Based Access Control (RBAC)
Implement RBAC in FastAPI. Learn user roles, permissions, scopes, and granular access control for secure multi-tenant applications.
Moshiour Rahman
Advertisement
Understanding RBAC
Role-Based Access Control assigns permissions to roles, and roles to users. This provides a scalable way to manage access in applications.

Database Models
# app/models/rbac.py
from sqlalchemy import Column, Integer, String, ForeignKey, Table
from sqlalchemy.orm import relationship
from ..database import Base
# Association tables
user_roles = Table(
"user_roles",
Base.metadata,
Column("user_id", Integer, ForeignKey("users.id")),
Column("role_id", Integer, ForeignKey("roles.id"))
)
role_permissions = Table(
"role_permissions",
Base.metadata,
Column("role_id", Integer, ForeignKey("roles.id")),
Column("permission_id", Integer, ForeignKey("permissions.id"))
)
class Permission(Base):
__tablename__ = "permissions"
id = Column(Integer, primary_key=True)
name = Column(String(50), unique=True) # e.g., "users:read"
description = Column(String(200))
class Role(Base):
__tablename__ = "roles"
id = Column(Integer, primary_key=True)
name = Column(String(50), unique=True) # e.g., "admin"
description = Column(String(200))
permissions = relationship("Permission", secondary=role_permissions)
users = relationship("User", secondary=user_roles, back_populates="roles")
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True)
email = Column(String, unique=True)
username = Column(String, unique=True)
hashed_password = Column(String)
is_active = Column(Boolean, default=True)
roles = relationship("Role", secondary=user_roles, back_populates="users")
@property
def permissions(self) -> set:
perms = set()
for role in self.roles:
for perm in role.permissions:
perms.add(perm.name)
return perms
def has_permission(self, permission: str) -> bool:
return permission in self.permissions
def has_role(self, role_name: str) -> bool:
return any(r.name == role_name for r in self.roles)
Permission Checking
Permission Dependencies
# app/auth/permissions.py
from fastapi import Depends, HTTPException, status
from ..auth.oauth2 import get_current_active_user
from ..models.user import User
class PermissionChecker:
def __init__(self, required_permissions: list[str]):
self.required_permissions = required_permissions
def __call__(self, user: User = Depends(get_current_active_user)) -> User:
for permission in self.required_permissions:
if not user.has_permission(permission):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Permission denied: {permission} required"
)
return user
# Usage
require_read = PermissionChecker(["items:read"])
require_write = PermissionChecker(["items:write"])
require_admin = PermissionChecker(["admin"])
@app.get("/items")
def list_items(user: User = Depends(require_read)):
return get_items()
@app.post("/items")
def create_item(item: ItemCreate, user: User = Depends(require_write)):
return create_new_item(item)
Role Dependencies
def require_role(role_name: str):
def role_checker(user: User = Depends(get_current_active_user)) -> User:
if not user.has_role(role_name):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Role '{role_name}' required"
)
return user
return role_checker
@app.get("/admin/dashboard")
def admin_dashboard(admin: User = Depends(require_role("admin"))):
return {"message": "Admin area"}
Multiple Permissions (AND/OR)
def require_all_permissions(permissions: list[str]):
"""User must have ALL permissions."""
def checker(user: User = Depends(get_current_active_user)) -> User:
missing = [p for p in permissions if not user.has_permission(p)]
if missing:
raise HTTPException(
status_code=403,
detail=f"Missing permissions: {missing}"
)
return user
return checker
def require_any_permission(permissions: list[str]):
"""User must have AT LEAST ONE permission."""
def checker(user: User = Depends(get_current_active_user)) -> User:
if not any(user.has_permission(p) for p in permissions):
raise HTTPException(
status_code=403,
detail=f"Requires one of: {permissions}"
)
return user
return checker
# Usage
@app.delete("/items/{id}")
def delete_item(
id: int,
user: User = Depends(require_any_permission(["items:delete", "admin"]))
):
return delete_item_by_id(id)
Resource-Based Permissions
Owner Check
def require_owner_or_admin(resource_getter):
"""Check if user owns the resource or is admin."""
async def checker(
resource_id: int,
user: User = Depends(get_current_active_user),
db: Session = Depends(get_db)
):
resource = resource_getter(db, resource_id)
if not resource:
raise HTTPException(status_code=404, detail="Not found")
if resource.owner_id != user.id and not user.has_role("admin"):
raise HTTPException(status_code=403, detail="Not authorized")
return resource
return checker
get_item_if_authorized = require_owner_or_admin(get_item)
@app.put("/items/{resource_id}")
def update_item(
item_update: ItemUpdate,
item = Depends(get_item_if_authorized)
):
return update_item_in_db(item, item_update)
Complete RBAC System
# app/auth/rbac.py
from fastapi import Depends, HTTPException, status
from typing import List, Optional, Callable
from enum import Enum
from ..models.user import User
from .oauth2 import get_current_active_user
class Permission(str, Enum):
# Users
USERS_READ = "users:read"
USERS_WRITE = "users:write"
USERS_DELETE = "users:delete"
# Items
ITEMS_READ = "items:read"
ITEMS_WRITE = "items:write"
ITEMS_DELETE = "items:delete"
# Admin
ADMIN = "admin"
class Role(str, Enum):
ADMIN = "admin"
EDITOR = "editor"
VIEWER = "viewer"
ROLE_PERMISSIONS = {
Role.ADMIN: [p.value for p in Permission],
Role.EDITOR: [
Permission.ITEMS_READ.value,
Permission.ITEMS_WRITE.value,
Permission.USERS_READ.value,
],
Role.VIEWER: [
Permission.ITEMS_READ.value,
Permission.USERS_READ.value,
],
}
class RBACManager:
@staticmethod
def get_user_permissions(user: User) -> set:
permissions = set()
for role in user.roles:
if role.name in ROLE_PERMISSIONS:
permissions.update(ROLE_PERMISSIONS[Role(role.name)])
return permissions
@staticmethod
def has_permission(user: User, permission: str) -> bool:
return permission in RBACManager.get_user_permissions(user)
@staticmethod
def require_permissions(*permissions: str):
def dependency(user: User = Depends(get_current_active_user)):
user_perms = RBACManager.get_user_permissions(user)
missing = set(permissions) - user_perms
if missing:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail={
"message": "Insufficient permissions",
"required": list(permissions),
"missing": list(missing)
}
)
return user
return dependency
@staticmethod
def require_roles(*roles: str):
def dependency(user: User = Depends(get_current_active_user)):
user_roles = {r.name for r in user.roles}
if not user_roles.intersection(roles):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Requires role: {roles}"
)
return user
return dependency
# Convenience dependencies
RequireAdmin = RBACManager.require_roles(Role.ADMIN.value)
RequireEditor = RBACManager.require_roles(Role.ADMIN.value, Role.EDITOR.value)
RequireItemsRead = RBACManager.require_permissions(Permission.ITEMS_READ.value)
RequireItemsWrite = RBACManager.require_permissions(Permission.ITEMS_WRITE.value)
# app/routers/items.py
from ..auth.rbac import RequireItemsRead, RequireItemsWrite, RequireAdmin
router = APIRouter(prefix="/items", tags=["Items"])
@router.get("/")
def list_items(user: User = Depends(RequireItemsRead)):
return {"items": get_all_items()}
@router.post("/")
def create_item(item: ItemCreate, user: User = Depends(RequireItemsWrite)):
return create_new_item(item, owner_id=user.id)
@router.delete("/{item_id}")
def delete_item(item_id: int, user: User = Depends(RequireAdmin)):
return delete_item_by_id(item_id)
Scopes (OAuth2 Style)
from fastapi.security import OAuth2PasswordBearer, SecurityScopes
oauth2_scheme = OAuth2PasswordBearer(
tokenUrl="auth/login",
scopes={
"items:read": "Read items",
"items:write": "Create and modify items",
"users:read": "Read user profiles",
"admin": "Admin access"
}
)
def get_current_user(
security_scopes: SecurityScopes,
token: str = Depends(oauth2_scheme),
db: Session = Depends(get_db)
) -> User:
if security_scopes.scopes:
authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
else:
authenticate_value = "Bearer"
credentials_exception = HTTPException(
status_code=401,
detail="Could not validate credentials",
headers={"WWW-Authenticate": authenticate_value}
)
payload = decode_token(token)
if not payload:
raise credentials_exception
user = get_user_by_id(db, payload.get("sub"))
if not user:
raise credentials_exception
# Check scopes
token_scopes = payload.get("scopes", [])
for scope in security_scopes.scopes:
if scope not in token_scopes:
raise HTTPException(
status_code=403,
detail="Not enough permissions",
headers={"WWW-Authenticate": authenticate_value}
)
return user
# Usage with Security
from fastapi import Security
@app.get("/items")
def list_items(user: User = Security(get_current_user, scopes=["items:read"])):
return get_items()
@app.post("/items")
def create_item(
item: ItemCreate,
user: User = Security(get_current_user, scopes=["items:write"])
):
return create_new_item(item)
Summary
| Pattern | Use Case |
|---|---|
| Role check | Simple role verification |
| Permission check | Granular access control |
| Resource ownership | User owns the resource |
| Scopes | OAuth2-style permissions |
| Component | Description |
|---|---|
| Role | Named group of permissions |
| Permission | Single access right |
| Scope | OAuth2 permission string |
Next Steps
In Part 11, we’ll explore Background Tasks and Celery - handling long-running tasks asynchronously.
Series Navigation:
- Part 1-9: Foundations & Authentication
- Part 10: RBAC (You are here)
- Part 11: Background Tasks
Advertisement
Moshiour Rahman
Software Architect & AI Engineer
Enterprise software architect with deep expertise in financial systems, distributed architecture, and AI-powered applications. Building large-scale systems at Fortune 500 companies. Specializing in LLM orchestration, multi-agent systems, and cloud-native solutions. I share battle-tested patterns from real enterprise projects.
Related Articles
FastAPI Tutorial Part 18: API Security Best Practices
Secure your FastAPI application against common vulnerabilities. Learn input validation, rate limiting, CORS, and OWASP security patterns.
PythonFastAPI Tutorial Part 9: JWT Authentication - Secure Your API
Implement JWT authentication in FastAPI. Learn token generation, password hashing, OAuth2 flows, refresh tokens, and protecting API endpoints.
PythonFastAPI Tutorial Part 20: Building a Production-Ready API
Build a complete production-ready FastAPI application. Combine all concepts into a real-world e-commerce API with authentication, database, and deployment.
Comments
Comments are powered by GitHub Discussions.
Configure Giscus at giscus.app to enable comments.