In this recipe, we will write our first MapReduce job using the Hadoop MapReduce API and run it using the mongo-hadoop connector, which sources the data from MongoDB. Refer to the MapReduce in Mongo using a Java client recipe in Chapter 3, Programming Language Drivers, to see how MapReduce is implemented using a Java client, and how the test data and problem statement were set up.
Refer to the previous recipe to set up the mongo-hadoop connector. The prerequisites of the Executing our first sample MapReduce job using the mongo-hadoop connector recipe (present earlier in this chapter) and the MapReduce in Mongo using a Java client recipe in Chapter 3, Programming Language Drivers, are all we need for this recipe. This is a Maven project, so Maven needs to be installed and set up. Refer to the Connecting to a single node from a Java client recipe in Chapter 1, Installing and Starting the MongoDB Server, where we gave the steps to set up Maven on Windows. This project, however, is built on Ubuntu Linux, and the following is the command you need to execute from the operating system shell to install Maven:
$ sudo apt-get install maven
The first step is to get the mongo-hadoop-mapreduce-test project, which can be downloaded from the book's website. The project is targeted at achieving the same use case that we achieved in the recipes in Chapter 3, Programming Language Drivers, where we used MongoDB's MapReduce framework; we had invoked that MapReduce job using the Python and Java clients on earlier occasions.

With the current directory in the root of the project, where the pom.xml file is present, execute the following command:

$ mvn clean package

The mongo-hadoop-mapreduce-test-1.0.jar file will be built and kept in the target directory.

To execute the MapReduce job on the postalCodes collection, execute the following command with the current directory still in the root of the mongo-hadoop-mapreduce-test project we just built:

~/hadoop-binaries/hadoop-2.4.0/bin/hadoop jar \
  target/mongo-hadoop-mapreduce-test-1.0.jar \
  com.packtpub.mongo.cookbook.TopStateMapReduceEntrypoint \
  -Dmongo.input.split_size=8 \
  -Dmongo.job.verbose=true \
  -Dmongo.input.uri=mongodb://localhost:27017/test.postalCodes \
  -Dmongo.output.uri=mongodb://localhost:27017/test.postalCodesHadoopmrOut
$ mongo
> db.postalCodesHadoopmrOut.find().sort({count:-1}).limit(5)
We have kept the classes very simple, with the fewest possible requirements. We have just three classes in our project: TopStateMapReduceEntrypoint, TopStateReducer, and TopStatesMapper, all in the same package, com.packtpub.mongo.cookbook. The map function of the mapper class just writes a key-value pair to the context; the key is the name of the state, and the value is the integer 1. The following line of code is from the mapper:
context.write(new Text((String)value.get("state")), new IntWritable(1));
The reducer receives each state name as the key, together with an Iterable of the integer value 1, one for each document with that state. All we do is write to the context the same state name and the sum of these values. Since Iterable has no size method that can give the count in constant time, we are left with adding up the ones we get in linear time. The following is the code snippet from the reducer:
int sum = 0;
for (IntWritable value : values) {
  sum += value.get();
}
BSONObject object = new BasicBSONObject();
object.put("count", sum);
context.write(text, new BSONWritable(object));
We write to the context the text key, which is the state name, and a value that is a BSON document containing the count. The mongo-hadoop connector is then responsible for writing this to our output collection, postalCodesHadoopmrOut. Each document has an _id field whose value is the same as the key emitted from the mapper. Thus, when we execute the following query, we get the top five states with the greatest number of cities in our database:
> db.postalCodesHadoopmrOut.find().sort({count:-1}).limit(5)
{ "_id" : "Maharashtra", "count" : 6446 }
{ "_id" : "Kerala", "count" : 4684 }
{ "_id" : "Tamil Nadu", "count" : 3784 }
{ "_id" : "Andhra Pradesh", "count" : 3550 }
{ "_id" : "Karnataka", "count" : 3204 }
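The emit-ones-then-sum pattern that the mapper and reducer implement can be illustrated outside Hadoop with plain Java. The following sketch (a simulation with made-up toy data, not the real postalCodes collection or the connector's actual execution path) reproduces the same grouping, summing, and descending sort:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TopStatesSimulation {
    public static void main(String[] args) {
        // Toy stand-in for the postalCodes collection: one entry per city document.
        List<String> states = List.of(
            "Maharashtra", "Kerala", "Maharashtra",
            "Tamil Nadu", "Maharashtra", "Kerala");

        // Map + shuffle + reduce: the mapper emits (state, 1) per document,
        // and the reducer sums the ones for each state, in linear time.
        Map<String, Integer> counts = new HashMap<>();
        for (String state : states) {
            counts.merge(state, 1, Integer::sum);
        }

        // Equivalent of find().sort({count:-1}).limit(5) on the output collection.
        counts.entrySet().stream()
            .sorted(Map.Entry.<String, Integer>comparingByValue().reversed())
            .limit(5)
            .forEach(e -> System.out.println(e.getKey() + " " + e.getValue()));
        // Prints:
        // Maharashtra 3
        // Kerala 2
        // Tamil Nadu 1
    }
}
```

The `merge` call plays the role of the reducer's summing loop; Hadoop merely distributes the same logic across splits before the shuffle brings each key's values together.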
Finally, the main
method of the main entry point class is as follows:
Configuration conf = new Configuration();
MongoConfig config = new MongoConfig(conf);
config.setInputFormat(MongoInputFormat.class);
config.setMapperOutputKey(Text.class);
config.setMapperOutputValue(IntWritable.class);
config.setMapper(TopStatesMapper.class);
config.setOutputFormat(MongoOutputFormat.class);
config.setOutputKey(Text.class);
config.setOutputValue(BSONWritable.class);
config.setReducer(TopStateReducer.class);
ToolRunner.run(conf, new TopStateMapReduceEntrypoint(), args);
All we do is wrap the org.apache.hadoop.conf.Configuration object with a com.mongodb.hadoop.MongoConfig instance to set the various properties, and then submit the MapReduce job for execution using ToolRunner.
In this recipe, we executed a simple MapReduce job on Hadoop using the Hadoop API, sourcing the data from MongoDB and writing the results back to a MongoDB collection. What if we want to write the map and reduce functions in a different language? Fortunately, this is possible using a concept called Hadoop streaming, where stdout is used as the means of communication between the program and the Hadoop MapReduce framework. In the next recipe, we will demonstrate how to use Python to implement the same use case as this recipe's, using Hadoop streaming.