diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 2b59716..b914008 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -1416,6 +1416,7 @@ public void reinitialize(Configuration conf, RMContext rmContext) throws IOException { try { allocsLoader.reloadAllocations(); + maxRunningEnforcer.updateRunnabilityOnRefreshQueues(); } catch (Exception e) { LOG.error("Failed to reload allocations file", e); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java index 2c90edd..9ea8512 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java @@ -104,6 +104,16 @@ public void trackNonRunnableApp(FSAppAttempt app) { usersNonRunnableApps.put(user, app); } + public void updateRunnabilityOnRefreshQueues() { + FSParentQueue rootQueue = scheduler.getQueueManager().getRootQueue(); + List> appsNowMaybeRunnable = + new ArrayList>(); + + gatherPossiblyRunnableAppLists(rootQueue, appsNowMaybeRunnable); + + updateAppsRunnability(appsNowMaybeRunnable, Integer.MAX_VALUE); + } + /** * Checks to see whether any other applications runnable now that the given * application has been removed from the given queue. And makes them so. @@ -156,6 +166,12 @@ public void updateRunnabilityOnAppRemoval(FSAppAttempt app, FSLeafQueue queue) { } } + updateAppsRunnability(appsNowMaybeRunnable, + appsNowMaybeRunnable.size()); + } + + private void updateAppsRunnability(List> + appsNowMaybeRunnable, int maxRunnableApps) { // Scan through and check whether this means that any apps are now runnable Iterator iter = new MultiListStartTimeIterator( appsNowMaybeRunnable); @@ -175,7 +191,7 @@ public void updateRunnabilityOnAppRemoval(FSAppAttempt app, FSLeafQueue queue) { // No more than one app per list will be able to be made runnable, so // we can stop looking after we've found that many - if (noLongerPendingApps.size() >= appsNowMaybeRunnable.size()) { + if (noLongerPendingApps.size() >= maxRunnableApps) { break; } } @@ -194,11 +210,10 @@ public void updateRunnabilityOnAppRemoval(FSAppAttempt app, FSLeafQueue queue) { if (!usersNonRunnableApps.remove(appSched.getUser(), appSched)) { LOG.error("Waiting app " + appSched + " expected to be in " - + "usersNonRunnableApps, but was not. This should never happen."); + + "usersNonRunnableApps, but was not. This should never happen."); } } } - /** * Updates the relevant tracking variables after a runnable app with the given * queue and user has been removed. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index c29dbfc..a80665f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -2288,7 +2288,222 @@ public void testUserMaxRunningApps() throws Exception { // Request should be fulfilled assertEquals(2, scheduler.getSchedulerApp(attId1).getLiveContainers().size()); } - + +@Test (timeout = 5000) + public void testIncreaseUserMaxRunningAppsOnTheFly() throws Exception { + // Set max running apps + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.println("10"); + out.println(""); + out.println(""); + out.println(""); + out.println("1"); + out.println(""); + out.println(""); + out.close(); + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // Add a node + RMNode node1 = + MockNodes + .newNodeInfo(1, Resources.createResource(8192, 8), 1, "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + + // Request for app 1 + ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1", + "user1", 1); + + scheduler.update(); + NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1); + scheduler.handle(updateEvent); + + // App 1 should be running + assertEquals(1, scheduler.getSchedulerApp(attId1).getLiveContainers().size()); + + ApplicationAttemptId attId2 = createSchedulingRequest(1024, "queue1", + "user1", 1); + + scheduler.update(); + scheduler.handle(updateEvent); + + ApplicationAttemptId attId3 = createSchedulingRequest(1024, "queue1", + "user1", 1); + + scheduler.update(); + scheduler.handle(updateEvent); + + ApplicationAttemptId attId4 = createSchedulingRequest(1024, "queue1", + "user1", 1); + + scheduler.update(); + scheduler.handle(updateEvent); + + // App 2 should not be running + assertEquals(0, scheduler.getSchedulerApp(attId2).getLiveContainers().size()); + // App 3 should not be running + assertEquals(0, scheduler.getSchedulerApp(attId3).getLiveContainers().size()); + // App 4 should not be running + assertEquals(0, scheduler.getSchedulerApp(attId4).getLiveContainers().size()); + + out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.println("10"); + out.println(""); + out.println(""); + out.println(""); + out.println("3"); + out.println(""); + out.println(""); + out.close(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + scheduler.update(); + scheduler.handle(updateEvent); + + // App 2 should be running + assertEquals(1, scheduler.getSchedulerApp(attId2).getLiveContainers().size()); + + scheduler.update(); + scheduler.handle(updateEvent); + + // App 3 should be running + assertEquals(1, scheduler.getSchedulerApp(attId3).getLiveContainers().size()); + + scheduler.update(); + scheduler.handle(updateEvent); + + // App 4 should not be running + assertEquals(0, scheduler.getSchedulerApp(attId4).getLiveContainers().size()); + + // Now remove app 1 + AppAttemptRemovedSchedulerEvent appRemovedEvent1 = new AppAttemptRemovedSchedulerEvent( + attId1, RMAppAttemptState.FINISHED, false); + + scheduler.handle(appRemovedEvent1); + scheduler.update(); + scheduler.handle(updateEvent); + + // App 4 should be running + assertEquals(1, scheduler.getSchedulerApp(attId4).getLiveContainers().size()); + } + + @Test (timeout = 5000) + public void testIncreaseQueueMaxRunningAppsOnTheFly() throws Exception { + // Set max running apps + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.println("1"); + out.println(""); + out.println(""); + out.println(""); + out.close(); + + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // Add a node + RMNode node1 = + MockNodes + .newNodeInfo(1, Resources.createResource(8192, 8), 1, "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + + // Request for app 1 + ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1", + "user1", 1); + + scheduler.update(); + NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1); + scheduler.handle(updateEvent); + + // App 1 should be running + assertEquals(1, scheduler.getSchedulerApp(attId1).getLiveContainers().size()); + + ApplicationAttemptId attId2 = createSchedulingRequest(1024, "queue1", + "user1", 1); + + scheduler.update(); + scheduler.handle(updateEvent); + + ApplicationAttemptId attId3 = createSchedulingRequest(1024, "queue1", + "user1", 1); + + scheduler.update(); + scheduler.handle(updateEvent); + + ApplicationAttemptId attId4 = createSchedulingRequest(1024, "queue1", + "user1", 1); + + scheduler.update(); + scheduler.handle(updateEvent); + + // App 2 should not be running + assertEquals(0, scheduler.getSchedulerApp(attId2).getLiveContainers().size()); + // App 3 should not be running + assertEquals(0, scheduler.getSchedulerApp(attId3).getLiveContainers().size()); + // App 4 should not be running + assertEquals(0, scheduler.getSchedulerApp(attId4).getLiveContainers().size()); + + out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.println("3"); + out.println(""); + out.println(""); + out.println(""); + out.close(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + scheduler.update(); + scheduler.handle(updateEvent); + + // App 2 should be running + assertEquals(1, scheduler.getSchedulerApp(attId2).getLiveContainers().size()); + + scheduler.update(); + scheduler.handle(updateEvent); + + // App 3 should be running + assertEquals(1, scheduler.getSchedulerApp(attId3).getLiveContainers().size()); + + scheduler.update(); + scheduler.handle(updateEvent); + + // App 4 should not be running + assertEquals(0, scheduler.getSchedulerApp(attId4).getLiveContainers().size()); + + // Now remove app 1 + AppAttemptRemovedSchedulerEvent appRemovedEvent1 = new AppAttemptRemovedSchedulerEvent( + attId1, RMAppAttemptState.FINISHED, false); + + scheduler.handle(appRemovedEvent1); + scheduler.update(); + scheduler.handle(updateEvent); + + // App 4 should be running + assertEquals(1, scheduler.getSchedulerApp(attId4).getLiveContainers().size()); + } + @Test (timeout = 5000) public void testReservationWhileMultiplePriorities() throws IOException { scheduler.init(conf);