diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
index 04c2fd5..711b26b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
@@ -72,9 +72,18 @@
/**
* Get the configured capacity of the queue.
- * @return queue capacity
+ * @return configured queue capacity
*/
public float getCapacity();
+
+ /**
+ * Get the actual capacity of the queue. This may differ from the configured
+ * capacity when the configuration does not match the cluster, for example
+ * after labels are added to the cluster.
+ *
+ * @return actual queue capacity
+ */
+ public float getAbsActualCapacity();
/**
* Get capacity of the parent of the queue as a function of the
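To make the intent of getAbsActualCapacity() concrete, here is a minimal arithmetic sketch (illustration only, not patch code; the class and numbers are made up). It mirrors the LeafQueue implementation further down: the reported value is the fraction of the cluster the queue's labels can actually reach, capped at the configured absolute capacity.

    public class AbsActualCapacitySketch {
      public static void main(String[] args) {
        float configuredAbsCapacity = 0.5f;   // queue is guaranteed 50% of the cluster
        float clusterMemoryGB = 100f;         // total cluster memory
        float reachableUnderLabelsGB = 30f;   // memory on nodes the queue's labels can access
        // actual capacity = min(reachable fraction, configured absolute capacity)
        float absActual = Math.min(reachableUnderLabelsGB / clusterMemoryGB,
            configuredAbsCapacity);
        System.out.println(absActual);        // 0.3 -- the queue can never use its full 50%
      }
    }
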
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
index 737062b..04b3442 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
@@ -19,7 +19,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.utils.Lock;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index a8ef942..1444ac0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -27,6 +27,7 @@
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -56,6 +57,7 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.label.NodeLabelManager;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.*;
@@ -189,6 +191,7 @@ public Configuration getConf() {
private boolean scheduleAsynchronously;
private AsyncScheduleThread asyncSchedulerThread;
+ private NodeLabelManager labelManager;
/**
* EXPERT
@@ -273,6 +276,8 @@ private synchronized void initScheduler(Configuration configuration) throws
this.applications =
new ConcurrentHashMap<ApplicationId, SchedulerApplication<FiCaSchedulerApp>>();
+ this.labelManager = rmContext.getNodeLabelManager();
+
initializeQueues(this.conf);
scheduleAsynchronously = this.conf.getScheduleAynschronously();
@@ -440,11 +445,13 @@ private void initializeQueueMappings() throws IOException {
@Lock(CapacityScheduler.class)
private void initializeQueues(CapacitySchedulerConfiguration conf)
throws IOException {
-
+ Map<String, Set<String>> queueToLabels = new HashMap<String, Set<String>>();
root =
parseQueue(this, conf, null, CapacitySchedulerConfiguration.ROOT,
- queues, queues, noop);
-
+ queues, queues, noop, queueToLabels);
+ if (labelManager != null) {
+ labelManager.reinitializeQueueLabels(queueToLabels);
+ }
LOG.info("Initialized root queue " + root);
initializeQueueMappings();
}
@@ -454,9 +461,10 @@ private void reinitializeQueues(CapacitySchedulerConfiguration conf)
throws IOException {
// Parse new queues
Map<String, CSQueue> newQueues = new HashMap<String, CSQueue>();
+ Map<String, Set<String>> queueToLabels = new HashMap<String, Set<String>>();
CSQueue newRoot =
parseQueue(this, conf, null, CapacitySchedulerConfiguration.ROOT,
- newQueues, queues, noop);
+ newQueues, queues, noop, queueToLabels);
// Ensure all existing queues are still present
validateExistingQueues(queues, newQueues);
@@ -467,6 +475,10 @@ private void reinitializeQueues(CapacitySchedulerConfiguration conf)
// Re-configure queues
root.reinitialize(newRoot, clusterResource);
initializeQueueMappings();
+
+ if (labelManager != null) {
+ labelManager.reinitializeQueueLabels(queueToLabels);
+ }
}
/**
@@ -506,11 +518,12 @@ private void addNewQueues(
@Lock(CapacityScheduler.class)
static CSQueue parseQueue(
- CapacitySchedulerContext csContext,
+ CapacitySchedulerContext csContext,
CapacitySchedulerConfiguration conf,
CSQueue parent, String queueName, Map<String, CSQueue> queues,
Map<String, CSQueue> oldQueues,
- QueueHook hook) throws IOException {
+ QueueHook hook,
+ Map<String, Set<String>> queueToLabels) throws IOException {
CSQueue queue;
String[] childQueueNames =
conf.getQueues((parent == null) ?
@@ -521,13 +534,13 @@ static CSQueue parseQueue(
"Queue configuration missing child queue names for " + queueName);
}
queue =
- new LeafQueue(csContext, queueName, parent,oldQueues.get(queueName));
+ new LeafQueue(csContext, queueName, parent, oldQueues.get(queueName));
// Used only for unit tests
queue = hook.hook(queue);
} else {
ParentQueue parentQueue =
- new ParentQueue(csContext, queueName, parent,oldQueues.get(queueName));
+ new ParentQueue(csContext, queueName, parent, oldQueues.get(queueName));
// Used only for unit tests
queue = hook.hook(parentQueue);
@@ -536,7 +549,7 @@ static CSQueue parseQueue(
for (String childQueueName : childQueueNames) {
CSQueue childQueue =
parseQueue(csContext, conf, queue, childQueueName,
- queues, oldQueues, hook);
+ queues, oldQueues, hook, queueToLabels);
childQueues.add(childQueue);
}
parentQueue.setChildQueues(childQueues);
@@ -548,6 +561,9 @@ static CSQueue parseQueue(
+ ". Leaf queue names must be distinct");
}
queues.put(queueName, queue);
+ if (queueToLabels != null) {
+ queueToLabels.put(queueName, queue.getLabels());
+ }
LOG.info("Initialized queue: " + queue);
return queue;
@@ -1044,11 +1060,18 @@ public void handle(SchedulerEvent event) {
}
private synchronized void addNode(RMNode nodeManager) {
+ // notify the node label manager that this node is now active
+ if (labelManager != null) {
+ labelManager.activeNode(nodeManager.getNodeID(),
+ nodeManager.getTotalCapability());
+ }
+
this.nodes.put(nodeManager.getNodeID(), new FiCaSchedulerNode(nodeManager,
usePortForNodeName));
Resources.addTo(clusterResource, nodeManager.getTotalCapability());
root.updateClusterResource(clusterResource);
int numNodes = numNodeManagers.incrementAndGet();
+
LOG.info("Added node " + nodeManager.getNodeAddress() +
" clusterResource: " + clusterResource);
@@ -1058,6 +1081,11 @@ private synchronized void addNode(RMNode nodeManager) {
}
private synchronized void removeNode(RMNode nodeInfo) {
+ // notify the node label manager that this node is being removed
+ if (labelManager != null) {
+ labelManager.deactiveNode(nodeInfo.getNodeID());
+ }
+
FiCaSchedulerNode node = nodes.get(nodeInfo.getNodeID());
if (node == null) {
return;
@@ -1091,6 +1119,7 @@ private synchronized void removeNode(RMNode nodeInfo) {
}
this.nodes.remove(nodeInfo.getNodeID());
+
LOG.info("Removed node " + nodeInfo.getNodeAddress() +
" clusterResource: " + clusterResource);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index af6bdc3..dad0252 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -18,7 +18,15 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.StringTokenizer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -31,6 +39,7 @@
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.label.NodeLabelManager;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
@@ -81,6 +90,12 @@
@Private
public static final String STATE = "state";
+
+ @Private
+ public static final String LABELS = "labels";
+
+ @Private
+ public static final String DEFAULT_LABEL_EXPRESSION = "default-label-expression";
@Private
public static final int DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS = 10000;
@@ -308,6 +323,42 @@ public QueueState getState(String queue) {
QueueState.valueOf(state.toUpperCase()) : QueueState.RUNNING;
}
+ public void setLabels(String queue, Set<String> labels) {
+ if (labels == null) {
+ return;
+ }
+ String str = StringUtils.join(",", labels);
+ set(getQueuePrefix(queue) + LABELS, str);
+ }
+
+ public Set<String> getLabels(String queue) {
+ String labelStr = get(getQueuePrefix(queue) + LABELS);
+ if (labelStr == null) {
+ return queue.equals(ROOT) ? NodeLabelManager.EMPTY_STRING_SET : null;
+ } else {
+ Set<String> set = new HashSet<String>();
+ for (String str : labelStr.split(",")) {
+ if (!str.trim().isEmpty()) {
+ set.add(str.trim().toLowerCase());
+ }
+ }
+ // if labels contains "*", only leave ANY behind
+ if (set.contains(NodeLabelManager.ANY)) {
+ set.clear();
+ set.add(NodeLabelManager.ANY);
+ }
+ return Collections.unmodifiableSet(set);
+ }
+ }
+
+ public String getDefaultLabelExpression(String queue) {
+ return get(getQueuePrefix(queue) + DEFAULT_LABEL_EXPRESSION);
+ }
+
+ public void setDefaultLabelExpression(String queue, String exp) {
+ set(getQueuePrefix(queue) + DEFAULT_LABEL_EXPRESSION, exp);
+ }
+
private static String getAclKey(QueueACL acl) {
return "acl_" + acl.toString().toLowerCase();
}
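A small usage sketch of the new label configuration keys (illustration only, not patch code; it assumes the standard yarn.scheduler.capacity.<queue-path>. prefix produced by getQueuePrefix(), and that NodeLabelManager.ANY is the "*" wildcard, as the comment and tests suggest):

    import java.util.Set;
    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;

    public class QueueLabelConfigSketch {
      public static void main(String[] args) {
        CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();

        // tokens are trimmed and lower-cased; empty tokens are dropped
        conf.set("yarn.scheduler.capacity.root.a.labels", " x, Y ,,z ");
        Set<String> labels = conf.getLabels("root.a");   // {"x", "y", "z"}

        // a "*" anywhere collapses the whole set to the ANY wildcard
        conf.set("yarn.scheduler.capacity.root.a.labels", "x,*");
        labels = conf.getLabels("root.a");               // {NodeLabelManager.ANY}

        // unset for a non-root queue returns null, which the queues treat as
        // "inherit the parent's labels"; unset for ROOT returns an empty set
        System.out.println(conf.getLabels("root.b"));    // null
      }
    }
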
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 5c93c5f..ee77946 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -30,6 +30,7 @@
import java.util.Set;
import java.util.TreeSet;
+import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -52,6 +53,7 @@
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.label.NodeLabelManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
@@ -60,6 +62,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@@ -129,12 +132,24 @@
private final ResourceCalculator resourceCalculator;
+ private Set<String> labels;
+
+ private String defaultLabelExpression;
+
+ private NodeLabelManager labelManager;
+
+ // cache last cluster resource to compute actual capacity
+ private Resource lastClusterResource = Resources.none();
+
public LeafQueue(CapacitySchedulerContext cs,
- String queueName, CSQueue parent, CSQueue old) {
+ String queueName, CSQueue parent, CSQueue old) throws IOException {
+ this.labelManager = cs.getRMContext().getNodeLabelManager();
this.scheduler = cs;
this.queueName = queueName;
this.parent = parent;
-
+ this.labels = cs.getConfiguration().getLabels(getQueuePath());
+ this.defaultLabelExpression = cs.getConfiguration()
+ .getDefaultLabelExpression(getQueuePath());
this.resourceCalculator = cs.getResourceCalculator();
// must be after parent and queueName are initialized
@@ -196,14 +211,12 @@ public LeafQueue(CapacitySchedulerContext cs,
Map<QueueACL, AccessControlList> acls =
cs.getConfiguration().getAcls(getQueuePath());
- setupQueueConfigs(
- cs.getClusterResource(),
- capacity, absoluteCapacity,
- maximumCapacity, absoluteMaxCapacity,
- userLimit, userLimitFactor,
+ setupQueueConfigs(cs.getClusterResource(), capacity, absoluteCapacity,
+ maximumCapacity, absoluteMaxCapacity, userLimit, userLimitFactor,
maxApplications, maxAMResourcePerQueuePercent, maxApplicationsPerUser,
maxActiveApplications, maxActiveApplicationsPerUser, state, acls, cs
- .getConfiguration().getNodeLocalityDelay());
+ .getConfiguration().getNodeLocalityDelay(), labels,
+ defaultLabelExpression);
if(LOG.isDebugEnabled()) {
LOG.debug("LeafQueue:" + " name=" + queueName
@@ -217,15 +230,14 @@ public LeafQueue(CapacitySchedulerContext cs,
this.activeApplications = new TreeSet<FiCaSchedulerApp>(applicationComparator);
}
- private synchronized void setupQueueConfigs(
- Resource clusterResource,
- float capacity, float absoluteCapacity,
- float maximumCapacity, float absoluteMaxCapacity,
- int userLimit, float userLimitFactor,
+ private synchronized void setupQueueConfigs(Resource clusterResource,
+ float capacity, float absoluteCapacity, float maximumCapacity,
+ float absoluteMaxCapacity, int userLimit, float userLimitFactor,
int maxApplications, float maxAMResourcePerQueuePercent,
int maxApplicationsPerUser, int maxActiveApplications,
int maxActiveApplicationsPerUser, QueueState state,
- Map<QueueACL, AccessControlList> acls, int nodeLocalityDelay)
+ Map<QueueACL, AccessControlList> acls, int nodeLocalityDelay,
+ Set<String> labels, String defaultLabelExpression) throws IOException
{
// Sanity check
CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity);
@@ -251,10 +263,38 @@ private synchronized void setupQueueConfigs(
this.state = state;
this.acls = acls;
+
+ // set labels
+ this.labels = labels;
+ if (this.labels == null) {
+ this.labels = parent.getLabels();
+ }
+ SchedulerUtils.checkAndThrowIfLabelNotIncluded(labelManager, this.labels);
+
+ // set default label expression
+ this.defaultLabelExpression = defaultLabelExpression;
+ if (this.defaultLabelExpression == null) {
+ this.defaultLabelExpression = parent.getDefaultLabelExpression();
+ }
+ if (!SchedulerUtils.checkQueueLabelExpression(this.labels,
+ this.defaultLabelExpression)) {
+ throw new IOException("Invalid default label expression for queue="
+ + queueInfo.getQueueName()
+ + ": the queue does not have permission to access all labels in its "
+ + "default label expression. Default label expression="
+ + (this.defaultLabelExpression == null ? ""
+ : this.defaultLabelExpression)
+ + ". Queue labels="
+ + (queueInfo.getLabels() == null ? "" : StringUtils.join(queueInfo
+ .getLabels().iterator(), ',')));
+ }
this.queueInfo.setCapacity(this.capacity);
this.queueInfo.setMaximumCapacity(this.maximumCapacity);
this.queueInfo.setQueueState(this.state);
+ this.queueInfo.setLabels(this.labels);
+ this.queueInfo.setDefaultLabelExpression(this.defaultLabelExpression);
this.nodeLocalityDelay = nodeLocalityDelay;
@@ -267,6 +307,14 @@ private synchronized void setupQueueConfigs(
CSQueueUtils.updateQueueStatistics(
resourceCalculator, this, getParent(), clusterResource,
minimumAllocation);
+
+ StringBuilder labelStrBuilder = new StringBuilder();
+ if (this.labels != null) {
+ for (String s : this.labels) {
+ labelStrBuilder.append(s);
+ labelStrBuilder.append(",");
+ }
+ }
LOG.info("Initializing " + queueName + "\n" +
"capacity = " + capacity +
@@ -321,7 +369,8 @@ private synchronized void setupQueueConfigs(
" [= configuredState ]" + "\n" +
"acls = " + aclsString +
" [= configuredAcls ]" + "\n" +
- "nodeLocalityDelay = " + nodeLocalityDelay + "\n");
+ "nodeLocalityDelay = " + nodeLocalityDelay + "\n" +
+ "labels=" + labelStrBuilder.toString() + "\n");
}
@Override
@@ -565,6 +614,11 @@ public String toString() {
"numApps=" + getNumApplications() + ", " +
"numContainers=" + getNumContainers();
}
+
+ @VisibleForTesting
+ public synchronized void setNodeLabelManager(NodeLabelManager mgr) {
+ this.labelManager = mgr;
+ }
@VisibleForTesting
public synchronized User getUser(String userName) {
@@ -613,7 +667,9 @@ public synchronized void reinitialize(
newlyParsedLeafQueue.getMaximumActiveApplications(),
newlyParsedLeafQueue.getMaximumActiveApplicationsPerUser(),
newlyParsedLeafQueue.state, newlyParsedLeafQueue.acls,
- newlyParsedLeafQueue.getNodeLocalityDelay());
+ newlyParsedLeafQueue.getNodeLocalityDelay(),
+ newlyParsedLeafQueue.labels,
+ newlyParsedLeafQueue.defaultLabelExpression);
// queue metrics are updated, more resource may be available
// activate the pending applications if possible
@@ -804,12 +860,19 @@ private synchronized FiCaSchedulerApp getApplication(
@Override
public synchronized CSAssignment
assignContainers(Resource clusterResource, FiCaSchedulerNode node) {
-
if(LOG.isDebugEnabled()) {
LOG.debug("assignContainers: node=" + node.getNodeName()
+ " #applications=" + activeApplications.size());
}
+ // if our queue cannot access this node, just return
+ if (labelManager != null) {
+ if (!SchedulerUtils.checkQueueAccessToNode(labels,
+ labelManager.getLabelsOnNode(node.getNodeName()))) {
+ return NULL_ASSIGNMENT;
+ }
+ }
+
// Check for reserved resources
RMContainer reservedContainer = node.getReservedContainer();
if (reservedContainer != null) {
@@ -968,12 +1031,12 @@ private synchronized boolean assignToQueue(Resource clusterResource,
@Lock({LeafQueue.class, FiCaSchedulerApp.class})
private Resource computeUserLimitAndSetHeadroom(
- FiCaSchedulerApp application, Resource clusterResource, Resource required) {
-
+ FiCaSchedulerApp application, Resource clusterResource, Resource required) {
String user = application.getUser();
- /**
- * Headroom is min((userLimit, queue-max-cap) - consumed)
+ /**
+ * Headroom = min(userLimit, queue-max-cap, max-capacity-consider-label) -
+ * consumed
*/
Resource userLimit = // User limit
@@ -992,11 +1055,21 @@ private Resource computeUserLimitAndSetHeadroom(
absoluteMaxAvailCapacity,
minimumAllocation);
- Resource userConsumed = getUser(user).getConsumedResources();
- Resource headroom =
+ // Maximum resource this queue can possibly access, considering its labels only.
+ Resource maxCapacityConsiderLabel =
+ labelManager == null ? clusterResource : labelManager.getQueueResource(
+ queueName, labels, clusterResource);
+ maxCapacityConsiderLabel =
+ Resources.roundDown(resourceCalculator, maxCapacityConsiderLabel,
+ minimumAllocation);
+ Resource userConsumed = getUser(user).getConsumedResources();
+
+ Resource headroom =
Resources.subtract(
- Resources.min(resourceCalculator, clusterResource,
- userLimit, queueMaxCap),
+ Resources.min(resourceCalculator, clusterResource,
+ Resources.min(resourceCalculator, clusterResource, userLimit,
+ queueMaxCap),
+ maxCapacityConsiderLabel),
userConsumed);
if (LOG.isDebugEnabled()) {
@@ -1312,6 +1385,22 @@ private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode nod
+ " priority=" + priority.getPriority()
+ " request=" + request + " type=" + type);
}
+
+ // check whether the resource request's label expression matches this node's labels
+ if (labelManager != null) {
+ if (!SchedulerUtils.checkNodeLabelExpression(
+ labelManager.getLabelsOnNode(node.getNodeName()),
+ request.getLabelExpression())) {
+ // The label expression does not match this node. For a reserved container
+ // this can happen when node labels changed after the reservation was made,
+ // so un-reserve it.
+ if (rmContainer != null) {
+ unreserve(application, priority, node, rmContainer);
+ }
+ return Resources.none();
+ }
+ }
+
Resource capability = request.getCapability();
Resource available = node.getAvailableResource();
Resource totalResource = node.getTotalResource();
@@ -1501,6 +1590,8 @@ synchronized void releaseResource(Resource clusterResource,
@Override
public synchronized void updateClusterResource(Resource clusterResource) {
+ lastClusterResource = clusterResource;
+
// Update queue properties
maxActiveApplications =
CSQueueUtils.computeMaxActiveApplications(
@@ -1661,4 +1752,32 @@ public void detachContainer(Resource clusterResource,
getParent().detachContainer(clusterResource, application, rmContainer);
}
}
+
+ @Override
+ public Set<String> getLabels() {
+ return labels;
+ }
+
+ @Override
+ public float getAbsActualCapacity() {
+ if (Resources.lessThanOrEqual(resourceCalculator, lastClusterResource,
+ lastClusterResource, Resources.none())) {
+ return absoluteCapacity;
+ }
+
+ Resource resourceRespectLabels =
+ labelManager == null ? lastClusterResource : labelManager
+ .getQueueResource(queueName, labels, lastClusterResource);
+ float absActualCapacity =
+ Resources.divide(resourceCalculator, lastClusterResource,
+ resourceRespectLabels, lastClusterResource);
+
+ return absActualCapacity > absoluteCapacity ? absoluteCapacity
+ : absActualCapacity;
+ }
+
+ @Override
+ public String getDefaultLabelExpression() {
+ return defaultLabelExpression;
+ }
}
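A worked sketch of the new headroom formula, headroom = min(userLimit, queueMaxCap, maxCapacityConsiderLabel) - userConsumed (illustration only, not patch code; the values are hypothetical and the Resources/DefaultResourceCalculator calls are only used to mirror the code above):

    import org.apache.hadoop.yarn.api.records.Resource;
    import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
    import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
    import org.apache.hadoop.yarn.util.resource.Resources;

    public class HeadroomSketch {
      public static void main(String[] args) {
        ResourceCalculator rc = new DefaultResourceCalculator();
        Resource cluster     = Resources.createResource(100 * 1024); // 100 GB cluster
        Resource userLimit   = Resources.createResource(40 * 1024);  // per-user limit
        Resource queueMaxCap = Resources.createResource(30 * 1024);  // queue max capacity
        Resource labelCap    = Resources.createResource(20 * 1024);  // what the queue's labels can reach
        Resource consumed    = Resources.createResource(8 * 1024);   // already used by this user

        Resource headroom = Resources.subtract(
            Resources.min(rc, cluster,
                Resources.min(rc, cluster, userLimit, queueMaxCap), labelCap),
            consumed);
        System.out.println(headroom.getMemory() / 1024 + " GB");     // 12 GB
      }
    }
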
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index 8c654b7..59a04f6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -46,6 +46,7 @@
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.label.NodeLabelManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
@@ -53,6 +54,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -100,11 +102,14 @@
RecordFactoryProvider.getRecordFactory(null);
private final ResourceCalculator resourceCalculator;
+ private Set<String> labels;
+ private final NodeLabelManager labelManager;
+ private String defaultLabelExpression;
public ParentQueue(CapacitySchedulerContext cs,
- String queueName, CSQueue parent, CSQueue old) {
+ String queueName, CSQueue parent, CSQueue old) throws IOException {
minimumAllocation = cs.getMinimumResourceCapability();
-
+ this.labelManager = cs.getRMContext().getNodeLabelManager();
this.parent = parent;
this.queueName = queueName;
this.rootQueue = (parent == null);
@@ -117,6 +122,9 @@ public ParentQueue(CapacitySchedulerContext cs,
cs.getConf());
float rawCapacity = cs.getConfiguration().getCapacity(getQueuePath());
+ this.labels = cs.getConfiguration().getLabels(getQueuePath());
+ this.defaultLabelExpression = cs.getConfiguration()
+ .getDefaultLabelExpression(getQueuePath());
if (rootQueue &&
(rawCapacity != CapacitySchedulerConfiguration.MAXIMUM_CAPACITY_VALUE)) {
@@ -144,9 +152,9 @@ public ParentQueue(CapacitySchedulerContext cs,
this.queueInfo.setQueueName(queueName);
this.queueInfo.setChildQueues(new ArrayList<QueueInfo>());
- setupQueueConfigs(cs.getClusterResource(),
- capacity, absoluteCapacity,
- maximumCapacity, absoluteMaxCapacity, state, acls);
+ setupQueueConfigs(cs.getClusterResource(), capacity, absoluteCapacity,
+ maximumCapacity, absoluteMaxCapacity, state, acls, labels,
+ defaultLabelExpression);
this.queueComparator = cs.getQueueComparator();
this.childQueues = new TreeSet<CSQueue>(queueComparator);
@@ -156,12 +164,11 @@ public ParentQueue(CapacitySchedulerContext cs,
", fullname=" + getQueuePath());
}
- private synchronized void setupQueueConfigs(
- Resource clusterResource,
- float capacity, float absoluteCapacity,
- float maximumCapacity, float absoluteMaxCapacity,
- QueueState state, Map<QueueACL, AccessControlList> acls
- ) {
+ private synchronized void setupQueueConfigs(Resource clusterResource,
+ float capacity, float absoluteCapacity, float maximumCapacity,
+ float absoluteMaxCapacity, QueueState state,
+ Map<QueueACL, AccessControlList> acls, Set<String> labels,
+ String defaultLabelExpression) throws IOException {
// Sanity check
CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity);
CSQueueUtils.checkAbsoluteCapacities(getQueueName(), absoluteCapacity, absoluteMaxCapacity);
@@ -176,9 +183,24 @@ private synchronized void setupQueueConfigs(
this.acls = acls;
+ // set labels
+ this.labels = labels;
+ if (this.labels == null && parent != null) {
+ this.labels = parent.getLabels();
+ SchedulerUtils.checkAndThrowIfLabelNotIncluded(labelManager, this.labels);
+ }
+
+ // set label expression
+ this.defaultLabelExpression = defaultLabelExpression;
+ if (this.defaultLabelExpression == null && parent != null) {
+ this.defaultLabelExpression = parent.getDefaultLabelExpression();
+ }
+
+ this.queueInfo.setLabels(labels);
this.queueInfo.setCapacity(this.capacity);
this.queueInfo.setMaximumCapacity(this.maximumCapacity);
this.queueInfo.setQueueState(this.state);
+ this.queueInfo.setDefaultLabelExpression(this.defaultLabelExpression);
StringBuilder aclsString = new StringBuilder();
for (Map.Entry<QueueACL, AccessControlList> e : acls.entrySet()) {
@@ -188,6 +210,14 @@ private synchronized void setupQueueConfigs(
// Update metrics
CSQueueUtils.updateQueueStatistics(
resourceCalculator, this, parent, clusterResource, minimumAllocation);
+
+ StringBuilder labelStrBuilder = new StringBuilder();
+ if (labels != null) {
+ for (String s : labels) {
+ labelStrBuilder.append(s);
+ labelStrBuilder.append(",");
+ }
+ }
LOG.info(queueName +
", capacity=" + capacity +
@@ -195,7 +225,8 @@ private synchronized void setupQueueConfigs(
", maxCapacity=" + maximumCapacity +
", asboluteMaxCapacity=" + absoluteMaxCapacity +
", state=" + state +
- ", acls=" + aclsString);
+ ", acls=" + aclsString +
+ ", labels=" + labelStrBuilder.toString() + "\n");
}
private static float PRECISION = 0.0005f; // 0.05% precision
@@ -383,7 +414,9 @@ public synchronized void reinitialize(
newlyParsedParentQueue.maximumCapacity,
newlyParsedParentQueue.absoluteMaxCapacity,
newlyParsedParentQueue.state,
- newlyParsedParentQueue.acls);
+ newlyParsedParentQueue.acls,
+ newlyParsedParentQueue.labels,
+ newlyParsedParentQueue.defaultLabelExpression);
// Re-configure existing child queues and add new ones
// The CS has already checked to ensure all existing child queues are present!
@@ -824,4 +857,20 @@ public void detachContainer(Resource clusterResource,
}
}
}
+
+ public Set<String> getLabels() {
+ return labels;
+ }
+
+ @Override
+ public float getAbsActualCapacity() {
+ // for now, simply return actual capacity = guaranteed capacity for parent
+ // queue
+ return absoluteCapacity;
+ }
+
+ @Override
+ public String getDefaultLabelExpression() {
+ return defaultLabelExpression;
+ }
}
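Both ParentQueue and LeafQueue above resolve labels the same way: an explicitly configured label set wins, otherwise the queue inherits from its parent (and likewise for the default label expression). A minimal sketch of that rule, not patch code:

    import java.util.Collections;
    import java.util.Set;

    public class LabelInheritanceSketch {
      // configured == null means "no labels entry in capacity-scheduler.xml"
      static Set<String> resolveLabels(Set<String> configured, Set<String> parentLabels) {
        if (configured != null) {
          return configured;                       // explicit setting always wins
        }
        // fall back to the parent's (already resolved) labels; the root queue
        // never reaches this branch because an unset root resolves to an empty set
        return parentLabels == null ? Collections.<String>emptySet() : parentLabels;
      }
    }
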
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
index a9a9975..4dc4632 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
@@ -65,6 +65,7 @@
LeafQueue queue;
private final ResourceCalculator resourceCalculator = new DefaultResourceCalculator();
+ RMContext rmContext = mock(RMContext.class);
@Before
public void setUp() throws IOException {
@@ -72,8 +73,7 @@ public void setUp() throws IOException {
new CapacitySchedulerConfiguration();
YarnConfiguration conf = new YarnConfiguration();
setupQueueConfiguration(csConf);
-
-
+
CapacitySchedulerContext csContext = mock(CapacitySchedulerContext.class);
when(csContext.getConfiguration()).thenReturn(csConf);
when(csContext.getConf()).thenReturn(conf);
@@ -89,6 +89,8 @@ public void setUp() throws IOException {
thenReturn(CapacityScheduler.queueComparator);
when(csContext.getResourceCalculator()).
thenReturn(resourceCalculator);
+ when(csContext.getRMContext()).thenReturn(rmContext);
+
RMContainerTokenSecretManager containerTokenSecretManager =
new RMContainerTokenSecretManager(conf);
containerTokenSecretManager.rollMasterKey();
@@ -99,7 +101,7 @@ public void setUp() throws IOException {
CSQueue root =
CapacityScheduler.parseQueue(csContext, csConf, null, "root",
queues, queues,
- TestUtils.spyHook);
+ TestUtils.spyHook, null);
queue = spy(new LeafQueue(csContext, A, root, null));
@@ -162,6 +164,7 @@ public void testLimitsComputation() throws Exception {
when(csContext.getQueueComparator()).
thenReturn(CapacityScheduler.queueComparator);
when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
+ when(csContext.getRMContext()).thenReturn(rmContext);
// Say cluster has 100 nodes of 16G each
Resource clusterResource = Resources.createResource(100 * 16 * GB, 100 * 16);
@@ -170,7 +173,7 @@ public void testLimitsComputation() throws Exception {
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
CSQueue root =
CapacityScheduler.parseQueue(csContext, csConf, null, "root",
- queues, queues, TestUtils.spyHook);
+ queues, queues, TestUtils.spyHook, null);
LeafQueue queue = (LeafQueue)queues.get(A);
@@ -259,7 +262,7 @@ public void testLimitsComputation() throws Exception {
queues = new HashMap<String, CSQueue>();
root =
CapacityScheduler.parseQueue(csContext, csConf, null, "root",
- queues, queues, TestUtils.spyHook);
+ queues, queues, TestUtils.spyHook, null);
clusterResource = Resources.createResource(100 * 16 * GB);
queue = (LeafQueue)queues.get(A);
@@ -285,7 +288,7 @@ public void testLimitsComputation() throws Exception {
queues = new HashMap<String, CSQueue>();
root =
CapacityScheduler.parseQueue(csContext, csConf, null, "root",
- queues, queues, TestUtils.spyHook);
+ queues, queues, TestUtils.spyHook, null);
queue = (LeafQueue)queues.get(A);
assertEquals(9999, (int)csConf.getMaximumApplicationsPerQueue(queue.getQueuePath()));
@@ -475,6 +478,7 @@ public void testHeadroom() throws Exception {
when(csContext.getQueueComparator()).
thenReturn(CapacityScheduler.queueComparator);
when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
+ when(csContext.getRMContext()).thenReturn(rmContext);
// Say cluster has 100 nodes of 16G each
Resource clusterResource = Resources.createResource(100 * 16 * GB);
@@ -482,7 +486,7 @@ public void testHeadroom() throws Exception {
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
CapacityScheduler.parseQueue(csContext, csConf, null, "root",
- queues, queues, TestUtils.spyHook);
+ queues, queues, TestUtils.spyHook, null);
// Manipulate queue 'a'
LeafQueue queue = TestLeafQueue.stubLeafQueue((LeafQueue)queues.get(A));
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSQueueUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSQueueUtils.java
index 7260afd..297c551 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSQueueUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSQueueUtils.java
@@ -19,38 +19,19 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.when;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.junit.Assert;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
-import org.junit.After;
-import org.junit.Before;
import org.junit.Test;
-import org.mockito.InOrder;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
public class TestCSQueueUtils {
@@ -88,6 +69,8 @@ public void runInvalidDivisorTest(boolean useDominant) throws Exception {
thenReturn(Resources.createResource(GB, 1));
when(csContext.getMaximumResourceCapability()).
thenReturn(Resources.createResource(0, 0));
+ RMContext rmContext = mock(RMContext.class);
+ when(csContext.getRMContext()).thenReturn(rmContext);
final String L1Q1 = "L1Q1";
csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {L1Q1});
@@ -129,6 +112,8 @@ public void testAbsoluteMaxAvailCapacityNoUse() throws Exception {
thenReturn(Resources.createResource(GB, 1));
when(csContext.getMaximumResourceCapability()).
thenReturn(Resources.createResource(16*GB, 32));
+ RMContext rmContext = mock(RMContext.class);
+ when(csContext.getRMContext()).thenReturn(rmContext);
final String L1Q1 = "L1Q1";
csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {L1Q1});
@@ -174,6 +159,9 @@ public void testAbsoluteMaxAvailCapacityWithUse() throws Exception {
when(csContext.getMaximumResourceCapability()).
thenReturn(Resources.createResource(16*GB, 32));
+ RMContext rmContext = mock(RMContext.class);
+ when(csContext.getRMContext()).thenReturn(rmContext);
+
final String L1Q1 = "L1Q1";
final String L1Q2 = "L1Q2";
final String L2Q1 = "L2Q1";
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
index fd14ef6..45e6c5a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
@@ -97,6 +97,7 @@ public void setUp() throws Exception {
thenReturn(CapacityScheduler.queueComparator);
when(csContext.getResourceCalculator()).
thenReturn(resourceComparator);
+ when(csContext.getRMContext()).thenReturn(rmContext);
}
private FiCaSchedulerApp getMockApplication(int appId, String user) {
@@ -214,7 +215,7 @@ public void testSortedQueues() throws Exception {
CSQueue root =
CapacityScheduler.parseQueue(csContext, csConf, null,
CapacitySchedulerConfiguration.ROOT, queues, queues,
- TestUtils.spyHook);
+ TestUtils.spyHook, null);
// Setup some nodes
final int memoryPerNode = 10;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
index a9bfc2f..71565f6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
@@ -20,12 +20,14 @@
import java.util.ArrayList;
import java.util.List;
+import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.SecurityUtilTestHelper;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
@@ -34,11 +36,14 @@
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.label.MemoryNodeLabelManager;
+import org.apache.hadoop.yarn.label.NodeLabelManager;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.RMSecretManagerService;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.TestFifoScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -46,11 +51,17 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+
public class TestContainerAllocation {
@@ -251,4 +262,416 @@ protected RMSecretManagerService createRMSecretManagerService() {
rm1.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.ALLOCATED);
MockRM.launchAndRegisterAM(app1, rm1, nm1);
}
+
+ private Configuration getConfigurationWithDefaultQueueLabels(
+ Configuration config) {
+ CapacitySchedulerConfiguration conf =
+ new CapacitySchedulerConfiguration(config);
+
+ // Define top-level queues
+ conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b", "c"});
+
+ // root can access anything
+ conf.setLabels(CapacitySchedulerConfiguration.ROOT, toSet("*"));
+
+ final String A = CapacitySchedulerConfiguration.ROOT + ".a";
+ conf.setCapacity(A, 10);
+ conf.setMaximumCapacity(A, 15);
+ conf.setLabels(A, toSet("x"));
+ conf.setDefaultLabelExpression(A, "x");
+
+ final String B = CapacitySchedulerConfiguration.ROOT + ".b";
+ conf.setCapacity(B, 20);
+ conf.setLabels(B, toSet("y"));
+ conf.setDefaultLabelExpression(B, "y");
+
+ final String C = CapacitySchedulerConfiguration.ROOT + ".c";
+ conf.setCapacity(C, 70);
+ conf.setMaximumCapacity(C, 70);
+ conf.setLabels(C, NodeLabelManager.EMPTY_STRING_SET);
+
+ // Define 2nd-level queues
+ final String A1 = A + ".a1";
+ conf.setQueues(A, new String[] {"a1"});
+ conf.setCapacity(A1, 100);
+ conf.setMaximumCapacity(A1, 100);
+
+ final String B1 = B + ".b1";
+ conf.setQueues(B, new String[] {"b1"});
+ conf.setCapacity(B1, 100);
+ conf.setMaximumCapacity(B1, 100);
+
+ final String C1 = C + ".c1";
+ conf.setQueues(C, new String[] {"c1"});
+ conf.setCapacity(C1, 100);
+ conf.setMaximumCapacity(C1, 100);
+
+ return conf;
+ }
+
+ private Configuration getConfigurationWithQueueLabels(Configuration config) {
+ CapacitySchedulerConfiguration conf =
+ new CapacitySchedulerConfiguration(config);
+
+ // Define top-level queues
+ conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b", "c"});
+
+ // root can access anything
+ conf.setLabels(CapacitySchedulerConfiguration.ROOT, toSet("*"));
+
+ final String A = CapacitySchedulerConfiguration.ROOT + ".a";
+ conf.setCapacity(A, 10);
+ conf.setMaximumCapacity(A, 15);
+ conf.setLabels(A, toSet("x"));
+
+ final String B = CapacitySchedulerConfiguration.ROOT + ".b";
+ conf.setCapacity(B, 20);
+ conf.setLabels(B, toSet("y"));
+
+ final String C = CapacitySchedulerConfiguration.ROOT + ".c";
+ conf.setCapacity(C, 70);
+ conf.setMaximumCapacity(C, 70);
+ conf.setLabels(C, NodeLabelManager.EMPTY_STRING_SET);
+
+ // Define 2nd-level queues
+ final String A1 = A + ".a1";
+ conf.setQueues(A, new String[] {"a1"});
+ conf.setCapacity(A1, 100);
+ conf.setMaximumCapacity(A1, 100);
+
+ final String B1 = B + ".b1";
+ conf.setQueues(B, new String[] {"b1"});
+ conf.setCapacity(B1, 100);
+ conf.setMaximumCapacity(B1, 100);
+
+ final String C1 = C + ".c1";
+ conf.setQueues(C, new String[] {"c1"});
+ conf.setCapacity(C1, 100);
+ conf.setMaximumCapacity(C1, 100);
+
+ return conf;
+ }
+
+ private void checkTaskContainersHost(ApplicationAttemptId attemptId,
+ ContainerId containerId, ResourceManager rm, String host) {
+ YarnScheduler scheduler = rm.getRMContext().getScheduler();
+ SchedulerAppReport appReport = scheduler.getSchedulerAppInfo(attemptId);
+
+ Assert.assertTrue(appReport.getLiveContainers().size() > 0);
+ for (RMContainer c : appReport.getLiveContainers()) {
+ if (c.getContainerId().equals(containerId)) {
+ Assert.assertEquals(host, c.getAllocatedNode().getHost());
+ }
+ }
+ }
+
+ private <E> Set<E> toSet(E... elements) {
+ Set<E> set = Sets.newHashSet(elements);
+ return set;
+ }
+
+ private Configuration getComplexConfigurationWithQueueLabels(
+ Configuration config) {
+ CapacitySchedulerConfiguration conf =
+ new CapacitySchedulerConfiguration(config);
+
+ // Define top-level queues
+ conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"});
+
+ // root can access anything
+ conf.setLabels(CapacitySchedulerConfiguration.ROOT, toSet("*"));
+
+ final String A = CapacitySchedulerConfiguration.ROOT + ".a";
+ conf.setCapacity(A, 50);
+ conf.setMaximumCapacity(A, 50);
+ conf.setLabels(A, toSet("x"));
+
+ final String B = CapacitySchedulerConfiguration.ROOT + ".b";
+ conf.setCapacity(B, 50);
+ conf.setMaximumCapacity(B, 50);
+ conf.setLabels(B, toSet("y"));
+
+ // Define 2nd-level queues
+ final String A1 = A + ".a1";
+ conf.setQueues(A, new String[] {"a1"});
+ conf.setCapacity(A1, 100);
+ conf.setMaximumCapacity(A1, 100);
+ conf.setLabels(A1, toSet("x", "y"));
+ conf.setDefaultLabelExpression(A1, "x");
+
+ conf.setQueues(B, new String[] {"b1", "b2"});
+ final String B1 = B + ".b1";
+ conf.setCapacity(B1, 10);
+ conf.setMaximumCapacity(B1, 20);
+ conf.setLabels(B1, NodeLabelManager.EMPTY_STRING_SET);
+
+ final String B2 = B + ".b2";
+ conf.setCapacity(B2, 90);
+ conf.setMaximumCapacity(B2, 90);
+ conf.setLabels(B2, toSet("y", "z"));
+
+ return conf;
+ }
+
+ @Test (timeout = 300000)
+ public void testContainerAllocateWithComplexLabels() throws Exception {
+ // make it harder ..
+ final NodeLabelManager mgr = new MemoryNodeLabelManager();
+ mgr.init(conf);
+
+ /*
+ * Queue structure:
+ * root (*)
+ * / \
+ * a(x) 50% b(y) 50%
+ * / / \
+ * a1 (x,y) b1(NO) b2(y,z)
+ * 100% 10% 90%
+ *
+ * Node structure:
+ * h1 : x
+ * h2 : x, y
+ * h3 : y
+ * h4 : y, z
+ * h5 : NO
+ *
+ * Each node can only allocate two containers
+ */
+
+ // set node -> label
+ mgr.addLabels(ImmutableSet.of("x", "y", "z"));
+ mgr.setLabelsOnMultipleNodes(ImmutableMap.of("h1", toSet("x"), "h2",
+ toSet("x", "y"), "h3", toSet("y"), "h4", toSet("y", "z"), "h5",
+ NodeLabelManager.EMPTY_STRING_SET));
+
+ // inject node label manager
+ MockRM rm1 = new MockRM(getComplexConfigurationWithQueueLabels(conf)) {
+ @Override
+ public NodeLabelManager createNodeLabelManager() {
+ return mgr;
+ }
+ };
+
+ rm1.getRMContext().setNodeLabelManager(mgr);
+ rm1.start();
+ MockNM nm1 = rm1.registerNode("h1:1234", 2048);
+ MockNM nm2 = rm1.registerNode("h2:1234", 2048);
+ MockNM nm3 = rm1.registerNode("h3:1234", 2048);
+ MockNM nm4 = rm1.registerNode("h4:1234", 2048);
+ MockNM nm5 = rm1.registerNode("h5:1234", 2048);
+
+ ContainerId containerId;
+
+ // launch an app in queue a1 (default label expression = x); its AM
+ // container will be allocated on h1
+ RMApp app1 = rm1.submitApp(1024, "app", "user", null, "a1");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+ // request a container (label = x && y); it can only be allocated on nm2
+ am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "x && y");
+ containerId =
+ ContainerId.newInstance(am1.getApplicationAttemptId(), 2);
+ Assert.assertFalse(rm1.waitForState(nm1, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ Assert.assertTrue(rm1.waitForState(nm2, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
+ "h2");
+
+ // launch an app in queue b1 (no accessible labels); its AM container
+ // will be allocated on h5
+ RMApp app2 = rm1.submitApp(1024, "app", "user", null, "b1");
+ MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm5);
+
+ // request a container; the AM container has already used up b1's small
+ // queue capacity, so no more containers can be allocated
+ am2.allocate("*", 1024, 1, new ArrayList<ContainerId>());
+ containerId = ContainerId.newInstance(am2.getApplicationAttemptId(), 2);
+ Assert.assertFalse(rm1.waitForState(nm4, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ Assert.assertFalse(rm1.waitForState(nm5, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+
+ // launch an app to queue b2
+ RMApp app3 = rm1.submitApp(1024, "app", "user", null, "b2");
+ MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm5);
+
+ // request a container (label = y). It cannot be allocated on nm1 (label = x)
+ // but will be successfully allocated on nm3 (label = y)
+ am3.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "y");
+ containerId = ContainerId.newInstance(am3.getApplicationAttemptId(), 2);
+ Assert.assertFalse(rm1.waitForState(nm1, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ Assert.assertTrue(rm1.waitForState(nm3, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1,
+ "h3");
+
+ // try to allocate a container (request label = y && z) on nm3 (label = y) and
+ // nm4 (label = y,z). It will successfully allocate on nm4 only.
+ am3.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "y && z");
+ containerId = ContainerId.newInstance(am3.getApplicationAttemptId(), 3);
+ Assert.assertFalse(rm1.waitForState(nm3, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ Assert.assertTrue(rm1.waitForState(nm4, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1,
+ "h4");
+
+ rm1.close();
+ }
+
+ @Test (timeout = 120000)
+ public void testContainerAllocateWithLabels() throws Exception {
+ final NodeLabelManager mgr = new MemoryNodeLabelManager();
+ mgr.init(conf);
+
+ // set node -> label
+ mgr.addLabels(ImmutableSet.of("x", "y"));
+ mgr.setLabelsOnMultipleNodes(ImmutableMap.of("h1", toSet("x"),
+ "h2", toSet("y")));
+
+ // inject node label manager
+ MockRM rm1 = new MockRM(getConfigurationWithQueueLabels(conf)) {
+ @Override
+ public NodeLabelManager createNodeLabelManager() {
+ return mgr;
+ }
+ };
+
+ rm1.getRMContext().setNodeLabelManager(mgr);
+ rm1.start();
+ MockNM nm1 = rm1.registerNode("h1:1234", 8000); // label = x
+ MockNM nm2 = rm1.registerNode("h2:1234", 8000); // label = y
+ MockNM nm3 = rm1.registerNode("h3:1234", 8000); // label =
+
+ ContainerId containerId;
+
+ // launch an app in queue a1 (label = x), and check that containers
+ // requested with label x are allocated on h1
+ RMApp app1 = rm1.submitApp(200, "app", "user", null, "a1");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm3);
+
+ // request a container.
+ am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "x");
+ containerId =
+ ContainerId.newInstance(am1.getApplicationAttemptId(), 2);
+ Assert.assertFalse(rm1.waitForState(nm2, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ Assert.assertTrue(rm1.waitForState(nm1, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
+ "h1");
+
+ // launch an app in queue b1 (label = y), and check that containers
+ // requested with label y are allocated on h2
+ RMApp app2 = rm1.submitApp(200, "app", "user", null, "b1");
+ MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm3);
+
+ // request a container.
+ am2.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "y");
+ containerId = ContainerId.newInstance(am2.getApplicationAttemptId(), 2);
+ Assert.assertFalse(rm1.waitForState(nm1, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ Assert.assertTrue(rm1.waitForState(nm2, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ checkTaskContainersHost(am2.getApplicationAttemptId(), containerId, rm1,
+ "h2");
+
+ // launch an app in queue c1 (no labels), and check that its containers
+ // are allocated on h3
+ RMApp app3 = rm1.submitApp(200, "app", "user", null, "c1");
+ MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm3);
+
+ // request a container.
+ am3.allocate("*", 1024, 1, new ArrayList<ContainerId>());
+ containerId = ContainerId.newInstance(am3.getApplicationAttemptId(), 2);
+ Assert.assertFalse(rm1.waitForState(nm2, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ Assert.assertTrue(rm1.waitForState(nm3, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1,
+ "h3");
+
+ rm1.close();
+ }
+
+ @Test (timeout = 120000)
+ public void testContainerAllocateWithDefaultQueueLabels() throws Exception {
+ // This test is very similar to testContainerAllocateWithLabels. The
+ // difference is that it doesn't specify a label expression in the
+ // ResourceRequest; instead it relies on the queue's default label expression.
+
+ final NodeLabelManager mgr = new MemoryNodeLabelManager();
+ mgr.init(conf);
+
+ // set node -> label
+ mgr.addLabels(ImmutableSet.of("x", "y"));
+ mgr.setLabelsOnMultipleNodes(ImmutableMap.of("h1", toSet("x"),
+ "h2", toSet("y")));
+
+ // inject node label manager
+ MockRM rm1 = new MockRM(getConfigurationWithDefaultQueueLabels(conf)) {
+ @Override
+ public NodeLabelManager createNodeLabelManager() {
+ return mgr;
+ }
+ };
+
+ rm1.getRMContext().setNodeLabelManager(mgr);
+ rm1.start();
+ MockNM nm1 = rm1.registerNode("h1:1234", 8000); // label = x
+ MockNM nm2 = rm1.registerNode("h2:1234", 8000); // label = y
+ MockNM nm3 = rm1.registerNode("h3:1234", 8000); // label =
+
+ ContainerId containerId;
+
+ // launch an app in queue a1 (label = x), and check that its containers
+ // are allocated on h1
+ RMApp app1 = rm1.submitApp(200, "app", "user", null, "a1");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+ // request a container.
+ am1.allocate("*", 1024, 1, new ArrayList<ContainerId>());
+ containerId =
+ ContainerId.newInstance(am1.getApplicationAttemptId(), 2);
+ Assert.assertFalse(rm1.waitForState(nm3, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ Assert.assertTrue(rm1.waitForState(nm1, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
+ "h1");
+
+ // launch an app in queue b1 (label = y), and check that its containers
+ // are allocated on h2
+ RMApp app2 = rm1.submitApp(200, "app", "user", null, "b1");
+ MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
+
+ // request a container.
+ am2.allocate("*", 1024, 1, new ArrayList<ContainerId>());
+ containerId = ContainerId.newInstance(am2.getApplicationAttemptId(), 2);
+ Assert.assertFalse(rm1.waitForState(nm3, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ Assert.assertTrue(rm1.waitForState(nm2, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ checkTaskContainersHost(am2.getApplicationAttemptId(), containerId, rm1,
+ "h2");
+
+ // launch an app in queue c1 (label = ""), and check that its containers
+ // are allocated on h3
+ RMApp app3 = rm1.submitApp(200, "app", "user", null, "c1");
+ MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm3);
+
+ // request a container.
+ am3.allocate("*", 1024, 1, new ArrayList<ContainerId>());
+ containerId = ContainerId.newInstance(am3.getApplicationAttemptId(), 2);
+ Assert.assertFalse(rm1.waitForState(nm2, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ Assert.assertTrue(rm1.waitForState(nm3, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1,
+ "h3");
+
+ rm1.close();
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index d5eb933..bfd0b8a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -39,8 +39,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
-import org.junit.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.security.UserGroupInformation;
@@ -58,6 +58,7 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.label.NodeLabelManager;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
@@ -77,6 +78,7 @@
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
@@ -131,6 +133,7 @@ public void setUp() throws Exception {
thenReturn(CapacityScheduler.queueComparator);
when(csContext.getResourceCalculator()).
thenReturn(resourceCalculator);
+ when(csContext.getRMContext()).thenReturn(rmContext);
RMContainerTokenSecretManager containerTokenSecretManager =
new RMContainerTokenSecretManager(conf);
containerTokenSecretManager.rollMasterKey();
@@ -141,7 +144,7 @@ public void setUp() throws Exception {
CapacityScheduler.parseQueue(csContext, csConf, null,
CapacitySchedulerConfiguration.ROOT,
queues, queues,
- TestUtils.spyHook);
+ TestUtils.spyHook, null);
cs.setRMContext(rmContext);
cs.init(csConf);
@@ -731,6 +734,77 @@ public void testHeadroomWithMaxCap() throws Exception {
a.assignContainers(clusterResource, node_1);
assertEquals(1*GB, app_2.getHeadroom().getMemory()); // hit queue max-cap
}
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testHeadroomWithLabel() throws Exception {
+ NodeLabelManager nlm = mock(NodeLabelManager.class);
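+ // the label manager is mocked so the test can control the resource
+ // reported for the queue's labels (see the getQueueResource stub below)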
+
+ // Mock the queue
+ LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
+
+ // unset maxCapacity
+ a.setMaxCapacity(1.0f);
+
+ // Users
+ final String user_0 = "user_0";
+
+ // Submit applications
+ final ApplicationAttemptId appAttemptId_0 =
+ TestUtils.getMockApplicationAttemptId(0, 0);
+ FiCaSchedulerApp app_0 =
+ new FiCaSchedulerApp(appAttemptId_0, user_0, a,
+ a.getActiveUsersManager(), rmContext);
+ a.submitApplicationAttempt(app_0, user_0);
+
+ // Setup some nodes
+ String host_0 = "127.0.0.1";
+ FiCaSchedulerNode node_0 =
+ TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 64 * GB);
+
+ final int numNodes = 1;
+ Resource clusterResource = Resources.createResource(numNodes * (64 * GB), 1);
+ when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+
+ // Setup resource-requests
+ Priority priority = TestUtils.createMockPriority(1);
+ app_0.updateResourceRequests(Collections.singletonList(TestUtils
+ .createResourceRequest(ResourceRequest.ANY, 1 * GB, 1, true, priority,
+ recordFactory)));
+
+ /**
+ * Start testing...
+ */
+
+ // Set user-limit
+ a.setUserLimit(100);
+ a.setUserLimitFactor(1);
+
+ // 1 container to user_0
+ a.assignContainers(clusterResource, node_0);
+ assertEquals(1 * GB, a.getUsedResources().getMemory());
+ assertEquals(1 * GB, app_0.getCurrentConsumption().getMemory());
+ assertEquals(5 * GB, app_0.getHeadroom().getMemory()); // User limit = 6G
+
+ // mock getQueueResource to return 4999 MB for the queue's labels
+ when(
+ nlm.getQueueResource(any(String.class), any(Set.class),
+ any(Resource.class))).thenReturn(Resource.newInstance(4999, 1));
+ a.setNodeLabelManager(nlm);
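+ // from this point on, headroom is capped by the queue's label resource
+ // (4999 MB) in addition to the user limit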
+
+ // do a resource allocation again
+ app_0.updateResourceRequests(Collections.singletonList(TestUtils
+ .createResourceRequest(ResourceRequest.ANY, 1 * GB, 1, true, priority,
+ recordFactory)));
+ a.assignContainers(clusterResource, node_0);
+
+ // current headroom should be
+ // headroom = min(6G (user-limit), 4G (queue-label resource, mocked as 4999 MB))
+ // - 2G (used-resource) = 2G
+ assertEquals(2 * GB, a.getUsedResources().getMemory());
+ assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
+ assertEquals(2 * GB, app_0.getHeadroom().getMemory());
+ }

@Test
public void testSingleQueueWithMultipleUsers() throws Exception {
@@ -1682,7 +1756,7 @@ public void testActivateApplicationAfterQueueRefresh() throws Exception {
CapacityScheduler.parseQueue(csContext, csConf, null,
CapacitySchedulerConfiguration.ROOT,
newQueues, queues,
- TestUtils.spyHook);
+ TestUtils.spyHook, null);
queues = newQueues;
root.reinitialize(newRoot, cs.getClusterResource());
@@ -1707,7 +1781,7 @@ public void testNodeLocalityAfterQueueRefresh() throws Exception {
CapacityScheduler.parseQueue(csContext, csConf, null,
CapacitySchedulerConfiguration.ROOT,
newQueues, queues,
- TestUtils.spyHook);
+ TestUtils.spyHook, null);
queues = newQueues;
root.reinitialize(newRoot, cs.getClusterResource());
@@ -2025,6 +2099,7 @@ public void testMaxAMResourcePerQueuePercentAfterQueueRefresh()
Resource clusterResource = Resources
.createResource(100 * 16 * GB, 100 * 32);
CapacitySchedulerContext csContext = mockCSContext(csConf, clusterResource);
+ when(csContext.getRMContext()).thenReturn(rmContext);
csConf.setFloat(CapacitySchedulerConfiguration.
MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT, 0.1f);
ParentQueue root = new ParentQueue(csContext,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
index fa9edb1..ea4b3d7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
@@ -94,6 +94,7 @@ public void setUp() throws Exception {
thenReturn(CapacityScheduler.queueComparator);
when(csContext.getResourceCalculator()).
thenReturn(resourceComparator);
+ when(csContext.getRMContext()).thenReturn(rmContext);
}
private static final String A = "a";
@@ -203,7 +204,7 @@ public void testSingleLevelQueues() throws Exception {
CSQueue root =
CapacityScheduler.parseQueue(csContext, csConf, null,
CapacitySchedulerConfiguration.ROOT, queues, queues,
- TestUtils.spyHook);
+ TestUtils.spyHook, null);
// Setup some nodes
final int memoryPerNode = 10;
@@ -297,7 +298,7 @@ public void testSingleLevelQueuesPrecision() throws Exception {
try {
CapacityScheduler.parseQueue(csContext, csConf, null,
CapacitySchedulerConfiguration.ROOT, queues, queues,
- TestUtils.spyHook);
+ TestUtils.spyHook, null);
} catch (IllegalArgumentException ie) {
exceptionOccured = true;
}
@@ -311,7 +312,7 @@ public void testSingleLevelQueuesPrecision() throws Exception {
try {
CapacityScheduler.parseQueue(csContext, csConf, null,
CapacitySchedulerConfiguration.ROOT, queues, queues,
- TestUtils.spyHook);
+ TestUtils.spyHook, null);
} catch (IllegalArgumentException ie) {
exceptionOccured = true;
}
@@ -325,7 +326,7 @@ public void testSingleLevelQueuesPrecision() throws Exception {
try {
CapacityScheduler.parseQueue(csContext, csConf, null,
CapacitySchedulerConfiguration.ROOT, queues, queues,
- TestUtils.spyHook);
+ TestUtils.spyHook, null);
} catch (IllegalArgumentException ie) {
exceptionOccured = true;
}
@@ -402,7 +403,7 @@ public void testMultiLevelQueues() throws Exception {
CSQueue root =
CapacityScheduler.parseQueue(csContext, csConf, null,
CapacitySchedulerConfiguration.ROOT, queues, queues,
- TestUtils.spyHook);
+ TestUtils.spyHook, null);
// Setup some nodes
final int memoryPerNode = 10;
@@ -518,7 +519,7 @@ public void testQueueCapacitySettingChildZero() throws Exception {
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
CapacityScheduler.parseQueue(csContext, csConf, null,
CapacitySchedulerConfiguration.ROOT, queues, queues,
- TestUtils.spyHook);
+ TestUtils.spyHook, null);
}
@Test (expected=IllegalArgumentException.class)
@@ -535,7 +536,7 @@ public void testQueueCapacitySettingParentZero() throws Exception {
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
CapacityScheduler.parseQueue(csContext, csConf, null,
CapacitySchedulerConfiguration.ROOT, queues, queues,
- TestUtils.spyHook);
+ TestUtils.spyHook, null);
}
@Test
@@ -557,7 +558,7 @@ public void testQueueCapacityZero() throws Exception {
try {
CapacityScheduler.parseQueue(csContext, csConf, null,
CapacitySchedulerConfiguration.ROOT, queues, queues,
- TestUtils.spyHook);
+ TestUtils.spyHook, null);
} catch (IllegalArgumentException e) {
fail("Failed to create queues with 0 capacity: " + e);
}
@@ -573,7 +574,7 @@ public void testOffSwitchScheduling() throws Exception {
CSQueue root =
CapacityScheduler.parseQueue(csContext, csConf, null,
CapacitySchedulerConfiguration.ROOT, queues, queues,
- TestUtils.spyHook);
+ TestUtils.spyHook, null);
// Setup some nodes
final int memoryPerNode = 10;
@@ -639,7 +640,7 @@ public void testOffSwitchSchedulingMultiLevelQueues() throws Exception {
CSQueue root =
CapacityScheduler.parseQueue(csContext, csConf, null,
CapacitySchedulerConfiguration.ROOT, queues, queues,
- TestUtils.spyHook);
+ TestUtils.spyHook, null);
// Setup some nodes
final int memoryPerNode = 10;
@@ -723,7 +724,7 @@ public void testQueueAcl() throws Exception {
CSQueue root =
CapacityScheduler.parseQueue(csContext, csConf, null,
CapacitySchedulerConfiguration.ROOT, queues, queues,
- TestUtils.spyHook);
+ TestUtils.spyHook, null);
UserGroupInformation user = UserGroupInformation.getCurrentUser();
// Setup queue configs
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java
index a3b990c..e33caf7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java
@@ -18,23 +18,40 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
-import org.junit.Assert;
+import java.io.IOException;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.mockito.Matchers.any;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.label.NodeLabelManager;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
-import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
+import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
+import com.google.common.collect.ImmutableSet;
+
public class TestQueueParsing {
private static final Log LOG = LogFactory.getLog(TestQueueParsing.class);
private static final double DELTA = 0.000001;
+ private NodeLabelManager nodeLabelManager;
+
+ @Before
+ public void setup() {
+ nodeLabelManager = mock(NodeLabelManager.class);
+ when(nodeLabelManager.containsLabel(any(String.class))).thenReturn(true);
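+ // report every label as known so queue parsing accepts the labels
+ // configured in these tests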
+ }
+
@Test
public void testQueueParsing() throws Exception {
CapacitySchedulerConfiguration csConf =
@@ -202,4 +219,150 @@ public void testMaxCapacity() throws Exception {
capacityScheduler.stop();
}
+ private void setupQueueConfigurationWithoutLabels(CapacitySchedulerConfiguration conf) {
+ // Define top-level queues
+ conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"});
+
+ final String A = CapacitySchedulerConfiguration.ROOT + ".a";
+ conf.setCapacity(A, 10);
+ conf.setMaximumCapacity(A, 15);
+
+ final String B = CapacitySchedulerConfiguration.ROOT + ".b";
+ conf.setCapacity(B, 90);
+
+ LOG.info("Setup top-level queues");
+
+ // Define 2nd-level queues
+ final String A1 = A + ".a1";
+ final String A2 = A + ".a2";
+ conf.setQueues(A, new String[] {"a1", "a2"});
+ conf.setCapacity(A1, 30);
+ conf.setMaximumCapacity(A1, 45);
+ conf.setCapacity(A2, 70);
+ conf.setMaximumCapacity(A2, 85);
+
+ final String B1 = B + ".b1";
+ final String B2 = B + ".b2";
+ final String B3 = B + ".b3";
+ conf.setQueues(B, new String[] {"b1", "b2", "b3"});
+ conf.setCapacity(B1, 50);
+ conf.setMaximumCapacity(B1, 85);
+ conf.setCapacity(B2, 30);
+ conf.setMaximumCapacity(B2, 35);
+ conf.setCapacity(B3, 20);
+ conf.setMaximumCapacity(B3, 35);
+ }
+
+ private void setupQueueConfigurationWithLabels(CapacitySchedulerConfiguration conf) {
+ // Define top-level queues
+ conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"});
+
+ final String A = CapacitySchedulerConfiguration.ROOT + ".a";
+ conf.setCapacity(A, 10);
+ conf.setMaximumCapacity(A, 15);
+
+ final String B = CapacitySchedulerConfiguration.ROOT + ".b";
+ conf.setCapacity(B, 90);
+
+ LOG.info("Setup top-level queues");
+
+ // Define 2nd-level queues
+ final String A1 = A + ".a1";
+ final String A2 = A + ".a2";
+ conf.setQueues(A, new String[] {"a1", "a2"});
+ conf.setLabels(A, ImmutableSet.of("*"));
+ conf.setCapacity(A1, 30);
+ conf.setMaximumCapacity(A1, 45);
+ conf.setCapacity(A2, 70);
+ conf.setMaximumCapacity(A2, 85);
+ conf.setLabels(A2, ImmutableSet.of("red"));
+
+ final String B1 = B + ".b1";
+ final String B2 = B + ".b2";
+ final String B3 = B + ".b3";
+ conf.setQueues(B, new String[] {"b1", "b2", "b3"});
+ conf.setLabels(B, ImmutableSet.of("red", "blue"));
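+ // b1, b2 and b3 set no labels of their own; they are expected to
+ // inherit B's labels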
+ conf.setCapacity(B1, 50);
+ conf.setMaximumCapacity(B1, 85);
+ conf.setCapacity(B2, 30);
+ conf.setMaximumCapacity(B2, 35);
+ conf.setCapacity(B3, 20);
+ conf.setMaximumCapacity(B3, 35);
+ }
+
+ @Test(timeout = 5000)
+ public void testQueueParsingReinitializeWithLabels() throws IOException {
+ CapacitySchedulerConfiguration csConf =
+ new CapacitySchedulerConfiguration();
+ setupQueueConfigurationWithoutLabels(csConf);
+ YarnConfiguration conf = new YarnConfiguration(csConf);
+
+ CapacityScheduler capacityScheduler = new CapacityScheduler();
+ RMContextImpl rmContext =
+ new RMContextImpl(null, null, null, null, null, null,
+ new RMContainerTokenSecretManager(conf),
+ new NMTokenSecretManagerInRM(conf),
+ new ClientToAMTokenSecretManagerInRM(), null);
+ rmContext.setNodeLabelManager(nodeLabelManager);
+ capacityScheduler.setConf(conf);
+ capacityScheduler.setRMContext(rmContext);
+ capacityScheduler.init(conf);
+ capacityScheduler.start();
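+ // now reinitialize with a labeled configuration and verify the queues
+ // pick up the labels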
+ csConf = new CapacitySchedulerConfiguration();
+ setupQueueConfigurationWithLabels(csConf);
+ conf = new YarnConfiguration(csConf);
+ capacityScheduler.reinitialize(conf, rmContext);
+ checkQueueLabels(capacityScheduler);
+ capacityScheduler.stop();
+ }
+
+ private void checkQueueLabels(CapacityScheduler capacityScheduler) {
+ // by default, the label set is empty
+ Assert.assertTrue(capacityScheduler
+ .getQueue(CapacitySchedulerConfiguration.ROOT).getLabels().isEmpty());
+
+ // queue-A is *
+ Assert.assertTrue(capacityScheduler
+ .getQueue("a").getLabels().contains("*"));
+
+ // queue-A1 inherits A's labels
+ Assert.assertTrue(capacityScheduler
+ .getQueue("a1").getLabels().contains("*"));
+
+ // queue-A2 is "red"
+ Assert.assertEquals(1, capacityScheduler
+ .getQueue("a2").getLabels().size());
+ Assert.assertTrue(capacityScheduler
+ .getQueue("a2").getLabels().contains("red"));
+
+ // queue-B is "red"/"blue"
+ Assert.assertTrue(capacityScheduler
+ .getQueue("b").getLabels().containsAll(ImmutableSet.of("red", "blue")));
+
+ // queue-B2 inherits "red"/"blue"
+ Assert.assertTrue(capacityScheduler
+ .getQueue("b2").getLabels().containsAll(ImmutableSet.of("red", "blue")));
+ }
+
+ @Test(timeout = 5000)
+ public void testQueueParsingWithLabels() throws IOException {
+ CapacitySchedulerConfiguration csConf =
+ new CapacitySchedulerConfiguration();
+ setupQueueConfigurationWithLabels(csConf);
+ YarnConfiguration conf = new YarnConfiguration(csConf);
+
+ CapacityScheduler capacityScheduler = new CapacityScheduler();
+ RMContextImpl rmContext =
+ new RMContextImpl(null, null, null, null, null, null,
+ new RMContainerTokenSecretManager(conf),
+ new NMTokenSecretManagerInRM(conf),
+ new ClientToAMTokenSecretManagerInRM(), null);
+ rmContext.setNodeLabelManager(nodeLabelManager);
+ capacityScheduler.setConf(conf);
+ capacityScheduler.setRMContext(rmContext);
+ capacityScheduler.init(conf);
+ capacityScheduler.start();
+ checkQueueLabels(capacityScheduler);
+ capacityScheduler.stop();
+ }
}