diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml index 0e6207b..9f91bca 100644 --- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml +++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml @@ -188,6 +188,23 @@ + + + + + + + + + + + + + + + + + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java new file mode 100644 index 0000000..0072916 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -0,0 +1,453 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import java.io.IOException; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.authorize.AccessControlList; +import org.apache.hadoop.yarn.api.records.QueueACL; +import org.apache.hadoop.yarn.api.records.QueueInfo; +import org.apache.hadoop.yarn.api.records.QueueState; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.nodelabels.NodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.DynamicNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; + +import com.google.common.collect.Sets; + +public abstract class AbstractCSQueue implements CSQueue { + + CSQueue parent; + final String queueName; + + float capacity; + float maximumCapacity; + float absoluteCapacity; + float absoluteMaxCapacity; + float absoluteUsedCapacity = 0.0f; + + float usedCapacity = 0.0f; + volatile int numApplications; + volatile int numContainers; + + final Resource minimumAllocation; + final Resource maximumAllocation; + QueueState state; + final QueueMetrics metrics; + + final ResourceCalculator resourceCalculator; + Set labels; + DynamicNodeLabelsManager labelManager; + String defaultLabelExpression; + Resource usedResources = Resources.createResource(0, 0); + QueueInfo queueInfo; + final Comparator queueComparator; + Map absoluteNodeLabelCapacities; + Map nodeLabelCapacities; + Map usedResourcesByLabels = new HashMap(); + Map absoluteMaximumNodeLabelCapacities; + Map maximumNodeLabelCapacities; + + Map acls = + new HashMap(); + boolean reservationsContinueLooking; + + private final RecordFactory recordFactory = + RecordFactoryProvider.getRecordFactory(null); + + public AbstractCSQueue(CapacitySchedulerContext cs, + String queueName, CSQueue parent, CSQueue old) throws IOException { + this.minimumAllocation = cs.getMinimumResourceCapability(); + this.maximumAllocation = cs.getMaximumResourceCapability(); + this.labelManager = cs.getRMContext().getNodeLabelManager(); + this.parent = parent; + this.queueName = queueName; + this.resourceCalculator = cs.getResourceCalculator(); + this.queueComparator = cs.getQueueComparator(); + this.queueInfo = recordFactory.newRecordInstance(QueueInfo.class); + + // must be called after parent and queueName is set + this.metrics = old != null ? old.getMetrics() : + QueueMetrics.forQueue(getQueuePath(), parent, + cs.getConfiguration().getEnableUserMetrics(), + cs.getConf()); + + // get labels + this.labels = cs.getConfiguration().getAccessibleNodeLabels(getQueuePath()); + this.defaultLabelExpression = cs.getConfiguration() + .getDefaultNodeLabelExpression(getQueuePath()); + + this.queueInfo.setQueueName(queueName); + + // inherit from parent if labels not set + if (this.labels == null && parent != null) { + this.labels = parent.getAccessibleNodeLabels(); + SchedulerUtils.checkAndThrowIfLabelNotIncluded(labelManager, this.labels); + } + + // inherit from parent if labels not set + if (this.defaultLabelExpression == null && parent != null + && this.labels.containsAll(parent.getAccessibleNodeLabels())) { + this.defaultLabelExpression = parent.getDefaultNodeLabelExpression(); + } + + // set capacity by labels + nodeLabelCapacities = + cs.getConfiguration().getNodeLabelCapacities(getQueuePath(), labels); + + // set maximum capacity by labels + maximumNodeLabelCapacities = + cs.getConfiguration().getMaximumNodeLabelCapacities(getQueuePath(), + labels); + } + + @Override + public synchronized float getCapacity() { + return capacity; + } + + @Override + public synchronized float getAbsoluteCapacity() { + return absoluteCapacity; + } + + @Override + public float getAbsoluteMaximumCapacity() { + return absoluteMaxCapacity; + } + + @Override + public synchronized float getAbsoluteUsedCapacity() { + return absoluteUsedCapacity; + } + + @Override + public float getMaximumCapacity() { + return maximumCapacity; + } + + @Override + public synchronized float getUsedCapacity() { + return usedCapacity; + } + + @Override + public synchronized Resource getUsedResources() { + return usedResources; + } + + public synchronized int getNumContainers() { + return numContainers; + } + + public synchronized int getNumApplications() { + return numApplications; + } + + @Override + public synchronized QueueState getState() { + return state; + } + + @Override + public QueueMetrics getMetrics() { + return metrics; + } + + @Override + public String getQueueName() { + return queueName; + } + + @Override + public synchronized CSQueue getParent() { + return parent; + } + + @Override + public synchronized void setParent(CSQueue newParentQueue) { + this.parent = (ParentQueue)newParentQueue; + } + + public Set getAccessibleNodeLabels() { + return labels; + } + + @Override + public boolean hasAccess(QueueACL acl, UserGroupInformation user) { + synchronized (this) { + if (acls.get(acl).isUserAllowed(user)) { + return true; + } + } + + if (parent != null) { + return parent.hasAccess(acl, user); + } + + return false; + } + + @Override + public synchronized void setUsedCapacity(float usedCapacity) { + this.usedCapacity = usedCapacity; + } + + @Override + public synchronized void setAbsoluteUsedCapacity(float absUsedCapacity) { + this.absoluteUsedCapacity = absUsedCapacity; + } + + /** + * Set maximum capacity - used only for testing. + * @param maximumCapacity new max capacity + */ + synchronized void setMaxCapacity(float maximumCapacity) { + // Sanity check + CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity); + float absMaxCapacity = + CSQueueUtils.computeAbsoluteMaximumCapacity(maximumCapacity, parent); + CSQueueUtils.checkAbsoluteCapacity(getQueueName(), absoluteCapacity, + absMaxCapacity); + + this.maximumCapacity = maximumCapacity; + this.absoluteMaxCapacity = absMaxCapacity; + } + + @Override + public float getAbsActualCapacity() { + // for now, simply return actual capacity = guaranteed capacity for parent + // queue + return absoluteCapacity; + } + + @Override + public String getDefaultNodeLabelExpression() { + return defaultLabelExpression; + } + + synchronized void setupQueueConfigs(Resource clusterResource, float capacity, + float absoluteCapacity, float maximumCapacity, float absoluteMaxCapacity, + QueueState state, Map acls, + Set labels, String defaultLabelExpression, + Map nodeLabelCapacities, + Map maximumNodeLabelCapacities, boolean continueLooking) + throws IOException { + // Sanity check + CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity); + CSQueueUtils.checkAbsoluteCapacity(getQueueName(), absoluteCapacity, + absoluteMaxCapacity); + + this.capacity = capacity; + this.absoluteCapacity = absoluteCapacity; + + this.maximumCapacity = maximumCapacity; + this.absoluteMaxCapacity = absoluteMaxCapacity; + + this.state = state; + + this.acls = acls; + + // set labels + this.labels = labels; + + // set label expression + this.defaultLabelExpression = defaultLabelExpression; + + // copy node label capacity + this.nodeLabelCapacities = new HashMap(nodeLabelCapacities); + this.maximumNodeLabelCapacities = + new HashMap(maximumNodeLabelCapacities); + + this.queueInfo.setNodeLabels(this.labels); + this.queueInfo.setCapacity(this.capacity); + this.queueInfo.setMaximumCapacity(this.maximumCapacity); + this.queueInfo.setQueueState(this.state); + this.queueInfo.setDefaultNodeLabelExpression(this.defaultLabelExpression); + + // Update metrics + CSQueueUtils.updateQueueStatistics( + resourceCalculator, this, parent, clusterResource, minimumAllocation); + + // Check if labels of this queue is a subset of parent queue, only do this + // when we not root + if (parent != null && parent.getParent() != null) { + if (parent.getAccessibleNodeLabels() != null + && !parent.getAccessibleNodeLabels().contains(DynamicNodeLabelsManager.ANY)) { + // if parent isn't "*", child shouldn't be "*" too + if (this.getAccessibleNodeLabels().contains(DynamicNodeLabelsManager.ANY)) { + throw new IOException("Parent's accessible queue is not ANY(*), " + + "but child's accessible queue is *"); + } else { + Set diff = + Sets.difference(this.getAccessibleNodeLabels(), + parent.getAccessibleNodeLabels()); + if (!diff.isEmpty()) { + throw new IOException("Some labels of child queue is not a subset " + + "of parent queue, these labels=[" + + StringUtils.join(diff, ",") + "]"); + } + } + } + } + + // calculate absolute capacity by each node label + this.absoluteNodeLabelCapacities = + CSQueueUtils.computeAbsoluteNodeLabelCapacities( + this.nodeLabelCapacities, parent); + + // calculate maximum capacity by each node label + this.absoluteMaximumNodeLabelCapacities = + CSQueueUtils.computeAbsoluteMaximumNodeLabelCapacities( + maximumNodeLabelCapacities, parent); + + // check absoluteMaximumNodeLabelCapacities is valid + CSQueueUtils.checkAbsoluteCapacitiesByLabel(getQueueName(), + absoluteNodeLabelCapacities, absoluteNodeLabelCapacities); + + this.reservationsContinueLooking = continueLooking; + } + + @Private + public Resource getMaximumAllocation() { + return maximumAllocation; + } + + @Private + public Resource getMinimumAllocation() { + return minimumAllocation; + } + + synchronized void allocateResource(Resource clusterResource, + Resource resource, Set nodeLabels) { + Resources.addTo(usedResources, resource); + + // Update usedResources by labels + if (nodeLabels == null || nodeLabels.isEmpty()) { + if (!usedResourcesByLabels.containsKey(DynamicNodeLabelsManager.NO_LABEL)) { + usedResourcesByLabels.put(DynamicNodeLabelsManager.NO_LABEL, + Resources.createResource(0)); + } + Resources.addTo(usedResourcesByLabels.get(DynamicNodeLabelsManager.NO_LABEL), + resource); + } else { + for (String label : Sets.intersection(labels, nodeLabels)) { + if (!usedResourcesByLabels.containsKey(label)) { + usedResourcesByLabels.put(label, Resources.createResource(0)); + } + Resources.addTo(usedResourcesByLabels.get(label), resource); + } + } + + ++numContainers; + CSQueueUtils.updateQueueStatistics(resourceCalculator, this, getParent(), + clusterResource, minimumAllocation); + } + + synchronized void releaseResource(Resource clusterResource, + Resource resource, Set nodeLabels) { + // Update queue metrics + Resources.subtractFrom(usedResources, resource); + + // Update usedResources by labels + if (nodeLabels.isEmpty()) { + if (!usedResourcesByLabels.containsKey(DynamicNodeLabelsManager.NO_LABEL)) { + usedResourcesByLabels.put(DynamicNodeLabelsManager.NO_LABEL, + Resources.createResource(0)); + } + Resources.subtractFrom( + usedResourcesByLabels.get(DynamicNodeLabelsManager.NO_LABEL), resource); + } else { + for (String label : Sets.intersection(labels, nodeLabels)) { + if (!usedResourcesByLabels.containsKey(label)) { + usedResourcesByLabels.put(label, Resources.createResource(0)); + } + Resources.subtractFrom(usedResourcesByLabels.get(label), resource); + } + } + + CSQueueUtils.updateQueueStatistics(resourceCalculator, this, getParent(), + clusterResource, minimumAllocation); + --numContainers; + } + + @Private + public float getCapacityByNodeLabel(String label) { + if (null == parent) { + if (this.labels.contains(NodeLabelsManager.ANY) + || this.labels.contains(label)) { + return 1f; + } + return 0; + } + + if (StringUtils.equals(label, DynamicNodeLabelsManager.NO_LABEL)) { + return getCapacity(); + } + + if (!nodeLabelCapacities.containsKey(label)) { + return 0; + } else { + return nodeLabelCapacities.get(label); + } + } + + @Private + public float getAbsoluteCapacityByNodeLabel(String label) { + if (null == parent) { + return 1; + } + + if (StringUtils.equals(label, DynamicNodeLabelsManager.NO_LABEL)) { + return getAbsoluteCapacity(); + } + + if (!absoluteMaximumNodeLabelCapacities.containsKey(label)) { + return 0; + } else { + return absoluteMaximumNodeLabelCapacities.get(label); + } + } + + @Private + public float getAbsoluteMaximumCapacityByNodeLabel(String label) { + if (StringUtils.equals(label, DynamicNodeLabelsManager.NO_LABEL)) { + return getAbsoluteMaximumCapacity(); + } + + return getAbsoluteCapacityByNodeLabel(label); + } + + @Private + public boolean getReservationContinueLooking() { + return reservationsContinueLooking; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java index db893dc..031bbad 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java @@ -72,9 +72,18 @@ /** * Get the configured capacity of the queue. - * @return queue capacity + * @return configured queue capacity */ public float getCapacity(); + + /** + * Get actual capacity of the queue, this may be different from + * configured capacity when mis-config take place, like add labels to the + * cluster + * + * @return actual queue capacity + */ + public float getAbsActualCapacity(); /** * Get capacity of the parent of the queue as a function of the @@ -259,4 +268,25 @@ public void detachContainer(Resource clusterResource, */ public void attachContainer(Resource clusterResource, FiCaSchedulerApp application, RMContainer container); + + /** + * Get absolute capacity by label of this queue can use + * @param nodeLabel + * @return absolute capacity by label of this queue can use + */ + public float getAbsoluteCapacityByNodeLabel(String nodeLabel); + + /** + * Get absolute max capacity by label of this queue can use + * @param nodeLabel + * @return absolute capacity by label of this queue can use + */ + public float getAbsoluteMaximumCapacityByNodeLabel(String nodeLabel); + + /** + * Get capacity by node label + * @param nodeLabel + * @return capacity by node label + */ + public float getCapacityByNodeLabel(String nodeLabel); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java index 737062b..6f94119 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java @@ -17,9 +17,12 @@ */ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; - import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.utils.Lock; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; @@ -40,7 +43,7 @@ public static void checkMaxCapacity(String queueName, } } - public static void checkAbsoluteCapacities(String queueName, + public static void checkAbsoluteCapacity(String queueName, float absCapacity, float absMaxCapacity) { if (absMaxCapacity < (absCapacity - EPSILON)) { throw new IllegalArgumentException("Illegal call to setMaxCapacity. " @@ -49,6 +52,23 @@ public static void checkAbsoluteCapacities(String queueName, + ")"); } } + + public static void checkAbsoluteCapacitiesByLabel(String queueName, + Map absCapacities, + Map absMaximumCapacities) { + for (Entry entry : absCapacities.entrySet()) { + String label = entry.getKey(); + float absCapacity = entry.getValue(); + float absMaxCapacity = absMaximumCapacities.get(label); + if (absMaxCapacity < (absCapacity - EPSILON)) { + throw new IllegalArgumentException("Illegal call to setMaxCapacity. " + + "Queue '" + queueName + "' has " + "an absolute capacity (" + + absCapacity + ") greater than " + + "its absolute maximumCapacity (" + absMaxCapacity + ") of label=" + + label); + } + } + } public static float computeAbsoluteMaximumCapacity( float maximumCapacity, CSQueue parent) { @@ -56,6 +76,39 @@ public static float computeAbsoluteMaximumCapacity( (parent == null) ? 1.0f : parent.getAbsoluteMaximumCapacity(); return (parentAbsMaxCapacity * maximumCapacity); } + + public static Map computeAbsoluteNodeLabelCapacities( + Map nodeLabelToCapacities, CSQueue parent) { + if (parent == null) { + return nodeLabelToCapacities; + } + + Map absoluteNodeLabelToCapacities = + new HashMap(); + for (Entry entry : nodeLabelToCapacities.entrySet()) { + String label = entry.getKey(); + float capacity = entry.getValue(); + absoluteNodeLabelToCapacities.put(label, + capacity * parent.getAbsoluteCapacityByNodeLabel(label)); + } + return absoluteNodeLabelToCapacities; + } + + public static Map computeAbsoluteMaximumNodeLabelCapacities( + Map maximumNodeLabelToCapacities, CSQueue parent) { + if (parent == null) { + return maximumNodeLabelToCapacities; + } + Map absoluteMaximumNodeLabelToCapacities = + new HashMap(); + for (Entry entry : maximumNodeLabelToCapacities.entrySet()) { + String label = entry.getKey(); + float maxCapacity = entry.getValue(); + absoluteMaximumNodeLabelToCapacities.put(label, + maxCapacity * parent.getAbsoluteMaximumCapacityByNodeLabel(label)); + } + return absoluteMaximumNodeLabelToCapacities; + } public static int computeMaxActiveApplications( ResourceCalculator calculator, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 6a3c7dc..1b7d09e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -20,7 +20,15 @@ import java.io.IOException; import java.io.InputStream; -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Comparator; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -53,8 +61,13 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.DynamicNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.*; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; @@ -191,6 +204,7 @@ public Configuration getConf() { private boolean scheduleAsynchronously; private AsyncScheduleThread asyncSchedulerThread; + private DynamicNodeLabelsManager labelManager; /** * EXPERT @@ -275,6 +289,8 @@ private synchronized void initScheduler(Configuration configuration) throws this.applications = new ConcurrentHashMap>(); + this.labelManager = rmContext.getNodeLabelManager(); + initializeQueues(this.conf); scheduleAsynchronously = this.conf.getScheduleAynschronously(); @@ -446,7 +462,7 @@ private void initializeQueues(CapacitySchedulerConfiguration conf) root = parseQueue(this, conf, null, CapacitySchedulerConfiguration.ROOT, queues, queues, noop); - + labelManager.reinitializeQueueLabels(getQueueToLabels()); LOG.info("Initialized root queue " + root); initializeQueueMappings(); } @@ -469,6 +485,16 @@ private void reinitializeQueues(CapacitySchedulerConfiguration conf) // Re-configure queues root.reinitialize(newRoot, clusterResource); initializeQueueMappings(); + + labelManager.reinitializeQueueLabels(getQueueToLabels()); + } + + private Map> getQueueToLabels() { + Map> queueToLabels = new HashMap>(); + for (CSQueue queue : queues.values()) { + queueToLabels.put(queue.getQueueName(), queue.getAccessibleNodeLabels()); + } + return queueToLabels; } /** @@ -511,7 +537,7 @@ private void addNewQueues( @Lock(CapacityScheduler.class) static CSQueue parseQueue( - CapacitySchedulerContext csContext, + CapacitySchedulerContext csContext, CapacitySchedulerConfiguration conf, CSQueue parent, String queueName, Map queues, Map oldQueues, @@ -1077,11 +1103,18 @@ public void handle(SchedulerEvent event) { } private synchronized void addNode(RMNode nodeManager) { + // update this node to node label manager + if (labelManager != null) { + labelManager.activateNode(nodeManager.getNodeID(), + nodeManager.getTotalCapability()); + } + this.nodes.put(nodeManager.getNodeID(), new FiCaSchedulerNode(nodeManager, usePortForNodeName)); Resources.addTo(clusterResource, nodeManager.getTotalCapability()); root.updateClusterResource(clusterResource); int numNodes = numNodeManagers.incrementAndGet(); + LOG.info("Added node " + nodeManager.getNodeAddress() + " clusterResource: " + clusterResource); @@ -1091,6 +1124,11 @@ private synchronized void addNode(RMNode nodeManager) { } private synchronized void removeNode(RMNode nodeInfo) { + // update this node to node label manager + if (labelManager != null) { + labelManager.deactivateNode(nodeInfo.getNodeID()); + } + FiCaSchedulerNode node = nodes.get(nodeInfo.getNodeID()); if (node == null) { return; @@ -1124,6 +1162,7 @@ private synchronized void removeNode(RMNode nodeInfo) { } this.nodes.remove(nodeInfo.getNodeID()); + LOG.info("Removed node " + nodeInfo.getNodeAddress() + " clusterResource: " + clusterResource); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index b1f239c..fdac05b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -18,7 +18,15 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.StringTokenizer; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -31,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.DynamicNodeLabelsManager; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -83,6 +92,12 @@ public static final String STATE = "state"; @Private + public static final String ACCESSIBLE_NODE_LABELS = "accessible-node-labels"; + + @Private + public static final String DEFAULT_NODE_LABEL_EXPRESSION = + "default-node-label-expression"; + public static final String RESERVE_CONT_LOOK_ALL_NODES = PREFIX + "reservations-continue-look-all-nodes"; @@ -268,6 +283,10 @@ private String getQueuePrefix(String queue) { return queueName; } + private String getNodeLabelPrefix(String queue, String label) { + return getQueuePrefix(queue) + ACCESSIBLE_NODE_LABELS + DOT + label + DOT; + } + public int getMaximumSystemApplications() { int maxApplications = getInt(MAXIMUM_SYSTEM_APPLICATIONS, DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS); @@ -343,6 +362,15 @@ public void setMaximumCapacity(String queue, float maxCapacity) { ", maxCapacity=" + maxCapacity); } + public void setCapacityByLabel(String queue, String label, float capacity) { + setFloat(getNodeLabelPrefix(queue, label) + CAPACITY, capacity); + } + + public void setMaximumCapacityByLabel(String queue, String label, + float capacity) { + setFloat(getNodeLabelPrefix(queue, label) + MAXIMUM_CAPACITY, capacity); + } + public int getUserLimit(String queue) { int userLimit = getInt(getQueuePrefix(queue) + USER_LIMIT, DEFAULT_USER_LIMIT); @@ -372,6 +400,97 @@ public QueueState getState(String queue) { QueueState.valueOf(state.toUpperCase()) : QueueState.RUNNING; } + public void setAccessibleNodeLabels(String queue, Set labels) { + if (labels == null) { + return; + } + String str = StringUtils.join(",", labels); + set(getQueuePrefix(queue) + ACCESSIBLE_NODE_LABELS, str); + } + + public Set getAccessibleNodeLabels(String queue) { + String labelStr = get(getQueuePrefix(queue) + ACCESSIBLE_NODE_LABELS); + if (labelStr == null) { + return queue.equals(ROOT) ? DynamicNodeLabelsManager.EMPTY_STRING_SET : null; + } else { + Set set = new HashSet(); + for (String str : labelStr.split(",")) { + if (!str.trim().isEmpty()) { + set.add(str.trim()); + } + } + // if labels contains "*", only leave ANY behind + if (set.contains(DynamicNodeLabelsManager.ANY)) { + set.clear(); + set.add(DynamicNodeLabelsManager.ANY); + } + return Collections.unmodifiableSet(set); + } + } + + public Map getNodeLabelCapacities(String queue, + Set labels) { + Map nodeLabelCapacities = new HashMap(); + + if (labels == null) { + return nodeLabelCapacities; + } + + for (String label : labels) { + // capacity of all labels in each queue should be 1 + if (org.apache.commons.lang.StringUtils.equals(ROOT, queue)) { + nodeLabelCapacities.put(label, 1.0f); + continue; + } + float capacity = + getFloat(getNodeLabelPrefix(queue, label) + CAPACITY, UNDEFINED); + if (capacity < MINIMUM_CAPACITY_VALUE + || capacity > MAXIMUM_CAPACITY_VALUE) { + throw new IllegalArgumentException("Illegal " + "capacity of " + + capacity + " for label=" + label + " in queue=" + queue); + } + LOG.debug("CSConf - getCapacityOfLabel: prefix=" + + getNodeLabelPrefix(queue, label) + ", capacity=" + capacity); + + nodeLabelCapacities.put(label, capacity / 100f); + } + return nodeLabelCapacities; + } + + public Map getMaximumNodeLabelCapacities(String queue, + Set labels) { + Map maximumNodeLabelCapacities = new HashMap(); + if (labels == null) { + return maximumNodeLabelCapacities; + } + + for (String label : labels) { + float maxCapacity = + getFloat(getNodeLabelPrefix(queue, label) + MAXIMUM_CAPACITY, + UNDEFINED); + maxCapacity = (maxCapacity == DEFAULT_MAXIMUM_CAPACITY_VALUE) ? + MAXIMUM_CAPACITY_VALUE : maxCapacity; + if (maxCapacity < MINIMUM_CAPACITY_VALUE + || maxCapacity > MAXIMUM_CAPACITY_VALUE) { + throw new IllegalArgumentException("Illegal " + "capacity of " + + maxCapacity + " for label=" + label + " in queue=" + queue); + } + LOG.debug("CSConf - getCapacityOfLabel: prefix=" + + getNodeLabelPrefix(queue, label) + ", capacity=" + maxCapacity); + + maximumNodeLabelCapacities.put(label, maxCapacity / 100f); + } + return maximumNodeLabelCapacities; + } + + public String getDefaultNodeLabelExpression(String queue) { + return get(getQueuePrefix(queue) + DEFAULT_NODE_LABEL_EXPRESSION); + } + + public void setDefaultNodeLabelExpression(String queue, String exp) { + set(getQueuePrefix(queue) + DEFAULT_NODE_LABEL_EXPRESSION, exp); + } + /* * Returns whether we should continue to look at all heart beating nodes even * after the reservation limit was hit. The node heart beating in could diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index f0cff71..163f446 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -24,12 +24,14 @@ import java.util.Collections; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeSet; +import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -52,36 +54,30 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.DynamicNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.Lock; import org.apache.hadoop.yarn.server.utils.Lock.NoLock; -import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Sets; @Private @Unstable -public class LeafQueue implements CSQueue { +public class LeafQueue extends AbstractCSQueue { private static final Log LOG = LogFactory.getLog(LeafQueue.class); - private final String queueName; - private CSQueue parent; - private float capacity; - private float absoluteCapacity; - private float maximumCapacity; - private float absoluteMaxCapacity; private float absoluteUsedCapacity = 0.0f; private int userLimit; private float userLimitFactor; @@ -95,10 +91,6 @@ private int maxActiveApplicationsPerUser; private int nodeLocalityDelay; - - private Resource usedResources = Resources.createResource(0, 0); - private float usedCapacity = 0.0f; - private volatile int numContainers; Set activeApplications; Map applicationAttemptMap = @@ -106,20 +98,9 @@ Set pendingApplications; - private final Resource minimumAllocation; - private final Resource maximumAllocation; private final float minimumAllocationFactor; private Map users = new HashMap(); - - private final QueueMetrics metrics; - - private QueueInfo queueInfo; - - private QueueState state; - - private Map acls = - new HashMap(); private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); @@ -127,27 +108,18 @@ private CapacitySchedulerContext scheduler; private final ActiveUsersManager activeUsersManager; - - private final ResourceCalculator resourceCalculator; + + // cache last cluster resource to compute actual capacity + private Resource lastClusterResource = Resources.none(); private boolean reservationsContinueLooking; public LeafQueue(CapacitySchedulerContext cs, - String queueName, CSQueue parent, CSQueue old) { + String queueName, CSQueue parent, CSQueue old) throws IOException { + super(cs, queueName, parent, old); this.scheduler = cs; - this.queueName = queueName; - this.parent = parent; - - this.resourceCalculator = cs.getResourceCalculator(); - // must be after parent and queueName are initialized - this.metrics = old != null ? old.getMetrics() : - QueueMetrics.forQueue(getQueuePath(), parent, - cs.getConfiguration().getEnableUserMetrics(), - cs.getConf()); this.activeUsersManager = new ActiveUsersManager(metrics); - this.minimumAllocation = cs.getMinimumResourceCapability(); - this.maximumAllocation = cs.getMaximumResourceCapability(); this.minimumAllocationFactor = Resources.ratio(resourceCalculator, Resources.subtract(maximumAllocation, minimumAllocation), @@ -165,7 +137,8 @@ public LeafQueue(CapacitySchedulerContext cs, float userLimitFactor = cs.getConfiguration().getUserLimitFactor(getQueuePath()); - int maxApplications = cs.getConfiguration().getMaximumApplicationsPerQueue(getQueuePath()); + int maxApplications = + cs.getConfiguration().getMaximumApplicationsPerQueue(getQueuePath()); if (maxApplications < 0) { int maxSystemApps = cs.getConfiguration().getMaximumSystemApplications(); maxApplications = (int)(maxSystemApps * absoluteCapacity); @@ -185,12 +158,10 @@ public LeafQueue(CapacitySchedulerContext cs, resourceCalculator, cs.getClusterResource(), this.minimumAllocation, maxAMResourcePerQueuePercent, absoluteCapacity); - int maxActiveApplicationsPerUser = - CSQueueUtils.computeMaxActiveApplicationsPerUser(maxActiveAppsUsingAbsCap, userLimit, - userLimitFactor); + int maxActiveApplicationsPerUser = + CSQueueUtils.computeMaxActiveApplicationsPerUser( + maxActiveAppsUsingAbsCap, userLimit, userLimitFactor); - this.queueInfo = recordFactory.newRecordInstance(QueueInfo.class); - this.queueInfo.setQueueName(queueName); this.queueInfo.setChildQueues(new ArrayList()); QueueState state = cs.getConfiguration().getState(getQueuePath()); @@ -198,14 +169,13 @@ public LeafQueue(CapacitySchedulerContext cs, Map acls = cs.getConfiguration().getAcls(getQueuePath()); - setupQueueConfigs( - cs.getClusterResource(), - capacity, absoluteCapacity, - maximumCapacity, absoluteMaxCapacity, - userLimit, userLimitFactor, + setupQueueConfigs(cs.getClusterResource(), capacity, absoluteCapacity, + maximumCapacity, absoluteMaxCapacity, userLimit, userLimitFactor, maxApplications, maxAMResourcePerQueuePercent, maxApplicationsPerUser, - maxActiveApplications, maxActiveApplicationsPerUser, state, acls, - cs.getConfiguration().getNodeLocalityDelay(), + maxActiveApplications, maxActiveApplicationsPerUser, state, acls, cs + .getConfiguration().getNodeLocalityDelay(), labels, + defaultLabelExpression, this.nodeLabelCapacities, + this.maximumNodeLabelCapacities, cs.getConfiguration().getReservationContinueLook()); if(LOG.isDebugEnabled()) { @@ -219,7 +189,7 @@ public LeafQueue(CapacitySchedulerContext cs, new TreeSet(applicationComparator); this.activeApplications = new TreeSet(applicationComparator); } - + // externalizing in method, to allow overriding protected float getCapacityFromConf() { return (float)scheduler.getConfiguration().getCapacity(getQueuePath()) / 100; @@ -234,19 +204,22 @@ protected synchronized void setupQueueConfigs( int maxApplicationsPerUser, int maxActiveApplications, int maxActiveApplicationsPerUser, QueueState state, Map acls, int nodeLocalityDelay, - boolean continueLooking) - { + Set labels, String defaultLabelExpression, + Map capacitieByLabel, + Map maximumCapacitiesByLabel, boolean continueLooking) + throws IOException { + super.setupQueueConfigs(clusterResource, capacity, absoluteCapacity, + maximumCapacity, absoluteMaxCapacity, state, acls, labels, + defaultLabelExpression, capacitieByLabel, maximumCapacitiesByLabel, + continueLooking); // Sanity check CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity); float absCapacity = getParent().getAbsoluteCapacity() * capacity; - CSQueueUtils.checkAbsoluteCapacities(getQueueName(), absCapacity, absoluteMaxCapacity); + CSQueueUtils.checkAbsoluteCapacity(getQueueName(), absCapacity, + absoluteMaxCapacity); - this.capacity = capacity; this.absoluteCapacity = absCapacity; - this.maximumCapacity = maximumCapacity; - this.absoluteMaxCapacity = absoluteMaxCapacity; - this.userLimit = userLimit; this.userLimitFactor = userLimitFactor; @@ -256,14 +229,20 @@ protected synchronized void setupQueueConfigs( this.maxActiveApplications = maxActiveApplications; this.maxActiveApplicationsPerUser = maxActiveApplicationsPerUser; - - this.state = state; - this.acls = acls; - - this.queueInfo.setCapacity(this.capacity); - this.queueInfo.setMaximumCapacity(this.maximumCapacity); - this.queueInfo.setQueueState(this.state); + if (!SchedulerUtils.checkQueueLabelExpression(this.labels, + this.defaultLabelExpression)) { + throw new IOException("Invalid default label expression of " + + " queue=" + + queueInfo.getQueueName() + + " doesn't have permission to access all labels " + + "in default label expression. labelExpression of resource request=" + + (this.defaultLabelExpression == null ? "" + : this.defaultLabelExpression) + + ". Queue labels=" + + (queueInfo.getNodeLabels() == null ? "" : StringUtils.join(queueInfo + .getNodeLabels().iterator(), ','))); + } this.nodeLocalityDelay = nodeLocalityDelay; this.reservationsContinueLooking = continueLooking; @@ -272,11 +251,14 @@ protected synchronized void setupQueueConfigs( for (Map.Entry e : acls.entrySet()) { aclsString.append(e.getKey() + ":" + e.getValue().getAclString()); } - - // Update metrics - CSQueueUtils.updateQueueStatistics( - resourceCalculator, this, getParent(), clusterResource, - minimumAllocation); + + StringBuilder labelStrBuilder = new StringBuilder(); + if (labels != null) { + for (String s : labels) { + labelStrBuilder.append(s); + labelStrBuilder.append(","); + } + } LOG.info("Initializing " + queueName + "\n" + "capacity = " + capacity + @@ -331,50 +313,12 @@ protected synchronized void setupQueueConfigs( " [= configuredState ]" + "\n" + "acls = " + aclsString + " [= configuredAcls ]" + "\n" + + "nodeLocalityDelay = " + nodeLocalityDelay + "\n" + + "labels=" + labelStrBuilder.toString() + "\n" + "nodeLocalityDelay = " + nodeLocalityDelay + "\n" + "reservationsContinueLooking = " + reservationsContinueLooking + "\n"); } - - @Override - public synchronized float getCapacity() { - return capacity; - } - - @Override - public synchronized float getAbsoluteCapacity() { - return absoluteCapacity; - } - - @Override - public synchronized float getMaximumCapacity() { - return maximumCapacity; - } - - @Override - public synchronized float getAbsoluteMaximumCapacity() { - return absoluteMaxCapacity; - } - - @Override - public synchronized float getAbsoluteUsedCapacity() { - return absoluteUsedCapacity; - } - - @Override - public synchronized CSQueue getParent() { - return parent; - } - - @Override - public synchronized void setParent(CSQueue newParentQueue) { - this.parent = (ParentQueue)newParentQueue; - } - - @Override - public String getQueueName() { - return queueName; - } @Override public String getQueuePath() { @@ -385,22 +329,6 @@ public String getQueuePath() { * Used only by tests. */ @Private - public Resource getMinimumAllocation() { - return minimumAllocation; - } - - /** - * Used only by tests. - */ - @Private - public Resource getMaximumAllocation() { - return maximumAllocation; - } - - /** - * Used only by tests. - */ - @Private public float getMinimumAllocationFactor() { return minimumAllocationFactor; } @@ -435,45 +363,9 @@ public ActiveUsersManager getActiveUsersManager() { } @Override - public synchronized float getUsedCapacity() { - return usedCapacity; - } - - @Override - public synchronized Resource getUsedResources() { - return usedResources; - } - - @Override public List getChildQueues() { return null; } - - @Override - public synchronized void setUsedCapacity(float usedCapacity) { - this.usedCapacity = usedCapacity; - } - - @Override - public synchronized void setAbsoluteUsedCapacity(float absUsedCapacity) { - this.absoluteUsedCapacity = absUsedCapacity; - } - - /** - * Set maximum capacity - used only for testing. - * @param maximumCapacity new max capacity - */ - synchronized void setMaxCapacity(float maximumCapacity) { - // Sanity check - CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity); - float absMaxCapacity = - CSQueueUtils.computeAbsoluteMaximumCapacity( - maximumCapacity, getParent()); - CSQueueUtils.checkAbsoluteCapacities(getQueueName(), absoluteCapacity, absMaxCapacity); - - this.maximumCapacity = maximumCapacity; - this.absoluteMaxCapacity = absMaxCapacity; - } /** * Set user limit - used only for testing. @@ -567,11 +459,6 @@ public int getNodeLocalityDelay() { return nodeLocalityDelay; } - @Private - boolean getReservationContinueLooking() { - return reservationsContinueLooking; - } - public String toString() { return queueName + ": " + "capacity=" + capacity + ", " + @@ -582,6 +469,11 @@ public String toString() { "numApps=" + getNumApplications() + ", " + "numContainers=" + getNumContainers(); } + + @VisibleForTesting + public synchronized void setNodeLabelManager(DynamicNodeLabelsManager mgr) { + this.labelManager = mgr; + } @VisibleForTesting public synchronized User getUser(String userName) { @@ -631,6 +523,10 @@ public synchronized void reinitialize( newlyParsedLeafQueue.getMaximumActiveApplicationsPerUser(), newlyParsedLeafQueue.state, newlyParsedLeafQueue.acls, newlyParsedLeafQueue.getNodeLocalityDelay(), + newlyParsedLeafQueue.labels, + newlyParsedLeafQueue.defaultLabelExpression, + newlyParsedLeafQueue.nodeLabelCapacities, + newlyParsedLeafQueue.maximumNodeLabelCapacities, newlyParsedLeafQueue.reservationsContinueLooking); // queue metrics are updated, more resource may be available @@ -639,19 +535,6 @@ public synchronized void reinitialize( } @Override - public boolean hasAccess(QueueACL acl, UserGroupInformation user) { - // Check if the leaf-queue allows access - synchronized (this) { - if (acls.get(acl).isUserAllowed(user)) { - return true; - } - } - - // Check if parent-queue allows access - return getParent().hasAccess(acl, user); - } - - @Override public void submitApplicationAttempt(FiCaSchedulerApp application, String userName) { // Careful! Locking order is important! @@ -747,7 +630,8 @@ private synchronized void activateApplications() { } } - private synchronized void addApplicationAttempt(FiCaSchedulerApp application, User user) { + private synchronized void addApplicationAttempt(FiCaSchedulerApp application, + User user) { // Accept user.submitApplication(); pendingApplications.add(application); @@ -783,7 +667,8 @@ public void finishApplicationAttempt(FiCaSchedulerApp application, String queue) getParent().finishApplicationAttempt(application, queue); } - public synchronized void removeApplicationAttempt(FiCaSchedulerApp application, User user) { + public synchronized void removeApplicationAttempt( + FiCaSchedulerApp application, User user) { boolean wasActive = activeApplications.remove(application); if (!wasActive) { pendingApplications.remove(application); @@ -828,6 +713,12 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, + " #applications=" + activeApplications.size()); } + // if our queue cannot access this node, just return + if (!SchedulerUtils.checkQueueAccessToNode(labels, + labelManager.getLabelsOnNode(node.getNodeID()))) { + return NULL_ASSIGNMENT; + } + // Check for reserved resources RMContainer reservedContainer = node.getReservedContainer(); if (reservedContainer != null) { @@ -888,7 +779,8 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, required); // Check queue max-capacity limit - if (!assignToQueue(clusterResource, required, application, true)) { + if (!assignToQueue(clusterResource, required, + labelManager.getLabelsOnNode(node.getNodeID()), application, true)) { return NULL_ASSIGNMENT; } @@ -920,7 +812,8 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, // Book-keeping // Note: Update headroom to account for current allocation too... - allocateResource(clusterResource, application, assigned); + allocateResource(clusterResource, application, assigned, + labelManager.getLabelsOnNode(node.getNodeID())); // Don't reset scheduling opportunities for non-local assignments // otherwise the app will be delayed for each non-local assignment. @@ -971,27 +864,43 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, return new CSAssignment(Resources.none(), NodeType.NODE_LOCAL); } - - @Private - protected synchronized boolean assignToQueue(Resource clusterResource, - Resource required, FiCaSchedulerApp application, + synchronized boolean assignToQueue(Resource clusterResource, + Resource required, Set nodeLabels, FiCaSchedulerApp application, boolean checkReservations) { - - Resource potentialTotalResource = Resources.add(usedResources, required); - // Check how of the cluster's absolute capacity we are currently using... - float potentialNewCapacity = Resources.divide(resourceCalculator, - clusterResource, potentialTotalResource, clusterResource); - if (potentialNewCapacity > absoluteMaxCapacity) { + // Get label of this queue can access, it's (nodeLabel AND queueLabel) + Set labelCanAccess; + if (null == nodeLabels || nodeLabels.isEmpty()) { + labelCanAccess = new HashSet(); + // Any queue can always access any node without label + labelCanAccess.add(DynamicNodeLabelsManager.NO_LABEL); + } else { + labelCanAccess = new HashSet(Sets.intersection(labels, nodeLabels)); + } + + boolean canAssign = false; + for (String label : labelCanAccess) { + if (!usedResourcesByLabels.containsKey(label)) { + usedResourcesByLabels.put(label, Resources.createResource(0)); + } + + Resource potentialTotalCapacity = + Resources.add(usedResourcesByLabels.get(label), required); + + float potenialNewCapacity = + Resources.divide(resourceCalculator, clusterResource, + potentialTotalCapacity, + labelManager.getResourceByLabel(label, clusterResource)); // if enabled, check to see if could we potentially use this node instead // of a reserved node if the application has reserved containers - if (this.reservationsContinueLooking && checkReservations) { - + // TODO, now only consider reservation cases when the node has no label + if (this.reservationsContinueLooking && checkReservations + && label.equals(DynamicNodeLabelsManager.NO_LABEL)) { float potentialNewWithoutReservedCapacity = Resources.divide( resourceCalculator, clusterResource, - Resources.subtract(potentialTotalResource, - application.getCurrentReservation()), - clusterResource); + Resources.subtract(potentialTotalCapacity, + application.getCurrentReservation()), + labelManager.getResourceByLabel(label, clusterResource)); if (potentialNewWithoutReservedCapacity <= absoluteMaxCapacity) { if (LOG.isDebugEnabled()) { @@ -1013,33 +922,41 @@ protected synchronized boolean assignToQueue(Resource clusterResource, // we could potentially use this node instead of reserved node return true; } - } + + // otherwise, if any of the label doesn't beyond limit, we can allocate on this node + if (potenialNewCapacity <= getAbsoluteMaximumCapacityByNodeLabel(label)) { + canAssign = true; + break; + } + if (LOG.isDebugEnabled()) { LOG.debug(getQueueName() - + " usedResources: " + usedResources + + "Check assign to queue, label=" + label + + " usedResources: " + usedResourcesByLabels.get(label) + " clusterResources: " + clusterResource + " currentCapacity " + Resources.divide(resourceCalculator, clusterResource, - usedResources, clusterResource) + " required " + required - + " potentialNewCapacity: " + potentialNewCapacity + " ( " + usedResourcesByLabels.get(label), + labelManager.getResourceByLabel(label, clusterResource)) + + " potentialNewCapacity: " + potenialNewCapacity + " ( " + " max-capacity: " + absoluteMaxCapacity + ")"); } - return false; } - return true; + + return canAssign; } @Lock({LeafQueue.class, FiCaSchedulerApp.class}) private Resource computeUserLimitAndSetHeadroom( - FiCaSchedulerApp application, Resource clusterResource, Resource required) { - + FiCaSchedulerApp application, Resource clusterResource, Resource required) { String user = application.getUser(); - /** - * Headroom is min((userLimit, queue-max-cap) - consumed) + /** + * Headroom = min(userLimit, queue-max-cap, max-capacity-consider-label) - + * consumed */ Resource userLimit = // User limit @@ -1058,11 +975,21 @@ private Resource computeUserLimitAndSetHeadroom( absoluteMaxAvailCapacity, minimumAllocation); - Resource userConsumed = getUser(user).getConsumedResources(); - Resource headroom = + // Max possible capacity this queue can access, will consider label only. + Resource maxCapacityConsiderLabel = + labelManager == null ? clusterResource : labelManager.getQueueResource( + queueName, labels, clusterResource); + maxCapacityConsiderLabel = + Resources.roundDown(resourceCalculator, maxCapacityConsiderLabel, + minimumAllocation); + Resource userConsumed = getUser(user).getConsumedResources(); + + Resource headroom = Resources.subtract( - Resources.min(resourceCalculator, clusterResource, - userLimit, queueMaxCap), + Resources.min(resourceCalculator, clusterResource, + Resources.min(resourceCalculator, clusterResource, userLimit, + queueMaxCap), + maxCapacityConsiderLabel), userConsumed); if (LOG.isDebugEnabled()) { @@ -1191,7 +1118,8 @@ protected synchronized boolean assignToUser(Resource clusterResource, return true; } - boolean needContainers(FiCaSchedulerApp application, Priority priority, Resource required) { + boolean needContainers(FiCaSchedulerApp application, Priority priority, + Resource required) { int requiredContainers = application.getTotalRequiredResources(priority); int reservedContainers = application.getNumReservedContainers(priority); int starvation = 0; @@ -1221,10 +1149,9 @@ boolean needContainers(FiCaSchedulerApp application, Priority priority, Resource return (((starvation + requiredContainers) - reservedContainers) > 0); } - private CSAssignment assignContainersOnNode(Resource clusterResource, - FiCaSchedulerNode node, FiCaSchedulerApp application, - Priority priority, RMContainer reservedContainer, boolean needToUnreserve) { - + private CSAssignment assignContainersOnNode(Resource clusterResource, + FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, + RMContainer reservedContainer, boolean needToUnreserve) { Resource assigned = Resources.none(); // Data-local @@ -1331,8 +1258,9 @@ protected boolean checkLimitsToReserve(Resource clusterResource, Resource userLimit = computeUserLimitAndSetHeadroom(application, clusterResource, capability); - // Check queue max-capacity limit - if (!assignToQueue(clusterResource, capability, application, false)) { + // Check queue max-capacity limit, + // TODO: Consider reservation on labels + if (!assignToQueue(clusterResource, capability, null, application, false)) { if (LOG.isDebugEnabled()) { LOG.debug("was going to reserve but hit queue limit"); } @@ -1479,6 +1407,20 @@ private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode nod + " request=" + request + " type=" + type + " needToUnreserve= " + needToUnreserve); } + + // check if the resource request can access the label + if (!SchedulerUtils.checkNodeLabelExpression( + labelManager.getLabelsOnNode(node.getNodeID()), + request.getNodeLabelExpression())) { + // this is a reserved container, but we cannot allocate it now according + // to label not match. This can be caused by node label changed + // We should un-reserve this container. + if (rmContainer != null) { + unreserve(application, priority, node, rmContainer); + } + return Resources.none(); + } + Resource capability = request.getCapability(); Resource available = node.getAvailableResource(); Resource totalResource = node.getTotalResource(); @@ -1658,8 +1600,9 @@ public void completedContainer(Resource clusterResource, // Book-keeping if (removed) { - releaseResource(clusterResource, - application, container.getResource()); + releaseResource(clusterResource, application, + container.getResource(), + labelManager.getLabelsOnNode(node.getNodeID())); LOG.info("completedContainer" + " container=" + container + " queue=" + this + @@ -1675,14 +1618,11 @@ public void completedContainer(Resource clusterResource, } } - synchronized void allocateResource(Resource clusterResource, - SchedulerApplicationAttempt application, Resource resource) { - // Update queue metrics - Resources.addTo(usedResources, resource); - CSQueueUtils.updateQueueStatistics( - resourceCalculator, this, getParent(), clusterResource, minimumAllocation); - ++numContainers; - + synchronized void allocateResource(Resource clusterResource, + SchedulerApplicationAttempt application, Resource resource, + Set nodeLabels) { + super.allocateResource(clusterResource, resource, nodeLabels); + // Update user metrics String userName = application.getUser(); User user = getUser(userName); @@ -1703,14 +1643,9 @@ synchronized void allocateResource(Resource clusterResource, } synchronized void releaseResource(Resource clusterResource, - FiCaSchedulerApp application, Resource resource) { - // Update queue metrics - Resources.subtractFrom(usedResources, resource); - CSQueueUtils.updateQueueStatistics( - resourceCalculator, this, getParent(), clusterResource, - minimumAllocation); - --numContainers; - + FiCaSchedulerApp application, Resource resource, Set nodeLabels) { + super.releaseResource(clusterResource, resource, nodeLabels); + // Update user metrics String userName = application.getUser(); User user = getUser(userName); @@ -1724,6 +1659,8 @@ synchronized void releaseResource(Resource clusterResource, @Override public synchronized void updateClusterResource(Resource clusterResource) { + lastClusterResource = clusterResource; + // Update queue properties maxActiveApplications = CSQueueUtils.computeMaxActiveApplications( @@ -1756,11 +1693,6 @@ public synchronized void updateClusterResource(Resource clusterResource) { } } } - - @Override - public QueueMetrics getMetrics() { - return metrics; - } @VisibleForTesting public static class User { @@ -1820,7 +1752,8 @@ public void recoverContainer(Resource clusterResource, // Careful! Locking order is important! synchronized (this) { allocateResource(clusterResource, attempt, rmContainer.getContainer() - .getResource()); + .getResource(), labelManager.getLabelsOnNode(rmContainer + .getContainer().getNodeId())); } getParent().recoverContainer(clusterResource, attempt, rmContainer); } @@ -1858,7 +1791,8 @@ public void attachContainer(Resource clusterResource, FiCaSchedulerApp application, RMContainer rmContainer) { if (application != null) { allocateResource(clusterResource, application, rmContainer.getContainer() - .getResource()); + .getResource(), labelManager.getLabelsOnNode(rmContainer + .getContainer().getNodeId())); LOG.info("movedContainer" + " container=" + rmContainer.getContainer() + " resource=" + rmContainer.getContainer().getResource() + " queueMoveIn=" + this + " usedCapacity=" + getUsedCapacity() @@ -1874,7 +1808,8 @@ public void detachContainer(Resource clusterResource, FiCaSchedulerApp application, RMContainer rmContainer) { if (application != null) { releaseResource(clusterResource, application, rmContainer.getContainer() - .getResource()); + .getResource(), labelManager.getLabelsOnNode(rmContainer.getContainer() + .getNodeId())); LOG.info("movedContainer" + " container=" + rmContainer.getContainer() + " resource=" + rmContainer.getContainer().getResource() + " queueMoveOut=" + this + " usedCapacity=" + getUsedCapacity() @@ -1885,6 +1820,24 @@ public void detachContainer(Resource clusterResource, } } + @Override + public float getAbsActualCapacity() { + if (Resources.lessThanOrEqual(resourceCalculator, lastClusterResource, + lastClusterResource, Resources.none())) { + return absoluteCapacity; + } + + Resource resourceRespectLabels = + labelManager == null ? lastClusterResource : labelManager + .getQueueResource(queueName, labels, lastClusterResource); + float absActualCapacity = + Resources.divide(resourceCalculator, lastClusterResource, + resourceRespectLabels, lastClusterResource); + + return absActualCapacity > absoluteCapacity ? absoluteCapacity + : absActualCapacity; + } + public void setCapacity(float capacity) { this.capacity = capacity; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index 011c99c..e3ed68b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -21,8 +21,8 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; -import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -46,77 +46,37 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.nodelabels.NodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.DynamicNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; -import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; +import com.google.common.collect.Sets; + @Private @Evolving -public class ParentQueue implements CSQueue { +public class ParentQueue extends AbstractCSQueue { private static final Log LOG = LogFactory.getLog(ParentQueue.class); - private CSQueue parent; - private final String queueName; - - private float capacity; - private float maximumCapacity; - private float absoluteCapacity; - private float absoluteMaxCapacity; - private float absoluteUsedCapacity = 0.0f; - - private float usedCapacity = 0.0f; - - protected final Set childQueues; - private final Comparator queueComparator; - - private Resource usedResources = Resources.createResource(0, 0); - + protected final Set childQueues; private final boolean rootQueue; - - private final Resource minimumAllocation; - - private volatile int numApplications; - private volatile int numContainers; - - private QueueState state; - - private final QueueMetrics metrics; - - private QueueInfo queueInfo; - - private Map acls = - new HashMap(); private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); - private final ResourceCalculator resourceCalculator; - - private boolean reservationsContinueLooking; - public ParentQueue(CapacitySchedulerContext cs, - String queueName, CSQueue parent, CSQueue old) { - minimumAllocation = cs.getMinimumResourceCapability(); - - this.parent = parent; - this.queueName = queueName; - this.rootQueue = (parent == null); - this.resourceCalculator = cs.getResourceCalculator(); + String queueName, CSQueue parent, CSQueue old) throws IOException { + super(cs, queueName, parent, old); - // must be called after parent and queueName is set - this.metrics = old != null ? old.getMetrics() : - QueueMetrics.forQueue(getQueuePath(), parent, - cs.getConfiguration().getEnableUserMetrics(), - cs.getConf()); + this.rootQueue = (parent == null); float rawCapacity = cs.getConfiguration().getCapacity(getQueuePath()); @@ -141,17 +101,14 @@ public ParentQueue(CapacitySchedulerContext cs, Map acls = cs.getConfiguration().getAcls(getQueuePath()); - - this.queueInfo = recordFactory.newRecordInstance(QueueInfo.class); - this.queueInfo.setQueueName(queueName); + this.queueInfo.setChildQueues(new ArrayList()); - setupQueueConfigs(cs.getClusterResource(), - capacity, absoluteCapacity, - maximumCapacity, absoluteMaxCapacity, state, acls, + setupQueueConfigs(cs.getClusterResource(), capacity, absoluteCapacity, + maximumCapacity, absoluteMaxCapacity, state, acls, labels, + defaultLabelExpression, nodeLabelCapacities, maximumNodeLabelCapacities, cs.getConfiguration().getReservationContinueLook()); - this.queueComparator = cs.getQueueComparator(); this.childQueues = new TreeSet(queueComparator); LOG.info("Initialized parent-queue " + queueName + @@ -159,41 +116,29 @@ public ParentQueue(CapacitySchedulerContext cs, ", fullname=" + getQueuePath()); } - protected synchronized void setupQueueConfigs( - Resource clusterResource, - float capacity, float absoluteCapacity, - float maximumCapacity, float absoluteMaxCapacity, + synchronized void setupQueueConfigs(Resource clusterResource, float capacity, + float absoluteCapacity, float maximumCapacity, float absoluteMaxCapacity, QueueState state, Map acls, - boolean continueLooking - ) { - // Sanity check - CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity); - CSQueueUtils.checkAbsoluteCapacities(getQueueName(), absoluteCapacity, absoluteMaxCapacity); - - this.capacity = capacity; - this.absoluteCapacity = absoluteCapacity; - - this.maximumCapacity = maximumCapacity; - this.absoluteMaxCapacity = absoluteMaxCapacity; - - this.state = state; - - this.acls = acls; - - this.queueInfo.setCapacity(this.capacity); - this.queueInfo.setMaximumCapacity(this.maximumCapacity); - this.queueInfo.setQueueState(this.state); - - this.reservationsContinueLooking = continueLooking; - - StringBuilder aclsString = new StringBuilder(); + Set labels, String defaultLabelExpression, + Map nodeLabelCapacities, + Map maximumCapacitiesByLabel, boolean continueLooking) + throws IOException { + super.setupQueueConfigs(clusterResource, capacity, absoluteCapacity, + maximumCapacity, absoluteMaxCapacity, state, acls, labels, + defaultLabelExpression, nodeLabelCapacities, maximumCapacitiesByLabel, + continueLooking); + StringBuilder aclsString = new StringBuilder(); for (Map.Entry e : acls.entrySet()) { aclsString.append(e.getKey() + ":" + e.getValue().getAclString()); } - // Update metrics - CSQueueUtils.updateQueueStatistics( - resourceCalculator, this, parent, clusterResource, minimumAllocation); + StringBuilder labelStrBuilder = new StringBuilder(); + if (labels != null) { + for (String s : labels) { + labelStrBuilder.append(s); + labelStrBuilder.append(","); + } + } LOG.info(queueName + ", capacity=" + capacity + @@ -201,13 +146,13 @@ protected synchronized void setupQueueConfigs( ", maxCapacity=" + maximumCapacity + ", asboluteMaxCapacity=" + absoluteMaxCapacity + ", state=" + state + - ", acls=" + aclsString + + ", acls=" + aclsString + + ", labels=" + labelStrBuilder.toString() + "\n" + ", reservationsContinueLooking=" + reservationsContinueLooking); } private static float PRECISION = 0.0005f; // 0.05% precision void setChildQueues(Collection childQueues) { - // Validate float childCapacities = 0; for (CSQueue queue : childQueues) { @@ -221,6 +166,21 @@ void setChildQueues(Collection childQueues) { " capacity of " + childCapacities + " for children of queue " + queueName); } + // check label capacities + for (String nodeLabel : labelManager.getClusterNodeLabels()) { + float capacityByLabel = getCapacityByNodeLabel(nodeLabel); + // check children's labels + float sum = 0; + for (CSQueue queue : childQueues) { + sum += queue.getCapacityByNodeLabel(nodeLabel); + } + if ((capacityByLabel > 0 && Math.abs(1.0f - sum) > PRECISION) + || (capacityByLabel == 0) && (sum > 0)) { + throw new IllegalArgumentException("Illegal" + " capacity of " + + sum + " for children of queue " + queueName + + " for label=" + nodeLabel); + } + } this.childQueues.clear(); this.childQueues.addAll(childQueues); @@ -228,21 +188,6 @@ void setChildQueues(Collection childQueues) { LOG.debug("setChildQueues: " + getChildQueuesToPrint()); } } - - @Override - public synchronized CSQueue getParent() { - return parent; - } - - @Override - public synchronized void setParent(CSQueue newParentQueue) { - this.parent = (ParentQueue)newParentQueue; - } - - @Override - public String getQueueName() { - return queueName; - } @Override public String getQueuePath() { @@ -251,65 +196,6 @@ public String getQueuePath() { } @Override - public synchronized float getCapacity() { - return capacity; - } - - @Override - public synchronized float getAbsoluteCapacity() { - return absoluteCapacity; - } - - @Override - public float getAbsoluteMaximumCapacity() { - return absoluteMaxCapacity; - } - - @Override - public synchronized float getAbsoluteUsedCapacity() { - return absoluteUsedCapacity; - } - - @Override - public float getMaximumCapacity() { - return maximumCapacity; - } - - @Override - public ActiveUsersManager getActiveUsersManager() { - // Should never be called since all applications are submitted to LeafQueues - return null; - } - - @Override - public synchronized float getUsedCapacity() { - return usedCapacity; - } - - @Override - public synchronized Resource getUsedResources() { - return usedResources; - } - - @Override - public synchronized List getChildQueues() { - return new ArrayList(childQueues); - } - - public synchronized int getNumContainers() { - return numContainers; - } - - public synchronized int getNumApplications() { - return numApplications; - } - - @Override - public synchronized QueueState getState() { - return state; - } - - @Override public synchronized QueueInfo getQueueInfo( boolean includeChildQueues, boolean recursive) { queueInfo.setCurrentCapacity(usedCapacity); @@ -391,6 +277,10 @@ public synchronized void reinitialize( newlyParsedParentQueue.absoluteMaxCapacity, newlyParsedParentQueue.state, newlyParsedParentQueue.acls, + newlyParsedParentQueue.labels, + newlyParsedParentQueue.defaultLabelExpression, + newlyParsedParentQueue.nodeLabelCapacities, + newlyParsedParentQueue.maximumNodeLabelCapacities, newlyParsedParentQueue.reservationsContinueLooking); // Re-configure existing child queues and add new ones @@ -434,21 +324,6 @@ public synchronized void reinitialize( } return queuesMap; } - - @Override - public boolean hasAccess(QueueACL acl, UserGroupInformation user) { - synchronized (this) { - if (acls.get(acl).isUserAllowed(user)) { - return true; - } - } - - if (parent != null) { - return parent.hasAccess(acl, user); - } - - return false; - } @Override public void submitApplication(ApplicationId applicationId, String user, @@ -532,30 +407,6 @@ public synchronized void removeApplication(ApplicationId applicationId, " leaf-queue of parent: " + getQueueName() + " #applications: " + getNumApplications()); } - - @Override - public synchronized void setUsedCapacity(float usedCapacity) { - this.usedCapacity = usedCapacity; - } - - @Override - public synchronized void setAbsoluteUsedCapacity(float absUsedCapacity) { - this.absoluteUsedCapacity = absUsedCapacity; - } - - /** - * Set maximum capacity - used only for testing. - * @param maximumCapacity new max capacity - */ - synchronized void setMaxCapacity(float maximumCapacity) { - // Sanity check - CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity); - float absMaxCapacity = CSQueueUtils.computeAbsoluteMaximumCapacity(maximumCapacity, parent); - CSQueueUtils.checkAbsoluteCapacities(getQueueName(), absoluteCapacity, absMaxCapacity); - - this.maximumCapacity = maximumCapacity; - this.absoluteMaxCapacity = absMaxCapacity; - } @Override public synchronized CSAssignment assignContainers( @@ -571,7 +422,8 @@ public synchronized CSAssignment assignContainers( boolean localNeedToUnreserve = false; // Are we over maximum-capacity for this queue? - if (!assignToQueue(clusterResource)) { + if (!assignToQueue(clusterResource, + labelManager.getLabelsOnNode(node.getNodeID()))) { // check to see if we could if we unreserve first localNeedToUnreserve = assignToQueueIfUnreserve(clusterResource); if (!localNeedToUnreserve) { @@ -589,7 +441,8 @@ public synchronized CSAssignment assignContainers( resourceCalculator, clusterResource, assignedToChild.getResource(), Resources.none())) { // Track resource utilization for the parent-queue - allocateResource(clusterResource, assignedToChild.getResource()); + allocateResource(clusterResource, assignedToChild.getResource(), + labelManager.getLabelsOnNode(node.getNodeID())); // Track resource utilization in this pass of the scheduler Resources.addTo(assignment.getResource(), assignedToChild.getResource()); @@ -628,22 +481,38 @@ public synchronized CSAssignment assignContainers( return assignment; } - private synchronized boolean assignToQueue(Resource clusterResource) { - // Check how of the cluster's absolute capacity we are currently using... - float currentCapacity = - Resources.divide( - resourceCalculator, clusterResource, - usedResources, clusterResource); + private synchronized boolean assignToQueue(Resource clusterResource, + Set nodeLabels) { + Set labelCanAccess = + new HashSet(labels.contains(NodeLabelsManager.ANY) ? nodeLabels + : Sets.intersection(labels, nodeLabels)); + if (nodeLabels.isEmpty()) { + // Any queue can always access any node without label + labelCanAccess.add(DynamicNodeLabelsManager.NO_LABEL); + } - if (currentCapacity >= absoluteMaxCapacity) { - LOG.info(getQueueName() + - " used=" + usedResources + - " current-capacity (" + currentCapacity + ") " + - " >= max-capacity (" + absoluteMaxCapacity + ")"); - return false; + boolean canAssign = false; + for (String label : labelCanAccess) { + if (!usedResourcesByLabels.containsKey(label)) { + usedResourcesByLabels.put(label, Resources.createResource(0)); + } + float currentLabelUsedCapacity = + Resources.divide(resourceCalculator, clusterResource, + usedResourcesByLabels.get(label), + labelManager.getResourceByLabel(label, clusterResource)); + // if any of the label doesn't beyond limit, we can allocate on this node + if (currentLabelUsedCapacity >= getAbsoluteMaximumCapacityByNodeLabel(label)) { + if (LOG.isDebugEnabled()) { + LOG.debug(getQueueName() + " used=" + usedResources + + " current-capacity (" + usedResourcesByLabels.get(label) + ") " + + " >= max-capacity (" + labelManager.getResourceByLabel(label, clusterResource) + ")"); + } + } else { + canAssign = true; + } } - return true; - + + return canAssign; } @@ -749,8 +618,8 @@ public void completedContainer(Resource clusterResource, // Careful! Locking order is important! // Book keeping synchronized (this) { - releaseResource(clusterResource, - rmContainer.getContainer().getResource()); + releaseResource(clusterResource, rmContainer.getContainer() + .getResource(), labelManager.getLabelsOnNode(node.getNodeID())); LOG.info("completedContainer" + " queue=" + getQueueName() + @@ -787,27 +656,6 @@ public void completedContainer(Resource clusterResource, } } - @Private - boolean getReservationContinueLooking() { - return reservationsContinueLooking; - } - - synchronized void allocateResource(Resource clusterResource, - Resource resource) { - Resources.addTo(usedResources, resource); - CSQueueUtils.updateQueueStatistics( - resourceCalculator, this, parent, clusterResource, minimumAllocation); - ++numContainers; - } - - synchronized void releaseResource(Resource clusterResource, - Resource resource) { - Resources.subtractFrom(usedResources, resource); - CSQueueUtils.updateQueueStatistics( - resourceCalculator, this, parent, clusterResource, minimumAllocation); - --numContainers; - } - @Override public synchronized void updateClusterResource(Resource clusterResource) { // Update all children @@ -821,10 +669,9 @@ public synchronized void updateClusterResource(Resource clusterResource) { } @Override - public QueueMetrics getMetrics() { - return metrics; + public synchronized List getChildQueues() { + return new ArrayList(childQueues); } - @Override public void recoverContainer(Resource clusterResource, @@ -834,12 +681,20 @@ public void recoverContainer(Resource clusterResource, } // Careful! Locking order is important! synchronized (this) { - allocateResource(clusterResource,rmContainer.getContainer().getResource()); + allocateResource(clusterResource, rmContainer.getContainer() + .getResource(), labelManager.getLabelsOnNode(rmContainer + .getContainer().getNodeId())); } if (parent != null) { parent.recoverContainer(clusterResource, attempt, rmContainer); } } + + @Override + public ActiveUsersManager getActiveUsersManager() { + // Should never be called since all applications are submitted to LeafQueues + return null; + } @Override public void collectSchedulerApplications( @@ -854,7 +709,8 @@ public void attachContainer(Resource clusterResource, FiCaSchedulerApp application, RMContainer rmContainer) { if (application != null) { allocateResource(clusterResource, rmContainer.getContainer() - .getResource()); + .getResource(), labelManager.getLabelsOnNode(rmContainer + .getContainer().getNodeId())); LOG.info("movedContainer" + " queueMoveIn=" + getQueueName() + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " used=" + usedResources + " cluster=" @@ -870,7 +726,9 @@ public void attachContainer(Resource clusterResource, public void detachContainer(Resource clusterResource, FiCaSchedulerApp application, RMContainer rmContainer) { if (application != null) { - releaseResource(clusterResource, rmContainer.getContainer().getResource()); + releaseResource(clusterResource, + rmContainer.getContainer().getResource(), + labelManager.getLabelsOnNode(rmContainer.getContainer().getNodeId())); LOG.info("movedContainer" + " queueMoveOut=" + getQueueName() + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " used=" + usedResources + " cluster=" @@ -882,6 +740,13 @@ public void detachContainer(Resource clusterResource, } } + @Override + public float getAbsActualCapacity() { + // for now, simply return actual capacity = guaranteed capacity for parent + // queue + return absoluteCapacity; + } + public Map getACLs() { return acls; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java index b87744d..d0251a3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java @@ -19,6 +19,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import org.apache.hadoop.yarn.api.records.Resource; @@ -47,7 +49,7 @@ private boolean showReservationsAsQueues; public PlanQueue(CapacitySchedulerContext cs, String queueName, - CSQueue parent, CSQueue old) { + CSQueue parent, CSQueue old) throws IOException { super(cs, queueName, parent, old); this.schedulerContext = cs; @@ -99,11 +101,14 @@ public synchronized void reinitialize(CSQueue newlyParsedQueue, } // Set new configs + // TODO: add support for node labels setupQueueConfigs(clusterResource, newlyParsedParentQueue.getCapacity(), newlyParsedParentQueue.getAbsoluteCapacity(), newlyParsedParentQueue.getMaximumCapacity(), newlyParsedParentQueue.getAbsoluteMaximumCapacity(), newlyParsedParentQueue.getState(), newlyParsedParentQueue.getACLs(), + new HashSet(), null, + new HashMap(), new HashMap(), newlyParsedParentQueue.getReservationContinueLooking()); updateQuotas(newlyParsedParentQueue.userLimit, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java index 8e61821..c4424b5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java @@ -42,7 +42,7 @@ private int maxSystemApps; public ReservationQueue(CapacitySchedulerContext cs, String queueName, - PlanQueue parent) { + PlanQueue parent) throws IOException { super(cs, queueName, parent, null); maxSystemApps = cs.getConfiguration().getMaximumSystemApplications(); // the following parameters are common to all reservation in the plan diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java index ff8e873..ebf604d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java @@ -65,6 +65,9 @@ LeafQueue queue; private final ResourceCalculator resourceCalculator = new DefaultResourceCalculator(); + + RMContext rmContext = null; + @Before public void setUp() throws IOException { @@ -73,7 +76,9 @@ public void setUp() throws IOException { YarnConfiguration conf = new YarnConfiguration(); setupQueueConfiguration(csConf); - + rmContext = TestUtils.getMockRMContext(); + + CapacitySchedulerContext csContext = mock(CapacitySchedulerContext.class); when(csContext.getConfiguration()).thenReturn(csConf); when(csContext.getConf()).thenReturn(conf); @@ -89,6 +94,8 @@ public void setUp() throws IOException { thenReturn(CapacityScheduler.queueComparator); when(csContext.getResourceCalculator()). thenReturn(resourceCalculator); + when(csContext.getRMContext()).thenReturn(rmContext); + RMContainerTokenSecretManager containerTokenSecretManager = new RMContainerTokenSecretManager(conf); containerTokenSecretManager.rollMasterKey(); @@ -162,6 +169,7 @@ public void testLimitsComputation() throws Exception { when(csContext.getQueueComparator()). thenReturn(CapacityScheduler.queueComparator); when(csContext.getResourceCalculator()).thenReturn(resourceCalculator); + when(csContext.getRMContext()).thenReturn(rmContext); // Say cluster has 100 nodes of 16G each Resource clusterResource = Resources.createResource(100 * 16 * GB, 100 * 16); @@ -475,6 +483,7 @@ public void testHeadroom() throws Exception { when(csContext.getQueueComparator()). thenReturn(CapacityScheduler.queueComparator); when(csContext.getResourceCalculator()).thenReturn(resourceCalculator); + when(csContext.getRMContext()).thenReturn(rmContext); // Say cluster has 100 nodes of 16G each Resource clusterResource = Resources.createResource(100 * 16 * GB); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSQueueUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSQueueUtils.java index 7260afd..297c551 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSQueueUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSQueueUtils.java @@ -19,38 +19,19 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.reset; import static org.mockito.Mockito.when; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.junit.Assert; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; -import org.junit.After; -import org.junit.Before; import org.junit.Test; -import org.mockito.InOrder; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; public class TestCSQueueUtils { @@ -88,6 +69,8 @@ public void runInvalidDivisorTest(boolean useDominant) throws Exception { thenReturn(Resources.createResource(GB, 1)); when(csContext.getMaximumResourceCapability()). thenReturn(Resources.createResource(0, 0)); + RMContext rmContext = mock(RMContext.class); + when(csContext.getRMContext()).thenReturn(rmContext); final String L1Q1 = "L1Q1"; csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {L1Q1}); @@ -129,6 +112,8 @@ public void testAbsoluteMaxAvailCapacityNoUse() throws Exception { thenReturn(Resources.createResource(GB, 1)); when(csContext.getMaximumResourceCapability()). thenReturn(Resources.createResource(16*GB, 32)); + RMContext rmContext = mock(RMContext.class); + when(csContext.getRMContext()).thenReturn(rmContext); final String L1Q1 = "L1Q1"; csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {L1Q1}); @@ -174,6 +159,9 @@ public void testAbsoluteMaxAvailCapacityWithUse() throws Exception { when(csContext.getMaximumResourceCapability()). thenReturn(Resources.createResource(16*GB, 32)); + RMContext rmContext = mock(RMContext.class); + when(csContext.getRMContext()).thenReturn(rmContext); + final String L1Q1 = "L1Q1"; final String L1Q2 = "L1Q2"; final String L2Q1 = "L2Q1"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index f7c098c..76f6c5a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -962,10 +962,7 @@ public void testNumClusterNodes() throws Exception { YarnConfiguration conf = new YarnConfiguration(); CapacityScheduler cs = new CapacityScheduler(); cs.setConf(conf); - RMContextImpl rmContext = new RMContextImpl(null, null, null, null, null, - null, new RMContainerTokenSecretManager(conf), - new NMTokenSecretManagerInRM(conf), - new ClientToAMTokenSecretManagerInRM(), null); + RMContext rmContext = TestUtils.getMockRMContext(); cs.setRMContext(rmContext); CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java index fdb9028..af58a43 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java @@ -99,6 +99,7 @@ public void setUp() throws Exception { thenReturn(CapacityScheduler.queueComparator); when(csContext.getResourceCalculator()). thenReturn(resourceComparator); + when(csContext.getRMContext()).thenReturn(rmContext); } private FiCaSchedulerApp getMockApplication(int appId, String user) { @@ -132,11 +133,11 @@ public CSAssignment answer(InvocationOnMock invocation) throws Throwable { final Resource allocatedResource = Resources.createResource(allocation); if (queue instanceof ParentQueue) { ((ParentQueue)queue).allocateResource(clusterResource, - allocatedResource); + allocatedResource, null); } else { FiCaSchedulerApp app1 = getMockApplication(0, ""); ((LeafQueue)queue).allocateResource(clusterResource, app1, - allocatedResource); + allocatedResource, null); } // Next call - nothing diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java index 85ef381..e3b557b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java @@ -20,12 +20,14 @@ import java.util.ArrayList; import java.util.List; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.SecurityUtilTestHelper; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.LogAggregationContext; @@ -41,19 +43,28 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; import org.apache.hadoop.yarn.server.resourcemanager.RMSecretManagerService; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.TestFifoScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.DummyDynamicNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.DynamicNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; + public class TestContainerAllocation { @@ -307,4 +318,411 @@ protected RMSecretManagerService createRMSecretManagerService() { rm1.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.ALLOCATED); MockRM.launchAndRegisterAM(app1, rm1, nm1); } + + private Configuration getConfigurationWithDefaultQueueLabels( + Configuration config) { + final String A = CapacitySchedulerConfiguration.ROOT + ".a"; + final String B = CapacitySchedulerConfiguration.ROOT + ".b"; + + CapacitySchedulerConfiguration conf = + (CapacitySchedulerConfiguration) getConfigurationWithQueueLabels(config); + new CapacitySchedulerConfiguration(config); + conf.setDefaultNodeLabelExpression(A, "x"); + conf.setDefaultNodeLabelExpression(B, "y"); + return conf; + } + + private Configuration getConfigurationWithQueueLabels(Configuration config) { + CapacitySchedulerConfiguration conf = + new CapacitySchedulerConfiguration(config); + + // Define top-level queues + conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b", "c"}); + + // root can access anything + conf.setAccessibleNodeLabels(CapacitySchedulerConfiguration.ROOT, + toSet("x", "y")); + + final String A = CapacitySchedulerConfiguration.ROOT + ".a"; + conf.setCapacity(A, 10); + conf.setMaximumCapacity(A, 15); + conf.setAccessibleNodeLabels(A, toSet("x")); + conf.setCapacityByLabel(A, "x", 100); + + final String B = CapacitySchedulerConfiguration.ROOT + ".b"; + conf.setCapacity(B, 20); + conf.setAccessibleNodeLabels(B, toSet("y")); + conf.setCapacityByLabel(B, "y", 100); + + final String C = CapacitySchedulerConfiguration.ROOT + ".c"; + conf.setCapacity(C, 70); + conf.setMaximumCapacity(C, 70); + conf.setAccessibleNodeLabels(C, DynamicNodeLabelsManager.EMPTY_STRING_SET); + + // Define 2nd-level queues + final String A1 = A + ".a1"; + conf.setQueues(A, new String[] {"a1"}); + conf.setCapacity(A1, 100); + conf.setMaximumCapacity(A1, 100); + conf.setCapacityByLabel(A1, "x", 100); + + final String B1 = B + ".b1"; + conf.setQueues(B, new String[] {"b1"}); + conf.setCapacity(B1, 100); + conf.setMaximumCapacity(B1, 100); + conf.setCapacityByLabel(B1, "y", 100); + + final String C1 = C + ".c1"; + conf.setQueues(C, new String[] {"c1"}); + conf.setCapacity(C1, 100); + conf.setMaximumCapacity(C1, 100); + + return conf; + } + + private void checkTaskContainersHost(ApplicationAttemptId attemptId, + ContainerId containerId, ResourceManager rm, String host) { + YarnScheduler scheduler = rm.getRMContext().getScheduler(); + SchedulerAppReport appReport = scheduler.getSchedulerAppInfo(attemptId); + + Assert.assertTrue(appReport.getLiveContainers().size() > 0); + for (RMContainer c : appReport.getLiveContainers()) { + if (c.getContainerId().equals(containerId)) { + Assert.assertEquals(host, c.getAllocatedNode().getHost()); + } + } + } + + private Set toSet(E... elements) { + Set set = Sets.newHashSet(elements); + return set; + } + + private Configuration getComplexConfigurationWithQueueLabels( + Configuration config) { + CapacitySchedulerConfiguration conf = + new CapacitySchedulerConfiguration(config); + + // Define top-level queues + conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"}); + + // root can access anything + conf.setAccessibleNodeLabels(CapacitySchedulerConfiguration.ROOT, + toSet("x", "y", "z")); + + final String A = CapacitySchedulerConfiguration.ROOT + ".a"; + conf.setCapacity(A, 10); + conf.setMaximumCapacity(A, 10); + conf.setAccessibleNodeLabels(A, toSet("x", "y")); + conf.setCapacityByLabel(A, "x", 100); + conf.setCapacityByLabel(A, "y", 50); + + final String B = CapacitySchedulerConfiguration.ROOT + ".b"; + conf.setCapacity(B, 90); + conf.setMaximumCapacity(B, 100); + conf.setAccessibleNodeLabels(B, toSet("y", "z")); + conf.setCapacityByLabel(B, "y", 50); + conf.setCapacityByLabel(B, "z", 100); + + // Define 2nd-level queues + final String A1 = A + ".a1"; + conf.setQueues(A, new String[] {"a1"}); + conf.setCapacity(A1, 100); + conf.setMaximumCapacity(A1, 100); + conf.setAccessibleNodeLabels(A1, toSet("x", "y")); + conf.setDefaultNodeLabelExpression(A1, "x"); + conf.setCapacityByLabel(A1, "x", 100); + conf.setCapacityByLabel(A1, "y", 100); + + conf.setQueues(B, new String[] {"b1", "b2"}); + final String B1 = B + ".b1"; + conf.setCapacity(B1, 50); + conf.setMaximumCapacity(B1, 50); + conf.setAccessibleNodeLabels(B1, DynamicNodeLabelsManager.EMPTY_STRING_SET); + + final String B2 = B + ".b2"; + conf.setCapacity(B2, 50); + conf.setMaximumCapacity(B2, 50); + conf.setAccessibleNodeLabels(B2, toSet("y", "z")); + conf.setCapacityByLabel(B2, "y", 100); + conf.setCapacityByLabel(B2, "z", 100); + + return conf; + } + + @Test//(timeout = 300000) + public void testContainerAllocateWithComplexLabels() throws Exception { + // make it harder .. + final DynamicNodeLabelsManager mgr = new DummyDynamicNodeLabelsManager(); + mgr.init(conf); + + /* + * Queue structure: + * root (*) + * / \ + * a x(100%), y(50%) b y(50%), z(100%) + * / / \ + * a1 (x,y) b1(no) b2(y,z) + * 100% y = 100%, z = 100% + * + * Node structure: + * h1 : x + * h2 : x, y + * h3 : y + * h4 : y, z + * h5 : NO + * + * Total resource: + * x: 4G + * y: 6G + * z: 2G + * *: 2G + * + * Resource of + * a1: x=4G, y=3G, NO=0.2G + * b1: NO=0.9G (max=1G) + * b2: y=3, z=2G, NO=0.9G (max=1G) + * + * Each node can only allocate two containers + */ + + // set node -> label + mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y", "z")); + mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), + toSet("x"), NodeId.newInstance("h2", 0), toSet("x", "y"), + NodeId.newInstance("h3", 0), toSet("y"), NodeId.newInstance("h4", 0), + toSet("y", "z"), NodeId.newInstance("h5", 0), + DynamicNodeLabelsManager.EMPTY_STRING_SET)); + + // inject node label manager + MockRM rm1 = new MockRM(getComplexConfigurationWithQueueLabels(conf)) { + @Override + public DynamicNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + + rm1.getRMContext().setNodeLabelManager(mgr); + rm1.start(); + MockNM nm1 = rm1.registerNode("h1:1234", 2048); + MockNM nm2 = rm1.registerNode("h2:1234", 2048); + MockNM nm3 = rm1.registerNode("h3:1234", 2048); + MockNM nm4 = rm1.registerNode("h4:1234", 2048); + MockNM nm5 = rm1.registerNode("h5:1234", 2048); + + ContainerId containerId; + + // launch an app to queue a1 (label = x), and check all container will + // be allocated in h1 + RMApp app1 = rm1.submitApp(1024, "app", "user", null, "a1"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + // request a container (label = x && y). can only allocate on nm2 + am1.allocate("*", 1024, 1, new ArrayList(), "x && y"); + containerId = + ContainerId.newInstance(am1.getApplicationAttemptId(), 2); + Assert.assertFalse(rm1.waitForState(nm1, containerId, + RMContainerState.ALLOCATED, 10 * 1000)); + Assert.assertTrue(rm1.waitForState(nm2, containerId, + RMContainerState.ALLOCATED, 10 * 1000)); + checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1, + "h2"); + + // launch an app to queue b1 (label = y), and check all container will + // be allocated in h5 + RMApp app2 = rm1.submitApp(1024, "app", "user", null, "b1"); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm5); + + // request a container for AM, will succeed + // and now b1's queue capacity will be used, cannot allocate more containers + // (Maximum capacity reached) + am2.allocate("*", 1024, 1, new ArrayList()); + containerId = ContainerId.newInstance(am2.getApplicationAttemptId(), 2); + Assert.assertFalse(rm1.waitForState(nm4, containerId, + RMContainerState.ALLOCATED, 10 * 1000)); + Assert.assertFalse(rm1.waitForState(nm5, containerId, + RMContainerState.ALLOCATED, 10 * 1000)); + + // launch an app to queue b2 + RMApp app3 = rm1.submitApp(1024, "app", "user", null, "b2"); + MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm5); + + // request a container. try to allocate on nm1 (label = x) and nm3 (label = + // y,z). Will successfully allocate on nm3 + am3.allocate("*", 1024, 1, new ArrayList(), "y"); + containerId = ContainerId.newInstance(am3.getApplicationAttemptId(), 2); + Assert.assertFalse(rm1.waitForState(nm1, containerId, + RMContainerState.ALLOCATED, 10 * 1000)); + Assert.assertTrue(rm1.waitForState(nm3, containerId, + RMContainerState.ALLOCATED, 10 * 1000)); + checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1, + "h3"); + + // try to allocate container (request label = y && z) on nm3 (label = y) and + // nm4 (label = y,z). Will sucessfully allocate on nm4 only. + am3.allocate("*", 1024, 1, new ArrayList(), "y && z"); + containerId = ContainerId.newInstance(am3.getApplicationAttemptId(), 3); + Assert.assertFalse(rm1.waitForState(nm3, containerId, + RMContainerState.ALLOCATED, 10 * 1000)); + Assert.assertTrue(rm1.waitForState(nm4, containerId, + RMContainerState.ALLOCATED, 10 * 1000)); + checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1, + "h4"); + + rm1.close(); + } + + @Test (timeout = 120000) + public void testContainerAllocateWithLabels() throws Exception { + final DynamicNodeLabelsManager mgr = new DummyDynamicNodeLabelsManager(); + mgr.init(conf); + + // set node -> label + mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y")); + mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"), + NodeId.newInstance("h2", 0), toSet("y"))); + + // inject node label manager + MockRM rm1 = new MockRM(getConfigurationWithQueueLabels(conf)) { + @Override + public DynamicNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + + rm1.getRMContext().setNodeLabelManager(mgr); + rm1.start(); + MockNM nm1 = rm1.registerNode("h1:1234", 8000); // label = x + MockNM nm2 = rm1.registerNode("h2:1234", 8000); // label = y + MockNM nm3 = rm1.registerNode("h3:1234", 8000); // label = + + ContainerId containerId; + + // launch an app to queue a1 (label = x), and check all container will + // be allocated in h1 + RMApp app1 = rm1.submitApp(200, "app", "user", null, "a1"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm3); + + // request a container. + am1.allocate("*", 1024, 1, new ArrayList(), "x"); + containerId = + ContainerId.newInstance(am1.getApplicationAttemptId(), 2); + Assert.assertFalse(rm1.waitForState(nm2, containerId, + RMContainerState.ALLOCATED, 10 * 1000)); + Assert.assertTrue(rm1.waitForState(nm1, containerId, + RMContainerState.ALLOCATED, 10 * 1000)); + checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1, + "h1"); + + // launch an app to queue b1 (label = y), and check all container will + // be allocated in h2 + RMApp app2 = rm1.submitApp(200, "app", "user", null, "b1"); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm3); + + // request a container. + am2.allocate("*", 1024, 1, new ArrayList(), "y"); + containerId = ContainerId.newInstance(am2.getApplicationAttemptId(), 2); + Assert.assertFalse(rm1.waitForState(nm1, containerId, + RMContainerState.ALLOCATED, 10 * 1000)); + Assert.assertTrue(rm1.waitForState(nm2, containerId, + RMContainerState.ALLOCATED, 10 * 1000)); + checkTaskContainersHost(am2.getApplicationAttemptId(), containerId, rm1, + "h2"); + + // launch an app to queue c1 (label = ""), and check all container will + // be allocated in h3 + RMApp app3 = rm1.submitApp(200, "app", "user", null, "c1"); + MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm3); + + // request a container. + am3.allocate("*", 1024, 1, new ArrayList()); + containerId = ContainerId.newInstance(am3.getApplicationAttemptId(), 2); + Assert.assertFalse(rm1.waitForState(nm2, containerId, + RMContainerState.ALLOCATED, 10 * 1000)); + Assert.assertTrue(rm1.waitForState(nm3, containerId, + RMContainerState.ALLOCATED, 10 * 1000)); + checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1, + "h3"); + + rm1.close(); + } + + @Test (timeout = 120000) + public void testContainerAllocateWithDefaultQueueLabels() throws Exception { + // This test is pretty much similar to testContainerAllocateWithLabel. + // Difference is, this test doesn't specify label expression in ResourceRequest, + // instead, it uses default queue label expression + + final DynamicNodeLabelsManager mgr = new DummyDynamicNodeLabelsManager(); + mgr.init(conf); + + // set node -> label + mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y")); + mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"), + NodeId.newInstance("h2", 0), toSet("y"))); + + // inject node label manager + MockRM rm1 = new MockRM(getConfigurationWithDefaultQueueLabels(conf)) { + @Override + public DynamicNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + + rm1.getRMContext().setNodeLabelManager(mgr); + rm1.start(); + MockNM nm1 = rm1.registerNode("h1:1234", 8000); // label = x + MockNM nm2 = rm1.registerNode("h2:1234", 8000); // label = y + MockNM nm3 = rm1.registerNode("h3:1234", 8000); // label = + + ContainerId containerId; + + // launch an app to queue a1 (label = x), and check all container will + // be allocated in h1 + RMApp app1 = rm1.submitApp(200, "app", "user", null, "a1"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + // request a container. + am1.allocate("*", 1024, 1, new ArrayList()); + containerId = + ContainerId.newInstance(am1.getApplicationAttemptId(), 2); + Assert.assertFalse(rm1.waitForState(nm3, containerId, + RMContainerState.ALLOCATED, 10 * 1000)); + Assert.assertTrue(rm1.waitForState(nm1, containerId, + RMContainerState.ALLOCATED, 10 * 1000)); + checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1, + "h1"); + + // launch an app to queue b1 (label = y), and check all container will + // be allocated in h2 + RMApp app2 = rm1.submitApp(200, "app", "user", null, "b1"); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2); + + // request a container. + am2.allocate("*", 1024, 1, new ArrayList()); + containerId = ContainerId.newInstance(am2.getApplicationAttemptId(), 2); + Assert.assertFalse(rm1.waitForState(nm3, containerId, + RMContainerState.ALLOCATED, 10 * 1000)); + Assert.assertTrue(rm1.waitForState(nm2, containerId, + RMContainerState.ALLOCATED, 10 * 1000)); + checkTaskContainersHost(am2.getApplicationAttemptId(), containerId, rm1, + "h2"); + + // launch an app to queue c1 (label = ""), and check all container will + // be allocated in h3 + RMApp app3 = rm1.submitApp(200, "app", "user", null, "c1"); + MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm3); + + // request a container. + am3.allocate("*", 1024, 1, new ArrayList()); + containerId = ContainerId.newInstance(am3.getApplicationAttemptId(), 2); + Assert.assertFalse(rm1.waitForState(nm2, containerId, + RMContainerState.ALLOCATED, 10 * 1000)); + Assert.assertTrue(rm1.waitForState(nm3, containerId, + RMContainerState.ALLOCATED, 10 * 1000)); + checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1, + "h3"); + + rm1.close(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index 092ff83..780606d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -40,10 +40,10 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import org.junit.Assert; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.security.UserGroupInformation; @@ -63,6 +63,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.DynamicNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; @@ -82,6 +83,7 @@ import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.mockito.Matchers; @@ -147,6 +149,7 @@ public void setUp() throws Exception { thenReturn(CapacityScheduler.queueComparator); when(csContext.getResourceCalculator()). thenReturn(resourceCalculator); + when(csContext.getRMContext()).thenReturn(rmContext); RMContainerTokenSecretManager containerTokenSecretManager = new RMContainerTokenSecretManager(conf); containerTokenSecretManager.rollMasterKey(); @@ -747,6 +750,81 @@ public void testHeadroomWithMaxCap() throws Exception { a.assignContainers(clusterResource, node_1, false); assertEquals(1*GB, app_2.getHeadroom().getMemory()); // hit queue max-cap } + + @SuppressWarnings("unchecked") + @Test + public void testHeadroomWithLabel() throws Exception { + DynamicNodeLabelsManager nlm = mock(DynamicNodeLabelsManager.class); + + // Mock the queue + LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A)); + + //unset maxCapacity + a.setMaxCapacity(1.0f); + + // Users + final String user_0 = "user_0"; + + // Submit applications + final ApplicationAttemptId appAttemptId_0 = + TestUtils.getMockApplicationAttemptId(0, 0); + FiCaSchedulerApp app_0 = + new FiCaSchedulerApp(appAttemptId_0, user_0, a, + a.getActiveUsersManager(), rmContext); + a.submitApplicationAttempt(app_0, user_0); + + // Setup some nodes + String host_0 = "127.0.0.1"; + FiCaSchedulerNode node_0 = + TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 64 * GB); + + final int numNodes = 1; + Resource clusterResource = Resources.createResource(numNodes * (64 * GB), 1); + when(csContext.getNumClusterNodes()).thenReturn(numNodes); + + // Setup resource-requests + Priority priority = TestUtils.createMockPriority(1); + app_0.updateResourceRequests(Collections.singletonList(TestUtils + .createResourceRequest(ResourceRequest.ANY, 1 * GB, 1, true, priority, + recordFactory))); + + /** + * Start testing... + */ + + // Set user-limit + a.setUserLimit(100); + a.setUserLimitFactor(1); + + // 1 container to user_0 + a.assignContainers(clusterResource, node_0, false); + assertEquals(1 * GB, a.getUsedResources().getMemory()); + assertEquals(1 * GB, app_0.getCurrentConsumption().getMemory()); + assertEquals(5 * GB, app_0.getHeadroom().getMemory()); // User limit = 6G + + // mock getQueueResource to 4999 MB + when( + nlm.getQueueResource(any(String.class), any(Set.class), + any(Resource.class))).thenReturn(Resource.newInstance(4999, 1)); + a.setNodeLabelManager(nlm); + + // do a resource allocation again + app_0.updateResourceRequests(Collections.singletonList(TestUtils + .createResourceRequest(ResourceRequest.ANY, 1 * GB, 1, true, priority, + recordFactory))); + + when( + nlm.getResourceByLabel(any(String.class), + any(Resource.class))).thenReturn(Resource.newInstance(4999, 1)); + a.assignContainers(clusterResource, node_0, false); + + // current headroom should be + // Headroom = min(6G (user-limit), 4G (queueLabelResource)) - + // 2G (used-resource) = 2G + assertEquals(2 * GB, a.getUsedResources().getMemory()); + assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory()); + assertEquals(2 * GB, app_0.getHeadroom().getMemory()); + } @Test public void testSingleQueueWithMultipleUsers() throws Exception { @@ -2042,6 +2120,7 @@ public void testMaxAMResourcePerQueuePercentAfterQueueRefresh() Resource clusterResource = Resources .createResource(100 * 16 * GB, 100 * 32); CapacitySchedulerContext csContext = mockCSContext(csConf, clusterResource); + when(csContext.getRMContext()).thenReturn(rmContext); csConf.setFloat(CapacitySchedulerConfiguration. MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT, 0.1f); ParentQueue root = new ParentQueue(csContext, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java index 8b24a7e..72983ca 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java @@ -95,6 +95,7 @@ public void setUp() throws Exception { thenReturn(CapacityScheduler.queueComparator); when(csContext.getResourceCalculator()). thenReturn(resourceComparator); + when(csContext.getRMContext()).thenReturn(rmContext); } private static final String A = "a"; @@ -144,11 +145,11 @@ public CSAssignment answer(InvocationOnMock invocation) throws Throwable { final Resource allocatedResource = Resources.createResource(allocation); if (queue instanceof ParentQueue) { ((ParentQueue)queue).allocateResource(clusterResource, - allocatedResource); + allocatedResource, null); } else { FiCaSchedulerApp app1 = getMockApplication(0, ""); ((LeafQueue)queue).allocateResource(clusterResource, app1, - allocatedResource); + allocatedResource, null); } // Next call - nothing diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueMappings.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueMappings.java index f573f43..2317fab 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueMappings.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueMappings.java @@ -27,14 +27,11 @@ import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; -import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SimpleGroupsMapping; -import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; -import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; -import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.junit.After; import org.junit.Assert; import org.junit.Test; @@ -79,10 +76,7 @@ public void testQueueMapping() throws Exception { YarnConfiguration conf = new YarnConfiguration(csConf); CapacityScheduler cs = new CapacityScheduler(); - RMContextImpl rmContext = new RMContextImpl(null, null, null, null, null, - null, new RMContainerTokenSecretManager(conf), - new NMTokenSecretManagerInRM(conf), - new ClientToAMTokenSecretManagerInRM(), null); + RMContext rmContext = TestUtils.getMockRMContext(); cs.setConf(conf); cs.setRMContext(rmContext); cs.init(conf); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java index a3b990c..5aa0020 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java @@ -18,23 +18,40 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; -import org.junit.Assert; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.DynamicNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; -import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; +import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; +import org.junit.Assert; +import org.junit.Before; import org.junit.Test; +import com.google.common.collect.ImmutableSet; + public class TestQueueParsing { private static final Log LOG = LogFactory.getLog(TestQueueParsing.class); private static final double DELTA = 0.000001; + private DynamicNodeLabelsManager nodeLabelManager; + + @Before + public void setup() { + nodeLabelManager = mock(DynamicNodeLabelsManager.class); + when(nodeLabelManager.containsNodeLabel(any(String.class))).thenReturn(true); + } + @Test public void testQueueParsing() throws Exception { CapacitySchedulerConfiguration csConf = @@ -43,15 +60,11 @@ public void testQueueParsing() throws Exception { YarnConfiguration conf = new YarnConfiguration(csConf); CapacityScheduler capacityScheduler = new CapacityScheduler(); - RMContextImpl rmContext = new RMContextImpl(null, null, - null, null, null, null, new RMContainerTokenSecretManager(conf), - new NMTokenSecretManagerInRM(conf), - new ClientToAMTokenSecretManagerInRM(), null); capacityScheduler.setConf(conf); - capacityScheduler.setRMContext(rmContext); + capacityScheduler.setRMContext(TestUtils.getMockRMContext()); capacityScheduler.init(conf); capacityScheduler.start(); - capacityScheduler.reinitialize(conf, rmContext); + capacityScheduler.reinitialize(conf, TestUtils.getMockRMContext()); CSQueue a = capacityScheduler.getQueue("a"); Assert.assertEquals(0.10, a.getAbsoluteCapacity(), DELTA); @@ -202,4 +215,164 @@ public void testMaxCapacity() throws Exception { capacityScheduler.stop(); } + private void setupQueueConfigurationWithoutLabels(CapacitySchedulerConfiguration conf) { + // Define top-level queues + conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"}); + + final String A = CapacitySchedulerConfiguration.ROOT + ".a"; + conf.setCapacity(A, 10); + conf.setMaximumCapacity(A, 15); + + final String B = CapacitySchedulerConfiguration.ROOT + ".b"; + conf.setCapacity(B, 90); + + LOG.info("Setup top-level queues"); + + // Define 2nd-level queues + final String A1 = A + ".a1"; + final String A2 = A + ".a2"; + conf.setQueues(A, new String[] {"a1", "a2"}); + conf.setCapacity(A1, 30); + conf.setMaximumCapacity(A1, 45); + conf.setCapacity(A2, 70); + conf.setMaximumCapacity(A2, 85); + + final String B1 = B + ".b1"; + final String B2 = B + ".b2"; + final String B3 = B + ".b3"; + conf.setQueues(B, new String[] {"b1", "b2", "b3"}); + conf.setCapacity(B1, 50); + conf.setMaximumCapacity(B1, 85); + conf.setCapacity(B2, 30); + conf.setMaximumCapacity(B2, 35); + conf.setCapacity(B3, 20); + conf.setMaximumCapacity(B3, 35); + } + + private void setupQueueConfigurationWithLabels(CapacitySchedulerConfiguration conf) { + // Define top-level queues + conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"}); + + final String A = CapacitySchedulerConfiguration.ROOT + ".a"; + conf.setCapacity(A, 10); + conf.setMaximumCapacity(A, 15); + + final String B = CapacitySchedulerConfiguration.ROOT + ".b"; + conf.setCapacity(B, 90); + + LOG.info("Setup top-level queues"); + + // Define 2nd-level queues + final String A1 = A + ".a1"; + final String A2 = A + ".a2"; + conf.setQueues(A, new String[] {"a1", "a2"}); + conf.setAccessibleNodeLabels(A, ImmutableSet.of("red", "blue")); + conf.setCapacityByLabel(A, "red", 50); + conf.setCapacityByLabel(A, "blue", 50); + + conf.setCapacity(A1, 30); + conf.setMaximumCapacity(A1, 45); + conf.setCapacityByLabel(A1, "red", 50); + conf.setCapacityByLabel(A1, "blue", 100); + + conf.setCapacity(A2, 70); + conf.setMaximumCapacity(A2, 85); + conf.setAccessibleNodeLabels(A2, ImmutableSet.of("red")); + conf.setCapacityByLabel(A2, "red", 50); + + final String B1 = B + ".b1"; + final String B2 = B + ".b2"; + final String B3 = B + ".b3"; + conf.setQueues(B, new String[] {"b1", "b2", "b3"}); + conf.setAccessibleNodeLabels(B, ImmutableSet.of("red", "blue")); + conf.setCapacityByLabel(B, "red", 50); + conf.setCapacityByLabel(B, "blue", 50); + + conf.setCapacity(B1, 50); + conf.setMaximumCapacity(B1, 85); + conf.setCapacityByLabel(B1, "red", 50); + conf.setCapacityByLabel(B1, "blue", 50); + + conf.setCapacity(B2, 30); + conf.setMaximumCapacity(B2, 35); + conf.setCapacityByLabel(B2, "red", 25); + conf.setCapacityByLabel(B2, "blue", 25); + + conf.setCapacity(B3, 20); + conf.setMaximumCapacity(B3, 35); + conf.setCapacityByLabel(B3, "red", 25); + conf.setCapacityByLabel(B3, "blue", 25); + } + + @Test + public void testQueueParsingReinitializeWithLabels() throws IOException { + CapacitySchedulerConfiguration csConf = + new CapacitySchedulerConfiguration(); + setupQueueConfigurationWithoutLabels(csConf); + YarnConfiguration conf = new YarnConfiguration(csConf); + + CapacityScheduler capacityScheduler = new CapacityScheduler(); + RMContextImpl rmContext = + new RMContextImpl(null, null, null, null, null, null, + new RMContainerTokenSecretManager(conf), + new NMTokenSecretManagerInRM(conf), + new ClientToAMTokenSecretManagerInRM(), null); + rmContext.setNodeLabelManager(nodeLabelManager); + capacityScheduler.setConf(conf); + capacityScheduler.setRMContext(rmContext); + capacityScheduler.init(conf); + capacityScheduler.start(); + csConf = new CapacitySchedulerConfiguration(); + setupQueueConfigurationWithLabels(csConf); + conf = new YarnConfiguration(csConf); + capacityScheduler.reinitialize(conf, rmContext); + checkQueueLabels(capacityScheduler); + capacityScheduler.stop(); + } + + private void checkQueueLabels(CapacityScheduler capacityScheduler) { + // queue-A is red, blue + Assert.assertTrue(capacityScheduler.getQueue("a").getAccessibleNodeLabels() + .containsAll(ImmutableSet.of("red", "blue"))); + + // queue-A1 inherits A's configuration + Assert.assertTrue(capacityScheduler.getQueue("a1").getAccessibleNodeLabels() + .containsAll(ImmutableSet.of("red", "blue"))); + + // queue-A2 is "red" + Assert.assertEquals(1, capacityScheduler + .getQueue("a2").getAccessibleNodeLabels().size()); + Assert.assertTrue(capacityScheduler + .getQueue("a2").getAccessibleNodeLabels().contains("red")); + + // queue-B is "red"/"blue" + Assert.assertTrue(capacityScheduler + .getQueue("b").getAccessibleNodeLabels().containsAll(ImmutableSet.of("red", "blue"))); + + // queue-B2 inherits "red"/"blue" + Assert.assertTrue(capacityScheduler + .getQueue("b2").getAccessibleNodeLabels().containsAll(ImmutableSet.of("red", "blue"))); + } + + @Test + public void testQueueParsingWithLabels() throws IOException { + YarnConfiguration conf = new YarnConfiguration(); + CapacitySchedulerConfiguration csConf = + new CapacitySchedulerConfiguration(conf); + setupQueueConfigurationWithLabels(csConf); + + CapacityScheduler capacityScheduler = new CapacityScheduler(); + RMContextImpl rmContext = + new RMContextImpl(null, null, null, null, null, null, + new RMContainerTokenSecretManager(csConf), + new NMTokenSecretManagerInRM(csConf), + new ClientToAMTokenSecretManagerInRM(), null); + rmContext.setNodeLabelManager(nodeLabelManager); + capacityScheduler.setConf(csConf); + capacityScheduler.setRMContext(rmContext); + capacityScheduler.init(csConf); + capacityScheduler.start(); + checkQueueLabels(capacityScheduler); + capacityScheduler.stop(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservationQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservationQueue.java index c53b7a9..4e6c73d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservationQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservationQueue.java @@ -23,7 +23,10 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.io.IOException; + import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; @@ -42,8 +45,7 @@ ReservationQueue reservationQueue; @Before - public void setup() { - + public void setup() throws IOException { // setup a context / conf csConf = new CapacitySchedulerConfiguration(); YarnConfiguration conf = new YarnConfiguration(); @@ -57,6 +59,9 @@ public void setup() { when(csContext.getClusterResource()).thenReturn( Resources.createResource(100 * 16 * GB, 100 * 32)); when(csContext.getResourceCalculator()).thenReturn(resourceCalculator); + + RMContext mockRMContext = TestUtils.getMockRMContext(); + when(csContext.getRMContext()).thenReturn(mockRMContext); // create a queue PlanQueue pq = new PlanQueue(csContext, "root", null, null); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java index 0f8290e..e3e16e7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java @@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.nodelabels.NodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; @@ -121,6 +122,7 @@ private void setup(CapacitySchedulerConfiguration csConf) throws Exception { when(csContext.getQueueComparator()).thenReturn( CapacityScheduler.queueComparator); when(csContext.getResourceCalculator()).thenReturn(resourceCalculator); + when(csContext.getRMContext()).thenReturn(rmContext); RMContainerTokenSecretManager containerTokenSecretManager = new RMContainerTokenSecretManager( conf); containerTokenSecretManager.rollMasterKey(); @@ -819,7 +821,9 @@ public void testAssignToQueue() throws Exception { // allocate to queue so that the potential new capacity is greater then // absoluteMaxCapacity Resource capability = Resources.createResource(32 * GB, 0); - boolean res = a.assignToQueue(clusterResource, capability, app_0, true); + boolean res = + a.assignToQueue(clusterResource, capability, + NodeLabelsManager.EMPTY_STRING_SET, app_0, true); assertFalse(res); // now add in reservations and make sure it continues if config set @@ -836,23 +840,29 @@ public void testAssignToQueue() throws Exception { assertEquals(3 * GB, node_1.getUsedResource().getMemory()); capability = Resources.createResource(5 * GB, 0); - res = a - .assignToQueue(clusterResource, capability, app_0, true); + res = + a.assignToQueue(clusterResource, capability, + NodeLabelsManager.EMPTY_STRING_SET, app_0, true); assertTrue(res); // tell to not check reservations - res = a.assignToQueue(clusterResource, capability, app_0, false); + res = + a.assignToQueue(clusterResource, capability, + NodeLabelsManager.EMPTY_STRING_SET, app_0, false); assertFalse(res); refreshQueuesTurnOffReservationsContLook(a, csConf); // should return false no matter what checkReservations is passed // in since feature is off - res = a.assignToQueue(clusterResource, capability, app_0, false); + res = + a.assignToQueue(clusterResource, capability, + NodeLabelsManager.EMPTY_STRING_SET, app_0, false); assertFalse(res); - res = a - .assignToQueue(clusterResource, capability, app_0, true); + res = + a.assignToQueue(clusterResource, capability, + NodeLabelsManager.EMPTY_STRING_SET, app_0, true); assertFalse(res); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java index 9cb902d..f37f409 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java @@ -18,11 +18,14 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; +import java.util.Set; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -43,16 +46,19 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.DynamicNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; +import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; -import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.util.resource.Resources; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; public class TestUtils { private static final Log LOG = LogFactory.getLog(TestUtils.class); @@ -61,7 +67,7 @@ * Get a mock {@link RMContext} for use in test cases. * @return a mock {@link RMContext} for use in test cases */ - @SuppressWarnings("rawtypes") + @SuppressWarnings({ "rawtypes", "unchecked" }) public static RMContext getMockRMContext() { // Null dispatcher Dispatcher nullDispatcher = new Dispatcher() { @@ -93,6 +99,27 @@ public EventHandler getEventHandler() { new RMContainerTokenSecretManager(conf), new NMTokenSecretManagerInRM(conf), new ClientToAMTokenSecretManagerInRM(), writer); + DynamicNodeLabelsManager nlm = mock(DynamicNodeLabelsManager.class); + when( + nlm.getQueueResource(any(String.class), any(Set.class), + any(Resource.class))).thenAnswer(new Answer() { + @Override + public Resource answer(InvocationOnMock invocation) throws Throwable { + Object[] args = invocation.getArguments(); + return (Resource) args[2]; + } + }); + + when(nlm.getResourceByLabel(any(String.class), any(Resource.class))) + .thenAnswer(new Answer() { + @Override + public Resource answer(InvocationOnMock invocation) throws Throwable { + Object[] args = invocation.getArguments(); + return (Resource) args[1]; + } + }); + + rmContext.setNodeLabelManager(nlm); rmContext.setSystemMetricsPublisher(mock(SystemMetricsPublisher.class)); return rmContext; }