diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml index a4e6578..a92a3b0 100644 --- a/hbase-common/src/main/resources/hbase-default.xml +++ b/hbase-common/src/main/resources/hbase-default.xml @@ -197,6 +197,14 @@ possible configurations would overwhelm and obscure the important. + hbase.ipc.server.callqueue.scan.share + 0 + Used in conjunction with hbase.ipc.server.callqueue.read.share + will split the read call queues into small-read and long-read queues. + A value of 0 or 1 indicate to use the same set of queues for gets and scans. + + + hbase.regionserver.msginterval 3000 Interval between messages from the RegionServer to Master diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java index 247b7da..3876248 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Action; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader; import org.apache.hadoop.hbase.util.ReflectionUtils; @@ -40,6 +41,7 @@ import com.google.protobuf.Message; /** * RPC Executor that uses different queues for reads and writes. + * With the options to use different queues/executors for gets and scans. * Each handler has its own queue and there is no stealing. */ @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX}) @@ -51,20 +53,34 @@ public class RWQueueRpcExecutor extends RpcExecutor { private final Random balancer = new Random(); private final int writeHandlersCount; private final int readHandlersCount; + private final int scanHandlersCount; private final int numWriteQueues; private final int numReadQueues; + private final int numScanQueues; public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues, final float readShare, final int maxQueueLength) { - this(name, handlerCount, numQueues, readShare, maxQueueLength, + this(name, handlerCount, numQueues, readShare, maxQueueLength, 0, LinkedBlockingQueue.class); + } + + public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues, + final float readShare, final float scanShare, final int maxQueueLength) { + this(name, handlerCount, numQueues, readShare, scanShare, maxQueueLength, LinkedBlockingQueue.class); } public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues, final float readShare, final int maxQueueLength, final Class readQueueClass, Object... readQueueInitArgs) { + this(name, handlerCount, numQueues, readShare, 0, maxQueueLength, + readQueueClass, readQueueInitArgs); + } + + public RWQueueRpcExecutor(final String name, final int handlerCount, final int numQueues, + final float readShare, final float scanShare, final int maxQueueLength, + final Class readQueueClass, Object... readQueueInitArgs) { this(name, calcNumWriters(handlerCount, readShare), calcNumReaders(handlerCount, readShare), - calcNumWriters(numQueues, readShare), calcNumReaders(numQueues, readShare), + calcNumWriters(numQueues, readShare), calcNumReaders(numQueues, readShare), scanShare, LinkedBlockingQueue.class, new Object[] {maxQueueLength}, readQueueClass, ArrayUtils.addAll(new Object[] {maxQueueLength}, readQueueInitArgs)); } @@ -73,23 +89,45 @@ public class RWQueueRpcExecutor extends RpcExecutor { final int numWriteQueues, final int numReadQueues, final Class writeQueueClass, Object[] writeQueueInitArgs, final Class readQueueClass, Object[] readQueueInitArgs) { + this(name, writeHandlers, readHandlers, numWriteQueues, numReadQueues, 0, + writeQueueClass, writeQueueInitArgs, readQueueClass, readQueueInitArgs); + } + + public RWQueueRpcExecutor(final String name, int writeHandlers, int readHandlers, + int numWriteQueues, int numReadQueues, float scanShare, + final Class writeQueueClass, Object[] writeQueueInitArgs, + final Class readQueueClass, Object[] readQueueInitArgs) { super(name, Math.max(writeHandlers + readHandlers, numWriteQueues + numReadQueues)); + int numScanQueues = (int)Math.floor(numReadQueues * scanShare); + int scanHandlers = (int)Math.floor(readHandlers * scanShare); + if ((numReadQueues - numScanQueues) > 0) { + numReadQueues -= numScanQueues; + readHandlers -= scanHandlers; + } else { + numScanQueues = 0; + scanHandlers = 0; + } + this.writeHandlersCount = Math.max(writeHandlers, numWriteQueues); this.readHandlersCount = Math.max(readHandlers, numReadQueues); + this.scanHandlersCount = Math.max(scanHandlers, numScanQueues); this.numWriteQueues = numWriteQueues; this.numReadQueues = numReadQueues; + this.numScanQueues = numScanQueues; queues = new ArrayList>(writeHandlersCount + readHandlersCount); LOG.debug(name + " writeQueues=" + numWriteQueues + " writeHandlers=" + writeHandlersCount + - " readQueues=" + numReadQueues + " readHandlers=" + readHandlersCount); + " readQueues=" + numReadQueues + " readHandlers=" + readHandlersCount + + ((numScanQueues == 0) ? "" : " scanQueues=" + numScanQueues + + " scanHandlers=" + scanHandlersCount)); for (int i = 0; i < numWriteQueues; ++i) { queues.add((BlockingQueue) ReflectionUtils.newInstance(writeQueueClass, writeQueueInitArgs)); } - for (int i = 0; i < numReadQueues; ++i) { + for (int i = 0; i < (numReadQueues + numScanQueues); ++i) { queues.add((BlockingQueue) ReflectionUtils.newInstance(readQueueClass, readQueueInitArgs)); } @@ -99,6 +137,8 @@ public class RWQueueRpcExecutor extends RpcExecutor { protected void startHandlers(final int port) { startHandlers(".write", writeHandlersCount, queues, 0, numWriteQueues, port); startHandlers(".read", readHandlersCount, queues, numWriteQueues, numReadQueues, port); + startHandlers(".scan", scanHandlersCount, queues, + numWriteQueues + numReadQueues, numScanQueues, port); } @Override @@ -107,6 +147,8 @@ public class RWQueueRpcExecutor extends RpcExecutor { int queueIndex; if (isWriteRequest(call.getHeader(), call.param)) { queueIndex = balancer.nextInt(numWriteQueues); + } else if (numScanQueues > 0 && isScanRequest(call.getHeader(), call.param)) { + queueIndex = numWriteQueues + numReadQueues + balancer.nextInt(numScanQueues); } else { queueIndex = numWriteQueues + balancer.nextInt(numReadQueues); } @@ -126,6 +168,19 @@ public class RWQueueRpcExecutor extends RpcExecutor { } } } + if (methodName.equalsIgnoreCase("mutate")) { + return true; + } + return false; + } + + private boolean isScanRequest(final RequestHeader header, final Message param) { + String methodName = header.getMethodName(); + if (methodName.equalsIgnoreCase("scan")) { + // The first scan request will be executed as a "short read" + ScanRequest request = (ScanRequest)param; + return request.hasScannerId(); + } return false; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java index 0458c00..2313861 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java @@ -39,6 +39,7 @@ public class SimpleRpcScheduler extends RpcScheduler { public static final Log LOG = LogFactory.getLog(SimpleRpcScheduler.class); public static final String CALL_QUEUE_READ_SHARE_CONF_KEY = "hbase.ipc.server.callqueue.read.share"; + public static final String CALL_QUEUE_SCAN_SHARE_CONF_KEY = "hbase.ipc.server.callqueue.scan.share"; public static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY = "hbase.ipc.server.callqueue.handler.factor"; @@ -112,6 +113,7 @@ public class SimpleRpcScheduler extends RpcScheduler { String callQueueType = conf.get(CALL_QUEUE_TYPE_CONF_KEY, CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE); float callqReadShare = conf.getFloat(CALL_QUEUE_READ_SHARE_CONF_KEY, 0); + float callqScanShare = conf.getFloat(CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0); float callQueuesHandlersFactor = conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0); int numCallQueues = Math.max(1, (int)Math.round(handlerCount * callQueuesHandlersFactor)); @@ -123,10 +125,11 @@ public class SimpleRpcScheduler extends RpcScheduler { if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) { CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority); callExecutor = new RWQueueRpcExecutor("default", handlerCount, numCallQueues, - callqReadShare, maxQueueLength, BoundedPriorityBlockingQueue.class, callPriority); + callqReadShare, callqScanShare, maxQueueLength, + BoundedPriorityBlockingQueue.class, callPriority); } else { callExecutor = new RWQueueRpcExecutor("default", handlerCount, numCallQueues, - callqReadShare, maxQueueLength); + callqReadShare, callqScanShare, maxQueueLength); } } else { // multiple queues diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java index d4a79b6..9d51915 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.SmallTests; import org.apache.hadoop.hbase.ipc.RpcServer.Call; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest; import org.apache.hadoop.hbase.util.Threads; import org.junit.Before; import org.junit.Test; @@ -46,6 +47,7 @@ import java.util.Map; import java.util.concurrent.CountDownLatch; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyObject; import static org.mockito.Matchers.eq; @@ -185,39 +187,9 @@ public class TestSimpleRpcScheduler { when(priority.getDeadline(eq(hugeHead), any(Message.class))).thenReturn(100L); final ArrayList work = new ArrayList(); - - doAnswer(new Answer() { - @Override - public Object answer(InvocationOnMock invocation) { - synchronized (work) { - work.add(10); - } - Threads.sleepWithoutInterrupt(100); - return null; - } - }).when(smallCallTask).run(); - - doAnswer(new Answer() { - @Override - public Object answer(InvocationOnMock invocation) { - synchronized (work) { - work.add(50); - } - Threads.sleepWithoutInterrupt(100); - return null; - } - }).when(largeCallTask).run(); - - doAnswer(new Answer() { - @Override - public Object answer(InvocationOnMock invocation) { - synchronized (work) { - work.add(100); - } - Threads.sleepWithoutInterrupt(100); - return null; - } - }).when(hugeCallTask).run(); + doAnswerTaskExecution(smallCallTask, work, 10, 250); + doAnswerTaskExecution(largeCallTask, work, 50, 250); + doAnswerTaskExecution(hugeCallTask, work, 100, 250); scheduler.dispatch(smallCallTask); scheduler.dispatch(smallCallTask); @@ -253,4 +225,84 @@ public class TestSimpleRpcScheduler { scheduler.stop(); } } + + @Test + public void testScanQueues() throws Exception { + Configuration schedConf = HBaseConfiguration.create(); + schedConf.setFloat(SimpleRpcScheduler.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f); + schedConf.setFloat(SimpleRpcScheduler.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.7f); + schedConf.setFloat(SimpleRpcScheduler.CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5f); + + PriorityFunction priority = mock(PriorityFunction.class); + when(priority.getPriority(any(RequestHeader.class), any(Message.class))) + .thenReturn(HConstants.NORMAL_QOS); + + RpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 3, 1, 1, priority, + HConstants.QOS_THRESHOLD); + try { + scheduler.start(); + + CallRunner putCallTask = mock(CallRunner.class); + RpcServer.Call putCall = mock(RpcServer.Call.class); + RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build(); + when(putCallTask.getCall()).thenReturn(putCall); + when(putCall.getHeader()).thenReturn(putHead); + + CallRunner getCallTask = mock(CallRunner.class); + RpcServer.Call getCall = mock(RpcServer.Call.class); + RequestHeader getHead = RequestHeader.newBuilder().setMethodName("get").build(); + when(getCallTask.getCall()).thenReturn(getCall); + when(getCall.getHeader()).thenReturn(getHead); + + CallRunner scanCallTask = mock(CallRunner.class); + RpcServer.Call scanCall = mock(RpcServer.Call.class); + scanCall.param = ScanRequest.newBuilder().setScannerId(1).build(); + RequestHeader scanHead = RequestHeader.newBuilder().setMethodName("scan").build(); + when(scanCallTask.getCall()).thenReturn(scanCall); + when(scanCall.getHeader()).thenReturn(scanHead); + + ArrayList work = new ArrayList(); + doAnswerTaskExecution(putCallTask, work, 1, 1000); + doAnswerTaskExecution(getCallTask, work, 2, 1000); + doAnswerTaskExecution(scanCallTask, work, 3, 1000); + + // There are 3 queues: [puts], [gets], [scans] + // so the calls will be interleaved + scheduler.dispatch(putCallTask); + scheduler.dispatch(putCallTask); + scheduler.dispatch(putCallTask); + scheduler.dispatch(getCallTask); + scheduler.dispatch(getCallTask); + scheduler.dispatch(getCallTask); + scheduler.dispatch(scanCallTask); + scheduler.dispatch(scanCallTask); + scheduler.dispatch(scanCallTask); + + while (work.size() < 6) { + Threads.sleepWithoutInterrupt(100); + } + + for (int i = 0; i < work.size() - 2; i += 3) { + assertNotEquals(work.get(i + 0), work.get(i + 1)); + assertNotEquals(work.get(i + 0), work.get(i + 2)); + assertNotEquals(work.get(i + 1), work.get(i + 2)); + } + } finally { + scheduler.stop(); + } + } + + private void doAnswerTaskExecution(final CallRunner callTask, + final ArrayList results, final int value, final int sleepInterval) { + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) { + synchronized (results) { + results.add(value); + } + Threads.sleepWithoutInterrupt(sleepInterval); + return null; + } + }).when(callTask).run(); + } }