diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java index 04c2fd5..711b26b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java @@ -72,9 +72,18 @@ /** * Get the configured capacity of the queue. - * @return queue capacity + * @return configured queue capacity */ public float getCapacity(); + + /** + * Get the actual capacity of the queue. This may differ from the + * configured capacity when a misconfiguration occurs, for example when + * labels are added to the cluster. + * + * @return actual queue capacity + */ + public float getAbsActualCapacity(); /** * Get capacity of the parent of the queue as a function of the diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java index 737062b..04b3442 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java @@ -19,7 +19,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; - import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.utils.Lock; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index a8ef942..1444ac0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; import java.util.Random; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -56,6 +57,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import 
org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.label.NodeLabelManager; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.*; @@ -189,6 +191,7 @@ public Configuration getConf() { private boolean scheduleAsynchronously; private AsyncScheduleThread asyncSchedulerThread; + private NodeLabelManager labelManager; /** * EXPERT @@ -273,6 +276,8 @@ private synchronized void initScheduler(Configuration configuration) throws this.applications = new ConcurrentHashMap>(); + this.labelManager = rmContext.getNodeLabelManager(); + initializeQueues(this.conf); scheduleAsynchronously = this.conf.getScheduleAynschronously(); @@ -440,11 +445,13 @@ private void initializeQueueMappings() throws IOException { @Lock(CapacityScheduler.class) private void initializeQueues(CapacitySchedulerConfiguration conf) throws IOException { - + Map> queueToLabels = new HashMap>(); root = parseQueue(this, conf, null, CapacitySchedulerConfiguration.ROOT, - queues, queues, noop); - + queues, queues, noop, queueToLabels); + if (labelManager != null) { + labelManager.reinitializeQueueLabels(queueToLabels); + } LOG.info("Initialized root queue " + root); initializeQueueMappings(); } @@ -454,9 +461,10 @@ private void reinitializeQueues(CapacitySchedulerConfiguration conf) throws IOException { // Parse new queues Map newQueues = new HashMap(); + Map> queueToLabels = new HashMap>(); CSQueue newRoot = parseQueue(this, conf, null, CapacitySchedulerConfiguration.ROOT, - newQueues, queues, noop); + newQueues, queues, noop, queueToLabels); // Ensure all existing queues are still present validateExistingQueues(queues, newQueues); @@ -467,6 +475,10 @@ private void reinitializeQueues(CapacitySchedulerConfiguration conf) // Re-configure queues root.reinitialize(newRoot, clusterResource); initializeQueueMappings(); + + if (labelManager != null) { + labelManager.reinitializeQueueLabels(queueToLabels); + } } /** @@ -506,11 +518,12 @@ private void addNewQueues( @Lock(CapacityScheduler.class) static CSQueue parseQueue( - CapacitySchedulerContext csContext, + CapacitySchedulerContext csContext, CapacitySchedulerConfiguration conf, CSQueue parent, String queueName, Map queues, Map oldQueues, - QueueHook hook) throws IOException { + QueueHook hook, + Map> queueToLabels) throws IOException { CSQueue queue; String[] childQueueNames = conf.getQueues((parent == null) ? @@ -521,13 +534,13 @@ static CSQueue parseQueue( "Queue configuration missing child queue names for " + queueName); } queue = - new LeafQueue(csContext, queueName, parent,oldQueues.get(queueName)); + new LeafQueue(csContext, queueName, parent, oldQueues.get(queueName)); // Used only for unit tests queue = hook.hook(queue); } else { ParentQueue parentQueue = - new ParentQueue(csContext, queueName, parent,oldQueues.get(queueName)); + new ParentQueue(csContext, queueName, parent, oldQueues.get(queueName)); // Used only for unit tests queue = hook.hook(parentQueue); @@ -536,7 +549,7 @@ static CSQueue parseQueue( for (String childQueueName : childQueueNames) { CSQueue childQueue = parseQueue(csContext, conf, queue, childQueueName, - queues, oldQueues, hook); + queues, oldQueues, hook, queueToLabels); childQueues.add(childQueue); } parentQueue.setChildQueues(childQueues); @@ -548,6 +561,9 @@ static CSQueue parseQueue( + ". 
Leaf queue names must be distinct"); } queues.put(queueName, queue); + if (queueToLabels != null) { + queueToLabels.put(queueName, queue.getLabels()); + } LOG.info("Initialized queue: " + queue); return queue; @@ -1044,11 +1060,18 @@ public void handle(SchedulerEvent event) { } private synchronized void addNode(RMNode nodeManager) { + // update this node to node label manager + if (labelManager != null) { + labelManager.activeNode(nodeManager.getNodeID(), + nodeManager.getTotalCapability()); + } + this.nodes.put(nodeManager.getNodeID(), new FiCaSchedulerNode(nodeManager, usePortForNodeName)); Resources.addTo(clusterResource, nodeManager.getTotalCapability()); root.updateClusterResource(clusterResource); int numNodes = numNodeManagers.incrementAndGet(); + LOG.info("Added node " + nodeManager.getNodeAddress() + " clusterResource: " + clusterResource); @@ -1058,6 +1081,11 @@ private synchronized void addNode(RMNode nodeManager) { } private synchronized void removeNode(RMNode nodeInfo) { + // update this node to node label manager + if (labelManager != null) { + labelManager.deactiveNode(nodeInfo.getNodeID()); + } + FiCaSchedulerNode node = nodes.get(nodeInfo.getNodeID()); if (node == null) { return; @@ -1091,6 +1119,7 @@ private synchronized void removeNode(RMNode nodeInfo) { } this.nodes.remove(nodeInfo.getNodeID()); + LOG.info("Removed node " + nodeInfo.getNodeAddress() + " clusterResource: " + clusterResource); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index af6bdc3..dad0252 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -18,7 +18,15 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.StringTokenizer; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -31,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.label.NodeLabelManager; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -81,6 +90,12 @@ @Private public static final String STATE = "state"; + + @Private + public static final String LABELS = "labels"; + + @Private + public static final String DEFAULT_LABEL_EXPRESSION = "default-label-expression"; @Private public static final int DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS = 10000; @@ -308,6 +323,42 @@ public QueueState getState(String queue) { 
QueueState.valueOf(state.toUpperCase()) : QueueState.RUNNING; } + public void setLabels(String queue, Set labels) { + if (labels == null) { + return; + } + String str = StringUtils.join(",", labels); + set(getQueuePrefix(queue) + LABELS, str); + } + + public Set getLabels(String queue) { + String labelStr = get(getQueuePrefix(queue) + LABELS); + if (labelStr == null) { + return queue.equals(ROOT) ? NodeLabelManager.EMPTY_STRING_SET : null; + } else { + Set set = new HashSet(); + for (String str : labelStr.split(",")) { + if (!str.trim().isEmpty()) { + set.add(str.trim().toLowerCase()); + } + } + // if labels contains "*", only leave ANY behind + if (set.contains(NodeLabelManager.ANY)) { + set.clear(); + set.add(NodeLabelManager.ANY); + } + return Collections.unmodifiableSet(set); + } + } + + public String getDefaultLabelExpression(String queue) { + return get(getQueuePrefix(queue) + DEFAULT_LABEL_EXPRESSION); + } + + public void setDefaultLabelExpression(String queue, String exp) { + set(getQueuePrefix(queue) + DEFAULT_LABEL_EXPRESSION, exp); + } + private static String getAclKey(QueueACL acl) { return "acl_" + acl.toString().toLowerCase(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 5c93c5f..ee77946 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -30,6 +30,7 @@ import java.util.Set; import java.util.TreeSet; +import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -52,6 +53,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.label.NodeLabelManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; @@ -60,6 +62,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.utils.BuilderUtils; @@ -129,12 +132,24 @@ private final ResourceCalculator resourceCalculator; + private Set labels; + + private String defaultLabelExpression; + + private NodeLabelManager labelManager; + + // cache last cluster resource to compute actual capacity + private 
Resource lastClusterResource = Resources.none(); + public LeafQueue(CapacitySchedulerContext cs, - String queueName, CSQueue parent, CSQueue old) { + String queueName, CSQueue parent, CSQueue old) throws IOException { + this.labelManager = cs.getRMContext().getNodeLabelManager(); this.scheduler = cs; this.queueName = queueName; this.parent = parent; - + this.labels = cs.getConfiguration().getLabels(getQueuePath()); + this.defaultLabelExpression = cs.getConfiguration() + .getDefaultLabelExpression(getQueuePath()); this.resourceCalculator = cs.getResourceCalculator(); // must be after parent and queueName are initialized @@ -196,14 +211,12 @@ public LeafQueue(CapacitySchedulerContext cs, Map acls = cs.getConfiguration().getAcls(getQueuePath()); - setupQueueConfigs( - cs.getClusterResource(), - capacity, absoluteCapacity, - maximumCapacity, absoluteMaxCapacity, - userLimit, userLimitFactor, + setupQueueConfigs(cs.getClusterResource(), capacity, absoluteCapacity, + maximumCapacity, absoluteMaxCapacity, userLimit, userLimitFactor, maxApplications, maxAMResourcePerQueuePercent, maxApplicationsPerUser, maxActiveApplications, maxActiveApplicationsPerUser, state, acls, cs - .getConfiguration().getNodeLocalityDelay()); + .getConfiguration().getNodeLocalityDelay(), labels, + defaultLabelExpression); if(LOG.isDebugEnabled()) { LOG.debug("LeafQueue:" + " name=" + queueName @@ -217,15 +230,14 @@ public LeafQueue(CapacitySchedulerContext cs, this.activeApplications = new TreeSet(applicationComparator); } - private synchronized void setupQueueConfigs( - Resource clusterResource, - float capacity, float absoluteCapacity, - float maximumCapacity, float absoluteMaxCapacity, - int userLimit, float userLimitFactor, + private synchronized void setupQueueConfigs(Resource clusterResource, + float capacity, float absoluteCapacity, float maximumCapacity, + float absoluteMaxCapacity, int userLimit, float userLimitFactor, int maxApplications, float maxAMResourcePerQueuePercent, int maxApplicationsPerUser, int maxActiveApplications, int maxActiveApplicationsPerUser, QueueState state, - Map acls, int nodeLocalityDelay) + Map acls, int nodeLocalityDelay, + Set labels, String defaultLabelExpression) throws IOException { // Sanity check CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity); @@ -251,10 +263,38 @@ private synchronized void setupQueueConfigs( this.state = state; this.acls = acls; + + // set labels + this.labels = labels; + if (this.labels == null) { + this.labels = parent.getLabels(); + } + SchedulerUtils.checkAndThrowIfLabelNotIncluded(labelManager, this.labels); + + // set default label expression + this.defaultLabelExpression = defaultLabelExpression; + if (this.defaultLabelExpression == null) { + this.defaultLabelExpression = parent.getDefaultLabelExpression(); + } + if (!SchedulerUtils.checkQueueLabelExpression(this.labels, + this.defaultLabelExpression)) { + throw new IOException("Invalid default label expression of " + + " queue=" + + queueInfo.getQueueName() + + " doesn't have permission to access all labels " + + "in default label expression. labelExpression of resource request=" + + (this.defaultLabelExpression == null ? "" + : this.defaultLabelExpression) + + ". Queue labels=" + + (queueInfo.getLabels() == null ? 
"" : StringUtils.join(queueInfo + .getLabels().iterator(), ','))); + } this.queueInfo.setCapacity(this.capacity); this.queueInfo.setMaximumCapacity(this.maximumCapacity); this.queueInfo.setQueueState(this.state); + this.queueInfo.setLabels(this.labels); + this.queueInfo.setDefaultLabelExpression(this.defaultLabelExpression); this.nodeLocalityDelay = nodeLocalityDelay; @@ -267,6 +307,14 @@ private synchronized void setupQueueConfigs( CSQueueUtils.updateQueueStatistics( resourceCalculator, this, getParent(), clusterResource, minimumAllocation); + + StringBuilder labelStrBuilder = new StringBuilder(); + if (this.labels != null) { + for (String s : this.labels) { + labelStrBuilder.append(s); + labelStrBuilder.append(","); + } + } LOG.info("Initializing " + queueName + "\n" + "capacity = " + capacity + @@ -321,7 +369,8 @@ private synchronized void setupQueueConfigs( " [= configuredState ]" + "\n" + "acls = " + aclsString + " [= configuredAcls ]" + "\n" + - "nodeLocalityDelay = " + nodeLocalityDelay + "\n"); + "nodeLocalityDelay = " + nodeLocalityDelay + "\n" + + "labels=" + labelStrBuilder.toString() + "\n"); } @Override @@ -565,6 +614,11 @@ public String toString() { "numApps=" + getNumApplications() + ", " + "numContainers=" + getNumContainers(); } + + @VisibleForTesting + public synchronized void setNodeLabelManager(NodeLabelManager mgr) { + this.labelManager = mgr; + } @VisibleForTesting public synchronized User getUser(String userName) { @@ -613,7 +667,9 @@ public synchronized void reinitialize( newlyParsedLeafQueue.getMaximumActiveApplications(), newlyParsedLeafQueue.getMaximumActiveApplicationsPerUser(), newlyParsedLeafQueue.state, newlyParsedLeafQueue.acls, - newlyParsedLeafQueue.getNodeLocalityDelay()); + newlyParsedLeafQueue.getNodeLocalityDelay(), + newlyParsedLeafQueue.labels, + newlyParsedLeafQueue.defaultLabelExpression); // queue metrics are updated, more resource may be available // activate the pending applications if possible @@ -804,12 +860,19 @@ private synchronized FiCaSchedulerApp getApplication( @Override public synchronized CSAssignment assignContainers(Resource clusterResource, FiCaSchedulerNode node) { - if(LOG.isDebugEnabled()) { LOG.debug("assignContainers: node=" + node.getNodeName() + " #applications=" + activeApplications.size()); } + // if our queue cannot access this node, just return + if (labelManager != null) { + if (!SchedulerUtils.checkQueueAccessToNode(labels, + labelManager.getLabelsOnNode(node.getNodeName()))) { + return NULL_ASSIGNMENT; + } + } + // Check for reserved resources RMContainer reservedContainer = node.getReservedContainer(); if (reservedContainer != null) { @@ -968,12 +1031,12 @@ private synchronized boolean assignToQueue(Resource clusterResource, @Lock({LeafQueue.class, FiCaSchedulerApp.class}) private Resource computeUserLimitAndSetHeadroom( - FiCaSchedulerApp application, Resource clusterResource, Resource required) { - + FiCaSchedulerApp application, Resource clusterResource, Resource required) { String user = application.getUser(); - /** - * Headroom is min((userLimit, queue-max-cap) - consumed) + /** + * Headroom = min(userLimit, queue-max-cap, max-capacity-consider-label) - + * consumed */ Resource userLimit = // User limit @@ -992,11 +1055,21 @@ private Resource computeUserLimitAndSetHeadroom( absoluteMaxAvailCapacity, minimumAllocation); - Resource userConsumed = getUser(user).getConsumedResources(); - Resource headroom = + // Max possible capacity this queue can access, will consider label only. 
+ Resource maxCapacityConsiderLabel = + labelManager == null ? clusterResource : labelManager.getQueueResource( + queueName, labels, clusterResource); + maxCapacityConsiderLabel = + Resources.roundDown(resourceCalculator, maxCapacityConsiderLabel, + minimumAllocation); + Resource userConsumed = getUser(user).getConsumedResources(); + + Resource headroom = Resources.subtract( - Resources.min(resourceCalculator, clusterResource, - userLimit, queueMaxCap), + Resources.min(resourceCalculator, clusterResource, + Resources.min(resourceCalculator, clusterResource, userLimit, + queueMaxCap), + maxCapacityConsiderLabel), userConsumed); if (LOG.isDebugEnabled()) { @@ -1312,6 +1385,22 @@ private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode nod + " priority=" + priority.getPriority() + " request=" + request + " type=" + type); } + + // check whether the request's label expression matches this node's labels + if (labelManager != null) { + if (!SchedulerUtils.checkNodeLabelExpression( + labelManager.getLabelsOnNode(node.getNodeName()), + request.getLabelExpression())) { + // The label expression does not match this node's labels (for example + // because the node's labels changed), so nothing can be allocated here. + // If this is a reserved container, un-reserve it. + if (rmContainer != null) { + unreserve(application, priority, node, rmContainer); + } + return Resources.none(); + } + } + Resource capability = request.getCapability(); Resource available = node.getAvailableResource(); Resource totalResource = node.getTotalResource(); @@ -1501,6 +1590,8 @@ synchronized void releaseResource(Resource clusterResource, @Override public synchronized void updateClusterResource(Resource clusterResource) { + lastClusterResource = clusterResource; + // Update queue properties maxActiveApplications = CSQueueUtils.computeMaxActiveApplications( @@ -1661,4 +1752,32 @@ public void detachContainer(Resource clusterResource, getParent().detachContainer(clusterResource, application, rmContainer); } } + + @Override + public Set getLabels() { + return labels; + } + + @Override + public float getAbsActualCapacity() { + if (Resources.lessThanOrEqual(resourceCalculator, lastClusterResource, + lastClusterResource, Resources.none())) { + return absoluteCapacity; + } + + Resource resourceRespectLabels = + labelManager == null ? lastClusterResource : labelManager + .getQueueResource(queueName, labels, lastClusterResource); + float absActualCapacity = + Resources.divide(resourceCalculator, lastClusterResource, + resourceRespectLabels, lastClusterResource); + + return absActualCapacity > absoluteCapacity ? 
absoluteCapacity + : absActualCapacity; + } + + @Override + public String getDefaultLabelExpression() { + return defaultLabelExpression; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index 8c654b7..59a04f6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.label.NodeLabelManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; @@ -53,6 +54,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; @@ -100,11 +102,14 @@ RecordFactoryProvider.getRecordFactory(null); private final ResourceCalculator resourceCalculator; + private Set labels; + private final NodeLabelManager labelManager; + private String defaultLabelExpression; public ParentQueue(CapacitySchedulerContext cs, - String queueName, CSQueue parent, CSQueue old) { + String queueName, CSQueue parent, CSQueue old) throws IOException { minimumAllocation = cs.getMinimumResourceCapability(); - + this.labelManager = cs.getRMContext().getNodeLabelManager(); this.parent = parent; this.queueName = queueName; this.rootQueue = (parent == null); @@ -117,6 +122,9 @@ public ParentQueue(CapacitySchedulerContext cs, cs.getConf()); float rawCapacity = cs.getConfiguration().getCapacity(getQueuePath()); + this.labels = cs.getConfiguration().getLabels(getQueuePath()); + this.defaultLabelExpression = cs.getConfiguration() + .getDefaultLabelExpression(getQueuePath()); if (rootQueue && (rawCapacity != CapacitySchedulerConfiguration.MAXIMUM_CAPACITY_VALUE)) { @@ -144,9 +152,9 @@ public ParentQueue(CapacitySchedulerContext cs, this.queueInfo.setQueueName(queueName); this.queueInfo.setChildQueues(new ArrayList()); - setupQueueConfigs(cs.getClusterResource(), - capacity, absoluteCapacity, - maximumCapacity, absoluteMaxCapacity, state, acls); + setupQueueConfigs(cs.getClusterResource(), capacity, absoluteCapacity, + maximumCapacity, absoluteMaxCapacity, state, acls, labels, + defaultLabelExpression); this.queueComparator = cs.getQueueComparator(); this.childQueues = new 
TreeSet(queueComparator); @@ -156,12 +164,11 @@ public ParentQueue(CapacitySchedulerContext cs, ", fullname=" + getQueuePath()); } - private synchronized void setupQueueConfigs( - Resource clusterResource, - float capacity, float absoluteCapacity, - float maximumCapacity, float absoluteMaxCapacity, - QueueState state, Map acls - ) { + private synchronized void setupQueueConfigs(Resource clusterResource, + float capacity, float absoluteCapacity, float maximumCapacity, + float absoluteMaxCapacity, QueueState state, + Map acls, Set labels, + String defaultLabelExpression) throws IOException { // Sanity check CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity); CSQueueUtils.checkAbsoluteCapacities(getQueueName(), absoluteCapacity, absoluteMaxCapacity); @@ -176,9 +183,24 @@ private synchronized void setupQueueConfigs( this.acls = acls; + // set labels + this.labels = labels; + if (this.labels == null && parent != null) { + this.labels = parent.getLabels(); + SchedulerUtils.checkAndThrowIfLabelNotIncluded(labelManager, this.labels); + } + + // set label expression + this.defaultLabelExpression = defaultLabelExpression; + if (this.defaultLabelExpression == null && parent != null) { + this.defaultLabelExpression = parent.getDefaultLabelExpression(); + } + + this.queueInfo.setLabels(labels); this.queueInfo.setCapacity(this.capacity); this.queueInfo.setMaximumCapacity(this.maximumCapacity); this.queueInfo.setQueueState(this.state); + this.queueInfo.setDefaultLabelExpression(this.defaultLabelExpression); StringBuilder aclsString = new StringBuilder(); for (Map.Entry e : acls.entrySet()) { @@ -188,6 +210,14 @@ private synchronized void setupQueueConfigs( // Update metrics CSQueueUtils.updateQueueStatistics( resourceCalculator, this, parent, clusterResource, minimumAllocation); + + StringBuilder labelStrBuilder = new StringBuilder(); + if (labels != null) { + for (String s : labels) { + labelStrBuilder.append(s); + labelStrBuilder.append(","); + } + } LOG.info(queueName + ", capacity=" + capacity + @@ -195,7 +225,8 @@ private synchronized void setupQueueConfigs( ", maxCapacity=" + maximumCapacity + ", asboluteMaxCapacity=" + absoluteMaxCapacity + ", state=" + state + - ", acls=" + aclsString); + ", acls=" + aclsString + + ", labels=" + labelStrBuilder.toString() + "\n"); } private static float PRECISION = 0.0005f; // 0.05% precision @@ -383,7 +414,9 @@ public synchronized void reinitialize( newlyParsedParentQueue.maximumCapacity, newlyParsedParentQueue.absoluteMaxCapacity, newlyParsedParentQueue.state, - newlyParsedParentQueue.acls); + newlyParsedParentQueue.acls, + newlyParsedParentQueue.labels, + newlyParsedParentQueue.defaultLabelExpression); // Re-configure existing child queues and add new ones // The CS has already checked to ensure all existing child queues are present! 
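
Illustration (not part of the patch): a minimal sketch of how the label inheritance above is expected to behave, using the setLabels / setDefaultLabelExpression / getLabels accessors this patch adds to CapacitySchedulerConfiguration. The queue names "a"/"a1" and the class name are hypothetical.

import com.google.common.collect.ImmutableSet;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;

public class QueueLabelInheritanceSketch {
  public static void main(String[] args) {
    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
    final String A = CapacitySchedulerConfiguration.ROOT + ".a";   // hypothetical queues
    final String A1 = A + ".a1";
    conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a"});
    conf.setQueues(A, new String[] {"a1"});
    conf.setCapacity(A, 100);
    conf.setCapacity(A1, 100);

    // Queue "a" may access label x, and requests submitted to "a" without a
    // label expression default to "x".
    conf.setLabels(A, ImmutableSet.of("x"));
    conf.setDefaultLabelExpression(A, "x");

    // Nothing is configured for a1, so getLabels(A1) returns null; the
    // ParentQueue/LeafQueue constructors above then fall back to the parent's
    // labels and default label expression, i.e. a1 effectively gets {"x"} and "x".
    System.out.println(conf.getLabels(A));                   // [x]
    System.out.println(conf.getLabels(A1));                  // null -> inherit from "a"
    System.out.println(conf.getDefaultLabelExpression(A1));  // null -> inherit "x"
  }
}

The default-queue-labels test added below relies on exactly this fallback: a1, b1 and c1 configure no labels and inherit from their parents.
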
@@ -824,4 +857,20 @@ public void detachContainer(Resource clusterResource, } } } + + public Set getLabels() { + return labels; + } + + @Override + public float getAbsActualCapacity() { + // for now, simply return actual capacity = guaranteed capacity for parent + // queue + return absoluteCapacity; + } + + @Override + public String getDefaultLabelExpression() { + return defaultLabelExpression; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java index a9a9975..4dc4632 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java @@ -65,6 +65,7 @@ LeafQueue queue; private final ResourceCalculator resourceCalculator = new DefaultResourceCalculator(); + RMContext rmContext = mock(RMContext.class); @Before public void setUp() throws IOException { @@ -72,8 +73,7 @@ public void setUp() throws IOException { new CapacitySchedulerConfiguration(); YarnConfiguration conf = new YarnConfiguration(); setupQueueConfiguration(csConf); - - + CapacitySchedulerContext csContext = mock(CapacitySchedulerContext.class); when(csContext.getConfiguration()).thenReturn(csConf); when(csContext.getConf()).thenReturn(conf); @@ -89,6 +89,8 @@ public void setUp() throws IOException { thenReturn(CapacityScheduler.queueComparator); when(csContext.getResourceCalculator()). thenReturn(resourceCalculator); + when(csContext.getRMContext()).thenReturn(rmContext); + RMContainerTokenSecretManager containerTokenSecretManager = new RMContainerTokenSecretManager(conf); containerTokenSecretManager.rollMasterKey(); @@ -99,7 +101,7 @@ public void setUp() throws IOException { CSQueue root = CapacityScheduler.parseQueue(csContext, csConf, null, "root", queues, queues, - TestUtils.spyHook); + TestUtils.spyHook, null); queue = spy(new LeafQueue(csContext, A, root, null)); @@ -162,6 +164,7 @@ public void testLimitsComputation() throws Exception { when(csContext.getQueueComparator()). 
thenReturn(CapacityScheduler.queueComparator); when(csContext.getResourceCalculator()).thenReturn(resourceCalculator); + when(csContext.getRMContext()).thenReturn(rmContext); // Say cluster has 100 nodes of 16G each Resource clusterResource = Resources.createResource(100 * 16 * GB, 100 * 16); @@ -170,7 +173,7 @@ public void testLimitsComputation() throws Exception { Map queues = new HashMap(); CSQueue root = CapacityScheduler.parseQueue(csContext, csConf, null, "root", - queues, queues, TestUtils.spyHook); + queues, queues, TestUtils.spyHook, null); LeafQueue queue = (LeafQueue)queues.get(A); @@ -259,7 +262,7 @@ public void testLimitsComputation() throws Exception { queues = new HashMap(); root = CapacityScheduler.parseQueue(csContext, csConf, null, "root", - queues, queues, TestUtils.spyHook); + queues, queues, TestUtils.spyHook, null); clusterResource = Resources.createResource(100 * 16 * GB); queue = (LeafQueue)queues.get(A); @@ -285,7 +288,7 @@ public void testLimitsComputation() throws Exception { queues = new HashMap(); root = CapacityScheduler.parseQueue(csContext, csConf, null, "root", - queues, queues, TestUtils.spyHook); + queues, queues, TestUtils.spyHook, null); queue = (LeafQueue)queues.get(A); assertEquals(9999, (int)csConf.getMaximumApplicationsPerQueue(queue.getQueuePath())); @@ -475,6 +478,7 @@ public void testHeadroom() throws Exception { when(csContext.getQueueComparator()). thenReturn(CapacityScheduler.queueComparator); when(csContext.getResourceCalculator()).thenReturn(resourceCalculator); + when(csContext.getRMContext()).thenReturn(rmContext); // Say cluster has 100 nodes of 16G each Resource clusterResource = Resources.createResource(100 * 16 * GB); @@ -482,7 +486,7 @@ public void testHeadroom() throws Exception { Map queues = new HashMap(); CapacityScheduler.parseQueue(csContext, csConf, null, "root", - queues, queues, TestUtils.spyHook); + queues, queues, TestUtils.spyHook, null); // Manipulate queue 'a' LeafQueue queue = TestLeafQueue.stubLeafQueue((LeafQueue)queues.get(A)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSQueueUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSQueueUtils.java index 7260afd..297c551 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSQueueUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSQueueUtils.java @@ -19,38 +19,19 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.reset; import static org.mockito.Mockito.when; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.junit.Assert; - import 
org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; -import org.junit.After; -import org.junit.Before; import org.junit.Test; -import org.mockito.InOrder; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; public class TestCSQueueUtils { @@ -88,6 +69,8 @@ public void runInvalidDivisorTest(boolean useDominant) throws Exception { thenReturn(Resources.createResource(GB, 1)); when(csContext.getMaximumResourceCapability()). thenReturn(Resources.createResource(0, 0)); + RMContext rmContext = mock(RMContext.class); + when(csContext.getRMContext()).thenReturn(rmContext); final String L1Q1 = "L1Q1"; csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {L1Q1}); @@ -129,6 +112,8 @@ public void testAbsoluteMaxAvailCapacityNoUse() throws Exception { thenReturn(Resources.createResource(GB, 1)); when(csContext.getMaximumResourceCapability()). thenReturn(Resources.createResource(16*GB, 32)); + RMContext rmContext = mock(RMContext.class); + when(csContext.getRMContext()).thenReturn(rmContext); final String L1Q1 = "L1Q1"; csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {L1Q1}); @@ -174,6 +159,9 @@ public void testAbsoluteMaxAvailCapacityWithUse() throws Exception { when(csContext.getMaximumResourceCapability()). thenReturn(Resources.createResource(16*GB, 32)); + RMContext rmContext = mock(RMContext.class); + when(csContext.getRMContext()).thenReturn(rmContext); + final String L1Q1 = "L1Q1"; final String L1Q2 = "L1Q2"; final String L2Q1 = "L2Q1"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java index fd14ef6..45e6c5a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java @@ -97,6 +97,7 @@ public void setUp() throws Exception { thenReturn(CapacityScheduler.queueComparator); when(csContext.getResourceCalculator()). 
thenReturn(resourceComparator); + when(csContext.getRMContext()).thenReturn(rmContext); } private FiCaSchedulerApp getMockApplication(int appId, String user) { @@ -214,7 +215,7 @@ public void testSortedQueues() throws Exception { CSQueue root = CapacityScheduler.parseQueue(csContext, csConf, null, CapacitySchedulerConfiguration.ROOT, queues, queues, - TestUtils.spyHook); + TestUtils.spyHook, null); // Setup some nodes final int memoryPerNode = 10; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java index a9bfc2f..71565f6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java @@ -20,12 +20,14 @@ import java.util.ArrayList; import java.util.List; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.SecurityUtilTestHelper; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; @@ -34,11 +36,14 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.label.MemoryNodeLabelManager; +import org.apache.hadoop.yarn.label.NodeLabelManager; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; import org.apache.hadoop.yarn.server.resourcemanager.RMSecretManagerService; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.TestFifoScheduler; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; @@ -46,11 +51,17 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; + public class TestContainerAllocation { @@ -251,4 
+262,416 @@ protected RMSecretManagerService createRMSecretManagerService() { rm1.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.ALLOCATED); MockRM.launchAndRegisterAM(app1, rm1, nm1); } + + private Configuration getConfigurationWithDefaultQueueLabels( + Configuration config) { + CapacitySchedulerConfiguration conf = + new CapacitySchedulerConfiguration(config); + + // Define top-level queues + conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b", "c"}); + + // root can access anything + conf.setLabels(CapacitySchedulerConfiguration.ROOT, toSet("*")); + + final String A = CapacitySchedulerConfiguration.ROOT + ".a"; + conf.setCapacity(A, 10); + conf.setMaximumCapacity(A, 15); + conf.setLabels(A, toSet("x")); + conf.setDefaultLabelExpression(A, "x"); + + final String B = CapacitySchedulerConfiguration.ROOT + ".b"; + conf.setCapacity(B, 20); + conf.setLabels(B, toSet("y")); + conf.setDefaultLabelExpression(B, "y"); + + final String C = CapacitySchedulerConfiguration.ROOT + ".c"; + conf.setCapacity(C, 70); + conf.setMaximumCapacity(C, 70); + conf.setLabels(C, NodeLabelManager.EMPTY_STRING_SET); + + // Define 2nd-level queues + final String A1 = A + ".a1"; + conf.setQueues(A, new String[] {"a1"}); + conf.setCapacity(A1, 100); + conf.setMaximumCapacity(A1, 100); + + final String B1 = B + ".b1"; + conf.setQueues(B, new String[] {"b1"}); + conf.setCapacity(B1, 100); + conf.setMaximumCapacity(B1, 100); + + final String C1 = C + ".c1"; + conf.setQueues(C, new String[] {"c1"}); + conf.setCapacity(C1, 100); + conf.setMaximumCapacity(C1, 100); + + return conf; + } + + private Configuration getConfigurationWithQueueLabels(Configuration config) { + CapacitySchedulerConfiguration conf = + new CapacitySchedulerConfiguration(config); + + // Define top-level queues + conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b", "c"}); + + // root can access anything + conf.setLabels(CapacitySchedulerConfiguration.ROOT, toSet("*")); + + final String A = CapacitySchedulerConfiguration.ROOT + ".a"; + conf.setCapacity(A, 10); + conf.setMaximumCapacity(A, 15); + conf.setLabels(A, toSet("x")); + + final String B = CapacitySchedulerConfiguration.ROOT + ".b"; + conf.setCapacity(B, 20); + conf.setLabels(B, toSet("y")); + + final String C = CapacitySchedulerConfiguration.ROOT + ".c"; + conf.setCapacity(C, 70); + conf.setMaximumCapacity(C, 70); + conf.setLabels(C, NodeLabelManager.EMPTY_STRING_SET); + + // Define 2nd-level queues + final String A1 = A + ".a1"; + conf.setQueues(A, new String[] {"a1"}); + conf.setCapacity(A1, 100); + conf.setMaximumCapacity(A1, 100); + + final String B1 = B + ".b1"; + conf.setQueues(B, new String[] {"b1"}); + conf.setCapacity(B1, 100); + conf.setMaximumCapacity(B1, 100); + + final String C1 = C + ".c1"; + conf.setQueues(C, new String[] {"c1"}); + conf.setCapacity(C1, 100); + conf.setMaximumCapacity(C1, 100); + + return conf; + } + + private void checkTaskContainersHost(ApplicationAttemptId attemptId, + ContainerId containerId, ResourceManager rm, String host) { + YarnScheduler scheduler = rm.getRMContext().getScheduler(); + SchedulerAppReport appReport = scheduler.getSchedulerAppInfo(attemptId); + + Assert.assertTrue(appReport.getLiveContainers().size() > 0); + for (RMContainer c : appReport.getLiveContainers()) { + if (c.getContainerId().equals(containerId)) { + Assert.assertEquals(host, c.getAllocatedNode().getHost()); + } + } + } + + private Set toSet(E... 
elements) { + Set set = Sets.newHashSet(elements); + return set; + } + + private Configuration getComplexConfigurationWithQueueLabels( + Configuration config) { + CapacitySchedulerConfiguration conf = + new CapacitySchedulerConfiguration(config); + + // Define top-level queues + conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"}); + + // root can access anything + conf.setLabels(CapacitySchedulerConfiguration.ROOT, toSet("*")); + + final String A = CapacitySchedulerConfiguration.ROOT + ".a"; + conf.setCapacity(A, 50); + conf.setMaximumCapacity(A, 50); + conf.setLabels(A, toSet("x")); + + final String B = CapacitySchedulerConfiguration.ROOT + ".b"; + conf.setCapacity(B, 50); + conf.setMaximumCapacity(B, 50); + conf.setLabels(B, toSet("y")); + + // Define 2nd-level queues + final String A1 = A + ".a1"; + conf.setQueues(A, new String[] {"a1"}); + conf.setCapacity(A1, 100); + conf.setMaximumCapacity(A1, 100); + conf.setLabels(A1, toSet("x", "y")); + conf.setDefaultLabelExpression(A1, "x"); + + conf.setQueues(B, new String[] {"b1", "b2"}); + final String B1 = B + ".b1"; + conf.setCapacity(B1, 10); + conf.setMaximumCapacity(B1, 20); + conf.setLabels(B1, NodeLabelManager.EMPTY_STRING_SET); + + final String B2 = B + ".b2"; + conf.setCapacity(B2, 90); + conf.setMaximumCapacity(B2, 90); + conf.setLabels(B2, toSet("y", "z")); + + return conf; + } + + @Test (timeout = 300000) + public void testContainerAllocateWithComplexLabels() throws Exception { + // make it harder .. + final NodeLabelManager mgr = new MemoryNodeLabelManager(); + mgr.init(conf); + + /* + * Queue structure: + * root (*) + * / \ + * a(x) 50% b(y) 50% + * / / \ + * a1 (x,y) b1(NO) b2(y,z) + * 100% 10% 90% + * + * Node structure: + * h1 : x + * h2 : x, y + * h3 : y + * h4 : y, z + * h5 : NO + * + * Each node can only allocate two containers + */ + + // set node -> label + mgr.addLabels(ImmutableSet.of("x", "y", "z")); + mgr.setLabelsOnMultipleNodes(ImmutableMap.of("h1", toSet("x"), "h2", + toSet("x", "y"), "h3", toSet("y"), "h4", toSet("y", "z"), "h5", + NodeLabelManager.EMPTY_STRING_SET)); + + // inject node label manager + MockRM rm1 = new MockRM(getComplexConfigurationWithQueueLabels(conf)) { + @Override + public NodeLabelManager createNodeLabelManager() { + return mgr; + } + }; + + rm1.getRMContext().setNodeLabelManager(mgr); + rm1.start(); + MockNM nm1 = rm1.registerNode("h1:1234", 2048); + MockNM nm2 = rm1.registerNode("h2:1234", 2048); + MockNM nm3 = rm1.registerNode("h3:1234", 2048); + MockNM nm4 = rm1.registerNode("h4:1234", 2048); + MockNM nm5 = rm1.registerNode("h5:1234", 2048); + + ContainerId containerId; + + // launch an app to queue a1 (label = x), and check all container will + // be allocated in h1 + RMApp app1 = rm1.submitApp(1024, "app", "user", null, "a1"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + // request a container (label = x && y). 
can only allocate on nm2 + am1.allocate("*", 1024, 1, new ArrayList(), "x && y"); + containerId = + ContainerId.newInstance(am1.getApplicationAttemptId(), 2); + Assert.assertFalse(rm1.waitForState(nm1, containerId, + RMContainerState.ALLOCATED, 10 * 1000)); + Assert.assertTrue(rm1.waitForState(nm2, containerId, + RMContainerState.ALLOCATED, 10 * 1000)); + checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1, + "h2"); + + // launch an app to queue b1 (empty label set), so its AM runs on the + // unlabeled node h5 + RMApp app2 = rm1.submitApp(1024, "app", "user", null, "b1"); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm5); + + // the AM container already consumed b1's small queue capacity, so this + // additional request cannot be allocated anywhere + am2.allocate("*", 1024, 1, new ArrayList()); + containerId = ContainerId.newInstance(am2.getApplicationAttemptId(), 2); + Assert.assertFalse(rm1.waitForState(nm4, containerId, + RMContainerState.ALLOCATED, 10 * 1000)); + Assert.assertFalse(rm1.waitForState(nm5, containerId, + RMContainerState.ALLOCATED, 10 * 1000)); + + // launch an app to queue b2 + RMApp app3 = rm1.submitApp(1024, "app", "user", null, "b2"); + MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm5); + + // request a container. try to allocate on nm1 (label = x) and nm3 (label = + // y). Will successfully allocate on nm3 + am3.allocate("*", 1024, 1, new ArrayList(), "y"); + containerId = ContainerId.newInstance(am3.getApplicationAttemptId(), 2); + Assert.assertFalse(rm1.waitForState(nm1, containerId, + RMContainerState.ALLOCATED, 10 * 1000)); + Assert.assertTrue(rm1.waitForState(nm3, containerId, + RMContainerState.ALLOCATED, 10 * 1000)); + checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1, + "h3"); + + // try to allocate container (request label = y && z) on nm3 (label = y) and + // nm4 (label = y,z). Will successfully allocate on nm4 only. + am3.allocate("*", 1024, 1, new ArrayList(), "y && z"); + containerId = ContainerId.newInstance(am3.getApplicationAttemptId(), 3); + Assert.assertFalse(rm1.waitForState(nm3, containerId, + RMContainerState.ALLOCATED, 10 * 1000)); + Assert.assertTrue(rm1.waitForState(nm4, containerId, + RMContainerState.ALLOCATED, 10 * 1000)); + checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1, + "h4"); + + rm1.close(); + } + + @Test (timeout = 120000) + public void testContainerAllocateWithLabels() throws Exception { + final NodeLabelManager mgr = new MemoryNodeLabelManager(); + mgr.init(conf); + + // set node -> label + mgr.addLabels(ImmutableSet.of("x", "y")); + mgr.setLabelsOnMultipleNodes(ImmutableMap.of("h1", toSet("x"), + "h2", toSet("y"))); + + // inject node label manager + MockRM rm1 = new MockRM(getConfigurationWithQueueLabels(conf)) { + @Override + public NodeLabelManager createNodeLabelManager() { + return mgr; + } + }; + + rm1.getRMContext().setNodeLabelManager(mgr); + rm1.start(); + MockNM nm1 = rm1.registerNode("h1:1234", 8000); // label = x + MockNM nm2 = rm1.registerNode("h2:1234", 8000); // label = y + MockNM nm3 = rm1.registerNode("h3:1234", 8000); // label = + + ContainerId containerId; + + // launch an app to queue a1 (label = x), and check all containers will + // be allocated in h1 + RMApp app1 = rm1.submitApp(200, "app", "user", null, "a1"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm3); + + // request a container. 
+ am1.allocate("*", 1024, 1, new ArrayList(), "x"); + containerId = + ContainerId.newInstance(am1.getApplicationAttemptId(), 2); + Assert.assertFalse(rm1.waitForState(nm2, containerId, + RMContainerState.ALLOCATED, 10 * 1000)); + Assert.assertTrue(rm1.waitForState(nm1, containerId, + RMContainerState.ALLOCATED, 10 * 1000)); + checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1, + "h1"); + + // launch an app to queue b1 (label = y), and check all container will + // be allocated in h2 + RMApp app2 = rm1.submitApp(200, "app", "user", null, "b1"); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm3); + + // request a container. + am2.allocate("*", 1024, 1, new ArrayList(), "y"); + containerId = ContainerId.newInstance(am2.getApplicationAttemptId(), 2); + Assert.assertFalse(rm1.waitForState(nm1, containerId, + RMContainerState.ALLOCATED, 10 * 1000)); + Assert.assertTrue(rm1.waitForState(nm2, containerId, + RMContainerState.ALLOCATED, 10 * 1000)); + checkTaskContainersHost(am2.getApplicationAttemptId(), containerId, rm1, + "h2"); + + // launch an app to queue c1 (label = ""), and check all container will + // be allocated in h3 + RMApp app3 = rm1.submitApp(200, "app", "user", null, "c1"); + MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm3); + + // request a container. + am3.allocate("*", 1024, 1, new ArrayList()); + containerId = ContainerId.newInstance(am3.getApplicationAttemptId(), 2); + Assert.assertFalse(rm1.waitForState(nm2, containerId, + RMContainerState.ALLOCATED, 10 * 1000)); + Assert.assertTrue(rm1.waitForState(nm3, containerId, + RMContainerState.ALLOCATED, 10 * 1000)); + checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1, + "h3"); + + rm1.close(); + } + + @Test (timeout = 120000) + public void testContainerAllocateWithDefaultQueueLabels() throws Exception { + // This test is pretty much similar to testContainerAllocateWithLabel. + // Difference is, this test doesn't specify label expression in ResourceRequest, + // instead, it uses default queue label expression + + final NodeLabelManager mgr = new MemoryNodeLabelManager(); + mgr.init(conf); + + // set node -> label + mgr.addLabels(ImmutableSet.of("x", "y")); + mgr.setLabelsOnMultipleNodes(ImmutableMap.of("h1", toSet("x"), + "h2", toSet("y"))); + + // inject node label manager + MockRM rm1 = new MockRM(getConfigurationWithDefaultQueueLabels(conf)) { + @Override + public NodeLabelManager createNodeLabelManager() { + return mgr; + } + }; + + rm1.getRMContext().setNodeLabelManager(mgr); + rm1.start(); + MockNM nm1 = rm1.registerNode("h1:1234", 8000); // label = x + MockNM nm2 = rm1.registerNode("h2:1234", 8000); // label = y + MockNM nm3 = rm1.registerNode("h3:1234", 8000); // label = + + ContainerId containerId; + + // launch an app to queue a1 (label = x), and check all container will + // be allocated in h1 + RMApp app1 = rm1.submitApp(200, "app", "user", null, "a1"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + // request a container. 
+ am1.allocate("*", 1024, 1, new ArrayList()); + containerId = + ContainerId.newInstance(am1.getApplicationAttemptId(), 2); + Assert.assertFalse(rm1.waitForState(nm3, containerId, + RMContainerState.ALLOCATED, 10 * 1000)); + Assert.assertTrue(rm1.waitForState(nm1, containerId, + RMContainerState.ALLOCATED, 10 * 1000)); + checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1, + "h1"); + + // launch an app to queue b1 (label = y), and check all container will + // be allocated in h2 + RMApp app2 = rm1.submitApp(200, "app", "user", null, "b1"); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2); + + // request a container. + am2.allocate("*", 1024, 1, new ArrayList()); + containerId = ContainerId.newInstance(am2.getApplicationAttemptId(), 2); + Assert.assertFalse(rm1.waitForState(nm3, containerId, + RMContainerState.ALLOCATED, 10 * 1000)); + Assert.assertTrue(rm1.waitForState(nm2, containerId, + RMContainerState.ALLOCATED, 10 * 1000)); + checkTaskContainersHost(am2.getApplicationAttemptId(), containerId, rm1, + "h2"); + + // launch an app to queue c1 (label = ""), and check all container will + // be allocated in h3 + RMApp app3 = rm1.submitApp(200, "app", "user", null, "c1"); + MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm3); + + // request a container. + am3.allocate("*", 1024, 1, new ArrayList()); + containerId = ContainerId.newInstance(am3.getApplicationAttemptId(), 2); + Assert.assertFalse(rm1.waitForState(nm2, containerId, + RMContainerState.ALLOCATED, 10 * 1000)); + Assert.assertTrue(rm1.waitForState(nm3, containerId, + RMContainerState.ALLOCATED, 10 * 1000)); + checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1, + "h3"); + + rm1.close(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index d5eb933..bfd0b8a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -39,8 +39,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; -import org.junit.Assert; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.security.UserGroupInformation; @@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.label.NodeLabelManager; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; @@ -77,6 +78,7 @@ import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index d5eb933..bfd0b8a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -39,8 +39,8 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
-import org.junit.Assert;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -58,6 +58,7 @@
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.label.NodeLabelManager;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
@@ -77,6 +78,7 @@
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
@@ -131,6 +133,7 @@ public void setUp() throws Exception {
         thenReturn(CapacityScheduler.queueComparator);
     when(csContext.getResourceCalculator()).
         thenReturn(resourceCalculator);
+    when(csContext.getRMContext()).thenReturn(rmContext);
     RMContainerTokenSecretManager containerTokenSecretManager =
         new RMContainerTokenSecretManager(conf);
     containerTokenSecretManager.rollMasterKey();
@@ -141,7 +144,7 @@ public void setUp() throws Exception {
     CapacityScheduler.parseQueue(csContext, csConf, null,
         CapacitySchedulerConfiguration.ROOT,
         queues, queues,
-        TestUtils.spyHook);
+        TestUtils.spyHook, null);
 
     cs.setRMContext(rmContext);
     cs.init(csConf);
@@ -731,6 +734,77 @@ public void testHeadroomWithMaxCap() throws Exception {
     a.assignContainers(clusterResource, node_1);
     assertEquals(1*GB, app_2.getHeadroom().getMemory()); // hit queue max-cap
   }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testHeadroomWithLabel() throws Exception {
+    NodeLabelManager nlm = mock(NodeLabelManager.class);
+
+    // Mock the queue
+    LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
+
+    // unset maxCapacity
+    a.setMaxCapacity(1.0f);
+
+    // Users
+    final String user_0 = "user_0";
+
+    // Submit applications
+    final ApplicationAttemptId appAttemptId_0 =
+        TestUtils.getMockApplicationAttemptId(0, 0);
+    FiCaSchedulerApp app_0 =
+        new FiCaSchedulerApp(appAttemptId_0, user_0, a,
+            a.getActiveUsersManager(), rmContext);
+    a.submitApplicationAttempt(app_0, user_0);
+
+    // Setup some nodes
+    String host_0 = "127.0.0.1";
+    FiCaSchedulerNode node_0 =
+        TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 64 * GB);
+
+    final int numNodes = 1;
+    Resource clusterResource = Resources.createResource(numNodes * (64 * GB), 1);
+    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+
+    // Setup resource-requests
+    Priority priority = TestUtils.createMockPriority(1);
+    app_0.updateResourceRequests(Collections.singletonList(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 1 * GB, 1, true, priority,
+            recordFactory)));
+
+    /**
+     * Start testing...
+     */
+
+    // Set user-limit
+    a.setUserLimit(100);
+    a.setUserLimitFactor(1);
+
+    // 1 container to user_0
+    a.assignContainers(clusterResource, node_0);
+    assertEquals(1 * GB, a.getUsedResources().getMemory());
+    assertEquals(1 * GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(5 * GB, app_0.getHeadroom().getMemory()); // User limit = 6G
+
+    // mock getQueueResource to 4999 MB
+    when(
+        nlm.getQueueResource(any(String.class), any(Set.class),
+            any(Resource.class))).thenReturn(Resource.newInstance(4999, 1));
+    a.setNodeLabelManager(nlm);
+
+    // do a resource allocation again
+    app_0.updateResourceRequests(Collections.singletonList(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 1 * GB, 1, true, priority,
+            recordFactory)));
+    a.assignContainers(clusterResource, node_0);
+
+    // current headroom should be
+    // Headroom = min(6G (user-limit), 4G (queueLabelResource)) -
+    //   2G (used-resource) = 2G
+    assertEquals(2 * GB, a.getUsedResources().getMemory());
+    assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(2 * GB, app_0.getHeadroom().getMemory());
+  }
 
   @Test
   public void testSingleQueueWithMultipleUsers() throws Exception {
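For reference, a sketch of the arithmetic the assertions in testHeadroomWithLabel encode, assuming the label-aware queue resource simply caps the user limit before used resources are subtracted (the 4999 MB stub is effectively treated as the 4 GB the test's own comment quotes); resourceCalculator and clusterResource are the names already used in the test, and Resources.min/subtract are the standard YARN resource helpers:

    // Illustrative only: headroom = min(userLimit, queueLabelResource) - consumed.
    // userLimit = 6 GB, queueLabelResource ~= 4 GB, consumed = 2 GB  =>  2 GB.
    Resource userLimit = Resource.newInstance(6 * 1024, 1);
    Resource queueLabelResource = Resource.newInstance(4 * 1024, 1);
    Resource consumed = Resource.newInstance(2 * 1024, 1);
    Resource headroom = Resources.subtract(
        Resources.min(resourceCalculator, clusterResource, userLimit,
            queueLabelResource),
        consumed);   // 2048 MB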
@@ -1682,7 +1756,7 @@ public void testActivateApplicationAfterQueueRefresh() throws Exception {
         CapacityScheduler.parseQueue(csContext, csConf, null,
             CapacitySchedulerConfiguration.ROOT,
             newQueues, queues,
-            TestUtils.spyHook);
+            TestUtils.spyHook, null);
     queues = newQueues;
     root.reinitialize(newRoot, cs.getClusterResource());
 
@@ -1707,7 +1781,7 @@ public void testNodeLocalityAfterQueueRefresh() throws Exception {
         CapacityScheduler.parseQueue(csContext, csConf, null,
             CapacitySchedulerConfiguration.ROOT,
             newQueues, queues,
-            TestUtils.spyHook);
+            TestUtils.spyHook, null);
     queues = newQueues;
     root.reinitialize(newRoot, cs.getClusterResource());
 
@@ -2025,6 +2099,7 @@ public void testMaxAMResourcePerQueuePercentAfterQueueRefresh()
     Resource clusterResource = Resources
         .createResource(100 * 16 * GB, 100 * 32);
     CapacitySchedulerContext csContext = mockCSContext(csConf, clusterResource);
+    when(csContext.getRMContext()).thenReturn(rmContext);
     csConf.setFloat(CapacitySchedulerConfiguration.
         MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT, 0.1f);
     ParentQueue root = new ParentQueue(csContext,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
index fa9edb1..ea4b3d7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
@@ -94,6 +94,7 @@ public void setUp() throws Exception {
         thenReturn(CapacityScheduler.queueComparator);
     when(csContext.getResourceCalculator()).
         thenReturn(resourceComparator);
+    when(csContext.getRMContext()).thenReturn(rmContext);
   }
 
   private static final String A = "a";
@@ -203,7 +204,7 @@ public void testSingleLevelQueues() throws Exception {
     CSQueue root =
         CapacityScheduler.parseQueue(csContext, csConf, null,
             CapacitySchedulerConfiguration.ROOT, queues, queues,
-            TestUtils.spyHook);
+            TestUtils.spyHook, null);
 
     // Setup some nodes
     final int memoryPerNode = 10;
@@ -297,7 +298,7 @@ public void testSingleLevelQueuesPrecision() throws Exception {
     try {
       CapacityScheduler.parseQueue(csContext, csConf, null,
           CapacitySchedulerConfiguration.ROOT, queues, queues,
-          TestUtils.spyHook);
+          TestUtils.spyHook, null);
     } catch (IllegalArgumentException ie) {
       exceptionOccured = true;
     }
@@ -311,7 +312,7 @@ public void testSingleLevelQueuesPrecision() throws Exception {
     try {
       CapacityScheduler.parseQueue(csContext, csConf, null,
          CapacitySchedulerConfiguration.ROOT, queues, queues,
-          TestUtils.spyHook);
+          TestUtils.spyHook, null);
     } catch (IllegalArgumentException ie) {
       exceptionOccured = true;
     }
@@ -325,7 +326,7 @@ public void testSingleLevelQueuesPrecision() throws Exception {
     try {
       CapacityScheduler.parseQueue(csContext, csConf, null,
           CapacitySchedulerConfiguration.ROOT, queues, queues,
-          TestUtils.spyHook);
+          TestUtils.spyHook, null);
     } catch (IllegalArgumentException ie) {
       exceptionOccured = true;
     }
@@ -402,7 +403,7 @@ public void testMultiLevelQueues() throws Exception {
     CSQueue root =
         CapacityScheduler.parseQueue(csContext, csConf, null,
             CapacitySchedulerConfiguration.ROOT, queues, queues,
-            TestUtils.spyHook);
+            TestUtils.spyHook, null);
 
     // Setup some nodes
     final int memoryPerNode = 10;
@@ -518,7 +519,7 @@ public void testQueueCapacitySettingChildZero() throws Exception {
     Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
     CapacityScheduler.parseQueue(csContext, csConf, null,
         CapacitySchedulerConfiguration.ROOT, queues, queues,
-        TestUtils.spyHook);
+        TestUtils.spyHook, null);
   }
 
   @Test (expected=IllegalArgumentException.class)
@@ -535,7 +536,7 @@ public void testQueueCapacitySettingParentZero() throws Exception {
     Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
     CapacityScheduler.parseQueue(csContext, csConf, null,
         CapacitySchedulerConfiguration.ROOT, queues, queues,
-        TestUtils.spyHook);
+        TestUtils.spyHook, null);
   }
 
   @Test
@@ -557,7 +558,7 @@ public void testQueueCapacityZero() throws Exception {
     try {
       CapacityScheduler.parseQueue(csContext, csConf, null,
           CapacitySchedulerConfiguration.ROOT, queues, queues,
-          TestUtils.spyHook);
+          TestUtils.spyHook, null);
     } catch (IllegalArgumentException e) {
       fail("Failed to create queues with 0 capacity: " + e);
     }
@@ -573,7 +574,7 @@ public void testOffSwitchScheduling() throws Exception {
     CSQueue root =
         CapacityScheduler.parseQueue(csContext, csConf, null,
             CapacitySchedulerConfiguration.ROOT, queues, queues,
-            TestUtils.spyHook);
+            TestUtils.spyHook, null);
 
     // Setup some nodes
     final int memoryPerNode = 10;
@@ -639,7 +640,7 @@ public void testOffSwitchSchedulingMultiLevelQueues() throws Exception {
     CSQueue root =
         CapacityScheduler.parseQueue(csContext, csConf, null,
             CapacitySchedulerConfiguration.ROOT, queues, queues,
-            TestUtils.spyHook);
+            TestUtils.spyHook, null);
 
     // Setup some nodes
     final int memoryPerNode = 10;
@@ -723,7 +724,7 @@ public void testQueueAcl() throws Exception {
     CSQueue root =
         CapacityScheduler.parseQueue(csContext, csConf, null,
             CapacitySchedulerConfiguration.ROOT, queues, queues,
-            TestUtils.spyHook);
+            TestUtils.spyHook, null);
 
     UserGroupInformation user = UserGroupInformation.getCurrentUser();
     // Setup queue configs
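The TestParentQueue changes above always pass null for the new queue-to-labels argument. As an illustrative sketch only, the same call with a real map, mirroring how the scheduler is expected to collect labels while parsing queues (whether the map is keyed by the queue's full path or short name is an assumption; the other names come from the surrounding tests):

    // Illustrative only: supply a real queue->labels map instead of null so the
    // labels gathered during parsing can be inspected afterwards.
    Map<String, Set<String>> queueToLabels = new HashMap<String, Set<String>>();
    CSQueue root =
        CapacityScheduler.parseQueue(csContext, csConf, null,
            CapacitySchedulerConfiguration.ROOT, queues, queues,
            TestUtils.spyHook, queueToLabels);
    // Presumably populated by parseQueue; key naming is an assumption.
    Set<String> rootLabels = queueToLabels.get(CapacitySchedulerConfiguration.ROOT);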
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java
index a3b990c..e33caf7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java
@@ -18,23 +18,40 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
-import org.junit.Assert;
+import java.io.IOException;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.mockito.Matchers.any;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.label.NodeLabelManager;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
-import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
+import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 
+import com.google.common.collect.ImmutableSet;
+
 public class TestQueueParsing {
 
   private static final Log LOG = LogFactory.getLog(TestQueueParsing.class);
 
   private static final double DELTA = 0.000001;
 
+  private NodeLabelManager nodeLabelManager;
+
+  @Before
+  public void setup() {
+    nodeLabelManager = mock(NodeLabelManager.class);
+    when(nodeLabelManager.containsLabel(any(String.class))).thenReturn(true);
+  }
+
   @Test
   public void testQueueParsing() throws Exception {
     CapacitySchedulerConfiguration csConf =
@@ -202,4 +219,150 @@ public void testMaxCapacity() throws Exception {
     capacityScheduler.stop();
   }
+  private void setupQueueConfigurationWithoutLabels(CapacitySchedulerConfiguration conf) {
+    // Define top-level queues
+    conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"});
+
+    final String A = CapacitySchedulerConfiguration.ROOT + ".a";
+    conf.setCapacity(A, 10);
+    conf.setMaximumCapacity(A, 15);
+
+    final String B = CapacitySchedulerConfiguration.ROOT + ".b";
+    conf.setCapacity(B, 90);
+
+    LOG.info("Setup top-level queues");
+
+    // Define 2nd-level queues
+    final String A1 = A + ".a1";
+    final String A2 = A + ".a2";
+    conf.setQueues(A, new String[] {"a1", "a2"});
+    conf.setCapacity(A1, 30);
+    conf.setMaximumCapacity(A1, 45);
+    conf.setCapacity(A2, 70);
+    conf.setMaximumCapacity(A2, 85);
+
+    final String B1 = B + ".b1";
+    final String B2 = B + ".b2";
+    final String B3 = B + ".b3";
+    conf.setQueues(B, new String[] {"b1", "b2", "b3"});
+    conf.setCapacity(B1, 50);
+    conf.setMaximumCapacity(B1, 85);
+    conf.setCapacity(B2, 30);
+    conf.setMaximumCapacity(B2, 35);
+    conf.setCapacity(B3, 20);
+    conf.setMaximumCapacity(B3, 35);
+  }
+
+  private void setupQueueConfigurationWithLabels(CapacitySchedulerConfiguration conf) {
+    // Define top-level queues
+    conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"});
+
+    final String A = CapacitySchedulerConfiguration.ROOT + ".a";
+    conf.setCapacity(A, 10);
+    conf.setMaximumCapacity(A, 15);
+
+    final String B = CapacitySchedulerConfiguration.ROOT + ".b";
+    conf.setCapacity(B, 90);
+
+    LOG.info("Setup top-level queues");
+
+    // Define 2nd-level queues
+    final String A1 = A + ".a1";
+    final String A2 = A + ".a2";
+    conf.setQueues(A, new String[] {"a1", "a2"});
+    conf.setLabels(A, ImmutableSet.of("*"));
+    conf.setCapacity(A1, 30);
+    conf.setMaximumCapacity(A1, 45);
+    conf.setCapacity(A2, 70);
+    conf.setMaximumCapacity(A2, 85);
+    conf.setLabels(A2, ImmutableSet.of("red"));
+
+    final String B1 = B + ".b1";
+    final String B2 = B + ".b2";
+    final String B3 = B + ".b3";
+    conf.setQueues(B, new String[] {"b1", "b2", "b3"});
+    conf.setLabels(B, ImmutableSet.of("red", "blue"));
+    conf.setCapacity(B1, 50);
+    conf.setMaximumCapacity(B1, 85);
+    conf.setCapacity(B2, 30);
+    conf.setMaximumCapacity(B2, 35);
+    conf.setCapacity(B3, 20);
+    conf.setMaximumCapacity(B3, 35);
+  }
+
+  @Test(timeout = 5000)
+  public void testQueueParsingReinitializeWithLabels() throws IOException {
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration();
+    setupQueueConfigurationWithoutLabels(csConf);
+    YarnConfiguration conf = new YarnConfiguration(csConf);
+
+    CapacityScheduler capacityScheduler = new CapacityScheduler();
+    RMContextImpl rmContext =
+        new RMContextImpl(null, null, null, null, null, null,
+            new RMContainerTokenSecretManager(conf),
+            new NMTokenSecretManagerInRM(conf),
+            new ClientToAMTokenSecretManagerInRM(), null);
+    rmContext.setNodeLabelManager(nodeLabelManager);
+    capacityScheduler.setConf(conf);
+    capacityScheduler.setRMContext(rmContext);
+    capacityScheduler.init(conf);
+    capacityScheduler.start();
+    csConf = new CapacitySchedulerConfiguration();
+    setupQueueConfigurationWithLabels(csConf);
+    conf = new YarnConfiguration(csConf);
+    capacityScheduler.reinitialize(conf, rmContext);
+    checkQueueLabels(capacityScheduler);
+    capacityScheduler.stop();
+  }
+
+  private void checkQueueLabels(CapacityScheduler capacityScheduler) {
+    // by default, label is empty
+    Assert.assertTrue(capacityScheduler
+        .getQueue(CapacitySchedulerConfiguration.ROOT).getLabels().isEmpty());
+
+    // queue-A is *
+    Assert.assertTrue(capacityScheduler
+        .getQueue("a").getLabels().contains("*"));
+
+    // queue-A1 inherits A's configuration
+    Assert.assertTrue(capacityScheduler
+        .getQueue("a1").getLabels().contains("*"));
+
+    // queue-A2 is "red"
+    Assert.assertEquals(1, capacityScheduler
+        .getQueue("a2").getLabels().size());
+    Assert.assertTrue(capacityScheduler
+        .getQueue("a2").getLabels().contains("red"));
+
+    // queue-B is "red"/"blue"
+    Assert.assertTrue(capacityScheduler
+        .getQueue("b").getLabels().containsAll(ImmutableSet.of("red", "blue")));
+
+    // queue-B2 inherits "red"/"blue"
+    Assert.assertTrue(capacityScheduler
+        .getQueue("b2").getLabels().containsAll(ImmutableSet.of("red", "blue")));
+  }
+
+  @Test(timeout = 5000)
+  public void testQueueParsingWithLabels() throws IOException {
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration();
+    setupQueueConfigurationWithLabels(csConf);
+    YarnConfiguration conf = new YarnConfiguration(csConf);
+
+    CapacityScheduler capacityScheduler = new CapacityScheduler();
+    RMContextImpl rmContext =
+        new RMContextImpl(null, null, null, null, null, null,
+            new RMContainerTokenSecretManager(conf),
+            new NMTokenSecretManagerInRM(conf),
+            new ClientToAMTokenSecretManagerInRM(), null);
+    rmContext.setNodeLabelManager(nodeLabelManager);
+    capacityScheduler.setConf(conf);
+    capacityScheduler.setRMContext(rmContext);
+    capacityScheduler.init(conf);
+    capacityScheduler.start();
+    checkQueueLabels(capacityScheduler);
+    capacityScheduler.stop();
+  }
 }
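The @Before setup above stubs containsLabel to always return true, so every label named in the queue configuration is accepted during parsing. A rough equivalent with a concrete manager, under the assumption that queue labels must be registered with the manager before the scheduler is (re)initialized; MemoryNodeLabelManager, init and addLabels are the same calls the allocation tests earlier in this patch use:

    // Illustrative only: a concrete manager in place of the mock.  The "red"
    // and "blue" labels referenced by setupQueueConfigurationWithLabels are
    // registered up front so queue parsing can validate them.
    NodeLabelManager mgr = new MemoryNodeLabelManager();
    mgr.init(new YarnConfiguration());
    mgr.addLabels(ImmutableSet.of("red", "blue"));
    rmContext.setNodeLabelManager(mgr);
    capacityScheduler.reinitialize(conf, rmContext);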