Data de-duplication using Hadoop Streaming

Often, datasets contain duplicate entries that need to be eliminated to ensure the accuracy of the results. In this recipe, we use Hadoop Streaming to remove the duplicate mail records from the 20news dataset. These duplicates exist because users cross-posted the same message to multiple newsgroups.

Getting ready

Install and deploy Hadoop MapReduce and HDFS. Export the HADOOP_HOME environment variable to point to your Hadoop installation root folder.

Install Python on your Hadoop compute nodes, if Python is not already installed.

How to do it...

The following steps show how to remove the duplicate mails, caused by cross-posting across several newsgroups, from the 20news dataset:

  1. Download and extract the 20news dataset from http://qwone.com/~jason/20Newsgroups/20news-19997.tar.gz.
  2. Upload the extracted data to HDFS. To save compute time and resources, you may use only a subset of the dataset:
    >bin/hadoop fs -mkdir 20news-all
    >bin/hadoop fs -put <extracted_folder> 20news-all
    
  3. We are going to use the MailPreProcessor.py Python script from the previous recipe, Data extract, cleaning and format conversion using Hadoop Streaming, as the mapper. Extract the resource package for this chapter and copy the MailPreProcessor.py and MailPreProcessorReduce.py Python scripts to the $HADOOP_HOME folder.
  4. Execute the following command:
    >bin/hadoop jar ../contrib/streaming/hadoop-streaming-1.0.4.jar \
      -input 20news-all \
      -output 20news-dedup \
      -mapper MailPreProcessor.py \
      -reducer MailPreProcessorReduce.py \
      -file MailPreProcessor.py \
      -file MailPreProcessorReduce.py
    
  5. Inspect the results using the following command:
    >bin/hadoop dfs -cat 20news-dedup/part-00000
    

How it works...

The mapper Python script outputs the Message-ID header of each mail as the key. We use the Message-ID to identify the duplicate messages that result from cross-posting across different newsgroups.
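
The MailPreProcessor.py script from the previous recipe performs the full header and content formatting; the following is only a minimal sketch of the key-emitting idea, not the actual script. It assumes that each map task receives a single news message on its standard input (with the default input format, each small 20news file forms its own input split), and the flattened message content used as the value is purely illustrative.

#!/usr/bin/env python
# A minimal sketch of a Message-ID-keyed mapper (not the MailPreProcessor.py
# script shipped with the chapter resources).
import sys

message_id = ""
content = []

for line in sys.stdin:
  line = line.strip()
  # The Message-ID header uniquely identifies a mail message, even when the
  # same message has been cross-posted to several newsgroups.
  if line.lower().startswith("message-id:"):
    message_id = line.split(":", 1)[1].strip()
  else:
    content.append(line)

if message_id:
  # Hadoop Streaming treats the text before the first tab of an output line
  # as the key and the rest of the line as the value.
  print('%s\t%s' % (message_id, " ".join(content)))

Because the Message-ID is the key, Hadoop's shuffle and sort deliver all copies of a cross-posted message to the same reducer as consecutive records.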

Hadoop Streaming provides the reducer input records of each key group, line by line, to the Streaming reducer application through the standard input. However, Hadoop Streaming does not have a mechanism to signal when it starts feeding the records of a new key to the process; the Streaming reducer application has to keep track of the input key to identify new groups. Since we output the mapper results using the Message-ID header as the key, the reducer input gets grouped by Message-ID. Any group with more than one value (that is, message) per Message-ID contains duplicates. The following MailPreProcessorReduce.py script keeps only the first record of each Message-ID group:

#!/usr/bin/env python
import sys

currentKey = ""

for line in sys.stdin:
  line = line.strip()
  # Reducer input lines are tab-separated <key, value> pairs, sorted by key
  key, value = line.split('\t', 1)
  # Records with the same key as the previous line belong to the same group
  # and are therefore duplicates; skip them
  if currentKey == key:
    continue
  # Remember the key and emit only the first record of each key group
  currentKey = key
  print('%s\t%s' % (key, value))

In the preceding script, we keep only the first value (message) of each record group and discard the others, which are the duplicates.
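
As a quick sanity check (not part of the original recipe), you can verify that no Message-ID appears more than once in the de-duplicated output by extracting the first tab-separated field of each record and looking for repeated keys. The following command prints nothing if the de-duplication succeeded:

>bin/hadoop dfs -cat 20news-dedup/part-* | cut -f1 | sort | uniq -d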

See also
