diff --git a/src/main/java/org/apache/hadoop/hbase/constraint/BaseConstraint.java b/src/main/java/org/apache/hadoop/hbase/constraint/BaseConstraint.java
new file mode 100644
index 0000000..f133399
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/hbase/constraint/BaseConstraint.java
@@ -0,0 +1,23 @@
+package org.apache.hadoop.hbase.constraint;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Base class to use when actually implementing a {@link Constraint}. It takes
+ * care of getting and setting the configuration for the constraint.
+ */
+public abstract class BaseConstraint implements Constraint {
+
+  private Configuration conf;
+
+  @Override
+  public void setConfiguration(Configuration conf) {
+    this.conf = conf;
+  }
+
+  protected Configuration getConfiguration() {
+    return this.conf;
+  }
+
+}
diff --git a/src/main/java/org/apache/hadoop/hbase/constraint/Constraint.java b/src/main/java/org/apache/hadoop/hbase/constraint/Constraint.java
new file mode 100644
index 0000000..482790d
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/hbase/constraint/Constraint.java
@@ -0,0 +1,41 @@
+package org.apache.hadoop.hbase.constraint;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.Put;
+
+/**
+ * Apply a {@link Constraint} (in traditional database terminology) to an
+ * HTable. Any number of {@link Constraint Constraints} can be added to the
+ * table, in any order. {@link Constraint Constraints} are added by specifying
+ * the classes at the table level as a coprocessor, for instance by using
+ * {@link HTableDescriptor#addCoprocessor(String)}. The order in which
+ * {@link Constraint Constraints} are specified is the order in which they
+ * will be applied.
+ * <p>
+ * If any {@link Constraint} fails, then no further {@link Constraint
+ * Constraints} will be checked. Therefore, the order in which
+ * {@link Constraint Constraints} are specified has performance implications.
+ * <p>
+ * NOTE: Implementing classes must have a nullary (no-args) constructor.
+ */
+public interface Constraint {
+
+  /**
+   * Set the configuration for the Constraint. This will be called once, on
+   * load, before the Constraint is used for checking.
+   * @param conf configuration to use for this constraint
+   */
+  public void setConfiguration(Configuration conf);
+
+  /**
+   * Check a {@link Put} to ensure it is valid for the table. If the {@link Put}
+   * is valid, then just return from the method. Otherwise, throw an
+   * {@link Exception} specifying what happened. This {@link Exception} is
+   * propagated back to the client so you can see what caused the {@link Put} to
+   * fail.
+   * @param p {@link Put} to check
+   * @throws Throwable if the {@link Put} does not pass this constraint
+   */
+  public void check(Put p) throws Throwable;
+
+}
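For reference, a minimal sketch of what an implementing class could look like, built on the BaseConstraint above. The class name and the "example.max.row.length" configuration key are illustrative only, not part of this patch:

    package org.apache.hadoop.hbase.constraint;

    import org.apache.hadoop.hbase.client.Put;

    /** Example only: rejects puts whose row key exceeds a configured length. */
    public class MaxRowLengthConstraint extends BaseConstraint {
      @Override
      public void check(Put p) throws Throwable {
        // "example.max.row.length" is a hypothetical key; any key could be used
        int max = getConfiguration().getInt("example.max.row.length", 64);
        if (p.getRow().length > max) {
          throw new IllegalArgumentException("Row key is longer than " + max
              + " bytes, rejecting put");
        }
      }
    }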
diff --git a/src/main/java/org/apache/hadoop/hbase/constraint/ConstraintProcessor.java b/src/main/java/org/apache/hadoop/hbase/constraint/ConstraintProcessor.java
new file mode 100644
index 0000000..c835e94
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/hbase/constraint/ConstraintProcessor.java
@@ -0,0 +1,88 @@
+package org.apache.hadoop.hbase.constraint;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+
+/**
+ * Processes multiple {@link Constraint Constraints} on a given table.
+ * <p>
+ * This is an ease-of-use mechanism - all the functionality here could be
+ * implemented on any given system by a coprocessor.
+ */
+public class ConstraintProcessor extends BaseRegionObserver {
+
+  private static final Log LOG = LogFactory.getLog(ConstraintProcessor.class);
+
+  private final ClassLoader classloader;
+
+  private List<Constraint> constraints = new ArrayList<Constraint>();
+  private boolean hasConstraints = false;
+
+  /**
+   * Create the constraint processor.
+   * <p>
+   * Stores the current classloader.
+   */
+  public ConstraintProcessor() {
+    classloader = this.getClass().getClassLoader();
+  }
+
+  @Override
+  public void start(CoprocessorEnvironment environment) {
+    if (!(environment instanceof RegionCoprocessorEnvironment))
+      throw new IllegalArgumentException(
+          "Constraints only act on regions - started in an environment that was not a region");
+    RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) environment;
+    HTableDescriptor desc = env.getRegion().getTableDesc();
+    try {
+      this.constraints = Constraints.getConstraints(desc, classloader);
+      this.hasConstraints = constraints.size() != 0;
+    } catch (IOException e) {
+      throw new IllegalArgumentException(e);
+    }
+
+    if (LOG.isDebugEnabled())
+      LOG.debug("Added " + constraints.size() + " constraints");
+
+    if (LOG.isInfoEnabled())
+      LOG.info("Finished loading user Constraints on table: "
+          + new String(desc.getName()));
+  }
+
+  @Override
+  public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put,
+      WALEdit edit, boolean writeToWAL) throws IOException {
+    // slight speedup by not getting an iterator if we have nothing to check
+    if (!hasConstraints)
+      return;
+
+    // check the put against the stored constraints
+    for (Constraint c : constraints) {
+      try {
+        c.check(put);
+      } catch (Throwable t) {
+        // if it threw an exception, then pass that back to the client and fail
+        // the put
+        throw new DoNotRetryIOException("Failed to pass constraint", t);
+      }
+    }
+    // if we made it here, then the Put is valid
+  }
+}
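Since prePut wraps every constraint failure in a DoNotRetryIOException, a client sees the violation through the normal retry machinery. A sketch of the client side, assuming an existing HTable named table and a Put named put:

    try {
      table.put(put);
    } catch (RetriesExhaustedWithDetailsException e) {
      // each cause is the DoNotRetryIOException thrown in prePut above
      for (Throwable cause : e.getCauses()) {
        System.err.println("Constraint rejected put: " + cause);
      }
    }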
diff --git a/src/main/java/org/apache/hadoop/hbase/constraint/Constraints.java b/src/main/java/org/apache/hadoop/hbase/constraint/Constraints.java
new file mode 100644
index 0000000..8bf21c2
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/hbase/constraint/Constraints.java
@@ -0,0 +1,187 @@
+package org.apache.hadoop.hbase.constraint;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+
+/**
+ * Utilities for adding/removing constraints from a table.
+ * <p>
+ * Constraints can be added at table creation time, via the
+ * {@link HTableDescriptor}.
+ */
+public final class Constraints {
+  private Constraints() {
+  }
+
+  private static final Log LOG = LogFactory.getLog(Constraints.class);
+  private static final String CONSTRAINT_HTD_KEY_PREFIX = "constraint $";
+  private static final Pattern CONSTRAINT_HTD_ATTR_KEY_PATTERN = Pattern
+      .compile(CONSTRAINT_HTD_KEY_PREFIX, Pattern.LITERAL);
+
+  private static final byte HAS_CONFIGURATION = 1;
+  private static final byte DOESNT_HAVE_CONFIGURATION = 0;
+
+  /**
+   * Add configuration-less constraints to the table.
+   * <p>
+   * This will overwrite any configuration associated with a previously added
+   * constraint of the same class.
+   * @param desc table descriptor to add the constraints to
+   * @param constraints constraint classes to add
+   * @throws IOException if a constraint could not be serialized/added to the
+   *           table
+   */
+  public static void addConstraints(HTableDescriptor desc,
+      Class<? extends Constraint>... constraints) throws IOException {
+    addConstraintProcessor(desc);
+    for (Class<? extends Constraint> clazz : constraints) {
+      // name of the constraint
+      String constraintClazz = clazz.getName();
+      String puttableKey = CONSTRAINT_HTD_KEY_PREFIX + constraintClazz;
+      byte[] data;
+      ByteArrayOutputStream bos = new ByteArrayOutputStream();
+      DataOutputStream dos = new DataOutputStream(bos);
+      dos.writeByte(DOESNT_HAVE_CONFIGURATION);
+      dos.flush();
+      data = bos.toByteArray();
+      desc.setValue(puttableKey.getBytes(), data);
+    }
+  }
+
+  /**
+   * Add constraints and their associated configurations to the table.
+   * <p>
+   * Adding the same constraint class twice will overwrite the first
+   * constraint's configuration.
+   * @param desc table descriptor to add the constraints to
+   * @param constraints pairs of constraint class and the configuration to
+   *          bind to it
+   * @throws IOException if any constraint could not be serialized. Assumes
+   *           that if one constraint is not added properly, something has
+   *           gone terribly wrong and that all constraints need to be
+   *           enforced.
+   */
+  public static void addConstraints(HTableDescriptor desc,
+      Pair<Class<? extends Constraint>, Configuration>... constraints)
+      throws IOException {
+    addConstraintProcessor(desc);
+    for (Pair<Class<? extends Constraint>, Configuration> pair : constraints) {
+      // name of the constraint
+      Class<? extends Constraint> clazz = pair.getFirst();
+      String constraintClazz = clazz.getName();
+      String puttableKey = CONSTRAINT_HTD_KEY_PREFIX + constraintClazz;
+      // write the configuration
+      Configuration conf = pair.getSecond();
+      byte[] data;
+      ByteArrayOutputStream bos = new ByteArrayOutputStream();
+      DataOutputStream dos = new DataOutputStream(bos);
+      // note that it has a configuration
+      dos.writeByte(HAS_CONFIGURATION);
+      conf.write(dos);
+      dos.flush();
+      data = bos.toByteArray();
+      desc.setValue(puttableKey.getBytes(), data);
+    }
+  }
+
+  /**
+   * Make sure that the {@link ConstraintProcessor} has been added to the list
+   * of coprocessors for this table.
+   * @param desc table descriptor to add the processor to
+   * @throws IOException if the {@link ConstraintProcessor} CP couldn't be
+   *           added to the table
+   */
+  private static void addConstraintProcessor(HTableDescriptor desc)
+      throws IOException {
+    // if the CP has already been loaded, do nothing
+    String clazz = ConstraintProcessor.class.getName();
+    if (desc.hasCoprocessor(clazz))
+      return;
+
+    // add the constraint processor CP to the table
+    desc.addCoprocessor(clazz);
+  }
+
+  /**
+   * Enable constraints on a table.
+   * <p>
+   * Currently, if you attempt to add a constraint to the table, then
+   * constraints will automatically be turned on.
+   * @param desc table descriptor to enable constraints on
+   * @throws IOException if the {@link ConstraintProcessor} coprocessor could
+   *           not be added to the table
+   */
+  public static void enableConstraints(HTableDescriptor desc)
+      throws IOException {
+    desc.addCoprocessor(ConstraintProcessor.class.getName());
+  }
+
+  /**
+   * Turn off processing constraints for a given table, even if constraints
+   * have been turned on or added.
+   * @param desc table descriptor to disable constraints on
+   */
+  public static void disableConstraints(HTableDescriptor desc) {
+    // TODO implement a removal of a coprocessor.
+  }
+
+  /**
+   * Get the constraints stored in the table descriptor.
+   * @param desc To read from
+   * @param classloader To use when loading classes
+   * @return List of configured {@link Constraint Constraints}
+   * @throws IOException if any part of reading/arguments fails
+   */
+  public static List<Constraint> getConstraints(
+      HTableDescriptor desc, ClassLoader classloader) throws IOException {
+    List<Constraint> constraints = new ArrayList<Constraint>();
+    for (Map.Entry<ImmutableBytesWritable, ImmutableBytesWritable> e : desc
+        .getValues().entrySet()) {
+      String key = Bytes.toString(e.getKey().get()).trim();
+      String[] className = CONSTRAINT_HTD_ATTR_KEY_PATTERN.split(key);
+      if (className.length == 2) {
+        key = className[1];
+        if (LOG.isDebugEnabled())
+          LOG.debug("Loading constraint:" + key);
+
+        // read in the configuration
+        byte[] values = e.getValue().get();
+        DataInputStream is = new DataInputStream(new ByteArrayInputStream(
+            values));
+
+        // if it has a configuration, read it in
+        Configuration conf = new Configuration();
+        if (is.readByte() == HAS_CONFIGURATION) {
+          LOG.debug("Loading configuration for constraint");
+          conf.readFields(is);
+        }
+
+        try {
+          Class<? extends Constraint> clazz = classloader.loadClass(key)
+              .asSubclass(Constraint.class);
+          Constraint constraint = clazz.newInstance();
+          constraint.setConfiguration(conf);
+          constraints.add(constraint);
+        } catch (ClassNotFoundException e1) {
+          throw new IOException(e1);
+        } catch (InstantiationException e1) {
+          throw new IOException(e1);
+        } catch (IllegalAccessException e1) {
+          throw new IOException(e1);
+        }
+      }
+    }
+    return constraints;
+  }
+
+}
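To show how these utilities fit together, a hedged sketch of creating a table with both a configuration-free constraint (the IntegerConstraint added below) and a configured one (the hypothetical MaxRowLengthConstraint sketched earlier); admin is assumed to be an existing HBaseAdmin:

    HTableDescriptor desc = new HTableDescriptor("demo_table");
    desc.addFamily(new HColumnDescriptor("f"));

    // configuration-free constraint
    Constraints.addConstraints(desc, IntegerConstraint.class);

    // constraint bound to a Configuration; the key is illustrative
    Configuration conf = new Configuration(false);
    conf.setInt("example.max.row.length", 32);
    Constraints.addConstraints(desc,
        new Pair<Class<? extends Constraint>, Configuration>(
            MaxRowLengthConstraint.class, conf));

    admin.createTable(desc);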
diff --git a/src/main/java/org/apache/hadoop/hbase/constraint/IntegerConstraint.java b/src/main/java/org/apache/hadoop/hbase/constraint/IntegerConstraint.java
new file mode 100644
index 0000000..c43e21c
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/hbase/constraint/IntegerConstraint.java
@@ -0,0 +1,27 @@
+package org.apache.hadoop.hbase.constraint;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Put;
+
+/**
+ * Check to make sure that every value in the {@link Put} is an integer,
+ * otherwise reject it.
+ */
+public class IntegerConstraint extends BaseConstraint {
+
+  @Override
+  public void check(Put p) throws Throwable {
+    Map<byte[], List<KeyValue>> familyMap = p.getFamilyMap();
+    for (List<KeyValue> kvs : familyMap.values()) {
+      for (KeyValue kv : kvs) {
+        // just make sure that we can actually pull out an int
+        // this will automatically throw a NumberFormatException if we try to
+        // store something that isn't an integer
+        Integer.parseInt(new String(kv.getValue()));
+      }
+    }
+  }
+}
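Note that IntegerConstraint parses each cell value as text, not as a serialized int. A short illustration of the distinction (family and qualifier are assumed to be existing byte arrays):

    Put ok = new Put(Bytes.toBytes("r1"));
    ok.add(family, qualifier, Integer.toString(10).getBytes()); // "10" - passes

    Put bad = new Put(Bytes.toBytes("r2"));
    bad.add(family, qualifier, Bytes.toBytes(10)); // 4 raw int bytes - rejected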
diff --git a/src/test/java/org/apache/hadoop/hbase/constraint/IntegrationTestConstraint.java b/src/test/java/org/apache/hadoop/hbase/constraint/IntegrationTestConstraint.java
new file mode 100644
index 0000000..08ec1aa
--- /dev/null
+++ b/src/test/java/org/apache/hadoop/hbase/constraint/IntegrationTestConstraint.java
@@ -0,0 +1,133 @@
+package org.apache.hadoop.hbase.constraint;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class IntegrationTestConstraint {
+  private static final Log LOG = LogFactory
+      .getLog(IntegrationTestConstraint.class);
+
+  private static HBaseTestingUtility util;
+  private static final byte[] tableName = Bytes.toBytes("test");
+  private static final byte[] dummy = Bytes.toBytes("dummy");
+  private static final byte[] row1 = Bytes.toBytes("r1");
+  private static final byte[] row2 = Bytes.toBytes("r2");
+  private static final byte[] row3 = Bytes.toBytes("r3");
+  private static final byte[] test = Bytes.toBytes("test");
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    util = new HBaseTestingUtility();
+    util.startMiniCluster();
+  }
+
+  @Test
+  public void testConstraintPasses() throws Exception {
+    // create the table
+    // it would be nice if this was also a method on the util
+    HTableDescriptor desc = new HTableDescriptor(tableName);
+    Constraints.addConstraints(desc, AllPassConstraint.class);
+    for (byte[] family : new byte[][] { dummy, test }) {
+      desc.addFamily(new HColumnDescriptor(family));
+    }
+
+    util.getHBaseAdmin().createTable(desc);
+    HTable table = new HTable(util.getConfiguration(), tableName);
+    table.setAutoFlush(true);
+
+    // test that we don't fail on a valid put
+    Put put = new Put(row1);
+    byte[] value = Integer.toString(10).getBytes();
+    put.add(dummy, new byte[0], value);
+    table.put(put);
+
+    // cleanup
+    util.getHBaseAdmin().disableTable(tableName);
+    util.getHBaseAdmin().deleteTable(tableName);
+  }
+
+  @Test(timeout = 10000)
+  public void testConstraintFails() throws Exception {
+    // create the table
+    // it would be nice if this was also a method on the util
+    HTableDescriptor desc = new HTableDescriptor(tableName);
+    Constraints.addConstraints(desc, AllFailConstraint.class);
+    for (byte[] family : new byte[][] { dummy, test }) {
+      desc.addFamily(new HColumnDescriptor(family));
+    }
+
+    util.getHBaseAdmin().createTable(desc);
+    HTable table = new HTable(util.getConfiguration(), tableName);
+    table.setAutoFlush(true);
+
+    // test that we do fail on violation
+    Put put = new Put(row1);
+    put.add(dummy, new byte[0], "fail".getBytes());
+    LOG.warn("Doing put in table");
+    try {
+      table.put(put);
+      fail("Put should have been rejected by the constraint");
+    } catch (RetriesExhaustedWithDetailsException e) {
+      List<Throwable> causes = e.getCauses();
+      assertEquals("More than one failure cause - should only be "
+          + "the failure constraint exception", 1, causes.size());
+      Throwable t = causes.get(0);
+      assertEquals(DoNotRetryIOException.class, t.getClass());
+    }
+
+    // cleanup
+    util.getHBaseAdmin().disableTable(tableName);
+    util.getHBaseAdmin().deleteTable(tableName);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    util.shutdownMiniCluster();
+  }
+
+  /**
+   * Constraint that allows all puts to pass.
+   */
+  public static class AllPassConstraint extends BaseConstraint {
+
+    @Override
+    public void check(Put p) throws Throwable {
+      // Do Nothing - whatever the put is, it passes
+    }
+
+  }
+
+  /**
+   * Fail on every put.
+   */
+  public static class AllFailConstraint extends BaseConstraint {
+
+    @Override
+    public void check(Put p) throws Throwable {
+      throw new RuntimeException("AllFailConstraint fails for all puts");
+    }
+  }
+
+}