In a typical MapReduce job, key-value pairs are emitted from the mappers, shuffled, and sorted, and then finally passed to the reducers. There is no attempt by the MapReduce framework to sort the values passed to the reducers for processing. However, there are cases when we need the values passed to the reducers to be sorted, such as in the case of counting page views.
To calculate page views, we need to calculate distinct IPs by page. One way to calculate this is to have the mappers emit the key-value pairs: page and IP. Then, in the reducer, we can store all of the IPs associated with a page in a set. However, this approach is not scalable. What happens if the weblogs contain a large number of distinct IPs visiting a single page? We might not be able to fit the entire set of distinct IPs in memory.
The MapReduce framework provides a way to work around this complication. In this recipe, we will write a MapReduce application that allows us to sort the values going to a reducer using an approach known as the secondary sort. Instead of holding all of the distinct IPs in memory, we can keep track of the last IP we saw while processing the values in the reducer, and we can maintain a counter to calculate distinct IPs.
You will need to download the apache_nobots_tsv.txt
dataset from http://www.packtpub.com/support and place the file into HDFS.
The following steps show how to implement a secondary sort in MapReduce to calculate page views:
WritableComparable
interface. We will use this class to store the key and sort fields:public class CompositeKey implements WritableComparable { private Text first = null; private Text second = null; public CompositeKey() { } public CompositeKey(Text first, Text second) { this.first = first; this.second = second; } //...getters and setters public void write(DataOutput d) throws IOException { first.write(d); second.write(d); } public void readFields(DataInput di) throws IOException { if (first == null) { first = new Text(); } if (second == null) { second = new Text(); } first.readFields(di); second.readFields(di); } public int compareTo(Object obj) { CompositeKey other = (CompositeKey) obj; int cmp = first.compareTo(other.getFirst()); if (cmp != 0) { return cmp; } return second.compareTo(other.getSecond()); } @Override public boolean equals(Object obj) { CompositeKey other = (CompositeKey)obj; return first.equals(other.getFirst()); } @Override public int hashCode() { return first.hashCode(); } }
Mapper
and Reducer
classes. The Mapper
class will use the CompositeKey
class to store two fields. The first will be the page
field, which is used to group and partition the data leaving the mapper. The second is the ip
field, which is used to sort the values passed to the reducer.public class PageViewMapper extends Mapper<Object, Text, CompositeKey, Text> { private CompositeKey compositeKey = new CompositeKey(); private Text first = new Text(); private Text second = new Text(); private Text outputValue = new Text(); @Override protected void map(Object key, Text value, Context context) throws IOException, InterruptedException { String[] tokens = value.toString().split(" "); if (tokens.length > 3) { String page = tokens[2]; String ip = tokens[0]; first.set(page); second.set(ip); compositeKey.setFirst(first); compositeKey.setSecond(second); outputValue.set(ip); context.write(compositeKey, outputValue); } } } public class PageViewReducer extends Reducer<CompositeKey, Text, Text, LongWritable> { private LongWritable pageViews = new LongWritable(); @Override protected void reduce(CompositeKey key, Iterable<Text> values, Context context) throws IOException, InterruptedException { String lastIp = null; long pages = 0; for(Text t : values) { String ip = t.toString(); if (lastIp == null) { lastIp = ip; pages++; } else if (!lastIp.equals(ip)) { lastIp = ip; pages++; } else if (lastIp.compareTo(ip) > 0) { throw new IOException("secondary sort failed"); } } pageViews.set(pages); context.write(key.getFirst(), pageViews); } }
page
field:static class CompositeKeyParitioner extends Partitioner<CompositeKey, Writable> { @Override public int getPartition(CompositeKey key, Writable value, int numParition) { return (key.getFirst().hashCode() & 0x7FFFFFFF) % numParition; } }
static class GroupComparator extends WritableComparator { public GroupComparator() { super(CompositeKey.class, true); } @Override public int compare(WritableComparable a, WritableComparable b) { CompositeKey lhs = (CompositeKey)a; CompositeKey rhs = (CompositeKey)b; return lhs.getFirst().compareTo(rhs.getFirst()); } }
static class SortComparator extends WritableComparator { public SortComparator() { super(CompositeKey.class, true); } @Override public int compare(WritableComparable a, WritableComparable b) { CompositeKey lhs = (CompositeKey)a; CompositeKey rhs = (CompositeKey)b; int cmp = lhs.getFirst().compareTo(rhs.getFirst()); if (cmp != 0) { return cmp; } return lhs.getSecond().compareTo(rhs.getSecond()); } }
public int run(String[] args) throws Exception { Path inputPath = new Path(args[0]); Path outputPath = new Path(args[1]); Configuration conf = getConf(); Job weblogJob = new Job(conf); weblogJob.setJobName("PageViews"); weblogJob.setJarByClass(getClass()); weblogJob.setMapperClass(PageViewMapper.class); weblogJob.setMapOutputKeyClass(CompositeKey.class); weblogJob.setMapOutputValueClass(Text.class); weblogJob.setPartitionerClass(CompositeKeyParitioner.class); weblogJob.setGroupingComparatorClass(GroupComparator.class); weblogJob.setSortComparatorClass(SortComparator.class); weblogJob.setReducerClass(PageViewReducer.class); weblogJob.setOutputKeyClass(Text.class); weblogJob.setOutputValueClass(Text.class); weblogJob.setInputFormatClass(TextInputFormat.class); weblogJob.setOutputFormatClass(TextOutputFormat.class); FileInputFormat.setInputPaths(weblogJob, inputPath); FileOutputFormat.setOutputPath(weblogJob, outputPath); if(weblogJob.waitForCompletion(true)) { return 0; } return 1; }
We first created a class named CompositeKey
. This class extends the Hadoop WritableComparable
interface so that we can use the CompositeKey
class just like any normal Hadoop WritableComparable
interface (for example, Text
and IntWritable
). The CompositeKey
class holds two Text
objects. The first Text
object is used to partition and group the key-value pairs emitted from the mapper. The second Text
object is used to perform the secondary sort.
Next, we wrote a mapper class to emit the key-value pair CompositeKey
(which consists of page and IP) as the key, and IP as the value. In addition, we
wrote a reducer class that receives a CompositeKey
object and a sorted list of IPs. The distinct IP count is calculated by incrementing a counter whenever we see an IP that does not equal a previously seen IP.
After writing the mapper and reducer classes, we created three classes to partition, group, and sort the data. The CompositeKeyPartitioner
class is responsible for partitioning the data emitted from the mapper. In this recipe, we want all of the same pages to go to the same partition. Therefore, we calculate the partition location based only on the first field of the CompositeKey
class.
Next, we created a
GroupComparator
class that uses the same logic as CompositeKeyPartitioner
. We want all of the same page keys grouped together for processing by a reducer. Therefore, the group comparator only inspects the first member of the CompositeKey
class for comparison.
Finally, we created the SortComparator
class. This class is responsible for sorting all of the values that are sent to the reducer. As you can see from the method signature, compare(WritableComparable a, WritableComparable b)
, we only receive the keys that are sent to each reducer, which is why we needed to include the IP with each and every key the mapper emitted. The
SortComparator
class compares both the first and second members of the CompositeKey
class to ensure that the values a reducer receives are sorted.