Spark Streaming is built on an abstraction called Discretized Streams, referred to as DStreams. A DStream is represented as a sequence of RDDs, with one RDD created at each time interval. A DStream can be processed much like a regular RDD, using the same concepts, such as an execution plan based on a directed acyclic graph (DAG). Just as with regular RDD processing, the transformations and actions that make up the execution plan are applied to the RDDs in the DStream.
A DStream essentially divides a never-ending stream of data into smaller chunks, known as micro-batches, based on a time interval, and materializes each micro-batch as an RDD that can then be processed as a regular RDD. Each micro-batch is processed independently and no state is maintained between micro-batches, which makes the processing stateless by nature. For example, if the batch interval is 5 seconds, then while events are consumed in real time, a micro-batch is created every 5 seconds and handed over for further processing as an RDD. One of the main advantages of Spark Streaming is that the API calls used to process a micro-batch of events are tightly integrated with the Spark core APIs, which provides seamless integration with the rest of the architecture: because each micro-batch becomes an RDD, it can be processed with the familiar Spark APIs.
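To make the micro-batching idea concrete, here is a Spark-free sketch in plain Scala, assuming each event carries a timestamp in milliseconds. `Event`, `MicroBatching`, `toMicroBatches`, and `countPerBatch` are hypothetical names, not part of the Spark API. Events are bucketed into fixed intervals and each bucket is processed on its own, mirroring the stateless per-batch model described above.

```scala
// Hypothetical event type (not a Spark class): a payload plus its arrival time in ms
final case class Event(timestampMs: Long, payload: String)

// Simplified, Spark-free model of micro-batching; all names are illustrative only
object MicroBatching {
  // Assign each event to the batch whose window [start, start + intervalMs) contains it,
  // keyed by the batch start time and returned in time order.
  def toMicroBatches(events: Seq[Event], intervalMs: Long): Seq[(Long, Seq[Event])] =
    events
      .groupBy(e => (e.timestampMs / intervalMs) * intervalMs)
      .toSeq
      .sortBy(_._1)

  // Stateless processing: each batch is handled independently (here, by counting its events).
  def countPerBatch(events: Seq[Event], intervalMs: Long): Seq[(Long, Int)] =
    toMicroBatches(events, intervalMs).map { case (start, batch) => (start, batch.size) }
}
```

With a 5,000 ms interval, events at 0 ms and 4,999 ms fall into the batch starting at 0, while an event at 5,000 ms opens the next batch. One simplification: this model only emits batches for intervals that contain events, whereas Spark Streaming generates a batch for every interval, even an empty one.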
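The DStream class looks like the following in the Spark source code (simplified); its most important variable is generatedRDDs, a HashMap of [Time, RDD] pairs:
// simplified excerpt from org.apache.spark.streaming.dstream.DStream
abstract class DStream[T: ClassTag](var ssc: StreamingContext) {
  // HashMap of the RDDs generated by this DStream, keyed by batch time
  var generatedRDDs = new HashMap[Time, RDD[T]]()
  // ...
}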
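The role of generatedRDDs can be illustrated with a small model. The sketch below is plain Scala and only loosely inspired by Spark's DStream.getOrCompute; `GeneratedBatches`, `ToyDStream`, `generated`, `computeCalls`, and `clearOlderThan` are hypothetical names, and a Vector stands in for an RDD. The key idea is that the batch for a given time is computed once, cached in a HashMap under its batch time, and reused on later lookups.

```scala
import scala.collection.mutable

// Toy model of a DStream's time -> batch cache (hypothetical names, not the Spark API)
object GeneratedBatches {
  type Time = Long          // batch time in ms (stand-in for Spark's Time)
  type Batch[T] = Vector[T] // stand-in for RDD[T]

  class ToyDStream[T](produce: Time => Batch[T]) {
    // Analogue of generatedRDDs: batch time -> batch produced for that time
    val generated: mutable.HashMap[Time, Batch[T]] = mutable.HashMap.empty[Time, Batch[T]]
    // Counts how often `produce` actually ran, to show the cache at work
    var computeCalls: Int = 0

    // Return the cached batch for `time`, computing and caching it on first request
    def getOrCompute(time: Time): Batch[T] =
      generated.getOrElseUpdate(time, { computeCalls += 1; produce(time) })

    // Forget batches older than `before` (keys are copied first, then removed)
    def clearOlderThan(before: Time): Unit =
      generated.keys.filter(_ < before).toList.foreach(k => generated.remove(k))
  }
}
```

Dropping old entries with clearOlderThan loosely corresponds to Spark discarding old RDDs once they fall outside the DStream's remember duration; Spark's actual cleanup policy is more involved than this.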
The following illustration shows a DStream comprising an RDD created every T seconds:
In the following example, a StreamingContext is created that generates a micro-batch every 5 seconds. Each micro-batch becomes an RDD, just like a Spark core API RDD, and the RDDs in the DStream can be processed just like any other RDD.
The steps involved in building a streaming application are as follows:
- Create a StreamingContext from the SparkContext.
- Create a DStream from the StreamingContext.
- Provide transformations and actions that can be applied to each RDD.
- Finally, the streaming application is started by calling start() on the StreamingContext. This starts the entire process of consuming and processing real-time events.
The following example shows how to create a simple streaming job that reads from Twitter:
- Create a StreamingContext from the SparkContext:
scala> import org.apache.spark.streaming.{Seconds, StreamingContext}
scala> val ssc = new StreamingContext(sc, Seconds(5))
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@8ea5756
- Create a DStream from the StreamingContext:
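scala> import org.apache.spark.streaming.twitter.TwitterUtils
scala> val twitterStream = TwitterUtils.createStream(ssc, None) // None: use the default Twitter4J OAuth credentials
twitterStream: org.apache.spark.streaming.dstream.ReceiverInputDStream[twitter4j.Status] = org.apache.spark.streaming.twitter.TwitterInputDStream@46219d14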
- Provide transformations and actions that can be applied to each RDD:
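// count hashtags in each 5-second micro-batch
val aggStream = twitterStream
  .flatMap(x => x.getText.split(" ")) // split each tweet into words
  .filter(_.startsWith("#"))          // keep only hashtags
  .map(x => (x, 1))                   // pair each hashtag with a count of 1
  .reduceByKey(_ + _)                 // sum the counts per hashtag within the batch
// register an output operation; without one, ssc.start() fails with "No output operations registered"
aggStream.print()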
- Finally, the streaming application is started by calling start() on the StreamingContext. This starts the entire process of consuming and processing real-time events:
ssc.start()
// to stop, call stop() on the StreamingContext;
// passing false keeps the underlying SparkContext running
ssc.stop(false)
- We created a DStream of type ReceiverInputDStream, an abstract class for any InputDStream that has to start a receiver on worker nodes to receive external data. Here, we are receiving from the Twitter stream:
abstract class InputDStream[T: ClassTag](_ssc: StreamingContext)
  extends DStream[T](_ssc)
abstract class ReceiverInputDStream[T: ClassTag](_ssc: StreamingContext)
  extends InputDStream[T](_ssc)
- If you apply the flatMap() transformation to twitterStream, you get a FlatMappedDStream, as shown in the following:
scala> val wordStream = twitterStream.flatMap(x => x.getText().split(" "))
wordStream: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.FlatMappedDStream@1ed2dbd5
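Applying flatMap() returns a new DStream that wraps twitterStream rather than any data. The following plain-Scala sketch models that lineage under simplifying assumptions: every class and helper (`ToyLineage`, `ToyDStream`, `ToyFlatMappedDStream`, `sumByKey`, and so on) is hypothetical and only loosely modeled on Spark's classes, a Seq stands in for each batch's RDD, and tweets are plain strings rather than twitter4j.Status objects. Each transformation only records its parent and its function; the work happens when compute() is called for a batch time, which also lets the sketch replay the hashtag-counting logic of aggStream on a single batch.

```scala
// Toy model of DStream lineage (hypothetical names, not the Spark API).
// Each transformation returns a new stream wrapping its parent; nothing runs
// until compute() is called for a particular batch time.
object ToyLineage {
  abstract class ToyDStream[T] {
    // Data for the batch at `time` (a plain Seq stands in for an RDD)
    def compute(time: Long): Seq[T]

    def flatMap[U](f: T => Seq[U]): ToyDStream[U] = new ToyFlatMappedDStream(this, f)
    def filter(p: T => Boolean): ToyDStream[T] = new ToyFilteredDStream(this, p)
    def map[U](f: T => U): ToyDStream[U] = new ToyMappedDStream(this, f)
  }

  // Source stream backed by a fixed map of batch time -> records
  class ToySourceDStream[T](batches: Map[Long, Seq[T]]) extends ToyDStream[T] {
    def compute(time: Long): Seq[T] = batches.getOrElse(time, Seq.empty)
  }

  // Derived streams apply their function to the parent's batch for the same time
  class ToyFlatMappedDStream[T, U](parent: ToyDStream[T], f: T => Seq[U]) extends ToyDStream[U] {
    def compute(time: Long): Seq[U] = parent.compute(time).flatMap(f)
  }
  class ToyFilteredDStream[T](parent: ToyDStream[T], p: T => Boolean) extends ToyDStream[T] {
    def compute(time: Long): Seq[T] = parent.compute(time).filter(p)
  }
  class ToyMappedDStream[T, U](parent: ToyDStream[T], f: T => U) extends ToyDStream[U] {
    def compute(time: Long): Seq[U] = parent.compute(time).map(f)
  }

  // Per-batch analogue of reduceByKey(_ + _) on (key, count) pairs;
  // a standalone helper here rather than a method on ToyDStream
  def sumByKey[K](parent: ToyDStream[(K, Int)]): ToyDStream[(K, Int)] =
    new ToyDStream[(K, Int)] {
      def compute(time: Long): Seq[(K, Int)] =
        parent.compute(time)
          .foldLeft(Map.empty[K, Int]) { case (acc, (k, n)) => acc.updated(k, acc.getOrElse(k, 0) + n) }
          .toSeq
    }
}
```

In Spark itself, the DStream graph is likewise built lazily, and the per-batch RDDs are generated by the streaming scheduler only after ssc.start() has been called.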