Parsing a complex dataset with Hadoop

The datasets we have parsed so far were simple: each data item was contained in a single line, so we were able to use Hadoop's default parsing support. However, some datasets have much more complex formats.

In this recipe, we will analyze the Tomcat developer mailing list archives. In the archive, each e-mail is composed of multiple lines in the log file. Therefore, we will write a Hadoop input formatter to process the e-mail archive.

This recipe parses the complex e-mail list archives and finds the owner (the person who started the thread) and the number of replies received by each e-mail thread.

The following figure shows a summary of the execution. Here the mapper emits the subject of the mail as key and the sender's e-mail address and date as the value. Then Hadoop groups data by the e-mail subject and sends all the data related to that thread to the same reducer.

Figure: Parsing a complex dataset with Hadoop

Then, the reducer determines the owner of each thread it receives and counts the number of replies received by that thread.
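
For example (using hypothetical data), if one mapper emits the pair ("Tomcat 7 startup error", "3 Jan 2012 09:15#alice@example.com") for the first e-mail of a thread and another mapper emits ("Tomcat 7 startup error", "3 Jan 2012 11:42#bob@example.com") for a reply, both pairs share the same subject key and therefore arrive at the same reducer, which would report alice@example.com as the owner of the thread with one reply.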

Getting ready

  • This recipe assumes that you have followed the first chapter and have installed Hadoop. We will use the HADOOP_HOME variable to refer to the Hadoop installation folder.
  • Start Hadoop by following the instructions in the first chapter.
  • This recipe assumes you are aware of how Hadoop processing works. If you have not already done so, you should follow the recipe Writing a WordCount MapReduce sample, bundling it and running it using standalone Hadoop from Chapter 1, Getting Hadoop Up and Running in a Cluster.

How to do it...

The following steps describe how to parse the Tomcat e-mail list dataset, which has a complex data format, by writing a Hadoop input formatter:

  1. Download the Apache Tomcat developer list e-mail archives for the year 2012 available from http://mail-archives.apache.org/mod_mbox/tomcat-users/. We will call the destination folder DATA_DIR.
  2. Upload the data to HDFS by running the following commands from HADOOP_HOME. If /data already exists, clean it up first (see the note after these steps):
    >bin/hadoop dfs -mkdir /data
    >bin/hadoop dfs -mkdir /data/input2
    >bin/hadoop dfs -put <DATA_DIR>/*.mbox /data/input2
    
  3. Unzip the source code for this chapter (chapter6.zip). We will call that folder CHAPTER_6_SRC.
  4. Change the hadoop.home property in the CHAPTER_6_SRC/build.xml file to point to your Hadoop installation folder.
  5. Compile the source by running the ant build command from the CHAPTER_6_SRC folder.
  6. Copy the build/lib/hadoop-cookbook-chapter6.jar file to HADOOP_HOME.
  7. Run the MapReduce job through the following command from HADOOP_HOME:
    > bin/hadoop jar hadoop-cookbook-chapter6.jar chapter6.MLReceiveReplyProcessor /data/input2 /data/output6
    
  8. Read the results by running the following command:
    >bin/hadoop dfs -cat /data/output6/*
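
As mentioned in step 2, if the /data directory already exists from an earlier recipe, you need to clean it up before uploading the new dataset. The following is a minimal cleanup sketch, assuming the same old-style bin/hadoop dfs command syntax used in the steps above:

    >bin/hadoop dfs -rmr /data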
    

How it works...

As explained before, this dataset has data items that span multiple lines. Therefore, we have to write a custom data formatter to parse the data. You can find the source for this recipe in src/chapter6/MLReceiveReplyProcessor.java, src/chapter6/MboxFileFormat.java, and src/chapter6/MBoxFileReader.java.

When the Hadoop job starts, it invokes the formatter to parse the input files. We plug in the new formatter in the main() method through the job.setInputFormatClass() call shown in the following code snippet:

Job job = new Job(conf, "LogProcessingHitsByLink");
job.setJarByClass(MLReceiveReplyProcessor.class);
job.setMapperClass(AMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setReducerClass(AReducer.class);
job.setInputFormatClass(MboxFileFormat.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);

As shown by the following code, the new formatter creates a record reader, which is used by Hadoop to read input keys and values:

public class MboxFileFormat extends
  FileInputFormat<Text, Text>
{
  private MBoxFileReader boxFileReader = null;

  // Create the record reader that parses individual e-mails out of an mbox file
  public RecordReader<Text, Text> createRecordReader(
    InputSplit inputSplit, TaskAttemptContext attempt)
    throws IOException, InterruptedException
  {
    boxFileReader = new MBoxFileReader();
    boxFileReader.initialize(inputSplit, attempt);
    return boxFileReader;
  }
}
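
The listing above does not show how the format keeps Hadoop from splitting an mbox file in the middle of an e-mail. One common way to handle this, shown here only as a sketch and not as the book's actual implementation, is to override isSplitable() in MboxFileFormat so that each file is read by a single record reader:

// A possible refinement (an assumption, not part of the original listing):
// process each mbox file as a whole so no e-mail is cut in half by a split.
@Override
protected boolean isSplitable(JobContext context, Path file)
{
  return false;
}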

The following code snippet shows the record reader:

public class MBoxFileReader extends
  RecordReader<Text, Text>
{
  public void initialize(InputSplit inputSplit,
    TaskAttemptContext attempt) throws IOException, InterruptedException
  {
    // Open the mbox file for this input split from HDFS
    Path path = ((FileSplit) inputSplit).getPath();
    FileSystem fs = FileSystem.get(attempt.getConfiguration());
    FSDataInputStream fsStream = fs.open(path);
    reader = new BufferedReader(new InputStreamReader(fsStream));
  }
}

The initialize() method opens the input file from HDFS. The nextKeyValue() method, shown in the following snippet, then reads the stream and emits one e-mail at a time:

public boolean nextKeyValue() throws IOException,
  InterruptedException
{
  if (email == null)
  {
    return false;
  }
  count++;
  while ((line = reader.readLine()) != null)
  {
    Matcher matcher = pattern1.matcher(line);
    if (!matcher.matches())
    {
      // Still inside the current e-mail; keep accumulating lines
      email.append(line).append("\n");
    }
    else
    {
      // Start of a new e-mail; parse the one collected so far
      parseEmail(email.toString());
      email = new StringBuffer();
      email.append(line).append("\n");
      return true;
    }
  }
  // End of file; parse the last e-mail
  parseEmail(email.toString());
  email = null;
  return true;
}

Finally, the nextKeyValue() method parses the file one e-mail at a time and gives Hadoop access to a key and a value for each e-mail. The value holds the from address, subject, and date of the e-mail, separated by the # character.
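
The original listing does not include the body of parseEmail() or the other methods that a RecordReader must provide, such as getCurrentKey() and getCurrentValue(). The following is a minimal sketch of what they might look like, assuming parseEmail() stores the parsed subject and the from#subject#date string in two Text fields; the extractHeader() helper and the field names are hypothetical:

private Text currentKey = new Text();
private Text currentValue = new Text();

// Hypothetical parsing: pull the From, Subject, and Date headers out of the
// raw e-mail text and build the from#subject#date value described above.
private void parseEmail(String email)
{
  String from = extractHeader(email, "From:");
  String subject = extractHeader(email, "Subject:");
  String date = extractHeader(email, "Date:");
  currentKey.set(subject);
  currentValue.set(from + "#" + subject + "#" + date);
}

// Hypothetical helper: return the value of the first line starting with the header name
private String extractHeader(String email, String header)
{
  for (String line : email.split("\n"))
  {
    if (line.startsWith(header))
    {
      return line.substring(header.length()).trim();
    }
  }
  return "";
}

public Text getCurrentKey() throws IOException, InterruptedException
{
  return currentKey;
}

public Text getCurrentValue() throws IOException, InterruptedException
{
  return currentValue;
}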

The following code snippet shows the map task source code:

public void map(Object key, Text value,
  Context context) throws IOException, InterruptedException
{
  // The value has the format from#subject#date, as produced by the record reader
  String[] tokens = value.toString().split("#");
  String from = tokens[0];
  String subject = tokens[1];
  String date = tokens[2].replaceAll(",", "");
  // Strip "Re:" so replies share the same subject key as the original e-mail
  subject = subject.replaceAll("Re:", "");
  context.write(new Text(subject), new Text(date + "#" + from));
}

The map task receives each e-mail produced by the record reader as a separate key-value pair. It splits the value at the # characters and emits the subject as the key, with the date and the sender's address as the value.

Hadoop then collects all the key-value pairs, sorts them, and invokes the reducer once for each key. Since we use the e-mail subject as the key, each reducer receives all the information about a single e-mail thread. Each reducer then walks through the e-mails in the thread and finds out who sent the first e-mail and how many replies the thread received.

public void reduce(Text key, Iterable<Text> values, Context context)
  throws IOException, InterruptedException
{
  // The TreeMap is keyed by the message timestamp, so it keeps the e-mails
  // of this thread sorted by time.
  TreeMap<Long, String> replyData = new TreeMap<Long, String>();
  for (Text val : values)
  {
    String[] tokens = val.toString().split("#");
    if (tokens.length != 2)
    {
      throw new IOException("Unexpected token " + val.toString());
    }
    String from = tokens[1];
    try
    {
      // dateFormatter is a date parser field of the reducer class (for
      // example, a java.text.SimpleDateFormat), whose parse() can fail
      Date date = dateFormatter.parse(tokens[0]);
      replyData.put(date.getTime(), from);
    }
    catch (ParseException e)
    {
      throw new IOException("Cannot parse date: " + tokens[0], e);
    }
  }
  // The sender of the earliest e-mail is the owner of the thread
  String owner = replyData.get(replyData.firstKey());
  int replyCount = replyData.size();
  int selfReplies = 0;
  for (String from : replyData.values())
  {
    if (owner.equals(from))
    {
      selfReplies++;
    }
  }
  // Do not count the owner's own messages as replies
  replyCount = replyCount - selfReplies;
  context.write(new Text(owner), new Text(replyCount + "#" + selfReplies));
}