Apache Kafka: Event Streaming for Modern Applications
Master Apache Kafka for event-driven architectures. Learn how producers, consumers, topics, partitions, and Kafka Streams work, and build scalable data pipelines.
Moshiour Rahman
What is Apache Kafka?
Apache Kafka is a distributed event streaming platform used for building real-time data pipelines and streaming applications. It’s designed for high-throughput, fault-tolerant, and scalable messaging.
Use Cases
| Use Case | Example |
|---|---|
| Messaging | Microservices communication |
| Activity Tracking | User clickstreams |
| Metrics | Application monitoring |
| Log Aggregation | Centralized logging |
| Stream Processing | Real-time analytics |
| Event Sourcing | Order processing |
Core Concepts
Architecture Overview

Key Components
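- Producer: Publishes messages to topics
- Consumer: Subscribes to topics and reads messages
- Topic: Named category (feed) of messages, split into one or more partitions
- Partition: Ordered, append-only log; each message gets a sequential offset
- Broker: Kafka server node that stores partitions and serves clients
- Consumer Group: Set of consumers sharing the workload; each partition is read by one member of the group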
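To make these components concrete, here is a toy in-memory model. It is not how a broker is implemented. A topic holds a fixed number of partitions, and each partition is an append-only array whose index serves as the offset. Each consumer group keeps its own read position per partition. `ToyTopic`, `append`, and `poll` are hypothetical names used only for illustration.

```typescript
// Toy model of a topic: purely illustrative, no networking, persistence, or replication
interface LogRecord {
  key: string;
  value: string;
  offset: number; // position within its partition
}

class ToyTopic {
  private partitions: LogRecord[][];
  // groupId -> next offset to read, one entry per partition
  private groupOffsets = new Map<string, number[]>();

  constructor(numPartitions: number) {
    this.partitions = Array.from({ length: numPartitions }, (): LogRecord[] => []);
  }

  // Producer side: pick a partition from the key, then append to that partition's log
  append(key: string, value: string): { partition: number; offset: number } {
    const partition = this.partitionFor(key);
    const log = this.partitions[partition];
    const offset = log.length; // offsets are sequential within a partition
    log.push({ key, value, offset });
    return { partition, offset };
  }

  // Consumer side: each group reads independently from its own position
  poll(groupId: string, partition: number): LogRecord[] {
    let positions = this.groupOffsets.get(groupId);
    if (!positions) {
      positions = this.partitions.map(() => 0); // new group starts at the beginning
      this.groupOffsets.set(groupId, positions);
    }
    const records = this.partitions[partition].slice(positions[partition]);
    positions[partition] += records.length; // "commit" everything returned
    return records;
  }

  // Toy string hash; real clients use a stronger hash such as murmur2
  private partitionFor(key: string): number {
    let h = 0;
    for (let i = 0; i < key.length; i++) {
      h = (h * 31 + key.charCodeAt(i)) >>> 0;
    }
    return h % this.partitions.length;
  }
}
```

Two messages with the same key land in the same partition with consecutive offsets. Two different groups each read both messages.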
Getting Started
Docker Setup
# docker-compose.yml
version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  kafka:
    image: confluentinc/cp-kafka:7.4.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Address advertised to clients; localhost works for apps running on the host machine
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      # Single broker, so internal topics can only have one replica
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
Node.js with KafkaJS
npm install kafkajs
// kafka.ts
import { Kafka, Producer, Consumer, logLevel } from 'kafkajs';

// Exported so other modules can create admin clients and extra consumers
export const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:9092'],
  logLevel: logLevel.INFO
});

export const producer: Producer = kafka.producer();
export const consumer: Consumer = kafka.consumer({ groupId: 'my-group' });

export async function connectKafka() {
  await producer.connect();
  await consumer.connect();
  console.log('Kafka connected');
}

export async function disconnectKafka() {
  await producer.disconnect();
  await consumer.disconnect();
  console.log('Kafka disconnected');
}
Producers
Basic Producer
import { producer } from './kafka';

export interface OrderEvent {
  orderId: string;
  userId: string;
  items: { productId: string; quantity: number }[];
  total: number;
  timestamp: string; // ISO-8601 string: JSON has no Date type, so a Date would come back as a string
}

async function sendOrder(order: OrderEvent) {
  await producer.send({
    topic: 'orders',
    messages: [
      {
        key: order.orderId, // same key -> same partition -> ordered per order
        value: JSON.stringify(order),
        headers: {
          'event-type': 'order-created',
          'source': 'order-service'
        }
      }
    ]
  });
  console.log(`Order ${order.orderId} sent`);
}
Batch Producer
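import { CompressionTypes } from 'kafkajs';

async function sendBatch(orders: OrderEvent[]) {
  const messages = orders.map(order => ({
    key: order.orderId,
    value: JSON.stringify(order)
  }));
  await producer.send({
    topic: 'orders',
    messages
  });
  console.log(`Sent ${orders.length} orders`);
}

// With compression: the same batch, GZIP-compressed on the wire
async function sendCompressedBatch(orders: OrderEvent[]) {
  const messages = orders.map(order => ({
    key: order.orderId,
    value: JSON.stringify(order)
  }));
  await producer.send({
    topic: 'orders',
    compression: CompressionTypes.GZIP,
    messages
  });
}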
Producer with Acknowledgments
import { Kafka } from 'kafkajs';

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:9092']
});

const producer = kafka.producer({
  allowAutoTopicCreation: true,
  transactionTimeout: 30000 // only relevant for transactional producers
});

// Idempotent producer: the broker discards duplicates caused by producer retries,
// giving exactly-once writes per partition (not end-to-end exactly-once processing).
// Idempotence requires acks: -1, which is KafkaJS's default for send().
const idempotentProducer = kafka.producer({
  idempotent: true,
  maxInFlightRequests: 1
});

async function sendWithAck(topic: string, message: unknown) {
  const result = await producer.send({
    topic,
    messages: [{ value: JSON.stringify(message) }],
    acks: -1 // Wait for all in-sync replicas to acknowledge
  });
  console.log('Message sent:', result);
  return result;
}
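Conceptually, an idempotent producer is assigned a producer ID and attaches a per-partition sequence number to what it sends. The partition leader accepts only the next expected sequence and drops a retried copy of something it has already written. Below is a simplified model of that check. The `ToyPartitionLeader` class is hypothetical and ignores producer epochs, batching, and the broker's actual error codes.

```typescript
// Simplified model of broker-side duplicate detection for an idempotent producer
type AppendResult = 'appended' | 'duplicate';

class ToyPartitionLeader {
  readonly log: string[] = [];
  // producerId -> last sequence number written to this partition
  private lastSequence = new Map<number, number>();

  append(producerId: number, sequence: number, value: string): AppendResult {
    const last = this.lastSequence.get(producerId) ?? -1;
    if (sequence <= last) {
      // A retry of something already written: acknowledge it, but don't append again
      return 'duplicate';
    }
    if (sequence !== last + 1) {
      // A gap means an earlier send was lost; reject instead of writing out of order
      throw new Error(`out-of-order sequence ${sequence}, expected ${last + 1}`);
    }
    this.log.push(value);
    this.lastSequence.set(producerId, sequence);
    return 'appended';
  }
}
```

Suppose an acknowledgment is lost and the producer resends sequence 0. The second copy is recognized and not appended, so the log holds each message once.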
Consumers
Basic Consumer
import { consumer } from './kafka';

async function startConsumer() {
  await consumer.subscribe({
    topic: 'orders',
    fromBeginning: false // Used only if the group has no committed offset: start from the latest message
  });

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      const order: OrderEvent = JSON.parse(message.value!.toString());
      console.log({
        topic,
        partition,
        offset: message.offset,
        key: message.key?.toString(),
        order
      });
      // Process order
      await processOrder(order);
    }
  });
}

async function processOrder(order: OrderEvent) {
  console.log(`Processing order ${order.orderId}`);
  // Business logic here
}
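`message.value!` assumes every message has a payload. A null value (a tombstone) or malformed JSON would throw inside `eachMessage`. A more defensive decoder might look like the sketch below. `decodeOrder` and its validation rules are assumptions. The interface mirrors `OrderEvent` from the producer example, with `timestamp` typed as the string that JSON actually yields.

```typescript
interface OrderEvent {
  orderId: string;
  userId: string;
  items: { productId: string; quantity: number }[];
  total: number;
  timestamp: string; // ISO-8601 string after the JSON round trip
}

// Returns null for tombstones and for payloads that are not a usable OrderEvent.
// Accepts anything with toString(), such as the Buffer KafkaJS delivers.
function decodeOrder(value: { toString(): string } | null): OrderEvent | null {
  if (value === null) return null; // tombstone: nothing to process
  let parsed: unknown;
  try {
    parsed = JSON.parse(value.toString());
  } catch {
    return null; // malformed JSON
  }
  if (typeof parsed !== 'object' || parsed === null) return null;
  const o = parsed as Partial<OrderEvent>;
  // Minimal shape check on the fields the consumer relies on
  if (typeof o.orderId !== 'string' || typeof o.userId !== 'string' || !Array.isArray(o.items)) {
    return null;
  }
  return o as OrderEvent;
}
```

Inside `eachMessage` you would call `decodeOrder(message.value)`. A `null` result can then be logged or routed to a dead letter topic instead of crashing the handler.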
Batch Consumer
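await consumer.run({
  eachBatchAutoResolve: false, // offsets are resolved manually below, message by message
  eachBatch: async ({
    batch,
    resolveOffset,
    heartbeat,
    isRunning,
    isStale
  }) => {
    for (const message of batch.messages) {
      // Stop early on shutdown or after a rebalance; unresolved messages are fetched again
      if (!isRunning() || isStale()) break;
      const order = JSON.parse(message.value!.toString());
      await processOrder(order);
      resolveOffset(message.offset); // mark this message as processed
      await heartbeat(); // keep group membership alive during long batches
    }
  }
});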
Multiple Topics
await consumer.subscribe({
  topics: ['orders', 'payments', 'shipments'],
  fromBeginning: false
});

await consumer.run({
  eachMessage: async ({ topic, message }) => {
    switch (topic) {
      case 'orders':
        await handleOrder(message);
        break;
      case 'payments':
        await handlePayment(message);
        break;
      case 'shipments':
        await handleShipment(message);
        break;
    }
  }
});
Consumer Groups
// Consumers in the same group split the partitions: each partition goes to one member
const consumer1 = kafka.consumer({ groupId: 'order-processors' });
const consumer2 = kafka.consumer({ groupId: 'order-processors' });

// Different groups each receive every message independently
const analyticsConsumer = kafka.consumer({ groupId: 'analytics' });
const notificationConsumer = kafka.consumer({ groupId: 'notifications' });
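Within a group, each partition is owned by exactly one consumer at a time. A group can therefore keep at most as many consumers busy as the topic has partitions. The simplified round-robin assignment over a single topic below illustrates this. It is a sketch in the spirit of round-robin assignors, not KafkaJS's actual assignor code, and `assignRoundRobin` is a hypothetical name.

```typescript
// Assign partitions 0..numPartitions-1 to group members in round-robin order
function assignRoundRobin(members: string[], numPartitions: number): Map<string, number[]> {
  const sorted = [...members].sort(); // sort for a deterministic result
  const assignment = new Map<string, number[]>();
  for (const m of sorted) assignment.set(m, []);
  if (sorted.length === 0) return assignment; // nobody to assign to

  for (let p = 0; p < numPartitions; p++) {
    const owner = sorted[p % sorted.length];
    assignment.get(owner)!.push(p);
  }
  return assignment;
}
```

With a six-partition `orders` topic, two `order-processors` members get three partitions each. A seventh member would receive nothing and sit idle until a rebalance gives it work.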
Topics and Partitions
Topic Management
const admin = kafka.admin();

async function createTopics() {
  await admin.connect();
  try {
    await admin.createTopics({
      topics: [
        {
          topic: 'orders',
          numPartitions: 6,
          replicationFactor: 3, // needs at least 3 brokers; use 1 with the single-broker Docker setup
          configEntries: [
            { name: 'retention.ms', value: '604800000' }, // 7 days
            { name: 'cleanup.policy', value: 'delete' }
          ]
        },
        {
          topic: 'users',
          numPartitions: 3,
          replicationFactor: 3,
          configEntries: [
            { name: 'cleanup.policy', value: 'compact' } // Keep latest value per key
          ]
        }
      ]
    });
  } finally {
    await admin.disconnect();
  }
}

async function listTopics() {
  await admin.connect();
  try {
    const topics = await admin.listTopics();
    console.log('Topics:', topics);
  } finally {
    await admin.disconnect();
  }
}
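With `cleanup.policy=compact`, Kafka eventually keeps at least the latest record for each key instead of deleting records by age. The effect on a partition can be modelled as below. This is a simplified sketch: the broker compacts older segments in the background, never touches the active segment, and keeps tombstones for `delete.retention.ms` before removing them. `compact` is a hypothetical helper.

```typescript
interface KeyedRecord {
  offset: number;
  key: string;
  value: string | null; // null = tombstone (delete marker for the key)
}

// Keep only the latest record per key, preserving original offsets and order.
// Tombstones are dropped immediately here; the real broker retains them for a while first.
function compact(records: KeyedRecord[]): KeyedRecord[] {
  const latestOffset = new Map<string, number>();
  for (const r of records) latestOffset.set(r.key, r.offset);
  return records.filter(r => latestOffset.get(r.key) === r.offset && r.value !== null);
}
```

Compaction never renumbers records. A surviving record keeps its original offset, so a compacted partition has gaps in its offsets.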
Partition Assignment
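// Messages with the same key go to the same partition
await producer.send({
  topic: 'orders',
  messages: [
    { key: 'user-123', value: JSON.stringify(order1) },
    { key: 'user-123', value: JSON.stringify(order2) }, // Same partition as order1
    { key: 'user-456', value: JSON.stringify(order3) }  // Hashed separately; may or may not share a partition
  ]
});

// Custom partitioner
// Simple 32-bit string hash in the style of Java's String.hashCode, for illustration
function hashCode(s: string): number {
  let h = 0;
  for (let i = 0; i < s.length; i++) {
    h = (Math.imul(31, h) + s.charCodeAt(i)) | 0;
  }
  return h;
}

const customProducer = kafka.producer({
  createPartitioner: () => {
    return ({ topic, partitionMetadata, message }) => {
      // Map the key's hash onto one of the topic's partitions
      const key = message.key?.toString() || '';
      const index = Math.abs(hashCode(key)) % partitionMetadata.length;
      return partitionMetadata[index].partitionId;
    };
  }
});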
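Without a custom partitioner, KafkaJS 2.x's `DefaultPartitioner` is described as compatible with the Java client's default partitioner. For keyed messages, that means hashing the key bytes with murmur2, clearing the sign bit, and taking the result modulo the partition count. The port below sketches that scheme for illustration. Check it against known Java-client outputs before relying on it for cross-client compatibility. `murmur2` and `partitionForKey` are local names, not library exports.

```typescript
// murmur2 hash over bytes, following the Java client's algorithm (32-bit signed arithmetic)
function murmur2(data: Uint8Array): number {
  const length = data.length;
  const seed = 0x9747b28c;
  const m = 0x5bd1e995;
  const r = 24;
  let h = seed ^ length; // `^` coerces to a signed 32-bit int, like Java's int
  const length4 = Math.floor(length / 4);
  for (let i = 0; i < length4; i++) {
    const i4 = i * 4;
    // Little-endian 4-byte block
    let k = data[i4] | (data[i4 + 1] << 8) | (data[i4 + 2] << 16) | (data[i4 + 3] << 24);
    k = Math.imul(k, m);
    k ^= k >>> r;
    k = Math.imul(k, m);
    h = Math.imul(h, m);
    h ^= k;
  }
  // Mix in the remaining 1-3 bytes (the Java version uses switch fall-through)
  const tail = length & ~3;
  const rem = length % 4;
  if (rem === 3) h ^= data[tail + 2] << 16;
  if (rem >= 2) h ^= data[tail + 1] << 8;
  if (rem >= 1) {
    h ^= data[tail];
    h = Math.imul(h, m);
  }
  h ^= h >>> 13;
  h = Math.imul(h, m);
  h ^= h >>> 15;
  return h;
}

// Clear the sign bit (like Java's Utils.toPositive), then take the modulus
function partitionForKey(key: string, numPartitions: number): number {
  const bytes = new TextEncoder().encode(key); // UTF-8 key bytes
  return (murmur2(bytes) & 0x7fffffff) % numPartitions;
}
```

The result depends on `numPartitions`. Adding partitions to an existing topic therefore changes where keys map, and per-key ordering is not preserved across that change.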
Error Handling
Producer Error Handling
producer.on('producer.connect', () => {
  console.log('Producer connected');
});

producer.on('producer.disconnect', () => {
  console.log('Producer disconnected');
});

// Instrumentation listeners receive an event object; details are in `payload`
producer.on('producer.network.request_timeout', (event) => {
  console.error('Request timeout:', event.payload);
});

// KafkaJS already retries retriable errors internally (the client's `retry` option);
// this wrapper adds an application-level retry with linear backoff (1s, 2s, ...)
async function sendWithRetry(topic: string, message: unknown, retries = 3) {
  for (let attempt = 1; attempt <= retries; attempt++) {
    try {
      return await producer.send({
        topic,
        messages: [{ value: JSON.stringify(message) }]
      });
    } catch (error) {
      console.error(`Attempt ${attempt} failed:`, error);
      if (attempt === retries) throw error;
      await new Promise(r => setTimeout(r, 1000 * attempt));
    }
  }
}
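The loop above waits 1 s, then 2 s, between attempts (linear backoff). A common alternative is exponential backoff with "full jitter," which spreads out retries so that clients that failed together don't retry in lockstep. Below is a generic sketch. `retryWithBackoff` and `BackoffOptions` are hypothetical names, not part of KafkaJS.

```typescript
interface BackoffOptions {
  retries: number;     // total attempts, including the first
  baseDelayMs: number; // delay ceiling before the 2nd attempt
  maxDelayMs: number;  // cap on any single delay
}

const sleep = (ms: number) => new Promise<void>(resolve => setTimeout(resolve, ms));

// Retries `operation` with exponential backoff and full jitter:
// before attempt n+1 it waits a random time in [0, min(maxDelayMs, baseDelayMs * 2^(n-1))).
async function retryWithBackoff<T>(
  operation: (attempt: number) => Promise<T>,
  { retries, baseDelayMs, maxDelayMs }: BackoffOptions
): Promise<T> {
  for (let attempt = 1; ; attempt++) {
    try {
      return await operation(attempt);
    } catch (error) {
      if (attempt >= retries) throw error; // out of attempts: surface the last error
      const ceiling = Math.min(maxDelayMs, baseDelayMs * 2 ** (attempt - 1));
      await sleep(Math.random() * ceiling);
    }
  }
}
```

Wrapping a send would look like `retryWithBackoff(() => producer.send({ topic, messages }), { retries: 5, baseDelayMs: 200, maxDelayMs: 5000 })`. Those delay values are placeholders, not recommendations.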
Consumer Error Handling
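await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    try {
      await processMessage(message);
    } catch (error) {
      console.error('Processing error:', error);
      // `error` is `unknown` in TypeScript, so extract a message safely
      const reason = error instanceof Error ? error.message : String(error);
      // Send to dead letter queue
      await producer.send({
        topic: `${topic}.dlq`,
        messages: [{
          key: message.key,
          value: message.value,
          headers: {
            ...message.headers,
            'error': reason,
            'original-topic': topic,
            'failed-at': new Date().toISOString()
          }
        }]
      });
    }
  }
});

// Handle crashes: the listener receives an event object; details are in `payload`
consumer.on(consumer.events.CRASH, ({ payload }) => {
  console.error('Consumer crashed:', payload.error);
  // payload.restart is a boolean: whether KafkaJS will restart the consumer itself
  // (controlled by the consumer's `retry.restartOnFailure` option)
  if (!payload.restart) {
    console.error('Consumer will not restart; manual intervention needed');
  }
});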
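Sending every failure straight to the DLQ treats transient errors (such as a timeout in a downstream service) the same as permanent ones (such as a malformed payload). A common refinement counts attempts in a header and republishes the message to a retry topic until a limit is reached. The sketch below only decides where a failed message should go. The `.retry` suffix, the `retry-count` header, and `routeFailure` are assumptions, not KafkaJS features. Header values are shown as strings; KafkaJS delivers consumed header values as Buffers, so you would call `.toString()` on them first.

```typescript
type HeaderValue = string | undefined;

interface FailedMessage {
  topic: string;
  headers: Record<string, HeaderValue>;
}

interface Route {
  topic: string;
  headers: Record<string, string>;
}

// Decide where a message that failed processing should be republished
function routeFailure(msg: FailedMessage, error: unknown, maxRetries: number): Route {
  const previous = Number(msg.headers['retry-count'] ?? '0');
  const retryCount = Number.isFinite(previous) ? previous + 1 : 1;
  const reason = error instanceof Error ? error.message : String(error);
  // Track the base topic so "orders.retry" keeps retrying into the same retry topic
  const originalTopic = msg.headers['original-topic'] ?? msg.topic;
  const headers = {
    'retry-count': String(retryCount),
    'original-topic': originalTopic,
    'error': reason,
  };
  return retryCount <= maxRetries
    ? { topic: `${originalTopic}.retry`, headers }
    : { topic: `${originalTopic}.dlq`, headers };
}
```

A separate consumer on `orders.retry` would reprocess these messages, ideally after a delay. Once `maxRetries` is exceeded, the message lands in `orders.dlq` for inspection.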
Event-Driven Microservices
Order Service
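// order-service.ts
class OrderService {
  constructor(private readonly orderRepository: OrderRepository) {}

  async createOrder(orderData: CreateOrderDTO): Promise<Order> {
    const order = await this.orderRepository.create(orderData);

    // Publish event. The DB write and this send are not atomic: if the send fails,
    // the order exists but no event is published (the transactional outbox pattern addresses this)
    await producer.send({
      topic: 'orders',
      messages: [{
        key: order.id,
        value: JSON.stringify({
          type: 'ORDER_CREATED',
          data: order,
          timestamp: new Date().toISOString()
        })
      }]
    });

    return order;
  }
}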
Inventory Service
// inventory-service.ts
// `consumer` should use a service-specific groupId (e.g. 'inventory-service')
// so this service receives every order event, independently of other services
async function startInventoryConsumer() {
  await consumer.subscribe({ topic: 'orders' });

  await consumer.run({
    eachMessage: async ({ message }) => {
      const event = JSON.parse(message.value!.toString());

      switch (event.type) {
        case 'ORDER_CREATED':
          await reserveInventory(event.data);
          break;
        case 'ORDER_CANCELLED':
          await releaseInventory(event.data);
          break;
      }
    }
  });
}

async function reserveInventory(order: Order) {
  for (const item of order.items) {
    await inventoryRepository.reserve(item.productId, item.quantity);
  }

  // Publish inventory reserved event
  await producer.send({
    topic: 'inventory',
    messages: [{
      key: order.id,
      value: JSON.stringify({
        type: 'INVENTORY_RESERVED',
        orderId: order.id,
        timestamp: new Date().toISOString()
      })
    }]
  });
}
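The services above pass events as untyped JSON and branch on `event.type`. In TypeScript, a discriminated union lets the compiler check that every event type is handled. Below is a minimal sketch, assuming the `{ type, data, timestamp }` envelope that `OrderService` publishes. The payload fields, `OrderTopicEvent`, and `handleOrderEvent` are illustrative.

```typescript
interface OrderItem {
  productId: string;
  quantity: number;
}

interface OrderData {
  id: string;
  items: OrderItem[];
}

// One variant per event type published to the `orders` topic
type OrderTopicEvent =
  | { type: 'ORDER_CREATED'; data: OrderData; timestamp: string }
  | { type: 'ORDER_CANCELLED'; data: OrderData; timestamp: string };

// Returns a description of the action; a real service would call its inventory logic here
function handleOrderEvent(event: OrderTopicEvent): string {
  switch (event.type) {
    case 'ORDER_CREATED':
      return `reserve ${event.data.items.length} item(s) for ${event.data.id}`;
    case 'ORDER_CANCELLED':
      return `release items for ${event.data.id}`;
    default: {
      // Exhaustiveness check: adding a variant without a case fails to type-check
      const unhandled: never = event;
      throw new Error(`Unhandled event: ${JSON.stringify(unhandled)}`);
    }
  }
}
```

The union only describes the expected shape. The output of `JSON.parse` still needs a runtime check before it can safely be treated as an `OrderTopicEvent`.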
Saga Pattern
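// saga-orchestrator.ts
interface SagaStep {
  execute: () => Promise<void>;
  compensate: () => Promise<void>;
}

abstract class OrderSaga {
  private completedSteps: SagaStep[] = [];

  // Service calls supplied by a concrete subclass (e.g. by sending commands over Kafka)
  protected abstract reserveInventory(order: Order): Promise<void>;
  protected abstract releaseInventory(order: Order): Promise<void>;
  protected abstract processPayment(order: Order): Promise<void>;
  protected abstract refundPayment(order: Order): Promise<void>;
  protected abstract createShipment(order: Order): Promise<void>;
  protected abstract cancelShipment(order: Order): Promise<void>;

  async run(order: Order) {
    // Reset so a reused instance never compensates steps from an earlier run
    this.completedSteps = [];
    const steps: SagaStep[] = [
      {
        execute: () => this.reserveInventory(order),
        compensate: () => this.releaseInventory(order)
      },
      {
        execute: () => this.processPayment(order),
        compensate: () => this.refundPayment(order)
      },
      {
        execute: () => this.createShipment(order),
        compensate: () => this.cancelShipment(order)
      }
    ];

    try {
      for (const step of steps) {
        await step.execute();
        this.completedSteps.push(step);
      }
    } catch (error) {
      console.error('Saga failed, compensating...', error);
      await this.compensate();
      throw error;
    }
  }

  private async compensate() {
    // Undo completed steps in reverse order (copy first: reverse() mutates in place)
    for (const step of [...this.completedSteps].reverse()) {
      await step.compensate();
    }
  }
}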
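`OrderSaga` depends on real service calls. Below is a self-contained version of the same execute/compensate loop with in-memory steps. It shows that a failure in the third step undoes the first two, in reverse order. `runSaga` and the step names are illustrative.

```typescript
interface SagaStep {
  execute: () => Promise<void>;
  compensate: () => Promise<void>;
}

// Run steps in order; on failure, compensate the completed ones in reverse, then rethrow
async function runSaga(steps: SagaStep[]): Promise<void> {
  const completed: SagaStep[] = [];
  try {
    for (const step of steps) {
      await step.execute();
      completed.push(step);
    }
  } catch (error) {
    for (const step of [...completed].reverse()) {
      await step.compensate();
    }
    throw error;
  }
}
```

In an event-driven saga, each `execute` would typically publish a command and wait for a reply event rather than call a function directly. Compensations should be idempotent, because they may themselves be retried.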
Kafka Streams (Java)

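// For reference: Kafka Streams is a JVM (Java/Scala) library
import java.util.Properties;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-processor");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Assumed: a Serde<Order> (e.g. JSON-based) supplied by your application
Serde<Order> orderSerde = createOrderSerde(); // hypothetical helper
StreamsBuilder builder = new StreamsBuilder();
// Read from topic with explicit key/value serdes
KStream<String, Order> orders = builder.stream("orders", Consumed.with(Serdes.String(), orderSerde));
// Re-key by user ID (this triggers a repartition) and count orders per user
KTable<String, Long> orderCounts = orders
    .groupBy((key, order) -> order.getUserId(), Grouped.with(Serdes.String(), orderSerde))
    .count();
// Write count updates to a topic
orderCounts.toStream().to("order-counts", Produced.with(Serdes.String(), Serdes.Long()));
KafkaStreams streams = new KafkaStreams(builder.build(), props);
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
streams.start();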
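To see what that topology computes without a JVM, here is the same aggregation over an in-memory list in TypeScript. It re-keys each order by `userId`, keeps a running count, and emits each updated count, much as `orderCounts.toStream()` emits KTable updates. In practice, Kafka Streams' record caching may coalesce some of those updates. This is a conceptual sketch only: it has none of Kafka Streams' state stores, repartitioning, or fault tolerance, and `countByUser` is a hypothetical name.

```typescript
interface Order {
  orderId: string;
  userId: string;
}

interface CountUpdate {
  key: string;   // userId
  value: number; // running count of orders for that user
}

function countByUser(orders: Order[]): CountUpdate[] {
  const counts = new Map<string, number>(); // stands in for the KTable's state store
  const updates: CountUpdate[] = [];
  for (const order of orders) {
    const next = (counts.get(order.userId) ?? 0) + 1;
    counts.set(order.userId, next);
    updates.push({ key: order.userId, value: next }); // one changelog record per update
  }
  return updates;
}
```

For orders from `u1`, `u2`, and then `u1` again, the output stream is `u1=1`, `u2=1`, `u1=2`. A downstream reader keeps the latest value per key to get the current counts.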
Monitoring
Consumer Lag
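const admin = kafka.admin();

async function getConsumerLag(groupId: string) {
  await admin.connect();
  try {
    // KafkaJS 2.x: fetchOffsets takes `topics` and returns one entry per topic
    const [orderOffsets] = await admin.fetchOffsets({ groupId, topics: ['orders'] });
    // Earliest (low) and latest (high) offsets for each partition
    const topicOffsets = await admin.fetchTopicOffsets('orders');

    return orderOffsets.partitions.map(p => {
      const topicOffset = topicOffsets.find(to => to.partition === p.partition);
      const committed = Number(p.offset); // -1 if the group has not committed yet
      const latest = Number(topicOffset?.high ?? '0');
      const earliest = Number(topicOffset?.low ?? '0');
      return {
        partition: p.partition,
        currentOffset: committed,
        latestOffset: latest,
        // Nothing committed yet: count every retained message (worst case)
        lag: committed < 0 ? latest - earliest : latest - committed
      };
    });
  } finally {
    await admin.disconnect();
  }
}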
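The lag arithmetic can be tested in isolation from the admin client. KafkaJS returns offsets as strings, because Kafka offsets are 64-bit. A committed offset of `-1` means the group has not committed anything for that partition yet; in that case this sketch counts every message still retained. `computeLag` and its parameter shapes are hypothetical, modelled loosely on what `fetchOffsets` and `fetchTopicOffsets` return.

```typescript
interface PartitionLag {
  partition: number;
  lag: number;
}

// committed: the group's committed offsets; topicOffsets: low/high offsets per partition
function computeLag(
  committed: { partition: number; offset: string }[],
  topicOffsets: { partition: number; high: string; low: string }[]
): PartitionLag[] {
  return topicOffsets.map(({ partition, high, low }) => {
    const entry = committed.find(c => c.partition === partition);
    const committedOffset = entry ? Number(entry.offset) : -1; // missing entry = nothing committed
    const latest = Number(high); // exact below 2^53, which covers practical offsets
    return {
      partition,
      lag: committedOffset < 0 ? latest - Number(low) : Math.max(0, latest - committedOffset),
    };
  });
}
```

Each lag value is what you would set on a gauge such as `kafka_consumer_lag`, labelled by topic, partition, and group.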
Metrics
import { Registry, Counter, Gauge } from 'prom-client';
import { consumer } from './kafka';

// Instrumentation events are emitted by consumer/producer instances, not by the Kafka client
consumer.on(consumer.events.GROUP_JOIN, (event) => {
  console.log('Consumer joined group:', event.payload);
});

consumer.on(consumer.events.FETCH, (event) => {
  console.log('Fetch metrics:', event.payload);
});

// Prometheus metrics
const register = new Registry();

const messagesProduced = new Counter({
  name: 'kafka_messages_produced_total',
  help: 'Total messages produced',
  labelNames: ['topic'],
  registers: [register] // register only in this registry, not the global default
});

const consumerLag = new Gauge({
  name: 'kafka_consumer_lag',
  help: 'Messages not yet consumed by the group',
  labelNames: ['topic', 'partition', 'group'],
  registers: [register]
});
// e.g. after each successful send: messagesProduced.inc({ topic: 'orders' });
Summary
| Concept | Purpose |
|---|---|
| Producer | Send messages |
| Consumer | Receive messages |
| Topic | Message category |
| Partition | Parallel processing |
| Consumer Group | Load balancing |
| Offset | Message position |
Apache Kafka enables building scalable, fault-tolerant event-driven systems for real-time data processing.