In this recipe, we will use MapReduce and the AccumuloInputFormat class to count occurrences of each unique source stored in an Accumulo table.
This recipe will be the easiest to test over a pseudo-distributed Hadoop cluster with Accumulo 1.4.1 and ZooKeeper 3.3.3 installed. The shell script in this recipe assumes that ZooKeeper is running on the host localhost and on the port 2181; you can change this to suit your environment. The Accumulo installation's bin folder needs to be on your environment path.
For this recipe, you'll need to create an Accumulo instance named test with the user root and the password password. You will also need a table named acled to exist in the configured Accumulo instance.
To see the filtered results from this recipe, you will need to go through the Using MapReduce to bulk import geographic event data into Accumulo recipe seen earlier in this chapter. This will give you some sample data to experiment with.
The following are the steps to count occurrences of different sources using MapReduce:
Set up a build environment that produces a JAR file named accumulo-examples.jar. Create the package examples.accumulo and add the class SourceCountJob.java with the following content:

    package examples.accumulo;

    import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
    import org.apache.accumulo.core.data.Key;
    import org.apache.accumulo.core.data.Value;
    import org.apache.accumulo.core.security.Authorizations;
    import org.apache.accumulo.core.util.CachedConfiguration;
    import org.apache.accumulo.core.util.Pair;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    import java.io.IOException;
    import java.util.HashSet;

    public class SourceCountJob extends Configured implements Tool {

        private Configuration conf;
        private static final Text FAMILY = new Text("cf");
        private static final Text SOURCE = new Text("src");

        public SourceCountJob(Configuration conf) {
            this.conf = conf;
        }
Implement the run() method to conform to the Tool interface and parse the arguments from the command line:

        @Override
        public int run(String[] args) throws Exception {
            args = new GenericOptionsParser(conf, args).getRemainingArgs();
            if (args.length < 6) {
                System.err.println(printUsage());
                System.exit(0);
            }
            String tableName = args[0];
            String outputStr = args[1];
            String instanceName = args[2];
            String user = args[3];
            String pass = args[4];
            String zooQuorum = args[5];
Configure the AccumuloInputFormat with the connection settings and the column to fetch:

            AccumuloInputFormat.setInputInfo(conf, user, pass.getBytes(),
                    tableName, new Authorizations());
            AccumuloInputFormat.setZooKeeperInstance(conf, instanceName, zooQuorum);
            HashSet<Pair<Text, Text>> columnsToFetch = new HashSet<Pair<Text, Text>>();
            columnsToFetch.add(new Pair<Text, Text>(FAMILY, SOURCE));
            AccumuloInputFormat.fetchColumns(conf, columnsToFetch);
Set up the job instance, the MapReduce classes, and the output location:

            Job job = new Job(conf, "Count distinct sources in ACLED");
            job.setInputFormatClass(AccumuloInputFormat.class);
            job.setMapperClass(ACLEDSourceMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            job.setReducerClass(ACLEDSourceReducer.class);
            job.setCombinerClass(ACLEDSourceReducer.class);
            job.setJarByClass(getClass());
            job.setOutputFormatClass(TextOutputFormat.class);
            FileOutputFormat.setOutputPath(job, clearOutputDir(outputStr));
            job.setNumReduceTasks(1);
            return job.waitForCompletion(true) ? 0 : 1;
        }

        private String printUsage() {
            return "<tablename> <output> <instance_name> <username> <password> <zoohosts>";
        }

        private Path clearOutputDir(String outputStr) throws IOException {
            FileSystem fs = FileSystem.get(conf);
            Path path = new Path(outputStr);
            fs.delete(path, true);
            return path;
        }
Define the static inner class ACLEDSourceMapper:

        public static class ACLEDSourceMapper extends
                Mapper<Key, Value, Text, IntWritable> {

            private Text outKey = new Text();
            private IntWritable outValue = new IntWritable(1);

            @Override
            protected void map(Key key, Value value,
                               Context context)
                    throws IOException, InterruptedException {
                outKey.set(value.get());
                context.write(outKey, outValue);
            }
        }
Define the static inner class ACLEDSourceReducer, followed by the setConf() and getConf() overrides on the outer class:

        public static class ACLEDSourceReducer extends
                Reducer<Text, IntWritable, Text, IntWritable> {

            private IntWritable outValue = new IntWritable();

            @Override
            protected void reduce(Text key, Iterable<IntWritable> values,
                                  Context context)
                    throws IOException, InterruptedException {
                int count = 0;
                for (IntWritable value : values) {
                    count += value.get();
                }
                outValue.set(count);
                context.write(key, outValue);
            }
        }

        @Override
        public void setConf(Configuration conf) {
            this.conf = conf;
        }

        @Override
        public Configuration getConf() {
            return conf;
        }
Define a main() method to submit the job as a Tool instance:

        public static void main(String[] args) throws Exception {
            Configuration conf = CachedConfiguration.getInstance();
            args = new GenericOptionsParser(conf, args).getRemainingArgs();
            ToolRunner.run(new SourceCountJob(conf), args);
        }
    }
Save the class and build accumulo-examples.jar. In the folder where accumulo-examples.jar is located, create a new shell script named source_count.sh with the following commands. Be sure to change ACCUMULO_LIB, HADOOP_LIB, and ZOOKEEPER_LIB to match your local paths:

    tool.sh accumulo-examples.jar examples.accumulo.SourceCountJob \
        -Dmapred.reduce.tasks=4 \
        acled \
        /output/accumulo_source_count/ \
        test \
        root \
        password \
        localhost:2181

    hadoop fs -cat /output/accumulo_source_count/part* > source_count.txt
Run the script. Once the job completes, you should see the file source_count.txt in your base working folder. Use the cat command to see the counts for each source.

We define the SourceCountJob class to implement the Tool interface for ease of remote submission with the ToolRunner class. The CachedConfiguration.getInstance() static method supplies our Tool instance with the correct Accumulo configuration from the classpath.
The run() method parses the arguments necessary to connect to the Accumulo instance using AccumuloInputFormat. For this job, we are only interested in retrieving the column qualifier src from the column family cf for each key. By default, the scanner will return only the most recent version of each key containing the qualifier src. If we wanted to count source occurrences across every key-value pair in the table, for every version, we would have to configure maxVersions in the input format.
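As a hedged sketch of that variant: assuming the same static-configuration style used by setInputInfo() and fetchColumns() above, and assuming the 1.4-era setMaxVersions setter, the change would look something like the following fragment.

```java
// Sketch only, not part of the recipe's code: ask the scanner to
// return every stored version of each key rather than just the
// most recent one, so that all versions are counted.
AccumuloInputFormat.setMaxVersions(conf, Integer.MAX_VALUE);
```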
We then set up our job instance with the AccumuloInputFormat and the map/reduce classes required to count each source. As our reducer class is simply adding integers together, we can re-use the same implementation as a combiner. We clear any existing output folder and set the number of reduce tasks to 1, as we are running a pseudo-distributed cluster. Now we are ready to submit the job to the cluster.
The business logic operates in a very similar manner to the traditional WordCount example.
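To make the analogy concrete, the counting logic can be sketched in plain Java, independent of Hadoop: the map phase emits a 1 for each source value, and the reduce phase sums the 1s per source. The class and method names below are illustrative only, not part of the recipe's code.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SourceCountSketch {

    // Simulates the map + reduce phases: each source contributes a 1,
    // and the per-source 1s are summed into a total count.
    static Map<String, Integer> countSources(List<String> sources) {
        Map<String, Integer> counts = new HashMap<>();
        for (String source : sources) {
            counts.merge(source, 1, Integer::sum); // reduce-side sum
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> values = Arrays.asList("Reuters", "AFP", "Reuters");
        // Prints each distinct source with its occurrence count
        System.out.println(countSources(values));
    }
}
```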
The AccumuloInputFormat class handles scanning and returning only key-value pairs for the qualifier src. Therefore, any key/value instances that enter our ACLEDSourceMapper class's map() function are already restricted to the data we are interested in aggregating, and we can simply output 1 to represent one occurrence of that source value in our dataset. The output key is simply the value of the incoming source.
The reduce class ACLEDSourceReducer simply tallies occurrences of each source and writes the results back to HDFS.

At the end of the shell script, we download and concatenate the different part files together into one file, source_count.txt. We now have one single file with newline-separated source listings and the total number of occurrences for each source.
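Each line of that file follows TextOutputFormat's default layout: the source text, a tab character, then the count. A small illustrative parser (class and method names here are hypothetical, not from the recipe) shows the shape of the data:

```java
public class SourceCountLineParser {

    // Splits a TextOutputFormat line of the form "key<TAB>count"
    // into its key and count parts.
    static String[] parseLine(String line) {
        int tab = line.indexOf('\t');
        return new String[] { line.substring(0, tab), line.substring(tab + 1) };
    }

    public static void main(String[] args) {
        String[] kv = parseLine("Reuters\t42");
        System.out.println(kv[0] + " occurred " + kv[1] + " times");
    }
}
```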