diff -uNr org/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java changed/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java --- src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java 2011-08-04 19:52:17.000000000 +0800 +++ src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java 2011-08-04 19:52:15.000000000 +0800 @@ -511,6 +511,10 @@ final int numHandlers, int metaHandlerCount, final boolean verbose, Configuration conf, int highPriorityLevel) throws IOException { + if(instance instanceof HRegionInterface&&conf.getBoolean("hbase.tablepriority.enable", true)) + { + return new PriorityHBaseServer(instance, ifaces, conf, bindAddress, port, numHandlers, metaHandlerCount, verbose, highPriorityLevel); + } return new Server(instance, ifaces, conf, bindAddress, port, numHandlers, metaHandlerCount, verbose, highPriorityLevel); } diff -uNr org/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java changed/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java --- src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java 2011-08-04 19:52:17.000000000 +0800 +++ src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java 2011-08-04 19:52:15.000000000 +0800 @@ -217,7 +217,7 @@ } /** A call queued for handling. */ - private static class Call { + static class Call { protected int id; // the client's call id protected Writable param; // the parameter passed protected Connection connection; // connection to client @@ -811,7 +811,7 @@ } /** Reads calls from a connection and queues them for handling. */ - private class Connection { + protected class Connection { private boolean versionRead = false; //if initial signature and //version are read private boolean headerRead = false; //if the connection header that @@ -1000,7 +1000,7 @@ } /** Handles queued calls . */ - private class Handler extends Thread { + protected class Handler extends Thread { private final BlockingQueue myCallQueue; static final int BUFFER_INITIAL_SIZE = 1024; diff -uNr org/src/main/java/org/apache/hadoop/hbase/ipc/PriorityHBaseServer.java changed/src/main/java/org/apache/hadoop/hbase/ipc/PriorityHBaseServer.java --- src/main/java/org/apache/hadoop/hbase/ipc/PriorityHBaseServer.java 1970-01-01 08:00:00.000000000 +0800 +++ src/main/java/org/apache/hadoop/hbase/ipc/PriorityHBaseServer.java 2011-08-04 19:52:15.000000000 +0800 @@ -0,0 +1,649 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.client.Action; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.MetaScanner; +import org.apache.hadoop.hbase.client.MultiAction; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Row; +import org.apache.hadoop.hbase.io.HbaseObjectWritable; +import org.apache.hadoop.hbase.ipc.HBaseRPC.Invocation; +import org.apache.hadoop.hbase.ipc.HBaseRPC.Server; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.Writable; + +/** + * An abstract IPC service. IPC calls take a single {@link Writable} as a + * parameter, and return a {@link Writable} as their value. A service runs on a + * port and is defined by a parameter class and a value class. + * + * Extends HBaseServer,and add schedule function to make table priority take + * effect. + */ +public class PriorityHBaseServer extends Server { + + public final static String PRI_KEY = "priority"; + public final static String PRI_KEY_ACTION_PLUS = "action_plus"; + public static final int LOWEST_PRI = 10; + public static final int DEFAULT_PRI = 5; + public static final int HIGHEST_PRI = -10; + public static final int HIGH_PRI = 0; + /** + * priority refresh interval + */ + private static int initInter = 120000; + + /** + * the priority map,cache the region table and scanner's priority in memory. + */ + private static Map scannersMap; + private final static ConcurrentHashMap regionPriMap = new ConcurrentHashMap(); + private final static ConcurrentHashMap tablePriMap = new ConcurrentHashMap(); + private final static ConcurrentHashMap scannerPriMap = new ConcurrentHashMap(); + private final static ConcurrentHashMap scannerPriMapInteger = new ConcurrentHashMap(); + + private final static ConcurrentHashMap tableActionPlus = new ConcurrentHashMap(); + private final static ConcurrentHashMap regionActionPlus = new ConcurrentHashMap(); + + // TODO: add priority of source host. + private final static ConcurrentHashMap> tableSourceHostPlus = new ConcurrentHashMap>(); + private final static ConcurrentHashMap> regionSourceHostPlus = new ConcurrentHashMap>(); + + protected BlockingQueue callQueue; + public static final Log LOG = LogFactory + .getLog("org.apache.hadoop.ipc.ScheduleHBaseServer"); + protected static final ThreadLocal SERVER = new ThreadLocal(); + private Object instance; + private Handler[] handlersChild = null; + + /** + * Construct an RPC server. + * + * @param instance + * the instance whose methods will be called + * @param conf + * the configuration to use + * @param bindAddress + * the address to bind on to listen for connection + * @param port + * the port to listen for connections on + * @param numHandlers + * the number of method handler threads to run + * @param verbose + * whether each call should be logged + * @throws IOException + * e + */ + public PriorityHBaseServer(Object instance, final Class[] ifaces, + Configuration conf, String bindAddress, int port, int numHandlers, + int metaHandlerCount, boolean verbose, int highPriorityLevel) + throws IOException { + super(instance, ifaces, conf, bindAddress, port, numHandlers, + metaHandlerCount, verbose, highPriorityLevel); + int maxQueueSize = this.conf.getInt("ipc.server.max.queue.size", 500); + initInter = this.conf.getInt("ipc.priority.refresh.interval", 120000); + Field f; + try { + f = super.getClass().getSuperclass().getSuperclass() + .getDeclaredField("callQueue"); + f.setAccessible(true); + callQueue = new PriorityJobQueue(maxQueueSize, + numHandlers >= 10 ? 10 : numHandlers, this); + f.set(this, this.callQueue); + f = instance.getClass().getDeclaredField("scanners"); + f.setAccessible(true); + scannersMap = (Map) f.get(instance); + } catch (SecurityException e) { + e.printStackTrace(); + } catch (NoSuchFieldException e) { + e.printStackTrace(); + } catch (IllegalArgumentException e) { + e.printStackTrace(); + } catch (IllegalAccessException e) { + e.printStackTrace(); + } + this.instance = instance; + } + + /** + * get action priorities from table descriptor + * + * @param des + * the table descriptor + * @return + */ + public ActionPriPlus getTableActionPlus(HTableDescriptor des) { + byte[] plus = des.getValue(Bytes.toBytes(PRI_KEY_ACTION_PLUS)); + ActionPriPlus ret = new ActionPriPlus(); + if (plus != null) { + try { + ret.fromBytes(plus); + } catch (Exception e) { + e.printStackTrace(); + } + return ret; + } + return ret; + } + + /** + * Initiate the region priority + * + * @param regions + * the region want to get priority + * @param force + * force refresh priority,if true will get priority from table + * descriptor. + * @return the region priority + */ + + @SuppressWarnings("unused") + private int initRegionPri(byte[] regions, boolean force) { + String region = Bytes.toString(regions); + return this.initRegionPri(region, force); + + } + + /** + * Get the priority of the call + * + * @param call + * @return the priority + */ + protected int getCallPri(Call call) { + Invocation invo = (Invocation) call.param; + if (invo.getMethodName().endsWith("next")) { + Long scanN = (Long) invo.getParameters()[0]; + Integer pri = scannerPriMapInteger.get(scanN); + if (pri == null) { + String regionN = scannerPriMap.get(scanN); + if (regionN != null) { + pri = getRegionPri(regionN); + ActionPriPlus action = this.regionActionPlus.get(regionN); + if (action != null) { + pri += action.getScanPlus(); + } + scannerPriMapInteger.put(scanN, pri); + return pri; + } else { + LOG.error("error,there is no this scanner id"); + return DEFAULT_PRI; + } + } + return pri; + } else if (invo.getMethodName().endsWith("multi")) { + MultiAction multi = (MultiAction) invo.getParameters()[0]; + for (Map.Entry> e : multi.actions.entrySet()) { + String regionN = Bytes.toString(e.getKey()); + Integer pri = getRegionPri(regionN); + ActionPriPlus plus; + if ((plus = this.regionActionPlus.get(regionN)) != null) { + List actionsForRegion = e.getValue(); + Action action = actionsForRegion.get(0); + if (action != null) { + Row row = action.getAction(); + if (row instanceof Delete) { + pri += plus.getDeletePlus(); + + } else if (row instanceof Get) { + pri += plus.getGetPlus(); + + } else if (row instanceof Put) { + pri += plus.getPutPlus(); + } + } + } + return pri; + } + return DEFAULT_PRI; + } else if (invo.getMethodName().endsWith("get") + || invo.getMethodName().endsWith("put") + || invo.getMethodName().endsWith("delete")) { + byte[] region = (byte[]) invo.getParameters()[0]; + String regionN = Bytes.toString(region); + Integer pri = getRegionPri(regionN); + ActionPriPlus plus; + if ((plus = this.regionActionPlus.get(regionN)) != null) { + if (invo.getMethodName().endsWith("delete")) { + pri += plus.getDeletePlus(); + } else if (invo.getMethodName().endsWith("get")) { + pri += plus.getGetPlus(); + + } else if (invo.getMethodName().endsWith("put")) { + pri += plus.getPutPlus(); + } + } + return pri; + } else { + return HIGH_PRI; + } + } + + private int getRegionPri(String regionN) { + Integer pri = regionPriMap.get(regionN); + if (pri == null) { + pri = initRegionPri(regionN, false); + } + if (pri == null) + return DEFAULT_PRI; + return pri; + } + + /** + * Initiate the region priority + * + * @param regions + * the region want to get priority + * @param force + * force refresh priority,if true will get priority from table + * descriptor. + * @return the region priority + */ + private int initRegionPri(String region, boolean force) { + if (!force) { + Integer ret = regionPriMap.get(region); + if (ret != null) + return ret; + } + Integer prii; + int pri = DEFAULT_PRI; + HRegion hr = ((HRegionServer) this.instance).getOnlineRegion(Bytes + .toBytes(region)); + + if (hr != null) { + if (hr.getRegionInfo().isMetaRegion() + || hr.getRegionInfo().isRootRegion()) { + pri = HIGHEST_PRI; + regionPriMap.put(region, pri); + return pri; + } + String tableName = hr.getTableDesc().getNameAsString(); + + prii = tablePriMap.get(tableName); + + if (prii == null) { + if (hr.getTableDesc().getValue(Bytes.toBytes(PRI_KEY)) != null) { + try { + + pri = Integer.parseInt(Bytes.toString(hr.getTableDesc().getValue( + Bytes.toBytes(PRI_KEY)))); + } catch (Exception e) { + e.printStackTrace(); + } + } + tablePriMap.put(tableName, pri); + } else { + pri = prii; + } + } else { + LOG.error("error this is no this region" + region); + } + regionPriMap.put(region, pri); + + if (this.regionActionPlus.get(region) == null) { + if (hr != null) { + ActionPriPlus plus = this.getTableActionPlus(hr.getTableDesc()); + this.regionActionPlus.put(region, plus); + } + } + return pri; + } + + /** + * Initiate the scanner's priority,invoked by openscanner + * + * @param call + * @param value + * scanner id + */ + public void initScannerPri(Invocation call, Object value) { + Long id = (Long) value; + byte[] region = (byte[]) call.getParameters()[0]; + String regionN = Bytes.toString(region); + Integer prii = regionPriMap.get(regionN); + if (prii == null) { + this.initRegionPri(regionN, false); + } + scannerPriMap.put(id, regionN); + } + + private Thread priorityIniter = new Thread() { + public void run() { + while (running) { + + try { + sleep(initInter); + } catch (InterruptedException e) { + e.printStackTrace(); + } + initPriority(); + } + + } + }; + + @Override + public void start() { + super.start(); + this.priorityIniter.setDaemon(true); + this.priorityIniter.start(); + + } + + /** + * Initiate the table priorities. + */ + private void initPriority() { + try { + List list = MetaScanner.listAllRegions(conf); + for (HRegionInfo region : list) { + int pri = DEFAULT_PRI; + HTableDescriptor des = region.getTableDesc(); + + byte[] prib = des.getValue(Bytes.toBytes(PRI_KEY)); + ActionPriPlus actionPlus = this.getTableActionPlus(des); + if (prib != null) { + try { + pri = Integer.parseInt(Bytes.toString((prib))); + } catch (Exception e) { + LOG.error("table priority error :" + Bytes.toString(prib) + + " table name:" + des.getNameAsString()); + } + } + tablePriMap.put(des.getNameAsString(), pri); + + if (actionPlus != null) { + this.tableActionPlus.put(des.getNameAsString(), actionPlus); + this.regionActionPlus.put(region.getRegionNameAsString(), actionPlus); + } + regionPriMap.put(region.getRegionNameAsString(), pri); + } + } catch (Exception e) { + e.printStackTrace(); + } + for (Long id : scannerPriMap.keySet()) { + if (scannersMap.get(String.valueOf(id)) == null) { + scannerPriMap.remove(id); + } + } + for (Long id : scannerPriMapInteger.keySet()) { + if (scannersMap.get(String.valueOf(id)) == null) { + scannerPriMapInteger.remove(id); + } + } + + // for (String regionName : regionPriMap.keySet()) { + // this.initRegionPri(regionName, true); + // } + } + + /** + * translate thread priority to system priority + * + * @param tpri + * @return + */ + public int priTrans(int tpri) { + switch (tpri) { + case 10: + return 1; + case 9: + return 2; + case 8: + return 3; + case 7: + return 4; + case 6: + return 5; + case 5: + return 6; + case 4: + return 7; + case 3: + return 8; + case 2: + return 9; + case 1: + return 10; + default: + return 5; + } + } + + private int[] priorityArray; + + private void initPriorityArray(int handleSize) { + priorityArray = new int[handleSize]; + int minPriOfTurn = 1; + for (int i = 0, priNow = 10; i < handleSize; i++, priNow--) { + if (priNow < minPriOfTurn) { + priNow = 10; + minPriOfTurn++; + if (minPriOfTurn == 10) { + minPriOfTurn = 1; + } + } + priorityArray[i] = priNow; + } + } + + /** + * start Threads and set priority of handlers + */ + @Override + public synchronized void startThreads() { + super.startThreads(); + + Field f; + try { + f = super.getClass().getSuperclass().getSuperclass() + .getDeclaredField("handlers"); + f.setAccessible(true); + this.handlersChild = (Handler[]) f.get(this); + } catch (SecurityException e) { + e.printStackTrace(); + } catch (NoSuchFieldException e) { + e.printStackTrace(); + } catch (IllegalArgumentException e) { + e.printStackTrace(); + } catch (IllegalAccessException e) { + e.printStackTrace(); + } + initPriorityArray(handlersChild.length); + for (int i = 0; i < this.handlersChild.length; i++) { + + handlersChild[i].setPriority(priorityArray[i]); + + } + } + + /** Stops the service. No new calls will be handled after this is called. */ + @SuppressWarnings("unchecked") + @Override + public synchronized void stop() { + super.stop(); + if (this.priorityIniter != null) { + this.priorityIniter.interrupt(); + } + /** + * added here to stop the priority refresher. + */ + ((PriorityJobQueue) this.callQueue).stop(); + } + + @Override + public Writable call(Writable param, long receivedTime) throws IOException { + Invocation call = (Invocation) param; + HbaseObjectWritable writable = (HbaseObjectWritable) super.call(param, + receivedTime); + if (call.getMethodName().endsWith("openScanner")) { + this.initScannerPri(call, writable.get()); + } + return writable; + } + + /** + * + * class used to store the action priority of table; + * + */ + public static class ActionPriPlus { + int putPlus = 0; + int getPlus = 0; + int deletePlus = 0; + int scanPlus = 0; + + public ActionPriPlus() { + + } + + public ActionPriPlus(int scanPlus, int putPlus, int getPlus, int deletePlus) { + this.scanPlus = scanPlus; + this.putPlus = putPlus; + this.getPlus = getPlus; + this.deletePlus = deletePlus; + } + + /** + * get put priority plus + * + * @return + */ + public int getPutPlus() { + return putPlus; + } + + /** + * set put priority plus + * + * @return + */ + public void setPutPlus(int putPlus) { + this.putPlus = putPlus; + } + + /** + * get get priority plus + * + * @return + */ + public int getGetPlus() { + return getPlus; + } + + /** + * set get priority plus + * + * @return + */ + public void setGetPlus(int getPlus) { + this.getPlus = getPlus; + } + + /** + * get delete priority plus + * + * @return + */ + public int getDeletePlus() { + return deletePlus; + } + + /** + * set delete priority plus + * + * @return + */ + public void setDeletePlus(int deletePlus) { + this.deletePlus = deletePlus; + } + + /** + * get scan priority plus + * + * @return + */ + public int getScanPlus() { + return scanPlus; + } + + /** + * set scan priority plus + * + * @return + */ + public void setScanPlus(int scanPlus) { + this.scanPlus = scanPlus; + } + + public String toString() { + return this.scanPlus + "," + this.putPlus + "," + this.getPlus + "," + + this.deletePlus + ","; + } + + /** + * store this object into byte[] + * + * @return + */ + public byte[] toBytes() { + byte[] ret = new byte[Bytes.SIZEOF_INT * 4]; + Bytes.putInt(ret, 0, this.scanPlus); + Bytes.putInt(ret, Bytes.SIZEOF_INT, this.putPlus); + Bytes.putInt(ret, Bytes.SIZEOF_INT * 2, this.getPlus); + Bytes.putInt(ret, Bytes.SIZEOF_INT * 3, this.deletePlus); + return ret; + } + + /** + * get initiate value from byte[] + * + * @param b + */ + public void fromBytes(byte[] b) { + if (b.length != Bytes.SIZEOF_INT * 4) + return; + else { + this.scanPlus = Bytes.toInt(b, 0); + this.putPlus = Bytes.toInt(b, Bytes.SIZEOF_INT); + this.getPlus = Bytes.toInt(b, Bytes.SIZEOF_INT * 2); + this.deletePlus = Bytes.toInt(b, Bytes.SIZEOF_INT * 3); + } + } + } +} \ No newline at end of file diff -uNr org/src/main/java/org/apache/hadoop/hbase/ipc/PriorityJobQueue.java changed/src/main/java/org/apache/hadoop/hbase/ipc/PriorityJobQueue.java --- src/main/java/org/apache/hadoop/hbase/ipc/PriorityJobQueue.java 1970-01-01 08:00:00.000000000 +0800 +++ src/main/java/org/apache/hadoop/hbase/ipc/PriorityJobQueue.java 2011-08-04 19:52:15.000000000 +0800 @@ -0,0 +1,625 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import java.util.Calendar; +import java.util.Collection; +import java.util.Iterator; +import java.util.Random; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * + * this queue is used by {@link PriorityHBaseServer} + * + * @param + * the class contained by the queue + * + */ +public class PriorityJobQueue implements BlockingQueue { + private static final Log LOG = LogFactory.getLog(PriorityJobQueue.class); + private final PriorityBlockingQueue> queue = new PriorityBlockingQueue>(); + private int size = 0; + private int capacity = 100; + private int maxWait = 1000; + private boolean running = true; + private PriorityHBaseServer server; + ReentrantLock readLock = new ReentrantLock(); + Condition[] lockList = new Condition[10]; + ReentrantLock addLock = new ReentrantLock(); + Condition queueFull = addLock.newCondition(); + int lowestThreadPir = 10; + boolean verbose = true; + public static int handleFreshInter = 6; + public static int move = Integer.SIZE - handleFreshInter; + public static int flush = 1; + public long[] callPriAddTimes = new long[10]; + public long[] callPriGetTimes = new long[10]; + + private void setSize(int size) { + this.addLock.lock(); + this.size = size; + if (this.size < this.capacity) { + this.queueFull.signalAll(); + } + this.addLock.unlock(); + } + + private void addSize() { + this.addLock.lock(); + this.size++; + this.addLock.unlock(); + } + + private void decreaseSize() { + this.addLock.lock(); + this.size--; + if (this.size < this.capacity) { + this.queueFull.signalAll(); + } + this.addLock.unlock(); + } + + private void testAdd(Job j) { + int wait = 0; + while (this.size >= this.capacity) { + try { + addLock.lock(); + this.queueFull.await(10, TimeUnit.MILLISECONDS); + // this.queueFull.await(); + addLock.unlock(); + wait++; + if (wait > this.maxWait) + break; + } catch (InterruptedException e) { + + e.printStackTrace(); + } + } + addLock.lock(); + this.queue.add(j); + this.addSize(); + addLock.unlock(); + } + + private int getSize() { + return this.size; + } + + private Thread refresher = new Thread() { + public void run() { + while (running) { + refreshIner(); + try { + sleep(1000); + } catch (InterruptedException e) { + } catch (Exception e) { + e.printStackTrace(); + } + } + } + }; + + public void stop() { + this.running = false; + this.refresher.interrupt(); + } + + /** + * Init a queue + * + * @param size + * the capacity of the queue,not a precision value,if queue size + * exceed this value, workers which add jobs should wait + * @param lowestPrid + * the lowest priority which worker thread hold,the default priority + * is range from 1 to 10,reverse from java thread priority + * @param server + * The instance of PriorityHBaseServer + */ + public PriorityJobQueue(int size, int lowestPrid, PriorityHBaseServer server) { + if (server != null) { + handleFreshInter = server.conf.getInt("hbase.schedule.refreshinter", 7); + } else { + handleFreshInter = 7; + } + move = Integer.SIZE - handleFreshInter; + this.capacity = size; + this.refresher.setDaemon(true); + this.refresher.start(); + for (int i = 0; i < 10; i++) { + lockList[i] = readLock.newCondition(); + } + this.lowestThreadPir = lowestPrid; + this.server = server; + } + + /** + * add a job to this queue + * + * @param call + * the job instance + * @param pri + * the job's priority + */ + public void add(T call, int pri) { + this.testAdd(new Job(pri, call)); + testHead(); + this.callPriAddTimes[this.getCondition(pri)]++; + + } + + /** + * get the size of the queue,maintain a integer to indicate the size for + * performance. + * + * @return the size of the queue + */ + public int size() { + return this.queue.size(); + } + + /** + * get the size of the queue + * + * @return the size of the queue + */ + public int queueSize() { + return queue.size(); + } + + private int getCondition(int pri) { + if (pri <= 10 && pri >= 1) { + return pri - 1; + } else if (pri > 10) { + return 9; + } else { + return 0; + } + } + + private void testHead() { + readLock.lock(); + Job jobt = queue.peek(); + if (jobt != null) { + this.lockList[getCondition(jobt.orgPri)].signal(); + + } + readLock.unlock(); + + } + + /** + * if handler's priority lower than job's priority, then this handler can't + * get this job. + * + * @param job + * the job which worker want to get + * @param pri + * the worker thread's priority + * @return should the worker get this job + */ + public boolean shouldWork(Job job, int pri) { + if (job == null) + return false; + return (pri >= job.orgPri) || (job.orgPri < 1 && pri == 1) + || (job.orgPri > this.lowestThreadPir && pri == this.lowestThreadPir); + } + + /** + * get a job from the queue ,will test whether the thread can get this job + * + * @param pri + * the worker thread's priority + * @return the job + * @throws InterruptedException + */ + public T get(int pri) throws InterruptedException { + Job job = null; + Job ret = null; + while (true) { + + readLock.lock(); + job = queue.peek(); + if (shouldWork(job, pri)) { + ret = queue.take(); + readLock.unlock(); + break; + } + this.lockList[getCondition(pri)].await(100, TimeUnit.MILLISECONDS); + readLock.unlock(); + if (job == null) { + this.setSize(0); + } + } + if (ret.orgPri > pri && pri != this.lowestThreadPir) { + System.err.println("error"); + } + this.testHead(); + this.callPriGetTimes[this.getCondition(ret.orgPri)]++; + this.decreaseSize(); + return ret.getCall(); + + } + + public void printMetrix() { + LOG.debug("size is :" + this.size); + this.size = this.queue.size(); + if (verbose) + System.out.println("size is :" + this.size); + LOG.debug("capacity is :" + this.capacity); + if (verbose) + System.out.println("capacity is :" + this.capacity); + String out = "addTimes"; + for (int i = 0; i < this.callPriAddTimes.length; i++) { + out += " " + (i + 1) + ":" + this.callPriAddTimes[i]; + } + LOG.debug("priority request static:" + out); + if (verbose) + System.out.println(out); + out = "getTimes"; + for (int i = 0; i < this.callPriGetTimes.length; i++) { + out += " " + (i + 1) + ":" + this.callPriGetTimes[i]; + } + LOG.debug("priority request static:" + out); + if (verbose) + System.out.println(out); + } + + /** + * refresh the priorities of the jobs in queue,simply -1 + */ + public void refresh() { + this.refresher.interrupt(); + } + + static int outputIndicator = 0; + + private void refreshIner() { + try { + if ((outputIndicator << 60) >>> 60 == 0) { + LOG.debug(Calendar.getInstance().getTime() + ":" + this.queue); + if (verbose) + System.out.println(Calendar.getInstance().getTime() + ":" + + this.queue); + this.printMetrix(); + } + outputIndicator++; + for (Job job : queue) { + if (job != null) { + job.add(); + } + } + testHead(); + } catch (Exception e) { + e.printStackTrace(); + } + } + + private void refreshIner(int n) { + try { + for (Job job : queue) { + if (job != null) { + job.add(n); + } + } + testHead(); + } catch (Exception e) { + e.printStackTrace(); + } + } + + /** + * + * The Job hold by queue + * + * @param + */ + public static class Job implements Comparable> { + int orgPri = 0; + int priority = 0; + long initTime = 0; + T call; + + /** + * increase job's priority + */ + public void add() { + this.priority--; + } + + /** + * increase job's priority by n + * + * @param n + */ + public void add(int n) { + + this.priority = this.priority - n; + + } + + /** + * get the instance hold by the job + * + * @return the call instance + */ + public T getCall() { + return call; + } + + /** + * set the instance hold byt the job + * + * @param call + * the call instance + */ + public void setCall(T call) { + this.call = call; + } + + /** + * Initiate a job + * + * @param pri + * the job priority + * @param call + * the instance hold by the job + */ + public Job(int pri, T call) { + this.orgPri = pri; + this.priority = pri; + this.initTime = System.currentTimeMillis(); + this.call = call; + } + + /** + * print the job + */ + public String toString() { + + return "orgPri:" + this.orgPri + ", lastPri:" + this.priority + + ", wait time:" + ((System.currentTimeMillis() - this.initTime)) + + ",ino:";// + call; + } + + @Override + public int compareTo(Job arg0) { + // TODO Auto-generated method stub + return this.priority - arg0.priority; + } + + } + + /** + * test the queue's function + * + * @param args + */ + public static void main(String args[]) { + final Random r = new Random(); + final PriorityJobQueue queue = new PriorityJobQueue(100, + 10, null); + Thread tt[] = new Thread[50]; + int pri = 1; + for (int i = 0; i < tt.length; i++, pri++) { + tt[i] = new Thread(i + "") { + public void run() { + while (true) { + String j = null; + try { + j = queue.get(Math.abs(this.getPriority() - 10) + 1); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + System.out.println("thread pri: " + + (Math.abs(this.getPriority() - 10) + 1) + " job:" + j + + " ,thread real pri is:" + this.getPriority()); + } + } + }; + if (pri >= 11) + pri = 1; + tt[i].setPriority(pri); + tt[i].start(); + } + + Thread tt2[] = new Thread[10]; + for (int i = 0; i < tt2.length; i++) { + tt2[i] = new Thread(i + "") { + @SuppressWarnings("static-access") + public void run() { + for (int i = 0; i < 10000000; i++) { + System.out.println("add ten jobs..............."); + for (int j = 0; j < 100000; j++) { + int jobpri = (r.nextInt(19) - 4); + queue.add("" + jobpri, jobpri); + + } + System.out.println(queue.size()); + try { + Thread.currentThread().sleep(r.nextInt(10000)); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + } + }; + tt2[i].start(); + } + + } + + @Override + public Object remove() { + + try { + return this.take(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return null; + } + + @Override + public Object poll() { + try { + return this.take(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return null; + } + + @Override + public Object element() { + return this.queue.element(); + } + + @Override + public Object peek() { + return this.queue.peek(); + } + + @Override + public boolean isEmpty() { + return this.queue.isEmpty(); + } + + @Override + public Iterator iterator() { + return this.queue.iterator(); + } + + @Override + public Object[] toArray() { + return this.queue.toArray(); + } + + @Override + public Object[] toArray(Object[] a) { + return this.queue.toArray(a); + } + + @Override + public boolean containsAll(Collection c) { + return this.queue.containsAll(c); + } + + @Override + public boolean addAll(Collection c) { + return this.queue.addAll(c); + } + + @Override + public boolean removeAll(Collection c) { + return this.queue.removeAll(c); + } + + @Override + public boolean retainAll(Collection c) { + return this.queue.retainAll(c); + } + + @Override + public void clear() { + this.queue.clear(); + } + + @Override + public boolean add(Object e) { + if (this.size > this.capacity) + return false; + else + try { + this.put(e); + } catch (InterruptedException e1) { + e1.printStackTrace(); + } + return true; + } + + @Override + public boolean offer(Object e) { + return this.offer(e); + } + + @Override + public void put(Object e) throws InterruptedException { + HBaseServer.Call call = (HBaseServer.Call) (e); + int pri = this.server.getCallPri(call); + this.add((T) call, pri); + } + + @Override + public boolean offer(Object e, long timeout, TimeUnit unit) + throws InterruptedException { + return false; + } + + @Override + public Object take() throws InterruptedException { + if (((flush << move) >>> move) == 0) { + this.refresher.interrupt(); + } + flush++; + return this.get(this.server.priTrans(Thread.currentThread().getPriority())); + } + + @Override + public Object poll(long timeout, TimeUnit unit) throws InterruptedException { + return this.get(this.server.priTrans(Thread.currentThread().getPriority())); + } + + @Override + public int remainingCapacity() { + return this.remainingCapacity(); + } + + @Override + public boolean remove(Object o) { + return this.remove(o); + } + + @Override + public boolean contains(Object o) { + return this.queue.contains(o); + } + + @Override + public int drainTo(Collection c) { + return this.queue.drainTo(c); + } + + @Override + public int drainTo(Collection c, int maxElements) { + return this.queue.drainTo(c, maxElements); + } + +}