diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
index 2b58680..4e7545f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
@@ -162,10 +162,13 @@ public class RWQueueRpcExecutor extends RpcExecutor {
     RpcServer.Call call = callTask.getCall();
     int queueIndex;
     if (isWriteRequest(call.getHeader(), call.param)) {
+      LOG.info("DISPATCH WRITE " + call.getHeader().getMethodName());
       queueIndex = writeBalancer.getNextQueue();
     } else if (numScanQueues > 0 && isScanRequest(call.getHeader(), call.param)) {
+      LOG.info("DISPATCH SCAN " + call.getHeader().getMethodName());
       queueIndex = numWriteQueues + numReadQueues + scanBalancer.getNextQueue();
     } else {
+      LOG.info("DISPATCH READ " + call.getHeader().getMethodName());
       queueIndex = numWriteQueues + readBalancer.getNextQueue();
     }
     queues.get(queueIndex).put(callTask);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRwQueueRpcExecutor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRwQueueRpcExecutor.java
new file mode 100644
index 0000000..b8668b2
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRwQueueRpcExecutor.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.ipc;
+
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.testclassification.RPCTests;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.*;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@Category({RPCTests.class, LargeTests.class})
+public class TestRwQueueRpcExecutor {
+  private static final Log LOG = LogFactory.getLog(TestRwQueueRpcExecutor.class);
+
+  protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  // Split the call queue: a handler factor of 1.0 gives each handler its own queue,
+  // 30% of the queues serve reads and 20% of the read queues are reserved for scans.
+  private static void setupConf(Configuration conf) {
+    conf.setInt("hbase.regionserver.handler.count", 50);
+    conf.setFloat("hbase.ipc.server.callqueue.handler.factor", 1.0f);
+    conf.setFloat("hbase.ipc.server.callqueue.read.ratio", 0.3f);
+    conf.setFloat("hbase.ipc.server.callqueue.scan.ratio", 0.2f);
+  }
+
+  @Before
+  public void setup() throws Exception {
+    setupConf(UTIL.getConfiguration());
+    UTIL.startMiniCluster(1);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    try {
+      UTIL.shutdownMiniCluster();
+    } catch (Exception e) {
+      LOG.warn("failure shutting down cluster", e);
+    }
+  }
+
+  // Issues a put, a row mutation and a delete against the mini cluster so the write
+  // dispatch path of RWQueueRpcExecutor is exercised.
+  @Test(timeout=60000)
+  public void testOps() throws Exception {
+    TableName tableName = TableName.valueOf("test");
+    byte[] familyName = Bytes.toBytes("f");
+    byte[] qualifier = Bytes.toBytes("q");
+    Table table = UTIL.createTable(tableName, familyName);
+    try {
+      Put put = new Put(Bytes.toBytes("row"));
+      put.add(familyName, qualifier, Bytes.toBytes("value"));
+      table.put(put);
+
+      RowMutations arm = new RowMutations(Bytes.toBytes("row"));
+      Put p = new Put(Bytes.toBytes("row"));
+      p.add(familyName, qualifier, Bytes.toBytes("v2"));
+      arm.add(p);
+      table.mutateRow(arm);
+
+      Delete delete = new Delete(Bytes.toBytes("row"));
+      table.delete(delete);
+    } finally {
+      table.close();
+      UTIL.deleteTable(tableName);
+    }
+  }
+
+  // ==========================================================================
+  //  Helpers
+  // ==========================================================================
+  private FileSystem getFileSystem() {
+    return UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getFileSystem();
+  }
+
+  private Path getRootDir() {
+    return UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
+  }
+
+  private Path getTempDir() {
+    return UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getTempDir();
+  }
+}