Index: src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java	(revision 1450209)
+++ src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java	(working copy)
@@ -27,6 +27,7 @@
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
@@ -35,6 +36,7 @@
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
+import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.Store;
@@ -185,8 +187,18 @@
   public void postDelete(final ObserverContext<RegionCoprocessorEnvironment> e,
       final Delete delete, final WALEdit edit, final boolean writeToWAL)
       throws IOException {
   }
+
+  @Override
+  public void preBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c,
+      final MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException {
+  }
 
   @Override
+  public void postBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c,
+      final MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException {
+  }
+
+  @Override
   public boolean preCheckAndPut(final ObserverContext<RegionCoprocessorEnvironment> e,
       final byte [] row, final byte [] family, final byte [] qualifier,
       final CompareOp compareOp, final WritableByteArrayComparable comparator,
Index: src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java	(revision 1450209)
+++ src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java	(working copy)
@@ -27,6 +27,7 @@
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
@@ -35,6 +36,7 @@
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
+import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.Store;
@@ -404,6 +406,29 @@
       throws IOException;
 
   /**
+   * Called for every batch mutation operation happening at the server. This is invoked after the
+   * locks on the mutating rows have been acquired and the proper timestamp has been applied to
+   * each Mutation at the server. The batch may contain Puts and Deletes. By setting the
+   * OperationStatus of a Mutation
+   * ({@link MiniBatchOperationInProgress#setOperationStatus(int, OperationStatus)}), a
+   * {@link RegionObserver} can make HRegion skip that Mutation.
+   * @param c the environment provided by the region server
+   * @param miniBatchOp batch of Mutations getting applied to the region.
+   * @throws IOException if an error occurred on the coprocessor
+   */
+  void preBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c,
+      final MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException;
+
+  /**
+   * Called after applying a batch of Mutations on a region. At this point the Mutations have been
+   * added to the memstore and the WAL.
+   * @param c the environment provided by the region server
+   * @param miniBatchOp batch of Mutations applied to the region.
+   * @throws IOException if an error occurred on the coprocessor
+   */
+  void postBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c,
+      final MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException;
+
+  /**
    * Called before checkAndPut
   * <p>
   * Call CoprocessorEnvironment#bypass to skip default actions
Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java	(revision 1450209)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java	(working copy)
@@ -2263,6 +2263,14 @@
       // ----------------------------------
       w = mvcc.beginMemstoreInsert();
 
+      // calling the pre CP hook for batch mutation
+      if (coprocessorHost != null) {
+        MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp =
+          new MiniBatchOperationInProgress<Pair<Mutation, Integer>>(batchOp.operations,
+          batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
+        if (coprocessorHost.preBatchMutate(miniBatchOp)) return 0L;
+      }
+
       // ------------------------------------
       // STEP 3. Write back to memstore
       // Write to memstore. It is ok to write to memstore
@@ -2336,6 +2344,14 @@
         syncOrDefer(txid);
       }
       walSyncSuccessful = true;
+      // calling the post CP hook for batch mutation
+      if (coprocessorHost != null) {
+        MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp =
+          new MiniBatchOperationInProgress<Pair<Mutation, Integer>>(batchOp.operations,
+          batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
+        coprocessorHost.postBatchMutate(miniBatchOp);
+      }
+
       // ------------------------------------------------------------------
       // STEP 8. Advance mvcc. This will make this put visible to scanners and getters.
       // ------------------------------------------------------------------
Index: src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java	(revision 0)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java	(working copy)
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+
+/**
+ * Wraps together the Mutations that are applied as a batch to the region, along with their
+ * operation status and WALEdits.
+ * @see RegionObserver#preBatchMutate(ObserverContext, MiniBatchOperationInProgress)
+ * @see RegionObserver#postBatchMutate(ObserverContext, MiniBatchOperationInProgress)
+ * @param <T> Pair of a Mutation and the id of the row lock associated with it.
+ */
+public class MiniBatchOperationInProgress<T> {
+  private final T[] operations;
+  private final OperationStatus[] retCodeDetails;
+  private final WALEdit[] walEditsFromCoprocessors;
+  private final int firstIndex;
+  private final int lastIndexExclusive;
+
+  public MiniBatchOperationInProgress(T[] operations, OperationStatus[] retCodeDetails,
+      WALEdit[] walEditsFromCoprocessors, int firstIndex, int lastIndexExclusive) {
+    this.operations = operations;
+    this.retCodeDetails = retCodeDetails;
+    this.walEditsFromCoprocessors = walEditsFromCoprocessors;
+    this.firstIndex = firstIndex;
+    this.lastIndexExclusive = lastIndexExclusive;
+  }
+
+  /**
+   * @return the number of operations (Mutations) involved in this batch.
+   */
+  public int size() {
+    return this.lastIndexExclusive - this.firstIndex;
+  }
+
+  /**
+   * @param index the relative position within this batch
+   * @return the operation (Mutation) at the specified position.
+   */
+  public T getOperation(int index) {
+    return operations[getAbsoluteIndex(index)];
+  }
+
+  /**
+   * Sets the status code for the operation (Mutation) at the specified position.
+   * By setting this status, a {@link RegionObserver} can make HRegion skip the Mutation.
+   * @param index the relative position within this batch
+   * @param opStatus the status to record for the operation
+   */
+  public void setOperationStatus(int index, OperationStatus opStatus) {
+    this.retCodeDetails[getAbsoluteIndex(index)] = opStatus;
+  }
+
+  /**
+   * @param index the relative position within this batch
+   * @return the status code for the operation (Mutation) at the specified position.
+   */
+  public OperationStatus getOperationStatus(int index) {
+    return this.retCodeDetails[getAbsoluteIndex(index)];
+  }
+
+  /**
+   * Sets the WALEdit for the operation (Mutation) at the specified position.
+   * @param index the relative position within this batch
+   * @param walEdit the WALEdit to associate with the operation
+   */
+  public void setWalEdit(int index, WALEdit walEdit) {
+    this.walEditsFromCoprocessors[getAbsoluteIndex(index)] = walEdit;
+  }
+
+  /**
+   * @param index the relative position within this batch
+   * @return the WALEdit for the operation (Mutation) at the specified position.
+   */
+  public WALEdit getWalEdit(int index) {
+    return this.walEditsFromCoprocessors[getAbsoluteIndex(index)];
+  }
+
+  private int getAbsoluteIndex(int index) {
+    if (index < 0 || this.firstIndex + index >= this.lastIndexExclusive) {
+      throw new ArrayIndexOutOfBoundsException(index);
+    }
+    return this.firstIndex + index;
+  }
+}
Index: src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java	(revision 1450209)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java	(working copy)
@@ -45,6 +45,7 @@
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
@@ -885,8 +886,56 @@
       }
     }
   }
+
+  /**
+   * @param miniBatchOp batch of Mutations getting applied to the region
+   * @return true if default processing should be bypassed
+   * @throws IOException if a coprocessor raised an error
+   */
+  public boolean preBatchMutate(
+      final MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException {
+    boolean bypass = false;
+    ObserverContext<RegionCoprocessorEnvironment> ctx = null;
+    for (RegionEnvironment env : coprocessors) {
+      if (env.getInstance() instanceof RegionObserver) {
+        ctx = ObserverContext.createAndPrepare(env, ctx);
+        try {
+          ((RegionObserver) env.getInstance()).preBatchMutate(ctx, miniBatchOp);
+        } catch (Throwable e) {
+          handleCoprocessorThrowable(env, e);
+        }
+        bypass |= ctx.shouldBypass();
+        if (ctx.shouldComplete()) {
+          break;
+        }
+      }
+    }
+    return bypass;
+  }
 
   /**
+   * @param miniBatchOp batch of Mutations applied to the region
+   * @throws IOException if a coprocessor raised an error
+   */
+  public void postBatchMutate(
+      final MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException {
+    ObserverContext<RegionCoprocessorEnvironment> ctx = null;
+    for (RegionEnvironment env : coprocessors) {
+      if (env.getInstance() instanceof RegionObserver) {
+        ctx = ObserverContext.createAndPrepare(env, ctx);
+        try {
+          ((RegionObserver) env.getInstance()).postBatchMutate(ctx, miniBatchOp);
+        } catch (Throwable e) {
+          handleCoprocessorThrowable(env, e);
+        }
+        if (ctx.shouldComplete()) {
+          break;
+        }
+      }
+    }
+  }
+
+  /**
    * @param row row to check
    * @param family column family
    * @param qualifier column qualifier
Index: src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java	(revision 1450209)
+++ src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java	(working copy)
@@ -37,6 +37,7 @@
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Increment;
@@ -46,6 +47,7 @@
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
 import org.apache.hadoop.hbase.regionserver.Leases;
+import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.Store;
@@ -98,7 +100,9 @@
   boolean hadPostScannerOpen = false;
   boolean hadPreBulkLoadHFile = false;
   boolean hadPostBulkLoadHFile = false;
-
+  boolean hadPreBatchMutate = false;
+  boolean hadPostBatchMutate = false;
+
   @Override
   public void start(CoprocessorEnvironment e) throws IOException {
     // this only makes sure that leases and locks are available to coprocessors
@@ -392,6 +396,26 @@
   }
 
   @Override
+  public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
+      MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException {
+    RegionCoprocessorEnvironment e = c.getEnvironment();
+    assertNotNull(e);
+    assertNotNull(e.getRegion());
+    assertNotNull(miniBatchOp);
+    hadPreBatchMutate = true;
+  }
+
+  @Override
+  public void postBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c,
+      final MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException {
+    RegionCoprocessorEnvironment e = c.getEnvironment();
+    assertNotNull(e);
+    assertNotNull(e.getRegion());
+    assertNotNull(miniBatchOp);
+    hadPostBatchMutate = true;
+  }
+
+  @Override
   public void preGetClosestRowBefore(final ObserverContext<RegionCoprocessorEnvironment> c,
       final byte[] row, final byte[] family, final Result result)
       throws IOException {
@@ -483,6 +507,15 @@
   public boolean hadPostPut() {
     return hadPostPut;
   }
+
+  public boolean hadPreBatchMutate() {
+    return hadPreBatchMutate;
+  }
+
+  public boolean hadPostBatchMutate() {
+    return hadPostBatchMutate;
+  }
+
   public boolean hadDelete() {
     return !beforeDelete;
   }
Index: src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java	(revision 1450209)
+++ src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java	(working copy)
@@ -102,9 +102,9 @@
     verifyMethodResult(SimpleRegionObserver.class,
         new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut",
-            "hadDelete"},
+            "hadPreBatchMutate", "hadPostBatchMutate", "hadDelete"},
         TEST_TABLE,
-        new Boolean[] {false, false, true, true, false}
+        new Boolean[] {false, false, true, true, true, true, false}
     );
 
     Get get = new Get(ROW);
@@ -128,9 +128,9 @@
     verifyMethodResult(SimpleRegionObserver.class,
         new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut",
-            "hadDelete"},
+            "hadPreBatchMutate", "hadPostBatchMutate", "hadDelete"},
        TEST_TABLE,
-        new Boolean[] {true, true, true, true, true}
+        new Boolean[] {true, true, true, true, true, true, true}
     );
     util.deleteTable(tableName);
     table.close();
Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestMiniBatchOperationInProgress.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/regionserver/TestMiniBatchOperationInProgress.java	(revision 0)
+++ src/test/java/org/apache/hadoop/hbase/regionserver/TestMiniBatchOperationInProgress.java	(working copy)
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(SmallTests.class)
+public class TestMiniBatchOperationInProgress {
+
+  @Test
+  public void testMiniBatchOperationInProgressMethods() {
+    Pair<Mutation, Integer>[] operations = new Pair[10];
+    OperationStatus[] retCodeDetails = new OperationStatus[10];
+    WALEdit[] walEditsFromCoprocessors = new WALEdit[10];
+    for (int i = 0; i < 10; i++) {
+      operations[i] = new Pair<Mutation, Integer>(new Put(Bytes.toBytes(i)), null);
+    }
+    MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatch =
+      new MiniBatchOperationInProgress<Pair<Mutation, Integer>>(operations, retCodeDetails,
+      walEditsFromCoprocessors, 0, 5);
+
+    assertEquals(5, miniBatch.size());
+    assertTrue(Bytes.equals(Bytes.toBytes(0), miniBatch.getOperation(0).getFirst().getRow()));
+    assertTrue(Bytes.equals(Bytes.toBytes(2), miniBatch.getOperation(2).getFirst().getRow()));
+    assertTrue(Bytes.equals(Bytes.toBytes(4), miniBatch.getOperation(4).getFirst().getRow()));
+    try {
+      miniBatch.getOperation(5);
+      fail("Should throw Exception while accessing out of range");
+    } catch (ArrayIndexOutOfBoundsException e) {
+    }
+    miniBatch.setOperationStatus(1, OperationStatus.FAILURE);
+    assertEquals(OperationStatus.FAILURE, retCodeDetails[1]);
+    try {
+      miniBatch.setOperationStatus(6, OperationStatus.FAILURE);
+      fail("Should throw Exception while accessing out of range");
+    } catch (ArrayIndexOutOfBoundsException e) {
+    }
+    try {
+      miniBatch.setWalEdit(5, new WALEdit());
+      fail("Should throw Exception while accessing out of range");
+    } catch (ArrayIndexOutOfBoundsException e) {
+    }
+
+    miniBatch = new MiniBatchOperationInProgress<Pair<Mutation, Integer>>(operations,
+      retCodeDetails, walEditsFromCoprocessors, 7, 10);
+    try {
+      miniBatch.setWalEdit(-1, new WALEdit());
+      fail("Should throw Exception while accessing out of range");
+    } catch (ArrayIndexOutOfBoundsException e) {
+    }
+    try {
+      miniBatch.getOperation(-1);
+      fail("Should throw Exception while accessing out of range");
+    } catch (ArrayIndexOutOfBoundsException e) {
+    }
+    try {
+      miniBatch.getOperation(3);
+      fail("Should throw Exception while accessing out of range");
+    } catch (ArrayIndexOutOfBoundsException e) {
+    }
+    try {
+      miniBatch.getOperationStatus(9);
+      fail("Should throw Exception while accessing out of range");
+    } catch (ArrayIndexOutOfBoundsException e) {
+    }
+    try {
+      miniBatch.setOperationStatus(3, OperationStatus.FAILURE);
+      fail("Should throw Exception while accessing out of range");
+    } catch (ArrayIndexOutOfBoundsException e) {
+    }
+    assertTrue(Bytes.equals(Bytes.toBytes(7), miniBatch.getOperation(0).getFirst().getRow()));
+    assertTrue(Bytes.equals(Bytes.toBytes(9), miniBatch.getOperation(2).getFirst().getRow()));
+    miniBatch.setOperationStatus(1, OperationStatus.SUCCESS);
+    assertEquals(OperationStatus.SUCCESS, retCodeDetails[8]);
+    WALEdit wal = new WALEdit();
+    miniBatch.setWalEdit(0, wal);
+    assertEquals(wal, walEditsFromCoprocessors[7]);
+  }
+}
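
Usage note (not part of the patch): a minimal sketch of a coprocessor consuming the new hooks. The class name, the "blocked-" row prefix, and the skipping policy are invented for illustration; only the hook signatures, the MiniBatchOperationInProgress accessors, and the OperationStatus values come from this patch, and the sketch assumes OperationStatus is visible to coprocessor code, as the RegionObserver javadoc above implies.

import java.io.IOException;

import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.hadoop.hbase.regionserver.OperationStatus;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;

/**
 * Illustrative only: skips any Mutation whose row starts with "blocked-" by
 * marking its OperationStatus before HRegion writes the batch to memstore/WAL.
 */
public class SkippingRegionObserver extends BaseRegionObserver {

  private static final byte[] BLOCKED_PREFIX = Bytes.toBytes("blocked-");

  @Override
  public void preBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c,
      final MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException {
    // Indexes are relative to this mini-batch; getOperation(i) returns the
    // Pair of the Mutation and its row lock id.
    for (int i = 0; i < miniBatchOp.size(); i++) {
      Mutation m = miniBatchOp.getOperation(i).getFirst();
      if (Bytes.startsWith(m.getRow(), BLOCKED_PREFIX)) {
        // Mark the operation as already handled so HRegion does not apply it.
        miniBatchOp.setOperationStatus(i, OperationStatus.SUCCESS);
      }
    }
  }
}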