Index: hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java (revision 1518931) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java (working copy) @@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.filter.ByteArrayComparable; import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegion.Operation; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.KeyValueScanner; import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; @@ -262,6 +263,12 @@ } @Override + public void postCompleteBatchMutate( + final ObserverContext ctx) + throws IOException { + } + + @Override public boolean preCheckAndPut(final ObserverContext e, final byte [] row, final byte [] family, final byte [] qualifier, final CompareOp compareOp, final ByteArrayComparable comparator, @@ -401,4 +408,16 @@ List> familyPaths, boolean hasLoaded) throws IOException { return hasLoaded; } + + @Override + public void postStartRegionOperation( + final ObserverContext ctx, Operation op) + throws IOException { + } + + @Override + public void postCloseRegionOperation( + final ObserverContext ctx) + throws IOException { + } } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java (revision 1518931) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java (working copy) @@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.filter.ByteArrayComparable; import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegion.Operation; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.KeyValueScanner; import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; @@ -566,7 +567,19 @@ void postBatchMutate(final ObserverContext c, final MiniBatchOperationInProgress miniBatchOp) throws IOException; + /** + * Called after the completion of batch put/delete and will be called even if the batch + * operation fails + * + * @param ctx + * @throws IOException + */ + void postCompleteBatchMutate( + final ObserverContext ctx) + throws IOException; + + /** * Called before checkAndPut *

* Call CoprocessorEnvironment#bypass to skip default actions @@ -962,4 +975,26 @@ */ boolean postBulkLoadHFile(final ObserverContext ctx, List> familyPaths, boolean hasLoaded) throws IOException; + + /** + * Used after startRegionOperation in the batchMutate() + * + * @param ctx + * @param operation + * @throws IOException + */ + void postStartRegionOperation( + final ObserverContext ctx, Operation operation) + throws IOException; + + /** + * Used after closeRegionOperation in the batchMutate() + * + * @param ctx + * @throws IOException + */ + void postCloseRegionOperation( + final ObserverContext ctx) + throws IOException; + } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1518931) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy) @@ -211,7 +211,7 @@ * operations have to be defined here. It's only needed when a special check is need in * startRegionOperation */ - protected enum Operation { + public enum Operation { ANY, GET, PUT, DELETE, SCAN, APPEND, INCREMENT, SPLIT_REGION, MERGE_REGION, BATCH_MUTATE, REPLAY_BATCH_MUTATE, COMPACT_REGION } @@ -1925,10 +1925,13 @@ checkResources(); long newSize; + Operation operation = Operation.BATCH_MUTATE; if (isReplay) { - startRegionOperation(Operation.REPLAY_BATCH_MUTATE); - } else { - startRegionOperation(Operation.BATCH_MUTATE); + operation = Operation.REPLAY_BATCH_MUTATE; + } + startRegionOperation(operation); + if (coprocessorHost != null) { + coprocessorHost.postStartRegionOperation(operation); } try { @@ -1943,6 +1946,9 @@ newSize = this.addAndGetGlobalMemstoreSize(addedSize); } finally { closeRegionOperation(); + if (coprocessorHost != null) { + coprocessorHost.postCloseRegionOperation(); + } } if (isFlushSize(newSize)) { requestFlush(); @@ -2274,6 +2280,12 @@ } releaseRowLocks(acquiredRowLocks); + // call the coprocessor hook to do to any finalization steps + // after the put is done + if (coprocessorHost != null) { + coprocessorHost.postCompleteBatchMutate(); + } + // See if the column families were consistent through the whole thing. // if they were then keep them. If they were not then pass a null. // null will be treated as unknown. Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java (revision 1518931) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java (working copy) @@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.filter.ByteArrayComparable; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.regionserver.HRegion.Operation; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; @@ -1033,6 +1034,23 @@ } } } + + public void postCompleteBatchMutate() throws IOException { + ObserverContext ctx = null; + for (RegionEnvironment env : coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + ctx = ObserverContext.createAndPrepare(env, ctx); + try { + ((RegionObserver) env.getInstance()).postCompleteBatchMutate(ctx); + } catch (Throwable e) { + handleCoprocessorThrowable(env, e); + } + if (ctx.shouldComplete()) { + break; + } + } + } + } /** * @param row row to check @@ -1598,4 +1616,38 @@ return hasLoaded; } + public void postStartRegionOperation(Operation op) throws IOException { + ObserverContext ctx = null; + for (RegionEnvironment env : coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + ctx = ObserverContext.createAndPrepare(env, ctx); + try { + ((RegionObserver) env.getInstance()).postStartRegionOperation(ctx, op); + } catch (Throwable e) { + handleCoprocessorThrowable(env, e); + } + if (ctx.shouldComplete()) { + break; + } + } + } + } + + public void postCloseRegionOperation() throws IOException { + ObserverContext ctx = null; + for (RegionEnvironment env : coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + ctx = ObserverContext.createAndPrepare(env, ctx); + try { + ((RegionObserver) env.getInstance()).postCloseRegionOperation(ctx); + } catch (Throwable e) { + handleCoprocessorThrowable(env, e); + } + if (ctx.shouldComplete()) { + break; + } + } + } + } + } Index: hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java (revision 1518931) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java (working copy) @@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegion.Operation; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.KeyValueScanner; import org.apache.hadoop.hbase.regionserver.Leases; @@ -102,6 +103,9 @@ final AtomicInteger ctPostBulkLoadHFile = new AtomicInteger(0); final AtomicInteger ctPreBatchMutate = new AtomicInteger(0); final AtomicInteger ctPostBatchMutate = new AtomicInteger(0); + final AtomicInteger ctPostCompleteBatchMutate = new AtomicInteger(0); + final AtomicInteger ctPostStartRegionOperation = new AtomicInteger(0); + final AtomicInteger ctPostCloseRegionOperation = new AtomicInteger(0); @Override public void start(CoprocessorEnvironment e) throws IOException { @@ -425,6 +429,28 @@ } @Override + public void postCompleteBatchMutate( + ObserverContext ctx) throws IOException { + ctPostCompleteBatchMutate.incrementAndGet(); + } + + @Override + public void postStartRegionOperation( + final ObserverContext ctx, Operation op) + throws IOException { + ctPostStartRegionOperation.incrementAndGet(); + } + + @Override + public void postCloseRegionOperation( + final ObserverContext ctx) + throws IOException { + if (ctPostStartRegionOperation.get() > 0) { + ctPostCloseRegionOperation.incrementAndGet(); + } + } + + @Override public void preGetClosestRowBefore(final ObserverContext c, final byte[] row, final byte[] family, final Result result) throws IOException { @@ -524,7 +550,19 @@ public boolean hadPostBatchMutate() { return ctPostBatchMutate.get() > 0; } + + public boolean hadPostCompleteBatchMutate() { + return ctPostCompleteBatchMutate.get() > 0; + } + public boolean hadPostStartRegionOperation() { + return ctPostStartRegionOperation.get() > 0; + } + + public boolean hadPostCloseRegionOperation() { + return ctPostCloseRegionOperation.get() > 0; + } + public boolean hadDelete() { return !(ctBeforeDelete.get() > 0); } @@ -665,4 +703,13 @@ public int getCtPostIncrement() { return ctPostIncrement.get(); } + + public int getCtPostStartRegionOperation() { + return ctPostStartRegionOperation.get(); + } + + public int getCtPostCloseRegionOperation() { + return ctPostCloseRegionOperation.get(); + } + } Index: hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java (revision 1518931) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java (working copy) @@ -110,9 +110,10 @@ HTable table = util.createTable(tableName, new byte[][] {A, B, C}); verifyMethodResult(SimpleRegionObserver.class, new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut", - "hadDelete"}, + "hadDelete", "hadPostStartRegionOperation", + "hadPostCloseRegionOperation", "hadPostCompleteBatchMutate"}, TEST_TABLE, - new Boolean[] {false, false, false, false, false}); + new Boolean[] {false, false, false, false, false, false, false, false}); Put put = new Put(ROW); put.add(A, A, A); @@ -120,12 +121,12 @@ put.add(C, C, C); table.put(put); - verifyMethodResult(SimpleRegionObserver.class, - new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut", - "hadPreBatchMutate", "hadPostBatchMutate", "hadDelete"}, - TEST_TABLE, - new Boolean[] {false, false, true, true, true, true, false} - ); + verifyMethodResult(SimpleRegionObserver.class, new String[] { "hadPreGet", + "hadPostGet", "hadPrePut", "hadPostPut", "hadPreBatchMutate", + "hadPostBatchMutate", "hadDelete", "hadPostStartRegionOperation", + "hadPostCloseRegionOperation","hadPostCompleteBatchMutate" }, + TEST_TABLE, + new Boolean[] { false, false, true, true, true, true, false, true, true, true }); verifyMethodResult(SimpleRegionObserver.class, new String[] {"getCtPreOpen", "getCtPostOpen", "getCtPreClose", "getCtPostClose"},