diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java index c861192..1a1b2ac 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java @@ -311,6 +311,12 @@ public abstract class BaseRegionObserver implements RegionObserver { } @Override + public void prePrepareTimeStampForDeleteVersion( + final ObserverContext e, final Mutation delete, + final Cell cell, final byte[] byteNow, final Get get) throws IOException { + } + + @Override public void postDelete(final ObserverContext e, final Delete delete, final WALEdit edit, final Durability durability) throws IOException { @@ -340,6 +346,15 @@ public abstract class BaseRegionObserver implements RegionObserver { } @Override + public boolean preCheckAndPutAfterRowLock( + final ObserverContext e, + final byte[] row, final byte[] family, final byte[] qualifier, final CompareOp compareOp, + final ByteArrayComparable comparator, final Put put, + final boolean result) throws IOException { + return result; + } + + @Override public boolean postCheckAndPut(final ObserverContext e, final byte [] row, final byte [] family, final byte [] qualifier, final CompareOp compareOp, final ByteArrayComparable comparator, @@ -356,6 +371,15 @@ public abstract class BaseRegionObserver implements RegionObserver { } @Override + public boolean preCheckAndDeleteAfterRowLock( + final ObserverContext e, + final byte[] row, final byte[] family, final byte[] qualifier, final CompareOp compareOp, + final ByteArrayComparable comparator, final Delete delete, + final boolean result) throws IOException { + return result; + } + + @Override public boolean postCheckAndDelete(final ObserverContext e, final byte [] row, final byte [] family, final byte [] qualifier, final CompareOp compareOp, final ByteArrayComparable comparator, @@ -370,6 +394,12 @@ public abstract class BaseRegionObserver implements RegionObserver { } @Override + public Result preAppendAfterRowLock(final ObserverContext e, + final Append append) throws IOException { + return null; + } + + @Override public Result postAppend(final ObserverContext e, final Append append, final Result result) throws IOException { return result; @@ -397,6 +427,12 @@ public abstract class BaseRegionObserver implements RegionObserver { } @Override + public Result preIncrementAfterRowLock(final ObserverContext e, + final Increment increment) throws IOException { + return null; + } + + @Override public Result postIncrement(final ObserverContext e, final Increment increment, final Result result) throws IOException { return result; diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java index 96cc3bd..3425a12 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java @@ -587,6 +587,24 @@ public interface RegionObserver extends Coprocessor { void preDelete(final ObserverContext c, final Delete delete, final WALEdit edit, final Durability durability) throws IOException; +/** + * Called before the server updates the timestamp for version delete with latest timestamp. + *

+ * Call CoprocessorEnvironment#bypass to skip default actions + *

+ * Call CoprocessorEnvironment#complete to skip any subsequent chained + * coprocessors + * @param c the environment provided by the region server + * @param mutation - the parent mutation associated with this delete cell + * @param cell - The deleteColumn with latest version cell + * @param byteNow - timestamp bytes + * @param get - the get formed using the current cell's row. + * Note that the get does not specify the family and qualifier + * @throws IOException + */ + void prePrepareTimeStampForDeleteVersion(final ObserverContext c, + final Mutation mutation, final Cell cell, final byte[] byteNow, + final Get get) throws IOException; /** * Called after the client deletes a value. @@ -657,7 +675,7 @@ public interface RegionObserver extends Coprocessor { MiniBatchOperationInProgress miniBatchOp, final boolean success) throws IOException; /** - * Called before checkAndPut + * Called before checkAndPut. *

* Call CoprocessorEnvironment#bypass to skip default actions *

@@ -682,6 +700,34 @@ public interface RegionObserver extends Coprocessor { throws IOException; /** + * Called before checkAndPut but after acquiring rowlock. + *

+ * Note: Caution to be taken for not doing any long time operation in this hook. + * Row will be locked for longer time. Trying to acquire lock on another row, within this, + * can lead to potential deadlock. + *

+ * Call CoprocessorEnvironment#bypass to skip default actions + *

+ * Call CoprocessorEnvironment#complete to skip any subsequent chained + * coprocessors + * @param c the environment provided by the region server + * @param row row to check + * @param family column family + * @param qualifier column qualifier + * @param compareOp the comparison operation + * @param comparator the comparator + * @param put data to put if check succeeds + * @param result + * @return the return value to return to client if bypassing default + * processing + * @throws IOException if an error occurred on the coprocessor + */ + boolean preCheckAndPutAfterRowLock(final ObserverContext c, + final byte[] row, final byte[] family, final byte[] qualifier, final CompareOp compareOp, + final ByteArrayComparable comparator, final Put put, + final boolean result) throws IOException; + + /** * Called after checkAndPut *

* Call CoprocessorEnvironment#complete to skip any subsequent chained @@ -704,7 +750,7 @@ public interface RegionObserver extends Coprocessor { throws IOException; /** - * Called before checkAndDelete + * Called before checkAndDelete. *

* Call CoprocessorEnvironment#bypass to skip default actions *

@@ -728,6 +774,33 @@ public interface RegionObserver extends Coprocessor { throws IOException; /** + * Called before checkAndDelete but after acquiring rowock. + *

+ * Note: Caution to be taken for not doing any long time operation in this hook. + * Row will be locked for longer time. Trying to acquire lock on another row, within this, + * can lead to potential deadlock. + *

+ * Call CoprocessorEnvironment#bypass to skip default actions + *

+ * Call CoprocessorEnvironment#complete to skip any subsequent chained + * coprocessors + * @param c the environment provided by the region server + * @param row row to check + * @param family column family + * @param qualifier column qualifier + * @param compareOp the comparison operation + * @param comparator the comparator + * @param delete delete to commit if check succeeds + * @param result + * @return the value to return to client if bypassing default processing + * @throws IOException if an error occurred on the coprocessor + */ + boolean preCheckAndDeleteAfterRowLock(final ObserverContext c, + final byte[] row, final byte[] family, final byte[] qualifier, final CompareOp compareOp, + final ByteArrayComparable comparator, final Delete delete, + final boolean result) throws IOException; + + /** * Called after checkAndDelete *

* Call CoprocessorEnvironment#complete to skip any subsequent chained @@ -795,7 +868,7 @@ public interface RegionObserver extends Coprocessor { throws IOException; /** - * Called before Append + * Called before Append. *

* Call CoprocessorEnvironment#bypass to skip default actions *

@@ -811,6 +884,25 @@ public interface RegionObserver extends Coprocessor { throws IOException; /** + * Called before Append but after acquiring rowlock. + *

+ * Note: Caution to be taken for not doing any long time operation in this hook. + * Row will be locked for longer time. Trying to acquire lock on another row, within this, + * can lead to potential deadlock. + *

+ * Call CoprocessorEnvironment#bypass to skip default actions + *

+ * Call CoprocessorEnvironment#complete to skip any subsequent chained + * coprocessors + * @param c the environment provided by the region server + * @param append Append object + * @return result to return to the client if bypassing default processing + * @throws IOException if an error occurred on the coprocessor + */ + Result preAppendAfterRowLock(final ObserverContext c, + final Append append) throws IOException; + + /** * Called after Append *

* Call CoprocessorEnvironment#complete to skip any subsequent chained @@ -826,7 +918,7 @@ public interface RegionObserver extends Coprocessor { throws IOException; /** - * Called before Increment + * Called before Increment. *

* Call CoprocessorEnvironment#bypass to skip default actions *

@@ -842,6 +934,28 @@ public interface RegionObserver extends Coprocessor { throws IOException; /** + * Called before Increment but after acquiring rowlock. + *

+ * Note: Caution to be taken for not doing any long time operation in this hook. + * Row will be locked for longer time. Trying to acquire lock on another row, within this, + * can lead to potential deadlock. + *

+ * Call CoprocessorEnvironment#bypass to skip default actions + *

+ * Call CoprocessorEnvironment#complete to skip any subsequent chained coprocessors + * + * @param c + * the environment provided by the region server + * @param increment + * increment object + * @return result to return to the client if bypassing default processing + * @throws IOException + * if an error occurred on the coprocessor + */ + Result preIncrementAfterRowLock(final ObserverContext c, + final Increment increment) throws IOException; + + /** * Called after increment *

* Call CoprocessorEnvironment#complete to skip any subsequent chained diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index dea1e81..0c4f7ee 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -2030,10 +2030,13 @@ public class HRegion implements HeapSize { // , Writable{ /** * Setup correct timestamps in the KVs in Delete object. * Caller should have the row and region locks. + * @param mutation + * @param familyMap + * @param byteNow * @throws IOException */ - void prepareDeleteTimestamps(Map> familyMap, byte[] byteNow) - throws IOException { + void prepareDeleteTimestamps(Mutation mutation, Map> familyMap, + byte[] byteNow) throws IOException { for (Map.Entry> e : familyMap.entrySet()) { byte[] family = e.getKey(); @@ -2059,20 +2062,14 @@ public class HRegion implements HeapSize { // , Writable{ Get get = new Get(CellUtil.cloneRow(kv)); get.setMaxVersions(count); get.addColumn(family, qual); - - List result = get(get, false); - - if (result.size() < count) { - // Nothing to delete - kv.updateLatestStamp(byteNow); - continue; - } - if (result.size() > count) { - throw new RuntimeException("Unexpected size: " + result.size()); + if (coprocessorHost != null) { + if (!coprocessorHost.prePrepareTimeStampForDeleteVersion(mutation, cell, byteNow, + get)) { + updateDeleteLatestVersionTimeStamp(kv, get, count, byteNow); + } + } else { + updateDeleteLatestVersionTimeStamp(kv, get, count, byteNow); } - KeyValue getkv = KeyValueUtil.ensureKeyValue(result.get(count - 1)); - Bytes.putBytes(kv.getBuffer(), kv.getTimestampOffset(), - getkv.getBuffer(), getkv.getTimestampOffset(), Bytes.SIZEOF_LONG); } else { kv.updateLatestStamp(byteNow); } @@ -2080,6 +2077,23 @@ public class HRegion implements HeapSize { // , Writable{ } } + void updateDeleteLatestVersionTimeStamp(KeyValue kv, Get get, int count, byte[] byteNow) + throws IOException { + List result = get(get, false); + + if (result.size() < count) { + // Nothing to delete + kv.updateLatestStamp(byteNow); + return; + } + if (result.size() > count) { + throw new RuntimeException("Unexpected size: " + result.size()); + } + KeyValue getkv = KeyValueUtil.ensureKeyValue(result.get(count - 1)); + Bytes.putBytes(kv.getBuffer(), kv.getTimestampOffset(), getkv.getBuffer(), + getkv.getTimestampOffset(), Bytes.SIZEOF_LONG); + } + /** * @throws IOException */ @@ -2452,7 +2466,9 @@ public class HRegion implements HeapSize { // , Writable{ updateKVTimestamps(familyMaps[i].values(), byteNow); noOfPuts++; } else { - prepareDeleteTimestamps(familyMaps[i], byteNow); + if (!isInReplay) { + prepareDeleteTimestamps(mutation, familyMaps[i], byteNow); + } noOfDeletes++; } } @@ -2712,9 +2728,21 @@ public class HRegion implements HeapSize { // , Writable{ RowLock rowLock = getRowLock(get.getRow()); // wait for all previous transactions to complete (with lock held) mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert()); - List result; try { - result = get(get, false); + if (this.getCoprocessorHost() != null) { + Boolean processed = null; + if (w instanceof Put) { + processed = this.getCoprocessorHost().preCheckAndPutAfterRowLock(row, family, + qualifier, compareOp, comparator, (Put) w); + } else if (w instanceof Delete) { + processed = this.getCoprocessorHost().preCheckAndDeleteAfterRowLock(row, family, + qualifier, compareOp, comparator, (Delete) w); + } + if (processed != null) { + return processed; + } + } + List result = get(get, false); boolean valueIsNull = comparator.getValue() == null || comparator.getValue().length == 0; @@ -5033,12 +5061,18 @@ public class HRegion implements HeapSize { // , Writable{ rowLock = getRowLock(row); try { lock(this.updatesLock.readLock()); - // wait for all prior MVCC transactions to finish - while we hold the row lock - // (so that we are guaranteed to see the latest state) - mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert()); - // now start my own transaction - w = mvcc.beginMemstoreInsert(); try { + // wait for all prior MVCC transactions to finish - while we hold the row lock + // (so that we are guaranteed to see the latest state) + mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert()); + if (this.coprocessorHost != null) { + Result r = this.coprocessorHost.preAppendAfterRowLock(append); + if(r!= null) { + return r; + } + } + // now start my own transaction + w = mvcc.beginMemstoreInsert(); long now = EnvironmentEdgeManager.currentTimeMillis(); // Process each family for (Map.Entry> family : append.getFamilyCellMap().entrySet()) { @@ -5221,12 +5255,18 @@ public class HRegion implements HeapSize { // , Writable{ RowLock rowLock = getRowLock(row); try { lock(this.updatesLock.readLock()); - // wait for all prior MVCC transactions to finish - while we hold the row lock - // (so that we are guaranteed to see the latest state) - mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert()); - // now start my own transaction - w = mvcc.beginMemstoreInsert(); try { + // wait for all prior MVCC transactions to finish - while we hold the row lock + // (so that we are guaranteed to see the latest state) + mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert()); + if (this.coprocessorHost != null) { + Result r = this.coprocessorHost.preIncrementAfterRowLock(increment); + if (r != null) { + return r; + } + } + // now start my own transaction + w = mvcc.beginMemstoreInsert(); long now = EnvironmentEdgeManager.currentTimeMillis(); // Process each family for (Map.Entry> family: diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java index 847e6cb..9cfa326 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java @@ -27,9 +27,9 @@ import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationProcessorRequest; import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationProcessorResponse; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; @@ -80,7 +80,7 @@ MultiRowMutationProcessorResponse> { } else if (m instanceof Delete) { Delete d = (Delete) m; region.prepareDelete(d); - region.prepareDeleteTimestamps(d.getFamilyCellMap(), byteNow); + region.prepareDeleteTimestamps(d, d.getFamilyCellMap(), byteNow); } else { throw new DoNotRetryIOException( "Action must be Put or Delete. But was: " diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java index 177f153..e476ea2 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java @@ -1135,6 +1135,44 @@ public class RegionCoprocessorHost } /** + * @param mutation - the current mutation + * @param kv - the current cell + * @param byteNow - current timestamp in bytes + * @param get - the get that could be used + * Note that the get only does not specify the family and qualifier that should be used + * @return true if default processing should be bypassed + * @exception IOException + * Exception + */ + public boolean prePrepareTimeStampForDeleteVersion(Mutation mutation, + Cell kv, byte[] byteNow, Get get) throws IOException { + boolean bypass = false; + ObserverContext ctx = null; + for (RegionEnvironment env : coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + ctx = ObserverContext.createAndPrepare(env, ctx); + Thread currentThread = Thread.currentThread(); + ClassLoader cl = currentThread.getContextClassLoader(); + try { + currentThread.setContextClassLoader(env.getClassLoader()); + ((RegionObserver) env.getInstance()) + .prePrepareTimeStampForDeleteVersion(ctx, mutation, kv, + byteNow, get); + } catch (Throwable e) { + handleCoprocessorThrowable(env, e); + } finally { + currentThread.setContextClassLoader(cl); + } + bypass |= ctx.shouldBypass(); + if (ctx.shouldComplete()) { + break; + } + } + } + return bypass; + } + + /** * @param put The Put object * @param edit The WALEdit object. * @param durability The durability used @@ -1356,6 +1394,46 @@ public class RegionCoprocessorHost * @param compareOp the comparison operation * @param comparator the comparator * @param put data to put if check succeeds + * @return true or false to return to client if default processing should + * be bypassed, or null otherwise + * @throws IOException e + */ + public Boolean preCheckAndPutAfterRowLock(final byte[] row, final byte[] family, + final byte[] qualifier, final CompareOp compareOp, final ByteArrayComparable comparator, + final Put put) throws IOException { + boolean bypass = false; + boolean result = false; + ObserverContext ctx = null; + for (RegionEnvironment env : coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + ctx = ObserverContext.createAndPrepare(env, ctx); + Thread currentThread = Thread.currentThread(); + ClassLoader cl = currentThread.getContextClassLoader(); + try { + currentThread.setContextClassLoader(env.getClassLoader()); + result = ((RegionObserver) env.getInstance()).preCheckAndPutAfterRowLock(ctx, row, + family, qualifier, compareOp, comparator, put, result); + } catch (Throwable e) { + handleCoprocessorThrowable(env, e); + } finally { + currentThread.setContextClassLoader(cl); + } + bypass |= ctx.shouldBypass(); + if (ctx.shouldComplete()) { + break; + } + } + } + return bypass ? result : null; + } + + /** + * @param row row to check + * @param family column family + * @param qualifier column qualifier + * @param compareOp the comparison operation + * @param comparator the comparator + * @param put data to put if check succeeds * @throws IOException e */ public boolean postCheckAndPut(final byte [] row, final byte [] family, @@ -1434,6 +1512,46 @@ public class RegionCoprocessorHost * @param compareOp the comparison operation * @param comparator the comparator * @param delete delete to commit if check succeeds + * @return true or false to return to client if default processing should + * be bypassed, or null otherwise + * @throws IOException e + */ + public Boolean preCheckAndDeleteAfterRowLock(final byte[] row, final byte[] family, + final byte[] qualifier, final CompareOp compareOp, final ByteArrayComparable comparator, + final Delete delete) throws IOException { + boolean bypass = false; + boolean result = false; + ObserverContext ctx = null; + for (RegionEnvironment env : coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + ctx = ObserverContext.createAndPrepare(env, ctx); + Thread currentThread = Thread.currentThread(); + ClassLoader cl = currentThread.getContextClassLoader(); + try { + currentThread.setContextClassLoader(env.getClassLoader()); + result = ((RegionObserver) env.getInstance()).preCheckAndDeleteAfterRowLock(ctx, row, + family, qualifier, compareOp, comparator, delete, result); + } catch (Throwable e) { + handleCoprocessorThrowable(env, e); + } finally { + currentThread.setContextClassLoader(cl); + } + bypass |= ctx.shouldBypass(); + if (ctx.shouldComplete()) { + break; + } + } + } + return bypass ? result : null; + } + + /** + * @param row row to check + * @param family column family + * @param qualifier column qualifier + * @param compareOp the comparison operation + * @param comparator the comparator + * @param delete delete to commit if check succeeds * @throws IOException e */ public boolean postCheckAndDelete(final byte [] row, final byte [] family, @@ -1496,6 +1614,38 @@ public class RegionCoprocessorHost } /** + * @param append append object + * @return result to return to client if default operation should be + * bypassed, null otherwise + * @throws IOException if an error occurred on the coprocessor + */ + public Result preAppendAfterRowLock(final Append append) throws IOException { + boolean bypass = false; + Result result = null; + ObserverContext ctx = null; + for (RegionEnvironment env : coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + ctx = ObserverContext.createAndPrepare(env, ctx); + Thread currentThread = Thread.currentThread(); + ClassLoader cl = currentThread.getContextClassLoader(); + try { + currentThread.setContextClassLoader(env.getClassLoader()); + result = ((RegionObserver) env.getInstance()).preAppendAfterRowLock(ctx, append); + } catch (Throwable e) { + handleCoprocessorThrowable(env, e); + } finally { + currentThread.setContextClassLoader(cl); + } + bypass |= ctx.shouldBypass(); + if (ctx.shouldComplete()) { + break; + } + } + } + return bypass ? result : null; + } + + /** * @param increment increment object * @return result to return to client if default operation should be * bypassed, null otherwise @@ -1528,6 +1678,38 @@ public class RegionCoprocessorHost } /** + * @param increment increment object + * @return result to return to client if default operation should be + * bypassed, null otherwise + * @throws IOException if an error occurred on the coprocessor + */ + public Result preIncrementAfterRowLock(final Increment increment) throws IOException { + boolean bypass = false; + Result result = null; + ObserverContext ctx = null; + for (RegionEnvironment env : coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + ctx = ObserverContext.createAndPrepare(env, ctx); + Thread currentThread = Thread.currentThread(); + ClassLoader cl = currentThread.getContextClassLoader(); + try { + currentThread.setContextClassLoader(env.getClassLoader()); + result = ((RegionObserver) env.getInstance()).preIncrementAfterRowLock(ctx, increment); + } catch (Throwable e) { + handleCoprocessorThrowable(env, e); + } finally { + currentThread.setContextClassLoader(cl); + } + bypass |= ctx.shouldBypass(); + if (ctx.shouldComplete()) { + break; + } + } + } + return bypass ? result : null; + } + + /** * @param append Append object * @param result the result returned by the append * @throws IOException if an error occurred on the coprocessor diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java index 45ecf43..bf53518 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java @@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Append; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; @@ -48,6 +49,8 @@ import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.ByteArrayComparable; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; import org.apache.hadoop.hbase.io.Reference; import org.apache.hadoop.hbase.io.hfile.CacheConfig; @@ -96,11 +99,22 @@ public class SimpleRegionObserver extends BaseRegionObserver { final AtomicInteger ctPrePut = new AtomicInteger(0); final AtomicInteger ctPostPut = new AtomicInteger(0); final AtomicInteger ctPreDeleted = new AtomicInteger(0); + final AtomicInteger ctPrePrepareDeleteTS = new AtomicInteger(0); final AtomicInteger ctPostDeleted = new AtomicInteger(0); final AtomicInteger ctPreGetClosestRowBefore = new AtomicInteger(0); final AtomicInteger ctPostGetClosestRowBefore = new AtomicInteger(0); final AtomicInteger ctPreIncrement = new AtomicInteger(0); + final AtomicInteger ctPreIncrementAfterRowLock = new AtomicInteger(0); + final AtomicInteger ctPreAppend = new AtomicInteger(0); + final AtomicInteger ctPreAppendAfterRowLock = new AtomicInteger(0); final AtomicInteger ctPostIncrement = new AtomicInteger(0); + final AtomicInteger ctPostAppend = new AtomicInteger(0); + final AtomicInteger ctPreCheckAndPut = new AtomicInteger(0); + final AtomicInteger ctPreCheckAndPutAfterRowLock = new AtomicInteger(0); + final AtomicInteger ctPostCheckAndPut = new AtomicInteger(0); + final AtomicInteger ctPreCheckAndDelete = new AtomicInteger(0); + final AtomicInteger ctPreCheckAndDeleteAfterRowLock = new AtomicInteger(0); + final AtomicInteger ctPostCheckAndDelete = new AtomicInteger(0); final AtomicInteger ctPreWALRestored = new AtomicInteger(0); final AtomicInteger ctPostWALRestored = new AtomicInteger(0); final AtomicInteger ctPreScannerNext = new AtomicInteger(0); @@ -435,6 +449,12 @@ public class SimpleRegionObserver extends BaseRegionObserver { } @Override + public void prePrepareTimeStampForDeleteVersion(ObserverContext e, + Mutation delete, Cell cell, byte[] byteNow, Get get) throws IOException { + ctPrePrepareDeleteTS.incrementAndGet(); + } + + @Override public void postDelete(final ObserverContext c, final Delete delete, final WALEdit edit, final Durability durability) throws IOException { @@ -521,6 +541,13 @@ public class SimpleRegionObserver extends BaseRegionObserver { } @Override + public Result preIncrementAfterRowLock(ObserverContext e, + Increment increment) throws IOException { + ctPreIncrementAfterRowLock.incrementAndGet(); + return null; + } + + @Override public Result postIncrement(final ObserverContext c, final Increment increment, final Result result) throws IOException { ctPostIncrement.incrementAndGet(); @@ -528,6 +555,75 @@ public class SimpleRegionObserver extends BaseRegionObserver { } @Override + public boolean preCheckAndPut(ObserverContext e, byte[] row, + byte[] family, byte[] qualifier, CompareOp compareOp, ByteArrayComparable comparator, + Put put, boolean result) throws IOException { + ctPreCheckAndPut.incrementAndGet(); + return true; + } + + @Override + public boolean preCheckAndPutAfterRowLock(ObserverContext e, + byte[] row, byte[] family, byte[] qualifier, CompareOp compareOp, + ByteArrayComparable comparator, Put put, boolean result) throws IOException { + ctPreCheckAndPutAfterRowLock.incrementAndGet(); + return true; + } + + @Override + public boolean postCheckAndPut(ObserverContext e, byte[] row, + byte[] family, byte[] qualifier, CompareOp compareOp, ByteArrayComparable comparator, + Put put, boolean result) throws IOException { + ctPostCheckAndPut.incrementAndGet(); + return true; + } + + @Override + public boolean preCheckAndDelete(ObserverContext e, byte[] row, + byte[] family, byte[] qualifier, CompareOp compareOp, ByteArrayComparable comparator, + Delete delete, boolean result) throws IOException { + ctPreCheckAndDelete.incrementAndGet(); + return true; + } + + @Override + public boolean preCheckAndDeleteAfterRowLock(ObserverContext e, + byte[] row, byte[] family, byte[] qualifier, CompareOp compareOp, + ByteArrayComparable comparator, Delete delete, boolean result) throws IOException { + ctPreCheckAndDeleteAfterRowLock.incrementAndGet(); + return true; + } + + @Override + public boolean postCheckAndDelete(ObserverContext e, byte[] row, + byte[] family, byte[] qualifier, CompareOp compareOp, ByteArrayComparable comparator, + Delete delete, boolean result) throws IOException { + ctPostCheckAndDelete.incrementAndGet(); + return true; + } + + @Override + public Result preAppendAfterRowLock(ObserverContext e, + Append append) throws IOException { + ctPreAppendAfterRowLock.incrementAndGet(); + return null; + } + + @Override + public Result preAppend(ObserverContext e, Append append) + throws IOException { + ctPreAppend.incrementAndGet(); + return null; + } + + @Override + public Result postAppend(ObserverContext e, Append append, + Result result) throws IOException { + ctPostAppend.incrementAndGet(); + return null; + } + + @Override public void preBulkLoadHFile(ObserverContext ctx, List> familyPaths) throws IOException { RegionCoprocessorEnvironment e = ctx.getEnvironment(); @@ -646,14 +742,58 @@ public class SimpleRegionObserver extends BaseRegionObserver { return ctPostCloseRegionOperation.get(); } + public boolean hadPreCheckAndPut() { + return ctPreCheckAndPut.get() > 0; + } + + public boolean hadPreCheckAndPutAfterRowLock() { + return ctPreCheckAndPutAfterRowLock.get() > 0; + } + + public boolean hadPostCheckAndPut() { + return ctPostCheckAndPut.get() > 0; + } + + public boolean hadPreCheckAndDelete() { + return ctPreCheckAndDelete.get() > 0; + } + + public boolean hadPreCheckAndDeleteAfterRowLock() { + return ctPreCheckAndDeleteAfterRowLock.get() > 0; + } + + public boolean hadPostCheckAndDelete() { + return ctPostCheckAndDelete.get() > 0; + } + public boolean hadPreIncrement() { return ctPreIncrement.get() > 0; } + + public boolean hadPreIncrementAfterRowLock() { + return ctPreIncrementAfterRowLock.get() > 0; + } public boolean hadPostIncrement() { return ctPostIncrement.get() > 0; } + public boolean hadPreAppend() { + return ctPreAppend.get() > 0; + } + + public boolean hadPreAppendAfterRowLock() { + return ctPreAppendAfterRowLock.get() > 0; + } + + public boolean hadPostAppend() { + return ctPostAppend.get() > 0; + } + + public boolean hadPrePreparedDeleteTS() { + return ctPrePrepareDeleteTS.get() > 0; + } + public boolean hadPreWALRestored() { return ctPreWALRestored.get() > 0; } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java index be56c96..7bf31cd 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java @@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.MediumTests; import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Append; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; @@ -145,9 +146,9 @@ public class TestRegionObserverInterface { verifyMethodResult(SimpleRegionObserver.class, new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut", - "hadDelete"}, + "hadDelete", "hadPrePreparedDeleteTS"}, tableName, - new Boolean[] {true, true, true, true, false} + new Boolean[] {true, true, true, true, false, false} ); Delete delete = new Delete(ROW); @@ -158,9 +159,9 @@ public class TestRegionObserverInterface { verifyMethodResult(SimpleRegionObserver.class, new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut", - "hadPreBatchMutate", "hadPostBatchMutate", "hadDelete"}, + "hadPreBatchMutate", "hadPostBatchMutate", "hadDelete", "hadPrePreparedDeleteTS"}, tableName, - new Boolean[] {true, true, true, true, true, true, true} + new Boolean[] {true, true, true, true, true, true, true, true} ); } finally { util.deleteTable(tableName); @@ -218,17 +219,106 @@ public class TestRegionObserverInterface { inc.addColumn(A, A, 1); verifyMethodResult(SimpleRegionObserver.class, - new String[] {"hadPreIncrement", "hadPostIncrement"}, + new String[] {"hadPreIncrement", "hadPostIncrement", "hadPreIncrementAfterRowLock"}, tableName, - new Boolean[] {false, false} + new Boolean[] {false, false, false} ); table.increment(inc); verifyMethodResult(SimpleRegionObserver.class, - new String[] {"hadPreIncrement", "hadPostIncrement"}, + new String[] {"hadPreIncrement", "hadPostIncrement", "hadPreIncrementAfterRowLock"}, tableName, - new Boolean[] {true, true} + new Boolean[] {true, true, true} + ); + } finally { + util.deleteTable(tableName); + table.close(); + } + } + + @Test + public void testCheckAndPutHooks() throws IOException { + TableName tableName = + TableName.valueOf(TEST_TABLE.getNameAsString() + ".testCheckAndPutHooks"); + HTable table = util.createTable(tableName, new byte[][] {A, B, C}); + try { + Put p = new Put(Bytes.toBytes(0)); + p.add(A, A, A); + table.put(p); + table.flushCommits(); + p = new Put(Bytes.toBytes(0)); + p.add(A, A, A); + verifyMethodResult(SimpleRegionObserver.class, + new String[] {"hadPreCheckAndPut", + "hadPreCheckAndPutAfterRowLock", "hadPostCheckAndPut"}, + tableName, + new Boolean[] {false, false, false} + ); + table.checkAndPut(Bytes.toBytes(0), A, A, A, p); + verifyMethodResult(SimpleRegionObserver.class, + new String[] {"hadPreCheckAndPut", + "hadPreCheckAndPutAfterRowLock", "hadPostCheckAndPut"}, + tableName, + new Boolean[] {true, true, true} + ); + } finally { + util.deleteTable(tableName); + table.close(); + } + } + + @Test + public void testCheckAndDeleteHooks() throws IOException { + TableName tableName = + TableName.valueOf(TEST_TABLE.getNameAsString() + ".testCheckAndDeleteHooks"); + HTable table = util.createTable(tableName, new byte[][] {A, B, C}); + try { + Put p = new Put(Bytes.toBytes(0)); + p.add(A, A, A); + table.put(p); + table.flushCommits(); + Delete d = new Delete(Bytes.toBytes(0)); + table.delete(d); + verifyMethodResult(SimpleRegionObserver.class, + new String[] {"hadPreCheckAndDelete", + "hadPreCheckAndDeleteAfterRowLock", "hadPostCheckAndDelete"}, + tableName, + new Boolean[] {false, false, false} + ); + table.checkAndDelete(Bytes.toBytes(0), A, A, A, d); + verifyMethodResult(SimpleRegionObserver.class, + new String[] {"hadPreCheckAndDelete", + "hadPreCheckAndDeleteAfterRowLock", "hadPostCheckAndDelete"}, + tableName, + new Boolean[] {true, true, true} + ); + } finally { + util.deleteTable(tableName); + table.close(); + } + } + + @Test + public void testAppendHook() throws IOException { + TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + ".testAppendHook"); + HTable table = util.createTable(tableName, new byte[][] {A, B, C}); + try { + Append app = new Append(Bytes.toBytes(0)); + app.add(A, A, A); + + verifyMethodResult(SimpleRegionObserver.class, + new String[] {"hadPreAppend", "hadPostAppend", "hadPreAppendAfterRowLock"}, + tableName, + new Boolean[] {false, false, false} + ); + + table.append(app); + + verifyMethodResult(SimpleRegionObserver.class, + new String[] {"hadPreAppend", "hadPostAppend", "hadPreAppendAfterRowLock"}, + tableName, + new Boolean[] {true, true, true} ); } finally { util.deleteTable(tableName);