commit ad9f0e4d8f980ca80c71a50553447f59a141e263 Author: Yu Li Date: Thu Dec 1 14:45:12 2016 +0800 HBASE-17114 Add an option to set special retry pause when encountering CallQueueTooBigException diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index b0652a7..6cb6115 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -47,6 +47,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.AsyncProcess.RowChecker.ReturnCode; +import org.apache.hadoop.hbase.CallQueueTooBigException; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; @@ -257,6 +258,7 @@ class AsyncProcess { */ protected final int maxConcurrentTasksPerServer; protected final long pause; + protected final long pauseForCQTBE;// pause for CallQueueTooBigException, if specified protected int numTries; protected int serverTrackerTimeout; protected int rpcTimeout; @@ -321,6 +323,15 @@ class AsyncProcess { this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, HConstants.DEFAULT_HBASE_CLIENT_PAUSE); + long configuredPauseForCQTBE = conf.getLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, pause); + if (configuredPauseForCQTBE < pause) { + LOG.warn("The " + HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE + " setting: " + + configuredPauseForCQTBE + " is smaller than " + HConstants.HBASE_CLIENT_PAUSE + + ", will use " + pause + " instead."); + this.pauseForCQTBE = pause; + } else { + this.pauseForCQTBE = configuredPauseForCQTBE; + } this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); this.rpcTimeout = rpcTimeout; @@ -1313,8 +1324,15 @@ class AsyncProcess { // we go for one. boolean retryImmediately = throwable instanceof RetryImmediatelyException; int nextAttemptNumber = retryImmediately ? numAttempt : numAttempt + 1; - long backOffTime = retryImmediately ? 0 : - errorsByServer.calculateBackoffTime(oldServer, pause); + long backOffTime; + if (retryImmediately) { + backOffTime = 0; + } else if (throwable instanceof CallQueueTooBigException) { + // Give a special check on CQTBE, see #HBASE-17114 + backOffTime = errorsByServer.calculateBackoffTime(oldServer, pauseForCQTBE); + } else { + backOffTime = errorsByServer.calculateBackoffTime(oldServer, pause); + } if (numAttempt > startLogErrorsCnt) { // We use this value to have some logs when we have multiple failures, but not too many // logs, as errors are to be expected when a region moves, splits and so on diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java index 721ee2f..5f5badf 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java @@ -48,6 +48,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CallQueueTooBigException; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; @@ -557,6 +558,7 @@ class ConnectionManager { static final Log LOG = LogFactory.getLog(HConnectionImplementation.class); private final boolean hostnamesCanChange; private final long pause; + private final long pauseForCQTBE;// pause for CallQueueTooBigException, if specified private final boolean useMetaReplicas; private final int numTries; final int rpcTimeout; @@ -649,6 +651,15 @@ class ConnectionManager { this.closed = false; this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, HConstants.DEFAULT_HBASE_CLIENT_PAUSE); + long configuredPauseForCQTBE = conf.getLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, pause); + if (configuredPauseForCQTBE < pause) { + LOG.warn("The " + HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE + " setting: " + + configuredPauseForCQTBE + " is smaller than " + HConstants.HBASE_CLIENT_PAUSE + + ", will use " + pause + " instead."); + this.pauseForCQTBE = pause; + } else { + this.pauseForCQTBE = configuredPauseForCQTBE; + } this.useMetaReplicas = conf.getBoolean(HConstants.USE_META_REPLICAS, HConstants.DEFAULT_USE_META_REPLICAS); this.numTries = connectionConfig.getRetriesNumber(); @@ -1272,6 +1283,7 @@ class ConnectionManager { } // Query the meta region + long pauseBase = this.pause; try { Result regionInfoRow = null; ReversedClientScanner rcs = null; @@ -1346,13 +1358,17 @@ class ConnectionManager { if (e instanceof RemoteException) { e = ((RemoteException)e).unwrapRemoteException(); } + if (e instanceof CallQueueTooBigException) { + // Give a special check on CallQueueTooBigException, see #HBASE-17114 + pauseBase = this.pauseForCQTBE; + } if (tries < localNumRetries - 1) { if (LOG.isDebugEnabled()) { LOG.debug("locateRegionInMeta parentTable=" + TableName.META_TABLE_NAME + ", metaLocation=" + ", attempt=" + tries + " of " + localNumRetries + " failed; retrying after sleep of " + - ConnectionUtils.getPauseTime(this.pause, tries) + " because: " + e.getMessage()); + ConnectionUtils.getPauseTime(pauseBase, tries) + " because: " + e.getMessage()); } } else { throw e; @@ -1364,7 +1380,7 @@ class ConnectionManager { } } try{ - Thread.sleep(ConnectionUtils.getPauseTime(this.pause, tries)); + Thread.sleep(ConnectionUtils.getPauseTime(pauseBase, tries)); } catch (InterruptedException e) { throw new InterruptedIOException("Giving up trying to location region in " + "meta: thread is interrupted."); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java index 6f587d1..b03595a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java @@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.CallQueueTooBigException; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -62,19 +63,22 @@ public class RpcRetryingCaller { private final int startLogErrorsCnt; private final long pause; + private final long pauseForCQTBE; private final int retries; private final int rpcTimeout;// timeout for each rpc request private final AtomicBoolean cancelled = new AtomicBoolean(false); private final RetryingCallerInterceptor interceptor; private final RetryingCallerInterceptorContext context; - public RpcRetryingCaller(long pause, int retries, int startLogErrorsCnt) { - this(pause, retries, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, startLogErrorsCnt, 0); + public RpcRetryingCaller(long pause, long pauseForCQTBE, int retries, int startLogErrorsCnt) { + this(pause, pauseForCQTBE, retries, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, + startLogErrorsCnt, 0); } - public RpcRetryingCaller(long pause, int retries, + public RpcRetryingCaller(long pause, long pauseForCQTBE, int retries, RetryingCallerInterceptor interceptor, int startLogErrorsCnt, int rpcTimeout) { this.pause = pause; + this.pauseForCQTBE = pauseForCQTBE; this.retries = retries; this.interceptor = interceptor; context = interceptor.createEmptyContext(); @@ -159,9 +163,11 @@ public class RpcRetryingCaller { throw new RetriesExhaustedException(tries, exceptions); } // If the server is dead, we need to wait a little before retrying, to give - // a chance to the regions to be - // get right pause time, start by RETRY_BACKOFF[0] * pause - expectedSleep = callable.sleep(pause, tries); + // a chance to the regions to be moved + // get right pause time, start by RETRY_BACKOFF[0] * pauseBase, where pauseBase might be + // special when encountering CallQueueTooBigException, see #HBASE-17114 + long pauseBase = (t instanceof CallQueueTooBigException) ? pauseForCQTBE : pause; + expectedSleep = callable.sleep(pauseBase, tries); // If, after the planned sleep, there won't be enough time left, we stop now. long duration = singleCallDuration(expectedSleep); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java index 6273624..a2551d9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java @@ -17,9 +17,11 @@ */ package org.apache.hadoop.hbase.client; -import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.util.ReflectionUtils; /** @@ -30,8 +32,10 @@ public class RpcRetryingCallerFactory { /** Configuration key for a custom {@link RpcRetryingCaller} */ public static final String CUSTOM_CALLER_CONF_KEY = "hbase.rpc.callerfactory.class"; + private static final Log LOG = LogFactory.getLog(RpcRetryingCallerFactory.class); protected final Configuration conf; private final long pause; + private final long pauseForCQTBE;// pause for CallQueueTooBigException, if specified private final int retries; private final int rpcTimeout; private final RetryingCallerInterceptor interceptor; @@ -47,6 +51,15 @@ public class RpcRetryingCallerFactory { this.conf = conf; pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, HConstants.DEFAULT_HBASE_CLIENT_PAUSE); + long configuredPauseForCQTBE = conf.getLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, pause); + if (configuredPauseForCQTBE < pause) { + LOG.warn("The " + HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE + " setting: " + + configuredPauseForCQTBE + " is smaller than " + HConstants.HBASE_CLIENT_PAUSE + + ", will use " + pause + " instead."); + this.pauseForCQTBE = pause; + } else { + this.pauseForCQTBE = configuredPauseForCQTBE; + } retries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); startLogErrorsCnt = conf.getInt(AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY, @@ -70,8 +83,8 @@ public class RpcRetryingCallerFactory { public RpcRetryingCaller newCaller(int rpcTimeout) { // We store the values in the factory instance. This way, constructing new objects // is cheap as it does not require parsing a complex structure. - RpcRetryingCaller caller = new RpcRetryingCaller(pause, retries, interceptor, - startLogErrorsCnt, rpcTimeout); + RpcRetryingCaller caller = new RpcRetryingCaller(pause, pauseForCQTBE, retries, + interceptor, startLogErrorsCnt, rpcTimeout); return caller; } @@ -81,8 +94,8 @@ public class RpcRetryingCallerFactory { public RpcRetryingCaller newCaller() { // We store the values in the factory instance. This way, constructing new objects // is cheap as it does not require parsing a complex structure. - RpcRetryingCaller caller = new RpcRetryingCaller(pause, retries, interceptor, - startLogErrorsCnt, rpcTimeout); + RpcRetryingCaller caller = new RpcRetryingCaller(pause, pauseForCQTBE, retries, + interceptor, startLogErrorsCnt, rpcTimeout); return caller; } diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java index 3b7f395..027d362 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java @@ -238,7 +238,7 @@ public class TestAsyncProcess { } }); - return new RpcRetryingCaller(100, 10, 9) { + return new RpcRetryingCaller(100, 500, 10, 9) { @Override public MultiResponse callWithoutRetries(RetryingCallable callable, int callTimeout) @@ -261,7 +261,7 @@ public class TestAsyncProcess { private final IOException e; public CallerWithFailure(IOException e) { - super(100, 100, 9); + super(100, 500, 100, 9); this.e = e; } @@ -366,7 +366,7 @@ public class TestAsyncProcess { replicaCalls.incrementAndGet(); } - return new RpcRetryingCaller(100, 10, 9) { + return new RpcRetryingCaller(100, 500, 10, 9) { @Override public MultiResponse callWithoutRetries(RetryingCallable callable, int callTimeout) @@ -1695,4 +1695,68 @@ public class TestAsyncProcess { } t.join(); } + + /** + * Test and make sure we could use a special pause setting when retry with + * CallQueueTooBigException, see HBASE-17114 + * @throws Exception if unexpected error happened during test + */ + @Test + public void testRetryPauseWithCallQueueTooBigException() throws Exception { + Configuration myConf = new Configuration(conf); + final long specialPause = 500L; + final int tries = 2; + myConf.setLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, specialPause); + myConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, tries); + ClusterConnection conn = new MyConnectionImpl(myConf); + BufferedMutatorImpl mutator = (BufferedMutatorImpl) conn.getBufferedMutator(DUMMY_TABLE); + AsyncProcessWithFailure ap = + new AsyncProcessWithFailure(conn, myConf, new CallQueueTooBigException()); + mutator.ap = ap; + + Assert.assertNotNull(mutator.ap.createServerErrorTracker()); + + Put p = createPut(1, true); + mutator.mutate(p); + + long startTime = System.currentTimeMillis(); + try { + mutator.flush(); + Assert.fail(); + } catch (RetriesExhaustedWithDetailsException expected) { + } + long actualSleep = System.currentTimeMillis() - startTime; + long expectedSleep = 0L; + for (int i = 0; i < tries - 1; i++) { + expectedSleep += ConnectionUtils.getPauseTime(specialPause, i); + // Prevent jitter in CollectionUtils#getPauseTime to affect result + actualSleep += (long) (specialPause * 0.01f); + } + LOG.debug("Expected to sleep " + expectedSleep + "ms, actually slept " + actualSleep + "ms"); + Assert.assertTrue("Expected to sleep " + expectedSleep + " but actually " + actualSleep + "ms", + actualSleep >= expectedSleep); + + // check and confirm normal IOE will use the normal pause + final long normalPause = + myConf.getLong(HConstants.HBASE_CLIENT_PAUSE, HConstants.DEFAULT_HBASE_CLIENT_PAUSE); + ap = new AsyncProcessWithFailure(conn, myConf, new IOException()); + mutator.ap = ap; + Assert.assertNotNull(mutator.ap.createServerErrorTracker()); + mutator.mutate(p); + startTime = System.currentTimeMillis(); + try { + mutator.flush(); + Assert.fail(); + } catch (RetriesExhaustedWithDetailsException expected) { + } + actualSleep = System.currentTimeMillis() - startTime; + expectedSleep = 0L; + for (int i = 0; i < tries - 1; i++) { + expectedSleep += ConnectionUtils.getPauseTime(normalPause, i); + } + // plus an additional pause to balance the program execution time + expectedSleep += normalPause; + LOG.debug("Expected to sleep " + expectedSleep + "ms, actually slept " + actualSleep + "ms"); + Assert.assertTrue("Slept for too long: " + actualSleep + "ms", actualSleep <= expectedSleep); + } } diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java index a6c8685..7fb18d7 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java @@ -532,7 +532,7 @@ public class TestClientScanner { @Override public RpcRetryingCaller newCaller() { - return new RpcRetryingCaller(0, 0, 0) { + return new RpcRetryingCaller(0, 0, 0, 0) { @Override public void cancel() { } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index ad68fb1..4606b03 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -713,6 +713,11 @@ public final class HConstants { */ public static final long DEFAULT_HBASE_CLIENT_PAUSE = 100; + /* + * Parameter name for client pause value for special case such as call queue too big, etc. + */ + public static final String HBASE_CLIENT_PAUSE_FOR_CQTBE = "hbase.client.pause.cqtbe"; + /** * The maximum number of concurrent connections the client will maintain. */ diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml index 46a4050..c96bc35 100644 --- a/hbase-common/src/main/resources/hbase-default.xml +++ b/hbase-common/src/main/resources/hbase-default.xml @@ -481,6 +481,14 @@ possible configurations would overwhelm and obscure the important. this initial pause amount and how this pause works w/ retries. + hbase.client.pause.cqtbe + + Whether or not to use a special client pause for + CallQueueTooBigException (cqtbe). Set this property to a higher value + than hbase.client.pause if you observe frequent CQTBE from the same + RegionServer and the call queue there keeps full + + hbase.client.retries.number 35 Maximum retries. Used as maximum for all retryable