8. The Broker: Apache Kafka

Raul Estrada and Isaac Ruiz1

(1)Mexico City, Mexico

The goal of this chapter is to get you familiar with Apache Kafka and show you how to solve the consumption of millions of messages in a pipeline architecture. Here we show some Scala examples to give you a solid foundation for the different types of implementations and integrations for Kafka producers and consumers.

In addition to the explanation of the Apache Kafka architecture and principles, we explore Kafka integration with the rest of the SMACK stack, specifically with Spark and Mesos. At the end of the chapter, we show how to administer Apache Kafka.

This chapter covers the following topics:

  • Kafka introduction

  • Kafka installation

  • Kafka in cluster

  • Kafka architecture

  • Kafka producers

  • Kafka consumers

  • Kafka integration

  • Kafka administration

Kafka Introduction

The Apache Kafka author, Jay Kreps, took a lot of literature courses in college. Since the project is mainly optimized for writing (in this book, when we say “optimized” we mean 2 million writes per second on three commodity machines), when he open sourced it he thought it should have a cool name: Kafka, in honor of Franz Kafka, a prolific writer despite dying at the age of 40.

Nowadays, real-time information is continuously generated. This data needs easy ways to be delivered to multiple types of receivers. Most of the time the information generators and the information consumers are inaccessible to each other; this is when integration tools enter the scene.

In the 1980s, 1990s, and 2000s, the large software vendors whose names have three letters (IBM, SAP, BEA, etc.) and more (Oracle, Microsoft, Google) found a very well-paid market in the integration layer: the layer where enterprise service buses, SOA architectures, integrators, and other panaceas costing several million dollars live.

Now, all traditional applications tend to have a point of integration between them, which creates the need for a mechanism that seamlessly integrates data consumers and data producers without rewriting applications at either end.

As we mentioned in earlier chapters, in the big data era, the first challenge was the data collection and the second challenge was to analyze that huge amount of data.

Message publishing is the mechanism for connecting heterogeneous applications by sending messages among them. The message router is known as a message broker. Apache Kafka is a software solution that quickly routes real-time information to consumers.

The message broker provides seamless integration, but there are two collateral objectives: the first is to not block the producers and the second is to not let the producers know who the final consumers are.

Apache Kafka is a real-time publish-subscribe messaging system: open source, distributed, partitioned, replicated, and commit log–based. Its main characteristics are as follows:

  • Distributed. A cluster-centric design that supports the distribution of messages over the cluster members while maintaining the semantics, so you can grow the cluster horizontally without downtime.

  • Multiclient. Easy integration with different clients from different platforms: Java, .NET, PHP, Ruby, Python, etc.

  • Persistent. You cannot afford any data loss. Kafka is designed with efficient O(1) disk structures that provide constant-time performance no matter the data size.

  • Real time. The messages produced are immediately seen by consumer threads; these are the basis of the systems called complex event processing (CEP) .

  • Very high throughput. As we mentioned, all the technologies in the stack are designed to work on commodity hardware. Kafka can handle hundreds of megabytes of reads and writes per second from a large number of clients.

Figure 8-1 shows an Apache Kafka messaging system typical scenario.

Figure 8-1. Apache Kafka typical scenario

On the producers’ side, you can find several types of actors, for example:

  • Adapters. Generate transformation information; for example, a database listener or a file system listener.

  • Logs. The log files of application servers and other systems, for example.

  • Proxies. Generate web analytics information.

  • Web pages. Front-end applications generating information.

  • Web services. The service layer; generate invocation traces.

You could group the clients on the consumer side into three types:

  • Offline. The information is stored for posterior analysis; for example, Hadoop and data warehouses.

  • Near real time. The information is stored but it is not consumed immediately; for example, Apache Cassandra and other NoSQL databases.

  • Real time. The information is analyzed as it is generated; for example, an engine like Apache Spark or Apache Storm (used to make analysis over HDFS).

Born in the Fast Data Era

As we have mentioned, data is the new ingredient of Internet-based systems. Simply, a web page needs to know user activity, logins, page visits, clicks, scrolls, comments, heat zone analysis, shares, and so forth.

Traditionally, data was handled and stored with traditional aggregation solutions. Due to the high throughput, the analysis could not be done until the next day. Today, yesterday’s information is often useless. Offline analysis, such as Hadoop, is being left out of the new economy.

There are several examples of Apache Kafka use cases:

  • Web searches based on relevance

  • Application security: login analysis, brute force attack detection, systemic denial of service attack detection

  • Recommendations based on popularity, correlation

  • Sentiment analysis, tendencies, segmentation

  • Collecting data from device sensors: from sophisticated sensors like surveillance cameras to GPS and cell phone sensors, passing through light, temperature, pressure, and humidity sensors

  • Real-time merchandising to a huge population

  • Collecting logs from business systems such as application servers, CRM, ERP, and so forth.

In all of these cases, the analysis is done in real time or it is never done; there is no middle ground.

Apache Kafka is usually compared to traditional messaging systems such as ActiveMQ or RabbitMQ. The difference is the data volume that Kafka can handle in real time.

Use Cases

Well, you have seen some business scenarios that are solved with Apache Kafka. But in which layer of the architecture should you put Kafka? Here are some popular use cases (real examples with real enterprises):

  • Commit logs. What happens when your system does not have a log system? In these cases, you can use Kafka. Many times systems do not have logs, simply because (so far) it’s not possible to handle such a large data volume. The stories of application servers falling simply because they could not write their logs correctly with the verbosity needed by the business are more common than it seems. Kafka can also help to start and restart fallen log servers.

  • Log aggregation. Contrary to what people believe, much of the work of the onsite support team is on log analysis. Kafka not only provides a system for log management, but it can also handle heterogeneous aggregation of several logs. Kafka can physically collect the logs and remove cumbersome details such as file location or format. In addition, it provides low latency and supports multiple data sources while making distributed consumption.

  • Messaging. Systems are often heterogeneous, and instead of rewriting them, you have to translate between them. Often the manufacturer’s adapters are unaffordable to a company; for such cases, Kafka is the solution because it is open source and can handle more volume than many traditional commercial brokers.

  • Stream processing. We could write an entire book on this topic. In some business cases, the process of collecting information consists of several stages. A clear example is when a broker is used not only to gather information but also to transform it. This is the real meaning and success of the Enterprise Service Bus (ESB) architectures. With Kafka, the information can be collected and further enriched; this (very well paid) enrichment process is known as stream processing.

  • Record user activity. Many marketing and advertising companies are interested in recording all the customer activity on a web page. This seems a luxury, but until recently, it was very difficult to keep track of the clicks that a user makes on a site. For those tasks where the data volume is huge, you can use Kafka for real-time process and monitoring.

All of this seems good, but who is using Kafka today? Here are some examples:

  • LinkedIn.1 Used for activity streams and operational metrics. We cannot imagine today’s LinkedIn news feed without Kafka.

  • Uber.2 Relies on Kafka data feeds to bulk-load log data into Amazon S3 and to stream change-data logs from the local data centers.

  • Twitter.3 Handling five billion sessions a day in real time requires Kafka as part of their stream-processing infrastructure.

  • Netflix.4 Kafka is the backbone of Netflix’s data pipeline for real-time monitoring and event processing.

  • Spotify.5 Kafka is used as part of their log delivery system.

  • Yahoo. Used by the media analytics team as a real-time analytics pipeline. Their cluster handles 20Gbps of compressed data.

Kafka Installation

Go to the Apache Kafka home page at http://kafka.apache.org/downloads, as shown in Figure 8-2.

Figure 8-2. Apache Kafka download page

The current Apache Kafka version available as a stable release is 0.10.0.0. The major limitation of Kafka since 0.8.x is that it is not backward compatible, so you cannot replace this version with one prior to 0.8.

Once you download the available release, let’s proceed with the installation.

Installing Java

You need to install Java 1.7 or later. Download the latest JDK from Oracle’s web site at http://www.oracle.com/technetwork/java/javase/downloads/index.html .

For example, to install in Linux:

  1. Change the file mode:

    [restrada@localhost opt]# chmod +x jdk-8u91-linux-x64.rpm
  2. Change the directory in which you want to perform the installation:

    [restrada@localhost opt]# cd <directory path name>
  3. To install the software in the /usr/java/ directory, type the following command:

    [restrada@localhost opt]# cd /usr/java
  4. Run the installer using this command:

    [restrada@localhost java]# rpm -ivh jdk-8u91-linux-x64.rpm
  5. Finally, add the JAVA_HOME environment variable. This command will write the JAVA_HOME environment variable to the /etc/profile file:

    [restrada@localhost opt]# echo "export JAVA_HOME=/usr/java/jdk1.8.0_91" >> /etc/profile

Installing Kafka

To install in Linux, take the following steps.

  1. Extract the downloaded kafka_2.10-0.10.0.0.tgz file:

    [restrada@localhost opt]# tar xzf kafka_2.10-0.10.0.0.tgz
  2. Add the Kafka bin directory to PATH as follows:

    [restrada@localhost opt]# export KAFKA_HOME=/opt/kafka_2.10-0.10.0.0

    [restrada@localhost opt]# export PATH=$PATH:$KAFKA_HOME/bin

Importing Kafka

To include Kafka in our programming projects, we include the dependencies.

With SBT:

// https://mvnrepository.com/artifact/org.apache.kafka/kafka_2.10
libraryDependencies += "org.apache.kafka" % "kafka_2.10" % "0.10.0.0"

With Maven:

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka_2.10 -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.10.0.0</version>
</dependency>

With Gradle:

// https://mvnrepository.com/artifact/org.apache.kafka/kafka_2.10
compile group: 'org.apache.kafka', name: 'kafka_2.10', version: '0.10.0.0'

Kafka in Cluster

We are ready to program with the Apache Kafka publisher-subscriber messaging system. In Kafka, there are three types of clusters:

  • Single node–single broker

  • Single node–multiple broker

  • Multiple node–multiple broker

A Kafka cluster has five main components:

  • Topic. A category or feed name in which messages are published by the message producers. Topics are partitioned; each partition is represented by an ordered, immutable sequence of messages. The cluster has a partitioned log for each topic. Each message in a partition has a unique sequential id called an offset.

  • Broker. A Kafka cluster has one or more physical servers in which each one may have one or more server processes running. Each server process is called a broker. The topics live in the broker processes.

  • Producer. Publishes data to topics by choosing the appropriate partition in the topic. For load balancing, the messages allocation to the topic partition can be done in a round-robin mode or by defining a custom function.

  • Consumer. Applications or processes subscribed to topics and process the feed of published messages.

  • ZooKeeper. ZooKeeper is the coordinator between the broker and the consumers. ZooKeeper coordinates the distributed processes through a shared hierarchical name space of data registers; these registers are called znodes.

    There are two differences between ZooKeeper and a file system:

    • Every znode has data associated with it and is designed to store coordination data.

    • Znodes are limited on the amount of data that they can have.
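For example, once a broker from the following sections is running, you can inspect the znodes that Kafka registers in ZooKeeper with the zookeeper-shell.sh tool shipped in the Kafka bin directory (a quick sketch; the exact output depends on your cluster state):

[restrada@localhost kafka_2.10-0.10.0.0]# bin/zookeeper-shell.sh localhost:2181 ls /brokers/ids

[restrada@localhost kafka_2.10-0.10.0.0]# bin/zookeeper-shell.sh localhost:2181 ls /brokers/topics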

Single Node–Single Broker Cluster

An example diagram of a single node–single broker cluster is shown in Figure 8-3.

Figure 8-3. Single node–single broker Kafka cluster example

First, start the ZooKeeper server. Kafka provides a simple ZooKeeper configuration file to launch a single ZooKeeper instance. To start the ZooKeeper instance, use the following command:

[restrada@localhost kafka_2.10-0.10.0.0]# bin/zookeeper-server-start.sh config/zookeeper.properties

The following are the main properties defined in zookeeper.properties:

  • dataDir. The directory where the ZooKeeper data is stored:

    dataDir=/tmp/zookeeper
  • clientPort. The listening port for client requests. By default, ZooKeeper listens on TCP port 2181:

    clientPort=2181
  • maxClientCnxns. The limit per IP for the number of connections (0 = unbounded):

    maxClientCnxns=0

For more information about Apache ZooKeeper, visit the project page at http://zookeeper.apache.org/ .

Starting the Broker

After starting ZooKeeper, start the Kafka broker with the following command:

[restrada@localhost kafka_2.10-0.10.0.0]# bin/kafka-server-start.sh config/server.properties

The following are the main properties defined in server.properties:

  • Broker id. The unique positive integer id for each broker.

    broker.id=0
  • Port. The port where the socket server listens on:

    port=9092
  • Log dir. The directory to store log files:

    log.dir=/tmp/kafka10-logs
  • Num partitions. The number of log partitions per topic:

    num.partitions=2
  • ZooKeeper connect. The ZooKeeper connection URL:

    zookeeper.connect=localhost:2181

Creating a Topic

Kafka has a command to create topics. Let’s create a topic called amazingTopic with one partition and one replica:

[restrada@localhost kafka_2.10-0.10.0.0]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic amazingTopic

Obtain the output, as follows:

Created topic "amazingTopic".

These are the parameters:

  • --replication-factor 1 indicates one replica

  • --partitions 1 indicates one partition

  • --zookeeper localhost:2181 indicates the ZooKeeper URL

To obtain the list of topics on any Kafka server, use the following command:

[restrada@localhost kafka_2.10-0.10.0.0]# bin/kafka-topics.sh --list --zookeeper localhost:2181

Obtain the output:

amazingTopic
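You can also inspect the partitions, replicas, and leader of a topic with the --describe option (shown here as a sketch; the output layout varies slightly between Kafka versions):

[restrada@localhost kafka_2.10-0.10.0.0]# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic amazingTopic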

Starting a Producer

Kafka has a command-line tool to start producers. It accepts input from the command line and publishes it as messages; by default, each new line is considered a separate message.

[restrada@localhost kafka_2.10-0.10.0.0]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic amazingTopic

Two parameters are required:

  • broker-list. The URLs of the brokers to connect to.

  • topic. The name of the topic used to send a message to its subscribers.

Now type the following:

Valar morghulis [Enter]
Valar dohaeris [Enter]

You get this output:

Valar morghulis
Valar dohaeris

The following are the main properties defined in producer.properties :

  • Metadata broker list. A list of brokers used for bootstrapping knowledge about the rest of the cluster.

    Format: host1:port1, host2:port2

    metadata.broker.list=localhost:9092
  • Compression codec. The compression codec for data generated.

    Example: none, gzip, snappy

    compression.codec=none

Later in this chapter, you will see how to write producers.

Starting a Consumer

Kafka has a command-line client to start a message consumer. It shows the output at the command line as soon as it subscribes to the topic in the broker:

[restrada@localhost kafka_2.10-0.10.0.0]# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic amazingTopic --from-beginning

Because you requested --from-beginning, you see the following output:

Valar morghulis
Valar dohaeris

The following is the main property defined in consumer.properties:

  • Group id. A string that identifies a set of consumers in the same group:

    group.id=test-consumer-group

Later in this chapter, you will learn how to write consumers.

Now let’s play with the new toy architecture. Open each technology in a different console: ZooKeeper, broker, producer, and consumer. Type the commands in the producer and watch them displayed in the consumer.

If you don’t recall how to run producers or consumers, running the command with no arguments will show the possible parameters.

Single Node–Multiple Broker Cluster

An example diagram of a single node–multiple broker cluster is shown in Figure 8-4.

Figure 8-4. Single node–multiple broker Kafka cluster example

As usual, start the ZooKeeper server:

[restrada@localhost kafka_2.10-0.10.0.0]# bin/zookeeper-server-start.sh config/zookeeper.properties

You need a different server.properties file for every broker. Let’s call them: server-1.properties, server-2.properties, server-3.properties, and so forth.

In server-1.properties, specify the following:

  • broker.id=1

  • port=9093

  • log.dir=/tmp/kafka-logs-1

Similarly, on server-2.properties, specify the following:

  • broker.id=2

  • port=9094

  • log.dir=/tmp/kafka-logs-2

Follow the same procedure for server-3.properties (a sketch for creating these files follows this list):

  • broker.id=3

  • port=9095

  • log.dir=/tmp/kafka-logs-3
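One way to create these files (a sketch; set the values per broker as listed above) is to copy the packaged server.properties and edit the three properties in each copy:

[restrada@localhost kafka_2.10-0.10.0.0]# cp config/server.properties config/server-1.properties

[restrada@localhost kafka_2.10-0.10.0.0]# vi config/server-1.properties
# In the copy, set broker.id=1, port=9093, and log.dir=/tmp/kafka-logs-1; repeat for server-2 and server-3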

Starting the Brokers

With ZooKeeper running, start the Kafka brokers with the following commands:

[restrada@localhost kafka_2.10-0.10.0.0]# bin/kafka-server-start.sh config/server-1.properties

[restrada@localhost kafka_2.10-0.10.0.0]# bin/kafka-server-start.sh config/server-2.properties

[restrada@localhost kafka_2.10-0.10.0.0]# bin/kafka-server-start.sh config/server-3.properties

Creating a Topic

Using the command to create topics, create a topic called reAmazingTopic (re stands for replicated). It has two partitions and two replicas:

[restrada@localhost kafka_2.10-0.10.0.0]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 2 --topic reAmazingTopic

Obtain the output, as follows:

Created topic "reAmazingTopic".

Starting a Producer

Now that you know the command to start producers, indicating more brokers in the broker-list is a trivial task:

[restrada@localhost kafka_2.10-0.10.0.0]# bin/kafka-console-producer.sh --broker-list localhost:9093,localhost:9094,localhost:9095 --topic reAmazingTopic

Yes, our architects always have weird requirements; if you need to run multiple producers connecting to different broker combinations, you must specify a different broker-list for each producer.

Starting a Consumer

To start a consumer, use the same Kafka command that you already know:

[restrada@localhost kafka_2.10-0.10.0.0]# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic reAmazingTopic

Multiple Node–Multiple Broker Cluster

An example of a multiple node–multiple broker cluster is shown in Figure 8-5.

Figure 8-5. Multiple node–multiple broker Kafka cluster

This is where you see the real power of the cluster. Kafka should be installed on every machine in the cluster. Every physical server could have one or many Kafka brokers. All the nodes on the same cluster should connect to the same ZooKeeper.

But don’t worry, all the previous commands remain equal. The commands for ZooKeeper, broker, producer, and consumer don’t change.

Broker Properties

To recapitulate the section, Table 8-1 lists the most popular broker properties.6

Table 8-1. Kafka Broker Most Important Properties

  • broker.id (default: 0). Each broker is identified with a positive integer id. This id is the broker’s name and allows the broker to be moved to a different host or port without losing consumers.

  • log.dirs (default: /tmp/kafka-logs). The directory where the log data is stored. Each new partition created is placed in the directory with the fewest partitions.

  • zookeeper.connect (default: localhost:2181). The ZooKeeper connection string in the hostname:port/chroot form. Here, chroot is the base directory for the path operations (a namespace for sharing with other applications on the same ZooKeeper cluster).

  • host.name (default: null). The broker’s hostname. If this is set, it will only bind to this address. If it is not set, it will bind to all interfaces and publish one to ZooKeeper.

  • num.partitions (default: 1). The number of partitions per topic if a partition count isn’t given at topic creation.

  • auto.create.topics.enable (default: true). Enables the autocreation of topics on the server. If this is set to true, attempts to produce, consume, or fetch data for a non-existent topic automatically create it with the default replication factor and the default number of partitions.

  • default.replication.factor (default: 1). The default replication factor for automatically created topics.

Kafka Architecture

In its beginnings, LinkedIn used Java Message Service (JMS). But when more power was needed (i.e., a scalable architecture), the LinkedIn development team decided to build the project that we know today as Kafka. In 2011, Kafka became an open source Apache project. In this section, we give you some reflections on why things are designed the way they are.

The following are Kafka’s project goals:

  • An API. Supports custom implementation of producers and consumers.

  • Low overhead. Low network latency and low storage overhead with message persistence on disk.

  • High throughput. Publishes and subscribes to millions of messages; supports real-time data feeds.

  • Distributed. Highly scalable architecture to handle low-latency delivery.

  • High availability. Autobalances consumers in case of failure.

  • Fault tolerant. Guarantees data integrity in case of failure.

Kafka is more than a queuing platform; the messages are received and enqueued to be delivered to a consumer pool.

Kafka is more than a publish-subscribe platform; the messages are not necessarily published to all consumers.

The following describes Kafka’s operation in a nutshell:

  • Messages are published to a Kafka topic, which is a message queue or a message category.

  • The Kafka topic runs in the Kafka broker, which is a server. Kafka brokers do not just run the topics, but also store the messages when required.

  • The consumers use the ZooKeeper service to get the information to track the messages (the data about a message state).

Figure 8-6 shows a topic with three partitions. You can see the five Kafka components: ZooKeeper, broker, topic, producer, and consumer.

Figure 8-6. A topic with three partitions

The following describes the parts of a partition.

  • Segment files. Internally, every partition is a logical log file, represented as a set of segment files with the same size. The partition is a sequence of ordered messages. When a message is published, the broker appends the message to the last segment of the file. When a certain number of messages is reached, the segment file is flushed to the disk. Once the file is flushed, the messages are available for consumption by the consumers.

  • Offset. Each message in a partition is assigned a unique sequential number called an offset. The offset identifies messages inside the partition. Partitions are replicated between servers for fault tolerance.

  • Leaders. Inside Kafka, each partition has one Kafka server as its leader. The other servers are followers. The leader of a partition coordinates the read and write requests for that partition. The followers asynchronously replicate the data from the leader. If the leader fails, another server becomes the new leader. In a cluster, every server plays two roles: leader for some partitions and follower for others.

  • Groups. The consumers are organized into groups. Each consumer is represented as a process; one process belongs to only one group.

In Kafka, there are three ways to deliver messages (as a reflection exercise, think about why there are only three):

  • Messages are never redelivered but may be lost.

  • Messages may be redelivered but never lost.

  • Messages are delivered once and only once.

Log Compaction

There are two types of retention: finer-grained (per message) and coarser-grained (time based). Log compaction is the process of passing from time-based to per-message retention.

In Kafka, the retention policy can be set per topic as time based, size based, or log compaction–based. Log compaction ensures the following:

  • Reads begin at offset 0; a consumer that starts at the beginning of the log sees the messages in the order in which they were written.

  • Messages have sequential offsets that never change.

  • Message order is always preserved.

  • A group of background threads recopies the log segment files, removing the records whose keys appear in the head of the log.

As another reflection exercise, can you deduce why the log compaction ensures these four points?

Kafka Design

The following are Kafka design bullet points:

  • Storage. The purpose of Kafka is to provide message processing. The main functions are caching and storing messages on a file system. The caching and flushing to disk are configurable.

  • Retention. If a message is consumed, it is not discarded; it is retained, allowing message reconsumption.

  • Metadata. In many messaging systems, the message metadata is kept at the server level. In Kafka, the message state is maintained at the consumer level. This prevents the following:

    • Multiple deliveries of the same message

    • Losing messages due to failures

  • OLTP. Consumers store their state in ZooKeeper, but Kafka also allows storing it in external online transaction processing (OLTP) systems.

  • Push and pull. Producers push the message to the broker and consumers pull the message from the broker.

  • Masterless. Like Apache Cassandra, Apache Kafka is masterless; you can remove any broker at any time. The metadata is maintained by ZooKeeper and shared with the consumers.

  • Synchronous. Producers have the option of sending messages to the broker synchronously or asynchronously (see the sketch after this list).
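The following is a minimal sketch of both send modes using the new producer API presented later in this chapter; the localhost broker and the amazingTopic topic are assumptions taken from the earlier single-broker setup.

import java.util.Properties
import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}

object SendModes {
  def main(args: Array[String]) {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")  // assumed local broker
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    val record = new ProducerRecord[String, String]("amazingTopic", "hello")

    // Asynchronous send (default): returns immediately, the callback runs when the broker answers
    producer.send(record, new Callback() {
      override def onCompletion(metadata: RecordMetadata, exception: Exception) {
        if (exception != null) exception.printStackTrace()
        else println("Stored at offset " + metadata.offset())
      }
    })

    // Synchronous send: block on the returned Future until the broker acknowledges the write
    producer.send(record).get()

    producer.close()
  }
}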

Message Compression

There are cases where the network bandwidth is the bottleneck. This does not usually happen, but it could. In Kafka, there is a mechanism to compress groups of messages. Without being compression experts, we can deduce that it is better to compress a group of messages than to compress every message individually.

When a group of messages is compressed, the network overhead is reduced. Before Kafka 0.8.0, groups of messages were compressed and presented to the consumer as a single message, which the consumer decompressed later. But there were decompression issues that created overhead.

Since Kafka 0.8.0, changes were introduced in the broker to handle offsets, so the problem moved to the broker, but the overall performance improved. The lead broker is responsible for compressing messages, which lowers the network overhead but can also increase the load on the broker’s CPU.

As you saw, Kafka handles the Gzip and Snappy compression protocols. You need to specify this configuration in the producer to use compression, as sketched after the following list.

  • compression.codec. This parameter indicates the codec for all data generated by the producer. The default value is none. The valid values are none, gzip, and snappy.

  • compressed.topics. This parameter turns on compression for particular topics. Note that if the list of compressed topics is empty, compression is enabled for all topics. If the compression codec is none, compression is disabled for all topics.
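The producer examples later in this chapter use the new KafkaProducer API, where the equivalent setting is compression.type (listed later in Table 8-2). Here is a minimal sketch, assuming a local broker and the amazingTopic topic created earlier:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object CompressedProducer {
  def main(args: Array[String]) {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")  // assumed local broker
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    // Batches of messages sent to the broker are compressed as a group with Snappy
    props.put("compression.type", "snappy")

    val producer = new KafkaProducer[String, String](props)
    producer.send(new ProducerRecord[String, String]("amazingTopic", "a compressed message"))
    producer.close()
  }
}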

If there is a task at work that does not let you sleep and it is related to mirroring data across data centers, consider using Kafka. Kafka is a good option when you have to transfer huge amounts of data between active and passive data centers in a compressed format with low network bandwidth overhead.

Replication

Recall how message partitioning works in Kafka: the decision about how a message is partitioned is made at the producer end, and the broker stores the messages as they arrive. Also recall that the number of partitions for each topic is configured in the Kafka broker.

Replication is one of the best features introduced in Kafka 0.8.0. Replication ensures that messages will be published and consumed in the event of broker failure. Both producers and consumers are replication-aware.

In replication, each partition has n replicas of a message (to handle n–1 failures). One replica acts as the leader. ZooKeeper knows who the replica leader is. The lead replica has a list of its follower replicas. The replicas store their part of the message in local logs.

Kafka has two replication modes: the synchronous replication process and the asynchronous replication process.

This is the synchronous replication process:

  1. The producer identifies the lead replica from ZooKeeper.

  2. The producer publishes the message.

  3. When the message is published, it is written to the lead replica’s log.

  4. The followers pull the message.

  5. The leader waits for all the followers to acknowledge that the replica was written.

  6. Once replications are complete, the leader sends the acknowledgment to the producer.

This is the asynchronous replication process:

  1. The producer identifies the lead replica from ZooKeeper.

  2. The producer publishes the message.

  3. When the message is published, it is written to the lead replica’s log.

  4. The followers pull the message.

  5. Once the message is written on the lead replica, the leader sends the acknowledgment to the producer.

As you can see, asynchronous mode is faster, but it is not fault tolerant.

Replication ensures strong durability and high availability. It guarantees that any successfully published message will not be lost and will be consumed, even in the event of broker failures.
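From the producer’s point of view, the replication mode is selected through its acknowledgment setting (the acks property of the new producer API, listed later in Table 8-2). A minimal sketch, assuming a local broker and the reAmazingTopic topic created earlier:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object DurableProducer {
  def main(args: Array[String]) {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")  // assumed local broker
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    // acks=1   -> leader-only acknowledgment (the asynchronous replication flow above)
    // acks=all -> wait for every in-sync replica (the synchronous replication flow above)
    props.put("acks", "all")

    val producer = new KafkaProducer[String, String](props)
    producer.send(new ProducerRecord[String, String]("reAmazingTopic", "a durable message")).get()
    producer.close()
  }
}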

Kafka Producers

As you saw, producers are applications that create messages and publish them to the broker. Normally, producers are front-end applications, web pages, web services, back-end services, proxies, and adapters to legacy systems. You can write Kafka producers in Java, Scala, C, and Python.

The process begins when the producer connects to any live node and requests metadata about the leaders of a topic’s partitions, so it can put the message directly on the partition’s lead broker.

Producer API

First, you need to understand the required classes to write a producer:

  • Producer. The class is KafkaProducer <K, V> in org.apache.kafka.clients.producer.KafkaProducer

    KafkaProducer is a generic (parameterized) class: K specifies the type of the partition key and V specifies the type of the message value.

  • ProducerRecord. The class is ProducerRecord <K, V> in org.apache.kafka.clients.producer.ProducerRecord

    This class encapsulates the message to send: the topic name, an optional partition number, an optional partition key, and the message value.

    ProducerRecord is a generic (parameterized) class: K specifies the type of the partition key and V specifies the type of the message value.

The Producer API encapsulates all the low-level producer implementations. The default mode is asynchronous, but you can change it with the producer.type property in the producer configuration.

Scala Producers

Now let’s write a simple Scala Kafka producer to send messages to the broker. The SimpleProducer class creates a message for a specific topic and sends it using message partitioning. This chapter’s examples were tested with Scala version 2.10.

Step 1. Import Classes

Import the following classes:

import java.util.{Date, Properties}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

Step 2. Define Properties

Define the following properties:

val props = new Properties()
props.put("bootstrap.servers",
  "192.168.146.132:9093,192.168.146.132:9094,192.168.146.132:9095")


props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

props.put("request.required.acks", "1")

producer = new KafkaProducer[String, String](props)

As you can see, the properties are as follows:

  • bootstrap.servers

    Specifies the list of brokers that the producer connects to, in the format [node:port,node:port]. As you know, Kafka determines the lead broker of the topic.

  • key.serializer and value.serializer

    Specify the serializers used while preparing the message key and value for transmission from the producer to the broker.

    In this example, we use the string serializer provided by Kafka. The key and value serializers are often the same, but you can also implement a custom serializer class by implementing the org.apache.kafka.common.serialization.Serializer interface.

  • request.required.acks

    Indicates to the broker to send an acknowledgment to the producer when a message is received.

    1 means that the producer receives an acknowledgment once the lead replica has received the message. The default mode is “fire and forget,” so that it is not informed in the event of message loss.

Step 3. Build and Send the Message

The following code should be self-explanatory:

val runtime = new Date().toString
val msg = "Message Publishing Time - " + runtime
val data = new ProducerRecord[String, String](topic, msg)
producer.send(data)

Listing 8-1 shows the complete SimpleProducer.

Listing 8-1. SimpleProducer.scala
package apress.ch08

import java.util.{Date, Properties}

import apress.ch08.SimpleProducer._
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}


object SimpleProducer {

  private var producer: KafkaProducer[String, String] = _

  def main(args: Array[String]) {
    val argsCount = args.length
    if (argsCount == 0 || argsCount == 1)
      throw new IllegalArgumentException(
        "Provide topic name and Message count as arguments")


    // Topic name and the message count to be published is passed from the
    // command line
    val topic = args(0)
    val count = args(1)


    val messageCount = java.lang.Integer.parseInt(count)
    println("Topic Name - " + topic)
    println("Message Count - " + messageCount)
    val simpleProducer = new SimpleProducer()
    simpleProducer.publishMessage(topic, messageCount)
  }
}


class SimpleProducer {

  val props = new Properties()

  // Set the broker list for requesting metadata to find the lead broker
  props.put("bootstrap.servers",
    "192.168.146.132:9093, 192.168.146.132:9094, 192.168 146.132:9095")


  //This specifies the serializer class for keys
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")


  // 1 means the producer receives an acknowledgment once the lead replica
  // has received the data. This option provides better durability as the
  // client waits until the server acknowledges the request as successful.
  props.put("request.required.acks", "1")


  producer = new KafkaProducer[String, String](props)

  private def publishMessage(topic: String, messageCount: Int) {
    for (mCount <- 0 until messageCount) {
      val runtime = new Date().toString
      val msg = "Message Publishing Time - " + runtime
      println(msg)


      // Create a message
      val data = new ProducerRecord[String, String](topic, msg)


      // Publish the message
      producer.send(data)
    }


    // Close producer connection with broker.
    producer.close()
  }
}

Step 4. Create the Topic

Before running the program, you must create the topic. You can create it using the API (amazing, isn’t it?) or from the command line:

[restrada@localhost kafka_2.10-0.10.0.0]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic amazingTopic

Step 5. Compile the Producer

Compile the program with this command:

[restrada@localhost kafka_2.10-0.10.0.0]# scalac -d . apress/ch08/SimpleProducer.scala

Step 6. Run the Producer

Run the SimpleProducer with the following command:

[restrada@localhost kafka_2.10-0.10.0.0]# scala apress.ch08.SimpleProducer amazingTopic 10

This program takes two arguments: the topic name and the number of messages to publish.

Step 7. Run a Consumer

As you already saw, you can run the consumer program with the following command:

[restrada@localhost kafka_2.10-0.10.0.0]# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic amazingTopic

Producers with Custom Partitioning

Let’s jump to the next level by writing another program that implements customized message partitioning. The example consists of collecting the IPs visiting a web site, which are recorded and published. The message has three parts: timestamp, web site name, and IP address.

Step 1. Import Classes

Import these classes:

import java.util.Date
import java.util.Properties
import java.util.Random
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord

Step 2. Define Properties

Define the following properties:

val props = new Properties()

props.put("bootstrap.servers",
  "192.168.146.132:9092,192.168.146.132:9093,192.168.146.132:9094")


props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

// Defines the class to be used for determining the partition
// in the topic where the message needs to be sent.
props.put("partitioner.class", "apress.ch08.SimplePartitioner")


props.put("request.required.acks", "1")

producer = new KafkaProducer[String, String](props)

Step 3. Implement the Partitioner class

Write the SimplePartitioner class, which implements the Partitioner interface. The class takes the key, in this case the IP address, and performs a modulo operation with the number of partitions. Listing 8-2 shows the SimplePartitioner code.

Listing 8-2. SimplePartitioner.scala
package apress.ch08

import java.util

import kafka.utils.VerifiableProperties
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.Partitioner
import org.apache.kafka.common.Cluster


object SimplePartitioner {

  private var producer: KafkaProducer[String, String] = _
}


class SimplePartitioner extends Partitioner {

  def partition(key: AnyRef, a_numPartitions: Int): Int = {
    var partition = 0
    val partitionKey = key.asInstanceOf[String]
    val offset = partitionKey.lastIndexOf('.')
    if (offset > 0) {
      partition = java.lang.Integer.parseInt(partitionKey.substring(offset + 1)) %
        a_numPartitions
    }
    partition
  }


  override def partition(topic: String,
                         key: AnyRef,
                         keyBytes: Array[Byte],
                         value: AnyRef,
                         valueBytes: Array[Byte],
                         cluster: Cluster): Int =
    // Use the actual number of partitions of the topic instead of a hard-coded value
    partition(key, cluster.partitionsForTopic(topic).size)


  override def close() {
  }


  override def configure(configs: util.Map[String, _]) {
  }
}

Step 4. Build and Send the Message

Listing 8-3 presents the complete CustomPartitionProducer.scala.

Listing 8-3. CustomPartitionProducer.scala
package apress.ch08

import java.util.Date
import java.util.Properties
import java.util.Random
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import CustomPartitionProducer._


object CustomPartitionProducer {

  var producer: KafkaProducer[String, String] = _

  def main(args: Array[String]) {
    val argsCount = args.length
    if (argsCount == 0 || argsCount == 1)
      throw new IllegalArgumentException(
        "Please provide topic name and Message count as arguments")


    // Topic name and the message count to be published is passed from the
    // command line
    val topic = args(0)
    val count = args(1)
    val messageCount = java.lang.Integer.parseInt(count)
    println("Topic Name - " + topic)
    println("Message Count - " + messageCount)
    val simpleProducer = new CustomPartitionProducer()
    simpleProducer.publishMessage(topic, messageCount)
  }
}


class CustomPartitionProducer {

  val props = new Properties()

  // Set the broker list for requesting metadata to find the lead broker
  props.put("metadata.broker.list",
    "192.168.146.132:9092, 192.168.146.132:9093, 192.168.146.132:9094")


  // This specifies the serializer classes for keys and values
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")


  // Defines the class to be used for determining the partition
  // in the topic where the message needs to be sent.
  props.put("partitioner.class", "apress.ch08.SimplePartitioner")


  // 1 means the producer receives an acknowledgment once the lead replica
  // has received the data. This option provides better durability as the
  // client waits until the server acknowledges the request as successful.
  props.put("request.required.acks", "1")


  producer = new KafkaProducer[String, String](props)

  private def publishMessage(topic: String, messageCount: Int) {
    val random = new Random()
    for (mCount <- 0 until messageCount) {
      val clientIP = "192.168.14." + random.nextInt(255)
      val accessTime = new Date().toString
      val msg = accessTime + ",kafka.apache.org," + clientIP
      println(msg)
      // Create a ProducerRecord instance
      val data = new ProducerRecord[String, String](topic, clientIP, msg)


      // Publish the message
      producer.send(data)
    }
    producer.close()
  }
}

Step 5. Create the Topic

Before running the program, you must create the pageHits topic from the command line:

[restrada@localhost kafka_2.10-0.10.0.0]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 5 --topic pageHits

Step 6. Compile the Programs

Compile the programs with the following commands:

[restrada@localhost kafka_2.10-0.10.0.0]# scalac -d . apress/ch08/SimplePartitioner.scala

[restrada@localhost kafka_2.10-0.10.0.0]# scalac -d . apress/ch08/CustomPartitionProducer.scala

Step 7. Run the Producer

Run CustomPartitionProducer with the following command:

[restrada@localhost kafka_2.10-0.10.0.0]# scala apress.ch08.CustomPartitionProducer pageHits 100

The program takes two arguments: the topic name and the number of messages to publish.

Step 8. Run a Consumer

As you already saw, you can run the consumer program with the following command:

[restrada@localhost kafka_2.10-0.10.0.0]# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic pageHits

Producer Properties

To recapitulate the section, Table 8-2 lists the most popular producer properties.7

Table 8-2. Kafka Producer Most Important Properties

  • bootstrap.servers (type: list). The producer uses this property to get metadata about topics, partitions, and replicas. The format is host1:port1,host2:port2.

  • key.serializer (type: class). Specifies the serializer class for the message keys. The default encoder accepts and returns the same bytes.

  • value.serializer (type: class). Specifies the serializer class for the message values.

  • acks (type: string; default: 1). Controls when the producer request is considered complete and when the producer receives an acknowledgment from the broker: 0 = the producer never waits for an acknowledgment from the broker (lowest latency, but weakest durability); 1 = the producer receives an acknowledgment once the lead replica has received the data (better durability, as the client waits until the server acknowledges a successful request); –1 = the producer receives an acknowledgment once all the in-sync replicas have received the data (the best durability).

  • buffer.memory (type: long; default: 33554432). The total bytes of memory that the producer can use to buffer records waiting to be sent to the server.

  • compression.type (type: string; default: none). Specifies the compression codec for all data generated by this producer. The accepted values are none, gzip, and snappy.

  • retries (type: int; default: 0). Setting a value greater than zero causes the client to resend any record whose send fails with a potentially transient error.

Kafka Consumers

As you saw, consumers are applications that consume the messages published by the broker. Normally, consumers are real-time analysis applications, near real-time analysis applications, NoSQL solutions, data warehouses, back-end services, and subscriber-based solutions. You can write Kafka consumers in Java, Scala, C, and Python.

The consumer subscribes to message consumption on a specific topic on the Kafka broker. The consumer then makes a fetch request to the lead broker to consume the message partition by specifying the message offset. The consumer works in the pull model and always pulls all available messages after its current position.

Consumer API

In the Kafka 0.8.0 version, there were two API types for consumers: the high-level API and the low-level API. In version 0.10.0, they are unified.

To use the consumer API with Maven, you should import the following dependency:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.0.0</version>
</dependency>

The consumer API classes with SBT are imported, as follows:

// https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.10.0.0"

The consumer API classes with Gradle are imported:

// https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
compile group: 'org.apache.kafka', name: 'kafka-clients', version: '0.10.0.0'
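The examples in the rest of this section use the high-level consumer shipped with the kafka_2.10 artifact imported earlier. For reference, a minimal sketch of the unified consumer API from kafka-clients looks like the following; the localhost broker, the testGroup group, and the amazingTopic topic are assumptions taken from this chapter’s setup.

import java.util.{Collections, Properties}

import org.apache.kafka.clients.consumer.KafkaConsumer

import scala.collection.JavaConversions._

object NewApiConsumer {
  def main(args: Array[String]) {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")  // assumed local broker
    props.put("group.id", "testGroup")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("auto.offset.reset", "earliest")        // start from the beginning of the topic

    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(Collections.singletonList("amazingTopic"))

    // Each poll returns the records published since the last consumed offset
    for (i <- 0 until 10) {
      val records = consumer.poll(1000)
      for (record <- records)
        println("offset = " + record.offset() + ", value = " + record.value())
    }
    consumer.close()
  }
}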

Simple Scala Consumers

Let’s write a single-threaded Scala consumer that uses the high-level consumer API to consume messages from a topic. This SimpleConsumer class fetches messages from a topic and consumes them. We assume that there is a single partition in the topic.

Step 1. Import Classes

Import these classes:

import java.util
import java.util.Properties
import kafka.consumer.ConsumerConfig

Step 2. Define Properties

Define the following properties:

val props = new Properties()
props.put("zookeeper.connect", zookeeper)
props.put("group.id", groupId)
props.put("zookeeper.session.timeout.ms", "500")
props.put("zookeeper.sync.time.ms", "250")
props.put("auto.commit.interval.ms", "1000")
new ConsumerConfig(props)

Now let’s go over the major properties mentioned in the code:

  • zookeeper.connect. Specifies the ZooKeeper <node:port> connection used to find the ZooKeeper running instance in the cluster. ZooKeeper is used to store offsets of messages consumed for a specific topic and partition by this consumer group.

  • group.id. Specifies the consumer group name (shared by all the consumers in the group). This is the process name used by ZooKeeper to store offsets.

  • zookeeper.session.timeout.ms. Specifies the ZooKeeper session timeout in milliseconds. Represents the amount of time Kafka will wait for a ZooKeeper response to a request before giving up and continuing with consuming messages.

  • zookeeper.sync.time.ms. Specifies the ZooKeeper sync time (in milliseconds) between the leader and the followers.

  • auto.commit.interval.ms. Defines the frequency (in milliseconds) at which consumer offsets get committed.

Step 3. Code the SimpleConsumer

Write the SimpleConsumer class, as shown in Listing 8-4.

Listing 8-4. SimpleConsumer.scala
package apress.ch08

import java.util
import java.util.Properties


import kafka.consumer.ConsumerConfig
import SimpleConsumer._


import scala.collection.JavaConversions._

object SimpleConsumer {

  private def createConsumerConfig(zookeeper: String, groupId: String): ConsumerConfig = {
    val props = new Properties()
    props.put("zookeeper.connect", zookeeper)
    props.put("group.id", groupId)
    props.put("zookeeper.session.timeout.ms", "500")
    props.put("zookeeper.sync.time.ms", "250")
    props.put("auto.commit.interval.ms", "1000")
    new ConsumerConfig(props)
  }


  def main(args: Array[String]) {
    val zooKeeper = args(0)
    val groupId = args(1)
    val topic = args(2)
    val simpleHLConsumer = new SimpleConsumer(zooKeeper, groupId, topic)
    simpleHLConsumer.testConsumer()
  }
}


class SimpleConsumer(zookeeper: String, groupId: String, private val topic: String) {

  private val consumer =
    kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig(zookeeper, groupId))


  def testConsumer() {
    val topicMap = new util.HashMap[String, Integer]()
    topicMap.put(topic, 1)
    val consumerStreamsMap = consumer.createMessageStreams(topicMap)
    val streamList = consumerStreamsMap.get(topic)
    for (stream <- streamList; aStream <- stream)
      println("Message from Single Topic :: " + new String(aStream.message()))
    if (consumer != null) {
      consumer.shutdown()
    }
  }
}

Step 4. Create the Topic

Before running the program, you must create the amazingTopic topic from the command line:

[restrada@localhost kafka_2.10-0.10.0.0]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic amazingTopic

Step 5. Compile the Program

Compile the program with the following command:

[restrada@localhost kafka_2.10-0.10.0.0]# scalac -d . apress/ch08/SimpleConsumer.scala

Step 6. Run the Producer

Run the SimpleProducer with the following command:

[restrada@localhost kafka_2.10-0.10.0.0]# scala apress.ch08.SimpleProducer amazingTopic 100

Step 7. Run the Consumer

Run SimpleConsumer with the following command:

[restrada@localhost kafka_2.10-0.10.0.0]# scala apress.ch08.SimpleConsumer localhost:2181 testGroup amazingTopic

The SimpleConsumer class takes three arguments: the ZooKeeper connection string in <host:port> form, the unique group id, and the Kafka topic name.

Multithread Scala Consumers

A multithreaded consumer API design is based on the number of partitions in the topic and follows a one-to-one mapping between threads and the partitions in the topic.

If you don’t have this one-to-one relation, conflicts may occur, such as a thread that never receives a message or a thread that receives messages from multiple partitions. Let’s program MultiThreadConsumer.

Step 1. Import Classes

Import these classes:

import java.util
import java.util.Properties
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import kafka.consumer.ConsumerConfig

Step 2. Define Properties

Define the following properties:

val props = new Properties()
props.put("zookeeper.connect", zookeeper)
props.put("group.id", groupId)
props.put("zookeeper.session.timeout.ms", "500")
props.put("zookeeper.sync.time.ms", "250")
props.put("auto.commit.interval.ms", "1000")
new ConsumerConfig(props)

Step 3. Code the MultiThreadConsumer

Write the MultiThreadConsumer class, as shown in Listing 8-5.

Listing 8-5. MultiThreadConsumer.scala
package apress.ch08

import java.util
import java.util.Properties
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors


import kafka.consumer.ConsumerConfig
import MultiThreadConsumer._


import scala.collection.JavaConversions._

object MultiThreadConsumer {

  private def createConsumerConfig(zookeeper: String, groupId: String): ConsumerConfig = {
    val props = new Properties()
    props.put("zookeeper.connect", zookeeper)
    props.put("group.id", groupId)
    props.put("zookeeper.session.timeout.ms", "500")
    props.put("zookeeper.sync.time.ms", "250")
    props.put("auto.commit.interval.ms", "1000")
    new ConsumerConfig(props)
  }


  def main(args: Array[String]) {
    val zooKeeper = args(0)
    val groupId = args(1)
    val topic = args(2)
    val threadCount = java.lang.Integer.parseInt(args(3))
    val multiThreadHLConsumer = new MultiThreadConsumer(zooKeeper, groupId, topic)
    multiThreadHLConsumer.testMultiThreadConsumer(threadCount)
    try {
      Thread.sleep(10000)
    } catch {
      case ie: InterruptedException =>
    }
    multiThreadHLConsumer.shutdown()
  }
}


class MultiThreadConsumer(zookeeper: String, groupId: String, topic: String) {

  private var executor: ExecutorService = _

  private val consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig(zookeeper,
    groupId))


  def shutdown() {
    if (consumer != null) consumer.shutdown()
    if (executor != null) executor.shutdown()
  }


  def testMultiThreadConsumer(threadCount: Int) {
    val topicMap = new util.HashMap[String, Integer]()


    // Define thread count for each topic
    topicMap.put(topic, threadCount)


    // Here we have used a single topic but we can also add
    // multiple topics to topicCount MAP
    val consumerStreamsMap = consumer.createMessageStreams(topicMap)
    val streamList = consumerStreamsMap.get(topic)


    // Launching the thread pool
    executor = Executors.newFixedThreadPool(threadCount)


    // Creating an object messages consumption
    var count = 0
    for (stream <- streamList) {
      val threadNumber = count
      executor.submit(new Runnable() {


        def run() {
          val consumerIte = stream.iterator()
          while (consumerIte.hasNext)
            println("Thread Number " + threadNumber + ": " + new String(consumerIte.next().message()))
          println("Shutting down Thread Number: " + threadNumber)
        }
      })
      count += 1
    }
  }
}

Step 4. Create the Topic

Before running the program, you must create the amazingTopic topic from the command line:

[restrada@localhost kafka_2.10-0.10.0.0]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 4 --topic amazingTopic

Step 5. Compile the Program

Compile the program with the following command:

[restrada@localhost kafka_2.10-0.10.0.0]# scalac -d . apress/ch08/MultiThreadConsumer.scala

Step 6. Run the Producer

Run SimpleProducer with the following command:

[restrada@localhost kafka_2.10-0.10.0.0]# scala apress.ch08.SimpleProducer amazingTopic 100

Step 7. Run the Consumer

Run MultiThreadConsumer with the following command:

[restrada@localhost kafka_2.10-0.10.0.0]# scala apress.ch08.MultiThreadConsumer localhost:2181 testGroup amazingTopic 4

MultiThreadConsumer takes four arguments:

  • ZooKeeper connection string in <host:port> form

  • A unique group id

  • Kafka topic name

  • Thread count

This program prints all partitions of messages associated with each thread.

Consumer Properties

To recapitulate the section, Table 8-3 lists the most popular consumer properties.8

Table 8-3. Kafka Consumer Most Important Properties

Name | Default | Type | Description
bootstrap.servers | (none) | list | A list of host/port pairs used to establish the initial connection to the cluster, in the form host1:port1,host2:port2, and so forth.
fetch.min.bytes | 1 | int | The minimum amount of data the server should return for a fetch request. Setting this to something greater than 1 causes the server to wait and accumulate larger amounts of data, which improves server throughput a bit at the cost of additional latency.
group.id | "" | string | A unique string that identifies the consumer group this consumer belongs to.
heartbeat.interval.ms | 3000 | int | The expected time between heartbeats to the consumer coordinator when using Kafka's group management facilities.
key.deserializer | (none) | class | Deserializer class for keys; it must implement the Deserializer interface.
max.partition.fetch.bytes | 1048576 | int | The maximum amount of data per partition that the server will return. The maximum total memory used for a request is #partitions * max.partition.fetch.bytes.
session.timeout.ms | 30000 | int | The timeout used to detect failures when using Kafka's group management facilities. When a consumer's heartbeat is not received within the session timeout, the broker marks the consumer as failed and rebalances the group.
value.deserializer | (none) | class | Deserializer class for values; it must implement the Deserializer interface.
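
As a quick illustration, here is a minimal sketch (not taken from the examples above) of how these properties might be wired into the new consumer API, assuming the kafka-clients (0.9+) library is on the classpath and a topic named amazingTopic exists:

import java.util.{Arrays, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("group.id", "testGroup")
props.put("session.timeout.ms", "30000")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(Arrays.asList("amazingTopic"))

// Poll in a loop and print whatever arrives
while (true) {
  val records = consumer.poll(100)
  val it = records.iterator()
  while (it.hasNext) {
    val record = it.next()
    println(record.offset() + ": " + record.value())
  }
}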

Kafka Integration

Processing small amounts of data in real time is not a challenge when using the Java Message Service (JMS); but, learning from LinkedIn's experience, that kind of system shows serious performance limitations when dealing with large data volumes. Moreover, it is a nightmare to scale horizontally, because in practice it can't.

Integration with Apache Spark

In the next example, you need a Kafka cluster up and running. Also, you need Spark installed on your machine, ready to be deployed.
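
The snippets that follow assume roughly these imports (a sketch based on the 0.8 Kafka direct-stream connector for Spark Streaming; adjust the artifact versions to your installation):

import java.util.{HashMap, HashSet}

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Durations
import org.apache.spark.streaming.api.java.JavaStreamingContext
import org.apache.spark.streaming.kafka.KafkaUtils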

Apache Spark has a utility class to create the data stream to be read from Kafka. But, as with any Spark project, you first need to create SparkConf and the Spark StreamingContext.

val sparkConf = new SparkConf().setAppName("SparkKafkaTest")
val jssc = new JavaStreamingContext(sparkConf, Durations.seconds(10))

Create the hash set for the topic and Kafka consumer parameters:

val topicsSet = new HashSet[String]()
topicsSet.add("mytesttopic")


val kafkaParams = new HashMap[String, String]()
kafkaParams.put("metadata.broker.list", "localhost:9092")

You can create a direct Kafka stream with brokers and topics:

val messages = KafkaUtils.createDirectStream(
        jssc,
        classOf[String],
        classOf[String],
        classOf[StringDecoder],
        classOf[StringDecoder],
        kafkaParams,
        topicsSet)

With this stream in place, you can run your regular data-processing algorithms. To recap the steps:

  1. Create a Spark StreamingContext that sets up the entry point for all stream functionality. Then set up the stream processing batch interval at 10 seconds.

  2. Create the hash set for the topics to read from.

  3. Set the Kafka parameters using a hash map. This map must have a value for metadata.broker.list, which is the comma-separated list of broker host:port pairs.

  4. Create the input DStream using the KafkaUtils class.

Once you have the DStream ready, you can apply your algorithms to it. Explaining how to do that is beyond the scope of this book.
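
That said, a minimal sketch (not from the original text) that simply prints how many messages arrive in each 10-second batch and then starts the job could look like this:

// Print, for every 10-second batch, how many messages were read from Kafka
messages.count().print()

// Start the streaming job and block until it terminates
jssc.start()
jssc.awaitTermination()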

Spark Streaming is explained in detail in Chapter 6.

Kafka Administration

Kafka provides a number of tools for administration tasks such as cluster management, topic management, and cluster mirroring. Let's look at these tools in detail.

Cluster Tools

As you already know, each partition can be replicated across several brokers. Among the replicas, one acts as the leader and the rest act as followers; when the leader is unavailable, a follower takes over the leadership.

When a broker has to be shut down for maintenance, new leaders must be elected sequentially for all of its partitions. This implies significant I/O operations on ZooKeeper; with a big cluster, it means a noticeable delay in availability.

To keep availability high, Kafka provides a tool for shutting down brokers gracefully. This tool transfers the leadership of each partition to another in-sync replica (that is, to another broker) before stopping the broker. If no in-sync replica is available, the tool refuses to shut down the broker, to ensure data integrity.

This tool is used through the following command:

[restrada@localhost kafka_2.10.0-0.0.0.0]# bin/kafka-run-class.sh kafka.admin.ShutdownBroker --zookeeper <zookeeper_host:port/namespace> --broker <brokerID> --num.retries 3 --retry.interval.ms 100

The ZooKeeper URL and the broker id are mandatory parameters. There are other optional parameters; for example, num.retries (the default value is 0) and retry.interval.ms (the default value is 1000).

When a server is stopped gracefully, it syncs all of its logs to disk to avoid any log recovery when it is restarted, because log recovery is a time-consuming task. It also migrates the partitions it leads to other replicas before shutting down, which keeps the downtime of each partition low.

Controlled shutdown is enabled by setting the following property in the broker's server.properties file:

controlled.shutdown.enable=true

In a big cluster, Kafka tries to keep the lead replicas equally distributed among the brokers. When a broker fails or is shut down, that distribution can become unbalanced.

To restore a balanced distribution, Kafka has a tool that redistributes the lead replicas across the brokers in the cluster. This tool's syntax is as follows:

[restrada@localhost kafka_2.10.0-0.0.0.0]# bin/kafka-preferred-replica-election.sh --zookeeper <zookeeper_host:port/namespace>

This tool updates the ZooKeeper path with a list of topic partitions whose lead replica needs to be moved. If the controller finds that the preferred replica is not the leader, it sends a request to the broker to make the preferred replica the partition leader. If the preferred replica is not in the ISR list, the controller fails the operation to avoid data loss.

You can specify a JSON list for this tool in this format:

[restrada@localhost kafka_2.10.0-0.0.0.0]# bin/kafka-preferred-replica-election.sh --zookeeper <zookeeper_host:port/namespace> --path-to-json-file topicPartitionList.json

The following is the topicPartitionList.json file format:

{"partitions":
        [
                {"topic": "AmazingTopic", "partition": "0"},
                {"topic": "AmazingTopic", "partition": "1"},
                {"topic": "AmazingTopic", "partition": "2"},


                {"topic": "reAmazingTopic", "partition": "0"},
                {"topic": "reAmazingTopic", "partition": "1"},
                {"topic": "reAmazingTopic", "partition": "2"},
        ]
}

Adding Servers

When you add servers to the cluster, a unique broker id must be assigned to each new server. Kafka does not automatically assign data partitions to new servers, so a new server performs no work until partitions are migrated to it or new topics are created.
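
As an illustration, the new broker's configuration could look like the following minimal server.properties sketch (the broker id, port, and log directory shown here are hypothetical values):

broker.id=4
port=9095
log.dirs=/tmp/kafka-logs-4
zookeeper.connect=localhost:2181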

Let's discuss moving partitions between brokers. The bin/kafka-reassign-partitions.sh tool handles the reassignment. When migrating, Kafka makes the new server a follower of the migrating partition, which lets it fully replicate the existing data in that partition before taking over.

The reassign-partition tool runs in three different modes:

  • --generate. Generates a candidate reassignment plan that moves the partitions of the given topics to the given list of brokers; it does not start any movement by itself.

  • --execute. Starts moving the partitions according to the user-supplied plan in --reassignment-json-file.

  • --verify. Checks the status (successfully completed, failed, or in progress) of the partitions in the last --execute.

The partition reassignment tool can be used to move selected topics from the current brokers to new brokers. The administrator provides a list of topics and a target list of new broker ids, and the tool distributes the partitions of the given topics among the new brokers. For example:

[restrada@localhost kafka_2.10.0-0.0.0.0]# cat topics-for-new-server.json
{"topics":
        [{"topic": "amazingTopic"},
        {"topic": "reAmazingTopic"}],
        "version":1
}


[restrada@localhost kafka_2.10.0-0.0.0.0]# bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-for-new-server.json --broker-list "4,5" --generate
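
The plan printed by --generate has the same structure as any reassignment file; an illustrative (hypothetical) example might look like this:

{"version":1,
 "partitions":[{"topic":"amazingTopic","partition":0,"replicas":[4,5]},
               {"topic":"amazingTopic","partition":1,"replicas":[5,4]},
               {"topic":"reAmazingTopic","partition":0,"replicas":[4,5]}]
}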

Save the proposed plan printed by the tool into a file, for example new-topic-reassignment.json. The plan moves all partitions of the topics amazingTopic and reAmazingTopic to the new brokers with ids 4 and 5, so at the end of the move those partitions will exist only on brokers 4 and 5. To initiate the reassignment with the kafka-reassign-partitions.sh tool, use this:

[restrada@localhost kafka_2.10.0-0.0.0.0]# bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file new-topic-reassignment.json --execute

You can also use this tool to selectively move particular partitions from an existing broker to the new broker:

[restrada@localhost kafka_2.10.0-0.0.0.0]# cat partitions-reassignment.json
{"partitions":
        [{"topic": "amazingTopic",
                "partition": 1,
                "replicas": [1,2,4] }],
        }],
        "version":1
}


[restrada@localhost kafka_2.10.0-0.0.0.0]# bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file partitions-reassignment.json --execute

This command moves some replicas for certain partitions to the new server. Once the reassignment is done, the operation can be verified:

[restrada@localhost kafka_2.10.0-0.0.0.0]# bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file new-topic-reassignment.json --verify

Status of partition reassignment:
Reassignment of partition [amazingTopic,0] completed successfully
Reassignment of partition [amazingTopic,1] is in progress
Reassignment of partition [amazingTopic,2] completed successfully
Reassignment of partition [reAmazingTopic,0] completed successfully
Reassignment of partition [reAmazingTopic,1] completed successfully
Reassignment of partition [reAmazingTopic,2] is in progress

To remove a server from the Kafka cluster, you have to move the replicas of all partitions hosted on that server to the remaining brokers. You can also use the kafka-reassign-partitions.sh tool to increase a partition's replication factor, as follows:

[restrada@localhost kafka_2.10.0-0.0.0.0]# cat increase-replication-factor.json
{"partitions":
        [{"topic":"amazingTopic","partition":0,"replicas":[2,3]}],
        "version":1
}


[restrada@localhost kafka_2.10.0-0.0.0.0]# bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --execute

This command assumes that partition 0 of amazingTopic has a replication factor of 1 (the single replica is on broker 2); it increases the replication factor to 2 and creates the new replica on broker 3.

Kafka Topic Tools

When Kafka creates a topic, it uses a default number of partitions (1) and a default replication factor (1). In real life, you should specify these parameters explicitly:

[restrada@localhost kafka_2.10.0-0.0.0.0]# bin/kafka-topics.sh --create --zookeeper localhost:2181/chroot --replication-factor 3 --partitions 10 --topic amazingTopic

You can interpret this command as follows: a replication factor of 3 means that up to two servers can fail before data access is lost, and ten partitions means that the complete data set of the topic is spread over at most ten brokers, not counting replicas.

To alter an existing Kafka topic, use this command:

[restrada@localhost kafka_2.10.0-0.0.0.0]# bin/kafka-topics.sh --alter --zookeeper localhost:2181/chroot --partitions 20 --topic amazingTopic              

With this command, we are adding ten more partitions to the topic created in the previous example.

To delete a topic, use the following command (note that the brokers must have delete.topic.enable=true for the deletion to actually take effect):

[restrada@localhost kafka_2.10.0-0.0.0.0]# bin/kafka-topics.sh --delete --zookeeper localhost:2181/chroot --topic amazingTopic

Using the kafka-topics.sh utility, per-topic configuration overrides can also be added, as follows:

[restrada@localhost kafka_2.10.0-0.0.0.0]# bin/kafka-topics.sh --alter --zookeeper localhost:2181/chroot --topic amazingTopic --config <key>=<value>

To remove a configuration override from the topic, use the following command:

[restrada@localhost kafka_2.10.0-0.0.0.0]# bin/kafka-topics.sh --alter --zookeeper localhost:2181/chroot --topic amazingTopic --delete-config <key>
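
For example, a per-topic maximum message size could be set and later removed like this (a sketch using the topic-level max.message.bytes setting; the value is arbitrary):

[restrada@localhost kafka_2.10.0-0.0.0.0]# bin/kafka-topics.sh --alter --zookeeper localhost:2181/chroot --topic amazingTopic --config max.message.bytes=128000

[restrada@localhost kafka_2.10.0-0.0.0.0]# bin/kafka-topics.sh --alter --zookeeper localhost:2181/chroot --topic amazingTopic --delete-config max.message.bytes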

Kafka also provides a utility to list the topics on the server. The --list option prints only the topic names; to get information about partitions, replicas, and leaders for a topic, use the --describe option, which queries ZooKeeper for the details.

The following command obtains the list of topics:

[restrada@localhost kafka_2.10.0-0.0.0.0]# bin/kafka-topics.sh --list --zookeeper localhost:2181
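
And this command (an additional example, not part of the original listing) describes a topic in detail:

[restrada@localhost kafka_2.10.0-0.0.0.0]# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic amazingTopic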

The --describe output includes the following fields:

  • leader: The node responsible for all reads and writes on the given partition; each node is the leader for a randomly selected share of the partitions.

  • replicas: The list of nodes that replicate the log for this partition, regardless of whether they are the leader or even currently alive.

  • isr: The set of in-sync replicas; that is, the subset of the replicas that are currently alive and caught up with the leader.

Cluster Mirroring

Mirroring is used to create a replica of an existing cluster; for example, replicating an active data center into a passive data center. The mirror-maker tool mirrors a source cluster into a target cluster.

To mirror the source cluster, bring up the target cluster and start the mirror-maker processes, as follows:

[restrada@localhost kafka_2.10.0-0.0.0.0]# bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config sourceClusterConsumer.config --num.streams 2 --producer.config targetClusterProducer.config --whitelist=".*"
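
The two configuration files referenced above might contain something like the following (a minimal sketch assuming the 0.8-era consumer and producer; the hosts and group name are hypothetical):

# sourceClusterConsumer.config
zookeeper.connect=source-zk-host:2181
group.id=mirrorGroup

# targetClusterProducer.config
metadata.broker.list=target-broker-host:9092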

There are also tools to check the position of consumers, whether mirroring or consuming in general. The tool shows the position of every consumer in a consumer group and how far behind the end of the log it is, which also indicates how well cluster mirroring is keeping up. This tool is used as follows:

[restrada@localhost kafka_2.10.0-0.0.0.0]# bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group MirrorGroup --zkconnect localhost:2181 --topic kafkatopic

Summary

During this complete journey through Apache Kafka, we touched upon many important facts. You learned how to install Kafka, how to set up a Kafka cluster with single and multiple brokers on a single node, how to run command-line producers and consumers, and how to exchange messages between them. You also discovered important Kafka broker settings, the reasons why Kafka was developed, and its support for different types of clusters.

We explored Kafka's design approach and wrote a few basic producers and consumers. Finally, we discussed Kafka's integration with other technologies, such as Spark. In the next chapter, we review the enterprise integration patterns.
