Index: src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterCoprocessorExceptionWithRemove.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterCoprocessorExceptionWithRemove.java (revision 1213130)
+++ src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterCoprocessorExceptionWithRemove.java (working copy)
@@ -174,7 +174,7 @@
// In this test, there is only a single coprocessor (BuggyMasterObserver).
String coprocessorName =
BuggyMasterObserver.class.getName();
- assertTrue(master.getLoadedCoprocessors().equals("[" + coprocessorName + "]"));
+ assertTrue(master.getLoadedCoprocessors().indexOf(coprocessorName) != -1);
HTableDescriptor htd1 = new HTableDescriptor(TEST_TABLE1);
htd1.addFamily(new HColumnDescriptor(TEST_FAMILY1));
@@ -202,7 +202,7 @@
masterTracker.masterZKNodeWasDeleted);
String loadedCoprocessors = master.getLoadedCoprocessors();
- assertTrue(loadedCoprocessors.equals("[" + coprocessorName + "]"));
+ assertTrue(loadedCoprocessors.indexOf(coprocessorName) != -1);
// Verify that BuggyMasterObserver has been removed due to its misbehavior
// by creating another table: should not have a problem this time.
Index: src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterCoprocessorExceptionWithAbort.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterCoprocessorExceptionWithAbort.java (revision 1213130)
+++ src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterCoprocessorExceptionWithAbort.java (working copy)
@@ -186,10 +186,9 @@
// Test (part of the) output that should have be printed by master when it aborts:
// (namely the part that shows the set of loaded coprocessors).
// In this test, there is only a single coprocessor (BuggyMasterObserver).
- assertTrue(master.getLoadedCoprocessors().
- equals("[" +
- TestMasterCoprocessorExceptionWithAbort.BuggyMasterObserver.class.getName() +
- "]"));
+ assertTrue(master.getLoadedCoprocessors().indexOf(
+ TestMasterCoprocessorExceptionWithAbort.BuggyMasterObserver.class.getName()) != -1);
CreateTableThread createTableThread = new CreateTableThread(UTIL);
Index: src/test/java/org/apache/hadoop/hbase/ipc/TestActionPriority.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/ipc/TestActionPriority.java (revision 0)
+++ src/test/java/org/apache/hadoop/hbase/ipc/TestActionPriority.java (revision 0)
@@ -0,0 +1,268 @@
+package org.apache.hadoop.hbase.ipc;
+
+import java.io.IOException;
+import java.util.Random;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.ipc.PriorityFunction;
+import org.apache.hadoop.hbase.ipc.PriorityJobQueue.ActionPriorities;
+import org.apache.hadoop.hbase.ipc.TestTablePriority.Worker;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test for action priority, using the Action_Priority table with scans and puts
+ * at different priorities. There are two tests: in the first one the put
+ * priority is 0 (high priority; RPC priority = table priority 0 + action
+ * priority 0) and the scan priority is 10 (low priority); in the second test
+ * the priorities of scan and put are switched.
+ */
+@Category(LargeTests.class)
+public class TestActionPriority {
+ private final static Log LOG = LogFactory.getLog(TestActionPriority.class);
+ static final Random r = new Random();
+ static final int rowNumber = 10000;
+ static final int threadN = 100;
+ HBaseAdmin admin = null;
+ private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ TEST_UTIL.getConfiguration().setBoolean(PriorityFunction.PRI_ENABLE_KEY, true);
+ TEST_UTIL.startMiniCluster(1);
+ }
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ /**
+ * set up a cluster and prepare tables.
+ */
+ @Before
+ public void setUp() {
+ try {
+ admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
+ } catch (Exception e) {
+ LOG.info(e);
+ }
+ HTableDescriptor des;
+ byte[][] startKeys = new byte[][] { Bytes.toBytes("0"), Bytes.toBytes("4") };
+ try {
+ if (admin.tableExists("Action_Priority")) {
+ admin.disableTable("Action_Priority");
+ admin.deleteTable("Action_Priority");
+ }
+ des = new HTableDescriptor("Action_Priority");
+ des.addFamily(new HColumnDescriptor("ff"));
+ des.setValue(PriorityFunction.PRI_KEY_ACTION_PRIORITY,
+ new ActionPriorities(10, 0, 0, 0).toBytes());
+ des.setValue(PriorityFunction.PRI_KEY, Bytes.toBytes(0 + ""));
+ admin.createTable(des, startKeys);
+
+ } catch (Exception e) {
+ LOG.info(e);
+ }
+ writeData();
+ }
+
+ private void writeData() {
+ LOG.info("begion write data into test table");
+ int nPerWorker = rowNubmer / threadN;
+ Worker[] workers = new Worker[threadN];
+ for (int i = 0; i < workers.length; i++) {
+ try {
+ workers[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(),
+ "Action_Priority"), "writeData", nPerWorker);
+ } catch (IOException e) {
+ LOG.error("Failed to create writer worker", e);
+ }
+ }
+ for (int i = 0; i < workers.length; i++) {
+ workers[i].start();
+ }
+ for (int i = 0; i < workers.length; i++) {
+ try {
+ workers[i].join();
+ } catch (InterruptedException e) {
+ LOG.error(e);
+ }
+ }
+ LOG.info("Write data into test table finished.");
+ }
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @After
+ public void tearDown() {
+
+ }
+
+ /**
+ * Verify that the priority function takes effect.
+ *
+ * @param highs high priority workers
+ * @param lows low priority workers
+ */
+
+ @SuppressWarnings("static-access")
+ public void verifyFunction(Worker highs[], Worker lows[]) {
+ boolean highFinished = false;
+ boolean lowFinished = false;
+ long highThroughPut = 0;
+ long lowThroughPut = 0;
+ while (!(highFinished && lowFinished)) {
+ try {
+ Thread.sleep(200);
+ } catch (InterruptedException e) {
+ // ignore and keep polling
+ }
+ highThroughPut = 0;
+ lowThroughPut = 0;
+ for (int i = 0; i < highs.length; i++) {
+ highThroughPut += highs[i].getThroughput();
+ lowThroughPut += lows[i].getThroughput();
+ }
+ LOG.info("-------------------------------------------------------------");
+ LOG.info("High priority action type is:" + highs[0].getType()
+ + ", priority:" + highs[0].getActionPriority() + " throughput is:"
+ + highThroughPut);
+ LOG.info("low priority action type is:" + lows[0].getType()
+ + ", priority:" + lows[0].getActionPriority() + " throughput is:"
+ + lowThroughPut);
+
+ highFinished = true;
+ lowFinished = true;
+ for (int i = 0; i < highs.length; i++) {
+ if (highs[i].isAlive()) {
+ highFinished = false;
+ }
+ if (lows[i].isAlive()) {
+ lowFinished = false;
+ }
+ }
+ if (highFinished) {
+ for (int i = 0; i < highs.length; i++) {
+ lows[i].stopWorker();
+ }
+ lowFinished = true;
+ }
+ if (lowFinished) {
+ for (int i = 0; i < highs.length; i++) {
+ highs[i].stopWorker();
+ }
+ highFinished = true;
+ }
+
+ }
+ highThroughPut = 0;
+ lowThroughPut = 0;
+ for (int i = 0; i < highs.length; i++) {
+ highThroughPut += highs[i].getThroughput();
+ lowThroughPut += lows[i].getThroughput();
+ }
+ assertTrue("Action priority works properly", highThroughPut > lowThroughPut);
+ LOG.info("-------------------------------------------------------------");
+ LOG.info("---------------------Test finished --------------------------");
+ LOG.info("High priority action type is:" + highs[0].getType()
+ + ", priority:" + highs[0].getActionPriority() + " throughput is:"
+ + highThroughPut);
+ LOG.info("low priority action type is:" + lows[0].getType() + ", priority:"
+ + lows[0].getActionPriority() + " throughput is:" + lowThroughPut);
+ LOG.info("####### Test for " + highs[0].getType() + ", priority:"
+ + highs[0].getActionPriority() + " ,and " + lows[0].getType()
+ + ", priority:" + lows[0].getActionPriority() + " finished####");
+
+ LOG.info("-------------------------------------------------------------");
+ }
+
+ /**
+ * Run the test.
+ */
+ @Test
+ public void testForDifferentActionPriority() {
+ Worker puts[] = new Worker[threadN / 2];
+ Worker scans[] = new Worker[threadN / 2];
+ for (int i = 0; i < puts.length; i++) {
+ try {
+ puts[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(),
+ "Action_Priority"), "put", rowNumber * 2 / threadN);
+ scans[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(),
+ "Action_Priority"), "scan", rowNumber * 2 / threadN);
+ } catch (IOException e) {
+ LOG.error("Failed to create worker", e);
+ }
+ }
+ for (int i = 0; i < puts.length; i++) {
+ puts[i].setActionPriority(0);
+ scans[i].setActionPriority(10);
+ puts[i].start();
+ scans[i].start();
+ }
+ verifyFunction(puts, scans);
+
+ try {
+ admin.disableTable("Action_Priority");
+ HTableDescriptor des1 = admin.getTableDescriptor(Bytes
+ .toBytes("Action_Priority"));
+ des1.setValue(PriorityFunction.PRI_KEY_ACTION_PRIORITY,
+ new ActionPriorities(0, 10, 10, 10).toBytes());
+ admin.modifyTable(Bytes.toBytes("Action_Priority"), des1);
+ admin.enableTable("Action_Priority");
+ } catch (IOException e) {
+ LOG.info(e);
+ }
+ puts = new Worker[threadN / 2];
+ scans = new Worker[threadN / 2];
+ for (int i = 0; i < puts.length; i++) {
+ try {
+ puts[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(),
+ "Action_Priority"), "put", rowNumber * 2 / threadN);
+ scans[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(),
+ "Action_Priority"), "scan", rowNumber * 2 / threadN);
+ puts[i].setActionPriority(10);
+ scans[i].setActionPriority(0);
+ } catch (IOException e) {
+ LOG.error("Failed to create worker", e);
+ }
+ }
+ for (int i = 0; i < puts.length; i++) {
+ puts[i].start();
+ scans[i].start();
+ }
+ verifyFunction(scans, puts);
+ }
+
+ public static void main(String args[]) {
+
+ try {
+ setUpBeforeClass();
+ TestActionPriority t = new TestActionPriority();
+ t.setUp();
+ t.testForDifferentActionPriority();
+ tearDownAfterClass();
+ } catch (Exception e) {
+ LOG.error(e);
+ }
+
+ }
+
+}
Index: src/test/java/org/apache/hadoop/hbase/ipc/TestPriorityJobQueue.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/ipc/TestPriorityJobQueue.java (revision 0)
+++ src/test/java/org/apache/hadoop/hbase/ipc/TestPriorityJobQueue.java (revision 0)
@@ -0,0 +1,152 @@
+/**
+ * Copyright the Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.util.Random;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.ipc.PriorityJobQueue;
+import org.apache.hadoop.hbase.ipc.PriorityJobQueue.Job;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test used to ensure the PriorityJobQueue works properly
+ *
+ */
+@Category(SmallTests.class)
+public class TestPriorityJobQueue {
+ private final Log LOG = LogFactory.getLog(TestPriorityJobQueue.class);
+ private static final int lowestPriority = 10;
+ private static final int queueSize = 1000;
+ private static final int testThreadN = 50;
+ private static final int testTimes = 3000;
+
+ public static void main(String args[]) {
+
+ // junit.textui.TestRunner.run(TestForPriorityJobQueue.class);
+ }
+
+ /**
+ * Test that the queue refreshes the priorities of queued jobs.
+ */
+ @Test
+ public void testQueueRefresh() {
+ final PriorityJobQueue<String> queue = new PriorityJobQueue<String>(
+ queueSize, lowestPriority, null, HBaseConfiguration.create());
+ for (int i = 0; i < 10; i++) {
+ queue.add(i + "", 10);
+ }
+
+ boolean match = true;
+ queue.refresh();
+ queue.refresh();
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ for (Object j : queue.toArray()) {
+ Job<?> job = (Job<?>) j;
+ LOG.debug("Job is:" + job);
+ if (job.getPriority() >= 10)
+ match = false;
+ }
+ assertTrue("calls' priorities are refreshed", match);
+ }
+
+ /**
+ * Test the queue's behavior: a thread (other than one with the lowest
+ * priority) can only get a job with an equal or higher priority. Also verify
+ * that the number of jobs added equals the number of jobs taken out.
+ */
+ @Test
+ public void testGetJob() {
+ final Random r = new Random();
+ final PriorityJobQueue<String> queue = new PriorityJobQueue<String>(
+ queueSize, lowestPriority, null, HBaseConfiguration.create());
+ Thread consumer[] = new Thread[testThreadN];
+ int pri = 1;
+
+ for (int i = 0; i < consumer.length; i++, pri++) {
+ consumer[i] = new Thread(i + "") {
+ public void run() {
+ while (true) {
+ Job<String> j = null;
+ try {
+ j = queue.getJob(PriorityFunction.priTrans(this.getPriority()));
+ } catch (InterruptedException e) {
+ }
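+ // The check below assumes priTrans(p) == Math.abs(p - 10) + 1, mapping Java
+ // thread priority (larger = higher) onto job priority (smaller = higher).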
+ if ((Math.abs(this.getPriority() - 10) + 1) != lowestPriority) {
+ assertTrue(
+ "Thread get a job which have a higher priority",
+ j.getPriority() <= (Math.abs(this.getPriority() - 10) + 1));
+ LOG.debug("job:" + j.getPriority() + " handler:"
+ + (Math.abs(this.getPriority() - 10) + 1));
+ }
+ }
+ }
+ };
+ if (pri >= 11)
+ pri = 1;
+ consumer[i].setDaemon(true);
+ consumer[i].setPriority(pri);
+ consumer[i].start();
+ }
+
+ Thread producer[] = new Thread[testThreadN];
+ for (int i = 0; i < producer.length; i++) {
+ producer[i] = new Thread(i + "") {
+ public void run() {
+ for (int j = 0; j < testTimes; j++) {
+ int jobpri = (r.nextInt(19) - 4);
+ queue.add("" + jobpri, jobpri);
+ }
+ }
+ };
+ producer[i].start();
+ }
+ for (Thread d : producer) {
+ try {
+ d.join();
+ } catch (InterruptedException e) {
+ }
+ }
+
+ while (queue.size() != 0) {
+ try {
+ Thread.sleep(30);
+ } catch (InterruptedException e) {
+ }
+ }
+
+ boolean match = true;
+ for (int i = 0; i < 10; i++) {
+ if (queue.getPriorityAddTimes()[i] != queue.getOrgPriorityGetTimes()[i]) {
+ match = false;
+ }
+ }
+ queue.printMetrics();
+ assertTrue("All added jobs should have been taken out", match);
+ }
+}
Index: src/test/java/org/apache/hadoop/hbase/ipc/TestTablePriority.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/ipc/TestTablePriority.java (revision 0)
+++ src/test/java/org/apache/hadoop/hbase/ipc/TestTablePriority.java (revision 0)
@@ -0,0 +1,561 @@
+/**
+ * Copyright the Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.io.IOException;
+import java.util.Random;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.ipc.PriorityFunction;
+import org.apache.hadoop.hbase.ipc.PriorityJobQueue.ActionPriorities;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test for table priority, using tables A, B and C which have different
+ * priorities. There are two tests: in the first one table A's priority is 1
+ * (high priority), B is 5 and C is 10; in the second test the priorities of
+ * A and C are switched.
+ */
+@Category(LargeTests.class)
+public class TestTablePriority {
+ private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ private final static Log LOG = LogFactory.getLog(TestTablePriority.class);
+ static final Random r = new Random();
+ static final int rowNumber = 10000;
+ static final int threadN = 150;
+ HBaseAdmin admin = null;
+ // If there is already data in the test tables, you
+ // can set the method type to "scan".
+ static String method = "put";
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ TEST_UTIL.getConfiguration().setBoolean(PriorityFunction.PRI_ENABLE_KEY, true);
+ TEST_UTIL.startMiniCluster(1);
+ }
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ /**
+ * set up a cluster and prepare tables.
+ */
+ @Before
+ public void setUp() {
+ try {
+ admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
+ } catch (Exception e) {
+ LOG.info(e);
+ }
+ HTableDescriptor des;
+ byte[][] startKeys = new byte[][] { Bytes.toBytes("0"), Bytes.toBytes("4") };
+ try {
+ if (admin.tableExists("Table_A")) {
+ admin.disableTable("Table_A");
+ admin.deleteTable("Table_A");
+ }
+ if (admin.tableExists("Table_B")) {
+ admin.disableTable("Table_B");
+ admin.deleteTable("Table_B");
+ }
+ if (admin.tableExists("Table_C")) {
+ admin.disableTable("Table_C");
+ admin.deleteTable("Table_C");
+ }
+ } catch (IOException e1) {
+ LOG.error(e1);
+ }
+ try {
+ des = new HTableDescriptor("Table_A");
+ des.addFamily(new HColumnDescriptor("ff"));
+ des = PriorityFunction.setPriority(1, des);
+ admin.createTable(des, startKeys);
+ } catch (Exception e) {
+ LOG.info(e);
+ }
+ try {
+ des = new HTableDescriptor("Table_B");
+ des.addFamily(new HColumnDescriptor("ff"));
+ des = PriorityFunction.setPriority(5, des);
+ admin.createTable(des, startKeys);
+ } catch (Exception e) {
+ LOG.info(e);
+ }
+ try {
+
+ des = new HTableDescriptor("Table_C");
+ des.addFamily(new HColumnDescriptor("ff"));
+ des = PriorityFunction.setPriority(10, des);
+ admin.createTable(des, startKeys);
+ } catch (Exception e) {
+ LOG.info(e);
+ }
+ }
+
+ /**
+ * The worker used to test throughput and gather the results.
+ */
+ public static class Worker extends Thread {
+ HTable table;
+ String type;
+ private long throughput = 0;
+ long times = 1000;
+ boolean stopFlag = false;
+ int tablePriority;
+ long n = 0;
+ long startTime = 0;
+ int actionPriority = 0;
+
+ /**
+ * get throughput of this worker
+ *
+ * @return the throughput in operations per second
+ */
+ public long getThroughput() {
+ long end = System.currentTimeMillis();
+ throughput = (n * 1000) / (end - startTime + 1);
+ return throughput;
+ }
+
+ /**
+ * get table priority
+ *
+ * @return table priority
+ */
+ public int getTablePriority() {
+ return tablePriority;
+ }
+
+ /**
+ * set table priority
+ */
+ public void setTablePriority(int tablePriority) {
+ this.tablePriority = tablePriority;
+ }
+
+ /**
+ * get action type
+ *
+ * @return action type of the worker
+ */
+ public String getType() {
+ return type;
+ }
+
+ /**
+ * the table used by the worker
+ *
+ * @return table of the worker
+ */
+ public HTable getTable() {
+ return table;
+ }
+
+ public Worker(HTable t, String type, long times) {
+ this.table = t;
+ this.type = type;
+ this.times = times;
+ }
+
+ public void run() {
+ try {
+ doAction(table, type, times);
+ } catch (IOException e) {
+ LOG.info(e);
+ }
+ }
+
+ public int getActionPriority() {
+ return actionPriority;
+ }
+
+ public void setActionPriority(int actionPriority) {
+ this.actionPriority = actionPriority;
+ }
+
+ /**
+ * stop the worker
+ */
+ public void stopWorker() {
+ this.stopFlag = true;
+ }
+
+ private void doAction(HTable t, String type, long times) throws IOException {
+ long start = System.currentTimeMillis();
+ long end = System.currentTimeMillis();
+ startTime = System.currentTimeMillis();
+ int pri = Integer.parseInt(Bytes.toString(t.getTableDescriptor()
+ .getValue(PriorityFunction.PRI_KEY)));
+ Scan s = new Scan();
+ s.setStartRow(Bytes.toBytes(r.nextInt(10)+""));
+ if (type.equals("writeData")) {
+ t.setAutoFlush(false);
+ t.setWriteBufferSize(1000);
+ }
+ ResultScanner sn = null;
+ boolean scan = false;
+ if (type.equals("scan"))
+ {
+ sn = t.getScanner(s);
+ scan = true;
+ }
+ while (true) {
+ if (n % 10000 == 0) {
+ end = System.currentTimeMillis();
+ LOG.debug("Thread:" + this.getId() + " type:" + type + " table"
+ + t.getTableDescriptor().getNameAsString() + " pri :" + pri
+ + " time:" + (end - start) + " total :" + n + " throughput:"
+ + (n * 1000) / (end - startTime + 1));
+ throughput = (n * 1000) / (end - startTime + 1);
+ start = end;
+ }
+ if (n % 100 == 0) {
+ end = System.currentTimeMillis();
+ throughput = (n * 1000) / (end - startTime + 1);
+ }
+ Result ret = null;
+ if (scan) {
+ ret = sn.next();
+ if (ret == null) {
+ try {
+ sn.close();
+ } catch (Exception e) {
+ LOG.debug(e);
+ }
+ sn = t.getScanner(s);
+ }
+ } else {
+ put(t, n);
+ }
+ if (stopFlag || n > times) {
+ break;
+ }
+ n++;
+ }
+ t.flushCommits();
+ t.close();
+ }
+ }
+
+ /**
+ * Verify that the priority function takes effect.
+ *
+ * @param high high priority workers
+ * @param middle middle priority workers
+ * @param low low priority workers
+ */
+ public void verifyFunction(Worker high[], Worker middle[], Worker low[]) {
+ boolean highFinished = false;
+ boolean lowFinished = false;
+ boolean middleFinished = false;
+ long highThroughPut = 0;
+ long middleThroughPut = 0;
+ long lowThroughPut = 0;
+ while (!(highFinished && lowFinished && middleFinished)) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ // ignore exceptions
+ }
+ highThroughPut = 0;
+ middleThroughPut = 0;
+ lowThroughPut = 0;
+ for (int i = 0; i < high.length; i++) {
+ highThroughPut += high[i].getThroughput();
+ lowThroughPut += low[i].getThroughput();
+ middleThroughPut += middle[i].getThroughput();
+ }
+ LOG.info("-------------------------------------------------------------");
+ try {
+ LOG.info("high priority table is: "
+ + high[0].getTable().getTableDescriptor().getNameAsString()
+ + " action type is:" + high[0].getType() + ", priority:"
+ + high[0].getTablePriority() + " throughput is:" + highThroughPut);
+ LOG.info("middle priority table is: "
+ + middle[0].getTable().getTableDescriptor().getNameAsString()
+ + " action type is:" + middle[0].getType() + ", priority:"
+ + middle[0].getTablePriority() + " throughput is:"
+ + middleThroughPut);
+ LOG.info("low priority table is: "
+ + low[0].getTable().getTableDescriptor().getNameAsString()
+ + " action type is:" + low[0].getType() + ", priority:"
+ + low[0].getTablePriority() + " throughput is:" + lowThroughPut);
+ } catch (Exception e) {
+ LOG.error(e);
+ }
+
+ highFinished = true;
+ lowFinished = true;
+ middleFinished = true;
+ for (int i = 0; i < high.length; i++) {
+ if (high[i].isAlive()) {
+ highFinished = false;
+ }
+ if (low[i].isAlive()) {
+ lowFinished = false;
+ }
+ if (middle[i].isAlive()) {
+ middleFinished = false;
+ }
+ }
+ if (highFinished) {
+ for (int i = 0; i < high.length; i++) {
+ low[i].stopWorker();
+ }
+ for (int i = 0; i < high.length; i++) {
+ middle[i].stopWorker();
+ }
+ middleFinished = true;
+ lowFinished = true;
+ }
+ if (lowFinished) {
+ for (int i = 0; i < high.length; i++) {
+ high[i].stopWorker();
+ }
+ for (int i = 0; i < high.length; i++) {
+ middle[i].stopWorker();
+ }
+ middleFinished = true;
+ highFinished = true;
+ }
+ if (middleFinished) {
+ for (int i = 0; i < high.length; i++) {
+ high[i].stopWorker();
+ }
+ for (int i = 0; i < high.length; i++) {
+ low[i].stopWorker();
+ }
+ lowFinished = true;
+ highFinished = true;
+ }
+
+ }
+ highThroughPut = 0;
+ middleThroughPut = 0;
+ lowThroughPut = 0;
+ for (int i = 0; i < high.length; i++) {
+ highThroughPut += high[i].getThroughput();
+ lowThroughPut += low[i].getThroughput();
+ middleThroughPut += middle[i].getThroughput();
+ }
+
+ assertTrue("Action priority works properly",
+ highThroughPut > middleThroughPut && middleThroughPut > lowThroughPut);
+ LOG.info("-------------------------------------------------------------");
+ LOG.info("---------------------Test finished --------------------------");
+
+ try {
+ LOG.info("high priority table is: "
+ + high[0].getTable().getTableDescriptor().getNameAsString()
+ + " action type is:" + high[0].getType() + ", priority:"
+ + high[0].getTablePriority() + " throughput is:" + highThroughPut);
+ LOG.info("middle priority table is: "
+ + middle[0].getTable().getTableDescriptor().getNameAsString()
+ + " action type is:" + middle[0].getType() + ", priority:"
+ + middle[0].getTablePriority() + " throughput is:" + middleThroughPut);
+ LOG.info("low priority table is: "
+ + low[0].getTable().getTableDescriptor().getNameAsString()
+ + " action type is:" + low[0].getType() + ", priority:"
+ + low[0].getTablePriority() + " throughput is:" + lowThroughPut);
+ } catch (Exception e) {
+ LOG.error(e);
+ }
+ LOG.info("####### Test for " + high[0].getType() + ", priority:"
+ + high[0].getTablePriority() + " " + middle[0].getType()
+ + ", priority:" + middle[0].getTablePriority() + " ,and "
+ + low[0].getType() + ", priority:" + low[0].getTablePriority()
+ + " finished####");
+
+ LOG.info("-------------------------------------------------------------");
+ }
+ private void changePriority(int pria, int prib, int pric) {
+ try {
+ admin.disableTable("Table_A");
+ HTableDescriptor des1 = admin
+ .getTableDescriptor(Bytes.toBytes("Table_A"));
+ des1.setValue(PriorityFunction.PRI_KEY_ACTION_PRIORITY,
+ new ActionPriorities(0, 0, 0, 0).toBytes());
+ des1 = PriorityFunction.setPriority(pria, des1);
+ admin.modifyTable(Bytes.toBytes("Table_A"), des1);
+ admin.enableTable("Table_A");
+ } catch (Exception e) {
+ LOG.info(e);
+ }
+ try {
+ admin.disableTable("Table_B");
+ HTableDescriptor des1 = admin
+ .getTableDescriptor(Bytes.toBytes("Table_B"));
+ des1.setValue(PriorityFunction.PRI_KEY_ACTION_PRIORITY,
+ new ActionPriorities(0, 0, 0, 0).toBytes());
+ des1 = PriorityFunction.setPriority(prib, des1);
+ admin.modifyTable(Bytes.toBytes("Table_B"), des1);
+ admin.enableTable("Table_B");
+ } catch (Exception e) {
+ LOG.info(e);
+ }
+ try {
+ admin.disableTable("Table_C");
+ HTableDescriptor des1 = admin
+ .getTableDescriptor(Bytes.toBytes("Table_C"));
+ des1.setValue(PriorityFunction.PRI_KEY_ACTION_PRIORITY,
+ new ActionPriorities(0, 0, 0, 0).toBytes());
+ des1 = PriorityFunction.setPriority(pric, des1);
+ admin.modifyTable(Bytes.toBytes("Table_C"), des1);
+ admin.enableTable("Table_C");
+ } catch (Exception e) {
+ LOG.info(e);
+ }
+ }
+ /**
+ * Start the test.
+ */
+ @Test(timeout = 180000)
+ public void testForDifferentTablePriority() {
+ method = "put";
+ doTestJob();
+ method = "scan";
+ changePriority(1, 5, 10);
+ doTestJob();
+ }
+
+
+ private void doTestJob() {
+ Worker A[] = new Worker[threadN / 3];
+ Worker B[] = new Worker[threadN / 3];
+ Worker C[] = new Worker[threadN / 3];
+ for (int i = 0; i < A.length; i++) {
+ try {
+ A[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(), "Table_A"),
+ method, rowNumber * 3 / threadN);
+ B[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(), "Table_B"),
+ method, rowNumber * 3 / threadN);
+ C[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(), "Table_C"),
+ method, rowNumber * 3 / threadN);
+ } catch (IOException e) {
+ LOG.error("Failed to create worker", e);
+ }
+ }
+ for (int i = 0; i < A.length; i++) {
+ A[i].setTablePriority(1);
+ B[i].setTablePriority(5);
+ C[i].setTablePriority(10);
+ A[i].start();
+ B[i].start();
+ C[i].start();
+ }
+ verifyFunction(A, B, C);
+ changePriority(10,5,1);
+ A = new Worker[threadN / 3];
+ B = new Worker[threadN / 3];
+ C = new Worker[threadN / 3];
+ for (int i = 0; i < A.length; i++) {
+ try {
+ A[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(), "Table_A"),
+ method, rowNumber * 3 / threadN);
+ B[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(), "Table_B"),
+ method, rowNumber * 3 / threadN);
+ C[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(), "Table_C"),
+ method, rowNumber * 3 / threadN);
+ } catch (IOException e) {
+ LOG.error("Failed to create worker", e);
+ }
+ }
+ for (int i = 0; i < A.length; i++) {
+ A[i].setTablePriority(10);
+ B[i].setTablePriority(5);
+ C[i].setTablePriority(1);
+ A[i].start();
+ B[i].start();
+ C[i].start();
+ }
+ verifyFunction(C, B, A);
+ }
+ public static void printTable(String name) {
+ try {
+ HTable table = new HTable(TEST_UTIL.getConfiguration(), name);
+ Scan s = new Scan();
+ ResultScanner rs = table.getScanner(s);
+ Result result = null;
+ while ((result = rs.next()) != null) {
+ System.out.println(result);
+ }
+ } catch (IOException e) {
+ LOG.error(e);
+ }
+ }
+ public static void main(String args[]) {
+ try {
+ setUpBeforeClass();
+ TestTablePriority t = new TestTablePriority();
+ t.setUp();
+ t.testForDifferentTablePriority();
+ tearDownAfterClass();
+ } catch (Exception e) {
+ LOG.info(e);
+ }
+ }
+
+ static byte[] b;
+ static {
+ String bs = "";
+ for (int i = 0; i < 100; i++) {
+ bs += i;
+ }
+ b = Bytes.toBytes(bs);
+ }
+ private static void put(HTable t, long i) {
+ Put p = new Put(
+ Bytes.toBytes("" + r.nextInt(1000) + (i) + r.nextInt(10000)));
+ p.add(Bytes.toBytes("ff"), Bytes.toBytes("ff"), b);
+ try {
+ t.put(p);
+ } catch (IOException e) {
+ LOG.info(e);
+ }
+ }
+}
Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1213130)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy)
@@ -2353,9 +2353,18 @@
RegionScanner s = scanners.remove(this.scannerName);
if (s != null) {
try {
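+ // Give region coprocessors a chance to observe the scanner close by
+ // invoking the pre/post scanner-close hooks around the actual close.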
+ HRegion region = getRegion(s.getRegionInfo().getRegionName());
+ if (region != null && region.getCoprocessorHost() != null) {
+ region.getCoprocessorHost().preScannerClose(s);
+ }
+
s.close();
+ if (region != null && region.getCoprocessorHost() != null) {
+ region.getCoprocessorHost().postScannerClose(s);
+ }
} catch (IOException e) {
- LOG.error("Closing scanner", e);
+ LOG.error("Closing scanner for "
+ + s.getRegionInfo().getRegionNameAsString(), e);
}
}
}
Index: src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (revision 1213130)
+++ src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (working copy)
@@ -208,7 +208,7 @@
private Listener listener = null;
protected Responder responder = null;
protected int numConnections = 0;
- private Handler[] handlers = null;
+ protected Handler[] handlers = null;
private Handler[] priorityHandlers = null;
protected HBaseRPCErrorHandler errorHandler = null;
@@ -244,7 +244,7 @@
}
/** A call queued for handling. */
- protected class Call implements Delayable {
+ class Call implements Delayable {
protected int id; // the client's call id
protected Writable param; // the parameter passed
protected Connection connection; // connection to client
@@ -1270,7 +1270,7 @@
}
/** Handles queued calls . */
- private class Handler extends Thread {
+ class Handler extends Thread {
private final BlockingQueue<Call> myCallQueue;
private MonitoredRPCHandler status;
Index: src/main/java/org/apache/hadoop/hbase/ipc/PriorityHBaseServer.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/ipc/PriorityHBaseServer.java (revision 0)
+++ src/main/java/org/apache/hadoop/hbase/ipc/PriorityHBaseServer.java (revision 0)
@@ -0,0 +1,120 @@
+/**
+ * Copyright the Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.HbaseObjectWritable;
+import org.apache.hadoop.hbase.ipc.Invocation;
+import org.apache.hadoop.hbase.ipc.WritableRpcEngine.Server;
+import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * An IPC server that extends HBaseServer and adds scheduling so that table
+ * priority takes effect.
+ */
+public class PriorityHBaseServer extends Server {
+
+ /**
+ * Construct an RPC server.
+ *
+ * @param instance the instance whose methods will be called
+ * @param ifaces Define metrics for all methods in the given classes
+ * @param conf the configuration to use
+ * @param bindAddress the address to bind on to listen for connection
+ * @param port the port to listen for connections on
+ * @param numHandlers the number of method handler threads to run
+ * @param priorityHandlerCount used by {@link HBaseServer}
+ * @param verbose whether each call should be logged
+ * @param highPriorityLevel used by {@link HBaseServer}; decides whether calls on meta
+ * regions are handled by the metaHandlers
+ * @throws IOException e
+ */
+ public PriorityHBaseServer(Object instance, final Class<?>[] ifaces,
+ Configuration conf, String bindAddress, int port, int numHandlers,
+ int priorityHandlerCount, boolean verbose, int highPriorityLevel)
+ throws IOException {
+ super(instance, ifaces, conf, bindAddress, port, numHandlers,
+ priorityHandlerCount, verbose, highPriorityLevel);
+ int maxQueueSize = this.conf.getInt("ipc.server.max.queue.size", 500);
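+ // Cap the number of priority levels at 10 to match the Java thread
+ // priority range (1..10) used by the handler threads.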
+ int handlers=this.conf.getInt("hbase.regionserver.handler.count", 10);
+ callQueue = new PriorityJobQueue(maxQueueSize, handlers>=10?10:handlers,
+ new PriorityFunction((HRegionServer) instance, conf), conf);
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public Writable call(Class<? extends VersionedProtocol> protocol,
+ Writable param, long receivedTime, MonitoredRPCHandler status)
+ throws IOException {
+ Invocation call = (Invocation) param;
+ HbaseObjectWritable writable = (HbaseObjectWritable) super.call(protocol,
+ param, receivedTime, status);
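+ // After an openScanner call returns, record the new scanner id so that
+ // subsequent scanner RPCs can be mapped back to the table's priority
+ // (this is what initScannerPriority is assumed to do).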
+ if (call.getMethodName().endsWith("openScanner")) {
+ ((PriorityJobQueue) callQueue).getFunction().initScannerPriority(call,
+ writable.get());
+ }
+ return writable;
+ }
+ /**
+ * If handleSize is 20 and thread priorities range from 1 to 10, the
+ * priorities will be initialized to 10,9,8,7,...,1, 10,9,8,...,2, 10. Java
+ * threads use a priority rule where a larger number means a higher priority,
+ * so this method makes the number of high priority threads bigger than the
+ * number of lower priority ones. Because a thread can only handle a job whose
+ * priority is at least as high as its own, the high priority jobs will have
+ * more threads to handle them. Note: for job priorities a smaller number means
+ * a higher priority, which is the inverse of the thread priority, so
+ * {@link PriorityFunction#priTrans} does the priority translation.
+ */
+ private int[] initPriorityArray(int handleSize) {
+ int[] priorityArray = new int[handleSize];
+ int minPriOfTurn = Thread.MIN_PRIORITY;
+ for (int i = 0, priNow = Thread.MAX_PRIORITY; i < handleSize; i++, priNow--) {
+ if (priNow < minPriOfTurn) {
+ priNow = Thread.MAX_PRIORITY;
+ minPriOfTurn++;
+ if (minPriOfTurn == Thread.MAX_PRIORITY) {
+ minPriOfTurn = Thread.MIN_PRIORITY;
+ }
+ }
+ priorityArray[i] = priNow;
+ }
+ return priorityArray;
+ }
+
+ @Override
+ public synchronized void startThreads() {
+ super.startThreads();
+ int[] priorityArray = initPriorityArray(handlers.length);
+ for (int i = 0; i < this.handlers.length; i++) {
+ handlers[i].setPriority(priorityArray[i]);
+ }
+ }
+ @Override
+ public void stop() {
+ super.stop();
+ ((PriorityJobQueue) callQueue).stop();
+ }
+}
Index: src/main/java/org/apache/hadoop/hbase/ipc/QosRegionObserver.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/ipc/QosRegionObserver.java (revision 0)
+++ src/main/java/org/apache/hadoop/hbase/ipc/QosRegionObserver.java (revision 0)
@@ -0,0 +1,60 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+
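+/**
+ * A region observer that keeps the PriorityFunction's region and scanner
+ * bookkeeping up to date as regions are opened and closed and as scanners
+ * are opened and closed.
+ */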
+public class QosRegionObserver extends BaseRegionObserver {
+ @Override
+ public void postOpen(ObserverContext<RegionCoprocessorEnvironment> e) {
+ HRegion region = e.getEnvironment().getRegion();
+ PriorityFunction.initRegionPriority(region);
+ }
+
+ @Override
+ public void postClose(ObserverContext<RegionCoprocessorEnvironment> e,
+ boolean abortRequested) {
+ HRegion region = e.getEnvironment().getRegion();
+ PriorityFunction.cleanRegionPriority(region);
+ }
+
+ @Override
+ public RegionScanner postScannerOpen(
+ final ObserverContext<RegionCoprocessorEnvironment> e, final Scan scan,
+ final RegionScanner s) {
+ PriorityFunction.addRegionScanner(scan, s);
+ return s;
+ }
+
+ @Override
+ public void postScannerClose(
+ final ObserverContext<RegionCoprocessorEnvironment> e,
+ final InternalScanner s) throws IOException {
+ PriorityFunction.removeScanner((RegionScanner) s);
+ }
+}
Index: src/main/java/org/apache/hadoop/hbase/ipc/PriorityJobQueue.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/ipc/PriorityJobQueue.java (revision 0)
+++ src/main/java/org/apache/hadoop/hbase/ipc/PriorityJobQueue.java (revision 0)
@@ -0,0 +1,749 @@
+/**
+ * Copyright the Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.io.InterruptedIOException;
+import java.util.Calendar;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Operation;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Writable;
+
+import com.google.common.base.Function;
+
+/**
+ * This queue is used by {@link PriorityHBaseServer}. In this queue, a thread
+ * can generally only get jobs whose priority is the same as or higher than
+ * its own thread priority.
+ * The thread with the lowest priority can additionally handle jobs with even
+ * lower priorities. Please use take and put to
+ * manipulate this queue.
+ *
+ * @param <T> the class contained by the queue
+ */
+public class PriorityJobQueue<T> implements BlockingQueue {
+ private static final Log LOG = LogFactory.getLog(PriorityJobQueue.class);
+ private final PriorityBlockingQueue<Job<T>> queue = new PriorityBlockingQueue<Job<T>>();
+ private int capacity = 100;
+ private int maxWait = 1000;
+ private boolean running = true;
+ final int lowestThreadPriority;
+ ReentrantLock readLock = new ReentrantLock();
+ Condition[] lockList ;
+ ReentrantLock addLock = new ReentrantLock();
+ Condition queueFull = addLock.newCondition();
+
+ private static int handleFreshInter = 8;
+ private static int move = Integer.SIZE - handleFreshInter;
+ private static int flush = 1;
+ private long[] priorityAddTimes = new long[10];
+ private long[] priorityGetTimes = new long[10];
+ private long[] orgPriorityGetTimes = new long[10];
+ private Function<Writable, Integer> qosFunction;
+
+ /**
+ * get static data of add times
+ * @return the add times arranged by priority
+ */
+ public long[] getPriorityAddTimes() {
+ return priorityAddTimes;
+ }
+
+ /**
+ * get static data of get times
+ * @return the get times arranged by priority
+ */
+
+ public long[] getPriorityGetTimes() {
+ return priorityGetTimes;
+ }
+
+
+ /**
+ * get static data of get times
+ * @return the get times arranged by original priority
+ */
+ public long[] getOrgPriorityGetTimes() {
+ return orgPriorityGetTimes;
+ }
+
+ private void decreaseSize() {
+ this.addLock.lock();
+ if (queue.size() < this.capacity) {
+ this.queueFull.signal();
+ }
+ this.addLock.unlock();
+ }
+
+ private void add(Job<T> j) throws InterruptedIOException {
+ int wait = 0;
+ while (queue.size() >= this.capacity) {
+ try {
+ addLock.lock();
+ this.queueFull.await(10, TimeUnit.MILLISECONDS);
+ wait++;
+ if (wait > this.maxWait)
+ break;
+ } catch (InterruptedException e) {
+ throw new InterruptedIOException("try add waiting interrupted");
+ }
+ finally{
+ addLock.unlock();
+ }
+ }
+ this.queue.add(j);
+ }
+ private Thread refresher = new Thread() {
+ public void run() {
+ while (running) {
+ refreshPeriodically();
+ try {
+ sleep(1000);
+ } catch (InterruptedException e) {
+ // ignore this exception
+ } catch (Exception e) {
+ LOG.error(e);
+ }
+ }
+ }
+ };
+ /**
+ * Stop the refresher.
+ */
+ public void stop() {
+ this.running = false;
+ this.refresher.interrupt();
+ }
+
+ /**
+ * Initialize a queue.
+ *
+ * @param capacity
+ * the capacity of the queue; not a precise value. If the queue size
+ * exceeds this value, workers which add jobs should wait, but if a
+ * thread waits too many turns (more than maxWait), it will add the
+ * job anyhow.
+ * @param lowestThreadPriority
+ * the lowest priority which worker threads have; the default
+ * priorities range from 1 to 10, a small number means high priority.
+ * @param qosFunction
+ * the function used to compute a call's priority
+ * @param conf
+ * the configuration to use
+ */
+ public PriorityJobQueue(int capacity, int lowestThreadPriority,
+ Function<Writable, Integer> qosFunction, final Configuration conf) {
+ this.qosFunction = qosFunction;
+ // handleFreshInter decides after how many processed RPC calls we increase
+ // the priority of the calls remaining in this queue.
+ // A value of 7 means after 2^7 RPC calls have been processed.
+ // We use ((flush << move) >>> move) == 0 to compute the modulo.
+ // hbase.schedule.refreshinter: a larger number means lower priority calls
+ // will wait longer in this queue.
+ handleFreshInter = conf.getInt("hbase.schedule.refreshinter", 8);
+ handleFreshInter = handleFreshInter < 1 || handleFreshInter > 20 ? 8
+ : handleFreshInter;
+ move = Integer.SIZE - handleFreshInter;
+ this.capacity = capacity;
+ this.refresher.setDaemon(true);
+ this.refresher.start();
+ this.lowestThreadPriority = lowestThreadPriority;
+ lockList = new Condition[Thread.MAX_PRIORITY];
+ for (int i = 0; i < lockList.length; i++) {
+ lockList[i] = readLock.newCondition();
+ }
+ }
+
+ public PriorityFunction getFunction() {
+ return (PriorityFunction) this.qosFunction;
+ }
+
+ /**
+ * add a job to this queue
+ * @param call the job instance
+ * @param pri the job's priority
+ */
+ public void add(T call, int pri) {
+ try {
+ this.add(new Job<T>(pri, call));
+ } catch (InterruptedIOException e) {
+ LOG.error(e);
+ }
+ signalHandler();
+ synchronized (this.priorityAddTimes) {
+ this.priorityAddTimes[this.getCondition(pri)]++;
+ }
+
+ }
+
+ /**
+ * get the size of the queue.
+ * @return the size of the queue
+ */
+ public int size() {
+ return this.queue.size();
+ }
+
+ /**
+ * get the size of the queue
+ * @return the size of the queue
+ */
+ public int queueSize() {
+ return queue.size();
+ }
+
+ private int getCondition(int pri) {
+ if (pri <= 10 && pri >= 1) {
+ return pri - 1;
+ } else if (pri > 10) {
+ return 9;
+ } else {
+ return 0;
+ }
+ }
+
+ private void signalHandler() {
+ readLock.lock();
+ Job<T> jobt = queue.peek();
+ if (jobt != null) {
+ this.lockList[getCondition(jobt.priority)].signal();
+ }
+ readLock.unlock();
+ }
+
+ /**
+ * If the handler's priority number is lower than the job's priority number,
+ * then this handler can't get this job.
+ * @param job the job which the worker wants to get
+ * @param pri the worker thread's priority
+ * @return whether the worker should get this job
+ */
+ public boolean shouldWork(Job<T> job, int pri) {
+ if (job == null)
+ return false;
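+ // Smaller numbers mean higher priority: a handler may take jobs whose
+ // priority number is no larger than its own level; the level-1 handler also
+ // takes jobs whose priority has been refreshed below 1, and the lowest-level
+ // handler takes everything below its level.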
+ return (pri >= job.priority)
+ || (job.priority < 1 && pri == 1)
+ || (job.priority > this.lowestThreadPriority && pri == this.lowestThreadPriority);
+ }
+
+ /**
+ * Get a job from the queue; this method will
+ * test whether the thread can get this job.
+ * @param pri the worker thread's priority
+ * @return the job
+ * @throws InterruptedException
+ */
+ public T get(int pri) throws InterruptedException {
+ return getJob(pri).getCall();
+ }
+
+ /**
+ * Get a job from the queue; this method will
+ * test whether the thread can get this job.
+ * @param pri the worker thread's priority
+ * @return the job
+ * @throws InterruptedException
+ */
+ public Job<T> getJob(int pri) throws InterruptedException {
+ Job<T> ret = null;
+ while (true) {
+ readLock.lock();
+ ret = queue.peek();
+ if (shouldWork(ret, pri)) {
+ ret = queue.take();
+ readLock.unlock();
+ break;
+ }
+ this.lockList[getCondition(pri)].await(100, TimeUnit.MILLISECONDS);
+ readLock.unlock();
+ }
+ if (ret.priority > pri && pri != this.lowestThreadPriority) {
+ LOG.error("The handle get a job priority is lower than its priority,this shouldn't happen"
+ + " job priority:" + ret.priority + " handler pri:" + pri);
+ }
+ this.signalHandler();
+ synchronized (this.priorityGetTimes) {
+ this.priorityGetTimes[this.getCondition(ret.priority)]++;
+ this.orgPriorityGetTimes[this.getCondition(ret.orgPri)]++;
+ }
+ this.decreaseSize();
+ return ret;
+ }
+
+ /**
+ * Print queue statistics for debugging.
+ */
+ public void printMetrics() {
+ LOG.debug("size is :" + queue.size());
+ LOG.debug("capacity is :" + this.capacity);
+ String out = "Add times ";
+ for (int i = 0; i < this.priorityAddTimes.length; i++) {
+ out += " pri:" + (i + 1) + ":" + this.priorityAddTimes[i];
+ }
+ LOG.debug("priority request stats:" + out);
+ out = "Get times(last priority)";
+ for (int i = 0; i < this.priorityGetTimes.length; i++) {
+ out += " pri:" + (i + 1) + ":" + this.priorityGetTimes[i];
+ }
+ LOG.debug("priority request stats:" + out);
+ out = "Get times(org priority)";
+ for (int i = 0; i < this.orgPriorityGetTimes.length; i++) {
+ out += " pri:" + (i + 1) + ":" + this.orgPriorityGetTimes[i];
+ }
+ LOG.debug("priority request stats:" + out);
+ PriorityFunction.printRegionPriority();
+ }
+
+ /**
+ * Refresh the priorities of the jobs remaining in this queue; each priority
+ * value is simply decreased by one.
+ */
+ public void refresh() {
+ this.refresher.interrupt();
+ }
+
+ static int outputIndicator = 0;
+
+ private void refreshPeriodically() {
+ try {
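+ // Note: for an int the shift distance 55 is masked to 23, so this debug
+ // output is emitted once every 2^9 = 512 invocations.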
+ if ((outputIndicator << 55) >>> 55 == 0) {
+ LOG.debug(Calendar.getInstance().getTime() + ":" + this.queue);
+ printMetrics();
+ }
+ outputIndicator++;
+ for (Job<T> job : queue) {
+ if (job != null) {
+ job.increasePriority();
+ }
+ }
+ signalHandler();
+ } catch (Exception e) {
+ LOG.error(e);
+ }
+ }
+
+ @SuppressWarnings("unused")
+ private void refreshInner(int n) {
+ try {
+ for (Job<T> job : queue) {
+ if (job != null) {
+ job.increasePriority(n);
+ }
+ }
+ signalHandler();
+ } catch (Exception e) {
+ LOG.warn(e);
+ }
+ }
+ /**
+ * The Job stored in the queue.
+ * @param <T> the type of call held by the job
+ */
+ public static class Job<T> implements Comparable<Job<T>> {
+ private int orgPri = 0;
+ private int priority = 0;
+ private long initTime = 0;
+ private T call;
+
+ /**
+ * get the priority now
+ * @return the priority of this call
+ */
+ public int getPriority() {
+ return priority;
+ }
+
+ /**
+ * increase job's priority
+ */
+ public void increasePriority() {
+ this.priority--;
+ }
+
+ /**
+ * increase job's priority by n
+ * @param n the number you want to increase
+ */
+ public void increasePriority(int n) {
+ this.priority = this.priority - n;
+ }
+
+ /**
+ * get the instance hold by Job
+ * @return the call instance
+ */
+ public T getCall() {
+ return call;
+ }
+
+ /**
+ * set the instance hold by Job
+ * @param call the call instance
+ */
+ public void setCall(T call) {
+ this.call = call;
+ }
+
+ /**
+ * Initialize a Job instance
+ * @param pri the job priority
+ * @param call the instance hold by the job
+ */
+ public Job(int pri, T call) {
+ this.orgPri = pri;
+ this.priority = pri;
+ this.initTime = System.currentTimeMillis();
+ this.call = call;
+ }
+
+ /**
+ * print the Job instance
+ */
+ public String toString() {
+ return "orgPri:" + this.orgPri + ", lastPri:" + this.priority
+ + ", wait time:" + ((System.currentTimeMillis() - this.initTime))
+ + ",ino:";// + call;
+ }
+ @Override
+ public int compareTo(Job<T> arg0) {
+ return this.priority - arg0.priority;
+ }
+ }
+
+ @Override
+ public Object remove() {
+ return this.queue.remove();
+ }
+
+ @Override
+ public Object poll() {
+ return this.queue.poll();
+ }
+
+ @Override
+ public Object element() {
+ return this.queue.element();
+ }
+
+ @Override
+ public Object peek() {
+ return this.queue.peek();
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return this.queue.isEmpty();
+ }
+
+ @SuppressWarnings({ "rawtypes" })
+ @Override
+ public Iterator iterator() {
+ return this.queue.iterator();
+ }
+
+ @Override
+ public Object[] toArray() {
+ return this.queue.toArray();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Object[] toArray(Object[] a) {
+ return this.queue.toArray(a);
+ }
+
+ @SuppressWarnings({ "rawtypes" })
+ @Override
+ public boolean containsAll(Collection c) {
+ return this.queue.containsAll(c);
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Override
+ public boolean addAll(Collection c) {
+ return this.queue.addAll(c);
+ }
+
+ @SuppressWarnings({ "rawtypes" })
+ @Override
+ public boolean removeAll(Collection c) {
+ return this.queue.removeAll(c);
+ }
+
+ @SuppressWarnings({ "rawtypes" })
+ @Override
+ public boolean retainAll(Collection c) {
+ return this.queue.retainAll(c);
+ }
+
+ @Override
+ public void clear() {
+ this.queue.clear();
+ }
+
+ @Override
+ public boolean add(Object e) {
+ if (queue.size() > this.capacity) {
+ return false;
+ }
+ try {
+ this.put(e);
+ } catch (InterruptedException e1) {
+ LOG.error(e1);
+ }
+ return true;
+ }
+
+ @Override
+ public boolean offer(Object e) {
+ // Delegate to add(Object) instead of recursing into this method forever.
+ return this.add(e);
+ }
+
+ @Override
+ public void put(Object e) throws InterruptedException {
+ HBaseServer.Call call = (HBaseServer.Call) (e);
+ int pri = this.qosFunction.apply(call.param);
+ this.add((T) call, pri);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public boolean offer(Object e, long timeout, TimeUnit unit)
+ throws InterruptedException {
+ return this.queue.offer((Job) e, timeout, unit);
+ }
+
+ @Override
+ public Object take() throws InterruptedException {
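+ // ((flush << move) >>> move) equals flush % 2^handleFreshInter, so a refresh
+ // of the queued priorities is triggered every 2^handleFreshInter calls.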
+ if (((flush << move) >>> move) == 0) {
+ this.refresher.interrupt();
+ }
+ flush++;
+ return this.get(PriorityFunction.priTrans(Thread.currentThread()
+ .getPriority()));
+ }
+
+ @Override
+ public Object poll(long timeout, TimeUnit unit) throws InterruptedException {
+
+ return this.get(PriorityFunction.priTrans(Thread.currentThread()
+ .getPriority()));
+ }
+
+ @Override
+ public int remainingCapacity() {
+ return this.capacity - this.queue.size();
+ }
+
+ @Override
+ public boolean remove(Object o) {
+ return this.queue.remove(o);
+ }
+
+ @Override
+ public boolean contains(Object o) {
+ return this.queue.contains(o);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public int drainTo(@SuppressWarnings("rawtypes") Collection c) {
+ return this.queue.drainTo(c);
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Override
+ public int drainTo(Collection c, int maxElements) {
+ return this.queue.drainTo(c, maxElements);
+ }
+
+ /**
+ * Class used to store the action priorities of a table.
+ */
+ public static class ActionPriorities {
+ private int putPriority = 0;
+ private int getPriority = 0;
+ private int deletePriority = 0;
+ private int scanPriority = 0;
+ public ActionPriorities() {
+ }
+
+ /**
+ * Get the priority of this operation
+ *
+ * @param operation
+ * @return
+ */
+ public int getPriority(Get operation) {
+ return this.getGetPriority();
+ }
+
+ /**
+ * Get the priority of this operation
+ *
+ * @param operation
+ * @return
+ */
+ public int getPriority(Mutation operation) {
+ return this.getPutPriority();
+ }
+
+ /**
+ * Get the priority of this operation
+ *
+ * @param operation
+ * @return
+ */
+ public int getPriority(Scan operation) {
+ return this.getScanPriority();
+ }
+
+ /**
+ * Get the priority of this operation
+ *
+ * @param operation
+ * @return
+ */
+ public int getPriority(Object operation) {
+ return 0;
+ }
+
+ public ActionPriorities(int scanPriority, int putPriority, int getPriority,
+ int deletePriority) {
+ this.scanPriority = scanPriority;
+ this.putPriority = putPriority;
+ this.getPriority = getPriority;
+ this.deletePriority = deletePriority;
+ }
+
+ /**
+ * @return the put priority
+ */
+ public int getPutPriority() {
+ return putPriority;
+ }
+
+ /**
+ * @param priority the put priority
+ */
+ public void setPutPriority(int priority) {
+ this.putPriority = priority;
+ }
+
+ /**
+ * @return the get priority
+ */
+ public int getGetPriority() {
+ return getPriority;
+ }
+
+ /**
+ * @param priority the get priority
+ */
+ public void setGetPriority(int priority) {
+ this.getPriority = priority;
+ }
+
+ /**
+ * @return the delete priority
+ */
+ public int getDeletePriority() {
+ return deletePriority;
+ }
+
+ /**
+ * @param priority the delete priority
+ */
+ public void setDeletePriority(int priority) {
+ this.deletePriority = priority;
+ }
+
+ /**
+ * @return the scan priority
+ */
+ public int getScanPriority() {
+ return scanPriority;
+ }
+
+ /**
+ * @param priority the scan priority
+ */
+ public void setScanPriority(int priority) {
+ this.scanPriority = priority;
+ }
+
+ @Override
+ public String toString() {
+ return this.scanPriority + "," + this.putPriority + "," + this.getPriority + ","
+ + this.deletePriority + ",";
+ }
+
+ /**
+ * Serialize this object into a byte array.
+ *
+ * @return a 16-byte array holding the scan, put, get and delete priorities
+ */
+ public byte[] toBytes() {
+ byte[] ret = new byte[Bytes.SIZEOF_INT * 4];
+ Bytes.putInt(ret, 0, this.scanPriority);
+ Bytes.putInt(ret, Bytes.SIZEOF_INT, this.putPriority);
+ Bytes.putInt(ret, Bytes.SIZEOF_INT * 2, this.getPriority);
+ Bytes.putInt(ret, Bytes.SIZEOF_INT * 3, this.deletePriority);
+ return ret;
+ }
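+ // Round-trip sketch: new ActionPriorities(1, 2, 3, 4).toBytes() yields 16
+ // bytes (scan, put, get and delete priority, each as a 4-byte int), and
+ // calling fromBytes() on that array restores the same four values.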
+
+ /**
+ * Restore this object from a byte array produced by {@link #toBytes()}.
+ *
+ * @param b the serialized value of an ActionPriorities instance
+ * @return this instance, or null if b has the wrong length
+ */
+ public ActionPriorities fromBytes(byte[] b) {
+ if (b.length != Bytes.SIZEOF_INT * 4)
+ return null;
+ else {
+ this.scanPriority = Bytes.toInt(b, 0);
+ this.putPriority = Bytes.toInt(b, Bytes.SIZEOF_INT);
+ this.getPriority = Bytes.toInt(b, Bytes.SIZEOF_INT * 2);
+ this.deletePriority = Bytes.toInt(b, Bytes.SIZEOF_INT * 3);
+ }
+ return this;
+ }
+ }
+}
Index: src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java (revision 1213130)
+++ src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java (working copy)
@@ -387,6 +387,11 @@
final int numHandlers,
int metaHandlerCount, final boolean verbose, Configuration conf, int highPriorityLevel)
throws IOException {
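+ // Sketch of the routing rule: region servers (HRegionInterface instances) get
+ // the priority-aware PriorityHBaseServer when hbase.tablepriority.enable is on
+ // (the default); all other callers keep the stock server returned below.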
+ if (instance instanceof HRegionInterface
+ && PriorityFunction.enable(conf)) {
+ return new PriorityHBaseServer(instance, ifaces, conf, bindAddress, port,
+ numHandlers, metaHandlerCount, verbose, highPriorityLevel);
+ }
return getServer(instance.getClass(), instance, ifaces, bindAddress, port, numHandlers, metaHandlerCount, verbose, conf, highPriorityLevel);
}
Index: src/main/java/org/apache/hadoop/hbase/ipc/PriorityFunction.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/ipc/PriorityFunction.java (revision 0)
+++ src/main/java/org/apache/hadoop/hbase/ipc/PriorityFunction.java (revision 0)
@@ -0,0 +1,481 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.ipc;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.client.Action;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.MultiAction;
+import org.apache.hadoop.hbase.client.Operation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Row;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.ipc.PriorityJobQueue.ActionPriorities;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Writable;
+
+import com.google.common.base.Function;
+
+/**
+ * This Function is used by {@link PriorityHBaseServer}. It computes the
+ * priority of an RPC call from the priority of the table being addressed and
+ * from that table's per-action (put/get/delete/scan) priorities.
+ */
+public class PriorityFunction implements Function<Writable, Integer> {
+
+ public final static byte[] PRI_KEY = Bytes.toBytes("_table_priority");
+ public final static byte[] PRI_KEY_ACTION_PRIORITY = Bytes
+ .toBytes("_action_priority");
+ public static final int LOWEST_PRI = 10;
+ public static final int DEFAULT_PRI = 5;
+ public static final int HIGHEST_PRI = -10;
+ public static final int HIGH_PRI = 0;
+ public static final String PRI_ENABLE_KEY = "hbase.tablepriority.enable";
+ private static Boolean enable = null;
+
+ /**
+ * Whether table priority is enabled.
+ *
+ * @param conf the configuration to consult
+ * @return true if table priority is enabled
+ */
+ public static boolean enable(Configuration conf) {
+ if (enable == null) {
+ enable = conf.getBoolean(PRI_ENABLE_KEY, true);
+ }
+ return enable;
+ }
+
+ /**
+ * Caches each region's priority in memory. The key is the region name, which
+ * is usually obtained from the IPC call; the value is the priority of the
+ * region, which is also the priority of the table the region belongs to.
+ */
+ private static final ConcurrentHashMap<String, Integer> regionPriMap = new ConcurrentHashMap<String, Integer>();
+
+ // Cache each scan and its scanner id.
+ private static final ConcurrentHashMap<Scan, Long> scannerToId = new ConcurrentHashMap<Scan, Long>();
+
+ // Cache each region scanner and its scan.
+ private static final ConcurrentHashMap<RegionScanner, Scan> rScannerToScan = new ConcurrentHashMap<RegionScanner, Scan>();
+
+ // Cache the scanner ID and the region name of the scanner.
+ private static final ConcurrentHashMap<Long, String> scannerRegionMap = new ConcurrentHashMap<Long, String>();
+
+ // Cache the scanner ID and the priority of the scanner.
+ private static final ConcurrentHashMap<Long, Integer> scannerPriMap = new ConcurrentHashMap<Long, Integer>();
+
+ // Store each region name and its ActionPriorities instance.
+ private static final ConcurrentHashMap<String, ActionPriorities> regionActionPriorities = new ConcurrentHashMap<String, ActionPriorities>();
+
+ // TODO: add priority of source host.
+ @SuppressWarnings({ "unused", "rawtypes" })
+ private final ConcurrentHashMap tableSourceHostPlus = new ConcurrentHashMap();
+ @SuppressWarnings({ "unused", "rawtypes" })
+ private final ConcurrentHashMap regionSourceHostPlus = new ConcurrentHashMap();
+ private static final Log LOG = LogFactory
+ .getLog("org.apache.hadoop.hbase.ipc.PriorityFunction");
+ private static HRegionServer server;
+
+ public PriorityFunction(HRegionServer server, final Configuration conf)
+ throws MasterNotRunningException, ZooKeeperConnectionException {
+ if (PriorityFunction.server == null) {
+ PriorityFunction.server = server;
+ }
+ // Register the QosRegionObserver coprocessor when table priority is enabled.
+ if (enable(conf)) {
+ String cops = conf.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY);
+ if (cops == null || cops.equals("")) {
+ cops = "org.apache.hadoop.hbase.ipc.QosRegionObserver";
+ } else {
+ cops += ",org.apache.hadoop.hbase.ipc.QosRegionObserver";
+ }
+ conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, cops);
+ }
+ }
+
+ /**
+ * Set table priority
+ * @param priority the priority you want to set
+ * @param des the table descriptor
+ * @return the table descriptor with the priority set
+ * @throws Exception if the priority cannot be parsed or is out of range
+ */
+ public static HTableDescriptor setPriority(String priority,
+ HTableDescriptor des) throws Exception {
+ Integer pri = Integer.parseInt(priority);
+ if (pri < 0 || pri > 10)
+ throw new NumberFormatException(
+ "Table priority should be between 0 and 10.");
+ des.setValue(PRI_KEY, Bytes.toBytes(priority));
+ return des;
+ }
+ /**
+ * Set table priority
+ * @param priority the priority you want to set
+ * @param des table descriptor
+ * @param actionPriority the ActionPriorities instance to store
+ * @return the prepared table descriptor
+ * @throws Exception if the priority cannot be parsed or is out of range
+ */
+ public static HTableDescriptor setPriority(String priority,
+ HTableDescriptor des, ActionPriorities actionPriority) throws Exception {
+ Integer pri = Integer.parseInt(priority);
+ if (pri < 0 || pri > 10)
+ throw new NumberFormatException(
+ "Table priority should be between 0 and 10.");
+ setTableActionPriorities(des, actionPriority);
+ des.setValue(PRI_KEY, Bytes.toBytes(priority));
+ return des;
+ }
+ /**
+ * Set table priority
+ * @param priority the priority you want to set
+ * @param des the table descriptor
+ * @param actionPriority the ActionPriorities instance to store
+ * @return the table descriptor with the priority set
+ * @throws Exception if the priority is out of range
+ */
+ public static HTableDescriptor setPriority(int priority,
+ HTableDescriptor des, ActionPriorities actionPriority) throws Exception {
+ if (priority < 0 || priority > 10) {
+ throw new NumberFormatException(
+ "Table priority should be between 0 and 10.");
+ }
+ setTableActionPriorities(des, actionPriority);
+ des.setValue(PRI_KEY, Bytes.toBytes(String.valueOf(priority)));
+ return des;
+ }
+
+ /**
+ * Set table priority
+ * @param priority the priority you want to set
+ * @param des the table descriptor
+ * @return the table descriptor with the priority set
+ * @throws Exception if the priority is out of range
+ */
+ public static HTableDescriptor setPriority(int priority, HTableDescriptor des)
+ throws Exception {
+ if (priority < 0 || priority > 10) {
+ throw new NumberFormatException(
+ "Table priority should be between 0 and 10.");
+ }
+ des.setValue(PRI_KEY, Bytes.toBytes(priority + ""));
+ return des;
+ }
+
+ /**
+ * Get the action priorities stored in a table descriptor.
+ *
+ * @param des the table descriptor
+ * @return the table's ActionPriorities, or default priorities if none are stored
+ */
+ public static ActionPriorities getTableActionPriorities(HTableDescriptor des) {
+ byte[] actionPriority = des.getValue(PRI_KEY_ACTION_PRIORITY);
+ ActionPriorities ret = new ActionPriorities();
+ if (actionPriority != null) {
+ try {
+ ret.fromBytes(actionPriority);
+ } catch (Exception e) {
+ LOG.error("This table has wrong action priorities,"
+ + "please use {@ PriorityFunction}setPriority()"
+ + " method to set table's action priority");
+ }
+ }
+ return ret;
+ }
+ /**
+ * set action priorities for table descriptor
+ * @param des the table descriptor
+ * @param actionPriority the ActionPriorities instance to store
+ */
+ public static void setTableActionPriorities(HTableDescriptor des,
+ ActionPriorities actionPriority) {
+ des.setValue(PriorityFunction.PRI_KEY_ACTION_PRIORITY,
+ actionPriority.toBytes());
+ }
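+ // Sketch of combining a table priority with per-action priorities: the
+ // ActionPriorities constructor takes (scan, put, get, delete), and each value
+ // is added to the table's base priority, so positive numbers demote an action
+ // and negative numbers promote it. For a hypothetical descriptor htd:
+ //   PriorityFunction.setPriority(5, htd, new ActionPriorities(2, -1, 0, 0));
+ //   // scans of this table run at 5 + 2 = 7, puts at 5 - 1 = 4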
+ /**
+ * Initialize a scanner's priority when the scanner is opened.
+ * @param call the RPC call that opened the scanner
+ * @param value the scanner id returned by that call
+ */
+ public void initScannerPriority(Invocation call, Object value) {
+ Long id = (Long) value;
+ byte[] region = (byte[]) call.getParameters()[0];
+ Scan scan=(Scan) call.getParameters()[1];
+ String regionN = Bytes.toString(region);
+ Integer pri = regionPriMap.get(regionN);
+ if (pri == null) {
+ pri=initRegionPriority(regionN, false);
+ }
+ ActionPriorities action = regionActionPriorities.get(regionN);
+ if (action != null)
+ pri += action.getScanPriority();
+ scannerPriMap.put(id, pri);
+ scannerRegionMap.put(id, regionN);
+ scannerToId.put(scan, id);
+ }
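+ // A scanner's next() calls carry only the scanner id, not the region or the
+ // Scan, so the priority computed here is cached per id and looked up again in
+ // getCallPriority() for every subsequent next() invocation.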
+ @Override
+ public Integer apply(Writable from) {
+ return getCallPriority(from);
+ }
+
+ /**
+ * Used by QosRegionObserver to remember which Scan each RegionScanner
+ * belongs to.
+ * @param scan the scan that created the scanner
+ * @param s the region scanner
+ */
+ public static void addRegionScanner(Scan scan, RegionScanner s) {
+ rScannerToScan.put(s, scan);
+ }
+
+ /**
+ * Remove a region's cached priorities.
+ * @param region the region whose cached priorities should be dropped
+ */
+ public static void cleanRegionPriority(HRegion region) {
+ String regionN = region.getRegionNameAsString();
+ if (regionN != null) {
+ regionPriMap.remove(regionN);
+ regionActionPriorities.remove(regionN);
+ }
+ }
+ /**
+ * Remove the cached state of a scanner whose lease has expired.
+ * @param s the region scanner to remove
+ */
+ public static void removeScanner(RegionScanner s) {
+ Long id = null;
+ try {
+ Scan ss = rScannerToScan.get(s);
+ rScannerToScan.remove(s);
+ id = scannerToId.get(ss);
+ scannerToId.remove(ss);
+ scannerRegionMap.remove(id);
+ scannerPriMap.remove(id);
+ } catch (Exception e) {
+ LOG.error("Could not remove cached state for scanner id: " + id, e);
+ }
+ }
+
+ /**
+ * Get the priority of a call.
+ *
+ * @param param the Writable parameter of the RPC call
+ * @return the priority
+ */
+ private int getCallPriority(Writable param) {
+ Invocation invo = (Invocation) param;
+ // If this is a next() call, look up the priority of the scanner, which was
+ // initialized by initScannerPriority(Invocation call, Object value).
+ if (invo.getMethodName().endsWith("next")) {
+ Long scanId = (Long) invo.getParameters()[0];
+ Integer pri = scannerPriMap.get(scanId);
+ if (pri == null) {
+ String regionN = scannerRegionMap.get(scanId);
+ LOG.error("the scanner didn't get it's priority:" + scanId
+ + " regionName:" + regionN);
+ if (regionN != null) {
+ // Get the priority of the region, i.e. the priority of its table.
+ pri = getRegionPri(regionN);
+ ActionPriorities action = regionActionPriorities.get(regionN);
+ if (action != null) {
+ // Add the table's scan action priority.
+ pri += action.getScanPriority();
+ }
+ scannerPriMap.put(scanId, pri);// cache the priority
+ return pri;
+ } else {
+ LOG.error("error,there is no this scanner id");
+ return DEFAULT_PRI;
+ }
+ }
+ return pri;
+ } else if (invo.getMethodName().endsWith("multi")) {
+ // For a multi call, use the first action of the first region entry in the
+ // MultiAction map to decide the priority of this call.
+ MultiAction multi = (MultiAction) invo.getParameters()[0];
+ for (Map.Entry e : multi.actions.entrySet()) {
+ String regionN = Bytes.toString((byte[]) e.getKey());
+ Integer pri = getRegionPri(regionN);
+ ActionPriorities actionPriority;
+ if ((actionPriority = regionActionPriorities.get(regionN)) != null) {
+ List<Action> actionsForRegion = (List<Action>) e.getValue();
+ Action action = actionsForRegion.get(0);
+ if (action != null) {
+ Row row = action.getAction();
+ if (row instanceof Delete) {
+ pri += actionPriority.getDeletePriority();
+ } else if (row instanceof Get) {
+ pri += actionPriority.getGetPriority();
+
+ } else if (row instanceof Put) {
+ pri += actionPriority.getPutPriority();
+ }
+ }
+ }
+ return pri;
+ }
+ return DEFAULT_PRI;
+ } else if (invo.getMethodName().endsWith("increment")
+ || invo.getMethodName().endsWith("incrementColumnValue")) {
+ // Because Increment does not extend Operation, handle it separately and
+ // treat it as a put for action-priority purposes.
+ byte[] region = (byte[]) invo.getParameters()[0];
+ String regionN = Bytes.toString(region);
+ Integer pri = getRegionPri(regionN);
+ ActionPriorities actionPriority;
+ if ((actionPriority = regionActionPriorities.get(regionN)) != null) {
+ pri += actionPriority.getPutPriority();
+ }
+ return pri;
+ }
+ Object params[] = invo.getParameters();
+ // Deal with the subclasses of Operation
+ if (params.length >= 2 && params[0] instanceof byte[]
+ && params[params.length - 1] instanceof Operation
+ && !(params[params.length - 1] instanceof Scan)) {
+ Operation ope = (Operation) params[params.length - 1];
+ byte[] region = (byte[]) params[0];
+ String regionN = Bytes.toString(region);
+ Integer pri = getRegionPri(regionN);
+ ActionPriorities actionPriority;
+ if ((actionPriority = regionActionPriorities.get(regionN)) != null) {
+ pri += actionPriority.getPriority(ope);
+ }
+ return pri;
+ }
+ return HIGHEST_PRI;
+ }
+
+ private int getRegionPri(String regionN) {
+ Integer pri = regionPriMap.get(regionN);
+ if (pri == null) {
+ pri = initRegionPriority(regionN, false);
+ }
+ if (pri == null) {
+ return DEFAULT_PRI;
+ }
+ return pri;
+ }
+
+ /**
+ * Initialize and cache a region's priority.
+ *
+ * @param region the region whose priority should be initialized
+ * @return the region's priority
+ */
+ public static int initRegionPriority(HRegion region) {
+ if (region == null || region.getRegionNameAsString() == null) {
+ return DEFAULT_PRI;
+ }
+ String regionN = region.getRegionNameAsString();
+ int pri = DEFAULT_PRI;
+ if (region.getRegionInfo().isMetaRegion()
+ || region.getRegionInfo().isRootRegion()) {
+ pri = HIGHEST_PRI;
+ }
+ else if (region.getTableDesc().getValue(PRI_KEY) != null) {
+ try {
+ pri = Integer.parseInt(Bytes.toString(region.getTableDesc().getValue(
+ PRI_KEY)));
+ } catch (Exception e) {
+ LOG.error("Table " + region.getTableDesc().getNameAsString()
+ + " has a wrong priority");
+ }
+ }
+ ActionPriorities actionPriority = getTableActionPriorities(region
+ .getTableDesc());
+ regionActionPriorities.put(regionN, actionPriority);
+ regionPriMap.put(regionN, pri);
+ return pri;
+ }
+
+ /**
+ * Get a region's priority.
+ *
+ * @param region the name of the region whose priority you want
+ * @param force if true, re-read the priority from the region's table
+ * descriptor; otherwise return the cached value when one exists
+ * @return the region's priority
+ */
+ private static int initRegionPriority(String region, boolean force) {
+ if (!force) {
+ Integer ret = regionPriMap.get(region);
+ if (ret != null) {
+ return ret;
+ }
+ }
+ HRegion hr = server.getOnlineRegion(Bytes.toBytes(region));
+ return initRegionPriority(hr);
+ }
+
+ /**
+ * Translate a thread priority into a job (table) priority. Thread priorities
+ * run from 1 to 10, with 10 the highest, but the priorities used by the RPC
+ * queue follow the Linux convention where a smaller number means a higher
+ * priority. The translation lets a handler thread see which jobs it may take:
+ * a thread only handles jobs whose priority is higher than its translated
+ * thread priority.
+ *
+ * @param tpri the thread priority to translate
+ * @return the corresponding job priority
+ */
+ public static int priTrans(int tpri) {
+ return tpri > 10 || tpri < 1 ? 5 : 11 - tpri;
+ }
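+ // Worked example: a handler thread running at Thread priority 8 translates to
+ // job priority 11 - 8 = 3; values outside 1..10 fall back to 5 (DEFAULT_PRI).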
+ /**
+ * Print the cached region priorities to the debug log.
+ */
+ public static void printRegionPriority() {
+ String out = "Region priorities: " + regionPriMap.toString();
+ LOG.debug(out);
+ }
+
+}