diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java index 20b64bd..284f1d3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java @@ -750,4 +750,70 @@ public static void areMandatoryResourcesAvailable(Resource res) { return info; } + + /** + * Get standard yarn resource from a string array of resource name and + * resource value. + * + * @param resourcePair an array of two elements, resource name and resource + * value + * @return {@link ResourceInformation} + */ + public static ResourceInformation getStandardYarnResource(String[] + resourcePair) { + if(resourcePair.length != 2) { + throw new IllegalArgumentException("valid resource type/amount pair. " + + "Please provide key=amount pairs separated by commas."); + } + String resourceName = resourcePair[0].toLowerCase(); + String resourceValue = resourcePair[1]; + String units = ResourceUtils.getUnits(resourceValue); + + String valueWithoutUnit = resourceValue.substring(0, resourceValue.length() + - units.length()).trim(); + Long value; + try { + value = Long.valueOf(valueWithoutUnit); + } catch(NumberFormatException e) { + throw new IllegalArgumentException("Can't parse resource value from " + + valueWithoutUnit + ", please check the resource configuration: " + + resourceName + " " + resourceValue); + } + + // special handle memory-mb and memory + if (resourceName.equals(ResourceInformation.MEMORY_URI) || + resourceName.equals("memory")) { + resourceName = ResourceInformation.MEMORY_URI; + // Convert commandline unit to standard YARN unit. + if (!units.isEmpty()) { + if (units.toLowerCase().equals("m") || + units.toLowerCase().equals("mi")) { + units = "Mi"; + } else if (units.toLowerCase().equals("g") || + units.toLowerCase().equals("gi")) { + value = UnitsConversionUtil.convert("Gi", "Mi", + value); + units = "Mi"; + } else { + throw new IllegalArgumentException("Acceptable units are M/Mi/G/Gi " + + "or empty"); + } + } + } + + // special handle gpu + if (resourceName.equals("gpu")) { + resourceName = ResourceInformation.GPU_URI; + } + + // special handle fpga + if (resourceName.equals("fpga")) { + resourceName = ResourceInformation.FPGA_URI; + } + + ResourceInformation resourceInformation = ResourceInformation + .newInstance(resourceName, units, value); + return resourceInformation; + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/CliUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/CliUtils.java index f3eee7c..eecabcb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/CliUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/client/cli/CliUtils.java @@ -85,49 +85,10 @@ public static String replacePatternsInLaunchCommand(String specifiedCli, + "Please provide key=amount pairs separated by commas."); } String[] splits = resource.split("="); - String key = splits[0], value = splits[1]; - String units = ResourceUtils.getUnits(value); - - String valueWithoutUnit = value.substring(0, - value.length()- units.length()).trim(); - long resourceValue = Long.parseLong(valueWithoutUnit); - - // Convert commandline unit to standard YARN unit. - if (units.equals("M") || units.equals("m")) { - units = "Mi"; - } else if (units.equals("G") || units.equals("g")) { - units = "Gi"; - } else if (units.isEmpty()) { - // do nothing; - } else { - throw new IllegalArgumentException("Acceptable units are M/G or empty"); - } - - // special handle memory-mb and memory - if (key.equals(ResourceInformation.MEMORY_URI)) { - if (!units.isEmpty()) { - resourceValue = UnitsConversionUtil.convert(units, "Mi", - resourceValue); - } - } - - if (key.equals("memory")) { - key = ResourceInformation.MEMORY_URI; - resourceValue = UnitsConversionUtil.convert(units, "Mi", - resourceValue); - } - - // special handle gpu - if (key.equals("gpu")) { - key = ResourceInformation.GPU_URI; - } - - // special handle fpga - if (key.equals("fpga")) { - key = ResourceInformation.FPGA_URI; - } + ResourceInformation standardResource = ResourceUtils + .getStandardYarnResource(splits); - resources.put(key, resourceValue); + resources.put(standardResource.getName(), standardResource.getValue()); } return resources; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index 2c9f9a3..5ab9450 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -25,7 +25,9 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.Arrays; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; import com.google.common.annotations.VisibleForTesting; import org.apache.commons.lang3.StringUtils; @@ -56,7 +58,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.AbsoluteResourceType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueResourceQuotas; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; @@ -89,7 +90,6 @@ final ResourceCalculator resourceCalculator; Set accessibleLabels; - Set resourceTypes; final RMNodeLabelsManager labelManager; String defaultLabelExpression; private String multiNodeSortingPolicyName = null; @@ -336,10 +336,6 @@ protected void setupQueueConfigs(Resource clusterResource, this.defaultLabelExpression = configuration.getDefaultNodeLabelExpression( getQueuePath()); - this.resourceTypes = new HashSet(); - for (AbsoluteResourceType type : AbsoluteResourceType.values()) { - resourceTypes.add(type.toString().toLowerCase()); - } // inherit from parent if labels not set if (this.accessibleLabels == null && parent != null) { @@ -445,12 +441,14 @@ protected void updateConfigurableResourceRequirement(String queuePath, Resource clusterResource) { CapacitySchedulerConfiguration conf = csContext.getConfiguration(); Set configuredNodelabels = conf.getConfiguredNodeLabels(queuePath); + Set resources = Arrays.stream(clusterResource.getResources()).map(x + -> x.getName()).collect(Collectors.toSet()); for (String label : configuredNodelabels) { Resource minResource = conf.getMinimumResourceRequirement(label, - queuePath, resourceTypes); + queuePath, resources); Resource maxResource = conf.getMaximumResourceRequirement(label, - queuePath, resourceTypes); + queuePath, resources); if (LOG.isDebugEnabled()) { LOG.debug("capacityConfigType is '" + capacityConfigType diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 08380f5..cada842 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -355,13 +355,6 @@ private static final Pattern RESOURCE_PATTERN = Pattern.compile(PATTERN_FOR_ABSOLUTE_RESOURCE); - /** - * Different resource types supported. - */ - public enum AbsoluteResourceType { - MEMORY, VCORES; - } - AppPriorityACLConfigurationParser priorityACLConfig = new AppPriorityACLConfigurationParser(); public CapacitySchedulerConfiguration() { @@ -2041,10 +2034,18 @@ private void updateMinMaxResourceToConf(String label, String queue, StringBuilder resourceString = new StringBuilder(); resourceString - .append("[" + AbsoluteResourceType.MEMORY.toString().toLowerCase() + "=" - + resource.getMemorySize() + "," - + AbsoluteResourceType.VCORES.toString().toLowerCase() + "=" - + resource.getVirtualCores() + "]"); + .append("[memory=" + resource.getMemorySize() + "," + + "vcores=" + resource.getVirtualCores()); + ResourceInformation[] resourceInformations = resource.getResources(); + if(resourceInformations.length > 2) { + for (int i = 2 ; i < resourceInformations.length ; i++) { + resourceString.append(","); + resourceString.append(resourceInformations[i].getName()); + resourceString.append("="); + resourceString.append(resourceInformations[i].getValue()); + } + } + resourceString.append("]"); String prefix = getQueuePrefix(queue) + type; if (!label.isEmpty()) { @@ -2104,34 +2105,16 @@ private Resource internalGetLabeledResourceRequirementForQueue(String queue, private void updateResourceValuesFromConfig(Set resourceTypes, Resource resource, String[] splits) { - // If key is not a valid type, skip it. - if (!resourceTypes.contains(splits[0])) { - return; - } - - String units = getUnits(splits[1]); - Long resourceValue = Long - .valueOf(splits[1].substring(0, splits[1].length() - units.length())); + ResourceInformation resourceInformation = ResourceUtils + .getStandardYarnResource(splits); + String resourceName = resourceInformation.getName(); - // Convert all incoming units to MB if units is configured. - if (!units.isEmpty()) { - resourceValue = UnitsConversionUtil.convert(units, "Mi", resourceValue); - } - - // map it based on key. - AbsoluteResourceType resType = AbsoluteResourceType - .valueOf(StringUtils.toUpperCase(splits[0].trim())); - switch (resType) { - case MEMORY : - resource.setMemorySize(resourceValue); - break; - case VCORES : - resource.setVirtualCores(resourceValue.intValue()); - break; - default : - resource.setResourceInformation(splits[0].trim(), ResourceInformation - .newInstance(splits[0].trim(), units, resourceValue)); - break; + if(resourceName.equals(ResourceInformation.MEMORY_URI)) { + resource.setMemorySize(resourceInformation.getValue()); + } else if(resourceName.equals(ResourceInformation.VCORES_URI)) { + resource.setVirtualCores((int)resourceInformation.getValue()); + } else { + resource.setResourceInformation(resourceName, resourceInformation); } }