The goal of this chapter is to get you familiar with Apache Kafka and show you how to solve the consumption of millions of messages in a pipeline architecture. Here we show some Scala examples to give you a solid foundation for the different types of implementations and integrations for Kafka producers and consumers.
In addition to the explanation of the Apache Kafka architecture and principles, we explore Kafka integration with the rest of the SMACK stack, specifically with Spark and Mesos. At the end of the chapter, we show how to administer Apache Kafka.
This chapter covers the following topics:
Kafka introduction
Kafka installation
Kafka in cluster
Kafka architecture
Kafka producers
Kafka consumers
Kafka integration
Kafka administration
Kafka Introduction
The author of Apache Kafka, Jay Kreps, took a lot of literature courses in college. Since the project is heavily optimized for writing (in this book, when we say “optimized” we mean 2 million writes per second on three commodity machines), when he open sourced it he thought it should have a cool name: Kafka, in honor of Franz Kafka, who was a very prolific author despite dying at age 40.
Nowadays, real-time information is continuously generated. This data needs easy ways to be delivered to multiple types of receivers. Most of the time the information generators and the information consumers are inaccessible to each other; this is when integration tools enter the scene.
In the 1980s, 1990s, and 2000s, the large software vendors whose names have three letters (IBM, SAP, BEA, etc.) and more (Oracle, Microsoft, Google) found a very well-paid market in the integration layer: the layer inhabited by enterprise service buses, SOA architectures, integrators, and other panaceas costing several million dollars.
Now, all traditional applications tend to have a point of integration between them, therefore, creating the need for a mechanism for seamless integration between data consumers and data producers to avoid any kind of application rewriting at either end.
As we mentioned in earlier chapters, in the big data era, the first challenge was the data collection and the second challenge was to analyze that huge amount of data.
Message publishing is the mechanism for connecting heterogeneous applications through sending messages among them. The message router is known as message broker. Apache Kafka is a software solution to quickly route real-time information to consumers.
The message broker provides seamless integration, but there are two collateral objectives: the first is to not block the producers and the second is to not let the producers know who the final consumers are.
Apache Kafka is a real-time publish-subscribe solution messaging system: open source, distributed, partitioned, replicated, commit-log based with a publish-subscribe schema. Its main characteristics are as follows:
Distributed. Cluster-centric design that supports the distribution of the messages over the cluster members, maintaining the semantics. So you can grow the cluster horizontally without downtime.
Multiclient. Easy integration with different clients from different platforms: Java, .NET, PHP, Ruby, Python, etc.
Persistent. You cannot afford any data loss. Kafka is designed with efficient O(1) disk structures that provide constant-time performance no matter the data size.
Real time. The messages produced are immediately seen by consumer threads; this is the basis of the systems called complex event processing (CEP).
Very high throughput. As we mentioned, all the technologies in the stack are designed to work in commodity hardware. Kafka can handle hundreds of read and write operations per second from a large number of clients.
Figure 8-1 shows an Apache Kafka messaging system typical scenario.
Figure 8-1. Apache Kafka typical scenario
On the producers’ side, you can find several types of actors, for example:
Adapters. Generate transformation information; for example, a database listener or a file system listener.
Logs. The log files of application servers and other systems, for example.
Proxies. Generate web analytics information.
Web pages. Front-end applications generating information.
Web services. The service layer; generate invocation traces.
You could group the clients on the customer side as three types:
Offline. The information is stored for posterior analysis; for example, Hadoop and data warehouses.
Near real time. The information is stored but not requested immediately; for example, Apache Cassandra and other NoSQL databases.
Real time. The information is analyzed as it is generated; for example, an engine like Apache Spark or Apache Storm (used to make analysis over HDFS).
Born in the Fast Data Era
As we have mentioned, data is the new ingredient of Internet-based systems. Simply, a web page needs to know user activity, logins, page visits, clicks, scrolls, comments, heat zone analysis, shares, and so forth.
Traditionally, data was handled and stored with traditional aggregation solutions. Due to the high throughput, the analysis could not be done until the next day. Today, yesterday’s information is often useless. Offline analysis, such as with Hadoop, is being left out of the new economy.
There are several examples of Apache Kafka use cases:
Web searches based on relevance
Application security: login analysis, brute force attack detection, systemic denial of service attack detection
Recommendations based on popularity, correlation
Sentiment analysis, tendencies, segmentation
Collecting data from device sensors: from sophisticated sensors such as surveillance cameras to the GPS sensors in cell phones, passing through light, temperature, pressure, and humidity sensors
Real-time merchandising to a huge population
Collecting logs from business systems such as application server logs, CRM, ERP, and so forth.
In all of these cases, the analysis is done in real time or it is never done, without middle points.
Apache Kafka is usually compared to traditional messaging systems such as ActiveMQ or RabbitMQ. The difference is the data volume that Kafka can handle in real time.
Use Cases
Well, you have seen some business scenarios that are solved with Apache Kafka. In which layer in the architecture should you put Kafka? Here are some popular (real examples with real enterprises) use cases:
Commit logs. What happens when your system does not have a log system? In these cases, you can use Kafka. Many systems do not have logs simply because, so far, it has not been possible to handle such a large data volume. Stories of application servers falling over simply because they could not write their logs with the verbosity needed by the business are more common than it seems. Kafka can also help fallen log servers to start and restart.
Log aggregation. Contrary to what people believe, much of the work of the onsite support team is on log analysis. Kafka not only provides a system for log management, but it can also handle heterogeneous aggregation of several logs. Kafka can physically collect the logs and remove cumbersome details such as file location or format. In addition, it provides low latency and supports multiple data sources while making distributed consumption.
Messaging. Systems are often heterogeneous, and instead of rewriting them, you have to translate between them. Often the manufacturer’s adapters are unaffordable to a company; for such cases, Kafka is the solution because it is open source and can handle more volume than many traditional commercial brokers.
Stream processing. We could write an entire book on this topic. In some business cases, the process of collecting information consists of several stages. A clear example is when a broker is used not only to gather information but also to transform it. This is the real meaning and success of the Enterprise Service Bus (ESB) architectures. With Kafka, the information can be collected and further enriched; this (very well paid) enrichment process is known as stream processing.
Record user activity. Many marketing and advertising companies are interested in recording all the customer activity on a web page. This seems a luxury, but until recently, it was very difficult to keep track of the clicks that a user makes on a site. For those tasks where the data volume is huge, you can use Kafka for real-time process and monitoring.
All of this seems good, but who is using Kafka today? Here are some examples:
LinkedIn.1 Used for activity streams and operational metrics. We cannot imagine today’s LinkedIn newsfeed without Kafka.
Uber.2 Relies on Kafka data feeds to bulk-load log data into Amazon S3 and to stream change-data logs from the local data centers.
Twitter.3 Handling five billion sessions a day in real time requires Kafka as part of its stream processing infrastructure.
Netflix.4 Kafka is the backbone of Netflix’s data pipeline for real-time monitoring and event processing.
Spotify.5 Kafka is used as part of their log delivery system.
Yahoo. Used by the media analytics team as a real-time analytics pipeline. Their cluster handles 20Gbps of compressed data.
Kafka Installation
Go to the Apache Kafka home page at http://kafka.apache.org/downloads, as shown in Figure 8-2.
Figure 8-2. Apache Kafka download page
The current Apache Kafka version available as a stable release is 0.10.0.0. The major limitation of Kafka since 0.8.x is that it is not backward-compatible, so you cannot simply swap this version in for one prior to 0.8.
Once you download the available release, let’s proceed with the installation.
Installing Java
You need to install Java 1.7 or later. Download the latest JDK from Oracle’s web site at http://www.oracle.com/technetwork/java/javase/downloads/index.html .
For example, to install in Linux:
Change the file mode:
[restrada@localhost opt]# chmod +x jdk-8u91-linux-x64.rpm
Change the directory in which you want to perform the installation:
[restrada@localhost opt]# cd <directory path name>
To install the software in the /usr/java/ directory, type the following command:
[restrada@localhost opt]# cd /usr/java
Run the installer using this command:
[restrada@localhost java]# rpm -ivh jdk-8u91-linux-x64.rpm
Finally, add the JAVA_HOME environment variable. This command will write the JAVA_HOME environment variable to the /etc/profile file:
[restrada@localhost opt]# echo "export JAVA_HOME=/usr/java/jdk1.8.0_91" >> /etc/profile
Installing Kafka
To install in Linux , take the following steps.
Extract the downloaded kafka_2.10-0.10.0.0.tgz file:
[restrada@localhost opt]# tar xzf kafka_2.10-0.10.0.0.tgz
Add the Kafka bin directory to PATH as follows:
[restrada@localhost opt]# export KAFKA_HOME=/opt/kafka_2.10-0.10.0.0
[restrada@localhost opt]# export PATH=$PATH:$KAFKA_HOME/bin
Importing Kafka
To include Kafka in our programming projects, we include the dependencies.
With SBT:
// https://mvnrepository.com/artifact/org.apache.kafka/kafka_2.10
libraryDependencies += "org.apache.kafka" % "kafka_2.10" % "0.10.0.0"
With Maven:
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka_2.10 -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.10.0.0</version>
</dependency>
With Gradle:
// https://mvnrepository.com/artifact/org.apache.kafka/kafka_2.10
compile group: 'org.apache.kafka', name: 'kafka_2.10', version: '0.10.0.0'
Kafka in Cluster
We are ready to program with the Apache Kafka publisher-subscriber messaging system. In Kafka, there are three types of clusters:
Single node–single broker
Single node–multiple broker
Multiple node–multiple broker
A Kafka cluster has five main components:
Topic. A category or feed name in which messages are published by the message producers. Topics are partitioned; each partition is represented by an ordered, immutable message sequence. The cluster has a partitioned log for each topic. Each message in a partition has a unique sequential id called an offset.
Broker. A Kafka cluster has one or more physical servers in which each one may have one or more server processes running. Each server process is called a broker. The topics live in the broker processes.
Producer. Publishes data to topics by choosing the appropriate partition in the topic. For load balancing, the messages allocation to the topic partition can be done in a round-robin mode or by defining a custom function.
Consumer. Applications or processes subscribed to topics and process the feed of published messages.
ZooKeeper. ZooKeeper is the coordinator between the broker and the consumers. ZooKeeper coordinates the distributed processes through a shared hierarchical name space of data registers; these registers are called znodes.
There are two differences between ZooKeeper and a file system:
Every znode has data associated and is designed to store coordination data.
Znodes are limited on the amount of data that they can have.
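The producer-side partition choice described above can be sketched in a few lines of plain Scala. This is a conceptual model only, not Kafka’s actual partitioner code; the function names and the simple modulo hash are our own assumptions.

```scala
// Conceptual sketch of how a producer picks a partition (not Kafka's actual code).
// Keyed messages: hash the key modulo the partition count, so the same key
// always lands on the same partition. Keyless messages: round-robin.

def partitionForKey(key: String, numPartitions: Int): Int =
  math.abs(key.hashCode % numPartitions)

// Simple round-robin counter for keyless messages
var roundRobinCounter = -1
def nextRoundRobinPartition(numPartitions: Int): Int = {
  roundRobinCounter += 1
  roundRobinCounter % numPartitions
}

// The same key is always mapped to the same partition...
val p1 = partitionForKey("user-42", 3)
val p2 = partitionForKey("user-42", 3)

// ...while keyless messages are spread evenly over the three partitions.
val spread = (1 to 6).map(_ => nextRoundRobinPartition(3))
```

Key-based partitioning is what preserves per-key ordering: all messages with the same key end up in the same partition, and within a partition order is guaranteed.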
Single Node–Single Broker Cluster
An example diagram of a single node–single broker cluster is shown in Figure 8-3.
Figure 8-3. Single node–single broker Kafka cluster example
First, start the ZooKeeper server. Kafka provides a simple ZooKeeper configuration file to launch a single ZooKeeper instance. To start the ZooKeeper instance, use the following command:
[restrada@localhost kafka_2.10-0.10.0.0]# bin/zookeeper-server-start.sh config/zookeeper.properties
The following are the main properties defined in zookeeper.properties:
dataDir. The data directory where ZooKeeper is stored:
dataDir=/tmp/zookeeper
clientPort. The listening port for client requests. By default, ZooKeeper listens on TCP port 2181:
clientPort=2181
maxClientCnxns. The limit per IP for the number of connections (0 = unbounded):
maxClientCnxns=0
For more information about Apache ZooKeeper, visit the project page at http://zookeeper.apache.org/ .
Starting the Broker
After starting ZooKeeper, start the Kafka broker with the following command:
[restrada@localhost kafka_2.10-0.10.0.0]# bin/kafka-server-start.sh config/server.properties
The following are the main properties defined in server.properties:
Broker id. The unique positive integer id for each broker.
broker.id=0
Port. The port where the socket server listens on:
port=9092
Log dir. The directory to store log files:
log.dir=/tmp/kafka10-logs
Num partitions. The number of log partitions per topic:
num.partitions=2
ZooKeeper connect. The ZooKeeper connection URL:
zookeeper.connect=localhost:2181
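Putting the settings above together, a minimal server.properties for this walkthrough could look like the following fragment (values exactly as discussed):

```properties
broker.id=0
port=9092
log.dir=/tmp/kafka10-logs
num.partitions=2
zookeeper.connect=localhost:2181
```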
Creating a Topic
Kafka has a command to create topics. Let’s create a topic called amazingTopic with one partition and one replica:
[restrada@localhost kafka_2.10-0.10.0.0]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic amazingTopic
Obtain the output, as follows:
Created topic "amazingTopic".
These are the parameters:
--replication-factor 1 indicates one replica
--partitions 1 indicates one partition
--zookeeper localhost:2181 indicates the ZooKeeper URL
To obtain the list of topics on any Kafka server, use the following command:
[restrada@localhost kafka_2.10-0.10.0.0]# bin/kafka-topics.sh --list --zookeeper localhost:2181
Obtain the output:
amazingTopic
Starting a Producer
Kafka has a command line to start producers. It accepts input from the command line and publishes each line as a message. By default, each new line is considered a message.
[restrada@localhost kafka_2.10-0.10.0.0]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic amazingTopic
Two parameters are required:
broker-list. The URL of the brokers to be connected.
topic. The name of the topic used to send a message to its subscribers.
Now type the following:
Valar morghulis [Enter]
Valar dohaeris [Enter]
You get this output:
Valar morghulis
Valar dohaeris
The following are the main properties defined in producer.properties :
Metadata broker list. A list of brokers used for bootstrapping knowledge about the rest of the cluster.
Format: host1:port1, host2:port2
metadata.broker.list=localhost:9092
Compression codec. The compression codec for data generated.
Example: none, gzip, snappy
compression.codec=none
Later in this chapter, you will see how to write producers.
Starting a Consumer
Kafka has a command line to start a message consumer client. It shows the output at the command line as soon as it subscribes to the topic in the broker:
[restrada@localhost kafka_2.10-0.10.0.0]# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic amazingTopic --from-beginning
Since you requested --from-beginning, you see the following output:
Valar morghulis
Valar dohaeris
The following is the main property defined in consumer.properties:
Group id. A string that identifies a set of consumers in the same group:
group.id=test-consumer-group
Later in this chapter, you will learn how to write consumers.
Now let’s play with the new toy architecture. Open each technology in a different console: ZooKeeper, broker, producer, and consumer. Type the commands in the producer and watch them displayed in the consumer.
If you don’t recall how to run producers or consumers, running the command with no arguments will show the possible parameters.
Single Node–Multiple Broker Cluster
An example diagram of a single node–multiple broker cluster is shown in Figure 8-4.
Figure 8-4. Single node–multiple broker Kafka cluster example
As usual, start the ZooKeeper server:
[restrada@localhost kafka_2.10-0.10.0.0]# bin/zookeeper-server-start.sh config/zookeeper.properties
You need a different server.properties file for every broker. Let’s call them: server-1.properties, server-2.properties, server-3.properties, and so forth.
In server-1.properties, specify the following:
broker.id=1
port=9093
log.dir=/tmp/kafka-logs-1
Similarly, on server-2.properties, specify the following:
broker.id=2
port=9094
log.dir=/tmp/kafka-logs-2
Follow the same procedure for server-3.properties:
broker.id=3
port=9095
log.dir=/tmp/kafka-logs-3
Starting the Brokers
With ZooKeeper running , start the Kafka brokers with the following commands:
[restrada@localhost kafka_2.10-0.10.0.0]# bin/kafka-server-start.sh config/server-1.properties
[restrada@localhost kafka_2.10-0.10.0.0]# bin/kafka-server-start.sh config/server-2.properties
[restrada@localhost kafka_2.10-0.10.0.0]# bin/kafka-server-start.sh config/server-3.properties
Creating a Topic
Using the command to create topics, create a topic called reAmazingTopic (re stands for replicated). It has two partitions and two replicas:
[restrada@localhost kafka_2.10-0.10.0.0]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 2 --topic reAmazingTopic
Obtain the output, as follows:
Created topic "reAmazingTopic".
Starting a Producer
Now that you know the command to start producers, indicating more brokers in the broker-list is a trivial task:
[restrada@localhost kafka_2.10-0.10.0.0]# bin/kafka-console-producer.sh --broker-list localhost:9093,localhost:9094,localhost:9095 --topic reAmazingTopic
Yes, our architects always have weird requirements; if we need to run multiple producers connecting to different broker combinations, we need to specify a different broker-list for each producer.
Starting a Consumer
To start a consumer, use the same Kafka command that you already know:
[restrada@localhost kafka_2.10-0.10.0.0]# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic reAmazingTopic
Multiple Node–Multiple Broker Cluster
An example of a multiple node–multiple broker cluster is shown in Figure 8-5.
Figure 8-5. Multiple node–multiple broker Kafka cluster
Here you are in front of the real power of the cluster. Kafka should be installed in every machine in the cluster. Every physical server could have one or many Kafka brokers. All the nodes on the same cluster should connect to the same ZooKeeper.
But don’t worry, all the previous commands remain equal. The commands for ZooKeeper, broker, producer, and consumer don’t change.
Broker Properties
To recapitulate this section, Table 8-1 lists the most popular broker properties.6
Table 8-1. Kafka Broker Most Important Properties
Name | Default value | Description |
---|---|---|
broker.id | 0 | Each broker is identified with a positive integer id. This id is the broker’s name and allows the broker to be moved to a different host or port without losing consumers. |
log.dirs | /tmp/kafka-logs | The directory where the log data is stored. Each new partition created will be placed in the directory with the fewest partitions. |
zookeeper.connect | localhost:2181 | The ZooKeeper’s connection string in the hostname:port/chroot form. Here, chroot is the base directory for the path operations (namespace for sharing with other applications on the same ZooKeeper cluster). |
host.name | null | The broker’s hostname. If this is set, it will only bind to this address. If this is not set, it will bind to all interfaces and publish one to ZooKeeper. |
num.partitions | 1 | The number of partitions per topic if a partition count isn’t given at topic creation. |
auto.create.topics.enable | true | Enables the autocreation of the topic on the server. If this is set to true, the attempts to produce, consume, or fetch data for a non-existent topic will automatically create a new one with the default replication factor and the default number of partitions. |
default.replication.factor | 1 | The default replication factor for automatically created topics. |
Kafka Architecture
In its beginnings, LinkedIn used Java Message Service (JMS). But when more power was needed (i.e., a scalable architecture), the LinkedIn development team decided to build the project that we know today as Kafka. In 2011, Kafka became an open source Apache project. In this section, we give you some reflections on why things are designed the way they are.
The following are Kafka’s project goals:
An API. Supports custom implementation of producers and consumers.
Low overhead. Low network latency and low storage overhead with message persistence on disk.
High throughput. Publishes and subscribes millions of messages; supports real-time data feeds.
Distributed. Highly scalable architecture to handle low-latency delivery.
High availability. Autobalances consumers in case of failure.
Fault tolerant. Guarantees data integrity in case of failure.
Kafka is more than a queuing platform: the messages are received and enqueued to be delivered to a consumer pool.
Kafka is more than a publish-subscribe platform: the messages are not necessarily published to all consumers.
The following describes Kafka’s operation in a nutshell:
Messages are published to a Kafka topic, which is a message queue or a message category.
The Kafka topic runs in the Kafka broker, which is a server. Kafka brokers do not just run the topics, but also store the messages when required.
The consumers use the ZooKeeper service to get the information to track the messages (the data about a message state).
Figure 8-6 shows a topic with three partitions. You can see the five Kafka components: ZooKeeper, broker, topic, producer, and consumer.
Figure 8-6. A topic with three partitions
The following describes parts of the partition.
Segment files. Internally, every partition is a logical log file, represented as a set of segment files with the same size. The partition is a sequence of ordered messages. When a message is published, the broker appends the message to the last segment of the file. When a certain number of messages is reached, the segment file is flushed to the disk. Once the file is flushed, the messages are available for consumption by the consumers.
Offset. Each message in a partition is assigned a unique sequential number called an offset. The offset is used to identify messages inside the partition. The partitions are replicated between the servers for fault tolerance.
Leaders. Inside Kafka, each partition has one Kafka server as its leader. The other servers are followers. The leader of a partition coordinates the read and write requests for that partition. The followers asynchronously replicate the data from the leader. If the leader fails, another server becomes the new leader. In a cluster, every server has two roles: the leader on some partitions and a follower on other partitions.
Groups. The consumers are organized into groups. Each consumer is represented as a process; one process belongs to only one group.
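The offset mechanics just described can be modeled in plain Scala. This is a toy sketch, not Kafka’s storage implementation; the PartitionLog class is hypothetical.

```scala
// Toy model of a partition: an append-only sequence of (offset, message) pairs.
// Offsets are assigned sequentially at append time and never change.
class PartitionLog {
  private var entries = Vector.empty[(Long, String)]

  def append(message: String): Long = {
    val offset = entries.size.toLong        // next sequential offset
    entries = entries :+ ((offset, message))
    offset
  }

  // A consumer tracking its position can resume reading from any offset.
  def read(fromOffset: Long): Vector[(Long, String)] =
    entries.drop(fromOffset.toInt)
}

val log = new PartitionLog
val first  = log.append("first")
val second = log.append("second")
val third  = log.append("third")
val resumed = log.read(fromOffset = 1)
```

Because the consumer, not the broker, remembers its offset, two consumers in different groups can read the same partition at different positions without interfering with each other.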
In Kafka, there are three ways to deliver messages (as a reflection exercise, think about why there are only three):
Messages are never redelivered but may be lost.
Messages may be redelivered but never lost.
Messages are delivered once and only once.
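Which guarantee you get depends mostly on when the consumer records its position. The following plain Scala simulation (toy code, not the Kafka API) shows that committing the offset before processing gives at-most-once delivery, while committing after processing gives at-least-once.

```scala
// Toy simulation of the first two delivery guarantees. The committed offset is
// the position a restarted consumer resumes from; crashIndex is the message at
// which the consumer crashes. All names here are our own, not Kafka's.
val messages = Vector("m0", "m1", "m2")

// At-most-once: commit the offset BEFORE processing. A crash loses the message.
def atMostOnceCrashAt(crashIndex: Int): Vector[String] = {
  var committed = 0
  var processed = Vector.empty[String]
  var i = 0
  while (i < messages.length) {
    committed = i + 1                         // commit before processing
    if (i == crashIndex) i = messages.length  // crash: message never processed
    else { processed :+= messages(i); i += 1 }
  }
  processed ++ messages.drop(committed)       // restart resumes at committed offset
}

// At-least-once: commit the offset AFTER processing. A crash redelivers it.
def atLeastOnceCrashAt(crashIndex: Int): Vector[String] = {
  var committed = 0
  var processed = Vector.empty[String]
  var i = 0
  var crashed = false
  while (i < messages.length && !crashed) {
    processed :+= messages(i)                 // process before committing
    if (i == crashIndex) crashed = true       // crash before the commit
    else { committed = i + 1; i += 1 }
  }
  processed ++ messages.drop(committed)       // restart resumes at committed offset
}

val lost       = atMostOnceCrashAt(1)   // m1 is never processed
val duplicated = atLeastOnceCrashAt(1)  // m1 is processed twice
```

Exactly-once, the third guarantee, requires coordinating the processing and the offset commit atomically, which is why it is the hardest of the three.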
Log Compaction
There are two types of retention: finer-grained (per message) and coarser-grained (time based). Log compaction is the process of moving from time-based to per-message retention.
In Kafka, the retention policy can be set to per-topic (time based), size-based, and log compaction–based. Log compaction ensures the following:
Reads begin at offset 0; if the consumer begins at the start of the log, the messages are in the order that they were written.
Messages have sequential offsets that never change.
Message order is always preserved.
A group of background threads recopies log segment files, removing records whose keys reappear in the head of the log.
As another reflection exercise, can you deduce why the log compaction ensures these four points?
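As a hint for the reflection exercise, log compaction can be sketched in plain Scala: keep only the newest record for each key, never changing an offset and never reordering what remains.

```scala
// Toy log compaction: each record is (offset, key, value). Compaction keeps only
// the newest record per key; surviving offsets and their order are untouched.
val records = Vector(
  (0L, "k1", "a"),
  (1L, "k2", "b"),
  (2L, "k1", "c"),  // supersedes offset 0
  (3L, "k3", "d"),
  (4L, "k2", "e")   // supersedes offset 1
)

def compact(log: Vector[(Long, String, String)]): Vector[(Long, String, String)] = {
  // toMap keeps the last mapping per key, i.e., the newest offset for each key
  val newestOffset: Map[String, Long] = log.map { case (o, k, _) => k -> o }.toMap
  log.filter { case (o, k, _) => newestOffset(k) == o }
}

val compacted = compact(records)
```

Because records are only ever removed, never rewritten or renumbered, the four guarantees hold: offsets stay sequential and immutable, the remaining messages keep their original order, and a read from offset 0 still sees records in write order.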
Kafka Design
The following are Kafka design bullet points:
Storage. The purpose of Kafka is to provide message processing. The main functions are caching and storing messages on a file system. The caching and flushing to disk are configurable.
Retention. If a message is consumed, the message is not wasted; it is retained, allowing message reconsumption.
Metadata. In many messaging systems, the message metadata is kept at the server level. In Kafka, the message state is maintained at the consumer level. This prevents the following:
Multiple deliveries of the same message
Losing messages due to failures
OLTP. Consumers store the state in ZooKeeper, but Kafka also allows the storage in OLTP external systems (online transaction processing).
Push and pull. Producers push the message to the broker and consumers pull the message from the broker.
Masterless. Like Apache Cassandra, Apache Kafka is masterless; you can remove any broker at any time. The metadata is maintained by ZooKeeper and shared with the consumers.
Synchronous. Producers have the option to be asynchronous or synchronous when sending messages to the broker.
Message Compression
There are cases where the network bandwidth is the bottleneck. This usually does not happen, but it could. In Kafka, there is a mechanism to compress groups of messages. Note that without being compression experts, we can deduce that it is better to compress a group of messages than compress every message individually.
When a group of messages is compressed, the network overhead is reduced. Before Kafka 0.8.0, groups of messages were compressed and presented to the consumer as a single message; the consumer decompressed them later. But there were issues with decompression that created overhead.
Since Kafka 0.8.0, changes were introduced in the broker to handle offsets for compressed message sets; the problem moved to the broker, but the overall performance improved. The lead broker is responsible for compressing messages, which lowers the network overhead but can also increase the load on the broker’s CPU.
As you saw, Kafka handles the Gzip and Snappy compression protocols. To use compression, you need to specify the following configuration in the producer.
compression.codec. This parameter indicates the codec for all data generated on the producer. The default value is none. The valid values are none, gzip, and snappy.
compressed.topics. This parameter turns on the compression on particular topics. Note that if the list of compressed topics is empty, then you are enabling the compression for all the topics. If the compression codec is none, the compression is disabled for all the topics.
If there is a mission in your work that does not let you sleep, and it is related to mirror data across data centers, consider using Kafka. Kafka is a good option when you have to transfer huge amounts of data between active and passive data centers in a compressed format with low network bandwidth overhead.
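As a sketch of what that producer configuration might look like in Scala (the topic names are hypothetical; note also that the newer 0.10 Java producer exposes this setting as compression.type instead):

```scala
import java.util.Properties

// Producer configuration enabling Snappy compression for two specific topics.
// Property names follow the 0.8-style producer configuration described above;
// the topic names are made up for the example.
val props = new Properties()
props.put("compression.codec", "snappy")
props.put("compressed.topics", "mirrorTopic,metricsTopic")
```

Leaving compressed.topics empty while setting a codec would enable compression for all topics, as noted above.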
Replication
When you have message partitioning in Kafka, the decision about how a message is partitioned is made at the producer end; the broker stores the messages in the order they arrive. If you recall, the number of partitions for each topic is configured in the Kafka broker.
Replication is one of the best features introduced in Kafka 0.8.0. Replication ensures that messages will be published and consumed in the event of broker failure. Both producers and consumers are replication-aware.
In replication, each partition has n replicas of a message (to handle n–1 failures). One replica acts as the leader. ZooKeeper knows who the replica leader is. The lead replica has a list of its follower replicas. The replicas store their part of the message in local logs.
Kafka has two replication modes : the synchronous replication process and the asynchronous replication process .
This is the synchronous replication process:
The producer identifies the lead replica from ZooKeeper.
The producer publishes the message.
When the message is published, it is written to the lead replica’s log.
The followers pull the message.
The leader waits for all the followers to acknowledge that the replica was written.
Once replications are complete, the leader sends the acknowledgment to the producer.
This is the asynchronous replication process:
The producer identifies the lead replica from ZooKeeper.
The producer publishes the message.
When the message is published, it is written to the lead replica’s log.
The followers pull the message.
Once the message is written on the lead replica, the leader sends the acknowledgment to the producer.
As you can see, asynchronous mode is faster, but it is not fault tolerant.
Replication ensures strong durability and high availability. It guarantees that any successfully published message will not be lost and will be consumed, even in the event of broker failures.
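The trade-off between the two modes can be captured with a toy model in plain Scala (our own sketch, not Kafka code): the only difference is the point at which the acknowledgment is sent, and that point decides whether an acknowledged message can die with the leader.

```scala
// Toy model of the two replication modes. The message is always written to the
// leader's log first; whether the followers pulled a copy before the leader
// failed decides survival. The only difference between modes is the ack point.
def publish(followers: Int,
            leaderFailsBeforeReplication: Boolean,
            ackAfterLeaderWrite: Boolean): (Boolean, Boolean) = {
  // Copies of the message held by followers after the leader's failure
  val survivingCopies = if (leaderFailsBeforeReplication) 0 else followers
  val acked =
    if (ackAfterLeaderWrite) true             // async: ack right after the leader write
    else survivingCopies == followers         // sync: ack only after every follower confirms
  (acked, survivingCopies > 0)                // (producer saw an ack, message survived)
}

// Async mode can acknowledge a message that then dies with the leader...
val (asyncAcked, asyncSurvives) =
  publish(followers = 2, leaderFailsBeforeReplication = true, ackAfterLeaderWrite = true)
// ...while sync mode never acknowledges a message that no follower holds.
val (syncAcked, syncSurvives) =
  publish(followers = 2, leaderFailsBeforeReplication = true, ackAfterLeaderWrite = false)
```

This is why asynchronous replication is faster but not fault tolerant: the producer can receive an acknowledgment for a message that no surviving replica holds.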
Kafka Producers
As you saw, producers are applications that create messages and publish them to the broker. Normally, producers are front-end applications, web pages, web services, back-end services, proxies, and adapters to legacy systems. You can write Kafka producers in Java, Scala, C, and Python.
The process begins when the producer connects to any live node and requests metadata about the partition leaders of a topic, so that it can put the message directly on the partition’s lead broker.
Producer API
First, you need to understand the required classes to write a producer:
Producer. The class is KafkaProducer <K, V> in org.apache.kafka.clients.producer.KafkaProducer
KafkaProducer is a generic class; K specifies the type of the partition key and V specifies the type of the message value.
ProducerRecord. The class is ProducerRecord <K, V> in org.apache.kafka.clients.producer.ProducerRecord
This class encapsulates the data to be sent: the topic name, an optional partition number, an optional partition key, and the message value.
ProducerRecord is also a generic class; K and V have the same meaning as in KafkaProducer.
The Producer API encapsulates all the low-level producer implementations. The default mode is asynchronous, but you can set producer.type in the producer configuration.
Scala Producers
Now let’s write a simple Scala Kafka producer to send messages to the broker. The SimpleProducer class is used to create a message for a specific topic and to send it using message partitioning. This chapter’s examples were tested with Scala version 2.10.
Step 1. Import Classes
Import the required classes:
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
Step 2. Define Properties
Define the following properties:
val props = new Properties()
props.put("bootstrap.servers",
"192.168.146.132:9093,192.168.146.132:9094,192.168.146.132:9095")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("acks", "1")
producer = new KafkaProducer[String, String](props)
As you can see, the properties are as follows:
bootstrap.servers
Specifies the list of brokers the producer connects to, in the format [node:port,node:port]. From this list the producer obtains the metadata it needs to determine the lead broker of the topic.
key.serializer and value.serializer
Specify the serializers used while preparing the message for transmission from the producer to the broker.
In this example, we use the string serializer provided by Kafka. The key and the value may use different serializers, and you can also implement a custom serializer class by implementing the org.apache.kafka.common.serialization.Serializer interface.
acks
Indicates to the broker how many acknowledgments the producer requires before considering a request complete.
1 means that the producer receives an acknowledgment once the lead replica has received the message; 0 means "fire and forget," so the producer is not informed in the event of message loss.
Step 3. Build and Send the Message
The following code should be self-explanatory:
val runtime = new Date().toString
val msg = "Message Publishing Time - " + runtime
val data = new ProducerRecord[String, String](topic, msg)
producer.send(data)
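Under the hood, the string serializer configured above simply converts the message to bytes before transmission, and the consumer-side decoder reverses it. The following is a minimal pure-Scala sketch of that round trip (the object and method names are illustrative, not part of the Kafka API):

```scala
object SerializerSketch {
  // Producer side: encode the message value to bytes (what a string serializer does)
  def serialize(value: String): Array[Byte] = value.getBytes("UTF-8")

  // Consumer side: decode the bytes back to a string
  def deserialize(bytes: Array[Byte]): String = new String(bytes, "UTF-8")
}
```

Any custom serializer/decoder pair must preserve this round trip: deserialize(serialize(msg)) must equal msg.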
Listing 8-1 shows the complete SimpleProducer.
Listing 8-1. SimpleProducer.scala
package apress.ch08
import java.util.{Date, Properties}
import apress.ch08.SimpleProducer._
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
object SimpleProducer {
private var producer: KafkaProducer[String, String] = _
def main(args: Array[String]) {
val argsCount = args.length
if (argsCount == 0 || argsCount == 1)
throw new IllegalArgumentException(
"Provide topic name and Message count as arguments")
// Topic name and the message count to be published is passed from the
// command line
val topic = args(0)
val count = args(1)
val messageCount = java.lang.Integer.parseInt(count)
println("Topic Name - " + topic)
println("Message Count - " + messageCount)
val simpleProducer = new SimpleProducer()
simpleProducer.publishMessage(topic, messageCount)
}
}
class SimpleProducer {
val props = new Properties()
// Set the broker list for requesting metadata to find the lead broker
props.put("bootstrap.servers",
"192.168.146.132:9093,192.168.146.132:9094,192.168.146.132:9095")
//This specifies the serializer class for keys
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
// 1 means the producer receives an acknowledgment once the lead replica
// has received the data. This option provides better durability as the
// client waits until the server acknowledges the request as successful.
props.put("acks", "1")
producer = new KafkaProducer[String, String](props)
private def publishMessage(topic: String, messageCount: Int) {
for (mCount <- 0 until messageCount) {
val runtime = new Date().toString
val msg = "Message Publishing Time - " + runtime
println(msg)
// Create a message
val data = new ProducerRecord[String, String](topic, msg)
// Publish the message
producer.send(data)
}
// Close producer connection with broker.
producer.close()
}
}
Step 4. Create the Topic
Before running the program, you must create the topic. You can create it using the API (amazing, isn't it?) or from the command line:
[restrada@localhost kafka_2.10.0-0.0.0.0]#bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic amazingTopic
Step 5. Compile the Producer
Compile the program with this command:
[restrada@localhost kafka_2.10.0-0.0.0.0]# scalac -d . apress/ch08/SimpleProducer.scala
Step 6. Run the Producer
Run the SimpleProducer with the following command:
[restrada@localhost kafka_2.10.0-0.0.0.0]# scala apress.ch08.SimpleProducer amazingTopic 10
This program takes two arguments: the topic name and the number of messages to publish.
Step 7. Run a Consumer
As you already saw, you can run the consumer program with the following command:
[restrada@localhost kafka_2.10.0-0.0.0.0]# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic amazingTopic
Producers with Custom Partitioning
Let's jump to the next level by writing another program that implements customized message partitioning. The example consists of collecting the IP addresses visiting a web site, which are recorded and published. The message has three parts: timestamp, web site name, and IP address.
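The three-part message layout can be pinned down with a small pure-Scala sketch; the helper names here are illustrative, not part of the example program:

```scala
object PageHitMessage {
  // Build the three-part message: timestamp, web site name, client IP
  def build(accessTime: String, site: String, clientIP: String): String =
    s"$accessTime,$site,$clientIP"

  // Split a message back into its three parts
  def parse(msg: String): (String, String, String) = {
    val Array(ts, site, ip) = msg.split(",")
    (ts, site, ip)
  }
}
```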
Step 1. Import Classes
Import these classes:
import java.util.Date
import java.util.Properties
import java.util.Random
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
Step 2. Define Properties
Define the following properties:
val props = new Properties()
props.put("bootstrap.servers",
"192.168.146.132:9092,192.168.146.132:9093,192.168.146.132:9094")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
// Defines the class to be used for determining the partition
// in the topic where the message needs to be sent.
props.put("partitioner.class", "apress.ch08.SimplePartitioner")
props.put("acks", "1")
producer = new KafkaProducer[String, String](props)
Step 3. Implement the Partitioner class
Write the SimplePartitioner class, which implements the Partitioner interface. The class takes the key, in this case the IP address, and applies a modulo operation with the number of partitions. Listing 8-2 shows the SimplePartitioner code.
Listing 8-2. SimplePartitioner.scala
package apress.ch08
import java.util
import org.apache.kafka.clients.producer.Partitioner
import org.apache.kafka.common.Cluster
class SimplePartitioner extends Partitioner {
def partition(key: AnyRef, a_numPartitions: Int): Int = {
var partition = 0
val partitionKey = key.asInstanceOf[String]
val offset = partitionKey.lastIndexOf('.')
if (offset > 0) {
partition = java.lang.Integer.parseInt(partitionKey.substring(offset + 1)) %
a_numPartitions
}
partition
}
override def partition(topic: String,
key: AnyRef,
keyBytes: Array[Byte],
value: AnyRef,
valueBytes: Array[Byte],
cluster: Cluster): Int =
partition(key, cluster.partitionsForTopic(topic).size)
override def close() {
}
override def configure(configs: util.Map[String, _]) {
}
}
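To see the partitioning arithmetic in isolation, here is the same modulo logic as a pure-Scala function, outside the Kafka API (the function name is illustrative):

```scala
object PartitionSketch {
  // Same arithmetic as SimplePartitioner: last octet of the IP modulo the partition count
  def partitionFor(ip: String, numPartitions: Int): Int = {
    val offset = ip.lastIndexOf('.')
    if (offset > 0) ip.substring(offset + 1).toInt % numPartitions
    else 0
  }
}
```

Because the partition depends only on the key, all hits from the same IP address always land in the same partition, which preserves per-IP ordering.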
Step 4. Build and Send the Message
Listing 8-3 presents the complete CustomPartitionProducer.scala.
Listing 8-3. CustomPartitionProducer.scala
package apress.ch08
import java.util.Date
import java.util.Properties
import java.util.Random
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import CustomPartitionProducer._
object CustomPartitionProducer {
var producer: KafkaProducer[String, String] = _
def main(args: Array[String]) {
val argsCount = args.length
if (argsCount == 0 || argsCount == 1)
throw new IllegalArgumentException(
"Please provide topic name and Message count as arguments")
// Topic name and the message count to be published is passed from the
// command line
val topic = args(0)
val count = args(1)
val messageCount = java.lang.Integer.parseInt(count)
println("Topic Name - " + topic)
println("Message Count - " + messageCount)
val simpleProducer = new CustomPartitionProducer()
simpleProducer.publishMessage(topic, messageCount)
}
}
class CustomPartitionProducer {
val props = new Properties()
// Set the broker list for requesting metadata to find the lead broker
props.put("bootstrap.servers",
"192.168.146.132:9092,192.168.146.132:9093,192.168.146.132:9094")
// This specifies the serializer class for keys and values
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
// Defines the class to be used for determining the partition
// in the topic where the message needs to be sent.
props.put("partitioner.class", "apress.ch08.SimplePartitioner")
// 1 means the producer receives an acknowledgment once the lead replica
// has received the data. This option provides better durability as the
// client waits until the server acknowledges the request as successful.
props.put("acks", "1")
producer = new KafkaProducer[String, String](props)
private def publishMessage(topic: String, messageCount: Int) {
val random = new Random()
for (mCount <- 0 until messageCount) {
val clientIP = "192.168.14." + random.nextInt(255)
val accessTime = new Date().toString
val msg = accessTime + ",kafka.apache.org," + clientIP
println(msg)
// Create a ProducerRecord instance
val data = new ProducerRecord[String, String](topic, clientIP, msg)
// Publish the message
producer.send(data)
}
producer.close()
}
}
Step 5. Create the Topic
Before running the program, you must create the pageHits topic from the command line:
[restrada@localhost kafka_2.10.0-0.0.0.0]#bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 5 --topic pageHits
Step 6. Compile the Programs
Compile the programs with the following commands:
[restrada@localhost kafka_2.10.0-0.0.0.0]# scalac -d . apress/ch08/SimplePartitioner.scala
[restrada@localhost kafka_2.10.0-0.0.0.0]# scalac -d . apress/ch08/CustomPartitionProducer.scala
Step 7. Run the Producer
Run CustomPartitionProducer with the following command:
[restrada@localhost kafka_2.10.0-0.0.0.0]# scala apress.ch08.CustomPartitionProducer pageHits 100
The program takes two arguments: the topic name and the number of messages to publish.
Step 8. Run a Consumer
As you already saw, you can run the consumer program with the following command:
[restrada@localhost kafka_2.10.0-0.0.0.0]# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic pageHits
Producer Properties
To recapitulate the section, Table 8-2 lists the most popular producer properties.
Table 8-2. Kafka Producer Most Important Properties
Name | Type | Default | Description |
---|---|---|---|
bootstrap.servers | list | | The producer uses this property to get metadata about topics, partitions, and replicas. The format is host1:port1,host2:port2. |
key.serializer | class | | Specifies the serializer class for message keys. The default encoder accepts and returns the same byte array. |
value.serializer | class | | Specifies the serializer class for message values. |
acks | string | 1 | Controls when a producer request is considered complete and when the producer receives an acknowledgment from the broker: |
0 = the producer never waits for an acknowledgment from the broker; lowest latency, but weakest durability. | | | |
1 = the producer receives an acknowledgment once the lead replica has received the data; better durability, as the client waits until the server acknowledges a successful request. | | | |
–1 = the producer receives an acknowledgment once all the in-sync replicas have received the data; the best durability. | | | |
buffer.memory | long | 33554432 | The total bytes of memory that the producer can use to buffer records waiting to be sent to the server. |
compression.type | string | none | Specifies the compression codec for all data generated by this producer. The accepted values are none, gzip, and snappy. |
retries | int | 0 | Setting a value greater than zero causes the client to resend any record whose send fails with a potentially transient error. |
Kafka Consumers
As you saw, consumers are applications that consume the messages published by the broker. Normally, consumers are real-time analysis applications, near real-time analysis applications, NoSQL solutions, data warehouses, back-end services, and subscriber-based solutions. You can write Kafka consumers in Java, Scala, C, and Python.
The consumer subscribes for the message consumption on a specific topic on the Kafka broker. The consumer then makes a fetch request to the lead broker to consume the message partition by specifying the message offset. The consumer works in the pull model and always pulls all available messages after its current position.
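The pull model can be sketched in plain Scala: a partition behaves like an append-only log, and a fetch reads forward from a given offset. This is an illustrative model, not the Kafka API:

```scala
object FetchSketch {
  // A partition modeled as an append-only log; offsets are simply indices
  val log = Vector("m0", "m1", "m2", "m3", "m4")

  // A fetch request names a starting offset; the broker returns what is available
  def fetch(offset: Int, maxMessages: Int): Vector[String] =
    log.slice(offset, offset + maxMessages)
}
```

A consumer whose current position is offset 2 pulls everything after it, then advances its position by the number of messages received.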
Consumer API
In the Kafka 0.8.0 version, there were two API types for consumers: the high-level API and the low-level API. In version 0.10.0, they are unified.
To use the consumer API with Maven, you should import the following dependency:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.0.0</version>
</dependency>
The consumer API classes with SBT are imported, as follows:
// https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.10.0.0"
The consumer API classes with Gradle are imported:
// https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
compile group: 'org.apache.kafka', name: 'kafka-clients', version: '0.10.0.0'
Simple Scala Consumers
Let's write a single-threaded Scala consumer using the consumer API to consume messages from a topic. This SimpleConsumer is used to fetch messages from a topic and consume them. We assume that there is a single partition in the topic.
Step 1. Import Classes
Import these classes:
import java.util
import java.util.Properties
import kafka.consumer.ConsumerConfig
Step 2. Define Properties
Define the following properties:
val props = new Properties()
props.put("zookeeper.connect", zookeeper)
props.put("group.id", groupId)
props.put("zookeeper.session.timeout.ms", "500")
props.put("zookeeper.sync.time.ms", "250")
props.put("auto.commit.interval.ms", "1000")
new ConsumerConfig(props)
Now let’s go over the major properties mentioned in the code:
zookeeper.connect. Specifies the ZooKeeper <node:port> connection used to find the ZooKeeper running instance in the cluster. ZooKeeper is used to store offsets of messages consumed for a specific topic and partition by this consumer group.
group.id. Specifies the consumer group name (shared by all the consumers in the group). This is the process name used by ZooKeeper to store offsets.
zookeeper.session.timeout.ms. Specifies the ZooKeeper session timeout in milliseconds. Represents the amount of time Kafka will wait for a ZooKeeper response to a request before giving up and continuing with consuming messages.
zookeeper.sync.time.ms. Specifies the ZooKeeper sync time (in milliseconds) between the leader and the followers.
auto.commit.interval.ms. Defines the frequency (in milliseconds) at which consumer offsets get committed.
Step 3. Code the SimpleConsumer
Write the SimpleConsumer class, as shown in Listing 8-4.
Listing 8-4. SimpleConsumer.scala
package apress.ch08
import java.util
import java.util.Properties
import kafka.consumer.ConsumerConfig
import SimpleConsumer._
import scala.collection.JavaConversions._
object SimpleConsumer {
private def createConsumerConfig(zookeeper: String, groupId: String): ConsumerConfig = {
val props = new Properties()
props.put("zookeeper.connect", zookeeper)
props.put("group.id", groupId)
props.put("zookeeper.session.timeout.ms", "500")
props.put("zookeeper.sync.time.ms", "250")
props.put("auto.commit.interval.ms", "1000")
new ConsumerConfig(props)
}
def main(args: Array[String]) {
val zooKeeper = args(0)
val groupId = args(1)
val topic = args(2)
val simpleHLConsumer = new SimpleConsumer(zooKeeper, groupId, topic)
simpleHLConsumer.testConsumer()
}
}
class SimpleConsumer(zookeeper: String, groupId: String, private val topic: String) {
private val consumer =
kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig(zookeeper, groupId))
def testConsumer() {
val topicMap = new util.HashMap[String, Integer]()
topicMap.put(topic, 1)
val consumerStreamsMap = consumer.createMessageStreams(topicMap)
val streamList = consumerStreamsMap.get(topic)
for (stream <- streamList; aStream <- stream)
println("Message from Single Topic :: " + new String(aStream.message()))
if (consumer != null) {
consumer.shutdown()
}
}
}
Step 4. Create the Topic
Before running the program, you must create the amazingTopic topic from the command line:
[restrada@localhost kafka_2.10.0-0.0.0.0]#bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic amazingTopic
Step 5. Compile the Program
Compile the program with the following command:
[restrada@localhost kafka_2.10.0-0.0.0.0]# scalac -d . apress/ch08/SimpleConsumer.scala
Step 6. Run the Producer
Run the SimpleProducer with the following command:
[restrada@localhost kafka_2.10.0-0.0.0.0]# scala apress.ch08.SimpleProducer amazingTopic 100
Step 7. Run the Consumer
Run SimpleConsumer with the following command:
[restrada@localhost kafka_2.10.0-0.0.0.0]# scala apress.ch08.SimpleConsumer localhost:2181 testGroup amazingTopic
The SimpleConsumer class takes three arguments: the ZooKeeper connection string in <host:port> form, the unique group id, and the Kafka topic name.
Multithread Scala Consumers
A multithreaded consumer design is based on the number of partitions in the topic, with a one-to-one mapping between threads and the partitions in the topic.
If you don't have that one-to-one relation, conflicts may occur: a thread may never receive a message, or a thread may receive messages from multiple partitions. Let's program MultiThreadConsumer.
Step 1. Import Classes
Import these classes:
import java.util
import java.util.Properties
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import kafka.consumer.ConsumerConfig
Step 2. Define Properties
Define the following properties:
val props = new Properties()
props.put("zookeeper.connect", zookeeper)
props.put("group.id", groupId)
props.put("zookeeper.session.timeout.ms", "500")
props.put("zookeeper.sync.time.ms", "250")
props.put("auto.commit.interval.ms", "1000")
new ConsumerConfig(props)
Step 3. Code the MultiThreadConsumer
Write the MultiThreadConsumer class, as shown in Listing 8-5.
Listing 8-5. MultiThreadConsumer.scala
package apress.ch08
import java.util
import java.util.Properties
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import kafka.consumer.ConsumerConfig
import MultiThreadConsumer._
import scala.collection.JavaConversions._
object MultiThreadConsumer {
private def createConsumerConfig(zookeeper: String, groupId: String): ConsumerConfig = {
val props = new Properties()
props.put("zookeeper.connect", zookeeper)
props.put("group.id", groupId)
props.put("zookeeper.session.timeout.ms", "500")
props.put("zookeeper.sync.time.ms", "250")
props.put("auto.commit.interval.ms", "1000")
new ConsumerConfig(props)
}
def main(args: Array[String]) {
val zooKeeper = args(0)
val groupId = args(1)
val topic = args(2)
val threadCount = java.lang.Integer.parseInt(args(3))
val multiThreadHLConsumer = new MultiThreadConsumer(zooKeeper, groupId, topic)
multiThreadHLConsumer.testMultiThreadConsumer(threadCount)
try {
Thread.sleep(10000)
} catch {
case ie: InterruptedException =>
}
multiThreadHLConsumer.shutdown()
}
}
class MultiThreadConsumer(zookeeper: String, groupId: String, topic: String) {
private var executor: ExecutorService = _
private val consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig(zookeeper,
groupId))
def shutdown() {
if (consumer != null) consumer.shutdown()
if (executor != null) executor.shutdown()
}
def testMultiThreadConsumer(threadCount: Int) {
val topicMap = new util.HashMap[String, Integer]()
// Define thread count for each topic
topicMap.put(topic, threadCount)
// Here we have used a single topic but we can also add
// multiple topics to topicCount MAP
val consumerStreamsMap = consumer.createMessageStreams(topicMap)
val streamList = consumerStreamsMap.get(topic)
// Launching the thread pool
executor = Executors.newFixedThreadPool(threadCount)
// Creating an object messages consumption
var count = 0
for (stream <- streamList) {
val threadNumber = count
executor.submit(new Runnable() {
def run() {
val consumerIte = stream.iterator()
while (consumerIte.hasNext)
println("Thread Number " + threadNumber + ": " + new String(consumerIte.next().message()))
println("Shutting down Thread Number: " + threadNumber)
}
})
count += 1
}
// The consumer and the executor are shut down from main() through shutdown()
}
}
Step 4. Create the Topic
Before running the program, you must create the amazingTopic topic from the command line:
[restrada@localhost kafka_2.10.0-0.0.0.0]#bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 4 --topic amazingTopic
Step 5. Compile the Program
Compile the program with the following command:
[restrada@localhost kafka_2.10.0-0.0.0.0]# scalac -d . apress/ch08/MultiThreadConsumer.scala
Step 6. Run the Producer
Run SimpleProducer with the following command:
[restrada@localhost kafka_2.10.0-0.0.0.0]# scala apress.ch08.SimpleProducer amazingTopic 100
Step 7. Run the Consumer
Run MultiThreadConsumer with the following command:
[restrada@localhost kafka_2.10.0-0.0.0.0]# scala apress.ch08.MultiThreadConsumer localhost:2181 testGroup amazingTopic 4
MultiThreadConsumer takes four arguments:
ZooKeeper connection string in <host:port> form
A unique group id
Kafka topic name
Thread count
This program prints all partitions of messages associated with each thread.
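The thread-to-partition relationship described above can be made concrete with a small pure-Scala sketch of a round-robin assignment (an illustrative model of the behavior, not the consumer internals):

```scala
object ThreadMapping {
  // Assign partitions to consumer threads round-robin
  def assign(numPartitions: Int, numThreads: Int): Map[Int, Seq[Int]] =
    (0 until numPartitions)
      .groupBy(_ % numThreads)
      .map { case (thread, parts) => (thread, parts.toSeq) }
}
```

With four partitions and four threads, each thread owns exactly one partition. With three partitions and four threads, one thread never appears in the map, which is the "thread that never receives a message" conflict mentioned above.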
Consumer Properties
To recapitulate the section, Table 8-3 lists the most popular consumer properties.
Table 8-3. Kafka Consumer Most Important Properties
Name | Default | Type | Description |
---|---|---|---|
bootstrap.servers | | list | A list of host/port pairs for establishing the initial connection to the cluster. Should be in the form host1:port1,host2:port2, and so forth. |
fetch.min.bytes | 1 | int | The minimum amount of data the server should return for a fetch request. Setting this to something greater than 1 causes the server to wait to accumulate larger amounts of data, which improves server throughput a bit at the cost of additional latency. |
group.id | "" | string | A unique string that identifies the consumer group that this consumer belongs to. |
heartbeat.interval.ms | 3000 | int | The expected time between heartbeats to the consumer coordinator when using Kafka's group management facilities. |
key.deserializer | | class | A deserializer class for keys that implements the Deserializer interface. |
max.partition.fetch.bytes | 1048576 | int | The maximum amount of data per partition that the server will return. The maximum total memory used for a request is #partitions * max.partition.fetch.bytes. |
session.timeout.ms | 30000 | int | The timeout used to detect failures when using Kafka's group management facilities. When a consumer's heartbeat is not received within the session timeout, the broker marks the consumer as failed and rebalances the group. |
value.deserializer | | class | A deserializer class for values that implements the Deserializer interface. |
Kafka Integration
Processing small amounts of data in real time is not a challenge when using the Java Messaging Service (JMS); but learning from LinkedIn's experience, you can see that this kind of processing system has serious performance limitations when dealing with large data volumes. Moreover, such a system is a nightmare to scale horizontally, because it simply can't.
Integration with Apache Spark
In the next example, you need a Kafka cluster up and running. Also, you need Spark installed on your machine, ready to be deployed.
Apache Spark has a utility class to create the data stream to be read from Kafka. But, as with any Spark project, you first need to create SparkConf and the Spark StreamingContext.
val sparkConf = new SparkConf().setAppName("SparkKafkaTest")
val ssc = new StreamingContext(sparkConf, Seconds(10))
Create the set of topics and the map of Kafka consumer parameters:
val topicsSet = Set("mytesttopic")
val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
You can create a direct Kafka stream with brokers and topics:
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc,
  kafkaParams,
  topicsSet)
With this stream, you can run the regular data processing algorithms.
Create a Spark StreamingContext that sets up the entry point for all stream functionality. Then set up the stream processing batch interval at 10 seconds.
Create the set of topics to read from.
Set the parameters for the Kafka consumer using a map. This map must have a value for metadata.broker.list, which is the comma-separated list of host and port numbers.
Create the input DStream using the KafkaUtils class.
Once you have the DStream ready, you can apply your algorithms to it. Explaining how to do that is beyond the scope of this book.
Spark Streaming is explained in detail in Chapter 6.
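Although a full streaming job is beyond this book's scope, the shape of a typical transformation, a word count, can be sketched on a plain Scala collection; the same flatMap/map/reduce chain applies to the messages DStream:

```scala
object BatchSketch {
  // One micro-batch of message values, as the direct stream would deliver them
  val batch = Seq("kafka spark mesos", "kafka akka", "spark kafka")

  // The classic word count: split into words, group, and count
  def wordCount(lines: Seq[String]): Map[String, Int] =
    lines.flatMap(_.split(" "))
      .groupBy(identity)
      .map { case (word, occurrences) => (word, occurrences.size) }
}
```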
Kafka Administration
Kafka provides numerous tools to administer features such as cluster management, topic tools, and cluster mirroring. Let's look at these tools in detail.
Cluster Tools
As you already know, when replicating multiple partitions, you can have replicated data. Among replicas, one acts as leader and the rest as followers. When there is no leader, a follower takes leadership.
When a broker has to be shut down for maintenance activities, new leaders are elected sequentially, one partition at a time. This implies significant I/O operations on ZooKeeper; with a big cluster, this means a delay in availability.
To reach high availability, Kafka provides tools for shutting down brokers. This tool transfers the leadership among the replicas or to another broker. If you don’t have an in-sync replica available, the tool fails to shut down the broker to ensure data integrity.
This tool is used through the following command:
[restrada@localhost kafka_2.10.0-0.0.0.0]# bin/kafka-run-class.sh kafka.admin.ShutdownBroker --zookeeper <zookeeper_host:port/namespace> --broker <brokerID> --num.retries 3 --retry.interval.ms 100
The ZooKeeper URL and the broker id are mandatory parameters. There are other optional parameters; for example, num.retries (the default value is 0) and retry.interval.ms (the default value is 1000).
When the server is stopped gracefully, it syncs all of its logs to disk to avoid any log recovery when it is restarted again, because log recovery is a time-consuming task. Before shutdown, it migrates the leader partitions to other replicas; so it ensures low downtime for each partition.
Controlled shutdown is enabled in this way:
controlled.shutdown.enable=true
When there is a big cluster, Kafka ensures that the lead replicas are equally distributed among the brokers. If a broker fails or is shut down, this distribution can become unbalanced.
To maintain a balanced distribution, Kafka has a tool to distribute lead replicas across the brokers in the cluster. This tool’s syntax is as follows:
[restrada@localhost kafka_2.10.0-0.0.0.0]# bin/kafka-preferred-replica-election.sh --zookeeper <zookeeper_host:port/namespace>
This tool updates the ZooKeeper path with a list of topic partitions whose lead replica needs to be moved. If the controller finds that the preferred replica is not the leader, it sends a request to the broker to make the preferred replica the partition leader. If the preferred replica is not in the ISR list, the controller fails the operation to avoid data loss.
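The controller's decision rule can be sketched in a few lines of Scala (an illustrative model of the logic described above, not the controller's actual code):

```scala
object ElectionSketch {
  // By convention, the preferred replica is the first broker in the assigned replica list
  def preferredReplica(assignedReplicas: Seq[Int]): Int = assignedReplicas.head

  // The controller only transfers leadership if the preferred replica is in the ISR
  def canElect(assignedReplicas: Seq[Int], isr: Set[Int]): Boolean =
    isr.contains(preferredReplica(assignedReplicas))
}
```

For a partition assigned to brokers Seq(2, 3, 1), broker 2 is preferred; if broker 2 has fallen out of the ISR, the controller fails the operation rather than risk data loss.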
You can specify a JSON list for this tool in this format:
[restrada@localhost kafka_2.10.0-0.0.0.0]# bin/kafka-preferred-replica-election.sh --zookeeper <zookeeper_host:port/namespace> --path-to-json-file topicPartitionList.json
The following is the topicPartitionList.json file format:
{"partitions":
[
{"topic": "AmazingTopic", "partition": "0"},
{"topic": "AmazingTopic", "partition": "1"},
{"topic": "AmazingTopic", "partition": "2"},
{"topic": "reAmazingTopic", "partition": "0"},
{"topic": "reAmazingTopic", "partition": "1"},
{"topic": "reAmazingTopic", "partition": "2"}
]
}
Adding Servers
When you add servers to the cluster, a unique broker id needs to be assigned to the new server. Note that adding a server does not automatically assign any data partitions to it, so a new server won't perform any work until partitions are migrated to it or new topics are created.
Let’s discuss moving partitions between brokers. There is a tool that reassigns partitions in bin/kafka-reassign-partitions.sh. This tool takes care of everything. When migrating, Kafka makes the new server a follower of the migrating partition. This enables the new server to fully replicate the existing data in the partition.
The reassign-partition tool runs in three different modes:
--generate. Moves the partitions based on the topics and the brokers list shared with the tool.
--execute. Moves the partitions based on the user plan specified in --reassignment-json-file.
--verify. Moves the partitions based on the status (successful/failed/in progress) of the last --execute.
The partition reassignment tool can be used to move selected topics from the current brokers to new brokers. The administrator provides a list of topics and a target list of new broker ids. The tool distributes the partitions of a given topic among the new brokers. For example:
[restrada@localhost kafka_2.10.0-0.0.0.0]# cat topics-for-new-server.json
{"topics":
[{"topic": "amazingTopic"},
{"topic": "reAmazingTopic"}],
"version":1
}
[restrada@localhost kafka_2.10.0-0.0.0.0]# bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-for-new-server.json --broker-list "4,5" --generate > new-topic-reassignment.json
This command generates the assignment plan (new-topic-reassignment.json) to move all partitions for the topics amazingTopic and reAmazingTopic to the new set of brokers with ids 4 and 5. At the end of this move, all partitions for these topics will exist only on brokers 4 and 5. To initiate the assignment with the kafka-reassign-partitions.sh tool, use this:
[restrada@localhost kafka_2.10.0-0.0.0.0]# bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file new-topic-reassignment.json --execute
You could use this tool to selectively move the partitions from the existing broker to the new broker:
[restrada@localhost kafka_2.10.0-0.0.0.0]# cat partitions-reassignment.json
{"partitions":
[{"topic": "amazingTopic",
"partition": 1,
"replicas": [1,2,4] }],
"version":1
}
[restrada@localhost kafka_2.10.0-0.0.0.0]# bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file partitions-reassignment.json --execute
This command moves some replicas for certain partitions to the new server. Once the reassignment is done, the operation can be verified:
[restrada@localhost kafka_2.10.0-0.0.0.0]# bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file new-topic-reassignment.json --verify
Status of partition reassignment:
Reassignment of partition [amazingTopic,0] completed successfully
Reassignment of partition [amazingTopic,1] is in progress
Reassignment of partition [amazingTopic,2] completed successfully
Reassignment of partition [reAmazingTopic,0] completed successfully
Reassignment of partition [reAmazingTopic,1] completed successfully
Reassignment of partition [reAmazingTopic,2] is in progress
To separate a server from the Kafka cluster, you have to move the replica for all partitions hosted on the server to be detached from the remaining brokers. You can also use the kafka-reassign-partitions.sh tool to increase the partition’s replication factor, as follows:
[restrada@localhost kafka_2.10.0-0.0.0.0]# cat increase-replication-factor.json
{"partitions":[{"topic":"amazingTopic","partition":0,"replicas":[2,3]}],
"version":1 }
[restrada@localhost kafka_2.10.0-0.0.0.0]# bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --execute
This command assumes that partition 0 of amazingTopic has a replication factor of 1 (its only replica is on broker 2); it increases the replication factor to 2 and creates the new replica on the next server, broker 3.
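The reassignment plans above are plain JSON documents, so they can also be generated programmatically instead of written by hand. The following Scala sketch builds such a plan; PartitionReplicas and ReassignmentPlan are illustrative helpers of our own, not part of Kafka's API:

```scala
// Sketch: building the reassignment JSON consumed by kafka-reassign-partitions.sh.
// PartitionReplicas and ReassignmentPlan are illustrative helpers, not Kafka API.
case class PartitionReplicas(topic: String, partition: Int, replicas: Seq[Int])

object ReassignmentPlan {
  def toJson(entries: Seq[PartitionReplicas]): String = {
    val parts = entries.map { e =>
      s"""{"topic":"${e.topic}","partition":${e.partition},"replicas":[${e.replicas.mkString(",")}]}"""
    }.mkString(",")
    s"""{"partitions":[$parts],"version":1}"""
  }

  def main(args: Array[String]): Unit =
    // Same shape as the partitions-reassignment.json example shown earlier
    println(toJson(Seq(PartitionReplicas("amazingTopic", 1, Seq(1, 2, 4)))))
}
```

The resulting string can be written to a file and passed to the tool with --reassignment-json-file.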
Kafka Topic Tools
When Kafka creates topics, it uses the default number of partitions (1) and the default replication factor (1). In real life, you need to specify these parameters.
[restrada@localhost kafka_2.10.0-0.0.0.0]# bin/kafka-topics.sh --create --zookeeper localhost:2181/chroot --replication-factor 3 --partitions 10 --topic amazingTopic
You can interpret this command as follows: replication factor 3 means that up to two servers can fail before data access is lost. Ten partitions are defined for a topic, which means that the full data set will be handled by no more than ten brokers, excluding replicas.
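To see why the partition count bounds the parallelism, consider how a keyed message is mapped to one of the ten partitions. Here is a simplified Scala model of that mapping; note that Kafka's real default partitioner hashes the serialized key with murmur2, so String.hashCode is only an illustrative stand-in:

```scala
// Simplified model of keyed partitioning: the same key always lands on
// the same partition. NOTE: Kafka's real default partitioner uses a
// murmur2 hash of the serialized key; String.hashCode is a stand-in.
object PartitionModel {
  def partitionFor(key: String, numPartitions: Int): Int = {
    val h = key.hashCode % numPartitions
    if (h < 0) h + numPartitions else h // keep the result in [0, numPartitions)
  }

  def main(args: Array[String]): Unit = {
    val p = partitionFor("user-42", 10) // 10 partitions, as created above
    println(s"key user-42 -> partition $p")
  }
}
```

Because every record with the same key lands on the same partition, per-key ordering is preserved, and at most ten brokers (one leader per partition) share the write load for this topic.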
To alter existing Kafka topics, use this command:
[restrada@localhost kafka_2.10.0-0.0.0.0]# bin/kafka-topics.sh --alter --zookeeper localhost:2181/chroot --partitions 20 --topic amazingTopic
With this command, we are adding ten more partitions to the topic created in the previous example.
To delete a topic, use the following command:
[restrada@localhost kafka_2.10.0-0.0.0.0]# bin/kafka-topics.sh --delete --zookeeper localhost:2181/chroot --topic amazingTopic
Using the kafka-topics.sh utility, the configuration can also be added to the Kafka topic, as follows:
[restrada@localhost kafka_2.10.0-0.0.0.0]# bin/kafka-topics.sh --alter --zookeeper localhost:2181/chroot --topic amazingTopic --config <key>=<value>
To remove a configuration from the topic, use the following command:
[restrada@localhost kafka_2.10.0-0.0.0.0]# bin/kafka-topics.sh --alter --zookeeper localhost:2181/chroot --topic amazingTopic --delete-config <key>=<value>
There are also utilities to inspect the topics on a server by querying ZooKeeper. The following command obtains the list of topic names:
[restrada@localhost kafka_2.10.0-0.0.0.0]# bin/kafka-topics.sh --list --zookeeper localhost:2181
To obtain information about partitions, replicas, and leaders, replace --list with --describe. The table obtained with --describe has the following headers:
leader: The node responsible for all reads and writes on the given partition; each partition has one randomly selected leader.
replicas: The list of nodes that holds the log for a specified partition.
isr: The subset of the replicas that is currently alive and in sync with the leader.
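These three fields are enough to spot an unhealthy partition: if the isr list is shorter than the replicas list, the partition is under-replicated. A small Scala sketch of that check follows; PartitionState is a hypothetical model of one output row, not a Kafka class:

```scala
// Sketch: spotting unhealthy partitions from describe-style output.
// PartitionState is a hypothetical model of one row, not a Kafka class.
case class PartitionState(topic: String, partition: Int, leader: Int,
                          replicas: Seq[Int], isr: Seq[Int])

object IsrCheck {
  // Under-replicated: fewer in-sync replicas than assigned replicas
  def underReplicated(p: PartitionState): Boolean = p.isr.size < p.replicas.size

  // The leader should always be a member of the ISR
  def leaderInSync(p: PartitionState): Boolean = p.isr.contains(p.leader)

  def main(args: Array[String]): Unit = {
    val degraded = PartitionState("amazingTopic", 1, 2, Seq(1, 2, 3), Seq(2))
    println(s"under-replicated: ${underReplicated(degraded)}")
  }
}
```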
Cluster Mirroring
Mirroring is used to create a replication of an existing cluster; for example, replicating an active data center into a passive data center. The mirror-maker tool mirrors the source cluster into a target cluster.
To mirror the source cluster, bring up the target cluster and start the mirror-maker processes, as follows:
[restrada@localhost kafka_2.10.0-0.0.0.0]# bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config sourceClusterConsumer.config --num.streams 2 --producer.config targetClusterProducer.config --whitelist=".*"
There are also tools to check the position of consumers, while mirroring or in general. The tool shows the position of every consumer in a consumer group and how far behind the end of the log it is; this also indicates how well cluster mirroring is performing. The tool is used as follows:
[restrada@localhost kafka_2.10.0-0.0.0.0]# bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group MirrorGroup --zkconnect localhost:2181 --topic kafkatopic
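The lag this tool reports per partition is essentially the distance between the consumer's committed offset and the log end offset. The following Scala sketch models that arithmetic; it is a simplified illustration, not the tool's actual implementation:

```scala
// Sketch: per-partition lag is logEndOffset - consumerOffset.
// A simplified model of the arithmetic, not the tool's implementation.
object LagModel {
  def lag(logEndOffset: Long, consumerOffset: Long): Long =
    math.max(0L, logEndOffset - consumerOffset)

  // Total lag across partitions, given (logEndOffset, consumerOffset) pairs
  def totalLag(offsets: Seq[(Long, Long)]): Long =
    offsets.map { case (end, committed) => lag(end, committed) }.sum

  def main(args: Array[String]): Unit =
    println(s"total lag: ${totalLag(Seq((100L, 90L), (50L, 50L)))}")
}
```

For a mirrored cluster, a steadily growing total lag on the MirrorMaker consumer group means the target cluster is falling behind the source.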
Summary
In this journey through Apache Kafka, we touched on many important topics. You learned why Kafka was developed, how to install it, and how to set up clusters with single and multiple brokers on a single node. You also learned how to run command-line producers and consumers, how to exchange messages, and which broker settings matter.
We explored Kafka's design approach and wrote some basic producers and consumers. Finally, we discussed Kafka's integration with other technologies, such as Spark. In the next chapter, we review the enterprise integration patterns.
Footnotes
6 The complete list is at http://kafka.apache.org/documentation.html#brokerconfigs
7 The complete list is at http://kafka.apache.org/documentation.html#producerconfigs
8 The complete list is at http://kafka.apache.org/documentation.html#consumerconfigs