To understand Hadoop MapReduce fundamentals properly, we will look at its building blocks one by one. As we know, MapReduce operations in Hadoop are carried out mainly by three objects: Mapper, Reducer, and Driver.

- Mapper: This carries out the Map phase, reading the input data and emitting intermediate (key, value) pairs.
- Reducer: This carries out the Reduce phase, producing a (key, value) pair for each group of values that share the same key.
- Driver: This is the main file of the job, with a main() method that accepts arguments from the command line. The input and output directories of the Hadoop MapReduce job will be accepted by this program. The Driver is the main file for defining job configuration details, such as the job name, job input format, job output format, and the Mapper, Combiner, Partitioner, and Reducer classes. MapReduce is initialized by calling this main() method of the Driver class.

Not every problem can be solved with a single Map and a single Reduce task, though most can. Sometimes, it is necessary to design the MapReduce job with multiple Map and Reduce tasks. We can design this type of job when we need to perform data operations, such as data extraction, data cleaning, and data merging, together in a single job. Many problems can be solved by writing multiple Mapper and Reducer tasks for a single job. In the case of multiple Map and Reduce tasks, the MapReduce steps are called sequentially: Map1 followed by Reduce1, Map2 followed by Reduce2, and so on.
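A single-stage Driver of the kind described above can be sketched as follows. This is a minimal, non-runnable configuration sketch against the classic org.apache.hadoop.mapred API; the class names WordCountDriver, WordCountMapper, and WordCountReducer are illustrative placeholders, not classes from this chapter.

```java
// Hypothetical Driver sketch using the old (org.apache.hadoop.mapred) API.
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class WordCountDriver {
    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(WordCountDriver.class);
        conf.setJobName("wordcount");                  // job name

        conf.setOutputKeyClass(Text.class);            // output key type
        conf.setOutputValueClass(IntWritable.class);   // output value type

        conf.setMapperClass(WordCountMapper.class);    // Mapper class
        conf.setCombinerClass(WordCountReducer.class); // optional Combiner
        conf.setReducerClass(WordCountReducer.class);  // Reducer class

        conf.setInputFormat(TextInputFormat.class);    // job input format
        conf.setOutputFormat(TextOutputFormat.class);  // job output format

        // Input and output directories accepted from the command line
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        JobClient.runJob(conf);                        // initialize the job
    }
}
```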
When we need to write a MapReduce job with multiple Map and Reduce tasks, we have to write multiple MapReduce application drivers to run them sequentially.
At the time of the MapReduce job submission, we can specify the number of Map tasks; the number of Reducers will be created based on the Mapper's output and the Hadoop cluster's capacity. Also, note that setting the number of Mappers and Reducers is not mandatory.
The number of Maps is usually defined by the size of the input data and the size of the data split block, calculated as the size of the HDFS file / data split. Therefore, if we have an HDFS datafile of 5 TB and a block size of 128 MB, there will be 40,960 Maps for the file. Sometimes, though, the number of Mappers created will be more than this count because of speculative execution. This is true when the input is a file, though it entirely depends on the InputFormat class.
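The arithmetic above can be checked with a short, self-contained calculation (plain Java, no Hadoop dependency):

```java
public class SplitCount {
    // Number of Map tasks ≈ input size / HDFS block (split) size,
    // rounded up so a trailing partial block still gets a Map task.
    static long numSplits(long inputBytes, long blockBytes) {
        return (inputBytes + blockBytes - 1) / blockBytes; // ceiling division
    }

    public static void main(String[] args) {
        long fiveTb = 5L * 1024 * 1024 * 1024 * 1024; // 5 TB in bytes
        long block = 128L * 1024 * 1024;              // 128 MB block size
        System.out.println(numSplits(fiveTb, block)); // prints 40960
    }
}
```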
In Hadoop MapReduce processing, the result of the job is delayed when an assigned Mapper or Reducer takes a long time to finish. To avoid this, speculative execution in Hadoop can run multiple copies of the same Map or Reduce task on different nodes, and the result from whichever node completes first is used. In the Hadoop API, the setNumMapTasks(int) method can be used to suggest the number of Mappers, but it is only a hint to the framework.
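The "first finished copy wins" behavior of speculative execution can be mimicked in plain Java with ExecutorService.invokeAny, which returns the result of the first task to complete successfully. This is only a simplified analogy; Hadoop's scheduler also kills the losing attempts and decides when to launch speculative copies at all.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SpeculativeDemo {
    static String firstCopyWins() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        // Two copies of the "same task": one straggler, one fast.
        List<Callable<String>> copies = Arrays.asList(
            () -> { Thread.sleep(2000); return "slow attempt"; },
            () -> { Thread.sleep(50);   return "fast attempt"; }
        );
        try {
            // invokeAny returns the first successful result.
            return pool.invokeAny(copies);
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdownNow(); // interrupt the straggler
        }
    }

    public static void main(String[] args) {
        System.out.println(firstCopyWins()); // prints "fast attempt"
    }
}
```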
The number of Reducers is created based on the Mapper's input. However, if you hardcode the number of Reducers in MapReduce, it won't matter how many nodes are present in the cluster; the job will be executed as specified in the configuration.
Additionally, we can set the number of Reducers at runtime along with the MapReduce command at the command prompt with -D mapred.reduce.tasks=<number>. Programmatically, it can be set via conf.setNumReduceTasks(int).
Now that we have seen the components that make a basic MapReduce job possible, we will look at how everything works together at a higher level. From the following diagram, we will understand MapReduce dataflow with multiple nodes in a Hadoop cluster:
The two APIs available for Hadoop MapReduce are the new API (Hadoop 1.x and 2.x) and the old API (Hadoop 0.20). YARN is the next generation of Hadoop MapReduce and a new Apache Hadoop subproject that has been released for Hadoop resource management.
Hadoop data processing includes several tasks that together help achieve the final output from an input dataset, and Map and Reduce tasks can be defined for several such data operations.
We will explore MapReduce tasks in more detail in the next part of this chapter.
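Before that, the end-to-end flow can be made concrete with a hedged, single-process sketch of the classic word-count pattern in plain Java (no Hadoop): a map step emits (word, 1) pairs, a hash partitioner assigns each key to one of the reducers, pairs are grouped and sorted by key, and a reduce step sums the values. The method names are illustrative, not Hadoop API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class MiniMapReduce {
    // Map step: emit a (word, 1) pair for every word in a line.
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String word : line.toLowerCase().split("\\s+")) {
            if (!word.isEmpty()) pairs.add(Map.entry(word, 1));
        }
        return pairs;
    }

    // Partition step: a hash of the key decides which reducer receives it,
    // so all values for the same key end up at the same reducer.
    static int partition(String key, int numReducers) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReducers;
    }

    // Reduce step: sum the grouped values for one key.
    static int reduce(List<Integer> values) {
        return values.stream().mapToInt(Integer::intValue).sum();
    }

    static Map<String, Integer> run(List<String> lines, int numReducers) {
        // One sorted group table per reducer (mimics shuffle + sort).
        List<TreeMap<String, List<Integer>>> shuffled = new ArrayList<>();
        for (int i = 0; i < numReducers; i++) shuffled.add(new TreeMap<>());
        for (String line : lines) {
            for (Map.Entry<String, Integer> kv : map(line)) {
                shuffled.get(partition(kv.getKey(), numReducers))
                        .computeIfAbsent(kv.getKey(), k -> new ArrayList<>())
                        .add(kv.getValue());
            }
        }
        // Each reducer reduces its own partition; merge for the final output.
        Map<String, Integer> output = new TreeMap<>();
        for (TreeMap<String, List<Integer>> part : shuffled) {
            part.forEach((key, values) -> output.put(key, reduce(values)));
        }
        return output;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("to be or not to be"), 2));
        // prints {be=2, not=1, or=1, to=2}
    }
}
```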
In this section, we will see further details on Hadoop MapReduce dataflow with several MapReduce terminologies and their Java class details. In the MapReduce dataflow figure in the previous section, multiple nodes are connected across the network for performing distributed processing with a Hadoop setup. The following attributes of the Map and Reduce phases play an important role in getting the final output.
The attributes of the Map phase are as follows:

- InputFiles: This term refers to the input raw datasets, stored in HDFS, that have been created/extracted to be analyzed for business analytics. These input files are very large, and they are available in several types.
- InputFormat: This is a Java class that processes the input files by obtaining the offset and the contents of each line of text. It defines how to split and read the input data files. We can set several input types of the input format, such as TextInputFormat, KeyValueInputFormat, and SequenceFileInputFormat, that are relevant to the Map and Reduce phases.
- InputSplits: This class is used for setting the size of the data split.
- RecordReader: This is a Java class that comes with several methods to retrieve keys and values by iterating over the data splits. It also includes other methods to get the status of the current progress.
- Mapper: A Mapper instance is created for the Map phase. The Mapper class takes input (key, value) pairs (generated by RecordReader) and produces an intermediate (key, value) pair by executing user-defined code in a map() method. The map() method mainly takes two input parameters, key and value; the remaining ones are the OutputCollector and Reporter objects. OutputCollector provides the intermediate key-value pairs to the Reduce phase of the job, while Reporter periodically provides the status of the current job to the JobTracker. The JobTracker aggregates these reports for later retrieval when the job ends.

The attributes of the Reduce phase are as follows:
- Shuffling and partitioning: The intermediate (key, value) pairs are partitioned by applying a hash function to the key, so pairs with similar keys fall into the same partition. Each Map task may emit (key, value) pairs to any partition; all values for the same key are always reduced together, regardless of which Mapper they originated from. This partitioning and shuffling is done automatically by the MapReduce job after the completion of the Map phase; there is no need to call them separately. We can, however, explicitly override their logic as per the requirements of the MapReduce job.
- Sorting: The (key, value) pairs are sorted based on the key attribute's value by the Hadoop MapReduce job before being handed to the Reducer.
- Reducer: A Reducer instance is created for the Reduce phase. It is a section of user-provided code that performs the Reduce task. A reduce() method of the Reducer class mainly takes two parameters, along with the OutputCollector and Reporter objects, the same as the map() function. OutputCollector in both Map and Reduce has the same functionality, but in the Reduce phase, OutputCollector provides output either to the next Map phase (in the case of multiple Map and Reduce job combinations) or reports it as the final output of the job, based on the requirement. Apart from that, Reporter periodically reports the current status of the running task to the JobTracker.
- OutputFormat: The generated output (key, value) pairs are provided to the OutputCollector parameter and then written to OutputFiles, which is governed by OutputFormat. It controls the setting of the OutputFiles format as defined in the MapReduce Driver. The format will be chosen from TextOutputFormat, SequenceFileOutputFormat, or NullOutputFormat.
- RecordWriter: This is used by OutputFormat to write the output data in the appropriate format.
- OutputFiles: The output is written to these files, stored in HDFS, by RecordWriter after the completion of the MapReduce job.

To run a MapReduce job efficiently, we need to have some knowledge of Hadoop shell commands to perform administrative tasks. Refer to the following table:
| Shell command | Usage and code sample |
|---|---|
| `cat` | To copy source paths to stdout: `hadoop fs -cat URI [URI …]` |
| `chmod` | To change the permissions of files: `hadoop fs -chmod [-R] <MODE[,MODE]... \| OCTALMODE> URI [URI …]` |
| `copyFromLocal` | To copy a file from local storage to HDFS: `hadoop fs -copyFromLocal <localsrc> URI` |
| `copyToLocal` | To copy a file from HDFS to local storage: `hadoop fs -copyToLocal [-ignorecrc] [-crc] URI <localdst>` |
| `cp` | To copy a file from the source to the destination in HDFS: `hadoop fs -cp URI [URI …] <dest>` |
| `du` | To display the aggregate length of a file: `hadoop fs -du URI [URI …]` |
| `dus` | To display a summary of file lengths: `hadoop fs -dus <args>` |
| `get` | To copy files to the local filesystem: `hadoop fs -get [-ignorecrc] [-crc] <src> <localdst>` |
| `ls` | To list all files in a directory in HDFS: `hadoop fs -ls <args>` |
| `mkdir` | To create a directory in HDFS: `hadoop fs -mkdir <paths>` |
| `mv` | To move files from the source to the destination: `hadoop fs -mv URI [URI …] <dest>` |
| `rmr` | To remove files recursively: `hadoop fs -rmr URI [URI …]` |
| `setrep` | To change the replication factor of a file: `hadoop fs -setrep [-R] <rep> <path>` |
| `tail` | To display the last kilobyte of a file to stdout: `hadoop fs -tail [-f] URI` |