Data preprocessing is an important, and often required, component of data analytics. It becomes even more important when consuming unstructured text data generated from multiple sources. Preprocessing steps include operations such as cleaning the data, extracting important features, removing duplicate items, converting data formats, and many more.
Hadoop MapReduce provides an ideal environment to perform these tasks in parallel on massive datasets. In addition to implementing Java MapReduce programs and Pig and Hive scripts to preprocess the data, Hadoop provides several useful tools and features that we can utilize for these data preprocessing operations. One such feature is the support for different InputFormat classes, which gives us the ability to handle proprietary data formats by implementing custom InputFormat classes. Another is Hadoop Streaming, which allows us to use our favorite scripting languages to perform the actual data cleansing and extraction, while Hadoop parallelizes the computation across hundreds of compute and storage resources.
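The contract of a Streaming mapper is simple: read records from the standard input and write tab-separated key-value pairs to the standard output. A minimal sketch of such a mapper follows; the word-count tokenization is purely illustrative and not part of this recipe:

```python
#!/usr/bin/env python
import sys

def map_line(line):
    """Turn one input line into a list of (key, value) pairs.

    Illustrative word-count logic; a real mapper implements
    whatever extraction the job actually needs.
    """
    return [(word, 1) for word in line.split()]

if __name__ == '__main__':
    # Hadoop Streaming feeds map input records through the standard
    # input, one line per record, and reads key<TAB>value lines
    # back from the standard output.
    for line in sys.stdin:
        for key, value in map_line(line):
            print('%s\t%s' % (key, value))
```

Any executable that follows this stdin/stdout convention, in any language, can serve as the mapper of a Streaming computation.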
In this recipe, we are going to use Hadoop Streaming with a Python script-based mapper to perform data extraction and format conversion.
Install and deploy Hadoop MapReduce and HDFS. Export the HADOOP_HOME
environment variable to point to your Hadoop installation root folder.
Install Python on your Hadoop compute nodes, if Python is not already installed.
The following steps show you how to clean and extract data from the 20news
dataset and store the data as a tab-separated value (TSV) file:
1. Download and extract the 20news dataset from http://qwone.com/~jason/20Newsgroups/20news-19997.tar.gz.
2. Upload the extracted data to HDFS:
>bin/hadoop fs -mkdir 20news-all
>bin/hadoop fs -put <extracted_folder> 20news-all
3. Copy the MailPreProcessor.py Python script to $HADOOP_HOME.
4. Run the following Hadoop Streaming command:
>bin/hadoop jar ../contrib/streaming/hadoop-streaming-VERSION.jar -input 20news-all -output 20news-cleaned -mapper MailPreProcessor.py -file MailPreProcessor.py
5. Inspect the results using the following command:
>bin/hadoop fs -cat 20news-cleaned/part-00000
Hadoop uses the default TextInputFormat class as the input specification for the preceding computation. The TextInputFormat class generates a map task for each file in the input dataset and a map input record for each line. Hadoop Streaming provides the input to the map application through the standard input.
line = sys.stdin.readline()
while line:
    ....
    if (doneHeaders):
        list.append(line)
    elif line.find("Message-ID:") != -1:
        messageID = line[len("Message-ID:"):]
    ....
    elif line == "":
        doneHeaders = True
    line = sys.stdin.readline()
The previous Python code reads the input lines from the standard input until the end of the file is reached. We parse the headers of the newsgroup file until we encounter the empty line that demarcates the headers from the message contents. The message content is then read into a list, line by line.
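The parsing loop can be sketched as a standalone function. This is a simplified approximation of what MailPreProcessor.py does, not the script itself; the real script extracts more headers than the two shown here:

```python
def parse_message(lines):
    """Split a newsgroup message into (message_id, from_address, body).

    Headers are consumed until the first empty line; every line after
    that is treated as message body. A sketch of the recipe's parsing
    logic, extracting only the Message-ID and From headers.
    """
    message_id = ''
    from_address = ''
    body = []
    done_headers = False
    for line in lines:
        line = line.rstrip('\n')
        if done_headers:
            body.append(line)
        elif line.startswith('Message-ID:'):
            message_id = line[len('Message-ID:'):].strip()
        elif line.startswith('From:'):
            from_address = line[len('From:'):].strip()
        elif line == '':
            done_headers = True
    return message_id, from_address, body
```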
value = ' '.join( list )
value = fromAddress + "\t" ……"\t" + value
print '%s\t%s' % (messageID, value)
The preceding code segment merges the message content into a single string and constructs the Streaming application's output value as a tab-delimited set of selected headers followed by the message content. The output key is the Message-ID header extracted from the input file. The output is written to the standard output, using a tab to delimit the key and the value.
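The output-formatting step can be sketched as follows; the function name and the choice of a single From header are illustrative assumptions, since the recipe's script includes several selected headers in the value:

```python
def format_record(message_id, from_address, body_lines):
    """Build one Streaming output record.

    The key (Message-ID) and the value are separated by a tab; the
    value itself is a tab-delimited header field followed by the
    body lines merged into one space-joined string. Sketch only.
    """
    value = from_address + '\t' + ' '.join(body_lines)
    return '%s\t%s' % (message_id, value)
```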
We can generate the output of the preceding computation in the Hadoop SequenceFile format by specifying SequenceFileOutputFormat as the OutputFormat class of the Streaming computation:
>bin/hadoop jar ../contrib/streaming/hadoop-streaming-1.0.4.jar -input 20news-all -output 20news-seq -mapper MailPreProcessor.py -outputformat org.apache.hadoop.mapred.SequenceFileOutputFormat -file MailPreProcessor.py
It is a good practice to store the data as SequenceFiles after the first pass over the input data, as SequenceFiles take less space and support compression. You can use bin/hadoop dfs -text <path to sequencefile> to dump the contents of a SequenceFile to text:
>bin/hadoop dfs -text 20news-seq/part-00000
However, for the preceding command to work, any Writable classes used in the SequenceFile must be available in the Hadoop classpath.