From fe8be945bf819de54dce93e33d16e8a939836b1d Mon Sep 17 00:00:00 2001
From: "chancelq"
Date: Wed, 28 Mar 2018 10:03:22 +0800
Subject: [PATCH] HBASE-20303 RS RPC server should not allow the response queue size to be too large

---
 .../apache/hadoop/hbase/ipc/NettyRpcServer.java    | 100 ++++++++++
 .../hadoop/hbase/ipc/NettyServerRpcConnection.java |   6 +
 .../org/apache/hadoop/hbase/ipc/RpcServer.java     |  34 ++++
 .../hadoop/hbase/ipc/ServerRpcConnection.java      |  18 +-
 .../apache/hadoop/hbase/ipc/SimpleRpcServer.java   |   7 +
 .../hbase/ipc/SimpleServerRpcConnection.java       |   5 +
 .../hadoop/hbase/ipc/TestNettyRpcServer.java       |   2 -
 .../org/apache/hadoop/hbase/ipc/TestRpcServer.java | 204 +++++++++++++++++++++
 8 files changed, 362 insertions(+), 14 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java
index fb2a8ebd13..d79b033180 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java
@@ -17,6 +17,11 @@
  */
 package org.apache.hadoop.hbase.ipc;

+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hbase.thirdparty.io.netty.bootstrap.ServerBootstrap;
 import org.apache.hbase.thirdparty.io.netty.channel.Channel;
 import org.apache.hbase.thirdparty.io.netty.channel.ChannelInitializer;
@@ -65,6 +70,19 @@ public class NettyRpcServer extends RpcServer {

   public static final Logger LOG = LoggerFactory.getLogger(NettyRpcServer.class);

+  private final long nettyResponseBufferHighWaterMarkGlobal;
+  private final long nettyResponseBufferHighWaterMarkPerChannel;
+  private final AtomicLong totalResponseBufferSize = new AtomicLong(0);
+  static final String NETTY_RESPONSE_BUFFER_HIGH_WATERMARK_TOTAL =
+      "hbase.rpc.server.netty.response.buffer.high.watermark.global";
+  static final String NETTY_RESPONSE_BUFFER_HIGH_WATERMARK_PER_CHANNEL =
+      "hbase.rpc.server.netty.response.buffer.high.watermark.per.channel";
+  private static final String RESPONSE_BUFFER_CHECK_INTERVAL =
+      "hbase.rpc.server.netty.response.buffer.check.interval";
+  private static final long NETTY_RESPONSE_BUFFER_HIGH_WATERMARK_GLOBAL_DEFAULT = 2147483648L;
+  private static final long NETTY_RESPONSE_BUFFER_HIGH_WATERMARK_PERCHANNEL_DEFAULT = 104857600L;
+  private ScheduledExecutorService nettyResponseBufferHighWaterMarkRoutine;
+
   private final InetSocketAddress bindAddress;

   private final CountDownLatch closed = new CountDownLatch(1);
@@ -112,6 +130,48 @@ public class NettyRpcServer extends RpcServer {
     }
     initReconfigurable(conf);
     this.scheduler.init(new RpcSchedulerContext(this));
+
+    this.nettyResponseBufferHighWaterMarkGlobal =
+        conf.getLong(NETTY_RESPONSE_BUFFER_HIGH_WATERMARK_TOTAL,
+            NETTY_RESPONSE_BUFFER_HIGH_WATERMARK_GLOBAL_DEFAULT);
+    this.nettyResponseBufferHighWaterMarkPerChannel =
+        conf.getLong(NETTY_RESPONSE_BUFFER_HIGH_WATERMARK_PER_CHANNEL,
+            NETTY_RESPONSE_BUFFER_HIGH_WATERMARK_PERCHANNEL_DEFAULT);
+    int nettyResponseBufferHighWaterMarkRoutineInterval =
+        conf.getInt(RESPONSE_BUFFER_CHECK_INTERVAL, 5);
+
+    // A nettyResponseBufferHighWaterMarkGlobal > 0 enables the response buffer check
+    if (nettyResponseBufferHighWaterMarkGlobal > 0) {
+      nettyResponseBufferHighWaterMarkRoutine = Executors.newScheduledThreadPool(1,
+          new ThreadFactoryBuilder().setNameFormat("NettyResponseBufferHighWaterMarkRoutine")
+              .setDaemon(true).build());
+      nettyResponseBufferHighWaterMarkRoutine.scheduleAtFixedRate(() -> {
+        try {
+          sumNettyResponseBuffer();
+        } catch (Exception e) {
+          LOG.error("sumNettyResponseBuffer failed: " + e.getMessage(), e);
+        }
+      }, nettyResponseBufferHighWaterMarkRoutineInterval,
+          nettyResponseBufferHighWaterMarkRoutineInterval, TimeUnit.SECONDS);
+
+    }
+    LOG.info("NettyRpcServer nettyResponseBufferHighWaterMarkGlobal =" +
+        nettyResponseBufferHighWaterMarkGlobal + " ; nettyResponseBufferHighWaterMarkPerChannel=" +
+        nettyResponseBufferHighWaterMarkPerChannel);
+  }
+
+  private void sumNettyResponseBuffer() {
+    long total = 0;
+    for (Channel channel : allChannels) {
+      if (channel instanceof ServerChannel) {
+        continue;
+      }
+      total += channel.unsafe().outboundBuffer().totalPendingWriteBytes();
+    }
+
+    LOG.debug(
+        "current totalChannelWriteBufferSize=" + total + " ;total socket=" + allChannels.size());
+    totalResponseBufferSize.set(total);
   }

   @VisibleForTesting
