A quick primer on global aggregations

As noted in the previous section, so far our script has performed a point-in-time streaming word count. The following diagram shows the lines DStream and its micro-batches as the script executed in the previous section:

(Figure: the lines DStream and its 1-second micro-batches)

At the 1 second mark, our Python Spark Streaming script returned the value of {(blue, 5), (green, 3)}; at the 2 second mark, it returned {(gohawks, 1)}; and at the 4 second mark, it returned {(green, 2)}. But what if you wanted the aggregate word count over a specific time window?

The following figure shows the calculation of a stateful aggregation:

(Figure: stateful aggregation of the word counts across micro-batches)

In this case, we have a time window between 0 and 5 seconds. Note that our script does not actually specify a time window; instead, each second we calculate the cumulative sum of the words. Therefore, at the 2 second mark, the output includes not just the green and blue from the 1 second mark, but also the gohawks from the 2 second mark: {(blue, 5), (green, 3), (gohawks, 1)}. At the 4 second mark, the additional 2 greens give us a total of {(blue, 5), (green, 5), (gohawks, 1)}.
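Conceptually, this stateful aggregation is just a running sum of the per-batch counts. The following is a minimal plain Python sketch (not Spark code, and only illustrative) of that idea, using the batch values from the preceding figure:

from collections import Counter

# Per-batch word counts at the 1, 2, 3, and 4 second marks (from the figure)
batches = [{"blue": 5, "green": 3}, {"gohawks": 1}, {}, {"green": 2}]

running = Counter()
for mark, batch in enumerate(batches, start=1):
    running.update(batch)  # fold this batch's counts into the running state
    print(mark, dict(running))

# 1 {'blue': 5, 'green': 3}
# 2 {'blue': 5, 'green': 3, 'gohawks': 1}
# 3 {'blue': 5, 'green': 3, 'gohawks': 1}
# 4 {'blue': 5, 'green': 5, 'gohawks': 1}

Spark Streaming does the same thing at scale: it keeps per-key state between micro-batches and folds each new batch into that state.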

For those of you who regularly work with relational databases, this may look like just a GROUP BY with a SUM(). But in the case of streaming analytics, persisting the data long enough to run a GROUP BY, SUM() query takes longer than the batch interval (for example, 1 second), which means we would constantly fall behind and be trying to catch up with the data stream.
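For comparison, the batch-style equivalent would be a query along the lines of the following sketch (the word_counts table name is hypothetical and an existing sqlContext is assumed); the point is that such a query presumes all of the data has already been persisted somewhere it can be scanned:

# Hypothetical batch query: assumes the per-word counts have already
# been persisted to a word_counts table
word_totals = sqlContext.sql(
    "SELECT word, SUM(count) AS total FROM word_counts GROUP BY word")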

For example, if you were to run the 1. Streaming and DataFrames.scala Databricks notebook at https://github.com/dennyglee/databricks/blob/master/notebooks/Users/denny%40databricks.com/content/Streaming%20Meetup%20RSVPs/1.%20Streaming%20and%20DataFrames.scala, and you were to view the Streaming jobs in the Spark UI, you would get something like the following figure:

(Figure: Streaming tab of the Spark UI showing the Scheduling Delay and Total Delay for the notebook's streaming job)

Notice in the graph that the Scheduling Delay and Total Delay numbers are rapidly increasing (for example, the average Total Delay is 54 seconds 254 ms and the actual Total Delay is > 2 min), well outside the batch interval threshold of 1 second. The reason we see this delay is that, inside the streaming code for that notebook, we had also run the following code:

// Populate `meetup_stream` table
sqlContext.sql("insert into meetup_stream select * from meetup_stream_json")

That is, for each new chunk of data (that is, each 1-second RDD micro-batch), we convert it into a DataFrame (the meetup_stream_json table) and insert the data into a persistent table (the meetup_stream table). Persisting the data in this fashion leads to slow streaming performance and ever-increasing scheduling delays. To solve this problem within streaming analytics, this is where creating global aggregations via updateStateByKey (Spark 1.5 and earlier) or mapWithState (Spark 1.6 onwards) comes in.
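In PySpark terms, that per-batch persistence pattern looks roughly like the following sketch (assuming an existing sqlContext, a DStream of Row objects named stream, and the same table name as the notebook; this is illustrative, not the notebook's actual code):

# Illustrative only: persist each 1-second micro-batch to a table.
# The write can easily take longer than the 1-second batch interval,
# which is what produces the ever-growing scheduling delay.
def persist_batch(time, rdd):
    if not rdd.isEmpty():
        df = sqlContext.createDataFrame(rdd)
        df.write.mode("append").saveAsTable("meetup_stream")

stream.foreachRDD(persist_batch)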

Tip

For more information on Spark Streaming visualizations, please take the time to review New Visualizations for Understanding Apache Spark Streaming Applications: https://databricks.com/blog/2015/07/08/new-visualizations-for-understanding-apache-spark-streaming-applications.html.

Knowing this, let's re-write the original streaming_word_count.py so that we now have a stateful version called stateful_streaming_word_count.py; you can get the full version of this script at https://github.com/drabastomek/learningPySpark/blob/master/Chapter10/stateful_streaming_word_count.py.

The initial set of commands for our script are noted here:

 1. # Create a local SparkContext and Streaming Contexts
 2. from pyspark import SparkContext
 3. from pyspark.streaming import StreamingContext
 4. 
 5. # Create sc with two working threads 
 6. sc = SparkContext("local[2]", "StatefulNetworkWordCount")
 7. 
 8. # Create local StreamingContext with batch interval of 1 sec
 9. ssc = StreamingContext(sc, 1)
10. 
11. # Create checkpoint for local StreamingContext
12. ssc.checkpoint("checkpoint")
13. 
14. # Define updateFunc: sum of the (key, value) pairs
15. def updateFunc(new_values, last_sum):
16.   return sum(new_values) + (last_sum or 0)
17. 
18. # Create DStream that connects to localhost:9999
19. lines = ssc.socketTextStream("localhost", 9999)

If you recall streaming_word_count.py, the primary differences start at line 11:

  • The ssc.checkpoint("checkpoint") on line 12 configures a Spark Streaming checkpoint. Because Spark Streaming runs continuously, it needs to checkpoint enough information to fault-tolerant storage so that it can recover from failures. Note that we will not dive deep into this concept (though more information is available in the following Tip section), as many of these configurations are abstracted away with Structured Streaming.
  • The updateFunc on line 15 tells the program how to update the application's state (later in the code) via updateStateByKey. In this case, it returns the sum of the new values plus the previous running total (sum(new_values) + (last_sum or 0)); see the short example after the following Tip.
  • At line 19, we have the same ssc.socketTextStream as the previous script.

    Tip

    For more information on Spark Streaming checkpoint, some good references are:

    Spark Streaming Programming Guide > Checkpoint: https://spark.apache.org/docs/1.6.0/streaming-programming-guide.html#checkpointing

    Exploring Stateful Streaming with Apache Spark: http://asyncified.io/2016/07/31/exploring-stateful-streaming-with-apache-spark/
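As referenced in the bullet points above, the behavior of updateFunc is easiest to see with a few direct calls; on the first batch for a key, last_sum is None, so (last_sum or 0) falls back to 0:

def updateFunc(new_values, last_sum):
  return sum(new_values) + (last_sum or 0)

print(updateFunc([1, 1, 1], None))  # first batch for a key: 3 + 0 -> 3
print(updateFunc([1, 1], 3))        # later batch: 2 new occurrences + 3 -> 5
print(updateFunc([], 5))            # no new occurrences: state carried forward as 5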

The final section of the code is as follows:

 1. # Calculate running counts
 2. running_counts = lines.flatMap(lambda line: line.split(" ")) \
 3.           .map(lambda word: (word, 1)) \
 4.           .updateStateByKey(updateFunc)
 5. 
 6. # Print the first ten elements of each RDD generated in this 
 7. # stateful DStream to the console
 8. running_counts.pprint()
 9. 
10. # Start the computation
11. ssc.start()             
12. 
13. # Wait for the computation to terminate
14. ssc.awaitTermination()  

While lines 10-14 are identical to the previous script, the difference is that we now have a running_counts variable that splits each line into words and runs a map function to count each word in each batch (in the previous script, this was done by the words and pairs variables).
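With the sample stream described at the start of this section, the pprint() output of this stateful DStream would look roughly like the following (timestamps are illustrative):

-------------------------------------------
Time: <1 second mark>
-------------------------------------------
('blue', 5)
('green', 3)

-------------------------------------------
Time: <2 second mark>
-------------------------------------------
('blue', 5)
('green', 3)
('gohawks', 1)

-------------------------------------------
Time: <4 second mark>
-------------------------------------------
('blue', 5)
('green', 5)
('gohawks', 1)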

The primary difference is the use of the updateStateByKey method, which will execute the previously noted updateFunc that performs the sum. updateStateByKey is Spark Streaming's method to perform calculations against your stream of data and update the state for each key in a performant manner. It is important to note that you would typically use updateStateByKey for Spark 1.5 and earlier; the performance of these stateful global aggregations is proportional to the size of the state. From Spark 1.6 onwards, you should use mapWithState, as the performance is proportional to the size of the batch.

Tip

Note that there is typically more code involved with mapWithState (in comparison to updateStateByKey), hence these examples were written using updateStateByKey.

For more information about stateful Spark Streaming, including the use of mapWithState, please refer to:

Stateful Network Wordcount Python example: https://github.com/apache/spark/blob/master/examples/src/main/python/streaming/stateful_network_wordcount.py

Global Aggregation using mapWithState (Scala): https://docs.cloud.databricks.com/docs/latest/databricks_guide/index.html#07%20Spark%20Streaming/12%20Global%20Aggregations%20-%20mapWithState.html

Word count using mapWithState (Scala): https://docs.cloud.databricks.com/docs/spark/1.6/examples/Streaming%20mapWithState.html

Faster Stateful Stream Processing in Apache Spark Streaming: https://databricks.com/blog/2016/02/01/faster-stateful-stream-processing-in-apache-spark-streaming.html
