Chapter 12. Event Time–Based Stream Processing

In “The Effect of Time”, we discussed the effect of time in stream processing from a general perspective.

As we recall, event-time processing refers to looking at the stream of events from the timeline at which they were produced and applying the processing logic from that perspective. When we are interested in analyzing the patterns of the event data over time, it is necessary to process the events as if we were observing them at the time they were produced. To do this, we require the device or system that produces the event to “stamp” the events with the time of creation. Hence, the usual name “timestamp” to refer to a specific event-bound time. We use that time as our frame of reference for how time evolves.

To illustrate this concept, let’s explore a familiar example. Consider a network of weather stations used to monitor local weather conditions. Some remote stations are connected through the mobile network, whereas others, hosted at volunteering homes, have access to internet connections of varying quality. The weather monitoring system cannot rely on the arrival order of the events because that order is mostly dependent on the speed and reliability of the network they are connected to. Instead, the weather application relies on each weather station to timestamp the events delivered. Our stream processing then uses these timestamps to compute the time-based aggregations that feed the weather forecasting system.

The capability of a stream-processing engine to use event time is important because we are usually interested in the relative order in which events were produced, not the sequence in which the events are processed. In this chapter, we learn how Structured Streaming provides seamless support for event-time processing.

Understanding Event Time in Structured Streaming

At the server side, the notion of time is ruled by the internal clock of the computers running any given application. In the case of distributed applications running on a cluster of machines, it is a mandatory practice to use a clock synchronization technique and protocol such as Network Time Protocol (NTP) to align all clocks to the same time. The purpose is that the different parts of a distributed application running on a cluster of computers can make consistent decisions about the timeline and relative ordering of events.

However, when data is coming from external devices, such as sensor networks, other datacenters, mobile phones, or connected cars, to name a few examples, we have no guarantees that their clocks are aligned with our cluster of machines. We need to interpret the timeline of the incoming events from the perspective of the producing system and not in reference to the internal clock of the processing system. Figure 12-1 depicts this scenario.

Figure 12-1. Internal event timeline

In Figure 12-1, we visualize how the time is handled in Structured Streaming:

  • On the x-axis, we have the processing time: the clock time of the processing system.

  • The y-axis represents the event-time timeline, as tracked internally by Structured Streaming.

  • Events are represented as circles, with their corresponding event time labeled next to them.

  • The event arrival time corresponds to the time on the x-axis.

As events arrive in the system, our internal notion of time progresses:

  1. The first event, 00:08, arrives in the system at 00:07, “early” from the point of view of the machine clock. We can appreciate that the internal clock time does not affect our perception of the event timeline.

  2. The event timeline advances to 00:08.

  3. The next batch of events, 00:10, 00:12, and 00:18, arrives for processing. The event timeline moves up to 00:18 because it’s the maximum observed time so far.

  4. 00:15 enters the system. The event timeline remains at its current value of 00:18 because 00:15 is earlier than the current internal time.

  5. Likewise, 00:11 and 00:09 are received. Should we process these events or are they too late?

  6. When the next set of events, 00:14, 00:25, and 00:28, is processed, the streaming clock increases up to their maximum of 00:28.

In general, Structured Streaming infers the timeline of the events processed with event time by keeping a monotonically increasing upper bound of the field declared as timestamp in the events. This nonlinear timeline is the ruling clock used for the time-based processing features in this chapter. The ability of Structured Streaming to understand the time flow of the event source decouples the event generation from the event processing time. In particular, we can replay a sequence of past events and have Structured Streaming produce the correct results for all event-time aggregations. We could, for example, replay a week’s worth of events in a few minutes and have our system produce results consistent with a week period. This would be impossible if time were governed by the computer clock.
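
As a minimal sketch of that replay scenario (the path, schema, and field names here are assumptions made for illustration), reading archived readings back as a file-based stream preserves the embedded event timestamps, so the event-time operations defined later in this chapter yield the same results as live processing would:

import org.apache.spark.sql.types._

// Hypothetical schema of archived weather readings
val weatherSchema = new StructType()
  .add("id", StringType)
  .add("timestamp", TimestampType)
  .add("pressure", DoubleType)
  .add("temperature", DoubleType)

// Replay a week's worth of archived events through the same streaming logic;
// the event-time clock is driven by the timestamp column, not the wall clock
val replayedEvents = spark.readStream
  .schema(weatherSchema)
  .json("/data/weather/archive")   // hypothetical archive location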

Using Event Time

In Structured Streaming, we can take advantage of the built-in support for event time in two areas: time-based aggregation and state management.

In both cases, the first step is to have a field in our data in the right format for Structured Streaming to understand it as a timestamp.

Spark SQL supports java.sql.Timestamp as a Timestamp type. For other base types, we need to first convert the value to a Timestamp before we can use it for event-time processing. In Table 12-1, the initial ts field contains the timestamp of a given type, and we summarize how to obtain the corresponding Timestamp type.

Table 12-1. Obtaining a Timestamp field

ts base type                                            SQL function
Long                                                    $"ts".cast(TimestampType)
String with default format yyyy-MM-dd HH:mm:ss          $"ts".cast(TimestampType) or to_timestamp($"ts")
String with a custom format, e.g. dd-MM-yyyy HH:mm:ss   to_timestamp($"ts", "dd-MM-yyyy HH:mm:ss")

Processing Time

As we discussed in the introduction of this section, we make a distinction between event-time and processing-time processing. Event time relates to the timeline at which events were produced and is independent of the time of processing. In contrast, processing time is the timeline when events are ingested by the engine, and it is based on the clock of the computers processing the event stream. It is the “now” when the events enter the processing engine.

There are cases in which the event data does not contain time information, but we still want to take advantage of the native time-based functions offered by Structured Streaming. In those cases, we can add a processing-time timestamp to the event data and use that timestamp as the event time.

Continuing with the same example, we can add processing-time information using the current_timestamp SQL function:

// Let's assume an existing streaming DataFrame of weather station readings
// (id: String, pressure: Double, temperature: Double)
import org.apache.spark.sql.functions.current_timestamp

// We add a processing-time timestamp
val timeStampEvents = raw.withColumn("timestamp", current_timestamp())
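
Once the processing-time column is attached, it can feed the same time-based features covered in the rest of this chapter. For instance, here is a minimal sketch of a per-minute count of received readings, reusing the variable from the example above:

import org.apache.spark.sql.functions.window

// Count readings per minute, keyed on the processing-time column we just added
// (the window function is covered in "Time-Based Window Aggregations", later on)
val receivedPerMinute = timeStampEvents
  .groupBy(window($"timestamp", "1 minute"))
  .count()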

Watermarks

At the beginning of the chapter, we learned that external factors can affect the delivery of event messages and, hence, when using event time for processing, we have no guarantee of order or delivery. Events might be late or never arrive at all. How late is too late? For how long do we hold partial aggregations before considering them complete? To answer these questions, the concept of watermarks was introduced in Structured Streaming. A watermark is a time threshold that dictates how long we wait for events before declaring that they are too late. Events that are considered late beyond the watermark are discarded.

Watermarks are computed as a threshold based on the internal time representation. As we can appreciate in Figure 12-2, the watermark line is a shifted line from the event-time timeline inferred from the event’s time information. In this chart, we can observe that all events falling in the “gray area” below the watermark line are considered “too late” and will not be taken into consideration in the computations consuming this event stream.

Figure 12-2. Watermark with the internal event timeline

We declare a watermark by linking our timestamp field with the time threshold corresponding to the watermark. Continuing with Table 12-1, we declare a watermark like this:

// Let's assume an existing streaming DataFrame of weather station readings
// (id: String, ts: Long, pressure: Double, temperature: Double)
import org.apache.spark.sql.types.TimestampType

val timeStampEvents = raw.withColumn("timestamp", $"ts".cast(TimestampType))
                         .withWatermark("timestamp", "5 minutes")

Time-Based Window Aggregations

A natural question to pose to a stream of data is for aggregated information over regular intervals of time. Because streams are potentially never-ending, instead of asking “how many X are there?”, in a stream-processing context we are more interested in knowing “how many X were there every 15 minutes?”

With the use of event-time processing, Structured Streaming removes the usual complexity of dealing with intermediate state in the face of the event delivery challenges that we have discussed in this chapter. Structured Streaming takes care of keeping partial aggregates of data and of updating the downstream consumer using the semantics corresponding to the chosen output mode.

Defining Time-Based Windows

We discussed the concept of window-based aggregations in “Window Aggregations”, in which we presented the definitions of tumbling and sliding windows. In Structured Streaming, the built-in event-time support makes it easy to define and use such window-based operations.

From the API perspective, window aggregations are declared using a window function as grouping criteria. The window function must be applied to the field that we want to use as event time.

Continuing with our weather station scenario, in Example 12-1 we compute the average pressure and temperature every minute, totalized across all reporting stations.

Example 12-1. Computing totalized averages
$>val perMinuteAvg = timeStampEvents
  .withWatermark("timestamp", "5 minutes")
  .groupBy(window($"timestamp", "1 minute"))
  .agg(avg($"pressure") as "pressureAvg", avg($"temperature") as "tempAvg")

$>perMinuteAvg.printSchema // let's inspect the schema of our window aggregation

root
 |-- window: struct (nullable = true)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- pressureAvg: double (nullable = true)
 |-- tempAvg: double (nullable = true)

$>perMinuteAvg.writeStream.outputMode("append").format("console").start()
// after a few minutes
+---------------------------------------------+-------------+-------------+
|window                                       |pressureAvg  |tempAvg      |
+---------------------------------------------+-------------+-------------+
|[2018-06-17 23:27:00.0,2018-06-17 23:28:00.0]|101.515516867|5.19433723603|
|[2018-06-17 23:28:00.0,2018-06-17 23:29:00.0]|101.481236804|13.4036089642|
|[2018-06-17 23:29:00.0,2018-06-17 23:30:00.0]|101.534757332|7.29652790939|
|[2018-06-17 23:30:00.0,2018-06-17 23:31:00.0]|101.472349471|9.38486237260|
|[2018-06-17 23:31:00.0,2018-06-17 23:32:00.0]|101.523849943|12.3600638827|
|[2018-06-17 23:32:00.0,2018-06-17 23:33:00.0]|101.531088691|11.9662189701|
|[2018-06-17 23:33:00.0,2018-06-17 23:34:00.0]|101.491889383|9.07050033207|
+---------------------------------------------+-------------+-------------+

In this example, we observe that the resulting schema of a windowed aggregation contains the window period, indicated by the start and end timestamps of each resulting window, together with the corresponding computed values.

Understanding How Intervals Are Computed

The window intervals are aligned to the start of the second/minute/hour/day that corresponds to the next upper time magnitude of the time unit used. For example, a window($"timestamp", "15 minutes") will produce 15-minute intervals aligned to the start of the hour.

The start time of the first interval is in the past to adjust the window alignment without any data loss. This implies that the first interval might contain only a fraction of the usual interval worth of data. So, if we are receiving about 100 messages per second, we expect to see about 90,000 messages in 15 minutes, whereas our first window might be just a fraction of that.

The time intervals in a window are inclusive at the start and exclusive at the end. In interval notation, this is written as [start-time, end-time). Using the 15-minute intervals as defined earlier, a data point that arrives with timestamp 11:30:00.00 will belong to the 11:30-11:45 window interval.
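
A quick way to convince ourselves of these rules is to apply the window function to a small static Dataset. The timestamps below are illustrative, and we assume a SparkSession spark and import spark.implicits._ are in scope, as in the Spark shell:

import java.sql.Timestamp
import org.apache.spark.sql.functions.window

val boundaryChecks = Seq(
  Timestamp.valueOf("2018-06-17 11:29:59.999"),  // last instant of the previous interval
  Timestamp.valueOf("2018-06-17 11:30:00")       // first instant of the 11:30-11:45 interval
).toDF("timestamp")

boundaryChecks
  .select($"timestamp", window($"timestamp", "15 minutes") as "window")
  .show(truncate = false)
// 11:29:59.999 falls in [11:15, 11:30), while 11:30:00.0 opens [11:30, 11:45)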

Using Composite Aggregation Keys

In Example 12-1, we calculated globally aggregated values for the pressure and temperature sensors. We are also interested in computing aggregated values for each weather station. We can achieve that by creating a composite aggregation key where we add the stationId to the aggregation criteria in the same way that we would do that with the static DataFrame API. Example 12-2 illustrates how we do this.

Example 12-2. Computing averages per station
$>val minuteAvgPerStation = timeStampEvents
  .withWatermark("timestamp", "5 minutes")
  .groupBy($"stationId", window($"timestamp", "1 minute"))
  .agg(avg($"pressure") as "pressureAvg", avg($"temperature") as "tempAvg")

// The aggregation schema now contains the station Id
$>minuteAvgPerStation.printSchema
root
 |-- stationId: string (nullable = true)
 |-- window: struct (nullable = true)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- pressureAvg: double (nullable = true)
 |-- tempAvg: double (nullable = true)

$>minuteAvgPerStation.writeStream.outputMode("append").format("console").start()

+---------+-----------------------------------------+-----------+------------+
|stationId|window                                   |pressureAvg|tempAvg     |
+---------+-----------------------------------------+-----------+------------+
|d60779f6 |[2018-06-24 18:40:00,2018-06-24 18:41:00]|101.2941341|17.305931400|
|d1e46a42 |[2018-06-24 18:40:00,2018-06-24 18:41:00]|101.0664287|4.1361759034|
|d7e277b2 |[2018-06-24 18:40:00,2018-06-24 18:41:00]|101.8582047|26.733601007|
|d2f731cc |[2018-06-24 18:40:00,2018-06-24 18:41:00]|101.4787068|9.2916271894|
|d2e710aa |[2018-06-24 18:40:00,2018-06-24 18:41:00]|101.7895921|12.575678298|
   ...
|d2f731cc |[2018-06-24 18:41:00,2018-06-24 18:42:00]|101.3489804|11.372200251|
|d60779f6 |[2018-06-24 18:41:00,2018-06-24 18:42:00]|101.6932267|17.162540135|
|d1b06f88 |[2018-06-24 18:41:00,2018-06-24 18:42:00]|101.3705194|-3.318370333|
|d4c162ee |[2018-06-24 18:41:00,2018-06-24 18:42:00]|101.3407332|19.347538519|
+---------+-----------------------------------------+-----------+------------+
// ** output has been edited to fit into the page

Tumbling and Sliding Windows

window is a SQL function that takes a timeColumn of type TimestampType and additional parameters to specify the duration of the window:

window(timeColumn: Column,
       windowDuration: String,
       slideDuration: String,
       startTime: String)

Overloaded definitions of this method make slideDuration and startTime optional.

This API lets us specify two kinds of windows: tumbling and sliding windows. The optional startTime can delay the creation of the window, for example, when we want to allow the stream throughput to stabilize after a ramp-up period.
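
To make the overloads concrete, here is a sketch of the three call forms side by side (the durations are illustrative):

window($"timestamp", "15 minutes")                            // tumbling window
window($"timestamp", "15 minutes", "5 minutes")               // sliding window
window($"timestamp", "15 minutes", "5 minutes", "2 minutes")  // sliding window with an offset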

Tumbling windows

Tumbling windows segment the time in nonoverlapping, contiguous periods. They are the natural window operation when we refer to a “total count every 15 minutes” or “production level per generator every hour.” We specify a tumbling window by only providing the windowDuration parameter:

window($"timestamp", "5 minutes")

This window definition produces one result every five minutes.

Sliding windows

In contrast with tumbling windows, sliding windows are overlapping intervals of time. The size of the interval is determined by the windowDuration time. All values from the stream in that interval come into consideration for the aggregate operation. For the next slice, we add the elements arriving during slideDuration, remove the elements corresponding to the oldest slice, and apply the aggregation to the data within the window, producing a result at each slideDuration:

window($"timestamp", "10 minutes", "1 minute")

This window definition uses 10 minutes’ worth of data to produce a result every minute.
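
For example, here is a minimal sketch reusing the stream from Example 12-2: a 10-minute moving average per station, recomputed every minute.

import org.apache.spark.sql.functions.{avg, window}

val slidingAvgPerStation = timeStampEvents
  .withWatermark("timestamp", "5 minutes")
  .groupBy($"stationId", window($"timestamp", "10 minutes", "1 minute"))
  .agg(avg($"pressure") as "pressureAvg", avg($"temperature") as "tempAvg")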

It’s worth noting that a tumbling window is a particular case of a sliding window in which windowDuration and slideDuration have equal values:

window($"timestamp", "5 minutes", "5 minutes")

It is illegal to use a slideDuration larger than the windowDuration. Structured Streaming will throw an org.apache.spark.sql.AnalysisException error if such a case occurs.

Interval offset

The third parameter in the window definition, called startTime, provides a way to offset the window alignment. In “Understanding How Intervals Are Computed” we saw that the window intervals are aligned to the upper-next time magnitude. startTime (a misnomer in our opinion) lets us offset the window intervals by the indicated time.

In the following window definition, we offset a 10-minute window with a slide duration of 5 minutes by 2 minutes, resulting in time intervals like 00:02-00:12, 00:07-00:17, 00:12-00:22, ...

window($"timestamp", "10 minutes", "5 minute", "2 minutes")

startTime must be strictly less than slideDuration. Structured Streaming will throw an org.apache.spark.sql.AnalysisException error if an invalid configuration is provided. Intuitively, given that slideDuration provides the periodicity at which the window is reported, we can offset that period only by a time that is less than the period itself.

Record Deduplication

Structured Streaming offers a built-in function that removes duplicate records in the stream. It is possible to specify a watermark that determines when it is safe to discard previously seen keys.

The base form is quite simple:

val deduplicatedStream = stream.dropDuplicates(<field>, <field>, ...)

Nevertheless, this base method is discouraged, as it requires you to store all received values for the set of fields defining a unique record, which can potentially be unbounded.

A more robust alternative involves specifying a watermark on the stream before the dropDuplicates function:

val deduplicatedStream = stream
  .withWatermark(<event-time-field>, <delay-threshold>)
  .dropDuplicates(<field>, <field>, ...)

When using a watermark, keys older than the watermark become eligible for deletion, allowing the state store to keep its storage needs bounded.
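
As a sketch on the weather station stream from earlier, assuming each reading carries a unique identifier (the eventId field here is hypothetical), and keeping the event-time column in the deduplication key so that old state can be evicted:

val dedupedReadings = timeStampEvents
  .withWatermark("timestamp", "5 minutes")
  .dropDuplicates("eventId", "timestamp")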

Summary

In this chapter we explored how Structured Streaming implements the concept of event time and the facilities the API offers to make use of time embedded in the event data:

  • We learned how to use event time and how to fall back on processing time, when needed.

  • We explored watermarks, an important concept that lets us determine which events are too late and when state-related data might be evicted from the store.

  • We saw the different configurations for window operations and their link with event time.

  • Finally, we learned about the deduplication function and how it uses watermarks to keep its state bounded.

Event-time processing is a powerful set of features built into Structured Streaming that encapsulates the complexity of dealing with time, ordering, and lateness into an easy-to-use API.

Nevertheless, there are cases when the built-in functions are not sufficient to implement certain stateful processes. For those cases, Structured Streaming offers advanced functions to implement arbitrary stateful processes, as we see in the next chapter.
