Kafka Producer Deep Dive
The Kafka Producer is deceptively simple on the surface—call send() and your message appears in a topic. But beneath this simplicity lies a sophisticated architecture designed for high throughput, durability, and exactly-once semantics. Understanding these internals is essential for building production-grade streaming applications.
Producer Architecture Overview
When you call producer.send(), your message doesn’t immediately hit the wire. Instead, it traverses a carefully orchestrated pipeline designed to maximize throughput while maintaining ordering and delivery guarantees.
```
┌───────────────────────────────────────────────────────────────────┐
│                           KafkaProducer                           │
│ ┌──────────────┐  ┌────────────┐  ┌────────────┐  ┌─────────────┐ │
│ │ Interceptors │→ │ Serializer │→ │ Partitioner│→ │   Record    │ │
│ │              │  │ (Key/Value)│  │            │  │ Accumulator │ │
│ └──────────────┘  └────────────┘  └────────────┘  └──────┬──────┘ │
│                                                          │        │
│ ┌────────────────────────────────────────────────────────┼──────┐ │
│ │ Sender Thread                                          ▼      │ │
│ │  ┌─────────────┐   ┌─────────────┐   ┌─────────────────────┐  │ │
│ │  │  Metadata   │←─→│   Network   │←─→│ In-Flight Requests  │  │ │
│ │  │  Manager    │   │   Client    │   │    (per broker)     │  │ │
│ │  └─────────────┘   └─────────────┘   └─────────────────────┘  │ │
│ └──────────────────────────────────────────────────────────────┘ │
└───────────────────────────────────────────────────────────────────┘
```

The architecture consists of two main threads: the application thread (which calls send()) and the sender thread (a background I/O thread that actually transmits data to brokers).
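Before unpacking each stage, here is a minimal sketch of that two-thread model from the application's side: send() merely enqueues the record into the accumulator and returns a Future that the sender thread completes later. The broker address and topic name are illustrative.

```java
import java.util.Map;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class MinimalProducer {
    public static void main(String[] args) throws Exception {
        Map<String, Object> config = Map.of("bootstrap.servers", "localhost:9092");
        try (var producer = new KafkaProducer<>(config, new StringSerializer(), new StringSerializer())) {
            // send() appends to the RecordAccumulator and returns immediately;
            // the sender thread transmits the batch and completes the Future.
            Future<RecordMetadata> future =
                    producer.send(new ProducerRecord<>("demo-topic", "key", "value"));
            RecordMetadata metadata = future.get(); // blocks until the sender thread gets an ACK
            System.out.printf("partition=%d offset=%d%n", metadata.partition(), metadata.offset());
        }
    }
}
```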
Producer Interceptors
Before any serialization or partitioning happens, records pass through the Interceptor Chain. This is a powerful hook for observability, standards enforcement, or on-the-fly record modification.
Interceptors implement the ProducerInterceptor interface and are commonly used for:
- Tracing: Injecting OpenTelemetry/Zipkin headers into the record.
- Monitoring: Capturing metrics about message rates and sizes before they enter the internal buffer.
- Enforcement: Masking PII or validating that every message has a specific header.
```java
public class TracingInterceptor implements ProducerInterceptor<String, String> {
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // Modify headers to inject a trace ID before serialization
        record.headers().add("trace-id", UUID.randomUUID().toString().getBytes());
        return record;
    }
    // ... other methods (onAcknowledgement, close, configure)
}
```

Serialization & Schemas
The Serializer converts your key and value objects into bytes. While StringSerializer is good for “Hello World”, production systems rarely use it.
The Problem with Ad-Hoc Serialization
Sending JSON strings or raw bytes creates a schema coupling problem. If a producer changes the data format (e.g., renames a field), consumers may crash.
Schema Registry
The best practice is to use a schema definition language like Avro or Protobuf combined with a Schema Registry.
- Producer compiles a schema (e.g., Avro `.avsc`).
- Serializer registers this schema with the Registry and gets a unique `schema_id`.
- Serializer prepends this `schema_id` (usually 4 bytes) to the message payload.
- Consumer extracts the ID, fetches the schema, and deserializes safely.
This guarantees strictly typed contracts between decoupled services.
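To make the flow concrete, here is a minimal sketch of an Avro producer backed by a Schema Registry. It assumes Confluent's kafka-avro-serializer dependency and a registry at http://localhost:8081; the topic and the one-field schema are illustrative.

```java
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

var config = Map.<String, Object>of(
    "bootstrap.servers",   "localhost:9092",
    "key.serializer",      "org.apache.kafka.common.serialization.StringSerializer",
    "value.serializer",    "io.confluent.kafka.serializers.KafkaAvroSerializer",
    "schema.registry.url", "http://localhost:8081" // registry endpoint (assumed)
);

// Build a record against an illustrative one-field schema; in practice this
// comes from a compiled .avsc file.
Schema schema = new Schema.Parser().parse(
    "{\"type\":\"record\",\"name\":\"Order\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}");
GenericRecord order = new GenericData.Record(schema);
order.put("id", "order-42");

// The serializer registers the schema (if new), gets a schema_id, and
// prepends it to the payload before the bytes reach the accumulator.
try (var producer = new KafkaProducer<String, Object>(config)) {
    producer.send(new ProducerRecord<>("orders", "customer-123", order));
}
```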
The Record Accumulator: Heart of Batching
The RecordAccumulator is where Kafka’s batching magic happens. It maintains a map of TopicPartition → Deque<ProducerBatch>, accumulating records into batches before transmission.
How Batching Works
When a record arrives, the accumulator attempts to append it to the last batch for that partition. A batch is ready for sending when any of these conditions are met:
- Batch is full: The batch has reached `batch.size` bytes
- Linger time elapsed: The batch has waited for `linger.ms` milliseconds
- Memory pressure: The buffer pool is exhausted and threads are blocking
- Flush requested: Application called `flush()` or is closing (see the sketch below)
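The flush sketch referenced above, assuming a configured producer and an events topic are in scope:

```java
// flush() blocks until every record handed to send() so far has been
// transmitted and acknowledged (or has failed), regardless of linger.ms.
producer.send(new ProducerRecord<>("events", null, "payload-1"));
producer.send(new ProducerRecord<>("events", null, "payload-2"));
producer.flush(); // both batches have left the accumulator when this returns
```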
Key Batching Parameters
| Parameter | Default (Kafka 4.x) | Purpose |
|---|---|---|
| `batch.size` | 16384 (16 KB) | Target size for batches |
| `linger.ms` | 5 | Max wait time before sending an incomplete batch |
| `buffer.memory` | 33554432 (32 MB) | Total memory for buffering |
| `max.block.ms` | 60000 | Max time send() blocks when the buffer is full |
Important Change in Kafka 4.0: The default linger.ms changed from 0 to 5. This small delay allows more records to accumulate, resulting in larger batches and better throughput—often with similar or even lower latency due to reduced request overhead.
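If a workload depends on the old fire-immediately behavior, it can be pinned back explicitly; both snippets below use illustrative values:

```java
// Latency-critical path: restore the pre-4.0 behavior of no artificial delay.
var latencyFirst = Map.of("linger.ms", "0");

// Throughput-oriented path: lean further into batching than the new default.
var throughputFirst = Map.of("linger.ms", "20", "batch.size", "65536");
```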
Partitioning Strategies
The partitioner determines which partition receives each record. Kafka 4.x provides several strategies:
Default Partitioner (Sticky Partitioning)
The default behavior since Kafka 2.4 uses “sticky” partitioning for records without keys:
- With key: Hash-based assignment ensures all records with the same key go to the same partition
- Without key: Records stick to one partition until `batch.size` bytes accumulate, then switch
This dramatically improves batching efficiency compared to round-robin for keyless messages.
```java
// Key present: consistent partition assignment via murmur2 hash
producer.send(new ProducerRecord<>("orders", "customer-123", orderJson));

// No key: sticky to current partition until batch fills
producer.send(new ProducerRecord<>("events", null, eventJson));
```

Adaptive Partitioning
Kafka 4.x includes adaptive partitioning (partitioner.adaptive.partitioning.enable=true by default) that routes more traffic to brokers with lower latency, improving overall throughput.
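The feature can be toggled when you need a strictly uniform spread of keyless records (for example, in benchmarks); a minimal sketch using the config key named above:

```java
// Opt out of latency-aware routing: keyless records are spread uniformly
// across partitions instead of favoring faster brokers.
var config = Map.of("partitioner.adaptive.partitioning.enable", "false");
```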
Delivery Guarantees & Idempotence
Understanding delivery semantics is crucial for building reliable systems.
The Duplicate Problem
Without idempotence, this scenario causes duplicates:
```
Producer                        Broker
   │                               │
   │──── Send(msg, seq=1) ────────→│
   │                               │  (writes to log)
   │  ✗ ACK lost/timeout           │
   │                               │
   │──── Retry(msg, seq=1) ───────→│
   │                               │  (writes AGAIN - duplicate!)
   │←─────── ACK ──────────────────│
```

Idempotent Producer
When enable.idempotence=true (default since Kafka 3.0), the producer gets a unique Producer ID (PID) and assigns monotonically increasing sequence numbers per partition:
```
Producer (PID=1000)             Broker
   │                               │
   │── Send(msg, pid=1000, ───────→│
   │          seq=42)              │  (writes, tracks seq=42)
   │                               │
   │  ✗ ACK lost/timeout           │
   │                               │
   │── Retry(msg, pid=1000, ──────→│
   │          seq=42)              │  (seq ≤ 42, deduplicate!)
   │←─────── ACK ──────────────────│
```

The broker rejects (but acknowledges) any message with sequence ≤ the last committed sequence for that PID/partition pair.
Idempotence Configuration (Kafka 4.x Defaults)
These settings are now the defaults and provide the strongest guarantees:
```java
var config = Map.of(
    "enable.idempotence", "true",                   // default
    "acks", "all",                                  // default
    "retries", Integer.toString(Integer.MAX_VALUE), // effective default
    "max.in.flight.requests.per.connection", "5"    // default, ordering preserved
);
```

Key insight: With idempotence enabled and max.in.flight.requests.per.connection ≤ 5, Kafka guarantees ordering even with retries. The broker tracks sequences for up to 5 batches per producer.
Acks Deep Dive & Min In-Sync Replicas
The acks setting controls when the producer considers a request complete, but it must be paired with broker-side configurations for true durability.
| Acks Setting | Behavior | Durability | Latency |
|---|---|---|---|
| `acks=0` | Fire and forget. No acknowledgement awaited. | None (high risk of loss) | Lowest |
| `acks=1` | Leader writes to its local log and acknowledges. | Medium (safe if the leader survives) | Low |
| `acks=all` (or `-1`) | Leader waits for the full set of in-sync replicas (ISR) to acknowledge. | Highest | Medium/High |
> [!IMPORTANT]
> `acks=all` is not enough! If you have `acks=all` but your topic’s `min.insync.replicas=1`, then “all” just means “the 1 available replica” (the leader). If that leader crashes, data is lost.
>
> Golden Rule for Durability: set `acks=all`, `replication.factor=3`, and `min.insync.replicas=2`. This allows one broker to fail without data loss while still accepting writes.
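The producer-side acks setting and these topic-side settings meet at topic creation. A minimal sketch using the AdminClient; the topic name and partition count are illustrative:

```java
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;

// replication.factor=3 with min.insync.replicas=2: one broker can fail
// while acks=all producers keep writing without data loss.
try (Admin admin = Admin.create(Map.<String, Object>of("bootstrap.servers", "localhost:9092"))) {
    NewTopic topic = new NewTopic("orders", 6, (short) 3) // 6 partitions (illustrative)
            .configs(Map.of("min.insync.replicas", "2"));
    admin.createTopics(List.of(topic)).all().get(); // block until the broker confirms
}
```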
Transactions: Atomic Multi-Partition Writes
While idempotence provides exactly-once within a partition, transactions extend this to atomic writes across multiple partitions.
Transaction Lifecycle
```
┌─────────────────────────────────────────────────────────┐
│                   Transaction States                    │
│                                                         │
│   Empty ─→ Ongoing ─→ PrepareCommit ─→ CompleteCommit   │
│               │                                         │
│               └──→ PrepareAbort ──→ CompleteAbort       │
└─────────────────────────────────────────────────────────┘
```

Transactional Producer Pattern
```java
var config = Map.<String, Object>of(
    "bootstrap.servers", "localhost:9092",
    "transactional.id",  "order-processor-1"
    // idempotence is automatically enabled
);

try (var producer = new KafkaProducer<String, String>(config, new StringSerializer(), new StringSerializer())) {
    producer.initTransactions();
    try {
        producer.beginTransaction();
        producer.send(new ProducerRecord<>("orders", orderId, orderJson));
        producer.send(new ProducerRecord<>("inventory", productId, updateJson));
        producer.send(new ProducerRecord<>("notifications", null, notifyJson));
        producer.commitTransaction();
    } catch (ProducerFencedException e) {
        // Another producer with the same transactional.id took over
        producer.close();
    } catch (KafkaException e) {
        producer.abortTransaction();
    }
}
```

Consumer Isolation Levels
For transactions to work end-to-end, consumers must use isolation.level=read_committed:
| Isolation Level | Behavior |
|---|---|
| `read_uncommitted` (default) | See all messages, including uncommitted |
| `read_committed` | Only see committed messages; aborted filtered out |
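On the consumer side this is a single setting; a minimal sketch (the group id and topic are illustrative):

```java
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

var consumerConfig = Map.<String, Object>of(
    "bootstrap.servers", "localhost:9092",
    "group.id",          "order-readers",  // illustrative
    "isolation.level",   "read_committed"  // hide open/aborted transactions
);

try (var consumer = new KafkaConsumer<>(consumerConfig,
        new StringDeserializer(), new StringDeserializer())) {
    consumer.subscribe(List.of("orders"));
    // poll() now returns only records from committed transactions
}
```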
Compression Deep Dive
Compression happens at the batch level, making larger batches more effective:
Compression Types
| Type | Ratio | CPU | Best For |
|---|---|---|---|
| `none` | 1.0x | Lowest | Already compressed data |
| `gzip` | ~5-10x | Highest | Maximum compression, cold data |
| `snappy` | ~2-4x | Low | Balanced, general purpose |
| `lz4` | ~3-5x | Very Low | High throughput, low latency |
| `zstd` | ~4-8x | Medium | Best ratio/speed trade-off |
Fine-Tuning Compression (Kafka 4.x)
```java
var config = Map.of(
    "compression.type",       "zstd",
    "compression.zstd.level", "3" // 1-22, default 3
);

// or for gzip
var gzipConfig = Map.of("compression.gzip.level", "6"); // 1-9, default -1

// or for lz4
var lz4Config = Map.of("compression.lz4.level", "9"); // 1-17, default 9
```

Pro tip: Higher `linger.ms` improves compression ratios by creating larger batches.
Handling Large Messages
Kafka is optimized for small to medium-sized messages (1KB - 10KB). Sending large objects (e.g., 10MB images) requires careful tuning across the stack.
1. Producer Configuration
- `max.request.size`: Controls the maximum size of a request (a batch of messages) the producer will send. The default is 1 MB; increasing it allows larger individual messages.
- `buffer.memory`: Ensure the buffer is significantly larger than `max.request.size` so a single large batch doesn't block all other sends.
- Compression: Essential for large text-based payloads (JSON/XML).
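A producer-side sketch of these settings together (the 10 MB ceiling is illustrative, not a recommendation):

```java
// Raise the producer's request ceiling; broker and fetch limits must match.
var config = Map.of(
    "max.request.size", "10485760", // 10 MB per request (illustrative)
    "buffer.memory",    "67108864", // 64 MB, several times max.request.size
    "compression.type", "zstd"      // shrink large JSON/XML payloads
);
```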
2. Broker Configuration
The broker must also accept these large requests. Failures here often result in RecordTooLargeException.
- `message.max.bytes`: Per-topic or global limit on message batch size. Must be ≥ the producer's `max.request.size`.
- `replica.fetch.max.bytes`: Consumers/followers must be able to fetch these large batches.
> [!TIP]
> For truly large files (videos, huge docs), don't put them in Kafka. Store the file in S3/GCS and send the URL/pointer in the Kafka message.
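A minimal sketch of that claim-check pattern, with a hypothetical uploadToObjectStore helper standing in for your S3/GCS client:

```java
// Claim-check pattern: the blob lives in object storage; Kafka carries a pointer.
// uploadToObjectStore is a hypothetical helper wrapping your storage client.
String objectUrl = uploadToObjectStore("videos/" + videoId, videoBytes);
producer.send(new ProducerRecord<>("video-events", videoId, objectUrl));
```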
Error Handling & Retries
Retry Configuration
Kafka 4.x uses a delivery timeout model rather than explicit retry counts:
```java
var config = Map.of(
    "delivery.timeout.ms",  "120000", // 2 minutes of total time to deliver
    "request.timeout.ms",   "30000",  // per-request timeout
    "retry.backoff.ms",     "100",    // initial backoff
    "retry.backoff.max.ms", "1000"    // max backoff (exponential)
);
```

The relationship: `delivery.timeout.ms` ≥ `linger.ms` + `request.timeout.ms`
Handling Send Failures
```java
// Async with callback
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        if (exception instanceof RetriableException) {
            // Kafka already retried internally; reaching the callback
            // means delivery.timeout.ms was exhausted
            logger.warn("Delivery failed after retries: " + exception.getMessage());
        } else if (exception instanceof SerializationException) {
            // Data issue, won't be retried
            deadLetterQueue.send(record);
        } else {
            // Unknown error
            metrics.increment("producer.errors");
        }
    }
});

// Sync (blocks, throws on failure)
try {
    var metadata = producer.send(record).get(10, TimeUnit.SECONDS);
} catch (ExecutionException e) {
    handleError(e.getCause());
} catch (TimeoutException | InterruptedException e) {
    handleTimeout();
}
```

Performance Tuning Guide
High Throughput Configuration
```java
var config = Map.of(
    "batch.size",       "65536",    // 64 KB batches
    "linger.ms",        "20",       // wait up to 20 ms
    "buffer.memory",    "67108864", // 64 MB buffer
    "compression.type", "lz4",
    "acks",             "1"         // trade durability for speed (disables idempotence)
);
```

Low Latency Configuration
```java
var config = Map.of(
    "batch.size",       "16384", // default 16 KB
    "linger.ms",        "0",     // send immediately
    "acks",             "1",     // don't wait for all replicas
    "compression.type", "none"   // skip compression overhead
);
```

Balanced Production Configuration
```java
var config = Map.of(
    // Durability
    "acks",               "all",
    "enable.idempotence", "true",
    // Batching
    "batch.size",         "32768",
    "linger.ms",          "5",
    // Compression
    "compression.type",   "zstd",
    // Buffer
    "buffer.memory",      "33554432",
    "max.block.ms",       "60000",
    // Reliability
    "delivery.timeout.ms", "120000",
    "request.timeout.ms",  "30000",
    "max.in.flight.requests.per.connection", "5"
);
```

Monitoring Producer Health
Key Metrics to Watch
| Metric | Warning Threshold | Meaning |
|---|---|---|
| `record-send-rate` | Dropping | Throughput issues |
| `record-error-rate` | > 0 | Delivery failures |
| `request-latency-avg` | > 100ms | Network/broker issues |
| `batch-size-avg` | < `batch.size`/2 | Inefficient batching |
| `buffer-available-bytes` | < 20% of total | Memory pressure |
| `waiting-threads` | > 0 | Blocked on buffer |
| `records-per-request-avg` | < 10 | Poor batching |
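The same numbers are available programmatically via producer.metrics(); a minimal sketch that checks buffer headroom (the 20%-of-32 MB threshold assumes the default buffer.memory):

```java
import java.util.Map;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

// Scan the producer's metric registry for buffer-available-bytes and warn
// when less than 20% of the (default 32 MB) buffer remains.
for (Map.Entry<MetricName, ? extends Metric> entry : producer.metrics().entrySet()) {
    MetricName name = entry.getKey();
    if (name.group().equals("producer-metrics") && name.name().equals("buffer-available-bytes")) {
        double available = ((Number) entry.getValue().metricValue()).doubleValue();
        if (available < 0.2 * 33_554_432) {
            System.err.println("Producer buffer under pressure: " + available + " bytes free");
        }
    }
}
```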
Enabling Metrics
```java
var config = Map.of(
    "metric.reporters",        "org.apache.kafka.common.metrics.JmxReporter",
    "metrics.recording.level", "DEBUG" // INFO, DEBUG, or TRACE
);
```

Best Practices Summary
- Use idempotence (enabled by default in Kafka 4.x) unless you have a specific reason not to
- Set `acks=all` for durability-critical data; it's the default for good reason
- Don't set retries explicitly; use `delivery.timeout.ms` to control total delivery time
- Size buffers appropriately: `buffer.memory` should handle burst traffic without blocking
- Tune batching for your workload: high-throughput systems benefit from larger `linger.ms` and `batch.size`
- Use compression (especially `lz4` or `zstd`) for most workloads; the CPU cost is usually worth the network savings
- Reuse producer instances; they're thread-safe and expensive to create
- Always close producers in finally blocks or use try-with-resources to avoid leaking resources
- Monitor buffer metrics to detect backpressure before it causes application issues
- Use transactions only when needed; they add overhead and complexity