Data preprocessing (extraction, cleaning, and format conversion) using Hadoop Streaming and Python

Data preprocessing is an important and often required component of data analytics. It becomes even more important when consuming unstructured text data generated from multiple sources. Data preprocessing steps include operations such as cleaning the data, extracting important features from the data, removing duplicate items from the datasets, converting data formats, and many more.

Hadoop MapReduce provides an ideal environment to perform these tasks in parallel over massive datasets. Apart from the ability to implement Java MapReduce programs, Pig scripts, and Hive scripts to preprocess such data, Hadoop also provides several useful tools and features that we can utilize for these data preprocessing operations. One such feature is the support for different InputFormat classes, which gives us the ability to handle proprietary data formats by implementing custom InputFormat classes. Another is Hadoop Streaming, which allows us to use our favorite scripting languages to perform the actual data cleansing and extraction, while Hadoop parallelizes the computation across hundreds of compute and storage resources.
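For example, from the mapper's point of view, a Streaming computation is simply a program that reads its input records line by line from the standard input and writes tab-delimited key-value pairs to the standard output. The following is a minimal, generic sketch of such a mapper (it is not the script used in this recipe); it keys each input line by its first whitespace-delimited token:

#!/usr/bin/env python
# Generic Hadoop Streaming mapper sketch: read records from stdin and
# emit key<TAB>value pairs to stdout.
import sys

for line in sys.stdin:
    line = line.strip()
    if not line:
        continue
    # Use the first token as the key and the remainder of the line as the value.
    parts = line.split(None, 1)
    key = parts[0]
    value = parts[1] if len(parts) > 1 else ""
    print('%s\t%s' % (key, value))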

In this recipe, we are going to use Hadoop Streaming with a Python script-based mapper to perform data extraction and format conversion.

Getting ready

Install and deploy Hadoop MapReduce and HDFS. Export the HADOOP_HOME environment variable to point to your Hadoop installation root folder.

Install Python on your Hadoop compute nodes, if Python is not already installed.

How to do it...

The following steps show you how to clean and extract data from the 20news dataset and store the data as a tab-separated value (TSV) file:

  1. Download and extract the 20news dataset from http://qwone.com/~jason/20Newsgroups/20news-19997.tar.gz.
  2. Upload the extracted data to the HDFS. In order to save compute time and resources, you can use only a subset of the dataset. Use the following commands to upload the extracted data to the HDFS:
    >bin/hadoop fs -mkdir 20news-all
    >bin/hadoop fs -put <extracted_folder> 20news-all
    
  3. Extract the resource package for this chapter and copy the MailPreProcessor.py Python script to $HADOOP_HOME.
  4. Run the following Hadoop Streaming command (see the note after these steps on making the script executable):
    >bin/hadoop jar \
      ../contrib/streaming/hadoop-streaming-VERSION.jar \
      -input 20news-all \
      -output 20news-cleaned \
      -mapper MailPreProcessor.py \
      -file MailPreProcessor.py
    
  5. Inspect the results by using the following command:
    >bin/hadoop fs -cat 20news-cleaned/part-00000
    
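Note that, for the -mapper MailPreProcessor.py option in step 4 to work, Hadoop Streaming must be able to execute the script directly on the compute nodes. Typically this means that the script starts with a shebang line and has execute permission, as sketched below; if that is not possible in your environment, you can invoke the interpreter explicitly instead (for example, -mapper "python MailPreProcessor.py").

#!/usr/bin/env python
# The shebang line above should be the first line of MailPreProcessor.py so that
# Hadoop Streaming can launch the script as an executable. The file also needs
# execute permission (for example, chmod +x MailPreProcessor.py).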

How it works...

Hadoop uses the default TextInputFormat class as the input specification for the preceding computation. The TextInputFormat class generates a map task for each file in the input dataset and a map input record for each line of those files. Hadoop Streaming provides the input to the map application through the standard input.

line = sys.stdin.readline()
while line:
  ….
  if doneHeaders:
    # Collect the message content, line by line, once the headers are done
    list.append(line)
  elif line.find("Message-ID:") != -1:
    messageID = line[len("Message-ID:"):]
  ….
  elif line == "":
    # An empty line demarcates the headers from the message content
    doneHeaders = True

  line = sys.stdin.readline()

The previous Python code reads the input lines from the standard input until the end of the file is reached. We parse the headers of the newsgroup file until we encounter the empty line that demarcates the headers from the message contents. The message content is then read into a list, line by line.

value = ' '.join(list)
value = fromAddress + "\t" + …… + "\t" + value
print('%s\t%s' % (messageID, value))

The preceding code segment merges the message content into a single string and constructs the output value of the Streaming application as a tab-delimited set of selected headers followed by the message content. The output key is the value of the Message-ID header extracted from the input file. The output is written to the standard output by using a tab to delimit the key and the value.
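Putting the preceding excerpts together, a complete mapper along these lines could look like the following minimal sketch. It is not the exact MailPreProcessor.py shipped with the chapter's resource package; for illustration it assumes that only the From header is extracted in addition to Message-ID, and that each line is stripped of surrounding whitespace:

#!/usr/bin/env python
# Minimal sketch of a MailPreProcessor.py-style Streaming mapper. Each map task
# receives the lines of one newsgroup message file on its standard input and
# emits a single record: Message-ID<TAB>From<TAB>message body.
import sys

messageID = ""
fromAddress = ""
doneHeaders = False
contents = []

line = sys.stdin.readline()
while line:
    line = line.strip()
    if doneHeaders:
        contents.append(line)
    elif line.find("Message-ID:") != -1:
        messageID = line[len("Message-ID:"):].strip()
    elif line.find("From:") != -1:
        fromAddress = line[len("From:"):].strip()
    elif line == "":
        # The empty line marks the end of the headers.
        doneHeaders = True
    line = sys.stdin.readline()

# Merge the message body into a single line and emit the tab-delimited record.
value = ' '.join(contents)
value = fromAddress + "\t" + value
print('%s\t%s' % (messageID, value))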

There's more...

We can generate the output of the preceding computation in the Hadoop SequenceFile format by specifying SequenceFileOutputFormat as the OutputFormat class of the Streaming computation:

>bin/hadoop jar \
  ../contrib/streaming/hadoop-streaming-1.0.4.jar \
  -input 20news-all \
  -output 20news-seq \
  -mapper MailPreProcessor.py \
  -outputformat org.apache.hadoop.mapred.SequenceFileOutputFormat \
  -file MailPreProcessor.py

It is a good practice to store the data as SequenceFiles after the first pass over the input data, as SequenceFiles take less space and support compression. You can use bin/hadoop dfs -text <path to sequencefile> to dump the contents of a SequenceFile to text.

>bin/hadoop dfs -text 20news-seq/part-00000

However, for the preceding command to work, any Writable classes used in the SequenceFile should be available in the Hadoop classpath.

See also

  • Using Hadoop with legacy applications – Hadoop Streaming in Chapter 4, Developing Complex Hadoop MapReduce Applications.
  • Adding support for new input data formats – implementing a custom InputFormat in Chapter 4, Developing Complex Hadoop MapReduce Applications.
  • More information on Hadoop Streaming can be found at http://hadoop.apache.org/docs/r1.0.4/streaming.html.