diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathRWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathRWQueueRpcExecutor.java new file mode 100644 index 0000000..beb85c6 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathRWQueueRpcExecutor.java @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import org.apache.commons.lang.ArrayUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BlockingQueue; + +@InterfaceAudience.Private +public class FastPathRWQueueRpcExecutor extends RpcExecutor { + private static final Log LOG = LogFactory.getLog(FastPathRWQueueRpcExecutor.class); + + private final int writeHandlersCount; + private final int readHandlersCount; + private final int scanHandlersCount; + + private FastPathBalancedQueueRpcExecutor writeExecutor; + private FastPathBalancedQueueRpcExecutor readExecutor; + private FastPathBalancedQueueRpcExecutor scanExecutor; + + + public FastPathRWQueueRpcExecutor(final String name, final int handlerCount, final int maxQueueLength, + final PriorityFunction priority, final Configuration conf, final Abortable abortable) { + super(name, handlerCount, maxQueueLength, priority, conf, abortable); + + float callqReadShare = conf.getFloat(RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY, 0); + float callqScanShare = conf.getFloat(RWQueueRpcExecutor.CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0); + + writeHandlersCount = RWQueueRpcExecutor.calcNumWriters(handlerCount, callqReadShare); + + int readHandlers = RWQueueRpcExecutor.calcNumReaders(handlerCount, callqReadShare); + + int scanHandlers = Math.max(0, (int)Math.floor(readHandlers * callqScanShare)); + + if(readHandlers > scanHandlers) { + readHandlers -= scanHandlers; + } else { + scanHandlers = 0; + } + readHandlersCount = readHandlers; + scanHandlersCount = scanHandlers; + LOG.info(getName() + " writeHandlers=" + writeHandlersCount + " readHandlers=" + + readHandlersCount + " scanHandlers=" + scanHandlersCount); + if(writeHandlersCount > 0) { + writeExecutor = new FastPathBalancedQueueRpcExecutor(name + ".write", + writeHandlersCount, maxQueueLength, priority, conf, abortable); + } + if(readHandlersCount > 0) { + readExecutor = new FastPathBalancedQueueRpcExecutor(name + ".read", + readHandlersCount, maxQueueLength, priority, conf, abortable); + } + if(scanHandlersCount > 0) { + scanExecutor = new FastPathBalancedQueueRpcExecutor(name + ".scan", + scanHandlersCount, maxQueueLength, priority, conf, abortable); + } + } + + @Override + protected void startHandlers(final int port) { + if(writeHandlersCount > 0) { + writeExecutor.start(port); + } + if(readHandlersCount > 0) { + readExecutor.start(port); + } + if(scanHandlersCount > 0) { + scanExecutor.start(port); + } + } + + @Override + public boolean dispatch(final CallRunner callTask) throws InterruptedException { + RpcCall call = callTask.getRpcCall(); + if (RWQueueRpcExecutor.isWriteRequest(call.getHeader(), call.getParam())) { + return writeExecutor.dispatch(callTask); + } else if (scanHandlersCount > 0 && RWQueueRpcExecutor.isScanRequest(call.getHeader(), call.getParam())) { + return scanExecutor.dispatch(callTask); + } else { + return readExecutor.dispatch(callTask); + } + } + + @Override + public int getQueueLength() { + int length = 0; + if(writeHandlersCount > 0) { + length += writeExecutor.getQueueLength(); + } + if(readHandlersCount > 0) { + length += readExecutor.getQueueLength(); + } + if(scanHandlersCount > 0) { + length += scanExecutor.getQueueLength(); + } + return length; + } + + @Override + protected List> getQueues() { + //Should not call this method in FastPathRWQueueRpcExecutor + List> queues = new ArrayList<>(); + if(writeHandlersCount > 0) { + queues.addAll(writeExecutor.getQueues()); + } + if(readHandlersCount > 0) { + queues.addAll(readExecutor.getQueues()); + } + if(scanHandlersCount > 0) { + queues.addAll(scanExecutor.getQueues()); + } + return queues; + } + + + + + + + + + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java index 8637f79..a8026e6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java @@ -196,7 +196,7 @@ public class RWQueueRpcExecutor extends RpcExecutor { return activeScanHandlerCount.get(); } - private boolean isWriteRequest(final RequestHeader header, final Message param) { + public static boolean isWriteRequest(final RequestHeader header, final Message param) { // TODO: Is there a better way to do this? if (param instanceof MultiRequest) { MultiRequest multi = (MultiRequest)param; @@ -229,7 +229,7 @@ public class RWQueueRpcExecutor extends RpcExecutor { return false; } - private boolean isScanRequest(final RequestHeader header, final Message param) { + public static boolean isScanRequest(final RequestHeader header, final Message param) { if (param instanceof ScanRequest) { // The first scan request will be executed as a "short read" ScanRequest request = (ScanRequest)param; @@ -242,7 +242,7 @@ public class RWQueueRpcExecutor extends RpcExecutor { * Calculate the number of writers based on the "total count" and the read share. * You'll get at least one writer. */ - private static int calcNumWriters(final int count, final float readShare) { + public static int calcNumWriters(final int count, final float readShare) { return Math.max(1, count - Math.max(1, (int)Math.round(count * readShare))); } @@ -250,7 +250,7 @@ public class RWQueueRpcExecutor extends RpcExecutor { * Calculate the number of readers based on the "total count" and the read share. * You'll get at least one reader. */ - private static int calcNumReaders(final int count, final float readShare) { + public static int calcNumReaders(final int count, final float readShare) { return count - calcNumWriters(count, readShare); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java index 616f741..dba0aea 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java @@ -82,8 +82,13 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs if (callqReadShare > 0) { // at least 1 read handler and 1 write handler - callExecutor = new RWQueueRpcExecutor("deafult.RWQ", Math.max(2, handlerCount), - maxQueueLength, priority, conf, server); + if(RpcExecutor.isFifoQueueType(callQueueType)) { + callExecutor = new FastPathRWQueueRpcExecutor("deafult.FPRWQ", Math.max(2, handlerCount), + maxQueueLength, priority, conf, server); + } else { + callExecutor = new RWQueueRpcExecutor("deafult.RWQ", Math.max(2, handlerCount), + maxQueueLength, priority, conf, server); + } } else { if (RpcExecutor.isFifoQueueType(callQueueType)) { callExecutor = new FastPathBalancedQueueRpcExecutor("deafult.FPBQ", handlerCount,