@@ -148,6 +208,7 @@ public class NettyRpcServer extends RpcServer {
     allChannels.close().awaitUninterruptibly();
     serverChannel.close();
     scheduler.stop();
+    nettyResponseBufferHighWaterMarkRoutine.shutdownNow();
     closed.countDown();
     running = false;
   }
@@ -188,4 +249,43 @@ public class NettyRpcServer extends RpcServer {
       -1, null, receiveTime, timeout, reservoir, cellBlockBuilder, null);
     return call(fakeCall, status);
   }
+
+  @Override
+  boolean checkResponseBufferBytes(ServerRpcConnection serverRpcConnection, int id,
+      long totalRequestSize) throws IOException {
+
+    if (nettyResponseBufferHighWaterMarkGlobal <= 0) {
+      return true;
+    }
+    // Reject the call if the total size of all netty ChannelOutboundBuffers reaches the global
+    // high water mark, or this channel's pending bytes reach the per-channel high water mark.
+    if (totalResponseBufferSize.get() >= nettyResponseBufferHighWaterMarkGlobal
+        || serverRpcConnection.getResponseBufferPendingSize()
+        >= nettyResponseBufferHighWaterMarkPerChannel) {
+      final ServerCall callTooBig = serverRpcConnection
+          .createCall(id, serverRpcConnection.service, null, null, null, null, totalRequestSize,
+              null, 0, serverRpcConnection.callCleanup);
+      // For client compatibility, reuse CALL_QUEUE_TOO_BIG_EXCEPTION
+      this.metrics.exception(RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION);
+
+      String msg = this.server.getServerName()
+          + " netty response buffer size reached the global or channel limit, client too slow?";
+
+      callTooBig.setResponse(null, null, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION, msg);
+      callTooBig.sendResponseIfReady();
+      if (RpcServer.LOG.isTraceEnabled()) {
+        RpcServer.LOG.trace(
+            msg + " global current =" + totalResponseBufferSize.get() + " ;per Channel current ="
+                + serverRpcConnection.getResponseBufferPendingSize() + " global HighWaterMark ="
+                + nettyResponseBufferHighWaterMarkGlobal + " ;per Channel HighWaterMark="
+                + nettyResponseBufferHighWaterMarkPerChannel);
+      }
+      return false;
+    }
+    return true;
+  }
+
+  public long getResponseQueueSize() {
+    return this.totalResponseBufferSize.get();
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerRpcConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerRpcConnection.java
index ffa16bfb46..aca222503d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerRpcConnection.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerRpcConnection.java
@@ -129,4 +129,10 @@ class NettyServerRpcConnection extends ServerRpcConnection {
   protected void doRespond(RpcResponse resp) {
     channel.writeAndFlush(resp);
   }
+
+  @Override
+  public long getResponseBufferPendingSize() {
+    return this.channel.unsafe().outboundBuffer().totalPendingWriteBytes();
+  }
+
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index 686d5785cc..df6d34ee9a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -791,4 +791,38 @@ public abstract class RpcServer implements RpcServerInterface,
   public void setRsRpcServices(RSRpcServices rsRpcServices) {
     this.rsRpcServices = rsRpcServices;
   }
+
+  public boolean rpcHealthCheck(ServerRpcConnection serverRpcConnection, int id,
+      long totalRequestSize) throws IOException {
+
+    if (!checkCallQueueBytes(serverRpcConnection, id, totalRequestSize)) {
+      return false;
+    }
+    if (!checkResponseBufferBytes(serverRpcConnection, id, totalRequestSize)) {
+      return false;
+    }
+    return true;
+  }
+
+  private boolean checkCallQueueBytes(ServerRpcConnection serverRpcConnection, int id,
+      long totalRequestSize) throws IOException {
+    // Enforcing the call queue size, this triggers a retry in the client
+    // This is a bit late to be doing this check - we have already read in the
+    // total request.
+    if ((totalRequestSize + callQueueSizeInBytes.sum()) > maxQueueSizeInBytes) {
+      final ServerCall callTooBig = serverRpcConnection
+          .createCall(id, serverRpcConnection.service, null, null, null, null, totalRequestSize,
+              null, 0, serverRpcConnection.callCleanup);
+      this.metrics.exception(RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION);
+      callTooBig.setResponse(null, null, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION,
+          "Call queue is full on " + this.server.getServerName() +
+              ", is hbase.ipc.server.max.callqueue.size too small?");
+      callTooBig.sendResponseIfReady();
+      return false;
+    }
+    return true;
+  }
+
+  abstract boolean checkResponseBufferBytes(ServerRpcConnection serverRpcConnection, int id,
+      long totalRequestSize) throws IOException;
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java
index 17bb362b49..9de9809508 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java
@@ -626,20 +626,12 @@ abstract class ServerRpcConnection implements Closeable {
       RpcServer.LOG.trace("RequestHeader " + TextFormat.shortDebugString(header) +
         " totalRequestSize: " + totalRequestSize + " bytes");
     }
-    // Enforcing the call queue size, this triggers a retry in the client
-    // This is a bit late to be doing this check - we have already read in the
-    // total request.
-    if ((totalRequestSize +
-      this.rpcServer.callQueueSizeInBytes.sum()) > this.rpcServer.maxQueueSizeInBytes) {
-      final ServerCall callTooBig = createCall(id, this.service, null, null, null, null,
-        totalRequestSize, null, 0, this.callCleanup);
-      this.rpcServer.metrics.exception(RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION);
-      callTooBig.setResponse(null, null, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION,
-        "Call queue is full on " + this.rpcServer.server.getServerName() +
-          ", is hbase.ipc.server.max.callqueue.size too small?");
-      callTooBig.sendResponseIfReady();
+
+    // Check the rpcServer health and return immediately when the check fails
+    if (!this.rpcServer.rpcHealthCheck(this, id, totalRequestSize)) {
       return;
     }
+
     MethodDescriptor md = null;
     Message param = null;
     CellScanner cellScanner = null;
@@ -829,4 +821,6 @@
       return this.length;
     }
   }
+
+  public abstract long getResponseBufferPendingSize();
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java
index 13a3cf7171..289ae17075 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java
@@ -698,4 +698,11 @@ public class SimpleRpcServer extends RpcServer {
     }
   }

