diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
index e1014c11fc8..9554086d8ff 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
@@ -66,19 +66,9 @@ public static void checkAbsoluteCapacity(String queueName,
private static void capacitiesSanityCheck(String queueName,
QueueCapacities queueCapacities) {
for (String label : queueCapacities.getExistingNodeLabels()) {
- float capacity = queueCapacities.getCapacity(label);
- float maximumCapacity = queueCapacities.getMaximumCapacity(label);
- if (capacity > maximumCapacity) {
- throw new IllegalArgumentException("Illegal queue capacity setting, "
- + "(capacity=" + capacity + ") > (maximum-capacity="
- + maximumCapacity + "). When label=[" + label + "]");
- }
-
- // Actually, this may not needed since we have verified capacity <=
- // maximumCapacity. And the way we compute absolute capacity (abs(x) =
- // cap(x) * cap(x.parent) * ...) is a monotone increasing function. But
- // just keep it here to make sure our compute abs capacity method works
- // correctly.
+ // The only thing we should care about is absolute capacity <=
+ // absolute max capacity otherwise the absolute max capacity is
+ // no longer an absolute maximum.
float absCapacity = queueCapacities.getAbsoluteCapacity(label);
float absMaxCapacity = queueCapacities.getAbsoluteMaximumCapacity(label);
if (absCapacity > absMaxCapacity) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
index 71fddfc4a33..1836919d404 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
@@ -2083,4 +2083,167 @@ public RMNodeLabelsManager createNodeLabelManager() {
rm1.close();
}
+
+ @Test
+ public void testQueueMetricsWithLabelsDisableElasticity() throws Exception {
+ /**
+ * Test case: have a following queue structure:
+ *
+ *
+ *
+ * root
+ * / \
+ * a b
+ * (x) (x)
+ * / \
+ * a1 a2
+ * (x) (x)
+ *
+ *
+ * a/b can access x, both of them has max-capacity-on-x = 50
+ *
+ * When doing non-exclusive allocation, app in a (or b) can use 100% of x
+ * resource.
+ */
+
+ CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(
+ this.conf);
+
+ // Define top-level queues
+ csConf.setQueues(CapacitySchedulerConfiguration.ROOT,
+ new String[] { "a", "b" });
+ csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
+
+ final String queueA = CapacitySchedulerConfiguration.ROOT + ".a";
+ csConf.setCapacity(queueA, 50);
+ csConf.setMaximumCapacity(queueA, 100);
+ csConf.setAccessibleNodeLabels(queueA, toSet("x"));
+ csConf.setCapacityByLabel(queueA, "x", 50);
+ csConf.setMaximumCapacityByLabel(queueA, "x", 100);
+ final String queueB = CapacitySchedulerConfiguration.ROOT + ".b";
+ csConf.setCapacity(queueB, 50);
+ csConf.setMaximumCapacity(queueB, 100);
+ csConf.setAccessibleNodeLabels(queueB, toSet("x"));
+ csConf.setCapacityByLabel(queueB, "x", 50);
+ csConf.setMaximumCapacityByLabel(queueB, "x", 100);
+
+ // Define 2nd-level queues
+ csConf.setQueues(queueA, new String[] { "a1",
+ "a2"});
+
+ final String A1 = queueA + ".a1";
+ csConf.setCapacity(A1, 20);
+ csConf.setMaximumCapacity(A1, 60);
+ csConf.setAccessibleNodeLabels(A1, toSet("x"));
+ csConf.setCapacityByLabel(A1, "x", 60);
+ csConf.setMaximumCapacityByLabel(A1, "x", 30);
+
+ final String A2 = queueA + ".a2";
+ csConf.setCapacity(A2, 80);
+ csConf.setMaximumCapacity(A2, 40);
+ csConf.setAccessibleNodeLabels(A2, toSet("x"));
+ csConf.setCapacityByLabel(A2, "x", 40);
+ csConf.setMaximumCapacityByLabel(A2, "x", 20);
+
+ // set node -> label
+ mgr.addToCluserNodeLabels(
+ ImmutableSet.of(NodeLabel.newInstance("x", false)));
+ mgr.addLabelsToNode(
+ ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
+
+ // inject node label manager
+ MockRM rm1 = new MockRM(csConf) {
+ @Override
+ public RMNodeLabelsManager createNodeLabelManager() {
+ return mgr;
+ }
+ };
+
+ rm1.getRMContext().setNodeLabelManager(mgr);
+ rm1.start();
+ MockNM nm1 = rm1.registerNode("h1:1234", 20 * GB); // label = x
+ // app1 -> a1
+ RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a1", "x");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+ // app1 asks for 6 partition=x containers
+ am1.allocate("*", 1 * GB, 6, new ArrayList(), "x");
+
+ // NM1 do 50 heartbeats
+ CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+ doNMHeartbeat(rm1, nm1.getNodeId(), 50);
+ checkNumOfContainersInAnAppOnGivenNode(6, nm1.getNodeId(),
+ cs.getApplicationAttempt(am1.getApplicationAttemptId()));
+
+ SchedulerNodeReport reportNm1 = rm1.getResourceScheduler()
+ .getNodeReport(nm1.getNodeId());
+ Assert.assertEquals(6 * GB, reportNm1.getUsedResource().getMemorySize());
+ Assert.assertEquals(14 * GB,
+ reportNm1.getAvailableResource().getMemorySize());
+
+ // Try to launch app2 in a2, asked 2GB, should success
+ // app2 -> a2
+ RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "a2", "x");
+ MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
+
+ // app2 asks for 4 partition=x containers
+ am2.allocate("*", 1 * GB, 4, new ArrayList(), "x");
+ // NM1 do 50 heartbeats
+
+ doNMHeartbeat(rm1, nm1.getNodeId(), 50);
+ checkNumOfContainersInAnAppOnGivenNode(4, nm1.getNodeId(),
+ cs.getApplicationAttempt(am2.getApplicationAttemptId()));
+
+ reportNm1 = rm1.getResourceScheduler()
+ .getNodeReport(nm1.getNodeId());
+ Assert.assertEquals(10 * GB, reportNm1.getUsedResource().getMemorySize());
+ Assert.assertEquals(10 * GB,
+ reportNm1.getAvailableResource().getMemorySize());
+
+ // Kill all apps in queue a2
+ cs.killAllAppsInQueue("a2");
+ rm1.waitForState(app2.getApplicationId(), RMAppState.KILLED);
+ rm1.waitForAppRemovedFromScheduler(app2.getApplicationId());
+
+ // Try to launch app3 in a2, asked 6GB, should fail
+ // app3 -> a2
+ RMApp app3 = rm1.submitApp(1 * GB, "app", "user", null, "a2", "x");
+ MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm1);
+
+ am3.allocate("*", 1 * GB, 6, new ArrayList(), "x");
+ // NM1 do 50 heartbeats
+ doNMHeartbeat(rm1, nm1.getNodeId(), 50);
+ // app3 cannot preempt more resources restricted by disable elasticity
+ checkNumOfContainersInAnAppOnGivenNode(4, nm1.getNodeId(),
+ cs.getApplicationAttempt(am3.getApplicationAttemptId()));
+
+ Assert.assertEquals(10 * GB, reportNm1.getUsedResource().getMemorySize());
+ Assert.assertEquals(10 * GB,
+ reportNm1.getAvailableResource().getMemorySize());
+
+ // Kill all apps in queue a1
+ cs.killAllAppsInQueue("a1");
+ rm1.waitForState(app1.getApplicationId(), RMAppState.KILLED);
+ rm1.waitForAppRemovedFromScheduler(app1.getApplicationId());
+
+ // app4 -> a1, try to allocate more than 6GB resource, should fail
+ RMApp app4 = rm1.submitApp(1 * GB, "app", "user", null, "a1", "x");
+ MockAM am4 = MockRM.launchAndRegisterAM(app4, rm1, nm1);
+
+ // app3 asks for 7 partition=x containers
+ am4.allocate("*", 1 * GB, 7, new ArrayList(), "x");
+ // NM1 do 50 heartbeats
+ doNMHeartbeat(rm1, nm1.getNodeId(), 50);
+
+ // app4 should only gets 6GB resource in partition=x
+ // since elasticity is disabled
+ checkNumOfContainersInAnAppOnGivenNode(6, nm1.getNodeId(),
+ cs.getApplicationAttempt(am4.getApplicationAttemptId()));
+
+ Assert.assertEquals(10 * GB, reportNm1.getUsedResource().getMemorySize());
+ Assert.assertEquals(10 * GB,
+ reportNm1.getAvailableResource().getMemorySize());
+
+ rm1.close();
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java
index 5d167c7900d..add14ab2fdc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java
@@ -365,7 +365,76 @@ private void setupQueueConfigurationWithLabels(CapacitySchedulerConfiguration co
conf.setCapacityByLabel(B3, "red", 25);
conf.setCapacityByLabel(B3, "blue", 25);
}
-
+
+ private void setupQueueConfigurationWithLabelsAndReleaseCheck
+ (CapacitySchedulerConfiguration conf) {
+ // Define top-level queues
+ conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"});
+ conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "red", 100);
+ conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "blue", 100);
+
+ final String A = CapacitySchedulerConfiguration.ROOT + ".a";
+ // The cap <= max-cap check is not needed
+ conf.setCapacity(A, 50);
+ conf.setMaximumCapacity(A, 100);
+
+ final String B = CapacitySchedulerConfiguration.ROOT + ".b";
+ conf.setCapacity(B, 50);
+ conf.setMaximumCapacity(B, 100);
+
+ LOG.info("Setup top-level queues");
+
+ // Define 2nd-level queues
+ final String A1 = A + ".a1";
+ final String A2 = A + ".a2";
+ conf.setQueues(A, new String[] {"a1", "a2"});
+ conf.setAccessibleNodeLabels(A, ImmutableSet.of("red", "blue"));
+ conf.setCapacityByLabel(A, "red", 50);
+ conf.setMaximumCapacityByLabel(A, "red", 100);
+ conf.setCapacityByLabel(A, "blue", 30);
+ conf.setMaximumCapacityByLabel(A, "blue", 50);
+
+ conf.setCapacity(A1, 60);
+ conf.setMaximumCapacity(A1, 60);
+ conf.setCapacityByLabel(A1, "red", 60);
+ conf.setMaximumCapacityByLabel(A1, "red", 30);
+ conf.setCapacityByLabel(A1, "blue", 100);
+ conf.setMaximumCapacityByLabel(A1, "blue", 100);
+
+ conf.setCapacity(A2, 40);
+ conf.setMaximumCapacity(A2, 85);
+ conf.setAccessibleNodeLabels(A2, ImmutableSet.of("red"));
+ conf.setCapacityByLabel(A2, "red", 40);
+ conf.setMaximumCapacityByLabel(A2, "red", 60);
+
+ final String B1 = B + ".b1";
+ final String B2 = B + ".b2";
+ final String B3 = B + ".b3";
+ conf.setQueues(B, new String[] {"b1", "b2", "b3"});
+ conf.setAccessibleNodeLabels(B, ImmutableSet.of("red", "blue"));
+ conf.setCapacityByLabel(B, "red", 50);
+ conf.setMaximumCapacityByLabel(B, "red", 100);
+ conf.setCapacityByLabel(B, "blue", 70);
+ conf.setMaximumCapacityByLabel(B, "blue", 100);
+
+ conf.setCapacity(B1, 10);
+ conf.setMaximumCapacity(B1, 10);
+ conf.setCapacityByLabel(B1, "red", 60);
+ conf.setMaximumCapacityByLabel(B1, "red", 30);
+ conf.setCapacityByLabel(B1, "blue", 50);
+ conf.setMaximumCapacityByLabel(B1, "blue", 100);
+
+ conf.setCapacity(B2, 80);
+ conf.setMaximumCapacity(B2, 40);
+ conf.setCapacityByLabel(B2, "red", 30);
+ conf.setCapacityByLabel(B2, "blue", 25);
+
+ conf.setCapacity(B3, 10);
+ conf.setMaximumCapacity(B3, 25);
+ conf.setCapacityByLabel(B3, "red", 10);
+ conf.setCapacityByLabel(B3, "blue", 25);
+ }
+
private void setupQueueConfigurationWithLabelsInherit(
CapacitySchedulerConfiguration conf) {
// Define top-level queues
@@ -472,7 +541,7 @@ private void checkQueueLabels(CapacityScheduler capacityScheduler) {
// queue-B2 inherits "red"/"blue"
Assert.assertTrue(capacityScheduler.getQueue("b2")
.getAccessibleNodeLabels().containsAll(ImmutableSet.of("red", "blue")));
-
+
// check capacity of A2
CSQueue qA2 = capacityScheduler.getQueue("a2");
Assert.assertEquals(0.7, qA2.getCapacity(), DELTA);
@@ -481,7 +550,7 @@ private void checkQueueLabels(CapacityScheduler capacityScheduler) {
Assert.assertEquals(0.25, qA2.getQueueCapacities().getAbsoluteCapacity("red"), DELTA);
Assert.assertEquals(0.1275, qA2.getAbsoluteMaximumCapacity(), DELTA);
Assert.assertEquals(0.3, qA2.getQueueCapacities().getAbsoluteMaximumCapacity("red"), DELTA);
-
+
// check capacity of B3
CSQueue qB3 = capacityScheduler.getQueue("b3");
Assert.assertEquals(0.18, qB3.getAbsoluteCapacity(), DELTA);
@@ -489,7 +558,71 @@ private void checkQueueLabels(CapacityScheduler capacityScheduler) {
Assert.assertEquals(0.35, qB3.getAbsoluteMaximumCapacity(), DELTA);
Assert.assertEquals(1, qB3.getQueueCapacities().getAbsoluteMaximumCapacity("red"), DELTA);
}
-
+
+ private void checkQueueLabelsWithLeafQueueDisableElasticity
+ (CapacityScheduler capacityScheduler) {
+ // queue-A is red, blue
+ Assert.assertTrue(capacityScheduler.getQueue("a").getAccessibleNodeLabels()
+ .containsAll(ImmutableSet.of("red", "blue")));
+
+ // queue-A1 inherits A's configuration
+ Assert.assertTrue(capacityScheduler.getQueue("a1")
+ .getAccessibleNodeLabels().containsAll(ImmutableSet.of("red", "blue")));
+
+ // queue-A2 is "red"
+ Assert.assertEquals(1, capacityScheduler.getQueue("a2")
+ .getAccessibleNodeLabels().size());
+ Assert.assertTrue(capacityScheduler.getQueue("a2")
+ .getAccessibleNodeLabels().contains("red"));
+
+ // queue-B is "red"/"blue"
+ Assert.assertTrue(capacityScheduler.getQueue("b").getAccessibleNodeLabels()
+ .containsAll(ImmutableSet.of("red", "blue")));
+
+ // queue-B2 inherits "red"/"blue"
+ Assert.assertTrue(capacityScheduler.getQueue("b2")
+ .getAccessibleNodeLabels().containsAll(ImmutableSet.of("red", "blue")));
+
+ // check capacity of A2
+ CSQueue qA2 = capacityScheduler.getQueue("a2");
+ Assert.assertEquals(0.4, qA2.getCapacity(), DELTA);
+ Assert.assertEquals(0.4, qA2.getQueueCapacities()
+ .getCapacity("red"), DELTA);
+ Assert.assertEquals(0.2, qA2.getAbsoluteCapacity(), DELTA);
+ Assert.assertEquals(0.2, qA2.getQueueCapacities()
+ .getAbsoluteCapacity("red"), DELTA);
+ Assert.assertEquals(0.85, qA2.getAbsoluteMaximumCapacity(), DELTA);
+ Assert.assertEquals(0.6, qA2.getQueueCapacities()
+ .getAbsoluteMaximumCapacity("red"), DELTA);
+
+ // check disable elasticity at leaf queue level without label
+ CSQueue qB2 = capacityScheduler.getQueue("b2");
+ Assert.assertEquals(0.4, qB2.getAbsoluteCapacity(), DELTA);
+ Assert.assertEquals(0.4, qB2.getAbsoluteMaximumCapacity(), DELTA);
+
+ // check disable elasticity at leaf queue level with label
+ CSQueue qA1 = capacityScheduler.getQueue("a1");
+ Assert.assertEquals(0.3, qA1.getQueueCapacities().
+ getAbsoluteCapacity("red"), DELTA);
+ Assert.assertEquals(0.3, qA1.getQueueCapacities().
+ getAbsoluteMaximumCapacity("red"), DELTA);
+
+ CSQueue qB1 = capacityScheduler.getQueue("b1");
+ Assert.assertEquals(0.3, qB1.getQueueCapacities()
+ .getAbsoluteCapacity("red"), DELTA);
+ Assert.assertEquals(0.3, qB1.getQueueCapacities()
+ .getAbsoluteMaximumCapacity("red"), DELTA);
+
+ // check capacity of B3
+ CSQueue qB3 = capacityScheduler.getQueue("b3");
+ Assert.assertEquals(0.05, qB3.getAbsoluteCapacity(), DELTA);
+ Assert.assertEquals(0.175, qB3.getQueueCapacities()
+ .getAbsoluteCapacity("blue"), DELTA);
+ Assert.assertEquals(0.25, qB3.getAbsoluteMaximumCapacity(), DELTA);
+ Assert.assertEquals(1, qB3.getQueueCapacities()
+ .getAbsoluteMaximumCapacity("blue"), DELTA);
+ }
+
private void
checkQueueLabelsInheritConfig(CapacityScheduler capacityScheduler) {
// queue-A is red, blue
@@ -514,7 +647,7 @@ private void checkQueueLabels(CapacityScheduler capacityScheduler) {
@Test
public void testQueueParsingWithLabels() throws IOException {
nodeLabelManager.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("red", "blue"));
-
+
YarnConfiguration conf = new YarnConfiguration();
CapacitySchedulerConfiguration csConf =
new CapacitySchedulerConfiguration(conf);
@@ -534,6 +667,31 @@ public void testQueueParsingWithLabels() throws IOException {
checkQueueLabels(capacityScheduler);
ServiceOperations.stopQuietly(capacityScheduler);
}
+
+ @Test
+ public void testQueueParsingWithLeafQueueDisableElasticity()
+ throws IOException {
+ nodeLabelManager.addToCluserNodeLabelsWithDefaultExclusivity
+ (ImmutableSet.of("red", "blue"));
+
+ YarnConfiguration conf = new YarnConfiguration();
+ CapacitySchedulerConfiguration csConf =
+ new CapacitySchedulerConfiguration(conf);
+ setupQueueConfigurationWithLabelsAndReleaseCheck(csConf);
+ CapacityScheduler capacityScheduler = new CapacityScheduler();
+ RMContextImpl rmContext =
+ new RMContextImpl(null, null, null, null, null, null,
+ new RMContainerTokenSecretManager(csConf),
+ new NMTokenSecretManagerInRM(csConf),
+ new ClientToAMTokenSecretManagerInRM(), null);
+ rmContext.setNodeLabelManager(nodeLabelManager);
+ capacityScheduler.setConf(csConf);
+ capacityScheduler.setRMContext(rmContext);
+ capacityScheduler.init(csConf);
+ capacityScheduler.start();
+ checkQueueLabelsWithLeafQueueDisableElasticity(capacityScheduler);
+ ServiceOperations.stopQuietly(capacityScheduler);
+ }
@Test
public void testQueueParsingWithLabelsInherit() throws IOException {