A frequency distribution lists each URL together with the number of hits it received, sorted in ascending order of the number of hits. We have already calculated the number of hits in the earlier recipe; this recipe will sort that list.
This recipe assumes HADOOP_HOME refers to the Hadoop installation folder. The following steps show how to calculate the frequency distribution using MapReduce:
From HADOOP_HOME, run the MapReduce job:

> bin/hadoop jar hadoop-cookbook-chapter6.jar chapter6.WeblogFrequencyDistributionProcessor /data/output2 /data/output3
Read the results by running:

> bin/hadoop dfs -cat /data/output3/*
It will print results similar to the following:
/cgi-bin/imagemap/countdown?91,175 12
/cgi-bin/imagemap/countdown?105,143 13
/cgi-bin/imagemap/countdown70?177,284 14
The second recipe of this chapter calculated the number of hits received by each URL, and the frequency distribution is simply a sorted list of those results. Therefore, we sort the results of the second recipe.
MapReduce always sorts the key-value pairs emitted by the mappers by their keys before delivering them to the reducers. We will use this to sort the results.
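The effect of this sort can be sketched in plain Java. The TreeMap below is a simplified stand-in for Hadoop's shuffle (it cannot hold duplicate keys, unlike the real shuffle), and the sample pairs are taken from the output shown above:

```java
import java.util.Map;
import java.util.TreeMap;

public class ShuffleSortSketch {
    public static void main(String[] args) {
        // Key-value pairs as the mapper emits them: (hits, URL)
        TreeMap<Integer, String> shuffled = new TreeMap<>();
        shuffled.put(13, "/cgi-bin/imagemap/countdown?105,143");
        shuffled.put(12, "/cgi-bin/imagemap/countdown?91,175");
        shuffled.put(14, "/cgi-bin/imagemap/countdown70?177,284");

        // The reducer sees the keys in ascending order, just as
        // Hadoop sorts by key before the reduce phase
        for (Map.Entry<Integer, String> e : shuffled.entrySet()) {
            System.out.println(e.getValue() + " " + e.getKey());
        }
    }
}
```

Regardless of the order in which the pairs were inserted, they come back out sorted by the numeric key, which is exactly the property the recipe relies on.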
You can find the source for this recipe in src/chapter6/WeblogFrequencyDistributionProcessor.java.
The map task for the job will look like the following:
public static class AMapper
        extends Mapper<Object, Text, IntWritable, Text> {
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        // Each input line has the form "<URL> <hit-count>"
        String[] tokens = value.toString().split("\\s+");
        // Emit the hit count as the key so that Hadoop sorts by it
        context.write(
            new IntWritable(Integer.parseInt(tokens[1])),
            new Text(tokens[0]));
    }
}
The map task receives each line in the log file as a separate key-value pair. It splits the line using a regular expression and emits the number of hits as the key and the URL as the value. Hadoop sorts the key-value pairs emitted by the mappers before calling the reducers, so the reducer receives the pairs in sorted order. Hence, it just has to emit them as they arrive.
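The parsing step can be exercised outside Hadoop. This is a minimal sketch using a sample line taken from the output shown earlier; the class name is made up for illustration:

```java
public class MapParseSketch {
    public static void main(String[] args) {
        // A sample input line from the earlier recipe: "<URL> <hit-count>"
        String value = "/cgi-bin/imagemap/countdown?91,175 12";
        // Split on whitespace, as the mapper does
        String[] tokens = value.split("\\s+");
        int hits = Integer.parseInt(tokens[1]);
        // The mapper would emit (hits, URL)
        System.out.println("key=" + hits + " value=" + tokens[0]);
    }
}
```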
public static class AReducer
        extends Reducer<IntWritable, Text, Text, IntWritable> {
    public void reduce(IntWritable key, Iterable<Text> values,
            Context context)
            throws IOException, InterruptedException {
        // Emit every URL that received this number of hits
        for (Text url : values) {
            context.write(url, key);
        }
    }
}
The main() method of the job works similarly to the one in the earlier recipe.
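A driver along those lines might look like the following sketch. This is an assumption based on the mapper and reducer signatures above, not the actual main() from the sample code; the job name is made up, and a single reducer is used so the final output stays globally sorted:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = new Job(conf, "weblog-frequency-distribution");
    job.setJarByClass(WeblogFrequencyDistributionProcessor.class);
    job.setMapperClass(AMapper.class);
    job.setReducerClass(AReducer.class);
    // One reducer keeps the final output globally sorted by hit count
    job.setNumReduceTasks(1);
    // Mapper emits (hit count, URL); reducer emits (URL, hit count)
    job.setMapOutputKeyClass(IntWritable.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
```

The input and output paths (/data/output2 and /data/output3 in the command above) arrive through args.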