Executing our first sample MapReduce job using the mongo-hadoop connector

In this recipe, we will see how to build the mongo-hadoop connector from the source and set up Hadoop just for the purpose of running the examples in the standalone mode. The connector is the backbone that lets Hadoop MapReduce jobs run against the data stored in MongoDB.

Getting ready

There are various distributions of Hadoop; however, we will use Apache Hadoop (http://hadoop.apache.org/). The installation will be done on Ubuntu Linux. Apache Hadoop is always run on Linux in production; Windows is not tested for production systems, although it can be used for development. If you are a Windows user, I would recommend that you install a virtualization environment such as VirtualBox (https://www.virtualbox.org/), set up a Linux environment, and then install Hadoop on it. Setting up VirtualBox and Linux on it is not shown in this recipe, but this is not a tedious task. The prerequisites for this recipe are a machine with the Linux operating system and an Internet connection. We will set up version 2.4.0 of Apache Hadoop, which is the latest version supported by the mongo-hadoop connector at the time of writing of this book.

A Git client is needed to clone the repository of the mongo-hadoop connector to the local filesystem. Refer to http://git-scm.com/book/en/Getting-Started-Installing-Git to install Git.

You will also need MongoDB to be installed on your operating system. Refer to http://docs.mongodb.org/manual/installation/ and install it accordingly. Start the mongod instance listening to port 27017. You are not expected to be an expert in Hadoop, but some familiarity with it will be helpful. Knowing the concept of MapReduce is important, and knowing the Hadoop MapReduce API will be an advantage. In this recipe, we will explain what is needed to get the work done. You can get more details on Hadoop and its MapReduce API from other sources. The wiki page at http://en.wikipedia.org/wiki/MapReduce gives some good information about MapReduce programming.

How to do it…

  1. We will first install Java, Hadoop, and the required packages. We will start with installing JDK on the operating system. Type the following on the command prompt of the operating system:
    $ javac -version
    
  2. If the program doesn't execute and you are told about various packages that contain the javac program, then we need to install Java as follows:
    $ sudo apt-get install default-jdk
    

    This is all we need to do to install Java.

  3. Go to http://www.apache.org/dyn/closer.cgi/hadoop/common/ and download version 2.4.0 of Hadoop (or the latest version supported by the mongo-hadoop connector).
  4. After the .tar.gz file is downloaded, execute the following on the command prompt:
    $ tar -xvzf <name of the downloaded .tar.gz file>
    $ cd <extracted directory>
    

    Open the etc/hadoop/hadoop-env.sh file and replace export JAVA_HOME=${JAVA_HOME} with export JAVA_HOME=/usr/lib/jvm/default-java.

    We will now get the mongo-hadoop connector code from GitHub on our local filesystem. Note that you don't need a GitHub account to clone a repository. Clone the Git project from the operating system command prompt as follows:

    $ git clone https://github.com/mongodb/mongo-hadoop.git
    $ cd mongo-hadoop
    
  5. Create a soft link to the Hadoop installation directory, that is, the directory where we extracted Hadoop in step 4:
    $ ln -s <hadoop installation directory> ~/hadoop-binaries
    

    For example, if Hadoop is extracted/installed in the home directory, then this is the command to be executed:

    $ ln -s ~/hadoop-2.4.0 ~/hadoop-binaries
    

    By default, the mongo-hadoop connector will look for a Hadoop distribution under the ~/hadoop-binaries folder. So, even if the Hadoop archive is extracted elsewhere, we can create a soft link to it. Once this link has been created, we should have the Hadoop binaries in the ~/hadoop-binaries/hadoop-2.4.0/bin path.

  6. We will now build the mongo-hadoop connector from the source for Apache Hadoop version 2.4.0. By default, the build targets the latest supported Hadoop version, so as of now, the -Phadoop_version parameter can be left out because 2.4 is the latest.
    $ ./gradlew jar -Phadoop_version='2.4'
    

    This build process will take some time to complete.

  7. Once the build completes successfully, we will be ready to execute our first MapReduce job. We will do this using a treasuryYield sample provided with the mongo-hadoop connector project. The first activity is to import the data to a collection in Mongo.
  8. Assuming that the mongod instance is up and running and listening to port 27017 for connections and the current directory is the root of the mongo-hadoop connector code base, execute the following command:
    $ mongoimport -c yield_historical.in -d mongo_hadoop --drop examples/treasury_yield/src/main/resources/yield_historical_in.json
    
  9. Once the import action is successful, we are left with copying two jar files to the lib directory. Execute the following in the operating system shell:
    $ wget http://repo1.maven.org/maven2/org/mongodb/mongo-java-driver/2.12.0/mongo-java-driver-2.12.0.jar
    $ cp core/build/libs/mongo-hadoop-core-1.2.1-SNAPSHOT-hadoop_2.4.jar ~/hadoop-binaries/hadoop-2.4.0/lib/
    $ mv mongo-java-driver-2.12.0.jar ~/hadoop-binaries/hadoop-2.4.0/lib
    

    Note

    The name of the mongo-hadoop core JAR copied in the preceding step corresponds to the trunk version of the code built for Hadoop 2.4.0. Change the name of the JAR accordingly when you build it yourself for a different version of the connector and Hadoop. The Mongo Java driver can be the latest version; version 2.12.0 is the latest at the time of writing of this book.

  10. Now, execute the following command on the command prompt of the operating system shell:
     ~/hadoop-binaries/hadoop-2.4.0/bin/hadoop jar \
       examples/treasury_yield/build/libs/treasury_yield-1.2.1-SNAPSHOT-hadoop_2.4.jar \
       com.mongodb.hadoop.examples.treasury.TreasuryYieldXMLConfig \
       -Dmongo.input.split_size=8 \
       -Dmongo.job.verbose=true \
       -Dmongo.input.uri=mongodb://localhost:27017/mongo_hadoop.yield_historical.in \
       -Dmongo.output.uri=mongodb://localhost:27017/mongo_hadoop.yield_historical.out
    
  11. The output should print out a lot of things; however, the following line in the output tells us that the MapReduce job was successful:
     14/05/11 21:38:54 INFO mapreduce.Job: Job job_local1226390512_0001 completed successfully
    
  12. Connect to the mongod instance running on localhost from the mongo client and execute a find on the following collection:
    $ mongo
    > use mongo_hadoop
    switched to db mongo_hadoop
    > db.yield_historical.out.find()
    

How it works…

Installing Hadoop is not a trivial task, and we don't need to get into it to try our samples for the mongo-hadoop connector. To learn about Hadoop, its installation, and other things, there are dedicated books and articles available. For the purpose of this chapter, we simply download the archive, extract it, and run the MapReduce jobs in the standalone mode. This is the quickest way to get going with Hadoop. The steps up to step 5 are needed to install Hadoop; in step 4, we also clone the mongo-hadoop connector repository. You can also download a stable release built for your version of Hadoop from https://github.com/mongodb/mongo-hadoop/releases if you prefer not to build from the source. We then build the connector for our version of Hadoop (2.4.0) in step 6. From step 7 onward, we prepare and run the actual MapReduce job to work on the data in MongoDB. We imported the data to the yield_historical.in collection, which will be used as the input for the MapReduce job. Go ahead and query the collection in the mongo shell using the mongo_hadoop database to see a document. Don't worry if you don't understand the contents; we want to see what we intend to do with this data in this example.

The next step was to invoke the MapReduce operation on the data. The hadoop command was executed with the path of one JAR, examples/treasury_yield/build/libs/treasury_yield-1.2.1-SNAPSHOT-hadoop_2.4.jar. This is the JAR that contains the classes implementing the sample MapReduce operation for the treasury yield. The com.mongodb.hadoop.examples.treasury.TreasuryYieldXMLConfig class in this JAR file is the bootstrap class containing the main method. We will visit this class soon. There are lots of configurations supported by the connector; the complete list can be found at https://github.com/mongodb/mongo-hadoop/. For now, we will just remember that mongo.input.uri and mongo.output.uri specify the input and output collections for the MapReduce operation.

With the project cloned, you can now import it to any Java IDE of your choice. We are particularly interested in the examples/treasury_yield and core projects present in the root of the cloned repository.

Let's look at the com.mongodb.hadoop.examples.treasury.TreasuryYieldXMLConfig class. This is the entry point for the MapReduce job and has a main method in it. To write MapReduce jobs for MongoDB using the mongo-hadoop connector, the main class always has to extend from com.mongodb.hadoop.util.MongoTool. This class implements the org.apache.hadoop.util.Tool interface, whose run method is implemented for us by the MongoTool class. All that the main method needs to do is execute this class using the org.apache.hadoop.util.ToolRunner class by invoking its static run method and passing it an instance of our main class (which is an instance of Tool).
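To make this pattern concrete, here is a minimal sketch of such a bootstrap class. The class name JobDriver is hypothetical; the real TreasuryYieldXMLConfig additionally sets up its configuration in its constructor and a static block, as we will see shortly:

import com.mongodb.hadoop.util.MongoTool;
import org.apache.hadoop.util.ToolRunner;

// A hypothetical, minimal job driver; the connector's own example class is TreasuryYieldXMLConfig.
public class JobDriver extends MongoTool {

    public static void main(final String[] args) throws Exception {
        // ToolRunner applies the generic -D options from the command line to the
        // configuration and then invokes the run method inherited from MongoTool.
        System.exit(ToolRunner.run(new JobDriver(), args));
    }
}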

There is a static block that loads some configurations from two XML files, hadoop-local.xml and mongo-defaults.xml. The format of these files (or of any Hadoop configuration file) is as follows: the root node of the file is a configuration node with multiple property nodes under it:

<configuration>
  <property>
    <name>{property name}</name> 
    <value>{property value}</value>
  </property>
  ...
</configuration>

The property values that make sense in this context are all those that we mentioned in the URL provided earlier. We instantiate com.mongodb.hadoop.MongoConfig, wrapping an instance of org.apache.hadoop.conf.Configuration, in the constructor of the bootstrap class, TreasuryYieldXMLConfig. The MongoConfig class provides sensible defaults, which are enough to satisfy the majority of use cases. Some of the most important things that we need to set in the MongoConfig instance are the input and output formats, the mapper and reducer classes, the output key and value of the mapper, and the output key and value of the reducer. The input format and output format will always be the com.mongodb.hadoop.MongoInputFormat and com.mongodb.hadoop.MongoOutputFormat classes, which are provided by the mongo-hadoop connector library. For the mapper and reducer output key and value, we can use any of the org.apache.hadoop.io.Writable implementations. Refer to the Hadoop documentation for the different Writable implementations in the org.apache.hadoop.io package. Apart from these, the mongo-hadoop connector also provides us with some implementations in the com.mongodb.hadoop.io package. For the treasury yield example, we used the BSONWritable instance.

These configurable values can either be provided in the XML file that we saw earlier or be set programmatically. Finally, we also have the option to provide them as VM arguments, which is what we did for mongo.input.uri and mongo.output.uri. These two parameters can be provided either in the XML or directly from the code on the MongoConfig instance; the two methods are setInputURI and setOutputURI, respectively.
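The following sketch illustrates the programmatic route inside the constructor of such a bootstrap class. Only setInputURI and setOutputURI are confirmed above; the remaining setter names (setInputFormat, setMapper, and so on) are assumptions that mirror the configuration properties, so verify them against the MongoConfig class in the cloned source:

import com.mongodb.hadoop.MongoConfig;
import com.mongodb.hadoop.MongoInputFormat;
import com.mongodb.hadoop.MongoOutputFormat;
import com.mongodb.hadoop.examples.treasury.TreasuryYieldMapper;
import com.mongodb.hadoop.examples.treasury.TreasuryYieldReducer;
import com.mongodb.hadoop.io.BSONWritable;
import com.mongodb.hadoop.util.MongoTool;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;

// A hypothetical configuration class sketch; not the connector's own example.
public class TreasuryYieldConfigSketch extends MongoTool {

    public TreasuryYieldConfigSketch() {
        Configuration conf = new Configuration();
        setConf(conf);                              // setConf comes from the Tool hierarchy
        MongoConfig config = new MongoConfig(conf); // wrap the Hadoop configuration (constructor form assumed)

        // Input and output collections; the recipe confirms these two setters.
        config.setInputURI("mongodb://localhost:27017/mongo_hadoop.yield_historical.in");
        config.setOutputURI("mongodb://localhost:27017/mongo_hadoop.yield_historical.out");

        // The setter names below are assumptions; check MongoConfig in the source.
        config.setInputFormat(MongoInputFormat.class);
        config.setOutputFormat(MongoOutputFormat.class);
        config.setMapper(TreasuryYieldMapper.class);
        config.setReducer(TreasuryYieldReducer.class);
        config.setMapperOutputKey(IntWritable.class);
        config.setMapperOutputValue(DoubleWritable.class);
        config.setOutputKey(IntWritable.class);
        config.setOutputValue(BSONWritable.class);
    }
}

Combined with the main method pattern shown earlier, this would form a complete job driver for the treasury yield sample.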

We will now look at the mapper and reducer class implementations. We will copy the important portion of the class here in order to analyze. Refer to the cloned project for the entire implementation:

public class TreasuryYieldMapper
    extends Mapper<Object, BSONObject, IntWritable, DoubleWritable> {

    @Override
    public void map(final Object pKey,
                    final BSONObject pValue,
                    final Context pContext)
        throws IOException, InterruptedException {
        final int year = ((Date) pValue.get("_id")).getYear() + 1900;
        double bid10Year = ((Number) pValue.get("bc10Year")).doubleValue();
        pContext.write(new IntWritable(year), new DoubleWritable(bid10Year));
    }
}

Our mapper extends the org.apache.hadoop.mapreduce.Mapper class. The four generic parameters are the key class, the type of the input value, the type of the output key, and the type of the output value. The body of the map method reads the _id value from the input document, which is the date, and extracts the year out of it. Then, it gets the double value of the bc10Year field from the document and writes a key-value pair to the context, where the key is the year and the value is the double. The implementation here doesn't rely on the pKey parameter passed in, which could have been used as the key instead of reading _id from the document. The pKey value is basically the same field that would be set using the mongo.input.key property in the XML or the MongoConfig.setInputKey method; if none is set, _id is the default. A sketch of this variation follows.
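As a hedged illustration of that point, the mapper could be written to use the passed key directly, assuming that mongo.input.key is left at its default of _id so that pKey carries the document's date. The class name TreasuryYieldKeyMapper is hypothetical and is not part of the connector's examples:

import java.io.IOException;
import java.util.Date;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.bson.BSONObject;

public class TreasuryYieldKeyMapper
    extends Mapper<Object, BSONObject, IntWritable, DoubleWritable> {

    @Override
    public void map(final Object pKey,
                    final BSONObject pValue,
                    final Context pContext)
        throws IOException, InterruptedException {
        // Assumes the default input key (_id), so pKey already holds the date.
        final int year = ((Date) pKey).getYear() + 1900;
        final double bid10Year = ((Number) pValue.get("bc10Year")).doubleValue();
        pContext.write(new IntWritable(year), new DoubleWritable(bid10Year));
    }
}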

Let's look at the reducer implementation (with the logging statements removed):

public class TreasuryYieldReducer
    extends Reducer<IntWritable, DoubleWritable, IntWritable, BSONWritable> {

    @Override
    public void reduce(final IntWritable pKey,
                       final Iterable<DoubleWritable> pValues,
                       final Context pContext)
        throws IOException, InterruptedException {
        int count = 0;
        double sum = 0;
        for (final DoubleWritable value : pValues) {
            sum += value.get();
            count++;
        }
        final double avg = sum / count;
        BasicBSONObject output = new BasicBSONObject();
        output.put("count", count);
        output.put("avg", avg);
        output.put("sum", sum);
        pContext.write(pKey, new BSONWritable(output));
    }
}

This class extends org.apache.hadoop.mapreduce.Reducer and has four generic parameters: the input key, input value, output key, and output value. The input to the reducer is the output from the mapper; thus, if you notice carefully, the types of the first two generic parameters are the same as those of the last two generic parameters of the mapper that we saw earlier. The third and fourth parameters are the types of the key and value emitted from the reduce method. The value emitted is a BSON document, and thus we have BSONWritable as its type.

We now have the reduce method, which has two parameters: the first one is the key, which is the same as the key emitted from the map function, and the second is a java.lang.Iterable of the values emitted for the same key. This is how standard MapReduce functions work. For instance, if the map function emitted the key-value pairs (1950, 10), (1960, 20), (1950, 20), and (1950, 30), then reduce will be invoked with two unique keys, 1950 and 1960; the values for the key 1950 will be an Iterable with (10, 20, 30), whereas that of 1960 will be an Iterable of a single element, (20). The reducer's reduce function simply iterates through the Iterable of doubles, finds the sum and count of these numbers, and writes one key-value pair, where the key is the same as the incoming key and the output value is a BasicBSONObject with the sum, count, and average of the computed values.
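To make the arithmetic concrete, here is a small plain-Java sketch (not part of the connector) that mirrors what the reducer computes for the key 1950 in the example above:

import java.util.Arrays;
import java.util.List;

public class ReduceByHand {

    public static void main(final String[] args) {
        // Values emitted by the map phase for the key 1950 in the example above.
        List<Double> values = Arrays.asList(10d, 20d, 30d);
        double sum = 0;
        int count = 0;
        for (double value : values) {
            sum += value;
            count++;
        }
        double avg = sum / count;
        // Prints sum=60.0, count=3, avg=20.0, which are the fields the reducer
        // puts into the output BasicBSONObject for this key.
        System.out.println("sum=" + sum + ", count=" + count + ", avg=" + avg);
    }
}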

There are some good samples, including one for the Enron dataset, in the examples of the cloned mongo-hadoop connector project. If you would like to play around a bit, I would recommend that you take a look at these example projects and run them.

There's more…

What we saw here is a readymade sample that we executed. There is nothing like writing a MapReduce job ourselves to clarify our understanding. In the next recipe, we will write a sample MapReduce job using the Hadoop API in Java and see it in action.

See also…

If you're wondering what the Writable interface is all about and why you should not use plain old serialization instead, then refer to this URL that gives the explanation by the creator of Hadoop himself: http://www.mail-archive.com/[email protected]/msg00378.html.
