diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java index 2a741ed83cb..8bc5ad11175 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java @@ -78,7 +78,7 @@ public Thread newThread(Runnable r) { } private void schedulePreemptionChecker() { - handler = ses.scheduleAtFixedRate(new PreemptionChecker(), + handler = ses.scheduleAtFixedRate(new PolicyInvoker(), 0, monitorInterval, TimeUnit.MILLISECONDS); } @@ -98,7 +98,7 @@ public void invokePolicy(){ scheduleEditPolicy.editSchedule(); } - private class PreemptionChecker implements Runnable { + private class PolicyInvoker implements Runnable { @Override public void run() { try { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java index 2c072d2544f..996fd488226 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java @@ -376,7 +376,7 @@ private void cleanupStaledPreemptionCandidates(long currentTime) { } private Set getLeafQueueNames(TempQueuePerPartition q) { - if (q.children == null || q.children.isEmpty()) { + if ((q.children == null || q.children.isEmpty()) && q.leafQueue != null) { return ImmutableSet.of(q.queueName); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractManagedParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractManagedParentQueue.java index 46f5cf113ea..9e36cc08193 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractManagedParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractManagedParentQueue.java @@ -19,6 +19,9 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common + .QueueEntitlement; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,6 +39,7 @@ AbstractManagedParentQueue.class); protected AutoCreatedLeafQueueTemplate leafQueueTemplate; + protected AutoCreatedQueueManagementPolicy queueManagementPolicy = null; public AbstractManagedParentQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { @@ -273,4 +277,23 @@ public int getMaxAppsPerUser() { public AutoCreatedLeafQueueTemplate getLeafQueueTemplate() { return leafQueueTemplate; } + + protected void validateQueueEntitlementChange(AutoCreatedLeafQueue + leafQueue, QueueEntitlement entitlement) + throws SchedulerDynamicEditException { + + float sumChilds = sumOfChildCapacities(); + float newChildCap = + sumChilds - leafQueue.getCapacity() + entitlement.getCapacity(); + + if (!(newChildCap >= 0 && newChildCap < 1.0f + CSQueueUtils.EPSILON)) { + throw new SchedulerDynamicEditException( + "Sum of child queues should exceed 100% for auto creating parent " + + "queue : " + queueName); + } + } + + public AutoCreatedQueueManagementPolicy getAutoCreatedQueueManagementPolicy() { + return queueManagementPolicy; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java index bc206d41521..10cb0eafdf9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java @@ -94,6 +94,15 @@ public void setEntitlement(QueueEntitlement entitlement) throw new SchedulerDynamicEditException( "Capacity demand is not in the [0,1] range: " + capacity); } + + // note: epsilon checks here are not ok, as the epsilons might + // accumulate and become a problem in aggregate + if (Math.abs(entitlement.getCapacity() - getCapacity()) == 0 + && Math.abs( + entitlement.getMaxCapacity() - getMaximumCapacity()) == 0) { + return; + } + setCapacity(capacity); setAbsoluteCapacity(getParent().getAbsoluteCapacity() * getCapacity()); setMaxCapacity(entitlement.getMaxCapacity()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedQueueManagementPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedQueueManagementPolicy.java new file mode 100644 index 00000000000..7502ec7d941 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedQueueManagementPolicy.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; +import org.apache.hadoop.yarn.util.Clock; + +import java.util.List; + +public interface AutoCreatedQueueManagementPolicy { + + /** + * Initialize policy + * @param schedulerContext Capacity Scheduler context + * @param clock a reference to the system clock. + */ + void init(CapacitySchedulerContext schedulerContext, ParentQueue parentQueue, + Clock clock); + + /** + * Reinitlize policy state ( if required) + * @param schedulerContext Capacity Scheduler context + * @param clock a reference to the system clock. + */ + void reinitialize(CapacitySchedulerContext schedulerContext, + Clock clock); + + /** + * Get initial entitlement for the specified leaf queue + * @param leafQueue + * @return + */ + QueueEntitlement getInitialEntitlement(AutoCreatedLeafQueue leafQueue) + throws SchedulerDynamicEditException; + + /** + * Compute/Adjust child queue capacities + * for auto created leaf queues - @see AutoCreatedQueueEntitlementPolicy + * + * @return returns a list of suggested QueueEntitlementChange(s) which may or may not be be enforced by the scheduler + */ + List computeQueueManagementChanges() + throws SchedulerDynamicEditException; + + /** + * Commit/Update state for the specified queue entitlement changes. + */ + void commitQueueManagementChanges( + List queueManagementChanges) + throws SchedulerDynamicEditException; + + /** + * Called by parent queue when a child queue is auto created + * @param childQueue + * @throws SchedulerDynamicEditException + */ + void addChildQueue(AutoCreatedLeafQueue childQueue) + throws SchedulerDynamicEditException; + + /** + * + * Called by parent queue when a child queue is deleted + * @param childQueue + * @throws SchedulerDynamicEditException + */ + void removeChildQueue(AutoCreatedLeafQueue childQueue) + throws SchedulerDynamicEditException; + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index ed30ad181ef..4acfb94a33e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; +import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; @@ -128,6 +129,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event + .QueueManagementChangeEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ReleaseContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; @@ -1493,7 +1496,7 @@ public void handle(SchedulerEvent event) { { NodeLabelsUpdateSchedulerEvent labelUpdateEvent = (NodeLabelsUpdateSchedulerEvent) event; - + updateNodeLabelsAndQueueResource(labelUpdateEvent); } break; @@ -1605,6 +1608,15 @@ public void handle(SchedulerEvent event) { } } break; + case MANAGE_QUEUE: + { + QueueManagementChangeEvent queueManagementChangeEvent = + (QueueManagementChangeEvent) event; + ParentQueue parentQueue = queueManagementChangeEvent.getParentQueue(); + final List queueManagementChanges = + queueManagementChangeEvent.getQueueManagementChanges(); + validateAndApplyQueueManagementChanges(parentQueue, queueManagementChanges); + } default: LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!"); } @@ -2019,6 +2031,8 @@ public void addQueue(Queue queue) String queuename = newQueue.getQueueName(); parentPlan.addChildQueue(newQueue); this.queueManager.addQueue(queuename, newQueue); + + LOG.info("Creation of AutoCreatedLeafQueue " + newQueue + " succeeded"); } finally { writeLock.unlock(); @@ -2049,25 +2063,10 @@ public void setEntitlement(String inQueue, QueueEntitlement entitlement) } AutoCreatedLeafQueue newQueue = (AutoCreatedLeafQueue) queue; + parent.validateQueueEntitlementChange(newQueue, entitlement); - float sumChilds = parent.sumOfChildCapacities(); - float newChildCap = - sumChilds - queue.getCapacity() + entitlement.getCapacity(); + newQueue.setEntitlement(entitlement); - if (newChildCap >= 0 && newChildCap < 1.0f + CSQueueUtils.EPSILON) { - // note: epsilon checks here are not ok, as the epsilons might - // accumulate and become a problem in aggregate - if (Math.abs(entitlement.getCapacity() - queue.getCapacity()) == 0 - && Math.abs( - entitlement.getMaxCapacity() - queue.getMaximumCapacity()) == 0) { - return; - } - newQueue.setEntitlement(entitlement); - } else{ - throw new SchedulerDynamicEditException( - "Sum of child queues should exceed 100% for auto creating parent " - + "queue : " + parent.getQueueName()); - } LOG.info( "Set entitlement for AutoCreatedLeafQueue " + inQueue + " to " + queue.getCapacity() + @@ -2710,7 +2709,6 @@ private LeafQueue autoCreateLeafQueue( addQueue(autoCreatedLeafQueue); - //TODO - Set entitlement through capacity management policy } else{ throw new SchedulerDynamicEditException( "Could not auto-create leaf queue for " + leafQueueName @@ -2726,4 +2724,134 @@ private LeafQueue autoCreateLeafQueue( } return autoCreatedLeafQueue; } + + /** + * Asynchronously called to apply queue management changes + * @param parentQueue + * @param queueManagementChanges + */ + public void validateAndApplyQueueManagementChanges(ParentQueue parentQueue, + List queueManagementChanges) { + try { + writeLock.lock(); + + if (!(AbstractManagedParentQueue.class. + isAssignableFrom(parentQueue.getClass()))) { + LOG.error("Queue " + parentQueue.getQueueName() + + " is not an instance of PlanQueue or ManagedParentQueue." + " " + + "Ignoring update " + queueManagementChanges); + return; + } + + try { + AbstractManagedParentQueue managedParentQueue = + (AbstractManagedParentQueue) parentQueue; + //validate queue changes + validateQueueManagementChanges(managedParentQueue, + queueManagementChanges); + + applyQueueManagementChanges(managedParentQueue, queueManagementChanges); + + AutoCreatedQueueManagementPolicy policy = + managedParentQueue.getAutoCreatedQueueManagementPolicy(); + + //acquires write lock on policy + policy.commitQueueManagementChanges(queueManagementChanges); + } catch (SchedulerDynamicEditException sde) { + LOG.error("Queue Management Change event cannot be applied for " + + "parent queue : " + parentQueue.getQueueName(), sde); + } + + } finally { + writeLock.unlock(); + } + } + + public void validateQueueManagementChanges( + AbstractManagedParentQueue parentQueue, + List queueManagementChanges) + throws SchedulerDynamicEditException { + + for (QueueManagementChange queueManagementChange : queueManagementChanges) { + + CSQueue childQueue = queueManagementChange.getQueue(); + + if (!(childQueue instanceof AutoCreatedLeafQueue)) { + throw new SchedulerDynamicEditException( + "queue should be " + "AutoCreatedLeafQueue. Found " + childQueue + .getClass()); + } + + if (!(AbstractManagedParentQueue.class. + isAssignableFrom(childQueue.getParent().getClass()))) { + LOG.error("Queue " + parentQueue.getQueueName() + + " is not an instance of PlanQueue or ManagedParentQueue." + " " + + "Ignoring update " + queueManagementChanges); + throw new SchedulerDynamicEditException( + "Queue " + parentQueue.getQueueName() + + " is not a AutoEnabledParentQueue." + " Ignoring update " + + queueManagementChanges); + } + + AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) childQueue; + + switch (queueManagementChange.getQueueAction()){ + case UPDATE_QUEUE: + QueueEntitlement updatedEntitlement = + queueManagementChange.getUpdatedQueueEntitlement(); + parentQueue.validateQueueEntitlementChange(leafQueue, + updatedEntitlement); + break; + case REMOVE_QUEUE: + if (!QueueState.STOPPED.equals(leafQueue.getState())) { + throw new SchedulerDynamicEditException( + "The queue " + leafQueue.getQueueName() + " is in " + leafQueue + .getState() + " state. " + + ".Should have been in STOPPED state"); + } + if (leafQueue.getNumApplications() > 0) { + throw new SchedulerDynamicEditException( + "The queue " + leafQueue.getQueueName() + " is not empty. Has [" + + leafQueue.getNumActiveApplications() + "] active " + + "apps [" + leafQueue.getNumPendingApplications() + + "] pending apps"); + } + break; + case ADD_QUEUE: + if (!QueueState.RUNNING.equals(leafQueue.getState())) { + throw new SchedulerDynamicEditException( + "The queue " + leafQueue.getQueueName() + " is in " + leafQueue + .getState() + " state. " + + ".Should have been in RUNNING state"); + } + parentQueue.validateQueueEntitlementChange(leafQueue, + new QueueEntitlement(leafQueue.getCapacity(), + leafQueue.getMaximumCapacity())); + break; + + } + } + } + + private void applyQueueManagementChanges(AbstractManagedParentQueue + parentQueue, List queueManagementChanges) + throws SchedulerDynamicEditException { + for (QueueManagementChange queueManagementChange : + queueManagementChanges) { + switch (queueManagementChange.getQueueAction()){ + case ADD_QUEUE: + CSQueue childQueueToBeAdded = queueManagementChange.getQueue(); + addQueue(childQueueToBeAdded); + case REMOVE_QUEUE: + CSQueue childQueueToBeRemoved = queueManagementChange.getQueue(); + removeQueue(childQueueToBeRemoved.getQueueName()); + case UPDATE_QUEUE: + AutoCreatedLeafQueue childQueueToBeUpdated = + (AutoCreatedLeafQueue) queueManagementChange.getQueue(); + //acquires write lock on leaf queue + childQueueToBeUpdated.setEntitlement( + queueManagementChange.getUpdatedQueueEntitlement()); + } + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 4515453c933..babd0c6900e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -1660,7 +1660,97 @@ public int getAutoCreatedQueuesMaxChildQueuesLimit(String queuePath) { } @Private + public static final String AUTO_CREATED_QUEUE_ENTITLEMENT_POLICY = + "auto-created-queue-entitlement-policy"; + + @Private + public static final String DEFAULT_AUTO_CREATED_QUEUE_ENTITLEMENT_POLICY = + "org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.GuaranteedOrZeroCapacityOverTimePolicy"; + + @Private + private static final String QUEUE_ENTITLEMENT_CONFIG_PREFIX = + "yarn.resourcemanager.monitor.capacity.queue-entitlement."; + + /** + * Time in milliseconds between invocations of this policy + */ + @Private + public static final String QUEUE_ENTITLEMENT_MONITORING_INTERVAL = + QUEUE_ENTITLEMENT_CONFIG_PREFIX + "monitoring_interval"; + + @Private + public static final long DEFAULT_QUEUE_ENTITLEMENT_MONITORING_INTERVAL = + 1500L; + + /** + * If true, run the policy but do not apply entitlements in the scheduler + */ + @Private + public static final String QUEUE_ENTITLEMENT_OBSERVE_ONLY = + QUEUE_ENTITLEMENT_CONFIG_PREFIX + "observe_only"; + + @Private + public static final boolean DEFAULT_QUEUE_ENTITLEMENT_OBSERVE_ONLY = false; + + /** + * Queue Entitlement computation policy for Auto Created queues + * @param queue The queue's path + * @return Configured policy class name + */ + @Private + public String getAutoCreatedQueueEntitlementPolicy(String queue) { + String autoCreatedQueueEntitlementPolicy = + get(getQueuePrefix(queue) + AUTO_CREATED_QUEUE_ENTITLEMENT_POLICY, + DEFAULT_AUTO_CREATED_QUEUE_ENTITLEMENT_POLICY); + return autoCreatedQueueEntitlementPolicy; + } + + /** + * Get The policy class configured to manage capacities for auto created leaf + * queues under the specified parent + * + * @param queueName The parent queue's name + * @return The policy class configured to manage capacities for auto created + * leaf queues under the specified parent queue + */ + @Private + protected AutoCreatedQueueManagementPolicy + getAutoCreatedQueueManagementPolicyClass( + String queueName) { + + String queueManagementPolicyClassName = + getAutoCreatedQueueEntitlementPolicy(queueName); + LOG.info("Using Auto Created Queue Entitlement Policy: " + + queueManagementPolicyClassName + " for queue: " + queueName); + try { + Class queueEntitlementPolicyClazz = getClassByName( + queueManagementPolicyClassName); + if (AutoCreatedQueueManagementPolicy.class.isAssignableFrom( + queueEntitlementPolicyClazz)) { + return (AutoCreatedQueueManagementPolicy) ReflectionUtils.newInstance( + queueEntitlementPolicyClazz, this); + } else{ + throw new YarnRuntimeException( + "Class: " + queueManagementPolicyClassName + " not instance of " + + AutoCreatedQueueManagementPolicy.class.getCanonicalName()); + } + } catch (ClassNotFoundException e) { + throw new YarnRuntimeException( + "Could not instantiate " + "AutoCreatedQueueManagementPolicy: " + + queueManagementPolicyClassName + " for queue: " + queueName, + e); + } + } + @VisibleForTesting + @Private + public void setQueueEntitlementPolicyObserveOnlyFlag(final String queue, + final boolean isObserveOnly) { + setBoolean(ENABLE_QUEUE_MAPPING_OVERRIDE, isObserveOnly); + } + + @VisibleForTesting + @Private public void setAutoCreatedLeafQueueTemplateCapacity(String queuePath, float val) { String leafQueueConfPrefix = getAutoCreatedQueueTemplateConfPrefix( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java index 7c918a53620..ae74989a726 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java @@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; +import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; /** @@ -94,4 +95,11 @@ * @return if configuration is mutable */ boolean isConfigurationMutable(); + + /** + * Get clock from scheduler + * @return Clock + */ + Clock getClock(); + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/GuaranteedOrZeroCapacityOverTimePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/GuaranteedOrZeroCapacityOverTimePolicy.java new file mode 100644 index 00000000000..19501689302 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/GuaranteedOrZeroCapacityOverTimePolicy.java @@ -0,0 +1,537 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler + .SchedulerDynamicEditException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common + .QueueEntitlement; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica + .FiCaSchedulerApp; +import org.apache.hadoop.yarn.util.Clock; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler + .capacity.CSQueueUtils.EPSILON; + +/** + * Capacity Management policy for auto created leaf queues which sets the Queue + * Entitlement to 0 when there are no pending or running apps under that queue + * and enabled capacity to a pre-configured guaranteed capacity when an + * application attempt is pending or scheduleable. + */ +public class GuaranteedOrZeroCapacityOverTimePolicy + implements AutoCreatedQueueManagementPolicy { + + private CapacitySchedulerContext scheduler; + private Clock clock; + + private ManagedParentQueue parentQueue; + + private static final Log LOG = LogFactory.getLog( + GuaranteedOrZeroCapacityOverTimePolicy.class); + + private final QueueEntitlement ZERO_CAPACITY_ENTITLEMENT = + new QueueEntitlement(0.0f, 1.0f); + + private ReentrantReadWriteLock.WriteLock writeLock; + + private ReentrantReadWriteLock.ReadLock readLock; + + private Map leafQueueStateMap = new HashMap<>(); + + private ParentQueueState parentQueueState = new ParentQueueState(); + + private float parentAbsoluteCapacity = 0.0f; + + private float leafQueueTemplateAbsoluteCapacity = 0.0f; + + private float leafQueueTemplateCapacity = 0.0f; + + private float leafQueueTemplateMaxCapacity = 0.0f; + + private class LeafQueueState { + + private AtomicBoolean isActive = new AtomicBoolean(false); + + private long mostRecentActivationTime; + + private long mostRecentDeactivationTime; + + public long getMostRecentActivationTime() { + return mostRecentActivationTime; + } + + public long getMostRecentDeactivationTime() { + return mostRecentDeactivationTime; + } + + /** + * Is the queue currently active or deactivated? + * + * @return true if Active else false + */ + public boolean isActive() { + return isActive.get(); + } + + private boolean activate() { + LOG.info("Activated leaf queue "); + boolean ret = isActive.compareAndSet(false, true); + mostRecentActivationTime = clock.getTime(); + return ret; + } + + private boolean deactivate() { + LOG.info("De-Activated leaf queue "); + boolean ret = isActive.compareAndSet(true, false); + mostRecentDeactivationTime = clock.getTime(); + return ret; + } + } + + private class ParentQueueState { + + private float totalAbsoluteActivatedChildQueueCapacity = 0.0f; + + private float getTotalAbsoluteActivatedChildQueueCapacity() { + return totalAbsoluteActivatedChildQueueCapacity; + } + + private void incAbsoluteActivatedChildCapacity(float childQueueCapacity) { + totalAbsoluteActivatedChildQueueCapacity += childQueueCapacity; + } + + private void decAbsoluteActivateChildCapacity(float childQueueCapacity) { + totalAbsoluteActivatedChildQueueCapacity -= childQueueCapacity; + } + } + + /** + * Comparator that orders applications by their submit time + */ + private class PendingApplicationComparator + implements Comparator, Serializable { + + @Override + public int compare(FiCaSchedulerApp app1, FiCaSchedulerApp app2) { + RMApp rmApp1 = scheduler.getRMContext().getRMApps().get( + app1.getApplicationId()); + RMApp rmApp2 = scheduler.getRMContext().getRMApps().get( + app2.getApplicationId()); + return Long.compare(rmApp1.getSubmitTime(), rmApp2.getSubmitTime()); + } + } + + private PendingApplicationComparator applicationComparator = + new PendingApplicationComparator(); + + @Override + public void init(final CapacitySchedulerContext schedulerContext, + final ParentQueue parentQueue, final Clock clock) { + this.scheduler = schedulerContext; + this.parentQueue = (ManagedParentQueue) parentQueue; + this.clock = clock; + + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + readLock = lock.readLock(); + writeLock = lock.writeLock(); + + QueueCapacities leafQueueCapacities = ((ManagedParentQueue) parentQueue). + getLeafQueueTemplate().getQueueCapacities(); + parentAbsoluteCapacity = parentQueue.getAbsoluteCapacity(); + leafQueueTemplateAbsoluteCapacity = + leafQueueCapacities.getAbsoluteCapacity(); + leafQueueTemplateCapacity = leafQueueCapacities.getCapacity(); + leafQueueTemplateMaxCapacity = leafQueueCapacities.getMaximumCapacity(); + + LOG.info("Initialized queue management policy for parent queue " + + parentQueue.getQueueName()); + } + + @Override + public List computeQueueManagementChanges() + throws SchedulerDynamicEditException { + + try { + readLock.lock(); + List computedEntitlements = new ArrayList<>(); + + // check if any leaf queues need to be deactivated based on pending + // applications and + Set deactivatedLeafQueues = deactivateLeafQueuesIfInActive( + parentQueue, computedEntitlements); + + float deactivatedCapacity = + computedEntitlements.size() * leafQueueTemplateAbsoluteCapacity; + + float sumOfChildQueueActivatedCapacity = + parentQueueState.getTotalAbsoluteActivatedChildQueueCapacity(); + + if ( LOG.isDebugEnabled()) { + LOG.debug("Parent queue : " + parentQueue.getQueueName() + " with " + + "abs capacity : " + parentAbsoluteCapacity + + ", deactivatedCapacity : " + deactivatedCapacity + + " , sumOfChildActivatedCapacity : " + sumOfChildQueueActivatedCapacity ); + } + + //Check if we need to activate anything at all? + float availableCapacity = getAvailableCapacity(parentAbsoluteCapacity, + deactivatedCapacity, sumOfChildQueueActivatedCapacity); + if (availableCapacity >= leafQueueTemplateAbsoluteCapacity) { + + //sort applications across leaf queues by submit time + List pendingApps = getSortedPendingApplications(); + + if (pendingApps.size() > 0) { + int maxLeafQueuesTobeActivated = getMaxLeavesToBeActivated( + availableCapacity, leafQueueTemplateAbsoluteCapacity, + pendingApps.size()); + + if ( LOG.isDebugEnabled()) { + LOG.debug("Found " + maxLeafQueuesTobeActivated + + " leaf queues to be activated with " + pendingApps.size() + + " apps "); + } + + LinkedHashSet leafQueuesToBeActivated = getSortedLeafQueues( + pendingApps, maxLeafQueuesTobeActivated, deactivatedLeafQueues); + + //Not compute entitlement changes for the identified leaf queues + // which is appended to the list computedEntitlements + computeEntitlementChanges(leafQueuesToBeActivated, + computedEntitlements, availableCapacity); + } + } else{ + if ( LOG.isDebugEnabled()) { + LOG.debug( + "Could not activate auto created leaf queues since there is " + + "no capacity"); + } + } + + return computedEntitlements; + } finally { + readLock.unlock(); + } + } + + private LinkedHashSet getSortedLeafQueues( + final List pendingApps, int leafQueuesNeeded, + Set deactivatedQueues) throws SchedulerDynamicEditException { + LinkedHashSet leafQueues = new LinkedHashSet<>(leafQueuesNeeded); + int ctr = 0; + for (FiCaSchedulerApp app : pendingApps) { + AutoCreatedLeafQueue leafQueue = + (AutoCreatedLeafQueue) app.getCSLeafQueue(); + String leafQueueName = leafQueue.getQueueName(); + + //Check if leafQueue is not active already and has any pending apps + if (ctr < leafQueuesNeeded) { + //acquires read lock on leaf queue + if (!isActive(leafQueue) && hasPendingApps(leafQueue)) { + if (!isDeactivatedQueue(deactivatedQueues, + leafQueue.getQueueName())) { + if (addLeafQueueIfNotExists(leafQueues, leafQueueName)) { + ctr++; + } + } + } + } else{ + break; + } + } + return leafQueues; + } + + private boolean isDeactivatedQueue(Set deactivatedQueues, + String leafQueueName) { + return deactivatedQueues.contains(leafQueueName); + } + + private boolean addLeafQueueIfNotExists(Set leafQueues, + String leafQueueName) { + boolean ret = false; + if (!leafQueues.contains(leafQueueName)) { + ret = leafQueues.add(leafQueueName); + } + return ret; + } + + private boolean isActive(final AutoCreatedLeafQueue leafQueue) + throws SchedulerDynamicEditException { + LeafQueueState leafQueueStatus = getLeafQueueState(leafQueue); + return leafQueueStatus.isActive(); + } + + private Set deactivateLeafQueuesIfInActive(ParentQueue parentQueue, + List computedEntitlements) + throws SchedulerDynamicEditException { + Set deactivatedQueues = new HashSet<>(); + + for (CSQueue childQueue : parentQueue.getChildQueues()) { + AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) childQueue; + + if (leafQueue != null) { + if (isActive(leafQueue) && !hasPendingApps(leafQueue)) { + QueueEntitlement newEntitlement = ZERO_CAPACITY_ENTITLEMENT; + computedEntitlements.add( + new QueueManagementChange.UpdateQueue(leafQueue, newEntitlement)); + deactivatedQueues.add(leafQueue.getQueueName()); + } else{ + LOG.info(" Leaf queue has pending applications : " + leafQueue + .getNumApplications() + ".Skipping deactivation for " + + leafQueue); + } + } else{ + LOG.warn("Could not find queue in scheduler while trying" + + " to deactivate " + + childQueue.getQueueName()); + } + } + return deactivatedQueues; + } + + protected void computeEntitlementChanges( + LinkedHashSet leafQueuesToBeActivated, + List computedEntitlements, + final float availableCapacity) { + + float curAvailableCapacity = availableCapacity; + for (String curLeafQueueName : leafQueuesToBeActivated) { + // Activate queues if capacity is available + if (curAvailableCapacity >= leafQueueTemplateAbsoluteCapacity) { + AutoCreatedLeafQueue leafQueue = + (AutoCreatedLeafQueue) scheduler.getCapacitySchedulerQueueManager() + .getQueue(curLeafQueueName); + if (leafQueue != null) { + QueueEntitlement newEntitlement = new QueueEntitlement( + leafQueueTemplateCapacity, leafQueueTemplateMaxCapacity); + computedEntitlements.add( + new QueueManagementChange.UpdateQueue(leafQueue, newEntitlement)); + curAvailableCapacity -= leafQueueTemplateAbsoluteCapacity; + } else{ + LOG.warn( + "Could not find queue in scheduler while trying to deactivate " + + curLeafQueueName); + } + } + } + } + + @VisibleForTesting + int getMaxLeavesToBeActivated(float availableCapacity, + float childQueueAbsoluteCapacity, int numPendingApps) + throws SchedulerDynamicEditException { + + if (childQueueAbsoluteCapacity > 0) { + int numLeafQueuesNeeded = (int) Math.floor( + availableCapacity / childQueueAbsoluteCapacity); + + return Math.min(numLeafQueuesNeeded, numPendingApps); + } else{ + throw new SchedulerDynamicEditException("Child queue absolute capacity " + + "is initialized to 0. Check prent queue's " + parentQueue + .getQueueName() + " leaf queue template configuration"); + } + } + + private float getAvailableCapacity(float parentCapacity, + float deactivatedCapacity, float totalChildQueueActivatedCapacity) { + return parentCapacity - totalChildQueueActivatedCapacity + + deactivatedCapacity + EPSILON; + } + + /** + * Validate specified entitlement + * + * @param queueManagementChanges Queue Management changes to validate and + * apply on leaf queue(s) in the spcified parent + * queue + * @throws SchedulerDynamicEditException when validation fails + */ + @Override + public void commitQueueManagementChanges( + List queueManagementChanges) + throws SchedulerDynamicEditException { + try { + writeLock.lock(); + for (QueueManagementChange queueManagementChange : + queueManagementChanges) { + QueueEntitlement entitlement = + queueManagementChange.getUpdatedQueueEntitlement(); + CSQueue queue = queueManagementChange.getQueue(); + if (!(queue instanceof AutoCreatedLeafQueue)) { + throw new SchedulerDynamicEditException( + "Expected queue management change for AutoCreatedLeafQueue. " + + "Found " + queue.getClass().getName()); + } + + AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) queue; + + if (entitlement.getCapacity() > 0) { + if (isActive(leafQueue)) { + LOG.info("Queue is already active. Skipping activation : " + + queue.getQueuePath()); + } else{ + activate(leafQueue); + } + } else{ + if (!isActive(leafQueue)) { + LOG.info("Queue is already de-activated. Skipping de-activation : " + + leafQueue.getQueuePath()); + } else{ + deactivate(leafQueue); + } + } + } + } finally { + writeLock.unlock(); + } + } + + @Override + public void addChildQueue(AutoCreatedLeafQueue childQueue) + throws SchedulerDynamicEditException { + try { + writeLock.lock(); + if (leafQueueStateMap.containsKey(childQueue.getQueueName())) { + throw new SchedulerDynamicEditException( + "Queue already exists. Skip adding : " + childQueue.getQueuePath()); + } else{ + leafQueueStateMap.put(childQueue.getQueueName(), new LeafQueueState()); + QueueEntitlement entitlement = getInitialEntitlement(childQueue); + childQueue.setEntitlement(entitlement); + } + } finally { + writeLock.unlock(); + } + } + + @Override + public void removeChildQueue(AutoCreatedLeafQueue childQueue) + throws SchedulerDynamicEditException { + try { + writeLock.lock(); + if (leafQueueStateMap.containsKey(childQueue.getQueueName())) { + leafQueueStateMap.remove(childQueue.getQueueName()); + } else{ + throw new SchedulerDynamicEditException( + "Could not remove queue from state " + childQueue.getQueuePath()); + } + } finally { + writeLock.unlock(); + } + } + + private void activate(final AutoCreatedLeafQueue leafQueue) + throws SchedulerDynamicEditException { + try { + writeLock.lock(); + getLeafQueueState(leafQueue).activate(); + ManagedParentQueue parentQueue = + (ManagedParentQueue) leafQueue.getParent(); + parentQueueState.incAbsoluteActivatedChildCapacity( + parentQueue.leafQueueTemplate.getQueueCapacities() + .getAbsoluteCapacity()); + } finally { + writeLock.unlock(); + } + } + + private void deactivate(final AutoCreatedLeafQueue leafQueue) + throws SchedulerDynamicEditException { + try { + writeLock.lock(); + getLeafQueueState(leafQueue).deactivate(); + ManagedParentQueue parentQueue = + (ManagedParentQueue) leafQueue.getParent(); + parentQueueState.decAbsoluteActivateChildCapacity( + parentQueue.leafQueueTemplate.getQueueCapacities() + .getAbsoluteCapacity()); + } finally { + writeLock.unlock(); + } + } + + public boolean hasPendingApps(final AutoCreatedLeafQueue leafQueue) { + return leafQueue.getNumApplications() > 0; + } + + @Override + public void reinitialize(CapacitySchedulerContext schedulerContext, + Clock clock) { + //Nothing to do + } + + @Override + public QueueEntitlement getInitialEntitlement(AutoCreatedLeafQueue leafQueue) + throws SchedulerDynamicEditException { + try { + writeLock.lock(); + if (parentQueueState.getTotalAbsoluteActivatedChildQueueCapacity() + + leafQueueTemplateAbsoluteCapacity <= parentAbsoluteCapacity) { + activate(leafQueue); + return new QueueEntitlement(leafQueueTemplateCapacity, 1.0f); + } else{ + return ZERO_CAPACITY_ENTITLEMENT; + } + } finally { + writeLock.unlock(); + } + } + + private LeafQueueState getLeafQueueState(LeafQueue queue) + throws SchedulerDynamicEditException { + String queueName = queue.getQueueName(); + if (!leafQueueStateMap.containsKey(queueName)) { + throw new SchedulerDynamicEditException( + "Could not find leaf queue in " + "state " + queueName); + } else{ + return leafQueueStateMap.get(queueName); + } + } + + @VisibleForTesting + float getTotalAbsChildQueueActivatedCapacity() { + return parentQueueState.getTotalAbsoluteActivatedChildQueueCapacity(); + } + + private List getSortedPendingApplications() { + List apps = new ArrayList<>( + parentQueue.getApplications()); + Collections.sort(apps, applicationComparator); + return apps; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java index ff795e47b23..f6b43f390db 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java @@ -18,12 +18,17 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler - .SchedulerDynamicEditException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; + + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; /** * Auto Creation enabled Parent queue. This queue initially does not have any @@ -60,12 +65,36 @@ public ManagedParentQueue(final CapacitySchedulerContext cs, "]\nwith user limit factor: [").append( leafQueueTemplate.getUserLimitFactor()).append("]."); LOG.info(queueInfo.toString()); + + queueManagementPolicy = + cs.getConfiguration().getAutoCreatedQueueManagementPolicyClass( + getQueuePath()); + + if ( queueManagementPolicy != null) { + queueManagementPolicy.init(cs, this, cs.getClock()); + } } @Override public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource) throws IOException { validate(newlyParsedQueue); + + //validate if capacity is exceeded for child queues + if (csContext.getConfiguration(). + getShouldFailAutoQueueCreationWhenGuaranteedCapacityExceeded( + getQueuePath())) { + float childCap = sumOfChildCapacities(); + if (getCapacity() < childCap) { + throw new IOException( + "Total of Auto Created leaf queues guaranteed capacity : " + + childCap + " exceeds Parent queue's " + getQueuePath() + + " guaranteed capacity " + getCapacity() + "" + + ".Cannot enforce policy to auto" + + " create queues beyond parent queue's capacity"); + } + } + super.reinitialize(newlyParsedQueue, clusterResource); String leafQueueTemplateConfPrefix = getLeafQueueConfigPrefix( csContext.getConfiguration()); @@ -144,15 +173,44 @@ public void addChildQueue(CSQueue childQueue) AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) childQueue; super.addChildQueue(leafQueue); - //TODO - refresh policy queue after capacity management is added + queueManagementPolicy.addChildQueue(leafQueue); } finally { writeLock.unlock(); } } + public List getApplications() { + try { + readLock.lock(); + List apps = new ArrayList<>(); + for (CSQueue childQueue : getChildQueues()) { + apps.addAll(((LeafQueue) childQueue).getApplications()); + } + return Collections.unmodifiableList(apps); + } finally { + readLock.unlock(); + } + } + + public List getPendingApplications() { + try { + readLock.lock(); + List apps = new ArrayList<>(); + for (CSQueue childQueue : getChildQueues()) { + apps.addAll(((LeafQueue) childQueue).getPendingApplications()); + } + return Collections.unmodifiableList(apps); + } finally { + readLock.unlock(); + } + } + private String getLeafQueueConfigPrefix(CapacitySchedulerConfiguration conf) { return conf.getAutoCreatedQueueTemplateConfPrefix(getQueuePath()); } + public boolean shouldFailAutoCreationWhenGuaranteedCapacityExceeded() { + return shouldFailAutoCreationWhenGuaranteedCapacityExceeded; + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java index b7f8aa6996b..726af654a94 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java @@ -22,6 +22,9 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler + .SchedulerDynamicEditException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueManagementChange.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueManagementChange.java new file mode 100644 index 00000000000..c76c56c5542 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueManagementChange.java @@ -0,0 +1,179 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.QueueState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; + + +import java.util.HashMap; +import java.util.Map; + +import static org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager + .NO_LABEL; + +/** + * Encapsulates Queue entitlement and state updates needed for adjusting capacity dynamically + * + */ +@Private +@Unstable +public abstract class QueueManagementChange { + + private final CSQueue queue; + + /** + * Add, remove the specified queue + * Updating the queue may involve entitlement updates + * and/or QueueState changes + */ + public enum QueueAction { + ADD_QUEUE, + REMOVE_QUEUE, + UPDATE_QUEUE + } + + private Map updatedEntitlementsByPartition; + + private final QueueAction queueAction; + /** + * Updated Queue state with the new entitlement + */ + private QueueState transitionToQueueState; + + public QueueManagementChange(final CSQueue queue, final QueueAction queueAction) { + this.queue = queue; + this.queueAction = queueAction; + } + + public QueueManagementChange(final CSQueue queue, final QueueAction queueAction, QueueState targetQueueState, + final Map + entitlementsByPartition) { + this(queue, queueAction, entitlementsByPartition); + this.transitionToQueueState = targetQueueState; + } + + public QueueManagementChange(final CSQueue queue, final QueueAction + queueAction, final Map + entitlementsByPartition) { + this(queue, queueAction); + this.updatedEntitlementsByPartition = entitlementsByPartition; + } + + public QueueManagementChange(final CSQueue queue, final QueueAction + queueAction, QueueEntitlement entitlement) { + this(queue, queueAction); + this.updatedEntitlementsByPartition = new HashMap<>(); + updatedEntitlementsByPartition.put(NO_LABEL, entitlement); + } + + public QueueState getTransitionToQueueState() { + return transitionToQueueState; + } + + public CSQueue getQueue() { + return queue; + } + + public QueueEntitlement getUpdatedQueueEntitlement(String partition) { + return updatedEntitlementsByPartition.get(partition); + } + + public QueueEntitlement getUpdatedQueueEntitlement() { + return updatedEntitlementsByPartition.get(NO_LABEL); + } + + public QueueAction getQueueAction() { + return queueAction; + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (!(o instanceof QueueManagementChange)) + return false; + + QueueManagementChange that = (QueueManagementChange) o; + + if (queue != null ? !queue.equals(that.queue) : that.queue != null) + return false; + if (updatedEntitlementsByPartition != null ? + !updatedEntitlementsByPartition.equals( + that.updatedEntitlementsByPartition) : + that.updatedEntitlementsByPartition != null) + return false; + if (queueAction != that.queueAction) + return false; + return transitionToQueueState == that.transitionToQueueState; + } + + @Override + public int hashCode() { + int result = queue != null ? queue.hashCode() : 0; + result = 31 * result + (updatedEntitlementsByPartition != null ? + updatedEntitlementsByPartition.hashCode() : + 0); + result = 31 * result + (queueAction != null ? queueAction.hashCode() : 0); + result = 31 * result + (transitionToQueueState != null ? + transitionToQueueState.hashCode() : + 0); + return result; + } + + @Override + public String toString() { + return "QueueManagementChange{" + "queue=" + queue + + ", updatedEntitlementsByPartition=" + updatedEntitlementsByPartition + + ", queueAction=" + queueAction + ", transitionToQueueState=" + + transitionToQueueState + '}'; + } + + public static class AddQueue extends QueueManagementChange { + public AddQueue(final CSQueue queue) { + super(queue, QueueAction.ADD_QUEUE); + } + } + + public static class RemoveQueue extends QueueManagementChange { + public RemoveQueue(final CSQueue queue) { + super(queue, QueueAction.REMOVE_QUEUE); + } + } + + public static class UpdateQueue extends QueueManagementChange { + + public UpdateQueue(final CSQueue queue, QueueState targetQueueState, + final Map + entitlementsByPartition) { + super(queue, QueueAction.UPDATE_QUEUE, targetQueueState, entitlementsByPartition); + } + + public UpdateQueue(final CSQueue queue, final Map + entitlementsByPartition) { + super(queue, QueueAction.UPDATE_QUEUE, entitlementsByPartition); + } + + public UpdateQueue(final CSQueue queue, QueueEntitlement entitlement) { + super(queue, QueueAction.UPDATE_QUEUE, entitlement); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueManagementDynamicEditPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueManagementDynamicEditPolicy.java new file mode 100644 index 00000000000..5d71131786c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueManagementDynamicEditPolicy.java @@ -0,0 +1,289 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event + .QueueManagementChangeEvent; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.SystemClock; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; + +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Queue Management scheduling policy for managed parent queues which enable + * auto child queue creation + */ +public class QueueManagementDynamicEditPolicy implements SchedulingEditPolicy { + + private static final Log LOG = + LogFactory.getLog(QueueManagementDynamicEditPolicy.class); + + private Clock clock; + + // Pointer to other RM components + private RMContext rmContext; + private ResourceCalculator rc; + private CapacityScheduler scheduler; + private RMNodeLabelsManager nlm; + + private boolean observeOnly; + + private long monitoringInterval; + + private Set managedParentQueues = new HashSet<>(); + + /** + * Instantiated by CapacitySchedulerConfiguration + */ + public QueueManagementDynamicEditPolicy() { + clock = SystemClock.getInstance(); + } + + @SuppressWarnings("unchecked") + @VisibleForTesting + public QueueManagementDynamicEditPolicy(RMContext context, + CapacityScheduler scheduler) { + init(context.getYarnConfiguration(), context, scheduler); + } + + @SuppressWarnings("unchecked") + @VisibleForTesting + public QueueManagementDynamicEditPolicy(RMContext context, + CapacityScheduler scheduler, Clock clock) { + init(context.getYarnConfiguration(), context, scheduler); + this.clock = clock; + } + + /** + * @see super.init() + */ + @Override + public void init(final Configuration config, final RMContext context, final ResourceScheduler sched) { + LOG.info("Queue Entitlement Policy monitor:" + this.getClass().getCanonicalName()); + assert null == scheduler : "Unexpected duplicate call to init"; + if (!(sched instanceof CapacityScheduler)) { + throw new YarnRuntimeException("Class " + + sched.getClass().getCanonicalName() + " not instance of " + + CapacityScheduler.class.getCanonicalName()); + } + rmContext = context; + scheduler = (CapacityScheduler) sched; + clock = scheduler.getClock(); + + rc = scheduler.getResourceCalculator(); + nlm = scheduler.getRMContext().getNodeLabelManager(); + + CapacitySchedulerConfiguration csConfig = scheduler.getConfiguration(); + + monitoringInterval = csConfig.getLong( + CapacitySchedulerConfiguration.QUEUE_ENTITLEMENT_MONITORING_INTERVAL, + CapacitySchedulerConfiguration.DEFAULT_QUEUE_ENTITLEMENT_MONITORING_INTERVAL); + + observeOnly = csConfig.getBoolean( + CapacitySchedulerConfiguration.QUEUE_ENTITLEMENT_OBSERVE_ONLY, + CapacitySchedulerConfiguration.DEFAULT_QUEUE_ENTITLEMENT_OBSERVE_ONLY); + + initQueues(); + } + + /** + * Reinitializes queues(Called on scheduler.reinitialize) + * @param config Configuration + * @param context The resourceManager's context + * @param sched The scheduler + */ + public void reinitialize(final Configuration config, final RMContext context, + final ResourceScheduler sched) { + //TODO - Wire with scheduler reinitialize when YARN-6124 is ready + initQueues(); + } + + private void initQueues() { + managedParentQueues.clear(); + for (Map.Entry queues : scheduler + .getCapacitySchedulerQueueManager() + .getQueues().entrySet()) { + + String queueName = queues.getKey(); + CSQueue queue = queues.getValue(); + + if ( queue instanceof ManagedParentQueue) { + managedParentQueues.add(queueName); + } + } + } + + @Override + public void editSchedule() { + long startTs = clock.getTime(); + + computeQueueEntitlementsForAutoCreatedLeafQueues(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Total time used=" + (clock.getTime() - startTs) + " ms."); + } + } + + @VisibleForTesting + List computeQueueEntitlementsForAutoCreatedLeafQueues() { + + List queueManagementChanges = new ArrayList<>(); + // All partitions to look at + + //Proceed only if there are queues to process + if (managedParentQueues.size() > 0) { + for (String parentQueueName : managedParentQueues) { + ManagedParentQueue parentQueue = + (ManagedParentQueue) scheduler.getCapacitySchedulerQueueManager().getQueue(parentQueueName); + + queueManagementChanges.addAll(computeQueueEntitlementsForManagedQueue + (parentQueue)); + } + } + return queueManagementChanges; + } + + + @VisibleForTesting + List computeQueueEntitlementsForManagedQueue + (ManagedParentQueue parentQueue) { + + List queueManagementChanges = + Collections.emptyList(); + if (!parentQueue.shouldFailAutoCreationWhenGuaranteedCapacityExceeded()) { + + AutoCreatedQueueManagementPolicy policyClazz = + parentQueue.getAutoCreatedQueueManagementPolicy(); + long startTime = 0; + try { + if (LOG.isDebugEnabled()) { + LOG.debug(MessageFormat + .format("Trying to use {0} to select preemption candidates", + policyClazz.getClass().getName())); + startTime = clock.getTime(); + } + + queueManagementChanges = policyClazz.computeQueueManagementChanges(); + + //Scheduler update is synchronous to get feedback on errors + if (queueManagementChanges.size() > 0) { + if (isObserveOnly()) { + try { + scheduler.validateQueueManagementChanges(parentQueue, + queueManagementChanges); + } catch (SchedulerDynamicEditException e) { + LOG.error("Validation failed for queue entitlement changes : ", + e); + } + } else{ + QueueManagementChangeEvent queueManagementChangeEvent = new + QueueManagementChangeEvent(parentQueue, + queueManagementChanges); + scheduler.getRMContext().getDispatcher().getEventHandler() + .handle(queueManagementChangeEvent); + } + } + + if (LOG.isDebugEnabled()) { + LOG.debug(MessageFormat.format("{0} uses {1} millisecond to run", + policyClazz.getClass().getName(), clock.getTime() - startTime)); + if (queueManagementChanges.size() > 0) { + LOG.debug(" Updated queue entitlements for parent queue" + " [" + + parentQueue.getQueueName() + ": [\n" + queueManagementChanges + .toString() + "\n]"); + } + } + + if (queueManagementChanges.size() > 0) { + LOG.info( + " Updated queue entitlements for parent queue [" + parentQueue + .getQueueName() + ": [" + queueManagementChanges.toString() + + "]"); + } + + } catch (YarnException e) { + LOG.error( + "Could not compute child queue entitlements for parent " + "queue " + + parentQueue.getQueueName(), e); + } + } else{ + if (LOG.isDebugEnabled()) { + LOG.debug( + "Skipping queue entitlement updates for parent queue " + parentQueue + .getQueuePath() + " " + + "since configuration for auto creating queue's beyond " + + "parent's " + + "guaranteed capacity is disabled"); + } + } + return queueManagementChanges; + } + + /** + * @see super.getMonitoringInterval() + */ + @Override + public long getMonitoringInterval() { + return monitoringInterval; + } + + @Override + public String getPolicyName() { + return "QueueEntitlementDynamicEditPolicy"; + } + + public ResourceCalculator getResourceCalculator() { + return rc; + } + + public boolean isObserveOnly() { + return observeOnly; + } + + public RMContext getRmContext() { + return rmContext; + } + + public ResourceCalculator getRC() { + return rc; + } + + public CapacityScheduler getScheduler() { + return scheduler; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/QueueEntitlement.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/QueueEntitlement.java index 2a751e3e437..f4182f3101f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/QueueEntitlement.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/QueueEntitlement.java @@ -43,4 +43,26 @@ public float getCapacity() { public void setCapacity(float capacity) { this.capacity = capacity; } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (!(o instanceof QueueEntitlement)) + return false; + + QueueEntitlement that = (QueueEntitlement) o; + + if (Float.compare(that.capacity, capacity) != 0) + return false; + return Float.compare(that.maxCapacity, maxCapacity) == 0; + } + + @Override + public int hashCode() { + int result = (capacity != +0.0f ? Float.floatToIntBits(capacity) : 0); + result = 31 * result + (maxCapacity != +0.0f ? Float.floatToIntBits( + maxCapacity) : 0); + return result; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/QueueManagementChangeEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/QueueManagementChangeEvent.java new file mode 100644 index 00000000000..627b439a201 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/QueueManagementChangeEvent.java @@ -0,0 +1,32 @@ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity + .ParentQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity + .QueueManagementChange; + +import java.util.List; + +/** + * Event to update scheduler of any queue management changes + */ +public class QueueManagementChangeEvent extends SchedulerEvent { + + private ParentQueue parentQueue; + private List queueManagementChanges; + + public QueueManagementChangeEvent(ParentQueue parentQueue, + List queueManagementChanges) { + super(SchedulerEventType.MANAGE_QUEUE); + this.parentQueue = parentQueue; + this.queueManagementChanges = queueManagementChanges; + } + + public ParentQueue getParentQueue() { + return parentQueue; + } + + public List getQueueManagementChanges() { + return queueManagementChanges; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java index 229e0bbc0be..b107cf4ee61 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java @@ -51,5 +51,8 @@ MARK_CONTAINER_FOR_KILLABLE, // Cancel a killable container - MARK_CONTAINER_FOR_NONKILLABLE + MARK_CONTAINER_FOR_NONKILLABLE, + + //Queue Management Change + MANAGE_QUEUE } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java index 7090bc929f8..8223f4b8661 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java @@ -113,10 +113,12 @@ private static float C1_CAPACITY = 20f; private static float C2_CAPACITY = 20f; - private static String USER = "user_"; - private static String USER0 = USER + 0; - private static String USER2 = USER + 2; - private static String PARENT_QUEUE = "c"; + final String USER = "user_"; + final String USER0 = USER + 0; + final String USER1 = USER + 1; + final String USER3 = USER + 3; + final String USER2 = USER + 2; + final String PARENT_QUEUE = "c"; private MockRM mockRM = null; @@ -124,6 +126,8 @@ private final TestCapacityScheduler tcs = new TestCapacityScheduler(); + private QueueManagementDynamicEditPolicy policy; + private static SpyDispatcher dispatcher; private static EventHandler rmAppEventEventHandler; @@ -181,6 +185,8 @@ public void setUp() throws Exception { mockRM.start(); cs.start(); + + policy = new QueueManagementDynamicEditPolicy(mockRM.getRMContext(), cs); } private CapacitySchedulerConfiguration setupQueueMappings( @@ -289,7 +295,7 @@ public void testAutoCreateLeafQueueCreation() throws Exception { ManagedParentQueue parentQueue = (ManagedParentQueue) cs.getQueue( PARENT_QUEUE); assertEquals(parentQueue, autoCreatedLeafQueue.getParent()); - validateCapacities(autoCreatedLeafQueue); + validateInitialQueueEntitlement(parentQueue, USER0, 0.1f); } finally { cleanupQueue(USER0); } @@ -319,7 +325,7 @@ public void testReinitializeStoppedAutoCreatedLeafQueue() throws Exception { ManagedParentQueue parentQueue = (ManagedParentQueue) cs.getQueue( PARENT_QUEUE); assertEquals(parentQueue, autoCreatedLeafQueue.getParent()); - validateCapacities(autoCreatedLeafQueue); + validateInitialQueueEntitlement(parentQueue, USER0, 0.1f); ApplicationAttemptId appAttemptId = appsInC.get(0); @@ -354,7 +360,8 @@ public void testReinitializeStoppedAutoCreatedLeafQueue() throws Exception { AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) cs.getQueue( USER0); - validateCapacities(leafQueue); + validateInitialQueueEntitlement(parentQueue, leafQueue.getQueueName(), + 0.1f); } finally { cleanupQueue(USER0); @@ -532,10 +539,10 @@ public void testAutoCreateLeafQueueFailsWithNoQueueMapping() } private void validateCapacities(AutoCreatedLeafQueue autoCreatedLeafQueue) { - assertEquals(autoCreatedLeafQueue.getCapacity(), 0.0f, EPSILON); - assertEquals(autoCreatedLeafQueue.getAbsoluteCapacity(), 0.0f, EPSILON); - assertEquals(autoCreatedLeafQueue.getMaximumCapacity(), 0.0f, EPSILON); - assertEquals(autoCreatedLeafQueue.getAbsoluteMaximumCapacity(), 0.0f, + assertEquals( 0.0f, autoCreatedLeafQueue.getCapacity(), EPSILON); + assertEquals( 0.0f, autoCreatedLeafQueue.getAbsoluteCapacity(), EPSILON); + assertEquals(1.0f, autoCreatedLeafQueue.getMaximumCapacity(), EPSILON); + assertEquals(1.0f, autoCreatedLeafQueue.getAbsoluteMaximumCapacity(), EPSILON); int maxAppsForAutoCreatedQueues = (int) ( CapacitySchedulerConfiguration.DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS @@ -555,13 +562,26 @@ private void cleanupQueue(String queueName) throws YarnException { ((ManagedParentQueue) queue.getParent()).removeChildQueue( queue.getQueueName()); cs.getCapacitySchedulerQueueManager().removeQueue(queue.getQueueName()); - } else{ - throw new YarnException("Queue does not exist " + queueName); } } - String getQueueMapping(String parentQueue, String leafQueue) { - return parentQueue + DOT + leafQueue; + private ApplicationId submitApp(CSQueue parentQueue, String leafQueueName, + String user, int expectedNumAppsInParentQueue, + int expectedNumAppsInLeafQueue) throws Exception { + // submit an app + RMApp rmApp = mockRM.submitApp(GB, "test-auto-queue-activation", user, null, + leafQueueName); + + // check preconditions + List appsInParentQueue = cs.getAppsInQueue( + parentQueue.getQueueName()); + assertEquals(expectedNumAppsInParentQueue, appsInParentQueue.size()); + + List appsInLeafQueue = cs.getAppsInQueue( + leafQueueName); + assertEquals(expectedNumAppsInLeafQueue, appsInLeafQueue.size()); + + return rmApp.getApplicationId(); } @Test(timeout = 10000) @@ -778,6 +798,132 @@ void checkQueueCapacities(CapacityScheduler newCS, float capacityC, } } + @Test + public void testAutoCreatedQueueActivationDeactivation() throws Exception { + + try { + String host = "127.0.0.1"; + RMNode node = MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1, + host); + cs.handle(new NodeAddedSchedulerEvent(node)); + + CSQueue parentQueue = cs.getQueue(PARENT_QUEUE); + + //submit app1 as USER1 + submitApp(parentQueue, USER1, USER1, 1, 1); + validateInitialQueueEntitlement(parentQueue, USER1, 0.1f); + + //submit another app2 as USER2 + ApplicationId user2AppId = submitApp(parentQueue, USER2, USER2, 2, 1); + validateInitialQueueEntitlement(parentQueue, USER2, 0.2f); + + //submit another app3 as USER1 + submitApp(parentQueue, USER1, USER1, 3, 2); + + //validate total avtivated abs capacity remains the same + GuaranteedOrZeroCapacityOverTimePolicy policy = + (GuaranteedOrZeroCapacityOverTimePolicy) ((ManagedParentQueue) parentQueue) + .getAutoCreatedQueueManagementPolicy(); + assertEquals(policy.getTotalAbsChildQueueActivatedCapacity(), 0.2f, + EPSILON); + + //submit user_3 app. This cant be scheduled since there is no capacity + submitApp(parentQueue, USER3, USER3, 4, 1); + final CSQueue user3LeafQueue = cs.getQueue(USER3); + validateCapacities((AutoCreatedLeafQueue) user3LeafQueue); + + //deactivate USER2 queue + cs.killAllAppsInQueue(USER2); + mockRM.waitForState(user2AppId, RMAppState.KILLED); + + //Verify if USER_2 can be deactivated since it has no pending appsA + List queueManagementChanges = policy.computeQueueManagementChanges(); + validateDeactivatedQueueEntitlement(USER2, queueManagementChanges); + + //USER_3 should now get activated + validateActivatedQueueEntitlement(parentQueue, USER3, 0.2f, + queueManagementChanges); + + } finally { + cleanupQueue(USER1); + cleanupQueue(USER2); + cleanupQueue(USER3); + } + } + + void validateInitialQueueEntitlement(CSQueue parentQueue, String + leafQueueName, float + expectedTotalChildQueueAbsCapacity) { + ManagedParentQueue autoCreateEnabledParentQueue = + (ManagedParentQueue) parentQueue; + + GuaranteedOrZeroCapacityOverTimePolicy policy = + (GuaranteedOrZeroCapacityOverTimePolicy) autoCreateEnabledParentQueue + .getAutoCreatedQueueManagementPolicy(); + + assertEquals(expectedTotalChildQueueAbsCapacity, + policy.getTotalAbsChildQueueActivatedCapacity(), EPSILON); + } + + void validateActivatedQueueEntitlement(CSQueue parentQueue, + String leafQueueName, float expectedTotalChildQueueAbsCapacity, + List queueManagementChanges) { + ManagedParentQueue autoCreateEnabledParentQueue = + (ManagedParentQueue) parentQueue; + + GuaranteedOrZeroCapacityOverTimePolicy policy = + (GuaranteedOrZeroCapacityOverTimePolicy) autoCreateEnabledParentQueue + .getAutoCreatedQueueManagementPolicy(); + QueueCapacities cap = autoCreateEnabledParentQueue.getLeafQueueTemplate() + .getQueueCapacities(); + QueueEntitlement expectedEntitlement = new QueueEntitlement( + cap.getCapacity(), cap.getMaximumCapacity()); + validateQueueEntitlements(leafQueueName, expectedEntitlement, + queueManagementChanges); + assertEquals(expectedTotalChildQueueAbsCapacity, + policy.getTotalAbsChildQueueActivatedCapacity(), EPSILON); + } + + void validateDeactivatedQueueEntitlement( + String leafQueueName, List queueManagementChanges) { + QueueEntitlement expectedEntitlement = new QueueEntitlement(0.0f, 1.0f); + validateQueueEntitlements(leafQueueName, expectedEntitlement, + queueManagementChanges); + } + + void validateQueueEntitlements(String leafQueueName, + QueueEntitlement expectedEntitlement, List + queueEntitlementChanges) { + AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) cs.getQueue( + leafQueueName); + validateQueueEntitlementChangesForLeafQueue(leafQueue, expectedEntitlement, + queueEntitlementChanges); + } + + String getQueueMapping(String parentQueue, String leafQueue) { + return parentQueue + DOT + leafQueue; + } + + private void validateQueueEntitlementChangesForLeafQueue(CSQueue leafQueue, + QueueEntitlement leafQueueEntitlement, + final List queueEntitlementChanges) { + boolean found = false; + for (QueueManagementChange entitlementChange : queueEntitlementChanges) { + if (leafQueue.getQueueName().equals( + entitlementChange.getQueue().getQueueName())) { + assertEquals(entitlementChange.getUpdatedQueueEntitlement(), + leafQueueEntitlement); + found = true; + break; + } + } + if (!found) { + fail( + "Could not find the specified leaf queue in entitlement changes : " + + leafQueue.getQueueName()); + } + } + ApplicationAttemptId submitApp(CapacityScheduler newCS, String user, String queue, String parentQueue) { ApplicationId appId = BuilderUtils.newApplicationId(1, 1); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestGuaranteedOrZeroCapacityOverTimePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestGuaranteedOrZeroCapacityOverTimePolicy.java new file mode 100644 index 00000000000..622f5f07ee9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestGuaranteedOrZeroCapacityOverTimePolicy.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class TestGuaranteedOrZeroCapacityOverTimePolicy { + + @Test + public void testGetMaxLeavesToBeActivated() + throws SchedulerDynamicEditException { + GuaranteedOrZeroCapacityOverTimePolicy policy = + new GuaranteedOrZeroCapacityOverTimePolicy(); + + assertEquals(1, policy.getMaxLeavesToBeActivated(0.17f, 0.03f, 1)); + assertEquals(5, policy.getMaxLeavesToBeActivated(0.17f, 0.03f, 7)); + assertEquals(0, policy.getMaxLeavesToBeActivated(0, 0.03f, 10)); + } +}