diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java index 0ea7314..1581b5d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java @@ -58,6 +58,9 @@ final Map queueMaxAMShares; private final float queueMaxAMShareDefault; + // Maximum resource share that can be used by AMs across queues + private final float clusterMaxAMShare; + // ACL's for each queue. Only specifies non-default ACL's from configuration. 
private final Map> queueAcls; @@ -109,7 +112,8 @@ public AllocationConfiguration(Map minQueueResources, QueuePlacementPolicy placementPolicy, Map> configuredQueues, ReservationQueueConfiguration globalReservationQueueConfig, - Set reservableQueues) { + Set reservableQueues, + float clusterMaxAMShare) { this.minQueueResources = minQueueResources; this.maxQueueResources = maxQueueResources; this.queueMaxApps = queueMaxApps; @@ -129,6 +133,7 @@ public AllocationConfiguration(Map minQueueResources, this.globalReservationQueueConfig = globalReservationQueueConfig; this.placementPolicy = placementPolicy; this.configuredQueues = configuredQueues; + this.clusterMaxAMShare = clusterMaxAMShare; } public AllocationConfiguration(Configuration conf) { @@ -141,6 +146,7 @@ public AllocationConfiguration(Configuration conf) { userMaxAppsDefault = Integer.MAX_VALUE; queueMaxAppsDefault = Integer.MAX_VALUE; queueMaxAMShareDefault = 0.5f; + clusterMaxAMShare = 0.5f; queueAcls = new HashMap>(); minSharePreemptionTimeouts = new HashMap(); fairSharePreemptionTimeouts = new HashMap(); @@ -227,6 +233,10 @@ public float getQueueMaxAMShare(String queue) { return (maxAMShare == null) ? queueMaxAMShareDefault : maxAMShare; } + public float getClusterMaxAMShare() { + return clusterMaxAMShare; + } + /** * Get the minimum resource allocation for the given queue. * @return the cap set on this queue, or 0 if not set. 
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java index dab6d9f..b6989b4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java @@ -226,6 +226,7 @@ public synchronized void reloadAllocations() throws IOException, int userMaxAppsDefault = Integer.MAX_VALUE; int queueMaxAppsDefault = Integer.MAX_VALUE; float queueMaxAMShareDefault = 0.5f; + float clusterMaxAMShare = 0.5f; long defaultFairSharePreemptionTimeout = Long.MAX_VALUE; long defaultMinSharePreemptionTimeout = Long.MAX_VALUE; float defaultFairSharePreemptionThreshold = 0.5f; @@ -317,6 +318,11 @@ public synchronized void reloadAllocations() throws IOException, float val = Float.parseFloat(text); val = Math.min(val, 1.0f); queueMaxAMShareDefault = val; + } else if ("clusterMaxAMShare".equals(element.getTagName())) { + String text = ((Text)element.getFirstChild()).getData().trim(); + float val = Float.parseFloat(text); + val = Math.min(val, 1.0f); + clusterMaxAMShare = val; } else if ("defaultQueueSchedulingPolicy".equals(element.getTagName()) || "defaultQueueSchedulingMode".equals(element.getTagName())) { String text = ((Text)element.getFirstChild()).getData().trim(); @@ -402,7 +408,7 @@ public synchronized void reloadAllocations() throws IOException, minSharePreemptionTimeouts, fairSharePreemptionTimeouts, 
fairSharePreemptionThresholds, queueAcls, newPlacementPolicy, configuredQueues, globalReservationQueueConfig, - reservableQueues); + reservableQueues, clusterMaxAMShare); lastSuccessfulReload = clock.getTime(); lastReloadAttemptFailed = false; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java index 04dbd2f..51077ff 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java @@ -128,6 +128,7 @@ public boolean removeApp(FSAppAttempt app) { // running an unmanaged AM. if (runnable && app.isAmRunning()) { Resources.subtractFrom(amResourceUsage, app.getAMResource()); + Resources.subtractFrom(scheduler.getTotalAmResourceUsage(), app.getAMResource()); } return runnable; @@ -487,6 +488,24 @@ public ActiveUsersManager getActiveUsersManager() { * @return true if this queue can run */ public boolean canRunAppAM(Resource amResource) { + // Check if cluster-wide AM share allows running AMs + float clusterMaxAMShare = + scheduler.getAllocationConfiguration().getClusterMaxAMShare(); + if (clusterMaxAMShare > 0) { // clusterMaxAMShare check is turned off when it is zero or negative. 
+ Resource clusterMaxAMResource = Resources.multiply(scheduler.getClusterResource(), clusterMaxAMShare); + Resource ifRunClusterAMResource = Resources.add(scheduler.getTotalAmResourceUsage(), amResource); + if (policy.checkIfAMResourceUsageOverLimit(ifRunClusterAMResource, clusterMaxAMResource)) { + return false; + } + } + + if (amResourceUsage.equals(Resources.none())) { + // We don't block AMs from running if no AM is currently running in this queue + // and it doesn't exceed cluster AM Resources (which was checked above) + LOG.debug("amResourceUsage for this queue is 0. Letting the AM run"); + return true; + } + float maxAMShare = scheduler.getAllocationConfiguration().getQueueMaxAMShare(getName()); if (Math.abs(maxAMShare - -1.0f) < 0.0001) { @@ -501,6 +520,7 @@ public boolean canRunAppAM(Resource amResource) { public void addAMResourceUsage(Resource amResource) { if (amResource != null) { Resources.addTo(amResourceUsage, amResource); + Resources.addTo(scheduler.getTotalAmResourceUsage(), amResource); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index f481de5..488b5a6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -185,6 +185,9 @@ // heartbeat protected int maxAssign; // Max containers to assign per heartbeat + // Track the total AM resource usage across queues + private Resource totalAmResourceUsage; + 
@VisibleForTesting final MaxRunningAppsEnforcer maxRunningEnforcer; @@ -198,6 +201,7 @@ public FairScheduler() { allocsLoader = new AllocationFileLoaderService(); queueMgr = new QueueManager(this); maxRunningEnforcer = new MaxRunningAppsEnforcer(this); + totalAmResourceUsage = Resource.newInstance(0, 0); } private void validateConf(Configuration conf) { @@ -1689,4 +1693,8 @@ private String handleMoveToPlanQueue(String targetQueueName) { } return targetQueueName; } + + public Resource getTotalAmResourceUsage() { + return totalAmResourceUsage; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 69e0a8c..4d51a25 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -3684,21 +3684,129 @@ public void testQueueMaxAMShareDefault() throws Exception { 1, app1.getLiveContainers().size()); assertEquals("Queue1's AM resource usage should be 1024 MB memory", 1024, queue1.getAmResourceUsage().getMemory()); + } + + @Test + public void testClusterMaxAMShare() throws Exception { + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.println("0.4"); + out.println(""); + out.println(""); + out.close(); + + 
scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + RMNode node = + MockNodes.newNodeInfo(1, Resources.createResource(8192, 20), + 0, "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node); + NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node); + scheduler.handle(nodeEvent); + scheduler.update(); + + FSLeafQueue queue1 = + scheduler.getQueueManager().getLeafQueue("queue1", true); + FSLeafQueue queue2 = + scheduler.getQueueManager().getLeafQueue("queue2", true); + FSLeafQueue queue3 = + scheduler.getQueueManager().getLeafQueue("queue3", true); + + Resource amResource1 = Resource.newInstance(2048, 1); + Resource amResource2 = Resource.newInstance(1024, 1); + int amPriority = RMAppAttemptImpl.AM_CONTAINER_PRIORITY.getPriority(); + + ApplicationAttemptId attId1 = createAppAttemptId(1, 1); + createApplicationWithAMResource(attId1, "queue1", "test1", amResource1); + createSchedulingRequestExistingApplication(2048, 1, amPriority, attId1); + FSAppAttempt app1 = scheduler.getSchedulerApp(attId1); + scheduler.update(); + scheduler.handle(updateEvent); + assertEquals("Application1's AM requests 2048 MB memory", + 2048, app1.getAMResource().getMemory()); + assertEquals("Application1's AM should be running", + 1, app1.getLiveContainers().size()); + assertEquals("Queue1's AM resource usage should be 2048 MB memory", + 2048, queue1.getAmResourceUsage().getMemory()); + assertEquals("Cluster-wide AM Share should be 2048 MB", + 2048, scheduler.getTotalAmResourceUsage().getMemory()); - // Now the fair share is 1639 MB, and the maxAMShare is 0.4f, - // so the AM is not accepted. 
ApplicationAttemptId attId2 = createAppAttemptId(2, 1); createApplicationWithAMResource(attId2, "queue2", "test1", amResource1); - createSchedulingRequestExistingApplication(1024, 1, amPriority, attId2); + createSchedulingRequestExistingApplication(2048, 1, amPriority, attId2); FSAppAttempt app2 = scheduler.getSchedulerApp(attId2); scheduler.update(); scheduler.handle(updateEvent); - assertEquals("Application2's AM resource shouldn't be updated", - 0, app2.getAMResource().getMemory()); - assertEquals("Application2's AM should not be running", - 0, app2.getLiveContainers().size()); - assertEquals("Queue2's AM resource usage should be 0 MB memory", - 0, queue2.getAmResourceUsage().getMemory()); + assertEquals("Application2's AM requests 2048 MB memory", + 2048, app2.getAMResource().getMemory()); + assertEquals("Application2's AM should be running", + 1, app2.getLiveContainers().size()); + assertEquals("Queue2's AM resource usage should be 2048 MB memory", + 2048, queue2.getAmResourceUsage().getMemory()); + assertEquals("Cluster-wide AM Share should be 4096 MB", + 4096, scheduler.getTotalAmResourceUsage().getMemory()); + + // This AM won't run because it will exceed cluster-wide maxAMShare + ApplicationAttemptId attId3 = createAppAttemptId(3, 1); + createApplicationWithAMResource(attId3, "queue3", "test1", amResource2); + createSchedulingRequestExistingApplication(1024, 1, amPriority, attId3); + FSAppAttempt app3 = scheduler.getSchedulerApp(attId3); + scheduler.update(); + scheduler.handle(updateEvent); + assertEquals("Application3's AM resource shouldn't be updated", + 0, app3.getAMResource().getMemory()); + assertEquals("Application3's AM should not be running", + 0, app3.getLiveContainers().size()); + assertEquals("Queue3's AM resource usage should be 0 MB memory", + 0, queue3.getAmResourceUsage().getMemory()); + assertEquals("Cluster-wide AM Share should still be 4096 MB", + 4096, scheduler.getTotalAmResourceUsage().getMemory()); + + // Finish app1, app3 AM should 
start running + AppAttemptRemovedSchedulerEvent appRemovedEvent1 = + new AppAttemptRemovedSchedulerEvent(attId1, RMAppAttemptState.FINISHED, false); + scheduler.update(); + scheduler.handle(appRemovedEvent1); + scheduler.handle(updateEvent); + assertEquals("Application1's AM should be finished", + 0, app1.getLiveContainers().size()); + assertEquals("Application3's AM should be running", + 1, app3.getLiveContainers().size()); + assertEquals("Application3's AM requests 1024 MB memory", + 1024, app3.getAMResource().getMemory()); + assertEquals("Queue3's AM resource usage should be 1024 MB memory", + 1024, queue3.getAmResourceUsage().getMemory()); + assertEquals("Cluster-wide AM Share should be 3072 MB", + 3072, scheduler.getTotalAmResourceUsage().getMemory()); + + // Try to run another 1024MB AM in queue3, it won't run because + // queue's maxAMShare is 0.4, its fair share is 8192/2 = 4096 + // Note that cluster-wide AM Share doesn't block this, queue AM Share blocks this. + ApplicationAttemptId attId4 = createAppAttemptId(4, 1); + createApplicationWithAMResource(attId4, "queue3", "test1", amResource2); + createSchedulingRequestExistingApplication(1024, 1, amPriority, attId4); + FSAppAttempt app4 = scheduler.getSchedulerApp(attId4); + scheduler.update(); + scheduler.handle(updateEvent); + assertEquals("Application4's AM resource shouldn't be updated", + 0, app4.getAMResource().getMemory()); + assertEquals("Application4's AM should not be running", + 0, app4.getLiveContainers().size()); + assertEquals("Queue3's AM resource usage should still be 1024 MB memory", + 1024, queue3.getAmResourceUsage().getMemory()); + assertEquals("Cluster-wide AM Share should still be 3072 MB", + 3072, scheduler.getTotalAmResourceUsage().getMemory()); + } @Test