From a825679fe9cc57eaf054e9f568059bcd0b7ace4b Mon Sep 17 00:00:00 2001 From: Phil Yang Date: Tue, 11 Oct 2016 20:55:27 +0800 Subject: [PATCH] HBASE-16664 Timeout logic in AsyncProcess is broken --- .../apache/hadoop/hbase/client/AsyncProcess.java | 15 +++- .../org/apache/hadoop/hbase/client/TestHCM.java | 80 ++++++++++++++++++++-- 2 files changed, 85 insertions(+), 10 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index 0f1fd58..5a2ea6b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -231,7 +231,8 @@ class AsyncProcess { protected final long pause; protected int numTries; protected int serverTrackerTimeout; - protected int timeout; + protected int rpcTimeout; + protected int operationTimeout; protected long primaryCallTimeoutMicroseconds; // End configuration settings. @@ -289,8 +290,10 @@ class AsyncProcess { HConstants.DEFAULT_HBASE_CLIENT_PAUSE); this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); - this.timeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + this.rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT); + this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, + HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); this.primaryCallTimeoutMicroseconds = conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10000); this.maxTotalConcurrentTasks = conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS, @@ -745,7 +748,11 @@ class AsyncProcess { try { RpcRetryingCaller caller = createCaller(callable); if (callsInProgress != null) callsInProgress.add(callable); - res = caller.callWithoutRetries(callable, timeout); + int remain = (int)(deadline - System.currentTimeMillis()); + if (remain <= 0) { + throw new DoNotRetryIOException("Operation timeout"); + } + res = caller.callWithoutRetries(callable, Math.min(remain, rpcTimeout)); if (res == null) { // Cancelled @@ -812,6 +819,7 @@ class AsyncProcess { private final int[] replicaGetIndices; private final boolean hasAnyReplicaGets; private final long nonceGroup; + private final long deadline; public AsyncRequestFutureImpl(TableName tableName, List> actions, long nonceGroup, ExecutorService pool, boolean needResults, Object[] results, @@ -883,6 +891,7 @@ class AsyncProcess { this.errorsByServer = createServerErrorTracker(); this.errors = (globalErrors != null) ? globalErrors : new BatchErrors(); + this.deadline = System.currentTimeMillis() + operationTimeout; } public Set> getCallsInProgress() { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java index e592fa8..7d339fd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java @@ -18,12 +18,7 @@ */ package org.apache.hadoop.hbase.client; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import com.google.common.collect.Lists; import java.io.IOException; import java.lang.reflect.Field; @@ -72,6 +67,7 @@ import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -88,7 +84,12 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TestRule; -import com.google.common.collect.Lists; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; /** * This class is for testing HBaseConnectionManager features @@ -144,6 +145,15 @@ public class TestHCM { throw new IOException("first call I fail"); } } + + @Override + public void prePut(final ObserverContext e, + final Put put, final WALEdit edit, final Durability durability) throws IOException { + Threads.sleep(sleepTime.get()); + if (ct.incrementAndGet() == 1) { + throw new IOException("first call I fail"); + } + } } public static class SleepCoprocessor extends BaseRegionObserver { @@ -153,6 +163,12 @@ public class TestHCM { final Get get, final List results) throws IOException { Threads.sleep(SLEEP_TIME); } + + @Override + public void prePut(final ObserverContext e, + final Put put, final WALEdit edit, final Durability durability) throws IOException { + Threads.sleep(SLEEP_TIME); + } } @BeforeClass @@ -369,6 +385,56 @@ public class TestHCM { } } + @Test(expected = RetriesExhaustedException.class) + public void testPutRpcTimeout() throws Exception { + HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testPutRpcTimeout"); + hdt.addCoprocessor(SleepCoprocessor.class.getName()); + TEST_UTIL.createTable(hdt, new byte[][]{FAM_NAM}, TEST_UTIL.getConfiguration()); + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + SleepCoprocessor.SLEEP_TIME / 2); + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, + SleepCoprocessor.SLEEP_TIME * 100); + try (Table t = TEST_UTIL.getConnection().getTable(hdt.getTableName())) { + t.put(new Put(FAM_NAM).addColumn(FAM_NAM, FAM_NAM, FAM_NAM)); + } finally { + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT); + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, + HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); + } + } + + @Test + public void testPutOperationTimeout() throws Exception { + HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testPutOperationTimeout"); + hdt.addCoprocessor(SleepAndFailFirstTime.class.getName()); + TEST_UTIL.createTable(hdt, new byte[][]{FAM_NAM}, TEST_UTIL.getConfiguration()); + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, Integer.MAX_VALUE); + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120 * 1000); + Table table = TEST_UTIL.getConnection().getTable(hdt.getTableName()); + table.put(new Put(FAM_NAM).addColumn(FAM_NAM, FAM_NAM, FAM_NAM)); + + // Resetting and retrying. Will fail this time, not enough time for the second try + SleepAndFailFirstTime.ct.set(0); + try { + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 30 * 1000); + table = TEST_UTIL.getConnection().getTable(hdt.getTableName()); + table.put(new Put(FAM_NAM).addColumn(FAM_NAM, FAM_NAM, FAM_NAM)); + Assert.fail("We expect an exception here"); + } catch (IOException e) { + // The client has a CallTimeout class, but it's not shared.We're not very clean today, + // in the general case you can expect the call to stop, but the exception may vary. + // In this test however, we're sure that it will be a socket timeout. + LOG.info("We received an exception, as expected ", e); + } finally { + table.close(); + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT); + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, + HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); + } + } + /** * Test starting from 0 index when RpcRetryingCaller calculate the backoff time. */ -- 2.8.4 (Apple Git-73)