Kafka Consumer Deep Dive
The Kafka Consumer appears straightforward—subscribe, poll, process. But production systems demand understanding of consumer groups, rebalancing dynamics, offset management, and now in Kafka 4.x, an entirely new consumer protocol. This post explores these mechanisms in depth.
Consumer Groups: The Foundation
A consumer group is a set of consumers cooperating to consume data from topics. Kafka distributes partitions among group members, enabling horizontal scaling while preserving per-partition ordering.
Topic: orders (6 partitions)
┌─────┬─────┬─────┬─────┬─────┬─────┐
│ P0 │ P1 │ P2 │ P3 │ P4 │ P5 │
└──┬──┴──┬──┴──┬──┴──┬──┴──┬──┴──┬──┘
│ │ │ │ │ │
┌───────────┴─────┴─────┴─────┴─────┴─────┘
│ Consumer Group: order-processors
│
┌──────┴──────┬─────────────┬─────────────┐
▼ ▼ ▼ ▼
┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐
│Consumer│ │Consumer│ │Consumer│ │Consumer│
│ A │ │ B │ │ C │ │ D │
│ P0,P1 │ │ P2,P3 │ │ P4 │ │ P5 │
└────────┘ └────────┘ └────────┘ └────────┘
Key principles:
- Each partition is assigned to exactly one consumer within a group
- A consumer can handle multiple partitions
- If consumers > partitions, some consumers remain idle
- Multiple groups can independently consume the same topic
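To make the model concrete, here is a minimal sketch of a group member. The topic orders and group id order-processors come from the diagram above; the bootstrap address and deserializers are placeholder choices. Every instance started with the same group.id joins the same group and receives a share of the partitions:
var config = Map.<String, Object>of(
    "bootstrap.servers", "localhost:9092",
    "group.id", "order-processors",                 // same group.id = same consumer group
    "key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer",
    "value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"
);
try (var consumer = new KafkaConsumer<String, String>(config)) {
    consumer.subscribe(List.of("orders"));          // the group coordinator assigns partitions
    while (true) {
        for (var record : consumer.poll(Duration.ofMillis(100))) {
            System.out.printf("partition=%d offset=%d value=%s%n",
                record.partition(), record.offset(), record.value());
        }
    }
}
Starting a second instance of the same program triggers a rebalance that splits the six partitions between the two members.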
The Group Coordinator
Every consumer group has a designated broker serving as its Group Coordinator. This coordinator manages membership, triggers rebalances, and stores committed offsets.
The coordinator is determined by hashing the group.id to a partition of the __consumer_offsets topic—the leader of that partition becomes the coordinator. This distributes coordination load across the cluster.
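Because the mapping is a plain hash, you can predict which __consumer_offsets partition (and therefore which broker) coordinates a given group. A rough sketch of the computation, mirroring the broker's logic with the default of 50 offsets partitions:
// Which __consumer_offsets partition coordinates this group?
// Masking the sign bit keeps the hash non-negative; 50 = default offsets.topic.num.partitions
static int coordinatorPartitionFor(String groupId, int offsetsTopicPartitions) {
    return (groupId.hashCode() & 0x7fffffff) % offsetsTopicPartitions;
}
// coordinatorPartitionFor("order-processors", 50) -> the partition whose leader is the coordinator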
┌─────────────────────────────────────────────────────────────┐
│ __consumer_offsets (50 partitions) │
│ ┌─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬───────┐ │
│ │ P0 │ P1 │ P2 │ ... │ P25 │ ... │ P47 │ P48 │ P49 │ │
│ └──┬──┴─────┴─────┴─────┴──┬──┴─────┴─────┴─────┴───────┘ │
│ │ │ │
│ Broker1 Broker3 │
│ (Leader P0) (Leader P25) │
│ │ │ │
│ Coordinates: Coordinates: │
│ • group-A • group-B │
│ • group-C • group-D │
└─────────────────────────────────────────────────────────────┘
Classic vs New Rebalance Protocol
Kafka 4.0 introduced KIP-848, a fundamentally redesigned consumer protocol. Understanding both is crucial for migration planning.
Classic Protocol (Pre-4.0 Default)
The classic protocol uses a synchronous barrier model with JoinGroup/SyncGroup APIs:
Consumer A Coordinator Consumer B
│ │ │
│───JoinGroup───────→│←────JoinGroup──────│
│ │ │
│ (waits for all members to join) │
│ │ │
│←──JoinResponse─────│────JoinResponse───→│
│ (A elected │ │
│ leader) │ │
│ │ │
│───SyncGroup────────│←────SyncGroup──────│
│ (with │ (empty) │
│ assignments) │ │
│ │ │
│←──SyncResponse─────│────SyncResponse───→│
│ (P0,P1) │ (P2,P3) │
Problems with classic protocol:
- Stop-the-world: All consumers pause during rebalance
- Slowest member bottleneck: One slow consumer delays everyone
- Client-side complexity: Assignment logic runs on a “leader” consumer
- Cascading failures: One misbehaving consumer affects the entire group
New Protocol (KIP-848)
The new protocol uses continuous heartbeats with server-side coordination:
Consumer A Coordinator Consumer B
│ │ │
│──Heartbeat(sub)───→│ │
│ │←──Heartbeat(sub)───│
│ │ │
│ (Coordinator computes assignment) │
│ │ │
│←─Heartbeat(assign)─│ │
│ (P0,P1,P2) │ │
│ │ │
│ │──Heartbeat(assign)→│
│ │ (P3,P4,P5) │
│ │ │
│ (Consumers reconcile incrementally) │
Key improvements:
- Incremental rebalancing: Only affected partitions move
- No global barrier: Consumers continue processing during rebalance
- Server-side assignment: Coordinator computes assignments, not clients
- Declarative state: Consumers declare subscriptions; coordinator reconciles
Enabling the New Protocol
var config = Map.of(
    "group.protocol", "consumer",           // Enable KIP-848
    "group.remote.assignor", "uniform"      // Server-side assignor (uniform/range)
    // Other standard configs...
);
Requirements:
- Kafka 4.0+ brokers
- Client library supporting KIP-848 (Java 4.0+, librdkafka 2.10+)
- Broker config:
group.coordinator.rebalance.protocols=classic,consumer
Performance Comparison
| Scenario | Classic Protocol | New Protocol |
|---|---|---|
| 10 consumers, add 900 partitions | ~103 seconds | ~5 seconds |
| Single consumer joins large group | Full rebalance | Incremental |
| Consumer crash during processing | All consumers pause | Others continue |
| Coordinator unreachable | Consumers stop fetching | Consumers continue, can’t commit |
Partition Assignment Strategies
Server-Side Assignors (New Protocol)
| Assignor | Behavior |
|---|---|
| uniform | Distributes partitions evenly across consumers |
| range | Assigns contiguous ranges per topic to each consumer |
Client-Side Assignors (Classic Protocol)
| Assignor | Behavior | Use Case |
|---|---|---|
| RangeAssignor | Contiguous ranges per topic | Co-partitioned topics |
| RoundRobinAssignor | Round-robin across all partitions | Even distribution |
| StickyAssignor | Minimizes partition movement | Reduce rebalance impact |
| CooperativeStickyAssignor | Sticky + cooperative | Production default |
// Classic protocol with cooperative rebalancing
var config = Map.of(
"partition.assignment.strategy", "org.apache.kafka.clients.consumer.CooperativeStickyAssignor"
);
Offset Management
Offsets are the consumer’s progress markers. Managing them correctly is the difference between reliable delivery and silent data loss or duplicate processing.
The __consumer_offsets Topic
Committed offsets are stored in a compacted internal topic with 50 partitions by default. Each commit writes:
Key: (group_id, topic, partition)
Value: (offset, leader_epoch, metadata, timestamp)
Auto vs Manual Commit
| Mode | Config | Behavior | Risk |
|---|---|---|---|
| Auto | enable.auto.commit=true | Commits every auto.commit.interval.ms | Data loss or duplicates on crash |
| Manual Sync | commitSync() | Blocks until commit confirmed | Performance impact |
| Manual Async | commitAsync() | Non-blocking, callback on completion | No retry on failure |
Commit Strategies Compared
Auto Commit (Default)
var config = Map.of(
    "enable.auto.commit", "true",
    "auto.commit.interval.ms", "5000"
);
// Danger: Offsets committed before processing completes
Synchronous After Batch
while (true) {
var records = consumer.poll(Duration.ofMillis(100));
for (var record : records) {
process(record);
}
consumer.commitSync(); // Blocks until confirmed
}
Asynchronous with Callback
while (true) {
var records = consumer.poll(Duration.ofMillis(100));
for (var record : records) {
process(record);
}
consumer.commitAsync((offsets, exception) -> {
if (exception != null) {
logger.warn("Commit failed: " + exception.getMessage());
}
});
}
Per-Record with Batching
int processed = 0;
var currentOffsets = new HashMap<TopicPartition, OffsetAndMetadata>();
while (true) {
var records = consumer.poll(Duration.ofMillis(100));
for (var record : records) {
process(record);
currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1));
processed++;
if (processed % 100 == 0) {
consumer.commitSync(currentOffsets);
currentOffsets.clear();
}
}
}
Delivery Semantics Summary
| Semantic | Strategy | Trade-off |
|---|---|---|
| At-most-once | Commit before processing | Risk of data loss |
| At-least-once | Commit after processing | Risk of duplicates |
| Exactly-once | Transactions + read_committed | Performance overhead |
Production Reality: Manual Commit is the Standard
Manual commit dominates production systems where data integrity matters. Auto-commit is typically reserved for non-critical pipelines or development environments. The fundamental problem with auto-commit is the timing gap:
poll() returns 1000 messages
│
▼
Auto-commit fires (offset = 1000) ← Kafka thinks you're done
│
▼
Processing message 500...
│
▼
💥 Application crashes
│
▼
Restart: consumer resumes at offset 1001
│
▼
Messages 501-1000 are LOST
Common production patterns (most to least common):
- Manual sync commit after batch — Simple, reliable at-least-once
- Manual async with sync on rebalance — Better throughput for high-volume systems (see the sketch after this list)
- Transactional commits — Kafka Streams and exactly-once pipelines
- External offset storage — True exactly-once with databases (store offset in same transaction as processed data)
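Pattern 2 is worth sketching because it pairs the throughput of commitAsync with the safety of commitSync at the moments that matter. A minimal outline, reusing the assumed process() handler and running flag from the examples above: asynchronous commits on the hot path, synchronous commits when partitions are revoked and on shutdown.
consumer.subscribe(List.of("orders"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        consumer.commitSync();                       // last safe commit before losing partitions
    }
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) { }
});
try {
    while (running.get()) {
        var records = consumer.poll(Duration.ofMillis(100));
        for (var record : records) {
            process(record);
        }
        consumer.commitAsync();                      // non-blocking commit per batch
    }
} finally {
    try {
        consumer.commitSync();                       // final synchronous commit on shutdown
    } finally {
        consumer.close();
    }
}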
When auto-commit is acceptable:
- Metrics/logging pipelines where losing a few data points is fine
- Idempotent consumers where reprocessing has no side effects
- Development/testing for simplicity
Production Recommendation: Disable auto-commit (enable.auto.commit=false) and use manual commits after processing completes. This provides at-least-once delivery semantics—the standard for production systems. Auto-commit’s convenience rarely outweighs its data loss risk in real workloads.
Consumer Configuration Deep Dive
Essential Parameters (Kafka 4.x)
| Parameter | Default | Purpose |
|---|---|---|
| group.id | (required) | Consumer group identifier |
| group.protocol | classic | Protocol: classic or consumer |
| auto.offset.reset | latest | Where to start: earliest, latest, none |
| enable.auto.commit | true | Auto-commit offsets |
| max.poll.records | 500 | Max records per poll() |
| max.poll.interval.ms | 300000 | Max time between polls before rebalance |
| session.timeout.ms | 45000 | Heartbeat session timeout |
| heartbeat.interval.ms | 3000 | Heartbeat frequency |
| fetch.min.bytes | 1 | Minimum data per fetch |
| fetch.max.wait.ms | 500 | Max wait for fetch.min.bytes |
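Putting several of these together, a plausible starting point for a production consumer could look like the following; the values are illustrative, not recommended defaults for every workload:
var config = Map.<String, Object>of(
    "bootstrap.servers", "broker1:9092,broker2:9092",
    "group.id", "order-processors",
    "group.protocol", "consumer",        // KIP-848 protocol on Kafka 4.0+ clusters
    "enable.auto.commit", "false",       // commit manually after processing
    "auto.offset.reset", "earliest",     // be explicit; the default is latest
    "max.poll.records", "200",           // sized so a batch finishes well within max.poll.interval.ms
    "key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer",
    "value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"
);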
New in Kafka 4.x: auto.offset.reset by Duration
// Reset to 24 hours ago instead of earliest/latest
var config = Map.of(
"auto.offset.reset", "by_duration:PT24H" // ISO 8601 duration
);
Isolation Levels for Transactions
var config = Map.of(
"isolation.level", "read_committed" // Only see committed transactional messages
// Default: "read_uncommitted"
);
The Poll Loop: Getting It Right
The poll() method drives the consumer: it fetches records, runs rebalance callbacks, and proves the application is making progress. Heartbeats are sent from a background thread, but if the gap between poll() calls exceeds max.poll.interval.ms the consumer is considered failed and its partitions are reassigned.
Basic Pattern
try (var consumer = new KafkaConsumer<String, String>(config)) {
consumer.subscribe(List.of("orders"));
while (running.get()) {
var records = consumer.poll(Duration.ofMillis(100));
for (var record : records) {
processRecord(record);
}
}
}
Handling Long Processing
If processing takes longer than max.poll.interval.ms, the consumer is considered dead and triggers a rebalance.
Solution 1: Pause/Resume
var records = consumer.poll(Duration.ofMillis(100));
consumer.pause(consumer.assignment());      // Stop fetching new records
for (var record : records) {
    processSlowly(record);                  // Can take time
    consumer.poll(Duration.ZERO);           // Keep polling; returns nothing while paused but resets the poll-interval timer
}
consumer.resume(consumer.assignment());     // Resume fetching
Solution 2: Increase Interval
var config = Map.of(
    "max.poll.interval.ms", "600000",   // 10 minutes
    "max.poll.records", "100"           // Smaller batches
);
Solution 3: Offload Processing
var executor = Executors.newFixedThreadPool(10);
while (running.get()) {
var records = consumer.poll(Duration.ofMillis(100));
var futures = new ArrayList<Future<?>>();
for (var record : records) {
futures.add(executor.submit(() -> process(record)));
}
// Wait for batch, then commit
for (var future : futures) {
future.get();
}
consumer.commitSync();
}
Error Handling & Retry Strategy
Production consumers must handle unexpected failures without crashing the entire group or permanently stalling a partition.
1. Blocking Retry (Simplest)
Retry the operation within the poll loop.
Pros: Preserves ordering.
Cons: Blocks the consumer; can trigger a group rebalance if cumulative retry time exceeds max.poll.interval.ms.
int attempts = 0;
while (true) {
    try {
        process(record);
        break;                              // Success
    } catch (Exception e) {
        attempts++;
        if (attempts >= 3) throw e;         // Give up after three attempts
        Thread.sleep(100L * attempts);      // Simple linear backoff
    }
}
2. Dead Letter Queue (DLQ)
If a message fails repeatedly (or is a known “poison pill”), publish it to a separate dead-letter-topic and commit the offset to move on.
try {
process(record);
} catch (Exception e) {
logger.error("Failed to process, sending to DLQ", e);
producer.send(new ProducerRecord<>("my-app-dlq", record.key(), record.value()));
// Functionally "consumed" even though processing failed
}
3. Non-Blocking Retry (Advanced)
Publish the failed message to a retry topic with a delay (often implemented via separate topics like retry-1m, retry-5m). This requires a complex topology of multiple consumers.
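A minimal sketch of the hand-off; the topic name orders-retry-1m and the retry-attempts header are illustrative conventions, not a Kafka standard:
try {
    process(record);
} catch (Exception e) {
    // Republish to a delayed-retry topic instead of blocking this partition
    var retry = new ProducerRecord<>("orders-retry-1m", record.key(), record.value());
    retry.headers().add("retry-attempts", "1".getBytes(StandardCharsets.UTF_8));
    producer.send(retry);
    // The offset is committed as usual; a separate consumer drains the retry topic after the delay
}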
Concurrency & Thread Safety
[!WARNING] The KafkaConsumer is NOT thread-safe.
You cannot call methods on a single consumer instance from multiple threads. Access must be synchronized or confined to a single thread.
Multi-Threaded Processing Patterns
- Thread-per-Consumer: Run strictly one consumer per thread. Good for simple isolation (see the sketch after this list).
- Consumer-Producer Decoupling: A single thread polls and pushes records to a thread pool (as seen in the Offload Processing example).
- Risk: You cannot commit offsets safely until all tasks in the batch are finished.
- Mitigation: Accumulate offsets and commit them only when order is guaranteed or gaps are acceptable.
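A minimal thread-per-consumer sketch, reusing the assumed config map, running flag, and process() handler from earlier examples; each thread owns its own consumer instance, so no instance is ever touched from two threads:
int numThreads = 3;
var executor = Executors.newFixedThreadPool(numThreads);
for (int i = 0; i < numThreads; i++) {
    executor.submit(() -> {
        try (var consumer = new KafkaConsumer<String, String>(config)) {   // one consumer per thread
            consumer.subscribe(List.of("orders"));
            while (running.get()) {
                for (var record : consumer.poll(Duration.ofMillis(100))) {
                    process(record);
                }
                consumer.commitSync();
            }
        }
    });
}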
Rebalance Listeners
React to partition assignment changes with ConsumerRebalanceListener:
consumer.subscribe(List.of("orders"), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
logger.info("Revoked: " + partitions);
// Commit current progress before losing partitions
consumer.commitSync();
// Flush any in-memory state
stateStore.flush();
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
logger.info("Assigned: " + partitions);
// Initialize state for new partitions
for (var tp : partitions) {
stateStore.initialize(tp);
}
}
});
Static Group Membership
Prevent unnecessary rebalances during rolling restarts with static membership:
var config = Map.of(
"group.instance.id", "order-processor-" + hostname
);
With static membership:
- Consumers can disconnect for up to session.timeout.ms without triggering a rebalance
- The same instance ID rejoining gets the same partition assignment
- Ideal for Kubernetes deployments with predictable pod names
Rack Awareness
In cloud environments (AWS, GCP), cross-zone data transfer is expensive and adds latency. Kafka can let consumers fetch from the closest replica (leader or follower).
Configuration
var config = Map.of(
"client.rack", "us-east-1a" // The zone this consumer is running in
// Brokers must also have broker.rack configured
);
Benefits:
- Reduced Cost: avoid cross-AZ transfer fees.
- Lower Latency: fetch from local broker.
- Load Balancing: spreads fetch load across followers.
Consumer Lag Monitoring
Consumer lag is the gap between the latest message and the last consumed message:
Lag = Log End Offset - Consumer Committed Offset
Key Metrics
| Metric | Meaning |
|---|---|
| records-lag | Current lag per partition |
| records-lag-max | Maximum lag across partitions |
| fetch-rate | Fetch requests per second |
| records-consumed-rate | Messages consumed per second |
| commit-rate | Offset commits per second |
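Lag can also be computed programmatically. A rough sketch using the AdminClient, with the bootstrap address and error handling simplified: compare each partition’s committed offset with its log-end offset.
// Prints per-partition lag for a consumer group
static void printLag(String groupId) throws Exception {
    try (var admin = AdminClient.create(Map.<String, Object>of("bootstrap.servers", "localhost:9092"))) {
        // Committed offsets per partition for the group
        var committed = admin.listConsumerGroupOffsets(groupId)
                             .partitionsToOffsetAndMetadata().get();
        // Log-end offsets for the same partitions
        var latest = new HashMap<TopicPartition, OffsetSpec>();
        committed.keySet().forEach(tp -> latest.put(tp, OffsetSpec.latest()));
        var endOffsets = admin.listOffsets(latest).all().get();
        // Lag = log end offset - committed offset
        committed.forEach((tp, meta) ->
            System.out.printf("%s lag=%d%n", tp, endOffsets.get(tp).offset() - meta.offset()));
    }
}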
CLI Monitoring
kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--describe \
--group order-processors
# Output:
# GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
# order-processors orders 0 15234 15240 6
# order-processors orders 1 14892 14892 0
Best Practices Summary
- Use the new protocol (group.protocol=consumer) for Kafka 4.0+ deployments—fewer rebalances, better availability
- Disable auto-commit for at-least-once semantics; commit after processing
- Size max.poll.records based on processing time to stay within max.poll.interval.ms
- Use static membership in containerized environments to minimize rebalance churn
- Implement rebalance listeners to commit offsets and flush state before partition revocation
- Monitor consumer lag as a primary health indicator; alert on sustained high lag
- Use read_committed isolation when consuming from transactional producers
- Set meaningful client.id and group.instance.id for operational visibility
- Handle deserialization errors gracefully—use a DLQ or specific error handlers to prevent poison messages from crashing the loop
- Enable rack awareness (client.rack) to reduce cross-zone data transfer costs and latency
- Close consumers properly in finally blocks to ensure a clean group leave (see the shutdown sketch below)
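The last point deserves a sketch. The usual idiom, under the same assumptions as the earlier examples, is a shutdown hook that calls wakeup(), which makes a blocked poll() throw WakeupException, and a finally block that closes the consumer so it leaves the group cleanly:
var mainThread = Thread.currentThread();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    consumer.wakeup();               // the only thread-safe consumer method; interrupts a blocked poll()
    try {
        mainThread.join();           // wait for the poll loop to finish
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}));
try {
    consumer.subscribe(List.of("orders"));
    while (true) {
        var records = consumer.poll(Duration.ofMillis(100));
        for (var record : records) {
            process(record);
        }
        consumer.commitSync();
    }
} catch (WakeupException e) {
    // Expected on shutdown; nothing to do
} finally {
    consumer.close();                // sends LeaveGroup so partitions are reassigned promptly
}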