There are multiple components involved in running Spark in distributed mode. In the self-contained application mode (the mode in which we have run our main programs throughout this book so far), all of these components run in a single JVM. The following diagram illustrates the various components and their functions involved in running a Scala program in distributed mode:
As a first step, the RDD graph that we construct using the various operations on our RDD (map, filter, join, and so on) is passed to the Directed Acyclic Graph (DAG) scheduler. The DAG scheduler optimizes the flow and converts all RDD operations into groups of tasks called stages. Generally, all the tasks up to a shuffle are wrapped into a single stage. Consider operations in which there is a one-to-one mapping between input and output; for example, a map or filter operator yields one output for every input. If a map on an RDD is followed by a filter, the two are generally pipelined (the map and the filter) into a single task that can be executed by a single worker, which also brings the benefits of data locality. Comparing this to traditional Hadoop MapReduce, where data is written to disk at every stage, helps us really appreciate the Spark lineage graph.
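To make the stage-splitting idea concrete, here is a toy model of how consecutive shuffle-free (narrow) operations get pipelined into one stage while every shuffle starts a new one. This is purely illustrative; the Op type and the stages function are not Spark's internal API:

```scala
// Toy model of DAG-scheduler stage grouping (illustrative only, not
// Spark's internal API): narrow operations such as map and filter are
// pipelined into the current stage, while a shuffling operation such
// as reduceByKey closes the current stage and opens a new one.
case class Op(name: String, shuffles: Boolean)

def stages(ops: List[Op]): List[List[String]] =
  ops.foldLeft(List(List.empty[String])) { (acc, op) =>
    if (op.shuffles) List(op.name) :: acc      // shuffle => new stage
    else (op.name :: acc.head) :: acc.tail     // narrow  => same stage
  }.map(_.reverse).reverse

val plan = List(
  Op("map", shuffles = false),
  Op("filter", shuffles = false),
  Op("reduceByKey", shuffles = true),
  Op("map", shuffles = false))

println(stages(plan))
// List(List(map, filter), List(reduceByKey, map))
```

Note how the map and the filter land in the same stage, just as described above, while reduceByKey forces a stage boundary.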
These shuffle-separated stages are then passed to the task scheduler, which splits them into tasks and submits them to the cluster manager. Spark comes bundled with a simple cluster manager that can receive the tasks and run them against a set of worker nodes. However, Spark applications can also be run on popular cluster managers, such as Mesos and YARN.
With YARN/Mesos, we can run multiple executors on the same worker node. Moreover, YARN and Mesos can host non-Spark jobs in their clusters alongside Spark jobs.
In the Spark standalone cluster, prior to Spark 1.4, the number of executors per worker node per application was limited to one. However, we could increase the number of worker instances per worker node using the SPARK_WORKER_INSTANCES parameter. With Spark 1.4 (https://issues.apache.org/jira/browse/SPARK-1706), we are able to run multiple executors on the same worker node, just as in Mesos/YARN.
In this recipe, we will be deploying the Spark application on a standalone cluster running on a single machine. For all the recipes in this chapter, we'll be using the binary classification app that we built in the previous chapter as a deployment candidate. This recipe assumes that you have some knowledge of HDFS concepts and basic HDFS operations.
Submitting a Spark job to the local cluster involves the following steps:
Throughout this book, we have been using Spark version 1.4.1, as we can see in our build.sbt. Now, let's head over to the download page (https://spark.apache.org/downloads.html) and download the spark-1.4.1-bin-hadoop2.6.tgz bundle, as shown here:
Instead of loading the file from the local filesystem for our Spark application, let's store the file in HDFS. In order to do this, let's have a locally running pseudo-distributed Hadoop 2.6.0 cluster (https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/SingleCluster.html#Pseudo-Distributed_Operation).
After formatting our name node using bin/hdfs namenode -format and bringing up our data node and name node using sbin/start-dfs.sh, let's confirm that all the processes that we need are running properly. We do this using jps. The following screenshot shows what you are expected to see once you start the dfs daemon:
In order to submit our assembly JAR to a Spark cluster, we have to first bring up the Spark master and worker nodes.
All that we need to do to run Spark on the local machine is go to the downloaded (and extracted) spark folder and run sbin/start-all.sh from the spark home directory. This will bring up the Master and a Worker node of Spark. The Master's web UI is accessible on port 8080. We use this port to check the status of the job. The default service port of the Master is 7077. We'll be using this port to submit our assembly JAR as a job to the Spark cluster.
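As a side note, the Master's service port is the same one a driver program would use if it set the master URL in code instead of on the spark-submit command line. The following is a configuration sketch only (in this chapter we pass --master to spark-submit rather than hardcoding it; the app name here is illustrative):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Configuration sketch: point the driver at the standalone Master's
// default service port (7077). In these recipes, we pass --master to
// spark-submit instead of setting it programmatically.
val conf = new SparkConf()
  .setAppName("BinaryClassificationSpam")
  .setMaster("spark://localhost:7077")
val sc = new SparkContext(conf)
```

Leaving the master out of the code and supplying it at submit time keeps the same JAR deployable against local, standalone, Mesos, or YARN clusters.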
Let's confirm that the Master and the Worker nodes are running, using jps:
This just involves running the mkdir and put commands on HDFS:
bash-3.2$ hadoop fs -mkdir /scalada
bash-3.2$ hadoop fs -put /Users/Gabriel/Apps/SMSSpamCollection /scalada/
bash-3.2$ hadoop fs -ls /scalada
Found 1 items
-rw-r--r--   1 Gabriel supergroup     477907 2015-07-18 16:59 /scalada/SMSSpamCollection
We can also confirm this via the HDFS web interface on port 50070, by going to Utilities | Browse the file system, as shown here:
Before we submit the Spark application to be run against the local cluster, let's change the classification program (BinaryClassificationSpam) to point to the HDFS location:
val docs = sc.textFile("hdfs://localhost:9000/scalada/SMSSpamCollection").map(line => {
  val words = line.split(" ")
  Document(words.head.trim(), words.tail.mkString(" "))
})
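The per-line parsing inside that map can be viewed as a small pure function, sketched below. The Document case class mirrors the one used in the previous chapter: the first space-separated token of each line is the label (ham or spam), and the remainder is the message body.

```scala
// A pure-function view of the per-line parsing done inside the map
// above: the first space-separated token is the label (ham/spam) and
// the rest of the line is the message body. The Document case class
// mirrors the one from the previous chapter.
case class Document(label: String, content: String)

def parse(line: String): Document = {
  val words = line.split(" ")
  Document(words.head.trim(), words.tail.mkString(" "))
}

println(parse("ham Ok lar... Joking wif u oni"))
// Document(ham,Ok lar... Joking wif u oni)
```

Extracting the logic this way makes it easy to unit test the parsing without spinning up a SparkContext.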
By default, Spark 1.4.1 uses Hadoop 2.2.0. Now that we are trying to run the job on Hadoop 2.6.0, and are using the Spark binary prebuilt for Hadoop 2.6 and later, let's change build.sbt to reflect that:
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-mllib" % sparkVersion % "provided",
  "com.databricks" %% "spark-csv" % "1.0.3",
  "org.apache.hadoop" % "hadoop-client" % "2.6.0",
  ("org.scalanlp" % "epic-parser-en-span_2.10" % "2015.2.19").
    exclude("xml-apis", "xml-apis")
)
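Note the "provided" scope on the Spark modules: the cluster itself supplies those classes at runtime, so marking them provided keeps them out of the assembly JAR and shrinks it considerably. A minimal illustrative build.sbt fragment (sparkVersion is assumed to be declared elsewhere in the build, as in the full file above):

```scala
// build.sbt fragment (illustrative): "provided" excludes spark-core
// from the fat JAR produced by sbt assembly, because the Spark cluster
// already has those classes on its classpath at runtime.
val sparkVersion = "1.4.1"

libraryDependencies += "org.apache.spark" %% "spark-core" % sparkVersion % "provided"
```

Dependencies such as spark-csv and hadoop-client are not provided by the standalone cluster, which is why they keep the default compile scope and get bundled into the assembly.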
Run sbt clean assembly to build the uber JAR, like this:
./bin/spark-submit \
  --class com.packt.scalada.learning.BinaryClassificationSpam \
  --master spark://localhost:7077 \
  --executor-memory 2G \
  --total-executor-cores 2 \
  <project root>/target/scala-2.10/scalada-learning-assembly.jar
Here is the output:
The following screenshot shows that we have successfully run our classification job on a Spark cluster, as opposed to the self-contained app that we ran in the previous chapter: