Index: src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterCoprocessorExceptionWithRemove.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterCoprocessorExceptionWithRemove.java (revision 1220359) +++ src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterCoprocessorExceptionWithRemove.java (working copy) @@ -174,7 +174,7 @@ // In this test, there is only a single coprocessor (BuggyMasterObserver). String coprocessorName = BuggyMasterObserver.class.getName(); - assertTrue(master.getLoadedCoprocessors().equals("[" + coprocessorName + "]")); + assertTrue(master.getLoadedCoprocessors().indexOf(coprocessorName)!=-1); HTableDescriptor htd1 = new HTableDescriptor(TEST_TABLE1); htd1.addFamily(new HColumnDescriptor(TEST_FAMILY1)); @@ -202,7 +202,7 @@ masterTracker.masterZKNodeWasDeleted); String loadedCoprocessors = master.getLoadedCoprocessors(); - assertTrue(loadedCoprocessors.equals("[" + coprocessorName + "]")); + assertTrue(loadedCoprocessors.indexOf(coprocessorName)!=-1); // Verify that BuggyMasterObserver has been removed due to its misbehavior // by creating another table: should not have a problem this time. Index: src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterCoprocessorExceptionWithAbort.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterCoprocessorExceptionWithAbort.java (revision 1220359) +++ src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterCoprocessorExceptionWithAbort.java (working copy) @@ -186,10 +186,9 @@ // Test (part of the) output that should have be printed by master when it aborts: // (namely the part that shows the set of loaded coprocessors). // In this test, there is only a single coprocessor (BuggyMasterObserver). - assertTrue(master.getLoadedCoprocessors(). - equals("[" + - TestMasterCoprocessorExceptionWithAbort.BuggyMasterObserver.class.getName() + - "]")); + assertTrue(master.getLoadedCoprocessors().indexOf( + TestMasterCoprocessorExceptionWithAbort.BuggyMasterObserver.class.getName() + )!=-1); CreateTableThread createTableThread = new CreateTableThread(UTIL); Index: src/test/java/org/apache/hadoop/hbase/ipc/TestActionPriority.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/ipc/TestActionPriority.java (revision 0) +++ src/test/java/org/apache/hadoop/hbase/ipc/TestActionPriority.java (revision 0) @@ -0,0 +1,289 @@ +package org.apache.hadoop.hbase.ipc; + +import java.io.IOException; +import java.util.Random; + +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.LargeTests; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.ipc.PriorityFunction; +import org.apache.hadoop.hbase.ipc.PriorityJobQueue.OperationPriority; +import org.apache.hadoop.hbase.ipc.TestTablePriority.Worker; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import static org.junit.Assert.assertTrue; + +/** + * Test for action priority, use Action_Priority table and scan and put with + * different priorities. There are two tests,in first one Get's priority is + * 0(high priority, RPC priority = table priority 0+ action priority 0)
+ * scan's priority is 10 (low priority)and in the second test,we switch the + * priorities of scan and get. + */ +@Category(LargeTests.class) +public class TestActionPriority { + private final static Log LOG = LogFactory.getLog(TestActionPriority.class); + static final Random r = new Random(); + static final int rowNubmer = 100000; + static final int threadN = 200; + HBaseAdmin admin = null; + static final int regionNumber = 10; + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static byte[][] startKeys; + static { + startKeys = new byte[regionNumber][]; + for (int i = 0; i < regionNumber; i++) { + startKeys[i] = Bytes.toBytes(String.format( + "%0" + ((regionNumber + "").length()-1) + "d", i)); + } + } + + /** + * @throws java.lang.Exception + */ + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.getConfiguration().setInt( + PriorityJobQueue.WAIT_UNIT_CONF_KEY, 1000); + TEST_UTIL.startMiniCluster(1); + } + + /** + * @throws java.lang.Exception + */ + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + /** + * set up a cluster and prepare tables. + */ + @Before + public void setUp() { + try { + admin = new HBaseAdmin(TEST_UTIL.getConfiguration()); + } catch (Exception e) { + LOG.info(e); + } + HTableDescriptor des; + + try { + if (admin.tableExists("Action_Priority")) { + admin.disableTable("Action_Priority"); + admin.deleteTable("Action_Priority"); + } + des = new HTableDescriptor("Action_Priority"); + des.addFamily(new HColumnDescriptor("ff")); + des.setValue(PriorityFunction.OPERATION_PRIORITY_KEY, + new OperationPriority(10, 0, 0).toBytes()); + PriorityFunction.setPriority(1, des); + admin.createTable(des, startKeys); + } catch (Exception e) { + LOG.info(e); + } + TEST_UTIL + .getMiniHBaseCluster().getMaster().balance(); + writeData(); + TEST_UTIL + .getMiniHBaseCluster().getMaster().balance(); + } + + private void writeData() { + LOG.info("begion write data into test table"); + int nPerWorker = rowNubmer / threadN; + Worker[] workers = new Worker[threadN]; + for (int i = 0; i < workers.length; i++) { + try { + workers[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(), + "Action_Priority"), "writeData", nPerWorker); + } catch (IOException e) { + } + } + for (int i = 0; i < workers.length; i++) { + workers[i].start(); + } + for (int i = 0; i < workers.length; i++) { + try { + workers[i].join(); + } catch (InterruptedException e) { + LOG.error(e); + } + } + LOG.info("Write data into test table finished."); + } + + /** + * @throws java.lang.Exception + */ + @After + public void tearDown() { + + } + + /** + * Verify whether the function take effect + * + * @param highs + * high priority workers + * @param lows + * low priority workers + */ + + @SuppressWarnings("static-access") + public void verifyFunction(Worker highs[], Worker lows[]) { + boolean highFinished = false; + boolean lowFinished = false; + long highThroughPut = 0; + long lowThroughPut = 0; + while (!(highFinished && lowFinished)) { + try { + Thread.currentThread().sleep(200); + } catch (InterruptedException e) { + } + highThroughPut = 0; + lowThroughPut = 0; + for (int i = 0; i < highs.length; i++) { + highThroughPut += highs[i].getThroughput(); + lowThroughPut += lows[i].getThroughput(); + } + LOG.info("-------------------------------------------------------------"); + LOG.info("High priority action type is:" + highs[0].getType() + + ", priority:" + highs[0].getActionPriority() + " throughput is:" + + highThroughPut); + LOG.info("low priority action type is:" + lows[0].getType() + + ", priority:" + lows[0].getActionPriority() + " throughput is:" + + lowThroughPut); + + highFinished = true; + lowFinished = true; + for (int i = 0; i < highs.length; i++) { + if (highs[i].isAlive()) { + highFinished = false; + } + if (lows[i].isAlive()) { + lowFinished = false; + } + } + if (highFinished) { + for (int i = 0; i < highs.length; i++) { + lows[i].stopWorker(); + } + lowFinished = true; + } + if (lowFinished) { + for (int i = 0; i < highs.length; i++) { + highs[i].stopWorker(); + } + highFinished = true; + } + + } + highThroughPut = 0; + lowThroughPut = 0; + for (int i = 0; i < highs.length; i++) { + highThroughPut += highs[i].getThroughput(); + lowThroughPut += lows[i].getThroughput(); + } + assertTrue("Action priority works properly", highThroughPut > lowThroughPut); + LOG.info("-------------------------------------------------------------"); + LOG.info("---------------------Test finished --------------------------"); + LOG.info("High priority action type is:" + highs[0].getType() + + ", priority:" + highs[0].getActionPriority() + " throughput is:" + + highThroughPut); + LOG.info("low priority action type is:" + lows[0].getType() + ", priority:" + + lows[0].getActionPriority() + " throughput is:" + lowThroughPut); + LOG.info("####### Test for " + highs[0].getType() + ", priority:" + + highs[0].getActionPriority() + " ,and " + lows[0].getType() + + ", priority:" + lows[0].getActionPriority() + " finished####"); + + LOG.info("-------------------------------------------------------------"); + } + + /** + * run the test; + */ + @Test + public void testForDifferentActionPriority() { + Worker puts[] = new Worker[threadN / 2]; + Worker scans[] = new Worker[threadN / 2]; + for (int i = 0; i < puts.length; i++) { + try { + puts[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(), + "Action_Priority"), "put", rowNubmer * 2 / threadN); + scans[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(), + "Action_Priority"), "scan", rowNubmer * 2 / threadN); + } catch (IOException e) { + } + } + for (int i = 0; i < puts.length; i++) { + puts[i].setActionPriority(0); + puts[i].start(); + } + for (int i = 0; i < scans.length; i++) { + scans[i].setActionPriority(10); + scans[i].start(); + } + verifyFunction(puts, scans); + + try { + admin.disableTable("Action_Priority"); + HTableDescriptor des1 = admin.getTableDescriptor(Bytes + .toBytes("Action_Priority")); + des1.setValue(PriorityFunction.OPERATION_PRIORITY_KEY, + new OperationPriority(0, 10, 10).toBytes()); + admin.modifyTable(Bytes.toBytes("Action_Priority"), des1); + admin.enableTable("Action_Priority"); + GroupTestUtil.balanceTable("Action_Priority", TEST_UTIL + .getMiniHBaseCluster().getMaster()); + } catch (IOException e) { + LOG.info(e); + } + puts = new Worker[threadN / 2]; + scans = new Worker[threadN / 2]; + for (int i = 0; i < puts.length; i++) { + try { + + puts[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(), + "Action_Priority"), "put", rowNubmer * 2 / threadN); + scans[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(), + "Action_Priority"), "scan", rowNubmer * 2 / threadN); + puts[i].setActionPriority(10); + scans[i].setActionPriority(0); + } catch (IOException e) { + } + } + for (int i = 0; i < puts.length; i++) { + puts[i].start(); + scans[i].start(); + } + verifyFunction(scans, puts); + } + + public static void main(String args[]) { + + try { + setUpBeforeClass(); + TestActionPriority t = new TestActionPriority(); + t.setUp(); + t.testForDifferentActionPriority(); + tearDownAfterClass(); + } catch (Exception e) { + LOG.error(e); + } + + } + +} Index: src/test/java/org/apache/hadoop/hbase/ipc/TestPriorityJobQueue.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/ipc/TestPriorityJobQueue.java (revision 0) +++ src/test/java/org/apache/hadoop/hbase/ipc/TestPriorityJobQueue.java (revision 0) @@ -0,0 +1,139 @@ +/** + * Copyright the Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import java.util.Random; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.SmallTests; +import org.apache.hadoop.hbase.ipc.PriorityJobQueue; +import org.apache.hadoop.hbase.ipc.PriorityJobQueue.Job; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import static org.junit.Assert.assertTrue; + +/** + * Test used to ensure the PriorityJobQueue works properly + * + */ +@Category(SmallTests.class) +public class TestPriorityJobQueue { + private final Log LOG = LogFactory.getLog(TestPriorityJobQueue.class); + + private static int queueSize = 20; + private static int testThreadN = 4; + private static final int testTimes = 300; + + /** + * test the queue's function: threads (not with the lowest priority)
+ * can only get a job which have a higher priority And test the number which + * input jobs is equal with the output number + */ + @Test + public void testGetJob() { + queueSize = 3; + testThreadN = 20; + doTest(); + queueSize = 3; + testThreadN = 200; + doTest(); + queueSize = 200; + testThreadN = 3; + doTest(); + queueSize = 200; + testThreadN = 300; + doTest(); + } + class TimesWorker extends Thread { + public int number = 0; + } + private void doTest() { + final Random r = new Random(); + final PriorityJobQueue queue = new PriorityJobQueue( + queueSize, null, HBaseConfiguration.create()); + TimesWorker consumer[] = new TimesWorker[testThreadN]; + + int pri = 10; + for (int i = 0; i < consumer.length; i++, pri--) { + consumer[i] = new TimesWorker() { + public void run() { + while (true) { + String j = null; + try { + j = queue.take(); + number++; + } catch (InterruptedException e) { + LOG.error(e); + } + } + } + }; + + if (pri < 1) + pri = 10; + consumer[i].setDaemon(true); + consumer[i].setPriority(pri); + consumer[i].start(); + } + + TimesWorker producer[] = new TimesWorker[testThreadN * 2]; + for (int i = 0; i < producer.length; i++) { + producer[i] = new TimesWorker() { + public void run() { + for (int j = 0; j < testTimes; j++) { + int jobpri = (r.nextInt(25) - 10); + try { + queue.put("" + jobpri, jobpri); + number++; + } catch (InterruptedException e) { + LOG.error(e); + } + } + } + }; + producer[i].start(); + } + int totalInput=0; + for (TimesWorker d :producer) { + try { + d.join(); + totalInput+=d.number; + } catch (InterruptedException e) { + } + } + + while (queue.size() != 0) { + try { + Thread.sleep(300); + + LOG.info("queue size:" + queue.size()); + } catch (InterruptedException e) { + } + } + int totalOutput=0; + for(TimesWorker d:consumer) + { + totalOutput+=d.number; + } + assertTrue("all job are finished", totalOutput==totalInput); + } +} Index: src/test/java/org/apache/hadoop/hbase/ipc/TestTablePriorityHundredRegion.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/ipc/TestTablePriorityHundredRegion.java (revision 0) +++ src/test/java/org/apache/hadoop/hbase/ipc/TestTablePriorityHundredRegion.java (revision 0) @@ -0,0 +1,654 @@ +/** + * Copyright the Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Random; +import java.util.Map.Entry; + +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.LargeTests; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.ipc.PriorityFunction; +import org.apache.hadoop.hbase.ipc.PriorityJobQueue.OperationPriority; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import static org.junit.Assert.assertTrue; + +/** + * Test for table priority, use table A,B,C which have different priorities. + * There are two tests,in first one table A's priority is 1(high priority), B is + * 5, C is 10 and in the second test,we switch the priorities of A and C. + */ +@Category(LargeTests.class) +public class TestTablePriorityHundredRegion { + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private final static Log LOG = LogFactory + .getLog(TestTablePriorityHundredRegion.class); + static final Random r = new Random(); + static int rowNumber = 10000; + static final int threadN = 200; + HBaseAdmin admin = null; + // if there are data in the test tables,you + // can set method type to "scan" + static String method = "put"; + static final int regionNumber = 99; + private static byte[][] startKeys; + private static final byte[] value = new byte[100]; + static { + startKeys = new byte[regionNumber][]; + for (int i = 0; i < regionNumber; i++) { + startKeys[i] = Bytes.toBytes(String.format( + "%0" + (regionNumber + "").length() + "d", i)); + } + } + + /** + * @throws java.lang.Exception + */ + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.getConfiguration().setInt( + PriorityJobQueue.WAIT_UNIT_CONF_KEY, 1000); + TEST_UTIL.startMiniCluster(3); + } + + /** + * @throws java.lang.Exception + */ + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + /** + * set up a cluster and prepare tables. + */ + @Before + public void setUp() { + try { + admin = new HBaseAdmin(TEST_UTIL.getConfiguration()); + } catch (Exception e) { + LOG.info(e); + } + HTableDescriptor des; + try { + if (admin.tableExists("Table_A")) { + admin.disableTable("Table_A"); + admin.deleteTable("Table_A"); + } + if (admin.tableExists("Table_B")) { + admin.disableTable("Table_B"); + admin.deleteTable("Table_B"); + } + if (admin.tableExists("Table_C")) { + admin.disableTable("Table_C"); + admin.deleteTable("Table_C"); + } + } catch (IOException e1) { + LOG.error(e1); + } + try { + des = new HTableDescriptor("Table_A"); + des.addFamily(new HColumnDescriptor("ff")); + des = PriorityFunction.setPriority(1, des); + admin.createTable(des, startKeys); + } catch (Exception e) { + LOG.info(e); + } + try { + des = new HTableDescriptor("Table_B"); + des.addFamily(new HColumnDescriptor("ff")); + des = PriorityFunction.setPriority(5, des); + admin.createTable(des, startKeys); + } catch (Exception e) { + LOG.info(e); + } + try { + + des = new HTableDescriptor("Table_C"); + des.addFamily(new HColumnDescriptor("ff")); + des = PriorityFunction.setPriority(10, des); + admin.createTable(des, startKeys); + } catch (Exception e) { + LOG.info(e); + } + balanceTable("Table_A", TEST_UTIL.getMiniHBaseCluster().getMaster()); + balanceTable("Table_B", TEST_UTIL.getMiniHBaseCluster().getMaster()); + balanceTable("Table_C", TEST_UTIL.getMiniHBaseCluster().getMaster()); + } + + /** + * The worker used for test throughput and gather the result + */ + public static class Worker extends Thread { + HTable table; + String type; + private long throughput = 0; + long times = 1000; + boolean stopFlag = false; + int tablePriority; + long n = 0; + long startTime = 0; + int actionPriority = 0; + + /** + * Get throughput of this worker. + * + * @return + */ + public long getThroughput() { + long end = System.currentTimeMillis(); + throughput = (n * 1000) / (end - startTime + 1); + return throughput; + } + + /** + * Get table priority. + * + * @return table priority + */ + public int getTablePriority() { + return tablePriority; + } + + /** + * Set table priority. + */ + public void setTablePriority(int tablePriority) { + this.tablePriority = tablePriority; + } + + /** + * Get action type. + * + * @return action type of the worker + */ + public String getType() { + return type; + } + + /** + * Get the table used by the worker. + * + * @return table of the worker + */ + public HTable getTable() { + return table; + } + + public Worker(HTable t, String type, long times) { + this.table = t; + this.type = type; + this.times = times; + } + + public void run() { + try { + doAction(table, type, times); + } catch (IOException e) { + LOG.info(e); + } + } + + public int getActionPriority() { + return actionPriority; + } + + public void setActionPriority(int actionPriority) { + this.actionPriority = actionPriority; + } + + /** + * Stop the worker. + */ + public void stopWorker() { + this.stopFlag = true; + } + + private void doAction(HTable t, String type, long times) throws IOException { + long start = System.currentTimeMillis(); + long end = System.currentTimeMillis(); + startTime = System.currentTimeMillis(); + int pri = Integer.parseInt(Bytes.toString(t.getTableDescriptor() + .getValue(PriorityFunction.PRI_KEY))); + Scan s = new Scan(); + s.setStartRow(Bytes.toBytes(r.nextInt(10) + "")); + if (type.equals("writeData")) { + t.setAutoFlush(false); + t.setWriteBufferSize(1000); + } + ResultScanner sn = null; + boolean scan = false; + if (type.equals("scan")) { + sn = t.getScanner(s); + scan = true; + } + while (true) { + if (n % 10000 == 0) { + end = System.currentTimeMillis(); + // LOG.debug("Thread:" + this.getId() + " type:" + type + " table" + // + t.getTableDescriptor().getNameAsString() + " pri :" + pri + // + " time:" + (end - start) + " total :" + n + " throughput:" + // + (n * 1000) / (end - startTime + 1)); + throughput = (n * 1000) / (end - startTime + 1); + start = end; + } + if (n % 100 == 0) { + end = System.currentTimeMillis(); + throughput = (n * 1000) / (end - startTime + 1); + } + Result ret = null; + if (scan) { + ret = sn.next(); + if (ret == null) { + try { + sn.close(); + } catch (Exception e) { + LOG.debug(e); + } + sn = t.getScanner(s); + } + } else { + put(t, n); + } + if (stopFlag || n > times) { + break; + } + n++; + } + t.flushCommits(); + t.close(); + } + } + + /** + * Verify whether the function works properly. + * + * @param highs + * high priority workers + * @param lows + * low priority workers + */ + @SuppressWarnings("static-access") + public void verifyFunction(Worker high[], Worker middle[], Worker low[]) { + boolean highFinished = false; + boolean lowFinished = false; + boolean middleFinished = false; + long highThroughPut = 0; + long middleThroughPut = 0; + long lowThroughPut = 0; + while (!(highFinished && lowFinished && middleFinished)) { + try { + Thread.currentThread().sleep(100); + } catch (InterruptedException e) { + // ignore exceptions + } + highThroughPut = 0; + middleThroughPut = 0; + lowThroughPut = 0; + for (int i = 0; i < high.length; i++) { + highThroughPut += high[i].getThroughput(); + lowThroughPut += low[i].getThroughput(); + middleThroughPut += middle[i].getThroughput(); + } + LOG.info("-------------------------------------------------------------"); + try { + LOG.info("high priority table is: " + + high[0].getTable().getTableDescriptor().getNameAsString() + + " action type is:" + high[0].getType() + ", priority:" + + high[0].getTablePriority() + " throughput is:" + highThroughPut); + LOG.info("middle priority table is: " + + middle[0].getTable().getTableDescriptor().getNameAsString() + + " action type is:" + middle[0].getType() + ", priority:" + + middle[0].getTablePriority() + " throughput is:" + + middleThroughPut); + LOG.info("low priority table is: " + + low[0].getTable().getTableDescriptor().getNameAsString() + + " action type is:" + low[0].getType() + ", priority:" + + low[0].getTablePriority() + " throughput is:" + lowThroughPut); + } catch (Exception e) { + LOG.error(e); + } + + highFinished = true; + lowFinished = true; + middleFinished = true; + for (int i = 0; i < high.length; i++) { + if (high[i].isAlive()) { + highFinished = false; + } + if (low[i].isAlive()) { + lowFinished = false; + } + if (middle[i].isAlive()) { + middleFinished = false; + } + } + if (highFinished) { + for (int i = 0; i < high.length; i++) { + low[i].stopWorker(); + } + for (int i = 0; i < high.length; i++) { + middle[i].stopWorker(); + } + middleFinished = true; + lowFinished = true; + } + if (lowFinished) { + for (int i = 0; i < high.length; i++) { + high[i].stopWorker(); + } + for (int i = 0; i < high.length; i++) { + middle[i].stopWorker(); + } + middleFinished = true; + highFinished = true; + } + if (middleFinished) { + for (int i = 0; i < high.length; i++) { + high[i].stopWorker(); + } + for (int i = 0; i < high.length; i++) { + low[i].stopWorker(); + } + lowFinished = true; + highFinished = true; + } + + } + highThroughPut = 0; + middleThroughPut = 0; + lowThroughPut = 0; + for (int i = 0; i < high.length; i++) { + highThroughPut += high[i].getThroughput(); + lowThroughPut += low[i].getThroughput(); + middleThroughPut += middle[i].getThroughput(); + } + + assertTrue("Action priority works properly", + highThroughPut > middleThroughPut && middleThroughPut > lowThroughPut); + LOG.info("-------------------------------------------------------------"); + LOG.info("---------------------Test finished --------------------------"); + + try { + LOG.info("high priority table is: " + + high[0].getTable().getTableDescriptor().getNameAsString() + + " action type is:" + high[0].getType() + ", priority:" + + high[0].getTablePriority() + " throughput is:" + highThroughPut); + LOG.info("middle priority table is: " + + middle[0].getTable().getTableDescriptor().getNameAsString() + + " action type is:" + middle[0].getType() + ", priority:" + + middle[0].getTablePriority() + " throughput is:" + middleThroughPut); + LOG.info("low priority table is: " + + low[0].getTable().getTableDescriptor().getNameAsString() + + " action type is:" + low[0].getType() + ", priority:" + + low[0].getTablePriority() + " throughput is:" + lowThroughPut); + } catch (Exception e) { + LOG.error(e); + } + LOG.info("####### Test for " + high[0].getType() + ", priority:" + + high[0].getTablePriority() + " " + middle[0].getType() + + ", priority:" + middle[0].getTablePriority() + " ,and " + + low[0].getType() + ", priority:" + low[0].getTablePriority() + + " finished####"); + + LOG.info("-------------------------------------------------------------"); + } + + private void changePriority(int pria, int prib, int pric) { + try { + admin.disableTable("Table_A"); + HTableDescriptor des1 = admin + .getTableDescriptor(Bytes.toBytes("Table_A")); + des1.setValue(PriorityFunction.OPERATION_PRIORITY_KEY, + new OperationPriority(0, 0, 0).toBytes()); + des1 = PriorityFunction.setPriority(pria, des1); + admin.modifyTable(Bytes.toBytes("Table_A"), des1); + admin.enableTable("Table_A"); + } catch (Exception e) { + LOG.info(e); + } + try { + admin.disableTable("Table_B"); + HTableDescriptor des1 = admin + .getTableDescriptor(Bytes.toBytes("Table_B")); + des1.setValue(PriorityFunction.OPERATION_PRIORITY_KEY, + new OperationPriority(0, 0, 0).toBytes()); + des1 = PriorityFunction.setPriority(prib, des1); + admin.modifyTable(Bytes.toBytes("Table_B"), des1); + admin.enableTable("Table_B"); + } catch (Exception e) { + LOG.info(e); + } + try { + admin.disableTable("Table_C"); + HTableDescriptor des1 = admin + .getTableDescriptor(Bytes.toBytes("Table_C")); + des1.setValue(PriorityFunction.OPERATION_PRIORITY_KEY, + new OperationPriority(0, 0, 0).toBytes()); + des1 = PriorityFunction.setPriority(pric, des1); + admin.modifyTable(Bytes.toBytes("Table_C"), des1); + admin.enableTable("Table_C"); + } catch (Exception e) { + LOG.info(e); + } + balanceTable("Table_A", TEST_UTIL.getMiniHBaseCluster().getMaster()); + balanceTable("Table_B", TEST_UTIL.getMiniHBaseCluster().getMaster()); + balanceTable("Table_C", TEST_UTIL.getMiniHBaseCluster().getMaster()); + } + + /** + * start the test. + */ + @Test(timeout = 180000) + public void testForDifferentTablePriority() { + method = "put"; + doTestJob(); +// method = "scan"; +// changePriority(1, 5, 10); +// doTestJob(); + } + + private void doTestJob() { + if (method.equals("scan")) { + rowNumber = rowNumber * 10; + } + Worker A[] = new Worker[threadN / 3]; + Worker B[] = new Worker[threadN / 3]; + Worker C[] = new Worker[threadN / 3]; + for (int i = 0; i < A.length; i++) { + try { + A[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(), "Table_A"), + method, rowNumber * 3 / threadN); + B[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(), "Table_B"), + method, rowNumber * 3 / threadN); + C[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(), "Table_C"), + method, rowNumber * 3 / threadN); + } catch (IOException e) { + } + } + for (int i = 0; i < A.length; i++) { + A[i].setTablePriority(1); + B[i].setTablePriority(5); + C[i].setTablePriority(10); + A[i].start(); + B[i].start(); + C[i].start(); + } + verifyFunction(A, B, C); + changePriority(10, 5, 1); + A = new Worker[threadN / 3]; + B = new Worker[threadN / 3]; + C = new Worker[threadN / 3]; + for (int i = 0; i < A.length; i++) { + try { + A[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(), "Table_A"), + method, rowNumber * 3 / threadN); + B[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(), "Table_B"), + method, rowNumber * 3 / threadN); + C[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(), "Table_C"), + method, rowNumber * 3 / threadN); + } catch (IOException e) { + } + } + for (int i = 0; i < A.length; i++) { + A[i].setTablePriority(10); + B[i].setTablePriority(5); + C[i].setTablePriority(1); + A[i].start(); + B[i].start(); + C[i].start(); + } + verifyFunction(C, B, A); + } + + public static void printTable(String name) { + + try { + HTable A = new HTable(TEST_UTIL.getConfiguration(), name); + Scan s = new Scan(); + ResultScanner rs; + rs = A.getScanner(s); + Result r = null; + while ((r = rs.next()) != null) { + System.out.println(r); + } + } catch (IOException e) { + LOG.error(e); + } + + } + + public static void main(String args[]) { + try { + setUpBeforeClass(); + TestTablePriorityHundredRegion t = new TestTablePriorityHundredRegion(); + t.setUp(); + tearDownAfterClass(); + } catch (Exception e) { + LOG.info(e); + } + } + + private static void put(HTable t, long i) { + Put p = new Put( + Bytes.toBytes("" + r.nextInt(1000) + (i) + r.nextInt(10000))); + p.add(Bytes.toBytes("ff"), Bytes.toBytes("ff"), value); + try { + t.put(p); + } catch (IOException e) { + LOG.info(e); + } + } + + public static void balanceTable(String table, HMaster master) { + List servers = master.getServerManager().getOnlineServersList(); + List regions = master.getAssignmentManager() + .getRegionsOfTable(Bytes.toBytes(table)); + HashMap map = new HashMap(); + for (HRegionInfo region : regions) { + map.put(region, + master.getAssignmentManager().getRegionServerOfRegion(region)); + + } + LOG.debug("Region location:" + map); + doBalance(new HashSet(servers), map, master); + + } + + private static void doBalance(HashSet servers, + HashMap map, HMaster master) { + HashMap moves = new HashMap(); + HashMap> serverList = new HashMap>(); + for (Entry e : map.entrySet()) { + if (serverList.get(e.getValue()) == null) { + serverList.put(e.getValue(), new ArrayList()); + } + serverList.get(e.getValue()).add(e.getKey()); + } + for (ServerName s : servers) { + if (serverList.get(s) == null) { + serverList.put(s, new ArrayList()); + } + } + + int div = 10; + while (div > 2) { + ServerName maxLoad = null, minLoad = null; + int maxLoadN = Integer.MIN_VALUE, minLoadN = Integer.MAX_VALUE; + for (Entry> e : serverList.entrySet()) { + if (e.getValue().size() >= maxLoadN) { + maxLoadN = e.getValue().size(); + maxLoad = e.getKey(); + } + if (e.getValue().size() <= minLoadN) { + minLoadN = e.getValue().size(); + minLoad = e.getKey(); + } + } + if (maxLoad == null || minLoad == null) + break; + if (serverList.get(maxLoad).size() == 0) + break; + else { + div = Math.abs(maxLoadN - minLoadN); + int index = r.nextInt(serverList.get(maxLoad).size()); + moves.put(serverList.get(maxLoad).get(index), minLoad); + serverList.get(minLoad).add(serverList.get(maxLoad).get(index)); + serverList.get(maxLoad).remove(index); + } + + } + + for (Entry e : moves.entrySet()) { + try { + LOG.info("move :" + e.getKey().getEncodedName() + " regions to " + + e.getValue()); + master.move(e.getKey().getEncodedNameAsBytes(), + Bytes.toBytes(e.getValue().getServerName())); + } catch (Exception e1) { + // TODO Auto-generated catch block + e1.printStackTrace(); + } + } + } +} Index: src/test/java/org/apache/hadoop/hbase/ipc/TestTablePriority.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/ipc/TestTablePriority.java (revision 0) +++ src/test/java/org/apache/hadoop/hbase/ipc/TestTablePriority.java (revision 0) @@ -0,0 +1,582 @@ +/** + * Copyright the Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Random; +import java.util.Map.Entry; + +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.LargeTests; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.ipc.PriorityFunction; +import org.apache.hadoop.hbase.ipc.PriorityJobQueue.OperationPriority; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import static org.junit.Assert.assertTrue; + +/** + * Test for table priority, use table A,B,C which have different priorities. + * There are two tests,in first one table A's priority is 1(high priority), B is + * 5, C is 10 and in the second test,we switch the priorities of A and C. + */ +@Category(LargeTests.class) +public class TestTablePriority { + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private final static Log LOG = LogFactory.getLog(TestTablePriority.class); + static final Random r = new Random(); + static int rowNumber = 10000; + static int threadN =40; + HBaseAdmin admin = null; + // if there are data in the test tables,you + // can set method type to "scan" + static String method = "put"; + static final int regionNumber=9; + private static byte[][]startKeys; + static + { + startKeys=new byte[regionNumber][]; + for(int i=0;i times) { + break; + } + n++; + } + t.flushCommits(); + t.close(); + } + } + + /** + * Verify whether the function take effect + * + * @param highs high priority workers + * @param lows low priority workers + */ + @SuppressWarnings("static-access") + public void verifyFunction(Worker high[], Worker middle[], Worker low[]) { + boolean highFinished = false; + boolean lowFinished = false; + boolean middleFinished = false; + long highThroughPut = 0; + long middleThroughPut = 0; + long lowThroughPut = 0; + while (!(highFinished && lowFinished && middleFinished)) { + try { + Thread.currentThread().sleep(100); + } catch (InterruptedException e) { + // ignore exceptions + } + highThroughPut = 0; + middleThroughPut = 0; + lowThroughPut = 0; + for (int i = 0; i < high.length; i++) { + highThroughPut += high[i].getThroughput(); + lowThroughPut += low[i].getThroughput(); + middleThroughPut += middle[i].getThroughput(); + } + LOG.info("-------------------------------------------------------------"); + try { + LOG.info("high priority table is: " + + high[0].getTable().getTableDescriptor().getNameAsString() + + " action type is:" + high[0].getType() + ", priority:" + + high[0].getTablePriority() + " throughput is:" + highThroughPut); + LOG.info("middle priority table is: " + + middle[0].getTable().getTableDescriptor().getNameAsString() + + " action type is:" + middle[0].getType() + ", priority:" + + middle[0].getTablePriority() + " throughput is:" + + middleThroughPut); + LOG.info("low priority table is: " + + low[0].getTable().getTableDescriptor().getNameAsString() + + " action type is:" + low[0].getType() + ", priority:" + + low[0].getTablePriority() + " throughput is:" + lowThroughPut); + } catch (Exception e) { + LOG.error(e); + } + + highFinished = true; + lowFinished = true; + middleFinished = true; + for (int i = 0; i < high.length; i++) { + if (high[i].isAlive()) { + highFinished = false; + } + if (low[i].isAlive()) { + lowFinished = false; + } + if (middle[i].isAlive()) { + middleFinished = false; + } + } + if (highFinished) { + for (int i = 0; i < high.length; i++) { + low[i].stopWorker(); + } + for (int i = 0; i < high.length; i++) { + middle[i].stopWorker(); + } + middleFinished = true; + lowFinished = true; + } + if (lowFinished) { + for (int i = 0; i < high.length; i++) { + high[i].stopWorker(); + } + for (int i = 0; i < high.length; i++) { + middle[i].stopWorker(); + } + middleFinished = true; + highFinished = true; + } + if (middleFinished) { + for (int i = 0; i < high.length; i++) { + high[i].stopWorker(); + } + for (int i = 0; i < high.length; i++) { + low[i].stopWorker(); + } + lowFinished = true; + highFinished = true; + } + + } + highThroughPut = 0; + middleThroughPut = 0; + lowThroughPut = 0; + for (int i = 0; i < high.length; i++) { + highThroughPut += high[i].getThroughput(); + lowThroughPut += low[i].getThroughput(); + middleThroughPut += middle[i].getThroughput(); + } + + assertTrue("Action priority works properly", + highThroughPut > middleThroughPut && middleThroughPut > lowThroughPut); + LOG.info("-------------------------------------------------------------"); + LOG.info("---------------------Test finished --------------------------"); + + try { + LOG.info("high priority table is: " + + high[0].getTable().getTableDescriptor().getNameAsString() + + " action type is:" + high[0].getType() + ", priority:" + + high[0].getTablePriority() + " throughput is:" + highThroughPut); + LOG.info("middle priority table is: " + + middle[0].getTable().getTableDescriptor().getNameAsString() + + " action type is:" + middle[0].getType() + ", priority:" + + middle[0].getTablePriority() + " throughput is:" + middleThroughPut); + LOG.info("low priority table is: " + + low[0].getTable().getTableDescriptor().getNameAsString() + + " action type is:" + low[0].getType() + ", priority:" + + low[0].getTablePriority() + " throughput is:" + lowThroughPut); + } catch (Exception e) { + LOG.error(e); + } + LOG.info("####### Test for " + high[0].getType() + ", priority:" + + high[0].getTablePriority() + " " + middle[0].getType() + + ", priority:" + middle[0].getTablePriority() + " ,and " + + low[0].getType() + ", priority:" + low[0].getTablePriority() + + " finished####"); + + LOG.info("-------------------------------------------------------------"); + } + private void changePriority(int pria,int prib,int pric) + { + try { + admin.disableTable("Table_A"); + HTableDescriptor des1 = admin + .getTableDescriptor(Bytes.toBytes("Table_A")); + des1.setValue(PriorityFunction.OPERATION_PRIORITY_KEY, + new OperationPriority(0, 0, 0).toBytes()); + des1 = PriorityFunction.setPriority(pria, des1); + admin.modifyTable(Bytes.toBytes("Table_A"), des1); + admin.enableTable("Table_A"); + } catch (Exception e) { + LOG.info(e); + } + try { + admin.disableTable("Table_B"); + HTableDescriptor des1 = admin + .getTableDescriptor(Bytes.toBytes("Table_B")); + des1.setValue(PriorityFunction.OPERATION_PRIORITY_KEY, + new OperationPriority(0, 0, 0).toBytes()); + des1 = PriorityFunction.setPriority(prib, des1); + admin.modifyTable(Bytes.toBytes("Table_B"), des1); + admin.enableTable("Table_B"); + } catch (Exception e) { + LOG.info(e); + } + try { + admin.disableTable("Table_C"); + HTableDescriptor des1 = admin + .getTableDescriptor(Bytes.toBytes("Table_C")); + des1.setValue(PriorityFunction.OPERATION_PRIORITY_KEY, + new OperationPriority(0, 0, 0).toBytes()); + des1 = PriorityFunction.setPriority(pric, des1); + admin.modifyTable(Bytes.toBytes("Table_C"), des1); + admin.enableTable("Table_C"); + } catch (Exception e) { + LOG.info(e); + } + GroupTestUtil.balanceTable("Table_A", TEST_UTIL.getMiniHBaseCluster().getMaster()); + GroupTestUtil.balanceTable("Table_B",TEST_UTIL.getMiniHBaseCluster().getMaster()); + GroupTestUtil.balanceTable("Table_C",TEST_UTIL.getMiniHBaseCluster().getMaster()); + } + /** + * start the test. + */ + @Test(timeout = 180000) + public void testForDifferentTablePriority() { + method="put"; + doTestJob(); +// method="scan"; +// threadN=threadN*10; +// changePriority(1,5,10); +// doTestJob(); + } + + + private void doTestJob() + { + if(method.equals("scan")) + { + rowNumber=rowNumber*10; + } + Worker A[] = new Worker[threadN / 3]; + Worker B[] = new Worker[threadN / 3]; + Worker C[] = new Worker[threadN / 3]; + for (int i = 0; i < A.length; i++) { + try { + A[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(), "Table_A"), + method, rowNumber * 3 / threadN); + B[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(), "Table_B"), + method, rowNumber * 3 / threadN); + C[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(), "Table_C"), + method, rowNumber * 3 / threadN); + } catch (IOException e) { + } + } + for (int i = 0; i < A.length; i++) { + A[i].setTablePriority(1); + B[i].setTablePriority(5); + C[i].setTablePriority(10); + A[i].start(); + B[i].start(); + C[i].start(); + } + verifyFunction(A, B, C); + changePriority(10,5,1); + A = new Worker[threadN / 3]; + B = new Worker[threadN / 3]; + C = new Worker[threadN / 3]; + for (int i = 0; i < A.length; i++) { + try { + A[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(), "Table_A"), + method, rowNumber * 3 / threadN); + B[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(), "Table_B"), + method, rowNumber * 3 / threadN); + C[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(), "Table_C"), + method, rowNumber * 3 / threadN); + } catch (IOException e) { + } + } + for (int i = 0; i < A.length; i++) { + A[i].setTablePriority(10); + B[i].setTablePriority(5); + C[i].setTablePriority(1); + A[i].start(); + B[i].start(); + C[i].start(); + } + verifyFunction(C, B, A); + } + public static void printTable(String name) + { + + try { + HTable A = new HTable(TEST_UTIL.getConfiguration(), name); + Scan s=new Scan(); + ResultScanner rs; + rs = A.getScanner(s); + Result r=null; + while((r=rs.next())!=null) + { + System.out.println(r); + } + } catch (IOException e) { + LOG.error(e); + } + + } + public static void main(String args[]) { + try { + setUpBeforeClass(); + TestTablePriority t = new TestTablePriority(); + t.setUp(); + tearDownAfterClass(); + } catch (Exception e) { + LOG.info(e); + } + } + + static byte[] b=new byte[100]; + private static void put(HTable t, long i) { + Put p = new Put( + Bytes.toBytes(("" + r.nextInt(10)) + r.nextInt(100000))); + p.add(Bytes.toBytes("ff"), Bytes.toBytes("ff"), b); + try { + t.put(p); + } catch (IOException e) { + LOG.info(e); + } + } +} Index: src/test/java/org/apache/hadoop/hbase/ipc/GroupTestUtil.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/ipc/GroupTestUtil.java (revision 0) +++ src/test/java/org/apache/hadoop/hbase/ipc/GroupTestUtil.java (revision 0) @@ -0,0 +1,89 @@ +package org.apache.hadoop.hbase.ipc; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Random; +import java.util.Map.Entry; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.util.Bytes; + +public class GroupTestUtil { + private final static Log LOG = LogFactory.getLog(GroupTestUtil.class); + static final Random r = new Random(); + public static void balanceTable(String table,HMaster master) { + List servers=master.getServerManager().getOnlineServersList(); + List regions = master.getAssignmentManager().getRegionsOfTable(Bytes.toBytes(table)); + HashMap map = new HashMap(); + for(HRegionInfo region:regions) + { + map.put(region, master.getAssignmentManager().getRegionServerOfRegion(region)); + } + LOG.debug("Region location:"+map); + doBalance(new HashSet(servers), map,master); + + } + private static void doBalance(HashSet servers, + HashMap map,HMaster master) { + HashMap moves = new HashMap(); + HashMap> serverList = new HashMap>(); + for (Entry e : map.entrySet()) { + if (serverList.get(e.getValue()) == null) { + serverList.put(e.getValue(), new ArrayList()); + } + serverList.get(e.getValue()).add(e.getKey()); + } + for (ServerName s : servers) { + if (serverList.get(s) == null) { + serverList.put(s, new ArrayList()); + } + } + + int div = 10; + while (div > 2) { + ServerName maxLoad = null, minLoad = null; + int maxLoadN = Integer.MIN_VALUE, minLoadN = Integer.MAX_VALUE; + for (Entry> e : serverList.entrySet()) { + if (e.getValue().size() >= maxLoadN) { + maxLoadN = e.getValue().size(); + maxLoad = e.getKey(); + } + if (e.getValue().size() <= minLoadN) { + minLoadN = e.getValue().size(); + minLoad = e.getKey(); + } + } + if (maxLoad == null || minLoad == null) + break; + if (serverList.get(maxLoad).size() == 0) + break; + else { + div = Math.abs(maxLoadN - minLoadN); + int index = r.nextInt(serverList.get(maxLoad).size()); + moves.put(serverList.get(maxLoad).get(index), minLoad); + serverList.get(minLoad).add(serverList.get(maxLoad).get(index)); + serverList.get(maxLoad).remove(index); + } + + } + + for (Entry e : moves.entrySet()) { + try { + LOG.info("move :" + e.getKey().getEncodedName() + " regions to " + + e.getValue()); + master.move(e.getKey().getEncodedNameAsBytes(), + Bytes.toBytes(e.getValue().getServerName())); + } catch (Exception e1) { + // TODO Auto-generated catch block + e1.printStackTrace(); + } + } + } + +} Index: src/test/java/org/apache/hadoop/hbase/ipc/TestTablePriorityLargeRow.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/ipc/TestTablePriorityLargeRow.java (revision 0) +++ src/test/java/org/apache/hadoop/hbase/ipc/TestTablePriorityLargeRow.java (revision 0) @@ -0,0 +1,654 @@ +/** + * Copyright the Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Random; +import java.util.Map.Entry; + +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.LargeTests; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.ipc.PriorityFunction; +import org.apache.hadoop.hbase.ipc.PriorityJobQueue.OperationPriority; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import static org.junit.Assert.assertTrue; + +/** + * Test for table priority, use table A,B,C which have different priorities. + * There are two tests,in first one table A's priority is 1(high priority), B is + * 5, C is 10 and in the second test,we switch the priorities of A and C. + */ +@Category(LargeTests.class) +public class TestTablePriorityLargeRow { + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private final static Log LOG = LogFactory + .getLog(TestTablePriorityLargeRow.class); + static final Random r = new Random(); + static int rowNumber = 10000; + static final int threadN = 100; + HBaseAdmin admin = null; + // if there are data in the test tables,you + // can set method type to "scan" + static String method = "put"; + static final int regionNumber = 9; + private static byte[][] startKeys; + private static final byte[] value = new byte[50000]; + static { + startKeys = new byte[regionNumber][]; + for (int i = 0; i < regionNumber; i++) { + startKeys[i] = Bytes.toBytes(String.format( + "%0" + (regionNumber + "").length() + "d", i)); + } + } + + /** + * @throws java.lang.Exception + */ + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.getConfiguration().setInt( + PriorityJobQueue.WAIT_UNIT_CONF_KEY, 1000); + TEST_UTIL.startMiniCluster(3); + } + + /** + * @throws java.lang.Exception + */ + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + /** + * set up a cluster and prepare tables. + */ + @Before + public void setUp() { + try { + admin = new HBaseAdmin(TEST_UTIL.getConfiguration()); + } catch (Exception e) { + LOG.info(e); + } + HTableDescriptor des; + try { + if (admin.tableExists("Table_A")) { + admin.disableTable("Table_A"); + admin.deleteTable("Table_A"); + } + if (admin.tableExists("Table_B")) { + admin.disableTable("Table_B"); + admin.deleteTable("Table_B"); + } + if (admin.tableExists("Table_C")) { + admin.disableTable("Table_C"); + admin.deleteTable("Table_C"); + } + } catch (IOException e1) { + LOG.error(e1); + } + try { + des = new HTableDescriptor("Table_A"); + des.addFamily(new HColumnDescriptor("ff")); + des = PriorityFunction.setPriority(1, des); + admin.createTable(des, startKeys); + } catch (Exception e) { + LOG.info(e); + } + try { + des = new HTableDescriptor("Table_B"); + des.addFamily(new HColumnDescriptor("ff")); + des = PriorityFunction.setPriority(5, des); + admin.createTable(des, startKeys); + } catch (Exception e) { + LOG.info(e); + } + try { + + des = new HTableDescriptor("Table_C"); + des.addFamily(new HColumnDescriptor("ff")); + des = PriorityFunction.setPriority(10, des); + admin.createTable(des, startKeys); + } catch (Exception e) { + LOG.info(e); + } + balanceTable("Table_A", TEST_UTIL.getMiniHBaseCluster().getMaster()); + balanceTable("Table_B", TEST_UTIL.getMiniHBaseCluster().getMaster()); + balanceTable("Table_C", TEST_UTIL.getMiniHBaseCluster().getMaster()); + } + + /** + * The worker used for test throughput and gather the result + */ + public static class Worker extends Thread { + HTable table; + String type; + private long throughput = 0; + long times = 1000; + boolean stopFlag = false; + int tablePriority; + long n = 0; + long startTime = 0; + int actionPriority = 0; + + /** + * get throughput of this worker + * + * @return + */ + public long getThroughput() { + long end = System.currentTimeMillis(); + throughput = (n * 1000) / (end - startTime + 1); + return throughput; + } + + /** + * get table priority + * + * @return table priority + */ + public int getTablePriority() { + return tablePriority; + } + + /** + * set table priority + */ + public void setTablePriority(int tablePriority) { + this.tablePriority = tablePriority; + } + + /** + * get action type + * + * @return action type of the worker + */ + public String getType() { + return type; + } + + /** + * the table used by the worker + * + * @return table of the worker + */ + public HTable getTable() { + return table; + } + + public Worker(HTable t, String type, long times) { + this.table = t; + this.type = type; + this.times = times; + } + + public void run() { + try { + doAction(table, type, times); + } catch (IOException e) { + LOG.info(e); + } + } + + public int getActionPriority() { + return actionPriority; + } + + public void setActionPriority(int actionPriority) { + this.actionPriority = actionPriority; + } + + /** + * stop the worker + */ + public void stopWorker() { + this.stopFlag = true; + } + + private void doAction(HTable t, String type, long times) throws IOException { + long start = System.currentTimeMillis(); + long end = System.currentTimeMillis(); + startTime = System.currentTimeMillis(); + int pri = Integer.parseInt(Bytes.toString(t.getTableDescriptor() + .getValue(PriorityFunction.PRI_KEY))); + Scan s = new Scan(); + s.setStartRow(Bytes.toBytes(r.nextInt(10) + "")); + if (type.equals("writeData")) { + t.setAutoFlush(false); + t.setWriteBufferSize(1000); + } + ResultScanner sn = null; + boolean scan = false; + if (type.equals("scan")) { + sn = t.getScanner(s); + scan = true; + } + while (true) { + if (n % 10000 == 0) { + end = System.currentTimeMillis(); + // LOG.debug("Thread:" + this.getId() + " type:" + type + " table" + // + t.getTableDescriptor().getNameAsString() + " pri :" + pri + // + " time:" + (end - start) + " total :" + n + " throughput:" + // + (n * 1000) / (end - startTime + 1)); + throughput = (n * 1000) / (end - startTime + 1); + start = end; + } + if (n % 100 == 0) { + end = System.currentTimeMillis(); + throughput = (n * 1000) / (end - startTime + 1); + } + Result ret = null; + if (scan) { + ret = sn.next(); + if (ret == null) { + try { + sn.close(); + } catch (Exception e) { + LOG.debug(e); + } + sn = t.getScanner(s); + } + } else { + put(t, n); + } + if (stopFlag || n > times) { + break; + } + n++; + } + t.flushCommits(); + t.close(); + } + } + + /** + * Verify whether the function take effect + * + * @param highs + * high priority workers + * @param lows + * low priority workers + */ + @SuppressWarnings("static-access") + public void verifyFunction(Worker high[], Worker middle[], Worker low[]) { + boolean highFinished = false; + boolean lowFinished = false; + boolean middleFinished = false; + long highThroughPut = 0; + long middleThroughPut = 0; + long lowThroughPut = 0; + while (!(highFinished && lowFinished && middleFinished)) { + try { + Thread.currentThread().sleep(100); + } catch (InterruptedException e) { + // ignore exceptions + } + highThroughPut = 0; + middleThroughPut = 0; + lowThroughPut = 0; + for (int i = 0; i < high.length; i++) { + highThroughPut += high[i].getThroughput(); + lowThroughPut += low[i].getThroughput(); + middleThroughPut += middle[i].getThroughput(); + } + LOG.info("-------------------------------------------------------------"); + try { + LOG.info("high priority table is: " + + high[0].getTable().getTableDescriptor().getNameAsString() + + " action type is:" + high[0].getType() + ", priority:" + + high[0].getTablePriority() + " throughput is:" + highThroughPut); + LOG.info("middle priority table is: " + + middle[0].getTable().getTableDescriptor().getNameAsString() + + " action type is:" + middle[0].getType() + ", priority:" + + middle[0].getTablePriority() + " throughput is:" + + middleThroughPut); + LOG.info("low priority table is: " + + low[0].getTable().getTableDescriptor().getNameAsString() + + " action type is:" + low[0].getType() + ", priority:" + + low[0].getTablePriority() + " throughput is:" + lowThroughPut); + } catch (Exception e) { + LOG.error(e); + } + + highFinished = true; + lowFinished = true; + middleFinished = true; + for (int i = 0; i < high.length; i++) { + if (high[i].isAlive()) { + highFinished = false; + } + if (low[i].isAlive()) { + lowFinished = false; + } + if (middle[i].isAlive()) { + middleFinished = false; + } + } + if (highFinished) { + for (int i = 0; i < high.length; i++) { + low[i].stopWorker(); + } + for (int i = 0; i < high.length; i++) { + middle[i].stopWorker(); + } + middleFinished = true; + lowFinished = true; + } + if (lowFinished) { + for (int i = 0; i < high.length; i++) { + high[i].stopWorker(); + } + for (int i = 0; i < high.length; i++) { + middle[i].stopWorker(); + } + middleFinished = true; + highFinished = true; + } + if (middleFinished) { + for (int i = 0; i < high.length; i++) { + high[i].stopWorker(); + } + for (int i = 0; i < high.length; i++) { + low[i].stopWorker(); + } + lowFinished = true; + highFinished = true; + } + + } + highThroughPut = 0; + middleThroughPut = 0; + lowThroughPut = 0; + for (int i = 0; i < high.length; i++) { + highThroughPut += high[i].getThroughput(); + lowThroughPut += low[i].getThroughput(); + middleThroughPut += middle[i].getThroughput(); + } + + assertTrue("Action priority works properly", + highThroughPut > middleThroughPut && middleThroughPut > lowThroughPut); + LOG.info("-------------------------------------------------------------"); + LOG.info("---------------------Test finished --------------------------"); + + try { + LOG.info("high priority table is: " + + high[0].getTable().getTableDescriptor().getNameAsString() + + " action type is:" + high[0].getType() + ", priority:" + + high[0].getTablePriority() + " throughput is:" + highThroughPut); + LOG.info("middle priority table is: " + + middle[0].getTable().getTableDescriptor().getNameAsString() + + " action type is:" + middle[0].getType() + ", priority:" + + middle[0].getTablePriority() + " throughput is:" + middleThroughPut); + LOG.info("low priority table is: " + + low[0].getTable().getTableDescriptor().getNameAsString() + + " action type is:" + low[0].getType() + ", priority:" + + low[0].getTablePriority() + " throughput is:" + lowThroughPut); + } catch (Exception e) { + LOG.error(e); + } + LOG.info("####### Test for " + high[0].getType() + ", priority:" + + high[0].getTablePriority() + " " + middle[0].getType() + + ", priority:" + middle[0].getTablePriority() + " ,and " + + low[0].getType() + ", priority:" + low[0].getTablePriority() + + " finished####"); + + LOG.info("-------------------------------------------------------------"); + } + + private void changePriority(int pria, int prib, int pric) { + try { + admin.disableTable("Table_A"); + HTableDescriptor des1 = admin + .getTableDescriptor(Bytes.toBytes("Table_A")); + des1.setValue(PriorityFunction.OPERATION_PRIORITY_KEY, + new OperationPriority(0, 0, 0).toBytes()); + des1 = PriorityFunction.setPriority(pria, des1); + admin.modifyTable(Bytes.toBytes("Table_A"), des1); + admin.enableTable("Table_A"); + } catch (Exception e) { + LOG.info(e); + } + try { + admin.disableTable("Table_B"); + HTableDescriptor des1 = admin + .getTableDescriptor(Bytes.toBytes("Table_B")); + des1.setValue(PriorityFunction.OPERATION_PRIORITY_KEY, + new OperationPriority(0, 0, 0).toBytes()); + des1 = PriorityFunction.setPriority(prib, des1); + admin.modifyTable(Bytes.toBytes("Table_B"), des1); + admin.enableTable("Table_B"); + } catch (Exception e) { + LOG.info(e); + } + try { + admin.disableTable("Table_C"); + HTableDescriptor des1 = admin + .getTableDescriptor(Bytes.toBytes("Table_C")); + des1.setValue(PriorityFunction.OPERATION_PRIORITY_KEY, + new OperationPriority(0, 0, 0).toBytes()); + des1 = PriorityFunction.setPriority(pric, des1); + admin.modifyTable(Bytes.toBytes("Table_C"), des1); + admin.enableTable("Table_C"); + } catch (Exception e) { + LOG.info(e); + } + balanceTable("Table_A", TEST_UTIL.getMiniHBaseCluster().getMaster()); + balanceTable("Table_B", TEST_UTIL.getMiniHBaseCluster().getMaster()); + balanceTable("Table_C", TEST_UTIL.getMiniHBaseCluster().getMaster()); + } + + /** + * start the test. + */ + @Test(timeout = 180000) + public void testForDifferentTablePriority() { + method = "put"; + doTestJob(); +// method = "scan"; +// changePriority(1, 5, 10); +// doTestJob(); + } + + private void doTestJob() { + if (method.equals("scan")) { + rowNumber = rowNumber * 10; + } + Worker A[] = new Worker[threadN / 3]; + Worker B[] = new Worker[threadN / 3]; + Worker C[] = new Worker[threadN / 3]; + for (int i = 0; i < A.length; i++) { + try { + A[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(), "Table_A"), + method, rowNumber * 3 / threadN); + B[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(), "Table_B"), + method, rowNumber * 3 / threadN); + C[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(), "Table_C"), + method, rowNumber * 3 / threadN); + } catch (IOException e) { + } + } + for (int i = 0; i < A.length; i++) { + A[i].setTablePriority(1); + B[i].setTablePriority(5); + C[i].setTablePriority(10); + A[i].start(); + B[i].start(); + C[i].start(); + } + verifyFunction(A, B, C); + changePriority(10, 5, 1); + A = new Worker[threadN / 3]; + B = new Worker[threadN / 3]; + C = new Worker[threadN / 3]; + for (int i = 0; i < A.length; i++) { + try { + A[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(), "Table_A"), + method, rowNumber * 3 / threadN); + B[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(), "Table_B"), + method, rowNumber * 3 / threadN); + C[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(), "Table_C"), + method, rowNumber * 3 / threadN); + } catch (IOException e) { + } + } + for (int i = 0; i < A.length; i++) { + A[i].setTablePriority(10); + B[i].setTablePriority(5); + C[i].setTablePriority(1); + A[i].start(); + B[i].start(); + C[i].start(); + } + verifyFunction(C, B, A); + } + + public static void printTable(String name) { + + try { + HTable A = new HTable(TEST_UTIL.getConfiguration(), name); + Scan s = new Scan(); + ResultScanner rs; + rs = A.getScanner(s); + Result r = null; + while ((r = rs.next()) != null) { + System.out.println(r); + } + } catch (IOException e) { + LOG.error(e); + } + + } + + public static void main(String args[]) { + try { + setUpBeforeClass(); + TestTablePriorityLargeRow t = new TestTablePriorityLargeRow(); + t.setUp(); + tearDownAfterClass(); + } catch (Exception e) { + LOG.info(e); + } + } + + private static void put(HTable t, long i) { + Put p = new Put( + Bytes.toBytes("" + r.nextInt(1000) + (i) + r.nextInt(10000))); + p.add(Bytes.toBytes("ff"), Bytes.toBytes("ff"), value); + try { + t.put(p); + } catch (IOException e) { + LOG.info(e); + } + } + + public static void balanceTable(String table, HMaster master) { + List servers = master.getServerManager().getOnlineServersList(); + List regions = master.getAssignmentManager() + .getRegionsOfTable(Bytes.toBytes(table)); + HashMap map = new HashMap(); + for (HRegionInfo region : regions) { + map.put(region, + master.getAssignmentManager().getRegionServerOfRegion(region)); + + } + LOG.debug("Region location:" + map); + doBalance(new HashSet(servers), map, master); + + } + + private static void doBalance(HashSet servers, + HashMap map, HMaster master) { + HashMap moves = new HashMap(); + HashMap> serverList = new HashMap>(); + for (Entry e : map.entrySet()) { + if (serverList.get(e.getValue()) == null) { + serverList.put(e.getValue(), new ArrayList()); + } + serverList.get(e.getValue()).add(e.getKey()); + } + for (ServerName s : servers) { + if (serverList.get(s) == null) { + serverList.put(s, new ArrayList()); + } + } + + int div = 10; + while (div > 2) { + ServerName maxLoad = null, minLoad = null; + int maxLoadN = Integer.MIN_VALUE, minLoadN = Integer.MAX_VALUE; + for (Entry> e : serverList.entrySet()) { + if (e.getValue().size() >= maxLoadN) { + maxLoadN = e.getValue().size(); + maxLoad = e.getKey(); + } + if (e.getValue().size() <= minLoadN) { + minLoadN = e.getValue().size(); + minLoad = e.getKey(); + } + } + if (maxLoad == null || minLoad == null) + break; + if (serverList.get(maxLoad).size() == 0) + break; + else { + div = Math.abs(maxLoadN - minLoadN); + int index = r.nextInt(serverList.get(maxLoad).size()); + moves.put(serverList.get(maxLoad).get(index), minLoad); + serverList.get(minLoad).add(serverList.get(maxLoad).get(index)); + serverList.get(maxLoad).remove(index); + } + + } + + for (Entry e : moves.entrySet()) { + try { + LOG.info("move :" + e.getKey().getEncodedName() + " regions to " + + e.getValue()); + master.move(e.getKey().getEncodedNameAsBytes(), + Bytes.toBytes(e.getValue().getServerName())); + } catch (Exception e1) { + // TODO Auto-generated catch block + e1.printStackTrace(); + } + } + } +} Index: src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (revision 1220359) +++ src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (working copy) @@ -244,7 +244,7 @@ } /** A call queued for handling. */ - protected class Call implements Delayable { + class Call implements Delayable { protected int id; // the client's call id protected Writable param; // the parameter passed protected Connection connection; // connection to client Index: src/main/java/org/apache/hadoop/hbase/ipc/PriorityHBaseServer.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/ipc/PriorityHBaseServer.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/ipc/PriorityHBaseServer.java (revision 0) @@ -0,0 +1,79 @@ +/** + * Copyright the Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.io.HbaseObjectWritable; +import org.apache.hadoop.hbase.ipc.Invocation; +import org.apache.hadoop.hbase.ipc.WritableRpcEngine.Server; +import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.io.Writable; + +/** + * An IPC server extends HBaseServer,and add schedule function to make table priority take + * effect. + */ +public class PriorityHBaseServer extends Server { + + /** + * Construct an RPC server. + * + * @param instance the instance whose methods will be called + * @param ifaces Define metrics for all methods in the given classes + * @param conf the configuration to use + * @param bindAddress the address to bind on to listen for connection + * @param port the port to listen for connections on + * @param numHandlers the number of method handler threads to run + * @param priorityHandlerCount used by {@link HBaseServer} + * @param verbose whether each call should be logged + * @param highPriorityLevel used by {@link HBaseServer},decides actions of meta info will be + * handled by metaHandlers + * @throws IOException e + */ + public PriorityHBaseServer(Object instance, final Class[] ifaces, + Configuration conf, String bindAddress, int port, int numHandlers, + int priorityHandlerCount, boolean verbose, int highPriorityLevel) + throws IOException { + super(instance, ifaces, conf, bindAddress, port, numHandlers, + priorityHandlerCount, verbose, highPriorityLevel); + int maxQueueSize = this.conf.getInt("ipc.server.max.queue.size", 500); + callQueue = new PriorityJobQueue(maxQueueSize, + new PriorityFunction((HRegionServer) instance, conf), conf); + } + + @SuppressWarnings("rawtypes") + @Override + public Writable call(Class protocol, + Writable param, long receivedTime, MonitoredRPCHandler status) + throws IOException { + Invocation call = (Invocation) param; + HbaseObjectWritable writable = (HbaseObjectWritable) super.call(protocol, + param, receivedTime, status); + if (call.getMethodName().endsWith("openScanner")) { + ((PriorityFunction) ((PriorityJobQueue) callQueue).getQosFunction()) + .initScannerPriority(call, writable.get()); + } + return writable; + } + +} Index: src/main/java/org/apache/hadoop/hbase/ipc/QosRegionObserver.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/ipc/QosRegionObserver.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/ipc/QosRegionObserver.java (revision 0) @@ -0,0 +1,64 @@ +/* + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import java.io.IOException; +import java.util.HashMap; + +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.RegionScanner; + +public class QosRegionObserver extends BaseRegionObserver { + HashMap regionScannerToScan = new HashMap(); + + @Override + public void postOpen(ObserverContext e) { + HRegion region = e.getEnvironment().getRegion(); + PriorityFunction.initRegionPriority(region); + } + + @Override + public void postClose(ObserverContext e, + boolean abortRequested) { + HRegion region = e.getEnvironment().getRegion(); + PriorityFunction.cleanRegionPriority(region); + } + + @Override + public RegionScanner postScannerOpen( + final ObserverContext e, final Scan scan, + final RegionScanner s) { + regionScannerToScan.put(s, scan); + return s; + } + + @Override + public void postScannerClose( + final ObserverContext e, + final InternalScanner s) throws IOException { + PriorityFunction.removeScanner(regionScannerToScan.get((RegionScanner) s)); + regionScannerToScan.remove((RegionScanner) s); + } +} Index: src/main/java/org/apache/hadoop/hbase/ipc/PriorityJobQueue.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/ipc/PriorityJobQueue.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/ipc/PriorityJobQueue.java (revision 0) @@ -0,0 +1,437 @@ +/** + * Copyright the Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import java.util.PriorityQueue; +import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Operation; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.Writable; +import com.google.common.base.Function; + +/** + * This queue is used by {@link PriorityHBaseServer}. Compare to + * PriorityBlockingQueue this queue has accurate capacity management. The Job + * class of this queue can adjust the priority according to the waiting times of + * the Job. Please use take and put to manipulate this queue.
+ * + * @param + * the class contained by the queue + */ +public class PriorityJobQueue extends PriorityBlockingQueue { + private static final long serialVersionUID = 1L; + public static final String WAIT_UNIT_CONF_KEY="hbase.tablepriority.wait.unit"; + private static final Log LOG = LogFactory.getLog(PriorityJobQueue.class); + private final PriorityQueue> q = new PriorityQueue>(); + private int capacity = 500; + ReentrantLock lock = new ReentrantLock(true); + Condition notEmpty = lock.newCondition() ; + Condition full = lock.newCondition(); + //waitTimeUnit is 10 means: + //after waiting for 10ms,the call's priority will be increased by 1; + private static int waitUnit= 10; + private long[] priorityAddTimes = new long[10]; + private Function qosFunction; + private AtomicInteger outputIndicator = new AtomicInteger (); + private static long getTimes=0; + /** + * get static data of add times + * @return the add times arranged by priority + */ + public long[] getPriorityAddTimes() { + return priorityAddTimes; + } + + /** + @param capacity + * the capacity of the queue,not a precision value,if queue size + * exceeds this value, workers which add jobs should wait,but if a + * thread waits too many turns (more than maxWait),it will add the + * job anyhow. + * @param qosFunction the function used to decide the call's priority. + * @param conf configuration. + */ + public PriorityJobQueue(int capacity, + Function qosFunction, final Configuration conf) { + this.qosFunction = qosFunction; + waitUnit = conf.getInt(WAIT_UNIT_CONF_KEY, 10); + this.capacity = capacity; + } + + /** + * Add a job to this queue. + * @param call the job instance + * @param pri the job's priority + * @throws InterruptedException + */ + public void put(T call, int pri) throws InterruptedException { + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); + Job j=new Job(pri, call); + try { + try { + while (q.size() >= this.capacity) { + this.full.await(); + } + } catch (InterruptedException e) { + this.full.signal();// propagate to non-interrupted thread + throw e; + } + this.q.add(j); + this.notEmpty.signal(); + this.priorityAddTimes[getStaticsIndex(j)]++; + } finally { + lock.unlock(); + } + } + + /** + * Used to increase the according flag in priorityAddTimes. The + * priorityAddTimes is for debug purpose. + * + * @param j + * @return + */ + private int getStaticsIndex(Job j) { + return j.orgPri < 10 && j.orgPri > 0 ? j.orgPri - 1 : (j.orgPri <= 0 ? 0 + : 9); + } + + /** + * Get a job from the queue. + * + * @param priority + * the worker thread's priority + * @return the job + * @throws InterruptedException + */ + private Job innerTake() throws InterruptedException { + Job ret = null; + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); + try { + while (q.size() == 0) { + try { + this.notEmpty.await(); + } catch (InterruptedException ie) { + // propagate to non-interrupted thread + this.notEmpty.notify(); + throw ie; + } + } + ret = q.poll(); + assert ret != null; + this.full.signal(); + getTimes++; + return ret; + } finally { + lock.unlock(); + } + } + + @SuppressWarnings("unchecked") + @Override + public void put(Object e) { + HBaseServer.Call call = (HBaseServer.Call) (e); + int pri = this.qosFunction.apply(call.param); + try { + this.put((T) call, pri); + } catch (InterruptedException e1) { + LOG.error(e1); + } + } + + @Override + public T take() throws InterruptedException { + if (LOG.isDebugEnabled()) { + if ((outputIndicator.addAndGet(1) << 52) >>> 52 == 0) { + this.printMetrix(); + } + } + return this.innerTake().getCall(); + } + + public Function getQosFunction() { + return qosFunction; + } + + /** + * Get the size of the queue. + * @return the size of the queue + */ + @Override + public int size() { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + return q.size(); + } finally { + lock.unlock(); + } + } + /** + * Print statistical information. + */ + public void printMetrix() { + LOG.debug("size is :" + q.size()); + LOG.debug("capacity is :" + this.capacity); + String out = "Add times "; + for (int i = 0; i < this.priorityAddTimes.length; i++) { + out += " pri:" + (i + 1) + ":" + this.priorityAddTimes[i]; + this.priorityAddTimes[i]=0; + } + LOG.debug("priority request static:" + out); + PriorityFunction.printRegionPriority(); + lock.lock(); + try { + + LOG.debug(""+this.q); + } catch (Exception e) { + LOG.error(e); + } + finally + { + lock.unlock(); + } + } + + /** + * The Job stored in queue + * @param + */ + public static class Job implements Comparable> { + private int orgPri = 0; + private long initTime = 0; + private T call; + private long initIndex=0; + +// /** +// * Get the priority now,the priority is calculated this way: +// * priority_now = +// * original priority - (time_now -time_constructed)/time_unit +// * @return the priority of this call +// */ +// public int getPriority() { +// //In case of system time changed. +// if(System.currentTimeMillis() arg0) { + return this.getPriority() - arg0.getPriority(); + } + } + + + /** + * class used to store the action priority of table; + */ + public static class OperationPriority { + private int mutationPriority = 0; + private int getPriority = 0; + private int scanPriority = 0; + public OperationPriority() { + } + + /** + * Get the priority of this operation + * @param operation + * @return + */ + public int getPriority(Operation operation) { + if(operation instanceof Get) + { + return this.getGetPriority(); + } + else if(operation instanceof Mutation) + { + return this.getMutationPriority(); + } + else if(operation instanceof Scan) + { + return this.getScanPriority(); + } + return 0; + } + + /** + * Construct a OperationPriority class. + * @param scanPriority priority of scan + * @param mutationPriority priority of mutation + * @param getPriority priority of get + */ + public OperationPriority(int scanPriority, int mutationPriority, + int getPriority) { + this.scanPriority = scanPriority; + this.mutationPriority = mutationPriority; + this.getPriority = getPriority; + } + + /** + * get put priority + * @return + */ + public int getMutationPriority() { + return mutationPriority; + } + + /** + * set put priority + * + * @return + */ + public void setMutationPriority(int priority) { + this.mutationPriority = priority; + } + + /** + * Get Get's priority + * + * @return + */ + public int getGetPriority() { + return getPriority; + } + + /** + * Set Get's priority + * + * @return + */ + public void setGetPriority(int priority) { + this.getPriority = priority; + } + + /** + * Get Scan's priority + * + * @return + */ + public int getScanPriority() { + return scanPriority; + } + + /** + * Set Scan's priority + * + * @return + */ + public void setScanPriority(int priority) { + this.scanPriority = priority; + } + + public String toString() { + return "scan priority:" + this.scanPriority + ", mutation priority:" + + this.mutationPriority + ", get priority:" + this.getPriority; + } + + /** + * Store this instance into byte array. + * @return + */ + public byte[] toBytes() { + byte[] ret = new byte[Bytes.SIZEOF_INT * 3]; + Bytes.putInt(ret, 0, this.scanPriority); + Bytes.putInt(ret, Bytes.SIZEOF_INT, this.mutationPriority); + Bytes.putInt(ret, Bytes.SIZEOF_INT * 2, this.getPriority); + return ret; + } + + /** + * Get value from a byte array. + * @param b the serialized value of an OperationPriority instance + */ + public OperationPriority fromBytes(byte[] b) { + if (b.length != Bytes.SIZEOF_INT * 3) + return null; + else { + this.scanPriority = Bytes.toInt(b, 0); + this.mutationPriority = Bytes.toInt(b, Bytes.SIZEOF_INT); + this.getPriority = Bytes.toInt(b, Bytes.SIZEOF_INT * 2); + } + return this; + } + } +} Index: src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java (revision 1220359) +++ src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java (working copy) @@ -387,6 +387,11 @@ final int numHandlers, int metaHandlerCount, final boolean verbose, Configuration conf, int highPriorityLevel) throws IOException { + if (instance instanceof HRegionInterface + && PriorityFunction.enable(conf)) { + return new PriorityHBaseServer(instance, ifaces, conf, bindAddress, port, + numHandlers, metaHandlerCount, verbose, highPriorityLevel); + } return getServer(instance.getClass(), instance, ifaces, bindAddress, port, numHandlers, metaHandlerCount, verbose, conf, highPriorityLevel); } Index: src/main/java/org/apache/hadoop/hbase/ipc/PriorityFunction.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/ipc/PriorityFunction.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/ipc/PriorityFunction.java (revision 0) @@ -0,0 +1,418 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.ipc; + +import java.util.List; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MasterNotRunningException; +import org.apache.hadoop.hbase.ZooKeeperConnectionException; +import org.apache.hadoop.hbase.client.Action; +import org.apache.hadoop.hbase.client.MultiAction; +import org.apache.hadoop.hbase.client.Operation; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.apache.hadoop.hbase.ipc.PriorityJobQueue.OperationPriority; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.Writable; + +import com.google.common.base.Function; + +/** + * This Function is used by {@link PriorityHBaseServer}. In this + * Function,generally we can get the priority of a RPC call base by it's table + * priority and the table's method priority.
+ */ + +public class PriorityFunction implements Function { + + public final static byte[] PRI_KEY = Bytes.toBytes("_table_priority"); + public final static byte[] OPERATION_PRIORITY_KEY = Bytes + .toBytes("_operation_priority"); + public static final int LOWEST_PRI = 10; + public static final int DEFAULT_PRI = 5; + public static final int HIGHEST_PRI = -10; + public static final int HIGH_PRI = 0; + public static final String ENABLE_KEY = "hbase.tablepriority.enable"; + private static Boolean enable = null; + + /** + * Return whether table priority is enabled. + * + * @return + */ + public static boolean enable(Configuration conf) { + if(enable==null) + { + enable=conf.getBoolean(ENABLE_KEY, true); + } + return enable; + } + + /** + * The priority map cache the region's priority in memory. Key is the region + * name which usually is obtained from the IPC call. Value is the priority of + * the region also is the priority of the table which the region belong to. + */ + private static final ConcurrentHashMap regionPriMap = new ConcurrentHashMap(); + + // Cache the Scan and its id,when scanner closed we can use the Scan to delete all cached information. + private static final ConcurrentHashMap scanToId = new ConcurrentHashMap(); + + // Cache the scanner ID and the priority of the scanner. + private static final ConcurrentHashMap scannerPriMap = new ConcurrentHashMap(); + + // Store the region name and it's according priority instance. + private static final ConcurrentHashMap regionActionPriorities = new ConcurrentHashMap(); + + private static final Log LOG = LogFactory + .getLog("org.apache.hadoop.hbase.ipc.PriorityFunction"); + private static HRegionServer server; + + public PriorityFunction(HRegionServer server, final Configuration conf) + throws MasterNotRunningException, ZooKeeperConnectionException { + if (PriorityFunction.server == null) { + PriorityFunction.server = server; + } + /** + * Enable the QosRegionObserver here. + */ + if (enable) { + String cops = conf.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY); + if (cops == null || cops.equals("")) { + cops = "org.apache.hadoop.hbase.ipc.QosRegionObserver"; + } else { + cops += ",org.apache.hadoop.hbase.ipc.QosRegionObserver"; + } + conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, cops); + } + } + + /** + * Set table priority. + * @param priority the priority you want to set + * @param des the table descriptor + * @return return the modified descriptor + * @throws Exception + */ + public static HTableDescriptor setPriority(String priority, + HTableDescriptor des) throws Exception { + Integer pri = Integer.parseInt(priority); + if (pri < 1 || pri > 10) + throw new NumberFormatException( + "Table priority should be [1 to 10]."); + des.setValue(PRI_KEY, Bytes.toBytes(priority)); + return des; + } + /** + * Set table priority. + * @param priority the priority you want to set + * @param des table descriptor + * @param op AperationPriority instance + * @return the prepared table descriptor + * @throws Exception + */ + public static HTableDescriptor setPriority(String priority, + HTableDescriptor des, OperationPriority op) throws Exception { + Integer pri = Integer.parseInt(priority); + if (pri < 1 || pri > 10) + throw new NumberFormatException( + "Table priority should be between [1 to 10]."); + setTableActionPriorities(des, op); + des.setValue(PRI_KEY, Bytes.toBytes(priority)); + return des; + } + /** + * Set table priority. + * @param priority the priority you want to set + * @param des the table descriptor + * @param op OperationPriority instance + * @return + * @throws Exception + */ + public static HTableDescriptor setPriority(int priority, + HTableDescriptor des, OperationPriority op) throws Exception { + if (priority < 1 || priority > 10) { + throw new NumberFormatException( + "Table priority should be between [1 to 10]."); + } + setTableActionPriorities(des, op); + des.setValue(PRI_KEY, Bytes.toBytes(priority)); + return des; + } + + /** + * Set table priority. + * @param priority the priority you want to set + * @param des the table descriptor + * @return + * @throws Exception + */ + public static HTableDescriptor setPriority(int priority, HTableDescriptor des) + throws Exception { + if (priority < 1 || priority > 10) { + throw new NumberFormatException( + "Table priority should be between [1 to 10]."); + } + des.setValue(PRI_KEY, Bytes.toBytes(priority + "")); + return des; + } + + /** + * Get action priorities from table descriptor. + * + * @param des the table descriptor + * @return + */ + public static OperationPriority getActionPriorities(HTableDescriptor des) { + byte[] op = des.getValue(OPERATION_PRIORITY_KEY); + OperationPriority ret = new OperationPriority(); + if (op != null) { + try { + ret.fromBytes(op); + } catch (Exception e) { + LOG.error("This table has wrong action priorities," + + "please use PriorityFunction.setPriority(int priority," + + "HTableDescriptor des, OperationPriority op)" + + " to set table's action priority"); + } + } + return ret; + } + /** + * Set action priorities for table descriptor. + * @param des the table descriptor + * @param op OperationPriority instance + */ + public static void setTableActionPriorities(HTableDescriptor des, + OperationPriority op) { + des.setValue(PriorityFunction.OPERATION_PRIORITY_KEY, + op.toBytes()); + } + /** + * Initialize the scanner's priority. + * @param call + * @param value scanner id + */ + public void initScannerPriority(Invocation call, Object value) { + Long id = (Long) value; + byte[] region = (byte[]) call.getParameters()[0]; + String regionN = Bytes.toString(region); + Integer pri = regionPriMap.get(regionN); + Scan scan=(Scan) call.getParameters()[1]; + if (pri == null) { + pri=initRegionPriority(regionN, false); + } + OperationPriority action = regionActionPriorities.get(regionN); + if (action != null) + { + pri += action.getScanPriority(); + } + scanToId.put(scan, id); + scannerPriMap.put(id, pri); + } + @Override + public Integer apply(Writable from) { + return getCallPriority(from); + } + + /** + * Remove the cached region priorities. + * @param region + */ + public static void cleanRegionPriority(HRegion region) { + String regionN = region.getRegionNameAsString(); + if(regionN!=null) + { + regionPriMap.remove(regionN); + regionActionPriorities.remove(regionN); + } + } + /** + * Remove closed scanners. + * @param s the scanner id to be removed + */ + public static void removeScanner(Scan s) { + Long id = null; + try { + id=scanToId.get(s); + scanToId.remove(s); + scannerPriMap.remove(id); + } catch (Exception e) { + LOG.error("scanner id:" + id +" scan:"+s+ " doesn't exist."); + } + } + + /** + * Get the priority of the call + * + * @param call + * @return the priority + */ + private int getCallPriority(Writable param) { + Invocation invo = (Invocation) param; + // if this is a next() call, we should find the priority of the scanner + // which + // is initiated by initScannerPriority(Invocation call, Object value) + if (invo.getMethodName().endsWith("next")) { + Long scanId = (Long) invo.getParameters()[0]; + Integer pri = scannerPriMap.get(scanId); + if (pri == null) { + LOG.error("the scanner didn't get it's priority:" + scanId); + pri = DEFAULT_PRI; + } + return pri; + } else if (invo.getMethodName().endsWith("multi")) { + // if it's a multi call,we use the first action of the in + // the MutiAction map to decide the priority of this call. + MultiAction multi = (MultiAction) invo.getParameters()[0]; + for (Entry e : multi.actions.entrySet()) { + String regionN = Bytes.toString(e.getKey()); + Integer pri = getRegionPri(regionN); + OperationPriority op; + if ((op = regionActionPriorities.get(regionN)) != null) { + Action action = ((List>) e.getValue()).get(0); + if (action != null) { + if (action.getAction() instanceof Operation) { + pri += op.getPriority((Operation) action.getAction()); + } + // Actually there a Exec class doesn't extends Operation class. + // We don't how to classify this operation yet. + } + } + return pri; + } + return DEFAULT_PRI; + } else if (invo.getMethodName().endsWith("increment") + || invo.getMethodName().endsWith("incrementColumnValue")) { + // due to the Increment class does not inherit the Operation class, + // we deal with it respectively. + byte[] region = (byte[]) invo.getParameters()[0]; + String regionN = Bytes.toString(region); + Integer pri = getRegionPri(regionN); + OperationPriority op; + if ((op = regionActionPriorities.get(regionN)) != null) { + pri += op.getMutationPriority(); + } + return pri; + } + Object params[] = invo.getParameters(); + // Deal with the subclasses of Operation + if (params.length >= 2 && params[0] instanceof byte[] + && params[params.length - 1] instanceof Operation) { + // If it's openScanner operation. + if (params[params.length - 1] instanceof Scan) { + return HIGH_PRI; + } + Operation ope = (Operation) params[params.length - 1]; + byte[] region = (byte[]) params[0]; + String regionN = Bytes.toString(region); + Integer pri = getRegionPri(regionN); + OperationPriority op; + if ((op = regionActionPriorities.get(regionN)) != null) { + pri += op.getPriority(ope); + } + return pri; + } + return DEFAULT_PRI; + } + + private int getRegionPri(String regionN) { + Integer pri = regionPriMap.get(regionN); + if (pri == null) { + pri = initRegionPriority(regionN, false); + } + if (pri == null) { + return DEFAULT_PRI; + } + return pri; + } + + /** + * Get region priority. + * + * @param region the region you want to get priority + * @return the region's priority + */ + public static int initRegionPriority(HRegion region) { + if (region == null || region.getRegionNameAsString() == null) { + return DEFAULT_PRI; + } + String regionN = region.getRegionNameAsString(); + int pri = DEFAULT_PRI; + if (region.getRegionInfo().isMetaRegion() + || region.getRegionInfo().isRootRegion()) { + pri = HIGHEST_PRI; + } + else if (region.getTableDesc().getValue(PRI_KEY) != null) { + try { + pri = Integer.parseInt(Bytes.toString(region.getTableDesc().getValue( + PRI_KEY))); + } catch (Exception e) { + LOG.error("Table " + region.getTableDesc().getNameAsString() + + " has a wrong priority"); + } + } + OperationPriority op = getActionPriorities(region + .getTableDesc()); + regionActionPriorities.put(regionN, op); + regionPriMap.put(regionN, pri); + return pri; + } + + /** + * Get region priority. + * + * @param region + * the region you want to get priority + * @param force + * force to refresh priority, if true,the method will get priority + * from region's table descriptor,or it will look from the cache. + * @return the region's priority + */ + private static int initRegionPriority(String region, boolean force) { + if (!force) { + Integer ret = regionPriMap.get(region); + if (ret != null) { + return ret; + } + } + HRegion hr = server.getOnlineRegion(Bytes.toBytes(region)); + return initRegionPriority(hr); + } + + /** + * Print out region priorities. + */ + public static void printRegionPriority() + { + String out = "Region Priories:"; + out+=regionPriMap.toString(); + LOG.debug(out); + } + +}