diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index 32dd23b..7503471 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -330,6 +330,8 @@ public synchronized boolean isReserved(SchedulerNode node, Priority priority) { } public synchronized void setHeadroom(Resource globalLimit) { + //this is where the headroom is set for the application, which will be returned + //when allocate() is called this.resourceLimit = globalLimit; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 65ff81c..01ebf11 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -429,6 +429,9 @@ private void reinitializeQueues(CapacitySchedulerConfiguration conf) // Re-configure queues root.reinitialize(newRoot, clusterResource); + + // Re-calculate headroom et all + root.updateClusterResource(clusterResource); } /** diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 4fd8b49..da34cf1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -1005,7 +1005,15 @@ private Resource computeUserLimitAndSetHeadroom( " headroom=" + headroom); } - application.setHeadroom(headroom); + for (FiCaSchedulerApp queueApp : getApplications()) { + //for all of this user's applications in this queue the headroom is the same + //this sets it to the same resource instance, so that as containers are allocated + //and deallocated all applications are made aware of the impact + if (queueApp.getUser().equals(user)) { + queueApp.setHeadroom(headroom); + } + } + metrics.setAvailableResourcesToUser(user, headroom); return userLimit; @@ -1490,6 +1498,7 @@ synchronized void releaseResource(Resource clusterResource, String userName = application.getUser(); User user = getUser(userName); user.releaseContainer(resource); + Resources.addTo(application.getHeadroom(), resource); // headroom metrics.setAvailableResourcesToUser(userName, application.getHeadroom()); LOG.info(getQueueName() + diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java index a9a9975..8a532da 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java @@ -536,8 +536,8 @@ public void testHeadroom() throws Exception { // Schedule to compute queue.assignContainers(clusterResource, node_0); // Schedule to compute - verify(app_0_0, times(2)).setHeadroom(eq(expectedHeadroom)); - verify(app_0_1).setHeadroom(eq(expectedHeadroom));// no change + verify(app_0_0, times(3)).setHeadroom(eq(expectedHeadroom)); + verify(app_0_1, times(2)).setHeadroom(eq(expectedHeadroom));// no change // Submit first application from user_1, check for new headroom final ApplicationAttemptId appAttemptId_1_0 = @@ -556,16 +556,16 @@ public void testHeadroom() throws Exception { // Schedule to compute queue.assignContainers(clusterResource, node_0); // Schedule to compute expectedHeadroom = Resources.createResource(10*16*GB / 2, 1); // changes - verify(app_0_0).setHeadroom(eq(expectedHeadroom)); - verify(app_0_1).setHeadroom(eq(expectedHeadroom)); + verify(app_0_0, times(2)).setHeadroom(eq(expectedHeadroom)); + verify(app_0_1, times(2)).setHeadroom(eq(expectedHeadroom)); verify(app_1_0).setHeadroom(eq(expectedHeadroom)); // Now reduce cluster size and check for the smaller headroom clusterResource = Resources.createResource(90*16*GB); queue.assignContainers(clusterResource, node_0); // Schedule to compute expectedHeadroom = Resources.createResource(9*16*GB / 2, 1); // changes - verify(app_0_0).setHeadroom(eq(expectedHeadroom)); - verify(app_0_1).setHeadroom(eq(expectedHeadroom)); + verify(app_0_0, times(2)).setHeadroom(eq(expectedHeadroom)); + verify(app_0_1, times(2)).setHeadroom(eq(expectedHeadroom)); verify(app_1_0).setHeadroom(eq(expectedHeadroom)); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index d5eb933..82217bb 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -624,6 +624,91 @@ public void testUserLimits() throws Exception { } @Test + public void testUserHeadroomMultiApp() throws Exception { + // Mock the queue + LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A)); + //unset maxCapacity + a.setMaxCapacity(1.0f); + + // Users + final String user_0 = "user_0"; + final String user_1 = "user_1"; + + // Submit applications + final ApplicationAttemptId appAttemptId_0 = + TestUtils.getMockApplicationAttemptId(0, 0); + FiCaSchedulerApp app_0 = + new FiCaSchedulerApp(appAttemptId_0, user_0, a, + a.getActiveUsersManager(), rmContext); + a.submitApplicationAttempt(app_0, user_0); + + final ApplicationAttemptId appAttemptId_1 = + TestUtils.getMockApplicationAttemptId(1, 0); + FiCaSchedulerApp app_1 = + new FiCaSchedulerApp(appAttemptId_1, user_0, a, + a.getActiveUsersManager(), rmContext); + a.submitApplicationAttempt(app_1, user_0); // same user + + final ApplicationAttemptId appAttemptId_2 = + TestUtils.getMockApplicationAttemptId(2, 0); + FiCaSchedulerApp app_2 = + new FiCaSchedulerApp(appAttemptId_2, user_1, a, + a.getActiveUsersManager(), rmContext); + a.submitApplicationAttempt(app_2, user_1); + + // Setup some nodes + String host_0 = "127.0.0.1"; + FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 16*GB); + String host_1 = "127.0.0.2"; + FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 16*GB); + + final int numNodes = 2; + Resource clusterResource = Resources.createResource(numNodes * (16*GB), 1); + when(csContext.getNumClusterNodes()).thenReturn(numNodes); + + Priority priority = TestUtils.createMockPriority(1); + + app_0.updateResourceRequests(Collections.singletonList( + TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true, + priority, recordFactory))); + + a.assignContainers(clusterResource, node_0); + assertEquals(1*GB, a.getUsedResources().getMemory()); + assertEquals(1*GB, app_0.getCurrentConsumption().getMemory()); + assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); + //Now, headroom is the same for all apps for a given user + queue combo + //and a change to any app's headroom is reflected for all the user's apps + assertEquals(2*GB, app_0.getHeadroom().getMemory()); + assertEquals(2*GB, app_1.getHeadroom().getMemory()); + //NOTE, this will likely fail and require test changes after YARN-1857 + assertEquals(0*GB, app_2.getHeadroom().getMemory()); + + app_1.updateResourceRequests(Collections.singletonList( + TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true, + priority, recordFactory))); + + a.assignContainers(clusterResource, node_0); + assertEquals(2*GB, a.getUsedResources().getMemory()); + assertEquals(1*GB, app_0.getCurrentConsumption().getMemory()); + assertEquals(1*GB, app_1.getCurrentConsumption().getMemory()); + assertEquals(1*GB, app_0.getHeadroom().getMemory()); + assertEquals(1*GB, app_1.getHeadroom().getMemory()); + //also may fail after YARN-1857 + assertEquals(0*GB, app_2.getHeadroom().getMemory()); + + //Complete container and verify that headroom is updated, for both apps for the user + RMContainer rmContainer = app_0.getLiveContainers().iterator().next(); + a.completedContainer(clusterResource, app_0, node_0, rmContainer, + ContainerStatus.newInstance(rmContainer.getContainerId(), + ContainerState.COMPLETE, "", + ContainerExitStatus.KILLED_BY_RESOURCEMANAGER), + RMContainerEventType.KILL, null); + + assertEquals(2*GB, app_0.getHeadroom().getMemory()); + assertEquals(2*GB, app_1.getHeadroom().getMemory()); + } + + @Test public void testHeadroomWithMaxCap() throws Exception { // Mock the queue LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));