diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 5c93c5f..ed93588 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -967,7 +967,7 @@ private synchronized boolean assignToQueue(Resource clusterResource, } @Lock({LeafQueue.class, FiCaSchedulerApp.class}) - private Resource computeUserLimitAndSetHeadroom( + Resource computeUserLimitAndSetHeadroom( FiCaSchedulerApp application, Resource clusterResource, Resource required) { String user = application.getUser(); @@ -993,11 +993,13 @@ private Resource computeUserLimitAndSetHeadroom( minimumAllocation); Resource userConsumed = getUser(user).getConsumedResources(); - Resource headroom = + Resource headroom = + Resources.min(resourceCalculator, clusterResource, Resources.subtract( Resources.min(resourceCalculator, clusterResource, userLimit, queueMaxCap), - userConsumed); + userConsumed), + Resources.subtract(queueMaxCap, usedResources)); if (LOG.isDebugEnabled()) { LOG.debug("Headroom calculation for user " + user + ": " + diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index d5eb933..7af5e7a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -199,6 +199,7 @@ private void setupQueueConfiguration( conf.setCapacity(Q_E, 1); conf.setMaximumCapacity(Q_E, 1); conf.setAcl(Q_E, QueueACL.SUBMIT_APPLICATIONS, "user_e"); + } static LeafQueue stubLeafQueue(LeafQueue queue) { @@ -622,7 +623,137 @@ public void testUserLimits() throws Exception { assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(2*GB, app_1.getCurrentConsumption().getMemory()); } - + + @Test + public void testComputeUserLimitAndSetHeadroom(){ + LeafQueue qb = stubLeafQueue((LeafQueue)queues.get(B)); + qb.setMaxCapacity(1.0f); + // Users + final String user_0 = "user_0"; + final String user_1 = "user_1"; + + //create nodes + String host_0 = "127.0.0.1"; + FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB); + String host_1 = "127.0.0.2"; + FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 8*GB); + + final int numNodes = 2; + Resource clusterResource = Resources.createResource(numNodes * (8*GB), 1); + when(csContext.getNumClusterNodes()).thenReturn(numNodes); + + //our test plan contains three cases + //1. single user dominate the queue, we test the headroom + //2. two users, but user_0 is assigned 100% of the queue resource, + // submit user_1's application, check headroom correctness + //3. two users, each is assigned 50% of the queue resource + // each user submit one application and check their headrooms + //4. similarly to 3. but user_0 has no quote left and there are + // free resources left, check headroom + + //test case 1 + qb.setUserLimit(100); + qb.setUserLimitFactor(1); + + final ApplicationAttemptId appAttemptId_0 = + TestUtils.getMockApplicationAttemptId(0, 0); + FiCaSchedulerApp app_0 = + new FiCaSchedulerApp(appAttemptId_0, user_0, qb, + qb.getActiveUsersManager(), rmContext); + qb.submitApplicationAttempt(app_0, user_0); + Priority u0Priority = TestUtils.createMockPriority(1); + app_0.updateResourceRequests(Collections.singletonList( + TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 1, true, + u0Priority, recordFactory))); + + assertEquals("There should only be 1 active user!", + 1, qb.getActiveUsersManager().getNumActiveUsers()); + //get headroom + qb.assignContainers(clusterResource, node_0); + qb.computeUserLimitAndSetHeadroom(app_0, clusterResource, + app_0.getResourceRequest(u0Priority, ResourceRequest.ANY).getCapability()); + + //queue B, has .8 of the total So .8*16=12.8-4=8.8 which rounds + //up to 9 (4 being what was just allocated) + assertEquals(9*GB,app_0.getHeadroom().getMemory()); + + //test case 2 + final ApplicationAttemptId appAttemptId_2 = + TestUtils.getMockApplicationAttemptId(2, 0); + FiCaSchedulerApp app_2 = + new FiCaSchedulerApp(appAttemptId_2, user_1, qb, + qb.getActiveUsersManager(), rmContext); + Priority u1Priority = TestUtils.createMockPriority(2); + app_2.updateResourceRequests(Collections.singletonList( + TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 1, true, + u1Priority, recordFactory))); + qb.submitApplicationAttempt(app_2, user_1); + qb.assignContainers(clusterResource, node_1); + + qb.computeUserLimitAndSetHeadroom(app_0, clusterResource, + app_0.getResourceRequest(u0Priority, ResourceRequest.ANY).getCapability()); + + assertEquals(8*GB, qb.getUsedResources().getMemory()); + assertEquals(4*GB, app_0.getCurrentConsumption().getMemory()); + assertEquals(8*GB, app_0.getHeadroom().getMemory()); + assertEquals(4*GB, app_2.getCurrentConsumption().getMemory()); + + //test case 3 + qb.finishApplication(app_0.getApplicationId(), user_0); + qb.finishApplication(app_2.getApplicationId(), user_1); + qb.releaseResource(clusterResource, app_0, app_0.getResource(u0Priority)); + qb.releaseResource(clusterResource, app_2, app_2.getResource(u1Priority)); + + qb.setUserLimit(50); + qb.setUserLimitFactor(1); + + final ApplicationAttemptId appAttemptId_1 = + TestUtils.getMockApplicationAttemptId(1, 0); + FiCaSchedulerApp app_1 = + new FiCaSchedulerApp(appAttemptId_1, user_0, qb, + qb.getActiveUsersManager(), rmContext); + final ApplicationAttemptId appAttemptId_3 = + TestUtils.getMockApplicationAttemptId(3, 0); + FiCaSchedulerApp app_3 = + new FiCaSchedulerApp(appAttemptId_3, user_1, qb, + qb.getActiveUsersManager(), rmContext); + + app_1.updateResourceRequests(Collections.singletonList( + TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, true, + u0Priority, recordFactory))); + app_3.updateResourceRequests(Collections.singletonList( + TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, true, + u1Priority, recordFactory))); + + qb.submitApplicationAttempt(app_1, user_0); + qb.submitApplicationAttempt(app_3, user_1); + + qb.assignContainers(clusterResource, node_0); + qb.assignContainers(clusterResource, node_0); + qb.computeUserLimitAndSetHeadroom(app_3, clusterResource, + app_3.getResourceRequest(u1Priority, ResourceRequest.ANY).getCapability()); + + assertEquals(4*GB, qb.getUsedResources().getMemory()); + assertEquals(5*GB, app_3.getHeadroom().getMemory()); + + //test case 4 + final ApplicationAttemptId appAttemptId_4 = + TestUtils.getMockApplicationAttemptId(4, 0); + FiCaSchedulerApp app_4 = + new FiCaSchedulerApp(appAttemptId_4, user_0, qb, + qb.getActiveUsersManager(), rmContext); + qb.submitApplicationAttempt(app_4, user_0); + app_4.updateResourceRequests(Collections.singletonList( + TestUtils.createResourceRequest(ResourceRequest.ANY, 6*GB, 1, true, + u0Priority, recordFactory))); + qb.assignContainers(clusterResource, node_1); + qb.computeUserLimitAndSetHeadroom(app_4, clusterResource, + app_4.getResourceRequest(u0Priority, ResourceRequest.ANY).getCapability()); + + assertEquals(10*GB, qb.getUsedResources().getMemory()); + assertEquals(5*GB, app_4.getHeadroom().getMemory()); + } + @Test public void testHeadroomWithMaxCap() throws Exception { // Mock the queue @@ -683,7 +814,7 @@ public void testHeadroomWithMaxCap() throws Exception { // Set user-limit a.setUserLimit(50); a.setUserLimitFactor(2); - + // Now, only user_0 should be active since he is the only one with // outstanding requests assertEquals("There should only be 1 active user!", @@ -729,7 +860,7 @@ public void testHeadroomWithMaxCap() throws Exception { priority, recordFactory))); assertEquals(1, a.getActiveUsersManager().getNumActiveUsers()); a.assignContainers(clusterResource, node_1); - assertEquals(1*GB, app_2.getHeadroom().getMemory()); // hit queue max-cap + assertEquals(0*GB, app_2.getHeadroom().getMemory()); // hit queue max-cap } @Test