Chapter 22. Arbitrary Stateful Streaming Computation

So far, we have seen how Spark Streaming can work on the incoming data independently of past records. In many applications, we are also interested in analyzing the evolution of the data that arrives with respect to older data points. We could also be interested in tracking changes generated by the received data points. That is, we might be interested in building a stateful representation of a system using the data that we have already seen.

Spark Streaming provides several functions that let us build and store knowledge about previously seen data as well as use that knowledge to transform new data.

Statefulness at the Scale of a Stream

Functional programmers favor stateless functions: functions whose return values are independent of the state of the world outside their definition, caring only about the value of their input.

However, a function can be stateless and care only about its input, yet still maintain a notion of a managed value along with its computation, without breaking any rules of functional purity. The idea is that this value, representing some intermediate state, is threaded through the traversal of one or several of the computation’s arguments, keeping a running record as the structure of those arguments is traversed.

For example, the reduce operation discussed in Chapter 17 keeps one single value updated along the traversal of whichever RDD it was given as an argument:

val streamSums = stream.reduce {
  case (accum, x) => (accum + x)
}

Here, the intermediate sum for each RDD along the input DStream is computed by iterating over the elements of the RDD, from left to right, while keeping an accumulator updated. The update operation, given between the braces, returns the new value of the accumulator.

updateStateByKey

Sometimes it is useful to compute some result that depends on the previous elements of the stream, which took place more than one batch before the current one. Examples of such cases include the following:

  • The running sum of all elements of the stream

  • The number of occurrences of a specific, marker value

  • The highest elements encountered in the stream, given a particular ordering of the elements of the stream

This computation can often be thought of as the result of a big reduce operation that updates some representation of the computation’s state along the traversal of the stream. In Spark Streaming, this is offered by the updateStateByKey function:

  def updateStateByKey[S: ClassTag](
      updateFunc: (Seq[V], Option[S]) => Option[S]
    ): DStream[(K, S)]

updateStateByKey is an operation that is defined only on DStreams of key–value pairs. It takes a state update function as an argument. This state update function should have the following type:

  (Seq[V], Option[S]) => Option[S]

This type reflects how the update operation takes a sequence of new values of type V, corresponding to all of the values for a given key that arrived during the current batch, and an optional state represented by type S. It then computes and returns the new state as Some(state) if there is one to return, or None if there is no new state, in which case the stored state corresponding to this key is deleted from the internal representation:

def updateFunction(values: Seq[Int], runningCount: Option[Int]): Option[Int] = {
  val newCount = runningCount.getOrElse(0) + values.count(_ > 5)
  if (newCount > 0)
    Some(newCount)
  else
    None
}

The state update function is called, on each batch, for all of the keys encountered since the beginning of the stream’s processing. For a new key that has never been seen before, the second argument of the update function, the state, is None. For a key that received no new values in the current batch, the first argument of the update function, the sequence of new values, is Nil.

Finally, updateStateByKey returns a value (that is, a snapshot of the new state for a particular key) only when the user’s update function mandates it. This explains the Option in the return type: in the previous example, we create or update state only when we actually encounter integers greater than five. If a particular key receives only values of five or less, no state is created for this key, and correspondingly no update takes place.
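As a minimal sketch of how such an update function could be wired in (the DStream name pairs is a placeholder, not part of the original example):

import org.apache.spark.streaming.dstream.DStream

// Hypothetical keyed input, for instance (sensorID, reading) pairs.
// val pairs: DStream[(String, Int)] = ...

// At every batch interval, runningCounts carries a snapshot of the per-key
// count of values greater than five seen so far.
val runningCounts: DStream[(String, Int)] = pairs.updateStateByKey(updateFunction _)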

Figure 22-1 depicts the dynamics of the internal state preserved when using a stateful computation such as updateStateByKey. The intermediate state of the stream is kept in an internal state store. At each batch interval, the internal state is combined with the new data coming from the stream using the updateFunc function, producing a secondary stream with the current results of the stateful computation.

Figure 22-1. The data flow produced by updateStateByKey

Mandatory Checkpointing on Stateful Streams

Note that when starting the StreamingContext for this application, Spark might output the following error:

java.lang.IllegalArgumentException: requirement failed: The checkpoint directory has not been set. Please set it by StreamingContext.checkpoint()

The reason for this is that the StateDStream created under the hood by updateStateByKey consists of RDDs that each depend on the previous one, meaning that the only way to reconstruct the chain of partial results at each batch interval for each key would be to replay the entire stream. This does not play well with fault tolerance, because we would need to preserve every record ever received to be able to re-create the state at some arbitrary point in time. Instead of preserving all records, we periodically save the intermediate state to disk. If one of the executors working on this stream crashes, it can recover from this intermediate state.

Luckily, the error tells us exactly how to do so, using ssc.checkpoint("path/to/checkpoint/dir"). Replace the content of the String passed as an argument with a directory in a shared filesystem, accessible by the driver and all executors that are part of the job.
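For example (a minimal sketch; the directory is a placeholder, and we assume the StreamingContext is called ssc):

// Any filesystem reachable by both the driver and the executors will do
// (HDFS, S3, a shared mount, and so on).
ssc.checkpoint("hdfs:///checkpoints/stateful-streaming-app")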

Limitations of updateStateByKey

The updateStateByKey function that we have described so far allows us to do stateful programming using Spark Streaming. It allows us to encode, for example, the concept of user sessions—for which no particular batch interval is a clear match to the application at hand. However, there are two problems with this approach. Let’s look at those a bit more closely.

Performance

The first problem is related to performance: the updateStateByKey function is run on every single key encountered by the application since the beginning of its run, whether or not that key received new data in the current batch. This is problematic because, even on a dataset that is somewhat sparse, provided there is a long tail in the variety of the data and in particular of its keys, the total amount of state held in memory grows indefinitely.

For example, if a key or a particular user is seen on a website at the beginning of the run of the application, what is the relevance of updating the state of that user to signify that we have not seen a session from this particular individual since the beginning of the application (e.g., since last month)? The benefits to the application are not clear.

Memory Usage

The second issue is that, because the state should not grow indefinitely, the programmer must do the memory bookkeeping themselves, writing code that decides, for every key, whether it is still relevant to keep state for that particular element. This amounts to manual memory management.

Indeed, for most stateful computations, dealing with state is a simple decision: either a key is still relevant, such as a user who has visited a website within a certain time frame, or it hasn’t been refreshed in a while and can safely be dropped.

Introducing Stateful Computation with mapWithState

mapWithState is a better model for stateful updates in Spark that overcomes the two shortcomings just described: the update function is invoked only for keys that receive new data, and a timeout can be set to limit the size of the state built up over the course of the computation. It was introduced in Spark 1.6:

  def mapWithState[StateType: ClassTag, MappedType: ClassTag](
      spec: StateSpec[K, V, StateType, MappedType]
    ): MapWithStateDStream[K, V, StateType, MappedType]

mapWithState requires you to provide a StateSpec object, built from a function that operates on a key, an optional new value, and a State object. Even though this is apparently more complex, because it involves a few explicit types, it simplifies a number of elements:

  • The programmer operates on values one by one rather than as a list

  • The update function has access to the key itself

  • This update is run only on keys that have a new value in the current batch

  • Updating the state is an imperative call to a method of the state object, rather than the implicit act of producing an output

  • The programmer can now produce an output independently of state management

  • State entries can be expired automatically through a configurable timeout

Figure 22-2 presents the data flow when using the mapWithState function.

Figure 22-2. The data flow produced by mapWithState

If you want to see a snapshot of the state for each and every key on each and every batch interval, call the .stateSnapshots() function on your particular DStream created by mapWithState.
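For instance, assuming statefulStream is the result of such a mapWithState call (the name is ours, used only for illustration):

// A DStream of (key, state) pairs: the full state snapshot at each batch interval
val snapshots = statefulStream.stateSnapshots()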

mapWithState and updateStateByKey: When to Use Which

mapWithState is more performant and nicer to use than updateStateByKey, and it is indeed a good default choice for stateful computation. One caveat, however, is that the model for pushing data out of the state representation (that is, flushing state data) is specifically a timeout, and is no longer under the user’s direct control. As a consequence, mapWithState is particularly appropriate if you want to keep state under a freshness condition (e.g., the number of clicks during a time-bound session of a web user). We would prefer updateStateByKey for the niche cases in which we absolutely want to guarantee maintaining a small amount of state over a long period of time.

An example is flood watching: if we are dealing with sensors reporting water heights at specific locations near a river, and we want to keep the largest values ever observed over the length of our observation, it might make sense to use updateStateByKey rather than mapWithState.

With mapWithState, the computation on elements of our stream can start as events are being received and, ideally, completes shortly after the last few structural events are received. We are going to see an example of this in the next few pages.

Using mapWithState

mapWithState requires the user to provide a StateSpec object, which describes the workings of the state computation. The core piece of this is, naturally, the function that takes in new values for a given key and returns an output as well as updates the state for this key. In fact, the builder object for this StateSpec makes this function mandatory.

This StateSpec is parameterized by four types:

  • Key type K

  • Value type V

  • State type S, representing the type used to store the state

  • Output type U

In its most general form, the StateSpec builder, StateSpec.function, requires a (Time, K, Option[V], State[S]) => Option[U] argument, or a (K, Option[V], State[S]) => Option[U] if you don’t need the timestamp of the current batch that mapWithState provides.

The State type that appears in the definition of this function can be seen as a mutable cell with support for timeouts. You can query it by using state.exists() and state.get(), or treat it like an Option with state.getOption(); you can check whether it is timing out by using state.isTimingOut(), erase it by using state.remove(), or update it by using state.update(newState: S).
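The following is a small illustrative update function, not taken from the original example, that exercises this API by counting the number of records seen per key:

import org.apache.spark.streaming.State

def countOccurrences(
    key: String,
    value: Option[Int],
    state: State[Int]): Option[(String, Int)] = {
  if (state.isTimingOut()) {
    // The key received no data before the timeout: Spark is about to drop its
    // state, and update()/remove() must not be called in this invocation.
    None
  } else {
    val seen = state.getOption.getOrElse(0) + (if (value.isDefined) 1 else 0)
    state.update(seen)  // imperative update of the stored state
    Some(key -> seen)   // the output is produced independently of state management
  }
}

// To use it, wrap it in a StateSpec, for example:
// StateSpec.function(countOccurrences _).timeout(Minutes(30))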

Let’s assume that we’re monitoring a plant with sensors and we want both the average temperature over the last batch and a straightforward way of detecting anomalous temperatures. For this exercise, let’s define an anomalous temperature to be higher than 80 degrees:

import org.apache.spark.streaming.{Seconds, State, StateSpec}

case class Average(count: Int, mean: Float){
  def ingest(value: Float) =
    Average(count + 1, mean + (value - mean) / (count + 1))
}

def trackHighestTemperatures(sensorID: String,
    temperature: Option[Float],
    average: State[Average]): Option[(String, Float)] = {
  val oldAverage = average.getOption.getOrElse(Average(0, 0f))
  temperature.foreach { t => average.update(oldAverage.ingest(t)) }
  temperature.collect {
    case t if t >= 80f => (sensorID, t)
  }
}

val highTempStateSpec = StateSpec.function(trackHighestTemperatures _)
                                 .timeout(Seconds(3600))

In this function, we retrieve the previous running average (or a zero-valued default), fold the latest temperature into it, and compare that temperature against the threshold, routing the results, respectively, to the state update and to our output value. This lets us exploit two streams:

  • temperatureStream.mapWithState(highTempStateSpec), which tracks high temperatures as they occur

  • temperatureStream.mapWithState(highTempStateSpec).stateSnapshots(), which tracks the mean temperatures for each sensor

If a sensor stops emitting for 60 minutes, its state is removed automatically, preventing the state storage explosion that we feared. Note that we could have used the explicit remove() function for this, as well.

There is, however, an issue with this: for the first few values of a sensor, we compare its readings to a low default average, which might not be appropriate for that particular sensor. We might flag as temperature spikes readings that are perfectly normal for this sensor, simply because we do not have any values for it yet.

In this case, we have the opportunity to provide initial values for our sensors, using highTempStateSpec.initialState(initialTemps: RDD[(String, Average)]).
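A sketch of what that could look like, assuming a SparkContext named sc and a couple of invented per-sensor baselines (the names and values are arbitrary):

import org.apache.spark.rdd.RDD

// Hypothetical warm-start state: one prior Average per known sensor.
val initialTemps: RDD[(String, Average)] = sc.parallelize(Seq(
  "sensor-roof"   -> Average(10, 21.5f),
  "sensor-boiler" -> Average(10, 76.0f)
))

val warmStartSpec = highTempStateSpec.initialState(initialTemps)
val highTemps = temperatureStream.mapWithState(warmStartSpec)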

Event-Time Stream Computation Using mapWithState

An auxiliary benefit of mapWithState is that it lets us efficiently and explicitly store data about the past in its State object. This can be very useful in order to accomplish event-time computing.

Indeed, elements seen “on the wire” in a streaming system can arrive out of order, be delayed, or even arrive exceptionally fast with respect to other elements. Because of this, the only way to be sure that we are dealing with data elements that have been generated at a very specific time is to timestamp them at the time of generation. A stream of temperatures in which we are trying to detect spikes, as in our previous example, could confuse temperature increases and temperature decreases if some events arrive in inverse order, for example.

Note

Although event-time computation is supported natively by Structured Streaming as we saw in Chapter 12, you can programmatically implement it in Spark Streaming using the technique described here.

However, if we aim to process events in order, we need to be able to detect and invert the misordering by reading the timestamps present on data elements seen on our stream. To perform this reordering, we need to have an idea of the order of magnitude of the delays (in one direction or another) that we can expect to see on our stream. Indeed, without this boundary on the scope of the reordering, we would need to wait indefinitely to be able to compute a final result on a particular period of time: we could always receive yet another element that would have been delayed.

To deal with this practically, we are going to define a watermark, that is, the maximum length of time for which we are going to wait for straggler elements. In the spirit of Spark Streaming’s notion of time, it should be a multiple of the batch interval. After this watermark, we are going to “seal” the result of the computation and ignore elements that are delayed by more than the watermark.

Caution

A spontaneous approach to dealing with this misordering would be windowed streams: define a window interval equal to the watermark, make it slide by exactly one batch, and define a transformation that orders elements by their timestamp.

This is correct insofar as it results in a correct view of the ordered elements as soon as they are past the first watermark interval. However, it requires the user to accept an initial delay equal to the watermark before seeing the results of the computation. It is plausible, though, for the watermark to be an order of magnitude larger than the batch interval, and such latency would not be acceptable for a system like Spark Streaming, which already incurs relatively high latency because of its microbatching approach.

A good event-time streaming solution would allow us to compute based on a provisional view of the events of the stream and then update this result if and when delayed elements arrive.

Suppose that we have a notion of a circular buffer, a fixed-size vector of size k, that contains the last k elements it has received:

import scala.collection.immutable

object CircularBuffer {
  def empty[T](): CircularBuffer[T] = immutable.Vector.empty[T]
}

implicit class CircularBuffer[T](v: Vector[T]) extends Serializable {
  val maxSize = 4
  def get(): Vector[T] = v
  // Append an item, dropping the oldest element once maxSize is reached.
  def addItem(item: T): CircularBuffer[T] =
    v.drop(Math.min(v.size, v.size - maxSize + 1)) :+ item
}

This class wraps an inner vector of at most maxSize elements, retaining only the most recent additions.
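As a quick illustration of the intended behavior (the values are arbitrary):

val buffer = (1 to 6).foldLeft(CircularBuffer.empty[Int]()) {
  (buf, i) => buf.addItem(i)
}
buffer.get()  // Vector(3, 4, 5, 6): only the four most recent items remain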

Let’s now assume that we are tracking the average temperature over each of the last four batches, with a batch interval of five seconds:

import org.apache.spark.streaming.{State, Time}

type CB = CircularBuffer[(Time, Average)]

def batch(t: Time): Long = t.milliseconds / 5000

def trackTempStateFunc(
  batchTime: Time,
  sensorName: String,
  value: Option[(Time, Float)],
  state: State[CB]): Option[(String, Time, Float)] = {

  value.flatMap { case (t: Time, temperature: Float) =>
    if (batch(t) <= batch(batchTime)) { // this element is in the past
      val newState: CB =
        state.getOption.fold(Vector((t, Average(1, temperature))): CB) { c =>
          val (before, hereOrAfter) =
            c.get.partition { case (timeStamp, _) => batch(timeStamp) < batch(t) }
          (hereOrAfter.toList match {
            case (tS, avg: Average) :: tl if batch(tS) == batch(t) =>
              (tS, avg.ingest(temperature)) :: tl
            case l => (t, Average(1, temperature)) :: l
          }).toVector.foldLeft(before: CB) { case (cB, item) => cB.addItem(item) }
        }
      state.update(newState) // update the State
      // output the new average temperature for the batch that was updated
      newState.get.find { case (tS, _) => batch(tS) == batch(t) }.map {
        case (tS, avg) => (sensorName, tS, avg.mean)
      }
    } else None // this element is from the future! ignore it.
  }
}

In this function, our State is a circular buffer of up to four cells, each containing the running average for one batch. Here we are using mapWithState in the variant that takes the current batch time as an argument. We use the batch function to make batch comparisons sensible: if t1 and t2 fall within the same batch, we expect batch(t1) == batch(t2).

We begin by examining our new value and its event time. If that event time’s batch is beyond the current batch time, there is an error in our wall clock or the event time. For this example, we return None, but we could log an error as well. If the event is in the past, we need to find which batch it belongs to. For that, we use Scala’s partition function on the batches of each cell of our CircularBuffer State and separate the elements coming from before our element’s batch from those coming from the same batch or after.

We then look at whether there was already an average initialized for the batch of our event that we should find at the head of the latter list (thanks to our partition). If there is one, we add the new temperature to it; otherwise, we make an average out of our single element. Finally, we take the batches from before the current element’s time and add all the posterior batches to it in order. The CircularBuffer natively ensures that we retain only the latest elements if there are more than our threshold (four).

As the last step, we look up the updated average in the cell corresponding to our new element’s batch (if it is still there; it might have been evicted as too old) and output the new average if so. As a consequence, the mapWithState stream that we can create on a DStream of (String, (Time, Float)) elements (with the sensor name as a key, and the timestamped temperature as a value) updates the averages for the last batches we received, from the very first batch onward.
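A sketch of that wiring follows, where timestampedTemperatures stands for such a keyed, timestamped DStream and the one-hour timeout is our own assumption:

import org.apache.spark.streaming.{Seconds, StateSpec}

// Hypothetical input: (sensorName, (eventTime, temperature)) records.
// val timestampedTemperatures: DStream[(String, (Time, Float))] = ...

val tempStateSpec = StateSpec.function(trackTempStateFunc _)
                             .timeout(Seconds(3600)) // drop sensors silent for an hour

// Emits (sensorName, batchTimestamp, updatedMean) triples as elements arrive,
// including corrections for late elements still within the buffer's reach.
val eventTimeAverages = timestampedTemperatures.mapWithState(tempStateSpec)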

Naturally, trackTempStateFunc takes time linear in the size of our CircularBuffer, a consequence of the simplicity we aimed for in this example. Note, however, that we are dealing with a structure ordered by timestamp, and that a different data structure, such as a skip list, would gain us a lot in processing speed and make this approach scalable.

In sum, mapWithState, with its powerful state update semantics, its parsimonious timeout semantics, and the versatility brought by stateSnapshots(), gives us a powerful tool to represent basic event-time processing in a few lines of Scala.
