diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetaRWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetaRWQueueRpcExecutor.java new file mode 100644 index 0000000000..bdcf113ccb --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetaRWQueueRpcExecutor.java @@ -0,0 +1,54 @@ +/** + + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.ipc; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos; +import org.apache.hbase.thirdparty.com.google.protobuf.Message; + +public class MetaRWQueueRpcExecutor extends RWQueueRpcExecutor { + public static final String META_CALL_QUEUE_READ_SHARE_CONF_KEY = + "hbase.ipc.server.metacallqueue.read.ratio"; + public static final String META_CALL_QUEUE_SCAN_SHARE_CONF_KEY = + "hbase.ipc.server.metacallqueue.scan.ratio"; + + public MetaRWQueueRpcExecutor(final String name, final int handlerCount, final int maxQueueLength, + final PriorityFunction priority, final Configuration conf, final Abortable abortable) { + super(name, handlerCount, maxQueueLength, priority, conf, abortable); + } + + protected boolean isScanRequest(final RPCProtos.RequestHeader header, final Message param) { + if (param instanceof ClientProtos.ScanRequest) { + // Client read meta use scan request. + return true; + } + return false; + } + + protected float getReadShare(final Configuration conf) { + return conf.getFloat(META_CALL_QUEUE_READ_SHARE_CONF_KEY, 0.9f); + } + + protected float getScanShare(final Configuration conf) { + return conf.getFloat(META_CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java index 7187a71195..5b7a27bcb7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java @@ -71,8 +71,8 @@ public class RWQueueRpcExecutor extends RpcExecutor { final PriorityFunction priority, final Configuration conf, final Abortable abortable) { super(name, handlerCount, maxQueueLength, priority, conf, abortable); - float callqReadShare = conf.getFloat(CALL_QUEUE_READ_SHARE_CONF_KEY, 0); - float callqScanShare = conf.getFloat(CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0); + float callqReadShare = getReadShare(conf); + float callqScanShare = getScanShare(conf); numWriteQueues = calcNumWriters(this.numCallQueues, callqReadShare); writeHandlersCount = Math.max(numWriteQueues, calcNumWriters(handlerCount, callqReadShare)); @@ -195,7 +195,7 @@ public class RWQueueRpcExecutor extends RpcExecutor { return activeScanHandlerCount.get(); } - private boolean isWriteRequest(final RequestHeader header, final Message param) { + protected boolean isWriteRequest(final RequestHeader header, final Message param) { // TODO: Is there a better way to do this? if (param instanceof MultiRequest) { MultiRequest multi = (MultiRequest)param; @@ -228,7 +228,7 @@ public class RWQueueRpcExecutor extends RpcExecutor { return false; } - private boolean isScanRequest(final RequestHeader header, final Message param) { + protected boolean isScanRequest(final RequestHeader header, final Message param) { if (param instanceof ScanRequest) { // The first scan request will be executed as a "short read" ScanRequest request = (ScanRequest)param; @@ -237,6 +237,14 @@ public class RWQueueRpcExecutor extends RpcExecutor { return false; } + protected float getReadShare(final Configuration conf) { + return conf.getFloat(CALL_QUEUE_READ_SHARE_CONF_KEY, 0); + } + + protected float getScanShare(final Configuration conf) { + return conf.getFloat(CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0); + } + /* * Calculate the number of writers based on the "total count" and the read share. * You'll get at least one writer. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java index 60ecce51f8..824f361900 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java @@ -100,10 +100,21 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs } } - // Create 2 queues to help priorityExecutor be more scalable. - this.priorityExecutor = priorityHandlerCount > 0 ? new FastPathBalancedQueueRpcExecutor( - "priority.FPBQ", priorityHandlerCount, RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, - maxPriorityQueueLength, priority, conf, abortable) : null; + float metaCallqReadShare = + conf.getFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_READ_SHARE_CONF_KEY, 0.9f); + if (metaCallqReadShare > 0) { + // different read/write handler for meta, at least 1 read handler and 1 write handler + this.priorityExecutor = + new MetaRWQueueRpcExecutor("priority.RWQ", Math.max(2, priorityHandlerCount), + maxPriorityQueueLength, priority, conf, server); + } else { + // Create 2 queues to help priorityExecutor be more scalable. + this.priorityExecutor = priorityHandlerCount > 0 ? + new FastPathBalancedQueueRpcExecutor("priority.FPBQ", priorityHandlerCount, + RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, maxPriorityQueueLength, priority, conf, + abortable) : + null; + } this.replicationExecutor = replicationHandlerCount > 0 ? new FastPathBalancedQueueRpcExecutor("replication.FPBQ", replicationHandlerCount,