Joining two datasets using MapReduce

As we have observed already, Hadoop is very good at reading through a dataset and computing analytics over it. However, we often have to merge two datasets in order to analyze the data. This recipe explains how to join two datasets using Hadoop.

As an example, this recipe will use the Tomcat developer mailing list archives. A common belief in the open source community is that the more a developer is involved with the community (for example, by replying to threads and helping others), the more quickly he will receive a response to his own queries. In this recipe, we will test this hypothesis using the Tomcat developer mailing list.

To test this hypothesis, we will run the MapReduce jobs as explained in the following figure:

Figure: Joining two datasets using MapReduce

We will start with the e-mail archives in the MBOX format, and we will read them using the MBOX format class explained in the earlier recipe. Then, the Hadoop jobs will receive the sender of the e-mail (the from address), the e-mail subject, and the date the e-mail was sent as their input.

  1. In the first job, the mapper emits the subject as the key, and the sender's e-mail address and the date as the value. The reducer step then receives all the values with the same subject and outputs the thread owner's e-mail address as the key, and the reply count for that owner's threads as the value. We already executed this job in the earlier recipe.
  2. In the second job, the mapper emits the sender's e-mail address as the key and the date as the value. The reducer step then receives all the e-mails sent from the same address at the same reducer. Using this data, each reducer emits the e-mail address as the key and the number of e-mails sent from that address as the value.
  3. Finally, the third job reads the outputs of both the earlier jobs, joins the results, and emits the number of replies sent by each e-mail address and the number of replies received by each e-mail address as the output; the record flow is sketched just after this list.
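
To make the join concrete, here is how the records for a single user might flow through the three jobs. The values below are made up, and the layout assumes Hadoop's default tab-separated text output; it also assumes that the first job's output is keyed by the thread owner's e-mail address and carries #-separated reply and self-reply counts, which is what the third job's code (shown later) expects:

    1.data (first job output):   foo@apache.org    5#1    (replies received#self-replies)
    2.data (second job output):  foo@apache.org    12     (replies sent)
    Third job output:            11    5                  (replies sent excluding self-replies, replies received)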

Getting ready

  • This recipe assumes that you have followed the first chapter and have installed Hadoop. We will use the HADOOP_HOME variable to refer to the Hadoop installation folder.
  • Start Hadoop by following the instructions in the first chapter.
  • This recipe assumes you are aware of how Hadoop processing works. If you have not already done so, you should follow the recipe Writing a WordCount MapReduce sample, bundling it and running it using standalone Hadoop from Chapter 1, Getting Hadoop Up and Running in a Cluster.

How to do it...

The following steps show how to use MapReduce to join two datasets:

  1. If you have not already done so, run the previous recipe, which will set up the environment and run the first job as explained in the figure.
  2. Run the second MapReduce job through the following command from HADOOP_HOME:
    > bin/hadoop jar hadoop-cookbook-chapter6.jar chapter6.MLSendReplyProcessor /data/input2 /data/output7
    
  3. Read the results by running the following command:
    > bin/hadoop dfs -cat /data/output7/*
    
  4. Create a new folder, /data/input3, and copy the results of both the earlier jobs to that folder in HDFS:
    > bin/hadoop dfs -mkdir /data/input3
    > bin/hadoop dfs -cp /data/output6/part-r-00000 /data/input3/1.data
    > bin/hadoop dfs -cp /data/output7/part-r-00000 /data/input3/2.data
    
  5. Run the third MapReduce job through the following command from HADOOP_HOME:
    > bin/hadoop jar hadoop-cookbook-chapter6.jar chapter6.MLJoinSendReceiveReplies /data/input3 /data/output8
    
  6. Download the results of the third job to the local computer by running the following command from HADOOP_HOME:
    > bin/hadoop dfs -get /data/output8/part-r-00000 8.data
    
  7. Copy all the *.plot files from CHAPTER_6_SRC to HADOOP_HOME.
  8. Generate the plot by running the following command from HADOOP_HOME:
    > gnuplot sendvsreceive.plot
    
  9. It will generate a file called sendreceive.png, which will look like the following:
    Figure: replies sent versus replies received (sendreceive.png)

The graph confirms our hypothesis, and, as before, the data approximately follows a power-law distribution.

How it works...

You can find the source for this recipe in src/chapter6/MLSendReplyProcessor.java and src/chapter6/MLJoinSendReceiveReplies.java. We have already discussed the workings of the first job in the earlier recipe.

The following code snippet shows the map() function for the second job. It receives the sender's e-mail address, the subject, and the date, separated by #, as its input; it parses the input and outputs the sender's e-mail address as the key and the date the e-mail was sent as the value:

public void map(Object key, Text value, Context context)
  throws IOException, InterruptedException
{
  // Input lines have the form from#subject#date
  String[] tokens = value.toString().split("#");
  String from = tokens[0];
  String date = tokens[2];
  // Key by the sender so that all of one sender's e-mails
  // meet at the same reducer
  context.write(new Text(from), new Text(date));
}

The following code snippet shows the reduce() function for the second job. Each reduce() invocation receives the dates of all the e-mails sent by one sender. The reducer counts the number of replies sent by that sender, and outputs the sender's e-mail address as the key and the number of replies sent as the value:

public void reduce(Text key, Iterable<Text> values,
  Context context) throws IOException, InterruptedException
{
  // Each value represents one e-mail sent by this sender
  int sum = 0;
  for (Text val : values)
  {
    sum = sum + 1;
  }
  context.write(key, new IntWritable(sum));
}
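
For reference, the following is a minimal sketch of the driver that would wire these two functions into a runnable job, using the standard Hadoop job-submission API. The SendReplyMapper and SendReplyReducer class names are placeholders for the map() and reduce() implementations shown above; the actual MLSendReplyProcessor driver in the book's source may differ:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MLSendReplyProcessor
{
  public static void main(String[] args) throws Exception
  {
    Configuration conf = new Configuration();
    Job job = new Job(conf, "MLSendReplyProcessor");
    job.setJarByClass(MLSendReplyProcessor.class);
    // Placeholder names for the map() and reduce() shown above
    job.setMapperClass(SendReplyMapper.class);
    job.setReducerClass(SendReplyReducer.class);
    // The map emits (Text, Text) while the reduce emits (Text, IntWritable),
    // so the map output types must be declared explicitly
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}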

The following code snippet shows the map() function for the third job. It reads the outputs of both the first and the second job, and writes them out as key-value pairs keyed by the e-mail address:

public void map(Object key, Text value, Context context)
  throws IOException, InterruptedException
{
  // Both input files are tab-separated key-value pairs; note the
  // escaped regex "\\s" (a bare "\s" would not compile)
  String[] tokens = value.toString().split("\\s");
  String from = tokens[0];
  String replyData = tokens[1];
  context.write(new Text(from), new Text(replyData));
}

The following code snippet shows the reduce() function for the third job. Since the outputs of both the first and the second job use the same key, the replies sent and the replies received for a given user arrive at the same reducer. The reducer tells the two sources apart by the # separator in the value, adjusts the counts to remove self-replies, and outputs the replies sent and the replies received as the key and the value of the reducer respectively, thus joining the two datasets:

public void reduce(Text key, Iterable<Text> values, Context context)
  throws IOException, InterruptedException
{
  try
  {
    int sendReplyCount = 0;
    int receiveReplyCount = 0;
    for (Text val : values)
    {
      String strVal = val.toString();
      if (strVal.contains("#"))
      {
        // A value from the first job: replies received on this
        // user's threads and self-replies, separated by #
        String[] tokens = strVal.split("#");
        int repliesOnThisThread = Integer.parseInt(tokens[0]);
        int selfRepliesOnThisThread = Integer.parseInt(tokens[1]);
        receiveReplyCount = receiveReplyCount + repliesOnThisThread;
        sendReplyCount = sendReplyCount - selfRepliesOnThisThread;
      }
      else
      {
        // A value from the second job: the number of replies sent
        sendReplyCount = sendReplyCount + Integer.parseInt(strVal);
      }
    }
    context.write(new IntWritable(sendReplyCount),
      new IntWritable(receiveReplyCount));
  }
  catch (NumberFormatException e)
  {
    System.out.println("ERROR " + e.getMessage());
  }
}

Here the final job is an example of using MapReduce to join two datasets. The idea is to send all the values that need to be joined under the same key to the same reducer, and to join the data there.
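
This technique is generally known as a reduce-side join. The recipe's reducer distinguishes the two record types by the # separator in the values; a more general variant, sketched below under the assumption of tab-separated inputs named 1.data and 2.data, tags each record in the mapper with the file it came from. This sketch is not part of the book's source:

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class ReduceSideJoin
{
  // Tags each record with the name of the file it came from, so that
  // the reducer can tell the two datasets apart
  public static class TaggingMapper extends Mapper<Object, Text, Text, Text>
  {
    public void map(Object key, Text value, Context context)
      throws IOException, InterruptedException
    {
      String[] parts = value.toString().split("\t", 2);
      if (parts.length < 2)
      {
        return; // skip malformed lines
      }
      String source = ((FileSplit) context.getInputSplit()).getPath().getName();
      context.write(new Text(parts[0]), new Text(source + ":" + parts[1]));
    }
  }

  // Emits a joined record only when both datasets contributed a value
  // for the key (an inner join)
  public static class JoinReducer extends Reducer<Text, Text, Text, Text>
  {
    public void reduce(Text key, Iterable<Text> values, Context context)
      throws IOException, InterruptedException
    {
      String left = null;
      String right = null;
      for (Text val : values)
      {
        String[] tagged = val.toString().split(":", 2);
        if (tagged[0].equals("1.data"))
        {
          left = tagged[1];
        }
        else
        {
          right = tagged[1];
        }
      }
      if (left != null && right != null)
      {
        context.write(key, new Text(left + "\t" + right));
      }
    }
  }
}

Tagging by the source file keeps the reducer independent of the value formats, at the cost of slightly larger intermediate data; the recipe's version avoids the tag by exploiting the fact that only the first job's values contain a #.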
