diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java index 747a488a5a5..42ede478523 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java @@ -85,6 +85,7 @@ private final QueueMetrics metrics; private AtomicInteger activeUsers = new AtomicInteger(0); + private AtomicInteger activeUsersWithOnlyPendingApps = new AtomicInteger(0); private Map> usersApplications = new HashMap>(); @@ -671,9 +672,24 @@ private long getLocalVersionOfUsersState(String nodePartition, // update in local storage userLimitPerSchedulingMode.put(schedulingMode, computedUserLimit); + computeNumActiveUsersWithOnlyPendingApps(); + return userLimitPerSchedulingMode; } + // Called with the lock held; counts users with pending but no active apps. 
+ private void computeNumActiveUsersWithOnlyPendingApps() { + + int numPendingUsers = 0; + for (User user : users.values()) { + if ((user.getPendingApplications() > 0) + && (user.getActiveApplications() <= 0)) { + numPendingUsers++; + } + } + activeUsersWithOnlyPendingApps.set(numPendingUsers); + } + private Resource computeUserLimit(String userName, Resource clusterResource, String nodePartition, SchedulingMode schedulingMode, boolean activeUser) { Resource partitionResource = labelManager.getResourceByLabel(nodePartition, @@ -839,6 +855,11 @@ public void activateApplication(String user, ApplicationId applicationId) { try { this.writeLock.lock(); + User userDesc = getUser(user); + if (userDesc != null && userDesc.getActiveApplications() <= 0) { + return; + } + Set userApps = usersApplications.get(user); if (userApps == null) { userApps = new HashSet(); @@ -893,7 +914,7 @@ public void deactivateApplication(String user, ApplicationId applicationId) { @Override public int getNumActiveUsers() { - return activeUsers.get(); + return activeUsers.get() + activeUsersWithOnlyPendingApps.get(); } float sumActiveUsersTimesWeights() { @@ -1090,4 +1111,9 @@ public void updateUserWeights() { this.writeLock.unlock(); } } + + @VisibleForTesting + public int getNumActiveUsersWithOnlyPendingApps() { + return activeUsersWithOnlyPendingApps.get(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index 0b54010c276..56c0b2efed2 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -4930,4 +4930,263 @@ public Object answer(InvocationOnMock invocation) throws Exception { spyCs.handle(new NodeUpdateSchedulerEvent( spyCs.getNode(nm.getNodeId()).getRMNode())); } + + /** + * Moves all apps from a1 to b1, fires node updates so that the user limit + * is computed again, and checks the counts reported by b1 + * (getNumActiveUsers() == 2, getNumActiveUsersWithOnlyPendingApps() == 2). + * + * NOTE(review): the overrides below use ROOT + ".a1" while the setup helper + * configures the queue through A1; confirm both name the same queue path. + */ + @Test + public void testMoveAppWithActiveUsersWithOnlyPendingApps() throws Exception { + + YarnConfiguration conf = new YarnConfiguration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + + CapacitySchedulerConfiguration newConf = + new CapacitySchedulerConfiguration(conf); + + setupQueueConfigurationForActiveUsersChecks(newConf); + newConf.setUserLimitFactor( + CapacitySchedulerConfiguration.ROOT + ".a1", 0.5f); + newConf.setMaximumAMResourcePercentPerPartition( + CapacitySchedulerConfiguration.ROOT + ".a1", 
"", 0.2f); + MockRM rm = new MockRM(newConf); + rm.start(); + + CapacityScheduler scheduler = + (CapacityScheduler) rm.getResourceScheduler(); + + MockNM nm1 = rm.registerNode("h1:1234", 16 * GB); + + // submit an app + RMApp app = rm.submitApp(GB, "test-move-1", "u1", null, "a1"); + MockAM am1 = MockRM.launchAndRegisterAM(app, rm, nm1); + + ApplicationAttemptId appAttemptId = + rm.getApplicationReport(app.getApplicationId()) + .getCurrentApplicationAttemptId(); + + RMApp app2 = rm.submitApp(1 * GB, "app", "u2", null, "a1"); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm1); + + RMApp app3 = rm.submitApp(1 * GB, "app", "u3", null, "a1"); + + RMApp app4 = rm.submitApp(1 * GB, "app", "u4", null, "a1"); + + // am1 and am2 each ask for 50 * 1GB containers + am1.allocate("*", 1 * GB, 50, null); + am2.allocate("*", 1 * GB, 50, null); + + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId()); + + // check preconditions + List appsInA1 = scheduler.getAppsInQueue("a1"); + assertEquals(4, appsInA1.size()); + String queue = + scheduler.getApplicationAttempt(appsInA1.get(0)).getQueue() + .getQueueName(); + Assert.assertEquals("a1", queue); + + List appsInA = scheduler.getAppsInQueue("a"); + assertTrue(appsInA.contains(appAttemptId)); + assertEquals(4, appsInA.size()); + + List appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.contains(appAttemptId)); + assertEquals(4, appsInRoot.size()); + + List appsInB1 = scheduler.getAppsInQueue("b1"); + assertTrue(appsInB1.isEmpty()); + + List appsInB = scheduler.getAppsInQueue("b"); + assertTrue(appsInB.isEmpty()); + + UsersManager um = + (UsersManager) scheduler.getQueue("a1").getAbstractUsersManager(); + + assertEquals(4, um.getNumActiveUsers()); + assertEquals(2, um.getNumActiveUsersWithOnlyPendingApps()); + + // now move all apps from a1 to b1 + scheduler.moveAllApps("a1", "b1"); + + Thread.sleep(5000); + + // Trigger node updates so that the 
user limit computation + // happens again + for (int i = 0; i < 10; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + Thread.sleep(5000); + } + + // check postconditions + appsInB1 = scheduler.getAppsInQueue("b1"); + + assertEquals(4, appsInB1.size()); + queue = + scheduler.getApplicationAttempt(appsInB1.get(0)).getQueue() + .getQueueName(); + Assert.assertEquals("b1", queue); + + appsInB = scheduler.getAppsInQueue("b"); + assertTrue(appsInB.contains(appAttemptId)); + assertEquals(4, appsInB.size()); + + appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.contains(appAttemptId)); + assertEquals(4, appsInRoot.size()); + + List oldAppsInA1 = scheduler.getAppsInQueue("a1"); + assertEquals(0, oldAppsInA1.size()); + + UsersManager um_b1 = + (UsersManager) scheduler.getQueue("b1").getAbstractUsersManager(); + + assertEquals(2, um_b1.getNumActiveUsers()); + assertEquals(2, um_b1.getNumActiveUsersWithOnlyPendingApps()); + + appsInB1 = scheduler.getAppsInQueue("b1"); + assertEquals(4, appsInB1.size()); + rm.close(); + } + + /** + * Demonstrates that the count of active users with only pending apps is + * incorrect (still 0) in the target queue when no scheduler event arrives + * after the move-apps operation (the test only sleeps for one minute). 
+ * + * @throws Exception on any unexpected failure + */ + @Test + public void testMoveAppWithActiveUsersWithOnlyPendingApps1() + throws Exception { + + YarnConfiguration conf = new YarnConfiguration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + + CapacitySchedulerConfiguration newConf = + new CapacitySchedulerConfiguration(conf); + + setupQueueConfigurationForActiveUsersChecks(newConf); + newConf.setUserLimitFactor(CapacitySchedulerConfiguration.ROOT + ".a1", + 0.5f); + newConf.setMaximumAMResourcePercentPerPartition( + CapacitySchedulerConfiguration.ROOT + ".a1", "", 0.2f); + MockRM rm = new MockRM(newConf); + rm.start(); + + CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler(); + + MockNM nm1 = rm.registerNode("h1:1234", 16 * GB); + + // submit an app + RMApp app = rm.submitApp(GB, "test-move-1", "u1", null, "a1"); + MockAM am1 = MockRM.launchAndRegisterAM(app, rm, nm1); + + ApplicationAttemptId appAttemptId = + rm.getApplicationReport(app.getApplicationId()) + .getCurrentApplicationAttemptId(); + + RMApp app2 = rm.submitApp(1 * GB, "app", "u2", null, "a1"); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm1); + + RMApp app3 = rm.submitApp(1 * GB, "app", "u3", null, "a1"); + + RMApp app4 = rm.submitApp(1 * GB, "app", "u4", null, "a1"); + + // am1 and am2 each ask for 50 * 1GB containers + am1.allocate("*", 1 * GB, 50, null); + am2.allocate("*", 1 * GB, 50, null); + + // check preconditions + List appsInA1 = scheduler.getAppsInQueue("a1"); + assertEquals(4, appsInA1.size()); + String queue = scheduler.getApplicationAttempt(appsInA1.get(0)).getQueue() + .getQueueName(); + Assert.assertEquals("a1", queue); + + List appsInA = scheduler.getAppsInQueue("a"); + assertTrue(appsInA.contains(appAttemptId)); + assertEquals(4, appsInA.size()); + + List appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.contains(appAttemptId)); + assertEquals(4, appsInRoot.size()); + + List appsInB1 = 
scheduler.getAppsInQueue("b1"); + assertTrue(appsInB1.isEmpty()); + + List appsInB = scheduler.getAppsInQueue("b"); + assertTrue(appsInB.isEmpty()); + + UsersManager um = + (UsersManager) scheduler.getQueue("a1").getAbstractUsersManager(); + + assertEquals(4, um.getNumActiveUsers()); + assertEquals(2, um.getNumActiveUsersWithOnlyPendingApps()); + + // now move all apps from a1 to b1 + scheduler.moveAllApps("a1", "b1"); + + // Sleep for 1 min + Thread.sleep(60000); + + // check postconditions + appsInB1 = scheduler.getAppsInQueue("b1"); + + assertEquals(4, appsInB1.size()); + queue = scheduler.getApplicationAttempt(appsInB1.get(0)).getQueue() + .getQueueName(); + Assert.assertEquals("b1", queue); + + appsInB = scheduler.getAppsInQueue("b"); + assertTrue(appsInB.contains(appAttemptId)); + assertEquals(4, appsInB.size()); + + appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.contains(appAttemptId)); + assertEquals(4, appsInRoot.size()); + + List oldAppsInA1 = scheduler.getAppsInQueue("a1"); + assertEquals(0, oldAppsInA1.size()); + + UsersManager um_b1 = + (UsersManager) scheduler.getQueue("b1").getAbstractUsersManager(); + + assertEquals(2, um_b1.getNumActiveUsers()); + assertEquals(0, um_b1.getNumActiveUsersWithOnlyPendingApps()); + + appsInB1 = scheduler.getAppsInQueue("b1"); + assertEquals(4, appsInB1.size()); + rm.close(); + } + + private CapacitySchedulerConfiguration setupQueueConfigurationForActiveUsersChecks( + CapacitySchedulerConfiguration conf) { + + // Define top-level queues + conf.setQueues(CapacitySchedulerConfiguration.ROOT, + new String[] { "a", "b" }); + + conf.setCapacity(A, 50); + conf.setCapacity(B, 50); + + // Define 2nd-level queues + conf.setQueues(A, new String[] { "a1" }); + conf.setCapacity(A1, 100); + conf.setUserLimitFactor(A1, 100.0f); + + conf.setQueues(B, new String[] { "b1" }); + conf.setCapacity(B1, 100); + conf.setUserLimitFactor(B1, 100.0f); + + LOG.info("Setup top-level queues a and b"); + return conf; + } } diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java index 25e535ae8fa..69185f3f578 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java @@ -941,4 +941,49 @@ public void testUserLimitAllocationMultipleContainers() throws Exception { rm1.close(); } + + @Test + public void testActiveUsersWithOnlyPendingApps() throws Exception { + + CapacitySchedulerConfiguration newConf = + new CapacitySchedulerConfiguration(conf); + newConf.setUserLimitFactor( + CapacitySchedulerConfiguration.ROOT + ".default", 0.5f); + newConf.setMaximumAMResourcePercentPerPartition( + CapacitySchedulerConfiguration.ROOT + ".default", "", 0.2f); + MockRM rm1 = new MockRM(newConf); + + rm1.getRMContext().setNodeLabelManager(mgr); + rm1.start(); + MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); + + RMApp app1 = rm1.submitApp(1 * GB, "app", "u1", null, "default"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + RMApp app2 = rm1.submitApp(1 * GB, "app", "u2", null, "default"); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1); + + RMApp app3 = rm1.submitApp(1 * GB, "app", "u3", null, "default"); + + RMApp app4 = rm1.submitApp(1 * GB, "app", "u4", null, "default"); + + // am1 and am2 each ask for 50 * 1GB containers + am1.allocate("*", 1 * GB, 50, null); + am2.allocate("*", 1 * GB, 50, null); + + 
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); + + for (int i = 0; i < 10; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + Thread.sleep(1000); + } + LeafQueue lq = (LeafQueue) cs.getQueue("default"); + UsersManager um = (UsersManager) lq.getAbstractUsersManager(); + + Assert.assertEquals(4, um.getNumActiveUsers()); + Assert.assertEquals(2, um.getNumActiveUsersWithOnlyPendingApps()); + Assert.assertEquals(2, lq.getMetrics().getAppsPending()); + rm1.close(); + } }