As noted in the previous section, so far our script has performed a point-in-time streaming word count. The following diagram depicts the lines DStream and its micro-batches as executed by our script in the previous section:
At the 1 second mark, our Python Spark Streaming script returned the value {(blue, 5), (green, 3)}, at the 2 second mark it returned {(gohawks, 1)}, and at the 4 second mark it returned {(green, 2)}. But what if you wanted the aggregate word count over a specific time window?
The following figure represents us calculating a stateful aggregation:
In this case, we have a time window of 0-5 seconds. Note that our script does not specify a time window: each second, it calculates the cumulative sum of the words. Therefore, at the 2 second mark, the output includes not just the green and blue from the 1 second mark, but also the gohawks from the 2 second mark: {(blue, 5), (green, 3), (gohawks, 1)}. At the 4 second mark, the additional two greens give us a total of {(blue, 5), (green, 5), (gohawks, 1)}.
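This cumulative behavior can be sketched in plain Python, without Spark; the micro-batch contents below are taken straight from the example above:

```python
from collections import Counter

# Micro-batches from the example: words arriving at the 1, 2, and 4 second marks
batches = {
    1: Counter({"blue": 5, "green": 3}),
    2: Counter({"gohawks": 1}),
    4: Counter({"green": 2}),
}

# Stateful aggregation: carry the running totals forward across batches
state = Counter()
snapshots = {}
for t in sorted(batches):
    state.update(batches[t])      # add this batch's counts to the running state
    snapshots[t] = dict(state)    # what the stream would emit at time t

print(snapshots[2])  # {'blue': 5, 'green': 3, 'gohawks': 1}
print(snapshots[4])  # {'blue': 5, 'green': 5, 'gohawks': 1}
```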
For those of you who regularly work with relational databases, this may look like just a GROUP BY, SUM() statement. Yet in the case of streaming analytics, the time needed to persist the data long enough to run a GROUP BY, SUM() statement is longer than the batch interval (for example, 1 second). This means we would constantly be running behind, trying to catch up with the data stream.
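To see why this falls behind, consider a toy model (the 1.5 second processing time is an assumed figure for illustration, not a measurement): whenever each micro-batch takes longer to process than the batch interval, the scheduling delay grows without bound:

```python
# Toy model: batches arrive every 1.0 s, but persisting each one takes 1.5 s
batch_interval = 1.0
processing_time = 1.5   # assumed; anything > batch_interval creates a backlog

delays = []
finish = 0.0
for n in range(10):                 # ten micro-batches
    arrival = n * batch_interval
    start = max(arrival, finish)    # must wait for the previous batch to finish
    delays.append(start - arrival)  # scheduling delay for this batch
    finish = start + processing_time

print(delays)  # each batch waits 0.5 s longer than the one before it
```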
For example, if you were to run the 1. Streaming and DataFrames.scala Databricks notebook at https://github.com/dennyglee/databricks/blob/master/notebooks/Users/denny%40databricks.com/content/Streaming%20Meetup%20RSVPs/1.%20Streaming%20and%20DataFrames.scala, and you were to view the Streaming jobs in the Spark UI, you would get something like the following figure:
Notice in the graph that the Scheduling Delay and Total Delay numbers are rapidly increasing (for example, the average Total Delay is 54 seconds 254 ms and the actual Total Delay is > 2 min), well outside the batch-interval threshold of 1 second. The reason we see this delay is that, inside the streaming code for that notebook, we had also run the following code:
// Populate `meetup_stream` table
sqlContext.sql("insert into meetup_stream select * from meetup_stream_json")
That is, we were taking each new chunk of data (that is, each 1 second RDD micro-batch), converting it into a DataFrame (the meetup_stream_json table), and inserting the data into a persistent table (the meetup_stream table). Persisting the data in this fashion led to slow streaming performance, with ever-increasing scheduling delays. To solve this problem with streaming analytics, this is where creating global aggregations via updateStateByKey (Spark 1.5 and before) or mapWithState (Spark 1.6 onwards) comes in.
For more information on Spark Streaming visualizations, please take the time to review New Visualizations for Understanding Apache Spark Streaming Applications: https://databricks.com/blog/2015/07/08/new-visualizations-for-understanding-apache-spark-streaming-applications.html.
Knowing this, let's rewrite the original streaming_word_count.py so that we now have a stateful version called stateful_streaming_word_count.py; you can get the full version of this script at https://github.com/drabastomek/learningPySpark/blob/master/Chapter10/stateful_streaming_word_count.py.
The initial set of commands for our script are noted here:
1. # Create a local SparkContext and Streaming Contexts
2. from pyspark import SparkContext
3. from pyspark.streaming import StreamingContext
4.
5. # Create sc with two working threads
6. sc = SparkContext("local[2]", "StatefulNetworkWordCount")
7.
8. # Create local StreamingContext with batch interval of 1 sec
9. ssc = StreamingContext(sc, 1)
10.
11. # Create checkpoint for local StreamingContext
12. ssc.checkpoint("checkpoint")
13.
14. # Define updateFunc: sum of the (key, value) pairs
15. def updateFunc(new_values, last_sum):
16.     return sum(new_values) + (last_sum or 0)
17.
18. # Create DStream that connects to localhost:9999
19. lines = ssc.socketTextStream("localhost", 9999)
If you recall streaming_word_count.py, the primary differences start at line 11:
ssc.checkpoint("checkpoint") on line 12 configures a Spark Streaming checkpoint. To ensure that Spark Streaming is fault tolerant despite its continual operation, it needs to checkpoint enough information to fault-tolerant storage so that it can recover from failures. Note, we will not dive deep into this concept (though more information is available in the following Tip section), as many of these configurations will be abstracted away with Structured Streaming.
updateFunc on line 15 tells the program to update the application's state (later in the code) via updateStateByKey. In this case, it returns the sum of the new values plus the previous running total (sum(new_values) + (last_sum or 0)).
lines is the same DStream created with ssc.socketTextStream as in the previous script.
For more information on Spark Streaming checkpointing, some good references are:
Spark Streaming Programming Guide > Checkpoint: https://spark.apache.org/docs/1.6.0/streaming-programming-guide.html#checkpointing
Exploring Stateful Streaming with Apache Spark: http://asyncified.io/2016/07/31/exploring-stateful-streaming-with-apache-spark/
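Because updateFunc is plain Python, its behavior is easy to verify outside Spark. For the first batch in which a key appears, last_sum is None; on subsequent batches, it is the key's running total:

```python
# The same updateFunc as in the script above
def updateFunc(new_values, last_sum):
    return sum(new_values) + (last_sum or 0)

# First time a key is seen: no previous state (last_sum is None)
print(updateFunc([1, 1, 1], None))  # 3

# Subsequent batches: new counts are added to the running total
print(updateFunc([1, 1], 3))        # 5
```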
The final section of the code is as follows:
1. # Calculate running counts
2. running_counts = lines.flatMap(lambda line: line.split(" "))\
3.     .map(lambda word: (word, 1))\
4.     .updateStateByKey(updateFunc)
5.
6. # Print the first ten elements of each RDD generated in this
7. # stateful DStream to the console
8. running_counts.pprint()
9.
10. # Start the computation
11. ssc.start()
12.
13. # Wait for the computation to terminate
14. ssc.awaitTermination()
While lines 10-14 are identical to the previous script, the difference is that we now have a running_counts variable that splits each line to get the words and runs a map function to count each word in each batch (in the previous script, this was done by the words and pairs variables).
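The split-and-pair steps that feed updateStateByKey can be mimicked with plain Python on a single line of input (the sample line here is made up for illustration):

```python
# A sample input line, as would arrive on the socket (made up for illustration)
line = "blue blue green"

# flatMap(lambda line: line.split(" ")) -> individual words
words = line.split(" ")

# map(lambda word: (word, 1)) -> one (word, 1) pair per occurrence
pairs = [(word, 1) for word in words]

print(pairs)  # [('blue', 1), ('blue', 1), ('green', 1)]
```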
The primary difference is the use of the updateStateByKey method, which executes the previously noted updateFunc that performs the sum. updateStateByKey is Spark Streaming's method for performing calculations against your stream of data and updating the state for each key in a performant manner. It is important to note that you would typically use updateStateByKey with Spark 1.5 and earlier; the performance of these stateful global aggregations is proportional to the size of the state. From Spark 1.6 onwards, you should use mapWithState, as its performance is proportional to the size of the batch.
Note, there is typically more code involved with mapWithState (in comparison to updateStateByKey), hence the examples here were written using updateStateByKey.
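The per-key mechanics of updateStateByKey can be simulated in plain Python: for each batch, group the new (word, 1) pairs by key and apply updateFunc to each key's new values and its previous total, while keys absent from the batch keep their old state (update_state_by_key below is a hypothetical helper for illustration, not a Spark API):

```python
# The same updateFunc as in stateful_streaming_word_count.py
def updateFunc(new_values, last_sum):
    return sum(new_values) + (last_sum or 0)

def update_state_by_key(batch_pairs, state):
    """Mimic one updateStateByKey step: apply updateFunc per key."""
    grouped = {}
    for word, count in batch_pairs:
        grouped.setdefault(word, []).append(count)
    new_state = dict(state)  # keys not in this batch keep their old totals
    for word, new_values in grouped.items():
        new_state[word] = updateFunc(new_values, state.get(word))
    return new_state

# Replay the micro-batches from the earlier example
state = {}
state = update_state_by_key([("blue", 1)] * 5 + [("green", 1)] * 3, state)
state = update_state_by_key([("gohawks", 1)], state)
state = update_state_by_key([("green", 1)] * 2, state)
print(state)  # {'blue': 5, 'green': 5, 'gohawks': 1}
```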
For more information about stateful Spark Streaming, including the use of mapWithState, please refer to:
Stateful Network Wordcount Python example: https://github.com/apache/spark/blob/master/examples/src/main/python/streaming/stateful_network_wordcount.py
Global Aggregation using mapWithState (Scala): https://docs.cloud.databricks.com/docs/latest/databricks_guide/index.html#07%20Spark%20Streaming/12%20Global%20Aggregations%20-%20mapWithState.html
Word count using mapWithState (Scala): https://docs.cloud.databricks.com/docs/spark/1.6/examples/Streaming%20mapWithState.html
Faster Stateful Stream Processing in Apache Spark Streaming: https://databricks.com/blog/2016/02/01/faster-stateful-stream-processing-in-apache-spark-streaming.html