Transformations and actions on DStreams boil down to transformations and actions on the underlying RDDs. The DStream API provides many of the transformations available in the normal RDD API, along with special functions applicable to streaming applications. Let's go through some of the important transformations.
Two DStreams can be combined with the union operation to create a single DStream. For example, data received from multiple Kafka or Flume receivers can be combined to create a new DStream. This is a common approach for increasing scalability in Spark Streaming:
stream1 = ...
stream2 = ...
multiDStream = stream1.union(stream2)
The join operation joins two DStreams of (K, V) and (K, W) pairs and returns a new DStream of (K, (V, W)) pairs with all pairs of elements for each key:
stream1 = ...
stream2 = ...
joinedDStream = stream1.join(stream2)
The transform operation can be used to apply any RDD operation that is not directly available in the DStream API. For example, joining each RDD of a DStream with a precomputed dataset is not directly exposed, so use the transform operation to do the join, as shown in the following:
cleanRDD = sc.textFile("hdfs://hostname:8020/input/cleandata.txt")

# Join each RDD of the DStream with cleanRDD and filter out unwanted records.
# Note that both sides of the join must be RDDs of (key, value) pairs.
myCleanedDStream = myDStream.transform(lambda rdd: rdd.join(cleanRDD).filter(...))
This effectively combines batch and streaming processing within a single application.
As we have seen in the stateful transformation example, the updateStateByKey(func) transformation returns a new DStream in which the state of each key is updated by applying the given function to the previous state and the new values for that key. Checkpointing must be enabled to use updateStateByKey.
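As a quick illustration, the following is a minimal sketch of a running count per word, assuming ssc is an existing StreamingContext and words is a DStream of strings; the checkpoint directory is a placeholder:

# Checkpointing is mandatory for stateful transformations; the path is a placeholder.
ssc.checkpoint("hdfs://hostname:8020/checkpoint")

def updateCount(newValues, runningCount):
    # newValues holds the counts that arrived in this batch;
    # runningCount is the previous state (None on the first batch).
    return sum(newValues) + (runningCount or 0)

runningCounts = words.map(lambda word: (word, 1)).updateStateByKey(updateCount)
runningCounts.pprint()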
mapWithState is a stateful transformation introduced in version 1.6 (in the Scala and Java APIs). It makes user logic easier to implement and also provides better performance than updateStateByKey.
Spark Streaming also provides powerful window computations, which allow you to apply transformations over a sliding window of data. Let's consider the following Twitter example:
val countsDStream = hashTagsDStream.window(Seconds(60), Seconds(10)).countByValue()
As shown in Figure 5.4, it has a window length of 60 seconds, a sliding interval of 10 seconds, and a batch interval of 5 seconds. It counts the number of hashtags from Twitter in a sliding window of 60 seconds. Because the window slides every 10 seconds, most of the records are counted again in the next window. This is normal, because the goal of this exercise is to find the number of specific hashtags from Twitter over any 60-second period. If double counting is not needed, Spark Streaming provides smart window operations such as countByValueAndWindow, which subtracts the counts of the old data leaving the window and adds the counts of the new data entering it.
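For example, the following is a minimal PySpark sketch using countByValueAndWindow, assuming ssc is an existing StreamingContext and hashTags is a DStream of hashtag strings; in the Python API, the window and slide durations are given in seconds:

# Incremental window operations require checkpointing; the path is a placeholder.
ssc.checkpoint("hdfs://hostname:8020/checkpoint")

# Count each hashtag over a 60-second window, sliding every 10 seconds.
windowedCounts = hashTags.countByValueAndWindow(60, 10)
windowedCounts.pprint()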
The following table shows the common window operations used in Spark Streaming:

Transformation | Meaning
window(windowLength, slideInterval) | Returns a new DStream computed from windowed batches of the source DStream.
countByWindow(windowLength, slideInterval) | Returns a sliding window count of the elements in the stream.
reduceByWindow(func, windowLength, slideInterval) | Returns a new single-element stream created by aggregating elements over a sliding window using func.
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) | For a DStream of (K, V) pairs, returns a new DStream of (K, V) pairs where the values of each key are aggregated over a sliding window using func.
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) | A more efficient version that incrementally adds data entering the window and "inverse reduces" data leaving it; requires checkpointing.
countByValueAndWindow(windowLength, slideInterval, [numTasks]) | Returns a new DStream of (K, Long) pairs where the value of each key is its frequency within the sliding window; requires checkpointing.
Output operations write processed DStreams to database systems or file systems. Like actions on RDDs, output operations trigger the execution of all the transformations applied to a DStream. The following table shows some of the output operations available:

Output operation | Meaning
print() (pprint() in the Python API) | Prints the first ten elements of every batch on the driver node.
saveAsTextFiles(prefix, [suffix]) | Saves the DStream's contents as text files; the file name at each batch interval is generated from prefix and suffix.
saveAsObjectFiles(prefix, [suffix]) | Saves the DStream's contents as SequenceFiles of serialized Java objects (not available in the Python API).
saveAsHadoopFiles(prefix, [suffix]) | Saves the DStream's contents as Hadoop files (not available in the Python API).
foreachRDD(func) | The most generic output operator; applies func to each RDD generated from the stream.
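As a quick illustration, writing each batch to HDFS as text files is a one-liner; a minimal sketch, assuming myDStream is an existing DStream and the HDFS path prefix is a placeholder:

# Each batch interval produces files named <prefix>-<timestamp>.<suffix>.
myDStream.saveAsTextFiles("hdfs://hostname:8020/output/mydata", "txt")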
A typical use of the foreachRDD operation to create a connection and send records is as follows:
def sendPartition(iter):
    # Create one connection per partition rather than one per record.
    conn = ConnectionPool.getConnection()
    for record in iter:
        conn.send(record)
    # Return the connection to the pool so it can be reused across batches.
    ConnectionPool.returnConnection(conn)

dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))

Creating the connection inside foreachPartition amortizes the connection cost over all the records in a partition, and the connection pool allows connections to be reused across batches.