diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
index 0e6207b..9f91bca 100644
--- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
+++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
@@ -188,6 +188,23 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
new file mode 100644
index 0000000..f251dfc
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -0,0 +1,444 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.yarn.api.records.QueueACL;
+import org.apache.hadoop.yarn.api.records.QueueInfo;
+import org.apache.hadoop.yarn.api.records.QueueState;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.DynamicNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import com.google.common.collect.Sets;
+
+public abstract class AbstractCSQueue implements CSQueue {
+
+ CSQueue parent;
+ final String queueName;
+
+ float capacity;
+ float maximumCapacity;
+ float absoluteCapacity;
+ float absoluteMaxCapacity;
+ float absoluteUsedCapacity = 0.0f;
+
+ float usedCapacity = 0.0f;
+ volatile int numApplications;
+ volatile int numContainers;
+
+ final Resource minimumAllocation;
+ final Resource maximumAllocation;
+ QueueState state;
+ final QueueMetrics metrics;
+
+ final ResourceCalculator resourceCalculator;
+ Set labels;
+ DynamicNodeLabelsManager labelManager;
+ String defaultLabelExpression;
+ Resource usedResources = Resources.createResource(0, 0);
+ QueueInfo queueInfo;
+ final Comparator queueComparator;
+ Map absoluteNodeLabelCapacities;
+ Map nodeLabelCapacities;
+ Map usedResourcesByLabels = new HashMap();
+ Map absoluteMaximumNodeLabelCapacities;
+ Map maximumNodeLabelCapacities;
+
+ Map acls =
+ new HashMap();
+ boolean reservationsContinueLooking;
+
+ private final RecordFactory recordFactory =
+ RecordFactoryProvider.getRecordFactory(null);
+
+ public AbstractCSQueue(CapacitySchedulerContext cs,
+ String queueName, CSQueue parent, CSQueue old) throws IOException {
+ this.minimumAllocation = cs.getMinimumResourceCapability();
+ this.maximumAllocation = cs.getMaximumResourceCapability();
+ this.labelManager = cs.getRMContext().getNodeLabelManager();
+ this.parent = parent;
+ this.queueName = queueName;
+ this.resourceCalculator = cs.getResourceCalculator();
+ this.queueComparator = cs.getQueueComparator();
+ this.queueInfo = recordFactory.newRecordInstance(QueueInfo.class);
+
+ // must be called after parent and queueName is set
+ this.metrics = old != null ? old.getMetrics() :
+ QueueMetrics.forQueue(getQueuePath(), parent,
+ cs.getConfiguration().getEnableUserMetrics(),
+ cs.getConf());
+
+ // get labels
+ this.labels = cs.getConfiguration().getAccessibleLabels(getQueuePath());
+ this.defaultLabelExpression = cs.getConfiguration()
+ .getDefaultNodeLabelExpression(getQueuePath());
+
+ this.queueInfo.setQueueName(queueName);
+
+ // inherit from parent if labels not set
+ if (this.labels == null && parent != null) {
+ this.labels = parent.getAccessibleLabels();
+ SchedulerUtils.checkAndThrowIfLabelNotIncluded(labelManager, this.labels);
+ }
+
+ // inherit from parent if labels not set
+ if (this.defaultLabelExpression == null && parent != null
+ && this.labels.containsAll(parent.getAccessibleLabels())) {
+ this.defaultLabelExpression = parent.getDefaultLabelExpression();
+ }
+
+ // set capacity by labels
+ nodeLabelCapacities =
+ cs.getConfiguration().getNodeLabelCapacities(getQueuePath(), labels);
+
+ // set maximum capacity by labels
+ maximumNodeLabelCapacities =
+ cs.getConfiguration().getMaximumNodeLabelCapacities(getQueuePath(),
+ labels);
+ }
+
+ @Override
+ public synchronized float getCapacity() {
+ return capacity;
+ }
+
+ @Override
+ public synchronized float getAbsoluteCapacity() {
+ return absoluteCapacity;
+ }
+
+ @Override
+ public float getAbsoluteMaximumCapacity() {
+ return absoluteMaxCapacity;
+ }
+
+ @Override
+ public synchronized float getAbsoluteUsedCapacity() {
+ return absoluteUsedCapacity;
+ }
+
+ @Override
+ public float getMaximumCapacity() {
+ return maximumCapacity;
+ }
+
+ @Override
+ public synchronized float getUsedCapacity() {
+ return usedCapacity;
+ }
+
+ @Override
+ public synchronized Resource getUsedResources() {
+ return usedResources;
+ }
+
+ public synchronized int getNumContainers() {
+ return numContainers;
+ }
+
+ public synchronized int getNumApplications() {
+ return numApplications;
+ }
+
+ @Override
+ public synchronized QueueState getState() {
+ return state;
+ }
+
+ @Override
+ public QueueMetrics getMetrics() {
+ return metrics;
+ }
+
+ @Override
+ public String getQueueName() {
+ return queueName;
+ }
+
+ @Override
+ public synchronized CSQueue getParent() {
+ return parent;
+ }
+
+ @Override
+ public synchronized void setParent(CSQueue newParentQueue) {
+ this.parent = (ParentQueue)newParentQueue;
+ }
+
+ public Set getAccessibleLabels() {
+ return labels;
+ }
+
+ @Override
+ public boolean hasAccess(QueueACL acl, UserGroupInformation user) {
+ synchronized (this) {
+ if (acls.get(acl).isUserAllowed(user)) {
+ return true;
+ }
+ }
+
+ if (parent != null) {
+ return parent.hasAccess(acl, user);
+ }
+
+ return false;
+ }
+
+ @Override
+ public synchronized void setUsedCapacity(float usedCapacity) {
+ this.usedCapacity = usedCapacity;
+ }
+
+ @Override
+ public synchronized void setAbsoluteUsedCapacity(float absUsedCapacity) {
+ this.absoluteUsedCapacity = absUsedCapacity;
+ }
+
+ /**
+ * Set maximum capacity - used only for testing.
+ * @param maximumCapacity new max capacity
+ */
+ synchronized void setMaxCapacity(float maximumCapacity) {
+ // Sanity check
+ CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity);
+ float absMaxCapacity =
+ CSQueueUtils.computeAbsoluteMaximumCapacity(maximumCapacity, parent);
+ CSQueueUtils.checkAbsoluteCapacity(getQueueName(), absoluteCapacity,
+ absMaxCapacity);
+
+ this.maximumCapacity = maximumCapacity;
+ this.absoluteMaxCapacity = absMaxCapacity;
+ }
+
+ @Override
+ public float getAbsActualCapacity() {
+ // for now, simply return actual capacity = guaranteed capacity for parent
+ // queue
+ return absoluteCapacity;
+ }
+
+ @Override
+ public String getDefaultLabelExpression() {
+ return defaultLabelExpression;
+ }
+
+ synchronized void setupQueueConfigs(Resource clusterResource, float capacity,
+ float absoluteCapacity, float maximumCapacity, float absoluteMaxCapacity,
+ QueueState state, Map acls,
+ Set labels, String defaultLabelExpression,
+ Map nodeLabelCapacities,
+ Map maximumNodeLabelCapacities, boolean continueLooking)
+ throws IOException {
+ // Sanity check
+ CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity);
+ CSQueueUtils.checkAbsoluteCapacity(getQueueName(), absoluteCapacity,
+ absoluteMaxCapacity);
+
+ this.capacity = capacity;
+ this.absoluteCapacity = absoluteCapacity;
+
+ this.maximumCapacity = maximumCapacity;
+ this.absoluteMaxCapacity = absoluteMaxCapacity;
+
+ this.state = state;
+
+ this.acls = acls;
+
+ // set labels
+ this.labels = labels;
+
+ // set label expression
+ this.defaultLabelExpression = defaultLabelExpression;
+
+ // copy node label capacity
+ this.nodeLabelCapacities = new HashMap(nodeLabelCapacities);
+ this.maximumNodeLabelCapacities =
+ new HashMap(maximumNodeLabelCapacities);
+
+ this.queueInfo.setNodeLabels(this.labels);
+ this.queueInfo.setCapacity(this.capacity);
+ this.queueInfo.setMaximumCapacity(this.maximumCapacity);
+ this.queueInfo.setQueueState(this.state);
+ this.queueInfo.setDefaultNodeLabelExpression(this.defaultLabelExpression);
+
+ // Update metrics
+ CSQueueUtils.updateQueueStatistics(
+ resourceCalculator, this, parent, clusterResource, minimumAllocation);
+
+ // Check if labels of this queue is a subset of parent queue, only do this
+ // when we not root
+ if (parent != null && parent.getParent() != null) {
+ if (parent.getAccessibleLabels() != null
+ && !parent.getAccessibleLabels().contains(DynamicNodeLabelsManager.ANY)) {
+ // if parent isn't "*", child shouldn't be "*" too
+ if (this.getAccessibleLabels().contains(DynamicNodeLabelsManager.ANY)) {
+ throw new IOException("Parent's accessible queue is not ANY(*), "
+ + "but child's accessible queue is *");
+ } else {
+ Set diff =
+ Sets.difference(this.getAccessibleLabels(),
+ parent.getAccessibleLabels());
+ if (!diff.isEmpty()) {
+ throw new IOException("Some labels of child queue is not a subset "
+ + "of parent queue, these labels=["
+ + StringUtils.join(diff, ",") + "]");
+ }
+ }
+ }
+ }
+
+ // calculate absolute capacity by each node label
+ this.absoluteNodeLabelCapacities =
+ CSQueueUtils.computeAbsoluteNodeLabelCapacities(
+ this.nodeLabelCapacities, parent);
+
+ // calculate maximum capacity by each node label
+ this.absoluteMaximumNodeLabelCapacities =
+ CSQueueUtils.computeAbsoluteMaximumNodeLabelCapacities(
+ maximumNodeLabelCapacities, parent);
+
+ // check absoluteMaximumNodeLabelCapacities is valid
+ CSQueueUtils.checkAbsoluteCapacitiesByLabel(getQueueName(),
+ absoluteNodeLabelCapacities, absoluteNodeLabelCapacities);
+
+ this.reservationsContinueLooking = continueLooking;
+ }
+
+ @Private
+ public Resource getMaximumAllocation() {
+ return maximumAllocation;
+ }
+
+ @Private
+ public Resource getMinimumAllocation() {
+ return minimumAllocation;
+ }
+
+ synchronized void allocateResource(Resource clusterResource,
+ Resource resource, Set nodeLabels) {
+ Resources.addTo(usedResources, resource);
+
+ // Update usedResources by labels
+ if (nodeLabels == null || nodeLabels.isEmpty()) {
+ if (!usedResourcesByLabels.containsKey(DynamicNodeLabelsManager.NO_LABEL)) {
+ usedResourcesByLabels.put(DynamicNodeLabelsManager.NO_LABEL,
+ Resources.createResource(0));
+ }
+ Resources.addTo(usedResourcesByLabels.get(DynamicNodeLabelsManager.NO_LABEL),
+ resource);
+ } else {
+ for (String label : Sets.intersection(labels, nodeLabels)) {
+ if (!usedResourcesByLabels.containsKey(label)) {
+ usedResourcesByLabels.put(label, Resources.createResource(0));
+ }
+ Resources.addTo(usedResourcesByLabels.get(label), resource);
+ }
+ }
+
+ ++numContainers;
+ CSQueueUtils.updateQueueStatistics(resourceCalculator, this, getParent(),
+ clusterResource, minimumAllocation);
+ }
+
+ synchronized void releaseResource(Resource clusterResource,
+ Resource resource, Set nodeLabels) {
+ // Update queue metrics
+ Resources.subtractFrom(usedResources, resource);
+
+ // Update usedResources by labels
+ if (nodeLabels.isEmpty()) {
+ if (!usedResourcesByLabels.containsKey(DynamicNodeLabelsManager.NO_LABEL)) {
+ usedResourcesByLabels.put(DynamicNodeLabelsManager.NO_LABEL,
+ Resources.createResource(0));
+ }
+ Resources.subtractFrom(
+ usedResourcesByLabels.get(DynamicNodeLabelsManager.NO_LABEL), resource);
+ } else {
+ for (String label : Sets.intersection(labels, nodeLabels)) {
+ if (!usedResourcesByLabels.containsKey(label)) {
+ usedResourcesByLabels.put(label, Resources.createResource(0));
+ }
+ Resources.subtractFrom(usedResourcesByLabels.get(label), resource);
+ }
+ }
+
+ CSQueueUtils.updateQueueStatistics(resourceCalculator, this, getParent(),
+ clusterResource, minimumAllocation);
+ --numContainers;
+ }
+
+ @Private
+ public float getCapacityByNodeLabel(String label) {
+ if (StringUtils.equals(label, DynamicNodeLabelsManager.NO_LABEL)) {
+ return getCapacity();
+ }
+
+ if (!nodeLabelCapacities.containsKey(label)) {
+ return 0;
+ } else {
+ return nodeLabelCapacities.get(label);
+ }
+ }
+
+ @Private
+ public float getAbsoluteCapacityByNodeLabel(String label) {
+ if (StringUtils.equals(label, DynamicNodeLabelsManager.NO_LABEL)) {
+ return getAbsoluteCapacity();
+ }
+
+ if (null == parent) {
+ return 1;
+ }
+
+ if (!absoluteMaximumNodeLabelCapacities.containsKey(label)) {
+ return 0;
+ } else {
+ return absoluteMaximumNodeLabelCapacities.get(label);
+ }
+ }
+
+ @Private
+ public float getAbsoluteMaximumCapacityByNodeLabel(String label) {
+ if (StringUtils.equals(label, DynamicNodeLabelsManager.NO_LABEL)) {
+ return getAbsoluteMaximumCapacity();
+ }
+
+ return getAbsoluteCapacityByNodeLabel(label);
+ }
+
+ @Private
+ public boolean getReservationContinueLooking() {
+ return reservationsContinueLooking;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
index db893dc..031bbad 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
@@ -72,9 +72,18 @@
/**
* Get the configured capacity of the queue.
- * @return queue capacity
+ * @return configured queue capacity
*/
public float getCapacity();
+
+ /**
+ * Get actual capacity of the queue, this may be different from
+ * configured capacity when mis-config take place, like add labels to the
+ * cluster
+ *
+ * @return actual queue capacity
+ */
+ public float getAbsActualCapacity();
/**
* Get capacity of the parent of the queue as a function of the
@@ -259,4 +268,25 @@ public void detachContainer(Resource clusterResource,
*/
public void attachContainer(Resource clusterResource,
FiCaSchedulerApp application, RMContainer container);
+
+ /**
+ * Get absolute capacity by label of this queue can use
+ * @param nodeLabel
+ * @return absolute capacity by label of this queue can use
+ */
+ public float getAbsoluteCapacityByNodeLabel(String nodeLabel);
+
+ /**
+ * Get absolute max capacity by label of this queue can use
+ * @param nodeLabel
+ * @return absolute capacity by label of this queue can use
+ */
+ public float getAbsoluteMaximumCapacityByNodeLabel(String nodeLabel);
+
+ /**
+ * Get capacity by node label
+ * @param nodeLabel
+ * @return capacity by node label
+ */
+ public float getCapacityByNodeLabel(String nodeLabel);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
index 737062b..6f94119 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
@@ -17,9 +17,12 @@
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.utils.Lock;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -40,7 +43,7 @@ public static void checkMaxCapacity(String queueName,
}
}
- public static void checkAbsoluteCapacities(String queueName,
+ public static void checkAbsoluteCapacity(String queueName,
float absCapacity, float absMaxCapacity) {
if (absMaxCapacity < (absCapacity - EPSILON)) {
throw new IllegalArgumentException("Illegal call to setMaxCapacity. "
@@ -49,6 +52,23 @@ public static void checkAbsoluteCapacities(String queueName,
+ ")");
}
}
+
+ public static void checkAbsoluteCapacitiesByLabel(String queueName,
+ Map absCapacities,
+ Map absMaximumCapacities) {
+ for (Entry entry : absCapacities.entrySet()) {
+ String label = entry.getKey();
+ float absCapacity = entry.getValue();
+ float absMaxCapacity = absMaximumCapacities.get(label);
+ if (absMaxCapacity < (absCapacity - EPSILON)) {
+ throw new IllegalArgumentException("Illegal call to setMaxCapacity. "
+ + "Queue '" + queueName + "' has " + "an absolute capacity ("
+ + absCapacity + ") greater than "
+ + "its absolute maximumCapacity (" + absMaxCapacity + ") of label="
+ + label);
+ }
+ }
+ }
public static float computeAbsoluteMaximumCapacity(
float maximumCapacity, CSQueue parent) {
@@ -56,6 +76,39 @@ public static float computeAbsoluteMaximumCapacity(
(parent == null) ? 1.0f : parent.getAbsoluteMaximumCapacity();
return (parentAbsMaxCapacity * maximumCapacity);
}
+
+ public static Map computeAbsoluteNodeLabelCapacities(
+ Map nodeLabelToCapacities, CSQueue parent) {
+ if (parent == null) {
+ return nodeLabelToCapacities;
+ }
+
+ Map absoluteNodeLabelToCapacities =
+ new HashMap();
+ for (Entry entry : nodeLabelToCapacities.entrySet()) {
+ String label = entry.getKey();
+ float capacity = entry.getValue();
+ absoluteNodeLabelToCapacities.put(label,
+ capacity * parent.getAbsoluteCapacityByNodeLabel(label));
+ }
+ return absoluteNodeLabelToCapacities;
+ }
+
+ public static Map computeAbsoluteMaximumNodeLabelCapacities(
+ Map maximumNodeLabelToCapacities, CSQueue parent) {
+ if (parent == null) {
+ return maximumNodeLabelToCapacities;
+ }
+ Map absoluteMaximumNodeLabelToCapacities =
+ new HashMap();
+ for (Entry entry : maximumNodeLabelToCapacities.entrySet()) {
+ String label = entry.getKey();
+ float maxCapacity = entry.getValue();
+ absoluteMaximumNodeLabelToCapacities.put(label,
+ maxCapacity * parent.getAbsoluteMaximumCapacityByNodeLabel(label));
+ }
+ return absoluteMaximumNodeLabelToCapacities;
+ }
public static int computeMaxActiveApplications(
ResourceCalculator calculator,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 6a3c7dc..2202a8a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -20,7 +20,15 @@
import java.io.IOException;
import java.io.InputStream;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -53,8 +61,13 @@
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.DynamicNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.*;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
@@ -191,6 +204,7 @@ public Configuration getConf() {
private boolean scheduleAsynchronously;
private AsyncScheduleThread asyncSchedulerThread;
+ private DynamicNodeLabelsManager labelManager;
/**
* EXPERT
@@ -275,6 +289,8 @@ private synchronized void initScheduler(Configuration configuration) throws
this.applications =
new ConcurrentHashMap>();
+ this.labelManager = rmContext.getNodeLabelManager();
+
initializeQueues(this.conf);
scheduleAsynchronously = this.conf.getScheduleAynschronously();
@@ -446,7 +462,7 @@ private void initializeQueues(CapacitySchedulerConfiguration conf)
root =
parseQueue(this, conf, null, CapacitySchedulerConfiguration.ROOT,
queues, queues, noop);
-
+ labelManager.reinitializeQueueLabels(getQueueToLabels());
LOG.info("Initialized root queue " + root);
initializeQueueMappings();
}
@@ -469,6 +485,16 @@ private void reinitializeQueues(CapacitySchedulerConfiguration conf)
// Re-configure queues
root.reinitialize(newRoot, clusterResource);
initializeQueueMappings();
+
+ labelManager.reinitializeQueueLabels(getQueueToLabels());
+ }
+
+ private Map> getQueueToLabels() {
+ Map> queueToLabels = new HashMap>();
+ for (CSQueue queue : queues.values()) {
+ queueToLabels.put(queue.getQueueName(), queue.getAccessibleLabels());
+ }
+ return queueToLabels;
}
/**
@@ -511,7 +537,7 @@ private void addNewQueues(
@Lock(CapacityScheduler.class)
static CSQueue parseQueue(
- CapacitySchedulerContext csContext,
+ CapacitySchedulerContext csContext,
CapacitySchedulerConfiguration conf,
CSQueue parent, String queueName, Map queues,
Map oldQueues,
@@ -1077,11 +1103,18 @@ public void handle(SchedulerEvent event) {
}
private synchronized void addNode(RMNode nodeManager) {
+ // update this node to node label manager
+ if (labelManager != null) {
+ labelManager.activateNode(nodeManager.getNodeID(),
+ nodeManager.getTotalCapability());
+ }
+
this.nodes.put(nodeManager.getNodeID(), new FiCaSchedulerNode(nodeManager,
usePortForNodeName));
Resources.addTo(clusterResource, nodeManager.getTotalCapability());
root.updateClusterResource(clusterResource);
int numNodes = numNodeManagers.incrementAndGet();
+
LOG.info("Added node " + nodeManager.getNodeAddress() +
" clusterResource: " + clusterResource);
@@ -1091,6 +1124,11 @@ private synchronized void addNode(RMNode nodeManager) {
}
private synchronized void removeNode(RMNode nodeInfo) {
+ // update this node to node label manager
+ if (labelManager != null) {
+ labelManager.deactivateNode(nodeInfo.getNodeID());
+ }
+
FiCaSchedulerNode node = nodes.get(nodeInfo.getNodeID());
if (node == null) {
return;
@@ -1124,6 +1162,7 @@ private synchronized void removeNode(RMNode nodeInfo) {
}
this.nodes.remove(nodeInfo.getNodeID());
+
LOG.info("Removed node " + nodeInfo.getNodeAddress() +
" clusterResource: " + clusterResource);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index b1f239c..a0363ef 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -18,7 +18,15 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.StringTokenizer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -31,6 +39,7 @@
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.DynamicNodeLabelsManager;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
@@ -83,6 +92,12 @@
public static final String STATE = "state";
@Private
+ public static final String ACCESSIBLE_LABELS = "accessible-labels";
+
+ @Private
+ public static final String DEFAULT_NODE_LABEL_EXPRESSION =
+ "default-node-label-expression";
+
public static final String RESERVE_CONT_LOOK_ALL_NODES = PREFIX
+ "reservations-continue-look-all-nodes";
@@ -268,6 +283,10 @@ private String getQueuePrefix(String queue) {
return queueName;
}
+ private String getNodeLabelPrefix(String queue, String label) {
+ return getQueuePrefix(queue) + ACCESSIBLE_LABELS + DOT + label + DOT;
+ }
+
public int getMaximumSystemApplications() {
int maxApplications =
getInt(MAXIMUM_SYSTEM_APPLICATIONS, DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS);
@@ -343,6 +362,15 @@ public void setMaximumCapacity(String queue, float maxCapacity) {
", maxCapacity=" + maxCapacity);
}
+ public void setCapacityByLabel(String queue, String label, float capacity) {
+ setFloat(getNodeLabelPrefix(queue, label) + CAPACITY, capacity);
+ }
+
+ public void setMaximumCapacityByLabel(String queue, String label,
+ float capacity) {
+ setFloat(getNodeLabelPrefix(queue, label) + MAXIMUM_CAPACITY, capacity);
+ }
+
public int getUserLimit(String queue) {
int userLimit = getInt(getQueuePrefix(queue) + USER_LIMIT,
DEFAULT_USER_LIMIT);
@@ -372,6 +400,97 @@ public QueueState getState(String queue) {
QueueState.valueOf(state.toUpperCase()) : QueueState.RUNNING;
}
+ public void setAccessibleLabels(String queue, Set labels) {
+ if (labels == null) {
+ return;
+ }
+ String str = StringUtils.join(",", labels);
+ set(getQueuePrefix(queue) + ACCESSIBLE_LABELS, str);
+ }
+
+ public Set getAccessibleLabels(String queue) {
+ String labelStr = get(getQueuePrefix(queue) + ACCESSIBLE_LABELS);
+ if (labelStr == null) {
+ return queue.equals(ROOT) ? DynamicNodeLabelsManager.EMPTY_STRING_SET : null;
+ } else {
+ Set set = new HashSet();
+ for (String str : labelStr.split(",")) {
+ if (!str.trim().isEmpty()) {
+ set.add(str.trim());
+ }
+ }
+ // if labels contains "*", only leave ANY behind
+ if (set.contains(DynamicNodeLabelsManager.ANY)) {
+ set.clear();
+ set.add(DynamicNodeLabelsManager.ANY);
+ }
+ return Collections.unmodifiableSet(set);
+ }
+ }
+
+ public Map getNodeLabelCapacities(String queue,
+ Set labels) {
+ Map nodeLabelCapacities = new HashMap();
+
+ if (labels == null) {
+ return nodeLabelCapacities;
+ }
+
+ for (String label : labels) {
+ // capacity of all labels in each queue should be 1
+ if (org.apache.commons.lang.StringUtils.equals(ROOT, queue)) {
+ nodeLabelCapacities.put(label, 1.0f);
+ continue;
+ }
+ float capacity =
+ getFloat(getNodeLabelPrefix(queue, label) + CAPACITY, UNDEFINED);
+ if (capacity < MINIMUM_CAPACITY_VALUE
+ || capacity > MAXIMUM_CAPACITY_VALUE) {
+ throw new IllegalArgumentException("Illegal " + "capacity of "
+ + capacity + " for label=" + label + " in queue=" + queue);
+ }
+ LOG.debug("CSConf - getCapacityOfLabel: prefix="
+ + getNodeLabelPrefix(queue, label) + ", capacity=" + capacity);
+
+ nodeLabelCapacities.put(label, capacity / 100f);
+ }
+ return nodeLabelCapacities;
+ }
+
+ public Map getMaximumNodeLabelCapacities(String queue,
+ Set labels) {
+ Map maximumNodeLabelCapacities = new HashMap();
+ if (labels == null) {
+ return maximumNodeLabelCapacities;
+ }
+
+ for (String label : labels) {
+ float maxCapacity =
+ getFloat(getNodeLabelPrefix(queue, label) + MAXIMUM_CAPACITY,
+ UNDEFINED);
+ maxCapacity = (maxCapacity == DEFAULT_MAXIMUM_CAPACITY_VALUE) ?
+ MAXIMUM_CAPACITY_VALUE : maxCapacity;
+ if (maxCapacity < MINIMUM_CAPACITY_VALUE
+ || maxCapacity > MAXIMUM_CAPACITY_VALUE) {
+ throw new IllegalArgumentException("Illegal " + "capacity of "
+ + maxCapacity + " for label=" + label + " in queue=" + queue);
+ }
+ LOG.debug("CSConf - getCapacityOfLabel: prefix="
+ + getNodeLabelPrefix(queue, label) + ", capacity=" + maxCapacity);
+
+ maximumNodeLabelCapacities.put(label, maxCapacity / 100f);
+ }
+ return maximumNodeLabelCapacities;
+ }
+
+ public String getDefaultNodeLabelExpression(String queue) {
+ return get(getQueuePrefix(queue) + DEFAULT_NODE_LABEL_EXPRESSION);
+ }
+
+ public void setDefaultNodeLabelExpression(String queue, String exp) {
+ set(getQueuePrefix(queue) + DEFAULT_NODE_LABEL_EXPRESSION, exp);
+ }
+
/*
* Returns whether we should continue to look at all heart beating nodes even
* after the reservation limit was hit. The node heart beating in could
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index f0cff71..163f446 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -24,12 +24,14 @@
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
+import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -52,36 +54,30 @@
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.DynamicNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.server.utils.Lock;
import org.apache.hadoop.yarn.server.utils.Lock.NoLock;
-import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Sets;
@Private
@Unstable
-public class LeafQueue implements CSQueue {
+public class LeafQueue extends AbstractCSQueue {
private static final Log LOG = LogFactory.getLog(LeafQueue.class);
- private final String queueName;
- private CSQueue parent;
- private float capacity;
- private float absoluteCapacity;
- private float maximumCapacity;
- private float absoluteMaxCapacity;
private float absoluteUsedCapacity = 0.0f;
private int userLimit;
private float userLimitFactor;
@@ -95,10 +91,6 @@
private int maxActiveApplicationsPerUser;
private int nodeLocalityDelay;
-
- private Resource usedResources = Resources.createResource(0, 0);
- private float usedCapacity = 0.0f;
- private volatile int numContainers;
Set activeApplications;
Map applicationAttemptMap =
@@ -106,20 +98,9 @@
Set pendingApplications;
- private final Resource minimumAllocation;
- private final Resource maximumAllocation;
private final float minimumAllocationFactor;
private Map users = new HashMap();
-
- private final QueueMetrics metrics;
-
- private QueueInfo queueInfo;
-
- private QueueState state;
-
- private Map acls =
- new HashMap();
private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
@@ -127,27 +108,18 @@
private CapacitySchedulerContext scheduler;
private final ActiveUsersManager activeUsersManager;
-
- private final ResourceCalculator resourceCalculator;
+
+ // cache last cluster resource to compute actual capacity
+ private Resource lastClusterResource = Resources.none();
private boolean reservationsContinueLooking;
public LeafQueue(CapacitySchedulerContext cs,
- String queueName, CSQueue parent, CSQueue old) {
+ String queueName, CSQueue parent, CSQueue old) throws IOException {
+ super(cs, queueName, parent, old);
this.scheduler = cs;
- this.queueName = queueName;
- this.parent = parent;
-
- this.resourceCalculator = cs.getResourceCalculator();
- // must be after parent and queueName are initialized
- this.metrics = old != null ? old.getMetrics() :
- QueueMetrics.forQueue(getQueuePath(), parent,
- cs.getConfiguration().getEnableUserMetrics(),
- cs.getConf());
this.activeUsersManager = new ActiveUsersManager(metrics);
- this.minimumAllocation = cs.getMinimumResourceCapability();
- this.maximumAllocation = cs.getMaximumResourceCapability();
this.minimumAllocationFactor =
Resources.ratio(resourceCalculator,
Resources.subtract(maximumAllocation, minimumAllocation),
@@ -165,7 +137,8 @@ public LeafQueue(CapacitySchedulerContext cs,
float userLimitFactor =
cs.getConfiguration().getUserLimitFactor(getQueuePath());
- int maxApplications = cs.getConfiguration().getMaximumApplicationsPerQueue(getQueuePath());
+ int maxApplications =
+ cs.getConfiguration().getMaximumApplicationsPerQueue(getQueuePath());
if (maxApplications < 0) {
int maxSystemApps = cs.getConfiguration().getMaximumSystemApplications();
maxApplications = (int)(maxSystemApps * absoluteCapacity);
@@ -185,12 +158,10 @@ public LeafQueue(CapacitySchedulerContext cs,
resourceCalculator,
cs.getClusterResource(), this.minimumAllocation,
maxAMResourcePerQueuePercent, absoluteCapacity);
- int maxActiveApplicationsPerUser =
- CSQueueUtils.computeMaxActiveApplicationsPerUser(maxActiveAppsUsingAbsCap, userLimit,
- userLimitFactor);
+ int maxActiveApplicationsPerUser =
+ CSQueueUtils.computeMaxActiveApplicationsPerUser(
+ maxActiveAppsUsingAbsCap, userLimit, userLimitFactor);
- this.queueInfo = recordFactory.newRecordInstance(QueueInfo.class);
- this.queueInfo.setQueueName(queueName);
this.queueInfo.setChildQueues(new ArrayList());
QueueState state = cs.getConfiguration().getState(getQueuePath());
@@ -198,14 +169,13 @@ public LeafQueue(CapacitySchedulerContext cs,
Map acls =
cs.getConfiguration().getAcls(getQueuePath());
- setupQueueConfigs(
- cs.getClusterResource(),
- capacity, absoluteCapacity,
- maximumCapacity, absoluteMaxCapacity,
- userLimit, userLimitFactor,
+ setupQueueConfigs(cs.getClusterResource(), capacity, absoluteCapacity,
+ maximumCapacity, absoluteMaxCapacity, userLimit, userLimitFactor,
maxApplications, maxAMResourcePerQueuePercent, maxApplicationsPerUser,
- maxActiveApplications, maxActiveApplicationsPerUser, state, acls,
- cs.getConfiguration().getNodeLocalityDelay(),
+ maxActiveApplications, maxActiveApplicationsPerUser, state, acls, cs
+ .getConfiguration().getNodeLocalityDelay(), labels,
+ defaultLabelExpression, this.nodeLabelCapacities,
+ this.maximumNodeLabelCapacities,
cs.getConfiguration().getReservationContinueLook());
if(LOG.isDebugEnabled()) {
@@ -219,7 +189,7 @@ public LeafQueue(CapacitySchedulerContext cs,
new TreeSet(applicationComparator);
this.activeApplications = new TreeSet(applicationComparator);
}
-
+
// externalizing in method, to allow overriding
protected float getCapacityFromConf() {
return (float)scheduler.getConfiguration().getCapacity(getQueuePath()) / 100;
@@ -234,19 +204,22 @@ protected synchronized void setupQueueConfigs(
int maxApplicationsPerUser, int maxActiveApplications,
int maxActiveApplicationsPerUser, QueueState state,
Map acls, int nodeLocalityDelay,
- boolean continueLooking)
- {
+ Set labels, String defaultLabelExpression,
+ Map capacitieByLabel,
+ Map maximumCapacitiesByLabel, boolean continueLooking)
+ throws IOException {
+ super.setupQueueConfigs(clusterResource, capacity, absoluteCapacity,
+ maximumCapacity, absoluteMaxCapacity, state, acls, labels,
+ defaultLabelExpression, capacitieByLabel, maximumCapacitiesByLabel,
+ continueLooking);
// Sanity check
CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity);
float absCapacity = getParent().getAbsoluteCapacity() * capacity;
- CSQueueUtils.checkAbsoluteCapacities(getQueueName(), absCapacity, absoluteMaxCapacity);
+ CSQueueUtils.checkAbsoluteCapacity(getQueueName(), absCapacity,
+ absoluteMaxCapacity);
- this.capacity = capacity;
this.absoluteCapacity = absCapacity;
- this.maximumCapacity = maximumCapacity;
- this.absoluteMaxCapacity = absoluteMaxCapacity;
-
this.userLimit = userLimit;
this.userLimitFactor = userLimitFactor;
@@ -256,14 +229,20 @@ protected synchronized void setupQueueConfigs(
this.maxActiveApplications = maxActiveApplications;
this.maxActiveApplicationsPerUser = maxActiveApplicationsPerUser;
-
- this.state = state;
- this.acls = acls;
-
- this.queueInfo.setCapacity(this.capacity);
- this.queueInfo.setMaximumCapacity(this.maximumCapacity);
- this.queueInfo.setQueueState(this.state);
+ if (!SchedulerUtils.checkQueueLabelExpression(this.labels,
+ this.defaultLabelExpression)) {
+ throw new IOException("Invalid default label expression of "
+ + " queue="
+ + queueInfo.getQueueName()
+ + " doesn't have permission to access all labels "
+ + "in default label expression. labelExpression of resource request="
+ + (this.defaultLabelExpression == null ? ""
+ : this.defaultLabelExpression)
+ + ". Queue labels="
+ + (queueInfo.getNodeLabels() == null ? "" : StringUtils.join(queueInfo
+ .getNodeLabels().iterator(), ',')));
+ }
this.nodeLocalityDelay = nodeLocalityDelay;
this.reservationsContinueLooking = continueLooking;
@@ -272,11 +251,14 @@ protected synchronized void setupQueueConfigs(
for (Map.Entry e : acls.entrySet()) {
aclsString.append(e.getKey() + ":" + e.getValue().getAclString());
}
-
- // Update metrics
- CSQueueUtils.updateQueueStatistics(
- resourceCalculator, this, getParent(), clusterResource,
- minimumAllocation);
+
+ StringBuilder labelStrBuilder = new StringBuilder();
+ if (labels != null) {
+ for (String s : labels) {
+ labelStrBuilder.append(s);
+ labelStrBuilder.append(",");
+ }
+ }
LOG.info("Initializing " + queueName + "\n" +
"capacity = " + capacity +
@@ -331,50 +313,12 @@ protected synchronized void setupQueueConfigs(
" [= configuredState ]" + "\n" +
"acls = " + aclsString +
" [= configuredAcls ]" + "\n" +
+ "nodeLocalityDelay = " + nodeLocalityDelay + "\n" +
+ "labels=" + labelStrBuilder.toString() + "\n" +
"nodeLocalityDelay = " + nodeLocalityDelay + "\n" +
"reservationsContinueLooking = " +
reservationsContinueLooking + "\n");
}
-
- @Override
- public synchronized float getCapacity() {
- return capacity;
- }
-
- @Override
- public synchronized float getAbsoluteCapacity() {
- return absoluteCapacity;
- }
-
- @Override
- public synchronized float getMaximumCapacity() {
- return maximumCapacity;
- }
-
- @Override
- public synchronized float getAbsoluteMaximumCapacity() {
- return absoluteMaxCapacity;
- }
-
- @Override
- public synchronized float getAbsoluteUsedCapacity() {
- return absoluteUsedCapacity;
- }
-
- @Override
- public synchronized CSQueue getParent() {
- return parent;
- }
-
- @Override
- public synchronized void setParent(CSQueue newParentQueue) {
- this.parent = (ParentQueue)newParentQueue;
- }
-
- @Override
- public String getQueueName() {
- return queueName;
- }
@Override
public String getQueuePath() {
@@ -385,22 +329,6 @@ public String getQueuePath() {
* Used only by tests.
*/
@Private
- public Resource getMinimumAllocation() {
- return minimumAllocation;
- }
-
- /**
- * Used only by tests.
- */
- @Private
- public Resource getMaximumAllocation() {
- return maximumAllocation;
- }
-
- /**
- * Used only by tests.
- */
- @Private
public float getMinimumAllocationFactor() {
return minimumAllocationFactor;
}
@@ -435,45 +363,9 @@ public ActiveUsersManager getActiveUsersManager() {
}
@Override
- public synchronized float getUsedCapacity() {
- return usedCapacity;
- }
-
- @Override
- public synchronized Resource getUsedResources() {
- return usedResources;
- }
-
- @Override
public List getChildQueues() {
return null;
}
-
- @Override
- public synchronized void setUsedCapacity(float usedCapacity) {
- this.usedCapacity = usedCapacity;
- }
-
- @Override
- public synchronized void setAbsoluteUsedCapacity(float absUsedCapacity) {
- this.absoluteUsedCapacity = absUsedCapacity;
- }
-
- /**
- * Set maximum capacity - used only for testing.
- * @param maximumCapacity new max capacity
- */
- synchronized void setMaxCapacity(float maximumCapacity) {
- // Sanity check
- CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity);
- float absMaxCapacity =
- CSQueueUtils.computeAbsoluteMaximumCapacity(
- maximumCapacity, getParent());
- CSQueueUtils.checkAbsoluteCapacities(getQueueName(), absoluteCapacity, absMaxCapacity);
-
- this.maximumCapacity = maximumCapacity;
- this.absoluteMaxCapacity = absMaxCapacity;
- }
/**
* Set user limit - used only for testing.
@@ -567,11 +459,6 @@ public int getNodeLocalityDelay() {
return nodeLocalityDelay;
}
- @Private
- boolean getReservationContinueLooking() {
- return reservationsContinueLooking;
- }
-
public String toString() {
return queueName + ": " +
"capacity=" + capacity + ", " +
@@ -582,6 +469,11 @@ public String toString() {
"numApps=" + getNumApplications() + ", " +
"numContainers=" + getNumContainers();
}
+
+ @VisibleForTesting
+ public synchronized void setNodeLabelManager(DynamicNodeLabelsManager mgr) {
+ this.labelManager = mgr;
+ }
@VisibleForTesting
public synchronized User getUser(String userName) {
@@ -631,6 +523,10 @@ public synchronized void reinitialize(
newlyParsedLeafQueue.getMaximumActiveApplicationsPerUser(),
newlyParsedLeafQueue.state, newlyParsedLeafQueue.acls,
newlyParsedLeafQueue.getNodeLocalityDelay(),
+ newlyParsedLeafQueue.labels,
+ newlyParsedLeafQueue.defaultLabelExpression,
+ newlyParsedLeafQueue.nodeLabelCapacities,
+ newlyParsedLeafQueue.maximumNodeLabelCapacities,
newlyParsedLeafQueue.reservationsContinueLooking);
// queue metrics are updated, more resource may be available
@@ -639,19 +535,6 @@ public synchronized void reinitialize(
}
@Override
- public boolean hasAccess(QueueACL acl, UserGroupInformation user) {
- // Check if the leaf-queue allows access
- synchronized (this) {
- if (acls.get(acl).isUserAllowed(user)) {
- return true;
- }
- }
-
- // Check if parent-queue allows access
- return getParent().hasAccess(acl, user);
- }
-
- @Override
public void submitApplicationAttempt(FiCaSchedulerApp application,
String userName) {
// Careful! Locking order is important!
@@ -747,7 +630,8 @@ private synchronized void activateApplications() {
}
}
- private synchronized void addApplicationAttempt(FiCaSchedulerApp application, User user) {
+ private synchronized void addApplicationAttempt(FiCaSchedulerApp application,
+ User user) {
// Accept
user.submitApplication();
pendingApplications.add(application);
@@ -783,7 +667,8 @@ public void finishApplicationAttempt(FiCaSchedulerApp application, String queue)
getParent().finishApplicationAttempt(application, queue);
}
- public synchronized void removeApplicationAttempt(FiCaSchedulerApp application, User user) {
+ public synchronized void removeApplicationAttempt(
+ FiCaSchedulerApp application, User user) {
boolean wasActive = activeApplications.remove(application);
if (!wasActive) {
pendingApplications.remove(application);
@@ -828,6 +713,12 @@ public synchronized CSAssignment assignContainers(Resource clusterResource,
+ " #applications=" + activeApplications.size());
}
+ // if our queue cannot access this node, just return
+ if (!SchedulerUtils.checkQueueAccessToNode(labels,
+ labelManager.getLabelsOnNode(node.getNodeID()))) {
+ return NULL_ASSIGNMENT;
+ }
+
// Check for reserved resources
RMContainer reservedContainer = node.getReservedContainer();
if (reservedContainer != null) {
@@ -888,7 +779,8 @@ public synchronized CSAssignment assignContainers(Resource clusterResource,
required);
// Check queue max-capacity limit
- if (!assignToQueue(clusterResource, required, application, true)) {
+ if (!assignToQueue(clusterResource, required,
+ labelManager.getLabelsOnNode(node.getNodeID()), application, true)) {
return NULL_ASSIGNMENT;
}
@@ -920,7 +812,8 @@ public synchronized CSAssignment assignContainers(Resource clusterResource,
// Book-keeping
// Note: Update headroom to account for current allocation too...
- allocateResource(clusterResource, application, assigned);
+ allocateResource(clusterResource, application, assigned,
+ labelManager.getLabelsOnNode(node.getNodeID()));
// Don't reset scheduling opportunities for non-local assignments
// otherwise the app will be delayed for each non-local assignment.
@@ -971,27 +864,43 @@ public synchronized CSAssignment assignContainers(Resource clusterResource,
return new CSAssignment(Resources.none(), NodeType.NODE_LOCAL);
}
-
- @Private
- protected synchronized boolean assignToQueue(Resource clusterResource,
- Resource required, FiCaSchedulerApp application,
+ synchronized boolean assignToQueue(Resource clusterResource,
+ Resource required, Set nodeLabels, FiCaSchedulerApp application,
boolean checkReservations) {
-
- Resource potentialTotalResource = Resources.add(usedResources, required);
- // Check how of the cluster's absolute capacity we are currently using...
- float potentialNewCapacity = Resources.divide(resourceCalculator,
- clusterResource, potentialTotalResource, clusterResource);
- if (potentialNewCapacity > absoluteMaxCapacity) {
+ // Get label of this queue can access, it's (nodeLabel AND queueLabel)
+ Set labelCanAccess;
+ if (null == nodeLabels || nodeLabels.isEmpty()) {
+ labelCanAccess = new HashSet();
+ // Any queue can always access any node without label
+ labelCanAccess.add(DynamicNodeLabelsManager.NO_LABEL);
+ } else {
+ labelCanAccess = new HashSet(Sets.intersection(labels, nodeLabels));
+ }
+
+ boolean canAssign = false;
+ for (String label : labelCanAccess) {
+ if (!usedResourcesByLabels.containsKey(label)) {
+ usedResourcesByLabels.put(label, Resources.createResource(0));
+ }
+
+ Resource potentialTotalCapacity =
+ Resources.add(usedResourcesByLabels.get(label), required);
+
+ float potenialNewCapacity =
+ Resources.divide(resourceCalculator, clusterResource,
+ potentialTotalCapacity,
+ labelManager.getResourceByLabel(label, clusterResource));
// if enabled, check to see if could we potentially use this node instead
// of a reserved node if the application has reserved containers
- if (this.reservationsContinueLooking && checkReservations) {
-
+ // TODO, now only consider reservation cases when the node has no label
+ if (this.reservationsContinueLooking && checkReservations
+ && label.equals(DynamicNodeLabelsManager.NO_LABEL)) {
float potentialNewWithoutReservedCapacity = Resources.divide(
resourceCalculator,
clusterResource,
- Resources.subtract(potentialTotalResource,
- application.getCurrentReservation()),
- clusterResource);
+ Resources.subtract(potentialTotalCapacity,
+ application.getCurrentReservation()),
+ labelManager.getResourceByLabel(label, clusterResource));
if (potentialNewWithoutReservedCapacity <= absoluteMaxCapacity) {
if (LOG.isDebugEnabled()) {
@@ -1013,33 +922,41 @@ protected synchronized boolean assignToQueue(Resource clusterResource,
// we could potentially use this node instead of reserved node
return true;
}
-
}
+
+ // otherwise, if any of the label doesn't beyond limit, we can allocate on this node
+ if (potenialNewCapacity <= getAbsoluteMaximumCapacityByNodeLabel(label)) {
+ canAssign = true;
+ break;
+ }
+
if (LOG.isDebugEnabled()) {
LOG.debug(getQueueName()
- + " usedResources: " + usedResources
+ + "Check assign to queue, label=" + label
+ + " usedResources: " + usedResourcesByLabels.get(label)
+ " clusterResources: " + clusterResource
+ " currentCapacity "
+ Resources.divide(resourceCalculator, clusterResource,
- usedResources, clusterResource) + " required " + required
- + " potentialNewCapacity: " + potentialNewCapacity + " ( "
+ usedResourcesByLabels.get(label),
+ labelManager.getResourceByLabel(label, clusterResource))
+ + " potentialNewCapacity: " + potenialNewCapacity + " ( "
+ " max-capacity: " + absoluteMaxCapacity + ")");
}
- return false;
}
- return true;
+
+ return canAssign;
}
@Lock({LeafQueue.class, FiCaSchedulerApp.class})
private Resource computeUserLimitAndSetHeadroom(
- FiCaSchedulerApp application, Resource clusterResource, Resource required) {
-
+ FiCaSchedulerApp application, Resource clusterResource, Resource required) {
String user = application.getUser();
- /**
- * Headroom is min((userLimit, queue-max-cap) - consumed)
+ /**
+ * Headroom = min(userLimit, queue-max-cap, max-capacity-consider-label) -
+ * consumed
*/
Resource userLimit = // User limit
@@ -1058,11 +975,21 @@ private Resource computeUserLimitAndSetHeadroom(
absoluteMaxAvailCapacity,
minimumAllocation);
- Resource userConsumed = getUser(user).getConsumedResources();
- Resource headroom =
+ // Max possible capacity this queue can access, will consider label only.
+ Resource maxCapacityConsiderLabel =
+ labelManager == null ? clusterResource : labelManager.getQueueResource(
+ queueName, labels, clusterResource);
+ maxCapacityConsiderLabel =
+ Resources.roundDown(resourceCalculator, maxCapacityConsiderLabel,
+ minimumAllocation);
+ Resource userConsumed = getUser(user).getConsumedResources();
+
+ Resource headroom =
Resources.subtract(
- Resources.min(resourceCalculator, clusterResource,
- userLimit, queueMaxCap),
+ Resources.min(resourceCalculator, clusterResource,
+ Resources.min(resourceCalculator, clusterResource, userLimit,
+ queueMaxCap),
+ maxCapacityConsiderLabel),
userConsumed);
if (LOG.isDebugEnabled()) {
@@ -1191,7 +1118,8 @@ protected synchronized boolean assignToUser(Resource clusterResource,
return true;
}
- boolean needContainers(FiCaSchedulerApp application, Priority priority, Resource required) {
+ boolean needContainers(FiCaSchedulerApp application, Priority priority,
+ Resource required) {
int requiredContainers = application.getTotalRequiredResources(priority);
int reservedContainers = application.getNumReservedContainers(priority);
int starvation = 0;
@@ -1221,10 +1149,9 @@ boolean needContainers(FiCaSchedulerApp application, Priority priority, Resource
return (((starvation + requiredContainers) - reservedContainers) > 0);
}
- private CSAssignment assignContainersOnNode(Resource clusterResource,
- FiCaSchedulerNode node, FiCaSchedulerApp application,
- Priority priority, RMContainer reservedContainer, boolean needToUnreserve) {
-
+ private CSAssignment assignContainersOnNode(Resource clusterResource,
+ FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
+ RMContainer reservedContainer, boolean needToUnreserve) {
Resource assigned = Resources.none();
// Data-local
@@ -1331,8 +1258,9 @@ protected boolean checkLimitsToReserve(Resource clusterResource,
Resource userLimit = computeUserLimitAndSetHeadroom(application,
clusterResource, capability);
- // Check queue max-capacity limit
- if (!assignToQueue(clusterResource, capability, application, false)) {
+ // Check queue max-capacity limit,
+ // TODO: Consider reservation on labels
+ if (!assignToQueue(clusterResource, capability, null, application, false)) {
if (LOG.isDebugEnabled()) {
LOG.debug("was going to reserve but hit queue limit");
}
@@ -1479,6 +1407,20 @@ private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode nod
+ " request=" + request + " type=" + type
+ " needToUnreserve= " + needToUnreserve);
}
+
+ // check if the resource request can access the label
+ if (!SchedulerUtils.checkNodeLabelExpression(
+ labelManager.getLabelsOnNode(node.getNodeID()),
+ request.getNodeLabelExpression())) {
+ // this is a reserved container, but we cannot allocate it now according
+ // to label not match. This can be caused by node label changed
+ // We should un-reserve this container.
+ if (rmContainer != null) {
+ unreserve(application, priority, node, rmContainer);
+ }
+ return Resources.none();
+ }
+
Resource capability = request.getCapability();
Resource available = node.getAvailableResource();
Resource totalResource = node.getTotalResource();
@@ -1658,8 +1600,9 @@ public void completedContainer(Resource clusterResource,
// Book-keeping
if (removed) {
- releaseResource(clusterResource,
- application, container.getResource());
+ releaseResource(clusterResource, application,
+ container.getResource(),
+ labelManager.getLabelsOnNode(node.getNodeID()));
LOG.info("completedContainer" +
" container=" + container +
" queue=" + this +
@@ -1675,14 +1618,11 @@ public void completedContainer(Resource clusterResource,
}
}
- synchronized void allocateResource(Resource clusterResource,
- SchedulerApplicationAttempt application, Resource resource) {
- // Update queue metrics
- Resources.addTo(usedResources, resource);
- CSQueueUtils.updateQueueStatistics(
- resourceCalculator, this, getParent(), clusterResource, minimumAllocation);
- ++numContainers;
-
+ synchronized void allocateResource(Resource clusterResource,
+ SchedulerApplicationAttempt application, Resource resource,
+ Set nodeLabels) {
+ super.allocateResource(clusterResource, resource, nodeLabels);
+
// Update user metrics
String userName = application.getUser();
User user = getUser(userName);
@@ -1703,14 +1643,9 @@ synchronized void allocateResource(Resource clusterResource,
}
synchronized void releaseResource(Resource clusterResource,
- FiCaSchedulerApp application, Resource resource) {
- // Update queue metrics
- Resources.subtractFrom(usedResources, resource);
- CSQueueUtils.updateQueueStatistics(
- resourceCalculator, this, getParent(), clusterResource,
- minimumAllocation);
- --numContainers;
-
+ FiCaSchedulerApp application, Resource resource, Set nodeLabels) {
+ super.releaseResource(clusterResource, resource, nodeLabels);
+
// Update user metrics
String userName = application.getUser();
User user = getUser(userName);
@@ -1724,6 +1659,8 @@ synchronized void releaseResource(Resource clusterResource,
@Override
public synchronized void updateClusterResource(Resource clusterResource) {
+ lastClusterResource = clusterResource;
+
// Update queue properties
maxActiveApplications =
CSQueueUtils.computeMaxActiveApplications(
@@ -1756,11 +1693,6 @@ public synchronized void updateClusterResource(Resource clusterResource) {
}
}
}
-
- @Override
- public QueueMetrics getMetrics() {
- return metrics;
- }
@VisibleForTesting
public static class User {
@@ -1820,7 +1752,8 @@ public void recoverContainer(Resource clusterResource,
// Careful! Locking order is important!
synchronized (this) {
allocateResource(clusterResource, attempt, rmContainer.getContainer()
- .getResource());
+ .getResource(), labelManager.getLabelsOnNode(rmContainer
+ .getContainer().getNodeId()));
}
getParent().recoverContainer(clusterResource, attempt, rmContainer);
}
@@ -1858,7 +1791,8 @@ public void attachContainer(Resource clusterResource,
FiCaSchedulerApp application, RMContainer rmContainer) {
if (application != null) {
allocateResource(clusterResource, application, rmContainer.getContainer()
- .getResource());
+ .getResource(), labelManager.getLabelsOnNode(rmContainer
+ .getContainer().getNodeId()));
LOG.info("movedContainer" + " container=" + rmContainer.getContainer()
+ " resource=" + rmContainer.getContainer().getResource()
+ " queueMoveIn=" + this + " usedCapacity=" + getUsedCapacity()
@@ -1874,7 +1808,8 @@ public void detachContainer(Resource clusterResource,
FiCaSchedulerApp application, RMContainer rmContainer) {
if (application != null) {
releaseResource(clusterResource, application, rmContainer.getContainer()
- .getResource());
+ .getResource(), labelManager.getLabelsOnNode(rmContainer.getContainer()
+ .getNodeId()));
LOG.info("movedContainer" + " container=" + rmContainer.getContainer()
+ " resource=" + rmContainer.getContainer().getResource()
+ " queueMoveOut=" + this + " usedCapacity=" + getUsedCapacity()
@@ -1885,6 +1820,24 @@ public void detachContainer(Resource clusterResource,
}
}
+ @Override
+ public float getAbsActualCapacity() {
+ if (Resources.lessThanOrEqual(resourceCalculator, lastClusterResource,
+ lastClusterResource, Resources.none())) {
+ return absoluteCapacity;
+ }
+
+ Resource resourceRespectLabels =
+ labelManager == null ? lastClusterResource : labelManager
+ .getQueueResource(queueName, labels, lastClusterResource);
+ float absActualCapacity =
+ Resources.divide(resourceCalculator, lastClusterResource,
+ resourceRespectLabels, lastClusterResource);
+
+ return absActualCapacity > absoluteCapacity ? absoluteCapacity
+ : absActualCapacity;
+ }
+
public void setCapacity(float capacity) {
this.capacity = capacity;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index 011c99c..98335cb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -21,8 +21,8 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Comparator;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -46,77 +46,36 @@
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.DynamicNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
-import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
+import com.google.common.collect.Sets;
+
@Private
@Evolving
-public class ParentQueue implements CSQueue {
+public class ParentQueue extends AbstractCSQueue {
private static final Log LOG = LogFactory.getLog(ParentQueue.class);
- private CSQueue parent;
- private final String queueName;
-
- private float capacity;
- private float maximumCapacity;
- private float absoluteCapacity;
- private float absoluteMaxCapacity;
- private float absoluteUsedCapacity = 0.0f;
-
- private float usedCapacity = 0.0f;
-
- protected final Set childQueues;
- private final Comparator queueComparator;
-
- private Resource usedResources = Resources.createResource(0, 0);
-
+ protected final Set childQueues;
private final boolean rootQueue;
-
- private final Resource minimumAllocation;
-
- private volatile int numApplications;
- private volatile int numContainers;
-
- private QueueState state;
-
- private final QueueMetrics metrics;
-
- private QueueInfo queueInfo;
-
- private Map acls =
- new HashMap();
private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
- private final ResourceCalculator resourceCalculator;
-
- private boolean reservationsContinueLooking;
-
public ParentQueue(CapacitySchedulerContext cs,
- String queueName, CSQueue parent, CSQueue old) {
- minimumAllocation = cs.getMinimumResourceCapability();
-
- this.parent = parent;
- this.queueName = queueName;
- this.rootQueue = (parent == null);
- this.resourceCalculator = cs.getResourceCalculator();
+ String queueName, CSQueue parent, CSQueue old) throws IOException {
+ super(cs, queueName, parent, old);
- // must be called after parent and queueName is set
- this.metrics = old != null ? old.getMetrics() :
- QueueMetrics.forQueue(getQueuePath(), parent,
- cs.getConfiguration().getEnableUserMetrics(),
- cs.getConf());
+ this.rootQueue = (parent == null);
float rawCapacity = cs.getConfiguration().getCapacity(getQueuePath());
@@ -141,17 +100,14 @@ public ParentQueue(CapacitySchedulerContext cs,
Map acls =
cs.getConfiguration().getAcls(getQueuePath());
-
- this.queueInfo = recordFactory.newRecordInstance(QueueInfo.class);
- this.queueInfo.setQueueName(queueName);
+
this.queueInfo.setChildQueues(new ArrayList());
- setupQueueConfigs(cs.getClusterResource(),
- capacity, absoluteCapacity,
- maximumCapacity, absoluteMaxCapacity, state, acls,
+ setupQueueConfigs(cs.getClusterResource(), capacity, absoluteCapacity,
+ maximumCapacity, absoluteMaxCapacity, state, acls, labels,
+ defaultLabelExpression, nodeLabelCapacities, maximumNodeLabelCapacities,
cs.getConfiguration().getReservationContinueLook());
- this.queueComparator = cs.getQueueComparator();
this.childQueues = new TreeSet(queueComparator);
LOG.info("Initialized parent-queue " + queueName +
@@ -159,41 +115,29 @@ public ParentQueue(CapacitySchedulerContext cs,
", fullname=" + getQueuePath());
}
- protected synchronized void setupQueueConfigs(
- Resource clusterResource,
- float capacity, float absoluteCapacity,
- float maximumCapacity, float absoluteMaxCapacity,
+ synchronized void setupQueueConfigs(Resource clusterResource, float capacity,
+ float absoluteCapacity, float maximumCapacity, float absoluteMaxCapacity,
QueueState state, Map acls,
- boolean continueLooking
- ) {
- // Sanity check
- CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity);
- CSQueueUtils.checkAbsoluteCapacities(getQueueName(), absoluteCapacity, absoluteMaxCapacity);
-
- this.capacity = capacity;
- this.absoluteCapacity = absoluteCapacity;
-
- this.maximumCapacity = maximumCapacity;
- this.absoluteMaxCapacity = absoluteMaxCapacity;
-
- this.state = state;
-
- this.acls = acls;
-
- this.queueInfo.setCapacity(this.capacity);
- this.queueInfo.setMaximumCapacity(this.maximumCapacity);
- this.queueInfo.setQueueState(this.state);
-
- this.reservationsContinueLooking = continueLooking;
-
- StringBuilder aclsString = new StringBuilder();
+ Set labels, String defaultLabelExpression,
+ Map nodeLabelCapacities,
+ Map maximumCapacitiesByLabel, boolean continueLooking)
+ throws IOException {
+ super.setupQueueConfigs(clusterResource, capacity, absoluteCapacity,
+ maximumCapacity, absoluteMaxCapacity, state, acls, labels,
+ defaultLabelExpression, nodeLabelCapacities, maximumCapacitiesByLabel,
+ continueLooking);
+ StringBuilder aclsString = new StringBuilder();
for (Map.Entry e : acls.entrySet()) {
aclsString.append(e.getKey() + ":" + e.getValue().getAclString());
}
- // Update metrics
- CSQueueUtils.updateQueueStatistics(
- resourceCalculator, this, parent, clusterResource, minimumAllocation);
+ StringBuilder labelStrBuilder = new StringBuilder();
+ if (labels != null) {
+ for (String s : labels) {
+ labelStrBuilder.append(s);
+ labelStrBuilder.append(",");
+ }
+ }
LOG.info(queueName +
", capacity=" + capacity +
@@ -201,13 +145,13 @@ protected synchronized void setupQueueConfigs(
", maxCapacity=" + maximumCapacity +
", asboluteMaxCapacity=" + absoluteMaxCapacity +
", state=" + state +
- ", acls=" + aclsString +
+ ", acls=" + aclsString +
+ ", labels=" + labelStrBuilder.toString() + "\n" +
", reservationsContinueLooking=" + reservationsContinueLooking);
}
private static float PRECISION = 0.0005f; // 0.05% precision
void setChildQueues(Collection childQueues) {
-
// Validate
float childCapacities = 0;
for (CSQueue queue : childQueues) {
@@ -221,6 +165,21 @@ void setChildQueues(Collection childQueues) {
" capacity of " + childCapacities +
" for children of queue " + queueName);
}
+ // check label capacities
+ for (String nodeLabel : labelManager.getClusterNodeLabels()) {
+ float capacityByLabel = getCapacityByNodeLabel(nodeLabel);
+ // check children's labels
+ float sum = 0;
+ for (CSQueue queue : childQueues) {
+ sum += queue.getCapacityByNodeLabel(nodeLabel);
+ }
+ if ((capacityByLabel > 0 && Math.abs(1.0f - sum) > PRECISION)
+ || (capacityByLabel == 0) && (sum > 0)) {
+ throw new IllegalArgumentException("Illegal" + " capacity of "
+ + sum + " for children of queue " + queueName
+ + " for label=" + nodeLabel);
+ }
+ }
this.childQueues.clear();
this.childQueues.addAll(childQueues);
@@ -228,21 +187,6 @@ void setChildQueues(Collection childQueues) {
LOG.debug("setChildQueues: " + getChildQueuesToPrint());
}
}
-
- @Override
- public synchronized CSQueue getParent() {
- return parent;
- }
-
- @Override
- public synchronized void setParent(CSQueue newParentQueue) {
- this.parent = (ParentQueue)newParentQueue;
- }
-
- @Override
- public String getQueueName() {
- return queueName;
- }
@Override
public String getQueuePath() {
@@ -251,65 +195,6 @@ public String getQueuePath() {
}
@Override
- public synchronized float getCapacity() {
- return capacity;
- }
-
- @Override
- public synchronized float getAbsoluteCapacity() {
- return absoluteCapacity;
- }
-
- @Override
- public float getAbsoluteMaximumCapacity() {
- return absoluteMaxCapacity;
- }
-
- @Override
- public synchronized float getAbsoluteUsedCapacity() {
- return absoluteUsedCapacity;
- }
-
- @Override
- public float getMaximumCapacity() {
- return maximumCapacity;
- }
-
- @Override
- public ActiveUsersManager getActiveUsersManager() {
- // Should never be called since all applications are submitted to LeafQueues
- return null;
- }
-
- @Override
- public synchronized float getUsedCapacity() {
- return usedCapacity;
- }
-
- @Override
- public synchronized Resource getUsedResources() {
- return usedResources;
- }
-
- @Override
- public synchronized List getChildQueues() {
- return new ArrayList(childQueues);
- }
-
- public synchronized int getNumContainers() {
- return numContainers;
- }
-
- public synchronized int getNumApplications() {
- return numApplications;
- }
-
- @Override
- public synchronized QueueState getState() {
- return state;
- }
-
- @Override
public synchronized QueueInfo getQueueInfo(
boolean includeChildQueues, boolean recursive) {
queueInfo.setCurrentCapacity(usedCapacity);
@@ -391,6 +276,10 @@ public synchronized void reinitialize(
newlyParsedParentQueue.absoluteMaxCapacity,
newlyParsedParentQueue.state,
newlyParsedParentQueue.acls,
+ newlyParsedParentQueue.labels,
+ newlyParsedParentQueue.defaultLabelExpression,
+ newlyParsedParentQueue.nodeLabelCapacities,
+ newlyParsedParentQueue.maximumNodeLabelCapacities,
newlyParsedParentQueue.reservationsContinueLooking);
// Re-configure existing child queues and add new ones
@@ -434,21 +323,6 @@ public synchronized void reinitialize(
}
return queuesMap;
}
-
- @Override
- public boolean hasAccess(QueueACL acl, UserGroupInformation user) {
- synchronized (this) {
- if (acls.get(acl).isUserAllowed(user)) {
- return true;
- }
- }
-
- if (parent != null) {
- return parent.hasAccess(acl, user);
- }
-
- return false;
- }
@Override
public void submitApplication(ApplicationId applicationId, String user,
@@ -532,30 +406,6 @@ public synchronized void removeApplication(ApplicationId applicationId,
" leaf-queue of parent: " + getQueueName() +
" #applications: " + getNumApplications());
}
-
- @Override
- public synchronized void setUsedCapacity(float usedCapacity) {
- this.usedCapacity = usedCapacity;
- }
-
- @Override
- public synchronized void setAbsoluteUsedCapacity(float absUsedCapacity) {
- this.absoluteUsedCapacity = absUsedCapacity;
- }
-
- /**
- * Set maximum capacity - used only for testing.
- * @param maximumCapacity new max capacity
- */
- synchronized void setMaxCapacity(float maximumCapacity) {
- // Sanity check
- CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity);
- float absMaxCapacity = CSQueueUtils.computeAbsoluteMaximumCapacity(maximumCapacity, parent);
- CSQueueUtils.checkAbsoluteCapacities(getQueueName(), absoluteCapacity, absMaxCapacity);
-
- this.maximumCapacity = maximumCapacity;
- this.absoluteMaxCapacity = absMaxCapacity;
- }
@Override
public synchronized CSAssignment assignContainers(
@@ -571,7 +421,8 @@ public synchronized CSAssignment assignContainers(
boolean localNeedToUnreserve = false;
// Are we over maximum-capacity for this queue?
- if (!assignToQueue(clusterResource)) {
+ if (!assignToQueue(clusterResource,
+ labelManager.getLabelsOnNode(node.getNodeID()))) {
// check to see if we could if we unreserve first
localNeedToUnreserve = assignToQueueIfUnreserve(clusterResource);
if (!localNeedToUnreserve) {
@@ -589,7 +440,8 @@ public synchronized CSAssignment assignContainers(
resourceCalculator, clusterResource,
assignedToChild.getResource(), Resources.none())) {
// Track resource utilization for the parent-queue
- allocateResource(clusterResource, assignedToChild.getResource());
+ allocateResource(clusterResource, assignedToChild.getResource(),
+ labelManager.getLabelsOnNode(node.getNodeID()));
// Track resource utilization in this pass of the scheduler
Resources.addTo(assignment.getResource(), assignedToChild.getResource());
@@ -628,22 +480,37 @@ public synchronized CSAssignment assignContainers(
return assignment;
}
- private synchronized boolean assignToQueue(Resource clusterResource) {
- // Check how of the cluster's absolute capacity we are currently using...
- float currentCapacity =
- Resources.divide(
- resourceCalculator, clusterResource,
- usedResources, clusterResource);
+ private synchronized boolean assignToQueue(Resource clusterResource,
+ Set nodeLabels) {
+ Set labelCanAccess =
+ new HashSet(Sets.intersection(labels, nodeLabels));
+ if (nodeLabels.isEmpty()) {
+ // Any queue can always access any node without label
+ labelCanAccess.add(DynamicNodeLabelsManager.NO_LABEL);
+ }
- if (currentCapacity >= absoluteMaxCapacity) {
- LOG.info(getQueueName() +
- " used=" + usedResources +
- " current-capacity (" + currentCapacity + ") " +
- " >= max-capacity (" + absoluteMaxCapacity + ")");
- return false;
+ boolean canAssign = false;
+ for (String label : labelCanAccess) {
+ if (!usedResourcesByLabels.containsKey(label)) {
+ usedResourcesByLabels.put(label, Resources.createResource(0));
+ }
+ float currentLabelUsedCapacity =
+ Resources.divide(resourceCalculator, clusterResource,
+ usedResourcesByLabels.get(label),
+ labelManager.getResourceByLabel(label, clusterResource));
+ // if any of the label doesn't beyond limit, we can allocate on this node
+ if (currentLabelUsedCapacity >= getAbsoluteMaximumCapacityByNodeLabel(label)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(getQueueName() + " used=" + usedResources
+ + " current-capacity (" + usedResourcesByLabels.get(label) + ") "
+ + " >= max-capacity (" + labelManager.getResourceByLabel(label, clusterResource) + ")");
+ }
+ } else {
+ canAssign = true;
+ }
}
- return true;
-
+
+ return canAssign;
}
@@ -749,8 +616,8 @@ public void completedContainer(Resource clusterResource,
// Careful! Locking order is important!
// Book keeping
synchronized (this) {
- releaseResource(clusterResource,
- rmContainer.getContainer().getResource());
+ releaseResource(clusterResource, rmContainer.getContainer()
+ .getResource(), labelManager.getLabelsOnNode(node.getNodeID()));
LOG.info("completedContainer" +
" queue=" + getQueueName() +
@@ -787,27 +654,6 @@ public void completedContainer(Resource clusterResource,
}
}
- @Private
- boolean getReservationContinueLooking() {
- return reservationsContinueLooking;
- }
-
- synchronized void allocateResource(Resource clusterResource,
- Resource resource) {
- Resources.addTo(usedResources, resource);
- CSQueueUtils.updateQueueStatistics(
- resourceCalculator, this, parent, clusterResource, minimumAllocation);
- ++numContainers;
- }
-
- synchronized void releaseResource(Resource clusterResource,
- Resource resource) {
- Resources.subtractFrom(usedResources, resource);
- CSQueueUtils.updateQueueStatistics(
- resourceCalculator, this, parent, clusterResource, minimumAllocation);
- --numContainers;
- }
-
@Override
public synchronized void updateClusterResource(Resource clusterResource) {
// Update all children
@@ -821,10 +667,9 @@ public synchronized void updateClusterResource(Resource clusterResource) {
}
@Override
- public QueueMetrics getMetrics() {
- return metrics;
+ public synchronized List getChildQueues() {
+ return new ArrayList(childQueues);
}
-
@Override
public void recoverContainer(Resource clusterResource,
@@ -834,12 +679,20 @@ public void recoverContainer(Resource clusterResource,
}
// Careful! Locking order is important!
synchronized (this) {
- allocateResource(clusterResource,rmContainer.getContainer().getResource());
+ allocateResource(clusterResource, rmContainer.getContainer()
+ .getResource(), labelManager.getLabelsOnNode(rmContainer
+ .getContainer().getNodeId()));
}
if (parent != null) {
parent.recoverContainer(clusterResource, attempt, rmContainer);
}
}
+
+ @Override
+ public ActiveUsersManager getActiveUsersManager() {
+ // Should never be called since all applications are submitted to LeafQueues
+ return null;
+ }
@Override
public void collectSchedulerApplications(
@@ -854,7 +707,8 @@ public void attachContainer(Resource clusterResource,
FiCaSchedulerApp application, RMContainer rmContainer) {
if (application != null) {
allocateResource(clusterResource, rmContainer.getContainer()
- .getResource());
+ .getResource(), labelManager.getLabelsOnNode(rmContainer
+ .getContainer().getNodeId()));
LOG.info("movedContainer" + " queueMoveIn=" + getQueueName()
+ " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity="
+ getAbsoluteUsedCapacity() + " used=" + usedResources + " cluster="
@@ -870,7 +724,9 @@ public void attachContainer(Resource clusterResource,
public void detachContainer(Resource clusterResource,
FiCaSchedulerApp application, RMContainer rmContainer) {
if (application != null) {
- releaseResource(clusterResource, rmContainer.getContainer().getResource());
+ releaseResource(clusterResource,
+ rmContainer.getContainer().getResource(),
+ labelManager.getLabelsOnNode(rmContainer.getContainer().getNodeId()));
LOG.info("movedContainer" + " queueMoveOut=" + getQueueName()
+ " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity="
+ getAbsoluteUsedCapacity() + " used=" + usedResources + " cluster="
@@ -882,6 +738,13 @@ public void detachContainer(Resource clusterResource,
}
}
+ @Override
+ public float getAbsActualCapacity() {
+ // for now, simply return actual capacity = guaranteed capacity for parent
+ // queue
+ return absoluteCapacity;
+ }
+
public Map getACLs() {
return acls;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java
index b87744d..d0251a3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java
@@ -19,6 +19,8 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -47,7 +49,7 @@
private boolean showReservationsAsQueues;
public PlanQueue(CapacitySchedulerContext cs, String queueName,
- CSQueue parent, CSQueue old) {
+ CSQueue parent, CSQueue old) throws IOException {
super(cs, queueName, parent, old);
this.schedulerContext = cs;
@@ -99,11 +101,14 @@ public synchronized void reinitialize(CSQueue newlyParsedQueue,
}
// Set new configs
+ // TODO: add support for node labels
setupQueueConfigs(clusterResource, newlyParsedParentQueue.getCapacity(),
newlyParsedParentQueue.getAbsoluteCapacity(),
newlyParsedParentQueue.getMaximumCapacity(),
newlyParsedParentQueue.getAbsoluteMaximumCapacity(),
newlyParsedParentQueue.getState(), newlyParsedParentQueue.getACLs(),
+ new HashSet(), null,
+ new HashMap(), new HashMap(),
newlyParsedParentQueue.getReservationContinueLooking());
updateQuotas(newlyParsedParentQueue.userLimit,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java
index 8e61821..c4424b5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java
@@ -42,7 +42,7 @@
private int maxSystemApps;
public ReservationQueue(CapacitySchedulerContext cs, String queueName,
- PlanQueue parent) {
+ PlanQueue parent) throws IOException {
super(cs, queueName, parent, null);
maxSystemApps = cs.getConfiguration().getMaximumSystemApplications();
// the following parameters are common to all reservation in the plan
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
index ff8e873..ebf604d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
@@ -65,6 +65,9 @@
LeafQueue queue;
private final ResourceCalculator resourceCalculator = new DefaultResourceCalculator();
+
+ RMContext rmContext = null;
+
@Before
public void setUp() throws IOException {
@@ -73,7 +76,9 @@ public void setUp() throws IOException {
YarnConfiguration conf = new YarnConfiguration();
setupQueueConfiguration(csConf);
-
+ rmContext = TestUtils.getMockRMContext();
+
+
CapacitySchedulerContext csContext = mock(CapacitySchedulerContext.class);
when(csContext.getConfiguration()).thenReturn(csConf);
when(csContext.getConf()).thenReturn(conf);
@@ -89,6 +94,8 @@ public void setUp() throws IOException {
thenReturn(CapacityScheduler.queueComparator);
when(csContext.getResourceCalculator()).
thenReturn(resourceCalculator);
+ when(csContext.getRMContext()).thenReturn(rmContext);
+
RMContainerTokenSecretManager containerTokenSecretManager =
new RMContainerTokenSecretManager(conf);
containerTokenSecretManager.rollMasterKey();
@@ -162,6 +169,7 @@ public void testLimitsComputation() throws Exception {
when(csContext.getQueueComparator()).
thenReturn(CapacityScheduler.queueComparator);
when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
+ when(csContext.getRMContext()).thenReturn(rmContext);
// Say cluster has 100 nodes of 16G each
Resource clusterResource = Resources.createResource(100 * 16 * GB, 100 * 16);
@@ -475,6 +483,7 @@ public void testHeadroom() throws Exception {
when(csContext.getQueueComparator()).
thenReturn(CapacityScheduler.queueComparator);
when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
+ when(csContext.getRMContext()).thenReturn(rmContext);
// Say cluster has 100 nodes of 16G each
Resource clusterResource = Resources.createResource(100 * 16 * GB);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSQueueUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSQueueUtils.java
index 7260afd..297c551 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSQueueUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSQueueUtils.java
@@ -19,38 +19,19 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.when;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.junit.Assert;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
-import org.junit.After;
-import org.junit.Before;
import org.junit.Test;
-import org.mockito.InOrder;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
public class TestCSQueueUtils {
@@ -88,6 +69,8 @@ public void runInvalidDivisorTest(boolean useDominant) throws Exception {
thenReturn(Resources.createResource(GB, 1));
when(csContext.getMaximumResourceCapability()).
thenReturn(Resources.createResource(0, 0));
+ RMContext rmContext = mock(RMContext.class);
+ when(csContext.getRMContext()).thenReturn(rmContext);
final String L1Q1 = "L1Q1";
csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {L1Q1});
@@ -129,6 +112,8 @@ public void testAbsoluteMaxAvailCapacityNoUse() throws Exception {
thenReturn(Resources.createResource(GB, 1));
when(csContext.getMaximumResourceCapability()).
thenReturn(Resources.createResource(16*GB, 32));
+ RMContext rmContext = mock(RMContext.class);
+ when(csContext.getRMContext()).thenReturn(rmContext);
final String L1Q1 = "L1Q1";
csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {L1Q1});
@@ -174,6 +159,9 @@ public void testAbsoluteMaxAvailCapacityWithUse() throws Exception {
when(csContext.getMaximumResourceCapability()).
thenReturn(Resources.createResource(16*GB, 32));
+ RMContext rmContext = mock(RMContext.class);
+ when(csContext.getRMContext()).thenReturn(rmContext);
+
final String L1Q1 = "L1Q1";
final String L1Q2 = "L1Q2";
final String L2Q1 = "L2Q1";
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index f7c098c..76f6c5a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -962,10 +962,7 @@ public void testNumClusterNodes() throws Exception {
YarnConfiguration conf = new YarnConfiguration();
CapacityScheduler cs = new CapacityScheduler();
cs.setConf(conf);
- RMContextImpl rmContext = new RMContextImpl(null, null, null, null, null,
- null, new RMContainerTokenSecretManager(conf),
- new NMTokenSecretManagerInRM(conf),
- new ClientToAMTokenSecretManagerInRM(), null);
+ RMContext rmContext = TestUtils.getMockRMContext();
cs.setRMContext(rmContext);
CapacitySchedulerConfiguration csConf =
new CapacitySchedulerConfiguration();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
index fdb9028..af58a43 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
@@ -99,6 +99,7 @@ public void setUp() throws Exception {
thenReturn(CapacityScheduler.queueComparator);
when(csContext.getResourceCalculator()).
thenReturn(resourceComparator);
+ when(csContext.getRMContext()).thenReturn(rmContext);
}
private FiCaSchedulerApp getMockApplication(int appId, String user) {
@@ -132,11 +133,11 @@ public CSAssignment answer(InvocationOnMock invocation) throws Throwable {
final Resource allocatedResource = Resources.createResource(allocation);
if (queue instanceof ParentQueue) {
((ParentQueue)queue).allocateResource(clusterResource,
- allocatedResource);
+ allocatedResource, null);
} else {
FiCaSchedulerApp app1 = getMockApplication(0, "");
((LeafQueue)queue).allocateResource(clusterResource, app1,
- allocatedResource);
+ allocatedResource, null);
}
// Next call - nothing
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
index 85ef381..e05f698 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
@@ -20,12 +20,14 @@
import java.util.ArrayList;
import java.util.List;
+import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.SecurityUtilTestHelper;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
@@ -41,19 +43,28 @@
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.RMSecretManagerService;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.TestFifoScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.DummyDynamicNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.DynamicNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+
public class TestContainerAllocation {
@@ -307,4 +318,411 @@ protected RMSecretManagerService createRMSecretManagerService() {
rm1.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.ALLOCATED);
MockRM.launchAndRegisterAM(app1, rm1, nm1);
}
+
+ private Configuration getConfigurationWithDefaultQueueLabels(
+ Configuration config) {
+ final String A = CapacitySchedulerConfiguration.ROOT + ".a";
+ final String B = CapacitySchedulerConfiguration.ROOT + ".b";
+
+ CapacitySchedulerConfiguration conf =
+ (CapacitySchedulerConfiguration) getConfigurationWithQueueLabels(config);
+ new CapacitySchedulerConfiguration(config);
+ conf.setDefaultNodeLabelExpression(A, "x");
+ conf.setDefaultNodeLabelExpression(B, "y");
+ return conf;
+ }
+
+ private Configuration getConfigurationWithQueueLabels(Configuration config) {
+ CapacitySchedulerConfiguration conf =
+ new CapacitySchedulerConfiguration(config);
+
+ // Define top-level queues
+ conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b", "c"});
+
+ // root can access anything
+ conf.setAccessibleLabels(CapacitySchedulerConfiguration.ROOT,
+ toSet("x", "y"));
+
+ final String A = CapacitySchedulerConfiguration.ROOT + ".a";
+ conf.setCapacity(A, 10);
+ conf.setMaximumCapacity(A, 15);
+ conf.setAccessibleLabels(A, toSet("x"));
+ conf.setCapacityByLabel(A, "x", 100);
+
+ final String B = CapacitySchedulerConfiguration.ROOT + ".b";
+ conf.setCapacity(B, 20);
+ conf.setAccessibleLabels(B, toSet("y"));
+ conf.setCapacityByLabel(B, "y", 100);
+
+ final String C = CapacitySchedulerConfiguration.ROOT + ".c";
+ conf.setCapacity(C, 70);
+ conf.setMaximumCapacity(C, 70);
+ conf.setAccessibleLabels(C, DynamicNodeLabelsManager.EMPTY_STRING_SET);
+
+ // Define 2nd-level queues
+ final String A1 = A + ".a1";
+ conf.setQueues(A, new String[] {"a1"});
+ conf.setCapacity(A1, 100);
+ conf.setMaximumCapacity(A1, 100);
+ conf.setCapacityByLabel(A1, "x", 100);
+
+ final String B1 = B + ".b1";
+ conf.setQueues(B, new String[] {"b1"});
+ conf.setCapacity(B1, 100);
+ conf.setMaximumCapacity(B1, 100);
+ conf.setCapacityByLabel(B1, "y", 100);
+
+ final String C1 = C + ".c1";
+ conf.setQueues(C, new String[] {"c1"});
+ conf.setCapacity(C1, 100);
+ conf.setMaximumCapacity(C1, 100);
+
+ return conf;
+ }
+
+ private void checkTaskContainersHost(ApplicationAttemptId attemptId,
+ ContainerId containerId, ResourceManager rm, String host) {
+ YarnScheduler scheduler = rm.getRMContext().getScheduler();
+ SchedulerAppReport appReport = scheduler.getSchedulerAppInfo(attemptId);
+
+ Assert.assertTrue(appReport.getLiveContainers().size() > 0);
+ for (RMContainer c : appReport.getLiveContainers()) {
+ if (c.getContainerId().equals(containerId)) {
+ Assert.assertEquals(host, c.getAllocatedNode().getHost());
+ }
+ }
+ }
+
+ private Set toSet(E... elements) {
+ Set set = Sets.newHashSet(elements);
+ return set;
+ }
+
+ private Configuration getComplexConfigurationWithQueueLabels(
+ Configuration config) {
+ CapacitySchedulerConfiguration conf =
+ new CapacitySchedulerConfiguration(config);
+
+ // Define top-level queues
+ conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"});
+
+ // root can access anything
+ conf.setAccessibleLabels(CapacitySchedulerConfiguration.ROOT,
+ toSet("x", "y", "z"));
+
+ final String A = CapacitySchedulerConfiguration.ROOT + ".a";
+ conf.setCapacity(A, 10);
+ conf.setMaximumCapacity(A, 10);
+ conf.setAccessibleLabels(A, toSet("x", "y"));
+ conf.setCapacityByLabel(A, "x", 100);
+ conf.setCapacityByLabel(A, "y", 50);
+
+ final String B = CapacitySchedulerConfiguration.ROOT + ".b";
+ conf.setCapacity(B, 90);
+ conf.setMaximumCapacity(B, 100);
+ conf.setAccessibleLabels(B, toSet("y", "z"));
+ conf.setCapacityByLabel(B, "y", 50);
+ conf.setCapacityByLabel(B, "z", 100);
+
+ // Define 2nd-level queues
+ final String A1 = A + ".a1";
+ conf.setQueues(A, new String[] {"a1"});
+ conf.setCapacity(A1, 100);
+ conf.setMaximumCapacity(A1, 100);
+ conf.setAccessibleLabels(A1, toSet("x", "y"));
+ conf.setDefaultNodeLabelExpression(A1, "x");
+ conf.setCapacityByLabel(A1, "x", 100);
+ conf.setCapacityByLabel(A1, "y", 100);
+
+ conf.setQueues(B, new String[] {"b1", "b2"});
+ final String B1 = B + ".b1";
+ conf.setCapacity(B1, 50);
+ conf.setMaximumCapacity(B1, 50);
+ conf.setAccessibleLabels(B1, DynamicNodeLabelsManager.EMPTY_STRING_SET);
+
+ final String B2 = B + ".b2";
+ conf.setCapacity(B2, 50);
+ conf.setMaximumCapacity(B2, 50);
+ conf.setAccessibleLabels(B2, toSet("y", "z"));
+ conf.setCapacityByLabel(B2, "y", 100);
+ conf.setCapacityByLabel(B2, "z", 100);
+
+ return conf;
+ }
+
+ @Test//(timeout = 300000)
+ public void testContainerAllocateWithComplexLabels() throws Exception {
+ // make it harder ..
+ final DynamicNodeLabelsManager mgr = new DummyDynamicNodeLabelsManager();
+ mgr.init(conf);
+
+ /*
+ * Queue structure:
+ * root (*)
+ * / \
+ * a x(100%), y(50%) b y(50%), z(100%)
+ * / / \
+ * a1 (x,y) b1(no) b2(y,z)
+ * 100% y = 100%, z = 100%
+ *
+ * Node structure:
+ * h1 : x
+ * h2 : x, y
+ * h3 : y
+ * h4 : y, z
+ * h5 : NO
+ *
+ * Total resource:
+ * x: 4G
+ * y: 6G
+ * z: 2G
+ * *: 2G
+ *
+ * Resource of
+ * a1: x=4G, y=3G, NO=0.2G
+ * b1: NO=0.9G (max=1G)
+ * b2: y=3, z=2G, NO=0.9G (max=1G)
+ *
+ * Each node can only allocate two containers
+ */
+
+ // set node -> label
+ mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y", "z"));
+ mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0),
+ toSet("x"), NodeId.newInstance("h2", 0), toSet("x", "y"),
+ NodeId.newInstance("h3", 0), toSet("y"), NodeId.newInstance("h4", 0),
+ toSet("y", "z"), NodeId.newInstance("h5", 0),
+ DynamicNodeLabelsManager.EMPTY_STRING_SET));
+
+ // inject node label manager
+ MockRM rm1 = new MockRM(getComplexConfigurationWithQueueLabels(conf)) {
+ @Override
+ public DynamicNodeLabelsManager createNodeLabelManager() {
+ return mgr;
+ }
+ };
+
+ rm1.getRMContext().setNodeLabelManager(mgr);
+ rm1.start();
+ MockNM nm1 = rm1.registerNode("h1:1234", 2048);
+ MockNM nm2 = rm1.registerNode("h2:1234", 2048);
+ MockNM nm3 = rm1.registerNode("h3:1234", 2048);
+ MockNM nm4 = rm1.registerNode("h4:1234", 2048);
+ MockNM nm5 = rm1.registerNode("h5:1234", 2048);
+
+ ContainerId containerId;
+
+ // launch an app to queue a1 (label = x), and check all container will
+ // be allocated in h1
+ RMApp app1 = rm1.submitApp(1024, "app", "user", null, "a1");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+ // request a container (label = x && y). can only allocate on nm2
+ am1.allocate("*", 1024, 1, new ArrayList(), "x && y");
+ containerId =
+ ContainerId.newInstance(am1.getApplicationAttemptId(), 2);
+ Assert.assertFalse(rm1.waitForState(nm1, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ Assert.assertTrue(rm1.waitForState(nm2, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
+ "h2");
+
+ // launch an app to queue b1 (label = y), and check all container will
+ // be allocated in h5
+ RMApp app2 = rm1.submitApp(1024, "app", "user", null, "b1");
+ MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm5);
+
+ // request a container for AM, will succeed
+ // and now b1's queue capacity will be used, cannot allocate more containers
+ // (Maximum capacity reached)
+ am2.allocate("*", 1024, 1, new ArrayList());
+ containerId = ContainerId.newInstance(am2.getApplicationAttemptId(), 2);
+ Assert.assertFalse(rm1.waitForState(nm4, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ Assert.assertFalse(rm1.waitForState(nm5, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+
+ // launch an app to queue b2
+ RMApp app3 = rm1.submitApp(1024, "app", "user", null, "b2");
+ MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm5);
+
+ // request a container. try to allocate on nm1 (label = x) and nm3 (label =
+ // y,z). Will successfully allocate on nm3
+ am3.allocate("*", 1024, 1, new ArrayList(), "y");
+ containerId = ContainerId.newInstance(am3.getApplicationAttemptId(), 2);
+ Assert.assertFalse(rm1.waitForState(nm1, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ Assert.assertTrue(rm1.waitForState(nm3, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1,
+ "h3");
+
+ // try to allocate container (request label = y && z) on nm3 (label = y) and
+ // nm4 (label = y,z). Will sucessfully allocate on nm4 only.
+ am3.allocate("*", 1024, 1, new ArrayList(), "y && z");
+ containerId = ContainerId.newInstance(am3.getApplicationAttemptId(), 3);
+ Assert.assertFalse(rm1.waitForState(nm3, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ Assert.assertTrue(rm1.waitForState(nm4, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1,
+ "h4");
+
+ rm1.close();
+ }
+
+ @Test (timeout = 120000)
+ public void testContainerAllocateWithLabels() throws Exception {
+ final DynamicNodeLabelsManager mgr = new DummyDynamicNodeLabelsManager();
+ mgr.init(conf);
+
+ // set node -> label
+ mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y"));
+ mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"),
+ NodeId.newInstance("h2", 0), toSet("y")));
+
+ // inject node label manager
+ MockRM rm1 = new MockRM(getConfigurationWithQueueLabels(conf)) {
+ @Override
+ public DynamicNodeLabelsManager createNodeLabelManager() {
+ return mgr;
+ }
+ };
+
+ rm1.getRMContext().setNodeLabelManager(mgr);
+ rm1.start();
+ MockNM nm1 = rm1.registerNode("h1:1234", 8000); // label = x
+ MockNM nm2 = rm1.registerNode("h2:1234", 8000); // label = y
+ MockNM nm3 = rm1.registerNode("h3:1234", 8000); // label =
+
+ ContainerId containerId;
+
+ // launch an app to queue a1 (label = x), and check all container will
+ // be allocated in h1
+ RMApp app1 = rm1.submitApp(200, "app", "user", null, "a1");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm3);
+
+ // request a container.
+ am1.allocate("*", 1024, 1, new ArrayList(), "x");
+ containerId =
+ ContainerId.newInstance(am1.getApplicationAttemptId(), 2);
+ Assert.assertFalse(rm1.waitForState(nm2, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ Assert.assertTrue(rm1.waitForState(nm1, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
+ "h1");
+
+ // launch an app to queue b1 (label = y), and check all container will
+ // be allocated in h2
+ RMApp app2 = rm1.submitApp(200, "app", "user", null, "b1");
+ MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm3);
+
+ // request a container.
+ am2.allocate("*", 1024, 1, new ArrayList(), "y");
+ containerId = ContainerId.newInstance(am2.getApplicationAttemptId(), 2);
+ Assert.assertFalse(rm1.waitForState(nm1, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ Assert.assertTrue(rm1.waitForState(nm2, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ checkTaskContainersHost(am2.getApplicationAttemptId(), containerId, rm1,
+ "h2");
+
+ // launch an app to queue c1 (label = ""), and check all container will
+ // be allocated in h3
+ RMApp app3 = rm1.submitApp(200, "app", "user", null, "c1");
+ MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm3);
+
+ // request a container.
+ am3.allocate("*", 1024, 1, new ArrayList());
+ containerId = ContainerId.newInstance(am3.getApplicationAttemptId(), 2);
+ Assert.assertFalse(rm1.waitForState(nm2, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ Assert.assertTrue(rm1.waitForState(nm3, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1,
+ "h3");
+
+ rm1.close();
+ }
+
+ @Test (timeout = 120000)
+ public void testContainerAllocateWithDefaultQueueLabels() throws Exception {
+ // This test is pretty much similar to testContainerAllocateWithLabel.
+ // Difference is, this test doesn't specify label expression in ResourceRequest,
+ // instead, it uses default queue label expression
+
+ final DynamicNodeLabelsManager mgr = new DummyDynamicNodeLabelsManager();
+ mgr.init(conf);
+
+ // set node -> label
+ mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y"));
+ mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"),
+ NodeId.newInstance("h2", 0), toSet("y")));
+
+ // inject node label manager
+ MockRM rm1 = new MockRM(getConfigurationWithDefaultQueueLabels(conf)) {
+ @Override
+ public DynamicNodeLabelsManager createNodeLabelManager() {
+ return mgr;
+ }
+ };
+
+ rm1.getRMContext().setNodeLabelManager(mgr);
+ rm1.start();
+ MockNM nm1 = rm1.registerNode("h1:1234", 8000); // label = x
+ MockNM nm2 = rm1.registerNode("h2:1234", 8000); // label = y
+ MockNM nm3 = rm1.registerNode("h3:1234", 8000); // label =
+
+ ContainerId containerId;
+
+ // launch an app to queue a1 (label = x), and check all container will
+ // be allocated in h1
+ RMApp app1 = rm1.submitApp(200, "app", "user", null, "a1");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+ // request a container.
+ am1.allocate("*", 1024, 1, new ArrayList());
+ containerId =
+ ContainerId.newInstance(am1.getApplicationAttemptId(), 2);
+ Assert.assertFalse(rm1.waitForState(nm3, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ Assert.assertTrue(rm1.waitForState(nm1, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
+ "h1");
+
+ // launch an app to queue b1 (label = y), and check all container will
+ // be allocated in h2
+ RMApp app2 = rm1.submitApp(200, "app", "user", null, "b1");
+ MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
+
+ // request a container.
+ am2.allocate("*", 1024, 1, new ArrayList());
+ containerId = ContainerId.newInstance(am2.getApplicationAttemptId(), 2);
+ Assert.assertFalse(rm1.waitForState(nm3, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ Assert.assertTrue(rm1.waitForState(nm2, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ checkTaskContainersHost(am2.getApplicationAttemptId(), containerId, rm1,
+ "h2");
+
+ // launch an app to queue c1 (label = ""), and check all container will
+ // be allocated in h3
+ RMApp app3 = rm1.submitApp(200, "app", "user", null, "c1");
+ MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm3);
+
+ // request a container.
+ am3.allocate("*", 1024, 1, new ArrayList());
+ containerId = ContainerId.newInstance(am3.getApplicationAttemptId(), 2);
+ Assert.assertFalse(rm1.waitForState(nm2, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ Assert.assertTrue(rm1.waitForState(nm3, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1,
+ "h3");
+
+ rm1.close();
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index 092ff83..780606d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -40,10 +40,10 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import org.junit.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.security.UserGroupInformation;
@@ -63,6 +63,7 @@
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.DynamicNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
@@ -82,6 +83,7 @@
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Matchers;
@@ -147,6 +149,7 @@ public void setUp() throws Exception {
thenReturn(CapacityScheduler.queueComparator);
when(csContext.getResourceCalculator()).
thenReturn(resourceCalculator);
+ when(csContext.getRMContext()).thenReturn(rmContext);
RMContainerTokenSecretManager containerTokenSecretManager =
new RMContainerTokenSecretManager(conf);
containerTokenSecretManager.rollMasterKey();
@@ -747,6 +750,81 @@ public void testHeadroomWithMaxCap() throws Exception {
a.assignContainers(clusterResource, node_1, false);
assertEquals(1*GB, app_2.getHeadroom().getMemory()); // hit queue max-cap
}
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testHeadroomWithLabel() throws Exception {
+ DynamicNodeLabelsManager nlm = mock(DynamicNodeLabelsManager.class);
+
+ // Mock the queue
+ LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
+
+ //unset maxCapacity
+ a.setMaxCapacity(1.0f);
+
+ // Users
+ final String user_0 = "user_0";
+
+ // Submit applications
+ final ApplicationAttemptId appAttemptId_0 =
+ TestUtils.getMockApplicationAttemptId(0, 0);
+ FiCaSchedulerApp app_0 =
+ new FiCaSchedulerApp(appAttemptId_0, user_0, a,
+ a.getActiveUsersManager(), rmContext);
+ a.submitApplicationAttempt(app_0, user_0);
+
+ // Setup some nodes
+ String host_0 = "127.0.0.1";
+ FiCaSchedulerNode node_0 =
+ TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 64 * GB);
+
+ final int numNodes = 1;
+ Resource clusterResource = Resources.createResource(numNodes * (64 * GB), 1);
+ when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+
+ // Setup resource-requests
+ Priority priority = TestUtils.createMockPriority(1);
+ app_0.updateResourceRequests(Collections.singletonList(TestUtils
+ .createResourceRequest(ResourceRequest.ANY, 1 * GB, 1, true, priority,
+ recordFactory)));
+
+ /**
+ * Start testing...
+ */
+
+ // Set user-limit
+ a.setUserLimit(100);
+ a.setUserLimitFactor(1);
+
+ // 1 container to user_0
+ a.assignContainers(clusterResource, node_0, false);
+ assertEquals(1 * GB, a.getUsedResources().getMemory());
+ assertEquals(1 * GB, app_0.getCurrentConsumption().getMemory());
+ assertEquals(5 * GB, app_0.getHeadroom().getMemory()); // User limit = 6G
+
+ // mock getQueueResource to 4999 MB
+ when(
+ nlm.getQueueResource(any(String.class), any(Set.class),
+ any(Resource.class))).thenReturn(Resource.newInstance(4999, 1));
+ a.setNodeLabelManager(nlm);
+
+ // do a resource allocation again
+ app_0.updateResourceRequests(Collections.singletonList(TestUtils
+ .createResourceRequest(ResourceRequest.ANY, 1 * GB, 1, true, priority,
+ recordFactory)));
+
+ when(
+ nlm.getResourceByLabel(any(String.class),
+ any(Resource.class))).thenReturn(Resource.newInstance(4999, 1));
+ a.assignContainers(clusterResource, node_0, false);
+
+ // current headroom should be
+ // Headroom = min(6G (user-limit), 4G (queueLabelResource)) -
+ // 2G (used-resource) = 2G
+ assertEquals(2 * GB, a.getUsedResources().getMemory());
+ assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
+ assertEquals(2 * GB, app_0.getHeadroom().getMemory());
+ }
@Test
public void testSingleQueueWithMultipleUsers() throws Exception {
@@ -2042,6 +2120,7 @@ public void testMaxAMResourcePerQueuePercentAfterQueueRefresh()
Resource clusterResource = Resources
.createResource(100 * 16 * GB, 100 * 32);
CapacitySchedulerContext csContext = mockCSContext(csConf, clusterResource);
+ when(csContext.getRMContext()).thenReturn(rmContext);
csConf.setFloat(CapacitySchedulerConfiguration.
MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT, 0.1f);
ParentQueue root = new ParentQueue(csContext,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
index 8b24a7e..72983ca 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
@@ -95,6 +95,7 @@ public void setUp() throws Exception {
thenReturn(CapacityScheduler.queueComparator);
when(csContext.getResourceCalculator()).
thenReturn(resourceComparator);
+ when(csContext.getRMContext()).thenReturn(rmContext);
}
private static final String A = "a";
@@ -144,11 +145,11 @@ public CSAssignment answer(InvocationOnMock invocation) throws Throwable {
final Resource allocatedResource = Resources.createResource(allocation);
if (queue instanceof ParentQueue) {
((ParentQueue)queue).allocateResource(clusterResource,
- allocatedResource);
+ allocatedResource, null);
} else {
FiCaSchedulerApp app1 = getMockApplication(0, "");
((LeafQueue)queue).allocateResource(clusterResource, app1,
- allocatedResource);
+ allocatedResource, null);
}
// Next call - nothing
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueMappings.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueMappings.java
index f573f43..2317fab 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueMappings.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueMappings.java
@@ -27,14 +27,11 @@
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
-import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SimpleGroupsMapping;
-import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
-import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
-import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
@@ -79,10 +76,7 @@ public void testQueueMapping() throws Exception {
YarnConfiguration conf = new YarnConfiguration(csConf);
CapacityScheduler cs = new CapacityScheduler();
- RMContextImpl rmContext = new RMContextImpl(null, null, null, null, null,
- null, new RMContainerTokenSecretManager(conf),
- new NMTokenSecretManagerInRM(conf),
- new ClientToAMTokenSecretManagerInRM(), null);
+ RMContext rmContext = TestUtils.getMockRMContext();
cs.setConf(conf);
cs.setRMContext(rmContext);
cs.init(conf);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java
index a3b990c..a8702b1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java
@@ -18,23 +18,40 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
-import org.junit.Assert;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.DynamicNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
-import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
+import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
+import com.google.common.collect.ImmutableSet;
+
public class TestQueueParsing {
private static final Log LOG = LogFactory.getLog(TestQueueParsing.class);
private static final double DELTA = 0.000001;
+ private DynamicNodeLabelsManager nodeLabelManager;
+
+ @Before
+ public void setup() {
+ nodeLabelManager = mock(DynamicNodeLabelsManager.class);
+ when(nodeLabelManager.containsNodeLabel(any(String.class))).thenReturn(true);
+ }
+
@Test
public void testQueueParsing() throws Exception {
CapacitySchedulerConfiguration csConf =
@@ -43,15 +60,11 @@ public void testQueueParsing() throws Exception {
YarnConfiguration conf = new YarnConfiguration(csConf);
CapacityScheduler capacityScheduler = new CapacityScheduler();
- RMContextImpl rmContext = new RMContextImpl(null, null,
- null, null, null, null, new RMContainerTokenSecretManager(conf),
- new NMTokenSecretManagerInRM(conf),
- new ClientToAMTokenSecretManagerInRM(), null);
capacityScheduler.setConf(conf);
- capacityScheduler.setRMContext(rmContext);
+ capacityScheduler.setRMContext(TestUtils.getMockRMContext());
capacityScheduler.init(conf);
capacityScheduler.start();
- capacityScheduler.reinitialize(conf, rmContext);
+ capacityScheduler.reinitialize(conf, TestUtils.getMockRMContext());
CSQueue a = capacityScheduler.getQueue("a");
Assert.assertEquals(0.10, a.getAbsoluteCapacity(), DELTA);
@@ -202,4 +215,164 @@ public void testMaxCapacity() throws Exception {
capacityScheduler.stop();
}
+ private void setupQueueConfigurationWithoutLabels(CapacitySchedulerConfiguration conf) {
+ // Define top-level queues
+ conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"});
+
+ final String A = CapacitySchedulerConfiguration.ROOT + ".a";
+ conf.setCapacity(A, 10);
+ conf.setMaximumCapacity(A, 15);
+
+ final String B = CapacitySchedulerConfiguration.ROOT + ".b";
+ conf.setCapacity(B, 90);
+
+ LOG.info("Setup top-level queues");
+
+ // Define 2nd-level queues
+ final String A1 = A + ".a1";
+ final String A2 = A + ".a2";
+ conf.setQueues(A, new String[] {"a1", "a2"});
+ conf.setCapacity(A1, 30);
+ conf.setMaximumCapacity(A1, 45);
+ conf.setCapacity(A2, 70);
+ conf.setMaximumCapacity(A2, 85);
+
+ final String B1 = B + ".b1";
+ final String B2 = B + ".b2";
+ final String B3 = B + ".b3";
+ conf.setQueues(B, new String[] {"b1", "b2", "b3"});
+ conf.setCapacity(B1, 50);
+ conf.setMaximumCapacity(B1, 85);
+ conf.setCapacity(B2, 30);
+ conf.setMaximumCapacity(B2, 35);
+ conf.setCapacity(B3, 20);
+ conf.setMaximumCapacity(B3, 35);
+ }
+
+ private void setupQueueConfigurationWithLabels(CapacitySchedulerConfiguration conf) {
+ // Define top-level queues
+ conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"});
+
+ final String A = CapacitySchedulerConfiguration.ROOT + ".a";
+ conf.setCapacity(A, 10);
+ conf.setMaximumCapacity(A, 15);
+
+ final String B = CapacitySchedulerConfiguration.ROOT + ".b";
+ conf.setCapacity(B, 90);
+
+ LOG.info("Setup top-level queues");
+
+ // Define 2nd-level queues
+ final String A1 = A + ".a1";
+ final String A2 = A + ".a2";
+ conf.setQueues(A, new String[] {"a1", "a2"});
+ conf.setAccessibleLabels(A, ImmutableSet.of("red", "blue"));
+ conf.setCapacityByLabel(A, "red", 50);
+ conf.setCapacityByLabel(A, "blue", 50);
+
+ conf.setCapacity(A1, 30);
+ conf.setMaximumCapacity(A1, 45);
+ conf.setCapacityByLabel(A1, "red", 50);
+ conf.setCapacityByLabel(A1, "blue", 100);
+
+ conf.setCapacity(A2, 70);
+ conf.setMaximumCapacity(A2, 85);
+ conf.setAccessibleLabels(A2, ImmutableSet.of("red"));
+ conf.setCapacityByLabel(A2, "red", 50);
+
+ final String B1 = B + ".b1";
+ final String B2 = B + ".b2";
+ final String B3 = B + ".b3";
+ conf.setQueues(B, new String[] {"b1", "b2", "b3"});
+ conf.setAccessibleLabels(B, ImmutableSet.of("red", "blue"));
+ conf.setCapacityByLabel(B, "red", 50);
+ conf.setCapacityByLabel(B, "blue", 50);
+
+ conf.setCapacity(B1, 50);
+ conf.setMaximumCapacity(B1, 85);
+ conf.setCapacityByLabel(B1, "red", 50);
+ conf.setCapacityByLabel(B1, "blue", 50);
+
+ conf.setCapacity(B2, 30);
+ conf.setMaximumCapacity(B2, 35);
+ conf.setCapacityByLabel(B2, "red", 25);
+ conf.setCapacityByLabel(B2, "blue", 25);
+
+ conf.setCapacity(B3, 20);
+ conf.setMaximumCapacity(B3, 35);
+ conf.setCapacityByLabel(B3, "red", 25);
+ conf.setCapacityByLabel(B3, "blue", 25);
+ }
+
+ @Test
+ public void testQueueParsingReinitializeWithLabels() throws IOException {
+ CapacitySchedulerConfiguration csConf =
+ new CapacitySchedulerConfiguration();
+ setupQueueConfigurationWithoutLabels(csConf);
+ YarnConfiguration conf = new YarnConfiguration(csConf);
+
+ CapacityScheduler capacityScheduler = new CapacityScheduler();
+ RMContextImpl rmContext =
+ new RMContextImpl(null, null, null, null, null, null,
+ new RMContainerTokenSecretManager(conf),
+ new NMTokenSecretManagerInRM(conf),
+ new ClientToAMTokenSecretManagerInRM(), null);
+ rmContext.setNodeLabelManager(nodeLabelManager);
+ capacityScheduler.setConf(conf);
+ capacityScheduler.setRMContext(rmContext);
+ capacityScheduler.init(conf);
+ capacityScheduler.start();
+ csConf = new CapacitySchedulerConfiguration();
+ setupQueueConfigurationWithLabels(csConf);
+ conf = new YarnConfiguration(csConf);
+ capacityScheduler.reinitialize(conf, rmContext);
+ checkQueueLabels(capacityScheduler);
+ capacityScheduler.stop();
+ }
+
+ private void checkQueueLabels(CapacityScheduler capacityScheduler) {
+ // queue-A is red, blue
+ Assert.assertTrue(capacityScheduler.getQueue("a").getAccessibleLabels()
+ .containsAll(ImmutableSet.of("red", "blue")));
+
+ // queue-A1 inherits A's configuration
+ Assert.assertTrue(capacityScheduler.getQueue("a1").getAccessibleLabels()
+ .containsAll(ImmutableSet.of("red", "blue")));
+
+ // queue-A2 is "red"
+ Assert.assertEquals(1, capacityScheduler
+ .getQueue("a2").getAccessibleLabels().size());
+ Assert.assertTrue(capacityScheduler
+ .getQueue("a2").getAccessibleLabels().contains("red"));
+
+ // queue-B is "red"/"blue"
+ Assert.assertTrue(capacityScheduler
+ .getQueue("b").getAccessibleLabels().containsAll(ImmutableSet.of("red", "blue")));
+
+ // queue-B2 inherits "red"/"blue"
+ Assert.assertTrue(capacityScheduler
+ .getQueue("b2").getAccessibleLabels().containsAll(ImmutableSet.of("red", "blue")));
+ }
+
+ @Test
+ public void testQueueParsingWithLabels() throws IOException {
+ YarnConfiguration conf = new YarnConfiguration();
+ CapacitySchedulerConfiguration csConf =
+ new CapacitySchedulerConfiguration(conf);
+ setupQueueConfigurationWithLabels(csConf);
+
+ CapacityScheduler capacityScheduler = new CapacityScheduler();
+ RMContextImpl rmContext =
+ new RMContextImpl(null, null, null, null, null, null,
+ new RMContainerTokenSecretManager(csConf),
+ new NMTokenSecretManagerInRM(csConf),
+ new ClientToAMTokenSecretManagerInRM(), null);
+ rmContext.setNodeLabelManager(nodeLabelManager);
+ capacityScheduler.setConf(csConf);
+ capacityScheduler.setRMContext(rmContext);
+ capacityScheduler.init(csConf);
+ capacityScheduler.start();
+ checkQueueLabels(capacityScheduler);
+ capacityScheduler.stop();
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservationQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservationQueue.java
index c53b7a9..4e6c73d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservationQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservationQueue.java
@@ -23,7 +23,10 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import java.io.IOException;
+
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
@@ -42,8 +45,7 @@
ReservationQueue reservationQueue;
@Before
- public void setup() {
-
+ public void setup() throws IOException {
// setup a context / conf
csConf = new CapacitySchedulerConfiguration();
YarnConfiguration conf = new YarnConfiguration();
@@ -57,6 +59,9 @@ public void setup() {
when(csContext.getClusterResource()).thenReturn(
Resources.createResource(100 * 16 * GB, 100 * 32));
when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
+
+ RMContext mockRMContext = TestUtils.getMockRMContext();
+ when(csContext.getRMContext()).thenReturn(mockRMContext);
// create a queue
PlanQueue pq = new PlanQueue(csContext, "root", null, null);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
index 0f8290e..e3e16e7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
@@ -48,6 +48,7 @@
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.nodelabels.NodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
@@ -121,6 +122,7 @@ private void setup(CapacitySchedulerConfiguration csConf) throws Exception {
when(csContext.getQueueComparator()).thenReturn(
CapacityScheduler.queueComparator);
when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
+ when(csContext.getRMContext()).thenReturn(rmContext);
RMContainerTokenSecretManager containerTokenSecretManager = new RMContainerTokenSecretManager(
conf);
containerTokenSecretManager.rollMasterKey();
@@ -819,7 +821,9 @@ public void testAssignToQueue() throws Exception {
// allocate to queue so that the potential new capacity is greater then
// absoluteMaxCapacity
Resource capability = Resources.createResource(32 * GB, 0);
- boolean res = a.assignToQueue(clusterResource, capability, app_0, true);
+ boolean res =
+ a.assignToQueue(clusterResource, capability,
+ NodeLabelsManager.EMPTY_STRING_SET, app_0, true);
assertFalse(res);
// now add in reservations and make sure it continues if config set
@@ -836,23 +840,29 @@ public void testAssignToQueue() throws Exception {
assertEquals(3 * GB, node_1.getUsedResource().getMemory());
capability = Resources.createResource(5 * GB, 0);
- res = a
- .assignToQueue(clusterResource, capability, app_0, true);
+ res =
+ a.assignToQueue(clusterResource, capability,
+ NodeLabelsManager.EMPTY_STRING_SET, app_0, true);
assertTrue(res);
// tell to not check reservations
- res = a.assignToQueue(clusterResource, capability, app_0, false);
+ res =
+ a.assignToQueue(clusterResource, capability,
+ NodeLabelsManager.EMPTY_STRING_SET, app_0, false);
assertFalse(res);
refreshQueuesTurnOffReservationsContLook(a, csConf);
// should return false no matter what checkReservations is passed
// in since feature is off
- res = a.assignToQueue(clusterResource, capability, app_0, false);
+ res =
+ a.assignToQueue(clusterResource, capability,
+ NodeLabelsManager.EMPTY_STRING_SET, app_0, false);
assertFalse(res);
- res = a
- .assignToQueue(clusterResource, capability, app_0, true);
+ res =
+ a.assignToQueue(clusterResource, capability,
+ NodeLabelsManager.EMPTY_STRING_SET, app_0, true);
assertFalse(res);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
index 9cb902d..f37f409 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
@@ -18,11 +18,14 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
+import java.util.Set;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -43,16 +46,19 @@
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.DynamicNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
+import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
-import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.util.resource.Resources;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
public class TestUtils {
private static final Log LOG = LogFactory.getLog(TestUtils.class);
@@ -61,7 +67,7 @@
* Get a mock {@link RMContext} for use in test cases.
* @return a mock {@link RMContext} for use in test cases
*/
- @SuppressWarnings("rawtypes")
+ @SuppressWarnings({ "rawtypes", "unchecked" })
public static RMContext getMockRMContext() {
// Null dispatcher
Dispatcher nullDispatcher = new Dispatcher() {
@@ -93,6 +99,27 @@ public EventHandler getEventHandler() {
new RMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInRM(conf),
new ClientToAMTokenSecretManagerInRM(), writer);
+ DynamicNodeLabelsManager nlm = mock(DynamicNodeLabelsManager.class);
+ when(
+ nlm.getQueueResource(any(String.class), any(Set.class),
+ any(Resource.class))).thenAnswer(new Answer() {
+ @Override
+ public Resource answer(InvocationOnMock invocation) throws Throwable {
+ Object[] args = invocation.getArguments();
+ return (Resource) args[2];
+ }
+ });
+
+ when(nlm.getResourceByLabel(any(String.class), any(Resource.class)))
+ .thenAnswer(new Answer() {
+ @Override
+ public Resource answer(InvocationOnMock invocation) throws Throwable {
+ Object[] args = invocation.getArguments();
+ return (Resource) args[1];
+ }
+ });
+
+ rmContext.setNodeLabelManager(nlm);
rmContext.setSystemMetricsPublisher(mock(SystemMetricsPublisher.class));
return rmContext;
}