Counting fatalities for different versions of the same key using SummingCombiner

This recipe uses the built-in SummingCombiner in Accumulo to treat the cell values stored under the qualifier fat as longs and, for each key in the acled table, to sum the values across all versions of that key.

Getting ready

This recipe is easiest to test on a pseudo-distributed Hadoop cluster with Accumulo 1.4.1 and ZooKeeper 3.3.3 installed. The shell script in this recipe assumes that ZooKeeper is running on the host localhost and listening on port 2181; you can change this to suit your environment. The Accumulo installation's bin folder needs to be on your environment path.

For this recipe, you'll need an Accumulo instance named test with the user root and the password password.

You will also need a table named acled in the configured Accumulo instance. If a table by that name exists from an earlier recipe, delete and recreate it; a programmatic sketch for doing so follows.
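
If you would rather reset the table programmatically than from the Accumulo shell, here is a minimal sketch; the class name ResetAcledTable is hypothetical, and the instance name, credentials, and ZooKeeper quorum match this recipe's assumptions:

    package examples.accumulo;

    import org.apache.accumulo.core.client.Connector;
    import org.apache.accumulo.core.client.ZooKeeperInstance;

    public class ResetAcledTable {
        public static void main(String[] args) throws Exception {
            // Connect using this recipe's instance/user/password/quorum
            Connector connector =
                    new ZooKeeperInstance("test", "localhost:2181")
                            .getConnector("root", "password");
            // Drop any stale acled table from an earlier recipe,
            // then recreate it empty
            if (connector.tableOperations().exists("acled"))
                connector.tableOperations().delete("acled");
            connector.tableOperations().create("acled");
        }
    }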

It is also highly recommended that you complete the Using MapReduce to bulk import geographic event data into Accumulo recipe earlier in this chapter, which will give you some sample data to experiment with.

How to do it...

Follow these steps to issue a query using SummingCombiner:

  1. Open your Java IDE of choice. You will need to configure the Accumulo core and Hadoop classpath dependencies.
  2. Create a build template that produces a JAR file named accumulo-examples.jar.
  3. Create the package examples.accumulo and add the class TotalFatalityCombinerMain.java with the following content:
    package examples.accumulo;
    
    import org.apache.accumulo.core.client.*;
    import org.apache.accumulo.core.client.Scanner;
    import org.apache.accumulo.core.data.*;
    import org.apache.accumulo.core.iterators.Combiner;
    import org.apache.accumulo.core.iterators.LongCombiner;
    import org.apache.accumulo.core.iterators.user.SummingCombiner;
    import org.apache.accumulo.core.security.Authorizations;
    import org.apache.hadoop.io.Text;
    
    import java.util.*;
    
    public class TotalFatalityCombinerMain {
    
        public static final long MAX_MEMORY = 10000L;
        public static final long MAX_LATENCY = 1000L;
        public static final int MAX_WRITE_THREADS = 4;
        public static final String TEST_TABLE = "acled";
        public static final Text COLUMN_FAMILY = new Text("cf");
        public static final Text FATALITIES_QUAL = new Text("fat");
  4. The main() method handles the argument parsing:
        public static void main(String[] args) throws Exception {
            if(args.length < 4) {
                System.err.println("usage: <instance name> " +
                        "<user> <password> <zookeepers>");
                System.exit(0);
            }
            String instanceName = args[0];
            String user = args[1];
            String pass = args[2];
            String zooQuorum = args[3];
            ZooKeeperInstance ins =
                    new ZooKeeperInstance(instanceName, zooQuorum);
            Connector connector = ins.getConnector(user, pass);
            if(!connector.tableOperations().exists(TEST_TABLE))
               connector.tableOperations().create(TEST_TABLE);
            BatchWriter writer = connector.createBatchWriter(TEST_TABLE,
                    MAX_MEMORY, MAX_LATENCY, MAX_WRITE_THREADS);
  5. Write some sample data with the exact same rowID eventA, column family, and qualifier:
            Mutation m1 = new Mutation("eventA");
            m1.put(COLUMN_FAMILY, FATALITIES_QUAL,
                    new Value("10".getBytes()));

            Mutation m2 = new Mutation("eventA");
            m2.put(COLUMN_FAMILY, FATALITIES_QUAL,
                    new Value("5".getBytes()));
  6. Write an additional key with rowID eventB:
            Mutation m3 = new Mutation("eventB");
            m3.put(COLUMN_FAMILY, FATALITIES_QUAL,
                    new Value("7".getBytes()));
    
            writer.addMutation(m1);
            writer.addMutation(m2);
            writer.addMutation(m3);
            writer.close();
  7. Configure an IteratorSetting for the scanner to use the combiner:
            IteratorSetting iter =
                    new IteratorSetting(1, SummingCombiner.class);
            LongCombiner.setEncodingType(iter,
                    SummingCombiner.Type.STRING);
            Combiner.setColumns(iter, Collections.singletonList(
                    new IteratorSetting.Column(COLUMN_FAMILY,
                            FATALITIES_QUAL)));
            Scanner scan = connector.createScanner(TEST_TABLE,
                    new Authorizations());
            scan.setRange(new Range(new Text("eventA"),
                    new Text("eventB")));
            scan.fetchColumn(COLUMN_FAMILY, FATALITIES_QUAL);
            scan.addScanIterator(iter);
            for (Map.Entry<Key, Value> item : scan) {
                System.out.print(item.getKey().getRow().toString() +
                        ": fatalities: ");
                System.out.println(new String(item.getValue().get()));
            }
        }
    }
  8. Save and build the JAR file accumulo-examples.jar.
  9. In the base working folder where the accumulo-examples.jar file is located, create a new shell script named run_combiner.sh with the following commands. Be sure to change ACCUMULO_LIB, HADOOP_LIB, and ZOOKEEPER_LIB to match your local paths:
    ACCUMULO_LIB=/opt/cloud/accumulo-1.4.1/lib/*
    HADOOP_LIB=/Applications/hadoop-0.20.2-cdh3u1/*:/Applications/hadoop-0.20.2-cdh3u1/lib/*
    ZOOKEEPER_LIB=/opt/cloud/zookeeper-3.4.2/*
    java -cp $ACCUMULO_LIB:$HADOOP_LIB:$ZOOKEEPER_LIB:accumulo-examples.jar \
        examples.accumulo.TotalFatalityCombinerMain \
        test \
        root \
        password \
        localhost:2181
  10. Save and run the script.
  11. You should see the following console printout when the application finishes:
    eventA: fatalities: 15
    eventB: fatalities: 7
  12. Re-run the script.
  13. You should now see twice the count for each event:
    eventA: fatalities: 30
    eventB: fatalities: 14

How it works...

The class TotalFatalityCombinerMain reads the required arguments to connect to Accumulo and instantiates a BatchWriter to write test data to the acled table. We write two mutations for two different versions of the same key with the rowID eventA: one puts the qualifier fat with a value of 10, the other with a value of 5. We also write one mutation for a key with the rowID eventB, with a value of 7 for the qualifier fat.

We then use a Scanner instance to apply the SummingCombiner at scan time over the key-value pairs in the table. The combiner's job is to collect the different long values associated with the exact same key and emit their sum. The values 5 and 10 are both associated with the same key for the rowID eventA, so they are combined to produce the value 15. There is only one key version associated with the rowID eventB, so the single value 7 remains the total for that key. Conceptually, the per-key fold looks like the sketch below.
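
For each distinct key, the combiner decodes every stored version's value, folds them together, and emits a single re-encoded result. A minimal sketch of those semantics with the Type.STRING encoding (an illustration only, not Accumulo's internal code; it assumes the recipe's java.util.* import):

    // eventA's key has two stored versions: "10" and "5"
    List<byte[]> versionsOfOneKey = Arrays.asList(
            "10".getBytes(), "5".getBytes());
    long sum = 0L;
    for (byte[] value : versionsOfOneKey) {
        sum += Long.parseLong(new String(value)); // Type.STRING decode
    }
    // The combiner emits one entry whose value is "15"
    byte[] combined = Long.toString(sum).getBytes();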

If we re-run this application, the previous mutations are still stored in the same Accumulo table. Re-running applies the same mutations once more, adding the values 10 and 5 as new key-value entries for the rowID eventA, and 7 for eventB.

Scanning with the combiner now sums four entries for the rowID eventA (5, 10, 5, and 10) and two entries for the rowID eventB (7 and 7), so the result is double the count from the previous execution. Each time we re-run the application without clearing the table, the totals grow by another 15 and 7, respectively.

This happens because, at the raw key-value level, our mutations insert new key-value pairs with different timestamps every time the application runs, and our combiner sees all timestamped versions of every distinct key. The sketch below shows one way to inspect those raw versions.
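
To look at the raw versions yourself, you can override the table's versioning behavior for a single scan. This is a hedged sketch: it assumes the table's default VersioningIterator is configured under the name vers at priority 20 (Accumulo's standard table defaults), that re-using that name and priority as a scan-time iterator replaces the table setting for this scan, and that VersioningIterator is imported from org.apache.accumulo.core.iterators.user:

    // Override the default versioning iterator (name "vers", priority 20)
    // so the scan returns up to 100 versions per key instead of only the
    // newest one
    IteratorSetting vers = new IteratorSetting(20, "vers",
            VersioningIterator.class);
    VersioningIterator.setMaxVersions(vers, 100);
    Scanner rawScan = connector.createScanner(TEST_TABLE,
            new Authorizations());
    rawScan.fetchColumn(COLUMN_FAMILY, FATALITIES_QUAL);
    rawScan.addScanIterator(vers);
    for (Map.Entry<Key, Value> e : rawScan) {
        // Print each stored version with its timestamp
        System.out.println(e.getKey().getRow() + " @ "
                + e.getKey().getTimestamp() + " -> "
                + new String(e.getValue().get()));
    }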

There's more...

Here are some more helpful tips regarding combiners:

Combiners operate on a per-key basis, not across all keys

This can cause confusion for new Accumulo users. Combiners use the Accumulo iterator pattern for key-value aggregation, but only on a per-key basis across the different versions of that key. If you need table-wide aggregation over the values of a common qualifier, you will likely still want to use MapReduce; see the Aggregating sources in Accumulo using MapReduce recipe in this chapter. For small tables, a client-side scan can also work, as sketched below.
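
For a small table, a plain client-side scan can total the combined per-key sums across all rows. This is a hypothetical alternative to the MapReduce recipe, reusing the iter setting and constants from this recipe; note that it pulls every matching entry through a single client, so it does not scale the way MapReduce does:

    // Sum the per-key totals across all rows of the table, client-side
    long tableWideTotal = 0L;
    Scanner all = connector.createScanner(TEST_TABLE, new Authorizations());
    all.fetchColumn(COLUMN_FAMILY, FATALITIES_QUAL);
    all.addScanIterator(iter); // the SummingCombiner setting from step 7
    for (Map.Entry<Key, Value> entry : all) {
        tableWideTotal += Long.parseLong(new String(entry.getValue().get()));
    }
    System.out.println("table-wide fatalities: " + tableWideTotal);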

Combiners can be applied at scan time or persisted in the table configuration for incoming mutations

This recipe uses the combiner to aggregate the qualifier values at scan time. Accumulo also supports persistent combiners stored in the table configuration, which combine values as mutations are written and compacted; a sketch follows.
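
Here is a minimal sketch of the persistent form, assuming the same constants as this recipe; the iterator name fatSum and priority 15 are illustrative choices (kept below the default versioning iterator at priority 20):

    // Attach the combiner to the table configuration so it runs at scan,
    // minor compaction, and major compaction time for all clients
    IteratorSetting persistent = new IteratorSetting(15, "fatSum",
            SummingCombiner.class);
    LongCombiner.setEncodingType(persistent, SummingCombiner.Type.STRING);
    Combiner.setColumns(persistent, Collections.singletonList(
            new IteratorSetting.Column(COLUMN_FAMILY, FATALITIES_QUAL)));
    connector.tableOperations().attachIterator(TEST_TABLE, persistent);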

See also

  • Using MapReduce to bulk import geographic event data into Accumulo
  • Limiting query results using the regex filtering iterator
  • Aggregating sources in Accumulo using MapReduce