diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java
index 721eb362a93..5840a023ffc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java
@@ -38,6 +38,9 @@
   // containers.
   private volatile Resource headroom;
 
+  // How much resource should be reserved for high-priority blocked queues
+  private volatile Resource blockedHeadroom;
+
   private boolean allowPreempt = false;
 
   public ResourceLimits(Resource limit) {
@@ -81,4 +84,18 @@ public boolean isAllowPreemption() {
   public void setIsAllowPreemption(boolean allowPreempt) {
     this.allowPreempt = allowPreempt;
   }
+
+  public void addBlockedHeadroom(Resource resource) {
+    if (blockedHeadroom == null) {
+      blockedHeadroom = Resource.newInstance(0, 0);
+    }
+    Resources.addTo(blockedHeadroom, resource);
+  }
+
+  public Resource getBlockedHeadroom() {
+    if (blockedHeadroom == null) {
+      return Resources.none();
+    }
+    return blockedHeadroom;
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index 80549ca5c16..16dd68fe5c4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -790,7 +790,8 @@ private CSAssignment assignContainersToChildQueues(Resource cluster,
 
       // Get ResourceLimits of child queue before assign containers
       ResourceLimits childLimits =
-          getResourceLimitsOfChild(childQueue, cluster, parentLimits,
+          getResourceLimitsOfChild(childQueue, cluster,
+              Resources.subtract(parentLimits, limits.getBlockedHeadroom()),
               candidates.getPartition());
 
       CSAssignment childAssignment = childQueue.assignContainers(cluster,
@@ -812,16 +813,18 @@ private CSAssignment assignContainersToChildQueues(Resource cluster,
             CSAssignment.SkippedType.QUEUE_LIMIT) {
           assignment = childAssignment;
         }
+        if (childQueue instanceof LeafQueue) {
+          childLimits.addBlockedHeadroom(childLimits.getHeadroom());
+        }
         Resource resourceToSubtract = Resources.max(resourceCalculator,
-            cluster, childLimits.getHeadroom(), Resources.none());
+            cluster, childLimits.getBlockedHeadroom(), Resources.none());
+        limits.addBlockedHeadroom(resourceToSubtract);
         if(LOG.isDebugEnabled()) {
           LOG.debug("Decrease parentLimits " + parentLimits +
               " for " + this.getQueueName() + " by " +
               resourceToSubtract + " as childQueue=" +
               childQueue.getQueueName() + " is blocked");
         }
-        parentLimits = Resources.subtract(parentLimits,
-            resourceToSubtract);
       }
     }
 
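Not part of the patch itself: a minimal sketch of how the blocked-headroom accounting above composes, assuming only the two ResourceLimits methods added in this diff plus the stock Resource/Resources helpers; the class name BlockedHeadroomSketch and the sample numbers are invented for illustration. Instead of shrinking parentLimits in place for each blocked child, the parent's ResourceLimits accumulates the blocked leaves' headroom and subtracts it only when the next child's limit is computed.

import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.util.resource.Resources;

// Illustrative only; not part of this patch.
public class BlockedHeadroomSketch {
  public static void main(String[] args) {
    Resource parentLimit = Resource.newInstance(20 * 1024, 20);
    ResourceLimits limits = new ResourceLimits(parentLimit);

    // Two blocked leaf queues report their remaining headroom; the parent
    // accumulates it rather than mutating parentLimit directly.
    limits.addBlockedHeadroom(Resource.newInstance(2 * 1024, 2));
    limits.addBlockedHeadroom(Resource.newInstance(1 * 1024, 1));

    // What the next child queue is offered, mirroring the new
    // Resources.subtract(parentLimits, limits.getBlockedHeadroom()) call.
    Resource offeredToNextChild =
        Resources.subtract(parentLimit, limits.getBlockedHeadroom());
    System.out.println("blocked=" + limits.getBlockedHeadroom()
        + ", offered=" + offeredToNextChild);
  }
}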
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
index b9bfc2aab58..652dfa9d280 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
@@ -47,6 +47,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.RMSecretManagerService;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.TestResourceProfiles;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
@@ -59,7 +60,10 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
+import org.apache.hadoop.yarn.util.resource.TestResourceUtils;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -984,4 +988,82 @@ public void testActiveUsersWithOnlyPendingApps() throws Exception {
     Assert.assertEquals(2, lq.getMetrics().getAppsPending());
     rm1.close();
   }
+
+  @Test(timeout = 60000)
+  public void testAllocationCannotBeBlockedWhenFormerQueueReachedItsLimit()
+      throws Exception {
+    /**
+     * Queue structure:
+     * <pre>
+     *             Root
+     *            /  |  \
+     *           a   b   c
+     *          10   20  70
+     *                   |  \
+     *                  c1   c2
+     *           10(max=10)  90
+     * </pre>
+     * Test case:
+     * Create a cluster with two nodes whose node resources are both
+     * <10GB, 10core>, create queues as above, among them max-capacity of "c1"
+     * is 10 and others are all 100, so that max-capacity of queue "c1" is
+     * <2GB, 2core>,
+     * submit app1 to queue "c1" and launch am1(resource=<1GB, 1 core>) on nm1,
+     * submit app2 to queue "b" and launch am2(resource=<1GB, 1 core>) on nm1,
+     * app1 and app2 each ask for one <2GB, 1core> container
+     *
+     * Now queue "c" has a lower used capacity percentage than queue "b", so
+     * the allocation sequence will be "a" -> "c" -> "b"; queue "c1" has
+     * reached its queue limit, so requests of app1 should stay pending
+     *
+     * After nm1 does 1 heartbeat, the scheduler should allocate one container
+     * for app2 on nm1.
+     */
+    CapacitySchedulerConfiguration newConf =
+        (CapacitySchedulerConfiguration) TestUtils
+            .getConfigurationWithMultipleQueues(conf);
+    newConf.setQueues(CapacitySchedulerConfiguration.ROOT + ".c",
+        new String[] { "c1", "c2" });
+    newConf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".c.c1", 10);
+    newConf
+        .setMaximumCapacity(CapacitySchedulerConfiguration.ROOT + ".c.c1", 10);
+    newConf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".c.c2", 90);
+    newConf.setClass(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
+        DominantResourceCalculator.class, ResourceCalculator.class);
+
+    MockRM rm1 = new MockRM(newConf);
+
+    RMNodeLabelsManager nodeLabelsManager = new NullRMNodeLabelsManager();
+    nodeLabelsManager.init(newConf);
+    rm1.getRMContext().setNodeLabelManager(nodeLabelsManager);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB);
+    MockNM nm2 = rm1.registerNode("h2:1234", 10 * GB);
+
+    // launch an app to queue "c1", AM container should be launched on nm1
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "c1");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    // launch another app to queue "b", AM container should be launched on nm1
+    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "b");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
+
+    am1.allocate("*", 2 * GB, 1, new ArrayList<ContainerId>());
+    am2.allocate("*", 2 * GB, 1, new ArrayList<ContainerId>());
+
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    FiCaSchedulerApp schedulerApp1 =
+        cs.getApplicationAttempt(am1.getApplicationAttemptId());
+    FiCaSchedulerApp schedulerApp2 =
+        cs.getApplicationAttempt(am2.getApplicationAttemptId());
+
+    // Do 1 heartbeat on nm1, which should allocate a container for app2
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    rm1.drainEvents();
+    Assert.assertEquals(1, schedulerApp1.getLiveContainers().size());
+    Assert.assertEquals(2, schedulerApp2.getLiveContainers().size());
+
+    rm1.close();
+  }
 }
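For reference, the arithmetic behind the scenario in the test above (illustrative, not part of the patch): the cluster is 2 NMs x <10GB, 10core> = <20GB, 20core>, so c1's 10% max-capacity works out to <2GB, 2core>; with am1 already holding <1GB, 1core>, the pending <2GB, 1core> ask cannot fit under c1's limit, so c1 is blocked, and before this change its headroom was subtracted from parentLimits before queue "b" was offered the node. A tiny sketch of that calculation, with names invented for this note:

// Illustrative arithmetic only; names are invented for this sketch.
public class BlockedQueueMath {
  public static void main(String[] args) {
    int clusterMemGB = 2 * 10;                 // two NMs, 10 GB each
    int c1MaxMemGB = clusterMemGB * 10 / 100;  // 10% max-capacity -> 2 GB
    int c1UsedMemGB = 1;                       // am1's AM container
    int pendingMemGB = 2;                      // app1's outstanding ask
    // prints "c1 blocked = true"
    System.out.println("c1 blocked = "
        + (c1UsedMemGB + pendingMemGB > c1MaxMemGB));
  }
}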