+  @Override
+  boolean checkResponseBufferBytes(ServerRpcConnection serverRpcConnection, int id,
+      long totalRequestSize) throws IOException {
+    // No response buffer check for SimpleRpcServer
+    return true;
+  }
+
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerRpcConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerRpcConnection.java
index b4b5f3369d..54386da697 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerRpcConnection.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerRpcConnection.java
@@ -353,4 +353,9 @@ class SimpleServerRpcConnection extends ServerRpcConnection {
   protected void doRespond(RpcResponse resp) throws IOException {
     responder.doRespond(this, resp);
   }
+
+  @Override
+  public long getResponseBufferPendingSize() {
+    throw new UnsupportedOperationException("Not implemented");
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyRpcServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyRpcServer.java
index b45dd5b286..2e7881b515 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyRpcServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyRpcServer.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.ipc;

 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -111,5 +110,4 @@
       table.close();
     }
   }
-
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServer.java
index 560190b0e3..cfeaf84e84 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServer.java
@@ -17,22 +17,54 @@
  */
 package org.apache.hadoop.hbase.ipc;

+import static org.apache.hadoop.hbase.ipc.NettyRpcServer.NETTY_RESPONSE_BUFFER_HIGH_WATERMARK_PER_CHANNEL;
+import static org.apache.hadoop.hbase.ipc.NettyRpcServer.NETTY_RESPONSE_BUFFER_HIGH_WATERMARK_TOTAL;
+import static org.apache.hadoop.hbase.ipc.RpcServer.LOG;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;

