From 613e56ee8297096f5c5d1986c49382531ccd75f2 Mon Sep 17 00:00:00 2001 From: Elliott Clark Date: Wed, 20 Jan 2016 17:43:22 -0800 Subject: [PATCH] HBASE-15146 Don't block on Reader threads queueing to a scheduler queue --- .../hadoop/hbase/ipc/BalancedQueueRpcExecutor.java | 5 +++-- .../main/java/org/apache/hadoop/hbase/ipc/RpcServer.java | 16 +++++++++++++++- .../org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java | 2 +- 3 files changed, 19 insertions(+), 4 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java index 56424df..3d7293f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java @@ -72,9 +72,10 @@ public class BalancedQueueRpcExecutor extends RpcExecutor { } @Override - public void dispatch(final CallRunner callTask) throws InterruptedException { + public void dispatch(final CallRunner callTask) + throws InterruptedException, IllegalStateException { int queueIndex = balancer.getNextQueue(); - queues.get(queueIndex).put(callTask); + queues.get(queueIndex).add(callTask); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index ff4e078..2425ab1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -1926,7 +1926,21 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { : null; Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder, totalRequestSize, traceInfo, this.addr); - scheduler.dispatch(new CallRunner(RpcServer.this, call)); + try { + scheduler.dispatch(new CallRunner(RpcServer.this, call)); + } catch (IllegalStateException ise) { + final Call callTooBig = + new Call(id, this.service, null, null, null, null, this, + responder, totalRequestSize, null, null); + ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream(); + metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION); + InetSocketAddress address = getListenerAddress(); + setupResponse(responseBuffer, callTooBig, CALL_QUEUE_TOO_BIG_EXCEPTION, + "Call queue is full on " + (address != null ? address : "(channel closed)") + + ", is hbase.ipc.server.max.callqueue.size too small?"); + responder.doRespond(callTooBig); + return; + } } private boolean authorizeConnection() throws IOException { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java index b8e9c52..bd2419f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java @@ -190,7 +190,7 @@ public class SimpleRpcScheduler extends RpcScheduler { } @Override - public void dispatch(CallRunner callTask) throws InterruptedException { + public void dispatch(CallRunner callTask) throws InterruptedException, IllegalStateException { RpcServer.Call call = callTask.getCall(); int level = priority.getPriority(call.getHeader(), call.param, call.getRequestUser()); if (priorityExecutor != null && level > highPriorityLevel) { -- 2.7.0