Using Hadoop with legacy applications – Hadoop Streaming

The Hadoop Streaming feature allows us to use any executable or script as the mapper or the reducer of a Hadoop MapReduce job. Hadoop Streaming enables rapid prototyping of MapReduce computations using Linux shell utility programs or scripting languages. It also allows users with little or no Java knowledge to utilize Hadoop to process data stored in HDFS.

In this recipe, we implement a mapper for our HTTP log processing application in Python and use a reducer based on the Hadoop Aggregate package.

How to do it...

The following are the steps to use a Python program as the mapper to process the HTTP server log files.

  1. Write the logProcessor.py Python script.
    #!/usr/bin/python
    import sys
    import re

    def main(argv):
      # Replace the placeholder with the actual regex for your log entry format
      regex = re.compile('<regex to parse log entries>')
      line = sys.stdin.readline()
      try:
        while line:
          fields = regex.match(line)
          if fields is not None:
            print "LongValueSum:" + fields.group(1) + "\t" + fields.group(7)
          line = sys.stdin.readline()
      except EOFError:
        return None

    if __name__ == "__main__":
      main(sys.argv)
  2. Use the following command from the Hadoop installation directory to execute the Streaming MapReduce computation.
    > bin/hadoop jar contrib/streaming/hadoop-streaming-1.0.2.jar -input indir -output outdir -mapper logProcessor.py -reducer aggregate -file logProcessor.py

How it works...

Each Map task launches the Hadoop Streaming executable as a separate process on the worker node. The input records of the mapper (in this case the lines of the log file, not broken into key-value pairs) are provided as lines to the standard input of that process. The executable should read and process the records from its standard input until the end of the file is reached.

line = sys.stdin.readline()
try:
  while line:
    ………
    line = sys.stdin.readline()
except EOFError:
  return None

Hadoop Streaming collects the output of the executable from the standard output of the process. Hadoop Streaming converts each line of the standard output to a key-value pair, where the text up to the first tab character is considered the key and the rest of the line the value. The logProcessor.py Python script outputs its key-value pairs according to this convention, as follows:

if fields is not None:
  print "LongValueSum:" + fields.group(1) + "\t" + fields.group(7)

In our example, we use the Hadoop Aggregate package for the reduction part of our computation. The Hadoop Aggregate package provides reducer and combiner implementations for simple aggregate operations such as sum, max, unique value count, and histogram. When used with Hadoop Streaming, the mapper output must specify the type of aggregation operation for the current computation as a prefix to the output key, which is LongValueSum in our example.
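For example, assuming group(1) of the regex captures the client hostname and group(7) captures the number of bytes served (as in a typical access log regex), the mapper output and the corresponding aggregated reducer output would look roughly like the following hypothetical, tab-separated lines:

Mapper output:
  LongValueSum:host1.example.com	2326
  LongValueSum:host2.example.com	4523
  LongValueSum:host1.example.com	1043

Reducer output:
  host1.example.com	3369
  host2.example.com	4523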

Hadoop Streaming also supports the distribution of files to the worker nodes using the -file option. We can use this option to distribute executable files, scripts, or any other auxiliary files needed for the Streaming computation. We can specify multiple -file options for a computation.

> bin/hadoop jar …… -mapper logProcessor.py -reducer aggregate -file logProcessor.py

There's more...

We can specify Java classes as the mapper, reducer, and/or combiner of a Hadoop Streaming computation. We can also specify an InputFormat and other options for a Hadoop Streaming computation, as shown in the sketch below.
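For instance, the following is a sketch of a command that explicitly specifies a Java InputFormat class and the number of reduce tasks for our log processing computation. The additional options shown here are illustrative, and TextInputFormat is in fact the default InputFormat.

> bin/hadoop jar contrib/streaming/hadoop-streaming-1.0.2.jar -input indir -output outdir -inputformat org.apache.hadoop.mapred.TextInputFormat -mapper logProcessor.py -reducer aggregate -numReduceTasks 2 -file logProcessor.py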

Hadoop Streaming also allows us to use Linux shell utility programs as mappers and reducers. The following example shows the usage of grep as the mapper of a Hadoop Streaming computation.

> bin/hadoop jar contrib/streaming/hadoop-streaming-1.0.2.jar -input indir -output outdir -mapper 'grep "wiki"'

Hadoop Streaming provides the reducer input records of each key group line by line to the standard input of the process executing the reducer executable. However, Hadoop Streaming does not have a mechanism to signal when it starts to feed the records of a new key to the process. Hence, the scripts or executables used as reducer programs should keep track of the last seen key of the input records to demarcate key group boundaries.
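The following is a minimal sketch of such a Streaming reducer written in Python. It assumes the mapper emits tab-separated key-value pairs with numeric values, and it sums the values of each key group, emitting a result line whenever the key changes; the field layout and the summing logic are illustrative.

#!/usr/bin/python
import sys

def main():
  last_key = None
  total = 0
  for line in sys.stdin:
    # Streaming provides one input record per line; key and value are tab-separated
    key, value = line.rstrip('\n').split('\t', 1)
    if last_key is not None and key != last_key:
      # A new key marks the end of the previous key group
      print last_key + "\t" + str(total)
      total = 0
    last_key = key
    total += long(value)
  if last_key is not None:
    print last_key + "\t" + str(total)

if __name__ == "__main__":
  main()

Such a script could be used in place of the aggregate reducer, for example with -reducer logReducer.py -file logReducer.py, where logReducer.py is a hypothetical file name for the above sketch.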

Extensive documentation on Hadoop Streaming is available at http://hadoop.apache.org/mapreduce/docs/current/streaming.html.

See also

  • The Data extract, cleaning, and format conversion using Hadoop streaming and python and Data de-duplication using Hadoop streaming recipes in Chapter 7, Mass Data Processing.