Advanced concepts of Spark Streaming

Let's go through some of the important advanced concepts of Spark Streaming.

Using DataFrames

We learned about Spark SQL and DataFrames in Chapter 4, Big Data Analytics with Spark SQL, DataFrames, and Datasets. There are many use cases where you want to convert a DStream to DataFrames to do interactive analytics. The RDDs generated by a DStream can be converted to DataFrames and queried with SQL within the program, or from external SQL clients as well. Refer to the sql_network_wordcount.py program in /usr/lib/spark/examples/lib/streaming for implementing SQL in a Spark Streaming application. You can also start the JDBC server within the application with the following code:

HiveThriftServer2.startWithContext(hiveContext)

Temporary tables can now be accessed from any SQL client, such as beeline, to query the data.
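
The following Scala sketch shows the same pattern as sql_network_wordcount.py: each micro-batch RDD produced by a DStream is converted to a DataFrame, registered as a temporary table, and queried with SQL. The socket host, port, application name, and table name are placeholder values; if you use a HiveContext here instead of a SQLContext, the registered temporary table is also visible to the JDBC server started with HiveThriftServer2.startWithContext.

import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}

case class Record(word: String)

object SqlWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SqlWordCount")
    val ssc = new StreamingContext(conf, Seconds(10))

    // Words arriving on a socket; the host and port are placeholders
    val words = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" "))

    words.foreachRDD { rdd =>
      val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
      import sqlContext.implicits._

      // Convert the RDD of the current batch into a DataFrame
      val wordsDF = rdd.map(w => Record(w)).toDF()
      wordsDF.registerTempTable("words")

      // Run SQL on the current batch
      sqlContext.sql("SELECT word, COUNT(*) AS total FROM words GROUP BY word").show()
    }

    ssc.start()
    ssc.awaitTermination()
  }
}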

MLlib operations

It is easy to implement machine learning algorithms in Spark Streaming applications. The following Scala code trains a KMeans clustering model offline with training data and then applies the model, in real time, to a DStream of events arriving from Kafka:

val model = KMeans.train(training_data, ...)

val dstream = KafkaUtils.createDirectStream(...)
dstream.map { event => model.predict(featurize(event)) }

Caching/persistence

The persistence levels available on RDDs can be applied to DStreams as well, using the persist() method. This is needed when multiple operations are applied to the same DStream, as in the following sketch.
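
Here is a minimal sketch, assuming an existing StreamingContext named ssc and placeholder host and port values, that persists a DStream in serialized form because two output operations reuse it:

import org.apache.spark.storage.StorageLevel

val words = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" "))

// Keep the serialized batch data in memory across the two operations below
words.persist(StorageLevel.MEMORY_ONLY_SER)

words.countByValue().print()                   // first output operation
words.filter(_.startsWith("spark")).print()    // second output operation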

With window-based operations and state-based operations such as updateStateByKey, DStreams are automatically persisted in memory, in serialized form, by default. DStreams created by receiving data over the network are always replicated on two nodes for fault tolerance.

Fault-tolerance in Spark Streaming

There are two kinds of failures in any Spark Streaming application: failure of an executor or failure of the driver. Let's understand how fault recovery is achieved in these two scenarios.

Failure of executor

Executors can fail because of hardware or software issues. If an executor fails, all tasks running on it fail and all in-memory data stored in its JVM is lost. If a receiver was running on that node, all blocks that were buffered but not yet processed are lost. Spark handles these failures automatically by starting the receiver on another node and restarting the failed tasks on the block replicas, as shown in Figure 5.7.


Figure 5.7: Spark Streaming behavior in executor failure

Failure of driver

If the driver fails, all executors fail, and their computation and replicated in-memory blocks are lost with them. There are two ways to recover from driver failures: recovering with checkpointing and recovering with a write-ahead log (WAL). Typically, both are needed for zero data loss.

Recovering with checkpointing

A Spark application must checkpoint data to a storage system such as HDFS to recover from failures. There are two types of data stored in the checkpoint directory: metadata and data. Metadata consists of the application configuration, the DStream operations, and information about incomplete batches. Data is the content of the generated RDDs. Metadata checkpointing is used for driver recovery, and data checkpointing is used for the recovery of stateful transformations. Recovering from driver failures with checkpointing is illustrated in Figure 5.8.


Figure 5.8: Spark Streaming behavior in driver failure with checkpointing
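
A minimal sketch of this recovery pattern follows; the application name, batch interval, and HDFS checkpoint path are placeholder values. The StreamingContext is rebuilt from the checkpoint directory if one exists; otherwise, it is created fresh by the factory function:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "hdfs:///user/spark/streaming-checkpoint"

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("CheckpointedApp")
  val ssc = new StreamingContext(conf, Seconds(1))
  ssc.checkpoint(checkpointDir)   // enables checkpointing to this directory
  // define the DStream operations here before returning the context
  ssc
}

// After a driver failure, the context and DStream lineage are restored
// from the checkpoint data instead of being recreated from scratch
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()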

The following points must be considered when enabling checkpointing:

  • Checkpointing RDDs to reliable, external storage slows down the processing time of the batches
  • With a batch interval size of 1 second, checkpointing every batch will significantly slow down operation throughput
  • Checkpointing too infrequently causes the lineage and task sizes to grow too large with detrimental effects
  • Ensure that a checkpoint interval of 5 to 10 times the sliding interval is set, as shown in the sketch after this list
  • For stateful transformations, the default checkpoint interval is at least 10 seconds and is a multiple of the batch interval
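
The following sketch overrides the checkpoint interval on a stateful DStream; the DStream name, the update function, and the 50-second interval are illustrative assumptions only:

import org.apache.spark.streaming.Seconds

// Assumes an existing key-value DStream and an update function
val stateDStream = wordCounts.updateStateByKey(updateFunction _)
stateDStream.checkpoint(Seconds(50))   // illustrative: several times the batch/sliding interval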

The driver can be restarted automatically with the --supervise option when the application is submitted in standalone cluster mode. On YARN, the driver is restarted automatically, subject to the configuration parameter yarn.resourcemanager.am.max-attempts.
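
As an illustration of the standalone case, the following submission sketch enables supervision; the master URL, class name, and JAR name are placeholders:

spark-submit \
  --master spark://master-host:7077 \
  --deploy-mode cluster \
  --supervise \
  --class com.example.StreamingApp \
  streaming-app.jar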

Recovering with WAL

When a Spark Streaming application recovers from a driver failure, blocks that were received by the receiver but not yet processed will be lost. Enabling a write-ahead log (WAL) reduces this loss, because received data is also written to reliable storage. To enable the WAL, set the following configuration property:

sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")

Enabling the WAL provides stronger fault recovery, but it reduces performance because all received data is additionally written to reliable storage. Consider using the Kafka direct API wherever possible, since it does not need a WAL.
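
As a minimal sketch (the application name, batch interval, HDFS path, host, and port are placeholders): the WAL is written into the checkpoint directory, so a checkpoint directory must be set, and the receiver's storage level can drop the second in-memory replica because durability now comes from the log:

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sparkConf = new SparkConf()
  .setAppName("WALEnabledApp")
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")

val ssc = new StreamingContext(sparkConf, Seconds(1))
ssc.checkpoint("hdfs:///user/spark/streaming-checkpoint")   // the WAL files are written here

// A single in-memory copy is enough once the WAL provides durability
val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)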

Performance tuning of Spark Streaming applications

Spark Streaming applications need to be tuned before they are deployed in production. Consider the following points for better performance:

  • Batch window: The batch window depends on the needs of the application, the input rate, and the scheduling delay. Start with a batch window of x seconds and an input rate of y, and check the total delay in the Streaming tab of the Spark UI. If the total delay is stable, you can decrease the batch window and increase the input rate until you arrive at the optimized batch window needed for the business.
  • Increase throughput by increasing the number of receivers and by increasing the parallelism of processing.
  • Garbage collection and memory usage (a combined configuration sketch follows this list):
    • Use the CMS garbage collector: --conf spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC.
    • Cache RDDs in serialized form.
    • Use Kryo serialization.
    • Use more executors with a smaller heap size to reduce the garbage collector's impact.
    • Estimate and allocate sufficient memory for the application. For example, if you are using a window operation over 5 minutes, you need to allocate memory for storing and processing 5 minutes' worth of data.
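
The following sketch pulls these suggestions into one place; the application name, the executor sizing values, and the dstream variable are illustrative assumptions only:

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel

val sparkConf = new SparkConf()
  .setAppName("TunedStreamingApp")
  .set("spark.executor.extraJavaOptions", "-XX:+UseConcMarkSweepGC")      // CMS collector
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")  // Kryo serialization
  .set("spark.executor.memory", "4g")                                     // smaller heaps...
  .set("spark.executor.instances", "10")                                  // ...spread over more executors (YARN)

// Cache DStream data in serialized form to reduce GC pressure
// (assumes an existing DStream named dstream)
dstream.persist(StorageLevel.MEMORY_ONLY_SER)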

Use the following formulas as a rule of thumb for performance considerations:

Consumer parallelism = Number of consumers created

Spark parallelism = spark.cores.max - number of consumers

To maximize the chances of data locality and even parallel execution, spark.cores.max should be a multiple of the number of consumers:

Batch processing time = number of tasks * scheduling cost + (number of tasks * time complexity per task) / parallelism level

Number of tasks = Number of stages * Number of partitions

Number of partitions in the RDDs created by a DStream = (batchInterval / spark.streaming.blockInterval) * number of receivers
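
As an illustration with assumed values: with a batch interval of 10 seconds, the default block interval of 200 ms, and two receivers, each batch RDD has (10,000 ms / 200 ms) * 2 = 100 partitions.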
