In the previous recipe, we implemented a simple MapReduce job using the Java API of Hadoop. The use case was the same as the one in the recipes of Chapter 3, Programming Language Drivers, where we saw MapReduce implemented using Mongo client APIs in Python and Java. In this recipe, we will use Hadoop streaming to implement MapReduce jobs.
Streaming works by having Hadoop communicate with the mapper and reducer programs over stdin and stdout. For more information on what Hadoop streaming is and how it works, see http://hadoop.apache.org/docs/r1.2.1/streaming.html.
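The following plain-Python sketch makes the stdin/stdout contract concrete. It shows a classic text-based streaming mapper and reducer and does not use the mongo-hadoop connector, which exchanges BSON rather than text lines.

It rests on a few assumptions:

- The input lines are tab-separated, and their second field holds a state name. This layout is hypothetical.
- Hadoop's default text streaming treats everything before the first tab as the key.
- Hadoop sorts the mapper output by key before passing it to the reducer.

```python
#!/usr/bin/env python3
"""Text-based Hadoop streaming sketch: count input lines per state.

Assumption: each input line is tab-separated and its second field holds
the state name (a hypothetical layout, not the postalCodes collection).
"""
import sys
from itertools import groupby


def map_lines(lines):
    """Emit 'state<TAB>1' for every input line that has a state field."""
    for line in lines:
        fields = line.rstrip("\n").split("\t")
        if len(fields) > 1:
            yield "%s\t1" % fields[1]


def reduce_lines(lines):
    """Sum the counts per key; relies on Hadoop delivering lines sorted by key."""
    pairs = (line.rstrip("\n").split("\t", 1) for line in lines)
    for key, group in groupby(pairs, key=lambda kv: kv[0]):
        total = sum(int(value) for _, value in group)
        yield "%s\t%d" % (key, total)


def main(argv):
    # Data arrives only on stdin and leaves only on stdout.
    step = map_lines if argv[1] == "map" else reduce_lines
    for out in step(sys.stdin):
        sys.stdout.write(out + "\n")


if __name__ == "__main__" and sys.argv[1:2] in (["map"], ["reduce"]):
    main(sys.argv)
```

You can simulate the pipeline locally without Hadoop. Save the sketch under a hypothetical name such as `count_states.py`, then run `cat input.tsv | python3 count_states.py map | sort | python3 count_states.py reduce`. Here `sort` stands in for Hadoop's shuffle-and-sort phase.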
Refer to the Executing our first sample MapReduce job using the mongo-hadoop connector recipe to see how to set up Hadoop for development purposes and how to build the mongo-hadoop project using Gradle. For the Python side, we will build the required library, pymongo-hadoop, from source. If you do not wish to build from source, you can install it using pip instead; we will see how to do that as well.
Refer to the Installing PyMongo recipe in Chapter 3, Programming Language Drivers, to see how to install PyMongo and pip.
1. First, install pymongo-hadoop from source. With the mongo-hadoop project cloned to the local filesystem, execute the following commands from the root of the cloned project:
$ cd streaming/language_support/python
$ sudo python setup.py install
2. These commands install pymongo-hadoop on your machine.
3. That is all it takes to install pymongo-hadoop from source. However, if you chose not to build from source, you can execute the following command from the operating system shell instead:
$ sudo pip install pymongo_hadoop
4. Having installed pymongo-hadoop either way, we will now implement our mapper and reducer functions in Python.
5. The mapper function is as follows:
#!/usr/bin/env python
import sys
from pymongo_hadoop import BSONMapper

def mapper(documents):
    # Log to stderr; stdout carries the BSON data
    sys.stderr.write('Starting mapper\n')
    for doc in documents:
        # One output document per input document, keyed by state
        yield {'_id': doc['state'], 'count': 1}
    sys.stderr.write('Mapper completed\n')

BSONMapper(mapper)
6. The reducer function is as follows:
#!/usr/bin/env python
import sys
from pymongo_hadoop import BSONReducer

def reducer(key, documents):
    # Log to stderr; stdout carries the BSON data
    sys.stderr.write('Invoked reducer for key "%s"\n' % key)
    count = 0
    for doc in documents:
        count += 1
    return {'_id': key, 'count': count}

BSONReducer(reducer)
7. The $HADOOP_HOME and $HADOOP_CONNECTOR_HOME environment variables should point to the base directory of Hadoop and the base directory of the mongo-hadoop connector project, respectively. Now, invoke the MapReduce job using the following command from the operating system shell. The code available on the book's website includes the mapper and reducer Python scripts and a shell script that invokes them:
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming* \
-libjars $HADOOP_CONNECTOR_HOME/streaming/build/libs/mongo-hadoop-streaming-1.2.1-SNAPSHOT-hadoop_2.4.jar \
-input /tmp/in \
-output /tmp/out \
-inputformat com.mongodb.hadoop.mapred.MongoInputFormat \
-outputformat com.mongodb.hadoop.mapred.MongoOutputFormat \
-io mongodb \
-jobconf mongo.input.uri=mongodb://127.0.0.1:27017/test.postalCodes \
-jobconf mongo.output.uri=mongodb://127.0.0.1:27017/test.pyMRStreamTest \
-jobconf stream.io.identifier.resolver.class=com.mongodb.hadoop.streaming.io.MongoIdentifierResolver \
-mapper mapper.py \
-reducer reducer.py
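Ensure that the mapper.py and reducer.py files are present in the current directory when executing this command.
8. Once the job completes, query the output collection from the mongo shell to see the five states with the most documents:
$ mongo
> db.pyMRStreamTest.find().sort({count:-1}).limit(5)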
Let's look at steps 5 and 6, where we wrote the mapper and reducer functions. We defined a mapper function that accepts an iterable of the input documents. We iterate through these documents and, for each one, yield a new document. Its _id field holds the key (the value of the state field), and its count field has the value 1. The number of documents yielded is therefore the same as the total number of input documents.
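Because the mapper is an ordinary generator function, you can check its logic without Hadoop. The following sketch copies the body of the mapper function shown earlier, without the BSONMapper call, so pymongo_hadoop is not needed. It feeds the mapper a few hypothetical documents with a state field. The check confirms that it yields exactly one {'_id': state, 'count': 1} document per input document.

```python
import sys


def mapper(documents):
    """Same logic as mapper.py, without the BSONMapper wiring."""
    sys.stderr.write('Starting mapper\n')
    for doc in documents:
        # One output document per input document, keyed by state.
        yield {'_id': doc['state'], 'count': 1}
    sys.stderr.write('Mapper completed\n')


# Hypothetical sample documents; only the 'state' field matters to the mapper.
sample = [
    {'city': 'A', 'state': 'Kerala'},
    {'city': 'B', 'state': 'Goa'},
    {'city': 'C', 'state': 'Kerala'},
]

output = list(mapper(sample))
print(output)
print(len(output) == len(sample))
```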
Finally, we instantiated BSONMapper, passing the mapper function as the parameter. Because mapper uses yield, calling it returns a generator object. The BSONMapper class then uses this generator to feed the values to the MapReduce framework. All we need to remember is two things:

- The mapper function must return a generator, which it does because it calls yield in the loop.
- We must then instantiate the BSONMapper class, which the pymongo_hadoop module provides.

If you are curious, look at the streaming/language_support/python/pymongo_hadoop/mapper.py file in the project cloned to your local filesystem to see what it does. It is a small piece of code that is simple to understand.
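The following simplified sketch illustrates this division of labor with a hypothetical driver function, run_mapper. It plays the role that BSONMapper plays in the real module: it calls the mapper, receives a generator, and pushes every yielded document to an output callback. It is not the pymongo_hadoop implementation, which also reads BSON documents from stdin and writes the results to stdout. The sketch also shows that calling a generator function runs none of its body until the generator is iterated.

```python
def mapper(documents):
    # Same logic as the recipe's mapper; yield makes this a generator function.
    for doc in documents:
        yield {'_id': doc['state'], 'count': 1}


def run_mapper(map_fn, documents, emit):
    """Hypothetical stand-in for BSONMapper: drive the generator, emit each result."""
    results = map_fn(documents)  # a generator object; no mapping has happened yet
    for out_doc in results:      # each step resumes map_fn up to its next yield
        emit(out_doc)


def tracked(documents, seen):
    # Wraps the input and records each document's state as it is actually read.
    for doc in documents:
        seen.append(doc['state'])
        yield doc


SAMPLE = [{'state': 'Goa'}, {'state': 'Assam'}]  # hypothetical input documents

probe = []
pending = mapper(tracked(SAMPLE, probe))
print(probe)  # prints []: creating the generator read no input

seen, collected = [], []
run_mapper(mapper, tracked(SAMPLE, seen), collected.append)
print(seen)
print(collected)
```

This laziness is what lets a streaming mapper process its input one document at a time instead of loading everything into memory first.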
For the reducer function, we receive the key and, as the value, the documents for that key. The key is the value of the _id field in the documents yielded by the mapper function. We simply return a new document with _id set to the name of the state and count set to the number of documents for this state. Remember that here we return a single document rather than yielding documents as we did in the mapper function. Finally, as before, we instantiated BSONReducer, passing it the reducer function. The BSONReducer class is implemented in the streaming/language_support/python/pymongo_hadoop/reducer.py file in the project cloned to your local filesystem.
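The following sketch simulates locally what Hadoop does between the two phases, to show how the reducer receives its input. It sorts the mapper's output documents by _id, groups them, and calls the reducer once per key with that key's documents.

A few caveats apply:

- The shuffle_and_reduce helper is hypothetical.
- itertools.groupby stands in for Hadoop's shuffle-and-sort, which the real framework performs across the cluster.
- The BSON encoding and decoding done by the real pymongo_hadoop classes is left out.

```python
from itertools import groupby
from operator import itemgetter


def reducer(key, documents):
    """Same logic as reducer.py, without the BSONReducer wiring."""
    count = 0
    for _doc in documents:
        count += 1
    return {'_id': key, 'count': count}


def shuffle_and_reduce(mapped_docs, reduce_fn):
    """Hypothetical local stand-in for Hadoop's sort/group step plus the reduce calls."""
    ordered = sorted(mapped_docs, key=itemgetter('_id'))
    # Each group iterator is consumed by reduce_fn before groupby advances.
    return [reduce_fn(key, group) for key, group in groupby(ordered, key=itemgetter('_id'))]


# Mapper output for four hypothetical input documents.
mapped = [
    {'_id': 'Kerala', 'count': 1},
    {'_id': 'Goa', 'count': 1},
    {'_id': 'Kerala', 'count': 1},
    {'_id': 'Kerala', 'count': 1},
]
results = shuffle_and_reduce(mapped, reducer)
print(results)
```

The reducer counts documents rather than summing their count fields. This gives the same result here because every mapper output document has a count of 1.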
We finally invoked the command from the shell to start the MapReduce job that uses streaming. Note that we need two JAR files:

- the Hadoop streaming JAR, in the share/hadoop/tools/lib directory of the Hadoop distribution
- the mongo-hadoop streaming JAR, in the streaming/build/libs/ directory of the connector project

The input and output formats are com.mongodb.hadoop.mapred.MongoInputFormat and com.mongodb.hadoop.mapred.MongoOutputFormat, respectively.
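If you launch the job from Python rather than from a shell script, you can assemble the same arguments programmatically. The sketch below only builds the argument list; it does not run Hadoop.

A few assumptions apply:

- The function build_streaming_args is hypothetical, and so are the install directories in the usage example.
- The JAR path, formats, and URIs are the ones from the command shown earlier.
- The streaming JAR path is passed in explicitly because Python does not expand the shell's hadoop-streaming* glob. The example's JAR name is also hypothetical.

```python
import os
import shlex

# Values taken from the recipe's command.
CONNECTOR_JAR = "streaming/build/libs/mongo-hadoop-streaming-1.2.1-SNAPSHOT-hadoop_2.4.jar"
RESOLVER = "com.mongodb.hadoop.streaming.io.MongoIdentifierResolver"


def build_streaming_args(hadoop_home, connector_home, streaming_jar,
                         input_uri, output_uri):
    """Return the argv list for the mongo-hadoop streaming job (hypothetical helper)."""
    return [
        os.path.join(hadoop_home, "bin", "hadoop"), "jar", streaming_jar,
        "-libjars", os.path.join(connector_home, CONNECTOR_JAR),
        "-input", "/tmp/in",
        "-output", "/tmp/out",
        "-inputformat", "com.mongodb.hadoop.mapred.MongoInputFormat",
        "-outputformat", "com.mongodb.hadoop.mapred.MongoOutputFormat",
        "-io", "mongodb",
        "-jobconf", "mongo.input.uri=" + input_uri,
        "-jobconf", "mongo.output.uri=" + output_uri,
        "-jobconf", "stream.io.identifier.resolver.class=" + RESOLVER,
        "-mapper", "mapper.py",
        "-reducer", "reducer.py",
    ]


args = build_streaming_args(
    hadoop_home="/opt/hadoop",                # hypothetical install locations
    connector_home="/opt/mongo-hadoop",
    streaming_jar="/opt/hadoop/share/hadoop/tools/lib/hadoop-streaming.jar",
    input_uri="mongodb://127.0.0.1:27017/test.postalCodes",
    output_uri="mongodb://127.0.0.1:27017/test.pyMRStreamTest",
)
print(shlex.join(args))  # shell-quoted preview; shlex.join needs Python 3.8+
```

To start the job, you could pass the list to subprocess.run(args, check=True). Run it from the directory that contains mapper.py and reducer.py.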
As we saw earlier, stdin and stdout form the backbone of streaming. BSON objects must therefore be encoded into bytes before they are written to one end of this channel, and decoded back into BSON objects when they are read from the other end. For this purpose, the mongo-hadoop connector provides two framework classes, com.mongodb.hadoop.streaming.io.MongoInputWriter and com.mongodb.hadoop.streaming.io.MongoOutputReader. They encode BSON objects to the stream and decode them from it, respectively. These classes extend org.apache.hadoop.streaming.io.InputWriter and org.apache.hadoop.streaming.io.OutputReader, respectively.
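What travels over stdin and stdout in this setup is a sequence of BSON documents written back to back. BSON is self-delimiting. Every document starts with a 4-byte little-endian int32 that gives its total length in bytes, including those 4 bytes, and ends with a 0x00 byte.

The following standard-library sketch shows how a reader could split such a byte stream back into individual documents. It illustrates only the framing. It is not the code of MongoOutputReader or pymongo_hadoop, and it leaves the decoding of each document to a BSON library.

```python
import io
import struct


def iter_bson_frames(stream):
    """Yield raw BSON documents (as bytes) read back to back from a binary stream."""
    while True:
        header = stream.read(4)
        if not header:
            return  # clean end of stream
        if len(header) < 4:
            raise ValueError("truncated BSON length prefix")
        (length,) = struct.unpack("<i", header)  # total size, prefix included
        if length < 5:
            raise ValueError("invalid BSON document length: %d" % length)
        body = stream.read(length - 4)
        if len(body) != length - 4 or body[-1:] != b"\x00":
            raise ValueError("truncated or malformed BSON document")
        yield header + body


# Two hand-encoded documents: {} and {"a": 1} (an int32 element, type 0x10).
empty_doc = b"\x05\x00\x00\x00\x00"
a_doc = b"\x0c\x00\x00\x00" + b"\x10a\x00" + b"\x01\x00\x00\x00" + b"\x00"

frames = list(iter_bson_frames(io.BytesIO(empty_doc + a_doc)))
print([len(f) for f in frames])
```

In a real script you would pass sys.stdin.buffer and decode each frame with a BSON library, for example bson.decode from PyMongo's bson package.

The framing also explains why stray text on stdout is harmful. The reader would interpret the first four bytes of a log message as a document length.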
The stream.io.identifier.resolver.class property is set to com.mongodb.hadoop.streaming.io.MongoIdentifierResolver. This class extends org.apache.hadoop.streaming.io.IdentifierResolver. It gives the connector a chance to register its implementations of org.apache.hadoop.streaming.io.InputWriter and org.apache.hadoop.streaming.io.OutputReader with the framework. This resolver interprets the mongodb identifier passed with the -io option. It also registers the output key and output value classes. Just remember to always use this resolver when using streaming with the mongo-hadoop connector.
Finally, we passed the mapper.py and reducer.py Python scripts, which we discussed earlier, using the -mapper and -reducer options. An important thing to remember is not to print logs to stdout from the mapper and reducer functions. The scripts exchange their BSON data with Hadoop over stdout and stdin. Writing logs to stdout therefore mixes text into that data stream and can lead to undesirable behavior. As we did in the example, write logs to standard error (stderr) or, alternatively, to a logfile.
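One way to follow this advice in Python is to route all diagnostics through the standard logging module and configure it to write to stderr, which keeps stdout reserved for data. Here is a minimal sketch. The logger name, level, and format are arbitrary choices, not something pymongo_hadoop requires.

```python
import io
import logging
import sys
from contextlib import redirect_stdout

LOG = logging.getLogger("streaming.mapper")


def configure_logging(stream=None):
    """Send log records to stderr (or the given stream), never to stdout."""
    LOG.setLevel(logging.INFO)
    LOG.propagate = False  # avoid a second copy via the root logger
    handler = logging.StreamHandler(stream if stream is not None else sys.stderr)
    handler.setFormatter(logging.Formatter("%(levelname)s %(message)s"))
    LOG.handlers[:] = [handler]  # replace any handler configured earlier


def mapper(documents):
    # Same logic as the recipe's mapper, logging through LOG instead of print.
    LOG.info("Starting mapper")
    for doc in documents:
        yield {'_id': doc['state'], 'count': 1}
    LOG.info("Mapper completed")


# Demonstration: capture both streams and confirm that stdout stays clean.
captured_out, captured_err = io.StringIO(), io.StringIO()
with redirect_stdout(captured_out):
    configure_logging(captured_err)
    results = list(mapper([{'state': 'Goa'}]))

print(repr(captured_out.getvalue()))
print(captured_err.getvalue())
```

In the actual script, you would call configure_logging() with no argument before instantiating BSONMapper.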
When writing a multiline command in Unix, you can continue the command on the next line by ending the current line with a backslash (\). However, remember that there must be no spaces after the \.
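A space after the trailing backslash is invisible, but it makes the backslash escape the space instead of the newline, so the command ends early. If you keep the command in a script file, a quick check can catch this. The small sketch below reports the line numbers where a backslash is followed only by spaces or tabs. The function name find_broken_continuations is hypothetical.

```python
import re

# A backslash followed by one or more spaces/tabs at the very end of a line.
TRAILING_AFTER_BACKSLASH = re.compile(r"\\[ \t]+$")


def find_broken_continuations(script_text):
    """Return 1-based line numbers whose continuation backslash has trailing whitespace."""
    return [
        number
        for number, line in enumerate(script_text.splitlines(), start=1)
        if TRAILING_AFTER_BACKSLASH.search(line)
    ]


# Line 2 has a space after its backslash; lines 1 and 3 are fine.
sample = "hadoop jar streaming.jar \\\n  -input /tmp/in \\ \n  -output /tmp/out\n"
print(find_broken_continuations(sample))
```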