diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java index 46617ff..eda4df5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java @@ -523,8 +523,11 @@ private Resource assignContainer( // Inform the node node.allocateContainer(allocatedContainer); - // If this container is used to run AM, update the leaf queue's AM usage - if (getLiveContainers().size() == 1 && !getUnmanagedAM()) { + // If not running unmanaged, the first container we allocate is + // always the AM. Set amResource for this app and + // Update the leaf queue's AM usage + if (!isAmRunning() && !getUnmanagedAM()) { + setAMResource(container.getResource()); getQueue().addAMResourceUsage(container.getResource()); setAmRunning(true); } @@ -551,6 +554,19 @@ private Resource assignContainer(FSSchedulerNode node, boolean reserved) { LOG.debug("Node offered to app: " + getName() + " reserved: " + reserved); } + // Check the AM resource usage for the leaf queue + if (!isAmRunning() && !getUnmanagedAM()) { + List ask = appSchedulingInfo.getAllResourceRequests(); + if (ask.isEmpty() || !getQueue().canRunAppAM( + ask.get(0).getCapability())) { + if (LOG.isDebugEnabled()) { + LOG.debug("Skipping allocation because maxAMShare limit would " + + "be exceeded"); + } + return Resources.none(); + } + } + Collection prioritiesToTry = (reserved) ? Arrays.asList(node.getReservedContainer().getReservedPriority()) : getPriorities(); @@ -567,17 +583,6 @@ private Resource assignContainer(FSSchedulerNode node, boolean reserved) { addSchedulingOpportunity(priority); - // Check the AM resource usage for the leaf queue - if (getLiveContainers().size() == 0 && !getUnmanagedAM()) { - if (!getQueue().canRunAppAM(getAMResource())) { - if (LOG.isDebugEnabled()) { - LOG.debug("Skipping allocation because maxAMShare limit would " + - "be exceeded"); - } - return Resources.none(); - } - } - ResourceRequest rackLocalRequest = getResourceRequest(priority, node.getRackName()); ResourceRequest localRequest = getResourceRequest(priority, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java index c49a323..ec05c46 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java @@ -125,7 +125,8 @@ public boolean removeApp(FSAppAttempt app) { } // Update AM resource usage if needed - if (runnable && app.isAmRunning() && app.getAMResource() != null) { + // If isAMRunning is true, we're no running an unmanaged AM. + if (runnable && app.isAmRunning()) { Resources.subtractFrom(amResourceUsage, app.getAMResource()); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 04c7f70..a6c5416 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -901,12 +901,6 @@ public Allocation allocate(ApplicationAttemptId appAttemptId, // Record container allocation start time application.recordContainerRequestTime(getClock().getTime()); - // Set amResource for this app - if (!application.getUnmanagedAM() && ask.size() == 1 - && application.getLiveContainers().isEmpty()) { - application.setAMResource(ask.get(0).getCapability()); - } - // Release containers releaseContainers(release, application); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 7600a35..101c139 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -92,6 +92,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; @@ -3549,8 +3550,8 @@ public void testQueueMaxAMShare() throws Exception { FSAppAttempt app3 = scheduler.getSchedulerApp(attId3); scheduler.update(); scheduler.handle(updateEvent); - assertEquals("Application3's AM requests 1024 MB memory", - 1024, app3.getAMResource().getMemory()); + assertEquals("Application3's AM resource shouldn't be updated", + 0, app3.getAMResource().getMemory()); assertEquals("Application3's AM should not be running", 0, app3.getLiveContainers().size()); assertEquals("Queue1's AM resource usage should be 2048 MB memory", @@ -3575,6 +3576,8 @@ public void testQueueMaxAMShare() throws Exception { 0, app1.getLiveContainers().size()); assertEquals("Application3's AM should be running", 1, app3.getLiveContainers().size()); + assertEquals("Application3's AM requests 1024 MB memory", + 1024, app3.getAMResource().getMemory()); assertEquals("Queue1's AM resource usage should be 2048 MB memory", 2048, queue1.getAmResourceUsage().getMemory()); @@ -3585,8 +3588,8 @@ public void testQueueMaxAMShare() throws Exception { FSAppAttempt app4 = scheduler.getSchedulerApp(attId4); scheduler.update(); scheduler.handle(updateEvent); - assertEquals("Application4's AM requests 2048 MB memory", - 2048, app4.getAMResource().getMemory()); + assertEquals("Application4's AM resource shouldn't be updated", + 0, app4.getAMResource().getMemory()); assertEquals("Application4's AM should not be running", 0, app4.getLiveContainers().size()); assertEquals("Queue1's AM resource usage should be 2048 MB memory", @@ -3599,8 +3602,8 @@ public void testQueueMaxAMShare() throws Exception { FSAppAttempt app5 = scheduler.getSchedulerApp(attId5); scheduler.update(); scheduler.handle(updateEvent); - assertEquals("Application5's AM requests 2048 MB memory", - 2048, app5.getAMResource().getMemory()); + assertEquals("Application5's AM resource shouldn't be updated", + 0, app5.getAMResource().getMemory()); assertEquals("Application5's AM should not be running", 0, app5.getLiveContainers().size()); assertEquals("Queue1's AM resource usage should be 2048 MB memory", @@ -3632,6 +3635,33 @@ public void testQueueMaxAMShare() throws Exception { 0, app3.getLiveContainers().size()); assertEquals("Application5's AM should be running", 1, app5.getLiveContainers().size()); + assertEquals("Application5's AM requests 2048 MB memory", + 2048, app5.getAMResource().getMemory()); + assertEquals("Queue1's AM resource usage should be 2048 MB memory", + 2048, queue1.getAmResourceUsage().getMemory()); + + // request non-AM container for app5 + createSchedulingRequestExistingApplication(1024, 1, attId5); + assertEquals("Application5's AM should have 1 container", + 1, app5.getLiveContainers().size()); + // complete AM container before non-AM container is allocated. + // spark application hit this situation. + RMContainer amContainer5 = (RMContainer)app5.getLiveContainers().toArray()[0]; + ContainerExpiredSchedulerEvent containerExpired = + new ContainerExpiredSchedulerEvent(amContainer5.getContainerId()); + scheduler.handle(containerExpired); + assertEquals("Application5's AM should have 0 container", + 0, app5.getLiveContainers().size()); + assertEquals("Queue1's AM resource usage should be 2048 MB memory", + 2048, queue1.getAmResourceUsage().getMemory()); + scheduler.update(); + scheduler.handle(updateEvent); + // non-AM container should be allocated + // check non-AM container allocation is not rejected + // due to queue MaxAMShare limitation. + assertEquals("Application5's AM should have 1 container", + 1, app5.getLiveContainers().size()); + // check non-AM container allocation won't affect queue AmResourceUsage assertEquals("Queue1's AM resource usage should be 2048 MB memory", 2048, queue1.getAmResourceUsage().getMemory()); @@ -3644,8 +3674,8 @@ public void testQueueMaxAMShare() throws Exception { scheduler.handle(updateEvent); assertEquals("Application6's AM should not be running", 0, app6.getLiveContainers().size()); - assertEquals("Application6's AM requests 2048 MB memory", - 2048, app6.getAMResource().getMemory()); + assertEquals("Application6's AM resource shouldn't be updated", + 0, app6.getAMResource().getMemory()); assertEquals("Queue1's AM resource usage should be 2048 MB memory", 2048, queue1.getAmResourceUsage().getMemory()); @@ -3749,8 +3779,8 @@ public void testQueueMaxAMShareDefault() throws Exception { FSAppAttempt app2 = scheduler.getSchedulerApp(attId2); scheduler.update(); scheduler.handle(updateEvent); - assertEquals("Application2's AM requests 1024 MB memory", - 1024, app2.getAMResource().getMemory()); + assertEquals("Application2's AM resource shouldn't be updated", + 0, app2.getAMResource().getMemory()); assertEquals("Application2's AM should not be running", 0, app2.getLiveContainers().size()); assertEquals("Queue2's AM resource usage should be 0 MB memory",