commit 36c6b2fda560b4b66c988012f63348431214d73f Author: Wangda Tan Date: Wed Dec 9 20:30:59 2020 -0800 Added queue weight mode, with very very basic sanity UT Change-Id: Ib688682f1e31d764153d34d238366800f42f5b8f (cherry picked from commit 27310d6c23013f1ca7eca2e9e5a17515db325693) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractAutoCreatedLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractAutoCreatedLeafQueue.java index 2b222419606..8d7733453f8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractAutoCreatedLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractAutoCreatedLeafQueue.java @@ -128,9 +128,4 @@ public void setEntitlement(String nodeLabel, QueueEntitlement entitlement) writeLock.unlock(); } } - - protected void setupConfigurableCapacities(QueueCapacities queueCapacities) { - CSQueueUtils.updateAndCheckCapacitiesByLabel(getQueuePath(), - queueCapacities, parent == null ? null : parent.getQueueCapacities()); - } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index 8d22a36d99d..a757bc6a482 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -80,8 +80,8 @@ public abstract class AbstractCSQueue implements CSQueue { - private static final Logger LOG = - LoggerFactory.getLogger(AbstractCSQueue.class); + private static final Logger LOG = LoggerFactory.getLogger( + AbstractCSQueue.class); volatile CSQueue parent; final String queueName; private final String queuePath; @@ -100,7 +100,7 @@ String defaultLabelExpression; private String multiNodeSortingPolicyName = null; - Map acls = + Map acls = new HashMap(); volatile boolean reservationsContinueLooking; private volatile boolean preemptionDisabled; @@ -112,7 +112,7 @@ volatile ResourceUsage queueUsage; private final boolean fullPathQueueNamingPolicy = false; - + // Track capacities like used-capcity/abs-used-capacity/capacity/abs-capacity, // etc. QueueCapacities queueCapacities; @@ -129,12 +129,16 @@ private volatile boolean defaultAppLifetimeWasSpecifiedInConfig = false; protected enum CapacityConfigType { + // FIXME, from what I can see, Percentage mode can almost apply to weighted + // and percentage mode at the same time, there's only small area need to be + // changed, we need to rename "PERCENTAGE" to "PERCENTAGE" and "WEIGHT" NONE, PERCENTAGE, ABSOLUTE_RESOURCE - }; - protected CapacityConfigType capacityConfigType = - CapacityConfigType.NONE; + } + + ; + protected CapacityConfigType capacityConfigType = CapacityConfigType.NONE; - private final RecordFactory recordFactory = + private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); protected CapacitySchedulerContext csContext; protected YarnAuthorizationProvider authorizer = null; @@ -148,8 +152,8 @@ private Map userWeights = new HashMap(); private int maxParallelApps; - public AbstractCSQueue(CapacitySchedulerContext cs, - String queueName, CSQueue parent, CSQueue old) throws IOException { + public AbstractCSQueue(CapacitySchedulerContext cs, String queueName, + CSQueue parent, CSQueue old) throws IOException { this(cs, cs.getConfiguration(), queueName, parent, old); } @@ -189,17 +193,15 @@ public AbstractCSQueue(CapacitySchedulerContext cs, writeLock = lock.writeLock(); } + @VisibleForTesting protected void setupConfigurableCapacities() { setupConfigurableCapacities(csContext.getConfiguration()); } protected void setupConfigurableCapacities( CapacitySchedulerConfiguration configuration) { - CSQueueUtils.loadUpdateAndCheckCapacities( - getQueuePath(), - configuration, - queueCapacities, - parent == null ? null : parent.getQueueCapacities()); + CSQueueUtils.loadCapacitiesByLabelsFromConf(getQueuePath(), queueCapacities, + configuration); } @Override @@ -250,12 +252,12 @@ public int getNumContainers() { public QueueState getState() { return state; } - + @Override public CSQueueMetrics getMetrics() { return metrics; } - + @Override public String getQueueShortName() { return queueName; @@ -283,7 +285,7 @@ public CSQueue getParent() { public void setParent(CSQueue newParentQueue) { this.parent = newParentQueue; } - + public Set getAccessibleNodeLabels() { return accessibleLabels; } @@ -339,29 +341,25 @@ void setMaxCapacity(String nodeLabel, float maximumCapacity) { } } - @Override public String getDefaultNodeLabelExpression() { return defaultLabelExpression; } - - void setupQueueConfigs(Resource clusterResource) - throws IOException { + + void setupQueueConfigs(Resource clusterResource) throws IOException { setupQueueConfigs(clusterResource, csContext.getConfiguration()); } protected void setupQueueConfigs(Resource clusterResource, - CapacitySchedulerConfiguration configuration) throws - IOException { + CapacitySchedulerConfiguration configuration) throws IOException { writeLock.lock(); try { // get labels - this.accessibleLabels = - configuration.getAccessibleNodeLabels(getQueuePath()); - this.defaultLabelExpression = - configuration.getDefaultNodeLabelExpression( - getQueuePath()); + this.accessibleLabels = configuration.getAccessibleNodeLabels( + getQueuePath()); + this.defaultLabelExpression = configuration.getDefaultNodeLabelExpression( + getQueuePath()); this.resourceTypes = new HashSet(); for (AbsoluteResourceType type : AbsoluteResourceType.values()) { resourceTypes.add(type.toString().toLowerCase()); @@ -392,15 +390,15 @@ protected void setupQueueConfigs(Resource clusterResource, setupMaximumAllocation(configuration); // Max parallel apps - int queueMaxParallelApps = - configuration.getMaxParallelAppsForQueue(getQueuePath()); + int queueMaxParallelApps = configuration.getMaxParallelAppsForQueue( + getQueuePath()); setMaxParallelApps(queueMaxParallelApps); // initialized the queue state based on previous state, configured state // and its parent state. QueueState previous = getState(); - QueueState configuredState = configuration - .getConfiguredState(getQueuePath()); + QueueState configuredState = configuration.getConfiguredState( + getQueuePath()); QueueState parentState = (parent == null) ? null : parent.getState(); initializeQueueState(previous, configuredState, parentState); @@ -408,10 +406,6 @@ protected void setupQueueConfigs(Resource clusterResource, this.acls = configuration.getAcls(getQueuePath()); - // Update metrics - CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource, - this, labelManager, null); - // Check if labels of this queue is a subset of parent queue, only do this // when we not root if (parent != null && parent.getParent() != null) { @@ -443,8 +437,7 @@ protected void setupQueueConfigs(Resource clusterResource, this.intraQueuePreemptionDisabledInHierarchy = isIntraQueueHierarchyPreemptionDisabled(this, configuration); - this.priority = configuration.getQueuePriority( - getQueuePath()); + this.priority = configuration.getQueuePriority(getQueuePath()); // Update multi-node sorting algorithm for scheduling as configured. setMultiNodeSortingPolicyName( @@ -453,17 +446,17 @@ protected void setupQueueConfigs(Resource clusterResource, this.userWeights = getUserWeightsFromHierarchy(configuration); maxApplicationLifetime = getInheritedMaxAppLifetime(this, configuration); - defaultApplicationLifetime = - getInheritedDefaultAppLifetime(this, configuration, - maxApplicationLifetime); - if (maxApplicationLifetime > 0 && - defaultApplicationLifetime > maxApplicationLifetime) { + defaultApplicationLifetime = getInheritedDefaultAppLifetime(this, + configuration, maxApplicationLifetime); + if (maxApplicationLifetime > 0 + && defaultApplicationLifetime > maxApplicationLifetime) { throw new YarnRuntimeException( "Default lifetime " + defaultApplicationLifetime + " can't exceed maximum lifetime " + maxApplicationLifetime); } - defaultApplicationLifetime = defaultApplicationLifetime > 0 - ? defaultApplicationLifetime : maxApplicationLifetime; + defaultApplicationLifetime = defaultApplicationLifetime > 0 ? + defaultApplicationLifetime : + maxApplicationLifetime; } finally { writeLock.unlock(); } @@ -471,17 +464,16 @@ protected void setupQueueConfigs(Resource clusterResource, private void setupMaximumAllocation(CapacitySchedulerConfiguration csConf) { String myQueuePath = getQueuePath(); - Resource clusterMax = ResourceUtils - .fetchMaximumAllocationFromConfig(csConf); + Resource clusterMax = ResourceUtils.fetchMaximumAllocationFromConfig( + csConf); Resource queueMax = csConf.getQueueMaximumAllocation(myQueuePath); maximumAllocation = Resources.clone( - parent == null ? clusterMax : parent.getMaximumAllocation()); + parent == null ? clusterMax : parent.getMaximumAllocation()); String errMsg = - "Queue maximum allocation cannot be larger than the cluster setting" - + " for queue " + myQueuePath - + " max allocation per queue: %s" + "Queue maximum allocation cannot be larger than the cluster setting" + + " for queue " + myQueuePath + " max allocation per queue: %s" + " cluster setting: " + clusterMax; if (queueMax == Resources.none()) { @@ -497,12 +489,12 @@ private void setupMaximumAllocation(CapacitySchedulerConfiguration csConf) { } if ((queueMemory != UNDEFINED && queueMemory > clusterMax.getMemorySize() - || (queueVcores != UNDEFINED - && queueVcores > clusterMax.getVirtualCores()))) { + || (queueVcores != UNDEFINED && queueVcores > clusterMax + .getVirtualCores()))) { throw new IllegalArgumentException( - String.format(errMsg, maximumAllocation)); + String.format(errMsg, maximumAllocation)); } - } else { + } else{ // Queue level maximum-allocation can't be larger than cluster setting for (ResourceInformation ri : queueMax.getResources()) { if (ri.compareTo(clusterMax.getResourceInformation(ri.getName())) > 0) { @@ -514,9 +506,8 @@ private void setupMaximumAllocation(CapacitySchedulerConfiguration csConf) { } } - private Map getUserWeightsFromHierarchy - (CapacitySchedulerConfiguration configuration) throws - IOException { + private Map getUserWeightsFromHierarchy( + CapacitySchedulerConfiguration configuration) throws IOException { Map unionInheritedWeights = new HashMap(); CSQueue parentQ = getParent(); if (parentQ != null) { @@ -530,15 +521,19 @@ private void setupMaximumAllocation(CapacitySchedulerConfiguration csConf) { return unionInheritedWeights; } - protected Resource getMinimumAbsoluteResource(String queuePath, String label) { - Resource minResource = csContext.getConfiguration() - .getMinimumResourceRequirement(label, queuePath, resourceTypes); + protected Resource getMinimumAbsoluteResource(String queuePath, + String label) { + Resource minResource = + csContext.getConfiguration().getMinimumResourceRequirement(label, + queuePath, resourceTypes); return minResource; } - protected Resource getMaximumAbsoluteResource(String queuePath, String label) { - Resource maxResource = csContext.getConfiguration() - .getMaximumResourceRequirement(label, queuePath, resourceTypes); + protected Resource getMaximumAbsoluteResource(String queuePath, + String label) { + Resource maxResource = + csContext.getConfiguration().getMaximumResourceRequirement(label, + queuePath, resourceTypes); return maxResource; } @@ -557,18 +552,19 @@ protected void updateConfigurableResourceRequirement(String queuePath, Resource minResource = getMinimumAbsoluteResource(queuePath, label); Resource maxResource = getMaximumAbsoluteResource(queuePath, label); - LOG.debug("capacityConfigType is '{}' for queue {}", - capacityConfigType, getQueuePath()); + LOG.debug("capacityConfigType is '{}' for queue {}", capacityConfigType, + getQueuePath()); CapacityConfigType localType = checkConfigTypeIsAbsoluteResource( - queuePath, label) ? CapacityConfigType.ABSOLUTE_RESOURCE - : CapacityConfigType.PERCENTAGE; + queuePath, label) ? + CapacityConfigType.ABSOLUTE_RESOURCE : + CapacityConfigType.PERCENTAGE; if (this.capacityConfigType.equals(CapacityConfigType.NONE)) { this.capacityConfigType = localType; LOG.debug("capacityConfigType is updated as '{}' for queue {}", capacityConfigType, getQueuePath()); - } else { + } else{ validateAbsoluteVsPercentageCapacityConfig(localType); } @@ -576,36 +572,38 @@ protected void updateConfigurableResourceRequirement(String queuePath, // throw exception to handle such error configs. if (!maxResource.equals(Resources.none()) && Resources.greaterThan( resourceCalculator, clusterResource, minResource, maxResource)) { - throw new IllegalArgumentException("Min resource configuration " - + minResource + " is greater than its max value:" + maxResource - + " in queue:" + getQueuePath()); + throw new IllegalArgumentException( + "Min resource configuration " + minResource + + " is greater than its max value:" + maxResource + " in queue:" + + getQueuePath()); } // If parent's max resource is lesser to a specific child's max // resource, throw exception to handle such error configs. if (parent != null) { - Resource parentMaxRes = parent.getQueueResourceQuotas() - .getConfiguredMaxResource(label); + Resource parentMaxRes = + parent.getQueueResourceQuotas().getConfiguredMaxResource(label); if (Resources.greaterThan(resourceCalculator, clusterResource, parentMaxRes, Resources.none())) { if (Resources.greaterThan(resourceCalculator, clusterResource, maxResource, parentMaxRes)) { - throw new IllegalArgumentException("Max resource configuration " - + maxResource + " is greater than parents max value:" - + parentMaxRes + " in queue:" + getQueuePath()); + throw new IllegalArgumentException( + "Max resource configuration " + maxResource + + " is greater than parents max value:" + parentMaxRes + + " in queue:" + getQueuePath()); } // If child's max resource is not set, but its parent max resource is // set, we must set child max resource to its parent's. - if (maxResource.equals(Resources.none()) - && !minResource.equals(Resources.none())) { + if (maxResource.equals(Resources.none()) && !minResource.equals( + Resources.none())) { maxResource = Resources.clone(parentMaxRes); } } } LOG.debug("Updating absolute resource configuration for queue:{} as" - + " minResource={} and maxResource={}", getQueuePath(), minResource, + + " minResource={} and maxResource={}", getQueuePath(), minResource, maxResource); queueResourceQuotas.setConfiguredMinResource(label, minResource); @@ -615,8 +613,8 @@ protected void updateConfigurableResourceRequirement(String queuePath, private void validateAbsoluteVsPercentageCapacityConfig( CapacityConfigType localType) { - if (!queuePath.equals("root") - && !this.capacityConfigType.equals(localType)) { + if (!queuePath.equals("root") && !this.capacityConfigType.equals( + localType)) { throw new IllegalArgumentException("Queue '" + getQueuePath() + "' should use either percentage based capacity" + " configuration or absolute resource."); @@ -630,8 +628,8 @@ public CapacityConfigType getCapacityConfigType() { @Override public Resource getEffectiveCapacity(String label) { - return Resources - .clone(getQueueResourceQuotas().getEffectiveMinResource(label)); + return Resources.clone( + getQueueResourceQuotas().getEffectiveMinResource(label)); } @Override @@ -643,8 +641,8 @@ public Resource getEffectiveCapacityDown(String label, Resource factor) { @Override public Resource getEffectiveMaxCapacity(String label) { - return Resources - .clone(getQueueResourceQuotas().getEffectiveMaxResource(label)); + return Resources.clone( + getQueueResourceQuotas().getEffectiveMaxResource(label)); } @Override @@ -670,29 +668,30 @@ private void initializeQueueState(QueueState previousState, // from its parent. If this queue does not has parent, such as root queue, // we would use the configured state. if (parentState == null) { - updateQueueState((configuredState == null) ? defaultState - : configuredState); - } else { + updateQueueState( + (configuredState == null) ? defaultState : configuredState); + } else{ if (configuredState == null) { updateQueueState((parentState == QueueState.DRAINING) ? - QueueState.STOPPED : parentState); + QueueState.STOPPED : + parentState); } else if (configuredState == QueueState.RUNNING && parentState != QueueState.RUNNING) { throw new IllegalArgumentException( "The parent queue:" + parent.getQueuePath() - + " cannot be STOPPED as the child queue:" + queuePath - + " is in RUNNING state."); - } else { + + " cannot be STOPPED as the child queue:" + queuePath + + " is in RUNNING state."); + } else{ updateQueueState(configuredState); } } - } else { + } else{ // when we get a refreshQueue request from AdminService, if (previousState == QueueState.RUNNING) { if (configuredState == QueueState.STOPPED) { stopQueue(); } - } else { + } else{ if (configuredState == QueueState.RUNNING) { try { activeQueue(); @@ -752,20 +751,20 @@ public QueueStatistics getQueueStatistics() { stats.setReservedContainers(getMetrics().getReservedContainers()); return stats; } - + public Map getQueueConfigurations() { Map queueConfigurations = new HashMap<>(); Set nodeLabels = getNodeLabelsForQueue(); for (String nodeLabel : nodeLabels) { - QueueConfigurations queueConfiguration = - recordFactory.newRecordInstance(QueueConfigurations.class); + QueueConfigurations queueConfiguration = recordFactory.newRecordInstance( + QueueConfigurations.class); float capacity = queueCapacities.getCapacity(nodeLabel); float absoluteCapacity = queueCapacities.getAbsoluteCapacity(nodeLabel); float maxCapacity = queueCapacities.getMaximumCapacity(nodeLabel); - float absMaxCapacity = - queueCapacities.getAbsoluteMaximumCapacity(nodeLabel); - float maxAMPercentage = - queueCapacities.getMaxAMResourcePercentage(nodeLabel); + float absMaxCapacity = queueCapacities.getAbsoluteMaximumCapacity( + nodeLabel); + float maxAMPercentage = queueCapacities.getMaxAMResourcePercentage( + nodeLabel); queueConfiguration.setCapacity(capacity); queueConfiguration.setAbsoluteCapacity(absoluteCapacity); queueConfiguration.setMaxCapacity(maxCapacity); @@ -788,14 +787,14 @@ public QueueStatistics getQueueStatistics() { public Resource getMaximumAllocation() { return maximumAllocation; } - + @Private public Resource getMinimumAllocation() { return minimumAllocation; } - - void allocateResource(Resource clusterResource, - Resource resource, String nodePartition) { + + void allocateResource(Resource clusterResource, Resource resource, + String nodePartition) { writeLock.lock(); try { queueUsage.incUsed(nodePartition, resource); @@ -808,9 +807,9 @@ void allocateResource(Resource clusterResource, writeLock.unlock(); } } - - protected void releaseResource(Resource clusterResource, - Resource resource, String nodePartition) { + + protected void releaseResource(Resource clusterResource, Resource resource, + String nodePartition) { writeLock.lock(); try { queueUsage.decUsed(nodePartition, resource); @@ -823,12 +822,12 @@ protected void releaseResource(Resource clusterResource, writeLock.unlock(); } } - + @Private public boolean getReservationContinueLooking() { return reservationsContinueLooking; } - + @Private public Map getACLs() { readLock.lock(); @@ -853,12 +852,12 @@ public boolean getIntraQueuePreemptionDisabled() { public boolean getIntraQueuePreemptionDisabledInHierarchy() { return intraQueuePreemptionDisabledInHierarchy; } - + @Private public QueueCapacities getQueueCapacities() { return queueCapacities; } - + @Private public ResourceUsage getQueueResourceUsage() { return queueUsage; @@ -886,15 +885,15 @@ public QueueResourceQuotas getQueueResourceQuotas() { */ private boolean isQueueHierarchyPreemptionDisabled(CSQueue q, CapacitySchedulerConfiguration configuration) { - boolean systemWidePreemption = - csContext.getConfiguration() - .getBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, - YarnConfiguration.DEFAULT_RM_SCHEDULER_ENABLE_MONITORS); + boolean systemWidePreemption = csContext.getConfiguration().getBoolean( + YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, + YarnConfiguration.DEFAULT_RM_SCHEDULER_ENABLE_MONITORS); CSQueue parentQ = q.getParent(); // If the system-wide preemption switch is turned off, all of the queues in // the qPath hierarchy have preemption disabled, so return true. - if (!systemWidePreemption) return true; + if (!systemWidePreemption) + return true; // If q is the root queue and the system-wide preemption switch is turned // on, then q does not have preemption disabled (default=false, below) @@ -908,7 +907,7 @@ private boolean isQueueHierarchyPreemptionDisabled(CSQueue q, // inherited from the parent's hierarchy unless explicitly overridden at // this level. return configuration.getPreemptionDisabled(q.getQueuePath(), - parentQ.getPreemptionDisabled()); + parentQ.getPreemptionDisabled()); } private long getInheritedMaxAppLifetime(CSQueue q, @@ -935,9 +934,8 @@ private long getInheritedDefaultAppLifetime(CSQueue q, CSQueue parentQ = q.getParent(); long defaultAppLifetime = conf.getDefaultLifetimePerQueue(getQueuePath()); defaultAppLifetimeWasSpecifiedInConfig = - (defaultAppLifetime >= 0 - || (parentQ != null && - parentQ.getDefaultAppLifetimeWasSpecifiedInConfig())); + (defaultAppLifetime >= 0 || (parentQ != null && parentQ + .getDefaultAppLifetimeWasSpecifiedInConfig())); // If q is the root queue, then get default app lifetime from conf. if (parentQ == null) { @@ -961,10 +959,10 @@ private long getInheritedDefaultAppLifetime(CSQueue q, if (defaultAppLifetimeWasSpecifiedInConfig) { if (parentsDefaultAppLifetime <= myMaxAppLifetime) { defaultAppLifetime = parentsDefaultAppLifetime; - } else { + } else{ defaultAppLifetime = myMaxAppLifetime; } - } else { + } else{ // Default app lifetime value was not set anywhere in this queue's // hierarchy. Use current queue's max lifetime as its default. defaultAppLifetime = myMaxAppLifetime; @@ -989,18 +987,18 @@ private boolean isIntraQueueHierarchyPreemptionDisabled(CSQueue q, boolean systemWideIntraQueuePreemption = csContext.getConfiguration().getBoolean( CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ENABLED, - CapacitySchedulerConfiguration - .DEFAULT_INTRAQUEUE_PREEMPTION_ENABLED); + CapacitySchedulerConfiguration.DEFAULT_INTRAQUEUE_PREEMPTION_ENABLED); // Intra-queue preemption is disabled for this queue if the system-wide // intra-queue preemption flag is false - if (!systemWideIntraQueuePreemption) return true; + if (!systemWideIntraQueuePreemption) + return true; // Check if this is the root queue and the root queue's intra-queue // preemption disable switch is set CSQueue parentQ = q.getParent(); if (parentQ == null) { - return configuration - .getIntraQueuePreemptionDisabled(q.getQueuePath(), false); + return configuration.getIntraQueuePreemptionDisabled(q.getQueuePath(), + false); } // At this point, the master preemption switch is enabled down to this @@ -1020,8 +1018,7 @@ private Resource getCurrentLimitResource(String nodePartition, * non-labeled resource: limit = min(queue-max-resource, * limit-set-by-parent) */ - Resource queueMaxResource = - getQueueMaxResource(nodePartition); + Resource queueMaxResource = getQueueMaxResource(nodePartition); return Resources.min(resourceCalculator, clusterResource, queueMaxResource, currentResourceLimits.getLimit()); @@ -1030,7 +1027,7 @@ private Resource getCurrentLimitResource(String nodePartition, // all queues on this label equals to total resource with the label. return labelManager.getResourceByLabel(nodePartition, clusterResource); } - + return Resources.none(); } @@ -1043,9 +1040,9 @@ public boolean hasChildQueues() { return childQueues != null && !childQueues.isEmpty(); } - boolean canAssignToThisQueue(Resource clusterResource, - String nodePartition, ResourceLimits currentResourceLimits, - Resource resourceCouldBeUnreserved, SchedulingMode schedulingMode) { + boolean canAssignToThisQueue(Resource clusterResource, String nodePartition, + ResourceLimits currentResourceLimits, Resource resourceCouldBeUnreserved, + SchedulingMode schedulingMode) { readLock.lock(); try { // Get current limited resource: @@ -1081,9 +1078,9 @@ boolean canAssignToThisQueue(Resource clusterResource, // if reservation continue looking enabled, check to see if could we // potentially use this node instead of a reserved node if the application // has reserved containers. - if (this.reservationsContinueLooking - && Resources.greaterThan(resourceCalculator, clusterResource, - resourceCouldBeUnreserved, Resources.none())) { + if (this.reservationsContinueLooking && Resources.greaterThan( + resourceCalculator, clusterResource, resourceCouldBeUnreserved, + Resources.none())) { // resource-without-reserved = used - reserved Resource newTotalWithoutReservedResource = Resources.subtract( usedExceptKillable, resourceCouldBeUnreserved); @@ -1093,13 +1090,14 @@ boolean canAssignToThisQueue(Resource clusterResource, if (Resources.lessThan(resourceCalculator, clusterResource, newTotalWithoutReservedResource, currentLimitResource)) { if (LOG.isDebugEnabled()) { - LOG.debug("try to use reserved: " + getQueuePath() - + " usedResources: " + queueUsage.getUsed() - + ", clusterResources: " + clusterResource - + ", reservedResources: " + resourceCouldBeUnreserved - + ", capacity-without-reserved: " - + newTotalWithoutReservedResource - + ", maxLimitCapacity: " + currentLimitResource); + LOG.debug( + "try to use reserved: " + getQueuePath() + " usedResources: " + + queueUsage.getUsed() + ", clusterResources: " + + clusterResource + ", reservedResources: " + + resourceCouldBeUnreserved + + ", capacity-without-reserved: " + + newTotalWithoutReservedResource + ", maxLimitCapacity: " + + currentLimitResource); } return true; } @@ -1108,26 +1106,25 @@ boolean canAssignToThisQueue(Resource clusterResource, // Can not assign to this queue if (LOG.isDebugEnabled()) { LOG.debug("Failed to assign to queue: " + getQueuePath() - + " nodePatrition: " + nodePartition - + ", usedResources: " + queueUsage.getUsed(nodePartition) - + ", clusterResources: " + clusterResource - + ", reservedResources: " + resourceCouldBeUnreserved - + ", maxLimitCapacity: " + currentLimitResource - + ", currTotalUsed:" + usedExceptKillable); + + " nodePatrition: " + nodePartition + ", usedResources: " + + queueUsage.getUsed(nodePartition) + ", clusterResources: " + + clusterResource + ", reservedResources: " + + resourceCouldBeUnreserved + ", maxLimitCapacity: " + + currentLimitResource + ", currTotalUsed:" + usedExceptKillable); } return false; } if (LOG.isDebugEnabled()) { - LOG.debug("Check assign to queue: " + getQueuePath() - + " nodePartition: " + nodePartition - + ", usedResources: " + queueUsage.getUsed(nodePartition) - + ", clusterResources: " + clusterResource - + ", currentUsedCapacity: " + Resources - .divide(resourceCalculator, clusterResource, - queueUsage.getUsed(nodePartition), labelManager - .getResourceByLabel(nodePartition, clusterResource)) - + ", max-capacity: " + queueCapacities - .getAbsoluteMaximumCapacity(nodePartition)); + LOG.debug( + "Check assign to queue: " + getQueuePath() + " nodePartition: " + + nodePartition + ", usedResources: " + queueUsage + .getUsed(nodePartition) + ", clusterResources: " + + clusterResource + ", currentUsedCapacity: " + Resources + .divide(resourceCalculator, clusterResource, + queueUsage.getUsed(nodePartition), labelManager + .getResourceByLabel(nodePartition, clusterResource)) + + ", max-capacity: " + queueCapacities + .getAbsoluteMaximumCapacity(nodePartition)); } return true; } finally { @@ -1143,7 +1140,7 @@ public void incReservedResource(String partition, Resource reservedRes) { } queueUsage.incReserved(partition, reservedRes); - if(null != parent){ + if (null != parent) { parent.incReservedResource(partition, reservedRes); } } @@ -1155,7 +1152,7 @@ public void decReservedResource(String partition, Resource reservedRes) { } queueUsage.decReserved(partition, reservedRes); - if(null != parent){ + if (null != parent) { parent.decReservedResource(partition, reservedRes); } } @@ -1171,7 +1168,7 @@ public void incPendingResource(String nodeLabel, Resource resourceToInc) { parent.incPendingResource(nodeLabel, resourceToInc); } } - + @Override public void decPendingResource(String nodeLabel, Resource resourceToDec) { if (nodeLabel == null) { @@ -1183,7 +1180,7 @@ public void decPendingResource(String nodeLabel, Resource resourceToDec) { parent.decPendingResource(nodeLabel, resourceToDec); } } - + @Override public void incUsedResource(String nodeLabel, Resource resourceToInc, SchedulerApplicationAttempt application) { @@ -1193,8 +1190,8 @@ public void incUsedResource(String nodeLabel, Resource resourceToInc, // ResourceUsage has its own lock, no addition lock needs here. queueUsage.incUsed(nodeLabel, resourceToInc); CSQueueUtils.updateUsedCapacity(resourceCalculator, - labelManager.getResourceByLabel(nodeLabel, Resources.none()), - nodeLabel, this); + labelManager.getResourceByLabel(nodeLabel, Resources.none()), nodeLabel, + this); if (null != parent) { parent.incUsedResource(nodeLabel, resourceToInc, null); } @@ -1209,8 +1206,8 @@ public void decUsedResource(String nodeLabel, Resource resourceToDec, // ResourceUsage has its own lock, no addition lock needs here. queueUsage.decUsed(nodeLabel, resourceToDec); CSQueueUtils.updateUsedCapacity(resourceCalculator, - labelManager.getResourceByLabel(nodeLabel, Resources.none()), - nodeLabel, this); + labelManager.getResourceByLabel(nodeLabel, Resources.none()), nodeLabel, + this); if (null != parent) { parent.decUsedResource(nodeLabel, resourceToDec, null); } @@ -1220,21 +1217,21 @@ public void decUsedResource(String nodeLabel, Resource resourceToDec, * Return if the queue has pending resource on given nodePartition and * schedulingMode. */ - boolean hasPendingResourceRequest(String nodePartition, - Resource cluster, SchedulingMode schedulingMode) { + boolean hasPendingResourceRequest(String nodePartition, Resource cluster, + SchedulingMode schedulingMode) { return SchedulerUtils.hasPendingResourceRequest(resourceCalculator, queueUsage, nodePartition, cluster, schedulingMode); } - + public boolean accessibleToPartition(String nodePartition) { // if queue's label is *, it can access any node - if (accessibleLabels != null - && accessibleLabels.contains(RMNodeLabelsManager.ANY)) { + if (accessibleLabels != null && accessibleLabels.contains( + RMNodeLabelsManager.ANY)) { return true; } // any queue can access to a node without label - if (nodePartition == null - || nodePartition.equals(RMNodeLabelsManager.NO_LABEL)) { + if (nodePartition == null || nodePartition.equals( + RMNodeLabelsManager.NO_LABEL)) { return true; } // a queue can access to a node only if it contains any label of the node @@ -1259,9 +1256,10 @@ public Priority getDefaultApplicationPriority() { Set nodeLabels = new HashSet(); if (this.getAccessibleNodeLabels() != null && this.getAccessibleNodeLabels() .contains(RMNodeLabelsManager.ANY)) { - nodeLabels.addAll(Sets.union(this.getQueueCapacities().getNodePartitionsSet(), - this.getQueueResourceUsage().getNodePartitionsSet())); - } else { + nodeLabels.addAll( + Sets.union(this.getQueueCapacities().getNodePartitionsSet(), + this.getQueueResourceUsage().getNodePartitionsSet())); + } else{ nodeLabels.addAll(this.getAccessibleNodeLabels()); } @@ -1280,8 +1278,7 @@ public Resource getTotalKillableResource(String partition) { public Iterator getKillableContainers(String partition) { return csContext.getPreemptionManager().getKillableContainers( - getQueuePath(), - partition); + getQueuePath(), partition); } @VisibleForTesting @@ -1299,8 +1296,8 @@ public boolean accept(Resource cluster, // Do we need to check parent queue before making this decision? boolean checkParentQueue = false; - ContainerAllocationProposal allocation = - request.getFirstAllocatedOrReservedContainer(); + ContainerAllocationProposal + allocation = request.getFirstAllocatedOrReservedContainer(); SchedulerContainer schedulerContainer = allocation.getAllocatedOrReservedContainer(); @@ -1339,7 +1336,6 @@ public boolean accept(Resource cluster, checkParentQueue = true; } - if (parent != null && checkParentQueue) { return parent.accept(cluster, request); } @@ -1365,11 +1361,11 @@ public void activeQueue() throws YarnException { if (getState() == QueueState.RUNNING) { LOG.info("The specified queue:" + getQueuePath() + " is already in the RUNNING state."); - } else { + } else{ CSQueue parent = getParent(); if (parent == null || parent.getState() == QueueState.RUNNING) { updateQueueState(QueueState.RUNNING); - } else { + } else{ throw new YarnException("The parent Queue:" + parent.getQueuePath() + " is not running. Please activate the parent queue first"); } @@ -1447,4 +1443,185 @@ public int getMaxParallelApps() { } abstract int getNumRunnableApps(); + + protected void updateAbsoluteCapacities() { + QueueCapacities parentQueueCapacities = null; + if (parent != null) { + parentQueueCapacities = parent.getQueueCapacities(); + } + + for (String label : queueCapacities.getExistingNodeLabels()) { + // Weight will be normalized to queue.weight = + // queue.weight(sum({sibling-queues.weight})) + // When weight is set, capacity will be set to 0; + // When capacity is set, weight will be normalized to 0, + // So get larger from normalized_weight and capacity will make sure we do + // calculation correct + float capacity = Math.max(queueCapacities.getCapacity(label), + queueCapacities.getNormalizedWeight(label)); + if (capacity > 0f) { + queueCapacities.setAbsoluteCapacity(label, capacity * ( + parentQueueCapacities == null ? + 1 : + parentQueueCapacities.getAbsoluteCapacity(label))); + } + + float maxCapacity = queueCapacities.getMaximumCapacity(label); + if (maxCapacity > 0f) { + queueCapacities.setAbsoluteMaximumCapacity(label, maxCapacity * ( + parentQueueCapacities == null ? + 1 : + parentQueueCapacities.getAbsoluteMaximumCapacity(label))); + } + } + } + + private Resource getMinResourceNormalized(String name, + Map effectiveMinRatio, Resource minResource) { + Resource ret = Resource.newInstance(minResource); + int maxLength = ResourceUtils.getNumberOfCountableResourceTypes(); + for (int i = 0; i < maxLength; i++) { + ResourceInformation nResourceInformation = + minResource.getResourceInformation(i); + + Float ratio = effectiveMinRatio.get(nResourceInformation.getName()); + if (ratio != null) { + ret.setResourceValue(i, + (long) (nResourceInformation.getValue() * ratio.floatValue())); + if (LOG.isDebugEnabled()) { + LOG.debug("Updating min resource for Queue: " + name + " as " + ret + .getResourceInformation(i) + ", Actual resource: " + + nResourceInformation.getValue() + ", ratio: " + ratio + .floatValue()); + } + } + } + return ret; + } + + private void deriveCapacityFromAbsoluteConfigurations(String label, + Resource clusterResource, ResourceCalculator rc) { + + /* + * In case when queues are configured with absolute resources, it is better + * to update capacity/max-capacity etc w.r.t absolute resource as well. In + * case of computation, these values wont be used any more. However for + * metrics and UI, its better these values are pre-computed here itself. + */ + + // 1. Update capacity as a float based on parent's minResource + queueCapacities.setCapacity(label, rc.divide(clusterResource, + queueResourceQuotas.getEffectiveMinResource(label), + parent.getQueueResourceQuotas().getEffectiveMinResource(label))); + + // 2. Update max-capacity as a float based on parent's maxResource + queueCapacities.setMaximumCapacity(label, rc.divide(clusterResource, + queueResourceQuotas.getEffectiveMaxResource(label), + parent.getQueueResourceQuotas().getEffectiveMaxResource(label))); + + // 3. Update absolute capacity as a float based on parent's minResource and + // cluster resource. + queueCapacities.setAbsoluteCapacity(label, + queueCapacities.getCapacity(label) * parent.getQueueCapacities() + .getAbsoluteCapacity(label)); + + // 4. Update absolute max-capacity as a float based on parent's maxResource + // and cluster resource. + queueCapacities.setAbsoluteMaximumCapacity(label, + queueCapacities.getMaximumCapacity(label) * parent.getQueueCapacities() + .getAbsoluteMaximumCapacity(label)); + + // Re-visit max applications for a queue based on absolute capacity if + // needed. + if (this instanceof LeafQueue) { + LeafQueue leafQueue = (LeafQueue) this; + CapacitySchedulerConfiguration conf = csContext.getConfiguration(); + int maxApplications = conf.getMaximumApplicationsPerQueue(queuePath); + if (maxApplications < 0) { + int maxGlobalPerQueueApps = conf.getGlobalMaximumApplicationsPerQueue(); + if (maxGlobalPerQueueApps > 0) { + maxApplications = (int) (maxGlobalPerQueueApps * queueCapacities + .getAbsoluteCapacity(label)); + } else{ + maxApplications = + (int) (conf.getMaximumSystemApplications() * queueCapacities + .getAbsoluteCapacity(label)); + } + } + leafQueue.setMaxApplications(maxApplications); + + int maxApplicationsPerUser = Math.min(maxApplications, + (int) (maxApplications * (leafQueue.getUsersManager().getUserLimit() + / 100.0f) * leafQueue.getUsersManager().getUserLimitFactor())); + leafQueue.setMaxApplicationsPerUser(maxApplicationsPerUser); + LOG.info("LeafQueue:" + leafQueue.getQueuePath() + ", maxApplications=" + + maxApplications + ", maxApplicationsPerUser=" + + maxApplicationsPerUser + ", Abs Cap:" + queueCapacities + .getAbsoluteCapacity(label) + ", Cap: " + queueCapacities + .getCapacity(label) + ", MaxCap : " + queueCapacities + .getMaximumCapacity(label)); + } + } + + void updateEffectiveResources(Resource clusterResource) { + Set configuredNodelabels = + csContext.getConfiguration().getConfiguredNodeLabels(getQueuePath()); + for (String label : configuredNodelabels) { + Resource resourceByLabel = labelManager.getResourceByLabel(label, + clusterResource); + + Resource minResource = queueResourceQuotas.getConfiguredMinResource( + label); + + // Update effective resource (min/max) to each child queue. + if (getCapacityConfigType().equals( + CapacityConfigType.ABSOLUTE_RESOURCE)) { + queueResourceQuotas.setEffectiveMinResource(label, + getMinResourceNormalized(queuePath, + ((ParentQueue) parent).getEffectiveMinRatioPerResource(), + minResource)); + + // Max resource of a queue should be a minimum of {configuredMaxRes, + // parentMaxRes}. parentMaxRes could be configured value. But if not + // present could also be taken from effective max resource of parent. + Resource parentMaxRes = + parent.getQueueResourceQuotas().getConfiguredMaxResource(label); + if (parent != null && parentMaxRes.equals(Resources.none())) { + parentMaxRes = + parent.getQueueResourceQuotas().getEffectiveMaxResource(label); + } + + // Minimum of {childMaxResource, parentMaxRes}. However if + // childMaxResource is empty, consider parent's max resource alone. + Resource childMaxResource = + getQueueResourceQuotas().getConfiguredMaxResource(label); + Resource effMaxResource = Resources.min(resourceCalculator, + resourceByLabel, childMaxResource.equals(Resources.none()) ? + parentMaxRes : + childMaxResource, parentMaxRes); + queueResourceQuotas.setEffectiveMaxResource(label, + Resources.clone(effMaxResource)); + + // In cases where we still need to update some units based on + // percentage, we have to calculate percentage and update. + ResourceCalculator rc = this.csContext.getResourceCalculator(); + deriveCapacityFromAbsoluteConfigurations(label, clusterResource, rc); + } else{ + queueResourceQuotas.setEffectiveMinResource(label, Resources + .multiply(resourceByLabel, + queueCapacities.getAbsoluteCapacity(label))); + queueResourceQuotas.setEffectiveMaxResource(label, Resources + .multiply(resourceByLabel, + queueCapacities.getAbsoluteMaximumCapacity(label))); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Updating effective min resource for queue:" + queuePath + + " as effMinResource=" + queueResourceQuotas + .getEffectiveMinResource(label) + + "and Updating effective max resource as effMaxResource=" + + queueResourceQuotas.getEffectiveMaxResource(label)); + } + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java index edc5277e8a7..3fd5ae98b80 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java @@ -74,10 +74,6 @@ public void reinitializeFromTemplate(AutoCreatedLeafQueueConfig writeLock.lock(); try { - - this.getParent().updateClusterResource(this.csContext.getClusterResource(), - new ResourceLimits(this.csContext.getClusterResource())); - // TODO: // reinitialize only capacities for now since 0 capacity updates // can cause @@ -85,16 +81,12 @@ public void reinitializeFromTemplate(AutoCreatedLeafQueueConfig // through reinitialize QueueCapacities capacities = leafQueueTemplate.getQueueCapacities(); - //update abs capacities - setupConfigurableCapacities(capacities); - //reset capacities for the leaf queue mergeCapacities(capacities); - //update queue used capacity for all the node labels - CSQueueUtils.updateQueueStatistics(resourceCalculator, - csContext.getClusterResource(), - this, labelManager, null); + // FIXME: Why add this call? UpdateClusterResource will be called after reinitialize. + this.getParent().updateClusterResource(this.csContext.getClusterResource(), + new ResourceLimits(this.csContext.getClusterResource())); //activate applications if any are pending activateApplications(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java index 6deb7da582b..842e4919fb7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java @@ -32,7 +32,7 @@ public class CSQueueUtils { public final static float EPSILON = 0.0001f; - + /* * Used only by tests */ @@ -58,28 +58,6 @@ public static void checkAbsoluteCapacity(String queuePath, + ")"); } } - - /** - * Check sanity of capacities: - * - capacity <= maxCapacity - * - absCapacity <= absMaximumCapacity - */ - private static void capacitiesSanityCheck(String queueName, - QueueCapacities queueCapacities) { - for (String label : queueCapacities.getExistingNodeLabels()) { - // The only thing we should care about is absolute capacity <= - // absolute max capacity otherwise the absolute max capacity is - // no longer an absolute maximum. - float absCapacity = queueCapacities.getAbsoluteCapacity(label); - float absMaxCapacity = queueCapacities.getAbsoluteMaximumCapacity(label); - if (absCapacity > absMaxCapacity) { - throw new IllegalArgumentException("Illegal queue capacity setting " - + "(abs-capacity=" + absCapacity + ") > (abs-maximum-capacity=" - + absMaxCapacity + ") for queue=[" - + queueName + "],label=[" + label + "]"); - } - } - } public static float computeAbsoluteMaximumCapacity( float maximumCapacity, CSQueue parent) { @@ -88,36 +66,7 @@ public static float computeAbsoluteMaximumCapacity( return (parentAbsMaxCapacity * maximumCapacity); } - /** - * This method intends to be used by ReservationQueue, ReservationQueue will - * not appear in configuration file, so we shouldn't do load capacities - * settings in configuration for reservation queue. - */ - public static void updateAndCheckCapacitiesByLabel(String queuePath, - QueueCapacities queueCapacities, QueueCapacities parentQueueCapacities) { - updateAbsoluteCapacitiesByNodeLabels(queueCapacities, parentQueueCapacities); - - capacitiesSanityCheck(queuePath, queueCapacities); - } - - /** - * Do following steps for capacities - * - Load capacities from configuration - * - Update absolute capacities for new capacities - * - Check if capacities/absolute-capacities legal - */ - public static void loadUpdateAndCheckCapacities(String queuePath, - CapacitySchedulerConfiguration csConf, - QueueCapacities queueCapacities, QueueCapacities parentQueueCapacities) { - loadCapacitiesByLabelsFromConf(queuePath, - queueCapacities, csConf); - - updateAbsoluteCapacitiesByNodeLabels(queueCapacities, parentQueueCapacities); - - capacitiesSanityCheck(queuePath, queueCapacities); - } - - private static void loadCapacitiesByLabelsFromConf(String queuePath, + public static void loadCapacitiesByLabelsFromConf(String queuePath, QueueCapacities queueCapacities, CapacitySchedulerConfiguration csConf) { queueCapacities.clearConfigurableFields(); Set configuredNodelabels = @@ -132,41 +81,30 @@ private static void loadCapacitiesByLabelsFromConf(String queuePath, queueCapacities.setMaxAMResourcePercentage( label, csConf.getMaximumAMResourcePercentPerPartition(queuePath, label)); - } else { + queueCapacities.setWeight(label, + csConf.getNonLabeledQueueWeight(queuePath)); + } else{ queueCapacities.setCapacity(label, csConf.getLabeledQueueCapacity(queuePath, label) / 100); queueCapacities.setMaximumCapacity(label, csConf.getLabeledQueueMaximumCapacity(queuePath, label) / 100); queueCapacities.setMaxAMResourcePercentage(label, csConf.getMaximumAMResourcePercentPerPartition(queuePath, label)); + queueCapacities.setWeight(label, + csConf.getLabeledQueueWeight(queuePath, label)); } - } - } - // Set absolute capacities for {capacity, maximum-capacity} - private static void updateAbsoluteCapacitiesByNodeLabels( - QueueCapacities queueCapacities, QueueCapacities parentQueueCapacities) { - for (String label : queueCapacities.getExistingNodeLabels()) { - float capacity = queueCapacities.getCapacity(label); - if (capacity > 0f) { - queueCapacities.setAbsoluteCapacity( - label, - capacity - * (parentQueueCapacities == null ? 1 : parentQueueCapacities - .getAbsoluteCapacity(label))); - } - - float maxCapacity = queueCapacities.getMaximumCapacity(label); - if (maxCapacity > 0f) { - queueCapacities.setAbsoluteMaximumCapacity( - label, - maxCapacity - * (parentQueueCapacities == null ? 1 : parentQueueCapacities - .getAbsoluteMaximumCapacity(label))); + float absCapacity = queueCapacities.getCapacity(label); + float absMaxCapacity = queueCapacities.getMaximumCapacity(label); + if (absCapacity > absMaxCapacity) { + throw new IllegalArgumentException("Illegal queue capacity setting " + + "(abs-capacity=" + absCapacity + ") > (abs-maximum-capacity=" + + absMaxCapacity + ") for queue=[" + + queuePath + "],label=[" + label + "]"); } } } - + /** * Update partitioned resource usage, if nodePartition == null, will update * used resource for all partitions of this queue. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index d0ee25df300..9188cec0e14 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; +import org.apache.hadoop.ipc.WeightedTimeCostProvider; import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.thirdparty.com.google.common.base.Strings; import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet; @@ -385,6 +386,8 @@ public static final Pattern RESOURCE_PATTERN = Pattern.compile(PATTERN_FOR_ABSOLUTE_RESOURCE); + private static final String WEIGHT_SUFFIX = "w"; + public static final String MAX_PARALLEL_APPLICATIONS = "max-parallel-apps"; public static final int DEFAULT_MAX_PARALLEL_APPLICATIONS = Integer.MAX_VALUE; @@ -491,12 +494,45 @@ public void setMaximumApplicationMasterResourcePerQueuePercent(String queue, float percent) { setFloat(getQueuePrefix(queue) + MAXIMUM_AM_RESOURCE_SUFFIX, percent); } + + private void throwExceptionForUnexpectedWeight(float weight, String queue, + String label) { + if ((weight < -1e-6 && Math.abs(weight + 1) > 1e-6) || weight > 10000) { + throw new IllegalArgumentException( + "Illegal " + "weight=" + weight + " for queue=" + queue + "label=" + + label + + ". Acceptable values: [0, 10000], -1 is same as not set"); + } + } + + public float getNonLabeledQueueWeight(String queue) { + String configuredValue = get(getQueuePrefix(queue) + CAPACITY); + float weight = extractFloatValueFromWeightConfig(configuredValue); + throwExceptionForUnexpectedWeight(weight, queue, ""); + return weight; + } + + public void setNonLabeledQueueWeight(String queue, float weight) { + set(getQueuePrefix(queue) + CAPACITY, weight + WEIGHT_SUFFIX); + } + + public void setLabeledQueueWeight(String queue, String label, float weight) { + set(getNodeLabelPrefix(queue, label) + CAPACITY, weight + WEIGHT_SUFFIX); + } + + public float getLabeledQueueWeight(String queue, String label) { + String configuredValue = get(getNodeLabelPrefix(queue, label) + CAPACITY); + float weight = extractFloatValueFromWeightConfig(configuredValue); + throwExceptionForUnexpectedWeight(weight, queue, label); + return weight; + } public float getNonLabeledQueueCapacity(String queue) { String configuredCapacity = get(getQueuePrefix(queue) + CAPACITY); - boolean matcher = (configuredCapacity != null) + boolean absoluteResourceConfigured = (configuredCapacity != null) && RESOURCE_PATTERN.matcher(configuredCapacity).find(); - if (matcher) { + if (absoluteResourceConfigured || configuredWeightAsCapacity( + configuredCapacity)) { // Return capacity in percentage as 0 for non-root queues and 100 for // root.From AbstractCSQueue, absolute resource will be parsed and // updated. Once nodes are added/removed in cluster, capacity in @@ -729,31 +765,51 @@ public void setAccessibleNodeLabels(String queue, Set labels) { } return Collections.unmodifiableSet(set); } - - private float internalGetLabeledQueueCapacity(String queue, String label, String suffix, - float defaultValue) { + + private boolean configuredWeightAsCapacity(String configureValue) { + if (configureValue == null) { + return false; + } + return configureValue.endsWith(WEIGHT_SUFFIX); + } + + private float extractFloatValueFromWeightConfig(String configureValue) { + if (!configuredWeightAsCapacity(configureValue)) { + return -1f; + } else { + return Float.valueOf( + configureValue.substring(0, configureValue.indexOf(WEIGHT_SUFFIX))); + } + } + + private float internalGetLabeledQueueCapacity(String queue, String label, + String suffix, float defaultValue) { String capacityPropertyName = getNodeLabelPrefix(queue, label) + suffix; String configuredCapacity = get(capacityPropertyName); - boolean matcher = (configuredCapacity != null) - && RESOURCE_PATTERN.matcher(configuredCapacity).find(); - if (matcher) { + boolean absoluteResourceConfigured = + (configuredCapacity != null) && RESOURCE_PATTERN.matcher( + configuredCapacity).find(); + if (absoluteResourceConfigured || configuredWeightAsCapacity( + configuredCapacity)) { // Return capacity in percentage as 0 for non-root queues and 100 for - // root.From AbstractCSQueue, absolute resource will be parsed and - // updated. Once nodes are added/removed in cluster, capacity in - // percentage will also be re-calculated. + // root.From AbstractCSQueue, absolute resource, and weight will be parsed + // and updated separately. Once nodes are added/removed in cluster, + // capacity is percentage will also be re-calculated. return defaultValue; } float capacity = getFloat(capacityPropertyName, defaultValue); if (capacity < MINIMUM_CAPACITY_VALUE || capacity > MAXIMUM_CAPACITY_VALUE) { - throw new IllegalArgumentException("Illegal capacity of " + capacity - + " for node-label=" + label + " in queue=" + queue - + ", valid capacity should in range of [0, 100]."); + throw new IllegalArgumentException( + "Illegal capacity of " + capacity + " for node-label=" + label + + " in queue=" + queue + + ", valid capacity should in range of [0, 100]."); } if (LOG.isDebugEnabled()) { - LOG.debug("CSConf - getCapacityOfLabel: prefix=" - + getNodeLabelPrefix(queue, label) + ", capacity=" + capacity); + LOG.debug( + "CSConf - getCapacityOfLabel: prefix=" + getNodeLabelPrefix(queue, + label) + ", capacity=" + capacity); } return capacity; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java index a44929beed6..a3d65710b9f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java @@ -25,6 +25,8 @@ import java.util.Map; import java.util.Set; +import org.apache.hadoop.yarn.util.resource.ResourceUtils; +import org.apache.hadoop.yarn.util.resource.Resources; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -164,6 +166,8 @@ public void initializeQueues(CapacitySchedulerConfiguration conf) setQueueAcls(authorizer, appPriorityACLManager, queues); labelManager.reinitializeQueueLabels(getQueueToLabels()); this.queueStateManager.initialize(this); + root.updateClusterResource(csContext.getClusterResource(), + new ResourceLimits(csContext.getClusterResource())); LOG.info("Initialized root queue " + root); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 96d309c547e..4ed1ec5e597 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -200,26 +200,18 @@ protected void setupQueueConfigs(Resource clusterResource, usersManager.setUserLimit(conf.getUserLimit(getQueuePath())); usersManager.setUserLimitFactor(conf.getUserLimitFactor(getQueuePath())); + maxAMResourcePerQueuePercent = + conf.getMaximumApplicationMasterResourcePerQueuePercent( + getQueuePath()); + maxApplications = conf.getMaximumApplicationsPerQueue(getQueuePath()); if (maxApplications < 0) { - int maxGlobalPerQueueApps = schedConf - .getGlobalMaximumApplicationsPerQueue(); + int maxGlobalPerQueueApps = + csContext.getConfiguration().getGlobalMaximumApplicationsPerQueue(); if (maxGlobalPerQueueApps > 0) { maxApplications = maxGlobalPerQueueApps; - } else { - int maxSystemApps = schedConf. - getMaximumSystemApplications(); - maxApplications = - (int) (maxSystemApps * queueCapacities.getAbsoluteCapacity()); } } - maxApplicationsPerUser = Math.min(maxApplications, - (int) (maxApplications * (usersManager.getUserLimit() / 100.0f) - * usersManager.getUserLimitFactor())); - - maxAMResourcePerQueuePercent = - conf.getMaximumApplicationMasterResourcePerQueuePercent( - getQueuePath()); priorityAcls = conf.getPriorityAcls(getQueuePath(), scheduler.getMaxClusterLevelAppPriority()); @@ -1893,14 +1885,36 @@ private void updateCurrentResourceLimits( currentResourceLimits.getLimit())); } + private void updateAbsoluteCapacitiesAndRelatedFields() { + updateAbsoluteCapacities(); + CapacitySchedulerConfiguration schedulerConf = csContext.getConfiguration(); + + // If maxApplications not set, use the system total max app, apply newly + // calculated abs capacity of the queue. + if (maxApplications <= 0) { + int maxSystemApps = schedulerConf. + getMaximumSystemApplications(); + maxApplications = + (int) (maxSystemApps * queueCapacities.getAbsoluteCapacity()); + } + maxApplicationsPerUser = Math.min(maxApplications, + (int) (maxApplications * (usersManager.getUserLimit() / 100.0f) + * usersManager.getUserLimitFactor())); + } + @Override public void updateClusterResource(Resource clusterResource, ResourceLimits currentResourceLimits) { writeLock.lock(); try { - updateCurrentResourceLimits(currentResourceLimits, clusterResource); lastClusterResource = clusterResource; + updateAbsoluteCapacitiesAndRelatedFields(); + + super.updateEffectiveResources(clusterResource); + + updateCurrentResourceLimits(currentResourceLimits, clusterResource); + // Update headroom info based on new cluster resource value // absoluteMaxCapacity now, will be replaced with absoluteMaxAvailCapacity // during allocation diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java index 3ecfef462a9..9b17cc725c7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java @@ -180,9 +180,10 @@ private void reinitializeQueueManagementPolicy() throws IOException { //Load template capacities QueueCapacities queueCapacities = new QueueCapacities(false); - CSQueueUtils.loadUpdateAndCheckCapacities(csContext.getConfiguration() + CSQueueUtils.loadCapacitiesByLabelsFromConf(csContext.getConfiguration() .getAutoCreatedQueueTemplateConfPrefix(getQueuePath()), - csContext.getConfiguration(), queueCapacities, getQueueCapacities()); + queueCapacities, + csContext.getConfiguration()); /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index 7d82faeeef4..73c2bd18eab 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.Set; +import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -97,6 +98,12 @@ private final boolean allowZeroCapacitySum; + // effective min ratio per resource, it is used during updateClusterResource, + // leaf queue can use this to calculate effective resources. + // This field will not be edited, reference will point to a new immutable map + // after every time recalculation + private volatile Map effectiveMinRatioPerResource; + public ParentQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { super(cs, queueName, parent, old); @@ -172,55 +179,45 @@ protected void setupQueueConfigs(Resource clusterResource) private static float PRECISION = 0.0005f; // 0.05% precision - void setChildQueues(Collection childQueues) { - writeLock.lock(); - try { - // Validate - float childCapacities = 0; - Resource minResDefaultLabel = Resources.createResource(0, 0); - for (CSQueue queue : childQueues) { - childCapacities += queue.getCapacity(); - Resources.addTo(minResDefaultLabel, queue.getQueueResourceQuotas() - .getConfiguredMinResource()); - - // If any child queue is using percentage based capacity model vs parent - // queues' absolute configuration or vice versa, throw back an - // exception. - if (!queueName.equals("root") && getCapacity() != 0f - && !queue.getQueueResourceQuotas().getConfiguredMinResource() - .equals(Resources.none())) { - throw new IllegalArgumentException("Parent queue '" + getQueuePath() - + "' and child queue '" + queue.getQueuePath() - + "' should use either percentage based capacity" - + " configuration or absolute resource together."); - } - } + // Check weight configuration, throw exception when configuration is invalid + // return true when all children use weight mode. + private boolean checkWeightConfiguration(Collection childQueues) { + // Do we have ANY queue set capacity in any labels? + boolean capacityIsSet = false; + + // Do we have ANY queue set weight in any labels? + boolean weightIsSet = false; - float delta = Math.abs(1.0f - childCapacities); // crude way to check - - if (allowZeroCapacitySum) { - // If we allow zero capacity for children, only fail if: - // Σ(childCapacities) != 1.0f OR Σ(childCapacities) != 0.0f - // - // Therefore, child queues either add up to 0% or 100%. - // - // Current capacity doesn't matter, because we apply this logic - // regardless of whether the current capacity is zero or not. - if (minResDefaultLabel.equals(Resources.none()) - && (delta > PRECISION && childCapacities > PRECISION)) { - LOG.error("Capacity validation check is relaxed for" - + " queue {}, but the capacity must be either 0% or 100%", - getQueuePath()); - throw new IllegalArgumentException("Illegal" + " capacity of " - + childCapacities + " for children of queue " + queueName); + for (CSQueue queue : childQueues) { + for (String nodeLabel : queueCapacities.getExistingNodeLabels()) { + float capacityByLabel = queue.getQueueCapacities().getCapacity( + nodeLabel); + if (capacityByLabel > 0) { + capacityIsSet = true; + } + float weightByLabel = queue.getQueueCapacities().getWeight(nodeLabel); + // By default weight is set to -1, so >= 0 is enough. + if (weightByLabel >= 0) { + weightIsSet = true; } - } else if ((minResDefaultLabel.equals(Resources.none()) - && (queueCapacities.getCapacity() > 0) && (delta > PRECISION)) - || ((queueCapacities.getCapacity() == 0) && (childCapacities > 0))) { - // allow capacities being set to 0, and enforce child 0 if parent is 0 - throw new IllegalArgumentException("Illegal" + " capacity of " - + childCapacities + " for children of queue " + queueName); } + } + + if (capacityIsSet && weightIsSet) { + throw new IllegalArgumentException( + "Parent queue '" + getQueuePath() + "' have children queue used both " + + " weight mode and capacity mode, it is not allowed, please " + + "double check.'"); + } + + return weightIsSet; + } + + void setChildQueues(Collection childQueues) { + writeLock.lock(); + try { + // Check weights + boolean weightMode = checkWeightConfiguration(childQueues); // check label capacities for (String nodeLabel : queueCapacities.getExistingNodeLabels()) { @@ -252,10 +249,10 @@ void setChildQueues(Collection childQueues) { float labelDelta = Math.abs(1.0f - sum); - if (allowZeroCapacitySum) { + if (allowZeroCapacitySum || weightMode) { // Similar to above, we only throw exception if // Σ(childCapacities) != 1.0f OR Σ(childCapacities) != 0.0f - if (minResDefaultLabel.equals(Resources.none()) + if (minRes.equals(Resources.none()) && capacityByLabel > 0 && (labelDelta > PRECISION && sum > PRECISION)) { LOG.error("Capacity validation check is relaxed for" @@ -265,7 +262,7 @@ void setChildQueues(Collection childQueues) { "Illegal" + " capacity of " + sum + " for children of queue " + queueName + " for label=" + nodeLabel); } - } else if ((minResDefaultLabel.equals(Resources.none()) + } else if ((minRes.equals(Resources.none()) && capacityByLabel > 0 && Math.abs(1.0f - sum) > PRECISION) || (capacityByLabel == 0) && (sum > 0)) { @@ -451,8 +448,7 @@ public void reinitialize(CSQueue newlyParsedQueue, } // Re-sort all queues - childQueues.clear(); - childQueues.addAll(currentChildQueues.values()); + setChildQueues(currentChildQueues.values()); // Make sure we notifies QueueOrderingPolicy queueOrderingPolicy.setQueues(childQueues); @@ -788,14 +784,24 @@ private boolean canAssign(Resource clusterResource, FiCaSchedulerNode node) { } private ResourceLimits getResourceLimitsOfChild(CSQueue child, - Resource clusterResource, Resource parentLimits, - String nodePartition) { + Resource clusterResource, ResourceLimits parentLimits, + String nodePartition, boolean netLimit) { // Set resource-limit of a given child, child.limit = // min(my.limit - my.used + child.used, child.max) + // First, cap parent limit by parent's max + parentLimits.setLimit(Resources.min(resourceCalculator, clusterResource, + parentLimits.getLimit(), + queueResourceQuotas.getEffectiveMaxResource(nodePartition))); + // Parent available resource = parent-limit - parent-used-resource + Resource limit = parentLimits.getLimit(); + if (netLimit) { + limit = parentLimits.getNetLimit(); + } Resource parentMaxAvailableResource = Resources.subtract( - parentLimits, queueUsage.getUsed(nodePartition)); + limit, queueUsage.getUsed(nodePartition)); + // Deduct killable from used Resources.addTo(parentMaxAvailableResource, getTotalKillableResource(nodePartition)); @@ -804,15 +810,6 @@ private ResourceLimits getResourceLimitsOfChild(CSQueue child, Resource childLimit = Resources.add(parentMaxAvailableResource, child.getQueueResourceUsage().getUsed(nodePartition)); - // Get child's max resource - Resource childConfiguredMaxResource = child - .getEffectiveMaxCapacityDown(nodePartition, minimumAllocation); - - // Child's limit should be capped by child configured max resource - childLimit = - Resources.min(resourceCalculator, clusterResource, childLimit, - childConfiguredMaxResource); - // Normalize before return childLimit = Resources.roundDown(resourceCalculator, childLimit, minimumAllocation); @@ -841,8 +838,8 @@ private CSAssignment assignContainersToChildQueues(Resource cluster, // Get ResourceLimits of child queue before assign containers ResourceLimits childLimits = - getResourceLimitsOfChild(childQueue, cluster, limits.getNetLimit(), - candidates.getPartition()); + getResourceLimitsOfChild(childQueue, cluster, limits, + candidates.getPartition(), true); CSAssignment childAssignment = childQueue.assignContainers(cluster, candidates, childLimits, schedulingMode); @@ -941,6 +938,39 @@ public void updateClusterResource(Resource clusterResource, ResourceLimits resourceLimits) { writeLock.lock(); try { + // Special handle root queue + if (rootQueue) { + for (String nodeLabel : queueCapacities.getExistingNodeLabels()) { + if (queueCapacities.getWeight(nodeLabel) > 0) { + queueCapacities.setNormalizedWeight(nodeLabel, 1f); + } + } + } + + // Update absolute capacities of this queue, this need to happen before + // below calculation for effective capacities + updateAbsoluteCapacities(); + + // Normalize weight of children + if (checkWeightConfiguration(childQueues)) { + for (String nodeLabel : queueCapacities.getExistingNodeLabels()) { + float sumOfWeight = 0; + for (CSQueue queue : childQueues) { + float weight = Math.max(0, + queue.getQueueCapacities().getWeight(nodeLabel)); + sumOfWeight += weight; + } + // When sum of weight == 0, skip setting normalized_weight (so + // normalized weight will be 0). + if (Math.abs(sumOfWeight) > 1e-6) { + for (CSQueue queue : childQueues) { + queue.getQueueCapacities().setNormalizedWeight(nodeLabel, + queue.getQueueCapacities().getWeight(nodeLabel) / sumOfWeight); + } + } + } + } + // Update effective capacity in all parent queue. Set configuredNodelabels = csContext.getConfiguration() .getConfiguredNodeLabels(getQueuePath()); @@ -952,8 +982,8 @@ public void updateClusterResource(Resource clusterResource, for (CSQueue childQueue : childQueues) { // Get ResourceLimits of child queue before assign containers ResourceLimits childLimits = getResourceLimitsOfChild(childQueue, - clusterResource, resourceLimits.getLimit(), - RMNodeLabelsManager.NO_LABEL); + clusterResource, resourceLimits, + RMNodeLabelsManager.NO_LABEL, false); childQueue.updateClusterResource(clusterResource, childLimits); } @@ -979,16 +1009,13 @@ private void calculateEffectiveResourcesAndCapacity(String label, // cluster resource. Resource resourceByLabel = labelManager.getResourceByLabel(label, clusterResource); - if (getQueuePath().equals("root")) { - queueResourceQuotas.setConfiguredMinResource(label, resourceByLabel); - queueResourceQuotas.setConfiguredMaxResource(label, resourceByLabel); - queueResourceQuotas.setEffectiveMinResource(label, resourceByLabel); - queueResourceQuotas.setEffectiveMaxResource(label, resourceByLabel); - queueCapacities.setAbsoluteCapacity(label, 1.0f); - } + + /* + * == Below logic are added to calculate effectiveMinRatioPerResource == + */ // Total configured min resources of direct children of this given parent - // queue. + // queue Resource configuredMinResources = Resource.newInstance(0L, 0); for (CSQueue childQueue : getChildQueues()) { Resources.addTo(configuredMinResources, @@ -1014,90 +1041,16 @@ private void calculateEffectiveResourcesAndCapacity(String label, } } - Map effectiveMinRatioPerResource = getEffectiveMinRatioPerResource( + effectiveMinRatioPerResource = getEffectiveMinRatioPerResource( configuredMinResources, numeratorForMinRatio); - // loop and do this for all child queues - for (CSQueue childQueue : getChildQueues()) { - Resource minResource = childQueue.getQueueResourceQuotas() - .getConfiguredMinResource(label); - - // Update effective resource (min/max) to each child queue. - if (childQueue.getCapacityConfigType() - .equals(CapacityConfigType.ABSOLUTE_RESOURCE)) { - childQueue.getQueueResourceQuotas().setEffectiveMinResource(label, - getMinResourceNormalized( - childQueue.getQueuePath(), - effectiveMinRatioPerResource, - minResource)); - - // Max resource of a queue should be a minimum of {configuredMaxRes, - // parentMaxRes}. parentMaxRes could be configured value. But if not - // present could also be taken from effective max resource of parent. - Resource parentMaxRes = queueResourceQuotas - .getConfiguredMaxResource(label); - if (parent != null && parentMaxRes.equals(Resources.none())) { - parentMaxRes = parent.getQueueResourceQuotas() - .getEffectiveMaxResource(label); - } - - // Minimum of {childMaxResource, parentMaxRes}. However if - // childMaxResource is empty, consider parent's max resource alone. - Resource childMaxResource = childQueue.getQueueResourceQuotas() - .getConfiguredMaxResource(label); - Resource effMaxResource = Resources.min(resourceCalculator, - resourceByLabel, childMaxResource.equals(Resources.none()) - ? parentMaxRes - : childMaxResource, - parentMaxRes); - childQueue.getQueueResourceQuotas().setEffectiveMaxResource(label, - Resources.clone(effMaxResource)); - - // In cases where we still need to update some units based on - // percentage, we have to calculate percentage and update. - deriveCapacityFromAbsoluteConfigurations(label, clusterResource, rc, - childQueue); - } else { - childQueue.getQueueResourceQuotas().setEffectiveMinResource(label, - Resources.multiply(resourceByLabel, - childQueue.getQueueCapacities().getAbsoluteCapacity(label))); - childQueue.getQueueResourceQuotas().setEffectiveMaxResource(label, - Resources.multiply(resourceByLabel, childQueue.getQueueCapacities() - .getAbsoluteMaximumCapacity(label))); - } - - if (LOG.isDebugEnabled()) { - LOG.debug("Updating effective min resource for queue:" - + childQueue.getQueuePath() + " as effMinResource=" - + childQueue.getQueueResourceQuotas().getEffectiveMinResource(label) - + "and Updating effective max resource as effMaxResource=" - + childQueue.getQueueResourceQuotas() - .getEffectiveMaxResource(label)); - } - } - } - - private Resource getMinResourceNormalized(String name, Map effectiveMinRatio, - Resource minResource) { - Resource ret = Resource.newInstance(minResource); - int maxLength = ResourceUtils.getNumberOfCountableResourceTypes(); - for (int i = 0; i < maxLength; i++) { - ResourceInformation nResourceInformation = minResource - .getResourceInformation(i); - - Float ratio = effectiveMinRatio.get(nResourceInformation.getName()); - if (ratio != null) { - ret.setResourceValue(i, - (long) (nResourceInformation.getValue() * ratio.floatValue())); - if (LOG.isDebugEnabled()) { - LOG.debug("Updating min resource for Queue: " + name + " as " - + ret.getResourceInformation(i) + ", Actual resource: " - + nResourceInformation.getValue() + ", ratio: " - + ratio.floatValue()); - } - } + // Update effective resources for my self; + if (rootQueue) { + queueResourceQuotas.setEffectiveMinResource(label, resourceByLabel); + queueResourceQuotas.setEffectiveMaxResource(label, resourceByLabel); + } else{ + super.updateEffectiveResources(clusterResource); } - return ret; } private Map getEffectiveMinRatioPerResource( @@ -1121,74 +1074,7 @@ private Resource getMinResourceNormalized(String name, Map effect } } } - return effectiveMinRatioPerResource; - } - - private void deriveCapacityFromAbsoluteConfigurations(String label, - Resource clusterResource, ResourceCalculator rc, CSQueue childQueue) { - - /* - * In case when queues are configured with absolute resources, it is better - * to update capacity/max-capacity etc w.r.t absolute resource as well. In - * case of computation, these values wont be used any more. However for - * metrics and UI, its better these values are pre-computed here itself. - */ - - // 1. Update capacity as a float based on parent's minResource - childQueue.getQueueCapacities().setCapacity(label, - rc.divide(clusterResource, - childQueue.getQueueResourceQuotas().getEffectiveMinResource(label), - getQueueResourceQuotas().getEffectiveMinResource(label))); - - // 2. Update max-capacity as a float based on parent's maxResource - childQueue.getQueueCapacities().setMaximumCapacity(label, - rc.divide(clusterResource, - childQueue.getQueueResourceQuotas().getEffectiveMaxResource(label), - getQueueResourceQuotas().getEffectiveMaxResource(label))); - - // 3. Update absolute capacity as a float based on parent's minResource and - // cluster resource. - childQueue.getQueueCapacities().setAbsoluteCapacity(label, - childQueue.getQueueCapacities().getCapacity(label) - * getQueueCapacities().getAbsoluteCapacity(label)); - - // 4. Update absolute max-capacity as a float based on parent's maxResource - // and cluster resource. - childQueue.getQueueCapacities().setAbsoluteMaximumCapacity(label, - childQueue.getQueueCapacities().getMaximumCapacity(label) - * getQueueCapacities().getAbsoluteMaximumCapacity(label)); - - // Re-visit max applications for a queue based on absolute capacity if - // needed. - if (childQueue instanceof LeafQueue) { - LeafQueue leafQueue = (LeafQueue) childQueue; - CapacitySchedulerConfiguration conf = csContext.getConfiguration(); - int maxApplications = - conf.getMaximumApplicationsPerQueue(childQueue.getQueuePath()); - if (maxApplications < 0) { - int maxGlobalPerQueueApps = conf.getGlobalMaximumApplicationsPerQueue(); - if (maxGlobalPerQueueApps > 0) { - maxApplications = (int) (maxGlobalPerQueueApps * - childQueue.getQueueCapacities().getAbsoluteCapacity(label)); - } else { - maxApplications = (int) (conf.getMaximumSystemApplications() - * childQueue.getQueueCapacities().getAbsoluteCapacity(label)); - } - } - leafQueue.setMaxApplications(maxApplications); - - int maxApplicationsPerUser = Math.min(maxApplications, - (int) (maxApplications - * (leafQueue.getUsersManager().getUserLimit() / 100.0f) - * leafQueue.getUsersManager().getUserLimitFactor())); - leafQueue.setMaxApplicationsPerUser(maxApplicationsPerUser); - LOG.info("LeafQueue:" + leafQueue.getQueuePath() + ", maxApplications=" - + maxApplications + ", maxApplicationsPerUser=" - + maxApplicationsPerUser + ", Abs Cap:" - + childQueue.getQueueCapacities().getAbsoluteCapacity(label) + ", Cap: " - + childQueue.getQueueCapacities().getCapacity(label) + ", MaxCap : " - + childQueue.getQueueCapacities().getMaximumCapacity(label)); - } + return ImmutableMap.copyOf(effectiveMinRatioPerResource); } @Override @@ -1463,4 +1349,9 @@ void decrementRunnableApps() { writeLock.unlock(); } } + + // This is a locking free method + Map getEffectiveMinRatioPerResource() { + return effectiveMinRatioPerResource; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java index c1b715742ce..46bb0caed3a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java @@ -50,7 +50,7 @@ public QueueCapacities(boolean isRoot) { // Usage enum here to make implement cleaner private enum CapacityType { USED_CAP(0), ABS_USED_CAP(1), MAX_CAP(2), ABS_MAX_CAP(3), CAP(4), ABS_CAP(5), - MAX_AM_PERC(6), RESERVED_CAP(7), ABS_RESERVED_CAP(8); + MAX_AM_PERC(6), RESERVED_CAP(7), ABS_RESERVED_CAP(8), WEIGHT(9), NORMALIZED_WEIGHT(10); private int idx; @@ -64,6 +64,9 @@ private CapacityType(int idx) { public Capacities() { capacitiesArr = new float[CapacityType.values().length]; + + // Set weight to -1 by default (means not set) + capacitiesArr[CapacityType.WEIGHT.idx] = -1; } @Override @@ -74,10 +77,12 @@ public String toString() { .append("max_cap=" + capacitiesArr[2] + "%, ") .append("abs_max_cap=" + capacitiesArr[3] + "%, ") .append("cap=" + capacitiesArr[4] + "%, ") - .append("abs_cap=" + capacitiesArr[5] + "%}") - .append("max_am_perc=" + capacitiesArr[6] + "%}") - .append("reserved_cap=" + capacitiesArr[7] + "%}") - .append("abs_reserved_cap=" + capacitiesArr[8] + "%}"); + .append("abs_cap=" + capacitiesArr[5] + "%, ") + .append("max_am_perc=" + capacitiesArr[6] + "%, ") + .append("reserved_cap=" + capacitiesArr[7] + "%, ") + .append("abs_reserved_cap=" + capacitiesArr[8] + "%, ") + .append("weight=" + capacitiesArr[9] + "w, ") + .append("normalized_weight=" + capacitiesArr[9] + "w}"); return sb.toString(); } } @@ -87,6 +92,10 @@ private float _get(String label, CapacityType type) { try { Capacities cap = capacitiesMap.get(label); if (null == cap) { + // Special handle weight mode + if (type == CapacityType.WEIGHT) { + return -1f; + } return LABEL_DOESNT_EXIST_CAP; } return cap.capacitiesArr[type.idx]; @@ -270,6 +279,40 @@ public void setAbsoluteReservedCapacity(String label, float value) { _set(label, CapacityType.ABS_RESERVED_CAP, value); } + /* Weight Getter and Setter */ + public float getWeight() { + return _get(NL, CapacityType.WEIGHT); + } + + public float getWeight(String label) { + return _get(label, CapacityType.WEIGHT); + } + + public void setWeight(float value) { + _set(NL, CapacityType.WEIGHT, value); + } + + public void setWeight(String label, float value) { + _set(label, CapacityType.WEIGHT, value); + } + + /* Weight Getter and Setter */ + public float getNormalizedWeight() { + return _get(NL, CapacityType.NORMALIZED_WEIGHT); + } + + public float getNormalizedWeight(String label) { + return _get(label, CapacityType.NORMALIZED_WEIGHT); + } + + public void setNormalizedWeight(float value) { + _set(NL, CapacityType.NORMALIZED_WEIGHT, value); + } + + public void setNormalizedWeight(String label, float value) { + _set(label, CapacityType.NORMALIZED_WEIGHT, value); + } + /** * Clear configurable fields, like * (absolute)capacity/(absolute)maximum-capacity, this will be used by queue @@ -284,6 +327,7 @@ public void clearConfigurableFields() { _set(label, CapacityType.MAX_CAP, 0); _set(label, CapacityType.ABS_CAP, 0); _set(label, CapacityType.ABS_MAX_CAP, 0); + _set(label, CapacityType.WEIGHT, 0); } } finally { writeLock.unlock(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java index d59c02bc655..ebac4c20b67 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java @@ -22,8 +22,6 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -86,6 +84,6 @@ private void updateQuotas(int userLimit, float userLimitFactor, @Override protected void setupConfigurableCapacities(CapacitySchedulerConfiguration configuration) { - super.setupConfigurableCapacities(queueCapacities); + super.updateAbsoluteCapacities(); } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestAbsoluteResourceConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestAbsoluteResourceConfiguration.java index 3d5637c3522..0ac021acd96 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestAbsoluteResourceConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestAbsoluteResourceConfiguration.java @@ -498,9 +498,9 @@ public void testComplexValidateAbsoluteResourceConfig() throws Exception { Assert.assertTrue(e instanceof IOException); Assert.assertEquals( "Failed to re-init queues : Parent queue 'root.queueA' " - + "and child queue 'root.queueA.queueA1'" - + " should use either percentage based" - + " capacity configuration or absolute resource together.", + + "and child queue 'root.queueA.queueA1' should use either " + + "percentage based capacityconfiguration or absolute resource " + + "together for label:", e.getMessage()); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestAbsoluteResourceWithAutoQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestAbsoluteResourceWithAutoQueue.java index 683e9fcf381..4603c4b2d81 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestAbsoluteResourceWithAutoQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestAbsoluteResourceWithAutoQueue.java @@ -148,6 +148,8 @@ private CapacitySchedulerConfiguration setupSimpleQueueConfiguration( return csConf; } + // TODO: Wangda: I think this test case is not correct, Sunil could help look + // into details. @Test(timeout = 20000) public void testAutoCreateLeafQueueCreation() throws Exception { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoCreatedQueueBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoCreatedQueueBase.java index 4757cd79a07..26d5e130581 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoCreatedQueueBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoCreatedQueueBase.java @@ -749,6 +749,9 @@ protected void validateEffectiveMinResource(ResourceManager rm, * parentQueue.getQueueCapacities().getAbsoluteCapacity(label)); assertEquals(effMinCapacity, Resources.multiply(resourceByLabel, leafQueue.getQueueCapacities().getAbsoluteCapacity(label))); + // TODO: Wangda, I think this is a wrong test, it doesn't consider rounding + // loss of multiplication, the right value should be <10240, 2>, but the + // test expects <10240, 1> assertEquals(effMinCapacity, leafQueue.getEffectiveCapacity(label)); if (leafQueue.getQueueCapacities().getAbsoluteCapacity(label) > 0) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index e7abf7d53df..3a6fe2a8521 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -3677,11 +3677,13 @@ public void testMaxAMResourcePerQueuePercentAfterQueueRefresh() root.updateClusterResource(clusterResource, new ResourceLimits(clusterResource)); - // Manipulate queue 'a' + // Manipulate queue 'b' LeafQueue b = stubLeafQueue((LeafQueue) queues.get(B)); assertEquals(0.1f, b.getMaxAMResourcePerQueuePercent(), 1e-3f); - assertEquals(b.calculateAndGetAMResourceLimit(), - Resources.createResource(159 * GB, 1)); + // Queue b has 100 * 16 = 1600 GB effective usable resource, so the + // AM limit is 1600 GB * 0.1 * 0.99 = 162816 MB + assertEquals(Resources.createResource(162816, 1), + b.calculateAndGetAMResourceLimit()); csConf.setFloat( CapacitySchedulerConfiguration.MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT, @@ -4748,6 +4750,9 @@ public void testSetupQueueConfigsWithSpecifiedConfiguration() leafQueueName, cs.getRootQueue(), null); + leafQueue.updateClusterResource(Resource.newInstance(0, 0), + new ResourceLimits(Resource.newInstance(0, 0))); + assertEquals(30, leafQueue.getNodeLocalityDelay()); assertEquals(20, leafQueue.getMaxApplications()); assertEquals(2, leafQueue.getMaxApplicationsPerUser()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueCapacities.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueCapacities.java index 86feb5bc33f..248831f03d1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueCapacities.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueCapacities.java @@ -47,7 +47,9 @@ { "AbsoluteMaximumCapacity" }, { "MaxAMResourcePercentage" }, { "ReservedCapacity" }, - { "AbsoluteReservedCapacity" }}); + { "AbsoluteReservedCapacity" }, + { "Weight" }, + { "NormalizedWeight" }}); } public TestQueueCapacities(String suffix) { @@ -105,9 +107,6 @@ private static float executeByName(QueueCapacities obj, String methodName, private void internalTestModifyAndRead(String label) throws Exception { QueueCapacities qc = new QueueCapacities(false); - // First get returns 0 always - Assert.assertEquals(0f, get(qc, suffix, label), 1e-8); - // Set to 1, and check set(qc, suffix, label, 1f); Assert.assertEquals(1f, get(qc, suffix, label), 1e-8); @@ -117,15 +116,19 @@ private void internalTestModifyAndRead(String label) throws Exception { Assert.assertEquals(2f, get(qc, suffix, label), 1e-8); } - void check(int mem, int cpu, Resource res) { - Assert.assertEquals(mem, res.getMemorySize()); - Assert.assertEquals(cpu, res.getVirtualCores()); - } - @Test public void testModifyAndRead() throws Exception { LOG.info("Test - " + suffix); internalTestModifyAndRead(null); internalTestModifyAndRead("label"); } + + @Test + public void testDefaultValues() { + QueueCapacities qc = new QueueCapacities(false); + Assert.assertEquals(-1, qc.getWeight(""), 1e-6); + Assert.assertEquals(-1, qc.getWeight("x"), 1e-6); + Assert.assertEquals(0, qc.getCapacity(""), 1e-6); + Assert.assertEquals(0, qc.getCapacity("x"), 1e-6); + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java index c1f48be96a3..ea07756709f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java @@ -1143,6 +1143,59 @@ public void testQueueOrderingPolicyUpdatedAfterReinitialize() ServiceOperations.stopQuietly(capacityScheduler); } + @Test(timeout = 60000) + public void testQueueCapacityWithWeight() throws Exception { + YarnConfiguration config = new YarnConfiguration(); + nodeLabelManager = new NullRMNodeLabelsManager(); + nodeLabelManager.init(config); + config.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + CapacitySchedulerConfiguration conf = + new CapacitySchedulerConfiguration(config); + + // Define top-level queues + conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] { "a" }); + conf.setLabeledQueueWeight(CapacitySchedulerConfiguration.ROOT, "x", 100); + conf.setLabeledQueueWeight(CapacitySchedulerConfiguration.ROOT, "y", 100); + conf.setLabeledQueueWeight(CapacitySchedulerConfiguration.ROOT, "z", 100); + + final String A = CapacitySchedulerConfiguration.ROOT + ".a"; + conf.setNonLabeledQueueWeight(A, 100); + conf.setAccessibleNodeLabels(A, ImmutableSet.of("x", "y", "z")); + conf.setLabeledQueueWeight(A, "x", 100); + conf.setLabeledQueueWeight(A, "y", 100); + conf.setLabeledQueueWeight(A, "z", 70); + MockRM rm = null; + try { + rm = new MockRM(conf) { + @Override + public RMNodeLabelsManager createNodeLabelManager() { + return nodeLabelManager; + } + }; + } finally { + IOUtils.closeStream(rm); + } + + verifyQueueAbsCapacity(rm, CapacitySchedulerConfiguration.ROOT, "", 1f); + verifyQueueAbsCapacity(rm, CapacitySchedulerConfiguration.ROOT, "x", 1f); + verifyQueueAbsCapacity(rm, CapacitySchedulerConfiguration.ROOT, "y", 1f); + verifyQueueAbsCapacity(rm, CapacitySchedulerConfiguration.ROOT, "z", 1f); + + verifyQueueAbsCapacity(rm, A, "", 1f); + verifyQueueAbsCapacity(rm, A, "x", 1f); + verifyQueueAbsCapacity(rm, A, "y", 1f); + verifyQueueAbsCapacity(rm, A, "z", 1f); + } + + private void verifyQueueAbsCapacity(MockRM rm, String queuePath, String label, + float expectedAbsCapacity) { + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + CSQueue queue = cs.getQueue(queuePath); + Assert.assertEquals(expectedAbsCapacity, + queue.getQueueCapacities().getAbsoluteCapacity(label), 1e-6); + } + private void checkEqualsToQueueSet(List queues, String[] queueNames) { Set existedQueues = new HashSet<>(); for (CSQueue q : queues) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservationQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservationQueue.java index f6b4f2a31d3..84de7ccb82f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservationQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservationQueue.java @@ -28,7 +28,9 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; @@ -49,9 +51,10 @@ private final ResourceCalculator resourceCalculator = new DefaultResourceCalculator(); private ReservationQueue autoCreatedLeafQueue; + private PlanQueue planQueue; @Before - public void setup() throws IOException { + public void setup() throws IOException, SchedulerDynamicEditException { // setup a context / conf csConf = new CapacitySchedulerConfiguration(); @@ -66,12 +69,14 @@ public void setup() throws IOException { when(csContext.getClusterResource()).thenReturn( Resources.createResource(100 * 16 * GB, 100 * 32)); when(csContext.getResourceCalculator()).thenReturn(resourceCalculator); + when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager()); RMContext mockRMContext = TestUtils.getMockRMContext(); when(csContext.getRMContext()).thenReturn(mockRMContext); // create a queue - PlanQueue pq = new PlanQueue(csContext, "root", null, null); - autoCreatedLeafQueue = new ReservationQueue(csContext, "a", pq); + planQueue = new PlanQueue(csContext, "root", null, null); + autoCreatedLeafQueue = new ReservationQueue(csContext, "a", planQueue); + planQueue.addChildQueue(autoCreatedLeafQueue); } private void validateAutoCreatedLeafQueue(double capacity) { @@ -83,9 +88,14 @@ private void validateAutoCreatedLeafQueue(double capacity) { @Test public void testAddSubtractCapacity() throws Exception { - // verify that setting, adding, subtracting capacity works autoCreatedLeafQueue.setCapacity(1.0F); + autoCreatedLeafQueue.setMaxCapacity(1.0F); + + planQueue.updateClusterResource( + Resources.createResource(100 * 16 * GB, 100 * 32), + new ResourceLimits(Resources.createResource(100 * 16 * GB, 100 * 32))); + validateAutoCreatedLeafQueue(1); autoCreatedLeafQueue.setEntitlement(new QueueEntitlement(0.9f, 1f)); validateAutoCreatedLeafQueue(0.9);