From 2ab0d27db473187bdd56e0e2b9926134af618685 Mon Sep 17 00:00:00 2001 From: Elliott Clark Date: Thu, 27 Aug 2015 00:08:15 -0700 Subject: [PATCH] HBASE-14322 Add a master priority function to let master use it's threads --- .../MasterAnnotationReadingPriorityFunction.java | 71 ++++++++++++++++++++++ .../hadoop/hbase/master/MasterRpcServices.java | 20 +++--- .../AnnotationReadingPriorityFunction.java | 57 +++++++++-------- .../hadoop/hbase/regionserver/RSRpcServices.java | 6 +- 4 files changed, 120 insertions(+), 34 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterAnnotationReadingPriorityFunction.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterAnnotationReadingPriorityFunction.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterAnnotationReadingPriorityFunction.java new file mode 100644 index 0000000..c4d32aa --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterAnnotationReadingPriorityFunction.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master; + +import com.google.protobuf.Message; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; +import org.apache.hadoop.hbase.protobuf.generated.RPCProtos; +import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos; +import org.apache.hadoop.hbase.regionserver.AnnotationReadingPriorityFunction; +import org.apache.hadoop.hbase.regionserver.RSRpcServices; +import org.apache.hadoop.hbase.security.User; + +/** + * Priority function specifically for the master. + * + * This doesn't make the super users always priority since that would make everything + * to the master into high priority. + */ +public class MasterAnnotationReadingPriorityFunction extends AnnotationReadingPriorityFunction { + MasterAnnotationReadingPriorityFunction(RSRpcServices rpcServices) { + super(rpcServices); + } + + public int getPriority(RPCProtos.RequestHeader header, Message param, User user) { + // Yes this is copy pasted from the base class but it keeps from having to look in the + // annotatedQos table twice something that could get costly since this is called for + // every single RPC request. + int priorityByAnnotation = getAnnotatedPriority(header); + if (priorityByAnnotation >= 0) { + return priorityByAnnotation; + } + + // If meta is moving then all the rest of report the report state transitions will be + // blocked. We shouldn't be in the same queue. + if (param instanceof RegionServerStatusProtos.ReportRegionStateTransitionRequest) { // Regions are moving + RegionServerStatusProtos.ReportRegionStateTransitionRequest + tRequest = (RegionServerStatusProtos.ReportRegionStateTransitionRequest) param; + for (RegionServerStatusProtos.RegionStateTransition transition : tRequest.getTransitionList()) { + if (transition.getRegionInfoList() != null) { + for (HBaseProtos.RegionInfo info : transition.getRegionInfoList()) { + TableName tn = ProtobufUtil.toTableName(info.getTableName()); + if (tn.isSystemTable()) { + return HConstants.SYSTEMTABLE_QOS; + } + } + } + } + } + + // Handle the rest of the different reasons to change priority. + return getBasePriority(header, param); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java index c6b1f00..28d2df9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java @@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.client.TableState; import org.apache.hadoop.hbase.errorhandling.ForeignException; import org.apache.hadoop.hbase.exceptions.MergeRegionException; import org.apache.hadoop.hbase.exceptions.UnknownProtocolException; +import org.apache.hadoop.hbase.ipc.PriorityFunction; import org.apache.hadoop.hbase.ipc.QosPriority; import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; import org.apache.hadoop.hbase.ipc.ServerRpcController; @@ -226,6 +227,11 @@ public class MasterRpcServices extends RSRpcServices master = m; } + @Override + protected PriorityFunction createPriority() { + return new MasterAnnotationReadingPriorityFunction(this); + } + enum BalanceSwitchMode { SYNC, ASYNC @@ -310,7 +316,7 @@ public class MasterRpcServices extends RSRpcServices if (sl != null && master.metricsMaster != null) { // Up our metrics. master.metricsMaster.incrementRequests(sl.getTotalNumberOfRequests() - - (oldLoad != null ? oldLoad.getTotalNumberOfRequests() : 0)); + - (oldLoad != null ? oldLoad.getTotalNumberOfRequests() : 0)); } } catch (IOException ioe) { throw new ServiceException(ioe); @@ -360,10 +366,10 @@ public class MasterRpcServices extends RSRpcServices AddColumnRequest req) throws ServiceException { try { master.addColumn( - ProtobufUtil.toTableName(req.getTableName()), - HColumnDescriptor.convert(req.getColumnFamilies()), - req.getNonceGroup(), - req.getNonce()); + ProtobufUtil.toTableName(req.getTableName()), + HColumnDescriptor.convert(req.getColumnFamilies()), + req.getNonceGroup(), + req.getNonce()); } catch (IOException ioe) { throw new ServiceException(ioe); } @@ -497,7 +503,7 @@ public class MasterRpcServices extends RSRpcServices DeleteTableRequest request) throws ServiceException { try { long procId = master.deleteTable(ProtobufUtil.toTableName( - request.getTableName()), request.getNonceGroup(), request.getNonce()); + request.getTableName()), request.getNonceGroup(), request.getNonce()); return DeleteTableResponse.newBuilder().setProcId(procId).build(); } catch (IOException ioe) { throw new ServiceException(ioe); @@ -786,7 +792,7 @@ public class MasterRpcServices extends RSRpcServices try { return GetNamespaceDescriptorResponse.newBuilder() .setNamespaceDescriptor(ProtobufUtil.toProtoNamespaceDescriptor( - master.getNamespaceDescriptor(request.getNamespaceName()))) + master.getNamespaceDescriptor(request.getNamespaceName()))) .build(); } catch (IOException e) { throw new ServiceException(e); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java index 52f692b..aeb3f2a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java @@ -25,14 +25,9 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.ipc.PriorityFunction; import org.apache.hadoop.hbase.ipc.QosPriority; -import org.apache.hadoop.hbase.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; -import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest; -import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest; @@ -74,14 +69,14 @@ import org.apache.hadoop.hbase.security.User; //RegionSpecifier object. Methods can be invoked on the returned object //to figure out whether it is a meta region or not. @InterfaceAudience.Private -class AnnotationReadingPriorityFunction implements PriorityFunction { +public class AnnotationReadingPriorityFunction implements PriorityFunction { private static final Log LOG = LogFactory.getLog(AnnotationReadingPriorityFunction.class.getName()); /** Used to control the scan delay, currently sqrt(numNextCall * weight) */ public static final String SCAN_VTIME_WEIGHT_CONF_KEY = "hbase.ipc.server.scan.vtime.weight"; - private final Map annotatedQos; + protected final Map annotatedQos; //We need to mock the regionserver instance for some unit tests (set via //setRegionServer method. private RSRpcServices rpcServices; @@ -113,7 +108,7 @@ class AnnotationReadingPriorityFunction implements PriorityFunction { * @param rpcServices * The RPC server implementation */ - AnnotationReadingPriorityFunction(final RSRpcServices rpcServices) { + public AnnotationReadingPriorityFunction(final RSRpcServices rpcServices) { this(rpcServices, rpcServices.getClass()); } @@ -177,9 +172,9 @@ class AnnotationReadingPriorityFunction implements PriorityFunction { */ @Override public int getPriority(RequestHeader header, Message param, User user) { - String methodName = header.getMethodName(); - Integer priorityByAnnotation = annotatedQos.get(methodName); - if (priorityByAnnotation != null) { + int priorityByAnnotation = getAnnotatedPriority(header); + + if (priorityByAnnotation >= 0) { return priorityByAnnotation; } @@ -195,6 +190,30 @@ class AnnotationReadingPriorityFunction implements PriorityFunction { return HConstants.NORMAL_QOS; } + return getBasePriority(header, param); + } + + /** + * See if the method has an annotation. + * @param header + * @return Return the priority from the annotation. If there isn't + * an annotation, this returns something below zero. + */ + protected int getAnnotatedPriority(RequestHeader header) { + String methodName = header.getMethodName(); + Integer priorityByAnnotation = annotatedQos.get(methodName); + if (priorityByAnnotation != null) { + return priorityByAnnotation; + } + return -1; + } + + /** + * Get the priority for a given request from the header and the param + * This doesn't consider which user is sending the request at all. + * This doesn't consider annotations + */ + protected int getBasePriority(RequestHeader header, Message param) { if (param == null) { return HConstants.NORMAL_QOS; } @@ -203,6 +222,7 @@ class AnnotationReadingPriorityFunction implements PriorityFunction { // only this one has been converted so far. No priority == NORMAL_QOS. return header.hasPriority()? header.getPriority(): HConstants.NORMAL_QOS; } + String cls = param.getClass().getName(); Class rpcArgClass = argumentToClassMap.get(cls); RegionSpecifier regionSpecifier = null; @@ -247,21 +267,6 @@ class AnnotationReadingPriorityFunction implements PriorityFunction { } } - // If meta is moving then all the rest of report the report state transitions will be - // blocked. We shouldn't be in the same queue. - if (param instanceof ReportRegionStateTransitionRequest) { // Regions are moving - ReportRegionStateTransitionRequest tRequest = (ReportRegionStateTransitionRequest) param; - for (RegionStateTransition transition : tRequest.getTransitionList()) { - if (transition.getRegionInfoList() != null) { - for (HBaseProtos.RegionInfo info : transition.getRegionInfoList()) { - TableName tn = ProtobufUtil.toTableName(info.getTableName()); - if (tn.isSystemTable()) { - return HConstants.SYSTEMTABLE_QOS; - } - } - } - } - } return HConstants.NORMAL_QOS; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 70ac7a6..ff0ce92 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -943,7 +943,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, if (initialIsa.getAddress() == null) { throw new IllegalArgumentException("Failed resolve of " + initialIsa); } - priority = new AnnotationReadingPriorityFunction(this); + priority = createPriority(); String name = rs.getProcessName() + "/" + initialIsa.toString(); // Set how many times to retry talking to another server over HConnection. ConnectionUtils.setServerSideHConnectionRetriesConfig(rs.conf, name, LOG); @@ -978,6 +978,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler, rs.setName(name); } + protected PriorityFunction createPriority() { + return new AnnotationReadingPriorityFunction(this); + } + public static String getHostname(Configuration conf, boolean isMaster) throws UnknownHostException { String hostname = conf.get(isMaster? HRegionServer.MASTER_HOSTNAME_KEY : -- 2.5.0