Python AsyncIO: Master Asynchronous Programming
Learn Python async/await for concurrent programming. Master coroutines, tasks, event loops, and build high-performance async applications.
Moshiour Rahman
Advertisement
What is AsyncIO?
AsyncIO is Python’s built-in library for writing concurrent code using the async/await syntax. It enables efficient handling of I/O-bound operations without multi-threading complexity.
Sync vs Async
| Synchronous | Asynchronous |
|---|---|
| Blocking I/O | Non-blocking I/O |
| One task at a time | Multiple tasks concurrently |
| Simpler code | More scalable |
| Threads for concurrency | Single-threaded concurrency |
Getting Started
Basic Async Function
import asyncio
async def hello():
print("Hello")
await asyncio.sleep(1) # Non-blocking sleep
print("World")
# Run the coroutine
asyncio.run(hello())
Coroutines and Tasks
import asyncio
async def fetch_data(delay: int, data: str) -> str:
print(f"Fetching {data}...")
await asyncio.sleep(delay)
print(f"Got {data}")
return data
async def main():
# Sequential execution
result1 = await fetch_data(2, "data1")
result2 = await fetch_data(2, "data2")
# Total time: 4 seconds
# Concurrent execution with gather
results = await asyncio.gather(
fetch_data(2, "data1"),
fetch_data(2, "data2"),
fetch_data(2, "data3")
)
# Total time: 2 seconds
print(results) # ['data1', 'data2', 'data3']
asyncio.run(main())
Creating Tasks
import asyncio
async def background_task(name: str):
while True:
print(f"Task {name} running")
await asyncio.sleep(1)
async def main():
# Create task (starts running immediately)
task = asyncio.create_task(background_task("worker"))
# Do other work
await asyncio.sleep(3)
# Cancel task
task.cancel()
try:
await task
except asyncio.CancelledError:
print("Task cancelled")
asyncio.run(main())
Async Context Managers
async with
import asyncio
import aiofiles
class AsyncResource:
async def __aenter__(self):
print("Acquiring resource")
await asyncio.sleep(0.1)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
print("Releasing resource")
await asyncio.sleep(0.1)
async def do_work(self):
print("Working...")
async def main():
async with AsyncResource() as resource:
await resource.do_work()
# File operations with aiofiles
async with aiofiles.open("file.txt", "w") as f:
await f.write("Hello, async!")
asyncio.run(main())
Async Iterators
async for
import asyncio
class AsyncCounter:
def __init__(self, stop: int):
self.stop = stop
self.current = 0
def __aiter__(self):
return self
async def __anext__(self):
if self.current >= self.stop:
raise StopAsyncIteration
await asyncio.sleep(0.1)
self.current += 1
return self.current
async def main():
async for num in AsyncCounter(5):
print(num)
asyncio.run(main())
Async Generators
import asyncio
async def async_range(start: int, stop: int):
for i in range(start, stop):
await asyncio.sleep(0.1)
yield i
async def main():
async for num in async_range(1, 6):
print(num)
# Async comprehension
numbers = [num async for num in async_range(1, 6)]
print(numbers)
asyncio.run(main())
Concurrent Execution
asyncio.gather
import asyncio
async def fetch_user(user_id: int) -> dict:
await asyncio.sleep(1)
return {"id": user_id, "name": f"User {user_id}"}
async def main():
# Run multiple coroutines concurrently
users = await asyncio.gather(
fetch_user(1),
fetch_user(2),
fetch_user(3),
return_exceptions=True # Don't fail on first exception
)
print(users)
asyncio.run(main())
asyncio.wait
import asyncio
async def task(name: str, delay: int):
await asyncio.sleep(delay)
return f"Task {name} completed"
async def main():
tasks = [
asyncio.create_task(task("A", 2)),
asyncio.create_task(task("B", 1)),
asyncio.create_task(task("C", 3))
]
# Wait for first to complete
done, pending = await asyncio.wait(
tasks,
return_when=asyncio.FIRST_COMPLETED
)
for t in done:
print(t.result())
# Cancel remaining
for t in pending:
t.cancel()
asyncio.run(main())
asyncio.as_completed
import asyncio
async def fetch(url: str) -> str:
delay = len(url) % 3 + 1
await asyncio.sleep(delay)
return f"Data from {url}"
async def main():
urls = ["url1", "url2", "url3"]
tasks = [fetch(url) for url in urls]
# Process results as they complete
for coro in asyncio.as_completed(tasks):
result = await coro
print(result)
asyncio.run(main())
Timeouts
import asyncio
async def slow_operation():
await asyncio.sleep(10)
return "Done"
async def main():
# Using wait_for
try:
result = await asyncio.wait_for(
slow_operation(),
timeout=2.0
)
except asyncio.TimeoutError:
print("Operation timed out")
# Using timeout context manager (Python 3.11+)
async with asyncio.timeout(2.0):
result = await slow_operation()
# Using timeout_at for absolute deadline
deadline = asyncio.get_event_loop().time() + 2.0
async with asyncio.timeout_at(deadline):
result = await slow_operation()
asyncio.run(main())
Synchronization Primitives
Lock
import asyncio
class Counter:
def __init__(self):
self.value = 0
self._lock = asyncio.Lock()
async def increment(self):
async with self._lock:
current = self.value
await asyncio.sleep(0.01) # Simulate work
self.value = current + 1
async def main():
counter = Counter()
# Without lock: race condition
# With lock: safe concurrent access
await asyncio.gather(*[counter.increment() for _ in range(100)])
print(f"Counter: {counter.value}") # 100
asyncio.run(main())
Semaphore
import asyncio
async def fetch_with_limit(url: str, semaphore: asyncio.Semaphore):
async with semaphore:
print(f"Fetching {url}")
await asyncio.sleep(1)
return f"Data from {url}"
async def main():
# Limit concurrent operations to 3
semaphore = asyncio.Semaphore(3)
urls = [f"url{i}" for i in range(10)]
tasks = [fetch_with_limit(url, semaphore) for url in urls]
results = await asyncio.gather(*tasks)
print(results)
asyncio.run(main())
Event
import asyncio
async def waiter(event: asyncio.Event, name: str):
print(f"{name} waiting for event")
await event.wait()
print(f"{name} got the event!")
async def setter(event: asyncio.Event):
await asyncio.sleep(2)
print("Setting event")
event.set()
async def main():
event = asyncio.Event()
await asyncio.gather(
waiter(event, "Task 1"),
waiter(event, "Task 2"),
setter(event)
)
asyncio.run(main())
Queue
import asyncio
async def producer(queue: asyncio.Queue, n: int):
for i in range(n):
await asyncio.sleep(0.1)
await queue.put(f"item-{i}")
print(f"Produced item-{i}")
await queue.put(None) # Sentinel to stop consumer
async def consumer(queue: asyncio.Queue, name: str):
while True:
item = await queue.get()
if item is None:
queue.task_done()
break
print(f"{name} consumed {item}")
await asyncio.sleep(0.2)
queue.task_done()
async def main():
queue = asyncio.Queue(maxsize=5)
await asyncio.gather(
producer(queue, 10),
consumer(queue, "Consumer 1"),
consumer(queue, "Consumer 2")
)
asyncio.run(main())
HTTP Requests with aiohttp
import asyncio
import aiohttp
async def fetch(session: aiohttp.ClientSession, url: str) -> dict:
async with session.get(url) as response:
return await response.json()
async def fetch_all(urls: list[str]) -> list[dict]:
async with aiohttp.ClientSession() as session:
tasks = [fetch(session, url) for url in urls]
return await asyncio.gather(*tasks)
async def main():
urls = [
"https://api.example.com/users/1",
"https://api.example.com/users/2",
"https://api.example.com/users/3"
]
results = await fetch_all(urls)
print(results)
asyncio.run(main())
Database Operations
import asyncio
import asyncpg
async def main():
# Connect to PostgreSQL
pool = await asyncpg.create_pool(
host='localhost',
database='mydb',
user='user',
password='password',
min_size=5,
max_size=20
)
async with pool.acquire() as conn:
# Execute query
rows = await conn.fetch("SELECT * FROM users LIMIT 10")
for row in rows:
print(dict(row))
# Insert data
await conn.execute(
"INSERT INTO users (name, email) VALUES ($1, $2)",
"John", "john@example.com"
)
# Transaction
async with conn.transaction():
await conn.execute("UPDATE accounts SET balance = balance - 100 WHERE id = 1")
await conn.execute("UPDATE accounts SET balance = balance + 100 WHERE id = 2")
await pool.close()
asyncio.run(main())
Web Server with FastAPI
from fastapi import FastAPI
import asyncio
app = FastAPI()
async def fetch_user_data(user_id: int) -> dict:
await asyncio.sleep(0.1)
return {"id": user_id, "name": f"User {user_id}"}
async def fetch_user_orders(user_id: int) -> list:
await asyncio.sleep(0.1)
return [{"order_id": 1}, {"order_id": 2}]
@app.get("/users/{user_id}")
async def get_user(user_id: int):
# Fetch data concurrently
user, orders = await asyncio.gather(
fetch_user_data(user_id),
fetch_user_orders(user_id)
)
return {"user": user, "orders": orders}
@app.get("/users")
async def get_users():
user_ids = [1, 2, 3, 4, 5]
users = await asyncio.gather(*[
fetch_user_data(uid) for uid in user_ids
])
return users
Best Practices
Error Handling
import asyncio
async def risky_operation():
await asyncio.sleep(1)
raise ValueError("Something went wrong")
async def main():
# Handle individual task errors
try:
result = await risky_operation()
except ValueError as e:
print(f"Error: {e}")
# Handle errors in gather
results = await asyncio.gather(
risky_operation(),
asyncio.sleep(1),
return_exceptions=True
)
for result in results:
if isinstance(result, Exception):
print(f"Task failed: {result}")
else:
print(f"Task succeeded: {result}")
asyncio.run(main())
Task Groups (Python 3.11+)
import asyncio
async def main():
async with asyncio.TaskGroup() as tg:
task1 = tg.create_task(fetch_data(1))
task2 = tg.create_task(fetch_data(2))
task3 = tg.create_task(fetch_data(3))
# All tasks completed successfully
print(task1.result(), task2.result(), task3.result())
asyncio.run(main())
Summary
| Concept | Purpose |
|---|---|
async def | Define coroutine |
await | Pause and wait for result |
asyncio.gather | Run coroutines concurrently |
asyncio.create_task | Schedule coroutine execution |
asyncio.Lock | Synchronize access |
asyncio.Semaphore | Limit concurrency |
asyncio.Queue | Producer-consumer pattern |
AsyncIO enables efficient, scalable Python applications for I/O-bound workloads without multi-threading complexity.
Advertisement
Moshiour Rahman
Software Architect & AI Engineer
Enterprise software architect with deep expertise in financial systems, distributed architecture, and AI-powered applications. Building large-scale systems at Fortune 500 companies. Specializing in LLM orchestration, multi-agent systems, and cloud-native solutions. I share battle-tested patterns from real enterprise projects.
Related Articles
FastAPI Tutorial Part 17: Performance and Caching
Optimize FastAPI performance with caching, async operations, connection pooling, and profiling. Build blazing-fast APIs.
PythonFastAPI Tutorial Part 11: Background Tasks and Celery
Handle long-running operations in FastAPI. Learn built-in BackgroundTasks, Celery integration, task queues, and async processing patterns.
PythonClaude Computer Use: Build AI Agents That Control Your Screen
Master Claude's Computer Use API to build AI agents that see screens, click elements, and automate tasks. Complete Python tutorial with working code examples.
Comments
Comments are powered by GitHub Discussions.
Configure Giscus at giscus.app to enable comments.