diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java index d5cdb32..b51eb93 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java @@ -64,7 +64,7 @@ public static void checkAbsoluteCapacity(String queueName, * - capacity <= maxCapacity * - absCapacity <= absMaximumCapacity */ - private static void capacitiesSanityCheck(String queueName, + public static void capacitiesSanityCheck(String queueName, QueueCapacities queueCapacities) { for (String label : queueCapacities.getExistingNodeLabels()) { float capacity = queueCapacities.getCapacity(label); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java index faeb37e..b4e94de 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java @@ -21,9 +21,11 @@ import java.io.IOException; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; +import org.apache.hadoop.yarn.util.resource.Resources; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,17 +39,37 @@ private static final Logger LOG = LoggerFactory .getLogger(ReservationQueue.class); - private PlanQueue parent; - public ReservationQueue(CapacitySchedulerContext cs, String queueName, PlanQueue parent) throws IOException { super(cs, queueName, parent, null); + + initializeReservationQueueCapacities(); + // the following parameters are common to all reservation in the plan updateQuotas(parent.getUserLimitForReservation(), parent.getUserLimitFactor(), parent.getMaxApplicationsForReservations(), parent.getMaxApplicationsPerUserForReservation()); - this.parent = parent; + } + + private void initializeReservationQueueCapacities() { + String label = CommonNodeLabelsManager.NO_LABEL; + String parentQueuePath = parent.getQueuePath(); + CapacitySchedulerConfiguration conf = csContext.getConfiguration(); + queueCapacities.setMaximumCapacity(label, + conf.getNonLabeledQueueMaximumCapacity(parentQueuePath) / 100); + float maxCapacity = queueCapacities.getMaximumCapacity(label); + QueueCapacities parentQueueCapacities = parent.getQueueCapacities(); + if (maxCapacity > 0f) { + queueCapacities.setAbsoluteMaximumCapacity( + label, + maxCapacity + * (parentQueueCapacities == null ? 1 : parentQueueCapacities + .getAbsoluteMaximumCapacity(label))); + queueCapacities.setMaxAMResourcePercentage(label, + conf.getMaximumAMResourcePercentPerPartition(parentQueuePath, label)); + } + CSQueueUtils.capacitiesSanityCheck(parentQueuePath, queueCapacities); } @Override @@ -66,6 +88,7 @@ public void reinitialize(CSQueue newlyParsedQueue, CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource, minimumAllocation, this, labelManager, null); + PlanQueue parent = (PlanQueue) getParent(); updateQuotas(parent.getUserLimitForReservation(), parent.getUserLimitFactor(), parent.getMaxApplicationsForReservations(), @@ -116,7 +139,25 @@ private void updateQuotas(int userLimit, float userLimitFactor, @Override protected void setupConfigurableCapacities() { + CSQueue parent = getParent(); CSQueueUtils.updateAndCheckCapacitiesByLabel(getQueuePath(), queueCapacities, parent == null ? null : parent.getQueueCapacities()); } + + @Override + public synchronized Resource getAMResourceLimit() { + // as for other MaxApplications limits, the time-varying nature of + // capacity for reservation queue makes the default behavior not + // desirable. We fall back to my parent limits; + Resource amResourceLimit = Resources.multiplyAndNormalizeUp( + resourceCalculator, csContext.getClusterResource(), + parent.getAbsoluteCapacity(), minimumAllocation); + metrics.setAMResouceLimit(amResourceLimit); + return amResourceLimit; + } + + @Override + public synchronized Resource getUserAMResourceLimit() { + return getAMResourceLimit(); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservationQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservationQueue.java index e23e93c..09316a7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservationQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservationQueue.java @@ -26,7 +26,9 @@ import java.io.IOException; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; @@ -68,6 +70,8 @@ public void setup() throws IOException { // create a queue PlanQueue pq = new PlanQueue(csContext, "root", null, null); reservationQueue = new ReservationQueue(csContext, "a", pq); + assertTrue(pq.getAbsoluteMaximumCapacity() == reservationQueue + .getAbsoluteMaximumCapacity()); } private void validateReservationQueue(double capacity) { @@ -77,6 +81,24 @@ private void validateReservationQueue(double capacity) { assertEquals(reservationQueue.maxApplicationsPerUser, DEF_MAX_APPS); } + private void validateUserAmResourceLimit(ReservationQueue queue, + ResourceCalculator calculator, Resource capacityResource, + float capacity) { + Resource userAmResourceLimit = queue + .getUserAMResourceLimitPerPartition(CommonNodeLabelsManager.NO_LABEL); + capacity = capacity * csConf.getMaximumApplicationMasterResourcePercent(); + Resource expectedUserAmResourceLimit = calculator.multiplyAndNormalizeUp( + capacityResource, capacity, Resource.newInstance(1, 1)); + assertTrue(calculator.compare(capacityResource, userAmResourceLimit, + expectedUserAmResourceLimit) == 0); + } + + private void updateAMResourceLimit(ReservationQueue queue) { + for (String nodeLabel : queue.getNodeLabelsForQueue()) { + queue.calculateAndGetAMResourceLimitPerPartition(nodeLabel); + } + } + @Test public void testAddSubtractCapacity() throws Exception { @@ -84,10 +106,21 @@ public void testAddSubtractCapacity() throws Exception { reservationQueue.setCapacity(1.0F); validateReservationQueue(1); reservationQueue.setEntitlement(new QueueEntitlement(0.9f, 1f)); + updateAMResourceLimit(reservationQueue); + validateUserAmResourceLimit(reservationQueue, resourceCalculator, + csContext.getClusterResource(), 0.9f); validateReservationQueue(0.9); + reservationQueue.setEntitlement(new QueueEntitlement(1f, 1f)); + updateAMResourceLimit(reservationQueue); + validateUserAmResourceLimit(reservationQueue, resourceCalculator, + csContext.getClusterResource(), 1f); validateReservationQueue(1); + reservationQueue.setEntitlement(new QueueEntitlement(0f, 1f)); + updateAMResourceLimit(reservationQueue); + validateUserAmResourceLimit(reservationQueue, resourceCalculator, + csContext.getClusterResource(), 0f); validateReservationQueue(0); try {