diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java index c651878..c7881a8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java @@ -43,6 +43,7 @@ private Map usages; // short for no-label :) private static final String NL = CommonNodeLabelsManager.NO_LABEL; + private static final int UNDEFINED = Integer.MIN_VALUE; public ResourceUsage() { ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); @@ -50,6 +51,7 @@ public ResourceUsage() { writeLock = lock.writeLock(); usages = new HashMap(); + usages.put(NL, new UsageByLabel(NL)); } // Usage enum here to make implement cleaner @@ -71,7 +73,20 @@ public UsageByLabel(String label) { resArr = new Resource[ResourceType.values().length]; for (int i = 0; i < resArr.length; i++) { resArr[i] = Resource.newInstance(0, 0); - } + }; + resArr[ResourceType.HEADROOM.idx] = + Resource.newInstance(UNDEFINED, UNDEFINED); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("{used=" + resArr[0] + "%, "); + sb.append("pending=" + resArr[1] + "%, "); + sb.append("am_used=" + resArr[2] + "%, "); + sb.append("reserved=" + resArr[3] + "%, "); + sb.append("headroom=" + resArr[4] + "%}"); + return sb.toString(); } } @@ -183,12 +198,24 @@ public void setReserved(String label, Resource res) { /* * Headroom */ + public Resource getHeadroom(Resource clusterResource) { + return getHeadroom(NL, clusterResource); + } + public Resource getHeadroom() { - return getHeadroom(NL); + return getHeadroom(NL, Resources.none()); } - + public Resource getHeadroom(String label) { - return _get(label, ResourceType.HEADROOM); + return getHeadroom(label, Resources.none()); + } + + public Resource getHeadroom(String label, Resource clusterResource) { + Resource headroom = _get(label, ResourceType.HEADROOM); + if (headroom.getMemory() == UNDEFINED) { + return clusterResource; + } + return headroom; } public void incHeadroom(String label, Resource res) { @@ -309,4 +336,14 @@ private void _dec(String label, ResourceType type, Resource res) { writeLock.unlock(); } } + + @Override + public String toString() { + try { + readLock.lock(); + return usages.toString(); + } finally { + readLock.unlock(); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java index 865b0b4..1921195 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java @@ -225,52 +225,4 @@ public static void updateQueueStatistics( ) ); } - - public static float getAbsoluteMaxAvailCapacity( - ResourceCalculator resourceCalculator, Resource clusterResource, CSQueue queue) { - CSQueue parent = queue.getParent(); - if (parent == null) { - return queue.getAbsoluteMaximumCapacity(); - } - - //Get my parent's max avail, needed to determine my own - float parentMaxAvail = getAbsoluteMaxAvailCapacity( - resourceCalculator, clusterResource, parent); - //...and as a resource - Resource parentResource = Resources.multiply(clusterResource, parentMaxAvail); - - //check for no resources parent before dividing, if so, max avail is none - if (Resources.isInvalidDivisor(resourceCalculator, parentResource)) { - return 0.0f; - } - //sibling used is parent used - my used... - float siblingUsedCapacity = Resources.ratio( - resourceCalculator, - Resources.subtract(parent.getUsedResources(), queue.getUsedResources()), - parentResource); - //my max avail is the lesser of my max capacity and what is unused from my parent - //by my siblings (if they are beyond their base capacity) - float maxAvail = Math.min( - queue.getMaximumCapacity(), - 1.0f - siblingUsedCapacity); - //and, mutiply by parent to get absolute (cluster relative) value - float absoluteMaxAvail = maxAvail * parentMaxAvail; - - if (LOG.isDebugEnabled()) { - LOG.debug("qpath " + queue.getQueuePath()); - LOG.debug("parentMaxAvail " + parentMaxAvail); - LOG.debug("siblingUsedCapacity " + siblingUsedCapacity); - LOG.debug("getAbsoluteMaximumCapacity " + queue.getAbsoluteMaximumCapacity()); - LOG.debug("maxAvail " + maxAvail); - LOG.debug("absoluteMaxAvail " + absoluteMaxAvail); - } - - if (absoluteMaxAvail < 0.0f) { - absoluteMaxAvail = 0.0f; - } else if (absoluteMaxAvail > 1.0f) { - absoluteMaxAvail = 1.0f; - } - - return absoluteMaxAvail; - } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 38d4712..db6795f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -150,8 +150,7 @@ protected synchronized void setupQueueConfigs(Resource clusterResource) // and all queues may not be realized yet, we'll use (optimistic) // absoluteMaxCapacity (it will be replaced with the more accurate // absoluteMaxAvailCapacity during headroom/userlimit/allocation events) - updateHeadroomInfo(clusterResource, - queueCapacities.getAbsoluteMaximumCapacity()); + computeQueueMaxResAndSetHeadroomInfo(clusterResource, false); CapacitySchedulerConfiguration conf = csContext.getConfiguration(); userLimit = conf.getUserLimit(getQueuePath()); @@ -901,7 +900,7 @@ protected Resource getHeadroom(User user, Resource queueMaxCap, computeUserLimit(application, clusterResource, required, user, null)); } - private Resource getHeadroom(User user, Resource queueMaxCap, + private Resource getHeadroom(User user, Resource queueMaxRes, Resource clusterResource, Resource userLimit) { /** * Headroom is: @@ -923,8 +922,11 @@ private Resource getHeadroom(User user, Resource queueMaxCap, Resource headroom = Resources.min(resourceCalculator, clusterResource, Resources.subtract(userLimit, user.getUsed()), - Resources.subtract(queueMaxCap, queueUsage.getUsed()) + Resources.subtract(queueMaxRes, queueUsage.getUsed()) ); + // Normalize it before return + headroom = + Resources.roundDown(resourceCalculator, headroom, minimumAllocation); return headroom; } @@ -1012,23 +1014,39 @@ synchronized boolean canAssignToThisQueue(Resource clusterResource, return canAssign; } - private Resource updateHeadroomInfo(Resource clusterResource, - float absoluteMaxAvailCapacity) { - - Resource queueMaxCap = + private Resource computeQueueMaxResAndSetHeadroomInfo( + Resource clusterResource, boolean considerParent) { + /* + * Queue's max available resource = min(this.max, parent.max - parent.used + + * this.used) + * + * Why add this.used? This is because parent.max - parent.used already + * includes this.used, when considering queue's max resource, this.used + * shouldn't excluded + */ + Resource queueMaxResource = Resources.multiplyAndNormalizeDown( resourceCalculator, clusterResource, - absoluteMaxAvailCapacity, + queueCapacities.getAbsoluteMaximumCapacity(), minimumAllocation); - + if (considerParent) { + // We need consider parent's available resource + queueMaxResource = + Resources.min(resourceCalculator, clusterResource, queueMaxResource, + Resources.add(queueUsage.getHeadroom(clusterResource), + queueUsage.getUsed())); + queueMaxResource = + Resources.roundDown(resourceCalculator, queueMaxResource, + minimumAllocation); + } + synchronized (queueHeadroomInfo) { - queueHeadroomInfo.setQueueMaxCap(queueMaxCap); + queueHeadroomInfo.setQueueMaxCap(queueMaxResource); queueHeadroomInfo.setClusterResource(clusterResource); } - return queueMaxCap; - + return queueMaxResource; } @Lock({LeafQueue.class, FiCaSchedulerApp.class}) @@ -1043,22 +1061,16 @@ Resource computeUserLimitAndSetHeadroom(FiCaSchedulerApp application, computeUserLimit(application, clusterResource, required, queueUser, requestedLabels); - //Max avail capacity needs to take into account usage by ancestor-siblings - //which are greater than their base capacity, so we are interested in "max avail" - //capacity - float absoluteMaxAvailCapacity = CSQueueUtils.getAbsoluteMaxAvailCapacity( - resourceCalculator, clusterResource, this); - - Resource queueMaxCap = - updateHeadroomInfo(clusterResource, absoluteMaxAvailCapacity); + Resource queueMaxResource = + computeQueueMaxResAndSetHeadroomInfo(clusterResource, true); Resource headroom = - getHeadroom(queueUser, queueMaxCap, clusterResource, userLimit); + getHeadroom(queueUser, queueMaxResource, clusterResource, userLimit); if (LOG.isDebugEnabled()) { LOG.debug("Headroom calculation for user " + user + ": " + " userLimit=" + userLimit + - " queueMaxCap=" + queueMaxCap + + " queueMaxAvailRes=" + queueMaxResource + " consumed=" + queueUser.getUsed() + " headroom=" + headroom); } @@ -1760,8 +1772,7 @@ public synchronized void updateClusterResource(Resource clusterResource) { // Update headroom info based on new cluster resource value // absoluteMaxCapacity now, will be replaced with absoluteMaxAvailCapacity // during allocation - updateHeadroomInfo(clusterResource, - queueCapacities.getAbsoluteMaximumCapacity()); + computeQueueMaxResAndSetHeadroomInfo(clusterResource, true); // Update metrics CSQueueUtils.updateQueueStatistics( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index a26b0aa..2abeeac 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -530,6 +530,27 @@ private boolean canAssign(Resource clusterResource, FiCaSchedulerNode node) { node.getAvailableResource(), minimumAllocation); } + private void setHeadroomOfChild(CSQueue child, Resource clusterResource) { + /* + * Set head-room of a given child, If this queue is root, child's head-room + * = this.max - this.used If this queue is not root, child's head-room = + * min(minimum-of-headroom-of-this-queue-and-ancestors, this.max - + * this.used). To avoid any of this queue's and its ancestors' limit being + * violated + */ + Resource nowHeadroom = + Resources.multiply(clusterResource, + queueCapacities.getAbsoluteMaximumCapacity()); + Resources.subtractFrom(nowHeadroom, getUsedResources()); + if (this.rootQueue) { + child.getQueueResourceUsage().setHeadroom(nowHeadroom); + } else { + child.getQueueResourceUsage().setHeadroom( + Resources.min(resourceCalculator, clusterResource, nowHeadroom, + queueUsage.getHeadroom())); + } + } + private synchronized CSAssignment assignContainersToChildQueues(Resource cluster, FiCaSchedulerNode node, boolean needToUnreserve) { CSAssignment assignment = @@ -544,6 +565,10 @@ private synchronized CSAssignment assignContainersToChildQueues(Resource cluster LOG.debug("Trying to assign to queue: " + childQueue.getQueuePath() + " stats: " + childQueue); } + + // Set Headroom of Child Queue before assign containers + setHeadroomOfChild(childQueue, cluster); + assignment = childQueue.assignContainers(cluster, node, needToUnreserve); if(LOG.isDebugEnabled()) { LOG.debug("Assigned to queue: " + childQueue.getQueuePath() + @@ -641,6 +666,7 @@ public void completedContainer(Resource clusterResource, public synchronized void updateClusterResource(Resource clusterResource) { // Update all children for (CSQueue childQueue : childQueues) { + setHeadroomOfChild(childQueue, clusterResource); childQueue.updateClusterResource(clusterResource); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java index e1b8a3d..494f5a4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java @@ -23,14 +23,12 @@ import java.util.ArrayList; import java.util.List; -import org.junit.Assert; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; -import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -45,6 +43,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.util.Records; +import org.junit.Assert; public class MockAM { @@ -53,6 +52,7 @@ private RMContext context; private ApplicationMasterProtocol amRMProtocol; private UserGroupInformation ugi; + private volatile AllocateResponse lastResponse; private final List requests = new ArrayList(); private final List releases = new ArrayList(); @@ -223,7 +223,8 @@ public AllocateResponse allocate(AllocateRequest allocateRequest) context.getRMApps().get(attemptId.getApplicationId()) .getRMAppAttempt(attemptId).getAMRMToken(); ugi.addTokenIdentifier(token.decodeIdentifier()); - return doAllocateAs(ugi, allocateRequest); + lastResponse = doAllocateAs(ugi, allocateRequest); + return lastResponse; } public AllocateResponse doAllocateAs(UserGroupInformation ugi, @@ -240,6 +241,10 @@ public AllocateResponse run() throws Exception { throw (Exception) e.getCause(); } } + + public AllocateResponse doHeartbeat() throws Exception { + return allocate(null, null); + } public void unregisterAppAttempt() throws Exception { waitForState(RMAppAttemptState.RUNNING); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSQueueUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSQueueUtils.java deleted file mode 100644 index 5135ba9..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSQueueUtils.java +++ /dev/null @@ -1,250 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; - -import static org.junit.Assert.assertEquals; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.server.resourcemanager.RMContext; -import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; -import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; -import org.apache.hadoop.yarn.util.resource.ResourceCalculator; -import org.apache.hadoop.yarn.util.resource.Resources; -import org.junit.Test; - -public class TestCSQueueUtils { - - private static final Log LOG = LogFactory.getLog(TestCSQueueUtils.class); - - final static int GB = 1024; - - @Test - public void testAbsoluteMaxAvailCapacityInvalidDivisor() throws Exception { - runInvalidDivisorTest(false); - runInvalidDivisorTest(true); - } - - public void runInvalidDivisorTest(boolean useDominant) throws Exception { - - ResourceCalculator resourceCalculator; - Resource clusterResource; - if (useDominant) { - resourceCalculator = new DominantResourceCalculator(); - clusterResource = Resources.createResource(10, 0); - } else { - resourceCalculator = new DefaultResourceCalculator(); - clusterResource = Resources.createResource(0, 99); - } - - YarnConfiguration conf = new YarnConfiguration(); - CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(); - - CapacitySchedulerContext csContext = mock(CapacitySchedulerContext.class); - when(csContext.getConf()).thenReturn(conf); - when(csContext.getConfiguration()).thenReturn(csConf); - when(csContext.getClusterResource()).thenReturn(clusterResource); - when(csContext.getResourceCalculator()).thenReturn(resourceCalculator); - when(csContext.getMinimumResourceCapability()). - thenReturn(Resources.createResource(GB, 1)); - when(csContext.getMaximumResourceCapability()). - thenReturn(Resources.createResource(0, 0)); - RMContext rmContext = TestUtils.getMockRMContext(); - when(csContext.getRMContext()).thenReturn(rmContext); - - final String L1Q1 = "L1Q1"; - csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {L1Q1}); - - final String L1Q1P = CapacitySchedulerConfiguration.ROOT + "." + L1Q1; - csConf.setCapacity(L1Q1P, 90); - csConf.setMaximumCapacity(L1Q1P, 90); - - ParentQueue root = new ParentQueue(csContext, - CapacitySchedulerConfiguration.ROOT, null, null); - LeafQueue l1q1 = new LeafQueue(csContext, L1Q1, root, null); - - LOG.info("t1 root " + CSQueueUtils.getAbsoluteMaxAvailCapacity( - resourceCalculator, clusterResource, root)); - - LOG.info("t1 l1q1 " + CSQueueUtils.getAbsoluteMaxAvailCapacity( - resourceCalculator, clusterResource, l1q1)); - - assertEquals(0.0f, CSQueueUtils.getAbsoluteMaxAvailCapacity( - resourceCalculator, clusterResource, l1q1), 0.000001f); - - } - - @Test - public void testAbsoluteMaxAvailCapacityNoUse() throws Exception { - - ResourceCalculator resourceCalculator = new DefaultResourceCalculator(); - Resource clusterResource = Resources.createResource(100 * 16 * GB, 100 * 32); - - YarnConfiguration conf = new YarnConfiguration(); - CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(); - - CapacitySchedulerContext csContext = mock(CapacitySchedulerContext.class); - when(csContext.getConf()).thenReturn(conf); - when(csContext.getConfiguration()).thenReturn(csConf); - when(csContext.getClusterResource()).thenReturn(clusterResource); - when(csContext.getResourceCalculator()).thenReturn(resourceCalculator); - when(csContext.getMinimumResourceCapability()). - thenReturn(Resources.createResource(GB, 1)); - when(csContext.getMaximumResourceCapability()). - thenReturn(Resources.createResource(16*GB, 32)); - RMContext rmContext = TestUtils.getMockRMContext(); - when(csContext.getRMContext()).thenReturn(rmContext); - - final String L1Q1 = "L1Q1"; - csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {L1Q1}); - - final String L1Q1P = CapacitySchedulerConfiguration.ROOT + "." + L1Q1; - csConf.setCapacity(L1Q1P, 90); - csConf.setMaximumCapacity(L1Q1P, 90); - - ParentQueue root = new ParentQueue(csContext, - CapacitySchedulerConfiguration.ROOT, null, null); - LeafQueue l1q1 = new LeafQueue(csContext, L1Q1, root, null); - - LOG.info("t1 root " + CSQueueUtils.getAbsoluteMaxAvailCapacity( - resourceCalculator, clusterResource, root)); - - LOG.info("t1 l1q1 " + CSQueueUtils.getAbsoluteMaxAvailCapacity( - resourceCalculator, clusterResource, l1q1)); - - assertEquals(1.0f, CSQueueUtils.getAbsoluteMaxAvailCapacity( - resourceCalculator, clusterResource, root), 0.000001f); - - assertEquals(0.9f, CSQueueUtils.getAbsoluteMaxAvailCapacity( - resourceCalculator, clusterResource, l1q1), 0.000001f); - - } - - @Test - public void testAbsoluteMaxAvailCapacityWithUse() throws Exception { - - ResourceCalculator resourceCalculator = new DefaultResourceCalculator(); - Resource clusterResource = Resources.createResource(100 * 16 * GB, 100 * 32); - - YarnConfiguration conf = new YarnConfiguration(); - CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(); - - CapacitySchedulerContext csContext = mock(CapacitySchedulerContext.class); - when(csContext.getConf()).thenReturn(conf); - when(csContext.getConfiguration()).thenReturn(csConf); - when(csContext.getClusterResource()).thenReturn(clusterResource); - when(csContext.getResourceCalculator()).thenReturn(resourceCalculator); - when(csContext.getMinimumResourceCapability()). - thenReturn(Resources.createResource(GB, 1)); - when(csContext.getMaximumResourceCapability()). - thenReturn(Resources.createResource(16*GB, 32)); - - RMContext rmContext = TestUtils.getMockRMContext(); - when(csContext.getRMContext()).thenReturn(rmContext); - - final String L1Q1 = "L1Q1"; - final String L1Q2 = "L1Q2"; - final String L2Q1 = "L2Q1"; - final String L2Q2 = "L2Q2"; - csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {L1Q1, L1Q2, - L2Q1, L2Q2}); - - final String L1Q1P = CapacitySchedulerConfiguration.ROOT + "." + L1Q1; - csConf.setCapacity(L1Q1P, 80); - csConf.setMaximumCapacity(L1Q1P, 80); - - final String L1Q2P = CapacitySchedulerConfiguration.ROOT + "." + L1Q2; - csConf.setCapacity(L1Q2P, 20); - csConf.setMaximumCapacity(L1Q2P, 100); - - final String L2Q1P = L1Q1P + "." + L2Q1; - csConf.setCapacity(L2Q1P, 50); - csConf.setMaximumCapacity(L2Q1P, 50); - - final String L2Q2P = L1Q1P + "." + L2Q2; - csConf.setCapacity(L2Q2P, 50); - csConf.setMaximumCapacity(L2Q2P, 50); - - float result; - - ParentQueue root = new ParentQueue(csContext, - CapacitySchedulerConfiguration.ROOT, null, null); - - LeafQueue l1q1 = new LeafQueue(csContext, L1Q1, root, null); - LeafQueue l1q2 = new LeafQueue(csContext, L1Q2, root, null); - LeafQueue l2q2 = new LeafQueue(csContext, L2Q2, l1q1, null); - LeafQueue l2q1 = new LeafQueue(csContext, L2Q1, l1q1, null); - - //no usage, all based on maxCapacity (prior behavior) - result = CSQueueUtils.getAbsoluteMaxAvailCapacity( - resourceCalculator, clusterResource, l2q2); - assertEquals( 0.4f, result, 0.000001f); - LOG.info("t2 l2q2 " + result); - - //some usage, but below the base capacity - root.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.1f)); - l1q2.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.1f)); - result = CSQueueUtils.getAbsoluteMaxAvailCapacity( - resourceCalculator, clusterResource, l2q2); - assertEquals( 0.4f, result, 0.000001f); - LOG.info("t2 l2q2 " + result); - - //usage gt base on parent sibling - root.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.3f)); - l1q2.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.3f)); - result = CSQueueUtils.getAbsoluteMaxAvailCapacity( - resourceCalculator, clusterResource, l2q2); - assertEquals( 0.3f, result, 0.000001f); - LOG.info("t2 l2q2 " + result); - - //same as last, but with usage also on direct parent - root.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.1f)); - l1q1.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.1f)); - result = CSQueueUtils.getAbsoluteMaxAvailCapacity( - resourceCalculator, clusterResource, l2q2); - assertEquals( 0.3f, result, 0.000001f); - LOG.info("t2 l2q2 " + result); - - //add to direct sibling, below the threshold of effect at present - root.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f)); - l1q1.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f)); - l2q1.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f)); - result = CSQueueUtils.getAbsoluteMaxAvailCapacity( - resourceCalculator, clusterResource, l2q2); - assertEquals( 0.3f, result, 0.000001f); - LOG.info("t2 l2q2 " + result); - - //add to direct sibling, now above the threshold of effect - //(it's cumulative with prior tests) - root.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f)); - l1q1.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f)); - l2q1.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f)); - result = CSQueueUtils.getAbsoluteMaxAvailCapacity( - resourceCalculator, clusterResource, l2q2); - assertEquals( 0.1f, result, 0.000001f); - LOG.info("t2 l2q2 " + result); - - - } - -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index fabf47d..c6f7b9b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -87,6 +87,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; @@ -359,7 +360,8 @@ private void nodeUpdate( resourceManager.getResourceScheduler().handle(nodeUpdate); } - private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) { + private CapacitySchedulerConfiguration setupQueueConfiguration( + CapacitySchedulerConfiguration conf) { // Define top-level queues conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"}); @@ -383,6 +385,7 @@ private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) { conf.setUserLimitFactor(B3, 100.0f); LOG.info("Setup top-level queues a and b"); + return conf; } @Test @@ -2400,6 +2403,76 @@ public void testRefreshQueuesMaxAllocationCSLarger() throws Exception { assertEquals("queue B2 max vcores allocation", 12, ((LeafQueue) queueB2).getMaximumAllocation().getVirtualCores()); } + + private void waitContainerAllocated(MockAM am, int mem, int nContainer, + int startContainerId, MockRM rm, MockNM nm) throws Exception { + for (int cId = startContainerId; cId < startContainerId + nContainer; cId ++) { + am.allocate("*", mem, 1, new ArrayList()); + ContainerId containerId = ContainerId.newContainerId(am.getApplicationAttemptId(), cId); + Assert.assertTrue(rm.waitForState(nm, containerId, + RMContainerState.ALLOCATED, 10 * 1000)); + } + } + + @Test + public void testHierarchyQueueHeadroom() throws Exception { + YarnConfiguration conf = + new YarnConfiguration( + setupQueueConfiguration(new CapacitySchedulerConfiguration())); + conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true); + + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + MockRM rm1 = new MockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 100 * GB, rm1.getResourceTrackerService()); + nm1.registerNode(); + + RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "b1"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + waitContainerAllocated(am1, 1 * GB, 1, 2, rm1, nm1); + + // Maximum resoure of b1 is 100 * 0.895 * 0.792 = 70 GB + Assert.assertEquals(69 * GB, + am1.doHeartbeat().getAvailableResources().getMemory()); + + RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "b2"); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1); + + // Allocate 5 containers, each one is 8 GB in am2 (40 GB in total) + waitContainerAllocated(am2, 8 * GB, 5, 2, rm1, nm1); + + // Allocated one more container with 1 GB resource in b1 + waitContainerAllocated(am1, 1 * GB, 1, 3, rm1, nm1); + + // Total is 100 GB, + // B2 uses 41 GB (5 * 8GB containers and 1 AM container) + // B1 uses 3 GB (2 * 1GB containers and 1 AM container) + // Available is 100 - 41 - 3 = 56 GB + Assert.assertEquals(56 * GB, + am1.doHeartbeat().getAvailableResources().getMemory()); + + // Now we submit app3 to a1 (in higher level hierarchy), to see if headroom + // of app1 (in queue b1) updated correctly + RMApp app3 = rm1.submitApp(1 * GB, "app", "user", null, "a1"); + MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm1); + + // Allocate 3 containers, each one is 8 GB in am3 (24 GB in total) + waitContainerAllocated(am3, 8 * GB, 3, 2, rm1, nm1); + + // Allocated one more container with 4 GB resource in b1 + waitContainerAllocated(am1, 1 * GB, 1, 4, rm1, nm1); + + // Total is 100 GB, + // B2 uses 41 GB (5 * 8GB containers and 1 AM container) + // B1 uses 4 GB (3 * 1GB containers and 1 AM container) + // A1 uses 25 GB (3 * 8GB containers and 1 AM container) + // Available is 100 - 41 - 4 - 25 = 30 GB + Assert.assertEquals(30 * GB, + am1.doHeartbeat().getAvailableResources().getMemory()); + } private void setMaxAllocMb(Configuration conf, int maxAllocMb) { conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,