Chapter 8 demonstrated how easy it is to express an aggregation in Structured Streaming using the existing aggregation functions in the structured Spark APIs. Chapter 12 showed the effectiveness of Spark’s built-in support for using the embedded time information in the event stream, the so-called event-time processing.
However, there are cases when we need to meet custom aggregation criteria that are not directly supported by the built-in models. In this chapter, we explore how to conduct advanced stateful operations to address these situations.
Structured Streaming offers an API to implement arbitrary stateful processing. This API is represented by two operations: mapGroupsWithState and flatMapGroupsWithState. Both operations allow us to create a custom definition of a state, set up the rules for how this state evolves as new data comes in over time, determine when it expires, and provide us with a method to combine this state definition with the incoming data to produce results.
The main difference between mapGroupsWithState and flatMapGroupsWithState is that the former must produce a single result for each processed group, whereas the latter might produce zero or more results. Semantically, this means that mapGroupsWithState should be used when new data always results in a new state, whereas flatMapGroupsWithState should be used in all other cases.
Internally, Structured Streaming takes care of managing state between operations and ensures its availability and fault-tolerant preservation during and across executions of the streaming process over time.
Let’s imagine a car fleet management solution in which the vehicles of the fleet are enabled with wireless network capabilities. Each vehicle regularly reports its geographical location and many operational parameters like fuel level, speed, acceleration, bearing, engine temperature, and so on. The stakeholders would like to exploit this stream of telemetry data to implement a range of applications to help them manage the operational and financial aspects of the business.
Using the Structured Streaming features we know so far, we could already implement many use cases, like monitoring kilometers driven per day using event-time windows or finding vehicles with a low-fuel warning by applying filters.
Now, we would like to have the notion of a trip: the driven road segment from a start to a stop. Individually, the notion of a trip is useful to compute fuel efficiency or monitor compliance with geo-fencing agreements. When analyzed in groups, trip information might reveal transportation patterns and traffic hotspots, and, when combined with other sensor information, it can even reveal road conditions. From our stream-processing perspective, we could see trips as an arbitrary window that opens when the vehicle starts moving and closes when it finally stops. The event-time window aggregations we saw in Chapter 12 use fixed time intervals as windowing criteria, so they are of no help to implement our trip analysis.
We can appreciate that we need a more powerful definition of state that is not purely based on time, but also on arbitrary conditions. In our example, this condition is that the vehicle is driving.
The arbitrary state operations, mapGroupsWithState and flatMapGroupsWithState, work exclusively on the typed Dataset API using either the Scala or the Java bindings.
Based on the data that we are processing and the requirements of our stateful transformation, we need to provide three type definitions, typically encoded as a case class (Scala) or a Java Bean (Java):
The input event (I)
The arbitrary state to keep (S)
The output (O) (this type might be the same as the state representation, if suitable)
All of these types must be encodable into Spark SQL types. This means that there should be an Encoder available. The usual import statement

import spark.implicits._

is sufficient for all basic types, tuples, and case classes.
With these types in place, we can formulate the state transformation function that implements our custom state handling logic. In the signatures below, K is the type of the key produced by the groupByKey operation that precedes the stateful operation.
mapGroupsWithState requires that this function returns a single mandatory value:

def mappingFunction(key: K, values: Iterator[I], state: GroupState[S]): O

flatMapGroupsWithState requires that this function returns an Iterator, which might contain zero or more elements:

def flatMappingFunction(key: K, values: Iterator[I], state: GroupState[S]): Iterator[O]
GroupState[S] is a wrapper provided by Structured Streaming and used internally to manage the state S across executions. Within the function, GroupState provides mutation access to the state and the ability to check and set timeouts.
The implementation of the mappingFunction/flatMappingFunction must be Serializable. At runtime, this function is distributed to the executors of the cluster using Java Serialization. This requirement also has the consequence that we must not include any local state like counters or other mutable variables in the body of the function. All managed state must be encapsulated in the state representation class (S).
In Figure 13-1, we illustrate the process that combines the incoming data, in the form of events, with the state maintained internally, to produce a result. In this chart, the mappingFunction (denoted with a Σ) uses the custom logic to process this group of elements, which, when combined with the state managed by GroupState[S], leads to a result. In this illustration, we used the stop symbol to indicate a timeout. In the case of mapGroupsWithState, a timeout also triggers the production of an event and should evict the state.
Given that the eviction logic is under the control of the programmed logic, the complete state management is the responsibility of the developer. Structured Streaming provides only the building blocks.
In “Sliding windows”, we saw how we can compute a moving average based on a time window. This time-based window produces a result independently of the number of elements found in the window.
Now, suppose that our requirement is to compute a moving average of the last 10 elements received per key. We cannot use a time window, because we don’t know how long it will take to collect the number of elements we need. Instead, we can define our own count-based window using a custom state with mapGroupsWithState.
For this example, we will use the map_groups_with_state notebook in the online resources for the book, located at http://github.com/stream-processing-with-spark.
Let’s begin with the same streaming Dataset that we used in “Sliding windows”. The WeatherEvent case class becomes our input type (I):
import java.sql.Timestamp
import org.apache.spark.sql.Dataset

// a representation of a weather station event
case class WeatherEvent(stationId: String,
                        timestamp: Timestamp,
                        location: (Double, Double),
                        pressure: Double,
                        temp: Double)

val weatherEvents: Dataset[WeatherEvent] = ...
Next, we define the state (S). What we want is to keep the latest n elements in our state and drop anything older. This seems a natural application of a FIFO (First In, First Out) collection, such as a Queue. Newer elements are appended to the back of the queue; we keep the most recent n and drop any older element.
Our state definition becomes a FIFOBuffer backed by a Queue with a few helper methods to facilitate its usage:
import scala.collection.immutable.Queue

case class FIFOBuffer[T](capacity: Int, data: Queue[T] = Queue.empty) extends Serializable {
  // enqueue appends at the back; takeRight keeps only the `capacity` most recent elements
  def add(element: T): FIFOBuffer[T] =
    this.copy(data = data.enqueue(element).takeRight(capacity))
  // elements ordered from oldest (head) to newest (last)
  def get: List[T] = data.toList
  def size: Int = data.size
}
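Because enqueue appends at the back of a Scala Queue, keeping the most recent elements means dropping from the front, which is what takeRight(capacity) does; take(capacity) would keep the oldest ones instead. The plain-Scala sketch below runs without Spark, repeats the buffer definition so that it is self-contained, and feeds it more elements than its capacity. The FIFOBufferCheck object and the capacity of 3 are illustrative assumptions.

```scala
import scala.collection.immutable.Queue

// Bounded FIFO buffer: new elements are appended at the back of the queue,
// and takeRight(capacity) drops the oldest ones from the front.
case class FIFOBuffer[T](capacity: Int, data: Queue[T] = Queue.empty) extends Serializable {
  def add(element: T): FIFOBuffer[T] =
    this.copy(data = data.enqueue(element).takeRight(capacity))
  def get: List[T] = data.toList
  def size: Int = data.size
}

// Hypothetical helper used only to exercise the buffer outside of Spark
object FIFOBufferCheck {
  // Feed the values 1 to 12 into a buffer of capacity 3
  def lastThree: FIFOBuffer[Int] =
    (1 to 12).foldLeft(FIFOBuffer[Int](3))((buf, n) => buf.add(n))
}
```

FIFOBufferCheck.lastThree.get holds only the three newest values, with the oldest retained element at the head and the newest at the end, which is why the mapping functions read the start time from head and the end time from last.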
Then, we need to define the output type (O) that results from the stateful computation. The desired result of our stateful computation is the moving average of the sensor values present in the input WeatherEvent. We would also like to know the time span of the values used for the computation. With this knowledge, we design our output type, WeatherEventAverage:
import java.sql.Timestamp

case class WeatherEventAverage(stationId: String,
                               startTime: Timestamp,
                               endTime: Timestamp,
                               pressureAvg: Double,
                               tempAvg: Double)
With these types defined, we can proceed to create the mappingFunction that combines the existing state and the new elements into a result. We can see the implementation of the mapping function in Example 13-1. Remember that this function is also responsible for updating the internal state through the functions provided by the GroupState wrapper.
It’s important to note that the state cannot be updated with a null value. An attempt to do so will throw an IllegalArgumentException. To remove a state, use the method state.remove().
Example 13-1. mapGroupsWithState for a count-based moving average

import java.sql.Timestamp
import org.apache.spark.sql.streaming.GroupState

def mappingFunction(
    key: String,
    values: Iterator[WeatherEvent],
    state: GroupState[FIFOBuffer[WeatherEvent]]
  ): WeatherEventAverage = {

  // the size of the window
  val ElementCountWindowSize = 10

  // get current state or create a new one if there's no previous state
  val currentState = state.getOption
    .getOrElse(
      new FIFOBuffer[WeatherEvent](ElementCountWindowSize)
    )

  // enrich the state with the new events
  val updatedState = values.foldLeft(currentState) {
    case (st, ev) => st.add(ev)
  }

  // update the state with the enriched state
  state.update(updatedState)

  // if we have enough data, create a WeatherEventAverage from the state
  // otherwise, make a zeroed record
  val data = updatedState.get
  if (data.size > 2) {
    val start = data.head
    val end = data.last
    val pressureAvg = data
      .map(event => event.pressure)
      .sum / data.size
    val tempAvg = data
      .map(event => event.temp)
      .sum / data.size
    WeatherEventAverage(key, start.timestamp, end.timestamp, pressureAvg, tempAvg)
  } else {
    WeatherEventAverage(key, new Timestamp(0), new Timestamp(0), 0.0, 0.0)
  }
}
Now, we use the mappingFunction to declare the stateful transformation of the streaming Dataset:

import org.apache.spark.sql.streaming.GroupStateTimeout

val weatherEventsMovingAverage = weatherEvents
  .groupByKey(record => record.stationId)
  .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout)(mappingFunction)
Note that we first create groups out of the key identifiers in our domain. In this example, this is the stationId. The groupByKey operation creates an intermediate structure, a KeyValueGroupedDataset, that becomes the entry point for the [map|flatMap]GroupsWithState operations.
Besides the mapping function, we also need to provide a timeout type. A timeout type can be either a ProcessingTimeTimeout or an EventTimeTimeout. Because we are not relying on the timestamp of the events for our state management, we chose the ProcessingTimeTimeout. We discuss timeout management in detail later in this chapter.
Finally, we can easily observe the results of the query by using the Console sink:

val outQuery = weatherEventsMovingAverage.writeStream
  .format("console")
  .outputMode("update")
  .start()
+---------+-------------------+-------------------+------------+------------+
|stationId|startTime          |endTime            |pressureAvg |tempAvg     |
+---------+-------------------+-------------------+------------+------------+
|d1e46a42 |2018-07-08 19:20:31|2018-07-08 19:20:36|101.33375295|19.753225782|
|d1e46a42 |2018-07-08 19:20:31|2018-07-08 19:20:44|101.33667584|14.287718525|
|d60779f6 |2018-07-08 19:20:38|2018-07-08 19:20:48|101.59818386|11.990002708|
|d1e46a42 |2018-07-08 19:20:31|2018-07-08 19:20:49|101.34226429|11.294964619|
|d60779f6 |2018-07-08 19:20:38|2018-07-08 19:20:51|101.63191940|8.3239282534|
|d8e16e2a |2018-07-08 19:20:40|2018-07-08 19:20:52|101.61979385|5.0717571842|
|d4c162ee |2018-07-08 19:20:34|2018-07-08 19:20:53|101.55532969|13.072768358|
+---------+-------------------+-------------------+------------+------------+
// (!) output edited to fit in the page
Our previous implementation has a flaw. Can you spot it?
When we start processing the stream, and before we have collected all the elements that we deem required to compute the moving average, the operation of mapGroupsWithState produces zeroed-out values:
+---------+-------------------+-------------------+-----------+-------+
|stationId|startTime          |endTime            |pressureAvg|tempAvg|
+---------+-------------------+-------------------+-----------+-------+
|d2e710aa |1970-01-01 01:00:00|1970-01-01 01:00:00|0.0        |0.0    |
|d1e46a42 |1970-01-01 01:00:00|1970-01-01 01:00:00|0.0        |0.0    |
|d4a11632 |1970-01-01 01:00:00|1970-01-01 01:00:00|0.0        |0.0    |
+---------+-------------------+-------------------+-----------+-------+
As we mentioned earlier, mapGroupsWithState requires the state handling function to produce a single record for each group processed at every trigger interval. This is fine when the arrival of new data corresponding to each key naturally updates its state.
But there are cases for which our state logic requires a series of events to occur before we can produce a result. In our current example, we need n elements before we can start producing an average over them. In other scenarios, a single incoming event might complete several temporary states and therefore produce more than one result. For example, the arrival of a single mass transport at its destination might update the traveling state of all of its passengers, potentially producing a record for each of them.
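The mass-transport scenario can be sketched in plain Scala with a step function that has the same shape as a flatMapGroupsWithState state handler, except that the state is passed in and returned explicitly instead of going through GroupState. All names here (TransportEvent, Boarded, ArrivedAtDestination, TripCompleted, TransportTrips.step) are hypothetical and exist only for this illustration.

```scala
// Hypothetical input events for one vehicle (the grouping key)
sealed trait TransportEvent
final case class Boarded(passenger: String) extends TransportEvent
case object ArrivedAtDestination extends TransportEvent

// Hypothetical output record: one per passenger whose trip has completed
final case class TripCompleted(vehicleId: String, passenger: String)

object TransportTrips {
  // One flatMap-style step for a single key: previous state (passengers on board)
  // plus new events in, updated state plus zero or more output records out.
  def step(
      vehicleId: String,
      events: Iterator[TransportEvent],
      onBoard: Set[String]): (Set[String], Iterator[TripCompleted]) = {
    val (remaining, completed) =
      events.foldLeft((onBoard, Vector.empty[TripCompleted])) {
        case ((riders, out), Boarded(p)) =>
          (riders + p, out)
        case ((riders, out), ArrivedAtDestination) =>
          // a single arrival completes the trip of every passenger on board
          val done = riders.toVector.sorted.map(p => TripCompleted(vehicleId, p))
          (Set.empty[String], out ++ done)
      }
    (remaining, completed.iterator)
  }
}
```

Boarding events only grow the state and emit nothing, while a single ArrivedAtDestination event emits one record per passenger and clears the state. With mapGroupsWithState, the handler would instead have to return exactly one value per invocation.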
flatMapGroupsWithState is a generalization of mapGroupsWithState in which the state handling function produces an Iterator of results, which might contain zero or more elements.
Let’s see how we can use this function to improve our moving average computation over n elements.
For this example, we will use the mapgroupswithstate-n-moving-average notebook in the online resources for the book, located at https://github.com/stream-processing-with-spark.
We need to update the mapping function to return an Iterator of results. In our case, this Iterator will contain zero elements when we don’t have enough values to compute the average, and exactly one value otherwise. Our changed function looks like Example 13-2.
import org.apache.spark.sql.streaming._

def flatMappingFunction(
    key: String,
    values: Iterator[WeatherEvent],
    state: GroupState[FIFOBuffer[WeatherEvent]]
  ): Iterator[WeatherEventAverage] = {

  val ElementCountWindowSize = 10

  // get current state or create a new one if there's no previous state
  val currentState = state.getOption
    .getOrElse(
      new FIFOBuffer[WeatherEvent](ElementCountWindowSize)
    )

  // enrich the state with the new events
  val updatedState = values.foldLeft(currentState) {
    case (st, ev) => st.add(ev)
  }

  // update the state with the enriched state
  state.update(updatedState)

  // only when we have enough data, create a WeatherEventAverage from the state
  // before that, we return an empty result.
  val data = updatedState.get
  if (data.size == ElementCountWindowSize) {
    val start = data.head
    val end = data.last
    val pressureAvg = data
      .map(event => event.pressure)
      .sum / data.size
    val tempAvg = data
      .map(event => event.temp)
      .sum / data.size
    Iterator(
      WeatherEventAverage(key, start.timestamp, end.timestamp, pressureAvg, tempAvg)
    )
  } else {
    Iterator.empty
  }
}
val weatherEventsMovingAverage = weatherEvents
  .groupByKey(record => record.stationId)
  .flatMapGroupsWithState(
    OutputMode.Update,
    GroupStateTimeout.ProcessingTimeTimeout
  )(flatMappingFunction)
Using flatMapGroupsWithState, we no longer need to produce artificial zeroed records. In addition to that, our state management definition is now strict in requiring n elements to produce a result.
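The emission pattern of the flatMapGroupsWithState version can be checked with a small plain-Scala simulation of one key across several micro-batches. It is a model, not the Spark API: a Vector stands in for FIFOBuffer, the returned pair stands in for state.update plus the returned Iterator, and the window size of 3 (instead of 10) as well as the names CountWindowSimulation, step, and run are illustrative assumptions.

```scala
// Plain-Scala model of the count-based moving average for a single key.
// The Vector buffer stands in for FIFOBuffer/GroupState; WindowSize is an assumption.
object CountWindowSimulation {
  val WindowSize = 3

  // One trigger: add the new values, keep the newest WindowSize of them,
  // and emit an average only when the window is full (zero or one result).
  def step(buffer: Vector[Double], newValues: Iterator[Double]): (Vector[Double], Iterator[Double]) = {
    val updated = newValues.foldLeft(buffer)((buf, v) => (buf :+ v).takeRight(WindowSize))
    val out =
      if (updated.size == WindowSize) Iterator(updated.sum / updated.size)
      else Iterator.empty
    (updated, out)
  }

  // Runs a sequence of micro-batches and returns what each trigger emitted.
  def run(batches: List[List[Double]]): List[List[Double]] = {
    val (_, emitted) =
      batches.foldLeft((Vector.empty[Double], List.empty[List[Double]])) {
        case ((buf, acc), batch) =>
          val (next, out) = step(buf, batch.iterator)
          (next, acc :+ out.toList)
      }
    emitted
  }
}
```

Feeding one reading per trigger (2.0, 4.0, 6.0, 8.0) yields nothing for the first two triggers and then 4.0 and 6.0, one average per trigger once the window is full; no zeroed placeholder records appear.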
Although the cardinality difference in the results between the mapGroupsWithState and the flatMapGroupsWithState operations might seem like a small practical API difference, it has deeper consequences beyond the obvious variable production of results.
As we can appreciate in the example, flatMapGroupsWithState requires the additional specification of an output mode. This is needed to provide information about the record production semantics of the stateful operation to the downstream process. In turn, this helps Structured Streaming to compute the allowed output operation for the downstream sink.
The output mode specified in flatMapGroupsWithState can be either of the following:
update
This indicates that the records produced are nonfinal. They are intermediate results that might be updated with new information later on. In the previous example, the arrival of new data for a key will produce a new data point. The downstream sink must use update mode, and no aggregations can follow the flatMapGroupsWithState operation.
append
This designates that we have collected all of the information we need to produce a result for a group, and no further incoming events will change that outcome. The downstream sink must use append mode to write. Given that the application of flatMapGroupsWithState produces a final record, it’s possible to apply further aggregations to that result.
A critical requirement of managing state over time is to ensure that we have a stable working set.1 That is, the memory required by our process is bounded over time and remains safely below the available memory, leaving room for fluctuations.
In the managed stateful aggregations, such as the time-based windows that we saw in Chapter 12, Structured Streaming internally manages mechanisms to evict state and events that are deemed expired in order to limit the amount of memory used. When we use the custom state management capabilities offered by [map|flatMap]GroupsWithState, we must also assume the responsibility of removing old state. Luckily, Structured Streaming exposes time and timeout information that we can use to decide when to expire certain state.
The first step is to decide the time reference to use. Timeouts can be based on event time or processing time and the choice is global to the state handled by the particular [map|flatMap]GroupsWithState being configured.
The timeout type is specified when we call [map|flatMap]GroupsWithState. Recalling the moving average example, we configured the mapGroupsWithState function to use processing time like this:
import org.apache.spark.sql.streaming.GroupStateTimeout

val weatherEventsMovingAverage = weatherEvents
  .groupByKey(record => record.stationId)
  .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout)(mappingFunction)
To use event time, we also need to declare a watermark definition. This definition consists of the timestamp field from the event and the configured lag of the watermark. If we wanted to use event time with the previous example, we would declare it like so:
val weatherEventsMovingAverage = weatherEvents
  .withWatermark("timestamp", "2 minutes")
  .groupByKey(record => record.stationId)
  .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout)(mappingFunction)
The timeout type declares the global source of the time reference. There is also the option GroupStateTimeout.NoTimeout for the cases in which we don’t need timeouts.
The actual value of the timeout is managed per individual group, using the methods available in GroupState to manage timeout: state.setTimeoutDuration (for processing-time timeouts) or state.setTimeoutTimestamp (for event-time timeouts). To determine whether a state has expired, we check state.hasTimedOut. When a state has timed out, the mapping function (of either mapGroupsWithState or flatMapGroupsWithState) is called with an empty iterator of values for the group that has timed out.
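The per-group timeout bookkeeping for processing-time timeouts can be modeled in plain Scala. In this simplified sketch, which is not Spark’s implementation, every key that receives data has its deadline pushed to the trigger time plus a timeout duration (playing the role of state.setTimeoutDuration), and a key without new data whose deadline has passed is reported as timed out and evicted; this corresponds to the handler being called with an empty iterator while state.hasTimedOut is true, followed by state.remove(). The 30-second duration and the names TimeoutSimulation, KeyState, and trigger are assumptions.

```scala
// Simplified model of processing-time timeouts for keyed state (not Spark's implementation).
object TimeoutSimulation {
  val TimeoutMs: Long = 30000L // hypothetical 30-second timeout duration

  // Hypothetical per-key state: number of events seen and the processing-time deadline
  final case class KeyState(count: Int, deadlineMs: Long)

  // One trigger at processing time nowMs; batch holds the number of new events per key.
  // Returns the remaining state and the keys that timed out (and were evicted).
  def trigger(
      nowMs: Long,
      batch: Map[String, Int],
      states: Map[String, KeyState]): (Map[String, KeyState], List[String]) = {
    // keys with new data: update the state and push the deadline forward
    val updated = batch.foldLeft(states) { case (acc, (key, n)) =>
      val previous = acc.get(key).map(_.count).getOrElse(0)
      acc.updated(key, KeyState(previous + n, nowMs + TimeoutMs))
    }
    // keys without new data whose deadline has passed are timed out
    val expired = updated.collect {
      case (key, st) if !batch.contains(key) && nowMs > st.deadlineMs => key
    }.toList.sorted
    (updated -- expired, expired)
  }
}
```

Running three triggers at 0, 20, and 40 seconds, where key b only receives data at 0 seconds and key a receives data at 0 and 20 seconds, reports b as timed out at the 40-second trigger, while a survives because its new data moved its deadline to 50 seconds.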
Let’s put the timeout feature to use. Continuing with our running example, the first thing we want to do is extract the transformation of state into event:
// the size of the count-based window, shared by stateToAverageEvent and the mapping function
val ElementCountWindowSize = 10

def stateToAverageEvent(
    key: String,
    data: FIFOBuffer[WeatherEvent]
  ): Iterator[WeatherEventAverage] = {
  if (data.size == ElementCountWindowSize) {
    val events = data.get
    val start = events.head
    val end = events.last
    val pressureAvg = events
      .map(event => event.pressure)
      .sum / data.size
    val tempAvg = events
      .map(event => event.temp)
      .sum / data.size
    Iterator(
      WeatherEventAverage(key, start.timestamp, end.timestamp, pressureAvg, tempAvg)
    )
  } else {
    Iterator.empty
  }
}
Now, we can use that new abstraction to transform our state in the case of a timeout as well as in the usual scenario where data is coming in. Note in Example 13-3 how we use the timeout information to evict the expiring state.
Example 13-3. flatMapGroupsWithState with timeout-based state eviction

import org.apache.spark.sql.streaming.GroupState

def flatMappingFunction(
    key: String,
    values: Iterator[WeatherEvent],
    state: GroupState[FIFOBuffer[WeatherEvent]]
  ): Iterator[WeatherEventAverage] = {
  // first check for timeout in the state
  if (state.hasTimedOut) {
    // when the state has a timeout, the values are empty
    // this validation is only to illustrate the point
    assert(values.isEmpty, "When the state has a timeout, the values are empty")
    val result = stateToAverageEvent(key, state.get)
    // evict the timed-out state
    state.remove()
    // emit the result of transforming the current state into an output record
    result
  } else {
    // get current state or create a new one if there's no previous state
    val currentState = state.getOption
      .getOrElse(
        new FIFOBuffer[WeatherEvent](ElementCountWindowSize)
      )
    // enrich the state with the new events
    val updatedState = values.foldLeft(currentState) {
      case (st, ev) => st.add(ev)
    }
    // update the state with the enriched state
    state.update(updatedState)
    // (re)arm a 30-second processing-time timeout for this key
    state.setTimeoutDuration("30 seconds")
    // only when we have enough data,
    // create a WeatherEventAverage from the accumulated state
    // before that, we return an empty result.
    stateToAverageEvent(key, updatedState)
  }
}
The semantics of the timeouts in Structured Streaming guarantee that no state will time out before the relevant clock (the processing time, or the watermark in the case of event time) advances past the configured timeout. This follows our intuition of a timeout: our state does not time out before the set expiration time.
Where the timeout semantics depart from the common intuition is when the timeout event actually happens after the expiration time has passed.
Currently, the timeout processing is bound to the receiving of new data. So, a stream that goes silent for a while and does not generate new triggers to process will not generate timeouts either. The current timeout semantics are defined in terms of an eventuality: the timeout event will eventually be triggered after the state has expired, without any guarantee about how long after the actual expiration the timeout event will fire. Stated formally: there is no strict upper bound on when the timeout will occur.
There is work in progress to make timeouts fire even when no new data is available.
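The missing upper bound follows directly from the fact that timeouts are only evaluated when a trigger runs. The plain-Scala sketch below makes that arithmetic explicit under the simplifying assumption described in the text: triggers happen only when new data arrives, and each trigger checks every pending deadline. The object and function names are hypothetical.

```scala
// Simplified model: a timeout is observed at the first trigger that runs after the
// deadline. Triggers are assumed to run only when new data arrives.
object TimeoutLateness {
  def firstObservation(deadlineMs: Long, triggerTimesMs: Seq[Long]): Option[Long] =
    triggerTimesMs.sorted.find(_ > deadlineMs)

  // How long after the deadline the timeout is observed, if it is observed at all
  def lateness(deadlineMs: Long, triggerTimesMs: Seq[Long]): Option[Long] =
    firstObservation(deadlineMs, triggerTimesMs).map(_ - deadlineMs)
}
```

With a 30-second deadline and triggers at 10, 25, and 95 seconds, the timeout is observed 65 seconds late; if the stream goes silent after the 25-second trigger, it is not observed at all, so the delay depends entirely on when the next trigger runs.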
In this chapter, we learned about the arbitrary stateful processing API in Structured Streaming. We explored the details of, and differences between, mapGroupsWithState and flatMapGroupsWithState with regard to the events produced and the output modes supported. At the end, we also learned about the timeout settings and became aware of their semantics.
Although this API is more complex to use than the regular SQL-like constructs of the structured APIs, it provides us with a powerful toolset for implementing arbitrary state management in the most demanding streaming use cases.
1 Working set is a concept that refers to the amount of memory used by a process to function over a period of time.