diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index e4c2665..b95925b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -39,7 +39,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; -import org.apache.hadoop.yarn.util.resource.Resources; + import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Sets; @@ -47,13 +47,6 @@ CSQueue parent; final String queueName; - float capacity; - float maximumCapacity; - float absoluteCapacity; - float absoluteMaxCapacity; - float absoluteUsedCapacity = 0.0f; - - float usedCapacity = 0.0f; volatile int numContainers; final Resource minimumAllocation; @@ -65,10 +58,6 @@ Set accessibleLabels; RMNodeLabelsManager labelManager; String defaultLabelExpression; - Map absoluteCapacityByNodeLabels; - Map capacitiyByNodeLabels; - Map absoluteMaxCapacityByNodeLabels; - Map maxCapacityByNodeLabels; Map acls = new HashMap(); @@ -78,6 +67,10 @@ // Track resource usage-by-label like used-resource/pending-resource, etc. ResourceUsage queueUsage; + // Track capacities like used-capcity/abs-used-capacity/capacity/abs-capacity, + // etc. + QueueCapacities queueCapacities; + private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); private CapacitySchedulerContext csContext; @@ -114,48 +107,58 @@ public AbstractCSQueue(CapacitySchedulerContext cs, && this.accessibleLabels.containsAll(parent.getAccessibleNodeLabels())) { this.defaultLabelExpression = parent.getDefaultNodeLabelExpression(); } - - // set capacity by labels - capacitiyByNodeLabels = - cs.getConfiguration().getNodeLabelCapacities(getQueuePath(), accessibleLabels, - labelManager); - - // set maximum capacity by labels - maxCapacityByNodeLabels = - cs.getConfiguration().getMaximumNodeLabelCapacities(getQueuePath(), - accessibleLabels, labelManager); + this.csContext = cs; + + // initialize ResourceUsage queueUsage = new ResourceUsage(); + + // initialize QueueCapacities from Conf + queueCapacities = new QueueCapacities(); + + initializeCapacitiesFromConf(); + + // compute abs-capacity / abs-maximum capacity + CSQueueUtils.setAbsoluteCapacitiesByNodeLabels(queueCapacities, parent); + + // Sanity check + CSQueueUtils.capacitiesSanityCheck(getQueuePath(), queueCapacities); + } + + protected void initializeCapacitiesFromConf() { + csContext.getConfiguration().setCapacitiesByLabels(getQueuePath(), + accessibleLabels, csContext.getRMContext().getNodeLabelManager(), + queueCapacities); } @Override public synchronized float getCapacity() { - return capacity; + return queueCapacities.getCapacity(); } @Override public synchronized float getAbsoluteCapacity() { - return absoluteCapacity; + return queueCapacities.getAbsoluteCapacity(); } @Override public float getAbsoluteMaximumCapacity() { - return absoluteMaxCapacity; + return queueCapacities.getAbsoluteMaximumCapacity(); } @Override public synchronized float getAbsoluteUsedCapacity() { - return absoluteUsedCapacity; + return queueCapacities.getAbsoluteUsedCapacity(); } @Override public float getMaximumCapacity() { - return maximumCapacity; + return queueCapacities.getMaximumCapacity(); } @Override public synchronized float getUsedCapacity() { - return usedCapacity; + return queueCapacities.getUsedCapacity(); } @Override @@ -213,12 +216,12 @@ public boolean hasAccess(QueueACL acl, UserGroupInformation user) { @Override public synchronized void setUsedCapacity(float usedCapacity) { - this.usedCapacity = usedCapacity; + queueCapacities.setUsedCapacity(usedCapacity); } @Override public synchronized void setAbsoluteUsedCapacity(float absUsedCapacity) { - this.absoluteUsedCapacity = absUsedCapacity; + queueCapacities.setAbsoluteUsedCapacity(absUsedCapacity); } /** @@ -227,21 +230,16 @@ public synchronized void setAbsoluteUsedCapacity(float absUsedCapacity) { */ synchronized void setMaxCapacity(float maximumCapacity) { // Sanity check - CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity); + CSQueueUtils.checkMaxCapacity(getQueueName(), + queueCapacities.getCapacity(), maximumCapacity); float absMaxCapacity = CSQueueUtils.computeAbsoluteMaximumCapacity(maximumCapacity, parent); - CSQueueUtils.checkAbsoluteCapacity(getQueueName(), absoluteCapacity, + CSQueueUtils.checkAbsoluteCapacity(getQueueName(), + queueCapacities.getAbsoluteCapacity(), absMaxCapacity); - this.maximumCapacity = maximumCapacity; - this.absoluteMaxCapacity = absMaxCapacity; - } - - @Override - public float getAbsActualCapacity() { - // for now, simply return actual capacity = guaranteed capacity for parent - // queue - return absoluteCapacity; + queueCapacities.setMaximumCapacity(maximumCapacity); + queueCapacities.setAbsoluteMaximumCapacity(absMaxCapacity); } @Override @@ -249,25 +247,11 @@ public String getDefaultNodeLabelExpression() { return defaultLabelExpression; } - synchronized void setupQueueConfigs(Resource clusterResource, float capacity, - float absoluteCapacity, float maximumCapacity, float absoluteMaxCapacity, + synchronized void setupQueueConfigs(Resource clusterResource, QueueState state, Map acls, Set labels, String defaultLabelExpression, - Map nodeLabelCapacities, - Map maximumNodeLabelCapacities, - boolean reservationContinueLooking, Resource maxAllocation) - throws IOException { - // Sanity check - CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity); - CSQueueUtils.checkAbsoluteCapacity(getQueueName(), absoluteCapacity, - absoluteMaxCapacity); - - this.capacity = capacity; - this.absoluteCapacity = absoluteCapacity; - - this.maximumCapacity = maximumCapacity; - this.absoluteMaxCapacity = absoluteMaxCapacity; - + boolean reservationContinueLooking, QueueCapacities newCapacities, + Resource maxAllocation) throws IOException { this.state = state; this.acls = acls; @@ -278,10 +262,11 @@ synchronized void setupQueueConfigs(Resource clusterResource, float capacity, // set label expression this.defaultLabelExpression = defaultLabelExpression; - // copy node label capacity - this.capacitiyByNodeLabels = new HashMap(nodeLabelCapacities); - this.maxCapacityByNodeLabels = - new HashMap(maximumNodeLabelCapacities); + // copy node label capacity, when newCapacities will be non-null only in + // reinitialize case. + if (null != newCapacities) { + queueCapacities.reinitialize(newCapacities); + } // Update metrics CSQueueUtils.updateQueueStatistics( @@ -308,21 +293,7 @@ synchronized void setupQueueConfigs(Resource clusterResource, float capacity, } } } - - // calculate absolute capacity by each node label - this.absoluteCapacityByNodeLabels = - CSQueueUtils.computeAbsoluteCapacityByNodeLabels( - this.capacitiyByNodeLabels, parent); - - // calculate maximum capacity by each node label - this.absoluteMaxCapacityByNodeLabels = - CSQueueUtils.computeAbsoluteMaxCapacityByNodeLabels( - maximumNodeLabelCapacities, parent); - - // check absoluteMaximumNodeLabelCapacities is valid - CSQueueUtils.checkAbsoluteCapacitiesByLabel(getQueueName(), - absoluteCapacityByNodeLabels, absoluteCapacityByNodeLabels); - + this.reservationsContinueLooking = reservationContinueLooking; this.preemptionDisabled = isQueueHierarchyPreemptionDisabled(this); @@ -334,8 +305,8 @@ protected QueueInfo getQueueInfo() { QueueInfo queueInfo = recordFactory.newRecordInstance(QueueInfo.class); queueInfo.setQueueName(queueName); queueInfo.setAccessibleNodeLabels(accessibleLabels); - queueInfo.setCapacity(capacity); - queueInfo.setMaximumCapacity(maximumCapacity); + queueInfo.setCapacity(queueCapacities.getCapacity()); + queueInfo.setMaximumCapacity(queueCapacities.getMaximumCapacity()); queueInfo.setQueueState(state); queueInfo.setDefaultNodeLabelExpression(defaultLabelExpression); queueInfo.setCurrentCapacity(getUsedCapacity()); @@ -394,11 +365,7 @@ public float getCapacityByNodeLabel(String label) { return getCapacity(); } - if (!capacitiyByNodeLabels.containsKey(label)) { - return 0f; - } else { - return capacitiyByNodeLabels.get(label); - } + return queueCapacities.getCapacity(label); } @Private @@ -410,24 +377,12 @@ public float getAbsoluteCapacityByNodeLabel(String label) { return getAbsoluteCapacity(); } - if (!absoluteCapacityByNodeLabels.containsKey(label)) { - return 0f; - } else { - return absoluteCapacityByNodeLabels.get(label); - } + return queueCapacities.getAbsoluteCapacity(label); } @Private public float getAbsoluteMaximumCapacityByNodeLabel(String label) { - if (StringUtils.equals(label, RMNodeLabelsManager.NO_LABEL)) { - return getAbsoluteMaximumCapacity(); - } - - if (!absoluteMaxCapacityByNodeLabels.containsKey(label)) { - return 0f; - } else { - return absoluteMaxCapacityByNodeLabels.get(label); - } + return queueCapacities.getAbsoluteMaximumCapacity(label); } @Private diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java index 46ee93c..951e6e6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java @@ -75,15 +75,6 @@ * @return configured queue capacity */ public float getCapacity(); - - /** - * Get actual capacity of the queue, this may be different from - * configured capacity when mis-config take place, like add labels to the - * cluster - * - * @return actual queue capacity - */ - public float getAbsActualCapacity(); /** * Get capacity of the parent of the queue as a function of the diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java index f458057..4bfa904 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; -import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; @@ -34,6 +33,9 @@ final static float EPSILON = 0.0001f; + /* + * Used only by tests + */ public static void checkMaxCapacity(String queueName, float capacity, float maximumCapacity) { if (maximumCapacity < 0.0f || maximumCapacity > 1.0f) { @@ -43,6 +45,9 @@ public static void checkMaxCapacity(String queueName, } } + /* + * Used only by tests + */ public static void checkAbsoluteCapacity(String queueName, float absCapacity, float absMaxCapacity) { if (absMaxCapacity < (absCapacity - EPSILON)) { @@ -53,19 +58,35 @@ public static void checkAbsoluteCapacity(String queueName, } } - public static void checkAbsoluteCapacitiesByLabel(String queueName, - Map absCapacities, - Map absMaximumCapacities) { - for (Entry entry : absCapacities.entrySet()) { - String label = entry.getKey(); - float absCapacity = entry.getValue(); - float absMaxCapacity = absMaximumCapacities.get(label); - if (absMaxCapacity < (absCapacity - EPSILON)) { - throw new IllegalArgumentException("Illegal call to setMaxCapacity. " - + "Queue '" + queueName + "' has " + "an absolute capacity (" - + absCapacity + ") greater than " - + "its absolute maximumCapacity (" + absMaxCapacity + ") of label=" - + label); + /** + * Check sanity of capacities: + * - capacity <= maxCapacity + * - absCapacity <= absMaximumCapacity + * @param queueName + * @param queueCapacities + */ + public static void capacitiesSanityCheck(String queueName, + QueueCapacities queueCapacities) { + for (String label : queueCapacities.getExistingNodeLabels()) { + float capacity = queueCapacities.getCapacity(label); + float maximumCapacity = queueCapacities.getMaximumCapacity(label); + if (capacity > maximumCapacity) { + throw new IllegalArgumentException("Illegal queue capacity setting, " + + "(capacity=" + capacity + ") > (maximum-capacity=" + + maximumCapacity + "). When label=[" + label + "]"); + } + + // Actually, this may not needed since we have verified capacity <= + // maximumCapacity. And the way we compute absolute capacity (abs(x) = + // cap(x) * cap(x.parent) * ...) is a monotone increasing function. But + // just keep it here to make sure our compute abs capacity method works + // correctly. + float absCapacity = queueCapacities.getAbsoluteCapacity(label); + float absMaxCapacity = queueCapacities.getAbsoluteMaximumCapacity(label); + if (absCapacity > absMaxCapacity) { + throw new IllegalArgumentException("Illegal queue capacity setting, " + + "(abs-capacity=" + absCapacity + ") > (abs-maximum-capacity=" + + absMaxCapacity + "). When label=[" + label + "]"); } } } @@ -77,37 +98,34 @@ public static float computeAbsoluteMaximumCapacity( return (parentAbsMaxCapacity * maximumCapacity); } - public static Map computeAbsoluteCapacityByNodeLabels( - Map nodeLabelToCapacities, CSQueue parent) { - if (parent == null) { - return nodeLabelToCapacities; - } + // Set absolute capacities for {capacity, maximum-capacity} + public static void setAbsoluteCapacitiesByNodeLabels( + QueueCapacities queueCapacities, CSQueue parent) { - Map absoluteCapacityByNodeLabels = - new HashMap(); - for (Entry entry : nodeLabelToCapacities.entrySet()) { - String label = entry.getKey(); - float capacity = entry.getValue(); - absoluteCapacityByNodeLabels.put(label, - capacity * parent.getAbsoluteCapacityByNodeLabel(label)); - } - return absoluteCapacityByNodeLabels; - } - - public static Map computeAbsoluteMaxCapacityByNodeLabels( - Map maximumNodeLabelToCapacities, CSQueue parent) { - if (parent == null) { - return maximumNodeLabelToCapacities; + QueueCapacities parentQueueCapacities = null; + if (parent != null) { + parentQueueCapacities = ((AbstractCSQueue) parent).queueCapacities; } - Map absoluteMaxCapacityByNodeLabels = - new HashMap(); - for (Entry entry : maximumNodeLabelToCapacities.entrySet()) { - String label = entry.getKey(); - float maxCapacity = entry.getValue(); - absoluteMaxCapacityByNodeLabels.put(label, - maxCapacity * parent.getAbsoluteMaximumCapacityByNodeLabel(label)); + + for (String label : queueCapacities.getExistingNodeLabels()) { + float capacity = queueCapacities.getCapacity(label); + if (capacity > 0f) { + queueCapacities.setAbsoluteCapacity( + label, + capacity + * (parentQueueCapacities == null ? 1 : parentQueueCapacities + .getAbsoluteCapacity(label))); + } + + float maxCapacity = queueCapacities.getMaximumCapacity(label); + if (maxCapacity > 0f) { + queueCapacities.setAbsoluteMaximumCapacity( + label, + maxCapacity + * (parentQueueCapacities == null ? 1 : parentQueueCapacities + .getAbsoluteMaximumCapacity(label))); + } } - return absoluteMaxCapacityByNodeLabels; } @Lock(CSQueue.class) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 268cc6c..dafcb2f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -276,6 +276,9 @@ static String getQueuePrefix(String queue) { } private String getNodeLabelPrefix(String queue, String label) { + if (label.equals(CommonNodeLabelsManager.NO_LABEL)) { + return getQueuePrefix(queue); + } return getQueuePrefix(queue) + ACCESSIBLE_NODE_LABELS + DOT + label + DOT; } @@ -440,57 +443,53 @@ public void setAccessibleNodeLabels(String queue, Set labels) { return Collections.unmodifiableSet(set); } - public Map getNodeLabelCapacities(String queue, - Set labels, RMNodeLabelsManager mgr) { - Map nodeLabelCapacities = new HashMap(); - - if (labels == null) { - return nodeLabelCapacities; + // Considered NO_LABEL, ANY and null cases + private Set normalizeAccessibleNodeLabels(Set labels, + RMNodeLabelsManager mgr) { + Set accessibleLabels = new HashSet(); + if (labels != null) { + accessibleLabels.addAll(labels); } - - for (String label : labels.contains(CommonNodeLabelsManager.ANY) ? mgr - .getClusterNodeLabels() : labels) { - String capacityPropertyName = getNodeLabelPrefix(queue, label) + CAPACITY; - float capacity = getFloat(capacityPropertyName, 0f); - if (capacity < MINIMUM_CAPACITY_VALUE - || capacity > MAXIMUM_CAPACITY_VALUE) { - throw new IllegalArgumentException("Illegal capacity of " + capacity - + " for node-label=" + label + " in queue=" + queue - + ", valid capacity should in range of [0, 100]."); - } - if (LOG.isDebugEnabled()) { - LOG.debug("CSConf - getCapacityOfLabel: prefix=" - + getNodeLabelPrefix(queue, label) + ", capacity=" + capacity); - } - - nodeLabelCapacities.put(label, capacity / 100f); + if (accessibleLabels.contains(CommonNodeLabelsManager.ANY)) { + accessibleLabels.addAll(mgr.getClusterNodeLabels()); } - return nodeLabelCapacities; - } - - public Map getMaximumNodeLabelCapacities(String queue, - Set labels, RMNodeLabelsManager mgr) { - Map maximumNodeLabelCapacities = new HashMap(); - if (labels == null) { - return maximumNodeLabelCapacities; + accessibleLabels.add(CommonNodeLabelsManager.NO_LABEL); + + return accessibleLabels; + } + + private float getQueueCapacity(String queue, String label, String suffix, + float defaultValue) { + String capacityPropertyName = getNodeLabelPrefix(queue, label) + suffix; + float capacity = getFloat(capacityPropertyName, defaultValue); + if (capacity < MINIMUM_CAPACITY_VALUE + || capacity > MAXIMUM_CAPACITY_VALUE) { + throw new IllegalArgumentException("Illegal capacity of " + capacity + + " for node-label=" + label + " in queue=" + queue + + ", valid capacity should in range of [0, 100]."); } - - for (String label : labels.contains(CommonNodeLabelsManager.ANY) ? mgr - .getClusterNodeLabels() : labels) { - float maxCapacity = - getFloat(getNodeLabelPrefix(queue, label) + MAXIMUM_CAPACITY, - 100f); - if (maxCapacity < MINIMUM_CAPACITY_VALUE - || maxCapacity > MAXIMUM_CAPACITY_VALUE) { - throw new IllegalArgumentException("Illegal " + "capacity of " - + maxCapacity + " for label=" + label + " in queue=" + queue); - } + if (LOG.isDebugEnabled()) { LOG.debug("CSConf - getCapacityOfLabel: prefix=" - + getNodeLabelPrefix(queue, label) + ", capacity=" + maxCapacity); - - maximumNodeLabelCapacities.put(label, maxCapacity / 100f); + + getNodeLabelPrefix(queue, label) + ", capacity=" + capacity); + } + return capacity / 100; + } + + public void setCapacitiesByLabels(String queue, Set labels, + RMNodeLabelsManager mgr, QueueCapacities queueCapacities) { + labels = normalizeAccessibleNodeLabels(labels, mgr); + + for (String label : labels) { + if (label.equals(CommonNodeLabelsManager.NO_LABEL)) { + queueCapacities.setCapacity(label, getCapacity(queue) / 100); + queueCapacities.setMaximumCapacity(label, getMaximumCapacity(queue) / 100); + } else { + queueCapacities.setCapacity(label, + getQueueCapacity(queue, label, CAPACITY, 0f)); + queueCapacities.setMaximumCapacity(label, + getQueueCapacity(queue, label, MAXIMUM_CAPACITY, 100f)); + } } - return maximumNodeLabelCapacities; } public String getDefaultNodeLabelExpression(String queue) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index c143210..4d1ddd7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -127,14 +127,6 @@ public LeafQueue(CapacitySchedulerContext cs, Resources.subtract(maximumAllocation, minimumAllocation), maximumAllocation); - float capacity = getCapacityFromConf(); - float absoluteCapacity = parent.getAbsoluteCapacity() * capacity; - - float maximumCapacity = - (float)cs.getConfiguration().getMaximumCapacity(getQueuePath()) / 100; - float absoluteMaxCapacity = - CSQueueUtils.computeAbsoluteMaximumCapacity(maximumCapacity, parent); - int userLimit = cs.getConfiguration().getUserLimit(getQueuePath()); float userLimitFactor = cs.getConfiguration().getUserLimitFactor(getQueuePath()); @@ -143,7 +135,8 @@ public LeafQueue(CapacitySchedulerContext cs, cs.getConfiguration().getMaximumApplicationsPerQueue(getQueuePath()); if (maxApplications < 0) { int maxSystemApps = cs.getConfiguration().getMaximumSystemApplications(); - maxApplications = (int)(maxSystemApps * absoluteCapacity); + maxApplications = + (int) (maxSystemApps * queueCapacities.getAbsoluteCapacity()); } maxApplicationsPerUser = (int)(maxApplications * (userLimit / 100.0f) * userLimitFactor); @@ -156,14 +149,12 @@ public LeafQueue(CapacitySchedulerContext cs, Map acls = cs.getConfiguration().getAcls(getQueuePath()); - setupQueueConfigs(cs.getClusterResource(), capacity, absoluteCapacity, - maximumCapacity, absoluteMaxCapacity, userLimit, userLimitFactor, + setupQueueConfigs(cs.getClusterResource(), userLimit, userLimitFactor, maxApplications, maxAMResourcePerQueuePercent, maxApplicationsPerUser, - state, acls, cs.getConfiguration().getNodeLocalityDelay(), accessibleLabels, - defaultLabelExpression, this.capacitiyByNodeLabels, - this.maxCapacityByNodeLabels, - cs.getConfiguration().getReservationContinueLook(), - cs.getConfiguration().getMaximumAllocationPerQueue(getQueuePath())); + state, acls, cs.getConfiguration().getNodeLocalityDelay(), + accessibleLabels, defaultLabelExpression, cs.getConfiguration() + .getReservationContinueLook(), null, cs.getConfiguration() + .getMaximumAllocationPerQueue(getQueuePath())); if(LOG.isDebugEnabled()) { LOG.debug("LeafQueue:" + " name=" + queueName @@ -176,34 +167,17 @@ public LeafQueue(CapacitySchedulerContext cs, new TreeSet(applicationComparator); this.activeApplications = new TreeSet(applicationComparator); } - - // externalizing in method, to allow overriding - protected float getCapacityFromConf() { - return (float)scheduler.getConfiguration().getCapacity(getQueuePath()) / 100; - } - - protected synchronized void setupQueueConfigs( - Resource clusterResource, - float capacity, float absoluteCapacity, - float maximumCapacity, float absoluteMaxCapacity, - int userLimit, float userLimitFactor, - int maxApplications, float maxAMResourcePerQueuePercent, - int maxApplicationsPerUser, QueueState state, - Map acls, int nodeLocalityDelay, - Set labels, String defaultLabelExpression, - Map capacitieByLabel, - Map maximumCapacitiesByLabel, - boolean revervationContinueLooking, + + protected synchronized void setupQueueConfigs(Resource clusterResource, + int userLimit, float userLimitFactor, int maxApplications, + float maxAMResourcePerQueuePercent, int maxApplicationsPerUser, + QueueState state, Map acls, + int nodeLocalityDelay, Set labels, String defaultLabelExpression, + boolean revervationContinueLooking, QueueCapacities newCapacities, Resource maxAllocation) throws IOException { - super.setupQueueConfigs(clusterResource, capacity, absoluteCapacity, - maximumCapacity, absoluteMaxCapacity, state, acls, labels, - defaultLabelExpression, capacitieByLabel, maximumCapacitiesByLabel, - revervationContinueLooking, maxAllocation); - // Sanity check - CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity); - float absCapacity = getParent().getAbsoluteCapacity() * capacity; - CSQueueUtils.checkAbsoluteCapacity(getQueueName(), absCapacity, - absoluteMaxCapacity); + super.setupQueueConfigs(clusterResource, state, acls, labels, + defaultLabelExpression, + revervationContinueLooking, newCapacities, maxAllocation); this.lastClusterResource = clusterResource; updateAbsoluteCapacityResource(clusterResource); @@ -213,9 +187,8 @@ protected synchronized void setupQueueConfigs( // and all queues may not be realized yet, we'll use (optimistic) // absoluteMaxCapacity (it will be replaced with the more accurate // absoluteMaxAvailCapacity during headroom/userlimit/allocation events) - updateHeadroomInfo(clusterResource, absoluteMaxCapacity); - - this.absoluteCapacity = absCapacity; + updateHeadroomInfo(clusterResource, + queueCapacities.getAbsoluteMaximumCapacity()); this.userLimit = userLimit; this.userLimitFactor = userLimitFactor; @@ -260,13 +233,13 @@ protected synchronized void setupQueueConfigs( } LOG.info("Initializing " + queueName + "\n" + - "capacity = " + capacity + + "capacity = " + queueCapacities.getCapacity() + " [= (float) configuredCapacity / 100 ]" + "\n" + - "asboluteCapacity = " + absoluteCapacity + + "asboluteCapacity = " + queueCapacities.getAbsoluteCapacity() + " [= parentAbsoluteCapacity * capacity ]" + "\n" + - "maxCapacity = " + maximumCapacity + + "maxCapacity = " + queueCapacities.getMaximumCapacity() + " [= configuredMaxCapacity ]" + "\n" + - "absoluteMaxCapacity = " + absoluteMaxCapacity + + "absoluteMaxCapacity = " + queueCapacities.getAbsoluteMaximumCapacity() + " [= 1.0 maximumCapacity undefined, " + "(parentAbsoluteMaxCapacity * maximumCapacity) / 100 otherwise ]" + "\n" + @@ -281,7 +254,7 @@ protected synchronized void setupQueueConfigs( "maxApplicationsPerUser = " + maxApplicationsPerUser + " [= (int)(maxApplications * (userLimit / 100.0f) * " + "userLimitFactor) ]" + "\n" + - "usedCapacity = " + usedCapacity + + "usedCapacity = " + queueCapacities.getUsedCapacity() + " [= usedResourcesMemory / " + "(clusterResourceMemory * absoluteCapacity)]" + "\n" + "absoluteUsedCapacity = " + absoluteUsedCapacity + @@ -440,8 +413,8 @@ public int getNodeLocalityDelay() { public String toString() { return queueName + ": " + - "capacity=" + capacity + ", " + - "absoluteCapacity=" + absoluteCapacity + ", " + + "capacity=" + queueCapacities.getCapacity() + ", " + + "absoluteCapacity=" + queueCapacities.getAbsoluteCapacity() + ", " + "usedResources=" + queueUsage.getUsed() + ", " + "usedCapacity=" + getUsedCapacity() + ", " + "absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + ", " + @@ -505,10 +478,7 @@ public synchronized void reinitialize( } setupQueueConfigs( - clusterResource, - newlyParsedLeafQueue.capacity, newlyParsedLeafQueue.absoluteCapacity, - newlyParsedLeafQueue.maximumCapacity, - newlyParsedLeafQueue.absoluteMaxCapacity, + clusterResource, newlyParsedLeafQueue.userLimit, newlyParsedLeafQueue.userLimitFactor, newlyParsedLeafQueue.maxApplications, newlyParsedLeafQueue.maxAMResourcePerQueuePercent, @@ -517,9 +487,8 @@ public synchronized void reinitialize( newlyParsedLeafQueue.getNodeLocalityDelay(), newlyParsedLeafQueue.accessibleLabels, newlyParsedLeafQueue.defaultLabelExpression, - newlyParsedLeafQueue.capacitiyByNodeLabels, - newlyParsedLeafQueue.maxCapacityByNodeLabels, newlyParsedLeafQueue.reservationsContinueLooking, + newlyParsedLeafQueue.queueCapacities, newlyParsedLeafQueue.getMaximumAllocation()); // queue metrics are updated, more resource may be available @@ -1033,7 +1002,8 @@ synchronized boolean canAssignToThisQueue(Resource clusterResource, application.getCurrentReservation()), labelManager.getResourceByLabel(label, clusterResource)); - if (potentialNewWithoutReservedCapacity <= absoluteMaxCapacity) { + if (potentialNewWithoutReservedCapacity <= queueCapacities + .getAbsoluteMaximumCapacity()) { if (LOG.isDebugEnabled()) { LOG.debug("try to use reserved: " + getQueueName() @@ -1047,8 +1017,9 @@ synchronized boolean canAssignToThisQueue(Resource clusterResource, + Resources.divide(resourceCalculator, clusterResource, queueUsage.getUsed(), clusterResource) + " required " + required + " potentialNewWithoutReservedCapacity: " - + potentialNewWithoutReservedCapacity + " ( " + " max-capacity: " - + absoluteMaxCapacity + ")"); + + potentialNewWithoutReservedCapacity + " ( " + + " max-capacity: " + + queueCapacities.getAbsoluteMaximumCapacity() + ")"); } // we could potentially use this node instead of reserved node return true; @@ -1072,7 +1043,8 @@ synchronized boolean canAssignToThisQueue(Resource clusterResource, queueUsage.getUsed(label), labelManager.getResourceByLabel(label, clusterResource)) + " potentialNewCapacity: " + potentialNewCapacity + " ( " - + " max-capacity: " + absoluteMaxCapacity + ")"); + + " max-capacity: " + queueCapacities.getAbsoluteMaximumCapacity() + + ")"); } } @@ -1169,7 +1141,7 @@ private Resource computeUserLimit(FiCaSchedulerApp application, queueCapacity = Resources.multiplyAndNormalizeUp(resourceCalculator, labelManager .getResourceByLabel(CommonNodeLabelsManager.NO_LABEL, clusterResource), - absoluteCapacity, minimumAllocation); + queueCapacities.getAbsoluteCapacity(), minimumAllocation); } // Allow progress for queues with miniscule capacity @@ -1814,12 +1786,9 @@ synchronized void releaseResource(Resource clusterResource, } private void updateAbsoluteCapacityResource(Resource clusterResource) { - - absoluteCapacityResource = Resources.multiplyAndNormalizeUp( - resourceCalculator, - clusterResource, - absoluteCapacity, minimumAllocation); - + absoluteCapacityResource = + Resources.multiplyAndNormalizeUp(resourceCalculator, clusterResource, + queueCapacities.getAbsoluteCapacity(), minimumAllocation); } @Override @@ -1830,7 +1799,8 @@ public synchronized void updateClusterResource(Resource clusterResource) { // Update headroom info based on new cluster resource value // absoluteMaxCapacity now, will be replaced with absoluteMaxAvailCapacity // during allocation - updateHeadroomInfo(clusterResource, absoluteMaxCapacity); + updateHeadroomInfo(clusterResource, + queueCapacities.getAbsoluteMaximumCapacity()); // Update metrics CSQueueUtils.updateQueueStatistics( @@ -2003,35 +1973,13 @@ public void detachContainer(Resource clusterResource, getParent().detachContainer(clusterResource, application, rmContainer); } } - - @Override - public float getAbsActualCapacity() { - //? Is this actually used by anything at present? - // There is a findbugs warning -re lastClusterResource (now excluded), - // when this is used, verify that the access is mt correct and remove - // the findbugs exclusion if possible - if (Resources.lessThanOrEqual(resourceCalculator, lastClusterResource, - lastClusterResource, Resources.none())) { - return absoluteCapacity; - } - - Resource resourceRespectLabels = - labelManager == null ? lastClusterResource : labelManager - .getQueueResource(queueName, accessibleLabels, lastClusterResource); - float absActualCapacity = - Resources.divide(resourceCalculator, lastClusterResource, - resourceRespectLabels, lastClusterResource); - - return absActualCapacity > absoluteCapacity ? absoluteCapacity - : absActualCapacity; - } public void setCapacity(float capacity) { - this.capacity = capacity; + queueCapacities.setCapacity(capacity); } public void setAbsoluteCapacity(float absoluteCapacity) { - this.absoluteCapacity = absoluteCapacity; + queueCapacities.setAbsoluteCapacity(absoluteCapacity); } public void setMaxApplications(int maxApplications) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index d66c06a..6d96a34 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -94,26 +94,15 @@ public ParentQueue(CapacitySchedulerContext cs, "capacity of " + rawCapacity + " for queue " + queueName + ". Must be " + CapacitySchedulerConfiguration.MAXIMUM_CAPACITY_VALUE); } - - float capacity = (float) rawCapacity / 100; - float parentAbsoluteCapacity = - (rootQueue) ? 1.0f : parent.getAbsoluteCapacity(); - float absoluteCapacity = parentAbsoluteCapacity * capacity; - - float maximumCapacity = - (float) cs.getConfiguration().getMaximumCapacity(getQueuePath()) / 100; - float absoluteMaxCapacity = - CSQueueUtils.computeAbsoluteMaximumCapacity(maximumCapacity, parent); QueueState state = cs.getConfiguration().getState(getQueuePath()); Map acls = cs.getConfiguration().getAcls(getQueuePath()); - setupQueueConfigs(cs.getClusterResource(), capacity, absoluteCapacity, - maximumCapacity, absoluteMaxCapacity, state, acls, accessibleLabels, - defaultLabelExpression, capacitiyByNodeLabels, maxCapacityByNodeLabels, - cs.getConfiguration().getReservationContinueLook()); + setupQueueConfigs(cs.getClusterResource(), state, acls, accessibleLabels, + defaultLabelExpression, cs.getConfiguration() + .getReservationContinueLook(), null); this.childQueues = new TreeSet(queueComparator); @@ -122,18 +111,15 @@ public ParentQueue(CapacitySchedulerContext cs, ", fullname=" + getQueuePath()); } - synchronized void setupQueueConfigs(Resource clusterResource, float capacity, - float absoluteCapacity, float maximumCapacity, float absoluteMaxCapacity, + synchronized void setupQueueConfigs(Resource clusterResource, QueueState state, Map acls, Set accessibleLabels, String defaultLabelExpression, - Map nodeLabelCapacities, - Map maximumCapacitiesByLabel, - boolean reservationContinueLooking) throws IOException { - super.setupQueueConfigs(clusterResource, capacity, absoluteCapacity, - maximumCapacity, absoluteMaxCapacity, state, acls, accessibleLabels, - defaultLabelExpression, nodeLabelCapacities, maximumCapacitiesByLabel, - reservationContinueLooking, maximumAllocation); - StringBuilder aclsString = new StringBuilder(); + boolean reservationContinueLooking, QueueCapacities newCapacities) + throws IOException { + super.setupQueueConfigs(clusterResource, state, acls, accessibleLabels, + defaultLabelExpression, reservationContinueLooking, newCapacities, + maximumAllocation); + StringBuilder aclsString = new StringBuilder(); for (Map.Entry e : acls.entrySet()) { aclsString.append(e.getKey() + ":" + e.getValue().getAclString()); } @@ -147,10 +133,10 @@ synchronized void setupQueueConfigs(Resource clusterResource, float capacity, } LOG.info(queueName + - ", capacity=" + capacity + - ", asboluteCapacity=" + absoluteCapacity + - ", maxCapacity=" + maximumCapacity + - ", asboluteMaxCapacity=" + absoluteMaxCapacity + + ", capacity=" + queueCapacities.getCapacity() + + ", asboluteCapacity=" + queueCapacities.getAbsoluteCapacity() + + ", maxCapacity=" + queueCapacities.getMaximumCapacity() + + ", asboluteMaxCapacity=" + queueCapacities.getAbsoluteMaximumCapacity() + ", state=" + state + ", acls=" + aclsString + ", labels=" + labelStrBuilder.toString() + "\n" + @@ -166,8 +152,8 @@ void setChildQueues(Collection childQueues) { } float delta = Math.abs(1.0f - childCapacities); // crude way to check // allow capacities being set to 0, and enforce child 0 if parent is 0 - if (((capacity > 0) && (delta > PRECISION)) || - ((capacity == 0) && (childCapacities > 0))) { + if (((queueCapacities.getCapacity() > 0) && (delta > PRECISION)) || + ((queueCapacities.getCapacity() == 0) && (childCapacities > 0))) { throw new IllegalArgumentException("Illegal" + " capacity of " + childCapacities + " for children of queue " + queueName); @@ -254,8 +240,8 @@ private synchronized QueueUserACLInfo getUserAclInfo( public String toString() { return queueName + ": " + "numChildQueue= " + childQueues.size() + ", " + - "capacity=" + capacity + ", " + - "absoluteCapacity=" + absoluteCapacity + ", " + + "capacity=" + queueCapacities.getCapacity() + ", " + + "absoluteCapacity=" + queueCapacities.getAbsoluteCapacity() + ", " + "usedResources=" + queueUsage.getUsed() + "usedCapacity=" + getUsedCapacity() + ", " + "numApps=" + getNumApplications() + ", " + @@ -277,17 +263,12 @@ public synchronized void reinitialize( // Set new configs setupQueueConfigs(clusterResource, - newlyParsedParentQueue.capacity, - newlyParsedParentQueue.absoluteCapacity, - newlyParsedParentQueue.maximumCapacity, - newlyParsedParentQueue.absoluteMaxCapacity, newlyParsedParentQueue.state, newlyParsedParentQueue.acls, newlyParsedParentQueue.accessibleLabels, newlyParsedParentQueue.defaultLabelExpression, - newlyParsedParentQueue.capacitiyByNodeLabels, - newlyParsedParentQueue.maxCapacityByNodeLabels, - newlyParsedParentQueue.reservationsContinueLooking); + newlyParsedParentQueue.reservationsContinueLooking, + newlyParsedParentQueue.queueCapacities); // Re-configure existing child queues and add new ones // The CS has already checked to ensure all existing child queues are present! @@ -540,7 +521,8 @@ private synchronized boolean assignToQueueIfUnreserve(Resource clusterResource) Resources.subtract(queueUsage.getUsed(), reservedResources), clusterResource); - if (capacityWithoutReservedCapacity <= absoluteMaxCapacity) { + if (capacityWithoutReservedCapacity <= queueCapacities + .getAbsoluteMaximumCapacity()) { if (LOG.isDebugEnabled()) { LOG.debug("parent: try to use reserved: " + getQueueName() + " usedResources: " + queueUsage.getUsed().getMemory() @@ -550,7 +532,7 @@ private synchronized boolean assignToQueueIfUnreserve(Resource clusterResource) / clusterResource.getMemory() + " potentialNewWithoutReservedCapacity: " + capacityWithoutReservedCapacity + " ( " + " max-capacity: " - + absoluteMaxCapacity + ")"); + + queueCapacities.getAbsoluteMaximumCapacity() + ")"); } // we could potentially use this node instead of reserved node return true; @@ -760,13 +742,6 @@ public void detachContainer(Resource clusterResource, } } } - - @Override - public float getAbsActualCapacity() { - // for now, simply return actual capacity = guaranteed capacity for parent - // queue - return absoluteCapacity; - } public synchronized int getNumApplications() { return numApplications; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java index f8b11eb..c535b69 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java @@ -19,8 +19,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; import java.io.IOException; -import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; import org.apache.hadoop.yarn.api.records.Resource; @@ -99,16 +97,12 @@ public synchronized void reinitialize(CSQueue newlyParsedQueue, } // Set new configs - setupQueueConfigs(clusterResource, newlyParsedParentQueue.getCapacity(), - newlyParsedParentQueue.getAbsoluteCapacity(), - newlyParsedParentQueue.getMaximumCapacity(), - newlyParsedParentQueue.getAbsoluteMaximumCapacity(), - newlyParsedParentQueue.getState(), newlyParsedParentQueue.getACLs(), + setupQueueConfigs(clusterResource, newlyParsedParentQueue.getState(), + newlyParsedParentQueue.getACLs(), newlyParsedParentQueue.accessibleLabels, newlyParsedParentQueue.defaultLabelExpression, - newlyParsedParentQueue.capacitiyByNodeLabels, - newlyParsedParentQueue.maxCapacityByNodeLabels, - newlyParsedParentQueue.getReservationContinueLooking()); + newlyParsedParentQueue.getReservationContinueLooking(), + newlyParsedParentQueue.queueCapacities); updateQuotas(newlyParsedParentQueue.userLimit, newlyParsedParentQueue.userLimitFactor, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java index a0e6d8c..2bcd39e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java @@ -19,13 +19,17 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; +import com.google.common.collect.Sets; + public class QueueCapacities { private static final String NL = CommonNodeLabelsManager.NO_LABEL; private static final float LABEL_DOESNT_EXIST_CAP = 0f; @@ -58,6 +62,18 @@ private CapacityType(int idx) { public Capacities() { capacitiesArr = new float[CapacityType.values().length]; } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("{used=" + capacitiesArr[0] + "%, "); + sb.append("abs_used=" + capacitiesArr[1] + "%, "); + sb.append("max_cap=" + capacitiesArr[2] + "%, "); + sb.append("abs_max_cap=" + capacitiesArr[3] + "%, "); + sb.append("cap=" + capacitiesArr[4] + "%, "); + sb.append("abs_cap=" + capacitiesArr[5] + "%}"); + return sb.toString(); + } } private float _get(String label, CapacityType type) { @@ -188,4 +204,40 @@ public void setAbsoluteMaximumCapacity(float value) { public void setAbsoluteMaximumCapacity(String label, float value) { _set(label, CapacityType.ABS_MAX_CAP, value); } + + // Reinitialize, but will not replace used resource + public void reinitialize(QueueCapacities newCapacities) { + try { + writeLock.lock(); + for (String label : Sets.union(this.getExistingNodeLabels(), + newCapacities.getExistingNodeLabels())) { + setCapacity(label, newCapacities.getCapacity(label)); + setMaximumCapacity(label, newCapacities.getMaximumCapacity(label)); + setAbsoluteCapacity(label, newCapacities.getAbsoluteCapacity(label)); + setAbsoluteMaximumCapacity(label, + newCapacities.getAbsoluteMaximumCapacity(label)); + } + } finally { + writeLock.unlock(); + } + } + + public Set getExistingNodeLabels() { + try { + readLock.lock(); + return new HashSet(capacitiesMap.keySet()); + } finally { + readLock.unlock(); + } + } + + @Override + public String toString() { + try { + readLock.lock(); + return this.capacitiesMap.toString(); + } finally { + readLock.unlock(); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java index c4424b5..cc73fa1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java @@ -56,6 +56,12 @@ public ReservationQueue(CapacitySchedulerContext cs, String queueName, @Override public synchronized void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource) throws IOException { + // compute abs-capacity / abs-maximum capacity + CSQueueUtils.setAbsoluteCapacitiesByNodeLabels(queueCapacities, parent); + + // Sanity check + CSQueueUtils.capacitiesSanityCheck(getQueuePath(), queueCapacities); + // Sanity check if (!(newlyParsedQueue instanceof ReservationQueue) || !newlyParsedQueue.getQueuePath().equals(getQueuePath())) { @@ -108,9 +114,8 @@ private void updateQuotas(int userLimit, float userLimitFactor, maxApplicationsPerUser = maxAppsPerUserForReservation; } - // used by the super constructor, we initialize to zero - protected float getCapacityFromConf() { - return 0f; + @Override + protected void initializeCapacitiesFromConf() { + // Do nothing } - }