A streaming application is not just about doing some real-time computations on a stream of data. Generally, streaming will be part of a larger application that includes real-time, batch, and serving layers with machine learning, and so on. A continuous application is an end-to-end application that combines all these features in one application.
In Spark 2.0, the Structured Streaming API was introduced for building continuous applications. It addresses the concerns of a typical streaming application as described below.
Structured Streaming is built on top of the Spark SQL engine, so it inherits the performance benefits of Datasets and DataFrames. A streaming computation can be expressed in the same way as a batch computation on data at rest; the Spark SQL engine processes the incoming stream of data incrementally in batches and updates the sinks. Structured Streaming is supported in the Java, Scala, and Python programming languages with the Dataset and DataFrame APIs. Structured Streaming is an alpha release in Spark 2.0 with limited capabilities; upcoming releases will have fully-fledged capabilities and more sources and sinks.
In Structured Streaming, the incoming stream of data is treated as a table that is continuously growing. All events coming in a batch window will be treated like new rows appended to the input table as shown in Figure 5.9.
Queries are executed on the input table to produce a result table, as shown in Figure 5.10. At every trigger interval, the query runs over the newly arrived data, the result table is updated, and the changes are written out to sinks such as file systems or databases.
Valid output modes are complete, append, and update; append is the default mode.
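As a minimal sketch of how these options are set on the writeStream builder (the schema, path, and application name below are placeholders, not part of the examples that follow):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.ProcessingTime
import org.apache.spark.sql.types._

val spark = SparkSession.builder.appName("OutputModeSketch").getOrCreate()

// A streaming DataFrame over a directory of CSV files (placeholder path and schema).
val schema = new StructType().add("emp_name", "string").add("emp_age", "integer")
val employees = spark.readStream.schema(schema).csv("hdfs:///tmp/employees")

// Aggregations require complete (or update) mode; plain projections can use the default append mode.
val countsByName = employees.groupBy("emp_name").count()

val query = countsByName
  .writeStream
  .format("console")                      // print each result set to the console
  .outputMode("complete")                 // "append" | "complete" | "update"
  .trigger(ProcessingTime("10 seconds"))  // run the query every 10 seconds
  .start()

Note that later Spark versions express the trigger as Trigger.ProcessingTime("10 seconds"); the form above matches the Spark 2.0 API.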
Streaming Datasets and DataFrames are created using the SparkSession, which was introduced in Spark 2.0. To create a DataFrame from static data we use SparkSession.read, whereas to create a streaming DataFrame we use SparkSession.readStream. Similarly, to write a DataFrame out we use DataFrame.write, whereas to write a streaming DataFrame we use DataFrame.writeStream. The following Scala example creates the SparkSession and then creates a stream by reading CSV files from a directory:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder
  .appName("CSVStructuredStreaming")
  .getOrCreate()

// Schema of the incoming CSV files.
val csvSchema = new StructType().add("emp_name", "string").add("emp_age", "integer")

// Create a streaming DataFrame by monitoring a directory of CSV files.
val csv_df = spark
  .readStream
  .option("sep", ",")
  .schema(csvSchema)
  .csv("HDFS Path")

// Continuously write the stream out as Parquet files.
csv_df
  .writeStream
  .format("parquet")
  .option("path", "HDFS Path")
  .option("checkpointLocation", "HDFS Path")
  .outputMode("append")
  .start()
Currently, only two types of built-in sources are supported: the file source and the socket source. The file source creates a stream by reading files from a directory, as shown in the previous example; Spark 2.0 supports the CSV, JSON, and Parquet formats. The socket source reads data from a socket connection and is meant for testing purposes only.
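For example, a socket-source stream can be created from a local TCP connection (the host and port below are arbitrary test values; a test server can be started with nc -lk 9999):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("SocketSourceSketch").getOrCreate()

// Each line received on the socket becomes a row with a single "value" column.
val lines = spark
  .readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// Echo the incoming lines to the console for quick inspection.
val echoQuery = lines.writeStream.format("console").outputMode("append").start()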
Currently, four types of output sinks are supported: the file sink, the foreach sink, the console sink, and the memory sink. The file sink supports only the Parquet format with append mode. The foreach sink can be used to run arbitrary operations on the output data and is supported in the Scala and Java APIs. The console and memory sinks are used for debugging purposes only.
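As a minimal sketch of the foreach sink, a ForeachWriter handles each output row; the writer below simply prints rows, and a real implementation would open a connection to an external store in open and write records in process:

import org.apache.spark.sql.{ForeachWriter, Row}

// A sketch of a ForeachWriter that handles each output row of the stream.
class PrintingWriter extends ForeachWriter[Row] {
  // Called once per partition and epoch; return false to skip processing.
  def open(partitionId: Long, version: Long): Boolean = true

  // Called for every output row; a real writer would issue an INSERT/PUT here.
  def process(row: Row): Unit = println(row.mkString(","))

  // Called when the partition is done; release any connections here.
  def close(errorOrNull: Throwable): Unit = ()
}

// Attach the writer to the streaming DataFrame from the earlier CSV example.
val foreachQuery = csv_df
  .writeStream
  .foreach(new PrintingWriter)
  .outputMode("append")
  .start()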
Most common operations, such as filter, groupBy, and aggregations, are supported on streaming DataFrames. Streaming DataFrames can also be converted to streaming Datasets. Window operations are supported on streaming DataFrames, and streaming DataFrames can be joined with static DataFrames.
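For example, building on the earlier CSV stream (the departments lookup file and its columns are hypothetical), these operations look exactly like their batch counterparts:

import org.apache.spark.sql.functions._
import spark.implicits._

// Filtering and aggregation on a streaming DataFrame.
val adults = csv_df.filter($"emp_age" >= 18)
val countsByName = adults.groupBy($"emp_name").count()

// Converting a streaming DataFrame to a streaming Dataset of a case class.
case class Employee(emp_name: String, emp_age: Int)
val employeeDS = csv_df.as[Employee]

// Joining the stream with a static DataFrame (a hypothetical lookup file).
val departments = spark.read.option("header", "true").csv("hdfs:///tmp/departments.csv")
val enriched = csv_df.join(departments, Seq("emp_name"))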
Let's create a Structured Streaming application using an Internet of Things (IoT) use case, where a few sensors send the status of a device (active or inactive) at different intervals. Our goal is to find how many devices are active or inactive in each time interval; the steps are as follows:
Create the following JSON files with the sample sensor events:

iot-file1.json
{"device_id":1,"timestamp":1470009600,"status":"active"}
{"device_id":2,"timestamp":1470013200,"status":"active"}
{"device_id":3,"timestamp":1470016800,"status":"active"}
{"device_id":4,"timestamp":1470020400,"status":"active"}
{"device_id":5,"timestamp":1470024000,"status":"active"}
{"device_id":1,"timestamp":1470009601,"status":"active"}
{"device_id":2,"timestamp":1470013202,"status":"active"}
{"device_id":3,"timestamp":1470016803,"status":"inactive"}
{"device_id":4,"timestamp":1470020404,"status":"active"}
{"device_id":5,"timestamp":1470024005,"status":"active"}

iot-file2.json
{"device_id":1,"timestamp":1470027600,"status":"active"}
{"device_id":2,"timestamp":1470031200,"status":"active"}
{"device_id":3,"timestamp":1470034800,"status":"active"}
{"device_id":4,"timestamp":1470038400,"status":"active"}
{"device_id":5,"timestamp":1470042000,"status":"active"}
{"device_id":1,"timestamp":1470027601,"status":"active"}
{"device_id":2,"timestamp":1470031202,"status":"active"}
{"device_id":3,"timestamp":1470034803,"status":"active"}
{"device_id":4,"timestamp":1470038404,"status":"active"}
{"device_id":5,"timestamp":1470042005,"status":"active"}

iot-file3.json
{"device_id":1,"timestamp":1470027601,"status":"active"}

Create an HDFS directory and copy the first file into it:

hadoop fs -mkdir iotstream
hadoop fs -put iot-file1.json iotstream/
Start the Scala shell with bin/spark-shell, which creates a pre-configured SparkSession called spark, and enter the following code:

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

// Schema of the incoming IoT events.
val iotSchema = new StructType().add("device_id", LongType).add("timestamp", TimestampType).add("status", StringType)

val iotPath = "hdfs://quickstart.cloudera:8020/user/cloudera/iotstream"

// Read at most one file per trigger so the stream grows incrementally.
val iotStreamingDataFrame = spark.readStream.schema(iotSchema).option("maxFilesPerTrigger", 1).json(iotPath)

// Count events per status within one-hour event-time windows.
val iotStreamingCounts = iotStreamingDataFrame.groupBy($"status", window($"timestamp", "1 hour")).count()

// Returns true, confirming this is a streaming DataFrame.
iotStreamingCounts.isStreaming

// Write the running counts to an in-memory table named iotstream.
val iotQuery = iotStreamingCounts.writeStream.format("memory").queryName("iotstream").outputMode("complete").start()
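Optionally, you can verify that the query is running directly from the shell using the standard StreamingQuery methods:

// Returns true while the query is actively processing new files.
iotQuery.isActive

// Lists all streaming queries currently running in this SparkSession.
spark.streams.active

// Any exception that terminated the query, if it failed.
iotQuery.exception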
Now run a SQL query against the in-memory iotstream table:

scala> spark.sql("select status, date_format(window.start, 'MMM-dd HH:mm') as start_time, date_format(window.end, 'MMM-dd HH:mm') as end_time, count from iotstream order by start_time, end_time, status").show()
+--------+------------+------------+-----+
|  status|  start_time|    end_time|count|
+--------+------------+------------+-----+
|  active|Jul-31 17:00|Jul-31 18:00|    2|
|  active|Jul-31 18:00|Jul-31 19:00|    2|
|  active|Jul-31 19:00|Jul-31 20:00|    1|
|inactive|Jul-31 19:00|Jul-31 20:00|    1|
|  active|Jul-31 20:00|Jul-31 21:00|    2|
|  active|Jul-31 21:00|Jul-31 22:00|    2|
+--------+------------+------------+-----+
Now copy iot-file2.json to the same HDFS directory; you can observe that the streaming query executes automatically and updates the counts. Let's run the same query again:

hadoop fs -put iot-file2.json iotstream/

scala> spark.sql("select status, date_format(window.start, 'MMM-dd HH:mm') as start_time, date_format(window.end, 'MMM-dd HH:mm') as end_time, count from iotstream order by start_time, end_time, status").show()
+--------+------------+------------+-----+
|  status|  start_time|    end_time|count|
+--------+------------+------------+-----+
|  active|Aug-01 00:00|Aug-01 01:00|    2|
|  active|Aug-01 01:00|Aug-01 02:00|    2|
|  active|Aug-01 02:00|Aug-01 03:00|    2|
|  active|Jul-31 17:00|Jul-31 18:00|    2|
|  active|Jul-31 18:00|Jul-31 19:00|    2|
|  active|Jul-31 19:00|Jul-31 20:00|    1|
|inactive|Jul-31 19:00|Jul-31 20:00|    1|
|  active|Jul-31 20:00|Jul-31 21:00|    2|
|  active|Jul-31 21:00|Jul-31 22:00|    2|
|  active|Jul-31 22:00|Jul-31 23:00|    2|
|  active|Jul-31 23:00|Aug-01 00:00|    2|
+--------+------------+------------+-----+
Now copy iot-file3.json, which contains a late event, to the same HDFS directory. Once you run the query again, you can observe that the late event has been handled by updating the previous record:

hadoop fs -put iot-file3.json iotstream/

scala> spark.sql("select status, date_format(window.start, 'MMM-dd HH:mm') as start_time, date_format(window.end, 'MMM-dd HH:mm') as end_time, count from iotstream order by start_time, end_time, status").show()
+--------+------------+------------+-----+
|  status|  start_time|    end_time|count|
+--------+------------+------------+-----+
|  active|Aug-01 00:00|Aug-01 01:00|    2|
|  active|Aug-01 01:00|Aug-01 02:00|    2|
|  active|Aug-01 02:00|Aug-01 03:00|    2|
|  active|Jul-31 17:00|Jul-31 18:00|    2|
|  active|Jul-31 18:00|Jul-31 19:00|    2|
|  active|Jul-31 19:00|Jul-31 20:00|    1|
|inactive|Jul-31 19:00|Jul-31 20:00|    1|
|  active|Jul-31 20:00|Jul-31 21:00|    2|
|  active|Jul-31 21:00|Jul-31 22:00|    2|
|  active|Jul-31 22:00|Jul-31 23:00|    3|
|  active|Jul-31 23:00|Aug-01 00:00|    2|
+--------+------------+------------+-----+
You can observe that the late event is updated in place, incrementing the count for the Jul-31 22:00 to 23:00 window to 3. You can also copy the same datasets to the directory again to see how the counts are updated.
Finally, stop the streaming query:

iotQuery.stop