This recipe will walk you through creating a MapReduce program to count distinct IPs in weblog data. We will demonstrate the use of a combiner to reduce data transfer overhead between the map and reduce stages. The code is implemented in a generic fashion and can be used to count distinct values in any tab-delimited dataset.
This recipe assumes that you have a basic familiarity with the Hadoop 0.20 MapReduce API. You will need access to the weblog_entries dataset supplied with this book, stored in an HDFS folder at the path /input/weblog.
You will need access to a pseudo-distributed or fully-distributed cluster capable of running MapReduce jobs using the newer MapReduce API introduced in Hadoop 0.20.
You will also need to package this code inside a JAR file to be executed by the Hadoop JAR launcher from the shell. Only the core Hadoop libraries are required to compile and run this example.
Perform the following steps to count distinct IPs using MapReduce:
1. Create a class named DistinctCounterJob.java in your JAR file at whatever source package is appropriate.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    import java.io.IOException;
    import java.util.regex.Pattern;

    public class DistinctCounterJob implements Tool {

        private Configuration conf;
        public static final String NAME = "distinct_counter";
        public static final String COL_POS = "col_pos";

        public static void main(String[] args) throws Exception {
            ToolRunner.run(new Configuration(), new DistinctCounterJob(), args);
        }
2. The run() method is where we set the input/output formats, the Mapper, Reducer, and Combiner classes, and the key/value class configuration:

        public int run(String[] args) throws Exception {
            if (args.length != 3) {
                System.err.println("Usage: distinct_counter <input> <output> <element_position>");
                System.exit(1);
            }
            conf.setInt(COL_POS, Integer.parseInt(args[2]));

            Job job = new Job(conf, "Count distinct elements at position");
            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);
            job.setMapperClass(DistinctMapper.class);
            job.setReducerClass(DistinctReducer.class);
            job.setCombinerClass(DistinctReducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            job.setJarByClass(DistinctCounterJob.class);

            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            // Return 0 on success so the shell sees a conventional exit code
            return job.waitForCompletion(true) ? 0 : 1;
        }

        public void setConf(Configuration conf) {
            this.conf = conf;
        }

        public Configuration getConf() {
            return conf;
        }
    }
3. The map() function is implemented in the following code by extending mapreduce.Mapper:

    public static class DistinctMapper
            extends Mapper<LongWritable, Text, Text, IntWritable> {

        private static int col_pos;
        private static final Pattern pattern = Pattern.compile("\t");
        private Text outKey = new Text();
        private static final IntWritable outValue = new IntWritable(1);

        @Override
        protected void setup(Context context)
                throws IOException, InterruptedException {
            col_pos = context.getConfiguration()
                             .getInt(DistinctCounterJob.COL_POS, 0);
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String field = pattern.split(value.toString())[col_pos];
            outKey.set(field);
            context.write(outKey, outValue);
        }
    }
4. The reduce() function is implemented in the following code by extending mapreduce.Reducer:

    public static class DistinctReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable count = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int total = 0;
            for (IntWritable value : values) {
                total += value.get();
            }
            count.set(total);
            context.write(key, count);
        }
    }
5. Run the job from the shell, supplying the input path, the output path, and an <element_position> of 4, which is the IP column:

    hadoop jar myJobs.jar distinct_counter /input/weblog/ /output/weblog_distinct_counter 4
First we set up DistinctCounterJob to implement the Tool interface for remote submission. The static constant NAME is of potential use in the Hadoop Driver class, which supports launching different jobs from the same JAR file. The static constant COL_POS is initialized from the third required command-line argument, <element_position>. This value is set within the job configuration and should match the zero-based position of the column you wish to count distinct entries for. Supplying 4 will match the IP column for the weblog data.
Since we are reading and writing text, we can use the supplied TextInputFormat and TextOutputFormat classes. We set the Mapper and Reducer classes to our DistinctMapper and DistinctReducer implementations, respectively. We also supply DistinctReducer as the combiner class; the reason for this decision is explained in more detail shortly.
It's also very important to call setJarByClass() so that the TaskTrackers can properly unpack and find the Mapper and Reducer classes. The job uses the static helper methods on FileInputFormat and FileOutputFormat to set the input and output directories, respectively. Now we're set up and ready to submit the job.
The Mapper class sets up a few member variables as follows:

col_pos: This is initialized to the value supplied in the configuration. It allows users to change which column to parse and apply the count distinct operation on.
pattern: This defines each row's column split points, based on tabs.
outKey: This is a class member that holds the output key. Reusing it avoids creating a new Text instance for each output that is written.
outValue: This is an integer representing one occurrence of the given key, similar to the WordCount example.

The map() function splits each incoming line's value and extracts the string located at col_pos. We reset outKey to the string found at that position; for our example, this will be the IP value for the row. We emit the newly set outKey along with outValue to mark one occurrence of that IP address.
Without the assistance of the combiner, this would present the reducer with an iterable collection of 1s to be counted.
The following is an example of a reducer {key, value:[]} without a combiner:
{10.10.1.1, [1,1,1,1,1,1]} = six occurrences of the IP "10.10.1.1".
The implementation of the reduce() method will sum the integers and arrive at the correct total, but nothing requires the integer values to be limited to the number 1. We can use a combiner to process intermediate key-value pairs as they are output from each mapper, and so improve data throughput in the shuffle phase. Since the combiner is applied to the local map output, we may see a performance improvement because the amount of data transferred for an intermediate key can be reduced considerably.
Instead of seeing {10.10.1.1, [1,1,1,1,1,1]}, the combiner can add the 1s and replace the intermediate value for that key with {10.10.1.1, [6]}. The reducer can then sum the combined values for the intermediate key and arrive at the same correct total. This is possible because addition is both a commutative and associative operation. In other words:

1 + 1 + 1 + 1 + 1 + 1 = 6 (no combiner)
(1 + 1 + 1) + (1 + 1 + 1) = 3 + 3 = 6 (each of two map tasks pre-sums its three 1s)
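A quick arithmetic check of this property in plain Java, with no Hadoop dependencies; the sample values are illustrative:

```java
public class CombinerArithmetic {
    public static void main(String[] args) {
        // Reducer input without a combiner: six raw 1s for one key
        int[] raw = {1, 1, 1, 1, 1, 1};
        int rawTotal = 0;
        for (int v : raw) rawTotal += v;

        // Reducer input with a combiner: each map task pre-summed its 1s
        int[] combined = {3, 3};  // two hypothetical map tasks, three occurrences each
        int combinedTotal = 0;
        for (int v : combined) combinedTotal += v;

        // Addition is commutative and associative, so the totals agree
        System.out.println(rawTotal == combinedTotal);  // prints true
        System.out.println(rawTotal);                   // prints 6
    }
}
```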
For counting the occurrences of distinct IPs, we can use the same code in our reducer as a combiner for output in the map phase.
When applied to our problem, the normal output with no combiner from two separate, independently running map tasks might look like the following, where {key, value[]} is the intermediate key-value collection:

Map Task A = {10.10.1.1, [1,1,1]} = three occurrences
Map Task B = {10.10.1.1, [1,1,1,1,1,1]} = six occurrences

Without the aid of a combiner, this will be merged in the shuffle phase and presented to a single reducer as the following key-value collection:

{10.10.1.1, [1,1,1,1,1,1,1,1,1]} = nine occurrences

Now let's revisit what would happen when using a combiner against the exact same sample output:

Map Task A = {10.10.1.1, [1,1,1]} is pre-reduced by the combiner to {10.10.1.1, [3]}
Map Task B = {10.10.1.1, [1,1,1,1,1,1]} is pre-reduced by the combiner to {10.10.1.1, [6]}

Now the reducer will see the following for that key-value collection:

{10.10.1.1, [3,6]} = nine occurrences
We arrived at the same total count for that IP address, but we used a combiner to limit the amount of network I/O during the MapReduce shuffle phase by pre-reducing the intermediate key-value output from each mapper.
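This map-combine-shuffle-reduce flow can be sketched end to end in plain Java with no Hadoop dependencies. The two "map task" inputs below are the hypothetical samples from the discussion, and the sum() helper plays the role of both combiner and reducer, just as DistinctReducer does in the job:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CombineSketch {
    // Combiner and reducer share one implementation: sum the values per key
    static Map<String, Integer> sum(Map<String, List<Integer>> in) {
        Map<String, Integer> out = new HashMap<>();
        for (Map.Entry<String, List<Integer>> e : in.entrySet()) {
            int total = 0;
            for (int v : e.getValue()) total += v;
            out.put(e.getKey(), total);
        }
        return out;
    }

    public static void main(String[] args) {
        // Intermediate output of two hypothetical map tasks: one 1 per occurrence
        Map<String, List<Integer>> taskA = Map.of("10.10.1.1", List.of(1, 1, 1));
        Map<String, List<Integer>> taskB = Map.of("10.10.1.1", List.of(1, 1, 1, 1, 1, 1));

        // The combiner runs against each task's local output before the shuffle
        Map<String, Integer> combinedA = sum(taskA);  // {10.10.1.1=3}
        Map<String, Integer> combinedB = sum(taskB);  // {10.10.1.1=6}

        // The shuffle merges the combined values per key; the reducer sums again
        Map<String, List<Integer>> shuffled = Map.of("10.10.1.1",
                List.of(combinedA.get("10.10.1.1"), combinedB.get("10.10.1.1")));
        System.out.println(sum(shuffled));  // prints {10.10.1.1=9}
    }
}
```

Only two values per key cross the (simulated) network instead of nine, yet the final count is unchanged.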
The combiner can be confusing to newcomers. Here are some useful tips:
The previous recipe and the default WordCount example show the Combiner class being initialized to the same implementation as the Reducer class. This is not enforced by the API, but it ends up being common for many types of distributed aggregate operations, such as sum(), min(), and max(). One basic example might be a min() operation in the Reducer class that formats its output in a certain way for readability; this will take a slightly different form from the min() operation in the Combiner class, which does not care about the specific output formatting.
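To make that distinction concrete, here is a hedged sketch in plain Java (the class and method names are invented for illustration). The combiner must emit a raw minimum that is still valid reducer input, while the reducer is free to format the final answer:

```java
import java.util.Arrays;
import java.util.List;

public class MinFormatting {
    // Combiner-style min: emits the raw value, still usable as reducer input
    static int combineMin(List<Integer> values) {
        int min = Integer.MAX_VALUE;
        for (int v : values) min = Math.min(min, v);
        return min;
    }

    // Reducer-style min: formats the final answer, no longer re-reducible
    static String reduceMin(List<Integer> values) {
        return "min=" + combineMin(values);
    }

    public static void main(String[] args) {
        // Local minimums from two hypothetical map tasks' values for one key
        int partialA = combineMin(Arrays.asList(8, 3, 9));  // 3
        int partialB = combineMin(Arrays.asList(7, 5));     // 5
        // The reducer consumes the combiners' raw output
        System.out.println(reduceMin(Arrays.asList(partialA, partialB)));  // prints min=3
    }
}
```

If the combiner emitted the formatted string instead, the reducer could no longer compare the partial results, which is why the two implementations diverge even though both compute a minimum.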
Whether or not the framework invokes your combiner during execution depends on the intermediate spill file size from each map output, and it is not guaranteed to run for every intermediate key. Your job should not depend on the combiner for correct results; it should be used only as an optimization.
You can control the spill file threshold at which MapReduce tries to combine intermediate values with the configuration property min.num.spills.for.combine.
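For example, the property can be set cluster-wide in mapred-site.xml; the value shown here is illustrative:

```xml
<property>
  <name>min.num.spills.for.combine</name>
  <value>3</value>
</property>
```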