From e18288380b63d6859e0df1d8aed7645fb3ab7a23 Mon Sep 17 00:00:00 2001
From: zhangduo
Date: Fri, 9 Jun 2017 21:35:25 +0800
Subject: [PATCH] HBASE-18199 Race in NettyRpcConnection may cause call stuck
 in BufferCallBeforeInitHandler forever

---
 .../hadoop/hbase/ipc/NettyRpcConnection.java       | 59 ++++++++++++++--------
 1 file changed, 39 insertions(+), 20 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
index 47d7234..63733d6 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
@@ -71,8 +71,8 @@ class NettyRpcConnection extends RpcConnection {
 
   private static final Log LOG = LogFactory.getLog(NettyRpcConnection.class);
 
-  private static final ScheduledExecutorService RELOGIN_EXECUTOR = Executors
-      .newSingleThreadScheduledExecutor(Threads.newDaemonThreadFactory("Relogin"));
+  private static final ScheduledExecutorService RELOGIN_EXECUTOR =
+      Executors.newSingleThreadScheduledExecutor(Threads.newDaemonThreadFactory("Relogin"));
 
   private final NettyRpcClient rpcClient;
 
@@ -89,8 +89,8 @@ class NettyRpcConnection extends RpcConnection {
         rpcClient.userProvider.isHBaseSecurityEnabled(), rpcClient.codec, rpcClient.compressor);
     this.rpcClient = rpcClient;
     byte[] connectionHeaderPreamble = getConnectionHeaderPreamble();
-    this.connectionHeaderPreamble = Unpooled.directBuffer(connectionHeaderPreamble.length)
-        .writeBytes(connectionHeaderPreamble);
+    this.connectionHeaderPreamble =
+      Unpooled.directBuffer(connectionHeaderPreamble.length).writeBytes(connectionHeaderPreamble);
     ConnectionHeader header = getConnectionHeader();
     this.connectionHeaderWithLength = Unpooled.directBuffer(4 + header.getSerializedSize());
     this.connectionHeaderWithLength.writeInt(header.getSerializedSize());
@@ -215,8 +215,8 @@ class NettyRpcConnection extends RpcConnection {
 
         // add ReadTimeoutHandler to deal with server doesn't response connection header
         // because of the different configuration in client side and server side
-        p.addFirst(new ReadTimeoutHandler(
-          RpcClient.DEFAULT_SOCKET_TIMEOUT_READ, TimeUnit.MILLISECONDS));
+        p.addFirst(
+          new ReadTimeoutHandler(RpcClient.DEFAULT_SOCKET_TIMEOUT_READ, TimeUnit.MILLISECONDS));
         p.addLast(chHandler);
         connectionHeaderPromise.addListener(new FutureListener<Boolean>() {
           @Override
@@ -281,9 +281,23 @@ class NettyRpcConnection extends RpcConnection {
     }).channel();
   }
 
+  private void write(Channel ch, Call call) {
+    ch.writeAndFlush(call).addListener(new ChannelFutureListener() {
+
+      @Override
+      public void operationComplete(ChannelFuture future) throws Exception {
+        // Fail the call if we failed to write it out. This is usually because the channel is
+        // closed. This is needed because we may shut down the channel inside the event loop,
+        // and there may still be some pending calls in the event loop queue after us.
+        if (!future.isSuccess()) {
+          call.setException(toIOE(future.cause()));
+        }
+      }
+    });
+  }
+
   @Override
-  public synchronized void sendRequest(final Call call, HBaseRpcController hrc)
-      throws IOException {
+  public synchronized void sendRequest(final Call call, HBaseRpcController hrc) throws IOException {
     if (reloginInProgress) {
       throw new IOException("Can not send request because relogin is in progress.");
     }
@@ -309,18 +323,23 @@ class NettyRpcConnection extends RpcConnection {
             connect();
           }
           scheduleTimeoutTask(call);
-          channel.writeAndFlush(call).addListener(new ChannelFutureListener() {
-
-            @Override
-            public void operationComplete(ChannelFuture future) throws Exception {
-              // Fail the call if we failed to write it out. This usually because the channel is
-              // closed. This is needed because we may shutdown the channel inside event loop and
-              // there may still be some pending calls in the event loop queue after us.
-              if (!future.isSuccess()) {
-                call.setException(toIOE(future.cause()));
-              }
-            }
-          });
+          Channel ch = channel;
+          // We must move the whole writeAndFlush call inside the event loop, otherwise there
+          // will be a race condition.
+          // In netty's DefaultChannelPipeline, writeAndFlush finds the first outbound handler
+          // in the calling thread and then schedules a task to the event loop which starts the
+          // write from that outbound handler. It is possible that the first handler is
+          // BufferCallBeforeInitHandler when we call writeAndFlush here, while the connection
+          // is set up at the same time, so the event loop thread removes the
+          // BufferCallBeforeInitHandler before our writeAndFlush task runs, and the task then
+          // still calls the write method of BufferCallBeforeInitHandler.
+          // This may be considered a bug in netty, but anyway there is a workaround, so let's
+          // fix it on our side first.
+          if (ch.eventLoop().inEventLoop()) {
+            write(ch, call);
+          } else {
+            ch.eventLoop().execute(() -> write(ch, call));
+          }
         }
       }
     });
-- 
2.7.4
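
The race and its fix are not HBase-specific: any Netty client that removes or replaces pipeline handlers while other threads may call writeAndFlush needs the same dispatch. Below is a minimal standalone sketch of the pattern under that assumption; EventLoopWriter and writeInEventLoop are illustrative names invented here, not part of this patch or of the Netty API, and the failure handling is simplified to firing the cause down the pipeline.

    import io.netty.channel.Channel;
    import io.netty.channel.ChannelFutureListener;

    final class EventLoopWriter {

      private EventLoopWriter() {
      }

      // Dispatch the write to the channel's event loop. Pipeline handler
      // resolution then happens on the same thread that adds and removes
      // handlers, so the write can never be resolved against a handler that
      // the event loop is concurrently removing.
      static void writeInEventLoop(Channel ch, Object msg) {
        if (ch.eventLoop().inEventLoop()) {
          ch.writeAndFlush(msg).addListener((ChannelFutureListener) future -> {
            if (!future.isSuccess()) {
              // The write usually fails because the channel is already closed;
              // propagate the cause instead of dropping it silently.
              future.channel().pipeline().fireExceptionCaught(future.cause());
            }
          });
        } else {
          ch.eventLoop().execute(() -> writeInEventLoop(ch, msg));
        }
      }
    }

Executing the write on the event loop serializes it with pipeline mutations, so the handler chain the write resolves against is the one actually installed when the write runs; the inEventLoop() fast path only avoids scheduling a redundant task when the caller is already on that thread.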