Introducing Structured Streaming

A streaming application is rarely just about performing real-time computations on a stream of data. Streaming is generally one part of a larger application that also includes batch processing, serving layers, machine learning, and so on. A continuous application is an end-to-end application that combines all of these pieces in a single application.

Spark 2.0 introduces the Structured Streaming API for building continuous applications. It addresses the following concerns of a typical streaming application:

  • Node delays: A delay in a specific node can cause data inconsistency at the database layer. An ordering guarantee for events is usually achieved with systems such as Kafka, in which events with the same key always go to the same Kafka partition. Streaming applications pull data from this partition and process events in the order they are received. However, while applying operations on streaming data, if one of the nodes delays processing, data consistency cannot be guaranteed at the database. In Structured Streaming, the output of the streaming computation for a batch is always the same, irrespective of any node delays, which provides strong consistency at the database level.
  • Node failures: If one of the nodes fails, most streaming engines fail to provide exactly-once semantics; handling duplicate counts within the computation or duplicate updates to a database is left to the user. Structured Streaming is designed to handle failures at any point by using re-playable sources and idempotent sinks.
  • Late arrival of data: It is quite common for some events to arrive late from the source. A streaming application written with the assumption of no delays will have problems with late events, and one that handles delays by maintaining its own state risks that state growing indefinitely. Structured Streaming handles this by allowing you to either drop late events or process them and update the result in place, using the actual event time rather than the time at which Spark received the event (see the minimal sketch after this list).
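
As an illustration of event-time grouping, here is a minimal sketch; the events streaming DataFrame and its event_time column are hypothetical placeholders:

import org.apache.spark.sql.functions.window
import spark.implicits._   // provides the $"..." column syntax (pre-imported in spark-shell)

// Hypothetical streaming DataFrame "events" with an event-time column "event_time".
// Because the grouping key is the timestamp carried inside each event, a late
// record simply lands in, and updates, the hour it actually belongs to.
val countsByEventTime = events
  .groupBy(window($"event_time", "1 hour"))
  .count()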

Structured Streaming application flow

Structured Streaming is built on top of the Spark SQL engine, so it inherits the performance benefits of Datasets and DataFrames. Streaming computations can be expressed in the same way as batch computations on data at rest: the Spark SQL engine processes the incoming stream of data incrementally, in batches, and updates the sinks. Structured Streaming is supported in the Java, Scala, and Python programming languages with the Dataset and DataFrame APIs. Structured Streaming is an alpha release in Spark 2.0 with limited capabilities; upcoming releases will add fully-fledged capabilities and more sources and sinks.

In Structured Streaming, the incoming stream of data is treated as a table that is continuously growing. All events arriving within a batch window are treated as new rows appended to the input table, as shown in Figure 5.9.


Figure 5.9: Spark Structured Streaming model

Queries are executed on the input table to produce a result table, as shown in Figure 5.10. A query runs at every trigger interval, recomputes the result table, and the changes are then written out to sinks such as file systems, databases, and so on.


Figure 5.10: Spark Structured Streaming model

Valid output modes are complete, append, or update. append is the default mode.
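
As a minimal sketch, the output mode and the trigger interval are both set on the writeStream builder before starting the query; resultDF stands for a hypothetical aggregated streaming DataFrame, and the 10-second interval is just a placeholder:

import org.apache.spark.sql.streaming.ProcessingTime

// complete mode rewrites the full result table on every trigger, which is what
// aggregation queries typically need; append only emits newly added rows.
val query = resultDF
  .writeStream
  .format("console")                      // print each micro-batch to the console
  .outputMode("complete")
  .trigger(ProcessingTime("10 seconds"))  // re-execute the query every 10 seconds
  .start()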

When to use Structured Streaming?

Structured Streaming can be used in the following scenarios:

  • To create streaming applications using the Dataset and DataFrame APIs
  • To provide data consistency and exactly-once semantics, even in the case of delays and failures at multiple levels
  • To create continuous applications that integrate batch queries, streaming, and machine learning

Streaming Datasets and Streaming DataFrames

Streaming Datasets and streaming DataFrames are created using the SparkSession introduced in Spark 2.0. To create a DataFrame from static data, we use SparkSession.read, whereas to create a streaming DataFrame, we use SparkSession.readStream. Similarly, to write a DataFrame out we use DataFrame.write, whereas to write a streaming DataFrame we use DataFrame.writeStream. The following Scala example creates a SparkSession and then creates a stream by reading CSV files from a directory:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

val spark = SparkSession
  .builder
  .appName("CSVStructuredStreaming")
  .getOrCreate()

// Schema of the incoming CSV files; the file source needs it specified up front.
val csvSchema = new StructType().add("emp_name", "string").add("emp_age", "integer")

// Create the streaming DataFrame by monitoring a directory for new CSV files.
val csv_df = spark
  .readStream
  .option("sep", ",")
  .schema(csvSchema)
  .csv("HDFS Path")

// Continuously write the stream out as Parquet files in append mode.
val csvQuery = csv_df
  .writeStream
  .format("parquet")
  .option("path", "HDFS Path")
  .option("checkpointLocation", "HDFS Path")
  .outputMode("append")
  .start()
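
start() returns a StreamingQuery handle (csvQuery in the preceding example). In spark-shell the query keeps running in the background, but in a standalone application you would typically block on the handle so that the driver does not exit while the stream is still being processed; a minimal sketch:

csvQuery.awaitTermination()   // blocks until the query is stopped or fails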

Input sources and output sinks

Currently, only two types of built-in sources are supported: the file source and the socket source. The file source creates a stream by reading files written to a directory, as shown in the previous example; Spark 2.0 supports the CSV, JSON, and Parquet formats. The socket source reads data from a socket connection and is intended for testing purposes only.
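
A minimal sketch of the socket source follows; the host and port are placeholders, and the source produces a streaming DataFrame with a single string column named value:

// Read UTF-8 text lines from a TCP socket; for testing only, as the socket
// source provides no fault-tolerance guarantees.
val socketDF = spark
  .readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()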

Currently, four types of output sinks are supported: the file sink, the foreach sink, the console sink, and the memory sink. The file sink supports only the Parquet format with append mode. The foreach sink can be used to run arbitrary operations on the output data and is supported in the Scala and Java APIs. The console and memory sinks are used for debugging purposes only.
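
As an illustration of the foreach sink, the following minimal sketch writes every output row through a custom ForeachWriter; the println body is just a stand-in for an arbitrary operation such as a database update, and csv_df refers to the streaming DataFrame created earlier:

import org.apache.spark.sql.{ForeachWriter, Row}

// A ForeachWriter receives the rows of each trigger's output; open and close
// bracket the processing of one partition of one micro-batch.
val printWriter = new ForeachWriter[Row] {
  def open(partitionId: Long, version: Long): Boolean = true   // true = process this partition
  def process(row: Row): Unit = println(row)                   // stand-in for, e.g., a database write
  def close(errorOrNull: Throwable): Unit = {}                 // release any connections here
}

csv_df.writeStream.foreach(printWriter).start()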

Operations on Streaming Datasets and Streaming DataFrames

Most common operations, such as filter, groupBy, and aggregation, are supported on streaming DataFrames, and streaming DataFrames can be converted to streaming Datasets as well. Window operations are supported on streaming DataFrames, and streaming DataFrames can also be joined with static DataFrames.
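
A minimal sketch of these operations follows, assuming a hypothetical streaming DataFrame deviceEvents (with device_id and status columns) and a hypothetical static lookup file with the device details:

// Hypothetical static DataFrame with one row of details per device.
val deviceInfo = spark.read.json("hdfs:///path/to/device-info.json")

// Filter the stream and enrich it by joining with the static DataFrame;
// the join is evaluated incrementally as new streaming rows arrive.
// ($-column syntax requires import spark.implicits._, pre-imported in spark-shell.)
val activeWithInfo = deviceEvents
  .filter($"status" === "active")
  .join(deviceInfo, "device_id")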

Let's create a Structured Streaming application for an Internet of Things (IoT) use case in which a few sensors send information about active or inactive devices at different intervals. Our goal is to find how many devices are active or inactive in different time intervals; the steps are as follows:

  1. Let's create the three input datasets shown here in JSON format. In a real-life project, the data would arrive in the streaming directory automatically, but for our understanding, let's create these files manually:
    iot-file1.json
    {"device_id":1,"timestamp":1470009600,"status":"active"}
    {"device_id":2,"timestamp":1470013200,"status":"active"}
    {"device_id":3,"timestamp":1470016800,"status":"active"}
    {"device_id":4,"timestamp":1470020400,"status":"active"}
    {"device_id":5,"timestamp":1470024000,"status":"active"}
    {"device_id":1,"timestamp":1470009601,"status":"active"}
    {"device_id":2,"timestamp":1470013202,"status":"active"}
    {"device_id":3,"timestamp":1470016803,"status":"inactive"}
    {"device_id":4,"timestamp":1470020404,"status":"active"}
    {"device_id":5,"timestamp":1470024005,"status":"active"}
    
    iot-file2.json
    {"device_id":1,"timestamp":1470027600,"status":"active"}
    {"device_id":2,"timestamp":1470031200,"status":"active"}
    {"device_id":3,"timestamp":1470034800,"status":"active"}
    {"device_id":4,"timestamp":1470038400,"status":"active"}
    {"device_id":5,"timestamp":1470042000,"status":"active"}
    {"device_id":1,"timestamp":1470027601,"status":"active"}
    {"device_id":2,"timestamp":1470031202,"status":"active"}
    {"device_id":3,"timestamp":1470034803,"status":"active"}
    {"device_id":4,"timestamp":1470038404,"status":"active"}
    {"device_id":5,"timestamp":1470042005,"status":"active"}
    
    iot-file3.json
    {"device_id":1,"timestamp":1470027601,"status":"active"}
  2. Create an HDFS directory and copy the first IoT events file:
    hadoop fs -mkdir iotstream
    hadoop fs -put iot-file1.json iotstream/
    
  3. Start a Scala shell session, create the streaming DataFrame, and start the stream. Running bin/spark-shell starts the Scala shell and creates a pre-configured SparkSession called spark:
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.functions._

    // Schema of the incoming IoT events.
    val iotSchema = new StructType().add("device_id", LongType).add("timestamp", TimestampType).add("status", StringType)
    val iotPath = "hdfs://quickstart.cloudera:8020/user/cloudera/iotstream"

    // maxFilesPerTrigger=1 processes one new file per trigger, so each copied file becomes one micro-batch.
    val iotStreamingDataFrame = spark.readStream.schema(iotSchema).option("maxFilesPerTrigger", 1).json(iotPath)

    // Count events per status within 1-hour event-time windows.
    val iotStreamingCounts = iotStreamingDataFrame.groupBy($"status", window($"timestamp", "1 hour")).count()
    iotStreamingCounts.isStreaming   // returns true, confirming this is a streaming DataFrame

    // Write the running counts to an in-memory table named "iotstream" that can be queried with SQL.
    val iotQuery = iotStreamingCounts.writeStream.format("memory").queryName("iotstream").outputMode("complete").start()
  4. Now query the in-memory iotstream table:
    scala> spark.sql("select status, date_format(window.start, 'MMM-dd HH:mm') as start_time, date_format(window.end, 'MMM-dd HH:mm') as end_time, count from iotstream order by start_time,end_time, status").show()
    
    +--------+------------+------------+-----+
    |  status|  start_time|    end_time|count|
    +--------+------------+------------+-----+
    |  active|Jul-31 17:00|Jul-31 18:00|    2|
    |  active|Jul-31 18:00|Jul-31 19:00|    2|
    |  active|Jul-31 19:00|Jul-31 20:00|    1|
    |inactive|Jul-31 19:00|Jul-31 20:00|    1|
    |  active|Jul-31 20:00|Jul-31 21:00|    2|
    |  active|Jul-31 21:00|Jul-31 22:00|    2|
    +--------+------------+------------+-----+
    
  5. Now, copy the second file, iot-file2.json, to the same HDFS directory; you can observe that the streaming query executes automatically and computes the new counts. Let's run the same query we ran previously:
    hadoop fs -put iot-file2.json iotstream/
    scala> spark.sql("select status, date_format(window.start, 'MMM-dd HH:mm') as start_time, date_format(window.end, 'MMM-dd HH:mm') as end_time, count from iotstream order by start_time,end_time, status").show()
    
    +--------+------------+------------+-----+
    |  status|  start_time|    end_time|count|
    +--------+------------+------------+-----+
    |  active|Aug-01 00:00|Aug-01 01:00|    2|
    |  active|Aug-01 01:00|Aug-01 02:00|    2|
    |  active|Aug-01 02:00|Aug-01 03:00|    2|
    |  active|Jul-31 17:00|Jul-31 18:00|    2|
    |  active|Jul-31 18:00|Jul-31 19:00|    2|
    |  active|Jul-31 19:00|Jul-31 20:00|    1|
    |inactive|Jul-31 19:00|Jul-31 20:00|    1|
    |  active|Jul-31 20:00|Jul-31 21:00|    2|
    |  active|Jul-31 21:00|Jul-31 22:00|    2|
    |  active|Jul-31 22:00|Jul-31 23:00|    2|
    |  active|Jul-31 23:00|Aug-01 00:00|    2|
    +--------+------------+------------+-----+
    
  6. Now, copy the third file, iot-file3.json, which contains a late event, to the same HDFS directory. Once you execute the query again, you can observe that the late event is handled by updating the previous record:
    hadoop fs -put iot-file3.json iotstream/
    scala> spark.sql("select status, date_format(window.start, 'MMM-dd HH:mm') as start_time, date_format(window.end, 'MMM-dd HH:mm') as end_time, count from iotstream order by start_time,end_time, status").show()
    +--------+------------+------------+-----+
    |  status|  start_time|    end_time|count|
    +--------+------------+------------+-----+
    |  active|Aug-01 00:00|Aug-01 01:00|    2|
    |  active|Aug-01 01:00|Aug-01 02:00|    2|
    |  active|Aug-01 02:00|Aug-01 03:00|    2|
    |  active|Jul-31 17:00|Jul-31 18:00|    2|
    |  active|Jul-31 18:00|Jul-31 19:00|    2|
    |  active|Jul-31 19:00|Jul-31 20:00|    1|
    |inactive|Jul-31 19:00|Jul-31 20:00|    1|
    |  active|Jul-31 20:00|Jul-31 21:00|    2|
    |  active|Jul-31 21:00|Jul-31 22:00|    2|
    |  active|Jul-31 22:00|Jul-31 23:00|    3|
    |  active|Jul-31 23:00|Aug-01 00:00|    2|
    +--------+------------+------------+-----+
    

    You can observe that the late event is updated in place, incrementing the count for the Jul-31 22:00 window to 3. You can also copy the same datasets to the directory again to see how the counts are updated.

  7. Finally, you can stop the streaming query with the following command:
    iotQuery.stop()
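
    If you want to confirm that the stream has stopped, the StreamingQuery handle exposes isActive; a minimal check:

    iotQuery.isActive   // returns false once the query has been stopped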
    