diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index 06575be4c7a..9b8ccd89dee 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -1490,7 +1490,7 @@ private Resource getMinResourceNormalized(String name, } void updateMaxAppRelatedField(CapacitySchedulerConfiguration conf, - LeafQueue leafQueue, String label) { + LeafQueue leafQueue, float maxLabelAbsoluteCap) { int maxApplications = conf.getMaximumApplicationsPerQueue(queuePath); if (maxApplications < 0) { int maxGlobalPerQueueApps = conf.getGlobalMaximumApplicationsPerQueue(); @@ -1500,11 +1500,10 @@ void updateMaxAppRelatedField(CapacitySchedulerConfiguration conf, maxApplications = this.capacityConfigType != CapacityConfigType.ABSOLUTE_RESOURCE ? maxGlobalPerQueueApps : - (int) (maxGlobalPerQueueApps * queueCapacities - .getAbsoluteCapacity(label)); + (int) (maxGlobalPerQueueApps * maxLabelAbsoluteCap); } else{ maxApplications = (int) (conf.getMaximumSystemApplications() - * queueCapacities.getAbsoluteCapacity(label)); + * maxLabelAbsoluteCap); } } leafQueue.setMaxApplications(maxApplications); @@ -1521,10 +1520,8 @@ void updateMaxAppRelatedField(CapacitySchedulerConfiguration conf, LOG.info("LeafQueue:" + leafQueue.getQueuePath() + "update max app related, maxApplications=" + maxApplications + ", maxApplicationsPerUser=" - + maxApplicationsPerUser + ", Abs Cap:" + queueCapacities - .getAbsoluteCapacity(label) + ", Cap: " + queueCapacities - .getCapacity(label) + ", MaxCap : " + queueCapacities - .getMaximumCapacity(label)); + + maxApplicationsPerUser + ", Label max Abs Cap:" + + maxLabelAbsoluteCap); } private void deriveCapacityFromAbsoluteConfigurations(String label, @@ -1605,13 +1602,6 @@ void updateEffectiveResources(Resource clusterResource) { // percentage, we have to calculate percentage and update. ResourceCalculator rc = this.csContext.getResourceCalculator(); deriveCapacityFromAbsoluteConfigurations(label, clusterResource, rc); - // Re-visit max applications for a queue based on absolute capacity if - // needed. - if (this instanceof LeafQueue) { - LeafQueue leafQueue = (LeafQueue) this; - CapacitySchedulerConfiguration conf = csContext.getConfiguration(); - updateMaxAppRelatedField(conf, leafQueue, label); - } } else{ queueResourceQuotas.setEffectiveMinResource(label, Resources .multiply(resourceByLabel, @@ -1629,6 +1619,18 @@ void updateEffectiveResources(Resource clusterResource) { + queueResourceQuotas.getEffectiveMaxResource(label)); } } + // Re-visit max applications for a queue + // based on max label absolute capacity if needed. + if (this instanceof LeafQueue) { + LeafQueue leafQueue = (LeafQueue) this; + CapacitySchedulerConfiguration conf = csContext.getConfiguration(); + updateMaxAppRelatedField(conf, leafQueue, + configuredNodelabels. + stream().map(label -> + leafQueue.getQueueCapacities(). + getAbsoluteCapacity(label)). + max(Float::compareTo).get().floatValue()); + } } public boolean isDynamicQueue() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 89e7f838003..3eef93afc45 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -1932,7 +1932,7 @@ public void updateClusterResource(Resource clusterResource, // When add new queue, the parent queue's other children should also // update the max app. super.updateMaxAppRelatedField(csContext.getConfiguration(), - this, CommonNodeLabelsManager.NO_LABEL); + this, queueCapacities.getAbsoluteCapacity()); super.updateEffectiveResources(clusterResource); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java index d7f81c46260..9461d3bb9ec 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java @@ -94,6 +94,51 @@ private Configuration getConfigurationWithQueueLabels(Configuration config) { return conf; } + private Configuration + getConfigurationWithMoreQueueLabels(Configuration config) { + CapacitySchedulerConfiguration configuration = + new CapacitySchedulerConfiguration(config); + + // Define top-level queues + configuration.setQueues(CapacitySchedulerConfiguration.ROOT, + new String[] {"a", "b", "c"}); + configuration. + setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, + "x", 100); + configuration. + setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, + "y", 100); + configuration. + setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, + "z", 100); + + final String queueA = CapacitySchedulerConfiguration.ROOT + ".a"; + configuration.setCapacity(queueA, 50); + configuration. + setAccessibleNodeLabels(queueA, ImmutableSet.of("x", "y", "z")); + configuration.setCapacityByLabel(queueA, "x", 10); + configuration.setCapacityByLabel(queueA, "y", 20); + configuration.setCapacityByLabel(queueA, "z", 30); + + final String queueB = CapacitySchedulerConfiguration.ROOT + ".b"; + configuration.setCapacity(queueB, 40); + configuration. + setAccessibleNodeLabels(queueB, ImmutableSet.of("x", "y", "z")); + configuration.setCapacityByLabel(queueB, "x", 80); + configuration.setCapacityByLabel(queueB, "y", 50); + configuration.setCapacityByLabel(queueB, "z", 30); + + final String queueC = CapacitySchedulerConfiguration.ROOT + ".c"; + configuration.setCapacity(queueC, 10); + configuration. + setAccessibleNodeLabels(queueC, ImmutableSet.of("x", "y", "z")); + configuration.setCapacityByLabel(queueC, "x", 10); + configuration.setCapacityByLabel(queueC, "y", 30); + configuration.setCapacityByLabel(queueC, "z", 40); + + return configuration; + } + private Configuration getConfigurationWithSubQueueLabels( Configuration config) { CapacitySchedulerConfiguration conf2 = @@ -953,4 +998,51 @@ public RMNodeLabelsManager createNodeLabelManager() { Assert.assertEquals(0, waitForNodeLabelSchedulerEventUpdate(rm, "x", 0, 3000L)); } + + @Test + public void tesLeafQueueMaxApplicationsWithNodeLabels() + throws Exception { + // set node -> label + mgr.addToCluserNodeLabelsWithDefaultExclusivity( + ImmutableSet.of("x", "y", "z")); + + // set mapping: + // h1 -> x + // h2 -> y + mgr.addLabelsToNode( + ImmutableMap.of(NodeId.newInstance("h1", 1234), toSet("x"))); + mgr.addLabelsToNode( + ImmutableMap.of(NodeId.newInstance("h2", 1234), toSet("x"))); + + // inject node label manager + MockRM rm = new MockRM(getConfigurationWithMoreQueueLabels(conf)) { + @Override + public RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + + rm.getRMContext().setNodeLabelManager(mgr); + rm.start(); + MockNM nm1 = rm.registerNode("h1:1234", 8000); + rm.registerNode("h2:1234", 8000); + rm.registerNode("h3:1234", 8000); + + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + + // Max label abs cap of root.a is 30%. + // Max label abs cap of root.b is 80%. + // Max label abs cap of root.c is 40% + Assert.assertEquals(0.5 * cs. + getConfiguration().getMaximumSystemApplications(), + ((LeafQueue)cs.getQueue("root.a")).getMaxApplications(), 1); + + Assert.assertEquals(0.8 * cs. + getConfiguration().getMaximumSystemApplications(), + ((LeafQueue)cs.getQueue("root.b")).getMaxApplications(), 1); + + Assert.assertEquals(0.4 * cs. + getConfiguration().getMaximumSystemApplications(), + ((LeafQueue)cs.getQueue("root.c")).getMaxApplications(), 1); + } }