diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index 6287deb..6bf5852 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -571,6 +571,11 @@ private Resource assignContainer(FSSchedulerNode node, boolean reserved) {
           LOG.debug("Skipping allocation because maxAMShare limit would " +
               "be exceeded");
         }
+        // If we had previously made a reservation, delete it,
+        // so other applications can allocate containers from this node.
+        if (reserved) {
+          unreserve(node.getReservedContainer().getReservedPriority(), node);
+        }
         return Resources.none();
       }
     }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index 69e0a8c..560256b 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -3702,6 +3702,139 @@ public void testQueueMaxAMShareDefault() throws Exception {
   }
 
   @Test
+  public void testQueueMaxAMShareWithContainerReservation() throws Exception {
+    // Checks that an app drops its reserved AM container once the queue's
+    // maxAMShare limit would be exceeded, freeing the node for another app.
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<queue name=\"queue1\">");
+    out.println("<maxAMShare>0.5</maxAMShare>");
+    out.println("</queue>");
+    out.println("</allocations>");
+    out.close();
+
+    scheduler.init(conf);
+    scheduler.start();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+    RMNode node1 =
+        MockNodes.newNodeInfo(1, Resources.createResource(10240, 10),
+            1, "127.0.0.1");
+    RMNode node2 =
+        MockNodes.newNodeInfo(1, Resources.createResource(10240, 10),
+            2, "127.0.0.2");
+    RMNode node3 =
+        MockNodes.newNodeInfo(1, Resources.createResource(5120, 5),
+            3, "127.0.0.3");
+    NodeAddedSchedulerEvent nodeE1 = new NodeAddedSchedulerEvent(node1);
+    NodeUpdateSchedulerEvent updateE1 = new NodeUpdateSchedulerEvent(node1);
+    NodeAddedSchedulerEvent nodeE2 = new NodeAddedSchedulerEvent(node2);
+    NodeUpdateSchedulerEvent updateE2 = new NodeUpdateSchedulerEvent(node2);
+    NodeAddedSchedulerEvent nodeE3 = new NodeAddedSchedulerEvent(node3);
+    NodeUpdateSchedulerEvent updateE3 = new NodeUpdateSchedulerEvent(node3);
+    scheduler.handle(nodeE1);
+    scheduler.handle(nodeE2);
+    scheduler.handle(nodeE3);
+    scheduler.update();
+    FSLeafQueue queue1 = scheduler.getQueueManager().getLeafQueue("queue1",
+        true);
+    Resource amResource1 = Resource.newInstance(1024, 1);
+    Resource amResource2 = Resource.newInstance(1024, 1);
+    Resource amResource3 = Resource.newInstance(10240, 1);
+    Resource amResource4 = Resource.newInstance(5120, 1);
+    Resource amResource5 = Resource.newInstance(1024, 1);
+    int amPriority = RMAppAttemptImpl.AM_CONTAINER_PRIORITY.getPriority();
+    ApplicationAttemptId attId1 = createAppAttemptId(1, 1);
+    createApplicationWithAMResource(attId1, "queue1", "user1", amResource1);
+    createSchedulingRequestExistingApplication(1024, 1, amPriority, attId1);
+    FSAppAttempt app1 = scheduler.getSchedulerApp(attId1);
+    scheduler.update();
+    // Allocate app1's AM container on node1.
+    scheduler.handle(updateE1);
+    assertEquals("Application1's AM requests 1024 MB memory",
+        1024, app1.getAMResource().getMemory());
+    assertEquals("Application1's AM should be running",
+        1, app1.getLiveContainers().size());
+    assertEquals("Queue1's AM resource usage should be 1024 MB memory",
+        1024, queue1.getAmResourceUsage().getMemory());
+
+    ApplicationAttemptId attId2 = createAppAttemptId(2, 1);
+    createApplicationWithAMResource(attId2, "queue1", "user1", amResource2);
+    createSchedulingRequestExistingApplication(1024, 1, amPriority, attId2);
+    FSAppAttempt app2 = scheduler.getSchedulerApp(attId2);
+    scheduler.update();
+    // Allocate app2's AM container on node2.
+    scheduler.handle(updateE2);
+    assertEquals("Application2's AM requests 1024 MB memory",
+        1024, app2.getAMResource().getMemory());
+    assertEquals("Application2's AM should be running",
+        1, app2.getLiveContainers().size());
+    assertEquals("Queue1's AM resource usage should be 2048 MB memory",
+        2048, queue1.getAmResourceUsage().getMemory());
+
+    ApplicationAttemptId attId3 = createAppAttemptId(3, 1);
+    createApplicationWithAMResource(attId3, "queue1", "user1", amResource3);
+    createSchedulingRequestExistingApplication(10240, 1, amPriority, attId3);
+    FSAppAttempt app3 = scheduler.getSchedulerApp(attId3);
+    scheduler.update();
+    // app3 reserves a container on node1 because node1's available resource
+    // is less than app3's AM container resource.
+    scheduler.handle(updateE1);
+    // Similarly app3 reserves a container on node2.
+    scheduler.handle(updateE2);
+    assertEquals("Application3's AM resource shouldn't be updated",
+        0, app3.getAMResource().getMemory());
+    assertEquals("Application3's AM should not be running",
+        0, app3.getLiveContainers().size());
+    assertEquals("Queue1's AM resource usage should be 2048 MB memory",
+        2048, queue1.getAmResourceUsage().getMemory());
+
+    ApplicationAttemptId attId4 = createAppAttemptId(4, 1);
+    createApplicationWithAMResource(attId4, "queue1", "user1", amResource4);
+    createSchedulingRequestExistingApplication(5120, 1, amPriority, attId4);
+    FSAppAttempt app4 = scheduler.getSchedulerApp(attId4);
+    scheduler.update();
+    // Allocate app4's AM container on node3.
+    scheduler.handle(updateE3);
+    assertEquals("Application4's AM requests 5120 MB memory",
+        5120, app4.getAMResource().getMemory());
+    assertEquals("Application4's AM should be running",
+        1, app4.getLiveContainers().size());
+    assertEquals("Queue1's AM resource usage should be 7168 MB memory",
+        7168, queue1.getAmResourceUsage().getMemory());
+
+    AppAttemptRemovedSchedulerEvent appRemovedEvent1 =
+        new AppAttemptRemovedSchedulerEvent(attId1,
+            RMAppAttemptState.FINISHED, false);
+    // Release app1's AM container on node1.
+    scheduler.handle(appRemovedEvent1);
+    assertEquals("Queue1's AM resource usage should be 6144 MB memory",
+        6144, queue1.getAmResourceUsage().getMemory());
+
+    scheduler.update();
+    // app3 will free the reserved container on node1 due to
+    // exceeding queue MaxAMShare limit.
+    scheduler.handle(updateE1);
+
+    ApplicationAttemptId attId5 = createAppAttemptId(5, 1);
+    createApplicationWithAMResource(attId5, "queue1", "user1", amResource5);
+    createSchedulingRequestExistingApplication(1024, 1, amPriority, attId5);
+    FSAppAttempt app5 = scheduler.getSchedulerApp(attId5);
+    scheduler.update();
+    // app5 can allocate its AM container on node1 after
+    // app3 unreserves its container on node1.
+    scheduler.handle(updateE1);
+    assertEquals("Application5's AM requests 1024 MB memory",
+        1024, app5.getAMResource().getMemory());
+    assertEquals("Application5's AM should be running",
+        1, app5.getLiveContainers().size());
+    assertEquals("Queue1's AM resource usage should be 7168 MB memory",
+        7168, queue1.getAmResourceUsage().getMemory());
+  }
+
+  @Test
   public void testMaxRunningAppsHierarchicalQueues() throws Exception {
     conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
     ControlledClock clock = new ControlledClock();