This recipe uses Accumulo's built-in SumCombiner to treat the cell values associated with the qualifier fat as long values and, for each key in the acled table, to sum the totals across all versions of that key.
This recipe is easiest to test over a pseudo-distributed Hadoop cluster with Accumulo 1.4.1 and Zookeeper 3.3.3 installed. The shell script in this recipe assumes that Zookeeper is running on the host localhost and port 2181; you can change this to suit your environment's needs. The Accumulo installation's bin folder needs to be on your environment's PATH.
For this recipe, you'll need to create an Accumulo instance named test with the user root and the password password.
You will need a table named acled to exist in the configured Accumulo instance. If you have an existing table by that name from an earlier recipe, delete and recreate it.
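If you prefer to reset the table programmatically rather than from the Accumulo shell, the following is a minimal sketch using the same client API this recipe uses; the instance name, user, password, and Zookeeper quorum are assumptions matching the setup described above, and the class name ResetAcledTableMain is purely illustrative:

package examples.accumulo;

import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.ZooKeeperInstance;

public class ResetAcledTableMain {
    public static void main(String[] args) throws Exception {
        // Assumed connection settings: instance "test", user root/password,
        // Zookeeper on localhost:2181 -- adjust to your environment.
        Connector connector = new ZooKeeperInstance("test", "localhost:2181")
                .getConnector("root", "password");
        if (connector.tableOperations().exists("acled"))
            connector.tableOperations().delete("acled"); // drop any stale copy
        connector.tableOperations().create("acled");     // fresh, empty table
    }
}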
It is also highly recommended that you complete the Using MapReduce to bulk import geographic event data into Accumulo recipe earlier in this chapter. This will give you some sample data to experiment with.
Follow these steps to issue a query using SumCombiner:
1. Create a build environment for the example code that produces a single JAR file named accumulo-examples.jar.
2. Create the package examples.accumulo and add the class TotalFatalityCombinerMain.java with the following content:

package examples.accumulo;

import org.apache.accumulo.core.client.*;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.data.*;
import org.apache.accumulo.core.iterators.Combiner;
import org.apache.accumulo.core.iterators.LongCombiner;
import org.apache.accumulo.core.iterators.user.SummingCombiner;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.hadoop.io.Text;

import java.util.*;

public class TotalFatalityCombinerMain {

    // BatchWriter tuning: buffered memory, max latency, and write threads
    public static final long MAX_MEMORY = 10000L;
    public static final long MAX_LATENCY = 1000L;
    public static final int MAX_WRITE_THREADS = 4;
    public static final String TEST_TABLE = "acled";
    public static final Text COLUMN_FAMILY = new Text("cf");
    public static final Text FATALITIES_QUAL = new Text("fat");
3. The main() method handles the argument parsing:

    public static void main(String[] args) throws Exception {
        if (args.length < 4) {
            System.err.println("usage: <instance name> <user> <password> <zookeepers>");
            System.exit(0);
        }
        String instanceName = args[0];
        String user = args[1];
        String pass = args[2];
        String zooQuorum = args[3];
        ZooKeeperInstance ins = new ZooKeeperInstance(instanceName, zooQuorum);
        Connector connector = ins.getConnector(user, pass);
        if (!connector.tableOperations().exists(TEST_TABLE))
            connector.tableOperations().create(TEST_TABLE);
        BatchWriter writer = connector.createBatchWriter(TEST_TABLE, MAX_MEMORY,
                MAX_LATENCY, MAX_WRITE_THREADS);
4. Write two mutations for the same rowID eventA, column family, and qualifier:

        Mutation m1 = new Mutation("eventA");
        m1.put(COLUMN_FAMILY, FATALITIES_QUAL, new Value("10".getBytes()));
        Mutation m2 = new Mutation("eventA");
        m2.put(COLUMN_FAMILY, FATALITIES_QUAL, new Value("5".getBytes()));
5. Write one mutation for the rowID eventB, then add all three mutations and close the writer:

        Mutation m3 = new Mutation("eventB");
        m3.put(COLUMN_FAMILY, FATALITIES_QUAL, new Value("7".getBytes()));
        writer.addMutation(m1);
        writer.addMutation(m2);
        writer.addMutation(m3);
        writer.close();
6. Configure an IteratorSetting for the scanner to use the combiner, then scan and print the combined totals:

        IteratorSetting iter = new IteratorSetting(1, SummingCombiner.class);
        LongCombiner.setEncodingType(iter, SummingCombiner.Type.STRING);
        Combiner.setColumns(iter, Collections.singletonList(
                new IteratorSetting.Column(COLUMN_FAMILY, FATALITIES_QUAL)));
        Scanner scan = connector.createScanner(TEST_TABLE, new Authorizations());
        scan.setRange(new Range(new Text("eventA"), new Text("eventB")));
        scan.fetchColumn(COLUMN_FAMILY, FATALITIES_QUAL);
        scan.addScanIterator(iter);
        for (Map.Entry<Key, Value> item : scan) {
            System.out.print(item.getKey().getRow().toString() + ": fatalities: ");
            System.out.println(new String(item.getValue().get()));
        }
    }
}
7. Save and build the JAR file accumulo-examples.jar.
8. In the same folder where the accumulo-examples.jar file is located, create a new shell script named run_combiner.sh with the following commands. Be sure to change ACCUMULO_LIB, HADOOP_LIB, and ZOOKEEPER_LIB to match your local paths:

ACCUMULO_LIB=/opt/cloud/accumulo-1.4.1/lib/*
HADOOP_LIB=/Applications/hadoop-0.20.2-cdh3u1/*:/Applications/hadoop-0.20.2-cdh3u1/lib/*
ZOOKEEPER_LIB=/opt/cloud/zookeeper-3.4.2/*
java -cp $ACCUMULO_LIB:$HADOOP_LIB:$ZOOKEEPER_LIB:accumulo-examples.jar \
    examples.accumulo.TotalFatalityCombinerMain \
    test root password localhost:2181
9. Save and run the script. You should see the following output:

eventA: fatalities: 15
eventB: fatalities: 7
10. Run the script a second time. The totals should now be doubled:

eventA: fatalities: 30
eventB: fatalities: 14
The class TotalFatalityCombinerMain reads the required arguments to connect to Accumulo and instantiates a BatchWriter instance to write out test data to the acled table. We write two mutations for two different versions of the same key containing the rowID eventA: one contains the qualifier fat with the value 10, the other with the value 5. We also write one mutation with the key containing the rowID eventB with the value 7 for the qualifier fat.
We then use a Scanner instance to apply the SumCombiner at scan time over the key-value pairs in the table. The combiner's job is to collect the different long values associated with the exact same key and emit the sum of those values. The values 5 and 10 are both associated with the same key for the rowID eventA, and are combined to produce the value 15. There is only one key version associated with the rowID eventB, so the single value 7 remains the total sum for that key.
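In this recipe the combiner is restricted to the cf:fat column via Combiner.setColumns(). The Combiner class also offers a setCombineAllColumns() switch for applying a combiner to every column of each key. Here is a minimal sketch of building such an IteratorSetting, under the assumption that this method is available in your Accumulo version (the class name AllColumnsCombinerSetting is purely illustrative):

import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.iterators.Combiner;
import org.apache.accumulo.core.iterators.LongCombiner;
import org.apache.accumulo.core.iterators.user.SummingCombiner;

public class AllColumnsCombinerSetting {
    // Builds an IteratorSetting that sums long values across ALL columns
    // of each key, rather than just cf:fat as in the recipe.
    public static IteratorSetting buildSummingSetting() {
        IteratorSetting iter = new IteratorSetting(1, SummingCombiner.class);
        LongCombiner.setEncodingType(iter, SummingCombiner.Type.STRING);
        Combiner.setCombineAllColumns(iter, true); // replaces Combiner.setColumns(...)
        return iter;
    }
}

The resulting setting can be passed to addScanIterator() exactly as in step 6.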
If we re-run this application, the previous mutations are still stored in the same Accumulo table. Re-running the application applies the same mutations once more, adding the values 10 and 5 as new key-value entries for the rowID eventA, and 7 for the rowID eventB.
Re-running the scanner with the combiner now shows four entries for the rowID eventA (5, 10, 5, and 10) as well as two entries for the rowID eventB (7 and 7). The result is double the count from our previous execution. Each time we re-run this application without clearing the table, the totals increase by 15 and 7, respectively.
This happens because, at the raw key-value level, our mutations insert new key-value pairs into the table with different timestamps each time the application is called. Our combiner sees all timestamped versions of every distinct key.
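Also note that this recipe applies the combiner only at scan time via addScanIterator(); the stored entries are never modified. Accumulo's TableOperations.attachIterator() can instead attach the combiner to the table itself, so that it also runs during minor and major compactions and older versions are physically summed away. A minimal sketch, assuming the same connection settings as above (the iterator name fatSum and priority 10 are illustrative choices):

import java.util.Collections;

import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.iterators.Combiner;
import org.apache.accumulo.core.iterators.LongCombiner;
import org.apache.accumulo.core.iterators.user.SummingCombiner;
import org.apache.hadoop.io.Text;

public class AttachCombinerMain {
    public static void main(String[] args) throws Exception {
        Connector connector = new ZooKeeperInstance("test", "localhost:2181")
                .getConnector("root", "password");
        IteratorSetting iter = new IteratorSetting(10, "fatSum", SummingCombiner.class);
        LongCombiner.setEncodingType(iter, SummingCombiner.Type.STRING);
        Combiner.setColumns(iter, Collections.singletonList(
                new IteratorSetting.Column(new Text("cf"), new Text("fat"))));
        // Attaches at the scan, minor, and major compaction scopes by default,
        // so old key versions are merged as the table compacts.
        connector.tableOperations().attachIterator("acled", iter);
    }
}

With the combiner attached at the scan scope, a plain scanner returns the combined totals without any addScanIterator() call.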
Here are some more helpful tips regarding combiners:
This can cause confusion for new Accumulo users. Combiners use the Accumulo iterator pattern for key-value aggregation, but only on a per-key basis across different versions of that key. If you have a requirement to do table-wide aggregation over the values of a common qualifier, you will likely still want to use MapReduce. See the Aggregating sources in Accumulo using MapReduce recipe in this chapter.
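For a small table, you can approximate table-wide aggregation with a plain client-side scan that adds up the per-key totals as they stream back; for any sizable dataset, the MapReduce recipe referenced above is the better tool. A minimal sketch, reusing the connection settings assumed earlier in this recipe:

import java.util.Map;

import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.hadoop.io.Text;

public class TableWideFatalitySumMain {
    public static void main(String[] args) throws Exception {
        Connector connector = new ZooKeeperInstance("test", "localhost:2181")
                .getConnector("root", "password");
        Scanner scan = connector.createScanner("acled", new Authorizations());
        scan.fetchColumn(new Text("cf"), new Text("fat"));
        long total = 0;
        // Sum the fat qualifier across every rowID in the table; unlike
        // MapReduce, this pulls all matching entries through one client.
        for (Map.Entry<Key, Value> entry : scan) {
            total += Long.parseLong(new String(entry.getValue().get()));
        }
        System.out.println("table-wide fatalities: " + total);
    }
}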