From f550b3566e63bfd25bae5f725c7e1ee6749cd047 Mon Sep 17 00:00:00 2001 From: Elliott Clark Date: Wed, 16 Mar 2016 10:13:46 -0700 Subject: [PATCH] HBASE-15470 Add a setting for Priority queue length Summary: Move the config keys to one place Make Two different config keys. One for default, one for priority Test Plan: unit tests Differential Revision: https://reviews.facebook.net/D55575 --- .../main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java | 3 +-- .../src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java | 7 ++++++- .../src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java | 5 +++++ .../java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java | 9 +++++++-- 4 files changed, 19 insertions(+), 5 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java index b069a5a..ee36f3f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java @@ -32,7 +32,6 @@ import java.util.concurrent.atomic.AtomicInteger; * This can be used for HMaster, where no prioritization is needed. */ public class FifoRpcScheduler extends RpcScheduler { - private final int handlerCount; private final int maxQueueLength; private final AtomicInteger queueSize = new AtomicInteger(0); @@ -40,7 +39,7 @@ public class FifoRpcScheduler extends RpcScheduler { public FifoRpcScheduler(Configuration conf, int handlerCount) { this.handlerCount = handlerCount; - this.maxQueueLength = conf.getInt("hbase.ipc.server.max.callqueue.length", + this.maxQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java index 22cb195..40c11aa 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java @@ -43,6 +43,7 @@ public abstract class RpcExecutor { private static final Log LOG = LogFactory.getLog(RpcExecutor.class); protected static final int DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT = 250; + protected volatile int currentQueueLimit; private final AtomicInteger activeHandlerCount = new AtomicInteger(0); @@ -219,6 +220,10 @@ public abstract class RpcExecutor { * @param conf updated configuration */ public void resizeQueues(Configuration conf) { - currentQueueLimit = conf.getInt("hbase.ipc.server.max.callqueue.length", currentQueueLimit); + String configKey = RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH; + if (name != null && name.toLowerCase().contains("priority")) { + configKey = RpcScheduler.IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH; + } + currentQueueLimit = conf.getInt(configKey, currentQueueLimit); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java index 50886cb..2414e3d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java @@ -31,6 +31,11 @@ import java.net.InetSocketAddress; @InterfaceStability.Evolving public abstract class RpcScheduler { + public static final String IPC_SERVER_MAX_CALLQUEUE_LENGTH = + "hbase.ipc.server.max.callqueue.length"; + public static final String IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH = + "hbase.ipc.server.priority.max.callqueue.length"; + /** Exposes runtime information of a {@code RpcServer} that a {@code RpcScheduler} may need. */ static abstract class Context { public abstract InetSocketAddress getListenerAddress(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java index 12ee540..0cd34bb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java @@ -166,8 +166,12 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs PriorityFunction priority, Abortable server, int highPriorityLevel) { - int maxQueueLength = conf.getInt("hbase.ipc.server.max.callqueue.length", + + int maxQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER); + int maxPriorityQueueLength = + conf.getInt(RpcScheduler.IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH, maxQueueLength); + this.priority = priority; this.highPriorityLevel = highPriorityLevel; this.abortable = server; @@ -226,7 +230,8 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs // Create 2 queues to help priorityExecutor be more scalable. this.priorityExecutor = priorityHandlerCount > 0 ? - new BalancedQueueRpcExecutor("Priority", priorityHandlerCount, 2, maxQueueLength) : null; + new BalancedQueueRpcExecutor("Priority", priorityHandlerCount, 2, maxPriorityQueueLength) : + null; this.replicationExecutor = replicationHandlerCount > 0 ? new BalancedQueueRpcExecutor("Replication", -- 2.7.3