From b5129352d3bc6cee47d94893a36ede7aba98d9e8 Mon Sep 17 00:00:00 2001 From: Josh Elser Date: Fri, 27 Mar 2015 12:59:09 -0400 Subject: [PATCH] HBASE-13351 Set ADMIN QoS on RS calls to Master. Just as the RegionServer gives priority to RPCs from the Master, the Master can give priority to RPCs from RegionServers. Should help prevent cases where clients can starve out RegionServer RPCs. Increase some tests' threadpool size where starvation appears to have occurred. --- .../java/org/apache/hadoop/hbase/HConstants.java | 7 +- .../org/apache/hadoop/hbase/ipc/QosPriority.java | 32 ++++++++ .../org/apache/hadoop/hbase/master/HMaster.java | 12 ++- .../hadoop/hbase/master/MasterRpcServices.java | 6 ++ .../AnnotationReadingPriorityFunction.java | 29 ++++++- .../hadoop/hbase/regionserver/RSRpcServices.java | 12 +-- .../org/apache/hadoop/hbase/client/TestAdmin2.java | 1 + .../hadoop/hbase/master/TestMasterPriorityRpc.java | 94 ++++++++++++++++++++++ .../hadoop/hbase/regionserver/TestQosFunction.java | 2 +- .../apache/hadoop/hbase/util/TestHBaseFsck.java | 2 +- 10 files changed, 177 insertions(+), 20 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/QosPriority.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterPriorityRpc.java diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 19e251a..2a446e0 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -970,12 +970,13 @@ public final class HConstants { * by different set of handlers. For example, HIGH_QOS tagged methods are * handled by high priority handlers. */ + // normal_QOS < qos_threshold < replicationQOS < replay_QOS < high_QOS public static final int NORMAL_QOS = 0; public static final int QOS_THRESHOLD = 10; public static final int HIGH_QOS = 200; - public static final int REPLICATION_QOS = 5; // normal_QOS < replication_QOS < high_QOS - public static final int REPLAY_QOS = 6; // REPLICATION_QOS < REPLAY_QOS < high_QOS - public static final int ADMIN_QOS = 100; // QOS_THRESHOLD < ADMIN_QOS < high_QOS + public static final int REPLICATION_QOS = 5; + public static final int REPLAY_QOS = 6; + public static final int ADMIN_QOS = 100; public static final int SYSTEMTABLE_QOS = HIGH_QOS; /** Directory under /hbase where archived hfiles are stored */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/QosPriority.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/QosPriority.java new file mode 100644 index 0000000..2762d05 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/QosPriority.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; + +import org.apache.hadoop.hbase.HConstants; + +/** + * Annotation which decorates RPC methods to denote the relative priority among other RPCs in the + * same server. Provides a basic notion of quality of service (QOS). + */ +@Retention(RetentionPolicy.RUNTIME) +public @interface QosPriority { + int priority() default HConstants.NORMAL_QOS; +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 581e3c9..cbc5e2f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -378,9 +378,15 @@ public class HMaster extends HRegionServer implements MasterServices, Server { getChoreService().scheduleChore(clusterStatusPublisherChore); } } - activeMasterManager = new ActiveMasterManager(zooKeeper, this.serverName, this); - int infoPort = putUpJettyServer(); - startActiveMasterManager(infoPort); + + // Some unit tests don't need a cluster, so no zookeeper at all + if (!conf.getBoolean("hbase.testing.nocluster", false)) { + activeMasterManager = new ActiveMasterManager(zooKeeper, this.serverName, this); + int infoPort = putUpJettyServer(); + startActiveMasterManager(infoPort); + } else { + activeMasterManager = null; + } } // return the actual infoPort, -1 means disable info server. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java index 07b2da2..4c35dc3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java @@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.TableState; import org.apache.hadoop.hbase.exceptions.MergeRegionException; import org.apache.hadoop.hbase.exceptions.UnknownProtocolException; +import org.apache.hadoop.hbase.ipc.QosPriority; import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; import org.apache.hadoop.hbase.ipc.ServerRpcController; import org.apache.hadoop.hbase.procedure.MasterProcedureManager; @@ -264,6 +265,7 @@ public class MasterRpcServices extends RSRpcServices } @Override + @QosPriority(priority=HConstants.ADMIN_QOS) public GetLastFlushedSequenceIdResponse getLastFlushedSequenceId(RpcController controller, GetLastFlushedSequenceIdRequest request) throws ServiceException { try { @@ -277,6 +279,7 @@ public class MasterRpcServices extends RSRpcServices } @Override + @QosPriority(priority=HConstants.ADMIN_QOS) public RegionServerReportResponse regionServerReport( RpcController controller, RegionServerReportRequest request) throws ServiceException { try { @@ -297,6 +300,7 @@ public class MasterRpcServices extends RSRpcServices } @Override + @QosPriority(priority=HConstants.ADMIN_QOS) public RegionServerStartupResponse regionServerStartup( RpcController controller, RegionServerStartupRequest request) throws ServiceException { // Register with server manager @@ -322,6 +326,7 @@ public class MasterRpcServices extends RSRpcServices } @Override + @QosPriority(priority=HConstants.ADMIN_QOS) public ReportRSFatalErrorResponse reportRSFatalError( RpcController controller, ReportRSFatalErrorRequest request) throws ServiceException { String errorText = request.getErrorMessage(); @@ -1239,6 +1244,7 @@ public class MasterRpcServices extends RSRpcServices } @Override + @QosPriority(priority=HConstants.ADMIN_QOS) public ReportRegionStateTransitionResponse reportRegionStateTransition(RpcController c, ReportRegionStateTransitionRequest req) throws ServiceException { try { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java index 29228db..5ee20ef 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java @@ -23,10 +23,11 @@ import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.ipc.PriorityFunction; +import org.apache.hadoop.hbase.ipc.QosPriority; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest; @@ -39,7 +40,6 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader; -import org.apache.hadoop.hbase.regionserver.RSRpcServices.QosPriority; import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.Message; @@ -100,9 +100,30 @@ class AnnotationReadingPriorityFunction implements PriorityFunction { private final float scanVirtualTimeWeight; + /** + * Calls {@link #AnnotationReadingPriorityFunction(RSRpcServices, Class)} using the result of + * {@code rpcServices#getClass()} + * + * @param rpcServices + * The RPC server implementation + */ AnnotationReadingPriorityFunction(final RSRpcServices rpcServices) { - Map qosMap = new HashMap(); - for (Method m : RSRpcServices.class.getMethods()) { + this(rpcServices, rpcServices.getClass()); + } + + /** + * Constructs the priority function given the RPC server implementation and the annotations on the + * methods in the provided {@code clz}. + * + * @param rpcServices + * The RPC server implementation + * @param clz + * The concrete RPC server implementation's class + */ + AnnotationReadingPriorityFunction(final RSRpcServices rpcServices, + Class clz) { + Map qosMap = new HashMap(); + for (Method m : clz.getMethods()) { QosPriority p = m.getAnnotation(QosPriority.class); if (p != null) { // Since we protobuf'd, and then subsequently, when we went with pb style, method names diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 10e39a1..b3adfab 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -20,8 +20,6 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; import java.io.InterruptedIOException; -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.util.ArrayList; @@ -75,6 +73,7 @@ import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler; import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.ipc.PriorityFunction; +import org.apache.hadoop.hbase.ipc.QosPriority; import org.apache.hadoop.hbase.ipc.RpcCallContext; import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; @@ -171,6 +170,7 @@ import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; import org.apache.hadoop.net.DNS; import org.apache.zookeeper.KeeperException; +import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.ByteString; import com.google.protobuf.Message; import com.google.protobuf.RpcController; @@ -890,7 +890,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, ProtobufUtil.getRegionEncodedName(regionSpecifier)); } - PriorityFunction getPriority() { + @VisibleForTesting + public PriorityFunction getPriority() { return priority; } @@ -943,11 +944,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler, return bssi; } - @Retention(RetentionPolicy.RUNTIME) - protected @interface QosPriority { - int priority() default HConstants.NORMAL_QOS; - } - public InetSocketAddress getSocketAddress() { return isa; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java index a1da440..c2e4e11 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java @@ -85,6 +85,7 @@ public class TestAdmin2 { TEST_UTIL.getConfiguration().setInt("hbase.regionserver.msginterval", 100); TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 250); TEST_UTIL.getConfiguration().setInt("hbase.client.retries.number", 6); + TEST_UTIL.getConfiguration().setInt("hbase.regionserver.metahandler.count", 30); TEST_UTIL.getConfiguration().setBoolean( "hbase.master.enabletable.roundrobin", true); TEST_UTIL.startMiniCluster(3); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterPriorityRpc.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterPriorityRpc.java new file mode 100644 index 0000000..a72f96b --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterPriorityRpc.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master; + +import static org.junit.Assert.assertEquals; + +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CoordinatedStateManager; +import org.apache.hadoop.hbase.CoordinatedStateManagerFactory; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ipc.PriorityFunction; +import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.google.common.collect.Sets; + +/** + * Tests to verify correct priority on Master RPC methods. + */ +@Category({MasterTests.class, MediumTests.class}) +public class TestMasterPriorityRpc { + private HMaster master = null; + private PriorityFunction priority = null; + + private final Set ADMIN_METHODS = Sets.newHashSet("GetLastFlushedSequenceId", + "RegionServerReport", "RegionServerStartup", "ReportRSFatalError", + "ReportRegionStateTransition"); + + private final Set NORMAL_METHODS = Sets.newHashSet("CreateTable", "DeleteTable", + "ModifyColumn", "OfflineRegion", "Shutdown"); + + @Before + public void setup() { + Configuration conf = HBaseConfiguration.create(); + conf.setBoolean("hbase.testing.nocluster", true); // No need to do ZK + CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf); + master = HMaster.constructMaster(HMaster.class, conf, cp); + priority = master.getMasterRpcServices().getPriority(); + } + + /** + * Asserts that the provided method has the given priority. + * + * @param methodName + * The name of the RPC method. + * @param expectedPriority + * The expected priority. + */ + private void assertPriority(String methodName, int expectedPriority) { + assertEquals(methodName + " had unexpected priority", expectedPriority, + priority.getPriority(RequestHeader.newBuilder().setMethodName(methodName).build(), null)); + } + + @Test + public void testNullMessage() { + assertPriority("doesnotexist", HConstants.NORMAL_QOS); + } + + @Test + public void testAdminPriorityMethods() { + for (String methodName : ADMIN_METHODS) { + assertPriority(methodName, HConstants.ADMIN_QOS); + } + } + + @Test + public void testSomeNormalMethods() { + for (String methodName : NORMAL_METHODS) { + assertPriority(methodName, HConstants.NORMAL_QOS); + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQosFunction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQosFunction.java index 2b2ecda..e6f5552 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQosFunction.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQosFunction.java @@ -45,7 +45,7 @@ public class TestQosFunction { when(rpcServices.getConfiguration()).thenReturn(conf); AnnotationReadingPriorityFunction qosFunction = - new AnnotationReadingPriorityFunction(rpcServices); + new AnnotationReadingPriorityFunction(rpcServices, RSRpcServices.class); // Set method name in pb style with the method name capitalized. checkMethod("ReplicateWALEntry", HConstants.REPLICATION_QOS, qosFunction); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java index 48d0bfc..cdb5c60 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java @@ -160,7 +160,7 @@ public class TestHBaseFsck { MasterSyncObserver.class.getName()); conf.setInt("hbase.regionserver.handler.count", 2); - conf.setInt("hbase.regionserver.metahandler.count", 2); + conf.setInt("hbase.regionserver.metahandler.count", 30); conf.setInt("hbase.htable.threads.max", POOL_SIZE); conf.setInt("hbase.hconnection.threads.max", 2 * POOL_SIZE); -- 2.1.2