Now we will move forward with MapReduce by learning a very common and easy example: word count. The goal of this example is to calculate how many times each word occurs in the provided documents. These documents serve as the input files for the MapReduce job.
In this example, we already have a set of text files, and we want to identify the frequency of every unique word in them. We will do this by designing a Hadoop MapReduce job.
In this section, we will see more on Hadoop MapReduce programming using Hadoop MapReduce's old API. Here we assume that the reader has already set up the Hadoop environment as described in Chapter 1, Getting Ready to Use R and Hadoop. Also, keep in mind that we are not going to use R to count words; only Hadoop will be used here.
Basically, Hadoop MapReduce has three main objects: Mapper, Reducer, and Driver. They can be developed as three Java classes: the Map class, the Reduce class, and the Driver class. The Map class denotes the Map phase, the Reduce class denotes the Reduce phase, and the Driver class is the class with the main() method that initializes the Hadoop MapReduce program.
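To make the division of work between the three classes concrete, here is a minimal in-memory sketch in plain Java. It does not use the Hadoop API; the class name ThreeRolesSketch, its method names, and the sample input are hypothetical, and the grouping step only imitates the shuffle and sort that the framework performs between the Map and Reduce phases.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// In-memory imitation of the Map, Reduce, and Driver roles (no Hadoop API).
class ThreeRolesSketch {

    // Mapper role: turn one input line into (word, 1) pairs.
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String word : line.toLowerCase().trim().split("\\s+")) {
            if (!word.isEmpty()) {
                pairs.add(new SimpleEntry<>(word, 1));
            }
        }
        return pairs;
    }

    // Reducer role: combine all values collected for one key.
    static int reduce(String key, List<Integer> values) {
        int sum = 0;
        for (int value : values) {
            sum += value;
        }
        return sum;
    }

    // Driver role: wire the phases together. The grouping loop stands in
    // for the shuffle and sort done by the framework.
    static Map<String, Integer> run(List<String> lines) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (String line : lines) {
            for (Map.Entry<String, Integer> pair : map(line)) {
                grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>())
                       .add(pair.getValue());
            }
        }
        Map<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> entry : grouped.entrySet()) {
            result.put(entry.getKey(), reduce(entry.getKey(), entry.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical input lines.
        System.out.println(run(List.of("a b a", "b c")));
    }
}
```

In a real job, the framework calls the map and reduce methods and moves the data between them; only the Driver is started by hand.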
In the previous section of Hadoop MapReduce fundamentals, we already discussed what Mapper, Reducer, and Driver are. Now, we will learn how to define them and program for them in Java. In upcoming chapters, we will be learning to do more with a combination of R and Hadoop.
Many languages and frameworks can be used for building MapReduce programs, and each of them has different strengths. Several factors can also be tuned to improve MapReduce performance. Refer to the article 10 MapReduce Tips by Cloudera at http://blog.cloudera.com/blog/2009/05/10-mapreduce-tips/.
To make MapReduce development easier, use Eclipse configured with Maven, which supports the old MapReduce API.
Let's see the steps to run a MapReduce job with Hadoop:
The Java classes Map.java, Reduce.java, and WordCount.java are used for calculating the frequency of the words in the provided text files.
Map.java: This is the Map class for the word count Mapper.
    // Defining package of the class
    package com.PACKT.chapter1;

    // Importing java libraries
    import java.io.*;
    import java.util.*;
    import org.apache.hadoop.io.*;
    import org.apache.hadoop.mapred.*;

    // Defining the Map class
    public class Map extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, IntWritable> {

        // Defining the map method for processing the data with
        // problem-specific logic
        public void map(LongWritable key, Text value,
                        OutputCollector<Text, IntWritable> output,
                        Reporter reporter) throws IOException {

            // Converting the line to lowercase and breaking it into tokens
            StringTokenizer st = new StringTokenizer(value.toString().toLowerCase());

            // For every token
            while (st.hasMoreTokens()) {
                // Emitting the (key, value) pair with value 1
                output.collect(new Text(st.nextToken()), new IntWritable(1));
            }
        }
    }
Reduce.java: This is the Reduce class for the word count Reducer.
    // Defining package of the class
    package com.PACKT.chapter1;

    // Importing java libraries
    import java.io.*;
    import java.util.*;
    import org.apache.hadoop.io.*;
    import org.apache.hadoop.mapred.*;

    // Defining the Reduce class
    public class Reduce extends MapReduceBase
            implements Reducer<Text, IntWritable, Text, IntWritable> {

        // Defining the reduce method for aggregating the
        // generated output of the Map phase
        public void reduce(Text key, Iterator<IntWritable> values,
                           OutputCollector<Text, IntWritable> output,
                           Reporter reporter) throws IOException {

            // Setting initial counter value as 0
            int count = 0;

            // For every value with the same key, add it to the counter
            while (values.hasNext()) {
                count += values.next().get();
            }

            // Emitting the (key, value) pair
            output.collect(key, new IntWritable(count));
        }
    }
WordCount.java: This is the Driver class, the main file of the Hadoop MapReduce program.
    // Defining package of the class
    package com.PACKT.chapter1;

    // Importing java libraries
    import java.io.*;
    import org.apache.hadoop.fs.*;
    import org.apache.hadoop.io.*;
    import org.apache.hadoop.mapred.*;
    import org.apache.hadoop.util.*;
    import org.apache.hadoop.conf.*;

    // Defining the WordCount class for job configuration information
    public class WordCount extends Configured implements Tool {

        public int run(String[] args) throws IOException {
            // Reusing the configuration prepared by ToolRunner
            JobConf conf = new JobConf(getConf(), WordCount.class);
            conf.setJobName("wordcount");

            // For defining the output key format
            conf.setOutputKeyClass(Text.class);
            // For defining the output value format
            conf.setOutputValueClass(IntWritable.class);

            // For defining the Mapper class implementation
            conf.setMapperClass(Map.class);
            // For defining the Reducer class implementation
            conf.setReducerClass(Reduce.class);

            // For defining the type of input format
            conf.setInputFormat(TextInputFormat.class);
            // For defining the type of output format
            conf.setOutputFormat(TextOutputFormat.class);

            // First command-line argument: input dataset path
            FileInputFormat.setInputPaths(conf, new Path(args[0]));
            // Second command-line argument: output dataset path
            FileOutputFormat.setOutputPath(conf, new Path(args[1]));

            // For submitting the configuration object
            JobClient.runJob(conf);
            return 0;
        }

        // Defining the main() method to start the execution of
        // the MapReduce program
        public static void main(String[] args) throws Exception {
            int exitCode = ToolRunner.run(new WordCount(), args);
            System.exit(exitCode);
        }
    }
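The map method in Map.java splits each line with StringTokenizer after lowercasing it. The following plain-Java sketch (no Hadoop involved; the class name TokenizeSketch and the sample line are hypothetical) applies the same two steps so that the emitted keys can be inspected. Because the default StringTokenizer delimiters are whitespace characters only, punctuation stays attached to a word, so "hadoop," and "hadoop" would be counted as different keys.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

// Shows which keys the tokenization used in the map method would produce.
class TokenizeSketch {

    // Same steps as the map method: lowercase, then split on whitespace.
    static List<String> tokenize(String line) {
        List<String> tokens = new ArrayList<>();
        StringTokenizer st = new StringTokenizer(line.toLowerCase());
        while (st.hasMoreTokens()) {
            tokens.add(st.nextToken());
        }
        return tokens;
    }

    public static void main(String[] args) {
        // Hypothetical sample line.
        System.out.println(tokenize("Hadoop, hadoop and HADOOP."));
        // prints [hadoop,, hadoop, and, hadoop.]
    }
}
```

If punctuation should not affect the counts, the line could be cleaned before tokenizing; that is a design choice left out of the word count example here.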
    // create a folder for storing the compiled classes
    hduser@ubuntu:~/Desktop/PacktPub$ mkdir classes
    // compile the java class files with classpath
    hduser@ubuntu:~/Desktop/PacktPub$ javac -classpath /usr/local/hadoop/hadoop-core-1.1.0.jar:/usr/local/hadoop/lib/commons-cli-1.2.jar -d classes *.java
Create a .jar file from the compiled classes.
    hduser@ubuntu:~/Desktop/PacktPub$ cd classes/
    // create jar of developed java classes
    hduser@ubuntu:~/Desktop/PacktPub/classes$ jar -cvf wordcount.jar com
    // Go to Hadoop home directory
    hduser@ubuntu:~$ cd $HADOOP_HOME
    // Start Hadoop cluster
    hduser@ubuntu:/usr/local/hadoop$ bin/start-all.sh
    // Ensure all daemons are running properly
    hduser@ubuntu:/usr/local/hadoop$ jps
Create the Hadoop directory /wordcount/input/.
    // Create Hadoop directory for storing the input dataset
    hduser@ubuntu:/usr/local/hadoop$ bin/hadoop fs -mkdir /wordcount/input
Load the input text files (CHANGES.txt, LICENSE.txt, NOTICE.txt, and README.txt) by copying them to the Hadoop directory. Other text datasets, for example data extracted from the Internet, could be used as input to this MapReduce algorithm, but here we are using readymade input files.
    // Copy the text files from the machine's local
    // directory into the Hadoop directory
    hduser@ubuntu:/usr/local/hadoop$ bin/hadoop fs -copyFromLocal $HADOOP_HOME/*.txt /wordcount/input/
    // Command for running the Hadoop job by specifying jar, main class,
    // input directory and output directory
    hduser@ubuntu:/usr/local/hadoop$ bin/hadoop jar wordcount.jar com.PACKT.chapter1.WordCount /wordcount/input/ /wordcount/output/
    // To read the generated output from HDFS directory
    hduser@ubuntu:/usr/local/hadoop$ bin/hadoop fs -cat /wordcount/output/part-00000
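With TextOutputFormat, each line of the part-00000 file holds a key and a value separated by a tab character. As a rough sketch of how that output could be post-processed, the plain-Java snippet below parses such lines and ranks the words by count. The class name OutputRankSketch and the sample lines are hypothetical; they are not results from the job above.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

// Parses "word<TAB>count" lines, as written by TextOutputFormat,
// and ranks the words by descending count.
class OutputRankSketch {

    static List<Map.Entry<String, Integer>> rank(List<String> lines) {
        List<Map.Entry<String, Integer>> entries = new ArrayList<>();
        for (String line : lines) {
            int tab = line.lastIndexOf('\t');
            if (tab < 0) {
                continue; // skip lines that are not key/value pairs
            }
            String word = line.substring(0, tab);
            int count = Integer.parseInt(line.substring(tab + 1).trim());
            entries.add(new SimpleEntry<>(word, count));
        }
        // Highest count first.
        entries.sort(Map.Entry.<String, Integer>comparingByValue(Comparator.reverseOrder()));
        return entries;
    }

    public static void main(String[] args) {
        // Hypothetical sample lines, not real job output.
        List<String> lines = List.of("and\t12", "hadoop\t30", "the\t25");
        for (Map.Entry<String, Integer> entry : rank(lines)) {
            System.out.println(entry.getKey() + " -> " + entry.getValue());
        }
    }
}
```

To try it on real output, the lines could be read from a local copy of part-00000 instead of the in-memory list.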
During the MapReduce phase, you need to monitor the job as well as the nodes. Use the following to monitor MapReduce jobs in web browsers:
localhost:50030: JobTracker Web interface (for MapReduce layer)
localhost:50060: TaskTracker Web interface (for MapReduce layer)
In this section, we will learn how to monitor as well as debug a Hadoop MapReduce job without any commands.
One of the easiest ways to do this is to use the Hadoop MapReduce administration UI. We can access it via a browser by entering the URL http://localhost:50030 (web UI for the JobTracker daemon). This will show the logged information of the Hadoop MapReduce jobs, which looks like the following screenshot:
Here we can check the information and status of running jobs, the status of the Map and Reduce tasks of a job, and past completed jobs as well as failed jobs with their failed Map and Reduce tasks. Additionally, we can debug a MapReduce job by clicking on the hyperlink of the failed Map or Reduce task of the failed job. This displays the error message that was printed to standard output while the job was running.
In this section, we will see how to explore HDFS directories without running any Bash command. The web UI of the NameNode daemon provides such a facility. We just need to open it at http://localhost:50070.
This UI enables us to get a cluster summary (memory status), NameNode logs, as well as information on live and dead nodes in the cluster. Also, this allows us to explore the Hadoop directory that we have created for storing input and output data for Hadoop MapReduce jobs.
So far we have learned what MapReduce is and how to code it. Now, we will see some common MapReduce problem definitions that are used for business analytics. Any reader who knows MapReduce with Hadoop will easily be able to code and solve these problem definitions by modifying the MapReduce example for word count. The major changes will be in data parsing and in the logic that operates on the data. The major effort will be required in data collection, data cleaning, and data storage.
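As an illustration of how little the structure changes, the plain-Java sketch below adapts the word count pattern to a different question: total sales amount per product category. Everything in it is assumed for the example, including the record layout orderId,category,amount, the class name RevenueByCategorySketch, and the sample records. Only the parsing (which key and value to emit) differs from word count; the aggregation is still a sum per key.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Word count pattern adapted to "sum of amounts per category" (no Hadoop API).
class RevenueByCategorySketch {

    // Map-side change: parse a record and pick the key and value to emit.
    // Returns null for malformed records, a small stand-in for data cleaning.
    static Map.Entry<String, Double> parse(String record) {
        String[] fields = record.split(",");
        if (fields.length != 3) {
            return null;
        }
        try {
            String category = fields[1].trim().toLowerCase();
            double amount = Double.parseDouble(fields[2].trim());
            return Map.entry(category, amount);
        } catch (NumberFormatException e) {
            return null;
        }
    }

    // Reduce-side logic keeps the same shape as word count: sum per key.
    static Map<String, Double> totals(List<String> records) {
        Map<String, Double> sums = new TreeMap<>();
        for (String record : records) {
            Map.Entry<String, Double> pair = parse(record);
            if (pair != null) {
                sums.merge(pair.getKey(), pair.getValue(), Double::sum);
            }
        }
        return sums;
    }

    public static void main(String[] args) {
        // Hypothetical records in the assumed orderId,category,amount layout.
        List<String> records = List.of(
                "1,books,10.5", "2,Books,4.5", "3,toys,20", "malformed line");
        System.out.println(totals(records));
    }
}
```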
There are also many possible MapReduce applications that can help reduce business costs.