Index: src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterCoprocessorExceptionWithRemove.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterCoprocessorExceptionWithRemove.java (revision 1220359)
+++ src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterCoprocessorExceptionWithRemove.java (working copy)
@@ -174,7 +174,7 @@
// In this test, there is only a single coprocessor (BuggyMasterObserver).
String coprocessorName =
BuggyMasterObserver.class.getName();
- assertTrue(master.getLoadedCoprocessors().equals("[" + coprocessorName + "]"));
+ assertTrue(master.getLoadedCoprocessors().indexOf(coprocessorName)!=-1);
HTableDescriptor htd1 = new HTableDescriptor(TEST_TABLE1);
htd1.addFamily(new HColumnDescriptor(TEST_FAMILY1));
@@ -202,7 +202,7 @@
masterTracker.masterZKNodeWasDeleted);
String loadedCoprocessors = master.getLoadedCoprocessors();
- assertTrue(loadedCoprocessors.equals("[" + coprocessorName + "]"));
+ assertTrue(loadedCoprocessors.indexOf(coprocessorName)!=-1);
// Verify that BuggyMasterObserver has been removed due to its misbehavior
// by creating another table: should not have a problem this time.
Index: src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterCoprocessorExceptionWithAbort.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterCoprocessorExceptionWithAbort.java (revision 1220359)
+++ src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterCoprocessorExceptionWithAbort.java (working copy)
@@ -186,10 +186,9 @@
// Test (part of the) output that should have be printed by master when it aborts:
// (namely the part that shows the set of loaded coprocessors).
// In this test, there is only a single coprocessor (BuggyMasterObserver).
- assertTrue(master.getLoadedCoprocessors().
- equals("[" +
- TestMasterCoprocessorExceptionWithAbort.BuggyMasterObserver.class.getName() +
- "]"));
+ assertTrue(master.getLoadedCoprocessors().indexOf(
+ TestMasterCoprocessorExceptionWithAbort.BuggyMasterObserver.class.getName()
+ )!=-1);
CreateTableThread createTableThread = new CreateTableThread(UTIL);
Index: src/test/java/org/apache/hadoop/hbase/ipc/TestActionPriority.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/ipc/TestActionPriority.java (revision 0)
+++ src/test/java/org/apache/hadoop/hbase/ipc/TestActionPriority.java (revision 0)
@@ -0,0 +1,289 @@
+package org.apache.hadoop.hbase.ipc;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.ipc.PriorityFunction;
+import org.apache.hadoop.hbase.ipc.PriorityJobQueue.OperationPriority;
+import org.apache.hadoop.hbase.ipc.TestTablePriority.Worker;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test for action priority, using the Action_Priority table with scans and
+ * puts at different priorities. There are two tests: in the first one, put's
+ * priority is 0 (high priority; RPC priority = table priority 0 + action
+ * priority 0) and scan's priority is 10 (low priority); in the second test,
+ * we switch the priorities of scan and put.
+ */
+@Category(LargeTests.class)
+public class TestActionPriority {
+ private final static Log LOG = LogFactory.getLog(TestActionPriority.class);
+ static final Random r = new Random();
+ static final int rowNubmer = 100000;
+ static final int threadN = 200;
+ HBaseAdmin admin = null;
+ static final int regionNumber = 10;
+ private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ private static byte[][] startKeys;
+ static {
+ startKeys = new byte[regionNumber][];
+ for (int i = 0; i < regionNumber; i++) {
+ startKeys[i] = Bytes.toBytes(String.format(
+ "%0" + ((regionNumber + "").length()-1) + "d", i));
+ }
+ }
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ TEST_UTIL.getConfiguration().setInt(
+ PriorityJobQueue.WAIT_UNIT_CONF_KEY, 1000);
+ TEST_UTIL.startMiniCluster(1);
+ }
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ /**
+ * set up a cluster and prepare tables.
+ */
+ @Before
+ public void setUp() {
+ try {
+ admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
+ } catch (Exception e) {
+ LOG.info(e);
+ }
+ HTableDescriptor des;
+
+ try {
+ if (admin.tableExists("Action_Priority")) {
+ admin.disableTable("Action_Priority");
+ admin.deleteTable("Action_Priority");
+ }
+ des = new HTableDescriptor("Action_Priority");
+ des.addFamily(new HColumnDescriptor("ff"));
+ des.setValue(PriorityFunction.OPERATION_PRIORITY_KEY,
+ new OperationPriority(10, 0, 0).toBytes());
+ PriorityFunction.setPriority(1, des);
+ admin.createTable(des, startKeys);
+ } catch (Exception e) {
+ LOG.info(e);
+ }
+ TEST_UTIL
+ .getMiniHBaseCluster().getMaster().balance();
+ writeData();
+ TEST_UTIL
+ .getMiniHBaseCluster().getMaster().balance();
+ }
+
+ private void writeData() {
+ LOG.info("begion write data into test table");
+ int nPerWorker = rowNubmer / threadN;
+ Worker[] workers = new Worker[threadN];
+ for (int i = 0; i < workers.length; i++) {
+ try {
+ workers[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(),
+ "Action_Priority"), "writeData", nPerWorker);
+ } catch (IOException e) {
+ }
+ }
+ for (int i = 0; i < workers.length; i++) {
+ workers[i].start();
+ }
+ for (int i = 0; i < workers.length; i++) {
+ try {
+ workers[i].join();
+ } catch (InterruptedException e) {
+ LOG.error(e);
+ }
+ }
+ LOG.info("Write data into test table finished.");
+ }
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @After
+ public void tearDown() {
+
+ }
+
+ /**
+ * Verify whether the priority mechanism takes effect.
+ *
+ * @param highs
+ * high priority workers
+ * @param lows
+ * low priority workers
+ */
+
+ @SuppressWarnings("static-access")
+ public void verifyFunction(Worker highs[], Worker lows[]) {
+ boolean highFinished = false;
+ boolean lowFinished = false;
+ long highThroughPut = 0;
+ long lowThroughPut = 0;
+ while (!(highFinished && lowFinished)) {
+ try {
+ Thread.currentThread().sleep(200);
+ } catch (InterruptedException e) {
+ }
+ highThroughPut = 0;
+ lowThroughPut = 0;
+ for (int i = 0; i < highs.length; i++) {
+ highThroughPut += highs[i].getThroughput();
+ lowThroughPut += lows[i].getThroughput();
+ }
+ LOG.info("-------------------------------------------------------------");
+ LOG.info("High priority action type is:" + highs[0].getType()
+ + ", priority:" + highs[0].getActionPriority() + " throughput is:"
+ + highThroughPut);
+ LOG.info("low priority action type is:" + lows[0].getType()
+ + ", priority:" + lows[0].getActionPriority() + " throughput is:"
+ + lowThroughPut);
+
+ highFinished = true;
+ lowFinished = true;
+ for (int i = 0; i < highs.length; i++) {
+ if (highs[i].isAlive()) {
+ highFinished = false;
+ }
+ if (lows[i].isAlive()) {
+ lowFinished = false;
+ }
+ }
+ if (highFinished) {
+ for (int i = 0; i < highs.length; i++) {
+ lows[i].stopWorker();
+ }
+ lowFinished = true;
+ }
+ if (lowFinished) {
+ for (int i = 0; i < highs.length; i++) {
+ highs[i].stopWorker();
+ }
+ highFinished = true;
+ }
+
+ }
+ highThroughPut = 0;
+ lowThroughPut = 0;
+ for (int i = 0; i < highs.length; i++) {
+ highThroughPut += highs[i].getThroughput();
+ lowThroughPut += lows[i].getThroughput();
+ }
+ assertTrue("Action priority works properly", highThroughPut > lowThroughPut);
+ LOG.info("-------------------------------------------------------------");
+ LOG.info("---------------------Test finished --------------------------");
+ LOG.info("High priority action type is:" + highs[0].getType()
+ + ", priority:" + highs[0].getActionPriority() + " throughput is:"
+ + highThroughPut);
+ LOG.info("low priority action type is:" + lows[0].getType() + ", priority:"
+ + lows[0].getActionPriority() + " throughput is:" + lowThroughPut);
+ LOG.info("####### Test for " + highs[0].getType() + ", priority:"
+ + highs[0].getActionPriority() + " ,and " + lows[0].getType()
+ + ", priority:" + lows[0].getActionPriority() + " finished####");
+
+ LOG.info("-------------------------------------------------------------");
+ }
+
+ /**
+ * run the test;
+ */
+ @Test
+ public void testForDifferentActionPriority() {
+ Worker puts[] = new Worker[threadN / 2];
+ Worker scans[] = new Worker[threadN / 2];
+ for (int i = 0; i < puts.length; i++) {
+ try {
+ puts[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(),
+ "Action_Priority"), "put", rowNubmer * 2 / threadN);
+ scans[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(),
+ "Action_Priority"), "scan", rowNubmer * 2 / threadN);
+ } catch (IOException e) {
+ }
+ }
+ for (int i = 0; i < puts.length; i++) {
+ puts[i].setActionPriority(0);
+ puts[i].start();
+ }
+ for (int i = 0; i < scans.length; i++) {
+ scans[i].setActionPriority(10);
+ scans[i].start();
+ }
+ verifyFunction(puts, scans);
+
+ try {
+ admin.disableTable("Action_Priority");
+ HTableDescriptor des1 = admin.getTableDescriptor(Bytes
+ .toBytes("Action_Priority"));
+ des1.setValue(PriorityFunction.OPERATION_PRIORITY_KEY,
+ new OperationPriority(0, 10, 10).toBytes());
+ admin.modifyTable(Bytes.toBytes("Action_Priority"), des1);
+ admin.enableTable("Action_Priority");
+ GroupTestUtil.balanceTable("Action_Priority", TEST_UTIL
+ .getMiniHBaseCluster().getMaster());
+ } catch (IOException e) {
+ LOG.info(e);
+ }
+ puts = new Worker[threadN / 2];
+ scans = new Worker[threadN / 2];
+ for (int i = 0; i < puts.length; i++) {
+ try {
+
+ puts[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(),
+ "Action_Priority"), "put", rowNubmer * 2 / threadN);
+ scans[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(),
+ "Action_Priority"), "scan", rowNubmer * 2 / threadN);
+ puts[i].setActionPriority(10);
+ scans[i].setActionPriority(0);
+ } catch (IOException e) {
+ }
+ }
+ for (int i = 0; i < puts.length; i++) {
+ puts[i].start();
+ scans[i].start();
+ }
+ verifyFunction(scans, puts);
+ }
+
+ public static void main(String args[]) {
+
+ try {
+ setUpBeforeClass();
+ TestActionPriority t = new TestActionPriority();
+ t.setUp();
+ t.testForDifferentActionPriority();
+ tearDownAfterClass();
+ } catch (Exception e) {
+ LOG.error(e);
+ }
+
+ }
+
+}
Index: src/test/java/org/apache/hadoop/hbase/ipc/TestPriorityJobQueue.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/ipc/TestPriorityJobQueue.java (revision 0)
+++ src/test/java/org/apache/hadoop/hbase/ipc/TestPriorityJobQueue.java (revision 0)
@@ -0,0 +1,139 @@
+/**
+ * Copyright the Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.util.Random;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.ipc.PriorityJobQueue;
+import org.apache.hadoop.hbase.ipc.PriorityJobQueue.Job;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test used to ensure the PriorityJobQueue works properly
+ *
+ */
+@Category(SmallTests.class)
+public class TestPriorityJobQueue {
+ private final Log LOG = LogFactory.getLog(TestPriorityJobQueue.class);
+
+ private static int queueSize = 20;
+ private static int testThreadN = 4;
+ private static final int testTimes = 300;
+
+ /**
+ * Test the queue's function: threads (other than those with the lowest
+ * priority) can only take a job with a higher priority; also verify that
+ * the number of input jobs equals the number of output jobs.
+ */
+ @Test
+ public void testGetJob() {
+ queueSize = 3;
+ testThreadN = 20;
+ doTest();
+ queueSize = 3;
+ testThreadN = 200;
+ doTest();
+ queueSize = 200;
+ testThreadN = 3;
+ doTest();
+ queueSize = 200;
+ testThreadN = 300;
+ doTest();
+ }
+ class TimesWorker extends Thread {
+ public int number = 0;
+ }
+ private void doTest() {
+ final Random r = new Random();
+ final PriorityJobQueue queue = new PriorityJobQueue(
+ queueSize, null, HBaseConfiguration.create());
+ TimesWorker consumer[] = new TimesWorker[testThreadN];
+
+ int pri = 10;
+ for (int i = 0; i < consumer.length; i++, pri--) {
+ consumer[i] = new TimesWorker() {
+ public void run() {
+ while (true) {
+ String j = null;
+ try {
+ j = queue.take();
+ number++;
+ } catch (InterruptedException e) {
+ LOG.error(e);
+ }
+ }
+ }
+ };
+
+ if (pri < 1)
+ pri = 10;
+ consumer[i].setDaemon(true);
+ consumer[i].setPriority(pri);
+ consumer[i].start();
+ }
+
+ TimesWorker producer[] = new TimesWorker[testThreadN * 2];
+ for (int i = 0; i < producer.length; i++) {
+ producer[i] = new TimesWorker() {
+ public void run() {
+ for (int j = 0; j < testTimes; j++) {
+ int jobpri = (r.nextInt(25) - 10);
+ try {
+ queue.put("" + jobpri, jobpri);
+ number++;
+ } catch (InterruptedException e) {
+ LOG.error(e);
+ }
+ }
+ }
+ };
+ producer[i].start();
+ }
+ int totalInput=0;
+ for (TimesWorker d :producer) {
+ try {
+ d.join();
+ totalInput+=d.number;
+ } catch (InterruptedException e) {
+ }
+ }
+
+ while (queue.size() != 0) {
+ try {
+ Thread.sleep(300);
+
+ LOG.info("queue size:" + queue.size());
+ } catch (InterruptedException e) {
+ }
+ }
+ int totalOutput=0;
+ for(TimesWorker d:consumer)
+ {
+ totalOutput+=d.number;
+ }
+ assertTrue("all job are finished", totalOutput==totalInput);
+ }
+}
Index: src/test/java/org/apache/hadoop/hbase/ipc/TestTablePriorityHundredRegion.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/ipc/TestTablePriorityHundredRegion.java (revision 0)
+++ src/test/java/org/apache/hadoop/hbase/ipc/TestTablePriorityHundredRegion.java (revision 0)
@@ -0,0 +1,654 @@
+/**
+ * Copyright the Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.ipc.PriorityFunction;
+import org.apache.hadoop.hbase.ipc.PriorityJobQueue.OperationPriority;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test for table priority, using tables A, B and C which have different
+ * priorities. There are two tests: in the first one table A's priority is 1
+ * (high priority), B is 5, C is 10; in the second test, we switch the
+ */
+@Category(LargeTests.class)
+public class TestTablePriorityHundredRegion {
+ private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ private final static Log LOG = LogFactory
+ .getLog(TestTablePriorityHundredRegion.class);
+ static final Random r = new Random();
+ static int rowNumber = 10000;
+ static final int threadN = 200;
+ HBaseAdmin admin = null;
+ // if there is already data in the test tables, you
+ // can set the method type to "scan"
+ static String method = "put";
+ static final int regionNumber = 99;
+ private static byte[][] startKeys;
+ private static final byte[] value = new byte[100];
+ static {
+ startKeys = new byte[regionNumber][];
+ for (int i = 0; i < regionNumber; i++) {
+ startKeys[i] = Bytes.toBytes(String.format(
+ "%0" + (regionNumber + "").length() + "d", i));
+ }
+ }
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ TEST_UTIL.getConfiguration().setInt(
+ PriorityJobQueue.WAIT_UNIT_CONF_KEY, 1000);
+ TEST_UTIL.startMiniCluster(3);
+ }
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ /**
+ * set up a cluster and prepare tables.
+ */
+ @Before
+ public void setUp() {
+ try {
+ admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
+ } catch (Exception e) {
+ LOG.info(e);
+ }
+ HTableDescriptor des;
+ try {
+ if (admin.tableExists("Table_A")) {
+ admin.disableTable("Table_A");
+ admin.deleteTable("Table_A");
+ }
+ if (admin.tableExists("Table_B")) {
+ admin.disableTable("Table_B");
+ admin.deleteTable("Table_B");
+ }
+ if (admin.tableExists("Table_C")) {
+ admin.disableTable("Table_C");
+ admin.deleteTable("Table_C");
+ }
+ } catch (IOException e1) {
+ LOG.error(e1);
+ }
+ try {
+ des = new HTableDescriptor("Table_A");
+ des.addFamily(new HColumnDescriptor("ff"));
+ des = PriorityFunction.setPriority(1, des);
+ admin.createTable(des, startKeys);
+ } catch (Exception e) {
+ LOG.info(e);
+ }
+ try {
+ des = new HTableDescriptor("Table_B");
+ des.addFamily(new HColumnDescriptor("ff"));
+ des = PriorityFunction.setPriority(5, des);
+ admin.createTable(des, startKeys);
+ } catch (Exception e) {
+ LOG.info(e);
+ }
+ try {
+
+ des = new HTableDescriptor("Table_C");
+ des.addFamily(new HColumnDescriptor("ff"));
+ des = PriorityFunction.setPriority(10, des);
+ admin.createTable(des, startKeys);
+ } catch (Exception e) {
+ LOG.info(e);
+ }
+ balanceTable("Table_A", TEST_UTIL.getMiniHBaseCluster().getMaster());
+ balanceTable("Table_B", TEST_UTIL.getMiniHBaseCluster().getMaster());
+ balanceTable("Table_C", TEST_UTIL.getMiniHBaseCluster().getMaster());
+ }
+
+ /**
+ * The worker used for test throughput and gather the result
+ */
+ public static class Worker extends Thread {
+ HTable table;
+ String type;
+ private long throughput = 0;
+ long times = 1000;
+ boolean stopFlag = false;
+ int tablePriority;
+ long n = 0;
+ long startTime = 0;
+ int actionPriority = 0;
+
+ /**
+ * Get throughput of this worker.
+ *
+ * @return throughput in operations per second since the worker started
+ */
+ public long getThroughput() {
+ long end = System.currentTimeMillis();
+ throughput = (n * 1000) / (end - startTime + 1);
+ return throughput;
+ }
+
+ /**
+ * Get table priority.
+ *
+ * @return table priority
+ */
+ public int getTablePriority() {
+ return tablePriority;
+ }
+
+ /**
+ * Set table priority.
+ */
+ public void setTablePriority(int tablePriority) {
+ this.tablePriority = tablePriority;
+ }
+
+ /**
+ * Get action type.
+ *
+ * @return action type of the worker
+ */
+ public String getType() {
+ return type;
+ }
+
+ /**
+ * Get the table used by the worker.
+ *
+ * @return table of the worker
+ */
+ public HTable getTable() {
+ return table;
+ }
+
+ public Worker(HTable t, String type, long times) {
+ this.table = t;
+ this.type = type;
+ this.times = times;
+ }
+
+ public void run() {
+ try {
+ doAction(table, type, times);
+ } catch (IOException e) {
+ LOG.info(e);
+ }
+ }
+
+ public int getActionPriority() {
+ return actionPriority;
+ }
+
+ public void setActionPriority(int actionPriority) {
+ this.actionPriority = actionPriority;
+ }
+
+ /**
+ * Stop the worker.
+ */
+ public void stopWorker() {
+ this.stopFlag = true;
+ }
+
+ private void doAction(HTable t, String type, long times) throws IOException {
+ long start = System.currentTimeMillis();
+ long end = System.currentTimeMillis();
+ startTime = System.currentTimeMillis();
+ int pri = Integer.parseInt(Bytes.toString(t.getTableDescriptor()
+ .getValue(PriorityFunction.PRI_KEY)));
+ Scan s = new Scan();
+ s.setStartRow(Bytes.toBytes(r.nextInt(10) + ""));
+ if (type.equals("writeData")) {
+ t.setAutoFlush(false);
+ t.setWriteBufferSize(1000);
+ }
+ ResultScanner sn = null;
+ boolean scan = false;
+ if (type.equals("scan")) {
+ sn = t.getScanner(s);
+ scan = true;
+ }
+ while (true) {
+ if (n % 10000 == 0) {
+ end = System.currentTimeMillis();
+ // LOG.debug("Thread:" + this.getId() + " type:" + type + " table"
+ // + t.getTableDescriptor().getNameAsString() + " pri :" + pri
+ // + " time:" + (end - start) + " total :" + n + " throughput:"
+ // + (n * 1000) / (end - startTime + 1));
+ throughput = (n * 1000) / (end - startTime + 1);
+ start = end;
+ }
+ if (n % 100 == 0) {
+ end = System.currentTimeMillis();
+ throughput = (n * 1000) / (end - startTime + 1);
+ }
+ Result ret = null;
+ if (scan) {
+ ret = sn.next();
+ if (ret == null) {
+ try {
+ sn.close();
+ } catch (Exception e) {
+ LOG.debug(e);
+ }
+ sn = t.getScanner(s);
+ }
+ } else {
+ put(t, n);
+ }
+ if (stopFlag || n > times) {
+ break;
+ }
+ n++;
+ }
+ t.flushCommits();
+ t.close();
+ }
+ }
+
+ /**
+ * Verify whether the function works properly.
+ *
+ * @param high high priority workers
+ * @param middle middle priority workers
+ * @param low low priority workers
+ *
+ */
+ @SuppressWarnings("static-access")
+ public void verifyFunction(Worker high[], Worker middle[], Worker low[]) {
+ boolean highFinished = false;
+ boolean lowFinished = false;
+ boolean middleFinished = false;
+ long highThroughPut = 0;
+ long middleThroughPut = 0;
+ long lowThroughPut = 0;
+ while (!(highFinished && lowFinished && middleFinished)) {
+ try {
+ Thread.currentThread().sleep(100);
+ } catch (InterruptedException e) {
+ // ignore exceptions
+ }
+ highThroughPut = 0;
+ middleThroughPut = 0;
+ lowThroughPut = 0;
+ for (int i = 0; i < high.length; i++) {
+ highThroughPut += high[i].getThroughput();
+ lowThroughPut += low[i].getThroughput();
+ middleThroughPut += middle[i].getThroughput();
+ }
+ LOG.info("-------------------------------------------------------------");
+ try {
+ LOG.info("high priority table is: "
+ + high[0].getTable().getTableDescriptor().getNameAsString()
+ + " action type is:" + high[0].getType() + ", priority:"
+ + high[0].getTablePriority() + " throughput is:" + highThroughPut);
+ LOG.info("middle priority table is: "
+ + middle[0].getTable().getTableDescriptor().getNameAsString()
+ + " action type is:" + middle[0].getType() + ", priority:"
+ + middle[0].getTablePriority() + " throughput is:"
+ + middleThroughPut);
+ LOG.info("low priority table is: "
+ + low[0].getTable().getTableDescriptor().getNameAsString()
+ + " action type is:" + low[0].getType() + ", priority:"
+ + low[0].getTablePriority() + " throughput is:" + lowThroughPut);
+ } catch (Exception e) {
+ LOG.error(e);
+ }
+
+ highFinished = true;
+ lowFinished = true;
+ middleFinished = true;
+ for (int i = 0; i < high.length; i++) {
+ if (high[i].isAlive()) {
+ highFinished = false;
+ }
+ if (low[i].isAlive()) {
+ lowFinished = false;
+ }
+ if (middle[i].isAlive()) {
+ middleFinished = false;
+ }
+ }
+ if (highFinished) {
+ for (int i = 0; i < high.length; i++) {
+ low[i].stopWorker();
+ }
+ for (int i = 0; i < high.length; i++) {
+ middle[i].stopWorker();
+ }
+ middleFinished = true;
+ lowFinished = true;
+ }
+ if (lowFinished) {
+ for (int i = 0; i < high.length; i++) {
+ high[i].stopWorker();
+ }
+ for (int i = 0; i < high.length; i++) {
+ middle[i].stopWorker();
+ }
+ middleFinished = true;
+ highFinished = true;
+ }
+ if (middleFinished) {
+ for (int i = 0; i < high.length; i++) {
+ high[i].stopWorker();
+ }
+ for (int i = 0; i < high.length; i++) {
+ low[i].stopWorker();
+ }
+ lowFinished = true;
+ highFinished = true;
+ }
+
+ }
+ highThroughPut = 0;
+ middleThroughPut = 0;
+ lowThroughPut = 0;
+ for (int i = 0; i < high.length; i++) {
+ highThroughPut += high[i].getThroughput();
+ lowThroughPut += low[i].getThroughput();
+ middleThroughPut += middle[i].getThroughput();
+ }
+
+ assertTrue("Action priority works properly",
+ highThroughPut > middleThroughPut && middleThroughPut > lowThroughPut);
+ LOG.info("-------------------------------------------------------------");
+ LOG.info("---------------------Test finished --------------------------");
+
+ try {
+ LOG.info("high priority table is: "
+ + high[0].getTable().getTableDescriptor().getNameAsString()
+ + " action type is:" + high[0].getType() + ", priority:"
+ + high[0].getTablePriority() + " throughput is:" + highThroughPut);
+ LOG.info("middle priority table is: "
+ + middle[0].getTable().getTableDescriptor().getNameAsString()
+ + " action type is:" + middle[0].getType() + ", priority:"
+ + middle[0].getTablePriority() + " throughput is:" + middleThroughPut);
+ LOG.info("low priority table is: "
+ + low[0].getTable().getTableDescriptor().getNameAsString()
+ + " action type is:" + low[0].getType() + ", priority:"
+ + low[0].getTablePriority() + " throughput is:" + lowThroughPut);
+ } catch (Exception e) {
+ LOG.error(e);
+ }
+ LOG.info("####### Test for " + high[0].getType() + ", priority:"
+ + high[0].getTablePriority() + " " + middle[0].getType()
+ + ", priority:" + middle[0].getTablePriority() + " ,and "
+ + low[0].getType() + ", priority:" + low[0].getTablePriority()
+ + " finished####");
+
+ LOG.info("-------------------------------------------------------------");
+ }
+
+ private void changePriority(int pria, int prib, int pric) {
+ try {
+ admin.disableTable("Table_A");
+ HTableDescriptor des1 = admin
+ .getTableDescriptor(Bytes.toBytes("Table_A"));
+ des1.setValue(PriorityFunction.OPERATION_PRIORITY_KEY,
+ new OperationPriority(0, 0, 0).toBytes());
+ des1 = PriorityFunction.setPriority(pria, des1);
+ admin.modifyTable(Bytes.toBytes("Table_A"), des1);
+ admin.enableTable("Table_A");
+ } catch (Exception e) {
+ LOG.info(e);
+ }
+ try {
+ admin.disableTable("Table_B");
+ HTableDescriptor des1 = admin
+ .getTableDescriptor(Bytes.toBytes("Table_B"));
+ des1.setValue(PriorityFunction.OPERATION_PRIORITY_KEY,
+ new OperationPriority(0, 0, 0).toBytes());
+ des1 = PriorityFunction.setPriority(prib, des1);
+ admin.modifyTable(Bytes.toBytes("Table_B"), des1);
+ admin.enableTable("Table_B");
+ } catch (Exception e) {
+ LOG.info(e);
+ }
+ try {
+ admin.disableTable("Table_C");
+ HTableDescriptor des1 = admin
+ .getTableDescriptor(Bytes.toBytes("Table_C"));
+ des1.setValue(PriorityFunction.OPERATION_PRIORITY_KEY,
+ new OperationPriority(0, 0, 0).toBytes());
+ des1 = PriorityFunction.setPriority(pric, des1);
+ admin.modifyTable(Bytes.toBytes("Table_C"), des1);
+ admin.enableTable("Table_C");
+ } catch (Exception e) {
+ LOG.info(e);
+ }
+ balanceTable("Table_A", TEST_UTIL.getMiniHBaseCluster().getMaster());
+ balanceTable("Table_B", TEST_UTIL.getMiniHBaseCluster().getMaster());
+ balanceTable("Table_C", TEST_UTIL.getMiniHBaseCluster().getMaster());
+ }
+
+ /**
+ * start the test.
+ */
+ @Test(timeout = 180000)
+ public void testForDifferentTablePriority() {
+ method = "put";
+ doTestJob();
+// method = "scan";
+// changePriority(1, 5, 10);
+// doTestJob();
+ }
+
+ private void doTestJob() {
+ if (method.equals("scan")) {
+ rowNumber = rowNumber * 10;
+ }
+ Worker A[] = new Worker[threadN / 3];
+ Worker B[] = new Worker[threadN / 3];
+ Worker C[] = new Worker[threadN / 3];
+ for (int i = 0; i < A.length; i++) {
+ try {
+ A[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(), "Table_A"),
+ method, rowNumber * 3 / threadN);
+ B[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(), "Table_B"),
+ method, rowNumber * 3 / threadN);
+ C[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(), "Table_C"),
+ method, rowNumber * 3 / threadN);
+ } catch (IOException e) {
+ }
+ }
+ for (int i = 0; i < A.length; i++) {
+ A[i].setTablePriority(1);
+ B[i].setTablePriority(5);
+ C[i].setTablePriority(10);
+ A[i].start();
+ B[i].start();
+ C[i].start();
+ }
+ verifyFunction(A, B, C);
+ changePriority(10, 5, 1);
+ A = new Worker[threadN / 3];
+ B = new Worker[threadN / 3];
+ C = new Worker[threadN / 3];
+ for (int i = 0; i < A.length; i++) {
+ try {
+ A[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(), "Table_A"),
+ method, rowNumber * 3 / threadN);
+ B[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(), "Table_B"),
+ method, rowNumber * 3 / threadN);
+ C[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(), "Table_C"),
+ method, rowNumber * 3 / threadN);
+ } catch (IOException e) {
+ }
+ }
+ for (int i = 0; i < A.length; i++) {
+ A[i].setTablePriority(10);
+ B[i].setTablePriority(5);
+ C[i].setTablePriority(1);
+ A[i].start();
+ B[i].start();
+ C[i].start();
+ }
+ verifyFunction(C, B, A);
+ }
+
+ public static void printTable(String name) {
+
+ try {
+ HTable A = new HTable(TEST_UTIL.getConfiguration(), name);
+ Scan s = new Scan();
+ ResultScanner rs;
+ rs = A.getScanner(s);
+ Result r = null;
+ while ((r = rs.next()) != null) {
+ System.out.println(r);
+ }
+ } catch (IOException e) {
+ LOG.error(e);
+ }
+
+ }
+
+ public static void main(String args[]) {
+ try {
+ setUpBeforeClass();
+ TestTablePriorityHundredRegion t = new TestTablePriorityHundredRegion();
+ t.setUp();
+ tearDownAfterClass();
+ } catch (Exception e) {
+ LOG.info(e);
+ }
+ }
+
+ private static void put(HTable t, long i) {
+ Put p = new Put(
+ Bytes.toBytes("" + r.nextInt(1000) + (i) + r.nextInt(10000)));
+ p.add(Bytes.toBytes("ff"), Bytes.toBytes("ff"), value);
+ try {
+ t.put(p);
+ } catch (IOException e) {
+ LOG.info(e);
+ }
+ }
+
+ public static void balanceTable(String table, HMaster master) {
+ List servers = master.getServerManager().getOnlineServersList();
+ List regions = master.getAssignmentManager()
+ .getRegionsOfTable(Bytes.toBytes(table));
+ HashMap map = new HashMap();
+ for (HRegionInfo region : regions) {
+ map.put(region,
+ master.getAssignmentManager().getRegionServerOfRegion(region));
+
+ }
+ LOG.debug("Region location:" + map);
+ doBalance(new HashSet(servers), map, master);
+
+ }
+
+ private static void doBalance(HashSet servers,
+ HashMap map, HMaster master) {
+ HashMap moves = new HashMap();
+ HashMap> serverList = new HashMap>();
+ for (Entry e : map.entrySet()) {
+ if (serverList.get(e.getValue()) == null) {
+ serverList.put(e.getValue(), new ArrayList());
+ }
+ serverList.get(e.getValue()).add(e.getKey());
+ }
+ for (ServerName s : servers) {
+ if (serverList.get(s) == null) {
+ serverList.put(s, new ArrayList());
+ }
+ }
+
+ int div = 10;
+ while (div > 2) {
+ ServerName maxLoad = null, minLoad = null;
+ int maxLoadN = Integer.MIN_VALUE, minLoadN = Integer.MAX_VALUE;
+ for (Entry> e : serverList.entrySet()) {
+ if (e.getValue().size() >= maxLoadN) {
+ maxLoadN = e.getValue().size();
+ maxLoad = e.getKey();
+ }
+ if (e.getValue().size() <= minLoadN) {
+ minLoadN = e.getValue().size();
+ minLoad = e.getKey();
+ }
+ }
+ if (maxLoad == null || minLoad == null)
+ break;
+ if (serverList.get(maxLoad).size() == 0)
+ break;
+ else {
+ div = Math.abs(maxLoadN - minLoadN);
+ int index = r.nextInt(serverList.get(maxLoad).size());
+ moves.put(serverList.get(maxLoad).get(index), minLoad);
+ serverList.get(minLoad).add(serverList.get(maxLoad).get(index));
+ serverList.get(maxLoad).remove(index);
+ }
+
+ }
+
+ for (Entry e : moves.entrySet()) {
+ try {
+ LOG.info("move :" + e.getKey().getEncodedName() + " regions to "
+ + e.getValue());
+ master.move(e.getKey().getEncodedNameAsBytes(),
+ Bytes.toBytes(e.getValue().getServerName()));
+ } catch (Exception e1) {
+ // a failed region move is non-fatal for this best-effort balance
+ e1.printStackTrace();
+ }
+ }
+ }
+}
Index: src/test/java/org/apache/hadoop/hbase/ipc/TestTablePriority.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/ipc/TestTablePriority.java (revision 0)
+++ src/test/java/org/apache/hadoop/hbase/ipc/TestTablePriority.java (revision 0)
@@ -0,0 +1,582 @@
+/**
+ * Copyright the Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.ipc.PriorityFunction;
+import org.apache.hadoop.hbase.ipc.PriorityJobQueue.OperationPriority;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test for table priority, use table A,B,C which have different priorities.
+ * There are two tests,in first one table A's priority is 1(high priority), B is
+ * 5, C is 10 and in the second test,we switch the priorities of A and C.
+ */
+@Category(LargeTests.class)
+public class TestTablePriority {
+ private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ private final static Log LOG = LogFactory.getLog(TestTablePriority.class);
+ static final Random r = new Random();
+ static int rowNumber = 10000;
+ static int threadN =40;
+ HBaseAdmin admin = null;
+ // if there are data in the test tables,you
+ // can set method type to "scan"
+ static String method = "put";
+ static final int regionNumber=9;
+ private static byte[][]startKeys;
+ static
+ {
+ startKeys=new byte[regionNumber][];
+ for(int i=0;i times) {
+ break;
+ }
+ n++;
+ }
+ t.flushCommits();
+ t.close();
+ }
+ }
+
+ /**
+ * Verify whether the function take effect
+ *
+   * @param high workers on the high-priority table; the middle and low arrays
+   * must have the same length and target the middle/low-priority tables
+ */
+ @SuppressWarnings("static-access")
+ public void verifyFunction(Worker high[], Worker middle[], Worker low[]) {
+ boolean highFinished = false;
+ boolean lowFinished = false;
+ boolean middleFinished = false;
+ long highThroughPut = 0;
+ long middleThroughPut = 0;
+ long lowThroughPut = 0;
+ while (!(highFinished && lowFinished && middleFinished)) {
+ try {
+ Thread.currentThread().sleep(100);
+ } catch (InterruptedException e) {
+ // ignore exceptions
+ }
+ highThroughPut = 0;
+ middleThroughPut = 0;
+ lowThroughPut = 0;
+ for (int i = 0; i < high.length; i++) {
+ highThroughPut += high[i].getThroughput();
+ lowThroughPut += low[i].getThroughput();
+ middleThroughPut += middle[i].getThroughput();
+ }
+ LOG.info("-------------------------------------------------------------");
+ try {
+ LOG.info("high priority table is: "
+ + high[0].getTable().getTableDescriptor().getNameAsString()
+ + " action type is:" + high[0].getType() + ", priority:"
+ + high[0].getTablePriority() + " throughput is:" + highThroughPut);
+ LOG.info("middle priority table is: "
+ + middle[0].getTable().getTableDescriptor().getNameAsString()
+ + " action type is:" + middle[0].getType() + ", priority:"
+ + middle[0].getTablePriority() + " throughput is:"
+ + middleThroughPut);
+ LOG.info("low priority table is: "
+ + low[0].getTable().getTableDescriptor().getNameAsString()
+ + " action type is:" + low[0].getType() + ", priority:"
+ + low[0].getTablePriority() + " throughput is:" + lowThroughPut);
+ } catch (Exception e) {
+ LOG.error(e);
+ }
+
+ highFinished = true;
+ lowFinished = true;
+ middleFinished = true;
+ for (int i = 0; i < high.length; i++) {
+ if (high[i].isAlive()) {
+ highFinished = false;
+ }
+ if (low[i].isAlive()) {
+ lowFinished = false;
+ }
+ if (middle[i].isAlive()) {
+ middleFinished = false;
+ }
+ }
+ if (highFinished) {
+ for (int i = 0; i < high.length; i++) {
+ low[i].stopWorker();
+ }
+ for (int i = 0; i < high.length; i++) {
+ middle[i].stopWorker();
+ }
+ middleFinished = true;
+ lowFinished = true;
+ }
+ if (lowFinished) {
+ for (int i = 0; i < high.length; i++) {
+ high[i].stopWorker();
+ }
+ for (int i = 0; i < high.length; i++) {
+ middle[i].stopWorker();
+ }
+ middleFinished = true;
+ highFinished = true;
+ }
+ if (middleFinished) {
+ for (int i = 0; i < high.length; i++) {
+ high[i].stopWorker();
+ }
+ for (int i = 0; i < high.length; i++) {
+ low[i].stopWorker();
+ }
+ lowFinished = true;
+ highFinished = true;
+ }
+
+ }
+ highThroughPut = 0;
+ middleThroughPut = 0;
+ lowThroughPut = 0;
+ for (int i = 0; i < high.length; i++) {
+ highThroughPut += high[i].getThroughput();
+ lowThroughPut += low[i].getThroughput();
+ middleThroughPut += middle[i].getThroughput();
+ }
+
+ assertTrue("Action priority works properly",
+ highThroughPut > middleThroughPut && middleThroughPut > lowThroughPut);
+ LOG.info("-------------------------------------------------------------");
+ LOG.info("---------------------Test finished --------------------------");
+
+ try {
+ LOG.info("high priority table is: "
+ + high[0].getTable().getTableDescriptor().getNameAsString()
+ + " action type is:" + high[0].getType() + ", priority:"
+ + high[0].getTablePriority() + " throughput is:" + highThroughPut);
+ LOG.info("middle priority table is: "
+ + middle[0].getTable().getTableDescriptor().getNameAsString()
+ + " action type is:" + middle[0].getType() + ", priority:"
+ + middle[0].getTablePriority() + " throughput is:" + middleThroughPut);
+ LOG.info("low priority table is: "
+ + low[0].getTable().getTableDescriptor().getNameAsString()
+ + " action type is:" + low[0].getType() + ", priority:"
+ + low[0].getTablePriority() + " throughput is:" + lowThroughPut);
+ } catch (Exception e) {
+ LOG.error(e);
+ }
+ LOG.info("####### Test for " + high[0].getType() + ", priority:"
+ + high[0].getTablePriority() + " " + middle[0].getType()
+ + ", priority:" + middle[0].getTablePriority() + " ,and "
+ + low[0].getType() + ", priority:" + low[0].getTablePriority()
+ + " finished####");
+
+ LOG.info("-------------------------------------------------------------");
+ }
+ private void changePriority(int pria,int prib,int pric)
+ {
+ try {
+ admin.disableTable("Table_A");
+ HTableDescriptor des1 = admin
+ .getTableDescriptor(Bytes.toBytes("Table_A"));
+ des1.setValue(PriorityFunction.OPERATION_PRIORITY_KEY,
+ new OperationPriority(0, 0, 0).toBytes());
+ des1 = PriorityFunction.setPriority(pria, des1);
+ admin.modifyTable(Bytes.toBytes("Table_A"), des1);
+ admin.enableTable("Table_A");
+ } catch (Exception e) {
+ LOG.info(e);
+ }
+ try {
+ admin.disableTable("Table_B");
+ HTableDescriptor des1 = admin
+ .getTableDescriptor(Bytes.toBytes("Table_B"));
+ des1.setValue(PriorityFunction.OPERATION_PRIORITY_KEY,
+ new OperationPriority(0, 0, 0).toBytes());
+ des1 = PriorityFunction.setPriority(prib, des1);
+ admin.modifyTable(Bytes.toBytes("Table_B"), des1);
+ admin.enableTable("Table_B");
+ } catch (Exception e) {
+ LOG.info(e);
+ }
+ try {
+ admin.disableTable("Table_C");
+ HTableDescriptor des1 = admin
+ .getTableDescriptor(Bytes.toBytes("Table_C"));
+ des1.setValue(PriorityFunction.OPERATION_PRIORITY_KEY,
+ new OperationPriority(0, 0, 0).toBytes());
+ des1 = PriorityFunction.setPriority(pric, des1);
+ admin.modifyTable(Bytes.toBytes("Table_C"), des1);
+ admin.enableTable("Table_C");
+ } catch (Exception e) {
+ LOG.info(e);
+ }
+ GroupTestUtil.balanceTable("Table_A", TEST_UTIL.getMiniHBaseCluster().getMaster());
+ GroupTestUtil.balanceTable("Table_B",TEST_UTIL.getMiniHBaseCluster().getMaster());
+ GroupTestUtil.balanceTable("Table_C",TEST_UTIL.getMiniHBaseCluster().getMaster());
+ }
+ /**
+ * start the test.
+ */
+ @Test(timeout = 180000)
+ public void testForDifferentTablePriority() {
+ method="put";
+ doTestJob();
+// method="scan";
+// threadN=threadN*10;
+// changePriority(1,5,10);
+// doTestJob();
+ }
+
+
+ private void doTestJob()
+ {
+ if(method.equals("scan"))
+ {
+ rowNumber=rowNumber*10;
+ }
+ Worker A[] = new Worker[threadN / 3];
+ Worker B[] = new Worker[threadN / 3];
+ Worker C[] = new Worker[threadN / 3];
+ for (int i = 0; i < A.length; i++) {
+ try {
+ A[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(), "Table_A"),
+ method, rowNumber * 3 / threadN);
+ B[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(), "Table_B"),
+ method, rowNumber * 3 / threadN);
+ C[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(), "Table_C"),
+ method, rowNumber * 3 / threadN);
+ } catch (IOException e) {
+ }
+ }
+ for (int i = 0; i < A.length; i++) {
+ A[i].setTablePriority(1);
+ B[i].setTablePriority(5);
+ C[i].setTablePriority(10);
+ A[i].start();
+ B[i].start();
+ C[i].start();
+ }
+ verifyFunction(A, B, C);
+ changePriority(10,5,1);
+ A = new Worker[threadN / 3];
+ B = new Worker[threadN / 3];
+ C = new Worker[threadN / 3];
+ for (int i = 0; i < A.length; i++) {
+ try {
+ A[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(), "Table_A"),
+ method, rowNumber * 3 / threadN);
+ B[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(), "Table_B"),
+ method, rowNumber * 3 / threadN);
+ C[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(), "Table_C"),
+ method, rowNumber * 3 / threadN);
+ } catch (IOException e) {
+ }
+ }
+ for (int i = 0; i < A.length; i++) {
+ A[i].setTablePriority(10);
+ B[i].setTablePriority(5);
+ C[i].setTablePriority(1);
+ A[i].start();
+ B[i].start();
+ C[i].start();
+ }
+ verifyFunction(C, B, A);
+ }
+  public static void printTable(String name)
+  {
+
+    try {
+      HTable A = new HTable(TEST_UTIL.getConfiguration(), name);
+      Scan s = new Scan();
+      ResultScanner rs = A.getScanner(s);
+      Result r = null;
+      while ((r = rs.next()) != null) {
+        System.out.println(r);
+      }
+      rs.close(); // release the scanner lease on the region server
+      A.close();
+    } catch (IOException e) {
+      LOG.error(e);
+    }
+
+  }
+ public static void main(String args[]) {
+ try {
+ setUpBeforeClass();
+ TestTablePriority t = new TestTablePriority();
+ t.setUp();
+ tearDownAfterClass();
+ } catch (Exception e) {
+ LOG.info(e);
+ }
+ }
+
+ static byte[] b=new byte[100];
+ private static void put(HTable t, long i) {
+ Put p = new Put(
+ Bytes.toBytes(("" + r.nextInt(10)) + r.nextInt(100000)));
+ p.add(Bytes.toBytes("ff"), Bytes.toBytes("ff"), b);
+ try {
+ t.put(p);
+ } catch (IOException e) {
+ LOG.info(e);
+ }
+ }
+}
Index: src/test/java/org/apache/hadoop/hbase/ipc/GroupTestUtil.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/ipc/GroupTestUtil.java (revision 0)
+++ src/test/java/org/apache/hadoop/hbase/ipc/GroupTestUtil.java (revision 0)
@@ -0,0 +1,89 @@
+package org.apache.hadoop.hbase.ipc;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.util.Bytes;
+
+public class GroupTestUtil {
+ private final static Log LOG = LogFactory.getLog(GroupTestUtil.class);
+ static final Random r = new Random();
+ public static void balanceTable(String table,HMaster master) {
+ List servers=master.getServerManager().getOnlineServersList();
+ List regions = master.getAssignmentManager().getRegionsOfTable(Bytes.toBytes(table));
+ HashMap map = new HashMap();
+ for(HRegionInfo region:regions)
+ {
+ map.put(region, master.getAssignmentManager().getRegionServerOfRegion(region));
+ }
+ LOG.debug("Region location:"+map);
+ doBalance(new HashSet(servers), map,master);
+
+ }
+ private static void doBalance(HashSet servers,
+ HashMap map,HMaster master) {
+ HashMap moves = new HashMap();
+ HashMap> serverList = new HashMap>();
+ for (Entry e : map.entrySet()) {
+ if (serverList.get(e.getValue()) == null) {
+ serverList.put(e.getValue(), new ArrayList());
+ }
+ serverList.get(e.getValue()).add(e.getKey());
+ }
+ for (ServerName s : servers) {
+ if (serverList.get(s) == null) {
+ serverList.put(s, new ArrayList());
+ }
+ }
+
+ int div = 10;
+ while (div > 2) {
+ ServerName maxLoad = null, minLoad = null;
+ int maxLoadN = Integer.MIN_VALUE, minLoadN = Integer.MAX_VALUE;
+ for (Entry> e : serverList.entrySet()) {
+ if (e.getValue().size() >= maxLoadN) {
+ maxLoadN = e.getValue().size();
+ maxLoad = e.getKey();
+ }
+ if (e.getValue().size() <= minLoadN) {
+ minLoadN = e.getValue().size();
+ minLoad = e.getKey();
+ }
+ }
+ if (maxLoad == null || minLoad == null)
+ break;
+ if (serverList.get(maxLoad).size() == 0)
+ break;
+ else {
+ div = Math.abs(maxLoadN - minLoadN);
+ int index = r.nextInt(serverList.get(maxLoad).size());
+ moves.put(serverList.get(maxLoad).get(index), minLoad);
+ serverList.get(minLoad).add(serverList.get(maxLoad).get(index));
+ serverList.get(maxLoad).remove(index);
+ }
+
+ }
+
+ for (Entry e : moves.entrySet()) {
+ try {
+ LOG.info("move :" + e.getKey().getEncodedName() + " regions to "
+ + e.getValue());
+ master.move(e.getKey().getEncodedNameAsBytes(),
+ Bytes.toBytes(e.getValue().getServerName()));
+ } catch (Exception e1) {
+ // TODO Auto-generated catch block
+ e1.printStackTrace();
+ }
+ }
+ }
+
+}
Index: src/test/java/org/apache/hadoop/hbase/ipc/TestTablePriorityLargeRow.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/ipc/TestTablePriorityLargeRow.java (revision 0)
+++ src/test/java/org/apache/hadoop/hbase/ipc/TestTablePriorityLargeRow.java (revision 0)
@@ -0,0 +1,654 @@
+/**
+ * Copyright the Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.ipc.PriorityFunction;
+import org.apache.hadoop.hbase.ipc.PriorityJobQueue.OperationPriority;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test for table priority, use table A,B,C which have different priorities.
+ * There are two tests,in first one table A's priority is 1(high priority), B is
+ * 5, C is 10 and in the second test,we switch the priorities of A and C.
+ */
+@Category(LargeTests.class)
+public class TestTablePriorityLargeRow {
+ private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ private final static Log LOG = LogFactory
+ .getLog(TestTablePriorityLargeRow.class);
+ static final Random r = new Random();
+ static int rowNumber = 10000;
+ static final int threadN = 100;
+ HBaseAdmin admin = null;
+ // if there are data in the test tables,you
+ // can set method type to "scan"
+ static String method = "put";
+ static final int regionNumber = 9;
+ private static byte[][] startKeys;
+ private static final byte[] value = new byte[50000];
+ static {
+ startKeys = new byte[regionNumber][];
+ for (int i = 0; i < regionNumber; i++) {
+ startKeys[i] = Bytes.toBytes(String.format(
+ "%0" + (regionNumber + "").length() + "d", i));
+ }
+ }
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ TEST_UTIL.getConfiguration().setInt(
+ PriorityJobQueue.WAIT_UNIT_CONF_KEY, 1000);
+ TEST_UTIL.startMiniCluster(3);
+ }
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ /**
+ * set up a cluster and prepare tables.
+ */
+ @Before
+ public void setUp() {
+ try {
+ admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
+ } catch (Exception e) {
+ LOG.info(e);
+ }
+ HTableDescriptor des;
+ try {
+ if (admin.tableExists("Table_A")) {
+ admin.disableTable("Table_A");
+ admin.deleteTable("Table_A");
+ }
+ if (admin.tableExists("Table_B")) {
+ admin.disableTable("Table_B");
+ admin.deleteTable("Table_B");
+ }
+ if (admin.tableExists("Table_C")) {
+ admin.disableTable("Table_C");
+ admin.deleteTable("Table_C");
+ }
+ } catch (IOException e1) {
+ LOG.error(e1);
+ }
+ try {
+ des = new HTableDescriptor("Table_A");
+ des.addFamily(new HColumnDescriptor("ff"));
+ des = PriorityFunction.setPriority(1, des);
+ admin.createTable(des, startKeys);
+ } catch (Exception e) {
+ LOG.info(e);
+ }
+ try {
+ des = new HTableDescriptor("Table_B");
+ des.addFamily(new HColumnDescriptor("ff"));
+ des = PriorityFunction.setPriority(5, des);
+ admin.createTable(des, startKeys);
+ } catch (Exception e) {
+ LOG.info(e);
+ }
+ try {
+
+ des = new HTableDescriptor("Table_C");
+ des.addFamily(new HColumnDescriptor("ff"));
+ des = PriorityFunction.setPriority(10, des);
+ admin.createTable(des, startKeys);
+ } catch (Exception e) {
+ LOG.info(e);
+ }
+ balanceTable("Table_A", TEST_UTIL.getMiniHBaseCluster().getMaster());
+ balanceTable("Table_B", TEST_UTIL.getMiniHBaseCluster().getMaster());
+ balanceTable("Table_C", TEST_UTIL.getMiniHBaseCluster().getMaster());
+ }
+
+ /**
+ * The worker used for test throughput and gather the result
+ */
+ public static class Worker extends Thread {
+ HTable table;
+ String type;
+ private long throughput = 0;
+ long times = 1000;
+ boolean stopFlag = false;
+ int tablePriority;
+ long n = 0;
+ long startTime = 0;
+ int actionPriority = 0;
+
+ /**
+ * get throughput of this worker
+ *
+ * @return
+ */
+ public long getThroughput() {
+ long end = System.currentTimeMillis();
+ throughput = (n * 1000) / (end - startTime + 1);
+ return throughput;
+ }
+
+ /**
+ * get table priority
+ *
+ * @return table priority
+ */
+ public int getTablePriority() {
+ return tablePriority;
+ }
+
+ /**
+ * set table priority
+ */
+ public void setTablePriority(int tablePriority) {
+ this.tablePriority = tablePriority;
+ }
+
+ /**
+ * get action type
+ *
+ * @return action type of the worker
+ */
+ public String getType() {
+ return type;
+ }
+
+ /**
+ * the table used by the worker
+ *
+ * @return table of the worker
+ */
+ public HTable getTable() {
+ return table;
+ }
+
+ public Worker(HTable t, String type, long times) {
+ this.table = t;
+ this.type = type;
+ this.times = times;
+ }
+
+ public void run() {
+ try {
+ doAction(table, type, times);
+ } catch (IOException e) {
+ LOG.info(e);
+ }
+ }
+
+ public int getActionPriority() {
+ return actionPriority;
+ }
+
+ public void setActionPriority(int actionPriority) {
+ this.actionPriority = actionPriority;
+ }
+
+ /**
+ * stop the worker
+ */
+ public void stopWorker() {
+ this.stopFlag = true;
+ }
+
+ private void doAction(HTable t, String type, long times) throws IOException {
+ long start = System.currentTimeMillis();
+ long end = System.currentTimeMillis();
+ startTime = System.currentTimeMillis();
+ int pri = Integer.parseInt(Bytes.toString(t.getTableDescriptor()
+ .getValue(PriorityFunction.PRI_KEY)));
+ Scan s = new Scan();
+ s.setStartRow(Bytes.toBytes(r.nextInt(10) + ""));
+ if (type.equals("writeData")) {
+ t.setAutoFlush(false);
+ t.setWriteBufferSize(1000);
+ }
+ ResultScanner sn = null;
+ boolean scan = false;
+ if (type.equals("scan")) {
+ sn = t.getScanner(s);
+ scan = true;
+ }
+ while (true) {
+ if (n % 10000 == 0) {
+ end = System.currentTimeMillis();
+ // LOG.debug("Thread:" + this.getId() + " type:" + type + " table"
+ // + t.getTableDescriptor().getNameAsString() + " pri :" + pri
+ // + " time:" + (end - start) + " total :" + n + " throughput:"
+ // + (n * 1000) / (end - startTime + 1));
+ throughput = (n * 1000) / (end - startTime + 1);
+ start = end;
+ }
+ if (n % 100 == 0) {
+ end = System.currentTimeMillis();
+ throughput = (n * 1000) / (end - startTime + 1);
+ }
+ Result ret = null;
+ if (scan) {
+ ret = sn.next();
+ if (ret == null) {
+ try {
+ sn.close();
+ } catch (Exception e) {
+ LOG.debug(e);
+ }
+ sn = t.getScanner(s);
+ }
+ } else {
+ put(t, n);
+ }
+ if (stopFlag || n > times) {
+ break;
+ }
+ n++;
+ }
+ t.flushCommits();
+ t.close();
+ }
+ }
+
+ /**
+ * Verify whether the function take effect
+ *
+   * @param high
+   *          workers on the high-priority table
+   * @param middle workers on the middle-priority table
+   * @param low workers on the low-priority table
+ */
+ @SuppressWarnings("static-access")
+ public void verifyFunction(Worker high[], Worker middle[], Worker low[]) {
+ boolean highFinished = false;
+ boolean lowFinished = false;
+ boolean middleFinished = false;
+ long highThroughPut = 0;
+ long middleThroughPut = 0;
+ long lowThroughPut = 0;
+ while (!(highFinished && lowFinished && middleFinished)) {
+ try {
+ Thread.currentThread().sleep(100);
+ } catch (InterruptedException e) {
+ // ignore exceptions
+ }
+ highThroughPut = 0;
+ middleThroughPut = 0;
+ lowThroughPut = 0;
+ for (int i = 0; i < high.length; i++) {
+ highThroughPut += high[i].getThroughput();
+ lowThroughPut += low[i].getThroughput();
+ middleThroughPut += middle[i].getThroughput();
+ }
+ LOG.info("-------------------------------------------------------------");
+ try {
+ LOG.info("high priority table is: "
+ + high[0].getTable().getTableDescriptor().getNameAsString()
+ + " action type is:" + high[0].getType() + ", priority:"
+ + high[0].getTablePriority() + " throughput is:" + highThroughPut);
+ LOG.info("middle priority table is: "
+ + middle[0].getTable().getTableDescriptor().getNameAsString()
+ + " action type is:" + middle[0].getType() + ", priority:"
+ + middle[0].getTablePriority() + " throughput is:"
+ + middleThroughPut);
+ LOG.info("low priority table is: "
+ + low[0].getTable().getTableDescriptor().getNameAsString()
+ + " action type is:" + low[0].getType() + ", priority:"
+ + low[0].getTablePriority() + " throughput is:" + lowThroughPut);
+ } catch (Exception e) {
+ LOG.error(e);
+ }
+
+ highFinished = true;
+ lowFinished = true;
+ middleFinished = true;
+ for (int i = 0; i < high.length; i++) {
+ if (high[i].isAlive()) {
+ highFinished = false;
+ }
+ if (low[i].isAlive()) {
+ lowFinished = false;
+ }
+ if (middle[i].isAlive()) {
+ middleFinished = false;
+ }
+ }
+ if (highFinished) {
+ for (int i = 0; i < high.length; i++) {
+ low[i].stopWorker();
+ }
+ for (int i = 0; i < high.length; i++) {
+ middle[i].stopWorker();
+ }
+ middleFinished = true;
+ lowFinished = true;
+ }
+ if (lowFinished) {
+ for (int i = 0; i < high.length; i++) {
+ high[i].stopWorker();
+ }
+ for (int i = 0; i < high.length; i++) {
+ middle[i].stopWorker();
+ }
+ middleFinished = true;
+ highFinished = true;
+ }
+ if (middleFinished) {
+ for (int i = 0; i < high.length; i++) {
+ high[i].stopWorker();
+ }
+ for (int i = 0; i < high.length; i++) {
+ low[i].stopWorker();
+ }
+ lowFinished = true;
+ highFinished = true;
+ }
+
+ }
+ highThroughPut = 0;
+ middleThroughPut = 0;
+ lowThroughPut = 0;
+ for (int i = 0; i < high.length; i++) {
+ highThroughPut += high[i].getThroughput();
+ lowThroughPut += low[i].getThroughput();
+ middleThroughPut += middle[i].getThroughput();
+ }
+
+ assertTrue("Action priority works properly",
+ highThroughPut > middleThroughPut && middleThroughPut > lowThroughPut);
+ LOG.info("-------------------------------------------------------------");
+ LOG.info("---------------------Test finished --------------------------");
+
+ try {
+ LOG.info("high priority table is: "
+ + high[0].getTable().getTableDescriptor().getNameAsString()
+ + " action type is:" + high[0].getType() + ", priority:"
+ + high[0].getTablePriority() + " throughput is:" + highThroughPut);
+ LOG.info("middle priority table is: "
+ + middle[0].getTable().getTableDescriptor().getNameAsString()
+ + " action type is:" + middle[0].getType() + ", priority:"
+ + middle[0].getTablePriority() + " throughput is:" + middleThroughPut);
+ LOG.info("low priority table is: "
+ + low[0].getTable().getTableDescriptor().getNameAsString()
+ + " action type is:" + low[0].getType() + ", priority:"
+ + low[0].getTablePriority() + " throughput is:" + lowThroughPut);
+ } catch (Exception e) {
+ LOG.error(e);
+ }
+ LOG.info("####### Test for " + high[0].getType() + ", priority:"
+ + high[0].getTablePriority() + " " + middle[0].getType()
+ + ", priority:" + middle[0].getTablePriority() + " ,and "
+ + low[0].getType() + ", priority:" + low[0].getTablePriority()
+ + " finished####");
+
+ LOG.info("-------------------------------------------------------------");
+ }
+
+ private void changePriority(int pria, int prib, int pric) {
+ try {
+ admin.disableTable("Table_A");
+ HTableDescriptor des1 = admin
+ .getTableDescriptor(Bytes.toBytes("Table_A"));
+ des1.setValue(PriorityFunction.OPERATION_PRIORITY_KEY,
+ new OperationPriority(0, 0, 0).toBytes());
+ des1 = PriorityFunction.setPriority(pria, des1);
+ admin.modifyTable(Bytes.toBytes("Table_A"), des1);
+ admin.enableTable("Table_A");
+ } catch (Exception e) {
+ LOG.info(e);
+ }
+ try {
+ admin.disableTable("Table_B");
+ HTableDescriptor des1 = admin
+ .getTableDescriptor(Bytes.toBytes("Table_B"));
+ des1.setValue(PriorityFunction.OPERATION_PRIORITY_KEY,
+ new OperationPriority(0, 0, 0).toBytes());
+ des1 = PriorityFunction.setPriority(prib, des1);
+ admin.modifyTable(Bytes.toBytes("Table_B"), des1);
+ admin.enableTable("Table_B");
+ } catch (Exception e) {
+ LOG.info(e);
+ }
+ try {
+ admin.disableTable("Table_C");
+ HTableDescriptor des1 = admin
+ .getTableDescriptor(Bytes.toBytes("Table_C"));
+ des1.setValue(PriorityFunction.OPERATION_PRIORITY_KEY,
+ new OperationPriority(0, 0, 0).toBytes());
+ des1 = PriorityFunction.setPriority(pric, des1);
+ admin.modifyTable(Bytes.toBytes("Table_C"), des1);
+ admin.enableTable("Table_C");
+ } catch (Exception e) {
+ LOG.info(e);
+ }
+ balanceTable("Table_A", TEST_UTIL.getMiniHBaseCluster().getMaster());
+ balanceTable("Table_B", TEST_UTIL.getMiniHBaseCluster().getMaster());
+ balanceTable("Table_C", TEST_UTIL.getMiniHBaseCluster().getMaster());
+ }
+
+ /**
+ * start the test.
+ */
+ @Test(timeout = 180000)
+ public void testForDifferentTablePriority() {
+ method = "put";
+ doTestJob();
+// method = "scan";
+// changePriority(1, 5, 10);
+// doTestJob();
+ }
+
+ private void doTestJob() {
+ if (method.equals("scan")) {
+ rowNumber = rowNumber * 10;
+ }
+ Worker A[] = new Worker[threadN / 3];
+ Worker B[] = new Worker[threadN / 3];
+ Worker C[] = new Worker[threadN / 3];
+ for (int i = 0; i < A.length; i++) {
+ try {
+ A[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(), "Table_A"),
+ method, rowNumber * 3 / threadN);
+ B[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(), "Table_B"),
+ method, rowNumber * 3 / threadN);
+ C[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(), "Table_C"),
+ method, rowNumber * 3 / threadN);
+ } catch (IOException e) {
+ }
+ }
+ for (int i = 0; i < A.length; i++) {
+ A[i].setTablePriority(1);
+ B[i].setTablePriority(5);
+ C[i].setTablePriority(10);
+ A[i].start();
+ B[i].start();
+ C[i].start();
+ }
+ verifyFunction(A, B, C);
+ changePriority(10, 5, 1);
+ A = new Worker[threadN / 3];
+ B = new Worker[threadN / 3];
+ C = new Worker[threadN / 3];
+ for (int i = 0; i < A.length; i++) {
+ try {
+ A[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(), "Table_A"),
+ method, rowNumber * 3 / threadN);
+ B[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(), "Table_B"),
+ method, rowNumber * 3 / threadN);
+ C[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(), "Table_C"),
+ method, rowNumber * 3 / threadN);
+ } catch (IOException e) {
+ }
+ }
+ for (int i = 0; i < A.length; i++) {
+ A[i].setTablePriority(10);
+ B[i].setTablePriority(5);
+ C[i].setTablePriority(1);
+ A[i].start();
+ B[i].start();
+ C[i].start();
+ }
+ verifyFunction(C, B, A);
+ }
+
+  public static void printTable(String name) {
+    try {
+      HTable A = new HTable(TEST_UTIL.getConfiguration(), name);
+      Scan s = new Scan();
+      ResultScanner rs = A.getScanner(s);
+      Result r = null;
+      while ((r = rs.next()) != null) {
+        System.out.println(r);
+      }
+      rs.close(); // release the scanner lease on the region server
+      A.close();
+    } catch (IOException e) {
+      LOG.error(e);
+    }
+
+  }
+
+ public static void main(String args[]) {
+ try {
+ setUpBeforeClass();
+ TestTablePriorityLargeRow t = new TestTablePriorityLargeRow();
+ t.setUp();
+ tearDownAfterClass();
+ } catch (Exception e) {
+ LOG.info(e);
+ }
+ }
+
+ private static void put(HTable t, long i) {
+ Put p = new Put(
+ Bytes.toBytes("" + r.nextInt(1000) + (i) + r.nextInt(10000)));
+ p.add(Bytes.toBytes("ff"), Bytes.toBytes("ff"), value);
+ try {
+ t.put(p);
+ } catch (IOException e) {
+ LOG.info(e);
+ }
+ }
+
+  public static void balanceTable(String table, HMaster master) {
+    List<ServerName> servers = master.getServerManager().getOnlineServersList();
+    List<HRegionInfo> regions = master.getAssignmentManager()
+        .getRegionsOfTable(Bytes.toBytes(table));
+    HashMap<HRegionInfo, ServerName> map = new HashMap<HRegionInfo, ServerName>(); // region -> current server
+    for (HRegionInfo region : regions) {
+      map.put(region,
+          master.getAssignmentManager().getRegionServerOfRegion(region));
+
+    }
+    LOG.debug("Region location:" + map);
+    doBalance(new HashSet<ServerName>(servers), map, master);
+
+  }
+
+  private static void doBalance(HashSet<ServerName> servers,
+      HashMap<HRegionInfo, ServerName> map, HMaster master) {
+    HashMap<HRegionInfo, ServerName> moves = new HashMap<HRegionInfo, ServerName>(); // region -> destination
+    HashMap<ServerName, List<HRegionInfo>> serverList = new HashMap<ServerName, List<HRegionInfo>>(); // server -> hosted regions
+    for (Entry<HRegionInfo, ServerName> e : map.entrySet()) {
+      if (serverList.get(e.getValue()) == null) {
+        serverList.put(e.getValue(), new ArrayList<HRegionInfo>());
+      }
+      serverList.get(e.getValue()).add(e.getKey());
+    }
+    for (ServerName s : servers) { // servers with no region still take part in balancing
+      if (serverList.get(s) == null) {
+        serverList.put(s, new ArrayList<HRegionInfo>());
+      }
+    }
+
+    int div = 10;
+    while (div > 2) { // loop until max and min load differ by at most 2
+      ServerName maxLoad = null, minLoad = null;
+      int maxLoadN = Integer.MIN_VALUE, minLoadN = Integer.MAX_VALUE;
+      for (Entry<ServerName, List<HRegionInfo>> e : serverList.entrySet()) {
+        if (e.getValue().size() >= maxLoadN) {
+          maxLoadN = e.getValue().size();
+          maxLoad = e.getKey();
+        }
+        if (e.getValue().size() <= minLoadN) {
+          minLoadN = e.getValue().size();
+          minLoad = e.getKey();
+        }
+      }
+      if (maxLoad == null || minLoad == null)
+        break;
+      if (serverList.get(maxLoad).size() == 0)
+        break;
+      else {
+        div = Math.abs(maxLoadN - minLoadN);
+        int index = r.nextInt(serverList.get(maxLoad).size()); // pick a random region off the busiest server
+        moves.put(serverList.get(maxLoad).get(index), minLoad);
+        serverList.get(minLoad).add(serverList.get(maxLoad).get(index));
+        serverList.get(maxLoad).remove(index);
+      }
+
+    }
+
+    for (Entry<HRegionInfo, ServerName> e : moves.entrySet()) {
+      try {
+        LOG.info("move :" + e.getKey().getEncodedName() + " regions to "
+            + e.getValue());
+        master.move(e.getKey().getEncodedNameAsBytes(),
+            Bytes.toBytes(e.getValue().getServerName()));
+      } catch (Exception e1) {
+        // best-effort: log and keep moving the remaining regions
+        LOG.error(e1);
+      }
+    }
+  }
+}
Index: src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (revision 1220359)
+++ src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (working copy)
@@ -244,7 +244,7 @@
}
/** A call queued for handling. */
- protected class Call implements Delayable {
+ class Call implements Delayable {
protected int id; // the client's call id
protected Writable param; // the parameter passed
protected Connection connection; // connection to client
Index: src/main/java/org/apache/hadoop/hbase/ipc/PriorityHBaseServer.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/ipc/PriorityHBaseServer.java (revision 0)
+++ src/main/java/org/apache/hadoop/hbase/ipc/PriorityHBaseServer.java (revision 0)
@@ -0,0 +1,79 @@
+/**
+ * Copyright the Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.HbaseObjectWritable;
+import org.apache.hadoop.hbase.ipc.Invocation;
+import org.apache.hadoop.hbase.ipc.WritableRpcEngine.Server;
+import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * An IPC server extends HBaseServer,and add schedule function to make table priority take
+ * effect.
+ */
+public class PriorityHBaseServer extends Server {
+
+ /**
+ * Construct an RPC server.
+ *
+ * @param instance the instance whose methods will be called
+ * @param ifaces Define metrics for all methods in the given classes
+ * @param conf the configuration to use
+ * @param bindAddress the address to bind on to listen for connection
+ * @param port the port to listen for connections on
+ * @param numHandlers the number of method handler threads to run
+ * @param priorityHandlerCount used by {@link HBaseServer}
+ * @param verbose whether each call should be logged
+ * @param highPriorityLevel used by {@link HBaseServer},decides actions of meta info will be
+ * handled by metaHandlers
+ * @throws IOException e
+ */
+ public PriorityHBaseServer(Object instance, final Class<?>[] ifaces,
+ Configuration conf, String bindAddress, int port, int numHandlers,
+ int priorityHandlerCount, boolean verbose, int highPriorityLevel)
+ throws IOException {
+ super(instance, ifaces, conf, bindAddress, port, numHandlers,
+ priorityHandlerCount, verbose, highPriorityLevel);
+ int maxQueueSize = this.conf.getInt("ipc.server.max.queue.size", 500);
+ callQueue = new PriorityJobQueue(maxQueueSize,
+ new PriorityFunction((HRegionServer) instance, conf), conf);
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public Writable call(Class<? extends VersionedProtocol> protocol,
+ Writable param, long receivedTime, MonitoredRPCHandler status)
+ throws IOException {
+ Invocation call = (Invocation) param;
+ HbaseObjectWritable writable = (HbaseObjectWritable) super.call(protocol,
+ param, receivedTime, status);
+ if (call.getMethodName().endsWith("openScanner")) {
+ ((PriorityFunction) ((PriorityJobQueue) callQueue).getQosFunction())
+ .initScannerPriority(call, writable.get());
+ }
+ return writable;
+ }
+
+}
Index: src/main/java/org/apache/hadoop/hbase/ipc/QosRegionObserver.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/ipc/QosRegionObserver.java (revision 0)
+++ src/main/java/org/apache/hadoop/hbase/ipc/QosRegionObserver.java (revision 0)
@@ -0,0 +1,64 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+
+public class QosRegionObserver extends BaseRegionObserver {
+ HashMap<RegionScanner, Scan> regionScannerToScan = new HashMap<RegionScanner, Scan>();
+
+ @Override
+ public void postOpen(ObserverContext<RegionCoprocessorEnvironment> e) {
+ HRegion region = e.getEnvironment().getRegion();
+ PriorityFunction.initRegionPriority(region);
+ }
+
+ @Override
+ public void postClose(ObserverContext<RegionCoprocessorEnvironment> e,
+ boolean abortRequested) {
+ HRegion region = e.getEnvironment().getRegion();
+ PriorityFunction.cleanRegionPriority(region);
+ }
+
+ @Override
+ public RegionScanner postScannerOpen(
+ final ObserverContext<RegionCoprocessorEnvironment> e, final Scan scan,
+ final RegionScanner s) {
+ regionScannerToScan.put(s, scan);
+ return s;
+ }
+
+ @Override
+ public void postScannerClose(
+ final ObserverContext<RegionCoprocessorEnvironment> e,
+ final InternalScanner s) throws IOException {
+ PriorityFunction.removeScanner(regionScannerToScan.get((RegionScanner) s));
+ regionScannerToScan.remove((RegionScanner) s);
+ }
+}
Index: src/main/java/org/apache/hadoop/hbase/ipc/PriorityJobQueue.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/ipc/PriorityJobQueue.java (revision 0)
+++ src/main/java/org/apache/hadoop/hbase/ipc/PriorityJobQueue.java (revision 0)
@@ -0,0 +1,437 @@
+/**
+ * Copyright the Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.util.PriorityQueue;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Operation;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Writable;
+import com.google.common.base.Function;
+
+/**
+ * This queue is used by {@link PriorityHBaseServer}. Compare to
+ * PriorityBlockingQueue this queue has accurate capacity management. The Job
+ * class of this queue can adjust the priority according to the waiting times of
+ * the Job. Please use take and put to manipulate this queue.
+ *
+ * @param <T>
+ * the class contained by the queue
+ */
+public class PriorityJobQueue<T> extends PriorityBlockingQueue<T> {
+ private static final long serialVersionUID = 1L;
+ public static final String WAIT_UNIT_CONF_KEY="hbase.tablepriority.wait.unit";
+ private static final Log LOG = LogFactory.getLog(PriorityJobQueue.class);
+ private final PriorityQueue<Job<T>> q = new PriorityQueue<Job<T>>();
+ private int capacity = 500;
+ ReentrantLock lock = new ReentrantLock(true);
+ Condition notEmpty = lock.newCondition() ;
+ Condition full = lock.newCondition();
+ //waitTimeUnit is 10 means:
+ //after waiting for 10ms,the call's priority will be increased by 1;
+ private static int waitUnit= 10;
+ private long[] priorityAddTimes = new long[10];
+ private Function<Writable, Integer> qosFunction;
+ private AtomicInteger outputIndicator = new AtomicInteger ();
+ private static long getTimes=0;
+ /**
+ * get static data of add times
+ * @return the add times arranged by priority
+ */
+ public long[] getPriorityAddTimes() {
+ return priorityAddTimes;
+ }
+
+ /**
+ * @param capacity
+ * the capacity of the queue,not a precision value,if queue size
+ * exceeds this value, workers which add jobs should wait,but if a
+ * thread waits too many turns (more than maxWait),it will add the
+ * job anyhow.
+ * @param qosFunction the function used to decide the call's priority.
+ * @param conf configuration.
+ */
+ public PriorityJobQueue(int capacity,
+ Function<Writable, Integer> qosFunction, final Configuration conf) {
+ this.qosFunction = qosFunction;
+ waitUnit = conf.getInt(WAIT_UNIT_CONF_KEY, 10);
+ this.capacity = capacity;
+ }
+
+ /**
+ * Add a job to this queue.
+ * @param call the job instance
+ * @param pri the job's priority
+ * @throws InterruptedException
+ */
+ public void put(T call, int pri) throws InterruptedException {
+ final ReentrantLock lock = this.lock;
+ lock.lockInterruptibly();
+ Job<T> j=new Job<T>(pri, call);
+ try {
+ try {
+ while (q.size() >= this.capacity) {
+ this.full.await();
+ }
+ } catch (InterruptedException e) {
+ this.full.signal();// propagate to non-interrupted thread
+ throw e;
+ }
+ this.q.add(j);
+ this.notEmpty.signal();
+ this.priorityAddTimes[getStaticsIndex(j)]++;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Used to increase the according flag in priorityAddTimes. The
+ * priorityAddTimes is for debug purpose.
+ *
+ * @param j
+ * @return
+ */
+ private int getStaticsIndex(Job<T> j) {
+ return j.orgPri < 10 && j.orgPri > 0 ? j.orgPri - 1 : (j.orgPri <= 0 ? 0
+ : 9);
+ }
+
+ /**
+ * Get a job from the queue.
+ *
+ * @param priority
+ * the worker thread's priority
+ * @return the job
+ * @throws InterruptedException
+ */
+ private Job<T> innerTake() throws InterruptedException {
+ Job<T> ret = null;
+ final ReentrantLock lock = this.lock;
+ lock.lockInterruptibly();
+ try {
+ while (q.size() == 0) {
+ try {
+ this.notEmpty.await();
+ } catch (InterruptedException ie) {
+ // propagate to non-interrupted thread
+ this.notEmpty.notify();
+ throw ie;
+ }
+ }
+ ret = q.poll();
+ assert ret != null;
+ this.full.signal();
+ getTimes++;
+ return ret;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void put(Object e) {
+ HBaseServer.Call call = (HBaseServer.Call) (e);
+ int pri = this.qosFunction.apply(call.param);
+ try {
+ this.put((T) call, pri);
+ } catch (InterruptedException e1) {
+ LOG.error(e1);
+ }
+ }
+
+ @Override
+ public T take() throws InterruptedException {
+ if (LOG.isDebugEnabled()) {
+ if ((outputIndicator.addAndGet(1) << 52) >>> 52 == 0) {
+ this.printMetrix();
+ }
+ }
+ return this.innerTake().getCall();
+ }
+
+ public Function<Writable, Integer> getQosFunction() {
+ return qosFunction;
+ }
+
+ /**
+ * Get the size of the queue.
+ * @return the size of the queue
+ */
+ @Override
+ public int size() {
+ final ReentrantLock lock = this.lock;
+ lock.lock();
+ try {
+ return q.size();
+ } finally {
+ lock.unlock();
+ }
+ }
+ /**
+ * Print statistical information.
+ */
+ public void printMetrix() {
+ LOG.debug("size is :" + q.size());
+ LOG.debug("capacity is :" + this.capacity);
+ String out = "Add times ";
+ for (int i = 0; i < this.priorityAddTimes.length; i++) {
+ out += " pri:" + (i + 1) + ":" + this.priorityAddTimes[i];
+ this.priorityAddTimes[i]=0;
+ }
+ LOG.debug("priority request static:" + out);
+ PriorityFunction.printRegionPriority();
+ lock.lock();
+ try {
+
+ LOG.debug(""+this.q);
+ } catch (Exception e) {
+ LOG.error(e);
+ }
+ finally
+ {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * The Job stored in queue
+ * @param <T> the class of the wrapped call
+ */
+ public static class Job<T> implements Comparable<Job<T>> {
+ private int orgPri = 0;
+ private long initTime = 0;
+ private T call;
+ private long initIndex=0;
+
+// /**
+// * Get the priority now,the priority is calculated this way:
+// * priority_now =
+// * original priority - (time_now -time_constructed)/time_unit
+// * @return the priority of this call
+// */
+// public int getPriority() {
+// //In case of system time changed.
+// if(System.currentTimeMillis() < initTime) {
+// //in case the system clock moved backwards, keep the original priority
+// return orgPri;
+// }
+// return orgPri - (int) ((System.currentTimeMillis() - initTime) / waitUnit);
+// }
+
+ // NOTE(review): the members below were destroyed by markup-stripping of the
+ // "<...>" generic syntax in this patch; reconstructed from the call sites
+ // (take() uses getCall(), compareTo() uses getPriority()) -- confirm against
+ // the upstream patch before applying.
+ public Job(int pri, T call) {
+ this.orgPri = pri;
+ this.call = call;
+ this.initTime = System.currentTimeMillis();
+ }
+
+ public int getPriority() {
+ return orgPri;
+ }
+
+ public T getCall() {
+ return call;
+ }
+
+ @Override
+ public int compareTo(Job<T> arg0) {
+ return this.getPriority() - arg0.getPriority();
+ }
+ }
+
+
+ /**
+ * class used to store the action priority of table;
+ */
+ public static class OperationPriority {
+ private int mutationPriority = 0;
+ private int getPriority = 0;
+ private int scanPriority = 0;
+ public OperationPriority() {
+ }
+
+ /**
+ * Get the priority of this operation
+ * @param operation
+ * @return
+ */
+ public int getPriority(Operation operation) {
+ if(operation instanceof Get)
+ {
+ return this.getGetPriority();
+ }
+ else if(operation instanceof Mutation)
+ {
+ return this.getMutationPriority();
+ }
+ else if(operation instanceof Scan)
+ {
+ return this.getScanPriority();
+ }
+ return 0;
+ }
+
+ /**
+ * Construct a OperationPriority class.
+ * @param scanPriority priority of scan
+ * @param mutationPriority priority of mutation
+ * @param getPriority priority of get
+ */
+ public OperationPriority(int scanPriority, int mutationPriority,
+ int getPriority) {
+ this.scanPriority = scanPriority;
+ this.mutationPriority = mutationPriority;
+ this.getPriority = getPriority;
+ }
+
+ /**
+ * get put priority
+ * @return
+ */
+ public int getMutationPriority() {
+ return mutationPriority;
+ }
+
+ /**
+ * set put priority
+ *
+ * @return
+ */
+ public void setMutationPriority(int priority) {
+ this.mutationPriority = priority;
+ }
+
+ /**
+ * Get Get's priority
+ *
+ * @return
+ */
+ public int getGetPriority() {
+ return getPriority;
+ }
+
+ /**
+ * Set Get's priority
+ *
+ * @return
+ */
+ public void setGetPriority(int priority) {
+ this.getPriority = priority;
+ }
+
+ /**
+ * Get Scan's priority
+ *
+ * @return
+ */
+ public int getScanPriority() {
+ return scanPriority;
+ }
+
+ /**
+ * Set Scan's priority
+ *
+ * @return
+ */
+ public void setScanPriority(int priority) {
+ this.scanPriority = priority;
+ }
+
+ public String toString() {
+ return "scan priority:" + this.scanPriority + ", mutation priority:"
+ + this.mutationPriority + ", get priority:" + this.getPriority;
+ }
+
+ /**
+ * Store this instance into byte array.
+ * @return
+ */
+ public byte[] toBytes() {
+ byte[] ret = new byte[Bytes.SIZEOF_INT * 3];
+ Bytes.putInt(ret, 0, this.scanPriority);
+ Bytes.putInt(ret, Bytes.SIZEOF_INT, this.mutationPriority);
+ Bytes.putInt(ret, Bytes.SIZEOF_INT * 2, this.getPriority);
+ return ret;
+ }
+
+ /**
+ * Get value from a byte array.
+ * @param b the serialized value of an OperationPriority instance
+ */
+ public OperationPriority fromBytes(byte[] b) {
+ if (b.length != Bytes.SIZEOF_INT * 3)
+ return null;
+ else {
+ this.scanPriority = Bytes.toInt(b, 0);
+ this.mutationPriority = Bytes.toInt(b, Bytes.SIZEOF_INT);
+ this.getPriority = Bytes.toInt(b, Bytes.SIZEOF_INT * 2);
+ }
+ return this;
+ }
+ }
+}
Index: src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java (revision 1220359)
+++ src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java (working copy)
@@ -387,6 +387,11 @@
final int numHandlers,
int metaHandlerCount, final boolean verbose, Configuration conf, int highPriorityLevel)
throws IOException {
+ if (instance instanceof HRegionInterface
+ && PriorityFunction.enable(conf)) {
+ return new PriorityHBaseServer(instance, ifaces, conf, bindAddress, port,
+ numHandlers, metaHandlerCount, verbose, highPriorityLevel);
+ }
return getServer(instance.getClass(), instance, ifaces, bindAddress, port, numHandlers, metaHandlerCount, verbose, conf, highPriorityLevel);
}
Index: src/main/java/org/apache/hadoop/hbase/ipc/PriorityFunction.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/ipc/PriorityFunction.java (revision 0)
+++ src/main/java/org/apache/hadoop/hbase/ipc/PriorityFunction.java (revision 0)
@@ -0,0 +1,418 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.ipc;
+
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.client.Action;
+import org.apache.hadoop.hbase.client.MultiAction;
+import org.apache.hadoop.hbase.client.Operation;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.ipc.PriorityJobQueue.OperationPriority;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Writable;
+
+import com.google.common.base.Function;
+
+/**
+ * This Function is used by {@link PriorityHBaseServer}. In this
+ * Function,generally we can get the priority of a RPC call base by it's table
+ * priority and the table's method priority.
+ */
+
+public class PriorityFunction implements Function<Writable, Integer> {
+
+ public final static byte[] PRI_KEY = Bytes.toBytes("_table_priority");
+ public final static byte[] OPERATION_PRIORITY_KEY = Bytes
+ .toBytes("_operation_priority");
+ public static final int LOWEST_PRI = 10;
+ public static final int DEFAULT_PRI = 5;
+ public static final int HIGHEST_PRI = -10;
+ public static final int HIGH_PRI = 0;
+ public static final String ENABLE_KEY = "hbase.tablepriority.enable";
+ private static Boolean enable = null;
+
+ /**
+ * Return whether table priority is enabled.
+ *
+ * @return
+ */
+ public static boolean enable(Configuration conf) {
+ if(enable==null)
+ {
+ enable=conf.getBoolean(ENABLE_KEY, true);
+ }
+ return enable;
+ }
+
+ /**
+ * The priority map cache the region's priority in memory. Key is the region
+ * name which usually is obtained from the IPC call. Value is the priority of
+ * the region also is the priority of the table which the region belong to.
+ */
+ private static final ConcurrentHashMap<String, Integer> regionPriMap = new ConcurrentHashMap<String, Integer>();
+
+ // Cache the Scan and its id,when scanner closed we can use the Scan to delete all cached information.
+ private static final ConcurrentHashMap<Scan, Long> scanToId = new ConcurrentHashMap<Scan, Long>();
+
+ // Cache the scanner ID and the priority of the scanner.
+ private static final ConcurrentHashMap<Long, Integer> scannerPriMap = new ConcurrentHashMap<Long, Integer>();
+
+ // Store the region name and it's according priority instance.
+ private static final ConcurrentHashMap<String, OperationPriority> regionActionPriorities = new ConcurrentHashMap<String, OperationPriority>();
+
+ private static final Log LOG = LogFactory
+ .getLog("org.apache.hadoop.hbase.ipc.PriorityFunction");
+ private static HRegionServer server;
+
+ public PriorityFunction(HRegionServer server, final Configuration conf)
+ throws MasterNotRunningException, ZooKeeperConnectionException {
+ if (PriorityFunction.server == null) {
+ PriorityFunction.server = server;
+ }
+ /**
+ * Enable the QosRegionObserver here.
+ */
+ if (enable) {
+ String cops = conf.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY);
+ if (cops == null || cops.equals("")) {
+ cops = "org.apache.hadoop.hbase.ipc.QosRegionObserver";
+ } else {
+ cops += ",org.apache.hadoop.hbase.ipc.QosRegionObserver";
+ }
+ conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, cops);
+ }
+ }
+
+ /**
+ * Set table priority.
+ * @param priority the priority you want to set
+ * @param des the table descriptor
+ * @return return the modified descriptor
+ * @throws Exception
+ */
+ public static HTableDescriptor setPriority(String priority,
+ HTableDescriptor des) throws Exception {
+ Integer pri = Integer.parseInt(priority);
+ if (pri < 1 || pri > 10)
+ throw new NumberFormatException(
+ "Table priority should be [1 to 10].");
+ des.setValue(PRI_KEY, Bytes.toBytes(priority));
+ return des;
+ }
+ /**
+ * Set table priority.
+ * @param priority the priority you want to set
+ * @param des table descriptor
+ * @param op AperationPriority instance
+ * @return the prepared table descriptor
+ * @throws Exception
+ */
+ public static HTableDescriptor setPriority(String priority,
+ HTableDescriptor des, OperationPriority op) throws Exception {
+ Integer pri = Integer.parseInt(priority);
+ if (pri < 1 || pri > 10)
+ throw new NumberFormatException(
+ "Table priority should be between [1 to 10].");
+ setTableActionPriorities(des, op);
+ des.setValue(PRI_KEY, Bytes.toBytes(priority));
+ return des;
+ }
+ /**
+ * Set table priority.
+ * @param priority the priority you want to set
+ * @param des the table descriptor
+ * @param op OperationPriority instance
+ * @return
+ * @throws Exception
+ */
+ public static HTableDescriptor setPriority(int priority,
+ HTableDescriptor des, OperationPriority op) throws Exception {
+ if (priority < 1 || priority > 10) {
+ throw new NumberFormatException(
+ "Table priority should be between [1 to 10].");
+ }
+ setTableActionPriorities(des, op);
+ des.setValue(PRI_KEY, Bytes.toBytes(priority));
+ return des;
+ }
+
+ /**
+ * Set table priority.
+ * @param priority the priority you want to set
+ * @param des the table descriptor
+ * @return
+ * @throws Exception
+ */
+ public static HTableDescriptor setPriority(int priority, HTableDescriptor des)
+ throws Exception {
+ if (priority < 1 || priority > 10) {
+ throw new NumberFormatException(
+ "Table priority should be between [1 to 10].");
+ }
+ des.setValue(PRI_KEY, Bytes.toBytes(priority + ""));
+ return des;
+ }
+
+ /**
+ * Get action priorities from table descriptor.
+ *
+ * @param des the table descriptor
+ * @return
+ */
+ public static OperationPriority getActionPriorities(HTableDescriptor des) {
+ byte[] op = des.getValue(OPERATION_PRIORITY_KEY);
+ OperationPriority ret = new OperationPriority();
+ if (op != null) {
+ try {
+ ret.fromBytes(op);
+ } catch (Exception e) {
+ LOG.error("This table has wrong action priorities,"
+ + "please use PriorityFunction.setPriority(int priority,"
+ + "HTableDescriptor des, OperationPriority op)"
+ + " to set table's action priority");
+ }
+ }
+ return ret;
+ }
+ /**
+ * Set action priorities for table descriptor.
+ * @param des the table descriptor
+ * @param op OperationPriority instance
+ */
+ public static void setTableActionPriorities(HTableDescriptor des,
+ OperationPriority op) {
+ des.setValue(PriorityFunction.OPERATION_PRIORITY_KEY,
+ op.toBytes());
+ }
+ /**
+ * Initialize the scanner's priority.
+ * @param call
+ * @param value scanner id
+ */
+ public void initScannerPriority(Invocation call, Object value) {
+ Long id = (Long) value;
+ byte[] region = (byte[]) call.getParameters()[0];
+ String regionN = Bytes.toString(region);
+ Integer pri = regionPriMap.get(regionN);
+ Scan scan=(Scan) call.getParameters()[1];
+ if (pri == null) {
+ pri=initRegionPriority(regionN, false);
+ }
+ OperationPriority action = regionActionPriorities.get(regionN);
+ if (action != null)
+ {
+ pri += action.getScanPriority();
+ }
+ scanToId.put(scan, id);
+ scannerPriMap.put(id, pri);
+ }
+ @Override
+ public Integer apply(Writable from) {
+ return getCallPriority(from);
+ }
+
+ /**
+ * Remove the cached region priorities.
+ * @param region
+ */
+ public static void cleanRegionPriority(HRegion region) {
+ String regionN = region.getRegionNameAsString();
+ if(regionN!=null)
+ {
+ regionPriMap.remove(regionN);
+ regionActionPriorities.remove(regionN);
+ }
+ }
+ /**
+ * Remove closed scanners.
+ * @param s the scanner id to be removed
+ */
+ public static void removeScanner(Scan s) {
+ Long id = null;
+ try {
+ id=scanToId.get(s);
+ scanToId.remove(s);
+ scannerPriMap.remove(id);
+ } catch (Exception e) {
+ LOG.error("scanner id:" + id +" scan:"+s+ " doesn't exist.");
+ }
+ }
+
+ /**
+ * Get the priority of the call
+ *
+ * @param call
+ * @return the priority
+ */
+ private int getCallPriority(Writable param) {
+ Invocation invo = (Invocation) param;
+ // if this is a next() call, we should find the priority of the scanner
+ // which
+ // is initiated by initScannerPriority(Invocation call, Object value)
+ if (invo.getMethodName().endsWith("next")) {
+ Long scanId = (Long) invo.getParameters()[0];
+ Integer pri = scannerPriMap.get(scanId);
+ if (pri == null) {
+ LOG.error("the scanner didn't get it's priority:" + scanId);
+ pri = DEFAULT_PRI;
+ }
+ return pri;
+ } else if (invo.getMethodName().endsWith("multi")) {
+ // if it's a multi call,we use the first action of the in
+ // the MutiAction map to decide the priority of this call.
+ MultiAction<?> multi = (MultiAction<?>) invo.getParameters()[0];
+ for (Entry<byte[], ? extends List<?>> e : multi.actions.entrySet()) {
+ String regionN = Bytes.toString(e.getKey());
+ Integer pri = getRegionPri(regionN);
+ OperationPriority op;
+ if ((op = regionActionPriorities.get(regionN)) != null) {
+ Action<?> action = ((List<Action<?>>) e.getValue()).get(0);
+ if (action != null) {
+ if (action.getAction() instanceof Operation) {
+ pri += op.getPriority((Operation) action.getAction());
+ }
+ // Actually there a Exec class doesn't extends Operation class.
+ // We don't how to classify this operation yet.
+ }
+ }
+ return pri;
+ }
+ return DEFAULT_PRI;
+ } else if (invo.getMethodName().endsWith("increment")
+ || invo.getMethodName().endsWith("incrementColumnValue")) {
+ // due to the Increment class does not inherit the Operation class,
+ // we deal with it respectively.
+ byte[] region = (byte[]) invo.getParameters()[0];
+ String regionN = Bytes.toString(region);
+ Integer pri = getRegionPri(regionN);
+ OperationPriority op;
+ if ((op = regionActionPriorities.get(regionN)) != null) {
+ pri += op.getMutationPriority();
+ }
+ return pri;
+ }
+ Object params[] = invo.getParameters();
+ // Deal with the subclasses of Operation
+ if (params.length >= 2 && params[0] instanceof byte[]
+ && params[params.length - 1] instanceof Operation) {
+ // If it's openScanner operation.
+ if (params[params.length - 1] instanceof Scan) {
+ return HIGH_PRI;
+ }
+ Operation ope = (Operation) params[params.length - 1];
+ byte[] region = (byte[]) params[0];
+ String regionN = Bytes.toString(region);
+ Integer pri = getRegionPri(regionN);
+ OperationPriority op;
+ if ((op = regionActionPriorities.get(regionN)) != null) {
+ pri += op.getPriority(ope);
+ }
+ return pri;
+ }
+ return DEFAULT_PRI;
+ }
+
+ private int getRegionPri(String regionN) {
+ Integer pri = regionPriMap.get(regionN);
+ if (pri == null) {
+ pri = initRegionPriority(regionN, false);
+ }
+ if (pri == null) {
+ return DEFAULT_PRI;
+ }
+ return pri;
+ }
+
+ /**
+ * Get region priority.
+ *
+ * @param region the region you want to get priority
+ * @return the region's priority
+ */
+ public static int initRegionPriority(HRegion region) {
+ if (region == null || region.getRegionNameAsString() == null) {
+ return DEFAULT_PRI;
+ }
+ String regionN = region.getRegionNameAsString();
+ int pri = DEFAULT_PRI;
+ if (region.getRegionInfo().isMetaRegion()
+ || region.getRegionInfo().isRootRegion()) {
+ pri = HIGHEST_PRI;
+ }
+ else if (region.getTableDesc().getValue(PRI_KEY) != null) {
+ try {
+ pri = Integer.parseInt(Bytes.toString(region.getTableDesc().getValue(
+ PRI_KEY)));
+ } catch (Exception e) {
+ LOG.error("Table " + region.getTableDesc().getNameAsString()
+ + " has a wrong priority");
+ }
+ }
+ OperationPriority op = getActionPriorities(region
+ .getTableDesc());
+ regionActionPriorities.put(regionN, op);
+ regionPriMap.put(regionN, pri);
+ return pri;
+ }
+
+ /**
+ * Get region priority.
+ *
+ * @param region
+ * the region you want to get priority
+ * @param force
+ * force to refresh priority, if true,the method will get priority
+ * from region's table descriptor,or it will look from the cache.
+ * @return the region's priority
+ */
+ private static int initRegionPriority(String region, boolean force) {
+ if (!force) {
+ Integer ret = regionPriMap.get(region);
+ if (ret != null) {
+ return ret;
+ }
+ }
+ HRegion hr = server.getOnlineRegion(Bytes.toBytes(region));
+ return initRegionPriority(hr);
+ }
+
+ /**
+ * Print out region priorities.
+ */
+ public static void printRegionPriority()
+ {
+ String out = "Region Priories:";
+ out+=regionPriMap.toString();
+ LOG.debug(out);
+ }
+
+}