Spark Streaming processes a continuous stream of data by dividing it into micro-batches. The resulting sequence of micro-batches is called a Discretized Stream, or DStream, which is also the name of the API that Spark Streaming provides to create and process them. A DStream is simply a sequence of RDDs, each processed on Spark's core execution engine like any other RDD. A DStream can be created from streaming sources such as Flume or Kafka.
As shown in Figure 5.1, input data from streaming sources is received by the Spark Streaming application to create DStreams, with batch intervals that can be as short as sub-second, which are then processed by the Spark core engine. The output of each batch is then sent to various output sinks. The input data is received by receivers and distributed across the cluster to form the micro-batch. Once the time interval completes, the micro-batch is processed through parallel operations such as join, transform, window operations, or output operations.
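The way a time interval carves a stream into micro-batches can be illustrated with a small plain-Python sketch. This is not Spark code: `assign_to_batches` and the sample events are hypothetical, and the timestamps are assumed to be arrival times in seconds. Unlike this sketch, which skips intervals with no data, Spark Streaming still generates a batch for every interval.

```python
from collections import defaultdict


def assign_to_batches(events, batch_interval):
    """Group (arrival_time, record) pairs into micro-batches.

    A record belongs to batch number floor(arrival_time / batch_interval).
    Returns the non-empty batches in time order.
    """
    batches = defaultdict(list)
    for arrival_time, record in events:
        batch_index = int(arrival_time // batch_interval)
        batches[batch_index].append(record)
    return [batches[index] for index in sorted(batches)]


# Hypothetical records arriving at 0.1 s, 0.7 s, 1.2 s, and 2.9 s
events = [(0.1, "a"), (0.7, "b"), (1.2, "c"), (2.9, "d")]
print(assign_to_batches(events, batch_interval=1.0))
# [['a', 'b'], ['c'], ['d']]
```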
From a deployment and execution perspective, Spark Streaming is just like a regular Spark job, except that receivers run as long-running tasks on the executors to receive streaming data from input sources. Spark Streaming receivers chunk data into blocks before storing it in the cluster. A new block is generated every spark.streaming.blockInterval milliseconds, which is set to 200 milliseconds by default, and each block becomes a partition of the RDD that Spark will work with.
Typically, one RDD is created per batch from the data of all receivers, using the getOrCompute method of DStream. The number of partitions in the RDD created by DStream is determined by dividing batchInterval by spark.streaming.blockInterval and then multiplying the result by the number of receivers. For example, if the batch interval is 1 second, the block interval is 200 milliseconds (the default), and there is a single receiver, your RDD will contain 5 partitions.
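The partition arithmetic can be checked with a few lines of plain Python. This is only a sketch of the formula above, not a Spark API: `partitions_per_batch` is a hypothetical helper, and it assumes every receiver has data to store in every block interval.

```python
def partitions_per_batch(batch_interval_ms, block_interval_ms=200, num_receivers=1):
    """Estimate the number of partitions in each batch RDD.

    Formula from the text: (batchInterval / blockInterval) * receivers.
    The 200 ms default mirrors spark.streaming.blockInterval.
    """
    if batch_interval_ms <= 0 or block_interval_ms <= 0:
        raise ValueError("intervals must be positive")
    blocks_per_receiver = batch_interval_ms // block_interval_ms
    return blocks_per_receiver * num_receivers


print(partitions_per_batch(1000))                   # 5
print(partitions_per_batch(2000))                   # 10
print(partitions_per_batch(1000, num_receivers=2))  # 10
```

A 1-second batch with the default block interval and one receiver therefore yields 5 partitions; doubling either the batch interval or the number of receivers doubles that figure.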
Dividing the input stream into micro-batches offers several advantages. For a discussion of the benefits of DStream processing, visit https://databricks.com/blog/2015/07/30/diving-into-spark-streamings-execution-model.html.
Let's understand the Spark Streaming application flow with a simple network word count example that ships with the Spark installation and can be found at /spark/examples/lib/streaming/network_wordcount.py:
from __future__ import print_function

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: network_wordcount.py <hostname> <port>", file=sys.stderr)
        sys.exit(-1)

    sc = SparkContext(appName="PythonStreamingNetworkWordCount")
    # Streaming context with a batch interval of 1 second
    ssc = StreamingContext(sc, 1)

    # DStream of text lines read from the TCP socket
    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
    # Split lines into words, pair each word with 1, and sum per word
    counts = (lines.flatMap(lambda line: line.split(" "))
                   .map(lambda word: (word, 1))
                   .reduceByKey(lambda a, b: a + b))
    counts.pprint()

    # Start the computation and block until it is stopped
    ssc.start()
    ssc.awaitTermination()
The previous program creates a StreamingContext called ssc with a batch interval of 1 second. The master is not set in the code; when the program is submitted with --master local[*], it runs in local mode with as many execution threads as there are available cores. Using the ssc context, you can create a DStream called lines from a TCP source. Regular word count operations such as flatMap, map, and reduceByKey are applied to create a new DStream, and the results are printed on screen. The ssc context then starts and waits for termination. The procedure to execute this program is shown in the next section.
A stateless transformation applies a function to every batch of the DStream and produces the output. It does not depend on previous batches to create new batches. A stateful transformation creates a state for the DStream, which is updated with incoming batches. There are two types of stateful transformations: window operations, which act on sliding windows, and updateStateByKey, which is used to track the state across all events. To understand stateless and stateful transformations, let's go through the examples provided along with the Spark installation.
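To make the idea of a sliding window concrete, here is a plain-Python simulation rather than Spark code. The helper `windowed_counts` and the sample batches are hypothetical, and the window length and slide interval are expressed in numbers of batches instead of durations. In PySpark, the corresponding DStream operation would be something like `reduceByKeyAndWindow`.

```python
from collections import Counter


def windowed_counts(batches, window_length, slide_interval):
    """Count words over a sliding window of micro-batches.

    Every `slide_interval` batches, the words in the most recent
    `window_length` batches are counted together.
    Returns a list of (number_of_batches_seen, counts) pairs.
    """
    results = []
    for end in range(slide_interval, len(batches) + 1, slide_interval):
        start = max(0, end - window_length)
        window = Counter()
        for batch in batches[start:end]:
            window.update(batch)
        results.append((end, dict(window)))
    return results


# Four hypothetical micro-batches of words
batches = [["hadoop", "spark"], ["spark"], ["hadoop"], ["spark", "spark"]]
for end, counts in windowed_counts(batches, window_length=3, slide_interval=1):
    print(end, counts)
```

Once four batches have arrived, the first batch has slid out of the three-batch window, so its words no longer contribute to the counts.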
Let's start two terminals with the following commands:
Terminal 1: Run the netcat command against port number 9999 as shown here:
[cloudera@quickstart ~]$ nc -lk 9999
Terminal 2: Extract the Python examples and submit network_wordcount.py shown in the previous section:
[cloudera@quickstart ~]$ cd /usr/lib/spark/examples/lib/streaming/
[cloudera@quickstart ~]$ sudo tar xvzf python.tar.gz
[cloudera@quickstart lib]$ spark-submit --master local[*] network_wordcount.py localhost 9999
Enter some data in the netcat terminal as shown in Figure 5.2. You can see that the words hadoop and spark are processed in the first batch to give a count of 3, and then the next batch is processed to give a count of 3 again. Notice that the counts are displayed every second because our streaming context is created with a batch interval of 1 second. You can increase this interval to see the difference.
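The per-batch behavior can be mimicked without a cluster. The following plain-Python sketch is a simulation, not Spark code: `count_batch` is a hypothetical helper that mirrors the flatMap, map, and reduceByKey steps on one micro-batch, and the input lines are assumed sample data in which each word appears three times.

```python
from collections import Counter


def count_batch(lines):
    """Word count for a single micro-batch, with no memory of earlier batches."""
    # flatMap: split every line into words
    words = [word for line in lines for word in line.split(" ")]
    # map: pair each word with a count of 1
    pairs = [(word, 1) for word in words]
    # reduceByKey: sum the counts per word
    counts = Counter()
    for word, one in pairs:
        counts[word] += one
    return dict(counts)


# Two hypothetical batches typed into the netcat terminal
batch1 = ["hadoop spark hadoop spark hadoop spark"]
batch2 = ["hadoop spark hadoop spark hadoop spark"]
for batch in (batch1, batch2):
    print(count_batch(batch))
# {'hadoop': 3, 'spark': 3}
# {'hadoop': 3, 'spark': 3}
```

Because the function receives only the current batch, the second result starts again from zero, which is the defining property of a stateless transformation.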
Now let's take a look at a stateful transformation example:
[cloudera@quickstart ~]$ nc -lk 9999
[cloudera@quickstart streaming]$ spark-submit --master local[*] stateful_network_wordcount.py localhost 9999
The stateful_network_wordcount.py program creates a DStream called running_counts, which is updated by the updateStateByKey() function with the incoming batches. Note that the counts for the second set of hadoop and spark words are added to the counts from the first set, resulting in 6 as shown in Figure 5.3.
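The running total can also be simulated in plain Python. This is a sketch under stated assumptions rather than Spark code: `update_func` is modeled on the kind of update function passed to updateStateByKey (new values for a key plus its previous state), while `apply_update_state_by_key` and the sample batches are hypothetical stand-ins for what Spark does on each batch.

```python
def update_func(new_values, last_sum):
    """Add this batch's values for a key to its previous running total."""
    return sum(new_values) + (last_sum or 0)


def apply_update_state_by_key(state, batch_pairs, update):
    """Simulate one batch of updateStateByKey over (key, value) pairs."""
    grouped = {}
    for key, value in batch_pairs:
        grouped.setdefault(key, []).append(value)
    new_state = {}
    # Keys already in the state are updated even if the batch has no data for them
    for key in sorted(set(state) | set(grouped)):
        result = update(grouped.get(key, []), state.get(key))
        if result is not None:  # returning None drops the key from the state
            new_state[key] = result
    return new_state


# Two hypothetical batches, each containing hadoop and spark three times
batch = [("hadoop", 1), ("spark", 1)] * 3
state = {}
for pairs in (batch, batch):
    state = apply_update_state_by_key(state, pairs, update_func)
    print(state)
# {'hadoop': 3, 'spark': 3}
# {'hadoop': 6, 'spark': 6}
```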
Fault tolerance for stateless transformations is achieved by replicating the DStreams in memory, but this does not guarantee full recovery after machine failures. The stateful transformation example writes its data to a checkpoint directory on HDFS for better recovery during failures. You can check the checkpoint directory with the following command. Checkpointing is explained in more detail in the Advanced concepts of Spark Streaming section:
[cloudera@quickstart streaming]$ hadoop fs -ls checkpoint
Also, check the number of jobs, tasks, and their DAG visualization on the UI at http://localhost:4040/jobs. A streaming application adds a Streaming tab to the UI, which provides complete information about the streaming application. This is explained in detail in the Monitoring applications section at the end of this chapter.