From 6a9b96646ff0c4349336f5f9ce64f2959b8255b7 Mon Sep 17 00:00:00 2001 From: Phil Yang Date: Tue, 11 Oct 2016 21:10:49 +0800 Subject: [PATCH] HBASE-16664 Timeout logic in AsyncProcess is broken --- .../apache/hadoop/hbase/client/AsyncProcess.java | 15 +++- .../org/apache/hadoop/hbase/client/TestHCM.java | 80 ++++++++++++++++++++-- 2 files changed, 85 insertions(+), 10 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index 49c095d..dac343b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -212,7 +212,8 @@ class AsyncProcess { protected final long pause; protected int numTries; protected int serverTrackerTimeout; - protected int timeout; + protected int rpcTimeout; + protected int operationTimeout; protected long primaryCallTimeoutMicroseconds; // End configuration settings. 
@@ -270,8 +271,10 @@ class AsyncProcess { HConstants.DEFAULT_HBASE_CLIENT_PAUSE); this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); - this.timeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + this.rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT); + this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, + HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); this.primaryCallTimeoutMicroseconds = conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10000); this.maxTotalConcurrentTasks = conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS, @@ -720,7 +723,11 @@ class AsyncProcess { try { RpcRetryingCaller caller = createCaller(callable); if (callsInProgress != null) callsInProgress.add(callable); - res = caller.callWithoutRetries(callable, timeout); + int remain = (int)(deadline - System.currentTimeMillis()); + if (remain <= 0) { + throw new DoNotRetryIOException("Operation timeout"); + } + res = caller.callWithoutRetries(callable, Math.min(remain, rpcTimeout)); if (res == null) { // Cancelled @@ -781,6 +788,7 @@ class AsyncProcess { private final int[] replicaGetIndices; private final boolean hasAnyReplicaGets; private final long nonceGroup; + private final long deadline; public AsyncRequestFutureImpl(TableName tableName, List> actions, long nonceGroup, ExecutorService pool, boolean needResults, Object[] results, @@ -850,6 +858,7 @@ class AsyncProcess { this.errorsByServer = createServerErrorTracker(); this.errors = (globalErrors != null) ? 
globalErrors : new BatchErrors(); + this.deadline = System.currentTimeMillis() + operationTimeout; } public Set> getCallsInProgress() { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java index 14cc983..099efbb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java @@ -18,12 +18,7 @@ */ package org.apache.hadoop.hbase.client; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import com.google.common.collect.Lists; import java.io.IOException; import java.lang.reflect.Field; @@ -70,6 +65,7 @@ import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -84,7 +80,12 @@ import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; -import com.google.common.collect.Lists; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; /** * This class is for testing HBaseConnectionManager features @@ -136,6 +137,15 @@ public class TestHCM { throw new IOException("first call I fail"); } } + + @Override + public void prePut(final 
ObserverContext e, + final Put put, final WALEdit edit, final Durability durability) throws IOException { + Threads.sleep(sleepTime.get()); + if (ct.incrementAndGet() == 1) { + throw new IOException("first call I fail"); + } + } } public static class SleepCoprocessor extends BaseRegionObserver { @@ -145,6 +155,12 @@ public class TestHCM { final Get get, final List results) throws IOException { Threads.sleep(SLEEP_TIME); } + + @Override + public void prePut(final ObserverContext e, + final Put put, final WALEdit edit, final Durability durability) throws IOException { + Threads.sleep(SLEEP_TIME); + } } @BeforeClass @@ -359,6 +375,56 @@ public class TestHCM { } } + @Test(expected = RetriesExhaustedException.class) + public void testPutRpcTimeout() throws Exception { + HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testPutRpcTimeout"); + hdt.addCoprocessor(SleepCoprocessor.class.getName()); + TEST_UTIL.createTable(hdt, new byte[][]{FAM_NAM}, TEST_UTIL.getConfiguration()); + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + SleepCoprocessor.SLEEP_TIME / 2); + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, + SleepCoprocessor.SLEEP_TIME * 100); + try (Table t = TEST_UTIL.getConnection().getTable(hdt.getTableName())) { + t.put(new Put(FAM_NAM).addColumn(FAM_NAM, FAM_NAM, FAM_NAM)); + } finally { + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT); + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, + HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); + } + } + + @Test + public void testPutOperationTimeout() throws Exception { + HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testPutOperationTimeout"); + hdt.addCoprocessor(SleepAndFailFirstTime.class.getName()); + TEST_UTIL.createTable(hdt, new byte[][]{FAM_NAM}, TEST_UTIL.getConfiguration()); + 
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, Integer.MAX_VALUE); + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120 * 1000); + Table table = TEST_UTIL.getConnection().getTable(hdt.getTableName()); + table.put(new Put(FAM_NAM).addColumn(FAM_NAM, FAM_NAM, FAM_NAM)); + + // Resetting and retrying. Will fail this time, not enough time for the second try + SleepAndFailFirstTime.ct.set(0); + try { + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 30 * 1000); + table = TEST_UTIL.getConnection().getTable(hdt.getTableName()); + table.put(new Put(FAM_NAM).addColumn(FAM_NAM, FAM_NAM, FAM_NAM)); + Assert.fail("We expect an exception here"); + } catch (IOException e) { + // The client has a CallTimeout class, but it's not shared. We're not very clean today, + // in the general case you can expect the call to stop, but the exception may vary. + // In this test however, we're sure that it will be a socket timeout. + LOG.info("We received an exception, as expected ", e); + } finally { + table.close(); + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT); + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, + HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); + } + } + /** * Test starting from 0 index when RpcRetryingCaller calculate the backoff time. */ -- 2.8.4 (Apple Git-73)