diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java index 4cc88c1..a52a4c1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java @@ -71,6 +71,8 @@ public static final long THREAD_JOIN_TIMEOUT_MS = 1000; + private static final float ROOT_MAX_AM_SHARE_DEFAULT = 0.8f; + private final Clock clock; private long lastSuccessfulReload; // Last time we successfully reloaded queues @@ -331,6 +333,12 @@ public synchronized void reloadAllocations() throws IOException, newPlacementPolicy = QueuePlacementPolicy.fromConfiguration(conf, configuredQueues); } + + // Set the value of the max AM shares to the default for the root queue + // unless the user already set it to something + if (!queueMaxAMShares.containsKey("root")) { + queueMaxAMShares.put("root", ROOT_MAX_AM_SHARE_DEFAULT); + } AllocationConfiguration info = new AllocationConfiguration(minQueueResources, maxQueueResources, queueMaxApps, userMaxApps, queueWeights, queueMaxAMShares, userMaxAppsDefault, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java index 8f95738..9c9ed45 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java @@ -94,6 +94,7 @@ public boolean removeApp(FSSchedulerApp app) { // Update AM resource usage if (app.isAmRunning() && app.getAMResource() != null) { Resources.subtractFrom(amResourceUsage, app.getAMResource()); + scheduler.decrementAmRootResourceUsage(app.getAMResource()); } return true; } else if (nonRunnableAppScheds.remove(app.getAppSchedulable())) { @@ -309,8 +310,23 @@ public boolean canRunAppAM(Resource amResource) { float maxAMShare = scheduler.getAllocationConfiguration().getQueueMaxAMShare(getName()); if (Math.abs(maxAMShare - -1.0f) < 0.0001) { - return true; + float maxRootAmShare = + scheduler.getAllocationConfiguration().getQueueMaxAMShare( + scheduler.getQueueManager().getRootQueue().getName()); + if (Math.abs(maxRootAmShare - -1.0f) < 0.0001) { + return true; + } + return canRunAppAMHelper( + scheduler.getQueueManager().getRootQueue().getFairShare(), + maxRootAmShare, scheduler.getAmRootResourceUsage(), amResource); + } else { + return canRunAppAMHelper(getFairShare(), maxAMShare, amResourceUsage, + amResource); } + } + + private boolean canRunAppAMHelper(Resource fairShare, float maxAMShare, + Resource amResourceUsage, Resource amResource) { Resource maxAMResource = Resources.multiply(getFairShare(), maxAMShare); Resource ifRunAMResource = Resources.add(amResourceUsage, amResource); return !policy @@ -320,6 +336,7 @@ public boolean canRunAppAM(Resource amResource) { public void addAMResourceUsage(Resource amResource) { if (amResource != null) { Resources.addTo(amResourceUsage, amResource); + scheduler.incrementAmRootResourceUsage(amResource); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index a042acd..ea71677 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -163,6 +163,9 @@ // Containers whose AMs have been warned that they will be preempted soon. private List warnedContainers = new ArrayList(); + + // Track the AM resource usage for the root queue + private Resource amRootResourceUsage; protected boolean sizeBasedWeight; // Give larger weights to larger jobs protected WeightAdjuster weightAdjuster; // Can be null for no weight adjuster @@ -192,6 +195,7 @@ public FairScheduler() { allocsLoader = new AllocationFileLoaderService(); queueMgr = new QueueManager(this); maxRunningEnforcer = new MaxRunningAppsEnforcer(this); + amRootResourceUsage = Resource.newInstance(0, 0); } private void validateConf(Configuration conf) { @@ -1479,4 +1483,18 @@ private FSQueue findLowestCommonAncestorQueue(FSQueue queue1, FSQueue queue2) { } return queue1; // names are identical } + + public synchronized void incrementAmRootResourceUsage(Resource amResource) { + amRootResourceUsage = Resources.add(amRootResourceUsage, amResource); + } + + public synchronized void decrementAmRootResourceUsage(Resource amResource) { + amRootResourceUsage = Resources.subtract(amRootResourceUsage, amResource); + } + + public synchronized Resource getAmRootResourceUsage() { + Resource r = Resource.newInstance(amRootResourceUsage.getMemory(), + amRootResourceUsage.getVirtualCores()); + return r; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 68e6f14..2e9bbea 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -2605,6 +2605,94 @@ public void testQueueMaxAMShareDefault() throws Exception { } @Test + public void testQueueMaxAMShareRoot() throws Exception { + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + out.println("-1"); + out.println(""); + out.println(""); + out.println(""); + out.println("1.0"); + out.println(""); + out.println(""); + out.println(""); + out.close(); + + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + RMNode node = + MockNodes.newNodeInfo(1, Resources.createResource(2048, 20), + 0, "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node); + NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node); + scheduler.handle(nodeEvent); + scheduler.update(); + + FSLeafQueue queue1 = + scheduler.getQueueManager().getLeafQueue("queue1", true); + assertEquals("Queue queue1's fair share should be 683", + 683, queue1.getFairShare().getMemory()); + FSLeafQueue queue2 = + scheduler.getQueueManager().getLeafQueue("queue2", true); + assertEquals("Queue queue2's fair share should be 683", + 683, queue2.getFairShare().getMemory()); + + Resource amResource1 = Resource.newInstance(2048, 1); + int amPriority = RMAppAttemptImpl.AM_CONTAINER_PRIORITY.getPriority(); + + // Exceeds queue limit, but default rootMaxAMShare is -1.0 so it doesn't + // matter + ApplicationAttemptId attId1 = createAppAttemptId(1, 1); + createApplicationWithAMResource(attId1, "queue1", "test1", amResource1); + createSchedulingRequestExistingApplication(2048, 1, amPriority, attId1); + FSSchedulerApp app1 = scheduler.getSchedulerApp(attId1); + scheduler.update(); + scheduler.handle(updateEvent); + assertEquals("Application1's AM requests 2048 MB memory", + 2048, app1.getAMResource().getMemory()); + assertEquals("Application1's AM should be running", + 1, app1.getLiveContainers().size()); + assertEquals("Queue1's AM resource usage should be 2048 MB memory", + 2048, queue1.getAmResourceUsage().getMemory()); + + // Exceeds queue limit, and maxAMShare is 1.0 + ApplicationAttemptId attId2 = createAppAttemptId(2, 1); + createApplicationWithAMResource(attId2, "queue2", "test1", amResource1); + createSchedulingRequestExistingApplication(2048, 1, amPriority, attId2); + FSSchedulerApp app2 = scheduler.getSchedulerApp(attId2); + scheduler.update(); + scheduler.handle(updateEvent); + assertEquals("Application2's AM requests 2048 MB memory", + 2048, app2.getAMResource().getMemory()); + assertEquals("Application2's AM should not be running", + 0, app2.getLiveContainers().size()); + assertEquals("Queue2's AM resource usage should be 0 MB memory", + 0, queue2.getAmResourceUsage().getMemory()); + } + + @Test + public void testAmRootResourceUsage() throws Exception { + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + assertEquals(Resource.newInstance(0, 0), + scheduler.getAmRootResourceUsage()); + scheduler.incrementAmRootResourceUsage(Resource.newInstance(1024, 10)); + assertEquals(Resource.newInstance(1024, 10), + scheduler.getAmRootResourceUsage()); + scheduler.decrementAmRootResourceUsage(Resource.newInstance(256, 3)); + assertEquals(Resource.newInstance(768, 7), + scheduler.getAmRootResourceUsage()); + } + + @Test public void testMaxRunningAppsHierarchicalQueues() throws Exception { conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); MockClock clock = new MockClock();