Chapter 17. The Spark Streaming Programming Model

In Chapter 16, you learned about Spark Streaming’s central abstraction, the DStream, and how it blends a microbatch execution model with a functional programming API to deliver a complete foundation for stream processing on Spark.

In this chapter, we explore the API offered by the DStream abstraction that enables the implementation of arbitrarily complex business logic in a streaming fashion. From an API perspective, DStreams delegate much of their work to the underlying data structure in Spark, the Resilient Distributed Dataset (RDD). Before we delve into the details of the DStream API, we are going to take a quick tour of the RDD abstraction. A good understanding of the RDD concept and API is essential to comprehend how DStreams work.

RDDs as the Underlying Abstraction for DStreams

Spark has a single data structure as the base element of its API and libraries: the RDD. This is a polymorphic collection that represents a bag of elements in which the data to be analyzed is held as objects of an arbitrary Scala type. The dataset is distributed across the executors of the cluster and processed on those machines.

Note

Since the introduction of Spark SQL, the DataFrame and Dataset abstractions are the recommended programming interfaces to Spark. In the most recent versions, only library programmers are required to know about the RDD API and its runtime. Although RDDs are not often visible at the interface level, they still drive the core engine’s distributed computing capabilities.

To understand how Spark works, some basic knowledge of RDD-level programming is highly recommended. In the following section, we cover only the most notable elements. For a more in-depth treatment of the RDD programming model, we refer you to [Karau2015].

Using those RDDs involves (mostly) calling functions on the RDD collection type. The functions in this API are higher-order functions: they take other functions as arguments. In that sense, programming in Spark involves functional programming at its core: a language is considered functional when it can treat functions as values, defining them anywhere a value can appear, whether as an argument, a variable, or more generally any other syntax element, and, in particular, passing them as arguments to other functions. In the example that follows, we see how Spark lets you use an implementation of map to transform all of the values of a collection by applying an arbitrary function to every single element.

In the following example, we read a text file containing frequent last names from census data as an RDD[String] (read as RDD of Strings). Then, we obtain the length of those names using a map operation that transforms each initial name, represented as a String, into its length:

scala> val names = sc.textFile("/home/learning-spark-streaming/names.txt")
names: org.apache.spark.rdd.RDD[String] =
      MapPartitionsRDD[1] at textFile at <console>:24
scala> names.take(10)
res0: Array[String] =
      Array(smith, johnson, williams, jones, brown, davis, miller,
            wilson, moore, taylor)
scala> val lengths = names.map(str => str.length )
lengths: org.apache.spark.rdd.RDD[Int] =
      MapPartitionsRDD[3] at map at <console>:27
scala> lengths.take(10)
res3: Array[Int] = Array(6, 6, 6, 8, 9, 7, 7, 7, 6, 6)

Spark also provides us with a reduce function, which lets us aggregate the elements of a collection into a single result by iteratively applying a combining function. We are also going to use the count operation, which computes the number of elements in an RDD. Let’s have a look:

scala> val totalLength = lengths.reduce( (acc, newValue) => acc + newValue )
totalLength: Int = 606623
scala> val count = lengths.count()
count: Int = 88799
scala> val average = totalLength.toDouble / count
average: Double = 6.831417020461942

It’s worth noting that reduce requires that the RDD is not empty. Otherwise, it throws a java.lang.UnsupportedOperationException with the message: empty collection. This might seem like an extreme case given that we are in the midst of a discussion about processing large datasets, but empty collections become a real concern when we process incoming data in real time, because any given batch might contain no data at all.

To overcome this limitation, we could use fold, an aggregator similar to reduce. Besides the reduction function, fold lets us define an initial “zero” element to use with the aggregation function, therefore working properly even with empty RDDs.
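As a quick illustration, here is a minimal sketch using the lengths RDD from the example above; unlike reduce, fold simply returns the zero element if the RDD happens to be empty:

scala> val totalLengthSafe = lengths.fold(0)( (acc, len) => acc + len )
totalLengthSafe: Int = 606623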

fold and reduce both use an aggregation function whose arguments and result have the same type as the RDD’s elements. Hence, we could sum an RDD of Ints or compute the min or max of an RDD of Cartesian coordinates according to some measure. There are cases, however, in which we would like to return a type different from that of the data represented by the RDD. The more general aggregate function lets you specify an intermediate type and determine how to combine disparate input and output types:

scala> names.aggregate[TAB]
def aggregate[U](zeroValue: U)
 (seqOp: (U, T) => U, combOp: (U, U) => U)(implicit scala.reflect.ClassTag[U]): U

scala> names.fold[TAB]
   def fold(zeroValue: T)(op: (T, T) => T): T
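For instance, here is a minimal sketch, building on the names RDD above, that uses aggregate with a pair (Long, Long) as the intermediate type to compute the total length and the number of names in a single pass:

// zero value: (total length so far, number of names so far)
val (totalLen, numNames) = names.aggregate((0L, 0L))(
  (acc, name) => (acc._1 + name.length, acc._2 + 1L),  // seqOp: fold one element into the accumulator
  (a, b)      => (a._1 + b._1, a._2 + b._2)            // combOp: merge two partial accumulators
)
val average = totalLen.toDouble / numNames             // same result as the reduce/count example above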

It is the ease of use of this API that has earned Spark RDDs the nickname of “the ultimate Scala collections.” This reference to Spark’s original implementation language points at the collections library of Scala, which already lets us benefit from functional programming on a single machine with a rich API. That richness carries over to Spark, expanding our data-processing vocabulary beyond the basic map and reduce of the original MapReduce model.

The real genius of Spark is that it reproduces the ease of use of the Scala collections API and scales it up to operate over a cluster of computing resources.

The RDD API defines two broad families of functions: transformations and actions. Transformations, like map or filter, let us work on the immutable data contained by the RDD by creating new RDDs as the result of applying a transformation to their parent. This chain of RDD transformations forms a directed acyclic graph, or DAG, that tells Spark where the raw data is and how it needs to be transformed into the desired result. All transformations are declarative and lazy, meaning that they do not result in data actually being processed. To obtain results, we need to materialize the chain of transformations by issuing an action. Actions trigger the distributed execution of data operations and produce a tangible result, such as writing to a file or printing samples on the screen.
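As a small illustration of this laziness, using the names RDD from before, the map call below only records the transformation in the DAG; nothing is computed until the count action is invoked:

val upperNames = names.map(_.toUpperCase)  // transformation: lazy, nothing is executed yet
upperNames.count()                         // action: triggers the distributed computation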

Additional functional operations, such as flatMap, groupBy, and zip are also available. You can also find RDD combinators, like join and cogroup, that allow you to merge two or more existing RDDs.

Understanding DStream Transformations

The DStream programming API consists of transformations, higher-order functions that take another function as an argument. At the API level, a DStream[T] is a strongly typed data structure that represents a stream of data of type T.

DStreams are immutable. This means that we cannot mutate their contents in place. Instead, we implement our intended application logic by applying a series of transformations to the input DStream. Each transformation creates a new DStream that represents the transformed data from the parent DStream. DStream transformations are lazy, meaning that the underlying data is not actually processed until the system needs to materialize a result. In the case of DStreams, this materialization process is triggered when we want to produce results to a stream sink through a particular operation known as an output operation.

Note

For readers coming from a functional programming background, it is evident that DStream transformations can be considered pure functions, whereas output operations are side-effecting functions that produce a result to an external system.

Let’s review these concepts using the code we used earlier in the introduction:

val errorLabelStream = textDStream.map{ line =>
    if (line.contains("ERROR")) (1, 1) else (0, 1)
  }

Here, the textDStream contains lines of text. Using a map transformation, we apply a fairly naive error-counting function to each line in the original DStream[String], resulting in a new DStream[(Int, Int)] of tuples. In this case, map is a DStream transformation: it takes a function applicable to the stream’s contents, Strings in this case, and produces another DStream with the transformed contents.

DStreams are a streaming abstraction in which the elements of the stream are grouped into microbatches over a time dimension, as we illustrate in Figure 17-1. In turn, each microbatch is represented by an RDD. At the execution level, the main task of Spark Streaming is to schedule and manage the timely collection and delivery of data blocks to Spark. In turn, the Spark core engine will apply the programmed sequence of operations to the data that constitute our application logic.

Figure 17-1. Streaming model mapped on Spark Streaming

Going back to how this is reflected in the API, we see that there are operators, like the classical map, filter, and so on that operate on a single element. These operations follow the same principles of distributed execution and abide by the same serialization constraints as their batch Spark counterparts.

There are also operators, like transform and foreachRDD, that operate on an RDD instead of an element. These operators are executed by the Spark Streaming scheduler and the functions provided to them run in the context of the driver. It is within the scope of these operators that we can implement logic that crosses the boundary of microbatches, like bookkeeping history or maintaining application-level counters. They also provide access to all the Spark execution contexts. Within these operators, we can interact with Spark SQL, Spark ML, or even manage the life cycle of the streaming application. These operations are true bridges between the recurrent streaming microbatch scheduling, the element-level transformations, and the Spark runtime context.

Using this distinction, we can observe two broad groups of transformations:

Element-centric DStream transformations

Transformations that apply to a single element of the stream.

RDD-centric DStream transformations

Transformations that apply to the underlying RDD of each microbatch.

Besides this general classification, we also find two other classes of transformations in the DStream API:

Counting transformations

Dedicated operations to count elements in a stream.

Structure-modifying transformations

Transformations that change the internal structure or organization of the DStream but do not change the contents.

In the rest of this chapter, we study these transformations in detail.

Element-Centric DStream Transformations

In general, the element-centric transformations on the DStream API mirror functions also available in the RDD API, unifying the development experience between the batch and streaming modes of Spark.

The most commonly used transformations follow:

Note

The signature of each transformation has been simplified by removing implicit parameters to make the signature more concise, where applicable.

map
  map[U](mapFunc: (T) => U): DStream[U]

The map function on a DStream takes a function T => U and applies it to every element of a DStream[T], leaving the RDD structure unchanged. As with RDDs, it is the appropriate choice for a massively parallel operation in which the position of an element relative to the rest of the data is of no importance.

flatMap
  flatMap[U](flatMapFunc: (T) => TraversableOnce[U]): DStream[U]

flatMap is the usual companion of map. Instead of returning a single element of type U, the provided function returns a TraversableOnce container of U elements; these containers are then flattened into a single result. All Scala collections implement the TraversableOnce interface, making them all usable as target types for this function.

The common use cases for flatMap are when we want to create zero or more target elements from a single element. We can use flatMap to explode a record into many elements or, when used in combination with the Option type, it can be applied to filter out input records that do not meet certain criteria.
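For example, here is a minimal sketch, assuming a DStream[String] called lines, that uses flatMap together with Option to keep only the lines that parse as integers and drop the rest:

import scala.util.Try

// Each line yields either Some(number) or None; the None results are
// dropped when the Option containers are flattened into the result.
val numbers = lines.flatMap(line => Try(line.trim.toInt).toOption)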

An important remark is that this version of flatMap does not follow the strict monadic definition, which would be: flatMap[U](flatMapFunc: (T) => DStream[U]): DStream[U]. This is often a cause of confusion to newcomers with a functional programming background.

mapPartitions
  mapPartitions[U](mapPartFunc: (Iterator[T]) => Iterator[U],
                   preservePartitioning: Boolean = false): DStream[U]

This function, like the homonymous function defined on RDD, allows us to apply a map operation directly to each of the partitions of the underlying RDDs. The result is a new DStream[U] in which the elements are mapped. As with the mapPartitions call defined on an RDD, this function is useful because it allows us to have partition-specific behavior; that is, logic that is not repeated for every element but is instead executed once per partition, on the executor where that partition’s data is processed.

One classic example is initializing a random number generator that is then used in the processing of every element of the partition, accessible through the Iterator[T]. Another useful case is to amortize the creation of expensive resources, such as a server or database connection, and reuse such resources to process every input element. An additional advantage is that the initialization code is run directly on the executors, letting us use nonserializable libraries in a distributed computing process.
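A minimal sketch of that resource-amortization pattern follows, where records is a DStream of incoming records and createDbConnection and enrich are hypothetical helpers:

val enriched = records.mapPartitions { partition =>
  // Runs once per partition, on the executor: the connection is created
  // locally and never has to be serialized from the driver.
  val connection = createDbConnection()                // hypothetical, expensive to create
  partition.map(record => enrich(record, connection))  // hypothetical per-element enrichment
}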

filter
filter(filterFunc: (T) => Boolean): DStream[T]

This function selects some elements of the DStream according to the predicate passed as an argument. As with map, the predicate is checked against every element of the DStream. Beware that this can generate empty RDDs if no element satisfying the predicate is received during a particular batch interval.
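For instance, a minimal sketch that keeps only the lines containing the word ERROR from the textDStream used earlier in the chapter:

// A batch interval in which no line matches simply yields an empty RDD.
val errorLines = textDStream.filter(line => line.contains("ERROR"))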

glom
glom(): DStream[Array[T]]

This function, like the homonymous function defined on RDD, allows us to coalesce the elements of each partition into an array. In fact, as the glom call on an RDD returns one array of elements per partition, the DStream equivalent returns the result of calling the glom function on each of its constituent RDDs.

reduce
  reduce(reduceFunc: (T, T) => T): DStream[T]

This is the equivalent of the reduce function on an RDD. It allows us to aggregate the elements of an RDD using the provided aggregation function. reduce takes a function of two arguments, the accumulator and a new element of the RDD, and returns the new value of the accumulator. The result of applying reduce to a DStream[T] is hence a DStream of the same type T in which each batch interval contains an RDD with only one element: the final value of the accumulator. Note in particular that you should use reduce with care: it cannot deal with an empty RDD on its own, and in a streaming application an empty batch of data can always happen, for example when data production or ingestion is stalled.
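Continuing the error-labeling example from earlier in the chapter, here is a minimal sketch that reduces each microbatch of (error, total) tuples into a single per-batch pair:

// Each batch interval of errorLabelStream is collapsed into one element:
// (number of lines containing ERROR, total number of lines) for that batch.
val errorCountStream = errorLabelStream.reduce {
  case ((err1, total1), (err2, total2)) => (err1 + err2, total1 + total2)
}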

We can summarize these various transformations according to the kind of effect they have on the source DStream. In Table 17-1 we can see whether each operation works element-wise, partition-wise, or on a whole RDD, and what structure the resulting RDD has.

Table 17-1. The computation model and output of some essential DStream operations
Operation      Effect          Output RDD’s structure
map, filter    element-wise    Unchanged (as many elements as the original)
glom           partition-wise  As many arrays as there are partitions in the original
mapPartitions  partition-wise  As many partitions as the original RDD
reduce, fold   aggregating     One element
flatMap        element-wise    As many elements as the size of the output container

RDD-Centric DStream Transformations

These operations give us direct access to the underlying RDD of a DStream. What makes these operations special is that the functions we provide to them execute in the context of the Spark driver. Therefore, we have access to the facilities provided by the Spark session (or Spark context) as well as to the execution context of the driver program: local variables, data stores, or API calls to external services that may influence the way we want to process the data.

The most commonly used transformations follow:

transform
transform[U](transformFunc: (RDD[T]) => RDD[U]): DStream[U]
transform[U](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U]

transform allows us to reuse a transformation function of type RDD[T] => RDD[U] and apply it on each constituent RDD of a DStream. It is often used to take advantage of some processing written for a batch job—or more simply in another context—to yield a streaming process.

transform also has a timed version, with signature (RDD[T], Time) => RDD[U]. As we will remark soon for the foreachRDD action, this can be very useful for tagging the data in a DStream with the time of the batch of which this data was part.
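As a minimal sketch, assuming we already have a function removeDuplicates: RDD[String] => RDD[String] written for a batch job and a DStream[String] called lines:

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Time

// Reuse the existing batch logic on every microbatch.
val deduplicated = lines.transform(removeDuplicates)

// The timed variant lets us tag every element with the time of its batch.
val timestamped = lines.transform(
  (rdd: RDD[String], time: Time) => rdd.map(line => (time.milliseconds, line))
)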

transformWith
transformWith[U,V](
  other: DStream[U], transformFunc: (RDD[T], RDD[U]) => RDD[V]
): DStream[V]
transformWith[U,V](
  other: DStream[U], transformFunc: (RDD[T], RDD[U], Time) => RDD[V]
): DStream[V]

transformWith lets us combine this DStream with another DStream using an arbitrary transformation function. We can use it to implement custom join functions between two DStreams where the join is not based on a key. For example, we could apply a similarity function and combine those elements that are close enough. Like transform, transformWith offers an overload that provides access to the batch time, which can be used as a timestamp to differentiate the incoming data.
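For example, here is a minimal sketch of such a non-key-based join, assuming two DStreams of a hypothetical Reading type and a similarity predicate isCloseEnough:

import org.apache.spark.rdd.RDD

// Pair up the readings of the current microbatch of both streams and keep
// only the pairs that the (hypothetical) similarity predicate accepts.
val matched = readingsA.transformWith(readingsB,
  (rddA: RDD[Reading], rddB: RDD[Reading]) =>
    rddA.cartesian(rddB).filter { case (a, b) => isCloseEnough(a, b) }
)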

Counting

Because the contents of a stream often comprise data whose cardinality matters, such as the number of errors in a log or of hashtags in a tweet, counting is a frequent operation that Spark has optimized enough to warrant specific API functions.

It is interesting to note that although count is a materializing action in the RDD API because it directly produces a result, in the DStream API, it is a transformation that produces a new DStream with the counts at each microbatch interval.

Spark Streaming has several counting functions for a given DStream, which are better seen with an example.

Let’s assume that we have a DStream consisting of names keyed by their first letter and that this DStream “repeats” itself: each RDD, on each batch interval, consists of 10 distinct such names per alphabet letter:

val namesRDD: RDD[(Char, String)] = ...
// assuming a StreamingContext `ssc` created with a 5-second batch interval
val keyedNames: DStream[(Char, String)] =
    new ConstantInputDStream(ssc, namesRDD)

This would lead to the results shown in Table 17-2.

Table 17-2. Count operations
Operation                              Return type                      Result
keyedNames.count()                     DStream[Long]                    260
keyedNames.countByWindow(60s)          DStream[Long]                    260 (because the same RDD repeats each time)
keyedNames.countByValue()              DStream[((Char, String), Long)]  1 for each of the 260 distinct (1st character, name) pairs
keyedNames.countByKey()                DStream[(Char, Long)]            10 for each of the 26 letters
keyedNames.countByValueAndWindow(60s)  DStream[((Char, String), Long)]  12 for each of the 260 distinct (1st character, name) pairs

Structure-Changing Transformations

The previous operations are all transformations; that is, they always return a DStream after applying a function to the content of the stream. There are other transformations that do not transform the data in the stream but rather the structure of the DStream and, in some cases, the flow of data through the Spark cluster:

repartition
repartition(int numPartitions): DStream[T]

repartition results in a new DStream with an increased or decreased number of partitions in the underlying RDDs of this DStream. Repartitioning allows us to change the parallelism characteristics of a streaming computation. For example, we might want to increase partitioning when a few input streams deliver a high volume of data that we want to distribute across a large cluster for some CPU-intensive computation. Decreasing the partitioning might be interesting when we want to ensure that we have a few partitions with a large number of elements before writing to a database, for example. Note that the parameter provided is the absolute target number of partitions. Whether this operation increases or decreases the partitioning depends on the original number of partitions of the DStream, which in turn might depend on the parallelism of the source.

Unlike data-oriented transformations, correct usage of repartition requires knowledge of the input parallelism, the complexity of the distributed computation, and the size of the cluster where the streaming job will be running.

union
union(other: DStream[T]): DStream[T]

union merges two DStreams of the same type into a single stream. The result is a DStream that contains the elements of both input DStreams. An alternative form is calling union on the streamingContext instance with a collection of the DStreams to be merged. This second form allows us to unite many DStreams at once:

union[T](streams: Seq[DStream[T]]): DStream[T]
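For instance, a minimal sketch, assuming three DStreams of the same element type (stream1, stream2, stream3) and a StreamingContext called ssc:

// Pairwise union of two streams of the same type.
val merged = stream1.union(stream2)

// Union of an arbitrary collection of streams in one call on the StreamingContext.
val mergedAll = ssc.union(Seq(stream1, stream2, stream3))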

Summary

In this chapter, we learned about the API that the DStream abstraction offers to implement streaming applications. We saw the following:

  • DStreams are immutable and we operate on them using transformations.

  • Transformations are lazy. They need to be materialized by a special output operation.

  • DStreams offer a rich functional API that allows the transformation of elements.

  • Some transformations expose the underlying RDD, providing access to the full extent of the rich Spark core API.

In the next chapters, you learn how to source data from external systems by creating DStreams. You also learn about a particular set of operations, called output operations in Spark Streaming jargon, that trigger the execution of transformations over our data streams and are able to produce data to other systems or secondary storage.
