Index: src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java (revision 1213130) +++ src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java (working copy) @@ -387,6 +387,11 @@ final int numHandlers, int metaHandlerCount, final boolean verbose, Configuration conf, int highPriorityLevel) throws IOException { + if (instance instanceof HRegionInterface + && PriorityFunction.enable(conf)) { + return new PriorityHBaseServer(instance, ifaces, conf, bindAddress, port, + numHandlers, metaHandlerCount, verbose, highPriorityLevel); + } return getServer(instance.getClass(), instance, ifaces, bindAddress, port, numHandlers, metaHandlerCount, verbose, conf, highPriorityLevel); } Index: src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (revision 1213130) +++ src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (working copy) @@ -208,7 +208,7 @@ private Listener listener = null; protected Responder responder = null; protected int numConnections = 0; - private Handler[] handlers = null; + protected Handler[] handlers = null; private Handler[] priorityHandlers = null; protected HBaseRPCErrorHandler errorHandler = null; @@ -244,7 +244,7 @@ } /** A call queued for handling. */ - protected class Call implements Delayable { + class Call implements Delayable { protected int id; // the client's call id protected Writable param; // the parameter passed protected Connection connection; // connection to client @@ -1270,7 +1270,7 @@ } /** Handles queued calls . */ - private class Handler extends Thread { + class Handler extends Thread { private final BlockingQueue myCallQueue; private MonitoredRPCHandler status; Index: src/main/java/org/apache/hadoop/hbase/ipc/PriorityFunction.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/ipc/PriorityFunction.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/ipc/PriorityFunction.java (revision 0) @@ -0,0 +1,481 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.ipc; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MasterNotRunningException; +import org.apache.hadoop.hbase.ZooKeeperConnectionException; +import org.apache.hadoop.hbase.client.Action; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.MultiAction; +import org.apache.hadoop.hbase.client.Operation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Row; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.apache.hadoop.hbase.ipc.PriorityJobQueue.ActionPriorities; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.Writable; + +import com.google.common.base.Function; + +/** + * This Function is used by {@link PriorityHBaseServer}. In this + * Function,generally we can get the priority of a RPC call base by it's table + * priority and the table's method priority.
+ */ + +public class PriorityFunction implements Function { + + public final static byte[] PRI_KEY = Bytes.toBytes("_table_priority"); + public final static byte[] PRI_KEY_ACTION_PRIORITY = Bytes + .toBytes("_action_priority"); + public static final int LOWEST_PRI = 10; + public static final int DEFAULT_PRI = 5; + public static final int HIGHEST_PRI = -10; + public static final int HIGH_PRI = 0; + public static final String PRI_ENABLE_KEY = "hbase.tablepriority.enable"; + private static Boolean enable = null; + + /** + * If table priority is enabled. + * + * @return + */ + public static boolean enable(Configuration conf) { + if(enable==null) + { + enable=conf.getBoolean(PRI_ENABLE_KEY, false); + } + return enable; + } + + /** + * The priority map cache the region's priority in memory. Key is the region + * name which usually is obtained from the IPC call. Value is the priority of + * the region also is the priority of the table which the region belong to. + */ + private static final ConcurrentHashMap regionPriMap = new ConcurrentHashMap(); + + // Cache the scanner and its id + private static final ConcurrentHashMap scannerToId = new ConcurrentHashMap(); + + // Cache the scanner and its scan + private static final ConcurrentHashMap rScannerToScan = new ConcurrentHashMap(); + + // Cache the scanner ID and the region name of the scanner. + private static final ConcurrentHashMap scannerRegionMap = new ConcurrentHashMap(); + + // Cache the scanner ID and the priority of the scanner. + private static final ConcurrentHashMap scannerPriMap = new ConcurrentHashMap(); + + // Store the region name and it's according priority instance. + private static final ConcurrentHashMap regionActionPriorities = new ConcurrentHashMap(); + + // TODO: add priority of source host. + @SuppressWarnings("unused") + private final ConcurrentHashMap> tableSourceHostPlus = new ConcurrentHashMap>(); + @SuppressWarnings("unused") + private final ConcurrentHashMap> regionSourceHostPlus = new ConcurrentHashMap>(); + private static final Log LOG = LogFactory + .getLog("org.apache.hadoop.hbase.ipc.PriorityFunction"); + private static HRegionServer server; + + public PriorityFunction(HRegionServer server, final Configuration conf) + throws MasterNotRunningException, ZooKeeperConnectionException { + if (PriorityFunction.server == null) { + PriorityFunction.server = server; + } + /** + * enable the coprocessor here. + */ + if (enable) { + String cops = conf.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY); + if (cops == null || cops.equals("")) { + cops = "org.apache.hadoop.hbase.ipc.QosRegionObserver"; + } else { + cops += ",org.apache.hadoop.hbase.ipc.QosRegionObserver"; + } + conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, cops); + } + } + + /** + * Set table priority + * @param priority the priority you want to set + * @param des the table descriptor + * @return + * @throws Exception + */ + public static HTableDescriptor setPriority(String priority, + HTableDescriptor des) throws Exception { + Integer pri = Integer.parseInt(priority); + if (pri < 0 || pri > 10) + throw new NumberFormatException( + "Table priority should be between 0 and 10."); + des.setValue(PRI_KEY, Bytes.toBytes(priority)); + return des; + } + /** + * Set table priority + * @param priority the priority you want to set + * @param des table descriptor + * @param actionPriority ActionPriority instance + * @return the prepared table descriptor + * @throws Exception + */ + public static HTableDescriptor setPriority(String priority, + HTableDescriptor des, ActionPriorities actionPriority) throws Exception { + Integer pri = Integer.parseInt(priority); + if (pri < 0 || pri > 10) + throw new NumberFormatException( + "Table priority should be between 0 and 10."); + setTableActionPriorities(des, actionPriority); + des.setValue(PRI_KEY, Bytes.toBytes(priority)); + return des; + } + /** + * Set table priority + * @param priority the priority you want to set + * @param des the table descriptor + * @param actionPriority ActionPriority instance + * @return + * @throws Exception + */ + public static HTableDescriptor setPriority(int priority, + HTableDescriptor des, ActionPriorities actionPriority) throws Exception { + if (priority < 0 || priority > 10) { + throw new NumberFormatException( + "Table priority should be between 0 and 10."); + } + setTableActionPriorities(des, actionPriority); + des.setValue(PRI_KEY, Bytes.toBytes(priority)); + return des; + } + + /** + * Set table priority + * @param priority the priority you want to set + * @param des the table descriptor + * @return + * @throws Exception + */ + public static HTableDescriptor setPriority(int priority, HTableDescriptor des) + throws Exception { + if (priority < 0 || priority > 10) { + throw new NumberFormatException( + "Table priority should be between 0 and 10."); + } + des.setValue(PRI_KEY, Bytes.toBytes(priority + "")); + return des; + } + + /** + * get action priorities from table descriptor + * + * @param des the table descriptor + * @return + */ + public static ActionPriorities getTableActionPriorities(HTableDescriptor des) { + byte[] actionPriority = des.getValue(PRI_KEY_ACTION_PRIORITY); + ActionPriorities ret = new ActionPriorities(); + if (actionPriority != null) { + try { + ret.fromBytes(actionPriority); + } catch (Exception e) { + LOG.error("This table has wrong action priorities," + + "please use {@ PriorityFunction}setPriority()" + + " method to set table's action priority"); + } + } + return ret; + } + /** + * set action priorities for table descriptor + * @param des the table descriptor + * @param actionPriority ActionPriority instance + */ + public static void setTableActionPriorities(HTableDescriptor des, + ActionPriorities actionPriority) { + des.setValue(PriorityFunction.PRI_KEY_ACTION_PRIORITY, + actionPriority.toBytes()); + } + /** + * Initialize the scanner's priority + * @param call + * @param value scanner id + */ + public void initScannerPriority(Invocation call, Object value) { + Long id = (Long) value; + byte[] region = (byte[]) call.getParameters()[0]; + Scan scan=(Scan) call.getParameters()[1]; + String regionN = Bytes.toString(region); + Integer pri = regionPriMap.get(regionN); + if (pri == null) { + pri=initRegionPriority(regionN, false); + } + ActionPriorities action = regionActionPriorities.get(regionN); + if (action != null) + pri += action.getScanPriority(); + scannerPriMap.put(id, pri); + scannerRegionMap.put(id, regionN); + scannerToId.put(scan, id); + } + @Override + public Integer apply(Writable from) { + return getCallPriority(from); + } + + /** + * Used by QosRegionObserver to store the region scanners + * and it's scanner ID. + * @param scan + * @param s + */ + public static void addRegionScanner(Scan scan,RegionScanner s) + { + rScannerToScan.put(s, scan); + } + + /** + * remove the cached region priorities + * @param region + */ + public static void cleanRegionPriority(HRegion region) { + String regionN = region.getRegionNameAsString(); + if(regionN!=null) + { + regionPriMap.remove(regionN); + regionActionPriorities.remove(regionN); + } + } + /** + * Remove lease expired scanners + * @param s the scanner id to be removed + */ + public static void removeScanner(RegionScanner s) { + Long id = null; + try { + Scan ss=rScannerToScan.get(s); + rScannerToScan.remove(s); + id=scannerToId.get(ss); + scannerToId.remove(ss); + scannerRegionMap.remove(id); + scannerPriMap.remove(id); + } catch (Exception e) { + LOG.error("scanner id:" + id + " doesn't exist."); + } + } + + /** + * Get the priority of the call + * + * @param call + * @return the priority + */ + private int getCallPriority(Writable param) { + Invocation invo = (Invocation) param; + // if this is a next() call, we should find the priority of the scanner + // which + // is initiated by initScannerPriority(Invocation call, Object value) + if (invo.getMethodName().endsWith("next")) { + Long scanId = (Long) invo.getParameters()[0]; + Integer pri = scannerPriMap.get(scanId); + if (pri == null) { + String regionN = scannerRegionMap.get(scanId); + LOG.error("the scanner didn't get it's priority:" + scanId + + " regionName:" + regionN); + if (regionN != null) { + pri = getRegionPri(regionN); + // get the priority of the region,i.e. + // the priority of table + ActionPriorities action = regionActionPriorities.get(regionN); + if (action != null) { + pri += action.getScanPriority();// add the action priority of the + // table + } + scannerPriMap.put(scanId, pri);// cache the priority + return pri; + } else { + LOG.error("error,there is no this scanner id"); + return DEFAULT_PRI; + } + } + return pri; + } else if (invo.getMethodName().endsWith("multi")) { + // if it's a multi call,we use the first action of the in + // the MutiAction map to decide the priority of this call. + MultiAction multi = (MultiAction) invo.getParameters()[0]; + for (Map.Entry e : multi.actions.entrySet()) { + String regionN = Bytes.toString((byte[]) e.getKey()); + Integer pri = getRegionPri(regionN); + ActionPriorities actionPriority; + if ((actionPriority = regionActionPriorities.get(regionN)) != null) { + List actionsForRegion = (List) e.getValue(); + Action action = actionsForRegion.get(0); + if (action != null) { + Row row = action.getAction(); + if (row instanceof Delete) { + pri += actionPriority.getDeletePriority(); + } else if (row instanceof Get) { + pri += actionPriority.getGetPriority(); + + } else if (row instanceof Put) { + pri += actionPriority.getPutPriority(); + } + } + } + return pri; + } + return DEFAULT_PRI; + } else if (invo.getMethodName().endsWith("increment") + || invo.getMethodName().endsWith("incrementColumnValue")) { + // due to the Increment class does not inherit the Operation class, + //we deal with it respectively. + byte[] region = (byte[]) invo.getParameters()[0]; + String regionN = Bytes.toString(region); + Integer pri = getRegionPri(regionN); + ActionPriorities actionPriority; + if ((actionPriority = regionActionPriorities.get(regionN)) != null) { + pri += actionPriority.getPutPriority(); + } + return pri; + } + Object params[] = invo.getParameters(); + // Deal with the subclasses of Operation + if (params.length >= 2 && params[0] instanceof byte[] + && params[params.length - 1] instanceof Operation + && !(params[params.length - 1] instanceof Scan)) { + Operation ope = (Operation) params[params.length - 1]; + byte[] region = (byte[]) params[0]; + String regionN = Bytes.toString(region); + Integer pri = getRegionPri(regionN); + ActionPriorities actionPriority; + if ((actionPriority = regionActionPriorities.get(regionN)) != null) { + pri += actionPriority.getPriority(ope); + } + return pri; + } + return HIGHEST_PRI; + } + + private int getRegionPri(String regionN) { + Integer pri = regionPriMap.get(regionN); + if (pri == null) { + pri = initRegionPriority(regionN, false); + } + if (pri == null) { + return DEFAULT_PRI; + } + return pri; + } + + /** + * Get region priority + * + * @param region the region you want to get priority + * @return the region's priority + */ + public static int initRegionPriority(HRegion region) { + if (region == null || region.getRegionNameAsString() == null) { + return DEFAULT_PRI; + } + String regionN = region.getRegionNameAsString(); + int pri = DEFAULT_PRI; + if (region.getRegionInfo().isMetaRegion() + || region.getRegionInfo().isRootRegion()) { + pri = HIGHEST_PRI; + } + else if (region.getTableDesc().getValue(PRI_KEY) != null) { + try { + pri = Integer.parseInt(Bytes.toString(region.getTableDesc().getValue( + PRI_KEY))); + } catch (Exception e) { + LOG.error("Table " + region.getTableDesc().getNameAsString() + + " has a wrong priority"); + } + } + ActionPriorities actionPriority = getTableActionPriorities(region + .getTableDesc()); + regionActionPriorities.put(regionN, actionPriority); + regionPriMap.put(regionN, pri); + return pri; + } + + /** + * Get region priority + * + * @param region + * the region you want to get priority + * @param force + * force to refresh priority, if true,the method will get priority + * from region's table descriptor,or it will look from the cache. + * @return the region's priority + */ + private static int initRegionPriority(String region, boolean force) { + if (!force) { + Integer ret = regionPriMap.get(region); + if (ret != null) { + return ret; + } + } + HRegion hr = server.getOnlineRegion(Bytes.toBytes(region)); + return initRegionPriority(hr); + } + + /** + * translate thread priority to system priority thread priority is from 1 to + * 10 and 10 is highest priority, but system priority used by RPC is the same + * with linux system, small number means high priority.So translate the thread + * priority to system priority to see which job this thread can handle. Ps: + * thread can only handle the job which has a higher priority than its thread + * priority. + * + * @param tpri + * transfer thread priority to according table priority + * @return + */ + public static int priTrans(int tpri) { + return tpri > 10 || tpri < 1 ? 5 : 11 - tpri; + } + /** + * print out the region priorities + */ + public static void printRegionPriority() + { + String out = "Region Priories:"; + out+=regionPriMap.toString(); + LOG.debug(out); + } + +} Index: src/main/java/org/apache/hadoop/hbase/ipc/PriorityHBaseServer.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/ipc/PriorityHBaseServer.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/ipc/PriorityHBaseServer.java (revision 0) @@ -0,0 +1,120 @@ +/** + * Copyright the Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.io.HbaseObjectWritable; +import org.apache.hadoop.hbase.ipc.Invocation; +import org.apache.hadoop.hbase.ipc.WritableRpcEngine.Server; +import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.io.Writable; + +/** + * An IPC server extends HBaseServer,and add schedule function to make table priority take + * effect. + */ +public class PriorityHBaseServer extends Server { + + /** + * Construct an RPC server. + * + * @param instance the instance whose methods will be called + * @param ifaces Define metrics for all methods in the given classes + * @param conf the configuration to use + * @param bindAddress the address to bind on to listen for connection + * @param port the port to listen for connections on + * @param numHandlers the number of method handler threads to run + * @param priorityHandlerCount used by {@link HBaseServer} + * @param verbose whether each call should be logged + * @param highPriorityLevel used by {@link HBaseServer},decides actions of meta info will be + * handled by metaHandlers + * @throws IOException e + */ + public PriorityHBaseServer(Object instance, final Class[] ifaces, + Configuration conf, String bindAddress, int port, int numHandlers, + int priorityHandlerCount, boolean verbose, int highPriorityLevel) + throws IOException { + super(instance, ifaces, conf, bindAddress, port, numHandlers, + priorityHandlerCount, verbose, highPriorityLevel); + int maxQueueSize = this.conf.getInt("ipc.server.max.queue.size", 500); + int handlers=this.conf.getInt("hbase.regionserver.handler.count", 10); + callQueue = new PriorityJobQueue(maxQueueSize, handlers>=10?10:handlers, + new PriorityFunction((HRegionServer) instance, conf), conf); + } + + @SuppressWarnings("rawtypes") + @Override + public Writable call(Class protocol, + Writable param, long receivedTime, MonitoredRPCHandler status) + throws IOException { + Invocation call = (Invocation) param; + HbaseObjectWritable writable = (HbaseObjectWritable) super.call(protocol, + param, receivedTime, status); + if (call.getMethodName().endsWith("openScanner")) { + ((PriorityJobQueue) callQueue).getFuction().initScannerPriority(call, + writable.get()); + } + return writable; + } + /** + * If handleSize is 20,and priorities of threads range from 1 to 10 the + * priority will be initialized to 10,9,8,7....1, 10,9,8,...2, 10 because Java + * thread use a priority rule which large number means higher priority. so + * this method will make the number of high priority threads is bigger than + * the lower ones. Because a thread can only handle the a job which have a + * higher priority than its own, the high priority jobs will have more threads + * to handle. Ps: the job priority is small number means higher priority,which + * is inverse of the thread priority, so there is a method {@PriorityFunction} + * priTrans do the priority transfer thing. + */ + private int[] initPriorityArray(int handleSize) { + int[] priorityArray = new int[handleSize]; + int minPriOfTurn = Thread.MIN_PRIORITY; + for (int i = 0, priNow = Thread.MAX_PRIORITY; i < handleSize; i++, priNow--) { + if (priNow < minPriOfTurn) { + priNow = Thread.MAX_PRIORITY; + minPriOfTurn++; + if (minPriOfTurn == Thread.MAX_PRIORITY) { + minPriOfTurn = Thread.MIN_PRIORITY; + } + } + priorityArray[i] = priNow; + } + return priorityArray; + } + + @Override + public synchronized void startThreads() { + super.startThreads(); + int[] priorityArray = initPriorityArray(handlers.length); + for (int i = 0; i < this.handlers.length; i++) { + handlers[i].setPriority(priorityArray[i]); + } + } + @Override + public void stop() + { + super.stop(); + ((PriorityJobQueue) callQueue).stop(); + } +} Index: src/main/java/org/apache/hadoop/hbase/ipc/PriorityJobQueue.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/ipc/PriorityJobQueue.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/ipc/PriorityJobQueue.java (revision 0) @@ -0,0 +1,723 @@ +/** + * Copyright the Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import java.io.InterruptedIOException; +import java.util.Calendar; +import java.util.Collection; +import java.util.Iterator; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Operation; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.Writable; + +import com.google.common.base.Function; + +/** + * This queue is used by {@link PriorityHBaseServer} In this queue,generally + * thread can only get jobs which have same or higher priority than its thread + * priority.
+ * The thread which has the lowest priority can handle the jobs which has a + * lower priorities lower. Please use take and put to + * manipulate this queue.
+ * + * @param the class contained by the queue + */ +public class PriorityJobQueue implements BlockingQueue { + private static final Log LOG = LogFactory.getLog(PriorityJobQueue.class); + private final PriorityBlockingQueue> queue = new PriorityBlockingQueue>(); + private int capacity = 100; + private int maxWait = 1000; + private boolean running = true; + final int lowestThreadPriority; + ReentrantLock readLock = new ReentrantLock(); + Condition[] lockList ; + ReentrantLock addLock = new ReentrantLock(); + Condition queueFull = addLock.newCondition(); + + private static int handleFreshInter = 8; + private static int move = Integer.SIZE - handleFreshInter; + private static int flush = 1; + private long[] priorityAddTimes = new long[10]; + private long[] priorityGetTimes = new long[10]; + + private Function qosFunction; + + /** + * get static data of add times + * @return the add times arranged by priority + */ + public long[] getPriorityAddTimes() { + return priorityAddTimes; + } + + /** + * get static data of get times + * @return the get times arranged by priority + */ + + public long[] getPriorityGetTimes() { + return priorityGetTimes; + } + + private void decreaseSize() { + this.addLock.lock(); + if (queue.size() < this.capacity) { + this.queueFull.signal(); + } + this.addLock.unlock(); + } + + private void add(Job j) throws InterruptedIOException { + int wait = 0; + while (queue.size() >= this.capacity) { + try { + addLock.lock(); + this.queueFull.await(10, TimeUnit.MILLISECONDS); + addLock.unlock(); + wait++; + if (wait > this.maxWait) + break; + } catch (InterruptedException e) { + throw new InterruptedIOException("try add waiting interrupted"); + } + } + this.queue.add(j); + } + private Thread refresher = new Thread() { + public void run() { + while (running) { + refreshPeriodically(); + try { + sleep(1000); + } catch (InterruptedException e) { + // ignore this exception + } catch (Exception e) { + LOG.error(e); + } + } + } + }; + /** + * Stop the refresher. + */ + public void stop() { + this.running = false; + this.refresher.interrupt(); + } + + /** + * Initialize a queue + * + * @param capacity + * the capacity of the queue,not a precision value,if queue size + * exceeds this value, workers which add jobs should wait,but if a + * thread waits too many turns (more than maxWait),it will add the + * job anyhow. + * @param lowestThreadPriority + * the lowest priority which worker threads have, the default + * priority is range from 1 to 10,small number means high priority. + * @param server + * the instance of PriorityHBaseServer + */ + public PriorityJobQueue(int capacity, int lowestThreadPriority, + Function qosFunction, final Configuration conf) { + this.qosFunction = qosFunction; + // The handleFreshInter used to decide after how many RPC calls processed + // we increase the priority of the calls remained in this queue. + // 7 means after the 7th power of 2 RPC calls processed. + // we use ((flush << move) >>> move) == 0 to do mode. + // hbase.schedule.refreshinter:large number means lower priority calls will + // wait longer in this queue. + handleFreshInter = conf.getInt("hbase.schedule.refreshinter", 8); + handleFreshInter = handleFreshInter < 1 || handleFreshInter > 20 ? 8 + : handleFreshInter; + move = Integer.SIZE - handleFreshInter; + this.capacity = capacity; + this.refresher.setDaemon(true); + this.refresher.start(); + this.lowestThreadPriority = lowestThreadPriority; + lockList = new Condition[Thread.MAX_PRIORITY]; + for (int i = 0; i < lockList.length; i++) { + lockList[i] = readLock.newCondition(); + } + } + + public PriorityFunction getFuction() { + return (PriorityFunction) this.qosFunction; + } + + /** + * add a job to this queue + * @param call the job instance + * @param pri the job's priority + */ + public void add(T call, int pri) { + try { + this.add(new Job(pri, call)); + } catch (InterruptedIOException e) { + LOG.error(e); + } + singalHandler(); + synchronized (this.priorityAddTimes) { + this.priorityAddTimes[this.getCondition(pri)]++; + } + + } + + /** + * get the size of the queue. + * @return the size of the queue + */ + public int size() { + return this.queue.size(); + } + + /** + * get the size of the queue + * @return the size of the queue + */ + public int queueSize() { + return queue.size(); + } + + private int getCondition(int pri) { + if (pri <= 10 && pri >= 1) { + return pri - 1; + } else if (pri > 10) { + return 9; + } else { + return 0; + } + } + + private void singalHandler() { + readLock.lock(); + Job jobt = queue.peek(); + if (jobt != null) { + this.lockList[getCondition(jobt.orgPri)].signal(); + } + readLock.unlock(); + } + + /** + * if handler's priority is lower than job's priority, then this handler can't + * get this job. + * @param job the job which worker want to get + * @param pri the worker thread's priority + * @return should the worker get this job + */ + public boolean shouldWork(Job job, int pri) { + if (job == null) + return false; + return (pri >= job.orgPri) + || (job.orgPri < 1 && pri == 1) + || (job.orgPri > this.lowestThreadPriority && pri == this.lowestThreadPriority); + } + + /** + * get a job from the queue ,this method will + * test whether the thread can get this job + * + * @param pri the worker thread's priority + * @return the job + * @throws InterruptedException + */ + public T get(int pri) throws InterruptedException { + Job ret = null; + while (true) { + readLock.lock(); + ret = queue.peek(); + if (shouldWork(ret, pri)) { + ret = queue.take(); + readLock.unlock(); + break; + } + this.lockList[getCondition(pri)].await(100, TimeUnit.MILLISECONDS); + readLock.unlock(); + } + if (ret.orgPri > pri && pri != this.lowestThreadPriority) { + LOG.error("The handle get a job priority is lower than its priority,this shouldn't happen" + + " job priority:" + ret.orgPri + " handler pri:" + pri); + } + this.singalHandler(); + synchronized (this.priorityGetTimes) { + this.priorityGetTimes[this.getCondition(ret.orgPri)]++; + } + this.decreaseSize(); + return ret.getCall(); + + } + + public void printMetrix() { + LOG.debug("size is :" + queue.size()); + LOG.debug("capacity is :" + this.capacity); + String out = "Add times"; + for (int i = 0; i < this.priorityAddTimes.length; i++) { + out += " pri:" + (i + 1) + ":" + this.priorityAddTimes[i]; + } + LOG.debug("priority request static:" + out); + out = "Get times"; + for (int i = 0; i < this.priorityGetTimes.length; i++) { + out += " pri:" + (i + 1) + ":" + this.priorityGetTimes[i]; + } + LOG.debug("priority request static:" + out); + PriorityFunction.printRegionPriority(); + } + + /** + * refresh the priorities of the jobs which remaining in this queue,simply -1. + */ + public void refresh() { + this.refresher.interrupt(); + } + + static int outputIndicator = 0; + + private void refreshPeriodically() { + try { + if ((outputIndicator << 55) >>> 55 == 0) { + LOG.debug(Calendar.getInstance().getTime() + ":" + this.queue); + printMetrix(); + } + outputIndicator++; + for (Job job : queue) { + if (job != null) { + job.increasePriority(); + } + } + singalHandler(); + } catch (Exception e) { + LOG.error(e); + } + } + + @SuppressWarnings("unused") + private void refreshIner(int n) { + try { + for (Job job : queue) { + if (job != null) { + job.increasePriority(n); + } + } + singalHandler(); + } catch (Exception e) { + LOG.warn(e); + } + } + /** + * The Job stored in queue + * @param + */ + public static class Job implements Comparable> { + int orgPri = 0; + int priority = 0; + long initTime = 0; + T call; + + /** + * get the priority now + * @return the priority of this call + */ + public int getPriority() { + return priority; + } + + /** + * increase job's priority + */ + public void increasePriority() { + this.priority--; + } + + /** + * increase job's priority by n + * @param n the number you want to increase + */ + public void increasePriority(int n) { + this.priority = this.priority - n; + } + + /** + * get the instance hold by Job + * @return the call instance + */ + public T getCall() { + return call; + } + + /** + * set the instance hold by Job + * @param call the call instance + */ + public void setCall(T call) { + this.call = call; + } + + /** + * Initialize a Job instance + * @param pri the job priority + * @param call the instance hold by the job + */ + public Job(int pri, T call) { + this.orgPri = pri; + this.priority = pri; + this.initTime = System.currentTimeMillis(); + this.call = call; + } + + /** + * print the Job instance + */ + public String toString() { + return "orgPri:" + this.orgPri + ", lastPri:" + this.priority + + ", wait time:" + ((System.currentTimeMillis() - this.initTime)) + + ",ino:";// + call; + } + @Override + public int compareTo(Job arg0) { + return this.priority - arg0.priority; + } + } + + @Override + public Object remove() { + return this.queue.remove(); + } + + @Override + public Object poll() { + return this.queue.poll(); + } + + @Override + public Object element() { + return this.queue.element(); + } + + @Override + public Object peek() { + return this.queue.peek(); + } + + @Override + public boolean isEmpty() { + return this.queue.isEmpty(); + } + + @SuppressWarnings({ "rawtypes" }) + @Override + public Iterator iterator() { + return this.queue.iterator(); + } + + @Override + public Object[] toArray() { + return this.queue.toArray(); + } + + @SuppressWarnings("unchecked") + @Override + public Object[] toArray(Object[] a) { + return this.queue.toArray(a); + } + + @SuppressWarnings({ "rawtypes" }) + @Override + public boolean containsAll(Collection c) { + return this.queue.containsAll(c); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Override + public boolean addAll(Collection c) { + return this.queue.addAll(c); + } + + @SuppressWarnings({ "rawtypes" }) + @Override + public boolean removeAll(Collection c) { + return this.queue.removeAll(c); + } + + @SuppressWarnings({ "rawtypes" }) + @Override + public boolean retainAll(Collection c) { + return this.queue.retainAll(c); + } + + @Override + public void clear() { + this.queue.clear(); + } + + @Override + public boolean add(Object e) { + if (queue.size() > this.capacity) + return false; + else + try { + this.put(e); + } catch (InterruptedException e1) { + LOG.error(e); + } + return true; + } + + @Override + public boolean offer(Object e) { + return this.offer(e); + } + + @Override + public void put(Object e) throws InterruptedException { + HBaseServer.Call call = (HBaseServer.Call) (e); + int pri = this.qosFunction.apply(call.param); + this.add((T) call, pri); + } + + @SuppressWarnings("unchecked") + @Override + public boolean offer(Object e, long timeout, TimeUnit unit) + throws InterruptedException { + return this.queue.offer((Job) e, timeout, unit); + } + + @Override + public Object take() throws InterruptedException { + if (((flush << move) >>> move) == 0) { + this.refresher.interrupt(); + } + flush++; + return this.get(PriorityFunction.priTrans(Thread.currentThread() + .getPriority())); + } + + @Override + public Object poll(long timeout, TimeUnit unit) throws InterruptedException { + + return this.get(PriorityFunction.priTrans(Thread.currentThread() + .getPriority())); + } + + @Override + public int remainingCapacity() { + return this.capacity - this.queue.size(); + } + + @Override + public boolean remove(Object o) { + return this.queue.remove(o); + } + + @Override + public boolean contains(Object o) { + return this.queue.contains(o); + } + + @SuppressWarnings("unchecked") + @Override + public int drainTo(@SuppressWarnings("rawtypes") Collection c) { + return this.queue.drainTo(c); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Override + public int drainTo(Collection c, int maxElements) { + return this.queue.drainTo(c, maxElements); + } + + /** + * class used to store the action priority of table; + */ + public static class ActionPriorities { + private int putPriority = 0; + private int getPriority = 0; + private int deletePriority = 0; + private int scanPriority = 0; + public ActionPriorities() { + } + + /** + * Get the priority of this operation + * + * @param operation + * @return + */ + public int getPriority(Get operation) { + return this.getGetPriority(); + } + + /** + * Get the priority of this operation + * + * @param operation + * @return + */ + public int getPriority(Mutation operation) { + return this.getPutPriority(); + } + + /** + * Get the priority of this operation + * + * @param operation + * @return + */ + public int getPriority(Scan operation) { + return this.getScanPriority(); + } + + /** + * Get the priority of this operation + * + * @param operation + * @return + */ + public int getPriority(Object operation) { + return 0; + } + + public ActionPriorities(int scanPriority, int putPriority, int getPriority, + int deletePriority) { + this.scanPriority = scanPriority; + this.putPriority = putPriority; + this.getPriority = getPriority; + this.deletePriority = deletePriority; + } + + /** + * get put priority + * @return + */ + public int getPutPriority() { + return putPriority; + } + + /** + * set put priority + * + * @return + */ + public void setPutPriority(int priority) { + this.putPriority = priority; + } + + /** + * get get priority + * + * @return + */ + public int getGetPriority() { + return getPriority; + } + + /** + * set get priority + * + * @return + */ + public void setGetPriority(int priority) { + this.getPriority = priority; + } + + /** + * get delete priority + * + * @return + */ + public int getDeletePriority() { + return deletePriority; + } + + /** + * set delete priority + * + * @return + */ + public void setDeletePriority(int priority) { + this.deletePriority = priority; + } + + /** + * get scan priority + * + * @return + */ + public int getScanPriority() { + return scanPriority; + } + + /** + * set scan priority + * + * @return + */ + public void setScanPriority(int priority) { + this.scanPriority = priority; + } + + public String toString() { + return this.scanPriority + "," + this.putPriority + "," + this.getPriority + "," + + this.deletePriority + ","; + } + + /** + * store this object into byte array + * + * @return + */ + public byte[] toBytes() { + byte[] ret = new byte[Bytes.SIZEOF_INT * 4]; + Bytes.putInt(ret, 0, this.scanPriority); + Bytes.putInt(ret, Bytes.SIZEOF_INT, this.putPriority); + Bytes.putInt(ret, Bytes.SIZEOF_INT * 2, this.getPriority); + Bytes.putInt(ret, Bytes.SIZEOF_INT * 3, this.deletePriority); + return ret; + } + + /** + * get value from byte[] + * + * @param b the serialized value of an ActionPriorites instance + */ + public ActionPriorities fromBytes(byte[] b) { + if (b.length != Bytes.SIZEOF_INT * 4) + return null; + else { + this.scanPriority = Bytes.toInt(b, 0); + this.putPriority = Bytes.toInt(b, Bytes.SIZEOF_INT); + this.getPriority = Bytes.toInt(b, Bytes.SIZEOF_INT * 2); + this.deletePriority = Bytes.toInt(b, Bytes.SIZEOF_INT * 3); + } + return this; + } + } +} Index: src/main/java/org/apache/hadoop/hbase/ipc/QosRegionObserver.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/ipc/QosRegionObserver.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/ipc/QosRegionObserver.java (revision 0) @@ -0,0 +1,60 @@ +/* + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import java.io.IOException; + +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.RegionScanner; + +public class QosRegionObserver extends BaseRegionObserver { + @Override + public void postOpen(ObserverContext e) { + HRegion region = e.getEnvironment().getRegion(); + PriorityFunction.initRegionPriority(region); + } + + @Override + public void postClose(ObserverContext e, + boolean abortRequested) { + HRegion region = e.getEnvironment().getRegion(); + PriorityFunction.cleanRegionPriority(region); + } + + @Override + public RegionScanner postScannerOpen( + final ObserverContext e, final Scan scan, + final RegionScanner s) { + PriorityFunction.addRegionScanner(scan, s); + return s; + } + + @Override + public void postScannerClose( + final ObserverContext e, + final InternalScanner s) throws IOException { + PriorityFunction.removeScanner((RegionScanner) s); + } +} Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1213130) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy) @@ -2353,9 +2353,18 @@ RegionScanner s = scanners.remove(this.scannerName); if (s != null) { try { + HRegion region = getRegion(s.getRegionInfo().getRegionName()); + if (region != null && region.getCoprocessorHost() != null) { + region.getCoprocessorHost().preScannerClose(s); + } + s.close(); + if (region != null && region.getCoprocessorHost() != null) { + region.getCoprocessorHost().postScannerClose(s); + } } catch (IOException e) { - LOG.error("Closing scanner", e); + LOG.error("Closing scanner for " + + s.getRegionInfo().getRegionNameAsString(), e); } } } Index: src/test/java/org/apache/hadoop/hbase/ipc/TestActionPriority.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/ipc/TestActionPriority.java (revision 0) +++ src/test/java/org/apache/hadoop/hbase/ipc/TestActionPriority.java (revision 0) @@ -0,0 +1,268 @@ +package org.apache.hadoop.hbase.ipc; + +import java.io.IOException; +import java.util.Random; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.LargeTests; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.ipc.PriorityFunction; +import org.apache.hadoop.hbase.ipc.PriorityJobQueue.ActionPriorities; +import org.apache.hadoop.hbase.ipc.TestTablePriority.Worker; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import static org.junit.Assert.assertTrue; + +/** + * Test for action priority, use Action_Priority table and scan and put with + * different priorities. There are two tests,in first one Get's priority is + * 0(high priority, RPC priority = table priority 0+ action priority 0)
+ * scan's priority is 10 (low priority)and in the second test,we switch the + * priorities of scan and get. + */ +@Category(LargeTests.class) +public class TestActionPriority { + private final static Log LOG = LogFactory.getLog(TestActionPriority.class); + static final Random r = new Random(); + static final int rowNubmer = 10000; + static final int threadN = 100; + HBaseAdmin admin = null; + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + /** + * @throws java.lang.Exception + */ + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.getConfiguration().setBoolean(PriorityFunction.PRI_ENABLE_KEY, true); + TEST_UTIL.startMiniCluster(1); + } + + /** + * @throws java.lang.Exception + */ + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + /** + * set up a cluster and prepare tables. + */ + @Before + public void setUp() { + try { + admin = new HBaseAdmin(TEST_UTIL.getConfiguration()); + } catch (Exception e) { + LOG.info(e); + } + HTableDescriptor des; + byte[][] startKeys = new byte[][] { Bytes.toBytes("0"), Bytes.toBytes("4") }; + try { + if (admin.tableExists("Action_Priority")) { + admin.disableTable("Action_Priority"); + admin.deleteTable("Action_Priority"); + } + des = new HTableDescriptor("Action_Priority"); + des.addFamily(new HColumnDescriptor("ff")); + des.setValue(PriorityFunction.PRI_KEY_ACTION_PRIORITY, + new ActionPriorities(10, 0, 0, 0).toBytes()); + des.setValue(PriorityFunction.PRI_KEY, Bytes.toBytes(0 + "")); + admin.createTable(des, startKeys); + + } catch (Exception e) { + LOG.info(e); + } + writeData(); + } + + private void writeData() { + LOG.info("begion write data into test table"); + int nPerWorker = rowNubmer / threadN; + Worker[] workers = new Worker[threadN]; + for (int i = 0; i < workers.length; i++) { + try { + workers[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(), + "Action_Priority"), "writeData", nPerWorker); + } catch (IOException e) { + } + } + for (int i = 0; i < workers.length; i++) { + workers[i].start(); + } + for (int i = 0; i < workers.length; i++) { + try { + workers[i].join(); + } catch (InterruptedException e) { + LOG.error(e); + } + } + LOG.info("Write data into test table finished."); + } + + /** + * @throws java.lang.Exception + */ + @After + public void tearDown() { + + } + + /** + * Verify whether the function take effect + * + * @param highs high priority workers + * @param lows low priority workers + */ + + @SuppressWarnings("static-access") + public void verifyFunction(Worker highs[], Worker lows[]) { + boolean highFinished = false; + boolean lowFinished = false; + long highThroughPut = 0; + long lowThroughPut = 0; + while (!(highFinished && lowFinished)) { + try { + Thread.currentThread().sleep(200); + } catch (InterruptedException e) { + } + highThroughPut = 0; + lowThroughPut = 0; + for (int i = 0; i < highs.length; i++) { + highThroughPut += highs[i].getThroughput(); + lowThroughPut += lows[i].getThroughput(); + } + LOG.info("-------------------------------------------------------------"); + LOG.info("High priority action type is:" + highs[0].getType() + + ", priority:" + highs[0].getActionPriority() + " throughput is:" + + highThroughPut); + LOG.info("low priority action type is:" + lows[0].getType() + + ", priority:" + lows[0].getActionPriority() + " throughput is:" + + lowThroughPut); + + highFinished = true; + lowFinished = true; + for (int i = 0; i < highs.length; i++) { + if (highs[i].isAlive()) { + highFinished = false; + } + if (lows[i].isAlive()) { + lowFinished = false; + } + } + if (highFinished) { + for (int i = 0; i < highs.length; i++) { + lows[i].stopWorker(); + } + lowFinished = true; + } + if (lowFinished) { + for (int i = 0; i < highs.length; i++) { + highs[i].stopWorker(); + } + highFinished = true; + } + + } + highThroughPut = 0; + lowThroughPut = 0; + for (int i = 0; i < highs.length; i++) { + highThroughPut += highs[i].getThroughput(); + lowThroughPut += lows[i].getThroughput(); + } + assertTrue("Action priority works properly", highThroughPut > lowThroughPut); + LOG.info("-------------------------------------------------------------"); + LOG.info("---------------------Test finished --------------------------"); + LOG.info("High priority action type is:" + highs[0].getType() + + ", priority:" + highs[0].getActionPriority() + " throughput is:" + + highThroughPut); + LOG.info("low priority action type is:" + lows[0].getType() + ", priority:" + + lows[0].getActionPriority() + " throughput is:" + lowThroughPut); + LOG.info("####### Test for " + highs[0].getType() + ", priority:" + + highs[0].getActionPriority() + " ,and " + lows[0].getType() + + ", priority:" + lows[0].getActionPriority() + " finished####"); + + LOG.info("-------------------------------------------------------------"); + } + + /** + * run the test; + */ + @Test + public void testForDifferentActionPriority() { + Worker puts[] = new Worker[threadN / 2]; + Worker scans[] = new Worker[threadN / 2]; + for (int i = 0; i < puts.length; i++) { + try { + puts[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(), + "Action_Priority"), "put", rowNubmer * 2 / threadN); + scans[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(), + "Action_Priority"), "scan", rowNubmer * 2 / threadN); + } catch (IOException e) { + } + } + for (int i = 0; i < puts.length; i++) { + puts[i].setActionPriority(0); + puts[i].start(); + scans[i].start(); + scans[i].setActionPriority(10); + } + verifyFunction(puts, scans); + + try { + admin.disableTable("Action_Priority"); + HTableDescriptor des1 = admin.getTableDescriptor(Bytes + .toBytes("Action_Priority")); + des1.setValue(PriorityFunction.PRI_KEY_ACTION_PRIORITY, + new ActionPriorities(0, 10, 10, 10).toBytes()); + admin.modifyTable(Bytes.toBytes("Action_Priority"), des1); + admin.enableTable("Action_Priority"); + } catch (IOException e) { + LOG.info(e); + } + puts = new Worker[threadN / 2]; + scans = new Worker[threadN / 2]; + for (int i = 0; i < puts.length; i++) { + try { + + puts[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(), + "Action_Priority"), "put", rowNubmer * 2 / threadN); + scans[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(), + "Action_Priority"), "scan", rowNubmer * 2 / threadN); + puts[i].setActionPriority(10); + scans[i].setActionPriority(0); + } catch (IOException e) { + } + } + for (int i = 0; i < puts.length; i++) { + puts[i].start(); + scans[i].start(); + } + verifyFunction(scans, puts); + } + + public static void main(String args[]) { + + try { + setUpBeforeClass(); + TestActionPriority t = new TestActionPriority(); + t.setUp(); + t.testForDifferentActionPriority(); + tearDownAfterClass(); + } catch (Exception e) { + LOG.error(e); + } + + } + +} Index: src/test/java/org/apache/hadoop/hbase/ipc/TestPriorityJobQueue.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/ipc/TestPriorityJobQueue.java (revision 0) +++ src/test/java/org/apache/hadoop/hbase/ipc/TestPriorityJobQueue.java (revision 0) @@ -0,0 +1,152 @@ +/** + * Copyright the Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import java.util.Random; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.SmallTests; +import org.apache.hadoop.hbase.ipc.PriorityJobQueue; +import org.apache.hadoop.hbase.ipc.PriorityJobQueue.Job; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import static org.junit.Assert.assertTrue; + +/** + * Test used to ensure the PriorityJobQueue works properly + * + */ +@Category(SmallTests.class) +public class TestPriorityJobQueue { + private final Log LOG = LogFactory.getLog(TestPriorityJobQueue.class); + private static final int lowestPriority = 10; + private static final int queueSize = 1000; + private static final int testThreadN = 50; + private static final int testTimes = 3000; + + public static void main(String args[]) { + + // junit.textui.TestRunner.run(TestForPriorityJobQueue.class); + } + + /** + * test for ensure the queue refreshed the jobs' priority + */ + @SuppressWarnings("static-access") + public void testQueueRefresh() { + final PriorityJobQueue queue = new PriorityJobQueue( + queueSize, lowestPriority, null,HBaseConfiguration.create()); + for (int i = 0; i < 10; i++) { + queue.add(i + "", 10); + } + + boolean match = true; + queue.refresh(); + queue.refresh(); + try { + Thread.currentThread().sleep(1000); + } catch (InterruptedException e) { + } + for (Object j : queue.toArray()) { + Job job = (Job) j; + LOG.debug("Job is:" + job); + if (job.getPriority() >= 10) + match = false; + } + assertTrue("calls' priorities are refreshed", match); + } + + /** + * test the queue's function: threads (not with the lowest priority)
+ * can only get a job which have a higher priority And test the number which + * input jobs is equal with the output number + */ + @Test + public void testGetJob() { + final Random r = new Random(); + final PriorityJobQueue queue = new PriorityJobQueue( + queueSize, lowestPriority, null,HBaseConfiguration.create()); + Thread consumer[] = new Thread[testThreadN]; + int pri = 1; + + for (int i = 0; i < consumer.length; i++, pri++) { + consumer[i] = new Thread(i + "") { + public void run() { + while (true) { + String j = null; + try { + j = (String) queue.take(); + } catch (InterruptedException e) { + } + if ((Math.abs(this.getPriority() - 10) + 1) != lowestPriority) { + assertTrue( + "Thread get a job which have a higher priority", + Integer.parseInt(j) <= (Math.abs(this.getPriority() - 10) + 1)); + LOG.debug("job:" + Integer.parseInt(j) + " handler:" + + (Math.abs(this.getPriority() - 10) + 1)); + } + } + } + }; + if (pri >= 11) + pri = 1; + consumer[i].setDaemon(true); + consumer[i].setPriority(pri); + consumer[i].start(); + } + + Thread producer[] = new Thread[testThreadN]; + for (int i = 0; i < producer.length; i++) { + producer[i] = new Thread(i + "") { + public void run() { + for (int j = 0; j < testTimes; j++) { + int jobpri = (r.nextInt(19) - 4); + queue.add("" + jobpri, jobpri); + } + } + }; + producer[i].start(); + } + for (Thread d : producer) { + try { + d.join(); + } catch (InterruptedException e) { + } + } + + while (queue.size() != 0) { + try { + Thread.sleep(30); + } catch (InterruptedException e) { + } + } + + boolean match = true; + for (int i = 0; i < 10; i++) { + if (queue.getPriorityAddTimes()[i] != queue.getPriorityGetTimes()[i]) { + match = false; + } + } + queue.printMetrix(); + assertTrue("all job are finished", match); + } +} Index: src/test/java/org/apache/hadoop/hbase/ipc/TestTablePriority.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/ipc/TestTablePriority.java (revision 0) +++ src/test/java/org/apache/hadoop/hbase/ipc/TestTablePriority.java (revision 0) @@ -0,0 +1,561 @@ +/** + * Copyright the Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import java.io.IOException; +import java.util.Random; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.LargeTests; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.ipc.PriorityFunction; +import org.apache.hadoop.hbase.ipc.PriorityJobQueue.ActionPriorities; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import static org.junit.Assert.assertTrue; + +/** + * Test for table priority, use table A,B,C which have different priorities. + * There are two tests,in first one table A's priority is 1(high priority), B is + * 5, C is 10 and in the second test,we switch the priorities of A and C. + */ +@Category(LargeTests.class) +public class TestTablePriority { + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private final static Log LOG = LogFactory.getLog(TestTablePriority.class); + static final Random r = new Random(); + static final int rowNubmer = 10000; + static final int threadN = 150; + HBaseAdmin admin = null; + // if there are data in the test tables,you + // can set method type to "scan" + static String method = "put"; + + /** + * @throws java.lang.Exception + */ + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.getConfiguration().setBoolean(PriorityFunction.PRI_ENABLE_KEY, true); + TEST_UTIL.startMiniCluster(1); + } + + /** + * @throws java.lang.Exception + */ + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + /** + * set up a cluster and prepare tables. + */ + @Before + public void setUp() { + try { + admin = new HBaseAdmin(TEST_UTIL.getConfiguration()); + } catch (Exception e) { + LOG.info(e); + } + HTableDescriptor des; + byte[][] startKeys = new byte[][] { Bytes.toBytes("0"), Bytes.toBytes("4") }; + try { + if (admin.tableExists("Table_A")) { + admin.disableTable("Table_A"); + admin.deleteTable("Table_A"); + } + if (admin.tableExists("Table_B")) { + admin.disableTable("Table_B"); + admin.deleteTable("Table_B"); + } + if (admin.tableExists("Table_C")) { + admin.disableTable("Table_C"); + admin.deleteTable("Table_C"); + } + } catch (IOException e1) { + LOG.error(e1); + } + try { + des = new HTableDescriptor("Table_A"); + des.addFamily(new HColumnDescriptor("ff")); + des = PriorityFunction.setPriority(1, des); + admin.createTable(des, startKeys); + } catch (Exception e) { + LOG.info(e); + } + try { + des = new HTableDescriptor("Table_B"); + des.addFamily(new HColumnDescriptor("ff")); + des = PriorityFunction.setPriority(5, des); + admin.createTable(des, startKeys); + } catch (Exception e) { + LOG.info(e); + } + try { + + des = new HTableDescriptor("Table_C"); + des.addFamily(new HColumnDescriptor("ff")); + des = PriorityFunction.setPriority(10, des); + admin.createTable(des, startKeys); + } catch (Exception e) { + LOG.info(e); + } + } + + /** + * The worker used for test throughput and gather the result + */ + public static class Worker extends Thread { + HTable table; + String type; + private long throughput = 0; + long times = 1000; + boolean stopFlag = false; + int tablePriority; + long n = 0; + long startTime = 0; + int actionPriority = 0; + + /** + * get throughput of this worker + * + * @return + */ + public long getThroughput() { + long end = System.currentTimeMillis(); + throughput = (n * 1000) / (end - startTime + 1); + return throughput; + } + + /** + * get table priority + * + * @return table priority + */ + public int getTablePriority() { + return tablePriority; + } + + /** + * set table priority + */ + public void setTablePriority(int tablePriority) { + this.tablePriority = tablePriority; + } + + /** + * get action type + * + * @return action type of the worker + */ + public String getType() { + return type; + } + + /** + * the table used by the worker + * + * @return table of the worker + */ + public HTable getTable() { + return table; + } + + public Worker(HTable t, String type, long times) { + this.table = t; + this.type = type; + this.times = times; + } + + public void run() { + try { + doAction(table, type, times); + } catch (IOException e) { + LOG.info(e); + } + } + + public int getActionPriority() { + return actionPriority; + } + + public void setActionPriority(int actionPriority) { + this.actionPriority = actionPriority; + } + + /** + * stop the worker + */ + public void stopWorker() { + this.stopFlag = true; + } + + private void doAction(HTable t, String type, long times) throws IOException { + long start = System.currentTimeMillis(); + long end = System.currentTimeMillis(); + startTime = System.currentTimeMillis(); + int pri = Integer.parseInt(Bytes.toString(t.getTableDescriptor() + .getValue(PriorityFunction.PRI_KEY))); + Scan s = new Scan(); + s.setStartRow(Bytes.toBytes(r.nextInt(10)+"")); + if (type.equals("writeData")) { + t.setAutoFlush(false); + t.setWriteBufferSize(1000); + } + ResultScanner sn = null; + boolean scan = false; + if (type.equals("scan")) + { + sn = t.getScanner(s); + scan = true; + } + while (true) { + if (n % 10000 == 0) { + end = System.currentTimeMillis(); + LOG.debug("Thread:" + this.getId() + " type:" + type + " table" + + t.getTableDescriptor().getNameAsString() + " pri :" + pri + + " time:" + (end - start) + " total :" + n + " throughput:" + + (n * 1000) / (end - startTime + 1)); + throughput = (n * 1000) / (end - startTime + 1); + start = end; + } + if (n % 100 == 0) { + end = System.currentTimeMillis(); + throughput = (n * 1000) / (end - startTime + 1); + } + Result ret = null; + if (scan) { + ret = sn.next(); + if (ret == null) { + try { + sn.close(); + } catch (Exception e) { + LOG.debug(e); + } + sn = t.getScanner(s); + } + } else { + put(t, n); + } + if (stopFlag || n > times) { + break; + } + n++; + } + t.flushCommits(); + t.close(); + } + } + + /** + * Verify whether the function take effect + * + * @param highs high priority workers + * @param lows low priority workers + */ + @SuppressWarnings("static-access") + public void verifyFunction(Worker high[], Worker middle[], Worker low[]) { + boolean highFinished = false; + boolean lowFinished = false; + boolean middleFinished = false; + long highThroughPut = 0; + long middleThroughPut = 0; + long lowThroughPut = 0; + while (!(highFinished && lowFinished && middleFinished)) { + try { + Thread.currentThread().sleep(100); + } catch (InterruptedException e) { + // ignore exceptions + } + highThroughPut = 0; + middleThroughPut = 0; + lowThroughPut = 0; + for (int i = 0; i < high.length; i++) { + highThroughPut += high[i].getThroughput(); + lowThroughPut += low[i].getThroughput(); + middleThroughPut += middle[i].getThroughput(); + } + LOG.info("-------------------------------------------------------------"); + try { + LOG.info("high priority table is: " + + high[0].getTable().getTableDescriptor().getNameAsString() + + " action type is:" + high[0].getType() + ", priority:" + + high[0].getTablePriority() + " throughput is:" + highThroughPut); + LOG.info("middle priority table is: " + + middle[0].getTable().getTableDescriptor().getNameAsString() + + " action type is:" + middle[0].getType() + ", priority:" + + middle[0].getTablePriority() + " throughput is:" + + middleThroughPut); + LOG.info("low priority table is: " + + low[0].getTable().getTableDescriptor().getNameAsString() + + " action type is:" + low[0].getType() + ", priority:" + + low[0].getTablePriority() + " throughput is:" + lowThroughPut); + } catch (Exception e) { + LOG.error(e); + } + + highFinished = true; + lowFinished = true; + middleFinished = true; + for (int i = 0; i < high.length; i++) { + if (high[i].isAlive()) { + highFinished = false; + } + if (low[i].isAlive()) { + lowFinished = false; + } + if (middle[i].isAlive()) { + middleFinished = false; + } + } + if (highFinished) { + for (int i = 0; i < high.length; i++) { + low[i].stopWorker(); + } + for (int i = 0; i < high.length; i++) { + middle[i].stopWorker(); + } + middleFinished = true; + lowFinished = true; + } + if (lowFinished) { + for (int i = 0; i < high.length; i++) { + high[i].stopWorker(); + } + for (int i = 0; i < high.length; i++) { + middle[i].stopWorker(); + } + middleFinished = true; + highFinished = true; + } + if (middleFinished) { + for (int i = 0; i < high.length; i++) { + high[i].stopWorker(); + } + for (int i = 0; i < high.length; i++) { + low[i].stopWorker(); + } + lowFinished = true; + highFinished = true; + } + + } + highThroughPut = 0; + middleThroughPut = 0; + lowThroughPut = 0; + for (int i = 0; i < high.length; i++) { + highThroughPut += high[i].getThroughput(); + lowThroughPut += low[i].getThroughput(); + middleThroughPut += middle[i].getThroughput(); + } + + assertTrue("Action priority works properly", + highThroughPut > middleThroughPut && middleThroughPut > lowThroughPut); + LOG.info("-------------------------------------------------------------"); + LOG.info("---------------------Test finished --------------------------"); + + try { + LOG.info("high priority table is: " + + high[0].getTable().getTableDescriptor().getNameAsString() + + " action type is:" + high[0].getType() + ", priority:" + + high[0].getTablePriority() + " throughput is:" + highThroughPut); + LOG.info("middle priority table is: " + + middle[0].getTable().getTableDescriptor().getNameAsString() + + " action type is:" + middle[0].getType() + ", priority:" + + middle[0].getTablePriority() + " throughput is:" + middleThroughPut); + LOG.info("low priority table is: " + + low[0].getTable().getTableDescriptor().getNameAsString() + + " action type is:" + low[0].getType() + ", priority:" + + low[0].getTablePriority() + " throughput is:" + lowThroughPut); + } catch (Exception e) { + LOG.error(e); + } + LOG.info("####### Test for " + high[0].getType() + ", priority:" + + high[0].getTablePriority() + " " + middle[0].getType() + + ", priority:" + middle[0].getTablePriority() + " ,and " + + low[0].getType() + ", priority:" + low[0].getTablePriority() + + " finished####"); + + LOG.info("-------------------------------------------------------------"); + } + private void changePriority(int pria,int prib,int pric) + { + try { + admin.disableTable("Table_A"); + HTableDescriptor des1 = admin + .getTableDescriptor(Bytes.toBytes("Table_A")); + des1.setValue(PriorityFunction.PRI_KEY_ACTION_PRIORITY, + new ActionPriorities(0, 0, 0, 0).toBytes()); + des1 = PriorityFunction.setPriority(pria, des1); + admin.modifyTable(Bytes.toBytes("Table_A"), des1); + admin.enableTable("Table_A"); + } catch (Exception e) { + LOG.info(e); + } + try { + admin.disableTable("Table_B"); + HTableDescriptor des1 = admin + .getTableDescriptor(Bytes.toBytes("Table_B")); + des1.setValue(PriorityFunction.PRI_KEY_ACTION_PRIORITY, + new ActionPriorities(0, 0, 0, 0).toBytes()); + des1 = PriorityFunction.setPriority(prib, des1); + admin.modifyTable(Bytes.toBytes("Table_B"), des1); + admin.enableTable("Table_B"); + } catch (Exception e) { + LOG.info(e); + } + try { + admin.disableTable("Table_C"); + HTableDescriptor des1 = admin + .getTableDescriptor(Bytes.toBytes("Table_C")); + des1.setValue(PriorityFunction.PRI_KEY_ACTION_PRIORITY, + new ActionPriorities(0, 0, 0, 0).toBytes()); + des1 = PriorityFunction.setPriority(pric, des1); + admin.modifyTable(Bytes.toBytes("Table_C"), des1); + admin.enableTable("Table_C"); + } catch (Exception e) { + LOG.info(e); + } + } + /** + * start the test. + */ + @Test(timeout = 180000) + public void testForDifferentTablePriority() { + method="put"; + doTestJob(); + method="scan"; + changePriority(1,5,10); + doTestJob(); + } + + + private void doTestJob() + { + Worker A[] = new Worker[threadN / 3]; + Worker B[] = new Worker[threadN / 3]; + Worker C[] = new Worker[threadN / 3]; + for (int i = 0; i < A.length; i++) { + try { + A[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(), "Table_A"), + method, rowNubmer * 3 / threadN); + B[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(), "Table_B"), + method, rowNubmer * 3 / threadN); + C[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(), "Table_C"), + method, rowNubmer * 3 / threadN); + } catch (IOException e) { + } + } + for (int i = 0; i < A.length; i++) { + A[i].setTablePriority(1); + B[i].setTablePriority(5); + C[i].setTablePriority(10); + A[i].start(); + B[i].start(); + C[i].start(); + } + verifyFunction(A, B, C); + changePriority(10,5,1); + A = new Worker[threadN / 3]; + B = new Worker[threadN / 3]; + C = new Worker[threadN / 3]; + for (int i = 0; i < A.length; i++) { + try { + A[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(), "Table_A"), + method, rowNubmer * 3 / threadN); + B[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(), "Table_B"), + method, rowNubmer * 3 / threadN); + C[i] = new Worker(new HTable(TEST_UTIL.getConfiguration(), "Table_C"), + method, rowNubmer * 3 / threadN); + } catch (IOException e) { + } + } + for (int i = 0; i < A.length; i++) { + A[i].setTablePriority(10); + B[i].setTablePriority(5); + C[i].setTablePriority(1); + A[i].start(); + B[i].start(); + C[i].start(); + } + verifyFunction(C, B, A); + } + public static void printTable(String name) + { + + try { + HTable A = new HTable(TEST_UTIL.getConfiguration(), name); + Scan s=new Scan(); + ResultScanner rs; + rs = A.getScanner(s); + Result r=null; + while((r=rs.next())!=null) + { + System.out.println(r); + } + } catch (IOException e) { + LOG.error(e); + } + + } + public static void main(String args[]) { + try { + setUpBeforeClass(); + TestTablePriority t = new TestTablePriority(); + t.setUp(); + tearDownAfterClass(); + } catch (Exception e) { + LOG.info(e); + } + } + + static byte[] b; + static { + String bs = ""; + for (int i = 0; i < 100; i++) { + bs += i; + } + b = Bytes.toBytes(bs); + } + private static void put(HTable t, long i) { + Put p = new Put( + Bytes.toBytes("" + r.nextInt(1000) + (i) + r.nextInt(10000))); + p.add(Bytes.toBytes("ff"), Bytes.toBytes("ff"), b); + try { + t.put(p); + } catch (IOException e) { + LOG.info(e); + } + } +}