--- src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java (revision 1189169) +++ src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java (working copy) @@ -374,6 +374,11 @@ final int numHandlers, int metaHandlerCount, final boolean verbose, Configuration conf, int highPriorityLevel) throws IOException { + if (instance instanceof HRegionInterface + && conf.getBoolean("hbase.tablepriority.enable", true)) { + return new PriorityHBaseServer(instance, ifaces, conf, bindAddress, port, + numHandlers, metaHandlerCount, verbose, highPriorityLevel); + } return getServer(instance.getClass(), instance, ifaces, bindAddress, port, numHandlers, metaHandlerCount, verbose, conf, highPriorityLevel); } diff -uNr src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java --- src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (revision 1189169) +++ src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (working copy) @@ -207,7 +207,7 @@ private Listener listener = null; protected Responder responder = null; protected int numConnections = 0; - private Handler[] handlers = null; + protected Handler[] handlers = null; private Handler[] priorityHandlers = null; protected HBaseRPCErrorHandler errorHandler = null; @@ -243,7 +243,7 @@ } /** A call queued for handling. */ - protected class Call implements Delayable { + class Call implements Delayable { protected int id; // the client's call id protected Writable param; // the parameter passed protected Connection connection; // connection to client @@ -1196,7 +1196,7 @@ } /** Handles queued calls . */ - private class Handler extends Thread { + class Handler extends Thread { private final BlockingQueue myCallQueue; private MonitoredRPCHandler status; diff -uNr src/main/java/org/apache/hadoop/hbase/ipc/PriorityFunction.java src/main/java/org/apache/hadoop/hbase/ipc/PriorityFunction.java --- src/main/java/org/apache/hadoop/hbase/ipc/PriorityFunction.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/ipc/PriorityFunction.java (working copy) @@ -0,0 +1,476 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.ipc; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MasterNotRunningException; +import org.apache.hadoop.hbase.ZooKeeperConnectionException; +import org.apache.hadoop.hbase.client.Action; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.MetaScanner; +import org.apache.hadoop.hbase.client.MultiAction; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Row; +import org.apache.hadoop.hbase.ipc.PriorityJobQueue.ActionPriorities; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.Writable; + +import com.google.common.base.Function; + +/** + * This Function is used by {@link PriorityHBaseServer} In this + * Function,generally we can get the priority of call base by it's table's + * priority and the table's method's priority.
+ */ + +public class PriorityFunction implements Function { + + public final static byte[] PRI_KEY = Bytes.toBytes("_table_priority"); + public final static byte[] PRI_KEY_ACTION_PRIORITY = Bytes.toBytes("_action_priority"); + public static final int LOWEST_PRI = 10; + public static final int DEFAULT_PRI = 5; + public static final int HIGHEST_PRI = -10; + public static final int HIGH_PRI = 0; + /** + * priority refresh interval + */ + private static int INIT_INTER = 12000; + + /** + * The priority map cache the region's priority in memory. + * Key is the region name which usually is obtained from the IPC call. + * Value is the priority of the region also is the + * priority of the table which the region belong to. + */ + private final ConcurrentHashMap regionPriMap = new ConcurrentHashMap(); + + // key is name of table, value is the table's priority. + private final ConcurrentHashMap tablePriMap = new ConcurrentHashMap(); + + // Cache the scanner ID and the region name of the scanner. + private final ConcurrentHashMap scannerRegionMap = new ConcurrentHashMap(); + + // Cache the scanner ID and the priority of the scanner. + private final ConcurrentHashMap scannerPriMap = new ConcurrentHashMap(); + + // Store the table name and it's action priority instance. + private final ConcurrentHashMap actionPriorities = new ConcurrentHashMap(); + + // Store the region name and it's according priority instance. + private final ConcurrentHashMap regionActionPriorities = new ConcurrentHashMap(); + + // TODO: add priority of source host. + @SuppressWarnings("unused") + private final ConcurrentHashMap> tableSourceHostPlus = new ConcurrentHashMap>(); + @SuppressWarnings("unused") + private final ConcurrentHashMap> regionSourceHostPlus = new ConcurrentHashMap>(); + public static final Log LOG = LogFactory + .getLog("org.apache.hadoop.hbase.ipc.PriorityFunction"); + + private HRegionServer server; + private Configuration conf; + protected HBaseAdmin admin = null; + private static boolean run = true; + + public PriorityFunction(HRegionServer server, final Configuration conf) + throws MasterNotRunningException, ZooKeeperConnectionException { + this.conf = conf; + this.server = server; + INIT_INTER = this.conf.getInt("ipc.priority.refresh.interval", 12000); + this.priorityIniter.start(); + + // this.admin = new HBaseAdmin(this.conf); + } + + public void stop() + { + this.run=false; + this.priorityIniter.interrupt(); + } + /** + * get the initiation interval + * @return the initiation interval + */ + public static int getRefreshInterval() { + return INIT_INTER; + } + + /** + * Set table priority + * + * @param priority the priority you want to set + * @param des the table descriptor + * @return + * @throws Exception + */ + public static HTableDescriptor setPriority(String priority, + HTableDescriptor des) throws Exception { + Integer pri = Integer.parseInt(priority); + if (pri < 0 || pri > 10) + throw new NumberFormatException("Table priority should be between 0 and 10."); + des.setValue(PRI_KEY, Bytes.toBytes(priority)); + return des; + } + + /** + * Set table priority + * + * @param priority the priority you want to set + * @param des the table descriptor + * @return + * @throws Exception + */ + public static HTableDescriptor setPriority(int priority, HTableDescriptor des) + throws Exception { + if (priority < 0 || priority > 10) + throw new NumberFormatException("Table priority should be between 0 and 10."); + des.setValue(PRI_KEY, Bytes.toBytes(priority + "")); + return des; + } + + /** + * get action priorities from table descriptor + * + * @param des the table descriptor + * @return + */ + public static ActionPriorities getTableActionPriorities(HTableDescriptor des) { + byte[] actionPriority = des.getValue(PRI_KEY_ACTION_PRIORITY); + ActionPriorities ret = new ActionPriorities(); + if (actionPriority != null) { + try { + ret.fromBytes(actionPriority); + } catch (Exception e) { + LOG.error("This table has wrong action priorities," + + "please use {@ PriorityFunction}setPriority()" + +" method to set table action priority"); + } + } + return ret; + } + + /** + * set action priorities for table descriptor + * @param des the table descriptor + * @param actionPriority ActionPriority instance + */ + public static void getTableActionPriorities(HTableDescriptor des, ActionPriorities actionPriority) { + des.setValue(PriorityFunction.PRI_KEY_ACTION_PRIORITY, + actionPriority.toBytes()); + } + + /** + * Initialize the scanner's priority,invoked by openscanner + * + * @param call + * @param value scanner id + */ + public void initScannerPriority(Invocation call, Object value) { + Long id = (Long) value; + byte[] region = (byte[]) call.getParameters()[0]; + String regionN = Bytes.toString(region); + Integer prii = regionPriMap.get(regionN); + if (prii == null) { + this.initRegionPriority(regionN, false); + } + scannerRegionMap.put(id, regionN); + } + + /** + * + */ + public void refreshCache() { + priorityIniter.interrupt(); + } + + @Override + public Integer apply(Writable from) { + return getCallPriority(from); + + } + + private Thread priorityIniter = new Thread() { + public void run() { + while (run&&(!server.isStopped())&&(!server.isStopping())) + { + try { + sleep(INIT_INTER); + } catch (InterruptedException e) { + } + if (server.isOnline()) + refreshPriority(); + } + } + }; + + private void refreshRegionPriority() { + Integer pri = DEFAULT_PRI; + List list = null; + try { + list = MetaScanner.listAllRegions(conf); + } catch (IOException e) { + LOG.error(e); + return; + } + for (HRegionInfo region : list) { + pri = DEFAULT_PRI; + String tableN = Bytes.toString(HRegionInfo.getTableName(region + .getRegionName())); + Integer priT = tablePriMap.get(tableN); + if (priT != null) { + pri = priT; + } + ActionPriorities actionPriority = actionPriorities.get(tableN); + if (actionPriority == null) { + actionPriority = new ActionPriorities(); + } + regionActionPriorities.put(region.getRegionNameAsString(), actionPriority); + regionPriMap.put(region.getRegionNameAsString(), pri); + } + } + + /** + * Refresh priorities. + */ + private void refreshPriority() { + refreshTablePriority(); + refreshRegionPriority(); + for (Long id : scannerRegionMap.keySet()) { + if (!server.scannerAlive(String.valueOf(id))) { + scannerRegionMap.remove(id); + } + } + for (Long id : scannerPriMap.keySet()) { + if (!server.scannerAlive(String.valueOf(id))) { + scannerPriMap.remove(id); + } + } + } + + + /** + * Get the priority of the call + * + * @param call + * @return the priority + */ + private int getCallPriority(Writable param) { + Invocation invo = (Invocation) param; + if (invo.getMethodName().endsWith("next")) { + Long scanId = (Long) invo.getParameters()[0]; + Integer pri = scannerPriMap.get(scanId); + if (pri == null) { + String regionN = scannerRegionMap.get(scanId); + if (regionN != null) { + pri = getRegionPri(regionN); + ActionPriorities action = regionActionPriorities.get(regionN); + if (action != null) { + pri += action.getScanPlus(); + } + scannerPriMap.put(scanId, pri); + return pri; + } else { + LOG.error("error,there is no this scanner id"); + return DEFAULT_PRI; + } + } + return pri; + } else if (invo.getMethodName().endsWith("multi")) { + MultiAction multi = (MultiAction) invo.getParameters()[0]; + for (Map.Entry e : multi.actions.entrySet()) { + String regionN = Bytes.toString((byte[]) e.getKey()); + Integer pri = getRegionPri(regionN); + ActionPriorities actionPriority; + if ((actionPriority = regionActionPriorities.get(regionN)) != null) { + List actionsForRegion = (List) e.getValue(); + Action action = actionsForRegion.get(0); + if (action != null) { + Row row = action.getAction(); + if (row instanceof Delete) { + pri += actionPriority.getDeletePlus(); + + } else if (row instanceof Get) { + pri += actionPriority.getGetPlus(); + + } else if (row instanceof Put) { + pri += actionPriority.getPutPlus(); + } + } + } + return pri; + } + return DEFAULT_PRI; + } else if (invo.getMethodName().endsWith("put") + || invo.getMethodName().endsWith("increment") + || invo.getMethodName().endsWith("incrementColumnValue") + || invo.getMethodName().endsWith("checkAndPut")) { + byte[] region = (byte[]) invo.getParameters()[0]; + String regionN = Bytes.toString(region); + Integer pri = getRegionPri(regionN); + ActionPriorities actionPriority; + if ((actionPriority = regionActionPriorities.get(regionN)) != null) { + pri += actionPriority.getPutPlus(); + } + return pri; + } else if (invo.getMethodName().endsWith("get")) { + byte[] region = (byte[]) invo.getParameters()[0]; + String regionN = Bytes.toString(region); + Integer pri = getRegionPri(regionN); + ActionPriorities actionPriority; + if ((actionPriority = regionActionPriorities.get(regionN)) != null) { + pri += actionPriority.getGetPlus(); + } + return pri; + } else if (invo.getMethodName().endsWith("delete") + || invo.getMethodName().endsWith("checkAndDelete")) { + byte[] region = (byte[]) invo.getParameters()[0]; + String regionN = Bytes.toString(region); + Integer pri = getRegionPri(regionN); + ActionPriorities actionPriority; + if ((actionPriority = regionActionPriorities.get(regionN)) != null) { + pri += actionPriority.getDeletePlus(); + } + return pri; + } else { + return HIGH_PRI; + } + } + + private int getRegionPri(String regionN) { + Integer pri = regionPriMap.get(regionN); + if (pri == null) { + pri = initRegionPriority(regionN, false); + } + if (pri == null) + return DEFAULT_PRI; + return pri; + } + + /** + * Initialize the region priority + * + * @param regions the region want to get priority + * @param force force refresh priority,if true will + * get priority from table descriptor. + * @return the region priority + */ + private int initRegionPriority(String region, boolean force) { + if (!force) { + Integer ret = regionPriMap.get(region); + if (ret != null) + return ret; + } + Integer prii; + int pri = DEFAULT_PRI; + HRegion hr = this.server.getOnlineRegion(Bytes.toBytes(region)); + + if (hr != null) { + if (hr.getRegionInfo().isMetaRegion() + || hr.getRegionInfo().isRootRegion()) { + pri = HIGHEST_PRI; + regionPriMap.put(region, pri); + return pri; + } + String tableName = hr.getTableDesc().getNameAsString(); + prii = tablePriMap.get(tableName); + if (prii == null) { + if (hr.getTableDesc().getValue(PRI_KEY) != null) { + try { + + pri = Integer.parseInt(Bytes.toString(hr.getTableDesc().getValue( + PRI_KEY))); + } catch (Exception e) { + LOG.error("Table " + hr.getTableDesc().getNameAsString() + + " has a wrong priority"); + } + } + tablePriMap.put(tableName, pri); + } else { + pri = prii; + } + } else { + LOG.error("error this is no this region" + region); + } + regionPriMap.put(region, pri); + if (regionActionPriorities.get(region) == null) { + if (hr != null) { + ActionPriorities actionPriority = this.getTableActionPriorities(hr.getTableDesc()); + regionActionPriorities.put(region, actionPriority); + } + } + return pri; + } + + private void refreshTablePriority() { + HTableDescriptor[] dess; + try { + if(admin==null) + { + admin=new HBaseAdmin(this.conf); + } + dess = admin.listTables(); + } catch (IOException e) { + LOG.error(e); + return; + } + Integer pri = DEFAULT_PRI; + for (HTableDescriptor des : dess) { + pri = DEFAULT_PRI; + byte[] prib = des.getValue(PRI_KEY); + if (prib != null) + try { + pri = Integer.parseInt(Bytes.toString((prib))); + } catch (Exception e) { + LOG.error("table priority error :" + Bytes.toString(prib) + + " table name:" + des.getNameAsString()); + } + ActionPriorities actionPriority = getTableActionPriorities(des); + tablePriMap.put(des.getNameAsString(), pri); + actionPriorities.put(des.getNameAsString(), actionPriority); + } + } + + /** + * translate thread priority to system priority + * + * @param tpri transfor thread priority to according table priority + * @return + */ + public static int priTrans(int tpri) { + // return 10; + return tpri > 10 || tpri < 1 ? 5 : 11 - tpri; + } + +} diff -uNr src/main/java/org/apache/hadoop/hbase/ipc/PriorityHBaseServer.java src/main/java/org/apache/hadoop/hbase/ipc/PriorityHBaseServer.java --- src/main/java/org/apache/hadoop/hbase/ipc/PriorityHBaseServer.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/ipc/PriorityHBaseServer.java (working copy) @@ -0,0 +1,120 @@ +/** + * Copyright the Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.io.HbaseObjectWritable; +import org.apache.hadoop.hbase.ipc.Invocation; +import org.apache.hadoop.hbase.ipc.WritableRpcEngine.Server; +import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.io.Writable; + +/** + * An IPC server extends HBaseServer,and add schedule function to make table priority take + * effect. + */ +public class PriorityHBaseServer extends Server { + + /** + * Construct an RPC server. + * + * @param instance the instance whose methods will be called + * @param ifaces Define metrics for all methods in the given classes + * @param conf the configuration to use + * @param bindAddress the address to bind on to listen for connection + * @param port the port to listen for connections on + * @param numHandlers the number of method handler threads to run + * @param priorityHandlerCount used by {@link HBaseServer} + * @param verbose whether each call should be logged + * @param highPriorityLevel used by {@link HBaseServer},decide actions of meta info will be + * handled by metaHandlers + * @throws IOException e + */ + public PriorityHBaseServer(Object instance, final Class[] ifaces, + Configuration conf, String bindAddress, int port, int numHandlers, + int priorityHandlerCount, boolean verbose, int highPriorityLevel) + throws IOException { + super(instance, ifaces, conf, bindAddress, port, numHandlers, + priorityHandlerCount, verbose, highPriorityLevel); + int maxQueueSize = this.conf.getInt("ipc.server.max.queue.size", 500); + int handlers=this.conf.getInt("hbase.regionserver.handler.count", 10); + callQueue = new PriorityJobQueue(maxQueueSize, handlers>=10?10:handlers, + new PriorityFunction((HRegionServer) instance, conf), conf); + } + + @Override + public Writable call(Class protocol, + Writable param, long receivedTime, MonitoredRPCHandler status) + throws IOException { + Invocation call = (Invocation) param; + HbaseObjectWritable writable = (HbaseObjectWritable) super.call(protocol, + param, receivedTime, status); + if (call.getMethodName().endsWith("openScanner")) { + ((PriorityJobQueue) callQueue).getFuction().initScannerPriority(call, + writable.get()); + } + return writable; + } + + /** + * If handleSize is 20,and priorities of threads range from 1 to 10 the + * priority will be initialized to 10,9,8,7....1, 10,9,8,...2, 10 because Java + * thread use a priority rule which large number meanse higher priority. so + * this method will make the number of high priority threads is bigger than + * the lower ones. Because a thread can only handle the a job which have a + * higher priority than its own, the hight priority jobs will have more thread + * to handle. Ps: the job priority is small number means higher priority,which + * is inverse of the thread priority, so there is a method {@PriorityFunction} + * priTrans do the priority transfer thing. + */ + private int[] initPriorityArray(int handleSize) { + int[] priorityArray = new int[handleSize]; + int minPriOfTurn = Thread.MIN_PRIORITY; + for (int i = 0, priNow = Thread.MAX_PRIORITY; i < handleSize; i++, priNow--) { + if (priNow < minPriOfTurn) { + priNow = Thread.MAX_PRIORITY; + minPriOfTurn++; + if (minPriOfTurn == Thread.MAX_PRIORITY) { + minPriOfTurn = Thread.MIN_PRIORITY; + } + } + priorityArray[i] = priNow; + } + return priorityArray; + } + + @Override + public synchronized void startThreads() { + super.startThreads(); + int[] priorityArray = initPriorityArray(handlers.length); + for (int i = 0; i < this.handlers.length; i++) { + handlers[i].setPriority(priorityArray[i]); + } + } + @Override + public void stop() + { + super.stop(); + ((PriorityJobQueue) callQueue).stop(); + } +} diff -uNr src/main/java/org/apache/hadoop/hbase/ipc/PriorityJobQueue.java src/main/java/org/apache/hadoop/hbase/ipc/PriorityJobQueue.java --- src/main/java/org/apache/hadoop/hbase/ipc/PriorityJobQueue.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/ipc/PriorityJobQueue.java (working copy) @@ -0,0 +1,727 @@ +/** + * Copyright the Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import java.io.InterruptedIOException; +import java.util.Calendar; +import java.util.Collection; +import java.util.Iterator; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.Writable; + +import com.google.common.base.Function; + +/** + * This queue is used by {@link PriorityHBaseServer} In this queue,generally + * thread can only get jobs which have same or higher priority than its thread + * priority.
+ * The thread which has the lowest priority can handle the jobs which has a + * lower priorities than the lowest priority. Please use take and put to + * manipulate this queue.
+ * + * @param the class contained by the queue + */ +public class PriorityJobQueue implements BlockingQueue { + private static final Log LOG = LogFactory.getLog(PriorityJobQueue.class); + private final PriorityBlockingQueue> queue = new PriorityBlockingQueue>(); + private int size = 0; + private int capacity = 100; + private int maxWait = 1000; + private boolean running = true; + final int lowestThreadPriority; + ReentrantLock readLock = new ReentrantLock(); + Condition[] lockList ; + ReentrantLock addLock = new ReentrantLock(); + Condition queueFull = addLock.newCondition(); + + private static int handleFreshInter = 6; + private static int move = Integer.SIZE - handleFreshInter; + private static int flush = 1; + private long[] priorityAddTimes = new long[10]; + private long[] priorityGetTimes = new long[10]; + + private Function qosFunction; + + /** + * get static data of add times + * + * @return the add times arranged by priority + */ + public long[] getPriorityAddTimes() { + return priorityAddTimes; + } + + /** + * get static data of get times + * + * @return the get times arranged by priority + */ + + public long[] getPriorityGetTimes() { + return priorityGetTimes; + } + + private void setSize(int size) { + this.addLock.lock(); + this.size = size; + if (this.size < this.capacity) { + this.queueFull.signalAll(); + } + this.addLock.unlock(); + } + + private void addSize() { + this.addLock.lock(); + this.size++; + this.addLock.unlock(); + } + + private void decreaseSize() { + this.addLock.lock(); + this.size--; + if (this.size < this.capacity) { + this.queueFull.signalAll(); + } + this.addLock.unlock(); + } + + private void tryAdd(Job j) throws InterruptedIOException { + int wait = 0; + while (this.size >= this.capacity) { + try { + addLock.lock(); + this.queueFull.await(10, TimeUnit.MILLISECONDS); + // this.queueFull.await(); + addLock.unlock(); + wait++; + this.size = this.queue.size(); + if (wait > this.maxWait) + break; + } catch (InterruptedException e) { + throw new InterruptedIOException("try add waiting interrupted"); + } + } + addLock.lock(); + this.queue.add(j); + this.addSize(); + addLock.unlock(); + } + + private Thread refresher = new Thread() { + public void run() { + while (running) { + refreshPeriodically(); + try { + sleep(1000); + } catch (InterruptedException e) { + // ignore this exception + } catch (Exception e) { + LOG.error(e); + } + } + } + }; + + /** + * Stop the refresher. + */ + public void stop() { + ((PriorityFunction)qosFunction).stop(); + this.running = false; + this.refresher.interrupt(); + } + + /** + * Initialize a queue + * + * @param capacity + * the capacity of the queue,not a precision value,if queue size + * exceed this value, workers which add jobs should wait + * @param lowestThreadPriority + * the lowest priority which worker thread hold,the default priority + * is range from 1 to 10,reverse from java thread priority + * @param server the instance of PriorityHBaseServer + */ + public PriorityJobQueue(int capacity, int lowestThreadPriority, + Function qosFunction, final Configuration conf) { + this.qosFunction = qosFunction; + // The handleFreshInter used to decide after how many RPC calls processed + // we increase the priority of the calls remained in this queue. + // 7 means after the 7th power of 2 RPC calls processed. + // we use ((flush << move) >>> move) == 0 to do mode. + handleFreshInter = conf.getInt("hbase.schedule.refreshinter", 7); + handleFreshInter = handleFreshInter < 1 || handleFreshInter > 10 ? 7 + : handleFreshInter; + move = Integer.SIZE - handleFreshInter; + this.capacity = capacity; + this.refresher.setDaemon(true); + this.refresher.start(); + this.lowestThreadPriority = lowestThreadPriority; + lockList = new Condition[Thread.MAX_PRIORITY]; + for (int i = 0; i < lockList.length; i++) { + lockList[i] = readLock.newCondition(); + } + } + + public PriorityFunction getFuction() { + return (PriorityFunction) this.qosFunction; + } + + /** + * add a job to this queue + * + * @param call the job instance + * @param pri the job's priority + */ + public void add(T call, int pri) { + try { + this.tryAdd(new Job(pri, call)); + } catch (InterruptedIOException e) { + // TODO should retry some times + LOG.error(e); + } + singalHandler(); + synchronized (this.priorityAddTimes) { + this.priorityAddTimes[this.getCondition(pri)]++; + } + + } + + /** + * get the size of the queue,maintain a integer to indicate the size for + * performance. + * + * @return the size of the queue + */ + public int size() { + return this.queue.size(); + } + + /** + * get the size of the queue + * + * @return the size of the queue + */ + public int queueSize() { + return queue.size(); + } + + private int getCondition(int pri) { + if (pri <= 10 && pri >= 1) { + return pri - 1; + } else if (pri > 10) { + return 9; + } else { + return 0; + } + } + + private void singalHandler() { + readLock.lock(); + Job jobt = queue.peek(); + if (jobt != null) { + this.lockList[getCondition(jobt.orgPri)].signal(); + } + readLock.unlock(); + } + + /** + * if handler's priority lower than job's priority, then this handler can't + * get this job. + * + * @param job the job which worker want to get + * @param pri the worker thread's priority + * @return should the worker get this job + */ + public boolean shouldWork(Job job, int pri) { + if (job == null) + return false; + return (pri >= job.orgPri) + || (job.orgPri < 1 && pri == 1) + || (job.orgPri > this.lowestThreadPriority && pri == this.lowestThreadPriority); + } + + /** + * get a job from the queue ,will test whether the thread can get this job + * + * @param pri the worker thread's priority + * @return the job + * @throws InterruptedException + */ + public T get(int pri) throws InterruptedException { + Job ret = null; + while (true) { + readLock.lock(); + ret = queue.peek(); + if (shouldWork(ret, pri)) { + ret = queue.take(); + readLock.unlock(); + break; + } + this.lockList[getCondition(pri)].await(100, TimeUnit.MILLISECONDS); + readLock.unlock(); + if (ret == null) { + this.setSize(0); + } + } + if (ret.orgPri > pri && pri != this.lowestThreadPriority) { + LOG.error("The handle get a job priority is lower than its priority,this shouldn't happen" + + " job priority:" + ret.orgPri + " handler pri:" + pri); + } + this.singalHandler(); + synchronized (this.priorityGetTimes) { + this.priorityGetTimes[this.getCondition(ret.orgPri)]++; + } + this.decreaseSize(); + return ret.getCall(); + + } + + public void printMetrix() { + LOG.info("size is :" + this.size); + LOG.info("capacity is :" + this.capacity); + String out = "Add times"; + for (int i = 0; i < this.priorityAddTimes.length; i++) { + out += " pri:" + (i + 1) + ":" + this.priorityAddTimes[i]; + } + LOG.info("priority request static:" + out); + out = "Get times"; + for (int i = 0; i < this.priorityGetTimes.length; i++) { + out += " pri:" + (i + 1) + ":" + this.priorityGetTimes[i]; + } + LOG.info("priority request static:" + out); + } + + /** + * refresh the priorities of the jobs in queue,simply -1 + */ + public void refresh() { + this.refresher.interrupt(); + } + + static int outputIndicator = 0; + + private void refreshPeriodically() { + try { + if ((outputIndicator << 55) >>> 55 == 0) { + LOG.debug(Calendar.getInstance().getTime() + ":" + this.queue); + } + outputIndicator++; + for (Job job : queue) { + if (job != null) { + job.increasePriority(); + } + } + singalHandler(); + } catch (Exception e) { + LOG.error(e); + } + } + + private void refreshIner(int n) { + try { + for (Job job : queue) { + if (job != null) { + job.increasePriority(n); + } + } + singalHandler(); + } catch (Exception e) { + LOG.warn(e); + } + } + + /** + * + * The Job hold by queue + * + * @param + */ + public static class Job implements Comparable> { + int orgPri = 0; + int priority = 0; + long initTime = 0; + T call; + + /** + * get the priority now + * + * @return the priority of this call + */ + public int getPriority() { + return priority; + } + + /** + * increase job's priority + */ + public void increasePriority() { + this.priority--; + } + + /** + * increase job's priority by n + * + * @param n the number you want to increase + */ + public void increasePriority(int n) { + + this.priority = this.priority - n; + + } + + /** + * get the instance hold by the job + * + * @return the call instance + */ + public T getCall() { + return call; + } + + /** + * set the instance hold byt the job + * + * @param call the call instance + */ + public void setCall(T call) { + this.call = call; + } + + /** + * Initialize a job + * + * @param pri the job priority + * @param call the instance hold by the job + */ + public Job(int pri, T call) { + this.orgPri = pri; + this.priority = pri; + this.initTime = System.currentTimeMillis(); + this.call = call; + } + + /** + * print the job + */ + public String toString() { + + return "orgPri:" + this.orgPri + ", lastPri:" + this.priority + + ", wait time:" + ((System.currentTimeMillis() - this.initTime)) + + ",ino:";// + call; + } + + @Override + public int compareTo(Job arg0) { + // TODO Auto-generated method stub + return this.priority - arg0.priority; + } + + } + + @Override + public Object remove() { + return this.queue.remove(); + } + + @Override + public Object poll() { + return this.queue.poll(); + } + + @Override + public Object element() { + return this.queue.element(); + } + + @Override + public Object peek() { + return this.queue.peek(); + } + + @Override + public boolean isEmpty() { + return this.queue.isEmpty(); + } + + @SuppressWarnings({ "rawtypes" }) + @Override + public Iterator iterator() { + return this.queue.iterator(); + } + + @Override + public Object[] toArray() { + return this.queue.toArray(); + } + + @Override + public Object[] toArray(Object[] a) { + return this.queue.toArray(a); + } + + @SuppressWarnings({ "rawtypes" }) + @Override + public boolean containsAll(Collection c) { + return this.queue.containsAll(c); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Override + public boolean addAll(Collection c) { + return this.queue.addAll(c); + } + + @SuppressWarnings({ "rawtypes" }) + @Override + public boolean removeAll(Collection c) { + return this.queue.removeAll(c); + } + + @SuppressWarnings({ "rawtypes" }) + @Override + public boolean retainAll(Collection c) { + return this.queue.retainAll(c); + } + + @Override + public void clear() { + this.queue.clear(); + } + + @Override + public boolean add(Object e) { + if (this.size > this.capacity) + return false; + else + try { + this.put(e); + } catch (InterruptedException e1) { + LOG.error(e); + } + return true; + } + + @Override + public boolean offer(Object e) { + return this.offer(e); + } + + @Override + public void put(Object e) throws InterruptedException { + HBaseServer.Call call = (HBaseServer.Call) (e); + int pri = this.qosFunction.apply(call.param); + this.add((T) call, pri); + + } + + @SuppressWarnings("unchecked") + @Override + public boolean offer(Object e, long timeout, TimeUnit unit) + throws InterruptedException { + return this.queue.offer((Job) e, timeout, unit); + } + + @Override + public Object take() throws InterruptedException { + if (((flush << move) >>> move) == 0) { + this.refresher.interrupt(); + } + flush++; + return this.get(PriorityFunction.priTrans(Thread.currentThread() + .getPriority())); + } + + @Override + public Object poll(long timeout, TimeUnit unit) throws InterruptedException { + + return this.get(PriorityFunction.priTrans(Thread.currentThread() + .getPriority())); + } + + @Override + public int remainingCapacity() { + return this.capacity - this.size; + } + + @Override + public boolean remove(Object o) { + return this.queue.remove(o); + } + + @Override + public boolean contains(Object o) { + return this.queue.contains(o); + } + + @SuppressWarnings("unchecked") + @Override + public int drainTo(@SuppressWarnings("rawtypes") Collection c) { + return this.queue.drainTo(c); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Override + public int drainTo(Collection c, int maxElements) { + return this.queue.drainTo(c, maxElements); + } + + /** + * + * class used to store the action priority of table; + * + */ + public static class ActionPriorities { + int putPlus = 0; + int getPlus = 0; + int deletePlus = 0; + int scanPlus = 0; + + public ActionPriorities() { + + } + + public ActionPriorities(int scanPlus, int putPlus, int getPlus, + int deletePlus) { + this.scanPlus = scanPlus; + this.putPlus = putPlus; + this.getPlus = getPlus; + this.deletePlus = deletePlus; + } + + /** + * get put priority plus + * + * @return + */ + public int getPutPlus() { + return putPlus; + } + + /** + * set put priority plus + * + * @return + */ + public void setPutPlus(int putPlus) { + this.putPlus = putPlus; + } + + /** + * get get priority plus + * + * @return + */ + public int getGetPlus() { + return getPlus; + } + + /** + * set get priority plus + * + * @return + */ + public void setGetPlus(int getPlus) { + this.getPlus = getPlus; + } + + /** + * get delete priority plus + * + * @return + */ + public int getDeletePlus() { + return deletePlus; + } + + /** + * set delete priority plus + * + * @return + */ + public void setDeletePlus(int deletePlus) { + this.deletePlus = deletePlus; + } + + /** + * get scan priority plus + * + * @return + */ + public int getScanPlus() { + return scanPlus; + } + + /** + * set scan priority plus + * + * @return + */ + public void setScanPlus(int scanPlus) { + this.scanPlus = scanPlus; + } + + public String toString() { + return this.scanPlus + "," + this.putPlus + "," + this.getPlus + "," + + this.deletePlus + ","; + } + + /** + * store this object into byte[] + * + * @return + */ + public byte[] toBytes() { + byte[] ret = new byte[Bytes.SIZEOF_INT * 4]; + Bytes.putInt(ret, 0, this.scanPlus); + Bytes.putInt(ret, Bytes.SIZEOF_INT, this.putPlus); + Bytes.putInt(ret, Bytes.SIZEOF_INT * 2, this.getPlus); + Bytes.putInt(ret, Bytes.SIZEOF_INT * 3, this.deletePlus); + return ret; + } + + /** + * get initiate value from byte[] + * + * @param b the serialized value of an ActionPriorites instance + */ + public ActionPriorities fromBytes(byte[] b) { + if (b.length != Bytes.SIZEOF_INT * 4) + return null; + else { + this.scanPlus = Bytes.toInt(b, 0); + this.putPlus = Bytes.toInt(b, Bytes.SIZEOF_INT); + this.getPlus = Bytes.toInt(b, Bytes.SIZEOF_INT * 2); + this.deletePlus = Bytes.toInt(b, Bytes.SIZEOF_INT * 3); + } + return this; + } + } + + +} diff -uNr src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1189169) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy) @@ -3357,4 +3357,13 @@ HServerLoad hsl = buildServerLoad(); return hsl == null? null: hsl.getCoprocessors(); } +/** + * check whether a scanner is alive + * @param valueOf the scanner ID + * @return + */ + public boolean scannerAlive(String valueOf) { + return this.scanners.get(valueOf)!=null; + } + } diff -uNr src/test/java/org/apache/hadoop/hbase/allocation/test/TestActionPriority.java src/test/java/org/apache/hadoop/hbase/allocation/test/TestActionPriority.java --- src/test/java/org/apache/hadoop/hbase/allocation/test/TestActionPriority.java (revision 0) +++ src/test/java/org/apache/hadoop/hbase/allocation/test/TestActionPriority.java (working copy) @@ -0,0 +1,266 @@ +package org.apache.hadoop.hbase.allocation.test; + +import java.io.IOException; +import java.util.Random; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.allocation.test.TestTablePriority.Worker; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.ipc.PriorityFunction; +import org.apache.hadoop.hbase.ipc.PriorityJobQueue.ActionPriorities; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import static org.junit.Assert.assertTrue; + + +/** + * Test for action priority, use Action_Priority table and scan and put with + * different priorities. There are two tests,in first one get's priority is + * 0(high priority, RPC priority = table priority 0+ get priority 0)
+ * scan's priority is 10 (RPC priority = table priority 0+ scan priority 10) and + * in the second test,we switch the priorities of scan and get. + */ +public class TestActionPriority { + private final static Log LOG = LogFactory.getLog(TestActionPriority.class); + static final Random r = new Random(); + static final int rowNubmer = 30000; + static final int threadN = 150; + HBaseAdmin admin = null; + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + /** + * @throws java.lang.Exception + */ + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.startMiniCluster(1); + } + /** + * @throws java.lang.Exception + */ + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + /** + * set up a cluster and prepare tables. + */ + @Before + public void setUp() { + try { + admin = new HBaseAdmin(TEST_UTIL.getConfiguration()); + } catch (Exception e) { + LOG.info(e); + } + HTableDescriptor des; + byte[][] startKeys = new byte[][] { Bytes.toBytes("0"), Bytes.toBytes("2"), + Bytes.toBytes("4"), Bytes.toBytes("8") }; + try { + if (!admin.tableExists("Action_Priority")) { + des = new HTableDescriptor("Action_Priority"); + des.addFamily(new HColumnDescriptor("ff")); + des.setValue(PriorityFunction.PRI_KEY_ACTION_PRIORITY, + new ActionPriorities(10, 0, 0, 0).toBytes()); + des.setValue(PriorityFunction.PRI_KEY, Bytes.toBytes(0 + "")); + admin.createTable(des, startKeys); + } + } catch (Exception e) { + LOG.info(e); + } + writeData(); + } + + private void writeData() { + LOG.info("begion write data into test table"); + int nPerWorker = rowNubmer / threadN; + Worker[] workers = new Worker[threadN]; + for (int i = 0; i < workers.length; i++) { + try { + workers[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(),"Action_Priority"), "writeData", + nPerWorker); + } catch (IOException e) { + } + } + for (int i = 0; i < workers.length; i++) { + workers[i].start(); + } + for (int i = 0; i < workers.length; i++) { + try { + workers[i].join(); + } catch (InterruptedException e) { + LOG.error(e); + } + } + LOG.info("Write data into test table finished."); + } + + /** + * @throws java.lang.Exception + */ + @After + public void tearDown() { + + } + + + + /** + * Verify whether the function take effect + * + * @param highs high priority workers + * @param lows low priority workers + */ + + @SuppressWarnings("static-access") + public void verifyFunction(Worker highs[], Worker lows[]) { + boolean highFinished = false; + boolean lowFinished = false; + long highThroughPut = 0; + long lowThroughPut = 0; + while (!(highFinished && lowFinished)) { + highThroughPut = 0; + lowThroughPut = 0; + for (int i = 0; i < highs.length; i++) { + highThroughPut += highs[i].getThroughput(); + lowThroughPut += lows[i].getThroughput(); + } + LOG.info("-------------------------------------------------------------"); + LOG.info("High priority action type is:" + highs[0].getType() + + ", priority:" + highs[0].getActionPriority() + " throughput is:" + + highThroughPut); + LOG.info("low priority action type is:" + lows[0].getType() + + ", priority:" + lows[0].getActionPriority() + " throughput is:" + + lowThroughPut); + highFinished = true; + lowFinished = true; + for (int i = 0; i < highs.length; i++) { + if (highs[i].isAlive()) { + highFinished = false; + } + if (lows[i].isAlive()) { + lowFinished = false; + } + } + if (highFinished) { + for (int i = 0; i < highs.length; i++) { + lows[i].stopWorker(); + } + lowFinished = true; + } + if (lowFinished) { + for (int i = 0; i < highs.length; i++) { + highs[i].stopWorker(); + } + highFinished = true; + } + try { + Thread.currentThread().sleep(1000); + } catch (InterruptedException e) { + // ignore exceptions + } + } + highThroughPut = 0; + lowThroughPut = 0; + for (int i = 0; i < highs.length; i++) { + highThroughPut += highs[i].getThroughput(); + lowThroughPut += lows[i].getThroughput(); + } + LOG.info("-------------------------------------------------------------"); + LOG.info("---------------------Test finished --------------------------"); + LOG.info("High priority action type is:" + highs[0].getType() + + ", priority:" + highs[0].getActionPriority() + " throughput is:" + + highThroughPut); + LOG.info("low priority action type is:" + lows[0].getType() + ", priority:" + + lows[0].getActionPriority() + " throughput is:" + lowThroughPut); + LOG.info("####### Test for " + highs[0].getType() + ", priority:" + + highs[0].getActionPriority() + " ,and " + lows[0].getType() + + ", priority:" + lows[0].getActionPriority() + " finished####"); + assertTrue("Action priority works properly", highThroughPut > lowThroughPut); + LOG.info("-------------------------------------------------------------"); + } + + /** + * run the test; + */ + @Test + public void testForDifferentActionPriority() { + Worker puts[] = new Worker[threadN / 2]; + Worker scans[] = new Worker[threadN / 2]; + for (int i = 0; i < puts.length; i++) { + try { + puts[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(),"Action_Priority"), "put", rowNubmer + * 2 / threadN); + scans[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(),"Action_Priority"), "scan", rowNubmer + * 2 / threadN); + } catch (IOException e) { + } + } + for (int i = 0; i < puts.length; i++) { + puts[i].setActionPriority(0); + puts[i].start(); + scans[i].start(); + scans[i].setActionPriority(10); + } + verifyFunction(puts, scans); + + try { + admin.disableTable("Action_Priority"); + HTableDescriptor des1 = admin.getTableDescriptor(Bytes + .toBytes("Action_Priority")); + des1.setValue(PriorityFunction.PRI_KEY_ACTION_PRIORITY, + new ActionPriorities(0, 10, 10, 10).toBytes()); + admin.modifyTable(Bytes.toBytes("Action_Priority"), des1); + admin.enableTable("Action_Priority"); + } catch (IOException e) { + LOG.info(e); + } + try { + Thread.currentThread().sleep(PriorityFunction.getRefreshInterval()); + } catch (InterruptedException e1) { + } + puts = new Worker[threadN / 2]; + scans = new Worker[threadN / 2]; + for (int i = 0; i < puts.length; i++) { + try { + + puts[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(),"Action_Priority"), "put", rowNubmer + * 2 / threadN); + scans[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(),"Action_Priority"), "scan", rowNubmer + * 2 / threadN); + puts[i].setActionPriority(10); + scans[i].setActionPriority(0); + } catch (IOException e) { + } + } + for (int i = 0; i < puts.length; i++) { + puts[i].start(); + scans[i].start(); + } + verifyFunction(scans, puts); + } + + public static void main(String args[]) { + + try { + setUpBeforeClass(); + TestActionPriority t=new TestActionPriority(); + t.setUp(); + t.testForDifferentActionPriority(); + tearDownAfterClass(); + } catch (Exception e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + + // junit.textui.TestRunner.run(TestForTablePriority.class); + } + +} diff -uNr src/test/java/org/apache/hadoop/hbase/allocation/test/TestPriorityJobQueue.java src/test/java/org/apache/hadoop/hbase/allocation/test/TestPriorityJobQueue.java --- src/test/java/org/apache/hadoop/hbase/allocation/test/TestPriorityJobQueue.java (revision 0) +++ src/test/java/org/apache/hadoop/hbase/allocation/test/TestPriorityJobQueue.java (working copy) @@ -0,0 +1,150 @@ +/** + * Copyright the Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.allocation.test; + +import java.util.Random; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.ipc.PriorityJobQueue; +import org.apache.hadoop.hbase.ipc.PriorityJobQueue.Job; +import org.junit.Test; +import static org.junit.Assert.assertTrue; + +/** + * Test used to ensure the PriorityJobQueue works properly + * + */ +public class TestPriorityJobQueue { + private final Log LOG = LogFactory.getLog(TestPriorityJobQueue.class); + private static final int lowestPriority = 10; + private static final int queueSize = 1000; + private static final int testThreadN = 50; + private static final int testTimes = 10000; + + public static void main(String args[]) { + + // junit.textui.TestRunner.run(TestForPriorityJobQueue.class); + } + + /** + * test for ensure the queue refreshed the jobs' priority + */ + @SuppressWarnings("static-access") + public void testQueueRefresh() { + final PriorityJobQueue queue = new PriorityJobQueue( + queueSize, lowestPriority, null,HBaseConfiguration.create()); + for (int i = 0; i < 10; i++) { + queue.add(i + "", 10); + } + + boolean match = true; + queue.refresh(); + queue.refresh(); + try { + Thread.currentThread().sleep(1000); + } catch (InterruptedException e) { + } + for (Object j : queue.toArray()) { + Job job = (Job) j; + LOG.debug("Job is:" + job); + if (job.getPriority() >= 10) + match = false; + } + assertTrue("calls' priorities are refreshed", match); + } + + /** + * test the queue's function: threads (not with the lowest priority)
+ * can only get a job which have a higher priority And test the number which + * input jobs is equal with the output number + */ + @Test + public void testGetJob() { + final Random r = new Random(); + final PriorityJobQueue queue = new PriorityJobQueue( + queueSize, lowestPriority, null,HBaseConfiguration.create()); + Thread consumer[] = new Thread[testThreadN]; + int pri = 1; + + for (int i = 0; i < consumer.length; i++, pri++) { + consumer[i] = new Thread(i + "") { + public void run() { + while (true) { + String j = null; + try { + j = (String) queue.take(); + } catch (InterruptedException e) { + } + if ((Math.abs(this.getPriority() - 10) + 1) != lowestPriority) { + assertTrue( + "Thread get a job which have a higher priority", + Integer.parseInt(j) <= (Math.abs(this.getPriority() - 10) + 1)); + LOG.debug("job:" + Integer.parseInt(j) + " handler:" + + (Math.abs(this.getPriority() - 10) + 1)); + } + } + } + }; + if (pri >= 11) + pri = 1; + consumer[i].setDaemon(true); + consumer[i].setPriority(pri); + consumer[i].start(); + } + + Thread producer[] = new Thread[testThreadN]; + for (int i = 0; i < producer.length; i++) { + producer[i] = new Thread(i + "") { + @SuppressWarnings("static-access") + public void run() { + for (int j = 0; j < testTimes; j++) { + int jobpri = (r.nextInt(19) - 4); + queue.add("" + jobpri, jobpri); + } + } + }; + producer[i].start(); + } + for (Thread d : producer) { + try { + d.join(); + } catch (InterruptedException e) { + } + } + + while (queue.size() != 0) { + try { + Thread.currentThread().sleep(100); + } catch (InterruptedException e) { + } + } + + boolean match = true; + for (int i = 0; i < 10; i++) { + if (queue.getPriorityAddTimes()[i] != queue.getPriorityGetTimes()[i]) { + match = false; + } + } + assertTrue("all job are finished", match); + queue.printMetrix(); + // assertTrue("Thread get a job which have a higher priority", true); + } +} diff -uNr src/test/java/org/apache/hadoop/hbase/allocation/test/TestTablePriority.java src/test/java/org/apache/hadoop/hbase/allocation/test/TestTablePriority.java --- src/test/java/org/apache/hadoop/hbase/allocation/test/TestTablePriority.java (revision 0) +++ src/test/java/org/apache/hadoop/hbase/allocation/test/TestTablePriority.java (working copy) @@ -0,0 +1,525 @@ +/** + * Copyright the Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.allocation.test; + +import java.io.IOException; +import java.util.Random; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.ipc.PriorityFunction; +import org.apache.hadoop.hbase.ipc.PriorityJobQueue.ActionPriorities; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import static org.junit.Assert.assertTrue; + +/** + * Test for table priority, use table A,B,C which have different priorities. + * There are two tests,in first one table A's priority is 1(high priority), B is + * 5, C is 10 and in the second test,we switch the priorities of A and C. + */ +public class TestTablePriority { + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private final static Log LOG = LogFactory.getLog(TestTablePriority.class); + static final Random r = new Random(); + static final int rowNubmer = 50000; + static final int threadN = 70; + HBaseAdmin admin = null; + static final String method = "put";// if there are data in the test tables,you + // can set method type to "scan" + + + /** + * @throws java.lang.Exception + */ + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.startMiniCluster(1); + } + /** + * @throws java.lang.Exception + */ + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + + /** + * set up a cluster and prepare tables. + */ + @Before + public void setUp() { + try { + admin = new HBaseAdmin(TEST_UTIL.getConfiguration()); + } catch (Exception e) { + LOG.info(e); + } + HTableDescriptor des; + byte[][] startKeys = new byte[][] { Bytes.toBytes("0"), Bytes.toBytes("2"), + Bytes.toBytes("4"), Bytes.toBytes("8") }; + try { + if (!admin.tableExists("Table_A")) { + des = new HTableDescriptor("Table_A"); + des.addFamily(new HColumnDescriptor("ff")); + des = PriorityFunction.setPriority(1, des); + admin.createTable(des, startKeys); + } + } catch (Exception e) { + LOG.info(e); + } + try { + if (!admin.tableExists("Table_B")) { + des = new HTableDescriptor("Table_B"); + des.addFamily(new HColumnDescriptor("ff")); + des = PriorityFunction.setPriority(5, des); + admin.createTable(des, startKeys); + } + } catch (Exception e) { + LOG.info(e); + } + try { + if (!admin.tableExists("Table_C")) { + des = new HTableDescriptor("Table_C"); + des.addFamily(new HColumnDescriptor("ff")); + des = PriorityFunction.setPriority(10, des); + admin.createTable(des, startKeys); + } + } catch (Exception e) { + LOG.info(e); + } + + } + + /** + * The worker used for test throughput and gather the result + */ + public static class Worker extends Thread { + HTable table; + String type; + private long throughput = 0; + long times = 1000; + boolean stopFlag = false; + int tablePriority; + long n = 0; + long startTime = 0; + int actionPriority = 0; + + /** + * get throughput of this worker + * + * @return + */ + public long getThroughput() { + long end = System.currentTimeMillis(); + throughput = (n * 1000) / (end - startTime + 1); + return throughput; + } + + /** + * get table priority + * + * @return table priority + */ + public int getTablePriority() { + return tablePriority; + } + + /** + * set table priority + */ + public void setTablePriority(int tablePriority) { + this.tablePriority = tablePriority; + } + + /** + * get action type + * + * @return action type of the worker + */ + public String getType() { + return type; + } + + /** + * the table used by the worker + * + * @return table of the worker + */ + public HTable getTable() { + return table; + } + + public Worker(HTable t, String type, long times) { + this.table = t; + this.type = type; + this.times = times; + } + + public void run() { + try { + doAction(table, type, times); + } catch (IOException e) { + LOG.info(e); + } + } + + public int getActionPriority() { + return actionPriority; + } + + public void setActionPriority(int actionPriority) { + this.actionPriority = actionPriority; + } + + /** + * stop the worker + */ + public void stopWorker() { + this.stopFlag = true; + } + + private void doAction(HTable t, String type, long times) throws IOException { + long start = System.currentTimeMillis(); + long end = System.currentTimeMillis(); + startTime = System.currentTimeMillis(); + int pri = Integer.parseInt(Bytes.toString(t.getTableDescriptor() + .getValue(PriorityFunction.PRI_KEY))); + Scan s = new Scan(); + s.setStartRow(Bytes.toBytes(r.nextInt(10))); + if (type.equals("writeData")) { + t.setAutoFlush(false); + t.setWriteBufferSize(1000); + } + ResultScanner sn = t.getScanner(s); + boolean scan = false; + if (type.equals("scan")) + scan = true; + while (true) { + if (n % 10000 == 0) { + end = System.currentTimeMillis(); + LOG.debug("Thread:" + this.getId() + " type:" + type + " table" + + t.getTableDescriptor().getNameAsString() + " pri :" + pri + + " time:" + (end - start) + " total :" + n + " throughput:" + + (n * 1000) / (end - startTime + 1)); + throughput = (n * 1000) / (end - startTime + 1); + start = end; + } + if (n % 100 == 0) { + end = System.currentTimeMillis(); + throughput = (n * 1000) / (end - startTime + 1); + } + Result ret = null; + if (scan) { + ret = sn.next(); + if (ret == null) { + try { + sn.close(); + } catch (Exception e) { + LOG.debug(e); + } + sn = t.getScanner(s); + } + } else { + put(t, n); + } + if (stopFlag || n > times) { + break; + } + n++; + } + t.flushCommits(); + t.close(); + } + } + + /** + * Verify whether the function take effect + * + * @param highs high priority workers + * @param lows low priority workers + */ + @SuppressWarnings("static-access") + public void verifyFunction(Worker high[], Worker middle[], Worker low[]) { + boolean highFinished = false; + boolean lowFinished = false; + boolean middleFinished = false; + long highThroughPut = 0; + long middleThroughPut = 0; + long lowThroughPut = 0; + while (!(highFinished && lowFinished && middleFinished)) { + highThroughPut = 0; + middleThroughPut = 0; + lowThroughPut = 0; + for (int i = 0; i < high.length; i++) { + highThroughPut += high[i].getThroughput(); + lowThroughPut += low[i].getThroughput(); + middleThroughPut += middle[i].getThroughput(); + } + LOG.info("-------------------------------------------------------------"); + + try { + LOG.info("high priority table is: " + + high[0].getTable().getTableDescriptor().getNameAsString() + + " action type is:" + high[0].getType() + ", priority:" + + high[0].getTablePriority() + " throughput is:" + highThroughPut); + LOG.info("middle priority table is: " + + middle[0].getTable().getTableDescriptor().getNameAsString() + + " action type is:" + middle[0].getType() + ", priority:" + + middle[0].getTablePriority() + " throughput is:" + + middleThroughPut); + LOG.info("low priority table is: " + + low[0].getTable().getTableDescriptor().getNameAsString() + + " action type is:" + low[0].getType() + ", priority:" + + low[0].getTablePriority() + " throughput is:" + lowThroughPut); + } catch (Exception e) { + LOG.error(e); + } + + highFinished = true; + lowFinished = true; + middleFinished = true; + for (int i = 0; i < high.length; i++) { + if (high[i].isAlive()) { + highFinished = false; + } + if (low[i].isAlive()) { + lowFinished = false; + } + if (middle[i].isAlive()) { + middleFinished = false; + } + } + if (highFinished) { + for (int i = 0; i < high.length; i++) { + low[i].stopWorker(); + } + for (int i = 0; i < high.length; i++) { + middle[i].stopWorker(); + } + middleFinished = true; + lowFinished = true; + } + if (lowFinished) { + for (int i = 0; i < high.length; i++) { + high[i].stopWorker(); + } + for (int i = 0; i < high.length; i++) { + middle[i].stopWorker(); + } + middleFinished = true; + highFinished = true; + } + if (middleFinished) { + for (int i = 0; i < high.length; i++) { + high[i].stopWorker(); + } + for (int i = 0; i < high.length; i++) { + low[i].stopWorker(); + } + lowFinished = true; + highFinished = true; + } + try { + Thread.currentThread().sleep(1000); + } catch (InterruptedException e) { + // ignore exceptions + } + } + highThroughPut = 0; + middleThroughPut = 0; + lowThroughPut = 0; + for (int i = 0; i < high.length; i++) { + highThroughPut += high[i].getThroughput(); + lowThroughPut += low[i].getThroughput(); + middleThroughPut += middle[i].getThroughput(); + } + + LOG.info("-------------------------------------------------------------"); + LOG.info("---------------------Test finished --------------------------"); + + try { + LOG.info("high priority table is: " + + high[0].getTable().getTableDescriptor().getNameAsString() + + " action type is:" + high[0].getType() + ", priority:" + + high[0].getTablePriority() + " throughput is:" + highThroughPut); + LOG.info("middle priority table is: " + + middle[0].getTable().getTableDescriptor().getNameAsString() + + " action type is:" + middle[0].getType() + ", priority:" + + middle[0].getTablePriority() + " throughput is:" + middleThroughPut); + LOG.info("low priority table is: " + + low[0].getTable().getTableDescriptor().getNameAsString() + + " action type is:" + low[0].getType() + ", priority:" + + low[0].getTablePriority() + " throughput is:" + lowThroughPut); + } catch (Exception e) { + LOG.error(e); + } + LOG.info("####### Test for " + high[0].getType() + ", priority:" + + high[0].getTablePriority() + " " + middle[0].getType() + + ", priority:" + middle[0].getTablePriority() + " ,and " + + low[0].getType() + ", priority:" + low[0].getTablePriority() + + " finished####"); + assertTrue("Action priority works properly", + highThroughPut > middleThroughPut && middleThroughPut > lowThroughPut); + LOG.info("-------------------------------------------------------------"); + } + + /** + * start the test. + */ + @Test (timeout=180000) + public void testForDifferentTablePriority() { + Worker A[] = new Worker[threadN / 3]; + Worker B[] = new Worker[threadN / 3]; + Worker C[] = new Worker[threadN / 3]; + for (int i = 0; i < A.length; i++) { + try { + A[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(),"Table_A"), method, rowNubmer * 3 + / threadN); + B[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(),"Table_B"), method, rowNubmer * 3 + / threadN); + C[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(),"Table_C"), method, rowNubmer * 3 + / threadN); + } catch (IOException e) { + } + } + for (int i = 0; i < A.length; i++) { + A[i].setTablePriority(1); + B[i].setTablePriority(5); + C[i].setTablePriority(10); + A[i].start(); + B[i].start(); + C[i].start(); + } + verifyFunction(A, B, C); + + try { + admin.disableTable("Table_A"); + HTableDescriptor des1 = admin + .getTableDescriptor(Bytes.toBytes("Table_A")); + des1.setValue(PriorityFunction.PRI_KEY_ACTION_PRIORITY, + new ActionPriorities(0, 0, 0, 0).toBytes()); + des1 = PriorityFunction.setPriority(10, des1); + admin.modifyTable(Bytes.toBytes("Table_A"), des1); + admin.enableTable("Table_A"); + } catch (Exception e) { + LOG.info(e); + } + try { + admin.disableTable("Table_B"); + HTableDescriptor des1 = admin + .getTableDescriptor(Bytes.toBytes("Table_B")); + des1.setValue(PriorityFunction.PRI_KEY_ACTION_PRIORITY, + new ActionPriorities(0, 0, 0, 0).toBytes()); + des1 = PriorityFunction.setPriority(5, des1); + admin.modifyTable(Bytes.toBytes("Table_B"), des1); + admin.enableTable("Table_B"); + } catch (Exception e) { + LOG.info(e); + } + try { + admin.disableTable("Table_C"); + HTableDescriptor des1 = admin + .getTableDescriptor(Bytes.toBytes("Table_C")); + des1.setValue(PriorityFunction.PRI_KEY_ACTION_PRIORITY, + new ActionPriorities(0, 0, 0, 0).toBytes()); + des1 = PriorityFunction.setPriority(1, des1); + admin.modifyTable(Bytes.toBytes("Table_C"), des1); + admin.enableTable("Table_C"); + } catch (Exception e) { + LOG.info(e); + } + try { + Thread.currentThread().sleep(PriorityFunction.getRefreshInterval()); + } catch (InterruptedException e1) { + } + A = new Worker[threadN / 3]; + B = new Worker[threadN / 3]; + C = new Worker[threadN / 3]; + for (int i = 0; i < A.length; i++) { + try { + A[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(),"Table_A"), method, rowNubmer * 3 + / threadN); + B[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(),"Table_B"), method, rowNubmer * 3 + / threadN); + C[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(),"Table_C"), method, rowNubmer * 3 + / threadN); + } catch (IOException e) { + } + } + for (int i = 0; i < A.length; i++) { + A[i].setTablePriority(10); + B[i].setTablePriority(5); + C[i].setTablePriority(1); + A[i].start(); + B[i].start(); + C[i].start(); + } + verifyFunction(C, B, A); + + } + + public static void main(String args[]) { + try { + setUpBeforeClass(); + TestTablePriority t=new TestTablePriority(); + t.setUp(); + t.testForDifferentTablePriority(); + tearDownAfterClass(); + } catch (Exception e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + + // junit.textui.TestRunner.run(TestForTablePriority.class); + } + + static byte[] b; + static { + String bs = ""; + for (int i = 0; i < 100; i++) { + bs += i; + } + b = Bytes.toBytes(bs); + } + + private static void put(HTable t, long i) { + Put p = new Put( + Bytes.toBytes("" + r.nextInt(1000) + (i) + r.nextInt(10000))); + p.add(Bytes.toBytes("ff"), Bytes.toBytes("ff"), b); + try { + t.put(p); + } catch (IOException e) { + LOG.info(e); + } + } + +}