From 8405237e44f572780bfd2f6d929f7737e1c81356 Mon Sep 17 00:00:00 2001 From: Abhishek Singh Chouhan Date: Fri, 29 Sep 2017 02:28:06 +0530 Subject: [PATCH] HBASE-18127 Enable state to be passed between the region observer coprocessor hook calls --- .../coprocessor/TestRowProcessorEndpoint.java | 12 +- .../hadoop/hbase/coprocessor/CoprocessorHost.java | 8 + .../hadoop/hbase/coprocessor/ObserverContext.java | 16 ++ .../hadoop/hbase/coprocessor/OperationContext.java | 45 +++++ .../hbase/coprocessor/SimpleOperationContext.java | 45 +++++ .../hbase/regionserver/BaseRowProcessor.java | 17 +- .../apache/hadoop/hbase/regionserver/HRegion.java | 63 ++++--- .../regionserver/MultiRowMutationProcessor.java | 30 +-- .../hbase/regionserver/RegionCoprocessorHost.java | 56 +++--- .../hadoop/hbase/regionserver/RowProcessor.java | 22 ++- .../hbase/coprocessor/TestOperationContext.java | 206 +++++++++++++++++++++ .../hadoop/hbase/regionserver/TestHRegion.java | 7 +- 12 files changed, 443 insertions(+), 84 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/OperationContext.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/SimpleOperationContext.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestOperationContext.java diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java index 8c11192..7b509fc 100644 --- a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java +++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java @@ -346,7 +346,8 @@ public class TestRowProcessorEndpoint { @Override public void process(long now, HRegion region, - List mutations, WALEdit walEdit) throws IOException { + List mutations, WALEdit walEdit, + OperationContext 
opContext) throws IOException { // Scan current counter List kvs = new ArrayList<>(); Scan scan = new Scan(row, row); @@ -430,7 +431,8 @@ public class TestRowProcessorEndpoint { @Override public void process(long now, HRegion region, - List mutations, WALEdit walEdit) throws IOException { + List mutations, WALEdit walEdit, + OperationContext opContext) throws IOException { List kvs = new ArrayList<>(); { // First scan to get friends of the person Scan scan = new Scan(row, row); @@ -514,7 +516,8 @@ public class TestRowProcessorEndpoint { @Override public void process(long now, HRegion region, - List mutations, WALEdit walEdit) throws IOException { + List mutations, WALEdit walEdit, + OperationContext opContext) throws IOException { // Override the time to avoid race-condition in the unit test caused by // inacurate timer on some machines @@ -608,7 +611,8 @@ public class TestRowProcessorEndpoint { @Override public void process(long now, HRegion region, - List mutations, WALEdit walEdit) throws IOException { + List mutations, WALEdit walEdit, + OperationContext opContext) throws IOException { try { // Sleep for a long time so it timeout Thread.sleep(100 * 1000L); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java index da07c40..bde3642 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java @@ -650,6 +650,14 @@ public abstract class CoprocessorHost boolean execOperation(final ObserverOperation observerOperation, + final OperationContext opContext) throws IOException { + if (observerOperation != null) { + observerOperation.setOperationContext(opContext); + } + return execOperation(true, observerOperation); + } + protected boolean execOperation(final boolean earlyExit, final ObserverOperation observerOperation) 
throws IOException { if (observerOperation == null) return false; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ObserverContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ObserverContext.java index 0192ea3..9c1691f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ObserverContext.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ObserverContext.java @@ -44,6 +44,7 @@ public class ObserverContext { private boolean bypass; private boolean complete; private User caller; + private OperationContext opContext; public ObserverContext(User caller) { this.caller = caller; @@ -108,6 +109,21 @@ public class ObserverContext { } /** + * Used by CoprocessorHost to set the OperationContext + * @param c + */ + public void setOperationContext(OperationContext c) { + this.opContext = c; + } + + /** + * @return context for this operation + */ + public OperationContext getOperationContext() { + return this.opContext; + } + + /** * Instantiates a new ObserverContext instance if the passed reference is * null and sets the environment in the new or existing instance. * This allows deferring the instantiation of a ObserverContext until it is diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/OperationContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/OperationContext.java new file mode 100644 index 0000000..2f87ad1 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/OperationContext.java @@ -0,0 +1,45 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.coprocessor; + +import java.util.Map; + +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.yetus.audience.InterfaceStability; + +/** + * Interface that allows the coprocessors to store some state in a particular operation that they + * can use in subsequent coprocessor hooks/calls. Eg. coprocessor can set some context in + * preBatchMutate using the map obtained by {@link #getOperationContextMap()} that can be used in + * postBatchMutate. The current implementation is + * {@link org.apache.hadoop.hbase.coprocessor.SimpleOperationContext} + */ +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC) +@InterfaceStability.Evolving +public interface OperationContext { + + /** + * Used by coprocessors to get the operation context map associated with this Operation + * Coprocessors can then use this map to store context + * @return context map for this operation + */ + public Map getOperationContextMap(); +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/SimpleOperationContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/SimpleOperationContext.java new file mode 100644 index 0000000..236af23 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/SimpleOperationContext.java @@ -0,0 +1,45 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.coprocessor; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Simple implementation of the OperationContext Interface that uses a hash map to store the context + * which is set by coprocessor. This lazily initializes the hashmap upon first use. 
This + * implementation is NOT thread safe, coprocessors should use appropriate synchronization while + * using {@link #getOperationContextMap()} and the returned Map + */ +@InterfaceAudience.Private +public class SimpleOperationContext implements OperationContext { + + private Map contextMap; + + @Override + public Map getOperationContextMap() { + if (contextMap == null) { + contextMap = new HashMap(); + } + return contextMap; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BaseRowProcessor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BaseRowProcessor.java index da691b7..d9d3bd9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BaseRowProcessor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BaseRowProcessor.java @@ -27,6 +27,7 @@ import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.coprocessor.OperationContext; import org.apache.hadoop.hbase.wal.WALEdit; import com.google.protobuf.Message; @@ -36,23 +37,27 @@ import com.google.protobuf.Message; */ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC) @InterfaceStability.Evolving -public abstract class BaseRowProcessor -implements RowProcessor { +public abstract class BaseRowProcessor + implements RowProcessor { @Override - public void preProcess(HRegion region, WALEdit walEdit) throws IOException { + public void preProcess(HRegion region, WALEdit walEdit, OperationContext opContext) + throws IOException { } @Override - public void preBatchMutate(HRegion region, WALEdit walEdit) throws IOException { + public void preBatchMutate(HRegion region, WALEdit walEdit, OperationContext opContext) + throws IOException { } @Override - public void postBatchMutate(HRegion region) throws IOException { + public void 
postBatchMutate(HRegion region, OperationContext opContext) + throws IOException { } @Override - public void postProcess(HRegion region, WALEdit walEdit, boolean success) throws IOException { + public void postProcess(HRegion region, WALEdit walEdit, boolean success, + OperationContext opContext) throws IOException { } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index d059977..4382ed3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -119,7 +119,9 @@ import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.conf.ConfigurationManager; import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver; +import org.apache.hadoop.hbase.coprocessor.OperationContext; import org.apache.hadoop.hbase.coprocessor.RegionObserver.MutationType; +import org.apache.hadoop.hbase.coprocessor.SimpleOperationContext; import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare; import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException; import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException; @@ -3094,6 +3096,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi OperationStatus[] batchMutate(BatchOperation batchOp) throws IOException { boolean initialized = false; Operation op = batchOp.isInReplay() ? 
Operation.REPLAY_BATCH_MUTATE : Operation.BATCH_MUTATE; + OperationContext operationContext = new SimpleOperationContext(); startRegionOperation(op); try { while (!batchOp.isDone()) { @@ -3105,11 +3108,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi if (!initialized) { this.writeRequestsCount.add(batchOp.operations.length); if (!batchOp.isInReplay()) { - doPreBatchMutateHook(batchOp); + doPreBatchMutateHook(batchOp, operationContext); } initialized = true; } - doMiniBatchMutate(batchOp); + doMiniBatchMutate(batchOp, operationContext); long newSize = this.getMemstoreSize(); requestFlushIfNeeded(newSize); } @@ -3119,15 +3122,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return batchOp.retCodeDetails; } - private void doPreBatchMutateHook(BatchOperation batchOp) - throws IOException { + private void doPreBatchMutateHook(BatchOperation batchOp, + OperationContext opContext) throws IOException { /* Run coprocessor pre hook outside of locks to avoid deadlock */ WALEdit walEdit = new WALEdit(); if (coprocessorHost != null) { for (int i = 0 ; i < batchOp.operations.length; i++) { Mutation m = batchOp.getMutation(i); if (m instanceof Put) { - if (coprocessorHost.prePut((Put) m, walEdit, m.getDurability())) { + if (coprocessorHost.prePut((Put) m, walEdit, m.getDurability(), opContext)) { // pre hook says skip this Put // mark as success and skip in doMiniBatchMutation batchOp.retCodeDetails[i] = OperationStatus.SUCCESS; @@ -3138,7 +3141,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // handle deleting a row case prepareDelete(curDel); } - if (coprocessorHost.preDelete(curDel, walEdit, m.getDurability())) { + if (coprocessorHost.preDelete(curDel, walEdit, m.getDurability(), opContext)) { // pre hook says skip this Delete // mark as success and skip in doMiniBatchMutation batchOp.retCodeDetails[i] = OperationStatus.SUCCESS; @@ -3164,7 +3167,8 @@ public class 
HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * @return Change in size brought about by applying batchOp */ // TODO: This needs a rewrite. Doesn't have to be this long. St.Ack 20160120 - private void doMiniBatchMutate(BatchOperation batchOp) throws IOException { + private void doMiniBatchMutate(BatchOperation batchOp, + OperationContext operationContext) throws IOException { boolean replay = batchOp.isInReplay(); long currentNonceGroup = HConstants.NO_NONCE; long currentNonce = HConstants.NO_NONCE; @@ -3268,7 +3272,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi MiniBatchOperationInProgress miniBatchOp = new MiniBatchOperationInProgress<>(batchOp.getMutationsForCoprocs(), batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive); - if (coprocessorHost.preBatchMutate(miniBatchOp)) { + if (coprocessorHost.preBatchMutate(miniBatchOp, operationContext)) { return; } else { for (int i = firstIndex; i < lastIndexExclusive; i++) { @@ -3425,7 +3429,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi MiniBatchOperationInProgress miniBatchOp = new MiniBatchOperationInProgress<>(batchOp.getMutationsForCoprocs(), batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive); - coprocessorHost.postBatchMutate(miniBatchOp); + coprocessorHost.postBatchMutate(miniBatchOp, operationContext); } // STEP 6. Complete mvcc. 
@@ -3461,9 +3465,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } Mutation m = batchOp.getMutation(i); if (m instanceof Put) { - coprocessorHost.postPut((Put) m, walEdit, m.getDurability()); + coprocessorHost.postPut((Put) m, walEdit, m.getDurability(), operationContext); } else { - coprocessorHost.postDelete((Delete) m, walEdit, m.getDurability()); + coprocessorHost.postDelete((Delete) m, walEdit, m.getDurability(), operationContext); } } } @@ -3508,7 +3512,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi MiniBatchOperationInProgress miniBatchOp = new MiniBatchOperationInProgress<>(batchOp.getMutationsForCoprocs(), batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive); - coprocessorHost.postBatchMutateIndispensably(miniBatchOp, success); + coprocessorHost.postBatchMutateIndispensably(miniBatchOp, success, operationContext); } batchOp.nextIndexToProcess = lastIndexExclusive; @@ -6990,6 +6994,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi @Override public void processRowsWithLocks(RowProcessor processor, long timeout, long nonceGroup, long nonce) throws IOException { + OperationContext operationContext = new SimpleOperationContext(); for (byte[] row : processor.getRowsToLock()) { checkRow(row, "processRowsWithLocks"); } @@ -7001,13 +7006,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi WALEdit walEdit = new WALEdit(); // STEP 1. 
Run pre-process hook - preProcess(processor, walEdit); + preProcess(processor, walEdit, operationContext); // Short circuit the read only case if (processor.readOnly()) { try { long now = EnvironmentEdgeManager.currentTime(); - doProcessRowWithTimeout(processor, now, this, null, null, timeout); - processor.postProcess(this, walEdit, true); + doProcessRowWithTimeout(processor, now, this, null, null, timeout, operationContext); + processor.postProcess(this, walEdit, true, operationContext); } finally { closeRegionOperation(); } @@ -7037,10 +7042,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi locked = true; long now = EnvironmentEdgeManager.currentTime(); // STEP 4. Let the processor scan the rows, generate mutations and add waledits - doProcessRowWithTimeout(processor, now, this, mutations, walEdit, timeout); + doProcessRowWithTimeout(processor, now, this, mutations, walEdit, timeout, + operationContext); if (!mutations.isEmpty()) { // STEP 5. Call the preBatchMutate hook - processor.preBatchMutate(this, walEdit); + processor.preBatchMutate(this, walEdit, operationContext); // STEP 6. Append and sync if walEdit has data to write out. if (!walEdit.isEmpty()) { @@ -7070,7 +7076,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } // STEP 8. call postBatchMutate hook - processor.postBatchMutate(this); + processor.postBatchMutate(this, operationContext); // STEP 9. Complete mvcc. mvcc.completeAndWait(writeEntry); @@ -7097,7 +7103,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } // 12. 
Run post-process hook - processor.postProcess(this, walEdit, success); + processor.postProcess(this, walEdit, success, operationContext); } finally { closeRegionOperation(); if (!mutations.isEmpty()) { @@ -7107,26 +7113,23 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } - private void preProcess(final RowProcessor processor, final WALEdit walEdit) - throws IOException { + private void preProcess(final RowProcessor processor, final WALEdit walEdit, + final OperationContext opContext) throws IOException { try { - processor.preProcess(this, walEdit); + processor.preProcess(this, walEdit, opContext); } catch (IOException e) { closeRegionOperation(); throw e; } } - private void doProcessRowWithTimeout(final RowProcessor processor, - final long now, - final HRegion region, - final List mutations, - final WALEdit walEdit, - final long timeout) throws IOException { + private void doProcessRowWithTimeout(final RowProcessor processor, final long now, + final HRegion region, final List mutations, final WALEdit walEdit, + final long timeout, final OperationContext opContext) throws IOException { // Short circuit the no time bound case. 
if (timeout < 0) { try { - processor.process(now, region, mutations, walEdit); + processor.process(now, region, mutations, walEdit, opContext); } catch (IOException e) { LOG.warn("RowProcessor:" + processor.getClass().getName() + " throws Exception on row(s):" + @@ -7142,7 +7145,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi @Override public Void call() throws IOException { try { - processor.process(now, region, mutations, walEdit); + processor.process(now, region, mutations, walEdit, opContext); return null; } catch (IOException e) { LOG.warn("RowProcessor:" + processor.getClass().getName() + diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java index 09ac73d..c408c61 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.coprocessor.OperationContext; import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationProcessorRequest; import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationProcessorResponse; import org.apache.hadoop.hbase.wal.WALEdit; @@ -70,7 +71,8 @@ MultiRowMutationProcessorResponse> { public void process(long now, HRegion region, List mutationsToApply, - WALEdit walEdit) throws IOException { + WALEdit walEdit, + OperationContext opContext) throws IOException { byte[] byteNow = Bytes.toBytes(now); // Check mutations for (Mutation m : this.mutations) { @@ -101,19 +103,20 @@ MultiRowMutationProcessorResponse> { } @Override 
- public void preProcess(HRegion region, WALEdit walEdit) throws IOException { + public void preProcess(HRegion region, WALEdit walEdit, OperationContext opContext) + throws IOException { RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost(); if (coprocessorHost != null) { for (Mutation m : mutations) { if (m instanceof Put) { - if (coprocessorHost.prePut((Put) m, walEdit, m.getDurability())) { + if (coprocessorHost.prePut((Put) m, walEdit, m.getDurability(), opContext)) { // by pass everything return; } } else if (m instanceof Delete) { Delete d = (Delete) m; region.prepareDelete(d); - if (coprocessorHost.preDelete(d, walEdit, d.getDurability())) { + if (coprocessorHost.preDelete(d, walEdit, d.getDurability(), opContext)) { // by pass everything return; } @@ -123,7 +126,8 @@ MultiRowMutationProcessorResponse> { } @Override - public void preBatchMutate(HRegion region, WALEdit walEdit) throws IOException { + public void preBatchMutate(HRegion region, WALEdit walEdit, OperationContext opContext) + throws IOException { // TODO we should return back the status of this hook run to HRegion so that those Mutations // with OperationStatus as SUCCESS or FAILURE should not get applied to memstore. 
RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost(); @@ -134,7 +138,7 @@ MultiRowMutationProcessorResponse> { miniBatch = new MiniBatchOperationInProgress<>( mutations.toArray(new Mutation[mutations.size()]), opStatus, walEditsFromCP, 0, mutations.size()); - coprocessorHost.preBatchMutate(miniBatch); + coprocessorHost.preBatchMutate(miniBatch, opContext); } // Apply edits to a single WALEdit for (int i = 0; i < mutations.size(); i++) { @@ -152,31 +156,33 @@ MultiRowMutationProcessorResponse> { } @Override - public void postBatchMutate(HRegion region) throws IOException { + public void postBatchMutate(HRegion region, OperationContext opContext) + throws IOException { RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost(); if (coprocessorHost != null) { assert miniBatch != null; // Use the same miniBatch state used to call the preBatchMutate() - coprocessorHost.postBatchMutate(miniBatch); + coprocessorHost.postBatchMutate(miniBatch, opContext); } } @Override - public void postProcess(HRegion region, WALEdit walEdit, boolean success) throws IOException { + public void postProcess(HRegion region, WALEdit walEdit, boolean success, + OperationContext opContext) throws IOException { RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost(); if (coprocessorHost != null) { for (Mutation m : mutations) { if (m instanceof Put) { - coprocessorHost.postPut((Put) m, walEdit, m.getDurability()); + coprocessorHost.postPut((Put) m, walEdit, m.getDurability(), opContext); } else if (m instanceof Delete) { - coprocessorHost.postDelete((Delete) m, walEdit, m.getDurability()); + coprocessorHost.postDelete((Delete) m, walEdit, m.getDurability(), opContext); } } // At the end call the CP hook postBatchMutateIndispensably if (miniBatch != null) { // Directly calling this hook, with out calling pre/postBatchMutate() when Processor do a // read only process. Then no need to call this batch based CP hook also. 
- coprocessorHost.postBatchMutateIndispensably(miniBatch, success); + coprocessorHost.postBatchMutateIndispensably(miniBatch, success, opContext); } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java index 1d9abca..5294864 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java @@ -63,6 +63,7 @@ import org.apache.hadoop.hbase.coprocessor.CoprocessorServiceBackwardCompatiblit import org.apache.hadoop.hbase.coprocessor.EndpointObserver; import org.apache.hadoop.hbase.coprocessor.MetricsCoprocessor; import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.OperationContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.RegionObserver; @@ -772,17 +773,18 @@ public class RegionCoprocessorHost * @param put The Put object * @param edit The WALEdit object. * @param durability The durability used + * @param opContext * @return true if default processing should be bypassed * @exception IOException Exception */ - public boolean prePut(final Put put, final WALEdit edit, final Durability durability) - throws IOException { + public boolean prePut(final Put put, final WALEdit edit, final Durability durability, + final OperationContext opContext) throws IOException { return execOperation(coprocEnvironments.isEmpty() ? 
null : new RegionObserverOperation() { @Override public void call(RegionObserver observer) throws IOException { observer.prePut(this, put, edit, durability); } - }); + }, opContext); } /** @@ -809,89 +811,99 @@ public class RegionCoprocessorHost * @param put The Put object * @param edit The WALEdit object. * @param durability The durability used + * @param opContext * @exception IOException Exception */ - public void postPut(final Put put, final WALEdit edit, final Durability durability) - throws IOException { + public void postPut(final Put put, final WALEdit edit, final Durability durability, + final OperationContext opContext) throws IOException { execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() { @Override public void call(RegionObserver observer) throws IOException { observer.postPut(this, put, edit, durability); } - }); + }, opContext); } /** * @param delete The Delete object * @param edit The WALEdit object. * @param durability The durability used + * @param opContext * @return true if default processing should be bypassed * @exception IOException Exception */ - public boolean preDelete(final Delete delete, final WALEdit edit, final Durability durability) - throws IOException { + public boolean preDelete(final Delete delete, final WALEdit edit, final Durability durability, + final OperationContext opContext) throws IOException { return execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() { @Override public void call(RegionObserver observer) throws IOException { observer.preDelete(this, delete, edit, durability); } - }); + }, opContext); } /** * @param delete The Delete object * @param edit The WALEdit object. 
* @param durability The durability used + * @param opContext * @exception IOException Exception */ - public void postDelete(final Delete delete, final WALEdit edit, final Durability durability) - throws IOException { + public void postDelete(final Delete delete, final WALEdit edit, final Durability durability, + final OperationContext opContext) throws IOException { execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() { @Override public void call(RegionObserver observer) throws IOException { observer.postDelete(this, delete, edit, durability); } - }); + }, opContext); } /** * @param miniBatchOp + * @param opContext * @return true if default processing should be bypassed * @throws IOException */ - public boolean preBatchMutate( - final MiniBatchOperationInProgress miniBatchOp) throws IOException { + public boolean preBatchMutate(final MiniBatchOperationInProgress miniBatchOp, + final OperationContext opContext) throws IOException { return execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() { @Override public void call(RegionObserver observer) throws IOException { observer.preBatchMutate(this, miniBatchOp); } - }); + }, opContext); } /** * @param miniBatchOp + * @param opContext * @throws IOException */ - public void postBatchMutate( - final MiniBatchOperationInProgress miniBatchOp) throws IOException { + public void postBatchMutate(final MiniBatchOperationInProgress miniBatchOp, + final OperationContext opContext) throws IOException { execOperation(coprocEnvironments.isEmpty() ? 
null : new RegionObserverOperation() { @Override public void call(RegionObserver observer) throws IOException { observer.postBatchMutate(this, miniBatchOp); } - }); + }, opContext); } - public void postBatchMutateIndispensably( - final MiniBatchOperationInProgress miniBatchOp, final boolean success) - throws IOException { + /** + * @param miniBatchOp + * @param success + * @param opContext + * @throws IOException + */ + public void postBatchMutateIndispensably(final MiniBatchOperationInProgress miniBatchOp, + final boolean success, final OperationContext opContext) throws IOException { execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() { @Override public void call(RegionObserver observer) throws IOException { observer.postBatchMutateIndispensably(this, miniBatchOp, success); } - }); + }, opContext); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java index 625d9a6..d49f593 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java @@ -26,6 +26,7 @@ import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.coprocessor.OperationContext; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.wal.WALEdit; @@ -82,33 +83,38 @@ public interface RowProcessor { * @param region the HRegion * @param mutations the output mutations to apply to memstore * @param walEdit the output WAL edits to apply to write ahead log + * @param opContext Per operation context */ void process(long now, HRegion region, List mutations, - WALEdit walEdit) throws IOException; + WALEdit walEdit, OperationContext 
opContext) throws IOException; /** * The hook to be executed before process(). * * @param region the HRegion * @param walEdit the output WAL edits to apply to write ahead log + * @param opContext Per operation context */ - void preProcess(HRegion region, WALEdit walEdit) throws IOException; + void preProcess(HRegion region, WALEdit walEdit, OperationContext opContext) + throws IOException; /** * The hook to be executed after the process() but before applying the Mutations to region. Also * by the time this hook is called, mvcc transaction have started. * @param walEdit the output WAL edits to apply to write ahead log + * @param opContext Per operation context */ - void preBatchMutate(HRegion region, WALEdit walEdit) throws IOException; + void preBatchMutate(HRegion region, WALEdit walEdit, OperationContext opContext) + throws IOException; /** * The hook to be executed after the process() and applying the Mutations to region. The - * difference of this one with {@link #postProcess(HRegion, WALEdit, boolean)} is this hook will - * be executed before the mvcc transaction completion. + * difference of this one with {@link #postProcess(HRegion, WALEdit, boolean, OperationContext)} + * is this hook will be executed before the mvcc transaction completion. */ - void postBatchMutate(HRegion region) throws IOException; + void postBatchMutate(HRegion region, OperationContext opContext) throws IOException; /** * The hook to be executed after process() and applying the Mutations to region. @@ -116,8 +122,10 @@ public interface RowProcessor { * @param region the HRegion * @param walEdit the output WAL edits to apply to write ahead log * @param success true if batch operation is successful otherwise false. 
+ * @param opContext Per operation context */ - void postProcess(HRegion region, WALEdit walEdit, boolean success) throws IOException; + void postProcess(HRegion region, WALEdit walEdit, boolean success, + OperationContext opContext) throws IOException; /** * @return The cluster ids that have the change. diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestOperationContext.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestOperationContext.java new file mode 100644 index 0000000..f69730d --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestOperationContext.java @@ -0,0 +1,206 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.coprocessor; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Map; +import java.util.Optional; + +import static org.junit.Assert.assertEquals; + +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RowMutations; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; +import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists; +import org.apache.hadoop.hbase.testclassification.CoprocessorTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * This test checks whether the OperationContext is passed successfully between the coprocessor + * hooks during batchMutate and processRowsWithLocks + */ +@Category({ CoprocessorTests.class, MediumTests.class }) +public class TestOperationContext { + + private static final HBaseTestingUtility util = new HBaseTestingUtility(); + + private static final TableName TEST_TABLE = TableName.valueOf("test"); + private static final byte[] TEST_FAMILY = Bytes.toBytes("f1"); + + private static final byte[] ROW_A = Bytes.toBytes("aaa"); + private static final byte[] ROW_B = Bytes.toBytes("bbb"); + + private static final byte[] qualifierCol1 = Bytes.toBytes("col1"); + private static final byte[] qualifierCol2 = Bytes.toBytes("col2"); + + private static final byte[] bytes1 = Bytes.toBytes(1); + private static final byte[] bytes2 = Bytes.toBytes(2); + + private Table table; + 
@BeforeClass + public static void setupBeforeClass() throws Exception { + util.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, + MyObserver.class.getName()); + util.startMiniCluster(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + util.shutdownMiniCluster(); + } + + @Before + public void before() throws IOException { + table = util.createTable(TEST_TABLE, TEST_FAMILY); + } + + @After + public void after() throws IOException { + util.deleteTable(TEST_TABLE); + } + + @Test + public void testMiniBatchOperationContext() throws Exception { + HRegionServer rs = util.getRSForFirstRegionInTable(TEST_TABLE); + MyObserver ob = (MyObserver) rs.getRegions(TEST_TABLE).get(0).getCoprocessorHost() + .findCoprocessor(MyObserver.class.getName()); + ob.resetStates(); + Put p1 = new Put(ROW_A); + p1.addColumn(TEST_FAMILY, qualifierCol1, bytes1); + Put p2 = new Put(ROW_B); + p2.addColumn(TEST_FAMILY, qualifierCol1, bytes2); + try { + ArrayList l1 = Lists.newArrayList(p1, p2); + table.batch(l1, null); + assertEquals(ob.getPreBatchMutateContext(), 1); + assertEquals(ob.getPostBatchMutateContext(), 2); + assertEquals(ob.getpostBatchMutateIndispensablyContext(), 3); + } finally { + if (table != null) { + table.close(); + } + } + } + + @Test + public void testMultiRowMutateOperationContext() throws Exception { + HRegionServer rs = util.getRSForFirstRegionInTable(TEST_TABLE); + MyObserver ob = (MyObserver) rs.getRegions(TEST_TABLE).get(0).getCoprocessorHost() + .findCoprocessor(MyObserver.class.getName()); + ob.resetStates(); + Put p1 = new Put(ROW_A); + p1.addColumn(TEST_FAMILY, qualifierCol1, bytes1); + Put p2 = new Put(ROW_A); + p2.addColumn(TEST_FAMILY, qualifierCol2, bytes2); + Delete d1 = new Delete(ROW_A); + d1.addColumn(TEST_FAMILY, qualifierCol1); + RowMutations rm = new RowMutations(ROW_A); + try { + rm.add(p1); + rm.add(p2); + rm.add(d1); + table.mutateRow(rm); + assertEquals(ob.getPreBatchMutateContext(), 1); + 
assertEquals(ob.getPostBatchMutateContext(), 2); + assertEquals(ob.getpostBatchMutateIndispensablyContext(), 3); + } finally { + if (table != null) { + table.close(); + } + } + } + + public static class MyObserver implements RegionObserver, RegionCoprocessor { + + private int preBatchMutateContext; + private int postBatchMutateContext; + private int postBatchMutateIndispensablyContext; + + @Override + public Optional getRegionObserver() { + return Optional.of(this); + } + + @Override + public void preBatchMutate(final ObserverContext c, + final MiniBatchOperationInProgress miniBatchOp) throws IOException { + c.getOperationContext().getOperationContextMap().put("preBatchMutate", 1); + } + + @Override + public void postBatchMutate(final ObserverContext c, + final MiniBatchOperationInProgress miniBatchOp) throws IOException { + OperationContext opContext = c.getOperationContext(); + Map contextMap = opContext.getOperationContextMap(); + // Store the value set by preBatchMutate to be verified later + preBatchMutateContext = (int) contextMap.get("preBatchMutate"); + // Set our context based on the previous + contextMap.put("postBatchMutateContext", preBatchMutateContext + 1); + } + + @Override + public void postBatchMutateIndispensably( + final ObserverContext ctx, + MiniBatchOperationInProgress miniBatchOp, final boolean success) + throws IOException { + OperationContext opContext = ctx.getOperationContext(); + Map contextMap = opContext.getOperationContextMap(); + // Verify we see correct postBatchMutate context + postBatchMutateContext = (int) contextMap.get("postBatchMutateContext"); + // take the sum of earlier contexts, this verifies we see both earlier contexts + postBatchMutateIndispensablyContext = + ((int) contextMap.get("preBatchMutate")) + postBatchMutateContext; + contextMap.put("postBatchMutateIndispensably", postBatchMutateIndispensablyContext); + } + + public void resetStates() { + preBatchMutateContext = 0; + postBatchMutateContext = 0; + 
postBatchMutateIndispensablyContext = 0; + } + + public int getPreBatchMutateContext() { + return preBatchMutateContext; + } + + public int getPostBatchMutateContext() { + return postBatchMutateContext; + } + + public int getpostBatchMutateIndispensablyContext() { + return postBatchMutateIndispensablyContext; + } + + } +} \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 4d557b9..ea23a73 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -438,7 +438,8 @@ public class TestHRegion { RegionCoprocessorHost mockedCPHost = Mockito.mock(RegionCoprocessorHost.class); doThrow(new IOException()) - .when(mockedCPHost).postBatchMutate(Mockito.>any()); + .when(mockedCPHost).postBatchMutate(Mockito.>any(), + Mockito.any()); region.setCoprocessorHost(mockedCPHost); put = new Put(value); @@ -2438,8 +2439,8 @@ public class TestHRegion { return false; } }; - when(mockedCPHost.preBatchMutate(Mockito.isA(MiniBatchOperationInProgress.class))) - .then(answer); + when(mockedCPHost.preBatchMutate(Mockito.isA(MiniBatchOperationInProgress.class), + Mockito.any())).then(answer); region.setCoprocessorHost(mockedCPHost); region.put(originalPut); region.setCoprocessorHost(normalCPHost); -- 2.1.2