Index: hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java =================================================================== --- hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java (revision 1379233) +++ hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java (working copy) @@ -701,6 +701,16 @@ /** Configuration key for the directory to backup HFiles for a table */ public static final String HFILE_ARCHIVE_DIRECTORY = "hbase.table.archive.directory"; + /** + * QOS attributes: these attributes are used to demarcate RPC call processing + * by different sets of handlers. For example, HIGH_QOS tagged methods are + * handled by high priority handlers. + */ + public static final int NORMAL_QOS = 0; + public static final int QOS_THRESHOLD = 10; + public static final int HIGH_QOS = 100; + public static final int REPLICATION_QOS = 5; // NORMAL_QOS < REPLICATION_QOS < HIGH_QOS + private HConstants() { // Can't be instantiated with this ctor. } Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityRpc.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityRpc.java (revision 1379233) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityRpc.java (working copy) @@ -25,6 +25,7 @@ import java.io.IOException; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.SmallTests; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionRequest; @@ -85,7 +86,7 @@ Mockito.when(mockRegion.getRegionInfo()).thenReturn(mockRegionInfo); Mockito.when(mockRegionInfo.isMetaRegion()).thenReturn(true); qosFunction.setRegionServer(mockRS); - assertTrue (qosFunction.apply(rpcRequest) == HRegionServer.HIGH_QOS); + assertTrue (qosFunction.apply(rpcRequest) == HConstants.HIGH_QOS); }
@Test @@ -99,7 +100,7 @@ rpcRequestBuilder.setRequestClassName(GetOnlineRegionRequest.class.getCanonicalName()); RpcRequestBody rpcRequest = rpcRequestBuilder.build(); QosFunction qosFunc = regionServer.getQosFunction(); - assertTrue (qosFunc.apply(rpcRequest) == HRegionServer.NORMAL_QOS); + assertTrue (qosFunc.apply(rpcRequest) == HConstants.NORMAL_QOS); } @Test @@ -112,7 +113,7 @@ ByteString requestBody = scanBuilder.build().toByteString(); rpcRequestBuilder.setRequest(requestBody); RpcRequestBody rpcRequest = rpcRequestBuilder.build(); - assertTrue (qosFunction.apply(rpcRequest) == HRegionServer.NORMAL_QOS); + assertTrue (qosFunction.apply(rpcRequest) == HConstants.NORMAL_QOS); //build a scan request with scannerID scanBuilder = ScanRequest.newBuilder(); @@ -134,11 +135,11 @@ qosFunction.setRegionServer(mockRS); - assertTrue (qosFunction.apply(rpcRequest) == HRegionServer.HIGH_QOS); + assertTrue (qosFunction.apply(rpcRequest) == HConstants.HIGH_QOS); //the same as above but with non-meta region Mockito.when(mockRegionInfo.isMetaRegion()).thenReturn(false); - assertTrue (qosFunction.apply(rpcRequest) == HRegionServer.NORMAL_QOS); + assertTrue (qosFunction.apply(rpcRequest) == HConstants.NORMAL_QOS); } @org.junit.Rule Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1379233) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy) @@ -27,13 +27,10 @@ import java.lang.management.ManagementFactory; import java.lang.management.MemoryUsage; import java.lang.reflect.Constructor; -import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.net.BindException; import java.net.InetSocketAddress; -import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.Arrays; import 
java.util.Collection; import java.util.Collections; import java.util.Comparator; @@ -43,13 +40,11 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.SortedSet; import java.util.Map.Entry; import java.util.Random; import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; -import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; @@ -96,7 +91,6 @@ import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.client.Increment; -import org.apache.hadoop.hbase.client.MultiAction; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; @@ -117,7 +111,6 @@ import org.apache.hadoop.hbase.ipc.HBaseRPC; import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler; import org.apache.hadoop.hbase.ipc.HBaseRpcMetrics; -import org.apache.hadoop.hbase.ipc.Invocation; import org.apache.hadoop.hbase.ipc.ProtocolSignature; import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; @@ -209,7 +202,6 @@ import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; -import org.apache.hadoop.io.Writable; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.metrics.util.MBeanUtil; import org.apache.hadoop.net.DNS; @@ -231,7 +223,6 @@ import com.google.common.base.Function; import com.google.protobuf.ByteString; -import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.Message; import com.google.protobuf.RpcController; @@ -305,10 +296,6 @@ protected volatile boolean fsOk; protected HFileSystem fs; - protected static final int NORMAL_QOS = 0; - protected static final int 
QOS_THRESHOLD = 10; // the line between low and high qos - protected static final int HIGH_QOS = 100; - // Set when a report to the master comes back with a message asking us to // shutdown. Also set by call to stop when debugging or running unit tests // of HRegionServer in isolation. @@ -522,7 +509,7 @@ conf.getInt("hbase.regionserver.handler.count", 10), conf.getInt("hbase.regionserver.metahandler.count", 10), conf.getBoolean("hbase.rpc.verbose", false), - conf, QOS_THRESHOLD); + conf, HConstants.QOS_THRESHOLD); // Set our address. this.isa = this.rpcServer.getListenerAddress(); @@ -674,7 +661,7 @@ } if (rpcArgClass == null || from.getRequest().isEmpty()) { - return NORMAL_QOS; + return HConstants.NORMAL_QOS; } Object deserializedRequestObj = null; //check whether the request has reference to Meta region @@ -690,7 +677,7 @@ if (LOG.isDebugEnabled()) { LOG.debug("High priority: " + from.toString()); } - return HIGH_QOS; + return HConstants.HIGH_QOS; } } catch (Exception ex) { throw new RuntimeException(ex); @@ -699,20 +686,20 @@ if (methodName.equals("scan")) { // scanner methods... 
ScanRequest request = (ScanRequest)deserializedRequestObj; if (!request.hasScannerId()) { - return NORMAL_QOS; + return HConstants.NORMAL_QOS; } RegionScanner scanner = hRegionServer.getScanner(request.getScannerId()); if (scanner != null && scanner.getRegionInfo().isMetaRegion()) { if (LOG.isDebugEnabled()) { LOG.debug("High priority scanner request: " + request.getScannerId()); } - return HIGH_QOS; + return HConstants.HIGH_QOS; } } if (LOG.isDebugEnabled()) { LOG.debug("Low priority: " + from.toString()); } - return NORMAL_QOS; + return HConstants.NORMAL_QOS; } } @@ -2182,7 +2169,7 @@ } @Override - @QosPriority(priority=HIGH_QOS) + @QosPriority(priority=HConstants.HIGH_QOS) public ProtocolSignature getProtocolSignature( String protocol, long version, int clientMethodsHashCode) throws IOException { @@ -2195,7 +2182,7 @@ } @Override - @QosPriority(priority=HIGH_QOS) + @QosPriority(priority=HConstants.HIGH_QOS) public long getProtocolVersion(final String protocol, final long clientVersion) throws IOException { if (protocol.equals(ClientProtocol.class.getName())) { @@ -3187,7 +3174,7 @@ * @throws ServiceException */ @Override - @QosPriority(priority=HIGH_QOS) + @QosPriority(priority=HConstants.HIGH_QOS) public UnlockRowResponse unlockRow(final RpcController controller, final UnlockRowRequest request) throws ServiceException { try { @@ -3393,7 +3380,7 @@ // Start Admin methods @Override - @QosPriority(priority=HIGH_QOS) + @QosPriority(priority=HConstants.HIGH_QOS) public GetRegionInfoResponse getRegionInfo(final RpcController controller, final GetRegionInfoRequest request) throws ServiceException { try { @@ -3440,7 +3427,7 @@ } @Override - @QosPriority(priority=HIGH_QOS) + @QosPriority(priority=HConstants.HIGH_QOS) public GetOnlineRegionResponse getOnlineRegion(final RpcController controller, final GetOnlineRegionRequest request) throws ServiceException { try { @@ -3468,7 +3455,7 @@ * @throws ServiceException */ @Override - @QosPriority(priority=HIGH_QOS) + 
@QosPriority(priority=HConstants.HIGH_QOS) public OpenRegionResponse openRegion(final RpcController controller, final OpenRegionRequest request) throws ServiceException { int versionOfOfflineNode = -1; @@ -3555,7 +3542,7 @@ * @throws ServiceException */ @Override - @QosPriority(priority=HIGH_QOS) + @QosPriority(priority=HConstants.HIGH_QOS) public CloseRegionResponse closeRegion(final RpcController controller, final CloseRegionRequest request) throws ServiceException { int versionOfClosingNode = -1; @@ -3594,7 +3581,7 @@ * @throws ServiceException */ @Override - @QosPriority(priority=HIGH_QOS) + @QosPriority(priority=HConstants.HIGH_QOS) public FlushRegionResponse flushRegion(final RpcController controller, final FlushRegionRequest request) throws ServiceException { try { @@ -3625,7 +3612,7 @@ * @throws ServiceException */ @Override - @QosPriority(priority=HIGH_QOS) + @QosPriority(priority=HConstants.HIGH_QOS) public SplitRegionResponse splitRegion(final RpcController controller, final SplitRegionRequest request) throws ServiceException { try { @@ -3654,7 +3641,7 @@ * @throws ServiceException */ @Override - @QosPriority(priority=HIGH_QOS) + @QosPriority(priority=HConstants.HIGH_QOS) public CompactRegionResponse compactRegion(final RpcController controller, final CompactRegionRequest request) throws ServiceException { try { @@ -3688,7 +3675,7 @@ * @throws ServiceException */ @Override - @QosPriority(priority=HIGH_QOS) + @QosPriority(priority=HConstants.REPLICATION_QOS) public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller, final ReplicateWALEntryRequest request) throws ServiceException { try { Index: hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (revision 1379233) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (working copy) @@ -275,6 +275,11 
@@ protected int numConnections = 0; private Handler[] handlers = null; private Handler[] priorityHandlers = null; + /** Call queue for replication-related requests. */ + private BlockingQueue replicationQueue; + private int numOfReplicationHandlers = 0; + private Handler[] replicationHandlers = null; + protected HBaseRPCErrorHandler errorHandler = null; /** @@ -1650,6 +1655,10 @@ if (priorityCallQueue != null && getQosLevel(rpcRequestBody) > highPriorityLevel) { priorityCallQueue.put(call); updateCallQueueLenMetrics(priorityCallQueue); + } else if (replicationQueue != null + && getQosLevel(rpcRequestBody) == HConstants.REPLICATION_QOS) { + replicationQueue.put(call); + updateCallQueueLenMetrics(replicationQueue); } else { callQueue.put(call); // queue the call; maybe blocked here updateCallQueueLenMetrics(callQueue); @@ -1732,6 +1741,8 @@ rpcMetrics.callQueueLen.set(callQueue.size()); } else if (queue == priorityCallQueue) { rpcMetrics.priorityCallQueueLen.set(priorityCallQueue.size()); + } else if (queue == replicationQueue) { + rpcMetrics.replicationCallQueueLen.set(replicationQueue.size()); } else { LOG.warn("Unknown call queue"); } @@ -1751,6 +1762,8 @@ if (cq == priorityCallQueue) { // this is just an amazing hack, but it works.
threadName = "PRI " + threadName; + } else if (cq == replicationQueue) { + threadName = "REPL " + threadName; } this.setName(threadName); this.status = TaskMonitor.get().createRPCStatus(threadName); @@ -1917,7 +1930,10 @@ this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 4000); this.purgeTimeout = conf.getLong("ipc.client.call.purge.timeout", 2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT); - + this.numOfReplicationHandlers = conf.getInt("hbase.regionserver.replication.handler.count", 3); + if (numOfReplicationHandlers > 0) { + this.replicationQueue = new LinkedBlockingQueue(maxQueueSize); + } // Start the listener here and let it bind to the port listener = new Listener(); this.port = listener.getAddress().getPort(); @@ -2011,22 +2027,23 @@ public synchronized void startThreads() { responder.start(); listener.start(); - handlers = new Handler[handlerCount]; + handlers = startHandlers(callQueue, handlerCount); + priorityHandlers = startHandlers(priorityCallQueue, priorityHandlerCount); + replicationHandlers = startHandlers(replicationQueue, numOfReplicationHandlers); + } - for (int i = 0; i < handlerCount; i++) { - handlers[i] = new Handler(callQueue, i); + private Handler[] startHandlers(BlockingQueue queue, int numOfHandlers) { + if (numOfHandlers <= 0) { + return null; + } + Handler[] handlers = new Handler[numOfHandlers]; + for (int i = 0; i < numOfHandlers; i++) { + handlers[i] = new Handler(queue, i); handlers[i].start(); } - - if (priorityHandlerCount > 0) { - priorityHandlers = new Handler[priorityHandlerCount]; - for (int i = 0 ; i < priorityHandlerCount; i++) { - priorityHandlers[i] = new Handler(priorityCallQueue, i); - priorityHandlers[i].start(); - } - } + return handlers; } - + public SecretManager getSecretManager() { return this.secretManager; } @@ -2040,20 +2057,9 @@ public synchronized void stop() { LOG.info("Stopping server on " + port); running = false; - if (handlers != null) { - for (Handler handler : handlers) { - if (handler 
!= null) { - handler.interrupt(); - } - } - } - if (priorityHandlers != null) { - for (Handler handler : priorityHandlers) { - if (handler != null) { - handler.interrupt(); - } - } - } + stopHandlers(handlers); + stopHandlers(priorityHandlers); + stopHandlers(replicationHandlers); listener.interrupt(); listener.doStop(); responder.interrupt(); @@ -2063,6 +2069,16 @@ } } + private void stopHandlers(Handler[] handlers) { + if (handlers != null) { + for (Handler handler : handlers) { + if (handler != null) { + handler.interrupt(); + } + } + } + } + /** Wait for the server to be stopped. * Does not wait for all subthreads to finish. * See {@link #stop()}. Index: hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java (revision 1379233) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java (working copy) @@ -109,6 +109,8 @@ new MetricsTimeVaryingInt("rpcAuthorizationSuccesses", registry); public MetricsTimeVaryingRate rpcSlowResponseTime = new MetricsTimeVaryingRate("RpcSlowResponse", registry); + public final MetricsIntValue replicationCallQueueLen = + new MetricsIntValue("replicationCallQueueLen", registry); private void initMethods(Class protocol) { for (Method m : protocol.getDeclaredMethods()) {