Architecture of Spark Streaming

Spark Streaming processes a continuous stream of data by dividing it into micro-batches; the resulting sequence of micro-batches is called a Discretized Stream, or DStream. The DStream is the API provided by Spark Streaming for creating and processing micro-batches. A DStream is nothing but a sequence of RDDs, which are processed on Spark's core execution engine like any other RDD. A DStream can be created from any streaming source, such as Flume or Kafka.
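
For example, a DStream can be created from Kafka with the receiver-based KafkaUtils.createStream API available in Spark 1.x. The following is a minimal sketch, assuming a Kafka topic named events, a ZooKeeper quorum at localhost:2181, and a consumer group spark-streaming (all hypothetical names); the spark-streaming-kafka assembly jar must also be supplied when submitting the job:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName="KafkaDStreamSketch")
ssc = StreamingContext(sc, 1)  # 1-second batch interval

# Receiver-based Kafka DStream; each record is a (key, message) pair.
# The ZooKeeper quorum, consumer group, and topic map below are placeholders.
kafka_stream = KafkaUtils.createStream(ssc, "localhost:2181",
                                       "spark-streaming", {"events": 1})
kafka_stream.map(lambda kv: kv[1]).pprint()

ssc.start()
ssc.awaitTermination()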

As shown in the following Figure 5.1, input data from streaming sources is received by the Spark Streaming application, which divides it into the micro-batches of a DStream; these micro-batches are then processed by the Spark core engine, and the output of each batch is sent to various output sinks. The input data is received by receivers and distributed across the cluster to form the micro-batch. Once the batch interval completes, the micro-batch is processed through parallel operations such as join, transform, window, or output operations.

From a deployment and execution perspective, Spark Streaming is just like a regular Spark job, but the executors additionally run receivers to receive streaming data from the input sources. Spark Streaming receivers chunk the data into blocks before storing them in the cluster. A new block is generated every spark.streaming.blockInterval milliseconds (200 milliseconds by default), and each block is turned into a partition of the RDD that Spark will work with.

Typically, one RDD is created per batch from the blocks stored by all receivers, using the getOrCompute method of DStream. The number of partitions in the RDD created by the DStream is determined by dividing batchInterval by spark.streaming.blockInterval and then multiplying it by the number of receivers. For example, if the batch interval is 1 second, the block interval is 200 milliseconds (the default), and there is a single receiver, your RDD will contain 1,000 / 200 = 5 partitions.
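
The following sketch shows how to tune this: it lowers the block interval to 100 milliseconds so that, with a 1-second batch interval and a single receiver, each batch RDD has 1,000 / 100 = 10 partitions. The application name is only illustrative:

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext

# Lower the block interval from the default 200ms to 100ms.
conf = (SparkConf()
        .setAppName("BlockIntervalExample")
        .set("spark.streaming.blockInterval", "100ms"))
sc = SparkContext(conf=conf)

# With a 1-second batch interval and a single receiver, each batch RDD
# has batchInterval / blockInterval = 1000 / 100 = 10 partitions.
ssc = StreamingContext(sc, 1)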


Figure 5.1: Spark Streaming architecture

Dividing the input stream into micro-batches provides multiple advantages. Let's understand the benefits of DStream processing:

  • Dynamic load balancing: Traditional record-at-a-time stream processing frameworks tend to partition streams unevenly across nodes, whereas Spark Streaming schedules tasks dynamically based on the availability of resources, balancing the load across the cluster.
  • Fast failure recovery: If any node fails, the tasks handled by the node will fail. Failed tasks are re-launched on other nodes to provide quick fault recovery.
  • Unification of batch and streaming: Batch and Streaming workloads can be combined in the same program instead of separate processing.
  • Machine learning and SQL: Machine learning models can be applied in real time on DStreams. Also, DataFrames can be created from the RDDs generated by DStreams, and DataFrame operations can be applied with SQL or the DSL (see the sketch after this list).
  • Performance: Spark Streaming provides higher throughput than record-at-a-time streaming frameworks, but micro-batching means the end-to-end latency is typically a few hundred milliseconds or more.
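
The following sketch illustrates the SQL point from the preceding list. Inside foreachRDD, each batch RDD produced by a DStream is converted into a DataFrame and queried with SQL; it assumes a Spark 1.x SQLContext and a hypothetical words DStream of individual words:

from pyspark.sql import SQLContext, Row

def process_rdd(time, rdd):
    # Skip empty batches.
    if rdd.isEmpty():
        return
    sql_context = SQLContext(rdd.context)
    # Convert the batch RDD of words into a DataFrame and query it with SQL.
    words_df = sql_context.createDataFrame(rdd.map(lambda w: Row(word=w)))
    words_df.registerTempTable("words")
    sql_context.sql(
        "SELECT word, COUNT(*) AS total FROM words GROUP BY word").show()

# 'words' is assumed to be a DStream of individual words, for example
# lines.flatMap(lambda line: line.split(" ")).
words.foreachRDD(process_rdd)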

For reference, visit https://databricks.com/blog/2015/07/30/diving-into-spark-streamings-execution-model.html.

Spark Streaming application flow

Let's understand the Spark Streaming application flow with the simple network word count example provided with the Spark installation, which can be found at /spark/examples/lib/streaming/network_wordcount.py:

Note

All programs in this chapter are executed on CDH 5.8 VM. For other environments, the file paths might change, but the concepts are the same in any environment.

from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: network_wordcount.py <hostname> <port>", file=sys.stderr)
        exit(-1)
    sc = SparkContext(appName="PythonStreamingNetworkWordCount")
    ssc = StreamingContext(sc, 1)

    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
    counts = lines.flatMap(lambda line: line.split(" "))\
                  .map(lambda word: (word, 1))\
                  .reduceByKey(lambda a, b: a+b)
    counts.pprint()
    ssc.start()
    ssc.awaitTermination()

The previous program creates a StreamingContext called ssc with a batch interval of 1 second; it runs in local mode with the available number of execution threads because of how it is submitted, as shown later. Using the ssc context, a DStream called lines is created from a TCP socket source. The regular word count operations flatMap, map, and reduceByKey are applied to create a new DStream, and the results are printed on screen with pprint. The ssc context is then started and waits for termination. The procedure to execute this program is shown in the next section.

Stateless and stateful stream processing

A stateless transformation applies a function to every batch of the DStream and produces an output; it does not depend on previous batches to create a new batch. A stateful transformation maintains state for the DStream, and that state is updated with every incoming batch. There are two types of stateful transformations: window operations, which act on sliding windows of batches, and updateStateByKey, which is used to track state across all events for each key. To understand stateless and stateful transformations, let's go through the examples provided along with the Spark installation.
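
As a brief sketch of a window operation, the snippet below counts words over a sliding window covering the last 30 seconds, recomputed every 10 seconds. The pairs DStream is hypothetical (for example, (word, 1) tuples built as in the word count program), and a checkpoint directory is required because the inverse-reduce variant keeps intermediate state:

# 'pairs' is assumed to be a DStream of (word, 1) tuples, for example
# lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1))
ssc.checkpoint("checkpoint")  # required for the inverse-reduce variant

windowed_counts = pairs.reduceByKeyAndWindow(
    lambda a, b: a + b,   # add counts of records entering the window
    lambda a, b: a - b,   # subtract counts of records leaving the window
    30,                   # window duration: the last 30 seconds
    10)                   # slide duration: recompute every 10 seconds
windowed_counts.pprint()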

Let's start two terminals with the following commands:

  • Terminal 1: Execute the Linux netcat command against port number 9999 as shown here:
    [cloudera@quickstart ~]$ nc -lk 9999
    
  • Terminal 2: Submit network_wordcount.py shown in the previous section:
    [cloudera@quickstart ~]$ cd /usr/lib/spark/examples/lib/streaming/
    [cloudera@quickstart streaming]$ sudo tar xvzf python.tar.gz
    [cloudera@quickstart streaming]$ spark-submit --master local[*] network_wordcount.py localhost 9999
    

Enter some data in the netcat terminal, as shown in the following Figure 5.2. You can see that the words hadoop and spark are processed in the first batch to give a count of 3, and the next batch is processed to give a count of 3 again. Notice that the counts are displayed every second because our streaming context is created with a batch interval of 1 second. You can increase this interval to see the difference:


Figure 5.2: Stateless transformation example

Now let's take a look at a stateful transformation example:

  • Terminal 1:
    [cloudera@quickstart ~]$ nc -lk 9999
    
  • Terminal 2:
    [cloudera@quickstart streaming]$ spark-submit --master local[*] stateful_network_wordcount.py localhost 9999
    

    Figure 5.3: Stateful transformation example

The stateful_network_wordcount.py program creates a DStream called running_counts, whose state is updated with every incoming batch by the updateStateByKey() function. Note that the second set of hadoop and spark words is added to the count from the first set, resulting in 6, as shown in Figure 5.3.
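
The following is a condensed sketch of the core of that program, based on the stateful_network_wordcount.py example bundled with Spark; lines is assumed to be the socket DStream created as in the previous example:

# Checkpointing is mandatory for stateful transformations such as updateStateByKey.
ssc.checkpoint("checkpoint")

def update_func(new_values, last_sum):
    # Add the counts from the current batch to the running total, if any.
    return sum(new_values) + (last_sum or 0)

running_counts = lines.flatMap(lambda line: line.split(" "))\
                      .map(lambda word: (word, 1))\
                      .updateStateByKey(update_func)
running_counts.pprint()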

Fault tolerance in stateless transformations is achieved by replicating the received data in memory, but this is not fully recoverable in the case of machine failures. The stateful transformation example also writes its data to a checkpoint directory on HDFS for better recovery from failures. You can inspect the checkpoint directory with the following command; checkpointing is explained in more detail in the Advanced concepts of Spark Streaming section:

[cloudera@quickstart streaming]$ hadoop fs -ls checkpoint

Also, check the number of jobs, the tasks, and their DAG visualization on the UI at http://localhost:4040/jobs. A streaming application adds a Streaming tab to the UI, which provides complete information about the application. This is explained in detail in the Monitoring applications section at the end of this chapter.
