Now that we have a better understanding of the Structured Streaming API and programming model, in this chapter, we create a small but complete Internet of Things (IoT)-inspired streaming program.
For this example, we will use the Structured-Streaming-in-action notebook in the online resources for the book, located at https://github.com/stream-processing-with-spark.
Our use case will be to consume a stream of sensor readings from Apache Kafka as the streaming source.
We are going to correlate incoming IoT sensor data with a static reference file that contains all known sensors with their configuration. That way, we enrich each incoming record with specific sensor parameters that we require to process the reported data. We then save all correctly processed records to a file in Parquet format.
Apache Kafka is one of the most popular choices for a scalable messaging broker used to decouple producers from consumers in an event-driven system. It is a highly scalable distributed streaming platform based on the abstraction of a distributed commit log. It provides functionality similar to message queues or enterprise messaging systems but differentiates itself from its predecessors in three important areas:
It runs distributed on a commodity cluster, making it highly scalable.
Fault-tolerant data storage guarantees consistency of data reception and delivery.
Pull-based consumers allow consumption of the data at a different time and pace, from real time, to microbatch, to batch, creating the possibility of feeding data to a wide range of applications.
You can find Kafka at http://kafka.apache.org.
The first part of our program deals with the creation of the streaming Dataset:
val rawData = sparkSession.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaBootstrapServer)
  .option("subscribe", topic)
  .option("startingOffsets", "earliest")
  .load()

> rawData: org.apache.spark.sql.DataFrame
The entry point of Structured Streaming is an existing Spark session (sparkSession). As you can see from the first line, the creation of a streaming Dataset is almost identical to the creation of a static Dataset, which would use a read operation instead. sparkSession.readStream returns a DataStreamReader, a class that implements the builder pattern to collect the information needed to construct the streaming source using a fluent API. In that API, we find the format option that lets us specify our source provider, which, in our case, is kafka.
The options that follow it are specific to the source:
kafka.bootstrap.servers
Indicates the set of bootstrap servers to contact, as a comma-separated list of host:port addresses.

subscribe
Specifies the topic or topics to subscribe to.

startingOffsets
The offset reset policy to apply when this application starts out fresh.
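These are not the only values those options accept. As a hedged illustration (not part of the book's notebook), the same builder can subscribe to topics by regular expression and start from the latest offsets instead; the subscribePattern value and the topic regex below are assumptions chosen for illustration only:

// Hypothetical variant of the source definition, for illustration only.
val patternStream = sparkSession.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaBootstrapServer)
  .option("subscribePattern", "iot-.*")  // subscribe to every topic matching the regex
  .option("startingOffsets", "latest")   // start from the most recent offsets on a fresh run
  .load()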
We cover the details of the Kafka streaming provider later in Chapter 10.
The load() method evaluates the DataStreamReader builder and creates a DataFrame as a result, as we can see in the returned value:

> rawData: org.apache.spark.sql.DataFrame
A DataFrame is an alias for Dataset[Row] with a known schema. After creation, you can use streaming Datasets much like regular Datasets. This makes it possible to use the full-fledged Dataset API with Structured Streaming, albeit with some exceptions, because not all operations, such as show() or count(), make sense in a streaming context.
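For instance, attempting an eager action on the streaming Dataset fails outright. The following is a small sketch, assuming the rawData defined above; the exact exception message may vary by Spark version:

import scala.util.Try

// Eager actions such as count() are rejected on a streaming Dataset: Spark raises
// an AnalysisException asking us to start the query with writeStream.start() instead.
val eagerCount = Try(rawData.count())
// eagerCount: scala.util.Try[Long] = Failure(org.apache.spark.sql.AnalysisException: ...)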
To programmatically differentiate a streaming Dataset from a static one, we can ask a Dataset whether it is of the streaming kind:
rawData.isStreaming

res7: Boolean = true
And we can also explore the schema attached to it, using the existing Dataset API, as demonstrated in Example 9-1.
rawData.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)
In general, Structured Streaming requires the explicit declaration of a schema for the consumed stream. In the specific case of kafka, the schema for the resulting Dataset is fixed and is independent of the contents of the stream. It consists of a set of fields specific to the Kafka source: key, value, topic, partition, offset, timestamp, and timestampType, as we can see in Example 9-1. In most cases, applications will be mostly interested in the contents of the value field, where the actual payload of the stream resides.
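As a minimal sketch of getting at that payload, assuming the producer wrote plain UTF-8 text, we could cast the binary value column to a string before any further parsing:

import sparkSession.implicits._

// Cast the binary Kafka payload to text and obtain a typed Dataset[String].
val rawValues = rawData.selectExpr("CAST(value AS STRING)").as[String]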
Recall that the intention of our job is to correlate the incoming IoT sensor data with a reference file that contains all known sensors with their configuration. That way, we would enrich each incoming record with specific sensor parameters that would allow us to interpret the reported data. We would then save all correctly processed records to a Parquet file. The data coming from unknown sensors would be saved to a separate file for later analysis.
Using Structured Streaming, our job can be implemented in terms of Dataset operations:
val iotData = rawData.select($"value").as[String].flatMap { record =>
  val fields = record.split(",")
  Try {
    SensorData(fields(0).toInt, fields(1).toLong, fields(2).toDouble)
  }.toOption
}

val sensorRef = sparkSession.read.parquet(s"$workDir/$referenceFile")
sensorRef.cache()

val sensorWithInfo = sensorRef.join(iotData, Seq("sensorId"), "inner")

val knownSensors = sensorWithInfo
  .withColumn("dnvalue", $"value" * ($"maxRange" - $"minRange") + $"minRange")
  .drop("value", "maxRange", "minRange")
In the first step, we transform our CSV-formatted records back into SensorData entries. We apply Scala functional operations on the typed Dataset[String] that we obtained from extracting the value field as a String.
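The SensorData case class itself is defined elsewhere in the book's online resources. A plausible shape, inferred from the parsing order above and from the column names used in the join that follows, would be:

// Assumed definition, for illustration only: id, timestamp, and normalized reading,
// in the order in which the CSV fields are parsed above.
case class SensorData(sensorId: Int, timestamp: Long, value: Double)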
Then, we use an inner join between the streaming Dataset and the static reference Dataset to correlate the sensor data with its corresponding reference, using sensorId as the key. To complete our application, we compute the real values of the sensor readings using the minimum-maximum ranges in the reference data.
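The separate file for unknown sensors mentioned earlier is not part of this listing. One possible way to isolate those records, sketched here as an assumption rather than the book's implementation, is a stream-static left outer join that keeps the readings with no match in the reference data:

// Hypothetical sketch: readings whose sensorId is absent from the reference data.
// The null check on maxRange assumes that column is always present for known sensors.
val unknownSensors = iotData
  .join(sensorRef, Seq("sensorId"), "left_outer")
  .where($"maxRange".isNull)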
The final step of our streaming application is to write the enriched IoT data to a Parquet-formatted file.
In Structured Streaming, the write operation is crucial: it marks the completion of the declared transformations on the stream, defines a write mode, and, upon calling start(), begins the processing of the continuous query. In Structured Streaming, all operations are lazy declarations of what we want to do with the streaming data. Only when we call start() will the actual consumption of the stream begin and the query operations on the data materialize into actual results:
val knownSensorsQuery = knownSensors.writeStream
  .outputMode("append")
  .format("parquet")
  .option("path", targetPath)
  .option("checkpointLocation", "/tmp/checkpoint")
  .start()
Let’s break this operation down:
writeStream creates a builder object where we can configure the options for the desired write operation, using a fluent interface.
With format, we specify the sink that will materialize the result downstream. In our case, we use the built-in FileStreamSink with Parquet format.
outputMode is a new concept in Structured Streaming: given that we, theoretically, have access to all the data seen in the stream so far, we also have the option to produce different views of that data. The append mode, used here, implies that the new records affected by our streaming computation are produced to the output.
The result of the start call is a StreamingQuery instance. This object provides methods to control the execution of the query and request information about the status of our running streaming query, as shown in Example 9-2.
knownSensorsQuery.recentProgress

res37: Array[org.apache.spark.sql.streaming.StreamingQueryProgress] =
Array({
  "id" : "6b9fe3eb-7749-4294-b3e7-2561f1e840b6",
  "runId" : "0d8d5605-bf78-4169-8cfe-98311fc8365c",
  "name" : null,
  "timestamp" : "2017-08-10T16:20:00.065Z",
  "numInputRows" : 4348,
  "inputRowsPerSecond" : 395272.7272727273,
  "processedRowsPerSecond" : 28986.666666666668,
  "durationMs" : {
    "addBatch" : 127,
    "getBatch" : 3,
    "getOffset" : 1,
    "queryPlanning" : 7,
    "triggerExecution" : 150,
    "walCommit" : 11
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[iot-data]]",
    "startOffset" : {
      "iot-data" : {
        "0" : 19048348
      }
    },
    "endOffset" : {
      "iot-data" : {
        "0" : 19052696
      }
    },
    "numInputRow...
In Example 9-2, we can see the StreamingQueryProgress as a result of calling knownSensorsQuery.recentProgress. If we see nonzero values for numInputRows, we can be certain that our job is consuming data.
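Beyond recentProgress, the StreamingQuery handle exposes a few other calls for monitoring and controlling the query. A brief sketch; the timeout value below is arbitrary:

knownSensorsQuery.status                    // current status: is a trigger active, is data available
knownSensorsQuery.lastProgress              // the most recent StreamingQueryProgress, if any
knownSensorsQuery.awaitTermination(30000)   // block up to 30 seconds for the query to finish
knownSensorsQuery.stop()                    // stop the streaming query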
We now have a Structured Streaming job running properly.
Hopefully, this hands-on chapter has shown you how to create your first nontrivial application using Structured Streaming.
After reading this chapter, you should have a better understanding of the structure of a Structured Streaming program and how to approach a streaming application, from consuming the data, to processing it using the Dataset and DataFrame APIs, to producing the data to an external output.
At this point, you should be just about ready to take on the adventure of creating your own streaming processing jobs.
In the next chapters, you learn in depth the different aspects of Structured Streaming.