In this chapter, we will cover the basics of Hadoop and Spark, see how Spark differs from MapReduce, and get started with installing clusters and setting up the tools needed for analytics.
This chapter is divided into the following subtopics:
Apache Hadoop is a software framework that enables distributed processing on large clusters with thousands of nodes and petabytes of data. Apache Hadoop clusters can be built using commodity hardware, where failure rates are generally high. Hadoop is designed to handle these failures gracefully without user intervention. Also, Hadoop follows the "move computation to the data" approach, thereby avoiding significant network I/O. Users can develop parallel applications quickly, focusing on business logic rather than doing the heavy lifting of distributing data, distributing code for parallel processing, and handling failures.
Apache Hadoop consists of four main projects: Hadoop Common, Hadoop Distributed File System (HDFS), Yet Another Resource Negotiator (YARN), and MapReduce.
In simple words, HDFS is used to store data, MapReduce is used to process data, YARN is used to manage the resources (CPU and memory) of the cluster, and Hadoop Common provides the common utilities that support the other modules. Apache Hadoop integrates with many other projects, such as Avro, Hive, Pig, HBase, Zookeeper, and Apache Spark.
Hadoop mainly brings the following three components to the table:
Let's take a look at Hadoop's adoption drivers with respect to the economy, business, and technical areas:
The following list provides the typical characteristics of Hadoop:
HDFS is a distributed filesystem that provides high scalability and reliability on large clusters of commodity hardware.
HDFS files are divided into large blocks, typically 128 MB in size, that are distributed across the cluster. Each block is replicated (typically three times) to handle hardware failures, and block placement is exposed by the NameNode so that computation can be moved to the data by the MapReduce framework, as illustrated in Figure 2.1:
In the preceding image, when File1 is stored, it is kept as a single block (B1) because its size (100 MB) is less than the default block size (128 MB), and the block is replicated on Node 1, Node 2, and Node 3. Block B1 is written to the first node (Node 1), then Node 1 replicates it to Node 2, and Node 2 replicates it to Node 3. File2 is divided into two blocks because its size (150 MB) is greater than the block size; block B2 and block B3 are each replicated on three nodes (B2 on Node 1, Node 3, and Node 4, and B3 on Node 1, Node 2, and Node 3). The blocks' metadata (file name, blocks, locations, creation date, and size) is stored in the NameNode, as shown in the preceding image. HDFS uses a large block size to reduce the number of disk seeks needed to read a complete file.
To the user, the file appears as a single file. However, it is stored as blocks on the DataNodes, and its metadata is stored in the NameNode. If we lose the NameNode for any reason, the blocks stored on the DataNodes become useless, as there is no way to identify which blocks belong to which file names. So, configuring NameNode high availability and backing up the metadata is very important in any Hadoop cluster.
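If you have access to a running cluster, you can see this block and replica placement for yourself. The following is a minimal sketch, assuming the standard hdfs command-line tool is on the PATH and using hypothetical file paths; it simply shells out from Python and prints the NameNode's block report for a file:

```python
import subprocess

# Copy a local file into HDFS (the local and HDFS paths are hypothetical).
subprocess.run(["hdfs", "dfs", "-put", "file1.txt", "/data/file1.txt"], check=True)

# Ask the NameNode for the block layout of the file:
# -files -blocks -locations lists each block ID and the DataNodes holding its replicas.
report = subprocess.run(
    ["hdfs", "fsck", "/data/file1.txt", "-files", "-blocks", "-locations"],
    capture_output=True, text=True, check=True)
print(report.stdout)
```

The output shows one entry per block, which makes it easy to confirm the block size and replication factor described above.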
HDFS is becoming a standard enterprise Big Data storage system because it offers virtually unlimited scalability while providing most of the features needed for enterprise-grade Big Data applications. The following table explains the important features of HDFS:
MapReduce (MR) is a framework to write analytical applications in batch mode on terabytes or petabytes of data stored on HDFS. An MR job usually processes each block (excluding replicas) of input file(s) in HDFS with the mapper tasks in a parallel manner. The MR framework sorts and shuffles the outputs of the mappers to the reduce tasks in order to produce the output. The framework takes care of computing the number of tasks needed, scheduling tasks, monitoring them, and re-executing them if they fail. The developer needs to focus only on writing the business logic, and all the heavy lifting is done by the HDFS and MR frameworks.
For example, in Figure 2.1, if an MR job is submitted for File1, one map task will be created and run on Node 1, 2, or 3 to achieve data locality. In the case of File2, two map tasks will be created, with map task 1 running on Node 1, 3, or 4, and map task 2 running on Node 1, 2, or 3, depending on resource availability. The output of the mappers will be sorted and shuffled to the reducer tasks. By default, the number of reducers is one. However, the number of reducer tasks can be increased to provide parallelism at the reducer level.
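To make the mapper and reducer flow concrete, here is a minimal word-count sketch written for Hadoop Streaming, which lets you supply the map and reduce logic as Python scripts; the file names mapper.py and reducer.py are just illustrative:

```python
#!/usr/bin/env python3
# mapper.py: emit "word<TAB>1" for every word read from standard input.
import sys

for line in sys.stdin:
    for word in line.strip().split():
        print(f"{word}\t1")
```

```python
#!/usr/bin/env python3
# reducer.py: sum the counts for each word.
# The MR framework sorts and shuffles the mapper output, so identical keys arrive together.
import sys

current_word, current_count = None, 0
for line in sys.stdin:
    word, count = line.rstrip("\n").split("\t", 1)
    if word == current_word:
        current_count += int(count)
    else:
        if current_word is not None:
            print(f"{current_word}\t{current_count}")
        current_word, current_count = word, int(count)

if current_word is not None:
    print(f"{current_word}\t{current_count}")
```

These two scripts would typically be submitted with the hadoop-streaming JAR that ships with your Hadoop distribution, passing them as the -mapper and -reducer options along with HDFS input and output paths.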
MR provides you with excellent features to build Big Data applications. The following table describes MR's key features and techniques used, such as sorting and joining:
Apache Hadoop's MapReduce has been the core processing engine supporting the distributed processing of large-scale data workloads. MR underwent a complete overhaul in Hadoop version 0.23, and the result is called MapReduce 2.0 (MR v2), which runs on YARN.
MapReduce v1, which is also called Classic MapReduce, has three main components:
MapReduce v2, which is also called NextGen, moves resource management to YARN, as shown in Figure 2.2:
MapReduce v1 had three challenges:
The following table shows you the differences between v1 and v2:
| | MR v1 | MR v2 |
| --- | --- | --- |
| Components used | Job tracker as master and task tracker as slave | Resource manager as master and node manager as slave |
| Resource allocation | DataNodes are configured to run a fixed number of map tasks and reduce tasks | Containers are allocated as needed for any type of task |
| Resource management | One job tracker per cluster, which supports up to 4,000 nodes | One resource manager per cluster, which supports up to tens of thousands of nodes |
| Types of jobs | MR jobs only | Supports MR and other frameworks such as Spark, Impala, and Giraph |
YARN is the resource management framework that enables an enterprise to process data in multiple ways simultaneously for batch processing, interactive analytics, or real-time analytics on shared datasets. While HDFS provides scalable, fault-tolerant, and cost-efficient storage for Big Data, YARN provides resource management for the cluster. Figure 2.3 shows how multiple frameworks typically run on top of HDFS and YARN in Hadoop 2.0. YARN is like an operating system for Hadoop: it manages the cluster resources (CPU and memory) efficiently. Applications such as MapReduce, Spark, and others request YARN to allocate resources for their tasks. YARN allocates containers on nodes with the requested amount of RAM and virtual CPUs from the total available on that node:
YARN's original purpose was to split up the two major responsibilities of the JobTracker/TaskTracker (which are part of MapReduce v1) into separate entities:
ResourceManager keeps track of the resource availability of the entire cluster and provides resources to applications when requested by ApplicationMaster.
ApplicationMaster negotiates the resources needed by the application to run its tasks. ApplicationMaster also tracks and monitors the progress of the application. Note that this monitoring functionality was handled by the TaskTrackers and the JobTracker in MR v1, which led to the JobTracker becoming overloaded.
NodeManager is responsible for launching containers provided by ResourceManager, monitoring the resource usage on the slave nodes, and reporting to ResourceManager.
The application container is responsible for running the tasks of the application. YARN also has pluggable schedulers (Fair Scheduler and Capacity Scheduler) to control the resource assignments to different applications. Detailed steps of the YARN application life cycle are shown in Figure 2.4 with two resource requests by an application:
The following is our interpretation of the preceding figure:
Let's summarize the preceding points concerning YARN:
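To make the resource-request flow concrete, the following is a minimal PySpark sketch that asks YARN for containers with specific memory and core settings. It assumes Spark 2.x or later configured against a YARN cluster; the application name and the resource sizes are arbitrary examples:

```python
from pyspark import SparkConf, SparkContext

# Each executor below maps to a YARN container with the requested memory and virtual cores.
conf = (SparkConf()
        .setAppName("yarn-resource-demo")      # hypothetical application name
        .setMaster("yarn")
        .set("spark.executor.instances", "4")  # ask YARN for four executor containers
        .set("spark.executor.memory", "2g")    # RAM per container
        .set("spark.executor.cores", "2"))     # virtual cores per container

sc = SparkContext(conf=conf)
print(sc.parallelize(range(1000)).sum())       # a trivial job to exercise the containers
sc.stop()
```

While the application runs, ResourceManager tracks the containers it has granted, and the ApplicationMaster launched for this Spark job negotiates and monitors them on the application's behalf.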
XML and JSON files are well-accepted industry standard formats. So, why can't we just use XML or JSON files on Hadoop? There are many disadvantages of XML and JSON, including the following:
When storing data and building applications on Hadoop, some fundamental questions arise: Which storage format is best suited to my application? Which compression codec is optimum for my application?
Hadoop provides you with a variety of file formats built for different use cases. Choosing the right file format and compression codec provides optimum performance for the use case that you are working on. Let's go through the file formats and understand when to use them.
Hadoop can store data regardless of the format it arrives in. Data can be kept in its raw form using a standard file format, or in a special Hadoop container file format that offers benefits in specific use cases, such as the ability to split files even when the data is compressed. Broadly, there are two categories of file formats: standard file formats and Hadoop file formats:

- Standard file formats: data stored in its raw form, for example, text or binary files
- Hadoop file formats, which provide splittable compression:
  - Sequence file
  - Thrift
  - Protocol buffers
  - Avro
  - RCFile
  - ORCFile
  - Parquet
Let's go through the Hadoop file formats, their features, and the use cases in which each of them is appropriate.
Sequence files store data as binary key-value pairs. They support only the Java language and do not support schema evolution. They do, however, support the splitting of files even when the data is compressed.
Let's see a use case for the sequence file:
The layout of a sequence file is a series of key-value records:

| Key | Value | Key | Value | Key | Value |
| --- | --- | --- | --- | --- | --- |
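A quick way to experiment with sequence files is through Spark, which can save an RDD of key-value pairs as a sequence file and read it back; the output path below is hypothetical:

```python
from pyspark import SparkContext

sc = SparkContext(appName="sequence-file-demo")

# Write string key-value pairs as a Hadoop sequence file (keys and values become Writables).
pairs = sc.parallelize([("file1.txt", "contents of file 1"),
                        ("file2.txt", "contents of file 2")])
pairs.saveAsSequenceFile("/tmp/seqfile-demo")

# Read the sequence file back as an RDD of (key, value) pairs.
print(sc.sequenceFile("/tmp/seqfile-demo").collect())
sc.stop()
```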
Protocol buffers were developed by Google and open sourced in 2008. Thrift was developed at Facebook and offers more features and language support than Protocol buffers. Both are serialization frameworks that offer high performance when sending data over the network. Avro is a specialized serialization format designed for Hadoop.
A generic usage pattern for Protocol buffers and Thrift is as follows:
Avro is a row-based data serialization system that is used for storing data and for sending it over the network efficiently. Avro provides the following benefits:
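To see these properties in practice, here is a small sketch using the third-party fastavro Python library (one of several options; the standard avro package or Spark's Avro connector work similarly). The schema travels with the data, and the union type with a default value leaves room for schema evolution:

```python
from fastavro import parse_schema, writer, reader

# A record schema; the nullable field with a default makes future evolution easier.
schema = parse_schema({
    "name": "User",
    "type": "record",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "age", "type": ["null", "int"], "default": None},
    ],
})

records = [{"name": "alice", "age": 30}, {"name": "bob", "age": None}]

with open("users.avro", "wb") as out:
    writer(out, schema, records)   # serialize the records in Avro's row-based format

with open("users.avro", "rb") as fin:
    for record in reader(fin):     # the reader recovers the schema from the file itself
        print(record)
```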
Parquet is a columnar format that skips I/O and decompression (where applicable) on columns that are not part of the query. It generally compresses very well because the values within a single column are more similar to one another than the values within a block of rows.
A use case for Parquet is as follows:
Record Columnar File (RCFile) was the first columnar format for Hive that provided efficient query processing. The Optimized Row Columnar (ORC) format was introduced in Hive 0.11 and offers better compression and efficiency than the RCFile format. ORCFile has lightweight indexing that enables the skipping of irrelevant columns.
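Both columnar formats are easy to try from Spark SQL. The following is a minimal sketch, assuming Spark 2.x or later with its built-in Parquet and ORC support; the paths and column names are illustrative. Selecting a single column on read is where the columnar layout pays off, because the other columns are never scanned:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("columnar-demo").getOrCreate()

df = spark.createDataFrame(
    [(1, "alice", 30), (2, "bob", 25)],
    ["id", "name", "age"])

# Write the same data in both columnar formats (output paths are hypothetical).
df.write.mode("overwrite").parquet("/tmp/users_parquet")
df.write.mode("overwrite").orc("/tmp/users_orc")

# Only the 'name' column is read from disk; the other columns are skipped.
spark.read.parquet("/tmp/users_parquet").select("name").show()
spark.read.orc("/tmp/users_orc").select("name").show()
```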
A use case for ORC and Parquet files is as follows:
A variety of compression formats are available for Hadoop storage. However, if Hadoop storage is cheap, then why do I need to compress my data? The following list answers your question:
Compression and decompression increase CPU time. Understanding these trade-offs is very important for achieving optimum performance of jobs running on Hadoop.
The following table shows you the standard compression formats available on the Hadoop platform:
| Compression format | Tool | Algorithm | File extension | Splittable? |
| --- | --- | --- | --- | --- |
| gzip | Gzip | DEFLATE | .gz | No |
| bzip2 | bzip2 | bzip2 | .bz2 | Yes |
| LZO | Lzop | LZO | .lzo | Yes, if indexed |
| Snappy | N/A | Snappy | .snappy | No |
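Choosing a codec is usually just a matter of configuration. The following minimal PySpark sketch, with hypothetical paths, writes text output compressed with gzip and Parquet output compressed with Snappy:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("compression-demo").getOrCreate()
sc = spark.sparkContext

# Text output compressed with gzip: not splittable, so better suited to smaller files.
sc.parallelize(["line one", "line two"]).saveAsTextFile(
    "/tmp/text_gzip",
    compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec")

# Parquet output compressed with Snappy: fast, and the container format remains splittable.
df = spark.createDataFrame([(1, "alice"), (2, "bob")], ["id", "name"])
df.write.option("compression", "snappy").parquet("/tmp/users_snappy_parquet")
```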
Recommended usage patterns for compression are as follows: