From 969009ac7162041cdfeddb16b84926d91dc6684a Mon Sep 17 00:00:00 2001 From: stack Date: Fri, 6 Dec 2019 10:13:23 -0800 Subject: [PATCH] HBASE-23597 Give high priority for meta assign procedure and ServerCrashProcedure which carry meta. --- .../hbase/ipc/BalancedQueueRpcExecutor.java | 10 ++--- .../hadoop/hbase/ipc/ServerRpcConnection.java | 43 ++++++++++++------- 2 files changed, 33 insertions(+), 20 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java index db8e93fb3a..65a6a2c83a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java @@ -22,6 +22,7 @@ import java.util.concurrent.BlockingQueue; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.HConstants; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; @@ -55,10 +56,9 @@ public class BalancedQueueRpcExecutor extends RpcExecutor { public boolean dispatch(final CallRunner callTask) throws InterruptedException { int queueIndex = balancer.getNextQueue(); BlockingQueue queue = queues.get(queueIndex); - // that means we can overflow by at most size (5), that's ok - if (queue.size() >= currentQueueLimit) { - return false; - } - return queue.offer(callTask); + // That means we can overflow by at most size (5), that's ok. + // Also, take on high priority items rather than send them back even if overloaded. + return callTask.getRpcCall().getPriority() < HConstants.HIGH_QOS && + queue.size() >= currentQueueLimit? 
false: queue.offer(callTask); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java index cff917f89f..f62765fae7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java @@ -36,6 +36,7 @@ import org.apache.commons.crypto.random.CryptoRandom; import org.apache.commons.crypto.random.CryptoRandomFactory; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HConstants; import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.client.VersionInfoUtil; import org.apache.hadoop.hbase.codec.Codec; @@ -619,19 +620,31 @@ abstract class ServerRpcConnection implements Closeable { RpcServer.LOG.trace("RequestHeader " + TextFormat.shortDebugString(header) + " totalRequestSize: " + totalRequestSize + " bytes"); } - // Enforcing the call queue size, this triggers a retry in the client - // This is a bit late to be doing this check - we have already read in the - // total request. - if ((totalRequestSize + - this.rpcServer.callQueueSizeInBytes.sum()) > this.rpcServer.maxQueueSizeInBytes) { - final ServerCall callTooBig = createCall(id, this.service, null, null, null, null, - totalRequestSize, null, 0, this.callCleanup); - this.rpcServer.metrics.exception(RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION); - callTooBig.setResponse(null, null, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION, - "Call queue is full on " + this.rpcServer.server.getServerName() + - ", is hbase.ipc.server.max.callqueue.size too small?"); - callTooBig.sendResponseIfReady(); - return; + // Enforcing the call queue size. Triggers a retry in the client. + // It is a bit late to be doing this check - we have already read in the + // total request. 
Throw CallQueueTooBigException for low priority + // calls only. Let high priority calls in. Edits to hbase:meta that + // don't land cause trouble messing up Master's view on cluster. This is + // a hack. TODO: Fix full queuing; do CoDel. + // Note, if no priority, it is low-priority. + long sum = totalRequestSize + this.rpcServer.callQueueSizeInBytes.sum(); + if (sum > this.rpcServer.maxQueueSizeInBytes) { + int priority = header.hasPriority()? header.getPriority(): -1; + if (priority >= HConstants.HIGH_QOS) { + RpcServer.LOG.debug("REMOVE!!! {} {} -- prority just to see incidence", + this.rpcServer.callQueueSizeInBytes.sum(), TextFormat.shortDebugString(header)); + } else { + final ServerCall callTooBig = createCall(id, this.service, null, null, null, null, + totalRequestSize, null, 0, this.callCleanup); + this.rpcServer.metrics.exception(RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION); + callTooBig.setResponse(null, null, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION, + "Call queue full on " + this.rpcServer.server.getServerName() + + ", is hbase.ipc.server.max.callqueue.size too small?; current=" + sum + + ", limit=" + this.rpcServer.maxQueueSizeInBytes + ", header=" + + TextFormat.shortDebugString(header)); + callTooBig.sendResponseIfReady(); + return; + } } MethodDescriptor md = null; Message param = null; @@ -704,8 +717,8 @@ abstract class ServerRpcConnection implements Closeable { this.rpcServer.callQueueSizeInBytes.add(-1 * call.getSize()); this.rpcServer.metrics.exception(RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION); call.setResponse(null, null, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION, - "Call queue is full on " + this.rpcServer.server.getServerName() + - ", too many items queued ?"); + "Failed dispatch; call queue full on " + this.rpcServer.server.getServerName() + + ", too many items queued ? header=" + TextFormat.shortDebugString(header)); call.sendResponseIfReady(); } } -- 2.19.1