Chapter 15. Experimental Areas: Continuous Processing and Machine Learning

Structured Streaming first appeared in Spark 2.0 as an experimental API, offering a new streaming model aimed at simplifying the way we think about streaming applications. In Spark 2.2, Structured Streaming “graduated” to production-ready, signaling that this new model was ready for industry adoption. Spark 2.3 brought further improvements to streaming joins and introduced a new experimental continuous execution model for low-latency stream processing.

As with any new successful development, we can expect Structured Streaming to keep advancing at a fast pace. Although industry adoption will contribute evolutionary feedback on important features, market trends such as the increasing popularity of machine learning will drive the roadmap of the releases to come.

In this chapter, we want to provide insights into some of the areas under development that will probably become mainstream in upcoming releases.

Continuous Processing

Continuous processing is an alternative execution mode for Structured Streaming that allows for low-latency processing of individual events. It has been included as an experimental feature in Spark v2.3 and is still under active development, in particular in the areas of delivery semantics, stateful operation support, and monitoring.

Understanding Continuous Processing

The initial streaming API for Spark, Spark Streaming, was conceived with the idea of reusing the batch capabilities of Spark. In a nutshell, the data stream is split into small chunks that are given to Spark for processing, using the core engine in its native batch execution mode. Through a scheduled repetition of this process at short time intervals, the input stream is constantly consumed and results are produced in a streaming fashion. This is called the microbatch model, which we discussed early on in Chapter 5. We study the application of this model in more detail when we talk about Spark Streaming in the next part of the book. The important part to remember for now is that the definition of that interval of time, called the batch interval, is the keystone of the original microbatch implementation.

Microbatch in Structured Streaming

When Structured Streaming was introduced, a similar evolution took place. Instead of introducing an alternative processing model, Structured Streaming was embedded into the Dataset API and reused the existing capabilities of the underlying Spark SQL engine. As a result, Structured Streaming offers a unified API with the more traditional batch mode and fully benefits from the performance optimizations introduced by Spark SQL, such as query optimization and Tungsten’s code generation.

In that effort, the underlying engine received additional capabilities to sustain a streaming workload, like incremental query execution and the support of resilient state management over time.

At the API surface level, Structured Streaming avoided making the notion of time an explicit user-facing parameter. This is what allows for the implementation of event-time aggregations, as the notion of time is inferred from the data stream, instead. Internally, the execution engine still relies on a microbatch architecture, but the abstraction of time allows for the creation of engines with different models of execution.

The first execution model that departs from the fixed-time microbatch is the best-effort execution in Structured Streaming, which is the default mode when no trigger is specified. In best-effort mode, the next microbatch starts as soon as the previous one ends. This creates the observable continuity of the resulting stream and improves the usage of the underlying computing resources.
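To make the difference between trigger configurations concrete, the following minimal sketch, which uses the built-in rate source purely for illustration, starts the same query once in best-effort mode and once with a fixed-interval trigger (the 10-second interval is an arbitrary choice):

import org.apache.spark.sql.streaming.Trigger

// assuming an active SparkSession in scope as `spark`
val rateStream = spark.readStream
    .format("rate")
    .option("rowsPerSecond", "5")
    .load()

// best-effort mode: no trigger is specified, so the next microbatch
// starts as soon as the previous one ends
val bestEffortQuery = rateStream.writeStream
    .format("console")
    .queryName("best_effort")
    .start()

// fixed-interval microbatch: a new batch is scheduled every 10 seconds
val fixedIntervalQuery = rateStream.writeStream
    .format("console")
    .queryName("fixed_interval")
    .trigger(Trigger.ProcessingTime("10 seconds"))
    .start()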

The microbatch execution engine uses a task-dispatching model. Task dispatching and coordination over the cluster are rather expensive, so the minimal latency possible is roughly 100 ms.

In Figure 15-1, you can observe how the process of filtering the incoming “circles” and transforming them to “triangles” works with the microbatch model. We collect all elements that arrive in a certain interval and apply our function f to all of them at the same time.

Figure 15-1. Microbatch latency

The processing latency is the time duration between the arrival of the event in the source stream and the production of a result in the sink. As we can appreciate, in the microbatch model, the latency upper limit is the batch interval plus the time it takes to process the data, which consists of the computation itself and the coordination needed to execute such computation in some executor of the cluster.

Introducing continuous processing: A low-latency streaming mode

Taking advantage of the time abstraction in Structured Streaming, it is possible to introduce new modes of execution without changing the user-facing API.

In the continuous processing execution mode, the data-processing query is implemented as a long-running task that executes continuously on the executors. The parallelism model is simple: for each input partition, we will have such a task running on a node of the cluster. This task subscribes to the input partition and continuously processes incoming individual events. The deployment of a query under continuous processing creates a topology of tasks in the cluster.

As illustrated in Figure 15-2, this new execution model eliminates the microbatch delay and produces a result for each element as soon as it’s processed. This model is similar to the one implemented by Apache Flink.

Figure 15-2. Continuous processing latency

Using Continuous Processing

All that is required to use the continuous processing execution mode is to specify Trigger.Continuous as the trigger and provide it with the time interval for the asynchronous checkpoint function, as we show in this minimal example:

import org.apache.spark.sql.streaming.Trigger
import spark.implicits._

// use the built-in rate source as a test input
val stream = spark.readStream
    .format("rate")
    .option("rowsPerSecond", "5")
    .load()

// a per-element query: keep only the even values
val evenElements = stream.select($"timestamp", $"value").where($"value" % 2 === 0)

// Trigger.Continuous activates the continuous execution mode;
// its argument is the asynchronous checkpoint interval
val query = evenElements.writeStream
    .format("console")
    .trigger(Trigger.Continuous("2 seconds"))
    .start()
Warning

Do not confuse the time interval provided in Trigger.Continuous(<time-interval>) with a microbatch interval. This is the time interval of the asynchronous checkpoint operation, done by the continuous query executor.

Limitations

Although there are no changes at the API level, there are a number of restrictions on the type of queries that are supported in continuous mode. The intuition is that continuous mode works with queries that can be applied on a per-element basis. In SQL terms, we can use selections, projections, and transformations, including SQL functions except for aggregations. In functional terms, we can use filter, map, flatMap, and mapPartitions.
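As an illustration, the following sketch reuses the rate stream defined in the preceding example to run a typed, per-element map in continuous mode; this is the kind of query that the continuous engine supports:

import org.apache.spark.sql.streaming.Trigger
import spark.implicits._

// a per-element, typed transformation: supported by the continuous engine
val parity = stream
    .select($"value").as[Long]
    .map(v => if (v % 2 == 0) s"$v is even" else s"$v is odd")

val perElementQuery = parity.writeStream
    .format("console")
    .trigger(Trigger.Continuous("2 seconds"))
    .start()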

When we use aggregations and window functions, in particular with event-time data, we face much longer deadlines, because we need to wait for late and out-of-order data. The very nature of a time period in a window and related concepts such as watermarks do not benefit from the low-latency characteristics of this execution model. In such cases, the recommended approach is to fall back to the microbatch-based engine, replacing Trigger.Continuous(<checkpoint-interval>) with a microbatch trigger definition: Trigger.ProcessingTime(<trigger-interval>).
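For example, a windowed count over the same rate stream requires the microbatch engine. A minimal sketch of this fallback, with an arbitrary one-minute window and ten-second trigger, looks like this:

import org.apache.spark.sql.functions.window
import org.apache.spark.sql.streaming.Trigger
import spark.implicits._

// windowed aggregations are not supported by the continuous engine,
// so we run this query on the microbatch engine instead
val countsPerWindow = stream
    .groupBy(window($"timestamp", "1 minute"))
    .count()

val microbatchQuery = countsPerWindow.writeStream
    .outputMode("complete")
    .format("console")
    .trigger(Trigger.ProcessingTime("10 seconds"))
    .start()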

The support of arbitrary stateful processing such as [flat]mapGroupsWithState is currently under development.

Machine Learning

As the amount of available data and its rate of arrival increase, traditional techniques for understanding the signals in the data become a major obstacle to extracting actionable insights from it.

Machine learning is, in essence, the combination of algorithms and statistical analysis techniques to learn from the data and use that learning to provide an answer to certain questions. Machine learning uses data to estimate a model, a mathematical representation of some aspect of the world. Once a model has been determined, it can be queried on existing or new data to obtain an answer.

The nature of the answer we want from the data divides machine learning algorithms into three groups:

Regression

We want to predict a value in a continuous range. Example: using data about a student’s number of absences and hours of study for a given class, predict the score in their final exam.

Classification

We want to separate data points into one of several categories. Example: given a text sample, we want to estimate the language.

Clustering

Given a set of elements, we want to divide it into subsets using some notion of similarity. Example: in an online wine store, we want to group customers with similar purchase behavior.

In the learning process, we also have the notion of supervision. We talk about supervised learning when the algorithm being trained requires data that maps a number of observations to an outcome. Regression and classification techniques fall under the category of supervised learning. Using our previous example of the exam score, to build our regression model, we require a dataset of historical student performance that contains the exam scores along with the number of absences and hours of study reported by the students. Obtaining good data is the most challenging aspect of a machine learning task.

Learning Versus Exploiting

We can identify two phases in the application of machine learning techniques:

  • A learning phase, in which data is prepared and used to estimate a model. This phase is also known as training.

  • An exploit phase, in which the estimated model is queried on new data. This phase is known as prediction or scoring.

The training phase in machine learning is typically done using historical datasets. These datasets are usually cleaned and prepared for the target application. The machine learning methodology also calls for a validation phase, in which the resulting model is evaluated against a dataset of known results, usually called the testing or validation set. The result of the testing phase is a set of metrics that report how well the learned model performs on data that it didn’t see during training.
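In Spark, this methodology maps directly onto the ML pipeline API. The following is a minimal sketch of the split-train-evaluate cycle, assuming a labeled DataFrame named data, with its label in a column called label, and an already-defined Spark ML pipeline (all hypothetical placeholders):

import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

// split the historical dataset: one part for training, one for validation
val Array(trainingSet, validationSet) = data.randomSplit(Array(0.8, 0.2))

// learning phase: estimate the model from the training data
val model = pipeline.fit(trainingSet)

// validation phase: score the held-out data and compute a quality metric
val predictions = model.transform(validationSet)
val evaluator = new BinaryClassificationEvaluator() // reports areaUnderROC by default
val areaUnderROC = evaluator.evaluate(predictions)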

Applying a Machine Learning Model to a Stream

As we mentioned earlier, creating a machine learning model is usually a batch-based process that uses historical data to train a statistical model. As soon as that model is available, it can be used to score new data to obtain an estimate of the specific aspect for which the model was trained.

The unified structured APIs of Apache Spark across batch, machine learning, and streaming make it straightforward to apply a previously trained model to a streaming DataFrame.

Assuming that the model is stored on disk, the process consists of two steps:

  1. Load the model.

  2. Use its transform method to apply the model to the streaming DataFrame.

Let’s see the API in action with an example.

Example: Estimating Room Occupancy by Using Ambient Sensors

During the development of this part of the book, we have been using sensor information as a running theme. Up to now, we have used the sensor data to explore the data processing and analytic capabilities of Structured Streaming. Now, imagine that we have such ambient sensors in a series of rooms, but instead of keeping track of temperature or humidity data over time, we want to use that information to drive a novel application. We would like to estimate whether the room is occupied at a certain moment by using the sensor data. Although temperature or humidity alone are probably not sufficient to determine whether a room is in use, maybe a combination of these factors is able to predict occupancy to a certain degree of accuracy.

Online Resources

For this example, we will use the occupancy_detection_model and occupancy_streaming_prediction notebooks in the online resources for the book, located at https://github.com/stream-processing-with-spark.

For this example, we are going to use an occupancy dataset that was collected with the intention of answering that question. The dataset has the following schema:

 |-- id: integer (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- Temperature: double (nullable = true)
 |-- Humidity: double (nullable = true)
 |-- Light: double (nullable = true)
 |-- CO2: double (nullable = true)
 |-- HumidityRatio: double (nullable = true)
 |-- Occupancy: integer (nullable = true)

The occupancy information in the training dataset was obtained using camera images of the room to detect with certainty the presence of people in it.

Using this data, we trained a logistic regression model that estimates the occupancy, represented by the binomial outcome [0,1], where 0 = not occupied and 1 = occupied.

Note

For this example, we assume that the trained model is already available on disk. The complete training phase of this example is available in the book’s online resources.
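To give a rough idea of what that training code involves, here is a condensed sketch, assuming that the labeled dataset has been loaded as a DataFrame called occupancyData and that modelFile points to the destination path; the notebook remains the authoritative version:

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.VectorAssembler

// assemble the sensor readings into a single numeric feature vector
val assembler = new VectorAssembler()
    .setInputCols(Array("Temperature", "Humidity", "Light", "CO2", "HumidityRatio"))
    .setOutputCol("features")

// a logistic regression classifier trained against the Occupancy label
val logisticRegression = new LogisticRegression()
    .setFeaturesCol("features")
    .setLabelCol("Occupancy")

val pipeline = new Pipeline().setStages(Array(assembler, logisticRegression))

// learning phase: fit the pipeline on the historical dataset
val pipelineModel = pipeline.fit(occupancyData)

// persist the trained model so that the streaming job can load it later
pipelineModel.write.overwrite().save(modelFile)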

The first step is to load the previously trained model:

import org.apache.spark.ml._
val pipelineModel = PipelineModel.read.load(modelFile)
pipelineModel: org.apache.spark.ml.PipelineModel = pipeline_5b323b4dfffd

This call results in a model that contains information about the stages of our pipeline.

With the call to pipelineModel.stages, we can inspect these stages:

pipelineModel.stages
res16: Array[org.apache.spark.ml.Transformer] =
    Array(vecAssembler_7582c780b304, logreg_52e582f4bdb0)

Our pipeline consists of two stages: a VectorAssembler and a LogisticRegression classifier. The VectorAssembler is a transformer that combines the chosen fields of the input data into a numeric Vector that serves as input for the model. The LogisticRegression stage is the trained logistic regression classifier. It uses the learned parameters to transform the input Vector into three fields that are added to the streaming DataFrame: rawPrediction, probability, and prediction.

For our application, we are interested in the prediction value that will tell us whether the room is in use (1) or not (0).

The next step, shown in Example 15-1, is to apply the model to the streaming DataFrame.

Example 15-1. Using a trained machine learning model in Structured Streaming
// let's assume an existing sensorDataStream
val scoredStream = pipelineModel.transform(sensorDataStream)

// inspect the schema of the resulting DataFrame
scoredStream.printSchema
root
  |-- id: long (nullable = true)
  |-- timestamp: timestamp (nullable = true)
  |-- date: timestamp (nullable = true)
  |-- Temperature: double (nullable = true)
  |-- Humidity: double (nullable = true)
  |-- Light: double (nullable = true)
  |-- CO2: double (nullable = true)
  |-- HumidityRatio: double (nullable = true)
  |-- Occupancy: integer (nullable = true)
  |-- features: vector (nullable = true)
  |-- rawPrediction: vector (nullable = true)
  |-- probability: vector (nullable = true)
  |-- prediction: double (nullable = false)

At this point, we have a streaming DataFrame that contains the prediction of our original streaming data.

The final step in our streaming prediction is to do something with the prediction data. In this example, we are going to limit this step to querying the data using the memory sink to access the resulting data as a SQL table:

val query = scoredStream.writeStream
        .format("memory")
        .queryName("occ_pred")
        .start()

// let the stream run for a while first so that the table gets populated
spark.sql("select id, timestamp, occupancy, prediction from occ_pred")
     .show(10, false)

+---+-----------------------+---------+----------+
|id |timestamp              |occupancy|prediction|
+---+-----------------------+---------+----------+
|211|2018-08-06 00:13:15.687|1        |1.0       |
|212|2018-08-06 00:13:16.687|1        |1.0       |
|213|2018-08-06 00:13:17.687|1        |1.0       |
|214|2018-08-06 00:13:18.687|1        |1.0       |
|215|2018-08-06 00:13:19.687|1        |1.0       |
|216|2018-08-06 00:13:20.687|1        |0.0       |
|217|2018-08-06 00:13:21.687|1        |0.0       |
|218|2018-08-06 00:13:22.687|0        |0.0       |
|219|2018-08-06 00:13:23.687|0        |0.0       |
|220|2018-08-06 00:13:24.687|0        |0.0       |
+---+-----------------------+---------+----------+

Given that we are using a test dataset to drive our stream, we also have access to the original occupancy data. In this limited sample, we can observe that the prediction matches the actual occupancy most, but not all, of the time.

For real-world applications, we will typically be interested in offering this service to other applications, perhaps in the form of an HTTP-based API or through pub/sub messaging interactions. We can use any of the available sinks to write the results to other systems for further use.
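For instance, a sketch of publishing each prediction to a Kafka topic could look like the following, assuming the Kafka sink dependency is on the classpath; the broker address, topic name, and checkpoint path are placeholders:

import org.apache.spark.sql.functions.{struct, to_json}
import spark.implicits._

// serialize each scored record as a JSON message and publish it to Kafka
val kafkaQuery = scoredStream
    .select(to_json(struct($"id", $"timestamp", $"prediction")).as("value"))
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker1:9092") // placeholder address
    .option("topic", "occupancy-predictions")          // placeholder topic
    .option("checkpointLocation", "/tmp/checkpoints/occ-pred")
    .start()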

The challenge of model serving

Trained machine learning models are seldom perfect. There are always opportunities to train a model with more or better data, or tweak its parameters to improve the prediction accuracy. With ever-evolving trained models, the challenge becomes to upgrade our streaming scoring process with a new model whenever it becomes available.

This process of managing the life cycle of machine learning models from the training stage to their exploitation in an application is usually known by the broad concept of model serving.

Model serving comprises the process of transitioning trained models into a production platform and keeping those online serving processes up to date with the most recent trained models.

Model serving in Structured Streaming

In Structured Streaming, updating a running query is not possible. As we saw in Example 15-1, we include the model scoring step as a transformation in our streaming process. After we start the corresponding streaming query, that declaration becomes part of the query plan that is deployed and runs until the query is stopped. Hence, updating machine learning models in Structured Streaming is not directly supported. It is, nevertheless, possible to create a managing system that calls the Structured Streaming APIs to stop, update, and restart a query whenever a new model becomes available.
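A bare-bones sketch of such a manager follows; note that the function and its restart policy are our own construction, not a Spark API:

import org.apache.spark.ml.PipelineModel
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.StreamingQuery

// Hypothetical controller: stop the running query and restart it with a
// newly trained model. Spark does not provide this; we orchestrate it ourselves.
def restartWithNewModel(
    runningQuery: StreamingQuery,
    newModelPath: String,
    sensorDataStream: DataFrame): StreamingQuery = {
  runningQuery.stop()                                   // stop the current query
  val newModel = PipelineModel.read.load(newModelPath)  // load the fresh model
  newModel.transform(sensorDataStream)                  // rebuild the scoring plan
    .writeStream
    .format("memory")
    .queryName("occ_pred")
    .start()                                            // redeploy the query
}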

The topic of model serving is an ongoing discussion in the Spark community and will certainly see an evolution in future versions of Spark and Structured Streaming.

Online Training

In the machine learning process we described earlier, we made a distinction between the learning and scoring phases, in which the learning step was mainly an offline process. In the context of a streaming application, it is possible to train a machine learning model as data arrives. This is also called online learning. Online learning is particularly interesting when we want to adapt to evolving patterns in the data, such as the changing interests of a social network or trend analysis in financial markets.

Online learning poses a new set of challenges, because its implementation mandates that each data point be observed only once and must take into consideration that the total amount of data observed is potentially unbounded.

In its current form, Structured Streaming does not offer support for online training. There are efforts to implement some (limited) form of online learning on Structured Streaming, the most notable being the work of Holden Karau and Seth Hendrickson, and that of Ram Sriharsha and Vlad Feinberg.

It would seem that early initiatives to implement online learning on top of Structured Streaming have lost momentum. This might change in the future, so check new releases of Structured Streaming for potential updates in this area.
