Often, datasets contain duplicate items that need to be eliminated to ensure the accuracy of the results. In this recipe, we use Hadoop to remove the duplicate mail records in the 20news dataset. These duplicates are the result of users cross-posting the same message to multiple newsgroups.
Install and deploy Hadoop MapReduce and HDFS. Export the HADOOP_HOME
environment variable to point to your Hadoop installation root folder.
Install Python on your Hadoop compute nodes, if Python is not already installed.
The following steps show how to remove the duplicate mails, caused by cross-posting across the lists, from the 20news dataset:

Download and extract the 20news dataset from http://qwone.com/~jason/20Newsgroups/20news-19997.tar.gz.

Upload the extracted data to HDFS:

>bin/hadoop fs -mkdir 20news-all
>bin/hadoop fs -put <extracted_folder> 20news-all
Use the MailPreProcessor.py Python script from the previous recipe, Data extract, cleaning, and format conversion using Hadoop Streaming, as the mapper. Extract the resource package for this chapter and copy the MailPreProcessor.py and MailPreProcessorReduce.py Python scripts to the $HADOOP_HOME folder.

Run the Streaming job with the following command:

>bin/hadoop jar ../contrib/streaming/hadoop-streaming-1.0.4.jar -input 20news-all -output 20news-dedup -mapper MailPreProcessor.py -reducer MailPreProcessorReduce.py -file MailPreProcessor.py -file MailPreProcessorReduce.py
Inspect the results using the following command:

>bin/hadoop dfs -cat 20news-dedup/part-00000
The Mapper Python script outputs the message ID as the key. We use the message ID to identify the duplicate messages that result from cross-posting across different newsgroups.
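As a rough illustration of what the mapper does (the shipped MailPreProcessor.py script in the chapter's resource package is the actual implementation; the function names below are made up for this sketch), a streaming mapper keyed by the Message-ID header could look like this:

```python
# Hypothetical sketch, not the shipped MailPreProcessor.py: extract the
# Message-ID header from a mail and emit it as the streaming key.
import sys


def message_id(header_lines):
    """Return the Message-ID header value, or None if no such header."""
    for line in header_lines:
        if line.lower().startswith("message-id:"):
            return line.split(":", 1)[1].strip()
    return None


def map_message(raw_message, out=sys.stdout):
    """Emit 'Message-ID<TAB>flattened message' for one mail."""
    lines = raw_message.splitlines()
    key = message_id(lines)
    if key is not None:
        # Hadoop Streaming splits mapper output on the first tab:
        # the text before the tab becomes the key for the shuffle.
        out.write("%s\t%s\n" % (key, " ".join(lines)))
```

Because the Message-ID becomes the shuffle key, all cross-posted copies of a message arrive at the same reducer, one after another.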
Hadoop Streaming provides the reducer input records of each key group, line by line, to the Streaming reducer application through the standard input. However, Hadoop Streaming does not have a mechanism to signal when it starts feeding records of a new key to the process; the Streaming reducer application has to keep track of the input key itself to identify new groups. Since we output the mapper results using the MessageID header as the key, the reducer input gets grouped by MessageID. Any group with more than one value (that is, message) per MessageID contains duplicates.
#!/usr/bin/env python
import sys

# Emit only the first message of each MessageID group, skipping the
# duplicates that follow it in the sorted reducer input.
currentKey = ""
for line in sys.stdin:
    line = line.strip()
    key, value = line.split('\t', 1)
    if currentKey == key:
        continue
    print('%s\t%s' % (key, value))
    currentKey = key
In the previous script, we use only the first value (message) of each record group and discard the others, which are the duplicates introduced by cross-posting.
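Streaming jobs can be smoke-tested without a cluster by reproducing the shuffle locally: sort the mapper's tab-separated records, then keep only the first record of each key group, exactly as the reducer script does. A minimal, self-contained simulation of that step (the sample records below are invented for illustration):

```python
# Local simulation of the shuffle-and-dedup step: sorting stands in for
# Hadoop's sort phase between map and reduce; the loop mirrors the
# key-tracking logic of the reducer script above.

def dedup(records):
    """Keep the first 'key\tvalue' record per key; drop the duplicates."""
    output = []
    current_key = None
    for record in sorted(records):      # stands in for Hadoop's sort phase
        key, value = record.split("\t", 1)
        if key == current_key:
            continue                    # duplicate of an already-kept message
        output.append(record)
        current_key = key
    return output


# Invented sample: <1@news> was cross-posted to two newsgroups.
sample = [
    "<1@news>\tposted to comp.graphics",
    "<2@news>\tposted to sci.space",
    "<1@news>\tposted to comp.windows.x",
]
result = dedup(sample)  # one record survives per Message-ID
```

Which copy of a cross-posted message survives depends on the sort order of the values; for deduplication purposes any single copy is acceptable.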