Spark Streaming transformations and actions

Transformations and actions on a DStream ultimately boil down to transformations and actions on the underlying RDDs. The DStream API provides many of the transformations available in the normal RDD API, along with special functions applicable to streaming applications. Let's go through some of the important transformations.

Union

Two DStreams can be combined to create one DStream. For example, data received from multiple Kafka or Flume receivers can be combined to create a new DStream. This is a common approach in Spark Streaming to increase scalability:

stream1 = ...
stream2 = ...
multiDStream = stream1.union(stream2)
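For instance, a minimal sketch that unions two socket text streams (the StreamingContext setup, hostnames, and ports are assumptions for illustration) might look like this:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="UnionExample")
ssc = StreamingContext(sc, 5)  # 5-second batch interval

# Two independent receivers; the host and port values are placeholders
stream1 = ssc.socketTextStream("localhost", 9998)
stream2 = ssc.socketTextStream("localhost", 9999)

# Combine both streams into a single DStream
multiDStream = stream1.union(stream2)
multiDStream.pprint()

ssc.start()
ssc.awaitTermination()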

Join

Joins two DStreams of (K, V) and (K, W) pairs and returns a new DStream of (K, (V, W)) pairs with all pairs of elements for each key:

stream1 = ...
stream2 = ...
joinedDStream = stream1.join(stream2)
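As a minimal sketch (the socket sources and the key extraction logic are assumptions for illustration), two streams can be keyed and then joined per batch like this:

# Assume ssc is an existing StreamingContext; hosts and ports are placeholders
lines1 = ssc.socketTextStream("localhost", 9998)
lines2 = ssc.socketTextStream("localhost", 9999)

# Turn each stream into (K, V) pairs, for example (userId, record)
stream1 = lines1.map(lambda line: (line.split(",")[0], line))
stream2 = lines2.map(lambda line: (line.split(",")[0], line))

# Joins on the key within each batch interval, producing (K, (V, W)) pairs
joinedDStream = stream1.join(stream2)
joinedDStream.pprint()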

Transform operation

The transform operation can be used to apply any RDD operation that is not available in the DStream API. For example, joining a DStream with a static dataset is not directly exposed, so you can use the transform operation to join them, as shown in the following:

cleanRDD = sc.textFile("hdfs://hostname:8020/input/cleandata.txt")
# Join the existing DStream with cleanRDD and filter out unwanted records.
myCleanedDStream = myDStream.transform(lambda rdd: rdd.join(cleanRDD).filter(...))

This effectively combines batch and streaming processing in a single application.
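A more concrete sketch of this pattern, assuming a static blacklist file on HDFS and a DStream of (userId, event) pairs named eventDStream (both are assumptions for illustration), could look like this:

# Static RDD of (userId, flag) pairs loaded once; the path is a placeholder
blacklistRDD = sc.textFile("hdfs://hostname:8020/input/blacklist.txt") \
                 .map(lambda userId: (userId, True))

# Left outer join keeps every event and drops the blacklisted users
cleanedDStream = eventDStream.transform(
    lambda rdd: rdd.leftOuterJoin(blacklistRDD)
                   .filter(lambda kv: kv[1][1] is None)
                   .map(lambda kv: (kv[0], kv[1][0])))
cleanedDStream.pprint()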

updateStateByKey

As we have seen in the stateful transformation example, the updateStateByKey(func) function returns a new DStream in which the state for each key is updated by applying the given function to the previous state of the key and the new values for the key. Checkpointing must be enabled for the updateStateByKey operation.
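A minimal sketch of a stateful running word count (the StreamingContext ssc, the checkpoint directory, and the socket source are assumptions for illustration) looks like this:

# Checkpointing must be enabled for stateful transformations
ssc.checkpoint("hdfs://hostname:8020/checkpoint")

def updateCount(newValues, runningCount):
    # newValues holds the counts for this key from the current batch
    return sum(newValues) + (runningCount or 0)

lines = ssc.socketTextStream("localhost", 9999)
runningCounts = lines.flatMap(lambda line: line.split(" ")) \
                     .map(lambda word: (word, 1)) \
                     .updateStateByKey(updateCount)
runningCounts.pprint()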

mapWithState

mapWithState is a stateful transformation introduced in Spark 1.6 and available in the Scala and Java APIs. It makes it easier to implement user logic and also provides better performance than updateStateByKey.

Window operations

Spark Streaming also provides powerful window computations, which allow you to apply transformations over a sliding window of data. Let's consider the following Twitter example:

val countsDStream = hashTagsDStream.window(Seconds(60), Seconds(10)).countByValue()

Figure 5.4: Spark Streaming window operation

As shown in Figure 5.4, this example has a window length of 60 seconds, a sliding interval of 10 seconds, and a batch interval of 5 seconds. It counts the number of hashtags from Twitter over a sliding window of 60 seconds. Every time the window slides by 10 seconds, the hashtag counts are computed over the last 60 seconds of data, which means most records are counted again in the next window. This is expected, because the goal is to find the number of occurrences of each hashtag over the last 60 seconds. To avoid recomputing every record from scratch in each window, Spark Streaming also provides incremental window operations such as countByValueAndWindow, which subtract the counts of the data leaving the window and add the counts of the new data entering it.
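A minimal PySpark sketch of this incremental approach (the StreamingContext ssc, the socket source, and the checkpoint directory are assumptions for illustration; durations are given in seconds) is shown below:

# countByValueAndWindow uses an inverse function internally,
# so checkpointing must be enabled
ssc.checkpoint("hdfs://hostname:8020/checkpoint")

hashTagsDStream = ssc.socketTextStream("localhost", 9999) \
                     .flatMap(lambda line: line.split(" ")) \
                     .filter(lambda word: word.startswith("#"))

# 60-second window sliding every 10 seconds, counted incrementally
countsDStream = hashTagsDStream.countByValueAndWindow(60, 10)
countsDStream.pprint()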

The following table shows the common window operations used in Spark Streaming:

Window transformation | Description
window | Returns a new DStream containing a window of batches
countByWindow | Returns a new DStream with a sliding window count of the elements in the stream
reduceByWindow | Returns a new DStream by aggregating the elements in the stream over a sliding window using a function
reduceByKeyAndWindow | Returns a new DStream by aggregating the values for each key over a sliding window using a function (see the sketch after this table)
countByValueAndWindow | Returns a new DStream of (key, value) pairs where the value of each key is its frequency within a sliding window
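To illustrate reduceByKeyAndWindow from the table above, here is a minimal sketch that uses an inverse function to enable the incremental computation described earlier (pairsDStream is assumed to be an existing DStream of (word, 1) pairs with checkpointing already enabled):

windowedCounts = pairsDStream.reduceByKeyAndWindow(
    lambda a, b: a + b,   # add the counts entering the window
    lambda a, b: a - b,   # subtract the counts leaving the window
    60, 10)               # 60-second window, 10-second slide
windowedCounts.pprint()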

Output operations

Output operations write processed DStreams to database systems or file systems. These output operations are like RDD actions: they trigger the execution of the DStream transformations. The following table shows some of the output operations available:

Output operation | Meaning
print() (pprint() in Python) | Prints the first 10 elements of every batch of the DStream on the console
saveAsTextFiles | Saves the contents of the DStream as text files
saveAsObjectFiles | Saves the contents of the DStream as SequenceFiles of serialized objects
saveAsHadoopFiles or saveAsNewAPIHadoopFiles | Saves the contents of the DStream to any Hadoop-supported output format, such as Avro, Parquet, HBase, and so on
saveToCassandra | Saves the DStream to Cassandra; requires the Spark Cassandra Connector
foreachRDD(func) | Applies a function to every RDD in the stream; an example of foreachRDD that creates a connection object and writes data to an external database is shown below

A typical use of a foreachRDD operation to create a connection and send records is as follows:

def sendPartition(records):
    # ConnectionPool is assumed to be an application-provided, lazily
    # initialized static pool of connections to the external database
    conn = ConnectionPool.getConnection()
    for record in records:
        conn.send(record)
    # Return the connection to the pool for reuse
    ConnectionPool.returnConnection(conn)

dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))
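Similarly, a short sketch of file-based output (the output path prefix is an assumption, and wordCounts is assumed to be an existing DStream) could be:

# Writes one directory of text files per batch,
# named <prefix>-<batch time in ms>.<suffix>
wordCounts.saveAsTextFiles("hdfs://hostname:8020/output/counts", "txt")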