Chapter 9. Structured Streaming in Action

Now that we have a better understanding of the Structured Streaming API and programming model, in this chapter, we create a small but complete Internet of Things (IoT)-inspired streaming program.

Online Resources

For this example, we will use the Structured-Streaming-in-action notebook in the online resources for the book, located at https://github.com/stream-processing-with-spark.

Our use case will be to consume a stream of sensor readings from Apache Kafka as the streaming source.

We are going to correlate incoming IoT sensor data with a static reference file that contains all known sensors with their configuration. That way, we enrich each incoming record with specific sensor parameters that we require to process the reported data. We then save all correctly processed records to a file in Parquet format.

Apache Kafka

Apache Kafka is one of the most popular choices for a scalable messaging broker used to decouple producers from consumers in an event-driven system. It is a highly scalable distributed streaming platform based on the abstraction of a distributed commit log. It provides functionality similar to message queues or enterprise messaging systems, but differentiates itself from its predecessors in three important areas:

  • It runs distributed on a commodity cluster, making it highly scalable.

  • Fault-tolerant data storage guarantees consistency of data reception and delivery.

  • Pull-based consumers allow consumption of the data at a different time and pace, from real time, to microbatch, to batch, creating the possibility of feeding data to a wide range of applications.

You can find Kafka at http://kafka.apache.org.

Consuming a Streaming Source

The first part of our program deals with the creation of the streaming Dataset:

val rawData = sparkSession.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", kafkaBootstrapServer)
      .option("subscribe", topic)
      .option("startingOffsets", "earliest")
      .load()

> rawData: org.apache.spark.sql.DataFrame

The entry point of Structured Streaming is an existing Spark Session (sparkSession). As you can see in the first line, the creation of a streaming Dataset is almost identical to the creation of a static Dataset, which would use a read operation instead. sparkSession.readStream returns a DataStreamReader, a class that implements the builder pattern to collect the information needed to construct the streaming source using a fluent API. In that API, we find the format method that lets us specify our source provider, which, in our case, is kafka. The options that follow it are specific to the source:

kafka.bootstrap.servers

Indicates the set of bootstrap servers to contact as a comma-separated list of host:port addresses

subscribe

Specifies the topic or topics to subscribe to

startingOffsets

The offset reset policy to apply when this application starts out fresh
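
As a quick illustration of how these options combine, here is a hypothetical variation of the reader that subscribes to several topics at once and starts from the latest available offsets instead (the second topic name is a placeholder for illustration):

// subscribe accepts a comma-separated list of topics, and startingOffsets
// can be set to "latest" to consume only data arriving after the query starts
val multiTopicData = sparkSession.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", kafkaBootstrapServer)
      .option("subscribe", "iot-data,iot-metadata")
      .option("startingOffsets", "latest")
      .load()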

We cover the details of the Kafka streaming provider later in Chapter 10.

The load() method evaluates the DataStreamReader builder and creates a DataFrame as a result, as we can see in the returned value:

> rawData: org.apache.spark.sql.DataFrame

A DataFrame is an alias for Dataset[Row] with a known schema. After creation, you can use streaming Datasets just like regular Datasets. This makes it possible to use the full-fledged Dataset API with Structured Streaming, although some exceptions apply, because operations such as show() or count() do not make sense in a streaming context.
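
For instance, attempting to call show() on our streaming DataFrame fails at analysis time, because the query has not been started with writeStream (a sketch; the exact error message can vary between Spark versions):

rawData.show()
// Fails with an org.apache.spark.sql.AnalysisException along the lines of:
// "Queries with streaming sources must be executed with writeStream.start()"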

To programmatically differentiate a streaming Dataset from a static one, we can ask a Dataset whether it is of the streaming kind:

rawData.isStreaming
res7: Boolean = true

And we can also explore the schema attached to it, using the existing Dataset API, as demonstrated in Example 9-1.

Example 9-1. The Kafka schema
rawData.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

In general, Structured Streaming requires the explicit declaration of a schema for the consumed stream. In the specific case of kafka, the schema for the resulting Dataset is fixed and is independent of the contents of the stream. It consists of a set of fields specific to the Kafka source: key, value, topic, partition, offset, timestamp, and timestampType, as we can see in Example 9-1. In most cases, applications will be interested mainly in the contents of the value field, where the actual payload of the stream resides.
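
Because both key and value are delivered as binary, a common first step is to cast them to strings before applying any parsing logic. The following minimal sketch shows that approach using selectExpr; our application performs the equivalent extraction of the value field in the next section using the typed Dataset API:

// Cast the binary key and value fields to strings; value carries the payload
val rawValues = rawData.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")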

Application Logic

Recall that the intention of our job is to correlate the incoming IoT sensor data with a reference file that contains all known sensors with their configuration. That way, we would enrich each incoming record with specific sensor parameters that would allow us to interpret the reported data. We would then save all correctly processed records to a Parquet file. The data coming from unknown sensors would be saved to a separate file for later analysis.

Using Structured Streaming, our job can be implemented in terms of Dataset operations:

// Required if not already in scope in the notebook: the session implicits
// (for the $ column syntax and .as[String]) and scala.util.Try for safe parsing
import sparkSession.implicits._
import scala.util.Try

// Parse each CSV-formatted record into a SensorData entry, silently dropping
// records that fail to parse
val iotData = rawData.select($"value").as[String].flatMap { record =>
  val fields = record.split(",")
  Try {
    SensorData(fields(0).toInt, fields(1).toLong, fields(2).toDouble)
  }.toOption
}

// Load the static sensor reference data and cache it for reuse across microbatches
val sensorRef = sparkSession.read.parquet(s"$workDir/$referenceFile")
sensorRef.cache()

// Stream-static inner join: keep only the readings coming from known sensors
val sensorWithInfo = sensorRef.join(iotData, Seq("sensorId"), "inner")

// Denormalize the reported value using the sensor's configured range
val knownSensors = sensorWithInfo
  .withColumn("dnvalue", $"value"*($"maxRange"-$"minRange")+$"minRange")
  .drop("value", "maxRange", "minRange")

In the first step, we transform our CSV-formatted records back into SensorData entries. We apply Scala functional operations on the typed Dataset[String] that we obtained from extracting the value field as a String.
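
The SensorData case class is defined in the accompanying notebook. A minimal sketch consistent with the parsing code above (the field names are our assumption) could look like this:

// A sensor reading parsed from a CSV record: an integer sensor id,
// an epoch timestamp, and a normalized sensor value
case class SensorData(sensorId: Int, timestamp: Long, value: Double)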

Then, we use an inner join between the streaming Dataset and the static reference Dataset to correlate the sensor data with the corresponding reference entry, using sensorId as the key.

To complete our application, we compute the real values of the sensor reading using the minimum-maximum ranges in the reference data.
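
The application logic also calls for saving the data coming from unknown sensors to a separate file for later analysis. One way to isolate those records, sketched here assuming the same column names as above (the accompanying notebook may implement this step differently), is a stream-static left outer join followed by a filter on the missing reference columns:

// After a left outer join with the reference data, readings from unknown
// sensors are the rows whose reference columns (for example, maxRange) are null
val unknownSensors = iotData
  .join(sensorRef, Seq("sensorId"), "left_outer")
  .where($"maxRange".isNull)
  .select($"sensorId", $"timestamp", $"value")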

Writing to a Streaming Sink

The final step of our streaming application is to write the enriched IoT data to a Parquet-formatted file. In Structured Streaming, the write operation is crucial: it marks the completion of the declared transformations on the stream, defines an output mode, and, upon calling start(), begins the processing of the continuous query.

In Structured Streaming, all operations are lazy declarations of what we want to do with the streaming data. Only when we call start() will the actual consumption of the stream begin and the query operations on the data materialize into actual results:

val knownSensorsQuery = knownSensors.writeStream
  .outputMode("append")
  .format("parquet")
  .option("path", targetPath)
  .option("checkpointLocation", "/tmp/checkpoint")
  .start()

Let’s break this operation down:

  • writeStream creates a builder object where we can configure the options for the desired write operation, using a fluent interface.

  • With format, we specify the sink that will materialize the result downstream. In our case, we use the built-in FileStreamSink with Parquet format.

  • outputMode is a new concept in Structured Streaming: given that we, theoretically, have access to all the data seen in the stream so far, we also have the option to produce different views of that data.

  • The append mode, used here, implies that only the new records resulting from our streaming computation are produced to the output.

The result of the start call is a StreamingQuery instance. This object provides methods to control the execution of the query and request information about the status of our running streaming query, as shown in Example 9-2.

Example 9-2. Query progress
knownSensorsQuery.recentProgress

res37: Array[org.apache.spark.sql.streaming.StreamingQueryProgress] =
Array({
  "id" : "6b9fe3eb-7749-4294-b3e7-2561f1e840b6",
  "runId" : "0d8d5605-bf78-4169-8cfe-98311fc8365c",
  "name" : null,
  "timestamp" : "2017-08-10T16:20:00.065Z",
  "numInputRows" : 4348,
  "inputRowsPerSecond" : 395272.7272727273,
  "processedRowsPerSecond" : 28986.666666666668,
  "durationMs" : {
    "addBatch" : 127,
    "getBatch" : 3,
    "getOffset" : 1,
    "queryPlanning" : 7,
    "triggerExecution" : 150,
    "walCommit" : 11
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[iot-data]]",
    "startOffset" : {
      "iot-data" : {
        "0" : 19048348
      }
    },
    "endOffset" : {
      "iot-data" : {
        "0" : 19052696
      }
    },
    "numInputRow...

In Example 9-2, we can see the StreamingQueryProgress returned by calling knownSensorsQuery.recentProgress. If we see nonzero values for numInputRows, we can be certain that our job is consuming data. We now have a Structured Streaming job running properly.
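
Beyond recentProgress, the StreamingQuery handle exposes a few other controls that are useful when running this example interactively; here is a brief sketch:

// Current state of the query: whether data is available, whether a trigger is active
knownSensorsQuery.status

// The most recent progress report, as a single object rather than an array
knownSensorsQuery.lastProgress

// Block the calling thread until the query terminates (or fails)
// knownSensorsQuery.awaitTermination()

// Stop the streaming query
// knownSensorsQuery.stop()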

Summary

Hopefully, this hands-on chapter has shown you how to create your first nontrivial application using Structured Streaming.

After reading this chapter, you should have a better understanding of the structure of a Structured Streaming program and how to approach a streaming application, from consuming the data, to processing it using the Dataset and DataFrames APIs, to producing the data to an external output. At this point, you should be just about ready to take on the adventure of creating your own streaming processing jobs.

In the chapters that follow, you learn about the different aspects of Structured Streaming in depth.
