Chapter 21. Time-Based Stream Processing

As we have hinted at previously, and as some of the transformations we have already used demonstrate, Spark Streaming offers the capability of building time-based aggregates of data. In contrast with Structured Streaming, the out-of-the-box capabilities of Spark Streaming in this area are limited to processing time, which, if you recall from "The Effect of Time", is the time when the streaming engine processes the events.

In this chapter, we are going to look into the different aggregation capabilities of Spark Streaming. Although they are constrained to the processing-time domain, they provide rich semantics and can help process data in a scalable and resource-efficient way.

Window Aggregations

Aggregation is a frequent pattern in stream data processing, reflecting the difference in concerns between the producers of the data (at the input) and the consumers of the data (at the output).

As discussed in “Window Aggregations”, the concept of a window of data over time can help us to create aggregates that span large periods of time. The Spark Streaming API offers definitions for the two generic window concepts presented in that section, tumbling and sliding windows, and provides specialized reduce functions that operate over windows to limit the amount of intermediate memory required to execute a given aggregation over a period of time.

In the next pages, we are going to explore the windowing capabilities of Spark Streaming:

  • Tumbling windows

  • Sliding windows

  • Window-based reductions

Tumbling Windows

In Spark Streaming, the most basic window definition is the window(<time>) operation on DStreams. This DStream transformation creates a new windowed DStream that we can further transform to implement our desired logic.

Assuming a DStream of hashtags, we can do this with tumbling windows:

val tumblingHashtagFrequency = hashTags.window(Seconds(60))
                                       .map(hashTag => (hashTag,1))
                                       .reduceByKey(_ + _)

With the window operation, which takes place before the map and reduceByKey steps that, as we now know, perform a simple count, we are redefining how our DStream is segmented into RDDs. The original stream, hashTags, follows a strict segmentation according to the batch interval: one RDD per batch.

In this case, we are configuring the new DStream, hashTags.window(Seconds(60)), to contain one RDD per 60 seconds. Every time the clock ticks 60 seconds, a new RDD is created on the cluster’s resources, independently of the previous elements of the same windowed DStream. In that sense, the window is tumbling as explained in “Tumbling Windows”: every RDD is 100% composed of new “fresh” elements read on the wire.

Window Length Versus Batch Interval

Because the creation of a windowed stream is obtained by coalescing the information of several RDDs of the original stream into a single one for the windowed stream, the window interval must be a multiple of the batch interval.

Naturally, any window length that is a multiple of the initial batch interval can be passed as an argument. Hence, this kind of grouped stream allows the user to ask questions about the last minute, 15 minutes, or hour of data; more precisely, about the k-th window interval since the windowed streaming computation started.

One important observation that takes new users by surprise: the window interval is aligned with the start of the streaming application. For example, given a 30-minute window over a DStream with a batch interval of two minutes, if the streaming job starts at 10:11, the windowed intervals will be computed at 10:41, 11:11, 11:41, 12:11, 12:41, and so on.

Sliding Windows

Although knowing which hashtags were the most popular during each 10 minutes of a famous sporting event is interesting for forensics or future predictions, it is not the kind of question one usually asks during that event. Nor is a tumbling window the relevant time frame for detecting anomalies. In such cases, aggregates are usually necessary because the observed values show frequent but small, and therefore individually meaningless, fluctuations that require the context of additional data points for analysis. The price of a stock or the temperature of a component shows small fluctuations that should not be observed individually; the actual trends become visible only by observing a series of recent events.

It is often useful to look at a different type of aggregate that presents data over a relatively large period, while keeping part of it fresh—sliding windows:

val slidingSums = hashTags.window(Seconds(60), Seconds(10))
                          .map(hashTag => (hashTag, 1))
                          .reduceByKey(_ + _)

In this example, we are describing a different type of DStream on which to compute the most frequent hashtags: this time, a new RDD is produced every 10 seconds. In this variant of the window function, the first argument, named windowDuration, determines the length of the window, whereas the second argument, called slideDuration, dictates how often we want to observe a new window of data.

Coming back to the example, the resulting windowed DStream will contain RDDs that present the result of the computation over the latest 60 seconds of data, produced every 10 seconds.

This sliding view makes it possible to implement a monitoring application, and it is not uncommon for the data produced by such a stream to be sent to a dashboard after processing. In that case, naturally, the refresh rate of the dashboard is linked to the slide interval.

Sliding Windows Versus Batch Interval

Again, because the RDDs of the windowed stream are obtained by coalescing the input RDDs of the original DStream, it is clear that the slide interval needs to be a multiple of the batch interval, and that the window interval needs to be a multiple of the slide interval.

For example, using a streaming context with a batch interval of 5 seconds and a base DStream called stream, the expression stream.window(Seconds(30), Seconds(9)) is illegal because the slide interval is not a multiple of the batch interval. A correct window specification would be stream.window(Seconds(30), Seconds(10)). The expression stream.window(Seconds(40), Seconds(25)) is equally invalid, despite the window duration and slide interval both being multiples of the batch interval. That's because the window interval must be a multiple of the slide interval. In this case, stream.window(Seconds(50), Seconds(25)) is a correct definition of window duration and slide interval.

In a nutshell, the batch interval can be seen as an “indivisible atom” of the time interval of windowed DStreams.

Finally, note that the slide interval needs to be no larger than the window length for the computation to make sense. Spark will output runtime errors if one of these constraints is not respected.

Sliding Windows Versus Tumbling Windows

The trivial case of a sliding window is the one in which the slide interval is equal to the window length. In this case, you'll notice that the stream is equivalent to the tumbling window case presented earlier, in which the window function takes only the windowDuration argument. This corresponds precisely to the semantics implemented internally by the window function.

Using Windows Versus Longer Batch Intervals

Looking at simple tumbling windows, you might wonder why it would be necessary to use windowed streams in tumbling mode rather than simply increasing the batch interval: after all, if a user wants the data aggregated per minute, isn't that exactly what the batch interval is made for? There are several counterarguments to this approach:

Multiple aggregation needs

It sometimes happens that users want to see the data aggregated over different increments, which requires looking at the data at more than one frequency. In that case, because the batch interval is indivisible and is the source of the aggregation for the other windows, it is best set to the smallest of the necessary latencies. In mathematical terms, if we want multiple windows over our data, say of durations x, y, and z, we should set our batch interval to their greatest common divisor, gcd(x, y, z). For example, windows of 60, 90, and 150 seconds call for a batch interval of 30 seconds.

Safety and locality

The batch interval is not only the source of the division of a DStream into RDDs. For receiver-based sources, the batch interval also plays a role in how data is replicated and moved across the network. If, for example, we have a cluster of eight machines, each with four cores, and we want on the order of two partitions per core, we should set the block interval so that there are about 64 block intervals per batch. It is the block interval that determines the clock tick at which data received in Spark is considered for replication by the block manager. Hence, when the batch interval grows, the block interval, which should be configured as a fraction of the batch interval, can grow as well, making the system more susceptible to a crash that would compromise data ingestion (e.g., if a receiver machine dies). For example, with a large batch interval of one hour and a block interval of two minutes, the potential data loss in case of a receiver crash is at most two minutes, which, depending on the data frequency, might be inappropriate. We can mitigate this risk by using reliable receivers that rely on the write-ahead log (WAL) to avoid data loss, but that comes at the cost of extra overhead and storage.

For the case in which we want an aggregation over one hour, a tumbling window of one hour based on a source DStream with a batch interval of five minutes gives a block interval of a little under five seconds, which makes for far less potential data loss.

In sum, keeping a batch interval of a reasonable size increases the resiliency of the cluster’s setup.
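
To make this discussion concrete, here is a minimal sketch of how the block interval is configured in practice. The spark.streaming.blockInterval setting is Spark's; the application name and the concrete duration values are illustrative assumptions matching the eight-machine example above:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, StreamingContext}

// Illustrative values: a 5-minute batch interval combined with a block
// interval of roughly 4,700 ms yields about 64 blocks, and therefore about
// 64 partitions, per batch for a single receiver.
val conf = new SparkConf()
  .setAppName("windowed-hashtag-aggregations") // hypothetical application name
  .set("spark.streaming.blockInterval", "4700ms")
val ssc = new StreamingContext(conf, Minutes(5))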

Window Reductions

When building a complex pipeline, we often want to see indicators of our data that inherently depend on various notions of time. For example, we expect to see the count of visitors on our website, or of cars passing through a crossroad, during the past 15 minutes, during the past hour, and during the previous day.

Those three pieces of information can all be computed based on counts on a windowed DStream, which we have already seen in this chapter. Although the window-based functions provide us with the primitives that we need to produce aggregates over different periods of time, they also require us to maintain all of the data for the specified period. For example, to produce a 24-hour aggregation, the window functions we know so far would require us to keep 24 hours’ worth of data in storage (memory and/or disk—depending on the DStream configuration).

Imagine that we want the total of user visits to our site during that 24-hour period. We don’t really need to keep each individual record for 24 hours to then count. Instead, we can have a running count and add new data as it comes in. This is the intuition behind window-based reductions. The provided function—which is assumed to be associative—is applied to each new microbatch of data and then, the result is added to the aggregation maintained by the window DStream. Instead of keeping potentially large amounts of data around, we aggregate it as it comes into the system, providing a scalable aggregation that uses minimal memory resources.

The family of windowed reducer functions in Spark Streaming takes a reducer function together with the window-definition parameters that we learned about earlier. The following sections discuss a few such reducers:

reduceByWindow

reduceByWindow takes a reduce function, the window duration, and the slide duration. The reduce function must combine two elements of the original DStream and produce a new combined element of the same type. slideDuration can be omitted to produce a tumbling window of windowDuration length:

 def reduceByWindow(
      reduceFunc: (T, T) => T,
      windowDuration: Duration,
      slideDuration: Duration
    ): DStream[T]
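
As an illustration, here is a minimal sketch that assumes the hashTags DStream from the earlier examples: it sums the total number of characters seen over the last 60 seconds, recomputed every 10 seconds:

// Sum the lengths of all hashtags observed in the current window.
val totalHashtagCharacters = hashTags
  .map(_.length.toLong)
  .reduceByWindow(_ + _, Seconds(60), Seconds(10))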

reduceByKeyAndWindow

reduceByKeyAndWindow is defined only on pair DStreams, that is, DStreams of (key, value) tuples. It takes parameters similar to those of reduceByWindow, but the reduce function applies to the values of the DStream. This operation is potentially more useful than its sibling reduceByWindow because we can use keys to indicate which values we are dealing with:

def reduceByKeyAndWindow(
      reduceFunc: (V, V) => V,
      windowDuration: Duration,
      slideDuration: Duration
    ): DStream[(K, V)]

Coming back to our example of hashtags, we could compute an aggregate of hashtag frequency over a sliding window by using the reduceByKeyAndWindow function:

val sumFunc: (Int, Int) => Int = _ + _
val reduceAggregatedSums = hashTags
  .map(hashTag => (hashTag, 1))
  .reduceByKeyAndWindow(sumFunc, Seconds(60), Seconds(10))

countByWindow

countByWindow is a specialized form of reduceByWindow in which we are interested only in the count of elements in the DStream over a window of time. It answers the question: how many events were received in the given window?

  def countByWindow(
      windowDuration: Duration,
      slideDuration: Duration): DStream[Long]

countByWindow uses the same windowDuration and slideDuration parameters that we saw defined in “Sliding Windows”.
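
For instance, the following sketch, assuming the same hashTags DStream as before, answers the question "how many hashtags did we receive over the last minute?", refreshed every 10 seconds:

// Count all elements seen in the last 60 seconds, sliding every 10 seconds.
// Note: like other incremental window reductions, this operation requires
// checkpointing to be enabled on the StreamingContext (e.g., ssc.checkpoint(...)).
val hashtagsPerMinute = hashTags.countByWindow(Seconds(60), Seconds(10))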

countByValueAndWindow

countByValueAndWindow is a grouped variant of the countByWindow operation just mentioned.

def countByValueAndWindow(
      windowDuration: Duration,
      slideDuration: Duration,
      numPartitions: Int = ssc.sc.defaultParallelism)
      (implicit ord: Ordering[T] = null) : DStream[(T, Long)]

countByValueAndWindow uses the values in the original DStream as keys for counting. It makes our hashtag example rather trivial:

val hashtagCounts =
  hashTags.countByValueAndWindow(Seconds(60), Seconds(10))

Internally, it is doing similar steps to our earlier example: creating tuples with the form (value, 1L) and then using reduceByKeyAndWindow over the resulting DStream. As the name gives away, we use countByValueAndWindow to count the occurrences of each value in the original DStream for each window specified.

It uses the same windowDuration and slideDuration parameters that we saw defined in "Sliding Windows".

Invertible Window Aggregations

The reduceByWindow and reduceByKeyAndWindow functions have variants that accept an additional, optional argument: the inverse reduce function. This argument matters only if you happen to use an aggregation function that is invertible, meaning that you can "take away" an element from the aggregate.

Formally, the inverse function invReduceFunc is such that for any accumulated value y, and an element x: invReduceFunc(reduceFunc(x, y), x) = y.
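
As a quick illustration of this property, take summation as the aggregation and subtraction as its inverse (a self-contained sketch, not Spark-specific):

val reduceFunc: (Long, Long) => Long = _ + _
val invReduceFunc: (Long, Long) => Long = _ - _

// Removing x from the aggregate of x and y gives y back.
val (x, y) = (3L, 39L)
assert(invReduceFunc(reduceFunc(x, y), x) == y)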

Behind the scenes, this invertibility lets Spark simplify the computation of our aggregates: for each new sliding window, Spark only needs to combine the couple of slide intervals that enter and leave the window, rather than recomputing the aggregate over the full contents of the window.

For example, suppose that we aggregate integer counts, observed with a batch interval of one minute, over a window of 15 minutes that slides every minute. If you do not specify an inverse reduce function, you need to re-add every count over the full 15 minutes to summarize the data that you have seen on your DStream. We sketch this process in Figure 21-1.

Figure 21-1. The aggregation of a reduceByWindow with a noninvertible function

This works, but if we see 100,000 elements in a minute, we are summing 1.5 million data points in the window for every minute—and, more important, we need to store those 1.5 million elements in memory. Can we do better?

We could remember the count over the previous 15 minutes and consider that we have a new minute of data coming in. Because a counting (summation) function is invertible, we can take the count over those previous 15 minutes and subtract the count for the oldest minute of that 15-minute aggregate, resulting in a 14-minute aggregate, to which we just need to add the newest minute of data, giving us the last 15 minutes of data. Figure 21-2 shows this process at work.

Figure 21-2. The aggregation of a reduceByWindow with an invertible function

The interesting part of this is that we do not need to store 1.5 million data points; rather, we only need the intermediate counts for every minute—that is, 15 values.
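
Here is a minimal sketch of what this looks like with reduceByKeyAndWindow, assuming the hashTags DStream and a StreamingContext named ssc from earlier; the checkpoint path is a placeholder. The inverse function is passed right after the reduce function:

// Incremental window reductions keep state across batches and therefore
// require checkpointing to be enabled.
ssc.checkpoint("/tmp/streaming-checkpoints") // hypothetical checkpoint path

val invertibleCounts = hashTags
  .map(hashTag => (hashTag, 1))
  .reduceByKeyAndWindow(
    (a: Int, b: Int) => a + b, // add counts entering the window
    (a: Int, b: Int) => a - b, // subtract counts leaving the window
    Minutes(15),               // window length
    Minutes(1)                 // slide interval
  )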

As you can see, having an invertible function for reduction can be tremendously useful in the case of windowed aggregations. It also makes the various DStreams created by reduceByWindow inexpensive to generate, giving us an excellent way to share information and aggregates on the values that we want to consider when analyzing our stream, even over long aggregation periods.

Caution

The inverse of the aggregation function is useless for tumbling windows, because every aggregation interval is entirely disjoint from the others. It is therefore not worth bothering with this option if you do not use a slide interval!

Slicing Streams

Finally, note Spark’s DStreams also have a selection function called slice, which returns the particular subpart of a DStream that was included between two bounds. You can specify the bounds using a beginning and end Time, which corresponds to Spark’s org.apache.spark.streaming.Time or as an org.apache.spark.streaming.Interval. Both are simple reimplementations of time arithmetic using milliseconds as the base unit—leaving the user with a decent expressivity.

Spark will produce sliced DStreams by letting through the elements that carry the correct timestamp. Note also that slice produces as many RDDs as there are batch intervals between the two boundaries of the DStream.
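
As a small sketch of its use, assume a DStream[String] called stream and two boundary timestamps that we happen to know; the values below are placeholders expressed in milliseconds since the epoch:

import org.apache.spark.streaming.Time

val from = Time(1555000000000L)  // hypothetical lower bound
val until = Time(1555000060000L) // hypothetical upper bound, 60 seconds later
val slicedRDDs = stream.slice(from, until) // the RDDs included in that range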

What If Your Slice Specification Doesn’t Fit Neatly into Batches?

The timing of an RDD’s original batch and the output of the slice might not exactly match if the beginning and end time are not aligned with the original DStream’s batch interval ticks. Any change in timings will be reflected in logs at the INFO level:

INFO Slicing from [fromTime] to [toTime]
  (aligned to [alignedFromTime] and [alignedToTime])

Summary

In this chapter, we looked at the capabilities of Spark Streaming to create and process windows of data from a DStream. You are now able to do the following:

  • Express a tumbling and a sliding window using the DStream API

  • Count elements in a window, including using keys in the data to group the counts

  • Create window-based counts and aggregations

  • Use the optimized reduce versions that exploit function invertibility to drastically reduce the amount of internal memory used

Window aggregations let us observe trends in the data as it unfolds over periods of time much longer than a batch interval. The tools you just learned empower you to apply these techniques in Spark Streaming.
