From 671c6dd5987a6203df0e9b14448525c47165087f Mon Sep 17 00:00:00 2001 From: haxiaolin Date: Mon, 30 Jul 2018 11:23:54 +0800 Subject: [PATCH] HBASE-20972 Fix call queue buffer size leaking bug --- .../apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java | 5 +++-- .../main/java/org/apache/hadoop/hbase/ipc/CallRunner.java | 13 ++++++++++--- .../org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java | 5 +++-- 3 files changed, 16 insertions(+), 7 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java index db8e93fb3a..8bcbd6c274 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java @@ -56,9 +56,10 @@ public class BalancedQueueRpcExecutor extends RpcExecutor { int queueIndex = balancer.getNextQueue(); BlockingQueue queue = queues.get(queueIndex); // that means we can overflow by at most size (5), that's ok - if (queue.size() >= currentQueueLimit) { + if (queue.size() >= currentQueueLimit || !queue.offer(callTask)) { + callTask.resetCallQueueSize(); return false; } - return queue.offer(callTask); + return true; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java index e4763a5c5d..2524afabec 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java @@ -149,7 +149,7 @@ public class CallRunner { } finally { RpcServer.CurCall.set(null); if (resultPair != null) { - this.rpcServer.addCallSize(call.getSize() * -1); + resetCallQueueSize(); sucessful = true; } } @@ -183,7 +183,7 @@ public class CallRunner { + ": caught: " + StringUtils.stringifyException(e)); } finally { if (!sucessful) { - this.rpcServer.addCallSize(call.getSize() * -1); + resetCallQueueSize(); } cleanup(); } @@ -217,9 +217,16 @@ public class CallRunner { + ": caught: " + StringUtils.stringifyException(e)); } finally { if (!sucessful) { - this.rpcServer.addCallSize(call.getSize() * -1); + resetCallQueueSize(); } cleanup(); } } + + /** + * Subtract call queue size when a call is failed or rejected + */ + protected void resetCallQueueSize() { + this.rpcServer.addCallSize(call.getSize() * -1); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java index 28a8ecc523..b7d1f86a2d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java @@ -141,10 +141,11 @@ public class RWQueueRpcExecutor extends RpcExecutor { } BlockingQueue queue = queues.get(queueIndex); - if (queue.size() >= currentQueueLimit) { + if (queue.size() >= currentQueueLimit || !queue.offer(callTask)) { + callTask.resetCallQueueSize(); return false; } - return queue.offer(callTask); + return true; } @Override -- 2.14.1