diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java index c393759..fe639a8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java @@ -79,6 +79,7 @@ public FSLeafQueue(String name, FairScheduler scheduler, this.lastTimeAtFairShareThreshold = scheduler.getClock().getTime(); activeUsersManager = new ActiveUsersManager(getMetrics()); amResourceUsage = Resource.newInstance(0, 0); + getMetrics().setAMResourceUsage(amResourceUsage); } public void addApp(FSAppAttempt app, boolean runnable) { @@ -130,6 +131,7 @@ public boolean removeApp(FSAppAttempt app) { // running an unmanaged AM. if (runnable && app.isAmRunning()) { Resources.subtractFrom(amResourceUsage, app.getAMResource()); + getMetrics().setAMResourceUsage(amResourceUsage); } return runnable; @@ -473,18 +475,14 @@ public ActiveUsersManager getActiveUsersManager() { } /** - * Check whether this queue can run this application master under the - * maxAMShare limit. - * @param amResource - * @return true if this queue can run + * Compute the maximum resource AM can use. The value is the result of + * multiplying FairShare and maxAMShare. If FairShare is zero, use + * min(maxShare, available resource) instead to prevent zero value for + * maximum AM resource since it forbids any job running in the queue. + * + * @return the maximum resource AM can use */ - public boolean canRunAppAM(Resource amResource) { - if (Math.abs(maxAMShare - -1.0f) < 0.0001) { - return true; - } - - // If FairShare is zero, use min(maxShare, available resource) to compute - // maxAMResource + public Resource computeMaxAMResource() { Resource maxResource = Resources.clone(getFairShare()); if (maxResource.getMemorySize() == 0) { maxResource.setMemorySize( @@ -498,7 +496,23 @@ public boolean canRunAppAM(Resource amResource) { getMaxShare().getVirtualCores())); } - Resource maxAMResource = Resources.multiply(maxResource, maxAMShare); + return Resources.multiply(maxResource, maxAMShare); + } + + /** + * Check whether this queue can run the Application Master under the + * maxAMShare limit. + * + * @param amResource the resource AM needed + * @return true if this queue can run + */ + public boolean canRunAppAM(Resource amResource) { + if (Math.abs(maxAMShare - -1.0f) < 0.0001) { + return true; + } + + Resource maxAMResource = computeMaxAMResource(); + getMetrics().setMaxAMShare(maxAMResource); Resource ifRunAMResource = Resources.add(amResourceUsage, amResource); return Resources.fitsIn(ifRunAMResource, maxAMResource); } @@ -506,6 +520,7 @@ public boolean canRunAppAM(Resource amResource) { public void addAMResourceUsage(Resource amResource) { if (amResource != null) { Resources.addTo(amResourceUsage, amResource); + getMetrics().setAMResourceUsage(amResourceUsage); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java index 5fa2ee1..15219af 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java @@ -41,6 +41,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.util.resource.Resources; +import com.google.common.annotations.VisibleForTesting; + @Private @Unstable public abstract class FSQueue implements Queue, Schedulable { @@ -160,6 +162,11 @@ public int getMaxRunningApps() { return maxRunningApps; } + @VisibleForTesting + public float getMaxAMShare() { + return maxAMShare; + } + public void setMaxAMShare(float maxAMShare){ this.maxAMShare = maxAMShare; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueMetrics.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueMetrics.java index 42c8825..c4f8ac3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueMetrics.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueMetrics.java @@ -41,6 +41,10 @@ @Metric("Maximum share of memory in MB") MutableGaugeLong maxShareMB; @Metric("Maximum share of CPU in vcores") MutableGaugeLong maxShareVCores; @Metric("Maximum number of applications") MutableGaugeInt maxApps; + @Metric("Maximum AM share of memory in MB") MutableGaugeLong maxAMShareMB; + @Metric("Maximum AM share of CPU in vcores") MutableGaugeInt maxAMShareVCores; + @Metric("AM resource usage of memory in MB") MutableGaugeLong amResourceUsageMB; + @Metric("AM resource usage of CPU in vcores") MutableGaugeInt amResourceUsageVCores; private String schedulingPolicy; @@ -109,6 +113,32 @@ public void setMaxApps(int max) { maxApps.set(max); } + public long getMaxAMShareMB() { + return maxAMShareMB.value(); + } + + public int getMaxAMShareVCores() { + return maxAMShareVCores.value(); + } + + public void setMaxAMShare(Resource resource) { + maxAMShareMB.set(resource.getMemorySize()); + maxAMShareVCores.set(resource.getVirtualCores()); + } + + public long getAMResourceUsageMB() { + return amResourceUsageMB.value(); + } + + public int getAMResourceUsageVCores() { + return amResourceUsageVCores.value(); + } + + public void setAMResourceUsage(Resource resource) { + amResourceUsageMB.set(resource.getMemorySize()); + amResourceUsageVCores.set(resource.getVirtualCores()); + } + public String getSchedulingPolicy() { return schedulingPolicy; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index ffbfec8..f8fe99e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -594,6 +594,109 @@ public void testFairShareWithZeroWeight() throws IOException { assertEquals(0, queue.getFairShare().getMemorySize()); } + /** + * Test if we compute the maximum AM resource correctly. + * + * @throws IOException if scheduler reinitialization fails + */ + @Test + public void testComputeMaxAMResource() throws IOException { + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + // set queueA and queueB weight zero. + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + out.println("0"); + out.println("4096 mb 4 vcores"); + out.println(""); + out.println(""); + out.println("0.0"); + out.println(""); + out.println(""); + out.println("1"); + out.println(""); + out.println("drf" + + ""); + out.println(""); + out.close(); + + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + long memCapacity = 20 * 1024; + int cpuCapacity = 20; + RMNode node = + MockNodes.newNodeInfo(1, Resources.createResource(memCapacity, + cpuCapacity), 0, "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node); + NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node); + scheduler.handle(nodeEvent); + scheduler.update(); + + Resource amResource = Resource.newInstance(1024, 1); + int amPriority = RMAppAttemptImpl.AM_CONTAINER_PRIORITY.getPriority(); + + // queue1 + FSLeafQueue queue1 = scheduler.getQueueManager().getLeafQueue("queue1", + true); + ApplicationAttemptId attId1 = createAppAttemptId(1, 1); + createApplicationWithAMResource(attId1, "queue1", "user1", amResource); + createSchedulingRequestExistingApplication(1024, 1, amPriority, attId1); + scheduler.update(); + scheduler.handle(updateEvent); + + // queue1's weight is 0.0, so its fair share should be 0, we use the + // min(maxShare, available resource) to compute maxAMShare, in this case, we + // use maxShare, since it is smaller than available resource. + assertEquals(0, queue1.getFairShare().getMemorySize()); + assertEquals( + (long)(queue1.getMaxShare().getMemorySize() * queue1.getMaxAMShare()), + queue1.getMetrics().getMaxAMShareMB()); + assertEquals( + (long)(queue1.getMaxShare().getVirtualCores() * queue1.getMaxAMShare()), + queue1.getMetrics().getMaxAMShareVCores()); + assertEquals(1024, queue1.getMetrics().getAMResourceUsageMB()); + + // queue2 + FSLeafQueue queue2 = scheduler.getQueueManager().getLeafQueue("queue2", + true); + ApplicationAttemptId attId2 = createAppAttemptId(2, 1); + createApplicationWithAMResource(attId2, "queue2", "user1", amResource); + createSchedulingRequestExistingApplication(1024, 1, amPriority, attId2); + scheduler.update(); + scheduler.handle(updateEvent); + + // queue2's weight is 0.0, so its fair share is 0, and we use the + // min(maxShare, available resource) to compute maxAMShare, in this case, we + // use available resource since it is smaller than the default maxShare. + assertEquals(0, queue2.getFairShare().getMemorySize()); + assertEquals((long) ((memCapacity - 1024) * queue2.getMaxAMShare()), + queue2.getMetrics().getMaxAMShareMB()); + assertEquals((long) ((cpuCapacity - 1) * queue2.getMaxAMShare()), + queue2.getMetrics().getMaxAMShareVCores()); + assertEquals(1024, queue2.getMetrics().getAMResourceUsageMB()); + + // queue3 + FSLeafQueue queue3 = scheduler.getQueueManager().getLeafQueue("queue3", + true); + ApplicationAttemptId attId3 = createAppAttemptId(3, 1); + createApplicationWithAMResource(attId3, "queue3", "user1", amResource); + createSchedulingRequestExistingApplication(1024, 1, amPriority, attId3); + scheduler.update(); + scheduler.handle(updateEvent); + + // queue3's weight is 1, so its fair share is not 0, and we use the fair + // share to compute maxAMShare + assertNotEquals(0, queue3.getFairShare().getMemorySize()); + assertEquals((long)(memCapacity * queue3.getMaxAMShare()), + queue3.getMetrics().getMaxAMShareMB()); + assertEquals((long)(cpuCapacity * queue3.getMaxAMShare()), + queue3.getMetrics().getMaxAMShareVCores()); + assertEquals(1024, queue3.getMetrics().getAMResourceUsageMB()); + } + @Test public void testFairShareWithZeroWeightNoneZeroMinRes() throws IOException { conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);