Running MapReduce jobs on HBase (table input/output)

This recipe explains how to run a MapReduce job that reads its input from and writes its results directly to HBase tables.

HBase provides abstract mapper and reducer implementations that users can extend to read from and write to HBase directly. This recipe explains how to write a sample MapReduce application using these mappers and reducers.

We will use the United Nations Development Programme's (UNDP) Human Development Report (HDR) data, which lists the Gross National Income (GNI) per capita by country. The dataset can be found at http://hdr.undp.org/en/statistics/data/. Using MapReduce, we will calculate the average GNI per capita value across countries.

Getting ready

Install and start HBase as described in the Installing HBase recipe.

To compile and run the sample, you will need to have Apache Ant installed on your machine. If Apache Ant has not been installed already, install it by following the instructions given at http://ant.apache.org/manual/install.html.
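
You can verify that Ant is available on your path by running the following command, which prints the installed version:

    >ant -version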

How to do it...

This section demonstrates how to run a MapReduce job on data stored in HBase.

  1. Unzip the sample code for this chapter. We will call the new directory SAMPLE5_DIR.
  2. Edit the hbase.home value in SAMPLE5_DIR/build.xml to point to your HBase installation directory (HBASE_HOME). We will refer to the Hadoop installation directory as HADOOP_HOME.
  3. You can find the Java HBase MapReduce sample at SAMPLE5_DIR/src/chapter5/AverageGINByCountryCalculator.java. The code of the class and its mapper looks like the following:
    package chapter5;

    import java.io.IOException;

    // Imports used across the whole class (the mapper, the reducer, and
    // the main method shown in the following steps).
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.hbase.mapreduce.TableMapper;
    import org.apache.hadoop.hbase.mapreduce.TableReducer;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.mapreduce.Job;

    public class AverageGINByCountryCalculator
    {
      static class Mapper extends
        TableMapper<ImmutableBytesWritable, DoubleWritable>
      {
        private int numRecords = 0;

        public void map(ImmutableBytesWritable row, Result values,
          Context context) throws IOException
        {
          // Read the ByCountry:gnip cell of the current row.
          byte[] results = values.getValue(
            "ByCountry".getBytes(), "gnip".getBytes());
          // Emit every value under the single key "ginp" so that one
          // reduce call computes the overall average.
          ImmutableBytesWritable userKey =
            new ImmutableBytesWritable("ginp".getBytes());
          try
          {
            context.write(userKey,
              new DoubleWritable(Bytes.toDouble(results)));
          }
          catch (InterruptedException e)
          {
            throw new IOException(e);
          }
          numRecords++;
          if ((numRecords % 50) == 0)
          {
            context.setStatus("mapper processed " +
              numRecords + " records so far");
          }
        }
      }

    HBase provides two classes, TableInputFormat and TableOutputFormat, that take care of most of the work of reading from and writing to HBase storage. To be used with these classes, the mapper and reducer must extend the TableMapper and TableReducer classes, whose declarations are sketched below. When executed, the mapper receives each HBase row as an input.
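
    The following is a rough sketch of how these base classes are declared in the HBase client library, shown only for orientation and not part of the sample code (the reduce output value type fixed by TableReducer is Writable in older HBase releases and Mutation in newer ones):

    // Orientation sketch of the HBase base classes (not part of the sample).
    // TableMapper fixes the map input types to the row key and row contents;
    // you choose only the map output key and value types:
    public abstract class TableMapper<KEYOUT, VALUEOUT>
      extends Mapper<ImmutableBytesWritable, Result, KEYOUT, VALUEOUT> {}

    // TableReducer fixes the reduce output value type to an HBase write,
    // typically a Put (Writable in older releases, Mutation in newer ones);
    // you choose the reduce input types and the output key type:
    public abstract class TableReducer<KEYIN, VALUEIN, KEYOUT>
      extends Reducer<KEYIN, VALUEIN, KEYOUT, Writable> {}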

  4. The reducer will use the Put construct of the HBase Java API to store the results back to HBase:
    public static class Reducer extends
      TableReducer<ImmutableBytesWritable,
        DoubleWritable, ImmutableBytesWritable>
    {
      public void reduce(ImmutableBytesWritable key,
        Iterable<DoubleWritable> values, Context context)
        throws IOException, InterruptedException
      {
        double sum = 0;
        int count = 0;
        // Sum up all the GNI per capita values emitted by the mappers.
        for (DoubleWritable val : values)
        {
          sum += val.get();
          count++;
        }
        // Store the average in the data:average column of the output
        // table, using the incoming key ("ginp") as the row key.
        Put put = new Put(key.get());
        put.add(Bytes.toBytes("data"),
          Bytes.toBytes("average"),
          Bytes.toBytes(sum / count));
        System.out.println("Processed " + count +
          " values and average = " + sum / count);
        context.write(key, put);
      }
    }

    When running an HBase-based MapReduce job, users should configure where the job reads its data from and where it writes its results to in HBase, using the TableMapReduceUtil.initTableMapperJob(...) and TableMapReduceUtil.initTableReducerJob(...) methods.

    public static void main(String[] args) throws Exception
    {
      Configuration conf = HBaseConfiguration.create();
      Job job = new Job(conf, "AverageGINByCountryCalculator");
      job.setJarByClass(AverageGINByCountryCalculator.class);
      // Scan only the ByCountry column family; the FirstKeyOnlyFilter
      // passes only the first key-value of each row to the mapper.
      Scan scan = new Scan();
      scan.addFamily("ByCountry".getBytes());
      scan.setFilter(new FirstKeyOnlyFilter());
      // Read the job's input from the HDI table ...
      TableMapReduceUtil.initTableMapperJob("HDI", scan,
        Mapper.class, ImmutableBytesWritable.class,
        DoubleWritable.class, job);
      // ... and write its output to the HDIResult table.
      TableMapReduceUtil.initTableReducerJob("HDIResult",
        Reducer.class, job);
      System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
    

    Here, initTableMapperJob(...) instructs Hadoop to read the input from the HDI table, and initTableReducerJob(...) instructs Hadoop to write the results to the HDIResult HBase table.
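
    Note that TableOutputFormat writes to an existing table and does not create it for you, so the HDIResult table (with the data column family used by the reducer) should exist before the job runs. If your sample setup does not already create it, you can create it from the HBase shell; the following command assumes the table and column family names used in this recipe:

    hbase(main):001:0> create 'HDIResult', 'data'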

  5. Run the following command to compile the MapReduce job:
    >ant hbase-build
    
  6. Run the following command to upload the data to HBase (this uses the HDIDataUploader class to upload the data):
    >ant hbase-sample1-upload
    
  7. Copy the hadoop-cookbook-chapter5.jar file to HADOOP_HOME.
  8. Run the MapReduce job by running the following command from HADOOP_HOME:
    >bin/hadoop jar hadoop-cookbook-chapter5.jar chapter5.AverageGINByCountryCalculator
    
  9. View the results in HBase by running the following command from the HBase shell. You can start the HBase shell by running bin/hbase shell from HBASE_HOME.
    hbase(main):009:0> scan 'HDIResult'
    ROW                COLUMN+CELL
     ginp              column=data:average, timestamp=1338785279029, value=@\xC8\xF7\x1Ba2\xA7\x04
    1 row(s) in 0.6820 seconds
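
    The value shown by the scan is the raw 8-byte serialized form of the computed double, which is why it is not human readable. If you want to read the average back programmatically, the following is a minimal sketch that decodes it with Bytes.toDouble(); the class name PrintAverageGNI is only illustrative and is not part of the sample code:

    package chapter5;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.util.Bytes;

    // Hypothetical helper: reads the computed average back from the
    // HDIResult table and decodes the serialized double value.
    public class PrintAverageGNI
    {
      public static void main(String[] args) throws Exception
      {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "HDIResult");
        try
        {
          // The reducer stored the average under the row key "ginp"
          // in the data:average column.
          Get get = new Get(Bytes.toBytes("ginp"));
          Result result = table.get(get);
          byte[] value = result.getValue(Bytes.toBytes("data"),
            Bytes.toBytes("average"));
          System.out.println("Average GNI per capita = " +
            Bytes.toDouble(value));
        }
        finally
        {
          table.close();
        }
      }
    }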
    

How it works...

When we run the MapReduce job, the HBase input and output formats configured through initTableMapperJob(...) and initTableReducerJob(...) take control of reading and writing. On the input side, TableInputFormat connects to HBase, reads the data as specified through initTableMapperJob(...), and passes each row directly to the HBase-based mapper that we have written. Here, the Scan object we passed into initTableMapperJob(...) specifies the search criteria used when reading the input data from HBase.

Similarly, the TableReducer class lets users emit data directly to HBase.

In doing so, TableMapper and TableReducer provide a programming model built on the HBase APIs. With this programming model, users do not have to worry about parsing and formatting data as they would with normal MapReduce jobs; the table mapper and reducer map HBase data to Hadoop key-value pairs and vice versa.