+import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.io.ByteBufferPool;
 import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
 import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.nio.MultiByteBuff;
 import org.apache.hadoop.hbase.nio.SingleByteBuff;
+import org.apache.hadoop.hbase.regionserver.RSRpcServices;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.testclassification.RPCTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
+import org.junit.Assert;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -141,4 +173,176 @@
       pool.putbackBuffer(buf);
     }
   }
+
+  @Test(timeout = 120000)
+  public void testRSOccupiedMoreResponseBufferSize() throws Exception {
+    // With 100 concurrent async clients on a single socket, each Get response being only about
+    // 10 bytes of data, the RS can occupy more than 100MB of heap in the netty response buffer.
+
+    Server mockServer = mock(Server.class);
+    RSRpcServices mockRSRpcServices = mock(RSRpcServices.class);
+
+    // Mock the Get result
+    ClientProtos.GetResponse.Builder builder = ClientProtos.GetResponse.newBuilder();
+
+    byte[] buffer = new byte[10];
+    new Random().nextBytes(buffer);
+    KeyValue[] kvs = genKVs("testRow".getBytes(), "testFamily".getBytes(), buffer, 1, 100);
+    Arrays.sort(kvs, CellComparator.getInstance());
+    Result r = Result.create(kvs);
+
+    ClientProtos.Result pbr = ProtobufUtil.toResult(r);
+    builder.setResult(pbr);
+
+    when(mockRSRpcServices.get(any(RpcController.class), any(ClientProtos.GetRequest.class)))
+        .thenReturn(builder.build());
+    Configuration configuration = new Configuration();
+    FifoRpcScheduler rpcScheduler = new FifoRpcScheduler(configuration, 1);
+    configuration.set(NETTY_RESPONSE_BUFFER_HIGH_WATERMARK_TOTAL, "2147483648");
+    configuration.set(NETTY_RESPONSE_BUFFER_HIGH_WATERMARK_PER_CHANNEL, "2147483648");
+    List<RpcServer.BlockingServiceAndInterface> bssi =
+        new ArrayList<>(2);
+    bssi.add(new RpcServer.BlockingServiceAndInterface(
+        ClientProtos.ClientService.newReflectiveBlockingService(mockRSRpcServices),
+        ClientProtos.ClientService.BlockingInterface.class));
+    NettyRpcServer nettyRpcServer =
+        new NettyRpcServer(mockServer, "unitTestNRC", bssi, new InetSocketAddress(32323),
+            configuration, rpcScheduler, false);
+    nettyRpcServer.start();
+
+    int threadCount = 100;
+    CountDownLatch countDownLatch = new CountDownLatch(threadCount * 2);
+    NettyRpcClient rpcClient = new NettyRpcClient(configuration);
+    final RpcCallback<ClientProtos.GetResponse> asyncCallback =
+        new RpcCallback<ClientProtos.GetResponse>() {
+          @Override
+          public void run(ClientProtos.GetResponse parameter) {
+            try {
+              // Simulate a slow client by sleeping 10 ms
+              countDownLatch.countDown();
+              Thread.sleep(10);
+            } catch (InterruptedException e) {
+              e.printStackTrace();
+            }
+          }
+        };
+    RpcChannel channel = rpcClient
+        .createRpcChannel(ServerName.valueOf("localhost:32323", 123456), mock(User.class), 60000);
+    final ClientProtos.ClientService.Interface asyncStub =
+        ClientProtos.ClientService.newStub(channel);
+    final Get get = new Get("testRow".getBytes());
+
+    List<Thread> threads = Lists.newArrayList();
+
+    for (int i = 0; i < threadCount; i++) {
+      Thread t = new Thread(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            for (int j = 0; j < 500; j++) {
+              asyncStub.get(new HBaseRpcControllerImpl(),
+                  RequestConverter.buildGetRequest("testRegion".getBytes(), get), asyncCallback);
+              countDownLatch.countDown();
+            }
+          } catch (Exception e) {
+            e.printStackTrace();
+          }
+        }
+      });
+      threads.add(t);
+    }
+
+    for (Thread t : threads) {
+      t.start();
+    }
+
+    countDownLatch.await(10000, TimeUnit.SECONDS);
+    // Wait for the monitoring routine to sum up all of the channels' response buffer sizes.
+    Thread.sleep(6000);
+
+    // The RS response buffer should now occupy close to 100MB of heap
+    LOG.debug("nettyRpcServer.getResponseQueueSize() =" + nettyRpcServer.getResponseQueueSize());
+    Assert.assertTrue(nettyRpcServer.getResponseQueueSize() > 40 * 1024 * 1024);
+
+    // Close the client
+    rpcClient.close();
+    // netty close timeout is 15s by default.
+    Thread.sleep(16000);
+    // Once the socket is closed and removed, the response buffer size drops back to 0.
+    Assert.assertEquals(nettyRpcServer.getResponseQueueSize(), 0);
+    nettyRpcServer.stop();
+  }
+
+  @Test(timeout = 120000)
+  public void testExceptionWhenResponseBufferReachThrottle() throws Exception {
+    Server mockServer = mock(Server.class);
+    RSRpcServices mockRSRpcServices = mock(RSRpcServices.class);
+
+    // Mock the Get result
+    ClientProtos.GetResponse.Builder builder = ClientProtos.GetResponse.newBuilder();
+
+    byte[] buffer = new byte[10];
+    new Random().nextBytes(buffer);
+    KeyValue[] kvs = genKVs("testRow".getBytes(), "testFamily".getBytes(), buffer, 1, 100);
+    Arrays.sort(kvs, CellComparator.getInstance());
+    Result r = Result.create(kvs);
+
+    ClientProtos.Result pbr = ProtobufUtil.toResult(r);
+    builder.setResult(pbr);
+
+    when(mockRSRpcServices.get(any(RpcController.class), any(ClientProtos.GetRequest.class)))
+        .thenReturn(builder.build());
+    Configuration configuration = new Configuration();
+    FifoRpcScheduler rpcScheduler = new FifoRpcScheduler(configuration, 1);
+    configuration.set(NETTY_RESPONSE_BUFFER_HIGH_WATERMARK_TOTAL, "2147483648");
+    configuration.set(NETTY_RESPONSE_BUFFER_HIGH_WATERMARK_PER_CHANNEL, "0");
+    List<RpcServer.BlockingServiceAndInterface> bssi =
+        new ArrayList<>(2);
+    bssi.add(new RpcServer.BlockingServiceAndInterface(
+        ClientProtos.ClientService.newReflectiveBlockingService(mockRSRpcServices),
+        ClientProtos.ClientService.BlockingInterface.class));
+    NettyRpcServer nettyRpcServer =
+        new NettyRpcServer(mockServer, "unitTestNRC", bssi, new InetSocketAddress(32323),
+            configuration, rpcScheduler, false);
+    nettyRpcServer.start();
+
+    NettyRpcClient rpcClient = new NettyRpcClient(configuration);
+    RpcChannel channel = rpcClient
+        .createRpcChannel(ServerName.valueOf("localhost:32323", 123456), mock(User.class), 60000);
+    final ClientProtos.ClientService.Interface asyncStub =
+        ClientProtos.ClientService.newStub(channel);
+    final Get get = new Get("testRow".getBytes());
+
+    final CountDownLatch countDownLatch = new CountDownLatch(1);
+    HBaseRpcControllerImpl hBaseRpcController = new HBaseRpcControllerImpl();
+    asyncStub
+        .get(hBaseRpcController, RequestConverter.buildGetRequest("testRegion".getBytes(), get),
+            res -> countDownLatch.countDown());
+
+    countDownLatch.await(10000, TimeUnit.SECONDS);
+    Assert.assertTrue(hBaseRpcController.getFailed() != null);
+    Assert.assertTrue(hBaseRpcController.getFailed().toString().contains(
+        "netty response buffer size reached the global or channel limit"));
+
+    // Close the client
+    rpcClient.close();
+    // netty close timeout is 15s by default.
+    Thread.sleep(16000);
+    // Once the socket is closed and removed, the response buffer size drops back to 0.
+    Assert.assertEquals(nettyRpcServer.getResponseQueueSize(), 0);
+    nettyRpcServer.stop();
+  }
+
+  static KeyValue[] genKVs(final byte[] row, final byte[] family, final byte[] value,
+      final long timestamp, final int cols) {
+    KeyValue[] kvs = new KeyValue[cols];
+
+    for (int i = 0; i < cols; i++) {
+      kvs[i] = new KeyValue(row, family, Bytes.toBytes(i), timestamp,
+          Bytes.add(value, Bytes.toBytes(i)));
+    }
+    return kvs;
+  }
+
 }
-- 
2.14.3 (Apple Git-98)