From 7a38e4ff11178395e5de0272861de8a7fcb5900a Mon Sep 17 00:00:00 2001 From: Phil Yang Date: Tue, 26 Jul 2016 19:37:32 +0800 Subject: [PATCH] HBASE-16285 Drop RPC requests if it must be considered as timeout at client --- .../org/apache/hadoop/hbase/ipc/CallRunner.java | 4 ++ .../org/apache/hadoop/hbase/ipc/RpcServer.java | 13 +++++++ .../org/apache/hadoop/hbase/client/TestHCM.java | 45 ++++++++++++++++++---- 3 files changed, 55 insertions(+), 7 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java index e91699a..2e2f3cd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java @@ -93,6 +93,10 @@ public class CallRunner { } return; } + if (System.currentTimeMillis() >= call.deadline) { + RpcServer.LOG.info("Drop timeout call: " + call); + return; + } this.status.setStatus("Setting up call"); this.status.setConnection(call.connection.getHostAddress(), call.connection.getRemotePort()); if (RpcServer.LOG.isTraceEnabled()) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index 73226aa..e7016c6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -263,6 +263,13 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { private static final String WARN_RESPONSE_TIME = "hbase.ipc.warn.response.time"; private static final String WARN_RESPONSE_SIZE = "hbase.ipc.warn.response.size"; + // We drop requests that have been considered as timeout at client. But at client, timeout is not + // a strict limit. Because of thread scheduling the client may wait the request for longer time. + // So we add a delay (in ms) here. + private static final String DROP_TIMEOUT_REQUEST_DELAY = "hbase.ipc.drop.timeout.request.delay"; + private static final int DEFAULT_DROP_TIMEOUT_REQUEST_DELAY = 200; + private final int dropTimeoutRequestDelay; + /** * Minimum allowable timeout (in milliseconds) in rpc request's header. This * configuration exists to prevent the rpc service regarding this request as timeout immediately. @@ -312,6 +319,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { protected long timestamp; // the time received when response is null // the time served when response is not null protected int timeout; + protected long deadline;// the deadline to handle this call, if exceed we can drop it. + /** * Chain of buffers to send as response. */ @@ -354,6 +363,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { this.retryImmediatelySupported = connection == null? null: connection.retryImmediatelySupported; this.timeout = timeout; + this.deadline = this.timeout > 0 ? this.timestamp + this.timeout + dropTimeoutRequestDelay + : Long.MAX_VALUE; } /** @@ -2049,6 +2060,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { this.minClientRequestTimeout = conf.getInt(MIN_CLIENT_REQUEST_TIMEOUT, DEFAULT_MIN_CLIENT_REQUEST_TIMEOUT); this.maxRequestSize = conf.getInt(MAX_REQUEST_SIZE, DEFAULT_MAX_REQUEST_SIZE); + this.dropTimeoutRequestDelay = conf.getInt(DROP_TIMEOUT_REQUEST_DELAY, + DEFAULT_DROP_TIMEOUT_REQUEST_DELAY); // Start the listener here and let it bind to the port listener = new Listener(name); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java index 1b20b76..bd400d6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java @@ -18,11 +18,7 @@ */ package org.apache.hadoop.hbase.client; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; +import com.google.common.collect.Lists; import java.io.IOException; import java.lang.reflect.Field; @@ -82,7 +78,11 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TestRule; -import com.google.common.collect.Lists; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; /** * This class is for testing HBaseConnectionManager features @@ -109,6 +109,7 @@ public class TestHCM { private static final byte[] ROW = Bytes.toBytes("bbb"); private static final byte[] ROW_X = Bytes.toBytes("xxx"); private static Random _randy = new Random(); + private static final int RPC_RETRY = 5; /** * This copro sleeps 20 second. The first call it fails. The second time, it works. @@ -149,12 +150,31 @@ public class TestHCM { } } + public static class SleepLongerAtFirstCoprocessor extends BaseRegionObserver { + public static final int SLEEP_TIME = 2000; + static final AtomicLong ct = new AtomicLong(0); + @Override + public void preGetOp(final ObserverContext e, + final Get get, final List results) throws IOException { + // After first sleep, all requests are timeout except the last retry. If we handle + // all the following requests, finally the last request is also timeout. If we drop all + // timeout requests, we can handle the last request immediately and it will not timeout. + if (ct.incrementAndGet() <= 1) { + Threads.sleep(SLEEP_TIME * RPC_RETRY * 2); + } else { + Threads.sleep(SLEEP_TIME); + } + } + } + @BeforeClass public static void setUpBeforeClass() throws Exception { TEST_UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED, true); // Up the handlers; this test needs more than usual. TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10); - TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5); + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, RPC_RETRY); + // simulate queue blocking in testDropTimeoutRequest + TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 1); TEST_UTIL.startMiniCluster(2); } @@ -364,6 +384,17 @@ public class TestHCM { } } + @Test + public void testDropTimeoutRequest() throws Exception { + HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testDropTimeputRequest"); + hdt.addCoprocessor(SleepLongerAtFirstCoprocessor.class.getName()); + Configuration c = new Configuration(TEST_UTIL.getConfiguration()); + try (Table t = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c)) { + t.setRpcTimeout(SleepLongerAtFirstCoprocessor.SLEEP_TIME * 2); + t.get(new Get(FAM_NAM)); + } + } + /** * Test starting from 0 index when RpcRetryingCaller calculate the backoff time. */ -- 2.7.4 (Apple Git-66)