As we have already observed, Hadoop is very good at reading through a dataset and computing analytics over it. However, we often have to join two datasets to carry out an analysis. This recipe explains how to join two datasets using Hadoop.
As an example, this recipe will use the Tomcat developer archives dataset. A common belief in the open source community is that the more a developer is involved with the community (for example, by replying to threads and helping others), the more quickly they will receive responses to their own queries. In this recipe we will test this hypothesis using the Tomcat developer mailing list.
To test this hypothesis, we will run the MapReduce jobs as explained in the following figure:
We start with e-mail archives in the MBOX format and read them using the MBOX format class explained in the earlier recipe. The Hadoop job then receives the sender of the e-mail (from), the e-mail subject, and the date the e-mail was sent as its input.
We will use HADOOP_HOME to refer to the Hadoop installation folder. The following steps show how to use MapReduce to join two datasets:
Run the MLSendReplyProcessor job from HADOOP_HOME:
> bin/hadoop jar hadoop-cookbook-chapter6.jar chapter6.MLSendReplyProcessor /data/input2 /data/output7
View the results of the job:
> bin/hadoop dfs -cat /data/output7/*
Create a new folder input3 and copy the results of both earlier jobs to that folder in HDFS:
> bin/hadoop dfs -mkdir /data/input3
> bin/hadoop dfs -cp /data/output6/part-r-00000 /data/input3/1.data
> bin/hadoop dfs -cp /data/output7/part-r-00000 /data/input3/2.data
Run the MLJoinSendReceiveReplies job from HADOOP_HOME:
> bin/hadoop jar hadoop-cookbook-chapter6.jar chapter6.MLJoinSendReceiveReplies /data/input3 /data/output8
Download the results of the job to the local filesystem from HADOOP_HOME:
> bin/hadoop dfs -get /data/output8/part-r-00000 8.data
Copy all the *.plot files from CHAPTER_6_SRC to HADOOP_HOME. Generate the plot by running the following command from HADOOP_HOME:
> gnuplot sendvsreceive.plot
This generates the file sendreceive.png, which will look like the following:
The graph confirms our hypothesis, and, as before, the data approximately follows a power-law distribution.
You can find the source for this recipe in src/chapter6/MLSendReplyProcessor.java and src/chapter6/MLJoinSendReceiveReplies.java. We have already discussed how the first job works in the earlier recipe.
The following code snippet shows the map() function of the second job. It receives the sender's e-mail address, the subject, and the date separated by # as its input; it parses the input and emits the sender's e-mail address as the key and the date the e-mail was sent as the value:
public void map(Object key, Text value, Context context)
    throws IOException, InterruptedException {
  String[] tokens = value.toString().split("#");
  String from = tokens[0];
  String date = tokens[2];
  context.write(new Text(from), new Text(date));
}
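The parsing performed by this map() function can be exercised in plain Java, outside Hadoop. The following is a minimal sketch; the class name and the sample record are hypothetical, but the record follows the from#subject#date format the job consumes:

```java
public class MapParseDemo {
    // Mimics the split performed in map(): from#subject#date -> (from, date)
    static String[] parseRecord(String record) {
        String[] tokens = record.split("#");
        return new String[] { tokens[0], tokens[2] };
    }

    public static void main(String[] args) {
        // Hypothetical input record in the from#subject#date format
        String[] kv = parseRecord("jane@apache.org#Re: connector config#2 Jan 2010");
        System.out.println(kv[0] + " -> " + kv[1]);
    }
}
```

Note that the subject (tokens[1]) is read but discarded; only the sender and date survive into the shuffle.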
The following code snippet shows the reduce() function of the second job. Each reduce() invocation receives the dates of all the e-mails sent by one sender. The reducer counts the number of replies sent by that sender and emits the sender's e-mail address as the key and the number of replies sent as the value:
public void reduce(Text key, Iterable<Text> values, Context context)
    throws IOException, InterruptedException {
  int sum = 0;
  for (Text val : values) {
    sum = sum + 1;
  }
  context.write(key, new IntWritable(sum));
}
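The combination of the shuffle and this reducer amounts to a group-and-count per sender. A minimal plain-Java sketch of that behavior, using hypothetical sender addresses:

```java
import java.util.HashMap;
import java.util.Map;

public class ReplyCountDemo {
    // Mimics shuffle + reduce: count how many values each key (sender) received
    static Map<String, Integer> countBySender(String[] senders) {
        Map<String, Integer> counts = new HashMap<>();
        for (String sender : senders) {
            counts.merge(sender, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        String[] senders = { "jane@apache.org", "bob@apache.org", "jane@apache.org" };
        System.out.println(countBySender(senders));
    }
}
```

The dates carried as values are never inspected; only the number of values per key matters, exactly as in the reduce() function above.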
The following code snippet shows the map() function of the third job. It reads the outputs of the first and second jobs and writes them out as key-value pairs:
public void map(Object key, Text value, Context context)
    throws IOException, InterruptedException {
  String[] tokens = value.toString().split("\\s");
  String from = tokens[0];
  String replyData = tokens[1];
  context.write(new Text(from), new Text(replyData));
}
The following code snippet shows the reduce() function of the third job. Since the outputs of the first and second jobs share the same key, the replies sent and the replies received for a given user arrive at the same reducer. The reducer makes an adjustment to remove self-replies, and then emits the replies sent as the key and the replies received as the value, thus joining the two datasets:
public void reduce(Text key, Iterable<Text> values, Context context)
    throws IOException, InterruptedException {
  try {
    int sendReplyCount = 0;
    int receiveReplyCount = 0;
    for (Text val : values) {
      String strVal = val.toString();
      if (strVal.contains("#")) {
        // A value from the first job: repliesOnThisThread#selfRepliesOnThisThread
        String[] tokens = strVal.split("#");
        int repliesOnThisThread = Integer.parseInt(tokens[0]);
        int selfRepliesOnThisThread = Integer.parseInt(tokens[1]);
        receiveReplyCount = receiveReplyCount + repliesOnThisThread;
        sendReplyCount = sendReplyCount - selfRepliesOnThisThread;
      } else {
        // A value from the second job: the number of replies sent
        sendReplyCount = sendReplyCount + Integer.parseInt(strVal);
      }
    }
    context.write(new IntWritable(sendReplyCount),
        new IntWritable(receiveReplyCount));
  } catch (NumberFormatException e) {
    System.out.println("ERROR " + e.getMessage());
  }
}
The final job is an example of using MapReduce to join two datasets. The idea is to send all the values that need to be joined under the same key to the same reducer, and to perform the join there.
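This reduce-side join pattern can be illustrated as a standalone sketch in plain Java. The following is a simplified simulation, not the recipe's actual code: values from one dataset are tagged with an "R:" prefix to tell the two sides apart (the recipe instead distinguishes them by the presence of #), they are grouped under their key as the shuffle would do, and the join happens per group as in the reducer:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ReduceSideJoinSketch {
    // Emulates the shuffle: collect all values for each key from both datasets
    static Map<String, List<String>> groupByKey(List<String[]> records) {
        Map<String, List<String>> groups = new HashMap<>();
        for (String[] kv : records) {
            groups.computeIfAbsent(kv[0], k -> new ArrayList<>()).add(kv[1]);
        }
        return groups;
    }

    // Emulates the reducer: values tagged "R:" carry replies received,
    // untagged values carry replies sent; combine them per key
    static Map<String, int[]> join(Map<String, List<String>> groups) {
        Map<String, int[]> joined = new HashMap<>();
        for (Map.Entry<String, List<String>> e : groups.entrySet()) {
            int sent = 0;
            int received = 0;
            for (String v : e.getValue()) {
                if (v.startsWith("R:")) {
                    received += Integer.parseInt(v.substring(2));
                } else {
                    sent += Integer.parseInt(v);
                }
            }
            joined.put(e.getKey(), new int[] { sent, received });
        }
        return joined;
    }

    public static void main(String[] args) {
        // Hypothetical records: (sender, value) pairs from the two datasets
        List<String[]> records = new ArrayList<>();
        records.add(new String[] { "jane@apache.org", "5" });   // replies sent
        records.add(new String[] { "jane@apache.org", "R:3" }); // replies received
        records.add(new String[] { "bob@apache.org", "2" });
        int[] jane = join(groupByKey(records)).get("jane@apache.org");
        System.out.println(jane[0] + " sent, " + jane[1] + " received");
    }
}
```

Because both datasets emit the same key (the sender's e-mail address), every value that belongs to one sender ends up in the same group, which is exactly what lets a single reducer see both sides of the join.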