diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractAutoCreatedLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractAutoCreatedLeafQueue.java index 8d7733453f8..d0346329c78 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractAutoCreatedLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractAutoCreatedLeafQueue.java @@ -85,8 +85,8 @@ protected Resource getMaximumAbsoluteResource(String queuePath, } @Override - protected boolean checkConfigTypeIsAbsoluteResource(String queuePath, - String label) { + public boolean checkConfigTypeIsAbsoluteResource(String queuePath, + String label) { return super.checkConfigTypeIsAbsoluteResource(csContext.getConfiguration() .getAutoCreatedQueueTemplateConfPrefix(this.getParent().getQueuePath()), label); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index 250fcc716d6..6431b618f62 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.queuemode.AbstractQueueConfigMode; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.security.AccessControlException; @@ -68,6 +69,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -120,6 +122,10 @@ QueueResourceQuotas queueResourceQuotas; + AbstractQueueConfigMode queueConfigMode; + + protected boolean allowZeroCapacitySum = false; + // -1 indicates lifetime is disabled private volatile long maxApplicationLifetime = -1; @@ -130,11 +136,9 @@ private volatile boolean defaultAppLifetimeWasSpecifiedInConfig = false; public enum CapacityConfigType { - // FIXME, from what I can see, Percentage mode can almost apply to weighted - // and percentage mode at the same time, there's only small area need to be - // changed, we need to rename "PERCENTAGE" to "PERCENTAGE" and "WEIGHT" - NONE, PERCENTAGE, ABSOLUTE_RESOURCE + NONE, PERCENTAGE, ABSOLUTE_RESOURCE, WEIGHT }; + protected CapacityConfigType capacityConfigType = CapacityConfigType.NONE; @@ -393,7 +397,8 @@ protected void setupQueueConfigs(Resource clusterResource, // Also fetch minimum/maximum resource constraint for this queue if // configured. 
- capacityConfigType = CapacityConfigType.NONE; + queueConfigMode = AbstractQueueConfigMode.defineMode(this, csContext); + capacityConfigType = queueConfigMode.getConfigType(); updateConfigurableResourceRequirement(getQueuePath(), clusterResource); // Setup queue's maximumAllocation respecting the global setting @@ -523,6 +528,80 @@ private void setupMaximumAllocation(CapacitySchedulerConfiguration csConf) { } } + // Determine which capacity configuration type the given queues use; throws an + // IOException when non-root queues mix weight, percentage and absolute modes. + public static CapacityConfigType getCapacityConfigurationTypeForQueues( + Collection queues) throws IOException { + // Do we have ANY queue set capacity in any labels? + boolean percentageIsSet = false; + + // Do we have ANY queue set weight in any labels? + boolean weightIsSet = false; + + // Do we have ANY queue set absolute in any labels? + boolean absoluteMinResSet = false; + + StringBuilder diagMsg = new StringBuilder(); + + for (CSQueue queue : queues) { + for (String nodeLabel : queue.getQueueCapacities().getExistingNodeLabels()) { + float capacityByLabel = queue.getQueueCapacities().getCapacity(nodeLabel); + if (capacityByLabel > 0) { + percentageIsSet = true; + } + float weightByLabel = queue.getQueueCapacities().getWeight(nodeLabel); + // By default weight is set to -1, so >= 0 is enough. + if (weightByLabel >= 0) { + weightIsSet = true; + diagMsg.append( + "{Queue=" + queue.getQueuePath() + ", label=" + nodeLabel + + " uses weight mode}. "); + } + if (!queue.getQueueResourceQuotas().getConfiguredMinResource(nodeLabel) + .equals(Resources.none())) { + absoluteMinResSet = true; + // There's a special handling: when absolute resource is configured, + // capacity will be calculated (and set) for UI/metrics purposes, so + // when absoluteMinResource is set, unset percentage + percentageIsSet = false; + diagMsg.append( + "{Queue=" + queue.getQueuePath() + ", label=" + nodeLabel + + " uses absolute mode}. 
"); + } + if (percentageIsSet) { + diagMsg.append( + "{Queue=" + queue.getQueuePath() + ", label=" + nodeLabel + + " uses percentage mode}. "); + } + } + + // If we have mixed capacity, weight or absolute resource (any of the two) + // We will throw exception + // Root queue is an exception here, because by default root queue returns + // 100 as capacity no matter what. We should look into this case in the + // future. To avoid impacting too many code paths, we don't check root queue's + // config. + if (!queue.getQueuePath().equals( + CapacitySchedulerConfiguration.ROOT) && + (percentageIsSet ? 1 : 0) + (weightIsSet ? 1 : 0) + (absoluteMinResSet ? + 1 : + 0) > 1) { + throw new IOException("Parent queue '" + queue.getParent().getQueuePath() + + "' have children queue used mixed of " + + " weight mode, percentage and absolute mode, it is not allowed, please " + + "double check, details:" + diagMsg.toString()); + } + } + + if (weightIsSet || queues.isEmpty()) { + return CapacityConfigType.WEIGHT; + } else if (absoluteMinResSet) { + return CapacityConfigType.ABSOLUTE_RESOURCE; + } else { + return CapacityConfigType.PERCENTAGE; + } + } + private Map getUserWeightsFromHierarchy (CapacitySchedulerConfiguration configuration) throws IOException { @@ -551,8 +630,8 @@ protected Resource getMaximumAbsoluteResource(String queuePath, String label) { return maxResource; } - protected boolean checkConfigTypeIsAbsoluteResource(String queuePath, - String label) { + public boolean checkConfigTypeIsAbsoluteResource(String queuePath, + String label) { return csContext.getConfiguration().checkConfigTypeIsAbsoluteResource(label, queuePath, resourceTypes); } @@ -569,18 +648,6 @@ protected void updateConfigurableResourceRequirement(String queuePath, LOG.debug("capacityConfigType is '{}' for queue {}", capacityConfigType, getQueuePath()); - CapacityConfigType localType = checkConfigTypeIsAbsoluteResource( - queuePath, label) ? 
CapacityConfigType.ABSOLUTE_RESOURCE - : CapacityConfigType.PERCENTAGE; - - if (this.capacityConfigType.equals(CapacityConfigType.NONE)) { - this.capacityConfigType = localType; - LOG.debug("capacityConfigType is updated as '{}' for queue {}", - capacityConfigType, getQueuePath()); - } else { - validateAbsoluteVsPercentageCapacityConfig(localType); - } - // If min resource for a resource type is greater than its max resource, // throw exception to handle such error configs. if (!maxResource.equals(Resources.none()) && Resources.greaterThan( @@ -622,16 +689,6 @@ protected void updateConfigurableResourceRequirement(String queuePath, } } - private void validateAbsoluteVsPercentageCapacityConfig( - CapacityConfigType localType) { - if (!queuePath.equals("root") - && !this.capacityConfigType.equals(localType)) { - throw new IllegalArgumentException("Queue '" + getQueuePath() - + "' should use either percentage based capacity" - + " configuration or absolute resource."); - } - } - @Override public CapacityConfigType getCapacityConfigType() { return capacityConfigType; @@ -1457,6 +1514,10 @@ public int getMaxParallelApps() { return maxParallelApps; } + public RMNodeLabelsManager getLabelManager() { + return labelManager; + } + abstract int getNumRunnableApps(); protected void updateAbsoluteCapacities() { @@ -1469,29 +1530,6 @@ protected void updateAbsoluteCapacities() { parentQueueCapacities, queueCapacities.getExistingNodeLabels()); } - private Resource getMinResourceNormalized(String name, - Map effectiveMinRatio, Resource minResource) { - Resource ret = Resource.newInstance(minResource); - int maxLength = ResourceUtils.getNumberOfCountableResourceTypes(); - for (int i = 0; i < maxLength; i++) { - ResourceInformation nResourceInformation = - minResource.getResourceInformation(i); - - Float ratio = effectiveMinRatio.get(nResourceInformation.getName()); - if (ratio != null) { - ret.setResourceValue(i, - (long) (nResourceInformation.getValue() * ratio.floatValue())); - if 
(LOG.isDebugEnabled()) { - LOG.debug("Updating min resource for Queue: " + name + " as " + ret - .getResourceInformation(i) + ", Actual resource: " - + nResourceInformation.getValue() + ", ratio: " + ratio - .floatValue()); - } - } - } - return ret; - } - void updateMaxAppRelatedField(CapacitySchedulerConfiguration conf, LeafQueue leafQueue, String label) { int maxApplications = conf.getMaximumApplicationsPerQueue(queuePath); @@ -1500,11 +1538,8 @@ void updateMaxAppRelatedField(CapacitySchedulerConfiguration conf, if (maxGlobalPerQueueApps > 0) { // In absolute mode, should // shrink when change to corresponding label capacity. - maxApplications = this.capacityConfigType - != CapacityConfigType.ABSOLUTE_RESOURCE ? - maxGlobalPerQueueApps : - (int) (maxGlobalPerQueueApps * queueCapacities - .getAbsoluteCapacity(label)); + maxApplications = queueConfigMode.calculateMaxApplications( + maxGlobalPerQueueApps, label); } else{ maxApplications = (int) (conf.getMaximumSystemApplications() * queueCapacities.getAbsoluteCapacity(label)); @@ -1530,110 +1565,6 @@ void updateMaxAppRelatedField(CapacitySchedulerConfiguration conf, .getMaximumCapacity(label)); } - private void deriveCapacityFromAbsoluteConfigurations(String label, - Resource clusterResource, ResourceCalculator rc) { - - /* - * In case when queues are configured with absolute resources, it is better - * to update capacity/max-capacity etc w.r.t absolute resource as well. In - * case of computation, these values wont be used any more. However for - * metrics and UI, its better these values are pre-computed here itself. - */ - - // 1. Update capacity as a float based on parent's minResource - float f = rc.divide(clusterResource, - queueResourceQuotas.getEffectiveMinResource(label), - parent.getQueueResourceQuotas().getEffectiveMinResource(label)); - queueCapacities.setCapacity(label, Float.isInfinite(f) ? 0 : f); - - // 2. 
Update max-capacity as a float based on parent's maxResource - f = rc.divide(clusterResource, - queueResourceQuotas.getEffectiveMaxResource(label), - parent.getQueueResourceQuotas().getEffectiveMaxResource(label)); - queueCapacities.setMaximumCapacity(label, Float.isInfinite(f) ? 0 : f); - - // 3. Update absolute capacity as a float based on parent's minResource and - // cluster resource. - queueCapacities.setAbsoluteCapacity(label, - queueCapacities.getCapacity(label) * parent.getQueueCapacities() - .getAbsoluteCapacity(label)); - - // 4. Update absolute max-capacity as a float based on parent's maxResource - // and cluster resource. - queueCapacities.setAbsoluteMaximumCapacity(label, - queueCapacities.getMaximumCapacity(label) * parent.getQueueCapacities() - .getAbsoluteMaximumCapacity(label)); - } - - void updateEffectiveResources(Resource clusterResource) { - Set configuredNodelabels = - csContext.getConfiguration().getConfiguredNodeLabels(getQueuePath()); - for (String label : configuredNodelabels) { - Resource resourceByLabel = labelManager.getResourceByLabel(label, - clusterResource); - - Resource minResource = queueResourceQuotas.getConfiguredMinResource( - label); - - // Update effective resource (min/max) to each child queue. - if (getCapacityConfigType().equals( - CapacityConfigType.ABSOLUTE_RESOURCE)) { - queueResourceQuotas.setEffectiveMinResource(label, - getMinResourceNormalized(queuePath, - ((ParentQueue) parent).getEffectiveMinRatioPerResource(), - minResource)); - - // Max resource of a queue should be a minimum of {configuredMaxRes, - // parentMaxRes}. parentMaxRes could be configured value. But if not - // present could also be taken from effective max resource of parent. 
- Resource parentMaxRes = - parent.getQueueResourceQuotas().getConfiguredMaxResource(label); - if (parent != null && parentMaxRes.equals(Resources.none())) { - parentMaxRes = - parent.getQueueResourceQuotas().getEffectiveMaxResource(label); - } - - // Minimum of {childMaxResource, parentMaxRes}. However if - // childMaxResource is empty, consider parent's max resource alone. - Resource childMaxResource = - getQueueResourceQuotas().getConfiguredMaxResource(label); - Resource effMaxResource = Resources.min(resourceCalculator, - resourceByLabel, childMaxResource.equals(Resources.none()) ? - parentMaxRes : - childMaxResource, parentMaxRes); - queueResourceQuotas.setEffectiveMaxResource(label, - Resources.clone(effMaxResource)); - - // In cases where we still need to update some units based on - // percentage, we have to calculate percentage and update. - ResourceCalculator rc = this.csContext.getResourceCalculator(); - deriveCapacityFromAbsoluteConfigurations(label, clusterResource, rc); - // Re-visit max applications for a queue based on absolute capacity if - // needed. 
- if (this instanceof LeafQueue) { - LeafQueue leafQueue = (LeafQueue) this; - CapacitySchedulerConfiguration conf = csContext.getConfiguration(); - updateMaxAppRelatedField(conf, leafQueue, label); - } - } else{ - queueResourceQuotas.setEffectiveMinResource(label, Resources - .multiply(resourceByLabel, - queueCapacities.getAbsoluteCapacity(label))); - queueResourceQuotas.setEffectiveMaxResource(label, Resources - .multiply(resourceByLabel, - queueCapacities.getAbsoluteMaximumCapacity(label))); - } - - if (LOG.isDebugEnabled()) { - LOG.debug("Updating effective min resource for queue:" + queuePath - + " as effMinResource=" + queueResourceQuotas - .getEffectiveMinResource(label) - + "and Updating effective max resource as effMaxResource=" - + queueResourceQuotas.getEffectiveMaxResource(label)); - } - } - } - public boolean isDynamicQueue() { readLock.lock(); @@ -1704,4 +1635,12 @@ public void setLastSubmittedTimestamp(long lastSubmittedTimestamp) { } } + public boolean isAllowZeroCapacitySum() { + return allowZeroCapacitySum; + } + + public boolean isRoot() { + return false; + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 89e7f838003..6be3c08f614 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -1931,11 +1931,11 @@ public void updateClusterResource(Resource clusterResource, // calculated abs capacity of 
the queue. // When add new queue, the parent queue's other children should also // update the max app. + + queueConfigMode.calculateEffectiveResource(clusterResource); super.updateMaxAppRelatedField(csContext.getConfiguration(), this, CommonNodeLabelsManager.NO_LABEL); - super.updateEffectiveResources(clusterResource); - updateCurrentResourceLimits(currentResourceLimits, clusterResource); // Update headroom info based on new cluster resource value diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java index 88fae00f1b4..8723f1eb130 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java @@ -23,7 +23,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler .SchedulerDynamicEditException; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractCSQueue.CapacityConfigType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.queuemanagement.GuaranteedOrZeroCapacityOverTimePolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica .FiCaSchedulerApp; @@ -173,7 +172,7 @@ private void reinitializeQueueManagementPolicy() throws IOException { .getAutoCreatedQueueTemplateConfPrefix(getQueuePath()), resourceTypes); - if 
(this.capacityConfigType.equals(CapacityConfigType.PERCENTAGE) + if (this.capacityConfigType.equals(AbstractCSQueue.CapacityConfigType.PERCENTAGE) && !templateMinResource.equals(Resources.none())) { throw new IOException("Managed Parent Queue " + this.getQueuePath() + " config type is different from leaf queue template config type"); @@ -194,7 +193,7 @@ private void reinitializeQueueManagementPolicy() throws IOException { * been defined in ABSOLUTE_RESOURCE format. * */ - if (this.capacityConfigType.equals(CapacityConfigType.ABSOLUTE_RESOURCE)) { + if (this.capacityConfigType.equals(AbstractCSQueue.CapacityConfigType.ABSOLUTE_RESOURCE)) { updateQueueCapacities(queueCapacities); } builder.capacities(queueCapacities); @@ -306,7 +305,7 @@ public void addChildQueue(CSQueue childQueue) /* Below is to avoid Setting Queue Capacity to NaN when ClusterResource is zero during RM Startup with DominantResourceCalculator */ if (this.capacityConfigType.equals( - CapacityConfigType.ABSOLUTE_RESOURCE)) { + AbstractCSQueue.CapacityConfigType.ABSOLUTE_RESOURCE)) { QueueCapacities queueCapacities = getLeafQueueTemplate().getQueueCapacities(); updateQueueCapacities(queueCapacities); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index 798c7103784..d342635a663 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -193,86 +193,6 @@ 
protected void setupQueueConfigs(Resource clusterResource, } } - private static float PRECISION = 0.0005f; // 0.05% precision - - // Check weight configuration, throw exception when configuration is invalid - // return true when all children use weight mode. - private QueueCapacityType getCapacityConfigurationTypeForQueues( - Collection queues) throws IOException { - // Do we have ANY queue set capacity in any labels? - boolean percentageIsSet = false; - - // Do we have ANY queue set weight in any labels? - boolean weightIsSet = false; - - // Do we have ANY queue set absolute in any labels? - boolean absoluteMinResSet = false; - - StringBuilder diagMsg = new StringBuilder(); - - for (CSQueue queue : queues) { - for (String nodeLabel : queueCapacities.getExistingNodeLabels()) { - float capacityByLabel = queue.getQueueCapacities().getCapacity(nodeLabel); - if (capacityByLabel > 0) { - percentageIsSet = true; - } - float weightByLabel = queue.getQueueCapacities().getWeight(nodeLabel); - // By default weight is set to -1, so >= 0 is enough. - if (weightByLabel >= 0) { - weightIsSet = true; - diagMsg.append( - "{Queue=" + queue.getQueuePath() + ", label=" + nodeLabel - + " uses weight mode}. "); - } - if (!queue.getQueueResourceQuotas().getConfiguredMinResource(nodeLabel) - .equals(Resources.none())) { - absoluteMinResSet = true; - // There's a special handling: when absolute resource is configured, - // capacity will be calculated (and set) for UI/metrics purposes, so - // when asboluteMinResource is set, unset percentage - percentageIsSet = false; - diagMsg.append( - "{Queue=" + queue.getQueuePath() + ", label=" + nodeLabel - + " uses absolute mode}. "); - } - if (percentageIsSet) { - diagMsg.append( - "{Queue=" + queue.getQueuePath() + ", label=" + nodeLabel - + " uses percentage mode}. 
"); - } - } - } - - // If we have mixed capacity, weight or absolute resource (any of the two) - // We will throw exception - // Root queue is an exception here, because by default root queue returns - // 100 as capacity no matter what. We should look into this case in the - // future. To avoid impact too many code paths, we don;t check root queue's - // config. - if (queues.iterator().hasNext() && - !queues.iterator().next().getQueuePath().equals( - CapacitySchedulerConfiguration.ROOT) && - (percentageIsSet ? 1 : 0) + (weightIsSet ? 1 : 0) + (absoluteMinResSet ? - 1 : - 0) > 1) { - throw new IOException("Parent queue '" + getQueuePath() - + "' have children queue used mixed of " - + " weight mode, percentage and absolute mode, it is not allowed, please " - + "double check, details:" + diagMsg.toString()); - } - - if (weightIsSet || queues.isEmpty()) { - return QueueCapacityType.WEIGHT; - } else if (absoluteMinResSet) { - return QueueCapacityType.ABSOLUTE_RESOURCE; - } else { - return QueueCapacityType.PERCENT; - } - } - - private enum QueueCapacityType { - WEIGHT, ABSOLUTE_RESOURCE, PERCENT; - } /** * Set child queue and verify capacities @@ -295,98 +215,7 @@ private QueueCapacityType getCapacityConfigurationTypeForQueues( void setChildQueues(Collection childQueues) throws IOException { writeLock.lock(); try { - QueueCapacityType childrenCapacityType = - getCapacityConfigurationTypeForQueues(childQueues); - QueueCapacityType parentCapacityType = - getCapacityConfigurationTypeForQueues(ImmutableList.of(this)); - - if (childrenCapacityType == QueueCapacityType.ABSOLUTE_RESOURCE - || parentCapacityType == QueueCapacityType.ABSOLUTE_RESOURCE) { - // We don't allow any mixed absolute + {weight, percentage} between - // children and parent - if (childrenCapacityType != parentCapacityType && !this.getQueuePath() - .equals(CapacitySchedulerConfiguration.ROOT)) { - throw new IOException("Parent=" + this.getQueuePath() - + ": When absolute minResource is used, we must make 
sure both " - + "parent and child all use absolute minResource"); - } - - // Ensure that for each parent queue: parent.min-resource >= - // Σ(child.min-resource). - for (String nodeLabel : queueCapacities.getExistingNodeLabels()) { - Resource minRes = Resources.createResource(0, 0); - for (CSQueue queue : childQueues) { - // Accumulate all min/max resource configured for all child queues. - Resources.addTo(minRes, queue.getQueueResourceQuotas() - .getConfiguredMinResource(nodeLabel)); - } - Resource resourceByLabel = labelManager.getResourceByLabel(nodeLabel, - scheduler.getClusterResource()); - Resource parentMinResource = - queueResourceQuotas.getConfiguredMinResource(nodeLabel); - if (!parentMinResource.equals(Resources.none()) && Resources.lessThan( - resourceCalculator, resourceByLabel, parentMinResource, minRes)) { - throw new IOException( - "Parent Queues" + " capacity: " + parentMinResource - + " is less than" + " to its children:" + minRes - + " for queue:" + queueName); - } - } - } - - // When child uses percent - if (childrenCapacityType == QueueCapacityType.PERCENT) { - float childrenPctSum = 0; - // check label capacities - for (String nodeLabel : queueCapacities.getExistingNodeLabels()) { - // check children's labels - childrenPctSum = 0; - for (CSQueue queue : childQueues) { - childrenPctSum += queue.getQueueCapacities().getCapacity(nodeLabel); - } - - if (Math.abs(1 - childrenPctSum) > PRECISION) { - // When children's percent sum != 100% - if (Math.abs(childrenPctSum) > PRECISION) { - // It is wrong when percent sum != {0, 1} - throw new IOException( - "Illegal" + " capacity sum of " + childrenPctSum - + " for children of queue " + queueName + " for label=" - + nodeLabel + ". 
It should be either 0 or 1.0"); - } else{ - // We also allow children's percent sum = 0 under the following - // conditions - // - Parent uses weight mode - // - Parent uses percent mode, and parent has - // (capacity=0 OR allowZero) - if (parentCapacityType == QueueCapacityType.PERCENT) { - if ((Math.abs(queueCapacities.getCapacity(nodeLabel)) - > PRECISION) && (!allowZeroCapacitySum)) { - throw new IOException( - "Illegal" + " capacity sum of " + childrenPctSum - + " for children of queue " + queueName - + " for label=" + nodeLabel - + ". It is set to 0, but parent percent != 0, and " - + "doesn't allow children capacity to set to 0"); - } - } - } - } else { - // Even if child pct sum == 1.0, we will make sure parent has - // positive percent. - if (parentCapacityType == QueueCapacityType.PERCENT && Math.abs( - queueCapacities.getCapacity(nodeLabel)) <= 0f - && !allowZeroCapacitySum) { - throw new IOException( - "Illegal" + " capacity sum of " + childrenPctSum - + " for children of queue " + queueName + " for label=" - + nodeLabel + ". 
queue=" + queueName - + " has zero capacity, but child" - + "queues have positive capacities"); - } - } - } - } + queueConfigMode.validate(childQueues); this.childQueues.clear(); this.childQueues.addAll(childQueues); @@ -564,7 +393,7 @@ private CSQueue addDynamicChildQueue(String childQueuePath, boolean isLeaf) boolean weightsAreUsed = false; try { weightsAreUsed = getCapacityConfigurationTypeForQueues(childQueues) - == QueueCapacityType.WEIGHT; + == CapacityConfigType.WEIGHT; } catch (IOException e) { LOG.warn("Caught Exception during auto queue creation", e); } @@ -1225,52 +1054,11 @@ public void updateClusterResource(Resource clusterResource, // Update absolute capacities of this queue, this need to happen before // below calculation for effective capacities + queueConfigMode.calculateEffectiveResourcesPrerequisite(); updateAbsoluteCapacities(); - // Normalize all dynamic queue queue's weight to 1 for all accessible node - // labels, this is important because existing node labels could keep - // changing when new node added, or node label mapping changed. We need - // this to ensure auto created queue can access all labels. - for (String nodeLabel : queueCapacities.getExistingNodeLabels()) { - for (CSQueue queue : childQueues) { - // For dynamic queue, we will set weight to 1 every time, because it - // is possible new labels added to the parent. 
- if (((AbstractCSQueue) queue).isDynamicQueue()) { - if (queue.getQueueCapacities().getWeight(nodeLabel) == -1f) { - queue.getQueueCapacities().setWeight(nodeLabel, 1f); - } - } - } - } - - // Normalize weight of children - if (getCapacityConfigurationTypeForQueues(childQueues) - == QueueCapacityType.WEIGHT) { - for (String nodeLabel : queueCapacities.getExistingNodeLabels()) { - float sumOfWeight = 0; - - for (CSQueue queue : childQueues) { - float weight = Math.max(0, - queue.getQueueCapacities().getWeight(nodeLabel)); - sumOfWeight += weight; - } - // When sum of weight == 0, skip setting normalized_weight (so - // normalized weight will be 0). - if (Math.abs(sumOfWeight) > 1e-6) { - for (CSQueue queue : childQueues) { - queue.getQueueCapacities().setNormalizedWeight(nodeLabel, - queue.getQueueCapacities().getWeight(nodeLabel) / sumOfWeight); - } - } - } - } - // Update effective capacity in all parent queue. - Set configuredNodelabels = csContext.getConfiguration() - .getConfiguredNodeLabels(getQueuePath()); - for (String label : configuredNodelabels) { - calculateEffectiveResourcesAndCapacity(label, clusterResource); - } + queueConfigMode.calculateEffectiveResource(clusterResource); // Update all children for (CSQueue childQueue : childQueues) { @@ -1300,80 +1088,6 @@ public boolean hasChildQueues() { return true; } - private void calculateEffectiveResourcesAndCapacity(String label, - Resource clusterResource) { - // For root queue, ensure that max/min resource is updated to latest - // cluster resource. 
-    Resource resourceByLabel = labelManager.getResourceByLabel(label,
-        clusterResource);
-
-    /*
-     * == Below logic are added to calculate effectiveMinRatioPerResource ==
-     */
-
-    // Total configured min resources of direct children of this given parent
-    // queue
-    Resource configuredMinResources = Resource.newInstance(0L, 0);
-    for (CSQueue childQueue : getChildQueues()) {
-      Resources.addTo(configuredMinResources,
-          childQueue.getQueueResourceQuotas().getConfiguredMinResource(label));
-    }
-
-    // Factor to scale down effective resource: When cluster has sufficient
-    // resources, effective_min_resources will be same as configured
-    // min_resources.
-    Resource numeratorForMinRatio = null;
-    ResourceCalculator rc = this.csContext.getResourceCalculator();
-    if (getQueuePath().equals("root")) {
-      if (!resourceByLabel.equals(Resources.none()) && Resources.lessThan(rc,
-          clusterResource, resourceByLabel, configuredMinResources)) {
-        numeratorForMinRatio = resourceByLabel;
-      }
-    } else {
-      if (Resources.lessThan(rc, clusterResource,
-          queueResourceQuotas.getEffectiveMinResource(label),
-          configuredMinResources)) {
-        numeratorForMinRatio = queueResourceQuotas
-            .getEffectiveMinResource(label);
-      }
-    }
-
-    effectiveMinRatioPerResource = getEffectiveMinRatioPerResource(
-        configuredMinResources, numeratorForMinRatio);
-
-    // Update effective resources for my self;
-    if (rootQueue) {
-      queueResourceQuotas.setEffectiveMinResource(label, resourceByLabel);
-      queueResourceQuotas.setEffectiveMaxResource(label, resourceByLabel);
-    } else{
-      super.updateEffectiveResources(clusterResource);
-    }
-  }
-
-  private Map<String, Float> getEffectiveMinRatioPerResource(
-      Resource configuredMinResources, Resource numeratorForMinRatio) {
-    Map<String, Float> effectiveMinRatioPerResource = new HashMap<>();
-    if (numeratorForMinRatio != null) {
-      int maxLength = ResourceUtils.getNumberOfCountableResourceTypes();
-      for (int i = 0; i < maxLength; i++) {
-        ResourceInformation nResourceInformation = numeratorForMinRatio
-            .getResourceInformation(i);
-        ResourceInformation dResourceInformation = configuredMinResources
-            .getResourceInformation(i);
-
-        long nValue = nResourceInformation.getValue();
-        long dValue = UnitsConversionUtil.convert(
-            dResourceInformation.getUnits(), nResourceInformation.getUnits(),
-            dResourceInformation.getValue());
-        if (dValue != 0) {
-          effectiveMinRatioPerResource.put(nResourceInformation.getName(),
-              (float) nValue / dValue);
-        }
-      }
-    }
-    return ImmutableMap.copyOf(effectiveMinRatioPerResource);
-  }
-
   @Override
   public List<CSQueue> getChildQueues() {
     readLock.lock();
@@ -1659,6 +1373,11 @@ public boolean isEligibleForAutoDeletion() {
         isAutoExpiredDeletionEnabled(this.getQueuePath());
   }
 
+  @Override
+  public boolean isRoot() {
+    return rootQueue;
+  }
+
   public AutoCreatedQueueTemplate getAutoCreatedQueueTemplate() {
     return autoCreatedQueueTemplate;
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemode/AbstractQueueConfigMode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemode/AbstractQueueConfigMode.java
new file mode 100644
index 00000000000..749ca4012fb
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemode/AbstractQueueConfigMode.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.queuemode;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractCSQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractCSQueue.CapacityConfigType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Set;
+
+/**
+ * A strategy to encapsulate capacity calculations of a Capacity Scheduler queue
+ * based on its capacity configuration settings.
+ */
+public abstract class AbstractQueueConfigMode {
+
+  protected static final Logger LOG =
+      LoggerFactory.getLogger(AbstractQueueConfigMode.class);
+  protected static final float PRECISION = 0.0005f; // 0.05% precision
+
+  protected final AbstractCSQueue queue;
+  protected final CapacitySchedulerContext csContext;
+
+  public AbstractQueueConfigMode(AbstractCSQueue queue,
+      CapacitySchedulerContext csContext) {
+    this.queue = queue;
+    this.csContext = csContext;
+  }
+
+  /**
+   * Creates a {@code AbstractQueueConfigMode} for the given queue based on
+   * its capacity configuration settings.
+   * @param queue given queue
+   * @param csContext Capacity Scheduler context
+   * @throws IllegalArgumentException if capacity settings for all node labels
+   * are not uniform
+   * @return queue config mode
+   */
+  public static AbstractQueueConfigMode defineMode(
+      AbstractCSQueue queue, CapacitySchedulerContext csContext) {
+    Set<String> configuredNodelabels = csContext.getConfiguration()
+        .getConfiguredNodeLabels(queue.getQueuePath());
+    CapacityConfigType localType =
+        CapacityConfigType.NONE;
+
+    for (String label : configuredNodelabels) {
+      CapacityConfigType configTypePerLabel =
+          queue.checkConfigTypeIsAbsoluteResource(queue.getQueuePath(), label)
+              ? CapacityConfigType.ABSOLUTE_RESOURCE
+              : CapacityConfigType.PERCENTAGE;
+      if (localType == CapacityConfigType.NONE) {
+        localType = configTypePerLabel;
+      }
+      if (!queue.getQueuePath().equals("root")
+          && !configTypePerLabel.equals(localType)) {
+        throw new IllegalArgumentException("Queue '" + queue.getQueuePath()
+            + "' should use either percentage based capacity"
+            + " configuration or absolute resource.");
+      }
+    }
+
+    if (localType == CapacityConfigType.ABSOLUTE_RESOURCE) {
+      return new ResourceConfigMode(queue, csContext);
+    }
+
+    return new PercentageConfigMode(queue, csContext);
+  }
+
+  /**
+   * Handles any calculation prior to calculating effective resources.
+   * @throws IOException if configuration mode of the child queues are not
+   * determinable
+   */
+  public void calculateEffectiveResourcesPrerequisite() throws IOException {
+    for (String nodeLabel : queue.getQueueCapacities().getExistingNodeLabels()) {
+      float sumOfWeight = 0;
+
+      for (CSQueue childQueue : queue.getChildQueues()) {
+        // Normalize every dynamic queue's weight to 1 for all accessible node
+        // labels, this is important because existing node labels could keep
+        // changing when new node added, or node label mapping changed. We need
+        // this to ensure auto created queue can access all labels.
+        if (((AbstractCSQueue) childQueue).isDynamicQueue()) {
+          if (childQueue.getQueueCapacities().getWeight(nodeLabel) == -1f) {
+            childQueue.getQueueCapacities().setWeight(nodeLabel, 1f);
+          }
+        }
+        float weight = Math.max(0,
+            childQueue.getQueueCapacities().getWeight(nodeLabel));
+        sumOfWeight += weight;
+      }
+
+      if (AbstractCSQueue.getCapacityConfigurationTypeForQueues(
+          queue.getChildQueues()) == CapacityConfigType.WEIGHT) {
+        for (CSQueue childQueue : queue.getChildQueues()) {
+          // When sum of weight == 0, skip setting normalized_weight (so
+          // normalized weight will be 0).
+          if (Math.abs(sumOfWeight) > 1e-6) {
+            childQueue.getQueueCapacities().setNormalizedWeight(nodeLabel,
+                childQueue.getQueueCapacities().
+                    getWeight(nodeLabel) / sumOfWeight);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Returns the {@code CapacityConfigType} of the queue.
+   * @return capacity config type
+   */
+  public abstract CapacityConfigType getConfigType();
+
+  /**
+   * Determines whether a collection of newly parsed child queues could be placed
+   * under the queue.
+   * @param childQueues newly parsed child queues
+   * @throws IOException if child queues can not be placed under the queue
+   */
+  public abstract void validate(
+      Collection<CSQueue> childQueues) throws IOException;
+
+  /**
+   * Calculates minimum and maximum effective capacity values.
+   * @param clusterResource overall cluster resource
+   */
+  public abstract void calculateEffectiveResource(Resource clusterResource);
+
+  /**
+   * Calculates the maximum application number limit of the queue.
+   * @param maxGlobalApplications global maximum application limit
+   * @param label node label
+   * @return calculated maximum application limit for a specific node label
+   */
+  public abstract int calculateMaxApplications(
+      int maxGlobalApplications, String label);
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemode/PercentageConfigMode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemode/PercentageConfigMode.java
new file mode 100644
index 00000000000..68dcbe803f8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemode/PercentageConfigMode.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.queuemode;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractCSQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractCSQueue.CapacityConfigType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerContext;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Set;
+
+/**
+ * Capacity calculations for a queue whose capacity is configured as a
+ * percentage ({@code CapacityConfigType.PERCENTAGE}).
+ */
+public class PercentageConfigMode extends AbstractQueueConfigMode {
+
+  public PercentageConfigMode(AbstractCSQueue queue, CapacitySchedulerContext csContext) {
+    super(queue, csContext);
+  }
+
+  @Override
+  public CapacityConfigType getConfigType() {
+    return CapacityConfigType.PERCENTAGE;
+  }
+
+  /**
+   * Rejects absolute-resource children under a non-root queue and checks, for
+   * every existing node label, that the children's capacities sum to 0 or 1.
+   */
+  @Override
+  public void validate(Collection<CSQueue> childQueues) throws IOException {
+    CapacityConfigType childrenConfigType =
+        AbstractCSQueue.getCapacityConfigurationTypeForQueues(childQueues);
+    CapacityConfigType parentConfigType = AbstractCSQueue.
+        getCapacityConfigurationTypeForQueues(ImmutableList.of(queue));
+
+    if (childrenConfigType == CapacityConfigType.ABSOLUTE_RESOURCE
+        && !queue.getQueuePath().equals(
+        CapacitySchedulerConfiguration.ROOT)) {
+      throw new IOException("Parent=" + queue.getQueuePath()
+          + ": When absolute minResource is used, we must make sure both "
+          + "parent and child all use absolute minResource");
+    }
+
+    // When child uses percent
+    if (childrenConfigType == CapacityConfigType.PERCENTAGE) {
+      float childrenPctSum = 0;
+      // check label capacities
+      for (String nodeLabel : queue.getQueueCapacities().getExistingNodeLabels()) {
+        // check children's labels
+        childrenPctSum = 0;
+        for (CSQueue childQueue : childQueues) {
+          childrenPctSum += childQueue.getQueueCapacities().getCapacity(nodeLabel);
+        }
+
+        if (Math.abs(1 - childrenPctSum) > PRECISION) {
+          // When children's percent sum != 100%
+          if (Math.abs(childrenPctSum) > PRECISION) {
+            // It is wrong when percent sum != {0, 1}
+            throw new IOException(
+                "Illegal" + " capacity sum of " + childrenPctSum
+                    + " for children of queue " + queue.getQueuePath()
+                    + " for label=" + nodeLabel + ". "
+                    + "It should be either 0 or 1.0");
+          } else {
+            // We also allow children's percent sum = 0 under the following
+            // conditions
+            // - Parent uses weight mode
+            // - Parent uses percent mode, and parent has
+            //   (capacity=0 OR allowZero)
+            if (parentConfigType == CapacityConfigType.PERCENTAGE) {
+              if ((Math.abs(queue.getQueueCapacities().getCapacity(nodeLabel))
+                  > PRECISION) && (!queue.isAllowZeroCapacitySum())) {
+                throw new IOException(
+                    "Illegal" + " capacity sum of " + childrenPctSum
+                        + " for children of queue " + queue.getQueuePath()
+                        + " for label=" + nodeLabel
+                        + ". It is set to 0, but parent percent != 0, and "
+                        + "doesn't allow children capacity to set to 0");
+              }
+            }
+          }
+        } else {
+          // Even if child pct sum == 1.0, we will make sure parent has
+          // positive percent.
+          if (parentConfigType == CapacityConfigType.PERCENTAGE && Math.abs(
+              queue.getQueueCapacities().getCapacity(nodeLabel)) <= 0f
+              && !queue.isAllowZeroCapacitySum()) {
+            throw new IOException(
+                "Illegal" + " capacity sum of " + childrenPctSum
+                    + " for children of queue " + queue.getQueuePath() + " for label="
+                    + nodeLabel + ". queue=" + queue.getQueuePath()
+                    + " has zero capacity, but child "
+                    + "queues have positive capacities");
+          }
+        }
+      }
+    }
+  }
+
+  @Override
+  public void calculateEffectiveResource(Resource clusterResource) {
+    Set<String> configuredNodelabels =
+        csContext.getConfiguration().getConfiguredNodeLabels(
+            queue.getQueuePath());
+    for (String label : configuredNodelabels) {
+      Resource resourceByLabel = queue.getLabelManager().getResourceByLabel(label,
+          clusterResource);
+      queue.getQueueResourceQuotas().setEffectiveMinResource(label, Resources
+          .multiply(resourceByLabel,
+              queue.getQueueCapacities().getAbsoluteCapacity(label)));
+      queue.getQueueResourceQuotas().setEffectiveMaxResource(label, Resources
+          .multiply(resourceByLabel,
+              queue.getQueueCapacities().getAbsoluteMaximumCapacity(label)));
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Updating effective min resource for queue:" + queue.getQueuePath()
+            + " as effMinResource=" + queue.getQueueResourceQuotas()
+            .getEffectiveMinResource(label)
+            + " and Updating effective max resource as effMaxResource="
+            + queue.getQueueResourceQuotas().getEffectiveMaxResource(label));
+      }
+    }
+  }
+
+  @Override
+  public int calculateMaxApplications(int maxGlobalApplications, String label) {
+    return maxGlobalApplications;
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemode/RelativeResourceConfigMode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemode/RelativeResourceConfigMode.java
new file mode 100644
index 00000000000..a8ef88e0d5c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemode/RelativeResourceConfigMode.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.queuemode;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractCSQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerContext;
+
+/**
+ * Placeholder for a mode where resources are configured relative to the
+ * parent queue. It currently inherits every calculation from
+ * {@link ResourceConfigMode} unchanged.
+ *
+ * TODO: override {@code calculateEffectiveResourcesPrerequisite()} to derive
+ * the configured min/max resources from the parent's resources once the
+ * relative resource settings are available.
+ */
+public class RelativeResourceConfigMode extends ResourceConfigMode {
+  public RelativeResourceConfigMode(AbstractCSQueue queue, CapacitySchedulerContext csContext) {
+    super(queue, csContext);
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemode/ResourceConfigMode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemode/ResourceConfigMode.java
new file mode 100644
index 00000000000..419aaeb9e89
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemode/ResourceConfigMode.java
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.queuemode;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractCSQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractCSQueue.CapacityConfigType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerContext;
+import org.apache.hadoop.yarn.util.UnitsConversionUtil;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Capacity calculations for a queue whose capacity is configured as an
+ * absolute resource ({@code CapacityConfigType.ABSOLUTE_RESOURCE}).
+ */
+public class ResourceConfigMode extends AbstractQueueConfigMode {
+
+  public ResourceConfigMode(AbstractCSQueue queue, CapacitySchedulerContext csContext) {
+    super(queue, csContext);
+  }
+
+  @Override
+  public void validate(Collection<CSQueue> childQueues) throws IOException {
+    CapacityConfigType childrenConfigType =
+        AbstractCSQueue.getCapacityConfigurationTypeForQueues(childQueues);
+    if (childrenConfigType != getConfigType() && !queue.getQueuePath()
+        .equals(CapacitySchedulerConfiguration.ROOT)) {
+      throw new IOException("Parent=" + queue.getQueuePath()
+          + ": When absolute minResource is used, we must make sure both "
+          + "parent and child all use absolute minResource");
+    }
+
+    // Ensure that for each parent queue: parent.min-resource >=
+    // Σ(child.min-resource).
+    for (String nodeLabel : queue.getQueueCapacities().getExistingNodeLabels()) {
+      Resource minRes = Resources.createResource(0, 0);
+      for (CSQueue childQueue : childQueues) {
+        // Accumulate all min/max resource configured for all child queues.
+        Resources.addTo(minRes, childQueue.getQueueResourceQuotas()
+            .getConfiguredMinResource(nodeLabel));
+      }
+      Resource resourceByLabel = csContext.getRMContext().getNodeLabelManager()
+          .getResourceByLabel(nodeLabel,
+              csContext.getClusterResource());
+      Resource parentMinResource =
+          queue.getQueueResourceQuotas().getConfiguredMinResource(nodeLabel);
+      if (!parentMinResource.equals(Resources.none()) && Resources.lessThan(
+          csContext.getResourceCalculator(), resourceByLabel, parentMinResource, minRes)) {
+        throw new IOException(
+            "Parent Queues" + " capacity: " + parentMinResource
+                + " is less than" + " to its children:" + minRes
+                + " for queue:" + queue.getQueuePath());
+      }
+    }
+  }
+
+  @Override
+  public CapacityConfigType getConfigType() {
+    return CapacityConfigType.ABSOLUTE_RESOURCE;
+  }
+
+  @Override
+  public void calculateEffectiveResource(Resource clusterResource) {
+    Set<String> configuredNodeLabels =
+        csContext.getConfiguration().getConfiguredNodeLabels(queue.getQueuePath());
+    CSQueue parent = queue.getParent();
+    ResourceCalculator rc = csContext.getResourceCalculator();
+
+    for (String label : configuredNodeLabels) {
+      Resource resourceByLabel = queue.getLabelManager().getResourceByLabel(label,
+          clusterResource);
+
+      if (queue.isRoot()) {
+        queue.getQueueResourceQuotas().setEffectiveMinResource(label, resourceByLabel);
+        queue.getQueueResourceQuotas().setEffectiveMaxResource(label, resourceByLabel);
+      } else {
+        queue.getQueueResourceQuotas().setEffectiveMinResource(label,
+            getMinResourceNormalized(clusterResource, label));
+
+        // Max resource of a queue should be a minimum of {configuredMaxRes,
+        // parentMaxRes}. parentMaxRes could be configured value. But if not
+        // present could also be taken from effective max resource of parent.
+        Resource parentMaxRes =
+            parent.getQueueResourceQuotas().getConfiguredMaxResource(label);
+        if (parentMaxRes.equals(Resources.none())) {
+          parentMaxRes =
+              parent.getQueueResourceQuotas().getEffectiveMaxResource(label);
+        }
+
+        // Minimum of {childMaxResource, parentMaxRes}. However if
+        // childMaxResource is empty, consider parent's max resource alone.
+        Resource childMaxResource =
+            queue.getQueueResourceQuotas().getConfiguredMaxResource(label);
+        Resource effMaxResource = Resources.min(rc, resourceByLabel,
+            childMaxResource.equals(Resources.none()) ? parentMaxRes :
+                childMaxResource, parentMaxRes);
+        queue.getQueueResourceQuotas().setEffectiveMaxResource(label,
+            Resources.clone(effMaxResource));
+
+        // In cases where we still need to update some units based on
+        // percentage, we have to calculate percentage and update.
+        deriveCapacityFromAbsoluteConfigurations(label, clusterResource, rc);
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Updating effective min resource for queue:" + queue.getQueuePath()
+            + " as effMinResource=" + queue.getQueueResourceQuotas()
+            .getEffectiveMinResource(label)
+            + " and Updating effective max resource as effMaxResource="
+            + queue.getQueueResourceQuotas().getEffectiveMaxResource(label));
+      }
+    }
+  }
+
+  @Override
+  public int calculateMaxApplications(int maxGlobalApplications,
+      String label) {
+    return (int) (maxGlobalApplications *
+        queue.getQueueCapacities().getAbsoluteCapacity(label));
+  }
+
+  private void deriveCapacityFromAbsoluteConfigurations(
+      String label, Resource clusterResource, ResourceCalculator rc) {
+
+    /*
+     * In case when queues are configured with absolute resources, it is better
+     * to update capacity/max-capacity etc w.r.t absolute resource as well. In
+     * case of computation, these values wont be used any more. However for
+     * metrics and UI, its better these values are pre-computed here itself.
+     */
+
+    // 1. Update capacity as a float based on parent's minResource
+    float f = rc.divide(clusterResource,
+        queue.getQueueResourceQuotas().getEffectiveMinResource(label),
+        queue.getParent().getQueueResourceQuotas().getEffectiveMinResource(label));
+    queue.getQueueCapacities().setCapacity(label, Float.isInfinite(f) ? 0 : f);
+
+    // 2. Update max-capacity as a float based on parent's maxResource
+    f = rc.divide(clusterResource,
+        queue.getQueueResourceQuotas().getEffectiveMaxResource(label),
+        queue.getParent().getQueueResourceQuotas().getEffectiveMaxResource(label));
+    queue.getQueueCapacities().setMaximumCapacity(label, Float.isInfinite(f) ? 0 : f);
+
+    // 3. Update absolute capacity as a float based on parent's minResource and
+    // cluster resource.
+    queue.getQueueCapacities().setAbsoluteCapacity(label,
+        queue.getQueueCapacities().getCapacity(label) * queue.getParent().getQueueCapacities()
+            .getAbsoluteCapacity(label));
+
+    // 4. Update absolute max-capacity as a float based on parent's maxResource
+    // and cluster resource.
+    queue.getQueueCapacities().setAbsoluteMaximumCapacity(label,
+        queue.getQueueCapacities().getMaximumCapacity(label) * queue.getParent().getQueueCapacities()
+            .getAbsoluteMaximumCapacity(label));
+  }
+
+
+  private Resource getMinResourceNormalized(Resource clusterResource, String label) {
+    Resource minResource = queue.getQueueResourceQuotas().getConfiguredMinResource(label);
+    Resource ret = Resource.newInstance(minResource);
+    int maxLength = ResourceUtils.getNumberOfCountableResourceTypes();
+    if (!(queue.getParent() instanceof AbstractCSQueue)) {
+      return Resources.none();
+    }
+    Map<String, Float> effectiveRatio = getEffectiveMinRatioPerResource(clusterResource, label);
+    for (int i = 0; i < maxLength; i++) {
+      ResourceInformation nResourceInformation =
+          minResource.getResourceInformation(i);
+
+      Float ratio = effectiveRatio.get(nResourceInformation.getName());
+      if (ratio != null) {
+        ret.setResourceValue(i,
+            (long) (nResourceInformation.getValue() * ratio));
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Updating min resource for Queue: " + queue.getQueuePath() + " as " + ret
+              .getResourceInformation(i) + ", Actual resource: "
+              + nResourceInformation.getValue() + ", ratio: " + ratio);
+        }
+      }
+    }
+    return ret;
+  }
+
+  private Map<String, Float> getEffectiveMinRatioPerResource(
+      Resource clusterResource, String label) {
+    AbstractCSQueue parent = (AbstractCSQueue) queue.getParent();
+    Resource resourceByLabel = csContext.getRMContext().getNodeLabelManager().getResourceByLabel(label,
+        clusterResource);
+    // Total configured min resources of direct children of this given parent
+    // queue
+    Resource configuredMinResources = Resource.newInstance(0L, 0);
+    for (CSQueue childQueue : parent.getChildQueues()) {
+      Resources.addTo(configuredMinResources,
+          childQueue.getQueueResourceQuotas().getConfiguredMinResource(label));
+    }
+
+    // Factor to scale down effective resource: When cluster has sufficient
+    // resources, effective_min_resources will be same as configured
+    // min_resources.
+    Resource numeratorForMinRatio = null;
+    ResourceCalculator rc = csContext.getResourceCalculator();
+    if (parent.getQueuePath().equals("root")) {
+      if (!resourceByLabel.equals(Resources.none()) && Resources.lessThan(rc,
+          clusterResource, resourceByLabel, configuredMinResources)) {
+        numeratorForMinRatio = resourceByLabel;
+      }
+    } else {
+      if (Resources.lessThan(rc, clusterResource,
+          parent.getQueueResourceQuotas().getEffectiveMinResource(label),
+          configuredMinResources)) {
+        numeratorForMinRatio = parent.getQueueResourceQuotas()
+            .getEffectiveMinResource(label);
+      }
+    }
+
+    Map<String, Float> effectiveMinRatioPerResource = new HashMap<>();
+    if (numeratorForMinRatio != null) {
+      int maxLength = ResourceUtils.getNumberOfCountableResourceTypes();
+      for (int i = 0; i < maxLength; i++) {
+        ResourceInformation nResourceInformation = numeratorForMinRatio
+            .getResourceInformation(i);
+        ResourceInformation dResourceInformation = configuredMinResources
+            .getResourceInformation(i);
+
+        long nValue = nResourceInformation.getValue();
+        long dValue = UnitsConversionUtil.convert(
+            dResourceInformation.getUnits(), nResourceInformation.getUnits(),
+            dResourceInformation.getValue());
+        if (dValue != 0) {
+          effectiveMinRatioPerResource.put(nResourceInformation.getName(),
+              (float) nValue / dValue);
+        }
+      }
+    }
+    return effectiveMinRatioPerResource;
+  }
+}