diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 15fa211..872cdd7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -964,18 +964,18 @@ private synchronized boolean assignToQueue(Resource clusterResource, } @Lock({LeafQueue.class, FiCaSchedulerApp.class}) - private Resource computeUserLimitAndSetHeadroom( + Resource computeUserLimitAndSetHeadroom( FiCaSchedulerApp application, Resource clusterResource, Resource required) { String user = application.getUser(); - /** - * Headroom is min((userLimit, queue-max-cap) - consumed) + /** + * Headroom is min(min(userlimit, queueMaxCap)-usedResource, + * (queueMaxCap-usedResources)) */ Resource userLimit = // User limit computeUserLimit(application, clusterResource, required); - Resource queueMaxCap = // Queue Max-Capacity Resources.multiplyAndNormalizeDown( @@ -985,12 +985,12 @@ private Resource computeUserLimitAndSetHeadroom( minimumAllocation); Resource userConsumed = getUser(user).getConsumedResources(); - Resource headroom = - Resources.subtract( - Resources.min(resourceCalculator, clusterResource, - userLimit, queueMaxCap), - userConsumed); - + Resource headroom = + Resources.min(resourceCalculator, clusterResource, + Resources.subtract(Resources.min(resourceCalculator, clusterResource, + userLimit, queueMaxCap), userConsumed), + Resources.subtract(queueMaxCap,usedResources)); 
+ if (LOG.isDebugEnabled()) { LOG.debug("Headroom calculation for user " + user + ": " + " userLimit=" + userLimit + diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index 13cdcf0..8c66e74 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -196,6 +196,7 @@ private void setupQueueConfiguration( conf.setCapacity(Q_E, 1); conf.setMaximumCapacity(Q_E, 1); conf.setAcl(Q_E, QueueACL.SUBMIT_APPLICATIONS, "user_e"); + } static LeafQueue stubLeafQueue(LeafQueue queue) { @@ -614,7 +615,135 @@ public void testUserLimits() throws Exception { assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(2*GB, app_1.getCurrentConsumption().getMemory()); } - + + @Test + public void testComputeUserLimitAndSetHeadroom(){ + LeafQueue a = stubLeafQueue((LeafQueue)queues.get(B)); + a.setMaxCapacity(1.0f); + // Users + final String user_0 = "user_0"; + final String user_1 = "user_1"; + + //create nodes + String host_0 = "127.0.0.1"; + FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB); + String host_1 = "127.0.0.2"; + FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 8*GB); + + final int numNodes = 2; + Resource clusterResource = Resources.createResource(numNodes * (8*GB), 1); + when(csContext.getNumClusterNodes()).thenReturn(numNodes); + + //our test plan contains 
four cases + //1. single user dominates the queue, we test the headroom + //2. two users, but user_0 is assigned 100% of the queue resource, + // submit user_1's application, check headroom correctness + //3. two users, each is assigned 50% of the queue resource + // each user submits one application and check their headrooms + //4. similarly to 3. but user_0 has no quota left and there are + // free resources left, check headroom + + //test case 1 + a.setUserLimit(100); + a.setUserLimitFactor(1); + + final ApplicationAttemptId appAttemptId_0 = + TestUtils.getMockApplicationAttemptId(0, 0); + FiCaSchedulerApp app_0 = + new FiCaSchedulerApp(appAttemptId_0, user_0, a, + a.getActiveUsersManager(), rmContext); + a.submitApplicationAttempt(app_0, user_0); + Priority u0Priority = TestUtils.createMockPriority(1); + app_0.updateResourceRequests(Collections.singletonList( + TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 1, true, + u0Priority, recordFactory))); + + assertEquals("There should only be 1 active user!", + 1, a.getActiveUsersManager().getNumActiveUsers()); + //get headroom + a.assignContainers(clusterResource, node_0); + a.computeUserLimitAndSetHeadroom(app_0, clusterResource, + app_0.getResourceRequest(u0Priority, ResourceRequest.ANY).getCapability()); + + assertEquals(9*GB,app_0.getHeadroom().getMemory()); + + //test case 2 + final ApplicationAttemptId appAttemptId_2 = + TestUtils.getMockApplicationAttemptId(2, 0); + FiCaSchedulerApp app_2 = + new FiCaSchedulerApp(appAttemptId_2, user_1, a, + a.getActiveUsersManager(), rmContext); + Priority u1Priority = TestUtils.createMockPriority(2); + app_2.updateResourceRequests(Collections.singletonList( + TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 1, true, + u1Priority, recordFactory))); + a.submitApplicationAttempt(app_2, user_1); + a.assignContainers(clusterResource, node_1); + + a.computeUserLimitAndSetHeadroom(app_0, clusterResource, + app_0.getResourceRequest(u0Priority, 
ResourceRequest.ANY).getCapability()); + + assertEquals(8*GB, a.getUsedResources().getMemory()); + assertEquals(4*GB, app_0.getCurrentConsumption().getMemory()); + assertEquals(8*GB, app_0.getHeadroom().getMemory()); + assertEquals(4*GB, app_2.getCurrentConsumption().getMemory()); + + //test case 3 + a.finishApplication(app_0.getApplicationId(), user_0); + a.finishApplication(app_2.getApplicationId(), user_1); + a.releaseResource(clusterResource, app_0, app_0.getResource(u0Priority)); + a.releaseResource(clusterResource, app_2, app_2.getResource(u1Priority)); + + a.setUserLimit(50); + a.setUserLimitFactor(1); + + final ApplicationAttemptId appAttemptId_1 = + TestUtils.getMockApplicationAttemptId(1, 0); + FiCaSchedulerApp app_1 = + new FiCaSchedulerApp(appAttemptId_1, user_0, a, + a.getActiveUsersManager(), rmContext); + final ApplicationAttemptId appAttemptId_3 = + TestUtils.getMockApplicationAttemptId(3, 0); + FiCaSchedulerApp app_3 = + new FiCaSchedulerApp(appAttemptId_3, user_1, a, + a.getActiveUsersManager(), rmContext); + + app_1.updateResourceRequests(Collections.singletonList( + TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, true, + u0Priority, recordFactory))); + app_3.updateResourceRequests(Collections.singletonList( + TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, true, + u1Priority, recordFactory))); + + a.submitApplicationAttempt(app_1, user_0); + a.submitApplicationAttempt(app_3, user_1); + + a.assignContainers(clusterResource, node_0); + a.assignContainers(clusterResource, node_0); + a.computeUserLimitAndSetHeadroom(app_3, clusterResource, + app_3.getResourceRequest(u1Priority, ResourceRequest.ANY).getCapability()); + + assertEquals(4*GB, a.getUsedResources().getMemory()); + assertEquals(5*GB, app_3.getHeadroom().getMemory()); + + //test case 4 + final ApplicationAttemptId appAttemptId_4 = + TestUtils.getMockApplicationAttemptId(4, 0); + FiCaSchedulerApp app_4 = + new FiCaSchedulerApp(appAttemptId_4, user_0, a, + 
a.getActiveUsersManager(), rmContext); + a.submitApplicationAttempt(app_4, user_0); + app_4.updateResourceRequests(Collections.singletonList( + TestUtils.createResourceRequest(ResourceRequest.ANY, 6*GB, 1, true, + u0Priority, recordFactory))); + a.assignContainers(clusterResource, node_1); + a.computeUserLimitAndSetHeadroom(app_4, clusterResource, + app_4.getResourceRequest(u0Priority, ResourceRequest.ANY).getCapability()); + + assertEquals(10*GB, a.getUsedResources().getMemory()); + assertEquals(5*GB, app_4.getHeadroom().getMemory()); + } + @Test public void testHeadroomWithMaxCap() throws Exception { // Mock the queue @@ -675,7 +804,7 @@ public void testHeadroomWithMaxCap() throws Exception { // Set user-limit a.setUserLimit(50); a.setUserLimitFactor(2); - + // Now, only user_0 should be active since he is the only one with // outstanding requests assertEquals("There should only be 1 active user!", @@ -721,7 +850,7 @@ public void testHeadroomWithMaxCap() throws Exception { priority, recordFactory))); assertEquals(1, a.getActiveUsersManager().getNumActiveUsers()); a.assignContainers(clusterResource, node_1); - assertEquals(1*GB, app_2.getHeadroom().getMemory()); // hit queue max-cap + assertEquals(0*GB, app_2.getHeadroom().getMemory()); // hit queue max-cap } @Test