Apache Kafka is a distributed streaming platform that is widely used for building real-time data processing pipelines and streaming applications. It was originally developed by LinkedIn and later open-sourced as part of the Apache Software Foundation. Kafka is designed to handle high-throughput, low-latency processing of real-time data feeds.
Core Concepts
- Producer/Consumer Model: Kafka operates on a producer-consumer model, where producers publish data to Kafka topics, and consumers subscribe to those topics to receive the data.
- Topics and Partitions: Data in Kafka is organized into topics. Each topic is a stream of records and can be divided into partitions for parallel processing.
- Brokers: Kafka runs as a cluster of servers, where each server is called a broker. Brokers handle the storage of data and serve client requests.
Advantages
- Scalability: Kafka is designed to be distributed and scalable, capable of handling large volumes of data and a high number of concurrent clients.
- Fault Tolerance: It provides fault tolerance through data replication, ensuring data is not lost if a broker fails.
- Low Latency: Kafka is optimized for low latency to support real-time data processing use cases.
- High Throughput: It can handle high throughput of data, making it suitable for big data scenarios.
- Persistence: Kafka stores streams of records in a fault-tolerant way and allows the records to be persistent for a configurable amount of time.
- Replication: Data in Kafka topics is replicated across multiple brokers to ensure durability and high availability.
- Delivery Semantics: Kafka provides different delivery semantics like at-most-once, at-least-once, and exactly-once processing.
- Stream Processing: Kafka includes a stream processing library called Kafka Streams for building real-time streaming applications.
Topic
A topic is a category or a feed name to which records (messages) are published. Topics in Kafka are used to categorize and store similar types of data. Topics allow producers (data publishers) and consumers (data subscribers) to exchange messages in a decoupled and distributed manner.
(Image source: https://kafka.apache.org/documentation)
Topics can be created manually using Kafka’s command-line tools or programmatically via Kafka’s API. They can also be auto-created when a producer sends a message to a non-existent topic, depending on the cluster configuration. Topics have various configurable properties, such as the number of partitions, replication factor, retention policy, etc.
Each topic is divided into one or more partitions. Partitions allow Kafka to split the data of a topic across multiple brokers, enabling horizontal scaling. Each partition holds a subset of the data and can be hosted on a different broker, which distributes the load and allows for parallel data processing. When producers send messages to a topic, they can optionally attach a key to each message, which affects partition selection. If a key is specified, the default partitioner hashes it (murmur2 in the Java client) and takes the result modulo the number of partitions, so records with the same key land in the same partition as long as the partition count does not change. If no key is provided, messages are spread across partitions round-robin, in sticky batches (the Java client’s default since Kafka 2.4), or according to custom partitioning logic.
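The key-to-partition mapping can be sketched as a hash followed by a modulo. The snippet below is a minimal illustration, not Kafka's own partitioner: partitionFor is a hypothetical helper, and it uses FNV-1a from the Go standard library as a stand-in hash (the Java client uses murmur2).

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor maps a record key to a partition index in [0, numPartitions).
// Assumption: FNV-1a is a stand-in here; Kafka's Java client hashes with murmur2.
func partitionFor(key string, numPartitions int) int {
	h := fnv.New32a()
	h.Write([]byte(key)) // Write on a hash.Hash never returns an error
	return int(h.Sum32() % uint32(numPartitions))
}

func main() {
	// The same key always yields the same partition for a fixed partition count.
	for _, key := range []string{"user-1", "user-2", "user-1"} {
		fmt.Printf("key=%s -> partition %d\n", key, partitionFor(key, 6))
	}
}
```

Because the result depends on the partition count, adding partitions to a topic can move existing keys to different partitions, which is worth keeping in mind when per-key ordering matters.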
Consumers read messages from a topic. They can subscribe to one or more topics and consume data either individually or as part of a consumer group. In a consumer group, each consumer reads from a unique set of partitions, providing scalability and fault tolerance. Kafka ensures that each partition is only consumed by one consumer in the group at any given time.
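As a rough sketch of how partitions might be spread over the members of a consumer group, the following assigns partitions round-robin. assignRoundRobin is a hypothetical helper written for illustration; Kafka's real assignors (range, round-robin, sticky, cooperative sticky) are more involved.

```go
package main

import "fmt"

// assignRoundRobin spreads partitions 0..numPartitions-1 over the given
// consumers so that every partition has exactly one owner in the group.
func assignRoundRobin(consumers []string, numPartitions int) map[string][]int {
	assignment := make(map[string][]int, len(consumers))
	if len(consumers) == 0 {
		return assignment
	}
	for p := 0; p < numPartitions; p++ {
		owner := consumers[p%len(consumers)]
		assignment[owner] = append(assignment[owner], p)
	}
	return assignment
}

func main() {
	a := assignRoundRobin([]string{"c1", "c2"}, 5)
	fmt.Println("c1:", a["c1"]) // c1: [0 2 4]
	fmt.Println("c2:", a["c2"]) // c2: [1 3]
}
```

With more consumers than partitions, the extra consumers receive nothing and sit idle, so the partition count caps the useful parallelism of a group.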
Kafka supports ACLs, allowing administrators to control access to topics, including who can produce to or consume from a specific topic.
Topics in Kafka serve as the central mechanism for organizing and handling data streams. They provide a scalable, fault-tolerant, and efficient way for multiple producers and consumers to exchange data in real time. The configurable nature of topics allows them to be tailored to specific use cases and requirements, making them integral to Kafka’s functionality as a distributed streaming platform.
To view a list of Kafka topics, run the following command:
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
For a more granular view of the topics and partitions:
bin/kafka-topics.sh --describe --bootstrap-server localhost:9092
Partition
A partition in Kafka is a sequential, ordered log of records (messages). It’s essentially a commit log where each record is appended.
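A toy in-memory model can make the commit-log idea concrete. The sketch below is illustrative only: partitionLog, Append, and ReadFrom are hypothetical names, and a real partition is stored as segment files on disk rather than a slice.

```go
package main

import "fmt"

// partitionLog is a toy in-memory stand-in for one partition's commit log.
type partitionLog struct {
	records [][]byte
}

// Append adds a record at the end of the log and returns its offset.
func (l *partitionLog) Append(value []byte) int64 {
	l.records = append(l.records, value)
	return int64(len(l.records) - 1)
}

// ReadFrom returns every record at or after the given offset,
// or nil if the offset is outside the log.
func (l *partitionLog) ReadFrom(offset int64) [][]byte {
	if offset < 0 || offset >= int64(len(l.records)) {
		return nil
	}
	return l.records[offset:]
}

func main() {
	var log partitionLog
	for _, v := range []string{"a", "b", "c"} {
		fmt.Printf("appended %q at offset %d\n", v, log.Append([]byte(v)))
	}
	fmt.Printf("records from offset 1: %q\n", log.ReadFrom(1))
}
```

Records are never modified in place; a reader only needs to remember an offset to know where to continue.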
Partition replication is a crucial feature for ensuring data reliability and high availability. Replication in Kafka is designed to prevent data loss by duplicating partition data across multiple brokers (servers) in a Kafka cluster. Each partition can be replicated a certain number of times, specified by the replication factor. A replication factor of N means there will be N copies of the data across different brokers.
For each partition, one of the replicas is designated as the leader. By default, all produce and consume requests for that partition go through the leader. The other replicas are followers: they replicate data from the leader and, unless follower fetching is configured (available since Kafka 2.4), do not serve client requests directly. Producers write messages to the leader of the partition, and records are appended sequentially to the leader’s log. Producers can configure their acknowledgment level (acks=0: do not wait; acks=1: wait only for the leader; acks=-1/all: wait for all in-sync replicas) to control how far a write must be replicated before it is considered successful.
Follower brokers pull data from the leader. This process is pull-based rather than push-based, meaning each follower fetches data from the leader at its own pace. The followers continuously replicate new data from the leader and perform consistency checks to ensure they are in sync. Kafka tracks the set of replicas that are “in sync” with the leader (the In-Sync Replicas, or ISR). By default, only in-sync replicas are eligible to be elected as leader when the leader fails; electing an out-of-sync replica requires setting unclean.leader.election.enable to true.
If a leader replica fails, one of the follower replicas is promoted to be the new leader. The new leader is chosen from the in-sync replicas, which have caught up with the old leader. Clients (producers and consumers) are automatically redirected to the new leader for all future requests. When the leader fails, any messages that had not yet been replicated to the followers are lost; this gap between leader and followers is the replication lag. Hence, minimizing replication lag is crucial for data durability.
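The failover rule can be modeled in a few lines. This is a simplified sketch, not the controller's actual code: electLeader is a hypothetical function, and it assumes the rule "pick the first replica in the assignment order that is both alive and in the ISR".

```go
package main

import (
	"errors"
	"fmt"
)

// electLeader returns the first broker in the replica assignment that is both
// alive and in the ISR. It is a simplified model, not Kafka's controller code.
func electLeader(replicas []int, isr, alive map[int]bool) (int, error) {
	for _, broker := range replicas {
		if isr[broker] && alive[broker] {
			return broker, nil
		}
	}
	return -1, errors.New("no live in-sync replica: partition is offline")
}

func main() {
	replicas := []int{1, 2, 3}
	isr := map[int]bool{1: true, 3: true}   // broker 2 has fallen behind
	alive := map[int]bool{2: true, 3: true} // broker 1, the old leader, is down
	leader, err := electLeader(replicas, isr, alive)
	fmt.Println(leader, err) // 3 <nil>
}
```

Broker 2 is alive but skipped because it is not in the ISR; promoting it would silently drop the records it never fetched.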
Within a single partition, messages are guaranteed to be ordered in the sequence they were added. However, there’s no ordering guarantee across different partitions. Kafka provides different delivery semantics, such as at-most-once, at-least-once, and exactly-once, which can be configured based on the requirements of the application.
Offset
In Kafka, an offset is a unique identifier of a record within a partition. Kafka tracks the offset (position) of each consumer in each partition. This allows consumers to resume reading from where they left off. A consumer knows which record to read next based on the offset.
Ways of Committing Offsets:
- Automatic Offset Committing: This is the simplest method, where the consumer commits offsets automatically. With enable.auto.commit set to true, offsets are committed at the interval set by auto.commit.interval.ms (5 seconds by default). This happens during the consumer’s poll loop. However, after a failure the consumer may re-process up to the last 5 seconds of data.
- Manual Synchronous Offset Committing: Developers can control when offsets are committed to reduce message loss and duplicate messages during rebalances. By setting enable.auto.commit to false, the application decides when to commit offsets. The commitSync() method, which commits the latest offset returned by poll(), is simple and reliable, but it blocks the application until the broker responds and throws an error if the commit fails.
- Asynchronous Offset Committing: To avoid blocking, which reduces throughput, asynchronous committing can be used. commitAsync() commits the last offset and proceeds without waiting for a broker response. While commitSync() retries until it succeeds or hits an unrecoverable error, commitAsync() does not retry, because a later, larger offset may already have been committed successfully. It offers a callback for handling broker responses, often used for logging errors or recording metrics. If you retry from the callback, take care that an older offset does not overwrite a newer one.
- Combining Synchronous and Asynchronous Commits: For occasional commit failures, not retrying isn’t a major issue since subsequent commits will likely succeed. However, for the last commit before closing a consumer or during rebalancing, ensure a successful commit by using commitAsync() in the normal loop and commitSync() for that final commit.
- Committing Specific Offsets: To avoid reprocessing a whole batch of messages during rebalancing, you might want to commit offsets in the middle of a batch. Called without arguments, commitSync() and commitAsync() aren’t sufficient since they only commit the last offset returned by poll(). Fortunately, the consumer API allows committing a map of specific partition offsets by passing it to commitSync() or commitAsync().
If a consumer fails and restarts, it can resume reading from the last committed offset. When offsets are committed only after processing, this avoids data loss and gives at-least-once semantics; exactly-once processing additionally requires idempotent handling or Kafka transactions. Consumers can also reset to an earlier offset to reprocess data. This is useful in scenarios where the processing logic changes or in the event of a processing error.
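The relationship between commits and reprocessing can be simulated without a broker. In this sketch, runConsumer and its crashAfter parameter are hypothetical: the function processes records in batches, commits after each batch, and can be told to "crash" after a given offset, before the commit for that batch happens.

```go
package main

import "fmt"

// runConsumer processes records from the committed offset onward in batches of
// batchSize, committing after each batch. It simulates a crash right after
// processing the record at offset crashAfter (use -1 for no crash) and returns
// the last committed offset plus the offsets it processed.
func runConsumer(records []string, committed, batchSize, crashAfter int) (int, []int) {
	var processed []int
	for committed < len(records) {
		end := committed + batchSize
		if end > len(records) {
			end = len(records)
		}
		for off := committed; off < end; off++ {
			processed = append(processed, off)
			if off == crashAfter {
				return committed, processed // crashed before the commit
			}
		}
		committed = end // commit the offset of the next record to read
	}
	return committed, processed
}

func main() {
	records := []string{"r0", "r1", "r2", "r3", "r4"}
	committed, first := runConsumer(records, 0, 2, 2)
	fmt.Println("first run processed", first, "committed", committed)
	committed, second := runConsumer(records, committed, 2, -1)
	fmt.Println("second run processed", second, "committed", committed)
}
```

Offset 2 is processed in both runs: the restart resumes from the last committed offset, so nothing is skipped, but work done after that commit is repeated. That is the at-least-once behavior, and it is why downstream processing should tolerate duplicates.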
Consumers can configure their offset reset behavior for cases where they have no valid offset (like the first time they connect) or if the current offset is invalid (such as being out of range). Options include:
- earliest: Start from the beginning of the partition.
- latest: Start from the most recent record.
- none: Throw an exception if no initial offset is found or if the current offset is invalid.
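The reset decision can be written as a small function. This is a sketch under assumptions: startOffset is a hypothetical helper, a negative committed value stands for "no committed offset", and the valid range of a partition is modeled as [logStart, logEnd].

```go
package main

import (
	"errors"
	"fmt"
)

// startOffset picks where to begin reading a partition whose records occupy
// offsets logStart..logEnd-1. committed < 0 means no committed offset exists.
func startOffset(committed, logStart, logEnd int64, policy string) (int64, error) {
	if committed >= logStart && committed <= logEnd {
		return committed, nil // a valid committed offset always wins
	}
	switch policy {
	case "earliest":
		return logStart, nil
	case "latest":
		return logEnd, nil
	case "none":
		return 0, errors.New("no valid offset and reset policy is none")
	default:
		return 0, fmt.Errorf("unknown reset policy %q", policy)
	}
}

func main() {
	fmt.Println(startOffset(-1, 100, 250, "earliest")) // 100 <nil>
	fmt.Println(startOffset(-1, 100, 250, "latest"))   // 250 <nil>
	fmt.Println(startOffset(180, 100, 250, "latest"))  // 180 <nil>
}
```

Note that the policy only applies when there is no usable committed offset; it does not override a valid one.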
Produce a Record To Kafka
1. Record Created by Producer: The producer application creates a record, which typically includes a key, a value, and optionally headers and a timestamp. The producer serializes the key and value into a binary format using a serializer (e.g., String, Avro, JSON serializers).
2. Partition Determination: The producer may specify a partition directly. If no partition is specified, the producer’s partitioner chooses one. If a key is provided, the partitioner hashes the key and maps the hash to a partition. If no key is provided, the partitioner may use round-robin or another strategy to distribute messages evenly across available partitions.
3. Record Sending: The producer establishes a connection to the Kafka broker that is the leader for the target partition. The producer may batch multiple records together for efficiency before sending them to the broker (batches are sent by the producer’s background Sender thread). The leader broker for the partition receives the record(s). The broker validates the record (e.g., for schema compatibility if schema validation is enabled). The broker appends the record to the end of the partition’s log. Kafka logs are append-only, which ensures high write throughput.
4. Replication to Follower Brokers: Follower brokers fetch the record from the leader to ensure data redundancy. Depending on the producer’s configuration (acks), the leader may wait for followers before responding. If acks=all, the leader waits until all in-sync replicas have the record. Once the record is written to the partition log and successfully replicated (based on the acknowledgment settings), it is considered committed.
5. Acknowledgment to Producer: The leader broker sends an acknowledgment back to the producer, indicating successful receipt (and replication, if applicable) of the record. The acknowledgment includes the offset of the record in the Kafka log, which can be used by the producer for future reference. Once committed, the record is immediately available for consumers subscribing to the partition.
6. Error Handling (If Any): If any error occurs (e.g., leader election, network issues), an error response is sent to the producer, and the producer may retry sending the record based on its configuration.
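The effect of the acks setting on step 5 can be summarized as a decision function. This is an illustrative model only: writeAcknowledged is a hypothetical name, isrSize counts the in-sync replicas including the leader, and isrAcked counts how many of them hold the record.

```go
package main

import "fmt"

// writeAcknowledged reports whether the producer may treat a write as
// successful under the given acks setting. It models the decision only and
// does not talk to a broker.
func writeAcknowledged(acks string, leaderWrote bool, isrSize, isrAcked int) bool {
	switch acks {
	case "0":
		return true // fire and forget: no broker response is awaited
	case "1":
		return leaderWrote // the leader's local write is enough
	case "all", "-1":
		return leaderWrote && isrAcked == isrSize // every in-sync replica has it
	default:
		return false
	}
}

func main() {
	// The leader wrote the record; 2 of the 3 in-sync replicas have it so far.
	for _, acks := range []string{"0", "1", "all"} {
		fmt.Printf("acks=%s -> acknowledged=%v\n", acks, writeAcknowledged(acks, true, 3, 2))
	}
}
```

With acks=0 or acks=1 the producer moves on before the record is fully replicated, which is exactly the window in which a leader failure can lose data.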
Consume a Record from Kafka
Consuming a record from Apache Kafka involves a series of steps that ensure efficient and reliable message delivery to consumers. Here’s a detailed breakdown of the process:
1. Consumer Configuration: The consumer is configured with various settings, including the Kafka cluster address, group ID, deserializers for keys and values, and other consumer-specific settings. The consumer subscribes to one or more Kafka topics. It can also subscribe to a pattern to dynamically subscribe to topics.
2. Group Coordination (If Part of a Consumer Group): If the consumer is part of a consumer group, it sends a join group request to the Kafka cluster. Kafka performs a group rebalance to assign partitions to each consumer in the group. Each partition is consumed by exactly one consumer in the group.
3. Connection to Broker and Fetching Records: The consumer identifies and establishes a connection to the leader broker for each of the partitions it is assigned to consume from. The consumer fetches the last committed offset for each assigned partition to know where to start consuming. The consumer sends fetch requests to the broker(s) to retrieve records from the assigned partitions starting from the last committed offset.
4. Data Transmission: The broker sends the requested data back to the consumer. This data includes a batch of records for each requested partition. The consumer deserializes the received records from binary format back into keys and values. The consumer processes the deserialized records as per its business logic.
5. Offset Management: The consumer keeps track of the offsets of records it has successfully processed. The consumer periodically commits the offsets back to Kafka. This can be done automatically (auto-commit) or manually in the consumer code.
6. Rebalance Listener: If there is a change in the consumer group (like a new consumer joining or an existing one leaving), a rebalance is triggered. The consumer can handle rebalance events to commit offsets and clean up resources.
The consumer continues to poll for new data in a loop. It repeatedly sends fetch requests for new records in the assigned partitions. The consumer handles any exceptions that occur during consumption, such as connection losses or deserialization errors. When the consumer is shut down, it commits its final offsets and closes its connections to the brokers. If needed, the consumer can reset its offsets to reprocess data. This can be done by seeking to a specific offset or by using offset reset policies.
Fine-tuning Kafka
Fine-tuning Apache Kafka for high throughput requires adjustments at various levels: broker configuration, topic/partition design, producer and consumer settings, and the underlying hardware and network infrastructure.
1. Broker Configuration
- Batch Settings: On the producer side, increase batch.size and linger.ms to allow more messages to be batched before being sent to the broker, reducing request overhead.
- Compression: Enable compression (compression.type) at the producer level. This reduces the size of the payload sent over the network and stored on disk.
- Filesystem and Log Segments: Optimize log segment sizes (log.segment.bytes and log.roll.hours) to balance write performance and log compaction.
- Increase File Descriptors: Ensure Kafka brokers have a high limit on file descriptors, as Kafka is I/O heavy and uses file descriptors for network connections and log segments.
2. Topic and Partition Design
- More Partitions: Increase the number of partitions per topic (num.partitions) for parallelism, but be cautious as too many partitions can increase overhead.
- Replication Factor: Use an appropriate replication factor. More replicas increase fault tolerance but add overhead to replication. Typically, a replication factor of 3 is a good balance.
3. Producer Configuration
- Asynchronous Sending: Use asynchronous sends (fire-and-forget or callback mechanism) to increase throughput.
- Tuning acks: Set acks to 1 to achieve a balance between durability and performance. Setting it to 0 increases throughput but at the risk of data loss.
4. Consumer Configuration
- Increase Fetch Size: Adjust fetch.min.bytes and fetch.max.wait.ms to allow consumers to fetch more data in a single request, reducing network calls.
- Consumer Parallelism: Increase the number of consumer instances or consumer threads to parallelize consumption.
5. Network Tuning
- Network Bandwidth: Ensure high network bandwidth between producers, brokers, and consumers to handle high data volumes.
- Increase Buffer Sizes: Tune network buffer settings on both sides: receive.buffer.bytes and send.buffer.bytes on clients, and socket.receive.buffer.bytes and socket.send.buffer.bytes on brokers.
6. Hardware Considerations
- Fast Disks: Use SSDs instead of HDDs for better disk I/O performance, crucial for high-throughput scenarios.
- Adequate Memory: Ensure brokers have enough memory, particularly for page caching, as Kafka relies heavily on the underlying OS for caching.
- CPU Resources: Ensure sufficient CPU resources to handle serialization/deserialization and compression/decompression processes.
7. JVM Performance
- JVM Settings: Tune JVM settings for Kafka brokers, including garbage collection (GC) parameters, to ensure stable performance.
8. Use Kafka’s Quotas
- Throttling: Set quotas per client or user, which the brokers enforce, to prevent any single client from overwhelming the cluster and degrading overall performance.
9. Cluster Size and Layout
- Cluster Scaling: Scale the Kafka cluster horizontally by adding more brokers for better load distribution.
- Rack Awareness: Use rack-awareness to distribute replicas across different racks or zones for fault tolerance and performance.
Optimizing Kafka for high throughput involves a combination of configuration tuning, hardware choices, and careful design of topics and partitioning. It’s essential to balance performance with reliability and fault tolerance. Regular monitoring and adjustments based on observed performance are key to maintaining an optimized Kafka environment.
Code Examples
First, ensure you have the Kafka environment set up and the confluent-kafka-go library installed in your Go environment:
go get -u github.com/confluentinc/confluent-kafka-go/kafka
Kafka Producer in Go
This simple producer will send messages to a specified Kafka topic:
package main

import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
	if err != nil {
		panic(err)
	}
	defer p.Close()

	// Delivery report handler for produced messages
	go func() {
		for e := range p.Events() {
			switch ev := e.(type) {
			case *kafka.Message:
				if ev.TopicPartition.Error != nil {
					fmt.Printf("Delivery failed: %v\n", ev.TopicPartition.Error)
				} else {
					fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
				}
			}
		}
	}()

	topic := "test_topic"
	for _, word := range []string{"Hello", "Kafka", "World"} {
		// Produce is asynchronous; the error returned here only covers enqueueing.
		err := p.Produce(&kafka.Message{
			TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
			Value:          []byte(word),
		}, nil)
		if err != nil {
			fmt.Printf("Failed to enqueue message: %v\n", err)
		}
	}

	// Wait up to 15 seconds for outstanding message deliveries
	p.Flush(15 * 1000)
}
Kafka Consumer in Go
This consumer will read messages from the specified Kafka topic:
package main

import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092",
		"group.id":          "myGroup",
		"auto.offset.reset": "earliest",
	})
	if err != nil {
		panic(err)
	}
	// Closes the consumer if setup below panics; the loop itself runs until
	// the process is stopped.
	defer c.Close()

	if err := c.SubscribeTopics([]string{"test_topic"}, nil); err != nil {
		panic(err)
	}

	for {
		// A timeout of -1 blocks until a message or an error arrives.
		msg, err := c.ReadMessage(-1)
		if err != nil {
			// The client will automatically try to recover from all errors.
			fmt.Printf("Consumer error: %v\n", err)
			continue
		}
		fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
	}
}