From 5b03b459eef2466c9559456b88c6c6b8b73538e9 Mon Sep 17 00:00:00 2001 From: chenheng Date: Mon, 9 May 2016 18:17:19 +0800 Subject: [PATCH] HBASE-15765 RPC handlers should steal from other queues in the same or higher priority --- .../hbase/ipc/AdaptiveLifoCoDelCallQueue.java | 23 +++++++++--- .../hadoop/hbase/ipc/BalancedQueueRpcExecutor.java | 26 ++++++++++++-- .../org/apache/hadoop/hbase/ipc/RpcExecutor.java | 41 +++++++++++++++++++++- .../hadoop/hbase/ipc/SimpleRpcScheduler.java | 18 +++++----- hbase-server/src/test/resources/hbase-site.xml | 2 +- 5 files changed, 91 insertions(+), 19 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java index 08c488b..63fd19d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java @@ -176,15 +176,28 @@ public class AdaptiveLifoCoDelCallQueue implements BlockingQueue { return queue.toString(); } - // This class does NOT provide generic purpose BlockingQueue implementation, - // so to prevent misuse all other methods throw UnsupportedOperationException. - @Override public CallRunner poll(long timeout, TimeUnit unit) throws InterruptedException { - throw new UnsupportedOperationException("This class doesn't support anything," - + " but take() and offer() methods"); + CallRunner cr; + while(true) { + if (((double) queue.size() / this.maxCapacity) > lifoThreshold) { + numLifoModeSwitches.incrementAndGet(); + cr = queue.pollLast(timeout, unit); + } else { + cr = queue.pollFirst(timeout, unit); + } + if (needToDrop(cr)) { + numGeneralCallsDropped.incrementAndGet(); + cr.drop(); + } else { + return cr; + } + } } + // This class does NOT provide generic purpose BlockingQueue implementation, + // so to prevent misuse all other methods throw UnsupportedOperationException. + @Override public CallRunner poll() { throw new UnsupportedOperationException("This class doesn't support anything," diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java index e4205eb..2c0550d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java @@ -39,6 +39,7 @@ public class BalancedQueueRpcExecutor extends RpcExecutor { protected final List> queues; private final QueueBalancer balancer; + protected final RpcExecutor highLevelRpcExecutor; public BalancedQueueRpcExecutor(final String name, final int handlerCount, final int numQueues, final int maxQueueLength) { @@ -47,18 +48,28 @@ public class BalancedQueueRpcExecutor extends RpcExecutor { public BalancedQueueRpcExecutor(final String name, final int handlerCount, final int numQueues, final int maxQueueLength, final Configuration conf, final Abortable abortable) { - this(name, handlerCount, numQueues, conf, abortable, LinkedBlockingQueue.class, maxQueueLength); + this(name, handlerCount, numQueues, conf, abortable, LinkedBlockingQueue.class, + null, maxQueueLength); + } + + public BalancedQueueRpcExecutor(final String name, final int handlerCount, final int numQueues, + final int maxQueueLength, final Configuration conf, final Abortable abortable, + final RpcExecutor highLevelRpcExecutor) { + this(name, handlerCount, numQueues, conf, abortable, LinkedBlockingQueue.class, + highLevelRpcExecutor, maxQueueLength); } public BalancedQueueRpcExecutor(final String name, final int handlerCount, final int numQueues, final Class queueClass, Object... initargs) { - this(name, handlerCount, numQueues, null, null, queueClass, initargs); + this(name, handlerCount, numQueues, null, null, queueClass, null, initargs); } public BalancedQueueRpcExecutor(final String name, final int handlerCount, final int numQueues, final Configuration conf, final Abortable abortable, - final Class queueClass, Object... initargs) { + final Class queueClass, + final RpcExecutor highLevelRpcExecutor, Object... initargs) { super(name, Math.max(handlerCount, numQueues), conf, abortable); + this.highLevelRpcExecutor = highLevelRpcExecutor; queues = new ArrayList>(numQueues); this.balancer = getBalancer(numQueues); initializeQueues(numQueues, queueClass, initargs); @@ -99,4 +110,13 @@ public class BalancedQueueRpcExecutor extends RpcExecutor { public List> getQueues() { return queues; } + + @Override + protected List> getHighLevelQueues() { + if (this.highLevelRpcExecutor == null) { + return null; + } + return this.highLevelRpcExecutor.getQueues(); + } + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java index 40c11aa..90910a3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; @@ -95,6 +96,11 @@ public abstract class RpcExecutor { /** Returns the list of request queues */ protected abstract List> getQueues(); + /** Returns the list of high level request queues */ + protected List> getHighLevelQueues() { + return null; + } + protected void startHandlers(final int port) { List> callQueues = getQueues(); startHandlers(null, handlerCount, callQueues, 0, callQueues.size(), port); @@ -121,6 +127,33 @@ public abstract class RpcExecutor { } } + private CallRunner stealTaskFromOtherQueue() throws InterruptedException { + // Firstly, we try to get task from same Level queues + CallRunner task = getTaskFromMaxQueue(getQueues()); + if (task == null) { + // we get task from High Level queues + task = getTaskFromMaxQueue(getHighLevelQueues()); + } + return task; + } + + private CallRunner getTaskFromMaxQueue(List> callQueues) { + if (callQueues == null) { + return null; + } + int max = Integer.MIN_VALUE; + int i = 0, maxIndex = 0; + for (BlockingQueue queue : callQueues) { + if (queue.size() > max) { + max = queue.size(); + maxIndex = i; + } + i++; + } + return callQueues.get(maxIndex).poll(); + } + + protected void consumerLoop(final BlockingQueue myQueue) { boolean interrupted = false; double handlerFailureThreshhold = @@ -130,7 +163,13 @@ public abstract class RpcExecutor { while (running) { try { MonitoredRPCHandler status = RpcServer.getStatus(); - CallRunner task = myQueue.take(); + CallRunner task = myQueue.poll(3000, TimeUnit.MILLISECONDS); + if (task == null) { + task = stealTaskFromOtherQueue(); + if (task == null) { + continue; + } + } task.setStatus(status); try { activeHandlerCount.incrementAndGet(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java index 431aeeb..f195a26 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java @@ -191,8 +191,12 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs float callQueuesHandlersFactor = conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0); int numCallQueues = Math.max(1, (int)Math.round(handlerCount * callQueuesHandlersFactor)); - LOG.info("Using " + callQueueType + " as user call queue, count=" + numCallQueues); + // Create 2 queues to help priorityExecutor be more scalable. + this.priorityExecutor = priorityHandlerCount > 0 ? + new BalancedQueueRpcExecutor("Priority", priorityHandlerCount, 2, maxPriorityQueueLength) : + null; + LOG.info("Using " + callQueueType + " as user call queue, count=" + numCallQueues); if (numCallQueues > 1 && callqReadShare > 0) { // multiple read/write queues if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) { @@ -216,23 +220,19 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) { CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority); callExecutor = new BalancedQueueRpcExecutor("B.default", handlerCount, numCallQueues, - conf, abortable, BoundedPriorityBlockingQueue.class, maxQueueLength, callPriority); + conf, abortable, BoundedPriorityBlockingQueue.class, + this.priorityExecutor, maxQueueLength, callPriority); } else if (callQueueType.equals(CALL_QUEUE_TYPE_CODEL_CONF_VALUE)) { callExecutor = new BalancedQueueRpcExecutor("B.default", handlerCount, numCallQueues, - conf, abortable, AdaptiveLifoCoDelCallQueue.class, maxQueueLength, + conf, abortable, AdaptiveLifoCoDelCallQueue.class, this.priorityExecutor, maxQueueLength, codelTargetDelay, codelInterval, codelLifoThreshold, numGeneralCallsDropped, numLifoModeSwitches); } else { callExecutor = new BalancedQueueRpcExecutor("B.default", handlerCount, - numCallQueues, maxQueueLength, conf, abortable); + numCallQueues, maxQueueLength, conf, abortable, this.priorityExecutor); } } - // Create 2 queues to help priorityExecutor be more scalable. - this.priorityExecutor = priorityHandlerCount > 0 ? - new BalancedQueueRpcExecutor("Priority", priorityHandlerCount, 2, maxPriorityQueueLength) : - null; - this.replicationExecutor = replicationHandlerCount > 0 ? new BalancedQueueRpcExecutor("Replication", replicationHandlerCount, 1, maxQueueLength, conf, abortable) : null; diff --git a/hbase-server/src/test/resources/hbase-site.xml b/hbase-server/src/test/resources/hbase-site.xml index 1b429b5..bca90a3 100644 --- a/hbase-server/src/test/resources/hbase-site.xml +++ b/hbase-server/src/test/resources/hbase-site.xml @@ -52,7 +52,7 @@ hbase.regionserver.metahandler.count - 6 + 5 hbase.ipc.server.read.threadpool.size -- 1.9.3 (Apple Git-50)