From d74f4af1b70d277e0c176b6f2cef49cf0a6f29a4 Mon Sep 17 00:00:00 2001 From: Elliott Clark Date: Wed, 20 Jan 2016 17:43:22 -0800 Subject: [PATCH] HBASE-15146 Don't block on Reader threads queueing to a scheduler queue --- .../apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java | 5 +++-- .../java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java | 12 +++++++++++- .../org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java | 4 ++-- .../main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java | 2 +- .../main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java | 2 +- .../main/java/org/apache/hadoop/hbase/ipc/RpcServer.java | 13 ++++++++++++- .../org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java | 8 ++++---- 7 files changed, 34 insertions(+), 12 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java index 56424df..70d9c4e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java @@ -72,9 +72,10 @@ public class BalancedQueueRpcExecutor extends RpcExecutor { } @Override - public void dispatch(final CallRunner callTask) throws InterruptedException { + public boolean dispatch(final CallRunner callTask) + throws InterruptedException, IllegalStateException { int queueIndex = balancer.getNextQueue(); - queues.get(queueIndex).put(callTask); + return queues.get(queueIndex).offer(callTask); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java index 621a8ef..a40f4fb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java @@ -24,6 +24,8 @@ import java.io.IOException; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; /** * A very simple {@code }RpcScheduler} that serves incoming requests in order. @@ -34,6 +36,7 @@ public class FifoRpcScheduler extends RpcScheduler { private final int handlerCount; private final int maxQueueLength; + private final AtomicInteger queueSize = new AtomicInteger(0); private ThreadPoolExecutor executor; public FifoRpcScheduler(Configuration conf, int handlerCount) { @@ -65,7 +68,13 @@ public class FifoRpcScheduler extends RpcScheduler { } @Override - public void dispatch(final CallRunner task) throws IOException, InterruptedException { + public boolean dispatch(final CallRunner task) throws IOException, InterruptedException { + // Executors provide no offer, so make our own. + int queued = queueSize.incrementAndGet(); + if (queued >= maxQueueLength) { + queueSize.decrementAndGet(); + return false; + } executor.submit(new Runnable() { @Override public void run() { @@ -73,6 +82,7 @@ public class FifoRpcScheduler extends RpcScheduler { task.run(); } }); + return true; } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java index 8f19bd6..d14e052 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java @@ -160,7 +160,7 @@ public class RWQueueRpcExecutor extends RpcExecutor { } @Override - public void dispatch(final CallRunner callTask) throws InterruptedException { + public boolean dispatch(final CallRunner callTask) throws InterruptedException, IllegalStateException { RpcServer.Call call = callTask.getCall(); int queueIndex; if (isWriteRequest(call.getHeader(), call.param)) { @@ -170,7 +170,7 @@ public class RWQueueRpcExecutor extends RpcExecutor { } else { queueIndex = numWriteQueues + readBalancer.getNextQueue(); } - queues.get(queueIndex).put(callTask); + return queues.get(queueIndex).offer(callTask); } private boolean isWriteRequest(final RequestHeader header, final Message param) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java index 27750a7..017bf39 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java @@ -86,7 +86,7 @@ public abstract class RpcExecutor { public abstract int getQueueLength(); /** Add the request to the executor queue */ - public abstract void dispatch(final CallRunner callTask) throws InterruptedException; + public abstract boolean dispatch(final CallRunner callTask) throws InterruptedException; /** Returns the list of request queues */ protected abstract List> getQueues(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java index f273865..fffe7f3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java @@ -58,7 +58,7 @@ public abstract class RpcScheduler { * * @param task the request to be dispatched */ - public abstract void dispatch(CallRunner task) throws IOException, InterruptedException; + public abstract boolean dispatch(CallRunner task) throws IOException, InterruptedException; /** Retrieves length of the general queue for metrics. */ public abstract int getGeneralQueueLength(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index ff4e078..46f7626 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -1926,7 +1926,18 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { : null; Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder, totalRequestSize, traceInfo, this.addr); - scheduler.dispatch(new CallRunner(RpcServer.this, call)); + + if (!scheduler.dispatch(new CallRunner(RpcServer.this, call))) { + callQueueSize.add(-1 * call.getSize()); + + ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream(); + metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION); + InetSocketAddress address = getListenerAddress(); + setupResponse(responseBuffer, call, CALL_QUEUE_TOO_BIG_EXCEPTION, + "Call queue is full on " + (address != null ? address : "(channel closed)") + + ", too many items queued ?"); + responder.doRespond(call); + } } private boolean authorizeConnection() throws IOException { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java index b8e9c52..a31e586 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java @@ -190,15 +190,15 @@ public class SimpleRpcScheduler extends RpcScheduler { } @Override - public void dispatch(CallRunner callTask) throws InterruptedException { + public boolean dispatch(CallRunner callTask) throws InterruptedException, IllegalStateException { RpcServer.Call call = callTask.getCall(); int level = priority.getPriority(call.getHeader(), call.param, call.getRequestUser()); if (priorityExecutor != null && level > highPriorityLevel) { - priorityExecutor.dispatch(callTask); + return priorityExecutor.dispatch(callTask); } else if (replicationExecutor != null && level == HConstants.REPLICATION_QOS) { - replicationExecutor.dispatch(callTask); + return replicationExecutor.dispatch(callTask); } else { - callExecutor.dispatch(callTask); + return callExecutor.dispatch(callTask); } } -- 2.7.0