Receiver-based

The first integration between Spark and Kafka is the receiver-based integration. In the receiver-based approach, the driver starts receivers on the executors, which then pull data from the Kafka brokers using Kafka's high-level consumer API. Since events are pulled from the Kafka brokers, the receivers update the consumed offsets in Zookeeper, which is also used by the Kafka cluster. The important aspect here is the use of the write ahead log (WAL), which the receiver writes to as it collects data from Kafka. If there is a failure and the executors or receivers are lost and have to restart, the WAL can be used to recover the events and process them. This log-based design thus provides both durability and consistency.
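The WAL is enabled through Spark configuration rather than through the Kafka API. The following is a minimal sketch, assuming an illustrative application name, batch interval, and HDFS checkpoint path; the spark.streaming.receiver.writeAheadLog.enable property switches the WAL on, and the checkpoint directory is where the log is stored:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Enable the receiver write ahead log so received events survive failures
val conf = new SparkConf()
  .setAppName("ReceiverBasedKafkaApp") // illustrative name
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")

val ssc = new StreamingContext(conf, Seconds(10)) // 10-second batches (assumed)
// The WAL is written under the checkpoint directory, typically on HDFS
ssc.checkpoint("hdfs:///tmp/kafka-receiver-checkpoint") // illustrative path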

Each receiver creates an input DStream of events from a Kafka topic, querying Zookeeper for Kafka topics, brokers, and offsets. Parallelism is complicated by the long-running receivers, since the workload will not be properly distributed as the application is scaled; one common workaround is sketched after this paragraph. Another problem is the dependence on HDFS, along with the duplication of write operations. There is also a reliability concern with regard to exactly-once processing semantics, since only an idempotent approach will work. A transactional approach will not work in the receiver-based approach, because there is no way to access the offset ranges from Zookeeper or the HDFS location. On the other hand, the receiver-based approach is general purpose, since it works with any messaging system.
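Because a single receiver limits ingestion parallelism, a common workaround is to create several receivers and union their DStreams. This is a hedged sketch, assuming ssc, zkQuorum, group, and topicMap are defined as in the examples that follow, and an illustrative receiver count:

import org.apache.spark.streaming.kafka.KafkaUtils

val numReceivers = 3 // illustrative; each receiver occupies one executor core
val kafkaStreams = (1 to numReceivers).map { _ =>
  KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
}
// Union the per-receiver DStreams into a single DStream for downstream processing
val unifiedStream = ssc.union(kafkaStreams)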

A receiver-based stream can be created by invoking the createStream() API:

def createStream(
  ssc: StreamingContext,     // StreamingContext object
  zkQuorum: String,          // Zookeeper quorum (hostname:port,hostname:port,..)
  groupId: String,           // Group ID for the consumer
  topics: Map[String, Int],  // Map of (topic_name -> numPartitions) to consume
  storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
                             // Storage level to use for storing the received
                             // objects (default: StorageLevel.MEMORY_AND_DISK_SER_2)
): ReceiverInputDStream[(String, String)]
                             // DStream of (Kafka message key, Kafka message value)

An example of the creation of a receiver-based stream that pulls messages from Kafka brokers is as follows:

import org.apache.spark.streaming.kafka.KafkaUtils

// Build a (topic -> number of consumer threads) map and create the stream,
// keeping only the message value from each (key, value) pair
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
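To run the stream end to end, the streaming context still has to be started; a minimal, illustrative continuation of the example above is:

lines.print() // print a sample of received message values per batch
ssc.start()
ssc.awaitTermination()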