.../apache/hadoop/hbase/ipc/AbstractRpcClient.java | 13 ++++++++ .../hadoop/hbase/ipc/BlockingRpcConnection.java | 5 +++ .../hadoop/hbase/ipc/NettyRpcConnection.java | 10 ++++++ .../org/apache/hadoop/hbase/ipc/RpcConnection.java | 5 +++ .../hadoop/hbase/security/SaslWrapHandler.java | 38 +++++++++++++--------- 5 files changed, 56 insertions(+), 15 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java index 401a240..493f693 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java @@ -215,6 +215,12 @@ public abstract class AbstractRpcClient implements RpcC if (conn.getLastTouched() < closeBeforeTime && !conn.isActive()) { LOG.info("Cleanup idle connection to " + conn.remoteId().address); connections.removeValue(conn.remoteId(), conn); + try { + conn.cleanupConnection(); + } catch (Exception e) { + throw new RuntimeException("Exception while cleaning up connection " + conn.toString(), + e); + } } } } @@ -472,6 +478,13 @@ public abstract class AbstractRpcClient implements RpcC conn.shutdown(); } closeInternal(); + for (T conn : connToClose) { + try { + conn.cleanupConnection(); + } catch (Exception e) { + throw new RuntimeException("Exception while cleaning up connections " + conn.toString(), e); + } + } } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java index c8b366d..ccd96ff 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java @@ -685,6 +685,11 @@ class BlockingRpcConnection extends RpcConnection implements Runnable { } @Override + public void cleanupConnection() throws Exception { + // do nothing + } + + @Override public synchronized void sendRequest(final Call call, HBaseRpcController pcrc) throws IOException { pcrc.notifyOnCancel(new RpcCallback() { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java index 5f22dfd..8b40188 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java @@ -119,6 +119,16 @@ class NettyRpcConnection extends RpcConnection { shutdown0(); } + @Override + public synchronized void cleanupConnection() throws Exception { + if (connectionHeaderPreamble != null) { + connectionHeaderPreamble.release(); + } + if (connectionHeaderWithLength != null) { + connectionHeaderWithLength.release(); + } + } + private void established(Channel ch) { ch.write(connectionHeaderWithLength.retainedDuplicate()); ChannelPipeline p = ch.pipeline(); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java index 8118b20..9f29687 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java @@ -252,4 +252,9 @@ abstract class RpcConnection { public abstract void shutdown(); public abstract void sendRequest(Call call, HBaseRpcController hrc) throws IOException; + + /** + * Does the clean up work after the connection is removed from the connection pool + */ + public abstract void cleanupConnection() throws Exception; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslWrapHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslWrapHandler.java index ddb4ae9..0f88014 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslWrapHandler.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslWrapHandler.java @@ -60,21 +60,29 @@ public class SaslWrapHandler extends ChannelOutboundHandlerAdapter { @Override public void flush(ChannelHandlerContext ctx) throws Exception { - if (!queue.isEmpty()) { - ChannelPromise promise = ctx.newPromise(); - int readableBytes = queue.readableBytes(); - ByteBuf buf = queue.remove(readableBytes, promise); - byte[] bytes = new byte[readableBytes]; - buf.readBytes(bytes); - byte[] wrapperBytes = saslClient.wrap(bytes, 0, bytes.length); - ChannelPromise lenPromise = ctx.newPromise(); - ctx.write(ctx.alloc().buffer(4).writeInt(wrapperBytes.length), lenPromise); - ChannelPromise contentPromise = ctx.newPromise(); - ctx.write(Unpooled.wrappedBuffer(wrapperBytes), contentPromise); - PromiseCombiner combiner = new PromiseCombiner(); - combiner.addAll(lenPromise, contentPromise); - combiner.finish(promise); + ByteBuf wrappedBuffer = null; + try { + if (!queue.isEmpty()) { + ChannelPromise promise = ctx.newPromise(); + int readableBytes = queue.readableBytes(); + ByteBuf buf = queue.remove(readableBytes, promise); + byte[] bytes = new byte[readableBytes]; + buf.readBytes(bytes); + byte[] wrapperBytes = saslClient.wrap(bytes, 0, bytes.length); + ChannelPromise lenPromise = ctx.newPromise(); + ctx.write(ctx.alloc().buffer(4).writeInt(wrapperBytes.length), lenPromise); + ChannelPromise contentPromise = ctx.newPromise(); + wrappedBuffer = Unpooled.wrappedBuffer(wrapperBytes); + ctx.write(wrappedBuffer, contentPromise); + PromiseCombiner combiner = new PromiseCombiner(); + combiner.addAll(lenPromise, contentPromise); + combiner.finish(promise); + } + ctx.flush(); + } finally { + if (wrappedBuffer != null) { + wrappedBuffer.release(); + } } - ctx.flush(); } }