diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 719cf1e..2a7b8ea 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -607,6 +607,12 @@ public synchronized void reinitialize( newlyParsedLeafQueue.getMaximumActiveApplications(), newlyParsedLeafQueue.getMaximumActiveApplicationsPerUser(), newlyParsedLeafQueue.state, newlyParsedLeafQueue.acls); + + activateApplications(); + LOG.info("Activate the pending application if possible after reinitializing" + + " queue: " + getQueueName() + + " #queue-pending-applications: " + getNumPendingApplications() + + " #queue-active-applications: " + getNumActiveApplications()); } @Override @@ -1477,7 +1483,13 @@ public synchronized void updateClusterResource(Resource clusterResource) { CSQueueUtils.updateQueueStatistics( resourceCalculator, this, getParent(), clusterResource, minimumAllocation); - + + activateApplications(); + LOG.info("Activate the pending application if possible after updating" + + " cluster resource" + + " #queue-pending-applications: " + getNumPendingApplications() + + " #queue-active-applications: " + getNumActiveApplications()); + // Update application properties for (FiCaSchedulerApp application : activeApplications) { synchronized (application) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index ccf2a47..9ed8119 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -138,6 +138,7 @@ public void setUp() throws Exception { private static final String C = "c"; private static final String C1 = "c1"; private static final String D = "d"; + private static final String E = "e"; private void setupQueueConfiguration( CapacitySchedulerConfiguration conf, final String newRoot) { @@ -148,7 +149,7 @@ private void setupQueueConfiguration( conf.setAcl(CapacitySchedulerConfiguration.ROOT, QueueACL.SUBMIT_APPLICATIONS, " "); final String Q_newRoot = CapacitySchedulerConfiguration.ROOT + "." + newRoot; - conf.setQueues(Q_newRoot, new String[] {A, B, C, D}); + conf.setQueues(Q_newRoot, new String[] {A, B, C, D, E}); conf.setCapacity(Q_newRoot, 100); conf.setMaximumCapacity(Q_newRoot, 100); conf.setAcl(Q_newRoot, QueueACL.SUBMIT_APPLICATIONS, " "); @@ -174,10 +175,14 @@ private void setupQueueConfiguration( conf.setCapacity(Q_C1, 100); final String Q_D = Q_newRoot + "." + D; - conf.setCapacity(Q_D, 10); + conf.setCapacity(Q_D, 9); conf.setMaximumCapacity(Q_D, 11); conf.setAcl(Q_D, QueueACL.SUBMIT_APPLICATIONS, "user_d"); + final String Q_E = Q_newRoot + "." + E; + conf.setCapacity(Q_E, 1); + conf.setMaximumCapacity(Q_E, 1); + conf.setAcl(Q_E, QueueACL.SUBMIT_APPLICATIONS, "user_e"); } static LeafQueue stubLeafQueue(LeafQueue queue) { @@ -1567,6 +1572,100 @@ public void testSchedulingConstraints() throws Exception { } + @Test (timeout = 30000) + public void testActivateApplicationAfterReinitialization() throws Exception { + + // Manipulate queue 'e' + LeafQueue e = stubLeafQueue((LeafQueue)queues.get(E)); + + // Users + final String user_e = "user_e"; + + // Submit applications + final ApplicationAttemptId appAttemptId_0 = + TestUtils.getMockApplicationAttemptId(0, 0); + FiCaSchedulerApp app_0 = + new FiCaSchedulerApp(appAttemptId_0, user_e, e, + mock(ActiveUsersManager.class), rmContext); + e.submitApplication(app_0, user_e, E); + + final ApplicationAttemptId appAttemptId_1 = + TestUtils.getMockApplicationAttemptId(1, 0); + FiCaSchedulerApp app_1 = + new FiCaSchedulerApp(appAttemptId_1, user_e, e, + mock(ActiveUsersManager.class), rmContext); + e.submitApplication(app_1, user_e, E); // same user + + final ApplicationAttemptId appAttemptId_2 = + TestUtils.getMockApplicationAttemptId(2, 0); + FiCaSchedulerApp app_2 = + new FiCaSchedulerApp(appAttemptId_2, user_e, e, + mock(ActiveUsersManager.class), rmContext); + e.submitApplication(app_2, user_e, E); // same user + + // before reinitialization + assertEquals(2, e.activeApplications.size()); + assertEquals(1, e.pendingApplications.size()); + + when(csContext.getClusterResources()). + thenReturn(Resources.createResource(200 * 16 * GB, 100 * 32)); + Map newQueues = new HashMap(); + CSQueue newRoot = + CapacityScheduler.parseQueue(csContext, csConf, null, + CapacitySchedulerConfiguration.ROOT, + newQueues, queues, + TestUtils.spyHook); + queues = newQueues; + root.reinitialize(newRoot, cs.getClusterResources()); + + // after reinitialization + assertEquals(3, e.activeApplications.size()); + assertEquals(0, e.pendingApplications.size()); + } + + @Test (timeout = 30000) + public void testActivateApplicationByUpdatingClusterResource() + throws Exception { + + // Manipulate queue 'e' + LeafQueue e = stubLeafQueue((LeafQueue)queues.get(E)); + + // Users + final String user_e = "user_e"; + + // Submit applications + final ApplicationAttemptId appAttemptId_0 = + TestUtils.getMockApplicationAttemptId(0, 0); + FiCaSchedulerApp app_0 = + new FiCaSchedulerApp(appAttemptId_0, user_e, e, + mock(ActiveUsersManager.class), rmContext); + e.submitApplication(app_0, user_e, E); + + final ApplicationAttemptId appAttemptId_1 = + TestUtils.getMockApplicationAttemptId(1, 0); + FiCaSchedulerApp app_1 = + new FiCaSchedulerApp(appAttemptId_1, user_e, e, + mock(ActiveUsersManager.class), rmContext); + e.submitApplication(app_1, user_e, E); // same user + + final ApplicationAttemptId appAttemptId_2 = + TestUtils.getMockApplicationAttemptId(2, 0); + FiCaSchedulerApp app_2 = + new FiCaSchedulerApp(appAttemptId_2, user_e, e, + mock(ActiveUsersManager.class), rmContext); + e.submitApplication(app_2, user_e, E); // same user + + // before updating cluster resource + assertEquals(2, e.activeApplications.size()); + assertEquals(1, e.pendingApplications.size()); + + e.updateClusterResource(Resources.createResource(200 * 16 * GB, 100 * 32)); + + // after updating cluster resource + assertEquals(3, e.activeApplications.size()); + assertEquals(0, e.pendingApplications.size()); + } + public boolean hasQueueACL(List aclInfos, QueueACL acl) { for (QueueUserACLInfo aclInfo : aclInfos) { if (aclInfo.getUserAcls().contains(acl)) {