Aggregating sources in Accumulo using MapReduce

In this recipe, we will use MapReduce and the AccumuloInputFormat class to count occurrences of each unique source stored in an Accumulo table.

Getting ready

This recipe is easiest to test over a pseudo-distributed Hadoop cluster with Accumulo 1.4.1 and Zookeeper 3.3.3 installed. The shell script in this recipe assumes that Zookeeper is running on the host localhost on port 2181; you can change this to suit your environment. The Accumulo installation's bin folder must be on your environment path.

For this recipe, you'll need to create an Accumulo instance named test, with root as the user and password as the password.

You will also need a table named acled to exist in the configured Accumulo instance.
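If the table does not already exist, you can create it from the Accumulo shell or programmatically. The following is a minimal sketch using the Accumulo 1.4 client API and the instance name, Zookeeper quorum, and root credentials assumed above; the helper class itself is hypothetical and is not part of the recipe's build:

    package examples.accumulo;

    import org.apache.accumulo.core.client.Connector;
    import org.apache.accumulo.core.client.Instance;
    import org.apache.accumulo.core.client.ZooKeeperInstance;

    // Hypothetical one-off helper: creates the acled table if it is missing.
    public class CreateAcledTable {

        public static void main(String[] args) throws Exception {
            // Instance name, Zookeeper quorum, and credentials from this recipe's setup.
            Instance instance = new ZooKeeperInstance("test", "localhost:2181");
            Connector connector = instance.getConnector("root", "password".getBytes());
            if (!connector.tableOperations().exists("acled")) {
                connector.tableOperations().create("acled");
            }
        }
    }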

To see meaningful results from this recipe, you will need to go through the Using MapReduce to bulk import geographic event data into Accumulo recipe seen earlier in this chapter. This will give you some sample data to experiment with.

How to do it...

The following are the steps to count occurrences of different sources using MapReduce:

  1. Open the Java IDE of your choice. You will need to configure the Accumulo core and Hadoop classpath dependencies.
  2. Create a build template that produces a JAR file named accumulo-examples.jar.
  3. Create the package examples.accumulo and add the class SourceCountJob.java with the following content:
    package examples.accumulo;
    
    
    import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
    import org.apache.accumulo.core.data.Key;
    import org.apache.accumulo.core.data.Value;
    import org.apache.accumulo.core.security.Authorizations;
    import org.apache.accumulo.core.util.CachedConfiguration;
    import org.apache.accumulo.core.util.Pair;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    import java.io.IOException;
    import java.util.HashSet;
    public class SourceCountJob extends Configured implements Tool {
    
    
        private Configuration conf;
        private static final Text FAMILY = new Text("cf");
        private static final Text SOURCE = new Text("src");
    
        public SourceCountJob(Configuration conf) {
            this.conf = conf;
        }
  4. Add the run() method to conform to the Tool interface and parse the arguments from the command line.
        @Override
        public int run(String[] args) throws Exception {
    
            args = new GenericOptionsParser(conf, 
                   args).getRemainingArgs();
            if(args.length < 6) {
                System.err.println(printUsage());
                System.exit(0);
            }
    
            String tableName = args[0];
            String outputStr = args[1];
            String instanceName = args[2];
            String user = args[3];
            String pass = args[4];
            String zooQuorum = args[5];
  5. Configure the Accumulo input settings.
            AccumuloInputFormat.setInputInfo(conf, user, pass.getBytes(), tableName, new Authorizations());
            AccumuloInputFormat.setZooKeeperInstance(conf, instanceName, zooQuorum);
            HashSet<Pair<Text, Text>> columnsToFetch = new HashSet<Pair<Text,Text>>();
            columnsToFetch.add(new Pair<Text, Text>(FAMILY, SOURCE));
            AccumuloInputFormat.fetchColumns(conf, columnsToFetch);
  6. Set up the job, map/reduce classes, and the output location.
            Job job = new Job(conf, "Count distinct sources in ACLED");
            job.setInputFormatClass(AccumuloInputFormat.class);
            job.setMapperClass(ACLEDSourceMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            job.setReducerClass(ACLEDSourceReducer.class);
            job.setCombinerClass(ACLEDSourceReducer.class);
            job.setJarByClass(getClass());
            job.setOutputFormatClass(TextOutputFormat.class);
            FileOutputFormat.setOutputPath(job, 
                                   clearOutputDir(outputStr));
            job.setNumReduceTasks(1);
            return job.waitForCompletion(true) ? 0 : 1;
    
    
        }
    
        private String printUsage() {
            return "<tablename> <output> <instance_name> 
                    <username> <password> <zoohosts>";
        }
    
        private Path clearOutputDir(String outputStr)
                throws IOException {
            FileSystem fs = FileSystem.get(conf);
            Path path = new Path(outputStr);
            fs.delete(path, true);
            return path;
        }
  7. Add the static inner class ACLEDSourceMapper.
        public static class ACLEDSourceMapper
                extends Mapper<Key, Value, Text, IntWritable> {
    
            private Text outKey = new Text();
            private IntWritable outValue = new IntWritable(1);
    
            @Override
            protected void map(Key key, Value value,
                               Context context) throws IOException, InterruptedException {
    
                outKey.set(value.get());
                context.write(outKey, outValue);
            }
        }
  8. Add the static inner class ACLEDSourceReducer.
        public static class ACLEDSourceReducer
                extends Reducer<Text, IntWritable, Text, 
                                IntWritable> {
    
            private IntWritable outValue = new IntWritable();
    
            @Override
            protected void reduce(Text key, 
                                  Iterable<IntWritable> values,
                                  Context context) throws 
                            IOException, InterruptedException {
    
              int count = 0;
              for(IntWritable value : values) {
                  count += value.get();
              }
              outValue.set(count);
              context.write(key, outValue);
            }
        }
    
        @Override
        public void setConf(Configuration conf) {
            this.conf = conf;
        }
    
        @Override
        public Configuration getConf() {
            return conf;
        }
  9. Define a main() method to submit the job as a Tool instance.
        public static void main(String[] args) throws Exception {
            Configuration conf = 
                            CachedConfiguration.getInstance();
            args = new GenericOptionsParser(conf, 
                                     args).getRemainingArgs();
            ToolRunner.run(new SourceCountJob(conf), args);
        }
    }
  10. Save and build the JAR file accumulo-examples.jar.
  11. In the base working folder where accumulo-examples.jar is located, create a new shell script named source_count.sh with the following commands. Be sure to change ACCUMULO_LIB, HADOOP_LIB, and ZOOKEEPER_LIB to match your local paths:
    tool.sh accumulo-examples.jar examples.accumulo.SourceCountJob \
     -Dmapred.reduce.tasks=4 \
     acled \
     /output/accumulo_source_count/ \
     test \
     root \
     password \
     localhost:2181
    hadoop fs -cat /output/accumulo_source_count/part* > source_count.txt
  12. Save and run the script. You should see the MapReduce job start executing over your pseudo-distributed cluster.
  13. Upon successful completion of the job, you should see the file source_count.txt in your base working folder. Use the cat command to view the counts for each source.

How it works...

We define the SourceCountJob class to implement the Tool interface for easy remote submission with the ToolRunner class. The static CachedConfiguration.getInstance() method supplies our Tool instance with the configuration found on the classpath.

The run() method parses the arguments necessary to connect to the Accumulo instance using AccumuloInputFormat. For this job, we're only interested in retrieving the column qualifier src from the column family cf for each key. By default, the scanner will only return the most recent version of each key containing the qualifier src. If we wanted to count source occurrences across every key-value pair in the table, for every version, we would have to configure maxVersions in the input format.
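If you did want every stored version to be counted, a couple of extra lines inside run() would do it. The following is a sketch assuming the 1.4-era static configuration methods on the input format; Integer.MAX_VALUE is just an illustrative choice, and the call relies on run()'s existing throws Exception clause:

    // Sketch only: fetch every stored version of the src qualifier
    // instead of just the most recent one.
    AccumuloInputFormat.setInputInfo(conf, user, pass.getBytes(), tableName, new Authorizations());
    AccumuloInputFormat.setZooKeeperInstance(conf, instanceName, zooQuorum);
    AccumuloInputFormat.setMaxVersions(conf, Integer.MAX_VALUE);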

We then set up our job instance with the AccumuloInputFormat and the map/reduce classes required to count each source. As our reducer class simply adds integers together, we can re-use the same implementation as a combiner. We clear any existing output folder and set the number of reduce tasks to 1, as we are running against a pseudo-distributed cluster; because job.setNumReduceTasks(1) is called on the job itself, it overrides the -Dmapred.reduce.tasks=4 flag passed by the shell script. Now we are ready to submit the job to the cluster.

The business logic operates in a very similar manner to the traditional WordCount example.

The AccumuloInputFormat class handles scanning the table and returning only the key-value pairs for the qualifier src. Therefore, any key/value instances that enter our ACLEDSourceMapper class's map() function are already restricted to the data we're interested in aggregating, and we can simply output 1 to represent one occurrence of that source value in our dataset. The output key is simply the value of the incoming source.
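To make the mapper's input concrete, the following illustrative fragment (using the same Key, Value, and Text imports as the listing) constructs the kind of pair the job receives; the row ID and source string are made up for the example, not taken from the ACLED data:

    // Illustrative only: one input pair as the mapper might see it.
    Key sampleKey = new Key(new Text("20110101_0001"), new Text("cf"), new Text("src"));
    Value sampleValue = new Value("BBC News".getBytes());
    // For this record, ACLEDSourceMapper.map() emits the pair ("BBC News", 1).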

The reduce class ACLEDSourceReducer simply tallies occurrences for each source, and outputs the results back to HDFS.

At the end of the shell script, we download and concatenate the different part files into one local file, source_count.txt. We now have a single file with newline-separated source listings and the total number of occurrences of each source.
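If you would rather merge the part files from Java than with hadoop fs -cat, something like the following sketch works on Hadoop 1.x/2.x; it merges the output into a single HDFS file (the destination path here is a made-up example), which you would then copy locally with hadoop fs -get:

    // Sketch: merge the reducer part files into one HDFS file.
    // Requires org.apache.hadoop.fs.FileUtil in addition to the listing's imports.
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    FileUtil.copyMerge(fs, new Path("/output/accumulo_source_count/"),
                       fs, new Path("/output/source_count.txt"),
                       false, conf, null);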
