From c5e5505894006e1c747533087c15099fb62f3050 Mon Sep 17 00:00:00 2001 From: Michael Stack Date: Thu, 26 Oct 2017 14:55:53 -0700 Subject: [PATCH] HBASE-18770 Remove bypass method in ObserverContext and implement the 'bypass' logic case by case Changes Coprocessor context 'bypass' semantic. Changes default so it is NOT supported; only a couple of preXXX methods in RegionObserver allow it: e.g. preGet and prePut but not preFlush, etc. Changes Coprocessor 'complete' semantic too. It only has an effect on those methods that support 'bypass'; i.e. you can only exit a Coprocessor chain early via 'complete' on methods that are bypassable. See javadoc for whether a Coprocessor Observer method supports 'bypass'. If no mention, 'bypass' is NOT supported. M hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java Added passing of 'bypassable' (and 'completable') argument to the Operation constructors. Methods that support 'bypass' must set this flag. M hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Refactoring in here is minor. A few methods that used support bypass no longer do so removed the check and the need of an if/else meant a left-shift in some code. M hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java Ditto M hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java In here label explicitly those methods that are bypassable. Some changes to make sure we call the corresponding execOperation. TODO: What to do w/ the Scanner methods. --- .../hadoop/hbase/coprocessor/CoprocessorHost.java | 82 +++-- .../coprocessor/DoesNotSupportBypassException.java | 24 ++ .../DoesNotSupportCompleteException.java | 24 ++ .../hadoop/hbase/coprocessor/ObserverContext.java | 22 +- .../hbase/coprocessor/ObserverContextImpl.java | 30 +- .../hadoop/hbase/master/MasterCoprocessorHost.java | 7 +- .../apache/hadoop/hbase/regionserver/HRegion.java | 70 ++-- .../hadoop/hbase/regionserver/RSRpcServices.java | 105 +++--- .../hbase/regionserver/RegionCoprocessorHost.java | 382 +++++++++++++-------- .../regionserver/RegionServerCoprocessorHost.java | 5 +- .../hbase/regionserver/SecureBulkLoadManager.java | 75 ++-- .../hbase/regionserver/wal/WALCoprocessorHost.java | 5 +- .../hadoop/hbase/regionserver/TestHRegion.java | 18 +- 13 files changed, 508 insertions(+), 341 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/DoesNotSupportBypassException.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/DoesNotSupportCompleteException.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java index c785b0b594..53a2b83c44 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java @@ -1,5 +1,4 @@ /* - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -36,14 +35,12 @@ import java.util.function.Function; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.yetus.audience.InterfaceAudience; -import org.apache.yetus.audience.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.DoNotRetryIOException; -import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.security.User; @@ -548,11 +545,20 @@ public abstract class CoprocessorHost observerGetter; ObserverOperation(ObserverGetter observerGetter) { - this(observerGetter, RpcServer.getRequestUser().orElse(null)); + this(observerGetter, null, false); } ObserverOperation(ObserverGetter observerGetter, User user) { - super(user); + this(observerGetter, user, false); + } + + ObserverOperation(ObserverGetter observerGetter, boolean bypassable) { + this(observerGetter, null, bypassable); + } + + ObserverOperation(ObserverGetter observerGetter, User user, boolean bypassable) { + super(user != null? user: RpcServer.getRequestUser().orElse(null), + bypassable, bypassable/*Not completable if not bypassable*/); this.observerGetter = observerGetter; } @@ -574,6 +580,11 @@ public abstract class CoprocessorHost observerGetter, User user, + boolean bypassable) { + super(observerGetter, user, bypassable); + } + /** * In case of coprocessors which have many kinds of observers (for eg, {@link RegionCoprocessor} * has BulkLoadObserver, RegionObserver, etc), some implementations may not need all @@ -594,15 +605,26 @@ public abstract class CoprocessorHost observerGetter) { + public ObserverOperationWithResult(ObserverGetter observerGetter, R result) { super(observerGetter); + this.result = result; + } + + public ObserverOperationWithResult(ObserverGetter observerGetter, R result, + boolean bypassable) { + super(observerGetter, bypassable); + this.result = result; } - public ObserverOperationWithResult(ObserverGetter observerGetter, User user) { + public ObserverOperationWithResult(ObserverGetter observerGetter, R result, + User user) { super(observerGetter, user); + this.result = result; } - void setResult(final R result) { + public ObserverOperationWithResult(ObserverGetter observerGetter, R result, User user, + boolean bypassable) { + super(observerGetter, user, bypassable); this.result = result; } @@ -621,38 +643,27 @@ public abstract class CoprocessorHost R execOperationWithResult(final R defaultValue, - final ObserverOperationWithResult observerOperation) throws IOException { - if (observerOperation == null) { - return defaultValue; - } - observerOperation.setResult(defaultValue); - execOperation(observerOperation); - return observerOperation.getResult(); - } - // what does bypass mean? - protected R execOperationWithResult(final boolean ifBypass, final R defaultValue, + /** + * Do not call with an observerOperation that is null! Have the caller check. + */ + protected R execOperationWithResult( final ObserverOperationWithResult observerOperation) throws IOException { - if (observerOperation == null) { - return ifBypass ? null : defaultValue; - } else { - observerOperation.setResult(defaultValue); - boolean bypass = execOperation(true, observerOperation); - R result = observerOperation.getResult(); - return bypass == ifBypass ? result : null; - } + // Only bypassable methods can do an early exit via 'complete'. + boolean bypass = execOperation(observerOperation); + R result = observerOperation.getResult(); + return bypass == observerOperation.isBypassable()? result: null; } + /** + * coprocessor calls by setting 'complete' on the context but for now let it be + * settable independent of bypassable in case we need this facility internally. + * @return True if we are to bypass (Can only be 'true' if bypassable is 'true'). + */ protected boolean execOperation(final ObserverOperation observerOperation) throws IOException { - return execOperation(true, observerOperation); - } - - protected boolean execOperation(final boolean earlyExit, - final ObserverOperation observerOperation) throws IOException { - if (observerOperation == null) return false; boolean bypass = false; + if (observerOperation == null) return bypass; List envs = coprocEnvironments.get(); for (E env : envs) { observerOperation.prepare(env); @@ -667,7 +678,7 @@ public abstract class CoprocessorHostCoprocessorEnvironment reference swapped out for each * coprocessor. @@ -42,16 +42,25 @@ public interface ObserverContext { /** * Call to indicate that the current coprocessor's return value should be - * used in place of the normal HBase obtained value. + * used in place of the normal HBase obtained value. Does not work on all invocations, + * only on a small subset of preXXX methods. Check javadoc on the pertinent + * Coprocessor Observer to see if bypass is supported. + * This behavior of honoring only a subset of methods is new * since hbase-2.0.0. + * @exception CoprocessorException Thrown if you call bypass on a method that is not + * bypassable. */ - void bypass(); + void bypass() throws CoprocessorException; /** * Call to indicate that additional coprocessors further down the execution - * chain do not need to be invoked. Implies that this coprocessor's response - * is definitive. + * chain do not need to be invoked. Implies that this coprocessor's response is definitive. + * Since hbase-2.0.0, only complete of bypassable methods has an effect. See javadoc on + * the Coprocessor Observer method as to whether bypass (and thereby 'complete') is + * supported. + * This behavior of honoring only a subset of methods is new * since hbase-2.0.0. + * @exception CoprocessorException Thrown if you call complete on a non-completable method. */ - void complete(); + void complete() throws CoprocessorException; /** * Returns the active user for the coprocessor call. If an explicit {@code User} instance was @@ -60,5 +69,4 @@ public interface ObserverContext { * context. */ Optional getCaller(); - } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ObserverContextImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ObserverContextImpl.java index ff829564b6..245adf5430 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ObserverContextImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ObserverContextImpl.java @@ -35,11 +35,25 @@ import org.apache.yetus.audience.InterfaceStability; public class ObserverContextImpl implements ObserverContext { private E env; private boolean bypass; + /** + * Is this operation bypassable? + */ + private final boolean bypassable; + /** + * Is this operation completable? + */ private boolean complete; + private final boolean completable; private final User caller; public ObserverContextImpl(User caller) { + this(caller, false, false); + } + + public ObserverContextImpl(User caller, boolean bypassable, boolean completable) { this.caller = caller; + this.bypassable = bypassable; + this.completable = completable; } public E getEnvironment() { @@ -50,11 +64,21 @@ public class ObserverContextImpl implements Ob this.env = env; } - public void bypass() { + public boolean isBypassable() {return this.bypassable;}; + + public void bypass() throws DoesNotSupportBypassException { + if (!this.bypassable) { + throw new DoesNotSupportBypassException(); + } bypass = true; } - public void complete() { + public boolean isCompleable() {return this.completable;}; + + public void complete() throws DoesNotSupportCompleteException { + if (!this.completable) { + throw new DoesNotSupportCompleteException(); + } complete = true; } @@ -63,6 +87,7 @@ public class ObserverContextImpl implements Ob * coprocessors, {@code false} otherwise. */ public boolean shouldBypass() { + if (!isBypassable()) return false; if (bypass) { bypass = false; return true; @@ -75,6 +100,7 @@ public class ObserverContextImpl implements Ob * coprocessors, {@code false} otherwise. */ public boolean shouldComplete() { + if (!isCompleable()) return false; if (complete) { complete = false; return true; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java index fa2a0a9dc6..f18f81af58 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java @@ -908,9 +908,10 @@ public class MasterCoprocessorHost }); } - public boolean preBalanceSwitch(final boolean b) throws IOException { - return execOperationWithResult(b, coprocEnvironments.isEmpty() ? null : - new ObserverOperationWithResult(masterObserverGetter) { + public boolean preBalanceSwitch(final boolean result) throws IOException { + if (this.coprocEnvironments.isEmpty()) return result; + return execOperationWithResult( + new ObserverOperationWithResult(masterObserverGetter, result) { @Override public Boolean call(MasterObserver observer) throws IOException { return observer.preBalanceSwitch(this, getResult()); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 07518e7999..3b9c165e45 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -3416,43 +3416,39 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi MiniBatchOperationInProgress miniBatchOp = new MiniBatchOperationInProgress<>(batchOp.getMutationsForCoprocs(), batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive); - if (coprocessorHost.preBatchMutate(miniBatchOp)) { - doneByCoprocessor = true; - return; - } else { - for (int i = firstIndex; i < lastIndexExclusive; i++) { - if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) { - // lastIndexExclusive was incremented above. - continue; - } - // we pass (i - firstIndex) below since the call expects a relative index - Mutation[] cpMutations = miniBatchOp.getOperationsFromCoprocessors(i - firstIndex); - if (cpMutations == null) { - continue; - } - Mutation mutation = batchOp.getMutation(i); - boolean skipWal = getEffectiveDurability(mutation.getDurability()) == Durability.SKIP_WAL; - // Else Coprocessor added more Mutations corresponding to the Mutation at this index. - for (int j = 0; j < cpMutations.length; j++) { - Mutation cpMutation = cpMutations[j]; - checkAndPrepareMutation(cpMutation, replay, now); - - // Acquire row locks. If not, the whole batch will fail. - acquiredRowLocks.add(getRowLockInternal(cpMutation.getRow(), true)); - - // Returned mutations from coprocessor correspond to the Mutation at index i. We can - // directly add the cells from those mutations to the familyMaps of this mutation. - Map> cpFamilyMap = cpMutation.getFamilyCellMap(); - // will get added to the memStore later - mergeFamilyMaps(batchOp.familyCellMaps[i], cpFamilyMap); - - // The durability of returned mutation is replaced by the corresponding mutation. - // If the corresponding mutation contains the SKIP_WAL, we shouldn't count the - // cells of returned mutation. - if (!skipWal) { - for (List cells : cpFamilyMap.values()) { - cellCount += cells.size(); - } + coprocessorHost.preBatchMutate(miniBatchOp); + for (int i = firstIndex; i < lastIndexExclusive; i++) { + if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) { + // lastIndexExclusive was incremented above. + continue; + } + // we pass (i - firstIndex) below since the call expects a relative index + Mutation[] cpMutations = miniBatchOp.getOperationsFromCoprocessors(i - firstIndex); + if (cpMutations == null) { + continue; + } + Mutation mutation = batchOp.getMutation(i); + boolean skipWal = getEffectiveDurability(mutation.getDurability()) == Durability.SKIP_WAL; + // Else Coprocessor added more Mutations corresponding to the Mutation at this index. + for (int j = 0; j < cpMutations.length; j++) { + Mutation cpMutation = cpMutations[j]; + checkAndPrepareMutation(cpMutation, replay, now); + + // Acquire row locks. If not, the whole batch will fail. + acquiredRowLocks.add(getRowLockInternal(cpMutation.getRow(), true)); + + // Returned mutations from coprocessor correspond to the Mutation at index i. We can + // directly add the cells from those mutations to the familyMaps of this mutation. + Map> cpFamilyMap = cpMutation.getFamilyCellMap(); + // will get added to the memStore later + mergeFamilyMaps(batchOp.familyCellMaps[i], cpFamilyMap); + + // The durability of returned mutation is replaced by the corresponding mutation. + // If the corresponding mutation contains the SKIP_WAL, we shouldn't count the + // cells of returned mutation. + if (!skipWal) { + for (List cells : cpFamilyMap.values()) { + cellCount += cells.size(); } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index bff69ba0b7..109463bab9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -666,33 +666,31 @@ public class RSRpcServices implements HBaseRPCErrorHandler, checkCellSizeLimit(region, append); spaceQuota.getPolicyEnforcement(region).check(append); quota.addMutation(append); - Result r = null; if (region.getCoprocessorHost() != null) { - r = region.getCoprocessorHost().preAppend(append); + region.getCoprocessorHost().preAppend(append); } - if (r == null) { - boolean canProceed = startNonceOperation(mutation, nonceGroup); - boolean success = false; - try { - long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE; - if (canProceed) { - r = region.append(append, nonceGroup, nonce); - } else { - // convert duplicate append to get - List results = region.get(ProtobufUtil.toGet(mutation, cellScanner), false, - nonceGroup, nonce); - r = Result.create(results); - } - success = true; - } finally { - if (canProceed) { - endNonceOperation(mutation, nonceGroup, success); - } + boolean canProceed = startNonceOperation(mutation, nonceGroup); + boolean success = false; + Result r = null; + try { + long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE; + if (canProceed) { + r = region.append(append, nonceGroup, nonce); + } else { + // convert duplicate append to get + List results = region.get(ProtobufUtil.toGet(mutation, cellScanner), false, + nonceGroup, nonce); + r = Result.create(results); } - if (region.getCoprocessorHost() != null) { - r = region.getCoprocessorHost().postAppend(append, r); + success = true; + } finally { + if (canProceed) { + endNonceOperation(mutation, nonceGroup, success); } } + if (region.getCoprocessorHost() != null) { + r = region.getCoprocessorHost().postAppend(append, r); + } if (regionServer.metricsRegionServer != null) { regionServer.metricsRegionServer.updateAppend( EnvironmentEdgeManager.currentTime() - before); @@ -717,33 +715,31 @@ public class RSRpcServices implements HBaseRPCErrorHandler, checkCellSizeLimit(region, increment); spaceQuota.getPolicyEnforcement(region).check(increment); quota.addMutation(increment); - Result r = null; if (region.getCoprocessorHost() != null) { - r = region.getCoprocessorHost().preIncrement(increment); + region.getCoprocessorHost().preIncrement(increment); } - if (r == null) { - boolean canProceed = startNonceOperation(mutation, nonceGroup); - boolean success = false; - try { - long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE; - if (canProceed) { - r = region.increment(increment, nonceGroup, nonce); - } else { - // convert duplicate increment to get - List results = region.get(ProtobufUtil.toGet(mutation, cells), false, nonceGroup, - nonce); - r = Result.create(results); - } - success = true; - } finally { - if (canProceed) { - endNonceOperation(mutation, nonceGroup, success); - } + boolean canProceed = startNonceOperation(mutation, nonceGroup); + boolean success = false; + Result r = null; + try { + long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE; + if (canProceed) { + r = region.increment(increment, nonceGroup, nonce); + } else { + // convert duplicate increment to get + List results = region.get(ProtobufUtil.toGet(mutation, cells), false, nonceGroup, + nonce); + r = Result.create(results); } - if (region.getCoprocessorHost() != null) { - r = region.getCoprocessorHost().postIncrement(increment, r); + success = true; + } finally { + if (canProceed) { + endNonceOperation(mutation, nonceGroup, success); } } + if (region.getCoprocessorHost() != null) { + r = region.getCoprocessorHost().postIncrement(increment, r); + } if (regionServer.metricsRegionServer != null) { regionServer.metricsRegionServer.updateIncrement( EnvironmentEdgeManager.currentTime() - before); @@ -2250,7 +2246,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler, checkOpen(); requestCount.increment(); HRegion region = getRegion(request.getRegion()); - boolean bypass = false; boolean loaded = false; Map> map = null; @@ -2277,15 +2272,13 @@ public class RSRpcServices implements HBaseRPCErrorHandler, familyPaths.add(new Pair<>(familyPath.getFamily().toByteArray(), familyPath.getPath())); } if (region.getCoprocessorHost() != null) { - bypass = region.getCoprocessorHost().preBulkLoadHFile(familyPaths); + region.getCoprocessorHost().preBulkLoadHFile(familyPaths); } try { - if (!bypass) { - map = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null, - request.getCopyFile()); - if (map != null) { - loaded = true; - } + map = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null, + request.getCopyFile()); + if (map != null) { + loaded = true; } } finally { if (region.getCoprocessorHost() != null) { @@ -2731,8 +2724,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, ByteArrayComparable comparator = ProtobufUtil.toComparator(condition.getComparator()); if (region.getCoprocessorHost() != null) { - processed = region.getCoprocessorHost().preCheckAndPut( - row, family, qualifier, compareOp, comparator, put); + region.getCoprocessorHost().preCheckAndPut(row, family, qualifier, compareOp, + comparator, put); } if (processed == null) { boolean result = region.checkAndMutate(row, family, @@ -2762,8 +2755,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, ByteArrayComparable comparator = ProtobufUtil.toComparator(condition.getComparator()); if (region.getCoprocessorHost() != null) { - processed = region.getCoprocessorHost().preCheckAndDelete( - row, family, qualifier, op, comparator, delete); + region.getCoprocessorHost().preCheckAndDelete(row, family, qualifier, op, comparator, + delete); } if (processed == null) { boolean result = region.checkAndMutate(row, family, diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java index c242fd1850..26341da995 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java @@ -474,14 +474,23 @@ public class RegionCoprocessorHost private ObserverGetter endpointObserverGetter = RegionCoprocessor::getEndpointObserver; - abstract class RegionObserverOperation extends ObserverOperationWithoutResult { - public RegionObserverOperation() { + abstract class RegionObserverOperationWithoutResult extends + ObserverOperationWithoutResult { + public RegionObserverOperationWithoutResult() { super(regionObserverGetter); } - public RegionObserverOperation(User user) { + public RegionObserverOperationWithoutResult(User user) { super(regionObserverGetter, user); } + + public RegionObserverOperationWithoutResult(boolean bypassable) { + super(regionObserverGetter, null, bypassable); + } + + public RegionObserverOperationWithoutResult(User user, boolean bypassable) { + super(regionObserverGetter, user, bypassable); + } } abstract class BulkLoadObserverOperation extends @@ -506,7 +515,7 @@ public class RegionCoprocessorHost * @throws IOException Signals that an I/O exception has occurred. */ public void preOpen() throws IOException { - execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() { + execOperation(coprocEnvironments.isEmpty()? null: new RegionObserverOperationWithoutResult() { @Override public void call(RegionObserver observer) throws IOException { observer.preOpen(this); @@ -520,7 +529,7 @@ public class RegionCoprocessorHost */ public void postOpen() { try { - execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() { + execOperation(coprocEnvironments.isEmpty()? null: new RegionObserverOperationWithoutResult() { @Override public void call(RegionObserver observer) throws IOException { observer.postOpen(this); @@ -536,7 +545,7 @@ public class RegionCoprocessorHost */ public void postLogReplay() { try { - execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() { + execOperation(coprocEnvironments.isEmpty()? null: new RegionObserverOperationWithoutResult() { @Override public void call(RegionObserver observer) throws IOException { observer.postLogReplay(this); @@ -552,7 +561,7 @@ public class RegionCoprocessorHost * @param abortRequested true if the server is aborting */ public void preClose(final boolean abortRequested) throws IOException { - execOperation(false, new RegionObserverOperation() { + execOperation(new RegionObserverOperationWithoutResult() { @Override public void call(RegionObserver observer) throws IOException { observer.preClose(this, abortRequested); @@ -566,7 +575,7 @@ public class RegionCoprocessorHost */ public void postClose(final boolean abortRequested) { try { - execOperation(false, new RegionObserverOperation() { + execOperation(new RegionObserverOperationWithoutResult() { @Override public void call(RegionObserver observer) throws IOException { observer.postClose(this, abortRequested); @@ -594,7 +603,8 @@ public class RegionCoprocessorHost */ public boolean preCompactSelection(final HStore store, final List candidates, final CompactionLifeCycleTracker tracker, final User user) throws IOException { - return execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation(user) { + return execOperation(coprocEnvironments.isEmpty()? null: + new RegionObserverOperationWithoutResult(user) { @Override public void call(RegionObserver observer) throws IOException { observer.preCompactSelection(this, store, candidates, tracker); @@ -614,7 +624,7 @@ public class RegionCoprocessorHost public void postCompactSelection(final HStore store, final List selected, final CompactionLifeCycleTracker tracker, final CompactionRequest request, final User user) throws IOException { - execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation(user) { + execOperation(coprocEnvironments.isEmpty()? null: new RegionObserverOperationWithoutResult(user) { @Override public void call(RegionObserver observer) throws IOException { observer.postCompactSelection(this, store, selected, tracker, request); @@ -635,9 +645,11 @@ public class RegionCoprocessorHost public InternalScanner preCompact(final HStore store, final InternalScanner scanner, final ScanType scanType, final CompactionLifeCycleTracker tracker, final CompactionRequest request, final User user) throws IOException { - return execOperationWithResult(false, scanner, coprocEnvironments.isEmpty() ? null : - new ObserverOperationWithResult( - regionObserverGetter, user) { + InternalScanner defaultResult = scanner; + if (coprocEnvironments.isEmpty()) return defaultResult; + return execOperationWithResult( + new ObserverOperationWithResult(regionObserverGetter, + defaultResult, user) { @Override public InternalScanner call(RegionObserver observer) throws IOException { return observer.preCompact(this, store, getResult(), scanType, tracker, request); @@ -657,7 +669,7 @@ public class RegionCoprocessorHost public void postCompact(final HStore store, final HStoreFile resultFile, final CompactionLifeCycleTracker tracker, final CompactionRequest request, final User user) throws IOException { - execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation(user) { + execOperation(coprocEnvironments.isEmpty()? null: new RegionObserverOperationWithoutResult(user) { @Override public void call(RegionObserver observer) throws IOException { observer.postCompact(this, store, resultFile, tracker, request); @@ -671,8 +683,9 @@ public class RegionCoprocessorHost */ public InternalScanner preFlush(HStore store, InternalScanner scanner, FlushLifeCycleTracker tracker) throws IOException { - return execOperationWithResult(false, scanner, coprocEnvironments.isEmpty() ? null - : new ObserverOperationWithResult(regionObserverGetter) { + if (coprocEnvironments.isEmpty()) return scanner; + return execOperationWithResult( + new ObserverOperationWithResult(regionObserverGetter, scanner) { @Override public InternalScanner call(RegionObserver observer) throws IOException { return observer.preFlush(this, store, getResult(), tracker); @@ -685,7 +698,7 @@ public class RegionCoprocessorHost * @throws IOException */ public void preFlush(FlushLifeCycleTracker tracker) throws IOException { - execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() { + execOperation(coprocEnvironments.isEmpty()? null: new RegionObserverOperationWithoutResult() { @Override public void call(RegionObserver observer) throws IOException { observer.preFlush(this, tracker); @@ -698,7 +711,7 @@ public class RegionCoprocessorHost * @throws IOException */ public void postFlush(FlushLifeCycleTracker tracker) throws IOException { - execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() { + execOperation(coprocEnvironments.isEmpty()? null: new RegionObserverOperationWithoutResult() { @Override public void call(RegionObserver observer) throws IOException { observer.postFlush(this, tracker); @@ -712,7 +725,7 @@ public class RegionCoprocessorHost */ public void postFlush(HStore store, HStoreFile storeFile, FlushLifeCycleTracker tracker) throws IOException { - execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() { + execOperation(coprocEnvironments.isEmpty()? null: new RegionObserverOperationWithoutResult() { @Override public void call(RegionObserver observer) throws IOException { observer.postFlush(this, store, storeFile, tracker); @@ -722,13 +735,16 @@ public class RegionCoprocessorHost // RegionObserver support /** + * Supports Coprocessor 'bypass'. * @param get the Get request - * @return true if default processing should be bypassed + * @param results What to return if return is true/'bypass'. + * @return true if default processing should be bypassed. * @exception IOException Exception */ public boolean preGet(final Get get, final List results) throws IOException { - return execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() { + return execOperation(coprocEnvironments.isEmpty()? null: + new RegionObserverOperationWithoutResult(true) { @Override public void call(RegionObserver observer) throws IOException { observer.preGetOp(this, get, results); @@ -738,12 +754,13 @@ public class RegionCoprocessorHost /** * @param get the Get request - * @param results the result sett + * @param results the result set * @exception IOException Exception */ public void postGet(final Get get, final List results) throws IOException { - execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() { + execOperation(coprocEnvironments.isEmpty()? null: + new RegionObserverOperationWithoutResult() { @Override public void call(RegionObserver observer) throws IOException { observer.postGetOp(this, get, results); @@ -752,14 +769,18 @@ public class RegionCoprocessorHost } /** + * Supports Coprocessor 'bypass'. * @param get the Get request - * @return true or false to return to client if bypassing normal operation, - * or null otherwise + * @return true or false to return to client if bypassing normal operation, or null otherwise * @exception IOException Exception */ public Boolean preExists(final Get get) throws IOException { - return execOperationWithResult(true, false, coprocEnvironments.isEmpty() ? null : - new ObserverOperationWithResult(regionObserverGetter) { + boolean bypassable = true; + boolean defaultResult = false; + if (coprocEnvironments.isEmpty()) return bypassable? null: false; + return execOperationWithResult( + new ObserverOperationWithResult(regionObserverGetter, + defaultResult, bypassable) { @Override public Boolean call(RegionObserver observer) throws IOException { return observer.preExists(this, get, getResult()); @@ -769,14 +790,15 @@ public class RegionCoprocessorHost /** * @param get the Get request - * @param exists the result returned by the region server + * @param result the result returned by the region server * @return the result to return to the client * @exception IOException Exception */ - public boolean postExists(final Get get, boolean exists) + public boolean postExists(final Get get, boolean result) throws IOException { - return execOperationWithResult(exists, coprocEnvironments.isEmpty() ? null : - new ObserverOperationWithResult(regionObserverGetter) { + if (this.coprocEnvironments.isEmpty()) return result; + return execOperationWithResult( + new ObserverOperationWithResult(regionObserverGetter, result) { @Override public Boolean call(RegionObserver observer) throws IOException { return observer.postExists(this, get, getResult()); @@ -785,6 +807,7 @@ public class RegionCoprocessorHost } /** + * Supports Coprocessor 'bypass'. * @param put The Put object * @param edit The WALEdit object. * @param durability The durability used @@ -793,7 +816,8 @@ public class RegionCoprocessorHost */ public boolean prePut(final Put put, final WALEdit edit, final Durability durability) throws IOException { - return execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() { + return execOperation(coprocEnvironments.isEmpty()? null: + new RegionObserverOperationWithoutResult(true) { @Override public void call(RegionObserver observer) throws IOException { observer.prePut(this, put, edit, durability); @@ -802,21 +826,24 @@ public class RegionCoprocessorHost } /** + * Supports Coprocessor 'bypass'. * @param mutation - the current mutation * @param kv - the current cell * @param byteNow - current timestamp in bytes * @param get - the get that could be used * Note that the get only does not specify the family and qualifier that should be used * @return true if default processing should be bypassed - * @exception IOException - * Exception + * @deprecated In hbase-2.0.0. Will be removed in hbase-3.0.0. Added explicitly for a single + * Coprocessor for its needs only. Will be removed. */ + @Deprecated public boolean prePrepareTimeStampForDeleteVersion(final Mutation mutation, final Cell kv, final byte[] byteNow, final Get get) throws IOException { - return execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() { + return execOperation(coprocEnvironments.isEmpty()? null: + new RegionObserverOperationWithoutResult() { @Override public void call(RegionObserver observer) throws IOException { - observer.prePrepareTimeStampForDeleteVersion(this, mutation, kv, byteNow, get); + observer.prePrepareTimeStampForDeleteVersion(this, mutation, kv, byteNow, get); } }); } @@ -829,7 +856,8 @@ public class RegionCoprocessorHost */ public void postPut(final Put put, final WALEdit edit, final Durability durability) throws IOException { - execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() { + execOperation(coprocEnvironments.isEmpty()? null: + new RegionObserverOperationWithoutResult() { @Override public void call(RegionObserver observer) throws IOException { observer.postPut(this, put, edit, durability); @@ -838,6 +866,7 @@ public class RegionCoprocessorHost } /** + * Supports Coprocessor 'bypass'. * @param delete The Delete object * @param edit The WALEdit object. * @param durability The durability used @@ -846,10 +875,11 @@ public class RegionCoprocessorHost */ public boolean preDelete(final Delete delete, final WALEdit edit, final Durability durability) throws IOException { - return execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() { + return execOperation(coprocEnvironments.isEmpty()? null: + new RegionObserverOperationWithoutResult() { @Override public void call(RegionObserver observer) throws IOException { - observer.preDelete(this, delete, edit, durability); + observer.preDelete(this, delete, edit, durability); } }); } @@ -862,7 +892,8 @@ public class RegionCoprocessorHost */ public void postDelete(final Delete delete, final WALEdit edit, final Durability durability) throws IOException { - execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() { + execOperation(coprocEnvironments.isEmpty()? null: + new RegionObserverOperationWithoutResult() { @Override public void call(RegionObserver observer) throws IOException { observer.postDelete(this, delete, edit, durability); @@ -870,14 +901,10 @@ public class RegionCoprocessorHost }); } - /** - * @param miniBatchOp - * @return true if default processing should be bypassed - * @throws IOException - */ - public boolean preBatchMutate( + public void preBatchMutate( final MiniBatchOperationInProgress miniBatchOp) throws IOException { - return execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() { + execOperation(coprocEnvironments.isEmpty()? null: + new RegionObserverOperationWithoutResult() { @Override public void call(RegionObserver observer) throws IOException { observer.preBatchMutate(this, miniBatchOp); @@ -885,13 +912,10 @@ public class RegionCoprocessorHost }); } - /** - * @param miniBatchOp - * @throws IOException - */ public void postBatchMutate( final MiniBatchOperationInProgress miniBatchOp) throws IOException { - execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() { + execOperation(coprocEnvironments.isEmpty()? null: + new RegionObserverOperationWithoutResult() { @Override public void call(RegionObserver observer) throws IOException { observer.postBatchMutate(this, miniBatchOp); @@ -902,7 +926,8 @@ public class RegionCoprocessorHost public void postBatchMutateIndispensably( final MiniBatchOperationInProgress miniBatchOp, final boolean success) throws IOException { - execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() { + execOperation(coprocEnvironments.isEmpty()? null: + new RegionObserverOperationWithoutResult() { @Override public void call(RegionObserver observer) throws IOException { observer.postBatchMutateIndispensably(this, miniBatchOp, success); @@ -911,22 +936,26 @@ public class RegionCoprocessorHost } /** + * Supports Coprocessor 'bypass'. * @param row row to check * @param family column family * @param qualifier column qualifier * @param op the comparison operation * @param comparator the comparator * @param put data to put if check succeeds - * @return true or false to return to client if default processing should - * be bypassed, or null otherwise - * @throws IOException e + * @return true or false to return to client if default processing should be bypassed, or null + * otherwise */ public Boolean preCheckAndPut(final byte [] row, final byte [] family, final byte [] qualifier, final CompareOperator op, final ByteArrayComparable comparator, final Put put) throws IOException { - return execOperationWithResult(true, false, coprocEnvironments.isEmpty() ? null : - new ObserverOperationWithResult(regionObserverGetter) { + boolean bypassable = true; + boolean defaultResult = false; + if (coprocEnvironments.isEmpty()) return bypassable? null: defaultResult; + return execOperationWithResult( + new ObserverOperationWithResult(regionObserverGetter, + defaultResult, bypassable) { @Override public Boolean call(RegionObserver observer) throws IOException { return observer.preCheckAndPut(this, row, family, qualifier, @@ -936,21 +965,25 @@ public class RegionCoprocessorHost } /** + * Supports Coprocessor 'bypass'. * @param row row to check * @param family column family * @param qualifier column qualifier * @param op the comparison operation * @param comparator the comparator * @param put data to put if check succeeds - * @return true or false to return to client if default processing should - * be bypassed, or null otherwise - * @throws IOException e + * @return true or false to return to client if default processing should be bypassed, or null + * otherwise */ public Boolean preCheckAndPutAfterRowLock( final byte[] row, final byte[] family, final byte[] qualifier, final CompareOperator op, final ByteArrayComparable comparator, final Put put) throws IOException { - return execOperationWithResult(true, false, coprocEnvironments.isEmpty() ? null : - new ObserverOperationWithResult(regionObserverGetter) { + boolean bypassable = true; + boolean defaultResult = false; + if (coprocEnvironments.isEmpty()) return bypassable? null: defaultResult; + return execOperationWithResult( + new ObserverOperationWithResult(regionObserverGetter, + defaultResult, bypassable) { @Override public Boolean call(RegionObserver observer) throws IOException { return observer.preCheckAndPutAfterRowLock(this, row, family, qualifier, @@ -972,8 +1005,9 @@ public class RegionCoprocessorHost final byte [] qualifier, final CompareOperator op, final ByteArrayComparable comparator, final Put put, boolean result) throws IOException { - return execOperationWithResult(result, coprocEnvironments.isEmpty() ? null : - new ObserverOperationWithResult(regionObserverGetter) { + if (this.coprocEnvironments.isEmpty()) return result; + return execOperationWithResult( + new ObserverOperationWithResult(regionObserverGetter, result) { @Override public Boolean call(RegionObserver observer) throws IOException { return observer.postCheckAndPut(this, row, family, qualifier, @@ -983,22 +1017,26 @@ public class RegionCoprocessorHost } /** + * Supports Coprocessor 'bypass'. * @param row row to check * @param family column family * @param qualifier column qualifier * @param op the comparison operation * @param comparator the comparator * @param delete delete to commit if check succeeds - * @return true or false to return to client if default processing should - * be bypassed, or null otherwise - * @throws IOException e + * @return true or false to return to client if default processing should be bypassed, + * or null otherwise */ public Boolean preCheckAndDelete(final byte [] row, final byte [] family, final byte [] qualifier, final CompareOperator op, final ByteArrayComparable comparator, final Delete delete) throws IOException { - return execOperationWithResult(true, false, coprocEnvironments.isEmpty() ? null : - new ObserverOperationWithResult(regionObserverGetter) { + boolean bypassable = true; + boolean defaultResult = false; + if (coprocEnvironments.isEmpty()) return bypassable? null: defaultResult; + return execOperationWithResult( + new ObserverOperationWithResult(regionObserverGetter, + defaultResult, bypassable) { @Override public Boolean call(RegionObserver observer) throws IOException { return observer.preCheckAndDelete(this, row, family, @@ -1008,21 +1046,25 @@ public class RegionCoprocessorHost } /** + * Supports Coprocessor 'bypass'. * @param row row to check * @param family column family * @param qualifier column qualifier * @param op the comparison operation * @param comparator the comparator * @param delete delete to commit if check succeeds - * @return true or false to return to client if default processing should - * be bypassed, or null otherwise - * @throws IOException e + * @return true or false to return to client if default processing should be bypassed, + * or null otherwise */ public Boolean preCheckAndDeleteAfterRowLock(final byte[] row, final byte[] family, final byte[] qualifier, final CompareOperator op, final ByteArrayComparable comparator, final Delete delete) throws IOException { - return execOperationWithResult(true, false, coprocEnvironments.isEmpty() ? null : - new ObserverOperationWithResult(regionObserverGetter) { + boolean bypassable = true; + boolean defaultResult = false; + if (coprocEnvironments.isEmpty()) return bypassable? null: defaultResult; + return execOperationWithResult( + new ObserverOperationWithResult(regionObserverGetter, + defaultResult, bypassable) { @Override public Boolean call(RegionObserver observer) throws IOException { return observer.preCheckAndDeleteAfterRowLock(this, row, @@ -1044,8 +1086,9 @@ public class RegionCoprocessorHost final byte [] qualifier, final CompareOperator op, final ByteArrayComparable comparator, final Delete delete, boolean result) throws IOException { - return execOperationWithResult(result, coprocEnvironments.isEmpty() ? null : - new ObserverOperationWithResult(regionObserverGetter) { + if (this.coprocEnvironments.isEmpty()) return result; + return execOperationWithResult( + new ObserverOperationWithResult(regionObserverGetter, result) { @Override public Boolean call(RegionObserver observer) throws IOException { return observer.postCheckAndDelete(this, row, family, @@ -1055,14 +1098,16 @@ public class RegionCoprocessorHost } /** + * Supports Coprocessor 'bypass'. * @param append append object - * @return result to return to client if default operation should be - * bypassed, null otherwise + * @return result to return to client if default operation should be bypassed, null otherwise * @throws IOException if an error occurred on the coprocessor */ public Result preAppend(final Append append) throws IOException { - return execOperationWithResult(true, null, coprocEnvironments.isEmpty() ? null : - new ObserverOperationWithResult(regionObserverGetter) { + boolean bypassable = true; + if (this.coprocEnvironments.isEmpty()) return null; + return execOperationWithResult( + new ObserverOperationWithResult(regionObserverGetter, null, bypassable) { @Override public Result call(RegionObserver observer) throws IOException { return observer.preAppend(this, append); @@ -1071,14 +1116,16 @@ public class RegionCoprocessorHost } /** + * Supports Coprocessor 'bypass'. * @param append append object - * @return result to return to client if default operation should be - * bypassed, null otherwise + * @return result to return to client if default operation should be bypassed, null otherwise * @throws IOException if an error occurred on the coprocessor */ public Result preAppendAfterRowLock(final Append append) throws IOException { - return execOperationWithResult(true, null, coprocEnvironments.isEmpty() ? null : - new ObserverOperationWithResult(regionObserverGetter) { + boolean bypassable = true; + if (this.coprocEnvironments.isEmpty()) return null; + return execOperationWithResult( + new ObserverOperationWithResult(regionObserverGetter, null, bypassable) { @Override public Result call(RegionObserver observer) throws IOException { return observer.preAppendAfterRowLock(this, append); @@ -1087,14 +1134,17 @@ public class RegionCoprocessorHost } /** + * Supports Coprocessor 'bypass'. * @param increment increment object - * @return result to return to client if default operation should be - * bypassed, null otherwise + * @return result to return to client if default operation should be bypassed, null otherwise * @throws IOException if an error occurred on the coprocessor */ public Result preIncrement(final Increment increment) throws IOException { - return execOperationWithResult(true, null, coprocEnvironments.isEmpty() ? null : - new ObserverOperationWithResult(regionObserverGetter) { + boolean bypassable = true; + if (coprocEnvironments.isEmpty()) return null; + return execOperationWithResult( + new ObserverOperationWithResult(regionObserverGetter, null, + bypassable) { @Override public Result call(RegionObserver observer) throws IOException { return observer.preIncrement(this, increment); @@ -1103,14 +1153,17 @@ public class RegionCoprocessorHost } /** + * Supports Coprocessor 'bypass'. * @param increment increment object - * @return result to return to client if default operation should be - * bypassed, null otherwise + * @return result to return to client if default operation should be bypassed, null otherwise * @throws IOException if an error occurred on the coprocessor */ public Result preIncrementAfterRowLock(final Increment increment) throws IOException { - return execOperationWithResult(true, null, coprocEnvironments.isEmpty() ? null : - new ObserverOperationWithResult(regionObserverGetter) { + boolean bypassable = true; + if (coprocEnvironments.isEmpty()) return null; + return execOperationWithResult( + new ObserverOperationWithResult(regionObserverGetter, null, + bypassable) { @Override public Result call(RegionObserver observer) throws IOException { return observer.preIncrementAfterRowLock(this, increment); @@ -1124,8 +1177,9 @@ public class RegionCoprocessorHost * @throws IOException if an error occurred on the coprocessor */ public Result postAppend(final Append append, final Result result) throws IOException { - return execOperationWithResult(result, coprocEnvironments.isEmpty() ? null : - new ObserverOperationWithResult(regionObserverGetter) { + if (this.coprocEnvironments.isEmpty()) return result; + return execOperationWithResult( + new ObserverOperationWithResult(regionObserverGetter, result) { @Override public Result call(RegionObserver observer) throws IOException { return observer.postAppend(this, append, result); @@ -1139,8 +1193,9 @@ public class RegionCoprocessorHost * @throws IOException if an error occurred on the coprocessor */ public Result postIncrement(final Increment increment, Result result) throws IOException { - return execOperationWithResult(result, coprocEnvironments.isEmpty() ? null : - new ObserverOperationWithResult(regionObserverGetter) { + if (this.coprocEnvironments.isEmpty()) return result; + return execOperationWithResult( + new ObserverOperationWithResult(regionObserverGetter, result) { @Override public Result call(RegionObserver observer) throws IOException { return observer.postIncrement(this, increment, getResult()); @@ -1153,7 +1208,7 @@ public class RegionCoprocessorHost * @exception IOException Exception */ public void preScannerOpen(final Scan scan) throws IOException { - execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() { + execOperation(coprocEnvironments.isEmpty()? null: new RegionObserverOperationWithoutResult() { @Override public void call(RegionObserver observer) throws IOException { observer.preScannerOpen(this, scan); @@ -1168,8 +1223,9 @@ public class RegionCoprocessorHost * @exception IOException Exception */ public RegionScanner postScannerOpen(final Scan scan, RegionScanner s) throws IOException { - return execOperationWithResult(s, coprocEnvironments.isEmpty() ? null : - new ObserverOperationWithResult(regionObserverGetter) { + if (this.coprocEnvironments.isEmpty()) return s; + return execOperationWithResult( + new ObserverOperationWithResult(regionObserverGetter, s) { @Override public RegionScanner call(RegionObserver observer) throws IOException { return observer.postScannerOpen(this, scan, getResult()); @@ -1181,14 +1237,17 @@ public class RegionCoprocessorHost * @param s the scanner * @param results the result set returned by the region server * @param limit the maximum number of results to return - * @return 'has next' indication to client if bypassing default behavior, or - * null otherwise + * @return 'has next' indication to client if bypassing default behavior, or null otherwise * @exception IOException Exception */ public Boolean preScannerNext(final InternalScanner s, final List results, final int limit) throws IOException { - return execOperationWithResult(true, false, coprocEnvironments.isEmpty() ? null : - new ObserverOperationWithResult(regionObserverGetter) { + boolean bypassable = true; + boolean defaultResult = false; + if (coprocEnvironments.isEmpty()) return bypassable? null: defaultResult; + return execOperationWithResult( + new ObserverOperationWithResult(regionObserverGetter, + defaultResult, bypassable) { @Override public Boolean call(RegionObserver observer) throws IOException { return observer.preScannerNext(this, s, results, limit, getResult()); @@ -1207,8 +1266,9 @@ public class RegionCoprocessorHost public boolean postScannerNext(final InternalScanner s, final List results, final int limit, boolean hasMore) throws IOException { - return execOperationWithResult(hasMore, coprocEnvironments.isEmpty() ? null : - new ObserverOperationWithResult(regionObserverGetter) { + if (this.coprocEnvironments.isEmpty()) return hasMore; + return execOperationWithResult( + new ObserverOperationWithResult(regionObserverGetter, hasMore) { @Override public Boolean call(RegionObserver observer) throws IOException { return observer.postScannerNext(this, s, results, limit, getResult()); @@ -1227,9 +1287,11 @@ public class RegionCoprocessorHost public boolean postScannerFilterRow(final InternalScanner s, final Cell curRowCell) throws IOException { // short circuit for performance - if (!hasCustomPostScannerFilterRow) return true; - return execOperationWithResult(true, coprocEnvironments.isEmpty() ? null : - new ObserverOperationWithResult(regionObserverGetter) { + boolean defaultResult = true; + if (!hasCustomPostScannerFilterRow) return defaultResult; + if (this.coprocEnvironments.isEmpty()) return defaultResult; + return execOperationWithResult( + new ObserverOperationWithResult(regionObserverGetter, defaultResult) { @Override public Boolean call(RegionObserver observer) throws IOException { return observer.postScannerFilterRow(this, s, curRowCell, getResult()); @@ -1238,12 +1300,15 @@ public class RegionCoprocessorHost } /** + * Supports Coprocessor 'bypass'. * @param s the scanner * @return true if default behavior should be bypassed, false otherwise * @exception IOException Exception */ + // Should this be bypassable? public boolean preScannerClose(final InternalScanner s) throws IOException { - return execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() { + return execOperation(coprocEnvironments.isEmpty()? null: + new RegionObserverOperationWithoutResult(true) { @Override public void call(RegionObserver observer) throws IOException { observer.preScannerClose(this, s); @@ -1255,7 +1320,8 @@ public class RegionCoprocessorHost * @exception IOException Exception */ public void postScannerClose(final InternalScanner s) throws IOException { - execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() { + execOperation(coprocEnvironments.isEmpty()? null: + new RegionObserverOperationWithoutResult() { @Override public void call(RegionObserver observer) throws IOException { observer.postScannerClose(this, s); @@ -1264,12 +1330,14 @@ public class RegionCoprocessorHost } /** + * Supports Coprocessor 'bypass'. * @param info the RegionInfo for this region * @param edits the file of recovered edits - * @throws IOException Exception + * @return true if default behavior should be bypassed, false otherwise */ - public void preReplayWALs(final RegionInfo info, final Path edits) throws IOException { - execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() { + public boolean preReplayWALs(final RegionInfo info, final Path edits) throws IOException { + return execOperation(coprocEnvironments.isEmpty()? null: + new RegionObserverOperationWithoutResult(true) { @Override public void call(RegionObserver observer) throws IOException { observer.preReplayWALs(this, info, edits); @@ -1283,7 +1351,8 @@ public class RegionCoprocessorHost * @throws IOException Exception */ public void postReplayWALs(final RegionInfo info, final Path edits) throws IOException { - execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() { + execOperation(coprocEnvironments.isEmpty()? null: + new RegionObserverOperationWithoutResult() { @Override public void call(RegionObserver observer) throws IOException { observer.postReplayWALs(this, info, edits); @@ -1292,15 +1361,16 @@ public class RegionCoprocessorHost } /** - * @param info - * @param logKey - * @param logEdit + * Supports Coprocessor 'bypass'. * @return true if default behavior should be bypassed, false otherwise - * @throws IOException + * @deprecated Since hbase-2.0.0. No replacement. To be removed in hbase-3.0.0 and replaced + * with something that doesn't expose IntefaceAudience.Private classes. */ - public boolean preWALRestore(final RegionInfo info, final WALKey logKey, - final WALEdit logEdit) throws IOException { - return execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() { + @Deprecated + public boolean preWALRestore(final RegionInfo info, final WALKey logKey, final WALEdit logEdit) + throws IOException { + return execOperation(coprocEnvironments.isEmpty()? null: + new RegionObserverOperationWithoutResult(true) { @Override public void call(RegionObserver observer) throws IOException { observer.preWALRestore(this, info, logKey, logEdit); @@ -1309,14 +1379,14 @@ public class RegionCoprocessorHost } /** - * @param info - * @param logKey - * @param logEdit - * @throws IOException + * @deprecated Since hbase-2.0.0. No replacement. To be removed in hbase-3.0.0 and replaced + * with something that doesn't expose IntefaceAudience.Private classes. */ + @Deprecated public void postWALRestore(final RegionInfo info, final WALKey logKey, final WALEdit logEdit) throws IOException { - execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() { + execOperation(coprocEnvironments.isEmpty()? null: + new RegionObserverOperationWithoutResult() { @Override public void call(RegionObserver observer) throws IOException { observer.postWALRestore(this, info, logKey, logEdit); @@ -1326,11 +1396,9 @@ public class RegionCoprocessorHost /** * @param familyPaths pairs of { CF, file path } submitted for bulk load - * @return true if the default operation should be bypassed - * @throws IOException */ - public boolean preBulkLoadHFile(final List> familyPaths) throws IOException { - return execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() { + public void preBulkLoadHFile(final List> familyPaths) throws IOException { + execOperation(coprocEnvironments.isEmpty()? null: new RegionObserverOperationWithoutResult() { @Override public void call(RegionObserver observer) throws IOException { observer.preBulkLoadHFile(this, familyPaths); @@ -1340,7 +1408,8 @@ public class RegionCoprocessorHost public boolean preCommitStoreFile(final byte[] family, final List> pairs) throws IOException { - return execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() { + return execOperation(coprocEnvironments.isEmpty()? null: + new RegionObserverOperationWithoutResult() { @Override public void call(RegionObserver observer) throws IOException { observer.preCommitStoreFile(this, family, pairs); @@ -1348,7 +1417,8 @@ public class RegionCoprocessorHost }); } public void postCommitStoreFile(final byte[] family, Path srcPath, Path dstPath) throws IOException { - execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() { + execOperation(coprocEnvironments.isEmpty()? null: + new RegionObserverOperationWithoutResult() { @Override public void call(RegionObserver observer) throws IOException { observer.postCommitStoreFile(this, family, srcPath, dstPath); @@ -1359,14 +1429,15 @@ public class RegionCoprocessorHost /** * @param familyPaths pairs of { CF, file path } submitted for bulk load * @param map Map of CF to List of file paths for the final loaded files - * @param hasLoaded whether load was successful or not + * @param result whether load was successful or not * @return the possibly modified value of hasLoaded * @throws IOException */ public boolean postBulkLoadHFile(final List> familyPaths, - Map> map, boolean hasLoaded) throws IOException { - return execOperationWithResult(hasLoaded, coprocEnvironments.isEmpty() ? null : - new ObserverOperationWithResult(regionObserverGetter) { + Map> map, boolean result) throws IOException { + if (this.coprocEnvironments.isEmpty()) return result; + return execOperationWithResult( + new ObserverOperationWithResult(regionObserverGetter, result) { @Override public Boolean call(RegionObserver observer) throws IOException { return observer.postBulkLoadHFile(this, familyPaths, map, getResult()); @@ -1375,7 +1446,8 @@ public class RegionCoprocessorHost } public void postStartRegionOperation(final Operation op) throws IOException { - execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() { + execOperation(coprocEnvironments.isEmpty()? null: + new RegionObserverOperationWithoutResult() { @Override public void call(RegionObserver observer) throws IOException { observer.postStartRegionOperation(this, op); @@ -1384,7 +1456,8 @@ public class RegionCoprocessorHost } public void postCloseRegionOperation(final Operation op) throws IOException { - execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() { + execOperation(coprocEnvironments.isEmpty()? null: + new RegionObserverOperationWithoutResult() { @Override public void call(RegionObserver observer) throws IOException { observer.postCloseRegionOperation(this, op); @@ -1406,8 +1479,9 @@ public class RegionCoprocessorHost public StoreFileReader preStoreFileReaderOpen(final FileSystem fs, final Path p, final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf, final Reference r) throws IOException { - return execOperationWithResult(null, coprocEnvironments.isEmpty() ? null : - new ObserverOperationWithResult(regionObserverGetter) { + if (coprocEnvironments.isEmpty()) return null; + return execOperationWithResult( + new ObserverOperationWithResult(regionObserverGetter, null) { @Override public StoreFileReader call(RegionObserver observer) throws IOException { return observer.preStoreFileReaderOpen(this, fs, p, in, size, cacheConf, r, @@ -1430,8 +1504,9 @@ public class RegionCoprocessorHost public StoreFileReader postStoreFileReaderOpen(final FileSystem fs, final Path p, final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf, final Reference r, final StoreFileReader reader) throws IOException { - return execOperationWithResult(reader, coprocEnvironments.isEmpty() ? null : - new ObserverOperationWithResult(regionObserverGetter) { + if (this.coprocEnvironments.isEmpty()) return reader; + return execOperationWithResult( + new ObserverOperationWithResult(regionObserverGetter, reader) { @Override public StoreFileReader call(RegionObserver observer) throws IOException { return observer.postStoreFileReaderOpen(this, fs, p, in, size, cacheConf, r, @@ -1442,8 +1517,9 @@ public class RegionCoprocessorHost public Cell postMutationBeforeWAL(final MutationType opType, final Mutation mutation, final Cell oldCell, Cell newCell) throws IOException { - return execOperationWithResult(newCell, coprocEnvironments.isEmpty() ? null : - new ObserverOperationWithResult(regionObserverGetter) { + if (this.coprocEnvironments.isEmpty()) return newCell; + return execOperationWithResult( + new ObserverOperationWithResult(regionObserverGetter, newCell) { @Override public Cell call(RegionObserver observer) throws IOException { return observer.postMutationBeforeWAL(this, opType, mutation, oldCell, getResult()); @@ -1453,8 +1529,9 @@ public class RegionCoprocessorHost public Message preEndpointInvocation(final Service service, final String methodName, Message request) throws IOException { - return execOperationWithResult(request, coprocEnvironments.isEmpty() ? null : - new ObserverOperationWithResult(endpointObserverGetter) { + if (coprocEnvironments.isEmpty()) return request; + return execOperationWithResult( + new ObserverOperationWithResult(endpointObserverGetter, request) { @Override public Message call(EndpointObserver observer) throws IOException { return observer.preEndpointInvocation(this, service, methodName, getResult()); @@ -1473,9 +1550,14 @@ public class RegionCoprocessorHost }); } - public DeleteTracker postInstantiateDeleteTracker(DeleteTracker tracker) throws IOException { - return execOperationWithResult(tracker, coprocEnvironments.isEmpty() ? null : - new ObserverOperationWithResult(regionObserverGetter) { + /** + * @deprecated Since 2.0 with out any replacement and will be removed in 3.0 + */ + @Deprecated + public DeleteTracker postInstantiateDeleteTracker(DeleteTracker result) throws IOException { + if (this.coprocEnvironments.isEmpty()) return result; + return execOperationWithResult( + new ObserverOperationWithResult(regionObserverGetter, result) { @Override public DeleteTracker call(RegionObserver observer) throws IOException { return observer.postInstantiateDeleteTracker(this, getResult()); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java index 27a3e201ea..f0cfbde971 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java @@ -169,9 +169,10 @@ public class RegionServerCoprocessorHost extends public ReplicationEndpoint postCreateReplicationEndPoint(final ReplicationEndpoint endpoint) throws IOException { - return execOperationWithResult(endpoint, coprocEnvironments.isEmpty() ? null : + if (this.coprocEnvironments.isEmpty()) return endpoint; + return execOperationWithResult( new ObserverOperationWithResult( - rsObserverGetter) { + rsObserverGetter, endpoint) { @Override public ReplicationEndpoint call(RegionServerObserver observer) throws IOException { return observer.postCreateReplicationEndPoint(this, getResult()); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java index 653ec75394..6ce44fe859 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java @@ -192,57 +192,54 @@ public class SecureBulkLoadManager { throw new DoNotRetryIOException("User token cannot be null"); } - boolean bypass = false; if (region.getCoprocessorHost() != null) { - bypass = region.getCoprocessorHost().preBulkLoadHFile(familyPaths); + region.getCoprocessorHost().preBulkLoadHFile(familyPaths); } boolean loaded = false; Map> map = null; try { - if (!bypass) { - // Get the target fs (HBase region server fs) delegation token - // Since we have checked the permission via 'preBulkLoadHFile', now let's give - // the 'request user' necessary token to operate on the target fs. - // After this point the 'doAs' user will hold two tokens, one for the source fs - // ('request user'), another for the target fs (HBase region server principal). - if (userProvider.isHadoopSecurityEnabled()) { - FsDelegationToken targetfsDelegationToken = new FsDelegationToken(userProvider,"renewer"); - targetfsDelegationToken.acquireDelegationToken(fs); - - Token targetFsToken = targetfsDelegationToken.getUserToken(); - if (targetFsToken != null - && (userToken == null || !targetFsToken.getService().equals(userToken.getService()))){ - ugi.addToken(targetFsToken); - } + // Get the target fs (HBase region server fs) delegation token + // Since we have checked the permission via 'preBulkLoadHFile', now let's give + // the 'request user' necessary token to operate on the target fs. + // After this point the 'doAs' user will hold two tokens, one for the source fs + // ('request user'), another for the target fs (HBase region server principal). + if (userProvider.isHadoopSecurityEnabled()) { + FsDelegationToken targetfsDelegationToken = new FsDelegationToken(userProvider,"renewer"); + targetfsDelegationToken.acquireDelegationToken(fs); + + Token targetFsToken = targetfsDelegationToken.getUserToken(); + if (targetFsToken != null + && (userToken == null || !targetFsToken.getService().equals(userToken.getService()))){ + ugi.addToken(targetFsToken); } + } - map = ugi.doAs(new PrivilegedAction>>() { - @Override - public Map> run() { - FileSystem fs = null; - try { - fs = FileSystem.get(conf); - for(Pair el: familyPaths) { - Path stageFamily = new Path(bulkToken, Bytes.toString(el.getFirst())); - if(!fs.exists(stageFamily)) { - fs.mkdirs(stageFamily); - fs.setPermission(stageFamily, PERM_ALL_ACCESS); - } + map = ugi.doAs(new PrivilegedAction>>() { + @Override + public Map> run() { + FileSystem fs = null; + try { + fs = FileSystem.get(conf); + for(Pair el: familyPaths) { + Path stageFamily = new Path(bulkToken, Bytes.toString(el.getFirst())); + if(!fs.exists(stageFamily)) { + fs.mkdirs(stageFamily); + fs.setPermission(stageFamily, PERM_ALL_ACCESS); } - //We call bulkLoadHFiles as requesting user - //To enable access prior to staging - return region.bulkLoadHFiles(familyPaths, true, - new SecureBulkLoadListener(fs, bulkToken, conf), request.getCopyFile()); - } catch (Exception e) { - LOG.error("Failed to complete bulk load", e); } - return null; + //We call bulkLoadHFiles as requesting user + //To enable access prior to staging + return region.bulkLoadHFiles(familyPaths, true, + new SecureBulkLoadListener(fs, bulkToken, conf), request.getCopyFile()); + } catch (Exception e) { + LOG.error("Failed to complete bulk load", e); } - }); - if (map != null) { - loaded = true; + return null; } + }); + if (map != null) { + loaded = true; } } finally { if (region.getCoprocessorHost() != null) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java index 34f93fa9e2..ed3dde63d0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java @@ -149,8 +149,9 @@ public class WALCoprocessorHost */ public boolean preWALWrite(final RegionInfo info, final WALKey logKey, final WALEdit logEdit) throws IOException { - return execOperationWithResult(false, coprocEnvironments.isEmpty() ? null : - new ObserverOperationWithResult(walObserverGetter) { + if (this.coprocEnvironments.isEmpty()) return false; + return execOperationWithResult( + new ObserverOperationWithResult(walObserverGetter, false) { @Override public Boolean call(WALObserver oserver) throws IOException { return oserver.preWALWrite(this, info, logKey, logEdit); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 4499cd569d..ce8a224d8c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -169,6 +169,7 @@ import org.junit.rules.TestName; import org.junit.rules.TestRule; import org.mockito.ArgumentCaptor; import org.mockito.ArgumentMatcher; +import org.mockito.Matchers; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -359,7 +360,7 @@ public class TestHRegion { /** * Create a WAL outside of the usual helper in - * {@link HBaseTestingUtility#createWal(Configuration, Path, HRegionInfo)} because that method + * {@link HBaseTestingUtility#createWal(Configuration, Path, RegionInfo)} because that method * doesn't play nicely with FaultyFileSystem. Call this method before overriding * {@code fs.file.impl}. * @param callingMethod a unique component for the path, probably the name of the test method. @@ -2384,6 +2385,9 @@ public class TestHRegion { FileSystem fs = FileSystem.get(CONF); Path rootDir = new Path(dir + "testDataInMemoryWithoutWAL"); FSHLog hLog = new FSHLog(fs, rootDir, "testDataInMemoryWithoutWAL", CONF); + // This chunk creation is done throughout the code base. Do we want to move it into core? + // It is missing from this test. W/o it we NPE. + ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); HRegion region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, hLog, COLUMN_FAMILY_BYTES); @@ -2431,17 +2435,17 @@ public class TestHRegion { // save normalCPHost and replaced by mockedCPHost RegionCoprocessorHost normalCPHost = region.getCoprocessorHost(); RegionCoprocessorHost mockedCPHost = Mockito.mock(RegionCoprocessorHost.class); - Answer answer = new Answer() { + // Because the preBatchMutate returns void, we can't do usual Mockito when...then form. Must + // do below format (from Mockito doc). + Mockito.doAnswer(new Answer() { @Override - public Boolean answer(InvocationOnMock invocation) throws Throwable { + public Object answer(InvocationOnMock invocation) throws Throwable { MiniBatchOperationInProgress mb = invocation.getArgumentAt(0, MiniBatchOperationInProgress.class); mb.addOperationsFromCP(0, new Mutation[]{addPut}); - return false; + return null; } - }; - when(mockedCPHost.preBatchMutate(Mockito.isA(MiniBatchOperationInProgress.class))) - .then(answer); + }).when(mockedCPHost).preBatchMutate(Mockito.isA(MiniBatchOperationInProgress.class)); region.setCoprocessorHost(mockedCPHost); region.put(originalPut); region.setCoprocessorHost(normalCPHost); -- 2.11.0 (Apple Git-81)