diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index a178f9e..5453aab 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -114,10 +114,11 @@
   // cache last cluster resource to compute actual capacity
   private Resource lastClusterResource = Resources.none();
 
-  private final QueueResourceLimitsInfo queueResourceLimitsInfo =
-      new QueueResourceLimitsInfo();
+  private final Map<String, QueueResourceLimitsInfo> queueResourceLimitsInfo =
+      new ConcurrentHashMap<>();
 
-  private volatile ResourceLimits cachedResourceLimitsForHeadroom = null;
+  private volatile Map<String, ResourceLimits> cachedResourceLimitsForHeadroom =
+      new ConcurrentHashMap<>();
 
   private volatile OrderingPolicy<FiCaSchedulerApp> orderingPolicy = null;
 
@@ -173,15 +174,15 @@ protected void setupQueueConfigs(Resource clusterResource,
 
     this.lastClusterResource = clusterResource;
 
-    this.cachedResourceLimitsForHeadroom = new ResourceLimits(
-        clusterResource);
+    this.cachedResourceLimitsForHeadroom.put(RMNodeLabelsManager.NO_LABEL,
+        new ResourceLimits(clusterResource));
 
     // Initialize headroom info, also used for calculating application
     // master resource limits. Since this happens during queue initialization
    // and all queues may not be realized yet, we'll use (optimistic)
     // absoluteMaxCapacity (it will be replaced with the more accurate
     // absoluteMaxAvailCapacity during headroom/userlimit/allocation events)
-    setQueueResourceLimitsInfo(clusterResource);
+    setQueueResourceLimitsInfo(clusterResource, RMNodeLabelsManager.NO_LABEL);
 
     setOrderingPolicy(
         conf.<FiCaSchedulerApp>getAppOrderingPolicy(getQueuePath()));
 
@@ -758,8 +759,10 @@ public Resource calculateAndGetAMResourceLimitPerPartition(
 
       // For non-labeled partition, we need to consider the current queue
       // usage limit.
      if (nodePartition.equals(RMNodeLabelsManager.NO_LABEL)) {
-        synchronized (queueResourceLimitsInfo){
-          queueCurrentLimit = queueResourceLimitsInfo.getQueueCurrentLimit();
+        synchronized (queueResourceLimitsInfo
+            .get(RMNodeLabelsManager.NO_LABEL)) {
+          queueCurrentLimit = queueResourceLimitsInfo
+              .get(RMNodeLabelsManager.NO_LABEL).getQueueCurrentLimit();
         }
       }
 
@@ -1052,7 +1055,8 @@ private CSAssignment allocateFromReservedContainer(Resource clusterResource,
 
   public CSAssignment assignContainers(Resource clusterResource,
       CandidateNodeSet<FiCaSchedulerNode> candidates,
       ResourceLimits currentResourceLimits, SchedulingMode schedulingMode) {
-    updateCurrentResourceLimits(currentResourceLimits, clusterResource);
+    updateCurrentResourceLimits(currentResourceLimits, clusterResource,
+        candidates.getPartition());
 
     FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates);
 
     if (LOG.isDebugEnabled()) {
@@ -1423,13 +1427,19 @@ private Resource getHeadroom(User user,
         clusterFreePartitionResource, headroom);
 
     return headroom;
   }
-
-  private void setQueueResourceLimitsInfo(
-      Resource clusterResource) {
-    synchronized (queueResourceLimitsInfo) {
-      queueResourceLimitsInfo.setQueueCurrentLimit(cachedResourceLimitsForHeadroom
-          .getLimit());
-      queueResourceLimitsInfo.setClusterResource(clusterResource);
+
+  private void setQueueResourceLimitsInfo(Resource clusterResource,
+      String nodePartition) {
+    queueResourceLimitsInfo.putIfAbsent(nodePartition,
+        new QueueResourceLimitsInfo());
+    cachedResourceLimitsForHeadroom.putIfAbsent(nodePartition,
+        new ResourceLimits(clusterResource));
+
+    synchronized (queueResourceLimitsInfo.get(nodePartition)) {
+      queueResourceLimitsInfo.get(nodePartition).setQueueCurrentLimit(
+          cachedResourceLimitsForHeadroom.get(nodePartition).getLimit());
+      queueResourceLimitsInfo.get(nodePartition)
+          .setClusterResource(clusterResource);
     }
   }
 
@@ -1451,23 +1461,23 @@ Resource computeUserLimitAndSetHeadroom(FiCaSchedulerApp application,
       userLimit = getResourceLimitForActiveUsers(application.getUser(),
           clusterResource, nodePartition, schedulingMode);
     }
-    setQueueResourceLimitsInfo(clusterResource);
+    setQueueResourceLimitsInfo(clusterResource, nodePartition);
 
-    Resource headroom =
-        metrics.getUserMetrics(user) == null ? Resources.none() :
-        getHeadroom(queueUser, cachedResourceLimitsForHeadroom.getLimit(),
+    Resource headroom = metrics.getUserMetrics(user) == null ?
+        Resources.none()
+        : getHeadroom(queueUser,
+            cachedResourceLimitsForHeadroom.get(nodePartition).getLimit(),
             clusterResource, userLimit, nodePartition);
-
+
     if (LOG.isDebugEnabled()) {
       LOG.debug("Headroom calculation for user " + user + ": " + " userLimit="
           + userLimit + " queueMaxAvailRes="
-          + cachedResourceLimitsForHeadroom.getLimit() + " consumed="
-          + queueUser.getUsed() + " partition="
-          + nodePartition);
+          + cachedResourceLimitsForHeadroom.get(nodePartition).getLimit()
+          + " consumed=" + queueUser.getUsed() + " partition=" + nodePartition);
     }
-
-    CapacityHeadroomProvider headroomProvider = new CapacityHeadroomProvider(
-        queueUser, this, application, queueResourceLimitsInfo);
+
+    CapacityHeadroomProvider headroomProvider =
+        new CapacityHeadroomProvider(queueUser, this, application,
+            queueResourceLimitsInfo.get(nodePartition));
 
     application.setHeadroomProvider(headroomProvider);
 
@@ -1780,21 +1790,27 @@ void releaseResource(Resource clusterResource,
       writeLock.unlock();
     }
   }
-
-  private void updateCurrentResourceLimits(
-      ResourceLimits currentResourceLimits, Resource clusterResource) {
+
+  private void updateCurrentResourceLimits(ResourceLimits currentResourceLimits,
+      Resource clusterResource, String partition) {
     // TODO: need consider non-empty node labels when resource limits supports
     // node labels
     // Even if ParentQueue will set limits respect child's max queue capacity,
     // but when allocating reserved container, CapacityScheduler doesn't do
     // this. So need cap limits by queue's max capacity here.
-    this.cachedResourceLimitsForHeadroom =
-        new ResourceLimits(currentResourceLimits.getLimit());
+    if (null == this.cachedResourceLimitsForHeadroom.get(partition)) {
+      this.cachedResourceLimitsForHeadroom.put(partition,
+          new ResourceLimits(currentResourceLimits.getLimit()));
+    } else {
+      this.cachedResourceLimitsForHeadroom.get(partition)
+          .setLimit(currentResourceLimits.getLimit());
+    }
+
     Resource queueMaxResource = getEffectiveMaxCapacityDown(
         RMNodeLabelsManager.NO_LABEL, minimumAllocation);
-    this.cachedResourceLimitsForHeadroom.setLimit(Resources.min(
-        resourceCalculator, clusterResource, queueMaxResource,
-        currentResourceLimits.getLimit()));
+    this.cachedResourceLimitsForHeadroom.get(partition)
+        .setLimit(Resources.min(resourceCalculator, clusterResource,
+            queueMaxResource, currentResourceLimits.getLimit()));
   }
 
   @Override
@@ -1802,13 +1818,14 @@ public void updateClusterResource(Resource clusterResource,
       ResourceLimits currentResourceLimits) {
     writeLock.lock();
     try {
-      updateCurrentResourceLimits(currentResourceLimits, clusterResource);
+      updateCurrentResourceLimits(currentResourceLimits, clusterResource,
+          RMNodeLabelsManager.NO_LABEL);
       lastClusterResource = clusterResource;
 
       // Update headroom info based on new cluster resource value
      // absoluteMaxCapacity now, will be replaced with absoluteMaxAvailCapacity
       // during allocation
-      setQueueResourceLimitsInfo(clusterResource);
+      setQueueResourceLimitsInfo(clusterResource, RMNodeLabelsManager.NO_LABEL);
 
       // Update user consumedRatios
       recalculateQueueUsageRatio(clusterResource, null);
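The LeafQueue changes above replace two single-valued caches (queueResourceLimitsInfo and cachedResourceLimitsForHeadroom) with ConcurrentHashMaps keyed by node partition, so headroom and AM-limit bookkeeping for one partition can no longer overwrite another's. The sketch below is a minimal, self-contained illustration of that pattern, not the Hadoop classes themselves: the class and field names are hypothetical, and the limits are simplified to longs.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class PerPartitionLimitsSketch {

  /** Stand-in for QueueResourceLimitsInfo: a mutable per-partition record. */
  static final class LimitsInfo {
    long queueCurrentLimit;
    long clusterResource;
  }

  private final Map<String, LimitsInfo> limitsByPartition =
      new ConcurrentHashMap<>();

  /** Mirrors setQueueResourceLimitsInfo(clusterResource, nodePartition). */
  void updateLimits(String partition, long queueLimit, long clusterResource) {
    // Create the entry once: putIfAbsent lets the first writer win, and
    // every later caller synchronizes on the same object.
    limitsByPartition.putIfAbsent(partition, new LimitsInfo());

    LimitsInfo info = limitsByPartition.get(partition);
    synchronized (info) { // per-entry lock, not a queue-wide one
      info.queueCurrentLimit = queueLimit;
      info.clusterResource = clusterResource;
    }
  }

  /** Mirrors the read in calculateAndGetAMResourceLimitPerPartition. */
  long readQueueCurrentLimit(String partition) {
    LimitsInfo info = limitsByPartition.get(partition);
    if (info == null) {
      return 0L; // partition not initialized yet
    }
    synchronized (info) {
      return info.queueCurrentLimit;
    }
  }

  public static void main(String[] args) {
    PerPartitionLimitsSketch sketch = new PerPartitionLimitsSketch();
    sketch.updateLimits("", 4096L, 24576L);      // default partition
    sketch.updateLimits("pool1", 8192L, 24576L); // labeled partition
    // Updating pool1 leaves the default partition's entry untouched.
    System.out.println(sketch.readQueueCurrentLimit(""));      // 4096
    System.out.println(sketch.readQueueCurrentLimit("pool1")); // 8192
  }
}

Synchronizing on the map value rather than the map keeps the critical section per partition, which is why the patch creates each entry with putIfAbsent before locking it.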
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestMaxApplicationMasterLimit.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestMaxApplicationMasterLimit.java
new file mode 100644
index 0000000..280b495
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestMaxApplicationMasterLimit.java
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import java.util.ArrayList;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+
+/**
+ * Tests that the maximum Application Master resource limit of the default
+ * partition stays the same even after an MR job has run.
+ */
+public class TestMaxApplicationMasterLimit {
+  private YarnConfiguration conf;
+  private RMNodeLabelsManager mgr;
+
+  private final static String ROOT = CapacitySchedulerConfiguration.ROOT;
+  private final static String DEFAULT = ROOT + ".default";
+  private final static String QUEUE1 = ROOT + ".queue1";
+  private final static String ROOTDEFAULT = ROOT + ".rootdefault";
+  private final static String POOL1 = "pool1";
+  private final static String DEFAULTPARTITION = "";
+
+  @Before
+  public void setUp() throws Exception {
+    conf = new YarnConfiguration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    mgr = new NullRMNodeLabelsManager();
+    mgr.init(conf);
+  }
+
+  @Test(timeout = 800000)
+  public void testMaxAMLimitOfDefPartitionAfterJobFinished() throws Exception {
+    // Add node label pool1 to the cluster
+    mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of(POOL1));
+
+    // Assign host h1 to pool1 and
+    // hosts h2 and h3 to the default partition
+    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0),
+        toSet(POOL1), NodeId.newInstance("h2", 0), toSet(DEFAULTPARTITION),
+        NodeId.newInstance("h3", 0), toSet(DEFAULTPARTITION)));
+
+    // Inject the node label manager
+    MockRM rm1 = new MockRM(getSingleLevelQueuesConfiguration(conf)) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    // Start the RM
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+
+    // Register NMs with the RM, each NM with 8GB of memory
+    MockNM nm1 = rm1.registerNode("h1:1234", 8096);
+    MockNM nm2 = rm1.registerNode("h2:1234", 8096);
+    MockNM nm3 = rm1.registerNode("h3:1234", 8096);
+
+    // Get queue1
+    CapacityScheduler cs =
+        ((CapacityScheduler) rm1.getRMContext().getScheduler());
+    LeafQueue queue1 = (LeafQueue) cs.getQueue("queue1");
+
+    // Max AM limit of the default partition before running the job
+    Resource defPartitionAMLimitBeforeJob =
+        queue1.getQueueResourceUsage().getAMLimit(DEFAULTPARTITION);
+
+    // Submit an app to queue1 and launch AM am1 on nm1
+    RMApp app1 = rm1.submitApp(200, "app", "user", null, "queue1");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    // Request a container on pool1.
+    am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), POOL1);
+    ContainerId containerId =
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
+
+    Assert.assertTrue(
+        rm1.waitForState(nm1, containerId, RMContainerState.ALLOCATED));
+
+    // Deliver the job-finished event
+    cs.handle(new AppAttemptRemovedSchedulerEvent(am1.getApplicationAttemptId(),
+        RMAppAttemptState.FINISHED, false));
+
+    // Max AM limit of the default partition after running the job
+    Resource defPartitionAMLimitAfterJob =
+        queue1.getQueueResourceUsage().getAMLimit(DEFAULTPARTITION);
+
+    // Close the RM
+    rm1.close();
+
+    // Assert that the max AM limit values before and after the job match
+    Assert.assertEquals(defPartitionAMLimitBeforeJob,
+        defPartitionAMLimitAfterJob);
+  }
+
+  /**
+   * Builds a single-level queue hierarchy (default, rootdefault, queue1)
+   * whose capacities are split between the default partition and pool1.
+   *
+   * @param config the base configuration to extend
+   * @return the capacity scheduler configuration for this test
+   */
+  private Configuration getSingleLevelQueuesConfiguration(
+      Configuration config) {
+    CapacitySchedulerConfiguration scConf =
+        new CapacitySchedulerConfiguration(config);
+    scConf.setInt("yarn.scheduler.minimum-allocation-mb", 64);
+    scConf.setInt("yarn.scheduler.minimum-allocation-vcores", 1);
+    scConf.setInt("yarn.scheduler.maximum-allocation-mb", 8196);
+    scConf.setInt("yarn.scheduler.maximum-allocation-vcores", 8);
+
+    // Define top-level queues
+    scConf.setQueues(ROOT, new String[]{ "default", "rootdefault", "queue1" });
+
+    // Queue configuration: root
+    scConf.setAccessibleNodeLabels(ROOT, toSet(POOL1));
+    scConf.setCapacityByLabel(ROOT, POOL1, 100.0f);
+    scConf.setMaximumApplicationMasterResourcePerQueuePercent(ROOT, 1F);
+
+    // Queue configuration: root.default
+    scConf.setCapacity(DEFAULT, 20);
+    scConf.setMaximumCapacity(DEFAULT, 100);
+    scConf.setMaximumApplicationMasterResourcePerQueuePercent(DEFAULT, 0.1F);
+    scConf.setAccessibleNodeLabels(DEFAULT, toSet(""));
+
+    // Queue configuration: root.rootdefault
+    scConf.setCapacity(ROOTDEFAULT, 70);
+    scConf.setMaximumCapacity(ROOTDEFAULT, 100);
+    scConf.setMaximumApplicationMasterResourcePerQueuePercent(ROOTDEFAULT,
+        0.1f);
+    scConf.setAccessibleNodeLabels(ROOTDEFAULT, toSet(POOL1));
+    scConf.setDefaultNodeLabelExpression(ROOTDEFAULT, POOL1);
+    scConf.setCapacityByLabel(ROOTDEFAULT, POOL1, 80.0f);
+    scConf.setMaximumCapacityByLabel(ROOTDEFAULT, POOL1, 100.0f);
+
+    // Queue configuration: root.queue1
+    scConf.setCapacity(QUEUE1, 10);
+    scConf.setMaximumCapacity(QUEUE1, 100);
+    scConf.setMaximumApplicationMasterResourcePerQueuePercent(QUEUE1, 0.8f);
+    scConf.setAccessibleNodeLabels(QUEUE1, toSet(POOL1));
+    scConf.setDefaultNodeLabelExpression(QUEUE1, POOL1);
+    scConf.setCapacityByLabel(QUEUE1, POOL1, 20.0f);
+    scConf.setMaximumCapacityByLabel(QUEUE1, POOL1, 100.0f);
+    return scConf;
+  }
+
+  @SuppressWarnings("unchecked")
+  private <E> Set<E> toSet(E... elements) {
+    Set<E> set = Sets.newHashSet(elements);
+    return set;
+  }
+}
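To make the intent of the test concrete, here is a hedged, self-contained sketch of the invariant it pins down: allocations on the pool1 partition must leave the default partition's cached AM limit untouched. The class, the long-valued limits, and the one-line AM-limit formula are all illustrative stand-ins; the real computation lives in LeafQueue#calculateAndGetAMResourceLimitPerPartition.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class AMLimitInvariantSketch {

  // Per-partition queue limits, mirroring the patched LeafQueue caches.
  private final Map<String, Long> queueCurrentLimitByPartition =
      new ConcurrentHashMap<>();

  AMLimitInvariantSketch() {
    queueCurrentLimitByPartition.put("", 16384L);     // default partition
    queueCurrentLimitByPartition.put("pool1", 8192L); // labeled partition
  }

  /** Simplified stand-in: AM limit = partition's queue limit * AM percent. */
  long amLimit(String partition, double maxAMResourcePercent) {
    return (long) (queueCurrentLimitByPartition.get(partition)
        * maxAMResourcePercent);
  }

  /** An allocation on one partition updates only that partition's entry. */
  void onAllocation(String partition, long newQueueLimit) {
    queueCurrentLimitByPartition.put(partition, newQueueLimit);
  }

  public static void main(String[] args) {
    AMLimitInvariantSketch queue = new AMLimitInvariantSketch();

    long defBefore = queue.amLimit("", 0.1); // 10% AM share, like DEFAULT

    // A job runs entirely on pool1. With per-partition keys, the default
    // partition's cached limit stays put; with the old single shared cache,
    // this update could clobber the default partition's value.
    queue.onAllocation("pool1", 4096L);

    long defAfter = queue.amLimit("", 0.1);
    if (defBefore != defAfter) {
      throw new AssertionError("default partition AM limit drifted");
    }
    System.out.println("default partition AM limit stable: " + defBefore);
  }
}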