From eaf399dfd6fb08c50dfcaa9ebfb318eabb3b5d7b Mon Sep 17 00:00:00 2001 From: Sergey Soldatov Date: Sun, 5 Jun 2016 23:46:03 -0700 Subject: HBASE-15957 RpcClientImpl.close never ends in some circumstances --- .../org/apache/hadoop/hbase/ipc/RpcClientImpl.java | 5 ++-- .../hadoop/hbase/ipc/IntegrationTestRpcClient.java | 35 ++++++++++++++++++---- 2 files changed, 31 insertions(+), 9 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java index d8c87e9..dc05af1 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java @@ -1202,9 +1202,8 @@ public class RpcClientImpl extends AbstractRpcClient { } if (connsToClose != null) { for (Connection conn : connsToClose) { - if (conn.markClosed(new InterruptedIOException("RpcClient is closing"))) { - conn.close(); - } + conn.markClosed(new InterruptedIOException("RpcClient is closing")); + conn.close(); } } // wait until all connections are closed diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java index c28f3e6..6c0fbcc 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.ipc; +import static org.apache.hadoop.hbase.ipc.RpcClient.SPECIFIC_WRITE_THREAD; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -41,12 +42,6 @@ import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.codec.Codec; -import org.apache.hadoop.hbase.ipc.AbstractRpcClient; -import org.apache.hadoop.hbase.ipc.AsyncRpcClient; -import org.apache.hadoop.hbase.ipc.FifoRpcScheduler; -import org.apache.hadoop.hbase.ipc.RpcClientImpl; -import org.apache.hadoop.hbase.ipc.RpcScheduler; -import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto; @@ -290,6 +285,7 @@ public class IntegrationTestRpcClient { static class SimpleClient extends Thread { AbstractRpcClient rpcClient; AtomicBoolean running = new AtomicBoolean(true); + AtomicBoolean sending = new AtomicBoolean(false); AtomicReference exception = new AtomicReference<>(null); Cluster cluster; String id; @@ -319,6 +315,7 @@ public class IntegrationTestRpcClient { if (address == null) { throw new IOException("Listener channel is closed"); } + sending.set(true); ret = (EchoResponseProto) rpcClient.callBlockingMethod(md, null, param, ret, user, address); } catch (Exception e) { @@ -340,6 +337,9 @@ public class IntegrationTestRpcClient { void stopRunning() { running.set(false); } + boolean isSending() { + return sending.get(); + } void rethrowException() throws Throwable { if (exception.get() != null) { @@ -348,6 +348,29 @@ public class IntegrationTestRpcClient { } } + /* + Test that not started connections are successfully removed from connection pool when + rpc client is closing. + */ + @Test (timeout = 30000) + public void testRpcWithWriteThread() throws IOException, InterruptedException { + LOG.info("Starting test"); + Cluster cluster = new Cluster(1, 1); + cluster.startServer(); + conf.setBoolean(SPECIFIC_WRITE_THREAD, true); + for(int i = 0; i <1000; i++) { + AbstractRpcClient rpcClient = createRpcClient(conf, true); + SimpleClient client = new SimpleClient(cluster, rpcClient, "Client1"); + client.start(); + while(!client.isSending()) { + Thread.sleep(1); + } + client.stopRunning(); + rpcClient.close(); + } + } + + @Test (timeout = 900000) public void testRpcWithChaosMonkeyWithSyncClient() throws Throwable { for (int i = 0; i < numIterations; i++) { -- 2.5.4 (Apple Git-61)