From bd4b835a387740ee0079f23bcbb4893726be7b3e Mon Sep 17 00:00:00 2001 From: Mikhail Antonov Date: Mon, 22 Feb 2016 07:14:58 -0800 Subject: [PATCH] online reconfiguration of call queue size --- .../hadoop/hbase/ipc/BalancedQueueRpcExecutor.java | 20 +++++++++++++++++++- .../apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java | 5 +++++ .../org/apache/hadoop/hbase/ipc/RpcExecutor.java | 2 ++ .../java/org/apache/hadoop/hbase/ipc/RpcServer.java | 3 +++ .../apache/hadoop/hbase/ipc/SimpleRpcScheduler.java | 14 +++++++++++++- 5 files changed, 42 insertions(+), 2 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java index 79b4ec8..501cb04 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java @@ -37,6 +37,9 @@ import org.apache.hadoop.hbase.util.ReflectionUtils; @InterfaceStability.Evolving public class BalancedQueueRpcExecutor extends RpcExecutor { + public static final int CALL_QUEUE_SIZE_HARD_LIMIT = 250; // nobody should need more than that + public volatile int currentQueueLimit; + protected final List> queues; private final QueueBalancer balancer; @@ -67,6 +70,11 @@ public class BalancedQueueRpcExecutor extends RpcExecutor { protected void initializeQueues(final int numQueues, final Class queueClass, Object... initargs) { for (int i = 0; i < numQueues; ++i) { + // nasty, right? + if (initargs.length > 0) { + currentQueueLimit = (int) initargs[0]; + initargs[0] = CALL_QUEUE_SIZE_HARD_LIMIT; + } queues.add((BlockingQueue) ReflectionUtils.newInstance(queueClass, initargs)); } } @@ -74,7 +82,12 @@ public class BalancedQueueRpcExecutor extends RpcExecutor { @Override public boolean dispatch(final CallRunner callTask) throws InterruptedException { int queueIndex = balancer.getNextQueue(); - return queues.get(queueIndex).offer(callTask); + BlockingQueue queue = queues.get(queueIndex); + // that means we can overflow by at most size (5), that's ok + if (queue.size() >= currentQueueLimit) { + return false; + } + return queue.offer(callTask); } @Override @@ -90,4 +103,9 @@ public class BalancedQueueRpcExecutor extends RpcExecutor { public List> getQueues() { return queues; } + + @Override + public void resizeQueues(Configuration conf) { + currentQueueLimit = conf.getInt("hbase.ipc.server.max.callqueue.length", currentQueueLimit); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java index de4b4de..c4eb3f6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java @@ -160,6 +160,11 @@ public class RWQueueRpcExecutor extends RpcExecutor { } @Override + public void resizeQueues(Configuration conf) { + // NO OP + } + + @Override public boolean dispatch(final CallRunner callTask) throws InterruptedException { RpcServer.Call call = callTask.getCall(); int queueIndex; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java index 017bf39..f3f1c16 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java @@ -169,6 +169,8 @@ public abstract class RpcExecutor { } } + public abstract void resizeQueues(Configuration conf); + public static abstract class QueueBalancer { /** * @return the index of the next queue to which a request should be inserted diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index 51de6af..f6933f1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -2043,6 +2043,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { @Override public void onConfigurationChange(Configuration newConf) { initReconfigurable(newConf); + if (scheduler instanceof ConfigurationObserver) { + ((ConfigurationObserver)scheduler).onConfigurationChange(newConf); + } } private void initReconfigurable(Configuration confToLoad) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java index 8de714d..81b86fa 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.conf.ConfigurationObserver; import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue; /** @@ -36,7 +37,7 @@ import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue; */ @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX}) @InterfaceStability.Evolving -public class SimpleRpcScheduler extends RpcScheduler { +public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObserver { private static final Log LOG = LogFactory.getLog(SimpleRpcScheduler.class); public static final String CALL_QUEUE_READ_SHARE_CONF_KEY = @@ -56,6 +57,17 @@ public class SimpleRpcScheduler extends RpcScheduler { = "hbase.ipc.server.queue.max.call.delay"; /** + * Resize call queues; + * @param conf new configuration + */ + @Override + public void onConfigurationChange(Configuration conf) { + callExecutor.resizeQueues(conf); + priorityExecutor.resizeQueues(conf); + replicationExecutor.resizeQueues(conf); + } + + /** * Comparator used by the "normal callQueue" if DEADLINE_CALL_QUEUE_CONF_KEY is set to true. * It uses the calculated "deadline" e.g. to deprioritize long-running job * -- 1.9.5