diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml
index a4e6578..a92a3b0 100644
--- a/hbase-common/src/main/resources/hbase-default.xml
+++ b/hbase-common/src/main/resources/hbase-default.xml
@@ -197,6 +197,14 @@ possible configurations would overwhelm and obscure the important.
+ hbase.ipc.server.callqueue.scan.share
+ 0
+ Used in conjunction with hbase.ipc.server.callqueue.read.share
+ will split the read call queues into small-read and long-read queues.
+ A value of 0 or 1 indicate to use the same set of queues for gets and scans.
+
+
+
hbase.regionserver.msginterval
3000
Interval between messages from the RegionServer to Master
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
index 247b7da..3876248 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Action;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.util.ReflectionUtils;
@@ -40,6 +41,7 @@ import com.google.protobuf.Message;
/**
* RPC Executor that uses different queues for reads and writes.
+ * With the options to use different queues/executors for gets and scans.
* Each handler has its own queue and there is no stealing.
*/
@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
@@ -51,20 +53,34 @@ public class RWQueueRpcExecutor extends RpcExecutor {
private final Random balancer = new Random();
private final int writeHandlersCount;
private final int readHandlersCount;
+ private final int scanHandlersCount;
private final int numWriteQueues;
private final int numReadQueues;
+ private final int numScanQueues;
public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
final float readShare, final int maxQueueLength) {
- this(name, handlerCount, numQueues, readShare, maxQueueLength,
+ this(name, handlerCount, numQueues, readShare, maxQueueLength, 0, LinkedBlockingQueue.class);
+ }
+
+ public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
+ final float readShare, final float scanShare, final int maxQueueLength) {
+ this(name, handlerCount, numQueues, readShare, scanShare, maxQueueLength,
LinkedBlockingQueue.class);
}
public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
final float readShare, final int maxQueueLength,
final Class extends BlockingQueue> readQueueClass, Object... readQueueInitArgs) {
+ this(name, handlerCount, numQueues, readShare, 0, maxQueueLength,
+ readQueueClass, readQueueInitArgs);
+ }
+
+ public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues,
+ final float readShare, final float scanShare, final int maxQueueLength,
+ final Class extends BlockingQueue> readQueueClass, Object... readQueueInitArgs) {
this(name, calcNumWriters(handlerCount, readShare), calcNumReaders(handlerCount, readShare),
- calcNumWriters(numQueues, readShare), calcNumReaders(numQueues, readShare),
+ calcNumWriters(numQueues, readShare), calcNumReaders(numQueues, readShare), scanShare,
LinkedBlockingQueue.class, new Object[] {maxQueueLength},
readQueueClass, ArrayUtils.addAll(new Object[] {maxQueueLength}, readQueueInitArgs));
}
@@ -73,23 +89,45 @@ public class RWQueueRpcExecutor extends RpcExecutor {
final int numWriteQueues, final int numReadQueues,
final Class extends BlockingQueue> writeQueueClass, Object[] writeQueueInitArgs,
final Class extends BlockingQueue> readQueueClass, Object[] readQueueInitArgs) {
+ this(name, writeHandlers, readHandlers, numWriteQueues, numReadQueues, 0,
+ writeQueueClass, writeQueueInitArgs, readQueueClass, readQueueInitArgs);
+ }
+
+ public RWQueueRpcExecutor(final String name, int writeHandlers, int readHandlers,
+ int numWriteQueues, int numReadQueues, float scanShare,
+ final Class extends BlockingQueue> writeQueueClass, Object[] writeQueueInitArgs,
+ final Class extends BlockingQueue> readQueueClass, Object[] readQueueInitArgs) {
super(name, Math.max(writeHandlers + readHandlers, numWriteQueues + numReadQueues));
+ int numScanQueues = (int)Math.floor(numReadQueues * scanShare);
+ int scanHandlers = (int)Math.floor(readHandlers * scanShare);
+ if ((numReadQueues - numScanQueues) > 0) {
+ numReadQueues -= numScanQueues;
+ readHandlers -= scanHandlers;
+ } else {
+ numScanQueues = 0;
+ scanHandlers = 0;
+ }
+
this.writeHandlersCount = Math.max(writeHandlers, numWriteQueues);
this.readHandlersCount = Math.max(readHandlers, numReadQueues);
+ this.scanHandlersCount = Math.max(scanHandlers, numScanQueues);
this.numWriteQueues = numWriteQueues;
this.numReadQueues = numReadQueues;
+ this.numScanQueues = numScanQueues;
queues = new ArrayList>(writeHandlersCount + readHandlersCount);
LOG.debug(name + " writeQueues=" + numWriteQueues + " writeHandlers=" + writeHandlersCount +
- " readQueues=" + numReadQueues + " readHandlers=" + readHandlersCount);
+ " readQueues=" + numReadQueues + " readHandlers=" + readHandlersCount +
+ ((numScanQueues == 0) ? "" : " scanQueues=" + numScanQueues +
+ " scanHandlers=" + scanHandlersCount));
for (int i = 0; i < numWriteQueues; ++i) {
queues.add((BlockingQueue)
ReflectionUtils.newInstance(writeQueueClass, writeQueueInitArgs));
}
- for (int i = 0; i < numReadQueues; ++i) {
+ for (int i = 0; i < (numReadQueues + numScanQueues); ++i) {
queues.add((BlockingQueue)
ReflectionUtils.newInstance(readQueueClass, readQueueInitArgs));
}
@@ -99,6 +137,8 @@ public class RWQueueRpcExecutor extends RpcExecutor {
protected void startHandlers(final int port) {
startHandlers(".write", writeHandlersCount, queues, 0, numWriteQueues, port);
startHandlers(".read", readHandlersCount, queues, numWriteQueues, numReadQueues, port);
+ startHandlers(".scan", scanHandlersCount, queues,
+ numWriteQueues + numReadQueues, numScanQueues, port);
}
@Override
@@ -107,6 +147,8 @@ public class RWQueueRpcExecutor extends RpcExecutor {
int queueIndex;
if (isWriteRequest(call.getHeader(), call.param)) {
queueIndex = balancer.nextInt(numWriteQueues);
+ } else if (numScanQueues > 0 && isScanRequest(call.getHeader(), call.param)) {
+ queueIndex = numWriteQueues + numReadQueues + balancer.nextInt(numScanQueues);
} else {
queueIndex = numWriteQueues + balancer.nextInt(numReadQueues);
}
@@ -126,6 +168,19 @@ public class RWQueueRpcExecutor extends RpcExecutor {
}
}
}
+ if (methodName.equalsIgnoreCase("mutate")) {
+ return true;
+ }
+ return false;
+ }
+
+ private boolean isScanRequest(final RequestHeader header, final Message param) {
+ String methodName = header.getMethodName();
+ if (methodName.equalsIgnoreCase("scan")) {
+ // The first scan request will be executed as a "short read"
+ ScanRequest request = (ScanRequest)param;
+ return request.hasScannerId();
+ }
return false;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
index 0458c00..2313861 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
@@ -39,6 +39,7 @@ public class SimpleRpcScheduler extends RpcScheduler {
public static final Log LOG = LogFactory.getLog(SimpleRpcScheduler.class);
public static final String CALL_QUEUE_READ_SHARE_CONF_KEY = "hbase.ipc.server.callqueue.read.share";
+ public static final String CALL_QUEUE_SCAN_SHARE_CONF_KEY = "hbase.ipc.server.callqueue.scan.share";
public static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY =
"hbase.ipc.server.callqueue.handler.factor";
@@ -112,6 +113,7 @@ public class SimpleRpcScheduler extends RpcScheduler {
String callQueueType = conf.get(CALL_QUEUE_TYPE_CONF_KEY, CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
float callqReadShare = conf.getFloat(CALL_QUEUE_READ_SHARE_CONF_KEY, 0);
+ float callqScanShare = conf.getFloat(CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0);
float callQueuesHandlersFactor = conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0);
int numCallQueues = Math.max(1, (int)Math.round(handlerCount * callQueuesHandlersFactor));
@@ -123,10 +125,11 @@ public class SimpleRpcScheduler extends RpcScheduler {
if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority);
callExecutor = new RWQueueRpcExecutor("default", handlerCount, numCallQueues,
- callqReadShare, maxQueueLength, BoundedPriorityBlockingQueue.class, callPriority);
+ callqReadShare, callqScanShare, maxQueueLength,
+ BoundedPriorityBlockingQueue.class, callPriority);
} else {
callExecutor = new RWQueueRpcExecutor("default", handlerCount, numCallQueues,
- callqReadShare, maxQueueLength);
+ callqReadShare, callqScanShare, maxQueueLength);
}
} else {
// multiple queues
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
index d4a79b6..9d51915 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.ipc.RpcServer.Call;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.Before;
import org.junit.Test;
@@ -46,6 +47,7 @@ import java.util.Map;
import java.util.concurrent.CountDownLatch;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Matchers.eq;
@@ -185,39 +187,9 @@ public class TestSimpleRpcScheduler {
when(priority.getDeadline(eq(hugeHead), any(Message.class))).thenReturn(100L);
final ArrayList work = new ArrayList();
-
- doAnswer(new Answer