In this recipe, we will see how to build the Mongo Hadoop connector from source and set up Hadoop just for the purpose of running examples in the standalone mode. The connector is the backbone that runs MapReduce jobs on Hadoop using the data in Mongo.
There are various distributions of Hadoop; however, we will use Apache Hadoop (http://hadoop.apache.org/). The installation will be done on a Linux-flavored OS; I am using Ubuntu Linux. In production, Apache Hadoop always runs on Linux; Windows is not tested for production systems, although it can be used for development. If you are a Windows user, I would recommend that you install a virtualization environment such as VirtualBox (https://www.virtualbox.org/), set up a Linux environment, and then install Hadoop on it. Setting up VirtualBox and Linux on it is not shown in this recipe, but it is not a tedious task. The prerequisites for this recipe are a machine with a Linux operating system and an Internet connection. The version we will set up here is Apache Hadoop 2.4.0, which, at the time of writing this book, is the latest version and the one supported by the mongo-hadoop connector.
A Git client is needed to clone the mongo-hadoop connector repository to a local filesystem. Refer to http://git-scm.com/book/en/Getting-Started-Installing-Git to install Git.
You will also need MongoDB installed on your operating system. Refer to http://docs.mongodb.org/manual/installation/ and install it accordingly. Start the mongod instance so that it listens to port 27017 for connections. You are not expected to be an expert in Hadoop, but some familiarity with it will be helpful. Knowing the concept of MapReduce is important, and knowing the Hadoop MapReduce API will be an advantage. In this recipe, we will explain what is needed to get the work done. You might prefer to get more details on Hadoop and its MapReduce API from other sources. The wiki page at http://en.wikipedia.org/wiki/MapReduce gives enough information on MapReduce programming.
First, check whether Java is installed by typing the following in the command prompt:
$ javac -version
If you get an error instead for an unrecognized javac program, we need to install the JDK as follows:
$ sudo apt-get install default-jdk
This is all we need to do to install Java.
Download Apache Hadoop 2.4.0 from the Hadoop website (or the latest version that the mongo-hadoop connector supports). Once the .tar.gz file is downloaded, execute the following commands in the command prompt:
$ tar -xvzf <name of the downloaded .tar.gz file>
$ cd <extracted directory>
Edit the etc/hadoop/hadoop-env.sh file and replace export JAVA_HOME = ${JAVA_HOME} with export JAVA_HOME = /usr/lib/jvm/default-java.
We will now clone the mongo-hadoop connector code from GitHub to our local filesystem. Note that you don't need a GitHub account to clone a repository. Clone the Git project from the operating system's command prompt as follows:
$ git clone https://github.com/mongodb/mongo-hadoop.git
$ cd mongo-hadoop
Create a soft link to the Hadoop installation directory as follows:
$ ln -s <hadoop installation directory> ~/hadoop-binaries
For example, if Hadoop is extracted/installed in the home directory, the following command would need to be executed:
$ ln -s ~/hadoop-2.4.0 ~/hadoop-binaries
By default, the mongo-hadoop connector will look for a Hadoop distribution under the ~/hadoop-binaries folder. So, even if the Hadoop archive is extracted elsewhere, we can create a soft link to it. Once the preceding link is created, we will have the Hadoop binaries in the ~/hadoop-binaries/hadoop-2.4.0/bin path.
We will now build the mongo-hadoop connector from source for Apache Hadoop version 2.4.0 as follows. The build, by default, builds the connector for the latest version; so, as of now, the -Phadoop_version parameter can be left out, as 2.4 is the latest anyway:
$ ./gradlew jar -Phadoop_version='2.4'
This build process will take some time to complete.
Once the build completes, we are ready to run our first MapReduce job using the treasuryYield sample provided with the mongo-hadoop connector project. The first activity is to import the data to a collection in Mongo. Assuming that the mongod instance is up and running, listening to port 27017 for connections, and that the current directory is the root of the mongo-hadoop connector code base, execute the following command:
$ mongoimport -c yield_historical.in -d mongo_hadoop --drop examples/treasury_yield/src/main/resources/yield_historical_in.json
Once the import is done, we need to copy two JAR files, the mongo-hadoop core JAR we just built and the MongoDB Java driver, to Hadoop's lib directory. Execute the following commands from the operating system shell:
$ wget http://repo1.maven.org/maven2/org/mongodb/mongo-java-driver/2.12.0/mongo-java-driver-2.12.0.jar
$ cp core/build/libs/mongo-hadoop-core-1.2.1-SNAPSHOT-hadoop_2.4.jar ~/hadoop-binaries/hadoop-2.4.0/lib/
$ mv mongo-java-driver-2.12.0.jar ~/hadoop-binaries/hadoop-2.4.0/lib
The JAR of the mongo-hadoop core to be copied was named mongo-hadoop-core-1.2.1-SNAPSHOT-hadoop_2.4.jar for the trunk version of the code, built for Hadoop 2.4.0. Change the name of the JAR accordingly when you build it yourself for a different version of the connector and Hadoop. The Mongo driver can be the latest version; version 2.12.0 is the latest one at the time of writing this book. Now, execute the following command in the operating system shell:
$ ~/hadoop-binaries/hadoop-2.4.0/bin/hadoop jar \
    examples/treasury_yield/build/libs/treasury_yield-1.2.1-SNAPSHOT-hadoop_2.4.jar \
    com.mongodb.hadoop.examples.treasury.TreasuryYieldXMLConfig \
    -Dmongo.input.split_size=8 \
    -Dmongo.job.verbose=true \
    -Dmongo.input.uri=mongodb://localhost:27017/mongo_hadoop.yield_historical.in \
    -Dmongo.output.uri=mongodb://localhost:27017/mongo_hadoop.yield_historical.out
The command prints a lot of log output; the following line in it indicates that the MapReduce job completed successfully:
14/05/11 21:38:54 INFO mapreduce.Job: Job job_local1226390512_0001 completed successfully
Connect to the mongod instance running on localhost from the Mongo client and execute a find query on the following collection:
$ mongo
> use mongo_hadoop
switched to db mongo_hadoop
> db.yield_historical.out.find()
Installing Hadoop is not a trivial task, and we don't need to get into it to try our samples for the mongo-hadoop connector; there are dedicated books and articles available for learning Hadoop. For the purpose of this chapter, we simply download the archive, extract it, and run the MapReduce jobs in the standalone mode. This is the quickest way to get going with Hadoop. All the steps up to step 6 are needed to install Hadoop. In the next couple of steps, we simply cloned the mongo-hadoop connector repository. You might also download a stable, prebuilt version for your version of Hadoop from https://github.com/mongodb/mongo-hadoop/releases if you prefer not to build from source. We then built the connector for our version of Hadoop (2.4.0) up to step 13. From step 14 onwards, we ran the actual MapReduce job to work on the data in MongoDB. We imported the data into the yield_historical.in collection, which is used as the input to the MapReduce job. Go ahead and query the collection from the Mongo shell using the mongo_hadoop database to see a document. Don't worry if you don't understand its contents; the aim here is just to see the input data that this example operates on.
The next step was to invoke the MapReduce operation on the data. The hadoop command was executed with the path of a JAR (examples/treasury_yield/build/libs/treasury_yield-1.2.1-SNAPSHOT-hadoop_2.4.jar). This is the JAR file that contains the classes implementing a sample MapReduce operation for the treasury yield. The com.mongodb.hadoop.examples.treasury.TreasuryYieldXMLConfig class in this JAR file is the bootstrap class that contains the main method; we will look at this class soon. There are a lot of configurations supported by the connector; the complete list can be found at https://github.com/mongodb/mongo-hadoop/blob/master/CONFIG.md. For now, we will just remember that mongo.input.uri and mongo.output.uri are the collections for the input and output of the MapReduce operations, respectively.
With the project cloned, you can import it into any Java IDE of your choice. We are particularly interested in the project at /examples/treasury_yield and the core project present in the root of the cloned repository.
Let's look at the com.mongodb.hadoop.examples.treasury.TreasuryYieldXMLConfig class. This is the entry point into the MapReduce job and has a main method in it. To write MapReduce jobs for Mongo using the mongo-hadoop connector, the main class always has to extend com.mongodb.hadoop.util.MongoTool. This class implements the org.apache.hadoop.util.Tool interface, whose run method is implemented for us by the MongoTool class. All that the main method needs to do is execute this class using the org.apache.hadoop.util.ToolRunner class by invoking its static run method and passing in an instance of our main class (an instance of Tool).
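As a rough sketch of what this bootstrapping amounts to (the class and package names are the ones discussed in this recipe; the body is illustrative rather than copied from the project):

import com.mongodb.hadoop.util.MongoTool;
import org.apache.hadoop.util.ToolRunner;

public class TreasuryYieldXMLConfig extends MongoTool {

    public static void main(final String[] args) throws Exception {
        // ToolRunner parses the generic -D arguments (such as mongo.input.uri)
        // and then delegates to the run method inherited from MongoTool.
        System.exit(ToolRunner.run(new TreasuryYieldXMLConfig(), args));
    }
}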
There is a static block that loads some configurations from two XML files, hadoop-local.xml and mongo-defaults.xml. The format of these files (or any such configuration XML file) is as follows: the root node is the configuration node, with multiple property nodes under it:
<configuration>
  <property>
    <name>{property name}</name>
    <value>{property value}</value>
  </property>
  ...
</configuration>
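For instance, the input and output URIs that we passed on the command line could equally be set in such a file; the values shown here are just the local ones used in this recipe:

<configuration>
  <property>
    <name>mongo.input.uri</name>
    <value>mongodb://localhost:27017/mongo_hadoop.yield_historical.in</value>
  </property>
  <property>
    <name>mongo.output.uri</name>
    <value>mongodb://localhost:27017/mongo_hadoop.yield_historical.out</value>
  </property>
</configuration>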
The property values that make sense in this context are all those we mentioned in the URL provided earlier. We instantiate com.mongodb.hadoop.MongoConfig, wrapping an instance of org.apache.hadoop.conf.Configuration, in the constructor of the bootstrap class TreasuryYieldXMLConfig. The MongoConfig class provides sensible defaults, which are enough to satisfy the majority of use cases. Some of the most important things we need to set on the MongoConfig instance are the input and output formats, the mapper and reducer classes, the output key and value of the mapper, and the output key and value of the reducer. The input and output formats will always be the com.mongodb.hadoop.MongoInputFormat and com.mongodb.hadoop.MongoOutputFormat classes, respectively; they are provided by the mongo-hadoop connector library. For the mapper and reducer output keys and values, we can use any org.apache.hadoop.io.Writable implementation. Refer to the Hadoop documentation for the different types of Writable implementations in the org.apache.hadoop.io package. Apart from these, the mongo-hadoop connector also provides some implementations of its own in the com.mongodb.hadoop.io package; for the treasury yield example, we used BSONWritable. These configurable values can either be provided in the XML file we saw earlier or set programmatically. Finally, we also have the option of providing them as VM arguments, as we did for mongo.input.uri and mongo.output.uri. These URIs can be provided either in the XML or set directly from the code on the MongoConfig instance; the two methods are setInputURI and setOutputURI, respectively.
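To make this concrete, here is a hedged sketch of how the constructor of the bootstrap class might wire these pieces together programmatically. The setInputURI and setOutputURI methods are the ones named above; the other setter names mirror the connector's example project and should be verified against the connector version you build:

// Constructor of the bootstrap class sketched earlier. MongoConfig and the
// format classes come from com.mongodb.hadoop, BSONWritable from
// com.mongodb.hadoop.io, and the key/value types from org.apache.hadoop.io.
public TreasuryYieldXMLConfig() {
    MongoConfig config = new MongoConfig(new Configuration());
    config.setInputFormat(MongoInputFormat.class);
    config.setOutputFormat(MongoOutputFormat.class);
    config.setInputURI("mongodb://localhost:27017/mongo_hadoop.yield_historical.in");
    config.setOutputURI("mongodb://localhost:27017/mongo_hadoop.yield_historical.out");
    config.setMapper(TreasuryYieldMapper.class);
    config.setMapperOutputKey(IntWritable.class);
    config.setMapperOutputValue(DoubleWritable.class);
    config.setReducer(TreasuryYieldReducer.class);
    config.setOutputKey(IntWritable.class);
    config.setOutputValue(BSONWritable.class);
    setConf(config); // MongoTool picks up the job settings from this Configuration
}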
We will now look at the mapper and reducer class implementations. Here, we will copy the important portion of the class to analyze it; refer to the cloned project for the entire implementation:
public class TreasuryYieldMapper
    extends Mapper<Object, BSONObject, IntWritable, DoubleWritable> {

    @Override
    public void map(final Object pKey, final BSONObject pValue, final Context pContext)
        throws IOException, InterruptedException {
        final int year = ((Date) pValue.get("_id")).getYear() + 1900;
        double bid10Year = ((Number) pValue.get("bc10Year")).doubleValue();
        pContext.write(new IntWritable(year), new DoubleWritable(bid10Year));
    }
}
Our mapper extends the org.apache.hadoop.mapreduce.Mapper class. The four generic parameters are the type of the input key, the type of the input value, the type of the output key, and the type of the output value, respectively. The body of the map method reads the _id value from the input document, which is a date, and extracts the year out of it. It then gets the double value from the document for the bc10Year field and simply writes a key-value pair to the context, where the key is the year and the value is the double. The implementation here doesn't rely on the value of the pKey parameter passed in; this parameter could have been used as the key instead of hardcoding the _id value in the implementation. This value is basically the same field that is set using the mongo.input.key property in the XML or using the MongoConfig.setInputKey method. If none is set, _id is the default anyway.
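For instance, keying the mapper input on a field other than _id could be done on the same MongoConfig instance (a tiny hedged sketch; setInputKey is the method named above, and the field name is purely illustrative):

// Equivalent to setting the mongo.input.key property in the XML configuration;
// "someOtherField" is a hypothetical field name, _id being the default.
config.setInputKey("someOtherField");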
Let's look at the reducer implementation (with the logging statements removed):
public class TreasuryYieldReducer
    extends Reducer<IntWritable, DoubleWritable, IntWritable, BSONWritable> {

    @Override
    public void reduce(final IntWritable pKey, final Iterable<DoubleWritable> pValues, final Context pContext)
        throws IOException, InterruptedException {
        int count = 0;
        double sum = 0;
        for (final DoubleWritable value : pValues) {
            sum += value.get();
            count++;
        }
        final double avg = sum / count;
        BasicBSONObject output = new BasicBSONObject();
        output.put("count", count);
        output.put("avg", avg);
        output.put("sum", sum);
        pContext.write(pKey, new BSONWritable(output));
    }
}
This class extends org.apache.hadoop.mapreduce.Reducer and again has four generic parameters: for the input key, input value, output key, and output value, respectively. The input to the reducer is the output from the mapper; thus, if you look carefully, the types of the first two generic parameters are the same as those of the last two generic parameters of the mapper we saw earlier. The third and fourth parameters in this case are the types of the key and the value emitted from the reduce, respectively. The type of the value emitted is a BSON document, and thus we have BSONWritable as the type.
The reduce method has two parameters: the first one is the key, which is the same as the key emitted from the map function, and the second is a java.lang.Iterable of the values emitted for that key. This is how standard MapReduce works. For instance, if the map function emitted the key-value pairs (1950, 10), (1960, 20), (1950, 20), and (1950, 30), then reduce would be invoked with two unique keys, 1950 and 1960. The value for the key 1950 would be an Iterable with (10, 20, 30), whereas that for 1960 would be an Iterable with a single element, (20). The reduce function of the reducer class simply iterates through this Iterable of doubles, finds the sum and count of these numbers, and writes one key-value pair, where the key is the same as the incoming key and the output value is a BasicBSONObject containing the sum, count, and average of the computed values.
There are some good samples, including one for the enron dataset, in the examples of the cloned mongo-hadoop connector. If you would like to play around a bit, I would recommend that you take a look at these example projects too and run them.
What we saw here was a ready-made sample that we executed. There is nothing like writing one MapReduce job ourselves to clarify our understanding. In the next recipe, we will write one sample MapReduce job using the Hadoop API in Java and see it in action.
Refer to the Hadoop documentation to learn what the Writable interface is all about and why you should not use plain old Java serialization instead.