© Raul Estrada and Isaac Ruiz 2016

Raul Estrada and Isaac Ruiz, Big Data SMACK, 10.1007/978-1-4842-2175-4_6

6. The Engine: Apache Spark

Raul Estrada and Isaac Ruiz1

(1)Mexico City, Mexico

If our stack were a vehicle, now we have reached the engine. As an engine, we will disarm it, analyze it, master it, improve it, and run it to the limit.

In this chapter, we walk hand in hand with you. First, we look at the Spark download and installation, and then we test it in Standalone mode. Next, we discuss the theory around Apache Spark to understand the fundamental concepts. Then, we go over selected topics, such as running in high availability (cluster). Finally, we discuss Spark Streaming as the entrance to the data science pipeline.

This chapter is written for people who have never touched Apache Spark before. But as you can imagine, due to space, we will not delve into many specific issues.

The following topics are covered in this chapter:

  • Introducing Spark

  • Spark concepts

  • Working with RDD

  • Running in cluster

  • Spark Streaming

Introducing Spark

Perhaps Apache Spark is the most important technology in the stack. It is divided into five modules: Core , SQL , MLIB , Streaming , and GraphX . Simply put, each module deserves a book the same size of the book that you are now reading. Spark has captured the imagination of developers and analysts, simply because it takes data manipulation from large laboratories to laptops, from large interdisciplinary teams to lone enthusiasts who want to make data analysis, and from large corporate clusters to a cheap infrastructure accessible to all.

Spark is both infrastructure software and data science laboratory. Spark as an infrastructure engine can be attached to powerful tools like Apache Kafka to produce data science pipelines. Simultaneously, it is a data science laboratory because it represents an engine for machine learning in both a laptop and a productive cluster, from a few data kilobytes up to what the hardware capacity allows. Likewise, you can build models based on sample data and then apply them in larger datasets.

In times not so distant, installing the infrastructure for data analysis was an interdisciplinary task among database specialists, operating system and network analysts, and application engineers and architects.

What makes Apache Spark so attractive is its ability to download and run it on a small and inexpensive laptop.

Apache Spark (like all the technologies covered in this book) is an open source tool. It only requires Java version 6 or higher. All the Scala and Akka dependencies are packaged within the distribution.

Apache Spark Download

Regardless of whether you use the development or production version, you must download the latest build from https://spark.apache.org/downloads.html (version 1.6.1 as of this writing).

As shown in Figure 6-1, select Pre-built for Hadoop and later.

A420086_1_En_6_Fig1_HTML.jpg
Figure 6-1. Apache Spark download page

Spark has a new release every 90 days. For hard-core coders who like to work with the latest builds, try to clone the repository at https://github.com/apache/spark . The instructions for generating the build are available at https://spark.apache.org/docs/latest/building-spark.html . Both the source code and the binary prebuilds are available at this link.

To compile the Spark sources, we need the appropriate versions of Scala and the corresponding SDK. Spark source tar includes the Scala components required.

The Spark development group has done a good job keeping the dependencies. On https://spark.apache.org/docs/latest/building-spark.html , you can see the latest information about it. According to the site, to build Spark with Maven, Java version 6 or higher and Maven 3.0.4 are required.

To uncompress the package, execute the following command:

tar xvf spark-1.6.1-bin-hadoop2.4.tgz

Let’s Kick the Tires

To test the installation, run the following command:

/opt/spark-1.6.1-bin-hadoop2.6/bin/run-example SparkPi 10

You should see an output like the one shown in Figure 6-2, with the line Pi is roughly.

A420086_1_En_6_Fig2_HTML.jpg
Figure 6-2. Testing Apache Spark

To open a Spark interactive shell, go to the bin directory and run the spark-shell:

$> /bin/spark-shell

You should see output similar to Figure 6-3 (which shows Windows 64-bit so that no one feels left out of this party):

A420086_1_En_6_Fig3_HTML.jpg
Figure 6-3. The Apache Spark shell

Like all modern shells , the Spark shell includes history. You can access it with the up and down arrows. There are also autocomplete options that you can access by pressing the Tab key.

As you can see, Spark runs in Scala; the Spark shell is a Scala terminal with more features. This chapter’s Scala examples run without problems. You can test, as follows:

