Running MapReduce jobs on Hadoop using streaming

In the previous recipe, we implemented a simple MapReduce job using the Java API of Hadoop. The use case was the same as the one in the recipes of Chapter 3, Programming Language Drivers, where we saw MapReduce implemented using Mongo client APIs in Python and Java. In this recipe, we will use Hadoop streaming to implement MapReduce jobs.

Streaming works by communicating with the mapper and reducer processes over stdin and stdout. Get more information on what Hadoop streaming is and how it works at http://hadoop.apache.org/docs/r1.2.1/streaming.html.
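To make the stdin/stdout contract concrete, the following is a minimal sketch of a generic streaming mapper for plain-text input (the classic word-count style). It is purely illustrative and is not used in this recipe, which relies on BSON-aware wrappers instead; the framework pipes input records to the script on stdin and reads key/value pairs back from stdout:
    #!/usr/bin/env python
    # Generic streaming mapper sketch: read records from stdin, write
    # tab-separated key/value pairs to stdout for the framework to group.
    import sys

    for line in sys.stdin:
        for word in line.strip().split():
            print '%s\t%d' % (word, 1)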

Getting ready

Refer to the Executing our first sample MapReduce job using the mongo-hadoop connector recipe to see how to set up Hadoop for development purposes and build the mongo-hadoop project using Gradle. As far as Python libraries are concerned, we will install the required library from source; however, you can use pip to carry out the setup if you do not wish to build from source. We will also see how to set up pymongo-hadoop using pip.

Refer to the Installing PyMongo recipe in Chapter 3, Programming Language Drivers, to see how to install PyMongo and pip.

How to do it…

  1. We will first build pymongo-hadoop from source. With the project cloned to the local filesystem, execute the following commands from the root of the cloned project:
    $ cd streaming/language_support/python
    $ sudo python setup.py install
    
  2. After you enter the password, the setup will continue and install pymongo-hadoop on your machine.
  3. That is all we need to do to build pymongo-hadoop from source. However, if you choose not to build from source, you can execute the following command from the operating system shell instead:
    $ sudo pip install pymongo_hadoop
    
  4. With pymongo-hadoop installed either way, we will now implement our mapper and reducer functions in Python.
  5. The mapper function is as follows:
    #!/usr/bin/env python

    import sys
    from pymongo_hadoop import BSONMapper
    def mapper(documents):
      print >> sys.stderr, 'Starting mapper'
      for doc in documents:
        # Emit one document per input document, keyed by the state name
        yield {'_id' : doc['state'], 'count' : 1}
      print >> sys.stderr, 'Mapper completed'

    # BSONMapper drives the generator and handles the BSON I/O with Hadoop
    BSONMapper(mapper)
  6. The reducer function is as follows:
    #!/usr/bin/env python

    import sys
    from pymongo_hadoop import BSONReducer
    def reducer(key, documents):
      print >> sys.stderr, 'Invoked reducer for key "', key, '"'
      count = 0
      for doc in documents:
        count += 1
      # Return (do not yield) one document carrying the final count for the key
      return {'_id' : key, 'count' : count}

    # BSONReducer wires the function into Hadoop's streaming I/O
    BSONReducer(reducer)
  7. The $HADOOP_HOME and $HADOOP_CONNECTOR_HOME environment variables should point to the base directory of Hadoop and the base directory of the mongo-hadoop connector project, respectively. Now, we will invoke the MapReduce job using the following command from the operating system shell. The code available on the book's website has the mapper and reducer Python scripts and a shell script that invokes them:
    $HADOOP_HOME/bin/hadoop jar \
    $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming* \
    -libjars $HADOOP_CONNECTOR_HOME/streaming/build/libs/mongo-hadoop-streaming-1.2.1-SNAPSHOT-hadoop_2.4.jar \
    -input /tmp/in \
    -output /tmp/out \
    -inputformat com.mongodb.hadoop.mapred.MongoInputFormat \
    -outputformat com.mongodb.hadoop.mapred.MongoOutputFormat \
    -io mongodb \
    -jobconf mongo.input.uri=mongodb://127.0.0.1:27017/test.postalCodes \
    -jobconf mongo.output.uri=mongodb://127.0.0.1:27017/test.pyMRStreamTest \
    -jobconf stream.io.identifier.resolver.class=com.mongodb.hadoop.streaming.io.MongoIdentifierResolver \
    -mapper mapper.py \
    -reducer reducer.py
    
  8. The mapper.py and reducer.py files should be present in the current directory when this command is executed.
  9. Execute the command; the MapReduce job should take some time to complete successfully. Then, open the Mongo shell by typing the following command at the operating system prompt and execute the subsequent query from the shell:
    $ mongo
    > db.pyMRStreamTest.find().sort({count:-1}).limit(5)
    
  10. Compare the output with what we got earlier when we executed the MapReduce jobs using Mongo's MapReduce framework in Chapter 3, Programming Language Drivers.

How it works…

Let's look at steps 5 and 6, where we wrote the mapper and reducer functions. We defined a map function that accepts all the input documents. We iterated through these documents and yielded, for each one, a document whose _id field is the name of the state and whose count field has the value 1. The number of documents yielded will therefore be the same as the total number of input documents.
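If we want to see this behavior outside Hadoop, the generator can be exercised by hand. The following is only a local sanity check and not part of the job; the sample documents are invented for illustration and only carry the state field that the mapper actually reads:
    # Local check of the mapper generator with made-up documents
    sample_docs = [
        {'state' : 'Maharashtra', 'city' : 'Mumbai'},
        {'state' : 'Maharashtra', 'city' : 'Pune'},
        {'state' : 'Karnataka', 'city' : 'Bangalore'},
    ]
    # Each input document yields one {'_id' : <state>, 'count' : 1} document
    for emitted in mapper(sample_docs):
        print >> sys.stderr, emitted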

Finally, we instantiated BSONMapper, which accepts the mapper function as a parameter. The function returns a generator object (because we call yield in the loop), and the BSONMapper class uses this generator to feed values to the MapReduce framework. All we need to remember is that the mapper function needs to return a generator and that we then instantiate the BSONMapper class, which is provided to us by the pymongo_hadoop module. Those intrigued enough can look at the source code in the streaming/language_support/python/pymongo_hadoop/mapper.py file of the project cloned on the local filesystem and see what it does; it is a small piece of code that is simple to understand.
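Conceptually, what BSONMapper does can be pictured along the following lines. This is not the actual pymongo_hadoop implementation, just a rough sketch of the idea using PyMongo's bson package: decode BSON documents arriving on stdin, hand them to our generator, and write every yielded document back to stdout as BSON:
    import sys
    from bson import BSON, decode_file_iter

    # Rough sketch only, not the real BSONMapper source
    def sketch_bson_mapper(mapper_func):
        # decode_file_iter lazily decodes BSON documents from stdin
        for emitted in mapper_func(decode_file_iter(sys.stdin)):
            # re-encode each yielded document and emit it to stdout
            sys.stdout.write(BSON.encode(emitted))
        sys.stdout.flush()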

In the reducer function, we get the key and a list of documents for this key as the parameters. The key is the same as the value of the _id field emitted from the map function. Here, we simply return a new document with _id as the name of the state and count as the number of documents for that state. Remember that here we return a document and do not emit one, as we did in the map function. Again, finally, we instantiated BSONReducer and passed the reducer function to it. The source code under the project cloned on our local filesystem, in the streaming/language_support/python/pymongo_hadoop/reducer.py file, has the implementation of the BSONReducer class.
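As with the mapper, a quick local check makes the contract obvious; the group of documents below is invented and stands in for what the framework would pass for the 'Maharashtra' key:
    # Local check of the reducer with an invented group for a single key
    grouped = [
        {'_id' : 'Maharashtra', 'count' : 1},
        {'_id' : 'Maharashtra', 'count' : 1},
        {'_id' : 'Maharashtra', 'count' : 1},
    ]
    # Returns one document with the aggregated count for the key, i.e. 3 here
    print >> sys.stderr, reducer('Maharashtra', grouped)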

We finally invoked the command from the shell to initiate the MapReduce job that uses streaming. A few things to note here: we need two JAR files, one from the share/hadoop/tools/lib directory of the Hadoop distribution and one from the mongo-hadoop connector, present in its streaming/build/libs/ directory. The input and output formats are com.mongodb.hadoop.mapred.MongoInputFormat and com.mongodb.hadoop.mapred.MongoOutputFormat, respectively.

As we saw earlier, stdout and stdin form the backbone of streaming. So, basically, we need to encode our BSON objects to write them to stdout, and we then need to read from stdin and convert the content back to BSON objects. For this purpose, the mongo-hadoop connector provides us with two framework classes, com.mongodb.hadoop.streaming.io.MongoInputWriter and com.mongodb.hadoop.streaming.io.MongoOutputReader, to encode to and decode from BSON objects, respectively. These classes extend org.apache.hadoop.streaming.io.InputWriter and org.apache.hadoop.streaming.io.OutputReader.

The value of the stream.io.identifier.resolver.class property is given as com.mongodb.hadoop.streaming.io.MongoIdentifierResolver. This class extends org.apache.hadoop.streaming.io.IdentifierResolver and gives us a chance to register our implementations of org.apache.hadoop.streaming.io.InputWriter and org.apache.hadoop.streaming.io.OutputReader with the framework. We also registered the output key and output value classes using our custom IdentifierResolver. Just remember to always use this resolver when you use streaming with the mongo-hadoop connector.

Finally, we specified the mapper and reducer Python scripts, which we discussed earlier. An important thing to remember is not to print logs to stdout from the mapper and reducer functions. stdout and stdin are the means of communication, and writing logs to them can result in undesirable behavior. As we saw in the example, write to standard error (stderr) or, alternatively, to a logfile.
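For instance, the two safe options look like this in a streaming script; the logfile path is only an illustrative choice, not something the recipe requires:
    import sys
    import logging

    # stdout is reserved for the framework, so send diagnostics elsewhere
    logging.basicConfig(filename='/tmp/mr_streaming.log', level=logging.INFO)

    print >> sys.stderr, 'processed a document'   # safe: standard error
    logging.info('processed a document')          # safe: goes to the logfile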

When using a multiline command in Unix, you can continue the command on the next line using the backslash (\) character. However, remember that there should be no spaces after the backslash.
