Chapter 26. Performance Tuning

The performance characteristics of a distributed streaming application are often dictated by complex relationships among internal and external factors involved in its operation.

External factors are bound to the environment in which the application executes, like the hosts that constitute the cluster and the network that connects them. Each host provides resources like CPU, memory, and storage with certain performance characteristics. For example, we might have magnetic disks that are typically slow but offer low-cost storage or fast solid-state drive (SSD) arrays that provide very fast access at a higher cost per storage unit. Or we might be using cloud storage, which is bound to the capacity of the network and the available internet connection. Likewise, the data producers are often outside of the control of the streaming application.

Under internal factors, we consider the complexity of the algorithms implemented, the resources assigned to the application, and the particular configuration that dictates how the application must behave.

In this chapter, we first work to gain a deeper understanding of the performance factors in Spark Streaming. Then, we survey several strategies that you can apply to tune the performance of an existing job.

The Performance Balance of Spark Streaming

Performance tuning in Spark Streaming can sometimes be complex, but it always begins with the simple equilibrium between the batch interval and the batch processing time. We can view the batch processing time as the time cost of completing the processing of all the received data and any related bookkeeping, whereas the batch interval is the budget we have allocated. Much like the financial analogy, a healthy application fits its processing cost within the allocated budget. At particular moments when the pressure goes up, we might exceed that budget, but over the longer term the balance must be preserved. An application that exceeds this time budget over a long period will end in systemic failure, usually a crash of the application due to resource exhaustion.

The Relationship Between Batch Interval and Processing Delay

A strong constraint with streaming applications, in general, is that data ingestion does not stop. In Spark Streaming, the ingestion of data occurs at regular intervals, and there are no facilities to turn it off arbitrarily. Hence, if the job queue is not empty by the time that the new batch interval starts, and new data is inserted into the system, Spark Streaming needs to finish processing the prior jobs before getting to the new data that is just entering the queue.

With only one job running at a time, we can see the following:

  • If the batch processing time is only temporarily greater than the batch interval but, in general, Spark is able to process a batch in less than the batch interval, Spark Streaming will eventually catch up and empty the job (RDD) queue.

  • If, on the other hand, the lateness is systemic and on average the cluster takes more than a batch interval to process a microbatch, Spark Streaming will keep accepting on average more data than it can remove from its storage management on every batch interval. Eventually, the cluster will run out of resources and crash.

We then need to consider what happens when that accumulation of excess data goes on for a sustained amount of time. By default, the RDDs that represent the data fed into the system are put into the memory of the cluster’s machines. Within that memory, the origin data—a source RDD—requires replication, meaning that as the data is fed into the system, a second copy is created for fault tolerance, progressively, on every block interval. As a consequence, for a temporary amount of time, and until the data of this RDD is processed, that data is present in two copies in the memory of the executors of the system. In the receiver model, because the data is always present in one copy on the receiver, this machine bears most of the memory pressure.

The Last Moments of a Failing Job

Eventually, if we add too much data into a spot in the system, we end up overflowing the memory of a few executors. In the receiver model, this might well be the receiver executor that happens to crash with an OutOfMemoryError. What happens next is that another machine on the cluster is designated as the new receiver and begins receiving new data. Because some of the blocks that were in the memory of the crashed receiver have been lost, they are now present in the cluster in only a single copy, which triggers a re-replication of this data before it can be processed. So the existing executors in the cluster pick up the prior memory pressure—there is no inherent relief from the data lost in the crash. A few executors are busy copying the data, and one new executor is accepting data once again. But remember that if our cluster included N executors prior to the crash, it is now composed of N-1 executors, and it is potentially slower at processing the same rhythm of data ingestion—not to mention that most executors are now busy with data replication instead of their usual processing. The batch-processing times we observed before the crash can now only be higher, and, in particular, higher than the batch interval.

In conclusion, having a batch-processing time that is, on average, higher than the batch interval has the potential of creating cascading crashes throughout your cluster. It is therefore extremely important to maintain Spark’s equilibrium in considering the batch interval as a time budget for all of the things that we might want to do during the normal functioning of the cluster.

Note

You can remove the constraint that only one job executes at a given time by setting spark.streaming.concurrentJobs to a value greater than one in your Spark configuration. However, this can be risky in that it creates competition for resources and makes it more difficult to determine whether the system has sufficient resources to process the ingested data fast enough.
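For illustration, here is a minimal sketch of how such a setting could be passed at application startup; the value of 2 and the application name are hypothetical choices, not recommendations:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Allow up to two streaming jobs to run concurrently (use with care).
val conf = new SparkConf()
  .setAppName("concurrent-jobs-example") // hypothetical application name
  .set("spark.streaming.concurrentJobs", "2")

val ssc = new StreamingContext(conf, Seconds(10))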

Going Deeper: Scheduling Delay and Processing Delay

Many factors can have an influence on the batch-processing time. Of course, the first and foremost constraint is the analysis that is to be performed on the data—the logic of the job itself. The running time of that computation might or might not depend on the size of the data, and might or might not depend on the values present in the data.

This purely computational time is accounted for under the name of processing delay, which is the difference between the time elapsed running the job and the time elapsed setting it up.

Scheduling delay, on the other hand, accounts for the time necessary to take the job definition (often a closure), serialize it, and send it to an executor that will process it. Naturally, this distribution of tasks implies some overhead—time that is not spent computing—so it’s wise not to decompose our workload into too many small jobs and to tune the parallelism so that it is commensurate with the number of executors on our cluster. Finally, the scheduling delay also accounts for job lateness, if our Spark Streaming cluster has accumulated jobs in its queue. It is formally defined as the time between the entrance of the job (RDD) into the job queue and the moment at which Spark Streaming actually begins computation.

Another important factor influencing the scheduling delay is the set of locality settings, in particular spark.locality.wait, which dictates how long to wait for the most local placement of a task relative to its data before escalating to the next locality level (a configuration sketch follows this list). The following are the locality levels:

PROCESS_LOCAL

Same process Java virtual machine (JVM). This is the highest locality level.

NODE_LOCAL

Same executor machine.

NO_PREF

No locality preference.

RACK_LOCAL

Same server rack.

ANY

This is the lowest locality level, usually as a result of not being able to obtain a locality at any level above.
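As referenced above, here is a minimal sketch of adjusting the locality wait; the 1-second value is purely illustrative and should be chosen against your own measurements:

import org.apache.spark.SparkConf

// Shorten how long Spark waits for a more local slot before falling back to a
// less local one. The default is 3s; 1s here is an illustrative value only.
val conf = new SparkConf()
  .set("spark.locality.wait", "1s")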

Checkpoint Influence in Processing Time

There are other factors that, perhaps counterintuitively, can contribute to the batch processing time, in particular checkpointing. As is discussed in Chapter 24, checkpointing is a safeguard that is necessary in the processing of stateful streams to avoid data loss while recovering from failure. It relies on storing intermediate computation values on disk so that, in the event of a failure, data that depends on values seen in the stream since the very beginning of processing does not need to be recomputed from the data source, but only from the time of the last checkpoint. The checkpointing operation is structurally programmed by Spark as a periodic job, and, as such, the time spent making the checkpoint is counted as part of the processing delay, not the scheduling delay.

On a stateful stream, where checkpoints are usually significant both in their semantics and in the size of the safeguarded data, a single checkpoint can take much longer than a batch interval; checkpointing durations on the order of 10 batch intervals are not unheard of. As a consequence, when making sure that the average batch-processing time is less than the batch interval, it’s necessary to take checkpointing into account. The contribution of checkpointing to the average batch-processing time is as follows:

checkpointing delay ≈ (batch interval / checkpointing interval) * checkpointing duration

This should be added to the average computation time observed during a noncheckpointing job to get an idea of the real batch-processing time. Alternatively, another way to proceed is to compute how much time we have left in our budget without checkpointing (the difference between the batch interval and the batch-processing time) and tune the checkpointing interval as a function of it:

checkpointing interval ≥ checkpointing duration / (batch interval - batch-processing time*)

Where * marks the measure of the batch-processing time without checkpointing.
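As a purely hypothetical worked example: with a batch interval of 2 seconds, a batch-processing time of 1.5 seconds measured without checkpointing, and a checkpointing duration of 10 seconds, the formula above suggests a checkpointing interval of at least 10 / (2 - 1.5) = 20 batch intervals, that is, 40 seconds. A minimal sketch of applying such an interval to a stateful stream might look as follows (the stream name is assumed):

import org.apache.spark.streaming.Seconds

// `statefulStream` stands for the stateful DStream of your application (assumed).
// Checkpoint it every 40 seconds, i.e., every 20 batches of a 2-second batch
// interval, matching the budget computed above.
statefulStream.checkpoint(Seconds(40))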

External Factors that Influence the Job’s Performance

Finally, if all those factors have been taken into account and you are still witnessing spikes in the processing delay of your jobs, another aspect that we really need to pay attention to is the changing conditions on the cluster.

For example, other systems colocated on our cluster may impact our shared processing resources: the Hadoop Distributed File System (HDFS) is known to have had bugs in its older versions that constrained concurrent disk writes.1 Therefore, we might be running a cluster at a very stable rate, while simultaneously, a different job—that might not even be Spark related—can require heavy use of the disk. This can affect the following:

  • Data ingestion in the reliable receiver model, when using a write-ahead log (WAL)

  • Checkpointing time

  • Actions of our stream processing that involve saving data to the disk

To alleviate this issue of external impacts on our job through disk usage, we could do the following:

  • Use a distributed in-memory cache such as Alluxio2

  • Reduce disk pressure by saving structured, small data in a NoSQL database rather than on files

  • Avoid colocating more disk-intensive applications with Spark than is strictly necessary

Disk access is only one of the possible bottlenecks that could affect our job through resource sharing with the cluster. Another possibility can be network starvation or, more generally, the existence of workloads that cannot be monitored and scheduled through our resource manager.

How to Improve Performance?

In the previous section, we discussed the intrinsic and extrinsic factors that can influence the performance of a Spark Streaming job.

Let’s imagine that we are in a situation in which we have developed a job and we observe certain issues that affect its performance and, hence, its stability. The first step to take would be to gain insight into the different performance indicators of our job, perhaps using the techniques outlined in “Understanding Job Performance Using the Streaming UI”.

We use that information as a comparison baseline as well as guidance to use one or more of the different strategies that follow.

Tweaking the Batch Interval

A strategy that is mentioned frequently is to lengthen the batch interval. This approach might help to improve some parallelism and resource usage issues. For example, if we increase a batch interval from one minute to five minutes, we have to serialize only the tasks that are the components of our job once every five minutes instead of once every minute—a five-fold reduction.

Nonetheless, the batches of our stream will then represent five minutes’ worth of data seen “over the wire” instead of one, and because most instability issues are caused by resources that are inadequate for the throughput of our stream, changing the batch interval does little to address that imbalance. More important, the batch interval we seek to implement often has high semantic value in our analysis, if only because, as we saw in Chapter 21, it constrains the windowing and sliding intervals that we can create on an aggregated stream. Changing these analysis semantics to accommodate processing constraints should be envisioned only as a last resort.

A more compelling strategy consists of reducing general inefficiencies, such as using a fast serialization library or implementing algorithms with better performance characteristics. We can also accelerate disk-writing speeds by augmenting or replacing our distributed filesystem with an in-memory cache, such as Alluxio. When that’s not sufficient, we should consider adding more resources to our cluster, letting us distribute the stream on more executors by correspondingly augmenting the number of partitions that we use through, for example, block interval tuning.
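As one example of reducing such inefficiencies, the following sketch switches Spark to the Kryo serializer; the registered event class is a hypothetical placeholder for your own application types:

import org.apache.spark.SparkConf

// Hypothetical application event type, for illustration only.
case class MyEvent(id: Long, payload: String)

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Registering application classes with Kryo is optional but more efficient.
  .registerKryoClasses(Array(classOf[MyEvent]))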

Limiting the Data Ingress with Fixed-Rate Throttling

If getting more resources is absolutely impossible, we need to look at reducing the number of data elements that we must deal with.

Since version 1.3, Spark has included fixed-rate throttling, which caps the number of elements it accepts. We can enable it by setting spark.streaming.receiver.maxRate to a value in elements per second in your Spark configuration. Note that for receiver-based consumers, this limitation is enforced at block creation and simply stops reading elements from the data source once the throttle limit has been reached.
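A minimal configuration sketch; the limit of 10,000 elements per second is an arbitrary illustrative value:

import org.apache.spark.SparkConf

// Cap each receiver at 10,000 elements per second (illustrative value).
val conf = new SparkConf()
  .set("spark.streaming.receiver.maxRate", "10000")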

For the Kafka direct connector, there’s a dedicated configuration, spark.streaming.kafka.maxRatePerPartition, that sets the maximum rate per partition of the topic, in records per second. When using this option, be mindful that the maximum number of records admitted per batch will be as follows:

maxRatePerPartition * partitions per topic * batch interval
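As a hypothetical illustration, with maxRatePerPartition set to 1,000 records per second, a topic of 10 partitions, and a batch interval of 2 seconds, each batch would admit at most 1,000 * 10 * 2 = 20,000 records.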

Note that this behavior does not, in and of itself, include any signaling; Spark will just let in a limited number of elements and pick up the reading of new elements on the next batch interval. This has consequences for the system that is feeding data into Spark:

  • If this is a pull-based system, such as in Kafka, Flume, and others, the input system could compute the number of elements read and manage the overflow data in a custom fashion.

  • If the input system is more prosaically a buffer (file buffer, TCP buffer), it will overflow after a few block intervals (because our stream has a larger throughput than the throttle allows) and will periodically be flushed (deleted) when this happens.

As a consequence, throttled ingestion in Spark can exhibit some “jitter” in the elements read, because Spark reads every element until an underlying TCP or file buffer, used as a queue for “late” elements, reaches capacity and is flushed as a whole. The effect of this is that the input stream is separated into long runs of processed elements interspersed with “holes” (dropped elements) of a regular size (e.g., one TCP buffer).

Backpressure

The queue-based system we have described with fixed-rate throttling has the disadvantage of obscuring, for our entire pipeline, where the inefficiencies lie. Indeed, we have considered a data source (e.g., a TCP socket) that reads data from an external server (e.g., an HTTP server) into a local system-level queue (a TCP buffer), before Spark feeds this data into an application-level buffer (Spark Streaming’s RDDs). Unless we use a listener tied to our Spark Streaming receiver, it is challenging to detect whether our system is congested and, if so, where the congestion occurs.

The external server could perhaps decide, if it were aware that our Spark Streaming cluster is congested, to react on that signal and use its own approach to either delay or select the incoming elements to Spark. More important, it could make the congestion information flow back up the stream to the data producers it depends on, allowing every part of the pipeline to be aware of and help with the congestion. It would also allow any monitoring system to have a better view of how and where congestion happens in our system, helping with resource management and tuning.

The upstream-flowing, quantified signal about congestion is called backpressure. It is a continuous signal that explicitly says how many elements the system in question (here, our Spark Streaming cluster) can be expected to process at this specific instant. Backpressure signaling has an advantage over throttling in that it is a dynamic signal that varies as a function of the influx of elements and the state of the queue in Spark. As such, it does not affect the system if there is no congestion, and it does not require tuning an arbitrary limit, avoiding the associated risks of misconfiguration (underused resources if the limit is too restrictive; overflow if the limit is too permissive).

This approach has been available in Spark since version 1.5 and can, in a nutshell, provide dynamic throttling.

Dynamic Throttling

In Spark Streaming, dynamic throttling is regulated by default with a Proportional-Integral-Derivative (PID) controller, which observes an error signal as the difference between the latest ingestion rate, observed on a batch interval in terms of number of elements per second, and the processing rate, which is the number of elements that have been processed per second. We could consider this error as the imbalance between the number of elements coming in and the number of elements going out of Spark at the current instant (with an “instant” rounded to a full batch interval).

The PID controller then aims at regulating the number of ingested elements on the next batch interval by taking into account the following:

  • A proportional term (the error at this instant)

  • An integral or “historic” term (the sum of all errors in the past; here, the number of unprocessed elements lying in the queue)

  • A derivative or “speed” term (the rate at which the number of elements has been diminishing in the past)

The PID then attempts to compute an ideal number depending on these three factors.
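To build intuition only, here is a simplified sketch of the kind of update a PID controller performs; it is not Spark’s exact implementation, and the function name and the way the error terms are derived are illustrative assumptions:

// Simplified PID-style rate update (illustrative, not Spark's exact code).
// error: imbalance at this instant (ingestion rate - processing rate), in elements/s
// historicalError: accumulated backlog expressed as a rate
// dError: how fast the error has been changing
def nextRate(
    latestRate: Double,
    error: Double,
    historicalError: Double,
    dError: Double,
    proportional: Double = 1.0,
    integral: Double = 0.2,
    derivative: Double = 0.0): Double =
  math.max(0.0,
    latestRate - proportional * error - integral * historicalError - derivative * dError)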

Backpressure-based dynamic throttling in Spark can be turned on by setting spark.streaming.backpressure.enabled to true in your Spark configuration. Another variable, spark.streaming.backpressure.initialRate, dictates the number of elements per second the throttling should initially expect. You should set it slightly above your best estimate of the throughput of your stream to allow the algorithm to “warm up.”
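A minimal sketch of enabling it; the initial rate of 5,000 elements per second is an assumed estimate of the stream’s throughput, not a recommended value:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.streaming.backpressure.enabled", "true")
  // Assumed initial estimate, slightly above the expected stream throughput.
  .set("spark.streaming.backpressure.initialRate", "5000")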

Note

The approach of focusing on backpressure to deal with congestion in a pipelined system is inspired by the Reactive Streams specification, an implementation-agnostic API intended to realize a manifesto on the advantages of this approach, backed by numerous industry players with a stake in stream processing, including Netflix, Lightbend, and Twitter.

Tuning the Backpressure PID

PID tuning is a well-established and vast subject, the scope of which is beyond this book, but the Spark Streaming user should have an intuition of what each term is used for. The proportional term deals with the current snapshot of the error, the integral term helps the system deal with the error accumulated until now, and the derivative term helps the system avoid either overshooting, when it is correcting too fast, or undercorrecting, when we face a brutal spike in the throughput of stream elements.

Each of the terms of the PID has a weight factor attached to it, between 0 and 1, as befits a classical implementation of PIDs. Here are the parameters that you need to set in your Spark configuration:

spark.streaming.backpressure.pid.proportional
spark.streaming.backpressure.pid.integral
spark.streaming.backpressure.pid.derived

By default, Spark implements a proportional–integral controller, with a proportional weight of 1, an integral weight of 0.2, and a derivative weight of 0. This offers a sensible default in Spark Streaming applications where the stream throughput varies relatively slowly with respect to the batch interval, and is easier to interpret: Spark aims to ingest no more than the last rate of processing allowed, with a “buffer” for processing one-fifth of the late elements on each batch. Note, however, that if you are faced with a fast-changing stream with an irregular throughput, you might consider having a nonzero derivative term.
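If you do face such a fast-changing stream, a sketch along these lines adds a small derivative term; the 0.1 weight is an arbitrary starting point to be validated against your own workload:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.streaming.backpressure.enabled", "true")
  .set("spark.streaming.backpressure.pid.proportional", "1.0") // default
  .set("spark.streaming.backpressure.pid.integral", "0.2")     // default
  .set("spark.streaming.backpressure.pid.derived", "0.1")      // nonzero for irregular throughput (arbitrary value)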

Custom Rate Estimator

The PID estimator is not the only rate estimator that we can implement in Spark. It is an implementation of the RateEstimator trait, and the particular implementation can be swapped by setting the value of spark.streaming.backpressure.rateEstimator to your class name. Remember that you will need to include the class in question in the Spark classpath; for example, through the --jars argument to spark-submit.

The RateEstimator trait is a serializable trait that requires a single method:

trait RateEstimator extends Serializable {

  // Returns an updated ingestion rate (elements per second) for the stream,
  // or None if no update is warranted for this batch.
  def compute(
      time: Long,
      elements: Long,
      processingDelay: Long,
      schedulingDelay: Long): Option[Double]
}

This function should return an estimate of the number of records the stream attached to this RateEstimator should ingest per second, given an update on the size and completion times of the latest batch. You should feel free to contribute an alternative implementation.
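As a minimal sketch of what an alternative might look like, here is a hypothetical estimator that always suggests the same fixed rate; because the trait is package-private to Spark Streaming, such a class would need to be compiled within the org.apache.spark.streaming.scheduler.rate package, and its registration follows the mechanism described above:

package org.apache.spark.streaming.scheduler.rate

// Hypothetical estimator: ignore the batch statistics and always suggest
// the same ingestion rate (in elements per second).
class FixedRateEstimator(ratePerSecond: Double) extends RateEstimator {
  override def compute(
      time: Long,
      elements: Long,
      processingDelay: Long,
      schedulingDelay: Long): Option[Double] =
    Some(ratePerSecond)
}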

A Note on Alternative Dynamic Handling Strategies

Throttling in Spark, dynamic or not, is expressed in the InputDStream classes, which include ReceiverInputDStream for the receiver model and DirectKafkaInputDStream for the Kafka direct receiver. These implementations currently both have a simple way of dealing with excess elements: they are neither read from the input source (ReceiverInputDStream) nor consumed from the topic (DirectKafkaInputDStream).

But it would be reasonable to propose several possible alternative implementations based on the backpressure signal received at the InputDStream. We could imagine policies such as taking the first, largest, or smallest elements, or a random sample.

Sadly, the rateController: RateController member of these classes is protected[streaming], but this member has a getLatestRate function that lets the DStream implementation receive the relevant limit at any instant. Any implementation of a custom DStream could thus take inspiration from these nonpublic but open source methods of rate control to help deal with congestion in a better way.

Caching

Caching in Spark Streaming is a feature that, when well manipulated, can significantly speed up the computation performed by your application. This seems to be counterintuitive given that the base RDDs representing the data stored in the input of a computation are actually replicated twice before any job runs on them.

However, over the lifetime of your application, there might be a very long pipeline that takes your computation from those base RDDs to some very refined and structured representations of the data that usually involves a key–value tuple. At the end of the computation performed by your application, you are probably looking at doing some distribution of the output of your computation into various outlets: data stores or databases such as Cassandra, for example. That distribution usually involves looking at the data computed during the previous batch interval and finding which portions of the output data should go where.

A typical use case for that is to look at the keys in the RDD of the structured output data (the last DStream in your computation), to find exactly where to put the results of your computation outside of Spark, depending on these keys. Another use case would be to look for only some specific elements on the RDD received during the last batch. Indeed, your RDD might actually be the output of a computation that depends not only on the last batch of data, but on many prior events received since the start of the application. The last step of your pipeline might summarize the state of your system. Looking at that RDD of output structured results, we might be searching for some elements that pass certain criteria, comparing new results with previous values, or distributing data to different organizational entities, to name a few cases.

For example, think of anomaly detection. You might compute some metrics or features on values (users or elements that you are monitoring on a routine basis). Some of those features might reveal some problems or that some alerts need to be produced. To output those to an alerting system, you want to find elements that pass some criteria in the RDD of data that you’re currently looking at. To do that, you are going to iterate over the RDD of results. Beside the alerting, you might also want to publish the state of your application to, for example, feed a data visualization or a dashboard, informing you on more general characteristics of the system that you are currently surveying.

The point of this thought exercise is to envision that computing on an output DStream involves several operations for each and every RDD that composes the final result of your pipeline, despite it being very structured and probably reduced in size from the input data. For that purpose, using the cache to store that final RDD before several iterations occur on it is extremely useful.

When you do several iterations on a cached RDD, the first cycle takes the same time as the noncached version, but each subsequent iteration takes only a fraction of that time. The reason is that although the base data of Spark Streaming is cached in the system, the intermediate results need to be recomputed from that base data along the way, using the potentially very long pipeline defined by your application. Without caching, recomputing that elaborated data costs time in every single iteration over the results, as shown here:

dstream.foreachRDD { rdd =>
  rdd.cache()                       // keep this batch's results around for the iterations below
  keys.foreach { key =>             // `keys` is assumed to be known beforehand
    // Assuming an RDD of key-value pairs: select the records for this key
    // and save them to their destination (saveAsFooBar is a placeholder).
    rdd.filter { case (k, _) => k == key }.saveAsFooBar(...)
  }
  rdd.unpersist()                   // release the cached data before the next batch
}

As a consequence, if your DStream or the corresponding RDDs are used multiple times, caching them significantly speeds up the process. However, it is important not to overtax Spark’s memory management: do not assume that the RDDs of a DStream will naturally fall out of cache when your DStream moves on to the next RDD after a batch interval. At the end of the iterations over each RDD of your DStream, remember to unpersist the RDD to let it fall out of cache.

Otherwise, Spark will need to do some relatively clever computation to try to understand which pieces of data it should retain. That particular computation might slow down the results of your application or limit the memory that would be accessible to it.

One last point to consider is that you should not use cache eagerly and everywhere. The cache operation has a cost that might outweigh the benefits if the cached data is not used enough times. In summary, cache is a performance-boosting function that should be used with care.

Speculative Execution

Apache Spark deals with straggler tasks, whether in streaming or batch execution, using speculative execution. This mechanism relies on the fact that Spark distributes the tasks of a stage across the workers at roughly the same time, so it is reasonable to expect workers to require more or less the same amount of time to complete each task. If that is not the case, it’s most often because of one of two reasons:

  • Either our dataset is suffering from data skew, in which a few tasks concentrate most of the computation. This is in some cases normal,3 but in most instances a bad situation that we will want to alleviate (e.g., by shuffling our input).

  • Or a particular executor is simply slow, whether because of bad hardware on the node or because the node is otherwise overloaded in the context of a shared cluster.

If Spark detects such an unusually long execution time and has resources available, it has the ability to relaunch, on another node, the task that is currently running late. This speculative task (which speculates that something has gone wrong with the original) will either finish first and cancel the original task, or be canceled as soon as the original one returns. Overall, this competition between the “tortoise and the hare” yields a better completion time and usage of available resources.

Speculative execution is responsive to four configuration parameters, listed in Table 26-1.

Table 26-1. Speculative execution configuration parameters
Option                          Default   Meaning
spark.speculation               false     If set to “true,” performs speculative execution of tasks
spark.speculation.interval      100ms     How often Spark will check for tasks to speculate
spark.speculation.multiplier    1.5       How many times slower a task is than the median to be considered for speculation
spark.speculation.quantile      0.75      Fraction of tasks that must be complete before speculation is enabled for a particular stage
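A minimal sketch of turning speculation on while keeping the other parameters at the defaults shown in Table 26-1 (the values below simply restate those defaults):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.speculation", "true")
  // The remaining settings restate the defaults from Table 26-1.
  .set("spark.speculation.interval", "100ms")
  .set("spark.speculation.multiplier", "1.5")
  .set("spark.speculation.quantile", "0.75")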

1 You can refer to HDFS-7489 for an example of one of those subtle concurrency issues.

2 Alluxio was originally named Tachyon and was part of the Spark code base, which hints at how complementary its features are to data processing with Spark.

3 For example, in anomaly detection inference, the executor detecting an anomalous value sometimes has duties of alerting that are an additional burden on top of the regular node duties.
