diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index d0425907f6a..dbb696ab258 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.security.AccessRequest; import org.apache.hadoop.yarn.security.YarnAuthorizationProvider; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; +import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable; @@ -360,17 +361,8 @@ protected void recoverApplication(ApplicationStateData appState, private RMAppImpl createAndPopulateNewRMApp( ApplicationSubmissionContext submissionContext, long submitTime, String user, boolean isRecovery, long startTime) throws YarnException { - if (!isRecovery) { - // Do queue mapping - if (rmContext.getQueuePlacementManager() != null) { - // We only do queue mapping when it's a new application - rmContext.getQueuePlacementManager().placeApplication( - submissionContext, user); - } - // fail the submission if configured application timeout value is invalid - RMServerUtils.validateApplicationTimeouts( - submissionContext.getApplicationTimeouts()); - } + + ApplicationPlacementContext placementContext = placeApplication(submissionContext, user, isRecovery); ApplicationId applicationId = submissionContext.getApplicationId(); List amReqs = validateAndCreateResourceRequest( @@ -418,7 +410,7 @@ private RMAppImpl createAndPopulateNewRMApp( submissionContext.getQueue(), submissionContext, this.scheduler, this.masterService, submitTime, submissionContext.getApplicationType(), - submissionContext.getApplicationTags(), amReqs, startTime); + submissionContext.getApplicationTags(), amReqs, placementContext, startTime); // Concurrent app submissions with same applicationId will fail here // Concurrent app submissions with different applicationIds will not // influence each other @@ -763,4 +755,32 @@ private void updateAppDataToStateStore(String queue, RMApp app, + "' with below exception:" + ex.getMessage()); } } + + private ApplicationPlacementContext placeApplication(ApplicationSubmissionContext submissionContext, String user, boolean isRecovery) throws YarnException { + ApplicationPlacementContext placementContext = null; + if (!isRecovery) { + // Do queue mapping + if (rmContext.getQueuePlacementManager() != null) { + // We only do queue mapping when it's a new application + placementContext = rmContext.getQueuePlacementManager().placeApplication( + submissionContext, user); + // Set it to ApplicationSubmissionContext + if (placementContext != null && placementContext.getQueue() != null && + !placementContext.getQueue().equals(submissionContext.getQueue())) { + LOG.info("Placed application=" + submissionContext.getApplicationId() + " to queue=" + + placementContext.getQueue() + ", original queue=" + submissionContext.getQueue()); + submissionContext.setQueue(placementContext.getQueue()); + } + } + // fail the submission if configured application timeout value is invalid + RMServerUtils.validateApplicationTimeouts( + submissionContext.getApplicationTimeouts()); + } else { + if (rmContext.getQueuePlacementManager() != null) { + placementContext = rmContext.getQueuePlacementManager().placeApplication( + submissionContext, user); + } + } + return placementContext; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/ApplicationPlacementContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/ApplicationPlacementContext.java new file mode 100644 index 00000000000..cb11e226c5e --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/ApplicationPlacementContext.java @@ -0,0 +1,80 @@ +package org.apache.hadoop.yarn.server.resourcemanager.placement; + + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; + +@Private +@Unstable +public class ApplicationPlacementContext { + + private final String user; + private final ApplicationSubmissionContext applicationSubmissionContext; + private final String queue; + private final String parentQueue; + private PlacementRule placementRule; + + @Private + @Unstable + public ApplicationPlacementContext(ApplicationSubmissionContext asc, String user, String queue) { + this(asc, user, queue, null, null); + } + + @Private + @Unstable + public ApplicationPlacementContext(ApplicationSubmissionContext asc, String user, String queue, PlacementRule rule) { + this(asc, user, queue, rule, null); + } + + @Private + @Unstable + public ApplicationPlacementContext(ApplicationSubmissionContext asc, String user, String queue, PlacementRule rule, String parentQueue) { + this.user = user; + this.applicationSubmissionContext = asc; + this.queue = queue; + this.placementRule = rule; + this.parentQueue = parentQueue; + } + + /** + * Get application's submission context + */ + public ApplicationSubmissionContext getApplicationSubmissionContext() { + return applicationSubmissionContext; + } + + /** + * Application submitter's user name + */ + public String getUserName() { + return user; + } + + /** + * Get placement rule if one applied during application->queue mapping + * + * @return The placement rule if one was applied during application->queue mapping + * else return null + */ + public PlacementRule getPlacementRule() { + return placementRule; + } + + /** + * @return Get queue that application has been mapped to by the placement rule + */ + public String getQueue() { + return queue; + } + + /** + * parent queue of the queue that application has been mapped to if available + * + * @return parent queue name + */ + public String getParentQueue() { + return parentQueue; + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementManager.java index 43a4deb70eb..b71198a8b13 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementManager.java @@ -23,13 +23,13 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; -import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.exceptions.YarnException; import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QueuePath; public class PlacementManager { private static final Log LOG = LogFactory.getLog(PlacementManager.class); @@ -53,18 +53,21 @@ public void updateRules(List rules) { } } - public void placeApplication(ApplicationSubmissionContext asc, String user) + public ApplicationPlacementContext placeApplication(ApplicationSubmissionContext asc, String user) throws YarnException { + + PlacementRule ruleApplied = null; try { readLock.lock(); if (null == rules || rules.isEmpty()) { - return; + return new ApplicationPlacementContext(asc, user, asc.getQueue(), null); } String newQueueName = null; for (PlacementRule rule : rules) { newQueueName = rule.getQueueForApp(asc, user); if (newQueueName != null) { + ruleApplied = rule; break; } } @@ -77,11 +80,12 @@ public void placeApplication(ApplicationSubmissionContext asc, String user) throw new YarnException(msg); } - // Set it to ApplicationSubmissionContext - if (!StringUtils.equals(asc.getQueue(), newQueueName)) { - LOG.info("Placed application=" + asc.getApplicationId() + " to queue=" - + newQueueName + ", original queue=" + asc.getQueue()); - asc.setQueue(newQueueName); + QueuePath queuePath = QueuePath.toQueuePath(newQueueName); + + if ( queuePath.hasParentQueue() ) { + return new ApplicationPlacementContext(asc, user, queuePath.leafQueue, ruleApplied, queuePath.parentQueue); + } else { + return new ApplicationPlacementContext(asc, user, queuePath.leafQueue, ruleApplied); } } finally { readLock.unlock(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/UserGroupMappingPlacementRule.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/UserGroupMappingPlacementRule.java index d617d161859..e3a1558d081 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/UserGroupMappingPlacementRule.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/UserGroupMappingPlacementRule.java @@ -66,11 +66,22 @@ public String toString() { MappingType type; String source; String queue; + String parentQueue; + + public final String DELIMITER = ":"; public QueueMapping(MappingType type, String source, String queue) { this.type = type; this.source = source; this.queue = queue; + this.parentQueue = null; + } + + public QueueMapping(MappingType type, String source, String queue, String parentQueue) { + this.type = type; + this.source = source; + this.queue = queue; + this.parentQueue = parentQueue; } public String getQueue() { @@ -93,6 +104,13 @@ public boolean equals(Object obj) { return false; } } + + public String toString() { + return type.toString() + DELIMITER + source + DELIMITER + + (parentQueue != null ? + parentQueue + "." + queue : + queue); + } } public UserGroupMappingPlacementRule(boolean overrideWithQueueMappings, @@ -107,21 +125,21 @@ private String getMappedQueue(String user) throws IOException { if (mapping.type == MappingType.USER) { if (mapping.source.equals(CURRENT_USER_MAPPING)) { if (mapping.queue.equals(CURRENT_USER_MAPPING)) { - return user; + return getQueueName(mapping, user); } else if (mapping.queue.equals(PRIMARY_GROUP_MAPPING)) { - return groups.getGroups(user).get(0); + return getQueueName(mapping, groups.getGroups(user).get(0)); } else { - return mapping.queue; + return getQueueName(mapping); } } if (user.equals(mapping.source)) { - return mapping.queue; + return getQueueName(mapping); } } if (mapping.type == MappingType.GROUP) { for (String userGroups : groups.getGroups(user)) { if (userGroups.equals(mapping.source)) { - return mapping.queue; + return getQueueName(mapping); } } } @@ -157,6 +175,22 @@ public String getQueueForApp(ApplicationSubmissionContext asc, String user) return queueName; } + private String getQueueName(QueueMapping mapping) { + if (mapping.parentQueue != null && !mapping.parentQueue.isEmpty()) { + return mapping.parentQueue + "." + mapping.queue; + } else { + return mapping.queue; + } + } + + private String getQueueName(QueueMapping mapping, String leafQueueName) { + if (mapping.parentQueue != null && !mapping.parentQueue.isEmpty()) { + return mapping.parentQueue + "." + leafQueueName; + } else { + return leafQueueName; + } + } + @VisibleForTesting public List getQueueMappings() { return mappings; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java index 93c41b6747c..76bc1da55fb 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java @@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.records.AppCollectorData; +import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; @@ -301,4 +302,9 @@ ApplicationReport createAndGetApplicationReport(String clientUserName, * @return True/False to confirm whether app is in final states */ boolean isAppInCompletedStates(); + + /** + * Get application's queue placement context + */ + ApplicationPlacementContext getApplicationQueuePlacementContext(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index cfb8a74f59a..200a2e5bec4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -82,6 +82,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.blacklist.BlacklistManager; import org.apache.hadoop.yarn.server.resourcemanager.blacklist.DisabledBlacklistManager; import org.apache.hadoop.yarn.server.resourcemanager.blacklist.SimpleBlacklistManager; +import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; @@ -150,6 +151,7 @@ private final Set applicationTags; private final long attemptFailuresValidityInterval; + private final ApplicationPlacementContext placementContext; private boolean amBlacklistingEnabled = false; private float blacklistDisableThreshold; @@ -412,7 +416,7 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext, List amReqs) { this(applicationId, rmContext, config, name, user, queue, submissionContext, scheduler, masterService, submitTime, applicationType, applicationTags, - amReqs, -1); + amReqs, null, -1); } public RMAppImpl(ApplicationId applicationId, RMContext rmContext, @@ -420,7 +424,7 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext, ApplicationSubmissionContext submissionContext, YarnScheduler scheduler, ApplicationMasterService masterService, long submitTime, String applicationType, Set applicationTags, - List amReqs, long startTime) { + List amReqs, ApplicationPlacementContext placementContext, long startTime) { this.systemClock = SystemClock.getInstance(); @@ -520,6 +524,8 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext, DEFAULT_AM_SCHEDULING_NODE_BLACKLISTING_DISABLE_THRESHOLD; } } + + this.placementContext = placementContext; } /** @@ -2004,6 +2010,11 @@ public void setApplicationPriority(Priority applicationPriority) { this.applicationPriority = applicationPriority; } + @Override + public ApplicationPlacementContext getApplicationQueuePlacementContext() { + return this.placementContext; + } + /** * Clear Unused fields to free memory. * @param app diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerQueue.java index 9a67e0188ae..dca80900306 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerQueue.java @@ -22,6 +22,7 @@ import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; /** * diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index 250f4e6b9a7..f85ebbd1537 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -66,6 +66,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimplePlacementSet; +import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -115,6 +116,8 @@ volatile Priority priority = Priority.newInstance(0); private Map userWeights = new HashMap(); + protected Clock clock; + public AbstractCSQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { this.labelManager = cs.getRMContext().getNodeLabelManager(); @@ -266,16 +269,16 @@ public String getDefaultNodeLabelExpression() { return defaultLabelExpression; } - void setupQueueConfigs(Resource clusterResource) + protected void setupQueueConfigs(String queuePath, Resource clusterResource) throws IOException { try { writeLock.lock(); // get labels this.accessibleLabels = - csContext.getConfiguration().getAccessibleNodeLabels(getQueuePath()); + csContext.getConfiguration().getAccessibleNodeLabels(queuePath); this.defaultLabelExpression = csContext.getConfiguration().getDefaultNodeLabelExpression( - getQueuePath()); + queuePath); // inherit from parent if labels not set if (this.accessibleLabels == null && parent != null) { @@ -294,7 +297,7 @@ void setupQueueConfigs(Resource clusterResource) this.maximumAllocation = csContext.getConfiguration().getMaximumAllocationPerQueue( - getQueuePath()); + queuePath); // initialized the queue state based on previous state, configured state // and its parent state. @@ -347,6 +350,12 @@ void setupQueueConfigs(Resource clusterResource) } finally { writeLock.unlock(); } + + } + + void setupQueueConfigs(Resource clusterResource) + throws IOException { + setupQueueConfigs(getQueuePath(), clusterResource); } private Map getUserWeightsFromHierarchy() throws IOException { @@ -993,4 +1002,9 @@ public Priority getPriority() { public Map getUserWeights() { return userWeights; } + + public Clock getClock() { + return clock; + } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreateEnabledParentQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreateEnabledParentQueue.java new file mode 100644 index 00000000000..d492cd3bc0d --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreateEnabledParentQueue.java @@ -0,0 +1,263 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * Auto Creation enabled Parent queue. This queue initially does not have any children to start with and all child + * leaf queues will be auto created. Currently this does not allow other pre-configured leaf or parent queues to + * co-exist along with auto-created leaf queues. The auto creation is limited to leaf queues currently. + */ +public class AutoCreateEnabledParentQueue extends ParentQueue { + + private static final Logger LOG = LoggerFactory.getLogger(AutoCreateEnabledParentQueue.class); + + public static final String AUTO_CREATED_LEAF_QUEUE_TEMPLATE_PREFIX = "leaf-queue-template"; + + private float autoCreatedLeafQueueCapacity = 0.0f; + private float autoCreatedLeafQueueMaxCapacity = 0.0f; + + private float autoCreatedLeafQueueAbsoluteCapacity = 0.0f; + private float autoCreatedLeafQueueAbsoluteMaxCapacity = 0.0f; + + private int maxAppsForAutoCreatedQueues; + private int maxAppsPerUserForAutoCreatedQueues; + private int userLimit; + private float userLimitFactor; + + private AutoCreatedQueueEntitlementPolicy queueEntitlementPolicy; + + private float absoluteActivatedCapacity = 0.0f; + + public AutoCreateEnabledParentQueue(CapacitySchedulerContext cs, String queueName, + CSQueue parent, CSQueue old) throws IOException { + super(cs, queueName, parent, old); + + String queuePath = super.getQueuePath(); + + queueEntitlementPolicy = cs.getConfiguration().getAutoCreatedQueueEntitlementPolicyClass(queuePath); + queueEntitlementPolicy.init(cs, clock); + + super.setupQueueConfigs(csContext.getClusterResource()); + initializeApplicationAndUserLimits(); + + StringBuffer queueInfo = new StringBuffer(); + queueInfo.append("Created Auto Created Enabled Parent Queue: ").append(queueName) + .append("\nwith capacity: [").append(super.getCapacity()) + .append("]\nwith max capacity: [").append(super.getMaximumCapacity()) + .append("\nwith max apps: [").append(maxAppsForAutoCreatedQueues) + .append("]\nwith max apps per user: [") + .append(maxAppsPerUserForAutoCreatedQueues).append("]\nwith user limit: [") + .append(userLimit).append("]\nwith user limit factor: [") + .append(userLimitFactor).append("]."); + LOG.info(queueInfo.toString()); + } + + @Override + public void reinitialize(CSQueue newlyParsedQueue, + Resource clusterResource) throws IOException { + super.reinitialize(newlyParsedQueue, clusterResource); + + //TODO - Get initial entitlements again since it could have changed + try { + writeLock.lock(); + // Sanity check + if (!(newlyParsedQueue instanceof AutoCreateEnabledParentQueue) || !newlyParsedQueue + .getQueuePath().equals(getQueuePath())) { + throw new IOException( + "Trying to reinitialize " + getQueuePath() + " from " + + newlyParsedQueue.getQueuePath()); + } + + // Set new configs + setupQueueConfigs(clusterResource); + + initializeApplicationAndUserLimits(); + + // run reinitialize on each existing queue, to trigger absolute cap + // recomputations + for (CSQueue res : this.getChildQueues()) { + res.reinitialize(res, clusterResource); + } + //TODO + } finally { + writeLock.unlock(); + } + } + + private void initializeApplicationAndUserLimits() { + CapacitySchedulerConfiguration conf = csContext.getConfiguration(); + autoCreatedLeafQueueMaxCapacity = conf.getAutoCreatedLeafQueueTemplateMaxCapacity(getQueuePath()) / 100; + autoCreatedLeafQueueCapacity = conf.getAutoCreatedLeafQueueTemplateCapacity(getQueuePath()) / 100; + + autoCreatedLeafQueueAbsoluteCapacity = autoCreatedLeafQueueCapacity * getAbsoluteCapacity(); + autoCreatedLeafQueueAbsoluteMaxCapacity = autoCreatedLeafQueueMaxCapacity * getAbsoluteMaximumCapacity(); + int maxApps = conf.getMaximumApplicationsPerQueue(getQueuePath()); + if (maxApps < 0) { + maxApps = + (int) (CapacitySchedulerConfiguration.DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS * super + .getAbsoluteCapacity()); + } + this.userLimit = conf.getUserLimit(getQueuePath()); + this.userLimitFactor = conf.getUserLimitFactor(getQueuePath()); + this.maxAppsForAutoCreatedQueues = maxApps; + int maxAppsPerUser = (int) (maxAppsForAutoCreatedQueues * (userLimit / 100.0f) * userLimitFactor); + this.maxAppsPerUserForAutoCreatedQueues = maxAppsPerUser; + } + + public float getAutoCreatedLeafQueueCapacity() { + return autoCreatedLeafQueueCapacity; + } + + public float getAutoCreatedLeafQueueMaxCapacity() { + return autoCreatedLeafQueueMaxCapacity; + } + + public float getAutoCreatedLeafQueueAbsoluteCapacity() { + return autoCreatedLeafQueueAbsoluteCapacity; + } + + public float getAutoCreatedLeafQueueAbsoluteMaxCapacity() { + return autoCreatedLeafQueueAbsoluteMaxCapacity; + } + + /** + * Number of maximum applications for each of the AutoCreatedQueues in this Plan. + * + * @return maxAppsForAutoCreatedQueues + */ + public int getMaxApplicationsForAutoCreatedQueues() { + return maxAppsForAutoCreatedQueues; + } + + /** + * Number of maximum applications per user for each of the AutoCreatedQueues in + * this Plan. + * + * @return maxAppsPerUserForAutoCreatedQueues + */ + public int getMaxApplicationsPerUserForAutoCreatedQueues() { + return maxAppsPerUserForAutoCreatedQueues; + } + + /** + * User limit value for each of the AutoCreatedQueues in this Plan. + * + * @return userLimit + */ + public int getUserLimitForAutoCreatedQueues() { + return userLimit; + } + + /** + * User limit factor value for each of the AutoCreatedQueues in this Plan. + * + * @return userLimitFactor + */ + public float getUserLimitFactor() { + return userLimitFactor; + } + + protected QueueEntitlement getInitialEntitlement(AutoCreatedLeafQueue leafQueue) { + return new QueueEntitlement(0.0f, 0.0f); + } + + void incAbsoluteAllocatedCapacity(float curAllocated) throws SchedulerDynamicEditException { + try { + writeLock.lock(); + if ( absoluteActivatedCapacity + curAllocated > getAbsoluteCapacity()) { + throw new SchedulerDynamicEditException("Capacity increment exceeds available capacity : " + + absoluteActivatedCapacity + curAllocated + " > " + getAbsoluteCapacity()); + } + + absoluteActivatedCapacity += curAllocated; + } finally { + writeLock.unlock(); + } + } + + void decAbsoluteAllocatedCapacity(float curAllocated) throws SchedulerDynamicEditException { + try { + writeLock.lock(); + if ( absoluteActivatedCapacity - curAllocated < 0) { + throw new SchedulerDynamicEditException("Capacity increment exceeds available capacity"); + } + absoluteActivatedCapacity -= curAllocated; + } finally { + writeLock.unlock(); + } + } + + public int getMaxAppsForAutoCreatedQueues() { + return maxAppsForAutoCreatedQueues; + } + + public int getMaxAppsPerUserForAutoCreatedQueues() { + return maxAppsPerUserForAutoCreatedQueues; + } + + public int getUserLimit() { + return userLimit; + } + + public AutoCreatedQueueEntitlementPolicy getQueueEntitlementPolicy() { + return queueEntitlementPolicy; + } + + public float getAbsoluteActivatedCapacity() { + try { + readLock.lock(); + return absoluteActivatedCapacity; + } finally { + readLock.unlock(); + } + } + + /** + * Checks if entitlement is valid and within allowed parent's absolute capacity + * @param entitlementChange The entitlment chnage that + * @throws SchedulerDynamicEditException if validation fails + */ + public void validateEntitlement(final QueueEntitlementChange entitlementChange) throws SchedulerDynamicEditException { + + if (!(entitlementChange instanceof QueueEntitlementChange.UpdateQueue)) { + throw new SchedulerDynamicEditException("Add/Remove queue is not supported for auto create enabled parent " + + "queues : " + entitlementChange.getQueue().getQueuePath()); + } + + QueueEntitlement entitlement = entitlementChange.getUpdatedEntitlement(); + try { + readLock.lock(); + final float absoluteCapacity = getAbsoluteCapacity(); + final float leafQueueAbsoluteCapacity = entitlement.getCapacity() * absoluteCapacity; + if ( absoluteActivatedCapacity + leafQueueAbsoluteCapacity > absoluteCapacity) { + throw new SchedulerDynamicEditException(" Activating leaf queue exceeds parent queue available capacity " + + absoluteActivatedCapacity + leafQueueAbsoluteCapacity + " > " + absoluteCapacity); + } + } finally { + readLock.unlock(); + } + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java new file mode 100644 index 00000000000..d29270f1ba0 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java @@ -0,0 +1,170 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + + +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; +import org.apache.hadoop.yarn.util.Clock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Auto created leaf queue which is managed(created, capacity quotas) by its parent AutoCreateEnabledParentQueue + */ +public class AutoCreatedLeafQueue extends LeafQueue { + + private static final Logger LOG = LoggerFactory + .getLogger(AutoCreatedLeafQueue.class); + + private AutoCreateEnabledParentQueue parent; + private AtomicBoolean isActive = new AtomicBoolean(false); + + private volatile long lastActivationTime; + + private volatile long lastDectivationTime; + + public AutoCreatedLeafQueue(CapacitySchedulerContext cs, String queueName, + AutoCreateEnabledParentQueue parent) throws IOException, SchedulerDynamicEditException { + super(cs, queueName, parent, null); + + updateApplicationAndUserLimits(parent.getUserLimitForAutoCreatedQueues(), + parent.getUserLimitFactor(), + parent.getMaxApplicationsForAutoCreatedQueues(), + parent.getMaxApplicationsPerUserForAutoCreatedQueues()); + + //Update initial capacity + QueueEntitlement entitlement = parent.getInitialEntitlement(this); + + try { + super.setEntitlement(entitlement); + } catch(SchedulerDynamicEditException sde) { + throw new IllegalStateException(sde); + } + + this.parent = parent; + } + + @Override + public void reinitialize(CSQueue newlyParsedQueue, + Resource clusterResource) throws IOException { + try { + writeLock.lock(); + // Sanity check + if (!(newlyParsedQueue instanceof AutoCreatedLeafQueue) || !newlyParsedQueue + .getQueuePath().equals(getQueuePath())) { + throw new IOException( + "Trying to reinitialize " + getQueuePath() + " from " + + newlyParsedQueue.getQueuePath()); + } + super.reinitialize(newlyParsedQueue, clusterResource); + CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource, + this, labelManager, null); + + updateApplicationAndUserLimits(parent.getUserLimitForAutoCreatedQueues(), + parent.getUserLimitFactor(), + parent.getMaxApplicationsForAutoCreatedQueues(), + parent.getMaxApplicationsPerUserForAutoCreatedQueues()); + } finally { + writeLock.unlock(); + } + } + + /** + * Is the queue currently active or deactivated? + * + * @return truw if Active else false + */ + public boolean isActive() { + return isActive.get(); + } + + private boolean activate() { + return isActive.compareAndSet(false, true); + } + + private boolean deactivate() { + return isActive.compareAndSet(true, false); + } + + /** + * Validate specified entitlement + * @param entitlement The entitlement to validate and apply on this leaf queue + * @throws SchedulerDynamicEditException when + */ + public void updateEntitlement(QueueEntitlement entitlement) throws SchedulerDynamicEditException { + try { + writeLock.lock(); + if ( entitlement.getCapacity() > 0) { + if ( isActive() ) { + LOG.info("Queue is already active. Skipping activation : " + getQueuePath()); + } else { + parent.incAbsoluteAllocatedCapacity(parent.getAutoCreatedLeafQueueAbsoluteCapacity()); + setEntitlement(entitlement); + activate(); + lastActivationTime = csContext.getClock().getTime(); + } + } else { + if (!hasPendingApps()) { + if ( !isActive() ) { + LOG.info("Queue is already de-activated. Skipping de-activation : " + getQueuePath()); + } else { + parent.decAbsoluteAllocatedCapacity(parent.getAutoCreatedLeafQueueAbsoluteCapacity()); + setEntitlement(entitlement); + deactivate(); + } + lastDectivationTime = csContext.getClock().getTime(); + } else { + LOG.info("Skipping deactivation since queue has pending apps : " + getQueuePath()); + } + } + } finally { + writeLock.unlock(); + } + } + + @Override + protected void setupConfigurableCapacities() { + CSQueueUtils.updateAndCheckCapacitiesByLabel(getQueuePath(), + queueCapacities, parent == null ? null : parent.getQueueCapacities()); + } + + private void updateApplicationAndUserLimits(int userLimit, float userLimitFactor, + int maxAppsForAutoCreatedQueues, int maxAppsPerUserForAutoCreatedQueues) { + setUserLimit(userLimit); + setUserLimitFactor(userLimitFactor); + setMaxApplications(maxAppsForAutoCreatedQueues); + maxApplicationsPerUser = maxAppsPerUserForAutoCreatedQueues; + } + + public long getLastActivationTime() { + return lastActivationTime; + } + + public long getLastDectivationTime() { + return lastDectivationTime; + } + + public boolean hasPendingApps() { + return getNumPendingApplications() > 0 || getNumActiveApplications() > 0; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedQueueEntitlementPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedQueueEntitlementPolicy.java new file mode 100644 index 00000000000..48f3856d391 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedQueueEntitlementPolicy.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import org.apache.hadoop.yarn.util.Clock; + +import java.util.List; + +public interface AutoCreatedQueueEntitlementPolicy { + + /** + * Initialize policy + * @param schedulerContext Capacity Scheduler context + * @param clock a reference to the system clock. + */ + void init(CapacitySchedulerContext schedulerContext, Clock clock); + + /** + * Compute/Adjust child queue capacities for a parent queue based on the configured policy for managing capacity + * for auto created leaf queues - @see AutoCreatedQueueEntitlementPolicy + * + * @param nodePartition the Node partition/label for which we are computing queue entitlements + * @param parentQueue reference to the parent queue underneath which leaf queue entitlements are computed + * @return returns a list of suggested QueueEntitlementChange(s) which may or may not be be enforced by the scheduler + */ + List computeChildQueueEntitlements(String nodePartition, ParentQueue parentQueue, + QueueEntitlementPolicyContext context); + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java index 3a17d1b057d..322c77272b2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java @@ -37,7 +37,6 @@ import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; import org.apache.hadoop.yarn.security.PrivilegedEntity; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; @@ -46,12 +45,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerQueue; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimplePlacementSet; +import org.apache.hadoop.yarn.util.Clock; /** * CSQueue represents a node in the tree of diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index d91aa55a487..ca41fd86994 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -64,6 +64,7 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext; import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementFactory; import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementRule; import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule; @@ -132,12 +133,16 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ReleaseContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.UpdateQueueEntitlementsSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSetUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimplePlacementSet; import org.apache.hadoop.yarn.server.resourcemanager.security.AppPriorityACLsManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.utils.Lock; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.MonotonicClock; +import org.apache.hadoop.yarn.util.UTCClock; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -171,6 +176,8 @@ private int maxAssignPerHeartbeat; + private final Clock clock = new UTCClock(); + private CSConfigurationProvider csConfProvider; @Override @@ -435,7 +442,7 @@ public void reinitialize(Configuration newConf, RMContext rmContext) } catch (Throwable t) { this.conf = oldConf; refreshMaximumAllocation(this.conf.getMaximumAllocation()); - throw new IOException("Failed to re-init queues : "+ t.getMessage(), t); + throw new IOException("Failed to re-init queues : " + t.getMessage(), t); } // update lazy preemption @@ -456,6 +463,7 @@ long getAsyncScheduleInterval() { /** * Schedule on all nodes by starting at a random point. + * * @param cs */ static void schedule(CapacityScheduler cs) { @@ -476,7 +484,8 @@ static void schedule(CapacityScheduler cs) { try { Thread.sleep(cs.getAsyncScheduleInterval()); - } catch (InterruptedException e) {} + } catch (InterruptedException e) { + } } static class AsyncScheduleThread extends Thread { @@ -499,7 +508,7 @@ public void run() { // Don't run schedule if we have some pending backlogs already if (cs.getAsyncSchedulingPendingBacklogs() > 100) { Thread.sleep(1); - } else{ + } else { schedule(cs); } } @@ -597,7 +606,8 @@ public int getPendingBacklogs() { } } - private void updatePlacementRules() throws IOException { + @VisibleForTesting + void updatePlacementRules() throws IOException { // Initialize placement rules Collection placementRuleStrs = conf.getStringCollection( YarnConfiguration.QUEUE_PLACEMENT_RULES); @@ -677,7 +688,7 @@ private void addApplicationOnRecovery( "Application killed on recovery as it was submitted to queue " + queueName + " which no longer exists after restart.")); return; - } else{ + } else { String queueErrorMsg = "Queue named " + queueName + " missing during application recovery." + " Queue removal during recovery is not presently " @@ -698,7 +709,7 @@ private void addApplicationOnRecovery( + queueName + " which is no longer a leaf queue after restart.")); return; - } else{ + } else { String queueErrorMsg = "Queue named " + queueName + " is no longer a leaf queue during application recovery." + " Changing a leaf queue to a parent queue during recovery is" @@ -744,15 +755,43 @@ private void addApplication(ApplicationId applicationId, } // Sanity checks. CSQueue queue = getQueue(queueName); + if (queue == null) { - String message = + //Could be a potential auto-created leaf queue + try { + LeafQueue autoCreatedLeafQueue = autoCreateLeafQueue(applicationId); + if (autoCreatedLeafQueue == null) { + final String message = "Application " + applicationId + " submitted by user " + user + " to unknown queue: " + queueName; + this.rmContext.getDispatcher().getEventHandler().handle( new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED, message)); - return; + } else { + queue = autoCreatedLeafQueue; + } + + } catch (SchedulerDynamicEditException e) { + LOG.error("Could not auto-create leaf queue due to : ", e); + final String message = + "Application " + applicationId + " submission by user " + user + + " to queue: " + queueName + " failed : " + e.getMessage(); + this.rmContext.getDispatcher().getEventHandler().handle( + new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED, + message)); + } catch (IOException e) { + //TODO - add retries + final String message = + "Application " + applicationId + " submission by user " + user + + " to queue: " + queueName + " failed : " + e.getMessage(); + LOG.error("Could not auto-create leaf queue due to : ", e); + this.rmContext.getDispatcher().getEventHandler().handle( + new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED, + message)); + } } + if (!(queue instanceof LeafQueue)) { String message = "Application " + applicationId + " submitted by user " + user @@ -832,7 +871,7 @@ private void addApplicationAttempt( LOG.debug(applicationAttemptId + " is recovering. Skipping notifying ATTEMPT_ADDED"); } - } else{ + } else { rmContext.getDispatcher().getEventHandler().handle( new RMAppAttemptEvent(applicationAttemptId, RMAppAttemptEventType.ATTEMPT_ADDED)); @@ -858,7 +897,7 @@ private void doneApplication(ApplicationId applicationId, if (!(queue instanceof LeafQueue)) { LOG.error("Cannot finish application " + "from non-leaf queue: " + queue .getQueueName()); - } else{ + } else { queue.finishApplication(applicationId, application.getUser()); } application.stop(finalState); @@ -917,7 +956,7 @@ private void doneApplicationAttempt( if (!(queue instanceof LeafQueue)) { LOG.error( "Cannot finish application " + "from non-leaf queue: " + queueName); - } else{ + } else { queue.finishApplicationAttempt(attempt, queue.getQueueName()); } } finally { @@ -1094,7 +1133,7 @@ private void updateLabelsOnNode(NodeId nodeId, String newPartition; if (newLabels.isEmpty()) { newPartition = RMNodeLabelsManager.NO_LABEL; - } else{ + } else { newPartition = newLabels.iterator().next(); } @@ -1108,7 +1147,7 @@ private void updateLabelsOnNode(NodeId nodeId, if (null != application) { application.nodePartitionUpdated(rmContainer, oldPartition, newPartition); - } else{ + } else { LOG.warn("There's something wrong, some RMContainers running on" + " a node, but we cannot find SchedulerApplicationAttempt " + "for it. Node=" + node.getNodeID() + " applicationAttemptId=" @@ -1287,7 +1326,7 @@ private CSAssignment allocateContainerOnSingleNode(PlacementSet ps, @Override public void handle(SchedulerEvent event) { - switch(event.getType()) { + switch (event.getType()) { case NODE_ADDED: { - NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event; + NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent) event; addNode(nodeAddedEvent.getAddedRMNode()); recoverContainersOnNode(nodeAddedEvent.getContainerReports(), nodeAddedEvent.getAddedRMNode()); @@ -1437,14 +1476,14 @@ public void handle(SchedulerEvent event) { break; case NODE_REMOVED: { - NodeRemovedSchedulerEvent nodeRemovedEvent = (NodeRemovedSchedulerEvent)event; + NodeRemovedSchedulerEvent nodeRemovedEvent = (NodeRemovedSchedulerEvent) event; removeNode(nodeRemovedEvent.getRemovedRMNode()); } break; case NODE_RESOURCE_UPDATE: { NodeResourceUpdateSchedulerEvent nodeResourceUpdatedEvent = - (NodeResourceUpdateSchedulerEvent)event; + (NodeResourceUpdateSchedulerEvent) event; updateNodeAndQueueResource(nodeResourceUpdatedEvent.getRMNode(), nodeResourceUpdatedEvent.getResourceOption()); } @@ -1459,7 +1498,7 @@ public void handle(SchedulerEvent event) { break; case NODE_UPDATE: { - NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event; + NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent) event; nodeUpdate(nodeUpdatedEvent.getRMNode()); } break; @@ -1482,7 +1521,7 @@ public void handle(SchedulerEvent event) { break; case APP_REMOVED: { - AppRemovedSchedulerEvent appRemovedEvent = (AppRemovedSchedulerEvent)event; + AppRemovedSchedulerEvent appRemovedEvent = (AppRemovedSchedulerEvent) event; doneApplication(appRemovedEvent.getApplicationID(), appRemovedEvent.getFinalState()); } @@ -1542,7 +1581,7 @@ public void handle(SchedulerEvent event) { case MARK_CONTAINER_FOR_PREEMPTION: { ContainerPreemptEvent preemptContainerEvent = - (ContainerPreemptEvent)event; + (ContainerPreemptEvent) event; ApplicationAttemptId aid = preemptContainerEvent.getAppId(); RMContainer containerToBePreempted = preemptContainerEvent.getContainer(); markContainerForPreemption(aid, containerToBePreempted); @@ -1550,7 +1589,7 @@ public void handle(SchedulerEvent event) { break; case MARK_CONTAINER_FOR_KILLABLE: { - ContainerPreemptEvent containerKillableEvent = (ContainerPreemptEvent)event; + ContainerPreemptEvent containerKillableEvent = (ContainerPreemptEvent) event; RMContainer killableContainer = containerKillableEvent.getContainer(); markContainerForKillable(killableContainer); } @@ -1564,11 +1603,60 @@ public void handle(SchedulerEvent event) { } } break; + case UPDATE_QUEUE_ENTITLEMENT: + { + UpdateQueueEntitlementsSchedulerEvent updateEntitlementsEvent = + (UpdateQueueEntitlementsSchedulerEvent) event; + try { + vaidateAndApplyQueueEntitlements(updateEntitlementsEvent); + } catch (SchedulerDynamicEditException e) { + LOG.error("Could not update queue entitlements ", e); + //TODO - propagate exceptions here to some diagnostics? + throw new IllegalStateException(e); + } + } + break; default: LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!"); } } + private void vaidateAndApplyQueueEntitlements(final UpdateQueueEntitlementsSchedulerEvent updateEntitlementsEvent) + throws SchedulerDynamicEditException { + + CSQueue queue = updateEntitlementsEvent.getQueue(); + try { + writeLock.lock(); + for (QueueEntitlementChange entitlementChange : + updateEntitlementsEvent.getChildQueueEntitlementUpdates()) { + switch (entitlementChange.getQueueAction()) { + case ADD_QUEUE: + CSQueue childQueueToBeAdded = entitlementChange.getQueue(); + addQueue(childQueueToBeAdded); + case REMOVE_QUEUE: + CSQueue childQueueToBeRemoved = entitlementChange.getQueue(); + removeQueue(childQueueToBeRemoved.getQueueName()); + case UPDATE_QUEUE: + if (queue instanceof AutoCreateEnabledParentQueue) { + AutoCreateEnabledParentQueue parentQueue = (AutoCreateEnabledParentQueue) queue; + AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) entitlementChange.getQueue(); + parentQueue.validateEntitlement(entitlementChange); + + QueueEntitlement newEntitlement = entitlementChange.getUpdatedEntitlement(); + leafQueue.updateEntitlement(newEntitlement); + } else { + LOG.error("Queue " + queue.getQueueName() + " is not a AutoEnabledParentQueue." + + " Ignoring update " + updateEntitlementsEvent); + throw new SchedulerDynamicEditException("Queue " + queue.getQueueName() + " is not a AutoEnabledParentQueue." + + " Ignoring update " + updateEntitlementsEvent); + } + } + } + } finally { + writeLock.unlock(); + } + } + /** * Process node labels update. */ @@ -1732,7 +1820,7 @@ public void recover(RMState state) throws Exception { @Override public void killReservedContainer(RMContainer container) { - if(LOG.isDebugEnabled()){ + if (LOG.isDebugEnabled()) { LOG.debug(SchedulerEventType.KILL_RESERVED_CONTAINER + ":" + container.toString()); } @@ -1748,7 +1836,7 @@ public void killReservedContainer(RMContainer container) { @Override public void markContainerForPreemption(ApplicationAttemptId aid, RMContainer cont) { - if(LOG.isDebugEnabled()){ + if (LOG.isDebugEnabled()) { LOG.debug(SchedulerEventType.MARK_CONTAINER_FOR_PREEMPTION + ": appAttempt:" + aid.toString() + " container: " + cont.toString()); @@ -1772,7 +1860,7 @@ public void markContainerForKillable( super.completedContainer(killableContainer, SchedulerUtils .createPreemptedContainerStatus(killableContainer.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER), RMContainerEventType.KILL); - } else{ + } else { FiCaSchedulerNode node = (FiCaSchedulerNode) getSchedulerNode( killableContainer.getAllocatedNode()); @@ -1903,7 +1991,7 @@ private String resolveReservationQueueName(String queueName, } // use the reservation queue to run the app queueName = resQName; - } else{ + } else { // use the default child queue of the plan for unreserved apps queueName = getDefaultReservationQueueName(queueName); } @@ -2008,7 +2096,7 @@ public void setEntitlement(String inQueue, QueueEntitlement entitlement) return; } newQueue.setEntitlement(entitlement); - } else{ + } else { throw new SchedulerDynamicEditException( "Sum of child queues would exceed 100% for PlanQueue: " + parent .getQueueName()); @@ -2139,7 +2227,9 @@ private void checkQueuePartition(FiCaSchedulerApp app, LeafQueue dest) } } - /** {@inheritDoc} */ + /** + * {@inheritDoc} + */ @Override public EnumSet getSchedulingResourceTypes() { if (calculator.getClass().getName() @@ -2160,7 +2250,7 @@ public Resource getMaximumResourceCapability(String queueName) { LOG.error("queue " + queueName + " is not an leaf queue"); return getMaximumResourceCapability(); } - return ((LeafQueue)queue).getMaximumAllocation(); + return ((LeafQueue) queue).getMaximumAllocation(); } private String handleMoveToPlanQueue(String targetQueueName) { @@ -2308,7 +2398,9 @@ public ResourceUsage getClusterResourceUsage() { FiCaSchedulerApp app = getApplicationAttempt( rmContainer.getApplicationAttemptId()); - if (null == app) { return null; } + if (null == app) { + return null; + } NodeId nodeId; // Get nodeId @@ -2365,7 +2457,7 @@ public void submitResourceCommitRequest(Resource cluster, if (scheduleAsynchronously) { // Submit to a commit thread and commit it async-ly resourceCommitterService.addNewCommitRequest(request); - } else{ + } else { // Otherwise do it sync-ly. tryCommit(cluster, request); } @@ -2487,7 +2579,7 @@ public void tryCommit(Resource cluster, ResourceCommitRequest r) { if (app.accept(cluster, request)) { app.apply(cluster, request); LOG.info("Allocation proposal accepted"); - } else{ + } else { LOG.info("Failed to accept allocation proposal"); } @@ -2511,6 +2603,11 @@ public CapacitySchedulerQueueManager getCapacitySchedulerQueueManager() { return this.queueManager; } + @Override + public Clock getClock() { + return clock; + } + /** * Try to move a reserved container to a targetNode. * If the targetNode is reserved by another application (other than this one). @@ -2619,6 +2716,25 @@ public long getMaximumApplicationLifetime(String queueName) { return ((LeafQueue) queue).getMaximumApplicationLifetime(); } + private LeafQueue autoCreateLeafQueue(final ApplicationId applicationId) throws SchedulerDynamicEditException, IOException { + LeafQueue autoCreatedLeafQueue = null; + RMApp rmApp = this.rmContext.getRMApps().get(applicationId); + ApplicationPlacementContext placementContext = rmApp.getApplicationQueuePlacementContext(); + String leafQueueName = placementContext.getQueue(); + String parentQueueName = placementContext.getParentQueue(); + + if (parentQueueName != null && !parentQueueName.isEmpty()) { + CSQueue parentQueue = getQueue(parentQueueName); + if (parentQueue != null && conf.isAutoCreateChildQueueEnabled(parentQueue.getQueuePath())) { + AutoCreateEnabledParentQueue autoCreatingParentQueue = (AutoCreateEnabledParentQueue) parentQueue; + autoCreatedLeafQueue = new AutoCreatedLeafQueue(this, leafQueueName, autoCreatingParentQueue); + queueManager.addQueue(leafQueueName, autoCreatedLeafQueue); + autoCreatingParentQueue.addChildQueue(autoCreatedLeafQueue); + } + } + return autoCreatedLeafQueue; + } + @Override public boolean isConfigurationMutable() { return csConfProvider instanceof MutableConfigurationProvider; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 3a519ecf5f1..6a3334fb959 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -1,20 +1,20 @@ /** -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; @@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableSet; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.authorize.AccessControlList; @@ -185,15 +186,21 @@ @Private public static final String NONE_ACL = " "; - @Private public static final String ENABLE_USER_METRICS = - PREFIX +"user-metrics.enable"; - @Private public static final boolean DEFAULT_ENABLE_USER_METRICS = false; + @Private + public static final String ENABLE_USER_METRICS = + PREFIX + "user-metrics.enable"; + @Private + public static final boolean DEFAULT_ENABLE_USER_METRICS = false; - /** ResourceComparator for scheduling. */ - @Private public static final String RESOURCE_CALCULATOR_CLASS = + /** + * ResourceComparator for scheduling. + */ + @Private + public static final String RESOURCE_CALCULATOR_CLASS = PREFIX + "resource-calculator"; - @Private public static final Class + @Private + public static final Class DEFAULT_RESOURCE_CALCULATOR_CLASS = DefaultResourceCalculator.class; @Private @@ -308,7 +315,9 @@ @Private public static final boolean DEFAULT_ASSIGN_MULTIPLE_ENABLED = true; - /** Maximum number of containers to assign on each check-in. */ + /** + * Maximum number of containers to assign on each check-in. + */ @Private public static final String MAX_ASSIGN_PER_HEARTBEAT = PREFIX + "per-node-heartbeat.maximum-container-assignments"; @@ -316,6 +325,21 @@ @Private public static final int DEFAULT_MAX_ASSIGN_PER_HEARTBEAT = -1; + @Private + public static final String AUTO_CREATE_CHILD_QUEUE_ENABLED = + "auto-create-child-queue.enabled"; + + @Private + public static final boolean DEFAULT_AUTO_CREATE_CHILD_QUEUE_ENABLED = false; + + @Private + public static final String AUTO_CREATED_QUEUE_ENTITLEMENT_POLICY = + "auto-created-queue-entitlement-policy"; + + @InterfaceAudience.Private + public static final String DEFAULT_AUTO_CREATED_QUEUE_ENTITLEMENT_POLICY = + "org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.GuaranteedOrZeroCapacityOverTimePolicy"; + AppPriorityACLConfigurationParser priorityACLConfig = new AppPriorityACLConfigurationParser(); public CapacitySchedulerConfiguration() { @@ -365,18 +389,20 @@ public float getMaximumApplicationMasterResourcePercent() { /** * Get the maximum applications per queue setting. + * * @param queue name of the queue * @return setting specified or -1 if not set */ public int getMaximumApplicationsPerQueue(String queue) { int maxApplicationsPerQueue = getInt(getQueuePrefix(queue) + MAXIMUM_APPLICATIONS_SUFFIX, - (int)UNDEFINED); + (int) UNDEFINED); return maxApplicationsPerQueue; } /** * Get the maximum am resource percent per queue setting. + * * @param queue name of the queue * @return per queue setting or defaults to the global am-resource-percent * setting if per queue setting not present @@ -800,16 +826,15 @@ public void setQueuePriority(String queue, int priority) { * Get the per queue setting for the maximum limit to allocate to * each container request. * - * @param queue - * name of the queue + * @param queue name of the queue * @return setting specified per queue else falls back to the cluster setting */ public Resource getMaximumAllocationPerQueue(String queue) { String queuePrefix = getQueuePrefix(queue); long maxAllocationMbPerQueue = getInt(queuePrefix + MAXIMUM_ALLOCATION_MB, - (int)UNDEFINED); + (int) UNDEFINED); int maxAllocationVcoresPerQueue = getInt( - queuePrefix + MAXIMUM_ALLOCATION_VCORES, (int)UNDEFINED); + queuePrefix + MAXIMUM_ALLOCATION_VCORES, (int) UNDEFINED); if (LOG.isDebugEnabled()) { LOG.debug("max alloc mb per queue for " + queue + " is " + maxAllocationMbPerQueue); @@ -817,11 +842,11 @@ public Resource getMaximumAllocationPerQueue(String queue) { + maxAllocationVcoresPerQueue); } Resource clusterMax = getMaximumAllocation(); - if (maxAllocationMbPerQueue == (int)UNDEFINED) { + if (maxAllocationMbPerQueue == (int) UNDEFINED) { LOG.info("max alloc mb per queue for " + queue + " is undefined"); maxAllocationMbPerQueue = clusterMax.getMemorySize(); } - if (maxAllocationVcoresPerQueue == (int)UNDEFINED) { + if (maxAllocationVcoresPerQueue == (int) UNDEFINED) { LOG.info("max alloc vcore per queue for " + queue + " is undefined"); maxAllocationVcoresPerQueue = clusterMax.getVirtualCores(); } @@ -906,14 +931,18 @@ public boolean getOverrideWithQueueMappings() { DEFAULT_ENABLE_QUEUE_MAPPING_OVERRIDE); } + @VisibleForTesting + public void setOverrideWithQueueMappings(boolean overrideWithQueueMappings) { + setBoolean(ENABLE_QUEUE_MAPPING_OVERRIDE, + overrideWithQueueMappings); + } + /** * Returns a collection of strings, trimming leading and trailing whitespeace * on each value * - * @param str - * String to parse - * @param delim - * delimiter to separate the values + * @param str String to parse + * @param delim delimiter to separate the values * @return Collection of parsed elements. */ private static Collection getTrimmedStringCollection(String str, @@ -945,14 +974,14 @@ public boolean getOverrideWithQueueMappings() { for (String mappingValue : mappingsString) { String[] mapping = getTrimmedStringCollection(mappingValue, ":") - .toArray(new String[] {}); + .toArray(new String[]{}); if (mapping.length != 3 || mapping[1].length() == 0 || mapping[2].length() == 0) { throw new IllegalArgumentException( "Illegal queue mapping " + mappingValue); } - QueueMapping m; + QueueMapping m = null; try { QueueMapping.MappingType mappingType; if (mapping[0].equals("u")) { @@ -963,10 +992,22 @@ public boolean getOverrideWithQueueMappings() { throw new IllegalArgumentException( "unknown mapping prefix " + mapping[0]); } + + QueuePath queuePath = QueuePath.toQueuePath(mapping[2]); + if(queuePath.hasParentQueue()) { + m = new QueueMapping( + mappingType, + mapping[1], + queuePath.leafQueue, + queuePath.parentQueue + ); + } else { m = new QueueMapping( mappingType, mapping[1], - mapping[2]); + queuePath.leafQueue + ); + } } catch (Throwable t) { throw new IllegalArgumentException( "Illegal queue mapping " + mappingValue); @@ -987,6 +1028,7 @@ public boolean isReservable(String queue) { } public void setReservable(String queue, boolean isReservable) { + setBoolean(getQueuePrefix(queue) + IS_RESERVABLE, isReservable); setBoolean(getQueuePrefix(queue) + IS_RESERVABLE, isReservable); LOG.debug("here setReservableQueue: queuePrefix=" + getQueuePrefix(queue) + ", isReservableQueue=" + isReservable(queue)); @@ -1175,36 +1217,46 @@ public boolean getLazyPreemptionEnabled() { private static final String INTRA_QUEUE_PREEMPTION_CONFIG_PREFIX = "intra-queue-preemption."; - /** If true, run the policy but do not affect the cluster with preemption and - * kill events. */ + /** + * If true, run the policy but do not affect the cluster with preemption and + * kill events. + */ public static final String PREEMPTION_OBSERVE_ONLY = PREEMPTION_CONFIG_PREFIX + "observe_only"; public static final boolean DEFAULT_PREEMPTION_OBSERVE_ONLY = false; - /** Time in milliseconds between invocations of this policy */ + /** + * Time in milliseconds between invocations of this policy + */ public static final String PREEMPTION_MONITORING_INTERVAL = PREEMPTION_CONFIG_PREFIX + "monitoring_interval"; public static final long DEFAULT_PREEMPTION_MONITORING_INTERVAL = 3000L; - /** Time in milliseconds between requesting a preemption from an application - * and killing the container. */ + /** + * Time in milliseconds between requesting a preemption from an application + * and killing the container. + */ public static final String PREEMPTION_WAIT_TIME_BEFORE_KILL = PREEMPTION_CONFIG_PREFIX + "max_wait_before_kill"; public static final long DEFAULT_PREEMPTION_WAIT_TIME_BEFORE_KILL = 15000L; - /** Maximum percentage of resources preemptionCandidates in a single round. By + /** + * Maximum percentage of resources preemptionCandidates in a single round. By * controlling this value one can throttle the pace at which containers are * reclaimed from the cluster. After computing the total desired preemption, - * the policy scales it back within this limit. */ + * the policy scales it back within this limit. + */ public static final String TOTAL_PREEMPTION_PER_ROUND = PREEMPTION_CONFIG_PREFIX + "total_preemption_per_round"; public static final float DEFAULT_TOTAL_PREEMPTION_PER_ROUND = 0.1f; - /** Maximum amount of resources above the target capacity ignored for + /** + * Maximum amount of resources above the target capacity ignored for * preemption. This defines a deadzone around the target capacity that helps * prevent thrashing and oscillations around the computed target balance. * High values would slow the time to capacity and (absent natural - * completions) it might prevent convergence to guaranteed capacity. */ + * completions) it might prevent convergence to guaranteed capacity. + */ public static final String PREEMPTION_MAX_IGNORED_OVER_CAPACITY = PREEMPTION_CONFIG_PREFIX + "max_ignored_over_capacity"; public static final float DEFAULT_PREEMPTION_MAX_IGNORED_OVER_CAPACITY = 0.1f; @@ -1214,7 +1266,8 @@ public boolean getLazyPreemptionEnabled() { * the rate of geometric convergence into the deadzone ({@link * #PREEMPTION_MAX_IGNORED_OVER_CAPACITY}). For example, a termination factor of 0.5 * will reclaim almost 95% of resources within 5 * {@link - * #PREEMPTION_WAIT_TIME_BEFORE_KILL}, even absent natural termination. */ + * #PREEMPTION_WAIT_TIME_BEFORE_KILL}, even absent natural termination. + */ public static final String PREEMPTION_NATURAL_TERMINATION_FACTOR = PREEMPTION_CONFIG_PREFIX + "natural_termination_factor"; public static final float DEFAULT_PREEMPTION_NATURAL_TERMINATION_FACTOR = @@ -1467,11 +1520,12 @@ public void setPUOrderingPolicyUnderUtilizedPreemptionMoveReservation( /** * Get the weights of all users at this queue level from the configuration. * Used in computing user-specific user limit, relative to other users. + * * @param queuePath full queue path * @return map of user weights, if they exists. Otherwise, return empty map. */ public Map getAllUserWeightsForQueue(String queuePath) { - Map userWeights = new HashMap (); + Map userWeights = new HashMap(); String qPathPlusPrefix = getQueuePrefix(queuePath).replaceAll("\\.", "\\\\.") + USER_SETTINGS + "\\."; @@ -1522,4 +1576,242 @@ public long getDefaultLifetimePerQueue(String queue) { public void setDefaultLifetimePerQueue(String queue, long defaultLifetime) { setLong(getQueuePrefix(queue) + DEFAULT_LIFETIME_SUFFIX, defaultLifetime); } + + private static final String QUEUE_ENTITLEMENT_CONFIG_PREFIX = + "yarn.resourcemanager.monitor.capacity.queue-entitlement."; + + /** + * Time in milliseconds between invocations of this policy + */ + public static final String QUEUE_ENTITLEMENT_MONITORING_INTERVAL = + QUEUE_ENTITLEMENT_CONFIG_PREFIX + "monitoring_interval"; + public static final long DEFAULT_QUEUE_ENTITLEMENT_MONITORING_INTERVAL = 3000L; + + /** + * If true, run the policy but do not apply entitlements in the scheduler + */ + public static final String QUEUE_ENTITLEMENT_OBSERVE_ONLY = + QUEUE_ENTITLEMENT_CONFIG_PREFIX + "observe_only"; + public static final boolean DEFAULT_QUEUE_ENTITLEMENT_OBSERVE_ONLY = false; + + /** + * If true, this queue will be created as a Parent Queue with Auto Created leaf queues + * @param queuePath The queues ptah + * @return true if auto create is enabled for child queues else false. Default is false + */ + public boolean isAutoCreateChildQueueEnabled(String queuePath) { + boolean isAutoCreateEnabled = + getBoolean(getQueuePrefix(queuePath) + + AUTO_CREATE_CHILD_QUEUE_ENABLED, + DEFAULT_AUTO_CREATE_CHILD_QUEUE_ENABLED); + return isAutoCreateEnabled; + } + + @VisibleForTesting + public void setAutoCreateChildQueueEnabled(String queuePath, boolean autoCreationEnabled) { + setBoolean(getQueuePrefix(queuePath) + AUTO_CREATE_CHILD_QUEUE_ENABLED, autoCreationEnabled); + } + + /** + * Get the auto created leaf queue's minimum guaranteed capacity. + * Leaf queue's template capacities are configured at the parent queue + * @param queuePath The parent queue's path + * @return the leaf queue's template capacity + */ + public float getAutoCreatedLeafQueueTemplateCapacity(String queuePath) { + return + getFloat(getAutoCreatedQueueTemplateConfPrefix(queuePath) + CAPACITY, + (float) UNDEFINED); + } + + @VisibleForTesting + public void setAutoCreatedLeafQueueTemplateCapacity(String queuePath, float capacity) { + setFloat(getAutoCreatedQueueTemplateConfPrefix(queuePath) + + CAPACITY, + capacity); + } + + /** + * Get the auto created leaf queue's maximum capacity + * @param queuePath The parent queue's path + * @return the leaf queue's template capacity + * @return + */ + public float getAutoCreatedLeafQueueTemplateMaxCapacity(String queuePath) { + return + getFloat(getAutoCreatedQueueTemplateConfPrefix(queuePath) + MAXIMUM_CAPACITY, + (float) UNDEFINED); + } + + @VisibleForTesting + public void setAutoCreatedLeafQueueTemplateMaxCapacity(String queuePath, float maxCapacity) { + setFloat(getAutoCreatedQueueTemplateConfPrefix(queuePath) + + MAXIMUM_CAPACITY, + maxCapacity); + } + + @VisibleForTesting + public void setAutoCreatedLeafQueueTemplateMaxApplicationsPerQueue(String queue, int maxApplicationsPerQueue) { + setInt(getAutoCreatedQueueTemplateConfPrefix(queue) + MAXIMUM_APPLICATIONS_SUFFIX, + maxApplicationsPerQueue); + } + + @VisibleForTesting + public void setQueuePlacementRules(Collection queuePlacementRules) { + if (queuePlacementRules == null) { + return; + } + String str = StringUtils.join(",", queuePlacementRules); + setStrings(YarnConfiguration.QUEUE_PLACEMENT_RULES, str); + } + + public Collection getQueuePlacementRules() { + return getStringCollection( + YarnConfiguration.QUEUE_PLACEMENT_RULES); + } + + public void setQueueMappings(List queueMappings) { + if (queueMappings == null) { + return; + } + + List queueMappingStrs = new ArrayList<>(); + for (QueueMapping mapping : queueMappings) { + queueMappingStrs.add(mapping.toString()); + } + + setStrings(QUEUE_MAPPING, StringUtils.join(",", queueMappingStrs)); + } + + /** + * Get the auto created leaf queue's template configuration prefix + * Leaf queue's template capacities are configured at the parent queue + * @param queuePath parent queue's path + * @return Config prefix for leaf queue template configurations + */ + public String getAutoCreatedQueueTemplateConfPrefix(String queuePath) { + return getQueuePrefix(queuePath) + AutoCreateEnabledParentQueue + .AUTO_CREATED_LEAF_QUEUE_TEMPLATE_PREFIX + DOT; + } + + /** + * Queue Entitlement computation policy for Auto Created queues + * @param queue The queue's path + * @return Configured policy class name + */ + public String getAutoCreatedQueueEntitlementPolicy(String queue) { + String autoCreatedQueueEntitlementPolicy = + get(getQueuePrefix(queue) + AUTO_CREATED_QUEUE_ENTITLEMENT_POLICY, + DEFAULT_AUTO_CREATED_QUEUE_ENTITLEMENT_POLICY); + return autoCreatedQueueEntitlementPolicy; + } + + /** + * Get The policy class configured to manage capacities for auto created leaf queues under the specified parent + * @param queueName The parent queue's name + * @return The policy class configured to manage capacities for auto created leaf queues under the specified parent + * queue + */ + protected AutoCreatedQueueEntitlementPolicy getAutoCreatedQueueEntitlementPolicyClass(String queueName) { + + String queueEntitlementPolicyClassName = + getAutoCreatedQueueEntitlementPolicy(queueName); + LOG.info("Using Auto Created Queue Entitlement Policy: " + queueEntitlementPolicyClassName + + " for queue: " + queueName); + try { + Class queueEntitlementPolicyClazz = + getClassByName(queueEntitlementPolicyClassName); + if (AutoCreatedQueueEntitlementPolicy.class.isAssignableFrom(queueEntitlementPolicyClazz)) { + return (AutoCreatedQueueEntitlementPolicy) ReflectionUtils.newInstance(queueEntitlementPolicyClazz, + this); + } else { + throw new YarnRuntimeException("Class: " + queueEntitlementPolicyClassName + + " not instance of " + AutoCreatedQueueEntitlementPolicy.class.getCanonicalName()); + } + } catch (ClassNotFoundException e) { + throw new YarnRuntimeException("Could not instantiate AdmissionPolicy: " + + queueEntitlementPolicyClassName + " for queue: " + queueName, e); + } + } + + @VisibleForTesting + public void setQueueEntitlementPolicyObserveOnlyFlag(final String queue, final boolean isObserveOnly) { + setBoolean(ENABLE_QUEUE_MAPPING_OVERRIDE, + isObserveOnly); + } + + /** + * Can be used to parse a hierarchical queue path into a QueuePath + * eg: root.marketing.user1 would have queuePrefix as root, parentQueue as marketing + * and leaf queue as user1 + */ + public static class QueuePath { + + public String queuePrefix; + public String parentQueue; + public String leafQueue; + + public QueuePath(final String leafQueue) { + this.leafQueue = leafQueue; + } + + public QueuePath(final String queuePrefix, final String parentQueue, final String leafQueue) { + this(parentQueue, leafQueue); + this.queuePrefix = queuePrefix; + } + + public QueuePath(final String parentQueue, final String leafQueue) { + this.parentQueue = parentQueue; + this.leafQueue = leafQueue; + } + + public String getParentQueue() { + return parentQueue; + } + + public String getLeafQueue() { + return leafQueue; + } + + public String getQueuePrefix() { + return queuePrefix; + } + + public boolean hasParentQueue() { + return parentQueue != null; + } + + public boolean hasQueuePrefix() { + return queuePrefix != null; + } + + @Override + public String toString() { + return queuePrefix + DOT + parentQueue + leafQueue; + } + + /** + * extract immediate parent and leaf queue names from the specified queue name + * if queue name is not prefixed with parent queue name, returns only leaf queue + */ + //TODO - Do not allow '.' in auto created queue names + //TODO - Fix this to handle complete queue paths and parse queuePrefix + public static QueuePath toQueuePath(String queueName) { + QueuePath queuePath; + if (queueName.indexOf('.') > -1) { + String[] queueNames = queueName.split("\\."); + String parentQueue = queueNames[queueNames.length - 2].trim(); + if (!parentQueue.isEmpty()) { + String leafQueue = queueNames[queueNames.length - 1].trim(); + queuePath = new QueuePath(parentQueue, leafQueue); + } else { + queuePath = new QueuePath(queueName); + } + } else { + queuePath = new QueuePath(queueName); + } + return queuePath; + } + } } + diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java index 7c918a53620..f2ef9d53685 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java @@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; +import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; /** @@ -94,4 +95,7 @@ * @return if configuration is mutable */ boolean isConfigurationMutable(); + + Clock getClock(); + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java index 48c289f0cde..1ad396fdd6a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java @@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.AppPriorityACLsManager; import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.yarn.util.Clock; /** * @@ -221,6 +222,7 @@ static CSQueue parseQueue( : (parent.getQueuePath() + "." + queueName); String[] childQueueNames = conf.getQueues(fullQueueName); boolean isReservableQueue = conf.isReservable(fullQueueName); + boolean isAutoCreateEnabled = conf.isAutoCreateChildQueueEnabled(fullQueueName); if (childQueueNames == null || childQueueNames.length == 0) { if (null == parent) { throw new IllegalStateException( @@ -249,6 +251,10 @@ static CSQueue parseQueue( ((PlanQueue) queue).setChildQueues(childQueues); queues.put(defReservationId, resQueue); + } else if (isAutoCreateEnabled) { + queue = new AutoCreateEnabledParentQueue(csContext, queueName, + parent, oldQueues.get(queueName)); + } else { queue = new LeafQueue(csContext, queueName, parent, @@ -262,9 +267,16 @@ static CSQueue parseQueue( throw new IllegalStateException( "Only Leaf Queues can be reservable for " + queueName); } - ParentQueue parentQueue = + + ParentQueue parentQueue; + if (isAutoCreateEnabled) { + parentQueue = new AutoCreateEnabledParentQueue(csContext, queueName, + parent, oldQueues.get(queueName)); + } else { + parentQueue = new ParentQueue(csContext, queueName, parent, oldQueues.get(queueName)); + } // Used only for unit tests queue = hook.hook(parentQueue); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/GuaranteedOrZeroCapacityOverTimePolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/GuaranteedOrZeroCapacityOverTimePolicy.java new file mode 100644 index 00000000000..eeb5d7dc1bc --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/GuaranteedOrZeroCapacityOverTimePolicy.java @@ -0,0 +1,196 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.util.Clock; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; + +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueueUtils.EPSILON; + +/** + * + * Capacity Management policy for auto created leaf queues which sets the Queue Entitlement to 0 when there are no + * pending or running apps under that queue and enabled capacity to a pre-configured guaranteed capacity when an + * application attempt is pending or scheduleable + */ +public class GuaranteedOrZeroCapacityOverTimePolicy implements AutoCreatedQueueEntitlementPolicy { + + private CapacitySchedulerContext scheduler; + private Clock clock; + + private static final Log LOG = + LogFactory.getLog(GuaranteedOrZeroCapacityOverTimePolicy.class); + + private final QueueEntitlement ZERO_CAPACITY_ENTITLEMENT = new QueueEntitlement(0.0f, 1.0f); + + /** + * Comparator that orders applications by their submit time + */ + private class PendingApplicationComparator implements Comparator, Serializable { + + @Override + public int compare(FiCaSchedulerApp app1, FiCaSchedulerApp app2) { + RMApp rmApp1 = scheduler.getRMContext().getRMApps().get(app1.getApplicationId()); + RMApp rmApp2 = scheduler.getRMContext().getRMApps().get(app2.getApplicationId()); + return Long.compare(rmApp1.getSubmitTime(), rmApp2.getSubmitTime()); + } + } + + private PendingApplicationComparator applicationComparator = new PendingApplicationComparator(); + + @Override + public void init(final CapacitySchedulerContext schedulerContext, final Clock clock) { + this.scheduler = schedulerContext; + this.clock = clock; + } + + @Override + public List computeChildQueueEntitlements(final String nodePartition, ParentQueue parentQueue, + QueueEntitlementPolicyContext policyContext) { + + List computedEntitlements = new ArrayList<>(); + + AutoCreateEnabledParentQueue autoCreateEnabledParentQueue = ((AutoCreateEnabledParentQueue) parentQueue); + //deactivate + Set deactivatedLeafQueues = deactivateChildQueuesIfInActive(nodePartition, autoCreateEnabledParentQueue, + computedEntitlements, policyContext); + + float deactivatedCapacity = computedEntitlements.size() * ((AutoCreateEnabledParentQueue) parentQueue) + .getAutoCreatedLeafQueueAbsoluteCapacity(); + + float sumOfChildActivatedCapacity = policyContext.getActivatedCapacity(nodePartition, autoCreateEnabledParentQueue.getQueueName()); + QueueCapacities qc = policyContext.getQueueCapacities(nodePartition, autoCreateEnabledParentQueue.getQueueName()); + + //Check if we need to activate anything at all? + if (qc.getAbsoluteCapacity() - sumOfChildActivatedCapacity + deactivatedCapacity > EPSILON) { + + //sort applications across leaf queues by submit time + List pendingApps = policyContext.getSortedPendingApplications(nodePartition, + autoCreateEnabledParentQueue.getQueueName(), applicationComparator); + + int maxLeafQueuesTobeActivated = getMaxLeavesToBeActivated(qc.getAbsoluteCapacity(), sumOfChildActivatedCapacity, + ((AutoCreateEnabledParentQueue) parentQueue).getAutoCreatedLeafQueueAbsoluteCapacity()); + + LinkedHashSet leafQueuesToBeActivated = getSortedLeafQueues(pendingApps, maxLeafQueuesTobeActivated, deactivatedLeafQueues); + activateChildQueues(autoCreateEnabledParentQueue, leafQueuesToBeActivated, computedEntitlements, deactivatedCapacity); + } + + return computedEntitlements; + } + + private LinkedHashSet getSortedLeafQueues(final List pendingApps, int leafQueuesNeeded, + Set deactivatedQueues) { + LinkedHashSet leafQueues = new LinkedHashSet<>(leafQueuesNeeded); + int ctr = 0; + for (FiCaSchedulerApp app : pendingApps) { + AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) app.getCSLeafQueue(); + + //Check if leafQueue is not active already and has any pending apps + if (ctr < leafQueuesNeeded) { + if (!leafQueue.isActive() && leafQueue.hasPendingApps()) { + final String leafQueueName = app.getQueue().getQueueName(); + if (deactivatedQueues.contains(leafQueue.getQueueName())) { + RMApp rmApp = scheduler.getRMContext().getRMApps().get(app.getApplicationId()); + if (!rmApp.isAppInCompletedStates() ) { + if (!leafQueues.contains(leafQueueName)) { + leafQueues.add(leafQueueName); + } + ctr++; + } + } else { + if (!leafQueues.contains(leafQueueName)) { + leafQueues.add(leafQueueName); + } + ctr++; + } + } + } else { + break; + } + } + return leafQueues; + } + + private Set deactivateChildQueuesIfInActive(String nodePartition, ParentQueue parentQueue, + List + computedEntitlements, QueueEntitlementPolicyContext context) { + Set deactivatedQueues = new HashSet<>(); + List childQueueNames = context.getChildQueueNames(nodePartition, parentQueue.getQueueName()); + for (String childQueueName : childQueueNames) { + AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) scheduler.getCapacitySchedulerQueueManager().getQueue + (childQueueName); + + if (leafQueue != null) { + if (leafQueue.isActive() && !leafQueue.hasPendingApps()) { + QueueEntitlement newEntitlement = ZERO_CAPACITY_ENTITLEMENT; + if (newEntitlement != null) { + computedEntitlements.add(new QueueEntitlementChange.UpdateQueue(leafQueue, newEntitlement)); + deactivatedQueues.add(leafQueue.getQueueName()); + } + } + } else { + LOG.warn("Could not find queue in scheduler while trying to deactivate " + childQueueName); + } + } + return deactivatedQueues; + } + + + protected void activateChildQueues(AutoCreateEnabledParentQueue autoCreateEnabledParentQueue, + LinkedHashSet leafQueuesToBeActivated, List computedEntitlements, final float deactivatedCapacity) { + + float availableCapacity = autoCreateEnabledParentQueue.getAbsoluteCapacity() - autoCreateEnabledParentQueue + .getAbsoluteActivatedCapacity() + deactivatedCapacity; + + for (String curLeafQueueName : leafQueuesToBeActivated) { + // Activate queues if absolute capacity is available + if (availableCapacity > CSQueueUtils.EPSILON) { + AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) scheduler.getCapacitySchedulerQueueManager() + .getQueue(curLeafQueueName); + if (leafQueue != null) { + QueueEntitlement newEntitlement = new QueueEntitlement(autoCreateEnabledParentQueue.getAutoCreatedLeafQueueCapacity(), + autoCreateEnabledParentQueue.getAutoCreatedLeafQueueMaxCapacity()); + computedEntitlements.add(new QueueEntitlementChange.UpdateQueue(leafQueue, newEntitlement)); + availableCapacity -= autoCreateEnabledParentQueue.getAutoCreatedLeafQueueAbsoluteCapacity(); + } else { + LOG.warn("Could not find queue in scheduler while trying to deactivate " + curLeafQueueName); + } + } + } + } + + @VisibleForTesting + int getMaxLeavesToBeActivated(float absoluteCapacity, float sumOfChildActivatedCapacity, float + childQueueAbsoluteCapacity) { + return (int) Math.floor((absoluteCapacity - sumOfChildActivatedCapacity + EPSILON) + / childQueueAbsoluteCapacity); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index f24e30aa1ee..fb167bcf51f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -65,6 +65,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.UsersManager.User; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.KillableContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; @@ -593,15 +594,38 @@ public void validateSubmitApplication(ApplicationId applicationId, try { writeLock.lock(); // Check if the queue is accepting jobs - if (getState() != QueueState.RUNNING) { + validateQueueState(applicationId, QueueState.RUNNING); + + // Check submission limits for queues + validateQueueSubmissionLimits(applicationId, userName); + } finally { + writeLock.unlock(); + } + + try { + getParent().validateSubmitApplication(applicationId, userName, queue); + } catch (AccessControlException ace) { + LOG.info("Failed to submit application to parent-queue: " + + getParent().getQueuePath(), ace); + throw ace; + } + } + + protected void validateQueueState(ApplicationId applicationId, QueueState expectedState) throws + AccessControlException { + if (getState() != expectedState) { String msg = "Queue " + getQueuePath() - + " is STOPPED. Cannot accept submission of application: " + + " is " + getState() + " Cannot accept submission of application: " + applicationId; LOG.info(msg); throw new AccessControlException(msg); } + } - // Check submission limits for queues + protected void validateQueueSubmissionLimits(ApplicationId applicationId, String userName) throws + AccessControlException { + + // Check application submission limits on this queue if (getNumApplications() >= getMaxApplications()) { String msg = "Queue " + getQueuePath() + " already has " + getNumApplications() @@ -611,7 +635,7 @@ public void validateSubmitApplication(ApplicationId applicationId, throw new AccessControlException(msg); } - // Check submission limits for the user on this queue + // Check user submission limits on this queue User user = usersManager.getUserAndAddIfAbsent(userName); if (user.getTotalApplications() >= getMaxApplicationsPerUser()) { String msg = "Queue " + getQueuePath() + " already has " + user @@ -620,17 +644,6 @@ public void validateSubmitApplication(ApplicationId applicationId, LOG.info(msg); throw new AccessControlException(msg); } - } finally { - writeLock.unlock(); - } - - try { - getParent().validateSubmitApplication(applicationId, userName, queue); - } catch (AccessControlException ace) { - LOG.info("Failed to submit application to parent-queue: " + - getParent().getQueuePath(), ace); - throw ace; - } } public Resource getAMResourceLimit() { @@ -740,7 +753,7 @@ public Resource calculateAndGetAMResourceLimitPerPartition( } } - private void activateApplications() { + protected void activateApplications() { try { writeLock.lock(); // limit of allowed resource usage for application masters @@ -2116,4 +2129,37 @@ public long getMaximumApplicationLifetime() { public long getDefaultApplicationLifetime() { return defaultApplicationLifetime; } + + public CapacitySchedulerContext getCapacitySchedulerContext() { + return scheduler; + } + + /** + * This methods to change capacity for a queue and adjusts its + * absoluteCapacity + * + * @param entitlement the new entitlement for the queue (capacity, + * maxCapacity) + * @throws SchedulerDynamicEditException + */ + public void setEntitlement(QueueEntitlement entitlement) + throws SchedulerDynamicEditException { + try { + writeLock.lock(); + float capacity = entitlement.getCapacity(); + if (capacity < 0 || capacity > 1.0f) { + throw new SchedulerDynamicEditException( + "Capacity demand is not in the [0,1] range: " + capacity); + } + setCapacity(capacity); + setAbsoluteCapacity(getParent().getAbsoluteCapacity() * getCapacity()); + setMaxCapacity(entitlement.getMaxCapacity()); + if (LOG.isDebugEnabled()) { + LOG.debug("successfully changed to " + capacity + " for queue " + this + .getQueueName()); + } + } finally { + writeLock.unlock(); + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index 6800b74f8d4..150ffd183a6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant; @@ -78,7 +79,7 @@ protected final List childQueues; private final boolean rootQueue; private volatile int numApplications; - private final CapacitySchedulerContext scheduler; + protected final CapacitySchedulerContext scheduler; private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); @@ -1080,4 +1080,55 @@ public void stopQueue() { public QueueOrderingPolicy getQueueOrderingPolicy() { return queueOrderingPolicy; } + + public void addChildQueue(CSQueue childQueue) throws SchedulerDynamicEditException { + try { + writeLock.lock(); + if (childQueue.getCapacity() > 0) { + throw new SchedulerDynamicEditException( + "Queue " + childQueue + " being added has non zero capacity."); + } + boolean added = this.childQueues.add(childQueue); + if (LOG.isDebugEnabled()) { + LOG.debug("updateChildQueues (action: add queue): " + added + " " + + getChildQueuesToPrint()); + } + } finally { + writeLock.unlock(); + } + } + + public void removeChildQueue(CSQueue childQueue) throws SchedulerDynamicEditException { + try { + writeLock.lock(); + if (childQueue.getCapacity() > 0) { + throw new SchedulerDynamicEditException( + "Queue " + childQueue + " being removed has non zero capacity."); + } + Iterator qiter = childQueues.iterator(); + while (qiter.hasNext()) { + CSQueue cs = qiter.next(); + if (cs.equals(childQueue)) { + qiter.remove(); + if (LOG.isDebugEnabled()) { + LOG.debug("Removed child queue: {}" + cs.getQueueName()); + } + } + } + } finally { + writeLock.unlock(); + } + } + + public CSQueue removeChildQueue(String childQueueName) throws SchedulerDynamicEditException { + CSQueue childQueue; + try { + writeLock.lock(); + childQueue = this.csContext.getCapacitySchedulerQueueManager().getQueue(childQueueName); + removeChildQueue(childQueue); + } finally { + writeLock.unlock(); + } + return childQueue; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java index 882262fafcc..febe4fd2430 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java @@ -119,47 +119,6 @@ public void reinitialize(CSQueue newlyParsedQueue, } } - void addChildQueue(CSQueue newQueue) - throws SchedulerDynamicEditException { - try { - writeLock.lock(); - if (newQueue.getCapacity() > 0) { - throw new SchedulerDynamicEditException( - "Queue " + newQueue + " being added has non zero capacity."); - } - boolean added = this.childQueues.add(newQueue); - if (LOG.isDebugEnabled()) { - LOG.debug("updateChildQueues (action: add queue): " + added + " " - + getChildQueuesToPrint()); - } - } finally { - writeLock.unlock(); - } - } - - void removeChildQueue(CSQueue remQueue) - throws SchedulerDynamicEditException { - try { - writeLock.lock(); - if (remQueue.getCapacity() > 0) { - throw new SchedulerDynamicEditException( - "Queue " + remQueue + " being removed has non zero capacity."); - } - Iterator qiter = childQueues.iterator(); - while (qiter.hasNext()) { - CSQueue cs = qiter.next(); - if (cs.equals(remQueue)) { - qiter.remove(); - if (LOG.isDebugEnabled()) { - LOG.debug("Removed child queue: {}", cs.getQueueName()); - } - } - } - } finally { - writeLock.unlock(); - } - } - protected float sumOfChildCapacities() { try { writeLock.lock(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java index cc4af3dfb47..e4a7fac2492 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java @@ -317,4 +317,8 @@ public String toString() { readLock.unlock(); } } + + public boolean isRoot() { + return isRoot; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueEntitlementChange.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueEntitlementChange.java new file mode 100644 index 00000000000..d5b2d764ee5 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueEntitlementChange.java @@ -0,0 +1,143 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.QueueState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; + +/** + * Encapsulates Queue entitlement and state updates needed for adjusting capacity dynamically + * + */ +@Private +@Unstable +public abstract class QueueEntitlementChange { + + private final CSQueue queue; + + /** + * Add, remove the specified queue + * Updating the queue may involve entitlement updates + * and/or QueueState changes + */ + public enum QueueAction { + ADD_QUEUE, + REMOVE_QUEUE, + UPDATE_QUEUE + } + + private QueueEntitlement updatedEntitlement; + + private final QueueAction queueAction; + /** + * Updated Queue state with the new entitlement + */ + private QueueState transitionToQueueState; + + public QueueEntitlementChange(final CSQueue queue, final QueueAction queueAction) { + this.queue = queue; + this.queueAction = queueAction; + } + + public QueueEntitlementChange(final CSQueue queue, final QueueAction queueAction, QueueState targetQueueState, + final QueueEntitlement entitlement) { + this(queue, queueAction, entitlement); + this.transitionToQueueState = targetQueueState; + } + + public QueueEntitlementChange(final CSQueue queue, final QueueAction queueAction, final QueueEntitlement + entitlement) { + this(queue, queueAction); + this.updatedEntitlement = entitlement; + } + + public QueueState getTransitionToQueueState() { + return transitionToQueueState; + } + + public CSQueue getQueue() { + return queue; + } + + public QueueEntitlement getUpdatedEntitlement() { + return updatedEntitlement; + } + + public QueueAction getQueueAction() { + return queueAction; + } + + @Override + public boolean equals(final Object o) { + if (this == o) return true; + if (!(o instanceof QueueEntitlementChange)) return false; + + final QueueEntitlementChange that = (QueueEntitlementChange) o; + + if (queue != null ? !queue.equals(that.queue) : that.queue != null) return false; + if (updatedEntitlement != null ? !updatedEntitlement.equals(that.updatedEntitlement) : that.updatedEntitlement != null) + return false; + if (queueAction != that.queueAction) return false; + return transitionToQueueState == that.transitionToQueueState; + } + + @Override + public int hashCode() { + int result = queue != null ? queue.hashCode() : 0; + result = 31 * result + (updatedEntitlement != null ? updatedEntitlement.hashCode() : 0); + result = 31 * result + (queueAction != null ? queueAction.hashCode() : 0); + result = 31 * result + (transitionToQueueState != null ? transitionToQueueState.hashCode() : 0); + return result; + } + + @Override + public String toString() { + return "QueueEntitlementChange{" + + "queueName='" + queue + '\'' + + ", updatedEntitlement=" + updatedEntitlement + + ", queueAction=" + queueAction + + ", targetQueueState=" + transitionToQueueState + + '}'; + } + + public static class AddQueue extends QueueEntitlementChange { + public AddQueue(final CSQueue queue) { + super(queue, QueueAction.ADD_QUEUE); + } + } + + public static class RemoveQueue extends QueueEntitlementChange { + public RemoveQueue(final CSQueue queue) { + super(queue, QueueAction.REMOVE_QUEUE); + } + } + + public static class UpdateQueue extends QueueEntitlementChange { + public UpdateQueue(final CSQueue queue, QueueState targetQueueState, + final QueueEntitlement entitlement) { + super(queue, QueueAction.UPDATE_QUEUE, targetQueueState, entitlement); + } + + public UpdateQueue(final CSQueue queue, final QueueEntitlement + entitlement) { + super(queue, QueueAction.UPDATE_QUEUE, entitlement); + } + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueEntitlementDynamicEditPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueEntitlementDynamicEditPolicy.java new file mode 100644 index 00000000000..962e2bbd25e --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueEntitlementDynamicEditPolicy.java @@ -0,0 +1,432 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableSet; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.UpdateQueueEntitlementsSchedulerEvent; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.SystemClock; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +public class QueueEntitlementDynamicEditPolicy implements SchedulingEditPolicy, + QueueEntitlementPolicyContext { + + private static final Log LOG = + LogFactory.getLog(QueueEntitlementDynamicEditPolicy.class); + + private Set allPartitions; + + private Clock clock = SystemClock.getInstance(); + + // Pointer to other RM components + private RMContext rmContext; + private ResourceCalculator rc; + private CapacityScheduler scheduler; + private RMNodeLabelsManager nlm; + + private boolean observeOnly; + + private Map> queueToPartitions = + new HashMap<>(); + + private long monitoringInterval; + + private Set parentQueues = new HashSet<>(); + + @SuppressWarnings("unchecked") + @VisibleForTesting + public QueueEntitlementDynamicEditPolicy(RMContext context, + CapacityScheduler scheduler) { + init(context.getYarnConfiguration(), context, scheduler); + } + + @SuppressWarnings("unchecked") + @VisibleForTesting + public QueueEntitlementDynamicEditPolicy(RMContext context, + CapacityScheduler scheduler, Clock clock) { + init(context.getYarnConfiguration(), context, scheduler); + this.clock = clock; + } + + @Override + public void init(final Configuration config, final RMContext context, final ResourceScheduler sched) { + LOG.info("Queue Entitlement Policy monitor:" + this.getClass().getCanonicalName()); + assert null == scheduler : "Unexpected duplicate call to init"; + if (!(sched instanceof CapacityScheduler)) { + throw new YarnRuntimeException("Class " + + sched.getClass().getCanonicalName() + " not instance of " + + CapacityScheduler.class.getCanonicalName()); + } + rmContext = context; + scheduler = (CapacityScheduler) sched; + + rc = scheduler.getResourceCalculator(); + nlm = scheduler.getRMContext().getNodeLabelManager(); + + CapacitySchedulerConfiguration csConfig = scheduler.getConfiguration(); + + monitoringInterval = csConfig.getLong( + CapacitySchedulerConfiguration.QUEUE_ENTITLEMENT_MONITORING_INTERVAL, + CapacitySchedulerConfiguration.DEFAULT_QUEUE_ENTITLEMENT_MONITORING_INTERVAL); + + observeOnly = csConfig.getBoolean( + CapacitySchedulerConfiguration.QUEUE_ENTITLEMENT_OBSERVE_ONLY, + CapacitySchedulerConfiguration.DEFAULT_QUEUE_ENTITLEMENT_OBSERVE_ONLY); + } + + @Override + public void editSchedule() { + long startTs = clock.getTime(); + + CSQueue root = scheduler.getRootQueue(); +// Resource clusterResources = Resources.clone(scheduler.getClusterResource()); + computeQueueEntitlementsForAutoCreatedLeafQueues(root); + + if (LOG.isDebugEnabled()) { + LOG.debug("Total time used=" + (clock.getTime() - startTs) + " ms."); + } + } + + @VisibleForTesting + List computeQueueEntitlementsForAutoCreatedLeafQueues(final CSQueue root) { + + List entitlementChanges = Collections.emptyList(); + // All partitions to look at + Set partitions = new HashSet<>(); + partitions.addAll(scheduler.getRMContext() + .getNodeLabelManager().getClusterNodeLabelNames()); + partitions.add(RMNodeLabelsManager.NO_LABEL); + this.allPartitions = ImmutableSet.copyOf(partitions); + + // extract a summary of the queues from scheduler and hold scheduler lock for lesser time + synchronized (scheduler) { + queueToPartitions.clear(); + + for (String partitionToLookAt : allPartitions) { + cloneQueues(root, partitionToLookAt); + } + } + + //Proceed only if there are queues to process + if (parentQueues.size() > 0) { + for (String parentQueueName : parentQueues) { + for (String partition : allPartitions) { + if (queueToPartitions.get(parentQueueName).get(partition) != null) { + AutoCreateEnabledParentQueue parentQueue = (AutoCreateEnabledParentQueue) scheduler.getQueue(parentQueueName); + AutoCreatedQueueEntitlementPolicy policyClazz = parentQueue.getQueueEntitlementPolicy(); + + entitlementChanges = policyClazz + .computeChildQueueEntitlements(partition, parentQueue, this); + if (entitlementChanges.size() > 0 && !isObserveOnly()) { + scheduler.handle(new UpdateQueueEntitlementsSchedulerEvent(parentQueue, entitlementChanges)); + } + if (LOG.isDebugEnabled()) { + LOG.debug(" Updated queue entitlements : " + entitlementChanges.toString()); + } + } + } + } + } + return entitlementChanges; + } + + @Override + public long getMonitoringInterval() { + return monitoringInterval; + } + + @Override + public String getPolicyName() { + return "QueueEntitlementDynamicEditPolicy"; + } + + public ResourceCalculator getResourceCalculator() { + return rc; + } + + @Override + public RMContext getRMContext() { + return null; + } + + @Override + public boolean isObserveOnly() { + return observeOnly; + } + + @Override + public List getSortedPendingApplications(final String partition, final String parentQueue, final + Comparator appComparator) { + final TempQueuePerPartition queueByPartition = getQueueByPartition(parentQueue, partition); + List apps = new ArrayList<>(); + + for (TempQueuePerPartition childQueue : queueByPartition.getChildren()) { + apps.addAll(childQueue.getApps()); + } + + Collections.sort(apps, appComparator); + return Collections.unmodifiableList(apps); + } + + /** + * This method walks a tree of CSQueue and clones the portion of the state + * relevant for auto queue entitlement updates in TempQueue(s). It also maintains a pointer to + * the parent queues. + * + * @param curQueue current queue which I'm looking at now + * @return the root of the cloned queue hierarchy + */ + private TempQueuePerPartition cloneQueues(CSQueue curQueue, + String partitionToLookAt) { + TempQueuePerPartition ret = null; + ReentrantReadWriteLock.ReadLock readLock = curQueue.getReadLock(); + + try { + // Acquire a read lock from Parent/LeafQueue. + readLock.lock(); + + String queueName = curQueue.getQueueName(); + QueueCapacities qc = curQueue.getQueueCapacities(); + QueueCapacities qcClone = cloneQueueCapacities(qc, partitionToLookAt); + if (curQueue instanceof AutoCreateEnabledParentQueue) { + ret = new TempQueuePerPartition(queueName, + partitionToLookAt, qcClone, curQueue); + ret.activatedCapacity = ((AutoCreateEnabledParentQueue) curQueue).getAbsoluteActivatedCapacity(); + + // Recursively add children + if (curQueue.getChildQueues() != null) { + for (CSQueue c : curQueue.getChildQueues()) { + TempQueuePerPartition subq = new TempQueuePerPartition(c.getQueueName(), + partitionToLookAt, qcClone, c); + //check only scheduleable apps + subq.addAllApps(((AutoCreatedLeafQueue) c).getApplications()); + ret.addChild(subq); + subq.parent = ret; + } + } + parentQueues.add(queueName); + } else if (curQueue instanceof ParentQueue) { + // Recursively check for auto create enabled queues in children + for (CSQueue c : curQueue.getChildQueues()) { + cloneQueues(c, partitionToLookAt); + } + } + } finally { + readLock.unlock(); + } + + if (ret != null) { + addTempQueuePartition(ret); + } + return ret; + } + + private void addTempQueuePartition(TempQueuePerPartition queuePartition) { + String queueName = queuePartition.queueName; + + Map queuePartitions; + if (null == (queuePartitions = queueToPartitions.get(queueName))) { + queuePartitions = new HashMap<>(); + queueToPartitions.put(queueName, queuePartitions); + } + queuePartitions.put(queuePartition.partition, queuePartition); + } + + private QueueCapacities cloneQueueCapacities(QueueCapacities qc, String partitionToLookAt) { + QueueCapacities clone = new QueueCapacities(qc.isRoot()); + clone.setAbsoluteCapacity(qc.getAbsoluteCapacity(partitionToLookAt)); + float absMaxCap = qc.getAbsoluteMaximumCapacity(partitionToLookAt); + // when partition is a non-exclusive partition, the actual maxCapacity + // could more than specified maxCapacity + try { + if (!scheduler.getRMContext().getNodeLabelManager() + .isExclusiveNodeLabel(partitionToLookAt)) { + absMaxCap = 1.0f; + } + } catch (IOException e) { + // This may cause by partition removed when running capacity monitor, + // just ignore the error, this will be corrected when doing next check. + } + + clone.setAbsoluteMaximumCapacity(absMaxCap); + clone.setCapacity(qc.getCapacity()); + clone.setAbsoluteUsedCapacity(qc.getAbsoluteUsedCapacity()); + clone.setMaxAMResourcePercentage(qc.getMaxAMResourcePercentage()); + clone.setAbsoluteReservedCapacity(qc.getAbsoluteReservedCapacity()); + clone.setUsedCapacity(qc.getUsedCapacity()); + clone.setAbsoluteMaximumCapacity(absMaxCap); + return clone; + } + + public Set getAllPartitions() { + return allPartitions; + } + + public RMContext getRmContext() { + return rmContext; + } + + public ResourceCalculator getRC() { + return rc; + } + + public CapacityScheduler getScheduler() { + return scheduler; + } + + + /** + * Get queue partition by given queueName and partitionName + */ + @VisibleForTesting + TempQueuePerPartition getQueueByPartition(String queueName, + String partition) { + Map partitionToQueues; + if (null == (partitionToQueues = queueToPartitions.get(queueName))) { + throw new YarnRuntimeException("This shouldn't happen, cannot find " + + "TempQueuePerPartition for queueName=" + queueName); + } + return partitionToQueues.get(partition); + } + + /** + * Get all queue partitions by given queueName + */ + @VisibleForTesting + Collection getQueuePartitions(String queueName) { + if (!queueToPartitions.containsKey(queueName)) { + throw new YarnRuntimeException("This shouldn't happen, cannot find " + + "TempQueuePerPartition collection for queueName=" + queueName); + } + return queueToPartitions.get(queueName).values(); + } + + @Override + public float getActivatedCapacity(String partition, String parentQueue) { + TempQueuePerPartition tempQueue = getQueueByPartition(parentQueue, partition); + return tempQueue.getActivatedCapacity(); + } + + @Override + public QueueCapacities getQueueCapacities(String partition, String parentQueue) { + TempQueuePerPartition tempQueue = getQueueByPartition(parentQueue, partition); + return tempQueue.getQueueCapacities(); + } + + @Override + public List getChildQueueNames(final String partition, final String parentQueue) { + TempQueuePerPartition tempQueue = getQueueByPartition(parentQueue, partition); + return tempQueue.getChildQueueNames(); + } + + static class TempQueuePerPartition { + + // Following fields are copied from scheduler + final String queueName; + final String partition; + + private final QueueCapacities qc; + + TempQueuePerPartition parent = null; + final List children; + final List childQueueNames; + private LeafQueue leafQueue; + private List apps; + + private float activatedCapacity = 0.0f; + + public TempQueuePerPartition(final String queueName, final String partition, final QueueCapacities qc, final CSQueue curQueue) { + + this.queueName = queueName; + if (curQueue instanceof LeafQueue) { + LeafQueue l = (LeafQueue) curQueue; + leafQueue = l; + } + + this.children = new ArrayList<>(); + this.childQueueNames = new ArrayList<>(); + this.apps = new ArrayList<>(); + this.partition = partition; + this.qc = qc; + } + + public void setLeafQueue(LeafQueue l) { + assert children.size() == 0; + this.leafQueue = l; + } + + /** + * When adding a child we also aggregate its pending resource needs. + * + * @param q the child queue to add to this queue + */ + public void addChild(TempQueuePerPartition q) { + assert leafQueue == null; + children.add(q); + childQueueNames.add(q.queueName); + } + + public List getChildren() { + return children; + } + + public void addAllApps(Collection orderedApps) { + this.apps.addAll(orderedApps); + } + + public List getApps() { + return apps; + } + + public float getActivatedCapacity() { + return activatedCapacity; + } + + public QueueCapacities getQueueCapacities() { + return qc; + } + + public List getChildQueueNames() { + return childQueueNames; + } + + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueEntitlementPolicyContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueEntitlementPolicyContext.java new file mode 100644 index 00000000000..2aaab4ca93a --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueEntitlementPolicyContext.java @@ -0,0 +1,33 @@ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; + +import java.util.Comparator; +import java.util.List; + +@Private +@Unstable +public interface QueueEntitlementPolicyContext { + + CapacityScheduler getScheduler(); + + ResourceCalculator getResourceCalculator(); + + RMContext getRMContext(); + + boolean isObserveOnly(); + + List getSortedPendingApplications(String partition, String parentQueue, + Comparator appComparator); + + float getActivatedCapacity(String partition, String parentQueue); + + QueueCapacities getQueueCapacities(String partition, String parentQueue); + + List getChildQueueNames(String partition, String parentQueue); +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java index 3d1b7317489..6540972067e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java @@ -24,6 +24,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; +import org.apache.hadoop.yarn.util.Clock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -75,37 +76,6 @@ public void reinitialize(CSQueue newlyParsedQueue, } } - /** - * This methods to change capacity for a queue and adjusts its - * absoluteCapacity - * - * @param entitlement the new entitlement for the queue (capacity, - * maxCapacity, etc..) - * @throws SchedulerDynamicEditException - */ - public void setEntitlement(QueueEntitlement entitlement) - throws SchedulerDynamicEditException { - try { - writeLock.lock(); - float capacity = entitlement.getCapacity(); - if (capacity < 0 || capacity > 1.0f) { - throw new SchedulerDynamicEditException( - "Capacity demand is not in the [0,1] range: " + capacity); - } - setCapacity(capacity); - setAbsoluteCapacity(getParent().getAbsoluteCapacity() * getCapacity()); - // note: we currently set maxCapacity to capacity - // this might be revised later - setMaxCapacity(entitlement.getMaxCapacity()); - if (LOG.isDebugEnabled()) { - LOG.debug("successfully changed to " + capacity + " for queue " + this - .getQueueName()); - } - } finally { - writeLock.unlock(); - } - } - private void updateQuotas(int userLimit, float userLimitFactor, int maxAppsForReservation, int maxAppsPerUserForReservation) { setUserLimit(userLimit); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/QueueEntitlement.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/QueueEntitlement.java index 2a751e3e437..2baf608d59a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/QueueEntitlement.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/QueueEntitlement.java @@ -43,4 +43,30 @@ public float getCapacity() { public void setCapacity(float capacity) { this.capacity = capacity; } + + @Override + public boolean equals(final Object o) { + if (this == o) return true; + if (!(o instanceof QueueEntitlement)) return false; + + final QueueEntitlement that = (QueueEntitlement) o; + + if (Float.compare(that.capacity, capacity) != 0) return false; + return Float.compare(that.maxCapacity, maxCapacity) == 0; + } + + @Override + public int hashCode() { + int result = (capacity != +0.0f ? Float.floatToIntBits(capacity) : 0); + result = 31 * result + (maxCapacity != +0.0f ? Float.floatToIntBits(maxCapacity) : 0); + return result; + } + + @Override + public String toString() { + return "QueueEntitlement{" + + "capacity=" + capacity + + ", maxCapacity=" + maxCapacity + + '}'; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AddQueueSchedulerEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AddQueueSchedulerEvent.java new file mode 100644 index 00000000000..c9a70fd859e --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AddQueueSchedulerEvent.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; + +public class AddQueueSchedulerEvent extends SchedulerEvent { + + private CSQueue queueToBeAdded; + + public AddQueueSchedulerEvent(CSQueue queue) { + super(SchedulerEventType.ADD_QUEUE); + this.queueToBeAdded = queue; + } + + public CSQueue getQueue() { + return queueToBeAdded; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/RemoveQueueSchedulerEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/RemoveQueueSchedulerEvent.java new file mode 100644 index 00000000000..91a780568aa --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/RemoveQueueSchedulerEvent.java @@ -0,0 +1,18 @@ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event; + + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; + +public class RemoveQueueSchedulerEvent extends SchedulerEvent { + + private CSQueue queue; + + public RemoveQueueSchedulerEvent(CSQueue queue) { + super(SchedulerEventType.REMOVE_QUEUE); + this.queue = queue; + } + + public CSQueue getQueue() { + return queue; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java index 229e0bbc0be..222984d4a84 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java @@ -51,5 +51,12 @@ MARK_CONTAINER_FOR_KILLABLE, // Cancel a killable container - MARK_CONTAINER_FOR_NONKILLABLE + MARK_CONTAINER_FOR_NONKILLABLE, + + ADD_QUEUE, + + REMOVE_QUEUE, + + UPDATE_QUEUE_ENTITLEMENT, + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/UpdateQueueEntitlementsSchedulerEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/UpdateQueueEntitlementsSchedulerEvent.java new file mode 100644 index 00000000000..1411b184a32 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/UpdateQueueEntitlementsSchedulerEvent.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event; + + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueEntitlementChange; + +import java.util.List; + +public class UpdateQueueEntitlementsSchedulerEvent extends SchedulerEvent { + + CSQueue queue; + List childQueueEntitlementUpdates; + + public UpdateQueueEntitlementsSchedulerEvent(final CSQueue queue, List + queueEntitlementUpdates) { + super(SchedulerEventType.UPDATE_QUEUE_ENTITLEMENT); + this.queue = queue; + this.childQueueEntitlementUpdates = queueEntitlementUpdates; + } + + public CSQueue getQueue() { + return queue; + } + + public List getChildQueueEntitlementUpdates() { + return childQueueEntitlementUpdates; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java index 399df02465e..10b9ced2f28 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java @@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.records.AppCollectorData; +import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics; @@ -243,6 +244,11 @@ public boolean isAppInCompletedStates() { throw new UnsupportedOperationException("Not supported yet."); } + @Override + public ApplicationPlacementContext getApplicationQueuePlacementContext() { + throw new UnsupportedOperationException("Not supported yet."); + } + @Override public CollectorInfo getCollectorInfo() { throw new UnsupportedOperationException("Not supported yet."); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java index 39a7f995ab6..341eb847bd3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java @@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.records.AppCollectorData; +import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; @@ -327,6 +328,12 @@ public boolean isAppInCompletedStates() { return false; } + @Override + public ApplicationPlacementContext getApplicationQueuePlacementContext() { + //TODO + return null; + } + @Override public CollectorInfo getCollectorInfo() { throw new UnsupportedOperationException("Not supported yet."); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index 1dea4eea75f..081ceaa5e0f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -173,6 +173,7 @@ private static final String A = CapacitySchedulerConfiguration.ROOT + ".a"; private static final String B = CapacitySchedulerConfiguration.ROOT + ".b"; + private static final String C = CapacitySchedulerConfiguration.ROOT + ".c"; private static final String A1 = A + ".a1"; private static final String A2 = A + ".a2"; private static final String B1 = B + ".b1"; @@ -841,6 +842,53 @@ private CapacitySchedulerConfiguration setupOtherBlockedQueueConfiguration( return conf; } + /** + * @param conf, to be modified + * @return, CS configuration which has converted b1 to parent queue + * root + * / \ \ + * a b c + * / \ / | \ + * a1 a2 b1 b2 b3 + * + */ + private CapacitySchedulerConfiguration + setupQueueConfigurationWithCAsAutoCreateEnabledParentQueue( + CapacitySchedulerConfiguration conf) { + + + + // Define top-level queues + conf.setQueues(CapacitySchedulerConfiguration.ROOT, + new String[] { "a", "b" }); + + conf.setCapacity(A, A_CAPACITY); + conf.setCapacity(B, B_CAPACITY/2); + conf.setCapacity(C, B_CAPACITY/2); + + // Define 2nd-level queues + conf.setQueues(A, new String[] { "a1", "a2" }); + conf.setCapacity(A1, A1_CAPACITY); + conf.setUserLimitFactor(A1, 100.0f); + conf.setCapacity(A2, A2_CAPACITY); + conf.setUserLimitFactor(A2, 100.0f); + + conf.setQueues(B, new String[] {"b1","b2", "b3"}); + conf.setCapacity(B1, B1_CAPACITY); + conf.setUserLimitFactor(B1, 100.0f); + conf.setCapacity(B2, B2_CAPACITY); + conf.setUserLimitFactor(B2, 100.0f); + conf.setCapacity(B3, B3_CAPACITY); + conf.setUserLimitFactor(B3, 100.0f); + + // Set childQueue for root + conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b", "c"}); + conf.setCapacity(C, 100.0f); + conf.setUserLimitFactor(C, 100.0f); + + return conf; + } + @Test public void testMaximumCapacitySetup() { float delta = 0.0000001f; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java new file mode 100644 index 00000000000..b3b107b9aef --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java @@ -0,0 +1,338 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext; +import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueueUtils.EPSILON; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +public class TestCapacitySchedulerAutoQueueCreation { + + private static final Log LOG = LogFactory.getLog(TestCapacityScheduler.class); + private final int GB = 1024; + private final static ContainerUpdates NULL_UPDATE_REQUESTS = + new ContainerUpdates(); + + private static final String A = CapacitySchedulerConfiguration.ROOT + ".a"; + private static final String B = CapacitySchedulerConfiguration.ROOT + ".b"; + private static final String C = CapacitySchedulerConfiguration.ROOT + ".c"; + private static final String A1 = A + ".a1"; + private static final String A2 = A + ".a2"; + private static final String B1 = B + ".b1"; + private static final String B2 = B + ".b2"; + private static final String B3 = B + ".b3"; + private static float A_CAPACITY = 20f; + private static float B_CAPACITY = 60f; + private static float C_CAPACITY = 20f; + private static float A1_CAPACITY = 30; + private static float A2_CAPACITY = 70; + private static float B1_CAPACITY = 60f; + private static float B2_CAPACITY = 20f; + private static float B3_CAPACITY = 20f; + + private MockRM mockRM = null; + private QueueEntitlementDynamicEditPolicy policy; + + private CapacityScheduler cs; + + @Before + public void setUp() throws Exception { + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); + setupQueueConfiguration(conf); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + + List queuePlacementRules = new ArrayList(); + queuePlacementRules.add(YarnConfiguration.USER_GROUP_PLACEMENT_RULE); + conf.setQueuePlacementRules(queuePlacementRules); + + //Set C as parent queue name for auto queue creation + UserGroupMappingPlacementRule.QueueMapping queueMapping = new UserGroupMappingPlacementRule.QueueMapping + (UserGroupMappingPlacementRule.QueueMapping.MappingType.USER, "%user", "c.%user"); + + //set queue queue mapping + List queueMappings = new ArrayList<>(); + queueMappings.add(queueMapping); + conf.setQueueMappings(queueMappings); + + //override with queue mappings - is this usually set in prod? + conf.setOverrideWithQueueMappings(true); + + mockRM = new MockRM(conf); + cs = (CapacityScheduler) mockRM.getResourceScheduler(); + cs.updatePlacementRules(); + mockRM.start(); + + policy = new QueueEntitlementDynamicEditPolicy(mockRM.getRMContext(), cs); + + cs.start(); + } + + /** + * @param conf, to be modified + * @return, CS configuration which has C as an auto creation enabled parent queue + * root + * / \ \ + * a b c + * / \ / | \ + * a1 a2 b1 b2 b3 + */ + private CapacitySchedulerConfiguration setupQueueConfiguration( + CapacitySchedulerConfiguration conf) { + + //setup new queues with one of them auto enabled + // Define top-level queues + // Set childQueue for root + conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[]{"a", "b", "c"}); + + conf.setCapacity(A, A_CAPACITY); + conf.setCapacity(B, B_CAPACITY); + conf.setCapacity(C, C_CAPACITY); + + // Define 2nd-level queues + conf.setQueues(A, new String[]{"a1", "a2"}); + conf.setCapacity(A1, A1_CAPACITY); + conf.setUserLimitFactor(A1, 100.0f); + conf.setCapacity(A2, A2_CAPACITY); + conf.setUserLimitFactor(A2, 100.0f); + + conf.setQueues(B, new String[]{"b1", "b2", "b3"}); + conf.setCapacity(B1, B1_CAPACITY); + conf.setUserLimitFactor(B1, 100.0f); + conf.setCapacity(B2, B2_CAPACITY); + conf.setUserLimitFactor(B2, 100.0f); + conf.setCapacity(B3, B3_CAPACITY); + conf.setUserLimitFactor(B3, 100.0f); + + conf.setUserLimitFactor(C, 1.0f); + conf.setAutoCreateChildQueueEnabled(C, true); + + //Setup leaf queue template configs + conf.setAutoCreatedLeafQueueTemplateCapacity(C, 50.0f); + conf.setAutoCreatedLeafQueueTemplateMaxCapacity(C, 100.0f); + + conf.setQueueEntitlementPolicyObserveOnlyFlag(C, true); +// conf.setAutoCreatedLeafQueueTemplateMaxApplicationsPerQueue(C, ); + + LOG.info("Setup " + C + " as an auto leaf creation enabled parent queue"); + + return conf; + } + + @After + public void tearDown() throws Exception { + if (mockRM != null) { + mockRM.stop(); + } + } + + @Test(timeout = 10000) + public void testAutoCreateLeafQueueCreation() throws Exception { + + final String USER_NAME = "user_0"; + final String PARENT_QUEUE = "c"; + + try { + // submit an app + RMApp app = mockRM.submitApp(GB, "test-auto-queue-creation-1", USER_NAME, null, USER_NAME); + // check preconditions + List appsInC = cs.getAppsInQueue(PARENT_QUEUE); + assertEquals(1, appsInC.size()); + ApplicationPlacementContext placementContext = app.getApplicationQueuePlacementContext(); + validatePlacementContext(placementContext, USER_NAME, PARENT_QUEUE); + + SchedulerEvent addAppEvent = + new AppAddedSchedulerEvent(app.getApplicationId(), USER_NAME, USER_NAME); + cs.handle(addAppEvent); + + assertNotNull(cs.getQueue(USER_NAME)); + + AutoCreatedLeafQueue autoCreatedLeafQueue = (AutoCreatedLeafQueue) cs.getQueue(USER_NAME); + AutoCreateEnabledParentQueue parentQueue = (AutoCreateEnabledParentQueue) cs.getQueue("c"); + assertEquals(parentQueue, autoCreatedLeafQueue.getParent()); + validateCapacities(autoCreatedLeafQueue); + } finally { + cleanupQueue(USER_NAME); + } + + //TODO - UT Scenarios + // create new parent queue with auto enabled and reinitialize + // check appropriate template config entries initialized in parent queue + // check if initial queue capacity is 0 after application submission and before attempt is sent + + // convert existing leaf queue to auto enabled and reinitialize + // convert existing parent queue to auto enabled - should fail since it has pre-configured queues (TODO) + // set root to auto enabled (TODO) + } + + // @Test(timeout = 10000) + @Test + public void testAutoCreatedQueueActivation() throws Exception { + + final String USER1 = "user_1"; + final String USER2 = "user_2"; + final String USER3 = "user_3"; + final String PARENT_QUEUE = "c"; + + try { + String host = "127.0.0.1"; + RMNode node = + MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1, host); + cs.handle(new NodeAddedSchedulerEvent(node)); + + CSQueue parentQueue = cs.getQueue(PARENT_QUEUE); + + //submit app1 as USER1 + submitApp(parentQueue, USER1, USER1, 1, 1); + validateActivatedQueueEntitlement(parentQueue, USER1, 0.1f); + + //submit another app2 as USER2 + ApplicationId user2AppId = submitApp(parentQueue, USER2, USER2, 2, 1); + validateActivatedQueueEntitlement(parentQueue, USER2, 0.2f); + + //submit another app3 as USER1 + submitApp(parentQueue, USER1, USER1, 3, 2); + validateActivatedQueueEntitlement(parentQueue, USER1, 0.2f); + + submitApp(parentQueue, USER3, USER3, 4, 1); + final CSQueue user3LeafQueue = cs.getQueue(USER3); + validateCapacities((AutoCreatedLeafQueue)user3LeafQueue); + + //deactivate USER2 queue + cs.killAllAppsInQueue(USER2); + mockRM.waitForState(user2AppId, RMAppState.KILLED); + + //Verify if USER_2 can be deactivated since it has no pending apps + validateDeactivatedQueueEntitlement(parentQueue, USER2); + + //USER_3 should now get activated + validateActivatedQueueEntitlement(parentQueue, USER3, 0.2f); + + } finally { + cleanupQueue(USER1); + cleanupQueue(USER2); + cleanupQueue(USER3); + } + } + + private void validateQueueEntitlementChangesForLeafQueue(CSQueue leafQueue, QueueEntitlement + leafQueueEntitlement, final List queueEntitlementChanges) { + for (QueueEntitlementChange entitlementChange : queueEntitlementChanges) { + if (leafQueue.getQueueName().equals(entitlementChange.getQueue().getQueueName())) { + assertEquals(entitlementChange.getUpdatedEntitlement(), leafQueueEntitlement); + } + } + } + + private void validatePlacementContext(ApplicationPlacementContext placementContext, String source, String + parentQueue) { + assertEquals(placementContext.getParentQueue(), parentQueue); + assertEquals(placementContext.getQueue(), source); + assertEquals(placementContext.getUserName(), source); + assertNotNull(placementContext.getPlacementRule()); + assertEquals(placementContext.getPlacementRule().getName(), UserGroupMappingPlacementRule.class.getName()); + } + + private void validateCapacities(AutoCreatedLeafQueue autoCreatedLeafQueue) { + assertEquals(autoCreatedLeafQueue.getCapacity(), 0.0f, EPSILON); + assertEquals(autoCreatedLeafQueue.getAbsoluteCapacity(), 0.0f, EPSILON); + assertEquals(autoCreatedLeafQueue.getMaximumCapacity(), 0.0f, EPSILON); + assertEquals(autoCreatedLeafQueue.getAbsoluteMaximumCapacity(), 0.0f, EPSILON); + int maxAppsForAutoCreatedQueues = (int) (CapacitySchedulerConfiguration + .DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS * autoCreatedLeafQueue.getParent().getAbsoluteCapacity()); + assertEquals(autoCreatedLeafQueue.getMaxApplicationsPerUser(), maxAppsForAutoCreatedQueues); + assertEquals(autoCreatedLeafQueue.getMaxApplicationsPerUser(), (int) (maxAppsForAutoCreatedQueues * (cs.getConfiguration() + .getUserLimitFactor(autoCreatedLeafQueue.getParent().getQueuePath())))); + } + + + private void cleanupQueue(String queueName) throws YarnException { + AutoCreatedLeafQueue queue = (AutoCreatedLeafQueue) cs.getQueue(queueName); + if (queue != null) { + queue.setEntitlement(new QueueEntitlement(0.0f,0.0f)); + ((AutoCreateEnabledParentQueue) queue.getParent()).removeChildQueue(queue.getQueueName()); + cs.getCapacitySchedulerQueueManager().removeQueue(queue.getQueueName()); + } else { + throw new YarnException("Queue does not exist " + queueName); + } + } + + private ApplicationId submitApp(CSQueue parentQueue, + String leafQueueName, String user, int expectedNumAppsInParentQueue, int + expectedNumAppsInLeafQueue) throws Exception { + // submit an app + RMApp rmApp = mockRM.submitApp(GB, "test-auto-queue-activation", user, null, leafQueueName); + + // check preconditions + List appsInParentQueue = cs.getAppsInQueue(parentQueue.getQueueName()); + assertEquals(expectedNumAppsInParentQueue, appsInParentQueue.size()); + + List appsInLeafQueue = cs.getAppsInQueue(leafQueueName); + assertEquals(expectedNumAppsInLeafQueue, appsInLeafQueue.size()); + + return rmApp.getApplicationId(); + } + + void validateActivatedQueueEntitlement(CSQueue parentQueue, String leafQueueName, float + expectedTotalChildQueueActivatedCapacity) { + AutoCreateEnabledParentQueue autoCreateEnabledParentQueue = (AutoCreateEnabledParentQueue) parentQueue; + QueueEntitlement entitlement = new QueueEntitlement(autoCreateEnabledParentQueue + .getAutoCreatedLeafQueueCapacity(), autoCreateEnabledParentQueue.getAutoCreatedLeafQueueMaxCapacity()); + validateQueueEntitlements(leafQueueName, entitlement); + assertEquals(expectedTotalChildQueueActivatedCapacity, + autoCreateEnabledParentQueue.getAbsoluteActivatedCapacity(), EPSILON); + } + + void validateDeactivatedQueueEntitlement(CSQueue parentQueue, String leafQueueName) { + QueueEntitlement entitlement = new QueueEntitlement(0.0f, 1.0f); + validateQueueEntitlements(leafQueueName, entitlement); + } + + void validateQueueEntitlements(String leafQueueName, QueueEntitlement entitlement) { + AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) cs.getQueue(leafQueueName); + + List queueEntitlementChanges = policy.computeQueueEntitlementsForAutoCreatedLeafQueues(cs.getRootQueue()); + validateQueueEntitlementChangesForLeafQueue(leafQueue, entitlement, queueEntitlementChanges); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestGuaranteedOrZeroCapacityOverTimePolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestGuaranteedOrZeroCapacityOverTimePolicy.java new file mode 100644 index 00000000000..2667ca1101d --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestGuaranteedOrZeroCapacityOverTimePolicy.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class TestGuaranteedOrZeroCapacityOverTimePolicy { + + @Test + public void testGetMaxLeavesToBeActivated() { + GuaranteedOrZeroCapacityOverTimePolicy policy = new GuaranteedOrZeroCapacityOverTimePolicy(); + //leaf queue template capacity = 15% + assertEquals(0, policy.getMaxLeavesToBeActivated(0.2f, 0.18f, 0.03f)); + assertEquals(1, policy.getMaxLeavesToBeActivated(0.2f, 0.17f, 0.03f)); + assertEquals(6, policy.getMaxLeavesToBeActivated(0.2f, 0.0f, 0.03f)); + } + +}