This recipe explains how to write a simple MapReduce program and how to execute it.
To run a MapReduce job, users should furnish a map
function, a reduce
function, input data, and an output data location. When executed, Hadoop carries out the following steps:
1. Hadoop reads the input data and invokes the map function once for each data item, giving the item as the input for the function.
2. When executed, the map function outputs one or more key-value pairs.
3. Hadoop collects all the key-value pairs outputted from the map function, sorts them by the key, and groups together the values with the same key.
4. For each distinct key, Hadoop invokes the reduce function once, passing the key and the grouped values as the input. The reduce function may output one or more key-value pairs, and Hadoop writes them to a file as the final result.

From the source code available with this book, select the source code for the first chapter, chapter1_src.zip. Then, set it up with your favorite Java Integrated Development Environment (IDE); for example, Eclipse. You need to add the hadoop-core JAR file in HADOOP_HOME and all other JAR files in the HADOOP_HOME/lib directory to the classpath of the IDE.
Download and install Apache Ant from http://ant.apache.org/.
Now let us write our first Hadoop MapReduce program.
You can find the source code for this sample at src/chapter1/WordCount.java. The code has three parts: the mapper, the reducer, and the main program.

The mapper extends the org.apache.hadoop.mapreduce.Mapper class. When Hadoop runs the job, it passes each line of the input files to the mapper as input. The map function breaks each line into substrings using whitespace characters as the separator, and for each token (word) emits (word, 1) as the output:

public void map(Object key, Text value, Context context
                ) throws IOException, InterruptedException {
    StringTokenizer itr = new StringTokenizer(value.toString());
    while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, new IntWritable(1));
    }
}
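The word object used inside the map method is a field of the enclosing mapper class, which the main program registers as TokenizerMapper. A minimal sketch of that class, assuming the layout of the standard Hadoop WordCount sample (the exact field names in the book's source may differ slightly):

public static class TokenizerMapper
        extends Mapper<Object, Text, Text, IntWritable> {

    // Output key object, reused across calls to avoid allocating
    // a new Text instance for every token
    private Text word = new Text();

    // ... the map method shown above goes here ...
}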
The reduce function receives all the values that have the same key as the input, and it outputs the key and the number of occurrences of the key as the output:

public void reduce(Text key, Iterable<IntWritable> values, Context context
                   ) throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable val : values) {
        sum += val.get();
    }
    result.set(sum);
    context.write(key, result);
}
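Similarly, the result object is a field of the enclosing reducer class, registered in the main program as IntSumReducer. A sketch under the same assumption:

public static class IntSumReducer
        extends Reducer<Text, IntWritable, Text, IntWritable> {

    // Output value object, reused across calls
    private IntWritable result = new IntWritable();

    // ... the reduce method shown above goes here ...
}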
The main program puts the configuration together and submits the job to Hadoop:

Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
    System.err.println("Usage: wordcount <in> <out>");
    System.exit(2);
}
Job job = new Job(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
// Uncomment this to add a combiner
//job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
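A note on the commented-out combiner line: because word counts are plain sums, the same IntSumReducer class can also serve as a combiner, which pre-aggregates the map output on each node before it is shuffled to the reducers and thus reduces the amount of intermediate data transferred. Enabling it is a one-line change:

// Run the reducer logic on each map task's output before the shuffle
job.setCombinerClass(IntSumReducer.class);

If you assemble WordCount.java yourself rather than using the bundled source, the snippets above rely on the following imports (listed here based on the classes they use):

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;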
Compile the sample by running the following command, which uses Apache Ant:

>ant build
If you have not done this already, you should install Apache Ant by following the instructions given at http://ant.apache.org/manual/install.html. Alternatively, you can use the compiled JAR file included with the source code.
Change the directory to HADOOP_HOME, and copy the hadoop-cookbook-chapter1.jar file to the HADOOP_HOME directory. To be used as the input, create a directory called input under HADOOP_HOME and copy the README.txt file to the directory. Alternatively, you can copy any text file to the input directory.

Run the sample using the following command. Here, chapter1.WordCount is the name of the main class we need to run. When you have run the command, you will see the following terminal output:

>bin/hadoop jar hadoop-cookbook-chapter1.jar chapter1.WordCount input output
12/04/11 08:12:44 INFO input.FileInputFormat: Total input paths to process : 16
12/04/11 08:12:45 INFO mapred.JobClient: Running job: job_local_0001
12/04/11 08:12:45 INFO mapred.Task: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
...........
.....
12/04/11 08:13:37 INFO mapred.JobClient: Job complete: job_local_0001
.....
The output directory will have a file named part-r-XXXXX, which will have the count of each word in the document. Congratulations! You have successfully run your first MapReduce program.

In the preceding sample, MapReduce worked in the local mode without starting any servers, using the local filesystem as the storage system for inputs, outputs, and working data. The following diagram shows what happened in the WordCount program under the covers:
The workflow is as follows:
1. Hadoop reads the input files, breaking the data into lines, and calls the map function once for each line.
2. The map function tokenizes the line and, for each token (word), emits the key-value pair (word, 1).
3. Hadoop collects all the (word,1) pairs, sorts them by the word, groups all the values emitted against each unique key, and invokes the reduce once for each unique key, passing the key and values for that key as an argument.
4. The reduce function counts the number of occurrences of each word from the values it receives, and Hadoop writes these final key-value pairs to the output directory.

As an optional step, copy the input
directory to the top level of the IDE-based project (Eclipse project) that you created for samples. Now you can run the WordCount
class directly from your IDE, passing input output as arguments. This will run the sample the same way as before. Running MapReduce jobs from the IDE in this manner is very useful for debugging your MapReduce jobs.
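If you prefer to drive the job from code rather than from a run configuration, you can also invoke the driver's main method from a small harness class (the class name below is only illustrative). The paths assume the same input and output directories used above, and the output directory must not already exist when the job starts:

public class RunWordCountLocally {
    public static void main(String[] args) throws Exception {
        // Runs the WordCount job in local mode; WordCount.main calls
        // System.exit when the job completes, so keep this in its own class.
        chapter1.WordCount.main(new String[] {"input", "output"});
    }
}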
Although you ran the sample with Hadoop installed on your local machine, you can also run it on a distributed Hadoop cluster with HDFS as the distributed filesystem. The Setting up HDFS and Setting Hadoop in a distributed cluster environment recipes of this chapter discuss how to run this sample in a distributed setup.