commit e9edd24dff0fe941cba1b43304c34f0161805e3e Author: Ryan Rawson Date: Tue Sep 14 13:29:52 2010 -0700 HBASE-2782 QoS for META table access diff --git a/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java b/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java index dc0e203..c6ea838 100644 --- a/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java +++ b/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java @@ -30,6 +30,7 @@ import java.io.DataInput; import java.util.List; import java.util.Map; import java.util.ArrayList; +import java.util.Set; import java.util.TreeMap; /** @@ -47,7 +48,7 @@ public final class MultiAction implements Writable { /** * Get the total number of Actions - * + * * @return total number of Actions for all groups in this container. */ public int size() { @@ -62,7 +63,7 @@ public final class MultiAction implements Writable { * Add an Action to this container based on it's regionName. If the regionName * is wrong, the initial execution will fail, but will be automatically * retried after looking up the correct region. 
- * + * * @param regionName * @param a */ @@ -75,6 +76,10 @@ public final class MultiAction implements Writable { rsActions.add(a); } + public Set getRegions() { + return actions.keySet(); + } + /** * @return All actions from all regions in this container */ diff --git a/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java b/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java index d55fe71..e23a629 100644 --- a/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java +++ b/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java @@ -20,12 +20,14 @@ package org.apache.hadoop.hbase.ipc; +import com.google.common.base.Function; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.RetriesExhaustedException; import org.apache.hadoop.hbase.io.HbaseObjectWritable; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.Writable; import org.apache.hadoop.ipc.VersionedProtocol; import org.apache.hadoop.net.NetUtils; @@ -82,8 +84,9 @@ public class HBaseRPC { super(); } // no public ctor + /** A method invocation, including the method name and its parameters.*/ - private static class Invocation implements Writable, Configurable { + public static class Invocation implements Writable, Configurable { private String methodName; @SuppressWarnings("unchecked") private Class[] parameterClasses; @@ -497,9 +500,9 @@ public class HBaseRPC { final Class[] ifaces, final String bindAddress, final int port, final int numHandlers, - final boolean verbose, Configuration conf) + int metaHandlerCount, final boolean verbose, Configuration conf, int highPriorityLevel) throws IOException { - return new Server(instance, ifaces, conf, bindAddress, port, numHandlers, verbose); + return new Server(instance, ifaces, conf, bindAddress, port, numHandlers, metaHandlerCount, verbose, highPriorityLevel); } /** An RPC Server. 
*/ @@ -527,9 +530,9 @@ public class HBaseRPC { * @throws IOException e */ public Server(Object instance, final Class[] ifaces, - Configuration conf, String bindAddress, int port, - int numHandlers, boolean verbose) throws IOException { - super(bindAddress, port, Invocation.class, numHandlers, conf, classNameBase(instance.getClass().getName())); + Configuration conf, String bindAddress, int port, + int numHandlers, int metaHandlerCount, boolean verbose, int highPriorityLevel) throws IOException { + super(bindAddress, port, Invocation.class, numHandlers, metaHandlerCount, conf, classNameBase(instance.getClass().getName()), highPriorityLevel); this.instance = instance; this.implementation = instance.getClass(); diff --git a/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java b/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java index c4e6da2..1c74acb 100644 --- a/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java +++ b/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java @@ -20,9 +20,11 @@ package org.apache.hadoop.hbase.ipc; +import com.google.common.base.Function; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.ObjectWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; @@ -85,7 +87,7 @@ public abstract class HBaseServer { /** * How many calls/handler are allowed in the queue. 
*/ - private static final int MAX_QUEUE_SIZE_PER_HANDLER = 100; + private static final int MAX_QUEUE_SIZE_PER_HANDLER = 1000; public static final Log LOG = LogFactory.getLog("org.apache.hadoop.ipc.HBaseServer"); @@ -131,6 +133,7 @@ public abstract class HBaseServer { protected String bindAddress; protected int port; // port we listen on private int handlerCount; // number of handler threads + private int priorityHandlerCount; private int readThreads; // number of read threads protected Class paramClass; // class of call parameters protected int maxIdleTime; // the maximum idle time after @@ -156,6 +159,9 @@ public abstract class HBaseServer { volatile protected boolean running = true; // true while server runs protected BlockingQueue callQueue; // queued calls + protected BlockingQueue priorityCallQueue; + + private int highPriorityLevel; // what level a high priority call is at protected final List connectionList = Collections.synchronizedList(new LinkedList()); @@ -165,6 +171,7 @@ public abstract class HBaseServer { protected Responder responder = null; protected int numConnections = 0; private Handler[] handlers = null; + private Handler[] priorityHandlers = null; protected HBaseRPCErrorHandler errorHandler = null; /** @@ -959,7 +966,12 @@ public abstract class HBaseServer { param.readFields(dis); Call call = new Call(id, param, this); - callQueue.put(call); // queue the call; maybe blocked here + + if (priorityCallQueue != null && getQosLevel(param) > highPriorityLevel) { + priorityCallQueue.put(call); + } else { + callQueue.put(call); // queue the call; maybe blocked here + } } protected synchronized void close() { @@ -977,9 +989,17 @@ public abstract class HBaseServer { /** Handles queued calls . 
 */ private class Handler extends Thread { - public Handler(int instanceNumber) { + private final BlockingQueue myCallQueue; + public Handler(final BlockingQueue cq, int instanceNumber) { + this.myCallQueue = cq; this.setDaemon(true); - this.setName("IPC Server handler "+ instanceNumber + " on " + port); + + String threadName = "IPC Server handler " + instanceNumber + " on " + port; + if (cq == priorityCallQueue) { + // this is just an amazing hack, but it works. + threadName = "PRI " + threadName; + } + this.setName(threadName); } @Override @@ -990,7 +1010,7 @@ public abstract class HBaseServer { ByteArrayOutputStream buf = new ByteArrayOutputStream(buffersize); while (running) { try { - Call call = callQueue.take(); // pop the queue; maybe blocked here + Call call = myCallQueue.take(); // pop the queue; maybe blocked here if (LOG.isDebugEnabled()) LOG.debug(getName() + ": has #" + call.id + " from " + @@ -1058,33 +1078,58 @@ public abstract class HBaseServer { } - protected HBaseServer(String bindAddress, int port, - Class paramClass, int handlerCount, - Configuration conf) - throws IOException - { - this(bindAddress, port, paramClass, handlerCount, conf, Integer.toString(port)); + /** + * Gets the QOS level for this call. If it is higher than the highPriorityLevel and there + * are priorityHandlers available it will be processed in its own thread set. + * + * @param param + * @return priority, higher is better + */ + private Function qosFunction = null; + public void setQosFunction(Function newFunc) { + qosFunction = newFunc; } + + protected int getQosLevel(Writable param) { + if (qosFunction == null) { + return 0; + } + + Integer res = qosFunction.apply(param); + if (res == null) { + return 0; + } + return res; + } + /* Constructs a server listening on the named port and address. Parameters passed must * be of the named class. The handlerCount determines * the number of handler threads that will be used to process calls. 
* */ protected HBaseServer(String bindAddress, int port, - Class paramClass, int handlerCount, - Configuration conf, String serverName) + Class paramClass, int handlerCount, + int priorityHandlerCount, Configuration conf, String serverName, + int highPriorityLevel) throws IOException { this.bindAddress = bindAddress; this.conf = conf; this.port = port; this.paramClass = paramClass; this.handlerCount = handlerCount; + this.priorityHandlerCount = priorityHandlerCount; this.socketSendBufferSize = 0; this.maxQueueSize = handlerCount * MAX_QUEUE_SIZE_PER_HANDLER; this.readThreads = conf.getInt( "ipc.server.read.threadpool.size", 1); this.callQueue = new LinkedBlockingQueue(maxQueueSize); + if (priorityHandlerCount > 0) { + this.priorityCallQueue = new LinkedBlockingQueue(maxQueueSize); // TODO hack on size + } else { + this.priorityCallQueue = null; + } + this.highPriorityLevel = highPriorityLevel; this.maxIdleTime = 2*conf.getInt("ipc.client.connection.maxidletime", 1000); this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10); this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 4000); @@ -1121,9 +1166,17 @@ public abstract class HBaseServer { handlers = new Handler[handlerCount]; for (int i = 0; i < handlerCount; i++) { - handlers[i] = new Handler(i); + handlers[i] = new Handler(callQueue, i); handlers[i].start(); } + + if (priorityHandlerCount > 0) { + priorityHandlers = new Handler[priorityHandlerCount]; + for (int i = 0 ; i < priorityHandlerCount; i++) { + priorityHandlers[i] = new Handler(priorityCallQueue, i); + priorityHandlers[i].start(); + } + } } /** Stops the service. No new calls will be handled after this is called. 
*/ @@ -1131,9 +1184,16 @@ public abstract class HBaseServer { LOG.info("Stopping server on " + port); running = false; if (handlers != null) { - for (int i = 0; i < handlerCount; i++) { - if (handlers[i] != null) { - handlers[i].interrupt(); + for (Handler handler : handlers) { + if (handler != null) { + handler.interrupt(); + } + } + } + if (priorityHandlers != null) { + for (Handler handler : priorityHandlers) { + if (handler != null) { + handler.interrupt(); } } } diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 4c8bd4c..fdef130 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -158,7 +158,7 @@ public class HRegion implements HeapSize { // , Writable{ * This directory contains the directory for this region. */ final Path tableDir; - + final HLog log; final FileSystem fs; final Configuration conf; @@ -631,7 +631,7 @@ public class HRegion implements HeapSize { // , Writable{ private void cleanupTmpDir() throws IOException { FSUtils.deleteDirectory(this.fs, getTmpDir()); } - + /** * Get the temporary diretory for this region. This directory * will have its contents removed when the region is reopened. @@ -798,7 +798,7 @@ public class HRegion implements HeapSize { // , Writable{ /** * Flush the memstore. - * + * * Flushing the memstore is a little tricky. We have a lot of updates in the * memstore, all of which have also been written to the log. 
We need to * write those updates in the memstore out to disk, while being able to @@ -1279,12 +1279,12 @@ public class HRegion implements HeapSize { // , Writable{ retCodes = new OperationStatusCode[operations.length]; Arrays.fill(retCodes, OperationStatusCode.NOT_RUN); } - + public boolean isDone() { return nextIndexToProcess == operations.length; } } - + /** * Perform a batch put with no pre-specified locks * @see HRegion#put(Pair[]) @@ -1298,7 +1298,7 @@ public class HRegion implements HeapSize { // , Writable{ } return put(putsAndLocks); } - + /** * Perform a batch of puts. * @param putsAndLocks the list of puts paired with their requested lock IDs. @@ -1307,7 +1307,7 @@ public class HRegion implements HeapSize { // , Writable{ public OperationStatusCode[] put(Pair[] putsAndLocks) throws IOException { BatchOperationInProgress> batchOp = new BatchOperationInProgress>(putsAndLocks); - + while (!batchOp.isDone()) { checkReadOnly(); checkResources(); @@ -1384,7 +1384,7 @@ public class HRegion implements HeapSize { // , Writable{ batchOp.operations[i].getFirst().getFamilyMap().values(), byteNow); } - + // ------------------------------------ // STEP 3. 
Write to WAL // ---------------------------------- @@ -1392,12 +1392,12 @@ public class HRegion implements HeapSize { // , Writable{ for (int i = firstIndex; i < lastIndexExclusive; i++) { // Skip puts that were determined to be invalid during preprocessing if (batchOp.retCodes[i] != OperationStatusCode.NOT_RUN) continue; - + Put p = batchOp.operations[i].getFirst(); if (!p.getWriteToWAL()) continue; addFamilyMapToWALEdit(p.getFamilyMap(), walEdit); } - + // Append the edit to WAL this.log.append(regionInfo, regionInfo.getTableDesc().getName(), walEdit, now); @@ -1635,7 +1635,7 @@ public class HRegion implements HeapSize { // , Writable{ for (Map.Entry> e : familyMap.entrySet()) { byte[] family = e.getKey(); List edits = e.getValue(); - + Store store = getStore(family); for (KeyValue kv: edits) { kv.setMemstoreTS(w.getWriteNumber()); @@ -1706,7 +1706,7 @@ public class HRegion implements HeapSize { // , Writable{ *

We can ignore any log message that has a sequence ID that's equal to or * lower than minSeqId. (Because we know such log messages are already * reflected in the HFiles.) - * + * *

While this is running we are putting pressure on memory yet we are * outside of our usual accounting because we are not yet an onlined region * (this stuff is being run as part of Region initialization). This means @@ -1715,7 +1715,7 @@ public class HRegion implements HeapSize { // , Writable{ * we're not yet online so our relative sequenceids are not yet aligned with * HLog sequenceids -- not till we come up online, post processing of split * edits. - * + * *

But to help relieve memory pressure, at least manage our own heap size * flushing if are in excess of per-region limits. Flushing, though, we have * to be careful and avoid using the regionserver/hlog sequenceid. Its running @@ -1725,7 +1725,7 @@ public class HRegion implements HeapSize { // , Writable{ * in this region and with its split editlogs, then we could miss edits the * next time we go to recover. So, we have to flush inline, using seqids that * make sense in a this single region context only -- until we online. - * + * * @param regiondir * @param minSeqId Any edit found in split editlogs needs to be in excess of * this minSeqId to be applied, else its skipped. @@ -1970,7 +1970,7 @@ public class HRegion implements HeapSize { // , Writable{ closeRegionOperation(); } } - + /** * Obtains or tries to obtain the given row lock. * @param waitForLock if true, will block until the lock is available. @@ -2018,7 +2018,7 @@ public class HRegion implements HeapSize { // , Writable{ closeRegionOperation(); } } - + /** * Used by unit tests. 
* @param lockid @@ -2135,6 +2135,9 @@ public class HRegion implements HeapSize { // , Writable{ private boolean filterClosed = false; private long readPt; + public HRegionInfo getRegionName() { + return regionInfo; + } RegionScanner(Scan scan, List additionalScanners) throws IOException { //DebugPrint.println("HRegionScanner."); this.filter = scan.getFilter(); diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index b8dad89..7077d2c 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -49,6 +49,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantReadWriteLock; +import com.google.common.base.Function; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -306,6 +307,85 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, initialize(); } + private static final int NORMAL_QOS = 0; + private static final int QOS_THRESHOLD = 10; // the line between low and high qos + private static final int HIGH_QOS = 100; + + class QosFunction implements Function { + public boolean isMetaRegion(byte[] regionName) { + HRegion region; + try { + region = getRegion(regionName); + } catch (NotServingRegionException ignored) { + return false; + } + return region.getRegionInfo().isMetaRegion(); + } + + @Override + public Integer apply(Writable from) { + if (from instanceof HBaseRPC.Invocation) { + HBaseRPC.Invocation inv = (HBaseRPC.Invocation) from; + + String methodName = inv.getMethodName(); + + // scanner methods... + if (methodName.equals("next") || methodName.equals("close")) { + // translate! 
+ Long scannerId; + try { + scannerId = (Long) inv.getParameters()[0]; + } catch (ClassCastException ignored) { + //LOG.debug("Low priority: " + from); + return NORMAL_QOS; // doh. + } + String scannerIdString = Long.toString(scannerId); + InternalScanner scanner = scanners.get(scannerIdString); + if (scanner instanceof HRegion.RegionScanner) { + HRegion.RegionScanner rs = (HRegion.RegionScanner) scanner; + HRegionInfo regionName = rs.getRegionName(); + if (regionName.isMetaRegion()) { + //LOG.debug("High priority scanner request: " + scannerId); + return HIGH_QOS; + } + } + } + else if (methodName.equals("getHServerInfo") || + methodName.equals("getRegionsAssignment") || + methodName.equals("unlockRow") || + methodName.equals("getProtocolVersion") || + methodName.equals("getClosestRowBefore")) { + //LOG.debug("High priority method: " + methodName); + return HIGH_QOS; + } + else if (inv.getParameterClasses()[0] == byte[].class) { + // first arg is byte array, so assume this is a regionname: + if (isMetaRegion((byte[]) inv.getParameters()[0])) { + //LOG.debug("High priority with method: " + methodName + " and region: " + // + Bytes.toString((byte[]) inv.getParameters()[0])); + return HIGH_QOS; + } + } + else if (inv.getParameterClasses()[0] == MultiAction.class) { + MultiAction ma = (MultiAction) inv.getParameters()[0]; + Set regions = ma.getRegions(); + // ok this sucks, but if any single of the actions touches a meta, the whole + // thing gets pinged high priority. This is a dangerous hack because people + // can get their multi action tagged high QOS by tossing a Get(.META.) AND this + // regionserver hosts META/-ROOT- + for (byte[] region: regions) { + if (isMetaRegion(region)) { + //LOG.debug("High priority multi with region: " + Bytes.toString(region)); + return HIGH_QOS; // short circuit for the win. 
+ } + } + } + } + //LOG.debug("Low priority: " + from.toString()); + return NORMAL_QOS; + } + } + /** * Creates all of the state that needs to be reconstructed in case we are * doing a restart. This is shared between the constructor and restart(). Both @@ -328,8 +408,11 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, OnlineRegions.class}, address.getBindAddress(), address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10), - false, conf); + conf.getInt("hbase.regionserver.metahandler.count", 10), + false, conf, QOS_THRESHOLD); this.server.setErrorHandler(this); + this.server.setQosFunction(new QosFunction()); + // Address is giving a default IP for the moment. Will be changed after // calling the master. this.serverInfo = new HServerInfo(new HServerAddress(new InetSocketAddress(