Apache Kafka is publish-subscribe messaging rethought as a distributed, partitioned, replicated commit log service. Kafka plays an important role in any streaming application. Consider what happens without Kafka: if the application processing the streams goes down for one minute for some reason, that minute's worth of incoming data is simply lost. Adding Kafka as one more layer buffers the incoming stream data and prevents any data loss. Also, if something goes wrong within the Spark Streaming application or the target database, messages can be replayed from Kafka. Once the streaming application pulls a message from Kafka, an acknowledgement is sent to Kafka only after the data is replicated within the streaming application. This makes Kafka a reliable receiver.
There are two approaches to receiving data from Kafka: a receiver-based approach and a direct approach (without receivers).
Using the Kafka consumer API, receivers in a Spark Streaming application receive data from Kafka partitions. As with all receivers, the data received from Kafka through a receiver is stored in Spark executors, and then jobs launched by Spark Streaming process the data. Offsets consumed by the receiver are stored in Zookeeper to track progress. To ensure zero data loss, you additionally have to enable the write-ahead log (WAL) in Spark Streaming. This process is illustrated in Figure 5.5.
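The receive-replicate-acknowledge cycle described above can be illustrated with a toy simulation in plain Python (this is not Spark or Kafka code; it just shows why committing the offset only after the data is stored prevents data loss):

```python
# Toy simulation of the receiver-based offset flow: a message is
# stored (i.e., replicated in the streaming application) first, and
# only then is the offset advanced (in real Spark, committed to
# Zookeeper). If a failure occurs before the commit, the message is
# re-delivered on restart instead of being lost.

def consume(messages, committed_offset, store):
    """Process messages starting after committed_offset."""
    for offset in range(committed_offset, len(messages)):
        store.append(messages[offset])   # store/replicate the data
        committed_offset = offset + 1    # acknowledge: advance the offset
    return committed_offset

messages = ["m1", "m2", "m3"]
store = []
committed = consume(messages, 0, store)
print(committed, store)  # 3 ['m1', 'm2', 'm3']
```

If the application restarts, it resumes from the committed offset, so no message between the last commit and the crash is skipped.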
Let's go through a receiver-based Kafka word count example that is shipped with the Spark installation. The easiest way to work with Kafka is to install and start a Kafka broker. Run the following commands to download, unzip, and start the Kafka broker:
wget http://www.trieuvan.com/apache/kafka/0.8.2.0/kafka_2.10-0.8.2.0.tgz
tar xzvf kafka_2.10-0.8.2.0.tgz
cd kafka_2.10-0.8.2.0/
bin/kafka-server-start.sh config/server.properties
Open another terminal and create a topic called test with two partitions and a replication factor of 1:
bin/kafka-topics.sh --zookeeper quickstart.cloudera:2181 --topic test --create --partitions 2 --replication-factor 1
Start a Kafka console producer to start sending messages to the topic test:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
Start another terminal and submit the Kafka wordcount job:
cd /usr/lib/spark/examples/lib/streaming
spark-submit --master local[*] --packages org.apache.spark:spark-streaming-kafka_2.10:1.5.0 kafka_wordcount.py localhost:2181 test
Now, let's start entering some messages in the Kafka console producer window; those should show up as word counts in the output of the Spark Streaming program.
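The transformation that kafka_wordcount.py performs on each batch can be shown on plain Python data instead of a DStream: split each message into words, emit (word, 1) pairs, and reduce by key. A minimal sketch:

```python
# Per-batch word count logic, illustrated without Spark:
# each Kafka message value is split into words, mapped to
# (word, 1) pairs, and the counts are reduced by key.
from collections import Counter

def batch_word_counts(messages):
    pairs = [(word, 1) for msg in messages for word in msg.split()]
    counts = Counter()
    for word, one in pairs:
        counts[word] += one
    return dict(counts)

print(batch_word_counts(["hello kafka", "hello spark streaming"]))
# {'hello': 2, 'kafka': 1, 'spark': 1, 'streaming': 1}
```

In the real program, the same map and reduceByKey steps run in parallel across the executors for every micro-batch.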
Kafka assigns an offset number to every message published to a topic. We can see the latest offset numbers with the following commands:
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test --time -1 --partitions 0
test:0:18
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test --time -1 --partitions 1
test:1:16
In the receiver-based approach, the consumer, which is a Spark Streaming program, maintains offsets in Zookeeper. We can watch the offsets by entering the zkCli shell. Use the following commands to check the offsets of the spark-streaming-consumer group:
/usr/lib/zookeeper/bin/zkCli.sh -server localhost:2181 get /consumers/spark-streaming-consumer/offsets/test/0
/usr/lib/zookeeper/bin/zkCli.sh -server localhost:2181 get /consumers/spark-streaming-consumer/offsets/test/1
The output of the previous commands will be similar to the output shown in the following. Note that 18 is the latest offset of the Spark Streaming consumer for partition 0. This offset number is exactly the same as the offset in Kafka (see the previous result), which means that all the messages from Kafka have been consumed:
WatchedEvent state:SyncConnected type:None path:null
18
cZxid = 0x44f
ctime = Sun Dec 20 15:47:14 PST 2015
mZxid = 0x4bd
mtime = Sun Dec 20 15:58:07 PST 2015
pZxid = 0x44f
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 2
numChildren = 0
Let's check the lag between the offsets of Kafka and the Spark Streaming consumer using the following command:
bin/kafka-consumer-offset-checker.sh --zookeeper localhost:2181 --topic test --group spark-streaming-consumer
Group                     Topic  Pid  Offset  logSize  Lag
spark-streaming-consumer  test   0    18      18       0
spark-streaming-consumer  test   1    16      16       0
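The Lag column is simply the broker's latest offset (logSize) minus the consumer's committed offset, per partition. A one-line sketch, using the partition values from the offset-checker output:

```python
# Consumer lag per partition: how far the consumer is behind the
# broker's log end. Zero means the consumer is fully caught up.
def consumer_lag(log_size, committed_offset):
    return log_size - committed_offset

print(consumer_lag(18, 18))  # partition 0 -> 0
print(consumer_lag(16, 16))  # partition 1 -> 0
```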
The direct approach (without receivers) was introduced in Spark 1.3 to ensure exactly-once semantics for receiving data, even in the case of failures. Instead of using receivers, this approach periodically queries Kafka for the latest offsets in each topic and partition, and accordingly defines the offset ranges to process in each batch, as shown in Figure 5.6.
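The offset-range computation can be sketched in plain Python (a toy illustration, not Spark code; real Spark represents these ranges as OffsetRange objects, one per Kafka partition):

```python
# Toy sketch of how the direct approach defines per-batch offset
# ranges: for each partition, the batch covers everything from the
# offset where the previous batch ended up to the latest offset
# that Kafka reports for that partition.
def offset_ranges(last_processed, latest):
    """Both arguments map partition -> offset."""
    return {p: (last_processed.get(p, 0), latest[p]) for p in latest}

latest = {0: 18, 1: 16}          # from querying Kafka for latest offsets
last_processed = {0: 12, 1: 16}  # where the previous batch ended
print(offset_ranges(last_processed, latest))
# {0: (12, 18), 1: (16, 16)}
```

Because each batch's ranges are recorded deterministically, a failed batch can be recomputed by re-reading exactly the same offsets from Kafka.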
This approach provides the following benefits:

Simplified parallelism: There is a one-to-one mapping between Kafka partitions and RDD partitions, so there is no need to create and union multiple input streams.

Efficiency: Achieving zero data loss does not require a write-ahead log, because messages can be recovered directly from Kafka.

Exactly-once semantics: Offsets are tracked by Spark Streaming within its checkpoints rather than in Zookeeper, which eliminates inconsistencies between the two systems.
Let's run a Kafka direct word count now. Use the same procedure as before to enter messages in the Kafka console producer, and then start a Spark Streaming program with the following command. Note that this program takes the Kafka broker address as an argument, whereas the earlier program took the Zookeeper address; offsets are no longer maintained in Zookeeper:
cd /usr/lib/spark/examples/lib/streaming
spark-submit --master local[*] --packages org.apache.spark:spark-streaming-kafka_2.10:1.5.0 direct_kafka_wordcount.py localhost:9092 test
The number of records received per second is controlled by the parameters spark.streaming.receiver.maxRate (for the receiver-based approach) and spark.streaming.kafka.maxRatePerPartition (for the direct approach).
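For the direct approach, the per-batch cap follows from simple arithmetic: the maximum rate per partition, times the number of partitions, times the batch interval. A sketch with assumed illustrative values:

```python
# Upper bound on records per micro-batch under
# spark.streaming.kafka.maxRatePerPartition (values below are
# hypothetical, chosen only to illustrate the arithmetic).
def max_records_per_batch(max_rate_per_partition, partitions, batch_interval_sec):
    return max_rate_per_partition * partitions * batch_interval_sec

# e.g. 1000 records/sec/partition, 2 partitions, 5-second batches:
print(max_records_per_batch(1000, 2, 5))  # 10000
```

This cap is useful for preventing a newly started job from trying to ingest a huge Kafka backlog in a single batch.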
Integration with HBase is fairly easy. Use newAPIHadoopRDD for reading HBase data and saveAsNewAPIHadoopDataset for writing data to HBase. Let's go through an HBase write example by creating a table called test with the column family f1. Then, run a Spark job to write data to the test table with the column col1 and the value value1:
[cloudera@quickstart lib]$ hbase shell
hbase(main):002:0> create 'test', 'f1'
0 row(s) in 0.6460 seconds

cd /usr/lib/spark/examples/lib/
spark-submit --master local[*] --driver-class-path /usr/lib/spark/lib/spark-examples.jar hbase_outputformat.py localhost test row1 f1 col1 value1

hbase(main):005:0> scan 'test'
ROW    COLUMN+CELL
 row1  column=f1:col1, timestamp=1450657755249, value=value1
1 row(s) in 0.0700 seconds
Similar logic from the previous example can be applied in Spark Streaming as well. Transform the RDD into (key, value) pairs of the form (rowkey, [row key, column family, column name, value]), and then write the DStream to HBase. Alternatively, you can implement foreachRDD to write the data out.
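Building those pairs is ordinary record reshaping, shown here on plain Python data (a minimal sketch; in the real job each pair would feed saveAsNewAPIHadoopDataset rather than a list):

```python
# Convert flat records into the (rowkey, [row key, column family,
# column name, value]) pairs described above, the shape that
# hbase_outputformat.py writes out.
def to_hbase_pairs(records):
    """records: iterable of (rowkey, column_family, column, value)."""
    return [(rowkey, [rowkey, cf, col, value])
            for rowkey, cf, col, value in records]

pairs = to_hbase_pairs([("row1", "f1", "col1", "value1")])
print(pairs)  # [('row1', ['row1', 'f1', 'col1', 'value1'])]
```

In a streaming job, you would apply the same transformation inside a DStream map (or within foreachRDD) before the HBase write.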
Note that the HBase native API provides put, get, scan, filter, and coprocessor methods, while Hadoop provides InputFormat and OutputFormat to read and write data. Reading data from HBase through the Hadoop API is slower than using the HBase native API. Using Spark SQL on HBase or the Spark connector for HBase (introduced in Chapter 4, Big Data Analytics with Spark SQL, DataFrames, and Datasets) combines the performance of the native HBase APIs with Spark's in-memory processing, so it is the recommended approach for optimized performance.