From e1ff01be5575b70915a3b6c7375805ce44eff270 Mon Sep 17 00:00:00 2001 From: Sunil G Date: Mon, 17 Apr 2017 20:55:21 +0530 Subject: [PATCH] YARN-6471 --- .../java/org/apache/hadoop/util/StringUtils.java | 31 +++ .../hadoop/yarn/util/UnitsConversionUtil.java | 217 +++++++++++++++++++++ .../scheduler/capacity/AbstractCSQueue.java | 31 +++ .../scheduler/capacity/CSQueue.java | 18 +- .../scheduler/capacity/CSQueueUtils.java | 55 +++++- .../capacity/CapacitySchedulerConfiguration.java | 144 ++++++++++++++ .../scheduler/capacity/ParentQueue.java | 29 ++- .../scheduler/capacity/QueueCapacities.java | 2 +- 8 files changed, 520 insertions(+), 7 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/UnitsConversionUtil.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java index e773806..132fad9 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java @@ -1142,4 +1142,35 @@ public static boolean equalsIgnoreCase(String s1, String s2) { return s1.equalsIgnoreCase(s2); } + /** + *

Checks if the String contains only unicode letters.

+ * + *

null will return false. + * An empty String (length()=0) will return true.

+ * + *
+   * StringUtils.isAlpha(null)   = false
+   * StringUtils.isAlpha("")     = true
+   * StringUtils.isAlpha("  ")   = false
+   * StringUtils.isAlpha("abc")  = true
+   * StringUtils.isAlpha("ab2c") = false
+   * StringUtils.isAlpha("ab-c") = false
+   * 
+ * + * @param str the String to check, may be null + * @return true if only contains letters, and is non-null + */ + public static boolean isAlpha(String str) { + if (str == null) { + return false; + } + int sz = str.length(); + for (int i = 0; i < sz; i++) { + if (Character.isLetter(str.charAt(i)) == false) { + return false; + } + } + return true; + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/UnitsConversionUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/UnitsConversionUtil.java new file mode 100644 index 0000000..79ee0f7 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/UnitsConversionUtil.java @@ -0,0 +1,217 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.util; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +import java.math.BigInteger; +import java.util.*; + +/** + * A util to convert values in one unit to another. Units refers to whether + * the value is expressed in pico, nano, etc. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class UnitsConversionUtil { + + /** + * Helper class for encapsulating conversion values. + */ + public static class Converter { + private long numerator; + private long denominator; + + Converter(long n, long d) { + this.numerator = n; + this.denominator = d; + } + } + + private static final String[] UNITS = + { "p", "n", "u", "m", "", "k", "M", "G", "T", "P", "Ki", "Mi", "Gi", "Ti", + "Pi" }; + private static final List SORTED_UNITS = Arrays.asList(UNITS); + public static final Set KNOWN_UNITS = createKnownUnitsSet(); + private static final Converter PICO = + new Converter(1L, 1000L * 1000L * 1000L * 1000L); + private static final Converter NANO = + new Converter(1L, 1000L * 1000L * 1000L); + private static final Converter MICRO = new Converter(1L, 1000L * 1000L); + private static final Converter MILLI = new Converter(1L, 1000L); + private static final Converter BASE = new Converter(1L, 1L); + private static final Converter KILO = new Converter(1000L, 1L); + private static final Converter MEGA = new Converter(1000L * 1000L, 1L); + private static final Converter GIGA = + new Converter(1000L * 1000L * 1000L, 1L); + private static final Converter TERA = + new Converter(1000L * 1000L * 1000L * 1000L, 1L); + private static final Converter PETA = + new Converter(1000L * 1000L * 1000L * 1000L * 1000L, 1L); + + private static final Converter KILO_BINARY = new Converter(1024L, 1L); + private static final Converter MEGA_BINARY = new Converter(1024L * 1024L, 1L); + private static final Converter GIGA_BINARY = + new Converter(1024L * 1024L * 1024L, 1L); + private static final Converter TERA_BINARY = + new Converter(1024L * 1024L * 1024L * 1024L, 1L); + private static final Converter PETA_BINARY = + new Converter(1024L * 1024L * 1024L * 1024L * 1024L, 1L); + + private static Set createKnownUnitsSet() { + Set ret = new HashSet<>(); + ret.addAll(Arrays.asList(UNITS)); + return ret; + } + + private static Converter getConverter(String unit) { + switch (unit) { + case "p": + return PICO; + case "n": + return NANO; + case "u": + return MICRO; + case "m": + return MILLI; + case "": + return BASE; + case "k": + return KILO; + case "M": + return MEGA; + case "G": + return GIGA; + case "T": + return TERA; + case "P": + return PETA; + case "Ki": + return KILO_BINARY; + case "Mi": + return MEGA_BINARY; + case "Gi": + return GIGA_BINARY; + case "Ti": + return TERA_BINARY; + case "Pi": + return PETA_BINARY; + default: + throw new IllegalArgumentException( + "Unknown unit '" + unit + "'. Known units are " + KNOWN_UNITS); + } + } + + /** + * Converts a value from one unit to another. Supported units can be obtained + * by inspecting the KNOWN_UNITS set. + * + * @param fromUnit the unit of the from value + * @param toUnit the target unit + * @param fromValue the value you wish to convert + * @return the value in toUnit + */ + public static Long convert(String fromUnit, String toUnit, Long fromValue) { + if (toUnit == null || fromUnit == null || fromValue == null) { + throw new IllegalArgumentException("One or more arguments are null"); + } + String overflowMsg = + "Converting " + fromValue + " from '" + fromUnit + "' to '" + toUnit + + "' will result in an overflow of Long"; + if (fromUnit.equals(toUnit)) { + return fromValue; + } + Converter fc = getConverter(fromUnit); + Converter tc = getConverter(toUnit); + Long numerator = fc.numerator * tc.denominator; + Long denominator = fc.denominator * tc.numerator; + Long numeratorMultiplierLimit = Long.MAX_VALUE / numerator; + if (numerator < denominator) { + if (numeratorMultiplierLimit < fromValue) { + throw new IllegalArgumentException(overflowMsg); + } + return (fromValue * numerator) / denominator; + } + if (numeratorMultiplierLimit > fromValue) { + return (numerator * fromValue) / denominator; + } + Long tmp = numerator / denominator; + if ((Long.MAX_VALUE / tmp) < fromValue) { + throw new IllegalArgumentException(overflowMsg); + } + return fromValue * tmp; + } + + /** + * Compare a value in a given unit with a value in another unit. The return + * value is equivalent to the value returned by compareTo. + * + * @param unitA first unit + * @param valueA first value + * @param unitB second unit + * @param valueB second value + * @return +1, 0 or -1 depending on whether the relationship is greater than, + * equal to or lesser than + */ + public static int compare(String unitA, Long valueA, String unitB, + Long valueB) { + if (unitA == null || unitB == null || !KNOWN_UNITS.contains(unitA) + || !KNOWN_UNITS.contains(unitB)) { + throw new IllegalArgumentException("Units cannot be null"); + } + if (!KNOWN_UNITS.contains(unitA)) { + throw new IllegalArgumentException("Unknown unit '" + unitA + "'"); + } + if (!KNOWN_UNITS.contains(unitB)) { + throw new IllegalArgumentException("Unknown unit '" + unitB + "'"); + } + Converter unitAC = getConverter(unitA); + Converter unitBC = getConverter(unitB); + if (unitA.equals(unitB)) { + return valueA.compareTo(valueB); + } + int unitAPos = SORTED_UNITS.indexOf(unitA); + int unitBPos = SORTED_UNITS.indexOf(unitB); + try { + Long tmpA = valueA; + Long tmpB = valueB; + if (unitAPos < unitBPos) { + tmpB = convert(unitB, unitA, valueB); + } else { + tmpA = convert(unitA, unitB, valueA); + } + return tmpA.compareTo(tmpB); + } catch (IllegalArgumentException ie) { + BigInteger tmpA = BigInteger.valueOf(valueA); + BigInteger tmpB = BigInteger.valueOf(valueB); + if (unitAPos < unitBPos) { + tmpB = tmpB.multiply(BigInteger.valueOf(unitBC.numerator)); + tmpB = tmpB.multiply(BigInteger.valueOf(unitAC.denominator)); + tmpB = tmpB.divide(BigInteger.valueOf(unitBC.denominator)); + tmpB = tmpB.divide(BigInteger.valueOf(unitAC.numerator)); + } else { + tmpA = tmpA.multiply(BigInteger.valueOf(unitAC.numerator)); + tmpA = tmpA.multiply(BigInteger.valueOf(unitBC.denominator)); + tmpA = tmpA.divide(BigInteger.valueOf(unitAC.denominator)); + tmpA = tmpA.divide(BigInteger.valueOf(unitBC.numerator)); + } + return tmpA.compareTo(tmpB); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index 1643390..e26ff37 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.AbsoluteResourceType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; @@ -84,6 +85,7 @@ final ResourceCalculator resourceCalculator; Set accessibleLabels; + Set resourceTypes; final RMNodeLabelsManager labelManager; String defaultLabelExpression; @@ -99,6 +101,9 @@ // etc. QueueCapacities queueCapacities; + Map minResourceRequirement = new HashMap<>(); + Map maxResourceRequirement = new HashMap<>(); + private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); protected CapacitySchedulerContext csContext; @@ -265,6 +270,10 @@ void setupQueueConfigs(Resource clusterResource) this.defaultLabelExpression = csContext.getConfiguration().getDefaultNodeLabelExpression( getQueuePath()); + this.resourceTypes = new HashSet(); + for (AbsoluteResourceType type : AbsoluteResourceType.values()) { + resourceTypes.add(type.toString().toLowerCase()); + } // inherit from parent if labels not set if (this.accessibleLabels == null && parent != null) { @@ -281,6 +290,10 @@ void setupQueueConfigs(Resource clusterResource) // After we setup labels, we can setup capacities setupConfigurableCapacities(); + // Also fetch minimum/maximum resource constraint for this queue if + // configured. + updateConfigurableResourceRequirement(clusterResource); + this.maximumAllocation = csContext.getConfiguration().getMaximumAllocationPerQueue( getQueuePath()); @@ -336,6 +349,14 @@ void setupQueueConfigs(Resource clusterResource) } } + protected void updateConfigurableResourceRequirement( + Resource clusterResource) { + CSQueueUtils.loadResourceConstraintByLabelsFromConf(getQueuePath(), + resourceTypes, csContext.getConfiguration(), resourceCalculator, + clusterResource, labelManager, minResourceRequirement, + maxResourceRequirement, parent.getMaximumResourceRequirement()); + } + private void initializeQueueState(QueueState previousState, QueueState configuredState, QueueState parentState) { // verify that we can not any value for State other than RUNNING/STOPPED @@ -931,4 +952,14 @@ protected void appFinished() { public Priority getPriority() { return this.priority; } + + @Override + public Map getMinimumResourceRequirement() { + return minResourceRequirement; + } + + @Override + public Map getMaximumResourceRequirement() { + return maxResourceRequirement; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java index c6726ec..a2c490e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -36,7 +37,6 @@ import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; import org.apache.hadoop.yarn.security.PrivilegedEntity; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; @@ -45,12 +45,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerQueue; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimplePlacementSet; /** * CSQueue represents a node in the tree of @@ -350,4 +348,18 @@ public void validateSubmitApplication(ApplicationId applicationId, * @return queue priority */ Priority getPriority(); + + /** + * Get minimum resource requirement for a queue per label. + * + * @return queue's minimum resource + */ + Map getMinimumResourceRequirement(); + + /** + * Get maximum resource requirement for a queue per label. + * + * @return queue's maximum resource + */ + Map getMaximumResourceRequirement(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java index ba22541..4379c6c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; +import java.util.Map; import java.util.Set; import org.apache.hadoop.yarn.api.records.Resource; @@ -150,7 +151,59 @@ private static void loadCapacitiesByLabelsFromConf(String queuePath, } } } - + + public static void loadResourceConstraintByLabelsFromConf(String queuePath, + Set resourceTypes, CapacitySchedulerConfiguration csConf, + ResourceCalculator resourceCalculator, Resource clusterResource, + RMNodeLabelsManager labelManager, Map minResourceMap, + Map maxResourceMap, + Map parentMaxResourceMap) { + Set configuredNodelabels = csConf + .getConfiguredNodeLabels(queuePath); + + for (String label : configuredNodelabels) { + Resource minResource = csConf.getMinimumResourceRequirement(label, + queuePath, resourceTypes); + Resource maxResource = csConf.getMaximumResourceRequirement(label, + queuePath, resourceTypes); + + // For 'root' queue, set minResource as cluster resource. + if(queuePath.equals("root")) { + minResource = labelManager.getResourceByLabel(label, clusterResource); + } + + // If min resource for a resource type is greater than its max resource, + // throw exception to handle such error configs. + if (Resources.greaterThan(resourceCalculator, clusterResource, + minResource, maxResource)) { + throw new IllegalArgumentException("Min resource configuration " + + minResource + " is greater than its max value:" + maxResource); + } + + // If parent's max resource is lesser to a specific child's max + // resource, throw exception to handle such error configs. + Resource parentMaxRes = parentMaxResourceMap.get(label); + if (Resources.greaterThan(resourceCalculator, clusterResource, + parentMaxRes, Resources.none())) { + if (Resources.greaterThan(resourceCalculator, clusterResource, + maxResource, parentMaxRes)) { + throw new IllegalArgumentException( + "Max resource configuration " + maxResource + + " is greater than parents max value:" + parentMaxRes); + } + + // If child's max resource is not set, but its parent max resource is + // set, we must set child max resource to its parent's. + if (maxResource == null || maxResource.equals(Resources.none())) { + maxResource = parentMaxRes; + } + } + + minResourceMap.put(label, minResource); + maxResourceMap.put(label, maxResource); + } + } + // Set absolute capacities for {capacity, maximum-capacity} private static void updateAbsoluteCapacitiesByNodeLabels( QueueCapacities queueCapacities, QueueCapacities parentQueueCapacities) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 9fb92ec..957f46a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity; +import org.apache.hadoop.yarn.util.UnitsConversionUtil; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -60,6 +61,8 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import java.util.Set; import java.util.StringTokenizer; @@ -292,6 +295,21 @@ @Private public static final boolean DEFAULT_LAZY_PREEMPTION_ENABLED = false; + /** Configuring absolute min/max resources in a queue **/ + @Private + public static final String MINIMUM_RESOURCE = "min-resource"; + + @Private + public static final String MAXIMUM_RESOURCE = "max-resource"; + + public static final String DEFAULT_RESOURCE_TYPES = "memory,vcores"; + + public static final String PATTERN_FOR_ABSOLUTE_RESOURCE = "\\[([^\\]]+)"; + + public enum AbsoluteResourceType { + MEMORY, VCORES; + } + AppPriorityACLConfigurationParser priorityACLConfig = new AppPriorityACLConfigurationParser(); public CapacitySchedulerConfiguration() { @@ -1404,4 +1422,130 @@ public void setPUOrderingPolicyUnderUtilizedPreemptionMoveReservation( QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY, UNDER_UTILIZED_PREEMPTION_MOVE_RESERVATION), allowMoveReservation); } + + public static String getUnits(String resourceValue) { + String units; + for (int i = 0; i < resourceValue.length(); i++) { + if (Character.isAlphabetic(resourceValue.charAt(i))) { + units = resourceValue.substring(i); + if (StringUtils.isAlpha(units)) { + return units; + } + } + } + return ""; + } + + /** + * Get absolute minimum resource requirement for a queue. + * + * @param label + * NodeLabel + * @param queue + * queue path + * @param resourceTypes + * Resource types + * @return ResourceInformation + */ + public Resource getMinimumResourceRequirement(String label, String queue, + Set resourceTypes) { + return internalGetLabeledResourceRequirementForQueue(queue, + label, resourceTypes, MINIMUM_RESOURCE); + } + + /** + * Get absolute maximum resource requirement for a queue. + * + * @param label + * NodeLabel + * @param queue + * queue path + * @param resourceTypes + * Resource types + * @return Resource + */ + public Resource getMaximumResourceRequirement(String label, String queue, + Set resourceTypes) { + return internalGetLabeledResourceRequirementForQueue(queue, + label, resourceTypes, MAXIMUM_RESOURCE); + } + + private Resource internalGetLabeledResourceRequirementForQueue(String queue, + String label, Set resourceTypes, String suffix) { + String propertyName = getNodeLabelPrefix(queue, label) + suffix; + String resourceString = get(propertyName); + if (resourceString == null || resourceString.isEmpty()) { + return Resources.none(); + } + + // Define resource here. + Resource resource = Resource.newInstance(0l, 0); + Matcher matcher = Pattern.compile(PATTERN_FOR_ABSOLUTE_RESOURCE) + .matcher(resourceString); + + /* + * Absolute resource configuration for a queue will be grouped by "[]". + * Syntax of absolute resource config could be like below + * "memory=4Gi vcores=2". Ideally this means "4GB of memory and 2 vcores". + */ + if (matcher.find()) { + // Get the sub-group. + String subGroup = matcher.group(1); + if (subGroup.trim().isEmpty()) { + return Resources.none(); + } + + for (String kvPair : subGroup.trim().split(",")) { + String[] splits = kvPair.split("="); + + // Ensure that each sub string is key value pair separated by '='. + if (splits != null && splits.length > 1) { + updateResourceValuesFromConfig(resourceTypes, resource, splits); + } + } + } + + // Memory has to be configured always. + if (resource.getMemorySize() == 0l) { + return Resources.none(); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("CSConf - getAbsolueResourcePerQueue: prefix=" + + getNodeLabelPrefix(queue, label) + ", capacity=" + resource); + } + return resource; + } + + private void updateResourceValuesFromConfig(Set resourceTypes, + Resource resource, String[] splits) { + + // If key is not a valid type, skip it. + if (!resourceTypes.contains(splits[0])) { + return; + } + + String units = getUnits(splits[1]); + long resourceValue = Long + .valueOf(splits[1].substring(0, splits[1].length() - units.length())); + + // Convert all incoming units to MB if units is configured. + if (!units.isEmpty()) { + resourceValue = UnitsConversionUtil.convert(units, "Mi", resourceValue); + } + + // map it based on key. + AbsoluteResourceType resType = AbsoluteResourceType + .valueOf(StringUtils.toUpperCase(splits[0].trim())); + switch (resType) { + case MEMORY : + resource.setMemorySize(resourceValue); + break; + case VCORES : + resource.setVirtualCores((int) resourceValue); + break; + default : + break; + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index 91fedbc..4c9fa4d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -34,7 +34,6 @@ import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.security.AccessType; @@ -45,7 +44,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger; @@ -166,6 +164,7 @@ void setChildQueues(Collection childQueues) { for (CSQueue queue : childQueues) { childCapacities += queue.getCapacity(); } + float delta = Math.abs(1.0f - childCapacities); // crude way to check // allow capacities being set to 0, and enforce child 0 if parent is 0 if (((queueCapacities.getCapacity() > 0) && (delta > PRECISION)) || ( @@ -179,8 +178,20 @@ void setChildQueues(Collection childQueues) { float capacityByLabel = queueCapacities.getCapacity(nodeLabel); // check children's labels float sum = 0; + Resource minRes = Resources.createResource(0, 0); + Resource maxRes = Resources.createResource(0, 0); for (CSQueue queue : childQueues) { sum += queue.getQueueCapacities().getCapacity(nodeLabel); + + // Accumulate all min/max resource configured for all child queues. + if (queue.getMinimumResourceRequirement().containsKey(nodeLabel)) { + Resources.addTo(minRes, + queue.getMinimumResourceRequirement().get(nodeLabel)); + } + if (queue.getMaximumResourceRequirement().containsKey(nodeLabel)) { + Resources.addTo(maxRes, + queue.getMaximumResourceRequirement().get(nodeLabel)); + } } if ((capacityByLabel > 0 && Math.abs(1.0f - sum) > PRECISION) || (capacityByLabel == 0) && (sum > 0)) { @@ -188,6 +199,20 @@ void setChildQueues(Collection childQueues) { "Illegal" + " capacity of " + sum + " for children of queue " + queueName + " for label=" + nodeLabel); } + + // Ensure that for each parent queue: parent.min-resource >= + // Σ(child.min-resource). + Resource parentMinResource = this.getMinimumResourceRequirement() + .get(nodeLabel); + if (Resources + .lessThan(resourceCalculator, + labelManager.getResourceByLabel(nodeLabel, + scheduler.getClusterResource()), + parentMinResource, minRes)) { + throw new IllegalArgumentException("Parent Queues" + " capacity: " + + parentMinResource + " is less than" + " to its children:" + + minRes + " for queue:" + queueName); + } } this.childQueues.clear(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java index cc4af3d..d6a28cc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java @@ -46,7 +46,7 @@ public QueueCapacities(boolean isRoot) { capacitiesMap = new HashMap(); this.isRoot = isRoot; } - + // Usage enum here to make implement cleaner private enum CapacityType { USED_CAP(0), ABS_USED_CAP(1), MAX_CAP(2), ABS_MAX_CAP(3), CAP(4), ABS_CAP(5), -- 2.10.1 (Apple Git-78)