This recipe explains how to run a MapReduce job that reads its input directly from HBase and writes its output directly back to HBase.
HBase provides abstract mapper and reducer implementations that users can extend to read from and write to HBase directly. This recipe explains how to write a sample MapReduce application using these mappers and reducers.
We will use the World Bank's Human Development Report (HDR) data, which shows the Gross National Income (GNI) per capita by country. The dataset can be found at http://hdr.undp.org/en/statistics/data/. Using MapReduce, we will calculate the average GNI per capita value across the countries.
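The computation itself is a simple arithmetic mean. The following plain-Java sketch shows the calculation the job ultimately performs; the country names and GNI figures are made-up placeholders, not real HDR data (the recipe reads the real values from HBase):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class AverageSketch {
    public static void main(String[] args) {
        // Hypothetical GNI-per-capita values, purely for illustration.
        Map<String, Double> gniPerCapita = new LinkedHashMap<>();
        gniPerCapita.put("CountryA", 4000.0);
        gniPerCapita.put("CountryB", 12000.0);
        gniPerCapita.put("CountryC", 8000.0);

        // Sum all values, then divide by the count -- the same sum/count
        // logic the reducer in this recipe applies.
        double sum = 0;
        for (double v : gniPerCapita.values()) {
            sum += v;
        }
        double average = sum / gniPerCapita.size();
        System.out.println("average = " + average); // average = 8000.0
    }
}
```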
Install and start HBase as described in the Installing HBase recipe.
To compile and run the sample, you will need Apache Ant installed on your machine. If Apache Ant is not already installed, install it by following the instructions given at http://ant.apache.org/manual/install.html.
This section demonstrates how to run a MapReduce job on data stored in HBase.
We will refer to the directory containing the sample code for this recipe as SAMPLE5_DIR.

Edit the hbase.home property value in SAMPLE5_DIR/build.xml to point to the HBASE_HOME directory of your HBase installation. We will refer to the Hadoop installation directory as HADOOP_HOME.

The sample MapReduce application can be found at SAMPLE5_DIR/src/chapter5/AverageGINByCountryCalcualtor.java. The client-side code looks like the following:

public class AverageGINByCountryCalcualtor {
  static class Mapper extends TableMapper<ImmutableBytesWritable, DoubleWritable> {
    private int numRecords = 0;

    public void map(ImmutableBytesWritable row, Result values, Context context)
        throws IOException {
      byte[] results = values.getValue("ByCountry".getBytes(), "gnip".getBytes());
      ImmutableBytesWritable userKey = new ImmutableBytesWritable("ginp".getBytes());
      try {
        context.write(userKey, new DoubleWritable(Bytes.toDouble(results)));
      } catch (InterruptedException e) {
        throw new IOException(e);
      }
      numRecords++;
      if ((numRecords % 50) == 0) {
        context.setStatus("mapper processed " + numRecords + " records so far");
      }
    }
  }
HBase provides two classes, TableInputFormat and TableOutputFormat, that take care of most of the work of reading from and writing to HBase. To be used with these classes, the mapper and reducer must extend the TableMapper and TableReducer classes, respectively. When executed, the mapper will receive each HBase row as an input.
The reducer uses the Put construct of the HBase Java API to store the results back to HBase.

public static class Reducer extends
    TableReducer<ImmutableBytesWritable, DoubleWritable, ImmutableBytesWritable> {
  public void reduce(ImmutableBytesWritable key, Iterable<DoubleWritable> values,
      Context context) throws IOException, InterruptedException {
    double sum = 0;
    int count = 0;
    for (DoubleWritable val : values) {
      sum += val.get();
      count++;
    }
    Put put = new Put(key.get());
    put.add(Bytes.toBytes("data"), Bytes.toBytes("average"), Bytes.toBytes(sum / count));
    System.out.println("Processed " + count + " values and average = " + (sum / count));
    context.write(key, put);
  }
}
When running an HBase-based MapReduce job, users should configure where to read data from in HBase and how to write information back into HBase via the TableMapReduceUtil.initTableMapperJob(...) and initTableReducerJob(...) methods.
public static void main(String[] args) throws Exception {
  Configuration conf = HBaseConfiguration.create();
  Job job = new Job(conf, "AverageGINByCountryCalcualtor");
  job.setJarByClass(AverageGINByCountryCalcualtor.class);
  Scan scan = new Scan();
  scan.addFamily("ByCountry".getBytes());
  scan.setFilter(new FirstKeyOnlyFilter());
  TableMapReduceUtil.initTableMapperJob("HDI", scan, Mapper.class,
      ImmutableBytesWritable.class, DoubleWritable.class, job);
  TableMapReduceUtil.initTableReducerJob("HDIResult", Reducer.class, job);
  System.exit(job.waitForCompletion(true) ? 0 : 1);
}
Here, initTableMapperJob(...) instructs Hadoop to read information from the HDI table, and initTableReducerJob(...) instructs Hadoop to write the information to the HBase HDIResult table.
Compile the sample by running the following command:
>ant hbase-build
Upload the data to HBase by running the following command (it uses HDIDataUploader to upload the data):
>ant hbase-sample1-upload
Copy the JAR file to HADOOP_HOME. Then run the MapReduce job by running the following command from HADOOP_HOME:
>bin/hadoop jar hadoop-cookbook-chapter5.jar chapter5.AverageGINByCountryCalcualtor
View the results by running bin/hbase shell from HBASE_HOME and scanning the HDIResult table:

hbase(main):009:0> scan 'HDIResult'
ROW                  COLUMN+CELL
 ginp                column=data:average, timestamp=1338785279029, value=@\xC8\xF7\x1Ba2\xA7\x04
1 row(s) in 0.6820 seconds
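The value column looks like binary garbage because the reducer stored the average with Bytes.toBytes(double), which writes the raw 8-byte big-endian IEEE 754 representation. A minimal plain-Java sketch of that encoding and decoding, using java.nio.ByteBuffer instead of HBase's Bytes utility (the value below is a placeholder, not the actual job output):

```java
import java.nio.ByteBuffer;

public class DoubleBytesSketch {
    public static void main(String[] args) {
        double average = 12345.6789; // placeholder value for illustration

        // Encode the double as 8 big-endian bytes, which is what
        // HBase stores in the data:average cell.
        byte[] encoded = ByteBuffer.allocate(8).putDouble(average).array();

        // Decode the 8 bytes back into a double, the equivalent of
        // Bytes.toDouble(byte[]) on the client side.
        double decoded = ByteBuffer.wrap(encoded).getDouble();
        System.out.println("decoded = " + decoded); // decoded = 12345.6789
    }
}
```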
When we run the MapReduce job, the TableMapper and TableReducer classes receive control. The TableMapper class connects to HBase, reads the data as specified through initTableMapperJob(...), and passes the data directly to the HBase-based mapper that we have written. Here, the Scan object we passed to initTableMapperJob(...) specifies the search criteria the mapper uses when reading the input data from HBase.
Similarly, the TableReducer lets users emit the data directly to HBase. In doing so, TableMapper and TableReducer provide a programming model built on the HBase APIs. With this model, users do not have to worry about parsing and formatting data as they do with normal MapReduce jobs. The table mapper and reducer map the HBase data to Hadoop key-value pairs and vice versa.