In this recipe, we will build a custom Constraint class to limit the types of mutations we can apply to event date values in an Accumulo table. Specifically, we want newly entered values to conform to a particular SimpleDateFormat pattern, but these values should not be in the future according to the system time on the TabletServer.
This recipe is easiest to test on a pseudo-distributed Hadoop cluster with Accumulo 1.4.1 and ZooKeeper 3.3.3 installed. The shell script in this recipe assumes that ZooKeeper is running on the host localhost and on port 2181; you can change this to suit your environment. The Accumulo installation's bin folder needs to be on your environment path.
For this recipe, you'll need to create an Accumulo instance named test with the user root and the password password.
You will need a table named acled to exist in the configured Accumulo instance.
It is also highly recommended that you go through the recipe "Using MapReduce to bulk import geographic event data into Accumulo" in this chapter. This will give you some sample data with which you can experiment.
Follow these steps to implement and install a constraint in Accumulo:
Create a Java project that builds to a JAR file named accumulo-examples.jar.
Create a package named examples.accumulo and create the class DtgConstraint.java with the following content:

    package examples.accumulo;

    import org.apache.accumulo.core.constraints.Constraint;
    import org.apache.accumulo.core.data.ColumnUpdate;
    import org.apache.accumulo.core.data.Mutation;

    import java.text.DateFormat;
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.ArrayList;
    import java.util.List;

    public class DtgConstraint implements Constraint {

        // Violation codes reported back to the client
        private static final short DATE_IN_FUTURE = 1;
        private static final short MALFORMED_DATE = 2;
        // Column qualifier that holds the event date
        private static final byte[] dtgBytes = "dtg".getBytes();
        private static final DateFormat dateFormatter = new SimpleDateFormat("yyyy-MM-dd");

        public String getViolationDescription(short violationCode) {
            if (violationCode == DATE_IN_FUTURE) {
                return "Date cannot be in future";
            } else if (violationCode == MALFORMED_DATE) {
                return "Date does not match simple date format yyyy-MM-dd";
            }
            return null;
        }
Implement the check() method.

        @Override
        public List<Short> check(Environment env, Mutation mutation) {
            List<Short> violations = null;
            try {
                for (ColumnUpdate update : mutation.getUpdates()) {
                    if (isDtg(update)) {
                        long dtgTime;
                        // SimpleDateFormat is not thread-safe, so guard the shared instance
                        synchronized (dateFormatter) {
                            dtgTime = dateFormatter.parse(new String(update.getValue())).getTime();
                        }
                        long currentMillis = System.currentTimeMillis();
                        if (currentMillis < dtgTime) {
                            violations = checkAndAdd(violations, DATE_IN_FUTURE);
                        }
                    }
                }
            } catch (ParseException e) {
                // The value could not be parsed with the yyyy-MM-dd pattern
                violations = checkAndAdd(violations, MALFORMED_DATE);
            }
            return violations;
        }
Add helper methods that check whether an update uses the column qualifier dtg and that collect violations.

        private boolean isDtg(ColumnUpdate update) {
            byte[] qual = update.getColumnQualifier();
            if (qual.length != dtgBytes.length) {
                return false;
            }
            for (int i = 0; i < qual.length; i++) {
                if (qual[i] != dtgBytes[i]) {
                    return false;
                }
            }
            return true;
        }

        // Creates the violation list lazily, so a clean mutation returns null
        private List<Short> checkAndAdd(List<Short> violations, short violationCode) {
            if (violations == null) {
                violations = new ArrayList<Short>();
            }
            violations.add(violationCode);
            return violations;
        }
    }
In the package examples.accumulo, create the class DtgConstraintMain.java with the following content:

    package examples.accumulo;

    import org.apache.accumulo.core.client.*;
    import org.apache.accumulo.core.conf.Property;
    import org.apache.accumulo.core.data.ConstraintViolationSummary;
    import org.apache.accumulo.core.data.Mutation;
    import org.apache.accumulo.core.data.Value;
    import org.apache.hadoop.io.Text;

    import java.util.List;

    public class DtgConstraintMain {

        public static final long MAX_MEMORY = 10000L;
        public static final long MAX_LATENCY = 1000L;
        public static final int MAX_WRITE_THREADS = 4;
        public static final String TEST_TABLE = "acled";
        public static final Text COLUMN_FAMILY = new Text("cf");
        public static final Text DTG_QUAL = new Text("dtg");
The main() method tries to insert both valid and invalid dtg values to test our constraint.

        public static void main(String[] args) throws Exception {
            if (args.length < 6) {
                System.err.println("examples.accumulo.DtgConstraintMain <row_id> <dtg> <instance_name> <user> <password> <zookeepers>");
                // Exit with a non-zero status to signal incorrect usage
                System.exit(1);
            }
            String rowID = args[0];
            byte[] dtg = args[1].getBytes();
            String instanceName = args[2];
            String user = args[3];
            String pass = args[4];
            String zooQuorum = args[5];

            ZooKeeperInstance ins;
            Connector connector = null;
            BatchWriter writer = null;
            try {
                ins = new ZooKeeperInstance(instanceName, zooQuorum);
                connector = ins.getConnector(user, pass);
                writer = connector.createBatchWriter(TEST_TABLE, MAX_MEMORY,
                        MAX_LATENCY, MAX_WRITE_THREADS);
                // Register the constraint on the table under table.constraint.1
                connector.tableOperations().setProperty(TEST_TABLE,
                        Property.TABLE_CONSTRAINT_PREFIX.getKey() + 1,
                        DtgConstraint.class.getName());
                Mutation validMutation = new Mutation(new Text(rowID));
                validMutation.put(COLUMN_FAMILY, DTG_QUAL, new Value(dtg));
                writer.addMutation(validMutation);
                writer.close();
            } catch (MutationsRejectedException e) {
                // Print each constraint violation reported by the TabletServer
                List<ConstraintViolationSummary> summaries = e.getConstraintViolationSummaries();
                for (ConstraintViolationSummary sum : summaries) {
                    System.err.println(sum.toString());
                }
            }
        }
    }
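Save and build the JAR file accumulo-examples.jar.
Navigate to your local Accumulo configuration folder, $ACCUMULO_HOME/conf, and edit the file accumulo-site.xml.
Edit the general.classpaths property in the accumulo-site.xml file to include the path to accumulo-examples.jar.
Restart the local TabletServer using $ACCUMULO_HOME/bin/tdown.sh and tup.sh.
Verify that the JAR file is on the Accumulo classpath by running the following command:

    $ accumulo classpath

You should see a file:// entry in the printout for accumulo-examples.jar.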
In the folder where accumulo-examples.jar is located, create a new shell script named run_constraint_test.sh with the following commands. Be sure to change ACCUMULO_LIB, HADOOP_LIB, and ZOOKEEPER_LIB to match your local paths.

    ACCUMULO_LIB=/opt/cloud/accumulo-1.4.1/lib/*
    HADOOP_LIB=/Applications/hadoop-0.20.2-cdh3u1/*:/Applications/hadoop-0.20.2-cdh3u1/lib/*
    ZOOKEEPER_LIB=/opt/cloud/zookeeper-3.4.2/*
    java -cp "$ACCUMULO_LIB:$HADOOP_LIB:$ZOOKEEPER_LIB:accumulo-examples.jar" \
        examples.accumulo.DtgConstraintMain \
        00993877573819_9223370801921575807 2012-08-07 test root password localhost:2181
Edit run_constraint_test.sh and change the value of the dtg parameter from 2012-08-07 to 2030-08-07.
Run the script. With the future date, the constraint should reject the mutation with the message Date cannot be in future.
Our Constraint class looks through every mutation and determines whether the column qualifier dtg is involved. If the ColumnUpdate object mutates a key-value pair containing the qualifier dtg, we examine the value for errors. This constraint has the following two violation conditions:
The date does not match the SimpleDateFormat pattern yyyy-MM-dd. So 1970-12-23 and 2012-02-11 will pass, but a value that cannot be parsed with this pattern, such as 12/20/22, will generate an error and add a constraint violation. Note that SimpleDateFormat is lenient by default, so a short form such as 70-12-23 still parses (as the year 70) and is not rejected.
The date is in the future. At the time of writing, 2030-08-07 was 18 years in the future. If the column update contains a future date, we add a constraint violation.
The main class takes all of the required parameters to connect to the Accumulo instance and adds the Constraint class to the table. It then attempts to perform a mutation on the supplied rowID using the argument value for dtg. If the mutation is rejected for any reason, it prints out the constraint violations so that we can see whether DtgConstraint was violated.
We can modify the dtg argument in the shell script to see the different constraint violation errors our class generates.
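As a rough illustration of the logic described above, the following standalone sketch mimics the qualifier match and the two violation conditions without any Accumulo dependencies. The class name DtgCheckSketch, the classify() helper, the NO_VIOLATION marker, and the fixed "now" value are assumptions made for illustration; they are not part of the recipe's classes.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Arrays;

// Standalone sketch with hypothetical names; it does not use the Accumulo API.
public class DtgCheckSketch {
    static final short NO_VIOLATION = 0;   // hypothetical marker for "accepted"
    static final short DATE_IN_FUTURE = 1; // same codes as DtgConstraint
    static final short MALFORMED_DATE = 2;
    static final byte[] DTG_BYTES = "dtg".getBytes();

    // Gives the same result as the byte-by-byte loop in isDtg().
    static boolean isDtg(byte[] qualifier) {
        return Arrays.equals(qualifier, DTG_BYTES);
    }

    // Classifies a value the way check() would, against a caller-supplied clock.
    static short classify(String value, long nowMillis) {
        // A fresh formatter per call avoids sharing a non-thread-safe instance.
        SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd");
        try {
            long dtgTime = formatter.parse(value).getTime();
            return nowMillis < dtgTime ? DATE_IN_FUTURE : NO_VIOLATION;
        } catch (ParseException e) {
            return MALFORMED_DATE;
        }
    }

    public static void main(String[] args) throws ParseException {
        // Fixed "now" so the results do not depend on when the sketch is run.
        long now = new SimpleDateFormat("yyyy-MM-dd").parse("2012-08-08").getTime();
        System.out.println(isDtg("dtg".getBytes()));     // true
        System.out.println(isDtg("lat".getBytes()));     // false
        System.out.println(classify("2012-08-07", now)); // 0
        System.out.println(classify("2030-08-07", now)); // 1
        System.out.println(classify("12/20/22", now));   // 2
        System.out.println(classify("70-12-23", now));   // 0, parsed leniently as the year 70
    }
}
```

Calling setLenient(false) on the formatter would reject out-of-range fields such as a month of 20, but it would still accept a two-digit year, so a stricter format check would need an additional test such as a length or pattern check.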
Constraints are a powerful feature for data policy enforcement in Accumulo. The following headings discuss a few additional things you should know.
The Accumulo core offers numerous constraint implementations out of the box. They cover a variety of commonly checked conditions and are already on the TabletServer classpath. Check out the example implementations in the simple examples module, located in the package org.apache.accumulo.examples.simple.constraints. Cell visibility and other core system checks in Accumulo use constraint implementations behind the scenes.
If, after installing a custom constraint on your Accumulo instance, you notice every mutation being rejected, it's likely that the TabletServer did not find your Constraint class on the classpath. Check the TabletServer logs for ClassNotFoundException entries. This can happen if the table configuration lists the Constraint class but no class matching the fully qualified name can be found. In a fully distributed setup, make sure to restart every TabletServer after modifying the general.classpaths property on each node.
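One quick way to see whether a class name resolves is to try loading it by its fully qualified name. The sketch below is a hypothetical helper (ClasspathCheckSketch and isLoadable are names assumed for illustration). It only reports on the classpath of the JVM it runs in, so it is meaningful only if launched with the same classpath entries the TabletServer uses.

```java
// Standalone sketch with hypothetical names; it does not use the Accumulo API.
public class ClasspathCheckSketch {

    // Returns true if the named class can be loaded by this JVM's class loader.
    static boolean isLoadable(String className) {
        try {
            // 'false' skips static initialization; we only care about resolution.
            Class.forName(className, false, ClasspathCheckSketch.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        } catch (LinkageError e) {
            // The class was found, but something it depends on is missing.
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults to the constraint class from this recipe.
        String name = args.length > 0 ? args[0] : "examples.accumulo.DtgConstraint";
        System.out.println(name + " loadable: " + isLoadable(name));
    }
}
```

A result of false here points to the same kind of classpath problem that would surface as a ClassNotFoundException in the TabletServer logs.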