
Kafka Producer Deep Dive

The Kafka Producer is deceptively simple on the surface—call send() and your message appears in a topic. But beneath this simplicity lies a sophisticated architecture designed for high throughput, durability, and exactly-once semantics. Understanding these internals is essential for building production-grade streaming applications.

Producer Architecture Overview

When you call producer.send(), your message doesn’t immediately hit the wire. Instead, it traverses a carefully orchestrated pipeline designed to maximize throughput while maintaining ordering and delivery guarantees.

```
┌─────────────────────────────────────────────────────────────────────────┐
│                              KafkaProducer                              │
│                                                                         │
│  ┌──────────────┐   ┌──────────────┐   ┌──────────────┐  ┌───────────┐  │
│  │ Interceptors │ → │  Serializer  │ → │ Partitioner  │→ │  Record   │  │
│  │              │   │ (Key/Value)  │   │              │  │Accumulator│  │
│  └──────────────┘   └──────────────┘   └──────────────┘  └─────┬─────┘  │
│                                                                │        │
│  ┌─────────────────────────────────────────────────────────────┼─────┐  │
│  │ Sender Thread                                               ▼     │  │
│  │  ┌───────────┐    ┌───────────┐    ┌──────────────────────┐       │  │
│  │  │ Metadata  │←──→│  Network  │←──→│  In-Flight Requests  │       │  │
│  │  │ Manager   │    │  Client   │    │     (per broker)     │       │  │
│  │  └───────────┘    └───────────┘    └──────────────────────┘       │  │
│  └───────────────────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────────────────┘
```

The architecture consists of two main threads: the application thread (which calls send()) and the sender thread (a background I/O thread that actually transmits data to brokers).
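A minimal sketch of that hand-off (the topic, key, and value are illustrative): send() merely enqueues the record and returns a Future, while the callback fires later, once the sender thread has received the broker's response.

```java
// send() hands the record to the accumulator and returns immediately;
// the background sender thread performs the actual network I/O.
Future<RecordMetadata> future = producer.send(
    new ProducerRecord<>("events", "key", "value"),
    (metadata, exception) -> {
        // Runs later, when the broker response arrives
        if (exception == null) {
            System.out.printf("Appended to %s-%d at offset %d%n",
                metadata.topic(), metadata.partition(), metadata.offset());
        }
    });
```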

Producer Interceptors

Before any serialization or partitioning happens, records pass through the Interceptor Chain. This is a powerful hook for observability, standards enforcement, or on-the-fly record modification.

Interceptors implement the ProducerInterceptor interface and are commonly used for:

  • Tracing: Injecting OpenTelemetry/Zipkin headers into the record.
  • Monitoring: Capturing metrics about message rates and sizes before they enter the internal buffer.
  • Enforcement: Masking PII or validating that every message has a specific header.
```java
public class TracingInterceptor implements ProducerInterceptor<String, String> {

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // Modify headers to inject trace ID
        record.headers().add("trace-id", UUID.randomUUID().toString().getBytes());
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // Called when the record is acknowledged or fails; keep it fast and non-blocking
    }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}
```
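Interceptors are wired in through configuration rather than code; the interceptor.classes property takes a comma-separated list of implementations (the package name below is illustrative):

```java
var config = Map.of(
    "interceptor.classes", "com.example.TracingInterceptor"  // illustrative package/class
);
```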

Serialization & Schemas

The Serializer converts your key and value objects into bytes. While StringSerializer is good for “Hello World”, production systems rarely use it.

The Problem with Ad-Hoc Serialization

Sending JSON strings or raw bytes creates a schema coupling problem. If a producer changes the data format (e.g., renames a field), consumers may crash.

Schema Registry

The best practice is to use a definition language like Avro or Protobuf combined with a Schema Registry.

  1. Producer compiles a schema (e.g., Avro .avsc).
  2. Serializer registers this schema with the Registry and gets a unique schema_id.
  3. Serializer prepends this schema_id (usually 4 bytes) to the message payload.
  4. Consumer extracts the ID, fetches the schema, and deserializes safely.

This guarantees strictly typed contracts between decoupled services.
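As a concrete sketch, wiring this up with Confluent's Avro serializer looks roughly like the following (KafkaAvroSerializer and schema.registry.url come from Confluent's serializer library, not from Apache Kafka itself):

```java
var config = Map.of(
    "bootstrap.servers",   "localhost:9092",
    "key.serializer",      "org.apache.kafka.common.serialization.StringSerializer",
    "value.serializer",    "io.confluent.kafka.serializers.KafkaAvroSerializer",
    "schema.registry.url", "http://localhost:8081"
);
// The serializer registers the record's schema on first use and prepends
// the returned schema_id to every payload automatically.
```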

The Record Accumulator: Heart of Batching

The RecordAccumulator is where Kafka’s batching magic happens. It maintains a map of TopicPartition → Deque<ProducerBatch>, accumulating records into batches before transmission.

How Batching Works

When a record arrives, the accumulator attempts to append it to the last batch for that partition. A batch is ready for sending when any of these conditions are met:

  1. Batch is full: The batch has reached batch.size bytes
  2. Linger time elapsed: The batch has waited for linger.ms milliseconds
  3. Memory pressure: The buffer pool is exhausted and threads are blocking
  4. Flush requested: Application called flush() or is closing (see the sketch below)
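The last condition can be triggered explicitly. In this sketch, neither send() may have filled a batch yet, but flush() drains the accumulator before returning:

```java
producer.send(new ProducerRecord<>("orders", "k1", "v1"));
producer.send(new ProducerRecord<>("orders", "k2", "v2"));
producer.flush();  // blocks until all buffered records are sent and acknowledged
```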

Key Batching Parameters

| Parameter | Default (Kafka 4.x) | Purpose |
| --- | --- | --- |
| batch.size | 16384 (16 KB) | Target size for batches |
| linger.ms | 5 | Max wait time before sending an incomplete batch |
| buffer.memory | 33554432 (32 MB) | Total memory for buffering |
| max.block.ms | 60000 | Max time send() blocks when the buffer is full |

Important Change in Kafka 4.0: The default linger.ms changed from 0 to 5. This small delay allows more records to accumulate, resulting in larger batches and better throughput—often with similar or even lower latency due to reduced request overhead.

Partitioning Strategies

The partitioner determines which partition receives each record. Kafka 4.x provides several strategies:

Default Partitioner (Sticky Partitioning)

The default behavior since Kafka 2.4 uses “sticky” partitioning for records without keys:

  • With key: Hash-based assignment ensures all records with the same key go to the same partition
  • Without key: Records stick to one partition until batch.size bytes accumulate, then switch

This dramatically improves batching efficiency compared to round-robin for keyless messages.

```java
// Key present: consistent partition assignment via murmur2 hash
producer.send(new ProducerRecord<>("orders", "customer-123", orderJson));

// No key: sticky to the current partition until the batch fills
producer.send(new ProducerRecord<>("events", null, eventJson));
```

Adaptive Partitioning

Kafka 4.x includes adaptive partitioning (partitioner.adaptive.partitioning.enable=true by default) that routes more traffic to brokers with lower latency, improving overall throughput.
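Adaptive partitioning is controlled by an ordinary producer config; a sketch of opting out to restore uniform sticky behavior:

```java
var config = Map.of(
    // Revert to pure sticky partitioning regardless of broker latency
    "partitioner.adaptive.partitioning.enable", "false"
);
```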

Delivery Guarantees & Idempotence

Understanding delivery semantics is crucial for building reliable systems.

The Duplicate Problem

Without idempotence, this scenario causes duplicates:

```
Producer                          Broker
   │                                │
   │──── Send(msg, seq=1) ────────→ │
   │                                │ (writes to log)
   │  ✗ ACK lost/timeout            │
   │                                │
   │──── Retry(msg, seq=1) ───────→ │
   │                                │ (writes AGAIN - duplicate!)
   │ ←───────────── ACK ─────────── │
```

Idempotent Producer

When enable.idempotence=true (default since Kafka 3.0), the producer gets a unique Producer ID (PID) and assigns monotonically increasing sequence numbers per partition:

```
Producer (PID=1000)               Broker
   │                                │
   │── Send(msg, pid=1000, ───────→ │
   │        seq=42)                 │ (writes, tracks seq=42)
   │                                │
   │  ✗ ACK lost/timeout            │
   │                                │
   │── Retry(msg, pid=1000, ──────→ │
   │        seq=42)                 │ (seq ≤ 42, deduplicate!)
   │ ←───────────── ACK ─────────── │
```

The broker rejects (but acknowledges) any message with sequence ≤ the last committed sequence for that PID/partition pair.

Idempotence Configuration (Kafka 4.x Defaults)

These settings are now the defaults and provide the strongest guarantees:

```java
var config = Map.of(
    "enable.idempotence", "true",                       // default
    "acks",               "all",                        // default
    "retries",  Integer.toString(Integer.MAX_VALUE),    // effective default
    "max.in.flight.requests.per.connection", "5"        // default, ordering preserved
);
```

Key insight: With idempotence enabled and max.in.flight.requests.per.connection ≤ 5, Kafka guarantees ordering even with retries. The broker tracks sequences for up to 5 batches per producer.

Acks Deep Dive & Min In-Sync Replicas

The acks setting controls when the producer considers a request complete, but it must be paired with broker-side configurations for true durability.

| Acks Setting | Behavior | Durability | Latency |
| --- | --- | --- | --- |
| acks=0 | Fire and forget. No acknowledgement awaited. | None (high risk of loss) | Lowest |
| acks=1 | Leader writes to its local log and acknowledges. | Medium (safe if leader survives) | Low |
| acks=all (or -1) | Leader waits for the full set of in-sync replicas (ISR) to acknowledge. | Highest | Medium/High |

> [!IMPORTANT]
> acks=all is not enough! If you have acks=all but your topic’s min.insync.replicas=1, then “all” just means “the 1 available replica” (the leader). If that leader crashes, data is lost.

Golden Rule for Durability: set acks=all, replication.factor=3, and min.insync.replicas=2. This allows one broker to fail without data loss while still accepting writes.
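A sketch of creating such a topic with the AdminClient (the topic name and partition count are illustrative):

```java
try (Admin admin = Admin.create(Map.of("bootstrap.servers", "localhost:9092"))) {
    NewTopic orders = new NewTopic("orders", 6, (short) 3)   // 6 partitions, replication.factor=3
        .configs(Map.of("min.insync.replicas", "2"));
    admin.createTopics(List.of(orders)).all().get();         // block until the topic exists
}
```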

Transactions: Atomic Multi-Partition Writes

While idempotence provides exactly-once within a partition, transactions extend this to atomic writes across multiple partitions.

Transaction Lifecycle

```
┌─────────────────────────────────────────────────────────┐
│                   Transaction States                    │
│                                                         │
│   Empty ─→ Ongoing ─→ PrepareCommit ─→ CompleteCommit   │
│               │                                         │
│               └─────→ PrepareAbort ──→ CompleteAbort    │
└─────────────────────────────────────────────────────────┘
```

Transactional Producer Pattern

```java
Map<String, Object> config = Map.of(
    "bootstrap.servers", "localhost:9092",
    "transactional.id",  "order-processor-1"  // idempotence automatically enabled
);

try (var producer = new KafkaProducer<String, String>(
        config, new StringSerializer(), new StringSerializer())) {
    producer.initTransactions();
    try {
        producer.beginTransaction();
        producer.send(new ProducerRecord<>("orders", orderId, orderJson));
        producer.send(new ProducerRecord<>("inventory", productId, updateJson));
        producer.send(new ProducerRecord<>("notifications", null, notifyJson));
        producer.commitTransaction();
    } catch (ProducerFencedException e) {
        // Another producer with the same transactional.id took over
        producer.close();
    } catch (KafkaException e) {
        producer.abortTransaction();
    }
}
```

Consumer Isolation Levels

For transactions to work end-to-end, consumers must use isolation.level=read_committed:

| Isolation Level | Behavior |
| --- | --- |
| read_uncommitted (default) | See all messages, including uncommitted |
| read_committed | Only see committed messages; aborted records are filtered out |
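On the consumer side this is a single config (the group id below is illustrative):

```java
var consumerConfig = Map.of(
    "bootstrap.servers", "localhost:9092",
    "group.id",          "order-consumers",
    "isolation.level",   "read_committed"  // filter out aborted transactional records
);
```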

Compression Deep Dive

Compression happens at the batch level, making larger batches more effective:

Compression Types

| Type | Ratio | CPU | Best For |
| --- | --- | --- | --- |
| none | 1.0x | Lowest | Already compressed data |
| gzip | ~5-10x | Highest | Maximum compression, cold data |
| snappy | ~2-4x | Low | Balanced, general purpose |
| lz4 | ~3-5x | Very Low | High throughput, low latency |
| zstd | ~4-8x | Medium | Best ratio/speed trade-off |

Fine-Tuning Compression (Kafka 4.x)

```java
var config = Map.of(
    "compression.type",       "zstd",
    "compression.zstd.level", "3"    // 1-22, default 3
);

// or for gzip
var gzipConfig = Map.of("compression.gzip.level", "6");  // 1-9, default -1

// or for lz4
var lz4Config = Map.of("compression.lz4.level", "9");    // 1-17, default 9
```

Pro tip: Higher linger.ms improves compression ratios by creating larger batches.

Handling Large Messages

Kafka is optimized for small to medium-sized messages (1KB - 10KB). Sending large objects (e.g., 10MB images) requires careful tuning across the stack.

1. Producer Configuration

  • max.request.size: Controls the maximum size of a request (a batch of messages) the producer will send. The default is 1 MB; increasing it allows larger individual messages.
  • buffer.memory: Ensure your buffer is significantly larger than max.request.size so a single large batch doesn’t block all other sends.
  • Compression: Essential for large text-based payloads (JSON/XML).

2. Broker Configuration

The broker must also accept these large requests. Failures here often result in RecordTooLargeException.

  • message.max.bytes: Broker-wide limit on batch size (overridable per topic via max.message.bytes). Must be ≥ the producer’s max.request.size.
  • replica.fetch.max.bytes: Follower replicas must be able to fetch these large batches, or replication stalls; consumers likewise need fetch.max.bytes / max.partition.fetch.bytes large enough.

> [!TIP]
> For truly large files (videos, huge docs), don’t put them in Kafka. Store the file in S3/GCS and send the URL/pointer in the Kafka message.
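A sketch of that claim-check pattern, where blobStore and the bucket name are hypothetical placeholders for your storage client:

```java
// Upload the large payload out-of-band, then publish only a pointer to it
String url = blobStore.upload("media-bucket", videoId, videoBytes);  // hypothetical client
producer.send(new ProducerRecord<>("video-events", videoId,
    "{\"videoUrl\": \"" + url + "\"}"));
```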

Error Handling & Retries

Retry Configuration

Kafka 4.x uses a delivery timeout model rather than explicit retry counts:

```java
var config = Map.of(
    "delivery.timeout.ms",  "120000",  // 2 minutes total time to deliver
    "request.timeout.ms",   "30000",   // per-request timeout
    "retry.backoff.ms",     "100",     // initial backoff
    "retry.backoff.max.ms", "1000"     // max backoff (exponential)
);
```

These settings are related: delivery.timeout.ms must be ≥ linger.ms + request.timeout.ms, since the total delivery budget has to cover time spent waiting in the accumulator plus at least one full request attempt.

Handling Send Failures

```java
// Async with callback
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        if (exception instanceof RetriableException) {
            // Kafka will auto-retry; log for visibility
            logger.warn("Retriable error, will retry: " + exception.getMessage());
        } else if (exception instanceof SerializationException) {
            // Data issue, won't be retried
            deadLetterQueue.send(record);
        } else {
            // Unknown error
            metrics.increment("producer.errors");
        }
    }
});

// Sync (blocks, throws on failure)
try {
    var metadata = producer.send(record).get(10, TimeUnit.SECONDS);
} catch (ExecutionException e) {
    handleError(e.getCause());
} catch (TimeoutException | InterruptedException e) {
    handleTimeout();
}
```

Performance Tuning Guide

High Throughput Configuration

```java
var config = Map.of(
    "batch.size",       "65536",     // 64 KB batches
    "linger.ms",        "20",        // Wait up to 20 ms
    "buffer.memory",    "67108864",  // 64 MB buffer
    "compression.type", "lz4",
    "acks",             "1"          // Trade durability for speed
);
```

Low Latency Configuration

```java
var config = Map.of(
    "batch.size",       "16384",  // Default 16 KB
    "linger.ms",        "0",      // Send immediately
    "acks",             "1",      // Don't wait for all replicas
    "compression.type", "none"    // Skip compression overhead
);
```

Balanced Production Configuration

```java
var config = Map.of(
    // Durability
    "acks",                "all",
    "enable.idempotence",  "true",
    // Batching
    "batch.size",          "32768",
    "linger.ms",           "5",
    // Compression
    "compression.type",    "zstd",
    // Buffer
    "buffer.memory",       "33554432",
    "max.block.ms",        "60000",
    // Reliability
    "delivery.timeout.ms", "120000",
    "request.timeout.ms",  "30000",
    "max.in.flight.requests.per.connection", "5"
);
```

Monitoring Producer Health

Key Metrics to Watch

| Metric | Warning Threshold | Meaning |
| --- | --- | --- |
| record-send-rate | Dropping | Throughput issues |
| record-error-rate | > 0 | Delivery failures |
| request-latency-avg | > 100 ms | Network/broker issues |
| batch-size-avg | < batch.size / 2 | Inefficient batching |
| buffer-available-bytes | < 20% of total | Memory pressure |
| waiting-threads | > 0 | Threads blocked on the buffer |
| records-per-request-avg | < 10 | Poor batching |

Enabling Metrics

```java
var config = Map.of(
    "metric.reporters",        "org.apache.kafka.common.metrics.JmxReporter",
    "metrics.recording.level", "DEBUG"  // INFO, DEBUG, or TRACE
);
```
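Metrics are also available in-process through producer.metrics(), which is handy for exporting buffer pressure to your own monitoring. A sketch:

```java
producer.metrics().forEach((name, metric) -> {
    // Watch buffer-available-bytes to detect backpressure early
    if (name.name().equals("buffer-available-bytes")) {
        System.out.printf("%s = %s%n", name.name(), metric.metricValue());
    }
});
```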

Best Practices Summary

  1. Use idempotence (enabled by default in Kafka 4.x) unless you have a specific reason not to

  2. Set acks=all for durability-critical data; it’s the default for good reason

  3. Don’t set retries explicitly; use delivery.timeout.ms to control total delivery time

  4. Size buffers appropriately: buffer.memory should handle burst traffic without blocking

  5. Tune batching for your workload: High-throughput systems benefit from larger linger.ms and batch.size

  6. Use compression (especially lz4 or zstd) for most workloads—the CPU cost is usually worth the network savings

  7. Reuse producer instances; they’re thread-safe and expensive to create

  8. Always close producers in finally blocks or use try-with-resources to avoid leaking resources

  9. Monitor buffer metrics to detect backpressure before it causes application issues

  10. Use transactions only when needed; they add overhead and complexity
