diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index 045885f..5bb0f58 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -1066,7 +1066,6 @@ class AsyncProcess { for (Map.Entry> e : actionsByServer.entrySet()) { ServerName server = e.getKey(); MultiAction multiAction = e.getValue(); - incTaskCounters(multiAction.getRegions(), server); Collection runnables = getNewMultiActionRunnable(server, multiAction, numAttempt); // make sure we correctly count the number of runnables before we try to reuse the send @@ -1114,6 +1113,7 @@ class AsyncProcess { if (connection.getConnectionMetrics() != null) { connection.getConnectionMetrics().incrNormalRunners(); } + incTaskCounters(multiAction.getRegions(), server); SingleServerRequestRunnable runnable = addSingleServerRequestHeapSize(server, new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress)); return Collections.singletonList(Trace.wrap("AsyncProcess.sendMultiAction", runnable)); @@ -1136,6 +1136,7 @@ class AsyncProcess { List toReturn = new ArrayList(actions.size()); for (DelayingRunner runner : actions.values()) { + incTaskCounters(runner.getActions().getRegions(), server); String traceText = "AsyncProcess.sendMultiAction"; Runnable runnable = addSingleServerRequestHeapSize(server, new SingleServerRequestRunnable(runner.getActions(), numAttempt, server, callsInProgress)); @@ -1757,7 +1758,8 @@ class AsyncProcess { } } - private void updateStats(ServerName server, Map results) { + @VisibleForTesting + protected void updateStats(ServerName server, Map results) { boolean metrics = AsyncProcess.this.connection.getConnectionMetrics() != null; boolean stats = AsyncProcess.this.connection.getStatisticsTracker() != null; if (!stats && !metrics) { diff --git hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java index 516f2cf..00f5232 100644 --- hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java +++ hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java @@ -82,24 +82,10 @@ import org.junit.rules.TestRule; import org.mockito.Mockito; import org.apache.hadoop.hbase.client.AsyncProcess.RowChecker; import org.apache.hadoop.hbase.client.AsyncProcess.SubmittedSizeChecker; +import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy; +import org.apache.hadoop.hbase.client.backoff.ServerStatistics; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -225,6 +211,11 @@ public class TestAsyncProcess { return super.submit(DUMMY_TABLE, rows, atLeastOne, callback, true); } + + @Override + protected void updateStats(ServerName server, Map results) { + // Do nothing for avoiding the NPE if we test the ClientBackofPolicy. + } @Override protected RpcRetryingCaller createCaller( CancellableRegionServerCallable callable) { @@ -295,7 +286,21 @@ public class TestAsyncProcess { return new CallerWithFailure(ioe); } } - + /** + * Make the backoff time always different on each call. + */ + static class MyClientBackoffPolicy implements ClientBackoffPolicy { + private final Map count = new HashMap<>(); + @Override + public long getBackoffTime(ServerName serverName, byte[] region, ServerStatistics stats) { + AtomicInteger inc = count.get(serverName); + if (inc == null) { + inc = new AtomicInteger(0); + count.put(serverName, inc); + } + return inc.getAndIncrement(); + } + } class MyAsyncProcessWithReplicas extends MyAsyncProcess { private Set failures = new TreeSet(new Bytes.ByteArrayComparator()); private long primarySleepMs = 0, replicaSleepMs = 0; @@ -836,6 +841,46 @@ public class TestAsyncProcess { } @Test + public void testTaskCountWithoutClientBackoffPolicy() throws IOException, InterruptedException { + ClusterConnection hc = createHConnection(); + MyAsyncProcess ap = new MyAsyncProcess(hc, conf, false); + testTaskCount(ap); + } + + @Test + public void testTaskCountWithClientBackoffPolicy() throws IOException, InterruptedException { + Configuration copyConf = new Configuration(conf); + copyConf.setBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, true); + MyClientBackoffPolicy bp = new MyClientBackoffPolicy(); + ClusterConnection hc = createHConnection(); + Mockito.when(hc.getConfiguration()).thenReturn(copyConf); + Mockito.when(hc.getStatisticsTracker()).thenReturn(ServerStatisticTracker.create(copyConf)); + Mockito.when(hc.getBackoffPolicy()).thenReturn(bp); + MyAsyncProcess ap = new MyAsyncProcess(hc, copyConf, false); + testTaskCount(ap); + } + + private void testTaskCount(AsyncProcess ap) throws InterruptedIOException, InterruptedException { + List puts = new ArrayList<>(); + for (int i = 0; i != 3; ++i) { + puts.add(createPut(1, true)); + puts.add(createPut(2, true)); + puts.add(createPut(3, true)); + } + ap.submit(DUMMY_TABLE, puts, true, null, false); + ap.waitUntilDone(); + // More time to wait if there are incorrect task count. + TimeUnit.SECONDS.sleep(1); + assertEquals(0, ap.tasksInProgress.get()); + for (AtomicInteger count : ap.taskCounterPerRegion.values()) { + assertEquals(0, count.get()); + } + for (AtomicInteger count : ap.taskCounterPerServer.values()) { + assertEquals(0, count.get()); + } + } + + @Test public void testMaxTask() throws Exception { final AsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/CoordinatedStateManager.java hbase-server/src/main/java/org/apache/hadoop/hbase/CoordinatedStateManager.java index bd0268a..b4c808c 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/CoordinatedStateManager.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/CoordinatedStateManager.java @@ -20,7 +20,7 @@ package org.apache.hadoop.hbase; import org.apache.hadoop.hbase.classification.InterfaceAudience; /** - * Implementations of this interface will keep and return to clients + * Implementations of this interface will keep and return to clients * implementations of classes providing API to execute * coordinated operations. This interface is client-side, so it does NOT * include methods to retrieve the particular interface implementations.