commit 97d9550196c15c85e3c1140fdf3a8cebb2234ad8 Author: Yu Li Date: Wed Sep 30 13:19:01 2015 +0800 Uniform the semantic of hbase.client.retries.number diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index 96ed184e686e754910f22c508702f5a6716320a3..7c7fc3e67db2b23bdc9a1f98545d6370ec13116e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -258,8 +258,9 @@ class AsyncProcess { this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, HConstants.DEFAULT_HBASE_CLIENT_PAUSE); + // how many times we could try in total, one more than retry number this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, - HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); + HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) + 1; this.timeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT); this.primaryCallTimeoutMicroseconds = conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10000); @@ -1117,7 +1118,7 @@ class AsyncProcess { private void receiveGlobalFailure( MultiAction rsActions, ServerName server, int numAttempt, Throwable t) { errorsByServer.reportServerError(server); - Retry canRetry = errorsByServer.canRetryMore(numAttempt) + Retry canRetry = errorsByServer.canTryMore(numAttempt) ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED; if (tableName == null) { @@ -1253,7 +1254,7 @@ class AsyncProcess { if (failureCount == 0) { errorsByServer.reportServerError(server); // We determine canRetry only once for all calls, after reporting server failure. 
- canRetry = errorsByServer.canRetryMore(numAttempt); + canRetry = errorsByServer.canTryMore(numAttempt); } ++failureCount; Retry retry = manageError(sentAction.getOriginalIndex(), row, @@ -1301,7 +1302,7 @@ class AsyncProcess { if (failureCount == 0) { errorsByServer.reportServerError(server); - canRetry = errorsByServer.canRetryMore(numAttempt); + canRetry = errorsByServer.canTryMore(numAttempt); } connection.updateCachedLocations( tableName, region, actions.get(0).getAction().getRow(), throwable, server); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index ade32a8135650b72f2e82df45acf23ac81df5b69..e0cd16b33a9957a656a44c7330767c2cfb9311fe 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -226,7 +226,8 @@ class ConnectionImplementation implements ClusterConnection, Closeable { HConstants.DEFAULT_HBASE_CLIENT_PAUSE); this.useMetaReplicas = conf.getBoolean(HConstants.USE_META_REPLICAS, HConstants.DEFAULT_USE_META_REPLICAS); - this.numTries = tableConfig.getRetriesNumber(); + // how many times to try, one more than max *retry* time + this.numTries = tableConfig.getRetriesNumber() + 1; this.rpcTimeout = conf.getInt( HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT); @@ -821,13 +822,13 @@ class ConnectionImplementation implements ClusterConnection, Closeable { s.setConsistency(Consistency.TIMELINE); } - int localNumRetries = (retry ? numTries : 1); + int maxAttempts = (retry ? 
numTries : 1); for (int tries = 0; true; tries++) { - if (tries >= localNumRetries) { + if (tries >= maxAttempts) { throw new NoServerForRegionException("Unable to find region for " + Bytes.toStringBinary(row) + " in " + tableName + - " after " + localNumRetries + " tries."); + " after " + tries + " tries."); } if (useCache) { RegionLocations locations = getCachedLocation(tableName, row); @@ -915,12 +916,12 @@ class ConnectionImplementation implements ClusterConnection, Closeable { if (e instanceof RemoteException) { e = ((RemoteException)e).unwrapRemoteException(); } - if (tries < localNumRetries - 1) { + if (tries < maxAttempts - 1) { if (LOG.isDebugEnabled()) { LOG.debug("locateRegionInMeta parentTable=" + TableName.META_TABLE_NAME + ", metaLocation=" + ", attempt=" + tries + " of " + - localNumRetries + " failed; retrying after sleep of " + + maxAttempts + " failed; retrying after sleep of " + ConnectionUtils.getPauseTime(this.pause, tries) + " because: " + e.getMessage()); } } else { @@ -1061,21 +1062,27 @@ class ConnectionImplementation implements ClusterConnection, Closeable { private final ConcurrentMap errorsByServer = new ConcurrentHashMap(); private final long canRetryUntil; - private final int maxRetries; + private final int maxTries;// max number to try private final long startTrackingTime; - public ServerErrorTracker(long timeout, int maxRetries) { - this.maxRetries = maxRetries; + /** + * Constructor + * @param timeout how long to wait before timeout, in unit of millisecond + * @param maxTries how many times to try + */ + public ServerErrorTracker(long timeout, int maxTries) { + this.maxTries = maxTries; this.canRetryUntil = EnvironmentEdgeManager.currentTime() + timeout; this.startTrackingTime = new Date().getTime(); } /** - * We stop to retry when we have exhausted BOTH the number of retries and the time allocated. + * We stop to retry when we have exhausted BOTH the number of tries and the time allocated. 
+ * @param numAttempt how many times we have tried by now */ - boolean canRetryMore(int numRetry) { + boolean canTryMore(int numAttempt) { // If there is a single try we must not take into account the time. - return numRetry < maxRetries || (maxRetries > 1 && + return numAttempt < maxTries || (maxTries > 1 && EnvironmentEdgeManager.currentTime() < this.canRetryUntil); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java index b1f5b9f1c2e80c22d996d8f04fd09ab723feed13..034932154997338011171611648c7d1eb7907b89 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java @@ -84,7 +84,7 @@ public class HTableMultiplexer { private final Configuration workerConf; private final ClusterConnection conn; private final ExecutorService pool; - private final int retryNum; + private final int maxAttempts; private final int perRegionServerBufferQueueSize; private final int maxKeyValueSize; private final ScheduledExecutorService executor; @@ -99,8 +99,9 @@ public class HTableMultiplexer { throws IOException { this.conn = (ClusterConnection) ConnectionFactory.createConnection(conf); this.pool = HTable.getDefaultExecutor(conf); - this.retryNum = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, - HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); + // how many times we could try in total, one more than retry number + this.maxAttempts = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, + HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) + 1; this.perRegionServerBufferQueueSize = perRegionServerBufferQueueSize; this.maxKeyValueSize = HTable.getMaxKeyValueSize(conf); this.flushPeriod = conf.getLong(TABLE_MULTIPLEXER_FLUSH_PERIOD_MS, 100); @@ -123,7 +124,7 @@ public class HTableMultiplexer { * @return true if the request can be accepted by its 
corresponding buffer queue. */ public boolean put(TableName tableName, final Put put) { - return put(tableName, put, this.retryNum); + return put(tableName, put, this.maxAttempts); } /** @@ -140,7 +141,7 @@ public class HTableMultiplexer { List failedPuts = null; boolean result; for (Put put : puts) { - result = put(tableName, put, this.retryNum); + result = put(tableName, put, this.maxAttempts); if (result == false) { // Create the failed puts list if necessary @@ -168,8 +169,8 @@ public class HTableMultiplexer { * Return false if the queue is already full. * @return true if the request can be accepted by its corresponding buffer queue. */ - public boolean put(final TableName tableName, final Put put, int retry) { - if (retry <= 0) { + public boolean put(final TableName tableName, final Put put, int maxAttempts) { + if (maxAttempts <= 0) { return false; } @@ -181,7 +182,7 @@ public class HTableMultiplexer { LinkedBlockingQueue queue = getQueue(loc); // Generate a MultiPutStatus object and offer it into the queue - PutStatus s = new PutStatus(loc.getRegionInfo(), put, retry); + PutStatus s = new PutStatus(loc.getRegionInfo(), put, maxAttempts); return queue.offer(s); } @@ -342,12 +343,12 @@ public class HTableMultiplexer { private static class PutStatus { private final HRegionInfo regionInfo; private final Put put; - private final int retryCount; + private final int maxAttemptCount; - public PutStatus(HRegionInfo regionInfo, Put put, int retryCount) { + public PutStatus(HRegionInfo regionInfo, Put put, int maxAttemptCount) { this.regionInfo = regionInfo; this.put = put; - this.retryCount = retryCount; + this.maxAttemptCount = maxAttemptCount; } } @@ -441,7 +442,7 @@ public class HTableMultiplexer { private boolean resubmitFailedPut(PutStatus ps, HRegionLocation oldLoc) throws IOException { // Decrease the retry count - final int retryCount = ps.retryCount - 1; + final int retryCount = ps.maxAttemptCount - 1; if (retryCount <= 0) { // Update the failed counter and no
retry any more. @@ -460,7 +461,7 @@ public class HTableMultiplexer { final TableName tableName = ps.regionInfo.getTable(); long delayMs = ConnectionUtils.getPauseTime(multiplexer.flushPeriod, - multiplexer.retryNum - retryCount - 1); + multiplexer.maxAttempts - retryCount - 1); if (LOG.isDebugEnabled()) { LOG.debug("resubmitting after " + delayMs + "ms: " + retryCount); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetriesExhaustedException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetriesExhaustedException.java index 6f6820e9c6d17032ac9c76e6fac2998fe6bed2ef..96c47eaee530bc2a37fef41e8aaad64baf8da3c5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetriesExhaustedException.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetriesExhaustedException.java @@ -78,13 +78,13 @@ public class RetriesExhaustedException extends IOException { /** * Create a new RetriesExhaustedException from the list of prior failures. - * @param numTries + * @param numRetries How many times we have retried, one less than total attempts * @param exceptions List of exceptions that failed before giving up */ @InterfaceAudience.Private - public RetriesExhaustedException(final int numTries, + public RetriesExhaustedException(final int numRetries, final List exceptions) { - super(getMessage(numTries, exceptions), + super(getMessage(numRetries, exceptions), (exceptions != null && !exceptions.isEmpty() ?
exceptions.get(exceptions.size() - 1).t : null)); } @@ -94,7 +94,7 @@ public class RetriesExhaustedException extends IOException { StringBuilder buffer = new StringBuilder("Failed contacting "); buffer.append(callableVitals); buffer.append(" after "); - buffer.append(numTries + 1); + buffer.append(numTries); buffer.append(" attempts.\nExceptions:\n"); for (Throwable t : exceptions) { buffer.append(t.toString()); @@ -103,10 +103,10 @@ public class RetriesExhaustedException extends IOException { return buffer.toString(); } - private static String getMessage(final int numTries, + private static String getMessage(final int numRetries, final List exceptions) { StringBuilder buffer = new StringBuilder("Failed after attempts="); - buffer.append(numTries + 1); + buffer.append(numRetries + 1); buffer.append(", exceptions:\n"); for (ThrowableWithExtraContext t : exceptions) { buffer.append(t.toString()); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java index dd56b17f9b59790686f0e634f854004e54cf389e..12abc6a8b7ae6150c209f22d083e791e697e0698 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java @@ -60,7 +60,7 @@ public class RpcRetryingCallerImpl implements RpcRetryingCaller { private final int startLogErrorsCnt; private final long pause; - private final int retries; + private final int maxAttempts;// how many times to try private final AtomicBoolean cancelled = new AtomicBoolean(false); private final RetryingCallerInterceptor interceptor; private final RetryingCallerInterceptorContext context; @@ -72,7 +72,7 @@ public class RpcRetryingCallerImpl implements RpcRetryingCaller { public RpcRetryingCallerImpl(long pause, int retries, RetryingCallerInterceptor interceptor, int startLogErrorsCnt) { this.pause = pause;
- this.retries = retries; + this.maxAttempts = retries + 1; this.interceptor = interceptor; context = interceptor.createEmptyContext(); this.startLogErrorsCnt = startLogErrorsCnt; @@ -121,8 +121,8 @@ public class RpcRetryingCallerImpl implements RpcRetryingCaller { } catch (Throwable t) { ExceptionUtil.rethrowIfInterrupt(t); if (tries > startLogErrorsCnt) { - LOG.info("Call exception, tries=" + tries + ", retries=" + retries + ", started=" + - (EnvironmentEdgeManager.currentTime() - this.globalStartTime) + " ms ago, " + LOG.info("Call exception, tries=" + tries + ", maxAttempts=" + maxAttempts + ", started=" + + (EnvironmentEdgeManager.currentTime() - this.globalStartTime) + " ms ago, " + "cancelled=" + cancelled.get() + ", msg=" + callable.getExceptionMessageAdditionalDetail()); } @@ -130,12 +130,12 @@ public class RpcRetryingCallerImpl implements RpcRetryingCaller { // translateException throws exception when should not retry: i.e. when request is bad. interceptor.handleFailure(context, t); t = translateException(t); - callable.throwable(t, retries != 1); + callable.throwable(t, maxAttempts != 1); RetriesExhaustedException.ThrowableWithExtraContext qt = new RetriesExhaustedException.ThrowableWithExtraContext(t, EnvironmentEdgeManager.currentTime(), toString()); exceptions.add(qt); - if (tries >= retries - 1) { + if (tries >= maxAttempts - 1) { throw new RetriesExhaustedException(tries, exceptions); } // If the server is dead, we need to wait a little before retrying, to give @@ -162,7 +162,8 @@ public class RpcRetryingCallerImpl implements RpcRetryingCaller { } if (cancelled.get()) return null; } catch (InterruptedException e) { - throw new InterruptedIOException("Interrupted after " + tries + " tries on " + retries); + throw new InterruptedIOException("Interrupted after " + tries + + " tries while maxAttempts=" + maxAttempts); } } } @@ -231,6 +232,6 @@ public class RpcRetryingCallerImpl implements RpcRetryingCaller { @Override public String toString() { return 
"RpcRetryingCaller{" + "globalStartTime=" + globalStartTime + - ", pause=" + pause + ", retries=" + retries + '}'; + ", pause=" + pause + ", maxAttempts=" + maxAttempts + '}'; } } diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java index fa3ed32d7a760b42172829d9d2b5056afce4ec39..67e75226c80217eb2090c2ca6c60c9e4b53001bd 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java @@ -137,7 +137,6 @@ public class TestAsyncProcess { AsyncRequestFutureImpl r = super.createAsyncRequestFuture( DUMMY_TABLE, actions, nonceGroup, pool, callback, results, needResults); allReqs.add(r); - callsCt.incrementAndGet(); return r; } @@ -548,7 +547,7 @@ public class TestAsyncProcess { ars = ap.submit(DUMMY_TABLE, puts, false, null, true); Assert.assertEquals(0, puts.size()); ars.waitUntilDone(); - Assert.assertEquals(2, ap.callsCt.get()); + Assert.assertEquals(1, ap.callsCt.get()); verifyResult(ars, true); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMasterCommandLine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMasterCommandLine.java index 04078774637c818577396a266db624c936c1a5de..a2699f6649357358754c3b8a602eae3b7a0924cb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMasterCommandLine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMasterCommandLine.java @@ -252,7 +252,7 @@ public class HMasterCommandLine extends ServerCommandLine { private int stopMaster() { Configuration conf = getConf(); // Don't try more than once - conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); + conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0); try (Connection connection = ConnectionFactory.createConnection(conf)) { try (Admin admin = connection.getAdmin()) { 
admin.shutdown(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java index a3cd8d0cef7961c0cd51b9867730189bb7f3ef95..d175744e2db566a03618a3e3e66c5c78b4eefcac 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java @@ -103,7 +103,7 @@ public class TestBlockEvictionFromClient { conf.setStrings("hbase.bucketcache.ioengine", "heap"); conf.setFloat("hfile.block.cache.size", 0.2f); conf.setFloat("hbase.regionserver.global.memstore.size", 0.1f); - conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); + conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);// do not retry conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 5000); FAMILIES_1[0] = FAMILY; TEST_UTIL.startMiniCluster(SLAVES); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java index bc4849552caf122e90794e733cf33ef73d522291..041b19fb18b4a842d7b8e43981ede44b899e78ca 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java @@ -485,7 +485,8 @@ public class TestHCM { Configuration c2 = new Configuration(TEST_UTIL.getConfiguration()); // We want to work on a separate connection. 
c2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1)); - c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); + // try only once w/o any retry + c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0); c2.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 30 * 1000); final Connection connection = ConnectionFactory.createConnection(c2); @@ -575,7 +576,8 @@ public class TestHCM { public void testRegionCaching() throws Exception{ TEST_UTIL.createMultiRegionTable(TABLE_NAME, FAM_NAM).close(); Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); - conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); + // test with no retry, or client cache will get updated after the first failure + conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0); Connection connection = ConnectionFactory.createConnection(conf); final Table table = connection.getTable(TABLE_NAME); @@ -1052,11 +1054,11 @@ public class TestHCM { // We also should not go over the boundary; last retry would be on it. long timeLeft = (long)(ANY_PAUSE * 0.5); timeMachine.setValue(timeBase + largeAmountOfTime - timeLeft); - assertTrue(tracker.canRetryMore(1)); + assertTrue(tracker.canTryMore(1)); tracker.reportServerError(location); assertEquals(timeLeft, tracker.calculateBackoffTime(location, ANY_PAUSE)); timeMachine.setValue(timeBase + largeAmountOfTime); - assertFalse(tracker.canRetryMore(1)); + assertFalse(tracker.canTryMore(1)); } finally { EnvironmentEdgeManager.reset(); }