Let's go through some of the important advanced concepts of Spark Streaming.
We learned Spark SQL and DataFrames in Chapter 4, Big Data Analytics with Spark SQL, DataFrames, and Datasets. There are many use cases where you want to convert between DStreams and DataFrames to do interactive analytics. The RDDs generated by DStreams can be converted to DataFrames and queried with SQL within the program, or from external SQL clients as well. Refer to the sql_network_wordcount.py program in /usr/lib/spark/examples/lib/streaming for implementing SQL in a Spark Streaming application. You can also start a JDBC server within the application with the following code:
HiveThriftServer2.startWithContext(hiveContext)
Temporary tables can now be accessed from any SQL client, such as Beeline, to query the data.
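As a sketch of this pattern (modeled on sql_network_wordcount.py, with the socket source, port, and variable names assumed for illustration), each batch RDD produced by a DStream can be converted to a DataFrame and queried with SQL inside foreachRDD:

```scala
import org.apache.spark.sql.SQLContext

// Assumed: ssc is an existing StreamingContext and a text source on port 9999.
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))

words.foreachRDD { rdd =>
  // Reuse a singleton SQLContext across batches
  val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
  import sqlContext.implicits._

  val wordsDF = rdd.toDF("word")        // convert the batch RDD to a DataFrame
  wordsDF.registerTempTable("words")    // register it as a temporary table
  sqlContext.sql("SELECT word, count(*) AS total FROM words GROUP BY word").show()
}
```

Once the temporary table is registered, the same data becomes visible to external clients if the Thrift server is started with the application's context, as shown above.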
It is easy to use machine learning algorithms in Spark Streaming applications. The following Scala code trains a KMeans clustering model offline on training data and then applies the model to DStreams arriving from Kafka in real time:

val model = KMeans.train(trainingData, ...)
val dstream = KafkaUtils.createDStream(...)
dstream.map { event => model.predict(featurize(event)) }
The persistence levels available on RDDs can be applied to DStreams as well, using the persist() method. This is useful when multiple actions are applied to the same DStream. With window-based operations and state-based operations like updateStateByKey, DStreams are automatically persisted in memory, in serialized form, by default. DStreams created by receiving data over the network are always replicated twice for fault tolerance.
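As a minimal sketch of explicit DStream persistence (wordCounts is a hypothetical DStream reused by two actions):

```scala
import org.apache.spark.storage.StorageLevel

// Persist the DStream so each batch's RDDs are computed once and reused
// by both actions (serialized in-memory storage, as DStreams use by default).
wordCounts.persist(StorageLevel.MEMORY_ONLY_SER)

wordCounts.print()                    // action 1: print a sample of each batch
wordCounts.saveAsTextFiles("counts")  // action 2: write each batch to files
```

Without persist(), each action would recompute the batch's RDDs from the source.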
There are two kinds of failures in any Spark Streaming application: failure of an executor or failure of the driver. Let's understand how fault recovery is achieved in these two scenarios.
Executors can fail because of hardware or software issues. If an executor fails, all tasks running on it fail, and all in-memory data stored in its JVM is lost. If a receiver was running on that node, all blocks that were buffered but not yet processed are lost. Spark handles these failures automatically: a new receiver is placed on another node, and tasks are restarted using the block replicas, as shown in Figure 5.7.
If the driver fails, all executors fail along with their computation and replicated in-memory blocks. There are two ways to recover from driver failures: recovering with checkpointing and recovering with the write-ahead log (WAL). Typically, both are needed for zero data loss.
A Spark Streaming application must checkpoint data to a storage system such as HDFS to recover from failures. There are two types of data stored in the checkpoint directory: metadata and data. Metadata consists of the application configuration, the DStream operations, and information about incomplete batches. Data checkpointing stores the content of the RDDs themselves. Metadata checkpointing is used for driver recovery, while data checkpointing is used for the recovery of stateful transformations. Recovering from driver failures with checkpointing is illustrated in Figure 5.8.
The following points must be considered when enabling checkpointing:
A driver can be automatically restarted with the --supervise option in standalone mode. In YARN, the driver is automatically restarted based on the configuration parameter yarn.resourcemanager.am.max-attempts.
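A recoverable application is typically structured around StreamingContext.getOrCreate. The following is a sketch of that pattern; the checkpoint directory path, batch interval, and sparkConf are assumptions for illustration:

```scala
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Assumed checkpoint location on HDFS
val checkpointDir = "hdfs:///user/spark/checkpoint"

def createContext(): StreamingContext = {
  val ssc = new StreamingContext(sparkConf, Seconds(10))
  ssc.checkpoint(checkpointDir)   // enables metadata and data checkpointing
  // ... define DStream sources and operations here ...
  ssc
}

// On a fresh start this calls createContext(); after a driver failure it
// rebuilds the context and its DStream operations from the checkpoint data.
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()
```

Note that the DStream operations must be defined inside the creating function; otherwise they cannot be recovered from the checkpoint.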
When a Spark Streaming application recovers from a driver failure, blocks that were received but not yet processed by the receiver are lost. Enabling the write-ahead log (WAL) prevents this data loss. To enable the WAL, set the following configuration property:
sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
Enabling the WAL provides fault recovery, but it reduces performance. Consider using the Kafka direct API wherever possible, since it does not need a WAL.
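As a sketch of the Kafka direct approach (the broker addresses and topic name are assumptions for illustration), the stream reads offsets directly from Kafka, so no receiver and no WAL are involved:

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// Assumed Kafka brokers and topic
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
val topics = Set("events")

// Each Kafka partition maps to one RDD partition; offsets are tracked by
// Spark Streaming itself, so checkpointing alone gives exactly-once semantics
// for the read side without a write-ahead log.
val directStream = KafkaUtils.createDirectStream[
  String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
```

Here ssc is the application's StreamingContext.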
Spark Streaming applications need tuning before being deployed in production. Consider the following points for better performance:
Start with a conservative batch window of x seconds and an input rate of y, and check the total delay in the Streaming tab of the Spark UI. If the total delay is stable, you can decrease the batch window and increase the input rate until you arrive at the optimized batch window needed for the business.
Use the concurrent mark-and-sweep garbage collector to keep GC pauses low: --conf spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC
Use the following formulas as a rule of thumb for performance considerations:
Consumer parallelism = Number of consumers created
Spark parallelism = spark.cores.max - number of consumers
To maximize the chances of data locality and even parallel execution, spark.cores.max should be a multiple of the number of consumers:
Batch processing time = Number of tasks * scheduling cost + (Number of tasks * time complexity per task) / parallelism level
Number of tasks = Number of stages * Number of partitions
Number of partitions in the RDDs created by a DStream = (batchInterval / spark.streaming.blockInterval) * number of receivers
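As a worked example of the last formula, assume a 10-second batch interval, the default block interval of 200 ms, and two receivers (all three values are assumptions for illustration):

```scala
// Partitions per batch = (batchInterval / blockInterval) * number of receivers
val batchIntervalMs = 10000   // 10-second batch window
val blockIntervalMs = 200     // spark.streaming.blockInterval default (200 ms)
val numReceivers   = 2

// 10000 / 200 = 50 blocks per receiver per batch; 50 * 2 = 100 partitions
val partitionsPerBatch = (batchIntervalMs / blockIntervalMs) * numReceivers
println(partitionsPerBatch)   // 100
```

If 100 partitions give too many small tasks for your cluster, increase spark.streaming.blockInterval to reduce the partition count per batch.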