commit 6f0582a7b4dd11d865a03067d15082c33f2f7528
Author: hzzhouquan15
Date:   Tue Jan 8 11:52:36 2019 +0800

    YARN-9161, Absolute resources of capacity scheduler doesn't support GPU and FPGA

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java
index 20b64bd..e66f6f9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java
@@ -750,4 +750,128 @@ public static void areMandatoryResourcesAvailable(Resource res) {
 
     return info;
   }
+
+  /**
+   * Get a map of resourceName and resourceValue from a string like
+   * '[resourceName1=value1, resourceName2=value2]'. The enclosing square
+   * brackets are optional.
+   *
+   * @param resourcesStr comma separated resourceName=resourceValue pairs
+   * @return map from resource name to resource value, as reported by
+   *         {@link #getResourceInformation(String)} for each pair
+   * @throws IllegalArgumentException if any pair is empty or malformed
+   */
+  public static Map<String, Long> parseResourcesString(String resourcesStr) {
+    Map<String, Long> resources = new HashMap<>();
+
+    // Ignore the grouping "[]"
+    resourcesStr = resourcesStr.trim();
+    if (resourcesStr.startsWith("[")) {
+      resourcesStr = resourcesStr.substring(1);
+    }
+    if (resourcesStr.endsWith("]")) {
+      // The end index must exclude the last character, otherwise the "]"
+      // stays attached to the last pair and fails validation.
+      resourcesStr = resourcesStr.substring(0, resourcesStr.length() - 1);
+    }
+
+    for (String resource : resourcesStr.trim().split(",")) {
+      ResourceInformation standardResource = getResourceInformation(resource);
+      resources.put(standardResource.getName(), standardResource.getValue());
+    }
+    return resources;
+  }
+
+  /**
+   * Get standard yarn resourceInformation from a string like
+   * 'resourceName=resourceValue'.
+   *
+   * @param resource a single resourceName=resourceValue pair; leading and
+   *                 trailing whitespace is ignored
+   * @return {@link ResourceInformation}
+   * @throws IllegalArgumentException if resource is null, empty or not a
+   *         valid key=amount pair
+   */
+  public static ResourceInformation getResourceInformation(String resource) {
+    if (resource == null || resource.isEmpty()) {
+      throw new IllegalArgumentException("Resource is null or empty. " +
+          "Please provide key=amount pairs");
+    }
+    resource = resource.trim();
+    if (!resource.matches("^[^=]+=\\d+\\s?\\w*$")) {
+      throw new IllegalArgumentException("\"" + resource + "\" is not a " +
+          "valid resource type/amount pair. " +
+          "Please provide key=amount pairs separated by commas.");
+    }
+    String[] splits = resource.split("=");
+    return getStandardYarnResource(splits);
+  }
+
+  /**
+   * Build a {@link ResourceInformation} from a {name, value} pair, mapping
+   * the memory/gpu/fpga aliases to their standard names and converting
+   * memory amounts given in G/Gi to Mi.
+   *
+   * @param resourcePair an array of two elements, resource name and
+   *                     resource value (amount with optional unit)
+   * @return {@link ResourceInformation}
+   * @throws IllegalArgumentException if the pair is malformed, the amount
+   *         cannot be parsed, or a memory unit is not M/Mi/G/Gi
+   */
+  private static ResourceInformation getStandardYarnResource(String[]
+      resourcePair) {
+    if (resourcePair == null || resourcePair.length != 2) {
+      throw new IllegalArgumentException("Invalid resource type/amount pair. " +
+          "The resourcePair is null or the length of resourcePair is not " +
+          "equal to 2");
+    }
+    // Locale.ROOT keeps the alias matching below independent of the
+    // default locale of the JVM.
+    String resourceName =
+        resourcePair[0].trim().toLowerCase(java.util.Locale.ROOT);
+    String resourceValue = resourcePair[1];
+    String units = ResourceUtils.getUnits(resourceValue);
+
+    String valueWithoutUnit = resourceValue.substring(0, resourceValue.length()
+        - units.length()).trim();
+    long value;
+    try {
+      value = Long.parseLong(valueWithoutUnit);
+    } catch (NumberFormatException e) {
+      // Keep the original exception as the cause.
+      throw new IllegalArgumentException("Can't parse resource value from " +
+          valueWithoutUnit + ", please check the resource configuration: " +
+          resourceName + " " + resourceValue, e);
+    }
+
+    // special handle memory-mb and memory
+    if (resourceName.equals(ResourceInformation.MEMORY_URI) ||
+        resourceName.equals("memory")) {
+      resourceName = ResourceInformation.MEMORY_URI;
+      // Convert commandline unit to standard YARN unit.
+      if (!units.isEmpty()) {
+        if (units.equalsIgnoreCase("m") || units.equalsIgnoreCase("mi")) {
+          units = "Mi";
+        } else if (units.equalsIgnoreCase("g") ||
+            units.equalsIgnoreCase("gi")) {
+          value = UnitsConversionUtil.convert("Gi", "Mi", value);
+          units = "Mi";
+        } else {
+          throw new IllegalArgumentException("Acceptable units are M/Mi/G/Gi " +
+              "or empty");
+        }
+      }
+    }
+
+    // special handle gpu
+    if (resourceName.equals("gpu")) {
+      resourceName = ResourceInformation.GPU_URI;
+    }
+
+    // special handle fpga
+    if (resourceName.equals("fpga")) {
+      resourceName = ResourceInformation.FPGA_URI;
+    }
+
+    return ResourceInformation.newInstance(resourceName, units, value);
+  }
+
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
index 369d94b..a29d509 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
@@ -467,7 +467,8 @@ public boolean init(String[] args) throws ParseException {
         Integer.parseInt(cliParser.getOptionValue("master_vcores", "-1"));
     if (cliParser.hasOption("master_resources")) {
       Map<String, Long> masterResources =
-          parseResourcesString(cliParser.getOptionValue("master_resources"));
+          ResourceUtils.parseResourcesString(
+              cliParser.getOptionValue("master_resources"));
       for (Map.Entry<String, Long> entry : masterResources.entrySet()) {
         if
(entry.getKey().equals(ResourceInformation.MEMORY_URI)) { amMemory = entry.getValue(); @@ -538,7 +539,8 @@ public boolean init(String[] args) throws ParseException { Integer.parseInt(cliParser.getOptionValue("container_vcores", "-1")); if (cliParser.hasOption("container_resources")) { Map resources = - parseResourcesString(cliParser.getOptionValue("container_resources")); + ResourceUtils.parseResourcesString( + cliParser.getOptionValue("container_resources")); for (Map.Entry entry : resources.entrySet()) { if (entry.getKey().equals(ResourceInformation.MEMORY_URI)) { containerMemory = entry.getValue(); @@ -1294,39 +1296,4 @@ private void validateResourceTypes(Iterable resourceNames, } } } - - static Map parseResourcesString(String resourcesStr) { - Map resources = new HashMap<>(); - - // Ignore the grouping "[]" - if (resourcesStr.startsWith("[")) { - resourcesStr = resourcesStr.substring(1); - } - if (resourcesStr.endsWith("]")) { - resourcesStr = resourcesStr.substring(0, resourcesStr.length()); - } - - for (String resource : resourcesStr.trim().split(",")) { - resource = resource.trim(); - if (!resource.matches("^[^=]+=\\d+\\s?\\w*$")) { - throw new IllegalArgumentException("\"" + resource + "\" is not a " + - "valid resource type/amount pair. 
" + - "Please provide key=amount pairs separated by commas."); - } - String[] splits = resource.split("="); - String key = splits[0], value = splits[1]; - String units = ResourceUtils.getUnits(value); - String valueWithoutUnit = value.substring( - 0, value.length() - units.length()).trim(); - Long resourceValue = Long.valueOf(valueWithoutUnit); - if (!units.isEmpty()) { - resourceValue = UnitsConversionUtil.convert(units, "Mi", resourceValue); - } - if (key.equals("memory")) { - key = ResourceInformation.MEMORY_URI; - } - resources.put(key, resourceValue); - } - return resources; - } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/CliUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/CliUtils.java index f3eee7c..6a512e3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/CliUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/CliUtils.java @@ -24,7 +24,6 @@ import org.apache.hadoop.yarn.submarine.client.cli.param.RunJobParameters; import org.apache.hadoop.yarn.submarine.common.exception.SubmarineRuntimeException; import org.apache.hadoop.yarn.submarine.common.fs.RemoteDirectoryManager; -import org.apache.hadoop.yarn.util.UnitsConversionUtil; import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,64 +73,6 @@ public static String replacePatternsInLaunchCommand(String specifiedCli, return newCli; } - private static Map parseResourcesString(String resourcesStr) { - Map resources = new HashMap<>(); - String[] pairs = resourcesStr.trim().split(","); - for (String resource : pairs) { - resource = 
resource.trim(); - if (!resource.matches(RES_PATTERN)) { - throw new IllegalArgumentException("\"" + resource + "\" is not a " - + "valid resource type/amount pair. " - + "Please provide key=amount pairs separated by commas."); - } - String[] splits = resource.split("="); - String key = splits[0], value = splits[1]; - String units = ResourceUtils.getUnits(value); - - String valueWithoutUnit = value.substring(0, - value.length()- units.length()).trim(); - long resourceValue = Long.parseLong(valueWithoutUnit); - - // Convert commandline unit to standard YARN unit. - if (units.equals("M") || units.equals("m")) { - units = "Mi"; - } else if (units.equals("G") || units.equals("g")) { - units = "Gi"; - } else if (units.isEmpty()) { - // do nothing; - } else { - throw new IllegalArgumentException("Acceptable units are M/G or empty"); - } - - // special handle memory-mb and memory - if (key.equals(ResourceInformation.MEMORY_URI)) { - if (!units.isEmpty()) { - resourceValue = UnitsConversionUtil.convert(units, "Mi", - resourceValue); - } - } - - if (key.equals("memory")) { - key = ResourceInformation.MEMORY_URI; - resourceValue = UnitsConversionUtil.convert(units, "Mi", - resourceValue); - } - - // special handle gpu - if (key.equals("gpu")) { - key = ResourceInformation.GPU_URI; - } - - // special handle fpga - if (key.equals("fpga")) { - key = ResourceInformation.FPGA_URI; - } - - resources.put(key, resourceValue); - } - return resources; - } - private static void validateResourceTypes(Iterable resourceNames, List resourceTypes) throws IOException, YarnException { for (String resourceName : resourceNames) { @@ -145,7 +86,8 @@ private static void validateResourceTypes(Iterable resourceNames, public static Resource createResourceFromString(String resourceStr, List resourceTypes) throws IOException, YarnException { - Map typeToValue = parseResourcesString(resourceStr); + Map typeToValue = + ResourceUtils.parseResourcesString(resourceStr); 
validateResourceTypes(typeToValue.keySet(), resourceTypes); Resource resource = Resource.newInstance(0, 0); for (Map.Entry entry : typeToValue.entrySet()) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/TestRunJobCliParsing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/TestRunJobCliParsing.java index 184d53d..a89caeb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/TestRunJobCliParsing.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/test/java/org/apache/hadoop/yarn/submarine/client/cli/TestRunJobCliParsing.java @@ -235,10 +235,9 @@ public void testResourceUnitParsing() throws Exception { ResourceUtils.getResourcesTypeInfo()); Assert.assertEquals(Resources.createResource(20 * 1024, 3), res); - // W/o unit for memory means bits, and 20 bits will be rounded to 0 res = CliUtils.createResourceFromString("memory=20,vcores=3", ResourceUtils.getResourcesTypeInfo()); - Assert.assertEquals(Resources.createResource(0, 3), res); + Assert.assertEquals(Resources.createResource(20, 3), res); // Test multiple resources List resTypes = new ArrayList<>( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java index 48c2c36..d3b842d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java @@ -27,6 +27,7 @@ import 
org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.api.records.impl.LightWeightResource; import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException; +import org.apache.hadoop.yarn.util.UnitsConversionUtil; /** * Resources is a computation class which provides a set of apis to do @@ -518,7 +519,12 @@ public static Resource componentwiseMin(Resource lhs, Resource rhs) { try { ResourceInformation rhsValue = rhs.getResourceInformation(i); ResourceInformation lhsValue = lhs.getResourceInformation(i); - ResourceInformation outInfo = lhsValue.getValue() < rhsValue.getValue() + long lValue = lhsValue.getValue(); + long rValue = rhsValue.getUnits().equals(lhsValue.getUnits()) + ? rhsValue.getValue() + : UnitsConversionUtil.convert(rhsValue.getUnits(), + lhsValue.getUnits(), rhsValue.getValue()); + ResourceInformation outInfo = lValue < rValue ? lhsValue : rhsValue; ret.setResourceInformation(i, outInfo); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/resource/TestResourceUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/resource/TestResourceUtils.java index a9c98bd..0971e8c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/resource/TestResourceUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/resource/TestResourceUtils.java @@ -34,6 +34,12 @@ import java.util.HashMap; import java.util.Map; +import static org.apache.hadoop.yarn.api.records.ResourceInformation.MEMORY_URI; +import static org.apache.hadoop.yarn.api.records.ResourceInformation.VCORES_URI; +import static org.apache.hadoop.yarn.api.records.ResourceInformation.GPU_URI; +import static org.apache.hadoop.yarn.api.records.ResourceInformation.MEMORY_MB; +import static org.apache.hadoop.yarn.api.records.ResourceInformation.VCORES; + /** * Test class to 
verify all resource utility methods.
  */
@@ -391,6 +397,78 @@ public void testGetResourceInformationWithDiffUnits() throws Exception {
     }
   }
 
+  @Test
+  public void testParseResourcesString() throws Exception {
+    Map<String, Long> expected = new HashMap<>();
+    expected.put(MEMORY_URI, 20 * 1024L);
+    expected.put(VCORES_URI, 3L);
+    expected.put(GPU_URI, 5L);
+
+    Map<String, Long> res = ResourceUtils.parseResourcesString("memory=20G, " +
+        "vcores=3, gpu=5");
+    Assert.assertEquals(expected, res);
+
+    res = ResourceUtils.parseResourcesString("memory-mb=20G, vcores=3, " +
+        "yarn.io/gpu=5");
+    Assert.assertEquals(expected, res);
+
+    expected.clear();
+    expected.put(MEMORY_URI, 20L);
+    expected.put(VCORES_URI, 3L);
+    expected.put(GPU_URI, 5L);
+
+    res = ResourceUtils.parseResourcesString("memory=20m, vcores=3, " +
+        "yarn.io/gpu=5");
+    Assert.assertEquals(expected, res);
+
+    res = ResourceUtils.parseResourcesString("memory=20, vcores=3, " +
+        "yarn.io/gpu=5");
+    Assert.assertEquals(expected, res);
+
+    res = ResourceUtils.parseResourcesString("memory-mb=20m, vcores=3, gpu=5");
+    Assert.assertEquals(expected, res);
+
+    res = ResourceUtils.parseResourcesString("memory-mb=20, vcores=3, " +
+        "yarn.io/gpu=5");
+    Assert.assertEquals(expected, res);
+
+    // Negative tests: every malformed input must be rejected with the
+    // expected message; assertParseFails fails if nothing is thrown.
+    assertParseFails("memory-mb=, vcores=3, gpu=5",
+        "\"memory-mb=\" is not a valid resource type/amount"
+        + " pair. Please provide key=amount pairs separated by commas.");
+
+    // An empty entry between two commas. A trailing comma would not do:
+    // String.split(",") drops trailing empty strings.
+    assertParseFails("memory-mb=200G,,vcores=3",
+        "Resource is null or empty. Please provide key=amount pairs");
+
+    // A non numeric amount. "!vcores=3" would not do: the validation
+    // pattern accepts any non '=' characters as the resource name.
+    assertParseFails("memory-mb=200G, vcores=!3",
+        "\"vcores=!3\" is not a valid resource type/amount "
+        + "pair. Please provide key=amount pairs separated by commas.");
+
+    assertParseFails("memory-mb=200Gvcores=3",
+        "\"memory-mb=200Gvcores=3\" is not a valid resource"
+        + " type/amount pair. Please provide key=amount pairs separated by "
+        + "commas.");
+  }
+
+  /**
+   * Assert that ResourceUtils.parseResourcesString rejects the given
+   * string with an IllegalArgumentException carrying the given message.
+   */
+  private static void assertParseFails(String resources, String message) {
+    try {
+      ResourceUtils.parseResourcesString(resources);
+    } catch (IllegalArgumentException e) {
+      Assert.assertEquals(message, e.getMessage());
+      return;
+    }
+    Assert.fail("Expected IllegalArgumentException for \"" + resources + "\"");
+  }
+
   public static String setupResourceTypes(Configuration conf, String filename)
       throws Exception {
     File source = new File(
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index 2c9f9a3..5ab9450 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -25,7 +25,9 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.Arrays;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.lang3.StringUtils;
@@ -56,7 +58,6 @@
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
-import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.AbsoluteResourceType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueResourceQuotas; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; @@ -89,7 +90,6 @@ final ResourceCalculator resourceCalculator; Set accessibleLabels; - Set resourceTypes; final RMNodeLabelsManager labelManager; String defaultLabelExpression; private String multiNodeSortingPolicyName = null; @@ -336,10 +336,6 @@ protected void setupQueueConfigs(Resource clusterResource, this.defaultLabelExpression = configuration.getDefaultNodeLabelExpression( getQueuePath()); - this.resourceTypes = new HashSet(); - for (AbsoluteResourceType type : AbsoluteResourceType.values()) { - resourceTypes.add(type.toString().toLowerCase()); - } // inherit from parent if labels not set if (this.accessibleLabels == null && parent != null) { @@ -445,12 +441,14 @@ protected void updateConfigurableResourceRequirement(String queuePath, Resource clusterResource) { CapacitySchedulerConfiguration conf = csContext.getConfiguration(); Set configuredNodelabels = conf.getConfiguredNodeLabels(queuePath); + Set resources = Arrays.stream(clusterResource.getResources()).map(x + -> x.getName()).collect(Collectors.toSet()); for (String label : configuredNodelabels) { Resource minResource = conf.getMinimumResourceRequirement(label, - queuePath, resourceTypes); + queuePath, resources); Resource maxResource = conf.getMaximumResourceRequirement(label, - queuePath, resourceTypes); + queuePath, resources); if (LOG.isDebugEnabled()) { LOG.debug("capacityConfigType is '" + capacityConfigType diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 08380f5..46dc8b3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -71,6 +71,9 @@ import java.util.Set; import java.util.StringTokenizer; +import static org.apache.hadoop.yarn.api.records.ResourceInformation.MEMORY_URI; +import static org.apache.hadoop.yarn.api.records.ResourceInformation.VCORES_URI; + public class CapacitySchedulerConfiguration extends ReservationSchedulerConfiguration { private static final Log LOG = @@ -355,13 +358,6 @@ private static final Pattern RESOURCE_PATTERN = Pattern.compile(PATTERN_FOR_ABSOLUTE_RESOURCE); - /** - * Different resource types supported. 
- */ - public enum AbsoluteResourceType { - MEMORY, VCORES; - } - AppPriorityACLConfigurationParser priorityACLConfig = new AppPriorityACLConfigurationParser(); public CapacitySchedulerConfiguration() { @@ -2041,10 +2037,18 @@ private void updateMinMaxResourceToConf(String label, String queue, StringBuilder resourceString = new StringBuilder(); resourceString - .append("[" + AbsoluteResourceType.MEMORY.toString().toLowerCase() + "=" - + resource.getMemorySize() + "," - + AbsoluteResourceType.VCORES.toString().toLowerCase() + "=" - + resource.getVirtualCores() + "]"); + .append("[" + MEMORY_URI + "=" + resource.getMemorySize() + "," + + VCORES_URI + "=" + resource.getVirtualCores()); + ResourceInformation[] resourceInformations = resource.getResources(); + if(resourceInformations.length > 2) { + for (int i = 2 ; i < resourceInformations.length ; i++) { + resourceString.append(","); + resourceString.append(resourceInformations[i].getName()); + resourceString.append("="); + resourceString.append(resourceInformations[i].getValue()); + } + } + resourceString.append("]"); String prefix = getQueuePrefix(queue) + type; if (!label.isEmpty()) { @@ -2080,11 +2084,10 @@ private Resource internalGetLabeledResourceRequirementForQueue(String queue, subGroup = subGroup.substring(1, subGroup.length() - 1); for (String kvPair : subGroup.trim().split(",")) { - String[] splits = kvPair.split("="); - - // Ensure that each sub string is key value pair separated by '='. 
- if (splits != null && splits.length > 1) { - updateResourceValuesFromConfig(resourceTypes, resource, splits); + try { + updateResourceValuesFromConfig(resourceTypes, resource, kvPair); + } catch (IllegalArgumentException e) { + LOG.warn(e.getMessage(), e); } } } @@ -2102,36 +2105,23 @@ private Resource internalGetLabeledResourceRequirementForQueue(String queue, } private void updateResourceValuesFromConfig(Set resourceTypes, - Resource resource, String[] splits) { + Resource resource, String resourceValue) { + + ResourceInformation resourceInformation = ResourceUtils + .getResourceInformation(resourceValue); + String resourceName = resourceInformation.getName(); // If key is not a valid type, skip it. - if (!resourceTypes.contains(splits[0])) { + if (!resourceTypes.contains(resourceName)) { return; } - String units = getUnits(splits[1]); - Long resourceValue = Long - .valueOf(splits[1].substring(0, splits[1].length() - units.length())); - - // Convert all incoming units to MB if units is configured. - if (!units.isEmpty()) { - resourceValue = UnitsConversionUtil.convert(units, "Mi", resourceValue); - } - - // map it based on key. 
- AbsoluteResourceType resType = AbsoluteResourceType - .valueOf(StringUtils.toUpperCase(splits[0].trim())); - switch (resType) { - case MEMORY : - resource.setMemorySize(resourceValue); - break; - case VCORES : - resource.setVirtualCores(resourceValue.intValue()); - break; - default : - resource.setResourceInformation(splits[0].trim(), ResourceInformation - .newInstance(splits[0].trim(), units, resourceValue)); - break; + if(resourceName.equals(MEMORY_URI)) { + resource.setMemorySize(resourceInformation.getValue()); + } else if(resourceName.equals(VCORES_URI)) { + resource.setVirtualCores((int)resourceInformation.getValue()); + } else { + resource.setResourceInformation(resourceName, resourceInformation); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index 522c10e..2650daa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -996,8 +996,14 @@ private void calculateEffectiveResourcesAndCapacity(String label, // childMaxResource is empty, consider parent's max resource alone. Resource childMaxResource = childQueue.getQueueResourceQuotas() .getConfiguredMaxResource(label); - Resource effMaxResource = Resources.min(resourceCalculator, - resourceByLabel, childMaxResource.equals(Resources.none()) + + // Use Resources.componentwiseMin, not Resources.min. 
When + // parentMaxRes have available GPUs but childMaxResource doesn't, + // parentMaxRes may be smaller than childMaxResource using + // Resources.min, if it has less memory. In this case, effMaxResource + // would have available GPUs which it shouldn't have. + Resource effMaxResource = Resources.componentwiseMin( + childMaxResource.equals(Resources.none()) ? parentMaxRes : childMaxResource, parentMaxRes); @@ -1067,8 +1073,12 @@ private Resource getMinResourceNormalized(String name, Map effect dResourceInformation.getUnits(), nResourceInformation.getUnits(), dResourceInformation.getValue()); if (dValue != 0) { + // If there are resources added to cluster, queue configured min + // resource should not be increased automatically for absolute + // configuration + float minRatio = Math.min((float) nValue / dValue, 1); effectiveMinRatioPerResource.put(nResourceInformation.getName(), - (float) nValue / dValue); + minRatio); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestAbsoluteResourceConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestAbsoluteResourceConfiguration.java index 298e1ab..80a6991 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestAbsoluteResourceConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestAbsoluteResourceConfiguration.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.util.Arrays; +import java.util.HashMap; import java.util.HashSet; import java.util.Set; @@ -27,9 +28,19 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; + +import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.junit.Assert; import org.junit.Test; +import static org.apache.hadoop.yarn.api.records.ResourceInformation.FPGA_URI; +import static org.apache.hadoop.yarn.api.records.ResourceInformation.GPU_URI; +import static org.apache.hadoop.yarn.api.records.ResourceInformation.MEMORY_URI; +import static org.apache.hadoop.yarn.api.records.ResourceInformation.VCORES_URI; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.RESOURCE_TYPES; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler + .capacity.CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS; + public class TestAbsoluteResourceConfiguration { private static final int GB = 1024; @@ -51,6 +62,24 @@ private static final String QUEUEA2_FULL = QUEUEA_FULL + "." + QUEUEA2; private static final String QUEUEB1_FULL = QUEUEB_FULL + "." 
+ QUEUEB1; + private static HashMap queueCMinResources = new HashMap(); + private static HashMap queueCMaxResources = new HashMap(); + private static HashMap queueCReducedResources = new HashMap(); + + //add GPU and FPGA to resource types + static { + queueCMinResources.put(GPU_URI, 2L); + queueCMinResources.put(FPGA_URI, 2L); + queueCMaxResources.put(GPU_URI, 4L); + queueCMaxResources.put(FPGA_URI, 4L); + queueCReducedResources.put(GPU_URI, 1L); + queueCReducedResources.put(FPGA_URI, 1L); + CapacitySchedulerConfiguration csConf = + new CapacitySchedulerConfiguration(); + csConf.set(RESOURCE_TYPES, GPU_URI + "," + FPGA_URI); + ResourceUtils.resetResourceTypes(csConf); + } + private static final Resource QUEUE_A_MINRES = Resource.newInstance(100 * GB, 10); private static final Resource QUEUE_A_MAXRES = Resource.newInstance(200 * GB, @@ -66,21 +95,27 @@ private static final Resource QUEUE_B_MAXRES = Resource.newInstance(150 * GB, 30); private static final Resource QUEUE_C_MINRES = Resource.newInstance(50 * GB, - 10); + 10, queueCMinResources); private static final Resource QUEUE_C_MAXRES = Resource.newInstance(150 * GB, - 20); + 20, queueCMaxResources); + private static final Resource QUEUE_C_MAXRES_WITH_MEMORY_CPUS = + Resource.newInstance(150 * GB, 20); private static final Resource QUEUEA_REDUCED = Resource.newInstance(64000, 6); private static final Resource QUEUEB_REDUCED = Resource.newInstance(32000, 6); - private static final Resource QUEUEC_REDUCED = Resource.newInstance(32000, 6); + private static final Resource QUEUEC_REDUCED = Resource.newInstance(32000, + 6, queueCReducedResources); private static final Resource QUEUEMAX_REDUCED = Resource.newInstance(128000, 20); + private static final Resource QUEUECMAX_REDUCED = Resource.newInstance(128000, + 20, queueCReducedResources); private static Set resourceTypes = new HashSet<>( - Arrays.asList("memory", "vcores")); + Arrays.asList(MEMORY_URI, VCORES_URI, GPU_URI, FPGA_URI)); private 
CapacitySchedulerConfiguration setupSimpleQueueConfiguration( boolean isCapacityNeeded) { - CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(); + CapacitySchedulerConfiguration csConf = + setupExtendedResourceConfiguration(); csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[]{QUEUEA, QUEUEB, QUEUEC}); @@ -96,7 +131,8 @@ private CapacitySchedulerConfiguration setupSimpleQueueConfiguration( private CapacitySchedulerConfiguration setupComplexQueueConfiguration( boolean isCapacityNeeded) { - CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(); + CapacitySchedulerConfiguration csConf = + setupExtendedResourceConfiguration(); csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[]{QUEUEA, QUEUEB, QUEUEC}); csConf.setQueues(QUEUEA_FULL, new String[]{QUEUEA1, QUEUEA2}); @@ -146,6 +182,18 @@ private CapacitySchedulerConfiguration setupComplexMinMaxResourceConfig( return csConf; } + private CapacitySchedulerConfiguration + setupExtendedResourceConfiguration() { + CapacitySchedulerConfiguration csConf = + new CapacitySchedulerConfiguration(); + csConf.set(RESOURCE_TYPES, GPU_URI + "," + FPGA_URI); + csConf.set(RESOURCE_CALCULATOR_CLASS, + "org.apache.hadoop.yarn.util.resource.DominantResourceCalculator"); + csConf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + return csConf; + } + @Test public void testSimpleMinMaxResourceConfigurartionPerQueue() { @@ -180,9 +228,6 @@ public void testEffectiveMinMaxResourceConfigurartionPerQueue() false); setupMinMaxResourceConfiguration(csConf); - csConf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, - ResourceScheduler.class); - @SuppressWarnings("resource") MockRM rm = new MockRM(csConf); rm.start(); @@ -220,11 +265,13 @@ public void testEffectiveMinMaxResourceConfigurartionPerQueue() Assert.assertEquals("Min resource configured for QUEUEC is not correct", QUEUE_C_MINRES, 
qC.queueResourceQuotas.getConfiguredMinResource()); Assert.assertEquals("Max resource configured for QUEUEC is not correct", - QUEUE_C_MAXRES, qC.queueResourceQuotas.getConfiguredMaxResource()); + QUEUE_C_MAXRES, + qC.queueResourceQuotas.getConfiguredMaxResource()); Assert.assertEquals("Effective Min resource for QUEUEC is not correct", QUEUE_C_MINRES, qC.queueResourceQuotas.getEffectiveMinResource()); Assert.assertEquals("Effective Max resource for QUEUEC is not correct", - QUEUE_C_MAXRES, qC.queueResourceQuotas.getEffectiveMaxResource()); + QUEUE_C_MAXRES_WITH_MEMORY_CPUS, + qC.queueResourceQuotas.getEffectiveMaxResource()); rm.stop(); } @@ -249,8 +296,6 @@ public void testSimpleValidateAbsoluteResourceConfig() throws Exception { CapacitySchedulerConfiguration csConf = setupSimpleQueueConfiguration( false); setupMinMaxResourceConfiguration(csConf); - csConf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, - ResourceScheduler.class); @SuppressWarnings("resource") MockRM rm = new MockRM(csConf); @@ -313,11 +358,13 @@ public void testSimpleValidateAbsoluteResourceConfig() throws Exception { Assert.assertEquals("Min resource configured for QUEUEC is not correct", QUEUE_C_MINRES, qC.queueResourceQuotas.getConfiguredMinResource()); Assert.assertEquals("Max resource configured for QUEUEC is not correct", - QUEUE_C_MAXRES, qC.queueResourceQuotas.getConfiguredMaxResource()); + QUEUE_C_MAXRES, + qC.queueResourceQuotas.getConfiguredMaxResource()); Assert.assertEquals("Effective Min resource for QUEUEC is not correct", QUEUE_C_MINRES, qC.queueResourceQuotas.getEffectiveMinResource()); Assert.assertEquals("Effective Max resource for QUEUEC is not correct", - QUEUE_C_MAXRES, qC.queueResourceQuotas.getEffectiveMaxResource()); + QUEUE_C_MAXRES_WITH_MEMORY_CPUS, + qC.queueResourceQuotas.getEffectiveMaxResource()); // 3. Create a new config and make sure one queue's min resource is more // than its max resource configured. 
@@ -382,8 +429,6 @@ public void testComplexValidateAbsoluteResourceConfig() throws Exception { CapacitySchedulerConfiguration csConf = setupComplexQueueConfiguration( false); setupComplexMinMaxResourceConfig(csConf); - csConf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, - ResourceScheduler.class); @SuppressWarnings("resource") MockRM rm = new MockRM(csConf); @@ -441,16 +486,21 @@ public void testEffectiveResourceAfterReducingClusterResource() false); setupMinMaxResourceConfiguration(csConf); - csConf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, - ResourceScheduler.class); - @SuppressWarnings("resource") MockRM rm = new MockRM(csConf); rm.start(); + HashMap node1Resources = new HashMap(); + node1Resources.put(GPU_URI, 3L); + node1Resources.put(FPGA_URI, 3L); + HashMap node2Resources = new HashMap(); + node2Resources.put(GPU_URI, 1L); + node2Resources.put(FPGA_URI, 1L); + Resource node1Resource = Resource.newInstance(125 * GB, 20, node1Resources); + Resource node2Resource = Resource.newInstance(125 * GB, 20, node2Resources); // Add few nodes - MockNM nm1 = rm.registerNode("127.0.0.1:1234", 125 * GB, 20); - rm.registerNode("127.0.0.2:1234", 125 * GB, 20); + MockNM nm1 = rm.registerNode("127.0.0.1:1234", node1Resource); + rm.registerNode("127.0.0.2:1234", node2Resource); // Get queue object to verify min/max resource configuration. 
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); @@ -507,7 +557,7 @@ public void testEffectiveResourceAfterReducingClusterResource() Assert.assertEquals("Effective Min resource for QUEUEC is not correct", QUEUEC_REDUCED, qC.queueResourceQuotas.getEffectiveMinResource()); Assert.assertEquals("Effective Max resource for QUEUEC is not correct", - QUEUEMAX_REDUCED, qC.queueResourceQuotas.getEffectiveMaxResource()); + QUEUECMAX_REDUCED, qC.queueResourceQuotas.getEffectiveMaxResource()); rm.stop(); } @@ -520,16 +570,21 @@ public void testEffectiveResourceAfterIncreasingClusterResource() false); setupComplexMinMaxResourceConfig(csConf); - csConf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, - ResourceScheduler.class); - @SuppressWarnings("resource") MockRM rm = new MockRM(csConf); rm.start(); + HashMap node1Resources = new HashMap(); + node1Resources.put(GPU_URI, 3L); + node1Resources.put(FPGA_URI, 3L); + HashMap node2Resources = new HashMap(); + node2Resources.put(GPU_URI, 1L); + node2Resources.put(FPGA_URI, 1L); + Resource node1Resource = Resource.newInstance(125 * GB, 20, node1Resources); + Resource node2Resource = Resource.newInstance(125 * GB, 20, node2Resources); // Add few nodes - rm.registerNode("127.0.0.1:1234", 125 * GB, 20); - rm.registerNode("127.0.0.2:1234", 125 * GB, 20); + rm.registerNode("127.0.0.1:1234", node1Resource); + rm.registerNode("127.0.0.2:1234", node2Resource); // Get queue object to verify min/max resource configuration. CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); @@ -590,7 +645,7 @@ public void testEffectiveResourceAfterIncreasingClusterResource() QUEUE_B_MAXRES, qB1.queueResourceQuotas.getEffectiveMaxResource()); // add new NM. - rm.registerNode("127.0.0.3:1234", 125 * GB, 20); + rm.registerNode("127.0.0.3:1234", Resource.newInstance(node1Resource)); // There will be no change in effective resource when nodes are added. 
// Since configured capacity was based on initial node capacity, a