diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java index b6cb581..21b4a1f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java @@ -805,8 +805,8 @@ public static Resource createResourceWithSameValue(long value) { @InterfaceAudience.Private @InterfaceStability.Unstable public static Resource createResourceFromString( - String resourceStr, - List resourceTypeInfos) { + String resourceStr, + List resourceTypeInfos) { Map typeToValue = parseResourcesString(resourceStr); validateResourceTypes(typeToValue.keySet(), resourceTypeInfos); Resource resource = Resource.newInstance(0, 0); @@ -816,68 +816,37 @@ public static Resource createResourceFromString( return resource; } - private static Map parseResourcesString(String resourcesStr) { + /** + * Get a map of resourceName and resourceValue from a string like + * '[resourceName1=value1, resourceName2=value2]'. + * @param resourcesStr + * @return A map of resourceName and resourceValue + */ + @InterfaceAudience.Private + @InterfaceStability.Unstable + public static Map parseResourcesString(String resourcesStr) { Map resources = new HashMap<>(); - String[] pairs = resourcesStr.trim().split(","); - for (String resource : pairs) { - resource = resource.trim(); - if (!resource.matches(RES_PATTERN)) { - throw new IllegalArgumentException("\"" + resource + "\" is not a " - + "valid resource type/amount pair. " - + "Please provide key=amount pairs separated by commas."); - } - String[] splits = resource.split("="); - String key = splits[0], value = splits[1]; - String units = getUnits(value); - - String valueWithoutUnit = value.substring(0, - value.length()- units.length()).trim(); - long resourceValue = Long.parseLong(valueWithoutUnit); - - // Convert commandline unit to standard YARN unit. - if (units.equals("M") || units.equals("m")) { - units = "Mi"; - } else if (units.equals("G") || units.equals("g")) { - units = "Gi"; - } else if (units.isEmpty()) { - // do nothing; - } else { - throw new IllegalArgumentException("Acceptable units are M/G or empty"); - } - // special handle memory-mb and memory - if (key.equals(ResourceInformation.MEMORY_URI)) { - if (!units.isEmpty()) { - resourceValue = UnitsConversionUtil.convert(units, "Mi", - resourceValue); - } - } - - if (key.equals("memory")) { - key = ResourceInformation.MEMORY_URI; - resourceValue = UnitsConversionUtil.convert(units, "Mi", - resourceValue); - } - - // special handle gpu - if (key.equals("gpu")) { - key = ResourceInformation.GPU_URI; - } - - // special handle fpga - if (key.equals("fpga")) { - key = ResourceInformation.FPGA_URI; - } + // Ignore the grouping "[]" + resourcesStr = resourcesStr.trim(); + if (resourcesStr.startsWith("[")) { + resourcesStr = resourcesStr.substring(1); + } + if (resourcesStr.endsWith("]")) { + resourcesStr = resourcesStr.substring(0, resourcesStr.length()-1); + } - resources.put(key, resourceValue); + for (String resource : resourcesStr.trim().split(",")) { + ResourceInformation standardResource = getResourceInformation(resource); + resources.put(standardResource.getName(), standardResource.getValue()); } return resources; } private static void validateResourceTypes( - Iterable resourceNames, - List resourceTypeInfos) - throws ResourceNotFoundException { + Iterable resourceNames, + List resourceTypeInfos) + throws ResourceNotFoundException { for (String resourceName : resourceNames) { if (!resourceTypeInfos.stream().anyMatch( e -> e.getName().equals(resourceName))) { @@ -886,4 +855,87 @@ private static void validateResourceTypes( } } } + + /** + * Get standard yarn resourceInformation from a string like 'resourceName= + * resourceValue'. + * + * @param resource The String with resource name and resource + * connected with '=' + * @return {@link ResourceInformation} + */ + @InterfaceAudience.Private + @InterfaceStability.Unstable + public static ResourceInformation getResourceInformation(String resource) { + if (resource == null || resource.isEmpty()) { + throw new IllegalArgumentException("Resource is null or empty. " + + "Please provide key=amount pairs"); + } + resource = resource.trim(); + if (!resource.matches(RES_PATTERN)) { + throw new IllegalArgumentException("\"" + resource + "\" is not a " + + "valid resource type/amount pair. " + + "Please provide key=amount pairs separated by commas."); + } + String[] splits = resource.split("="); + return getStandardYarnResource(splits); + } + + private static ResourceInformation getStandardYarnResource(String[] + resourcePair) { + if(resourcePair == null || resourcePair.length != 2) { + throw new IllegalArgumentException("Valid resource type/amount pair. " + + "The resourcePair is null or the length of resourcePair is not " + + " equal to 2"); + } + String resourceName = resourcePair[0].toLowerCase(); + String resourceValue = resourcePair[1]; + String units = ResourceUtils.getUnits(resourceValue); + + String valueWithoutUnit = resourceValue.substring(0, resourceValue.length() + - units.length()).trim(); + Long value; + try { + value = Long.valueOf(valueWithoutUnit); + } catch(NumberFormatException e) { + throw new IllegalArgumentException("Can't parse resource value from " + + valueWithoutUnit + ", please check the resource configuration: " + + resourceName + " " + resourceValue); + } + + // special handle memory-mb and memory + if (resourceName.equals(ResourceInformation.MEMORY_URI) || + resourceName.equals("memory")) { + resourceName = ResourceInformation.MEMORY_URI; + // Convert commandline unit to standard YARN unit. + if (!units.isEmpty()) { + if (units.toLowerCase().equals("m") || + units.toLowerCase().equals("mi")) { + units = "Mi"; + } else if (units.toLowerCase().equals("g") || + units.toLowerCase().equals("gi")) { + value = UnitsConversionUtil.convert("Gi", "Mi", value); + units = "Mi"; + } else { + throw new IllegalArgumentException("Acceptable units are M/Mi/G/Gi " + + "or empty"); + } + } + } + + // special handle gpu + if (resourceName.equals("gpu")) { + resourceName = ResourceInformation.GPU_URI; + } + + // special handle fpga + if (resourceName.equals("fpga")) { + resourceName = ResourceInformation.FPGA_URI; + } + + ResourceInformation resourceInformation = ResourceInformation + .newInstance(resourceName, units, value); + return resourceInformation; + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index 333e00c..794c46c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -674,7 +674,7 @@ public boolean init(String[] args) throws ParseException, IOException { "container_vcores", "-1")); containerResources = new HashMap<>(); if (cliParser.hasOption("container_resources")) { - Map resources = Client.parseResourcesString( + Map resources = ResourceUtils.parseResourcesString( cliParser.getOptionValue("container_resources")); for (Map.Entry entry : resources.entrySet()) { containerResources.put(entry.getKey(), entry.getValue()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java index 1666325..8e8a83f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java @@ -92,7 +92,6 @@ import org.apache.hadoop.yarn.exceptions.YARNFeatureNotEnabledException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.util.DockerClientConfigHandler; -import org.apache.hadoop.yarn.util.UnitsConversionUtil; import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.timeline.TimelineUtils; @@ -471,7 +470,8 @@ public boolean init(String[] args) throws ParseException { Integer.parseInt(cliParser.getOptionValue("master_vcores", "-1")); if (cliParser.hasOption("master_resources")) { Map masterResources = - parseResourcesString(cliParser.getOptionValue("master_resources")); + ResourceUtils.parseResourcesString( + cliParser.getOptionValue("master_resources")); for (Map.Entry entry : masterResources.entrySet()) { if (entry.getKey().equals(ResourceInformation.MEMORY_URI)) { amMemory = entry.getValue(); @@ -545,7 +545,8 @@ public boolean init(String[] args) throws ParseException { Integer.parseInt(cliParser.getOptionValue("container_vcores", "-1")); if (cliParser.hasOption("container_resources")) { Map resources = - parseResourcesString(cliParser.getOptionValue("container_resources")); + ResourceUtils.parseResourcesString( + cliParser.getOptionValue("container_resources")); for (Map.Entry entry : resources.entrySet()) { if (entry.getKey().equals(ResourceInformation.MEMORY_URI)) { containerMemory = entry.getValue(); @@ -1304,39 +1305,4 @@ private void validateResourceTypes(Iterable resourceNames, } } } - - static Map parseResourcesString(String resourcesStr) { - Map resources = new HashMap<>(); - - // Ignore the grouping "[]" - if (resourcesStr.startsWith("[")) { - resourcesStr = resourcesStr.substring(1); - } - if (resourcesStr.endsWith("]")) { - resourcesStr = resourcesStr.substring(0, resourcesStr.length()); - } - - for (String resource : resourcesStr.trim().split(",")) { - resource = resource.trim(); - if (!resource.matches("^[^=]+=\\d+\\s?\\w*$")) { - throw new IllegalArgumentException("\"" + resource + "\" is not a " + - "valid resource type/amount pair. " + - "Please provide key=amount pairs separated by commas."); - } - String[] splits = resource.split("="); - String key = splits[0], value = splits[1]; - String units = ResourceUtils.getUnits(value); - String valueWithoutUnit = value.substring( - 0, value.length() - units.length()).trim(); - Long resourceValue = Long.valueOf(valueWithoutUnit); - if (!units.isEmpty()) { - resourceValue = UnitsConversionUtil.convert(units, "Mi", resourceValue); - } - if (key.equals("memory")) { - key = ResourceInformation.MEMORY_URI; - } - resources.put(key, resourceValue); - } - return resources; - } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java index 4764147..d1e4dec 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java @@ -26,6 +26,9 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException; +import org.apache.hadoop.yarn.util.UnitsConversionUtil; + +import java.util.Map; /** * Resources is a computation class which provides a set of apis to do @@ -226,6 +229,22 @@ public static Resource unbounded() { return UNBOUNDED; } + private static void refreshResourceUNBOUNDEDandNONE() { + if(UNBOUNDED instanceof FixedValueResource) { + ((FixedValueResource) UNBOUNDED).initResourceMap(); + } + if(NONE instanceof FixedValueResource) { + ((FixedValueResource) NONE).initResourceMap(); + } + } + + public static void refreshResourcesFromMap( + Map resourceInformationMap) { + ResourceUtils.initializeResourcesFromResourceInformationMap( + resourceInformationMap); + refreshResourceUNBOUNDEDandNONE(); + } + public static Resource clone(Resource res) { return Resource.newInstance(res); } @@ -498,7 +517,12 @@ public static Resource componentwiseMin(Resource lhs, Resource rhs) { try { ResourceInformation rhsValue = rhs.getResourceInformation(i); ResourceInformation lhsValue = lhs.getResourceInformation(i); - ResourceInformation outInfo = lhsValue.getValue() < rhsValue.getValue() + long lValue = lhsValue.getValue(); + long rValue = rhsValue.getUnits().equals(lhsValue.getUnits()) + ? rhsValue.getValue() + : UnitsConversionUtil.convert(rhsValue.getUnits(), + lhsValue.getUnits(), rhsValue.getValue()); + ResourceInformation outInfo = lValue < rValue ? lhsValue : rhsValue; ret.setResourceInformation(i, outInfo); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/resource/TestResourceUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/resource/TestResourceUtils.java index f7ec4f8..0d6dcc8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/resource/TestResourceUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/resource/TestResourceUtils.java @@ -25,6 +25,7 @@ import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.api.records.ResourceTypeInfo; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.junit.After; import org.junit.Assert; @@ -404,7 +405,7 @@ public void testResourceUnitParsing() throws Exception { ResourceUtils.getResourcesTypeInfo()); Assert.assertEquals(Resources.createResource(20 * 1024, 3), res); - res = ResourceUtils.createResourceFromString("memory=20M,vcores=3", + res = ResourceUtils.createResourceFromString("[memory=20M,vcores=3]", ResourceUtils.getResourcesTypeInfo()); Assert.assertEquals(Resources.createResource(20, 3), res); @@ -412,7 +413,7 @@ public void testResourceUnitParsing() throws Exception { ResourceUtils.getResourcesTypeInfo()); Assert.assertEquals(Resources.createResource(20, 3), res); - res = ResourceUtils.createResourceFromString("memory-mb=20,vcores=3", + res = ResourceUtils.createResourceFromString("[memory-mb=20,vcores=3]", ResourceUtils.getResourcesTypeInfo()); Assert.assertEquals(Resources.createResource(20, 3), res); @@ -420,21 +421,20 @@ public void testResourceUnitParsing() throws Exception { ResourceUtils.getResourcesTypeInfo()); Assert.assertEquals(Resources.createResource(20, 3), res); - res = ResourceUtils.createResourceFromString("memory-mb=20G,vcores=3", + res = ResourceUtils.createResourceFromString("[memory-mb=20G,vcores=3]", ResourceUtils.getResourcesTypeInfo()); Assert.assertEquals(Resources.createResource(20 * 1024, 3), res); - // W/o unit for memory means bits, and 20 bits will be rounded to 0 res = ResourceUtils.createResourceFromString("memory=20,vcores=3", ResourceUtils.getResourcesTypeInfo()); - Assert.assertEquals(Resources.createResource(0, 3), res); + Assert.assertEquals(Resources.createResource(20, 3), res); // Test multiple resources List resTypes = new ArrayList<>( ResourceUtils.getResourcesTypeInfo()); resTypes.add(ResourceTypeInfo.newInstance(ResourceInformation.GPU_URI, "")); ResourceUtils.reinitializeResources(resTypes); - res = ResourceUtils.createResourceFromString("memory=2G,vcores=3,gpu=0", + res = ResourceUtils.createResourceFromString("[memory=2G,vcores=3,gpu=0]", resTypes); Assert.assertEquals(2 * 1024, res.getMemorySize()); Assert.assertEquals(0, res.getResourceValue(ResourceInformation.GPU_URI)); @@ -450,7 +450,7 @@ public void testResourceUnitParsing() throws Exception { Assert.assertEquals(0, res.getResourceValue(ResourceInformation.GPU_URI)); res = ResourceUtils.createResourceFromString( - "memory=2G,vcores=3,yarn.io/gpu=0", resTypes); + "[memory=2G,vcores=3,yarn.io/gpu=0]", resTypes); Assert.assertEquals(2 * 1024, res.getMemorySize()); Assert.assertEquals(0, res.getResourceValue(ResourceInformation.GPU_URI)); @@ -459,7 +459,39 @@ public void testResourceUnitParsing() throws Exception { Assert.assertEquals(2 * 1024, res.getMemorySize()); Assert.assertEquals(3, res.getResourceValue(ResourceInformation.GPU_URI)); - // TODO, add more negative tests. + //negative tests + try { + res = ResourceUtils.createResourceFromString("[memory-mb=, vcores=3, " + + "gpu=5]", resTypes); + } catch(IllegalArgumentException e) { + Assert.assertEquals("\"memory-mb=\" is not a valid resource type/amount" + + " pair. Please provide key=amount pairs separated by commas.", + e.getMessage()); + } + + try { + res = ResourceUtils.createResourceFromString("memory-mb=200G,vcores=3,", + resTypes); + } catch(IllegalArgumentException e) { + Assert.assertEquals("Resource is null or empty. Please provide " + + "key=amount pairs", e.getMessage()); + } + + try { + res = ResourceUtils.createResourceFromString("memory-mb=200G, !vcores=3", + resTypes); + } catch(ResourceNotFoundException e) { + Assert.assertEquals("Unknown resource: !vcores", e.getMessage()); + } + + try { + res = ResourceUtils.createResourceFromString("memory-mb=200Gvcores=3", + resTypes); + } catch(IllegalArgumentException e) { + Assert.assertEquals("\"memory-mb=200Gvcores=3\" is not a valid resource" + + " type/amount pair. Please provide key=amount pairs separated by " + + "commas.", e.getMessage()); + } } public static String setupResourceTypes(Configuration conf, String filename) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index caa88cf..44d8cc7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -25,7 +25,9 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.Arrays; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; import com.google.common.annotations.VisibleForTesting; import org.apache.commons.lang3.StringUtils; @@ -57,7 +59,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.AbsoluteResourceType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueResourceQuotas; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; @@ -93,7 +94,6 @@ final ResourceCalculator resourceCalculator; Set accessibleLabels; - Set resourceTypes; final RMNodeLabelsManager labelManager; String defaultLabelExpression; private String multiNodeSortingPolicyName = null; @@ -340,10 +340,6 @@ protected void setupQueueConfigs(Resource clusterResource, this.defaultLabelExpression = configuration.getDefaultNodeLabelExpression( getQueuePath()); - this.resourceTypes = new HashSet(); - for (AbsoluteResourceType type : AbsoluteResourceType.values()) { - resourceTypes.add(type.toString().toLowerCase()); - } // inherit from parent if labels not set if (this.accessibleLabels == null && parent != null) { @@ -494,12 +490,14 @@ protected void updateConfigurableResourceRequirement(String queuePath, Resource clusterResource) { CapacitySchedulerConfiguration conf = csContext.getConfiguration(); Set configuredNodelabels = conf.getConfiguredNodeLabels(queuePath); + Set resources = Arrays.stream(clusterResource.getResources()).map(x + -> x.getName()).collect(Collectors.toSet()); for (String label : configuredNodelabels) { Resource minResource = conf.getMinimumResourceRequirement(label, - queuePath, resourceTypes); + queuePath, resources); Resource maxResource = conf.getMaximumResourceRequirement(label, - queuePath, resourceTypes); + queuePath, resources); if (LOG.isDebugEnabled()) { LOG.debug("capacityConfigType is '" + capacityConfigType @@ -914,8 +912,8 @@ private Resource getCurrentLimitResource(String nodePartition, Resource queueMaxResource = getQueueMaxResource(nodePartition); - return Resources.min(resourceCalculator, clusterResource, - queueMaxResource, currentResourceLimits.getLimit()); + return Resources.componentwiseMin(queueMaxResource, + currentResourceLimits.getLimit()); } else if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) { // When we doing non-exclusive resource allocation, maximum capacity of // all queues on this label equals to total resource with the label. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index ead1f57..4cd0d9e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -52,7 +52,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity; -import org.apache.hadoop.yarn.util.UnitsConversionUtil; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceUtils; @@ -72,6 +71,9 @@ import java.util.Set; import java.util.StringTokenizer; +import static org.apache.hadoop.yarn.api.records.ResourceInformation.MEMORY_URI; +import static org.apache.hadoop.yarn.api.records.ResourceInformation.VCORES_URI; + public class CapacitySchedulerConfiguration extends ReservationSchedulerConfiguration { private static final Log LOG = @@ -358,13 +360,6 @@ private static final Pattern RESOURCE_PATTERN = Pattern.compile(PATTERN_FOR_ABSOLUTE_RESOURCE); - /** - * Different resource types supported. - */ - public enum AbsoluteResourceType { - MEMORY, VCORES; - } - AppPriorityACLConfigurationParser priorityACLConfig = new AppPriorityACLConfigurationParser(); public CapacitySchedulerConfiguration() { @@ -2044,10 +2039,18 @@ private void updateMinMaxResourceToConf(String label, String queue, StringBuilder resourceString = new StringBuilder(); resourceString - .append("[" + AbsoluteResourceType.MEMORY.toString().toLowerCase() + "=" - + resource.getMemorySize() + "," - + AbsoluteResourceType.VCORES.toString().toLowerCase() + "=" - + resource.getVirtualCores() + "]"); + .append("[" + MEMORY_URI + "=" + resource.getMemorySize() + "," + + VCORES_URI + "=" + resource.getVirtualCores()); + ResourceInformation[] resourceInformations = resource.getResources(); + if(resourceInformations.length > 2) { + for (int i = 2; i < resourceInformations.length; i++) { + resourceString.append(","); + resourceString.append(resourceInformations[i].getName()); + resourceString.append("="); + resourceString.append(resourceInformations[i].getValue()); + } + } + resourceString.append("]"); String prefix = getQueuePrefix(queue) + type; if (!label.isEmpty()) { @@ -2083,11 +2086,10 @@ private Resource internalGetLabeledResourceRequirementForQueue(String queue, subGroup = subGroup.substring(1, subGroup.length() - 1); for (String kvPair : subGroup.trim().split(",")) { - String[] splits = kvPair.split("="); - - // Ensure that each sub string is key value pair separated by '='. - if (splits != null && splits.length > 1) { - updateResourceValuesFromConfig(resourceTypes, resource, splits); + try { + updateResourceValuesFromConfig(resourceTypes, resource, kvPair); + } catch (IllegalArgumentException e) { + LOG.warn(e.getMessage(), e); } } } @@ -2105,36 +2107,23 @@ private Resource internalGetLabeledResourceRequirementForQueue(String queue, } private void updateResourceValuesFromConfig(Set resourceTypes, - Resource resource, String[] splits) { + Resource resource, String resourceValue) { + + ResourceInformation resourceInformation = ResourceUtils + .getResourceInformation(resourceValue); + String resourceName = resourceInformation.getName(); // If key is not a valid type, skip it. - if (!resourceTypes.contains(splits[0])) { + if (!resourceTypes.contains(resourceName)) { return; } - String units = getUnits(splits[1]); - Long resourceValue = Long - .valueOf(splits[1].substring(0, splits[1].length() - units.length())); - - // Convert all incoming units to MB if units is configured. - if (!units.isEmpty()) { - resourceValue = UnitsConversionUtil.convert(units, "Mi", resourceValue); - } - - // map it based on key. - AbsoluteResourceType resType = AbsoluteResourceType - .valueOf(StringUtils.toUpperCase(splits[0].trim())); - switch (resType) { - case MEMORY : - resource.setMemorySize(resourceValue); - break; - case VCORES : - resource.setVirtualCores(resourceValue.intValue()); - break; - default : - resource.setResourceInformation(splits[0].trim(), ResourceInformation - .newInstance(splits[0].trim(), units, resourceValue)); - break; + if(resourceName.equals(MEMORY_URI)) { + resource.setMemorySize(resourceInformation.getValue()); + } else if(resourceName.equals(VCORES_URI)) { + resource.setVirtualCores((int)resourceInformation.getValue()); + } else { + resource.setResourceInformation(resourceName, resourceInformation); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index cd7518f..b011998 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -996,8 +996,14 @@ private void calculateEffectiveResourcesAndCapacity(String label, // childMaxResource is empty, consider parent's max resource alone. Resource childMaxResource = childQueue.getQueueResourceQuotas() .getConfiguredMaxResource(label); - Resource effMaxResource = Resources.min(resourceCalculator, - resourceByLabel, childMaxResource.equals(Resources.none()) + + // Use Resources.componentwiseMin, not Resources.min. When + // parentMaxRes have available GPUs but childMaxResource doesn't, + // parentMaxRes may be smaller than childMaxResource using + // Resources.min, if it has less memory. In this case, effMaxResource + // would have available GPUs which it shouldn't have. + Resource effMaxResource = Resources.componentwiseMin( + childMaxResource.equals(Resources.none()) ? parentMaxRes : childMaxResource, parentMaxRes); @@ -1067,8 +1073,12 @@ private Resource getMinResourceNormalized(String name, Map effect dResourceInformation.getUnits(), nResourceInformation.getUnits(), dResourceInformation.getValue()); if (dValue != 0) { + // If there are resources added to cluster, queue configured min + // resource should not be increased automatically for absolute + // configuration + float minRatio = Math.min((float) nValue / dValue, 1); effectiveMinRatioPerResource.put(nResourceInformation.getName(), - (float) nValue / dValue); + minRatio); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterServiceTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterServiceTestBase.java index 7fc2a53..14d23c0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterServiceTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterServiceTestBase.java @@ -47,7 +47,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; -import org.apache.hadoop.yarn.util.resource.ResourceUtils; +import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -479,7 +479,11 @@ public void testValidateRequestCapacityAgainstMinMaxAllocation() throws Exception { Map riMap = initializeMandatoryResources(); - ResourceUtils.initializeResourcesFromResourceInformationMap(riMap); + + //Resources.NONE and Resources.UNBOUNDED should be refreshed with the map, + //Otherwise, Fair scheduler will use the wrong Resources.UNBOUNDED + //to compute the max resources of queues + Resources.refreshResourcesFromMap(riMap); final YarnConfiguration yarnConf = createYarnConfig(); @@ -536,7 +540,10 @@ public void testRequestCapacityMinMaxAllocationForResourceTypes() ResourceInformation.VCORES.getUnits(), 0, 4); riMap.put(CUSTOM_RES, res1); - ResourceUtils.initializeResourcesFromResourceInformationMap(riMap); + //Resources.NONE and Resources.UNBOUNDED should be refreshed with the map, + //Otherwise, Fair scheduler will use the wrong Resources.UNBOUNDED + //to compute the max resources of queues + Resources.refreshResourcesFromMap(riMap); final YarnConfiguration yarnConf = createYarnConfig(); // Don't reset resource types since we have already configured resource diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java index 04a9a75..f44c91c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java @@ -184,7 +184,16 @@ public AllocateResponse schedule() throws Exception { public void addContainerToBeReleased(ContainerId containerId) { releases.add(containerId); } - + + public AllocateResponse allocate( + Resource containerResource, int numContainers, + List containerReleases) throws Exception { + List reqs = + createReq(null, containerResource, 1, numContainers, + null, -1); + return allocate(reqs, containerReleases); + } + public AllocateResponse allocate( String host, int memory, int numContainers, List releases) throws Exception { @@ -251,6 +260,37 @@ public AllocateResponse allocate( reqs.add(offRackReq); return reqs; } + + private List createReq(String[] hosts, + Resource containerResource, int priority, int containers, + String labelExpression, long allocationRequestId) throws Exception { + List reqs = new ArrayList(); + if (hosts != null) { + for (String host : hosts) { + // only add host/rack request when asked host isn't ANY + if (!host.equals(ResourceRequest.ANY)) { + ResourceRequest hostReq = + createResourceReq(host, containerResource, priority, containers, + labelExpression, ExecutionTypeRequest.newInstance()); + hostReq.setAllocationRequestId(allocationRequestId); + reqs.add(hostReq); + ResourceRequest rackReq = + createResourceReq("/default-rack", containerResource, priority, + containers, labelExpression, + ExecutionTypeRequest.newInstance()); + rackReq.setAllocationRequestId(allocationRequestId); + reqs.add(rackReq); + } + } + } + + ResourceRequest offRackReq = createResourceReq(ResourceRequest.ANY, + containerResource, priority, containers, labelExpression, + ExecutionTypeRequest.newInstance()); + offRackReq.setAllocationRequestId(allocationRequestId); + reqs.add(offRackReq); + return reqs; + } public ResourceRequest createResourceReq(String resource, int memory, int priority, int containers) throws Exception { @@ -266,21 +306,27 @@ public ResourceRequest createResourceReq(String resource, int memory, public ResourceRequest createResourceReq(String resource, int memory, int priority, int containers, String labelExpression, ExecutionTypeRequest executionTypeRequest) throws Exception { + Resource capability = Records.newRecord(Resource.class); + capability.setMemorySize(memory); + return createResourceReq(resource, capability, priority, containers, + labelExpression, executionTypeRequest); + } + + private ResourceRequest createResourceReq(String resource, + Resource capability, int priority, int containers, String labelExpression, + ExecutionTypeRequest executionTypeRequest) throws Exception { ResourceRequest req = Records.newRecord(ResourceRequest.class); req.setResourceName(resource); req.setNumContainers(containers); Priority pri = Records.newRecord(Priority.class); pri.setPriority(priority); req.setPriority(pri); - Resource capability = Records.newRecord(Resource.class); - capability.setMemorySize(memory); req.setCapability(capability); if (labelExpression != null) { req.setNodeLabelExpression(labelExpression); } req.setExecutionTypeRequest(executionTypeRequest); return req; - } public ResourceRequest createResourceReq(String host, Resource cap, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterServiceFair.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterServiceFair.java index 3ff3bcd..43dadfe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterServiceFair.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterServiceFair.java @@ -29,7 +29,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration; -import org.apache.hadoop.yarn.util.resource.ResourceUtils; +import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Test; import java.util.Collections; @@ -78,7 +78,10 @@ public void testRequestCapacityMinMaxAllocationWithDifferentUnits() ResourceInformation.newInstance(CUSTOM_RES, "G", 0, 4); riMap.put(CUSTOM_RES, res1); - ResourceUtils.initializeResourcesFromResourceInformationMap(riMap); + //Resources.NONE and Resources.UNBOUNDED should be refreshed with the map, + //Otherwise, Fair scheduler will use the wrong Resources.UNBOUNDED + //to compute the max resources of queues + Resources.refreshResourcesFromMap(riMap); final YarnConfiguration yarnConf = createYarnConfig(); // Don't reset resource types since we have already configured resource diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestCapacitySchedulerMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestCapacitySchedulerMetrics.java index eaa966a..40d0682 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestCapacitySchedulerMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestCapacitySchedulerMetrics.java @@ -98,7 +98,12 @@ public RMNodeLabelsManager createNodeLabelManager() { // For async mode, the number of alloc might be bigger than 1 Assert.assertTrue(csMetrics.getNumOfAllocates() > 0); // But there will be only 2 successful commit (1 AM + 1 task) - Assert.assertEquals(2, csMetrics.getNumOfCommitSuccess()); + try { + GenericTestUtils.waitFor(() + -> csMetrics.getNumOfCommitSuccess() == 2, 100, 3000); + } catch(TimeoutException e) { + Assert.fail("CS metrics not updated on resource commit."); + } } @After diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestAbsoluteResourceConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestAbsoluteResourceConfiguration.java index 298e1ab..80a6991 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestAbsoluteResourceConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestAbsoluteResourceConfiguration.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.util.Arrays; +import java.util.HashMap; import java.util.HashSet; import java.util.Set; @@ -27,9 +28,19 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; + +import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.junit.Assert; import org.junit.Test; +import static org.apache.hadoop.yarn.api.records.ResourceInformation.FPGA_URI; +import static org.apache.hadoop.yarn.api.records.ResourceInformation.GPU_URI; +import static org.apache.hadoop.yarn.api.records.ResourceInformation.MEMORY_URI; +import static org.apache.hadoop.yarn.api.records.ResourceInformation.VCORES_URI; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.RESOURCE_TYPES; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler + .capacity.CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS; + public class TestAbsoluteResourceConfiguration { private static final int GB = 1024; @@ -51,6 +62,24 @@ private static final String QUEUEA2_FULL = QUEUEA_FULL + "." + QUEUEA2; private static final String QUEUEB1_FULL = QUEUEB_FULL + "." + QUEUEB1; + private static HashMap queueCMinResources = new HashMap(); + private static HashMap queueCMaxResources = new HashMap(); + private static HashMap queueCReducedResources = new HashMap(); + + //add GPU and FPGA to resource types + static { + queueCMinResources.put(GPU_URI, 2L); + queueCMinResources.put(FPGA_URI, 2L); + queueCMaxResources.put(GPU_URI, 4L); + queueCMaxResources.put(FPGA_URI, 4L); + queueCReducedResources.put(GPU_URI, 1L); + queueCReducedResources.put(FPGA_URI, 1L); + CapacitySchedulerConfiguration csConf = + new CapacitySchedulerConfiguration(); + csConf.set(RESOURCE_TYPES, GPU_URI + "," + FPGA_URI); + ResourceUtils.resetResourceTypes(csConf); + } + private static final Resource QUEUE_A_MINRES = Resource.newInstance(100 * GB, 10); private static final Resource QUEUE_A_MAXRES = Resource.newInstance(200 * GB, @@ -66,21 +95,27 @@ private static final Resource QUEUE_B_MAXRES = Resource.newInstance(150 * GB, 30); private static final Resource QUEUE_C_MINRES = Resource.newInstance(50 * GB, - 10); + 10, queueCMinResources); private static final Resource QUEUE_C_MAXRES = Resource.newInstance(150 * GB, - 20); + 20, queueCMaxResources); + private static final Resource QUEUE_C_MAXRES_WITH_MEMORY_CPUS = + Resource.newInstance(150 * GB, 20); private static final Resource QUEUEA_REDUCED = Resource.newInstance(64000, 6); private static final Resource QUEUEB_REDUCED = Resource.newInstance(32000, 6); - private static final Resource QUEUEC_REDUCED = Resource.newInstance(32000, 6); + private static final Resource QUEUEC_REDUCED = Resource.newInstance(32000, + 6, queueCReducedResources); private static final Resource QUEUEMAX_REDUCED = Resource.newInstance(128000, 20); + private static final Resource QUEUECMAX_REDUCED = Resource.newInstance(128000, + 20, queueCReducedResources); private static Set resourceTypes = new HashSet<>( - Arrays.asList("memory", "vcores")); + Arrays.asList(MEMORY_URI, VCORES_URI, GPU_URI, FPGA_URI)); private CapacitySchedulerConfiguration setupSimpleQueueConfiguration( boolean isCapacityNeeded) { - CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(); + CapacitySchedulerConfiguration csConf = + setupExtendedResourceConfiguration(); csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[]{QUEUEA, QUEUEB, QUEUEC}); @@ -96,7 +131,8 @@ private CapacitySchedulerConfiguration setupSimpleQueueConfiguration( private CapacitySchedulerConfiguration setupComplexQueueConfiguration( boolean isCapacityNeeded) { - CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(); + CapacitySchedulerConfiguration csConf = + setupExtendedResourceConfiguration(); csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[]{QUEUEA, QUEUEB, QUEUEC}); csConf.setQueues(QUEUEA_FULL, new String[]{QUEUEA1, QUEUEA2}); @@ -146,6 +182,18 @@ private CapacitySchedulerConfiguration setupComplexMinMaxResourceConfig( return csConf; } + private CapacitySchedulerConfiguration + setupExtendedResourceConfiguration() { + CapacitySchedulerConfiguration csConf = + new CapacitySchedulerConfiguration(); + csConf.set(RESOURCE_TYPES, GPU_URI + "," + FPGA_URI); + csConf.set(RESOURCE_CALCULATOR_CLASS, + "org.apache.hadoop.yarn.util.resource.DominantResourceCalculator"); + csConf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + return csConf; + } + @Test public void testSimpleMinMaxResourceConfigurartionPerQueue() { @@ -180,9 +228,6 @@ public void testEffectiveMinMaxResourceConfigurartionPerQueue() false); setupMinMaxResourceConfiguration(csConf); - csConf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, - ResourceScheduler.class); - @SuppressWarnings("resource") MockRM rm = new MockRM(csConf); rm.start(); @@ -220,11 +265,13 @@ public void testEffectiveMinMaxResourceConfigurartionPerQueue() Assert.assertEquals("Min resource configured for QUEUEC is not correct", QUEUE_C_MINRES, qC.queueResourceQuotas.getConfiguredMinResource()); Assert.assertEquals("Max resource configured for QUEUEC is not correct", - QUEUE_C_MAXRES, qC.queueResourceQuotas.getConfiguredMaxResource()); + QUEUE_C_MAXRES, + qC.queueResourceQuotas.getConfiguredMaxResource()); Assert.assertEquals("Effective Min resource for QUEUEC is not correct", QUEUE_C_MINRES, qC.queueResourceQuotas.getEffectiveMinResource()); Assert.assertEquals("Effective Max resource for QUEUEC is not correct", - QUEUE_C_MAXRES, qC.queueResourceQuotas.getEffectiveMaxResource()); + QUEUE_C_MAXRES_WITH_MEMORY_CPUS, + qC.queueResourceQuotas.getEffectiveMaxResource()); rm.stop(); } @@ -249,8 +296,6 @@ public void testSimpleValidateAbsoluteResourceConfig() throws Exception { CapacitySchedulerConfiguration csConf = setupSimpleQueueConfiguration( false); setupMinMaxResourceConfiguration(csConf); - csConf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, - ResourceScheduler.class); @SuppressWarnings("resource") MockRM rm = new MockRM(csConf); @@ -313,11 +358,13 @@ public void testSimpleValidateAbsoluteResourceConfig() throws Exception { Assert.assertEquals("Min resource configured for QUEUEC is not correct", QUEUE_C_MINRES, qC.queueResourceQuotas.getConfiguredMinResource()); Assert.assertEquals("Max resource configured for QUEUEC is not correct", - QUEUE_C_MAXRES, qC.queueResourceQuotas.getConfiguredMaxResource()); + QUEUE_C_MAXRES, + qC.queueResourceQuotas.getConfiguredMaxResource()); Assert.assertEquals("Effective Min resource for QUEUEC is not correct", QUEUE_C_MINRES, qC.queueResourceQuotas.getEffectiveMinResource()); Assert.assertEquals("Effective Max resource for QUEUEC is not correct", - QUEUE_C_MAXRES, qC.queueResourceQuotas.getEffectiveMaxResource()); + QUEUE_C_MAXRES_WITH_MEMORY_CPUS, + qC.queueResourceQuotas.getEffectiveMaxResource()); // 3. Create a new config and make sure one queue's min resource is more // than its max resource configured. @@ -382,8 +429,6 @@ public void testComplexValidateAbsoluteResourceConfig() throws Exception { CapacitySchedulerConfiguration csConf = setupComplexQueueConfiguration( false); setupComplexMinMaxResourceConfig(csConf); - csConf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, - ResourceScheduler.class); @SuppressWarnings("resource") MockRM rm = new MockRM(csConf); @@ -441,16 +486,21 @@ public void testEffectiveResourceAfterReducingClusterResource() false); setupMinMaxResourceConfiguration(csConf); - csConf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, - ResourceScheduler.class); - @SuppressWarnings("resource") MockRM rm = new MockRM(csConf); rm.start(); + HashMap node1Resources = new HashMap(); + node1Resources.put(GPU_URI, 3L); + node1Resources.put(FPGA_URI, 3L); + HashMap node2Resources = new HashMap(); + node2Resources.put(GPU_URI, 1L); + node2Resources.put(FPGA_URI, 1L); + Resource node1Resource = Resource.newInstance(125 * GB, 20, node1Resources); + Resource node2Resource = Resource.newInstance(125 * GB, 20, node2Resources); // Add few nodes - MockNM nm1 = rm.registerNode("127.0.0.1:1234", 125 * GB, 20); - rm.registerNode("127.0.0.2:1234", 125 * GB, 20); + MockNM nm1 = rm.registerNode("127.0.0.1:1234", node1Resource); + rm.registerNode("127.0.0.2:1234", node2Resource); // Get queue object to verify min/max resource configuration. CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); @@ -507,7 +557,7 @@ public void testEffectiveResourceAfterReducingClusterResource() Assert.assertEquals("Effective Min resource for QUEUEC is not correct", QUEUEC_REDUCED, qC.queueResourceQuotas.getEffectiveMinResource()); Assert.assertEquals("Effective Max resource for QUEUEC is not correct", - QUEUEMAX_REDUCED, qC.queueResourceQuotas.getEffectiveMaxResource()); + QUEUECMAX_REDUCED, qC.queueResourceQuotas.getEffectiveMaxResource()); rm.stop(); } @@ -520,16 +570,21 @@ public void testEffectiveResourceAfterIncreasingClusterResource() false); setupComplexMinMaxResourceConfig(csConf); - csConf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, - ResourceScheduler.class); - @SuppressWarnings("resource") MockRM rm = new MockRM(csConf); rm.start(); + HashMap node1Resources = new HashMap(); + node1Resources.put(GPU_URI, 3L); + node1Resources.put(FPGA_URI, 3L); + HashMap node2Resources = new HashMap(); + node2Resources.put(GPU_URI, 1L); + node2Resources.put(FPGA_URI, 1L); + Resource node1Resource = Resource.newInstance(125 * GB, 20, node1Resources); + Resource node2Resource = Resource.newInstance(125 * GB, 20, node2Resources); // Add few nodes - rm.registerNode("127.0.0.1:1234", 125 * GB, 20); - rm.registerNode("127.0.0.2:1234", 125 * GB, 20); + rm.registerNode("127.0.0.1:1234", node1Resource); + rm.registerNode("127.0.0.2:1234", node2Resource); // Get queue object to verify min/max resource configuration. CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); @@ -590,7 +645,7 @@ public void testEffectiveResourceAfterIncreasingClusterResource() QUEUE_B_MAXRES, qB1.queueResourceQuotas.getEffectiveMaxResource()); // add new NM. - rm.registerNode("127.0.0.3:1234", 125 * GB, 20); + rm.registerNode("127.0.0.3:1234", Resource.newInstance(node1Resource)); // There will be no change in effective resource when nodes are added. // Since configured capacity was based on initial node capacity, a diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java index 9f01a17..722d05c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Set; @@ -65,12 +66,17 @@ import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; +import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import static org.apache.hadoop.yarn.api.records.ResourceInformation.GPU_URI; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.RESOURCE_TYPES; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION_MB; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAX_ASSIGN_PER_HEARTBEAT; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler + .capacity.CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS; public class TestContainerAllocation { @@ -224,6 +230,156 @@ public void testNormalContainerAllocationWhenDNSUnavailable() throws Exception{ Assert.assertEquals(1, containers.size()); } + @Test + public void testContainerAllocationForAbsoluteResourceConfig() + throws Exception{ + /** + * Test case: There are two queues in the cluster, a and b. Queue a has gpu + * but queue b doesn't have. + * + * No matter cluster resource is added or reduced, queue a supports + * applications with gpu request, but queue b can't, + */ + CapacitySchedulerConfiguration csConf = + setupAbsoluteResourceConfiguration(); + MockRM rm = new MockRM(csConf); + rm.start(); + + // Register nodes + HashMap nodeResources = new HashMap(); + nodeResources.put(GPU_URI, 3L); + Resource node1Resource = Resource.newInstance(10 * GB, 5, nodeResources); + Resource node2Resource = Resource.newInstance(10 * GB, 5, nodeResources); + // Add few nodes + MockNM nm1 = rm.registerNode("127.0.0.1:1234", node1Resource); + MockNM nm2 = rm.registerNode("127.0.0.2:1234", node2Resource); + + nm1.nodeHeartbeat(true); + nm2.nodeHeartbeat(true); + + // Wait for nodemanager to register + int waitCount = 20, size; + while (rm.getRMContext().getRMNodes().size() != 2 + && waitCount-- > 0) { + size = rm.getRMContext().getRMNodes().size(); + LOG.info("Waiting for node managers to register : " + size); + Thread.sleep(100); + } + Assert.assertEquals(2, rm.getRMContext().getRMNodes().size()); + + // Request a gpu resource from queue a + HashMap containerResources = new HashMap(); + containerResources.put(GPU_URI, 2L); + Resource containerResource = Resource.newInstance(1 * GB, 1, + containerResources); + RMApp app1 = rm.submitApp(200, "a"); + allocateWithSufficientResource(rm, nm1, containerResource, app1); + + // Request a gpu resource from queue b + RMApp app2 = rm.submitApp(200, "b"); + allocateWithInsufficientResource(rm, nm1, containerResource, app2); + + // Reduce cluster resources + rm.unRegisterNode(nm2); + RMApp app3 = rm.submitApp(200, "a"); + allocateWithSufficientResource(rm, nm1, containerResource, app3); + + RMApp app4 = rm.submitApp(200, "b"); + allocateWithInsufficientResource(rm, nm1, containerResource, app4); + + // Add cluster resources + MockNM nm3 = rm.registerNode("127.0.0.2:1234", node1Resource); + MockNM nm4 = rm.registerNode("127.0.0.2:1234", node1Resource); + + nm3.nodeHeartbeat(true); + nm4.nodeHeartbeat(true); + + RMApp app5 = rm.submitApp(200, "a"); + allocateWithSufficientResource(rm, nm1, containerResource, app5); + + RMApp app6 = rm.submitApp(200, "b"); + allocateWithInsufficientResource(rm, nm1, containerResource, app6); + + rm.stop(); + } + + private CapacitySchedulerConfiguration setupAbsoluteResourceConfiguration() { + YarnConfiguration yarnConf = new YarnConfiguration(); + CapacitySchedulerConfiguration csConf = + new CapacitySchedulerConfiguration(yarnConf); + csConf.set(RESOURCE_TYPES, GPU_URI); + csConf.set(RESOURCE_CALCULATOR_CLASS, + "org.apache.hadoop.yarn.util.resource.DominantResourceCalculator"); + csConf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + ResourceUtils.resetResourceTypes(csConf); + + HashMap queueAMinResources = new HashMap(); + HashMap queueAMaxResources = new HashMap(); + queueAMinResources.put(GPU_URI, 2L); + queueAMaxResources.put(GPU_URI, 6L); + Resource queueAMin = Resource.newInstance(2 * GB, 2, + queueAMinResources); + Resource queueAMax = Resource.newInstance(4 * GB, 4, + queueAMaxResources); + + Resource queueBMin = Resource.newInstance(2 * GB, 2); + Resource queueBMax = Resource.newInstance(16 * GB, 6); + csConf.setQueues(CapacitySchedulerConfiguration.ROOT, + new String[]{"a", "b"}); + + csConf.setMinimumResourceRequirement("", + CapacitySchedulerConfiguration.ROOT + ".a", queueAMin); + csConf.setMaximumResourceRequirement("", + CapacitySchedulerConfiguration.ROOT + ".a", queueAMax); + csConf.setMinimumResourceRequirement("", + CapacitySchedulerConfiguration.ROOT + ".b", queueBMin); + csConf.setMaximumResourceRequirement("", + CapacitySchedulerConfiguration.ROOT + ".b", queueBMax); + return csConf; + } + + private void allocateWithInsufficientResource(MockRM rm, MockNM nm, + Resource containerResource, RMApp app) throws Exception { + MockAM am = MockRM.launchAndRegisterAM(app, rm, nm); + + // Request a container. + AllocateResponse allocateResponse = am.allocate(containerResource, 1, + new ArrayList()); + nm.nodeHeartbeat(true); + int waitCounter = 20; + LOG.info("heartbeating nm1"); + while (allocateResponse.getAllocatedContainers().size() < 1 + && waitCounter-- > 0) { + LOG.info("Waiting for containers to be created for app1..."); + Thread.sleep(500); + allocateResponse = am.schedule(); + } + LOG.info("received container : " + + allocateResponse.getAllocatedContainers().size()); + + // No container should be allocated. + // Internally it should not been reserved. + Assert.assertTrue(allocateResponse.getAllocatedContainers().size() == 0); + rm.killApp(app.getApplicationId()); + nm.nodeHeartbeat(true); + } + + private void allocateWithSufficientResource(MockRM rm, MockNM nm, + Resource containerResource, RMApp app) throws Exception { + MockAM am = MockRM.launchAndRegisterAM(app, rm, nm); + + // Request a container. + am.allocate(containerResource, 1, new ArrayList()); + ContainerId containerId = + ContainerId.newContainerId(am.getApplicationAttemptId(), 2); + nm.nodeHeartbeat(true); + Assert.assertTrue(rm.waitForState(nm, containerId, + RMContainerState.ALLOCATED)); + rm.killApp(app.getApplicationId()); + nm.nodeHeartbeat(true); + } + // This is to test whether LogAggregationContext is passed into // container tokens correctly @Test diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java index a960d89..1c33486 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java @@ -1013,8 +1013,10 @@ public void testAbsoluteResourceWithChangeInClusterResource() root.updateClusterResource(clusterResource, new ResourceLimits(clusterResource)); - Resource QUEUE_B_RESOURCE_70PERC = Resource.newInstance(7 * 1024, 27); - Resource QUEUE_A_RESOURCE_30PERC = Resource.newInstance(3 * 1024, 12); + // For absolute configuration, resource quota shouldn't be increased + // automatically, when some kind of resource of cluster is increased. + Resource QUEUE_B_RESOURCE_70PERC = Resource.newInstance(7 * 1024, 22); + Resource QUEUE_A_RESOURCE_30PERC = Resource.newInstance(3 * 1024, 10); assertEquals(a.getQueueResourceQuotas().getConfiguredMinResource(), QUEUE_A_RESOURCE); assertEquals(b.getQueueResourceQuotas().getConfiguredMinResource(), -- 1.8.3.1