From 851364281dbc98f9fc7c435b847f2749dc0c384c Mon Sep 17 00:00:00 2001 From: Josh Elser Date: Fri, 27 Mar 2015 12:59:09 -0400 Subject: [PATCH] HBASE-13351 Set ADMIN QoS on RS calls to Master. Just as the RegionServer gives priority to RPCs from the Master, the Master can give priority to RPCs from RegionServers. Should help prevent cases where clients can starve out RegionServer RPCs. --- .../org/apache/hadoop/hbase/ipc/QosPriority.java | 28 +++++++ .../org/apache/hadoop/hbase/master/HMaster.java | 12 ++- .../hadoop/hbase/master/MasterRpcServices.java | 6 ++ .../AnnotationReadingPriorityFunction.java | 6 +- .../hadoop/hbase/regionserver/RSRpcServices.java | 12 +-- .../hadoop/hbase/master/TestMasterPriorityRpc.java | 94 ++++++++++++++++++++++ 6 files changed, 144 insertions(+), 14 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/QosPriority.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterPriorityRpc.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/QosPriority.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/QosPriority.java new file mode 100644 index 0000000..d1989a4 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/QosPriority.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; + +import org.apache.hadoop.hbase.HConstants; + +@Retention(RetentionPolicy.RUNTIME) +public @interface QosPriority { + int priority() default HConstants.NORMAL_QOS; +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 3cd0d47..89561b2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -379,9 +379,15 @@ public class HMaster extends HRegionServer implements MasterServices, Server { getChoreService().scheduleChore(clusterStatusPublisherChore); } } - activeMasterManager = new ActiveMasterManager(zooKeeper, this.serverName, this); - int infoPort = putUpJettyServer(); - startActiveMasterManager(infoPort); + + // Some unit tests don't need a cluster, so no zookeeper at all + if (!conf.getBoolean("hbase.testing.nocluster", false)) { + activeMasterManager = new ActiveMasterManager(zooKeeper, this.serverName, this); + int infoPort = putUpJettyServer(); + startActiveMasterManager(infoPort); + } else { + activeMasterManager = null; + } } // return the actual infoPort, -1 means disable info server. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java index 3e64796..b35d6b5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java @@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.TableState; import org.apache.hadoop.hbase.exceptions.MergeRegionException; import org.apache.hadoop.hbase.exceptions.UnknownProtocolException; +import org.apache.hadoop.hbase.ipc.QosPriority; import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; import org.apache.hadoop.hbase.ipc.ServerRpcController; import org.apache.hadoop.hbase.procedure.MasterProcedureManager; @@ -264,6 +265,7 @@ public class MasterRpcServices extends RSRpcServices } @Override + @QosPriority(priority=HConstants.ADMIN_QOS) public GetLastFlushedSequenceIdResponse getLastFlushedSequenceId(RpcController controller, GetLastFlushedSequenceIdRequest request) throws ServiceException { try { @@ -277,6 +279,7 @@ public class MasterRpcServices extends RSRpcServices } @Override + @QosPriority(priority=HConstants.ADMIN_QOS) public RegionServerReportResponse regionServerReport( RpcController controller, RegionServerReportRequest request) throws ServiceException { try { @@ -297,6 +300,7 @@ public class MasterRpcServices extends RSRpcServices } @Override + @QosPriority(priority=HConstants.ADMIN_QOS) public RegionServerStartupResponse regionServerStartup( RpcController controller, RegionServerStartupRequest request) throws ServiceException { // Register with server manager @@ -321,6 +325,7 @@ public class MasterRpcServices extends RSRpcServices } @Override + @QosPriority(priority=HConstants.ADMIN_QOS) public ReportRSFatalErrorResponse reportRSFatalError( RpcController controller, ReportRSFatalErrorRequest request) throws ServiceException { String errorText = request.getErrorMessage(); @@ -1238,6 +1243,7 @@ public class MasterRpcServices extends RSRpcServices } @Override + @QosPriority(priority=HConstants.ADMIN_QOS) public ReportRegionStateTransitionResponse reportRegionStateTransition(RpcController c, ReportRegionStateTransitionRequest req) throws ServiceException { try { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java index ddeabfa..3a069a9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java @@ -23,10 +23,11 @@ import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.ipc.PriorityFunction; +import org.apache.hadoop.hbase.ipc.QosPriority; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest; @@ -39,7 +40,6 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader; -import org.apache.hadoop.hbase.regionserver.RSRpcServices.QosPriority; import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.Message; @@ -102,7 +102,7 @@ class AnnotationReadingPriorityFunction implements PriorityFunction { AnnotationReadingPriorityFunction(final RSRpcServices rpcServices) { Map qosMap = new HashMap(); - for (Method m : RSRpcServices.class.getMethods()) { + for (Method m : rpcServices.getClass().getMethods()) { QosPriority p = m.getAnnotation(QosPriority.class); if (p != null) { // Since we protobuf'd, and then subsequently, when we went with pb style, method names diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 20c1809..7ce3550 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -20,8 +20,6 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; import java.io.InterruptedIOException; -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.util.ArrayList; @@ -75,6 +73,7 @@ import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler; import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.ipc.PriorityFunction; +import org.apache.hadoop.hbase.ipc.QosPriority; import org.apache.hadoop.hbase.ipc.RpcCallContext; import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; @@ -169,6 +168,7 @@ import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; import org.apache.hadoop.net.DNS; import org.apache.zookeeper.KeeperException; +import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.ByteString; import com.google.protobuf.Message; import com.google.protobuf.RpcController; @@ -879,7 +879,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, ProtobufUtil.getRegionEncodedName(regionSpecifier)); } - PriorityFunction getPriority() { + @VisibleForTesting + public PriorityFunction getPriority() { return priority; } @@ -932,11 +933,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler, return bssi; } - @Retention(RetentionPolicy.RUNTIME) - protected @interface QosPriority { - int priority() default HConstants.NORMAL_QOS; - } - public InetSocketAddress getSocketAddress() { return isa; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterPriorityRpc.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterPriorityRpc.java new file mode 100644 index 0000000..a72f96b --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterPriorityRpc.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master; + +import static org.junit.Assert.assertEquals; + +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CoordinatedStateManager; +import org.apache.hadoop.hbase.CoordinatedStateManagerFactory; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ipc.PriorityFunction; +import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.google.common.collect.Sets; + +/** + * Tests to verify correct priority on Master RPC methods. + */ +@Category({MasterTests.class, MediumTests.class}) +public class TestMasterPriorityRpc { + private HMaster master = null; + private PriorityFunction priority = null; + + private final Set ADMIN_METHODS = Sets.newHashSet("GetLastFlushedSequenceId", + "RegionServerReport", "RegionServerStartup", "ReportRSFatalError", + "ReportRegionStateTransition"); + + private final Set NORMAL_METHODS = Sets.newHashSet("CreateTable", "DeleteTable", + "ModifyColumn", "OfflineRegion", "Shutdown"); + + @Before + public void setup() { + Configuration conf = HBaseConfiguration.create(); + conf.setBoolean("hbase.testing.nocluster", true); // No need to do ZK + CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf); + master = HMaster.constructMaster(HMaster.class, conf, cp); + priority = master.getMasterRpcServices().getPriority(); + } + + /** + * Asserts that the provided method has the given priority. + * + * @param methodName + * The name of the RPC method. + * @param expectedPriority + * The expected priority. + */ + private void assertPriority(String methodName, int expectedPriority) { + assertEquals(methodName + " had unexpected priority", expectedPriority, + priority.getPriority(RequestHeader.newBuilder().setMethodName(methodName).build(), null)); + } + + @Test + public void testNullMessage() { + assertPriority("doesnotexist", HConstants.NORMAL_QOS); + } + + @Test + public void testAdminPriorityMethods() { + for (String methodName : ADMIN_METHODS) { + assertPriority(methodName, HConstants.ADMIN_QOS); + } + } + + @Test + public void testSomeNormalMethods() { + for (String methodName : NORMAL_METHODS) { + assertPriority(methodName, HConstants.NORMAL_QOS); + } + } +} -- 2.1.2