diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java index d5cdb32..6ad4cf1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java @@ -58,7 +58,7 @@ public static void checkAbsoluteCapacity(String queueName, + ")"); } } - + /** * Check sanity of capacities: * - capacity <= maxCapacity @@ -96,18 +96,6 @@ public static float computeAbsoluteMaximumCapacity( (parent == null) ? 1.0f : parent.getAbsoluteMaximumCapacity(); return (parentAbsMaxCapacity * maximumCapacity); } - - /** - * This method intends to be used by ReservationQueue, ReservationQueue will - * not appear in configuration file, so we shouldn't do load capacities - * settings in configuration for reservation queue. - */ - public static void updateAndCheckCapacitiesByLabel(String queuePath, - QueueCapacities queueCapacities, QueueCapacities parentQueueCapacities) { - updateAbsoluteCapacitiesByNodeLabels(queueCapacities, parentQueueCapacities); - - capacitiesSanityCheck(queuePath, queueCapacities); - } /** * Do following steps for capacities @@ -118,10 +106,18 @@ public static void updateAndCheckCapacitiesByLabel(String queuePath, public static void loadUpdateAndCheckCapacities(String queuePath, CapacitySchedulerConfiguration csConf, QueueCapacities queueCapacities, QueueCapacities parentQueueCapacities) { - loadCapacitiesByLabelsFromConf(queuePath, - queueCapacities, csConf); + loadUpdateAndCheckCapacities(queuePath, csConf, queueCapacities, + parentQueueCapacities, null); + } - updateAbsoluteCapacitiesByNodeLabels(queueCapacities, parentQueueCapacities); + public static void loadUpdateAndCheckCapacities(String queuePath, + CapacitySchedulerConfiguration csConf, + QueueCapacities queueCapacities, QueueCapacities parentQueueCapacities, + Set whiteList) { + loadCapacitiesByLabelsFromConf(queuePath, queueCapacities, csConf); + + updateAbsoluteCapacitiesByNodeLabels(queueCapacities, + parentQueueCapacities, whiteList); capacitiesSanityCheck(queuePath, queueCapacities); } @@ -154,8 +150,13 @@ private static void loadCapacitiesByLabelsFromConf(String queuePath, // Set absolute capacities for {capacity, maximum-capacity} private static void updateAbsoluteCapacitiesByNodeLabels( - QueueCapacities queueCapacities, QueueCapacities parentQueueCapacities) { + QueueCapacities queueCapacities, QueueCapacities parentQueueCapacities, + Set whiteList) { for (String label : queueCapacities.getExistingNodeLabels()) { + if (whiteList != null && whiteList.size() > 0 && + !whiteList.contains(label)) { + continue; + } float capacity = queueCapacities.getCapacity(label); if (capacity > 0f) { queueCapacities.setAbsoluteCapacity( diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java index faeb37e..0509409 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java @@ -19,11 +19,15 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; import java.io.IOException; +import java.util.Collections; +import java.util.HashSet; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; +import org.apache.hadoop.yarn.util.resource.Resources; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,8 +41,6 @@ private static final Logger LOG = LoggerFactory .getLogger(ReservationQueue.class); - private PlanQueue parent; - public ReservationQueue(CapacitySchedulerContext cs, String queueName, PlanQueue parent) throws IOException { super(cs, queueName, parent, null); @@ -47,7 +49,6 @@ public ReservationQueue(CapacitySchedulerContext cs, String queueName, parent.getUserLimitFactor(), parent.getMaxApplicationsForReservations(), parent.getMaxApplicationsPerUserForReservation()); - this.parent = parent; } @Override @@ -66,6 +67,7 @@ public void reinitialize(CSQueue newlyParsedQueue, CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource, minimumAllocation, this, labelManager, null); + PlanQueue parent = (PlanQueue) getParent(); updateQuotas(parent.getUserLimitForReservation(), parent.getUserLimitFactor(), parent.getMaxApplicationsForReservations(), @@ -116,7 +118,27 @@ private void updateQuotas(int userLimit, float userLimitFactor, @Override protected void setupConfigurableCapacities() { - CSQueueUtils.updateAndCheckCapacitiesByLabel(getQueuePath(), - queueCapacities, parent == null ? null : parent.getQueueCapacities()); + CSQueue parent = getParent(); + CSQueueUtils.loadUpdateAndCheckCapacities(parent.getQueuePath(), + csContext.getConfiguration(), queueCapacities, + parent.getQueueCapacities(), new HashSet<>( + Collections.singletonList(CommonNodeLabelsManager.NO_LABEL))); + } + + @Override + public synchronized Resource getAMResourceLimit() { + // as for other MaxApplications limits, the time-varying nature of + // capacity for reservation queue makes the default behavior not + // desirable. We fall back to my parent limits; + Resource amResourceLimit = Resources.multiplyAndNormalizeUp( + resourceCalculator, csContext.getClusterResource(), + parent.getAbsoluteCapacity(), minimumAllocation); + metrics.setAMResouceLimit(amResourceLimit); + return amResourceLimit; + } + + @Override + public synchronized Resource getUserAMResourceLimit() { + return getAMResourceLimit(); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservationQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservationQueue.java index e23e93c..29cb000 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservationQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservationQueue.java @@ -26,7 +26,9 @@ import java.io.IOException; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; @@ -68,6 +70,10 @@ public void setup() throws IOException { // create a queue PlanQueue pq = new PlanQueue(csContext, "root", null, null); reservationQueue = new ReservationQueue(csContext, "a", pq); + assertTrue(pq.getAbsoluteCapacity() == reservationQueue + .getAbsoluteCapacity()); + assertTrue(pq.getAbsoluteMaximumCapacity() == reservationQueue + .getAbsoluteMaximumCapacity()); } private void validateReservationQueue(double capacity) { @@ -77,6 +83,24 @@ private void validateReservationQueue(double capacity) { assertEquals(reservationQueue.maxApplicationsPerUser, DEF_MAX_APPS); } + private void validateUserAmResourceLimit(ReservationQueue queue, + ResourceCalculator calculator, Resource capacityResource, + float capacity) { + Resource userAmResourceLimit = queue + .getUserAMResourceLimitPerPartition(CommonNodeLabelsManager.NO_LABEL); + capacity = capacity * csConf.getMaximumApplicationMasterResourcePercent(); + Resource expectedUserAmResourceLimit = calculator.multiplyAndNormalizeUp( + capacityResource, capacity, Resource.newInstance(1, 1)); + assertTrue(calculator.compare(capacityResource, userAmResourceLimit, + expectedUserAmResourceLimit) == 0); + } + + private void updateAMResourceLimit(ReservationQueue queue) { + for (String nodeLabel : queue.getNodeLabelsForQueue()) { + queue.calculateAndGetAMResourceLimitPerPartition(nodeLabel); + } + } + @Test public void testAddSubtractCapacity() throws Exception { @@ -84,10 +108,21 @@ public void testAddSubtractCapacity() throws Exception { reservationQueue.setCapacity(1.0F); validateReservationQueue(1); reservationQueue.setEntitlement(new QueueEntitlement(0.9f, 1f)); + updateAMResourceLimit(reservationQueue); + validateUserAmResourceLimit(reservationQueue, resourceCalculator, + csContext.getClusterResource(), 0.9f); validateReservationQueue(0.9); + reservationQueue.setEntitlement(new QueueEntitlement(1f, 1f)); + updateAMResourceLimit(reservationQueue); + validateUserAmResourceLimit(reservationQueue, resourceCalculator, + csContext.getClusterResource(), 1f); validateReservationQueue(1); + reservationQueue.setEntitlement(new QueueEntitlement(0f, 1f)); + updateAMResourceLimit(reservationQueue); + validateUserAmResourceLimit(reservationQueue, resourceCalculator, + csContext.getClusterResource(), 0f); validateReservationQueue(0); try {