diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 3fc3019..4dcaa3a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -1029,7 +1029,10 @@ private synchronized void attemptScheduling(FSSchedulerNode node) { FSAppAttempt reservedAppSchedulable = node.getReservedAppSchedulable(); if (reservedAppSchedulable != null) { Priority reservedPriority = node.getReservedContainer().getReservedPriority(); - if (!reservedAppSchedulable.hasContainerForNode(reservedPriority, node)) { + FSQueue queue = reservedAppSchedulable.getQueue(); + + if (!reservedAppSchedulable.hasContainerForNode(reservedPriority, node) + || !fitInMaxShare(queue)) { // Don't hold the reservation if app can no longer use it LOG.info("Releasing reservation that cannot be satisfied for application " + reservedAppSchedulable.getApplicationAttemptId() @@ -1043,7 +1046,6 @@ private synchronized void attemptScheduling(FSSchedulerNode node) { + reservedAppSchedulable.getApplicationAttemptId() + " on node: " + node); } - node.getReservedAppSchedulable().assignReservedContainer(node); } } @@ -1065,6 +1067,19 @@ private synchronized void attemptScheduling(FSSchedulerNode node) { updateRootQueueMetrics(); } + private boolean fitInMaxShare(FSQueue queue) { + if (Resources.lessThanOrEqual(RESOURCE_CALCULATOR, clusterResource, + queue.getMaxShare(), queue.getResourceUsage())) { + return false; + } + + FSQueue parentQueue = queue.getParent(); + if (parentQueue != null) { + return fitInMaxShare(parentQueue); + } + return true; + } + public FSAppAttempt getSchedulerApp(ApplicationAttemptId appAttemptId) { return super.getApplicationAttempt(appAttemptId); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 843555f..7094aa0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -722,6 +722,85 @@ public void testSimpleContainerReservation() throws Exception { } + @Test (timeout = 5000) + public void testContainerReservationNotExceedingQueueMax() throws Exception { + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.println("1024mb,5vcores"); + out.println("2048mb,10vcores"); + out.println(""); + out.println(""); + out.println("1024mb,5vcores"); + out.println("2048mb,10vcores"); + out.println(""); + out.println(""); + out.println(""); + out.close(); + + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // Add a node + RMNode node1 = + MockNodes + .newNodeInfo(1, Resources.createResource(3072, 5), 1, "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + + // Queue 1 requests full capacity of the queue + createSchedulingRequest(2048, "queue1", "user1", 1); + scheduler.update(); + NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1); + scheduler.handle(updateEvent); + + // Make sure queue 1 is allocated app capacity + assertEquals(2048, scheduler.getQueueManager().getQueue("queue1"). + getResourceUsage().getMemory()); + + // Now queue 2 requests likewise + ApplicationAttemptId attId = createSchedulingRequest(1024, "queue2", "user2", 1); + scheduler.update(); + scheduler.handle(updateEvent); + + // Make sure queue 2 is allocated app capacity + assertEquals(1024, scheduler.getQueueManager().getQueue("queue2"). + getResourceUsage().getMemory()); + + ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1", "user1", 1); + scheduler.update(); + scheduler.handle(updateEvent); + + // Make sure queue 1 is waiting with a reservation + assertEquals(1024, scheduler.getSchedulerApp(attId1) + .getCurrentReservation().getMemory()); + + // Now remove app of queue2 + AppAttemptRemovedSchedulerEvent appRemovedEvent1 = new AppAttemptRemovedSchedulerEvent( + attId, RMAppAttemptState.FINISHED, false); + scheduler.update(); + scheduler.handle(appRemovedEvent1); + + // Queue should have no apps + assertEquals(0, scheduler.getQueueManager().getQueue("queue2"). + getResourceUsage().getMemory()); + + createSchedulingRequest(1024, "queue2", "user2", 1); + scheduler.handle(updateEvent); + // Make sure allocated memory of queue1 doesn't exceed its maximum + assertEquals(2048, scheduler.getQueueManager().getQueue("queue1"). + getResourceUsage().getMemory()); + //the reservation of queue1 should be reclaim + assertEquals(0, scheduler.getSchedulerApp(attId1). + getCurrentReservation().getMemory()); + assertEquals(1024, scheduler.getQueueManager().getQueue("queue2"). + getResourceUsage().getMemory()); + } + @Test public void testUserAsDefaultQueue() throws Exception { conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "true");