diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractAutoCreatedLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractAutoCreatedLeafQueue.java index 8d7733453f8..4434337cdfa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractAutoCreatedLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractAutoCreatedLeafQueue.java @@ -85,7 +85,7 @@ protected Resource getMaximumAbsoluteResource(String queuePath, } @Override - protected boolean checkConfigTypeIsAbsoluteResource(String queuePath, + public boolean checkConfigTypeIsAbsoluteResource(String queuePath, String label) { return super.checkConfigTypeIsAbsoluteResource(csContext.getConfiguration() .getAutoCreatedQueueTemplateConfPrefix(this.getParent().getQueuePath()), diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index b05fb504624..5656d51b5ed 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.AbsoluteResourceType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.queuemode.AbstractQueueConfigMode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer; @@ -68,6 +69,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -122,6 +124,9 @@ QueueResourceQuotas queueResourceQuotas; + private AbstractQueueConfigMode queueConfigMode; + + private boolean allowZeroCapacitySum = false; // -1 indicates lifetime is disabled private volatile long maxApplicationLifetime = -1; @@ -132,11 +137,25 @@ private volatile boolean defaultAppLifetimeWasSpecifiedInConfig = false; public enum CapacityConfigType { - // FIXME, from what I can see, Percentage mode can almost apply to weighted - // and percentage mode at the same time, there's only small area need to be - // changed, we need to rename "PERCENTAGE" to "PERCENTAGE" and "WEIGHT" - NONE, PERCENTAGE, ABSOLUTE_RESOURCE + NONE, PERCENTAGE, ABSOLUTE_RESOURCE, WEIGHT; + + public boolean isPercentage() { + return equals(CapacityConfigType.PERCENTAGE); + } + + public boolean isAbsolute() { + return equals(CapacityConfigType.ABSOLUTE_RESOURCE); + } + + public boolean isWeight() { + return 
equals(CapacityConfigType.WEIGHT); + } + + public boolean isNone() { + return equals(CapacityConfigType.NONE); + } }; + protected CapacityConfigType capacityConfigType = CapacityConfigType.NONE; @@ -405,7 +424,8 @@ protected void setupQueueConfigs(Resource clusterResource, // Also fetch minimum/maximum resource constraint for this queue if // configured. - capacityConfigType = CapacityConfigType.NONE; + queueConfigMode = AbstractQueueConfigMode.defineMode(this, csContext); + capacityConfigType = queueConfigMode.getConfigType(); updateConfigurableResourceRequirement(getQueuePath(), clusterResource); // Setup queue's maximumAllocation respecting the global setting @@ -561,6 +581,81 @@ private void setupMaximumAllocation(CapacitySchedulerConfiguration csConf) { } } + // Determine the capacity configuration type used by the given queues, + // throw IOException when weight, percentage and absolute modes are mixed. + public static CapacityConfigType getCapacityConfigurationTypeForQueues( + Collection queues) throws IOException { + // Do we have ANY queue set capacity in any labels? + boolean percentageIsSet = false; + + // Do we have ANY queue set weight in any labels? + boolean weightIsSet = false; + + // Do we have ANY queue set absolute in any labels? + boolean absoluteMinResSet = false; + + StringBuilder diagMsg = new StringBuilder(); + + for (CSQueue queue : queues) { + for (String nodeLabel : queue.getQueueCapacities() + .getExistingNodeLabels()) { + float capacityByLabel = queue.getQueueCapacities().getCapacity( + nodeLabel); + if (capacityByLabel > 0) { + percentageIsSet = true; + } + float weightByLabel = queue.getQueueCapacities().getWeight(nodeLabel); + // By default weight is set to -1, so >= 0 is enough. + if (weightByLabel >= 0) { + weightIsSet = true; + diagMsg.append( + "{Queue=" + queue.getQueuePath() + ", label=" + nodeLabel + + " uses weight mode}. 
"); + } + if (!queue.getQueueResourceQuotas().getConfiguredMinResource(nodeLabel) + .equals(Resources.none())) { + absoluteMinResSet = true; + // There's a special handling: when absolute resource is configured, + // capacity will be calculated (and set) for UI/metrics purposes, so + // when absoluteMinResource is set, unset percentage + percentageIsSet = false; + diagMsg.append( + "{Queue=" + queue.getQueuePath() + ", label=" + nodeLabel + + " uses absolute mode}. "); + } + if (percentageIsSet) { + diagMsg.append( + "{Queue=" + queue.getQueuePath() + ", label=" + nodeLabel + + " uses percentage mode}. "); + } + } + + // If we have mixed capacity, weight or absolute resource (any of the two) + // We will throw exception + // Root queue is an exception here, because by default root queue returns + // 100 as capacity no matter what. We should look into this case in the + // future. To avoid impacting too many code paths, we don't check root + // queue's config. + if (!queue.getQueuePath().equals( + CapacitySchedulerConfiguration.ROOT) && + (percentageIsSet ? 1 : 0) + (weightIsSet ? 1 : 0) + + (absoluteMinResSet ? 
1 : 0) > 1) { + throw new IOException("Parent queue '" + queue.getParent() + .getQueuePath() + "' have children queue used mixed of " + + " weight mode, percentage and absolute mode, it is not allowed, " + + "please double check, details:" + diagMsg.toString()); + } + } + + if (weightIsSet || queues.isEmpty()) { + return CapacityConfigType.WEIGHT; + } else if (absoluteMinResSet) { + return CapacityConfigType.ABSOLUTE_RESOURCE; + } else { + return CapacityConfigType.PERCENTAGE; + } + } + private Map getUserWeightsFromHierarchy (CapacitySchedulerConfiguration configuration) throws IOException { @@ -589,8 +684,8 @@ protected Resource getMaximumAbsoluteResource(String queuePath, String label) { return maxResource; } - protected boolean checkConfigTypeIsAbsoluteResource(String queuePath, - String label) { + public boolean checkConfigTypeIsAbsoluteResource(String queuePath, + String label) { return csContext.getConfiguration().checkConfigTypeIsAbsoluteResource(label, queuePath, resourceTypes); } @@ -604,18 +699,6 @@ protected void updateConfigurableResourceRequirement(String queuePath, LOG.debug("capacityConfigType is '{}' for queue {}", capacityConfigType, getQueuePath()); - CapacityConfigType localType = checkConfigTypeIsAbsoluteResource( - queuePath, label) ? CapacityConfigType.ABSOLUTE_RESOURCE - : CapacityConfigType.PERCENTAGE; - - if (this.capacityConfigType.equals(CapacityConfigType.NONE)) { - this.capacityConfigType = localType; - LOG.debug("capacityConfigType is updated as '{}' for queue {}", - capacityConfigType, getQueuePath()); - } else { - validateAbsoluteVsPercentageCapacityConfig(localType); - } - // If min resource for a resource type is greater than its max resource, // throw exception to handle such error configs. 
if (!maxResource.equals(Resources.none()) && Resources.greaterThan( @@ -657,16 +740,6 @@ protected void updateConfigurableResourceRequirement(String queuePath, } } - private void validateAbsoluteVsPercentageCapacityConfig( - CapacityConfigType localType) { - if (!queuePath.equals("root") - && !this.capacityConfigType.equals(localType)) { - throw new IllegalArgumentException("Queue '" + getQueuePath() - + "' should use either percentage based capacity" - + " configuration or absolute resource."); - } - } - @Override public CapacityConfigType getCapacityConfigType() { return capacityConfigType; @@ -1492,6 +1565,10 @@ public int getMaxParallelApps() { return maxParallelApps; } + public RMNodeLabelsManager getLabelManager() { + return labelManager; + } + abstract int getNumRunnableApps(); protected void updateAbsoluteCapacities() { @@ -1504,29 +1581,6 @@ protected void updateAbsoluteCapacities() { parentQueueCapacities, queueCapacities.getExistingNodeLabels()); } - private Resource getMinResourceNormalized(String name, - Map effectiveMinRatio, Resource minResource) { - Resource ret = Resource.newInstance(minResource); - int maxLength = ResourceUtils.getNumberOfCountableResourceTypes(); - for (int i = 0; i < maxLength; i++) { - ResourceInformation nResourceInformation = - minResource.getResourceInformation(i); - - Float ratio = effectiveMinRatio.get(nResourceInformation.getName()); - if (ratio != null) { - ret.setResourceValue(i, - (long) (nResourceInformation.getValue() * ratio.floatValue())); - if (LOG.isDebugEnabled()) { - LOG.debug("Updating min resource for Queue: " + name + " as " + ret - .getResourceInformation(i) + ", Actual resource: " - + nResourceInformation.getValue() + ", ratio: " + ratio - .floatValue()); - } - } - } - return ret; - } - void updateMaxAppRelatedField(CapacitySchedulerConfiguration conf, LeafQueue leafQueue) { int maxApplications = conf.getMaximumApplicationsPerQueue(queuePath); @@ -1539,11 +1593,8 @@ void 
updateMaxAppRelatedField(CapacitySchedulerConfiguration conf, if (maxGlobalPerQueueApps > 0) { // In absolute mode, should // shrink when change to corresponding label capacity. - maxApplicationsByLabel = this.capacityConfigType - != CapacityConfigType.ABSOLUTE_RESOURCE ? - maxGlobalPerQueueApps : - (int) (maxGlobalPerQueueApps * queueCapacities - .getAbsoluteCapacity(label)); + maxApplicationsByLabel = queueConfigMode.calculateMaxApplications( + maxGlobalPerQueueApps, label); } else { maxApplicationsByLabel = (int) (conf.getMaximumSystemApplications() * queueCapacities.getAbsoluteCapacity(label)); @@ -1574,103 +1625,6 @@ void updateMaxAppRelatedField(CapacitySchedulerConfiguration conf, .getMaximumCapacity(maxLabel)); } - private void deriveCapacityFromAbsoluteConfigurations(String label, - Resource clusterResource, ResourceCalculator rc) { - - /* - * In case when queues are configured with absolute resources, it is better - * to update capacity/max-capacity etc w.r.t absolute resource as well. In - * case of computation, these values wont be used any more. However for - * metrics and UI, its better these values are pre-computed here itself. - */ - - // 1. Update capacity as a float based on parent's minResource - float f = rc.divide(clusterResource, - queueResourceQuotas.getEffectiveMinResource(label), - parent.getQueueResourceQuotas().getEffectiveMinResource(label)); - queueCapacities.setCapacity(label, Float.isInfinite(f) ? 0 : f); - - // 2. Update max-capacity as a float based on parent's maxResource - f = rc.divide(clusterResource, - queueResourceQuotas.getEffectiveMaxResource(label), - parent.getQueueResourceQuotas().getEffectiveMaxResource(label)); - queueCapacities.setMaximumCapacity(label, Float.isInfinite(f) ? 0 : f); - - // 3. Update absolute capacity as a float based on parent's minResource and - // cluster resource. 
- queueCapacities.setAbsoluteCapacity(label, - queueCapacities.getCapacity(label) * parent.getQueueCapacities() - .getAbsoluteCapacity(label)); - - // 4. Update absolute max-capacity as a float based on parent's maxResource - // and cluster resource. - queueCapacities.setAbsoluteMaximumCapacity(label, - queueCapacities.getMaximumCapacity(label) * parent.getQueueCapacities() - .getAbsoluteMaximumCapacity(label)); - } - - void updateEffectiveResources(Resource clusterResource) { - for (String label : configuredNodeLabels) { - Resource resourceByLabel = labelManager.getResourceByLabel(label, - clusterResource); - - Resource minResource = queueResourceQuotas.getConfiguredMinResource( - label); - - // Update effective resource (min/max) to each child queue. - if (getCapacityConfigType().equals( - CapacityConfigType.ABSOLUTE_RESOURCE)) { - queueResourceQuotas.setEffectiveMinResource(label, - getMinResourceNormalized(queuePath, - ((ParentQueue) parent).getEffectiveMinRatioPerResource(), - minResource)); - - // Max resource of a queue should be a minimum of {configuredMaxRes, - // parentMaxRes}. parentMaxRes could be configured value. But if not - // present could also be taken from effective max resource of parent. - Resource parentMaxRes = - parent.getQueueResourceQuotas().getConfiguredMaxResource(label); - if (parent != null && parentMaxRes.equals(Resources.none())) { - parentMaxRes = - parent.getQueueResourceQuotas().getEffectiveMaxResource(label); - } - - // Minimum of {childMaxResource, parentMaxRes}. However if - // childMaxResource is empty, consider parent's max resource alone. - Resource childMaxResource = - getQueueResourceQuotas().getConfiguredMaxResource(label); - Resource effMaxResource = Resources.min(resourceCalculator, - resourceByLabel, childMaxResource.equals(Resources.none()) ? 
- parentMaxRes : - childMaxResource, parentMaxRes); - queueResourceQuotas.setEffectiveMaxResource(label, - Resources.clone(effMaxResource)); - - // In cases where we still need to update some units based on - // percentage, we have to calculate percentage and update. - ResourceCalculator rc = this.csContext.getResourceCalculator(); - deriveCapacityFromAbsoluteConfigurations(label, clusterResource, rc); - // Re-visit max applications for a queue based on absolute capacity if - // needed. - } else{ - queueResourceQuotas.setEffectiveMinResource(label, Resources - .multiply(resourceByLabel, - queueCapacities.getAbsoluteCapacity(label))); - queueResourceQuotas.setEffectiveMaxResource(label, Resources - .multiply(resourceByLabel, - queueCapacities.getAbsoluteMaximumCapacity(label))); - } - - if (LOG.isDebugEnabled()) { - LOG.debug("Updating effective min resource for queue:" + queuePath - + " as effMinResource=" + queueResourceQuotas - .getEffectiveMinResource(label) - + "and Updating effective max resource as effMaxResource=" - + queueResourceQuotas.getEffectiveMaxResource(label)); - } - } - } - public boolean isDynamicQueue() { readLock.lock(); @@ -1740,4 +1694,24 @@ public void setLastSubmittedTimestamp(long lastSubmittedTimestamp) { writeLock.unlock(); } } + + public boolean isAllowZeroCapacitySum() { + return allowZeroCapacitySum; + } + + public void setAllowZeroCapacitySum(boolean allowZeroCapacitySum) { + this.allowZeroCapacitySum = allowZeroCapacitySum; + } + + public boolean isRoot() { + return false; + } + + public AbstractQueueConfigMode getQueueConfigMode() { + return queueConfigMode; + } + + public Set getConfiguredNodeLabels() { + return configuredNodeLabels; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 44727fb1db1..a2f6cdcf2d1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -1945,7 +1945,7 @@ public void updateClusterResource(Resource clusterResource, updateAbsoluteCapacities(); - super.updateEffectiveResources(clusterResource); + getQueueConfigMode().calculateEffectiveResource(clusterResource); super.updateMaxAppRelatedField(csContext.getConfiguration(), this); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java index e415ac12795..97c82f0e611 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java @@ -23,7 +23,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler .SchedulerDynamicEditException; -import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractCSQueue.CapacityConfigType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.queuemanagement.GuaranteedOrZeroCapacityOverTimePolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica .FiCaSchedulerApp; @@ -177,7 +176,7 @@ private void reinitializeQueueManagementPolicy() throws IOException { .getAutoCreatedQueueTemplateConfPrefix(getQueuePath()), resourceTypes); - if (this.capacityConfigType.equals(CapacityConfigType.PERCENTAGE) + if (this.capacityConfigType.isPercentage() && !templateMinResource.equals(Resources.none())) { throw new IOException("Managed Parent Queue " + this.getQueuePath() + " config type is different from leaf queue template config type"); @@ -198,7 +197,7 @@ private void reinitializeQueueManagementPolicy() throws IOException { * been defined in ABSOLUTE_RESOURCE format. * */ - if (this.capacityConfigType.equals(CapacityConfigType.ABSOLUTE_RESOURCE)) { + if (this.capacityConfigType.isAbsolute()) { updateQueueCapacities(queueCapacities); } builder.capacities(queueCapacities); @@ -309,8 +308,7 @@ public void addChildQueue(CSQueue childQueue) /* Below is to avoid Setting Queue Capacity to NaN when ClusterResource is zero during RM Startup with DominantResourceCalculator */ - if (this.capacityConfigType.equals( - CapacityConfigType.ABSOLUTE_RESOURCE)) { + if (this.capacityConfigType.isAbsolute()) { QueueCapacities queueCapacities = getLeafQueueTemplate().getQueueCapacities(); updateQueueCapacities(queueCapacities); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index e1307d7bd3f..665d1f8fb26 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -26,8 +26,6 @@ import java.util.List; import java.util.Map; -import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList; -import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.util.Sets; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -48,7 +46,6 @@ import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.security.AccessType; @@ -73,9 +70,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSetUtils; -import org.apache.hadoop.yarn.util.UnitsConversionUtil; -import org.apache.hadoop.yarn.util.resource.ResourceCalculator; -import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.apache.hadoop.yarn.util.resource.Resources; @Private @@ -99,16 +93,8 @@ private int runnableApps; - private final boolean allowZeroCapacitySum; - private AutoCreatedQueueTemplate autoCreatedQueueTemplate; - // effective min ratio per resource, it is used during updateClusterResource, - // leaf queue can use this to calculate effective resources. 
- // This field will not be edited, reference will point to a new immutable map - // after every time recalculation - private volatile Map effectiveMinRatioPerResource; - public ParentQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { this(cs, cs.getConfiguration(), queueName, parent, old, false); @@ -139,8 +125,8 @@ private ParentQueue(CapacitySchedulerContext cs, } this.childQueues = new ArrayList<>(); - this.allowZeroCapacitySum = - cs.getConfiguration().getAllowZeroCapacitySum(getQueuePath()); + this.setAllowZeroCapacitySum( + cs.getConfiguration().getAllowZeroCapacitySum(getQueuePath())); setupQueueConfigs(cs.getClusterResource(), csConf); @@ -193,92 +179,12 @@ protected void setupQueueConfigs(Resource clusterResource, + ", reservationsContinueLooking=" + reservationsContinueLooking + ", orderingPolicy=" + getQueueOrderingPolicyConfigName() + ", priority=" + priority - + ", allowZeroCapacitySum=" + allowZeroCapacitySum); + + ", allowZeroCapacitySum=" + isAllowZeroCapacitySum()); } finally { writeLock.unlock(); } } - private static float PRECISION = 0.0005f; // 0.05% precision - - // Check weight configuration, throw exception when configuration is invalid - // return true when all children use weight mode. - private QueueCapacityType getCapacityConfigurationTypeForQueues( - Collection queues) throws IOException { - // Do we have ANY queue set capacity in any labels? - boolean percentageIsSet = false; - - // Do we have ANY queue set weight in any labels? - boolean weightIsSet = false; - - // Do we have ANY queue set absolute in any labels? 
- boolean absoluteMinResSet = false; - - StringBuilder diagMsg = new StringBuilder(); - - for (CSQueue queue : queues) { - for (String nodeLabel : queueCapacities.getExistingNodeLabels()) { - float capacityByLabel = queue.getQueueCapacities().getCapacity(nodeLabel); - if (capacityByLabel > 0) { - percentageIsSet = true; - } - float weightByLabel = queue.getQueueCapacities().getWeight(nodeLabel); - // By default weight is set to -1, so >= 0 is enough. - if (weightByLabel >= 0) { - weightIsSet = true; - diagMsg.append( - "{Queue=" + queue.getQueuePath() + ", label=" + nodeLabel - + " uses weight mode}. "); - } - if (!queue.getQueueResourceQuotas().getConfiguredMinResource(nodeLabel) - .equals(Resources.none())) { - absoluteMinResSet = true; - // There's a special handling: when absolute resource is configured, - // capacity will be calculated (and set) for UI/metrics purposes, so - // when asboluteMinResource is set, unset percentage - percentageIsSet = false; - diagMsg.append( - "{Queue=" + queue.getQueuePath() + ", label=" + nodeLabel - + " uses absolute mode}. "); - } - if (percentageIsSet) { - diagMsg.append( - "{Queue=" + queue.getQueuePath() + ", label=" + nodeLabel - + " uses percentage mode}. "); - } - } - } - - // If we have mixed capacity, weight or absolute resource (any of the two) - // We will throw exception - // Root queue is an exception here, because by default root queue returns - // 100 as capacity no matter what. We should look into this case in the - // future. To avoid impact too many code paths, we don;t check root queue's - // config. - if (queues.iterator().hasNext() && - !queues.iterator().next().getQueuePath().equals( - CapacitySchedulerConfiguration.ROOT) && - (percentageIsSet ? 1 : 0) + (weightIsSet ? 1 : 0) + (absoluteMinResSet ? 
- 1 : - 0) > 1) { - throw new IOException("Parent queue '" + getQueuePath() - + "' have children queue used mixed of " - + " weight mode, percentage and absolute mode, it is not allowed, please " - + "double check, details:" + diagMsg.toString()); - } - - if (weightIsSet || queues.isEmpty()) { - return QueueCapacityType.WEIGHT; - } else if (absoluteMinResSet) { - return QueueCapacityType.ABSOLUTE_RESOURCE; - } else { - return QueueCapacityType.PERCENT; - } - } - - private enum QueueCapacityType { - WEIGHT, ABSOLUTE_RESOURCE, PERCENT; - } /** * Set child queue and verify capacities @@ -301,98 +207,7 @@ private QueueCapacityType getCapacityConfigurationTypeForQueues( void setChildQueues(Collection childQueues) throws IOException { writeLock.lock(); try { - QueueCapacityType childrenCapacityType = - getCapacityConfigurationTypeForQueues(childQueues); - QueueCapacityType parentCapacityType = - getCapacityConfigurationTypeForQueues(ImmutableList.of(this)); - - if (childrenCapacityType == QueueCapacityType.ABSOLUTE_RESOURCE - || parentCapacityType == QueueCapacityType.ABSOLUTE_RESOURCE) { - // We don't allow any mixed absolute + {weight, percentage} between - // children and parent - if (childrenCapacityType != parentCapacityType && !this.getQueuePath() - .equals(CapacitySchedulerConfiguration.ROOT)) { - throw new IOException("Parent=" + this.getQueuePath() - + ": When absolute minResource is used, we must make sure both " - + "parent and child all use absolute minResource"); - } - - // Ensure that for each parent queue: parent.min-resource >= - // Σ(child.min-resource). - for (String nodeLabel : queueCapacities.getExistingNodeLabels()) { - Resource minRes = Resources.createResource(0, 0); - for (CSQueue queue : childQueues) { - // Accumulate all min/max resource configured for all child queues. 
- Resources.addTo(minRes, queue.getQueueResourceQuotas() - .getConfiguredMinResource(nodeLabel)); - } - Resource resourceByLabel = labelManager.getResourceByLabel(nodeLabel, - scheduler.getClusterResource()); - Resource parentMinResource = - queueResourceQuotas.getConfiguredMinResource(nodeLabel); - if (!parentMinResource.equals(Resources.none()) && Resources.lessThan( - resourceCalculator, resourceByLabel, parentMinResource, minRes)) { - throw new IOException( - "Parent Queues" + " capacity: " + parentMinResource - + " is less than" + " to its children:" + minRes - + " for queue:" + queueName); - } - } - } - - // When child uses percent - if (childrenCapacityType == QueueCapacityType.PERCENT) { - float childrenPctSum = 0; - // check label capacities - for (String nodeLabel : queueCapacities.getExistingNodeLabels()) { - // check children's labels - childrenPctSum = 0; - for (CSQueue queue : childQueues) { - childrenPctSum += queue.getQueueCapacities().getCapacity(nodeLabel); - } - - if (Math.abs(1 - childrenPctSum) > PRECISION) { - // When children's percent sum != 100% - if (Math.abs(childrenPctSum) > PRECISION) { - // It is wrong when percent sum != {0, 1} - throw new IOException( - "Illegal" + " capacity sum of " + childrenPctSum - + " for children of queue " + queueName + " for label=" - + nodeLabel + ". It should be either 0 or 1.0"); - } else{ - // We also allow children's percent sum = 0 under the following - // conditions - // - Parent uses weight mode - // - Parent uses percent mode, and parent has - // (capacity=0 OR allowZero) - if (parentCapacityType == QueueCapacityType.PERCENT) { - if ((Math.abs(queueCapacities.getCapacity(nodeLabel)) - > PRECISION) && (!allowZeroCapacitySum)) { - throw new IOException( - "Illegal" + " capacity sum of " + childrenPctSum - + " for children of queue " + queueName - + " for label=" + nodeLabel - + ". 
It is set to 0, but parent percent != 0, and " - + "doesn't allow children capacity to set to 0"); - } - } - } - } else { - // Even if child pct sum == 1.0, we will make sure parent has - // positive percent. - if (parentCapacityType == QueueCapacityType.PERCENT && Math.abs( - queueCapacities.getCapacity(nodeLabel)) <= 0f - && !allowZeroCapacitySum) { - throw new IOException( - "Illegal" + " capacity sum of " + childrenPctSum - + " for children of queue " + queueName + " for label=" - + nodeLabel + ". queue=" + queueName - + " has zero capacity, but child" - + "queues have positive capacities"); - } - } - } - } + getQueueConfigMode().validate(childQueues); this.childQueues.clear(); this.childQueues.addAll(childQueues); @@ -548,7 +363,7 @@ private CSQueue addDynamicChildQueue(String childQueuePath, boolean isLeaf) boolean weightsAreUsed = false; try { weightsAreUsed = getCapacityConfigurationTypeForQueues(childQueues) - == QueueCapacityType.WEIGHT; + .isWeight(); } catch (IOException e) { LOG.warn("Caught Exception during auto queue creation", e); } @@ -1224,57 +1039,12 @@ public void updateClusterResource(Resource clusterResource, // Update absolute capacities of this queue, this need to happen before // below calculation for effective capacities + getQueueConfigMode().calculateEffectiveResourcesPrerequisite( + clusterResource); updateAbsoluteCapacities(); - // Normalize all dynamic queue queue's weight to 1 for all accessible node - // labels, this is important because existing node labels could keep - // changing when new node added, or node label mapping changed. We need - // this to ensure auto created queue can access all labels. - for (String nodeLabel : queueCapacities.getExistingNodeLabels()) { - for (CSQueue queue : childQueues) { - // For dynamic queue, we will set weight to 1 every time, because it - // is possible new labels added to the parent. 
- if (((AbstractCSQueue) queue).isDynamicQueue()) { - if (queue.getQueueCapacities().getWeight(nodeLabel) == -1f) { - queue.getQueueCapacities().setWeight(nodeLabel, 1f); - } - } - } - } - - // Normalize weight of children - if (getCapacityConfigurationTypeForQueues(childQueues) - == QueueCapacityType.WEIGHT) { - for (String nodeLabel : queueCapacities.getExistingNodeLabels()) { - float sumOfWeight = 0; - - for (CSQueue queue : childQueues) { - if (queue.getQueueCapacities().getExistingNodeLabels() - .contains(nodeLabel)) { - float weight = Math.max(0, - queue.getQueueCapacities().getWeight(nodeLabel)); - sumOfWeight += weight; - } - } - // When sum of weight == 0, skip setting normalized_weight (so - // normalized weight will be 0). - if (Math.abs(sumOfWeight) > 1e-6) { - for (CSQueue queue : childQueues) { - if (queue.getQueueCapacities().getExistingNodeLabels() - .contains(nodeLabel)) { - queue.getQueueCapacities().setNormalizedWeight(nodeLabel, - queue.getQueueCapacities().getWeight(nodeLabel) / - sumOfWeight); - } - } - } - } - } - // Update effective capacity in all parent queue. - for (String label : configuredNodeLabels) { - calculateEffectiveResourcesAndCapacity(label, clusterResource); - } + getQueueConfigMode().calculateEffectiveResource(clusterResource); // Update all children for (CSQueue childQueue : childQueues) { @@ -1304,80 +1074,6 @@ public boolean hasChildQueues() { return true; } - private void calculateEffectiveResourcesAndCapacity(String label, - Resource clusterResource) { - // For root queue, ensure that max/min resource is updated to latest - // cluster resource. 
- Resource resourceByLabel = labelManager.getResourceByLabel(label, - clusterResource); - - /* - * == Below logic are added to calculate effectiveMinRatioPerResource == - */ - - // Total configured min resources of direct children of this given parent - // queue - Resource configuredMinResources = Resource.newInstance(0L, 0); - for (CSQueue childQueue : getChildQueues()) { - Resources.addTo(configuredMinResources, - childQueue.getQueueResourceQuotas().getConfiguredMinResource(label)); - } - - // Factor to scale down effective resource: When cluster has sufficient - // resources, effective_min_resources will be same as configured - // min_resources. - Resource numeratorForMinRatio = null; - ResourceCalculator rc = this.csContext.getResourceCalculator(); - if (getQueuePath().equals("root")) { - if (!resourceByLabel.equals(Resources.none()) && Resources.lessThan(rc, - clusterResource, resourceByLabel, configuredMinResources)) { - numeratorForMinRatio = resourceByLabel; - } - } else { - if (Resources.lessThan(rc, clusterResource, - queueResourceQuotas.getEffectiveMinResource(label), - configuredMinResources)) { - numeratorForMinRatio = queueResourceQuotas - .getEffectiveMinResource(label); - } - } - - effectiveMinRatioPerResource = getEffectiveMinRatioPerResource( - configuredMinResources, numeratorForMinRatio); - - // Update effective resources for my self; - if (rootQueue) { - queueResourceQuotas.setEffectiveMinResource(label, resourceByLabel); - queueResourceQuotas.setEffectiveMaxResource(label, resourceByLabel); - } else{ - super.updateEffectiveResources(clusterResource); - } - } - - private Map getEffectiveMinRatioPerResource( - Resource configuredMinResources, Resource numeratorForMinRatio) { - Map effectiveMinRatioPerResource = new HashMap<>(); - if (numeratorForMinRatio != null) { - int maxLength = ResourceUtils.getNumberOfCountableResourceTypes(); - for (int i = 0; i < maxLength; i++) { - ResourceInformation nResourceInformation = numeratorForMinRatio - 
.getResourceInformation(i); - ResourceInformation dResourceInformation = configuredMinResources - .getResourceInformation(i); - - long nValue = nResourceInformation.getValue(); - long dValue = UnitsConversionUtil.convert( - dResourceInformation.getUnits(), nResourceInformation.getUnits(), - dResourceInformation.getValue()); - if (dValue != 0) { - effectiveMinRatioPerResource.put(nResourceInformation.getName(), - (float) nValue / dValue); - } - } - } - return ImmutableMap.copyOf(effectiveMinRatioPerResource); - } - @Override public List getChildQueues() { readLock.lock(); @@ -1651,11 +1347,6 @@ void decrementRunnableApps() { } } - // This is a locking free method - Map getEffectiveMinRatioPerResource() { - return effectiveMinRatioPerResource; - } - @Override public boolean isEligibleForAutoDeletion() { return isDynamicQueue() && getChildQueues().size() == 0 && @@ -1663,6 +1354,11 @@ public boolean isEligibleForAutoDeletion() { isAutoExpiredDeletionEnabled(this.getQueuePath()); } + @Override + public boolean isRoot() { + return rootQueue; + } + public AutoCreatedQueueTemplate getAutoCreatedQueueTemplate() { return autoCreatedQueueTemplate; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemode/AbstractQueueConfigMode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemode/AbstractQueueConfigMode.java new file mode 100644 index 00000000000..9d4e2346f8e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemode/AbstractQueueConfigMode.java @@ -0,0 +1,217 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.queuemode; + +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceInformation; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractCSQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractCSQueue.CapacityConfigType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerContext; +import org.apache.hadoop.yarn.util.UnitsConversionUtil; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.ResourceUtils; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +/** + * A strategy to encapsulate capacity calculations of a Capacity Scheduler queue + * based on its capacity configuration settings. 
+ */
+public abstract class AbstractQueueConfigMode {
+  protected static final Logger LOG =
+      LoggerFactory.getLogger(AbstractQueueConfigMode.class);
+
+  private final AbstractCSQueue queue;
+  private final CapacitySchedulerContext csContext;
+  // Effective minimum ratio per resource name, keyed by node label. The
+  // per-label value is replaced as a whole on every recalculation and is
+  // never modified in place.
+  private final Map<String, Map<String, Float>> effectiveMinRatioByResource =
+      new HashMap<>();
+
+  /**
+   * Binds the mode to the queue whose capacities it calculates.
+   * @param queue queue owning this mode
+   * @param csContext Capacity Scheduler context
+   */
+  public AbstractQueueConfigMode(AbstractCSQueue queue,
+      CapacitySchedulerContext csContext) {
+    this.queue = queue;
+    this.csContext = csContext;
+  }
+
+  /**
+   * Creates a {@code AbstractQueueConfigMode} for the given queue based on
+   * its capacity configuration settings. The type of the first configured
+   * node label decides the mode; a queue without any configured node label
+   * falls back to percentage mode.
+   * @param queue given queue
+   * @param csContext Capacity Scheduler context
+   * @throws IllegalArgumentException if capacity settings for all node labels
+   * are not uniform (the root queue is exempt from this check)
+   * @return queue config mode
+   */
+  public static AbstractQueueConfigMode defineMode(
+      AbstractCSQueue queue, CapacitySchedulerContext csContext) {
+    Set<String> configuredNodeLabels = queue.getConfiguredNodeLabels();
+    CapacityConfigType localType = CapacityConfigType.NONE;
+
+    for (String label : configuredNodeLabels) {
+      CapacityConfigType configTypePerLabel =
+          queue.checkConfigTypeIsAbsoluteResource(queue.getQueuePath(), label)
+              ? CapacityConfigType.ABSOLUTE_RESOURCE
+              : CapacityConfigType.PERCENTAGE;
+      if (localType.isNone()) {
+        localType = configTypePerLabel;
+      }
+      if (!queue.getQueuePath().equals("root")
+          && !configTypePerLabel.equals(localType)) {
+        throw new IllegalArgumentException("Queue '" + queue.getQueuePath()
+            + "' should use either percentage based capacity"
+            + " configuration or absolute resource.");
+      }
+    }
+
+    if (localType.isAbsolute()) {
+      return new ResourceConfigMode(queue, csContext);
+    }
+
+    return new PercentageConfigMode(queue, csContext);
+  }
+
+  /**
+   * Handles any calculation prior to calculating effective resources. The
+   * base implementation refreshes the effective minimum ratios for every
+   * existing node label of the queue.
+   * @param clusterResource overall cluster resource
+   * @throws IOException if configuration mode of the child queues are not
+   * determinable
+   */
+  public void calculateEffectiveResourcesPrerequisite(Resource clusterResource)
+      throws IOException {
+    for (String nodeLabel : getQueue().getQueueCapacities()
+        .getExistingNodeLabels()) {
+      calculateEffectiveRatio(clusterResource, nodeLabel);
+    }
+  }
+
+  /**
+   * Returns the effective minimum resource ratio per resource grouped by node
+   * labels. A label has no entry until the ratios have been calculated for
+   * it, so callers must be prepared for a missing key.
+   * @return effective minimum ratios per resource
+   */
+  public Map<String, Map<String, Float>> getEffectiveMinRatioByResource() {
+    return effectiveMinRatioByResource;
+  }
+
+  /**
+   * Returns the {@code CapacityConfigType} of the queue.
+   * @return capacity config type
+   */
+  public abstract CapacityConfigType getConfigType();
+
+  /**
+   * Determines whether a collection of newly parsed child queues could be
+   * placed under the queue.
+   * @param childQueues newly parsed child queues
+   * @throws IOException if child queues can not be placed under the queue
+   */
+  public abstract void validate(
+      Collection<CSQueue> childQueues) throws IOException;
+
+  /**
+   * Calculates minimum and maximum effective capacity values.
+   * @param clusterResource overall cluster resource
+   */
+  public abstract void calculateEffectiveResource(Resource clusterResource);
+
+  /**
+   * Calculates the maximum application number limit of the queue.
+   * @param maxGlobalApplications global maximum application limit
+   * @param label node label
+   * @return calculated maximum application limit for a specific node label
+   */
+  public abstract int calculateMaxApplications(
+      int maxGlobalApplications, String label);
+
+  public AbstractCSQueue getQueue() {
+    return queue;
+  }
+
+  public CapacitySchedulerContext getCsContext() {
+    return csContext;
+  }
+
+  /**
+   * Caches the effective minimum ratio per resource type for a node label.
+   * The ratio is the resource available to this queue (the label's resource
+   * for the root queue, the effective minimum resource otherwise) divided by
+   * the sum of the configured minimum resources of the direct children. It
+   * is only recorded when the children ask for more than is available;
+   * otherwise an empty map is stored and children keep their configured
+   * minimum.
+   * @param clusterResource overall cluster resource
+   * @param label node label
+   */
+  private void calculateEffectiveRatio(Resource clusterResource, String label) {
+    Collection<CSQueue> children = getQueue().getChildQueues();
+    if (children == null) {
+      return;
+    }
+
+    Resource resourceByLabel = getCsContext().getRMContext()
+        .getNodeLabelManager().getResourceByLabel(label, clusterResource);
+    // Total configured min resources of direct children of this given parent
+    // queue
+    Resource configuredMinResources = Resource.newInstance(0L, 0);
+    for (CSQueue childQueue : children) {
+      Resources.addTo(configuredMinResources,
+          childQueue.getQueueResourceQuotas().getConfiguredMinResource(label));
+    }
+
+    // Factor to scale down effective resource: When cluster has sufficient
+    // resources, effective_min_resources will be same as configured
+    // min_resources.
+    Resource numeratorForMinRatio = null;
+    ResourceCalculator rc = getCsContext().getResourceCalculator();
+    if (getQueue().getQueuePath().equals("root")) {
+      if (!resourceByLabel.equals(Resources.none()) && Resources.lessThan(rc,
+          clusterResource, resourceByLabel, configuredMinResources)) {
+        numeratorForMinRatio = resourceByLabel;
+      }
+    } else {
+      Resource effectiveMin =
+          getQueue().getQueueResourceQuotas().getEffectiveMinResource(label);
+      if (Resources.lessThan(rc, clusterResource, effectiveMin,
+          configuredMinResources)) {
+        numeratorForMinRatio = effectiveMin;
+      }
+    }
+
+    // Publish a read-only snapshot, as the pre-refactoring code did with an
+    // immutable copy, so readers can never observe or cause partial updates.
+    effectiveMinRatioByResource.put(label,
+        java.util.Collections.unmodifiableMap(
+            divideByResourceType(numeratorForMinRatio,
+                configuredMinResources)));
+  }
+
+  /**
+   * Divides two resources per countable resource type.
+   * @param numerator dividend, or null when no scaling is needed
+   * @param denominator divisor; types with a zero value are skipped
+   * @return ratio by resource name, empty when the numerator is null
+   */
+  private static Map<String, Float> divideByResourceType(Resource numerator,
+      Resource denominator) {
+    Map<String, Float> ratios = new HashMap<>();
+    if (numerator == null) {
+      return ratios;
+    }
+
+    int maxLength = ResourceUtils.getNumberOfCountableResourceTypes();
+    for (int i = 0; i < maxLength; i++) {
+      ResourceInformation nResourceInformation =
+          numerator.getResourceInformation(i);
+      ResourceInformation dResourceInformation =
+          denominator.getResourceInformation(i);
+
+      long nValue = nResourceInformation.getValue();
+      // Bring the divisor to the unit of the dividend before dividing.
+      long dValue = UnitsConversionUtil.convert(
+          dResourceInformation.getUnits(), nResourceInformation.getUnits(),
+          dResourceInformation.getValue());
+      if (dValue != 0) {
+        ratios.put(nResourceInformation.getName(), (float) nValue / dValue);
+      }
+    }
+    return ratios;
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemode/PercentageConfigMode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemode/PercentageConfigMode.java
new file mode 100644
index 00000000000..83b5a6c10ef
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemode/PercentageConfigMode.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.queuemode;
+
+import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractCSQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractCSQueue.CapacityConfigType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerContext;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Set;
+
+/**
+ * A strategy to represent queue mode in case of percentage-based capacity
+ * config type eg. root.capacity=100
+ */
+public class PercentageConfigMode extends AbstractQueueConfigMode {
+  private static final float PRECISION = 0.0005f; // 0.05% precision
+  // A weight sum at or below this threshold is treated as zero.
+  private static final double WEIGHT_SUM_EPSILON = 1e-6;
+
+  public PercentageConfigMode(
+      AbstractCSQueue queue, CapacitySchedulerContext csContext) {
+    super(queue, csContext);
+  }
+
+  @Override
+  public CapacityConfigType getConfigType() {
+    return CapacityConfigType.PERCENTAGE;
+  }
+
+  /**
+   * Validates whether the newly parsed child queues adhere to the following
+   * conditions:
+   * 1. The overall capacity sum of all child queues is either 0 (if it is
+   * allowed in configuration) or 1
+   * 2. The overall capacity sum of all child queues is 1 and the parent
+   * capacity is not 0 (unless it is allowed in configuration)
+   * Children configured with absolute resources are only accepted directly
+   * under the root queue.
+   * @param childQueues newly parsed child queues
+   * @throws IOException if config type of children can not be determined or
+   * one of the conditions above is violated
+   */
+  @Override
+  public void validate(Collection<CSQueue> childQueues) throws IOException {
+    CapacityConfigType childrenConfigType =
+        AbstractCSQueue.getCapacityConfigurationTypeForQueues(childQueues);
+    CapacityConfigType parentConfigType = AbstractCSQueue.
+        getCapacityConfigurationTypeForQueues(ImmutableList.of(getQueue()));
+    String queuePath = getQueue().getQueuePath();
+
+    if (childrenConfigType.isAbsolute()
+        && !queuePath.equals(CapacitySchedulerConfiguration.ROOT)) {
+      throw new IOException("Parent=" + queuePath
+          + ": When absolute minResource is used, we must make sure both "
+          + "parent and child all use absolute minResource");
+    }
+
+    // The sum checks below only apply when children use percent.
+    if (!childrenConfigType.isPercentage()) {
+      return;
+    }
+
+    boolean zeroSumAllowed = getQueue().isAllowZeroCapacitySum();
+    for (String nodeLabel : getQueue().getQueueCapacities()
+        .getExistingNodeLabels()) {
+      float childrenPctSum = 0;
+      for (CSQueue child : childQueues) {
+        childrenPctSum += child.getQueueCapacities().getCapacity(nodeLabel);
+      }
+      float parentCapacity =
+          getQueue().getQueueCapacities().getCapacity(nodeLabel);
+
+      // The comparisons keep their original '>' form on purpose so that a
+      // NaN sum is classified exactly as before.
+      if (Math.abs(1 - childrenPctSum) > PRECISION) {
+        // When children's percent sum != 100%
+        if (Math.abs(childrenPctSum) > PRECISION) {
+          // It is wrong when percent sum != {0, 1}
+          throw new IOException(illegalSum(childrenPctSum, nodeLabel)
+              + ". It should be either 0 or 1.0");
+        }
+        // We also allow children's percent sum = 0 under the following
+        // conditions
+        // - Parent uses weight mode
+        // - Parent uses percent mode, and parent has
+        //   (capacity=0 OR allowZero)
+        if (parentConfigType.isPercentage()
+            && Math.abs(parentCapacity) > PRECISION && !zeroSumAllowed) {
+          throw new IOException(illegalSum(childrenPctSum, nodeLabel)
+              + ". It is set to 0, but parent percent != 0, and "
+              + "doesn't allow children capacity to set to 0");
+        }
+      } else if (parentConfigType.isPercentage()
+          && Math.abs(parentCapacity) <= 0f && !zeroSumAllowed) {
+        // Even if child pct sum == 1.0, we will make sure parent has
+        // positive percent.
+        throw new IOException(illegalSum(childrenPctSum, nodeLabel)
+            + ". queue=" + queuePath + " has zero capacity, but child"
+            + "queues have positive capacities");
+      }
+    }
+  }
+
+  /**
+   * Builds the common prefix of the capacity sum violation messages.
+   * @param childrenPctSum capacity sum of the children
+   * @param nodeLabel node label the sum was calculated for
+   * @return message prefix without trailing punctuation
+   */
+  private String illegalSum(float childrenPctSum, String nodeLabel) {
+    return "Illegal capacity sum of " + childrenPctSum
+        + " for children of queue " + getQueue().getQueuePath()
+        + " for label=" + nodeLabel;
+  }
+
+  /**
+   * In addition to the effective ratios, assigns the default weight to
+   * dynamic children and, when the children use weight mode, normalizes the
+   * weights of the children per node label.
+   * @param clusterResource overall cluster resource
+   * @throws IOException if configuration mode of the child queues are not
+   * determinable
+   */
+  @Override
+  public void calculateEffectiveResourcesPrerequisite(Resource clusterResource)
+      throws IOException {
+    super.calculateEffectiveResourcesPrerequisite(clusterResource);
+
+    // The base class treats a missing child list as a valid state, so do
+    // the same here instead of failing on the iteration below.
+    Collection<CSQueue> children = getQueue().getChildQueues();
+    if (children == null) {
+      return;
+    }
+
+    for (String nodeLabel : getQueue().getQueueCapacities()
+        .getExistingNodeLabels()) {
+      float sumOfWeight = 0;
+
+      for (CSQueue childQueue : children) {
+        // Normalize all dynamic queue's weight to 1 for all accessible
+        // node labels, this is important because existing node labels could
+        // keep changing when new node added, or node label mapping changed.
+        // We need this to ensure auto created queue can access all labels.
+        if (((AbstractCSQueue) childQueue).isDynamicQueue()
+            && childQueue.getQueueCapacities().getWeight(nodeLabel) == -1f) {
+          childQueue.getQueueCapacities().setWeight(nodeLabel, 1f);
+        }
+        // Only children that already know the label take part, as in the
+        // calculation this method replaces.
+        if (hasLabel(childQueue, nodeLabel)) {
+          sumOfWeight += Math.max(0,
+              childQueue.getQueueCapacities().getWeight(nodeLabel));
+        }
+      }
+
+      // The type is re-evaluated per label because the weights assigned
+      // above may influence it. When sum of weight == 0, skip setting
+      // normalized_weight (so normalized weight will be 0).
+      if (AbstractCSQueue.getCapacityConfigurationTypeForQueues(children)
+          .isWeight() && Math.abs(sumOfWeight) > WEIGHT_SUM_EPSILON) {
+        for (CSQueue childQueue : children) {
+          if (hasLabel(childQueue, nodeLabel)) {
+            childQueue.getQueueCapacities().setNormalizedWeight(nodeLabel,
+                childQueue.getQueueCapacities().getWeight(nodeLabel)
+                    / sumOfWeight);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Tells whether the capacities of a queue already contain a node label.
+   * @param candidate queue to inspect
+   * @param nodeLabel node label
+   * @return true if the label exists in the capacities of the queue
+   */
+  private static boolean hasLabel(CSQueue candidate, String nodeLabel) {
+    return candidate.getQueueCapacities().getExistingNodeLabels()
+        .contains(nodeLabel);
+  }
+
+  /**
+   * Set effective resources based on current absolute capacities.
+   * @param clusterResource overall cluster resource
+   */
+  @Override
+  public void calculateEffectiveResource(Resource clusterResource) {
+    for (String label : getQueue().getConfiguredNodeLabels()) {
+      Resource resourceByLabel = getQueue().getLabelManager()
+          .getResourceByLabel(label, clusterResource);
+      getQueue().getQueueResourceQuotas().setEffectiveMinResource(label,
+          Resources.multiply(resourceByLabel,
+              getQueue().getQueueCapacities().getAbsoluteCapacity(label)));
+      getQueue().getQueueResourceQuotas().setEffectiveMaxResource(
+          label, Resources.multiply(resourceByLabel,
+              getQueue().getQueueCapacities().getAbsoluteMaximumCapacity(
+                  label)));
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Updating effective min resource for queue:"
+            + getQueue().getQueuePath() + " as effMinResource="
+            + getQueue().getQueueResourceQuotas().getEffectiveMinResource(label)
+            + "and Updating effective max resource as effMaxResource="
+            + getQueue().getQueueResourceQuotas().getEffectiveMaxResource(
+            label));
+      }
+    }
+  }
+
+  /**
+   * In percentage mode the global limit is returned unchanged.
+   * @param maxGlobalApplications global maximum application limit
+   * @param label node label, not used by this mode
+   * @return the global maximum application limit
+   */
+  @Override
+  public int calculateMaxApplications(int maxGlobalApplications, String label) {
+    return maxGlobalApplications;
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemode/ResourceConfigMode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemode/ResourceConfigMode.java
new file mode 100644
index 00000000000..c33ab508124
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemode/ResourceConfigMode.java
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.queuemode;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractCSQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractCSQueue.CapacityConfigType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerContext;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A strategy to represent queue mode in case of config type defined by an
+ * absolute resource value eg. root.capacity=[memory=1024,vcore=2]
+ */
+public class ResourceConfigMode extends AbstractQueueConfigMode {
+  public ResourceConfigMode(AbstractCSQueue queue,
+      CapacitySchedulerContext csContext) {
+    super(queue, csContext);
+  }
+
+  /**
+   * Validates that the children use absolute resources as well (the root
+   * queue is exempt) and that, for every existing node label, the configured
+   * minimum resource of the parent is not less than the sum of the
+   * configured minimum resources of its children. A parent without a
+   * configured minimum resource is not checked.
+   * @param childQueues newly parsed child queues
+   * @throws IOException if config type of children can not be determined or
+   * one of the conditions above is violated
+   */
+  @Override
+  public void validate(Collection<CSQueue> childQueues) throws IOException {
+    CapacityConfigType childrenConfigType =
+        AbstractCSQueue.getCapacityConfigurationTypeForQueues(childQueues);
+    if (childrenConfigType != getConfigType() && !getQueue().getQueuePath()
+        .equals(CapacitySchedulerConfiguration.ROOT)) {
+      throw new IOException("Parent=" + getQueue().getQueuePath()
+          + ": When absolute minResource is used, we must make sure both "
+          + "parent and child all use absolute minResource");
+    }
+
+    // Ensure that for each parent queue: parent.min-resource >=
+    // Σ(child.min-resource).
+    for (String nodeLabel : getQueue().getQueueCapacities()
+        .getExistingNodeLabels()) {
+      Resource minRes = Resources.createResource(0, 0);
+      for (CSQueue queue : childQueues) {
+        // Accumulate the min resource configured for all child queues.
+        Resources.addTo(minRes, queue.getQueueResourceQuotas()
+            .getConfiguredMinResource(nodeLabel));
+      }
+      Resource resourceByLabel = getCsContext().getRMContext()
+          .getNodeLabelManager().getResourceByLabel(nodeLabel,
+              getCsContext().getClusterResource());
+      Resource parentMinResource =
+          getQueue().getQueueResourceQuotas().getConfiguredMinResource(
+              nodeLabel);
+      if (!parentMinResource.equals(Resources.none()) && Resources.lessThan(
+          getCsContext().getResourceCalculator(), resourceByLabel,
+          parentMinResource, minRes)) {
+        throw new IOException(
+            "Parent Queues" + " capacity: " + parentMinResource
+                + " is less than" + " to its children:" + minRes
+                + " for queue:" + getQueue().getQueueName());
+      }
+    }
+  }
+
+  @Override
+  public CapacityConfigType getConfigType() {
+    return CapacityConfigType.ABSOLUTE_RESOURCE;
+  }
+
+  /**
+   * Set effective resources based on normalized minimum resource ratios.
+   * Capacity values are recalculated after setting the effective resources.
+   * The root queue simply receives the whole resource of the label as both
+   * minimum and maximum.
+   * @param clusterResource overall cluster resource
+   */
+  @Override
+  public void calculateEffectiveResource(Resource clusterResource) {
+    Set<String> configuredNodeLabels = getQueue().getConfiguredNodeLabels();
+    CSQueue parent = getQueue().getParent();
+    ResourceCalculator rc = getCsContext().getResourceCalculator();
+
+    for (String label : configuredNodeLabels) {
+      Resource resourceByLabel = getQueue().getLabelManager()
+          .getResourceByLabel(label, clusterResource);
+
+      if (getQueue().isRoot()) {
+        getQueue().getQueueResourceQuotas().setEffectiveMinResource(
+            label, resourceByLabel);
+        getQueue().getQueueResourceQuotas().setEffectiveMaxResource(
+            label, resourceByLabel);
+      } else {
+        getQueue().getQueueResourceQuotas().setEffectiveMinResource(label,
+            getMinResourceNormalized(label));
+
+        // Max resource of a queue should be a minimum of {configuredMaxRes,
+        // parentMaxRes}. parentMaxRes could be configured value. But if not
+        // present could also be taken from effective max resource of parent.
+        Resource parentMaxRes =
+            parent.getQueueResourceQuotas().getConfiguredMaxResource(label);
+        if (parentMaxRes.equals(Resources.none())) {
+          parentMaxRes =
+              parent.getQueueResourceQuotas().getEffectiveMaxResource(label);
+        }
+
+        // Minimum of {childMaxResource, parentMaxRes}. However if
+        // childMaxResource is empty, consider parent's max resource alone.
+        Resource childMaxResource =
+            getQueue().getQueueResourceQuotas().getConfiguredMaxResource(label);
+        Resource effMaxResource = Resources.min(rc, resourceByLabel,
+            childMaxResource.equals(Resources.none()) ? parentMaxRes :
+                childMaxResource, parentMaxRes);
+        getQueue().getQueueResourceQuotas().setEffectiveMaxResource(label,
+            Resources.clone(effMaxResource));
+
+        // In cases where we still need to update some units based on
+        // percentage, we have to calculate percentage and update.
+        deriveCapacityFromAbsoluteConfigurations(label, clusterResource, rc);
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Updating effective min resource for queue:"
+            + getQueue().getQueuePath() + " as effMinResource="
+            + getQueue().getQueueResourceQuotas().getEffectiveMinResource(label)
+            + "and Updating effective max resource as effMaxResource="
+            + getQueue().getQueueResourceQuotas().getEffectiveMaxResource(
+            label));
+      }
+    }
+  }
+
+  /**
+   * Scales the global limit by the absolute capacity of the queue for the
+   * label; the result is truncated towards zero.
+   * @param maxGlobalApplications global maximum application limit
+   * @param label node label
+   * @return calculated maximum application limit for a specific node label
+   */
+  @Override
+  public int calculateMaxApplications(int maxGlobalApplications,
+      String label) {
+    return (int) (maxGlobalApplications *
+        getQueue().getQueueCapacities().getAbsoluteCapacity(label));
+  }
+
+  /**
+   * Set capacity and absolute capacity values based on effective resources.
+   * @param label node label
+   * @param clusterResource overall cluster resource
+   * @param rc resource calculator
+   */
+  private void deriveCapacityFromAbsoluteConfigurations(
+      String label, Resource clusterResource, ResourceCalculator rc) {
+
+    /*
+     * In case when queues are configured with absolute resources, it is better
+     * to update capacity/max-capacity etc w.r.t absolute resource as well. In
+     * case of computation, these values won't be used any more. However for
+     * metrics and UI, it's better these values are pre-computed here itself.
+     */
+
+    // 1. Update capacity as a float based on parent's minResource. An
+    // infinite quotient (division by zero) is reported as 0.
+    float f = rc.divide(clusterResource,
+        getQueue().getQueueResourceQuotas().getEffectiveMinResource(label),
+        getQueue().getParent().getQueueResourceQuotas().getEffectiveMinResource(
+            label));
+    getQueue().getQueueCapacities().setCapacity(label,
+        Float.isInfinite(f) ? 0 : f);
+
+    // 2. Update max-capacity as a float based on parent's maxResource
+    f = rc.divide(clusterResource,
+        getQueue().getQueueResourceQuotas().getEffectiveMaxResource(label),
+        getQueue().getParent().getQueueResourceQuotas().getEffectiveMaxResource(
+            label));
+    getQueue().getQueueCapacities().setMaximumCapacity(
+        label, Float.isInfinite(f) ? 0 : f);
+
+    // 3. Update absolute capacity as a float based on parent's minResource and
+    // cluster resource.
+    getQueue().getQueueCapacities().setAbsoluteCapacity(label,
+        getQueue().getQueueCapacities().getCapacity(label) *
+            getQueue().getParent().getQueueCapacities().getAbsoluteCapacity(
+                label));
+
+    // 4. Update absolute max-capacity as a float based on parent's maxResource
+    // and cluster resource.
+    getQueue().getQueueCapacities().setAbsoluteMaximumCapacity(label,
+        getQueue().getQueueCapacities().getMaximumCapacity(label) *
+            getQueue().getParent().getQueueCapacities()
+                .getAbsoluteMaximumCapacity(label));
+  }
+
+
+  /**
+   * Recalculate a resource value from the minimum configured resource and
+   * the effective minimum ratio of the parent. Resource types without a
+   * ratio keep their configured value, and so does the whole resource when
+   * the parent has no ratios recorded for the label.
+   * @param label node label
+   * @return normalized configured minimum resource, or
+   * {@code Resources.none()} if the parent is not an {@code AbstractCSQueue}
+   */
+  private Resource getMinResourceNormalized(String label) {
+    if (!(getQueue().getParent() instanceof AbstractCSQueue)) {
+      return Resources.none();
+    }
+
+    Resource minResource = getQueue().getQueueResourceQuotas()
+        .getConfiguredMinResource(label);
+    Resource ret = Resource.newInstance(minResource);
+
+    Map<String, Float> effectiveRatio = ((AbstractCSQueue) getQueue()
+        .getParent()).getQueueConfigMode().getEffectiveMinRatioByResource()
+        .get(label);
+    if (effectiveRatio == null) {
+      // The parent records ratios per existing node label while this queue
+      // iterates its configured labels, so an entry may be missing. Nothing
+      // has to be scaled down in that case.
+      return ret;
+    }
+
+    int maxLength = ResourceUtils.getNumberOfCountableResourceTypes();
+    for (int i = 0; i < maxLength; i++) {
+      ResourceInformation nResourceInformation =
+          minResource.getResourceInformation(i);
+
+      Float ratio = effectiveRatio.get(nResourceInformation.getName());
+      if (ratio != null) {
+        ret.setResourceValue(i,
+            (long) (nResourceInformation.getValue() * ratio));
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Updating min resource for Queue: " + getQueue()
+              .getQueuePath() + " as " + ret.getResourceInformation(i)
+              + ", Actual resource: " + nResourceInformation.getValue()
+              + ", ratio: " + ratio);
+        }
+      }
+    }
+    return ret;
+  }
+}