In this recipe, we will see how to build the mongo-hadoop connector from source and set up Hadoop just for the purpose of running the examples in standalone mode. The connector is the backbone that lets Hadoop MapReduce jobs run against data stored in MongoDB.
There are various distributions of Hadoop; however, we will use Apache Hadoop (http://hadoop.apache.org/). The installation will be done on Ubuntu Linux. In production, Apache Hadoop always runs on Linux; Windows is not tested for production systems, though it can be used for development. If you are a Windows user, I would recommend that you install a virtualization environment such as VirtualBox (https://www.virtualbox.org/), set up a Linux environment, and then install Hadoop on it. Setting up VirtualBox and Linux on it is not shown in this recipe, but it is not a tedious task. The prerequisites for this recipe are a machine with a Linux operating system and an Internet connection. The version that we will set up here is Apache Hadoop 2.4.0, which, at the time of writing of this book, is the latest version supported by the mongo-hadoop connector.
A Git client is needed to clone the repository of the mongo-hadoop connector to the local filesystem. Refer to http://git-scm.com/book/en/Getting-Started-Installing-Git to install Git.
You will also need MongoDB to be installed on your operating system. Refer to http://docs.mongodb.org/manual/installation/ and install it accordingly. Start the mongod instance listening to port 27017. It is not expected for you to be an expert in Hadoop, but some familiarity with it will be helpful. Knowing the concept of MapReduce is important, and knowing the Hadoop MapReduce API will be an advantage. In this recipe, we will explain what is needed to get the work done. You can get more details on Hadoop and its MapReduce API from other sources. The wiki page at http://en.wikipedia.org/wiki/MapReduce gives some good information about MapReduce programming.
First, verify whether Java is already installed by executing the following command:
$ javac -version
If Java is not installed, install the JDK as follows:
$ sudo apt-get install default-jdk
This is all we need to do to install Java.
Once the .tar.gz file is downloaded, execute the following at the command prompt:
$ tar -xvzf <name of the downloaded .tar.gz file>
$ cd <extracted directory>
Open the etc/hadoop/hadoop-env.sh file and replace export JAVA_HOME=${JAVA_HOME} with export JAVA_HOME=/usr/lib/jvm/default-java.
We will now get the mongo-hadoop connector code from GitHub on our local filesystem. Note that you don't need a GitHub account to clone a repository. Clone the Git project from the operating system command prompt as follows:
$ git clone https://github.com/mongodb/mongo-hadoop.git
$ cd mongo-hadoop
Create a soft link to the Hadoop installation directory as follows:
$ ln -s <hadoop installation directory> ~/hadoop-binaries
For example, if Hadoop is extracted/installed in the home directory, then this is the command to be executed:
$ ln -s ~/hadoop-2.4.0 ~/hadoop-binaries
By default, the mongo-hadoop connector will look for a Hadoop distribution under the ~/hadoop-binaries folder. So, even if the Hadoop archive is extracted elsewhere, we can create a soft link to it. Once this link has been created, we should have the Hadoop binaries in the ~/hadoop-binaries/hadoop-2.4.0/bin path.
Now, build the connector. The -Phadoop_version parameter can be left out, as 2.4 is the latest:
$ ./gradlew jar -Phadoop_version='2.4'
This build process will take some time to complete.
Once the build completes, we are ready to execute the treasuryYield sample provided with the mongo-hadoop connector project. The first activity is to import the data to a collection in Mongo. Assuming that the mongod instance is up and running, listening to port 27017 for connections, and that the current directory is the root of the mongo-hadoop connector code base, execute the following command:
$ mongoimport -c yield_historical.in -d mongo_hadoop --drop examples/treasury_yield/src/main/resources/yield_historical_in.json
Once the import is successful, we need to copy the connector and driver JARs to Hadoop's lib directory. Execute the following in the operating system shell:
$ wget http://repo1.maven.org/maven2/org/mongodb/mongo-java-driver/2.12.0/mongo-java-driver-2.12.0.jar
$ cp core/build/libs/mongo-hadoop-core-1.2.1-SNAPSHOT-hadoop_2.4.jar ~/hadoop-binaries/hadoop-2.4.0/lib/
$ mv mongo-java-driver-2.12.0.jar ~/hadoop-binaries/hadoop-2.4.0/lib
The JAR built for the mongo-hadoop core, which is to be copied, was named as shown in the preceding commands for the trunk version of the code, built for Hadoop 2.4.0. Change the name of the JAR accordingly when you build it yourself for a different version of the connector and Hadoop. The Mongo driver can be the latest version; version 2.12.0 is the latest at the time of writing of this book.
Now, execute the MapReduce job with the following command from the root of the mongo-hadoop code base:
~/hadoop-binaries/hadoop-2.4.0/bin/hadoop jar examples/treasury_yield/build/libs/treasury_yield-1.2.1-SNAPSHOT-hadoop_2.4.jar com.mongodb.hadoop.examples.treasury.TreasuryYieldXMLConfig -Dmongo.input.split_size=8 -Dmongo.job.verbose=true -Dmongo.input.uri=mongodb://localhost:27017/mongo_hadoop.yield_historical.in -Dmongo.output.uri=mongodb://localhost:27017/mongo_hadoop.yield_historical.out
The execution prints a lot of output; the following line tells us that the MapReduce job completed successfully:
14/05/11 21:38:54 INFO mapreduce.Job: Job job_local1226390512_0001 completed successfully
Once the job has executed, connect to the mongod instance running on localhost from the mongo client and execute a find on the following collection:
$ mongo
> use mongo_hadoop
switched to db mongo_hadoop
> db.yield_historical.out.find()
Installing Hadoop is not a trivial task, and we don't need to get into it to try our samples for the mongo-hadoop connector. To learn about Hadoop, its installation, and other topics, there are dedicated books and articles available. For the purpose of this chapter, we simply download the archive, extract it, and run the MapReduce jobs in standalone mode. This is the quickest way to get going with Hadoop. All the steps up to step 6 are needed to install Hadoop. In the next couple of steps, we cloned the mongo-hadoop connector repository. You can also download a stable version for your version of Hadoop from https://github.com/mongodb/mongo-hadoop/releases if you prefer not to build from the source. We then built the connector for our version of Hadoop (2.4.0) until step 13. From step 14 onward, we ran the actual MapReduce job to work on the data in MongoDB. We imported the data to the yield_historical.in collection, which is used as the input for the MapReduce job. Go ahead and query the collection in the mongo shell using the mongo_hadoop database to see a document. Don't worry if you don't understand the contents; we want to see what we intend to do with this data in this example.
The next step was to invoke the MapReduce operation on the data. The hadoop command was executed with the path of one JAR (examples/treasury_yield/build/libs/treasury_yield-1.2.1-SNAPSHOT-hadoop_2.4.jar). This is the JAR that contains the classes implementing the sample MapReduce operation for the treasury yield. The com.mongodb.hadoop.examples.treasury.TreasuryYieldXMLConfig class in this JAR file is the bootstrap class containing the main method; we will visit this class soon. There are lots of configurations supported by the connector; the complete list can be found at https://github.com/mongodb/mongo-hadoop/. For now, we will just remember that mongo.input.uri and mongo.output.uri point to the input and output collections for the MapReduce operation.
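The -D parameters on the hadoop command line become configuration properties of the job; Hadoop's generic options parsing splits each -Dkey=value argument before the job starts. The following plain-Java sketch mimics just that key=value split (DashDArgs is a made-up class purely for illustration, not Hadoop code):

```java
import java.util.HashMap;
import java.util.Map;

// Toy illustration of how -Dkey=value command-line arguments decompose into
// configuration properties. In a real job, Hadoop's generic options parsing
// does this and the values end up in the job Configuration.
public class DashDArgs {

    // Splits each "-Dkey=value" argument into a (key, value) property pair.
    public static Map<String, String> parse(String[] args) {
        Map<String, String> props = new HashMap<>();
        for (String arg : args) {
            if (arg.startsWith("-D")) {
                String kv = arg.substring(2);
                int eq = kv.indexOf('=');
                if (eq > 0) {
                    props.put(kv.substring(0, eq), kv.substring(eq + 1));
                }
            }
        }
        return props;
    }

    public static void main(String[] args) {
        Map<String, String> props = parse(new String[]{
            "-Dmongo.input.uri=mongodb://localhost:27017/mongo_hadoop.yield_historical.in",
            "-Dmongo.output.uri=mongodb://localhost:27017/mongo_hadoop.yield_historical.out"
        });
        // prints the input collection URI that the job would read from
        System.out.println(props.get("mongo.input.uri"));
    }
}
```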
With the project cloned, you can now import it into any Java IDE of your choice. We are particularly interested in the projects at examples/treasury_yield and core, both present in the root of the cloned repository.
Let's look at the com.mongodb.hadoop.examples.treasury.TreasuryYieldXMLConfig class. This is the entry point for the MapReduce job and has a main method in it. To write MapReduce jobs for Mongo using the mongo-hadoop connector, the main class always has to extend from com.mongodb.hadoop.util.MongoTool. This class implements the org.apache.hadoop.util.Tool interface, whose run method is implemented for us by the MongoTool class. All that the main method needs to do is execute this class using the org.apache.hadoop.util.ToolRunner class by invoking its static run method, passing an instance of our main class (which is an instance of Tool).
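This control flow can be sketched in plain Java. The interface and runner below are simplified stand-ins, not the real org.apache.hadoop classes; exceptions and generic-option parsing are omitted, and all names besides Tool are made up for illustration:

```java
// Simplified stand-ins for Hadoop's Tool/ToolRunner pattern -- NOT the real
// org.apache.hadoop classes; this only illustrates the bootstrap control flow.
public class ToolPatternSketch {

    // Analog of org.apache.hadoop.util.Tool: a job exposes a single run method.
    interface Tool {
        int run(String[] args);
    }

    // Analog of ToolRunner.run: the real version also parses generic options
    // (such as -D properties) before delegating to the tool's run method.
    static int run(Tool tool, String[] args) {
        return tool.run(args);
    }

    // Analog of TreasuryYieldXMLConfig extending MongoTool: the bootstrap
    // class is itself a Tool, so main just hands an instance of it to the runner.
    static class MyJobConfig implements Tool {
        @Override
        public int run(String[] args) {
            // A real MongoTool subclass would configure and submit the job here.
            return 0; // 0 signals success, like a process exit status
        }
    }

    public static void main(String[] args) {
        System.out.println("exit status: " + run(new MyJobConfig(), args));
    }
}
```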
There is a static block that loads some configurations from two XML files, hadoop-local.xml and mongo-defaults.xml. The format of these files (or any such XML file) is as follows; the root node of the file is the configuration node with multiple property nodes under it:
<configuration>
  <property>
    <name>{property name}</name>
    <value>{property value}</value>
  </property>
  ...
</configuration>
The property values that make sense in this context are all those that we mentioned in the URL provided earlier. We instantiate com.mongodb.hadoop.MongoConfig, wrapping an instance of org.apache.hadoop.conf.Configuration, in the constructor of the bootstrap class, TreasuryYieldXMLConfig. The MongoConfig class provides sensible defaults, which are enough to satisfy the majority of use cases. Some of the most important things that we need to set in the MongoConfig instance are the input and output formats, the mapper and reducer classes, the output key and value of the mapper, and the output key and value of the reducer. The input format and output format will always be the com.mongodb.hadoop.MongoInputFormat and com.mongodb.hadoop.MongoOutputFormat classes, which are provided by the mongo-hadoop connector library. For the mapper and reducer output key and value, we can use any of the org.apache.hadoop.io.Writable implementations. Refer to the Hadoop documentation for the different Writable implementations in the org.apache.hadoop.io package. Apart from these, the mongo-hadoop connector also provides us with some implementations in the com.mongodb.hadoop.io package. For the treasury yield example, we used the BSONWritable instance. These configurable values can either be provided in the XML file that we saw earlier or be set programmatically. Finally, we also have the option to provide them as VM arguments, as we did for mongo.input.uri and mongo.output.uri. These two parameters can be provided either in the XML or invoked directly from the code on the MongoConfig instance; the two methods are setInputURI and setOutputURI, respectively.
We will now look at the mapper and reducer class implementations. We will copy the important portion of the class here in order to analyze it; refer to the cloned project for the entire implementation:
public class TreasuryYieldMapper
    extends Mapper<Object, BSONObject, IntWritable, DoubleWritable> {

    @Override
    public void map(final Object pKey,
                    final BSONObject pValue,
                    final Context pContext)
        throws IOException, InterruptedException {
        final int year = ((Date) pValue.get("_id")).getYear() + 1900;
        double bid10Year = ((Number) pValue.get("bc10Year")).doubleValue();
        pContext.write(new IntWritable(year), new DoubleWritable(bid10Year));
    }
}
Our mapper extends the org.apache.hadoop.mapreduce.Mapper class. The four generic parameters are the input key class, the type of the input value, the type of the output key, and the type of the output value. The body of the map method reads the _id value from the input document, which is the date, and extracts the year from it. Then, it gets the double value from the document for the bc10Year field and writes the key-value pair, where the key is the year and the value is the double, to the context. The implementation here doesn't rely on the value of the pKey parameter passed in, which could have been used as the key instead of hardcoding the _id value in the implementation. This value is basically the same field that would be set using the mongo.input.key property in the XML or the MongoConfig.setInputKey method. If none is set, _id is the default value.
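To see what the map method computes without a Hadoop runtime, here is a plain-Java sketch of the same logic, with an ordinary Map standing in for the BSONObject document (TreasuryMapSketch is a made-up name for illustration; the real mapper emits the pair through the Hadoop Context instead of returning it):

```java
import java.util.AbstractMap;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch of the mapper's per-document computation: extract the
// year from the _id date and the double value of the bc10Year field.
public class TreasuryMapSketch {

    public static Map.Entry<Integer, Double> map(Map<String, Object> doc) {
        // Date.getYear() is 1900-based (and deprecated), hence the + 1900
        // in the sample mapper's code
        int year = ((Date) doc.get("_id")).getYear() + 1900;
        double bid10Year = ((Number) doc.get("bc10Year")).doubleValue();
        return new AbstractMap.SimpleEntry<>(year, bid10Year);
    }

    public static void main(String[] args) {
        Map<String, Object> doc = new HashMap<>();
        doc.put("_id", new Date(90, 0, 2));  // January 2, 1990 (1900-based year)
        doc.put("bc10Year", 7.94);           // a hypothetical yield value
        Map.Entry<Integer, Double> kv = map(doc);
        System.out.println(kv.getKey() + " -> " + kv.getValue());  // 1990 -> 7.94
    }
}
```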
Let's look at the reducer implementation (with the logging statements removed):
public class TreasuryYieldReducer
    extends Reducer<IntWritable, DoubleWritable, IntWritable, BSONWritable> {

    @Override
    public void reduce(final IntWritable pKey,
                       final Iterable<DoubleWritable> pValues,
                       final Context pContext)
        throws IOException, InterruptedException {
        int count = 0;
        double sum = 0;
        for (final DoubleWritable value : pValues) {
            sum += value.get();
            count++;
        }
        final double avg = sum / count;
        BasicBSONObject output = new BasicBSONObject();
        output.put("count", count);
        output.put("avg", avg);
        output.put("sum", sum);
        pContext.write(pKey, new BSONWritable(output));
    }
}
This class extends from org.apache.hadoop.mapreduce.Reducer and has four generic parameters: the input key, input value, output key, and output value. The input to the reducer is the output from the mapper, and thus, if you notice carefully, the types of the first two generic parameters are the same as those of the last two generic parameters of the mapper that we saw earlier. The third and fourth parameters are the types of the key and value emitted from the reduce. The type of the value is a BSON document, and thus we have BSONWritable as the type.
We now have the reduce method, which has two parameters: the first is the key, which is the same as the key emitted from the map function, and the second is a java.lang.Iterable of the values emitted for the same key. This is how standard MapReduce functions work. For instance, if the map function emitted the key-value pairs (1950, 10), (1960, 20), (1950, 20), and (1950, 30), then reduce will be invoked with two unique keys, 1950 and 1960; the values for the key 1950 will be an Iterable with (10, 20, 30), whereas that of 1960 will be an Iterable of a single element, (20). The reducer's reduce function simply iterates through this Iterable of doubles, finds the sum and count of these numbers, and writes one key-value pair, where the key is the same as the incoming key and the output value is a BasicBSONObject with the sum, count, and average of the computed values.
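The reducer's aggregation can likewise be tried in plain Java. The sketch below (TreasuryReduceSketch is a made-up name for illustration) groups the example pairs from the preceding paragraph by key, as the shuffle phase would, and then applies the same sum/count/average logic, building an ordinary Map where the real reducer builds a BasicBSONObject:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch of the reducer's aggregation over one key's values.
public class TreasuryReduceSketch {

    // Mirrors TreasuryYieldReducer.reduce for a single key: compute the
    // count, sum, and average of the values emitted for that key.
    public static Map<String, Object> reduce(Iterable<Double> values) {
        int count = 0;
        double sum = 0;
        for (double v : values) {
            sum += v;
            count++;
        }
        Map<String, Object> out = new LinkedHashMap<>();
        out.put("count", count);
        out.put("sum", sum);
        out.put("avg", sum / count);
        return out;
    }

    public static void main(String[] args) {
        // The shuffle phase groups mapper output by key before reduce is called
        Map<Integer, List<Double>> grouped = new TreeMap<>();
        int[] keys = {1950, 1960, 1950, 1950};
        double[] vals = {10, 20, 20, 30};
        for (int i = 0; i < keys.length; i++) {
            grouped.computeIfAbsent(keys[i], k -> new ArrayList<>()).add(vals[i]);
        }
        // For key 1950 this aggregates (10, 20, 30)
        System.out.println(reduce(grouped.get(1950)));  // {count=3, sum=60.0, avg=20.0}
    }
}
```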
There are some good samples, including one on the Enron dataset, in the examples of the cloned mongo-hadoop connector project. If you would like to play around a bit, I would recommend that you take a look at these example projects and run them.
What we saw here is a readymade sample that we executed. There is nothing like writing a MapReduce job ourselves to solidify our understanding. In the next recipe, we will write a sample MapReduce job using the Hadoop API in Java and see it in action.
If you're wondering what the Writable interface is all about and why you should not use plain old serialization instead, refer to this URL, which gives an explanation by the creator of Hadoop himself: http://www.mail-archive.com/[email protected]/msg00378.html.