diff --git a/src/main/java/org/apache/hadoop/hbase/constraint/BaseConstraint.java b/src/main/java/org/apache/hadoop/hbase/constraint/BaseConstraint.java
new file mode 100644
index 0000000..f133399
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/hbase/constraint/BaseConstraint.java
@@ -0,0 +1,23 @@
+package org.apache.hadoop.hbase.constraint;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Base class to use when actually implementing a {@link Constraint}. It takes
+ * care of getting and setting of configuration for the constraint.
+ */
+public abstract class BaseConstraint implements
+ Constraint {
+
+ private Configuration conf;
+
+ @Override
+ public void setConfiguration(Configuration conf) {
+ this.conf = conf;
+ }
+
+ protected Configuration getConfiguration() {
+ return this.conf;
+ }
+
+}
diff --git a/src/main/java/org/apache/hadoop/hbase/constraint/Constraint.java b/src/main/java/org/apache/hadoop/hbase/constraint/Constraint.java
new file mode 100644
index 0000000..482790d
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/hbase/constraint/Constraint.java
@@ -0,0 +1,41 @@
+package org.apache.hadoop.hbase.constraint;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.Put;
+
+/**
+ * Apply a {@link Constraint} (in traditional database terminology) to an HTable.
+ * Any number of {@link Constraint Constraints} can be added to the table, in
+ * any order. {@link Constraint Constraints} are added by specifying the classes
+ * at the table level as a Coprocessor, for instance by using
+ * {@link HTableDescriptor#addCoprocessor(String)}. The order {@link Constraint
+ * Constraints} are specified is the order in which they will be applied.
+ *
+ * If any {@link Constraint} fails, then no further {@link Constraint
+ * Constraints} will be checked. Therefore, there are performance implications
+ * in the order in which {@link Constraint Constraints} are specified.
+ *
+ * NOTE: Implementing classes must have a nullary (no-args) constructor
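+ *
+ * A minimal sketch of attaching a constraint to a table (assumes the
+ * Constraints helper class in this package, the IntegerConstraint shipped with
+ * it, and an HBaseAdmin instance named admin; names are illustrative):
+ *
+ * <pre>
+ * HTableDescriptor desc = new HTableDescriptor(tableName);
+ * Constraints.addConstraints(desc, IntegerConstraint.class);
+ * admin.createTable(desc);
+ * </pre>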
+ */
+public interface Constraint {
+
+ /**
+ * Set the configuration for the Constraint. This will be called once, on
+ * load, before the Constraint is used for checking.
+ * @param conf
+ */
+ public void setConfiguration(Configuration conf);
+
+ /**
+ * Check a {@link Put} to ensure it is valid for the table. If the {@link Put}
+ * is valid, then just return from the method. Otherwise, throw an
+ * {@link Exception} specifying what happened. This {@link Exception} is
+ * propagated back to the client so you can see what caused the {@link Put} to
+ * fail.
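+ *
+ * A sketch of a trivial implementation (the column names are purely
+ * illustrative):
+ *
+ * <pre>
+ * public void check(Put p) throws Throwable {
+ *   byte[] family = Bytes.toBytes("required_family");
+ *   byte[] qualifier = Bytes.toBytes("required_qualifier");
+ *   if (!p.has(family, qualifier)) {
+ *     throw new IllegalArgumentException("Put is missing the required column");
+ *   }
+ * }
+ * </pre>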
+ * @param p {@link Put} to check
+ * @throws Throwable
+ */
+ public void check(Put p) throws Throwable;
+
+}
diff --git a/src/main/java/org/apache/hadoop/hbase/constraint/ConstraintProcessor.java b/src/main/java/org/apache/hadoop/hbase/constraint/ConstraintProcessor.java
new file mode 100644
index 0000000..c835e94
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/hbase/constraint/ConstraintProcessor.java
@@ -0,0 +1,88 @@
+package org.apache.hadoop.hbase.constraint;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+
+/**
+ * Processes multiple {@link Constraint Constraints} on a given table.
+ *
+ * This is an ease of use mechanism - all the functionality here could be
+ * implemented on any given system by a coprocessor.
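+ *
+ * This observer does not normally need to be registered by hand; as a sketch
+ * of the intended wiring, adding any constraint through the Constraints helper
+ * in this package also registers this class as a coprocessor on the table:
+ *
+ * <pre>
+ * HTableDescriptor desc = new HTableDescriptor("myTable");
+ * Constraints.addConstraints(desc, IntegerConstraint.class);
+ * // equivalent manual registration:
+ * // desc.addCoprocessor(ConstraintProcessor.class.getName());
+ * </pre>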
+ */
+public class ConstraintProcessor extends BaseRegionObserver {
+
+ private static final Log LOG = LogFactory.getLog(ConstraintProcessor.class);
+
+ private final ClassLoader classloader;
+
+ private List<? extends Constraint> constraints = new ArrayList<Constraint>();
+ private boolean hasConstraints = true;
+
+ /**
+ * Create the constraint processor.
+ *
+ * Stores the current classloader.
+ */
+ public ConstraintProcessor() {
+ classloader = this.getClass().getClassLoader();
+ }
+
+ @Override
+ public void start(CoprocessorEnvironment environment) {
+ if (!(environment instanceof RegionCoprocessorEnvironment))
+ throw new IllegalArgumentException(
+ "Constraints only act on regions - started in an environment that was not a region");
+ RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) environment;
+ // default to no constraints until they are successfully loaded below
+ this.hasConstraints = false;
+
+ HTableDescriptor desc = env.getRegion().getTableDesc();
+ try {
+ this.constraints = Constraints.getConstraints(desc, classloader);
+ this.hasConstraints = constraints.size() != 0;
+ } catch (IOException e) {
+ throw new IllegalArgumentException(e);
+ }
+
+ if (LOG.isDebugEnabled())
+ LOG.debug("Added " + constraints.size() + " constraints");
+
+ if (LOG.isInfoEnabled())
+ LOG.info("Finished loading user Constraints on table: "
+ + new String(desc.getName()));
+
+ }
+
+ @Override
+ public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put,
+ WALEdit edit, boolean writeToWAL) throws IOException {
+ // slight speedup by not getting an iterator if we have nothing to check
+ if (!hasConstraints)
+ return;
+
+ // check the put against the stored constraints
+ for (Constraint c : constraints) {
+ // TODO process the put through the constraints
+ try {
+ c.check(put);
+ } catch (Throwable t) {
+ // if it threw an exception, then pass that back to the client and fail
+ // the put
+ throw new DoNotRetryIOException("Failed to pass constraint", t);
+ }
+ }
+ // if we made it here, then the Put is valid
+ }
+}
diff --git a/src/main/java/org/apache/hadoop/hbase/constraint/Constraints.java b/src/main/java/org/apache/hadoop/hbase/constraint/Constraints.java
new file mode 100644
index 0000000..8bf21c2
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/hbase/constraint/Constraints.java
@@ -0,0 +1,187 @@
+package org.apache.hadoop.hbase.constraint;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+
+/**
+ * Utilities for adding/removing constraints from a table.
+ *
+ * Constraints can be added to a table at creation time, via the {@link HTableDescriptor}.
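+ *
+ * A short usage sketch (assumes an HBaseAdmin instance named admin and the
+ * IntegerConstraint implementation shipped in this package; other names are
+ * illustrative):
+ *
+ * <pre>
+ * HTableDescriptor desc = new HTableDescriptor("myTable");
+ * desc.addFamily(new HColumnDescriptor("family"));
+ * Constraints.addConstraints(desc, IntegerConstraint.class);
+ * admin.createTable(desc);
+ * </pre>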
+ */
+public final class Constraints {
+ private Constraints() {
+ }
+
+ private static final Log LOG = LogFactory.getLog(Constraints.class);
+ private static final String CONSTRAINT_HTD_KEY_PREFIX = "constraint $";
+ private static final Pattern CONSTRAINT_HTD_ATTR_KEY_PATTERN = Pattern
+ .compile(CONSTRAINT_HTD_KEY_PREFIX, Pattern.LITERAL);
+
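+ // Serialized value format for each constraint key stored in the table
+ // descriptor: a single flag byte (HAS_CONFIGURATION or
+ // DOESNT_HAVE_CONFIGURATION), optionally followed by the constraint's
+ // Configuration written via Configuration.write(DataOutput).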
+ private static final byte HAS_CONFIGURATION = 1;
+ private static final byte DOESNT_HAVE_CONFIGURATION = 0;
+
+ /**
+ * Add configuration-less constraints to the table.
+ *
+ * This will overwrite any configuration associated with the previous
+ * constraint of the same class.
+ * @param desc
+ * @param constraints
+ * @throws IOException If constraint could not be serialized/added to table
+ */
+ public static void addConstraints(HTableDescriptor desc,
+ Class<? extends Constraint>... constraints) throws IOException {
+ addConstraintProcessor(desc);
+ for (Class<? extends Constraint> clazz : constraints) {
+ // name of the constraint
+ String constraintClazz = clazz.getName();
+ String puttableKey = CONSTRAINT_HTD_KEY_PREFIX + constraintClazz;
+ byte[] data;
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(bos);
+ dos.writeByte(DOESNT_HAVE_CONFIGURATION);
+ dos.flush();
+ data = bos.toByteArray();
+ desc.setValue(puttableKey.getBytes(), data);
+ }
+ }
+
+ /**
+ * Add constraints and their associated configurations to the table.
+ *
+ * Adding the same constraint class twice will overwrite the first
+ * constraint's configuration
+ * @param desc
+ * @param constraints
+ * @throws IOException If a constraint or its associated configuration could
+ * not be serialized and added to the table. Constraints added before the
+ * failure remain set on the table descriptor.
+ */
+ public static void addConstraints(HTableDescriptor desc,
+ Pair<Class<? extends Constraint>, Configuration>... constraints)
+ throws IOException {
+ addConstraintProcessor(desc);
+ for (Pair<Class<? extends Constraint>, Configuration> pair : constraints) {
+ // name of the constraint
+ Class<? extends Constraint> clazz = pair.getFirst();
+ String constraintClazz = clazz.getName();
+ String puttableKey = CONSTRAINT_HTD_KEY_PREFIX + constraintClazz;
+ // write the configuration
+ Configuration conf = pair.getSecond();
+ byte[] data = new byte[0];
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(bos);
+ // note that it has a configuration
+ dos.writeByte(HAS_CONFIGURATION);
+ conf.write(dos);
+ dos.flush();
+ data = bos.toByteArray();
+ desc.setValue(puttableKey.getBytes(), data);
+ }
+ }
+
+ /**
+ * Make sure that the {@link ConstraintProcessor} has been added to the list
+ * of coprocessors for this table.
+ * @param desc table description to add the processor
+ * @throws IOException If the {@link ConstraintProcessor} CP couldn't be added
+ * to the table.
+ */
+ private static void addConstraintProcessor(HTableDescriptor desc)
+ throws IOException {
+ // if the CP has already been loaded, do nothing
+ String clazz = ConstraintProcessor.class.getName();
+ if (desc.hasCoprocessor(clazz))
+ return;
+
+ // add the constraint processor CP to the table
+ desc.addCoprocessor(clazz);
+ }
+
+ /**
+ * Enable constraints on a table.
+ *
+ * Currently, if you attempt to add a constraint to the table, then
+ * Constraints will automatically be turned on.
+ * @param desc Table descriptor to enable constraints.
+ * @throws IOException
+ */
+ public static void enableConstraints(HTableDescriptor desc)
+ throws IOException {
+ desc.addCoprocessor(ConstraintProcessor.class.getName());
+ }
+
+ /**
+ * Turn off processing constraints for a given table, even if constraints have
+ * been turned on or added.
+ * @param desc
+ */
+ public static void disableConstraints(HTableDescriptor desc) {
+ // TODO implement a removal of a coprocessor.
+ }
+
+ /**
+ * Get the constraints stored in the table descriptor
+ * @param desc To read from
+ * @param classloader To use when loading classes
+ * @return List of configured {@link Constraint Constraints}
+ * @throws IOException if any part of reading/arguments fails
+ */
+ public static List<? extends Constraint> getConstraints(
+ HTableDescriptor desc, ClassLoader classloader) throws IOException {
+ List<Constraint> constraints = new ArrayList<Constraint>();
+ for (Map.Entry<ImmutableBytesWritable, ImmutableBytesWritable> e : desc
+ .getValues().entrySet()) {
+ String key = Bytes.toString(e.getKey().get()).trim();
+ String[] className = CONSTRAINT_HTD_ATTR_KEY_PATTERN.split(key);
+ if (className.length == 2) {
+ key = className[1];
+ if (LOG.isDebugEnabled())
+ LOG.debug("Loading constraint:" + key);
+
+ // read in the configuration
+ byte[] values = e.getValue().get();
+ DataInputStream is = new DataInputStream(new ByteArrayInputStream(
+ values));
+
+ // if it has a configuration, read it in
+ Configuration conf = new Configuration();
+ if (is.readByte() == HAS_CONFIGURATION) {
+ LOG.debug("Loading configuration for constraint");
+ conf.readFields(is);
+ }
+
+ try {
+ Class<? extends Constraint> clazz = classloader.loadClass(key)
+ .asSubclass(Constraint.class);
+ Constraint constraint = clazz.newInstance();
+ constraint.setConfiguration(conf);
+ constraints.add(constraint);
+ } catch (ClassNotFoundException e1) {
+ throw new IOException(e1);
+ } catch (InstantiationException e1) {
+ throw new IOException(e1);
+ } catch (IllegalAccessException e1) {
+ throw new IOException(e1);
+ }
+ }
+ }
+ return constraints;
+ }
+
+}
diff --git a/src/main/java/org/apache/hadoop/hbase/constraint/IntegerConstraint.java b/src/main/java/org/apache/hadoop/hbase/constraint/IntegerConstraint.java
new file mode 100644
index 0000000..c43e21c
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/hbase/constraint/IntegerConstraint.java
@@ -0,0 +1,27 @@
+package org.apache.hadoop.hbase.constraint;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Put;
+
+/**
+ * Check to make sure that the value in the {@link Put} is an integer, otherwise
+ * reject it.
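+ *
+ * Note that the value bytes are interpreted as the string form of an int (for
+ * example, Integer.toString(10).getBytes()); values written as raw binary ints
+ * (e.g. Bytes.toBytes(10)) will be rejected.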
+ */
+public class IntegerConstraint extends BaseConstraint {
+
+ @Override
+ public void check(Put p) throws Throwable {
+ Map<byte[], List<KeyValue>> familyMap = p.getFamilyMap();
+ for (List<KeyValue> kvs : familyMap.values()) {
+ for (KeyValue kv : kvs) {
+ // just make sure that we can actually pull an int out of the value bytes;
+ // this will automatically throw a NumberFormatException if we try to
+ // store something that isn't an Integer.
+ Integer.parseInt(new String(kv.getBuffer(), kv.getValueOffset(), kv.getValueLength()));
+ }
+ }
+ }
+}
diff --git a/src/test/java/org/apache/hadoop/hbase/constraint/IntegrationTestConstraint.java b/src/test/java/org/apache/hadoop/hbase/constraint/IntegrationTestConstraint.java
new file mode 100644
index 0000000..08ec1aa
--- /dev/null
+++ b/src/test/java/org/apache/hadoop/hbase/constraint/IntegrationTestConstraint.java
@@ -0,0 +1,133 @@
+package org.apache.hadoop.hbase.constraint;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class IntegrationTestConstraint {
+ private static final Log LOG = LogFactory
+ .getLog(IntegrationTestConstraint.class);
+
+ private static HBaseTestingUtility util;
+ private static final byte[] tableName = Bytes.toBytes("test");
+ private static final byte[] dummy = Bytes.toBytes("dummy");
+ private static final byte[] row1 = Bytes.toBytes("r1");
+ private static final byte[] row2 = Bytes.toBytes("r2");
+ private static final byte[] row3 = Bytes.toBytes("r3");
+ private static final byte[] test = Bytes.toBytes("test");
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+
+ util = new HBaseTestingUtility();
+ util.startMiniCluster();
+
+ }
+
+ @Test
+ public void testConstraintPasses() throws Exception {
+ // create the table
+ // it would be nice if this was also a method on the util
+ HTableDescriptor desc = new HTableDescriptor(tableName);
+ Constraints.addConstraints(desc, AllPassConstraint.class);
+ for (byte[] family : new byte[][] { dummy, test }) {
+ desc.addFamily(new HColumnDescriptor(family));
+ }
+
+ util.getHBaseAdmin().createTable(desc);
+ HTable table = new HTable(util.getConfiguration(), tableName);
+ table.setAutoFlush(true);
+
+ // test that we don't fail on a valid put
+ Put put = new Put(row1);
+ byte[] value = Integer.toString(10).getBytes();
+ put.add(dummy, new byte[0], value);
+ table.put(put);
+
+ // cleanup
+ util.getHBaseAdmin().disableTable(tableName);
+ util.getHBaseAdmin().deleteTable(tableName);
+ }
+
+ @Test(timeout = 10000)
+ public void testConstraintFails() throws Exception {
+
+ // create the table
+ // it would be nice if this was also a method on the util
+ HTableDescriptor desc = new HTableDescriptor(tableName);
+ Constraints.addConstraints(desc, AllFailConstraint.class);
+ for (byte[] family : new byte[][] { dummy, test }) {
+ desc.addFamily(new HColumnDescriptor(family));
+ }
+
+ util.getHBaseAdmin().createTable(desc);
+ HTable table = new HTable(util.getConfiguration(), tableName);
+ table.setAutoFlush(true);
+
+ // test that we do fail on violation
+ Put put = new Put(row1);
+ put.add(dummy, new byte[0], "fail".getBytes());
+ LOG.warn("Doing put in table");
+ try {
+ table.put(put);
+ // table.flushCommits();
+ assertTrue("Put should have been rejected by the constraint", false);
+ } catch (RetriesExhaustedWithDetailsException e) {
+ List<Throwable> causes = e.getCauses();
+ assertEquals(
+ "More than one failure cause - should only be the failure constraint exception",
+ 1, causes.size());
+ Throwable t = causes.get(0);
+ assertEquals(DoNotRetryIOException.class, t.getClass());
+
+ }
+
+ // cleanup
+ util.getHBaseAdmin().disableTable(tableName);
+ util.getHBaseAdmin().deleteTable(tableName);
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ util.shutdownMiniCluster();
+ }
+
+ /**
+ * Constraint that allows all keys to pass
+ */
+ public static class AllPassConstraint extends BaseConstraint {
+
+ @Override
+ public void check(Put p) throws Throwable {
+ // Do nothing - whatever the put is, it passes
+ }
+
+ }
+
+ /**
+ * Fail on every put
+ */
+ public static class AllFailConstraint extends BaseConstraint {
+
+ @Override
+ public void check(Put p) throws Throwable {
+ throw new RuntimeException("AllFailConstraint fails for all puts");
+ }
+ }
+
+}