diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java index 8b670871e56..e12b55e3da1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java @@ -148,11 +148,10 @@ private void updateCapacitiesToZero() throws IOException { try { for( String nodeLabel : parent.getQueueCapacities().getExistingNodeLabels ()) { - //TODO - update to use getMaximumCapacity(nodeLabel) in YARN-7574 setEntitlement(nodeLabel, new QueueEntitlement(0.0f, parent.getLeafQueueTemplate() .getQueueCapacities() - .getMaximumCapacity())); + .getMaximumCapacity(nodeLabel))); } } catch (SchedulerDynamicEditException e) { throw new IOException(e); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedQueueManagementPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedQueueManagementPolicy.java index f7a4bbdeee3..388e9d6233b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedQueueManagementPolicy.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedQueueManagementPolicy.java @@ -18,6 +18,9 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; + + +import java.io.IOException; import java.util.List; public interface AutoCreatedQueueManagementPolicy { @@ -26,14 +29,15 @@ * Initialize policy * @param schedulerContext Capacity Scheduler context */ - void init(CapacitySchedulerContext schedulerContext, ParentQueue parentQueue); + void init(CapacitySchedulerContext schedulerContext, ParentQueue + parentQueue) throws IOException; /** * Reinitialize policy state ( if required ) * @param schedulerContext Capacity Scheduler context */ void reinitialize(CapacitySchedulerContext schedulerContext, - ParentQueue parentQueue); + ParentQueue parentQueue) throws IOException; /** * Get initial template for the specified leaf queue @@ -48,6 +52,10 @@ AutoCreatedLeafQueueConfig getInitialLeafQueueConfiguration( /** * Compute/Adjust child queue capacities * for auto created leaf queues + * This computes queue entitlements but does not update LeafQueueState or + * queue capacities. The scheduler calls commitQueueManagementChanges after + * validating and applying the queue changes; commits to LeafQueueState + * are done in commitQueueManagementChanges. 
* * @return returns a list of suggested QueueEntitlementChange(s) which may * or may not be be enforced by the scheduler diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index bdd30b915c9..510b044093f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -1829,6 +1829,15 @@ public void setAutoCreatedLeafQueueConfigCapacity(String queuePath, setCapacity(leafQueueConfPrefix, val); } + @VisibleForTesting + @Private + public void setAutoCreatedLeafQueueTemplateCapacityByLabel(String queuePath, + String label, float val) { + String leafQueueConfPrefix = getAutoCreatedQueueTemplateConfPrefix( + queuePath); + setCapacityByLabel(leafQueueConfPrefix, label, val); + } + @Private @VisibleForTesting public void setAutoCreatedLeafQueueConfigMaxCapacity(String queuePath, @@ -1838,6 +1847,15 @@ public void setAutoCreatedLeafQueueConfigMaxCapacity(String queuePath, setMaximumCapacity(leafQueueConfPrefix, val); } + @Private + @VisibleForTesting + public void setAutoCreatedLeafQueueTemplateMaxCapacity(String queuePath, + String label, float val) { + String leafQueueConfPrefix = getAutoCreatedQueueTemplateConfPrefix( + queuePath); + setMaximumCapacityByLabel(leafQueueConfPrefix, label, val); + } + @VisibleForTesting @Private public void 
setAutoCreatedLeafQueueConfigUserLimit(String queuePath, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 8d1428d3e49..3171ac49cc0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -778,6 +778,13 @@ public Resource calculateAndGetAMResourceLimitPerPartition( metrics.setAMResouceLimit(nodePartition, amResouceLimit); queueUsage.setAMLimit(nodePartition, amResouceLimit); + if( LOG.isDebugEnabled()) { + LOG.debug("queue partition resource : " + queuePartitionResource + ',' + + " queue current limit : " + queueCurrentLimit + "," + + " queue partition usable resource : " + + queuePartitionUsableResource + "," + + " amResourceLimit : " + amResouceLimit); + } return amResouceLimit; } finally { writeLock.unlock(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java index cbdb21d9498..1ef0dac4211 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java @@ -132,7 +132,7 @@ public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource) } } - private void initializeQueueManagementPolicy() { + private void initializeQueueManagementPolicy() throws IOException { queueManagementPolicy = csContext.getConfiguration().getAutoCreatedQueueManagementPolicyClass( getQueuePath()); @@ -140,7 +140,7 @@ private void initializeQueueManagementPolicy() { queueManagementPolicy.init(csContext, this); } - private void reinitializeQueueManagementPolicy() { + private void reinitializeQueueManagementPolicy() throws IOException { AutoCreatedQueueManagementPolicy managementPolicy = csContext.getConfiguration().getAutoCreatedQueueManagementPolicyClass( getQueuePath()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemanagement/GuaranteedOrZeroCapacityOverTimePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemanagement/GuaranteedOrZeroCapacityOverTimePolicy.java index aee6405dd7e..de077af740e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemanagement/GuaranteedOrZeroCapacityOverTimePolicy.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemanagement/GuaranteedOrZeroCapacityOverTimePolicy.java @@ -50,6 +50,7 @@ import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.MonotonicClock; +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; @@ -63,8 +64,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantReadWriteLock; -import static org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager - .NO_LABEL; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler .capacity.CSQueueUtils.EPSILON; @@ -85,8 +84,6 @@ private static final Log LOG = LogFactory.getLog( GuaranteedOrZeroCapacityOverTimePolicy.class); - private AutoCreatedLeafQueueConfig ZERO_CAPACITY_ENTITLEMENT; - private ReentrantReadWriteLock.WriteLock writeLock; private ReentrantReadWriteLock.ReadLock readLock; @@ -97,12 +94,71 @@ private QueueCapacities leafQueueTemplateCapacities; - private Map leafQueueStateMap = new HashMap<>(); + private Set leafQueueTemplateNodeLabels; + + private LeafQueueState leafQueueState = new LeafQueueState(); private Clock clock = new MonotonicClock(); private class LeafQueueState { + //map of partition-> queueName->{leaf queue's state} + private Map> leafQueueStateMap = new + HashMap<>(); + + public boolean containsLeafQueue(String leafQueueName, String partition) { + if( leafQueueStateMap.containsKey(partition) ) { + return leafQueueStateMap.get(partition).containsKey(leafQueueName); + } + return false; + } + + private boolean containsPartition(String partition) { + if( leafQueueStateMap.containsKey(partition) ) { + return true; + } + return false; + } + + private boolean addLeafQueueStateIfNotExists(String leafQueueName, String + partition, LeafQueueStatePerPartition leafQueueState) { + if 
(!containsPartition(partition)) { + leafQueueStateMap.put(partition, new HashMap<>()); + } + if ( !containsLeafQueue(leafQueueName, partition)) { + leafQueueStateMap.get(partition).put(leafQueueName, leafQueueState); + return true; + } + return false; + } + + public boolean createLeafQueueStateIfNotExists(LeafQueue leafQueue, String + partition) { + return addLeafQueueStateIfNotExists(leafQueue.getQueueName(), + partition, new LeafQueueStatePerPartition()); + } + + public LeafQueueStatePerPartition getLeafQueueStatePerPartition(String + leafQueueName, String partition) { + if( leafQueueStateMap.get(partition) != null) { + return leafQueueStateMap.get(partition).get(leafQueueName); + } + return null; + } + + public Map> + getLeafQueueStateMap() { + return leafQueueStateMap; + } + + private void clear() { + leafQueueStateMap.clear(); + } + } + + + private class LeafQueueStatePerPartition { + private AtomicBoolean isActive = new AtomicBoolean(false); private long mostRecentActivationTime; @@ -139,41 +195,16 @@ private boolean deactivate() { } } - private boolean containsLeafQueue(String leafQueueName) { - return leafQueueStateMap.containsKey(leafQueueName); - } - - private boolean addLeafQueueStateIfNotExists(String leafQueueName, - LeafQueueState leafQueueState) { - if (!containsLeafQueue(leafQueueName)) { - leafQueueStateMap.put(leafQueueName, leafQueueState); - return true; - } - return false; - } - - private boolean addLeafQueueStateIfNotExists(LeafQueue leafQueue) { - return addLeafQueueStateIfNotExists(leafQueue.getQueueName(), - new LeafQueueState()); - } - - private void clearLeafQueueState() { - leafQueueStateMap.clear(); - } - private class ParentQueueState { private Map totalAbsoluteActivatedChildQueueCapacityByLabel = new HashMap(); - private float getAbsoluteActivatedChildQueueCapacity() { - return getAbsoluteActivatedChildQueueCapacity(NO_LABEL); - } - private float getAbsoluteActivatedChildQueueCapacity(String nodeLabel) { try { readLock.lock(); - Float 
totalActivatedCapacity = getByLabel(nodeLabel); + Float totalActivatedCapacity = getAbsActivatedChildQueueCapacityByLabel + (nodeLabel); if (totalActivatedCapacity != null) { return totalActivatedCapacity; } else{ @@ -188,11 +219,14 @@ private void incAbsoluteActivatedChildCapacity(String nodeLabel, float childQueueCapacity) { try { writeLock.lock(); - Float activatedChildCapacity = getByLabel(nodeLabel); + Float activatedChildCapacity = + getAbsActivatedChildQueueCapacityByLabel(nodeLabel); if (activatedChildCapacity != null) { - setByLabel(nodeLabel, activatedChildCapacity + childQueueCapacity); + setAbsActivatedChildQueueCapacityByLabel( + nodeLabel, activatedChildCapacity + childQueueCapacity); } else{ - setByLabel(nodeLabel, childQueueCapacity); + setAbsActivatedChildQueueCapacityByLabel + (nodeLabel, childQueueCapacity); } } finally { writeLock.unlock(); @@ -203,22 +237,25 @@ private void decAbsoluteActivatedChildCapacity(String nodeLabel, float childQueueCapacity) { try { writeLock.lock(); - Float activatedChildCapacity = getByLabel(nodeLabel); + Float activatedChildCapacity = + getAbsActivatedChildQueueCapacityByLabel(nodeLabel); if (activatedChildCapacity != null) { - setByLabel(nodeLabel, activatedChildCapacity - childQueueCapacity); + setAbsActivatedChildQueueCapacityByLabel( + nodeLabel, activatedChildCapacity - childQueueCapacity); } else{ - setByLabel(nodeLabel, childQueueCapacity); + setAbsActivatedChildQueueCapacityByLabel( + nodeLabel, childQueueCapacity); } } finally { writeLock.unlock(); } } - Float getByLabel(String label) { + Float getAbsActivatedChildQueueCapacityByLabel(String label) { return totalAbsoluteActivatedChildQueueCapacityByLabel.get(label); } - Float setByLabel(String label, float val) { + Float setAbsActivatedChildQueueCapacityByLabel(String label, float val) { return totalAbsoluteActivatedChildQueueCapacityByLabel.put(label, val); } @@ -256,13 +293,12 @@ public int compare(FiCaSchedulerApp app1, FiCaSchedulerApp app2) { @Override 
public void init(final CapacitySchedulerContext schedulerContext, - final ParentQueue parentQueue) { + final ParentQueue parentQueue) throws IOException { this.scheduler = schedulerContext; ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); readLock = lock.readLock(); writeLock = lock.writeLock(); - if (!(parentQueue instanceof ManagedParentQueue)) { throw new IllegalArgumentException( "Expected instance of type " + ManagedParentQueue.class); @@ -278,15 +314,42 @@ public void init(final CapacitySchedulerContext schedulerContext, + leafQueueTemplate.getQueueCapacities() + "]"); } - private void initializeLeafQueueTemplate(ManagedParentQueue parentQueue) { + private void initializeLeafQueueTemplate(ManagedParentQueue parentQueue) + throws IOException { leafQueueTemplate = parentQueue.getLeafQueueTemplate(); leafQueueTemplateCapacities = leafQueueTemplate.getQueueCapacities(); - ZERO_CAPACITY_ENTITLEMENT = buildTemplate(0.0f, - leafQueueTemplateCapacities.getMaximumCapacity()); + for(String nodeLabel : leafQueueTemplateCapacities + .getExistingNodeLabels()) { + //Should this be used inste + Set parentQueueLabels = parentQueue.getNodeLabelsForQueue(); + if ( !parentQueueLabels.contains(nodeLabel)) { + LOG.error("Invalid node label " + nodeLabel + + " on configured leaf template on parent" + + " queue " + parentQueue.getQueueName()); + throw new IOException("Invalid node label " + nodeLabel + + " on configured leaf template on parent" + + " queue " + parentQueue.getQueueName()); + } + } + + leafQueueTemplateNodeLabels = leafQueueTemplateCapacities + .getExistingNodeLabels(); + } + /** + * Compute/Adjust child queue capacities + * for auto created leaf queues + * This computes queue entitlements but does not update LeafQueueState or + * queue capacities. The scheduler calls commitQueueManagementChanges after + * validating and applying the queue changes; commits to LeafQueueState + * are done in commitQueueManagementChanges. 
+ * + * @return + * @throws SchedulerDynamicEditException + */ @Override public List computeQueueManagementChanges() throws SchedulerDynamicEditException { @@ -298,70 +361,91 @@ private void initializeLeafQueueTemplate(ManagedParentQueue parentQueue) { try { readLock.lock(); List queueManagementChanges = new ArrayList<>(); + List pendingApps = getSortedPendingApplications(); + + //Map of LeafQueue->QueueCapacities - keep adding the computed + // entitlements to this map and finally + // build the leaf queue configuration Template for all identified leaf + // queues + Map leafQueueEntitlements = new HashMap<>(); + for (String nodeLabel : leafQueueTemplateNodeLabels) { + // check if any leaf queues need to be deactivated based on pending + // applications + float parentAbsoluteCapacity = + managedParentQueue.getQueueCapacities().getAbsoluteCapacity( + nodeLabel); + float leafQueueTemplateAbsoluteCapacity = + leafQueueTemplateCapacities.getAbsoluteCapacity(nodeLabel); + Map deactivatedLeafQueues = + deactivateLeafQueuesIfInActive(managedParentQueue, nodeLabel, + leafQueueEntitlements); + + float deactivatedCapacity = getTotalDeactivatedCapacity( + deactivatedLeafQueues, nodeLabel); + + float sumOfChildQueueActivatedCapacity = parentQueueState. + getAbsoluteActivatedChildQueueCapacity(nodeLabel); + + //Check if we need to activate anything at all? 
+ float availableCapacity = getAvailableCapacity(parentAbsoluteCapacity, + deactivatedCapacity, sumOfChildQueueActivatedCapacity); - // check if any leaf queues need to be deactivated based on pending - // applications and - float parentAbsoluteCapacity = - managedParentQueue.getQueueCapacities().getAbsoluteCapacity(); - - float leafQueueTemplateAbsoluteCapacity = - leafQueueTemplateCapacities.getAbsoluteCapacity(); - Map deactivatedLeafQueues = - deactivateLeafQueuesIfInActive(managedParentQueue, queueManagementChanges); - - float deactivatedCapacity = getTotalDeactivatedCapacity( - deactivatedLeafQueues); - - float sumOfChildQueueActivatedCapacity = parentQueueState. - getAbsoluteActivatedChildQueueCapacity(); - - //Check if we need to activate anything at all? - float availableCapacity = getAvailableCapacity(parentAbsoluteCapacity, - deactivatedCapacity, sumOfChildQueueActivatedCapacity); - - if (LOG.isDebugEnabled()) { - LOG.debug( - "Parent queue : " + managedParentQueue.getQueueName() + " absCapacity = " - + parentAbsoluteCapacity + ", leafQueueAbsoluteCapacity = " - + leafQueueTemplateAbsoluteCapacity + ", deactivatedCapacity = " - + deactivatedCapacity + " , absChildActivatedCapacity = " - + sumOfChildQueueActivatedCapacity + ", availableCapacity = " - + availableCapacity); - } - - if (availableCapacity >= leafQueueTemplateAbsoluteCapacity) { - //sort applications across leaf queues by submit time - List pendingApps = getSortedPendingApplications(); + if (LOG.isDebugEnabled()) { + LOG.debug("Parent queue : " + managedParentQueue.getQueueName() + + ", nodeLabel = " + nodeLabel + ", absCapacity = " + + parentAbsoluteCapacity + ", leafQueueAbsoluteCapacity = " + + leafQueueTemplateAbsoluteCapacity + ", deactivatedCapacity = " + + deactivatedCapacity + " , absChildActivatedCapacity = " + + sumOfChildQueueActivatedCapacity + ", availableCapacity = " + + availableCapacity); + } - if (pendingApps.size() > 0) { - int maxLeafQueuesTobeActivated = 
getMaxLeavesToBeActivated( - availableCapacity, leafQueueTemplateAbsoluteCapacity, - pendingApps.size()); + if (availableCapacity >= leafQueueTemplateAbsoluteCapacity) { + //sort applications across leaf queues by submit time + if (pendingApps.size() > 0) { + int maxLeafQueuesTobeActivated = getMaxLeavesToBeActivated( + availableCapacity, leafQueueTemplateAbsoluteCapacity, + pendingApps.size()); - if (LOG.isDebugEnabled()) { - LOG.debug("Found " + maxLeafQueuesTobeActivated - + " leaf queues to be activated with " + pendingApps.size() - + " apps "); - } + if (LOG.isDebugEnabled()) { + LOG.debug("Found " + maxLeafQueuesTobeActivated + " leaf queues" + + " to be activated with " + pendingApps.size() + " apps "); + } - LinkedHashSet leafQueuesToBeActivated = getSortedLeafQueues( - pendingApps, maxLeafQueuesTobeActivated, - deactivatedLeafQueues.keySet()); + LinkedHashSet leafQueuesToBeActivated = getSortedLeafQueues( + nodeLabel, pendingApps, maxLeafQueuesTobeActivated, + deactivatedLeafQueues.keySet()); - //Compute entitlement changes for the identified leaf queues - // which is appended to the List of queueManagementChanges - computeQueueManagementChanges(leafQueuesToBeActivated, - queueManagementChanges, availableCapacity, - leafQueueTemplateAbsoluteCapacity); + //Compute entitlement changes for the identified leaf queues + // which is appended to the List of computedEntitlements + updateLeafQueueCapacitiesByLabel(nodeLabel, leafQueuesToBeActivated, + leafQueueEntitlements); - if (LOG.isDebugEnabled()) { - if (leafQueuesToBeActivated.size() > 0) { - LOG.debug( - "Activated leaf queues : [" + leafQueuesToBeActivated + "]"); + if (LOG.isDebugEnabled()) { + if (leafQueuesToBeActivated.size() > 0) { + LOG.debug("Activated leaf queues : [" + leafQueuesToBeActivated + + "]"); + } } } } } + + //Populate new entitlements + + for (final Iterator> iterator = + leafQueueEntitlements.entrySet().iterator(); iterator.hasNext(); ) { + Map.Entry queueCapacities = iterator.next(); 
+ String leafQueueName = queueCapacities.getKey(); + AutoCreatedLeafQueue leafQueue = + (AutoCreatedLeafQueue) scheduler.getCapacitySchedulerQueueManager() + .getQueue(leafQueueName); + AutoCreatedLeafQueueConfig newTemplate = buildTemplate( + queueCapacities.getValue()); + queueManagementChanges.add( + new QueueManagementChange.UpdateQueue(leafQueue, newTemplate)); + + } return queueManagementChanges; } finally { readLock.unlock(); @@ -369,14 +453,14 @@ private void initializeLeafQueueTemplate(ManagedParentQueue parentQueue) { } private float getTotalDeactivatedCapacity( - Map deactivatedLeafQueues) { + Map deactivatedLeafQueues, String nodeLabel) { float deactivatedCapacity = 0; for (Iterator> iterator = deactivatedLeafQueues.entrySet().iterator(); iterator.hasNext(); ) { - Map.Entry deactivatedQueueCapacity = - iterator.next(); - deactivatedCapacity += - deactivatedQueueCapacity.getValue().getAbsoluteCapacity(); + Map.Entry deactivatedQueueCapacity = iterator + .next(); + deactivatedCapacity += deactivatedQueueCapacity.getValue() + .getAbsoluteCapacity(nodeLabel); } return deactivatedCapacity; } @@ -385,20 +469,44 @@ private float getTotalDeactivatedCapacity( void updateLeafQueueState() { try { writeLock.lock(); + Set newPartitions = new HashSet<>(); Set newQueues = new HashSet<>(); + for (CSQueue newQueue : managedParentQueue.getChildQueues()) { if (newQueue instanceof LeafQueue) { - addLeafQueueStateIfNotExists((LeafQueue) newQueue); + for (String nodeLabel : + leafQueueTemplateNodeLabels) { + leafQueueState.createLeafQueueStateIfNotExists((LeafQueue) + newQueue, nodeLabel); + newPartitions.add(nodeLabel); + } newQueues.add(newQueue.getQueueName()); } } - for (Iterator> itr = - leafQueueStateMap.entrySet().iterator(); itr.hasNext(); ) { - Map.Entry e = itr.next(); - String queueName = e.getKey(); - if (!newQueues.contains(queueName)) { + for (Iterator>> itr = + leafQueueState.getLeafQueueStateMap().entrySet().iterator(); itr + .hasNext(); ) { + Map.Entry> e = 
itr.next(); + String partition = e.getKey(); + if (!newPartitions.contains(partition)) { itr.remove(); + LOG.info("Removed partition " + partition + " from leaf queue " + + "state"); + } else { + Map queues = e.getValue(); + for (Iterator> queueItr = + queues.entrySet().iterator(); queueItr + .hasNext(); ) { + String queue = queueItr.next().getKey(); + if (!newQueues.contains(queue)) { + queueItr.remove(); + LOG.info("Removed queue " + queue + " from leaf queue " + + "state from partition " + partition); + } + } } } } finally { @@ -406,22 +514,20 @@ void updateLeafQueueState() { } } - private LinkedHashSet getSortedLeafQueues( + private LinkedHashSet getSortedLeafQueues(String nodeLabel, final List pendingApps, int leafQueuesNeeded, Set deactivatedQueues) throws SchedulerDynamicEditException { LinkedHashSet leafQueues = new LinkedHashSet<>(leafQueuesNeeded); int ctr = 0; for (FiCaSchedulerApp app : pendingApps) { - AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) app.getCSLeafQueue(); String leafQueueName = leafQueue.getQueueName(); //Check if leafQueue is not active already and has any pending apps if (ctr < leafQueuesNeeded) { - - if (!isActive(leafQueue)) { + if (!isActive(leafQueue, nodeLabel)) { if (!deactivatedQueues.contains(leafQueueName)) { if (addLeafQueueIfNotExists(leafQueues, leafQueueName)) { ctr++; @@ -445,76 +551,64 @@ private boolean addLeafQueueIfNotExists(Set leafQueues, } @VisibleForTesting - public boolean isActive(final AutoCreatedLeafQueue leafQueue) + public boolean isActive(final AutoCreatedLeafQueue leafQueue, String + nodeLabel) throws SchedulerDynamicEditException { try { readLock.lock(); - LeafQueueState leafQueueStatus = getLeafQueueState(leafQueue); + LeafQueueStatePerPartition leafQueueStatus = getLeafQueueState(leafQueue, + nodeLabel); return leafQueueStatus.isActive(); } finally { readLock.unlock(); } } - private Map deactivateLeafQueuesIfInActive( - ParentQueue parentQueue, - List queueManagementChanges) + private Map 
deactivateLeafQueuesIfInActive(ParentQueue parentQueue, + String nodeLabel, Map leafQueueEntitlements) throws SchedulerDynamicEditException { Map deactivatedQueues = new HashMap<>(); for (CSQueue childQueue : parentQueue.getChildQueues()) { AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) childQueue; + if (leafQueue != null) { + if (isActive(leafQueue, nodeLabel) && !hasPendingApps(leafQueue)) { + if ( !leafQueueEntitlements.containsKey(leafQueue.getQueueName())) { + leafQueueEntitlements.put(leafQueue.getQueueName(), new QueueCapacities(false)); + } - if (isActive(leafQueue) && !hasPendingApps(leafQueue)) { - queueManagementChanges.add( - new QueueManagementChange.UpdateQueue(leafQueue, - ZERO_CAPACITY_ENTITLEMENT)); - deactivatedQueues.put(leafQueue.getQueueName(), - leafQueueTemplateCapacities); - } else{ - if (LOG.isDebugEnabled()) { - LOG.debug(" Leaf queue has pending applications : " + leafQueue - .getNumApplications() + ".Skipping deactivation for " - + leafQueue); + QueueCapacities capacities = leafQueueEntitlements.get(leafQueue.getQueueName()); + updateToZeroCapacity(capacities, nodeLabel); + deactivatedQueues.put(leafQueue.getQueueName(), + leafQueueTemplateCapacities); + } else{ + if (LOG.isDebugEnabled()) { + LOG.debug(" Leaf queue has pending applications or is " + "inactive" + + " : " + leafQueue.getNumApplications() + + ".Skipping deactivation for " + leafQueue); + } } + } else{ + LOG.warn( + "Could not find queue in scheduler while trying" + " to " + + "deactivate for " + parentQueue); } } - if (LOG.isDebugEnabled()) { - if (deactivatedQueues.size() > 0) { - LOG.debug("Deactivated leaf queues : " + deactivatedQueues); - } - } return deactivatedQueues; } - private void computeQueueManagementChanges( - Set leafQueuesToBeActivated, - List queueManagementChanges, - final float availableCapacity, - final float leafQueueTemplateAbsoluteCapacity) { - - float curAvailableCapacity = availableCapacity; - + private void 
updateLeafQueueCapacitiesByLabel(String nodeLabel, + Set leafQueuesToBeActivated, Map + leafQueueEntitlements) { for (String curLeafQueue : leafQueuesToBeActivated) { + if ( !leafQueueEntitlements.containsKey(curLeafQueue)) { + leafQueueEntitlements.put(curLeafQueue, new QueueCapacities(false)); // Activate queues if capacity is available - if (curAvailableCapacity >= leafQueueTemplateAbsoluteCapacity) { - AutoCreatedLeafQueue leafQueue = - (AutoCreatedLeafQueue) scheduler.getCapacitySchedulerQueueManager() - .getQueue(curLeafQueue); - if (leafQueue != null) { - AutoCreatedLeafQueueConfig newTemplate = buildTemplate( - leafQueueTemplateCapacities.getCapacity(), - leafQueueTemplateCapacities.getMaximumCapacity()); - queueManagementChanges.add( - new QueueManagementChange.UpdateQueue(leafQueue, newTemplate)); - curAvailableCapacity -= leafQueueTemplateAbsoluteCapacity; - } else{ - LOG.warn( - "Could not find queue in scheduler while trying to deactivate " - + curLeafQueue); - } } + + QueueCapacities capacities = leafQueueEntitlements.get(curLeafQueue); + updateCapacityFromTemplate(capacities, nodeLabel); } } @@ -567,25 +661,27 @@ public void commitQueueManagementChanges( AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) queue; - if (updatedQueueTemplate.getQueueCapacities().getCapacity() > 0) { - if (isActive(leafQueue)) { - if (LOG.isDebugEnabled()) { - LOG.debug( - "Queue is already active. Skipping activation : " + queue - .getQueuePath()); + for (String nodeLabel : updatedQueueTemplate.getQueueCapacities() + .getExistingNodeLabels()) { + if (updatedQueueTemplate.getQueueCapacities(). + getCapacity(nodeLabel) > 0) { + if (isActive(leafQueue, nodeLabel)) { + if ( LOG.isDebugEnabled()) { + LOG.debug("Queue is already active." 
+ " Skipping activation : " + + queue.getQueuePath()); + } + } else{ + activate(leafQueue, nodeLabel); } } else{ - activate(leafQueue); - } - } else{ - if (!isActive(leafQueue)) { - if (LOG.isDebugEnabled()) { - LOG.debug( - "Queue is already de-activated. " + "Skipping de-activation " - + ": " + leafQueue.getQueuePath()); + if (!isActive(leafQueue, nodeLabel)) { + if ( LOG.isDebugEnabled()) { + LOG.debug("Queue is already de-activated. Skipping " + + "de-activation : " + leafQueue.getQueuePath()); + } + } else{ + deactivate(leafQueue, nodeLabel); } - } else{ - deactivate(leafQueue); } } } @@ -594,30 +690,28 @@ public void commitQueueManagementChanges( } } - private void activate(final AutoCreatedLeafQueue leafQueue) + private void activate(final AbstractAutoCreatedLeafQueue leafQueue, String nodeLabel) throws SchedulerDynamicEditException { try { writeLock.lock(); - getLeafQueueState(leafQueue).activate(); - - parentQueueState.incAbsoluteActivatedChildCapacity(NO_LABEL, - leafQueueTemplateCapacities.getAbsoluteCapacity()); + getLeafQueueState(leafQueue, nodeLabel).activate(); + parentQueueState.incAbsoluteActivatedChildCapacity(nodeLabel, + leafQueueTemplateCapacities + .getAbsoluteCapacity(nodeLabel)); } finally { writeLock.unlock(); } } - private void deactivate(final AutoCreatedLeafQueue leafQueue) - throws SchedulerDynamicEditException { + private void deactivate(final AbstractAutoCreatedLeafQueue leafQueue, String + nodeLabel) throws SchedulerDynamicEditException { try { writeLock.lock(); - getLeafQueueState(leafQueue).deactivate(); + getLeafQueueState(leafQueue, nodeLabel).deactivate(); - for (String nodeLabel : managedParentQueue.getQueueCapacities() - .getExistingNodeLabels()) { - parentQueueState.decAbsoluteActivatedChildCapacity(nodeLabel, - leafQueueTemplateCapacities.getAbsoluteCapacity()); - } + parentQueueState.decAbsoluteActivatedChildCapacity(nodeLabel, + leafQueueTemplateCapacities + .getAbsoluteCapacity(nodeLabel)); } finally { 
writeLock.unlock(); } @@ -629,7 +723,7 @@ public boolean hasPendingApps(final AutoCreatedLeafQueue leafQueue) { @Override public void reinitialize(CapacitySchedulerContext schedulerContext, - final ParentQueue parentQueue) { + final ParentQueue parentQueue) throws IOException { if (!(parentQueue instanceof ManagedParentQueue)) { throw new IllegalStateException( "Expected instance of type " + ManagedParentQueue.class + " found " @@ -649,7 +743,7 @@ public void reinitialize(CapacitySchedulerContext schedulerContext, //clear state parentQueueState.clear(); - clearLeafQueueState(); + leafQueueState.clear(); LOG.info( "Reinitialized queue management policy for parent queue " @@ -663,51 +757,77 @@ public AutoCreatedLeafQueueConfig getInitialLeafQueueConfiguration( AbstractAutoCreatedLeafQueue leafQueue) throws SchedulerDynamicEditException { + AutoCreatedLeafQueueConfig template; + if ( !(leafQueue instanceof AutoCreatedLeafQueue)) { throw new SchedulerDynamicEditException("Not an instance of " + "AutoCreatedLeafQueue : " + leafQueue.getClass()); } - AutoCreatedLeafQueue autoCreatedLeafQueue = - (AutoCreatedLeafQueue) leafQueue; - AutoCreatedLeafQueueConfig template = ZERO_CAPACITY_ENTITLEMENT; try { writeLock.lock(); - if (!addLeafQueueStateIfNotExists(leafQueue)) { - LOG.error("Leaf queue already exists in state : " + getLeafQueueState( - leafQueue)); - throw new SchedulerDynamicEditException( - "Leaf queue already exists in state : " + getLeafQueueState( - leafQueue)); - } - float availableCapacity = getAvailableCapacity( - managedParentQueue.getQueueCapacities().getAbsoluteCapacity(), 0, - parentQueueState.getAbsoluteActivatedChildQueueCapacity()); + QueueCapacities capacities = new QueueCapacities(false); + for (String nodeLabel : + leafQueueTemplateNodeLabels) { + if (!leafQueueState.createLeafQueueStateIfNotExists(leafQueue, + nodeLabel)) { + String message = "Leaf queue already exists in state : " + + getLeafQueueState(leafQueue, nodeLabel); + LOG.error(message); 
+ } + + float availableCapacity = + getAvailableCapacity( + managedParentQueue.getQueueCapacities().getAbsoluteCapacity(nodeLabel) + , 0, + parentQueueState. + getAbsoluteActivatedChildQueueCapacity(nodeLabel)); + + if (availableCapacity >= + leafQueueTemplateCapacities.getAbsoluteCapacity(nodeLabel)) { - if (availableCapacity >= leafQueueTemplateCapacities - .getAbsoluteCapacity()) { - activate(autoCreatedLeafQueue); - template = buildTemplate(leafQueueTemplateCapacities.getCapacity(), - leafQueueTemplateCapacities.getMaximumCapacity()); + updateCapacityFromTemplate(capacities, nodeLabel); + activate(leafQueue, nodeLabel); + } else{ + updateToZeroCapacity(capacities, nodeLabel); + } } + + template = buildTemplate(capacities); } finally { writeLock.unlock(); } return template; } + private void updateToZeroCapacity(QueueCapacities capacities, String + nodeLabel) { + capacities.setCapacity(nodeLabel, 0.0f); + capacities.setMaximumCapacity(nodeLabel, leafQueueTemplateCapacities + .getMaximumCapacity(nodeLabel)); + } + + private void updateCapacityFromTemplate(QueueCapacities capacities, String + nodeLabel) { + capacities.setCapacity(nodeLabel, leafQueueTemplateCapacities + .getCapacity(nodeLabel)); + capacities.setMaximumCapacity(nodeLabel, leafQueueTemplateCapacities + .getMaximumCapacity(nodeLabel)); + } + @VisibleForTesting - LeafQueueState getLeafQueueState(LeafQueue queue) + LeafQueueStatePerPartition getLeafQueueState(LeafQueue queue, String partition) throws SchedulerDynamicEditException { try { readLock.lock(); String queueName = queue.getQueueName(); - if (!containsLeafQueue(queueName)) { + if (!leafQueueState.containsLeafQueue(queueName, partition)) { throw new SchedulerDynamicEditException( "Could not find leaf queue in " + "state " + queueName); } else{ - return leafQueueStateMap.get(queueName); + return leafQueueState. 
+ getLeafQueueStatePerPartition(queueName, partition); } } finally { readLock.unlock(); @@ -715,8 +835,8 @@ LeafQueueState getLeafQueueState(LeafQueue queue) } @VisibleForTesting - public float getAbsoluteActivatedChildQueueCapacity() { - return parentQueueState.getAbsoluteActivatedChildQueueCapacity(); + public float getAbsoluteActivatedChildQueueCapacity(String nodeLabel) { + return parentQueueState.getAbsoluteActivatedChildQueueCapacity(nodeLabel); } private List getSortedPendingApplications() { @@ -726,20 +846,11 @@ public float getAbsoluteActivatedChildQueueCapacity() { return apps; } - private AutoCreatedLeafQueueConfig buildTemplate(float capacity, - float maxCapacity) { - AutoCreatedLeafQueueConfig.Builder templateBuilder = - new AutoCreatedLeafQueueConfig.Builder(); - - QueueCapacities capacities = new QueueCapacities(false); + private AutoCreatedLeafQueueConfig buildTemplate( + QueueCapacities capacities) { + AutoCreatedLeafQueueConfig.Builder templateBuilder = new + AutoCreatedLeafQueueConfig.Builder(); templateBuilder.capacities(capacities); - - for (String nodeLabel : managedParentQueue.getQueueCapacities() - .getExistingNodeLabels()) { - capacities.setCapacity(nodeLabel, capacity); - capacities.setMaximumCapacity(nodeLabel, maxCapacity); - } - return new AutoCreatedLeafQueueConfig(templateBuilder); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoCreatedQueueBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoCreatedQueueBase.java index d8d71c71ea3..46011a3bc33 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoCreatedQueueBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoCreatedQueueBase.java @@ -83,6 +83,8 @@ .capacity.CapacitySchedulerConfiguration.DOT; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler .capacity.CapacitySchedulerConfiguration.FAIR_APP_ORDERING_POLICY; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler + .capacity.CapacitySchedulerConfiguration.ROOT; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -93,7 +95,7 @@ private static final Log LOG = LogFactory.getLog( TestCapacitySchedulerAutoCreatedQueueBase.class); public static final int GB = 1024; - public final static ContainerUpdates NULL_UPDATE_REQUESTS = + public static final ContainerUpdates NULL_UPDATE_REQUESTS = new ContainerUpdates(); public static final String A = CapacitySchedulerConfiguration.ROOT + ".a"; @@ -106,9 +108,6 @@ public static final String B1 = B + ".b1"; public static final String B2 = B + ".b2"; public static final String B3 = B + ".b3"; - public static final String C1 = C + ".c1"; - public static final String C2 = C + ".c2"; - public static final String C3 = C + ".c3"; public static final float A_CAPACITY = 20f; public static final float B_CAPACITY = 40f; public static final float C_CAPACITY = 20f; @@ -118,8 +117,6 @@ public static final float B1_CAPACITY = 60f; public static final float B2_CAPACITY = 20f; public static final float B3_CAPACITY = 20f; - public static final float C1_CAPACITY = 20f; - public static final float C2_CAPACITY = 20f; public static final int NODE_MEMORY = 16; @@ -139,12 +136,16 @@ public 
static final String NODEL_LABEL_GPU = "GPU"; public static final String NODEL_LABEL_SSD = "SSD"; + protected final float NODE_LABEL_GPU_TEMPLATE_CAPACITY = 30.0f; + protected final float NODE_LABEL_GPU_TEMPLATE_MAX_CAPACITY = 100.0f; + protected final float NODEL_LABEL_SSD_TEMPLATE_CAPACITY = 40.0f; + protected final float NODE_LABEL_SSD_TEMPLATE_MAX_CAPACITY = 100.0f; + protected MockRM mockRM = null; protected MockNM nm1 = null; protected MockNM nm2 = null; protected MockNM nm3 = null; protected CapacityScheduler cs; - private final TestCapacityScheduler tcs = new TestCapacityScheduler(); protected SpyDispatcher dispatcher; private static EventHandler rmAppEventEventHandler; @@ -258,19 +259,23 @@ public static CapacitySchedulerConfiguration setupQueueMappings( /** * @param conf, to be modified - * @return, CS configuration which has C as an auto creation enabled parent - * queue - *

- * root / \ \ \ a b c d / \ / | \ a1 a2 b1 - * b2 b3 + * @return, CS configuration which has C + * as an auto creation enabled parent queue + *

+ * root + * / \ \ \ + * a b c d + * / \ / | \ + * a1 a2 b1 b2 b3 */ - public static CapacitySchedulerConfiguration setupQueueConfiguration( + + protected CapacitySchedulerConfiguration setupQueueConfiguration( CapacitySchedulerConfiguration conf) { //setup new queues with one of them auto enabled // Define top-level queues // Set childQueue for root - conf.setQueues(CapacitySchedulerConfiguration.ROOT, + conf.setQueues(ROOT, new String[] { "a", "b", "c", "d" }); conf.setCapacity(A, A_CAPACITY); @@ -302,6 +307,14 @@ public static CapacitySchedulerConfiguration setupQueueConfiguration( conf.setAutoCreatedLeafQueueConfigUserLimit(C, 100); conf.setAutoCreatedLeafQueueConfigUserLimitFactor(C, 3.0f); + conf.setAutoCreatedLeafQueueTemplateCapacityByLabel(C, NODEL_LABEL_GPU, + NODE_LABEL_GPU_TEMPLATE_CAPACITY); + conf.setAutoCreatedLeafQueueTemplateMaxCapacity(C, NODEL_LABEL_GPU, 100.0f); + conf.setAutoCreatedLeafQueueTemplateCapacityByLabel(C, NODEL_LABEL_SSD, + NODEL_LABEL_SSD_TEMPLATE_CAPACITY); + conf.setAutoCreatedLeafQueueTemplateMaxCapacity(C, NODEL_LABEL_SSD, + 100.0f); + LOG.info("Setup " + C + " as an auto leaf creation enabled parent queue"); conf.setUserLimitFactor(D, 1.0f); @@ -326,8 +339,13 @@ public static CapacitySchedulerConfiguration setupQueueConfiguration( accessibleNodeLabelsOnC.add(NO_LABEL); conf.setAccessibleNodeLabels(C, accessibleNodeLabelsOnC); - conf.setCapacityByLabel(C, NODEL_LABEL_GPU, 50); - conf.setCapacityByLabel(C, NODEL_LABEL_SSD, 50); + conf.setAccessibleNodeLabels(ROOT, accessibleNodeLabelsOnC); + conf.setCapacityByLabel(ROOT, NODEL_LABEL_GPU, 100f); + conf.setCapacityByLabel(ROOT, NODEL_LABEL_SSD, 100f); + + conf.setAccessibleNodeLabels(C, accessibleNodeLabelsOnC); + conf.setCapacityByLabel(C, NODEL_LABEL_GPU, 100f); + conf.setCapacityByLabel(C, NODEL_LABEL_SSD, 100f); LOG.info("Setup " + D + " as an auto leaf creation enabled parent queue"); @@ -504,17 +522,19 @@ protected void validateUserAndAppLimits( 
autoCreatedLeafQueue.getMaxApplicationsPerUser()); } - protected void validateInitialQueueEntitlement(CSQueue parentQueue, - String leafQueueName, float expectedTotalChildQueueAbsCapacity) - throws SchedulerDynamicEditException { + protected void validateInitialQueueEntitlement(CSQueue parentQueue, String + leafQueueName, Map + expectedTotalChildQueueAbsCapacityByLabel) + throws SchedulerDynamicEditException, InterruptedException { validateInitialQueueEntitlement(cs, parentQueue, leafQueueName, - expectedTotalChildQueueAbsCapacity); + expectedTotalChildQueueAbsCapacityByLabel); } protected void validateInitialQueueEntitlement( CapacityScheduler capacityScheduler, CSQueue parentQueue, - String leafQueueName, float expectedTotalChildQueueAbsCapacity) - throws SchedulerDynamicEditException { + String leafQueueName, + Map expectedTotalChildQueueAbsCapacityByLabel) + throws SchedulerDynamicEditException, InterruptedException { ManagedParentQueue autoCreateEnabledParentQueue = (ManagedParentQueue) parentQueue; @@ -522,11 +542,7 @@ protected void validateInitialQueueEntitlement( (GuaranteedOrZeroCapacityOverTimePolicy) autoCreateEnabledParentQueue .getAutoCreatedQueueManagementPolicy(); - assertEquals(expectedTotalChildQueueAbsCapacity, - policy.getAbsoluteActivatedChildQueueCapacity(), EPSILON); - - AutoCreatedLeafQueue leafQueue = - (AutoCreatedLeafQueue) capacityScheduler.getQueue(leafQueueName); + AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) capacityScheduler.getQueue(leafQueueName); Map expectedEntitlements = new HashMap<>(); QueueCapacities cap = autoCreateEnabledParentQueue.getLeafQueueTemplate() @@ -534,6 +550,10 @@ protected void validateInitialQueueEntitlement( for (String label : accessibleNodeLabelsOnC) { validateCapacitiesByLabel(autoCreateEnabledParentQueue, leafQueue, label); + assertEquals(true, policy.isActive(leafQueue, label)); + + assertEquals(expectedTotalChildQueueAbsCapacityByLabel.get(label), + 
policy.getAbsoluteActivatedChildQueueCapacity(label), EPSILON); QueueEntitlement expectedEntitlement = new QueueEntitlement( cap.getCapacity(label), cap.getMaximumCapacity(label)); @@ -542,21 +562,19 @@ protected void validateInitialQueueEntitlement( validateEffectiveMinResource(leafQueue, label, expectedEntitlements); } - - assertEquals(true, policy.isActive(leafQueue)); } - protected void validateCapacitiesByLabel( - ManagedParentQueue autoCreateEnabledParentQueue, - AutoCreatedLeafQueue leafQueue, String label) { - assertEquals( - autoCreateEnabledParentQueue.getLeafQueueTemplate().getQueueCapacities() - .getCapacity(), leafQueue.getQueueCapacities().getCapacity(label), - EPSILON); - assertEquals( - autoCreateEnabledParentQueue.getLeafQueueTemplate().getQueueCapacities() - .getMaximumCapacity(), - leafQueue.getQueueCapacities().getMaximumCapacity(label), EPSILON); + protected void validateCapacitiesByLabel(ManagedParentQueue + autoCreateEnabledParentQueue, AutoCreatedLeafQueue leafQueue, String + label) throws InterruptedException { + assertEquals(autoCreateEnabledParentQueue.getLeafQueueTemplate() + .getQueueCapacities().getCapacity(label), + leafQueue.getQueueCapacities() + .getCapacity(label), EPSILON); + assertEquals(autoCreateEnabledParentQueue.getLeafQueueTemplate() + .getQueueCapacities().getMaximumCapacity(label), + leafQueue.getQueueCapacities() + .getMaximumCapacity(label), EPSILON); } protected void validateEffectiveMinResource(CSQueue leafQueue, @@ -582,8 +600,10 @@ protected void validateEffectiveMinResource(CSQueue leafQueue, } protected void validateActivatedQueueEntitlement(CSQueue parentQueue, - String leafQueueName, float expectedTotalChildQueueAbsCapacity, - List queueManagementChanges) + String leafQueueName, Map + expectedTotalChildQueueAbsCapacity, + List queueManagementChanges, Set + expectedNodeLabels) throws SchedulerDynamicEditException { ManagedParentQueue autoCreateEnabledParentQueue = (ManagedParentQueue) parentQueue; @@ -594,67 
+614,84 @@ protected void validateActivatedQueueEntitlement(CSQueue parentQueue, QueueCapacities cap = autoCreateEnabledParentQueue.getLeafQueueTemplate() .getQueueCapacities(); - QueueEntitlement expectedEntitlement = new QueueEntitlement( - cap.getCapacity(), cap.getMaximumCapacity()); - //validate capacity - validateQueueEntitlements(leafQueueName, expectedEntitlement, - queueManagementChanges); + AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) + cs.getQueue(leafQueueName); - //validate parent queue state - assertEquals(expectedTotalChildQueueAbsCapacity, - policy.getAbsoluteActivatedChildQueueCapacity(), EPSILON); + Map expectedEntitlements = new HashMap<>(); - AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) cs.getQueue( - leafQueueName); + for (String label : expectedNodeLabels) { + //validate leaf queue state + assertEquals(true, policy.isActive(leafQueue, label)); - //validate leaf queue state - assertEquals(true, policy.isActive(leafQueue)); + QueueEntitlement expectedEntitlement = new QueueEntitlement( + cap.getCapacity(label), cap.getMaximumCapacity(label)); + + //validate parent queue state + assertEquals(expectedTotalChildQueueAbsCapacity.get(label), + policy.getAbsoluteActivatedChildQueueCapacity(label), EPSILON); + + expectedEntitlements.put(label, expectedEntitlement); + } + + //validate capacity + validateQueueEntitlements(leafQueueName, expectedEntitlements, + queueManagementChanges, expectedNodeLabels); } protected void validateDeactivatedQueueEntitlement(CSQueue parentQueue, - String leafQueueName, float expectedTotalChildQueueAbsCapacity, - List queueManagementChanges) + String leafQueueName, Map + expectedTotalChildQueueAbsCapacity, + List + queueManagementChanges) throws SchedulerDynamicEditException { - QueueEntitlement expectedEntitlement = new QueueEntitlement(0.0f, 1.0f); + QueueEntitlement expectedEntitlement = + new QueueEntitlement(0.0f, 1.0f); ManagedParentQueue autoCreateEnabledParentQueue = (ManagedParentQueue) 
parentQueue; - AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) cs.getQueue( - leafQueueName); + AutoCreatedLeafQueue leafQueue = + (AutoCreatedLeafQueue) cs.getQueue(leafQueueName); GuaranteedOrZeroCapacityOverTimePolicy policy = (GuaranteedOrZeroCapacityOverTimePolicy) autoCreateEnabledParentQueue .getAutoCreatedQueueManagementPolicy(); - //validate parent queue state - assertEquals(expectedTotalChildQueueAbsCapacity, - policy.getAbsoluteActivatedChildQueueCapacity(), EPSILON); + Map expectedEntitlements = new HashMap<>(); + + for (String label : accessibleNodeLabelsOnC) { + //validate parent queue state + LOG.info("Validating label " + label); + assertEquals(expectedTotalChildQueueAbsCapacity.get(label), policy + .getAbsoluteActivatedChildQueueCapacity(label), EPSILON); - //validate leaf queue state - assertEquals(false, policy.isActive(leafQueue)); + //validate leaf queue state + assertEquals(false, policy.isActive(leafQueue, label)); + expectedEntitlements.put(label, expectedEntitlement); + } //validate capacity - validateQueueEntitlements(leafQueueName, expectedEntitlement, - queueManagementChanges); + validateQueueEntitlements(leafQueueName, expectedEntitlements, + queueManagementChanges, accessibleNodeLabelsOnC); } - private void validateQueueEntitlements(String leafQueueName, - QueueEntitlement expectedEntitlement, - List queueEntitlementChanges) { + void validateQueueEntitlements(String leafQueueName, + Map expectedEntitlements, + List + queueEntitlementChanges, Set expectedNodeLabels) { AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) cs.getQueue( leafQueueName); - validateQueueEntitlementChangesForLeafQueue(leafQueue, expectedEntitlement, - queueEntitlementChanges); + validateQueueEntitlementChanges(leafQueue, expectedEntitlements, + queueEntitlementChanges, expectedNodeLabels); } - private void validateQueueEntitlementChangesForLeafQueue(CSQueue leafQueue, - QueueEntitlement expectedQueueEntitlement, - final List queueEntitlementChanges) 
{ + private void validateQueueEntitlementChanges(AutoCreatedLeafQueue leafQueue, + Map expectedQueueEntitlements, + final List queueEntitlementChanges, Set + expectedNodeLabels) { boolean found = false; - Map expectedQueueEntitlements = new HashMap<>(); for (QueueManagementChange entitlementChange : queueEntitlementChanges) { if (leafQueue.getQueueName().equals( entitlementChange.getQueue().getQueueName())) { @@ -662,13 +699,12 @@ private void validateQueueEntitlementChangesForLeafQueue(CSQueue leafQueue, AutoCreatedLeafQueueConfig updatedQueueTemplate = entitlementChange.getUpdatedQueueTemplate(); - for (String label : accessibleNodeLabelsOnC) { + for (String label : expectedNodeLabels) { QueueEntitlement newEntitlement = new QueueEntitlement( updatedQueueTemplate.getQueueCapacities().getCapacity(label), - updatedQueueTemplate.getQueueCapacities() - .getMaximumCapacity(label)); - assertEquals(expectedQueueEntitlement, newEntitlement); - expectedQueueEntitlements.put(label, expectedQueueEntitlement); + updatedQueueTemplate.getQueueCapacities().getMaximumCapacity + (label)); + assertEquals(expectedQueueEntitlements.get(label), newEntitlement); validateEffectiveMinResource(leafQueue, label, expectedQueueEntitlements); } @@ -677,9 +713,20 @@ private void validateQueueEntitlementChangesForLeafQueue(CSQueue leafQueue, } } if (!found) { - fail("Could not find the specified leaf queue in entitlement changes : " - + leafQueue.getQueueName()); + fail( + "Could not find the specified leaf queue in entitlement changes : " + + leafQueue.getQueueName()); } } + protected Map populateExpectedAbsCapacityByLabelForParentQueue + (int numLeafQueues) { + Map expectedChildQueueAbsCapacity = new HashMap<>(); + expectedChildQueueAbsCapacity.put(NODEL_LABEL_GPU, + NODE_LABEL_GPU_TEMPLATE_CAPACITY/100 * numLeafQueues); + expectedChildQueueAbsCapacity.put(NODEL_LABEL_SSD, + NODEL_LABEL_SSD_TEMPLATE_CAPACITY/100 * numLeafQueues); + expectedChildQueueAbsCapacity.put(NO_LABEL, 0.1f * 
numLeafQueues); + return expectedChildQueueAbsCapacity; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java index 01d5e6c38f2..750739cc66f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java @@ -71,12 +71,17 @@ import java.io.IOException; import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager + .NO_LABEL; +import static org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.CURRENT_USER_MAPPING; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueueUtils.EPSILON; + -import static org.apache.hadoop.yarn.server.resourcemanager.placement - .UserGroupMappingPlacementRule.CURRENT_USER_MAPPING; -import static org.apache.hadoop.yarn.server.resourcemanager.scheduler - .capacity.CSQueueUtils.EPSILON; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -86,7 +91,7 @@ /** * Tests for creation and reinitialization of auto created leaf queues - * under a ManagedParentQueue. + * and capacity management under a ManagedParentQueue. 
*/ public class TestCapacitySchedulerAutoQueueCreation extends TestCapacitySchedulerAutoCreatedQueueBase { @@ -101,7 +106,8 @@ 4); - @Test(timeout = 10000) +// @Test(timeout = 10000) + @Test public void testAutoCreateLeafQueueCreation() throws Exception { try { @@ -118,7 +124,12 @@ public void testAutoCreateLeafQueueCreation() throws Exception { ManagedParentQueue parentQueue = (ManagedParentQueue) cs.getQueue( PARENT_QUEUE); assertEquals(parentQueue, autoCreatedLeafQueue.getParent()); - validateInitialQueueEntitlement(parentQueue, USER0, 0.1f); + + Map expectedChildQueueAbsCapacity = + populateExpectedAbsCapacityByLabelForParentQueue(1); + validateInitialQueueEntitlement(parentQueue, USER0, + expectedChildQueueAbsCapacity); + validateUserAndAppLimits(autoCreatedLeafQueue, 1000, 1000); assertTrue(autoCreatedLeafQueue @@ -156,10 +167,17 @@ public void testReinitializeStoppedAutoCreatedLeafQueue() throws Exception { USER0); ManagedParentQueue parentQueue = (ManagedParentQueue) cs.getQueue( PARENT_QUEUE); + assertEquals(parentQueue, user0Queue.getParent()); assertEquals(parentQueue, user1Queue.getParent()); - validateInitialQueueEntitlement(parentQueue, USER0, 0.2f); - validateInitialQueueEntitlement(parentQueue, USER1, 0.2f); + + Map + expectedAbsChildQueueCapacity = + populateExpectedAbsCapacityByLabelForParentQueue(2); + validateInitialQueueEntitlement(parentQueue, USER0, + expectedAbsChildQueueCapacity); + validateInitialQueueEntitlement(parentQueue, USER1, + expectedAbsChildQueueCapacity); ApplicationAttemptId appAttemptId = appsInC.get(0); @@ -167,7 +185,8 @@ public void testReinitializeStoppedAutoCreatedLeafQueue() throws Exception { RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory( null); ResourceRequest r1 = TestUtils.createResourceRequest(ResourceRequest.ANY, - 1 * GB, 1, true, priority, recordFactory); + 1 * GB, 1, true, priority, + recordFactory); cs.allocate(appAttemptId, Collections.singletonList(r1), null, Collections.emptyList(), 
Collections.singletonList(host), @@ -197,11 +216,6 @@ public void testReinitializeStoppedAutoCreatedLeafQueue() throws Exception { validateCapacities(user0QueueReinited, 0.0f, 0.0f, 1.0f, 1.0f); - AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) cs.getQueue( - USER1); - validateInitialQueueEntitlement(parentQueue, leafQueue.getQueueName(), - 0.1f); - } finally { cleanupQueue(USER0); } @@ -481,52 +495,77 @@ public void testAutoCreatedQueueActivationDeactivation() throws Exception { CSQueue parentQueue = cs.getQueue(PARENT_QUEUE); //submit app1 as USER1 - submitApp(mockRM, parentQueue, USER1, USER1, 1, 1); - validateInitialQueueEntitlement(parentQueue, USER1, 0.1f); + ApplicationId user1AppId = submitApp(mockRM, parentQueue, USER1, USER1, + 1, 1); + Map expectedAbsChildQueueCapacity = + populateExpectedAbsCapacityByLabelForParentQueue(1); + validateInitialQueueEntitlement(parentQueue, USER1, + expectedAbsChildQueueCapacity); //submit another app2 as USER2 ApplicationId user2AppId = submitApp(mockRM, parentQueue, USER2, USER2, 2, 1); - validateInitialQueueEntitlement(parentQueue, USER2, 0.2f); + + expectedAbsChildQueueCapacity = + populateExpectedAbsCapacityByLabelForParentQueue(2); + validateInitialQueueEntitlement(parentQueue, USER2, + expectedAbsChildQueueCapacity); //submit another app3 as USER1 submitApp(mockRM, parentQueue, USER1, USER1, 3, 2); //validate total activated abs capacity remains the same GuaranteedOrZeroCapacityOverTimePolicy autoCreatedQueueManagementPolicy = - (GuaranteedOrZeroCapacityOverTimePolicy) ((ManagedParentQueue) - parentQueue) + (GuaranteedOrZeroCapacityOverTimePolicy) ((ManagedParentQueue) parentQueue) .getAutoCreatedQueueManagementPolicy(); - assertEquals(autoCreatedQueueManagementPolicy - .getAbsoluteActivatedChildQueueCapacity(), 0.2f, EPSILON); - //submit user_3 app. 
This cant be scheduled since there is no capacity + for (String nodeLabel : accessibleNodeLabelsOnC) { + assertEquals(expectedAbsChildQueueCapacity.get(nodeLabel), + autoCreatedQueueManagementPolicy.getAbsoluteActivatedChildQueueCapacity(nodeLabel), EPSILON); + } + + //submit user_3 app. This cant be allocated since there is no capacity + // in NO_LABEL, SSD but can be in GPU label submitApp(mockRM, parentQueue, USER3, USER3, 4, 1); final CSQueue user3LeafQueue = cs.getQueue(USER3); validateCapacities((AutoCreatedLeafQueue) user3LeafQueue, 0.0f, 0.0f, 1.0f, 1.0f); - assertEquals(autoCreatedQueueManagementPolicy - .getAbsoluteActivatedChildQueueCapacity(), 0.2f, EPSILON); + assertEquals(0.2f, autoCreatedQueueManagementPolicy + .getAbsoluteActivatedChildQueueCapacity(NO_LABEL), EPSILON); - //deactivate USER2 queue + assertEquals(0.9f, autoCreatedQueueManagementPolicy.getAbsoluteActivatedChildQueueCapacity(NODEL_LABEL_GPU), + EPSILON); + + //Verify that AMs can be allocated + //Node 3 has NO_LABEL. So launch here + mockRM.launchAM(mockRM.getRMContext().getRMApps().get(user1AppId), + mockRM, nm3); + mockRM.launchAM(mockRM.getRMContext().getRMApps().get(user2AppId), + mockRM, nm3); + +// //deactivate USER2 queue cs.killAllAppsInQueue(USER2); mockRM.waitForState(user2AppId, RMAppState.KILLED); - //Verify if USER_2 can be deactivated since it has no pending appsA + //Verify if USER_2 can be deactivated since it has no pending apps List queueManagementChanges = autoCreatedQueueManagementPolicy.computeQueueManagementChanges(); ManagedParentQueue managedParentQueue = (ManagedParentQueue) parentQueue; - managedParentQueue.validateAndApplyQueueManagementChanges( - queueManagementChanges); + managedParentQueue. 
+ validateAndApplyQueueManagementChanges(queueManagementChanges); + + validateDeactivatedQueueEntitlement(parentQueue, USER2, + expectedAbsChildQueueCapacity, queueManagementChanges); - validateDeactivatedQueueEntitlement(parentQueue, USER2, 0.2f, - queueManagementChanges); + //USER_3 should now get activated for SSD, NO_LABEL + Set expectedNodeLabelsUpdated = new HashSet<>(); + expectedNodeLabelsUpdated.add(NO_LABEL); + expectedNodeLabelsUpdated.add(NODEL_LABEL_SSD); - //USER_3 should now get activated - validateActivatedQueueEntitlement(parentQueue, USER3, 0.2f, - queueManagementChanges); + validateActivatedQueueEntitlement(parentQueue, USER3, + expectedAbsChildQueueCapacity , queueManagementChanges, expectedNodeLabelsUpdated); } finally { cleanupQueue(USER1); @@ -548,13 +587,18 @@ public void testClusterResourceUpdationOnAutoCreatedLeafQueues() throws //submit app1 as USER1 submitApp(newMockRM, parentQueue, USER1, USER1, 1, 1); - validateInitialQueueEntitlement(newCS, parentQueue, USER1, 0.1f); - CSQueue user1LeafQueue = newCS.getQueue(USER1); + Map expectedAbsChildQueueCapacity = + populateExpectedAbsCapacityByLabelForParentQueue(1); + validateInitialQueueEntitlement(newCS, parentQueue, USER1, + expectedAbsChildQueueCapacity); //submit another app2 as USER2 - submitApp(newMockRM, parentQueue, USER2, USER2, 2, 1); - validateInitialQueueEntitlement(newCS, parentQueue, USER2, 0.2f); - CSQueue user2LeafQueue = newCS.getQueue(USER2); + ApplicationId user2AppId = submitApp(newMockRM, parentQueue, USER2, USER2, 2, + 1); + expectedAbsChildQueueCapacity = + populateExpectedAbsCapacityByLabelForParentQueue(2); + validateInitialQueueEntitlement(newCS, parentQueue, USER2, + expectedAbsChildQueueCapacity); //validate total activated abs capacity remains the same GuaranteedOrZeroCapacityOverTimePolicy autoCreatedQueueManagementPolicy = @@ -562,7 +606,7 @@ public void testClusterResourceUpdationOnAutoCreatedLeafQueues() throws parentQueue) 
.getAutoCreatedQueueManagementPolicy(); assertEquals(autoCreatedQueueManagementPolicy - .getAbsoluteActivatedChildQueueCapacity(), 0.2f, EPSILON); + .getAbsoluteActivatedChildQueueCapacity(NO_LABEL), 0.2f, EPSILON); //submit user_3 app. This cant be scheduled since there is no capacity submitApp(newMockRM, parentQueue, USER3, USER3, 3, 1); @@ -571,7 +615,7 @@ public void testClusterResourceUpdationOnAutoCreatedLeafQueues() throws 1.0f, 1.0f); assertEquals(autoCreatedQueueManagementPolicy - .getAbsoluteActivatedChildQueueCapacity(), 0.2f, EPSILON); + .getAbsoluteActivatedChildQueueCapacity(NO_LABEL), 0.2f, EPSILON); // add new NM. newMockRM.registerNode("127.0.0.3:1234", 125 * GB, 20); @@ -579,31 +623,33 @@ public void testClusterResourceUpdationOnAutoCreatedLeafQueues() throws // There will be change in effective resource when nodes are added // since we deal with percentages - Resource MAX_RES = Resources.addTo(TEMPLATE_MAX_RES, - Resources.createResource(125 * GB, 20)); + Resource MAX_RES = Resources.addTo(TEMPLATE_MAX_RES, Resources.createResource(125 * + GB, 20)); Resource MIN_RES = Resources.createResource(14438, 6); Assert.assertEquals("Effective Min resource for USER3 is not correct", - Resources.none(), - user3LeafQueue.getQueueResourceQuotas().getEffectiveMinResource()); + Resources.none(), user3LeafQueue.getQueueResourceQuotas() + .getEffectiveMinResource()); Assert.assertEquals("Effective Max resource for USER3 is not correct", - MAX_RES, - user3LeafQueue.getQueueResourceQuotas().getEffectiveMaxResource()); + MAX_RES, user3LeafQueue + .getQueueResourceQuotas() + .getEffectiveMaxResource()); + CSQueue user1LeafQueue = newCS.getQueue(USER1); + CSQueue user2LeafQueue = newCS.getQueue(USER2); Assert.assertEquals("Effective Min resource for USER2 is not correct", - MIN_RES, - user1LeafQueue.getQueueResourceQuotas().getEffectiveMinResource()); + MIN_RES, user1LeafQueue.getQueueResourceQuotas() + .getEffectiveMinResource()); Assert.assertEquals("Effective Max 
resource for USER2 is not correct", - MAX_RES, - user1LeafQueue.getQueueResourceQuotas().getEffectiveMaxResource()); + MAX_RES, user1LeafQueue.getQueueResourceQuotas().getEffectiveMaxResource()); Assert.assertEquals("Effective Min resource for USER1 is not correct", - MIN_RES, - user2LeafQueue.getQueueResourceQuotas().getEffectiveMinResource()); + MIN_RES, user2LeafQueue.getQueueResourceQuotas() + .getEffectiveMinResource()); Assert.assertEquals("Effective Max resource for USER1 is not correct", - MAX_RES, - user2LeafQueue.getQueueResourceQuotas().getEffectiveMaxResource()); + MAX_RES, user2LeafQueue.getQueueResourceQuotas() + .getEffectiveMaxResource()); // unregister one NM. newMockRM.unRegisterNode(nm3); @@ -612,11 +658,11 @@ public void testClusterResourceUpdationOnAutoCreatedLeafQueues() throws // After loosing one NM, resources will reduce Assert.assertEquals("Effective Min resource for USER2 is not correct", - MIN_RES_UPDATED, - user1LeafQueue.getQueueResourceQuotas().getEffectiveMinResource()); + MIN_RES_UPDATED, user1LeafQueue.getQueueResourceQuotas().getEffectiveMinResource + ()); Assert.assertEquals("Effective Max resource for USER2 is not correct", - MAX_RES_UPDATED, - user2LeafQueue.getQueueResourceQuotas().getEffectiveMaxResource()); + MAX_RES_UPDATED, user2LeafQueue.getQueueResourceQuotas() + .getEffectiveMaxResource()); } finally { cleanupQueue(USER1); @@ -629,25 +675,6 @@ public void testClusterResourceUpdationOnAutoCreatedLeafQueues() throws } } - @Test - public void testAutoCreatedQueueInheritsNodeLabels() throws Exception { - - try { - String host = "127.0.0.1"; - RMNode node = MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1, - host); - cs.handle(new NodeAddedSchedulerEvent(node)); - - CSQueue parentQueue = cs.getQueue(PARENT_QUEUE); - - submitApp(USER1, USER1, NODEL_LABEL_GPU); - //submit app1 as USER1 - validateInitialQueueEntitlement(parentQueue, USER1, 0.1f); - } finally { - cleanupQueue(USER1); - } - } - @Test public void 
testReinitializeQueuesWithAutoCreatedLeafQueues() throws Exception { @@ -662,12 +689,17 @@ public void testReinitializeQueuesWithAutoCreatedLeafQueues() //submit app1 as USER1 submitApp(newMockRM, parentQueue, USER1, USER1, 1, 1); - validateInitialQueueEntitlement(newCS, parentQueue, USER1, 0.1f); + Map expectedChildQueueAbsCapacity = + populateExpectedAbsCapacityByLabelForParentQueue(1); + validateInitialQueueEntitlement(newCS, parentQueue, USER1, expectedChildQueueAbsCapacity); //submit another app2 as USER2 - ApplicationId user2AppId = submitApp(newMockRM, parentQueue, USER2, USER2, - 2, 1); - validateInitialQueueEntitlement(newCS, parentQueue, USER2, 0.2f); + ApplicationId user2AppId = submitApp(newMockRM, parentQueue, USER2, + USER2, 2, + 1); + expectedChildQueueAbsCapacity = + populateExpectedAbsCapacityByLabelForParentQueue(2); + validateInitialQueueEntitlement(newCS, parentQueue, USER2, expectedChildQueueAbsCapacity); //update parent queue capacity conf.setCapacity(C, 30f); @@ -692,19 +724,24 @@ public void testReinitializeQueuesWithAutoCreatedLeafQueues() //submit app1 as USER3 submitApp(newMockRM, parentQueue, USER3, USER3, 3, 1); - validateInitialQueueEntitlement(newCS, parentQueue, USER3, 0.27f); - AutoCreatedLeafQueue user3Queue = (AutoCreatedLeafQueue) newCS.getQueue( - USER1); - validateCapacities(user3Queue, 0.3f, 0.09f, 0.4f, 0.2f); + AutoCreatedLeafQueue user3Queue = + (AutoCreatedLeafQueue) newCS.getQueue(USER1); + validateCapacities(user3Queue, 0.3f, 0.09f, 0.4f,0.2f); validateUserAndAppLimits(user3Queue, 900, 900); //submit app1 as USER1 - is already activated. 
there should be no diff // in capacities submitApp(newMockRM, parentQueue, USER3, USER3, 4, 2); - validateInitialQueueEntitlement(newCS, parentQueue, USER3, 0.27f); - validateCapacities(user3Queue, 0.3f, 0.09f, 0.4f, 0.2f); + validateCapacities(user3Queue, 0.3f, 0.09f, 0.4f,0.2f); validateUserAndAppLimits(user3Queue, 900, 900); + GuaranteedOrZeroCapacityOverTimePolicy autoCreatedQueueManagementPolicy = + (GuaranteedOrZeroCapacityOverTimePolicy) ((ManagedParentQueue) + parentQueue) + .getAutoCreatedQueueManagementPolicy(); + assertEquals(0.27f, autoCreatedQueueManagementPolicy + .getAbsoluteActivatedChildQueueCapacity + (NO_LABEL), EPSILON); } finally { cleanupQueue(USER1); cleanupQueue(USER2); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueManagementDynamicEditPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueManagementDynamicEditPolicy.java index 4dc56fb314b..113f0a7b184 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueManagementDynamicEditPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueManagementDynamicEditPolicy.java @@ -24,7 +24,10 @@ import org.junit.Before; import org.junit.Test; +import java.util.Map; +import static org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager + .NO_LABEL; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler .capacity.CSQueueUtils.EPSILON; import static org.junit.Assert.assertEquals; @@ -54,21 +57,27 @@ public void testEditSchedule() throws Exception { 
parentQueue) .getAutoCreatedQueueManagementPolicy(); assertEquals(0f, autoCreatedQueueManagementPolicy - .getAbsoluteActivatedChildQueueCapacity(), EPSILON); + .getAbsoluteActivatedChildQueueCapacity(NO_LABEL), EPSILON); //submit app1 as USER1 ApplicationId user1AppId = submitApp(mockRM, parentQueue, USER1, USER1, 1, 1); - validateInitialQueueEntitlement(parentQueue, USER1, 0.1f); + Map expectedAbsChildQueueCapacity = + populateExpectedAbsCapacityByLabelForParentQueue(1); + validateInitialQueueEntitlement(parentQueue, USER1, + expectedAbsChildQueueCapacity); //submit another app2 as USER2 ApplicationId user2AppId = submitApp(mockRM, parentQueue, USER2, USER2, 2, 1); - validateInitialQueueEntitlement(parentQueue, USER2, 0.2f); + expectedAbsChildQueueCapacity = + populateExpectedAbsCapacityByLabelForParentQueue(2); + validateInitialQueueEntitlement(parentQueue, USER2, + expectedAbsChildQueueCapacity); //validate total activated abs capacity assertEquals(0.2f, autoCreatedQueueManagementPolicy - .getAbsoluteActivatedChildQueueCapacity(), EPSILON); + .getAbsoluteActivatedChildQueueCapacity(NO_LABEL), EPSILON); //submit user_3 app. 
This cant be scheduled since there is no capacity submitApp(mockRM, parentQueue, USER3, USER3, 3, 1); @@ -77,7 +86,7 @@ public void testEditSchedule() throws Exception { 1.0f, 1.0f); assertEquals(autoCreatedQueueManagementPolicy - .getAbsoluteActivatedChildQueueCapacity(), 0.2f, EPSILON); + .getAbsoluteActivatedChildQueueCapacity(NO_LABEL), 0.2f, EPSILON); //deactivate USER2 queue cs.killAllAppsInQueue(USER2); @@ -88,8 +97,8 @@ public void testEditSchedule() throws Exception { mockRM.waitForState(user1AppId, RMAppState.KILLED); policy.editSchedule(); - - waitForPolicyState(0.1f, autoCreatedQueueManagementPolicy, 1000); + waitForPolicyState(0.1f, autoCreatedQueueManagementPolicy, NO_LABEL, + 1000); validateCapacities((AutoCreatedLeafQueue) user3LeafQueue, 0.5f, 0.1f, 1.0f, 1.0f); @@ -105,13 +114,12 @@ public void testEditSchedule() throws Exception { } private void waitForPolicyState(float expectedVal, - GuaranteedOrZeroCapacityOverTimePolicy queueManagementPolicy, int - timesec) throws - InterruptedException { + GuaranteedOrZeroCapacityOverTimePolicy queueManagementPolicy, String + nodeLabel, int timesec) throws InterruptedException { long start = System.currentTimeMillis(); while (System.currentTimeMillis() - start < timesec * 1000) { if (Float.compare(expectedVal, queueManagementPolicy - .getAbsoluteActivatedChildQueueCapacity()) != 0) { + .getAbsoluteActivatedChildQueueCapacity(nodeLabel)) != 0) { Thread.sleep(100); } else { break;