diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java index 2a741ed83cb..8bc5ad11175 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java @@ -78,7 +78,7 @@ public Thread newThread(Runnable r) { } private void schedulePreemptionChecker() { - handler = ses.scheduleAtFixedRate(new PreemptionChecker(), + handler = ses.scheduleAtFixedRate(new PolicyInvoker(), 0, monitorInterval, TimeUnit.MILLISECONDS); } @@ -98,7 +98,7 @@ public void invokePolicy(){ scheduleEditPolicy.editSchedule(); } - private class PreemptionChecker implements Runnable { + private class PolicyInvoker implements Runnable { @Override public void run() { try { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java index 2c072d2544f..5aaecc58881 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java @@ -36,6 +36,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity + .ManagedParentQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptableQueue; @@ -376,7 +379,9 @@ private void cleanupStaledPreemptionCandidates(long currentTime) { } private Set getLeafQueueNames(TempQueuePerPartition q) { - if (q.children == null || q.children.isEmpty()) { + // If its a ManagedParentQueue, it might not have any children + if ((q.children == null || q.children.isEmpty()) && !(q.parentQueue instanceof + ManagedParentQueue)){ return ImmutableSet.of(q.queueName); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java index 89452f9c0d4..0314b54638d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java @@ -21,6 +21,9 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity + .ParentQueue; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -51,6 +54,7 @@ final ArrayList children; private Collection apps; LeafQueue leafQueue; + ParentQueue parentQueue; boolean preemptionDisabled; protected Resource pendingDeductReserved; @@ -84,6 +88,11 @@ pendingDeductReserved = Resources.createResource(0); } + /* instanceof is null-safe, unlike queue.getClass(); parentQueue is only assigned for parent queues */ + if (queue instanceof ParentQueue) { + parentQueue = (ParentQueue) queue; + } + this.normalizedGuarantee = Float.NaN; this.children = new ArrayList<>(); this.apps = new ArrayList<>(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractManagedParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractManagedParentQueue.java index 46f5cf113ea..d2799282177 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractManagedParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractManagedParentQueue.java @@ -19,6 +19,9 @@ import
org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common + .QueueEntitlement; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,7 +38,8 @@ private static final Logger LOG = LoggerFactory.getLogger( AbstractManagedParentQueue.class); - protected AutoCreatedLeafQueueTemplate leafQueueTemplate; + protected AutoCreatedLeafQueueConfig leafQueueTemplate; + protected AutoCreatedQueueManagementPolicy queueManagementPolicy = null; public AbstractManagedParentQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { @@ -67,13 +71,13 @@ public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource) * Initialize leaf queue configs from template configurations specified on * parent queue. */ - protected AutoCreatedLeafQueueTemplate.Builder initializeLeafQueueConfigs + protected AutoCreatedLeafQueueConfig.Builder initializeLeafQueueConfigs (String queuePath) { CapacitySchedulerConfiguration conf = csContext.getConfiguration(); - AutoCreatedLeafQueueTemplate.Builder leafQueueTemplateBuilder = new - AutoCreatedLeafQueueTemplate.Builder(); + AutoCreatedLeafQueueConfig.Builder leafQueueTemplateBuilder = new + AutoCreatedLeafQueueConfig.Builder(); int maxApps = conf.getMaximumApplicationsPerQueue(queuePath); if (maxApps < 0) { maxApps = (int) ( @@ -193,84 +197,27 @@ protected float sumOfChildAbsCapacities() { } } - public static class AutoCreatedLeafQueueTemplate { - - private QueueCapacities queueCapacities; - - private int maxApps; - private int maxAppsPerUser; - private int userLimit; - private float userLimitFactor; - - AutoCreatedLeafQueueTemplate(Builder builder) { - this.maxApps = builder.maxApps; - this.maxAppsPerUser = builder.maxAppsPerUser; - this.userLimit = builder.userLimit; - this.userLimitFactor = builder.userLimitFactor; - this.queueCapacities 
= builder.queueCapacities; - } - - public static class Builder { - private int maxApps; - private int maxAppsPerUser; - - private int userLimit; - private float userLimitFactor; - - private QueueCapacities queueCapacities; - - Builder maxApps(int maxApplications) { - this.maxApps = maxApplications; - return this; - } - - Builder maxAppsPerUser(int maxApplicationsPerUser) { - this.maxAppsPerUser = maxApplicationsPerUser; - return this; - } - - Builder userLimit(int usrLimit) { - this.userLimit = usrLimit; - return this; - } - - Builder userLimitFactor(float ulf) { - this.userLimitFactor = ulf; - return this; - } - - Builder capacities(QueueCapacities capacities) { - this.queueCapacities = capacities; - return this; - } - - AutoCreatedLeafQueueTemplate build() { - return new AutoCreatedLeafQueueTemplate(this); - } - } - - public int getUserLimit() { - return userLimit; - } - - public float getUserLimitFactor() { - return userLimitFactor; - } + public AutoCreatedLeafQueueConfig getLeafQueueTemplate() { + return leafQueueTemplate; + } - public QueueCapacities getQueueCapacities() { - return queueCapacities; - } + protected void validateQueueEntitlementChange(AutoCreatedLeafQueue + leafQueue, QueueEntitlement entitlement) + throws SchedulerDynamicEditException { - public int getMaxApps() { - return maxApps; - } + float sumChilds = sumOfChildCapacities(); + float newChildCap = + sumChilds - leafQueue.getCapacity() + entitlement.getCapacity(); - public int getMaxAppsPerUser() { - return maxAppsPerUser; + if (!(newChildCap >= 0 && newChildCap < 1.0f + CSQueueUtils.EPSILON)) { + throw new SchedulerDynamicEditException( + "Sum of child queues should exceed 100% for auto creating parent " + + "queue : " + queueName); } } - public AutoCreatedLeafQueueTemplate getLeafQueueTemplate() { - return leafQueueTemplate; + public AutoCreatedQueueManagementPolicy + getAutoCreatedQueueManagementPolicy() { + return queueManagementPolicy; } } diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java index bc206d41521..a4b7d10ee91 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java @@ -21,8 +21,6 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity - .AbstractManagedParentQueue.AutoCreatedLeafQueueTemplate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,12 +42,7 @@ public AutoCreatedLeafQueue(CapacitySchedulerContext cs, String queueName, AbstractManagedParentQueue parent) throws IOException { super(cs, queueName, parent, null); - AutoCreatedLeafQueueTemplate leafQueueTemplate = - parent.getLeafQueueTemplate(); - updateApplicationAndUserLimits(leafQueueTemplate.getUserLimit(), - leafQueueTemplate.getUserLimitFactor(), - leafQueueTemplate.getMaxApps(), - leafQueueTemplate.getMaxAppsPerUser()); + initializeLimitsFromTemplate(parent.getLeafQueueTemplate()); this.parent = parent; } @@ -65,18 +58,51 @@ public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource) CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource, this, labelManager, null); - 
AutoCreatedLeafQueueTemplate leafQueueTemplate = - parent.getLeafQueueTemplate(); - updateApplicationAndUserLimits(leafQueueTemplate.getUserLimit(), - leafQueueTemplate.getUserLimitFactor(), - leafQueueTemplate.getMaxApps(), - leafQueueTemplate.getMaxAppsPerUser()); + initializeLimitsFromTemplate(parent.getLeafQueueTemplate()); } finally { writeLock.unlock(); } } + public void initializeLimitsFromTemplate(AutoCreatedLeafQueueConfig + leafQueueTemplate) { + + setUserLimit(leafQueueTemplate + .getUserLimit()); + setUserLimitFactor(leafQueueTemplate.getUserLimitFactor()); + setMaxApplications(leafQueueTemplate.getMaxApps()); + setMaxApplicationsPerUser(leafQueueTemplate.getMaxAppsPerUser()); + setMaxAMResourcePerQueuePercent(leafQueueTemplate.getMaxAMResourcePerQueuePercent()); + } + + public void reinitializeFromTemplate(AutoCreatedLeafQueueConfig + leafQueueTemplate) throws SchedulerDynamicEditException { + + try { + writeLock.lock(); + QueueCapacities capacities = leafQueueTemplate.getQueueCapacities(); + setupConfigurableCapacities(capacities); + this.queueCapacities = capacities; + + initializeLimitsFromTemplate(leafQueueTemplate); + } finally { + writeLock.unlock(); + } + } + + public void validateConfigurations(AutoCreatedLeafQueueConfig template) + throws SchedulerDynamicEditException { + QueueCapacities capacities = template.getQueueCapacities(); + for (String label : capacities.getExistingNodeLabels()) { + float capacity = capacities.getCapacity(label); + if (capacity < 0 || capacity > 1.0f) { + throw new SchedulerDynamicEditException( + "Capacity demand is not in the [0,1] range: " + capacity); + } + } + } + /** * This methods to change capacity for a queue and adjusts its * absoluteCapacity. 
@@ -94,6 +120,15 @@ public void setEntitlement(QueueEntitlement entitlement) throw new SchedulerDynamicEditException( "Capacity demand is not in the [0,1] range: " + capacity); } + + // note: epsilon checks here are not ok, as the epsilons might + // accumulate and become a problem in aggregate + if (Math.abs(entitlement.getCapacity() - getCapacity()) == 0 + && Math.abs( + entitlement.getMaxCapacity() - getMaximumCapacity()) == 0) { + return; + } + setCapacity(capacity); setAbsoluteCapacity(getParent().getAbsoluteCapacity() * getCapacity()); setMaxCapacity(entitlement.getMaxCapacity()); @@ -113,22 +148,16 @@ private void validate(final CSQueue newlyParsedQueue) throws IOException { "Error trying to reinitialize " + getQueuePath() + " from " + newlyParsedQueue.getQueuePath()); } - } @Override protected void setupConfigurableCapacities() { + setupConfigurableCapacities(queueCapacities); + } + + protected void setupConfigurableCapacities(QueueCapacities queueCapacities) { CSQueueUtils.updateAndCheckCapacitiesByLabel(getQueuePath(), queueCapacities, parent == null ? 
null : parent.getQueueCapacities()); } - private void updateApplicationAndUserLimits(int userLimit, - float userLimitFactor, - int maxAppsForAutoCreatedQueues, - int maxAppsPerUserForAutoCreatedQueues) { - setUserLimit(userLimit); - setUserLimitFactor(userLimitFactor); - setMaxApplications(maxAppsForAutoCreatedQueues); - setMaxApplicationsPerUser(maxAppsPerUserForAutoCreatedQueues); - } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueueConfig.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueueConfig.java new file mode 100644 index 00000000000..698fc4eca97 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueueConfig.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +/** + * Auto Created Leaf queue configurations + */ +public class AutoCreatedLeafQueueConfig { + + private QueueCapacities queueCapacities; + + private int maxApps; + private int maxAppsPerUser; + private int userLimit; + private float userLimitFactor; + + private float maxAMResourcePerQueuePercent; + + public AutoCreatedLeafQueueConfig(Builder builder) { + this.maxApps = builder.maxApps; + this.maxAppsPerUser = builder.maxAppsPerUser; + this.userLimit = builder.userLimit; + this.userLimitFactor = builder.userLimitFactor; + this.queueCapacities = builder.queueCapacities; + this.maxAMResourcePerQueuePercent = builder.maxAMResourcePerQueuePercent; + } + + public static class Builder { + private int maxApps; + private int maxAppsPerUser; + + private int userLimit; + private float userLimitFactor; + + private float maxAMResourcePerQueuePercent; + + private QueueCapacities queueCapacities; + + public Builder maxApps(int maxApplications) { + this.maxApps = maxApplications; + return this; + } + + public Builder maxAppsPerUser(int maxApplicationsPerUser) { + this.maxAppsPerUser = maxApplicationsPerUser; + return this; + } + + public Builder userLimit(int usrLimit) { + this.userLimit = usrLimit; + return this; + } + + public Builder userLimitFactor(float ulf) { + this.userLimitFactor = ulf; + return this; + } + + public Builder maxAmResourcePerQueuePercent(float maxAMResPerQueuePercent) { + this.maxAMResourcePerQueuePercent = maxAMResPerQueuePercent; + return this; + } + + public Builder capacities(QueueCapacities capacities) { + this.queueCapacities = capacities; + return 
this; + } + + public AutoCreatedLeafQueueConfig build() { + return new AutoCreatedLeafQueueConfig(this); + } + } + + public int getUserLimit() { + return userLimit; + } + + public float getUserLimitFactor() { + return userLimitFactor; + } + + public QueueCapacities getQueueCapacities() { + return queueCapacities; + } + + public int getMaxApps() { + return maxApps; + } + + public int getMaxAppsPerUser() { + return maxAppsPerUser; + } + + public float getMaxAMResourcePerQueuePercent() { + return maxAMResourcePerQueuePercent; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedQueueManagementPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedQueueManagementPolicy.java new file mode 100644 index 00000000000..c14dd43dc3c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedQueueManagementPolicy.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; +import java.util.List; + +public interface AutoCreatedQueueManagementPolicy { + + /** + * Initialize policy + * @param schedulerContext Capacity Scheduler context + */ + void init(CapacitySchedulerContext schedulerContext, ParentQueue parentQueue); + + /** + * Reinitialize policy state (if required) + * @param schedulerContext Capacity Scheduler context + */ + void reinitialize(CapacitySchedulerContext schedulerContext, + ParentQueue parentQueue); + + /** + * Get initial template for the specified leaf queue + * @param leafQueue the leaf queue + * @return the initial AutoCreatedLeafQueueConfig template for the leaf queue + */ + AutoCreatedLeafQueueConfig getInitialLeafQueueConfiguration(AutoCreatedLeafQueue + leafQueue) + throws SchedulerDynamicEditException; + + /** + * Compute/Adjust child queue capacities + * for auto created leaf queues + * + * @return a list of suggested QueueEntitlementChange(s) which may + * or may not be enforced by the scheduler + */ + List computeQueueManagementChanges() + throws SchedulerDynamicEditException; + + /** + * Commit/Update state for the specified queue management changes.
+ */ + void commitQueueManagementChanges( + List queueManagementChanges) + throws SchedulerDynamicEditException; +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java index e1014c11fc8..a316b64f60e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java @@ -29,9 +29,9 @@ import com.google.common.collect.Sets; -class CSQueueUtils { +public class CSQueueUtils { - final static float EPSILON = 0.0001f; + public final static float EPSILON = 0.0001f; /* * Used only by tests @@ -133,12 +133,12 @@ private static void loadCapacitiesByLabelsFromConf(String queuePath, for (String label : configuredNodelabels) { if (label.equals(CommonNodeLabelsManager.NO_LABEL)) { - queueCapacities.setCapacity(CommonNodeLabelsManager.NO_LABEL, + queueCapacities.setCapacity(label, csConf.getNonLabeledQueueCapacity(queuePath) / 100); - queueCapacities.setMaximumCapacity(CommonNodeLabelsManager.NO_LABEL, + queueCapacities.setMaximumCapacity(label, csConf.getNonLabeledQueueMaximumCapacity(queuePath) / 100); queueCapacities.setMaxAMResourcePercentage( - CommonNodeLabelsManager.NO_LABEL, + label, csConf.getMaximumAMResourcePercentPerPartition(queuePath, label)); } else { queueCapacities.setCapacity(label, @@ -150,7 +150,7 @@ private static void loadCapacitiesByLabelsFromConf(String queuePath, } } } - + // Set absolute capacities for {capacity, 
maximum-capacity} private static void updateAbsoluteCapacitiesByNodeLabels( QueueCapacities queueCapacities, QueueCapacities parentQueueCapacities) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 218adf3d682..a691de5052f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; +import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; @@ -128,6 +129,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event + .QueueManagementChangeEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ReleaseContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; @@ -1495,7 +1498,7 @@ public void 
handle(SchedulerEvent event) { { NodeLabelsUpdateSchedulerEvent labelUpdateEvent = (NodeLabelsUpdateSchedulerEvent) event; - + updateNodeLabelsAndQueueResource(labelUpdateEvent); } break; @@ -1607,6 +1610,19 @@ public void handle(SchedulerEvent event) { } } break; + case MANAGE_QUEUE: + { + QueueManagementChangeEvent queueManagementChangeEvent = + (QueueManagementChangeEvent) event; + ParentQueue parentQueue = queueManagementChangeEvent.getParentQueue(); + + final List queueManagementChanges = + queueManagementChangeEvent.getQueueManagementChanges(); + ((ManagedParentQueue)parentQueue).validateAndApplyQueueManagementChanges + (queueManagementChanges); + } + /* End the case here; without this break control falls through to default and the handled event is also logged as an invalid event type. */ + break; default: LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!"); } @@ -2021,6 +2035,8 @@ public void addQueue(Queue queue) String queuename = newQueue.getQueueName(); parentPlan.addChildQueue(newQueue); this.queueManager.addQueue(queuename, newQueue); + + LOG.info("Creation of AutoCreatedLeafQueue " + newQueue + " succeeded"); } finally { writeLock.unlock(); @@ -2051,25 +2067,10 @@ public void setEntitlement(String inQueue, QueueEntitlement entitlement) } AutoCreatedLeafQueue newQueue = (AutoCreatedLeafQueue) queue; + parent.validateQueueEntitlementChange(newQueue, entitlement); - float sumChilds = parent.sumOfChildCapacities(); - float newChildCap = - sumChilds - queue.getCapacity() + entitlement.getCapacity(); + newQueue.setEntitlement(entitlement); - if (newChildCap >= 0 && newChildCap < 1.0f + CSQueueUtils.EPSILON) { - // note: epsilon checks here are not ok, as the epsilons might - // accumulate and become a problem in aggregate - if (Math.abs(entitlement.getCapacity() - queue.getCapacity()) == 0 - && Math.abs( - entitlement.getMaxCapacity() - queue.getMaximumCapacity()) == 0) { - return; - } - newQueue.setEntitlement(entitlement); - } else{ - throw new SchedulerDynamicEditException( - "Sum of child queues should exceed 100% for auto creating parent " - + "queue : " + parent.getQueueName()); - }
LOG.info( "Set entitlement for AutoCreatedLeafQueue " + inQueue + " to " + queue.getCapacity() + @@ -2712,7 +2713,6 @@ private LeafQueue autoCreateLeafQueue( addQueue(autoCreatedLeafQueue); - //TODO - Set entitlement through capacity management policy } else{ throw new SchedulerDynamicEditException( "Could not auto-create leaf queue for " + leafQueueName diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 4515453c933..2efee66587c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -1558,9 +1558,13 @@ public void setDefaultLifetimePerQueue(String queue, long defaultLifetime) { @Private public static final boolean DEFAULT_AUTO_CREATE_CHILD_QUEUE_ENABLED = false; + @Private + private static final String AUTO_CREATE_CHILD_QUEUE_PREFIX = + "auto-create-child-queue."; + @Private public static final String AUTO_CREATE_CHILD_QUEUE_ENABLED = - "auto-create-child-queue.enabled"; + AUTO_CREATE_CHILD_QUEUE_PREFIX + "enabled"; @Private public static final String AUTO_CREATED_LEAF_QUEUE_TEMPLATE_PREFIX = @@ -1660,8 +1664,83 @@ public int getAutoCreatedQueuesMaxChildQueuesLimit(String queuePath) { } @Private + public static final String AUTO_CREATED_QUEUE_MANAGEMENT_POLICY = + AUTO_CREATE_CHILD_QUEUE_PREFIX + "management-policy"; + + @Private + public 
static final String DEFAULT_AUTO_CREATED_QUEUE_MANAGEMENT_POLICY = + "org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity" + + ".queuemanagement." + + "GuaranteedOrZeroCapacityOverTimePolicy"; + + @Private + private static final String QUEUE_MANAGEMENT_CONFIG_PREFIX = + "yarn.resourcemanager.monitor.capacity.queue-management."; + + /** + * Time in milliseconds between invocations of this policy + */ + @Private + public static final String QUEUE_MANAGEMENT_MONITORING_INTERVAL = + QUEUE_MANAGEMENT_CONFIG_PREFIX + "monitoring_interval"; + + @Private + public static final long DEFAULT_QUEUE_MANAGEMENT_MONITORING_INTERVAL = + 1500L; + + /** + * Queue Management computation policy for Auto Created queues + * @param queue The queue's path + * @return Configured policy class name + */ + @Private + public String getAutoCreatedQueueManagementPolicy(String queue) { + String autoCreatedQueueManagementPolicy = + get(getQueuePrefix(queue) + AUTO_CREATED_QUEUE_MANAGEMENT_POLICY, + DEFAULT_AUTO_CREATED_QUEUE_MANAGEMENT_POLICY); + return autoCreatedQueueManagementPolicy; + } + + /** + * Get The policy class configured to manage capacities for auto created leaf + * queues under the specified parent + * + * @param queueName The parent queue's name + * @return The policy class configured to manage capacities for auto created + * leaf queues under the specified parent queue + */ + @Private + protected AutoCreatedQueueManagementPolicy + getAutoCreatedQueueManagementPolicyClass( + String queueName) { + + String queueManagementPolicyClassName = + getAutoCreatedQueueManagementPolicy(queueName); + LOG.info("Using Auto Created Queue Management Policy: " + + queueManagementPolicyClassName + " for queue: " + queueName); + try { + Class queueManagementPolicyClazz = getClassByName( + queueManagementPolicyClassName); + if (AutoCreatedQueueManagementPolicy.class.isAssignableFrom( + queueManagementPolicyClazz)) { + return (AutoCreatedQueueManagementPolicy) ReflectionUtils.newInstance( 
+ queueManagementPolicyClazz, this); + } else{ + throw new YarnRuntimeException( + "Class: " + queueManagementPolicyClassName + " not instance of " + + AutoCreatedQueueManagementPolicy.class.getCanonicalName()); + } + } catch (ClassNotFoundException e) { + throw new YarnRuntimeException( + "Could not instantiate " + "AutoCreatedQueueManagementPolicy: " + + queueManagementPolicyClassName + " for queue: " + queueName, + e); + } + } + @VisibleForTesting - public void setAutoCreatedLeafQueueTemplateCapacity(String queuePath, + @Private + public void setAutoCreatedLeafQueueConfigCapacity(String queuePath, float val) { String leafQueueConfPrefix = getAutoCreatedQueueTemplateConfPrefix( queuePath); @@ -1670,7 +1749,7 @@ public void setAutoCreatedLeafQueueTemplateCapacity(String queuePath, @Private @VisibleForTesting - public void setAutoCreatedLeafQueueTemplateMaxCapacity(String queuePath, + public void setAutoCreatedLeafQueueConfigMaxCapacity(String queuePath, float val) { String leafQueueConfPrefix = getAutoCreatedQueueTemplateConfPrefix( queuePath); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java index 7c918a53620..ae74989a726 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java @@ -33,6 +33,7 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; +import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; /** @@ -94,4 +95,11 @@ * @return if configuration is mutable */ boolean isConfigurationMutable(); + + /** + * Get clock from scheduler + * @return Clock + */ + Clock getClock(); + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java index eb501233b2b..6348756cfb2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java @@ -394,7 +394,6 @@ private void updateQueues(Map existingQueues, String queueName = e.getKey(); CSQueue existingQueue = e.getValue(); - //TODO - Handle case when auto create is disabled on parent queues if (!newQueues.containsKey(queueName) && !( existingQueue instanceof AutoCreatedLeafQueue && conf .isAutoCreateChildQueueEnabled( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index e8342d956ed..399f5b45bc1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -2007,7 +2007,12 @@ public void setMaxApplicationsPerUser(int maxApplicationsPerUser) { public void setMaxApplications(int maxApplications) { this.maxApplications = maxApplications; } - + + public void setMaxAMResourcePerQueuePercent( + float maxAMResourcePerQueuePercent) { + this.maxAMResourcePerQueuePercent = maxAMResourcePerQueuePercent; + } + public OrderingPolicy getOrderingPolicy() { return orderingPolicy; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java index ff795e47b23..1160232cda7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java @@ -18,12 +18,17 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; import org.apache.hadoop.yarn.api.records.Resource; -import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler - .SchedulerDynamicEditException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; + + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; /** * Auto Creation enabled Parent queue. This queue initially does not have any @@ -44,10 +49,7 @@ public ManagedParentQueue(final CapacitySchedulerContext cs, final String queueName, final CSQueue parent, final CSQueue old) throws IOException { super(cs, queueName, parent, old); - String leafQueueTemplateConfPrefix = getLeafQueueConfigPrefix( - csContext.getConfiguration()); - this.leafQueueTemplate = initializeLeafQueueConfigs( - leafQueueTemplateConfPrefix).build(); + initializeLeafQueueTemplate(); StringBuffer queueInfo = new StringBuffer(); queueInfo.append("Created Managed Parent Queue: ").append(queueName).append( @@ -60,13 +62,74 @@ public ManagedParentQueue(final CapacitySchedulerContext cs, "]\nwith user limit factor: [").append( leafQueueTemplate.getUserLimitFactor()).append("]."); LOG.info(queueInfo.toString()); + + initializeQueueManagementPolicy(); } @Override public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource) throws IOException { validate(newlyParsedQueue); + + //validate if capacity is exceeded for child queues + if (csContext.getConfiguration(). 
+ getShouldFailAutoQueueCreationWhenGuaranteedCapacityExceeded( + getQueuePath())) { + float childCap = sumOfChildCapacities(); + if (getCapacity() < childCap) { + throw new IOException( + "Total of Auto Created leaf queues guaranteed capacity : " + + childCap + " exceeds Parent queue's " + getQueuePath() + + " guaranteed capacity " + getCapacity() + "" + + ".Cannot enforce policy to auto" + + " create queues beyond parent queue's capacity"); + } + } + super.reinitialize(newlyParsedQueue, clusterResource); + + initializeLeafQueueTemplate(); + + reinitializeQueueManagementPolicy(); + + StringBuffer queueInfo = new StringBuffer(); + queueInfo.append("Reinitialized Managed Parent Queue: ").append(queueName) + .append( + "]\nwith capacity: [").append(super.getCapacity()).append( + "]\nwith max capacity: [").append(super.getMaximumCapacity()).append( + "\nwith max apps: [").append(leafQueueTemplate.getMaxApps()).append( + "]\nwith max apps per user: [").append( + leafQueueTemplate.getMaxAppsPerUser()).append("]\nwith user limit: [") + .append(leafQueueTemplate.getUserLimit()).append( + "]\nwith user limit factor: [").append( + leafQueueTemplate.getUserLimitFactor()).append("]."); + LOG.info(queueInfo.toString()); + + } + + private void initializeQueueManagementPolicy() { + queueManagementPolicy = + csContext.getConfiguration().getAutoCreatedQueueManagementPolicyClass( + getQueuePath()); + + queueManagementPolicy.init(csContext, this); + } + + private void reinitializeQueueManagementPolicy() { + AutoCreatedQueueManagementPolicy managementPolicy = + csContext.getConfiguration().getAutoCreatedQueueManagementPolicyClass( + getQueuePath()); + + if (!(managementPolicy.getClass() + .equals(this.queueManagementPolicy.getClass()))) { + queueManagementPolicy = managementPolicy; + queueManagementPolicy.init(csContext, this); + } else { + queueManagementPolicy.reinitialize(csContext, this); + } + } + + private void initializeLeafQueueTemplate() { String leafQueueTemplateConfPrefix 
= getLeafQueueConfigPrefix( csContext.getConfiguration()); this.leafQueueTemplate = initializeLeafQueueConfigs( @@ -74,15 +137,16 @@ public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource) } @Override - protected AutoCreatedLeafQueueTemplate.Builder initializeLeafQueueConfigs( + protected AutoCreatedLeafQueueConfig.Builder initializeLeafQueueConfigs( String queuePath) { - AutoCreatedLeafQueueTemplate.Builder leafQueueTemplate = + AutoCreatedLeafQueueConfig.Builder leafQueueTemplate = super.initializeLeafQueueConfigs(queuePath); CapacitySchedulerConfiguration conf = csContext.getConfiguration(); String leafQueueTemplateConfPrefix = getLeafQueueConfigPrefix(conf); QueueCapacities queueCapacities = new QueueCapacities(false); + CSQueueUtils.loadUpdateAndCheckCapacities(leafQueueTemplateConfPrefix, csContext.getConfiguration(), queueCapacities, getQueueCapacities()); leafQueueTemplate.capacities(queueCapacities); @@ -144,15 +208,127 @@ public void addChildQueue(CSQueue childQueue) AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) childQueue; super.addChildQueue(leafQueue); - //TODO - refresh policy queue after capacity management is added + final AutoCreatedLeafQueueConfig initialLeafQueueTemplate = + queueManagementPolicy.getInitialLeafQueueConfiguration(leafQueue); + leafQueue.reinitializeFromTemplate(initialLeafQueueTemplate); } finally { writeLock.unlock(); } } + public List getApplications() { + try { + readLock.lock(); + List apps = new ArrayList<>(); + for (CSQueue childQueue : getChildQueues()) { + apps.addAll(((LeafQueue) childQueue).getApplications()); + } + return Collections.unmodifiableList(apps); + } finally { + readLock.unlock(); + } + } + + public List getPendingApplications() { + try { + readLock.lock(); + List apps = new ArrayList<>(); + for (CSQueue childQueue : getChildQueues()) { + apps.addAll(((LeafQueue) childQueue).getPendingApplications()); + } + return Collections.unmodifiableList(apps); + } finally { + 
readLock.unlock(); + } + } + private String getLeafQueueConfigPrefix(CapacitySchedulerConfiguration conf) { return conf.getAutoCreatedQueueTemplateConfPrefix(getQueuePath()); } + public boolean shouldFailAutoCreationWhenGuaranteedCapacityExceeded() { + return shouldFailAutoCreationWhenGuaranteedCapacityExceeded; + } + + /** + * Asynchronously called from scheduler to apply queue management changes + * @param queueManagementChanges + */ + public void validateAndApplyQueueManagementChanges( + List queueManagementChanges) { + try { + writeLock.lock(); + + try { + validateQueueManagementChanges(queueManagementChanges); + + applyQueueManagementChanges(queueManagementChanges); + + AutoCreatedQueueManagementPolicy policy = + getAutoCreatedQueueManagementPolicy(); + + //acquires write lock on policy + policy.commitQueueManagementChanges(queueManagementChanges); + } catch (SchedulerDynamicEditException sde) { + LOG.error("Queue Management Change event cannot be applied for " + + "parent queue : " + getQueueName(), sde); + } + + } finally { + writeLock.unlock(); + } + } + + public void validateQueueManagementChanges( + List queueManagementChanges) + throws SchedulerDynamicEditException { + + for (QueueManagementChange queueManagementChange : queueManagementChanges) { + + CSQueue childQueue = queueManagementChange.getQueue(); + + if (!(childQueue instanceof AutoCreatedLeafQueue)) { + throw new SchedulerDynamicEditException( + "queue should be " + "AutoCreatedLeafQueue. Found " + childQueue + .getClass()); + } + + if (!(AbstractManagedParentQueue.class. + isAssignableFrom(childQueue.getParent().getClass()))) { + LOG.error("Queue " + getQueueName() + + " is not an instance of PlanQueue or ManagedParentQueue." + " " + + "Ignoring update " + queueManagementChanges); + throw new SchedulerDynamicEditException( + "Queue " + getQueueName() + + " is not a AutoEnabledParentQueue." 
+ " Ignoring update " + + queueManagementChanges); + } + + switch (queueManagementChange.getQueueAction()){ + case UPDATE_QUEUE: + AutoCreatedLeafQueueConfig template = + queueManagementChange.getUpdatedQueueTemplate(); + ((AutoCreatedLeafQueue) childQueue) + .validateConfigurations(template); + break; + } + } + } + + private void applyQueueManagementChanges( + List queueManagementChanges) + throws SchedulerDynamicEditException { + for (QueueManagementChange queueManagementChange : + queueManagementChanges) { + switch (queueManagementChange.getQueueAction()){ + case UPDATE_QUEUE: + AutoCreatedLeafQueue childQueueToBeUpdated = + (AutoCreatedLeafQueue) queueManagementChange.getQueue(); + //acquires write lock on leaf queue + childQueueToBeUpdated.reinitializeFromTemplate(queueManagementChange.getUpdatedQueueTemplate()); + break; + } + } + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java index b7f8aa6996b..e798ce137da 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java @@ -22,6 +22,9 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler + .SchedulerDynamicEditException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -64,9 +67,9 @@ 
public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource) } @Override - protected AutoCreatedLeafQueueTemplate.Builder initializeLeafQueueConfigs + protected AutoCreatedLeafQueueConfig.Builder initializeLeafQueueConfigs (String queuePath) { - AutoCreatedLeafQueueTemplate.Builder leafQueueTemplate = super + AutoCreatedLeafQueueConfig.Builder leafQueueTemplate = super .initializeLeafQueueConfigs (queuePath); showReservationsAsQueues = csContext.getConfiguration() diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueManagementChange.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueManagementChange.java new file mode 100644 index 00000000000..74d9b23c0b3 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueManagementChange.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.QueueState; + +/** + * Encapsulates Queue entitlement and state updates needed + * for adjusting capacity dynamically + * + */ +@Private +@Unstable +public abstract class QueueManagementChange { + + private final CSQueue queue; + + /** + * Updating the queue may involve entitlement updates + * and/or QueueState changes + * + * QueueAction can potentially be enhanced + * for adding, removing queues for queue management + */ + public enum QueueAction { + UPDATE_QUEUE + } + + private AutoCreatedLeafQueueConfig + queueTemplateUpdate; + + private final QueueAction queueAction; + /** + * Updated Queue state with the new entitlement + */ + private QueueState transitionToQueueState; + + public QueueManagementChange(final CSQueue queue, + final QueueAction queueAction) { + this.queue = queue; + this.queueAction = queueAction; + } + + public QueueManagementChange(final CSQueue queue, + final QueueAction queueAction, QueueState targetQueueState, + final AutoCreatedLeafQueueConfig + queueTemplateUpdates) { + this(queue, queueAction, queueTemplateUpdates); + this.transitionToQueueState = targetQueueState; + } + + public QueueManagementChange(final CSQueue queue, + final QueueAction queueAction, + final AutoCreatedLeafQueueConfig + queueTemplateUpdates) { + this(queue, queueAction); + this.queueTemplateUpdate = queueTemplateUpdates; + } + + public QueueState getTransitionToQueueState() { + return transitionToQueueState; + } + + public CSQueue getQueue() { + return queue; + } + + public AutoCreatedLeafQueueConfig getUpdatedQueueTemplate() { + return queueTemplateUpdate; + } + + public QueueAction getQueueAction() { + return queueAction; + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; 
+ if (!(o instanceof QueueManagementChange)) + return false; + + QueueManagementChange that = (QueueManagementChange) o; + + if (queue != null ? !queue.equals(that.queue) : that.queue != null) + return false; + if (queueTemplateUpdate != null ? !queueTemplateUpdate.equals( + that.queueTemplateUpdate) : that.queueTemplateUpdate != null) + return false; + if (queueAction != that.queueAction) + return false; + return transitionToQueueState == that.transitionToQueueState; + } + + @Override + public int hashCode() { + int result = queue != null ? queue.hashCode() : 0; + result = 31 * result + (queueTemplateUpdate != null ? + queueTemplateUpdate.hashCode() : + 0); + result = 31 * result + (queueAction != null ? queueAction.hashCode() : 0); + result = 31 * result + (transitionToQueueState != null ? + transitionToQueueState.hashCode() : + 0); + return result; + } + + @Override + public String toString() { + return "QueueManagementChange{" + "queue=" + queue + + ", updatedEntitlementsByPartition=" + queueTemplateUpdate + + ", queueAction=" + queueAction + ", transitionToQueueState=" + + transitionToQueueState + '}'; + } + + public static class UpdateQueue extends QueueManagementChange { + + public UpdateQueue(final CSQueue queue, QueueState targetQueueState, + final AutoCreatedLeafQueueConfig + queueTemplateUpdate) { + super(queue, QueueAction.UPDATE_QUEUE, targetQueueState, + queueTemplateUpdate); + } + + public UpdateQueue(final CSQueue queue, + final AutoCreatedLeafQueueConfig + queueTemplateUpdate) { + super(queue, QueueAction.UPDATE_QUEUE, queueTemplateUpdate); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueManagementDynamicEditPolicy.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueManagementDynamicEditPolicy.java new file mode 100644 index 00000000000..0d025d82b85 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueManagementDynamicEditPolicy.java @@ -0,0 +1,276 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; + + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event + .QueueManagementChangeEvent; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.SystemClock; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; + +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Queue Management scheduling policy for managed parent queues which enable + * auto child queue creation + */ +public class QueueManagementDynamicEditPolicy implements SchedulingEditPolicy { + + private static final Log LOG = + LogFactory.getLog(QueueManagementDynamicEditPolicy.class); + + private Clock clock; + + // Pointer to other RM components + private RMContext rmContext; + private ResourceCalculator rc; + private CapacityScheduler scheduler; + private RMNodeLabelsManager nlm; + + private long monitoringInterval; + + private Set managedParentQueues = new HashSet<>(); + + /** + * Instantiated by CapacitySchedulerConfiguration + */ + public QueueManagementDynamicEditPolicy() { + clock = SystemClock.getInstance(); + } + + @SuppressWarnings("unchecked") + @VisibleForTesting + public 
QueueManagementDynamicEditPolicy(RMContext context, + CapacityScheduler scheduler) { + init(context.getYarnConfiguration(), context, scheduler); + } + + @SuppressWarnings("unchecked") + @VisibleForTesting + public QueueManagementDynamicEditPolicy(RMContext context, + CapacityScheduler scheduler, Clock clock) { + init(context.getYarnConfiguration(), context, scheduler); + this.clock = clock; + } + + @Override + public void init(final Configuration config, final RMContext context, + final ResourceScheduler sched) { + LOG.info("Queue Management Policy monitor:" + this. + getClass().getCanonicalName()); + assert null == scheduler : "Unexpected duplicate call to init"; + if (!(sched instanceof CapacityScheduler)) { + throw new YarnRuntimeException("Class " + + sched.getClass().getCanonicalName() + " not instance of " + + CapacityScheduler.class.getCanonicalName()); + } + rmContext = context; + scheduler = (CapacityScheduler) sched; + clock = scheduler.getClock(); + + rc = scheduler.getResourceCalculator(); + nlm = scheduler.getRMContext().getNodeLabelManager(); + + CapacitySchedulerConfiguration csConfig = scheduler.getConfiguration(); + + monitoringInterval = csConfig.getLong( + CapacitySchedulerConfiguration.QUEUE_MANAGEMENT_MONITORING_INTERVAL, + CapacitySchedulerConfiguration. 
+ DEFAULT_QUEUE_MANAGEMENT_MONITORING_INTERVAL); + + initQueues(); + } + + /** + * Reinitializes queues(Called on scheduler.reinitialize) + * @param config Configuration + * @param context The resourceManager's context + * @param sched The scheduler + */ + public void reinitialize(final Configuration config, final RMContext context, + final ResourceScheduler sched) { + //TODO - Wire with scheduler reinitialize when YARN-6124 is ready + initQueues(); + } + + private void initQueues() { + managedParentQueues.clear(); + for (Map.Entry queues : scheduler + .getCapacitySchedulerQueueManager() + .getQueues().entrySet()) { + + String queueName = queues.getKey(); + CSQueue queue = queues.getValue(); + + if ( queue instanceof ManagedParentQueue) { + managedParentQueues.add(queueName); + } + } + } + + @Override + public void editSchedule() { + long startTs = clock.getTime(); + + manageAutoCreatedLeafQueues(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Total time used=" + (clock.getTime() - startTs) + " ms."); + } + } + + @VisibleForTesting + List manageAutoCreatedLeafQueues() + { + + List queueManagementChanges = new ArrayList<>(); + // All partitions to look at + + //Proceed only if there are queues to process + if (managedParentQueues.size() > 0) { + for (String parentQueueName : managedParentQueues) { + ManagedParentQueue parentQueue = + (ManagedParentQueue) scheduler.getCapacitySchedulerQueueManager(). 
+ getQueue(parentQueueName); + + queueManagementChanges.addAll( + computeQueueManagementChanges + (parentQueue)); + } + } + return queueManagementChanges; + } + + + @VisibleForTesting + List computeQueueManagementChanges + (ManagedParentQueue parentQueue) { + + List queueManagementChanges = + Collections.emptyList(); + if (!parentQueue.shouldFailAutoCreationWhenGuaranteedCapacityExceeded()) { + + AutoCreatedQueueManagementPolicy policyClazz = + parentQueue.getAutoCreatedQueueManagementPolicy(); + long startTime = 0; + try { + if (LOG.isDebugEnabled()) { + LOG.debug(MessageFormat + .format("Trying to use {0} to compute preemption " + + "candidates", + policyClazz.getClass().getName())); + startTime = clock.getTime(); + } + + queueManagementChanges = policyClazz.computeQueueManagementChanges(); + + //Scheduler update is synchronous to get feedback on errors + if (queueManagementChanges.size() > 0) { + QueueManagementChangeEvent queueManagementChangeEvent = + new QueueManagementChangeEvent(parentQueue, + queueManagementChanges); + scheduler.getRMContext().getDispatcher().getEventHandler().handle( + queueManagementChangeEvent); + } + + if (LOG.isDebugEnabled()) { + LOG.debug(MessageFormat.format("{0} uses {1} millisecond" + + " to run", + policyClazz.getClass().getName(), clock.getTime() + - startTime)); + if (queueManagementChanges.size() > 0) { + LOG.debug(" Updated queue management updates for parent queue" + + " [" + + parentQueue.getQueueName() + ": [\n" + queueManagementChanges + .toString() + "\n]"); + } + } + + if (queueManagementChanges.size() > 0) { + LOG.info( + " Updated queue management updates for parent queue [" + + parentQueue + .getQueueName() + ": [" + queueManagementChanges.toString() + + "]"); + } + + } catch (YarnException e) { + LOG.error( + "Could not compute child queue management updates for parent " + + "queue " + + parentQueue.getQueueName(), e); + } + } else{ + if (LOG.isDebugEnabled()) { + LOG.debug( + "Skipping queue management updates for 
parent queue " + + parentQueue + .getQueuePath() + " " + + "since configuration for auto creating queue's beyond " + + "parent's " + + "guaranteed capacity is disabled"); + } + } + return queueManagementChanges; + } + + @Override + public long getMonitoringInterval() { + return monitoringInterval; + } + + @Override + public String getPolicyName() { + return "QueueManagementDynamicEditPolicy"; + } + + public ResourceCalculator getResourceCalculator() { + return rc; + } + + public RMContext getRmContext() { + return rmContext; + } + + public ResourceCalculator getRC() { + return rc; + } + + public CapacityScheduler getScheduler() { + return scheduler; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemanagement/GuaranteedOrZeroCapacityOverTimePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemanagement/GuaranteedOrZeroCapacityOverTimePolicy.java new file mode 100644 index 00000000000..9ba35b147a7 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemanagement/GuaranteedOrZeroCapacityOverTimePolicy.java @@ -0,0 +1,729 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.queuemanagement;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler
+    .SchedulerDynamicEditException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
+    .AutoCreatedLeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
+    .AutoCreatedLeafQueueConfig;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
+    .AutoCreatedQueueManagementPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerContext;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
+    .LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
+    .ManagedParentQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
+    .ParentQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
+    .QueueCapacities;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueManagementChange;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica
+    .FiCaSchedulerApp;
+import org.apache.hadoop.yarn.util.Clock;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager
+    .NO_LABEL;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
+    .capacity.CSQueueUtils.EPSILON;
+
+
+/**
+ * Capacity Management policy for auto created leaf queues
+ *
+ * Assigns capacity if available to leaf queues based on application
+ * submission order i.e leaf queues are assigned capacity in FCFS order based
+ * on application submission time. Updates leaf queue capacities to 0 when
+ * there are no pending or running apps under that queue.
+ *
+ * The per-leaf-queue activation state and the per-label total of activated
+ * child capacity are read and updated under a {@link ReentrantReadWriteLock}
+ * that is created in {@link #init}.
+ */
+public class GuaranteedOrZeroCapacityOverTimePolicy
+    implements AutoCreatedQueueManagementPolicy {
+
+  private CapacitySchedulerContext scheduler;
+  private Clock clock;
+
+  private ManagedParentQueue parentQueue;
+
+  private static final Log LOG = LogFactory.getLog(
+      GuaranteedOrZeroCapacityOverTimePolicy.class);
+
+  // Template with capacity 0 and the leaf queue template's maximum capacity;
+  // rebuilt on init/reinitialize, hence not a compile-time constant.
+  private AutoCreatedLeafQueueConfig ZERO_CAPACITY_ENTITLEMENT;
+
+  private ReentrantReadWriteLock.WriteLock writeLock;
+
+  private ReentrantReadWriteLock.ReadLock readLock;
+
+  private ParentQueueState parentQueueState = new ParentQueueState();
+
+  private AutoCreatedLeafQueueConfig leafQueueTemplate;
+
+  private QueueCapacities leafQueueTemplateCapacities;
+
+  // Leaf queue name -> activation state
+  private Map<String, LeafQueueState> leafQueueStateMap = new HashMap<>();
+
+  /**
+   * Activation state of one leaf queue together with the clock time of the
+   * most recent activate and deactivate calls.
+   */
+  private class LeafQueueState {
+
+    private AtomicBoolean isActive = new AtomicBoolean(false);
+
+    private long mostRecentActivationTime;
+
+    private long mostRecentDeactivationTime;
+
+    public long getMostRecentActivationTime() {
+      return mostRecentActivationTime;
+    }
+
+    public long getMostRecentDeactivationTime() {
+      return mostRecentDeactivationTime;
+    }
+
+    /**
+     * Is the queue currently active or deactivated?
+     *
+     * @return true if Active else false
+     */
+    public boolean isActive() {
+      return isActive.get();
+    }
+
+    /**
+     * Marks the queue active and records the current clock time. The time is
+     * recorded even when the queue was already active.
+     *
+     * @return true if this call switched the queue from inactive to active
+     */
+    private boolean activate() {
+      boolean ret = isActive.compareAndSet(false, true);
+      mostRecentActivationTime = clock.getTime();
+      return ret;
+    }
+
+    /**
+     * Marks the queue inactive and records the current clock time. The time
+     * is recorded even when the queue was already inactive.
+     *
+     * @return true if this call switched the queue from active to inactive
+     */
+    private boolean deactivate() {
+      boolean ret = isActive.compareAndSet(true, false);
+      mostRecentDeactivationTime = clock.getTime();
+      return ret;
+    }
+  }
+
+  private boolean containsLeafQueue(String leafQueueName) {
+    return leafQueueStateMap.containsKey(leafQueueName);
+  }
+
+  /**
+   * Registers state for a leaf queue unless state is already registered
+   * under that name.
+   *
+   * @return true if the state was added, false if the name was already known
+   */
+  private boolean addLeafQueueStateIfNotExists(String leafQueueName,
+      LeafQueueState leafQueueState) {
+    if (!containsLeafQueue(leafQueueName)) {
+      leafQueueStateMap.put(leafQueueName, leafQueueState);
+      return true;
+    }
+    return false;
+  }
+
+  private boolean addLeafQueueStateIfNotExists(LeafQueue leafQueue) {
+    return addLeafQueueStateIfNotExists(leafQueue.getQueueName(),
+        new LeafQueueState());
+  }
+
+  /**
+   * Tracks, per node label, the sum of absolute capacity handed out to
+   * activated child queues.
+   */
+  private class ParentQueueState {
+
+    private Map<String, Float>
+        totalAbsoluteActivatedChildQueueCapacityByLabel =
+        new HashMap<String, Float>();
+
+    private float getAbsoluteActivatedChildQueueCapacity() {
+      return getAbsoluteActivatedChildQueueCapacity(NO_LABEL);
+    }
+
+    /**
+     * @return the activated capacity recorded for the label, or 0 when
+     * nothing has been recorded for it
+     */
+    private float getAbsoluteActivatedChildQueueCapacity(String nodeLabel) {
+      // Acquire outside the try so that unlock() only runs once the lock
+      // is actually held.
+      readLock.lock();
+      try {
+        Float totalActivatedCapacity = getByLabel(nodeLabel);
+        if (totalActivatedCapacity != null) {
+          return totalActivatedCapacity;
+        } else{
+          return 0;
+        }
+      } finally {
+        readLock.unlock();
+      }
+    }
+
+    private void incAbsoluteActivatedChildCapacity(String nodeLabel,
+        float childQueueCapacity) {
+      writeLock.lock();
+      try {
+        Float activatedChildCapacity = getByLabel(nodeLabel);
+        if (activatedChildCapacity != null) {
+          setByLabel(nodeLabel, activatedChildCapacity + childQueueCapacity);
+        } else{
+          setByLabel(nodeLabel, childQueueCapacity);
+        }
+      } finally {
+        writeLock.unlock();
+      }
+    }
+
+    /**
+     * Subtracts a child queue's capacity from the total recorded for the
+     * label. When nothing has been recorded for the label there is nothing
+     * to give back, so the call leaves the state untouched (it previously
+     * stored +childQueueCapacity, which counted a deactivation as an
+     * activation).
+     */
+    private void decAbsoluteActivatedChildCapacity(String nodeLabel,
+        float childQueueCapacity) {
+      writeLock.lock();
+      try {
+        Float activatedChildCapacity = getByLabel(nodeLabel);
+        if (activatedChildCapacity != null) {
+          setByLabel(nodeLabel, activatedChildCapacity - childQueueCapacity);
+        }
+      } finally {
+        writeLock.unlock();
+      }
+    }
+
+    Float getByLabel(String label) {
+      return totalAbsoluteActivatedChildQueueCapacityByLabel.get(label);
+    }
+
+    Float setByLabel(String label, float val) {
+      return totalAbsoluteActivatedChildQueueCapacityByLabel.put(label, val);
+    }
+  }
+
+  /**
+   * Comparator that orders applications by their submit time. Applications
+   * whose {@link RMApp} cannot be found in the RM context are ordered after
+   * those that can; two such applications compare as equal.
+   */
+  private class PendingApplicationComparator
+      implements Comparator<FiCaSchedulerApp>, Serializable {
+
+    @Override
+    public int compare(FiCaSchedulerApp app1, FiCaSchedulerApp app2) {
+      RMApp rmApp1 = scheduler.getRMContext().getRMApps().get(
+          app1.getApplicationId());
+      RMApp rmApp2 = scheduler.getRMContext().getRMApps().get(
+          app2.getApplicationId());
+      if (rmApp1 != null && rmApp2 != null) {
+        return Long.compare(rmApp1.getSubmitTime(), rmApp2.getSubmitTime());
+      } else if (rmApp1 != null) {
+        return -1;
+      } else if (rmApp2 != null) {
+        return 1;
+      } else {
+        return 0;
+      }
+    }
+  }
+
+  private PendingApplicationComparator applicationComparator =
+      new PendingApplicationComparator();
+
+  /**
+   * Binds the policy to a scheduler context and a managed parent queue,
+   * creates the read/write lock and loads the leaf queue template.
+   *
+   * @throws IllegalArgumentException if parentQueue is not a
+   * {@link ManagedParentQueue}
+   */
+  @Override
+  public void init(final CapacitySchedulerContext schedulerContext,
+      final ParentQueue parentQueue) {
+    this.scheduler = schedulerContext;
+
+    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    readLock = lock.readLock();
+    writeLock = lock.writeLock();
+
+    this.clock = schedulerContext.getClock();
+
+    if (!(parentQueue instanceof ManagedParentQueue)) {
+      throw new IllegalArgumentException(
+          "Expected instance of type " + ManagedParentQueue.class);
+    }
+
+    this.parentQueue = (ManagedParentQueue) parentQueue;
+
+    initializeLeafQueueTemplate(this.parentQueue);
+
+    LOG.info("Initialized queue management policy for parent queue "
+        + parentQueue.getQueueName());
+  }
+
+  private void initializeLeafQueueTemplate(ManagedParentQueue parentQueue) {
+    leafQueueTemplate = parentQueue.getLeafQueueTemplate();
+
+    leafQueueTemplateCapacities = leafQueueTemplate.getQueueCapacities();
+
+    ZERO_CAPACITY_ENTITLEMENT = buildTemplate(0.0f,
+        leafQueueTemplateCapacities.getMaximumCapacity());
+  }
+
+  /**
+   * Computes the entitlement changes for the leaf queues of the parent
+   * queue: active leaf queues without applications are set to zero capacity,
+   * and inactive leaf queues with applications are given the template
+   * capacity, in application submit-time order, for as long as the parent
+   * has capacity available. The changes are only computed here; internal
+   * state is updated in {@link #commitQueueManagementChanges}.
+   *
+   * @return the list of changes, possibly empty
+   * @throws SchedulerDynamicEditException if a leaf queue is missing from
+   * the policy's state or the template's absolute capacity is not positive
+   */
+  @Override
+  public List<QueueManagementChange> computeQueueManagementChanges()
+      throws SchedulerDynamicEditException {
+
+    //TODO : Add support for node labels on leaf queue template configurations
+    //synch/add missing leaf queue(s) if any to state
+    updateLeafQueueState();
+
+    readLock.lock();
+    try {
+      List<QueueManagementChange> queueManagementChanges = new ArrayList<>();
+
+      float parentAbsoluteCapacity = parentQueue.getQueueCapacities()
+          .getAbsoluteCapacity();
+
+      float leafQueueTemplateAbsoluteCapacity =
+          leafQueueTemplateCapacities.getAbsoluteCapacity();
+
+      // check if any active leaf queues have no applications left and need
+      // to be deactivated
+      Map<String, QueueCapacities> deactivatedLeafQueues =
+          deactivateLeafQueuesIfInActive(parentQueue, queueManagementChanges);
+
+      float deactivatedCapacity = getTotalDeactivatedCapacity(
+          deactivatedLeafQueues);
+
+      float sumOfChildQueueActivatedCapacity = parentQueueState.
+          getAbsoluteActivatedChildQueueCapacity();
+
+      //Check if we need to activate anything at all?
+      float availableCapacity = getAvailableCapacity(parentAbsoluteCapacity,
+          deactivatedCapacity, sumOfChildQueueActivatedCapacity);
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Parent queue : " + parentQueue.getQueueName()
+            + " absCapacity = " + parentAbsoluteCapacity
+            + ", leafQueueAbsoluteCapacity = "
+            + leafQueueTemplateAbsoluteCapacity
+            + ", deactivatedCapacity = " + deactivatedCapacity
+            + " , absChildActivatedCapacity = "
+            + sumOfChildQueueActivatedCapacity + ", availableCapacity = "
+            + availableCapacity);
+      }
+
+      if (availableCapacity >= leafQueueTemplateAbsoluteCapacity) {
+        //sort applications across leaf queues by submit time
+        List<FiCaSchedulerApp> pendingApps = getSortedPendingApplications();
+
+        if (pendingApps.size() > 0) {
+          int maxLeafQueuesTobeActivated = getMaxLeavesToBeActivated(
+              availableCapacity, leafQueueTemplateAbsoluteCapacity,
+              pendingApps.size());
+
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Found " + maxLeafQueuesTobeActivated
+                + " leaf queues to be activated with " + pendingApps.size()
+                + " apps ");
+          }
+
+          LinkedHashSet<String> leafQueuesToBeActivated = getSortedLeafQueues(
+              pendingApps, maxLeafQueuesTobeActivated,
+              deactivatedLeafQueues.keySet());
+
+          //Compute entitlement changes for the identified leaf queues
+          // which is appended to the List of queueManagementChanges
+          computeQueueManagementChanges(leafQueuesToBeActivated,
+              queueManagementChanges, availableCapacity,
+              leafQueueTemplateAbsoluteCapacity);
+
+          if (LOG.isDebugEnabled()) {
+            if (leafQueuesToBeActivated.size() > 0) {
+              LOG.debug("Activated leaf queues : ["
+                  + leafQueuesToBeActivated + "]");
+            }
+          }
+        }
+      }
+      return queueManagementChanges;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /** Sums the absolute capacity of the given deactivated queues. */
+  private float getTotalDeactivatedCapacity(
+      Map<String, QueueCapacities> deactivatedLeafQueues) {
+    float deactivatedCapacity = 0;
+    for (QueueCapacities capacities : deactivatedLeafQueues.values()) {
+      deactivatedCapacity += capacities.getAbsoluteCapacity();
+    }
+    return deactivatedCapacity;
+  }
+
+  /**
+   * Synchronizes the per-leaf-queue state with the parent queue's current
+   * children: state is added for leaf queues that have none, and state of
+   * queues that are no longer children is dropped.
+   */
+  @VisibleForTesting
+  void updateLeafQueueState() {
+    writeLock.lock();
+    try {
+      Set<String> newQueues = new HashSet<>();
+      for (CSQueue newQueue : parentQueue.getChildQueues()) {
+        if (newQueue instanceof LeafQueue) {
+          addLeafQueueStateIfNotExists((LeafQueue) newQueue);
+          newQueues.add(newQueue.getQueueName());
+        }
+      }
+
+      for (Iterator<Map.Entry<String, LeafQueueState>> itr =
+           leafQueueStateMap.entrySet().iterator(); itr.hasNext(); ) {
+        Map.Entry<String, LeafQueueState> e = itr.next();
+        String queueName = e.getKey();
+        if (!newQueues.contains(queueName)) {
+          itr.remove();
+        }
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /**
+   * Picks, in the order of the given applications, the names of up to
+   * leafQueuesNeeded distinct leaf queues that are not active and are not
+   * being deactivated in this round.
+   */
+  private LinkedHashSet<String> getSortedLeafQueues(
+      final List<FiCaSchedulerApp> pendingApps, int leafQueuesNeeded,
+      Set<String> deactivatedQueues) throws SchedulerDynamicEditException {
+
+    LinkedHashSet<String> leafQueues = new LinkedHashSet<>(leafQueuesNeeded);
+    for (FiCaSchedulerApp app : pendingApps) {
+      if (leafQueues.size() >= leafQueuesNeeded) {
+        break;
+      }
+
+      AutoCreatedLeafQueue leafQueue =
+          (AutoCreatedLeafQueue) app.getCSLeafQueue();
+      String leafQueueName = leafQueue.getQueueName();
+
+      // Set.add ignores names that were already selected
+      if (!isActive(leafQueue) && !deactivatedQueues.contains(
+          leafQueueName)) {
+        leafQueues.add(leafQueueName);
+      }
+    }
+    return leafQueues;
+  }
+
+  /**
+   * @return true if the policy's state marks the leaf queue as active
+   * @throws SchedulerDynamicEditException if the policy holds no state for
+   * the leaf queue
+   */
+  @VisibleForTesting
+  public boolean isActive(final AutoCreatedLeafQueue leafQueue)
+      throws SchedulerDynamicEditException {
+    readLock.lock();
+    try {
+      LeafQueueState leafQueueStatus = getLeafQueueState(leafQueue);
+      return leafQueueStatus.isActive();
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /**
+   * Appends a zero-capacity update for every active child queue that has no
+   * applications.
+   *
+   * @return deactivated queue name -> the template capacities it releases
+   */
+  private Map<String, QueueCapacities> deactivateLeafQueuesIfInActive(
+      ParentQueue parentQueue,
+      List<QueueManagementChange> queueManagementChanges)
+      throws SchedulerDynamicEditException {
+    Map<String, QueueCapacities> deactivatedQueues = new HashMap<>();
+
+    for (CSQueue childQueue : parentQueue.getChildQueues()) {
+      AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) childQueue;
+
+      boolean active = isActive(leafQueue);
+      if (active && !hasPendingApps(leafQueue)) {
+        queueManagementChanges.add(
+            new QueueManagementChange.UpdateQueue(leafQueue,
+                ZERO_CAPACITY_ENTITLEMENT));
+        deactivatedQueues.put(leafQueue.getQueueName(),
+            leafQueueTemplateCapacities);
+      } else if (LOG.isDebugEnabled()) {
+        // Skipped either because the queue is not active or because it
+        // still has applications; report which one.
+        LOG.debug("Skipping deactivation for " + leafQueue.getQueuePath()
+            + " : active = " + active + ", numApplications = "
+            + leafQueue.getNumApplications());
+      }
+    }
+
+    if (LOG.isDebugEnabled()) {
+      if (deactivatedQueues.size() > 0) {
+        LOG.debug("Deactivated leaf queues : " + deactivatedQueues);
+      }
+    }
+    return deactivatedQueues;
+  }
+
+  /**
+   * Appends an update carrying the template capacity for each of the given
+   * leaf queues, in iteration order, while the remaining available capacity
+   * covers another template-sized queue.
+   */
+  private void computeQueueManagementChanges(
+      Set<String> leafQueuesToBeActivated,
+      List<QueueManagementChange> queueManagementChanges,
+      final float availableCapacity,
+      final float leafQueueTemplateAbsoluteCapacity) {
+
+    float curAvailableCapacity = availableCapacity;
+
+    for (String curLeafQueue : leafQueuesToBeActivated) {
+      // Activate queues if capacity is available
+      if (curAvailableCapacity >= leafQueueTemplateAbsoluteCapacity) {
+        AutoCreatedLeafQueue leafQueue =
+            (AutoCreatedLeafQueue) scheduler.getCapacitySchedulerQueueManager()
+                .getQueue(curLeafQueue);
+        if (leafQueue != null) {
+          AutoCreatedLeafQueueConfig newTemplate =
+              buildTemplate(leafQueueTemplateCapacities.getCapacity(),
+                  leafQueueTemplateCapacities.getMaximumCapacity());
+          queueManagementChanges.add(
+              new QueueManagementChange.UpdateQueue(leafQueue,
+                  newTemplate));
+          curAvailableCapacity -= leafQueueTemplateAbsoluteCapacity;
+        } else{
+          LOG.warn(
+              "Could not find queue in scheduler while trying to activate "
+                  + curLeafQueue);
+        }
+      }
+    }
+  }
+
+  /**
+   * @return the number of template-sized leaf queues that fit into the
+   * available capacity, capped by the number of pending applications
+   * @throws SchedulerDynamicEditException if childQueueAbsoluteCapacity is
+   * not greater than 0
+   */
+  @VisibleForTesting
+  public int getMaxLeavesToBeActivated(float availableCapacity,
+      float childQueueAbsoluteCapacity, int numPendingApps)
+      throws SchedulerDynamicEditException {
+
+    if (childQueueAbsoluteCapacity > 0) {
+      int numLeafQueuesNeeded = (int) Math.floor(
+          availableCapacity / childQueueAbsoluteCapacity);
+
+      return Math.min(numLeafQueuesNeeded, numPendingApps);
+    } else{
+      throw new SchedulerDynamicEditException("Child queue absolute capacity "
+          + "is initialized to 0. Check parent queue's " + parentQueue
+          .getQueueName() + " leaf queue template configuration");
+    }
+  }
+
+  /**
+   * Parent capacity minus what is currently activated, plus what is being
+   * deactivated in this round; EPSILON is added so that float rounding does
+   * not hide a queue that exactly fits.
+   */
+  private float getAvailableCapacity(float parentAbsCapacity,
+      float deactivatedAbsCapacity, float totalChildQueueActivatedCapacity) {
+    return parentAbsCapacity - totalChildQueueActivatedCapacity
+        + deactivatedAbsCapacity + EPSILON;
+  }
+
+  /**
+   * Commit queue management changes - which involves updating required state
+   * on parent/underlying leaf queues
+   *
+   * @param queueManagementChanges Queue Management changes to commit
+   * @throws SchedulerDynamicEditException when validation fails
+   */
+  @Override
+  public void commitQueueManagementChanges(
+      List<QueueManagementChange> queueManagementChanges)
+      throws SchedulerDynamicEditException {
+    writeLock.lock();
+    try {
+      for (QueueManagementChange queueManagementChange :
+          queueManagementChanges) {
+        AutoCreatedLeafQueueConfig updatedQueueTemplate =
+            queueManagementChange.getUpdatedQueueTemplate();
+        CSQueue queue = queueManagementChange.getQueue();
+        if (!(queue instanceof AutoCreatedLeafQueue)) {
+          throw new SchedulerDynamicEditException(
+              "Expected queue management change for AutoCreatedLeafQueue. "
+                  + "Found " + queue.getClass().getName());
+        }
+
+        AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) queue;
+
+        // A positive capacity in the change means "activate", otherwise
+        // "deactivate"
+        if (updatedQueueTemplate.getQueueCapacities().getCapacity() > 0) {
+          if (isActive(leafQueue)) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug(
+                  "Queue is already active. Skipping activation : " + queue
+                      .getQueuePath());
+            }
+          } else{
+            activate(leafQueue);
+          }
+        } else{
+          if (!isActive(leafQueue)) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Queue is already de-activated. "
+                  + "Skipping de-activation "
+                  + ": " + leafQueue.getQueuePath());
+            }
+          } else{
+            deactivate(leafQueue);
+          }
+        }
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /**
+   * Marks the leaf queue active and adds the template's absolute capacity
+   * to the activated total of the default partition (NO_LABEL).
+   */
+  private void activate(final AutoCreatedLeafQueue leafQueue)
+      throws SchedulerDynamicEditException {
+    writeLock.lock();
+    try {
+      getLeafQueueState(leafQueue).activate();
+
+      parentQueueState.incAbsoluteActivatedChildCapacity(NO_LABEL,
+          leafQueueTemplateCapacities
+              .getAbsoluteCapacity());
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /**
+   * Marks the leaf queue inactive and subtracts the template's absolute
+   * capacity from the activated total of every node label the parent queue
+   * has capacities for. Labels without a recorded total are left untouched
+   * (see ParentQueueState#decAbsoluteActivatedChildCapacity).
+   */
+  private void deactivate(final AutoCreatedLeafQueue leafQueue)
+      throws SchedulerDynamicEditException {
+    writeLock.lock();
+    try {
+      getLeafQueueState(leafQueue).deactivate();
+
+      for (String nodeLabel : parentQueue.getQueueCapacities()
+          .getExistingNodeLabels()) {
+        parentQueueState.decAbsoluteActivatedChildCapacity(nodeLabel,
+            leafQueueTemplateCapacities
+                .getAbsoluteCapacity());
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /** @return true if the leaf queue reports at least one application */
+  public boolean hasPendingApps(final AutoCreatedLeafQueue leafQueue) {
+    return leafQueue.getNumApplications() > 0;
+  }
+
+  /**
+   * Re-reads the leaf queue template from the given parent queue.
+   *
+   * @throws IllegalStateException if parentQueue is not a
+   * {@link ManagedParentQueue} or its path differs from the path of the
+   * queue this policy was bound to
+   */
+  @Override
+  public void reinitialize(CapacitySchedulerContext schedulerContext, final
+      ParentQueue parentQueue) {
+    if (!(parentQueue instanceof ManagedParentQueue)) {
+      throw new IllegalStateException(
+          "Expected instance of type " + ManagedParentQueue.class + " found  "
+              + " : " + parentQueue.getClass());
+    }
+
+    if (this.parentQueue != null && !parentQueue
+        .getQueuePath().equals(this.parentQueue.getQueuePath())) {
+      throw new IllegalStateException(
+          "Expected parent queue path to match " + this.parentQueue.getQueuePath()
+              + " found : " + parentQueue.getQueuePath());
+    }
+
+    this.parentQueue = (ManagedParentQueue) parentQueue;
+
+    initializeLeafQueueTemplate(this.parentQueue);
+
+    LOG.info("Reinitialized queue management policy for parent queue "
+        + parentQueue.getQueueName());
+  }
+
+  /**
+   * Registers a newly created leaf queue with the policy and decides its
+   * initial entitlement: the template capacity if the parent still has room
+   * for it (the queue is then marked active), otherwise zero capacity.
+   *
+   * @throws SchedulerDynamicEditException if the policy already holds state
+   * for a queue of that name
+   */
+  @Override
+  public AutoCreatedLeafQueueConfig getInitialLeafQueueConfiguration
+      (AutoCreatedLeafQueue leafQueue)
+      throws SchedulerDynamicEditException {
+
+    AutoCreatedLeafQueueConfig template = ZERO_CAPACITY_ENTITLEMENT;
+    writeLock.lock();
+    try {
+      if (!addLeafQueueStateIfNotExists(leafQueue)) {
+        // Report the queue, not the identity toString of its state object
+        String message = "Leaf queue already exists in state : "
+            + leafQueue.getQueuePath();
+        LOG.error(message);
+        throw new SchedulerDynamicEditException(message);
+      }
+
+      float availableCapacity = getAvailableCapacity(
+          parentQueue.getQueueCapacities().getAbsoluteCapacity(), 0,
+          parentQueueState.getAbsoluteActivatedChildQueueCapacity());
+
+      if (availableCapacity >= leafQueueTemplateCapacities
+          .getAbsoluteCapacity()) {
+        activate(leafQueue);
+        template = buildTemplate(leafQueueTemplateCapacities.getCapacity(),
+            leafQueueTemplateCapacities.getMaximumCapacity());
+      }
+    } finally {
+      writeLock.unlock();
+    }
+    return template;
+  }
+
+  /**
+   * @return the state registered under the queue's name
+   * @throws SchedulerDynamicEditException if no state is registered
+   */
+  @VisibleForTesting
+  LeafQueueState getLeafQueueState(LeafQueue queue)
+      throws SchedulerDynamicEditException {
+    readLock.lock();
+    try {
+      String queueName = queue.getQueueName();
+      if (!containsLeafQueue(queueName)) {
+        throw new SchedulerDynamicEditException(
+            "Could not find leaf queue in " + "state " + queueName);
+      } else{
+        return leafQueueStateMap.get(queueName);
+      }
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  @VisibleForTesting
+  public float getAbsoluteActivatedChildQueueCapacity() {
+    return parentQueueState.getAbsoluteActivatedChildQueueCapacity();
+  }
+
+  /** @return a copy of the parent queue's applications sorted by submit time */
+  private List<FiCaSchedulerApp> getSortedPendingApplications() {
+    List<FiCaSchedulerApp> apps = new ArrayList<>(
+        parentQueue.getApplications());
+    Collections.sort(apps, applicationComparator);
+    return apps;
+  }
+
+  /**
+   * Builds a leaf queue configuration with the given capacity and maximum
+   * capacity for every node label of the parent queue, and with the
+   * application and user limits of the leaf queue template.
+   */
+  private AutoCreatedLeafQueueConfig buildTemplate(
+      float capacity, float maxCapacity) {
+    AutoCreatedLeafQueueConfig.Builder templateBuilder = new
+        AutoCreatedLeafQueueConfig.Builder();
+    QueueCapacities capacities = new QueueCapacities(false);
+    templateBuilder.capacities(capacities);
+
+    for (String nodeLabel : parentQueue.getQueueCapacities()
+        .getExistingNodeLabels()) {
+      capacities.setCapacity(nodeLabel, capacity);
+      capacities.setMaximumCapacity(nodeLabel, maxCapacity);
+    }
+
+    templateBuilder.maxApps(leafQueueTemplate.getMaxApps());
+    templateBuilder.maxAppsPerUser(leafQueueTemplate.getMaxAppsPerUser());
+    templateBuilder.userLimit(leafQueueTemplate.getUserLimit());
+    // The factor must come from the template's user limit factor; it was
+    // previously fed the user limit by mistake.
+    templateBuilder.userLimitFactor(leafQueueTemplate.getUserLimitFactor());
+
+    return new AutoCreatedLeafQueueConfig(templateBuilder);
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/QueueEntitlement.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/QueueEntitlement.java
index 2a751e3e437..f4182f3101f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/QueueEntitlement.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/QueueEntitlement.java
@@ -43,4 +43,26 @@ public float getCapacity() {
   public void setCapacity(float capacity) {
     this.capacity = capacity;
   }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o)
+      return true;
+    if (!(o instanceof QueueEntitlement))
+      return false;
+
+    QueueEntitlement that = (QueueEntitlement) o;
+
+    if (Float.compare(that.capacity, capacity) != 0)
+      return false;
+    return Float.compare(that.maxCapacity, maxCapacity) == 0;
+  }
+
+  @Override
+  public int hashCode() {
+    int result =
(capacity != +0.0f ? Float.floatToIntBits(capacity) : 0);
+    result = 31 * result + (maxCapacity != +0.0f ? Float.floatToIntBits(
+        maxCapacity) : 0);
+    return result;
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/QueueManagementChangeEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/QueueManagementChangeEvent.java
new file mode 100644
index 00000000000..926e1be6668
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/QueueManagementChangeEvent.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
+    .ParentQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
+    .QueueManagementChange;
+
+import java.util.List;
+
+/**
+ * Event to update scheduler of any queue management changes.
+ * Carries the parent queue and the list of changes given to the constructor;
+ * its event type is {@link SchedulerEventType#MANAGE_QUEUE}.
+ */
+public class QueueManagementChangeEvent extends SchedulerEvent {
+
+  private final ParentQueue parentQueue;
+  private final List<QueueManagementChange> queueManagementChanges;
+
+  /**
+   * @param parentQueue parent queue this event refers to
+   * @param queueManagementChanges changes to hand to the scheduler; the list
+   *                               is stored as passed, it is not copied
+   */
+  public QueueManagementChangeEvent(ParentQueue parentQueue,
+      List<QueueManagementChange> queueManagementChanges) {
+    super(SchedulerEventType.MANAGE_QUEUE);
+    this.parentQueue = parentQueue;
+    this.queueManagementChanges = queueManagementChanges;
+  }
+
+  /** @return the parent queue given at construction */
+  public ParentQueue getParentQueue() {
+    return parentQueue;
+  }
+
+  /** @return the queue management changes given at construction */
+  public List<QueueManagementChange> getQueueManagementChanges() {
+    return queueManagementChanges;
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java
index 229e0bbc0be..b107cf4ee61 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java
@@ -51,5 +51,8 @@
   MARK_CONTAINER_FOR_KILLABLE,
 
   // Cancel a killable container
-  MARK_CONTAINER_FOR_NONKILLABLE
+  MARK_CONTAINER_FOR_NONKILLABLE,
+
+  //Queue Management Change
+  MANAGE_QUEUE
 }
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java index 7090bc929f8..931e776d0d8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; +import org.apache.commons.lang.math.RandomUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -47,6 +48,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity + .queuemanagement.GuaranteedOrZeroCapacityOverTimePolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; @@ -57,17 +61,22 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import 
org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import static org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager + .NO_LABEL; import static org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.CURRENT_USER_MAPPING; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueueUtils.EPSILON; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.DOT; @@ -79,7 +88,7 @@ import static org.mockito.Mockito.when; /** - * Tests for creation and reinitilization of auto created leaf queues + * Tests for creation and reinitialization of auto created leaf queues * under a ManagedParentQueue. 
*/ public class TestCapacitySchedulerAutoQueueCreation { @@ -113,10 +122,17 @@ private static float C1_CAPACITY = 20f; private static float C2_CAPACITY = 20f; - private static String USER = "user_"; - private static String USER0 = USER + 0; - private static String USER2 = USER + 2; - private static String PARENT_QUEUE = "c"; + private final String USER = "user_"; + private final String USER0 = USER + 0; + private final String USER1 = USER + 1; + private final String USER3 = USER + 3; + private final String USER2 = USER + 2; + private final String PARENT_QUEUE = "c"; + + private Set accessibleNodeLabelsOnC = new HashSet<>(); + + private final String NODEL_LABEL_GPU = "GPU"; + private final String NODEL_LABEL_SSD = "SSD"; private MockRM mockRM = null; @@ -248,8 +264,8 @@ private CapacitySchedulerConfiguration setupQueueConfiguration( conf.setAutoCreateChildQueueEnabled(C, true); //Setup leaf queue template configs - conf.setAutoCreatedLeafQueueTemplateCapacity(C, 50.0f); - conf.setAutoCreatedLeafQueueTemplateMaxCapacity(C, 100.0f); + conf.setAutoCreatedLeafQueueConfigCapacity(C, 50.0f); + conf.setAutoCreatedLeafQueueConfigMaxCapacity(C, 100.0f); LOG.info("Setup " + C + " as an auto leaf creation enabled parent queue"); @@ -257,8 +273,16 @@ private CapacitySchedulerConfiguration setupQueueConfiguration( conf.setAutoCreateChildQueueEnabled(D, true); //Setup leaf queue template configs - conf.setAutoCreatedLeafQueueTemplateCapacity(D, 10.0f); - conf.setAutoCreatedLeafQueueTemplateMaxCapacity(D, 100.0f); + conf.setAutoCreatedLeafQueueConfigCapacity(D, 10.0f); + conf.setAutoCreatedLeafQueueConfigMaxCapacity(D, 100.0f); + + accessibleNodeLabelsOnC.add(NODEL_LABEL_GPU); + accessibleNodeLabelsOnC.add(NODEL_LABEL_SSD); + accessibleNodeLabelsOnC.add(NO_LABEL); + + conf.setAccessibleNodeLabels(C, accessibleNodeLabelsOnC); + conf.setCapacityByLabel(C, NODEL_LABEL_GPU, 50); + conf.setCapacityByLabel(C, NODEL_LABEL_SSD, 50); LOG.info("Setup " + D + " as an auto leaf creation 
enabled parent queue"); @@ -289,7 +313,7 @@ public void testAutoCreateLeafQueueCreation() throws Exception { ManagedParentQueue parentQueue = (ManagedParentQueue) cs.getQueue( PARENT_QUEUE); assertEquals(parentQueue, autoCreatedLeafQueue.getParent()); - validateCapacities(autoCreatedLeafQueue); + validateInitialQueueEntitlement(parentQueue, USER0, 0.1f); } finally { cleanupQueue(USER0); } @@ -297,7 +321,6 @@ public void testAutoCreateLeafQueueCreation() throws Exception { @Test public void testReinitializeStoppedAutoCreatedLeafQueue() throws Exception { - try { String host = "127.0.0.1"; RMNode node = MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1, @@ -319,7 +342,7 @@ public void testReinitializeStoppedAutoCreatedLeafQueue() throws Exception { ManagedParentQueue parentQueue = (ManagedParentQueue) cs.getQueue( PARENT_QUEUE); assertEquals(parentQueue, autoCreatedLeafQueue.getParent()); - validateCapacities(autoCreatedLeafQueue); + validateInitialQueueEntitlement(parentQueue, USER0, 0.1f); ApplicationAttemptId appAttemptId = appsInC.get(0); @@ -354,7 +377,8 @@ public void testReinitializeStoppedAutoCreatedLeafQueue() throws Exception { AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) cs.getQueue( USER0); - validateCapacities(leafQueue); + validateInitialQueueEntitlement(parentQueue, leafQueue.getQueueName(), + 0.1f); } finally { cleanupQueue(USER0); @@ -399,16 +423,16 @@ public void testRefreshQueuesWithAutoCreatedLeafQueues() throws Exception { checkQueueCapacities(newCS, 40f, 20f); //chnage parent template configs and reinitialize - conf.setAutoCreatedLeafQueueTemplateCapacity(C, 30.0f); - conf.setAutoCreatedLeafQueueTemplateMaxCapacity(C, 100.0f); + conf.setAutoCreatedLeafQueueConfigCapacity(C, 30.0f); + conf.setAutoCreatedLeafQueueConfigMaxCapacity(C, 100.0f); newCS.reinitialize(conf, newMockRM.getRMContext()); ManagedParentQueue c = (ManagedParentQueue) newCS.getQueue("c"); AutoCreatedLeafQueue c3 = new AutoCreatedLeafQueue(newCS, "c3", c); 
newCS.addQueue(c3); - AbstractManagedParentQueue.AutoCreatedLeafQueueTemplate - leafQueueTemplate = parentQueue.getLeafQueueTemplate(); + AutoCreatedLeafQueueConfig leafQueueTemplate = + parentQueue.getLeafQueueTemplate(); QueueCapacities cap = leafQueueTemplate.getQueueCapacities(); c3.setEntitlement( new QueueEntitlement(cap.getCapacity(), cap.getMaximumCapacity())); @@ -460,7 +484,7 @@ public void testConvertLeafQueueToParentQueueWithAutoCreate() CapacitySchedulerConfiguration newConf = new CapacitySchedulerConfiguration(); setupQueueConfiguration(newConf); - newConf.setAutoCreatedLeafQueueTemplateCapacity(A1, A1_CAPACITY / 10); + newConf.setAutoCreatedLeafQueueConfigCapacity(A1, A1_CAPACITY / 10); newConf.setAutoCreateChildQueueEnabled(A1, true); newCS.setConf(new YarnConfiguration()); @@ -490,7 +514,7 @@ public void testConvertFailsFromParentQueueToManagedParentQueue() CapacitySchedulerConfiguration newConf = new CapacitySchedulerConfiguration(); setupQueueConfiguration(newConf); - newConf.setAutoCreatedLeafQueueTemplateCapacity(A, A_CAPACITY / 10); + newConf.setAutoCreatedLeafQueueConfigCapacity(A, A_CAPACITY / 10); newConf.setAutoCreateChildQueueEnabled(A, true); newCS.setConf(new YarnConfiguration()); @@ -532,10 +556,10 @@ public void testAutoCreateLeafQueueFailsWithNoQueueMapping() } private void validateCapacities(AutoCreatedLeafQueue autoCreatedLeafQueue) { - assertEquals(autoCreatedLeafQueue.getCapacity(), 0.0f, EPSILON); - assertEquals(autoCreatedLeafQueue.getAbsoluteCapacity(), 0.0f, EPSILON); - assertEquals(autoCreatedLeafQueue.getMaximumCapacity(), 0.0f, EPSILON); - assertEquals(autoCreatedLeafQueue.getAbsoluteMaximumCapacity(), 0.0f, + assertEquals( 0.0f, autoCreatedLeafQueue.getCapacity(), EPSILON); + assertEquals( 0.0f, autoCreatedLeafQueue.getAbsoluteCapacity(), EPSILON); + assertEquals(1.0f, autoCreatedLeafQueue.getMaximumCapacity(), EPSILON); + assertEquals(1.0f, autoCreatedLeafQueue.getAbsoluteMaximumCapacity(), EPSILON); int 
maxAppsForAutoCreatedQueues = (int) ( CapacitySchedulerConfiguration.DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS @@ -555,13 +579,26 @@ private void cleanupQueue(String queueName) throws YarnException { ((ManagedParentQueue) queue.getParent()).removeChildQueue( queue.getQueueName()); cs.getCapacitySchedulerQueueManager().removeQueue(queue.getQueueName()); - } else{ - throw new YarnException("Queue does not exist " + queueName); } } - String getQueueMapping(String parentQueue, String leafQueue) { - return parentQueue + DOT + leafQueue; + private ApplicationId submitApp(CSQueue parentQueue, String leafQueueName, + String user, int expectedNumAppsInParentQueue, + int expectedNumAppsInLeafQueue) throws Exception { + // submit an app + RMApp rmApp = mockRM.submitApp(GB, "test-auto-queue-activation", user, null, + leafQueueName); + + // check preconditions + List appsInParentQueue = cs.getAppsInQueue( + parentQueue.getQueueName()); + assertEquals(expectedNumAppsInParentQueue, appsInParentQueue.size()); + + List appsInLeafQueue = cs.getAppsInQueue( + leafQueueName); + assertEquals(expectedNumAppsInLeafQueue, appsInLeafQueue.size()); + + return rmApp.getApplicationId(); } @Test(timeout = 10000) @@ -778,6 +815,221 @@ void checkQueueCapacities(CapacityScheduler newCS, float capacityC, } } + @Test + public void testAutoCreatedQueueActivationDeactivation() throws Exception { + + try { + String host = "127.0.0.1"; + RMNode node = MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1, + host); + cs.handle(new NodeAddedSchedulerEvent(node)); + + CSQueue parentQueue = cs.getQueue(PARENT_QUEUE); + + //submit app1 as USER1 + submitApp(parentQueue, USER1, USER1, 1, 1); + validateInitialQueueEntitlement(parentQueue, USER1, 0.1f); + + //submit another app2 as USER2 + ApplicationId user2AppId = submitApp(parentQueue, USER2, USER2, 2, 1); + validateInitialQueueEntitlement(parentQueue, USER2, 0.2f); + + //submit another app3 as USER1 + submitApp(parentQueue, USER1, USER1, 3, 2); + + 
//validate total activated abs capacity remains the same + GuaranteedOrZeroCapacityOverTimePolicy autoCreatedQueueManagementPolicy = + (GuaranteedOrZeroCapacityOverTimePolicy) ((ManagedParentQueue) + parentQueue) + .getAutoCreatedQueueManagementPolicy(); + assertEquals(autoCreatedQueueManagementPolicy + .getAbsoluteActivatedChildQueueCapacity(), 0.2f, EPSILON); + + //submit user_3 app. This cant be scheduled since there is no capacity + submitApp(parentQueue, USER3, USER3, 4, 1); + final CSQueue user3LeafQueue = cs.getQueue(USER3); + validateCapacities((AutoCreatedLeafQueue) user3LeafQueue); + + //deactivate USER2 queue + cs.killAllAppsInQueue(USER2); + mockRM.waitForState(user2AppId, RMAppState.KILLED); + + //Verify if USER_2 can be deactivated since it has no pending appsA + List queueManagementChanges = + autoCreatedQueueManagementPolicy.computeQueueManagementChanges(); + + ManagedParentQueue managedParentQueue = (ManagedParentQueue) parentQueue; + managedParentQueue.validateAndApplyQueueManagementChanges(queueManagementChanges); + + validateDeactivatedQueueEntitlement(parentQueue, USER2, 0.2f, + queueManagementChanges); + + //USER_3 should now get activated + validateActivatedQueueEntitlement(parentQueue, USER3, 0.2f, + queueManagementChanges); + + } finally { + cleanupQueue(USER1); + cleanupQueue(USER2); + cleanupQueue(USER3); + } + } + + @Test + public void testAutoCreatedQueueInheritsNodeLabels() throws Exception { + + try { + String host = "127.0.0.1"; + RMNode node = MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1, + host); + cs.handle(new NodeAddedSchedulerEvent(node)); + + CSQueue parentQueue = cs.getQueue(PARENT_QUEUE); + + submitApp(USER1, USER1, NODEL_LABEL_GPU); + //submit app1 as USER1 + validateInitialQueueEntitlement(parentQueue, USER1, 0.1f); + } finally { + cleanupQueue(USER1); + } + } + + void validateInitialQueueEntitlement(CSQueue parentQueue, String + leafQueueName, float + expectedTotalChildQueueAbsCapacity) throws + 
SchedulerDynamicEditException { + ManagedParentQueue autoCreateEnabledParentQueue = + (ManagedParentQueue) parentQueue; + + GuaranteedOrZeroCapacityOverTimePolicy policy = + (GuaranteedOrZeroCapacityOverTimePolicy) autoCreateEnabledParentQueue + .getAutoCreatedQueueManagementPolicy(); + + assertEquals(expectedTotalChildQueueAbsCapacity, + policy.getAbsoluteActivatedChildQueueCapacity(), EPSILON); + + AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) cs.getQueue(leafQueueName); + + for (String label : accessibleNodeLabelsOnC) { + validateCapacitiesByLabel(autoCreateEnabledParentQueue, leafQueue, label); + } + + assertEquals(true, policy.isActive(leafQueue)); + } + + void validateCapacitiesByLabel(ManagedParentQueue + autoCreateEnabledParentQueue, AutoCreatedLeafQueue leafQueue, String + label) { + assertEquals(autoCreateEnabledParentQueue.getLeafQueueTemplate() + .getQueueCapacities().getCapacity(), + leafQueue.getQueueCapacities() + .getCapacity(label), EPSILON); + assertEquals(autoCreateEnabledParentQueue.getLeafQueueTemplate() + .getQueueCapacities().getMaximumCapacity(), + leafQueue.getQueueCapacities() + .getMaximumCapacity(label), EPSILON); + } + + void validateActivatedQueueEntitlement(CSQueue parentQueue, + String leafQueueName, float expectedTotalChildQueueAbsCapacity, + List queueManagementChanges) + throws SchedulerDynamicEditException { + ManagedParentQueue autoCreateEnabledParentQueue = + (ManagedParentQueue) parentQueue; + + GuaranteedOrZeroCapacityOverTimePolicy policy = + (GuaranteedOrZeroCapacityOverTimePolicy) autoCreateEnabledParentQueue + .getAutoCreatedQueueManagementPolicy(); + + QueueCapacities cap = autoCreateEnabledParentQueue.getLeafQueueTemplate() + .getQueueCapacities(); + QueueEntitlement expectedEntitlement = new QueueEntitlement( + cap.getCapacity(), cap.getMaximumCapacity()); + + //validate capacity + validateQueueEntitlements(leafQueueName, expectedEntitlement, + queueManagementChanges); + + //validate parent queue state + 
assertEquals(expectedTotalChildQueueAbsCapacity, + policy.getAbsoluteActivatedChildQueueCapacity(), EPSILON); + + AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) cs.getQueue(leafQueueName); + + //validate leaf queue state + assertEquals(true, policy.isActive(leafQueue)); + } + + void validateDeactivatedQueueEntitlement(CSQueue parentQueue, + String leafQueueName, float expectedTotalChildQueueAbsCapacity, List + queueManagementChanges) + throws SchedulerDynamicEditException { + QueueEntitlement expectedEntitlement = new QueueEntitlement(0.0f, 1.0f); + + ManagedParentQueue autoCreateEnabledParentQueue = + (ManagedParentQueue) parentQueue; + + AutoCreatedLeafQueue leafQueue = + (AutoCreatedLeafQueue) cs.getQueue(leafQueueName); + + GuaranteedOrZeroCapacityOverTimePolicy policy = + (GuaranteedOrZeroCapacityOverTimePolicy) autoCreateEnabledParentQueue + .getAutoCreatedQueueManagementPolicy(); + + //validate parent queue state + assertEquals(expectedTotalChildQueueAbsCapacity, + policy.getAbsoluteActivatedChildQueueCapacity(), EPSILON); + + //validate leaf queue state + assertEquals(false, policy.isActive(leafQueue)); + + //validate capacity + validateQueueEntitlements(leafQueueName, expectedEntitlement, + queueManagementChanges); + } + + void validateQueueEntitlements(String leafQueueName, + QueueEntitlement expectedEntitlement, List + queueEntitlementChanges) { + AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) cs.getQueue( + leafQueueName); + validateQueueEntitlementChangesForLeafQueue(leafQueue, expectedEntitlement, + queueEntitlementChanges); + } + + String getQueueMapping(String parentQueue, String leafQueue) { + return parentQueue + DOT + leafQueue; + } + + private void validateQueueEntitlementChangesForLeafQueue(CSQueue leafQueue, + QueueEntitlement expectedQueueEntitlement, + final List queueEntitlementChanges) { + boolean found = false; + for (QueueManagementChange entitlementChange : queueEntitlementChanges) { + if 
(leafQueue.getQueueName().equals( + entitlementChange.getQueue().getQueueName())) { + + AutoCreatedLeafQueueConfig updatedQueueTemplate = + entitlementChange.getUpdatedQueueTemplate(); + + for (String label : accessibleNodeLabelsOnC) { + QueueEntitlement newEntitlement = new QueueEntitlement( + updatedQueueTemplate.getQueueCapacities().getCapacity(label), + updatedQueueTemplate.getQueueCapacities().getMaximumCapacity + (label)); + assertEquals(expectedQueueEntitlement, newEntitlement); + } + found = true; + break; + } + } + if (!found) { + fail( + "Could not find the specified leaf queue in entitlement changes : " + + leafQueue.getQueueName()); + } + } + ApplicationAttemptId submitApp(CapacityScheduler newCS, String user, String queue, String parentQueue) { ApplicationId appId = BuilderUtils.newApplicationId(1, 1); @@ -791,4 +1043,19 @@ ApplicationAttemptId submitApp(CapacityScheduler newCS, String user, newCS.handle(addAttemptEvent); return appAttemptId; } + + RMApp submitApp(String user, String queue, String nodeLabel) throws + Exception { + RMApp app = mockRM.submitApp(GB, "test-auto-queue-creation" + RandomUtils + .nextInt(100), + user, + null, queue, nodeLabel); + Assert.assertEquals(app.getAmNodeLabelExpression(), nodeLabel); + // check preconditions + List appsInC = cs.getAppsInQueue(PARENT_QUEUE); + assertEquals(1, appsInC.size()); + assertNotNull(cs.getQueue(queue)); + + return app; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestGuaranteedOrZeroCapacityOverTimePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestGuaranteedOrZeroCapacityOverTimePolicy.java new file mode 100644 index 00000000000..c71d2bf9134 --- /dev/null +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestGuaranteedOrZeroCapacityOverTimePolicy.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity + .queuemanagement.GuaranteedOrZeroCapacityOverTimePolicy; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class TestGuaranteedOrZeroCapacityOverTimePolicy { + + @Test + public void testGetMaxLeavesToBeActivated() + throws SchedulerDynamicEditException { + GuaranteedOrZeroCapacityOverTimePolicy policy = + new GuaranteedOrZeroCapacityOverTimePolicy(); + + assertEquals(1, policy.getMaxLeavesToBeActivated(0.17f, 0.03f, 1)); + assertEquals(5, policy.getMaxLeavesToBeActivated(0.17f, 0.03f, 7)); + assertEquals(0, policy.getMaxLeavesToBeActivated(0, 0.03f, 10)); + } +}