The datasets we have parsed so far were simple, with each data item contained in a single line. Therefore, we were able to use Hadoop's default parsing support to process those datasets. However, some datasets have much more complex formats.
In this recipe, we will analyze the Tomcat developer mailing list archives. In the archive, each e-mail is composed of multiple lines in the log file. Therefore, we will write a custom Hadoop input formatter to process the e-mail archive.
This recipe parses the complex e-mail list archives and finds the owner (the person who started the thread) and the number of replies received by each e-mail thread.
The following figure shows a summary of the execution. Here, the mapper emits the subject of the e-mail as the key, and the sender's e-mail address and date as the value. Hadoop then groups the data by the e-mail subject and sends all the data related to that thread to the same reducer.
The reducer then calculates the owner of the thread it received, and the number of replies received by the thread.
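The grouping step can be sketched outside Hadoop as well. The following standalone snippet (plain Java, no Hadoop; the sample subjects and addresses are invented for illustration) mimics how the shuffle phase collects all map outputs that share a subject at one reducer:

```java
import java.util.*;

public class ShuffleSketch {
    // Group map-output (subject, "date#from") pairs by subject, the way
    // Hadoop's shuffle delivers all values for one key to a single reducer.
    public static Map<String, List<String>> groupBySubject(List<String[]> mapOutput) {
        Map<String, List<String>> grouped = new TreeMap<>();
        for (String[] kv : mapOutput) {
            grouped.computeIfAbsent(kv[0], k -> new ArrayList<>()).add(kv[1]);
        }
        return grouped;
    }

    public static void main(String[] args) {
        List<String[]> mapOutput = Arrays.asList(
            new String[]{"Tomcat 7 startup", "01 Jan 2012#alice@example.com"},
            new String[]{"Tomcat 7 startup", "02 Jan 2012#bob@example.com"},
            new String[]{"mod_jk config",    "03 Jan 2012#carol@example.com"});
        Map<String, List<String>> grouped = groupBySubject(mapOutput);
        // Both messages of the "Tomcat 7 startup" thread land together.
        System.out.println(grouped.get("Tomcat 7 startup").size()); // 2
    }
}
```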
We will use the HADOOP_HOME environment variable to refer to the Hadoop installation folder. The following steps describe how to parse the Tomcat e-mail list dataset, which has a complex data format, by writing a Hadoop input formatter:

1. Download the Tomcat e-mail archives from http://mail-archives.apache.org/mod_mbox/tomcat-users/. We will call the destination folder DATA_DIR.
2. Upload the dataset to HDFS from HADOOP_HOME. If /data is already there, clean it up:

>bin/hadoop dfs -mkdir /data
>bin/hadoop dfs -mkdir /data/input2
>bin/hadoop dfs -put <DATA_DIR>/*.mbox /data/input2
3. Unzip the source code for this chapter (chapter6.zip). We will call that folder CHAPTER_6_SRC.
4. Change the hadoop.home property in the CHAPTER_6_SRC/build.xml file to point to your Hadoop installation folder.
5. Compile the source by running the ant build command from the CHAPTER_6_SRC folder.
6. Copy the build/lib/hadoop-cookbook-chapter6.jar file to HADOOP_HOME.
7. Run the job by executing the following command from HADOOP_HOME:

>bin/hadoop jar hadoop-cookbook-chapter6.jar chapter6.MLReceiveReplyProcessor /data/input2 /data/output6

8. Read the results by running the following command:

>bin/hadoop dfs -cat /data/output6/*
As explained before, this dataset has data items that span multiple lines. Therefore, we have to write a custom data formatter to parse the data. You can find the source for this recipe in src/chapter6/MLReceiveReplyProcessor.java, src/chapter6/MboxFileFormat.java, and src/chapter6/MBoxFileReader.java.
When the Hadoop job starts, it invokes the formatter to parse the input files. We add the new formatter via the main() method, as highlighted in the following code snippet:
Job job = new Job(conf, "LogProcessingHitsByLink");
job.setJarByClass(MLReceiveReplyProcessor.class);
job.setMapperClass(AMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setReducerClass(AReducer.class);
job.setInputFormatClass(MboxFileFormat.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
As shown by the following code, the new formatter creates a record reader, which is used by Hadoop to read input keys and values:
public class MboxFileFormat extends FileInputFormat<Text, Text> {
  private MBoxFileReader boxFileReader = null;

  public RecordReader<Text, Text> createRecordReader(
      InputSplit inputSplit, TaskAttemptContext attempt)
      throws IOException, InterruptedException {
    boxFileReader = new MBoxFileReader();
    boxFileReader.initialize(inputSplit, attempt);
    return boxFileReader;
  }
}
The following code snippet shows the record reader:
public class MBoxFileReader extends RecordReader<Text, Text> {
  public void initialize(InputSplit inputSplit, TaskAttemptContext attempt)
      throws IOException, InterruptedException {
    Path path = ((FileSplit) inputSplit).getPath();
    FileSystem fs = FileSystem.get(attempt.getConfiguration());
    FSDataInputStream fsStream = fs.open(path);
    reader = new BufferedReader(new InputStreamReader(fsStream));
  }
}
The initialize()
method reads the file from HDFS:
public boolean nextKeyValue() throws IOException, InterruptedException {
  if (email == null) {
    return false;
  }
  count++;
  while ((line = reader.readLine()) != null) {
    Matcher matcher = pattern1.matcher(line);
    if (!matcher.matches()) {
      email.append(line).append(" ");
    } else {
      parseEmail(email.toString());
      email = new StringBuffer();
      email.append(line).append(" ");
      return true;
    }
  }
  parseEmail(email.toString());
  email = null;
  return true;
}
Finally, the nextKeyValue() method parses the file and gives users access to the key and values for this dataset. Each value contains the from, subject, and date of an e-mail, separated by a # character.
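The source does not show pattern1, the regular expression used to detect where one message ends and the next begins. In the mbox format, messages are conventionally separated by a line starting with "From " followed by the sender and date, so a plausible stand-in (an assumption, not the book's actual pattern) looks like this:

```java
import java.util.regex.Pattern;

public class MboxBoundary {
    // Hypothetical stand-in for pattern1: an mbox "From " separator line,
    // e.g. "From alice@example.com Thu Jan 05 10:12:03 2012".
    public static final Pattern BOUNDARY = Pattern.compile("^From \\S+ .*");

    public static boolean isBoundary(String line) {
        return BOUNDARY.matcher(line).matches();
    }

    public static void main(String[] args) {
        System.out.println(isBoundary("From alice@example.com Thu Jan 05 10:12:03 2012")); // true
        System.out.println(isBoundary("Subject: Tomcat 7 startup")); // false
    }
}
```

Lines that do not match the boundary pattern are appended to the current message buffer, which is exactly what the nextKeyValue() loop above does.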
The following code snippet shows the map task source code:
public void map(Object key, Text value, Context context)
    throws IOException, InterruptedException {
  String[] tokens = value.toString().split("#");
  String from = tokens[0];
  String subject = tokens[1];
  String date = tokens[2].replaceAll(",", "");
  subject = subject.replaceAll("Re:", "");
  context.write(new Text(subject), new Text(date + "#" + from));
}
The map task receives each e-mail record emitted by the record reader as a separate key-value pair. It parses the value by splitting it on the # character, and emits the subject as the key, with the date and from fields as the value.
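The string handling in the map() method can be exercised on its own. The sketch below uses an invented sample record to illustrate the from#subject#date layout (it adds a trim() after stripping "Re:", which the book's mapper does not do, purely so the sample output reads cleanly):

```java
public class MapLogic {
    // Mirrors the mapper's tokenizing: split on '#', strip "Re:" from the
    // subject (plus a trim, an addition for tidy output), drop commas
    // from the date, and build the "date#from" value string.
    public static String[] parseRecord(String value) {
        String[] tokens = value.split("#");
        String from = tokens[0];
        String subject = tokens[1].replaceAll("Re:", "").trim();
        String date = tokens[2].replaceAll(",", "");
        return new String[]{subject, date + "#" + from};
    }

    public static void main(String[] args) {
        String[] kv = parseRecord("bob@example.com#Re: Tomcat 7 startup#Mon, 02 Jan 2012");
        System.out.println(kv[0]); // Tomcat 7 startup
        System.out.println(kv[1]); // Mon 02 Jan 2012#bob@example.com
    }
}
```

Stripping "Re:" is what makes a reply carry the same key as the original message, so the whole thread meets at one reducer.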
Then, Hadoop collects all key-value pairs, sorts them, and then invokes the reducer once for each key. Since we use the e-mail subject as the key, each reducer will receive all the information about each e-mail thread. Then, each reducer walks through all the e-mails and finds out who sent the first e-mail and how many replies have been received by each e-mail thread.
public void reduce(Text key, Iterable<Text> values, Context context)
    throws IOException, InterruptedException {
  TreeMap<Long, String> replyData = new TreeMap<Long, String>();
  for (Text val : values) {
    String[] tokens = val.toString().split("#");
    if (tokens.length != 2) {
      throw new IOException("Unexpected token " + val.toString());
    }
    String from = tokens[1];
    Date date = dateFormatter.parse(tokens[0]);
    replyData.put(date.getTime(), from);
  }
  String owner = replyData.get(replyData.firstKey());
  int replyCount = replyData.size();
  int selfReplies = 0;
  for (String from : replyData.values()) {
    if (owner.equals(from)) {
      selfReplies++;
    }
  }
  replyCount = replyCount - selfReplies;
  context.write(new Text(owner), new Text(replyCount + "#" + selfReplies));
}
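The reducer's thread accounting can be checked in isolation. The sketch below (the timestamps and addresses are invented samples) follows the same steps: a TreeMap orders the messages by send time, the earliest sender becomes the owner, and messages from the owner are counted out of the reply total:

```java
import java.util.TreeMap;

public class ThreadStats {
    // Returns {owner, replies-from-others, self-replies} for one thread,
    // given message timestamps mapped to sender addresses. TreeMap keeps
    // the keys sorted, so firstKey() is the earliest message.
    public static String[] summarize(TreeMap<Long, String> replyData) {
        String owner = replyData.get(replyData.firstKey());
        int selfReplies = 0;
        for (String from : replyData.values()) {
            if (owner.equals(from)) selfReplies++;
        }
        int replyCount = replyData.size() - selfReplies;
        return new String[]{owner, String.valueOf(replyCount),
                            String.valueOf(selfReplies)};
    }

    public static void main(String[] args) {
        TreeMap<Long, String> thread = new TreeMap<>();
        thread.put(1000L, "alice@example.com"); // first mail: thread owner
        thread.put(2000L, "bob@example.com");
        thread.put(3000L, "alice@example.com"); // owner replying to herself
        String[] stats = summarize(thread);
        System.out.println(stats[0]); // alice@example.com
        System.out.println(stats[1]); // 1
        System.out.println(stats[2]); // 2
    }
}
```

Note that, as in the recipe's reducer, the thread's opening message itself is counted among the owner's self-replies before being subtracted from the total.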