Chapter 13. Advanced Stateful Operations

Chapter 8 demonstrated how easy it is to express an aggregation in Structured Streaming using the existing aggregation functions in the structured Spark APIs. Chapter 12 showed the effectiveness of Spark’s built-in support for using the embedded time information in the event stream, the so-called event-time processing.

However, there are cases when we need to meet custom aggregation criteria that are not directly supported by the built-in models. In this chapter, we explore how to conduct advanced stateful operations to address these situations.

Structured Streaming offers an API to implement arbitrary stateful processing. This API is represented by two operations: mapGroupsWithState and flatMapGroupsWithState. Both operations allow us to create a custom definition of a state, set up the rules of how this state evolves as new data comes in over time, determine when it expires, and provide us with a method to combine this state definition with the incoming data to produce results.

The main difference between mapGroupsWithState and flatMapGroupsWithState is that the former must produce a single result for each processed group, whereas the latter might produce zero or more results. Semantically, this means that mapGroupsWithState should be used when each processed group always results in a new record to report, whereas flatMapGroupsWithState should be used in all other cases.

Internally, Structured Streaming takes care of managing state between operations and ensures its availability and fault-tolerant preservation during and across executions of the streaming process over time.

Example: Car Fleet Management

Let’s imagine a car fleet management solution in which the vehicles of the fleet are enabled with wireless network capabilities. Each vehicle regularly reports its geographical location and many operational parameters like fuel level, speed, acceleration, bearing, engine temperature, and so on. The stakeholders would like to exploit this stream of telemetry data to implement a range of applications to help them manage the operational and financial aspects of the business.

Using the Structured Streaming features we know so far, we could already implement many use cases, like monitoring kilometers driven per day using event-time windows or finding vehicles with a low-fuel warning by applying filters.
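
As a sketch of what is already within reach, the following hypothetical queries assume a VehicleEvent case class carrying an odometer reading and a fuel level (these names are illustrative and not part of the book's code):

// hypothetical telemetry event for this fleet example
import java.sql.Timestamp
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions._
import spark.implicits._

case class VehicleEvent(vehicleId: String,
  timestamp: Timestamp,
  odometerKm: Double,
  fuelLevel: Double)

val vehicleEvents: Dataset[VehicleEvent] = ...

// kilometers driven per day, using a one-day event-time window
val kmPerDay = vehicleEvents
  .withWatermark("timestamp", "1 hour")
  .groupBy($"vehicleId", window($"timestamp", "1 day"))
  .agg((max($"odometerKm") - min($"odometerKm")).as("kmDriven"))

// vehicles currently reporting a low fuel level, using a plain filter
val lowFuelWarnings = vehicleEvents.filter(_.fuelLevel < 0.1)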

Now, we would like to have the notion of a trip: the driven road segment from a start to a stop. Individually, the notion of a trip is useful to compute fuel efficiency or monitor compliance with geo-fencing agreements. When analyzed in groups, trip information might reveal transportation patterns and traffic hotspots, and, when combined with other sensor information, it can even report road conditions. From our stream-processing perspective, we can see a trip as an arbitrary window that opens when the vehicle starts moving and closes when it finally stops. The event-time window aggregations we saw in Chapter 12 use fixed time intervals as windowing criteria, so they are of no help in implementing our trip analysis.

We can appreciate that we need a more powerful definition of state, one based not purely on time but also on arbitrary conditions. In our example, this condition is that the vehicle is driving.

Understanding Group with State Operations

The arbitrary state operations, mapGroupsWithState and flatMapGroupsWithState, work exclusively on the typed Dataset API using either the Scala or the Java bindings.

Based on the data that we are processing and the requirements of our stateful transformation, we need to provide three type definitions, typically encoded as a case class (Scala) or a Java Bean (Java):

  • The input event (I)

  • The arbitrary state to keep (S)

  • The output (O) (this type might be the same as the state representation, if suitable)

All of these types must be encodable into Spark SQL types. This means that there should be an Encoder available. The usual import statement

import spark.implicits._

is sufficient for all basic types, tuples, and case classes.
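
For types that these implicits do not cover, an Encoder can be supplied explicitly. A minimal sketch, using a hypothetical class that is not a case class and the generic (but less efficient) Kryo-based encoder:

import org.apache.spark.sql.{Encoder, Encoders}

// hypothetical type that is not a case class
class StationProfile(val id: String, val altitude: Double)

implicit val stationProfileEncoder: Encoder[StationProfile] =
  Encoders.kryo[StationProfile]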

With these types in place, we can formulate the state transformation function that implements our custom state handling logic.

mapGroupsWithState requires that this function return a single mandatory value (here, K is the type of the key produced by the groupByKey call that precedes the operation, as we will see shortly):

def mappingFunction(key: K, values: Iterator[I], state: GroupState[S]): O

flatMapGroupsWithState requires that this function return an Iterator, which might contain zero or more elements:

def flatMappingFunction(
    key: K, values: Iterator[I], state: GroupState[S]): Iterator[O]

GroupState[S] is a wrapper provided by Structured Streaming and used internally to manage the state S across executions. Within the function, GroupState provides mutation access to the state and the ability to check and set timeouts.
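
For orientation, this sketch lists the GroupState methods that we rely on in the remainder of this chapter (a selection of the API, not an exhaustive reference):

// state: GroupState[S], as passed into the mapping function
// state.exists                           // is there a stored state for this key?
// state.getOption                        // Option[S]: read the state if present
// state.get                              // S: read the state (throws if absent)
// state.update(newState)                 // replace the state with a new value
// state.remove()                         // evict the state for this key
// state.hasTimedOut                      // was this invocation caused by a timeout?
// state.setTimeoutDuration("30 seconds") // timeout for ProcessingTimeTimeout
// state.setTimeoutTimestamp(tsMillis)    // timeout for EventTimeTimeout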

Warning

The implementation of the mappingFunction/flatMappingFunction must be Serializable.

At runtime, this function is distributed to the executors of the cluster using Java Serialization. This requirement also has the consequence that we must not include any local state like counters or other mutable variables in the body of the function. All managed state must be encapsulated in the State representation class.

Internal State Flow

In Figure 13-1, we illustrate the process that combines the incoming data, in the form of events, with the state maintained internally to produce a result. In this chart, the mappingFunction (denoted with a Σ) applies the custom logic to each group of elements that, when combined with the state managed by GroupState[S], leads to a result. In this illustration, we use the stop symbol to indicate a timeout. In the case of mapGroupsWithState, a timeout also triggers the production of an event and should evict the state. Given that the eviction logic is under the control of the programmed logic, the complete state lifecycle is the responsibility of the developer: Structured Streaming provides only the building blocks.

Figure 13-1. Map groups with state dynamics

Using MapGroupsWithState

In “Sliding windows”, we saw how we can compute a moving average based on a time window. This time-based window would produce a result independently of the number of elements found in the window.

Now, suppose that our requirement is to compute a moving average of the last 10 elements received per key. We cannot use a time window, because we don’t know how long it will take us to have the number of elements we need. Instead, we can define our own count-based window using a custom state with MapGroupsWithState.

Online Resources

For this example, we will use the map_groups_with_state notebook in the online resources for the book, located at http://github.com/stream-processing-with-spark.

Let’s begin with the same streaming Dataset that we used in “Sliding windows”. The WeatherEvent case class becomes our input type (I):

// a representation of a weather station event
import java.sql.Timestamp

case class WeatherEvent(stationId: String,
  timestamp: Timestamp,
  location: (Double, Double),
  pressure: Double,
  temp: Double)

val weatherEvents: Dataset[WeatherEvent] = ...

Next, we define the state (S). What we want is to keep the latest n elements in our state and drop anything older. This seems a natural application of a FIFO (First In, First Out) collection, such as a Queue. Newer elements are added at the end of the queue, we keep the most recent n elements, and drop any older ones.

Our state definition becomes a FIFOBuffer backed by a Queue with a few helper methods to facilitate its usage:

import scala.collection.immutable.Queue
case class FIFOBuffer[T](
    capacity: Int, data: Queue[T] = Queue.empty
    ) extends Serializable {

  // enqueue appends the new element at the end of the queue;
  // takeRight keeps only the most recent `capacity` elements,
  // dropping the oldest ones from the front
  def add(element: T): FIFOBuffer[T] =
    this.copy(data = data.enqueue(element).takeRight(capacity))

  def get: List[T] = data.toList

  def size: Int = data.size
}
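
A quick way to convince ourselves that the eviction works as intended is to exercise the buffer in plain Scala, outside of Spark:

// keep the 3 most recent of 5 added elements
val buffer = FIFOBuffer[Int](capacity = 3)
val filled = (1 to 5).foldLeft(buffer)((buf, i) => buf.add(i))
assert(filled.get == List(3, 4, 5)) // the two oldest elements were dropped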

Then, we need to define the output type (O) that results from the stateful computation. The desired result of our stateful computation is the moving average of the sensor values present in the input WeatherEvent. We also would like to know the time span of the values used for the computation. With this knowledge, we design our output type, WeatherEventAverage:

import java.sql.Timestamp
case class WeatherEventAverage(stationId: String,
                               startTime: Timestamp,
                               endTime: Timestamp,
                               pressureAvg: Double,
                               tempAvg: Double)

With these types defined, we can proceed to create the mappingFunction that combines the existing state and the new elements into a result. We can see the implementation of the mapping function in Example 13-1. Remember that this function is also responsible for updating the internal state through the functions provided by the GroupState wrapper. It’s important to note that the state cannot be updated with a null value. An attempt to do so will throw an IllegalArgumentException. To remove a state, use the method state.remove().

Example 13-1. Using mapGroupsWithState for a count-based moving average
import java.sql.Timestamp
import org.apache.spark.sql.streaming.GroupState
def mappingFunction(
    key: String,
    values: Iterator[WeatherEvent],
    state: GroupState[FIFOBuffer[WeatherEvent]]
  ): WeatherEventAverage = {

  // the size of the window
  val ElementCountWindowSize = 10

  // get current state or create a new one if there's no previous state
  val currentState = state.getOption
    .getOrElse(
      new FIFOBuffer[WeatherEvent](ElementCountWindowSize)
    )

  // enrich the state with the new events
  val updatedState = values.foldLeft(currentState) {
    case (st, ev) => st.add(ev)
  }

  // update the state with the enriched state
  state.update(updatedState)

  // if we have more than two elements, create a WeatherEventAverage
  // from the data collected so far; otherwise, make a zeroed record
  val data = updatedState.get
  if (data.size > 2) {
    val start = data.head
    val end = data.last
    val pressureAvg = data
      .map(event => event.pressure)
      .sum / data.size
    val tempAvg = data
      .map(event => event.temp)
      .sum / data.size
    WeatherEventAverage(
      key,
      start.timestamp,
      end.timestamp,
      pressureAvg,
      tempAvg
    )
  } else {
    WeatherEventAverage(
      key,
      new Timestamp(0),
      new Timestamp(0),
      0.0,
      0.0
    )
  }
}

Now, we use the mappingFunction to declare the stateful transformation of the streaming Dataset:

import org.apache.spark.sql.streaming.GroupStateTimeout
val weatherEventsMovingAverage = weatherEvents
  .groupByKey(record => record.stationId)
  .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout)(mappingFunction)

Note that we first create groups out of the key identifiers in our domain. In this example, this is the stationId. The groupByKey operation creates an intermediate structure, a KeyValueGroupedDataset, that becomes the entry point for the [map|flatMap]GroupsWithState operations.
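
For reference, this is the simplified shape of the operations involved (a sketch of the Scala API signatures, trimmed for readability):

// on Dataset[T]:
//   def groupByKey[K: Encoder](func: T => K): KeyValueGroupedDataset[K, T]
// on KeyValueGroupedDataset[K, V]:
//   def mapGroupsWithState[S: Encoder, U: Encoder](
//       timeoutConf: GroupStateTimeout)(
//       func: (K, Iterator[V], GroupState[S]) => U): Dataset[U]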

Besides the mapping function, we also need to provide a timeout type, which can be either ProcessingTimeTimeout or EventTimeTimeout. Because we are not relying on the timestamp of the events for our state management, we choose ProcessingTimeTimeout. We discuss timeout management in detail later in this chapter.

Finally, we can easily observe the results of the query by using the Console sink:

val outQuery = weatherEventsMovingAverage.writeStream
  .format("console")
  .outputMode("update")
  .start()

+---------+-------------------+-------------------+------------+------------+
|stationId|startTime          |endTime            |pressureAvg |tempAvg     |
+---------+-------------------+-------------------+------------+------------+
|d1e46a42 |2018-07-08 19:20:31|2018-07-08 19:20:36|101.33375295|19.753225782|
|d1e46a42 |2018-07-08 19:20:31|2018-07-08 19:20:44|101.33667584|14.287718525|
|d60779f6 |2018-07-08 19:20:38|2018-07-08 19:20:48|101.59818386|11.990002708|
|d1e46a42 |2018-07-08 19:20:31|2018-07-08 19:20:49|101.34226429|11.294964619|
|d60779f6 |2018-07-08 19:20:38|2018-07-08 19:20:51|101.63191940|8.3239282534|
|d8e16e2a |2018-07-08 19:20:40|2018-07-08 19:20:52|101.61979385|5.0717571842|
|d4c162ee |2018-07-08 19:20:34|2018-07-08 19:20:53|101.55532969|13.072768358|
+---------+-------------------+-------------------+------------+------------+
// (!) output edited to fit in the page

Using FlatMapGroupsWithState

Our previous implementation has a flaw. Can you spot it?

When we start processing the stream, and before we have collected all the elements that we deem required to compute the moving average, the operation of mapGroupsWithState produces zeroed-out values:

+---------+-------------------+-------------------+-----------+-------+
|stationId|startTime          |endTime            |pressureAvg|tempAvg|
+---------+-------------------+-------------------+-----------+-------+
|d2e710aa |1970-01-01 01:00:00|1970-01-01 01:00:00|0.0        |0.0    |
|d1e46a42 |1970-01-01 01:00:00|1970-01-01 01:00:00|0.0        |0.0    |
|d4a11632 |1970-01-01 01:00:00|1970-01-01 01:00:00|0.0        |0.0    |
+---------+-------------------+-------------------+-----------+-------+

As we mentioned earlier, mapGroupsWithState requires the state handling function to produce a single record for each group processed at every trigger interval. This is fine when the arrival of new data corresponding to each key naturally updates its state.

But there are cases for which our state logic requires a series of events to occur before we can produce a result. In our current example, we need n elements before we can start producing an average over them. In other scenarios, a single incoming event might complete several open states at once and therefore produce more than one result. For example, the arrival of a single mass transport vehicle at its destination might update the traveling state of all of its passengers, potentially producing a record for each of them.
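
To make that last scenario concrete, here is a hypothetical sketch (the types and logic are illustrative and not part of this chapter's example): a single arrival event closes the trip of every passenger tracked in the vehicle's state, emitting one record per passenger.

import org.apache.spark.sql.streaming.GroupState

// hypothetical types for the mass-transport scenario
case class ArrivalEvent(vehicleId: String, station: String)
case class TripCompleted(passengerId: String, destination: String)

def passengerTripsFunction(
    key: String,                    // the vehicle id
    values: Iterator[ArrivalEvent],
    state: GroupState[Set[String]]  // ids of the passengers currently on board
  ): Iterator[TripCompleted] = {
  // (boarding events, which would add passengers to the state, are omitted)
  val onBoard = state.getOption.getOrElse(Set.empty[String])
  val arrivals = values.toList
  if (arrivals.nonEmpty && onBoard.nonEmpty) {
    // in this toy model, everyone leaves at the vehicle's final stop
    state.remove()
    val destination = arrivals.last.station
    onBoard.iterator.map(passenger => TripCompleted(passenger, destination))
  } else {
    Iterator.empty
  }
}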

flatMapGroupsWithState is a generalization of mapGroupsWithState in which the state handling function produces an Iterator of results, which might contain zero or more elements.

Let’s see how we can use this function to improve our moving average computation over n-elements.

Online Resources

For this example, we will use the mapgroupswithstate-n-moving-average notebook in the online resources for the book, located at https://github.com/stream-processing-with-spark.

We need to update the mapping function to return an Iterator of results. In our case, this Iterator will contain zero elements when we don’t have enough values to compute the average, and a value otherwise. Our changed function looks like Example 13-2.

Example 13-2. Using FlatMapGroupsWithState for a count-based moving average
import org.apache.spark.sql.streaming._
def flatMappingFunction(
    key: String,
    values: Iterator[WeatherEvent],
    state: GroupState[FIFOBuffer[WeatherEvent]]
  ): Iterator[WeatherEventAverage] = {

  val ElementCountWindowSize = 10

  // get current state or create a new one if there's no previous state
  val currentState = state.getOption
    .getOrElse(
      new FIFOBuffer[WeatherEvent](ElementCountWindowSize)
    )

  // enrich the state with the new events
  val updatedState = values.foldLeft(currentState) {
    case (st, ev) => st.add(ev)
  }

  // update the state with the enriched state
  state.update(updatedState)

  // only when we have enough data, create a WeatherEventAverage from the state
  // before that, we return an empty result.
  val data = updatedState.get
  if (data.size == ElementCountWindowSize) {
    val start = data.head
    val end = data.last
    val pressureAvg = data
      .map(event => event.pressure)
      .sum / data.size
    val tempAvg = data
      .map(event => event.temp)
      .sum / data.size
    Iterator(
      WeatherEventAverage(
        key,
        start.timestamp,
        end.timestamp,
        pressureAvg,
        tempAvg
      )
    )
  } else {
    Iterator.empty
  }
}

val weatherEventsMovingAverage = weatherEvents
  .groupByKey(record => record.stationId)
  .flatMapGroupsWithState(
    OutputMode.Update,
    GroupStateTimeout.ProcessingTimeTimeout
  )(flatMappingFunction)

Using flatMapGroupsWithState, we no longer need to produce artificial zeroed records. In addition, our state management definition is now strict about requiring n elements before producing a result.

Output Modes

Although the difference in result cardinality between the map and the flatMapGroupsWithState operations might seem like a small API detail, it has deeper consequences beyond the variable production of results.

As we can appreciate in the example, flatMapGroupsWithState requires the additional specification of an output mode. This is needed to provide information about the record production semantics of the stateful operation to the downstream process. In turn, this helps Structured Streaming to compute the allowed output operation for the downstream sink.

The output mode specified in flatMapGroupsWithState can be either of the following:

update

This indicates that the records produced are nonfinal. They are intermediate results that might be updated with new information later on. In the previous example, the arrival of new data for a key will produce a new data point. The downstream sink must use update mode, and no aggregations can follow the flatMapGroupsWithState operation.

append

This designates that we have collected all of the information we need to produce a result for a group, and no further incoming events will change that outcome. The downstream sink must use append mode to write. Given that the application of flatMapGroupsWithState produces a final record, it’s possible to apply further aggregations to that result.
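
As a sketch of the append case, reusing the flatMappingFunction and the imports from Example 13-2 (append is only appropriate if the emitted records are truly final):

val completedAverages = weatherEvents
  .groupByKey(record => record.stationId)
  .flatMapGroupsWithState(
    OutputMode.Append,
    GroupStateTimeout.ProcessingTimeTimeout
  )(flatMappingFunction)

// because the emitted records are declared final, a further aggregation
// is permitted; for example, counting complete windows per station
val windowsPerStation = completedAverages
  .groupBy($"stationId")
  .count()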

Managing State Over Time

A critical requirement of managing state over time is to ensure that we have a stable working set.1 That is, the memory required by our process is bounded over time and remains safely below the available memory, leaving headroom for fluctuations.

In the managed stateful aggregations, such as the time-based windows that we saw in Chapter 12, Structured Streaming internally manages mechanisms to evict state and events that are deemed expired in order to limit the amount of memory used. When we use the custom state management capabilities offered by [map|flatMap]GroupsWithState, we must also assume the responsibility of removing old state.

Luckily, Structured Streaming exposes time and timeout information that we can use to decide when to expire certain state. The first step is to decide the time reference to use. Timeouts can be based on event time or processing time and the choice is global to the state handled by the particular [map|flatMap]GroupsWithState being configured.

The timeout type is specified when we call [map|flatMap]GroupsWithState. Recalling the moving average example, we configured the mapGroupsWithState function to use processing time like this:

import org.apache.spark.sql.streaming.GroupStateTimeout
val weatherEventsMovingAverage = weatherEvents
  .groupByKey(record => record.stationId)
  .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout)(mappingFunction)

To use event time, we also need to declare a watermark definition. This definition consists of the timestamp field from the event and the configured lag of the watermark. If we wanted to use event time with the previous example, we would declare it like so:

val weatherEventsMovingAverage = weatherEvents
  .withWatermark("timestamp", "2 minutes")
  .groupByKey(record => record.stationId)
  .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout)(mappingFunction)

The timeout type declares the global source of the time reference. There is also the option GroupStateTimeout.NoTimeout for the cases in which we don’t need timeouts. The actual value of the timeout is managed per individual group, using the methods available in GroupState to manage timeout: state.setTimeoutDuration or state.setTimeoutTimestamp.
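
Both setters are called from within the mapping function and must match the timeout type configured on the operation. A minimal sketch, where lastEventTimeMillis is a hypothetical value extracted from the newest event of the group:

import org.apache.spark.sql.streaming.GroupState

def configureTimeout[S](state: GroupState[S], lastEventTimeMillis: Long): Unit = {
  // with GroupStateTimeout.ProcessingTimeTimeout:
  state.setTimeoutDuration("30 seconds")
  // with GroupStateTimeout.EventTimeTimeout we would instead set an absolute
  // event-time timestamp; calling the setter that does not match the
  // configured timeout type throws an UnsupportedOperationException:
  // state.setTimeoutTimestamp(lastEventTimeMillis + 30 * 1000)
}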

To determine whether a state has expired, we check state.hasTimedOut. When a state has timed out, the mapping function (or flatMapping function) will be invoked with an empty iterator of values for the group that timed out.

Let’s put the timeout feature to use. Continuing with our running example, the first thing we want to do is extract the transformation of state into an output event into its own function:

// the size of the count-based window, now shared by both functions
val ElementCountWindowSize = 10

def stateToAverageEvent(
    key: String,
    data: FIFOBuffer[WeatherEvent]
  ): Iterator[WeatherEventAverage] = {
  if (data.size == ElementCountWindowSize) {
    val events = data.get
    val start = events.head
    val end = events.last
    val pressureAvg = events
      .map(event => event.pressure)
      .sum / data.size
    val tempAvg = events
      .map(event => event.temp)
      .sum / data.size
    Iterator(
      WeatherEventAverage(
        key,
        start.timestamp,
        end.timestamp,
        pressureAvg,
        tempAvg
      )
    )
  } else {
    Iterator.empty
  }
}

Now, we can use that new abstraction to transform our state in the case of a timeout as well as in the usual scenario where data is coming in. Note in Example 13-3 how we use the timeout information to evict the expiring state.

Example 13-3. Using timeouts in flatMapGroupsWithState
import org.apache.spark.sql.streaming.GroupState
def flatMappingFunction(
    key: String,
    values: Iterator[WeatherEvent],
    state: GroupState[FIFOBuffer[WeatherEvent]]
  ): Iterator[WeatherEventAverage] = {
  // first check for timeout in the state
  if (state.hasTimedOut) {
    // when the state has a timeout, the values are empty
    // this validation is only to illustrate the point
    assert(
      values.isEmpty,
      "When the state has a timeout, the values are empty"
    )
    val result = stateToAverageEvent(key, state.get)
    // evict the timed-out state
    state.remove()
    // emit the result of transforming the current state into an output record
    result
  } else {
    // get current state or create a new one if there's no previous state
    val currentState = state.getOption.getOrElse(
      new FIFOBuffer[WeatherEvent](ElementCountWindowSize)
    )
    // enrich the state with the new events
    val updatedState = values.foldLeft(currentState) {
      case (st, ev) => st.add(ev)
    }
    // update the state with the enriched state
    state.update(updatedState)
    state.setTimeoutDuration("30 seconds")
    // only when we have enough data,
    // create a WeatherEventAverage from the accumulated state
    // before that, we return an empty result.
    stateToAverageEvent(key, updatedState)
  }
}

When a timeout actually times out

The semantics of timeouts in Structured Streaming give the guarantee that no state will be timed out before the relevant clock (processing time or the event-time watermark) advances past the set expiration. This follows our intuition of a timeout: our state does not time out before the set expiration time.

Where the timeout semantics depart from common intuition is in when the timeout event actually fires once the expiration time has passed.

Currently, timeout processing is bound to the receipt of new data. A stream that goes silent for a while, and therefore does not generate new triggers to process, will not generate timeouts either. The current timeout semantics are defined in terms of an eventuality: the timeout event will eventually be triggered after the state has expired, without any guarantee about how long after the actual expiration the timeout event will fire. Stated formally: there is no strict upper bound on when the timeout will occur.

Warning

There is work in progress to make timeouts fire even when no new data is available.

Summary

In this chapter, we learned about the arbitrary stateful processing API in Structured Streaming. We explored the details of and the differences between mapGroupsWithState and flatMapGroupsWithState in relation to the records they produce and the output modes they support. Finally, we learned about the timeout settings and their semantics.

Although this API is more complex to use than the regular SQL-like constructs of the structured APIs, it provides us with a powerful toolset to implement arbitrary state management to address the development of the most demanding streaming use cases.

1 Working set refers to the amount of memory used by a process to function over a period of time.
