diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
index e653c80..4bde70a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
@@ -210,6 +210,7 @@ class AsyncProcess {
    */
   protected final int maxConcurrentTasksPerServer;
   protected final long pause;
+  protected final long specialPause;
   protected int numTries;
   protected int serverTrackerTimeout;
   protected int rpcTimeout;
@@ -234,6 +235,8 @@ class AsyncProcess {
 
     this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
         HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
+    this.specialPause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE_SPECIAL_CASE,
+        HConstants.DEFAULT_HBASE_CLIENT_PAUSE_SPECIAL_CASE);
     // how many times we could try in total, one more than retry number
     this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
         HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) + 1;
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
index 6b6b99a..b865142 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
@@ -40,6 +40,7 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.CallQueueTooBigException;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
@@ -788,8 +789,15 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture {
     // we go for one.
     boolean retryImmediately = throwable instanceof RetryImmediatelyException;
     int nextAttemptNumber = retryImmediately ? numAttempt : numAttempt + 1;
-    long backOffTime = retryImmediately ? 0 :
-        errorsByServer.calculateBackoffTime(oldServer, asyncProcess.pause);
+    long backOffTime;
+    if (retryImmediately) {
+      backOffTime = 0;
+    } else if (throwable instanceof CallQueueTooBigException) {
+      // use a special pause for CQTBE, see #HBASE-17114
+      backOffTime = errorsByServer.calculateBackoffTime(oldServer, asyncProcess.specialPause);
+    } else {
+      backOffTime = errorsByServer.calculateBackoffTime(oldServer, asyncProcess.pause);
+    }
     if (numAttempt > asyncProcess.startLogErrorsCnt) {
       // We use this value to have some logs when we have multiple failures, but not too many
       // logs, as errors are to be expected when a region moves, splits and so on
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
index 53eb522..1b3cb4b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
@@ -44,6 +44,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CallQueueTooBigException;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
@@ -114,6 +115,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
   private final boolean hostnamesCanChange;
   private final long pause;
+  private final long specialPause;
   private final boolean useMetaReplicas;
   private final int numTries;
   final int rpcTimeout;
@@ -193,6 +195,8 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
     this.closed = false;
     this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
         HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
+    this.specialPause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE_SPECIAL_CASE,
+        HConstants.DEFAULT_HBASE_CLIENT_PAUSE_SPECIAL_CASE);
     this.useMetaReplicas = conf.getBoolean(HConstants.USE_META_REPLICAS,
         HConstants.DEFAULT_USE_META_REPLICAS);
     // how many times to try, one more than max *retry* time
@@ -751,6 +755,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
       }
 
       // Query the meta region
+      long pauseBase = this.pause;
      try {
        Result regionInfoRow = null;
        ReversedClientScanner rcs = null;
@@ -825,13 +830,17 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
        if (e instanceof RemoteException) {
          e = ((RemoteException)e).unwrapRemoteException();
        }
+       if (e instanceof CallQueueTooBigException) {
+         // use a longer pause between retries when the meta call queue is already full, see #HBASE-17114
+         pauseBase = this.specialPause;
+       }
        if (tries < maxAttempts - 1) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("locateRegionInMeta parentTable=" + TableName.META_TABLE_NAME +
                ", metaLocation=" +
                ", attempt=" + tries + " of " + maxAttempts + " failed; retrying after sleep of " +
-               ConnectionUtils.getPauseTime(this.pause, tries) + " because: " + e.getMessage());
+               ConnectionUtils.getPauseTime(pauseBase, tries) + " because: " + e.getMessage());
          }
        } else {
          throw e;
        }
@@ -843,7 +852,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
        }
      }
      try{
-       Thread.sleep(ConnectionUtils.getPauseTime(this.pause, tries));
+       Thread.sleep(ConnectionUtils.getPauseTime(pauseBase, tries));
      } catch (InterruptedException e) {
        throw new InterruptedIOException("Giving up trying to location region in " +
          "meta: thread is interrupted.");
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
index f92aeae..744a6c1 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
@@ -32,6 +32,7 @@ public class RpcRetryingCallerFactory {
   public static final String CUSTOM_CALLER_CONF_KEY = "hbase.rpc.callerfactory.class";
   protected final Configuration conf;
   private final long pause;
+  private final long specialPause;
   private final int retries;
   private final int rpcTimeout;
   private final RetryingCallerInterceptor interceptor;
@@ -48,6 +49,8 @@ public class RpcRetryingCallerFactory {
     this.conf = conf;
     pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
         HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
+    specialPause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE_SPECIAL_CASE,
+        HConstants.DEFAULT_HBASE_CLIENT_PAUSE_SPECIAL_CASE);
     retries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
         HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
     startLogErrorsCnt = conf.getInt(AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY,
@@ -71,8 +74,8 @@
   public RpcRetryingCaller newCaller(int rpcTimeout) {
     // We store the values in the factory instance. This way, constructing new objects
     // is cheap as it does not require parsing a complex structure.
-    RpcRetryingCaller caller = new RpcRetryingCallerImpl(pause, retries, interceptor,
-        startLogErrorsCnt, rpcTimeout);
+    RpcRetryingCaller caller = new RpcRetryingCallerImpl(pause, specialPause, retries,
+        interceptor, startLogErrorsCnt, rpcTimeout);
     return caller;
   }
 
@@ -82,8 +85,8 @@
   public RpcRetryingCaller newCaller() {
     // We store the values in the factory instance. This way, constructing new objects
     // is cheap as it does not require parsing a complex structure.
-    RpcRetryingCaller caller = new RpcRetryingCallerImpl(pause, retries, interceptor,
-        startLogErrorsCnt, rpcTimeout);
+    RpcRetryingCaller caller = new RpcRetryingCallerImpl(pause, specialPause, retries,
+        interceptor, startLogErrorsCnt, rpcTimeout);
     return caller;
   }
 
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java
index 91a20ec..a0775f0 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java
@@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.CallQueueTooBigException;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException;
@@ -57,6 +58,7 @@ public class RpcRetryingCallerImpl implements RpcRetryingCaller {
   private final int startLogErrorsCnt;
   private final long pause;
+  private final long specialPause;
   private final int maxAttempts;// how many times to try
   private final int rpcTimeout;// timeout for each rpc request
   private final AtomicBoolean cancelled = new AtomicBoolean(false);
@@ -64,13 +66,15 @@ public class RpcRetryingCallerImpl implements RpcRetryingCaller {
   private final RetryingCallerInterceptorContext context;
   private final RetryingTimeTracker tracker;
 
-  public RpcRetryingCallerImpl(long pause, int retries, int startLogErrorsCnt) {
-    this(pause, retries, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, startLogErrorsCnt, 0);
+  public RpcRetryingCallerImpl(long pause, long specialPause, int retries, int startLogErrorsCnt) {
+    this(pause, specialPause, retries, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR,
+        startLogErrorsCnt, 0);
   }
 
-  public RpcRetryingCallerImpl(long pause, int retries,
+  public RpcRetryingCallerImpl(long pause, long specialPause, int retries,
       RetryingCallerInterceptor interceptor, int startLogErrorsCnt, int rpcTimeout) {
     this.pause = pause;
+    this.specialPause = specialPause;
     this.maxAttempts = retries2Attempts(retries);
     this.interceptor = interceptor;
     context = interceptor.createEmptyContext();
@@ -126,9 +130,11 @@ public class RpcRetryingCallerImpl implements RpcRetryingCaller {
          throw new RetriesExhaustedException(tries, exceptions);
        }
        // If the server is dead, we need to wait a little before retrying, to give
-       // a chance to the regions to be
-       // get right pause time, start by RETRY_BACKOFF[0] * pause
-       expectedSleep = callable.sleep(pause, tries);
+       // a chance to the regions to be moved
+       // get right pause time, start by RETRY_BACKOFF[0] * pauseBase, where pauseBase will be
+       // special when encountering CallQueueTooBigException, see #HBASE-17114
+       long pauseBase = (t instanceof CallQueueTooBigException) ? specialPause : pause;
+       expectedSleep = callable.sleep(pauseBase, tries + 1);
 
        // If, after the planned sleep, there won't be enough time left, we stop now.
        long duration = singleCallDuration(expectedSleep);
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
index 5a21699..3c6717b 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
@@ -236,7 +236,7 @@ public class TestAsyncProcess {
      }
    });
 
-    return new RpcRetryingCallerImpl(100, 10, 9) {
+    return new RpcRetryingCallerImpl(100, 500, 10, 9) {
      @Override
      public AbstractResponse callWithoutRetries(RetryingCallable callable, int callTimeout)
@@ -280,7 +280,7 @@
    private final IOException e;
 
    public CallerWithFailure(IOException e) {
-      super(100, 100, 9);
+      super(100, 500, 100, 9);
      this.e = e;
    }
@@ -386,7 +386,7 @@
        replicaCalls.incrementAndGet();
      }
 
-      return new RpcRetryingCallerImpl(100, 10, 9) {
+      return new RpcRetryingCallerImpl(100, 500, 10, 9) {
        @Override
        public MultiResponse callWithoutRetries(RetryingCallable callable, int callTimeout)
@@ -1730,4 +1730,68 @@
    }
    t.join();
  }
+
+  /**
+   * Test and make sure we will use the special pause setting when retrying with
+   * CallQueueTooBigException, see HBASE-17114
+   * @throws Exception if an unexpected error happens during the test
+   */
+  @Test
+  public void testRetryPauseWithCallQueueTooBigException() throws Exception {
+    Configuration myConf = new Configuration(conf);
+    final long specialPause = 500L;
+    final int tries = 2;
+    myConf.setLong(HConstants.HBASE_CLIENT_PAUSE_SPECIAL_CASE, specialPause);
+    myConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, tries);
+    ClusterConnection conn = new MyConnectionImpl(myConf);
+    BufferedMutatorImpl mutator = (BufferedMutatorImpl) conn.getBufferedMutator(DUMMY_TABLE);
+    AsyncProcessWithFailure ap =
+        new AsyncProcessWithFailure(conn, myConf, new CallQueueTooBigException());
+    mutator.ap = ap;
+
+    Assert.assertNotNull(mutator.ap.createServerErrorTracker());
+
+    Put p = createPut(1, true);
+    mutator.mutate(p);
+
+    long startTime = System.currentTimeMillis();
+    try {
+      mutator.flush();
+      Assert.fail();
+    } catch (RetriesExhaustedWithDetailsException expected) {
+    }
+    long actualSleep = System.currentTimeMillis() - startTime;
+    long expectedSleep = 0L;
+    for (int i = 0; i < tries; i++) {
+      expectedSleep += ConnectionUtils.getPauseTime(specialPause, i);
+      // Prevent jitter in ConnectionUtils#getPauseTime from affecting the result
+      actualSleep += (long) (specialPause * 0.01f);
+    }
+    LOG.debug("Expected to sleep " + expectedSleep + "ms, actually slept " + actualSleep + "ms");
+    Assert.assertTrue("Expected to sleep " + expectedSleep + " but actually " + actualSleep + "ms",
+        actualSleep >= expectedSleep);
+
+    // check and confirm that a normal IOE will use the normal pause
+    final long normalPause =
+        myConf.getLong(HConstants.HBASE_CLIENT_PAUSE, HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
+    ap = new AsyncProcessWithFailure(conn, myConf, new IOException());
+    mutator.ap = ap;
+    Assert.assertNotNull(mutator.ap.createServerErrorTracker());
+    mutator.mutate(p);
+    startTime = System.currentTimeMillis();
+    try {
+      mutator.flush();
+      Assert.fail();
+    } catch (RetriesExhaustedWithDetailsException expected) {
+    }
+    actualSleep = System.currentTimeMillis() - startTime;
+    expectedSleep = 0L;
+    for (int i = 0; i < tries; i++) {
+      expectedSleep += ConnectionUtils.getPauseTime(normalPause, i);
+    }
+    // plus an additional pause to allow for the program's own execution time
+    expectedSleep += normalPause;
+    LOG.debug("Expected to sleep " + expectedSleep + "ms, actually slept " + actualSleep + "ms");
+    Assert.assertTrue("Slept for too long: " + actualSleep + "ms", actualSleep <= expectedSleep);
+  }
 }
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 3f9b430..e809e07 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -713,6 +713,16 @@ public final class HConstants {
   public static final long DEFAULT_HBASE_CLIENT_PAUSE = 100;
 
   /**
+   * Parameter name for the client pause value used for special cases such as call queue too big.
+   */
+  public static final String HBASE_CLIENT_PAUSE_SPECIAL_CASE = "hbase.client.pause.special";
+
+  /**
+   * Default value of {@link #HBASE_CLIENT_PAUSE_SPECIAL_CASE}.
+   */
+  public static final long DEFAULT_HBASE_CLIENT_PAUSE_SPECIAL_CASE = 500;
+
+  /**
    * The maximum number of concurrent connections the client will maintain.
    */
   public static final String HBASE_CLIENT_MAX_TOTAL_TASKS = "hbase.client.max.total.tasks";
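
For reviewers, here is a minimal, hypothetical sketch of how a client application could opt into the new setting once this patch is applied. The `hbase.client.pause.special` key and its `HConstants` constant come from the HConstants change above; the concrete pause values and the class name `SpecialPauseExample` are arbitrary illustrations, and everything else is the standard HBase client API.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class SpecialPauseExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Normal retries keep the short default pause (100 ms).
    conf.setLong(HConstants.HBASE_CLIENT_PAUSE, 100);
    // Retries caused by CallQueueTooBigException back off longer (value is an
    // arbitrary example), giving the overloaded server's call queue time to drain.
    conf.setLong(HConstants.HBASE_CLIENT_PAUSE_SPECIAL_CASE, 2000);
    try (Connection connection = ConnectionFactory.createConnection(conf)) {
      // Gets/Puts issued through this connection now use the two-tier pause.
    }
  }
}
```

With this in place, retries triggered by `CallQueueTooBigException` use the longer special pause, while all other retries continue to use `hbase.client.pause`.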