diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java index 6287deb..d1ef12b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java @@ -542,7 +542,7 @@ private Resource assignContainer( return container.getResource(); } else { - if (!FairScheduler.fitsInMaxShare(getQueue(), capability)) { + if (!getQueue().fitsInMaxShare(capability)) { return Resources.none(); } @@ -557,23 +557,25 @@ private boolean hasNodeOrRackLocalRequests(Priority priority) { return getResourceRequests(priority).size() > 1; } - private Resource assignContainer(FSSchedulerNode node, boolean reserved) { - if (LOG.isDebugEnabled()) { - LOG.debug("Node offered to app: " + getName() + " reserved: " + reserved); - } - + /** + * Whether the AM container for this app is over maxAMShare limit. + */ + private boolean isOverAMShareLimit() { // Check the AM resource usage for the leaf queue if (!isAmRunning() && !getUnmanagedAM()) { List ask = appSchedulingInfo.getAllResourceRequests(); if (ask.isEmpty() || !getQueue().canRunAppAM( ask.get(0).getCapability())) { - if (LOG.isDebugEnabled()) { - LOG.debug("Skipping allocation because maxAMShare limit would " + - "be exceeded"); - } - return Resources.none(); + return true; } } + return false; + } + + private Resource assignContainer(FSSchedulerNode node, boolean reserved) { + if (LOG.isDebugEnabled()) { + LOG.debug("Node offered to app: " + getName() + " reserved: " + reserved); + } Collection prioritiesToTry = (reserved) ? Arrays.asList(node.getReservedContainer().getReservedPriority()) : @@ -584,8 +586,9 @@ private Resource assignContainer(FSSchedulerNode node, boolean reserved) { // (not scheduled) in order to promote better locality. synchronized (this) { for (Priority priority : prioritiesToTry) { - if (getTotalRequiredResources(priority) <= 0 || - !hasContainerForNode(priority, node)) { + // Skip it for reserved container, since + // we already check it in okToUnreserve. + if (!reserved && !hasContainerForNode(priority, node)) { continue; } @@ -660,15 +663,6 @@ private Resource assignContainer(FSSchedulerNode node, boolean reserved) { * Node that the application has an existing reservation on */ public Resource assignReservedContainer(FSSchedulerNode node) { - RMContainer rmContainer = node.getReservedContainer(); - Priority priority = rmContainer.getReservedPriority(); - - // Make sure the application still needs requests at this priority - if (getTotalRequiredResources(priority) == 0) { - unreserve(priority, node); - return Resources.none(); - } - // Fail early if the reserved container won't fit. // Note that we have an assumption here that there's only one container size // per priority. @@ -706,6 +700,20 @@ public boolean hasContainerForNode(Priority prio, FSSchedulerNode node) { anyRequest.getCapability(), node.getRMNode().getTotalCapability()); } + /** + * Whether it is ok to unreserve container on this node. + */ + public boolean okToUnreserve(FSSchedulerNode node) { + Priority reservedPriority = node.getReservedContainer(). + getReservedPriority(); + if (!hasContainerForNode(reservedPriority, node) + || !getQueue().fitsInMaxShare( + node.getReservedContainer().getReservedResource()) + || isOverAMShareLimit()) { + return true; + } + return false; + } static class RMContainerComparator implements Comparator, Serializable { @@ -795,6 +803,13 @@ public void updateDemand() { @Override public Resource assignContainer(FSSchedulerNode node) { + if (isOverAMShareLimit()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Skipping allocation because maxAMShare limit would " + + "be exceeded"); + } + return Resources.none(); + } return assignContainer(node, false); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java index ade2880..e488c76 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java @@ -330,4 +330,19 @@ public void incPendingResource(String nodeLabel, Resource resourceToInc) { @Override public void decPendingResource(String nodeLabel, Resource resourceToDec) { } + + public boolean fitsInMaxShare(Resource additionalResource) { + Resource usagePlusAddition = + Resources.add(getResourceUsage(), additionalResource); + + if (!Resources.fitsIn(usagePlusAddition, getMaxShare())) { + return false; + } + + FSQueue parentQueue = getParent(); + if (parentQueue != null) { + return parentQueue.fitsInMaxShare(additionalResource); + } + return true; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index f481de5..0cb967b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -1052,12 +1052,9 @@ private synchronized void attemptScheduling(FSSchedulerNode node) { FSAppAttempt reservedAppSchedulable = node.getReservedAppSchedulable(); if (reservedAppSchedulable != null) { - Priority reservedPriority = node.getReservedContainer().getReservedPriority(); - FSQueue queue = reservedAppSchedulable.getQueue(); - - if (!reservedAppSchedulable.hasContainerForNode(reservedPriority, node) - || !fitsInMaxShare(queue, - node.getReservedContainer().getReservedResource())) { + if (reservedAppSchedulable.okToUnreserve(node)) { + Priority reservedPriority = node.getReservedContainer(). + getReservedPriority(); // Don't hold the reservation if app can no longer use it LOG.info("Releasing reservation that cannot be satisfied for application " + reservedAppSchedulable.getApplicationAttemptId() @@ -1071,7 +1068,7 @@ private synchronized void attemptScheduling(FSSchedulerNode node) { + reservedAppSchedulable.getApplicationAttemptId() + " on node: " + node); } - node.getReservedAppSchedulable().assignReservedContainer(node); + reservedAppSchedulable.assignReservedContainer(node); } } if (reservedAppSchedulable == null) { @@ -1092,22 +1089,6 @@ private synchronized void attemptScheduling(FSSchedulerNode node) { updateRootQueueMetrics(); } - static boolean fitsInMaxShare(FSQueue queue, Resource - additionalResource) { - Resource usagePlusAddition = - Resources.add(queue.getResourceUsage(), additionalResource); - - if (!Resources.fitsIn(usagePlusAddition, queue.getMaxShare())) { - return false; - } - - FSQueue parentQueue = queue.getParent(); - if (parentQueue != null) { - return fitsInMaxShare(parentQueue, additionalResource); - } - return true; - } - public FSAppAttempt getSchedulerApp(ApplicationAttemptId appAttemptId) { return super.getApplicationAttempt(appAttemptId); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 69e0a8c..560256b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -3702,6 +3702,137 @@ public void testQueueMaxAMShareDefault() throws Exception { } @Test + public void testQueueMaxAMShareWithContainerReservation() throws Exception { + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + out.println("0.5"); + out.println(""); + out.println(""); + out.close(); + + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + RMNode node1 = + MockNodes.newNodeInfo(1, Resources.createResource(10240, 10), + 1, "127.0.0.1"); + RMNode node2 = + MockNodes.newNodeInfo(1, Resources.createResource(10240, 10), + 2, "127.0.0.2"); + RMNode node3 = + MockNodes.newNodeInfo(1, Resources.createResource(5120, 5), + 3, "127.0.0.3"); + NodeAddedSchedulerEvent nodeE1 = new NodeAddedSchedulerEvent(node1); + NodeUpdateSchedulerEvent updateE1 = new NodeUpdateSchedulerEvent(node1); + NodeAddedSchedulerEvent nodeE2 = new NodeAddedSchedulerEvent(node2); + NodeUpdateSchedulerEvent updateE2 = new NodeUpdateSchedulerEvent(node2); + NodeAddedSchedulerEvent nodeE3 = new NodeAddedSchedulerEvent(node3); + NodeUpdateSchedulerEvent updateE3 = new NodeUpdateSchedulerEvent(node3); + scheduler.handle(nodeE1); + scheduler.handle(nodeE2); + scheduler.handle(nodeE3); + scheduler.update(); + FSLeafQueue queue1 = scheduler.getQueueManager().getLeafQueue("queue1", + true); + Resource amResource1 = Resource.newInstance(1024, 1); + Resource amResource2 = Resource.newInstance(1024, 1); + Resource amResource3 = Resource.newInstance(10240, 1); + Resource amResource4 = Resource.newInstance(5120, 1); + Resource amResource5 = Resource.newInstance(1024, 1); + int amPriority = RMAppAttemptImpl.AM_CONTAINER_PRIORITY.getPriority(); + ApplicationAttemptId attId1 = createAppAttemptId(1, 1); + createApplicationWithAMResource(attId1, "queue1", "user1", amResource1); + createSchedulingRequestExistingApplication(1024, 1, amPriority, attId1); + FSAppAttempt app1 = scheduler.getSchedulerApp(attId1); + scheduler.update(); + // Allocate app1's AM container on node1. + scheduler.handle(updateE1); + assertEquals("Application1's AM requests 1024 MB memory", + 1024, app1.getAMResource().getMemory()); + assertEquals("Application1's AM should be running", + 1, app1.getLiveContainers().size()); + assertEquals("Queue1's AM resource usage should be 1024 MB memory", + 1024, queue1.getAmResourceUsage().getMemory()); + + ApplicationAttemptId attId2 = createAppAttemptId(2, 1); + createApplicationWithAMResource(attId2, "queue1", "user1", amResource2); + createSchedulingRequestExistingApplication(1024, 1, amPriority, attId2); + FSAppAttempt app2 = scheduler.getSchedulerApp(attId2); + scheduler.update(); + // Allocate app2's AM container on node2. + scheduler.handle(updateE2); + assertEquals("Application2's AM requests 1024 MB memory", + 1024, app2.getAMResource().getMemory()); + assertEquals("Application2's AM should be running", + 1, app2.getLiveContainers().size()); + assertEquals("Queue1's AM resource usage should be 2048 MB memory", + 2048, queue1.getAmResourceUsage().getMemory()); + + ApplicationAttemptId attId3 = createAppAttemptId(3, 1); + createApplicationWithAMResource(attId3, "queue1", "user1", amResource3); + createSchedulingRequestExistingApplication(10240, 1, amPriority, attId3); + FSAppAttempt app3 = scheduler.getSchedulerApp(attId3); + scheduler.update(); + // app3 reserves a container on node1 because node1's available resource + // is less than app3's AM container resource. + scheduler.handle(updateE1); + // Similarly app3 reserves a container on node2. + scheduler.handle(updateE2); + assertEquals("Application3's AM resource shouldn't be updated", + 0, app3.getAMResource().getMemory()); + assertEquals("Application3's AM should not be running", + 0, app3.getLiveContainers().size()); + assertEquals("Queue1's AM resource usage should be 2048 MB memory", + 2048, queue1.getAmResourceUsage().getMemory()); + + ApplicationAttemptId attId4 = createAppAttemptId(4, 1); + createApplicationWithAMResource(attId4, "queue1", "user1", amResource4); + createSchedulingRequestExistingApplication(5120, 1, amPriority, attId4); + FSAppAttempt app4 = scheduler.getSchedulerApp(attId4); + scheduler.update(); + // Allocate app4's AM container on node3. + scheduler.handle(updateE3); + assertEquals("Application4's AM requests 5120 MB memory", + 5120, app4.getAMResource().getMemory()); + assertEquals("Application4's AM should be running", + 1, app4.getLiveContainers().size()); + assertEquals("Queue1's AM resource usage should be 7168 MB memory", + 7168, queue1.getAmResourceUsage().getMemory()); + + AppAttemptRemovedSchedulerEvent appRemovedEvent1 = + new AppAttemptRemovedSchedulerEvent(attId1, + RMAppAttemptState.FINISHED, false); + // Release app1's AM container on node1. + scheduler.handle(appRemovedEvent1); + assertEquals("Queue1's AM resource usage should be 6144 MB memory", + 6144, queue1.getAmResourceUsage().getMemory()); + + scheduler.update(); + // app3 will free the reserved container on node1 due to + // exceeding queue MaxAMShare limit. + scheduler.handle(updateE1); + + ApplicationAttemptId attId5 = createAppAttemptId(5, 1); + createApplicationWithAMResource(attId5, "queue1", "user1", amResource5); + createSchedulingRequestExistingApplication(1024, 1, amPriority, attId5); + FSAppAttempt app5 = scheduler.getSchedulerApp(attId5); + scheduler.update(); + // app5 can allocate its AM container on node1 after + // app3 unreserve its container on node1. + scheduler.handle(updateE1); + assertEquals("Application5's AM requests 1024 MB memory", + 1024, app5.getAMResource().getMemory()); + assertEquals("Application5's AM should be running", + 1, app5.getLiveContainers().size()); + assertEquals("Queue1's AM resource usage should be 7168 MB memory", + 7168, queue1.getAmResourceUsage().getMemory()); + } + + @Test public void testMaxRunningAppsHierarchicalQueues() throws Exception { conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); ControlledClock clock = new ControlledClock();