Executing our first sample MapReduce job using the mongo-hadoop connector

In this recipe, we will see how to build the mongo-hadoop connector from source and set up Hadoop in standalone mode, just for the purpose of running the examples. The connector is the backbone that runs MapReduce jobs on Hadoop using the data in Mongo.

Getting ready

There are various distributions of Hadoop; however, we will use Apache Hadoop (http://hadoop.apache.org/). The installation will be done on a Linux-flavored OS; I am using Ubuntu Linux. In production, Apache Hadoop always runs on Linux; Windows is not tested for production systems, although it can be used for development. If you are a Windows user, I recommend that you install a virtualization environment such as VirtualBox (https://www.virtualbox.org/), set up a Linux environment in it, and then install Hadoop on that. Setting up VirtualBox and Linux on it is not shown in this recipe, but it is not a tedious task. The prerequisites for this recipe are a machine with a Linux operating system and an Internet connection. The version we set up here is Apache Hadoop 2.4.0, which, at the time of writing this book, is the latest version of Apache Hadoop supported by the mongo-hadoop connector.

A Git client is needed to clone the repository of the mongo-hadoop connector to a local filesystem. Refer to http://git-scm.com/book/en/Getting-Started-Installing-Git to install Git.

You will also need MongoDB installed on your operating system. Refer to http://docs.mongodb.org/manual/installation/ and install it accordingly. Start a mongod instance listening on port 27017. You are not expected to be an expert in Hadoop, but some familiarity with it will be helpful. Knowing the concept of MapReduce is important, and knowing the Hadoop MapReduce API will be an advantage. In this recipe, we will explain what is needed to get the work done; you might prefer to get more details on Hadoop and its MapReduce API from other sources. The wiki page at http://en.wikipedia.org/wiki/MapReduce gives enough information on MapReduce programming.

How to do it…

  1. First, install Java, Hadoop, and the required packages.
  2. Start by checking whether a JDK is already installed on the operating system. Type the following command in the command prompt of the operating system:
    $ javac -version
    
  3. If the program doesn't execute and you are instead told about various packages that contain the javac program, you need to install the JDK as follows:
    $ sudo apt-get install default-jdk
    

    This is all we need to do to install Java.

  4. Now, download Hadoop from http://www.apache.org/dyn/closer.cgi/hadoop/common/; choose version 2.4 (or the latest version that the mongo-hadoop connector supports).
  5. After the .tar.gz file is downloaded, execute the following commands in the command prompt:
    $ tar -xvzf <name of the downloaded .tar.gz file>
    $ cd <extracted directory>
    
  6. Open the etc/hadoop/hadoop-env.sh file and replace export JAVA_HOME=${JAVA_HOME} with export JAVA_HOME=/usr/lib/jvm/default-java.
  7. We will now get the mongo-hadoop connector code from GitHub on our local filesystem. Note that you don't need a GitHub account to clone a repository. Clone the Git project from the operating system's command prompt as follows:
    $ git clone https://github.com/mongodb/mongo-hadoop.git
    $ cd mongo-hadoop
    
  8. Create a soft link as follows; the Hadoop installation directory is the same as the one we extracted in step 5:
    $ ln -s <hadoop installation directory> ~/hadoop-binaries
    

    For example, if Hadoop is extracted/installed in the home directory, the following command would need to be executed:

    $ ln -s ~/hadoop-2.4.0 ~/hadoop-binaries
    

    By default, the mongo-hadoop connector will look for a Hadoop distribution under the ~/hadoop-binaries folder. So, even if the Hadoop archive is extracted elsewhere, we can create a soft link to it. Once the preceding link is created, we will have the Hadoop binaries in the ~/hadoop-binaries/hadoop-2.4.0/bin path.

  9. We will now build the mongo-hadoop connector from source for Apache Hadoop version 2.4.0 as follows. By default, the build targets the latest Hadoop version, so as of now, the -Phadoop_version parameter could be left out, as 2.4 is the latest anyway:
    $ ./gradlew jar -Phadoop_version='2.4'
    

    This build process will take some time to complete.

  10. Once the build completes successfully, we are ready to execute our first MapReduce job. We will do this using the treasury yield sample provided with the mongo-hadoop connector project. The first step is to import the data into a collection in Mongo.
  11. Assuming that the mongod instance is up and running, listening on port 27017 for connections, and that the current directory is the root of the mongo-hadoop connector code base, execute the following command:
    $ mongoimport -c yield_historical.in -d mongo_hadoop --drop examples/treasury_yield/src/main/resources/yield_historical_in.json
    
  12. Once the import action is successful, we are left with copying two JAR files to the lib directory. Execute the following commands from the operating system shell:
    $ wget http://repo1.maven.org/maven2/org/mongodb/mongo-java-driver/2.12.0/mongo-java-driver-2.12.0.jar
    $ cp core/build/libs/mongo-hadoop-core-1.2.1-SNAPSHOT-hadoop_2.4.jar ~/hadoop-binaries/hadoop-2.4.0/lib/
    $ mv mongo-java-driver-2.12.0.jar ~/hadoop-binaries/hadoop-2.4.0/lib
    
  13. The mongo-hadoop core JAR file that we copied is named mongo-hadoop-core-1.2.1-SNAPSHOT-hadoop_2.4.jar because it was built from the trunk version of the code for Hadoop 2.4.0. Change the name of the JAR accordingly when you build it yourself for a different version of the connector or Hadoop. The Mongo Java driver can be the latest version; Version 2.12.0 is the latest at the time of writing this book.
  14. Now, execute the following command in the command prompt of the operating system shell:
    $ ~/hadoop-binaries/hadoop-2.4.0/bin/hadoop jar \
        examples/treasury_yield/build/libs/treasury_yield-1.2.1-SNAPSHOT-hadoop_2.4.jar \
        com.mongodb.hadoop.examples.treasury.TreasuryYieldXMLConfig \
        -Dmongo.input.split_size=8 -Dmongo.job.verbose=true \
        -Dmongo.input.uri=mongodb://localhost:27017/mongo_hadoop.yield_historical.in \
        -Dmongo.output.uri=mongodb://localhost:27017/mongo_hadoop.yield_historical.out
    
  15. The output prints quite a lot of information; however, the following line in the output tells us that the MapReduce job completed successfully:
     14/05/11 21:38:54 INFO mapreduce.Job: Job job_local1226390512_0001 completed successfully
    
  16. Connect to the mongod instance running on localhost from the Mongo shell and execute a find query on the following collection:
    $ mongo
    > use mongo_hadoop
    switched to db mongo_hadoop
    > db.yield_historical.out.find()
    

How it works…

Installing Hadoop is not a trivial task, and we don't need to get into it to try our samples for the mongo-hadoop connector; there are dedicated books and articles available to learn about Hadoop. For the purpose of this chapter, we simply download the archive, extract it, and run the MapReduce jobs in standalone mode. This is the quickest way to get going with Hadoop. All the steps up to step 6 are needed to install Hadoop. In the next couple of steps, we simply cloned the mongo-hadoop connector repository. If you prefer not to build from source, you can instead download a stable, prebuilt version for your version of Hadoop from https://github.com/mongodb/mongo-hadoop/releases. We then built the connector for our version of Hadoop (2.4.0) up to step 13. From step 14 onwards, we ran the actual MapReduce job to work on the data in MongoDB. We imported the data into the yield_historical.in collection, which is used as the input to the MapReduce job. Go ahead and query the collection from the Mongo shell using the mongo_hadoop database to see a document. Don't worry if you don't understand the contents; the point is to see what we intend to do with this data in the example.

The next step was to invoke the MapReduce operation on the data. The hadoop command was executed with the path of a JAR file (examples/treasury_yield/build/libs/treasury_yield-1.2.1-SNAPSHOT-hadoop_2.4.jar). This is the JAR file that contains the classes implementing a sample MapReduce operation for the treasury yield data. The com.mongodb.hadoop.examples.treasury.TreasuryYieldXMLConfig class in this JAR file is the bootstrap class containing the main method; we will see this class soon. There are many configurations supported by the connector; a complete list can be found at https://github.com/mongodb/mongo-hadoop/blob/master/CONFIG.md. For now, we will just remember that mongo.input.uri and mongo.output.uri are the input and output collections, respectively, of the MapReduce operation.

With the project cloned, you might import it into any Java IDE of your choice. We are particularly interested in the project at /examples/treasury_yield and the core project present in the root of the cloned repository.

Let's look at the com.mongodb.hadoop.examples.treasury.TreasuryYieldXMLConfig class. This is the entry point into the MapReduce job and has a main method in it. To write MapReduce jobs for Mongo using the mongo-hadoop connector, the main class always has to extend com.mongodb.hadoop.util.MongoTool. This class implements the org.apache.hadoop.util.Tool interface, whose run method is implemented for us by the MongoTool class. All that the main method needs to do is execute this class using the org.apache.hadoop.util.ToolRunner class by invoking its static run method and passing it an instance of our main class (an instance of Tool).
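
The following is a minimal sketch of what such a bootstrap class looks like. The class name MyJobConfig is hypothetical and the sketch shows only the bare plumbing; the actual sample class also loads XML resources and sets up a MongoConfig instance, as we will see shortly:

import com.mongodb.hadoop.util.MongoTool;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;

public class MyJobConfig extends MongoTool {

  public static void main(final String[] args) throws Exception {
    // ToolRunner parses generic options, such as the -D parameters we passed
    // on the command line, into the Configuration and then invokes the run
    // method that MongoTool implements for us.
    System.exit(ToolRunner.run(new Configuration(), new MyJobConfig(), args));
  }
}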

There is a static block that loads some configurations from two XML files: hadoop-local.xml and mongo-defaults.xml. The format of these files (like any Hadoop configuration XML file) is as follows; the root node of the file is the configuration node, with multiple property nodes under it:

<configuration>
  <property>
    <name>{property name}</name> 
    <value>{property value}</value>
  </property>
  ...
</configuration>

The property values that make sense in this context are all those mentioned in the URL provided earlier. We instantiate com.mongodb.hadoop.MongoConfig, wrapping an instance of org.apache.hadoop.conf.Configuration, in the constructor of the bootstrap class TreasuryYieldXMLConfig. The MongoConfig class provides sensible defaults, which are enough to satisfy the majority of use cases. Some of the most important things we need to set on the MongoConfig instance are the input and output formats, the mapper and reducer classes, the output key and value types of the mapper, and the output key and value types of the reducer. The input and output formats will always be the com.mongodb.hadoop.MongoInputFormat and com.mongodb.hadoop.MongoOutputFormat classes, respectively, which are provided by the mongo-hadoop connector library. The mapper and reducer output key and value types can be any org.apache.hadoop.io.Writable implementation; refer to the Hadoop documentation for the different Writable implementations in the org.apache.hadoop.io package. Apart from these, the mongo-hadoop connector also provides some implementations of its own in the com.mongodb.hadoop.io package; for the treasury yield example, we use BSONWritable. These configurable values can be provided in the XML file we saw earlier, set programmatically, or, as we did for mongo.input.uri and mongo.output.uri, passed as VM arguments on the command line. The corresponding programmatic setters on the MongoConfig instance are setInputURI and setOutputURI.
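
Putting this together, the constructor of the hypothetical MyJobConfig class from the earlier sketch could configure the job roughly as follows. This is only an illustration: the setter names follow the MongoConfig API as far as I can tell, but the TreasuryYieldXMLConfig class in the cloned project remains the authoritative reference:

// Additional imports assumed for this sketch:
// import com.mongodb.hadoop.MongoConfig;
// import com.mongodb.hadoop.MongoInputFormat;
// import com.mongodb.hadoop.MongoOutputFormat;
// import com.mongodb.hadoop.io.BSONWritable;
// import org.apache.hadoop.io.DoubleWritable;
// import org.apache.hadoop.io.IntWritable;

public MyJobConfig() {
  final Configuration conf = new Configuration();
  setConf(conf);

  // MongoConfig wraps the Hadoop Configuration and exposes typed setters
  // for the connector's properties.
  final MongoConfig config = new MongoConfig(conf);
  config.setInputFormat(MongoInputFormat.class);
  config.setMapper(TreasuryYieldMapper.class);
  config.setMapperOutputKey(IntWritable.class);
  config.setMapperOutputValue(DoubleWritable.class);
  config.setReducer(TreasuryYieldReducer.class);
  config.setOutputKey(IntWritable.class);
  config.setOutputValue(BSONWritable.class);
  config.setOutputFormat(MongoOutputFormat.class);

  // The input and output collections could be set here instead of passing
  // -Dmongo.input.uri and -Dmongo.output.uri on the command line:
  // config.setInputURI("mongodb://localhost:27017/mongo_hadoop.yield_historical.in");
  // config.setOutputURI("mongodb://localhost:27017/mongo_hadoop.yield_historical.out");
}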

We will now look at the mapper and reducer class implementations. Here, we will copy the important portion of the class to analyze it. Refer to the cloned project for the entire implementation:

public class TreasuryYieldMapper
    extends Mapper<Object, BSONObject, IntWritable, DoubleWritable> {

  @Override
  public void map(final Object pKey,
                  final BSONObject pValue,
                  final Context pContext)
      throws IOException, InterruptedException {
    // The _id of each input document is a date; extract the year from it.
    final int year = ((Date) pValue.get("_id")).getYear() + 1900;
    // Emit the year as the key and the 10-year yield as the value.
    double bid10Year = ((Number) pValue.get("bc10Year")).doubleValue();
    pContext.write(new IntWritable(year), new DoubleWritable(bid10Year));
  }
}

Our mapper extends the org.apache.hadoop.mapreduce.Mapper class. The four generic parameters are the key class, the type of the input value, the type of the output key, and the type of the output value, respectively. The body of the map method reads the _id value from the input document, which is a date, and extracts the year out of it. It then gets the double value of the bc10Year field from the document and writes a key-value pair to the context, where the key is the year and the value is the double. The implementation here doesn't rely on the value of the pKey parameter passed in; this parameter could have been used as the key instead of hardcoding the _id field in the implementation. It is basically the same field that can be set using the mongo.input.key property in the XML or the MongoConfig.setInputKey method; if none is set, _id is the default anyway. A hypothetical variant that uses pKey instead is sketched below.
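
As a purely hypothetical variant, and assuming that the input format delivers the raw value of the configured key field (here, the _id date) to the mapper as pKey, the same mapper could be written without touching _id inside the document:

// Imports are the same as for the TreasuryYieldMapper class shown earlier.
public class TreasuryYieldKeyMapper
    extends Mapper<Object, BSONObject, IntWritable, DoubleWritable> {

  @Override
  public void map(final Object pKey,
                  final BSONObject pValue,
                  final Context pContext)
      throws IOException, InterruptedException {
    // pKey is assumed to be the _id date handed over by the input format.
    final int year = ((Date) pKey).getYear() + 1900;
    final double bid10Year = ((Number) pValue.get("bc10Year")).doubleValue();
    pContext.write(new IntWritable(year), new DoubleWritable(bid10Year));
  }
}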

Let's look at the reducer implementation (with the logging statements removed):

public class TreasuryYieldReducer
    extends Reducer<IntWritable, DoubleWritable, IntWritable, BSONWritable> {

  @Override
  public void reduce(final IntWritable pKey, final Iterable<DoubleWritable> pValues, final Context pContext)
      throws IOException, InterruptedException {
    int count = 0;
    double sum = 0;
    // Sum up all the yields emitted by the mapper for this year.
    for (final DoubleWritable value : pValues) {
      sum += value.get();
      count++;
    }
    final double avg = sum / count;
    // Write one BSON document per year with the count, average, and sum.
    BasicBSONObject output = new BasicBSONObject();
    output.put("count", count);
    output.put("avg", avg);
    output.put("sum", sum);
    pContext.write(pKey, new BSONWritable(output));
  }
}

This class extends org.apache.hadoop.mapreduce.Reducer and again has four generic parameters: the input key, input value, output key, and output value, respectively. The input to the reducer is the output from the mapper; thus, if you look carefully, the types of the first two generic parameters are the same as the last two generic parameters of the mapper we saw earlier. The third and fourth parameters in this case are the types of the key and the value emitted from the reduce, respectively. The value emitted is a BSON document (a BasicBSONObject), and thus we have BSONWritable as the type.

The reduce method receives the key, which is the same as the key emitted from the map function, and a java.lang.Iterable of the values emitted for that key (along with the context to write the result to). This is how standard MapReduce functions work. For instance, if the map function gave the key-value pairs (1950, 10), (1960, 20), (1950, 20), and (1950, 30), then reduce would be invoked with two unique keys, 1950 and 1960. The value for the key 1950 will be an Iterable with (10, 20, 30), whereas that of 1960 will be an Iterable with a single element (20). The reduce function of the reducer class simply iterates through this Iterable of doubles, finds the sum and count of these numbers, and writes one key-value pair where the key is the same as the incoming key and the output value is a BasicBSONObject with the computed sum, count, and average in it.

There are some good samples, including the Enron dataset, in the examples of the cloned mongo-hadoop connector project. If you would like to play around a bit, I recommend that you take a look at these example projects too and run them.

There's more…

What we saw here was a ready-made sample that we executed. There is nothing like writing a MapReduce job ourselves to clarify our understanding. In the next recipe, we will write a sample MapReduce job using the Hadoop API in Java and see it in action.

See also