scala> val num = 1 to 400000
num: scala.collection.immutable.Range.Inclusive = Range (...
To convert our Range to a RDD (now we see it is that), do the following:
scala> val myRDD = sc.parallelize(num)
myRDD: org.apache.spark.rdd.RDD [Int] = ParallelCollectionRDD [0] at parallelize at <console>

In this case, there is a numeric RDD. Then, as you may guess, you can do all the math operations with Scala data types. Let’s use only the odd numbers:

scala> myRDD.filter (_% 2 != 0) .collect ()
res1: Array [Int] = Array (1, 3, 5, 7, 9 ...)

Spark returns an int array with odd numbers from 1 to 400,000. With this array, you can make all the math operations used with Scala int arrays.

Now, you are inside Spark, where things can be achieved in a big corporate cluster.

Basically, Spark is a framework for processing large volumes of data— in gigabytes, terabytes, or even petabytes. When you work with small data volumes, however, there are many solutions that are more appropriate than Spark.

The two main concepts are the calculations and scale. The effectiveness of the Spark solution lies in making complex calculations over large amounts of data, in an expeditious manner.

Loading a Data File

Upload a text file in Spark within the Spark shell:

scala> val bigfile = sc.textFile ("./big/tooBigFile.txt")

This magically loads the tooBigFile.txt file to Spark, with each line a different entry of the RDD (explained shortly). The RDDs are very versatile in terms of scaling.

If you connect to the Spark master node, you may try to load the file in any of the different machines in the cluster, so you have to ensure that it can be accessed from all worker nodes in the cluster. In general, you always put your files in file systems like HDFS or S3. In local mode, you can add the file directly (e.g., sc.textFile ([path_to_file)). You can use the addFile()SparkContext function to make a file available to all machines in this way:

scala> import org.apache.spark.SparkFiles
scala> val myFile = sc.addFile( "/opt/big/data/path/bigFile.dat" )
scala> val txtFile = sc.textFile (SparkFiles.get("bigFile.txt"))

For example, if you load a (big) input file where each line has a lot of numbers, the first RDD file whose elements are strings (text lines) is not very helpful. To transform the string elements to an array of doubles, use your knowledge of the Scala language:

scala> val myArrays = textFile.map (line => line.split('').map(_. toDouble))

To verify that this is what you wanted, you can use the first() operator on both txtFile and myArrays to see that the first element in the txtFile is a string and in myArrays is an Array[Double].

Loading Data from S3

As part of Amazon support, you have access to a file system called Amazon S3 . To access it, you need the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY variables (to configure them, see the “Running Spark on EC2” section in this chapter).

For instance, you can use the Amazon examples on a data file from Wikipedia:

scala> val myWiki = sc.textFile ("S3N://bigdatademo/sample/wiki/")

We don’t need to set our AWS credentials as parameters for the Spark shell; this is the general path form for access the S3 file system:

S3N://<AWS ACCESS ID>:<AWS SECRET>@bucket/path

As another example, you can get Wikipedia’s traffic statistics from over the last 16 months at https://aws.amazon.com/datasets/wikipedia-traffic-statistics-v2/ .

Spark Architecture

Now is a good time to discuss the Spark mechanism. Let’s first talk about the architecture and then about the programming.

Parallelismis computational term used when we talk about performing operations in parallel; that is, if we have a process that works on a portion of data, we can “make copies” of that process to act simultaneously on the same portion of data. Not all processes are parallelizable. Spark’s power is in its ability to do parallel computing in a simple way; this is just one of its main advantages.

When you program on your machines or laptops, the Spark shell is run locally. The work is performed in a single node. When you are working on a cluster, all you have to do is connect the same shell to the cluster to run it in parallel. Figure 6-4 explains how Spark runs on a cluster.

A420086_1_En_6_Fig4_HTML.jpg
Figure 6-4. Spark cluster with three executor nodes

The two main concepts of Spark are the resilient distributed dataset (RDD) and the cluster manager. In a nutshell, the RDD is a parallelized computational abstraction of a collection. The cluster manager distributes the code and manages the data represented in the RDDs. The cluster manager has three responsibilities: controls the distribution and interaction with RDDs, distributes code, and manages the fault-tolerant execution.

Spark can work over several types of cluster managers; in this chapter, we talk about the standalone manager, and in a subsequent chapter, we talk about Apache Mesos. Hadoop Yarn is not covered in this book because we focus only on pipeline architectures, not Hadoop.

If you have Hadoop 2.0 installed, we recommend that you install Spark on Hadoop Yarn. If you have Hadoop 1.0 installed, we recommend that you use Spark Standalone. It is not suitable to install Apache Mesos and Hadoop Yarn at the same time.

The Spark driver program distributes the program classes in the cluster. The cluster manager starts the executors, one on each node, and assigns them a tasks set. When you run a program, all of this enginery runs transparently in your machines. For example, when you run on a cluster, the entire administration is transparent to you. That is Spark’s power.

SparkContext

Now that you have Spark running on your laptop, let’s start programming in more detail. The driver programs access the Spark core through the SparkContext object, which represents the connection between the cluster and the nodes. In the shell, Spark is always accessed through the sc variable; if you want to learn more about the sc variable, type this:

scala> sc
res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext@4152bd0f

Creating a SparkContext

In a program, you can create a SparkContext object with the following code:

val sparkContext = new SparkContext( masterPath, "applicationName", ["SparkPath (optional)"],["JarsList (optional)"])

It is always possible to hard-code the value of the parameters; however, it is best read from the environment with suitable defaults. This allows you to run the code when you change the host without recompiling it. Using local as the default for the master makes it easy to launch the application in a local testing environment. You must be careful when selecting the defaults. Here’s an example snippet:

import spark.sparkContext._
import scala.util.Properties


val masterPath = Properties.envOrElse("MASTER","local")
val sparkHome = Properties.get("SPARK_HOME")
val jarFiles = Seq(System.get("JARS"))
val sparkContext = new SparkContext(masterPath, "MyAppName", sparkHome, jarFiles)

SparkContext Metadata

The SparkContext object has useful metadata (see Table 6-1); for example, the version number, the application name, and the available memory. If you recall, information about the version is displayed when you start the Spark shell.

Table 6-1. Some Useful SparkContext Metadata

Value

Type

Use

appName

String

The application name. If you followed the convention, this value is useful at runtime.

getConf

SparkConf

Return a copy of this SparkContext’s configuration.

getExecutorMemoryStatus

Map[String, (Long, Long)]

Return a map from the slave to the maximum memory available for caching and the remaining memory available for caching. As it is distributed, it does not prevent OOM exceptions.

isLocal

Boolean

Are we running in local?

isStopped

Boolean

Are we running?

master

String

Master node name.

sparkUser

String

Spark OS username.

startTime

Long

Node start time.

version

String

Useful when testing several Spark versions.

Here are some examples of SparkContext metadata to print the Spark version, the application name, the master node’s name and the memory:

$ bin/spark-shell

scala> sc.version
res0: String = 1.6.1


scala> sc.appName
res1: String = Spark shell


scala> sc.master
res2: String = local[*]


scala> sc.getExecutorMemoryStatus
res3: scala.collection.Map[String,(Long, Long)] = Map(localhost:52962 -> (535953408,535953408))

SparkContext Methods

The SparkContext object is the main entry point for your application and your cluster. It is also used for loading and saving data. You can use it to launch additional Spark jobs and remove dependencies. Table 6-2 shows some SparkContext methods, but you can see all the SparkContext attributes and methods at https://spark.apache.org/docs/latest/api/scala/#org.apache.spark.SparkContext $.

Table 6-2. Some Useful SparkContext Methods

Method

Parameter

Return

Use

addJar()

path:String

Unit

Adds jar files for all tasks to be executed on the SparkContext in the future.

addFile()

path:String

Unit

Distribute a file to all nodes on a cluster.

accumulator()

value: T

name: String

Accumulator

Creates an accumulator (a distributed variable among the cluster).

cancelAllJobs()

---

Unit

Cancel all jobs (scheduled and running).

clearJobGroup()

---

Unit

Clear the current thread’s job.

killExecutor()

id:String

Boolean

Request to cluster manager to kill the specified executors.

setJobDescription()

value:String

Unit

Set a human-readable description of the current job.

textFile()

path:String

minPartitions: int

String

Read a text file and return it as an RDD of strings.

stop()

---

Unit

Shut down the SparkContext.

Working with RDDs

The resilient distributed dataset is Apache Spark’s core concept. Spark has four design goals :

  • In-memory data storage. This is where Apache Hadoop is defeated, because Hadoop is primarily disk storage.

  • Fault tolerant. Achieved with two features: cluster operations and the application of linear operations on small data chucks.

  • Efficiency. Achieved with operation parallelization between cluster parts.

  • Fast. Achieved by minimizing data replication between cluster members.

The main idea is that with RDD, you only can perform two types of operations:

  • Transformations. When a transformation is applied on an RDD, a new RDD is created. For example, the set operations (union, intersection, and join) or as you learned in Chapter 3, mapping, filtering, sort, and coalesce.

  • Actions. When we apply an action over an RDD, the original RDD does not change. For example: count, collect, and first.

Computer science has a solid foundation in mathematics; all computer models have a solid mathematical model behind them. In functional programming, functions are first-class citizens; that is, functions are not modeled as objects, but are simply functions. When you apply a function to another function, the result is another function. In algebra this is known as function composition. If function f is applied to the function g, the operation is denoted as f o g, which is equivalent to f(g()).

In linear algebra, there are operations between vectors. There are vector operations whose input is various vectors and the result is a new vector (for example, vector addition). In Spark, vectors would be RDDs and operations whose return value is an RDD are equivalent to transformations.

On the other hand, there are functions whose input is several vectors and the output is a scalar value; for example, the inner product. In Spark, actions are the equivalent of these operations.

As with functional programming, there are also rules for RDDs:

  • Immutability. In both actions and transformations, the original RDD is not modified. Yes, the concept of a “variable” value in functional programming is an aberration: it does not exist; all the things (functions, values, objects) must be immutable.

  • Resilient. In Spark, the chain of transformations from the first RDD to the last RDD is always logged; that is, if a failure occurs (the power goes out or someone trips over the power cord), the process can be reproduced again from the beginning or from the point of failure.

  • Lazy evaluation. Since we live in a functional context, the transformations on RDDs are always lazy. They are not executed until (and only until) the end result is required. As you saw in Chapter 3, this exists to improve performance, because it avoids unnecessary data processing and the waste of resources (usually caused by the developer).

  • Process aware. As you saw in Chapter 4, lazy evaluation prevents deadlocks and bottlenecks, because it prevents the indefinite process of waiting for other processes’ output. Recall that the lazy evaluation emulates all the operations already made and uses a “result avatar” to estimate the final result.

  • Memory storage. By default, RDDs are born, and live and die in memory. The RDDs are stored on disk only if explicitly requested. This increases the performance terrifically, because you don’t fetch them from the file system or database.

In addition, we now have the DataFrames API (since 2015). This API offers the following:

  • Scalability. You can test kilobyte-sized data samples on your laptop, and then run the same programs on a production cluster with several terabytes of data.

  • Optimization. The powerful Spark SQL Catalyst optimizer offers two advantages: SQL beautification and SQL optimization. It also provides source code generation from actual SQL.

  • Integration. Smooth integration with the other members of the Spark family (Core, SQL, Streaming, MLlib, GraphX).

  • Multiformat. Supports several data formats and storage systems.

Before continuing, we must take the time to learn about what RDDs are and what they are not. It is crucial to understand that when an RDD is defined, it actually contains no data. You only create a container for it. RDDs follow the lazy evaluation principle; an expression is not evaluated until it is necessary (i.e., when an action is requested). This means that when you try to access the data in an RDD, you could fail. The data operation to create an RDD is only performed when the data is referenced to store or catch the RDD.

This also means that when you concatenate a large number of operations, you don’t have to worry about the excessive operations locking a thread. It is important to keep this in mind during application development—when you write and compile code, and even when you run the job.

Standalone Apps

You can run Spark applications in two ways: from the pretty Spark shell or from a program written in Java, Scala, Python, or R. The difference between the two modes is that when you run standalone applications , you must initialize the SparkContext, as you saw in previous sections.

To run a program in Scala or Java, it is best to use Maven (or Gradle or SBT, whichever you want). You must import the dependency to your project. At the time of this writing, the Spark version is 1.6.1 (version 2 exists, but it’s very new).

The following are the Maven coordinates for version 1.6.1:

groupId = org.apache.spark
artifactId = spark-core_2.10
version = 1.6.1

Initializing the SparkContext

Once you have Spark dependencies installed in your project, the first thing that you have to do is create a SparkContext.

As you saw earlier, you must first create an object of type SparkConf to configure your application, and then build a SparkContext object from it.

// All the necessary imports
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
// Create the SparkConf object
val conf = new SparkConf().setMaster("local").setAppName("mySparkApp")


// Create the SparkContext from SparkConf
val sc = new SparkContext(conf)

The SparkConf constructor receives two parameters:

  • Cluster URL. This is "local" when you want to run the program on one thread on the local machine, (i.e., without a cluster).

  • Application name. This name is used to identify your application in the cluster manager UI (running in cluster mode). Here we called it mySparkApp.

Standalone Programs

We already have the necessary imports: the SparkConf object and the SparkContext object. Now let’s give our program a body. It is important to note that all the stuff that runs on the Spark shell should run on Spark Standalone programs.

In all the “big data” books there is a word-count example; this book could not be the exception. Our program input is a file (yes, we already know how to load files) of the Franz Kafka novel The Process, in English.

The exercise objective is to see the number of times a word occurs and to see the most repeated words.

// We create a RDD with the the-process.txt file contents
val myfile = sc.textFile("the-process.txt")
// Then, we convert the each line text to lowercase
val lowerCase = myFile.map( line => line.toLowerCase)
// We split every line in words (strings separated by spaces)
// As we already know, the split command flattens arrays
val words = lowerCase.flatMap(line => line.split("\s+"))
// Create the tuple (word, frequency), initial frequency is 1
val counts = words.map(word => (word, 1))
// Let’s group the sum of frequencies by word, (easy isn’t?)
val frequency = counts.reduceByKey(_ + _)
// Reverse the tuple to (frequency, word)
val invFrequency = frequency.map(_.swap)
// Take the 20 more frequent and prints it
invFrequency.top(20).foreach(println)

It is fundamental to note that everything doesn’t run until the last println invocation. Yes, all the previous words are transformations, and the last line is the action. We will clear this up later.

Hold on, the most frequent types of words (in all human languages) are conjunctions and prepositions, so before separating each sentence into words in the third step, we filter the “stop words” in English (obviously there are better lists on Internet, this is just an example).

val tinytStopWords = Set("what", "of", "and", "the", "to", "it", "in", "or", "no", "that", "is", "with", "by", "those", "which", "its", "his", "her", "me", "him", "on", "an", "if", "more", "I", "you", "my", "your", "for" )

val words = lowerCase
.flatMap(line => line.split("\s+"))
.filter(! tinyStopWords.contains(_))

Run the Program

When your program is complete, use the script located on /bin/spark-submit to run it. Modern Java/Scala IDEs have embedded the Spark integration to run it smoothly.

But this book is mostly read by command-line fellas and old-school soldiers. Here we show how to run it from a command line with SBT and with Maven:

// To run it with sbt
sbt clean package
$SPARK_HOME/bin/spark-submit
--class com.apress.smack.WordFreq
./target/...(as above)
./README.md ./wordfreq


// To run it with Maven
mvn clean && mvn compile && mvn package
$SPARK_HOME/bin/spark-submit
--class com.apress.smack.WordFreq
./target/WordFreq-0.0.1.jar
./README.md ./wordfreq

If nothing works, you can always refer to the official Spark guide at http://spark.apache.org/docs/latest/quick-start.html .

RDD Operations

RDDs have two types of operations: transformations and actions . Transformations are operations that receive one or more RDD as input and return a new RDD. Actions return a result to the driver program and/or store it, and/or trigger a new operation.

If you still get confused and don’t know how to distinguish them, this is the rule: transformations return RDDs; actions don’t.

Transformations

Transformations are operations with these characteristics:

  • Lazy evaluation. Transformations are lazy operations; they aren’t calculated until you perform an action or explicitly invoke the collect method. This behavior is inherited from the actor model and functional programming.

  • Element-wise. Transformations work on each individual element of a collection; one at a time.

  • Immutable. RDDs are immutable, thus transformations are immutable too (i.e., they can’t modify the value of the RDD received as a parameter. There are no global variables).

  • Lineage graph. Let’s suppose you have a transformations sequence. We have RDDs as result of transformations in other RDDs. Spark keeps a track of each operation and of the dependencies among all the RDDs. This record, known as a lineage graph, is kept to recover the system from a failure. Spark always builds a lineage graph when running distributed applications on a cluster.

Table 6-3 enumerates the main transformations.

Table 6-3. Spark Main Transformations

Transformation

Purpose

Example

filter( function)

Builds a new RDD by selecting the elements on which the function returns true.

> val rdd = sc.parallelize(List(“Spark”, “Mesos”, “Akka”, “Cassandra”, “Kafka”))

> val k = rdd.filter(_.contains(“k”))

> k.collect()

Result:

Array[String] = Array(Spark, Akka, Kafka)

map( function)

Builds a new RDD by applying the function on each element.

> val rdd = sc.parallelize(List(1, 2, 3, 4))

> val t = rdd.map(_*5)

> t.collect()

Result:

Array[Int] = Array(5, 10, 15, 20)

flatMap( function )

The same as map() but it returns a sequence instead of a value.

> val rdd = sc.parallelize(List(“Big Data are Buzzwords”, “Make Fast Data”))

> val fm = rdd.flatMap( s => s.split(“ ”) )

> fm.collect()

Result:

Array[String] = Array(Big, Data, are, Buzzwords, Make, Fast, Data)

reduceByKey( function, [number] )

Aggregates the values of a key using the function.

> val words = fm.map( w => (w, 1) )

> val wordCount = words.reduceByKey( _+_ )

> wordCount.collect()

Result:

Array[(String, Int)] = Array((are,1), (Big,1), (Fast,1), (Make,1), (Buzzwords,1), (Data,2))

groupByKey([numTasks])

Converts (K, V) to (K, Iterable<V>).

> val wc = wordCount.map{case(w,c) => (c,w)}

> wc.groupByKey().collect()

Result:

Array[(Int, Iterable[String])] = Array((1,CompactBuffer(are, Big, Fast, Make, Buzzwords)), (2,CompactBuffer(Data)))

distinct([numTasks])

Eliminates duplicates.

> fm.distinct().collect()

Result:

Array[String] = Array(are, Big, Fast, Make, Buzzwords, Data)

Table 6-4 lists the main transformations on sets.

Table 6-4. Main Transformations on Sets

Transformation

Purpose

Example

union()

Builds a new RDD containing all elements from the source and the argument.

> val foo = sc.parallelize(List(“Big”, “Data”))

> val bar = sc.parallelize(List(“Fast”, “Data”))

> foo.union(bar).collect()

Result:

Array[String] = Array(Big, Data, Fast, Data)

intersection()

Builds a new RDD containing only common elements between the source and argument.

> foo.intersection(bar).collect()

Result:

Array[String] = Array(Data)

cartesian()

Builds an RDD with cross product of all elements from the source and the argument.

> foo.cartesian(bar).collect()

Result:

Array[(String, String)] = Array((Big,Fast), (Big,Data), (Data,Fast), (Data,Data))

subtract()

Builds a new RDD by removing common data elements between source and argument.

> foo.subtract(bar).collect()

Result:

Array[String] = Array(Big)

join( RDD, [number] )

When invoked on (K,V) and (K,W), creates a new RDD with (K, (V,W))

> val foo = sc.parallelize( Seq((1, “S”), (2, “M”), (3, “A”), (1, “C”), (4, “K”)))

> val bar = sc.parallelize( Seq((1, “W”), (2, “X”), (3, “Y”), (2, “Z”)))

> foo.join( bar ).collect()

Result:

Array[(Int, (String, String))] = Array((1,(S,W)), (1,(C,W)), (2,(M,X)), (2,(M,Z)), (3,(A,Y)))

cogroup( RDD, [number] )

Converts (K, V) to (K, Iterable<V>).

> foo.cogroup(bar).collect()

Result:

Array[(Int, (Iterable[String], Iterable[String]))] = Array((4,(CompactBuffer(K),CompactBuffer())), (1,(CompactBuffer(S, C),CompactBuffer(W))), (2,(CompactBuffer(M),CompactBuffer(X, Z))), (3,(CompactBuffer(A),CompactBuffer(Y))))

Actions

Although actions return scalar (simple) values, you must never underestimate them, since the internal process can become really complex. Actions return the result to the driver program and/or write in and store the result.

Pipeline of operations are advanced sequentially, operation by operation; however, remember that everything is lazy evaluation. Flow can advance, and when it finds an action, everything is evaluated to that point. Actions trigger the evaluation of all previous transformations.

Actions always trigger an evaluation because they must always return a value; if they don’t return a value or store something, they can’t continue. Table 6-5 enumerates the main Spark actions.

Table 6-5. Main Spark Actions

Action

Purpose

Example

count()

Obtains the number of RDD elements.

> val smack = sc.parallelize( List(‘s', 'M', 'A', 'C', 'K') )

> smack.count()

Result:

long = 5

collect()

Returns all the RDD elements as an array.

> val smack = sc.parallelize( List(“S”, “M”, “A”, “C”, “K”) )

> smack.collect()

Result:

Array[String] = Array(S, M, A, C, K)

reduce( function)

Aggregates the RDD elements using the function.

> val smack = sc.parallelize( List(1, 5, 2, 4, 3) )

> smack.reduce(_+_) // the sum of all

Result:

Int = 15

take( n )

Fetches the first n elements of the RDD.

> val smack = sc.parallelize( List(‘s', 'M', 'A', 'C', 'K') )

> smack.take(4)

Result:

Array[Char] = Array(S, M, A, C)

foreach( function)

Executes the function in each RDD element.

> val s = sc.parallelize(List(1, 4, 2, 3))

> s.foreach(n =>

print( “%s*7=%s ”.format(n, n*7) ))

Result:

1*7=7 4*7=28 2*7=14 3*7=21

first()

Fetches the RDD first element, the same as take(1).

> val rdd = sc.parallelize(List(4, 3, 2, 1))

> rdd.first()

Result:

Int = 4

saveAsTextFile(path)

Writes the RDD content to the text file on local file system/HDFS.

> val myLogs = sc.textFile(“/users/smack/evidence.log”)

> myLogs.filter(_.contains(“Fatal”)).

myLogs.saveAsTextFile(“/users/smack/fact.txt”)

Result:

smack@localhost∼/smack$ ls _SUCCESS part-00000 part-00001

RDD Persistence (Caching)

Now you know that RDDs support lazy evaluation. But what if you want to use the same RDD several times? If you don’t do this work conscientiously, by default, Spark will recalculate the RDD and all of its dependencies each time that you apply an action on it. If not done carefully, this can be very expensive.

You can tell Spark to persist the RDD to avoid recalculating them all the time. When you persist an RDD, the nodes working with it store the RDD partitions assigned to them. If a node fails, Spark recalculates lost partitions as needed (yes, it’s powerful).

You can also replicate your RDD among several nodes if you want to handle a node failure without performance implications. As shown in Table 6-6, Spark offers several levels of persistence to suit all of our scenarios and needs. Note that when writing data to disk the data is always serialized.

Table 6-6. RDD Persistence Levels

Persistence Level

CPU Used

Space Used

On Disk

In Memory

MEMORY_ONLY

Low

High

No

Yes

MEMORY_AND_DISK(*)

Medium

High

Some

Some

MEMORY_ONLY_SER

High

Low

No

Yes

MEMORY_AND_DISK_SER(*)

High

Low

Some

Some

DISK_ONLY

High

Low

Yes

No

OFF_HEAP (experimental)

Low

Low

Some

Some

*Write to disk if there is much data stored in memory. (Note that SER means serializable)

An important caching scheme is off-heap, a mixed scheme. It was previously called Tachyon, but now it’s called Alluxio ( http://alluxio.org/ ). Note that the off-heap catching doesn’t guarantee recovery after failure.

This is a code example:

import org.apache.spark.storage.StorageLevel
val rdd = input.map( foo )
rdd.persist( StorageLevel.DISK_ONLY )
rdd.reduce( bar )
rdd.collect()

Here are some points to consider:

  • You must call the persist() method in the code before the first action.

  • The persist() function doesn’t force the evaluation.

  • Spark automatically evicts old partitions using an LRU (least recently used) cache policy.

  • The persist() method counterpart is the unpersist() method to manually remove RDDs from the cache.

Spark in Cluster Mode

In this chapter, we have focused on running Spark in local mode. As we mentioned, horizontal scaling is what makes Spark so powerful. To run Apache Spark on a cluster, you do not need specialized software-hardware integration engineers. To escalate, you don’t need to make great efforts and stop the entire production to add more machines to your cluster.

The good news is that the same scripts that you are building on your laptop with examples that are only a few kilobytes can run on business clusters running terabytes of data. There is no need to change your code, nor invoke another API. All you have to do is test your model several times to know if it runs correctly, and then you can deploy it.

In this section, you analyze the runtime architecture of a distributed Spark application. Then you see the options to run Spark on a cluster.

Apache Spark has its own built-in standalone cluster manager. But you can run it on multiple cluster managers, including Hadoop YARN, Amazon EC2, and Apache Mesos. This topic is so large that it has its own chapter in this book.

Runtime Architecture

Before running Spark on a cluster, it’s important to understand the distributed Spark architecture.

As shown in Figure 6-5, Spark uses a master/slave architecture. The master is called the driver and the slaves are called executors. When running on a single machine, there is a distributed architecture: a driver with several executors. The driver runs in its own Java process, and each executor runs in a separate Java process. This architecture is made on the actor model.

A420086_1_En_6_Fig5_HTML.jpg
Figure 6-5. Distributed Spark application

The driver and executors set is known as a Spark application. If you have more than one machine, the Spark application must be launched using the cluster manager service. The Spark application architecture is always the same; it does not matter if it’s clustered or not.

In a typical Spark clustered application architecture, each physical machine has its own executor. You will see several strategies to know when an executor dies or goes offline.

Driver

The driver is the process where the SparkContext runs. It is in charge of creating and executing transformations and actions on RDDs. When you run the Spark shell command on your laptop, you are actually creating a driver program. Its first task is to create the SparkContext, called sc. When the driver program dies, the entire application dies.

The following sections explain the two responsibilities in the life of a driver program: dividing a program into tasks and scheduling tasks on executors.

Divide a Program into Tasks

The Spark driver is responsible for splitting the user program, which could be programmed in an inefficient way in execution units called tasks.

A user program basically applies transformations and actions into one or more RDDs to generate new RDDs and calculate and/or store data.

Another task of the Spark driver is to generate an operation’s directed acyclic graph (DAG) . With this graph, the driver knows which tasks are assigned to which node; so if you lost a node, the driver knows at which point it was at and how to assign the lost node’s tasks to the remaining nodes.

The driver also does pipeline optimizations; it splits the DAG into stages. Each stage has multiple tasks. In Spark, the task is the smallest work unit; a normal program can launch thousands of tasks.

Scheduling Tasks on Executors

Given a physical execution plan, the Spark driver coordinates which tasks are performed by each executor node . When an executor starts operating, it registers itself in the driver, so the driver always has an entire view of all the executor nodes. Each executor is a standalone Java process that can run tasks and store RDDs.

When a program runs, the driver subdivides the program into tasks, sees all the available executor nodes, and tries to balance the workload among them. The driver also knows which part of the data that each node has, in order to rebuild everything at the end.

The driver displays its information on the Web, so that the user can always see what is happening; by default, it runs on port 4040. When you run locally, it can be accessed at http://localhost:4040, as you can see in Figure 6-6 (let’s run the Spark shell and browse it).

A420086_1_En_6_Fig6_HTML.jpg
Figure 6-6. Spark shell application web UI

Executor

Executors are responsible for running the individual tasks of a given Spark job. Executors are launched when you start the Spark application; they live while the Spark application lives.

Executors have two roles:

  • Run the assigned tasks and deliver results to the driver.

  • Provide in-memory storage for RDDs. A program called Block Manager runs on each executor and manages memory and RDDs.

When running Spark in local mode, the driver and executors run in the same process. This is for development purposes; it is not recommended in a production environment.

Cluster Manager

Spark depends on a cluster manager to coordinate and launch the executors. The cluster manager that ships with the Spark distribution is called the standalone manager, but it is a pluggable component. You can change it and use a custom cluster manager like Hadoop Yarn, Amazon EC2, or Apache Mesos.

It is important to note that the terms driver and executor are used when talking about the Spark application. When we talk about the cluster manager, we use the terms master and worker. It is important not confuse the terms or to exchange them, because they are different concepts.

Regardless of the cluster manager that you use, Spark provides a single script, called spark-submit, to launch the program. The spark-submit script can connect to different managers through various options and manage the cluster resources that the application needs.

Program Execution

When you run a Spark application on a cluster, these are the steps followed by the program:

  1. The user runs the spark-submit shell.

  2. The spark-submit shell launches the driver program, and calls the user program’s main() method.

  3. The driver program establishes the connection to the cluster manager, which has the slave machines list. Then, the necessary resources are requested to launch the executors.

  4. The cluster manager launches executors in each slave node.

  5. The driver program analyzes, divides, and distributes the user application, sending each executor its tasks.

  6. The tasks run in the executors, calculating and storing the results.

  7. The user program ends when the exit() method in the main()method is invoked, or when the SparkContext stop() method is called.

  8. The driver program ends the executors and frees the cluster manager’s resources.

Application Deployment

Spark uses the spark-submit tool to send jobs to the cluster manager.

When you run a program in local mode, you only invoke spark-submit passing your script name or jar file as a parameter.

When you run a program in cluster mode, you have to pass additional parameters—for example, the size of each executor process.

The --master flag specifies the cluster URL to which you want to connect. In this case, spark:// means that we are using Spark in stand-alone mode.

For example:

bin/spark-submit --master spark://skynet:7077 --executor-memory 10g Terminator.jar

Here we indicate that we will run our Terminator program in stand-alone cluster mode in the master node called SkyNet and each executor node will have 10 gigabytes of memory.

In addition to the cluster URL, spark-submit has several options to specify how you want to run your application. These options are in of two categories:

  • Scheduling data. The amount of resources that each job will have.

  • Dependencies. The files and libraries available in the slaves.

Table 6-7 lists and describes some of the spark-submit flags.

Table 6-7. spark-submit Flags

Flag

Purpose

--master

The cluster manager to connect to (sample values explained in Table 6-8).

--deploy-mode

Indicates if the program is launched in local mode (client) or cluster mode. In local mode, the driver is launched where spark-submit is launched. The default value is client mode.

--class

The main application class is Java/Scala.

--name

Human-readable name for your application, as displayed in Spark web UI.

--jars

List of jar files to upload on application classpath.

--files

List of files uploaded on the application’s working directory on each node.

--executor-memory

Memory for executors: k for kilobytes, m for megabytes, g for gigabytes.

--driver-memory

Memory for the driver process: k for kilobytes, m for megabytes, g for gigabytes.

--conf prop=value

A single configuration property value in key-value form.

--properties-file

A configuration file with properties in key-value form.

Table 6-8. Master Flag Sample Values

Value

Meaning

spark://host:port

Connect to a cluster in stand-alone mode at specified host and port. 7077 is the default port for a stand-alone master.

mesos://host:port

Connect to a Mesos cluster at the specified host and port. 5050 is the default port for the Mesos master.

local

Local mode master with a single core.

local[N]

Local mode master with N cores.

local[*]

Local mode master using as many cores as the machine has.

Table 6-8 lists a few example values that the --master flag could have.

Now you are able to read this:

$ ./bin/spark-submit 
--master spark://skynet:7077
--deploy-mode cluster
--class com.cyberdyne.Terminator
--name "T1000 model"
--jars neuralNetwork.jar,geneticAlgorithm.jar
--total-executor-cores 300
--executor-memory 10g
terminator.jar

Here we indicate that we will run our terminator.jar program:

  • In stand-alone cluster mode in the SkyNet master node on port 7077

  • The driver program is launched in cluster mode

  • The main class is com.cyberdyne.Terminator

  • The application display name is T1000 model

  • The neuralNetwork.jar and geneticAlgorithm.jar files are used

  • Each executor node uses 300 cores and has 10 gigabytes of memory

Running in Cluster Mode

This section discusses some of the most common methods to install Spark in cluster mode . On a single laptop, Spark is excellent for developing and testing; but in practice, it is necessary to know how to install Spark with built-in scripts on a dedicated cluster via SSH (Secure Shell). This section covers how to deploy on a cluster in Spark Standalone and with Mesos. This section also covers how to deploy Spark in the cloud with Amazon EC2.

Spark Standalone Mode

When you run bin/start-master.sh, you start an individual master. When you run sbin/start-slaves.sh, you start a worker. The default port of the Spark master is always 8080. Because no one wants to go to each machine and run scripts by hand in each, there is a set of useful scripts in the /bin directory that can help you run your servers.

A prerequisite to use any of the scripts is to have passwordless SSH access from the master to all worker machines. It is always advisable to create a special user to run Spark on all machines. In this book, the examples use the name sparkuser. From the master, you can run the ssh-keygen command to generate the SSH keys. When an RSA key is generated, by default, it is stored in ∼/.ssh/id_rsa.pub. You have to add it to each host in ∼/.ssh/authorized_keys.

The Spark administration scripts require that user names match. If this is not the case, you can configure alternative user names in ∼/.ssh/config.

Now that you have SSH access to the machines, it’s time to configure Spark. There is a template in the conf/spark-env.sh.template directory, which should be copied to conf/spark-env.sh. Table 6-9 lists some of the environment variables.

Table 6-9. Spark Environment Variables

Name

Purpose

Default value

MESOS_NATIVE_LIBRARY

Points to Mesos installation directory.

---

SCALA_HOME

Points to Scala installation directory.

---

SPARK_MASTER_IP

The IP address where the Spark master listens and where the workers connect to; for example, a public one.

hostname command output

SPARK_MASTER_PORT

The port number where the Spark master listens.

7077

SPARK_MASTER_WEBUI_PORT

The port number for the master web user interface.

8080

SPARK_MASTER_OPTS

Configuration properties that apply only to the master in the form of "-Dx=y".

---

SPARK_WORKER_CORES

Number of cores to be used by the worker.

Total number of cores

SPARK_WORKER_MEMORY

Amount of memory to be used by the worker.

Total system memory minus 1GB; if you have less than 1GB, it’s 512MB.

SPARK_WORKER_PORT

The port number on which the worker runs on.

Random

SPARK_WORKER_WEBUI_PORT

The port number of the worker web user interface.

8081

SPARK_WORKER_DIR

Directory on which files from the worker are stored.

SPARK_HOME/work

SPARK_WORKER_OPTS

Configuration properties that apply only to the worker in the form of "-Dx=y".

---

SPARK_DAEMON_MEMORY

Memory to allocate to the Spark master and worker daemons.

1GB

SPARK_DAEMON_JAVA_OPTS

JVM options for the Spark master and worker daemons in the form of "-Dx=y".

---

Once the configuration is made, you must start the cluster. We highly recommend that you install the pssh tool, which is a set of tools that includes pscp SSH. The pscp command makes it easy to secure copying between hosts (although it takes a little while); for example:

pscp -v -r -h conf/slaves -l sparkuser ../spark-1.6.1 ∼/

When you have finished changing the settings, you need to distribute the configuration to workers, as shown here:

pscp -v -r -h conf/slaves -l sparkuser conf/spark-env.sh ∼/spark-1.6.1/conf/spark-env.sh

After you have copied the files, you are ready to start the cluster. Use these scripts: sbin/start-all.sh, sbin/start-master.sh, and sbin/start-slaves.sh.

It is important to note that start-all.sh and start-master.sh are assumed to be running on the master node in the cluster. All scripts start demonizing, so there is no problem running them on the screen:

ssh master bin/start-all.sh

In the event that you get a java.lang.NoClassDefFoundError: scala/ScalaObject error, check that you have Scala installed on the host and that the SCALA_HOME environment variable is set properly.

Spark scripts assume that the master has Spark installed in the same directory as your workers. If this is not the case, you must edit bin/spark-config.sh to point to the correct directories.

Table 6-10 shows the commands provided by Spark to manage the cluster. If you want more information, go to http://spark.apache.org/docs/latest/spark-standalone.html#cluster-launch-scripts .

Table 6-10. Spark Cluster Administration Commands

Command

Purpose

bin/slaves.sh <command>

Runs the provided command on all of the worker hosts. For example, to show how long each hosts worker has been up: bin/slave.sh uptime

bin/start-all.sh

Starts the master and all the worker hosts. Must be run on the master.

bin/start-master.sh

Starts the master host. Must be run on the master.

bin/start-slaves.sh

Starts all the worker hosts.

bin/start-slave.sh

Starts a specific worker host.

bin/stop-all.sh

Stops all master and workers hosts.

bin/stop-master.sh

Stops the master host.

bin/stop-slaves.sh

Stops all the worker hosts.

Now the Spark cluster is running. As shown in Figure 6-7, there is a useful web UI running in the master on port 8080, and in workers running on port 8081. The web UI contains important information about the workers’ current and past jobs.

A420086_1_En_6_Fig7_HTML.jpg
Figure 6-7. Spark Master UI

Now that you have a cluster up and running, you can do several things with it. As with single host examples, you have the same scripts to run Spark examples on a cluster.

All the example programs are in the examples/src/main/scala/spark/examples/ directory and take the master parameter, which points them to the master IP host. If you are running on the master host, you can run the example:

./run spark.examples.GroupByTest spark://'hostname':7077

If you get a java.lang.UnsupportedClassVersionError error, it is because you need to update the JDK. Always use the supported versions. To check which version compiled your Spark distribution, use the command:

java -verbose -classpath ./core/target/scala-2.11.8/classes/ spark.SparkFiles | head -n 20

Version 49 is JDK 1.5, version 50 is JDK 1.6, and version 60 is JDK 1.7

If you cannot connect to the host, make sure that you have configured your master to listen to all IP addresses.

Running Spark on EC2

The ec2 directory contains the scripts to run a Spark cluster in Amazon EC2. These scripts can be used to run Spark in single or cluster mode. Spark can also run on Elastic MapReduce, which is the Amazon solution for MapReduce cluster management.

The final configuration to run Spark in EC2 is at http://spark.apache.org/docs/latest/ec2-scripts.html .

To begin, you must have EC2 enabled in your Amazon account. It should generate a key pair for the Spark cluster. This can be done at https://portal.aws.amazon.com/gp/aws/securityCredentials . Remember that the key pairs are generated by region. You must make sure to generate them in the same region where you run your hosts. You can also choose to upload a SSH public key instead of generating a new one. They are sensible, so you have to be sure to keep them in a safe place. In our environments, we need two environment variables, AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY, which are available from our EC2 scripts:

export AWS_ACCESS_KEY_ID = {our} AWS access key
export AWS_SECRET_ACCESS_KEY = {our} AWS secret key

There are some scripts provided by Amazon at http://aws.amazon.com/developertools/Amazon-EC2 .

To check that everything is running correctly, type this command:

$ Ec2-describe-regions

That should result in the following output:

REGION  ap-northeast-1  ec2.ap-northeast-1.amazonaws.com
REGION  ap-southeast-1  ec2.ap-southeast-1.amazonaws.com
REGION  ap-southeast-2  ec2.ap-southeast-2.amazonaws.com
REGION  eu-central-1    ec2.eu-central-1.amazonaws.com
REGION  eu-west-1       ec2.eu-west-1.amazonaws.com
REGION  us-east-1       ec2.us-east-1.amazonaws.com
REGION  us-west-1       ec2.us-west-1.amazonaws.com
REGION  us-west-2       ec2.us-west-2.amazonaws.com
REGION  sa-east-1       ec2.sa-east-1.amazonaws.com

The Spark EC2 script automatically generates a group of different security and firewall rules to run the Spark cluster. By default, our Spark cluster is accessible on port 8080. For security, we strongly recommend changing the 0.0.0.0/0 address in the spark_ec2.py script with our public IP address.

To start the cluster, use this command:

./ec2/spark-ec2 -k spark-keypair -i pk-{....}.pem -s 1 launch lePetitCluster

Where {....}.pem indicates the path to our private key.

If you get a “not being reliable to SSH to the master” error, it is because the key can be accessed by others users. Try changing the access permissions to the key so that only one user can read it. If more users can read it, the SSH will refuse it.

If you get the “cannot yet SSH script to the master” error, it is because we are having a race condition; the hosts are reporting them as alive. Try changing the -w in setup_cluster by using a sleep of 120 seconds.

If you get a “transient error while launching a cluster” error, try to run the script with the --resume option.

This makes the scaffolding to make a cluster with a master and a worker node with all default values. The next task is to verify that the firewall rules work. Access the master on port 8080.

The JPS command gives the following information about our cluster:

root @ ip-172-31-45-56 ∼] $ jps
1904 NameNode
2856 Jps
2426 MasterNameNode
2078 SecondaryNodeName

This information is about the name of Spark master, the Hadoop node, and slave nodes.

You can take the example of Pi that you ran on your machine at the beginning of this chapter:

cd spark
bin/run-example SparkPi 10

To terminate the instances, use this command:

ec2/spark-ec2 destroy <cluster name>

To learn all the options that the spark-ec2 command has, run this:

ec2/spark-ec2 -help

There are many types of EC2 instances available; the type has an important impact on the cluster performance. The type can be specified with --instance-type = [typeName]. If you need a lot of memory, you can specify it here.

By default, the same instance type is used for masters and workers. To specify a different type of master you use --master-instance-type = [typeName].

EC2 has also instances of the type GPU, which are very powerful if you run on Envidia graphic cards because they usually have hundreds of GPUs at a very low cost. The GPU instance type is useful when you run a worker, if the master is not used. It is important to note that EC2 performance on the GPU may be lower than when testing locally, because you have more I/O imposed by the hypervisor.

EC2 scripts use the Amazon Machine Images (AMI) provided by the Spark development team; they are enough for most applications.

Running Spark on Mesos

This book devotes an entire chapter on Mesos (Chapter 7), but we must include a special mention in this Spark chapter.

Mesos is a cluster management platform to run multiple distributed applications (or frameworks) on a cluster. Mesos intelligently manages and runs clusters of Spark, Hadoop, Cassandra, Akka, and Kafka. Mesos can run Spark as separate Mesos tasks or run all in a single Mesos task. Mesos quickly scales horizontally to manage clusters beyond the size that individual Python scripts allow.

Mesos has a large number of configuration scripts that you can use. For Ubuntu installations, use configure.ubuntu-lucid-64. In addition to the Spark requirements, you need to ensure that you have the Python C header files installed. Since Mesos must be installed on all of your machines, you can use Mesos to configure other machines:

./configure --prefix=/home/sparkuser/mesos && make && make check && make install            

As with the Spark Standalone mode configuration, you must ensure that Mesos nodes can find each other.

Let’s begin by adding the master hostname to mesossprefix/var/mesos/deploy/masters and all the worker hostnames to mesossprefix/var/mesos/deploy/slaves. Then point the workers to the master in mesossprefix/var/mesos/conf/mesos.conf.

Once Mesos is configured on your machines, you need to configure Spark to run on Mesos. This is as simple as copying the conf/spark-env.sh.template file to conf/spark-env.sh and updating the MESOS_NATIVE_LIBRARY variable to point to the path where Mesos is installed.

Then copy the build to all machines using this secure shell copy command:

pscp -v -r -h  -l sparkuser ./mesos /home/sparkuser/mesos

To start Mesos clusters, use mesosprefix/sbin/mesos-start-cluster.sh. Use mesos://[host]:5050 as the master.

Well, that’s roughly what you have to do to run an Apache Spark cluster on Apache Mesos.

Submitting Our Application

To submit our application to a standalone cluster manager, type this:

spark-submit --master spark://masterNodeHost:masterNodePort appName

Previously, you saw the spark-submit command syntax. Now just change the master node address, and that’s it.

The cluster URL is also displayed in the cluster administrator web UI at http://[masternode]:8080. The host name and port should exactly match the URL present in the web UI. As administrators, we can configure ports other than 7077.

The standalone cluster manager has a --deploy-mode option :

  • Client mode (local, default). The driver runs in the same process as spark-submit. If you shut down the spark-submit, your cluster goes down.

  • Cluster mode. The driver is launched as a different process on a worker node. You can shut down the machine on which spark-submit is running and everything will continue running.

For each executor node, the --total-executor-cores and --executor-memory parameters specify the number of cores and the available memory. A common error is to ask for more resources than can be assigned. In this case, the cluster manager can’t start the executor nodes. Always check that your cluster has the resources that you state in the parameters.

Configuring Resources

In the standalone cluster manager, the resources are controlled by two variables :

  • --executor-memory argument of the spark-submit command

  • The total memory on each executor. By default, this value is set to 1GB; unfortunately, in production environments, one gigabyte of memory assigned to each executor won’t be enough.

  • --total-executor-cores argument of the spark-submit command

  • The total number of cores used on each executor. The default value is unlimited (i.e., the application starts an executor on each available node in the cluster).

To check your current parameters, you can browse the standalone cluster manager’s web UI at http://[masterNode]:8080.

By default, the standalone cluster manager disperses the executors onto the largest number of machines available. For example, suppose you have an eight-node cluster and each machine has four cores. You launch the application with --total-executor-cores 6. Spark will raise only six executors, and tends to use the largest number of machines possible (i.e., an executor on each machine, leaves two nodes without an executor).

If you want to use the fewest number of nodes possible, change the spark.deploy.spreadOut configuration property to false in the conf/spark-defaults.conf configuration file. If we turn off this flag in our example, we will use only two machines: one with four executors and the other with two executors. The other six machines won’t run executors.

These settings affect all the applications on the cluster, so use it with caution.

High Availability

You saw how to specify the cluster mode in the Standalone cluster manager deploy mode, so if the spark-submit process dies, the manager doesn’t also die. This is because the driver runs on one member of the cluster.

You want to keep your manager running while the last node is still standing. To increase manager life, there is a top-notch tool called Apache ZooKeeper, which is discussed in Chapter 7.

Spark Streaming

Life is a continuous process, it is not discrete. Life is a continuous flow.

As we mentioned in earlier chapters, the benefits of the data are greater when the information is fresher. Many machine learning computations should be calculated in real time from streaming data.

Spark Streaming is the Apache Spark module for managing data flows. Most of Spark is built over the RDD concept. Spark Streaming introduces a new concept: discretized streams, known as DStreams. A DStream is an information sequence related to time. Note that a DStream is internally a sequence of RDDs, thus the name discretized.

Just as RDDs has two operations, DStreams also have two types of operations: transformations (results in another DStream) and output operations, which are designed to store information on external systems. DStreams have many of same the operations available to RDDs, plus time-related operations, such as sliding windows.

Unlike batch operations, Spark Streaming applications require additional configurations to provide 24/7 services. Let’s talk about resetting applications in the event of a failure and discuss how to configure automatic restarts.

Spark Streaming Architecture

Spark Streaming uses the microbatch architecture , in which streaming is considered a continuous series of small batches of data (see Figure 6-8). The magic of Spark Streaming is receiving a continuous flow and splitting it into small data chunks.

A420086_1_En_6_Fig8_HTML.jpg
Figure 6-8. A DStream is an RDD sequence

Batches are generated at regular time intervals; if two data chunks come in the same time window, they are included in the same data batch. The batch size is determined by the parameter batch interval, which usually has a value between 500 milliseconds and several seconds. The developer specifies this value.

As you can see in Figure 6-9, the primary Spark Streaming task is to receive a data stream from multiple sources, and then build a DStream, which is a sequence of RDDs. Each RDD corresponds to a time slice of the original flow.

A420086_1_En_6_Fig9_HTML.jpg
Figure 6-9. Spark Streaming operation

We can create DStreams from input sources or through applying transformations to other DStreams. The DStreams support most of the operations that RDDs support. Additionally, DStreams have “stateful” operations to aggregate data across time.

In addition to transformations, DStreams support output operations, which are similar to actions in the aspect that RDDs write to external systems; but Spark Streaming batches run periodically, producing the output batch.

For each input source, Spark launches streaming receivers, which are tasks that run on executors, gather data from information sources, and store it in RDDs. The receivers are also responsible for replicating data among other executors to support fault tolerance. This data is stored in the memory of executors in the same way as the RDDs. As shown in Figure 6-10, the StreamingContext in the driver program periodically runs Spark jobs to process this data and combine them with the new RDDs.

A420086_1_En_6_Fig10_HTML.jpg
Figure 6-10. Spark Streaming execution with Spark components

The same fault tolerance properties offered by RDDs are also offered with DStreams. In the event of a failure, Spark can recalculate the DStreams at any point in time. However, the recalculation can take time, especially if the rebuild goes back to the beginning of the execution.

Spark Streaming provides a mechanism called checkpointingthat periodically saves the state to a reliable file system. Typically, a checkpoint occurs every five to ten data batches. When a failure occurs, Spark restores from the last checkpoint.

Transformations

Transformations on DStreams can be grouped into stateless or stateful.

  • Stateless. The data of each processing batch doesn’t depend on the data from previous batches. These include the RDD transformations such as map(), reduce(), and filter().

  • Stateful. The data or intermediate results from previous batches are used to calculate the current batch’s results. These include transformations based on sliding windows and tracking status over time.

Stateless Transformations

Stateless transformations are common RDD transformations applied to each RDD on the DStream. Table 6-11 enumerates the main stateless transformations:

Table 6-11. Main Stateless Transformations

Transformation

Purpose

Example

map(function)

Applies a function to each RDD in the DStream, returning one DStream as a result.

ds.map(_*3)

flatMap(function)

Applies a function to each RDD in the DStream, returning one DStream with the content of the returned iterators.

ds.flatMap( str => str.split(“ ”) )

filter(function)

Builds a DStream with only the RDDs evaluated with true on the function.

ds.filter(_.contains(“k”))

repartition(number)

Changes the number of DStream partitions.

ds.repartition(9)

reduceByKey(function, [number] )

Combines the values with the same key in each batch.

ds.reduceByKey(_+_)

groupByKey()

Groups the values with the same key in each batch.

ds.groupByKey()

Bullet points about stateless transformations:

  • Individual. Although it seems as if the transformation is applied to the whole DStream, it is not. Actually, it is applied individually to each batch element (RDD) of the DStream. For example, with reduceByKey(), the function is applied on each individual RDD, not on the whole DStream.

  • Join. Stateless transformations can also combine data from multiple DStreams. For example, DStreams have the same join transformations than RDDs, these are cogroup(), join(), and leftOuterJoin(). You can use these operations to perform a join in each DStream batch.

  • Merge. You can merge the contents of two different DStreams by using the union() operation, or by using StreamingContext.union() to merge several DStreams.

  • Reuse. DStreams provide the powerful operator called transform() and can operate directly on the RDDs within a DStream. This operation is called on each element of the DStream, producing a new DStream. If you have written code for some RDDs and you want to reuse in Spark Streaming, the transform() method is a good option.

  • Transform. In addition to StreamingContext.transform(), you can combine several DStreams using DStream.transformWith (anotherDStream, function).

Stateful Transformations

Stateful transformations are DStream operations that track data across time; data from old batches are used to generate new batches.

There are two types of stateful transformations:

  • Windowed transformations: Operates on data over a window duration.

  • Update state by key: Tracks the status between the same key events; for example, a user session.

Stateful transformations require checkpointing to enable fault tolerance.

Windowed Operations

Windowed operations calculate results in a period longer than the StreamingContext batch interval time, which allows it to combine the results of several batches.

Windowed operations require two parameters: the window duration and the slide duration. Both must be multiples of the StreamingContext batch interval.

  • The window duration states the number of previous batches that will be considered; this is the formula:

  • Batches considered = Window duration/Batch interval

  • Using Figure 6-11 as an example, a DStream with an interval of 2 seconds and a window duration of 6 seconds considers only the last six batches.

    A420086_1_En_6_Fig11_HTML.jpg
    Figure 6-11. Windowed operations example
  • The slide duration indicates how often you want to calculate the results; the default value is the duration of batch interval.

  • For example, if you have a batch interval of 10 seconds and you calculate our window every 2 seconds, you must change your slide duration to 20 seconds.

The simplest operation that can be performed on a DStream is window(); it returns information about the current window to the DStream.

window(windowLength, slideInterval)

Purpose: Returns a new DStream computed from windowed batches of the source DStream.

Example:

val wind = lines.window(Seconds(30),Seconds(10));
wind.foreachRDD( rdd => { rdd.foreach( x => println(x+ " ")) })

Output:

10 10 20 20 10 30 20 30 40 // drops 10

Spark Streaming provides other windowed operations to improve efficiency. For example, reduceByWindow() and reduceByKeyAndWindow() allow you to make reductions to each window in a very efficient way. They have a special form that calculates the reduction incrementally and considers only the input and output data.

Finally, to count data, DStream offers the following:

  • countByWindow() returns a DStream with the number of elements in each window.

  • countByValueAndWindow() returns a DStream with the counts of each value.

countByWindow( windowLength, slideInterval )

Purpose: Returns a new sliding window count of the elements in a stream.

Example:

lines.countByWindow( Seconds(30), Seconds(10) ).print()

Output:

1 2 3 3
Update State by Key

Sometimes you need to maintain a state between batches in a DStream.

The updateStateByKey()method provides access to DStream state variables by taking a function that specifies how to update each key status given new events.

Using the updateStateByKey() method with an update function(event, oldState) takes the past event (with some key) and its previous state, and returns a new state to make the update.

The following are the parameters of this function:

  • events. Lists events in the current batch (can be empty)

  • newState. (Optional) If you want to delete the previous state.

The result of updateStateByKey() is a new DStream with RDD in pairs (key, state) at each time frame.

Output Operations

Output operations specify what to do with the data transformed into a stream. It could be printed on the screen or stored in a database, for example.

As with the RDD lazy operations, if there is no output operation applied to a DStream or any of its dependents, the DStream won’t be evaluated. In the same way, if there is no output operations on a StreamingContext, it won’t start.

Usually, an output operation used for debugging is a simple print() method. Printing the results on screen counts as an output operation. When going into production, it is vital to consider this. If you remove all the print(), you may be leaving your program without output operations, and it won’t run.

Once our program is debugged, we can use output operations to store our results. Spark Streaming has the RDD save() operation for DStreams. Like the RDD operation, it takes the directory on the file system where you want to store results. Each batch’s results are stored in subdirectories of the specified directory, with the time and the specified suffix as the file name.

Finally, foreachRDD() is a generic output operation used to make computations on each RDD of the DStream. It is similar to transform(). It gives also the time of each batch, which saves each time period in a different location.

24/7 Spark Streaming

Spark Streaming provides a mechanism to ensure fault tolerance. If the input data is stored reliably, Spark Streaming always calculates the result from it, providing the correct semantics (i.e., as if the data had been processed without failing nodes).

Spark Streaming applications that run 24/7 need special installation. The first step is to enable checkpointing on a reliable storage system: HDFS or Amazon S3. Also, note that you must deal with the driver program fault tolerance, changing some portion of the code.

Checkpointing

Checkpointing is the primary mechanism to enable fault tolerance in Spark Streaming. Spark Streaming allows you to periodically save the application data in a reliable file system, such as HDFS, or Amazon S3. Checkpointing has two purposes:

  • Limits the state to be recomputed when a fault occurs. Spark Streaming recalculates the state using a lineage graph of transformations; checkpointing tells you how far back you should go.

  • Driver program fault tolerance. If the driver program of a streaming application crashes, you can start it again and recover from the last checkpoint. Spark Streaming reads how much had processed and will resume from that point.

For these reasons checkpointing is important when you run production Spark Streaming applications.

Note that when running in local mode, Spark Streaming won’t run a stateful operation if you don’t have checkpointing enabled. In this case, you can use a local file system. In a production environment, you should use a replicated file system, such as HDFS, Amazon S3, or NFS.

Spark Streaming Performance

Spark Streaming has specific considerations in addition to Spark performance considerations.

Parallelism Techniques

A common way to reduce batch processing time is by increasing the parallelism. There are three ways to increase it.

  • Increasing parallelism. For operations such as reduceByKey(), you can specify parallelism as an operation parameter.

  • Adding receptors. Receptors can be a bottleneck if there are many records to read and distribute for a single machine. You can add more recipients by creating multiple input DStreams, which creates multiple receivers, and then apply a union() operation to merge into a single stream.

  • Repartitioning data. If you can’t increase the receivers number, you can redistribute the input data by explicitly repartitioning it using the repartition() method.

Window Size and Batch Size

A perfect batch size is a common quest. For most applications, 500 milliseconds is a good number for the minimum batch size . The best method is to start the minimum size batch at a large number, about 10 seconds, and decrease it until reaching the optimal size. If the processing times remain consistent, you can continue decreasing the batch size, but if the time increases, you have found your number.

Similarly, with windowed operations, the interval to calculate a result has great performance impact. If it’s a bottleneck, consider increasing this interval for expensive calculations.

Garbage Collector

Garbage collection is problem associated with JVM. You can minimize garbage collection pauses by enabling the concurrent Mark-and-Sweep garbage collector. Generally, this consumes more resources but has fewer pauses.

If you do not know how to enable it, use -XX: +UseConcMarkSweepGC in spark.executor.extraJavaOptions when launching spark-submit. To use the garbage collector less, you can decrease the GC pressure; for example, by caching RDDs in serialized form.

Summary

This chapter took a very quick look at the engine. You learned how to download, install, and test Apache Spark. You learned about Spark’s main concepts: RDD, run applications, and the RDD operations: transformations and actions. You also saw how to run Apache Spark in cluster mode, how to run the driver program, and how to achieve high availability. Finally, you took a look at Spark Streaming, stateless and stateful transformations, output operations, how to enable 24/7, and how to improve Spark Streaming performance.

In the following chapters, you will see how Apache Spark is the engine of the stack. All the other SMACK technologies are related to Spark.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset