We have already executed Spark applications on the Spark standalone resource manager in other sections of this chapter (within the PySpark shell and as applications). Let's try to understand how these cluster resource managers differ from each other and when each should be used.
Before moving on to cluster resource managers, let's understand how cluster mode is different from local mode.
It is important to understand the scope and life cycle of variables and methods when executing code across a cluster. Let's look at an example with the foreach action:
counter = 0
rdd = sc.parallelize(data)

# A lambda cannot contain a statement such as counter += x, so a function is used.
def increment_counter(x):
    global counter
    counter += x

rdd.foreach(increment_counter)
print("Counter value: " + str(counter))
In local mode, the preceding code works fine because the counter variable and RDD are in the same memory space (single JVM).
In cluster mode, the counter value will never change and always remains at 0. In cluster mode, Spark computes the closure with its variables and methods and ships it to the executors. When an executor performs the foreach function, it refers to its own copy of the counter on the executor. The counter on the driver is not accessible to the executor. Hence, every time you execute this, a local counter is incremented and never returned to the driver. To address this challenge in cluster mode, either create a separate copy of the variable for every closure, or use an accumulator.
You can run a Spark application in four different modes:
If the spark.master (or --master) configuration property is specified, the application will run on one of the cluster resource managers, in client or cluster mode depending on the --deploy-mode specified.
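For illustration, the same application could be submitted in either deploy mode; the master URL and script name below are placeholders:

```shell
# Driver runs on the submitting machine (client mode):
spark-submit --master spark://masternode:7077 --deploy-mode client app.py

# Driver runs inside the cluster (cluster mode):
spark-submit --master spark://masternode:7077 --deploy-mode cluster app.py
```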
By default, an application submitted in standalone mode will acquire all cores of the cluster (the spark.deploy.defaultCores property), with 1 GB of memory for every executor. In multi-application environments, it is important to cap the resource usage of every application. Cap the number of cores with the --total-executor-cores argument to spark-submit, or set spark.cores.max in the Spark configuration file. To cap the amount of memory, use the --executor-memory argument to spark-submit, or set spark.executor.memory in the Spark configuration file.
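A capped standalone submission might look like the following sketch (the master URL and script name are placeholders):

```shell
# Cap this application at 8 cores total and 1 GB per executor,
# leaving the remaining cluster resources for other applications.
spark-submit \
  --master spark://masternode:7077 \
  --total-executor-cores 8 \
  --executor-memory 1G \
  app.py
```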
In this example, we use a 20-node cluster with four CPU cores in every node:
- --executor-memory 1G and --total-executor-cores 8: Spark will launch eight executors, each with 1 GB of RAM
- spark.deploy.spreadOut set to false in the Spark conf: Spark will launch two executors, each with 1 GB of RAM and four cores

Applications are submitted with --master yarn-client for client mode and --master yarn-cluster for cluster mode. In YARN mode, you specify the number of executors needed along with the number of cores, as opposed to --total-executor-cores in Spark standalone mode.
- --num-executors controls how many executors will be allocated (the default is 2). This property can also be set in the configuration using spark.executor.instances.
- --executor-memory is the amount of RAM for each executor.
- --executor-cores is the number of CPU cores for each executor.

In this example, we use a 20-node cluster with four CPU cores in every node:

- --num-executors 8 --executor-memory 2G --executor-cores 4: Spark will launch eight executors, each with 2 GB of memory and four cores

Downloading the example code
Detailed steps to download the code bundle are mentioned in the Preface of this book. Please have a look.
The code bundle for the book is also hosted on GitHub at https://github.com/PacktPublishing/big-data-analytics. We also have other code bundles from our rich catalog of books and videos available at https://github.com/PacktPublishing/. Check them out!
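Putting the preceding YARN settings together, a submission might look like the following sketch (the script name is a placeholder; --master yarn-cluster reflects the master syntax used in this chapter):

```shell
# Request 8 executors on YARN, each with 2 GB of memory and 4 cores.
spark-submit \
  --master yarn-cluster \
  --num-executors 8 \
  --executor-memory 2G \
  --executor-cores 4 \
  app.py
```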
The dynamic resource allocation feature was introduced in Spark 1.2. An application may give resources back to the cluster if they are no longer used, and request them again later when there is demand. Dynamic allocation of resources efficiently controls resource usage on the cluster. As shown in Figure 3.12, there is a big variation between the allocated and used resources in Spark applications due to stragglers, scheduling, waiting, idling, and so on.
To enable this feature, the following configuration properties are to be set in an application:
spark.dynamicAllocation.enabled
spark.dynamicAllocation.minExecutors
spark.dynamicAllocation.maxExecutors
spark.dynamicAllocation.initialExecutors
spark.shuffle.service.enabled
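These properties can be set per application at submit time; the following is a sketch in which the min/max/initial executor counts are illustrative values, not recommendations:

```shell
# Enable dynamic allocation with illustrative bounds; the external
# shuffle service must also be enabled for executors to be removed safely.
spark-submit \
  --master yarn-cluster \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.dynamicAllocation.minExecutors=2 \
  --conf spark.dynamicAllocation.initialExecutors=4 \
  --conf spark.dynamicAllocation.maxExecutors=20 \
  --conf spark.shuffle.service.enabled=true \
  app.py
```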
When running Spark in YARN client mode, the driver runs on the client machine, while the application master and executors run on the cluster. Each Spark executor runs as a YARN container, in both client and cluster mode.
In yarn-cluster mode, the driver runs within the application master, so the application master is responsible both for running the driver and for requesting resources from the YARN resource manager. The client that starts the application doesn't need to stay around for its entire lifetime. yarn-cluster mode should be used for production jobs, while yarn-client mode should be used interactively, when you want to see your application's output immediately.
Yarn client mode and cluster modes are illustrated in Figure 3.13:
More YARN settings are available at: http://spark.apache.org/docs/latest/running-on-yarn.html
Apache Mesos is a general-purpose cluster manager that can run both analytic workloads and long-running services (for example, Web applications or key-value stores) on a cluster. See the following example usage:
spark-submit --master mesos://masternode:5050
Two types of scheduling modes are available in Mesos: fine-grained and coarse-grained. Fine-grained mode is the default; coarse-grained mode is enabled with --conf spark.mesos.coarse=true
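A full submission with coarse-grained mode enabled might look like the following sketch (the master URL and script name are placeholders):

```shell
# Run on Mesos in coarse-grained mode: Spark holds onto its resources
# for the lifetime of the application instead of per-task.
spark-submit \
  --master mesos://masternode:5050 \
  --conf spark.mesos.coarse=true \
  app.py
```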
When using Spark along with other applications on a Hadoop cluster, it is better to use YARN for better sharing of resources. Where better performance and resource sharing are not constraints, the standalone manager can be used. Mesos and YARN offer similar resource-sharing capabilities. On Hadoop clusters, it makes sense to use YARN because all the other Hadoop frameworks are integrated with it. For clusters other than Hadoop, it makes sense to use Mesos.