Apache Spark

Apache Spark is a fast and general-purpose cluster computing system, initially developed at AMPLab, UC Berkeley, as part of the Berkeley Data Analytics Stack (BDAS) (http://en.wikipedia.org/wiki/UC_Berkeley). It provides high-level APIs in several programming languages that make large, concurrent parallel jobs easy to write and deploy [12:11].

Note

The link to the latest information

The URLs, as with any reference to Apache Spark, may change in future versions.

The core element of Spark is a resilient distributed dataset (RDD), which is a collection of elements partitioned across the nodes of a cluster and/or the CPU cores of servers. An RDD can be created from a local data structure such as a list, array, or hash table, from the local filesystem, or from the Hadoop distributed file system (HDFS).

The operations on an RDD in Spark are very similar to the Scala higher-order methods. These operations are performed concurrently over each partition. Operations on RDDs can be classified as follows:

  • Transformation: This operation converts, manipulates, and filters the elements of an RDD on each partition
  • Action: This operation aggregates, collects, or reduces the elements of the RDD from all partitions

An RDD can be persisted, serialized, and cached for future computation.

Spark is written in Scala and built on top of Akka libraries. Spark relies on the following mechanisms to distribute and partition RDDs:

  • Hadoop/HDFS for the distributed and replicated filesystem
  • Mesos or Yarn for the management of a cluster and shared pool of data nodes

The Spark ecosystem can be represented as a stack of technologies and frameworks, as seen in the following diagram:

Apache Spark

The Apache Spark framework ecosystem

The Spark ecosystem has grown to include several libraries out of the box: MLlib for machine learning algorithms, SparkSQL for a SQL-like interface that manipulates datasets with relational operators, GraphX for distributed graphs, and a streaming library [12:12].

Why Spark?

The authors of Spark attempt to address the limitations of Hadoop in terms of performance and real-time processing by implementing in-memory iterative computing, which is critical to most discriminative machine learning algorithms. Numerous benchmark tests have been performed and published to evaluate the performance improvement of Spark relative to Hadoop. In the case of iterative algorithms, the time per iteration can be reduced by a ratio of 1:10 or more.

Spark provides a large array of prebuilt transforms and actions that go well beyond the basic map-reduce paradigm. These methods on RDDs are a natural extension of the Scala collections, making code migration seamless for Scala developers.

Finally, Apache Spark supports fault-tolerant operations by allowing RDDs to persist both in memory and in the filesystem. Persistency enables automatic recovery from node failures. The resiliency of Spark relies on the supervisory strategy of the underlying Akka actors, the persistency of their mailboxes, and the replication schemes of the HDFS.

Design principles

The performance of Spark relies on the following five core design principles [12:13]:

  • In-memory persistency
  • Laziness in scheduling tasks
  • Transforms and actions applied to RDDs
  • Implementation of shared variables
  • Support for data frames (SQL-aware RDDs)

In-memory persistency

The developer can decide to persist and/or cache an RDD for future usage. An RDD may persist in memory only, on disk only, or in memory if available and on disk otherwise, as deserialized or serialized Java objects. For instance, an RDD, rdd, can be persisted with serialization through a simple statement, as shown in the following code:

rdd.persist(StorageLevel.MEMORY_ONLY_SER)

Note

Kryo serialization

Java serialization through the Serializable interface is notoriously slow. Fortunately, the Spark framework allows the developer to specify a more efficient serialization mechanism such as the Kryo library.
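
As an illustration, here is a minimal sketch of how Kryo might be enabled through the Spark configuration; the Observation case class is a hypothetical application type:

import org.apache.spark.SparkConf

// Hypothetical observation type used by the application
case class Observation(volatility: Double, volume: Double)

// Replace the default Java serializer with Kryo and register the
// application classes so Kryo does not have to store full class names
val conf = new SparkConf()
  .setAppName("KryoExample")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[Observation]))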

Laziness

Scala supports lazy values natively: the initializing expression on the right-hand side of the assignment, whether it produces a value, an object reference, or a method result, is evaluated only once, that is, the first time the lazy value is accessed, as shown in the following code:

class Pipeline {
  lazy val x = { println("x"); 1.5 }
  lazy val m = { println("m"); 3 }
  val n = { println("n"); 6 }
  def f = m << 1
  def g(j: Int) = Math.pow(x, j)
}
val pipeline = new Pipeline  //1
pipeline.g(pipeline.f)  //2

The order of the variables printed is n, m, and then x. The instantiation of the Pipeline class initializes n but neither m nor x (line 1). At a later stage, the g method is called, which in turn invokes the f method. The f method initializes the m value it needs, and then g initializes x to compute x raised to the power of m << 1 (line 2).

Spark applies the same principle to RDDs by executing a transformation only when an action is performed. In other words, Spark postpones memory allocation, parallelization, and computation until the driver code requests the result through the execution of an action. The cascading effect of invoking all these transformations backward is handled by the directed acyclic graph (DAG) scheduler.
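
The following minimal sketch illustrates lazy evaluation on RDDs, assuming a SparkContext, sc, and an arbitrary text file:

// Transformations are only recorded; no computation is triggered yet
val lines   = sc.textFile("observations.csv")
val lengths = lines.map(_.length)
val long    = lengths.filter(_ > 80)

// The action forces the DAG scheduler to execute the whole lineage
val count = long.count()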

Transforms and actions

Spark is implemented in Scala, so you should not be too surprised to learn that the most relevant Scala higher-order methods on collections are supported in Spark. The first table describes the transformation methods in Spark, as well as their counterparts in the Scala standard library. We use the (K, V) notation for (key, value) pairs:

  • map(f) (Scala: map(f)): This transforms an RDD by executing the f function on each element of the collection
  • filter(f) (Scala: filter(f)): This transforms an RDD by selecting the elements for which the f function returns true
  • flatMap(f) (Scala: flatMap(f)): This transforms an RDD by mapping each element to a sequence of output items
  • mapPartitions(f) (no Scala counterpart): This executes the map method separately on each partition
  • sample (no Scala counterpart): This samples a fraction of the data with or without replacement using a random generator
  • groupByKey (Scala: groupBy): This is called on (K,V) pairs to generate a new (K, Seq(V)) RDD
  • union (Scala: union): This creates a new RDD as the union of this RDD and the argument
  • distinct (Scala: distinct): This eliminates duplicate elements from this RDD
  • reduceByKey(f) (Scala: reduce): This aggregates or reduces the values corresponding to each key using the f function
  • sortByKey (Scala: sortWith): This reorganizes the (K,V) pairs in an RDD in ascending, descending, or otherwise specified order of the keys, K
  • join (no Scala counterpart): This joins an RDD (K,V) with an RDD (K,W) to generate a new RDD (K, (V,W))
  • coGroup (no Scala counterpart): This implements a join operation but generates an RDD (K, Seq(V), Seq(W))

Action methods trigger the collection or the reduction of the datasets from all partitions back to the driver, as listed here:

  • reduce(f) (Scala: reduce(f)): This aggregates all the elements of the RDD across all the partitions and returns a Scala object to the driver
  • collect (Scala: collect): This collects and returns all the elements of the RDD across all the partitions as a list in the driver
  • count (Scala: count): This returns the number of elements in the RDD to the driver
  • first (Scala: head): This returns the first element of the RDD to the driver
  • take(n) (Scala: take(n)): This returns the first n elements of the RDD to the driver
  • takeSample (no Scala counterpart): This returns an array of random elements from the RDD back to the driver
  • saveAsTextFile (no Scala counterpart): This writes the elements of the RDD as a text file to either the local filesystem or HDFS
  • countByKey (no Scala counterpart): This generates a (K, Int) RDD with the original keys, K, and the count of values for each key
  • foreach (Scala: foreach): This executes a T => Unit function on each element of the RDD

Scala methods such as fold, find, drop, flatten, min, max, and sum are not currently implemented in Spark. Other Scala methods such as zip have to be used carefully, as there is no guarantee that the order of the two collections in zip is maintained between partitions.
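
As a brief illustration, the following sketch chains a few of these transformations with a final action, assuming a SparkContext, sc, and illustrative (symbol, volume) pairs:

// (K, V) pairs: stock symbol and daily trading volume
val volumes = sc.parallelize(Seq(("CSCO", 1.2), ("BAC", 0.8), ("CSCO", 0.7)))

val avgVolume = volumes
  .groupByKey()                                   // transformation: (K, Seq(V))
  .map { case (k, vs) => (k, vs.sum / vs.size) }  // transformation
  .collect()                                      // action: results to the driver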

Shared variables

In a perfect world, variables are immutable and local to each partition to avoid race conditions. However, there are circumstances where variables have to be shared without breaking the immutability provided by Spark. To this extent, Spark duplicates shared variables and copies them to each partition of the dataset. Spark supports the following types of shared variables:

  • Broadcast values: These values encapsulate and forward data to all the partitions
  • Accumulator variables: These variables act as summations or reference counters
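
A minimal sketch of both types of shared variables, assuming a SparkContext, sc, and an illustrative lookup table:

// Broadcast value: a read-only lookup table forwarded once to each partition
val sectors = sc.broadcast(Map("CSCO" -> "Technology", "BAC" -> "Financials"))

// Accumulator variable: a counter updated by the workers, read by the driver
val missing = sc.accumulator(0, "missing sector")

sc.parallelize(Seq("CSCO", "BAC", "XYZ")).foreach { symbol =>
  if (!sectors.value.contains(symbol)) missing += 1
}
println(s"Symbols without a sector: ${missing.value}")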

The first four design principles can be summarized in the following diagram:

Shared variables

An interaction between the Spark driver and RDDs

The preceding diagram illustrates the most common interaction between the Spark driver and its workers, as listed in the following steps:

  1. The input data, residing in either the memory as a Scala collection or HDFS as a text file, is parallelized and partitioned into an RDD.
  2. A transformation function is applied to each element of the dataset across all the partitions.
  3. An action is performed to reduce and collect the data back to the driver.
  4. The data is processed locally within the driver.
  5. A second parallelization is performed to distribute computation through the RDDs.
  6. A variable is broadcast to all the partitions as an external parameter of the last RDD transformation.
  7. Finally, the last action aggregates and collects the final result back in the driver.

If you take a close look, the management of datasets and RDDs by the Spark driver is not very different from the management of futures by the Akka master and worker actors.
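
The following sketch mirrors these seven steps, assuming a SparkContext, sc, and illustrative data:

// Steps 1 to 3: parallelize, transform, and collect back to the driver
val raw    = sc.parallelize(Vector(1.0, 2.0, 3.0, 4.0))
val scaled = raw.map(_ * 0.5)
val local  = scaled.collect()

// Step 4: local processing within the driver
val mean = local.sum / local.length

// Steps 5 to 7: second parallelization, broadcast of an external
// parameter, and a final action that aggregates the result
val meanBc   = sc.broadcast(mean)
val centered = sc.parallelize(local.toSeq).map(_ - meanBc.value)
val variance = centered.map(x => x * x).reduce(_ + _) / local.length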

Experimenting with Spark

Spark's in-memory computation for iterative computing makes it an excellent candidate for distributing the training of machine learning models implemented with dynamic programming or optimization algorithms. Spark runs on the Windows, Linux, and Mac OS operating systems. It can be deployed either in local mode on a single host or in cluster mode for a distributed environment. The version of the Spark framework used is 1.3.

Note

JVM and Scala compatible versions

At the time of writing, Spark 1.3.0 requires Java 1.7 or higher and Scala 2.10.2 or higher. Spark 1.5.0 supports Scala 2.11 but requires the framework to be reassembled with the -Dscala-2.11 flag.

Deploying Spark

The easiest way to learn Spark is to deploy it on a localhost in standalone mode. You can either deploy a precompiled version of Spark from the website, or build the JAR files using the simple build tool (sbt) or Maven [12:14], as follows:

  1. Go to the download page at http://spark.apache.org/downloads.html.
  2. Choose a package type (Hadoop distribution). The Spark framework relies on HDFS to run in cluster mode; therefore, you need to select a distribution of Hadoop or a vendor distribution such as MapR or Cloudera.
  3. Download and decompress the package.
  4. If you are interested in the latest functionality added to the framework, check out the newest source code at http://github.com/apache/spark.git.
  5. Next, you need to build, or assemble, the Apache Spark libraries from the top-level directory using either Maven or sbt:
    • Maven: Set the following Maven options to support the build, deployment, and execution:
      MAVEN_OPTS="-Xmx4g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m"
      mvn [args] -DskipTests clean package

      The following are some examples:

      • Building on Hadoop 2.4 using the Yarn clusters manager and Scala 2.10 (default):
        mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests clean package

      • Building on Hadoop 2.6 using the Yarn clusters manager and Scala 2.11:
        mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0 -Dscala-2.11 -DskipTests clean package

    • Simple build tool (sbt): Use the following command:
      sbt/sbt [args] assembly

      The following are some examples:

      • Building on Hadoop 2.4 using the Yarn clusters manager and Scala 2.10 (default):
        sbt/sbt -Pyarn -Phadoop-2.4 assembly

      • Building on Hadoop 2.6 using the Yarn clusters manager and Scala 2.11:
        sbt/sbt -Pyarn -Phadoop-2.6 -Dscala-2.11 assembly

Note

Installation instructions

The directory and name of artifacts used in Spark will undoubtedly change over time. You can refer to the documentation and installation guide for the latest version of Spark.

Apache Spark supports multiple deployment modes:

  • Standalone mode: The drivers and executors run as master and slave Akka actors, bundled with the default spark distribution JAR file.
  • Local mode: This is a standalone mode running on a single host. The slave actors are deployed across multiple cores within the same host.
  • Yarn clusters manager: Spark relies on the Yarn resource manager running on Hadoop version 2 and higher. The Spark driver can run either on the same JVM as the client application (client mode) or on the same JVM as the master (cluster mode).
  • Apache Mesos resource manager: This deployment allows dynamic and scalable partitioning. Apache Mesos is an open source, general-purpose cluster manager that has to be installed separately (refer to http://mesos.apache.org/). Mesos manages and abstracts hardware resources such as memory or storage.
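
The deployment mode is typically selected through the master URL passed to the Spark configuration or to the spark-submit script. A minimal sketch, with illustrative host names and ports, follows:

import org.apache.spark.SparkConf

// Local mode: eight worker threads on a single host
val localConf      = new SparkConf().setMaster("local[8]")
// Standalone mode: the master listens on port 7077 by default
val standaloneConf = new SparkConf().setMaster("spark://master-host:7077")
// Apache Mesos resource manager (default port 5050)
val mesosConf      = new SparkConf().setMaster("mesos://mesos-host:5050")
// Yarn clusters manager in client mode, as supported in Spark 1.3
val yarnConf       = new SparkConf().setMaster("yarn-client")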

The communication between a master node (or driver), cluster manager, and set of slave (or worker) nodes is illustrated in the following diagram:

Deploying Spark

The communication between a master, slave nodes, and a cluster manager

Note

Installation under Windows

Hadoop relies on some UNIX/Linux utilities that need to be added to the development environment when running on Windows. The winutils.exe file has to be installed and added to the HADOOP_PATH environment variable.

Using Spark shell

Use any of the following methods to use the Spark shell:

  • The shell is an easy way to get your feet wet with Spark resilient distributed datasets. To launch the shell locally, execute ./bin/spark-shell --master local[8] to run the shell on an 8-core localhost.
  • To launch a Spark application locally, connect to the shell and execute the following command line:
    ./bin/spark-submit --class myApp --master local[4] \
       --executor-memory 12G myApplication.jar

    The command launches the application packaged in myApplication.jar, using the main method of the myApp class, on a 4-core CPU localhost with 12 GB of memory.

  • To launch the same Spark application remotely, connect to the shell and execute the following command line:
    ./bin/spark-submit --class myApp \
       --master spark://162.198.11.201:7077 \
       --total-executor-cores 80 \
       --executor-memory 12G \
       myApplication.jar

The output will be as follows:

Using Spark shell

A partial screenshot of the Spark shell command line output

Note

Potential pitfalls with the Spark shell

Depending on your environment, you might need to disable logging information to the console by reconfiguring conf/log4j.properties. The Spark shell might also conflict with the declaration of the classpath in your profile or in the list of environment variables. In this case, the classpath declaration has to be replaced by the ADD_JARS environment variable, such as ADD_JARS=path1/jar1,path2/jar2.

MLlib

MLlib is a scalable machine learning library built on top of Spark. As of version 1.0, the library is a work in progress.

The main components of the library are as follows:

  • Classification algorithms, including logistic regression, Naïve Bayes, and support vector machines
  • Clustering limited to K-means in version 1.0
  • L1 and L2 regularization
  • Optimization techniques such as gradient descent, logistic gradient and stochastic gradient descent, and L-BFGS
  • Linear algebra such as the singular value decomposition
  • Data generator for K-means, logistic regression, and support vector machines

The machine learning bytecode is conveniently included in the Spark assembly JAR file built with the simple build tool.
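
If you prefer to build your application against the published artifacts rather than the assembly JAR, a minimal sbt dependency declaration might look as follows; the version shown matches the Spark release used in this chapter:

// build.sbt (sketch): Spark core and MLlib, compiled for Scala 2.10
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"  % "1.3.0" % "provided",
  "org.apache.spark" %% "spark-mllib" % "1.3.0" % "provided"
)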

RDD generation

The transformation and actions are performed on RDDs. Therefore, the first step is to create a mechanism to facilitate the generation of RDDs from a time series. Let's create an RDDSource singleton with a convert method that transforms a time series xt into an RDD, as shown here:

def convert(
    xt: immutable.Vector[DblArray], 
    rddConfig: RDDConfig) 
    (implicit sc: SparkContext): RDD[Vector] = {

  val rdd: RDD[Vector] = 
     sc.parallelize(xt.toVector.map(new DenseVector(_))) //3
  rdd.persist(rddConfig.persist) //4
  if( rddConfig.cache) rdd.cache  //5
  rdd
}

The last rddConfig argument of the convert method specifies the configuration for the RDD. In this example, the configuration of the RDD consists of enabling/disabling cache and selecting the persistency model, as follows:

case class RDDConfig(cache: Boolean, persist: StorageLevel)

It is fair to assume that SparkContext has already been implicitly defined in a manner quite similar to ActorSystem in the Akka framework.

The generation of the RDD is performed in the following steps:

  1. Create an RDD using the parallelize method of the context and convert the data into a vector (SparseVector or DenseVector) (line 3).
  2. Specify the persistency model or the storage level if the default level needs to be overridden for the RDD (line 4).
  3. Specify whether the RDD has to be cached in memory (line 5).

Note

An alternative for the creation of an RDD

An RDD can be generated from data loaded from either the local filesystem or HDFS using the SparkContext.textFile method, which returns an RDD of strings.
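
A minimal sketch of this alternative, assuming a SparkContext, sc, and a file of comma-separated numeric features (the path is illustrative):

import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD

// Load the raw text from the local filesystem or HDFS, then parse each line
val rawRDD: RDD[String] = sc.textFile("resources/data/chap12/observations.csv")
val vectors: RDD[Vector] = rawRDD.map(line =>
  Vectors.dense(line.split(",").map(_.toDouble)))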

Once the RDD is created, it can be used as an input for any algorithm defined as a sequence of transformation and actions. Let's experiment with the implementation of the K-means algorithm in Spark/MLlib.

K-means using Spark

The first step is to create a SparkKMeansConfig class to define the configuration of the Apache Spark K-means algorithm, as follows:

class SparkKMeansConfig(K: Int, maxIters: Int, 
     numRuns: Int = 1) {   
  val kmeans: KMeans = {      
    (new KMeans).setK(K) //6
      .setMaxIterations(maxIters)  //7
      .setRuns(numRuns) //8
  }
}

The minimum set of initialization parameters for MLlib K-means algorithm is as follows:

  • The number of clusters, K (line 6)
  • The maximum number of iterations allowed for the minimization of the total reconstruction error, maxIters (line 7)
  • The number of training runs, numRuns (line 8)

The SparkKMeans class wraps the Spark KMeans into a data transformation of the ITransform type, as described in the Monadic data transformation section in Chapter 2, Hello World! The class follows the design template for a classifier, as explained in the Design template for immutable classifiers section in the Appendix A, Basic Concepts:

class SparkKMeans(    //9
    kMeansConfig: SparkKMeansConfig, 
    rddConfig: RDDConfig, 
    xt: Vector[DblArray])
   (implicit sc: SparkContext) extends ITransform[DblArray](xt){

  type V = Int   //10
  val model: Option[KMeansModel] = train  //11

  override def |> : PartialFunction[DblArray, Try[V]] //12
  def train: Option[KMeansModel] 
}

The constructor takes three arguments: the Apache Spark KMeans configuration kMeansConfig, the RDD configuration rddConfig, and the xt input time series for clustering (line 9). The return type of the ITransform trait's partial function |> is defined as an Int (line 10).

The generation of model merely consists of converting the time series xt into an RDD using rddConfig and invoking MLlib KMeans.run (line 11). Once it is created, the model of clusters (KMeansModel) is available for predicting a new observation, x, (line 12), as follows:

override def |> : PartialFunction[DblArray, Try[V]] = {
  case x: DblArray if(x.length > 0 && model != None) => 
     Try[V](model.get.predict(new DenseVector(x)))
}

The |> prediction method returns the index of the cluster of observations.
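
The train method itself is only declared in the class skeleton. A possible sketch, reusing the RDDSource.convert method defined earlier in this section and assuming scala.util.Try is in scope, might look as follows:

def train: Option[KMeansModel] = Try {
  val rdd = RDDSource.convert(xt, rddConfig)  // time series => RDD[Vector]
  kMeansConfig.kmeans.run(rdd)                // MLlib K-means training
}.toOption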

Finally, let's write a simple client program to exercise the SparkKMeans model using the volatility of the price of a stock and its daily trading volume. The objective is to extract clusters with features (volatility and volume), each cluster representing a specific behavior of the stock:

val K = 8
val RUNS = 16
val MAXITERS = 200
val PATH = "resources/data/chap12/CSCO.csv"
val CACHE = true

val sparkConf = new SparkConf().setMaster("local[8]")
   .setAppName("SparkKMeans")
   .set("spark.executor.memory", "2048m") //13
implicit val sc = new SparkContext(sparkConf) //14

extract.map { case (vty, vol) =>  //15
  val vtyVol = zipToSeries(vty, vol)
  val conf = SparkKMeansConfig(K, MAXITERS, RUNS) //16
  val rddConf = RDDConfig(CACHE, StorageLevel.MEMORY_ONLY) //17

  val pfnSparkKMeans = SparkKMeans(conf, rddConf, vtyVol) |> //18
  val obs = Array[Double](0.23, 0.67)
  val clusterId = pfnSparkKMeans(obs)
}

The first step is to define the minimum configuration for the sc context (line 13) and initialize it (line 14). The vty volatility and vol volume variables are used as features for K-means; they are extracted from a CSV file (line 15):

def extract: Option[(DblVector, DblVector)] = {
  val extractors = List[Array[String] => Double](
    YahooFinancials.volatility, YahooFinancials.volume 
  )
  val pfnSrc = DataSource(PATH, true) |>
  pfnSrc( extractors ) match {
    case Success(x) => Some((x(0).toVector, x(1).toVector))
    case Failure(e) => { error(e.toString); None }
  }
}

The execution creates a configuration, conf, for the K-means algorithm (line 16) and another configuration, rddConf, for the Spark RDD (line 17). The pfnSparkKMeans partial function, which implements the K-means algorithm, is created with the K-means configuration, the RDD configuration, and the vtyVol input data (line 18).

Performance evaluation

Let's execute the normalization of the cross-validation groups on an 8-core CPU machine with 32 GB of RAM. The data is partitioned with a ratio of two partitions per CPU core.

Note

A meaningful performance test

The scalability test should be performed with a large number of data points (normalized volatility, normalized volume), in excess of 1 million, in order to estimate the asymptotic time complexity.

Tuning parameters

The performance of a Spark application depends greatly on the configuration parameters. Selecting the appropriate values for those configuration parameters in Spark can be overwhelming; there were 54 configuration parameters at the last count. Fortunately, the majority of those parameters have relevant default values. However, there are a few parameters that deserve your attention, including the following:

  • The number of cores available to execute transformations and actions on RDDs (spark.cores.max).
  • The memory available for the execution of transformations and actions (spark.executor.memory). Setting the value to 60 percent of the maximum JVM heap is generally a good compromise.
  • The number of concurrent tasks to use across all the partitions for shuffle-related operations that use a key, such as reduceByKey (spark.default.parallelism). The recommended formula is parallelism = total number of cores x 2. The value can also be overridden for a specific RDD reducer by passing the number of partitions as an argument to methods such as reduceByKey.
  • A flag to compress serialized RDD partitions for MEMORY_ONLY_SER (spark.rdd.compress). The purpose is to reduce the memory footprint at the cost of extra CPU cycles.
  • The maximum size of the messages that carry the results of an action back to the driver (spark.akka.frameSize). This value needs to be increased if an action may potentially generate a large array.
  • A flag to compress large broadcast variables (spark.broadcast.compress). It is usually recommended.
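
A minimal sketch of a Spark configuration that sets several of these parameters; the values are illustrative, not recommendations:

import org.apache.spark.SparkConf

// Illustrative values only; tune them for your own cluster
val tunedConf = new SparkConf()
  .setAppName("SparkKMeans")
  .set("spark.cores.max", "16")
  .set("spark.executor.memory", "12g")
  .set("spark.default.parallelism", "48")
  .set("spark.rdd.compress", "true")
  .set("spark.akka.frameSize", "20")
  .set("spark.broadcast.compress", "true")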

Tests

The purpose of the test is to evaluate how the execution time is related to the size of the training set. The test executes K-means from the MLlib library on the volatility and trading session volume on the Bank of America (BAC) stock over the following periods: 3 months, 6 months, 12 months, 24 months, 48 months, 60 months, 72 months, 96 months, and 120 months.

The following configuration is used to perform the training of K-means: 10 clusters, 30 maximum iterations, and 3 runs. The test is run on a single host with 8-CPU cores and 32 GB of RAM. The test was conducted with the following values of parameters:

  • StorageLevel = MEMORY_ONLY
  • spark.executor.memory = 12G
  • spark.default.parallelism = 48
  • spark.akka.frameSize = 20
  • spark.broadcast.compress = true
  • No serialization

The first step after executing a test for a specific dataset is to log in to the Spark monitoring console at http://host_name:4040/stages:

Tests

The average duration of the K-means clustering versus size of trading data in months

Obviously, each environment produces somewhat different performance results, but the test confirms that the time complexity of the Spark K-means is a linear function of the size of the training set.

Note

Performance evaluation in a distributed environment

A Spark deployment on multiple hosts adds the latency of TCP communication to the overall execution time. This latency, related to the collection of the clustering results back to the Spark driver, is negligible and independent of the size of the training set.

Performance considerations

This test barely scratches the surface of the capabilities of Apache Spark. The following are the lessons learned from personal experience in order to avoid the most common performance pitfalls when deploying Spark 1.3+:

  • Get acquainted with the most common Spark configuration parameters regarding partitioning, storage level, and serialization.
  • Avoid serializing complex or nested objects unless you use an effective Java serialization library such as Kryo.
  • Look into defining your own partitioning function to reduce large key-value pair datasets. The convenience of reduceByKey has its price. The ratio of number of partitions to number of cores has an impact on the performance of a reducer using keys.
  • Avoid unnecessary actions such as collect, count, or lookup. An action reduces the data residing in the RDD partitions, and then forwards it to the Spark driver. The Spark driver (or master) program runs on a single JVM with limited resources.
  • Rely on shared or broadcast variables whenever necessary. Broadcast variables, for instance, improve the performance of operations on multiple datasets with very different sizes. Let's consider the common case of joining two datasets of very different sizes. Broadcasting the smaller dataset to each partition of the RDD of the larger dataset is far more efficient than converting the smaller dataset into an RDD and executing a join operation between the two datasets.
  • Use an accumulator variable for summation as it is faster than using a reduce action on an RDD.

Pros and cons

An increasing number of organizations are adopting Spark as their distributed data processing platform for real-time or pseudo real-time operations. There are several reasons for the fast adoption of Spark:

  • It is supported by a large and dedicated community of developers [12:15]
  • In-memory persistency is ideal for iterative computation found in machine learning and statistical inference algorithms
  • Excellent performance and scalability that can be extended with the Streaming module
  • Apache Spark leverages Scala functional capabilities and a large number of open source Java libraries
  • Spark can leverage the Mesos or Yarn cluster manager, which reduces the complexity of defining fault-tolerance and load balancing between worker nodes
  • Spark is readily integrated with commercial Hadoop distributions from vendors such as Cloudera

However, no platform is perfect and Spark is no exception. The most common complaints or concerns regarding Spark are as follows:

  • Creating a Spark application can be intimidating for a developer with no prior knowledge of functional programming.
  • Integration with databases has been somewhat lagging, relying heavily on Hive. The Spark development team has started to address these limitations with the introduction of SparkSQL and data frame RDDs.

0xdata Sparkling Water

Sparkling Water is an initiative to integrate 0xdata H2O with Spark and complement MLlib [12:16]. H2O from 0xdata is a very fast, open source, in-memory platform for machine learning on very large datasets (http://0xdata.com/product/). The framework is worth mentioning for the following reasons:

  • It has a Scala API
  • It is fully dedicated to machine learning and predictive analytics
  • It leverages both the frame data representation of H2O and in-memory clustering of Spark

H2O has an extensive implementation of the generalized linear model and gradient boosted classification, among other goodies. Its data representation consists of hierarchical data frames. A data frame is a container of vectors potentially shared with other frames. Each vector is composed of data chunks, which themselves are containers of data elements [12:17]. At the time of writing, Sparkling Water is in beta version.
